Storm source code analysis (10)

null_ wry 2022-02-13 07:59:08 Views: 901

storm source code analysis


[email protected]

NoneGrouping

This grouping is used when you do not care how tuples are load-balanced across the parallel tasks. At present it is equivalent to Shuffle Grouping; in addition, Storm will place the bolt task in the same thread as the upstream task that supplies its data.

 public static class NoneGrouping implements CustomStreamGrouping {

private final Random random;
private List<Integer> targetTasks;
private int numTasks;
public NoneGrouper() {

random = new Random();
}
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {

this.targetTasks = targetTasks;
this.numTasks = targetTasks.size();
}
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {

int index = random.nextInt(numTasks);
return Collections.singletonList(targetTasks.get(index));
}
}

Here the target task is selected at random via random.nextInt(numTasks).

AllGrouping

Broadcast grouping: every tuple is delivered to all of the bolt's tasks.

/**
 * Stream grouping that hands every tuple to all target tasks.
 */
public static class AllGrouping implements CustomStreamGrouping {

    /** Target task ids captured by {@link #prepare}. */
    private List<Integer> recipients;

    /**
     * Remembers the list of target tasks.
     *
     * @param context     worker topology context (not used by this grouping)
     * @param stream      the stream being grouped (not used by this grouping)
     * @param targetTasks ids of the tasks that will receive every tuple
     */
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        recipients = targetTasks;
    }

    /**
     * Returns all target tasks, regardless of the emitting task or the tuple contents.
     *
     * @param taskId id of the emitting task (ignored)
     * @param values the tuple values (ignored)
     * @return the very list instance that was passed to {@code prepare}; no copy is made
     */
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return recipients;
    }
}

This returns all targetTasks.

PartialKeyGrouping

/**
 * Custom stream grouping that maps each key to a small set of candidate tasks and then
 * picks one of those candidates for every tuple.
 *
 * <p>The candidate set comes from an {@link AssignmentCreator}; the final choice is made by a
 * {@link TargetSelector}. With the defaults, two candidates are derived deterministically
 * from the key and the one that has been chosen less often so far receives the tuple.
 */
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {

    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;
    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    /** Creates a grouping that uses the first tuple value as the key. */
    public PartialKeyGrouping() {
        this(null);
    }

    /**
     * Creates a grouping keyed on the given fields, using the default assignment and selection.
     *
     * @param fields the fields that form the key, or {@code null} to use the first tuple value
     */
    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    /**
     * Creates a grouping with a custom assignment strategy and the default target selector.
     *
     * @param fields            the fields that form the key, or {@code null} to use the first tuple value
     * @param assignmentCreator strategy that maps a key to its candidate tasks
     */
    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    /**
     * Creates a fully customised grouping.
     *
     * @param fields            the fields that form the key, or {@code null} to use the first tuple value
     * @param assignmentCreator strategy that maps a key to its candidate tasks
     * @param targetSelector    strategy that picks one task out of the candidates
     */
    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    /**
     * Records the target tasks and, when key fields were configured, looks up the output
     * fields of the source stream so the key values can be selected later.
     *
     * @param context     worker topology context, queried for the stream's output fields
     * @param stream      the stream being grouped
     * @param targetTasks ids of the tasks that may receive tuples
     */
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    /**
     * Chooses the task that should receive the tuple.
     *
     * @param taskId id of the emitting task (not used by this grouping)
     * @param values the tuple values
     * @return a list with the single selected task id, or an empty list when {@code values} is empty
     */
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (!values.isEmpty()) {
            final byte[] rawKeyBytes = getKeyBytes(values);
            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    /**
     * Extracts the key of a tuple as a byte array.
     *
     * <p>With configured key fields, the result is the concatenation of one 4-byte hash per
     * selected value. Without key fields, the first tuple value is assumed to be the key and
     * its string form is encoded as UTF-8.
     *
     * @param values the tuple values; must not be empty
     * @return the raw key bytes
     */
    private byte[] getKeyBytes(List<Object> values) {
        if (fields == null) {
            // Assume the key is the first field.
            Object firstValue = values.get(0);
            if (firstValue == null) {
                // Tolerate a null key here just like the fields branch does, instead of throwing.
                return new byte[0];
            }
            // Use a fixed charset so every JVM derives the same bytes (and therefore the same
            // candidate tasks) for a key, independent of the platform default encoding.
            return firstValue.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }

        List<Object> selectedFields = outFields.select(fields, values);
        ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
        for (Object o : selectedFields) {
            out.putInt(hashOfKeyPart(o));
        }
        return out.array();
    }

    /**
     * Computes a content-based hash for one key value: lists and arrays are hashed by their
     * elements, other objects by {@code hashCode()}, and {@code null} as 0.
     */
    private static int hashOfKeyPart(Object o) {
        if (o instanceof List) {
            return Arrays.deepHashCode(((List<?>) o).toArray());
        } else if (o instanceof Object[]) {
            return Arrays.deepHashCode((Object[]) o);
        } else if (o instanceof byte[]) {
            return Arrays.hashCode((byte[]) o);
        } else if (o instanceof short[]) {
            return Arrays.hashCode((short[]) o);
        } else if (o instanceof int[]) {
            return Arrays.hashCode((int[]) o);
        } else if (o instanceof long[]) {
            return Arrays.hashCode((long[]) o);
        } else if (o instanceof char[]) {
            return Arrays.hashCode((char[]) o);
        } else if (o instanceof float[]) {
            return Arrays.hashCode((float[]) o);
        } else if (o instanceof double[]) {
            return Arrays.hashCode((double[]) o);
        } else if (o instanceof boolean[]) {
            return Arrays.hashCode((boolean[]) o);
        } else if (o != null) {
            return o.hashCode();
        } else {
            return 0;
        }
    }

    /*==================================================
     * Helper Classes
     *==================================================*/

    /**
     * Responsible for selecting a subset of the target tasks for a given key.
     */
    public interface AssignmentCreator extends Serializable {

        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    /**
     * Selects one element from a task assignment to receive a specific tuple.
     */
    public interface TargetSelector extends Serializable {

        Integer chooseTask(int[] assignedTasks);
    }

    /*========== Implementations ==========*/

    /**
     * {@link AssignmentCreator} implementation that picks two pseudo-random tasks per key.
     */
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {

        /**
         * Creates a two-task assignment for the key.
         *
         * @param tasks the available target task ids; must not be empty
         * @param key   the raw key bytes
         * @return an array holding two task ids (the same id twice when only one task exists)
         */
        @Override
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // The assignment must be deterministic per key, so the generator is seeded from the key.
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // Make sure the two choices are different tasks: on a collision move to the next index.
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{tasks.get(choice1), tasks.get(choice2)};
        }
    }

    /**
     * A basic implementation of target selection. This strategy chooses the task within the
     * assignment that has received the fewest tuples overall from this instance of the grouping.
     */
    public static class BalancedTargetSelector implements TargetSelector {

        // Left non-final on purpose: this class has no explicit serialVersionUID, and changing a
        // field modifier would change the computed one.
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        /**
         * Chooses the candidate that has been selected the fewest times so far and counts the
         * selection. On a tie the earlier candidate in the array wins.
         *
         * @param assignedTasks the candidate task ids
         * @return the chosen task id, or {@code null} when {@code assignedTasks} is empty
         */
        @Override
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            long minTaskLoad = Long.MAX_VALUE;
            for (int currentTaskId : assignedTasks) {
                final long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }
            targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
            return taskIdWithMinLoad;
        }
    }
}

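BalancedTargetSelector keeps a targetTaskStats map (Map<Integer, Long>) that counts how many tuples have been sent to each task; in the version shown here it is created together with the selector rather than as a long[] initialized in prepare.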
If no fields are specified, PartialKeyGrouping uses the first value of the tuple as the key by default.
BalancedTargetSelector takes the candidate task IDs and, based on targetTaskStats, returns the one with the smallest load (taskIdWithMinLoad).
RandomTwoTaskAssignmentCreator chooses two task IDs, and the one that has been used less often is then selected.
The getKeyBytes() method extracts the key from the input tuple.

Copyright: author [null_ wry]. Please include the original link when reprinting, thank you. https://en.javamana.com/2022/02/202202130759058305.html