Spring Cloud Alibaba Nacos 如何在一致性保证下实现AP和CP的并存?

2026-05-21 03:533阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3149个文字,预计阅读时间需要13分钟。

Spring Cloud Alibaba Nacos 如何在一致性保证下实现AP和CP的并存?

Nacos中的两种一致性策略:共存与疑问。为什么传统的CP模式的Zookeeper或AP模式的Eureka,都只支持CAP理论下的AP实现或CP实现?而Nacos却能同时实现两者呢?

两种一致性策略如何在nacos中共存

或许会有疑问,为什么早先cp模式的Zookeeper或者AP模式的Eureka,都只有支持CAP理论下大家常用的AP实现或者CP实现,而nacos却能够两个都实现呢?

 

其实CAP理论,仅仅是针对分布式下数据的一致性而言,如果你对于数据的一致性要求不高,可忍受最终一致性,那么AP模式的Eureka就可以满足你了,如果说你对数据的一致性要求很高,那么就使用CP模式的Zookeeper,而追其根本,并不是说Eureka是AP的,或者说Zookeeper是CP的,而是他们存储的数据的一致性,满足AP或者CP,因此也就不难实现在一个组件中实现AP模式与CP模式共存。

@DependsOn("ProtocolManager") @Service("consistencyDelegate") public class DelegateConsistencyServiceImpl implements ConsistencyService { private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService; private final EphemeralConsistencyService ephemeralConsistencyService; }

DelegateConsistencyServiceImpl是一个一致性策略选择的类,根据不同的策略触发条件(在nacos中,CP与AP切换的条件是注册的服务实例是否是临时实例),选择PersistentConsistencyService策略或者EphemeralConsistencyService策略,而EphemeralConsistencyService对应的是DistroConsistencyServiceImpl,采用的协议是阿里自研的Distro,我个人觉得就像gossip协议;PersistentConsistencyService对应的是RaftConsistencyServiceImpl,其底层采用的是Raft协议;这两种一致性策略下的数据存储互不影响,所以nacos实现了AP模式与CP模式在一个组件中同时存在。

Nacos AP实现:

Nacos AP一致性策略 Distro

CP实现

重要的协议——RAFT

腾讯文档——Raft论文

Raft算法原论文

一文搞懂Raft算法

Raft为了实现容易理解的目标,在paxos的基础上进行的状态简化以及问题拆分,将之前复杂的逻辑拆成若干个子问题,基本上可以总结成下面几个方面:

  • leader election:选取主节点
  • log replication:日志备份,数据同步
  • safety:为了实现上述两点而产生的一些约束条件和保障条件

leader election

role

首先先说明下Raft算法中节点的角色,分为以下三种:

  • leader:由所有节点选举,在candidate中产生,负责整个集群的状态以及元数据管理,当发现更大的term时,转化为follower。

  • candidate:由follower在集群选举时转化而成,选举时得到多数选票,则转化为leader,若发现主节点或者更大的term则转化为follower。

  • follower:集群初始化时所有节点的角色都是follower,若未发现leader心跳,则发起leader选举,并将角色转化为candidate;leader以及candidate在某些条件下也会转化成follower。

