home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / pyshared / checkbox / message.py < prev    next >
Encoding:
Python Source  |  2009-04-27  |  12.2 KB  |  342 lines

  1. #
  2. # This file is part of Checkbox.
  3. #
  4. # Copyright 2008 Canonical Ltd.
  5. #
  6. # Checkbox is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # Checkbox is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.
  18. #
  19. import os
  20. import logging
  21. import itertools
  22. import posixpath
  23.  
  24. from checkbox.contrib import bpickle
  25.  
  26.  
  27. HELD = "h"
  28. BROKEN = "b"
  29.  
  30. ANCIENT = 1
  31.  
  32.  
  33. class MessageStore(object):
  34.     """A message store which stores its messages in a file system hierarchy."""
  35.  
  36.     def __init__(self, persist, directory, directory_size=1000):
  37.         self._directory = directory
  38.         self._directory_size = directory_size
  39.         self._schemas = {}
  40.         self._original_persist = persist
  41.         self._persist = persist.root_at("message-store")
  42.         message_dir = self._message_dir()
  43.         if not posixpath.isdir(message_dir):
  44.             os.makedirs(message_dir)
  45.  
  46.     def commit(self):
  47.         """Save metadata to disk."""
  48.         self._original_persist.save()
  49.  
  50.     def set_accepted_types(self, types):
  51.         """Specify the types of messages that the server will expect from us.
  52.  
  53.         If messages are added to the store which are not currently
  54.         accepted, they will be saved but ignored until their type is
  55.         accepted.
  56.         """
  57.         assert type(types) in (tuple, list, set)
  58.         self._persist.set("accepted-types", sorted(set(types)))
  59.         self._reprocess_holding()
  60.  
  61.     def get_accepted_types(self):
  62.         return self._persist.get("accepted-types", ())
  63.  
  64.     def accepts(self, type):
  65.         accepted_types = self.get_accepted_types()
  66.         return not accepted_types or type in accepted_types
  67.  
  68.     def get_sequence(self):
  69.         """
  70.         Get the sequence number of the message that the server expects us to
  71.         send on the next exchange.
  72.         """
  73.         return self._persist.get("sequence", 0)
  74.  
  75.     def set_sequence(self, number):
  76.         """
  77.         Set the sequence number of the message that the server expects us to
  78.         send on the next exchange.
  79.         """
  80.         self._persist.set("sequence", number)
  81.  
  82.     def get_pending_offset(self):
  83.         return self._persist.get("pending_offset", 0)
  84.  
  85.     def set_pending_offset(self, val):
  86.         """
  87.         Set the offset into the message pool to consider assigned to the
  88.         current sequence number as returned by l{get_sequence}.
  89.         """
  90.         self._persist.set("pending_offset", val)
  91.  
  92.     def add_pending_offset(self, val):
  93.         self.set_pending_offset(self.get_pending_offset() + val)
  94.  
  95.     def count_pending_messages(self):
  96.         """Return the number of pending messages."""
  97.         return sum(1 for x in self._walk_pending_messages())
  98.  
  99.     def get_pending_messages(self, max=None):
  100.         """Get any pending messages that aren't being held, up to max."""
  101.         messages = []
  102.         for filename in self._walk_pending_messages():
  103.             if max is not None and len(messages) >= max:
  104.                 break
  105.             data = self._get_content(filename)
  106.             try:
  107.                 message = bpickle.loads(data)
  108.             except ValueError, e:
  109.                 logging.exception(e)
  110.                 self._add_flags(filename, BROKEN)
  111.             else:
  112.                 if not self.accepts(message["type"]):
  113.                     self._add_flags(filename, HELD)
  114.                 else:
  115.                     messages.append(message)
  116.         return messages
  117.  
  118.     def set_pending_flags(self, flags):
  119.         for filename in self._walk_pending_messages():
  120.             self._set_flags(filename, flags)
  121.             break
  122.  
  123.     def add_pending_flags(self, flags):
  124.         for filename in self._walk_pending_messages():
  125.             self._add_flags(filename, flags)
  126.             break
  127.  
  128.     def delete_old_messages(self):
  129.         """Delete messages which are unlikely to be needed in the future."""
  130.         filenames = self._get_sorted_filenames()
  131.         for fn in itertools.islice(self._walk_messages(exclude=HELD+BROKEN),
  132.                                    self.get_pending_offset()):
  133.             os.unlink(fn)
  134.             containing_dir = posixpath.split(fn)[0]
  135.             if not os.listdir(containing_dir):
  136.                 os.rmdir(containing_dir)
  137.  
  138.     def delete_all_messages(self):
  139.         """Remove ALL stored messages."""
  140.         self.set_pending_offset(0)
  141.         for filename in self._walk_messages():
  142.             os.unlink(filename)
  143.  
  144.     def add_schema(self, schema):
  145.         """Add a schema to be applied to messages of the given type.
  146.  
  147.         The schema must be an instance of L{landscape.schema.Message}.
  148.         """
  149.         self._schemas[schema.type] = schema
  150.  
  151.     def is_pending(self, message_id):
  152.         """Return bool indicating if C{message_id} still hasn't been delivered.
  153.  
  154.         @param message_id: Identifier returned by the L{add()} method.
  155.         """
  156.         i = 0
  157.         pending_offset = self.get_pending_offset()
  158.         for filename in self._walk_messages(exclude=BROKEN):
  159.             flags = self._get_flags(filename)
  160.             if ((HELD in flags or i >= pending_offset) and
  161.                 os.stat(filename).st_ino == message_id):
  162.                 return True
  163.             if BROKEN not in flags and HELD not in flags:
  164.                 i += 1
  165.         return False
  166.  
  167.     def add(self, message):
  168.         """Queue a message for delivery.
  169.  
  170.         @return: message_id, which is an identifier for the added message.
  171.         """
  172.         assert "type" in message
  173.         if message["type"] in self._schemas:
  174.             message = self._schemas[message["type"]].coerce(message)
  175.  
  176.         message_data = bpickle.dumps(message)
  177.  
  178.         filename = self._get_next_message_filename()
  179.  
  180.         file = open(filename + ".tmp", "w")
  181.         file.write(message_data)
  182.         file.close()
  183.         os.rename(filename + ".tmp", filename)
  184.  
  185.         if not self.accepts(message["type"]):
  186.             filename = self._set_flags(filename, HELD)
  187.  
  188.         # For now we use the inode as the message id, as it will work
  189.         # correctly even faced with holding/unholding.  It will break
  190.         # if the store is copied over for some reason, but this shouldn't
  191.         # present an issue given the current uses.  In the future we
  192.         # should have a nice transactional storage (e.g. sqlite) which
  193.         # will offer a more strong primary key.
  194.         message_id = os.stat(filename).st_ino
  195.  
  196.         return message_id
  197.  
  198.     def _get_next_message_filename(self):
  199.         message_dirs = self._get_sorted_filenames()
  200.         if message_dirs:
  201.             newest_dir = message_dirs[-1]
  202.         else:
  203.             os.makedirs(self._message_dir("0"))
  204.             newest_dir = "0"
  205.  
  206.         message_filenames = self._get_sorted_filenames(newest_dir)
  207.         if not message_filenames:
  208.             filename = self._message_dir(newest_dir, "0")
  209.         elif len(message_filenames) < self._directory_size:
  210.             filename = str(int(message_filenames[-1].split("_")[0]) + 1)
  211.             filename = self._message_dir(newest_dir, filename)
  212.         else:
  213.             newest_dir = self._message_dir(str(int(newest_dir) + 1))
  214.             os.makedirs(newest_dir)
  215.             filename = posixpath.join(newest_dir, "0")
  216.  
  217.         return filename
  218.  
  219.     def _walk_pending_messages(self):
  220.         """Walk the files which are definitely pending."""
  221.         pending_offset = self.get_pending_offset()
  222.         for i, filename in enumerate(self._walk_messages(exclude=HELD+BROKEN)):
  223.             if i >= pending_offset:
  224.                 yield filename
  225.  
  226.     def _walk_messages(self, exclude=None):
  227.         if exclude:
  228.             exclude = set(exclude)
  229.         message_dirs = self._get_sorted_filenames()
  230.         for message_dir in message_dirs:
  231.             for filename in self._get_sorted_filenames(message_dir):
  232.                 flags = set(self._get_flags(filename))
  233.                 if (not exclude or not exclude & flags):
  234.                     yield self._message_dir(message_dir, filename)
  235.  
  236.     def _get_sorted_filenames(self, dir=""):
  237.         message_files = [x for x in os.listdir(self._message_dir(dir))
  238.                          if not x.endswith(".tmp")]
  239.         message_files = sorted(message_files,
  240.             key=lambda x: int(x.split("_")[0]))
  241.         return message_files
  242.  
  243.     def _message_dir(self, *args):
  244.         return posixpath.join(self._directory, *args)
  245.  
  246.     def _get_content(self, filename):
  247.         file = open(filename)
  248.         try:
  249.             return file.read()
  250.         finally:
  251.             file.close()
  252.  
  253.     def _reprocess_holding(self):
  254.         """
  255.         Unhold accepted messages left behind, and hold unaccepted
  256.         pending messages.
  257.         """
  258.         offset = 0
  259.         pending_offset = self.get_pending_offset()
  260.         for old_filename in self._walk_messages():
  261.             flags = self._get_flags(old_filename)
  262.             try:
  263.                 message = bpickle.loads(self._get_content(old_filename))
  264.             except ValueError, e:
  265.                 logging.exception(e)
  266.                 if HELD not in flags:
  267.                     offset += 1
  268.             else:
  269.                 accepted = self.accepts(message["type"])
  270.                 if HELD in flags:
  271.                     if accepted:
  272.                         new_filename = self._get_next_message_filename()
  273.                         os.rename(old_filename, new_filename)
  274.                         self._set_flags(new_filename, set(flags)-set(HELD))
  275.                 else:
  276.                     if not accepted and offset >= pending_offset:
  277.                         self._set_flags(old_filename, set(flags)|set(HELD))
  278.                     offset += 1
  279.  
  280.     def _get_flags(self, path):
  281.         basename = posixpath.basename(path)
  282.         if "_" in basename:
  283.             return basename.split("_")[1]
  284.         return ""
  285.  
  286.     def _set_flags(self, path, flags):
  287.         dirname, basename = posixpath.split(path)
  288.         new_path = posixpath.join(dirname, basename.split("_")[0])
  289.         if flags:
  290.             new_path += "_"+"".join(sorted(set(flags)))
  291.         os.rename(path, new_path)
  292.         return new_path
  293.  
  294.     def _add_flags(self, path, flags):
  295.         self._set_flags(path, self._get_flags(path)+flags)
  296.  
  297.  
  298. def got_next_sequence(message_store, next_sequence):
  299.     """Our peer has told us what it expects our next message's sequence to be.
  300.  
  301.     Call this with the message store and sequence number that the peer
  302.     wants next; this will do various things based on what *this* side
  303.     has in its outbound queue store.
  304.  
  305.     1. The peer expects a sequence greater than what we last
  306.        sent. This is the common case and generally it should be
  307.        expecting last_sent_sequence+len(messages_sent)+1.
  308.  
  309.     2. The peer expects a sequence number our side has already sent,
  310.        and we no longer have that message. In this case, just send
  311.        *all* messages we have, including the previous generation,
  312.        starting at the sequence number the peer expects (meaning that
  313.        messages have probably been lost).
  314.  
  315.     3. The peer expects a sequence number we already sent, and we
  316.        still have that message cached. In this case, we send starting
  317.        from that message.
  318.  
  319.     If the next sequence from the server refers to a message older than
  320.     we have, then L{ANCIENT} will be returned.
  321.     """
  322.     ret = None
  323.     old_sequence = message_store.get_sequence()
  324.     if next_sequence > old_sequence:
  325.         message_store.delete_old_messages()
  326.         pending_offset = next_sequence - old_sequence
  327.     elif next_sequence < (old_sequence - message_store.get_pending_offset()):
  328.         # "Ancient": The other side wants messages we don't have,
  329.         # so let's just reset our counter to what it expects.
  330.         pending_offset = 0
  331.         ret = ANCIENT
  332.     else:
  333.         # No messages transferred, or
  334.         # "Old": We'll try to send these old messages that the
  335.         # other side still wants.
  336.         pending_offset = (message_store.get_pending_offset() + next_sequence
  337.                           - old_sequence)
  338.  
  339.     message_store.set_pending_offset(pending_offset)
  340.     message_store.set_sequence(next_sequence)
  341.     return ret
  342.