gRPC四种传输方式
单对单传输
proto文件
syntax = "proto3"; //指定proto版本
package hello_proto; //指定默认包名
//指定golang包名
option go_package = "/hello_proto"; //在hello.proto文件所在的目录下生成一个名为hello_proto的目录存放.pb.go文件
//option go_package = ".;xxx"; 这样写,是在hello.proto文件所在的目录下直接生成.pb.go文件,文件的包名为xxx
//定义rpc服务
//service 对应的是go语言的接口interface
service HelloService {
//定义函数/方法
//rpc对应的是go结构体中的方法
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
//HelloRequest 请求内容
//message对应的是go的struct
message HelloRequest {
string name = 1;
string message = 2;
}
//HelloResponse 回应内容
message HelloResponse {
string name = 1;
string message = 2;
}
服务端
// HelloService 这个结构体的名字不重要,能接收参数和实现方法就行
type HelloService struct {
}
func (HelloService) SayHello(ctx context.Context, request *hello_proto.HelloRequest) (response *hello_proto.HelloResponse, err error) {
return &hello_proto.HelloResponse{
Name: "RoLingG",
Message: "你好呀",
}, nil
}
// 监听方法、注册服务、实现grpc实例
func main() {
// 监听端口
listen, err := net.Listen("tcp", ":8080")
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
// 创建一个gRPC服务器实例。
s := grpc.NewServer()
service := HelloService{} //创建一个proto里面的service的HelloService的服务
// 将server结构体注册为gRPC服务。
hello_proto.RegisterHelloServiceServer(s, &service) //RegisterHelloServiceServer()传过去一个proto里面的service的HelloService的服务
fmt.Println("grpc server running :8080")
// 开始处理客户端请求。
err = s.Serve(listen)
}
客户端
func main() {
port := ":8080"
// 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
// 此处使用不安全的证书来实现 SSL/TLS 连
conn, err := grpc.Dial(port, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf(fmt.Sprintf("grpc connect fail [%s] %s", port, err))
}
defer conn.Close()
//初始化客户端
//NewHelloServiceClient()为proto文件输入cmd指令后自动生成的对应HelloService服务的客户端方法
client := hello_proto.NewHelloServiceClient(conn)
result, err := client.SayHello(context.Background(), &hello_proto.HelloRequest{
Name: "rolingg",
Message: "hello world",
})
fmt.Println(result, err)
}
服务端流式传输
那文件下载举例,服务端持续给客户端传输文件数据。
proto文件
syntax = "proto3";
package proto;
option go_package = "/proto";
message FileRequest {
string name = 1;
}
message FileResponse{
string file_name = 1;
bytes content = 2;
}
service DownloadService{
rpc DownLoadFile(FileRequest)returns(stream FileResponse){}
}
// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto
//服务端流式传输的下载,一次下几个G总不能一次传输就下好,所以要流式传输让服务端持续的发送数据给客户端。
服务端
type DownloadService struct{}
func (DownloadService) DownLoadFile(request *proto.FileRequest, stream proto.DownloadService_DownLoadFileServer) error {
fmt.Println(request)
file, err := os.Open("static/Pot-Player64.7z")
if err != nil {
return err
}
//使用defer 在函数结束时关闭文件
defer file.Close()
for {
buf := make([]byte, 2048)
//读取文件,这里和上面意味着一次读文件2048字节
_, err = file.Read(buf)
//超出流式范围则返回
if err == io.EOF {
break
}
if err != nil {
break
}
//服务端流式传输
stream.Send(&proto.FileResponse{
Content: buf,
})
}
return nil
}
func main() {
listen, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
server := grpc.NewServer()
proto.RegisterDownloadServiceServer(server, &DownloadService{})
server.Serve(listen)
}
客户端
func main() {
addr := ":8080"
// 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
// 此处使用不安全的证书来实现 SSL/TLS 连接
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
}
defer conn.Close()
// 初始化客户端
client := proto.NewDownloadServiceClient(conn)
stream, err := client.DownLoadFile(context.Background(), &proto.FileRequest{
Name: "流式文件下载",
})
//新建一个文件去接收传输的文件数据
file, err := os.OpenFile("static/Pot-Player64_1.7z", os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
log.Fatalln(err)
}
defer file.Close()
//将服务端流式传输的文件数据写入进新建的文件载体
writer := bufio.NewWriter(file) //创建io写操作
var index int
for {
index++ //这里index只是为了计数,实际上没用
response, err := stream.Recv() //接收传输过来的数据
if err == io.EOF {
break
}
fmt.Printf("第%d 次, 写入 %d 数据\n", index, len(response.Content))
writer.Write(response.Content)
}
writer.Flush()
}
客户端流式传输
那文件上传举例,客户端持续给服务端传输数据。
proto文件
syntax = "proto3";
package proto;
option go_package = "/proto";
message UploadRequest {
string file_name = 1;
bytes content = 2;
}
message UploadResponse{
string message = 1;
}
service UploadService{
rpc UploadFile(stream UploadRequest)returns(UploadResponse){}
}
// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\upload.proto
//客户端流式传输的上传,流式传输让客户端持续的发送文件数据给服务端。
服务端
// UploadFile 这里的参数没了request,具体的实现方法可以看对应的pb.go文件里面的写法
func (UploadService) UploadFile(stream proto.UploadService_UploadFileServer) error {
file, err := os.OpenFile("static/ggst_1.png", os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
log.Fatalln(err)
}
defer file.Close()
writer := bufio.NewWriter(file)
var index int
for {
index++
response, err := stream.Recv()
if err == io.EOF {
break
}
writer.Write(response.Content)
fmt.Printf("第 %d 次文件数据上传到服务端\n", index)
}
writer.Flush() //将任何缓冲的数据写入底层io.Writer
//给客户端发送文件上传信息回馈
stream.SendAndClose(&proto.UploadResponse{Message: "文件上传完毕"})
return nil
}
func main() {
listen, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
server := grpc.NewServer()
proto.RegisterUploadServiceServer(server, &UploadService{})
server.Serve(listen)
}
客户端
func main() {
addr := ":8080"
// 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
// 此处使用不安全的证书来实现 SSL/TLS 连接
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
}
defer conn.Close()
// 初始化客户端
client := proto.NewUploadServiceClient(conn)
stream, err := client.UploadFile(context.Background())
file, err := os.Open("static/ggst.png")
if err != nil {
log.Fatalln(err)
}
defer file.Close()
//客户端流式传输文件数据
for {
buf := make([]byte, 2048)
_, err = file.Read(buf)
if err == io.EOF {
break
}
if err != nil {
break
}
stream.Send(&proto.UploadRequest{
FileName: "ggst.png",
Content: buf,
})
}
//接收服务端传输的文件上传回馈
response, err := stream.CloseAndRecv()
fmt.Println(response, err)
}
双向流式传输(客户端-服务端流式传输)
proto文件
syntax = "proto3"; //指定proto版本
package proto; //指定默认包名
option go_package = "/proto";
message ChatRequest {
string name = 1;
}
message ChatResponse {
string message = 1;
}
service BothStreamService {
rpc Chat(stream ChatRequest) returns(stream ChatResponse) {}
}
//protoc -I .\stream_proto --go_out=plugins=grpc:./stream_proto .\stream_proto\both_stream.proto
服务端
type BothStreamService struct{}
// Chat 实现服务内Chat方法
func (BothStreamService) Chat(stream proto.BothStreamService_ChatServer) error {
for i := 0; i < 10; i++ {
request, _ := stream.Recv()
fmt.Println(request)
stream.Send(&proto.ChatResponse{Message: "Chat服务端发送"})
}
return nil
}
// 双向流传输服务端
func main() {
listen, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
server := grpc.NewServer()
proto.RegisterBothStreamServiceServer(server, &BothStreamService{})
server.Serve(listen)
}
客户端
// 双向流传输客户端
func main() {
addr := ":8080"
// 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
// 此处使用不安全的证书来实现 SSL/TLS 连接
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
}
defer conn.Close()
// 初始化客户端
client := proto.NewBothStreamServiceClient(conn)
//调用Chat方法
stream, err := client.Chat(context.Background())
for i := 0; i < 10; i++ {
stream.Send(&proto.ChatRequest{Name: "Chat客户端发送"})
response, err := stream.Recv()
fmt.Println(response, err)
}
}
评论(0)