给出状态机,协助大家理解:`

term

上面在讲解role的时候好几次说到了一个名词term,这是raft算法中的一个核心概念,很多的机制都需要依赖term。term通俗来讲就是任期,即一个leader正常工作的时间段,如果因为某些原因当前的leader不再是leader时,则该term结束,重新进行选举,开始新的任期,是不是和现实生活中的选举很像?另外term在raft算法中也起到了逻辑时钟的作用,在raft的实现中起到了重要的作用,此处用一句话先来概括:即term大的优先级高,leader必须是拥有更大的term。用白话理解:就是当前总统和前总统的关系,总统只能有一个就是当期总统,前总统在当前总统面前就变成选民了。在Raft中,term是个整数型的值,term变化即将term的值加1。

 

深入浅出一文搞懂Raft协议:blog.csdn.net/microGP/article/details/114261089

nacos是如何实现CP(raft)的

RaftController

Raft集群内部节点间是通过暴露的 Restful接口,代码在 RaftController 中。RaftController控制器是 Raft集群内部节点间通信使用的,具体的信息如下:

POST HTTP://{ip:port}/v1/ns/raft/vote : 进行投票请求 POST HTTP://{ip:port}/v1/ns/raft/beat : Leader向Follower发送心跳信息 GET HTTP://{ip:port}/v1/ns/raft/peer : 获取该节点的RaftPeer信息 PUT HTTP://{ip:port}/v1/ns/raft/datum/reload : 重新加载某日志信息 POST HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据并存入 DELETE HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据删除操作 GET HTTP://{ip:port}/v1/ns/raft/datum : 获取该节点存储的数据信息 GET HTTP://{ip:port}/v1/ns/raft/state : 获取该节点的状态信息{UP or DOWN} POST HTTP://{ip:port}/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作 DELETE HTTP://{ip:port}/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作 GET HTTP://{ip:port}/v1/ns/raft/leader : 获取当前集群的Leader节点信息 GET HTTP://{ip:port}/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者

RaftPeerSet

这个对象存储的是所有raft协议下的节点信息,存储的元素如下

@Deprecated @Component @DependsOn("ProtocolManager") public class RaftPeerSet extends MemberChangeListener implements Closeable { // 集群节点地址管理 private final ServerMemberManager memberManager; // 周期数 private AtomicLong localTerm = new AtomicLong(0L); // 当前周期内的Leader private RaftPeer leader = null; // 所有的节点信息 private volatile Map<String, RaftPeer> peers = new HashMap<>(8); // 暂时不清楚用途 private Set<String> sites = new HashSet<>(); // 本节点是否已准备完毕 private volatile boolean ready = false; }

同时还具备了raft协议下必要的方法

@Deprecated @Component @DependsOn("ProtocolManager") public class RaftPeerSet extends MemberChangeListener implements Closeable { // 当前IP对应的节点是否是Leader public boolean isLeader(String ip) { if (EnvUtil.getStandaloneMode()) { return true; } if (leader == null) { Loggers.RAFT.warn("[IS LEADER] no leader is available now!"); return false; } return StringUtils.equals(leader.ip, ip); } // 决定Leader节点,根据投票结果以及是否满足majorityCount机制 public RaftPeer decideLeader(RaftPeer candidate) { peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } // 选票计数 ips.add(peer.voteFor); // 如果某节点的得票数大于当前的最大得票数,则更新候选Leader信息 if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } // 是否满足majorityCount数量的限制 if (maxApproveCount >= majorityCount()) { // 若满足则设置Leader节点信息 RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local())); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; } public RaftPeer makeLeader(RaftPeer candidate) { // 如果当前Leader与Candidate节点不一样,则进行Leader信息更改 if (!Objects.equals(leader, candidate)) { leader = candidate; ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local())); Loggers.RAFT .info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()), JacksonUtils.toJson(leader)); } for (final RaftPeer peer : peers.values()) { Map<String, String> params = new HashMap<>(1); // 如果当前节点与远程Leader节点不等且是Follower节点 if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) { try { // 获取每个节点的RaftPeer节点信息对象数据 String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER); HttpClient.asyncHttpGet(url, null, params, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.RAFT .error("[NACOS-RAFT] get peer failed: {}, peer: {}", result.getCode(), peer.ip); peer.state = RaftPeer.State.FOLLOWER; return; } update(JacksonUtils.toObj(result.getData(), RaftPeer.class)); } @Override public void onError(Throwable throwable) { } @Override public void onCancel() { } }); } catch (Exception e) { peer.state = RaftPeer.State.FOLLOWER; Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip); } } } return update(candidate); } }

RaftCore

该对象是nacos中raft协议的主要实现,在启动之初,会进行一系列初始化的操作

@Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { @PostConstruct public void init() throws Exception { Loggers.RAFT.info("initializing Raft sub-system"); final long start = System.currentTimeMillis(); // 进行日志文件的加载到内存数据对象Datums的操作 raftStore.loadDatums(notifier, datums); // 设置当前的周期数 setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm()); // 初始化标识更改 initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); // 开启定时的Leader选举任务 masterTask = GlobalExecutor.registerMasterElection(new MasterElection()); // 开启定时的Leader心跳服务 heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat()); versionJudgement.registerObserver(isAllNewVersion -> { stopWork = isAllNewVersion; if (stopWork) { try { shutdown(); raftListener.removeOldRaftMetadata(); } catch (NacosException e) { throw new NacosRuntimeException(NacosException.SERVER_ERROR, e); } } }, 100); NotifyCenter.registerSubscriber(notifier); Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); } }

初始化的一系列操作完成后,此时集群还无法对外提供服务,因为此时Leader还未选举出来,需要在MasterElection选举Leader成功后才可以对外提供服务。

Leader 选举任务

@Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { // Leader 选举任务 public class MasterElection implements Runnable { @Override public void run() { try { if (stopWork) { return; } // 当前节点是否已准备完毕 if (!peers.isReady()) { return; } // 获取自身节点信息 RaftPeer local = peers.local(); // 本地存储的Leader任期时间 local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; // 如果Leader任期时间还在允许范围内,则不进行Leader选举 if (local.leaderDueMs > 0) { return; } // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); // 向其他节点发起投票请求 sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } } private void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term); //重置Raft集群数据 peers.reset(); local.term.incrementAndGet(); // 设置给自己投票 local.voteFor = local.ip; //将节点状态更新为候选节点 local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<>(1); params.put("vote", JacksonUtils.toJson(local)); // 遍历所有的节点信息(除了自己之外) for (final String server : peers.allServersWithoutMySelf()) { final String url = buildUrl(server, API_VOTE); try { //候选节点向除自身之外的所有其它Raft节点的/v1/ns/raft/vote发送HTTP POST请求 //请求内容为vote:JSON.toJSONString(local) HttpClient.asyncHttpPost(url, null, params, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url); return; } // 获取投票结果,并进行Leader的选举工作 RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer)); //候选节点收到其他节点投的候选节点数据,交给PeerSet.decideLeader方法处理 peers.decideLeader(peer); } @Override public void onError(Throwable throwable) { Loggers.RAFT.error("error while sending vote to server: {}", server, throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } } }

每个节点启动时,都会认为自己可以作为Leader,因此都会以自去己作为被选举人,向其他节点发起投票请求,其他节点的接受投票请求路径为**/v1/ns/raft/vote**

其他节点接收到投票请求,会调用RaftController.vote方法

@Deprecated @RestController @RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft", UtilsAndCommons.NACOS_SERVER_CONTEXT + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft"}) public class RaftController { private final RaftCore raftCore; private final ClusterVersionJudgement versionJudgement; @PostMapping("/vote") public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception { if (versionJudgement.allMemberIsNewVersion()) { throw new IllegalStateException("old raft protocol already stop"); } // 处理选举请求 RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class)); return JacksonUtils.transferToJsonNode(peer); } } @Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { // 节点接收到投票请求后的处理 public synchronized RaftPeer receivedVote(RaftPeer remote) { if (stopWork) { throw new IllegalStateException("old raft protocol already stop work"); } // 被选举人是否在raft集群节点列表中 if (!peers.contains(remote)) { throw new IllegalStateException("can not find peer: " + remote.ip); } // 获取自身节点信息 RaftPeer local = peers.get(NetUtils.localServer()); // 如果被选举节点的周期数小于本节点的周期数,则将自己的投票投给自己并告诉被选举者 // 若当前节点的 term 大于等于发送选举请求的节点 term,则选择自己为 leader if (remote.term.get() <= local.term.get()) { String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term; Loggers.RAFT.info(msg); if (StringUtils.isEmpty(local.voteFor)) { local.voteFor = local.ip; } return local; } // 满足投票条件后,本节点确认将自己的票投给被选举者 // 若当前节点的 term 小于发送请求的节点 term,选择发送请求的节点为 leader local.resetLeaderDue(); local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get()); Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term); return local; } }

通过以上步骤,最终选举出了Leader节点,接下来,就可以对外提供服务了。

数据同步 日志同步

当系统有了leader后,系统就进入对外工作了。客户端的一切请求发送到leader,leader来调度这些并发请求的顺序,并且保证leader与followers状态的一致性。在 raft 集群中,所有日志都必须首先提交至 leader 节点。leader 在每个 heartbeat 向 follower 同步日志。

 

Spring Cloud Alibaba Nacos 如何在一致性保证下实现AP和CP的并存?

因为是CP模式,所以操作都是通过Leader节点进行传达的,Follower节点本身不与Client进行联系,Follower只能接受来自Leader的操作请求,因此就存在请求转发的问题。因此在RaftCore中的signalPublish以及signalDelete中,存在着对Leader节点的判断以及请求转发的逻辑。

Raft协议下的注册流程入口也是在InstanceController.register方法,只不过在ConsistencyService的put方法实现由AP一致性调用的DistroConsistencyServiceImpl变成了RaftConsistencyServiceImpl。

@Deprecated @DependsOn("ProtocolManager") @Service public class RaftConsistencyServiceImpl implements PersistentConsistencyService { private final RaftCore raftCore; @Override public void put(String key, Record value) throws NacosException { checkIsStopWork(); try { raftCore.signalPublish(key, value); } catch (Exception e) { Loggers.RAFT.error("Raft put failed.", e); throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e); } } }

最终调用到 RaftCore 的 signalPublish() 方法。

RaftCore.signalPublish()

@Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { public void signalPublish(String key, Record value) throws Exception { if (stopWork) { throw new IllegalStateException("old raft protocol already stop work"); } //不是leader if (!isLeader()) { ObjectNode params = JacksonUtils.createEmptyJsonNode(); params.put("key", key); params.replace("value", JacksonUtils.transferToJsonNode(value)); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); final RaftPeer leader = getLeader(); //请求转发 交给leader去做/v1/ns/raft/datum raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); return; } OPERATE_LOCK.lock(); // 若是 leader,将包发送给所有的 follower try { final long start = System.currentTimeMillis(); final Datum datum = new Datum(); datum.key = key; datum.value = value; if (getDatum(key) == null) { datum.timestamp.set(1L); } else { datum.timestamp.set(getDatum(key).timestamp.incrementAndGet()); } ObjectNode json = JacksonUtils.createEmptyJsonNode(); json.replace("datum", JacksonUtils.transferToJsonNode(datum)); json.replace("source", JacksonUtils.transferToJsonNode(peers.local())); //发布数据改变通知 本地 onPublish 方法用来处理持久化逻辑 onPublish(datum, peers.local()); final String content = json.toString(); //只要过半的结点数 final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); //遍历所有结点 for (final String server : peers.allServersIncludeMyself()) { if (isLeader(server)) { //自己算一次 latch.countDown(); continue; } // 将包发送给所有的 follower,调用 /v1/ns/raft/datum/commit 接口 final String url = buildUrl(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.RAFT .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, www.liaochuntao.cn/2019/06/01/java-web-41/

blog.csdn.net/liyanan21/article/details/89320872

blog.csdn.net/microGP/article/details/114261089

本文共计3149个文字,预计阅读时间需要13分钟。

Spring Cloud Alibaba Nacos 如何在一致性保证下实现AP和CP的并存?

Nacos中的两种一致性策略:共存与疑问。为什么传统的CP模式的Zookeeper或AP模式的Eureka,都只支持CAP理论下的AP实现或CP实现?而Nacos却能同时实现两者呢?

两种一致性策略如何在nacos中共存

或许会有疑问,为什么早先cp模式的Zookeeper或者AP模式的Eureka,都只有支持CAP理论下大家常用的AP实现或者CP实现,而nacos却能够两个都实现呢?

 

其实CAP理论,仅仅是针对分布式下数据的一致性而言,如果你对于数据的一致性要求不高,可忍受最终一致性,那么AP模式的Eureka就可以满足你了,如果说你对数据的一致性要求很高,那么就使用CP模式的Zookeeper,而追其根本,并不是说Eureka是AP的,或者说Zookeeper是CP的,而是他们存储的数据的一致性,满足AP或者CP,因此也就不难实现在一个组件中实现AP模式与CP模式共存。

@DependsOn("ProtocolManager") @Service("consistencyDelegate") public class DelegateConsistencyServiceImpl implements ConsistencyService { private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService; private final EphemeralConsistencyService ephemeralConsistencyService; }

DelegateConsistencyServiceImpl是一个一致性策略选择的类,根据不同的策略触发条件(在nacos中,CP与AP切换的条件是注册的服务实例是否是临时实例),选择PersistentConsistencyService策略或者EphemeralConsistencyService策略,而EphemeralConsistencyService对应的是DistroConsistencyServiceImpl,采用的协议是阿里自研的Distro,我个人觉得就像gossip协议;PersistentConsistencyService对应的是RaftConsistencyServiceImpl,其底层采用的是Raft协议;这两种一致性策略下的数据存储互不影响,所以nacos实现了AP模式与CP模式在一个组件中同时存在。

Nacos AP实现:

Nacos AP一致性策略 Distro

CP实现

重要的协议——RAFT

腾讯文档——Raft论文

Raft算法原论文

一文搞懂Raft算法

Raft为了实现容易理解的目标,在paxos的基础上进行的状态简化以及问题拆分,将之前复杂的逻辑拆成若干个子问题,基本上可以总结成下面几个方面:

  • leader election:选取主节点
  • log replication:日志备份,数据同步
  • safety:为了实现上述两点而产生的一些约束条件和保障条件

leader election

role

首先先说明下Raft算法中节点的角色,分为以下三种:

  • leader:由所有节点选举,在candidate中产生,负责整个集群的状态以及元数据管理,当发现更大的term时,转化为follower。

  • candidate:由follower在集群选举时转化而成,选举时得到多数选票,则转化为leader,若发现主节点或者更大的term则转化为follower。

  • follower:集群初始化时所有节点的角色都是follower,若未发现leader心跳,则发起leader选举,并将角色转化为candidate;leader以及candidate在某些条件下也会转化成follower。

给出状态机,协助大家理解:`

