home *** CD-ROM | disk | FTP | other *** search
/ Chip 2006 June / CHIP 2006-06.2.iso / program / freeware / Democracy-0.8.2.exe / xulrunner / python / dl_daemon / daemon.py < prev    next >
Encoding:
Python Source  |  2006-04-10  |  8.2 KB  |  244 lines

  1. from dl_daemon import command
  2. import os
  3. import cPickle
  4. import socket
  5. import traceback
  6. from threading import Lock, Thread, Event
  7. from time import sleep
  8. import tempfile
  9.  
  10. class DaemonError(Exception):
  11.     """Exception while communicating to a daemon (either controller or
  12.     downloader).
  13.     """
  14.     pass
  15.  
  16. def launchDownloadDaemon(oldpid, port):
  17.     import app
  18.     delegate = app.Controller.instance.getBackendDelegate()
  19.     delegate.launchDownloadDaemon(oldpid, port)
  20.     
  21. def getDataFile():
  22.     try:
  23.         uid = os.getuid()
  24.     except:
  25.         # This works for win32, where we don't have getuid()
  26.         uid = os.environ['USERNAME']
  27.         
  28.     return os.path.join(tempfile.gettempdir(), 'Democracy_Download_Daemon_%s.txt' % uid)
  29.  
  30. pidfile = None
  31. def writePid(pid):
  32.     """Write out our pid.
  33.  
  34.     This method locks the pid file until the downloader exits.  On windows
  35.     this is achieved by keeping the file open.  On Unix/OS X, we use the
  36.     fcntl.lockf() function.
  37.     """
  38.  
  39.     global pidfile
  40.     # NOTE: we want to open the file in a mode the standard open() doesn't
  41.     # support.  We want to create the file if nessecary, but not truncate it
  42.     # if it's already around.  We can't truncate it because on unix we haven't
  43.     # locked the file yet.
  44.     fd = os.open(getDataFile(), os.O_WRONLY | os.O_CREAT)
  45.     pidfile = os.fdopen(fd, 'w')
  46.     try:
  47.         import fcntl
  48.     except:
  49.         pass
  50.     else:
  51.         fcntl.lockf(pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
  52.     pidfile.write("%s\n" % pid)
  53.     pidfile.flush()
  54.     # NOTE: There may be extra data after the line we write left around from
  55.     # prevous writes to the pid file.  This is fine since readPid() only reads
  56.     # the 1st line.
  57.     #
  58.     # NOTE 2: we purposely don't close the file, to achieve locking on
  59.     # windows.
  60.  
  61. def readPid():
  62.     try:
  63.         f = open(getDataFile(), "r")
  64.     except IOError:
  65.         return None
  66.     try:
  67.         try:
  68.             return int(f.readline())
  69.         except ValueError:
  70.             return None
  71.     finally:
  72.         f.close()
  73.  
  74. lastDaemon = None
  75.  
  76. class Daemon:
  77.     def __init__(self):
  78.         global lastDaemon
  79.         lastDaemon = self
  80.         self.shutdown = False
  81.         self.waitingCommands = {}
  82.         self.returnValues = {}
  83.         self.sendLock = Lock() # For serializing data sent over the network
  84.         self.globalLock = Lock() # For serializing access to global object data
  85.         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  86.         self.socket.settimeout(None)
  87.     
  88.     def listenLoop(self):
  89.         while True:
  90.             #print "Top of dl daemon listen loop"
  91.             comm = cPickle.load(self.stream)
  92.             #print "dl daemon got object %s %s" % (str(comm), comm.id)
  93.             # Process commands in their own thread so actions that
  94.             # need to send stuff over the wire don't hang
  95.             # FIXME: We shouldn't spawn a thread for every command!
  96.             t = Thread(target=self.processCommand,
  97.                     args=(comm,),
  98.                     name="command processor")
  99.             t.setDaemon(False)
  100.             t.start()
  101.             #FIXME This is a bit of a hack
  102.             if isinstance(comm, command.ShutDownCommand):
  103.                 # wait for the command thread to send our reply along the
  104.                 # socket before quitting
  105.                 t.join()
  106.                 break
  107.  
  108.     def processCommand(self, comm):
  109.         if comm.orig:
  110.             comm.setDaemon(self)
  111.             comm.setReturnValue(comm.action())
  112.             comm.send(block=False)
  113.         else:
  114.             self.processReturnValue(comm)
  115.  
  116.     def processReturnValue(self, comm):
  117.         self.globalLock.acquire()
  118.         try:
  119.             if self.waitingCommands.has_key(comm.id):
  120.                 event = self.waitingCommands[comm.id]
  121.                 del self.waitingCommands[comm.id]
  122.                 self.returnValues[comm.id] = comm.getReturnValue()
  123.             else:
  124.                 return
  125.         finally:
  126.             self.globalLock.release()
  127.         event.set()
  128.  
  129.     def waitForReturn(self, comm):
  130.         self.globalLock.acquire()
  131.         try:
  132.             if self.waitingCommands.has_key(comm.id):
  133.                 event = self.waitingCommands[comm.id]
  134.             elif self.returnValues.has_key(comm.id):
  135.                 ret = self.returnValues[comm.id]
  136.                 del self.returnValues[comm.id]
  137.                 return ret
  138.         finally:
  139.             self.globalLock.release()
  140.         event.wait(30)
  141.         if not event.isSet():
  142.             raise DaemonError("timeout waiting for response to %s" % comm)
  143.         self.globalLock.acquire()
  144.         try:
  145.             ret = self.returnValues[comm.id]
  146.             del self.returnValues[comm.id]
  147.             return ret
  148.         finally:
  149.             self.globalLock.release()
  150.             
  151.     def addToWaitingList(self, comm):
  152.         self.globalLock.acquire()
  153.         try:
  154.             self.waitingCommands[comm.id] = Event()
  155.         finally:
  156.             self.globalLock.release()
  157.  
  158.     def send(self, comm, block):
  159.         if block:
  160.             self.addToWaitingList(comm)
  161.         raw = cPickle.dumps(comm, cPickle.HIGHEST_PROTOCOL)
  162.         self.sendLock.acquire()
  163.         try:
  164.             cPickle.dump(comm, self.stream, cPickle.HIGHEST_PROTOCOL)
  165.             self.stream.flush() # If I trusted Python sockets to be
  166.                                 # properly multithreaded, I'd put this
  167.                                 # below the finally block. I don't.
  168.         finally:
  169.             self.sendLock.release()
  170.         if block:
  171.             return self.waitForReturn(comm)
  172.  
  173.  
  174. class DownloaderDaemon(Daemon):
  175.     def __init__(self, port):
  176.         # before anything else, write out our PID 
  177.         writePid(os.getpid())
  178.         # connect to the controller and start our listen loop
  179.         Daemon.__init__(self)
  180.         self.socket.connect(('127.0.0.1', port))
  181.         self.stream = self.socket.makefile("r+b")
  182.         print "Downloader Daemon: Connected on port %s" % port
  183.         t = Thread(target = self.downloaderLoop, name = "Downloader Loop")
  184.         t.start()
  185.  
  186.     def downloaderLoop(self):
  187.         try:
  188.             self.listenLoop()
  189.             print "Downloader listen loop completed"
  190.         finally:
  191.             self.shutdown = True
  192.             from dl_daemon import download
  193.             download.shutDown()
  194.  
  195. class ControllerDaemon(Daemon):
  196.     def __init__(self):
  197.         Daemon.__init__(self)
  198.         # open a port and start our listen loop
  199.         self.socket.bind( ('127.0.0.1', 0) )
  200.         (myAddr, myPort) = self.socket.getsockname()
  201.         print "Controller Daemon: Listening on %s %s" % (myAddr, myPort)
  202.         self.port = myPort
  203.         self.socket.listen(63)
  204.         self.ready = Event()
  205.         t = Thread(target = self.clientLoop, name = "Controller Loop")
  206.         t.start()
  207.         self.ready.wait()
  208.  
  209.     def send(self, comm, block):
  210.         # Don't let traffic through until tho downloader child process is
  211.         # ready
  212.         if comm.orig and not self.ready.isSet():
  213.             print 'DTV: Delaying send of %s %s' % (str(comm), comm.id)
  214.             if block:
  215.                 self.ready.wait()
  216.             else:
  217.                 raise socket.error("server not ready")
  218.         return Daemon.send(self, comm, block)
  219.  
  220.     def clientLoop(self):
  221.         try:
  222.             while True:
  223.                 self.connectToDownloader()
  224.                 try:
  225.                     self.listenLoop()
  226.                     print "Controller listen loop completed"
  227.                     break
  228.                 except socket.error:
  229.                     # On socket errors, the downloader dies, but the
  230.                     # controller stays alive and restarts the downloader
  231.                     self.ready.clear()
  232.                     print "Socket exception in the controller daemon"
  233.                     traceback.print_exc()
  234.         finally:
  235.             self.shutDown = True
  236.  
  237.     def connectToDownloader(self):
  238.         # launch a new daemon
  239.         launchDownloadDaemon(readPid(), self.port)
  240.         # wait for the daemon to connect to our port
  241.         (conn, address) = self.socket.accept()
  242.         conn.settimeout(None)
  243.         self.stream = conn.makefile("r+b")
  244.