以下文章来源Java架构师技术,回复”Spring“获惊喜礼包
大家好,我是Java架构师
今天来分享一下Nacos注册中心的底层原理,从服务注册到服务发现,非常细致

1. Nacos介绍

再讲Nacos之前,先来讲一下服务注册和发现。我们知道,现在微服务架构是目前开发的一个趋势。服务消费者要去调用多个服务提供者组成的集群。这里需要做到以下几点:
  1. 服务消费者需要在本地配置文件中维护服务提供者集群的每个节点的请求地址。
  2. 服务提供者集群中如果某个节点宕机,服务消费者的本地配置中需要同步删除这个节点的请求地址,防止请求发送到已经宕机的节点上造成请求失败。
因此需要引入服务注册中心,它具有以下几个功能:
  1. 服务地址的管理。
  2. 服务注册。
  3. 服务动态感知。
而Nacos致力于解决微服务中的统一配置,服务注册和发现等问题。Nacos集成了注册中心和配置中心。其相关特性包括:
1.服务发现和服务健康监测
Nacos支持基于DNS和RPC的服务发现,即服务消费者可以使用DNS或者HTTP的方式来查找和发现服务。Nacos提供对服务的实时的健康检查,阻止向不健康的主机或者服务实例发送请求。Nacos支持传输层(Ping/TCP)、应用层(HTTP、Mysql)的健康检查。
2.动态配置服务
动态配置服务可以以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。
3.动态DNS服务
支持权重路由,让开发者更容易的实现中间层的负载均衡、更灵活的路由策略、流量控制以及DNS解析服务。
4.服务和元数据管理
Nacos允许开发者从微服务平台建设的视角来管理数据中心的所有服务和元数据。如:服务的生命周期、静态依赖分析、服务的健康状态、服务的流量管理、路由和安全策略等。

2. Nacos注册中心实现原理分析

2.1 Nacos架构图

以下是Nacos的架构图:
图片
其中分为这么几个模块:
  • Provider APP:服务提供者。
  • Consumer APP:服务消费者。
  • Name Server:通过Virtual IP或者DNS的方式实现Nacos高可用集群的服务路由。
  • Nacos Server:Nacos服务提供者。
    • OpenAPI:功能访问入口。
    • Config Service、Naming Service:Nacos提供的配置服务、名字服务模块。
    • Consistency Protocol:一致性协议,用来实现Nacos集群节点的数据同步,使用Raft算法实现。
其中包含:
  • Nacos Console:Nacos控制台。
小总结:
  • 服务提供者通过VIP(Virtual IP)访问Nacos Server高可用集群,基于OpenAPI完成服务的注册和服务的查询。
  • Nacos Server的底层则通过数据一致性算法(Raft)来完成节点的数据同步。

2.2 注册中心的原理

这里对其原理做一个大致的介绍,在后文则从源码角度进行分析。
首先,服务注册的功能体现在:
  • 服务实例启动时注册到服务注册表、关闭时则注销(服务注册)。
  • 服务消费者可以通过查询服务注册表来获得可用的实例(服务发现)。
  • 服务注册中心需要调用服务实例的健康检查API来验证其是否可以正确的处理请求(健康检查)。
Nacos服务注册和发现的实现原理的图如下:
图片

3. Nacos源码分析

前提(在本地或者虚机上先启动好Nacos) 这一部分从2个角度来讲Nacos是如何实现的:
  • 服务注册。
  • 服务发现

3.1 Nacos服务注册

首先看下一个包:spring-cloud-commons
图片
这个ServiceRegistry接口是SpringCloud提供的服务注册的标准,集成到SpringCloud中实现服务注册的组件,都需要实现这个接口。来看下它的结构:
publicinterfaceServiceRegistry<RextendsRegistration
{  

voidregister(R registration)
;  


voidderegister(R registration)
;  


voidclose()
;  


voidsetStatus(R registration, String status)
;  


    <T> 
getStatus(R registration)
;  

}  

