Spring Cloud Alibaba Nacos如何实现服务端注册流程?

2026-05-21 03:513阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3025个文字,预计阅读时间需要13分钟。

Spring Cloud Alibaba Nacos如何实现服务端注册流程?

Nacos+Server服务注册流程——AP模式+Nacos主要采用AP模式,其RaftConsistencyServiceImpl负责一致性。在nacos-naming工程下的InstanceController类中,register方法作为服务注册的入口。

一、Nacos Server服务注册流程——AP模式

  • Nacos主要是AP模式,CP模式的RaftConsistencyServiceImpl。

1、在Nacos Server的nacos-naming工程下的InstanceController类中的register方法作为服务注册的入口

@RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") public class InstanceController { @Autowired private ServiceManager serviceManager; @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { //这里可以看出Nacos作为服务注册中心没有用到group //命名空间 final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); //服务名称 final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); //将服务注册注册请求参数转换成Instance final Instance instance = parseInstance(request); //注册实例 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } }

2、serviceManager.registerInstance注册服务实例

@Component public class ServiceManager implements RecordListener<Service> { //Nacos的注册表 private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //创建一个空的服务 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); //获取服务,从Nacos的注册表获取服务 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } //新增实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); } public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //获取服务,从Nacos的注册表获取服务 Service service = getService(namespaceId, serviceName); //加锁,同一时间同一命名空间下的同一服务,只能允许有一个服务注册请求 synchronized (service) { //更新并返回总的instanceList列表 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); //创建新的instance列表对象 Instances instances = new Instances(); instances.setInstanceList(instanceList); //将实例列表集合和key设置进consistencyService中 consistencyService.put(key, instances); } } //获取服务 public Service getService(String namespaceId, String serviceName) { if (serviceMap.get(namespaceId) == null) { return null; } return chooseServiceMap(namespaceId).get(serviceName); } }

3、addIpAddresses更新并返回总的instance服务实例列表

这里面还做了挺多的事,先是获取老的数据(持久的或者临时的),从一致性服务里获取,因为这个数据是要同步更新的,所以要拿出来及时更新,然后获取服务实例(持久的或者临时的),用他们来更新的老的数据,然后遍历新增的实例,如果没有集群的话先创建集群,并初始化集群,会开启心跳检查,最后根据是添加还是删除实例来更新老的实例映射,最后封装成集合返回最新的实例集合。

@Component public class ServiceManager implements RecordListener<Service> { @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { //重DataStore类中的dataMap获取老的实例集合数据 Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); //获取集群中所有相关的实例集合,临时的或者是永久的 List<Instance> currentIPs = service.allIPs(ephemeral); //IP端口和实例的映射 Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); //实例ID集合 Set<String> currentInstanceIds = Sets.newHashSet(); //放入对应的集合里 for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } //更新后的老的实例集合 Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { //根据当前服务实例的健康标志和心跳时间,来更新老的实例集合数据 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { //重新创建一个 instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { //遍历新的实例 if (!service.getClusterMap().containsKey(instance.getClusterName())) { //不存在就创建服务实例集群 Cluster cluster = new Cluster(instance.getClusterName(), service); //初始化,开启集群心跳检查 cluster.init(); //添加服务实例集群 service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } //删除操作的话就删除老的实例集合的数据 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { //否则添加 Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { //存在原实例,则直接使用原服务InstanceId instance.setInstanceId(oldInstance.getInstanceId()); } else { //否则,则直接使用原服务InstanceId instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } //返回总的实例集合 return new ArrayList<>(instanceMap.values()); } }
  • Service的allIPs获取集群中的实例集合 遍历集群,获取集群里的实例集合,临时的或者是永久的。
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> { private Map<String, Cluster> clusterMap = new HashMap<>(); public List<Instance> allIPs(boolean ephemeral) { List<Instance> result = new ArrayList<>(); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { result.addAll(entry.getValue().allIPs(ephemeral)); } return result; } }
  • ServiceManager的setValid更新老的实例集合
  • 其实就是用服务集群中获取的实例集合去更新老的实例集合,健康状态和心跳时间。
@Component public class ServiceManager implements RecordListener<Service> { private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) { Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size()); //遍历老的实例集合,如果新的实例存在的话就更新 for (Instance instance : oldInstances) { //获取对应新的实例 Instance instance1 = map.get(instance.toIpAddr()); //存在就更新 if (instance1 != null) { instance.setHealthy(instance1.isHealthy()); instance.setLastBeat(instance1.getLastBeat()); } //放入映射 instanceMap.put(instance.getDatumKey(), instance); } return instanceMap; } }
  • Cluster的init集群初始化
  • 即是开启一个心跳检查的任务
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable { private HealthCheckTask checkTask; public void init() { if (inited) { return; } checkTask = new HealthCheckTask(this); HealthCheckReactor.scheduleCheck(checkTask); inited = true; } }

4、将服务注册请求放入到ArrayBlockingQueue阻塞队列中,并将服务实例存入DataStore中的dataMap中

@DependsOn("ProtocolManager") @Service("consistencyDelegate") public class DelegateConsistencyServiceImpl implements ConsistencyService { @Override public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); } } @DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private final DataStore dataStore; private volatile Notifier notifier = new Notifier(); @Override public void put(String key, Record value) throws NacosException { onPut(key, value); distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); //将服务实例集合存入DataStore中的dataMap中 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //添加数据变更的任务 notifier.addTask(key, DataOperation.CHANGE); } public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); /** * Add new notify task to queue. * * @param datumKey data key * @param action action for data */ public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } //在ArrayBlockingQueue中添加任务 tasks.offer(Pair.with(datumKey, action)); } } }

注意:在DistroConsistencyServiceImpl实例化完成之后,启动异步线程池,监听ArrayBlockingQueue中的任务,进行实时消费

Spring Cloud Alibaba Nacos如何实现服务端注册流程?

@DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private volatile Notifier notifier = new Notifier(); @PostConstruct public void init() { //当前Bean实例话后,启动异步线程池,监听ArrayBlockingQueue中的任务进行消费 GlobalExecutor.submitDistroNotifyTask(notifier); } }

5、将服务注册请求放入到ArrayBlockingQueue阻塞队列后,处理该阻塞队列中的任务,在Notifier中的run方法中处理该任务

@DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private final DataStore dataStore; private volatile Notifier notifier = new Notifier(); public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); public int getTaskSize() { return tasks.size(); } @Override public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { //取出注册请求任务 Pair<String, DataOperation> pair = tasks.take(); //处理任务 handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } private void handle(Pair<String, DataOperation> pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { //如果是一个数据变更动作,服务注册数据数据变更 if (action == DataOperation.CHANGE) { //从dataStore中获取服务注册请求放入的服务实例集合 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } //如果是一个数据删除动作 if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } }

6、从dataStore中获取服务注册请求放入的服务实例集合,调用listener.onChange方法注册

@JsonInclude(Include.NON_NULL) public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> { private Map<String, Cluster> clusterMap = new HashMap<>(); @Override public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } //权重最大值边界设定 if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } //权重最小值边界设定 if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } //更新IP列表 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); } public void updateIPs(Collection<Instance> instances, boolean ephemeral) { Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } //遍历服务注册的实例列表 for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { //ClusterName为空,则设置默认值 instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } //如果不包含ClusterName,则初始化 if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); //初始化Cluster,即先创建服务健康检查任务, //并调用HealthCheckReactor.scheduleCheck执行健康检查任务,即心跳机制 cluster.init(); //根据ClusterName注册服务集群实例 getClusterMap().put(instance.getClusterName(), cluster); } //根据ClusterName获取集群IP集合 List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { //IP列表为空,也注册IP为空的服务 clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } //新增服务实例 clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); //更新为临时节点 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } //设置最后更新的时间 setLastModifiedMillis(System.currentTimeMillis()); //广播,UDP通知通客户端service发生了改变 getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); } } public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable { @JsonIgnore private HealthCheckTask checkTask; public void init() { if (inited) { return; } //创建健康检查任务 checkTask = new HealthCheckTask(this); //执行健康检查任务 HealthCheckReactor.scheduleCheck(checkTask); inited = true; } }
  • 最核心的就是updateIPs
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable { @JsonIgnore private Set<Instance> persistentInstances = new HashSet<>(); @JsonIgnore private Set<Instance> ephemeralInstances = new HashSet<>(); public void updateIps(List<Instance> ips, boolean ephemeral) { //拿到cluster中旧的instance列表 Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } //updatedIps主要做的是找出oldipmap中的实例并返回 List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (!ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() != oldIP.isHealthy()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(), (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName()); } if (ip.getWeight() != oldIP.getWeight()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString()); } } } //找出新增的实例列表,即ips中的实例,oldipmap不存在的实例列表 List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { //进行新实例的健康检查设置 HealthCheckStatus.reset(ip); } } //找出oldipmap的ip的实例。不存在于ips中的实例 List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { //将不存在新实例ip列表的值的健康检查删除 HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); //更新服务下cluster的intances列表 if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; } } }

为了防止读写并发冲突,直接创建了一个新的HashMap,然后去操作新的HashMap,操作完了之后再去替换老的Map数据,CopyOnWrite的思想。最后还发布了服务变化事件。

  • 服务注册通过CopyOnWrite支持并发读写的能力
  • Cluster类中的updateIPs方法中是对原服务IP列表的副本进行操作,注册完成替换原有服务IP列表即可,即CopyOnWrite操作,不需要加锁,性能高,存在服务延迟。

Eureka防止读写冲突用的是多级缓存结构,多级缓存定时同步,客户端感知及时性不如Nacos。

7、同步实例信息到Nacos Server集群其它节点

回到之前的代码,put方法中distroProtocol.sync();进行同步信息到集群其它节点,跟进代码:

@DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private final DistroProtocol distroProtocol; @Override public void put(String key, Record value) throws NacosException { onPut(key, value); distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } }
  • 通过newSingleScheduledExecutorService.scheduleWithFixedDelay()定时执行ProcessRunnable任务,发送www.cnblogs.com/chz-blogs/p/14325288.html

    www.cnblogs.com/guoxiaoyu/p/14248226.html

    blog.csdn.net/wangwei19871103/article/details/105834317

    blog.csdn.net/wangwei19871103/article/details/105835207

本文共计3025个文字,预计阅读时间需要13分钟。

Spring Cloud Alibaba Nacos如何实现服务端注册流程?

Nacos+Server服务注册流程——AP模式+Nacos主要采用AP模式,其RaftConsistencyServiceImpl负责一致性。在nacos-naming工程下的InstanceController类中,register方法作为服务注册的入口。

一、Nacos Server服务注册流程——AP模式

  • Nacos主要是AP模式,CP模式的RaftConsistencyServiceImpl。

1、在Nacos Server的nacos-naming工程下的InstanceController类中的register方法作为服务注册的入口

@RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") public class InstanceController { @Autowired private ServiceManager serviceManager; @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { //这里可以看出Nacos作为服务注册中心没有用到group //命名空间 final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); //服务名称 final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); //将服务注册注册请求参数转换成Instance final Instance instance = parseInstance(request); //注册实例 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } }

2、serviceManager.registerInstance注册服务实例

@Component public class ServiceManager implements RecordListener<Service> { //Nacos的注册表 private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //创建一个空的服务 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); //获取服务,从Nacos的注册表获取服务 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } //新增实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); } public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //获取服务,从Nacos的注册表获取服务 Service service = getService(namespaceId, serviceName); //加锁,同一时间同一命名空间下的同一服务,只能允许有一个服务注册请求 synchronized (service) { //更新并返回总的instanceList列表 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); //创建新的instance列表对象 Instances instances = new Instances(); instances.setInstanceList(instanceList); //将实例列表集合和key设置进consistencyService中 consistencyService.put(key, instances); } } //获取服务 public Service getService(String namespaceId, String serviceName) { if (serviceMap.get(namespaceId) == null) { return null; } return chooseServiceMap(namespaceId).get(serviceName); } }

3、addIpAddresses更新并返回总的instance服务实例列表

这里面还做了挺多的事,先是获取老的数据(持久的或者临时的),从一致性服务里获取,因为这个数据是要同步更新的,所以要拿出来及时更新,然后获取服务实例(持久的或者临时的),用他们来更新的老的数据,然后遍历新增的实例,如果没有集群的话先创建集群,并初始化集群,会开启心跳检查,最后根据是添加还是删除实例来更新老的实例映射,最后封装成集合返回最新的实例集合。

@Component public class ServiceManager implements RecordListener<Service> { @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { //重DataStore类中的dataMap获取老的实例集合数据 Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); //获取集群中所有相关的实例集合,临时的或者是永久的 List<Instance> currentIPs = service.allIPs(ephemeral); //IP端口和实例的映射 Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); //实例ID集合 Set<String> currentInstanceIds = Sets.newHashSet(); //放入对应的集合里 for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } //更新后的老的实例集合 Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { //根据当前服务实例的健康标志和心跳时间,来更新老的实例集合数据 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { //重新创建一个 instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { //遍历新的实例 if (!service.getClusterMap().containsKey(instance.getClusterName())) { //不存在就创建服务实例集群 Cluster cluster = new Cluster(instance.getClusterName(), service); //初始化,开启集群心跳检查 cluster.init(); //添加服务实例集群 service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } //删除操作的话就删除老的实例集合的数据 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { //否则添加 Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { //存在原实例,则直接使用原服务InstanceId instance.setInstanceId(oldInstance.getInstanceId()); } else { //否则,则直接使用原服务InstanceId instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } //返回总的实例集合 return new ArrayList<>(instanceMap.values()); } }
  • Service的allIPs获取集群中的实例集合 遍历集群,获取集群里的实例集合,临时的或者是永久的。
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> { private Map<String, Cluster> clusterMap = new HashMap<>(); public List<Instance> allIPs(boolean ephemeral) { List<Instance> result = new ArrayList<>(); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { result.addAll(entry.getValue().allIPs(ephemeral)); } return result; } }
  • ServiceManager的setValid更新老的实例集合
  • 其实就是用服务集群中获取的实例集合去更新老的实例集合,健康状态和心跳时间。
@Component public class ServiceManager implements RecordListener<Service> { private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) { Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size()); //遍历老的实例集合,如果新的实例存在的话就更新 for (Instance instance : oldInstances) { //获取对应新的实例 Instance instance1 = map.get(instance.toIpAddr()); //存在就更新 if (instance1 != null) { instance.setHealthy(instance1.isHealthy()); instance.setLastBeat(instance1.getLastBeat()); } //放入映射 instanceMap.put(instance.getDatumKey(), instance); } return instanceMap; } }
  • Cluster的init集群初始化
  • 即是开启一个心跳检查的任务
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable { private HealthCheckTask checkTask; public void init() { if (inited) { return; } checkTask = new HealthCheckTask(this); HealthCheckReactor.scheduleCheck(checkTask); inited = true; } }

4、将服务注册请求放入到ArrayBlockingQueue阻塞队列中,并将服务实例存入DataStore中的dataMap中

@DependsOn("ProtocolManager") @Service("consistencyDelegate") public class DelegateConsistencyServiceImpl implements ConsistencyService { @Override public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); } } @DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private final DataStore dataStore; private volatile Notifier notifier = new Notifier(); @Override public void put(String key, Record value) throws NacosException { onPut(key, value); distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); //将服务实例集合存入DataStore中的dataMap中 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //添加数据变更的任务 notifier.addTask(key, DataOperation.CHANGE); } public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); /** * Add new notify task to queue. * * @param datumKey data key * @param action action for data */ public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } //在ArrayBlockingQueue中添加任务 tasks.offer(Pair.with(datumKey, action)); } } }

注意:在DistroConsistencyServiceImpl实例化完成之后,启动异步线程池,监听ArrayBlockingQueue中的任务,进行实时消费

Spring Cloud Alibaba Nacos如何实现服务端注册流程?

@DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private volatile Notifier notifier = new Notifier(); @PostConstruct public void init() { //当前Bean实例话后,启动异步线程池,监听ArrayBlockingQueue中的任务进行消费 GlobalExecutor.submitDistroNotifyTask(notifier); } }

5、将服务注册请求放入到ArrayBlockingQueue阻塞队列后,处理该阻塞队列中的任务,在Notifier中的run方法中处理该任务

@DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private final DataStore dataStore; private volatile Notifier notifier = new Notifier(); public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); public int getTaskSize() { return tasks.size(); } @Override public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { //取出注册请求任务 Pair<String, DataOperation> pair = tasks.take(); //处理任务 handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } private void handle(Pair<String, DataOperation> pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { //如果是一个数据变更动作,服务注册数据数据变更 if (action == DataOperation.CHANGE) { //从dataStore中获取服务注册请求放入的服务实例集合 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } //如果是一个数据删除动作 if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } }

6、从dataStore中获取服务注册请求放入的服务实例集合,调用listener.onChange方法注册

@JsonInclude(Include.NON_NULL) public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> { private Map<String, Cluster> clusterMap = new HashMap<>(); @Override public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } //权重最大值边界设定 if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } //权重最小值边界设定 if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } //更新IP列表 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); } public void updateIPs(Collection<Instance> instances, boolean ephemeral) { Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } //遍历服务注册的实例列表 for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { //ClusterName为空,则设置默认值 instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } //如果不包含ClusterName,则初始化 if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); //初始化Cluster,即先创建服务健康检查任务, //并调用HealthCheckReactor.scheduleCheck执行健康检查任务,即心跳机制 cluster.init(); //根据ClusterName注册服务集群实例 getClusterMap().put(instance.getClusterName(), cluster); } //根据ClusterName获取集群IP集合 List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { //IP列表为空,也注册IP为空的服务 clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } //新增服务实例 clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); //更新为临时节点 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } //设置最后更新的时间 setLastModifiedMillis(System.currentTimeMillis()); //广播,UDP通知通客户端service发生了改变 getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); } } public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable { @JsonIgnore private HealthCheckTask checkTask; public void init() { if (inited) { return; } //创建健康检查任务 checkTask = new HealthCheckTask(this); //执行健康检查任务 HealthCheckReactor.scheduleCheck(checkTask); inited = true; } }
  • 最核心的就是updateIPs
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable { @JsonIgnore private Set<Instance> persistentInstances = new HashSet<>(); @JsonIgnore private Set<Instance> ephemeralInstances = new HashSet<>(); public void updateIps(List<Instance> ips, boolean ephemeral) { //拿到cluster中旧的instance列表 Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } //updatedIps主要做的是找出oldipmap中的实例并返回 List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (!ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() != oldIP.isHealthy()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(), (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName()); } if (ip.getWeight() != oldIP.getWeight()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString()); } } } //找出新增的实例列表,即ips中的实例,oldipmap不存在的实例列表 List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { //进行新实例的健康检查设置 HealthCheckStatus.reset(ip); } } //找出oldipmap的ip的实例。不存在于ips中的实例 List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { //将不存在新实例ip列表的值的健康检查删除 HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); //更新服务下cluster的intances列表 if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; } } }

为了防止读写并发冲突,直接创建了一个新的HashMap,然后去操作新的HashMap,操作完了之后再去替换老的Map数据,CopyOnWrite的思想。最后还发布了服务变化事件。

  • 服务注册通过CopyOnWrite支持并发读写的能力
  • Cluster类中的updateIPs方法中是对原服务IP列表的副本进行操作,注册完成替换原有服务IP列表即可,即CopyOnWrite操作,不需要加锁,性能高,存在服务延迟。

Eureka防止读写冲突用的是多级缓存结构,多级缓存定时同步,客户端感知及时性不如Nacos。

7、同步实例信息到Nacos Server集群其它节点

回到之前的代码,put方法中distroProtocol.sync();进行同步信息到集群其它节点,跟进代码:

@DependsOn("ProtocolManager") @org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { private final DistroProtocol distroProtocol; @Override public void put(String key, Record value) throws NacosException { onPut(key, value); distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } }
  • 通过newSingleScheduledExecutorService.scheduleWithFixedDelay()定时执行ProcessRunnable任务,发送www.cnblogs.com/chz-blogs/p/14325288.html

    www.cnblogs.com/guoxiaoyu/p/14248226.html

    blog.csdn.net/wangwei19871103/article/details/105834317

    blog.csdn.net/wangwei19871103/article/details/105835207