Redis源码解析:集群手动故障转移、从节点迁移详解
|
一:手动故障转移 Redis集群支持手动故障转移。也就是向从节点发送”CLUSTER FAILOVER”命令,使其在主节点未下线的情况下,发起故障转移流程,升级为新的主节点,而原来的主节点降级为从节点。 为了不丢失数据,向从节点发送”CLUSTER FAILOVER”命令后,流程如下: a:从节点收到命令后,向主节点发送CLUSTERMSG_TYPE_MFSTART包; ”CLUSTER FAILOVER”命令支持两个选项:FORCE和TAKEOVER。使用这两个选项,可以改变上述的流程。 如果有FORCE选项,则从节点不会与主节点进行交互,主节点也不会阻塞其客户端,而是从节点立即开始故障转移流程:发起选举、统计选票、赢得选举、升级为主节点并更新配置。 如果有TAKEOVER选项,则更加简单粗暴:从节点不再发起选举,而是直接将自己升级为主节点,接手原主节点的槽位,增加自己的configEpoch后更新配置。 因此,使用FORCE和TAKEOVER选项,主节点可以已经下线;而不使用任何选项,只发送”CLUSTER FAILOVER”命令的话,主节点必须在线。 在clusterCommand函数中,处理”CLUSTER FAILOVER”命令的部分代码如下:
else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
(c->argc == 2 || c->argc == 3))
{
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
int force = 0,takeover = 0;
if (c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr,"force")) {
force = 1;
} else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
takeover = 1;
force = 1; /* Takeover also implies force. */
} else {
addReply(c,shared.syntaxerr);
return;
}
}
/* Check preconditions. */
if (nodeIsMaster(myself)) {
addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
return;
} else if (myself->slaveof == NULL) {
addReplyError(c,"I'm a slave but my master is unknown to me");
return;
} else if (!force &&
(nodeFailed(myself->slaveof) ||
myself->slaveof->link == NULL))
{
addReplyError(c,"Master is down or failed,"
"please use CLUSTER FAILOVER FORCE");
return;
}
resetManualFailover();
server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
if (takeover) {
/* A takeover does not perform any initial check. It just
* generates a new configuration epoch for this node without
* consensus,claims the master's slots,and broadcast the new
* configuration. */
redisLog(REDIS_WARNING,"Taking over the master (user request).");
clusterBumpConfigEpochWithoutConsensus();
clusterFailoverReplaceYourMaster();
} else if (force) {
/* If this is a forced failover,we don't need to talk with our
* master to agree about the offset. We just failover taking over
* it without coordination. */
redisLog(REDIS_WARNING,"Forced failover user request accepted.");
server.cluster->mf_can_start = 1;
} else {
redisLog(REDIS_WARNING,"Manual failover user request accepted.");
clusterSendMFStart(myself->slaveof);
}
addReply(c,shared.ok);
}
首先检查命令的最后一个参数是否是FORCE或TAKEOVER; 如果当前节点是主节点;或者当前节点是从节点,但没有主节点;或者当前从节点的主节点已经下线或者断链,并且命令中没有FORCE或TAKEOVER参数,则直接回复客户端错误信息后返回; 然后调用resetManualFailover,重置手动强制故障转移的状态; 置mf_end为当前时间加5秒,该属性表示手动强制故障转移流程的超时时间,也用来表示当前是否正在进行手动强制故障转移; 如果命令最后一个参数为TAKEOVER,这表示收到命令的从节点无需经过选举的过程,直接接手其主节点的槽位,并成为新的主节点。因此首先调用函数clusterBumpConfigEpochWithoutConsensus,产生新的configEpoch,以便后续更新配置;然后调用clusterFailoverReplaceYourMaster函数,转变成为新的主节点,并将这种转变广播给集群中所有节点; 如果命令最后一个参数是FORCE,这表示收到命令的从节点可以直接开始选举过程,而无需达到主节点的复制偏移量之后才开始选举过程。因此置mf_can_start为1,这样在函数clusterHandleSlaveFailover中,即使在主节点未下线或者当前从节点的复制数据比较旧的情况下,也可以开始故障转移流程; 如果最后一个参数不是FORCE或TAKEOVER,这表示收到命令的从节点,首先需要向主节点发送CLUSTERMSG_TYPE_MFSTART包,因此调用clusterSendMFStart函数,向其主节点发送该包; 主节点收到CLUSTERMSG_TYPE_MFSTART包后,在clusterProcessPacket函数中,是这样处理的:
else if (type == CLUSTERMSG_TYPE_MFSTART) {
/* This message is acceptable only if I'm a master and the sender
* is one of my slaves. */
if (!sender || sender->slaveof != myself) return 1;
/* Manual failover requested from slaves. Initialize the state
* accordingly. */
resetManualFailover();
server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",sender->name);
}
如果字典中找不到发送节点,或者发送节点的主节点不是当前节点,则直接返回; 调用resetManualFailover,重置手动强制故障转移的状态; 然后置mf_end为当前时间加5秒,该属性表示手动强制故障转移流程的超时时间,也用来表示当前是否正在进行手动强制故障转移; 然后设置mf_slave为sender,该属性表示要进行手动强制故障转移的从节点; 然后调用pauseClients,使所有客户端在之后的10s内阻塞; 主节点在发送心跳包时,在构建包头时,如果发现当前正处于手动强制故障转移阶段,则会在包头中增加CLUSTERMSG_FLAG0_PAUSED标记:
void clusterBuildMessageHdr(clusterMsg *hdr,int type) {
...
/* Set the message flags. */
if (nodeIsMaster(myself) && server.cluster->mf_end)
hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
...
}
从节点在clusterProcessPacket函数中处理收到的包,一旦发现主节点发来的,带有CLUSTERMSG_FLAG0_PAUSED标记的包,就会将该主节点的复制偏移量记录到server.cluster->mf_master_offset中:
int clusterProcessPacket(clusterLink *link) {
...
/* Check if the sender is a known node. */
sender = clusterLookupNode(hdr->sender);
if (sender && !nodeInHandshake(sender)) {
...
/* Update the replication offset info for this node. */
sender->repl_offset = ntohu64(hdr->offset);
sender->repl_offset_time = mstime();
/* If we are a slave performing a manual failover and our master
* sent its offset while already paused,populate the MF state. */
if (server.cluster->mf_end &&
nodeIsSlave(myself) &&
myself->slaveof == sender &&
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
server.cluster->mf_master_offset == 0)
{
server.cluster->mf_master_offset = sender->repl_offset;
redisLog(REDIS_WARNING,"Received replication offset for paused "
"master manual failover: %lld",server.cluster->mf_master_offset);
}
}
}
(编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
