Go1.18新特性使用Generics泛型进行流式处理

前言

Stream 是一个基于 Go 1.18+ 泛型的流式处理库, 它支持并行处理流中的数据. 并行流会将元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.

GitHub - xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)

安装

需要安装 Go 1.18+ 版本

$ go get github.com/xyctruth/stream

在代码中导入它

import "github.com/xyctruth/stream"

基础

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Filter(func(s string) bool { return s != "b" }).
    Map(func(s string) string { return "class_" + s }).
    Sort().
    Distinct().
    ToSlice()
// 需要转换切片元素的类型
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v >3 }).
    Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
    Reduce(func(r string, v string) string { return r + v })

类型约束

any 接受任何类型的元素, 所以不能使用 == != > < 比较元素, 导致你不能使用 Sort(), Find()...等函数 ,但是你可以使用 SortFunc(fn), FindFunc(fn)... 代替

type SliceStream[E any] struct {
    slice      []E
}
stream.NewSlice([]int{1, 2, 3, 7, 1})

comparable 接收的类型可以使用 == != 比较元素, 但仍然不能使用 > < 比较元素, 因此你不能使用 Sort(), Min()...等函数 ,但是你可以使用 SortFunc(fn), MinFunc()... 代替

type SliceComparableStream[E comparable] struct {
    SliceStream[E]
}
stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})

constraints.Ordered 接收的类型可以使用 == != > <, 所以可以使用所有的函数

type SliceOrderedStream[E constraints.Ordered] struct {
    SliceComparableStream[E]
}
stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})

类型转换

有些时候我们需要使用 Map ,Reduce 转换切片元素的类型,但是很遗憾目前 Golang 并不支持结构体的方法有额外的类型参数,所有类型参数必须在结构体中声明。在 Golang 支持之前我们暂时使用临时方案解决这个问题。

// SliceMappingStream  Need to convert the type of slice elements.
// - E elements type
// - MapE map elements type
// - ReduceE reduce elements type
type SliceMappingStream[E any, MapE any, ReduceE any] struct {
    SliceStream[E]
}
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v &gt;3 }).
    Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
    Reduce(func(r string, v string) string { return r + v })

并行

Parallel 函数接收一个 goroutines int 参数. 如果 goroutines>1 则开启并行, 否则关闭并行, 默认流是关闭并行的。

并行会将流中的元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Parallel(10).
    Filter(func(s string) bool {
    // 一些耗时操作
    return s != "b"
    }).
    Map(func(s string) string {
    // 一些耗时操作
    return "class_" + s
    }).
    ForEach(
    func(index int, s string) {
    // 一些耗时操作
    },
    ).ToSlice()

并行类型

  • First: 一旦获得第一个返回值,并行处理就结束. For: AllMatch, AnyMatch, FindFunc
  • ALL: 所有元素都需要并行处理,得到所有返回值,然后并行结束. For: Map, Filter
  • Action: 所有元素需要并行处理,不需要返回值. For: ForEach, Action

并行 goroutines

开启并行 goroutine 数量在面对 CPU 操作与 IO 操作有着不同的选择。 一般面对 CPU 操作时 goroutine 数量不需要设置大于 CPU 核心数,而 IO 操作时 goroutine 数量可以设置远远大于 CPU 核心数.

CPU 操作

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    sort.Ints(newArray(1000)) //  模拟 CPU 耗时操作
})

使用6个cpu核心进行基准测试

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByCPU
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_parallel(0)-6         	     717	   9183119 ns/op
BenchmarkParallelByCPU/goroutines(2)-6          	    1396	   4303113 ns/op
BenchmarkParallelByCPU/goroutines(4)-6          	    2539	   2388197 ns/op
BenchmarkParallelByCPU/goroutines(6)-6          	    2932	   2159407 ns/op
BenchmarkParallelByCPU/goroutines(8)-6          	    2334	   2577405 ns/op
BenchmarkParallelByCPU/goroutines(10)-6         	    2649	   2352926 ns/op

IO 操作

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    time.Sleep(time.Millisecond) // 模拟 IO 耗时操作
})

使用6个cpu核心进行基准测试

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByIO
goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6          	      52	 102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6           	     100	  55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6           	     214	  27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6           	     315	  18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6           	     411	  14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6          	     537	  11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6          	    2629	   2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6         	    5094	   1221887 ns/op

项目地址 https://github.com/xyctruth/stream

以上就是Go1.18新特性使用Generics泛型进行流式处理的详细内容,更多关于Go1.18 Generics泛型流式处理的资料请关注我们其它相关文章!

(0)

