Ruby3多线程并行Ractor使用方法详解

Ruby 3 Ractor官方手册:https://github.com/ruby/ruby/blob/master/doc/ractor.md

在Ruby3之前,使用Thread来创建新的线程,但这种方式创建的多线程是并发而非并行的,MRI有一个全局解释器锁GIL来控制同一时刻只能有一个线程在执行:

# main Thread

t1 = Thread.new do
  # new Thread
  sleep 3
end
t1.join

Ruby3通过Ractor(Ruby Actor,Actor模型通过消息传递的方式来修改状态)支持真正的多线程并行,多个Ractor之间可并行独立运行。

# main Ractor

# 创建一个可与main Ractor并行运行的Ractor
r = Ractor.new do
  sleep 2
  Ractor.yield "hello"
end

puts r.take

需注意,每个Ractor中至少有一个原生Ruby线程,但每个Ractor内部都拥有独立的GIL,使得Ractor内部在同一时刻最多只能有一个线程在运行。从这个角度来看,Ractor实际上是解释器线程,每个解释器线程拥有一个全局解释器锁。

如果main Ractor退出,则其他Ractor也会收到退出信号,就像main Thread退出时,其他Thread也会退出一样。

创建Ractor

使用Ractor.new创建一个Ractor实例,创建实例时需指定一个语句块,该语句块中的代码会在该Ractor中运行。

r = Ractor.new do
  puts "new Ractor"
end

可在new方法的参数上为该Ractor实例指定名称:

r = Ractor.new(name: "ractor1") do
  puts "new Ractor"
end

puts r.name  # ractor 1

new方法也可指定其他参数,这些参数必须在name参数之前,且这些参数将直接原样传递给语句块参数:

arr = [11, 22, 33]
r = Ractor.new(arr, name: "r1") do |arr|
  puts "arr"
end
sleep 1

关于new的参数,稍后还会有解释。

可使用Ractor.current获取当前的Ractor实例,使用Ractor.count获取当前存活的Ractor实例数量。

Ractor之间传递消息

Ractor传递消息的方式分两种:

  • Push方式:向某个特定的Ractor实例推送消息,可使用r.send(Msg)或别名r << Msg向该Ractor实例传送消息,并在该Ractor实例内部使用Ractor.receive或别名Ractor.recv或它们的同名私有方法来接收推送进来的消息

    • Ractor还提供了Ractor.receive_if {expr}方法,表示只在expr为true时才接收消息,receive等价于receive_if {true}
  • Pull方式:从某个特定的Ractor实例拉取消息,可在该Ractor实例内部使用Ractor.yield向外传送消息,并在需要的地方使用r.take获取传输出来的消息
    • Ractor.new的语句块返回值,相当于Ractor.yield,它也可被r.take接收

因此,对于Push方式,要求知道消息传递的目标Ractor,对于Pull方式,要求知道消息的来源Ractor。

# yield + take
r = Ractor.new {Ractor.yield "hello"}
puts r.take

# send + receive
r1 = Ractor.new do
  # Ractor.receive或Ractor.recv
  # 或同名私有方法:receive、recv
  puts Ractor.receive
end
r1.send("hello")
r1.take    # 本次take取得r1语句块的返回值,即puts的返回值nil

使用new方法创建Ractor实例时,可指定new的参数,这些参数会被原样传递给Ractor的语句块参数。

arr = [11, 22, 33]
r = Ractor.new(arr) { |arr| ...}

实际上,new的参数等价于在Ractor语句块的开头使用了Ractor.receive接收消息:

r = Ractor.new 'ok' { |msg| msg }
r.take #=> 'ok'

# 基本等价于
r = Ractor.new do
  msg = Ractor.receive
  msg
end
r.send 'ok'
r.take #=> 'ok'

消息端口

Ractor之间传递消息时,实际上是通过Ractor的消息端口进行传递的。

