Storm source code analysis (13)

null_wry · 2022-02-13 07:59:02

storm source code analysis

[email protected]



Generating bolt names.

private static Map<Group, String> genBoltIds(Collection<Group> groups) {
    Map<Group, String> ret = new HashMap<>();
    int ctr = 0;
    for(Group g: groups) {
        if(!isSpoutGroup(g)) {
            List<String> name = new ArrayList<>();
            name.add("b");
            name.add("" + ctr);
            String groupName = getGroupName(g);
            if(groupName!=null && !groupName.isEmpty()) {
                name.add(groupName);
            }
            ret.put(g, Utils.join(name, "-"));
            ctr++;
        }
    }
    return ret;
}

private static String getGroupName(Group g) {
    TreeMap<Integer, String> sortedNames = new TreeMap<>();
    for(Node n: g.nodes) {
        if(n.name!=null) {
            sortedNames.put(n.creationIndex, n.name);
        }
    }
    List<String> names = new ArrayList<>();
    String prevName = null;
    for(String n: sortedNames.values()) {
        if(prevName==null || !n.equals(prevName)) {
            prevName = n;
            names.add(n);
        }
    }
    return Utils.join(names, "-");
}

genBoltIds generates a unique id for each bolt. The id starts with the letter b, followed by a numeric counter, then the group's name, then the next counter and group name, and so on. A group's name is in turn composed of the names of the Nodes the group contains.
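The naming scheme above can be sketched in isolation. This is a minimal, self-contained sketch (the class name `BoltNameSketch` and the use of `String.join` in place of Storm's `Utils.join` are my own stand-ins, not Storm code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BoltNameSketch {

    // Mirrors getGroupName: given node names already in creation order,
    // drop consecutive duplicates, then join the rest with "-".
    static String groupName(List<String> nodeNames) {
        List<String> names = new ArrayList<>();
        String prev = null;
        for (String n : nodeNames) {
            if (prev == null || !n.equals(prev)) {
                prev = n;
                names.add(n);
            }
        }
        return String.join("-", names);
    }

    // Mirrors genBoltIds: "b", a counter, and (if non-empty) the group
    // name, joined with "-".
    static String boltId(int ctr, String groupName) {
        List<String> name = new ArrayList<>();
        name.add("b");
        name.add("" + ctr);
        if (groupName != null && !groupName.isEmpty()) {
            name.add(groupName);
        }
        return String.join("-", name);
    }

    public static void main(String[] args) {
        String gn = groupName(Arrays.asList("map", "map", "filter"));
        System.out.println(boltId(0, gn)); // prints "b-0-map-filter"
    }
}
```

So a group built from nodes named map, map, filter ends up as bolt id b-0-map-filter: consecutive duplicate node names collapse to one.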


protected Stream addSourcedNode(Stream source, Node newNode) {
    return addSourcedNode(Arrays.asList(source), newNode);
}

protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
    registerSourcedNode(sources, newNode);
    return new Stream(this, newNode.name, newNode);
}

Creates a new node and specifies its parent node(s) (there may be more than one). Multiple sources are passed only when multiReduce() is called from merge(); in every other case there is just a single source.

protected void registerSourcedNode(List<Stream> sources, Node newNode) {
    registerNode(newNode);
    int streamIndex = 0;
    for(Stream s: sources) {
        _graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));
        streamIndex++;
    }
}

In addition to registering the new node via registerNode(newNode), an edge is created between each source stream's node and the new node.
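The indexed-edge bookkeeping can be illustrated with a minimal sketch. The `Edge` class here is a hypothetical stand-in for Storm's IndexedEdge, and nodes are plain strings rather than Node objects:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EdgeSketch {
    // Hypothetical stand-in for IndexedEdge: source, target, and the
    // position of the source among the new node's inputs.
    static class Edge {
        final String source, target;
        final int index;
        Edge(String source, String target, int index) {
            this.source = source;
            this.target = target;
            this.index = index;
        }
    }

    // Mirrors registerSourcedNode: each source gets one edge to the new
    // node, tagged with its position in the source list.
    static List<Edge> registerSourcedNode(List<String> sources, String newNode) {
        List<Edge> edges = new ArrayList<>();
        int streamIndex = 0;
        for (String s : sources) {
            edges.add(new Edge(s, newNode, streamIndex));
            streamIndex++;
        }
        return edges;
    }

    public static void main(String[] args) {
        for (Edge e : registerSourcedNode(Arrays.asList("s1", "s2"), "merge")) {
            System.out.println(e.source + " -> " + e.target + " [" + e.index + "]");
        }
    }
}
```

The index preserves the order of the sources, which matters for operations like merge() where input position is significant.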

protected void registerNode(Node n) {
    _graph.addVertex(n);
    if(n.stateInfo!=null) {
        String id = n.stateInfo.id;
        if(!_colocate.containsKey(id)) {
            _colocate.put(id, new ArrayList());
        }
        _colocate.get(id).add(n);
    }
}

Adds a node to the graph. If the node's stateInfo member is non-null, the node is also put into the hash table _colocate, keyed by the state id (StateId). The _colocate variable thus associates all nodes that access the same state, so that they can be placed into the implementation of a single Bolt.
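The colocation bookkeeping reduces to grouping nodes by state id. A minimal sketch (the `Node` class and field names here are simplified stand-ins, not Storm's actual Node):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColocateSketch {
    // Simplified stand-in for Storm's Node: just a name and an optional
    // state id (null when the node touches no state).
    static class Node {
        final String name;
        final String stateId;
        Node(String name, String stateId) {
            this.name = name;
            this.stateId = stateId;
        }
    }

    // Mirrors registerNode's bookkeeping: nodes sharing a state id are
    // collected in the same list so they can later share one bolt.
    static void registerNode(Map<String, List<Node>> colocate, Node n) {
        if (n.stateId != null) {
            colocate.computeIfAbsent(n.stateId, k -> new ArrayList<>()).add(n);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Node>> colocate = new HashMap<>();
        registerNode(colocate, new Node("persist", "counts"));
        registerNode(colocate, new Node("query", "counts"));
        registerNode(colocate, new Node("filter", null));
        System.out.println(colocate.get("counts").size()); // prints 2
    }
}
```

Here "persist" and "query" both access the "counts" state, so they end up in the same list; the stateless "filter" node is only added to the graph, not to the colocation table.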


protected Stream addNode(Node n) {
    registerNode(n);
    return new Stream(this, n.name, n);
}

This method is relatively simple. It is only called from newStream() and newDRPCStream(), i.e. to introduce a new data source, whereas addSourcedNode() above is used to append the next processing node after an existing bolt.



Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, TransactionalSpoutComponent> _spouts = new HashMap();

public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
    Map<String, String> batchGroups = new HashMap();
    batchGroups.put(streamName, batchGroup);
    markBatchGroups(id, batchGroups);

    TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
    _spouts.put(id, c);
    return new SpoutDeclarerImpl(c);
}

private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}

markBatchGroups is called to add the new component to _batchIds; the component is also added to _spouts.
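The core of markBatchGroups is mapping a (component, stream) pair to its batch group. A minimal sketch, where `StreamId` is a hypothetical stand-in for Storm's GlobalStreamId:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class BatchGroupSketch {
    // Hypothetical stand-in for GlobalStreamId: a (componentId, streamId)
    // pair usable as a hash-map key.
    static class StreamId {
        final String component, stream;
        StreamId(String component, String stream) {
            this.component = component;
            this.stream = stream;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof StreamId)) return false;
            StreamId s = (StreamId) o;
            return component.equals(s.component) && stream.equals(s.stream);
        }
        @Override public int hashCode() {
            return Objects.hash(component, stream);
        }
    }

    // Mirrors markBatchGroups: record which batch group each of the
    // component's output streams belongs to.
    static void markBatchGroups(Map<StreamId, String> batchIds,
                                String component,
                                Map<String, String> batchGroups) {
        for (Map.Entry<String, String> e : batchGroups.entrySet()) {
            batchIds.put(new StreamId(component, e.getKey()), e.getValue());
        }
    }

    public static void main(String[] args) {
        Map<StreamId, String> batchIds = new HashMap<>();
        markBatchGroups(batchIds, "spout-0", Map.of("s1", "bg0"));
        System.out.println(batchIds.get(new StreamId("spout-0", "s1"))); // prints bg0
    }
}
```

Later, when bolts are wired up, this table lets the builder look up which batch group any subscribed stream belongs to.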


Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, Component> _bolts = new HashMap();

// map from stream name to batch id
public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
    markBatchGroups(id, batchGroups);

    Component c = new Component(bolt, parallelism, committerBatches);
    _bolts.put(id, c);
    return new BoltDeclarerImpl(c);
}

private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}

Here markBatchGroups is called to add the new component to _batchIds; the component is also added to _bolts. For Trident, such a component is a series of ProcessorNodes (and possibly PartitionNodes).


Map<String, List<String>> batchesToCommitIds = new HashMap<>();
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();

for(String id: _spouts.keySet()) {
    TransactionalSpoutComponent c = _spouts.get(id);
    if(c.spout instanceof IRichSpout) {
        //TODO: wrap this to set the stream name
        builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
    } else {
        String batchGroup = c.batchGroupId;
        if(!batchesToCommitIds.containsKey(batchGroup)) {
            batchesToCommitIds.put(batchGroup, new ArrayList<String>());
        }
        batchesToCommitIds.get(batchGroup).add(c.commitStateId);

        if(!batchesToSpouts.containsKey(batchGroup)) {
            batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
        }
        batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);

        BoltDeclarer scd =
              builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);

        for(Map<String, Object> m: c.componentConfs) {
            scd.addConfigurations(m);
        }

        Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
        specs.put(c.batchGroupId, new CoordSpec());
        BoltDeclarer bd = builder.setBolt(id,
                new TridentBoltExecutor(
                  new TridentSpoutExecutor(
                    c.commitStateId,
                    c.streamName,
                    ((ITridentSpout) c.spout)),
                    batchIdsForSpouts,
                    specs),
                c.parallelism);
        bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
        bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
        if(c.spout instanceof ICommitterTridentSpout) {
            bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
        }
        for(Map<String, Object> m: c.componentConfs) {
            bd.addConfigurations(m);
        }
    }
}

Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

for(String batch: batchesToCommitIds.keySet()) {
    List<String> commitIds = batchesToCommitIds.get(batch);
    SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));

    if(onHeap != null) {
        if(offHeap != null) {
            masterCoord.setMemoryLoad(onHeap, offHeap);
        } else {
            masterCoord.setMemoryLoad(onHeap);
        }
    }
    if(cpuLoad != null) {
        masterCoord.setCPULoad(cpuLoad);
    }
}

for(String id: _bolts.keySet()) {
    Component c = _bolts.get(id);

    Map<String, CoordSpec> specs = new HashMap<>();

    for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
        String batch = batchIdsForBolts.get(s);
        if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
        CoordSpec spec = specs.get(batch);

        CoordType ct;
        if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
            ct = CoordType.single();
        } else {
            ct = CoordType.all();
        }
        spec.coords.put(s.get_componentId(), ct);
    }

    for(String b: c.committerBatches) {
        specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
    }

    BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
    for(Map<String, Object> conf: c.componentConfs) {
        d.addConfigurations(conf);
    }

    for(InputDeclaration inputDecl: c.declarations) {
        inputDecl.declare(d);
    }

    Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
    for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
        for(String comp: entry.getValue()) {
            d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
        }
    }

    for(String b: c.committerBatches) {
        d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
    }
}

return builder.createTopology();

For a bolt (a SubtopologyBolt wrapping ProcessorNodes), a TridentBoltExecutor is registered here. It uses directGrouping on TridentBoltExecutor.COORD_STREAM ($coord-) and, at the same time, allGrouping on MasterBatchCoordinator.COMMIT_STREAM_ID ($commit).
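The interesting decision in the bolt loop above is how the coordination mode for each upstream component is chosen: CoordType.single() when the upstream is a batch-per-tuple spout (one task emits per batch), CoordType.all() otherwise (every upstream task participates in the batch). A minimal sketch of just that decision, using strings in place of the real CoordType values:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CoordSpecSketch {
    // Mirrors the CoordType choice in buildTopology: "single" stands in
    // for CoordType.single(), "all" for CoordType.all().
    static Map<String, String> buildCoords(Set<String> upstreamComponents,
                                           Set<String> batchPerTupleSpouts) {
        Map<String, String> coords = new HashMap<>();
        for (String comp : upstreamComponents) {
            // batch-per-tuple spout: a single task emits each batch;
            // anything else: all upstream tasks contribute to the batch.
            coords.put(comp, batchPerTupleSpouts.contains(comp) ? "single" : "all");
        }
        return coords;
    }

    public static void main(String[] args) {
        Map<String, String> coords = buildCoords(
            new HashSet<>(Arrays.asList("spout-a", "bolt-b")),
            new HashSet<>(Arrays.asList("spout-a")));
        System.out.println(coords); // spout-a -> single, bolt-b -> all
    }
}
```

The resulting per-component coordination map is what TridentBoltExecutor consults on the $coord- stream to decide when a batch has been fully received.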
