home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / python-support / python-rdflib / rdflib / store / BerkeleyDB.py < prev    next >
Encoding:
Python Source  |  2007-04-04  |  20.6 KB  |  571 lines

  1. import warnings
  2. warnings.warn("This Store implementation is still being debugged. It is currently running out of db lockers after adding around 2k triples.")
  3.  
  4. from rdflib.store import Store, VALID_STORE, CORRUPTED_STORE, NO_STORE, UNKNOWN
  5. from rdflib.URIRef import URIRef
  6. from bsddb import db
  7. from os import mkdir, rmdir, makedirs
  8. from os.path import exists, abspath, join
  9. from urllib import pathname2url
  10. from threading import Thread
  11. from time import sleep, time
  12. import logging
  13.  
  14. SUPPORT_MULTIPLE_STORE_ENVIRON = False
  15.  
  16. _logger = logging.getLogger(__name__)
  17.  
  18.  
  19. class BerkeleyDB(Store):
  20.     """
  21.     A transaction-capable BerkeleyDB implementation
  22.     The major difference are:
  23.       - a dbTxn attribute which is the transaction object used for all bsddb databases
  24.       - All operations (put,delete,get) take the dbTxn instance
  25.       - The actual directory used for the bsddb persistence is the name of the identifier as a subdirectory of the 'path'
  26.       
  27.     """
  28.     context_aware = True
  29.     formula_aware = True
  30.     transaction_aware = True
  31.     def __init__(self, configuration=None, identifier=None):
  32.         self.__open = False
  33.         self.__identifier = identifier and identifier or 'home'
  34.         super(BerkeleyDB, self).__init__(configuration)
  35.         self.configuration = configuration
  36.         self._loads = self.node_pickler.loads
  37.         self._dumps = self.node_pickler.dumps
  38.         #This state is needed to handle all possible combinations of calls to tx methods (close/rollback/commit)
  39.         self.__dbTxn = None
  40.  
  41.     def __get_identifier(self):
  42.         return self.__identifier
  43.     identifier = property(__get_identifier)
  44.  
  45.     def destroy(self, configuration):
  46.         """
  47.         Destroy the underlying bsddb persistence for this store
  48.         """
  49.         if SUPPORT_MULTIPLE_STORE_ENVIRON:
  50.             fullDir = join(configuration,self.identifier)
  51.         else:
  52.             fullDir = configuration
  53.         if exists(configuration):
  54.             #From bsddb docs:
  55.             #A DB_ENV handle that has already been used to open an environment 
  56.             #should not be used to call the DB_ENV->remove function; a new DB_ENV handle should be created for that purpose.
  57.             self.close()
  58.             db.DBEnv().remove(fullDir,db.DB_FORCE)
  59.  
  60.     def open(self, path, create=True):
  61.         if self.__open:
  62.             return
  63.         homeDir = path
  64.         #NOTE: The identifeir is appended to the path as the location for the db
  65.         #This provides proper isolation for stores which have the same path but different identifiers
  66.         if SUPPORT_MULTIPLE_STORE_ENVIRON:
  67.             fullDir = join(homeDir,self.identifier)
  68.         else:
  69.             fullDir = homeDir
  70.         envsetflags  = db.DB_CDB_ALLDB
  71.         envflags = db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_THREAD | db.DB_INIT_TXN | db.DB_RECOVER
  72.         if not exists(fullDir):
  73.             if create==True:
  74.                 makedirs(fullDir)
  75.                 self.create(path)
  76.             else:                
  77.                 return NO_STORE
  78.         if self.__identifier is None:
  79.             self.__identifier = URIRef(pathname2url(abspath(fullDir)))
  80.         self.db_env = db_env = db.DBEnv()
  81.         db_env.set_cachesize(0, 1024*1024*50) # TODO
  82.         #db_env.set_lg_max(1024*1024)
  83.         #db_env.set_flags(envsetflags, 1)
  84.         db_env.open(fullDir, envflags | db.DB_CREATE,0)
  85.  
  86.         #Transaction object
  87.         self.dbTxn = db_env.txn_begin()
  88.  
  89.         self.__open = True
  90.  
  91.         dbname = None
  92.         dbtype = db.DB_BTREE
  93.         dbopenflags = db.DB_THREAD
  94.  
  95.         dbmode = 0660
  96.         dbsetflags   = 0
  97.  
  98.         # create and open the DBs
  99.         self.__indicies = [None,] * 3
  100.         self.__indicies_info = [None,] * 3
  101.         for i in xrange(0, 3):
  102.             index_name = to_key_func(i)(("s", "p", "o"), "c")
  103.             index = db.DB(db_env)
  104.             index.set_flags(dbsetflags)
  105.             index.open(index_name, dbname, dbtype, dbopenflags|db.DB_CREATE, dbmode,txn=self.dbTxn)
  106.             self.__indicies[i] = index
  107.             self.__indicies_info[i] = (index, to_key_func(i), from_key_func(i))
  108.  
  109.         lookup = {}
  110.         for i in xrange(0, 8):
  111.             results = []
  112.             for start in xrange(0, 3):
  113.                 score = 1
  114.                 len = 0
  115.                 for j in xrange(start, start+3):
  116.                     if i & (1<<(j%3)):
  117.                         score = score << 1
  118.                         len += 1
  119.                     else:
  120.                         break
  121.                 tie_break = 2-start
  122.                 results.append(((score, tie_break), start, len))
  123.  
  124.             results.sort()
  125.             score, start, len = results[-1]
  126.  
  127.             def get_prefix_func(start, end):
  128.                 def get_prefix(triple, context):
  129.                     if context is None:
  130.                         yield ""
  131.                     else:
  132.                         yield context
  133.                     i = start
  134.                     while i<end:
  135.                         yield triple[i%3]
  136.                         i += 1
  137.                     yield ""
  138.                 return get_prefix
  139.  
  140.             lookup[i] = (self.__indicies[start], get_prefix_func(start, start + len), from_key_func(start), results_from_key_func(start, self._from_string))
  141.  
  142.  
  143.         self.__lookup_dict = lookup
  144.  
  145.         self.__contexts = db.DB(db_env)
  146.         self.__contexts.set_flags(dbsetflags)
  147.         self.__contexts.open("contexts", dbname, dbtype, dbopenflags|db.DB_CREATE, dbmode,txn=self.dbTxn)
  148.  
  149.         self.__namespace = db.DB(db_env)
  150.         self.__namespace.set_flags(dbsetflags)
  151.         self.__namespace.open("namespace", dbname, dbtype, dbopenflags|db.DB_CREATE, dbmode,txn=self.dbTxn)
  152.  
  153.         self.__prefix = db.DB(db_env)
  154.         self.__prefix.set_flags(dbsetflags)
  155.         self.__prefix.open("prefix", dbname, dbtype, dbopenflags|db.DB_CREATE, dbmode,txn=self.dbTxn)
  156.  
  157.         self.__i2k = db.DB(db_env)
  158.         self.__i2k.set_flags(dbsetflags)
  159.         self.__i2k.open("i2k", dbname, db.DB_HASH, dbopenflags|db.DB_CREATE, dbmode,txn=self.dbTxn)
  160.  
  161.         self.__needs_sync = False
  162.         t = Thread(target=self.__sync_run)
  163.         t.setDaemon(True)
  164.         t.start()
  165.         self.__sync_thread = t
  166.         return VALID_STORE
  167.  
  168.     def __sync_run(self):
  169.         min_seconds, max_seconds = 10, 300
  170.         while self.__open:
  171.             if self.__needs_sync:
  172.                 t0 = t1 = time()
  173.                 self.__needs_sync = False
  174.                 while self.__open:
  175.                     sleep(.1)
  176.                     if self.__needs_sync:
  177.                         t1 = time()
  178.                         self.__needs_sync = False
  179.                     if time()-t1 > min_seconds or time()-t0 > max_seconds:
  180.                         self.__needs_sync = False
  181.                         _logger.debug("sync")
  182.                         self.sync()
  183.                         break
  184.             else:
  185.                 sleep(1)
  186.  
  187.     def sync(self):
  188.         if self.__open:
  189.             for i in self.__indicies:
  190.                 i.sync()
  191.             self.__contexts.sync()
  192.             self.__namespace.sync()
  193.             self.__prefix.sync()
  194.             self.__i2k.sync()
  195.             #self.__k2i.sync()
  196.  
  197.     #Transactional interfaces
  198.     def commit(self):
  199.         """
  200.         Bsddb tx objects cannot be reused after commit 
  201.         """         
  202.         if self.dbTxn:
  203.             _logger.debug("commiting")
  204.             self.dbTxn.commit(0)
  205.             #Note a new transaction handle is created to support
  206.             #subsequent commit calls (bsddb doesn't support multiple commits by the same
  207.             #tx handle)
  208.             self.dbTxn = self.db_env.txn_begin()
  209.         else:
  210.             _logger.warning("No transaction to commit")
  211.  
  212.     def rollback(self):
  213.         """
  214.         Bsddb tx objects cannot be reused after commit
  215.         """           
  216.         if self.dbTxn is not None:
  217.             _logger.debug("rollingback")
  218.             self.dbTxn.abort()
  219.             #The dbTxn is set to None to indicate to a susequent close
  220.             #call that a rollback is not needed
  221.             self.dbTxn = None
  222.         else:
  223.             _logger.warning("No transaction to rollback")
  224.         
  225.     def __del__(self):
  226.         """
  227.         Redirects python's native garbage collection into Store.close 
  228.         """
  229.         self.close()    
  230.  
  231.     def close(self, commit_pending_transaction=False):
  232.         """
  233.         Properly handles transactions explicitely (with parameter) or by default
  234.         """
  235.         if not self.__open:
  236.             return
  237.         if self.dbTxn:
  238.             if not commit_pending_transaction:
  239.                 self.rollback()
  240.             else:
  241.                 self.commit()            
  242.                 self.dbTxn.abort() # abort the new transaction commit just started since we're closing.
  243.         self.__open = False
  244.         self.__sync_thread.join()
  245.         for i in self.__indicies:
  246.             i.close()
  247.         self.__contexts.close()
  248.         self.__namespace.close()
  249.         self.__prefix.close()
  250.         self.__i2k.close()
  251.         #self.__k2i.close()      
  252.         self.db_env.close()
  253.  
  254.     def add(self, (subject, predicate, object_), context, quoted=False):
  255.         """\
  256.         Add a triple to the store of triples.
  257.         """
  258.         assert self.__open, "The Store must be open."
  259.         assert context!=self, "Can not add triple directly to store"
  260.         Store.add(self, (subject, predicate, object_), context, quoted)
  261.  
  262.         _to_string = self._to_string
  263.  
  264.         s = _to_string(subject)
  265.         p = _to_string(predicate)
  266.         o = _to_string(object_)
  267.         c = _to_string(context)
  268.  
  269.         cspo, cpos, cosp = self.__indicies
  270.  
  271.         value = cspo.get("%s^%s^%s^%s^" % (c, s, p, o),txn=self.dbTxn)
  272.         if value is None:
  273.             self.__contexts.put(c, "",self.dbTxn)
  274.  
  275.             contexts_value = cspo.get("%s^%s^%s^%s^" % ("", s, p, o),txn=self.dbTxn) or ""
  276.             contexts = set(contexts_value.split("^"))
  277.             contexts.add(c)
  278.             contexts_value = "^".join(contexts)
  279.             assert contexts_value!=None
  280.  
  281.             cspo.put("%s^%s^%s^%s^" % (c, s, p, o), "",self.dbTxn)
  282.             cpos.put("%s^%s^%s^%s^" % (c, p, o, s), "",self.dbTxn)
  283.             cosp.put("%s^%s^%s^%s^" % (c, o, s, p), "",self.dbTxn)
  284.             if not quoted:
  285.                 cspo.put("%s^%s^%s^%s^" % ("", s, p, o), contexts_value,self.dbTxn)
  286.                 cpos.put("%s^%s^%s^%s^" % ("", p, o, s), contexts_value,self.dbTxn)
  287.                 cosp.put("%s^%s^%s^%s^" % ("", o, s, p), contexts_value,self.dbTxn)
  288.  
  289.             self.__needs_sync = True
  290.  
  291.     def __remove(self, (s, p, o), c, quoted=False):
  292.         cspo, cpos, cosp = self.__indicies
  293.         contexts_value = cspo.get("^".join(("", s, p, o, "")),txn=self.dbTxn) or ""
  294.         contexts = set(contexts_value.split("^"))
  295.         contexts.discard(c)
  296.         contexts_value = "^".join(contexts)
  297.         for i, _to_key, _from_key in self.__indicies_info:
  298.             i.delete(_to_key((s, p, o), c),txn=self.dbTxn)
  299.         if not quoted:
  300.             if contexts_value:
  301.                 for i, _to_key, _from_key in self.__indicies_info:
  302.                     i.put(_to_key((s, p, o), ""), contexts_value,self.dbTxn)
  303.             else:
  304.                 for i, _to_key, _from_key in self.__indicies_info:
  305.                     try:
  306.                         i.delete(_to_key((s, p, o), ""),txn=self.dbTxn)
  307.                     except db.DBNotFoundError, e: 
  308.                         pass # TODO: is it okay to ignore these?
  309.  
  310.     def remove(self, (subject, predicate, object_), context):
  311.         assert self.__open, "The Store must be open."
  312.         Store.remove(self, (subject, predicate, object_), context)
  313.         _to_string = self._to_string
  314.         if context is not None:
  315.             if context == self:
  316.                 context = None
  317.  
  318.         if subject is not None and predicate is not None and object_ is not None and context is not None:
  319.             s = _to_string(subject)
  320.             p = _to_string(predicate)
  321.             o = _to_string(object_)
  322.             c = _to_string(context)
  323.             value = self.__indicies[0].get("%s^%s^%s^%s^" % (c, s, p, o),txn=self.dbTxn)
  324.             if value is not None:
  325.                 self.__remove((s, p, o), c)
  326.                 self.__needs_sync = True
  327.         else:
  328.             cspo, cpos, cosp = self.__indicies
  329.             index, prefix, from_key, results_from_key = self.__lookup((subject, predicate, object_), context)
  330.  
  331.             cursor = index.cursor(txn=self.dbTxn)
  332.             try:
  333.                 current = cursor.set_range(prefix)
  334.                 needs_sync = True
  335.             except db.DBNotFoundError:
  336.                 current = None
  337.                 needs_sync = False
  338.             cursor.close()
  339.             while current:
  340.                 key, value = current
  341.                 cursor = index.cursor(txn=self.dbTxn)
  342.                 try:
  343.                     cursor.set_range(key)
  344.                     current = cursor.next()
  345.                 except db.DBNotFoundError:
  346.                     current = None
  347.                 cursor.close()
  348.                 if key.startswith(prefix):
  349.                     c, s, p, o = from_key(key)
  350.                     if context is None:
  351.                         contexts_value = index.get(key,txn=self.dbTxn) or ""
  352.                         contexts = set(contexts_value.split("^")) # remove triple from all non quoted contexts
  353.                         contexts.add("") # and from the conjunctive index
  354.                         for c in contexts:
  355.                             for i, _to_key, _ in self.__indicies_info:
  356.                                 i.delete(_to_key((s, p, o), c),txn=self.dbTxn)
  357.                     else:
  358.                         self.__remove((s, p, o), c)
  359.                 else:
  360.                     break
  361.  
  362.             if context is not None:
  363.                 if subject is None and predicate is None and object_ is None:
  364.                     # TODO: also if context becomes empty and not just on remove((None, None, None), c)
  365.                     try:
  366.                         self.__contexts.delete(_to_string(context),txn=self.dbTxn)
  367.                     except db.DBNotFoundError, e:
  368.                         pass
  369.  
  370.             self.__needs_sync = needs_sync
  371.  
  372.     def triples(self, (subject, predicate, object_), context=None):
  373.         """A generator over all the triples matching """
  374.         assert self.__open, "The Store must be open."
  375.  
  376.         if context is not None:
  377.             if context == self:
  378.                 context = None
  379.  
  380.         _from_string = self._from_string
  381.         index, prefix, from_key, results_from_key = self.__lookup((subject, predicate, object_), context)
  382.  
  383.         cursor = index.cursor(txn=self.dbTxn)
  384.         try:
  385.             current = cursor.set_range(prefix)
  386.         except db.DBNotFoundError:
  387.             current = None
  388.         cursor.close()
  389.         while current:
  390.             key, value = current
  391.             cursor = index.cursor(txn=self.dbTxn)
  392.             try:
  393.                 cursor.set_range(key)
  394.                 current = cursor.next()
  395.             except db.DBNotFoundError:
  396.                 current = None
  397.             cursor.close()
  398.             if key and key.startswith(prefix):
  399.                 contexts_value = index.get(key,txn=self.dbTxn)
  400.                 yield results_from_key(key, subject, predicate, object_, contexts_value)
  401.             else:
  402.                 break
  403.  
  404.     def __len__(self, context=None):
  405.         assert self.__open, "The Store must be open."
  406.         if context is not None:
  407.             if context == self:
  408.                 context = None
  409.  
  410.         if context is None:
  411.             prefix = "^"
  412.         else:
  413.             prefix = "%s^" % self._to_string(context)
  414.  
  415.         index = self.__indicies[0]
  416.         cursor = index.cursor(txn=self.dbTxn)
  417.         current = cursor.set_range(prefix)
  418.         count = 0
  419.         while current:
  420.             key, value = current
  421.             if key.startswith(prefix):
  422.                 count +=1
  423.                 current = cursor.next()
  424.             else:
  425.                 break
  426.         cursor.close()
  427.         return count
  428.  
  429.     def bind(self, prefix, namespace):
  430.         prefix = prefix.encode("utf-8")
  431.         namespace = namespace.encode("utf-8")
  432.         bound_prefix = self.__prefix.get(namespace,txn=self.dbTxn)
  433.         if bound_prefix:
  434.             self.__namespace.delete(bound_prefix,txn=self.dbTxn)
  435.         self.__prefix.put(namespace, prefix,self.dbTxn)
  436.         #self.__prefix[namespace] = prefix
  437.         self.__namespace.put(prefix, namespace,self.dbTxn)
  438.         #self.__namespace[prefix] = namespace
  439.  
  440.     def namespace(self, prefix):
  441.         prefix = prefix.encode("utf-8")
  442.         return self.__namespace.get(prefix, None,txn=self.dbTxn)
  443.  
  444.     def prefix(self, namespace):
  445.         namespace = namespace.encode("utf-8")
  446.         return self.__prefix.get(namespace, None,txn=self.dbTxn)
  447.  
  448.     def namespaces(self):
  449.         cursor = self.__namespace.cursor(txn=self.dbTxn)
  450.         results = []
  451.         current = cursor.first()
  452.         while current:
  453.             prefix, namespace = current
  454.             results.append((prefix, namespace))
  455.             current = cursor.next()
  456.         cursor.close()
  457.         for prefix, namespace in results:
  458.             yield prefix, URIRef(namespace)
  459.  
  460.     def contexts(self, triple=None):
  461.         _from_string = self._from_string
  462.         _to_string = self._to_string
  463.  
  464.         if triple:
  465.             s, p, o = triple
  466.             s = _to_string(s)
  467.             p = _to_string(p)
  468.             o = _to_string(o)
  469.             contexts = self.__indicies[0].get("%s^%s^%s^%s^" % ("", s, p, o),txn=self.dbTxn)
  470.             if contexts:
  471.                 for c in contexts.split("^"):
  472.                     if c:
  473.                         yield _from_string(c)
  474.         else:
  475.             index = self.__contexts
  476.             cursor = index.cursor(txn=self.dbTxn)
  477.             current = cursor.first()
  478.             cursor.close()
  479.             while current:
  480.                 key, value = current
  481.                 context = _from_string(key)
  482.                 yield context
  483.                 cursor = index.cursor(txn=self.dbTxn)
  484.                 try:
  485.                     cursor.set_range(key)
  486.                     current = cursor.next()
  487.                 except db.DBNotFoundError:
  488.                     current = None
  489.                 cursor.close()
  490.  
  491.     def _from_string(self, i):
  492.         k = self.__i2k.get(i,txn=self.dbTxn)
  493.         return self._loads(k)
  494.  
  495.     def _to_string(self, term):
  496.         """
  497.         i2k:  hashString -> pickledTerm
  498.         
  499.         i2k basically stores the reverse lookup of the MD5 hash of the term 
  500.         
  501.         """
  502.         assert term is not None
  503.         # depending on what the space time trade off looks like we
  504.         # might still want to record k2i as well. Also recording would
  505.         # protect against hash algo changing.
  506.         i = term.md5_term_hash()
  507.         k = self.__i2k.get(i, txn=self.dbTxn)
  508.         if k is None:
  509.             self.__i2k.put(i,self._dumps(term),txn=self.dbTxn)
  510.         return i
  511.  
  512.     def __lookup(self, (subject, predicate, object_), context):
  513.         _to_string = self._to_string
  514.         if context is not None:
  515.             context = _to_string(context)
  516.         i = 0
  517.         if subject is not None:
  518.             i += 1
  519.             subject = _to_string(subject)
  520.         if predicate is not None:
  521.             i += 2
  522.             predicate = _to_string(predicate)
  523.         if object_ is not None:
  524.             i += 4
  525.             object_ = _to_string(object_)
  526.         index, prefix_func, from_key, results_from_key = self.__lookup_dict[i]
  527.         prefix = "^".join(prefix_func((subject, predicate, object_), context))
  528.         return index, prefix, from_key, results_from_key
  529.  
  530.  
  531. def to_key_func(i):
  532.     def to_key(triple, context):
  533.         "Takes a string; returns key"
  534.         return "^".join((context, triple[i%3], triple[(i+1)%3], triple[(i+2)%3], "")) # "" to tac on the trailing ^
  535.     return to_key
  536.  
  537. def from_key_func(i):
  538.     def from_key(key):
  539.         "Takes a key; returns string"
  540.         parts = key.split("^")
  541.         return parts[0], parts[(3-i+0)%3+1], parts[(3-i+1)%3+1], parts[(3-i+2)%3+1]
  542.     return from_key
  543.  
  544. def results_from_key_func(i, from_string):
  545.     def from_key(key, subject, predicate, object_, contexts_value):
  546.         "Takes a key and subject, predicate, object; returns tuple for yield"
  547.         parts = key.split("^")
  548.         if subject is None:
  549.             # TODO: i & 1: # dis assemble and/or measure to see which is faster
  550.             # subject is None or i & 1
  551.             s = from_string(parts[(3-i+0)%3+1])
  552.         else:
  553.             s = subject
  554.         if predicate is None:#i & 2:
  555.             p = from_string(parts[(3-i+1)%3+1])
  556.         else:
  557.             p = predicate
  558.         if object_ is None:#i & 4:
  559.             o = from_string(parts[(3-i+2)%3+1])
  560.         else:
  561.             o = object_
  562.         return (s, p, o), (from_string(c) for c in contexts_value.split("^") if c)
  563.     return from_key
  564.  
  565. def readable_index(i):
  566.     s, p, o = "?" * 3
  567.     if i & 1: s = "s"
  568.     if i & 2: p = "p"
  569.     if i & 4: o = "o"
  570.     return "%s,%s,%s" % (s, p, o)
  571.