BLPOP 命令是 LPOP 命令的阻塞版本,当指定列表内没有任何元素可供获取时,连接将被 BLPOP 命令阻塞,直到等待超时或存在可获取元素为止(有另一个客户端对指定key 的任意一个执行 push 命令)。

BLPOP命令指定多个 key 参数时,按照参数 key 的先后顺序依次检查各个列表,并弹出第一个非空列表的头元素。

BLPOP命令接受一个以秒为单位的数字作为超时时间,若超时参数设为 0 ,其表示无超时时间限制(即无限期阻塞等待数据的到来)。

当指定的列表为空且已经超时,返回 nil ;当指定的列表中存在可返回的元素时,返回元素列表,其中第一个元素是被弹出元素所属的 key ,第二个元素是被弹出元素的值。

相同的 key 可以被多个客户端同时阻塞,不同的客户端被放进一个队列中,按“先阻塞先服务”原则,顺序地为 key 执行 BLPOP 命令。

BLPOP 命令可以用于 pipline 中,但把它用在 MULTI/EXEC 块当中没有意义。因为这要求整个服务器被阻塞以保证块执行时的原子性,该行为阻止了其他客户端执行 LPUSH 或 RPUSH 命令。

因此,一个被包裹在 MULTI/EXEC 块内的 BLPOP 命令,行为表现得就像 LPOP 一样,对空列表返回 nil ,对非空列表弹出列表元素,不进行任何阻塞操作。

在调用阻塞队列取操作时,队列中无数据时才会真正的触发代码的阻塞分支。在客户端,Redis对新来的读请求删除了标记(通知),这样直到阻塞的请求在获得服务前,新来的读请求都不能够被正常的服务。

// ./src/t_list.c

void blpopCommand(client *c) {
    blockingPopGenericCommand(c,LIST_HEAD);
}

void brpoplpushCommand(client *c) {
    mstime_t timeout;

    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
        != C_OK) return;

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key == NULL) {
        if (c->flags & CLIENT_MULTI) {
            /* 在事务中阻塞空列表会立即返回 */
            addReplyNull(c);
        } else {
            /* 列表为空,客户端阻塞 */
            blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
        }
    } else {
        if (key->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
        } else {
            /* 该列表存在并有元素,正常执行 rpoplpushCommand。 */
            serverAssertWithInfo(c,key,listTypeLength(key) > 0);
            rpoplpushCommand(c);
        }
    }
}

Redis 是通过 ready_keys 和 blocking_keys 两个链表和事件循环来处理阻塞事件的,BLPOP 不会阻塞 Redis 服务,不会影响其他命令服务。

Redis server 中有两个循环:

  • IO循环:Redis 完成客户端连接应答、命令请求处理和命令处理结果回复等
  • 定时循环:Redis完成过期key的检测等

Redis 一次连接处理的过程包含:

  • IO多路复用检测套接字状态
  • 套接字事件分派
  • 请求事件处理

当BLPOP的key不存在或为空时,将 key 记录在 database 对应的 blocking_keys 数据结构中。blocking_keys 是一个字典结构,key 对应的是需要监听的名字,value 值是一个列表,里面存放被阻塞的客户端信息。处理完这些,然后也不关闭连接,就一直这样等待有客户向key里添加数据。

当服务器下次收到 PUSH 命令,会检查 blocking_keys 中是否存在对应的 key,如果存在,将 key 添加到 ready_keys 全局链表中,同时将 value 插入链表中并响应客户端。

服务端在每次的事件循环当中处理完客户端请求之后,会遍历 ready_keys 链表,并从 blocking_keys 链表当中找到对应的 client ,进行响应,整个过程并不会阻塞事件循环的执行。

而处理有设置 timeout 的 blocking POP 是通过定时任务来完成的,每隔一定时间就执行 clientsCronHandleTimeout ,将那些已经超时的客户端连接进行关闭。

// ./src/blocked.c

/*
 * 这是当前阻塞 lists/sorted sets/streams 的工作方式,
 * 以BLPOP为例,对于其他 list 操作、sorted sets 和XREA,是一样的
 * - 如果用户调用BLPOP,键存在且包含非空列表,则调用LPOP。没有阻塞时,BLPOP在语义上与LPOP相同
 * - 如果调用了BLPOP而键不存在或列表为空,则需要阻塞。
 *   为了做到这一点,我们删除了在客户端套接字中读取新数据的通知(这样,如果没有提供阻塞请求,我们将不提供新请求)。
 *   我们还将客户端放在字典中(db->blocking_keys),将键映射到为该键阻塞的客户端列表。
 * - 如果有PUSH数据到这些阻塞的key中,将这个key标记为“ready”。这个命令后,MULTI/EXEC语块、脚本 将会执行。
 *   我们为所有等待这个列表的客户端提供服务,从第一个阻塞列表到最后一个阻塞列表,根据我们在就绪列表中拥有的元素数量进行排序。
 */

/* Set a client in blocking mode for the specified key (list, zset or stream),
 * with the specified timeout. The 'type' argument is BLOCKED_LIST,
 * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
 * waiting for an empty key in order to awake the client. The client is blocked
 * for all the 'numkeys' keys as in the 'keys' argument. When we block for
 * stream keys, we also provide an array of streamID structures: clients will
 * be unblocked only when items with an ID greater or equal to the specified
 * one is appended to the stream. */
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
    dictEntry *de;
    list *l;
    int j;

    c->bpop.timeout = timeout;
    c->bpop.target = target;

    if (target != NULL) incrRefCount(target);

    for (j = 0; j < numkeys; j++) {
        /* The value associated with the key name in the bpop.keys dictionary
         * is NULL for lists and sorted sets, or the stream ID for streams. */
        void *key_data = NULL;
        if (btype == BLOCKED_STREAM) {
            key_data = zmalloc(sizeof(streamID));
            memcpy(key_data,ids+j,sizeof(streamID));
        }

        /* If the key already exists in the dictionary ignore it. */
        if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
            zfree(key_data);
            continue;
        }
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
            serverAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        listAddNodeTail(l,c);
    }
    blockClient(c,btype);
}