Thrift详解

RPC调用流程

1
2
3
4
5
6
7
8
9
10
11
1)客户端(client)调用以本地调用方式调用服务;
2)client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
3)client stub找到服务地址,并将消息发送到服务端;
4)server stub收到消息后进行解码;
5)server stub根据解码结果调用本地的服务;
6)服务端执行并将结果返回给server stub;
7)server stub将返回结果打包成消息并发送至消费方;
8)client stub接收到消息,并进行解码;
9)客户端得到最终结果。

img

Thrift主要特点

  1. 基于二进制的高性能的编解码框架
  2. 基于NIO的底层通信
  3. 相对简单的服务调用模型
  4. 使用IDL支持跨平台调用

Thrift核心组件

  1. TProtocol 协议和编解码组件
  2. TTransport 传输组件
  3. TProcessor 服务调用组件
  4. TServerClient 服务器和客户端组件
  5. IDL 服务描述组件,负责生产跨平台客户端

Thrift基础架构

img

thrift是一个客户端和服务器端的架构体系(c/s),在最上层是用户自行实现的业务逻辑代码。第二层是由thrift编译器自动生成的代码,主要用于结构化数据的解析,发送和接收。TServer主要任务是高效的接受客户端请求,并将请求转发给Processor处理。Processor负责对客户端的请求做出响应,包括RPC请求转发,调用参数解析和用户逻辑调用,返回值写回等处理。从TProtocol以下部分是thirft的传输协议和底层I/O通信。TProtocol是用于数据类型解析的,将结构化数据转化为字节流给TTransport进行传输。TTransport是与底层数据传输密切相关的传输层,负责以字节流方式接收和发送消息体,不关注是什么数据类型。底层IO负责实际的数据传输,包括socket、文件和压缩数据流等。

传输层TTransport

  • TIOStreamTransport: 这个类封装了InputStreamOutputStream这两个流,用来处理数据传输中的输入输出流。采用的是阻塞同步IO。
    • TSocket: 是TIOStreamTransport类的子类,并且封装了Socket接口。
  • TNonblockingTransport: 这个类是非阻塞IO的抽象类。
    • TNonblockingSocket: 是TNonblockingTransport类的子类,使用了SocketChannel进行了非阻塞IO。
  • TFramedTransport: 使用非阻塞方式,帧传输类就是按照一帧的固定大小来传输数据,所有的写操作首先都是在内存中完成的直到调用了flush操作,然后传输节点在flush操作之后将所有数据根据数据的有效载荷写入数据的长度的二进制块发送出去,允许在接收的另一端按照固定的长度来读取。
1
2
3
4
5
6
7
8
9
10
11
// TTransport.class
public abstract class TTransport {
// 读取指定长度的数据
public abstract int read(byte[] buf, int off, int len) throws TTransportException;
// 写数据
public abstract void write(byte[] buf, int off, int len) throws TTransportException;
// 把缓冲区的数据全部都push出去
public void flush() throws TTransportException;
}
阻塞TSocket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// TSocket.class
public class TSocket extends TIOStreamTransport {
// 构造socket
public TSocket(String host, int port, int socketTimeout, int connectTimeout) {
host_ = host;
port_ = port;
socketTimeout_ = socketTimeout;
connectTimeout_ = connectTimeout;
initSocket();
}
// 创建新的Socket
private void initSocket() {
socket_ = new Socket();
try {
socket_.setSoLinger(false, 0);
socket_.setTcpNoDelay(true);
socket_.setKeepAlive(true);
socket_.setSoTimeout(socketTimeout_);
} catch (SocketException sx) {
LOGGER.error("Could not configure socket.", sx);
}
}
// 打开链接
public void open() throws TTransportException {
socket_.connect(new InetSocketAddress(host_, port_), timeout_);
inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
}
}
非阻塞TFramedTransport
  • 流程图

这里写图片描述

  • 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TFramedTransport extends TTransport {
private TTransport transport_ = null;
// 输出缓冲区
private final TByteArrayOutputStream writeBuffer_ =
new TByteArrayOutputStream(1024);
// 输入缓冲区
private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
// 一帧4byte
private final byte[] i32buf = new byte[4];
// 读取数据
public int read(byte[] buf, int off, int len) throws TTransportException {
if (readBuffer_ != null) {
int got = readBuffer_.read(buf, off, len);
if (got > 0) {
return got;
}
}
// Read another frame of data
readFrame();
return readBuffer_.read(buf, off, len);
}
// 读取一帧数据,放到输入缓冲区
private void readFrame() throws TTransportException {
transport_.readAll(i32buf, 0, 4);
int size = decodeFrameSize(i32buf);
byte[] buff = new byte[size];
transport_.readAll(buff, 0, size);
readBuffer_.reset(buff);
}
}

TFramedTransport封装了TMemoryInputTransport做输入流,封装了TByteArryOutPutStream做输出流,作为内存读写缓冲区的一个封装。TFramedTransportflush方法时,会先写4个字节的输出流的长度作为消息头,然后写消息体;和FrameBuffer的读消息对应起来,FrameBuffer 读消息时,先读4个字节的长度,再读消息体。

1
2
// Test
TTransport transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT));

协议层TProtocol

  • TBinaryProtocol:二进制编码格式进行数据传输。
  • TCompactProtocol:高效的编码方式,对数据进行压缩。
  • TJSONProtocol:使用JSON的数据编码协议进行数据传输。

参考

你应该知道的RPC原理

热评文章