那么对于Nacos而言,该接口的实现类是NacosServiceRegistry,该类在这个pom包下:
图片
再回过头来看spring-cloud-commons包:
图片
spring.factories主要是包含了自动装配的配置信息,如图:
图片
在我之前的文章里我有提到过,在spring.factories中配置EnableAutoConfiguration的内容后,项目在启动的时候,会导入相应的自动配置类,那么也就允许对该类的相关属性进行一个自动装配。那么显然,在这里导入了AutoServiceRegistrationAutoConfiguration这个类,而这个类顾名思义是服务注册相关的配置类。
该类的完整代码如下:
@Configuration
(  

    proxyBeanMethods = 
false
)  

@Import
({AutoServiceRegistrationConfiguration
.
class
})  

@
ConditionalOnProperty
(  

value
= {
"spring.cloud.service-registry.auto-registration.enabled"
},  

    matchIfMissing = 
true
)  

publicclassAutoServiceRegistrationAutoConfiguration
{  

@Autowired
(  

        required = 
false
    )  

private
 AutoServiceRegistration autoServiceRegistration;  

@Autowired
private
 AutoServiceRegistrationProperties properties;  


publicAutoServiceRegistrationAutoConfiguration()
{  

    }  


@PostConstruct
protectedvoidinit()
{  

if
 (
this
.autoServiceRegistration == 
null
 && 
this
.properties.isFailFast()) {  

thrownew
 IllegalStateException(
"Auto Service Registration has been requested, but there is no AutoServiceRegistration bean"
);  

        }  

    }  

}  

这里做一个分析,AutoServiceRegistrationAutoConfiguration中注入了AutoServiceRegistration实例,该类的关系图如下:
图片
我们先来看一下这个抽象类AbstractAutoServiceRegistration
publicabstractclassAbstractAutoServiceRegistration
<
RextendsRegistration
implementsAutoServiceRegistration
,   

ApplicationContextAware
,   

ApplicationListener
<
WebServerInitializedEvent
{  

publicvoidonApplicationEvent(WebServerInitializedEvent event)
{  

this
.bind(event);  

 }  

}  

这里实现了ApplicationListener接口,并且传入了WebServerInitializedEvent作为泛型,啥意思嘞,意思是:
  • NacosAutoServiceRegistration监听WebServerInitializedEvent事件。
  • 也就是WebServer初始化完成后,会调用对应的事件绑定方法,调用onApplicationEvent(),该方法最终调用NacosServiceRegistryregister()方法(NacosServiceRegistry实现了Spring的一个服务注册标准接口)。
对于register()方法,主要调用的是Nacos Client SDK中的NamingService下的registerInstance()方法完成服务的注册。
publicvoidregister(Registration registration)
{  

if
 (StringUtils.isEmpty(registration.getServiceId())) {  

        log.warn(
"No service to register for nacos client..."
);  

    } 
else
 {  

        String serviceId = registration.getServiceId();  

        String group = 
this
.nacosDiscoveryProperties.getGroup();  

        Instance instance = 
this
.getNacosInstanceFromRegistration(registration);  


try
 {  

this
.namingService.registerInstance(serviceId, group, instance);  

            log.info(
"nacos registry, {} {} {}:{} register finished"
new
 Object[]{group, serviceId, instance.getIp(), instance.getPort()});  

        } 
catch
 (Exception var6) {  

            log.error(
"nacos registry, {} register failed...{},"
new
 Object[]{serviceId, registration.toString(), var6});  

            ReflectionUtils.rethrowRuntimeException(var6);  

        }  


    }  

}  


publicvoidregisterInstance(String serviceName, String groupName, Instance instance)throws NacosException 
{  

if
 (instance.isEphemeral()) {  

        BeatInfo beatInfo = 
new
 BeatInfo();  

        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));  

        beatInfo.setIp(instance.getIp());  

        beatInfo.setPort(instance.getPort());  

        beatInfo.setCluster(instance.getClusterName());  

        beatInfo.setWeight(instance.getWeight());  

        beatInfo.setMetadata(instance.getMetadata());  

        beatInfo.setScheduled(
false
);  

long
 instanceInterval = instance.getInstanceHeartBeatInterval();  

        beatInfo.setPeriod(instanceInterval == 
0L
 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);  

