Javaっぽくきれいに書けます。
まず簡単な書き方は、関数と引数を与えてThreadオブジェクトを作り、start()を呼ぶことです。
import threading, time
def f():
for i in range(10): print "*",
th = threading.Thread(target=f, args=())
th.setDaemon(True)
th.start()
th.join()
th.setDaemon(True)を書くことで、メインスレッドが終了したら子スレッドも強制的に終了させられます。
でもjoin()を呼ぶとメインスレッドは子スレッドの終了を待つので、このプログラムでは動作は変わりません。
あるいは、独自のクラスを作ることもできます。
threading.Threadクラスを継承して、run()メソッドを実装します。そして、start()を呼ぶと開始されます。
import threading, time
class Worker(threading.Thread) :
def run(self) :
for i in range(0, 10) :
print i
time.sleep(0.1)
w1 = Worker()
w2 = Worker()
w1.start()
w2.start()
二つのworkerが独立して0から9までの数字をprintします。
Pythonでスレッドを使ったプログラムを書いて、まず気づくのがCtrl-cが効かないという問題。たとえば
import threading
def baby(arg):
""" 重いループ """
for i in range(10):
for j in range(1000000): # このループ回数はマシンによって調整して下さい
k = i * j
print arg # 時々出力する
def main():
# barと言い続けるスレッドとbooと言い続けるスレッドを用意
ths = [threading.Thread(target=baby, args=("bar",)),
threading.Thread(target=baby, args=("boo",))]
for th in ths: # スレッド開始
th.start()
for th in ths: # スレッド終了
th.join()
main()
みたいなプログラムを書いて実行して、Ctrl-cを押してもなぜか終了しなくて、毎回killall pythonってやるはめになる。
原因だけど、Pythonではシグナルハンドラを実行できるのがメインスレッドだけで、メインスレッドがjoin()でブロックしている時にはシグナルハンドラを実行できないらしい。不便だ…
これを回避するために書いてみたのが以下のコード。上からの変更点をハイライトします。
import threading,time,os,signal,sys
child_pid = -1
def baby(arg):
""" 重いループ """
for i in range(10):
for j in range(1000000): # このループ回数はマシンによって調整して下さい
k = i * j
print arg # 時々出力する
def catch_sig(sig, frame):
assert child_pid > 0
print >> sys.stderr, "Interrupted by the signal (mom)"
print >> sys.stderr, "Killing %d"%child_pid
os.kill(child_pid, signal.SIGKILL)
sys.exit(1)
def set_sig_handlers():
signal.signal(signal.SIGINT, catch_sig)
signal.signal(signal.SIGTERM, catch_sig)
signal.signal(signal.SIGHUP, catch_sig)
signal.signal(signal.SIGQUIT, catch_sig)
def main():
global child_pid
child_pid = os.fork()
if child_pid == -1:
raise "Failed to fork"
if child_pid == 0:
# barと言い続けるスレッドとbooと言い続けるスレッドを用意
ths = [threading.Thread(target=baby, args=("bar",)),
threading.Thread(target=baby, args=("boo",))]
for th in ths: # スレッド開始
th.start()
for th in ths: # スレッド終了
th.join()
else:
assert child_pid > 0
set_sig_handlers()
os.wait()
main()
ポイントは、開始した瞬間os.fork()して、シグナル用のプロセスを作っていること。こっちのプロセスは子スレッドを作らないから、ちゃんとシグナルを扱える。シグナルを受け取ったら、本来の処理を行っているジョブ用のプロセスにSIGKILLを送る。SIGKILLは受け取られたらハンドラを呼ぶことなく即終了するので、ちゃんときれいに全てのプロセス/スレッドを終了させることができる。
手元からタスクを打ち込むと、Frontendクラスのrun()によって処理されて、キューにたまります。TaskHandlerクラスは別のスレッドで動いていて、キューからタスクを一つずつ取り出して、マイペースに(1タスク1秒)処理します。このキューはJavaでいうところのsynchronizedになっていて、スレッドセーフです。
☆なお、Python 2.4からはcollections.dequeueを使うのがいいようです。
import sys, time
import threading
class Queue:
"""
Synchronized queue
"""
def __init__(self):
self.list = []
self.cond = threading.Condition()
def dequeue(self):
self.cond.acquire()
while len(self.list) == 0:
self.cond.wait()
item = self.list.pop(0)
self.cond.release()
return item
def enqueue(self, item):
self.cond.acquire()
self.list.append(item)
self.cond.notify()
self.cond.release()
class Task :
def __init__(self, msg):
self.msg = msg
def perform(self):
print "<Performing " + self.msg + "..." ,
sys.stdout.flush()
time.sleep(1)
print " done>"
sys.stdout.flush()
class TaskHandler(threading.Thread) :
def __init__(self, taskQueue):
threading.Thread.__init__(self)
self.setDaemon(True)
self.taskQueue = taskQueue
def run(self):
while True :
task = self.taskQueue.dequeue()
task.perform();
class Frontend :
def __init__(self, taskQueue):
self.taskQueue = taskQueue
def run(self):
while True :
msg = sys.stdin.readline()
task = Task(msg.strip())
self.taskQueue.enqueue(task)
q = Queue()
th = TaskHandler(q)
th.start()
fe = Frontend(q)
fe.run()
[an error occurred while processing this directive]