スレッド

たとえば1人で味噌汁を作るときのことを考えよう。 具材を包丁で切っているときにお湯が沸いた。人間なら包丁の手を止めて コンロを弱めるなり,出しを取る作業をするなりしてからまた包丁に 戻ることができる。ところがこれに相当することをプログラミングせよとなると 要領の悪い仕事になる。単純に書かれた以下のようなプログラム例で考えよう。

お湯を沸かし始める
while 野菜がある間 包丁でトントン end
沸いたお湯に入れる

人間なら機転を利かせられるものの,コンピュータは指示以外には動けないため 「包丁でトントン」の while 〜 end を行なっている間は, お湯が沸騰して鍋がゆれても構うことができない。では,while から end の中に「もし,お湯が沸いたら○○する」のようなif文を入れればよいのだろうか。 ところがお湯の量の割に火力が強くて,「包丁でトントン」 する前に沸騰するかもしれない。となれば「もし,お湯が沸いたら○○する」を while の前の行にも入れる必要が出てくる。さらに,お湯を沸かす以外に 「電子レンジで様子を見ながら冷凍色材を解凍」という処理が増えたら……? と考えを進めると,プログラムはどんどん複雑化する。

これまで作成したプログラムでは,実行するときにまさに「実行」している部分は つねに1箇所である。つまり処理の流れを辿ると1本の線で表せる。 Rubyでは,動作中の単一プログラムの実行の流れを2本以上にすることができる。 これを表現するのが Thread である。Thread を利用すると,下図のB:とC:の流れを並行して進めることができる。

A:お湯を沸かし始める
  
  
【B:野菜を切る流れ】 while 野菜がある間 包丁でトントン end 出しを取る流れと合わせる 【C:出しを取る流れ】 温ったら出しの材料投入 while 煮立っていない 待つ end 弱火にする 野菜を切る流れと合わせる
  
  
D:鍋に野菜を入れて……

このような処理の流れのことをスレッド という。

Threadの基本的使い方

Thread分岐した実行単位は Thread.new で作成する。

A
t = Thread.new do
  B
end
C
t.join
D

このような形式で新しいスレッドが生成され,B の部分と C の部分の実行が同時に進む。join メソッドにより,2つの処理の流れが歩調を合わせて1本に戻り, BC 両方が完了してから D の実行に移る。先述の fork は全く違うプロセスが2つ存在しているため,それぞれ別々の変数管理で その後のプログラム実行が進むのに対し,Thread の場合は2つの実行の流れが 全く同じプロセスで動くため変数は共有される。以下のプログラムは, 変数 x の値を,同時進行する2つのスレッドで同時に操作している。 実際に動かして試してみよ。

th.rb

#!/usr/bin/env ruby
# -*- coding: utf-8 -*-

x = 1
t = Thread.new do
  # ここは子のみが実行するところ
  5.times do
    STDERR.print("\e[32m")      # 子スレッドが走るときに端末文字色を緑に
    x=5; sleep 0.2
  end
end
while t.alive?
  # ここが親スレッド
  STDERR.printf("\e[mx=%dに1足すと:", x) # ESC [ m で標準色に戻す
  sleep 0.05
  x += 1
  STDERR.printf("%d\n", x)
end
t.join
puts

このプログラムで x の値を出力しているのは色を変えた 親スレッドの部分のみで、そこでは それまでの x の値に 1を足した結果を出しているのにもかかわらず、出力結果にはところどころ 1を足した結果になっていないものが見られる。

./th.rb
x=5に1足すと:6
x=6に1足すと:7
x=7に1足すと:8
x=8に1足すと:6
x=6に1足すと:7
x=7に1足すと:8
x=8に1足すと:6
x=6に1足すと:7
x=7に1足すと:8
  :
  :

なお,スレッドはいくつに分かれても,プログラムの実行主体となる プロセスは1つなので,どれか1つのスレッドがexitすると親スレッドを 含めたすべてのスレッドが終了する。

Mutexによる競合回避

複数のスレッドで共有アクセスするデータがあるときに 一連の処理を、他のスレッドから保護しつつ完遂させる必要がある場合は Mutex を利用する。th.rb では変数 x に対する処理が競合していたが, これを Mutex の synchrozize で競合から保護するように修正した例を示す。

mutex.rb

#!/usr/bin/env ruby
# coding: euc-jp
require 'thread'

m = Mutex.new
x = 1
t = Thread.new do
  # ここは子のみが実行するところ
  5.times do
    m.synchronize {
      STDERR.print("\e[32m")
      x=5; sleep 0.2
    }
    Thread.pass
  end
end
while t.alive?
  m.synchronize {
    STDERR.printf("\e[mx=%dに1足すと:", x)
    sleep 0.05
    x += 1
    STDERR.printf("%d\n", x)
  }
  Thread.pass
end
t.join

実行した場合、以下のように「1足す」という結果は正しくなる。

./mutex.rb
x=5に1足すと:6
x=5に1足すと:6
x=5に1足すと:6
x=5に1足すと:6
x=5に1足すと:6

synchronizeブロックの後ろにある Thread.pass は,他のスレッドに制御を譲るためのものである(後述)。

スレッドどうしの制御

複数のスレッドを生成し、 あるスレッドが別のスレッドの実行を制御するようにすると、 制限時間付きのプログラムなどが比較的容易に作れる。

以下のプログラムは、次々と出る問題に対する解答の入力を 5秒だけ待つような、制限時間付きクイズを行なうものである。

