Spring Cloud Alibaba Nacos如何实现心跳机制与服务健康状态监控?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1682个文字,预计阅读时间需要7分钟。
摘要:一、Nacos服务心跳;1.1、客户端心跳。Nacos Client会维护一个定时任务,通过持续调用服务端接口更新心跳时间,确保自身处于活跃状态,防止服务端将服务剔除。Nacos客户端默认每5秒向服务端发送一次心跳。
一、Nacos服务心跳
1.1、客户端心跳
Nacos Client会维护一个定时任务,通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除。Nacos客户端默认每5秒向服务端发送一次心跳,心跳通过请求服务端接口 **/instance/beat** 发送。
客户端在注册服务时,会根据nacos-discovery的META-INF目录下的spring.factories配置来完成相关类的自动装配。
- NacosServiceRegistryAutoConfiguration:用来注册、管理下面这几个bean。
- NacosServiceRegistry:完成服务注册,实现ServiceRegistry。
- NacosRegistration:用来在注册时存储nacos服务端的相关信息。
- NacosAutoServiceRegistration:继承spring中的AbstractAutoServiceRegistration;AbstractAutoServiceRegistration实现了ApplicationListener<WebServerInitializedEvent>,通过事件监听来发起服务注册,届时会调用NacosServiceRegistry.register(registration)。
在NacosServiceRegistry.register方法中,调用了nacos client sdk中的namingService.registerInstance完成服务注册。
/**
 * Excerpt of the Spring Cloud Alibaba registry that registers a {@code Registration}
 * with Nacos by calling {@code NamingService.registerInstance}.
 *
 * <p>The helpers used below ({@code namingService()}, {@code nacosDiscoveryProperties},
 * {@code getNacosInstanceFromRegistration}, {@code rethrowRuntimeException}, {@code log})
 * are not part of this excerpt.
 */
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    /**
     * Registers the given registration with Nacos.
     *
     * <p>Returns without registering (after logging a warning) when the registration has
     * an empty service id. When registration throws, the failure is logged and, if
     * fail-fast is enabled, passed to {@code rethrowRuntimeException}.
     *
     * @param registration the service registration to publish
     */
    @Override
    public void register(Registration registration) {
        // Nothing can be registered without a service id.
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }

        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        // Convert the registration into the instance object passed to the Nacos client.
        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            // Hand the instance to the Nacos client SDK.
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        } catch (Exception e) {
            if (nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},", serviceId,
                        registration.toString(), e);
                // Fail fast: presumably rethrows e as an unchecked exception - confirm in helper.
                rethrowRuntimeException(e);
            } else {
                // Fail-fast disabled: only log the failure.
                log.warn("Failfast is false. {} register failed...{},", serviceId,
                        registration.toString(), e);
            }
        }
    }
}
继续看namingService.registerInstance的实现主要就两件事
- 1、beatReactor.addBeatInfo:创建心跳信息以实现健康检查。Nacos Server必须要确保注册的服务实例是健康的,而心跳检测就是服务健康检测的方式。
- 2、serverProxy.registerService:服务注册。
看下BeatInfo这个类
- 给周期任务设定时间beatInfo.setPeriod(instance.getInstanceHeartBeatInterval())
接下来我们看下addBeatInfo方法,该方法内部主要是将BeatTask任务加入到线程池ScheduledExecutorService当中。
/**
 * Excerpt of the Nacos client component that schedules heartbeat tasks.
 *
 * <p>Only the constructor and {@link #addBeatInfo} are shown; {@code dom2Beat},
 * {@code buildKey}, {@code BeatTask} and the {@code Closeable} implementation are
 * declared outside this excerpt.
 */
public class BeatReactor implements Closeable {

    private final ScheduledExecutorService executorService;

    private final NamingProxy serverProxy;

