Pythonでネットワーク
-
お手軽サーバ。
import socket
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ss.bind(("192.168.1.1",8011))
ss.listen(5)
sock = ss.accept()
sock.recv();
-
bindのときに、前にbindしたソケットはちゃんとclose()しているのに
socket.error: (98, 'Address already in use')
が出てしまうとき、
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
ss.bind((self.ip_addr, self.port))
とすると、(若干の混信の可能性はあるが)すぐにアドレスが使えるようになる。
(はじめはTCPのtimeout自体を設定しようとしたけど、下のコードではうまくいかなかった…)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, 10)
(10のとこはちゃんとバイトオーダーを考えてpack()とかすべき。)
-
PythonでSelect!
毎回fd_setを更新しなくてもいいので楽々です。
import sys
import select
while True:
(r, w, e) = select.select([sys.stdin], [], [])
for fd in r:
print fd
ret = sys.stdin.readline()
print "Read : " + ret
-
ホスト名からIPアドレスを引く → socket.gethostbyname()
import socket
print socket.gethostbyname("www.yahoo.com")
-
ポートは49152〜65535を使うのがお行儀が良いらしい。
1024未満はroot無いと使えませんね。
-
数値とかをネットワークで送ったり、ファイルに書いたりするとき。
structモジュールを使う。
from struct import *
i = 123
msg = pack('l', i)
print "packed : " + msg
# これでmsgを送ったり書いたりできる
back = unpack('l', msg)
print "unpacked :" + str(i)
import socket
class MySocket:
# self.sock
def __init__(self, sock=None):
if sock is None:
self.sock = socket.socket(\
socket.AF_INET, socket.SOCK_STREAM)
else:
self.sock = sock
def connect(self, host, port):
self.sock.connect((host, port))
def close(self):
self.sock.close()
def send(self, msg):
msglen = len(msg)
totalsent = 0
while totalsent < msglen:
sent = self.sock.send(msg[totalsent:])
if sent == 0:
raise RuntimeError, \
"socket connection broken"
totalsent = totalsent + sent
def recv(self, msglen):
msg = ''
while len(msg) < msglen:
chunk = self.sock.recv(msglen-len(msg))
if chunk == '':
raise RuntimeError, \
"socket connection broken"
msg = msg + chunk
return msg
def createServerSocket(self, port):
#create an INET, STREAMing socket
self.server_sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind((socket.gethostname(), port))
self.sock.listen(5)
#class MyServer (MySocket):
#
# def accept_test(self):
# (clientsocket, address) = self.sock.accept()
# ss = MySocket(clientsocket)
# while True:
# print ss.recv(100)
#
#
#server = MyServer()
#server.createServerSocket(23456)
#server.accept_test()
##server.
hostnameを取得。
socket.gethostbyaddr(socket.gethostname())