Spring Cloud Eureka Client启动流程是如何详细解析的?
- 内容介绍
- 文章标签
- 相关推荐
本文共计3445个文字,预计阅读时间需要14分钟。
Eureka Client启动流程分析:使用@EnableDiscoveryClient注解,autoRegister()方法返回true则注册到注册中心,若配置为false,则不会自动注册。导入EnableDiscoveryClientImportSelector类,@Target(ElementType.TYPE)
Eureka Client启动流程分析
@EnableDiscoveryClient注解作用
- autoRegister()方法返回true则注册到注册中心,如果你配置为false,那么就不会自动注册
- 导入EnableDiscoveryClientImportSelector类
自动装载核心配置类
根据自动装载原则可以在spring-cloud-netflix-eureka-client-2.2.5.RELEASE.jar下的META-INF目录下找到 spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\ org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\ org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfigurationEurekaDiscoveryClientConfigServiceBootstrapConfiguration
@ConditionalOnClass(ConfigServicePropertySourceLocator.class) @ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false) @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties public class EurekaConfigServerBootstrapConfiguration { }上方两个注解则是这个配置类是否能够开启的条件,这里就不再展开,直接看它引入的配置类吧
EurekaDiscoveryClientConfiguration
- 1、细心的读者可能会发现这里又注册了一个Marker类,可以猜测也是某个地方的开关。
- 2、EurekaClientConfigurationRefresher这个类看名字就知道这是当配置被动态刷新时的一个处理器,这里也不再展开了。
- 3、EurekaHealthCheckHandlerConfiguration这里面注册了一个Eureka健康检查的处理类。
EurekaClientAutoConfiguration
这个类里面全是重点,也是我们本文的核心
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @ConditionalOnDiscoveryEnabled @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }) @AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration", "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }) public class EurekaClientAutoConfiguration { }首先可以看到这个类一共包含这些注解,我们来一一解析比较重要的几个注解吧
-
@Import(DiscoveryClientOptionalArgsConfiguration.class)引入了两个bean,RestTemplateDiscoveryClientOptionalArgs和MutableDiscoveryClientOptionalArgs ,这两个类的作用暂且不说。
-
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)刚才说的Marker类的作用出来了。
-
@AutoConfigureBefore 既然必须在这三个类完成自动装配之后才能进行装配,那就代表着这三个类肯定大有用途,研究一下吧
-
NoopDiscoveryClientAutoConfiguration 故名思意,负责服务发现的类,咱们重点关注一下其中的几个方法
-
1、init()方法
这里构造了一个DefaultServiceInstance对象,这个对象包含了当前项目的ip+端口+项目名称。
- 2、注入beanNoopDiscoveryClient
这个类包含了获取当前实例以及当前服务的方法,其类图如下,在使用时肯定使用的是EurekaDiscoveryClient
EurekaClientAutoConfiguration作为自动配置类,看看它主要配置了哪些东西
配置当前实例信息
配置实例信息包含很多,不过核心的无非就是名称、唯一标识、IP地址、端口等等
@Bean @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT) public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) { String hostname = getProperty("eureka.instance.hostname"); boolean preferIpAddress = Boolean .parseBoolean(getProperty("eureka.instance.prefer-ip-address")); String ipAddress = getProperty("eureka.instance.ip-address"); boolean isSecurePortEnabled = Boolean .parseBoolean(getProperty("eureka.instance.secure-port-enabled")); String serverContextPath = env.getProperty("server.servlet.context-path", "/"); int serverPort = Integer.parseInt( env.getProperty("server.port", env.getProperty("port", "8080"))); Integer managementPort = env.getProperty("management.server.port", Integer.class); String managementContextPath = env .getProperty("management.server.servlet.context-path"); Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class); EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils); instance.setNonSecurePort(serverPort); instance.setInstanceId(getDefaultInstanceId(env)); instance.setPreferIpAddress(preferIpAddress); instance.setSecurePortEnabled(isSecurePortEnabled); if (StringUtils.hasText(ipAddress)) { instance.setIpAddress(ipAddress); } if (isSecurePortEnabled) { instance.setSecurePort(serverPort); } if (StringUtils.hasText(hostname)) { instance.setHostname(hostname); } String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path"); String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path"); if (StringUtils.hasText(statusPageUrlPath)) { instance.setStatusPageUrlPath(statusPageUrlPath); } if (StringUtils.hasText(healthCheckUrlPath)) { instance.setHealthCheckUrlPath(healthCheckUrlPath); } ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath, managementContextPath, managementPort); if (metadata != null) { instance.setStatusPageUrl(metadata.getStatusPageUrl()); instance.setHealthCheckUrl(metadata.getHealthCheckUrl()); if (instance.isSecurePortEnabled()) { instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl()); } Map<String, String> metadataMap = instance.getMetadataMap(); metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort())); } else { // without the metadata the status and health check URLs will not be set // and the status page and health check url paths will not include the // context path so set them here if (StringUtils.hasText(managementContextPath)) { instance.setHealthCheckUrlPath( managementContextPath + instance.getHealthCheckUrlPath()); instance.setStatusPageUrlPath( managementContextPath + instance.getStatusPageUrlPath()); } } setupJmxPort(instance, jmxPort); return instance; }负责注册的Bean
@Bean public EurekaServiceRegistry eurekaServiceRegistry() { return new EurekaServiceRegistry(); }自动注册调用的Bean
@Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty( value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaAutoServiceRegistration eurekaAutoServiceRegistration( ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) { return new EurekaAutoServiceRegistration(context, registry, registration); }Eureka待注册的对象
这个对象会包含上面的eurekaInstanceIConfigBean
@Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty( value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) .with(eurekaClient).with(healthCheckHandler).build(); }Eureka client配置
@Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { return new EurekaClientConfigBean(); }EurekaClient
@Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); }其中CloudEurekaClient是DiscoveryClient的子类,而DiscoveryClient则是EurekaClient的核心类。
new CloudEurekaClient会调用父类DiscoveryClient的构造方法
public class CloudEurekaClient extends DiscoveryClient { public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) { //这里会调用父类DiscoveryClient的构造方法 super(applicationInfoManager, config, args); this.applicationInfoManager = applicationInfoManager; this.publisher = publisher; this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport"); ReflectionUtils.makeAccessible(this.eurekaTransportField); } }在DiscoveryClient中最终会调用到@Inject注解修饰的DiscoveryClient构造方法
@Singleton public class DiscoveryClient implements EurekaClient { @Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.endpointRandomizer = endpointRandomizer; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); initRegistrySize = this.getApplications().size(); registrySize = initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, initRegistrySize); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry()) { try { boolean primaryFetchRegistryResult = fetchRegistry(false); if (!primaryFetchRegistryResult) { logger.info("Initial registry fetch from primary servers failed"); } boolean backupFetchRegistryResult = true; if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { backupFetchRegistryResult = false; logger.info("Initial registry fetch from backup servers failed"); } if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed."); } } catch (Throwable th) { logger.error("Fetch registry error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch //初始化调度任务(例如群集解析程序、心跳、服务实例同步、获取注册信息) initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); initRegistrySize = this.getApplications().size(); registrySize = initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, initRegistrySize); } }initScheduledTasks()
初始化调度任务(例如群集解析程序、心跳、服务实例同步、获取注册信息)
@Singleton public class DiscoveryClient implements EurekaClient { /** * 初始化所有计划的任务 */ private void initScheduledTasks() { //获取注册信息的定时任务 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); //心跳定时任务 // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); //服务实例同步定时任务 // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //初始化定时服务注册任务 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } }CacheRefreshThread——定时更新服务注册列表信息
@Singleton public class DiscoveryClient implements EurekaClient { class CacheRefreshThread implements Runnable { public void run() { //缓存刷新(refreshRegistry) refreshRegistry(); } } }缓存刷新(refreshRegistry)
- 系统默认是每隔30秒刷新本地存储的注册表
获取注册表——fetchRegistry
@Singleton public class DiscoveryClient implements EurekaClient { private boolean fetchRegistry(boolean forceFullRegistryFetch) { //用Stopwatch做耗时分析 Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // 取出本地缓存的,之前获取的服务列表信息 Applications applications = getApplications(); //判断多个条件,确定是否触发全量更新,如下任一个满足都会全量更新: //1. 是否禁用增量更新; //2. 是否对某个region特别关注; //3. 外部调用时是否通过入参指定全量更新; //4. 本地还未缓存有效的服务列表信息; if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { //这些详细的日志可以看出触发全量更新的原因 logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); //全量更新 getAndStoreFullRegistry(); } else { //增量更新 getAndUpdateDelta(applications); } //重新计算和设置一致性hash码 applications.setAppsHashCode(applications.getReconcileHashCode()); //日志打印所有应用的所有实例数之和 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } //将本地缓存更新的事件广播给所有已注册的监听器,注意该方法已被CloudEurekaClient类重写 onCacheRefreshed(); //检查刚刚更新的缓存中,有来自Eureka server的服务列表,其中包含了当前应用的状态, //当前实例的成员变量lastRemoteInstanceStatus,记录的是最后一次更新的当前应用状态, //上述两种状态在updateInstanceRemoteStatus方法中作比较 ,如果不一致,就更新lastRemoteInstanceStatus,并且广播对应的事件 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } }全量更新本地缓存的服务列表
- getAndStoreFullRegistry方法负责全量更新,代码如下所示,非常简单的逻辑:
小结:
- 1、官方文档对整个过程做了准确的总结,围绕着这些总结去看代码,能够事半功倍,重要是整个过程都保持的正确的方向,不会由于细节的干扰而偏离主线;
- 2、Eureka的注册中心设计,尽管多个client轮询请求会增加服务器压力,但使用增量更新再加上Server自身缓存3分钟数据的方式,可以有效的减少数据量和相关的计算,再加上一致性哈希码来弥补增量更新的弊端,在性能和完整性方面都有了保证,另外增量更新不需要client的时间戳,这样既节省性能又简化了实现逻辑,这种设计方式值得我们学习;
HeartbeatThread——服务定时续约线程
HeartbeatThread类中,通过调用renew方法实现续租,如下代码所示,方法注释已说明是Restfult请求来实现的,对应Eureka server的返回信息github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew
参考: www.cnblogs.com/zhixiang-org-cn/p/11689212.html
blog.csdn.net/u010647035/article/details/83245433
xinchen.blog.csdn.net/article/details/82915355
blog.csdn.net/boling_cavalry/article/details/82813180
本文共计3445个文字,预计阅读时间需要14分钟。
Eureka Client启动流程分析:使用@EnableDiscoveryClient注解,autoRegister()方法返回true则注册到注册中心,若配置为false,则不会自动注册。导入EnableDiscoveryClientImportSelector类,@Target(ElementType.TYPE)
Eureka Client启动流程分析
@EnableDiscoveryClient注解作用
- autoRegister()方法返回true则注册到注册中心,如果你配置为false,那么就不会自动注册
- 导入EnableDiscoveryClientImportSelector类
自动装载核心配置类
根据自动装载原则可以在spring-cloud-netflix-eureka-client-2.2.5.RELEASE.jar下的META-INF目录下找到 spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\ org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\ org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfigurationEurekaDiscoveryClientConfigServiceBootstrapConfiguration
@ConditionalOnClass(ConfigServicePropertySourceLocator.class) @ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false) @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties public class EurekaConfigServerBootstrapConfiguration { }上方两个注解则是这个配置类是否能够开启的条件,这里就不再展开,直接看它引入的配置类吧
EurekaDiscoveryClientConfiguration
- 1、细心的读者可能会发现这里又注册了一个Marker类,可以猜测也是某个地方的开关。
- 2、EurekaClientConfigurationRefresher这个类看名字就知道这是当配置被动态刷新时的一个处理器,这里也不再展开了。
- 3、EurekaHealthCheckHandlerConfiguration这里面注册了一个Eureka健康检查的处理类。
EurekaClientAutoConfiguration
这个类里面全是重点,也是我们本文的核心
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @ConditionalOnDiscoveryEnabled @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }) @AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration", "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }) public class EurekaClientAutoConfiguration { }首先可以看到这个类一共包含这些注解,我们来一一解析比较重要的几个注解吧
-
@Import(DiscoveryClientOptionalArgsConfiguration.class)引入了两个bean,RestTemplateDiscoveryClientOptionalArgs和MutableDiscoveryClientOptionalArgs ,这两个类的作用暂且不说。
-
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)刚才说的Marker类的作用出来了。
-
@AutoConfigureBefore 既然必须在这三个类完成自动装配之后才能进行装配,那就代表着这三个类肯定大有用途,研究一下吧
-
NoopDiscoveryClientAutoConfiguration 故名思意,负责服务发现的类,咱们重点关注一下其中的几个方法
-
1、init()方法
这里构造了一个DefaultServiceInstance对象,这个对象包含了当前项目的ip+端口+项目名称。
- 2、注入beanNoopDiscoveryClient
这个类包含了获取当前实例以及当前服务的方法,其类图如下,在使用时肯定使用的是EurekaDiscoveryClient
EurekaClientAutoConfiguration作为自动配置类,看看它主要配置了哪些东西
配置当前实例信息
配置实例信息包含很多,不过核心的无非就是名称、唯一标识、IP地址、端口等等
@Bean @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT) public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) { String hostname = getProperty("eureka.instance.hostname"); boolean preferIpAddress = Boolean .parseBoolean(getProperty("eureka.instance.prefer-ip-address")); String ipAddress = getProperty("eureka.instance.ip-address"); boolean isSecurePortEnabled = Boolean .parseBoolean(getProperty("eureka.instance.secure-port-enabled")); String serverContextPath = env.getProperty("server.servlet.context-path", "/"); int serverPort = Integer.parseInt( env.getProperty("server.port", env.getProperty("port", "8080"))); Integer managementPort = env.getProperty("management.server.port", Integer.class); String managementContextPath = env .getProperty("management.server.servlet.context-path"); Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class); EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils); instance.setNonSecurePort(serverPort); instance.setInstanceId(getDefaultInstanceId(env)); instance.setPreferIpAddress(preferIpAddress); instance.setSecurePortEnabled(isSecurePortEnabled); if (StringUtils.hasText(ipAddress)) { instance.setIpAddress(ipAddress); } if (isSecurePortEnabled) { instance.setSecurePort(serverPort); } if (StringUtils.hasText(hostname)) { instance.setHostname(hostname); } String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path"); String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path"); if (StringUtils.hasText(statusPageUrlPath)) { instance.setStatusPageUrlPath(statusPageUrlPath); } if (StringUtils.hasText(healthCheckUrlPath)) { instance.setHealthCheckUrlPath(healthCheckUrlPath); } ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath, managementContextPath, managementPort); if (metadata != null) { instance.setStatusPageUrl(metadata.getStatusPageUrl()); instance.setHealthCheckUrl(metadata.getHealthCheckUrl()); if (instance.isSecurePortEnabled()) { instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl()); } Map<String, String> metadataMap = instance.getMetadataMap(); metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort())); } else { // without the metadata the status and health check URLs will not be set // and the status page and health check url paths will not include the // context path so set them here if (StringUtils.hasText(managementContextPath)) { instance.setHealthCheckUrlPath( managementContextPath + instance.getHealthCheckUrlPath()); instance.setStatusPageUrlPath( managementContextPath + instance.getStatusPageUrlPath()); } } setupJmxPort(instance, jmxPort); return instance; }负责注册的Bean
@Bean public EurekaServiceRegistry eurekaServiceRegistry() { return new EurekaServiceRegistry(); }自动注册调用的Bean
@Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty( value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaAutoServiceRegistration eurekaAutoServiceRegistration( ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) { return new EurekaAutoServiceRegistration(context, registry, registration); }Eureka待注册的对象
这个对象会包含上面的eurekaInstanceIConfigBean
@Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty( value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) .with(eurekaClient).with(healthCheckHandler).build(); }Eureka client配置
@Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { return new EurekaClientConfigBean(); }EurekaClient
@Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); }其中CloudEurekaClient是DiscoveryClient的子类,而DiscoveryClient则是EurekaClient的核心类。
new CloudEurekaClient会调用父类DiscoveryClient的构造方法
public class CloudEurekaClient extends DiscoveryClient { public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) { //这里会调用父类DiscoveryClient的构造方法 super(applicationInfoManager, config, args); this.applicationInfoManager = applicationInfoManager; this.publisher = publisher; this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport"); ReflectionUtils.makeAccessible(this.eurekaTransportField); } }在DiscoveryClient中最终会调用到@Inject注解修饰的DiscoveryClient构造方法
@Singleton public class DiscoveryClient implements EurekaClient { @Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.endpointRandomizer = endpointRandomizer; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); initRegistrySize = this.getApplications().size(); registrySize = initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, initRegistrySize); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry()) { try { boolean primaryFetchRegistryResult = fetchRegistry(false); if (!primaryFetchRegistryResult) { logger.info("Initial registry fetch from primary servers failed"); } boolean backupFetchRegistryResult = true; if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { backupFetchRegistryResult = false; logger.info("Initial registry fetch from backup servers failed"); } if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed."); } } catch (Throwable th) { logger.error("Fetch registry error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch //初始化调度任务(例如群集解析程序、心跳、服务实例同步、获取注册信息) initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); initRegistrySize = this.getApplications().size(); registrySize = initRegistrySize; logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, initRegistrySize); } }initScheduledTasks()
初始化调度任务(例如群集解析程序、心跳、服务实例同步、获取注册信息)
@Singleton public class DiscoveryClient implements EurekaClient { /** * 初始化所有计划的任务 */ private void initScheduledTasks() { //获取注册信息的定时任务 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); //心跳定时任务 // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); //服务实例同步定时任务 // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //初始化定时服务注册任务 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } }CacheRefreshThread——定时更新服务注册列表信息
@Singleton public class DiscoveryClient implements EurekaClient { class CacheRefreshThread implements Runnable { public void run() { //缓存刷新(refreshRegistry) refreshRegistry(); } } }缓存刷新(refreshRegistry)
- 系统默认是每隔30秒刷新本地存储的注册表
获取注册表——fetchRegistry
@Singleton public class DiscoveryClient implements EurekaClient { private boolean fetchRegistry(boolean forceFullRegistryFetch) { //用Stopwatch做耗时分析 Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // 取出本地缓存的,之前获取的服务列表信息 Applications applications = getApplications(); //判断多个条件,确定是否触发全量更新,如下任一个满足都会全量更新: //1. 是否禁用增量更新; //2. 是否对某个region特别关注; //3. 外部调用时是否通过入参指定全量更新; //4. 本地还未缓存有效的服务列表信息; if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { //这些详细的日志可以看出触发全量更新的原因 logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); //全量更新 getAndStoreFullRegistry(); } else { //增量更新 getAndUpdateDelta(applications); } //重新计算和设置一致性hash码 applications.setAppsHashCode(applications.getReconcileHashCode()); //日志打印所有应用的所有实例数之和 logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } //将本地缓存更新的事件广播给所有已注册的监听器,注意该方法已被CloudEurekaClient类重写 onCacheRefreshed(); //检查刚刚更新的缓存中,有来自Eureka server的服务列表,其中包含了当前应用的状态, //当前实例的成员变量lastRemoteInstanceStatus,记录的是最后一次更新的当前应用状态, //上述两种状态在updateInstanceRemoteStatus方法中作比较 ,如果不一致,就更新lastRemoteInstanceStatus,并且广播对应的事件 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } }全量更新本地缓存的服务列表
- getAndStoreFullRegistry方法负责全量更新,代码如下所示,非常简单的逻辑:
小结:
- 1、官方文档对整个过程做了准确的总结,围绕着这些总结去看代码,能够事半功倍,重要是整个过程都保持的正确的方向,不会由于细节的干扰而偏离主线;
- 2、Eureka的注册中心设计,尽管多个client轮询请求会增加服务器压力,但使用增量更新再加上Server自身缓存3分钟数据的方式,可以有效的减少数据量和相关的计算,再加上一致性哈希码来弥补增量更新的弊端,在性能和完整性方面都有了保证,另外增量更新不需要client的时间戳,这样既节省性能又简化了实现逻辑,这种设计方式值得我们学习;
HeartbeatThread——服务定时续约线程
HeartbeatThread类中,通过调用renew方法实现续租,如下代码所示,方法注释已说明是Restfult请求来实现的,对应Eureka server的返回信息github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew
参考: www.cnblogs.com/zhixiang-org-cn/p/11689212.html
blog.csdn.net/u010647035/article/details/83245433
xinchen.blog.csdn.net/article/details/82915355
blog.csdn.net/boling_cavalry/article/details/82813180

