go实现grpc四种数据流模式

目录
  • 2.1 简单模式
  • 2.2 服务端数据流模式
  • 2.3 客户端数据流模式
  • 2.4 双向数据流
  • 3.1 代码目录
  • 3.2 编写stream.proto文件
  • 3.3 编写server文件
  • 3.4 编写client文件

1. 什么是数据流

  grpc中的stream,srteam顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就可以利用stream,源源不断地推送数据。

底层还原成socket编程

2. grpc的四种数据流

1.简单模式
2.服务端数据流模式(Server-side streaming RPC)
3.客户端数据流模式(Client-side streaming RPC)
4.双向数据流模式(Bidirectional streaming RPC)

2.1 简单模式

  这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的RPC没有什么大的区别,上两篇中介绍此模式。

2.2 服务端数据流模式

  这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

2.3 客户端数据流模式

  与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。

2.4 双向数据流

  顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。

3. 上代码

3.1 代码目录

3.2 编写stream.proto文件

stream是常量,写在哪一边,哪一边就是数据流
syntax = "proto3";

option go_package = "./;proto";

service Greeter {
    // 定义方法,stream是常量,流模式
    rpc ServerStream (StreamRequestData) returns (stream StreamResponseData);      //服务端流模式,拉消息
    rpc ClientStream (stream StreamRequestData) returns (StreamResponseData);      //客户端流模式,推消息
    rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData);  //双向流模式,能推能拉
}

message StreamRequestData {
    string data = 1; //编号
}

message StreamResponseData {
    string data = 1; //编号
}
生成go的protobuf文件命令:
cd到proto目录下
命令:protoc -I . hello.proto   --go_out=plugins=grpc:.

3.3 编写server文件

package main

import (
	"file_test/grpc_go_stream/proto"
	"fmt"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
)

const port = 8082

type server struct{}

func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error {
	i := 0
	for {
		i++
		//业务代码
		_ = res.Send(&proto.StreamResponseData{
			Data: fmt.Sprintf("这是发给%s的数据流", req.Data),
		})
		time.Sleep(time.Second * 1)
		if i > 10 {
			break
		}
	}
	return nil
}
func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error {
	for {
		//业务代码
		res, err := cliStr.Recv()
		if err != nil {
			fmt.Println("本次客户端流数据发送完了:",err)
			break
		}
		fmt.Println("客户端发来消息:",res.Data)
	}
	return nil
}
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
	wg:=sync.WaitGroup{}
	wg.Add(2)
	//接受客户端消息的协程
	go func() {
		defer wg.Done()
		for  {
			//业务代码
			res, err := allStr.Recv()
			if err != nil {
				fmt.Println("本次客户端流数据发送完了:",err)
				break
			}
			fmt.Println("收到客户端发来消息:",res.Data)
		}
	}()

	//发送消息给客户端的协程
	go func() {
		defer wg.Done()
		i := 0
		for {
			i++
			//业务代码
			_ = allStr.Send(&proto.StreamResponseData{
				Data: fmt.Sprintf("这是发给客户端的数据流"),
			})
			time.Sleep(time.Second * 1)
			if i > 10 {
				break
			}
		}
	}()
	wg.Wait()
	return nil
}

// 启动
func start() {
	// 1.实例化server
	g := grpc.NewServer()
	// 2.注册逻辑到server中
	proto.RegisterGreeterServer(g, &server{})
	// 3.启动server
	lis, err := net.Listen("tcp", "127.0.0.1:8082")
	if err != nil {
		panic("监听错误:" + err.Error())
	}
	err = g.Serve(lis)
	if err != nil {
		panic("启动错误:" + err.Error())
	}

}

func main() {
	start()
}

3.4 编写client文件

package main

import (
	"context"
	"file_test/grpc_go_stream/proto"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
)

var rpc proto.GreeterClient

func serverStreamDemo()  {
	//服务端流模式
	res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"})
	if err != nil {
		panic("rpc请求错误:"+err.Error())
	}
	for  {
		data,err:=res.Recv() //
		if err != nil {
			fmt.Println("客户端发送完了:",err)
			return
		}
		fmt.Println("客户端返回数据流值:",data.Data)
	}
}

func clientStreamDemo()  {
	//客户端流模式
	cliStr, err := rpc.ClientStream(context.Background())
	if err != nil {
		panic("rpc请求错误:" + err.Error())
	}
	i := 0
	for {
		i++
		_ = cliStr.Send(&proto.StreamRequestData{
			Data: "jeff",
		})
		time.Sleep(time.Second * 1)
		if i > 10 {
			break
		}
	}
}

