5ycode 2022-01-26 18:09:48 Views: 719
Most of my code annotations are on the Redis 5.0 branch; the 6.2 branch has only a few comments.
https://github.com/yxkong/redis/tree/6.2
https://github.com/yxkong/redis/tree/5.0
Both versions end up registering a FileEvent whose handler is readQueryFromClient, but the 6.2 branch takes a more roundabout route; perhaps Redis 6 and later intend to move toward a more object-oriented design.
In 5.0, createClient creates the FileEvent directly:
/* Redis 5.0 (excerpt from createClient): readQueryFromClient is registered directly as the AE_READABLE handler for fd. */
aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
In the 6.2 branch, createClient creates the FileEvent indirectly, through connSetReadHandler.
In networking.c:
// Register readQueryFromClient as the connection's read handler, i.e. the callback invoked when the connection is readable
connSetReadHandler(conn, readQueryFromClient);
This is a little roundabout, but readers familiar with Java can think of it as an interface.
connection.h defines the struct ConnectionType, and every connection has a member of type ConnectionType; the struct is a collection of "interface methods" (function pointers):
/* Table of function pointers supplied by a connection implementation.
 * Every `connection` refers to one of these tables through its `type`
 * member, and callers dispatch through the table rather than calling an
 * implementation directly (for example connSetReadHandler invokes
 * `set_read_handler`). CT_Socket is one instance of this table.
 * NOTE(review): entries without a comment below are described by name only;
 * their exact semantics live in the implementations, which are not all shown
 * here. */
typedef struct ConnectionType {
/* Event-loop callback; connSocketSetReadHandler passes it to aeCreateFileEvent. */
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
/* Installs or clears the read callback; this is the entry connSetReadHandler calls. */
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
int (*get_type)(struct connection *conn);
} ConnectionType;
/* Per-connection state. `type` selects the ConnectionType function table
 * through which operations on this connection are dispatched. */
struct connection {
ConnectionType *type; /* function table used for this connection (e.g. CT_Socket) */
ConnectionState state;
short int flags;
short int refs;
int last_errno;
void *private_data;
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler; /* stored by connSocketSetReadHandler */
int fd; /* descriptor that connSocketSetReadHandler registers with / removes from the event loop */
};
connection.c provides a default implementation of ConnectionType (I am not sure "default implementation" is the right way to put it):
/* Socket-backed ConnectionType: binds every entry of the function table to
 * its connSocket* implementation. Entries are listed in the same order as
 * the members are declared in ConnectionType. */
ConnectionType CT_Socket = {
    /* Event-loop entry point. */
    .ae_handler = connSocketEventHandler,

    /* Basic connection operations. */
    .connect = connSocketConnect,
    .write = connSocketWrite,
    .read = connSocketRead,
    .close = connSocketClose,
    .accept = connSocketAccept,

    /* Handler registration. */
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,

    .get_last_error = connSocketGetLastError,

    /* Blocking / synchronous variants. */
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,

    .get_type = connSocketGetType
};
/* Install, replace or clear the read callback of a socket connection.
 *
 * `func` is stored in conn->read_handler. A NULL `func` removes the
 * AE_READABLE file event for conn->fd; a non-NULL `func` creates one.
 * Note that the callback handed to the event loop is conn->type->ae_handler
 * (with `conn` as client data), not `func` itself.
 * NOTE(review): ae_handler presumably dispatches to conn->read_handler --
 * confirm in the ae_handler implementation.
 *
 * Returns C_OK on success (including when `func` is already installed),
 * C_ERR when the file event cannot be created. */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    /* Same handler as before: nothing to change. */
    if (conn->read_handler == func) {
        return C_OK;
    }

    conn->read_handler = func;

    if (!func) {
        /* Handler cleared: stop watching the descriptor for readability. */
        aeDeleteFileEvent(server.el, conn->fd, AE_READABLE);
        return C_OK;
    }

    /* Handler set: this is where the readable FileEvent is actually created. */
    if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE,
                          conn->type->ae_handler, conn) == AE_ERR) {
        return C_ERR;
    }
    return C_OK;
}
/* Set the read handler of `conn` by dispatching through the connection's
 * ConnectionType table. For a connection whose type is CT_Socket this ends
 * up in connSocketSetReadHandler (connection.c). Returns whatever the
 * implementation's set_read_handler returns. */
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    ConnectionType *ops = conn->type;
    return ops->set_read_handler(conn, func);
}
Once the FileEvent has been created, I will not repeat how it gets triggered; that was covered in an earlier article.
The 6.2 branch has initThreadedIO(), which:
Initializes the I/O thread pool array
Treats the main thread as the first thread (index 0) of the I/O thread pool
Sets the entry function of the other threads, which poll (spin) while waiting for the work assigned to them in io_threads_list (the per-thread list of I/O read/write tasks)
and then perform the actual I/O reads and writes
/**
 * @brief Initialize the I/O threads.
 *
 * Slot 0 of the pool stands for the main thread, so it only gets a task
 * list. Every other slot also gets a mutex, a pending counter reset to 0
 * and a newly created thread running IOThreadMain. Exits the process when
 * too many threads are configured or when a thread cannot be created.
 */
void initThreadedIO(void) {
    /* We start with threads not active. */
    server.io_threads_active = 0;

    /* With io_threads_num == 1 there is nothing to spawn: return early. */
    if (server.io_threads_num == 1) {
        return;
    }

    /* More threads than IO_THREADS_MAX_NUM is a fatal configuration error. */
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,
                  "Fatal: too many I/O threads configured. "
                  "The maximum number is %d.",
                  IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int slot = 0; slot < server.io_threads_num; slot++) {
        /* Done for every slot, the main thread's included. */
        io_threads_list[slot] = listCreate();

        /* Slot 0 is the main thread; the rest applies to additional threads only. */
        if (slot != 0) {
            pthread_t worker;

            pthread_mutex_init(&io_threads_mutex[slot], NULL);
            /* No pending work for this thread yet. */
            setIOPendingCount(slot, 0);
            /* Take the mutex before the thread exists: IOThreadMain locks it
             * whenever it finds no pending work, so the new thread will be
             * stopped there while the main thread holds it. */
            pthread_mutex_lock(&io_threads_mutex[slot]);

            if (pthread_create(&worker, NULL, IOThreadMain, (void*)(long)slot) != 0) {
                serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
                exit(1);
            }
            io_threads[slot] = worker;
        }
    }
}
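Now let's look at IOThreadMain.
It mainly busy-waits (spins) until it has been given work to do.
Each round spins up to 1,000,000 times; if nothing is pending after those 1,000,000 iterations, the thread briefly locks and unlocks its own mutex and then starts another round. If the main thread is holding that mutex, the I/O thread blocks there until the main thread releases it.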
/**
 * @brief Function executed by each I/O thread, similar to run() of a Java Thread
 * @param myid index of this thread in the I/O thread arrays
 * @return void*
 */
/* Body of an I/O thread. It loops forever: spin until the pending counter
 * for this thread becomes non-zero, run the operation selected by
 * io_threads_op (write or read) on every client in io_threads_list[id],
 * then empty the list and reset the pending counter to 0.
 * `myid` carries the thread index, passed by value inside the pointer. */
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
// The thread name is "io_thd_" followed by the thread index
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
// Apply server.server_cpulist to this thread
// NOTE(review): presumably pins the thread to the configured CPU list -- confirm in redisSetCpuAffinity
redisSetCpuAffinity(server.server_cpulist);
// NOTE(review): presumably makes the thread cancellable so it can be killed -- confirm in makeThreadKillable
makeThreadKillable();
while(1) {
/* Wait for start */
/* Busy-wait: spin up to 1,000,000 iterations, leaving early as soon as
 * the pending counter for this thread becomes non-zero. */
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
/* Give the main thread a chance to stop this thread. */
// Still nothing pending after spinning: lock and immediately unlock this
// thread's mutex, then start over. The lock call blocks for as long as the
// main thread holds the mutex (initThreadedIO creates the thread with it held).
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(getIOPendingCount(id) != 0);
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
// Walk through every client assigned to this I/O thread
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// Write operation: write to the client
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// Read operation: read the query from the client's connection
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
// Reset the pending counter to 0; the main thread waits for this
setIOPendingCount(id, 0);
}
}
The 6.2 branch has a postponeClientRead function: when threaded I/O is enabled, it simply puts the client into server.clients_pending_read.
/* Decide whether the read for client `c` should be postponed.
 *
 * When the read is postponed, the client is flagged CLIENT_PENDING_READ and
 * pushed to the head of server.clients_pending_read.
 *
 * Returns 1 if the client was queued, 0 if it was not. */
int postponeClientRead(client *c) {
    /* Threaded I/O must be active and configured to handle reads. */
    if (!server.io_threads_active || !server.io_threads_do_reads) {
        return 0;
    }

    /* Never postpone while ProcessingEventsWhileBlocked is set. */
    if (ProcessingEventsWhileBlocked) {
        return 0;
    }

    /* Clients carrying any of these flags (master, slave, already pending
     * read, blocked) are not queued. */
    if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)) {
        return 0;
    }

    c->flags |= CLIENT_PENDING_READ;
    listAddNodeHead(server.clients_pending_read, c);
    return 1;
}
In the 6.2 branch, beforesleep is invoked inside aeProcessEvents, right before aeApiPoll:
/* Abridged excerpt of aeProcessEvents: only the beforesleep -> aeApiPoll ->
 * aftersleep sequence is shown. The declarations of numevents and tvp, the
 * event dispatching and the return statement are omitted from this excerpt. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags){
// Author's note: in 6.0 this call was made outside this function; it now lives here
// Runs only when a beforesleep callback is set and flags contains AE_CALL_BEFORE_SLEEP
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
}
At the same time, the registration of beforeSleep and afterSleep has been moved into initServer:
/* Abridged excerpt of initServer: registers beforeSleep and afterSleep on
 * the server event loop (server.el) through aeSetBeforeSleepProc and
 * aeSetAfterSleepProc. The rest of the function is omitted. */
void initServer(void) {
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
}
In the 6.2 branch's beforeSleep, we focus on the following two functions, both of which are in networking.c:
// Use the I/O thread pool to process the clients waiting to be read
handleClientsWithPendingReadsUsingThreads()
// Use the I/O thread pool to process the clients waiting to be written
handleClientsWithPendingWritesUsingThreads();
First, the clients in server.clients_pending_read are distributed round-robin across all I/O threads (item index modulo the number of threads, starting from 0)
The main thread processes the I/O read tasks assigned to itself
The main thread waits until all I/O threads have finished their assigned tasks
The clients in clients_pending_read are then processed (clients are only added to this list when events fire from aeApiPoll, so the list does not change during this step)
At this point each client is removed from clients_pending_read
The main thread executes the corresponding command
Clients with pending replies are added to server.clients_pending_write
/**
 * @brief Process the clients in server.clients_pending_read using the I/O threads.
 *
 * The clients are spread over io_threads_list[0..io_threads_num-1]; the main
 * thread handles list 0 itself and waits for the other threads to finish.
 * It then drains server.clients_pending_read, continuing the processing of
 * each client on the main thread.
 *
 * @return int number of clients that were pending, or 0 when threaded reads
 *             are not active/enabled or nothing was pending
 */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
// Iterate over the queue of clients waiting to be read
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// Choose the target list by modulo, i.e. round-robin over io_threads_list
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// Select the read operation, then publish to each I/O thread (index >= 1)
// how many clients it was given; a non-zero count ends the spin in IOThreadMain
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
// The main thread performs the reads for the clients in its own list (index 0)
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// Busy-wait until the pending counters of all the other I/O threads sum to 0
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
// Drain server.clients_pending_read: loop until the list is empty
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
// Remove the client from clients_pending_read
listDelNode(server.clients_pending_read,ln);
serverAssert(!(c->flags & CLIENT_BLOCKED));
// NOTE(review): presumably executes a command already parsed during the
// threaded read -- confirm in processPendingCommandsAndResetClient.
// On C_ERR the client is skipped and the loop moves on to the next one.
if (processPendingCommandsAndResetClient(c) == C_ERR) {
continue;
}
// Continue processing whatever remains in the client's input buffer
processInputBuffer(c);
/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not install a write handler (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
// NOTE(review): per the article this queues the client on server.clients_pending_write -- confirm in clientInstallWriteHandler
clientInstallWriteHandler(c);
}
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;
}
The logic of handleClientsWithPendingWritesUsingThreads is similar to the above, so I will not go through it here.
Connections are accepted in TCP arrival order, and an fd is created for each
Then, when epoll reports the fd readable, readQueryFromClient puts the client into server.clients_pending_read
The main thread reads the clients from server.clients_pending_read and distributes them as tasks to all I/O threads (the main thread is the thread with index 0 in the I/O thread pool)
The main thread executes the I/O tasks assigned to itself, while the other I/O threads busy-wait for their assigned tasks and then execute them
The main thread waits until all I/O threads have finished (it checks that the total number of pending tasks across the I/O threads is 0)
The main thread then reads the clients from server.clients_pending_read again, removing each one from server.clients_pending_read
It executes the tasks bound to each client in order, and after execution adds the client to server.clients_pending_write (all of this is sequential)
Redis source code analysis series
**Redis source code reading 1 - Introduction**
**Redis source code reading 2 - Finally making sense of the Redis startup process**
**Redis source code reading 3 - Finally understanding how the main task is executed**
copyright:author[5ycode],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/01/202201261809446916.html