Go语言异步API设计的扇入扇出模式详解

目录
  • 前言
  • 扇入/扇出服务
  • Go 语言实现扇入/扇出模式

前言

扇出/扇入模式是更高级 API 集成的主要内容。这些应用程序并不总是表现出相同的可用性或性能特征。

扇出是从电子工程中借用的一个术语,它描述了输入的逻辑门连接到另一个输出门的数量。输出需要提供足够的电流来驱动所有连接的输入。在事务处理系统中,用来描述为了服务一个输入请求而需要做的请求总数。

扇入是指为逻辑单元的输入方程提供输入信号的最大数量。扇入是定义单个逻辑门可以接受的最大数字输入数量的术语。大多数晶体管-晶体管逻辑 (TTL) 门有一个或两个输入,尽管有些有两个以上。典型的逻辑门具有 1 或 2 的扇入。

扇入/扇出服务

我们举一个现实世界的例子,一个电子商务网站将自己与一个第三方支付网关整合在一起。 这里,网站使用支付网关的 API 来弹出支付屏幕并输入安全证书。同时,网站可能会调用另一个称为分析的 API 来记录支付的尝试。这种将一个请求分叉成多个请求的过程被称为 fan-out 扇出。在现实世界中,一个客户请求可能涉及许多扇出服务。

另一个例子是 MapReduce。Map 是一个扇入的操作,而 Reduce 是一个扇出的 操作。一个服务器可以将一个信息扇出到下一组服务(API),并忽略结果。或者可以等到这些服务器的所有响应都返回。如 如下图所示,一个传入的请求被服务器复用为转换成两个传出的请求:

扇入 fan-in 是一种操作,即两个或更多传入的请求会聚成一个请求。这种情况下,API如何聚合来自多个后端服务的结果,并将结果即时返回给客户。

例如,想想一个酒店价格聚合器或航班票务聚合器,它从不同的数据提供者那里获取关于多个酒店或航班的请求信息并显示出来。

下图显示了扇出操作是如何结合多个请求并准备一个最终的响应,由客户端消费的。

客户端也可以是一个服务器,为更多的客户提供服务。如上图所示,左侧的服务器正在收集来自酒店 A、酒店 B 和 航空公司供应商 A,并为不同的客户准备另一个响应。

因此,扇入和扇出操作并不总是完全相互独立的。大多数情况下,它将是一个混合场景,扇入和扇出操作都是相互配合的。

请记住,对下一组服务器的扇出操作可以是异步的。也是如此。对于扇入请求来说,这可能不是真的。扇入操作有时被称为 API 调用。

Go 语言实现扇入/扇出模式

Fan-out:多个 goroutine 从同一个通道读取数据,直到该通道关闭。OUT 是一种张开的模式,所以又被称为扇出,可以用来分发任务。

Fan-in:1 个 goroutine 从多个通道读取数据,直到这些通道关闭。IN 是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。

package main
import (
	"context"
	"log"
	"sync"
	"time"
)
// Task 包含任务编号及任务所需时长
type Task struct {
	Number int
	Cost   time.Duration
}
// task channel 生成器
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
	taskCh := make(chan Task)
	go func() {
		defer close(taskCh)
		for _, task := range taskList {
			select {
			case <-ctx.Done():
				return
			case taskCh <- task:
			}
		}
	}()
	return taskCh
}
// doTask 处理并返回已处理的任务编号作为通道的函数
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
	doneTaskCh := make(chan int)
	go func() {
		defer close(doneTaskCh)
		for task := range taskCh {
			select {
			case <-ctx.Done():
				return
			default:
				log.Printf("do task number: %d\n", task.Number)
				// task 任务处理
				// 根据任务耗时休眠
				time.Sleep(task.Cost)
				doneTaskCh <- task.Number // 已处理任务的编号放入通道
			}
		}
	}()
	return doneTaskCh
}
// `fan-in` 意味着将多个数据流复用或合并成一个流。
// merge 函数接收参数传递的多个通道 “taskChs”,并返回单个通道 “<-chan int”
func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
	var wg sync.WaitGroup
	mergedTaskCh := make(chan int)
	mergeTask := func(taskCh <-chan int) {
		defer wg.Done()
		for t := range taskCh {
			select {
			case <-ctx.Done():
				return
			case mergedTaskCh <- t:
			}
		}
	}
	wg.Add(len(taskChs))
	for _, taskCh := range taskChs {
		go mergeTask(taskCh)
	}
	// 等待所有任务处理完毕
	go func() {
		wg.Wait()
		close(mergedTaskCh)
	}()
	return mergedTaskCh
}
func main() {
	start := time.Now()
	// 使用 context 来防止 goroutine 泄漏,即使在处理过程中被中断
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// taskList 定义每个任务及其成本
	taskList := []Task{
		Task{1, 1 * time.Second},
		Task{2, 7 * time.Second},
		Task{3, 2 * time.Second},
		Task{4, 3 * time.Second},
		Task{5, 5 * time.Second},
		Task{6, 3 * time.Second},
	}
	// taskChannelGerenator 是一个函数,它接收一个 taskList 并将其转换为 Task 类型的通道
	// 执行结果(int slice channel)存储在 worker 中
	// 由于 doTask 的结果是一个通道,被分给了多个 worker,这就对应了 fan-out 处理
	taskCh := taskChannelGerenator(ctx, taskList)
	numWorkers := 4
	workers := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workers[i] = doTask(ctx, taskCh)  // doTask 处理并返回已处理的任务编号作为通道的函数
	}
	count := 0
	for d := range merge(ctx, workers) { // merge 从中读取已处理的任务编号
		count++
		log.Printf("done task number: %d\n", d)
	}
	log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}

