redis4.0之利用管道优化aofrewrite

    xiaoxiao2021-04-16  198

    前言

    redis的aof持久化本质上是一个redo log,把所有执行过的写命令追加到aof文件中。那么随着redis的运行,aof文件会不断膨胀,当触发收缩条件时就要做aofrewrite。

    redis是通过fork子进程来做aofrewrite,同时为了保证aof的连续性,父进程把aofrewrite期间的写命令缓存起来,等收割完子进程之后再追加到新的aof文件。如果期间写入量较大的话收割时就要有大量的写磁盘操作,造成性能下降。

    为了提高aofrewrite效率,redis通过在父子进程间建立管道,把aofrewrite期间的写命令通过管道同步给子进程,追加写盘的操作也就转交给了子进程。其实利用管道的优化在3.0时期就已经实现了,最近在总结4.0的新特性,索性就都归到4.0里,方便查阅。

    aofrewrite详解

    1. aofrewrite的基础实现

    上图是aofrewrite的流程,标注为基本的函数调用关系。

    1 - 首先,通过命令或是事件触发aofrewrite,调用rewriteAppendOnlyFileBackground()函数

    该函数会fork出一个子进程

    2 - 父进程记录子进程的pid并开始缓存写命令

    当pid不为-1时就会执行aofRewriteBufferAppend()把写命令缓存起来

    3 - 子进程调用rewriteAppendOnlyFile(tmpfile)函数创建新的aof文件

    调用rewriteAppendOnlyFileRio()函数遍历redis把所有key-value以命令的方式写入新aof文件完成后调用exitFromChild(0)退出

    4 - 子进程退出后父进程调用backgroundRewriteDoneHandler()来处理

    调用aofRewriteBufferWrite()函数把积攒的写命令缓存写入子进程创建的临时aof文件最后rename()用新的aof文件替换掉原来的aof文件

    在aofrewrite过程中,如果redis本身数据量较大子进程执行时间较长,或者写入流量较高,就会导致aof-rewrite-buffer积攒较多,父进程就要进行大量写磁盘操作,这对于redis来说显然是不够高效的。

    2. 使用pipe优化

    为了提高aofrewrite效率,redis使用pipe来优化,下图中红色标注即为优化的部分:

    优化点:

    1 - 父进程建立管道

    共三条管道,分别为一条数据管道,和两条控制管道数据管道用来传输数据,控制管道用来做父子进程交互,控制何时停止数据传输

    2 - 父进程向管道写数据

    注册写事件aofChildWriteDiffData()向数据管道写数据

    3 - 子进程从管道读数据

    子进程在生成新aof文件时会定期调用aofReadDiffFromParent()从管道读取数据,并缓存下来

    4 - 父子进程交互

    子进程生成新aof文件后会通过控制管道向父进程发送"!",发起停止数据传输请求父进程收到停止信号后激活读事件处理函数aofChildPipeReadable(),设置server.aof_stop_sending_diff=1停止数据传输,并向子进程回复"!",表示同意停止子进程收到父进程的应答,调用rioWrite()把积攒的数据追加到新的aof文件,最后退出

    细心的读者会发现,aofRewriteBufferAppend()和aofRewriteBufferWrite()这一对函数仍然保留,父进程还是要把aof-rewrite-buffer写盘吗?是的,这是因为父子进程是异步结构,父子间总会有那么一点代沟,aof-rewrite-buffer还是需要保留的,不过这个时候父进程写盘的数据量就很小了,几乎可以忽略。

    3. aofrewrite代码剖析

    aofrewrite的触发条件

    执行bgrewriteaof命令。 serverCron时间事件检测到aof文件大小超限。

    命令的触发不必详述,主要来看下serverCron的触发:

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ... /* Trigger an AOF rewrite if needed */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } } ... }

    也就是说aof文件大小超过了server.aof_rewrite_min_size,并且增长率大于server.aof_rewrite_perc时就会触发(增长率计算的基数server.aof_rewrite_base_size是上次aofrewrite完之后aof文件的大小)。

    目前云redis设置server.aof_rewrite_min_size为内存规格的1/4,server.aof_rewrite_perc为100。

    管道建立

    aofrewrite触发之后进入rewriteAppendOnlyFileBackground()函数:

    int rewriteAppendOnlyFileBackground(void) { pid_t childpid; long long start; if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; if (aofCreatePipes() != C_OK) return C_ERR; openChildInfoPipe(); start = ustime(); if ((childpid = fork()) == 0) { ...

    OK,重点来了,在fork之前调用了aofCreatePipes()函数来创建管道(openChildInfoPipe()函数只是用来收集子进程copy-on-write用到的内存,就不详细展开了):

    int aofCreatePipes(void) { int fds[6] = {-1, -1, -1, -1, -1, -1}; int j; if (pipe(fds) == -1) goto error; /* parent -> children data. 父进程向子进程写数据的管道*/ if (pipe(fds+2) == -1) goto error; /* children -> parent ack. 子进程向父进程发起停止传输的控制管道*/ if (pipe(fds+4) == -1) goto error; /* parent -> children ack. 父进程向子进程回复的控制管道*/ /* Parent -> children data is non blocking. */ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; //注册读事件处理函数,负责处理子进程要求停止数据传输的消息 server.aof_pipe_write_data_to_child = fds[1]; //父进程向子进程写数据的fd server.aof_pipe_read_data_from_parent = fds[0]; //子进程从父进程读数据的fd server.aof_pipe_write_ack_to_parent = fds[3]; //子进程向父进程发起停止消息的fd server.aof_pipe_read_ack_from_child = fds[2]; //父进程从子进程读取停止消息的fd server.aof_pipe_write_ack_to_child = fds[5]; //父进程向子进程回复消息的fd server.aof_pipe_read_ack_from_parent = fds[4]; //子进程从父进程读取回复消息的fd server.aof_stop_sending_diff = 0; //是否停止管道传输标记位 return C_OK; ... }

    父进程与管道传输

    管道建立起来了我们再来看看fork之后父进程和子进程如何工作,首先看下父进程:

    /* Parent */ server.stat_fork_time = ustime()-start; server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); ... server.aof_rewrite_scheduled = 0; server.aof_rewrite_time_start = time(NULL); server.aof_child_pid = childpid; updateDictResizePolicy(); /* We set appendseldb to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command, so the differences * accumulated by the parent into server.aof_rewrite_buf will start * with a SELECT statement and it will be safe to merge. */ server.aof_selected_db = -1; ...

    父进程这里做的事情并不多,主要是信息的记录和一些标记位设置

    记录fork消耗的时间,info命令可以查看上次fork的耗时latest_fork_usec,单位微秒设置server.aof_rewrite_scheduled = 0,防止serverCron再次触发aofrewrite设置server.aof_child_pid为子进程pid,其不为-1时redis才会向aof-rewrite-buffer缓存写命令updateDictResizePolicy()禁止所有hash数据结构resize,这是为了尽量避免子进程copy-on-write进行内存拷贝设置server.aof_selected_db = -1,下一次的aof日志会强制加上select,这是为了保证命令执行到正确的db

    接下来就是缓存写命令和管道通信部分了,入口是在feedAppendOnlyFile():

    void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { ... if (server.aof_child_pid != -1) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); ... }

    server.aof_child_pid在这时就生效了,开始缓存写命令:

    void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(server.aof_rewrite_buf_blocks); aofrwblock *block = ln ? ln->value : NULL; while(len) { /* If we already got at least an allocated block, try appending * at least some piece into it. */ if (block) { unsigned long thislen = (block->free < len) ? block->free : len; if (thislen) { /* The current block is not already full. */ memcpy(block->buf+block->used, s, thislen); block->used += thislen; block->free -= thislen; s += thislen; len -= thislen; } } if (len) { /* First block to allocate, or need another block. */ int numblocks; block = zmalloc(sizeof(*block)); block->free = AOF_RW_BUF_BLOCK_SIZE; block->used = 0; listAddNodeTail(server.aof_rewrite_buf_blocks,block); /* Log every time we cross more 10 or 100 blocks, respectively * as a notice or warning. */ numblocks = listLength(server.aof_rewrite_buf_blocks); if (((numblocks+1) % 10) == 0) { int level = ((numblocks+1) % 100) == 0 ? LL_WARNING : LL_NOTICE; serverLog(level,"Background AOF buffer size: %lu MB", aofRewriteBufferSize()/(1024*1024)); } } } if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) { aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); } }

    redis用链表server.aof_rewrite_buf_blocks来缓存aofrewrite期间的写命令,链表的每个节点最大10MB;重点是在最后的写事件注册,当server.aof_pipe_write_data_to_child这个fd没有注册事件时,就注册写事件函数aofChildWriteDiffData:

    void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { listNode *ln; aofrwblock *block; ssize_t nwritten; ... while(1) { ln = listFirst(server.aof_rewrite_buf_blocks); block = ln ? ln->value : NULL; if (server.aof_stop_sending_diff || !block) { aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child, AE_WRITABLE); return; } if (block->used > 0) { nwritten = write(server.aof_pipe_write_data_to_child, block->buf,block->used); if (nwritten <= 0) return; memmove(block->buf,block->buf+nwritten,block->used-nwritten); block->used -= nwritten; block->free += nwritten; } if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln); } }

    每次事件循环都会把server.aof_rewrite_buf_blocks积攒的写命令全部同步给子进程,除非server.aof_stop_sending_diff被设置了停止标记。

    子进程和管道传输

    接下来看下子进程:

    ... /* Child */ char tmpfile[256]; closeListeningSockets(0); redisSetProcTitle("redis-aof-rewrite"); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { ... exitFromChild(0); } else { exitFromChild(1); } ...

    子进程首先关闭监听端口,然后就进入rewriteAppendOnlyFile()函数:

    int rewriteAppendOnlyFile(char *filename) { ... snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); ... server.aof_child_diff = sdsempty(); ... if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; ...

    首先打开一个临时aof文件,并初始化server.aof_child_diff缓存准备从父进程读数据,然后就调用rewriteAppendOnlyFileRio()来写aof文件和读取管道中的数据:

    int rewriteAppendOnlyFileRio(rio *aof) { ... if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = aof->processed_bytes; aofReadDiffFromParent(); } ... }

    在遍历redis把key-value写入新aof文件过程中,新aof文件每增长10K就会调用aofReadDiffFromParent()从管道中读取数据追加到server.aof_child_diff:

    ssize_t aofReadDiffFromParent(void) { char buf[65536]; /* Default pipe buffer size on most Linux systems. */ ssize_t nread, total = 0; while ((nread = read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) { server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread); total += nread; } return total; }

    停止管道传输

    子进程在遍历完redis生成好新的aof文件之后就要准备退出了,那么退出前要先告诉父进程停止管道传输,依然回到rewriteAppendOnlyFile()函数来看:

    int rewriteAppendOnlyFile(char *filename) { ... /* Ask the master to stop sending diffs. */ if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr; if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK) goto werr; /* We read the ACK from the server using a 10 seconds timeout. Normally * it should reply ASAP, but just in case we lose its reply, we are sure * the child will eventually get terminated. */ if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 || byte != '!') goto werr; serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF..."); /* Read the final diff if any. */ aofReadDiffFromParent(); /* Write the received diff to the file. */ serverLog(LL_NOTICE, "Concatenating %.2f MB of AOF diff received from parent.", (double) sdslen(server.aof_child_diff) / (1024*1024)); if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0) goto werr; /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; ... }

    这里写的就很直接了:

    使用write向控制管道写入"!"发起停止请求,然后读取返回结果,超时时间为10s超时就goto werr异常退出,10s内读取到"!"就继续再次调用aofReadDiffFromParent()从数据管道读取数据确保管道中没有遗留最后rioWrite()把server.aof_child_diff积攒的数据追加到新的aof文件

    那么父进程是如何处理"!"的呢,还记得之前注册的读事件aofChildPipeReadable()吧,子进程向控制管道发送"!"就会激活:

    void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { char byte; ... if (read(fd,&byte,1) == 1 && byte == '!') { serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs."); server.aof_stop_sending_diff = 1; if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) { /* If we can't send the ack, inform the user, but don't try again * since in the other side the children will use a timeout if the * kernel can't buffer our write, or, the children was * terminated. */ serverLog(LL_WARNING,"Can't send ACK to AOF child: %s", strerror(errno)); } } /* Remove the handler since this can be called only one time during a * rewrite. */ aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE); }

    很简单,标记server.aof_stop_sending_diff=1,给子进程回复"!",并且把自己从事件循环删掉,自此父子进程间通信完成,剩下的就是父进程等待子进程退出进行收尾工作。

    父进程收尾

    serverCron()中会调用wait3()来收割子进程:

    /* Check if a background saving or AOF rewrite in progress terminated. */ if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren()) { int statloc; pid_t pid; if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); if (pid == -1) { serverLog(LL_WARNING,"wait3() returned an error: %s. " "rdb_child_pid = %d, aof_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, (int) server.aof_child_pid); } else if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); } else { if (!ldbRemoveChild(pid)) { serverLog(LL_WARNING, "Warning, detected child with unmatched pid: %ld", (long)pid); } } updateDictResizePolicy(); closeChildInfoPipe(); }

    如果收割到的pid是server.aof_child_pid就进入backgroundRewriteDoneHandler():

    void backgroundRewriteDoneHandler(int exitcode, int bysignal) { ... /* Flush the differences accumulated by the parent to the * rewritten AOF. */ latencyStartMonitor(latency); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid); newfd = open(tmpfile,O_WRONLY|O_APPEND); if (newfd == -1) { serverLog(LL_WARNING, "Unable to open the temporary AOF produced by the child: %s", strerror(errno)); goto cleanup; } if (aofRewriteBufferWrite(newfd) == -1) { serverLog(LL_WARNING, "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); close(newfd); goto cleanup; } latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

    首先会打开子进程生成的新aof文件,并调用aofRewriteBufferWrite()把server.aof_rewrite_buf_blocks中剩余的数据追加到新aof文件。

    /* Rename the temporary file. This will not unlink the target file if * it exists, because we reference it with "oldfd". */ latencyStartMonitor(latency); if (rename(tmpfile,server.aof_filename) == -1) { serverLog(LL_WARNING, "Error trying to rename the temporary AOF file %s into %s: %s", tmpfile, server.aof_filename, strerror(errno)); close(newfd); if (oldfd != -1) close(oldfd); goto cleanup; } latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-rename",latency);

    之后把新aof文件rename为server.aof_filename记录的文件名。

    /* Asynchronously close the overwritten AOF. */ if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

    使用bio后台线程来close原来的aof文件。

    cleanup: aofClosePipes(); aofRewriteBufferReset(); aofRemoveTempFile(server.aof_child_pid); server.aof_child_pid = -1; server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start; server.aof_rewrite_time_start = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ if (server.aof_state == AOF_WAIT_REWRITE) server.aof_rewrite_scheduled = 1;

    最后是清理工作,包括关闭管道、重置aof-rewrite-buffer、复位server.aof_child_pid=-1等,自此aofrewrite完成。

    后记

    基于4.0的云redis已正式上线,登陆阿里云控制台即可开通:https://www.aliyun.com/product/kvstore

    相关资源:七夕情人节表白HTML源码(两款)

    最新回复(0)