Storm source code analysis (6)

null_ wry · 2022-02-13 07:59:15 · Views: 388



package org.apache.storm.daemon.worker;

About Worker

The worker-data method defines a map holding a large amount of shared state; many of the worker's methods depend on it.
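As a rough illustration of the worker-data idea (this is a hypothetical sketch, not Storm's actual class), the pattern is one thread-safe map of named components that every part of the worker reads from:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a worker-data-style holder: one shared, thread-safe
// map of components (conf, queues, connections, ...) that the worker's
// methods all look up by key. The class and key names are illustrative.
public class WorkerData {
    private final Map<String, Object> shared = new ConcurrentHashMap<>();

    public void put(String key, Object component) {
        shared.put(key, component);
    }

    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        // Callers know the type of the component they registered.
        return (T) shared.get(key);
    }
}
```

The real WorkerState in Storm 2.x plays this role with typed fields rather than a raw map, but the sharing pattern is the same.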

Timers in the worker

Each timer corresponds to a Java thread. The worker's timers are used to maintain heartbeats and to fetch updated metadata.
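The one-timer-per-thread scheme can be sketched with the standard library (a minimal stand-in, not Storm's StormTimer):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of a one-thread recurring timer, the way the worker's
// heartbeat and metadata-refresh timers behave. Class name is illustrative.
public class RecurringTimer {
    private final ScheduledExecutorService thread =
        Executors.newSingleThreadScheduledExecutor();

    // Run `task` every `periodSecs` seconds, starting after `delaySecs`.
    public void scheduleRecurring(long delaySecs, long periodSecs, Runnable task) {
        thread.scheduleAtFixedRate(task, delaySecs, periodSecs, TimeUnit.SECONDS);
    }

    public void shutdown() {
        thread.shutdownNow();
    }
}
```

Storm's StormTimer adds features such as jittered scheduling (seen later in `scheduleRecurringWithJitter`), but each timer still owns a single dedicated thread.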

The worker heartbeat

The do-heartbeat function produces the worker's heartbeat information, which is written to the local file system. The supervisor reads this heartbeat information to determine the worker's state, and then decides whether to restart the worker.

The worker-state method creates a LocalState object and calls its put method to store the worker's heartbeat in the local file system, under the path STORM-LOCAL-DIR/workers//heartbeats.

Each file in the worker's heartbeat directory is named with the current timestamp.
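The timestamp-named-file handshake described above can be sketched as follows (a hypothetical illustration; Storm's LocalState actually serializes heartbeat objects rather than using empty files):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the heartbeat handshake: the worker writes a file
// named with the current timestamp under its heartbeats directory, and the
// supervisor treats the worker as dead when the newest timestamp is too old.
public class HeartbeatFiles {
    // Worker side: drop a timestamp-named file in the heartbeats directory.
    public static Path writeHeartbeat(Path heartbeatDir, long nowMillis) throws IOException {
        Files.createDirectories(heartbeatDir);
        return Files.createFile(heartbeatDir.resolve(Long.toString(nowMillis)));
    }

    // Supervisor side: alive iff the newest heartbeat is within the timeout.
    public static boolean isAlive(Path heartbeatDir, long nowMillis, long timeoutMillis) throws IOException {
        long newest = -1;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(heartbeatDir)) {
            for (Path f : files) {
                newest = Math.max(newest, Long.parseLong(f.getFileName().toString()));
            }
        }
        return newest >= 0 && nowMillis - newest <= timeoutMillis;
    }
}
```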

After starting, the worker reads the /assignments/{topology} path in ZooKeeper, obtains Map<List, NodeInfo> executorToNodePort from the assignment information, and then creates Executors via Executor.mkExecutor.
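The shape of that lookup can be sketched like this (hypothetical class; a String "node:port" stands in for NodeInfo, and an executor is the usual [startTaskId, endTaskId] pair):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the assignment lookup: the assignment maps each
// executor (a [startTaskId, endTaskId] pair, as in Map<List<Long>, NodeInfo>)
// to a node and port; from it a worker can tell which executors are local.
public class AssignmentIndex {
    private final Map<List<Long>, String> executorToNodePort = new HashMap<>();

    public void assign(long startTask, long endTask, String node, int port) {
        executorToNodePort.put(Arrays.asList(startTask, endTask), node + ":" + port);
    }

    // Executors whose node:port matches this worker are its local executors.
    public List<List<Long>> localExecutors(String node, int port) {
        String self = node + ":" + port;
        List<List<Long>> local = new ArrayList<>();
        for (Map.Entry<List<Long>, String> e : executorToNodePort.entrySet()) {
            if (e.getValue().equals(self)) {
                local.add(e.getKey());
            }
        }
        return local;
    }
}
```

This mirrors how workerState.getLocalExecutors() (used in loadWorker below) filters the full assignment down to the executors this worker must create.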

Maintaining ZMQ connections in the worker

While running, Storm uses ZMQ to send and receive messages, delivering them end to end. Based on the topology definition and the tasks assigned to it, a worker can work out which tasks will receive the messages it sends; from this task-assignment information the worker knows each target task's machine and port number.
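A minimal sketch of that routing step (hypothetical class; real Storm resolves task ids to worker connections and batches tuples per connection):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of outbound routing: from the task assignment, resolve
// each destination task id to the host:port of the worker that runs it, and
// group outgoing messages by destination so each connection gets one batch.
public class TaskRouter {
    private final Map<Integer, String> taskToHostPort = new HashMap<>();

    public void place(int taskId, String host, int port) {
        taskToHostPort.put(taskId, host + ":" + port);
    }

    // Group pending messages (keyed by destination task id) per worker.
    public Map<String, List<String>> batchByDestination(Map<Integer, String> messagesByTask) {
        Map<String, List<String>> batches = new HashMap<>();
        for (Map.Entry<Integer, String> m : messagesByTask.entrySet()) {
            String dest = taskToHostPort.get(m.getKey());
            batches.computeIfAbsent(dest, d -> new ArrayList<>()).add(m.getValue());
        }
        return batches;
    }
}
```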

As can be seen, the worker uses two mechanisms to keep its connections reliable. The first is registering watcher callbacks in ZooKeeper; this is not always reliable: if the ZooKeeper connection is lost, for example, the registered watcher callbacks will not fire. The second is using a timer to run the refresh function periodically.
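The double safety net can be sketched in plain Java (this stands in for the ZooKeeper API; the class and method names are illustrative): the same refresh routine is wired both to an event callback, which may be missed, and to a timer that re-runs it unconditionally.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the two reliability mechanisms: a notification-driven path
// (like a ZooKeeper watcher callback) plus a periodic fallback timer that
// refreshes even when notifications are lost.
public class RefreshGuard {
    private final Runnable refresh;
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();

    public RefreshGuard(Runnable refresh) {
        this.refresh = refresh;
    }

    // Called from the watcher-style callback when a change notification arrives.
    public void onChangeNotification() {
        refresh.run();
    }

    // The fallback: refresh periodically even if no notification was delivered.
    public void startPeriodicRefresh(long periodSecs) {
        timer.scheduleAtFixedRate(refresh, periodSecs, periodSecs, TimeUnit.SECONDS);
    }

    public void stop() {
        timer.shutdownNow();
    }
}
```

Because a refresh is idempotent, running it both on notification and on a schedule costs little and guards against missed watcher events.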


public static void main(String[] args) throws Exception {
    Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
    String stormId = args[0];
    String assignmentId = args[1];
    String supervisorPort = args[2];
    String portStr = args[3];
    String workerId = args[4];
    Map<String, Object> conf = ConfigUtils.readStormConfig();
    int supervisorPortInt = Integer.parseInt(supervisorPort);
    Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
    //Add shutdown hooks before starting any other threads to avoid possible race condition
    //between invoking shutdown hooks and registering shutdown hooks. See STORM-3658.
    int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
    LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
    Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs);
    worker.start();
}

The main method creates a Worker and then calls start().


public void start() throws Exception {
    LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
             ConfigUtils.maskPasswords(conf));
    // because in local mode, its not a separate
    // process. supervisor will register it in this case
    // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
    if (!ConfigUtils.isLocalMode(conf)) {
        // Distributed mode
        String pid = Utils.processPid();
        FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
        FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                                    Charset.forName("UTF-8"));
    }
    ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
    IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
    IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
    metricRegistry.start(topologyConf, port);
    SharedMetricRegistries.add(WORKER_METRICS_REGISTRY, metricRegistry.getRegistry());
    Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
    Map<String, String> initCreds = new HashMap<>();
    if (initialCredentials != null) {
        initCreds.putAll(initialCredentials.get_creds());
    }
    autoCreds = ClientAuthUtils.getAutoCredentials(topologyConf);
    subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
    Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
        () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
    );
}

The main thing here is to call loadWorker.


private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
                          Map<String, String> initCreds, Credentials initialCredentials)
    throws Exception {
    workerState =
        new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                        topologyConf, stateStorage, stormClusterState,
                        autoCreds, metricRegistry, initialCredentials);
    this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
                                              Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);

    // Heartbeat here so that worker process dies if this fails
    // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    // that worker is running and moves on
    doHeartBeat();

    executorsAtom = new AtomicReference<>(null);

    // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    // to the supervisor
    workerState.heartbeatTimer.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
        try {
            doHeartBeat();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    });

    Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
        ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
        : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
    workerState.executorHeartbeatTimer.scheduleRecurring(0, execHeartBeatFreqSecs,
                                                         Worker.this::doExecutorHeartbeats);

    List<Executor> execs = new ArrayList<>();
    for (List<Long> e : workerState.getLocalExecutors()) {
        if (ConfigUtils.isLocalMode(conf)) {
            Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
            execs.add(executor);
            for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
            }
        } else {
            Executor executor = Executor.mkExecutor(workerState, e, initCreds);
            execs.add(executor);
            for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
            }
        }
    }

    List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    for (Executor executor : execs) {
        newExecutors.add(executor.execute());
    }
    executorsAtom.set(newExecutors);

    // This thread will send out messages destined for remote tasks (on other workers)
    // If there are no remote outbound tasks, don't start the thread.
    if (workerState.hasRemoteOutboundTasks()) {
        transferThread = workerState.makeTransferThread();
        transferThread.setName("Worker-Transfer");
    }

    final int credCheckMaxAllowed = 10;
    final int[] credCheckErrCnt = new int[1]; // consecutive-error-count
    workerState.refreshCredentialsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), () -> {
            try {
                checkCredentialsChanged();
                credCheckErrCnt[0] = 0;
            } catch (Exception ex) {
                credCheckErrCnt[0]++;
                if (credCheckErrCnt[0] <= credCheckMaxAllowed) {
                    LOG.warn("Ignoring {} of {} consecutive exceptions when checking for credential change",
                             credCheckErrCnt[0], credCheckMaxAllowed, ex);
                } else {
                    LOG.error("Received {} consecutive exceptions, {} tolerated, when checking for credential change",
                              credCheckErrCnt[0], credCheckMaxAllowed, ex);
                    throw ex;
                }
            }
        });

    workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10),
        () -> {
            try {
                LOG.debug("Checking if blobs have updated");
                updateBlobUpdates();
            } catch (IOException e) {
                // IOException from reading the version files to be ignored
            }
        });

    // The jitter allows the clients to get the data at different times, and avoids thundering herd
    if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
        workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
    }

    workerState.refreshConnectionsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    workerState.resetLogLevelsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
        () -> workerState.refreshStormActive(null));

    setupFlushTupleTimer(topologyConf, newExecutors);
    setupBackPressureCheckTimer(topologyConf);
    LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf));
    LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
    return this;
}

Here workerState.getLocalExecutors() obtains the set of executor ids (List executorId). Executor.mkExecutor then creates the corresponding Executors; calling execute() on each returns an ExecutorShutdown, and the results are saved in AtomicReference<List> executorsAtom.
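The build-then-publish pattern described above can be sketched as follows (a hypothetical stand-in: RunningExecutor plays the role of IRunningExecutor, and the lambda stands in for Executor.mkExecutor(...).execute()):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the executor-launch pattern: create all executors, start each to
// get a running handle, then publish the whole list atomically so other
// threads (heartbeats, shutdown) see a consistent snapshot.
public class ExecutorLauncher {
    interface RunningExecutor {          // stand-in for IRunningExecutor
        String id();
    }

    public static AtomicReference<List<RunningExecutor>> launch(List<String> executorIds) {
        List<RunningExecutor> running = new ArrayList<>();
        for (String id : executorIds) {
            // stand-in for Executor.mkExecutor(...).execute(), which in Storm
            // returns an ExecutorShutdown handle
            running.add(() -> id);
        }
        AtomicReference<List<RunningExecutor>> executorsAtom = new AtomicReference<>(null);
        executorsAtom.set(running);
        return executorsAtom;
    }
}
```

Publishing through an AtomicReference initialized to null lets the heartbeat timers (which start before the executors exist) distinguish "not loaded yet" from the final executor list.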

Copyright: author [null_ wry]. Please include a link to the original when reprinting, thank you.