    /**
     * Creates the reactor and its scheduled thread pool.
     *
     * @param serverProxy proxy stored for later use by the reactor
     * @param threadCount core pool size of the heartbeat executor
     */
    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        // Create the thread pool used by the client heartbeat mechanism.
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // Daemon threads, so the heartbeat pool does not keep the JVM alive.
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }

    /**
     * Registers heartbeat information for an instance and schedules its first beat.
     *
     * <p>An existing beat under the same key is removed and marked stopped before the
     * new one is stored.
     *
     * @param serviceName service name used to build the beat key
     * @param beatInfo    heartbeat data; its period is used as the initial delay in milliseconds
     */
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            // Mark the replaced beat as stopped.
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // Submit the heartbeat task to the pool; it runs once after the configured period.
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
}
重点部分就是看BeatTask
BeatTask实现了Runnable接口,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,向服务端地址为 **/instance/beat** 的接口发送心跳请求。
public class BeatReactor implements Closeable { private final ScheduledExecutorService executorService; private final NamingProxy serverProxy; class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } //心跳周期执行时间 long nextTime = beatInfo.getPeriod(); try { //向Nacos Server服务端发送心跳请求 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } //如果返回资源未找到,则立即重新注册服务 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { //发送" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" 
+ request.toUrl(); //发送Http删除请求 // delete instance asynchronously: HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}", instance.toJson(), result.getMessage(), result.getCode()); } } @Override public void onError(Throwable throwable) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e); } } }删除实例的接口
/**
 * Excerpt of the Nacos server REST controller for instance operations; only the
 * delete (deregister) endpoint is shown. {@code serviceManager} and
 * {@code getIpAddress} are declared outside this excerpt.
 */
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    /**
     * Handles the HTTP DELETE request that removes an instance from a service.
     *
     * @param request incoming request; the service name is required, the namespace id is
     *                optional and falls back to {@code Constants.DEFAULT_NAMESPACE_ID}
     * @return the literal {@code "ok"}, also when the service does not exist
     * @throws Exception if parameter parsing or the removal fails
     */
    @CanDistro
    @DeleteMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String deregister(HttpServletRequest request) throws Exception {
        Instance instance = getIpAddress(request);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            // Unknown service: nothing to remove; still answer "ok".
            Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
            return "ok";
        }

        // Delegate the actual removal to ServiceManager.
        serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        return "ok";
    }
}
内部通过调用ServiceManager的removeInstance方法
@Component public class ServiceManager implements RecordListener<Service> { public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { Service service = getService(namespaceId, serviceName); synchronized (service) { removeInstance(namespaceId, serviceName, ephemeral, service, ips); } } private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //排除要删除的实例信息 List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); //更新实例信息 consistencyService.put(key, instances); } }重点看下substractIpAddresses内部通过调用updateIpAddresses,该方法内部主要就是移除到超过30秒的实例信息
@Component public class ServiceManager implements RecordListener<Service> { private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); //获取所有实例信息 List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } //初始化Map Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } //移除超过30秒的实例信息 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { instance.setInstanceId(oldInstance.getInstanceId()); } else { instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && 
UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } return new ArrayList<>(instanceMap.values()); } }心跳机制简单图
参考: blog.csdn.net/jb84006/article/details/117634375
www.cnblogs.com/wtzbk/p/14366240.html
本文共计1682个文字,预计阅读时间需要7分钟。
摘要:一、Nacos服务心跳;1.1、客户端心跳。Nacos Client会维护一个定时任务,通过持续调用服务端接口更新心跳时间,确保自身处于活跃状态,防止服务端将服务剔除。Nacos客户端默认每5秒向服务端发送一次心跳。
一、Nacos服务心跳
1.1、客户端心跳
Nacos Client会维护一个定时任务,通过持续调用服务端的接口更新心跳时间,保证自己处于存活状态,防止服务端将服务剔除。Nacos客户端默认每5秒向服务端发送一次心跳,心跳通过请求服务端接口 **/instance/beat** 发送。
客户端在注册服务时,会根据nacos-discovery的META-INF目录下的spring.factories配置来完成相关类的自动装配。
- NacosServiceRegistryAutoConfiguration:用来注册、管理下面这几个bean。
- NacosServiceRegistry:完成服务注册,实现ServiceRegistry。
- NacosRegistration:用来在注册时存储nacos服务端的相关信息。
- NacosAutoServiceRegistration:继承spring中的AbstractAutoServiceRegistration;AbstractAutoServiceRegistration实现了ApplicationListener<WebServerInitializedEvent>,通过事件监听来发起服务注册,届时会调用NacosServiceRegistry.register(registration)。
在NacosServiceRegistry.register方法中,调用了nacos client sdk中的namingService.registerInstance完成服务注册。
/**
 * Excerpt of the Spring Cloud Alibaba registry that registers a {@code Registration}
 * with Nacos by calling {@code NamingService.registerInstance}.
 *
 * <p>The helpers used below ({@code namingService()}, {@code nacosDiscoveryProperties},
 * {@code getNacosInstanceFromRegistration}, {@code rethrowRuntimeException}, {@code log})
 * are not part of this excerpt.
 */
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    /**
     * Registers the given registration with Nacos.
     *
     * <p>Returns without registering (after logging a warning) when the registration has
     * an empty service id. When registration throws, the failure is logged and, if
     * fail-fast is enabled, passed to {@code rethrowRuntimeException}.
     *
     * @param registration the service registration to publish
     */
    @Override
    public void register(Registration registration) {
        // Nothing can be registered without a service id.
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }

        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        // Convert the registration into the instance object passed to the Nacos client.
        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            // Hand the instance to the Nacos client SDK.
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        } catch (Exception e) {
            if (nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},", serviceId,
                        registration.toString(), e);
                // Fail fast: presumably rethrows e as an unchecked exception - confirm in helper.
                rethrowRuntimeException(e);
            } else {
                // Fail-fast disabled: only log the failure.
                log.warn("Failfast is false. {} register failed...{},", serviceId,
                        registration.toString(), e);
            }
        }
    }
}
继续看namingService.registerInstance的实现主要就两件事
- 1、beatReactor.addBeatInfo:创建心跳信息以实现健康检查。Nacos Server必须要确保注册的服务实例是健康的,而心跳检测就是服务健康检测的方式。
- 2、serverProxy.registerService:服务注册。
看下BeatInfo这个类
- 给周期任务设定时间beatInfo.setPeriod(instance.getInstanceHeartBeatInterval())
接下来我们看下addBeatInfo方法,该方法内部主要是将BeatTask任务加入到线程池ScheduledExecutorService当中。
/**
 * Excerpt of the Nacos client component that schedules heartbeat tasks.
 *
 * <p>Only the constructor and {@link #addBeatInfo} are shown; {@code dom2Beat},
 * {@code buildKey}, {@code BeatTask} and the {@code Closeable} implementation are
 * declared outside this excerpt.
 */
