Ruby多线程库(Thread)使用方法详解

Thread是Ruby的线程库,Thread库已经内置在Ruby中,但如果想要使用线程安全的Queue、Mutex以及条件变量等,则需要手动require 'thread'

主线程main

默认情况下,每个Ruby进程都具备一个主线程main,如果没有创建新的线程,所有的代码都将在这个主线程分支中执行。

使用Thread.main()类方法可获取当前线程组的主线程,使用Thread.current()可以获取当前正在执行的线程分支。使用Thread.list()可获取当前进程组中所有存活的线程。

p Thread.main
p Thread.current
p Thread.main == Thread.current
=begin
#<Thread:0x0000000001d9ae58 run>
#<Thread:0x0000000001d9ae58 run>
true
=end

可见,线程其实是一个Thread类的实例对象。

创建Ruby线程

使用Thread库的new()、start()、fork()可创建线程,它们几乎等价,且后两者是别名关系。

创建线程时需传递一个代码块或Proc对象参数, 它们是要执行的任务,它们将在新的线程分支中执行。如果需要,可以为代码块或Proc对象传递参数。

arr=[]
a,b,C=1,2,3
Thread.new(a,b,c) { |d,e,f| arr << d << e << f }
sleep 1
p arr   #=> [1,2,3]

如果主线程先执行完成,主线程将直接退出,主线程的退出将会终止进程,使得其它线程也会退出。

Thread.new {puts "hello"}
puts "world"

上述代码几乎总是会输出world,然后退出,主线程的退出使得子线程不会输出"hello"。之所以总是会输出world而不是输出hello,这和Ruby的线程调度有关,在后面的文章中会详细解释Ruby中的线程调度。

join()和value()等待线程

如果想要等待某个线程先执行完成,可使用t.join(),如果线程t尚未退出,则join()会阻塞。可以在任意线程中调用t.join(),谁调用谁等待。

t = Thread.new { puts "I am Child" }
t.join  # 等待子线程执行完成
puts "I am Parent"

还可以将多个线程对象放进数组,然后执行遍历join,另一种常见的做法是使用map{}.each(&:join)的方式:

threads = []
3.times do |i|
  # 将多个线程加入到数组中
  threads << Thread.new { puts "Thread #{i}" }
end

# 在main线程中join每个线程,
# 因此只有3个线程全都完成后,main线程才会继续,即退出
threads.each(&:join)
=begin
Thread 1
Thread 0
Thread 2
=end

# 另一种常见方式
3.times.map {|i| Thread.new { puts "Thread #{i}" } }.each(&:join)
Array.new(3) {|i| Thread.new { puts "Thread #{i}" } }.each(&:join)

t.value()t.join()类似,不同之处在于t.value()在内部调用t.join()等待线程t之后,还会在等待成功时取得该线程的返回值。

a = Thread.new { 2 + 2 }
p a.value   #=> 4

注意,对于Ruby来说,无论是否执行join()操作,任务执行完成的线程都会马上被操作系统回收(从OS线程表中删除),但被回收的线程仍然能够使用value()方法来获取被回收线程的返回值。之所以会这样,我个人猜想,也许是因为Ruby内部已经帮我们执行了join操作并将线程返回值保存在Ruby内部,这样对于用户来说就更加安全,而且用户执行join()或value()操作,可能是在等待Ruby内部的这个值的出现。

线程的异常处理

默认情况下,当某个非main线程中抛出异常后,该线程将因异常而终止,但是它的终止不会影响其它线程。

t = Thread.new {raise "hello"}    # 抛出异常
sleep 1    # 仍然睡眠1秒后退出

如果使用了t.join()t.value()去等待抛出异常的线程t,异常将会传播给调用这两个方法的线程。例如主线程调用t.join,如果t会抛出一次异常,那么主线程在等待过程中还会抛出一次异常。

t = Thread.new {raise "hello"}    # 抛出异常
t.join()    # 子线程抛异常后,main线程也抛异常

如果想要让任意线程出现异常时终止整个程序,可设置类方法Thread.abort_on_exception为true,它会在任意子线程抛出异常后自动传播给main线程,从而终止进程:

Thread.abort_on_exception = true
Thread.new { raise "Error" }
sleep 1   # 不会睡眠完1秒,而是子线程异常后立即异常退出

如果想要让某个特定的线程出现异常时终止整个程序,可设置同名的实例方法t.abort_on_exception为true,只有t线程异常时才会终止程序。

t1 = Thread.new { raise "Error from t1" }
t1.abort_on_exception = true
sleep 1

另外,线程实例方法t.raise()可以直接在线程t抛出异常。

需注意,Ruby线程有一个巨大的缺点:无论是raise抛出异常还是各种终止(比如kill、exit),都不会执行ensure子句。

