Nacos配置中心集群工作原理和源码解析是怎样的?

2026-05-19 17:061阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2078个文字,预计阅读时间需要9分钟。

Nacos配置中心集群工作原理和源码解析是怎样的?

Nacos作为配置中心,为确保服务节点的可用性,实现高可用性,其集群工作原理如下:

Nacos集群通过以下方式实现高可用性:

1. 主从复制:Nacos集群中,主节点负责处理配置更新和读取请求,从节点则负责同步主节点的配置数据。当主节点出现故障时,从节点可以迅速接管,保证服务的正常运行。

2. 数据持久化:Nacos使用数据库来存储配置信息,确保即使集群发生故障,配置数据也不会丢失。

Nacos配置中心集群工作原理和源码解析是怎样的?

3. 负载均衡:Nacos集群通过负载均衡机制,将请求分发到各个节点,提高系统的吞吐量和响应速度。

下面是Nacos集群的部署图:

+--------+ +--------+ +--------+| Node 1 |----->| Node 2 |----->| Node 3 |+--------+ +--------+ +--------+

在Nacos集群中,作为配置中心的集群结构,是一种分布式架构。

具体来说,Nacos集群的架构特点如下:

1. 分布式存储:配置数据存储在分布式数据库中,确保数据的一致性和可靠性。

2. 服务注册与发现:Nacos提供服务注册与发现功能,使得服务之间可以相互发现,提高系统的可扩展性和可维护性。

3. 配置管理:Nacos提供集中式的配置管理功能,使得配置的修改和更新更加方便。

4. 动态配置:Nacos支持动态配置,使得配置的更新可以实时生效,无需重启服务。

总结来说,Nacos集群通过主从复制、数据持久化、负载均衡等机制,实现了高可用性,确保了服务节点的稳定运行。

Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?

下面这个图,表示Nacos集群的部署图。

Nacos集群工作原理

Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。

Nacos的数据存储分为两部分

  1. Mysql数据库存储,所有Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来解决,从而保证数据的可靠性。
  2. 每个节点的本地磁盘,会保存一份全量数据,具体路径:/data/program/nacos-1/data/config-data/${GROUP}.

在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。

这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

当配置发生变更时:

  1. Nacos会把变更的配置保存到数据库,然后再写入本地文件。
  2. 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。

另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件

配置变更同步入口

当配置发生修改、删除、新增操作时,通过发布一个notifyConfigChange事件。

@PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { //省略.. if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true); ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true); ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } }//省略 return true; } AsyncNotifyService

配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。

@Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; // Register ConfigDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe ConfigDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { // Generate ConfigDataChangeEvent concurrently if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表 // 构建NotifySingleTask,并添加到队列中。 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); for (Member member : ipList) { //遍历集群中的每个节点 queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } //异步执行任务 AsyncTask ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); } } @Override public Class<? extends Event> subscribeType() { return ConfigDataChangeEvent.class; } }); } AsyncTask

@Override public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() { while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空 NotifySingleTask task = queue.poll(); //获取task String targetIp = task.getTargetIP(); //获取目标ip if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify //判断目标ip的健康状态 boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); // if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。 // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task asyncTaskExecute(task); } else { //构建header Header header = Header.newInstance(); header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP()); if (task.isBeta) { header.addParam("isBeta", "true"); } AuthHeaderUtil.addIdentityToHeader(header); //通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法 restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)); } } } } 目标节点接收请求

数据同步的请求地址为,task.url=192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

@GetMapping("/dataChange") public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tag", required = false) String tag) { dataId = dataId.trim(); group = group.trim(); String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED); long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified); String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP); String isBetaStr = request.getHeader("isBeta"); if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { // dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; }

dumpService.dump用来实现配置的更新,代码如下

当前任务会被添加到DumpTaskMgr中管理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag); dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }

TaskManager.addTask, 先调用父类去完成任务添加。

@Override public void addTask(Object key, AbstractDelayTask newTask) { super.addTask(key, newTask); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); }

在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。

NacosDelayTaskExecuteEngine

TaskManager的父类是NacosDelayTaskExecuteEngine,

这个类中有一个成员属性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,专门来保存延期执行的任务类型AbstractDelayTask.

在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); } ProcessRunnable

private class ProcessRunnable implements Runnable { @Override public void run() { try { processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } } } processTasks

