Spring Cloud Alibaba Nacos服务发现源码解析是怎样的?
- 内容介绍
- 文章标签
- 相关推荐
本文共计3029个文字,预计阅读时间需要13分钟。
Nacos服务发现领域模型:Namespace+Namespace实现不同环境的隔离(如开发、测试、预发、线上),默认public Group:不同服务可分配到同一组,默认DEFAULT_GROUP+Service:微服务+Cluster指定特定微服务的集群
Nacos服务发现的领域模型
- Namespace:实现各环境的隔离(如开发、测试、预发、线上),默认public
- Group:不同服务可以分到同一个组,默认DEFAULT_GROUP
- Service:微服务
- Cluster:对指定微服务的一个虚拟划分,默认DEFAULT
- Instance:微服务实例
- persistentInstances:持久实例集合
- ephemeralInstances:临时实例集合
一、服务发现前
目前在Spring Cloud,RPC基本都是使用Feign去调用服务,Feign其实也是Ribbon的一个封装,主要功能,是将我们通常www.jianshu.com/p/f3db11f045cc
ribbon最最底层也是实现spring cloud common包下的
-
org.springframework.cloud.alibaba.nacos.ribbon.NacosServerList 主要是ServiceInstanceChooser 下的继承类。
-
org.springframework.cloud.client.loadbalancer.LoadBalancerClient 这是Ribbon实现负载均衡的父类接口,接下来一系列的接口实现最终会落到如何获取serverList这个问题是,答案在这个接口:com.netflix.loadbalancer.ServerList
接下来,就是服务发现组件的事情了
-
eureka对于这个接口的实现就是DiscoveryEnabledNIWSServerList
-
Nacos的实现就是org.springframework.cloud.alibaba.nacos.ribbon.NacosServerList,这也是我们的重点。
二、服务发现
Nacos Client发起RPC调用请求后,通过RibbonLoadBalancerClient的getLoadBalancer方法获取负载均衡器,因为Spring Cloud默认指定了ZoneAwareLoadBalancer,但是ZoneAwareLoadBalancer的构造函数初始化父类DynamicServerListLoadBalancer。
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。实现了下面两个功能:
- 服务实例在运行期间的动态更新。
- 对服务器实例清单的过滤功能,可以通过过滤器来选择地获取一批服务实例清单。
DynamicServerListLoadBalancer构造函数初始化的时候会调用restOfInit方法
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); //定时更新Eureka Client实例列表 enableAndInitLearnNewServersFeature(); //获取所有Eureka Client实例列表 updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } }enableAndInitLearnNewServersFeature() 每30秒定时更新Nacos Client实例列表
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { public void enableAndInitLearnNewServersFeature() { LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName()); serverListUpdater.start(updateAction); } } public class PollingServerListUpdater implements ServerListUpdater { //更新服务实例在初始化之后延迟1秒后开始执行 private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs; //以30秒为周期重复执行 private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs; //以定时任务的方式进行服务列表的更新。 @Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { //创建一个Runnable的线程wrapperRunnable final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { //具体更新服务实例列表的方法 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; //为wrapperRunnable线程启动一个定时任务 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, //1秒 refreshIntervalMs, //30秒 TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } } }DynamicServerListLoadBalancer#updateListOfServers()
- 获取所有Nacos Client实例列表
从Nacos Server中获取服务可用实例列表,最终就调用到了NacosServerList类
public class NacosServerList extends AbstractServerList<NacosServer> { private NacosDiscoveryProperties discoveryProperties; private String serviceId; @Override public List<NacosServer> getUpdatedListOfServers() { return getServers(); } private List<NacosServer> getServers() { try { String group = discoveryProperties.getGroup(); //获取服务实例列表 List<Instance> instances = discoveryProperties.namingServiceInstance() .selectInstances(serviceId, group, true); //将List<Instance>转换成List<NacosServer>数据,并返回。 return instancesToServerList(instances); } catch (Exception e) { throw new IllegalStateException( "Can not get service instances from nacos, serviceId=" + serviceId, e); } } }NacosNamingService.selectInstances(serviceName, new ArrayList<String>(), healthyOnly)
- HostReactor.getServiceInfo():拿到ServiceInfo对象
- 从ServiceInfo对象中取出List<Instance> hosts属性值并返回。
hostReactor.getServiceInfoDirectlyFromServer
- 直接从Nacos服务端获取服务信息
hostReactor.getServiceInfo
- 获取ServiceInfo对象
HostReactor#updateServiceNow
- 从注册中心获取,并更新本地缓存
UpdateTask
- 定时更新任务
服务端处理服务发现请求
- /instance/list方法
ServiceManager#getService
获取服务
@Component public class ServiceManager implements RecordListener<Service> { /** * Map(namespace, Map(group::serviceName, Service)). */ private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); public Service getService(String namespaceId, String serviceName) { //namespaceId在注册中心不存在,直接返回null if (serviceMap.get(namespaceId) == null) { return null; } //根据namespaceId和serviceName获取服务 return chooseServiceMap(namespaceId).get(serviceName); } public Map<String, Service> chooseServiceMap(String namespaceId) { return serviceMap.get(namespaceId); } }Service.srvIPs
- 从clusterMap获取集群实例的IP集合
至此,Nacos服务发现源码分析完毕!
参考: blog.csdn.net/C18298182575/article/details/101549822
blog.csdn.net/liyanan21/article/details/89090482/
本文共计3029个文字,预计阅读时间需要13分钟。
Nacos服务发现领域模型:Namespace+Namespace实现不同环境的隔离(如开发、测试、预发、线上),默认public Group:不同服务可分配到同一组,默认DEFAULT_GROUP+Service:微服务+Cluster指定特定微服务的集群
Nacos服务发现的领域模型
- Namespace:实现各环境的隔离(如开发、测试、预发、线上),默认public
- Group:不同服务可以分到同一个组,默认DEFAULT_GROUP
- Service:微服务
- Cluster:对指定微服务的一个虚拟划分,默认DEFAULT
- Instance:微服务实例
- persistentInstances:持久实例集合
- ephemeralInstances:临时实例集合
一、服务发现前
目前在Spring Cloud,RPC基本都是使用Feign去调用服务,Feign其实也是Ribbon的一个封装,主要功能,是将我们通常www.jianshu.com/p/f3db11f045cc
ribbon最最底层也是实现spring cloud common包下的
-
org.springframework.cloud.alibaba.nacos.ribbon.NacosServerList 主要是ServiceInstanceChooser 下的继承类。
-
org.springframework.cloud.client.loadbalancer.LoadBalancerClient 这是Ribbon实现负载均衡的父类接口,接下来一系列的接口实现最终会落到如何获取serverList这个问题是,答案在这个接口:com.netflix.loadbalancer.ServerList
接下来,就是服务发现组件的事情了
-
eureka对于这个接口的实现就是DiscoveryEnabledNIWSServerList
-
Nacos的实现就是org.springframework.cloud.alibaba.nacos.ribbon.NacosServerList,这也是我们的重点。
二、服务发现
Nacos Client发起RPC调用请求后,通过RibbonLoadBalancerClient的getLoadBalancer方法获取负载均衡器,因为Spring Cloud默认指定了ZoneAwareLoadBalancer,但是ZoneAwareLoadBalancer的构造函数初始化父类DynamicServerListLoadBalancer。
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。实现了下面两个功能:
- 服务实例在运行期间的动态更新。
- 对服务器实例清单的过滤功能,可以通过过滤器来选择地获取一批服务实例清单。
DynamicServerListLoadBalancer构造函数初始化的时候会调用restOfInit方法
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); //定时更新Eureka Client实例列表 enableAndInitLearnNewServersFeature(); //获取所有Eureka Client实例列表 updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } }enableAndInitLearnNewServersFeature() 每30秒定时更新Nacos Client实例列表
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { public void enableAndInitLearnNewServersFeature() { LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName()); serverListUpdater.start(updateAction); } } public class PollingServerListUpdater implements ServerListUpdater { //更新服务实例在初始化之后延迟1秒后开始执行 private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs; //以30秒为周期重复执行 private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs; //以定时任务的方式进行服务列表的更新。 @Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { //创建一个Runnable的线程wrapperRunnable final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { //具体更新服务实例列表的方法 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; //为wrapperRunnable线程启动一个定时任务 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, //1秒 refreshIntervalMs, //30秒 TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } } }DynamicServerListLoadBalancer#updateListOfServers()
- 获取所有Nacos Client实例列表
从Nacos Server中获取服务可用实例列表,最终就调用到了NacosServerList类
public class NacosServerList extends AbstractServerList<NacosServer> { private NacosDiscoveryProperties discoveryProperties; private String serviceId; @Override public List<NacosServer> getUpdatedListOfServers() { return getServers(); } private List<NacosServer> getServers() { try { String group = discoveryProperties.getGroup(); //获取服务实例列表 List<Instance> instances = discoveryProperties.namingServiceInstance() .selectInstances(serviceId, group, true); //将List<Instance>转换成List<NacosServer>数据,并返回。 return instancesToServerList(instances); } catch (Exception e) { throw new IllegalStateException( "Can not get service instances from nacos, serviceId=" + serviceId, e); } } }NacosNamingService.selectInstances(serviceName, new ArrayList<String>(), healthyOnly)
- HostReactor.getServiceInfo():拿到ServiceInfo对象
- 从ServiceInfo对象中取出List<Instance> hosts属性值并返回。
hostReactor.getServiceInfoDirectlyFromServer
- 直接从Nacos服务端获取服务信息
hostReactor.getServiceInfo
- 获取ServiceInfo对象
HostReactor#updateServiceNow
- 从注册中心获取,并更新本地缓存
UpdateTask
- 定时更新任务
服务端处理服务发现请求
- /instance/list方法
ServiceManager#getService
获取服务
@Component public class ServiceManager implements RecordListener<Service> { /** * Map(namespace, Map(group::serviceName, Service)). */ private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); public Service getService(String namespaceId, String serviceName) { //namespaceId在注册中心不存在,直接返回null if (serviceMap.get(namespaceId) == null) { return null; } //根据namespaceId和serviceName获取服务 return chooseServiceMap(namespaceId).get(serviceName); } public Map<String, Service> chooseServiceMap(String namespaceId) { return serviceMap.get(namespaceId); } }Service.srvIPs
- 从clusterMap获取集群实例的IP集合
至此,Nacos服务发现源码分析完毕!
参考: blog.csdn.net/C18298182575/article/details/101549822
blog.csdn.net/liyanan21/article/details/89090482/

