G

[Golang] gRPC 四种传输方式

RoLingG gRPC 2024-06-07

gRPC四种传输方式

单对单传输

proto文件

syntax = "proto3"; //指定proto版本
package hello_proto;  //指定默认包名

//指定golang包名
option go_package = "/hello_proto"; //在hello.proto文件所在的目录下生成一个名为hello_proto的目录存放.pb.go文件
//option go_package = ".;xxx";  这样写,是在hello.proto文件所在的目录下直接生成.pb.go文件,文件的包名为xxx

//定义rpc服务
//service 对应的是go语言的接口interface
service HelloService {
  //定义函数/方法
  //rpc对应的是go结构体中的方法
  rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

//HelloRequest 请求内容
//message对应的是go的struct
message HelloRequest {
  string name = 1;
  string message = 2;
}

//HelloResponse 回应内容
message HelloResponse {
  string name = 1;
  string message = 2;
}

服务端

// HelloService 这个结构体的名字不重要,能接收参数和实现方法就行
type HelloService struct {
}

func (HelloService) SayHello(ctx context.Context, request *hello_proto.HelloRequest) (response *hello_proto.HelloResponse, err error) {

    return &hello_proto.HelloResponse{
        Name:    "RoLingG",
        Message: "你好呀",
    }, nil
}

// 监听方法、注册服务、实现grpc实例
func main() {
    // 监听端口
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        grpclog.Fatalf("Failed to listen: %v", err)
    }

    // 创建一个gRPC服务器实例。
    s := grpc.NewServer()
    service := HelloService{} //创建一个proto里面的service的HelloService的服务
    // 将server结构体注册为gRPC服务。
    hello_proto.RegisterHelloServiceServer(s, &service) //RegisterHelloServiceServer()传过去一个proto里面的service的HelloService的服务
    fmt.Println("grpc server running :8080")
    // 开始处理客户端请求。
    err = s.Serve(listen)
}

客户端

func main() {
    port := ":8080"
    // 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
    // 此处使用不安全的证书来实现 SSL/TLS 连
    conn, err := grpc.Dial(port, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf(fmt.Sprintf("grpc connect fail [%s] %s", port, err))
    }
    defer conn.Close()
    //初始化客户端
    //NewHelloServiceClient()为proto文件输入cmd指令后自动生成的对应HelloService服务的客户端方法
    client := hello_proto.NewHelloServiceClient(conn)

    result, err := client.SayHello(context.Background(), &hello_proto.HelloRequest{
        Name:    "rolingg",
        Message: "hello world",
    })
    fmt.Println(result, err)
}

服务端流式传输

那文件下载举例,服务端持续给客户端传输文件数据。

proto文件

syntax = "proto3";
package proto;
option go_package = "/proto";


message FileRequest {
  string name = 1;
}

message FileResponse{
  string file_name = 1;
  bytes content = 2;
}

service DownloadService{
  rpc DownLoadFile(FileRequest)returns(stream FileResponse){}
}


// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\stream.proto
//服务端流式传输的下载,一次下几个G总不能一次传输就下好,所以要流式传输让服务端持续的发送数据给客户端。

服务端

type DownloadService struct{}

func (DownloadService) DownLoadFile(request *proto.FileRequest, stream proto.DownloadService_DownLoadFileServer) error {
    fmt.Println(request)
    file, err := os.Open("static/Pot-Player64.7z")
    if err != nil {
        return err
    }
    //使用defer 在函数结束时关闭文件
    defer file.Close()

    for {
        buf := make([]byte, 2048)
        //读取文件,这里和上面意味着一次读文件2048字节
        _, err = file.Read(buf)
        //超出流式范围则返回
        if err == io.EOF {
            break
        }
        if err != nil {
            break
        }
        //服务端流式传输
        stream.Send(&proto.FileResponse{
            Content: buf,
        })
    }
    return nil
}

func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    proto.RegisterDownloadServiceServer(server, &DownloadService{})

    server.Serve(listen)
}

客户端

func main() {
    addr := ":8080"
    // 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
    // 此处使用不安全的证书来实现 SSL/TLS 连接
    conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
    }
    defer conn.Close()
    // 初始化客户端
    client := proto.NewDownloadServiceClient(conn)

    stream, err := client.DownLoadFile(context.Background(), &proto.FileRequest{
        Name: "流式文件下载",
    })

    //新建一个文件去接收传输的文件数据
    file, err := os.OpenFile("static/Pot-Player64_1.7z", os.O_CREATE|os.O_WRONLY, 0600)
    if err != nil {
        log.Fatalln(err)
    }
    defer file.Close()

    //将服务端流式传输的文件数据写入进新建的文件载体
    writer := bufio.NewWriter(file) //创建io写操作
    var index int
    for {
        index++                        //这里index只是为了计数,实际上没用
        response, err := stream.Recv() //接收传输过来的数据
        if err == io.EOF {
            break
        }
        fmt.Printf("第%d 次, 写入 %d 数据\n", index, len(response.Content))
        writer.Write(response.Content)
    }
    writer.Flush()
}

