Spring Cloud Alibaba Nacos如何实现心跳机制与服务健康状态监控?

2026-05-21 03:513阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1682个文字,预计阅读时间需要7分钟。

Spring Cloud Alibaba Nacos如何实现心跳机制与服务健康状态监控?

Nacos服务端心跳+1.1、客户端心跳+Nacos Client会维护一个定时任务,通过持续调用服务端接口更新心跳时间,确保自身处于活跃状态,防止服务端将服务剔除。Nacos默认5秒向服务端发送一次心跳。

一、Nacos服务心跳

1.1、客户端心跳

Nacos Client会维护一个定时任务通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除,Nacos默认5秒向服务端发送一次,通过请求服务端接口**/instance/beat**发送心跳。

Spring Cloud Alibaba Nacos如何实现心跳机制与服务健康状态监控?

客户端服务在注册服务

根据nacos-discovery的META-INF目录下的spring.factories配置来完成相关类的自动装配。

  • NacosServiceRegistryAutoConfiguration用来注册管理这几个bean。
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class }) public class NacosServiceRegistryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } }
  • NacosServiceRegistry:完成服务注册,实现ServiceRegistry。

  • NacosRegistration:用来注册时存储nacos服务端的相关信息。

  • NacosAutoServiceRegistration 继承spring中的AbstractAutoServiceRegistration,AbstractAutoServiceRegistration实现ApplicationListener<WebServerInitializedEvent>,通过事件监听来发起服务注册,到时候会调用NacosServiceRegistry.register(registration)

在NacosServiceRegistry.registry方法中,调用了nacos client sdk中的namingService.registerInstance完成服务注册。

public class NacosServiceRegistry implements ServiceRegistry<Registration> { @Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { if (nacosDiscoveryProperties.isFailFast()) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); rethrowRuntimeException(e); } else { log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e); } } } }

继续看namingService.registerInstance的实现主要就两件事

  • 1、beatReactor.addBeatInfo创建心跳信息实现健康检查,Nacos Server必须要确保注册的服务实例是健康的,而心跳检测就是服务监控检测的方式。

  • 2、serverProxy.registerService 服务注册。

public class NacosNamingService implements NamingService { private BeatReactor beatReactor; private NamingProxy serverProxy; @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); //创建group@@servciceName String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); //如果当前实例是临时实例(默认是临时实例) if (instance.isEphemeral()) { // 创建心跳信息 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); //beanReactor添加心跳信息检测 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } //发送请求向nacos注册服务 serverProxy.registerService(groupedServiceName, groupName, instance); } }

看下BeatInfo这个类

  • 给周期任务设定时间beatInfo.setPeriod(instance.getInstanceHeartBeatInterval())
public class BeatReactor implements Closeable { public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) { BeatInfo beatInfo = new BeatInfo(); //服务名称 beatInfo.setServiceName(groupedServiceName); //IP beatInfo.setIp(instance.getIp()); //端口 beatInfo.setPort(instance.getPort()); //集群名称 beatInfo.setCluster(instance.getClusterName()); //权重 beatInfo.setWeight(instance.getWeight()); //元数据 beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); //心跳周期,给周期任务设定时间 beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); return beatInfo; } } public class Instance implements Serializable { public long getInstanceHeartBeatInterval() { return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL); } } public class Constants { //Constants内部定义的一个DEFAULT_HEART_BEAT_INTERVAL的常量,设定5秒: public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5); }

接下来我们看下addBeatInfo方法,该方法内部主要是将BeatTask任务加入到线程池ScheduledExecutorService当中。

public class BeatReactor implements Closeable { private final ScheduledExecutorService executorService; private final NamingProxy serverProxy; public BeatReactor(NamingProxy serverProxy, int threadCount) { this.serverProxy = serverProxy; //实例化客户端心跳机制线程池 this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.beat.sender"); return thread; } }); } public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); //将心跳任务添加到线程池中,发起一个心跳检测任务 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } }

重点部分就是看BeatTask

BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为**/instance/beat**的方法。

public class BeatReactor implements Closeable { private final ScheduledExecutorService executorService; private final NamingProxy serverProxy; class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } //心跳周期执行时间 long nextTime = beatInfo.getPeriod(); try { //向Nacos Server服务端发送心跳请求 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } //如果返回资源未找到,则立即重新注册服务 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { //发送" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl(); //发送Http删除请求 // delete instance asynchronously: HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}", instance.toJson(), result.getMessage(), result.getCode()); } } @Override public void onError(Throwable throwable) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e); } } }

删除实例的接口

@RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") public class InstanceController { @CanDistro @DeleteMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String deregister(HttpServletRequest request) throws Exception { Instance instance = getIpAddress(request); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName); return "ok"; } //删除方法 serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance); return "ok"; } }

内部通过调用ServiceManager的removeInstance方法

@Component public class ServiceManager implements RecordListener<Service> { public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { Service service = getService(namespaceId, serviceName); synchronized (service) { removeInstance(namespaceId, serviceName, ephemeral, service, ips); } } private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //排除要删除的实例信息 List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); //更新实例信息 consistencyService.put(key, instances); } }

重点看下substractIpAddresses内部通过调用updateIpAddresses,该方法内部主要就是移除到超过30秒的实例信息

@Component public class ServiceManager implements RecordListener<Service> { private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); //获取所有实例信息 List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } //初始化Map Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } //移除超过30秒的实例信息 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { instance.setInstanceId(oldInstance.getInstanceId()); } else { instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } return new ArrayList<>(instanceMap.values()); } }

心跳机制简单图

参考: blog.csdn.net/jb84006/article/details/117634375

www.cnblogs.com/wtzbk/p/14366240.html

本文共计1682个文字,预计阅读时间需要7分钟。

Spring Cloud Alibaba Nacos如何实现心跳机制与服务健康状态监控?

Nacos服务端心跳+1.1、客户端心跳+Nacos Client会维护一个定时任务,通过持续调用服务端接口更新心跳时间,确保自身处于活跃状态,防止服务端将服务剔除。Nacos默认5秒向服务端发送一次心跳。

一、Nacos服务心跳

1.1、客户端心跳

Nacos Client会维护一个定时任务通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除,Nacos默认5秒向服务端发送一次,通过请求服务端接口**/instance/beat**发送心跳。

Spring Cloud Alibaba Nacos如何实现心跳机制与服务健康状态监控?

客户端服务在注册服务

根据nacos-discovery的META-INF目录下的spring.factories配置来完成相关类的自动装配。