func clientAndServerStreamDemo()  {
	//双向流模式
	allStr, _ := rpc.AllStream(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(1)
	//接受服务端消息的协程
	go func() {
		defer wg.Done()
		for {
			//业务代码
			res, err := allStr.Recv()
			if err != nil {
				fmt.Println("本次服务端流数据发送完了:", err)
				break
			}
			fmt.Println("收到服务端发来消息:", res.Data)
		}
	}()

	//发送消息给服务端的协程
	go func() {
		defer wg.Done()
		i := 0
		for {
			i++
			//业务代码
			_ = allStr.Send(&proto.StreamRequestData{
				Data: fmt.Sprintf("这是发给服务端的数据流"),
			})
			time.Sleep(time.Second * 1)
			if i > 10 {
				break
			}
		}
	}()
	wg.Wait()
}

// 启动
func start() {
	conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure())
	if err != nil {
		panic("rpc连接错误:" + err.Error())
	}
	defer conn.Close()
	rpc = proto.NewGreeterClient(conn) //初始化

	serverStreamDemo() //服务端流模式

	clientStreamDemo()  //客户端流模式

	clientAndServerStreamDemo() // 双向流模式
}

func main() {
	start()
}

以上就是go实现grpc四种数据流模式的详细内容,更多关于go实现grpc流模式的资料请关注我们其它相关文章!

(0)

