«

Nacos补档-客户端心跳同步问题.md

ZealSinger 发布于 阅读:30 技术文档


我们今天来讨论一下Nacos客户端的心跳同步的一个问题

我们知道如下几个点

  1. Nacos客户端会和服务端进行心跳同步

  2. Nacos服务端在集群环境下采用单个责任节点+多个从节点的方式管理,责任节点负责负责健康检查,心跳处理等操作,从节点仅仅担任数据同步工作

  3. 一个微服务实例连接集群的条件下,请求可能会因为负载策略发送到任意一个Nacos节点上

那么综合上述,我们可以引出一个问题:如果客户端的心跳请求发送到了非责任节点会怎么处理?

这个地方可以参考Nacos仓库下面的一个issue

server端收到client beat请求后,不更新last beat时间,导致实际被判定为unhealthy · Issue #5363 · alibaba/nacos

针对上述问题,我们来举个例子:假设现在我们有三个集群节点,member1、member2、member3,又假设 member1 是负责 sotck-service 健康检查任务的节点。

stock-serivce 客户端发送心跳请求的时候,是随机发送到集群中某一个节点上的,假设 stock-service 没有给 member1 发送心跳请求,如果 lastBeat 属性不同步给集群节点,那么 member1 在做健康检查的时候,是不是会判断 stock-service 为不健康的实例?

我们可以看一下issue上大佬们关于这个问题的讨论与回复

image-20260124220202300

image-20260124220213185

按照大佬的意思就是:心跳请求应该是随机发送到集群中某一台上,然后通过distro协议转发到该服务的责任节点去统一更新实例心跳时间。且心跳时间不会进行同步。

我们可以自己扒一下源码来了解一下

我们来看到服务端对于心跳的处理这一块,可以看到有一个@CanDistro的注解

image-20260124220613615

这个注解是一个自定义注解,但是他的作用好像不是很明了,我们可以看一下哪个地方用到了这个注解,发现这里有个Filter过滤器中用到了

image-20260124220733729

我们来看一下这个过滤器,可以看到最终其匹配的URL是所有的/v1/ns/*的路径

image-20260124220851564

接下来我们就可以来看一下Filter中的过滤逻辑,代码很长,但是我们主要看一下重要的部分,即有我写了注释的那一行其实就行了

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
       throws IOException, ServletException {
   ReuseHttpRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
   HttpServletResponse resp = (HttpServletResponse) servletResponse;
   
   String urlString = req.getRequestURI();
   
   if (StringUtils.isNotBlank(req.getQueryString())) {
       urlString += "?" + req.getQueryString();
  }
   
   try {
       String path = new URI(req.getRequestURI()).getPath();
       String serviceName = req.getParameter(CommonParams.SERVICE_NAME);

       if (StringUtils.isBlank(serviceName)) {
           serviceName = req.getParameter("dom");
      }
       
       if (StringUtils.isNotBlank(serviceName)) {
           serviceName = serviceName.trim();
      }
       Method method = controllerMethodsCache.getMethod(req);
       
       if (method == null) {
           throw new NoSuchMethodException(req.getMethod() + " " + path);
      }
       
       String groupName = req.getParameter(CommonParams.GROUP_NAME);
       if (StringUtils.isBlank(groupName)) {
           groupName = Constants.DEFAULT_GROUP;
      }
       

       String groupedServiceName = serviceName;
       if (StringUtils.isNotBlank(serviceName) && !serviceName.contains(Constants.SERVICE_INFO_SPLITER)) {
           groupedServiceName = groupName + Constants.SERVICE_INFO_SPLITER + serviceName;
      }
       
// 判断是否带有CanDistro注解 && responsible方法即判断是否为责任节点
       if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(groupedServiceName)) {
           
           String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
           
           if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {

               Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
               resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                       "receive invalid redirect request from peer " + req.getRemoteAddr());
               return;
          }
           // 得到目标的真正的责任节点
           final String targetServer = distroMapper.mapSrv(groupedServiceName);
           
           List<String> headerList = new ArrayList<>(16);
           Enumeration<String> headers = req.getHeaderNames();
           while (headers.hasMoreElements()) {
               String headerName = headers.nextElement();
               headerList.add(headerName);
               headerList.add(req.getHeader(headerName));
          }
           
           final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
           final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
           
           RestResult<String> result = HttpClient
                  .request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                           PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
           String data = result.ok() ? result.getData() : result.getMessage();
           try {
               WebUtils.response(resp, data, result.getCode());
          } catch (Exception ignore) {
               Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(groupedServiceName)
                       + urlString);
          }
      } else {
           OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(req);
           requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupedServiceName);
           filterChain.doFilter(requestWrapper, resp);
      }
  } catch (AccessControlException e) {
       resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
  } catch (NoSuchMethodException e) {
       resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
               "no such api:" + req.getMethod() + ":" + req.getRequestURI());
  } catch (Exception e) {
       resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
               "Server failed," + ExceptionUtil.getAllExceptionMsg(e));
  }
   
}

那么可以看到,所有的标记了@CanDistro注解的请求,都会请求的时候判断:如果当前处理请求的集群节点,不是这个 service 的责任节点,那么这个过滤器就会转发到对应的责任节点上做处理。

那我们回到最开始的问题上:lastBeat 属性是如何同步的?

心跳检测在请求到达节点后就能自行检测是否责任节点上,如果不在则会直接发送给责任节点,而不是等待全体同步,所以可以理解为心跳时间是定向同步而非全体同步,心跳可能会发送到非责任节点,但是非责任节点一定会检测并且转发给责任节点,责任节点对于心跳的数据不会也没有必要去同步到别的节点上,因为别的节点完全不需要也不负责

编程 Java 项目