详解Redis用链表实现消息队列
|
前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样。 链表实现消息队列 Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为 用法如下: charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli 127.0.0.1:6379> lpush list hello (integer) 1 127.0.0.1:6379> brpop list 0 1) "list" 2) "hello" 127.0.0.1:6379> brpop list 0 //阻塞在这里 /* ---------------------------------------------------- */ //当我在另一个客户端lpush一个元素之后,客户端输出为 127.0.0.1:6379> brpop list 0 1) "list" 2) "world" (50.60s)//阻塞的时间 当链表为空的时候,brpop是阻塞的,等待超时时间到或者另一个客户端lpush一个元素。接下来,看下源码是如何实现阻塞brpop命令的。要实现客户端阻塞,只需要服务器不给客户端发送消息,那么客户端就会阻塞在read调用中,等待消息到达。这是很好实现的,关键是如何判断这个客户端阻塞的链表有数据到达以及通知客户端解除阻塞?Redis的做法是,将阻塞的键以及阻塞在这个键上的客户端链表存储在一个字典中,然后每当向数据库插入一个链表时,就判断这个新插入的链表是否有客户端阻塞,有的话,就解除这个阻塞的客户端,并且发送刚插入链表元素给客户端,客户端就这样解除阻塞。 先看下有关数据结构,以及server和client有关属性
//阻塞状态
typedef struct blockingState {
/* Generic fields. */
mstime_t timeout; /* 超时时间 */
/* REDIS_BLOCK_LIST */
dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
robj *target; /* The key that should receive the element,* for BRPOPLPUSH. */
/* REDIS_BLOCK_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
} blockingState;
//继续列表
typedef struct readyList {
redisDb *db;//就绪键所在的数据库
robj *key;//就绪键
} readyList;
//客户端有关属性
typedef struct redisClient {
int btype; /* Type of blocking op if REDIS_BLOCKED. */
blockingState bpop; /* blocking state */
}
//服务器有关属性
struct redisServer {
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
}
//数据库有关属性
typedef struct redisDb {
//keys->redisCLient映射
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
}redisDB
必须对上述的数据结构足够了解,否则很难看懂下面的代码,因为这些代码需要操作上述的数据结构。先从brpop命令执行函数开始分析,brpop命令执行函数为
void brpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c,int where) {
robj *o;
mstime_t timeout;
int j;
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= REDIS_OK) return;//将超时时间保存在timeout中
for (j = 1; j < c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);//在数据库中查找操作的链表
if (o != NULL) {//如果不为空
if (o->type != REDIS_LIST) {//不是链表类型
addReply(c,shared.wrongtypeerr);//报错
return;
} else {
if (listTypeLength(o) != 0) {//链表不为空
/* Non empty list,this is like a non normal [LR]POP. */
char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);//从链表中pop出一个元素
redisAssert(value != NULL);
//给客户端发送pop出来的元素信息
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->db->id);
}
/* 省略一部分 */
}
}
}
}
/* 如果链表为空,则阻塞客户端 */
blockForKeys(c,c->argv + 1,c->argc - 2,timeout,NULL);
}
从源码可以看出,brpop可以操作多个链表变量,例如
void blockForKeys(redisClient *c,robj **keys,int numkeys,mstime_t timeout,robj *target) {
dictEntry *de;
list *l;
int j;
c->bpop.timeout = timeout;//超时时间赋值给客户端blockingState属性
c->bpop.target = target;//这属性适用于brpoplpush命令的输入对象,如果是brpop,//则target为空
if (target != NULL) incrRefCount(target);//不为空,增加引用计数
for (j = 0; j < numkeys; j++) {
/* 将阻塞的key存入c.bpop.keys字典中 */
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* And in the other "side",to map keys -> clients */
//将阻塞的key和客户端添加进c->db->blocking_keys
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,l);
incrRefCount(keys[j]);
redisAssertWithInfo(c,retval == DICT_OK);
} else {
l = dictGetVal(de);
}
listAddNodeTail(l,c);//添加到阻塞键的客户点链表中
}
blockClient(c,REDIS_BLOCKED_LIST);//设置客户端阻塞标志
}
blockClient函数只是简单的设置客户端属性,如下
void blockClient(redisClient *c,int btype) {
c->flags |= REDIS_BLOCKED;//设置标志
c->btype = btype;//阻塞操作类型
server.bpop_blocked_clients++;
}
由于这个函数之后,brpop命令执行函数就结束了,由于没有给客户端发送消息,所以客户端就阻塞在read调用中。那么如何解开客户端的阻塞了? 插入一个元素解阻塞 任何指令的执行函数都是在processCommand函数中调用call函数,然后在call函数中调用命令执行函数,lpush也一样。当执行完lpush之后,此时链表不为空,回到processCommand调用中,执行以下语句 if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); 这两行代码是先检查server.ready_keys是否为空,如果不为空,说明已经有一些就绪的链表,此时可以判断是否有客户端阻塞在这个键值上,如果有,则唤醒;现在问题又来了,这个server.ready_keys在哪更新链表了? 原来是在dbAdd函数中,当往数据库中添加的值类型为REDIS-LIST时,这时就要调用signalListAsReady函数将链表指针添加进server.ready_keys:
//db.c
void dbAdd(redisDb *db,robj *key,robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict,copy,val);//将数据添加进数据库
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
//判断是否为链表类型,如果是,调用有链表已经ready函数
if (val->type == REDIS_LIST) signalListAsReady(db,key);
if (server.cluster_enabled) slotToKeyAdd(key);
}
//t_list.c
void signalListAsReady(redisDb *db,robj *key) {
readyList *rl;
/* 没有客户端阻塞在这个键上,则直接返回. */
if (dictFind(db->blocking_keys,key) == NULL) return;
/* 这个键已近被唤醒了,所以没必要重新入队 */
if (dictFind(db->ready_keys,key) != NULL) return;
/* Ok,除了上述两情况,把这个键放入server.ready_keys */
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);//添加链表末尾
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check. */
incrRefCount(key);
//同时将这个阻塞键放入db->ready_keys
redisAssert(dictAdd(db->ready_keys,NULL) == DICT_OK);
}
(编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
