«

Nacos源码学习计划-Day11-集群-集群新增节点进行已有数据的同步

ZealSinger 发布于 阅读:100 技术文档


当我们为Nacos集群中新增一个Nacos节点,以及某个Nacos节点宕机后需要重启,这两个情况下都会需要同步其他节点的数据,所以这次我们围绕这个需求,来看看Nacos中如何实现的

找到主线

从这个需求来看,肯定是服务启动的时候就需要去进行同步,所以肯定关键点肯定在于一些初始化,init,构造方法中,所以这次的找主线任务是比较难的,只能通过大量阅读源码或者搜索一些可能的关键字或者注释尝试去找到。

通过查找NacosCluster,Member,Synchronization等关键字,最后在搜索LoadData关键字的时候,我们发现了这个

可以看到这里有个静态方法submitLoadDataTask,在GlobalExecutor中,从方法名以及入参等情况来看,这个很有可能就是提交加载数据任务的方法,那么其对应的调用地方,就会是和加载数据逻辑相关的地方

image-20251107224148785

网上查找调用该方法的地方,可以看到是一个DistroLoadDataTask任务类的run逻辑中调用了,并且在调用前会使用一个方法checkCompleted进行判断,可以猜测这里是“判断完整性,如果不完整则利用GlobalExecutor全局线程池提交加载数据的任务”

image-20251107224334701

那我们就分析这个DistroLoadDataTask的创建链路以及大致看一下checkCompleted的逻辑进行进一步的判断

DistroLoadDataTask的调用链路如下图,可以看到DistroLoadDataTask只有在startLoadTask这个方法中被创建且被提交,而startLoadTask这个方法在DisreoProtocol的构造方法中被调用,DisreoProtocol是一个Bean,也就是说这个加载任务在Spring Bean容器加载完DisreoProtocol之后就能被触发从而进行同步

image-20251108000617317

到这里为止,结合命名和任务的创建和调用时机,基本可以确定这个DistroLoadDataTask就是新增节点同步已有数据的逻辑,那么接下来,我们可以回过来看看这个Task的任务逻辑

分析主线

发起同步请求

可以看到,run的逻辑中,首先是load方法,我们查看load方法的逻辑,发信啊前面两个while就是等待distro初始化完毕,在最后一个for循环中会调用loadAllDataSnapshotFromRemote(each) 去获取其他集群节点上全部服务实例数据

image-20251108155216204

loadAllDataSnapshotFromRemote的逻辑如下

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
   DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
   DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
   if (null == transportAgent || null == dataProcessor) {
       Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
           resourceType, transportAgent, dataProcessor);
       return false;
  }
   // 遍历除开自身节点以外的其他节点数量
   for (Member each : memberManager.allMembersWithoutSelf()) {
       try {
           Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
           // 通过 HTTP 的方式获取其他集群节点的数据
           DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
           
           // 操作返回结果,同步到自身节点内存注册表
           boolean result = dataProcessor.processSnapshot(distroData);
           Loggers.DISTRO
              .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                   result);

           // 需要注意这里:只要有一个节点数据同步成功,就结束了
           if (result) {
               return true;
          }
      } catch (Exception e) {
           Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
      }
  }
   return false;
}

OK,上面的注解可能一下子不能理解,尤其是getDatumSnapshot和processSnapshot这两个方法的逻辑,我们现在可以点进去大概看看这两个方法的逻辑,看看是不是我们所写的功能

首先是getDatumSnapshot,谁调用的我们暂且不需要在意,入参是每个Nacos节点的Address

如下图,很明显是给/v1/ns/distro/datums路径下发送了对应的HTTP请求获取目标节点的数据

image-20251108160926993

然后是processSnapshot,其入参就是上面getDatumSnapshot方法中获取到的目标节点的数据 这部分逻辑相对而言比较麻烦