// 1.addBeatInfo()负责创建心跳信息实现健康监测。因为Nacos Server必须要确保注册的服务实例是健康的。  
// 而心跳监测就是服务健康监测的一种手段。  
this
.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);  

    }  

// 2.registerService()实现服务的注册  
this
.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);  

}  

再来看一下心跳监测的方法addBeatInfo()
publicvoidaddBeatInfo(String serviceName, BeatInfo beatInfo)
{  

    LogUtils.NAMING_LOGGER.info(
"[BEAT] adding beat: {} to beat map."
, beatInfo);  

    String key = 
this
.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());  

    BeatInfo existBeat = 
null
;  

if
 ((existBeat = (BeatInfo)
this
.dom2Beat.remove(key)) != 
null
) {  

        existBeat.setStopped(
true
);  

    }  


this
.dom2Beat.put(key, beatInfo);  

// 通过schedule()方法,定时的向服务端发送一个数据包,然后启动一个线程不断地检测服务端的回应。  
// 如果在指定的时间内没有收到服务端的回应,那么认为服务器出现了故障。  
// 参数1:可以说是这个实例的相关信息。  
// 参数2:一个long类型的时间,代表从现在开始推迟执行的时间,默认是5000  
// 参数3:时间的单位,默认是毫秒,结合5000即代表每5秒发送一次心跳数据包  
this
.executorService.schedule(
new
 BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);  

    MetricsMonitor.getDom2BeatSizeMonitor().set((
double
)
this
.dom2Beat.size());  

}  

心跳检查如果正常,即代表这个需要注册的服务是健康的,那么执行下面的注册方法registerInstance()
publicvoidregisterService(String serviceName, String groupName, Instance instance)throws NacosException 
{  

    LogUtils.NAMING_LOGGER.info(
"[REGISTER-SERVICE] {} registering service {} with instance: {}"
new
 Object[]{
this
.namespaceId, serviceName, instance});  

    Map<String, String> params = 
new
 HashMap(
9
);  

    params.put(
"namespaceId"
this
.namespaceId);  

    params.put(
"serviceName"
, serviceName);  

    params.put(
"groupName"
, groupName);  

    params.put(
"clusterName"
, instance.getClusterName());  

    params.put(
"ip"
, instance.getIp());  

    params.put(
"port"
, String.valueOf(instance.getPort()));  

    params.put(
"weight"
, String.valueOf(instance.getWeight()));  

    params.put(
"enable"
, String.valueOf(instance.isEnabled()));  

    params.put(
"healthy"
, String.valueOf(instance.isHealthy()));  

    params.put(
"ephemeral"
, String.valueOf(instance.isEphemeral()));  

    params.put(
"metadata"
, JSON.toJSONString(instance.getMetadata()));  

// 这里可以看出来,把上述服务实例的一些必要参数保存到一个Map中,通过OpenAPI的方式发送注册请求  
this
.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)
"POST"
);  

}  

下面直接Debug走一遍。两个前提(这里不再展开):
  • 启动一个Nacos服务。
  • 搞一个Maven项目,集成Nacos。

案例1:用Debug来理解Nacos服务注册流程

1.项目初始化后,根据上文说法,会执行抽象类AbstractAutoServiceRegistration下面的onApplicationEvent()方法,即事件被监听到。
图片
2.作为抽象类的子类实现NacosAutoServiceRegistration,监听到Web服务启动后, 开始执行super.register()方法。
图片
3.执行NacosServiceRegistry下的register()方法(super),前面说过,集成到SpringCloud中实现服务注册的组件,都需要实现ServiceRegistry这个接口,而对于Nacos而言,NacosServiceRegistry就是具体的实现子类。执行注册方法需要传入的三个参数:
  • 实例名称serviceId。
  • 实例归属的组。
  • 具体实例
图片
registerInstance()主要做两件事:
  • 检查服务的健康(this.beatReactor.addBeatInfo())。
  • 执行服务的注册(this.serverProxy.registerService())。
服务健康的检查:检查通过后,发送OpenAPI进行服务的注册:

