Package wsgitools :: Package scgi :: Module forkpool
[hide private]
[frames | no frames]

Source Code for Module wsgitools.scgi.forkpool

  1  """ 
  2  The L{forkpool.SCGIServer} adapts a wsgi application to a scgi service. 
  3   
  4  It works with multiple worker processes that are periodically replaced so 
  5  that memory leaks cannot build up and affect the system. 
  6  """ 
  7   
  8  try: 
  9      import resource 
 10  except ImportError: 
 11      resource = None 
 12  import socket 
 13  import os 
 14  import select 
 15  import sys 
 16  import errno 
 17  import signal 
 18   
 19  from wsgitools.internal import bytes2str, str2bytes 
 20  from wsgitools.scgi import _convert_environ, FileWrapper 
 21   
 22  if sys.version_info[0] >= 3: 
23 - def exc_info_for_raise(exc_info):
24 return exc_info[0](exc_info[1]).with_traceback(exc_info[2])
25 else:
26 - def exc_info_for_raise(exc_info):
27 return exc_info[0], exc_info[1], exc_info[2]
# Names exported by ``from wsgitools.scgi.forkpool import *``. The list starts
# empty; public names are appended to it further down in this module.
__all__ = []

class SocketFileWrapper(object):
    """Wraps a socket to a wsgi-compliant file-like object.

    Reading is limited to the number of bytes given at construction time;
    once that budget is used up all read methods report end of file.
    """

    def __init__(self, sock, toread):
        """
        @param sock: is a C{socket.socket()}
        @type toread: int
        @param toread: is the number of bytes that may be read from C{sock}
        """
        self.sock = sock
        # Data already received from the socket but not yet handed out.
        self.buff = b""
        # Remaining number of bytes that may be received from the socket.
        self.toread = toread

    def _recv(self, size=4096):
        """
        internal method for receiving and counting incoming data
        @type size: int
        @param size: is the maximum number of bytes to receive
        @returns: the received bytes; C{b""} when the read budget is used up
            or the peer is gone
        @raises socket.error:
        """
        toread = min(size, self.toread)
        if toread <= 0:
            # Nothing left to read; also keeps negative sizes away from
            # recv(), which would reject them with a ValueError.
            return b""
        try:
            data = self.sock.recv(toread)
        except socket.error as why:
            # Exceptions cannot be subscripted on Python 3, so the error
            # number is taken from args, which works on Python 2 as well.
            if why.args and why.args[0] in (errno.ECONNRESET, errno.ENOTCONN,
                                            errno.ESHUTDOWN):
                data = b""  # treat a vanished peer like end of file
            else:
                raise
        self.toread -= len(data)
        return data

    def close(self):
        """Does not close the socket, because it might still be needed. It
        reads all data that should have been read as given by C{CONTENT_LENGTH}.
        """
        try:
            while self.toread > 0:
                if not self._recv(min(self.toread, 4096)):
                    return
        except socket.error:
            pass

    def read(self, size=None):
        """
        see pep333
        @param size: is the maximum number of bytes to return. C{None} or a
            negative value reads everything that is left.
        @returns: the data read. Socket errors end the read early instead of
            being raised.
        """
        if size is None or size < 0:
            chunks = [self.buff]
            self.buff = b""
            while True:
                try:
                    data = self._recv()
                except socket.error:
                    break
                if not data:
                    break
                chunks.append(data)
            return b"".join(chunks)

        chunks = [self.buff]
        available = len(self.buff)
        while available < size:
            try:
                data = self._recv(min(4096, size - available))
            except socket.error:
                break
            if not data:
                break
            chunks.append(data)
            available += len(data)
        collected = b"".join(chunks)
        # Slicing covers both cases: enough data (the rest stays buffered)
        # and a short read (everything is returned, the buffer is emptied).
        self.buff = collected[size:]
        return collected[:size]

    def readline(self, size=None):
        """
        see pep333
        @param size: is the maximum number of bytes to return. C{None} or a
            negative value means no limit.
        @returns: one line including its trailing newline, or less if the
            size limit or the end of the input is reached first
        @raises socket.error:
        """
        if size is not None and size < 0:
            size = None
        while True:
            newline = self.buff.find(b'\n')
            if newline >= 0:
                split = newline + 1
                if size is not None and split > size:
                    split = size
                ret, self.buff = self.buff[:split], self.buff[split:]
                return ret
            if size is None:
                data = self._recv(4096)
            elif len(self.buff) < size:
                data = self._recv(size - len(self.buff))
            else:
                # The size limit is reached without having seen a newline.
                ret, self.buff = self.buff[:size], self.buff[size:]
                return ret
            if not data:
                # End of input: hand out whatever is buffered.
                ret, self.buff = self.buff, b""
                return ret
            self.buff += data

    def readlines(self, hint=None):
        """
        see pep333
        @param hint: if given and positive, stop after the lines yielded so
            far add up to at least this many bytes
        @returns: a generator yielding the lines
        @raises socket.error:
        """
        limited = hint is not None and hint > 0
        total = 0
        data = self.readline()
        while data:
            yield data
            total += len(data)
            if limited and total >= hint:
                return
            data = self.readline()

    def __iter__(self):
        """see pep333"""
        return self

    def __next__(self):
        """
        see pep333
        @returns: the next chunk of up to 4096 bytes
        @raises StopIteration: when no data is left
        """
        data = self.read(4096)
        if not data:
            raise StopIteration
        return data

    def next(self):
        """Python 2 spelling of L{__next__}."""
        return self.__next__()

    def flush(self):
        """see pep333"""

    def write(self, data):
        """see pep333
        @type data: bytes
        """
        assert isinstance(data, bytes)
        try:
            self.sock.sendall(data)
        except socket.error:
            # ignore all socket errors: there is no way to report
            return

    def writelines(self, lines):
        """see pep333"""
        for line in lines:
            self.write(line)


