Storm source code analysis (2)

null_ wry 2022-02-13 07:59:24


I. What is ZooKeeper?

ZooKeeper is an open-source distributed coordination service. It provides consistency services for distributed applications, which can build on it to implement data publish/subscribe, load balancing, naming services, distributed coordination/notification, cluster management, master election, distributed locks, and distributed queues.

ZooKeeper's goal is to encapsulate complex, error-prone critical services and offer users a simple interface backed by an efficient, stable system.

Storm depends heavily on ZooKeeper as an external resource: Nimbus, the Supervisors, and the Workers all store their heartbeat data in ZooKeeper, and Nimbus schedules and assigns tasks based on the heartbeats and task health status stored there.

Storm keeps all of its state information in ZooKeeper. Nimbus assigns tasks by writing state information to ZooKeeper, and Supervisors and tasks obtain their assignments by reading that state back.
Supervisors and tasks also send heartbeats to ZooKeeper,
which lets Nimbus monitor the state of the whole Storm cluster and restart tasks that have died.

ZooKeeper makes the whole Storm cluster very robust: if any worker machine goes down, it only needs to restart and retrieve its state information from ZooKeeper.
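To make the heartbeat idea concrete, here is a minimal in-memory sketch of how a master could decide which workers are dead from their last heartbeat timestamps. It does not talk to ZooKeeper at all; the class name HeartbeatMonitor, the timeout parameter, and the use of a plain map instead of heartbeat znodes are assumptions for illustration, not Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an in-memory stand-in for heartbeat znodes.
// Storm keeps heartbeats in ZooKeeper; here a map from worker ID to the
// time (ms) of its last heartbeat plays that role.
class HeartbeatMonitor {
    private final Map<String, Long> lastBeat = new HashMap<>();
    private final long timeoutMs;

    HeartbeatMonitor(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // A worker "writes" its heartbeat together with the current timestamp.
    void heartbeat(String workerId, long nowMs) {
        lastBeat.put(workerId, nowMs);
    }

    // The master lists workers whose last heartbeat is older than the timeout;
    // these are the candidates for restart or reassignment.
    List<String> deadWorkers(long nowMs) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastBeat.entrySet()) {
            if (nowMs - e.getValue() > timeoutMs) {
                dead.add(e.getKey());
            }
        }
        return dead;
    }
}
```

The key point is that liveness is inferred only from how stale the stored heartbeat is, which is why a restarted worker becomes "alive" again as soon as it resumes writing heartbeats.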

II. How does ZooKeeper keep the state of the leader and follower nodes consistent?

At the core of ZooKeeper is its atomic broadcast mechanism, which keeps the servers synchronized with one another. The protocol that implements this mechanism is called Zab (ZooKeeper Atomic Broadcast). Zab has two modes: recovery mode and broadcast mode.

1. Recovery mode
When the service starts, or after the leader crashes, Zab enters recovery mode. Recovery mode ends once a leader has been elected and a majority of servers have finished synchronizing their state with it. State synchronization ensures that the leader and the servers have the same system state.

2. Broadcast mode
Once the leader has synchronized state with a majority of followers, it can start broadcasting messages, that is, it enters broadcast mode. If a server joins the ZooKeeper service at this point, it starts in recovery mode, discovers the leader, and synchronizes its state with the leader. When synchronization finishes, it also takes part in message broadcasting. The ZooKeeper service stays in broadcast mode until the leader crashes or loses the support of a majority of followers.
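The "majority" in both modes is a strict quorum: in broadcast mode the leader commits a proposal only after more than half of the ensemble has acknowledged it. A minimal sketch of that counting rule for one proposal follows, assuming a fixed ensemble size and counting the leader's own acknowledgement. The class and method names are hypothetical, and the sketch deliberately omits epochs, zxids, persistence, and commit ordering, all of which real Zab needs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of Zab's quorum rule for a single proposal.
// Real Zab also tracks epochs and zxids, persists proposals to disk,
// and delivers commits in order; none of that is modeled here.
class QuorumTracker {
    private final int ensembleSize;
    private final Set<String> acks = new HashSet<>();

    QuorumTracker(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Record an ACK from a server (the leader acknowledges its own proposal too).
    // Duplicate ACKs from the same server are ignored because acks is a set.
    void ack(String serverId) {
        acks.add(serverId);
    }

    // Strict majority: more than half of the ensemble.
    boolean canCommit() {
        return acks.size() > ensembleSize / 2;
    }
}
```

With five servers, three acknowledgements are enough, so the ensemble keeps working with up to two failed servers; with only two survivors it can no longer commit, which is the "lost most of the followers" case described above.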

Storm's directory structure in ZooKeeper

/assignments -> task assignment information
/storms -> IDs of the running topologies
/supervisors -> heartbeat information of all Supervisors
/workerbeats -> heartbeats of all Workers
/errors -> error messages that have been generated
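A small helper that builds paths under these roots can make the layout easier to see. Only the five top-level roots come from the list above; the class StormZkPaths, its method names, and the child naming below each root (in particular the `<host>-<port>` form of worker heartbeat nodes) are assumptions for illustration, so check Storm's own cluster-state code for the exact layout in your version.

```java
// Hypothetical helper mirroring the directory layout listed above.
// The five roots come from the text; the child naming is an assumption.
final class StormZkPaths {
    static final String ASSIGNMENTS = "/assignments";
    static final String STORMS = "/storms";
    static final String SUPERVISORS = "/supervisors";
    static final String WORKERBEATS = "/workerbeats";
    static final String ERRORS = "/errors";

    private StormZkPaths() {}

    // Assignment information for one topology.
    static String assignmentPath(String topologyId) {
        return ASSIGNMENTS + "/" + topologyId;
    }

    // Node marking one running topology.
    static String stormPath(String topologyId) {
        return STORMS + "/" + topologyId;
    }

    // Heartbeat of one Supervisor.
    static String supervisorPath(String supervisorId) {
        return SUPERVISORS + "/" + supervisorId;
    }

    // Heartbeat of one Worker: topology first, then host and port (assumed naming).
    static String workerbeatPath(String topologyId, String host, int port) {
        return WORKERBEATS + "/" + topologyId + "/" + host + "-" + port;
    }

    // Error messages reported for one topology.
    static String errorsPath(String topologyId) {
        return ERRORS + "/" + topologyId;
    }
}
```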

First, let's analyze the ClientZookeeper class.

Imported packages

package org.apache.storm.zookeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.cluster.DaemonType;
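import org.apache.storm.cluster.VersionedData;
// Curator and ZooKeeper classes used by the methods below (shaded inside Storm 2.x)
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
import org.apache.storm.shade.org.apache.zookeeper.KeeperException;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.shade.org.apache.zookeeper.data.Stat;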
import org.apache.storm.utils.CuratorUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Fields of the class

private static final ClientZookeeper INSTANCE = new ClientZookeeper();
private static Logger LOG = LoggerFactory.getLogger(ClientZookeeper.class);
private static ClientZookeeper _instance = INSTANCE;

The class holds _instance, the ClientZookeeper object that the static methods delegate to (initially the default INSTANCE), and the Logger LOG.

Basic methods

// Replace the delegate that all static methods forward to
public static void setInstance(ClientZookeeper u) {
    _instance = u;
}

// Restore the default delegate
public static void resetInstance() {
    _instance = INSTANCE;
}

public static void mkdirs(CuratorFramework zk, String path, List<ACL> acls) {
    _instance.mkdirsImpl(zk, path, acls);
}

public static CuratorFramework mkClient(Map<String, Object> conf, List<String> servers, Object port, String root,
                                        final WatcherCallBack watcher, Map<String, Object> authConf, DaemonType type) {
    return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf, type);
}

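These basic methods manage and use the delegate _instance: setInstance replaces it (for example with a subclass in tests), and resetInstance restores the default INSTANCE.
mkdirs passes the CuratorFramework client, the path, and the list of ACLs (access control list) to _instance.mkdirsImpl, which creates the path.
mkClient delegates to _instance.mkClientImpl to build a CuratorFramework client from the configuration map, the ZooKeeper server list, the port, the root path, a watcher callback, the authentication configuration, and the daemon type.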
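The static-methods-plus-replaceable-instance pattern can be reduced to a few lines. In this sketch the names ZkFacade and RecordingZkFacade and the string return values are hypothetical: the static method forwards to whatever instance is installed, so a test can install a subclass that records calls instead of touching a real ZooKeeper ensemble, and then restore the default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the delegation pattern used by ClientZookeeper:
// public static methods forward to a swappable instance.
class ZkFacade {
    private static final ZkFacade INSTANCE = new ZkFacade();
    private static ZkFacade instance = INSTANCE;

    static void setInstance(ZkFacade u) {
        instance = u;
    }

    static void resetInstance() {
        instance = INSTANCE;
    }

    // Static entry point that callers use.
    static String mkdirs(String path) {
        return instance.mkdirsImpl(path);
    }

    // Default behaviour; a real implementation would call ZooKeeper here.
    protected String mkdirsImpl(String path) {
        return "created " + path;
    }
}

// A test double that records calls instead of creating anything.
class RecordingZkFacade extends ZkFacade {
    final List<String> calls = new ArrayList<>();

    @Override
    protected String mkdirsImpl(String path) {
        calls.add(path);
        return "recorded " + path;
    }
}
```

Callers keep using the static API unchanged; only the installed instance decides what actually happens.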

deleteNodeBlobstore Method

public static void deleteNodeBlobstore(CuratorFramework zk, String parentPath, String hostPortInfo) {
    String normalizedParentPath = normalizePath(parentPath);
    List<String> childPathList = null;
    if (existsNode(zk, normalizedParentPath, false)) {
        childPathList = getChildren(zk, normalizedParentPath, false);
        for (String child : childPathList) {
            // Only delete children whose name starts with the given host:port prefix
            if (child.startsWith(hostPortInfo)) {
                LOG.debug("deleteNode child {}", child);
                deleteNode(zk, normalizedParentPath + "/" + child);
            }
        }
    }
}

This method deletes state from ZooKeeper: if parentPath exists, it removes every child node of parentPath
whose name starts with hostPortInfo, the Nimbus host and port information.
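The filtering logic of deleteNodeBlobstore does not depend on ZooKeeper itself, so it can be shown on an in-memory map of child names. In this sketch the map stands in for the children of parentPath; the class and method names are hypothetical, and child names such as "nimbus1:6627-3" are made-up examples.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the same prefix filter that deleteNodeBlobstore applies,
// run against an in-memory map (child name -> data) instead of ZooKeeper.
class PrefixDelete {
    // Removes every child whose name starts with hostPortInfo and
    // returns the full paths that were deleted.
    static List<String> deleteChildrenWithPrefix(String parentPath, Map<String, byte[]> children,
                                                 String hostPortInfo) {
        List<String> deleted = new ArrayList<>();
        // Iterate over a copy of the keys so entries can be removed safely.
        for (String child : new ArrayList<>(children.keySet())) {
            if (child.startsWith(hostPortInfo)) {
                children.remove(child);
                deleted.add(parentPath + "/" + child);
            }
        }
        return deleted;
    }
}
```

Note that startsWith is a plain string prefix test, so the prefix should be specific enough (full host and port) not to match other Nimbus instances.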

createNode Method

public static String createNode(CuratorFramework zk, String path, byte[] data, CreateMode mode, List<ACL> acls) {
    String ret = null;
    try {
        String npath = normalizePath(path);
        // Create missing parents first, then the node itself with the given mode and ACLs
        ret = zk.create().creatingParentsIfNeeded().withMode(mode).withACL(acls).forPath(npath, data);
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
    return ret;
}

// Overload without a CreateMode: creates a PERSISTENT node
public static String createNode(CuratorFramework zk, String path, byte[] data, List<ACL> acls) {
    return createNode(zk, path, data, CreateMode.PERSISTENT, acls);
}

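createNode creates a node at the normalized path with the given data, create mode, and ACLs. Missing parent nodes are created first (creatingParentsIfNeeded()), and any checked exception is wrapped in a RuntimeException. The overload without a CreateMode creates a PERSISTENT node.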
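The effect of creatingParentsIfNeeded() can be illustrated without a ZooKeeper server: creating "/a/b/c" first creates "/a" and "/a/b" if they are missing. The sketch below models the node tree as a set of paths; the class name InMemoryTree is hypothetical, it ignores data, ACLs, and create modes, and, unlike the real call, creating a node that already exists is not treated as an error here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of "create with parents" on an in-memory tree of paths.
class InMemoryTree {
    private final Set<String> nodes = new LinkedHashSet<>();

    InMemoryTree() {
        nodes.add("/"); // the root always exists
    }

    // Creates every missing ancestor of path, then path itself.
    // Returns the nodes that were newly created, in creation order.
    List<String> createWithParents(String path) {
        List<String> created = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String tok : path.split("/")) {
            if (tok.isEmpty()) {
                continue; // skip empty segments from leading or repeated '/'
            }
            current.append('/').append(tok);
            String p = current.toString();
            if (nodes.add(p)) {
                created.add(p);
            }
        }
        return created;
    }

    boolean exists(String path) {
        return nodes.contains(path);
    }
}
```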

tokenizePath Method

public static List<String> tokenizePath(String path) {
    String[] toks = path.split("/");
    java.util.ArrayList<String> rtn = new ArrayList<String>();
    for (String str : toks) {
        // Skip empty segments produced by leading, trailing, or repeated '/'
        if (!str.isEmpty()) {
            rtn.add(str);
        }
    }
    return rtn;
}

tokenizePath splits a path string on "/" and drops empty segments, so "/storms/topo-1/" becomes [storms, topo-1].
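The methods in this class pass every path through normalizePath before using it, and its body is not shown in this article. A minimal sketch, assuming normalizePath rebuilds the path from tokenizePath's output with a single leading "/" (the class PathNormalizer and the helper toksToPath here are assumptions), would be:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch, assuming normalizePath = "/" + join("/", tokenizePath(path)).
class PathNormalizer {
    // Same logic as tokenizePath: split on '/' and drop empty segments.
    static List<String> tokenizePath(String path) {
        List<String> rtn = new ArrayList<>();
        for (String str : path.split("/")) {
            if (!str.isEmpty()) {
                rtn.add(str);
            }
        }
        return rtn;
    }

    // Joins tokens back into an absolute path; an empty list yields "/".
    static String toksToPath(List<String> toks) {
        return "/" + String.join("/", toks);
    }

    static String normalizePath(String path) {
        return toksToPath(tokenizePath(path));
    }
}
```

Under this assumption, "storms//topo-1/" normalizes to "/storms/topo-1", so callers can pass slightly malformed paths and still address the same znode.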


getDataWithVersion Method

public static VersionedData<byte[]> getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
    VersionedData<byte[]> data = null;
    try {
        byte[] bytes = null;
        Stat stats = new Stat();
        String npath = normalizePath(path);
        if (existsNode(zk, npath, watch)) {
            // Read the data and fill stats with the node's metadata (including its version)
            if (watch) {
                bytes = zk.getData().storingStatIn(stats).watched().forPath(npath);
            } else {
                bytes = zk.getData().storingStatIn(stats).forPath(npath);
            }
            if (bytes != null) {
                int version = stats.getVersion();
                data = new VersionedData<>(version, bytes);
            }
        }
    } catch (Exception e) {
        if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
            // this is fine b/c we still have a watch from the successful exists call
        } else {
            throw Utils.wrapInRuntime(e);
        }
    }
    return data;
}

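getDataWithVersion reads a node's data together with its version number (taken from the Stat object), optionally leaving a watch on the node. If the node disappears between the exists check and the read (NoNodeException), the method returns null instead of failing; as the comment notes, the watch set by the successful exists call is still in place.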
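The version returned alongside the data is what makes optimistic updates possible: ZooKeeper's setData accepts an expected version and rejects the write if the node has changed since it was read. A minimal in-memory sketch of that read-version / conditional-write cycle follows; VersionedNode and its methods are hypothetical, and a false return value stands in for ZooKeeper's BadVersion error.

```java
// Hypothetical in-memory sketch of ZooKeeper-style versioned writes.
// As in ZooKeeper, each successful write increments the data version.
class VersionedNode {
    private byte[] data;
    private int version; // starts at 0, like a newly created znode

    VersionedNode(byte[] initial) {
        this.data = initial.clone();
        this.version = 0;
    }

    synchronized int getVersion() {
        return version;
    }

    synchronized byte[] getData() {
        return data.clone();
    }

    // Conditional write: succeeds only if expectedVersion matches the current
    // version (-1 means "any version", as in ZooKeeper). Returns false where
    // ZooKeeper would report a BadVersion error.
    synchronized boolean setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            return false;
        }
        data = newData.clone();
        version++;
        return true;
    }
}
```

A writer that gets false would re-read the data and version, reapply its change, and retry.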

The class also contains a number of other basic set and get methods.


Copyright: author null_ wry. Please include a link to the original when reprinting.