我们之前在分析 Nacos 服务端处理实例注册的时候,采用的是内存队列+异步任务,在异步任务中,最终会调用listener.onChange 利用写时负责的机制来同步我们注册表,processSnapshot方法中,也是利用这个listener.onChange方法来同步注册表

    /**
    * 处理接收到的同步数据
    *
    * @param data 需要处理的字节数据
    * @return 处理是否成功
    * @throws Exception 处理过程中可能抛出的异常
    */
   private boolean processData(byte[] data) throws Exception {
       // 检查数据是否为空
       if (data.length > 0) {
           // 反序列化数据为实例映射
           Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);

           // 第一轮处理:遍历所有数据项,将其存入数据存储,并处理服务不存在的情况
           for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
               // 将数据放入本地数据存储
               dataStore.put(entry.getKey(), entry.getValue());

               // 如果当前key没有监听器,说明可能是新服务 这部分可以不要看
               if (!listeners.containsKey(entry.getKey())) {
                   // 检查是否启用默认临时实例模式
                   if (switchDomain.isDefaultInstanceEphemeral()) {
                       // 创建新的服务对象
                       Loggers.DISTRO.info("creating service {}", entry.getKey());
                       Service service = new Service();
                       // 从key中提取服务名和命名空间ID
                       String serviceName = KeyBuilder.getServiceName(entry.getKey());
                       String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                       service.setName(serviceName);
                       service.setNamespaceId(namespaceId);
                       service.setGroupName(Constants.DEFAULT_GROUP);
                       // 设置最后修改时间并重新计算校验和
                       service.setLastModifiedMillis(System.currentTimeMillis());
                       service.recalculateChecksum();

                       // 获取服务元数据监听器,确保不为空
                       RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
                       if (Objects.isNull(listener)) {
                           return false;
                      }
                       // 触发服务变更事件,创建服务
                       listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                  }
              }
          }

           // 第二轮处理:通知监听器数据变更
           for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
               // 检查是否存在对应的监听器
               if (!listeners.containsKey(entry.getKey())) {
                   // 记录警告日志,正常情况下不应该发生
                   Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                   continue;
              }

               try {
                   // 遍历所有监听器并触发变更事件
                   for (RecordListener listener : listeners.get(entry.getKey())) {
                       listener.onChange(entry.getKey(), entry.getValue().value);
                  }
              } catch (Exception e) {
                   // 记录错误日志,但继续处理其他监听器
                   Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                   continue;
              }

               // 监听器执行成功后,更新数据存储
               dataStore.put(entry.getKey(), entry.getValue());
          }
      }
       // 返回处理成功
       return true;
  }

从这一部分源码我们得知,在 Nacos 服务端项目启动时,会创建一个DistroProtocol Bean 对象,在这个 Bean 对象的构造方法中会开启一个异步任务,异步任务主要的逻辑就是通过 HTTP 的方式从其他集群节点获取全部的数据,然后最终把数据同步到自己的内存注册表中,完成数据同步。

如果有 N 个集群节点,并不是每一台都会去同步数据,而是只要有一台同步数据成功了,那整个逻辑就结束了。

在同步数据的时候,需要通过 HTTP 请求 /v1/ns/distro/datums 接口获取实例数据,那么我们接下来看看,这个接口的源码 Nacos 是怎么实现的。

处理同步请求

通过URL找到对应的Controller

核心就在于这个dataStore,这个我们在注册实例对象的时候分析过了。在我们分析实例注册的时候,会把我们实例信息存一份放在 dataStore的 Map 当中,在执行注册异步任务的时候,也是会通过 Key 从 dataStore的 Map 中取出实例信息,所以在这个里面,它是会包含整个实例信息数据。

在这里的数据同步,也是利用dataStore这个对象来实现的,而不是直接从内存注册表取。

public DistroData onSnapshot(String type) {
   DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
   if (null == distroDataStorage) {
       Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
       return new DistroData(new DistroKey("snapshot", type), new byte[0]);
  }
   // 核心看这个方法
   return distroDataStorage.getDatumSnapshot();
}

@Override
public DistroData getDatumSnapshot() {
   // 从 dataStore 里面获取
   Map<String, Datum> result = dataStore.getDataMap();
   byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
   DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
   return new DistroData(distroKey, dataContent);
}

总结

 

编程 Java 项目



收到1条评论
avatar
ZealSinger 20 天前
DistroLoadDataTask这个任务类的命名我们可以看到,其实就是和Distro协议相关的。Nacos采用了AP和CP双设计,其中Distro协议就是为了满足AP的最终一致性协议,在上述的分析中,我们也能看到整个同步任务居然只需要一个节点同步成功之后就能直接return了,很明显不能保证强一致性。关于Disrtro协议的内容我们后续会讲。
回复