Storm source code analysis (12)

null_wry · 2022-02-13 07:59:03

storm source code analysis


trident in storm

Two classes are central to reading the trident code: TridentTopology and Stream. They are a good entry point for studying the storm-trident source.

The construction of a trident topology has two phases:

One: building the logical trident topology. This is what happens when we call TridentTopology.newStream(…).each().groupBy() and so on. This process produces a logical topology, which is saved in a DefaultDirectedGraph<Node, IndexedEdge> object named _graph.

Two: converting the logical trident topology into a storm topology. This happens in TridentTopology's build() method.
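The two-phase idea can be illustrated with a tiny self-contained sketch (hypothetical `LogicalGraph`/`LogicalStream` classes, not Storm's own): a fluent API that only records nodes and edges into a graph, and a separate `build()` step that later walks that graph.

```java
import java.util.*;

// Phase one records a logical graph via a fluent API; phase two walks it.
// These classes are a hypothetical miniature of TridentTopology/Stream.
class LogicalGraph {
    final List<String> nodes = new ArrayList<>();
    final List<String[]> edges = new ArrayList<>();   // each entry is {from, to}

    LogicalStream newStream(String txId) {
        nodes.add(txId);
        return new LogicalStream(this, txId);
    }

    // "build" phase: here we merely render the recorded edges
    String build() {
        StringBuilder sb = new StringBuilder();
        for (String[] e : edges) sb.append(e[0]).append(" -> ").append(e[1]).append('\n');
        return sb.toString();
    }
}

class LogicalStream {
    final LogicalGraph g;
    final String name;
    LogicalStream(LogicalGraph g, String name) { this.g = g; this.name = name; }

    // each fluent call appends a node plus an edge, like TridentTopology does
    LogicalStream each(String op) { return chain("each:" + op); }
    LogicalStream groupBy(String field) { return chain("groupBy:" + field); }

    private LogicalStream chain(String label) {
        g.nodes.add(label);
        g.edges.add(new String[]{name, label});
        return new LogicalStream(g, label);
    }
}
```

Calling `g.newStream("spout1").each("split").groupBy("word")` records two edges but executes nothing; all the real work is deferred to `build()`, which is exactly the split described above.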

What is trident?

Trident is a high-level abstraction built on top of storm, aimed at realtime computation. It provides high-throughput data processing while also offering low-latency distributed queries and stateful stream processing. Trident is fully fault-tolerant and has exactly-once processing semantics; in effect it is a high-level wrapper around transactional topologies, which makes real-time data processing with Trident straightforward. Trident keeps state in some form and, when something goes wrong, recovers that state as needed.

Because Trident encapsulates the transactional machinery, we no longer need to learn the batch-related low-level APIs, which lowers the learning curve.

trident processes messages in units of batches, i.e. multiple tuples are processed at a time.
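The batch idea can be sketched with a small self-contained helper (hypothetical code, not from Storm) that groups a flat tuple list into fixed-size batches, which is roughly what a batch spout emitter does per transaction:

```java
import java.util.*;

public class Batcher {
    // Split a flat list of tuples into batches of at most batchSize elements.
    // Trident's spout machinery does something analogous per transaction id.
    public static <T> List<List<T>> toBatches(List<T> tuples, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i += batchSize) {
            batches.add(new ArrayList<>(tuples.subList(i, Math.min(i + batchSize, tuples.size()))));
        }
        return batches;
    }
}
```

Downstream operations then see one whole batch at a time instead of individual tuples, which is what enables per-batch transactional bookkeeping.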

trident is a higher-level abstraction over storm and mainly provides three benefits:

(1) Common operations such as count and sum are encapsulated as methods, so they can be called directly instead of being implemented by hand.
(2) It provides primitives such as groupBy.
(3) It provides transaction support, guaranteeing that data is processed once and only once (Exactly Once).
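The built-in count/sum of point (1) are small aggregator objects. Below is a self-contained sketch of the combiner pattern, modeled loosely on (but not identical to) Trident's CombinerAggregator interface, with a Count implementation:

```java
import java.util.List;

// Combiner-style aggregation, modeled loosely on Trident's CombinerAggregator:
// init() maps one tuple to a partial value, combine() merges two partials,
// zero() is the identity element.
interface Combiner<T, R> {
    R init(T tuple);
    R combine(R a, R b);
    R zero();
}

// The equivalent of Trident's built-in Count: every tuple contributes 1.
class Count<T> implements Combiner<T, Long> {
    public Long init(T tuple) { return 1L; }
    public Long combine(Long a, Long b) { return a + b; }
    public Long zero() { return 0L; }
}

class BatchAggregate {
    // Fold one batch of tuples down to a single aggregate value.
    static <T, R> R aggregate(List<T> batch, Combiner<T, R> c) {
        R acc = c.zero();
        for (T t : batch) acc = c.combine(acc, c.init(t));
        return acc;
    }
}
```

The same shape supports sum, min, max, etc. by swapping the three methods, which is why these operations can be offered as ready-made library calls.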



public Stream newStream(String txId, IRichSpout spout) {
    return newStream(txId, new RichSpoutBatchExecutor(spout));
}

public Stream newStream(String txId, IPartitionedTridentSpout spout) {
    return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
    return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}

public Stream newStream(String txId, ITridentDataSource dataSource) {
    if (dataSource instanceof IBatchSpout) {
        return newStream(txId, (IBatchSpout) dataSource);
    } else if (dataSource instanceof ITridentSpout) {
        return newStream(txId, (ITridentSpout) dataSource);
    } else if (dataSource instanceof IPartitionedTridentSpout) {
        return newStream(txId, (IPartitionedTridentSpout) dataSource);
    } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
        return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
    } else {
        throw new UnsupportedOperationException("Unsupported stream");
    }
}

public Stream newStream(String txId, IBatchSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

public Stream newStream(String txId, ITridentSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

protected Stream addNode(Node n) {
    registerNode(n);
    return new Stream(this, n.name, n);
}

protected void registerNode(Node n) {
    _graph.addVertex(n);
    if (n.stateInfo != null) {
        String id = n.stateInfo.id;
        if (!_colocate.containsKey(id)) {
            _colocate.put(id, new ArrayList());
        }
        _colocate.get(id).add(n);
    }
}

newStream takes a txId as its first parameter and an ITridentDataSource as its second.
ITridentDataSource comes in several flavors: IBatchSpout, ITridentSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.

Ultimately a SpoutNode is created, and registerNode adds it to _graph (if the node's stateInfo is not null it is also added to _colocate, but for a SpoutNode that value is null). Note that the SpoutNode's SpoutType is SpoutNode.SpoutType.BATCH.
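The _colocate bookkeeping is essentially a multimap keyed by state id, populated only for stateful nodes. A self-contained sketch of that pattern (hypothetical `Node` class, not Storm's):

```java
import java.util.*;

// Hypothetical miniature of Trident's Node: stateId is null for stateless
// nodes such as spout nodes.
class Node {
    final String name;
    final String stateId;
    Node(String name, String stateId) { this.name = name; this.stateId = stateId; }
}

class Registry {
    final Set<Node> graph = new LinkedHashSet<>();
    final Map<String, List<Node>> colocate = new HashMap<>();

    // Mirrors registerNode: always add to the graph; index by state id
    // only when the node actually carries state.
    void registerNode(Node n) {
        graph.add(n);
        if (n.stateId != null) {
            colocate.computeIfAbsent(n.stateId, k -> new ArrayList<>()).add(n);
        }
    }
}
```

Nodes sharing a state id end up in the same colocate list, which is what later lets the planner place them together.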


public StormTopology build() {
    DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();

    List<SpoutNode> spoutNodes = new ArrayList<>();

    // can be regular nodes (static state) or processor nodes
    Set<Node> boltNodes = new LinkedHashSet<>();
    for (Node n : graph.vertexSet()) {
        if (n instanceof SpoutNode) {
            spoutNodes.add((SpoutNode) n);
        } else if (!(n instanceof PartitionNode)) {
            boltNodes.add(n);
        }
    }

    Set<Group> initialGroups = new LinkedHashSet<>();
    for (Node n : boltNodes) {
        initialGroups.add(new Group(graph, n));
    }
    GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
    grouper.mergeFully();
    Collection<Group> mergedGroups = grouper.getAllGroups();

    // add identity partitions between groups
    for (IndexedEdge<Node> e : new HashSet<>(graph.edgeSet())) {
        if (!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
            Group g1 = grouper.nodeGroup(e.source);
            Group g2 = grouper.nodeGroup(e.target);
            // g1 being null means the source is a spout node
            if (g1 == null && !(e.source instanceof SpoutNode))
                throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
            if (g1 == null || !g1.equals(g2)) {
                graph.removeEdge(e);
                PartitionNode pNode = makeIdentityPartition(e.source);
                graph.addVertex(pNode);
                graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));
            }
        }
    }

    // add in spouts as groups so we can get parallelisms
    for (Node n : spoutNodes) {
        grouper.addGroup(new Group(graph, n));
    }
    mergedGroups = grouper.getAllGroups();

    Map<Node, String> batchGroupMap = new HashMap<>();
    List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
    for (int i = 0; i < connectedComponents.size(); i++) {
        String groupId = "bg" + i;
        for (Node n : connectedComponents.get(i)) {
            batchGroupMap.put(n, groupId);
        }
    }

    Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

    TridentTopologyBuilder builder = new TridentTopologyBuilder();

    Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
    Map<Group, String> boltIds = genBoltIds(mergedGroups);

    for (SpoutNode sn : spoutNodes) {
        Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));

        Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
        Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

        SpoutDeclarer spoutDeclarer = null;
        if (sn.type == SpoutNode.SpoutType.DRPC) {
            spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                    (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
        } else {
            ITridentSpout s;
            if (sn.spout instanceof IBatchSpout) {
                s = new BatchSpoutExecutor((IBatchSpout) sn.spout);
            } else if (sn.spout instanceof ITridentSpout) {
                s = (ITridentSpout) sn.spout;
            } else {
                throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
            }
            spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
        }

        if (onHeap != null) {
            if (offHeap != null) {
                spoutDeclarer.setMemoryLoad(onHeap, offHeap);
            } else {
                spoutDeclarer.setMemoryLoad(onHeap);
            }
        }
        if (cpuLoad != null) {
            spoutDeclarer.setCPULoad(cpuLoad);
        }
    }

    for (Group g : mergedGroups) {
        if (!isSpoutGroup(g)) {
            Integer p = parallelisms.get(g);
            Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
            Map<String, Number> groupRes = g.getResources(_resourceDefaults);
            Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

            BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                    committerBatches(g, batchGroupMap), streamToGroup);

            if (onHeap != null) {
                if (offHeap != null) {
                    d.setMemoryLoad(onHeap, offHeap);
                } else {
                    d.setMemoryLoad(onHeap);
                }
            }
            if (cpuLoad != null) {
                d.setCPULoad(cpuLoad);
            }

            Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
            for (PartitionNode n : inputs) {
                Node parent = TridentUtils.getParent(graph, n);
                String componentId = parent instanceof SpoutNode
                        ? spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
            }
        }
    }

    HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
    return builder.buildTopology(combinedMasterCoordResources);
}

So build() creates a TridentTopologyBuilder and then, for each spout node, calls TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) to add the spout.
A spout of type IBatchSpout is wrapped into an ITridentSpout via BatchSpoutExecutor.

Here streamName is the streamId generated by UniqueIdGen.getUniqueStreamId: an "s" followed by the current _streamCounter value, e.g. 1, giving s1. txStateId is the user-supplied txId. batchGroup is "bg" followed by the index of the element in connectedComponents, e.g. 0, giving bg0. The parallelism parameter is whatever the user set when building the topology.
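These naming conventions can be captured in a small sketch (a hypothetical `IdGen` class mirroring the "s" + counter and "bg" + component-index rules described above, not Storm's actual UniqueIdGen):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical id generator mirroring the conventions described above:
// stream ids are "s" + running counter, batch groups are "bg" + component index.
class IdGen {
    private final AtomicInteger streamCounter = new AtomicInteger(0);

    String nextStreamId() { return "s" + streamCounter.incrementAndGet(); }

    static String batchGroupId(int componentIndex) { return "bg" + componentIndex; }
}
```

So the first stream created gets id s1, the second s2, and every node in connected component 0 shares batch group bg0.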

After a spout is set, its resource allocation (e.g. memoryLoad, cpuLoad) is configured. Then the bolts are set, using SubtopologyBolt here, and each bolt's resource allocation is configured as well.
Finally, TridentTopologyBuilder.buildTopology is called.
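The batchGroupMap step in build() (every node of a connected component gets the same "bg" id) can be sketched with a plain traversal over an undirected adjacency list, a self-contained stand-in for what JGraphT's ConnectivityInspector computes:

```java
import java.util.*;

class BatchGroups {
    // Assign every node of a connected component the same "bg" + i label,
    // as build() does via ConnectivityInspector.connectedSets().
    static Map<String, String> assign(Map<String, List<String>> undirectedAdj) {
        Map<String, String> groupOf = new HashMap<>();
        int i = 0;
        for (String start : undirectedAdj.keySet()) {
            if (groupOf.containsKey(start)) continue;   // already labeled
            String id = "bg" + i++;
            Deque<String> stack = new ArrayDeque<>();
            stack.push(start);
            while (!stack.isEmpty()) {
                String n = stack.pop();
                if (groupOf.putIfAbsent(n, id) != null) continue;   // visited
                for (String m : undirectedAdj.getOrDefault(n, Collections.emptyList())) {
                    stack.push(m);
                }
            }
        }
        return groupOf;
    }
}
```

Trident uses these labels so that all operations fed (directly or transitively) by the same spouts coordinate their batches under one batch-group id.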
