スレッド

Javaっぽくきれいに書けます。
まず簡単な書き方は、関数と引数を与えてThreadオブジェクトを作り、start()を呼ぶことです。
import threading, time
def f():
    for i in range(10): print "*",
        
th = threading.Thread(target=f, args=())
th.setDaemon(True)
th.start()
th.join()
th.setDaemon(True)を書くことで、メインスレッドが終了したら子スレッドも強制的に終了させられます。 でもjoin()を呼ぶとメインスレッドは子スレッドの終了を待つので、このプログラムでは動作は変わりません。

あるいは、独自のクラスを作ることもできます。 threading.Threadクラスを継承して、run()メソッドを実装します。そして、start()を呼ぶと開始されます。
import threading, time

class Worker(threading.Thread) :
    def run(self) :
        for i in range(0, 10) :
            print i
            time.sleep(0.1)

w1 = Worker()
w2 = Worker()

w1.start()
w2.start()
二つのworkerが独立して0から9までの数字をprintします。

Pythonでスレッドを使うとシグナルを受け取ってくれない!!

Pythonでスレッドを使ったプログラムを書いて、まず気づくのがCtrl-cが効かないという問題。たとえば
import threading

def baby(arg):
    """ 重いループ """
    for i in range(10):
        for j in range(1000000): # このループ回数はマシンによって調整して下さい
            k = i * j
        print arg # 時々出力する


def main():
    # barと言い続けるスレッドとbooと言い続けるスレッドを用意
    ths = [threading.Thread(target=baby, args=("bar",)), 
           threading.Thread(target=baby, args=("boo",))]
    for th in ths: # スレッド開始
        th.start()

    for th in ths: # スレッド終了
        th.join()

main()
みたいなプログラムを書いて実行して、Ctrl-cを押してもなぜか終了しなくて、毎回killall pythonってやるはめになる。
原因だけど、Pythonではシグナルハンドラを実行できるのがメインスレッドだけで、メインスレッドがjoin()でブロックしている時にはシグナルハンドラを実行できないらしい。不便だ…

これを回避するために書いてみたのが以下のコード。上からの変更点をハイライトします。

import threading,time,os,signal,sys

child_pid = -1

def baby(arg):
    """ 重いループ """
    for i in range(10):
        for j in range(1000000): # このループ回数はマシンによって調整して下さい
            k = i * j
        print arg # 時々出力する


def catch_sig(sig, frame):
    """ シグナルハンドラ。childをkillする。 """
    assert child_pid > 0
    print >> sys.stderr, "Interrupted by the signal (mom)"
    print >> sys.stderr, "Killing %d"%child_pid
    os.kill(child_pid, signal.SIGKILL) # SIGKILLで荒っぽくkillしてしまう
    sys.exit(1)


def set_sig_handlers():
    """ シグナルハンドラをセット """
    signal.signal(signal.SIGINT,  catch_sig)  # Ctrl-c
    signal.signal(signal.SIGTERM,  catch_sig) # killで送られる
    signal.signal(signal.SIGHUP,  catch_sig)  # ログアウト時
    signal.signal(signal.SIGQUIT, catch_sig)  # よく知らないけどついでに


def main():
    global child_pid       # グローバル変数
    child_pid = os.fork()  # ここでfork()して、シグナル用とジョブ用の二つに分かれる
    if child_pid == -1:
        raise "Failed to fork"
    if child_pid == 0:
        # こっちはジョブプロセス。本来したいことを書く。
        
        # barと言い続けるスレッドとbooと言い続けるスレッドを用意
        ths = [threading.Thread(target=baby, args=("bar",)), 
               threading.Thread(target=baby, args=("boo",))]
        for th in ths: # スレッド開始
            th.start()

        for th in ths: # スレッド終了
            th.join()
   
    else:
        # こっちはシグナル用プロセス。
        assert child_pid > 0
        set_sig_handlers()
        # 子プロセスの終了を待つ。
        os.wait()
        
main()
ポイントは、開始した瞬間os.fork()して、シグナル用のプロセスを作っていること。こっちのプロセスは子スレッドを作らないから、ちゃんとシグナルを扱える。シグナルを受け取ったら、本来の処理を行っているジョブ用のプロセスにSIGKILLを送る。SIGKILLは受け取られたらハンドラを呼ぶことなく即終了するので、ちゃんときれいに全てのプロセス/スレッドを終了させることができる。

スレッドとキューのちょっとしたサンプル

手元からタスクを打ち込むと、Frontendクラスのrun()によって処理されて、キューにたまります。TaskHandlerクラスは別のスレッドで動いていて、キューからタスクを一つずつ取り出して、マイペースに(1タスク1秒)処理します。このキューはJavaでいうところのsynchronizedになっていて、スレッドセーフです。
☆なお、Python 2.4からはcollections.dequeueを使うのがいいようです。
import sys, time
import threading

class Queue:
    """
        Synchronized queue
    """
    
    def __init__(self):
        self.list = []
        self.cond = threading.Condition()

    def dequeue(self):
        self.cond.acquire()
        while len(self.list) == 0:
            self.cond.wait()
        item = self.list.pop(0)
        self.cond.release()
        return item


    def enqueue(self, item):
        self.cond.acquire()
        self.list.append(item)
        self.cond.notify()
        self.cond.release()


class Task :
    def __init__(self, msg):
        self.msg = msg

    def perform(self):
        print "<Performing " + self.msg  + "..." ,
        sys.stdout.flush()
        time.sleep(1)
        print " done>"
        sys.stdout.flush()


class TaskHandler(threading.Thread) :
    def __init__(self, taskQueue):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.taskQueue = taskQueue

    
    def run(self):
        while True :
            task = self.taskQueue.dequeue()
            task.perform();



class Frontend :
    def __init__(self, taskQueue):
        self.taskQueue = taskQueue
        
    def run(self):
        while True :
            msg = sys.stdin.readline()
            task = Task(msg.strip())
            self.taskQueue.enqueue(task)



q = Queue()
th = TaskHandler(q)
th.start()

fe = Frontend(q)
fe.run()