home *** CD-ROM | disk | FTP | other *** search
/ H4CK3R 14 / hacker14.iso / programacao / pythonwin / python.exe / TEST_WEAKREF.PY < prev    next >
Encoding:
Python Source  |  2003-07-14  |  21.0 KB  |  618 lines

  1. import sys
  2. import unittest
  3. import UserList
  4. import weakref
  5.  
  6. from test import test_support
  7. from sets import Set
  8.  
  9.  
  10. class C:
  11.     def method(self):
  12.         pass
  13.  
  14.  
  15. class Callable:
  16.     bar = None
  17.  
  18.     def __call__(self, x):
  19.         self.bar = x
  20.  
  21.  
  22. def create_function():
  23.     def f(): pass
  24.     return f
  25.  
  26. def create_bound_method():
  27.     return C().method
  28.  
  29. def create_unbound_method():
  30.     return C.method
  31.  
  32.  
  33. class TestBase(unittest.TestCase):
  34.  
  35.     def setUp(self):
  36.         self.cbcalled = 0
  37.  
  38.     def callback(self, ref):
  39.         self.cbcalled += 1
  40.  
  41.  
  42. class ReferencesTestCase(TestBase):
  43.  
  44.     def test_basic_ref(self):
  45.         self.check_basic_ref(C)
  46.         self.check_basic_ref(create_function)
  47.         self.check_basic_ref(create_bound_method)
  48.         self.check_basic_ref(create_unbound_method)
  49.  
  50.         # Just make sure the tp_repr handler doesn't raise an exception.
  51.         # Live reference:
  52.         o = C()
  53.         wr = weakref.ref(o)
  54.         `wr`
  55.         # Dead reference:
  56.         del o
  57.         `wr`
  58.  
  59.     def test_basic_callback(self):
  60.         self.check_basic_callback(C)
  61.         self.check_basic_callback(create_function)
  62.         self.check_basic_callback(create_bound_method)
  63.         self.check_basic_callback(create_unbound_method)
  64.  
  65.     def test_multiple_callbacks(self):
  66.         o = C()
  67.         ref1 = weakref.ref(o, self.callback)
  68.         ref2 = weakref.ref(o, self.callback)
  69.         del o
  70.         self.assert_(ref1() is None,
  71.                      "expected reference to be invalidated")
  72.         self.assert_(ref2() is None,
  73.                      "expected reference to be invalidated")
  74.         self.assert_(self.cbcalled == 2,
  75.                      "callback not called the right number of times")
  76.  
  77.     def test_multiple_selfref_callbacks(self):
  78.         # Make sure all references are invalidated before callbacks are called
  79.         #
  80.         # What's important here is that we're using the first
  81.         # reference in the callback invoked on the second reference
  82.         # (the most recently created ref is cleaned up first).  This
  83.         # tests that all references to the object are invalidated
  84.         # before any of the callbacks are invoked, so that we only
  85.         # have one invocation of _weakref.c:cleanup_helper() active
  86.         # for a particular object at a time.
  87.         #
  88.         def callback(object, self=self):
  89.             self.ref()
  90.         c = C()
  91.         self.ref = weakref.ref(c, callback)
  92.         ref1 = weakref.ref(c, callback)
  93.         del c
  94.  
  95.     def test_proxy_ref(self):
  96.         o = C()
  97.         o.bar = 1
  98.         ref1 = weakref.proxy(o, self.callback)
  99.         ref2 = weakref.proxy(o, self.callback)
  100.         del o
  101.  
  102.         def check(proxy):
  103.             proxy.bar
  104.  
  105.         self.assertRaises(weakref.ReferenceError, check, ref1)
  106.         self.assertRaises(weakref.ReferenceError, check, ref2)
  107.         self.assert_(self.cbcalled == 2)
  108.  
  109.     def check_basic_ref(self, factory):
  110.         o = factory()
  111.         ref = weakref.ref(o)
  112.         self.assert_(ref() is not None,
  113.                      "weak reference to live object should be live")
  114.         o2 = ref()
  115.         self.assert_(o is o2,
  116.                      "<ref>() should return original object if live")
  117.  
  118.     def check_basic_callback(self, factory):
  119.         self.cbcalled = 0
  120.         o = factory()
  121.         ref = weakref.ref(o, self.callback)
  122.         del o
  123.         self.assert_(self.cbcalled == 1,
  124.                      "callback did not properly set 'cbcalled'")
  125.         self.assert_(ref() is None,
  126.                      "ref2 should be dead after deleting object reference")
  127.  
  128.     def test_ref_reuse(self):
  129.         o = C()
  130.         ref1 = weakref.ref(o)
  131.         # create a proxy to make sure that there's an intervening creation
  132.         # between these two; it should make no difference
  133.         proxy = weakref.proxy(o)
  134.         ref2 = weakref.ref(o)
  135.         self.assert_(ref1 is ref2,
  136.                      "reference object w/out callback should be re-used")
  137.  
  138.         o = C()
  139.         proxy = weakref.proxy(o)
  140.         ref1 = weakref.ref(o)
  141.         ref2 = weakref.ref(o)
  142.         self.assert_(ref1 is ref2,
  143.                      "reference object w/out callback should be re-used")
  144.         self.assert_(weakref.getweakrefcount(o) == 2,
  145.                      "wrong weak ref count for object")
  146.         del proxy
  147.         self.assert_(weakref.getweakrefcount(o) == 1,
  148.                      "wrong weak ref count for object after deleting proxy")
  149.  
  150.     def test_proxy_reuse(self):
  151.         o = C()
  152.         proxy1 = weakref.proxy(o)
  153.         ref = weakref.ref(o)
  154.         proxy2 = weakref.proxy(o)
  155.         self.assert_(proxy1 is proxy2,
  156.                      "proxy object w/out callback should have been re-used")
  157.  
  158.     def test_basic_proxy(self):
  159.         o = C()
  160.         self.check_proxy(o, weakref.proxy(o))
  161.  
  162.         L = UserList.UserList()
  163.         p = weakref.proxy(L)
  164.         self.failIf(p, "proxy for empty UserList should be false")
  165.         p.append(12)
  166.         self.assertEqual(len(L), 1)
  167.         self.failUnless(p, "proxy for non-empty UserList should be true")
  168.         p[:] = [2, 3]
  169.         self.assertEqual(len(L), 2)
  170.         self.assertEqual(len(p), 2)
  171.         self.failUnless(3 in p,
  172.                         "proxy didn't support __contains__() properly")
  173.         p[1] = 5
  174.         self.assertEqual(L[1], 5)
  175.         self.assertEqual(p[1], 5)
  176.         L2 = UserList.UserList(L)
  177.         p2 = weakref.proxy(L2)
  178.         self.assertEqual(p, p2)
  179.         ## self.assertEqual(`L2`, `p2`)
  180.         L3 = UserList.UserList(range(10))
  181.         p3 = weakref.proxy(L3)
  182.         self.assertEqual(L3[:], p3[:])
  183.         self.assertEqual(L3[5:], p3[5:])
  184.         self.assertEqual(L3[:5], p3[:5])
  185.         self.assertEqual(L3[2:5], p3[2:5])
  186.  
  187.     def test_callable_proxy(self):
  188.         o = Callable()
  189.         ref1 = weakref.proxy(o)
  190.  
  191.         self.check_proxy(o, ref1)
  192.  
  193.         self.assert_(type(ref1) is weakref.CallableProxyType,
  194.                      "proxy is not of callable type")
  195.         ref1('twinkies!')
  196.         self.assert_(o.bar == 'twinkies!',
  197.                      "call through proxy not passed through to original")
  198.         ref1(x='Splat.')
  199.         self.assert_(o.bar == 'Splat.',
  200.                      "call through proxy not passed through to original")
  201.  
  202.         # expect due to too few args
  203.         self.assertRaises(TypeError, ref1)
  204.  
  205.         # expect due to too many args
  206.         self.assertRaises(TypeError, ref1, 1, 2, 3)
  207.  
  208.     def check_proxy(self, o, proxy):
  209.         o.foo = 1
  210.         self.assert_(proxy.foo == 1,
  211.                      "proxy does not reflect attribute addition")
  212.         o.foo = 2
  213.         self.assert_(proxy.foo == 2,
  214.                      "proxy does not reflect attribute modification")
  215.         del o.foo
  216.         self.assert_(not hasattr(proxy, 'foo'),
  217.                      "proxy does not reflect attribute removal")
  218.  
  219.         proxy.foo = 1
  220.         self.assert_(o.foo == 1,
  221.                      "object does not reflect attribute addition via proxy")
  222.         proxy.foo = 2
  223.         self.assert_(
  224.             o.foo == 2,
  225.             "object does not reflect attribute modification via proxy")
  226.         del proxy.foo
  227.         self.assert_(not hasattr(o, 'foo'),
  228.                      "object does not reflect attribute removal via proxy")
  229.  
  230.     def test_proxy_deletion(self):
  231.         # Test clearing of SF bug #762891
  232.         class Foo:
  233.             result = None
  234.             def __delitem__(self, accessor):
  235.                 self.result = accessor
  236.         g = Foo()
  237.         f = weakref.proxy(g)
  238.         del f[0]
  239.         self.assertEqual(f.result, 0)
  240.  
  241.     def test_getweakrefcount(self):
  242.         o = C()
  243.         ref1 = weakref.ref(o)
  244.         ref2 = weakref.ref(o, self.callback)
  245.         self.assert_(weakref.getweakrefcount(o) == 2,
  246.                      "got wrong number of weak reference objects")
  247.  
  248.         proxy1 = weakref.proxy(o)
  249.         proxy2 = weakref.proxy(o, self.callback)
  250.         self.assert_(weakref.getweakrefcount(o) == 4,
  251.                      "got wrong number of weak reference objects")
  252.  
  253.     def test_getweakrefs(self):
  254.         o = C()
  255.         ref1 = weakref.ref(o, self.callback)
  256.         ref2 = weakref.ref(o, self.callback)
  257.         del ref1
  258.         self.assert_(weakref.getweakrefs(o) == [ref2],
  259.                      "list of refs does not match")
  260.  
  261.         o = C()
  262.         ref1 = weakref.ref(o, self.callback)
  263.         ref2 = weakref.ref(o, self.callback)
  264.         del ref2
  265.         self.assert_(weakref.getweakrefs(o) == [ref1],
  266.                      "list of refs does not match")
  267.  
  268.     def test_newstyle_number_ops(self):
  269.         class F(float):
  270.             pass
  271.         f = F(2.0)
  272.         p = weakref.proxy(f)
  273.         self.assert_(p + 1.0 == 3.0)
  274.         self.assert_(1.0 + p == 3.0)  # this used to SEGV
  275.  
  276.     def test_callbacks_protected(self):
  277.         # Callbacks protected from already-set exceptions?
  278.         # Regression test for SF bug #478534.
  279.         class BogusError(Exception):
  280.             pass
  281.         data = {}
  282.         def remove(k):
  283.             del data[k]
  284.         def encapsulate():
  285.             f = lambda : ()
  286.             data[weakref.ref(f, remove)] = None
  287.             raise BogusError
  288.         try:
  289.             encapsulate()
  290.         except BogusError:
  291.             pass
  292.         else:
  293.             self.fail("exception not properly restored")
  294.         try:
  295.             encapsulate()
  296.         except BogusError:
  297.             pass
  298.         else:
  299.             self.fail("exception not properly restored")
  300.  
  301.  
  302. class Object:
  303.     def __init__(self, arg):
  304.         self.arg = arg
  305.     def __repr__(self):
  306.         return "<Object %r>" % self.arg
  307.  
  308.  
  309. class MappingTestCase(TestBase):
  310.  
  311.     COUNT = 10
  312.  
  313.     def test_weak_values(self):
  314.         #
  315.         #  This exercises d.copy(), d.items(), d[], del d[], len(d).
  316.         #
  317.         dict, objects = self.make_weak_valued_dict()
  318.         for o in objects:
  319.             self.assert_(weakref.getweakrefcount(o) == 1,
  320.                          "wrong number of weak references to %r!" % o)
  321.             self.assert_(o is dict[o.arg],
  322.                          "wrong object returned by weak dict!")
  323.         items1 = dict.items()
  324.         items2 = dict.copy().items()
  325.         items1.sort()
  326.         items2.sort()
  327.         self.assert_(items1 == items2,
  328.                      "cloning of weak-valued dictionary did not work!")
  329.         del items1, items2
  330.         self.assert_(len(dict) == self.COUNT)
  331.         del objects[0]
  332.         self.assert_(len(dict) == (self.COUNT - 1),
  333.                      "deleting object did not cause dictionary update")
  334.         del objects, o
  335.         self.assert_(len(dict) == 0,
  336.                      "deleting the values did not clear the dictionary")
  337.         # regression on SF bug #447152:
  338.         dict = weakref.WeakValueDictionary()
  339.         self.assertRaises(KeyError, dict.__getitem__, 1)
  340.         dict[2] = C()
  341.         self.assertRaises(KeyError, dict.__getitem__, 2)
  342.  
  343.     def test_weak_keys(self):
  344.         #
  345.         #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
  346.         #  len(d), d.has_key().
  347.         #
  348.         dict, objects = self.make_weak_keyed_dict()
  349.         for o in objects:
  350.             self.assert_(weakref.getweakrefcount(o) == 1,
  351.                          "wrong number of weak references to %r!" % o)
  352.             self.assert_(o.arg is dict[o],
  353.                          "wrong object returned by weak dict!")
  354.         items1 = dict.items()
  355.         items2 = dict.copy().items()
  356.         self.assert_(Set(items1) == Set(items2),
  357.                      "cloning of weak-keyed dictionary did not work!")
  358.         del items1, items2
  359.         self.assert_(len(dict) == self.COUNT)
  360.         del objects[0]
  361.         self.assert_(len(dict) == (self.COUNT - 1),
  362.                      "deleting object did not cause dictionary update")
  363.         del objects, o
  364.         self.assert_(len(dict) == 0,
  365.                      "deleting the keys did not clear the dictionary")
  366.         o = Object(42)
  367.         dict[o] = "What is the meaning of the universe?"
  368.         self.assert_(dict.has_key(o))
  369.         self.assert_(not dict.has_key(34))
  370.  
  371.     def test_weak_keyed_iters(self):
  372.         dict, objects = self.make_weak_keyed_dict()
  373.         self.check_iters(dict)
  374.  
  375.     def test_weak_valued_iters(self):
  376.         dict, objects = self.make_weak_valued_dict()
  377.         self.check_iters(dict)
  378.  
  379.     def check_iters(self, dict):
  380.         # item iterator:
  381.         items = dict.items()
  382.         for item in dict.iteritems():
  383.             items.remove(item)
  384.         self.assert_(len(items) == 0, "iteritems() did not touch all items")
  385.  
  386.         # key iterator, via __iter__():
  387.         keys = dict.keys()
  388.         for k in dict:
  389.             keys.remove(k)
  390.         self.assert_(len(keys) == 0, "__iter__() did not touch all keys")
  391.  
  392.         # key iterator, via iterkeys():
  393.         keys = dict.keys()
  394.         for k in dict.iterkeys():
  395.             keys.remove(k)
  396.         self.assert_(len(keys) == 0, "iterkeys() did not touch all keys")
  397.  
  398.         # value iterator:
  399.         values = dict.values()
  400.         for v in dict.itervalues():
  401.             values.remove(v)
  402.         self.assert_(len(values) == 0,
  403.                      "itervalues() did not touch all values")
  404.  
  405.     def test_make_weak_keyed_dict_from_dict(self):
  406.         o = Object(3)
  407.         dict = weakref.WeakKeyDictionary({o:364})
  408.         self.assert_(dict[o] == 364)
  409.  
  410.     def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
  411.         o = Object(3)
  412.         dict = weakref.WeakKeyDictionary({o:364})
  413.         dict2 = weakref.WeakKeyDictionary(dict)
  414.         self.assert_(dict[o] == 364)
  415.  
  416.     def make_weak_keyed_dict(self):
  417.         dict = weakref.WeakKeyDictionary()
  418.         objects = map(Object, range(self.COUNT))
  419.         for o in objects:
  420.             dict[o] = o.arg
  421.         return dict, objects
  422.  
  423.     def make_weak_valued_dict(self):
  424.         dict = weakref.WeakValueDictionary()
  425.         objects = map(Object, range(self.COUNT))
  426.         for o in objects:
  427.             dict[o.arg] = o
  428.         return dict, objects
  429.  
  430.     def check_popitem(self, klass, key1, value1, key2, value2):
  431.         weakdict = klass()
  432.         weakdict[key1] = value1
  433.         weakdict[key2] = value2
  434.         self.assert_(len(weakdict) == 2)
  435.         k, v = weakdict.popitem()
  436.         self.assert_(len(weakdict) == 1)
  437.         if k is key1:
  438.             self.assert_(v is value1)
  439.         else:
  440.             self.assert_(v is value2)
  441.         k, v = weakdict.popitem()
  442.         self.assert_(len(weakdict) == 0)
  443.         if k is key1:
  444.             self.assert_(v is value1)
  445.         else:
  446.             self.assert_(v is value2)
  447.  
  448.     def test_weak_valued_dict_popitem(self):
  449.         self.check_popitem(weakref.WeakValueDictionary,
  450.                            "key1", C(), "key2", C())
  451.  
  452.     def test_weak_keyed_dict_popitem(self):
  453.         self.check_popitem(weakref.WeakKeyDictionary,
  454.                            C(), "value 1", C(), "value 2")
  455.  
  456.     def check_setdefault(self, klass, key, value1, value2):
  457.         self.assert_(value1 is not value2,
  458.                      "invalid test"
  459.                      " -- value parameters must be distinct objects")
  460.         weakdict = klass()
  461.         o = weakdict.setdefault(key, value1)
  462.         self.assert_(o is value1)
  463.         self.assert_(weakdict.has_key(key))
  464.         self.assert_(weakdict.get(key) is value1)
  465.         self.assert_(weakdict[key] is value1)
  466.  
  467.         o = weakdict.setdefault(key, value2)
  468.         self.assert_(o is value1)
  469.         self.assert_(weakdict.has_key(key))
  470.         self.assert_(weakdict.get(key) is value1)
  471.         self.assert_(weakdict[key] is value1)
  472.  
  473.     def test_weak_valued_dict_setdefault(self):
  474.         self.check_setdefault(weakref.WeakValueDictionary,
  475.                               "key", C(), C())
  476.  
  477.     def test_weak_keyed_dict_setdefault(self):
  478.         self.check_setdefault(weakref.WeakKeyDictionary,
  479.                               C(), "value 1", "value 2")
  480.  
  481.     def check_update(self, klass, dict):
  482.         #
  483.         #  This exercises d.update(), len(d), d.keys(), d.has_key(),
  484.         #  d.get(), d[].
  485.         #
  486.         weakdict = klass()
  487.         weakdict.update(dict)
  488.         self.assert_(len(weakdict) == len(dict))
  489.         for k in weakdict.keys():
  490.             self.assert_(dict.has_key(k),
  491.                          "mysterious new key appeared in weak dict")
  492.             v = dict.get(k)
  493.             self.assert_(v is weakdict[k])
  494.             self.assert_(v is weakdict.get(k))
  495.         for k in dict.keys():
  496.             self.assert_(weakdict.has_key(k),
  497.                          "original key disappeared in weak dict")
  498.             v = dict[k]
  499.             self.assert_(v is weakdict[k])
  500.             self.assert_(v is weakdict.get(k))
  501.  
  502.     def test_weak_valued_dict_update(self):
  503.         self.check_update(weakref.WeakValueDictionary,
  504.                           {1: C(), 'a': C(), C(): C()})
  505.  
  506.     def test_weak_keyed_dict_update(self):
  507.         self.check_update(weakref.WeakKeyDictionary,
  508.                           {C(): 1, C(): 2, C(): 3})
  509.  
  510.     def test_weak_keyed_delitem(self):
  511.         d = weakref.WeakKeyDictionary()
  512.         o1 = Object('1')
  513.         o2 = Object('2')
  514.         d[o1] = 'something'
  515.         d[o2] = 'something'
  516.         self.assert_(len(d) == 2)
  517.         del d[o1]
  518.         self.assert_(len(d) == 1)
  519.         self.assert_(d.keys() == [o2])
  520.  
  521.     def test_weak_valued_delitem(self):
  522.         d = weakref.WeakValueDictionary()
  523.         o1 = Object('1')
  524.         o2 = Object('2')
  525.         d['something'] = o1
  526.         d['something else'] = o2
  527.         self.assert_(len(d) == 2)
  528.         del d['something']
  529.         self.assert_(len(d) == 1)
  530.         self.assert_(d.items() == [('something else', o2)])
  531.  
  532.     def test_weak_keyed_bad_delitem(self):
  533.         d = weakref.WeakKeyDictionary()
  534.         o = Object('1')
  535.         # An attempt to delete an object that isn't there should raise
  536.         # KeyError.  It didn't before 2.3.
  537.         self.assertRaises(KeyError, d.__delitem__, o)
  538.         self.assertRaises(KeyError, d.__getitem__, o)
  539.  
  540.         # If a key isn't of a weakly referencable type, __getitem__ and
  541.         # __setitem__ raise TypeError.  __delitem__ should too.
  542.         self.assertRaises(TypeError, d.__delitem__,  13)
  543.         self.assertRaises(TypeError, d.__getitem__,  13)
  544.         self.assertRaises(TypeError, d.__setitem__,  13, 13)
  545.  
  546.     def test_weak_keyed_cascading_deletes(self):
  547.         # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
  548.         # over the keys via self.data.iterkeys().  If things vanished from
  549.         # the dict during this (or got added), that caused a RuntimeError.
  550.  
  551.         d = weakref.WeakKeyDictionary()
  552.         mutate = False
  553.  
  554.         class C(object):
  555.             def __init__(self, i):
  556.                 self.value = i
  557.             def __hash__(self):
  558.                 return hash(self.value)
  559.             def __eq__(self, other):
  560.                 if mutate:
  561.                     # Side effect that mutates the dict, by removing the
  562.                     # last strong reference to a key.
  563.                     del objs[-1]
  564.                 return self.value == other.value
  565.  
  566.         objs = [C(i) for i in range(4)]
  567.         for o in objs:
  568.             d[o] = o.value
  569.         del o   # now the only strong references to keys are in objs
  570.         # Find the order in which iterkeys sees the keys.
  571.         objs = d.keys()
  572.         # Reverse it, so that the iteration implementation of __delitem__
  573.         # has to keep looping to find the first object we delete.
  574.         objs.reverse()
  575.  
  576.         # Turn on mutation in C.__eq__.  The first time thru the loop,
  577.         # under the iterkeys() business the first comparison will delete
  578.         # the last item iterkeys() would see, and that causes a
  579.         #     RuntimeError: dictionary changed size during iteration
  580.         # when the iterkeys() loop goes around to try comparing the next
  581.         # key.  After this was fixed, it just deletes the last object *our*
  582.         # "for o in obj" loop would have gotten to.
  583.         mutate = True
  584.         count = 0
  585.         for o in objs:
  586.             count += 1
  587.             del d[o]
  588.         self.assertEqual(len(d), 0)
  589.         self.assertEqual(count, 2)
  590.  
  591. from test_userdict import TestMappingProtocol
  592.  
  593. class WeakValueDictionaryTestCase(TestMappingProtocol):
  594.     """Check that WeakValueDictionary conforms to the mapping protocol"""
  595.     __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
  596.     _tested_class = weakref.WeakValueDictionary
  597.     def _reference(self):
  598.         return self.__ref.copy()
  599.  
  600. class WeakKeyDictionaryTestCase(TestMappingProtocol):
  601.     """Check that WeakKeyDictionary conforms to the mapping protocol"""
  602.     __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
  603.     _tested_class = weakref.WeakKeyDictionary
  604.     def _reference(self):
  605.         return self.__ref.copy()
  606.  
  607. def test_main():
  608.     test_support.run_unittest(
  609.         ReferencesTestCase,
  610.         MappingTestCase,
  611.         WeakValueDictionaryTestCase,
  612.         WeakKeyDictionaryTestCase,
  613.         )
  614.  
  615.  
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
  618.