スレッド

動作中の単一Rubyプログラムの実行の流れを複数持たせられる。 これを表現するのが Threadクラスである。

Threadの基本的使い方

Thread分岐した実行単位は Thread.new で作成する。

A
Thread.new do
  B
end
C

このような形式で新しいスレッドが生成され,B の部分と C の部分の実行が同時に進む。しかし,forkと違い全く同じ プロセスで動くため変数は共有される。以下のプログラムを試してみよ。

th.rb

#!/usr/bin/env ruby
# coding: euc-jp

x = 1
t = Thread.new do
  # ここは子のみが実行するところ
  5.times do
    STDERR.print("\e[32m")      # 子スレッドが走るときに端末文字色を緑に
    x=5; sleep 0.2
  end
end
while t.alive?
  # ここが親スレッド
  STDERR.printf("\e[mx=%dに1足すと:", x) # ESC [ m で標準色に戻す
  sleep 0.05
  x += 1
  STDERR.printf("%d\n", x)
end
t.join
puts

このプログラムで x の値を出力しているのは色を変えた 親スレッドの部分のみで、そこでは それまでの x の値に 1を足した結果を出しているのにもかかわらず、出力結果にはところどころ 1を足した結果になっていないものが見られる。

./th.rb
x=5に1足すと:6
x=6に1足すと:7
x=7に1足すと:8
x=8に1足すと:6
x=6に1足すと:7
x=7に1足すと:8
x=8に1足すと:6
x=6に1足すと:7
x=7に1足すと:8
  :
  :

スレッドはいくつに分かれても,プログラムの実行主体となる プロセスは1つなので,どれか1つのスレッドがexitすると親スレッドを 含めたすべてのスレッドが終了することになる。

Mutexによる競合回避

複数のスレッドで共有アクセスするデータがあるときに 一連の処理を、他のスレッドからロックしつつ完遂させる必要がある場合は Mutex を利用する。th.rb を Mutex の synchrozize で競合から保護するように修正した例を示す。

mutex.rb

#!/usr/bin/env ruby
# coding: euc-jp
require 'thread'

m = Mutex.new
x = 1
t = Thread.new do
  # ここは子のみが実行するところ
  5.times do
    m.synchronize {
      STDERR.print("\e[32m")
      x=5; sleep 0.2
    }
    Thread.pass
  end
end
while t.alive?
  m.synchronize {
    STDERR.printf("\e[mx=%dに1足すと:", x)
    sleep 0.05
    x += 1
    STDERR.printf("%d\n", x)
  }
  Thread.pass
end
t.join

実行した場合、以下のように「1足す」という結果は正しくなる。

./mutex.rb
x=5に1足すと:6
x=5に1足すと:6
x=5に1足すと:6
x=5に1足すと:6
x=5に1足すと:6

synchronizeブロックの後ろにある Thread.pass は,他のスレッドに制御を譲るためのものである(後述)。

スレッドどうしの制御

複数のスレッドを生成し、あるスレッドが別のスレッドの実行を制御する ようにすると、制限時間つきのプログラムなどが比較的容易に作れる。

以下のプログラムは、次々と出る問題に対する解答の入力を5秒だけ 待つような、制限時間つきクイズを行なうものである。

timeshock.rb

#!/usr/koeki/bin/ruby
# coding: euc-jp

# 問と解を列挙。本来ファイルから読むべきだが主題はスレッドなので簡略化
questions = [
             ["この時間習っている言語は", "ruby"],
             ["ファイル一覧を出すコマンドは", "ls"],
             ["ファイルを表示・結合するコマンドは", "cat"],
             ["ファイルを削除するコマンドは", "rm"],
             ["ファイルを移動(リネーム)するコマンドは", "mv"]
  ]

n = 0                           # 問題番号
hit = 0                         # 正解数
STDOUT.sync = true
for q, a in questions
  reply = nil
  t = Thread.new do
    printf("%c[2J%c[1;1H", 27, 27) # 画面消去のエスケープシーケンス
    printf("第%d問 %s: ", n+=1, q)
    reply = gets.chomp
    if reply == a then
      print("正解!")
      hit += 1
    else
      print("ハズレ!\n")
    end
    t.exit
  end
  sleep 5
  if t.alive? then
    t.kill
    print(" 残念!\n")
    sleep 1
  end
