"""
The L{forkpool.SCGIServer} adapts a wsgi application to a scgi service.

It works with multiple processes that are periodically cleaned up to prevent
memory leaks from having an impact on the system.
"""
7
8 try:
9 import resource
10 except ImportError:
11 resource = None
12 import socket
13 import os
14 import select
15 import sys
16 import errno
17 import signal
18
19 from wsgitools.internal import bytes2str, str2bytes
20 from wsgitools.scgi import _convert_environ, FileWrapper
21
if sys.version_info[0] >= 3:
    def exc_info_for_raise(exc_info):
        """
        Turn a C{sys.exc_info()} style triple into the object to C{raise}.

        The original exception instance is reused (as recommended by
        pep3333) instead of being wrapped into a new instance of its own
        class. Wrapping nests the exception inside its own arguments and
        fails for classes whose constructor needs more than one argument.
        @type exc_info: (type, BaseException, traceback)
        @param exc_info: exception type, exception value and traceback
        @returns: the exception instance with the given traceback attached
        """
        exc_type, exc_value, exc_tb = exc_info[0], exc_info[1], exc_info[2]
        if not isinstance(exc_value, BaseException):
            # Only a plain value was supplied: build the instance from it.
            exc_value = exc_type(exc_value)
        return exc_value.with_traceback(exc_tb)
else:
    def exc_info_for_raise(exc_info):
        """
        Turn a C{sys.exc_info()} style triple into the object to C{raise}.
        @type exc_info: (type, BaseException, traceback)
        @param exc_info: exception type, exception value and traceback
        @returns: the triple as a tuple
        """
        return exc_info[0], exc_info[1], exc_info[2]
28
29 __all__ = []
30
class SocketFileWrapper(object):
    """Wraps a socket to a wsgi-compliant file-like object."""

    def __init__(self, sock, toread):
        """
        @param sock: is a C{socket.socket()}
        @type toread: int
        @param toread: is the number of bytes that may still be received
                from C{sock}; reading never goes beyond this amount
        """
        self.sock = sock
        # Bytes already received from the socket but not yet handed out.
        self.buff = b""
        self.toread = toread

    def _recv(self, size=4096):
        """
        internal method for receiving and counting incoming data
        @type size: int
        @param size: upper bound for the number of bytes to receive
        @returns: the received bytes; C{b""} when nothing is left to read or
                the peer is gone
        @raises socket.error:
        """
        toread = min(size, self.toread)
        if not toread:
            return b""
        try:
            data = self.sock.recv(toread)
        except socket.error as why:
            # Exceptions cannot be indexed on Python 3, so look at .errno.
            # A vanished peer is reported like end of file.
            if why.errno in (errno.ECONNRESET, errno.ENOTCONN,
                             errno.ESHUTDOWN):
                data = b""
            else:
                raise
        self.toread -= len(data)
        return data

    def _take(self, count):
        """
        internal method removing up to C{count} bytes from the buffer
        @type count: int
        @returns: the removed bytes
        """
        ret, self.buff = self.buff[:count], self.buff[count:]
        return ret

    def close(self):
        """Does not close the socket, because it might still be needed. It
        reads all data that should have been read as given by C{CONTENT_LENGTH}.
        """
        try:
            while self.toread > 0:
                if not self._recv(min(self.toread, 4096)):
                    return
        except socket.error:
            pass

    def read(self, size=None):
        """
        see pep333
        @type size: int or None
        @param size: maximum number of bytes to return; C{None} or a
                negative value reads everything that is left
        @raises socket.error:
        """
        chunks = [self.buff]
        if size is None or size < 0:
            self.buff = b""
            while True:
                try:
                    data = self._recv()
                except socket.error:
                    break
                if not data:
                    break
                chunks.append(data)
            return b"".join(chunks)
        have = len(self.buff)
        while have < size:
            try:
                data = self._recv(min(4096, size - have))
            except socket.error:
                break
            if not data:
                break
            chunks.append(data)
            have += len(data)
        self.buff = b"".join(chunks)
        # Slicing clamps, so a short buffer is simply returned completely.
        return self._take(size)

    def readline(self, size=None):
        """
        see pep333
        @type size: int or None
        @param size: maximum number of bytes to return; C{None} or a
                negative value means no limit
        @raises socket.error:
        """
        if size is not None and size < 0:
            size = None
        while True:
            newline = self.buff.find(b'\n')
            if newline >= 0:
                split = newline + 1
                if size is not None and split > size:
                    split = size
                return self._take(split)
            if size is None:
                data = self._recv(4096)
            elif len(self.buff) < size:
                data = self._recv(size - len(self.buff))
            else:
                # The limit is reached without having seen a newline.
                return self._take(size)
            if not data:
                return self._take(len(self.buff))
            self.buff += data

    def readlines(self, hint=None):
        """
        see pep333
        @param hint: accepted for pep333 compatibility and ignored
        @returns: a generator yielding the remaining lines
        @raises socket.error:
        """
        data = self.readline()
        while data:
            yield data
            data = self.readline()

    def __iter__(self):
        """see pep333"""
        return self

    def __next__(self):
        """
        see pep333
        @returns: the next block of at most 4096 bytes
        @raises socket.error:
        """
        data = self.read(4096)
        if not data:
            raise StopIteration
        return data

    def next(self):
        """Python 2 spelling of L{__next__}."""
        return self.__next__()

    def flush(self):
        """see pep333"""

    def write(self, data):
        """see pep333"""
        assert isinstance(data, bytes)
        try:
            self.sock.sendall(data)
        except socket.error:
            # Deliberately best effort: a failed send is ignored.
            return

    def writelines(self, lines):
        """see pep333"""
        for line in lines:
            self.write(line)
