Storm source code analysis (11)

null_ wry 2022-02-13 07:59:06




First, some background on the Worker.
Then we analyze the code.

About the Worker

The relationship between worker, executor, and task

A worker is a process. A process contains one or more threads, and each thread is an executor. An executor handles one or more tasks, and each task is an instance of a topology node class (a spout or bolt).

A node of a Storm cluster may run one or more worker processes, serving one or more topologies, and each worker process executes a subset of one topology. A worker process belongs to one specific topology and may run one or more executors (executor threads) for one or more components (spouts/bolts) of that topology. A running topology consists of many such processes running on multiple nodes of the Storm cluster.

One worker process executes a subset of one topology (note: a single worker never serves multiple topologies). One worker process starts one or more executor threads to execute the components (spouts or bolts) of one topology. A running topology therefore consists of multiple worker processes on multiple physical machines in the cluster.

An executor is a single thread started by a worker process. Each executor runs only the tasks of one component (spout or bolt) of one topology (note: there may be one or more tasks; by default Storm creates only one task for a component; in each loop the executor thread calls all of its task instances in sequence).

A task is the unit that finally runs the spout or bolt code (note: one task is one instance of a spout or bolt, and the executor thread calls the task's nextTuple or execute method). After a topology starts, the number of tasks of a component (spout or bolt) is fixed, but the number of executor threads the component uses can be adjusted dynamically (for example, one executor thread can run one or more task instances of the component). This means that every component satisfies #threads <= #tasks (the number of threads is less than or equal to the number of tasks). By default the number of tasks equals the number of executor threads, that is, each executor thread runs only one task.
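The constraint #threads <= #tasks can be illustrated with a small sketch. It assumes that tasks are split as evenly as possible across executors; the function name tasks_for_executor is hypothetical, and this is not Storm's own scheduling code.

```c
#include <assert.h>

/*
 * Hypothetical helper: number of tasks handled by executor `index`
 * (0-based) when `num_tasks` tasks are split as evenly as possible
 * across `num_executors` executor threads. Returns -1 on invalid input.
 */
int tasks_for_executor(int num_tasks, int num_executors, int index) {
    /* A component needs #threads <= #tasks. */
    if (num_executors <= 0 || num_tasks < num_executors) {
        return -1;
    }
    if (index < 0 || index >= num_executors) {
        return -1;
    }
    int base = num_tasks / num_executors;   /* every executor gets this many */
    int extra = num_tasks % num_executors;  /* the first `extra` get one more */
    assert(base >= 1);                      /* follows from the check above */
    return base + (index < extra ? 1 : 0);
}
```

With the default of one task per executor, tasks_for_executor(4, 4, i) is 1 for every executor. With 5 tasks on 2 executors, the first executor runs 3 tasks and the second runs 2.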

The event thread that manages Worker processes

The Supervisor manages the Worker processes through an event thread.

Each worker node runs a Supervisor daemon, which monitors the jobs assigned to that node and starts and stops the worker processes assigned by Nimbus.

The supervisor periodically fetches topology information (topologies), task assignment information (assignments), and various kinds of heartbeat information from ZooKeeper, and uses them as the basis for task allocation.

When the supervisor synchronizes, it starts new workers or shuts down old workers according to the new task assignment, and performs load balancing.
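The start/stop decision during a sync can be sketched as a set difference. This is a minimal illustration, assuming each worker is identified by a port number; the function names and the port values in the usage note are hypothetical, and the real supervisor also takes heartbeats and assignment details into account.

```c
#include <assert.h>
#include <stddef.h>

/* Returns 1 if `port` occurs in ports[0..n), else 0. */
static int contains_port(const int *ports, size_t n, int port) {
    for (size_t i = 0; i < n; i++) {
        if (ports[i] == port) {
            return 1;
        }
    }
    return 0;
}

/*
 * Hypothetical sync step: copies into `out` every port that is in
 * `wanted` but not in `have`, and returns how many were copied.
 * `out` must have room for `n_wanted` entries.
 * Called as (assigned, running) it yields the workers to start;
 * called as (running, assigned) it yields the workers to stop.
 */
size_t ports_missing_from(const int *wanted, size_t n_wanted,
                          const int *have, size_t n_have,
                          int *out) {
    assert(out != NULL);
    size_t count = 0;
    for (size_t i = 0; i < n_wanted; i++) {
        if (!contains_port(have, n_have, wanted[i])) {
            out[count++] = wanted[i];
        }
    }
    return count;
}
```

For example, with running workers on ports {6700, 6701} and a new assignment of {6701, 6702}, the first call reports 6702 as the worker to start and the second reports 6700 as the worker to stop.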

The process is as follows:
[Figure: Supervisor worker-management flow]

Code analysis: worker-launcher.c

setup_permissions()

static int setup_permissions(FTSENT* entry, uid_t euser, int user_write, boolean setgid_on_dir) {
  if (lchown(entry->fts_path, euser, launcher_gid) != 0) {
    fprintf(ERRORFILE, "ERROR: Failure to exec app initialization process - %s, fts_path=%s\n",
            strerror(errno), entry->fts_path);
    return -1;
  }
  mode_t mode = entry->fts_statp->st_mode;
  // Preserve user read and execute and set group read and write.
  mode_t new_mode = (mode & (S_IRUSR | S_IXUSR)) | S_IRGRP | S_IWGRP;
  if (user_write) {
    new_mode = new_mode | S_IWUSR;
  }
  // If the entry is a directory, Add group execute and setGID bits.
  if ((mode & S_IFDIR) == S_IFDIR) {
    new_mode = new_mode | S_IXGRP;
    if (setgid_on_dir) {
      new_mode = new_mode | S_ISGID;
    }
  }
  if (chmod(entry->fts_path, new_mode) != 0) {
    fprintf(ERRORFILE, "ERROR: Failure to exec app initialization process - %s, fts_path=%s\n",
            strerror(errno), entry->fts_path);
    return -1;
  }
  return 0;
}

Sets the permissions for a directory entry, optionally making it writable by the user as well.

We set the permissions to r(w)xrws--- so that the file's group (which should be the Storm user's group) has full access to the directory, while the file's user (the user of the topology owner) can read and execute, and in some directories also write. The setGID bit is set so that the storm user can access any file created under this directory, for cleanup purposes. If setgid_on_dir is FALSE, the setGID bit (S_ISGID in the code, loosely called the "sticky bit" here) is not set on the directory's group permissions.
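The mode calculation in setup_permissions() can be isolated into a pure function to make the resulting bits easy to check. This is only a sketch: compute_new_mode is a hypothetical name, and it uses S_ISDIR() instead of the bit test from the listing. On Linux the two differ for block devices and sockets, whose type codes also contain the S_IFDIR bit, so this is a deliberate deviation rather than a copy of the original.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Hypothetical pure version of the mode calculation in setup_permissions():
 * keep user read/execute, grant group read/write, optionally add user write,
 * and for directories add group execute plus (optionally) setGID.
 */
mode_t compute_new_mode(mode_t mode, int user_write, int setgid_on_dir) {
    mode_t new_mode = (mode & (S_IRUSR | S_IXUSR)) | S_IRGRP | S_IWGRP;
    if (user_write) {
        new_mode |= S_IWUSR;
    }
    if (S_ISDIR(mode)) {
        new_mode |= S_IXGRP;
        if (setgid_on_dir) {
            new_mode |= S_ISGID;
        }
    }
    /* "Others" never receive any permission bits. */
    assert((new_mode & S_IRWXO) == 0);
    return new_mode;
}
```

For a directory with mode 0755, user_write = 1 and setgid_on_dir = 1 give 02770, which is the rwxrws--- pattern described above. A regular file with mode 0644 and user_write = 0 becomes 0460.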

mkdirs()

int mkdirs(const char* path, mode_t perm) {
  struct stat sb;
  char * npath;
  char * p;
  if (stat(path, &sb) == 0) {
    return check_dir(path, sb.st_mode, perm, 1);
  }
  npath = strdup(path);
  if (npath == NULL) {
    fprintf(LOGFILE, "ERROR: Not enough memory to copy path string");
    return -1;
  }
  /* Skip leading slashes. */
  p = npath;
  while (*p == '/') {
    p++;
  }
  while (NULL != (p = strchr(p, '/'))) {
    *p = '\0';
    if (create_validate_dir(npath, perm, path, 0) == -1) {
      free(npath);
      return -1;
    }
    *p++ = '/'; /* restore slash */
    while (*p == '/')
      p++;
  }
  /* Create the final directory component. */
  if (create_validate_dir(npath, perm, path, 1) == -1) {
    free(npath);
    return -1;
  }
  free(npath);
  return 0;
}

Ensures that the given path and all of its parent directories are created with the required permissions.
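To see which paths mkdirs() hands to create_validate_dir(), the component walk can be reproduced without touching the file system. The sketch below is illustrative only: list_path_prefixes is a hypothetical helper, and it omits the initial stat() shortcut that mkdirs() takes when the full path already exists.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper mirroring the loop in mkdirs(): writes every prefix
 * that would be created or validated into `out`, one per line, and returns
 * the number of prefixes, or -1 if allocation fails or `out` is too small.
 */
int list_path_prefixes(const char *path, char *out, size_t out_size) {
    assert(path != NULL && out != NULL && out_size > 0);
    char *npath = strdup(path);
    if (npath == NULL) {
        return -1;
    }
    size_t used = 0;
    int count = 0;
    out[0] = '\0';
    char *p = npath;
    while (*p == '/') {
        p++;                                  /* skip leading slashes */
    }
    for (;;) {
        p = strchr(p, '/');
        if (p != NULL) {
            *p = '\0';                        /* cut the path at this slash */
        }
        int n = snprintf(out + used, out_size - used, "%s\n", npath);
        if (n < 0 || (size_t)n >= out_size - used) {
            free(npath);
            return -1;                        /* output buffer too small */
        }
        used += (size_t)n;
        count++;
        if (p == NULL) {
            break;                            /* that was the final component */
        }
        *p++ = '/';                           /* restore the slash */
        while (*p == '/') {
            p++;                              /* step over repeated slashes */
        }
    }
    free(npath);
    return count;
}
```

For the path "/a//b/c" the helper reports three prefixes: "/a", "/a//b", and "/a//b/c". Repeated slashes are stepped over but stay in the prefix string, as in the original loop.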

create_validate_dir()

int create_validate_dir(const char* npath, mode_t perm, const char* path, int finalComponent) {
  struct stat sb;
  if (stat(npath, &sb) != 0) {
    if (mkdir(npath, perm) != 0) {
      if (errno != EEXIST || stat(npath, &sb) != 0) {
        fprintf(LOGFILE, "ERROR: Can't create directory %s - %s\n", npath,
                strerror(errno));
        return -1;
      }
      // The directory npath should exist.
      if (check_dir(npath, sb.st_mode, perm, finalComponent) == -1) {
        return -1;
      }
    }
  } else {
    if (check_dir(npath, sb.st_mode, perm, finalComponent) == -1) {
      return -1;
    }
  }
  return 0;
}

Creates the directory if it does not exist. If creation fails because of a race condition (another process created it first), only the permissions are checked. finalComponent is 0 or 1 and indicates whether this is the last path component; if it is, the permissions need to be checked.

setup_worker_tmp_permissions()

int setup_worker_tmp_permissions(const char *worker_dir) {
  char* worker_tmp = concatenate("%s/tmp", "worker tmp dir", 1, worker_dir);
  if (worker_tmp != NULL) {
    int exit_code = setup_dir_permissions(worker_tmp, 1, FALSE);
    if (exit_code != 0) {
      fprintf(ERRORFILE, "ERROR: setup_dir_permissions on %s failed\n", worker_tmp);
      fflush(ERRORFILE);
    }
    return exit_code;
  } else {
    return -1;
  }
}

Removes the setgid bit from the worker-id/tmp directory so that Java profiling can work. This is not required for non-container workers, but it is better to stay consistent.
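The effect on a single directory can be sketched as follows. This is an assumption-laden simplification: clear_setgid is a hypothetical helper that handles one directory with stat() and chmod(), whereas the launcher goes through setup_dir_permissions() with setgid_on_dir set to FALSE.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Hypothetical helper: clears the setGID bit on `dir` and leaves the other
 * permission bits unchanged. Returns 0 on success, -1 on failure.
 */
int clear_setgid(const char *dir) {
    assert(dir != NULL);
    struct stat sb;
    if (stat(dir, &sb) != 0) {
        perror("stat");
        return -1;
    }
    mode_t perms = sb.st_mode & 07777;        /* permission bits only */
    if ((perms & S_ISGID) == 0) {
        return 0;                             /* nothing to do */
    }
    if (chmod(dir, perms & ~(mode_t)S_ISGID) != 0) {
        perror("chmod");
        return -1;
    }
    return 0;
}
```

A directory with mode 02770 ends up with mode 0770 after the call, so files created inside it no longer inherit the directory's group.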

Copyright: author [null_ wry]. Please include the original link when reprinting, thank you. https://en.javamana.com/2022/02/202202130759035074.html