term

上面在讲解role的时候好几次说到了一个名词term,这是raft算法中的一个核心概念,很多的机制都需要依赖term。term通俗来讲就是任期,即一个leader正常工作的时间段,如果因为某些原因当前的leader不再是leader时,则该term结束,重新进行选举,开始新的任期,是不是和现实生活中的选举很像?另外term在raft算法中也起到了逻辑时钟的作用,在raft的实现中起到了重要的作用,此处用一句话先来概括:即term大的优先级高,leader必须是拥有更大的term。用白话理解:就是当前总统和前总统的关系,总统只能有一个就是当期总统,前总统在当前总统面前就变成选民了。在Raft中,term是个整数型的值,term变化即将term的值加1。

 

深入浅出一文搞懂Raft协议:blog.csdn.net/microGP/article/details/114261089

nacos是如何实现CP(raft)的

RaftController

Raft集群内部节点间是通过暴露的 Restful接口,代码在 RaftController 中。RaftController控制器是 Raft集群内部节点间通信使用的,具体的信息如下:

POST HTTP://{ip:port}/v1/ns/raft/vote : 进行投票请求 POST HTTP://{ip:port}/v1/ns/raft/beat : Leader向Follower发送心跳信息 GET HTTP://{ip:port}/v1/ns/raft/peer : 获取该节点的RaftPeer信息 PUT HTTP://{ip:port}/v1/ns/raft/datum/reload : 重新加载某日志信息 POST HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据并存入 DELETE HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据删除操作 GET HTTP://{ip:port}/v1/ns/raft/datum : 获取该节点存储的数据信息 GET HTTP://{ip:port}/v1/ns/raft/state : 获取该节点的状态信息{UP or DOWN} POST HTTP://{ip:port}/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作 DELETE HTTP://{ip:port}/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作 GET HTTP://{ip:port}/v1/ns/raft/leader : 获取当前集群的Leader节点信息 GET HTTP://{ip:port}/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者

