Storm source code analysis (7)

null_ wry 2022-02-13 07:59:13 Views: 498



Getting the Executors that belong to a Worker

The read-worker-executors function computes the Executors assigned to a Worker. It uses the assignment-info function of storm-cluster-state to obtain the assignment information of all Topologies, then filters by the worker's assignment-id and port to find that worker's Executors (the assignment-id corresponds to the node). Once a Worker has started, the set of Executors it runs no longer changes; when the task assignment changes, the Supervisor restarts the worker to pick up the new assignment. When Nimbus computes an assignment, it tries not to change the Executors already running in a Worker. Currently, the failure of any single Executor in a Worker causes the whole Worker to restart.
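read-worker-executors itself is Clojure in Storm's source. As a rough illustration of the filtering it performs, here is a hypothetical Java sketch; the map layout and names are assumptions for illustration, not Storm's actual data structures:

```java
import java.util.*;
import java.util.stream.*;

public class WorkerExecutors {
    // Hypothetical sketch: filter a topology's executor -> [node, port]
    // assignment down to the executors owned by one worker. The map layout
    // and names are illustrative, not Storm's actual API.
    static List<List<Integer>> readWorkerExecutors(Map<List<Integer>, String[]> executorToNodePort,
                                                   String assignmentId, int port) {
        return executorToNodePort.entrySet().stream()
                .filter(e -> e.getValue()[0].equals(assignmentId)
                        && e.getValue()[1].equals(String.valueOf(port)))
                .map(Map.Entry::getKey) // executor id = [startTaskId, endTaskId]
                .collect(Collectors.toList());
    }
}
```

Filtering the full assignment map by node and port yields exactly the executor ids this worker must run.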

Creating the Executors' receive queues and lookup table

mk-receive-queue-map creates a receive queue for each Executor in the Worker and stores them in a hash map whose key is the ExecutorId and whose value is a DisruptorQueue object.

An ExecutorId is actually a two-element piece of data, [startTaskId, endTaskId], denoting the range of tasks that the Executor executes.
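A minimal Java sketch of what mk-receive-queue-map builds, with a JDK blocking queue standing in for Storm's DisruptorQueue (names are illustrative):

```java
import java.util.*;
import java.util.concurrent.*;

public class ReceiveQueues {
    // Sketch of the receive-queue map: one queue per executor, keyed by the
    // executor id [startTaskId, endTaskId]. ArrayBlockingQueue stands in for
    // the DisruptorQueue Storm actually uses.
    static Map<List<Integer>, BlockingQueue<Object>> mkReceiveQueueMap(List<List<Integer>> executorIds) {
        Map<List<Integer>, BlockingQueue<Object>> queues = new HashMap<>();
        for (List<Integer> executorId : executorIds) {
            queues.put(executorId, new ArrayBlockingQueue<>(1024));
        }
        return queues;
    }
}
```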

The worker's receive function

The mk-transfer-local-fn function in the Worker produces a function that sends messages to the Executors' receive queues; messages between Executors inside the same worker are passed through this function.

short-executor-receive-queue-map stores the mapping from the taskid of an Executor's first Task to that Executor's receive queue (DisruptorQueue).

The task-getter function takes a message received over ZMQ as its argument. The message is a two-element array: the first element is the TaskId, and the second element is the actual message content. The goal of task-getter is to map the message's taskId to the TaskId of the first Task of the corresponding Executor.

The body of the function takes as input tuple-batch, a batch of messages received over ZMQ, and groups the messages by the TaskId of the first Task of the Executor to which each message's taskid belongs. In the resulting variable grouped, each key is the taskid of an Executor's first Task and the value is the batch of messages belonging to that executor.

It then uses the taskid of the executor's first task to look up the receive queue q of the corresponding Executor, and calls the disruptor/publish method to publish the received messages onto q.
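Putting the grouping-and-publish step together, a hypothetical Java sketch might look like this (names and types are assumptions; a plain JDK queue stands in for disruptor/publish):

```java
import java.util.*;

public class TransferLocal {
    // Sketch of mk-transfer-local-fn's body: group a batch of
    // [taskId, message] pairs by the first task id of the owning executor,
    // then publish each group to that executor's receive queue.
    // taskToShortExecutor maps a task id to its executor's first task id.
    static void transferLocal(List<Object[]> tupleBatch,
                              Map<Integer, Integer> taskToShortExecutor,
                              Map<Integer, Queue<List<Object[]>>> shortExecutorReceiveQueueMap) {
        Map<Integer, List<Object[]>> grouped = new HashMap<>();
        for (Object[] pair : tupleBatch) {
            Integer shortExecutor = taskToShortExecutor.get((Integer) pair[0]);
            grouped.computeIfAbsent(shortExecutor, k -> new ArrayList<>()).add(pair);
        }
        for (Map.Entry<Integer, List<Object[]>> e : grouped.entrySet()) {
            Queue<List<Object[]>> q = shortExecutorReceiveQueueMap.get(e.getKey());
            if (q != null) {
                q.add(e.getValue()); // stands in for disruptor/publish
            }
        }
    }
}
```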



public void setLogFilePermission(String fileName) throws IOException {
    Path absFile = logRootDir.resolve(fileName).toAbsolutePath().normalize();
    if (!absFile.startsWith(logRootDir)) {
        return;
    }
    boolean runAsUser = ObjectReader.getBoolean(stormConf.get(SUPERVISOR_RUN_WORKER_AS_USER), false);
    Path parent = logRootDir.resolve(fileName).getParent();
    Optional<Path> mdFile = (parent == null) ? Optional.empty() : getMetadataFileForWorkerLogDir(parent);
    Optional<String> topoOwner = mdFile.isPresent()
            ? Optional.of(getTopologyOwnerFromMetadataFile(mdFile.get().toAbsolutePath().normalize()))
            : Optional.empty();
    if (runAsUser && topoOwner.isPresent() && absFile.toFile().exists() && !Files.isReadable(absFile)) {
        LOG.debug("Setting permissions on file {} with topo-owner {}", fileName, topoOwner);
        try {
            ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
                    Lists.newArrayList("blob", absFile.toAbsolutePath().normalize().toString()), null,
                    "setup group read permissions for file: " + fileName);
        } catch (IOException e) {
            throw e;
        }
    }
}

Sets the permissions of the log file so that the logviewer can serve the file.


public List<Path> getAllLogsForRootDir() throws IOException {
    List<Path> files = new ArrayList<>();
    Set<Path> topoDirFiles = getAllWorkerDirs();
    if (topoDirFiles != null) {
        for (Path portDir : topoDirFiles) {
            files.addAll(directoryCleaner.getFilesForDir(portDir));
        }
    }
    return files;
}

Returns the list of all log files in the worker directories under the root log directory.


public Set<Path> getAllWorkerDirs() {
    try (Stream<Path> topoDirs = Files.list(logRootDir)) {
        return topoDirs
                .filter(Files::isDirectory)
                .flatMap(Unchecked.function(Files::list)) //Worker dirs
                .filter(Files::isDirectory)
                .collect(Collectors.toCollection(TreeSet::new));
    } catch (IOException e) {
        throw Utils.wrapInRuntime(e);
    }
}

Returns the set of all worker directories inside all topology directories under the root log directory.


public SortedSet<Path> getAliveWorkerDirs() throws IOException {
    Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
    Set<Path> logDirs = getAllWorkerDirs();
    return getLogDirs(logDirs, (wid) -> aliveIds.contains(wid));
}

Returns a sorted set of paths that were written by workers that are currently alive.


public Optional<Path> getMetadataFileForWorkerLogDir(Path logDir) throws IOException {
    Path metaFile = logDir.resolve(WORKER_YAML);
    if (metaFile.toFile().exists()) {
        return Optional.of(metaFile);
    } else {
        LOG.warn("Could not find {} to clean up for {}", metaFile.toAbsolutePath().normalize(), logDir);
        return Optional.empty();
    }
}

Returns the metadata file (worker.yaml) of the given worker log directory.


public String getWorkerIdFromMetadataFile(Path metaFile) {
    Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile.toString());
    return ObjectReader.getString(map == null ? null : map.get("worker-id"), null);
}

Returns the worker id read from the worker.yaml metadata file.
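Storm parses worker.yaml with a real YAML parser (Utils.readYamlFile). As a minimal sketch of the "worker-id" extraction under that assumption, a simple line scan over the file's contents is enough (the helper below is hypothetical, not Storm's API):

```java
import java.util.*;

public class WorkerMeta {
    // Hypothetical sketch of the "worker-id" lookup that
    // getWorkerIdFromMetadataFile performs. Storm uses a real YAML parser;
    // a flat key scan is enough for a one-level worker.yaml.
    static Optional<String> workerIdFromYamlLines(List<String> lines) {
        for (String line : lines) {
            if (line.startsWith("worker-id:")) {
                return Optional.of(line.substring("worker-id:".length()).trim());
            }
        }
        return Optional.empty();
    }
}
```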


public SortedSet<Path> getLogDirs(Set<Path> logDirs, Predicate<String> predicate) {
    // we could also make this static, but not to do it due to mock
    TreeSet<Path> ret = new TreeSet<>();
    for (Path logDir : logDirs) {
        String workerId = "";
        try {
            Optional<Path> metaFile = getMetadataFileForWorkerLogDir(logDir);
            if (metaFile.isPresent()) {
                workerId = getWorkerIdFromMetadataFile(metaFile.get().toAbsolutePath().normalize());
                if (workerId == null) {
                    workerId = "";
                }
            }
        } catch (IOException e) {
            LOG.warn("Error trying to find worker.yaml in {}", logDir, e);
        }
        if (predicate.test(workerId)) {
            ret.add(logDir);
        }
    }
    return ret;
}

Finds the log directories whose worker ids satisfy the given predicate, e.g. directories of workers that can be cleaned up.

Copyright: author [null_ wry]. Please include a link to the original when reprinting, thank you.