每个Ractor都有自己的incoming port和outgoing port:

  • incoming port:是该Ractor接收消息的端口,r.sendRactor.receive使用该端口

    • 每个incoming port都连接到一个大小不限的队列上
    • r.send传入的消息都会写入该队列,由于该队列大小不限,因此r.send从不阻塞
    • Ractor.receive从该队列弹出消息,当队列为空时,Ractor.receive被阻塞直到新消息出现
    • 可使用r.close_incoming关闭incoming port,关闭该端口后,r.send将直接报错,Ractor.receive将先从队列中取数据,当队列为空后,再调用Ractor.receive将报错
  • outgoing port:是该Ractor向外传出消息的端口,Ractor.yieldr.take使用该端口
    • Ractor.yield或Ractor语句块返回时,消息从outgoing port流出
    • 当没有r.take接收消息时,r内部的Ractor.yield将被阻塞
    • 当r内部没有Ractor.yield时,r.take将被阻塞
    • Ractor.yield从outgoing port传出的消息可被任意多个r.take等待,但只有一个r.take可获取到该消息
    • 可使用r.close_outgoing关闭outgoing port,关闭该端口后,再调用r.takeRactor.yield将直接报错。如果r.take正被阻塞(等待Ractor.yield传出消息),关闭outgoing port操作将取消所有等待中的take并报错

Ractor.select等待消息

可使用Ractor.select(r1,r2,r3...)等待一个或多个Ractor实例outgoing port上的消息(因此,select主要用于等待Ractor.yield的消息),等待到第一个消息后立即返回。

Ractor.select的返回值格式为[r, obj],其中:

  • r表示等待到的那个Ractor实例
  • obj表示接收到的消息对象

例如:

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
as = []

# Wait for r1 or r2's Ractor.yield
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj

# Second try (rs only contain not-closed ractors)
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj
as.sort == ['r1', 'r2'] #=> true

通常来说,会使用Ractor.select来轮询等待多个Ractor实例的消息,通用化的处理流程参考如下:

# 充当管道功能的Ractor:接收消息并发送出去,并不断循环
pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
# rs变量保存了10个Ractor实例
# 每个Ractor实例都从管道pipe中取一次消息然后由本Ractor发送出去
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
    msg = pipe.take
    msg # ping-pong
  end
}
# 向管道中发送10个数据
RN.times{|i| pipe << i}

