Storm source code analysis (12)


Trident in Storm

Two classes are especially important when reading the Trident code: TridentTopology and Stream. They are the entry points for studying the storm-trident source code.

Building a Trident topology happens in two stages:

(1) Building the Trident logical topology. This happens when we call TridentTopology.newStream(…).each().groupBy()… and similar methods, which together construct a logical topology and store it in a DefaultDirectedGraph<Node, IndexedEdge> _graph object.

(2) Converting the Trident logical topology into a Storm topology. This happens in TridentTopology's build() method.
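To make the two stages concrete, here is a minimal toy sketch in plain Java. It is not Storm code: every name in it (TwoPhaseDemo, ToyTopology, ToyStream, addNode) is hypothetical. The fluent calls only record nodes and edges in a graph, in the spirit of stage (1), and a separate build() walks that graph afterwards, in the spirit of stage (2).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the two construction stages (hypothetical names, not Storm's API).
final class TwoPhaseDemo {
    static final class ToyTopology {
        // Stage 1 state: logical graph as an adjacency list, node id -> downstream node ids.
        final Map<String, List<String>> graph = new LinkedHashMap<>();
        private int counter = 0;

        ToyStream newStream(String spoutName) {
            String id = "spout:" + spoutName;
            graph.put(id, new ArrayList<>());
            return new ToyStream(this, id);
        }

        // Adds a node downstream of parent and returns its id; nothing is executed here.
        String addNode(String parent, String op) {
            counter++;
            String id = op + "#" + counter;
            graph.put(id, new ArrayList<>());
            graph.get(parent).add(id);
            return id;
        }

        // Stage 2: walk the recorded graph and produce a toy "physical plan".
        List<String> build() {
            List<String> plan = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : graph.entrySet()) {
                String kind = e.getKey().startsWith("spout:") ? "spout" : "bolt";
                plan.add(kind + " " + e.getKey() + " -> " + e.getValue());
            }
            return plan;
        }
    }

    static final class ToyStream {
        private final ToyTopology topology;
        private final String node;

        ToyStream(ToyTopology topology, String node) {
            this.topology = topology;
            this.node = node;
        }

        ToyStream each(String fn) {
            return new ToyStream(topology, topology.addNode(node, "each(" + fn + ")"));
        }

        ToyStream groupBy(String field) {
            return new ToyStream(topology, topology.addNode(node, "groupBy(" + field + ")"));
        }
    }

    public static void main(String[] args) {
        ToyTopology t = new ToyTopology();
        t.newStream("sentences").each("split").groupBy("word"); // stage 1: record only
        for (String line : t.build()) {                          // stage 2: compile the graph
            System.out.println(line);
        }
    }
}
```

Keeping the recording step separate from the compile step is what lets the real build() look at the whole graph at once before deciding how to split it into spouts and bolts.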

What is Trident?

Trident is a high-level abstraction for real-time computation built on top of Storm. It supports high-throughput data processing as well as low-latency distributed queries and stateful stream processing. Trident is fully fault-tolerant and has exactly-once processing semantics; in effect it is a higher-level wrapper around Storm's transactional topologies, which makes real-time data processing with Trident straightforward. Trident persists its state in some form, and when a failure occurs it restores that state as needed.
Trident encapsulates the transactional classes, so we no longer need to learn the underlying batch-related APIs, which lowers the learning cost.
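The exactly-once guarantee rests on storing the transaction id together with the state. The sketch below is a simplified illustration of that idea, assuming that a batch replayed under the same txid always contains the same tuples and that batches are committed in order. TxStateDemo and its methods are hypothetical, and an in-memory map stands in for the external store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of transactional state; not Trident's State classes.
final class TxStateDemo {
    // A value stored together with the txid of the batch that last updated it.
    static final class TxValue {
        final long txid;
        final long value;
        TxValue(long txid, long value) { this.txid = txid; this.value = value; }
    }

    // In-memory stand-in for an external key/value store.
    final Map<String, TxValue> store = new HashMap<>();

    // Adds delta for key as part of batch txid; a replay of an already-applied txid is skipped.
    void applyBatch(long txid, String key, long delta) {
        TxValue current = store.get(key);
        if (current != null && current.txid == txid) {
            return; // committed before the failure: applying it again would double count
        }
        long base = (current == null) ? 0L : current.value;
        store.put(key, new TxValue(txid, base + delta));
    }

    long get(String key) {
        TxValue v = store.get(key);
        return v == null ? 0L : v.value;
    }

    public static void main(String[] args) {
        TxStateDemo state = new TxStateDemo();
        state.applyBatch(1, "storm", 3);
        state.applyBatch(2, "storm", 2);
        state.applyBatch(2, "storm", 2); // batch 2 replayed after a failure
        System.out.println(state.get("storm")); // 5
    }
}
```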

Trident processes messages in units of batches, that is, several tuples are processed at a time.
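A minimal sketch of the batching idea, using a hypothetical BatchDemo class: a sequence of tuples is cut into batches and each batch gets an increasing transaction id. Real Trident spouts decide batch contents themselves; the fixed batch size here is only an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: cut a tuple sequence into batches tagged with increasing txids.
final class BatchDemo {
    static final class Batch {
        final long txid;
        final List<String> tuples;
        Batch(long txid, List<String> tuples) { this.txid = txid; this.tuples = tuples; }

        @Override
        public String toString() { return "tx" + txid + tuples; }
    }

    static List<Batch> toBatches(List<String> tuples, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Batch> batches = new ArrayList<>();
        long txid = 1;
        for (int i = 0; i < tuples.size(); i += batchSize) {
            int end = Math.min(i + batchSize, tuples.size());
            // Copy the slice so each batch owns its tuples.
            batches.add(new Batch(txid++, new ArrayList<>(tuples.subList(i, end))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        for (Batch b : toBatches(input, 2)) {
            System.out.println(b); // tx1[a, b], then tx2[c, d], then tx3[e]
        }
    }
}
```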

Trident is a higher-level abstraction over Storm and mainly provides three benefits:

(1) Common operations such as count and sum are packaged as methods that can be called directly, without implementing them yourself.
(2) It provides primitives such as groupBy.
(3) It provides transactional support, guaranteeing that each piece of data is processed once and only once (exactly once).
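Benefits (1) and (2) can be illustrated with a hypothetical combiner that mirrors the init/combine/zero shape of Trident's CombinerAggregator. The interface below is a stand-in, not the real one, and takes a plain String instead of a TridentTuple. Grouping one batch by word and combining per-tuple values yields a count, roughly what a groupBy followed by a count aggregation does for a single batch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the init/combine/zero shape of a combiner aggregator.
final class GroupCountDemo {
    interface Combiner<I, T> {
        T init(I input);     // partial result for a single tuple
        T combine(T a, T b); // merge two partial results
        T zero();            // result for an empty group
    }

    // A count: every tuple contributes 1.
    static final Combiner<String, Long> COUNT = new Combiner<String, Long>() {
        @Override public Long init(String input) { return 1L; }
        @Override public Long combine(Long a, Long b) { return a + b; }
        @Override public Long zero() { return 0L; }
    };

    // Groups one batch by the tuple value (the key) and folds each group with the combiner.
    static <T> Map<String, T> groupAndAggregate(List<String> batch, Combiner<String, T> agg) {
        Map<String, T> result = new LinkedHashMap<>();
        for (String key : batch) {
            T soFar = result.getOrDefault(key, agg.zero());
            result.put(key, agg.combine(soFar, agg.init(key)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("storm", "trident", "storm");
        System.out.println(groupAndAggregate(batch, COUNT)); // {storm=2, trident=1}
    }
}
```

Because combine only merges partial results, partial counts can be computed independently and merged later, which is why this shape suits pre-packaged aggregations like count and sum.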



```java
public Stream newStream(String txId, IRichSpout spout) {
    return newStream(txId, new RichSpoutBatchExecutor(spout));
}

public Stream newStream(String txId, IPartitionedTridentSpout spout) {
    return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
    return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, ITridentDataSource dataSource) {
    if (dataSource instanceof IBatchSpout) {
        return newStream(txId, (IBatchSpout) dataSource);
    } else if (dataSource instanceof ITridentSpout) {
        return newStream(txId, (ITridentSpout) dataSource);
    } else if (dataSource instanceof IPartitionedTridentSpout) {
        return newStream(txId, (IPartitionedTridentSpout) dataSource);
    } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
        return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
    } else {
        throw new UnsupportedOperationException("Unsupported stream");
    }
}

public Stream newStream(String txId, IBatchSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

public Stream newStream(String txId, ITridentSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

protected Stream addNode(Node n) {
    registerNode(n);
    return new Stream(this, n.name, n);
}

protected void registerNode(Node n) {
    _graph.addVertex(n);
    if(n.stateInfo!=null) {
        String id = n.stateInfo.id;
        if(!_colocate.containsKey(id)) {
            _colocate.put(id, new ArrayList());
        }
        _colocate.get(id).add(n);
    }
}
```

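The first parameter of newStream is txId and the second is an ITridentDataSource.
ITridentDataSource has several subtypes: IBatchSpout, ITridentSpout, IPartitionedTridentSpout and IOpaquePartitionedTridentSpout. The IRichSpout, IPartitionedTridentSpout and IOpaquePartitionedTridentSpout overloads first wrap the spout in an executor class (RichSpoutBatchExecutor, PartitionedTridentSpoutExecutor, OpaquePartitionedTridentSpoutExecutor) and then call newStream again with the wrapped object.

In the end a SpoutNode is created and registerNode adds it to _graph (if the node's stateInfo is not null it is also added to _colocate, but for a SpoutNode stateInfo is null). Note that the SpoutType of these SpoutNodes is SpoutNode.SpoutType.BATCH.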
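The registration step can be sketched as follows. ToyNode and RegisterDemo are hypothetical simplifications: a plain set stands in for the DefaultDirectedGraph, and a single state id string stands in for stateInfo. computeIfAbsent replaces the containsKey/put pair of the original but has the same effect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of registerNode; ToyNode is not Storm's Node class.
final class RegisterDemo {
    static final class ToyNode {
        final String name;
        final String stateId; // null for nodes without state, such as spout nodes
        ToyNode(String name, String stateId) { this.name = name; this.stateId = stateId; }
    }

    final Set<ToyNode> graphVertices = new LinkedHashSet<>();
    final Map<String, List<ToyNode>> colocate = new HashMap<>();

    void registerNode(ToyNode n) {
        graphVertices.add(n); // every node becomes a graph vertex
        if (n.stateId != null) {
            // Stateful nodes that share a state id are collected under that id.
            colocate.computeIfAbsent(n.stateId, k -> new ArrayList<>()).add(n);
        }
    }

    public static void main(String[] args) {
        RegisterDemo t = new RegisterDemo();
        t.registerNode(new ToyNode("spout", null));
        t.registerNode(new ToyNode("persist", "state1"));
        t.registerNode(new ToyNode("query", "state1"));
        System.out.println(t.graphVertices.size() + " vertices, colocated under state1: "
                + t.colocate.get("state1").size()); // 3 vertices, colocated under state1: 2
    }
}
```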


```java
public StormTopology build() {
    DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
    List<SpoutNode> spoutNodes = new ArrayList<>();
    // can be regular nodes (static state) or processor nodes
    Set<Node> boltNodes = new LinkedHashSet<>();
    for(Node n: graph.vertexSet()) {
        if(n instanceof SpoutNode) {
            spoutNodes.add((SpoutNode) n);
        } else if(!(n instanceof PartitionNode)) {
            boltNodes.add(n);
        }
    }
    Set<Group> initialGroups = new LinkedHashSet<>();
    for(Node n: boltNodes) {
        initialGroups.add(new Group(graph, n));
    }
    GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
    Collection<Group> mergedGroups = grouper.getAllGroups();
    // add identity partitions between groups
    for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
        if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
            Group g1 = grouper.nodeGroup(e.source);
            Group g2 = grouper.nodeGroup(e.target);
            // g1 being null means the source is a spout node
            if(g1==null && !(e.source instanceof SpoutNode))
                throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
            if(g1==null || !g1.equals(g2)) {
                graph.removeEdge(e);
                PartitionNode pNode = makeIdentityPartition(e.source);
                graph.addVertex(pNode);
                graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));
            }
        }
    }
    // add in spouts as groups so we can get parallelisms
    for(Node n: spoutNodes) {
        grouper.addGroup(new Group(graph, n));
    }
    mergedGroups = grouper.getAllGroups();
    Map<Node, String> batchGroupMap = new HashMap<>();
    List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
    for(int i=0; i<connectedComponents.size(); i++) {
        String groupId = "bg" + i;
        for(Node n: connectedComponents.get(i)) {
            batchGroupMap.put(n, groupId);
        }
    }
    Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
    TridentTopologyBuilder builder = new TridentTopologyBuilder();
    Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
    Map<Group, String> boltIds = genBoltIds(mergedGroups);
    for(SpoutNode sn: spoutNodes) {
        Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
        Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
        Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
        SpoutDeclarer spoutDeclarer = null;
        if(sn.type == SpoutNode.SpoutType.DRPC) {
            spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                    (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
        } else {
            ITridentSpout s;
            if(sn.spout instanceof IBatchSpout) {
                s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
            } else if(sn.spout instanceof ITridentSpout) {
                s = (ITridentSpout) sn.spout;
            } else {
                throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
            }
            spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
        }
        if(onHeap != null) {
            if(offHeap != null) {
                spoutDeclarer.setMemoryLoad(onHeap, offHeap);
            }
            else {
                spoutDeclarer.setMemoryLoad(onHeap);
            }
        }
        if(cpuLoad != null) {
            spoutDeclarer.setCPULoad(cpuLoad);
        }
    }
    for(Group g: mergedGroups) {
        if(!isSpoutGroup(g)) {
            Integer p = parallelisms.get(g);
            Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
            Map<String, Number> groupRes = g.getResources(_resourceDefaults);
            Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
            BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                    committerBatches(g, batchGroupMap), streamToGroup);
            if(onHeap != null) {
                if(offHeap != null) {
                    d.setMemoryLoad(onHeap, offHeap);
                }
                else {
                    d.setMemoryLoad(onHeap);
                }
            }
            if(cpuLoad != null) {
                d.setCPULoad(cpuLoad);
            }
            Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
            for(PartitionNode n: inputs) {
                Node parent = TridentUtils.getParent(graph, n);
                String componentId = parent instanceof SpoutNode ?
                        spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
            }
        }
    }
    HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
    return builder.buildTopology(combinedMasterCoordResources);
}
```
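The "add identity partitions between groups" loop in build() can be illustrated with a simplified, hypothetical sketch. Nodes are plain strings, groups are a plain map, and an edge is rewritten as source -> partition -> target whenever its endpoints fall in different groups or the source has no group (a spout at this stage of planning). The real code uses makeIdentityPartition and IndexedEdge and also carries edge indices, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of inserting partition nodes on edges that cross group boundaries.
final class PartitionDemo {
    // Returns a new edge list in which every cross-group edge goes through a fresh partition node.
    static List<String[]> insertPartitions(List<String[]> edges, Map<String, String> groupOf) {
        List<String[]> result = new ArrayList<>();
        int counter = 0;
        for (String[] e : edges) {
            String g1 = groupOf.get(e[0]);
            String g2 = groupOf.get(e[1]);
            if (g1 == null || !g1.equals(g2)) {
                String p = "partition" + (++counter);
                result.add(new String[]{e[0], p});
                result.add(new String[]{p, e[1]});
            } else {
                result.add(e); // same group: both nodes run inside one bolt, no repartitioning
            }
        }
        return result;
    }

    static String show(List<String[]> edges) {
        StringJoiner sj = new StringJoiner(", ");
        for (String[] e : edges) {
            sj.add(e[0] + "->" + e[1]);
        }
        return sj.toString();
    }

    public static void main(String[] args) {
        Map<String, String> groupOf = new HashMap<>();
        groupOf.put("each", "g1");
        groupOf.put("project", "g1");
        groupOf.put("persist", "g2");
        // "spout" has no group, like a spout node at this point in build().
        List<String[]> edges = Arrays.asList(
                new String[]{"spout", "each"},
                new String[]{"each", "project"},
                new String[]{"project", "persist"});
        System.out.println(show(insertPartitions(edges, groupOf)));
        // spout->partition1, partition1->each, each->project, project->partition2, partition2->persist
    }
}
```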

Here a TridentTopologyBuilder is created, and for each node in spoutNodes the TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) method is called to add the spout.
A spout of type IBatchSpout is first wrapped into an ITridentSpout by BatchSpoutExecutor.
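Wrapping an IBatchSpout with BatchSpoutExecutor is an instance of the adapter pattern. The sketch below shows that pattern with hypothetical interfaces (SimpleBatchSpout, TxSpout) that only loosely resemble IBatchSpout and ITridentSpout; they are not Storm's signatures. In this sketch, emitting a batch and acknowledging its success are both delegated to the wrapped spout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces, loosely modeled on the spout types above; not Storm's API.
final class AdapterDemo {
    // The simple, user-facing contract: emit a batch for an id, then get acked.
    interface SimpleBatchSpout {
        void emitBatch(long batchId, List<String> collector);
        void ack(long batchId);
    }

    // The contract the planner works with.
    interface TxSpout {
        void emitBatch(long txid, List<String> collector);
        void success(long txid);
    }

    // Adapter: exposes a SimpleBatchSpout as a TxSpout by delegating every call.
    static final class BatchSpoutAdapter implements TxSpout {
        private final SimpleBatchSpout spout;
        BatchSpoutAdapter(SimpleBatchSpout spout) { this.spout = spout; }

        @Override public void emitBatch(long txid, List<String> collector) { spout.emitBatch(txid, collector); }
        @Override public void success(long txid) { spout.ack(txid); }
    }

    public static void main(String[] args) {
        List<Long> acked = new ArrayList<>();
        SimpleBatchSpout numbers = new SimpleBatchSpout() {
            @Override public void emitBatch(long id, List<String> out) { out.add("tuple-" + id); }
            @Override public void ack(long id) { acked.add(id); }
        };
        TxSpout wrapped = new BatchSpoutAdapter(numbers);
        List<String> out = new ArrayList<>();
        wrapped.emitBatch(7, out);
        wrapped.success(7);
        System.out.println(out + " acked=" + acked); // [tuple-7] acked=[7]
    }
}
```

The planner then only has to handle one spout contract, while users can keep implementing the simpler one.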

Here streamName is the node's streamId, generated by UniqueIdGen.getUniqueStreamId: it starts with "s" followed by the value of _streamCounter, so a stream id looks like s1. txStateId is the txId supplied by the user. batchGroup starts with "bg" followed by the index of the node's set in connectedComponents, for example bg0. parallelism is the value the user set when building the topology.
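The naming scheme can be sketched in a few lines. IdGenDemo is hypothetical: getUniqueStreamId reproduces the "s" + counter scheme described above, and batchGroups labels each weakly connected component (edge direction ignored, as ConnectivityInspector does for a directed graph) with "bg" + index. Numbering components in input-node order is an assumption of this sketch; JGraphT may number them differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of stream ids and batch-group ids; not Storm's UniqueIdGen.
final class IdGenDemo {
    private int streamCounter = 0;

    // Increment first, so the first id is "s1".
    String getUniqueStreamId() {
        streamCounter++;
        return "s" + streamCounter;
    }

    // edges are directed pairs {from, to} over the given nodes; direction is ignored.
    static Map<String, String> batchGroups(List<String> nodes, List<String[]> edges) {
        Map<String, List<String>> adj = new HashMap<>();
        for (String n : nodes) adj.put(n, new ArrayList<>());
        for (String[] e : edges) {
            adj.get(e[0]).add(e[1]);
            adj.get(e[1]).add(e[0]);
        }
        Map<String, String> groupOf = new LinkedHashMap<>();
        int index = 0;
        for (String start : nodes) {
            if (groupOf.containsKey(start)) continue;
            String groupId = "bg" + index++;
            // Breadth-first search labels every node reachable from start.
            Deque<String> queue = new ArrayDeque<>();
            queue.add(start);
            groupOf.put(start, groupId);
            while (!queue.isEmpty()) {
                for (String next : adj.get(queue.poll())) {
                    if (!groupOf.containsKey(next)) {
                        groupOf.put(next, groupId);
                        queue.add(next);
                    }
                }
            }
        }
        return groupOf;
    }

    public static void main(String[] args) {
        IdGenDemo gen = new IdGenDemo();
        System.out.println(gen.getUniqueStreamId() + " " + gen.getUniqueStreamId()); // s1 s2
        List<String> nodes = Arrays.asList("spoutA", "each", "spoutB", "persist");
        List<String[]> edges = Arrays.asList(new String[]{"spoutA", "each"}, new String[]{"spoutB", "persist"});
        System.out.println(batchGroups(nodes, edges)); // {spoutA=bg0, each=bg0, spoutB=bg1, persist=bg1}
    }
}
```

Two independent topologies defined on the same TridentTopology therefore end up in different batch groups.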

After the spouts are set, their resource settings, such as memoryLoad and cpuLoad, are applied. Next the bolts are set, each as a SubtopologyBolt, followed by the bolts' resource settings.
Finally, TridentTopologyBuilder.buildTopology is called.
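The resource handling mentioned above comes down to starting from defaults and applying only the settings that are present. In this hypothetical sketch, the map keys and the Declarer class are stand-ins rather than Storm's Config constants and declarer interfaces, and the merge of per-component overrides over defaults is an illustrative assumption. The memory overload is chosen by whether an off-heap value exists, mirroring the nested if in build().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the resource step; keys and Declarer are stand-ins, not Storm's API.
final class ResourceDemo {
    static final String ON_HEAP = "onheap.mb";
    static final String OFF_HEAP = "offheap.mb";
    static final String CPU = "cpu.pcore.percent";

    // Records which settings were applied, standing in for a component declarer.
    static final class Declarer {
        final List<String> calls = new ArrayList<>();
        void setMemoryLoad(Number onHeap) { calls.add("memory(" + onHeap + ")"); }
        void setMemoryLoad(Number onHeap, Number offHeap) { calls.add("memory(" + onHeap + "," + offHeap + ")"); }
        void setCPULoad(Number cpu) { calls.add("cpu(" + cpu + ")"); }
    }

    // Copy the defaults, then let per-component overrides win.
    static Map<String, Number> merge(Map<String, Number> defaults, Map<String, Number> overrides) {
        Map<String, Number> res = new HashMap<>(defaults);
        res.putAll(overrides);
        return res;
    }

    // Apply only settings that are present; pick the memory overload by whether off-heap is set.
    static void apply(Map<String, Number> res, Declarer d) {
        Number onHeap = res.get(ON_HEAP);
        Number offHeap = res.get(OFF_HEAP);
        Number cpu = res.get(CPU);
        if (onHeap != null) {
            if (offHeap != null) d.setMemoryLoad(onHeap, offHeap);
            else d.setMemoryLoad(onHeap);
        }
        if (cpu != null) d.setCPULoad(cpu);
    }

    public static void main(String[] args) {
        Map<String, Number> defaults = new HashMap<>();
        defaults.put(ON_HEAP, 128.0);
        defaults.put(CPU, 10.0);
        Map<String, Number> overrides = new HashMap<>();
        overrides.put(ON_HEAP, 256.0);
        Declarer d = new Declarer();
        apply(merge(defaults, overrides), d);
        System.out.println(d.calls); // [memory(256.0), cpu(10.0)]
    }
}
```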