相关推荐

  • go grpc安装使用教程

    gRPC是由Google主导开发的RPC框架,使用HTTP/2协议并用ProtoBuf作为序列化工具.其客户端提供Objective-C.Java接口,服务器侧则有Java.Golang.C++等接口,从而为移动端(iOS/Androi)到服务器端通讯提供了一种解决方案. 当然在当下的环境下,这种解决方案更热门的方式是RESTFull API接口.该方式需要自己去选择编码方式.服务器架构.自己搭建框架(JSON-RPC). 1. 前提 确保go的版本在1.6及以上 确保glibc版本在2.14及

  • golang 微服务之gRPC与Protobuf的使用

    RPC是什么? 所谓RPC(remote procedure call 远程过程调用)框架实际是提供了一套机制,使得应用程序之间可以进行通信,而且也遵从server/client模型.使用的时候客户端调用server端提供的接口就像是调用本地的函数一样. gRPC是什么? 与许多RPC系统一样,gRPC基于定义服务的思想,指定可以使用其参数和返回类型远程调用的方法.默认情况下,gRPC使用协议缓冲区作为接口定义语言(IDL)来描述服务接口和有效负载消息的结构. gRPC有什么好处以及在什么场景下

  • golang grpc 负载均衡的方法

    微服务架构里面,每个服务都会有很多节点,如果流量分配不均匀,会造成资源的浪费,甚至将一些机器压垮,这个时候就需要负载均衡,最简单的一种策略就是轮询,顺序依次选择不同的节点访问. grpc 在客户端提供了负载均衡的实现,并提供了服务地址解析和更新的接口(默认提供了 DNS 域名解析的支持),方便不同服务的集成 使用示例 conn, err := grpc.Dial( "", grpc.WithInsecure(), // 负载均衡,使用 consul 作服务发现 grpc.WithBal

  • 详解golang consul-grpc 服务注册与发现

    在微服务架构里面,每个小服务都是由很多节点组成,节点的添加删除故障希望能对下游透明,因此有必要引入一种服务的自动注册和发现机制,而 consul 提供了完整的解决方案,并且内置了对 GRPC 以及 HTTP 服务的支持 总体架构 服务调用: client 直连 server 调用服务 服务注册: 服务端将服务的信息注册到 consul 里 服务发现: 客户端从 consul 里发现服务信息,主要是服务的地址 健康检查: consul 检查服务器的健康状态 服务注册 服务端将服务信息注册到 con

  • golang在GRPC中设置client的超时时间

    超时 建立连接 主要就2函数Dail和DialContext. // Dial creates a client connection to the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...) } func DialContext(ctx context.Cont

  • C#语言使用gRPC、protobuf(Google Protocol Buffers)实现文件传输功能

    初识gRPC还是一位做JAVA的同事在项目中用到了它,为了C#的客户端程序和java的服务器程序进行通信和数据交换,当时还是对方编译成C#,我直接调用. 后来,自己下来做了C#版本gRPC编写,搜了很多资料,但许多都是从入门开始?调用说"Say Hi!"这种官方标准的入门示例,然后遇到各种问题-- 关于gRPC和Protobuf介绍,就不介绍了,网络上一搜一大把,随便一抓都是标准的官方,所以直接从使用说起. gPRC源代码:https://github.com/grpc/grpc: p

  • go实现grpc四种数据流模式

    目录 2.1 简单模式 2.2 服务端数据流模式 2.3 客户端数据流模式 2.4 双向数据流 3.1 代码目录 3.2 编写stream.proto文件 3.3 编写server文件 3.4 编写client文件 1. 什么是数据流 grpc中的stream,srteam顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就可以利用stream,源源不断地推送数据. 底层还原成socket编程 2. gr

  • Android入门之Activity四种启动模式(standard、singleTop、singleTask、singleInstance)

    当应用运行起来后就会开启一条线程,线程中会运行一个任务栈,当Activity实例创建后就会放入任务栈中.Activity启动模式的设置在AndroidManifest.xml文件中,通过配置Activity的属性android:launchMode=""设置. 一.启动模式介绍 启动模式简单地说就是Activity启动时的策略,在AndroidManifest.xml中的标签的android:launchMode属性设置: 启动模式有4种,分别为standard.singleTop.s

  • Android编程中Activity的四种启动模式

    本文实例讲述了Android编程中Activity的四种启动模式.分享给大家供大家参考,具体如下: Activity启动方式有四种,分别是: standard singleTop singleTask singleInstance 可以根据实际的需求为Activity设置对应的启动模式,从而可以避免创建大量重复的Activity等问题. 设置Activity的启动模式,只需要在AndroidManifest.xml里对应的<activity>标签设置android:launchMode属性,例

  • JavaScript四种调用模式和this示例介绍

    JavaScript调用时除了声明时定义的形参外,每个函数接受两个附加参数:this 和arguments,this在面向对象编程中非常重要,它取决于调用模式. JavaScript有四种调用模式,方法调用模式,函数调用模式,构造器调用模式和apply调用模式.这些模式在初始化关键参数this上存在差异. 方法调用模式:当一个函数被保存为对象的一个属性时,我们称它为一个方法,当一个方法被调用时,this被绑定到该对象上.如果调用表达式包含一个属性取表达式(即一个.点表达式或[script]下标表

  • Javascript 函数的四种调用模式

    Javascript 函数的四种调用模式 1  函数模式 最普通的函数调用 // 声明式函数 function fn1 () { console.log(this); } // 函数表达式函数 var fn2 = function() { console.log(this); }; // 调用 函数中this表示全局对象,在浏览器中就是指window fn1(); //window fn2(); //window 2 方法模式 函数依附于一个对象,是对象的一个属性,我们再调用这个函数.这种模式就

  • Android SharedPreferences四种操作模式使用详解

    Android  SharedPreferences详解 获取SharedPreferences的两种方式: 1 调用Context对象的getSharedPreferences()方法 2 调用Activity对象的getPreferences()方法 两种方式的区别: 调用Context对象的getSharedPreferences()方法获得的SharedPreferences对象可以被同一应用程序下的其他组件共享. 调用Activity对象的getPreferences()方法获得的Sh

  • Activity 四种启动模式详细介绍

    Activity 四种启动模式详细介绍 在Android中每个界面都是一个Activity,切换界面操作其实是多个不同Activity之间的实例化操作.在Android中Activity的启动模式决定了Activity的启动运行方式. Android总Activity的启动模式分为四种: Activity启动模式设置: <activity android:name=".MainActivity" android:launchMode="standard" /&

  • 简单介绍Android中Activity的四种启动模式

    在Android中每个界面都是一个Activity,切换界面操作其实是多个不同Activity之间的实例化操作.在Android中Activity的启动模式决定了Activity的启动运行方式. Activity有四种启动模式: 1. standard,默认的启动模式,只要激活Activity,就会创建一个新的实例,并放入任务栈中,这样任务栈中可能同时有一个Activity的多个实例. 2. singleTop,激活Activity时,如果栈顶是这个Activity,就不会创建新的实例:如果栈顶

  • Android LaunchMode四种启动模式详细介绍

    Android LaunchMode详解 越是做的时间越长,基础知识就忘的越干净,最近做一个项目中,发现启动的几个Activity居然重叠了,我ri--,再不回忆一下就要退出Android界了. 概念解释 Task Task叫做任务,这个简单,表示我们需要完成的事情,注意,这里我们说的是任务,是个名词,例如要发短信,那我们的任务就是发送一条短信,仅此而已,再例如教官说:"张三,你去吃屎!",ok,那张三的任务就是吃屎. Back Stack 我们常叫做回退栈,或者是任务栈,这个是什么意

  • Android中Activity的四种启动模式和onNewIntent()

    写在前面 Activity是Android四大组件之一,用于直接跟用户进行交互,本篇文章将介绍Activity的启动流程.用户启动Activity的方式大致有两种:一种是在桌面点击应用程序的图标,进入应用程序的主界面:另一种是在应用程序中,进入一个新的Activity.前者,桌面其实是系统应用launcher的界面,点击应用程序图标,会进行应用程序的主界面,实质是从一个应用的Activity进入另一个应用Activity. 因此,不管是从桌面进入应用主界面,还是在应用里进入一个新的Activit

随机推荐