grpc应用之二 gRPC使用 grpc Quick Start
安装grpc
在上一个例子grpc应用之一 使用protobuf的项目下,执行如下命令:
go get -u google.golang.org/grpc@latest
gRPC的四种调用方式
- 一元RPC
- 服务端流式RPC
- 客户端流式RPC
- 双向流式RPC
修改server目录下的server.go文件,其代码如下:
package main
import (
"flag"
// 用pb别名来引用proto里定义的类型方法
pb "github.com/realjf/grpc-demo/proto"
)
var port string
func init() {
flag.StringVar(&port, "p", "8000", "启动端口号")
flag.Parse()
}
每次在proto文件中定义RPC方法的proto时,需要重新在根目录下运行如下命令重新编译生成语句:
protoc --go_out=plugins=grpc:. ./proto/*.proto
一元RPC
一元 RPC,也就是是单次 RPC 调用,简单来讲就是客户端发起一次普通的 RPC 请求,响应,是最基础的调用类型,也是最常用的方式,
proto文件
rpc SayHello(HelloRequest) returns (HelloRespnose) {};
server端
server.go代码如下:
package main
import (
"context"
"flag"
"net"
pb "github.com/realjf/grpc-demo/proto"
"google.golang.org/grpc"
)
var port string
func init() {
flag.StringVar(&port, "p", "8000", "启动端口号")
flag.Parse()
}
type GreeterServer struct{}
func (s *GreeterServer) SayHello(ctx context.Context, r *pb.HelloRequest) (*pb.HelloResponse, error) {
return &pb.HelloResponse{Code: 200, Message: "success"}, nil
}
func main() {
server := grpc.NewServer()
pb.RegisterGreeterServer(server, &GreeterServer{})
lis, _ := net.Listen("tcp", ":"+port)
server.Serve(lis)
}
- 创建gRPC Server对象,你可以理解为它是Server端的抽象对象
- 将GreeterServer注册到gRPC Server的内部注册中心,这样可以在接收到请求时,通过内部的“服务发现”,发现该服务端接口并转移进行逻辑处理
- 创建Listen,监听TCP端口
- gRPC Server开始lis.Accept,直到Stop或GracefulStop
client
client/client.go的代码如下:
package main
import (
"context"
"flag"
"log"
pb "github.com/realjf/grpc-demo/proto"
"google.golang.org/grpc"
)
var port string
func init() {
flag.StringVar(&port, "p", "8000", "启动端口号")
flag.Parse()
}
func main() {
conn, _ := grpc.Dial(":"+port, grpc.WithInsecure())
defer conn.Close()
client := pb.NewGreeterClient(conn)
_ = SayHello(client)
}
func SayHello(client pb.GreeterClient) error {
resp, _ := client.SayHello(context.Background(), &pb.HelloRequest{Name: "realjf"})
log.Printf("client.SayHello resp: %s", resp.Message)
return nil
}
- 创建与给定服务端的连接句柄
- 创建Greeter的客户端对象
- 发送RPC请求,等待同步响应,得到回调后返回响应结果
现在可以运行一元RPC的服务端和客户端查看结果
服务端流式RPC
服务器端流式 RPC,也就是是单向流,并代指 Server 为 Stream,Client 为普通的一元 RPC 请求。
简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。
proto文件
新增如下内容:
rpc SayList(HelloRequest) returns (stream HelloResponse) {};
server端
func (s *GreeterServer) SayList(r *pb.HelloRequest, stream pb.Greeter_SayListServer) error {
for n := 0; n <= 6; n++ {
_ = stream.Send(&pb.HelloResponse{Code: 200, Message: "hello.list"})
}
return nil
}
在 Server 端,主要留意 stream.Send 方法,是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg 方法,该方法涉及以下过程:
- 消息体序列化
- 压缩序列化后的消息体
- 对正在传输的消息体增加5个字节的header(标志位)
- 判断压缩+序列化后的消息体的总字节长度是否大于预设的maxSendMessageSize,若超出则提示错误。
- 写入给流的数据集
client端
func SayList(client pb.GreeterClient, r *pb.HelloRequest) error {
stream, _ := client.SayList(context.Background(), r)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
return nil
}
在client端,stream.Recv()方法是对ClientStream.RecvMsg方法的封装,而RecvMsg方法会从流中读取完整的gRPC消息体,
- RecvMsg是阻塞等待的
- RecvMsg当流成功/结束(调用了Close)时,会返回io.EOF
- RecvMsg当流出现任何错误时,流会被终止,错误信息会包含RPC错误码,而在RecvMsg中可能出现如下错误:
- io.EOF、io.ErrUnexpectedEOF
- transport.ConnectionError
- google.golang.org/grpc/codes(grpc的预定义错误码)
需要注意的是,默认的 MaxReceiveMessageSize 值为 1024 1024 4,若有特别需求,可以适当调整。
客户端流式RPC
客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端,
proto文件
rpc SayRecord(stream HelloRequest) returns (HelloResponse) {};
server端
func (s *GreeterServer) SayRecord(stream pb.Greeter_SayRecordServer) error {
for {
resp, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.HelloResponse{Message: "say.record"})
}
if err != nil {
return err
}
log.Printf("resp: %v", resp)
}
return nil
}
你可以发现在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要通过 stream.SendAndClose 方法将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv。
client端
func SayRecord(client pb.GreeterClient, r *pb.HelloRequest) error {
stream, _ := client.SayRecord(context.Background())
for n := 0; n < 6; n++ {
_ = stream.Send(r)
}
resp, _ := stream.CloseAndRecv()
log.Printf("resp err: %v", resp)
return nil
}
在 Server 端的 stream.CloseAndRecv,与 Client 端 stream.SendAndClose 是配套使用的方法。
双向流式RPC
双向流式 RPC,顾名思义是双向流,由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求。
首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)。
proto文件
rpc SayRoute(stream HelloRequest) returns (stream HelloResponse) {};
server端
func (s *GreeterServer) SayRoute(stream pb.Greeter_SayRouteServer) error {
n := 0
for {
_ = stream.Send(&pb.HelloResponse{Code: 200, Message: "say.route"})
resp, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
n++
log.Printf("resp: %v", resp)
}
}
client端
func SayRoute(client pb.GreeterClient, r *pb.HelloRequest) error {
stream, _ := client.SayRoute(context.Background())
for n := 0; n <= 6; n++ {
_ = stream.Send(r)
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp err: %v", resp)
}
_ = stream.CloseSend()
return nil
}