线程的状态和生命周期

Ruby中的线程具有5种状态,可通过t.status()查看,该方法有5种对应的返回值:

- run: 线程正在运行(running)或可运行(runnable)
- sleep: 线程处于睡眠态,比如阻塞(如sleep,mutex,io block)
- false: 线程正常退出后的状态,包括执行完流程、手动退出(t.exit)、信号终止(t.kill)
- nil: 线程因抛出异常(比如raise)而退出的状态
- aborting: 线程被完全kill之前的过渡状态,不考虑这种状态的存在

另外,还有两种统称状态:

  • alive:存活的线程,等价于run + sleep
  • stop:已停止的线程,等价于sleep + dead(false+nil)

可分别使用alive?()stop?()来判断线程是否属于这两种统称状态。

此外:

Kernel.sleep:让当前线程睡眠指定时长,无参数则永久睡眠,线程将进入睡眠队列
Thread.stop:让当前线程睡眠,进入睡眠队列,等价于无参数的sleep
Thread.pass:转让CPU,当前线程进入就绪队列而不是睡眠队列
t.run:唤醒线程t使其进入就绪队列,同时让当前线程放弃CPU,调度程序将重新调度
t.wakeup:唤醒线程t使其进入就绪队列,但不会让当前线程放弃CPU,调度程序将不会立即重新调度  

Thread.kill:终止指定线程,它将不再被调度
Thread.exit:终止当前线程,它将不再被调度
t.exit,t.kill,t.terminate:终止线程t,t将不再被调度

几个注意事项:

  • 这里5个终止线程的方式效果上是完全等价的,三个实例方法是别名关系,而两个类方法的内部也都是调用线程对象的kill
  • 最好要不加区分地看待run和wakeup
  • 对于Thread.pass,除了知道它转让CPU的行为是确定的,不要对它假设任何额外的行为,比如不要认为出让CPU后一定会调度到其它Ruby线程,很有可能会在调度其它一些非Ruby线程后再次先调度到本线程而非其它Ruby线程
  • 需注意,无论是raise抛出异常还是各种终止(比如kill、exit),都不会执行ensure子句

线程私有变量和局部变量

Ruby进程内的所有线程共享进程的虚拟地址空间,所以共享了一些数据。

但线程是语句块或者Proc对象,所以语句块内部创建的变量是在当前线程栈内部的,是每个线程私有的变量。

# 主线程中的变量
a = 1

# 子线程
t1 = Thread.new(3) do |x|
  a += 1
  b=3
  x=4
end

# 主线程
t1.join
p a   # 2
#p b  # 报错,b不存在
#p x  # 报错,x不存在

Ruby为线程提供了局部变量共享的概念,每个线程对象都可以有自己的局部数据空间(即线程本地变量),线程对象的局部空间互不影响,比如两个线程中同时进行正则匹配,两个线程的$~是不一样且互不影响的。

线程对象t的局部数据空间是t[key]=value,即一个名为t的hash结构,因为对象t是可以共享的,所以它的局部空间也是共享的。

t1 = Thread.new do
  t = Thread.current
  t[:name] = "junmajinlong"
  t[:age] = 23
end

t1.join

p t1.keys          # [:name, :age]
p t1.key? :gender  # false
p t1[:name]        # "junmajinlong"
t1[:age] = 24
p t1[:age]         # 24

所以,有这么几个方法:

t[key]
t[key]=
t.keys
t.key?

此外还有一个fetch()方法,类似于Hash的fetch(),默认情况下访问不存在的key会异常,可指定默认值或通过语句块返回默认值。

严格来说,从Ruby 1.9出现Fiber之后,t[]不再是线程本地变量(thread-local),而是纤程(Fiber)本地变量(fiber-local)。但也支持使用线程本地变量:

t.thread_variables
t.thread_variable?
t.thread_variable_get
t.thread_variable_set

线程组

默认情况下,所有线程都在默认的线程组中,这个默认线程组是Ruby程序启动时创建的。可使用ThreadGroup::Default获取默认线程组。

t1 = Thread.new do
  Thread.stop
end

p t1.group
p Thread.current.group
p ThreadGroup::Default
=begin
#<ThreadGroup:0x00000000019bcb60>
#<ThreadGroup:0x00000000019bcb60>
#<ThreadGroup:0x00000000019bcb60>
=end
  • 使用ThreadGroup.new可创建一个自定义的线程组
  • 使用tg.add(t)可将线程t加入线程组tg,这将会从原来的线程组移除t再加入新组tg
  • 使用tg.list可列出线程组tg中的所有线程
  • 使用t.group可获取线程t所属的线程组
  • 子线程会继承父线程的线程组,即子线程也会加入父线程所在的线程组