# 轮询等待10个Ractor实例的outgoing port
# 每等待成功一次,从rs中删除所等待到的Ractor实例,
# 然后继续等待剩下的Ractor实例
RN.times.map{
  r, n = Ractor.select(*rs)
  rs.delete r
  n
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

此外,Ractor.select除了可等待消息外,也可以用来yield传递消息,更多用法参考官方手册:Ractor.select

Ractor并行时如何避免竞态

多个Ractor之间是可并行运行的,为了避免Ractor之间传递数据时出现竞态问题,Ractor采取了一些措施:

  • 对于不可变对象,它们可直接在Ractor之间共享,此时传递它们的引用
  • 对于可变对象,它们不可直接在Ractor之间共享,此时传递数据时,默认先按字节逐字节拷贝,然后后传递副本
  • 也可以显式指定移动数据,将某份数据从Ractor1移动到另一个Ractor2中,即转移数据的所有权(参考Rust的所有权规则),转移所有权后,原始所有者Ractor中将无法再访问该数据

传递可共享对象:传递引用

可共享的对象:自动传递它们的引用,效率高

  • 不可变对象可在Ractor之间直接共享(如Integer、symbol、true/false、nil),如:

    • i=123:i是可共享的
    • s="str".freeze:s是可共享的
    • h={c: Object}.freeze:h是可共享的,因为Object是一个类对象,类对象是可共享的
    • a=[1,[2],3].freeze:a不可共享,因为冻结后仍然包含可变的[2]
  • Class/Module对象,即类对象自身和模块对象自身是可共享的
  • Ractor对象自身是可共享的

例如:

i = 33
r = Ractor.new do
  m = recv
  puts m.object_id
end

r.send(i)  # 传递i
r.take     # 等待Ractor执行结束(语句块返回)
puts i.object_id  # i传递后仍然可用
=begin
67
67
=end

值得注意的是,Ractor对象是可共享的,因此可将某个Ractor实例传递给另一个Ractor实例。例如:

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
rs = RN.times.map{|i|
  # pipe是一个Ractor实例,这里作为参数传递给其他的Ractor实例
  Ractor.new pipe, i do |pipe, i|
    pipe << i
  end
}

RN.times.map{
  pipe.take
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

传递不可共享对象:传递副本

绝大多数对象不是可直接共享的。在Ractor之间传递不可共享的对象时,默认会传递deep-copy后的副本,即按字节拷贝的方式拷贝该对象的每一个字节。这种方式效率较低。

例如:

arr = [11, 22, 33]  # 数组是可变的,不可共享
r = Ractor.new do
  m = recv
  puts "copied: #{m.object_id}"
end

r.send(arr)  # 传递数组,此时将逐字节拷贝数组
r.take
puts "origin: #{arr.object_id}"

=begin
copied: 60
origin: 80
=end

从结果看,两个Ractor内的arr不是同一个对象。

需注意,对于全局唯一的对象来说(比如数值、nil、false、true、symbol),逐字节拷贝时并不会拷贝它们。例如:

arr = %i[lang action sub]
r = Ractor.new do
  m = recv
  puts "copied: #{m.object_id}, #{m[0].object_id}, #{m[1].object_id}"
end

r.send(arr)
r.take
puts "origin: #{arr.object_id}, #{arr[0].object_id}, #{arr[1].object_id}"

=begin
copied: 60, 80, 1046748
origin: 100, 80, 1046748
=end

注意,Thread对象无法拷贝,因此无法在Ractor之间传递。

转移数据所有权

还可以让r.send(msg, move: true)Ractor.yield(msg, move: true)传递数据时,明确表示要移动而非拷贝数据,即转移数据的所有权(从原来的所有者Ractor实例转移到目标Ractor实例)。

无论是可共享还是不可共享的对象,都可以转移所有权,只不过转移可共享对象的所有权没有意义,因为转移之后,原所有者仍然拥有所有权。

因此,通常只对不可共享的数据来转移所有权,转移所有权后,原所有者将无法访问该数据。

str = "hello"
puts str.object_id
r = Ractor.new do
  m = recv
  puts m.object_id
end

r.send(str, move: true)  # 转移str的所有权
r.take
#puts str.object_id  # 转移所有权后再访问str,将报错

=begin
60
80
=end

值得注意的是,移动的本质是内存拷贝,它底层也一样是逐字节拷贝原始数据的过程,所以移动传递数据的效率和传递副本数据的效率是类似的。移动传递和传递副本的区别之处在于所有权,移动传递后,原所有者Ractor实例将无法访问该数据,而拷贝传递方式则允许原所有者访问

注意,Thread对象无法转移所有权,因此无法在Ractor之间传递。

不可共享变成可共享:Ractor.make_shareable

对于不可共享的数据obj,可通过Ractor.make_shareable(obj)方法将其转变为可共享的数据,默认转变的方式是逐层次地递归冻结obj。也可指定额外的参数Ractor.make_shareable(obj, copy: true),此时将深拷贝obj得其副本,再让副本(逐层递归冻结)转变为可共享数据。

例如:

arr = %w[lang action sub]
puts arr.object_id
r = Ractor.new do
  m = recv
  puts m.object_id
end

r.send(Ractor.make_shareable(arr))
r.take
puts arr.object_id
puts arr.frozen?

输出:

60
60
60
true

示例

工作者线程池:

require 'prime'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
    while n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each{|i|
  pipe << i
}

pp (1..N).map{
  _r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}

Pipeline:

# pipeline with yield/take
r1 = Ractor.new do
  'r1'
end

r2 = Ractor.new r1 do |r1|
  r1.take + 'r2'
end

r3 = Ractor.new r2 do |r2|
  r2.take + 'r3'
end

p r3.take #=> 'r1r2r3'

更多关于Ruby3多线程并行Ractor使用方法请查看下面的相关链接

(0)

相关推荐

  • Ruby中使用多线程队列(Queue)实现下载博客文章保存到本地文件

    Ruby:多线程下载博客文章到本地的完整代码 复制代码 代码如下: #encoding:utf-8 require 'net/http' require 'thread' require 'open-uri' require 'nokogiri' require 'date' $queue = Queue.new #文章列表页数 page_nums = 8 page_nums.times do |num|   $queue.push("http://www.cnblogs.com/hongfei

  • Ruby 多线程的潜力和弱点分析

    Web 应用大多是 IO 密集型的,利用 Ruby 多进程+多线程模型将能大幅提升系统吞吐量.其原因在于:当Ruby 某个线程处于 IO Block 状态时,其它的线程还可以继续执行.但由于存在 Ruby GIL (Global Interpreter Lock),MRI Ruby 并不能真正利用多线程进行并行计算.JRuby 去除了 GIL,是真正意义的多线程,既能应付 IO Block,也能充分利用多核 CPU 加快整体运算速度. 上面说得比较抽象,下面就用例子一一加以说明. Ruby 多线

  • 初步讲解Ruby编程中的多线程

    每个正在系统上运行的程序都是一个进程.每个进程包含一到多个线程. 线程是程序中一个单一的顺序控制流程,在单个程序中同时运行多个线程完成不同的工作,称为多线程. Ruby 中我们可以通过 Thread 类来创建多线程,Ruby的线程是一个轻量级的,可以以高效的方式来实现并行的代码. 创建 Ruby 线程 要启动一个新的线程,只需要调用 Thread.new 即可: # 线程 #1 代码部分 Thread.new { # 线程 #2 执行代码 } # 线程 #1 执行代码 实例 以下实例展示了如何在

  • Ruby多线程库(Thread)使用方法详解

    Thread是Ruby的线程库,Thread库已经内置在Ruby中,但如果想要使用线程安全的Queue.Mutex以及条件变量等,则需要手动require 'thread'. 主线程main 默认情况下,每个Ruby进程都具备一个主线程main,如果没有创建新的线程,所有的代码都将在这个主线程分支中执行. 使用Thread.main()类方法可获取当前线程组的主线程,使用Thread.current()可以获取当前正在执行的线程分支.使用Thread.list()可获取当前进程组中所有存活的线程

  • Ruby多线程编程初步入门

    传统程序有一个单独的线程执行,包含该程序的语句或指令顺序执行直到程序终止. 一个多线程的程序有多个线程的执行.在每个线程是按顺序执行的,但是在多核CPU机器上线程可能并行地执行.例如,通常情况下在单一CPU的机器,多个线程实际上不是并行执行的,而是模拟并行交叉的线程的执行. Ruby的可以使用 Thread 类很容易地编写多线程程序. Ruby线程是一个轻量级的和高效的在代码中实现并行性. 创建Ruby线程: 要启动一个新线程,关联一个块通过调用Thread.new.将创建一个新的线程执行的代码

  • Ruby3多线程并行Ractor使用方法详解

    Ruby 3 Ractor官方手册:https://github.com/ruby/ruby/blob/master/doc/ractor.md 在Ruby3之前,使用Thread来创建新的线程,但这种方式创建的多线程是并发而非并行的,MRI有一个全局解释器锁GIL来控制同一时刻只能有一个线程在执行: # main Thread t1 = Thread.new do # new Thread sleep 3 end t1.join Ruby3通过Ractor(Ruby Actor,Actor模型

  • Java实现多线程同步五种方法详解

    一.为什么要线程同步 因为当我们有多个线程要同时访问一个变量或对象时,如果这些线程中既有读又有写操作时,就会导致变量值或对象的状态出现混乱,从而导致程序异常.举个例子,如果一个银行账户同时被两个线程操作,一个取100块,一个存钱100块.假设账户原本有0块,如果取钱线程和存钱线程同时发生,会出现什么结果呢?取钱不成功,账户余额是100.取钱成功了,账户余额是0.那到底是哪个呢?很难说清楚.因此多线程同步就是要解决这个问题. 二.不同步时的代码 Bank.java package threadTe

  • C++多线程获取返回值方法详解

    在许多时候,我们会有这样的需求--即我们想要得到线程返回的值.但是在C++11 多线程中我们注意到,std::thread对象会忽略顶层函数的返回值. 那问题来了,我们要怎么获得线程的返回值呢? 我们通过一个例子来说明如何实现这个需求.用多个线程计算(a+b)/ (x+y) 的值 有两种方法,分别是 1. 传统的方法:在线程间共享指针 #include<iostream> #include<thread> #include<mutex> #include<atom

  • Java多线程连续打印abc实现方法详解

    一道编程题如下: 实例化三个线程,一个线程打印a,一个线程打印b,一个线程打印c,三个线程同时执行,要求打印出10个连着的abc. 题目分析: 通过题意我们可以得出,本题需要我们使用三个线程,三个线程分别会打印6次字符,关键是如何保证顺序一定是abc...呢.所以此题需要同步机制来解决问题! 令打印字符A的线程为ThreadA,打印B的ThreadB,打印C的为ThreadC.问题为三线程间的同步唤醒操作,主要的目的就是使程序按ThreadA->ThreadB->ThreadC->Thr

  • 多线程死锁的产生以及如何避免死锁方法(详解)

    一.死锁的定义 多线程以及多进程改善了系统资源的利用率并提高了系统 的处理能力.然而,并发执行也带来了新的问题--死锁.所谓死锁是指多个线程因竞争资源而造成的一种僵局(互相等待),若无外力作用,这些进程都将无法向前推进. 下面我们通过一些实例来说明死锁现象. 先看生活中的一个实例,2个人一起吃饭但是只有一双筷子,2人轮流吃(同时拥有2只筷子才能吃).某一个时候,一个拿了左筷子,一人拿了右筷子,2个人都同时占用一个资源,等待另一个资源,这个时候甲在等待乙吃完并释放它占有的筷子,同理,乙也在等待甲吃

  • 对Python 多线程统计所有csv文件的行数方法详解

    如下所示: #统计某文件夹下的所有csv文件的行数(多线程) import threading import csv import os class MyThreadLine(threading.Thread): #用于统计csv文件的行数的线程类 def __init__(self,path): threading.Thread.__init__(self) #父类初始化 self.path=path #路径 self.line=-1 #统计行数 def run(self): reader =

  • python多线程方法详解

    处理多个数据和多文件时,使用for循环的速度非常慢,此时需要用多线程来加速运行进度,常用的模块为multiprocess和joblib,下面对两种包我常用的方法进行说明. 1.模块安装 pip install multiprocessing pip install joblib 2.以分块计算NDVI为例 首先导入需要的包 import numpy as np from osgeo import gdal import time from multiprocessing import cpu_c

  • python多进程和多线程究竟谁更快(详解)

    python3.6 threading和multiprocessing 四核+三星250G-850-SSD 自从用多进程和多线程进行编程,一致没搞懂到底谁更快.网上很多都说python多进程更快,因为GIL(全局解释器锁).但是我在写代码的时候,测试时间却是多线程更快,所以这到底是怎么回事?最近再做分词工作,原来的代码速度太慢,想提速,所以来探求一下有效方法(文末有代码和效果图) 这里先来一张程序的结果图,说明线程和进程谁更快 一些定义 并行是指两个或者多个事件在同一时刻发生.并发是指两个或多个

  • Python异步爬虫多线程与线程池示例详解

    目录 背景 异步爬虫方式 多线程,多进程(不建议) 线程池,进程池(适当使用) 单线程+异步协程(推荐) 多线程 线程池 背景 当对多个url发送请求时,只有请求完第一个url才会接着请求第二个url(requests是一个阻塞的操作),存在等待的时间,这样效率是很低的.那我们能不能在发送请求等待的时候,为其单独开启进程或者线程,继续请求下一个url,执行并行请求 异步爬虫方式 多线程,多进程(不建议) 好处:可以为相关阻塞的操作单独开启线程或者进程,阻塞操作就可以异步会执行 弊端:不能无限制开

随机推荐