服务注册小总结☆:

这里来做一个大框架式的梳理(也许前面写的有点乱,这里通过几个问答的形式来进行总结)
问题1:Nacos的服务注册为什么和spring-cloud-commons这个包扯上关系?
回答:
  1. 首先,Nacos的服务注册肯定少不了pom包:spring-cloud-starter-alibaba-nacos-discovery吧。
  2. 这个包下面包括了spring-cloud-commons包,那么这个包有什么用?
  3. spring-cloud-commons中有一个接口叫做ServiceRegistry,而集成到SpringCloud中实现服务注册的组件,都需要实现这个接口。
  4. 因此对于需要注册到Nacos上的服务,也需要实现这个接口,那么具体的实现子类为NacosServiceRegistry
问题2:为什么我的项目加了这几个依赖,服务启动时依旧没有注册到Nacos中?
回答:
  1. 本文提到过,进行Nacos服务注册的时候,会有一个事件的监听过程,而监听的对象是WebServer,因此,这个项目需要是一个Web项目!
  2. 因此查看你的pom文件中是否有依赖:spring-boot-starter-web
问题3:除此之外,spring-cloud-commons这个包还有什么作用?
回答:
  1. 这个包下的spring.factories文件中,配置了相关的服务注册的置类,即支持其自动装配。
  2. 这个配置类叫做AutoServiceRegistrationAutoConfiguration。其注入了类AutoServiceRegistration,而NacosAutoServiceRegistration是该类的一个具体实现。
  3. 当WebServer初始化的时候,通过绑定的事件监听器,会实现监听,执行服务的注册逻辑。另外,搜索公众号Java就该这么学后台回复“面试”,获取一份惊喜礼包。
说白了:
  1. 第一件事情:引入一个Spring监听器,当容器初始化后,执行Nacos服务的注册。
  2. 第二件事情:而Nacos服务注册的方法的实现,其需要实现的接口来自于该包下的ServiceRegistry接口。

接下来就对Nacos注册的流程进行一个总结:
  1. 服务(项目)启动时,根据spring-cloud-commonsspring.factories的配置,自动装配了类AutoServiceRegistrationAutoConfiguration
  2. AutoServiceRegistrationAutoConfiguration类中注入了类AutoServiceRegistration,其最终实现子类实现了Spring的监听器。
  3. 根据监听器,执行了服务注册方法。而这个服务注册方法则是调用了NacosServiceRegistryregister()方法。
  4. 该方法主要调用的是Nacos Client SDK中的NamingService下的registerInstance()方法完成服务的注册。
  5. registerInstance()方法主要做两件事:服务实例的健康监测和实例的注册。
  6. 通过schedule()方法定时的发送数据包,检测实例的健康。
  7. 若健康监测通过,调用registerService()方法,通过OpenAPI方式执行服务注册,其中将实例Instance的相关信息存储到HashMap中。

3.2 Nacos服务发现

有一点我们需要清楚:Nacos服务的发现发生在什么时候。例如:微服务发生远程接口调用的时候。一般我们在使用OpenFeign进行远程接口调用时,都需要用到对应的微服务名称,而这个名称就是用来进行服务发现的。
举个例子:
@FeignClient
(
"test-application"
)  

publicinterfaceMyFeignService
{  

@RequestMapping
(
"getInfoById"
)  

info(@PathVariable("id") Long id)
;  

}  

接下来直接开始讲重点,Nacos在进行服务发现的时候,会调用NacosServerList类下的getServers()方法:
publicclassNacosServerListextendsAbstractServerList<NacosServer
{  

private List<NacosServer> getServers()
{  

try
 {  

            String group = 
this
.discoveryProperties.getGroup();  

// 1.通过唯一的serviceId(一般是服务名称)和组来获得对应的所有实例。  
            List<Instance> instances = 
this
.discoveryProperties.namingServiceInstance().selectInstances(
this
.serviceId, group, 
true
);  

// 2.将List<Instance>转换成List<NacosServer>数据,然后返回。  
returnthis
.instancesToServerList(instances);  

        } 
catch
 (Exception var3) {  

thrownew
 IllegalStateException(
"Can not get service instances from nacos, serviceId="
 + 
this
.serviceId, var3);  

        }  

    }  

}  

