Reading 4 of redis source code - I have sorted out the IO multithreading execution process in redis6

5ycode 2022-01-26 18:09:48 阅读数:719

reading redis source code sorted

Previously analyzed through redis Source code reading 2 - Finally put redis The start-up process is clear Analysis of the redis Startup process , adopt redis Source reading three - Finally understand the main task execution Analysis of the redis The main task of , This time from redis6.2 Analysis on branches redis6 Of io Multithreading .

The code I annotate is mainly in redis5.0 On the branch .6.2 There are few comments on .

acceptTcpHandler Differences in processing

Although in the end, a processor is added readQueryFromClient Of FileEvent event , however 6.2 The branch is a little around , May be 6 I want to move closer to object-oriented in the future .

stay 5.0 in createClient I created a FileEvent

aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)

stay 6.2 In the branch createClient When , adopt connSetReadHandler Indirect creation FileEvent

 stay networking.c in
// register readQueryFromClient, When connection Called on callback when reading
connSetReadHandler(conn, readQueryFromClient);
This one is a little windy , For those familiar with java For my classmates
stay connection.h The structure is defined in ConnectionType, And connection There is one ConnectionType Properties of , Here are a bunch of interfaces
typedef struct ConnectionType {
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
int (*get_type)(struct connection *conn);
} ConnectionType;
struct connection {
ConnectionType *type;
ConnectionState state;
short int flags;
short int refs;
int last_errno;
void *private_data;
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
int fd;
stay connection.c in , Yes ConnectionType There is a default implementation ( I don't know if that's right )
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
.get_last_error = connSocketGetLastError,
.blocking_connect = connSocketBlockingConnect,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine,
.get_type = connSocketGetType
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
// The real goal is here , Create a fileEvent event , The callback function is readQueryFromClient
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
// So what I call here is connection.c Medium connSocketSetReadHandler
return conn->type->set_read_handler(conn, func);

fileEvent When it's created , Don't say how to trigger , As mentioned earlier .

InitServerLast The difference in

stay redis6.2 There's a branch initThreadedIO(), ad locum

  • Initialize the io Thread pool array

  • The main thread is io The first thread in the thread pool

  • At the same time, the execution methods of other threads are set , Rotation training is waiting to process the assigned io_threads_list(io Read and write task list )

  • The real deal IO read ,IO Write

* @brief initialization IO Threads
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
//io_threads_num=1 sign out , Because the main process undertakes
if (server.io_threads_num == 1) return;
// Greater than 128 A thread exits the startup process
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
/* Spawn and initialize the I/O threads. */
* @brief Create and initialize io Thread pool
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
//i=0 It's the main thread , The main thread also handles io
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
// Initialize mutex , And lock
pthread_t tid;
//io Waiting quantity , Set up io_threads_pending[i] by 0
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
io_threads[i] = tid;

Let's take another look IOThreadMain in

  • There are mainly empty rotation training , Wait until the task is done

  • Each round of training 100 Ten thousand times , Continue rotation training after running empty 100 Ten thousand times , If the main thread

* @brief io What the thread executes , Be similar to java Thread run
* @param myid io Index of array
* @return void*
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
//io_thd_+ Thread index Is the corresponding thread name
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
// Will later io The thread is bound to the specified cpu On the list
// Set thread to cancel
while(1) {
/* Wait for start */
* @brief Empty rotation training , Until... Corresponding to the thread number io_threads_pending Filled with data or this 100 Ten thousand times , Will come back in for rotation training
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
/* Give the main thread a chance to stop this thread. */
// Here let the main thread stop , Look at it later TODO
if (getIOPendingCount(id) == 0) {
serverAssert(getIOPendingCount(id) != 0);
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
// Through each io Tasks in threads
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// Handle the corresponding write event
if (io_threads_op == IO_THREADS_OP_WRITE) {
} else if (io_threads_op == IO_THREADS_OP_READ) {
// Handle the corresponding read event
} else {
serverPanic("io_threads_op value is unknown");
// Set to 0
setIOPendingCount(id, 0);

readQueryFromClient Differences in processing

stay 6.2 There is one in the branch postponeClientRead function . Here, if it's turned on io Multithreading , Just put client Throw into server.clients_pending_read in

int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
return 1;
} else {
return 0;


stay redis6.2 In the branch ,beforesleep The execution of is put in aeApiPoll Before

int aeProcessEvents(aeEventLoop *eventLoop, int flags){
//6.0 It was outside , Now it's adjusted to the inside
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)

meanwhile beforeSleep and afterSleep My settings have been moved to initServer

void initServer(void) {

stay redis6.2 In the branch beforeSleep In, we focus on the following two methods , Both methods networking.c in

// Use io The thread pool handles the client waiting to read
// Use io The thread pool handles the client waiting to write
  1. First from server.clients_pending_read Get the mission , from 0 Start taking the mold and distributing it to all io Threads

  2. The main thread handles the tasks assigned to it io Read task

  3. The main thread waits for all io The thread executes the assigned task

  4. Handle clients_pending_read The task ( Because in aeApiPoll It will be triggered to add here , So the mission here won't change )

  5. This is the time to client from clients_pending_read Delete in

  6. The main thread executes the corresponding command

  7. Add the completed task to server.clients_pending_write

* @brief Use io Thread read
* @return int
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
// Get the task from the waiting to read queue
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// By taking the mold, it is put into the corresponding io_threads_list A party , That's what's blowing inside Round Robin
int target_id = item_id % server.io_threads_num;
io_threads_op = IO_THREADS_OP_READ;
// Set the corresponding io The number of tasks in the thread
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
// The main thread handles the tasks assigned to it io read
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// Main thread, etc io The thread will not continue until it is processed
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
//server.clients_pending_read The length is not 0, I've been carrying out
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
// from clients_pending_read Delete client
serverAssert(!(c->flags & CLIENT_BLOCKED));
// Carry out orders
if (processPendingCommandsAndResetClient(c) == C_ERR) {
// Read
/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not install a write handler (it can't).
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
// writes server.clients_pending_write
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;

handleClientsWithPendingWritesUsingThreads The code processing logic is similar to that above , No more writing .

So just to conclude ( Enable multithreading io Under the circumstances )

  • Press tcp Access sequence , establish fd

  • And then through epoll Use readQueryFromClient Read in to server.clients_pending_read

  • The main thread is from server.clients_pending_read Read client And distribute the tasks to all IO Threads ( The main thread is io The index in the thread pool is 0 The thread of )

  • The main thread executes the tasks assigned to it io Mission , other io The thread waits for the assigned task and executes it through empty rotation training

  • The main thread waits for all io The thread is finished ( Through the various io The total number of tasks of threads is 0 To judge )

  • Again from server.clients_pending_read Read in client, And then from server.clients_pending_read Delete

  • Sequential execution client Tasks bound on , Write to... After execution server.clients_pending_write( These are sequential )

redis Source analysis series

**redis Source code reading - Introduction

**redis Source code reading 2 - Finally put redis The start-up process is clear

**redis Source reading three - Finally understand the main task execution

**redis Of key You can take it out after it expires ?

copyright:author[5ycode],Please bring the original link to reprint, thank you.