Viewing file: Daemon.py (4.95 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import os, errno, socket, select, threading, sys, traceback
from Ft.Server.Server import Connection
# possible socket errors during accept() _sock_non_fatal = [] for name in ('ECONNABORTED', 'ECONNRESET', 'ETIMEDOUT', 'EHOSTUNREACH', 'ENETUNREACH', 'EPROTO', 'ENETDOWN', 'EHOSTDOWN', 'ENONET'): if hasattr(errno, name): _sock_non_fatal.append(getattr(errno, name))
class Daemon(threading.Thread): def __init__(self, mutex, listeners, errorLog): threading.Thread.__init__(self) self.accepting_mutex = mutex self.listeners = listeners self.errorLog = errorLog
# scoreboard information self.requests = 0 self.ready = 1 self.active = 0 self.pid = -1 return
def _set_daemon(self): # Let Python exit with active threads return 1
def __repr__(self): return '<Daemon pid=%d>' % self.pid
def run(self): """ Each thread runs within this function. They wait for a job to become available, then handle all the requests on that connection until it is closed, then return to wait for more jobs. """ self.pid = os.getpid() self.errorLog.debug('%s started' % self)
listeners = self.listeners mutex = self.accepting_mutex
# Signify that we are actually running self.active = 1 while self.active: self.ready = 1
try: ready = select.select(listeners, [], [], 1)[0] except select.error: # Single UNIX documents select as returning errnos # EBADF, EINVAL and ENOMEM... and in none of # those cases does it make sense to continue. break else: if not ready: # timed out; this allows for our owner to kill us off # by changing the active flag. continue
try: # Serialize the accepts between all daemons mutex.acquire() try: # Make sure there is still a request left to process, # because of multiple threads, this is not always true. try: ready = select.select(ready, [], [], 0)[0] except select.error: break # As soon as a connection is accepted, it no longer will # be in the input pending list if ready: conn_sock, client_addr = ready[0].socket.accept() server = ready[0].server else: continue finally: mutex.release() except socket.error, (code, error): # Most of the errors are quite fatal. So it seems # best just to exit in most cases. if code in _sock_non_fatal: # ignore common disconnect errors continue else: why = '[errno %d] %s' % (code, error) self.errorLog.error('during socket.accept(): %s' % why) break
# We now have a connection, so set it up with the appropriate # socket options, file descriptors, and read/write buffers. try: local_addr = conn_sock.getsockname() except socket.error: self.errorLog.error('getsockname') continue
try: conn_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except socket.error: self.errorLog.warning('setsockopt: TCP_NODELAY')
# Signify that we are currently handling a request self.ready = 0 self.requests += 1
connection = Connection.Connection(server, conn_sock, client_addr, local_addr)
# Read and process each request found on our connection # until no requests are left or we decide to close. while self.active and connection.keepalive and not connection.aborted: connection.keepalive = 0 try: handler = server.handler(connection) handler.handle() except: lines = apply(traceback.format_exception, sys.exc_info()) self.errorLog.error("request failed for %s:\n%s" % (connection.remote_ip, ''.join(lines))) connection.aborted = 1
# Close the connection, being careful to send out whatever is still # in our buffers. If possible, try to avoid a hard close until the # client has ACKed our FIN and/or has stopped sending us data. connection.close() self.active = 0 return
|