Storm source code analysis (3)

null_ wry 2022-02-13 07:59:22

The AclEnforcement class verifies, and optionally repairs, the ZooKeeper (ZK) ACLs that Storm relies on.
Let's first go over some background on ACL permissions.

One. ACL permissions


1) CREATE(c): create permission; allows creating child nodes under the current node
2) DELETE(d): delete permission; allows deleting child nodes of the current node
3) READ(r): read permission; allows reading the current node's data and listing all of its child nodes
4) WRITE(w): write permission; allows writing data to the current node
5) ADMIN(a): admin permission; allows setting the current node's permissions
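
To make the letter codes concrete, here is a minimal sketch that turns a permission bitmask into its "cdrwa" form. The constants mirror the values of org.apache.zookeeper.ZooDefs.Perms so that the example runs without the ZooKeeper jar; the class and method names (PermsDemo, toLetters) are made up for illustration.

```java
final class PermsDemo {
    // Same bit values as org.apache.zookeeper.ZooDefs.Perms, copied here
    // so the sketch has no external dependency.
    static final int READ = 1 << 0;
    static final int WRITE = 1 << 1;
    static final int CREATE = 1 << 2;
    static final int DELETE = 1 << 3;
    static final int ADMIN = 1 << 4;

    /** Renders a permission bitmask in the conventional "cdrwa" letter order. */
    static String toLetters(int perms) {
        StringBuilder sb = new StringBuilder();
        if ((perms & CREATE) != 0) sb.append('c');
        if ((perms & DELETE) != 0) sb.append('d');
        if ((perms & READ) != 0) sb.append('r');
        if ((perms & WRITE) != 0) sb.append('w');
        if ((perms & ADMIN) != 0) sb.append('a');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toLetters(READ));                                   // r
        System.out.println(toLetters(READ | WRITE));                           // rw
        System.out.println(toLetters(CREATE | DELETE | READ | WRITE | ADMIN)); // cdrwa
    }
}
```

Because the permissions are independent bits, any combination can be granted, which is why an ACL can be read-only for one id and cdrwa for another.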


An ACL can be understood along three dimensions: the first is the scheme; the second is the user, or id (a user name or an IP address); the third is the permission (the permissions listed above). It is usually written as scheme:id:permissions.


scheme: the scheme determines which mechanism is used for permission management. ZooKeeper implements a pluggable ACL mechanism, so it can be extended with new schemes. zookeeper-3.4.4 supports the following schemes by default:

world: it has only one id, anyone. world:anyone stands for everybody; a node on which everyone has permission is one that belongs to world:anyone.

auth: it needs no id. Any user who has passed authentication has permission (ZooKeeper supports authentication through Kerberos as well as in username/password form).

digest: its id is username:BASE64(SHA1(username:password)), and the client must first authenticate in username:password form.

ip: its id is the client's IP address. An IP range can be given when setting it, in addr/bits form; a /16 suffix, for example, matches the first 16 bits of the address.

super: under this scheme the corresponding id has super permission and can do anything (cdrwa).

sasl: its id is the id of a user who has passed SASL authentication. In zookeeper-3.4.4, SASL authentication is implemented through Kerberos, so a user must pass Kerberos authentication to access the node.

id: the id is closely tied to the scheme and has already been covered with each scheme above, so no more details here.
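
As an illustration of the digest scheme, the sketch below builds a digest id with the JDK alone. It assumes, following my reading of ZooKeeper's DigestAuthenticationProvider.generateDigest, that the SHA-1 hash is computed over the whole username:password string and that the user name is then prefixed to the Base64 result. The class name DigestIdDemo, the method name digestId and the credentials are made up.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class DigestIdDemo {
    /** Turns "username:password" into "username:BASE64(SHA1(username:password))". */
    static String digestId(String userColonPassword) {
        // Everything before the first ':' is treated as the user name.
        String user = userColonPassword.split(":", 2)[0];
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(userColonPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // "alice" and "secret" are made-up credentials for illustration only.
        System.out.println("digest:" + digestId("alice:secret") + ":cdrwa");
    }
}
```

The verifyAcls() code later in this article relies on the same idea when it wraps DigestAuthenticationProvider.generateDigest(payload) in new Id("digest", ...).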


permission: the five permissions listed above, abbreviated cdrwa.
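
To tie the three dimensions together, here is a small sketch that splits a scheme:id:permissions expression into its parts. Since an id can itself contain colons (a digest id does), it splits on the first and the last colon. AclStringDemo, ParsedAcl and parse are hypothetical names, and HASH is only a placeholder for a real digest.

```java
final class AclStringDemo {
    /** The three parts of a "scheme:id:permissions" expression. */
    static final class ParsedAcl {
        final String scheme;
        final String id;
        final String perms;

        ParsedAcl(String scheme, String id, String perms) {
            this.scheme = scheme;
            this.id = id;
            this.perms = perms;
        }
    }

    /** Splits on the first and last ':' so that ids containing ':' stay intact. */
    static ParsedAcl parse(String acl) {
        int first = acl.indexOf(':');
        int last = acl.lastIndexOf(':');
        if (first < 0 || first == last) {
            throw new IllegalArgumentException("expected scheme:id:permissions but got " + acl);
        }
        return new ParsedAcl(acl.substring(0, first),
                             acl.substring(first + 1, last),
                             acl.substring(last + 1));
    }

    public static void main(String[] args) {
        ParsedAcl world = parse("world:anyone:cdrwa");
        System.out.println(world.scheme + " / " + world.id + " / " + world.perms);

        // HASH is a placeholder, not a real digest.
        ParsedAcl digest = parse("digest:alice:HASH:rw");
        System.out.println(digest.scheme + " / " + digest.id + " / " + digest.perms);
    }
}
```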

Two. Code analysis

verifyAcls() Method

This method verifies that the ZK ACLs are correct and optionally fixes them when needed.
Its conf parameter is the cluster configuration. Pass fixUp as true to have incorrect ACLs repaired, or false to only check them.

public static void verifyAcls(Map<String, Object> conf, final boolean fixUp) throws Exception {
    if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) {
        LOG.info("SECURITY IS DISABLED NO FURTHER CHECKS...");
        //There is no security so we are done.
        return;
    }
    ACL superUserAcl = Utils.getSuperUserAcl(conf);
    List<ACL> superAcl = new ArrayList<>(1);
    superAcl.add(superUserAcl);

    List<ACL> drpcFullAcl = new ArrayList<>(2);
    drpcFullAcl.add(superUserAcl);

    String drpcAclString = (String) conf.get(Config.STORM_ZOOKEEPER_DRPC_ACL);
    if (drpcAclString != null) {
        Id drpcAclId = Utils.parseZkId(drpcAclString, Config.STORM_ZOOKEEPER_DRPC_ACL);
        ACL drpcUserAcl = new ACL(ZooDefs.Perms.READ, drpcAclId);
        drpcFullAcl.add(drpcUserAcl);
    }

First, Utils.isZkAuthenticationConfiguredStormServer() determines whether the Storm server is configured with ZK authentication. If it is not, security is disabled, there is nothing further to check, and the method returns directly.
Otherwise the superuser ACL is fetched and added to both the superAcl and drpcFullAcl permission lists. If a DRPC ACL is configured, a read-only ACL is created for that id as well.

    List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    int port = ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT));
    String stormRoot = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT);

    try (CuratorFramework zk = ClientZookeeper.mkClient(conf, zkServers, port, "",
            new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) {
        if (zk.checkExists().forPath(stormRoot) != null) {
            //First off we want to verify that ROOT is good
            verifyAclStrict(zk, superAcl, stormRoot, fixUp);
        } else {
            LOG.warn("{} does not exist no need to check any more...", stormRoot);
            return;
        }
    }

After this step the root path is known to be good, so we can start looking at the other paths below it.
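
The article does not show verifyAclStrict itself. As a rough picture of what a "verify, and fix if asked" check can look like, here is a minimal sketch. It assumes the check compares the ACL found on a node with the expected one and then either repairs it or throws; an in-memory map stands in for ZooKeeper, and every name in it (StrictAclCheckDemo, verifyStrict, the storm-admin id) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

final class StrictAclCheckDemo {
    /**
     * Checks that the node at path carries exactly the expected ACL entries.
     * nodes is an in-memory stand-in for ZooKeeper: path -> ACL entries.
     */
    static void verifyStrict(Map<String, List<String>> nodes, String path,
                             List<String> expected, boolean fixUp) {
        List<String> found = nodes.get(path);
        if (found == null) {
            return; // the node does not exist, so there is nothing to verify
        }
        // Order does not matter, so compare the entries as sets.
        if (new HashSet<>(found).equals(new HashSet<>(expected))) {
            return;
        }
        if (fixUp) {
            nodes.put(path, new ArrayList<>(expected));
        } else {
            throw new IllegalStateException(path + " has ACL " + found + " but expected " + expected);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> nodes = new HashMap<>();
        nodes.put("/storm", Arrays.asList("world:anyone:cdrwa"));
        List<String> superAcl = Arrays.asList("sasl:storm-admin:cdrwa"); // made-up superuser id

        verifyStrict(nodes, "/storm", superAcl, true);
        System.out.println(nodes.get("/storm")); // [sasl:storm-admin:cdrwa]
    }
}
```

With fixUp set to false the same call would throw instead, which matches the intent described for verifyAcls(): report a wrong ACL by default and repair it only when explicitly asked.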

    try (CuratorFramework zk = ClientZookeeper.mkClient(conf, zkServers, port, stormRoot,
            new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) {
        //Next verify that the blob store is correct before we start it up.
        if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_SUBTREE) != null) {
            verifyAclStrictRecursive(zk, superAcl, ClusterUtils.BLOBSTORE_SUBTREE, fixUp);
        }

        if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE) != null) {
            verifyAclStrict(zk, superAcl, ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE, fixUp);
        }

        //The blobstore is good, now lets get the list of all topo Ids
        Set<String> topoIds = new HashSet<>();
        if (zk.checkExists().forPath(ClusterUtils.STORMS_SUBTREE) != null) {
            topoIds.addAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE));
        }

        Map<String, Id> topoToZkCreds = new HashMap<>();
        //Now lets get the creds for the topos so we can verify those as well.
        BlobStore bs = ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf), null);
        try {
            Subject nimbusSubject = new Subject();
            nimbusSubject.getPrincipals().add(new NimbusPrincipal());
            for (String topoId : topoIds) {
                try {
                    String blobKey = topoId + "-stormconf.ser";
                    Map<String, Object> topoConf = Utils.fromCompressedJsonConf(bs.readBlob(blobKey, nimbusSubject));
                    String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
                    try {
                        topoToZkCreds.put(topoId, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
                    } catch (NoSuchAlgorithmException e) {
                        throw new RuntimeException(e);
                    }
                } catch (KeyNotFoundException knf) {
                    LOG.debug("topo removed {}", topoId, knf);
                }
            }
        } finally {
            if (bs != null) {
                bs.shutdown();
            }
        }


Next, before the blob store is started up, its ZK nodes are verified.
Once the blobstore is known to be good, the list of all topology ids is collected, and then the credentials of each topology are fetched so that the topology nodes can be verified as well.

        verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.STORMS_SUBTREE, topoToZkCreds, fixUp);
        verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.ASSIGNMENTS_SUBTREE, topoToZkCreds, fixUp);
        //There is a race on credentials where they can be leaked in some versions of storm.
        verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.CREDENTIALS_SUBTREE, topoToZkCreds, fixUp);
        //There is a race on logconfig where they can be leaked in some versions of storm.
        verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.LOGCONFIG_SUBTREE, topoToZkCreds, fixUp);
        //There is a race on backpressure too...
        verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.BACKPRESSURE_SUBTREE, topoToZkCreds, fixUp);

        if (zk.checkExists().forPath(ClusterUtils.ERRORS_SUBTREE) != null) {
            //errors is a bit special because in older versions of storm the worker created the parent directories lazily
            // because of this it means we need to auto create at least the topo-id directory for all running topos.
            for (String topoId : topoToZkCreds.keySet()) {
                String path = ClusterUtils.errorStormRoot(topoId);
                if (zk.checkExists().forPath(path) == null) {
                    LOG.warn("Creating missing errors location {}", path);
                    zk.create().withACL(getTopoReadWrite(path, topoId, topoToZkCreds, superUserAcl, fixUp)).forPath(path);
                }
            }
        }
        //Error should not be leaked according to the code, but they are not important enough to fail the build if
        // for some odd reason they are leaked.
        verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.ERRORS_SUBTREE, topoToZkCreds, fixUp);

        if (zk.checkExists().forPath(ClusterUtils.SECRET_KEYS_SUBTREE) != null) {
            verifyAclStrict(zk, superAcl, ClusterUtils.SECRET_KEYS_SUBTREE, fixUp);
            verifyAclStrictRecursive(zk, superAcl, ClusterUtils.secretKeysPath(WorkerTokenServiceType.NIMBUS), fixUp);
            verifyAclStrictRecursive(zk, drpcFullAcl, ClusterUtils.secretKeysPath(WorkerTokenServiceType.DRPC), fixUp);
        }

        if (zk.checkExists().forPath(ClusterUtils.NIMBUSES_SUBTREE) != null) {
            verifyAclStrictRecursive(zk, superAcl, ClusterUtils.NIMBUSES_SUBTREE, fixUp);
        }

        if (zk.checkExists().forPath("/leader-lock") != null) {
            verifyAclStrictRecursive(zk, superAcl, "/leader-lock", fixUp);
        }

        if (zk.checkExists().forPath(ClusterUtils.PROFILERCONFIG_SUBTREE) != null) {
            verifyAclStrictRecursive(zk, superAcl, ClusterUtils.PROFILERCONFIG_SUBTREE, fixUp);
        }

        if (zk.checkExists().forPath(ClusterUtils.SUPERVISORS_SUBTREE) != null) {
            verifyAclStrictRecursive(zk, superAcl, ClusterUtils.SUPERVISORS_SUBTREE, fixUp);
        }

        // When moving to pacemaker workerbeats can be leaked too...
        verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.WORKERBEATS_SUBTREE, topoToZkCreds, fixUp);
    }
}

These calls verify each parent node together with its per-topology child nodes. STORMS_SUBTREE and ASSIGNMENTS_SUBTREE are checked as parents whose topology children are read-only.
In some versions of Storm there are races on credentials, on logconfig, and on backpressure that can leave leaked nodes behind. For these subtrees the ...DeleteDead variants are used, which, as the name indicates, also delete the nodes of dead topologies.
If zk.checkExists().forPath(ClusterUtils.ERRORS_SUBTREE) is non-null, that is, the ClusterUtils.ERRORS_SUBTREE path exists, a topo-id directory is created for every running topology that lacks one. Errors is a bit special because in older versions of Storm the workers created the parent directories lazily.
Then ClusterUtils.SECRET_KEYS_SUBTREE, ClusterUtils.NIMBUSES_SUBTREE, "/leader-lock", ClusterUtils.PROFILERCONFIG_SUBTREE and ClusterUtils.SUPERVISORS_SUBTREE are each verified recursively, provided the path exists.
When moving to Pacemaker, worker heartbeats can be leaked too, so ClusterUtils.WORKERBEATS_SUBTREE is checked with a DeleteDead variant as well.
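
The DeleteDead helpers are not shown in this article. Judging only from their names and the comments about leaked nodes, the core idea is to compare the children of a subtree with the set of known topology ids and to drop the ones nobody owns any more. The sketch below illustrates just that set logic on in-memory collections; DeadTopoCleanupDemo, removeDead and the topology ids are made up, and the real helpers may well do more (for example, re-check the live topologies before deleting).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class DeadTopoCleanupDemo {
    /**
     * Removes from children every entry that is not a known topology id
     * and returns the removed entries in sorted order.
     */
    static Set<String> removeDead(Set<String> children, Set<String> liveTopoIds) {
        Set<String> dead = new TreeSet<>(children);
        dead.removeAll(liveTopoIds);   // whatever is left has no live topology
        children.removeAll(dead);      // stands in for deleting the ZK nodes
        return dead;
    }

    public static void main(String[] args) {
        // Made-up topology ids for illustration only.
        Set<String> credentialsChildren =
                new HashSet<>(Arrays.asList("wordcount-1-1644700000", "etl-2-1644700100"));
        Set<String> live = new HashSet<>(Arrays.asList("wordcount-1-1644700000"));

        System.out.println(removeDead(credentialsChildren, live)); // [etl-2-1644700100]
        System.out.println(credentialsChildren);                   // [wordcount-1-1644700000]
    }
}
```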

getTopoAcl() Method

private static List<ACL> getTopoAcl(String path, String topoId, Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp, int perms) {
    Id id = topoToZkCreds.get(topoId);
    if (id == null) {
        String error = "Could not find credentials for topology " + topoId + " at path " + path + ".";
        if (fixUp) {
            error += " Don't know how to fix this automatically. Please add needed ACLs, or delete the path.";
        }
        throw new IllegalStateException(error);
    }
    List<ACL> ret = new ArrayList<>(2);
    ret.add(superAcl);
    ret.add(new ACL(perms, id));
    return ret;
}

This method first looks up the ZK credentials (Id) for the given topology id. If none are found, it throws an IllegalStateException, and when fixUp is true the message adds that the problem cannot be fixed automatically. Otherwise it builds a permission list that holds the superuser ACL plus an ACL granting perms to the topology's id, and returns that list.
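
To see the lookup in action without a ZooKeeper cluster, here is a self-contained sketch of the same logic on simplified types. It assumes that the returned list also carries the superuser ACL (the superAcl parameter and the list capacity of 2 suggest this) and that a read-write caller such as getTopoReadWrite passes all five permissions; Acl, topoAcl, the ids and the path are stand-ins, not Storm or ZooKeeper classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TopoAclDemo {
    static final int READ = 1;  // same value as ZooDefs.Perms.READ
    static final int ALL = 31;  // same value as ZooDefs.Perms.ALL (cdrwa)

    /** Simplified stand-in for a ZooKeeper ACL entry: permission bits plus "scheme:id". */
    static final class Acl {
        final int perms;
        final String id;

        Acl(int perms, String id) {
            this.perms = perms;
            this.id = id;
        }
    }

    static List<Acl> topoAcl(String path, String topoId, Map<String, String> topoToZkCreds,
                             Acl superAcl, boolean fixUp, int perms) {
        String id = topoToZkCreds.get(topoId);
        if (id == null) {
            String error = "Could not find credentials for topology " + topoId + " at path " + path + ".";
            if (fixUp) {
                error += " Don't know how to fix this automatically. Please add needed ACLs, or delete the path.";
            }
            throw new IllegalStateException(error);
        }
        List<Acl> ret = new ArrayList<>(2);
        ret.add(superAcl);             // the superuser keeps full control
        ret.add(new Acl(perms, id));   // the topology gets only the requested bits
        return ret;
    }

    public static void main(String[] args) {
        Map<String, String> creds = new HashMap<>();
        creds.put("wordcount-1", "digest:wordcount:HASH"); // HASH is a placeholder
        Acl superAcl = new Acl(ALL, "sasl:storm-admin");   // made-up superuser id

        List<Acl> readOnly = topoAcl("/storms/wordcount-1", "wordcount-1", creds, superAcl, false, READ);
        for (Acl acl : readOnly) {
            System.out.println(acl.id + " -> " + acl.perms);
        }
    }
}
```

Keeping the permission bits as a parameter lets one helper serve both the read-only and the read-write subtrees.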