# Export SCGIServer as part of this module's public interface.
__all__.append("SCGIServer")

class SCGIServer(object):
    """Usage: create an L{SCGIServer} object and invoke the run method which
    will then turn this process into an scgi server."""

    class WorkerState(object):
        """state: 0 means idle and 1 means working.
        These values are also sent as strings '0' and '1' over the socket."""

        def __init__(self, pid, sock, state):
            """
            @type pid: int
            @param sock: is the master's end of the socket pair shared with
                the worker
            @type state: int
            """
            self.pid = pid
            self.sock = sock
            self.state = state

    def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
                 minworkers=2, maxworkers=32, maxrequests=1000, config=None,
                 reusesocket=None, cpulimit=None, timelimit=None):
        """
        @param wsgiapp: is the WSGI application to be run.
        @type port: int
        @param port: is the tcp port to listen on
        @type interface: str
        @param interface: is the interface to bind to (default: C{"localhost"})
        @param error: is a file-like object being passed as C{wsgi.errors} in
            environ
        @type minworkers: int
        @param minworkers: is the number of worker processes to spawn
        @type maxworkers: int
        @param maxworkers: is the maximum number of workers that can be spawned
            on demand
        @type maxrequests: int
        @param maxrequests: is the number of requests a worker processes before
            dying
        @type config: {} or None
        @param config: the environ dictionary is updated using these values for
            each request. The passed dictionary is copied, not modified.
        @type reusesocket: None or socket.socket
        @param reusesocket: If a socket is passed, do not create a socket.
            Instead use given socket as listen socket. The passed socket
            must be set up for accepting tcp connections (i.e. C{AF_INET},
            C{SOCK_STREAM} with bind and listen called).
        @type cpulimit: (int, int)
        @param cpulimit: a pair of soft and hard cpu time limit in seconds.
            This limit is installed for each worker using RLIMIT_CPU if
            resource limits are available to this platform. After reaching
            the soft limit workers will continue to process the current
            request and then cleanly terminate.
        @type timelimit: int
        @param timelimit: The maximum number of wall clock seconds processing
            a request should take. If this is specified, an alarm timer is
            installed and the default action is to kill the worker.
        """
        assert hasattr(error, "write")
        self.wsgiapp = wsgiapp
        self.bind_address = (interface, port)
        self.minworkers = minworkers
        self.maxworkers = maxworkers
        self.maxrequests = maxrequests
        self.config = {} if config is None else config.copy()
        self.config["wsgi.errors"] = error
        self.reusesocket = reusesocket
        # cpulimit changes meaning:
        # master: None or a tuple denoting the limit to be configured.
        # worker: boolean denoting whether the limit is reached.
        self.cpulimit = cpulimit
        self.timelimit = timelimit
        self.server = None  # becomes a socket
        self.sigpipe = None  # becomes a pair socketpair endpoints
        # maps filedescriptors to WorkerStates
        self.workers = {}
        self.running = False
        self.ischild = False

    def enable_sighandler(self, sig=signal.SIGTERM):
        """
        Changes the signal handler for the given signal to terminate the run()
        loop.
        @param sig: is the signal to handle
        @returns: self
        """
        signal.signal(sig, self.shutdownhandler)
        return self

    def run(self):
        """
        Serve the wsgi application.

        Keeps the worker pool filled and tracks worker states until
        L{shutdownhandler} is invoked, then closes the sockets created here
        and signals all workers.
        """
        if self.reusesocket is None:
            self.server = socket.socket()
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.bind(self.bind_address)
            self.server.listen(5)
        else:
            self.server = self.reusesocket
        self.sigpipe = socket.socketpair()
        self.running = True
        while self.running:
            while (len(self.workers) < self.minworkers or  # less than min
                   (len(self.workers) < self.maxworkers and  # less than max
                    not any(w.state == 0  # no inactive
                            for w in self.workers.values()))):
                self.spawnworker()
            rs = list(self.workers.keys())
            rs.append(self.sigpipe[0])
            try:
                rs, _, _ = select.select(rs, [], [])
            except select.error as e:
                # Exceptions cannot be subscripted on Python 3; args[0] holds
                # the error number on Python 2 and Python 3 alike.
                if not e.args or e.args[0] != errno.EINTR:
                    raise
                rs = []
            for s in rs:
                if s is self.sigpipe[0]:
                    # Wake-up byte written by shutdownhandler.
                    self.sigpipe[0].recv(1)
                    continue
                try:
                    data = self.workers[s].sock.recv(1)
                except socket.error:
                    # we cannot handle errors here, so drop the connection.
                    data = b''
                if data == b'':
                    self.workers[s].sock.close()
                    del self.workers[s]
                elif data in (b'0', b'1'):
                    self.workers[s].state = int(data)
                else:
                    raise RuntimeError("unexpected data from worker")
            try:
                # Reap every child that has terminated by now.
                pid = 1
                while pid > 0:
                    pid, _ = os.waitpid(0, os.WNOHANG)
            except OSError:
                pass
        if self.reusesocket is None:
            self.server.close()
        self.server = None
        self.sigpipe[0].close()
        self.sigpipe[1].close()
        self.sigpipe = None
        self.killworkers()

    def killworkers(self, sig=signal.SIGTERM):
        """
        Kills all worker children.
        @param sig: is the signal used to kill the children
        @raises OSError: if signalling fails for a reason other than the
            worker process no longer existing
        """
        while self.workers:
            _, state = self.workers.popitem()
            state.sock.close()
            try:
                os.kill(state.pid, sig)
            except OSError as err:
                # A worker that already exited and was reaped must not keep
                # the remaining workers from being signalled.
                if err.errno != errno.ESRCH:
                    raise
        # TODO: handle working children with a timeout

    def shutdownhandler(self, sig=None, stackframe=None):
        """
        Signal handler function for stopping the run() loop. In a worker it
        exits the process. In the master it clears the flag that run()
        evaluates in each loop and writes a byte to the signal pipe so that
        run() wakes up from select.
        @param sig: ignored for usage with signal.signal
        @param stackframe: ignored for usage with signal.signal
        """
        if self.ischild:
            sys.exit()
        elif self.running:
            self.running = False
            self.sigpipe[1].send(b' ')

    def sigxcpuhandler(self, sig=None, stackframe=None):
        """
        Signal handler function for the SIGXCPU signal. It is sent to a
        worker when the soft RLIMIT_CPU is crossed.
        @param sig: ignored for usage with signal.signal
        @param stackframe: ignored for usage with signal.signal
        """
        self.cpulimit = True

    def spawnworker(self):
        """
        internal! spawns a single worker
        @raises OSError: if the process cannot be forked
        """
        srvsock, worksock = socket.socketpair()

        try:
            pid = os.fork()
        except OSError:
            # Do not leak the socket pair when forking fails.
            srvsock.close()
            worksock.close()
            raise

        if pid == 0:
            self.ischild = True
            # close unneeded sockets
            srvsock.close()
            for worker in self.workers.values():
                worker.sock.close()
            del self.workers

            if self.cpulimit and resource:
                signal.signal(signal.SIGXCPU, self.sigxcpuhandler)
                resource.setrlimit(resource.RLIMIT_CPU, self.cpulimit)
            # From here on cpulimit is the "limit reached" flag.
            self.cpulimit = False

            try:
                self.work(worksock)
            except socket.error:
                pass

            sys.exit()

        # parent: close unneeded sockets
        worksock.close()
        self.workers[srvsock.fileno()] = SCGIServer.WorkerState(
            pid, srvsock, 0)

    def work(self, worksock):
        """
        internal! serves maxrequests times
        @param worksock: is the worker's end of the socket pair used to
            report the idle/working state to the master
        @raises socket.error:
        """
        for _ in range(self.maxrequests):
            con, _addr = self.server.accept()
            # we cannot handle socket.errors here.
            worksock.sendall(b'1')  # tell server we're working
            if self.timelimit:
                signal.alarm(self.timelimit)
            self.process(con)
            if self.timelimit:
                signal.alarm(0)
            worksock.sendall(b'0')  # tell server we've finished
            if self.cpulimit:
                # The soft cpu limit was crossed: stop serving.
                break

    def _read_headers(self, con):
        """
        internal! reads the netstring holding the request headers from con.

        The length prefix including its colon has to be found within the
        first seven bytes, which limits the headers to 999999 bytes.
        @returns: a list alternating header names and values as bytes, or
            C{None} on socket errors and protocol violations
        """
        data = b""
        # The prefix may arrive in several pieces, so keep reading until the
        # colon shows up or seven bytes have been seen.
        while b':' not in data and len(data) < 7:
            try:
                t = con.recv(7 - len(data))
            except socket.error:
                return None
            if not t:
                return None
            data += t
        if b':' not in data:
            return None
        length, data = data.split(b':', 1)
        try:
            length = int(length)
        except ValueError:  # clear protocol violation
            return None
        # Reject negative lengths and headers shorter than what was already
        # read; both would otherwise ask recv() for a negative size.
        if length < 0 or len(data) > length + 1:
            return None

        chunks = [data]
        missing = length + 1 - len(data)  # read one byte beyond
        while missing > 0:
            try:
                t = con.recv(min(4096, missing))
            except socket.error:
                return None
            if not t:  # request too short
                return None
            chunks.append(t)
            missing -= len(t)

        # netstrings!
        fields = b"".join(chunks).split(b'\0')
        # the byte beyond has to be a ','.
        # and the number of netstrings excluding the final ',' has to be even
        if fields.pop() != b',' or len(fields) % 2 != 0:
            return None
        return fields

    def process(self, con):
        """
        internal! processes a single request on the connection con.

        The connection is closed before returning. Exceptions raised by the
        application are propagated after the connection has been closed.
        """
        fields = self._read_headers(con)
        if fields is None:
            con.close()
            return

        environ = self.config.copy()
        pairs = iter(fields)
        for key, value in zip(pairs, pairs):
            environ[bytes2str(key)] = bytes2str(value)

        try:
            content_length = int(environ["CONTENT_LENGTH"])
        except (KeyError, ValueError):
            con.close()
            return
        if content_length < 0:
            con.close()
            return

        # elements:
        # 0 -> None: no headers set
        # 0 -> False: set but unsent
        # 0 -> True: sent
        # 1 -> bytes of the complete header
        response_head = [None, None]

        def sendheaders():
            assert response_head[0] is not None  # headers set
            if response_head[0] is not True:
                response_head[0] = True
                try:
                    con.sendall(response_head[1])
                except socket.error:
                    pass

        def dumbsend(data):
            sendheaders()
            try:
                con.sendall(data)
            except socket.error:
                pass

        def start_response(status, headers, exc_info=None):
            if exc_info and response_head[0]:
                # Headers are already sent: re-raise the application error.
                try:
                    raise exc_info_for_raise(exc_info)
                finally:
                    exc_info = None
            assert isinstance(status, str)
            assert isinstance(headers, list)
            assert all(isinstance(k, str) and isinstance(v, str)
                       for (k, v) in headers)
            assert not response_head[0]  # unset or not sent
            headers = "".join("%s: %s\r\n" % (k, v) for (k, v) in headers)
            full_header = "Status: %s\r\n%s\r\n" % (status, headers)
            response_head[1] = str2bytes(full_header)
            response_head[0] = False  # set but nothing sent
            return dumbsend

        _convert_environ(environ, multiprocess=True)
        sfw = SocketFileWrapper(con, content_length)
        environ["wsgi.input"] = sfw

        result = None
        try:
            result = self.wsgiapp(environ, start_response)
            assert hasattr(result, "__iter__")

            if isinstance(result, FileWrapper) and result.can_transfer():
                sendheaders()
                sent = 1
                while sent > 0:
                    sent = result.transfer(con)
            else:
                for data in result:
                    assert response_head[0] is not None
                    assert isinstance(data, bytes)
                    dumbsend(data)
                if response_head[0] is not True:
                    sendheaders()
        finally:
            # Release everything even if the application raised.
            try:
                if hasattr(result, "close"):
                    result.close()
            finally:
                try:
                    sfw.close()
                finally:
                    con.close()