  • NacosServiceRegistryAutoConfiguration用来注册管理这几个bean。
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class }) public class NacosServiceRegistryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } }
  • NacosServiceRegistry:完成服务注册,实现ServiceRegistry。

  • NacosRegistration:用来注册时存储nacos服务端的相关信息。

  • NacosAutoServiceRegistration 继承spring中的AbstractAutoServiceRegistration,AbstractAutoServiceRegistration实现ApplicationListener<WebServerInitializedEvent>,通过事件监听来发起服务注册,到时候会调用NacosServiceRegistry.register(registration)

在NacosServiceRegistry.registry方法中,调用了nacos client sdk中的namingService.registerInstance完成服务注册。

public class NacosServiceRegistry implements ServiceRegistry<Registration> { @Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { if (nacosDiscoveryProperties.isFailFast()) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); rethrowRuntimeException(e); } else { log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e); } } } }

继续看namingService.registerInstance的实现主要就两件事

  • 1、beatReactor.addBeatInfo创建心跳信息实现健康检查,Nacos Server必须要确保注册的服务实例是健康的,而心跳检测就是服务监控检测的方式。

  • 2、serverProxy.registerService 服务注册。

public class NacosNamingService implements NamingService { private BeatReactor beatReactor; private NamingProxy serverProxy; @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); //创建group@@servciceName String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); //如果当前实例是临时实例(默认是临时实例) if (instance.isEphemeral()) { // 创建心跳信息 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); //beanReactor添加心跳信息检测 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } //发送请求向nacos注册服务 serverProxy.registerService(groupedServiceName, groupName, instance); } }

看下BeatInfo这个类

  • 给周期任务设定时间beatInfo.setPeriod(instance.getInstanceHeartBeatInterval())
public class BeatReactor implements Closeable { public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) { BeatInfo beatInfo = new BeatInfo(); //服务名称 beatInfo.setServiceName(groupedServiceName); //IP beatInfo.setIp(instance.getIp()); //端口 beatInfo.setPort(instance.getPort()); //集群名称 beatInfo.setCluster(instance.getClusterName()); //权重 beatInfo.setWeight(instance.getWeight()); //元数据 beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); //心跳周期,给周期任务设定时间 beatInfo.setPeriod(instance.getInstanceHeartBeatInterval()); return beatInfo; } } public class Instance implements Serializable { public long getInstanceHeartBeatInterval() { return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL); } } public class Constants { //Constants内部定义的一个DEFAULT_HEART_BEAT_INTERVAL的常量,设定5秒: public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5); }

接下来我们看下addBeatInfo方法,该方法内部主要是将BeatTask任务加入到线程池ScheduledExecutorService当中。

public class BeatReactor implements Closeable { private final ScheduledExecutorService executorService; private final NamingProxy serverProxy; public BeatReactor(NamingProxy serverProxy, int threadCount) { this.serverProxy = serverProxy; //实例化客户端心跳机制线程池 this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.beat.sender"); return thread; } }); } public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); //将心跳任务添加到线程池中,发起一个心跳检测任务 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } }

重点部分就是看BeatTask

BeatTask继承Runnable,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,服务端请求地址为**/instance/beat**的方法。

public class BeatReactor implements Closeable { private final ScheduledExecutorService executorService; private final NamingProxy serverProxy; class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } //心跳周期执行时间 long nextTime = beatInfo.getPeriod(); try { //向Nacos Server服务端发送心跳请求 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } //如果返回资源未找到,则立即重新注册服务 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { //发送" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl(); //发送Http删除请求 // delete instance asynchronously: HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}", instance.toJson(), result.getMessage(), result.getCode()); } } @Override public void onError(Throwable throwable) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e); } } }

删除实例的接口

@RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") public class InstanceController { @CanDistro @DeleteMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String deregister(HttpServletRequest request) throws Exception { Instance instance = getIpAddress(request); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName); return "ok"; } //删除方法 serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance); return "ok"; } }

内部通过调用ServiceManager的removeInstance方法

@Component public class ServiceManager implements RecordListener<Service> { public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { Service service = getService(namespaceId, serviceName); synchronized (service) { removeInstance(namespaceId, serviceName, ephemeral, service, ips); } } private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //排除要删除的实例信息 List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); //更新实例信息 consistencyService.put(key, instances); } }

重点看下substractIpAddresses内部通过调用updateIpAddresses,该方法内部主要就是移除到超过30秒的实例信息

@Component public class ServiceManager implements RecordListener<Service> { private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); //获取所有实例信息 List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } //初始化Map Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } //移除超过30秒的实例信息 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { instance.setInstanceId(oldInstance.getInstanceId()); } else { instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } return new ArrayList<>(instanceMap.values()); } }

心跳机制简单图

参考: blog.csdn.net/jb84006/article/details/117634375

www.cnblogs.com/wtzbk/p/14366240.html