接下来来看一下NacosNamingService.selectInstances()方法:
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy)throws NacosException 
{  

returnthis
.selectInstances(serviceName, groupName, healthy, 
true
);  

}  

该方法最终会调用到其重载方法:
public List<Instance> selectInstances
(String serviceName, String groupName, List<String> clusters,   

boolean
 healthy, 
boolean
 subscribe)
throws NacosException 
{  

// 保存服务实例信息的对象  
    ServiceInfo serviceInfo;  

// 如果该消费者订阅了这个服务,那么会在本地维护一个服务列表,服务从本地获取  
if
 (subscribe) {  

        serviceInfo = 
this
.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, 
","
));  

    } 
else
 {  

// 否则实例会从服务中心进行获取。  
        serviceInfo = 
this
.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, 
","
));  

    }  


returnthis
.selectInstances(serviceInfo, healthy);  

}  

这里应该重点关注this.hostReactor这个对象,它里面比较重要的是几个Map类型的存储结构:
publicclassHostReactor
{  

privatestaticfinallong
 DEFAULT_DELAY = 
1000L
;  

privatestaticfinallong
 UPDATE_HOLD_INTERVAL = 
5000L
;  

// 存放线程异步调用的一个回调结果  
privatefinal
 Map<String, ScheduledFuture<?>> futureMap;  

// 本地已存在的服务列表,key是服务名称,value是ServiceInfo  
private
 Map<String, ServiceInfo> serviceInfoMap;  

// 待更新的实例列表  
private
 Map<String, Object> updatingMap;  

// 定时任务(负责服务列表的实时更新)  
private
 ScheduledExecutorService executor;  

    ....  

}  

再看一看它的getServiceInfo()方法:
public ServiceInfo getServiceInfo(String serviceName, String clusters)
{  

    LogUtils.NAMING_LOGGER.debug(
"failover-mode: "
 + 
this
.failoverReactor.isFailoverSwitch());  

    String key = ServiceInfo.getKey(serviceName, clusters);  

if
 (
this
.failoverReactor.isFailoverSwitch()) {  

returnthis
.failoverReactor.getService(key);  

    } 
else
 {  

// 1.先通过serverName即服务名获得一个serviceInfo  
        ServiceInfo serviceObj = 
this
.getServiceInfo0(serviceName, clusters);  

// 如果没有serviceInfo,则通过传进来的参数new出一个新的serviceInfo对象,并且同时维护到本地Map和更新Map  
// 这里是serviceInfoMap和updatingMap  
if
 (
null
 == serviceObj) {  

            serviceObj = 
new
 ServiceInfo(serviceName, clusters);  

this
.serviceInfoMap.put(serviceObj.getKey(), serviceObj);  

this
.updatingMap.put(serviceName, 
new
 Object());  

// 2.updateServiceNow(),立刻去Nacos服务端拉去数据。  
this
.updateServiceNow(serviceName, clusters);  

this
.updatingMap.remove(serviceName);  

        } 
elseif
 (
this
.updatingMap.containsKey(serviceName)) {  

synchronized
(serviceObj) {  

try
 {  

                    serviceObj.wait(
5000L
);  

                } 
catch
 (InterruptedException var8) {  

                    LogUtils.NAMING_LOGGER.error(
"[getServiceInfo] serviceName:"
 + serviceName + 
", clusters:"
 + clusters, var8);  

                }  

            }  

        }  

// 3.定时更新实例信息  
this
.scheduleUpdateIfAbsent(serviceName, clusters);  

// 最后返回服务实例数据(前面已经进行了更新)  
return
 (ServiceInfo)
this
.serviceInfoMap.get(serviceObj.getKey());  

    }  

}  

