Package wsgitools :: Package scgi :: Module asynchronous
[hide private]
[frames] | no frames]

Source Code for Module wsgitools.scgi.asynchronous

  1  __all__ = [] 
  2   
  3  import asyncore 
  4  import io 
  5  import socket 
  6  import sys 
  7  import errno 
  8   
  9  from wsgitools.internal import bytes2str, str2bytes 
 10  from wsgitools.scgi import _convert_environ, FileWrapper 
 11   
 12  if sys.version_info[0] >= 3: 
13 - def exc_info_for_raise(exc_info):
14 return exc_info[0](exc_info[1]).with_traceback(exc_info[2])
15 else:
16 - def exc_info_for_raise(exc_info):
17 return exc_info[0], exc_info[1], exc_info[2]
18
19 -class SCGIConnection(asyncore.dispatcher):
20 """SCGI connection class used by L{SCGIServer}.""" 21 # connection states 22 NEW = 0*4 | 1 # connection established, waiting for request 23 HEADER = 1*4 | 1 # the request length was received, waiting for the rest 24 BODY = 2*4 | 1 # the request header was received, waiting for the body, 25 # to RESP or RESPH 26 RESP = 3*4 | 2 # sending response, end state 27 RESPH = 4*4 | 2 # buffered response headers, sending headers only, to TRANS 28 TRANS = 5*4 | 2 # transferring using FileWrapper, end state
29 - def __init__(self, server, connection, addr, maxrequestsize=65536, 30 maxpostsize=8<<20, blocksize=4096, config={}):
31 asyncore.dispatcher.__init__(self, connection) 32 33 self.server = server # WSGISCGIServer instance 34 self.addr = addr # scgi client address 35 self.maxrequestsize = maxrequestsize 36 self.maxpostsize = maxpostsize 37 self.blocksize = blocksize 38 self.state = SCGIConnection.NEW # internal state 39 self.environ = config.copy() # environment passed to wsgi app 40 self.reqlen = -1 # request length used in two different meanings 41 self.inbuff = b"" # input buffer 42 self.outbuff = b"" # output buffer 43 self.wsgihandler = None # wsgi application 44 self.wsgiiterator = None # wsgi application iterator 45 self.outheaders = () # headers to be sent 46 # () -> unset, (..,..) -> set, True -> sent 47 self.body = io.BytesIO() # request body
48
49 - def _try_send_headers(self):
50 if self.outheaders != True: 51 assert not self.outbuff 52 status, headers = self.outheaders 53 headdata = "".join(map("%s: %s\r\n".__mod__, headers)) 54 headdata = "Status: %s\r\n%s\r\n" % (status, headdata) 55 self.outbuff = str2bytes(headdata) 56 self.outheaders = True
57
58 - def _wsgi_write(self, data):
59 assert self.state >= SCGIConnection.RESP 60 assert self.state < SCGIConnection.TRANS 61 assert isinstance(data, bytes) 62 if data: 63 self._try_send_headers() 64 self.outbuff += data
65
66 - def readable(self):
67 """C{asyncore} interface""" 68 return self.state & 1 == 1
69
70 - def writable(self):
71 """C{asyncore} interface""" 72 return self.state & 2 == 2
73
74 - def handle_read(self):
75 """C{asyncore} interface""" 76 data = self.recv(self.blocksize) 77 self.inbuff += data 78 if self.state == SCGIConnection.NEW: 79 if b':' in self.inbuff: 80 reqlen, self.inbuff = self.inbuff.split(b':', 1) 81 try: 82 reqlen = int(reqlen) 83 except ValueError: # invalid request format 84 self.close() 85 return 86 if reqlen > self.maxrequestsize: 87 self.close() 88 return # request too long 89 self.reqlen = reqlen 90 self.state = SCGIConnection.HEADER 91 elif len(self.inbuff) > self.maxrequestsize: 92 self.close() 93 return # request too long 94 95 if self.state == SCGIConnection.HEADER: 96 buff = self.inbuff[:self.reqlen] 97 remainder = self.inbuff[self.reqlen:] 98 99 while buff.count(b'\0') >= 2: 100 key, value, buff = buff.split(b'\0', 2) 101 self.environ[bytes2str(key)] = bytes2str(value) 102 self.reqlen -= len(key) + len(value) + 2 103 104 self.inbuff = buff + remainder 105 106 if self.reqlen == 0: 107 if self.inbuff.startswith(b','): 108 self.inbuff = self.inbuff[1:] 109 try: 110 self.reqlen = int(self.environ["CONTENT_LENGTH"]) 111 except ValueError: 112 self.close() 113 return 114 if self.reqlen > self.maxpostsize: 115 self.close() 116 return 117 self.state = SCGIConnection.BODY 118 else: 119 self.close() 120 return # protocol violation 121 122 if self.state == SCGIConnection.BODY: 123 if len(self.inbuff) >= self.reqlen: 124 self.body.write(self.inbuff[:self.reqlen]) 125 self.body.seek(0) 126 self.inbuff = b"" 127 self.reqlen = 0 128 _convert_environ(self.environ) 129 self.environ["wsgi.input"] = self.body 130 self.environ["wsgi.errors"] = self.server.error 131 self.wsgihandler = self.server.wsgiapp(self.environ, 132 self.start_response) 133 if isinstance(self.wsgihandler, FileWrapper) and \ 134 self.wsgihandler.can_transfer(): 135 self._try_send_headers() 136 self.state = SCGIConnection.RESPH 137 else: 138 self.wsgiiterator = iter(self.wsgihandler) 139 self.state = SCGIConnection.RESP 140 else: 141 self.body.write(self.inbuff) 142 self.reqlen -= len(self.inbuff) 143 self.inbuff = b""
144
145 - def start_response(self, status, headers, exc_info=None):
146 assert isinstance(status, str) 147 assert isinstance(headers, list) 148 if exc_info: 149 if self.outheaders == True: 150 try: 151 raise exc_info_for_raise(exc_info) 152 finally: 153 exc_info = None 154 assert self.outheaders != True # unsent 155 self.outheaders = (status, headers) 156 return self._wsgi_write
157
158 - def send_buff(self):
159 try: 160 sentbytes = self.send(self.outbuff[:self.blocksize]) 161 except socket.error: 162 self.close() 163 else: 164 self.outbuff = self.outbuff[sentbytes:]
165
166 - def handle_write(self):
167 """C{asyncore} interface""" 168 if self.state == SCGIConnection.RESP: 169 if len(self.outbuff) < self.blocksize: 170 self._try_send_headers() 171 for data in self.wsgiiterator: 172 assert isinstance(data, bytes) 173 if data: 174 self.outbuff += data 175 break 176 if len(self.outbuff) == 0: 177 self.close() 178 return 179 self.send_buff() 180 elif self.state == SCGIConnection.RESPH: 181 assert len(self.outbuff) > 0 182 self.send_buff() 183 if not self.outbuff: 184 self.state = SCGIConnection.TRANS 185 else: 186 assert self.state == SCGIConnection.TRANS 187 assert self.wsgihandler.can_transfer() 188 sent = self.wsgihandler.transfer(self.socket, self.blocksize) 189 if sent <= 0: 190 self.close()
191
192 - def close(self):
193 # None doesn't have a close attribute 194 if hasattr(self.wsgihandler, "close"): 195 self.wsgihandler.close() 196 asyncore.dispatcher.close(self)
197
198 - def handle_close(self):
199 """C{asyncore} interface""" 200 self.close()
201 202 __all__.append("SCGIServer")
203 -class SCGIServer(asyncore.dispatcher):
204 """SCGI Server for WSGI applications. It does not use multiple processes or 205 multiple threads."""
206 - def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr, 207 maxrequestsize=None, maxpostsize=None, blocksize=None, 208 config={}, reusesocket=None):
209 """ 210 @param wsgiapp: is the wsgi application to be run. 211 @type port: int 212 @param port: is an int representing the TCP port number to be used. 213 @type interface: str 214 @param interface: is a string specifying the network interface to bind 215 which defaults to C{"localhost"} making the server inaccessible 216 over network. 217 @param error: is a file-like object being passed as C{wsgi.error} in the 218 environ parameter defaulting to stderr. 219 @type maxrequestsize: int 220 @param maxrequestsize: limit the size of request blocks in scgi 221 connections. Connections are dropped when this limit is hit. 222 @type maxpostsize: int 223 @param maxpostsize: limit the size of post bodies that may be processed 224 by this instance. Connections are dropped when this limit is 225 hit. 226 @type blocksize: int 227 @param blocksize: is amount of data to read or write from or to the 228 network at once 229 @type config: {} 230 @param config: the environ dictionary is updated using these values for 231 each request. 232 @type reusesocket: None or socket.socket 233 @param reusesocket: If a socket is passed, do not create a socket. 234 Instead use given socket as listen socket. The passed socket 235 must be set up for accepting tcp connections (i.e. C{AF_INET}, 236 C{SOCK_STREAM} with bind and listen called). 237 """ 238 if reusesocket is None: 239 asyncore.dispatcher.__init__(self) 240 else: 241 asyncore.dispatcher.__init__(self, reusesocket) 242 243 self.wsgiapp = wsgiapp 244 self.error = error 245 self.conf = {} 246 if maxrequestsize is not None: 247 self.conf["maxrequestsize"] = maxrequestsize 248 if maxpostsize is not None: 249 self.conf["maxpostsize"] = maxpostsize 250 if blocksize is not None: 251 self.conf["blocksize"] = blocksize 252 self.conf["config"] = config 253 254 if reusesocket is None: 255 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 256 self.set_reuse_addr() 257 self.bind((interface, port)) 258 self.listen(5)
259
260 - def handle_accept(self):
261 """asyncore interface""" 262 try: 263 ret = self.accept() 264 except socket.error as err: 265 # See http://bugs.python.org/issue6706 266 if err.args[0] not in (errno.ECONNABORTED, errno.EAGAIN): 267 raise 268 else: 269 if ret is not None: 270 conn, addr = ret 271 SCGIConnection(self, conn, addr, **self.conf)
272
273 - def run(self):
274 """Runs the server. It will not return and you can invoke 275 C{asyncore.loop()} instead achieving the same effect.""" 276 asyncore.loop()
277