相关推荐

  • 浅谈Go1.18中的泛型编程

    目录 前言 以前的Go泛型 泛型是什么 Go的泛型 泛型函数 泛型类型 类型集合 和接口的差异 总结 前言 经过这几年的千呼万唤,简洁的Go语言终于在1.18版本迎来泛型编程.作为一门已经有了14年历史的强类型语言,很难相信它到现在才开始有一个正式的泛型. 以前的Go泛型 虽然直到1.18版本才加入泛型,但是在2014年便有相关的讨论要在Go中加入泛型设计.但是由于各种原因没有实现.而之后的接口(interface)的提出,让泛型进一步搁置.但是由于接口的缺陷,最终Go团队还是在1.18的版本中

  • Go1.18都出泛型了速来围观

    go泛型使用的官方说明:https://go.dev/doc/tutorial/generics 在使用之前先把go更新到1.18或者以上的版本:https://go.dev/doc/install 如果用过c++或者Java的话,那么对泛型这个概念应该是不陌生的.(下面这段定义摘抄自百度百科)泛型程序设计(generic programming)是程序设计语言的一种风格或范式.泛型允许程序员在强类型程序设计语言中编写代码时使用一些以后才指定的类型,在实例化时作为参数指明这些类型. 我们可以用个

  • Go1.18 新特性之多模块Multi-Module工作区模式

    目录 背景 举例:未发布的 module Go1.18 新特性:多模块(Multi-Module)工作区模式 Go1.18 工作区模式 初始化一个新的工作区 go.work 文件结构 go.work 文件优先级高于 go.mod 中定义在 如何禁用工作区 背景 在 go 中使用多个模块可能真的是一件苦差事.特别是当您的一个模块依赖于另一个模块时,您需要同时编辑这两个模块! 您编辑父模块,但是然后您需要将其推送到repo.然后在依赖模块中运行 update 以下载新版本.最终使用2行修复您需要的.

  • Go1.18新特性对泛型支持详解

    目录 1.泛型是什么 2.泛型类型的定义 2.1.声明一个自定义类型 2.2.内置的泛型类型any和comparable 2.3.泛型中的~符号是什么 1.泛型是什么 Go1.18增加了对泛型的支持,泛型是一种独立于使用的特定类型编写代码的方式.现在可以编写函数和类型适用于一组类型集合的任何一种.泛型生命周期只在编译期,旨在开发中减少重复代码的编写. 由于go属于静态强类型语言,例如在比较两个数的大小时,没有泛型的时候,仅仅只是传入类型不一样,我们就要再复制一份一样的函数,如果有了泛型就可以减少

  • Go1.18新特性之泛型使用三步曲(小结)

    目录 01 Go中的泛型是什么 1.1 传统的函数编写方式 1.2 泛型函数编写方式 02 从泛型被加入之前说起 2.1 针对每一种类型编写一套重复的代码 2.2 使用空接口并通过类型断言来判定具体的类型 2.3 传递空接口并使用反射解析具体类型 2.4 通过自定义接口类型实现 03 深入理解泛型--泛型使用“三步曲” 3.1 第一步:类型参数化 3.2 第二步:给类型添加约束 3.3 第三步:类型参数实例化 04 泛型类型约束和普通接口的区别 总结 01 Go中的泛型是什么 众所周知,Go是一

  • Go1.18新特性使用Generics泛型进行流式处理

    前言 Stream 是一个基于 Go 1.18+ 泛型的流式处理库, 它支持并行处理流中的数据. 并行流会将元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序. GitHub - xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream) 安装 需要安装 Go 1.18+ 版本 $ go get github.com/xy

  • Go1.18新特性工作区模糊测试及泛型的使用详解

    目录 前言 Go工作区模式(Go Workspace Mode) 现实的情况 多仓库同时开发 多个新仓库开始开发 工作区模式是什么 推荐的使用方法 使用时的注意点 Go模糊测试(Go Fuzzing Test) 为什么Golang要支持模糊测试 模糊测试是什么 Golang的模糊测试如何使用 最简单的实践例子 提供自定义语料 使用时的注意点 Go的泛型 类型参数(Type Parameters) 类型集合(Type Sets) 类型推导(Type Inference) 类型统一化(Type Un

  • JDK10新特性之var泛型和多个接口实现方法

    简介 在JDK10的新特性:本地变量类型var中我们讲到了为什么使用var和怎么使用var. 今天我们来深入的考虑一下var和泛型,多个接口实现的问题. 实现多个接口 在JDK的实现和我们日常的工作中,很多时候都需要实现多个接口,我们举常用的两个例子ArrayList和CopyOnWriteArrayList.先看下他们的定义: public class ArrayList<E> extends AbstractList<E> implements List<E>, R

  • Go1.16新特性embed打包静态资源文件实现

    目录 背景 embed 嵌入 字符串.字节切片.文件嵌入 嵌入文件 嵌入文件夹 嵌入匹配 FS 文件系统 总结: 背景 相信有一部分人喜爱 GO 的初衷大概是:跨平台静态编译,如果在没用通过 CGO 引用其他库的话,一般编译出来的可执行二进制文件部署起来非常方便,但人们在实际中发现,使用 Go 语言开发的后端 WEB 程序存在 HTML 模版.图片.JS.CSS.JSON 等静态资源,部署时需要把这些静态资源与二进制程序一起上传到服务器部署,在现今遍地花容器的今天,为了简化部署流程,能不能更进一

  • C++11新特性“=default”,“=delete”的使用

    1. =default 和=delete 概述 任何事物的出现都必然有着其出现的理由,伴随着每一个新的概念产生都会带来一系列的便利和价值.C++在不断的演变与发展,与此同时,伴随着许多新的特性和功能产生.=default.=delete 是C++11的新特性,分别为:显式缺省(告知编译器生成函数默认的缺省版本)和显式删除(告知编译器不生成函数默认的缺省版本).C++11中引进这两种新特性的目的是为了增强对"类默认函数的控制",从而让程序员更加精准地去控制默认版本的函数.其具体的功能和使

  • Go1.20 arena新特性示例详解

    目录 正文 快速背景 最新进展 总结 正文 大概半年前,我写过一篇文章<Go 要违背初心吗?新提案:手动管理内存>.有兴趣了深入解的同学,可以再回顾一下. 当时我们还想着 Go 团队应该不会接纳,至少不会那么快: 懒得翻也可以看我再次道来,本文提到的提案<proposal: arena: new package providing memory arenas>,这其中的 Arena 将会是一个突破项. 快速背景 Arena 指的是一种从一个连续的内存区域分配一组内存对象的方式.优点

随机推荐