public class BeatReactor implements Closeable {

    private final ScheduledExecutorService executorService;

    private final NamingProxy serverProxy;

    /**
     * Creates the reactor and its scheduled thread pool.
     *
     * @param serverProxy proxy stored for later use by the reactor
     * @param threadCount core pool size of the heartbeat executor
     */
    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        // Create the thread pool used by the client heartbeat mechanism.
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // Daemon threads, so the heartbeat pool does not keep the JVM alive.
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }

    /**
     * Registers heartbeat information for an instance and schedules its first beat.
     *
     * <p>An existing beat under the same key is removed and marked stopped before the
     * new one is stored.
     *
     * @param serviceName service name used to build the beat key
     * @param beatInfo    heartbeat data; its period is used as the initial delay in milliseconds
     */
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            // Mark the replaced beat as stopped.
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // Submit the heartbeat task to the pool; it runs once after the configured period.
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
}
重点部分就是看BeatTask
BeatTask实现了Runnable接口,run方法就是我们的重点,该方法调用了NamingProxy的sendBeat方法,向服务端地址为 **/instance/beat** 的接口发送心跳请求。
public class BeatReactor implements Closeable { private final ScheduledExecutorService executorService; private final NamingProxy serverProxy; class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } //心跳周期执行时间 long nextTime = beatInfo.getPeriod(); try { //向Nacos Server服务端发送心跳请求 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } //如果返回资源未找到,则立即重新注册服务 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { //发送" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" 
+ request.toUrl(); //发送Http删除请求 // delete instance asynchronously: HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}", instance.toJson(), result.getMessage(), result.getCode()); } } @Override public void onError(Throwable throwable) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e); } } }删除实例的接口
/**
 * Excerpt of the Nacos server REST controller for instance operations; only the
 * delete (deregister) endpoint is shown. {@code serviceManager} and
 * {@code getIpAddress} are declared outside this excerpt.
 */
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    /**
     * Handles the HTTP DELETE request that removes an instance from a service.
     *
     * @param request incoming request; the service name is required, the namespace id is
     *                optional and falls back to {@code Constants.DEFAULT_NAMESPACE_ID}
     * @return the literal {@code "ok"}, also when the service does not exist
     * @throws Exception if parameter parsing or the removal fails
     */
    @CanDistro
    @DeleteMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String deregister(HttpServletRequest request) throws Exception {
        Instance instance = getIpAddress(request);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            // Unknown service: nothing to remove; still answer "ok".
            Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
            return "ok";
        }

        // Delegate the actual removal to ServiceManager.
        serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        return "ok";
    }
}
内部通过调用ServiceManager的removeInstance方法
@Component public class ServiceManager implements RecordListener<Service> { public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { Service service = getService(namespaceId, serviceName); synchronized (service) { removeInstance(namespaceId, serviceName, ephemeral, service, ips); } } private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); //排除要删除的实例信息 List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); //更新实例信息 consistencyService.put(key, instances); } }重点看下substractIpAddresses内部通过调用updateIpAddresses,该方法内部主要就是移除到超过30秒的实例信息
@Component public class ServiceManager implements RecordListener<Service> { private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); //获取所有实例信息 List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } //初始化Map Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } //移除超过30秒的实例信息 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { instance.setInstanceId(oldInstance.getInstanceId()); } else { instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && 
UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } return new ArrayList<>(instanceMap.values()); } }心跳机制简单图
参考: blog.csdn.net/jb84006/article/details/117634375
www.cnblogs.com/wtzbk/p/14366240.html

