Storm source code analysis (V)

null_ wry 2022-02-13 07:59:17


storm-kafka source code analysis (1)

storm-kafka is the Storm connector for reading messages from Kafka.

1. org.apache.storm.kafka

The org.apache.storm.kafka package contains some common modules, as well as the spout implementation for storm-core.

Here we mainly analyze the classes that read from and write to ZooKeeper (zk).

(1) ZkState

ZkState tracks the processing state of each partition by reading and writing zk. The content stored in zk looks like this:

{
  "topology": {
    "id": "2e3226e2-ef45-4c53-b03f-aacd94068bc9",
    "name": "ljhtest"
  },
  "offset": 8066973,
  "partition": 0,
  "broker": {
    "host": "gdc-kafka08-log.i.nease.net",
    "port": 9092
  },
  "topic": "ma30"
}

The information above consists of the topology id, the topology name, the offset this partition has been processed up to, the partition number, the host and port of the Kafka broker that holds the partition, and the topic name.
ZkState only provides reading and writing of this zk information, through methods such as readJSON and writeJSON.

The location of this information in zk is determined by the 3rd and 4th parameters passed when constructing the config object (a SpoutConfig, which is a KafkaConfig). With the configuration below, the data is written under /kafka2/ljhtest. The 4th parameter must therefore be unique; otherwise different topologies will conflict.

 SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/kafka2", "ljhtest");
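As a rough illustration of why the 4th parameter has to be unique, the sketch below joins the zk root and the id into a node path. The helper name committedPath and the partition_<n> node name are assumptions made for this example; they are not taken from the text above.

```java
/** Sketch: deriving a per-partition zk node path from the zk root and the spout id. */
final class ZkPathSketch {

    // Hypothetical helper. The "partition_<n>" node name is an assumption for this example.
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/" + "partition_" + partition;
    }

    public static void main(String[] args) {
        // Two topologies with distinct ids write to distinct nodes.
        String a = committedPath("/kafka2", "ljhtest", 0);
        String b = committedPath("/kafka2", "othertest", 0);
        System.out.println(a); // /kafka2/ljhtest/partition_0
        System.out.println(b); // /kafka2/othertest/partition_0

        // Reusing the same id makes a second topology target the same node,
        // so the two would overwrite each other's offsets.
        String c = committedPath("/kafka2", "ljhtest", 0);
        System.out.println(a.equals(c)); // true
    }
}
```

With the same root and id, two topologies resolve to the same node, which is the conflict described above.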

(2) trident.ZkBrokerReader

Most of trident.ZkBrokerReader's functionality is delegated to DynamicBrokersReader, which also handles everything related to the zk connection. The class listing at the end of this section exposes a single getCurrentBrokers() method, which suggests it comes from a different version of the class than the one described here. The version described here provides the following methods:

1. getBrokerForTopic(): returns the partition information for a given topic as a GlobalPartitionInformation object. A single object is returned because several partitions may be read at the same time.
2. getAllBrokers(): reads all partitions without specifying a topic. Because topics can be specified with a regular expression, there may be multiple topics.
3. refresh(): a private method that refreshes the partition information at a fixed interval. It is called from the 2 methods above.
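The difference between looking up one topic and collecting several topics through a regular expression can be sketched as follows. This is a simplified standalone example: the TopicPartitions class is a hypothetical stand-in for GlobalPartitionInformation, and the method names forTopic and matching are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Sketch: per-topic lookup versus regex-based selection over cached partition info. */
final class TopicLookupSketch {

    // Hypothetical stand-in for the per-topic partition information.
    static final class TopicPartitions {
        final String topic;
        final int partitionCount;

        TopicPartitions(String topic, int partitionCount) {
            this.topic = topic;
            this.partitionCount = partitionCount;
        }
    }

    // Returns the entry for exactly one topic, or null if it is not cached.
    static TopicPartitions forTopic(List<TopicPartitions> cached, String topic) {
        for (TopicPartitions tp : cached) {
            if (tp.topic.equals(topic)) {
                return tp;
            }
        }
        return null;
    }

    // Returns every entry whose topic name fully matches the regular expression.
    static List<TopicPartitions> matching(List<TopicPartitions> cached, String topicRegex) {
        Pattern pattern = Pattern.compile(topicRegex);
        List<TopicPartitions> result = new ArrayList<>();
        for (TopicPartitions tp : cached) {
            if (pattern.matcher(tp.topic).matches()) {
                result.add(tp);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TopicPartitions> cached = Arrays.asList(
                new TopicPartitions("ma30", 4),
                new TopicPartitions("ma31", 2),
                new TopicPartitions("logs", 1));
        System.out.println(forTopic(cached, "ma30").partitionCount); // 4
        System.out.println(matching(cached, "ma.*").size());         // 2
    }
}
```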

public class ZkBrokerReader implements IBrokerReader {

    public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);

    GlobalPartitionInformation cachedBrokers;
    DynamicBrokersReader reader;
    long lastRefreshTimeMs;
    long refreshMillis;

    /**
     * @param conf
     * @param topic the topic this ZkBrokerReader is created for
     * @param hosts
     */
    public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
        reader = new DynamicBrokersReader(conf, hosts.brokerZkStr,
                hosts.brokerZkPath, topic);
        cachedBrokers = reader.getBrokerInfo();
        lastRefreshTimeMs = System.currentTimeMillis();
        refreshMillis = hosts.refreshFreqSecs * 1000L;
    }

    @Override
    public GlobalPartitionInformation getCurrentBrokers() {
        long currTime = System.currentTimeMillis();
        // Simple: refresh the broker list once the configured interval has elapsed.
        if (currTime > lastRefreshTimeMs + refreshMillis) {
            LOG.info("brokers need refreshing because " + refreshMillis
                    + "ms have expired");
            cachedBrokers = reader.getBrokerInfo();
            lastRefreshTimeMs = currTime;
        }
        return cachedBrokers;
    }

    @Override
    public void close() {
        reader.close();
    }
}
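The refresh logic in getCurrentBrokers() is a time-based cache: the value is reloaded only when more than refreshMillis has passed since the last load. The following is a minimal standalone sketch of that pattern. The TimedCache class, its injectable clock, and the loader are assumptions made so the example runs without Storm or zk.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Sketch: a value that is reloaded only after a refresh interval has expired. */
final class TimedCache<T> {
    private final Supplier<T> loader;   // stands in for reader.getBrokerInfo()
    private final LongSupplier clock;   // stands in for System.currentTimeMillis()
    private final long refreshMillis;
    private T cached;
    private long lastRefreshTimeMs;

    TimedCache(Supplier<T> loader, LongSupplier clock, long refreshMillis) {
        this.loader = loader;
        this.clock = clock;
        this.refreshMillis = refreshMillis;
        this.cached = loader.get();               // initial load, as in the constructor above
        this.lastRefreshTimeMs = clock.getAsLong();
    }

    T get() {
        long now = clock.getAsLong();
        // Reload only when strictly more than refreshMillis has elapsed.
        if (now > lastRefreshTimeMs + refreshMillis) {
            cached = loader.get();
            lastRefreshTimeMs = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};  // controllable clock for the demonstration
        int[] loads = {0};      // counts how often the loader runs
        TimedCache<String> cache = new TimedCache<>(
                () -> "brokers-v" + (++loads[0]), () -> fakeNow[0], 60_000L);

        System.out.println(cache.get()); // brokers-v1 (still within the interval)
        fakeNow[0] = 60_001L;
        System.out.println(cache.get()); // brokers-v2 (interval expired, reloaded)
    }
}
```

Injecting the clock is a choice made only for this sketch; it lets the expiry be demonstrated without waiting for real time to pass.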

(3) ZkCoordinator

This class implements the PartitionCoordinator interface, which has only 3 methods:

  1. The main method is getMyManagedPartitions(), which works out which partitions this spout task should handle:
     List<PartitionManager> getMyManagedPartitions();
  2. Get the PartitionManager object for a partition:
     PartitionManager getManager(Partition partition);
  3. Refresh the partition information periodically:
     void refresh();

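(4) StaticPartitionCoordinator

The IPartitionCoordinator interface has two methods:

List<IPartitionManager> getMyPartitionManagers();
IPartitionManager getPartitionManager(String partitionId);

StaticPartitionCoordinator implements the IPartitionCoordinator interface. Note that the listing below works with EventHubSpoutConfig and IEventHubReceiver, so it belongs to the Event Hubs spout rather than to storm-kafka itself; the way it assigns partitions to tasks is what matters here.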

public class StaticPartitionCoordinator implements IPartitionCoordinator {

    private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class);

    protected final EventHubSpoutConfig config;
    protected final int taskIndex;
    protected final int totalTasks;
    protected final List<IPartitionManager> partitionManagers;
    protected final Map<String, IPartitionManager> partitionManagerMap;
    protected final IStateStore stateStore;

    public StaticPartitionCoordinator(
            EventHubSpoutConfig spoutConfig, int taskIndex, int totalTasks,
            IStateStore stateStore, IPartitionManagerFactory pmFactory,
            IEventHubReceiverFactory recvFactory) {
        this.config = spoutConfig;
        this.taskIndex = taskIndex;
        this.totalTasks = totalTasks;
        this.stateStore = stateStore;
        List<String> partitionIds = calculateParititionIdsToOwn();
        partitionManagerMap = new HashMap<String, IPartitionManager>();
        partitionManagers = new ArrayList<IPartitionManager>();
        for (String partitionId : partitionIds) {
            IEventHubReceiver receiver = recvFactory.create(config, partitionId);
            IPartitionManager partitionManager = pmFactory.create(
                    config, partitionId, stateStore, receiver);
            partitionManagerMap.put(partitionId, partitionManager);
            partitionManagers.add(partitionManager);
        }
    }

    @Override
    public List<IPartitionManager> getMyPartitionManagers() {
        return partitionManagers;
    }

    @Override
    public IPartitionManager getPartitionManager(String partitionId) {
        return partitionManagerMap.get(partitionId);
    }

    protected List<String> calculateParititionIdsToOwn() {
        List<String> taskPartitions = new ArrayList<String>();
        for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) {
            taskPartitions.add(Integer.toString(i));
            logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i));
        }
        return taskPartitions;
    }
}
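The assignment rule in calculateParititionIdsToOwn() can be tried in isolation: each task starts at its own index and steps by the total number of tasks, so the partitions are spread round-robin and no partition is owned twice. Below is a minimal standalone sketch; the class name RoundRobinAssignment and the plain int parameters (standing in for the config object) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: round-robin assignment of partition ids to spout tasks. */
final class RoundRobinAssignment {

    // Same loop shape as calculateParititionIdsToOwn(), with the inputs passed in directly.
    static List<String> partitionIdsToOwn(int taskIndex, int totalTasks, int partitionCount) {
        List<String> ids = new ArrayList<>();
        for (int i = taskIndex; i < partitionCount; i += totalTasks) {
            ids.add(Integer.toString(i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // 8 partitions spread over 3 tasks.
        for (int task = 0; task < 3; task++) {
            System.out.println("task " + task + " -> " + partitionIdsToOwn(task, 3, 8));
        }
        // task 0 -> [0, 3, 6]
        // task 1 -> [1, 4, 7]
        // task 2 -> [2, 5]
    }
}
```

If there are more tasks than partitions, the tasks whose index is at or beyond the partition count get an empty list and stay idle.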

Reference link:
https://blog.csdn.net/jediael_lu/article/details/77149540

copyright:author[null_ wry],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/02/202202130759153050.html