protected void processTasks() { //获取所有的任务 Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } //获取任务处理器,这里返回的是DumpProcessor NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed //执行具体任务 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } } DumpProcessor.process

读取数据库的最新数据,然后更新本地缓存和磁盘。

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!

本文共计2078个文字,预计阅读时间需要9分钟。

Nacos配置中心集群工作原理和源码解析是怎样的?

Nacos作为配置中心,为确保服务节点的可用性,实现高可用性,其集群工作原理如下:

Nacos集群通过以下方式实现高可用性:

1. 主从复制:Nacos集群中,主节点负责处理配置更新和读取请求,从节点则负责同步主节点的配置数据。当主节点出现故障时,从节点可以迅速接管,保证服务的正常运行。

2. 数据持久化:Nacos使用数据库来存储配置信息,确保即使集群发生故障,配置数据也不会丢失。

Nacos配置中心集群工作原理和源码解析是怎样的?

3. 负载均衡:Nacos集群通过负载均衡机制,将请求分发到各个节点,提高系统的吞吐量和响应速度。

下面是Nacos集群的部署图:

+--------+ +--------+ +--------+| Node 1 |----->| Node 2 |----->| Node 3 |+--------+ +--------+ +--------+

在Nacos集群中,作为配置中心的集群结构,是一种分布式架构。

具体来说,Nacos集群的架构特点如下:

1. 分布式存储:配置数据存储在分布式数据库中,确保数据的一致性和可靠性。

2. 服务注册与发现:Nacos提供服务注册与发现功能,使得服务之间可以相互发现,提高系统的可扩展性和可维护性。

3. 配置管理:Nacos提供集中式的配置管理功能,使得配置的修改和更新更加方便。

4. 动态配置:Nacos支持动态配置,使得配置的更新可以实时生效,无需重启服务。

总结来说,Nacos集群通过主从复制、数据持久化、负载均衡等机制,实现了高可用性,确保了服务节点的稳定运行。

Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?

下面这个图,表示Nacos集群的部署图。

Nacos集群工作原理

Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。

Nacos的数据存储分为两部分

  1. Mysql数据库存储,所有Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来解决,从而保证数据的可靠性。
  2. 每个节点的本地磁盘,会保存一份全量数据,具体路径:/data/program/nacos-1/data/config-data/${GROUP}.

在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。

这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

当配置发生变更时:

  1. Nacos会把变更的配置保存到数据库,然后再写入本地文件。
  2. 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。

另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件

配置变更同步入口

当配置发生修改、删除、新增操作时,通过发布一个notifyConfigChange事件。

@PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { //省略.. if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true); ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true); ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } }//省略 return true; } AsyncNotifyService

配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。

@Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; // Register ConfigDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe ConfigDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { // Generate ConfigDataChangeEvent concurrently if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表 // 构建NotifySingleTask,并添加到队列中。 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); for (Member member : ipList) { //遍历集群中的每个节点 queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } //异步执行任务 AsyncTask ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); } } @Override public Class<? extends Event> subscribeType() { return ConfigDataChangeEvent.class; } }); } AsyncTask

@Override public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() { while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空 NotifySingleTask task = queue.poll(); //获取task String targetIp = task.getTargetIP(); //获取目标ip if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify //判断目标ip的健康状态 boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); // if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。 // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task asyncTaskExecute(task); } else { //构建header Header header = Header.newInstance(); header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP()); if (task.isBeta) { header.addParam("isBeta", "true"); } AuthHeaderUtil.addIdentityToHeader(header); //通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法 restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)); } } } } 目标节点接收请求

数据同步的请求地址为,task.url=192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

@GetMapping("/dataChange") public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tag", required = false) String tag) { dataId = dataId.trim(); group = group.trim(); String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED); long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified); String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP); String isBetaStr = request.getHeader("isBeta"); if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { // dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; }

dumpService.dump用来实现配置的更新,代码如下

当前任务会被添加到DumpTaskMgr中管理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag); dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }

TaskManager.addTask, 先调用父类去完成任务添加。

@Override public void addTask(Object key, AbstractDelayTask newTask) { super.addTask(key, newTask); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); }

在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。

NacosDelayTaskExecuteEngine

TaskManager的父类是NacosDelayTaskExecuteEngine,

这个类中有一个成员属性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,专门来保存延期执行的任务类型AbstractDelayTask.

在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); } ProcessRunnable

private class ProcessRunnable implements Runnable { @Override public void run() { try { processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } } } processTasks

protected void processTasks() { //获取所有的任务 Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } //获取任务处理器,这里返回的是DumpProcessor NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed //执行具体任务 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } } DumpProcessor.process

读取数据库的最新数据,然后更新本地缓存和磁盘。

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!