[an error occurred while processing this directive]
[an error occurred while processing this directive]
import sys,select
def readline_timeout(fd, timeout = 1.0):
(r, w, e) = select.select([fd], [], [], timeout)
if len(r) == 0: return "TIMEOUT!\n"
elif len(r) == 1: return r[0].readline()
else : assert False
print readline_timeout(sys.stdin, 3.0),
標準入力から一行読みます。3秒以内に入力がないと、TIMEOUT!と表示して終了します。
import fcntl flag = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flag)(因みにfcntlはエフ・コントロールと読みます)
import os,sys,fcntl,socket,select
class SelectTest:
def __init__(self, fd):
self.buf = ""
self.fd = fd
flag = fcntl.fcntl(fd, fcntl.F_GETFL)
new_flag = flag | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, new_flag)
def handle_msg(self, msg):
ms = msg.split("\n")
if self.buf != "":
ms[0] = self.buf + ms[0]
self.buf = ""
m = ms.pop()
if m != "":
self.buf = m
for m in ms:
print >> sys.stderr, "Received line: <%s>"%m
def async_recv(self):
# Read from FDs with timeouts
r,w,e = select.select([self.fd], [], [], 3.0) # Timeout
print >> sys.stderr, "Selected: [%d, %d, %d]"%(len(r), len(w), len(e))
if len(r) == 0:
print >> sys.stderr, "Timeout!"
else:
for fd in r:
assert fd == self.fd
m = fd.read(1024)
self.handle_msg(m)
st = SelectTest(sys.stdin)
while True:
st.async_recv()
例のようにstdinなんかを入れても、(コンソールからの入力は改行単位でしか送信されないので)
あまり面白くないのですが、ネットワークの入力なんかを入れると、self.bufという変数に
改行が来る前の(不完全な)データがバッファされて、完全な1行単位でデータが出力されるのがポイントです。