Storm source code analysis (6)

null_wry · 2022-02-13 07:59:15


package org.apache.storm.daemon.worker;

About Worker

The worker-data function builds a map containing a large amount of shared state, and many of the worker's methods depend on it (in the current Java code base this role is played by the WorkerState object created in loadWorker below).
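As a rough illustration of the idea (the class and field names below are invented, not Storm's actual ones), a single shared-state object is created once and handed to every subsystem, so timers, the transfer thread, and executors all see the same configuration and connection maps:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the shared-data idea; Storm's real counterpart is WorkerState.
class SharedWorkerData {
    final String topologyId;
    final Map<String, Object> conf;  // immutable config, read by every subsystem
    // mutable shared map, hence a concurrent collection
    final ConcurrentHashMap<Integer, Object> taskToReceiveQueue = new ConcurrentHashMap<>();

    SharedWorkerData(String topologyId, Map<String, Object> conf) {
        this.topologyId = topologyId;
        this.conf = conf;
    }
}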

Timers in the worker

Each timer corresponds to one Java thread. The worker's timers are used to maintain heartbeats and to pick up metadata updates.
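A minimal sketch of that pattern, modeled on how WorkerState builds its timers; treat the exact StormTimer signatures as an assumption based on Storm 2.x and verify them against your version:

import org.apache.storm.StormTimer;

public class TimerSketch {
    public static void main(String[] args) {
        // Each StormTimer owns one thread; the handler decides what happens if a task throws.
        StormTimer heartbeatTimer = new StormTimer("heartbeat-timer",
            (thread, error) -> Runtime.getRuntime().halt(20)); // die rather than run degraded
        // Fire immediately, then repeat every 10 seconds.
        heartbeatTimer.scheduleRecurring(0, 10, () -> System.out.println("heartbeat tick"));
    }
}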

The worker heartbeat

The do-heartbeat function generates the worker's heartbeat information and writes it to the local file system. The supervisor reads these heartbeats to determine the worker's state, and then decides whether the worker needs to be restarted.

The worker-state function creates a LocalState object and calls its put method to store the worker's heartbeat in the local file system, under the path STORM-LOCAL-DIR/workers/{worker-id}/heartbeats.

Each file in the worker's heartbeats directory is named with the timestamp at which the heartbeat was written.
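A self-contained sketch of the mechanism (hypothetical helpers, not Storm's LocalState API): the worker writes a file named after the current timestamp, and the supervisor judges liveness by the age of the newest file:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class HeartbeatFiles {
    // Worker side: write a heartbeat file whose name is the current timestamp.
    static void writeHeartbeat(Path heartbeatDir, String payload) throws IOException {
        Files.createDirectories(heartbeatDir);
        Files.writeString(heartbeatDir.resolve(String.valueOf(System.currentTimeMillis())), payload);
    }

    // Supervisor side: the worker is alive if the newest heartbeat is recent enough.
    static boolean isAlive(Path heartbeatDir, long timeoutMs) throws IOException {
        try (var files = Files.list(heartbeatDir)) {
            long newest = files.mapToLong(p -> Long.parseLong(p.getFileName().toString()))
                               .max().orElse(0L);
            return System.currentTimeMillis() - newest <= timeoutMs;
        }
    }
}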

After starting, the worker reads its assignment from the /assignments/{topology} path in ZooKeeper. From that assignment information it obtains a Map<List<Long>, NodeInfo> executorToNodePort, and then creates its executors via Executor.mkExecutor.
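To make the shape of that data concrete, here is an illustrative construction (all values invented; NodeInfo is Storm's thrift-generated class, and its two-argument constructor taking a node string and a set of ports is assumed here). The key is the [start-task-id, end-task-id] range identifying one executor; the value names the worker slot hosting it:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.NodeInfo;

public class AssignmentSketch {
    public static void main(String[] args) {
        Map<List<Long>, NodeInfo> executorToNodePort = new HashMap<>();
        // The executor covering tasks 1..3 runs in the slot on "node-1", port 6700.
        executorToNodePort.put(Arrays.asList(1L, 3L),
            new NodeInfo("node-1", new HashSet<>(Collections.singletonList(6700L))));
    }
}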

Maintaining ZMQ connections in the worker

During processing, Storm uses ZMQ to send and receive messages, transferring them point-to-point. From the topology definition and the tasks assigned to it, a worker can work out which tasks will receive the messages it sends, and from the same assignment information it learns the machine and port of each target task.

As this shows, the worker relies on two mechanisms to keep its connections reliable. The first is registering watchers in ZooKeeper and being notified by callback; this alone is not dependable, because if the ZooKeeper connection is lost the registered watcher callbacks will never fire. The second is a timer that re-runs the refresh function periodically, which catches anything the callbacks miss.
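A minimal sketch of this double-insurance pattern, with hypothetical names: the same idempotent refresh routine is wired both to a watch-style callback and to a recurring timer, so a missed callback is repaired on the next tick:

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ConnectionRefresher {
    /** Hypothetical stand-in for a ZooKeeper watch registration. */
    interface WatchRegistry { void onAssignmentChanged(Runnable callback); }

    // Idempotent: safe to run from either trigger, in any order.
    void refreshConnections() { /* re-read assignment, open/close connections to match */ }

    void start(ScheduledExecutorService timer, WatchRegistry zk) {
        zk.onAssignmentChanged(this::refreshConnections);   // trigger 1: watcher callback
        timer.scheduleAtFixedRate(this::refreshConnections, // trigger 2: periodic safety net
                                  0, 10, TimeUnit.SECONDS);
    }
}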

Worker.main

public static void main(String[] args) throws Exception {
    Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
    String stormId = args[0];
    String assignmentId = args[1];
    String supervisorPort = args[2];
    String portStr = args[3];
    String workerId = args[4];
    Map<String, Object> conf = ConfigUtils.readStormConfig();
    Utils.setupWorkerUncaughtExceptionHandler();
    StormCommon.validateDistributedMode(conf);
    int supervisorPortInt = Integer.parseInt(supervisorPort);
    Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
    // Add shutdown hooks before starting any other threads to avoid possible race condition
    // between invoking shutdown hooks and registering shutdown hooks. See STORM-3658.
    int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
    LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
    Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs);
    worker.start();
}

The main method creates a Worker and then calls start.

Worker.start

public void start() throws Exception {
    LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
             ConfigUtils.maskPasswords(conf));
    // because in local mode, its not a separate
    // process. supervisor will register it in this case
    // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    if (!ConfigUtils.isLocalMode(conf)) {
        // Distributed mode
        SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
        String pid = Utils.processPid();
        FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
        FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                                    Charset.forName("UTF-8"));
    }
    ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
    IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
    IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
    metricRegistry.start(topologyConf, port);
    SharedMetricRegistries.add(WORKER_METRICS_REGISTRY, metricRegistry.getRegistry());
    Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    Map<String, String> initCreds = new HashMap<>();
    if (initialCredentials != null) {
        initCreds.putAll(initialCredentials.get_creds());
    }
    autoCreds = ClientAuthUtils.getAutoCredentials(topologyConf);
    subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
    Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
        () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
    );
}

The main work here is the call to loadWorker, wrapped in Subject.doAs so that it runs with the credentials populated above.
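For reference, Subject.doAs is the standard JAAS pattern: the action executes with the given Subject's principals and credentials attached to the access-control context. A minimal example:

import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;

class DoAsSketch {
    static Object runAs(Subject subject) throws Exception {
        // Everything inside the action runs with the subject's credentials.
        return Subject.doAs(subject, (PrivilegedExceptionAction<Object>) () -> "work done");
    }
}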

Worker.loadWorker

private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
                          Map<String, String> initCreds, Credentials initialCredentials)
    throws Exception {
    workerState =
        new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                        topologyConf, stateStorage, stormClusterState,
                        autoCreds, metricRegistry, initialCredentials);
    this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
                                              Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
    // Heartbeat here so that worker process dies if this fails
    // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    // that worker is running and moves on
    doHeartBeat();
    executorsAtom = new AtomicReference<>(null);
    // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    // to the supervisor
    workerState.heartbeatTimer
        .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
            try {
                doHeartBeat();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
        ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
        : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
    workerState.executorHeartbeatTimer
        .scheduleRecurring(0, execHeartBeatFreqSecs,
                           Worker.this::doExecutorHeartbeats);
    workerState.refreshConnections();
    workerState.activateWorkerWhenAllConnectionsReady();
    workerState.refreshStormActive(null);
    workerState.runWorkerStartHooks();
    List<Executor> execs = new ArrayList<>();
    for (List<Long> e : workerState.getLocalExecutors()) {
        if (ConfigUtils.isLocalMode(conf)) {
            Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
            execs.add(executor);
            for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
            }
        } else {
            Executor executor = Executor.mkExecutor(workerState, e, initCreds);
            for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
            }
            execs.add(executor);
        }
    }
    List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    for (Executor executor : execs) {
        newExecutors.add(executor.execute());
    }
    executorsAtom.set(newExecutors);
    // This thread will send out messages destined for remote tasks (on other workers)
    // If there are no remote outbound tasks, don't start the thread.
    if (workerState.hasRemoteOutboundTasks()) {
        transferThread = workerState.makeTransferThread();
        transferThread.setName("Worker-Transfer");
    }
    establishLogSettingCallback();
    final int credCheckMaxAllowed = 10;
    final int[] credCheckErrCnt = new int[1]; // consecutive-error-count
    workerState.refreshCredentialsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), () -> {
            try {
                checkCredentialsChanged();
                credCheckErrCnt[0] = 0;
            } catch (Exception ex) {
                credCheckErrCnt[0]++;
                if (credCheckErrCnt[0] <= credCheckMaxAllowed) {
                    LOG.warn("Ignoring {} of {} consecutive exceptions when checking for credential change",
                             credCheckErrCnt[0], credCheckMaxAllowed, ex);
                } else {
                    LOG.error("Received {} consecutive exceptions, {} tolerated, when checking for credential change",
                              credCheckErrCnt[0], credCheckMaxAllowed, ex);
                    throw ex;
                }
            }
        });
    workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10),
        () -> {
            try {
                LOG.debug("Checking if blobs have updated");
                updateBlobUpdates();
            } catch (IOException e) {
                // IOException from reading the version files to be ignored
                LOG.error(e.getStackTrace().toString());
            }
        }
    );
    // The jitter allows the clients to get the data at different times, and avoids thundering herd
    if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
        workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
    }
    workerState.refreshConnectionsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
        workerState::refreshConnections);
    workerState.resetLogLevelsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS),
        logConfigManager::resetLogLevels);
    workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
                                                     workerState::refreshStormActive);
    setupFlushTupleTimer(topologyConf, newExecutors);
    setupBackPressureCheckTimer(topologyConf);
    LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf));
    LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
    return this;
}

Here workerState.getLocalExecutors() returns the set of executor IDs (each a List<Long>) assigned to this worker. Executor.mkExecutor then creates the corresponding number of Executors; calling execute() on each yields an ExecutorShutdown (an IRunningExecutor), and the resulting list is stored in AtomicReference<List<IRunningExecutor>> executorsAtom.
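One detail worth noting is the publish step: the list is built completely and then swapped in with a single executorsAtom.set, so timer threads reading the reference see either null or the finished list, never a half-built one. A minimal sketch of the pattern (IRunningExecutor is reduced to a stand-in):

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorPublish {
    interface IRunningExecutor { }

    final AtomicReference<List<IRunningExecutor>> executorsAtom = new AtomicReference<>(null);

    void publish(List<IRunningExecutor> built) {
        executorsAtom.set(built);          // one atomic publish of the fully built list
    }

    void onTimerTick() {
        List<IRunningExecutor> execs = executorsAtom.get();
        if (execs == null) {
            return;                        // timer fired before loading finished: skip this tick
        }
        // safe to iterate: execs was complete before it was published
    }
}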

Copyright: author null_wry. Please include the original link when reprinting, thank you. https://en.javamana.com/2022/02/202202130759129159.html