参考链接:

Fan-in/fan-out of services

Understanding the Fan-Out/Fan-In API Integration Pattern

以上就是Go语言异步API设计的扇入扇出模式详解的详细内容,更多关于Go异步API扇入扇出模式的资料请关注我们其它相关文章!

(0)

相关推荐

  • Golang实现异步上传文件支持进度条查询的方法

    业务背景 业务需求要求开发一个异步上传文件的接口,并支持上传进度的查询. 需求分析 ZIP压缩包中,包含一个csv文件和一个图片文件夹,要求:解析csv数据存入mongo,将图片文件夹中的图片信息对应上csv中的人员信息. ZIP压缩包解压 使用golang自带的 "archive/zip" 包解压. func decompressZip(filePath, dest string) (string, string, error) { var csvName string imageF

  • Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)

    前言 同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据) 同步执行类RunnerAsync 支持返回超时检测,系统中断检测 错误常量定义 //超时错误 var ErrTimeout = errors.New("received timeout") //操作系统系统中断错误 var ErrInterrupt = errors.New("received interrupt") 实现代码如下 package ta

  • 基于golang channel实现的轻量级异步任务分发器示例代码

    前言 有时候我们为了更好的利用计算机资源,可以把一些耗时长的任务队列化异步执行.举个对应简单的生活中例子就是大多数餐厅里面点菜都是先找地方做,看了菜单选好菜之后找服务员点菜,此时再等待菜做好送上来.这里餐厅厨房就是计算机的底层资源,菜就是待执行的任务,而服务员就是我们的go channel. 关于消息队列有很多好用的框架,如nsq,nats,kafka等等.但有时我们只需要轻量级的异步任务工具,而不需要太过于"复杂"的框架相对于我们的需求来说.于是借鉴一些项目框架,做了一个小小的封装.

  • golang gin 框架 异步同步 goroutine 并发操作

    goroutine机制可以方便地实现异步处理 package main import ( "log" "time" "github.com/gin-gonic/gin" ) func main() { // 1.创建路由 // 默认使用了2个中间件Logger(), Recovery() r := gin.Default() // 1.异步 r.GET("/long_async", func(c *gin.Context) {

  • golang将多路复异步io转成阻塞io的方法详解

    前言 本文主要给大家介绍了关于golang 如何将多路复异步io转变成阻塞io的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: package main import ( "net" ) func handleConnection(c net.Conn) { //读写数据 buffer := make([]byte, 1024) c.Read(buffer) c.Write([]byte("Hello from server")) } fu

  • Go语言异步API设计的扇入扇出模式详解

    目录 前言 扇入/扇出服务 Go 语言实现扇入/扇出模式 前言 扇出/扇入模式是更高级 API 集成的主要内容.这些应用程序并不总是表现出相同的可用性或性能特征. 扇出是从电子工程中借用的一个术语,它描述了输入的逻辑门连接到另一个输出门的数量.输出需要提供足够的电流来驱动所有连接的输入.在事务处理系统中,用来描述为了服务一个输入请求而需要做的请求总数. 扇入是指为逻辑单元的输入方程提供输入信号的最大数量.扇入是定义单个逻辑门可以接受的最大数字输入数量的术语.大多数晶体管-晶体管逻辑 (TTL)

  • C++利用MySQL API连接和操作数据库实例详解

    1.C++连接和操作MySQL的方式 系列文章: MySQL 设计和命令行模式下建立详解 C++利用MySQL API连接和操作数据库实例详解 在Windows平台,我们可以使用ADO.ODBC或者MySQL API进行连接和操作.ADO (ActiveX Data Objects,ActiveX数据对象)是Microsoft提出的一个用于存取数据源的COM组件.它提供了程序语言和统一数据访问方式OLE DB的一个中间层,也就是Microsoft提出的应用程序接口(API)用以实现访问关系或非关

  • Go语言中的数据竞争模式详解

    目录 前言 Go在goroutine中通过引用来透明地捕获自由变量 切片会产生难以诊断的数据竞争 并发访问Go内置的.不安全的线程映射会导致频繁的数据竞争 Go开发人员常在pass-by-value时犯错并导致non-trivial的数据竞争 消息传递(通道)和共享内存的混合使用使代码变得复杂且易受数据竞争的影响 Add和Done方法的错误放置会导致数据竞争 并发运行测试会导致产品或测试代码中的数据竞争 小结 前言 本文主要基于在Uber的Go monorepo中发现的各种数据竞争模式,分析了其

  • Go语言从单体服务到微服务设计方案详解

    目录 概述 业务场景 设计方案 Api网关 数据 Go中的Grpc使用 问题和反思 概述 微服务是一种思想,与编程语言无关,编程语言是思想下具体的一种实现方式,怎么设计架构方案和实现主要看主要面临的业务场景. 业务场景 主站核心业务使用的是yaf(php)开发的,要实现k8s + x编程语言 自主微服务实现,受到陈皓(左耳听风)的影响,我选用的编程语言是Go,Go语言有更强大的生态,有谷歌,k8s作为强大的后盾,摸着石头过河. 设计方案 Api网关 提到微服务我们就联想到Rpc,主流微服务价格设

  • java 中同步、异步、阻塞和非阻塞区别详解

    java 中同步.异步.阻塞和非阻塞区别详解 简单点说: 阻塞就是干不完不准回来,一直处于等待中,直到事情处理完成才返回: 非阻塞就是你先干,我先看看有其他事没有,一发现事情被卡住,马上报告领导. 我们拿最常用的send和recv两个函数来说吧... 比如你调用send函数发送一定的Byte,在系统内部send做的工作其实只是把数据传输(Copy)到TCP/IP协议栈的输出缓冲区,它执行成功并不代表数据已经成功的发送出去了,如果TCP/IP协议栈没有足够的可用缓冲区来保存你Copy过来的数据的话

  • Python 异步协程函数原理及实例详解

    这篇文章主要介绍了Python 异步协程函数原理及实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一. asyncio 1.python3.4开始引入标准库之中,内置对异步io的支持 2.asyncio本身是一个消息循环 3.步骤: (1)创建消息循环 (2)把协程导入 (3)关闭 4.举例: import threading # 引入异步io包 import asyncio # 使用协程 @ asyncio.coroutine def

  • Redis源码设计剖析之事件处理示例详解

    目录 1. Redis事件介绍 2. 事件的抽象 2.1 文件事件结构 2.2 时间事件结构 2.3 事件状态结构 3. 事件的实现 1. Redis事件介绍 Redis服务器是一个事件驱动程序,所谓事件驱动就是输入一条命令并且按下回车,然后消息被组装成Redis协议的格式发送给Redis服务器,这个时候就会产生一个事件,Redis服务器会接收改命令,处理该命令和发送回复,而当我们没有与服务器进行交互时,服务器就会处于阻塞等待状态,它会让出CPU然后进入睡眠状态,当事件触发时,就会被操作系统唤醒

  • Java语言面向对象编程思想之类与对象实例详解

    在初学者学Java的时候,面向对象很难让人搞懂,那么今天小编就来为大家把这个思想来为大家用极为简单的方法理解吧. 首先我们来简单的阐述面向对象的思想. 面向对象: 官方的语言很抽象,我们把官方的解释和定义抛开.想想,自己有什么,对!!我们自己有手脚眼口鼻等一系列的器官.来把自己所具有的器官就可以看作我们的属性,自己是不是可以喜怒哀乐和嬉笑怒骂,这些是不是我们的行为,那么自己的具有的属性加自己有的行为就称为一个对象. 注意!!我们自己,一个个体是一个对象,因为,你是你,我是我,我们虽然有相同的,但

  • C语言实现进制转换函数的实例详解

    C语言实现进制转换函数的实例详解 前言: 写一个二进制,八进制,十六进制转换为十进制的函数 要求: 函数有两个参数,参数(1)是要转换为十进制的进制数,参数(2)是标示参数(1)是什么进制(2,8,16标示二进制,八进制,十六进制). 要有报错信息,比如参数是1012,但参数(2)是2,显然是进制数表示有错误. 系统表 pg_proc 存储关于函数的信息 内部函数在编译之前需要先定义在 pg_proc.h 中,src/include/catalog/pg_proc.h CATALOG(pg_pr

  • C语言在头文件中定义const变量详解

    C语言在头文件中定义const变量详解 在头文件中定义const不会有多变量的警告或错误,如果该头文件被大量包含会造成rom空间的浪费. 通过查看*.i文件的展开呢,可以发现每个.i文件都会有相应的变量展开. 查看*.map文件,能查看到该变量的多个地址分配. 在预编译的时候如果在头文件定义了const变量,每一个包含该头文件的c文件都会将其展开,而在编译的时候不会报错,因为这符合语法规则,每一个包含这个头文件的*.c文件都会编译一次这个变量,分配一个新的地址,然后在链接的时候也不会报错,因为每

随机推荐