RaftPeerSet

这个对象存储的是所有raft协议下的节点信息,存储的元素如下

@Deprecated @Component @DependsOn("ProtocolManager") public class RaftPeerSet extends MemberChangeListener implements Closeable { // 集群节点地址管理 private final ServerMemberManager memberManager; // 周期数 private AtomicLong localTerm = new AtomicLong(0L); // 当前周期内的Leader private RaftPeer leader = null; // 所有的节点信息 private volatile Map<String, RaftPeer> peers = new HashMap<>(8); // 暂时不清楚用途 private Set<String> sites = new HashSet<>(); // 本节点是否已准备完毕 private volatile boolean ready = false; }

同时还具备了raft协议下必要的方法

@Deprecated @Component @DependsOn("ProtocolManager") public class RaftPeerSet extends MemberChangeListener implements Closeable { // 当前IP对应的节点是否是Leader public boolean isLeader(String ip) { if (EnvUtil.getStandaloneMode()) { return true; } if (leader == null) { Loggers.RAFT.warn("[IS LEADER] no leader is available now!"); return false; } return StringUtils.equals(leader.ip, ip); } // 决定Leader节点,根据投票结果以及是否满足majorityCount机制 public RaftPeer decideLeader(RaftPeer candidate) { peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } // 选票计数 ips.add(peer.voteFor); // 如果某节点的得票数大于当前的最大得票数,则更新候选Leader信息 if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } // 是否满足majorityCount数量的限制 if (maxApproveCount >= majorityCount()) { // 若满足则设置Leader节点信息 RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local())); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; } public RaftPeer makeLeader(RaftPeer candidate) { // 如果当前Leader与Candidate节点不一样,则进行Leader信息更改 if (!Objects.equals(leader, candidate)) { leader = candidate; ApplicationUtils.publishEvent(new MakeLeaderEvent(this, leader, local())); Loggers.RAFT .info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JacksonUtils.toJson(local()), JacksonUtils.toJson(leader)); } for (final RaftPeer peer : peers.values()) { Map<String, String> params = new HashMap<>(1); // 如果当前节点与远程Leader节点不等且是Follower节点 if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) { try { // 获取每个节点的RaftPeer节点信息对象数据 String url = RaftCore.buildUrl(peer.ip, RaftCore.API_GET_PEER); HttpClient.asyncHttpGet(url, null, params, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.RAFT .error("[NACOS-RAFT] get peer failed: {}, peer: {}", result.getCode(), peer.ip); peer.state = RaftPeer.State.FOLLOWER; return; } update(JacksonUtils.toObj(result.getData(), RaftPeer.class)); } @Override public void onError(Throwable throwable) { } @Override public void onCancel() { } }); } catch (Exception e) { peer.state = RaftPeer.State.FOLLOWER; Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip); } } } return update(candidate); } }

RaftCore

该对象是nacos中raft协议的主要实现,在启动之初,会进行一系列初始化的操作

@Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { @PostConstruct public void init() throws Exception { Loggers.RAFT.info("initializing Raft sub-system"); final long start = System.currentTimeMillis(); // 进行日志文件的加载到内存数据对象Datums的操作 raftStore.loadDatums(notifier, datums); // 设置当前的周期数 setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm()); // 初始化标识更改 initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); // 开启定时的Leader选举任务 masterTask = GlobalExecutor.registerMasterElection(new MasterElection()); // 开启定时的Leader心跳服务 heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat()); versionJudgement.registerObserver(isAllNewVersion -> { stopWork = isAllNewVersion; if (stopWork) { try { shutdown(); raftListener.removeOldRaftMetadata(); } catch (NacosException e) { throw new NacosRuntimeException(NacosException.SERVER_ERROR, e); } } }, 100); NotifyCenter.registerSubscriber(notifier); Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); } }

初始化的一系列操作完成后,此时集群还无法对外提供服务,因为此时Leader还未选举出来,需要在MasterElection选举Leader成功后才可以对外提供服务。

Leader 选举任务

@Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { // Leader 选举任务 public class MasterElection implements Runnable { @Override public void run() { try { if (stopWork) { return; } // 当前节点是否已准备完毕 if (!peers.isReady()) { return; } // 获取自身节点信息 RaftPeer local = peers.local(); // 本地存储的Leader任期时间 local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; // 如果Leader任期时间还在允许范围内,则不进行Leader选举 if (local.leaderDueMs > 0) { return; } // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); // 向其他节点发起投票请求 sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } } private void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term); //重置Raft集群数据 peers.reset(); local.term.incrementAndGet(); // 设置给自己投票 local.voteFor = local.ip; //将节点状态更新为候选节点 local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<>(1); params.put("vote", JacksonUtils.toJson(local)); // 遍历所有的节点信息(除了自己之外) for (final String server : peers.allServersWithoutMySelf()) { final String url = buildUrl(server, API_VOTE); try { //候选节点向除自身之外的所有其它Raft节点的/v1/ns/raft/vote发送HTTP POST请求 //请求内容为vote:JSON.toJSONString(local) HttpClient.asyncHttpPost(url, null, params, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url); return; } // 获取投票结果,并进行Leader的选举工作 RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer)); //候选节点收到其他节点投的候选节点数据,交给PeerSet.decideLeader方法处理 peers.decideLeader(peer); } @Override public void onError(Throwable throwable) { Loggers.RAFT.error("error while sending vote to server: {}", server, throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } } }

每个节点启动时,都会认为自己可以作为Leader,因此都会以自去己作为被选举人,向其他节点发起投票请求,其他节点的接受投票请求路径为**/v1/ns/raft/vote**

其他节点接收到投票请求,会调用RaftController.vote方法

@Deprecated @RestController @RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft", UtilsAndCommons.NACOS_SERVER_CONTEXT + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft"}) public class RaftController { private final RaftCore raftCore; private final ClusterVersionJudgement versionJudgement; @PostMapping("/vote") public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception { if (versionJudgement.allMemberIsNewVersion()) { throw new IllegalStateException("old raft protocol already stop"); } // 处理选举请求 RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class)); return JacksonUtils.transferToJsonNode(peer); } } @Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { // 节点接收到投票请求后的处理 public synchronized RaftPeer receivedVote(RaftPeer remote) { if (stopWork) { throw new IllegalStateException("old raft protocol already stop work"); } // 被选举人是否在raft集群节点列表中 if (!peers.contains(remote)) { throw new IllegalStateException("can not find peer: " + remote.ip); } // 获取自身节点信息 RaftPeer local = peers.get(NetUtils.localServer()); // 如果被选举节点的周期数小于本节点的周期数,则将自己的投票投给自己并告诉被选举者 // 若当前节点的 term 大于等于发送选举请求的节点 term,则选择自己为 leader if (remote.term.get() <= local.term.get()) { String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term; Loggers.RAFT.info(msg); if (StringUtils.isEmpty(local.voteFor)) { local.voteFor = local.ip; } return local; } // 满足投票条件后,本节点确认将自己的票投给被选举者 // 若当前节点的 term 小于发送请求的节点 term,选择发送请求的节点为 leader local.resetLeaderDue(); local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get()); Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term); return local; } }

通过以上步骤,最终选举出了Leader节点,接下来,就可以对外提供服务了。

数据同步 日志同步

当系统有了leader后,系统就进入对外工作了。客户端的一切请求发送到leader,leader来调度这些并发请求的顺序,并且保证leader与followers状态的一致性。在 raft 集群中,所有日志都必须首先提交至 leader 节点。leader 在每个 heartbeat 向 follower 同步日志。

 

Spring Cloud Alibaba Nacos 如何在一致性保证下实现AP和CP的并存?

因为是CP模式,所以操作都是通过Leader节点进行传达的,Follower节点本身不与Client进行联系,Follower只能接受来自Leader的操作请求,因此就存在请求转发的问题。因此在RaftCore中的signalPublish以及signalDelete中,存在着对Leader节点的判断以及请求转发的逻辑。

Raft协议下的注册流程入口也是在InstanceController.register方法,只不过在ConsistencyService的put方法实现由AP一致性调用的DistroConsistencyServiceImpl变成了RaftConsistencyServiceImpl。

@Deprecated @DependsOn("ProtocolManager") @Service public class RaftConsistencyServiceImpl implements PersistentConsistencyService { private final RaftCore raftCore; @Override public void put(String key, Record value) throws NacosException { checkIsStopWork(); try { raftCore.signalPublish(key, value); } catch (Exception e) { Loggers.RAFT.error("Raft put failed.", e); throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e); } } }

最终调用到 RaftCore 的 signalPublish() 方法。

RaftCore.signalPublish()

@Deprecated @DependsOn("ProtocolManager") @Component public class RaftCore implements Closeable { public void signalPublish(String key, Record value) throws Exception { if (stopWork) { throw new IllegalStateException("old raft protocol already stop work"); } //不是leader if (!isLeader()) { ObjectNode params = JacksonUtils.createEmptyJsonNode(); params.put("key", key); params.replace("value", JacksonUtils.transferToJsonNode(value)); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); final RaftPeer leader = getLeader(); //请求转发 交给leader去做/v1/ns/raft/datum raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); return; } OPERATE_LOCK.lock(); // 若是 leader,将包发送给所有的 follower try { final long start = System.currentTimeMillis(); final Datum datum = new Datum(); datum.key = key; datum.value = value; if (getDatum(key) == null) { datum.timestamp.set(1L); } else { datum.timestamp.set(getDatum(key).timestamp.incrementAndGet()); } ObjectNode json = JacksonUtils.createEmptyJsonNode(); json.replace("datum", JacksonUtils.transferToJsonNode(datum)); json.replace("source", JacksonUtils.transferToJsonNode(peers.local())); //发布数据改变通知 本地 onPublish 方法用来处理持久化逻辑 onPublish(datum, peers.local()); final String content = json.toString(); //只要过半的结点数 final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); //遍历所有结点 for (final String server : peers.allServersIncludeMyself()) { if (isLeader(server)) { //自己算一次 latch.countDown(); continue; } // 将包发送给所有的 follower,调用 /v1/ns/raft/datum/commit 接口 final String url = buildUrl(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.RAFT .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, www.liaochuntao.cn/2019/06/01/java-web-41/

blog.csdn.net/liyanan21/article/details/89320872

blog.csdn.net/microGP/article/details/114261089