end
printf("\n%d問正解\n", hit)
if hit < q.length/2 then
  print("トルネードです。ぐるぐるー\n")
end

走行中のスレッドは、別のスレッドから操作することができる。

たとえば t = Thread.new {...} として t に得たスレッドオブジェクトを介して以下の操作ができる(抜粋)。

t.alive? スレッドtが活きているかの真偽を返す
t.stop スレッドtを停止させる
t.run 停止中のスレッドtの実行を再開する
t.kill スレッドtを終了させる
t.status スレッドtの状態を返す("run", "sleep", "aborting"のいずれか)
Thread.pass 現在実行中のスレッドから他のスレッドに実行権を譲る

これらを用いて,親スレッドから子スレッドの停止・再開制御を 行なう例を示す。

th-op.rb

#!/usr/bin/env ruby
# coding: euc-jp

stop = false
bakudan = Thread.new do
  # 子スレッドのブロック
  puts ''                       # 1行空けておく
  begin
    timer = 11
    while true
      Thread.stop if stop       # stop変数が真ならスレッド実行停止
      # ESC 7 はカーソル位置保存,ESC [ 1 A は1行上,ESC 8 は位置復帰
      STDERR.printf("\e7\e[1A\r爆発 %2d秒前  \e8", timer-=1)
      break if timer < 1
      sleep 1
    end
  ensure
    puts "\nどかあああーーん!!" # 何をやっても必ず実行(ensureブロック)
    exit
  end
end

# メインスレッドのループ
sleep 0.01                       # こちらが僅かにあとで実行されるよう
while bakudan.alive? do
  STDERR.print "コマンド入力:  \b"
  cmd = gets.chomp
  case cmd
  when "s"
    if stop then
      bakudan.run
      stop = false
    else
      stop = true
    end
  when "k"
    STDERR.puts "壊してみよう(Thread.kill)"
    bakudan.kill
    break
  when "q"
    STDERR.puts "逃げてみよう(exit)"
    exit
  end
  STDERR.print "\e[1A"                 # 1行上に戻しておく
end
puts ''

プロセスとスレッドの組み合わせ

forkexec は外部プログラムを 起動するときにしばしば利用する。既に述べたようにexec で外部プログラムを起動することが失敗することもある。それに 対応するプログラムは以下のようになる。

f+e+t.rb

#!/usr/bin/env ruby
# coding: euc-jp
cmd = ARGV[0] || "kterm"

if (pid = fork()) then
  # 親プロセスのみ
  th = Process.detach(pid)
  t2 = Thread.new do
    th.join
    STDERR.puts "終わったようだ"
    exit 3
  end
else
  # 子プロセスのみの処理はここ
  begin
    exec(cmd)
  rescue
    STDERR.puts "\e[31m起動失敗\e[m"
    exit 1
  end
end

STDERR.printf("%s について: 終了を待つ=w killする=k 放置=その他: ", cmd)
k = STDIN.gets

if !th.alive? then
  STDERR.puts "と,思ったらこけちゃったみたい"
  exit 2
end
case k
when /^k/i
  STDERR.puts "ボシュっ"
  Process.kill(:KILL, pid)
when /^w/i
  STDERR.puts "じゃ,待ちます。"
  Process.wait
else
  STDERR.puts "じゃ,わしゃ勝手に終わります。さいなら。"
end

このプログラムは ARGV[0] を指定すると それを起動コマンドとしてexecを試みる。 わざと間違ったコマンドを指定してみる。

./f+e+t.rb ktermmm
ktermmm について: 終了を待つ=w killする=k 放置=その他: 起動失敗
終わったようだ

Process.detach() は, 引数に指定したPIDを持つプロセスが終了するのを待つだけのスレッドを 終了する。したがって,そのスレッドがすぐに終わったことを 検出するもう1つのスレッドを生成しておけば,親プログラム自体を 制御できるようになる。それが t2 に代入している Thread.new の部分である。


本日の目次

yuuji@e.koeki-u.ac.jp