Storm source code analysis (9)

null_ wry 2022-02-13 07:59:10


Stream Grouping

In Storm, developers can specify which task(s) of a downstream bolt should process the tuples emitted by an upstream spout or bolt.

For each bolt you specify which streams it should receive as input. A stream grouping then defines how that stream is partitioned among the bolt's tasks.
The main grouping types are the following seven.

public enum GroupingType {
    SHUFFLE {
        @Override
        public void assign(BoltDeclarer declarer, InputStream stream) {
            declarer.shuffleGrouping(stream.fromComponent, stream.id);
        }
    },
    FIELDS {
        @Override
        public void assign(BoltDeclarer declarer, InputStream stream) {
            declarer.fieldsGrouping(stream.fromComponent, stream.id, new Fields("key"));
        }
    },
    ALL {
        @Override
        public void assign(BoltDeclarer declarer, InputStream stream) {
            declarer.allGrouping(stream.fromComponent, stream.id);
        }
    },
    GLOBAL {
        @Override
        public void assign(BoltDeclarer declarer, InputStream stream) {
            declarer.globalGrouping(stream.fromComponent, stream.id);
        }
    },
    LOCAL_OR_SHUFFLE {
        @Override
        public void assign(BoltDeclarer declarer, InputStream stream) {
            declarer.localOrShuffleGrouping(stream.fromComponent, stream.id);
        }
    },
    NONE {
        @Override
        public void assign(BoltDeclarer declarer, InputStream stream) {
            declarer.noneGrouping(stream.fromComponent, stream.id);
        }
    },
    PARTIAL_KEY {
        @Override
        public void assign(BoltDeclarer declarer, InputStream stream) {
            declarer.partialKeyGrouping(stream.fromComponent, stream.id, new Fields("key"));
        }
    };

    // Each constant above overrides this method to register its grouping on the declarer.
    public abstract void assign(BoltDeclarer declarer, InputStream stream);
}

ShuffleGrouping

Shuffle grouping: the tuples in the stream are distributed randomly, so that each task of the bolt receives roughly the same number of tuples.

public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}

Shuffle grouping is the most commonly used stream grouping. It distributes tuples randomly across the tasks of the bolt, so the tuples emitted by the spout (or by an upstream bolt) are spread over the tasks and each task receives a roughly equal share.

In prepare, the ArrayList<List<Integer>> choices is shuffled once into a random order.
chooseTasks then uses current.incrementAndGet() to get a round-robin effect: when the incremented value reaches size, the counter is reset to 0 and the first element is returned; while it is still below size, the element at the incremented index is returned.
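The shuffle-once, then round-robin behavior can be tried outside Storm with a small stand-alone model. This is only a sketch: ShuffleSketch, its seed parameter, and chooseTask are hypothetical names for illustration, and the class returns a single task ID instead of Storm's List<Integer>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of the logic described above; not Storm's class.
class ShuffleSketch {
    private final List<Integer> choices;
    private final AtomicInteger current = new AtomicInteger(0);

    ShuffleSketch(List<Integer> targetTasks, long seed) {
        choices = new ArrayList<>(targetTasks);
        // One-time random ordering, as in prepare(); the seed is an assumption
        // added here only to make runs repeatable.
        Collections.shuffle(choices, new Random(seed));
    }

    int chooseTask() {
        int size = choices.size();
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0); // wrap around to the first element
                return choices.get(0);
            }
            // rightNow > size: another thread has not reset the counter yet, retry
        }
    }

    public static void main(String[] args) {
        ShuffleSketch grouping = new ShuffleSketch(Arrays.asList(4, 5, 6), 42L);
        for (int i = 0; i < 6; i++) {
            System.out.println("tuple " + i + " -> task " + grouping.chooseTask());
        }
    }
}
```

Because the order is fixed after the initial shuffle, any run of calls whose length is a multiple of the task count hits every task equally often, which is where the even distribution comes from.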

GlobalGrouping

Global grouping: the entire stream is sent to a single task of the bolt. More specifically, it goes to the task with the lowest ID.

public class GlobalGrouping implements CustomStreamGrouping {
    List<Integer> target;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targets) {
        List<Integer> sorted = new ArrayList<>(targets);
        Collections.sort(sorted);
        target = Arrays.asList(sorted.get(0));
    }

    @Override
    public List<Integer> chooseTasks(int i, List<Object> list) {
        return target;
    }
}

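Here a single fixed task is always chosen: the target list is copied and sorted in prepare, and the first element, sorted.get(0), which is the lowest task ID, is returned for every tuple.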
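The selection in prepare can be modeled without any Storm types. The sketch below assumes nothing beyond the listing above; GlobalSketch and pickTarget are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone model of GlobalGrouping.prepare(); not Storm's class.
class GlobalSketch {
    // Returns a one-element list holding the lowest task ID.
    static List<Integer> pickTarget(List<Integer> targets) {
        List<Integer> sorted = new ArrayList<>(targets); // copy, so the caller's list is untouched
        Collections.sort(sorted);
        return Collections.singletonList(sorted.get(0));
    }

    public static void main(String[] args) {
        // Task IDs are listed out of order on purpose.
        System.out.println(pickTarget(Arrays.asList(7, 3, 9)));
    }
}
```

Sorting a copy rather than the incoming list keeps the choice independent of the order in which the task IDs happen to arrive.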

FieldsGrouping

Fields grouping: the stream is partitioned by the specified fields. For example, when grouping by UserId, tuples with the same UserId are always sent to the same task of the bolt, while tuples with different UserIds may be sent to different tasks.

public static class FieldsGrouper implements CustomStreamGrouping {
    private Fields outFields;
    private List<List<Integer>> targetTasks;
    private Fields groupFields;
    private int numTasks;

    public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
        this.outFields = outFields;
        this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<List<Integer>>();
        for (Integer targetTask : targetTasks) {
            this.targetTasks.add(Collections.singletonList(targetTask));
        }
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // Select only the grouping fields, then map them to a task index.
        int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
        return targetTasks.get(targetTaskIndex);
    }
}

The values of the selected grouping fields are passed to TupleUtils.chooseTaskIndex to pick a task index. chooseTaskIndex mainly uses Arrays.deepHashCode to compute a hash of those values and then takes it modulo numTasks.
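The hash-then-modulo step can be sketched with the JDK alone. This is an illustration under assumptions, not Storm's helper: FieldsHashSketch is a hypothetical name, and Math.floorMod is used here so that a negative hash still yields a valid index.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone model of the hash-and-modulo idea; not Storm's TupleUtils.
class FieldsHashSketch {
    static int chooseTaskIndex(List<?> keys, int numTasks) {
        // deepHashCode depends only on the key values, so equal keys give equal hashes.
        int hash = Arrays.deepHashCode(keys.toArray());
        // floorMod keeps the result in [0, numTasks) even when the hash is negative.
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (String userId : Arrays.asList("alice", "bob", "alice")) {
            int index = chooseTaskIndex(Arrays.asList(userId), numTasks);
            System.out.println(userId + " -> task index " + index);
        }
    }
}
```

Since the index is a pure function of the key values and the task count, the same UserId always maps to the same task, while two different UserIds can still collide on one task.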

Copyright: author [null_ wry]. Please include the original link when reprinting, thank you. https://en.javamana.com/2022/02/202202130759078346.html