timeshock.rb

#!/usr/koeki/bin/ruby
# -*- coding: utf-8 -*-

# 問と解を列挙。本来ファイルから読むべきだが主題はスレッドなので簡略化
questions = [
             ["この時間習っている言語は", "ruby"],
             ["ファイル一覧を出すコマンドは", "ls"],
             ["ファイルを表示・結合するコマンドは", "cat"],
             ["ファイルを削除するコマンドは", "rm"],
             ["ファイルを移動(リネーム)するコマンドは", "mv"]
  ]

n = 0                           # 問題番号
hit = 0                         # 正解数
STDOUT.sync = true
for q, a in questions
  reply = nil
  t = Thread.new do
    printf("%c[2J%c[1;1H", 27, 27) # 画面消去のエスケープシーケンス
    printf("第%d問 %s: ", n+=1, q)
    reply = gets.chomp
    if reply == a then
      print("正解!")
      hit += 1
    else
      print("ハズレ!\n")
    end
    t.exit
  end
  sleep 5
  if t.alive? then
    t.kill
    print(" 残念!\n")
    sleep 1
  end
end
printf("\n%d問正解\n", hit)
if hit < q.length/2 then
  print("トルネードです。ぐるぐるー\n")
end

走行中のスレッドは、別のスレッドから操作することができる。

たとえば t = Thread.new {...} として t に得たスレッドオブジェクトを介して以下の操作ができる(抜粋)。

t.alive? スレッドtが活きているかの真偽を返す
t.stop スレッドtを停止させる
t.run 停止中のスレッドtの実行を再開する
t.kill スレッドtを終了させる
t.status スレッドtの状態を返す("run", "sleep", "aborting"のいずれか)
Thread.pass 現在実行中のスレッドから他のスレッドに実行権を譲る

これらを用いて,親スレッドから子スレッドの停止・再開制御を 行なう例を示す。

th-op.rb

#!/usr/bin/env ruby
# -*- coding: utf-8 -*-

stop = false
bakudan = Thread.new do
  # 子スレッドのブロック
  puts ''                       # 1行空けておく
  begin
    timer = 11
    while true
      Thread.stop if stop       # stop変数が真ならスレッド実行停止
      # ESC 7 はカーソル位置保存,ESC [ 1 A は1行上,ESC 8 は位置復帰
      STDERR.printf("\e7\e[1A\r爆発 %2d秒前  \e8", timer-=1)
      break if timer < 1
      sleep 1
    end
  ensure
    puts "\nどかあああーーん!!" # 何をやっても必ず実行(ensureブロック)
    exit
  end
end

# メインスレッドのループ
sleep 0.01                       # こちらが僅かにあとで実行されるよう
while bakudan.alive? do
  STDERR.print "コマンド入力(a〜zのどれか):  \b"
  cmd = gets.chomp
  case cmd
  when "s"
    if stop then
      bakudan.run
      stop = false
    else
      stop = true
    end
  when "k"
    STDERR.puts "壊してみよう(Thread.kill)"
    bakudan.kill
    break
  when "q"
    STDERR.puts "逃げてみよう(exit)"
    exit
  end
  STDERR.print "\e[1A"                 # 1行上に戻しておく
end
puts ''

このプログラムを起動し,s を入力すると, bakudanスレッドが走行中なら停止(Thread.stop)し, 停止中なら開始(bakudan.run)する。k を入力するとスレッドをkill(bakudan.kill)し, q を入力するとプログラムそのものの実行を終了する。

プロセスとスレッドの組み合わせ

forkexec は外部プログラムを 起動するときにしばしば利用する。既に述べたようにexec で外部プログラムを起動することが失敗することもある。それに 対応するプログラムは以下のようになる。

f+e+t.rb

#!/usr/bin/env ruby
# -*- coding: utf-8 -*-
cmd = ARGV[0] || "kterm"

if (pid = fork()) then
  # 親プロセスのみ
  th = Process.detach(pid)
  t2 = Thread.new do
    th.join
    STDERR.puts "終わったようだ"
    exit 3
  end
else
  # 子プロセスのみの処理はここ
  begin
    exec(cmd)
  rescue
    STDERR.puts "\e[31m起動失敗\e[m"
    exit 1
  end
end

STDERR.printf("%s について: 終了を待つ=w killする=k 放置=その他: ", cmd)
k = STDIN.gets

if !th.alive? then
  STDERR.puts "と,思ったらこけちゃったみたい"
  exit 2
end
case k
when /^k/i
  STDERR.puts "ボシュっ"
  Process.kill(:QUIT, pid)
when /^w/i
  STDERR.puts "じゃ,待ちます。"
  Process.wait
else
  STDERR.puts "じゃ,わしゃ勝手に終わります。さいなら。"
end

このプログラムは ARGV[0] を指定すると それを起動コマンドとしてexecを試みる。 わざと間違ったコマンドを指定してみる。

./f+e+t.rb ktermmm
ktermmm について: 終了を待つ=w killする=k 放置=その他: 起動失敗
終わったようだ

Process.detach() は, 引数に指定したPIDを持つプロセスが終了するのを待つだけのスレッドを 終了する。したがって,そのスレッドがすぐに終わったことを 検出するもう1つのスレッドを生成しておけば,親プログラム自体を 制御できるようになる。それが t2 に代入している Thread.new の部分である。


本日の目次

yuuji@e.koeki-u.ac.jp