Storm source code analysis (V)

null_ wry 2022-02-13 07:59:17



storm-kafka source code analysis (1)

storm-kafka is the Storm connector for reading messages from Kafka.

I. org.apache.storm.kafka

The org.apache.storm.kafka package contains some common modules, as well as the spout implementation for storm-core.

Here we focus on the classes that read from and write to ZooKeeper (zk).

(1) ZkState

ZkState keeps the processing state of each partition by reading and writing zk. For each partition, the record stored in zk holds the topology ID, the topology name, the offset this partition has processed, the partition number, the host and port of the Kafka broker that holds the partition, and the topic name.
ZkState itself mainly provides the methods for reading and writing this zk data, such as readJSON and writeJSON.
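To make the record layout concrete, here is a minimal sketch of what one partition's state document might look like, assuming the nested layout that storm-kafka's PartitionManager passes to writeJSON (topology id/name, offset, partition, broker host/port, topic). PartitionStateRecord and its hand-rolled serializer are hypothetical stand-ins for ZkState.writeJSON and its JSON library, and the values in main are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartitionStateRecord {

    // Serialize a Map of strings, numbers and nested maps as JSON.
    // Hypothetical helper: no string escaping, no arrays; enough for this record only.
    static String toJson(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : map.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) v;
                sb.append(toJson(nested));
            } else if (v instanceof String) {
                sb.append('"').append(v).append('"');
            } else {
                sb.append(v); // numbers are written as-is
            }
        }
        return sb.append('}').toString();
    }

    // Build the per-partition record with the fields listed in the text (assumed nesting)
    static Map<String, Object> buildRecord(String topologyId, String topologyName, long offset,
                                           int partition, String host, int port, String topic) {
        Map<String, Object> topology = new LinkedHashMap<>();
        topology.put("id", topologyId);
        topology.put("name", topologyName);
        Map<String, Object> broker = new LinkedHashMap<>();
        broker.put("host", host);
        broker.put("port", port);
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("topology", topology);
        record.put("offset", offset);
        record.put("partition", partition);
        record.put("broker", broker);
        record.put("topic", topic);
        return record;
    }

    public static void main(String[] args) {
        System.out.println(toJson(buildRecord("topo-1", "demo", 42L, 0, "kafka1", 9092, "ma30")));
    }
}
```

LinkedHashMap keeps the fields in insertion order, so the printed document is stable and easy to compare against what is actually stored in zk.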

Where this information lives in zk is determined by the 3rd and 4th arguments of the KafkaConfig object (here a SpoutConfig). With the configuration below, the data is written under /kafka2/ljhtest. The 4th argument must therefore be unique; otherwise different topologies will conflict.

 SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/kafka2", "ljhtest");
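Why the 4th argument must be unique can be sketched with a small path helper. It assumes the per-partition node is named partition_<n>, as returned by storm-kafka's Partition.getId() in common versions (check your version; newer releases can optionally include the topic name in the path). SpoutZkPaths and partitionStatePath are hypothetical names.

```java
final class SpoutZkPaths {

    // Hypothetical helper: zkRoot (3rd SpoutConfig argument) + id (4th argument) + per-partition node
    static String partitionStatePath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Two topologies configured with the same zkRoot and id end up on the same zk node,
        // so each would overwrite the other's committed offsets.
        String topologyA = partitionStatePath("/kafka2", "ljhtest", 0);
        String topologyB = partitionStatePath("/kafka2", "ljhtest", 0);
        System.out.println(topologyA + " collides with " + topologyB + ": " + topologyA.equals(topologyB));
    }
}
```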

(2) trident.ZkBrokerReader

Most of the functionality of trident.ZkBrokerReader is delegated to DynamicBrokersReader, which also handles the connection to zk. ZkBrokerReader provides the following methods:

1. getBrokerForTopic(): returns the partition information of a given topic as a GlobalPartitionInformation object, because several partitions may be read at the same time.
2. getAllBrokers(): reads the partitions of all topics without specifying one. Because topics can be matched with a regular expression, there may be several topics.
3. refresh(): a private method that refreshes the partition information periodically; it is called from the 2 methods above.
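getAllBrokers() can return several topics because the topic is given as a pattern. A minimal sketch of that filtering step follows, assuming the pattern is a Java regular expression that must match the whole topic name; TopicFilter and the sample topic names are hypothetical, and the real reader lists topics from zk rather than from an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class TopicFilter {

    // Return every topic whose full name matches the regular expression
    static List<String> matchingTopics(List<String> allTopics, String topicRegex) {
        Pattern pattern = Pattern.compile(topicRegex);
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("ma30", "ma31", "logs");
        System.out.println(matchingTopics(topics, "ma3\\d")); // prints [ma30, ma31]
    }
}
```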

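// IBrokerReader, DynamicBrokersReader, ZkHosts and GlobalPartitionInformation come from
// the storm-kafka module; their package depends on the Storm version.
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkBrokerReader implements IBrokerReader {

    public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);

    GlobalPartitionInformation cachedBrokers;
    DynamicBrokersReader reader;
    long lastRefreshTimeMs;
    long refreshMillis;

    /**
     * @param conf  the Storm configuration
     * @param topic the topic this ZkBrokerReader reads broker information for
     * @param hosts the ZooKeeper connection string, broker path and refresh interval
     */
    public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
        reader = new DynamicBrokersReader(conf, hosts.brokerZkStr,
                hosts.brokerZkPath, topic);
        cachedBrokers = reader.getBrokerInfo();
        lastRefreshTimeMs = System.currentTimeMillis();
        refreshMillis = hosts.refreshFreqSecs * 1000L;
    }

    @Override
    public GlobalPartitionInformation getCurrentBrokers() {
        long currTime = System.currentTimeMillis();
        // Simple time-based cache: reload the broker list from zk only after refreshMillis has elapsed
        if (currTime > lastRefreshTimeMs + refreshMillis) {
            LOG.info("brokers need refreshing because " + refreshMillis
                    + "ms have expired");
            cachedBrokers = reader.getBrokerInfo();
            lastRefreshTimeMs = currTime;
        }
        return cachedBrokers;
    }

    @Override
    public void close() {
        // Close the underlying DynamicBrokersReader and its zk connection
        reader.close();
    }
}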
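The refresh logic in getCurrentBrokers() is a plain time-based cache. The sketch below isolates it so it can run without zk: the loader stands in for reader.getBrokerInfo(), and the clock is injected so the expiry check can be tested deterministically. RefreshingCache is a hypothetical name, not a storm-kafka class.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class RefreshingCache<T> {

    private final Supplier<T> loader;   // stands in for reader.getBrokerInfo()
    private final LongSupplier clock;   // stands in for System.currentTimeMillis()
    private final long refreshMillis;
    private T cached;
    private long lastRefreshTimeMs;

    RefreshingCache(Supplier<T> loader, LongSupplier clock, long refreshMillis) {
        this.loader = loader;
        this.clock = clock;
        this.refreshMillis = refreshMillis;
        this.cached = loader.get();              // load eagerly, like the constructor above
        this.lastRefreshTimeMs = clock.getAsLong();
    }

    T get() {
        long now = clock.getAsLong();
        // Same condition as getCurrentBrokers(): reload only once the window has strictly passed
        if (now > lastRefreshTimeMs + refreshMillis) {
            cached = loader.get();
            lastRefreshTimeMs = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(0);
        AtomicInteger loads = new AtomicInteger();
        RefreshingCache<Integer> cache = new RefreshingCache<>(loads::incrementAndGet, now::get, 1000L);
        now.set(500);
        System.out.println(cache.get()); // 1: still inside the refresh window
        now.set(1500);
        System.out.println(cache.get()); // 2: window expired, loader called again
    }
}
```

Between refreshes every caller gets the same cached object, so zk is read at most once per refreshMillis no matter how often the spout asks for brokers.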


(3) ZkCoordinator

This class implements the PartitionCoordinator interface, which has only 3 methods:

1. The main method is getMyManagedPartitions(), which computes which partitions this spout task should handle:
   List<PartitionManager> getMyManagedPartitions();
2. Get the PartitionManager object for a partition:
   PartitionManager getManager(Partition partition);
3. Refresh the partition information periodically:
   void refresh();
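refresh() has to reconcile the partitions a task currently manages with the newly computed assignment. A minimal sketch of that set bookkeeping, assuming partitions are represented as plain integers; PartitionDiff is hypothetical. In storm-kafka's ZkCoordinator, the analogous step closes the PartitionManager of each removed partition and creates one for each added partition.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

final class PartitionDiff {

    final Set<Integer> added;   // assigned now but not yet managed: create managers for these
    final Set<Integer> removed; // managed but no longer assigned: close their managers

    PartitionDiff(Set<Integer> current, Set<Integer> assigned) {
        added = new TreeSet<>(assigned);
        added.removeAll(current);
        removed = new TreeSet<>(current);
        removed.removeAll(assigned);
    }

    public static void main(String[] args) {
        Set<Integer> current = new TreeSet<>(Arrays.asList(0, 2, 4));
        Set<Integer> assigned = new TreeSet<>(Arrays.asList(0, 3));
        PartitionDiff diff = new PartitionDiff(current, assigned);
        System.out.println("added=" + diff.added + " removed=" + diff.removed); // added=[3] removed=[2, 4]
    }
}
```

Partitions present in both sets (0 here) are left alone, so their managers keep their in-memory offsets across a refresh.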

(4) StaticPartitionCoordinator

The IPartitionCoordinator interface has two methods:

List<IPartitionManager> getMyPartitionManagers();
IPartitionManager getPartitionManager(String partitionId);

StaticPartitionCoordinator implements the IPartitionCoordinator interface.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// EventHubSpoutConfig, IStateStore, IPartitionManager, IPartitionManagerFactory,
// IEventHubReceiver and IEventHubReceiverFactory are defined elsewhere in the same module.
public class StaticPartitionCoordinator implements IPartitionCoordinator {

    private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class);

    protected final EventHubSpoutConfig config;
    protected final int taskIndex;
    protected final int totalTasks;
    protected final List<IPartitionManager> partitionManagers;
    protected final Map<String, IPartitionManager> partitionManagerMap;
    protected final IStateStore stateStore;

    public StaticPartitionCoordinator(
            EventHubSpoutConfig spoutConfig, int taskIndex, int totalTasks, IStateStore stateStore,
            IPartitionManagerFactory pmFactory, IEventHubReceiverFactory recvFactory) {
        this.config = spoutConfig;
        this.taskIndex = taskIndex;
        this.totalTasks = totalTasks;
        this.stateStore = stateStore;
        // The assignment is static: it is computed once, when the coordinator is created
        List<String> partitionIds = calculateParititionIdsToOwn();
        partitionManagerMap = new HashMap<String, IPartitionManager>();
        partitionManagers = new ArrayList<IPartitionManager>();
        for (String partitionId : partitionIds) {
            IEventHubReceiver receiver = recvFactory.create(config, partitionId);
            IPartitionManager partitionManager = pmFactory.create(
                    config, partitionId, stateStore, receiver);
            partitionManagerMap.put(partitionId, partitionManager);
            partitionManagers.add(partitionManager);
        }
    }

    @Override
    public List<IPartitionManager> getMyPartitionManagers() {
        return partitionManagers;
    }

    @Override
    public IPartitionManager getPartitionManager(String partitionId) {
        return partitionManagerMap.get(partitionId);
    }

    // Round-robin assignment: task i owns partitions i, i + totalTasks, i + 2 * totalTasks, ...
    protected List<String> calculateParititionIdsToOwn() {
        List<String> taskPartitions = new ArrayList<String>();
        for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) {
            taskPartitions.add(Integer.toString(i));
            logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i));
        }
        return taskPartitions;
    }
}
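calculateParititionIdsToOwn() spreads partitions over tasks round-robin. The same loop, pulled out into a standalone method so it can run without the spout config, shows which task gets which partition; RoundRobinAssignment and its parameters are hypothetical names, with partitionCount standing in for config.getPartitionCount().

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinAssignment {

    // Same loop as calculateParititionIdsToOwn(): start at this task's index, step by the task count
    static List<String> partitionIdsToOwn(int taskIndex, int totalTasks, int partitionCount) {
        List<String> owned = new ArrayList<>();
        for (int i = taskIndex; i < partitionCount; i += totalTasks) {
            owned.add(Integer.toString(i));
        }
        return owned;
    }

    public static void main(String[] args) {
        for (int task = 0; task < 3; task++) {
            System.out.println("task " + task + " -> " + partitionIdsToOwn(task, 3, 8));
        }
        // task 0 -> [0, 3, 6]
        // task 1 -> [1, 4, 7]
        // task 2 -> [2, 5]
    }
}
```

With 3 tasks and 8 partitions, tasks 0 and 1 each own three partitions and task 2 owns two; when there are more tasks than partitions, the extra tasks own nothing.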


Copyright: author null_ wry. Please include a link to the original when reprinting.