来看下scheduleUpdateIfAbsent()方法:
// 通过心跳的方式,每10秒去更新一次数据,并不是只有在调用服务的时候才会进行更新,而是通过定时任务来异步进行。  
publicvoidscheduleUpdateIfAbsent(String serviceName, String clusters)
{  

if
 (
this
.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == 
null
) {  

synchronized
(
this
.futureMap) {  

if
 (
this
.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == 
null
) {  

// 创建一个UpdateTask的更新线程任务,每10秒去异步更新集合数据  
                ScheduledFuture<?> future = 
this
.addTask(
new
 HostReactor.UpdateTask(serviceName, clusters));  

this
.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);  

            }  

        }  

    }  

}  

案例2:用Debug来理解Nacos服务发现流程

1.进行远程接口调用,触发服务的发现,调用NacosServerListgetServers()方法。传入的serviceId和对应Feign接口上的接口@FeignClient中的名称一致。
图片
例如,我这里调用的Feign接口是:
@FeignClient
(
"gulimall-member"
)  

publicinterfaceMemberFeignService
{  

@RequestMapping
(
"/member/member/info/{id}"
)  

info(@PathVariable("id") Long id)
;  

}  

这里可以看出来,返回的是一个Instance类型的List,对应的服务也发现并返回了。
2.这里则调用了NacosNamingServiceselectInstances()方法,我这里的subscribe值是true,即代表我这个消费者直接订阅了这个服务,因此最终的信息是从本地Map中获取,即Nacos维护了一个注册列表。
3.再看下HostReactor的getServiceInfo()方法:最终所需要的结果是从serviceInfoMap中获取,并且通过多个Map进行维护服务实例,若存在数据的变化,还会通过强制睡眠5秒钟的方式来等待数据的更新。
4.无论怎样都会调用this.scheduleUpdateIfAbsent(serviceName, clusters)方法。
5.通过scheduleUpdateIfAbsent()方法定时的获取实时的实例数据,并且负责维护本地的服务注册列表,若服务发生更新,则更新本地的服务数据。

服务发现小总结☆:

经常有人说过,Nacos有个好处,就是当一个服务挂了之后,短时间内不会造成影响,因为有个本地注册列表,在服务不更新的情况下,服务还能够正常的运转,其原因如下:
  1. Nacos的服务发现,一般是通过订阅的形式来获取服务数据。
  2. 而通过订阅的方式,则是从本地的服务注册列表中获取(可以理解为缓存)。相反,如果不订阅,那么服务的信息将会从Nacos服务端获取,这时候就需要对应的服务是健康的。(宕机就不能使用了)
  3. 在代码设计上,通过Map来存放实例数据,key为实例名称,value为实例的相关信息数据(ServiceInfo对象)。
最后,服务发现的流程就是:
  1. 以调用远程接口(OpenFeign)为例,当执行远程调用时,需要经过服务发现的过程。
  2. 服务发现先执行NacosServerList类中的getServers()方法,根据远程调用接口上@FeignClient中的属性作为serviceId传入NacosNamingService.selectInstances()方法中进行调用。
  3. 根据subscribe的值来决定服务是从本地注册列表中获取还是从Nacos服务端中获取。
  4. 以本地注册列表为例,通过调用HostReactor.getServiceInfo()来获取服务的信息(serviceInfo),Nacos本地注册列表由3个Map来共同维护。
本地Map–>serviceInfoMap,
更新Map–>updatingMap
异步更新结果Map–>futureMap,
最终的结果从serviceInfoMap当中获取。
  1. HostReactor类中的getServiceInfo()方法通过this.scheduleUpdateIfAbsent() 方法和updateServiceNow()方法实现服务的定时更新和立刻更新。
  2. 而对于scheduleUpdateIfAbsent()方法,则通过线程池来进行异步的更新,将回调的结果(Future)保存到futureMap中,并且发生提交线程任务时,还负责更新本地注册列表中的数据。
欢迎有需要的同学试试,如果本文对您有帮助,也请帮忙点个 赞 + 在看 啦!❤️
在 程序员小乐 还有更多优质项目系统学习资源,欢迎分享给其他同学吧!
最后,整理了400多套项目,赠送读者。扫码下方二维码,后台回复赚钱即可获取。
--END--
继续阅读
阅读原文