tg = ThreadGroup.new
t1 = Thread.new { Thread.stop }
t2 = Thread.new { Thread.stop }
tg.add t1
tg.add t2
pp tg.list
pp t1.group
=begin
[#<Thread:0x000000000196c480 a.rb:4 sleep_forever>,
 #<Thread:0x000000000196c3b8 a.rb:5 sleep_forever>]
#<ThreadGroup:0x000000000196c520>
=end

线程组还有一个功能:可使用tg.enclose封闭线程组tg,封闭后的线程组将不允许内部线程移出加入其它组,也不允许外界线程加入该组,只允许在该组中创建新线程。使用tg.enclosed?测试线程组tg是否已封闭。

其实,使用线程组可以将多个线程分类统一管理,线程组本质是一个线程数组加一些额外属性。比如,可以为线程组定义一些额外的针对线程组中所有线程的功能:wakeup组中的所有线程、join所有线程、kill所有线程。

class ThreadGroup
  def wakeup
    list.each(&:wakeup)
  end
  def join
    list.each { |th| th.join if th != Thread.current }
  end
  def kill
    list.each(&:kill)
  end
end

更多关于Ruby多线程知识请查看下面的相关链接

(0)

相关推荐

  • Ruby3多线程并行Ractor使用方法详解

    Ruby 3 Ractor官方手册:https://github.com/ruby/ruby/blob/master/doc/ractor.md 在Ruby3之前,使用Thread来创建新的线程,但这种方式创建的多线程是并发而非并行的,MRI有一个全局解释器锁GIL来控制同一时刻只能有一个线程在执行: # main Thread t1 = Thread.new do # new Thread sleep 3 end t1.join Ruby3通过Ractor(Ruby Actor,Actor模型

  • Ruby中使用多线程队列(Queue)实现下载博客文章保存到本地文件

    Ruby:多线程下载博客文章到本地的完整代码 复制代码 代码如下: #encoding:utf-8 require 'net/http' require 'thread' require 'open-uri' require 'nokogiri' require 'date' $queue = Queue.new #文章列表页数 page_nums = 8 page_nums.times do |num|   $queue.push("http://www.cnblogs.com/hongfei

  • Ruby 多线程的潜力和弱点分析

    Web 应用大多是 IO 密集型的,利用 Ruby 多进程+多线程模型将能大幅提升系统吞吐量.其原因在于:当Ruby 某个线程处于 IO Block 状态时,其它的线程还可以继续执行.但由于存在 Ruby GIL (Global Interpreter Lock),MRI Ruby 并不能真正利用多线程进行并行计算.JRuby 去除了 GIL,是真正意义的多线程,既能应付 IO Block,也能充分利用多核 CPU 加快整体运算速度. 上面说得比较抽象,下面就用例子一一加以说明. Ruby 多线

  • 初步讲解Ruby编程中的多线程

    每个正在系统上运行的程序都是一个进程.每个进程包含一到多个线程. 线程是程序中一个单一的顺序控制流程,在单个程序中同时运行多个线程完成不同的工作,称为多线程. Ruby 中我们可以通过 Thread 类来创建多线程,Ruby的线程是一个轻量级的,可以以高效的方式来实现并行的代码. 创建 Ruby 线程 要启动一个新的线程,只需要调用 Thread.new 即可: # 线程 #1 代码部分 Thread.new { # 线程 #2 执行代码 } # 线程 #1 执行代码 实例 以下实例展示了如何在

  • Ruby多线程编程初步入门

    传统程序有一个单独的线程执行,包含该程序的语句或指令顺序执行直到程序终止. 一个多线程的程序有多个线程的执行.在每个线程是按顺序执行的,但是在多核CPU机器上线程可能并行地执行.例如,通常情况下在单一CPU的机器,多个线程实际上不是并行执行的,而是模拟并行交叉的线程的执行. Ruby的可以使用 Thread 类很容易地编写多线程程序. Ruby线程是一个轻量级的和高效的在代码中实现并行性. 创建Ruby线程: 要启动一个新线程,关联一个块通过调用Thread.new.将创建一个新的线程执行的代码

  • Ruby多线程库(Thread)使用方法详解

    Thread是Ruby的线程库,Thread库已经内置在Ruby中,但如果想要使用线程安全的Queue.Mutex以及条件变量等,则需要手动require 'thread'. 主线程main 默认情况下,每个Ruby进程都具备一个主线程main,如果没有创建新的线程,所有的代码都将在这个主线程分支中执行. 使用Thread.main()类方法可获取当前线程组的主线程,使用Thread.current()可以获取当前正在执行的线程分支.使用Thread.list()可获取当前进程组中所有存活的线程

  • thinkPHP2.1自定义标签库的导入方法详解

    本文详细讲述了thinkPHP2.1自定义标签库的导入方法.分享给大家供大家参考,具体如下: TP的手册似乎跟不上节奏, 对自定义标签只是寥寥几句, 摸索了N久, 终于将自定义的标签进行了导入. 心得如下: 1. 情况:  新建自定义的标签库类: @.Mylib.Tag.TagLibTest - 懂TP的应该知道这代表的路径 使用Examples下的Tag演示文件 <?php // +--------------------------------------------------------

  • python图形开发GUI库wxpython使用方法详解

    一.python gui(图形化)模块介绍: Tkinter :是python最简单的图形化模块,总共只有14种组建 Pyqt     :是python最复杂也是使用最广泛的图形化 Wx       :是python当中居中的一个图形化,学习结构很清晰 Pywin   :是python windows 下的模块,摄像头控制(opencv),常用于外挂制作 二.wx模块的安装: C:\Users\Administrator> pip install wxpython 三.图形化介绍   四.wx主

  • python词云库wordCloud使用方法详解(解决中文乱码)

    文章中的例子主要借鉴wordColud的examples,在文章对examples中的例子做了一些改动. 一.wordColud设计中文词云乱码 使用wordColud设计词云的时候可能会产生乱码问题,因为wordColud默认的字体不支持中文,所以我们只需要替换wordColud的默认字体即可正常显示中文. 1.中文词云乱码 我们使用simhei(黑体)来替换wordColud的默认字体. 2.替换默认字体 a.在字体文件*.tff字体文件(simhei.tff)拷贝到wordColud安装的

  • Ruby操作CSV格式数据方法详解

    CSV格式的数据默认是以逗号分隔各个字段的一条一条记录,默认用换行符分隔每一条记录.此外,有的CSV有标题行,有的没有.还有其他一些格式, 它们都有默认值,但都可以在读.写CSV数据时修改默认设置.后文大多数时候故意忽略这些设置,因为绝大多数读写操作都使用同样的参数**options进行格式设置.例如,在读取csv文件中的数据时想要忽略标题行,可以在参数中设置headers: true 可设置的项及其默认值包括: col_sep: ",", #=> 字段分隔符 row_sep:

  • Java实现多线程同步五种方法详解

    一.为什么要线程同步 因为当我们有多个线程要同时访问一个变量或对象时,如果这些线程中既有读又有写操作时,就会导致变量值或对象的状态出现混乱,从而导致程序异常.举个例子,如果一个银行账户同时被两个线程操作,一个取100块,一个存钱100块.假设账户原本有0块,如果取钱线程和存钱线程同时发生,会出现什么结果呢?取钱不成功,账户余额是100.取钱成功了,账户余额是0.那到底是哪个呢?很难说清楚.因此多线程同步就是要解决这个问题. 二.不同步时的代码 Bank.java package threadTe

  • C++多线程获取返回值方法详解

    在许多时候,我们会有这样的需求--即我们想要得到线程返回的值.但是在C++11 多线程中我们注意到,std::thread对象会忽略顶层函数的返回值. 那问题来了,我们要怎么获得线程的返回值呢? 我们通过一个例子来说明如何实现这个需求.用多个线程计算(a+b)/ (x+y) 的值 有两种方法,分别是 1. 传统的方法:在线程间共享指针 #include<iostream> #include<thread> #include<mutex> #include<atom

  • Java多线程连续打印abc实现方法详解

    一道编程题如下: 实例化三个线程,一个线程打印a,一个线程打印b,一个线程打印c,三个线程同时执行,要求打印出10个连着的abc. 题目分析: 通过题意我们可以得出,本题需要我们使用三个线程,三个线程分别会打印6次字符,关键是如何保证顺序一定是abc...呢.所以此题需要同步机制来解决问题! 令打印字符A的线程为ThreadA,打印B的ThreadB,打印C的为ThreadC.问题为三线程间的同步唤醒操作,主要的目的就是使程序按ThreadA->ThreadB->ThreadC->Thr

  • sqoop如何指定pg库的模式(方法详解)

    目录 说明 解决办法 sqoop是一款用于hadoop和关系型数据库之间数据导入导出的工具.你可以通过sqoop把数据从数据库(比如mysql,oracle)导入到hdfs中:也可以把数据从hdfs中导出到关系型数据库中.sqoop通过Hadoop的MapReduce导入导出,因此提供了很高的并行性能以及良好的容错性. sqoop适合以下的人群使用: 系统和应用开发者 系统管理员 数据库管理员 数据分析师 数据工程师 说明 使用sqoop导出导入数据非常的方便,但是对于postgresql(简称

随机推荐