本文共 4514 字,大约阅读时间需要 15 分钟。
前面学习的例子都是单线程的socket收发;如果有多个用户同时接入,那么除了第一个连入的,后面的都会处于挂起等待的状态,直到当前连接的客户端断开为止。
通过使用socketserver,我们可以实现并发的连接。
socketserver的使用很简单:
首先看个简单的例子
服务端:
自己定义一个类,继承socketserver.baserequesthandler;
然后定义一个方法 handle()
然后通过socketserver.threadingTCPServer指定套接字和自己定义的类,每次当客户端连入的时候,会自动实例化一个对象,然后通过server_forever()不断循环读写数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | #!/usr/bin/env python # -*- coding:utf-8 -*- # Author Yuan Li import socketserver class mysocketserver(socketserver.BaseRequestHandler): def handle( self ): conn = self .request conn.sendall(bytes( "Welcome to the Test system." , encoding = 'utf-8' )) while True : try : data = conn.recv( 1024 ) if len (data) = = 0 : break print ( "[%s] sends %s" % ( self .client_address, data.decode())) conn.sendall(data.upper()) except Exception: break if __name__ = = '__main__' : server = socketserver.ThreadingTCPServer(( '127.0.0.1' , 8009 ), mysocketserver) server.serve_forever() |
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | #!/usr/bin/env python # -*- coding:utf-8 -*- # Author Yuan Li import socket ip_port = ( '127.0.0.1' , 8009 ) s = socket.socket() s.connect(ip_port) data = s.recv( 1024 ) print (data.decode()) while True : send_data = input ( "Data>>>" ) s.send(bytes(send_data, encoding = 'utf-8' )) recv_data = s.recv( 1024 ) print (recv_data.decode()) |
上面的效果是多个客户端可以同时连入服务器,输入字母,返回大写字母。
客户端没啥好说的,这个和单线程的操作一样;但是服务器咋一看很混乱。我们可以通过剖析源码来弄清他的执行过程。
这个类的基本结构是如下所示的,我们按照顺序来跑一次看看他怎么调用的
1. 首先执行的这句话,很明显ThredingTCPServer是一个类,点进去看看他的实例化过程
1 | server = socketserver.ThreadingTCPServer(( '127.0.0.1' , 8009 ), mysocketserver) |
2. 点着Ctrl键,点击这个类,PyCharm会自动打开对应的源码,可以看见这个类又继承了两个父类ThredingMixIn和TCPServer
1 | class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass |
因为他的内容是pass,啥也没做,根据继承的顺序,我们继续往上(从左到右)找init构造函数;
3. ThreadingMixIn里面没有构造函数,那就继续往右找,TCPServer里面倒是有构造函数,但是他又调用了他父类BaseServer的构造函数,顺着看上去,发现他就是封装了几个值在里面,注意 self.RequestHandlerClass = RequestHandlerClass把我们自己定义的类传进去了
1 2 3 4 5 6 | def __init__( self , server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self .server_address = server_address self .RequestHandlerClass = RequestHandlerClass self .__is_shut_down = threading.Event() self .__shutdown_request = False |
4.接下来,在TCPServer的构造函数里面,他执行了bind,listen的操作,这个和单线程的操作是一样的。到此为止,一个初始化的过程基本就完成了。
5.接下来,执行了server.serve_forever()的操作,我们看看内部是怎么调用的。在这个函数里面,使用了selector的IO多路复用的技术,循环的读取一个文件的操作。接着调用了_handle_request_noblock()函数
1 2 3 4 5 6 7 8 9 10 11 12 | try : # XXX: Consider using another file descriptor or connecting to the # socket to wake this up instead of polling. Polling reduces our # responsiveness to a shutdown request and wastes cpu at all other # times. with _ServerSelector() as selector: selector.register( self , selectors.EVENT_READ) while not self .__shutdown_request: ready = selector.select(poll_interval) if ready: self ._handle_request_noblock() self .service_actions() |
6.每次调用函数的时候都记住查找的顺序,从下往上,从左往右,最后在最上面的BaseServer再次找到这个函数,这个函数里面又调用了 process_request函数
1 2 3 4 5 6 7 8 | """ try : request, client_address = self .get_request() except OSError: return if self .verify_request(request, client_address): try : self .process_request(request, client_address) |
一定要记住继承的顺序!!顺序!!顺序!!
因为baseserver自己有process_request的方法,ThreadingTCPServer也有同名的方法,当他调用的时候,按照顺序,是执行的ThreadingTCPServer里面的方法!!
可以看见他开了一个多线程
1 2 3 4 5 6 | def process_request( self , request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self .process_request_thread, args = (request, client_address)) t.daemon = self .daemon_threads t.start() |
在他调用的process_request_thread里面,他又调用了finsih_request
1 2 3 4 5 6 7 8 9 10 | def process_request_thread( self , request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try : self .finish_request(request, client_address) self .shutdown_request(request) except : self .handle_error(request, client_address) self .shutdown_request(request) |
finish_request里面有对我们自定义的类做了一个实例化的操作
1 2 3 | def finish_request( self , request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self .RequestHandlerClass(request, client_address, self ) |
因为我们自定义的类没有构造函数,他会去父类寻找,父类里面会尝试执行handle()方法,这就是为什么我们需要在自定义的类里面定义一个同名的方法,然后把所有需要执行的内容都放在这里。
1 2 3 4 5 6 7 | def __init__( self , request, client_address, server): self .request = request self .client_address = client_address self .server = server self .setup() try : self .handle() |
到此,socketserver一个完整的过程就结束了