⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/agent-send-trace/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

分布式链路追踪系统,链路的追踪大体流程如下:

  1. Agent 收集 Trace 数据。
  2. Agent 发送 Trace 数据给 Collector
  3. Collector 接收 Trace 数据。
  4. Collector 存储 Trace 数据到存储器,例如,数据库。

本文主要分享【第二部分】 SkyWalking Agent 发送 Trace 数据

考虑到减少外部组件的依赖,Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent 写入内存消息队列后台线程异步】发送给 Collector 。

本文涉及的类非常少,如下图所示:

2. TraceSegmentServiceClient

org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient ,TraceSegment 发送服务客户端。它是一个服务,也是一个客户端,负责将 TraceSegment 异步发送到 Collector 。

我们先来看看 TraceSegmentServiceClient 的属性

  • TIMEOUT 静态属性,发送等待超时时长,单位:毫秒。
  • lastLogTime 属性,最后打印日志时间。该属性主要用于开发调试。
  • segmentUplinkedCounter 属性,TraceSegment 发送数量。
  • segmentAbandonedCounter 属性,TraceSegment 被丢弃数量。在 Agent 未连接上 Collector 时,产生的 TraceSegment 将被丢弃。
  • carrier 属性,内存队列。在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 有对 DataCarrier 的详细解析。
  • serviceStub 属性,非阻塞 Stub 。
  • status 属性,连接状态。

下面,我们来介绍 TraceSegmentServiceClient 实现的接口以及对应的方法。

2.1 实现 BootService 接口

#beforeBoot() 方法,代码如下:

  • 第 86 行:调用 GRPCChannelManager#addChannelListener(this) 方法,将自己添加到 GRPCChannelManager 中,作为一个监听器,从而调用 #statusChanged(GRPCChannelStatus) 方法,实现对连接状态( status )的监听处理。

#boot() 方法,代码如下:

  • 第 95 至 97 行:创建 DataCarrier 对象,作为内存队列,并设置自己作为消费者,从而调用 #consume(List<TraceSegment> ) 方法,实现异步发送 TraceSegment 到 Collector 。

#afterBoot() 方法,代码如下:

  • 第 102 行:调用 TracingContext.ListenerManager#add(this) 方法,将自己添加到 ListenerManager 中,作为一个监听器,从而调用 #afterFinished(TraceSegment) 方法,实现收集到新的 TraceSegment ,添加到内存队列。

#shutdown() 方法,代码如下:

  • 第 107 行:调用 DataCarrier#shutdownConsumers() 方法,停止消费。

2.2 实现 GRPCChannelListener 接口

#statusChanged(GRPCChannelStatus) 方法,代码如下:

  • 第 211 至 214 行:连接成功,创建 Stub 对象。
  • 第 215 行:记录连接状态。

2.3 实现 TracingContextListener 接口

#afterFinished(TraceSegment) 方法,代码如下:

  • 第 197 至 199 行:当 TraceSegment.ignore = true 时,忽略该 TraceSegment 。
  • 第 201 行:提交 TraceSegment 到内存队列。

2.4 实现 IConsumer 接口

#consume(List<TraceSegment>) 方法,代码如下:

ps:目前 DataCarrier 最长每 20 秒消费一次。

#onError(List<TraceSegment>, Throwable) 方法,当消费发生异常时,打印日志。

666. 彩蛋

知识星球

HOHO ,简单水更一篇,保持 Trace 相关的小系列文章,主题明确。

胖友,分享个朋友圈可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. TraceSegmentServiceClient
    1. 2.1. 2.1 实现 BootService 接口
    2. 2.2. 2.2 实现 GRPCChannelListener 接口
    3. 2.3. 2.3 实现 TracingContextListener 接口
    4. 2.4. 2.4 实现 IConsumer 接口
  3. 3. 666. 彩蛋