客户端流式传输

那文件上传举例,客户端持续给服务端传输数据。

proto文件

syntax = "proto3";
package proto;
option go_package = "/proto";


message UploadRequest {
  string file_name = 1;
  bytes content = 2;
}

message UploadResponse{
  string message = 1;
}

service UploadService{
  rpc UploadFile(stream UploadRequest)returns(UploadResponse){}
}

// protoc -I . --go_out=plugins=grpc:./stream_proto .\stream_proto\upload.proto
//客户端流式传输的上传,流式传输让客户端持续的发送文件数据给服务端。

服务端

// UploadFile 这里的参数没了request,具体的实现方法可以看对应的pb.go文件里面的写法
func (UploadService) UploadFile(stream proto.UploadService_UploadFileServer) error {
    file, err := os.OpenFile("static/ggst_1.png", os.O_CREATE|os.O_WRONLY, 0600)
    if err != nil {
        log.Fatalln(err)
    }
    defer file.Close()

    writer := bufio.NewWriter(file)
    var index int
    for {
        index++
        response, err := stream.Recv()
        if err == io.EOF {
            break
        }
        writer.Write(response.Content)
        fmt.Printf("第 %d 次文件数据上传到服务端\n", index)
    }
    writer.Flush() //将任何缓冲的数据写入底层io.Writer
    //给客户端发送文件上传信息回馈
    stream.SendAndClose(&proto.UploadResponse{Message: "文件上传完毕"})
    return nil
}

func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    proto.RegisterUploadServiceServer(server, &UploadService{})

    server.Serve(listen)
}

客户端

func main() {
    addr := ":8080"
    // 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
    // 此处使用不安全的证书来实现 SSL/TLS 连接
    conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
    }
    defer conn.Close()
    // 初始化客户端
    client := proto.NewUploadServiceClient(conn)
    stream, err := client.UploadFile(context.Background())

    file, err := os.Open("static/ggst.png")
    if err != nil {
        log.Fatalln(err)
    }
    defer file.Close()

    //客户端流式传输文件数据
    for {
        buf := make([]byte, 2048)
        _, err = file.Read(buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            break
        }
        stream.Send(&proto.UploadRequest{
            FileName: "ggst.png",
            Content:  buf,
        })
    }
    //接收服务端传输的文件上传回馈
    response, err := stream.CloseAndRecv()
    fmt.Println(response, err)
}

双向流式传输(客户端-服务端流式传输)

proto文件

syntax = "proto3"; //指定proto版本
package proto;  //指定默认包名

option go_package = "/proto";

message ChatRequest {
  string name = 1;
}

message ChatResponse {
  string message = 1;
}

service BothStreamService {
  rpc Chat(stream ChatRequest) returns(stream ChatResponse) {}
}

//protoc -I .\stream_proto --go_out=plugins=grpc:./stream_proto .\stream_proto\both_stream.proto

服务端

type BothStreamService struct{}

// Chat 实现服务内Chat方法
func (BothStreamService) Chat(stream proto.BothStreamService_ChatServer) error {
    for i := 0; i < 10; i++ {
        request, _ := stream.Recv()
        fmt.Println(request)
        stream.Send(&proto.ChatResponse{Message: "Chat服务端发送"})
    }
    return nil
}

// 双向流传输服务端
func main() {
    listen, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    server := grpc.NewServer()
    proto.RegisterBothStreamServiceServer(server, &BothStreamService{})
    server.Serve(listen)
}

客户端

// 双向流传输客户端
func main() {
    addr := ":8080"
    // 使用 grpc.Dial 创建一个到指定地址的 gRPC 连接。
    // 此处使用不安全的证书来实现 SSL/TLS 连接
    conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf(fmt.Sprintf("grpc connect addr [%s] 连接失败 %s", addr, err))
    }
    defer conn.Close()
    // 初始化客户端
    client := proto.NewBothStreamServiceClient(conn)
    //调用Chat方法
    stream, err := client.Chat(context.Background())

    for i := 0; i < 10; i++ {
        stream.Send(&proto.ChatRequest{Name: "Chat客户端发送"})
        response, err := stream.Recv()
        fmt.Println(response, err)
    }
}
PREV
[Golang] gRPC proto文件解读
NEXT
[面试] 开源中国面经

评论(0)

发布评论