Redis Source Code Analysis: evict.c

xiaoxiao · 2022-07-04

The eviction code is one of the more refined parts of Redis. Internally, evict.c reuses cached SDS buffers to cut allocation cost, keeps eviction candidates in a small sorted array (the eviction pool), and always evicts the entry with the largest idle score. The result is an approximated eviction algorithm rather than an exact LRU.

First, the includes, the eviction pool data structure, and the LRU clock helpers:

```c
#include "server.h"
#include "bio.h"
#include "atomicvar.h"

/* ----------------------------------------------------------------------------
 * Data structures
 * --------------------------------------------------------------------------*/

/* To improve the quality of the LRU approximation we take a set of keys
 * that are good candidates for eviction across freeMemoryIfNeeded() calls.
 *
 * Entries inside the eviction pool are taken ordered by idle time, putting
 * greater idle times to the right (ascending order).
 *
 * When an LFU policy is used instead, a reverse frequency indication is used
 * instead of the idle time, so that we still evict by larger value (larger
 * inverse frequency means to evict keys with the least frequent accesses).
 *
 * Empty entries have the key pointer set to NULL. */
#define EVPOOL_SIZE 16
#define EVPOOL_CACHED_SDS_SIZE 255
struct evictionPoolEntry {
    unsigned long long idle;    /* Object idle time (inverse frequency for LFU) */
    sds key;                    /* Key name. */
    sds cached;                 /* Cached SDS object for key name. */
    int dbid;                   /* Key DB number. */
};

static struct evictionPoolEntry *EvictionPoolLRU;

unsigned long LFUDecrAndReturn(robj *o);

/* ----------------------------------------------------------------------------
 * Implementation of eviction, aging and LRU
 * --------------------------------------------------------------------------*/

/* Return the LRU clock, based on the clock resolution. This is a time
 * in a reduced-bits format that can be used to set and check the
 * object->lru field of redisObject structures. */
unsigned int getLRUClock(void) {
    /* 24-bit LRU clock. */
    return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

/* This function is used to obtain the current LRU clock.
 * If the current resolution is lower than the frequency we refresh the
 * LRU clock (as it should be in production servers) we return the
 * precomputed value, otherwise we need to resort to a system call. */
unsigned int LRU_CLOCK(void) {
    unsigned int lruclock;
    if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
        /* serverCron refreshes the cached clock often enough: reuse it. */
        atomicGet(server.lruclock,lruclock);
    } else {
        /* Otherwise compute the clock from the system time. */
        lruclock = getLRUClock();
    }
    return lruclock;
}

/* Given an object returns the min number of milliseconds the object was never
 * requested, using an approximated LRU algorithm. */
unsigned long long estimateObjectIdleTime(robj *o) {
    unsigned long long lruclock = LRU_CLOCK();
    if (lruclock >= o->lru) {
        /* Common case: idle time in milliseconds. */
        return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
    } else {
        /* The 24-bit clock wrapped around since the object was touched. */
        return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
                    LRU_CLOCK_RESOLUTION;
    }
}
```
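To make the wraparound branch of estimateObjectIdleTime() concrete, here is a minimal self-contained sketch of the same arithmetic on plain integers (the constants mirror the Redis defaults from server.h; `idle_ms` is a hypothetical helper, not part of evict.c):

```c
#include <stdio.h>

#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* 16777215 */
#define LRU_CLOCK_RESOLUTION 1000       /* one clock tick = 1000 ms */

/* Same two branches as estimateObjectIdleTime(), without redisObject. */
unsigned long long idle_ms(unsigned int lruclock, unsigned int obj_lru) {
    if (lruclock >= obj_lru)
        return (unsigned long long)(lruclock - obj_lru) * LRU_CLOCK_RESOLUTION;
    /* The clock passed 2^24 and restarted from zero after the last access. */
    return (unsigned long long)(lruclock + (LRU_CLOCK_MAX - obj_lru)) *
               LRU_CLOCK_RESOLUTION;
}

int main(void) {
    printf("%llu\n", idle_ms(5000, 4000));               /* 1000000 ms */
    printf("%llu\n", idle_ms(100, LRU_CLOCK_MAX - 100)); /* wrapped: 200000 ms */
    return 0;
}
```

With a one second resolution the 24-bit clock wraps roughly every 194 days, so a key idle longer than that is merely under-estimated, not mis-ordered catastrophically.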
The long header comment below is the heart of the design: instead of maintaining an exact LRU list, Redis samples a few keys at a time and remembers the best candidates in the pool allocated here:

```c
/* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config
 * file to limit the max memory used by the server, before processing a
 * command.
 *
 * The goal of the function is to free enough memory to keep Redis under the
 * configured memory limit.
 *
 * The function starts calculating how many bytes should be freed to keep
 * Redis under the limit, and enters a loop selecting the best keys to
 * evict accordingly to the configured policy.
 *
 * If all the bytes needed to return back under the limit were freed the
 * function returns C_OK, otherwise C_ERR is returned, and the caller
 * should block the execution of commands that will result in more memory
 * used by the server.
 *
 * ------------------------------------------------------------------------
 *
 * LRU approximation algorithm
 *
 * Redis uses an approximation of the LRU algorithm that runs in constant
 * memory. Every time there is a key to expire, we sample N keys (with
 * N very small, usually around 5) to populate a pool of best keys to
 * evict of M keys (the pool size is defined by EVPOOL_SIZE).
 *
 * The N keys sampled are added in the pool of good keys to expire (the ones
 * with an old access time) if they are better than one of the current keys
 * in the pool.
 *
 * After the pool is populated, the best key we have in the pool is expired.
 * However note that we don't remove keys from the pool when they are deleted
 * so the pool may contain keys that no longer exist.
 *
 * When we try to evict a key, and all the entries in the pool don't exist
 * we populate it again. This time we'll be sure that the pool has at least
 * one key that can be evicted, if there is at least one key that can be
 * evicted in the whole database. */

/* Create a new eviction pool. */
void evictionPoolAlloc(void) {
    struct evictionPoolEntry *ep;
    int j;

    ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE);
    for (j = 0; j < EVPOOL_SIZE; j++) {
        ep[j].idle = 0;
        ep[j].key = NULL;
        /* Pre-allocate a reusable SDS buffer for each slot. */
        ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE);
        ep[j].dbid = 0;
    }
    EvictionPoolLRU = ep;
}
```
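The comment above is easier to appreciate with numbers. The toy simulation below (names and sizes are illustrative, nothing here comes from evict.c) performs one round of "sample a few keys, evict the one with the largest idle time", which is the trick that lets the algorithm run in constant memory:

```c
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define NKEYS   1000  /* pretend keyspace size */
#define SAMPLES 5     /* like the default maxmemory-samples */

int main(void) {
    unsigned idle[NKEYS];
    srand((unsigned)time(NULL));
    for (int i = 0; i < NKEYS; i++) idle[i] = rand() % 100000;

    /* One eviction round: keep the best (largest idle) of SAMPLES picks. */
    int victim = rand() % NKEYS;
    for (int s = 1; s < SAMPLES; s++) {
        int c = rand() % NKEYS;
        if (idle[c] > idle[victim]) victim = c;
    }
    printf("evict key %d with idle %u\n", victim, idle[victim]);
    return 0;
}
```

With 5 samples the victim already lands in the oldest sixth of the keyspace in expectation; the pool of EVPOOL_SIZE entries improves on this further by remembering good candidates across calls.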
evictionPoolPopulate() samples keys and performs an ordered insert into the pool, reusing each slot's cached SDS buffer to avoid constant allocation:

```c
/* This is a helper function for freeMemoryIfNeeded(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. Keys with idle time smaller than one of the current
 * keys are added. Keys are always added if there are free entries.
 *
 * We insert keys in place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right. */
/**
 * Fill the eviction pool with sampled candidate keys.
 * @param dbid       DB the sampled keys belong to
 * @param sampledict dict being sampled (db->dict or db->expires)
 * @param keydict    main key dict, used to fetch value objects
 * @param pool       the shared eviction pool
 */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict,
                          struct evictionPoolEntry *pool)
{
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples]; /* sampling array */

    /* Grab up to maxmemory_samples random keys. */
    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            /* For every policy except volatile-ttl, fall back to db->dict
             * to fetch the value object. */
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* Calculate the idle time according to the policy. This is called
         * idle just because the code initially handled LRU, but is in fact
         * just a score where a higher score means better candidate. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o); /* LRU: idle time of the key */
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            /* When we use an LRU policy, we sort the keys by idle time
             * so that we expire keys starting from greater idle time.
             * However when the policy is an LFU one, we have a frequency
             * estimation, and we want to evict keys with lower frequency
             * first. So inside the pool we put objects using the inverted
             * frequency, subtracting the actual frequency from the maximum
             * frequency of 255. */
            idle = 255-LFUDecrAndReturn(o); /* LFU: inverted frequency, max 255 */
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            /* In this case the sooner the expire the better: an earlier
             * expire time produces a larger score. */
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++; /* the pool is kept sorted ascending */
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. (Author's note: could this
             * skip the same bad candidates over and over?) */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * greater than the element to insert. */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */

                /* Save SDS before overwriting. */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1)); /* open slot k */
                pool[k].cached = cached; /* reuse the saved buffer at slot k */
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sds cached = pool[0].cached; /* Save SDS before overwriting. */
                /* Free the evicted key if it doesn't use the cached buffer. */
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }

        /* Try to reuse the cached SDS string allocated in the pool entry,
         * because allocating and deallocating this object is costly
         * (according to the profiler, not my fantasy. Remember:
         * premature optimizbla bla bla bla. */
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            /* Key too long for the cached buffer: deep-copy it. */
            pool[k].key = sdsdup(key);
        } else {
            /* Copy the key into the fixed cached buffer. */
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}
```
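The ordered insert (find k, shift right if there is room, otherwise drop the smallest and shift left) is the densest part of the function. Below is a stripped-down model of just that logic using bare idle scores, with 0 standing in for an empty slot (`pool_insert` is a hypothetical helper; the SDS caching is omitted):

```c
#include <stdio.h>
#include <string.h>

#define POOL_SIZE 8

/* Ascending array of idle scores; 0 marks an empty slot in this toy model. */
static unsigned long long pool[POOL_SIZE];

void pool_insert(unsigned long long idle) {
    int k = 0;
    while (k < POOL_SIZE && pool[k] && pool[k] < idle) k++;
    if (k == 0 && pool[POOL_SIZE-1] != 0)
        return;                               /* worse than everything, pool full */
    if (k < POOL_SIZE && pool[k] == 0) {      /* empty slot: plain insert */
        pool[k] = idle;
        return;
    }
    if (pool[POOL_SIZE-1] == 0) {             /* room on the right: shift right */
        memmove(pool+k+1, pool+k, sizeof(pool[0])*(POOL_SIZE-k-1));
    } else {                                  /* full: drop smallest, shift left */
        k--;
        memmove(pool, pool+1, sizeof(pool[0])*k);
    }
    pool[k] = idle;
}

int main(void) {
    unsigned long long v[] = {42, 7, 99, 15, 63};
    for (int i = 0; i < 5; i++) pool_insert(v[i]);
    for (int i = 0; i < POOL_SIZE; i++) printf("%llu ", pool[i]);
    printf("\n"); /* prints: 7 15 42 63 99 0 0 0 */
    return 0;
}
```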
Next comes the LFU side. The 24-bit lru field is reinterpreted as a 16-bit "last decrement time" plus an 8-bit logarithmic counter:

```c
/* ----------------------------------------------------------------------------
 * LFU (Least Frequently Used) implementation.
 *
 * We have 24 total bits of space in each object in order to implement
 * an LFU (Least Frequently Used) eviction policy, since we re-use the
 * LRU field for this purpose.
 *
 * We split the 24 bits into two fields:
 *
 *          16 bits      8 bits
 *     +----------------+--------+
 *     + Last decr time | LOG_C  |
 *     +----------------+--------+
 *
 * LOG_C is a logarithmic counter that provides an indication of the access
 * frequency. However this field must also be decremented otherwise what used
 * to be a frequently accessed key in the past, will remain ranked like that
 * forever, while we want the algorithm to adapt to access pattern changes.
 *
 * So the remaining 16 bits are used in order to store the "decrement time",
 * a reduced-precision Unix time (we take 16 bits of the time converted
 * in minutes since we don't care about wrapping around) where the LOG_C
 * counter is halved if it has a high value, or just decremented if it
 * has a low value.
 *
 * New keys don't start at zero, in order to have the ability to collect
 * some accesses before being trashed away, so they start at COUNTER_INIT_VAL.
 * The logarithmic increment performed on LOG_C takes care of COUNTER_INIT_VAL
 * when incrementing the key, so that keys starting at COUNTER_INIT_VAL
 * (or having a smaller value) have a very high chance of being incremented
 * on access.
 *
 * During decrement, the value of the logarithmic counter is halved if
 * its current value is greater than two times the COUNTER_INIT_VAL, otherwise
 * it is just decremented by one.
 * --------------------------------------------------------------------------*/

/* Return the current time in minutes, just taking the least significant
 * 16 bits. The returned time is suitable to be stored as LDT (last decrement
 * time) for the LFU implementation. */
unsigned long LFUGetTimeInMinutes(void) {
    return (server.unixtime/60) & 65535;
}

/* Given an object last decrement time, compute the minimum number of minutes
 * that elapsed since the last decrement. Handle overflow (ldt greater than
 * the current 16 bits minutes time) considering the time as wrapping
 * exactly once. */
unsigned long LFUTimeElapsed(unsigned long ldt) {
    unsigned long now = LFUGetTimeInMinutes();
    if (now >= ldt) return now-ldt;
    return 65535-ldt+now;
}

/* Logarithmically increment a counter. The greater the current counter
 * value, the less likely it is that it gets really incremented.
 * Saturate it at 255. */
uint8_t LFULogIncr(uint8_t counter) {
    if (counter == 255) return 255;
    double r = (double)rand()/RAND_MAX;
    double baseval = counter - LFU_INIT_VAL;
    if (baseval < 0) baseval = 0;
    double p = 1.0/(baseval*server.lfu_log_factor+1);
    if (r < p) counter++;
    return counter;
}

/* If the object decrement time is reached, decrement the LFU counter and
 * update the decrement time field. Return the object frequency counter.
 *
 * This function is used in order to scan the dataset for the best object
 * to fit: as we check for the candidate, we incrementally decrement the
 * counter of the scanned objects if needed. */
#define LFU_DECR_INTERVAL 1
unsigned long LFUDecrAndReturn(robj *o) {
    unsigned long ldt = o->lru >> 8;       /* high 16 bits: last decrement time */
    unsigned long counter = o->lru & 255;  /* low 8 bits: logarithmic counter */
    if (LFUTimeElapsed(ldt) >= server.lfu_decay_time && counter) {
        if (counter > LFU_INIT_VAL*2) {
            counter /= 2;
            if (counter < LFU_INIT_VAL*2) counter = LFU_INIT_VAL*2;
        } else {
            counter--;
        }
        o->lru = (LFUGetTimeInMinutes()<<8) | counter;
    }
    return counter;
}
```
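How quickly does the 8-bit counter saturate? A standalone copy of the LFULogIncr() probability model answers that empirically (LFU_INIT_VAL of 5 and lfu_log_factor of 10 are the Redis defaults, assumed here):

```c
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <time.h>

#define LFU_INIT_VAL   5   /* default initial counter value */
#define LFU_LOG_FACTOR 10  /* default lfu-log-factor */

/* Same probability model as LFULogIncr(), without server state. */
static uint8_t log_incr(uint8_t counter) {
    if (counter == 255) return 255;
    double r = (double)rand()/RAND_MAX;
    double baseval = counter - LFU_INIT_VAL;
    if (baseval < 0) baseval = 0;
    double p = 1.0/(baseval*LFU_LOG_FACTOR+1);
    if (r < p) counter++;
    return counter;
}

int main(void) {
    srand((unsigned)time(NULL));
    uint8_t c = LFU_INIT_VAL;
    long hits = 0;
    while (c < 255) { c = log_incr(c); hits++; }
    /* With these defaults the expectation is roughly 300 thousand hits. */
    printf("hits to saturate the counter: %ld\n", hits);
    return 0;
}
```

The design choice is the same one behind Morris counters: one byte can summarize access counts spanning several orders of magnitude, at the cost of precision for hot keys.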
Before the eviction loop itself, a helper excludes memory that eviction cannot reclaim anyway, namely replica output buffers and the AOF buffers:

```c
/* ----------------------------------------------------------------------------
 * The external API for eviction: freeMemoryIfNeeded() is called by the
 * server when there is data to add in order to make space if needed.
 * --------------------------------------------------------------------------*/

/* We don't want to count AOF buffers and slaves output buffers as
 * used memory: the eviction should use mostly data size. This function
 * returns the sum of AOF and slaves buffer. */
size_t freeMemoryGetNotCountedMemory(void) {
    size_t overhead = 0;
    int slaves = listLength(server.slaves);

    if (slaves) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = listNodeValue(ln);
            overhead += getClientOutputBufferMemoryUsage(slave);
        }
    }
    if (server.aof_state != AOF_OFF) {
        overhead += sdslen(server.aof_buf)+aofRewriteBufferSize();
    }
    return overhead;
}
```
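The accounting that opens freeMemoryIfNeeded() below is simple but easy to misread; this toy calculation (all figures invented) shows the order of operations, with the buffer overhead subtracted before comparing against maxmemory:

```c
#include <stdio.h>
#include <stddef.h>

int main(void) {
    size_t mem_reported = 1200u*1024*1024; /* as zmalloc_used_memory() reports */
    size_t overhead     =  150u*1024*1024; /* replica output + AOF buffers */
    size_t maxmemory    = 1024u*1024*1024; /* the configured limit */

    /* Buffers are subtracted first: they drain on their own, and evicting
     * keys would not shrink them. */
    size_t mem_used = (mem_reported > overhead) ? mem_reported - overhead : 0;
    if (mem_used <= maxmemory)
        printf("under the limit, nothing to evict\n");
    else
        printf("need to free %zu bytes\n", mem_used - maxmemory);
    return 0;
}
```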
Finally, the eviction entry point itself:

```c
/* Free memory if needed: returns C_OK when usage is back under maxmemory,
 * C_ERR when not enough memory could be reclaimed. */
int freeMemoryIfNeeded(void) {
    size_t mem_reported, mem_used, mem_tofree, mem_freed;
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves); /* number of replicas */

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return C_OK;

    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP. */
    mem_reported = zmalloc_used_memory(); /* allocator-reported usage, not exact */
    if (mem_reported <= server.maxmemory) return C_OK;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = mem_reported;
    size_t overhead = freeMemoryGetNotCountedMemory();
    mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;
    mem_freed = 0;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    latencyStartMonitor(latency);
    while (mem_freed < mem_tofree) {
        int j, k, i, keys_freed = 0;
        static int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            /* volatile-ttl: among keys with an expire set, those with an
             * earlier expire time are removed first. */
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    /* allkeys-* policies sample the whole keyspace,
                     * volatile-* policies only keys with a TTL. */
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, it is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict); /* pick one entry at random */
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            /* Propagate the deletion to the AOF buffer and to replicas. */
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            latencyRemoveNestedEvent(latency,eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            /* Fire the "evicted" keyspace notification, useful for monitoring. */
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            /* When the memory to free starts to be big enough, we may
             * start spending so much time here that is impossible to
             * deliver data to the slaves fast enough, so we force the
             * transmission here inside the loop. */
            if (slaves) flushSlavesOutputBuffers(); /* push to kernel buffers */

            /* Normally our stop condition is the ability to release
             * a fixed, pre-computed amount of memory. However when we
             * are deleting objects in another thread, it's better to
             * check, from time to time, if we already reached our target
             * memory, since the "mem_freed" amount is computed only
             * across the dbAsyncDelete() call, while the thread can
             * release the memory all the time. */
            if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
                overhead = freeMemoryGetNotCountedMemory();
                mem_used = zmalloc_used_memory();
                mem_used = (mem_used > overhead) ? mem_used-overhead : 0;
                if (mem_used <= server.maxmemory) {
                    mem_freed = mem_tofree;
                }
            }
        }

        if (!keys_freed) {
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("eviction-cycle",latency);
            goto cant_free; /* nothing to free... */
        }
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return C_OK;

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        /* Block while lazy-free jobs are pending: they may release memory. */
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
}
```
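One subtlety worth calling out: mem_freed is not estimated from object sizes but measured, by snapshotting the allocator usage around the delete. A stripped-down sketch of that pattern (`used_memory` and `delete_key` are made-up stand-ins for zmalloc_used_memory() and db*Delete()):

```c
#include <stdio.h>

/* Hypothetical stand-ins for the allocator counter and the key deletion. */
static long long used = 1000000;
static long long used_memory(void) { return used; }
static void delete_key(void) { used -= 12345; /* pretend the key freed this much */ }

int main(void) {
    long long mem_freed = 0;

    long long delta = used_memory(); /* snapshot before the delete */
    delete_key();
    delta -= used_memory();          /* delta = bytes actually released */
    mem_freed += delta;

    printf("freed %lld bytes\n", mem_freed); /* 12345 */
    return 0;
}
```

With lazy (asynchronous) deletion this measurement undercounts, which is exactly why the loop above also re-checks zmalloc_used_memory() directly every 16 freed keys.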