
gRPC(通用远程过程调用)是谷歌基于RPC协议开发的一个框架,用于在分布式系统中调用函数。它具有使用 HTTP/2 作为其传输协议的一些优点。 gRPC 可以被视为流行的数据查询和操作语言(如 REST 或 GraphQL)的替代方案。
本文旨在作为一个简单的介绍演示,使任何人都可以在下一个项目中启动 gRPC,并专注于协议的核心概念,不涉及更高级的主题,例如负载平衡、调用取消或拦截器。
本文中给出的示例基于 Flutter 客户端和 Rust 服务器,以展示流行的前端/后端组合的基本实现。
gRPC 包
gRPC 包的基本结构基于消息和服务的定义。 gRPC 包与客户端和服务器必须知道的 .proto 文件相关联,以便正确处理和序列化消息。这些消息是使用协议缓冲区定义的,并支持各种数据类型和结构。服务由 RPC 组成,这些 RPC 基本上是一些基于消息的远程函数定义。
总之,服务允许四种类型的 RPC:
- 一个简单的一元 RPC,其中客户端向服务器发送单个请求并返回单个响应
- 服务器端流式 RPC,客户端向服务器发送单个请求并返回数据流
- 客户端流式 RPC,其中客户端将数据流发送到服务器并返回单个响应
- 双向流式 RPC,双方独立发送数据流。
在本文中,我们将重点介绍前三种服务方法及其工作原理。
一个简单的包
在 gRPC 中,客户端触发通信。因此,从客户端的角度定义服务和消息名称通常是有意义的。以下文件显示了简单的包 my_grpc_service,其中包含两条消息 MyGrpcrequest 和 MyGrpcResponse 以及关联的服务 MyGrpcService。该服务由三种不同类型的 RPC 组成。
syntax = "proto3";
package my_grpc_service;
message MyGrpcResponse{
double SomeDouble = 1;
string SomeString = 2;
}
message MyGrpcRequest{
string SomeString = 1;
}
service MyGrpcService{
rpc SendAndGetData (MyGrpcRequest) returns (MyGrpcResponse); // simple unary RPC
rpc GetLotsOfData (MyGrpcRequest) returns (stream MyGrpcResponse); // server-side streaming RPC
rpc SendLotsOfData (stream MyGrpcRequest) returns (MyGrpcResponse); // client-side streaming RPC
}
该文件将用于为客户端和服务器生成 gRPC 代码。 gRPC 官方网站概述了一些用于此目的的流行库。 这些库的基础通常由 protoc 编译器组成。
一个基本的实现
先决条件
对于 Flutter,必须添加包 grpc 和 protobuf。 对于 Rust,使用了流行的 crates tonic 和 tokio。 此外,必须安装 protoc 编译器并将其添加到 PATH。
生成 gRPC 代码
将 .proto 文件放入名为 protos 的顶级文件夹是常见的做法。 出于这个原因,两个项目的 .proto 文件都放在那里。
可以通过在终端中运行以下命令来生成 dart 代码:
protoc --dart_out=grpc:lib/generated -Iprotos protos/my_grpc_service.proto
这将生成实现服务所需的所有 dart 文件,并将它们放在 lib/generated。
Rust 代码可以使用构建脚本生成。 构建脚本 build.rs 位于 .toml 文件旁边,定义了在构建过程之前要执行的操作。 在这种情况下,构建脚本由以下几行组成:
fn main() {
let proto_file = "./protos/my_grpc_service.proto";
tonic_build::configure()
.build_server(true)
.compile(&[proto_file], &["."])
.unwrap_or_else(|e| panic!("protobuf compile error: {}", e));
}
这将生成实现服务所需的所有 Rust 文件并将它们放在构建文件夹中。
之后,必须将生成的 dart 客户端 MyGrpcServiceClient 打包到服务中。 这使得通过 Providers 将服务注入到 Flutter 应用程序的各个部分成为可能。 在服务器端,必须实现生成的特征 MyGrpcService 以便为服务提供一些功能。
简单的一元 RPC
在简单的一元 RPC 的情况下,定义了一个函数来接收一些请求消息并返回一些响应消息。
import 'package:flutter_grpc_demo/generated/my_grpc_service.pbgrpc.dart';
import 'package:grpc/grpc.dart';
class MyGrpcService {
final channel = ClientChannel(
'0.0.0.0', //localhost
port: 50052,
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
);
//...
Future<MyGrpcResponse> sendAndGetData(MyGrpcRequest request) async {
final stub = MyGrpcServiceClient(channel); // This class is auto generated
try {
return await stub.sendAndGetData(request);
} on Exception catch (e) {
return Future.Error(e);
}
}
//...
}
在简单的一元 RPC 的情况下,还定义了一个函数来接收一些请求消息并返回一些响应消息。
pub struct MyGrpcServer {}
#[tonic::async_trait]
impl MyGrpcService for MyGrpcServer {
async fn send_and_get_data(
&self,
request: Request<MyGrpcRequest>,
) -> Result<Response<MyGrpcResponse>, Status> {
//Incoming request message
let my_grpc_request = request.into_inner();
//Do something with the request message and generate a response message
let my_grpc_response = get_some_response().await;
Ok(Response::new(my_grpc_response))
}
//...
}
mod service {
tonic::include_proto!("my_grpc_service");
}
use service::my_grpc_service_server::MyGrpcServiceServer;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50052".parse().unwrap(); // localhost
let server = MyGrpcServer::default();
// Start server
println!("Server listening on {}", addr);
Server::builder()
.add_service(MyGrpcServiceServer::new(server))
.serve(addr)
.await?;
Ok(())
}
服务器端流式 RPC
在服务器端流式 RPC 的情况下,客户端获取响应消息流。 这些消息可以异步获取,然后通过另一个可以监听的流传递给 Flutter 应用程序。
import 'package:flutter_grpc_demo/generated/my_grpc_service.pbgrpc.dart';
import 'package:grpc/grpc.dart';
class MyGrpcService {
final channel = ClientChannel(
'0.0.0.0', //localhost
port: 50052,
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
);
//...
Stream<MyGrpcResponse> listenToStreamFromServer(MyGrpcRequest request) async* {
final stub = MyGrpcServiceClient(channel); // This class is auto generated
try {
var responses = stub.getLotsOfData(request);
await for (final response in responses) {
yield response;
}
} on Exception catch (e) {
yield* Stream.error(e);
}
}
//...
}
在服务器端,创建了一些可迭代的数据,并使用 tokio 生成了通道。 这些通道通过对项目进行排队来传输单个响应消息。
pub struct MyGrpcServer {}
#[tonic::async_trait]
impl MyGrpcService for MyGrpcServer {
type GetLotsOfDataStream = Pin<Box<dyn Stream<Item = Result<MyGrpcResponse, Status>> + Send>>;
async fn get_lots_of_data(
&self,
request: Request<MyGrpcRequest>,
) -> Result<Response<Self::GetLotsOfDataStream>, Status> {
//Incoming request message
let my_grpc_request = request.into_inner();
//Do something with the request message and generate a vector of response messages
let my_grpc_responses = get_vector_of_responses().await; // Outgoing response messages
//Spawn up to 128 channels and send asynchronous response messages
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
while let Some(item) = tokio_stream::iter(my_grpc_responses).next().await {
match tx.send(Result::<_, Status>::Ok(item)).await {
Ok(_) => {
//Item was queued
}
Err(_item) => {
//Output stream was build and will be dropped
break;
}
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
//...
}
mod service {
tonic::include_proto!("my_grpc_service");
}
use service::my_grpc_service_server::MyGrpcServiceServer;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50052".parse().unwrap(); // localhost
let server = MyGrpcServer::default();
// Start server
println!("Server listening on {}", addr);
Server::builder()
.add_service(MyGrpcServiceServer::new(server))
.serve(addr)
.await?;
Ok(())
}
客户端流式 RPC
在客户端流式 RPC 的情况下,客户端发送请求消息流并接收单个响应消息。 此服务功能的输入是 Flutter 应用程序提供的流。
import 'package:flutter_grpc_demo/generated/my_grpc_service.pbgrpc.dart';
import 'package:grpc/grpc.dart';
class MyGrpcService {
final channel = ClientChannel(
'0.0.0.0', //localhost
port: 50052,
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
);
//...
Future<MyGrpcResponse> streamDataToServer(Stream<MyGrpcRequest> requests) async {
final stub = MyGrpcServiceClient(channel); // This class is auto generated
try {
return await stub.sendLotsOfData(requests);
} on Exception catch (e) {
return Future.error(e);
}
}
//...
}
在服务器端获取并迭代请求消息。 之后,将单个响应消息发送回客户端。
pub struct MyGrpcServer {}
#[tonic::async_trait]
impl MyGrpcService for MyGrpcServer {
async fn send_lots_of_data(
&self,
request: Request<Streaming<MyGrpcRequest>>,
) -> Result<tonic::Response<self::MyGrpcResponse>, tonic::Status> {
//Incoming request messages
let mut incoming_stream = request.into_inner();
while let Some(request) = incoming_stream.next().await {
let current_request = Some(request?);
//Perform some operations on incoming request messages
//...
}
//Generate some response message
let response = get_some_response().await;
Ok(Response::new(response))
}
//...
}
mod service {
tonic::include_proto!("my_grpc_service");
}
use service::my_grpc_service_server::MyGrpcServiceServer;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50052".parse().unwrap(); // localhost
let server = MyGrpcServer::default();
// Start server
println!("Server listening on {}", addr);
Server::builder()
.add_service(MyGrpcServiceServer::new(server))
.serve(addr)
.await?;
Ok(())
}
把它包起来
本文应该对 gRPC 框架进行了快速简单的介绍,并展示了简单一元、服务器端流式传输和客户端流式传输 RPC 的基本实现。 应该清楚每次 RPC 过程中服务端和客户端接收和发送什么样的数据,核心概念是什么。

如若转载,请注明出处:https://www.dasum.com/45589.html