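gRPC Basics
Introduction
gRPC is a high-performance, open-source, general-purpose RPC framework aimed at both server-side and mobile use. It is designed on top of the HTTP/2 standard and uses Protocol Buffers (ProtoBuf) as its default serialization format, and it currently supports many languages, including C, Java, and Go. gRPC offers a simple way to define services precisely and automatically generates reliable client and server libraries. As in many RPC systems, the server implements the defined interface and handles client requests, while the client calls the service it needs as if it were local, based on the interface description, without caring about the underlying communication details or the call process. The client and the server can each be implemented in any of the languages gRPC supports.
Basic communication flow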

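- The first step in gRPC communication is to define the IDL (Interface Definition Language), that is, the proto file.
- The second step is to compile the proto file to obtain the stub files. The stub bundles low-level functions such as service invocation and data serialization, and the client uses it to talk to the server.
- The third step is for the server to implement the interface defined in step one and start up. The definitions of these interfaces also live in the stub files.
- The last step is for the client to call the server's functions through the stub. Although the functions are implemented by the server, calling them feels just like calling local functions.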
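The four steps above can be illustrated with a toy, in-process sketch that uses no gRPC classes at all. Everything in it (`StubSketch`, `serve`, `newStub`, and the byte-array "transport") is hypothetical and only shows the idea that a stub hides serialization and transport behind the same interface the server implements.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class StubSketch {
    /** Step 1: the service contract (in gRPC this comes from the proto file). */
    interface Greeter {
        String sayHello(String name);
    }

    /** Step 3: the server-side implementation of the contract. */
    static class GreeterImpl implements Greeter {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    /** Server-side glue: decode the request bytes, call the implementation, encode the reply. */
    static Function<byte[], byte[]> serve(Greeter impl) {
        return requestBytes -> {
            String name = new String(requestBytes, StandardCharsets.UTF_8);
            return impl.sayHello(name).getBytes(StandardCharsets.UTF_8);
        };
    }

    /** Step 2: the client stub hides serialization and transport behind the same interface. */
    static Greeter newStub(Function<byte[], byte[]> transport) {
        return name -> {
            byte[] reply = transport.apply(name.getBytes(StandardCharsets.UTF_8));
            return new String(reply, StandardCharsets.UTF_8);
        };
    }

    public static void main(String[] args) {
        // The function below stands in for the network between client and server.
        Function<byte[], byte[]> transport = serve(new GreeterImpl());
        Greeter stub = newStub(transport);
        // Step 4: the call looks local even though it goes through bytes and a transport.
        System.out.println(stub.sayHello("gRPC")); // prints "Hello gRPC"
    }
}
```

In real gRPC the stub is generated from the proto file and the transport is an HTTP/2 connection, but the calling code has the same shape.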
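Multi-language Hello World example
Where to get the example
Preparation
- The Protobuf Support plugin has been officially deprecated and does not support IDEA 2019 and later. My IDEA version is fairly old, so this is the one I use.
- Protobuf Editor supports the latest IDEA versions but not some older ones.
Set up a Python environment and install the gRPC packages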
pip3 install grpcio
pip3 install grpcio-tools
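Project setup
The project structure is as follows
grpc-sample
├── src
│   └── main
│       ├── java
│       │   └── io
│       │       └── grpc
│       │           └── sample
│       │               └── helloworld
│       ├── proto
│       ├── python
│       └── resources
└── pom.xml
Create a Maven project and configure the pom file to import the gRPC dependencies and the compiler plugin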
<properties>
<grpc-version>1.20.0</grpc-version>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<version>${grpc-version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc-version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc-version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc-version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.9.1:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>src/main/proto</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
Create a helloworld.proto file under the main/proto directory to define the service and the message structures to be serialized
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.sample.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
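Compile the proto file into Java files with the protobuf-maven-plugin plugin
With the protobuf plugin added to the pom file as above, the following goals show up in the Maven Projects view
Plugins
└── protobuf
    ├── protobuf:compile
    ├── protobuf:compile-cpp
    ├── protobuf:compile-custom
    ├── protobuf:compile-javanano
    └── protobuf:compile-python
In practice we only need to run the following two goals
- protobuf:compile generates the message files, by default under target/generated-sources/protobuf/java
- protobuf:compile-custom generates the service interface files, by default under target/generated-sources/protobuf/grpc-java
After they run, the following files are generated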
target/generated-sources/
├── annotations
└── protobuf
    ├── grpc-java
    │   └── io/grpc/sample/helloworld
    │       └── GreeterGrpc.java
    └── java
        └── io/grpc/sample/helloworld
            ├── HelloReply.java
            ├── HelloReplyOrBuilder.java
            ├── HelloRequest.java
            ├── HelloRequestOrBuilder.java
            └── HelloWorldProto.java
Compile the proto file into Python files with grpcio-tools
cd proto
python -m grpc_tools.protoc --python_out=../python --grpc_python_out=../python -I. helloworld.proto
After it runs, the following files are generated under the python directory
python
├── helloworld_pb2.py
└── helloworld_pb2_grpc.py
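Create the Java server
- First, create a server instance that listens on a port.
- Then override the sayHello method defined in GreeterGrpc.GreeterImplBase to provide the concrete implementation of the service, and register that service with the server instance.
- Finally, start the server.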
package io.grpc.sample.helloworld;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
public class HelloWorldServer {
private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
HelloWorldServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldServer server = new HelloWorldServer();
server.start();
server.blockUntilShutdown();
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
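Create the Java client
The client code has three main steps
- First, create a channel from the host and port.
- Then create a Stub from the channel and the GreeterGrpc service class generated earlier.
- Finally, use the Stub's sayHello method to issue the actual RPC call. The remaining communication details are invisible to us.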
package io.grpc.sample.helloworld;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class HelloWorldClient {
private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());
private final GreeterGrpc.GreeterBlockingStub blockingStub;
public HelloWorldClient(Channel channel) {
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
public void greet(String name) {
logger.info("Will try to greet " + name + " ...");
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Greeter client received: " + response.getMessage());
}
public static void main(String[] args) throws Exception {
String name = "Java";
String target = "localhost:50051";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [name [target]]");
System.err.println("");
System.err.println(" name The name you wish to be greeted by. Defaults to " + name);
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
name = args[0];
}
if (args.length > 1) {
target = args[1];
}
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
.usePlaintext()
.build();
try {
HelloWorldClient client = new HelloWorldClient(channel);
client.greet(name);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
Create the Python client
from __future__ import print_function

import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc


def run():
    with grpc.insecure_channel('127.0.0.1:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='Python'))
    print("Greeter client received: " + response.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()
Verifying the result
Start the server, then call it from the Java client and from the Python client

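Analyzing the network communication between the Agent and the OAP, using ServiceManagementClient as an example
As we know, a web service instrumented by the Agent shows its service and instance information in the UI console right after startup, before it has received any request (endpoint information is only reported, through the logging aspect plugin, after a concrete endpoint has been accessed). How does this information get registered with the OAP?
Agent client
Class diagram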

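Network connection management
After the SkyWalking Agent collects the various metrics, it reports them to the OAP over gRPC. As the client, the Agent first has to create a Channel. For a client, creating and destroying a Channel is expensive, whereas creating a Stub is cheap, much like creating an ordinary object. The Channel therefore needs to be reused to make communication more efficient.
GRPCChannel
gRPC has two abstract classes
- ManagedChannel logically represents a Channel and holds a TCP connection underneath.
- ManagedChannelBuilder is responsible for creating client Channels. Three implementations are commonly used
  - NettyChannelBuilder creates the Channel with Netty underneath
  - OkHttpChannelBuilder creates the Channel with OkHttp underneath
  - InProcessChannelBuilder creates a Channel for in-process communication
SkyWalking wraps the native ManagedChannel connection in org.apache.skywalking.apm.agent.core.remote.GRPCChannel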
private final ManagedChannel originChannel;
private final Channel channelWithDecorators;
private GRPCChannel(
String host, int port,
List<ChannelBuilder> channelBuilders,
List<ChannelDecorator> decorators
) throws Exception {
ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);
for (ChannelBuilder builder : channelBuilders) {
channelBuilder = builder.build(channelBuilder);
}
this.originChannel = channelBuilder.build();
Channel channel = originChannel;
for (ChannelDecorator decorator : decorators) {
channel = decorator.build(channel);
}
channelWithDecorators = channel;
}
public static Builder newBuilder(String host, int port) {
return new Builder(host, port);
}
public static class Builder {
private final String host;
private final int port;
private final List<ChannelBuilder> channelBuilders;
private final List<ChannelDecorator> decorators;
private Builder(String host, int port) {
this.host = host;
this.port = port;
this.channelBuilders = new LinkedList<>();
this.decorators = new LinkedList<>();
}
public Builder addChannelDecorator(ChannelDecorator interceptor) {
this.decorators.add(interceptor);
return this;
}
public GRPCChannel build() throws Exception {
return new GRPCChannel(host, port, channelBuilders, decorators);
}
public Builder addManagedChannelBuilder(ChannelBuilder builder) {
channelBuilders.add(builder);
return this;
}
}
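As you can see, the decorator pattern is used here to extend the original connection, and the builder pattern is used to produce the decorated connection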
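To make the combination of the two patterns concrete, here is a minimal sketch that does not depend on gRPC. `SimpleChannel`, `RawChannel`, and the two header-prepending decorators are hypothetical stand-ins for `io.grpc.Channel`, `originChannel`, and decorators such as `AgentIDDecorator` and `AuthenticationDecorator`; they only show how a builder can collect decorators and wrap the raw channel one layer at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class DecoratedChannelSketch {
    /** Hypothetical stand-in for io.grpc.Channel: it only "sends" a string and returns what was sent. */
    interface SimpleChannel {
        String send(String payload);
    }

    /** The undecorated channel, analogous to originChannel in GRPCChannel. */
    static class RawChannel implements SimpleChannel {
        @Override
        public String send(String payload) {
            return payload;
        }
    }

    /** Builder that collects decorators and applies them in registration order. */
    static class Builder {
        private final List<UnaryOperator<SimpleChannel>> decorators = new ArrayList<>();

        Builder addDecorator(UnaryOperator<SimpleChannel> decorator) {
            decorators.add(decorator);
            return this;
        }

        SimpleChannel build() {
            SimpleChannel channel = new RawChannel();
            for (UnaryOperator<SimpleChannel> decorator : decorators) {
                channel = decorator.apply(channel); // each decorator wraps the previous result
            }
            return channel;
        }
    }

    static SimpleChannel buildDemoChannel() {
        return new Builder()
            // hypothetical decorators: each one prepends a header before delegating inward
            .addDecorator(inner -> payload -> inner.send("agent-id;" + payload))
            .addDecorator(inner -> payload -> inner.send("auth-token;" + payload))
            .build();
    }

    public static void main(String[] args) {
        // The decorator added last is the outermost layer, so its header ends up closest to the payload.
        System.out.println(buildDemoChannel().send("ping")); // prints "agent-id;auth-token;ping"
    }
}
```

Callers only ever see the decorated `SimpleChannel`, which is the same role `channelWithDecorators` plays in `GRPCChannel`.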
GRPCChannelManager
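This class is the connection manager for Channels. It creates the Channel and hands it to the other services that need to talk to the OAP, and it starts a scheduled task that monitors the connection. If the task finds that the underlying connection has been closed, it tries to rebuild it.
Let's first look at the core fields of this class, org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager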
private volatile GRPCChannel managedChannel = null;
private volatile ScheduledFuture<?> connectCheckFuture;
private volatile boolean reconnect = true;
private volatile int reconnectCount = 0;
private final List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<>());
private volatile List<String> grpcServers;
private final Random random = new Random();
private volatile int selectedIdx = -1;
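From the previous article, on the Agent startup flow, we know that services implementing the BootService interface have their prepare, boot, and onComplete methods called in turn. In GRPCChannelManager, prepare and onComplete are both empty implementations, so let's go straight to the boot method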
public void boot() {
if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
LOGGER.error("Collector server addresses are not set.");
LOGGER.error("Agent will not uplink any data.");
return;
}
grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
connectCheckFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("GRPCChannelManager")
).scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS
);
}
Now let's look at the concrete implementation of the scheduled task
public void run() {
LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
String backendService = Config.Collector.BACKEND_SERVICE.split(",")[0];
try {
String[] domainAndPort = backendService.split(":");
List<String> newGrpcServers = Arrays
.stream(InetAddress.getAllByName(domainAndPort[0]))
.map(InetAddress::getHostAddress)
.map(ip -> String.format("%s:%s", ip, domainAndPort[1]))
.collect(Collectors.toList());
grpcServers = newGrpcServers;
} catch (Throwable t) {
LOGGER.error(t, "Failed to resolve {} of backend service.", backendService);
}
}
if (reconnect) {
if (grpcServers.size() > 0) {
String server = "";
try {
int index = Math.abs(random.nextInt()) % grpcServers.size();
if (index != selectedIdx) {
selectedIdx = index;
server = grpcServers.get(index);
String[] ipAndPort = server.split(":");
if (managedChannel != null) {
managedChannel.shutdownNow();
}
managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.addManagedChannelBuilder(new StandardChannelBuilder())
.addManagedChannelBuilder(new TLSChannelBuilder())
.addChannelDecorator(new AgentIDDecorator())
.addChannelDecorator(new AuthenticationDecorator())
.build();
notify(GRPCChannelStatus.CONNECTED);
reconnectCount = 0;
reconnect = false;
} else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
reconnectCount = 0;
notify(GRPCChannelStatus.CONNECTED);
reconnect = false;
}
return;
} catch (Throwable t) {
LOGGER.error(t, "Create channel to {} fail.", server);
}
}
LOGGER.debug(
"Selected collector grpc service is not available. Wait {} seconds to retry",
Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL
);
}
}
private void notify(GRPCChannelStatus status) {
for (GRPCChannelListener listener : listeners) {
try {
listener.statusChanged(status);
} catch (Throwable t) {
LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
}
}
}
The core flow is summarized in the following flow chart
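The server-selection part of that task can be reduced to a small sketch. `ReconnectSketch` and its counters are hypothetical and do not open any connection; the sketch only mirrors the "pick a random index, rebuild only if the index changed" decision. It uses `random.nextInt(bound)` rather than `Math.abs(random.nextInt()) % size`, because `Math.abs(Integer.MIN_VALUE)` is still negative, so the bounded form is a slightly safer way to get an index.

```java
import java.util.Collections;
import java.util.List;
import java.util.Random;

class ReconnectSketch {
    private final List<String> servers;
    private final Random random = new Random();
    private int selectedIdx = -1; // index of the server the current channel points at
    private int rebuildCount = 0; // how many times a new channel would have been created

    ReconnectSketch(List<String> servers) {
        this.servers = servers;
    }

    /** One run of the check task while the reconnect flag is set. Returns the selected "host:port". */
    String tick() {
        // nextInt(bound) always returns a value in [0, bound), so no Math.abs is needed
        int index = random.nextInt(servers.size());
        if (index != selectedIdx) {
            selectedIdx = index;
            rebuildCount++; // the real code shuts down the old channel and builds a new one here
        }
        // otherwise the real code asks the existing channel whether it is still connected
        return servers.get(selectedIdx);
    }

    int rebuildCount() {
        return rebuildCount;
    }

    /** Splits "host:port" the same way the original code does before building the channel. */
    static int portOf(String server) {
        return Integer.parseInt(server.split(":")[1]);
    }

    public static void main(String[] args) {
        ReconnectSketch sketch = new ReconnectSketch(Collections.singletonList("127.0.0.1:11800"));
        String first = sketch.tick();
        sketch.tick(); // same index again, so no rebuild
        System.out.println(first + " rebuilds=" + sketch.rebuildCount()); // prints "127.0.0.1:11800 rebuilds=1"
    }
}
```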
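Service definition for instance registration and heartbeat
Management.proto
First, the protocol file of the service, apm-protocol/apm-network/src/main/proto/management/Management.proto, which defines two methods
- reportInstanceProperties reports the service instance information and OS information
- keepAlive reports the service heartbeat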
syntax = "proto3";
package skywalking.v3;
option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.management.v3";
option csharp_namespace = "SkyWalking.NetworkProtocol.V3";
option go_package = "skywalking/network/management/v3";
import "common/Common.proto";
service ManagementService {
rpc reportInstanceProperties (InstanceProperties) returns (Commands) {
}
rpc keepAlive (InstancePingPkg) returns (Commands) {
}
}
message InstanceProperties {
string service = 1;
string serviceInstance = 2;
repeated KeyStringValuePair properties = 3;
}
message InstancePingPkg {
string service = 1;
string serviceInstance = 2;
}
The corresponding generated files are laid out as follows
apm-protocol/apm-network/target/generated-sources/
└── protobuf
    ├── grpc-java
    │   └── org/apache/skywalking/apm/network/management/v3
    │       └── ManagementServiceGrpc.java
    └── java
        └── org/apache/skywalking/apm/network/management/v3
            ├── InstancePingPkg.java
            ├── InstancePingPkgOrBuilder.java
            ├── InstanceProperties.java
            ├── InstancePropertiesOrBuilder.java
            └── Management.java
ServiceManagementClient
Concrete client implementation
@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
private static final ILog LOGGER = LogManager.getLogger(ServiceManagementClient.class);
private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES;
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub;
private volatile ScheduledFuture<?> heartbeatFuture;
private volatile AtomicInteger sendPropertiesCounter = new AtomicInteger(0);
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
managementServiceBlockingStub = ManagementServiceGrpc.newBlockingStub(channel);
} else {
managementServiceBlockingStub = null;
}
this.status = status;
}
@Override
public void prepare() {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();
for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
.setKey(key)
.setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
.build());
}
Config.Agent.INSTANCE_NAME = StringUtil.isEmpty(Config.Agent.INSTANCE_NAME)
? UUID.randomUUID().toString().replaceAll("-", "") + "@" + OSUtil.getIPV4()
: Config.Agent.INSTANCE_NAME;
}
@Override
public void boot() {
heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("ServiceManagementClient")
).scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, Config.Collector.HEARTBEAT_PERIOD,
TimeUnit.SECONDS
);
}
@Override
public void onComplete() {
}
@Override
public void shutdown() {
heartbeatFuture.cancel(true);
}
@Override
public void run() {
LOGGER.debug("ServiceManagementClient running, status:{}.", status);
if (GRPCChannelStatus.CONNECTED.equals(status)) {
try {
if (managementServiceBlockingStub != null) {
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
managementServiceBlockingStub
.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.reportInstanceProperties(InstanceProperties.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.addAllProperties(OSUtil.buildOSInfo(
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.build());
} else {
final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).keepAlive(InstancePingPkg.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
}
} catch (Throwable t) {
LOGGER.error(t, "ServiceManagementClient execute fail.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
}
}
}
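As you can see, the client issues two RPC calls from the same scheduled task
- reportInstanceProperties reports the service instance information on every PROPERTIES_REPORT_PERIOD_FACTOR-th run of the task, every 3 minutes by default
- keepAlive sends the heartbeat on the remaining runs, every 30 seconds by default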
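The counter test in `run()` decides which of the two calls a given run issues. The sketch below replays that test without any gRPC call; `HeartbeatScheduleSketch`, `plan`, and the factor value used in `main` are illustrative assumptions and are not the Agent's real defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatScheduleSketch {
    /** Returns which RPC each of the first `ticks` runs of the task would issue. */
    static List<String> plan(int ticks, int reportFactor) {
        AtomicInteger counter = new AtomicInteger(0);
        List<String> calls = new ArrayList<>();
        for (int i = 0; i < ticks; i++) {
            // same test as in ServiceManagementClient.run(): every reportFactor-th run sends properties
            if (Math.abs(counter.getAndAdd(1)) % reportFactor == 0) {
                calls.add("reportInstanceProperties");
            } else {
                calls.add("keepAlive");
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // Hypothetical factor of 3: runs 0, 3 and 6 report properties, all other runs send a heartbeat.
        System.out.println(plan(7, 3));
    }
}
```

Note that a run which reports properties does not also send a keepAlive, so the very first run after connecting is always a properties report.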
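OAP server
Class diagram

Server
SkyWalking OAP abstracts the services it exposes as the Server interface, which has two implementations
- GRPCServer mainly receives the gRPC requests sent by the Agent and listens on port 11800 by default
- JettyServer mainly receives the HTTP requests sent by the UI and listens on port 12800 by default
GRPCServer
It wraps the initialization of the gRPC server parameters, the service registration methods, and the start method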
public class GRPCServer implements Server {
private final String host;
private final int port;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
private io.grpc.Server server;
private NettyServerBuilder nettyServerBuilder;
private String certChainFile;
private String privateKeyFile;
private DynamicSslContext sslContext;
private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4;
private int threadPoolQueueSize = 10000;
public GRPCServer(String host, int port) {
this.host = host;
this.port = port;
this.maxConcurrentCallsPerConnection = 4;
this.maxMessageSize = Integer.MAX_VALUE;
}
public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
}
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}
public void setThreadPoolQueueSize(int threadPoolQueueSize) {
this.threadPoolQueueSize = threadPoolQueueSize;
}
public GRPCServer(String host, int port, String certChainFile, String privateKeyFile) {
this(host, port);
this.certChainFile = certChainFile;
this.privateKeyFile = privateKeyFile;
}
@Override
public String hostPort() {
return host + ":" + port;
}
@Override
public String serverClassify() {
return "Google-RPC";
}
@Override
public void initialize() {
InetSocketAddress address = new InetSocketAddress(host, port);
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize);
ExecutorService executor = new ThreadPoolExecutor(
threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue,
new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler()
);
nettyServerBuilder = NettyServerBuilder.forAddress(address);
nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)
.maxInboundMessageSize(maxMessageSize)
.executor(executor);
if (!Strings.isNullOrEmpty(privateKeyFile) && !Strings.isNullOrEmpty(certChainFile)) {
sslContext = DynamicSslContext.forServer(privateKeyFile, certChainFile);
nettyServerBuilder.sslContext(sslContext);
}
log.info("Server started, host {} listening on {}", host, port);
}
static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.warn("Grpc server thread pool is full, rejecting the task");
}
}
@Override
public void start() throws ServerException {
try {
Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
server = nettyServerBuilder.build();
server.start();
} catch (IOException e) {
throw new GRPCServerException(e.getMessage(), e);
}
}
public void addHandler(BindableService handler) {
log.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port);
nettyServerBuilder.addService(handler);
}
public void addHandler(ServerServiceDefinition definition) {
log.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port);
nettyServerBuilder.addService(definition);
}
public void addHandler(ServerInterceptor serverInterceptor) {
log.info("Bind interceptor {} into gRPC server {}:{}", serverInterceptor.getClass().getSimpleName(), host, port);
nettyServerBuilder.intercept(serverInterceptor);
}
@Override
public boolean isSSLOpen() {
return !Strings.isNullOrEmpty(privateKeyFile) && !Strings.isNullOrEmpty(certChainFile);
}
@Override
public boolean isStatusEqual(Server target) {
if (this == target)
return true;
if (target == null || getClass() != target.getClass())
return false;
GRPCServer that = (GRPCServer) target;
return port == that.port && Objects.equals(host, that.host) && Objects.equals(
certChainFile, that.certChainFile) && Objects
.equals(privateKeyFile, that.privateKeyFile);
}
}
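The executor built in `initialize()` has a fixed number of threads, a bounded queue, and a rejection handler that only logs, so a task that arrives when both are full is dropped. The following sketch reproduces that behavior with plain JDK classes; `BoundedPoolSketch` and the sizes used in `main` are illustrative assumptions, not the OAP's configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {
    /** Submits `tasks` blocking tasks to a fixed pool with a bounded queue and returns how many were rejected. */
    static int countRejected(int threads, int queueSize, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            threads, threads, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueSize),
            // like CustomRejectedExecutionHandler: note the rejection and drop the task
            (task, pool) -> rejected.incrementAndGet()
        );
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // keep the worker busy until every task has been submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int result = rejected.get();
        release.countDown(); // let the accepted tasks finish
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) {
        // 2 threads + 3 queue slots can hold 5 tasks, so 5 of the 10 submissions are rejected.
        System.out.println(countRejected(2, 3, 10)); // prints 5
    }
}
```

Because the handler does not rethrow or run the task in the caller, a rejected task simply never executes; the only trace is the warning in the log.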
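ServerHandler
The top-level abstraction for the logic a Server uses to handle client requests. The concrete request-handling logic is encapsulated in implementations of this interface
GRPCHandler
The logic GRPCServer uses to handle gRPC requests is encapsulated in the implementation classes of this interface
ManagementServiceHandler
One of the GRPCHandler implementations. This class is the concrete implementation of the instance registration and heartbeat service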
public class ManagementServiceHandler extends ManagementServiceGrpc.ManagementServiceImplBase implements GRPCHandler {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
public ManagementServiceHandler(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
}
@Override
public void reportInstanceProperties(final InstanceProperties request,
final StreamObserver<Commands> responseObserver) {
ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
final String serviceName = namingControl.formatServiceName(request.getService());
final String instanceName = namingControl.formatInstanceName(request.getServiceInstance());
serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(serviceName, NodeType.Normal));
serviceInstanceUpdate.setName(instanceName);
JsonObject properties = new JsonObject();
List<String> ipv4List = new ArrayList<>();
request.getPropertiesList().forEach(prop -> {
if (InstanceTraffic.PropertyUtil.IPV4.equals(prop.getKey())) {
ipv4List.add(prop.getValue());
} else {
properties.addProperty(prop.getKey(), prop.getValue());
}
});
properties.addProperty(InstanceTraffic.PropertyUtil.IPV4S, ipv4List.stream().collect(Collectors.joining(",")));
serviceInstanceUpdate.setProperties(properties);
serviceInstanceUpdate.setTimeBucket(
TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
sourceReceiver.receive(serviceInstanceUpdate);
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
@Override
public void keepAlive(final InstancePingPkg request, final StreamObserver<Commands> responseObserver) {
final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
final String serviceName = namingControl.formatServiceName(request.getService());
final String instanceName = namingControl.formatInstanceName(request.getServiceInstance());
ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(serviceName, NodeType.Normal));
serviceInstanceUpdate.setName(instanceName);
serviceInstanceUpdate.setTimeBucket(timeBucket);
sourceReceiver.receive(serviceInstanceUpdate);
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setName(serviceName);
serviceMeta.setNodeType(NodeType.Normal);
serviceMeta.setTimeBucket(timeBucket);
sourceReceiver.receive(serviceMeta);
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
}
InstanceUpdateDispatcher & ServiceMetaDispatcher
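From the service implementation in the previous step, we can see that the reported information is converted into Source objects and passed to SourceReceiver. Based on the Dispatcher specified by the annotation on the Source class, SourceReceiver then converts the Source into a metrics class. The dispatchers for instance information and service information are shown below; the metrics classes they produce are InstanceTraffic and ServiceTraffic respectively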
public class InstanceUpdateDispatcher implements SourceDispatcher<ServiceInstanceUpdate> {
@Override
public void dispatch(final ServiceInstanceUpdate source) {
InstanceTraffic traffic = new InstanceTraffic();
traffic.setTimeBucket(source.getTimeBucket());
traffic.setName(source.getName());
traffic.setServiceId(source.getServiceId());
traffic.setLastPingTimestamp(source.getTimeBucket());
traffic.setProperties(source.getProperties());
MetricsStreamProcessor.getInstance().in(traffic);
}
}
public class ServiceMetaDispatcher implements SourceDispatcher<ServiceMeta> {
@Override
public void dispatch(final ServiceMeta source) {
ServiceTraffic traffic = new ServiceTraffic();
traffic.setTimeBucket(source.getTimeBucket());
traffic.setName(source.getName());
traffic.setNodeType(source.getNodeType());
MetricsStreamProcessor.getInstance().in(traffic);
}
}
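The routing idea behind the two dispatchers can be sketched with a map from source type to dispatcher. This is a simplification under stated assumptions: `DispatchSketch`, `Receiver`, `register`, and the string "metrics" are all hypothetical, and the lookup is keyed by class here rather than by the annotation-based mechanism the OAP uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class DispatchSketch {
    /** Hypothetical source objects, loosely mirroring ServiceInstanceUpdate and ServiceMeta. */
    static class InstanceUpdate {
        final String name;
        InstanceUpdate(String name) { this.name = name; }
    }

    static class ServiceMeta {
        final String name;
        ServiceMeta(String name) { this.name = name; }
    }

    /** Hypothetical receiver: routes each source to the dispatcher registered for its class. */
    static class Receiver {
        private final Map<Class<?>, Function<Object, String>> dispatchers = new HashMap<>();
        private final List<String> metrics = new ArrayList<>();

        <S> void register(Class<S> sourceType, Function<S, String> dispatcher) {
            // cast once here so that receive() can stay untyped
            dispatchers.put(sourceType, source -> dispatcher.apply(sourceType.cast(source)));
        }

        void receive(Object source) {
            Function<Object, String> dispatcher = dispatchers.get(source.getClass());
            if (dispatcher != null) {
                metrics.add(dispatcher.apply(source)); // the real dispatchers hand a Metrics object to a stream processor
            }
        }

        List<String> metrics() {
            return metrics;
        }
    }

    static List<String> demo() {
        Receiver receiver = new Receiver();
        receiver.register(InstanceUpdate.class, s -> "instance_traffic:" + s.name);
        receiver.register(ServiceMeta.class, s -> "service_traffic:" + s.name);
        receiver.receive(new ServiceMeta("projectB"));
        receiver.receive(new InstanceUpdate("instance-1"));
        return receiver.metrics();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [service_traffic:projectB, instance_traffic:instance-1]
    }
}
```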
InstanceTraffic & ServiceTraffic
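Now let's look at these two metrics classes. The name field of the Stream annotation is the metric name, and a storage module such as ES uses it to create the index when writing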
@Stream(name = InstanceTraffic.INDEX_NAME, scopeId = SERVICE_INSTANCE,
builder = InstanceTraffic.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = true)
@EqualsAndHashCode(of = {
"serviceId",
"name"
})
public class InstanceTraffic extends Metrics {
public static final String INDEX_NAME = "instance_traffic";
public static final String SERVICE_ID = "service_id";
public static final String NAME = "name";
public static final String LAST_PING_TIME_BUCKET = "last_ping";
public static final String PROPERTIES = "properties";
private static final Gson GSON = new Gson();
}
@Stream(name = ServiceTraffic.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE,
builder = ServiceTraffic.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = false)
@EqualsAndHashCode(of = {
"name",
"nodeType"
})
public class ServiceTraffic extends Metrics {
public static final String INDEX_NAME = "service_traffic";
public static final String NAME = "name";
public static final String NODE_TYPE = "node_type";
public static final String GROUP = "service_group";
}
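How a storage layer could read that name at runtime can be shown with a small reflection sketch. `StreamSketch`, `InstanceTrafficSketch`, and `indexNameOf` are hypothetical; the real `@Stream` annotation carries more fields, and a real storage module may add prefixes or suffixes to the index name.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class StreamNameSketch {
    /** Hypothetical, reduced version of a stream annotation: it only carries the metric name. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface StreamSketch {
        String name();
    }

    /** Hypothetical metrics class annotated with its metric name. */
    @StreamSketch(name = "instance_traffic")
    static class InstanceTrafficSketch {
    }

    /** Reads the metric name from the annotation, as a storage layer might before creating an index. */
    static String indexNameOf(Class<?> metricsClass) {
        StreamSketch stream = metricsClass.getAnnotation(StreamSketch.class);
        if (stream == null) {
            throw new IllegalArgumentException(metricsClass.getName() + " is not annotated with @StreamSketch");
        }
        return stream.name();
    }

    public static void main(String[] args) {
        System.out.println(indexNameOf(InstanceTrafficSketch.class)); // prints "instance_traffic"
    }
}
```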
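After that, the dispatcher passes the metrics object to the stream processor, which in turn hands it to various Workers for aggregation, persistence, and so on. The details are not covered here. Let's use ES as the storage module and verify the result of reporting instance and service information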
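Verifying the result
Take projectB from the Live-Demo project as an example. Start ProjectB with business-zone::projectB as the service name, set a breakpoint at the end of the prepare method of ServiceManagementClient, and inspect the generated instance name
Here we can see that the generated instance name is e4143e44d8ff448583bef96d77b57efc@192.168.100.48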
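The shape of that name follows from the fallback in `prepare()`: a UUID with the dashes removed, an `@`, and the host's IPv4 address. The sketch below isolates that rule; `InstanceNameSketch.resolve` is a hypothetical helper, and the IP address is passed in rather than looked up the way `OSUtil.getIPV4()` does.

```java
import java.util.UUID;

class InstanceNameSketch {
    /** Returns the configured name, or a generated "uuid-without-dashes@ip" when none is configured. */
    static String resolve(String configuredName, String ipv4) {
        if (configuredName == null || configuredName.isEmpty()) {
            return UUID.randomUUID().toString().replaceAll("-", "") + "@" + ipv4;
        }
        return configuredName;
    }

    public static void main(String[] args) {
        // The UUID part is random, so the first output differs on every run.
        System.out.println(resolve("", "192.168.100.48"));
        System.out.println(resolve("my-instance", "192.168.100.48")); // prints "my-instance"
    }
}
```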
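Start the UI service. The names of the current service and the current instance match what we expected. Then look at the corresponding metric indexes in ES, one for the instance information metric and one for the service information metric. That completes the end-to-end verification of how instance and service information is reported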
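Summary
The SkyWalking Agent and the OAP implement the gRPC client and server respectively and communicate remotely. gRPC itself is a high-performance RPC framework designed on the HTTP/2 standard, and Protocol Buffers, its default serialization format, performs considerably better than JSON over REST. In addition, when SkyWalking uses gRPC's long-lived connection, it makes sure that multiple services share a single connection, which reduces network bandwidth usage and improves communication efficiency. This idea is worth borrowing.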