home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / lib / python2.6 / dist-packages / checkbox / message.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-10-12  |  12.9 KB  |  362 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. import os
  5. import logging
  6. import itertools
  7. import posixpath
  8. from checkbox.contrib import bpickle
  9. HELD = 'h'
  10. BROKEN = 'b'
  11. ANCIENT = 1
  12.  
  13. class MessageStore(object):
  14.     '''A message store which stores its messages in a file system hierarchy.'''
  15.     
  16.     def __init__(self, persist, directory, directory_size = 1000):
  17.         self._directory = directory
  18.         self._directory_size = directory_size
  19.         self._schemas = { }
  20.         self._original_persist = persist
  21.         self._persist = persist.root_at('message-store')
  22.         message_dir = self._message_dir()
  23.         if not posixpath.isdir(message_dir):
  24.             os.makedirs(message_dir)
  25.         
  26.  
  27.     
  28.     def commit(self):
  29.         '''Save metadata to disk.'''
  30.         self._original_persist.save()
  31.  
  32.     
  33.     def set_accepted_types(self, types):
  34.         '''Specify the types of messages that the server will expect from us.
  35.  
  36.         If messages are added to the store which are not currently
  37.         accepted, they will be saved but ignored until their type is
  38.         accepted.
  39.         '''
  40.         if not type(types) in (tuple, list, set):
  41.             raise AssertionError
  42.         self._persist.set('accepted-types', sorted(set(types)))
  43.         self._reprocess_holding()
  44.  
  45.     
  46.     def get_accepted_types(self):
  47.         return self._persist.get('accepted-types', ())
  48.  
  49.     
  50.     def accepts(self, type):
  51.         accepted_types = self.get_accepted_types()
  52.         if not not accepted_types:
  53.             pass
  54.         return type in accepted_types
  55.  
  56.     
  57.     def get_sequence(self):
  58.         '''
  59.         Get the sequence number of the message that the server expects us to
  60.         send on the next exchange.
  61.         '''
  62.         return self._persist.get('sequence', 0)
  63.  
  64.     
  65.     def set_sequence(self, number):
  66.         '''
  67.         Set the sequence number of the message that the server expects us to
  68.         send on the next exchange.
  69.         '''
  70.         self._persist.set('sequence', number)
  71.  
  72.     
  73.     def get_pending_offset(self):
  74.         return self._persist.get('pending_offset', 0)
  75.  
  76.     
  77.     def set_pending_offset(self, val):
  78.         '''
  79.         Set the offset into the message pool to consider assigned to the
  80.         current sequence number as returned by l{get_sequence}.
  81.         '''
  82.         self._persist.set('pending_offset', val)
  83.  
  84.     
  85.     def add_pending_offset(self, val):
  86.         self.set_pending_offset(self.get_pending_offset() + val)
  87.  
  88.     
  89.     def count_pending_messages(self):
  90.         '''Return the number of pending messages.'''
  91.         return sum((lambda .0: for x in .0:
  92. 1)(self._walk_pending_messages()))
  93.  
  94.     
  95.     def get_pending_messages(self, max = None):
  96.         """Get any pending messages that aren't being held, up to max."""
  97.         messages = []
  98.         for filename in self._walk_pending_messages():
  99.             if max is not None and len(messages) >= max:
  100.                 break
  101.             
  102.             data = self._get_content(filename)
  103.             
  104.             try:
  105.                 message = bpickle.loads(data)
  106.             except ValueError:
  107.                 e = None
  108.                 logging.exception(e)
  109.                 self._add_flags(filename, BROKEN)
  110.                 continue
  111.  
  112.             if not self.accepts(message['type']):
  113.                 self._add_flags(filename, HELD)
  114.                 continue
  115.             messages.append(message)
  116.         
  117.         return messages
  118.  
  119.     
  120.     def set_pending_flags(self, flags):
  121.         for filename in self._walk_pending_messages():
  122.             self._set_flags(filename, flags)
  123.         
  124.  
  125.     
  126.     def add_pending_flags(self, flags):
  127.         for filename in self._walk_pending_messages():
  128.             self._add_flags(filename, flags)
  129.         
  130.  
  131.     
  132.     def delete_old_messages(self):
  133.         '''Delete messages which are unlikely to be needed in the future.'''
  134.         filenames = self._get_sorted_filenames()
  135.         for fn in itertools.islice(self._walk_messages(exclude = HELD + BROKEN), self.get_pending_offset()):
  136.             os.unlink(fn)
  137.             containing_dir = posixpath.split(fn)[0]
  138.             if not os.listdir(containing_dir):
  139.                 os.rmdir(containing_dir)
  140.                 continue
  141.         
  142.  
  143.     
  144.     def delete_all_messages(self):
  145.         '''Remove ALL stored messages.'''
  146.         self.set_pending_offset(0)
  147.         for filename in self._walk_messages():
  148.             os.unlink(filename)
  149.         
  150.  
  151.     
  152.     def add_schema(self, schema):
  153.         '''Add a schema to be applied to messages of the given type.
  154.  
  155.         The schema must be an instance of L{landscape.schema.Message}.
  156.         '''
  157.         self._schemas[schema.type] = schema
  158.  
  159.     
  160.     def is_pending(self, message_id):
  161.         """Return bool indicating if C{message_id} still hasn't been delivered.
  162.  
  163.         @param message_id: Identifier returned by the L{add()} method.
  164.         """
  165.         i = 0
  166.         pending_offset = self.get_pending_offset()
  167.         for filename in self._walk_messages(exclude = BROKEN):
  168.             flags = self._get_flags(filename)
  169.             if (HELD in flags or i >= pending_offset) and os.stat(filename).st_ino == message_id:
  170.                 return True
  171.             if BROKEN not in flags and HELD not in flags:
  172.                 i += 1
  173.                 continue
  174.             os.stat(filename).st_ino == message_id
  175.         
  176.         return False
  177.  
  178.     
  179.     def add(self, message):
  180.         '''Queue a message for delivery.
  181.  
  182.         @return: message_id, which is an identifier for the added message.
  183.         '''
  184.         if not 'type' in message:
  185.             raise AssertionError
  186.         if message['type'] in self._schemas:
  187.             message = self._schemas[message['type']].coerce(message)
  188.         
  189.         message_data = bpickle.dumps(message)
  190.         filename = self._get_next_message_filename()
  191.         file = open(filename + '.tmp', 'w')
  192.         file.write(message_data)
  193.         file.close()
  194.         os.rename(filename + '.tmp', filename)
  195.         if not self.accepts(message['type']):
  196.             filename = self._set_flags(filename, HELD)
  197.         
  198.         message_id = os.stat(filename).st_ino
  199.         return message_id
  200.  
  201.     
  202.     def _get_next_message_filename(self):
  203.         message_dirs = self._get_sorted_filenames()
  204.         if message_dirs:
  205.             newest_dir = message_dirs[-1]
  206.         else:
  207.             os.makedirs(self._message_dir('0'))
  208.             newest_dir = '0'
  209.         message_filenames = self._get_sorted_filenames(newest_dir)
  210.         if not message_filenames:
  211.             filename = self._message_dir(newest_dir, '0')
  212.         elif len(message_filenames) < self._directory_size:
  213.             filename = str(int(message_filenames[-1].split('_')[0]) + 1)
  214.             filename = self._message_dir(newest_dir, filename)
  215.         else:
  216.             newest_dir = self._message_dir(str(int(newest_dir) + 1))
  217.             os.makedirs(newest_dir)
  218.             filename = posixpath.join(newest_dir, '0')
  219.         return filename
  220.  
  221.     
  222.     def _walk_pending_messages(self):
  223.         '''Walk the files which are definitely pending.'''
  224.         pending_offset = self.get_pending_offset()
  225.         for i, filename in enumerate(self._walk_messages(exclude = HELD + BROKEN)):
  226.             if i >= pending_offset:
  227.                 yield filename
  228.                 continue
  229.         
  230.  
  231.     
  232.     def _walk_messages(self, exclude = None):
  233.         if exclude:
  234.             exclude = set(exclude)
  235.         
  236.         message_dirs = self._get_sorted_filenames()
  237.         for message_dir in message_dirs:
  238.             for filename in self._get_sorted_filenames(message_dir):
  239.                 flags = set(self._get_flags(filename))
  240.                 if not exclude or not (exclude & flags):
  241.                     yield self._message_dir(message_dir, filename)
  242.                     continue
  243.             
  244.         
  245.  
  246.     
  247.     def _get_sorted_filenames(self, dir = ''):
  248.         message_files = _[1]
  249.         message_files = sorted(message_files, key = (lambda x: int(x.split('_')[0])))
  250.         return message_files
  251.  
  252.     
  253.     def _message_dir(self, *args):
  254.         return posixpath.join(self._directory, *args)
  255.  
  256.     
  257.     def _get_content(self, filename):
  258.         file = open(filename)
  259.         
  260.         try:
  261.             return file.read()
  262.         finally:
  263.             file.close()
  264.  
  265.  
  266.     
  267.     def _reprocess_holding(self):
  268.         '''
  269.         Unhold accepted messages left behind, and hold unaccepted
  270.         pending messages.
  271.         '''
  272.         offset = 0
  273.         pending_offset = self.get_pending_offset()
  274.         for old_filename in self._walk_messages():
  275.             flags = self._get_flags(old_filename)
  276.             
  277.             try:
  278.                 message = bpickle.loads(self._get_content(old_filename))
  279.             except ValueError:
  280.                 e = None
  281.                 logging.exception(e)
  282.                 if HELD not in flags:
  283.                     offset += 1
  284.                 
  285.                 HELD not in flags
  286.  
  287.             accepted = self.accepts(message['type'])
  288.             if HELD in flags:
  289.                 if accepted:
  290.                     new_filename = self._get_next_message_filename()
  291.                     os.rename(old_filename, new_filename)
  292.                     self._set_flags(new_filename, set(flags) - set(HELD))
  293.                 
  294.             accepted
  295.             if not accepted and offset >= pending_offset:
  296.                 self._set_flags(old_filename, set(flags) | set(HELD))
  297.             
  298.             offset += 1
  299.         
  300.  
  301.     
  302.     def _get_flags(self, path):
  303.         basename = posixpath.basename(path)
  304.         if '_' in basename:
  305.             return basename.split('_')[1]
  306.         return ''
  307.  
  308.     
  309.     def _set_flags(self, path, flags):
  310.         (dirname, basename) = posixpath.split(path)
  311.         new_path = posixpath.join(dirname, basename.split('_')[0])
  312.         if flags:
  313.             new_path += '_' + ''.join(sorted(set(flags)))
  314.         
  315.         os.rename(path, new_path)
  316.         return new_path
  317.  
  318.     
  319.     def _add_flags(self, path, flags):
  320.         self._set_flags(path, self._get_flags(path) + flags)
  321.  
  322.  
  323.  
  324. def got_next_sequence(message_store, next_sequence):
  325.     """Our peer has told us what it expects our next message's sequence to be.
  326.  
  327.     Call this with the message store and sequence number that the peer
  328.     wants next; this will do various things based on what *this* side
  329.     has in its outbound queue store.
  330.  
  331.     1. The peer expects a sequence greater than what we last
  332.        sent. This is the common case and generally it should be
  333.        expecting last_sent_sequence+len(messages_sent)+1.
  334.  
  335.     2. The peer expects a sequence number our side has already sent,
  336.        and we no longer have that message. In this case, just send
  337.        *all* messages we have, including the previous generation,
  338.        starting at the sequence number the peer expects (meaning that
  339.        messages have probably been lost).
  340.  
  341.     3. The peer expects a sequence number we already sent, and we
  342.        still have that message cached. In this case, we send starting
  343.        from that message.
  344.  
  345.     If the next sequence from the server refers to a message older than
  346.     we have, then L{ANCIENT} will be returned.
  347.     """
  348.     ret = None
  349.     old_sequence = message_store.get_sequence()
  350.     if next_sequence > old_sequence:
  351.         message_store.delete_old_messages()
  352.         pending_offset = next_sequence - old_sequence
  353.     elif next_sequence < old_sequence - message_store.get_pending_offset():
  354.         pending_offset = 0
  355.         ret = ANCIENT
  356.     else:
  357.         pending_offset = message_store.get_pending_offset() + next_sequence - old_sequence
  358.     message_store.set_pending_offset(pending_offset)
  359.     message_store.set_sequence(next_sequence)
  360.     return ret
  361.  
  362.