[an error occurred while processing this directive]
[an error occurred while processing this directive]import sys,select def readline_timeout(fd, timeout = 1.0): (r, w, e) = select.select([fd], [], [], timeout) if len(r) == 0: return "TIMEOUT!\n" elif len(r) == 1: return r[0].readline() else : assert False print readline_timeout(sys.stdin, 3.0),標準入力から一行読みます。3秒以内に入力がないと、TIMEOUT!と表示して終了します。
import fcntl flag = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flag)(因みにfcntlはエフ・コントロールと読みます)
import os,sys,fcntl,socket,select class SelectTest: def __init__(self, fd): self.buf = "" self.fd = fd flag = fcntl.fcntl(fd, fcntl.F_GETFL) new_flag = flag | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, new_flag) def handle_msg(self, msg): ms = msg.split("\n") if self.buf != "": ms[0] = self.buf + ms[0] self.buf = "" m = ms.pop() if m != "": self.buf = m for m in ms: print >> sys.stderr, "Received line: <%s>"%m def async_recv(self): # Read from FDs with timeouts r,w,e = select.select([self.fd], [], [], 3.0) # Timeout print >> sys.stderr, "Selected: [%d, %d, %d]"%(len(r), len(w), len(e)) if len(r) == 0: print >> sys.stderr, "Timeout!" else: for fd in r: assert fd == self.fd m = fd.read(1024) self.handle_msg(m) st = SelectTest(sys.stdin) while True: st.async_recv()例のようにstdinなんかを入れても、(コンソールからの入力は改行単位でしか送信されないので) あまり面白くないのですが、ネットワークの入力なんかを入れると、self.bufという変数に 改行が来る前の(不完全な)データがバッファされて、完全な1行単位でデータが出力されるのがポイントです。