168
169 __all__.append("SCGIServer")
class SCGIServer(object):
    """Usage: create an L{SCGIServer} object and invoke the run method which
    will then turn this process into an scgi server."""

    class WorkerState(object):
        """state: 0 means idle and 1 means working.
        These values are also sent as strings '0' and '1' over the socket."""

        def __init__(self, pid, sock, state):
            """
            @type pid: int
            @param sock: is the server side of the socket pair shared with
                    the worker
            @type state: int
            """
            self.pid = pid
            self.sock = sock
            self.state = state

    def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
                 minworkers=2, maxworkers=32, maxrequests=1000, config={},
                 reusesocket=None, cpulimit=None, timelimit=None):
        """
        @param wsgiapp: is the WSGI application to be run.
        @type port: int
        @param port: is the tcp port to listen on
        @type interface: str
        @param interface: is the interface to bind to (default: C{"localhost"})
        @param error: is a file-like object being passed as C{wsgi.errors} in
                environ
        @type minworkers: int
        @param minworkers: is the number of worker processes to spawn
        @type maxworkers: int
        @param maxworkers: is the maximum number of workers that can be spawned
                on demand
        @type maxrequests: int
        @param maxrequests: is the number of requests a worker processes before
                dying
        @type config: {}
        @param config: the environ dictionary is updated using these values for
                each request.
        @type reusesocket: None or socket.socket
        @param reusesocket: If a socket is passed, do not create a socket.
                Instead use given socket as listen socket. The passed socket
                must be set up for accepting tcp connections (i.e. C{AF_INET},
                C{SOCK_STREAM} with bind and listen called).
        @type cpulimit: (int, int)
        @param cpulimit: a pair of soft and hard cpu time limit in seconds.
                This limit is installed for each worker using RLIMIT_CPU if
                resource limits are available to this platform. After reaching
                the soft limit workers will continue to process the current
                request and then cleanly terminate.
        @type timelimit: int
        @param timelimit: The maximum number of wall clock seconds processing
                a request should take. If this is specified, an alarm timer is
                installed and the default action is to kill the worker.
        """
        assert hasattr(error, "write")
        self.wsgiapp = wsgiapp
        self.bind_address = (interface, port)
        self.minworkers = minworkers
        self.maxworkers = maxworkers
        self.maxrequests = maxrequests
        # The shared default dict is never modified: only a copy is kept.
        self.config = config.copy()
        self.config["wsgi.errors"] = error
        self.reusesocket = reusesocket
        # In the master this holds the rlimit pair. A worker replaces it by
        # False once the limit is installed and the SIGXCPU handler sets it
        # to True to request termination after the current request.
        self.cpulimit = cpulimit
        self.timelimit = timelimit
        self.server = None   # listening socket, set by run()
        self.sigpipe = None  # socket pair used to wake up run()
        # Maps the file number of the server side socket to a WorkerState.
        self.workers = {}
        self.running = False
        self.ischild = False

    def enable_sighandler(self, sig=signal.SIGTERM):
        """
        Changes the signal handler for the given signal to terminate the run()
        loop.
        @param sig: is the signal to handle
        @returns: self
        """
        signal.signal(sig, self.shutdownhandler)
        return self

    def run(self):
        """
        Serve the wsgi application.
        """
        if self.reusesocket is None:
            self.server = socket.socket()
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.bind(self.bind_address)
            self.server.listen(5)
        else:
            self.server = self.reusesocket
        self.sigpipe = socket.socketpair()
        self.running = True
        while self.running:
            # Keep the minimum pool size and grow up to maxworkers as long
            # as no worker is idle.
            while (len(self.workers) < self.minworkers or
                   (len(self.workers) < self.maxworkers and
                    not any(w.state == 0 for w in self.workers.values()))):
                self.spawnworker()
            rs = list(self.workers.keys())
            rs.append(self.sigpipe[0])
            try:
                rs, _, _ = select.select(rs, [], [])
            except select.error as e:
                # Exceptions cannot be indexed on Python 3, use args.
                if e.args[0] != errno.EINTR:
                    raise
                rs = []
            for s in rs:
                if s == self.sigpipe[0]:
                    # Wake up call from shutdownhandler.
                    self.sigpipe[0].recv(1)
                    continue
                try:
                    data = self.workers[s].sock.recv(1)
                except socket.error:
                    # Treat a broken worker socket like a closed one.
                    data = b''
                if data == b'':
                    self.workers[s].sock.close()
                    del self.workers[s]
                elif data in (b'0', b'1'):
                    self.workers[s].state = int(data)
                else:
                    raise RuntimeError("unexpected data from worker")
            # Collect all children that terminated in the meantime.
            try:
                pid = 1
                while pid > 0:
                    pid, _ = os.waitpid(0, os.WNOHANG)
            except OSError:
                pass
        if self.reusesocket is None:
            self.server.close()
        self.server = None
        self.sigpipe[0].close()
        self.sigpipe[1].close()
        self.sigpipe = None
        self.killworkers()

    def killworkers(self, sig=signal.SIGTERM):
        """
        Kills all worker children.
        @param sig: is the signal used to kill the children
        """
        while self.workers:
            _, state = self.workers.popitem()
            state.sock.close()
            try:
                os.kill(state.pid, sig)
            except OSError as err:
                # The worker may already have exited and been collected by
                # the waitpid loop in run().
                if err.errno != errno.ESRCH:
                    raise

    def shutdownhandler(self, sig=None, stackframe=None):
        """
        Signal handler function for stopping the run() loop. It works by
        setting a variable that run() evaluates in each loop. As a signal
        interrupts accept the loop is terminated, the accepting socket is
        closed and the workers are killed.
        @param sig: ignored for usage with signal.signal
        @param stackframe: ignored for usage with signal.signal
        """
        if self.ischild:
            sys.exit()
        elif self.running:
            self.running = False
            # Wake up the select call in run().
            self.sigpipe[1].send(b' ')

    def sigxcpuhandler(self, sig=None, stackframe=None):
        """
        Signal handler function for the SIGXCPU signal. It is sent to a
        worker when the soft RLIMIT_CPU is crossed.
        @param sig: ignored for usage with signal.signal
        @param stackframe: ignored for usage with signal.signal
        """
        self.cpulimit = True

    def spawnworker(self):
        """
        internal! spawns a single worker
        """
        srvsock, worksock = socket.socketpair()

        pid = os.fork()
        if pid == 0:
            self.ischild = True
            # The child only needs its own end of the new socket pair.
            srvsock.close()
            for worker in self.workers.values():
                worker.sock.close()
            del self.workers

            if self.cpulimit and resource:
                signal.signal(signal.SIGXCPU, self.sigxcpuhandler)
                resource.setrlimit(resource.RLIMIT_CPU, self.cpulimit)
                self.cpulimit = False

            try:
                self.work(worksock)
            except socket.error:
                pass

            sys.exit()
        # Only the parent gets here: os.fork raises OSError on failure
        # instead of returning a negative pid.
        worksock.close()
        self.workers[srvsock.fileno()] = \
            SCGIServer.WorkerState(pid, srvsock, 0)

    def work(self, worksock):
        """
        internal! serves maxrequests times
        @param worksock: is the worker side of the socket pair used for
                reporting the state to the master
        @raises socket.error:
        """
        for _ in range(self.maxrequests):
            con, _addr = self.server.accept()
            # Tell the master that this worker is busy.
            worksock.sendall(b'1')
            if self.timelimit:
                signal.alarm(self.timelimit)
            self.process(con)
            if self.timelimit:
                signal.alarm(0)
            worksock.sendall(b'0')
            if self.cpulimit:
                # Set by sigxcpuhandler: stop after the current request.
                break

    def _read_environ(self, con):
        """
        internal! reads the netstring encoded scgi headers from con.
        @returns: a copy of the config updated with the request headers, or
                C{None} if the request is malformed or cannot be read
        """
        try:
            data = con.recv(7)
        except socket.error:
            return None
        if b':' not in data:
            return None
        length, data = data.split(b':', 1)
        try:
            length = int(length)
        except ValueError:
            return None
        # The header block is followed by a single b','.
        expected = length + 1
        if length < 0 or len(data) > expected:
            # Would otherwise lead to a recv call with a negative size.
            return None
        while len(data) < expected:
            try:
                chunk = con.recv(min(4096, expected - len(data)))
            except socket.error:
                return None
            if not chunk:
                return None
            data += chunk

        # Headers are b'\0' terminated names and values in turn.
        fields = data.split(b'\0')
        if fields.pop() != b',' or len(fields) % 2 != 0:
            return None

        environ = self.config.copy()
        pairs = iter(fields)
        for key, value in zip(pairs, pairs):
            environ[bytes2str(key)] = bytes2str(value)
        return environ

    def process(self, con):
        """
        internal! processes a single request on the connection con.
        """
        environ = self._read_environ(con)
        if environ is None:
            con.close()
            return

        # response_head[0] is None until start_response was called, False
        # while the headers in response_head[1] are pending and True once
        # they are sent.
        response_head = [None, None]

        def sendheaders():
            assert response_head[0] is not None
            if response_head[0] != True:
                response_head[0] = True
                try:
                    con.sendall(response_head[1])
                except socket.error:
                    pass

        def dumbsend(data):
            sendheaders()
            try:
                con.sendall(data)
            except socket.error:
                pass

        def start_response(status, headers, exc_info=None):
            if exc_info and response_head[0]:
                # Headers are already sent, so the error cannot be reported
                # to the client anymore.
                try:
                    raise exc_info_for_raise(exc_info)
                finally:
                    exc_info = None
            assert isinstance(status, str)
            assert isinstance(headers, list)
            assert all(isinstance(k, str) and isinstance(v, str)
                       for (k, v) in headers)
            assert not response_head[0]
            headers = "".join("%s: %s\r\n" % header for header in headers)
            full_header = "Status: %s\r\n%s\r\n" % (status, headers)
            response_head[1] = str2bytes(full_header)
            response_head[0] = False
            return dumbsend

        try:
            content_length = int(environ["CONTENT_LENGTH"])
        except (KeyError, ValueError):
            con.close()
            return
        if content_length < 0:
            con.close()
            return

        _convert_environ(environ, multiprocess=True)
        sfw = SocketFileWrapper(con, content_length)
        environ["wsgi.input"] = sfw

        try:
            result = self.wsgiapp(environ, start_response)
            try:
                assert hasattr(result, "__iter__")

                if isinstance(result, FileWrapper) and result.can_transfer():
                    sendheaders()
                    sent = 1
                    while sent > 0:
                        sent = result.transfer(con)
                else:
                    for data in result:
                        assert response_head[0] is not None
                        assert isinstance(data, bytes)
                        dumbsend(data)
                    if response_head[0] != True:
                        sendheaders()
            finally:
                # pep333 requires close to be called even on errors.
                if hasattr(result, "close"):
                    result.close()
            sfw.close()
        finally:
            con.close()
519