1058 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1058 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Unit tests for contextlib.py, and other context managers."""
 | |
| 
 | |
| import io
 | |
| import sys
 | |
| import tempfile
 | |
| import threading
 | |
| import unittest
 | |
| from contextlib import *  # Tests __all__
 | |
| from test import support
 | |
| import weakref
 | |
| 
 | |
| 
 | |
| class TestAbstractContextManager(unittest.TestCase):
 | |
| 
 | |
|     def test_enter(self):
 | |
|         class DefaultEnter(AbstractContextManager):
 | |
|             def __exit__(self, *args):
 | |
|                 super().__exit__(*args)
 | |
| 
 | |
|         manager = DefaultEnter()
 | |
|         self.assertIs(manager.__enter__(), manager)
 | |
| 
 | |
|     def test_exit_is_abstract(self):
 | |
|         class MissingExit(AbstractContextManager):
 | |
|             pass
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             MissingExit()
 | |
| 
 | |
|     def test_structural_subclassing(self):
 | |
|         class ManagerFromScratch:
 | |
|             def __enter__(self):
 | |
|                 return self
 | |
|             def __exit__(self, exc_type, exc_value, traceback):
 | |
|                 return None
 | |
| 
 | |
|         self.assertTrue(issubclass(ManagerFromScratch, AbstractContextManager))
 | |
| 
 | |
|         class DefaultEnter(AbstractContextManager):
 | |
|             def __exit__(self, *args):
 | |
|                 super().__exit__(*args)
 | |
| 
 | |
|         self.assertTrue(issubclass(DefaultEnter, AbstractContextManager))
 | |
| 
 | |
|         class NoEnter(ManagerFromScratch):
 | |
|             __enter__ = None
 | |
| 
 | |
|         self.assertFalse(issubclass(NoEnter, AbstractContextManager))
 | |
| 
 | |
|         class NoExit(ManagerFromScratch):
 | |
|             __exit__ = None
 | |
| 
 | |
|         self.assertFalse(issubclass(NoExit, AbstractContextManager))
 | |
| 
 | |
| 
 | |
| class ContextManagerTestCase(unittest.TestCase):
 | |
| 
 | |
|     def test_contextmanager_plain(self):
 | |
|         state = []
 | |
|         @contextmanager
 | |
|         def woohoo():
 | |
|             state.append(1)
 | |
|             yield 42
 | |
|             state.append(999)
 | |
|         with woohoo() as x:
 | |
|             self.assertEqual(state, [1])
 | |
|             self.assertEqual(x, 42)
 | |
|             state.append(x)
 | |
|         self.assertEqual(state, [1, 42, 999])
 | |
| 
 | |
|     def test_contextmanager_finally(self):
 | |
|         state = []
 | |
|         @contextmanager
 | |
|         def woohoo():
 | |
|             state.append(1)
 | |
|             try:
 | |
|                 yield 42
 | |
|             finally:
 | |
|                 state.append(999)
 | |
|         with self.assertRaises(ZeroDivisionError):
 | |
|             with woohoo() as x:
 | |
|                 self.assertEqual(state, [1])
 | |
|                 self.assertEqual(x, 42)
 | |
|                 state.append(x)
 | |
|                 raise ZeroDivisionError()
 | |
|         self.assertEqual(state, [1, 42, 999])
 | |
| 
 | |
|     def test_contextmanager_no_reraise(self):
 | |
|         @contextmanager
 | |
|         def whee():
 | |
|             yield
 | |
|         ctx = whee()
 | |
|         ctx.__enter__()
 | |
|         # Calling __exit__ should not result in an exception
 | |
|         self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))
 | |
| 
 | |
|     def test_contextmanager_trap_yield_after_throw(self):
 | |
|         @contextmanager
 | |
|         def whoo():
 | |
|             try:
 | |
|                 yield
 | |
|             except:
 | |
|                 yield
 | |
|         ctx = whoo()
 | |
|         ctx.__enter__()
 | |
|         self.assertRaises(
 | |
|             RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
 | |
|         )
 | |
| 
 | |
|     def test_contextmanager_except(self):
 | |
|         state = []
 | |
|         @contextmanager
 | |
|         def woohoo():
 | |
|             state.append(1)
 | |
|             try:
 | |
|                 yield 42
 | |
|             except ZeroDivisionError as e:
 | |
|                 state.append(e.args[0])
 | |
|                 self.assertEqual(state, [1, 42, 999])
 | |
|         with woohoo() as x:
 | |
|             self.assertEqual(state, [1])
 | |
|             self.assertEqual(x, 42)
 | |
|             state.append(x)
 | |
|             raise ZeroDivisionError(999)
 | |
|         self.assertEqual(state, [1, 42, 999])
 | |
| 
 | |
|     def test_contextmanager_except_stopiter(self):
 | |
|         stop_exc = StopIteration('spam')
 | |
|         @contextmanager
 | |
|         def woohoo():
 | |
|             yield
 | |
|         try:
 | |
|             with self.assertWarnsRegex(DeprecationWarning,
 | |
|                                        "StopIteration"):
 | |
|                 with woohoo():
 | |
|                     raise stop_exc
 | |
|         except Exception as ex:
 | |
|             self.assertIs(ex, stop_exc)
 | |
|         else:
 | |
|             self.fail('StopIteration was suppressed')
 | |
| 
 | |
|     def test_contextmanager_except_pep479(self):
 | |
|         code = """\
 | |
| from __future__ import generator_stop
 | |
| from contextlib import contextmanager
 | |
| @contextmanager
 | |
| def woohoo():
 | |
|     yield
 | |
| """
 | |
|         locals = {}
 | |
|         exec(code, locals, locals)
 | |
|         woohoo = locals['woohoo']
 | |
| 
 | |
|         stop_exc = StopIteration('spam')
 | |
|         try:
 | |
|             with woohoo():
 | |
|                 raise stop_exc
 | |
|         except Exception as ex:
 | |
|             self.assertIs(ex, stop_exc)
 | |
|         else:
 | |
|             self.fail('StopIteration was suppressed')
 | |
| 
 | |
|     def test_contextmanager_do_not_unchain_non_stopiteration_exceptions(self):
 | |
|         @contextmanager
 | |
|         def test_issue29692():
 | |
|             try:
 | |
|                 yield
 | |
|             except Exception as exc:
 | |
|                 raise RuntimeError('issue29692:Chained') from exc
 | |
|         try:
 | |
|             with test_issue29692():
 | |
|                 raise ZeroDivisionError
 | |
|         except Exception as ex:
 | |
|             self.assertIs(type(ex), RuntimeError)
 | |
|             self.assertEqual(ex.args[0], 'issue29692:Chained')
 | |
|             self.assertIsInstance(ex.__cause__, ZeroDivisionError)
 | |
| 
 | |
|         try:
 | |
|             with test_issue29692():
 | |
|                 raise StopIteration('issue29692:Unchained')
 | |
|         except Exception as ex:
 | |
|             self.assertIs(type(ex), StopIteration)
 | |
|             self.assertEqual(ex.args[0], 'issue29692:Unchained')
 | |
|             self.assertIsNone(ex.__cause__)
 | |
| 
 | |
|     def _create_contextmanager_attribs(self):
 | |
|         def attribs(**kw):
 | |
|             def decorate(func):
 | |
|                 for k,v in kw.items():
 | |
|                     setattr(func,k,v)
 | |
|                 return func
 | |
|             return decorate
 | |
|         @contextmanager
 | |
|         @attribs(foo='bar')
 | |
|         def baz(spam):
 | |
|             """Whee!"""
 | |
|         return baz
 | |
| 
 | |
|     def test_contextmanager_attribs(self):
 | |
|         baz = self._create_contextmanager_attribs()
 | |
|         self.assertEqual(baz.__name__,'baz')
 | |
|         self.assertEqual(baz.foo, 'bar')
 | |
| 
 | |
|     @support.requires_docstrings
 | |
|     def test_contextmanager_doc_attrib(self):
 | |
|         baz = self._create_contextmanager_attribs()
 | |
|         self.assertEqual(baz.__doc__, "Whee!")
 | |
| 
 | |
|     @support.requires_docstrings
 | |
|     def test_instance_docstring_given_cm_docstring(self):
 | |
|         baz = self._create_contextmanager_attribs()(None)
 | |
|         self.assertEqual(baz.__doc__, "Whee!")
 | |
| 
 | |
|     def test_keywords(self):
 | |
|         # Ensure no keyword arguments are inhibited
 | |
|         @contextmanager
 | |
|         def woohoo(self, func, args, kwds):
 | |
|             yield (self, func, args, kwds)
 | |
|         with woohoo(self=11, func=22, args=33, kwds=44) as target:
 | |
|             self.assertEqual(target, (11, 22, 33, 44))
 | |
| 
 | |
|     def test_nokeepref(self):
 | |
|         class A:
 | |
|             pass
 | |
| 
 | |
|         @contextmanager
 | |
|         def woohoo(a, b):
 | |
|             a = weakref.ref(a)
 | |
|             b = weakref.ref(b)
 | |
|             self.assertIsNone(a())
 | |
|             self.assertIsNone(b())
 | |
|             yield
 | |
| 
 | |
|         with woohoo(A(), b=A()):
 | |
|             pass
 | |
| 
 | |
|     def test_param_errors(self):
 | |
|         @contextmanager
 | |
|         def woohoo(a, *, b):
 | |
|             yield
 | |
| 
 | |
|         with self.assertRaises(TypeError):
 | |
|             woohoo()
 | |
|         with self.assertRaises(TypeError):
 | |
|             woohoo(3, 5)
 | |
|         with self.assertRaises(TypeError):
 | |
|             woohoo(b=3)
 | |
| 
 | |
|     def test_recursive(self):
 | |
|         depth = 0
 | |
|         @contextmanager
 | |
|         def woohoo():
 | |
|             nonlocal depth
 | |
|             before = depth
 | |
|             depth += 1
 | |
|             yield
 | |
|             depth -= 1
 | |
|             self.assertEqual(depth, before)
 | |
| 
 | |
|         @woohoo()
 | |
|         def recursive():
 | |
|             if depth < 10:
 | |
|                 recursive()
 | |
| 
 | |
|         recursive()
 | |
|         self.assertEqual(depth, 0)
 | |
| 
 | |
| 
 | |
| class ClosingTestCase(unittest.TestCase):
 | |
| 
 | |
|     @support.requires_docstrings
 | |
|     def test_instance_docs(self):
 | |
|         # Issue 19330: ensure context manager instances have good docstrings
 | |
|         cm_docstring = closing.__doc__
 | |
|         obj = closing(None)
 | |
|         self.assertEqual(obj.__doc__, cm_docstring)
 | |
| 
 | |
|     def test_closing(self):
 | |
|         state = []
 | |
|         class C:
 | |
|             def close(self):
 | |
|                 state.append(1)
 | |
|         x = C()
 | |
|         self.assertEqual(state, [])
 | |
|         with closing(x) as y:
 | |
|             self.assertEqual(x, y)
 | |
|         self.assertEqual(state, [1])
 | |
| 
 | |
|     def test_closing_error(self):
 | |
|         state = []
 | |
|         class C:
 | |
|             def close(self):
 | |
|                 state.append(1)
 | |
|         x = C()
 | |
|         self.assertEqual(state, [])
 | |
|         with self.assertRaises(ZeroDivisionError):
 | |
|             with closing(x) as y:
 | |
|                 self.assertEqual(x, y)
 | |
|                 1 / 0
 | |
|         self.assertEqual(state, [1])
 | |
| 
 | |
| 
 | |
| class NullcontextTestCase(unittest.TestCase):
 | |
|     def test_nullcontext(self):
 | |
|         class C:
 | |
|             pass
 | |
|         c = C()
 | |
|         with nullcontext(c) as c_in:
 | |
|             self.assertIs(c_in, c)
 | |
| 
 | |
| 
 | |
| class FileContextTestCase(unittest.TestCase):
 | |
| 
 | |
|     def testWithOpen(self):
 | |
|         tfn = tempfile.mktemp()
 | |
|         try:
 | |
|             f = None
 | |
|             with open(tfn, "w") as f:
 | |
|                 self.assertFalse(f.closed)
 | |
|                 f.write("Booh\n")
 | |
|             self.assertTrue(f.closed)
 | |
|             f = None
 | |
|             with self.assertRaises(ZeroDivisionError):
 | |
|                 with open(tfn, "r") as f:
 | |
|                     self.assertFalse(f.closed)
 | |
|                     self.assertEqual(f.read(), "Booh\n")
 | |
|                     1 / 0
 | |
|             self.assertTrue(f.closed)
 | |
|         finally:
 | |
|             support.unlink(tfn)
 | |
| 
 | |
| class LockContextTestCase(unittest.TestCase):
 | |
| 
 | |
|     def boilerPlate(self, lock, locked):
 | |
|         self.assertFalse(locked())
 | |
|         with lock:
 | |
|             self.assertTrue(locked())
 | |
|         self.assertFalse(locked())
 | |
|         with self.assertRaises(ZeroDivisionError):
 | |
|             with lock:
 | |
|                 self.assertTrue(locked())
 | |
|                 1 / 0
 | |
|         self.assertFalse(locked())
 | |
| 
 | |
|     def testWithLock(self):
 | |
|         lock = threading.Lock()
 | |
|         self.boilerPlate(lock, lock.locked)
 | |
| 
 | |
|     def testWithRLock(self):
 | |
|         lock = threading.RLock()
 | |
|         self.boilerPlate(lock, lock._is_owned)
 | |
| 
 | |
|     def testWithCondition(self):
 | |
|         lock = threading.Condition()
 | |
|         def locked():
 | |
|             return lock._is_owned()
 | |
|         self.boilerPlate(lock, locked)
 | |
| 
 | |
|     def testWithSemaphore(self):
 | |
|         lock = threading.Semaphore()
 | |
|         def locked():
 | |
|             if lock.acquire(False):
 | |
|                 lock.release()
 | |
|                 return False
 | |
|             else:
 | |
|                 return True
 | |
|         self.boilerPlate(lock, locked)
 | |
| 
 | |
|     def testWithBoundedSemaphore(self):
 | |
|         lock = threading.BoundedSemaphore()
 | |
|         def locked():
 | |
|             if lock.acquire(False):
 | |
|                 lock.release()
 | |
|                 return False
 | |
|             else:
 | |
|                 return True
 | |
|         self.boilerPlate(lock, locked)
 | |
| 
 | |
| 
 | |
| class mycontext(ContextDecorator):
 | |
|     """Example decoration-compatible context manager for testing"""
 | |
|     started = False
 | |
|     exc = None
 | |
|     catch = False
 | |
| 
 | |
|     def __enter__(self):
 | |
|         self.started = True
 | |
|         return self
 | |
| 
 | |
|     def __exit__(self, *exc):
 | |
|         self.exc = exc
 | |
|         return self.catch
 | |
| 
 | |
| 
 | |
| class TestContextDecorator(unittest.TestCase):
 | |
| 
 | |
|     @support.requires_docstrings
 | |
|     def test_instance_docs(self):
 | |
|         # Issue 19330: ensure context manager instances have good docstrings
 | |
|         cm_docstring = mycontext.__doc__
 | |
|         obj = mycontext()
 | |
|         self.assertEqual(obj.__doc__, cm_docstring)
 | |
| 
 | |
|     def test_contextdecorator(self):
 | |
|         context = mycontext()
 | |
|         with context as result:
 | |
|             self.assertIs(result, context)
 | |
|             self.assertTrue(context.started)
 | |
| 
 | |
|         self.assertEqual(context.exc, (None, None, None))
 | |
| 
 | |
| 
 | |
|     def test_contextdecorator_with_exception(self):
 | |
|         context = mycontext()
 | |
| 
 | |
|         with self.assertRaisesRegex(NameError, 'foo'):
 | |
|             with context:
 | |
|                 raise NameError('foo')
 | |
|         self.assertIsNotNone(context.exc)
 | |
|         self.assertIs(context.exc[0], NameError)
 | |
| 
 | |
|         context = mycontext()
 | |
|         context.catch = True
 | |
|         with context:
 | |
|             raise NameError('foo')
 | |
|         self.assertIsNotNone(context.exc)
 | |
|         self.assertIs(context.exc[0], NameError)
 | |
| 
 | |
| 
 | |
|     def test_decorator(self):
 | |
|         context = mycontext()
 | |
| 
 | |
|         @context
 | |
|         def test():
 | |
|             self.assertIsNone(context.exc)
 | |
|             self.assertTrue(context.started)
 | |
|         test()
 | |
|         self.assertEqual(context.exc, (None, None, None))
 | |
| 
 | |
| 
 | |
|     def test_decorator_with_exception(self):
 | |
|         context = mycontext()
 | |
| 
 | |
|         @context
 | |
|         def test():
 | |
|             self.assertIsNone(context.exc)
 | |
|             self.assertTrue(context.started)
 | |
|             raise NameError('foo')
 | |
| 
 | |
|         with self.assertRaisesRegex(NameError, 'foo'):
 | |
|             test()
 | |
|         self.assertIsNotNone(context.exc)
 | |
|         self.assertIs(context.exc[0], NameError)
 | |
| 
 | |
| 
 | |
|     def test_decorating_method(self):
 | |
|         context = mycontext()
 | |
| 
 | |
|         class Test(object):
 | |
| 
 | |
|             @context
 | |
|             def method(self, a, b, c=None):
 | |
|                 self.a = a
 | |
|                 self.b = b
 | |
|                 self.c = c
 | |
| 
 | |
|         # these tests are for argument passing when used as a decorator
 | |
|         test = Test()
 | |
|         test.method(1, 2)
 | |
|         self.assertEqual(test.a, 1)
 | |
|         self.assertEqual(test.b, 2)
 | |
|         self.assertEqual(test.c, None)
 | |
| 
 | |
|         test = Test()
 | |
|         test.method('a', 'b', 'c')
 | |
|         self.assertEqual(test.a, 'a')
 | |
|         self.assertEqual(test.b, 'b')
 | |
|         self.assertEqual(test.c, 'c')
 | |
| 
 | |
|         test = Test()
 | |
|         test.method(a=1, b=2)
 | |
|         self.assertEqual(test.a, 1)
 | |
|         self.assertEqual(test.b, 2)
 | |
| 
 | |
| 
 | |
|     def test_typo_enter(self):
 | |
|         class mycontext(ContextDecorator):
 | |
|             def __unter__(self):
 | |
|                 pass
 | |
|             def __exit__(self, *exc):
 | |
|                 pass
 | |
| 
 | |
|         with self.assertRaises(AttributeError):
 | |
|             with mycontext():
 | |
|                 pass
 | |
| 
 | |
| 
 | |
|     def test_typo_exit(self):
 | |
|         class mycontext(ContextDecorator):
 | |
|             def __enter__(self):
 | |
|                 pass
 | |
|             def __uxit__(self, *exc):
 | |
|                 pass
 | |
| 
 | |
|         with self.assertRaises(AttributeError):
 | |
|             with mycontext():
 | |
|                 pass
 | |
| 
 | |
| 
 | |
|     def test_contextdecorator_as_mixin(self):
 | |
|         class somecontext(object):
 | |
|             started = False
 | |
|             exc = None
 | |
| 
 | |
|             def __enter__(self):
 | |
|                 self.started = True
 | |
|                 return self
 | |
| 
 | |
|             def __exit__(self, *exc):
 | |
|                 self.exc = exc
 | |
| 
 | |
|         class mycontext(somecontext, ContextDecorator):
 | |
|             pass
 | |
| 
 | |
|         context = mycontext()
 | |
|         @context
 | |
|         def test():
 | |
|             self.assertIsNone(context.exc)
 | |
|             self.assertTrue(context.started)
 | |
|         test()
 | |
|         self.assertEqual(context.exc, (None, None, None))
 | |
| 
 | |
| 
 | |
|     def test_contextmanager_as_decorator(self):
 | |
|         @contextmanager
 | |
|         def woohoo(y):
 | |
|             state.append(y)
 | |
|             yield
 | |
|             state.append(999)
 | |
| 
 | |
|         state = []
 | |
|         @woohoo(1)
 | |
|         def test(x):
 | |
|             self.assertEqual(state, [1])
 | |
|             state.append(x)
 | |
|         test('something')
 | |
|         self.assertEqual(state, [1, 'something', 999])
 | |
| 
 | |
|         # Issue #11647: Ensure the decorated function is 'reusable'
 | |
|         state = []
 | |
|         test('something else')
 | |
|         self.assertEqual(state, [1, 'something else', 999])
 | |
| 
 | |
| 
 | |
| class TestBaseExitStack:
 | |
|     exit_stack = None
 | |
| 
 | |
|     @support.requires_docstrings
 | |
|     def test_instance_docs(self):
 | |
|         # Issue 19330: ensure context manager instances have good docstrings
 | |
|         cm_docstring = self.exit_stack.__doc__
 | |
|         obj = self.exit_stack()
 | |
|         self.assertEqual(obj.__doc__, cm_docstring)
 | |
| 
 | |
|     def test_no_resources(self):
 | |
|         with self.exit_stack():
 | |
|             pass
 | |
| 
 | |
|     def test_callback(self):
 | |
|         expected = [
 | |
|             ((), {}),
 | |
|             ((1,), {}),
 | |
|             ((1,2), {}),
 | |
|             ((), dict(example=1)),
 | |
|             ((1,), dict(example=1)),
 | |
|             ((1,2), dict(example=1)),
 | |
|             ((1,2), dict(self=3, callback=4)),
 | |
|         ]
 | |
|         result = []
 | |
|         def _exit(*args, **kwds):
 | |
|             """Test metadata propagation"""
 | |
|             result.append((args, kwds))
 | |
|         with self.exit_stack() as stack:
 | |
|             for args, kwds in reversed(expected):
 | |
|                 if args and kwds:
 | |
|                     f = stack.callback(_exit, *args, **kwds)
 | |
|                 elif args:
 | |
|                     f = stack.callback(_exit, *args)
 | |
|                 elif kwds:
 | |
|                     f = stack.callback(_exit, **kwds)
 | |
|                 else:
 | |
|                     f = stack.callback(_exit)
 | |
|                 self.assertIs(f, _exit)
 | |
|             for wrapper in stack._exit_callbacks:
 | |
|                 self.assertIs(wrapper[1].__wrapped__, _exit)
 | |
|                 self.assertNotEqual(wrapper[1].__name__, _exit.__name__)
 | |
|                 self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)
 | |
|         self.assertEqual(result, expected)
 | |
| 
 | |
|         result = []
 | |
|         with self.exit_stack() as stack:
 | |
|             with self.assertRaises(TypeError):
 | |
|                 stack.callback(arg=1)
 | |
|             with self.assertRaises(TypeError):
 | |
|                 self.exit_stack.callback(arg=2)
 | |
|             with self.assertRaises(TypeError):
 | |
|                 stack.callback(callback=_exit, arg=3)
 | |
|         self.assertEqual(result, [])
 | |
| 
 | |
|     def test_push(self):
 | |
|         exc_raised = ZeroDivisionError
 | |
|         def _expect_exc(exc_type, exc, exc_tb):
 | |
|             self.assertIs(exc_type, exc_raised)
 | |
|         def _suppress_exc(*exc_details):
 | |
|             return True
 | |
|         def _expect_ok(exc_type, exc, exc_tb):
 | |
|             self.assertIsNone(exc_type)
 | |
|             self.assertIsNone(exc)
 | |
|             self.assertIsNone(exc_tb)
 | |
|         class ExitCM(object):
 | |
|             def __init__(self, check_exc):
 | |
|                 self.check_exc = check_exc
 | |
|             def __enter__(self):
 | |
|                 self.fail("Should not be called!")
 | |
|             def __exit__(self, *exc_details):
 | |
|                 self.check_exc(*exc_details)
 | |
|         with self.exit_stack() as stack:
 | |
|             stack.push(_expect_ok)
 | |
|             self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)
 | |
|             cm = ExitCM(_expect_ok)
 | |
|             stack.push(cm)
 | |
|             self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
 | |
|             stack.push(_suppress_exc)
 | |
|             self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)
 | |
|             cm = ExitCM(_expect_exc)
 | |
|             stack.push(cm)
 | |
|             self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
 | |
|             stack.push(_expect_exc)
 | |
|             self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
 | |
|             stack.push(_expect_exc)
 | |
|             self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
 | |
|             1/0
 | |
| 
 | |
|     def test_enter_context(self):
 | |
|         class TestCM(object):
 | |
|             def __enter__(self):
 | |
|                 result.append(1)
 | |
|             def __exit__(self, *exc_details):
 | |
|                 result.append(3)
 | |
| 
 | |
|         result = []
 | |
|         cm = TestCM()
 | |
|         with self.exit_stack() as stack:
 | |
|             @stack.callback  # Registered first => cleaned up last
 | |
|             def _exit():
 | |
|                 result.append(4)
 | |
|             self.assertIsNotNone(_exit)
 | |
|             stack.enter_context(cm)
 | |
|             self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
 | |
|             result.append(2)
 | |
|         self.assertEqual(result, [1, 2, 3, 4])
 | |
| 
 | |
|     def test_close(self):
 | |
|         result = []
 | |
|         with self.exit_stack() as stack:
 | |
|             @stack.callback
 | |
|             def _exit():
 | |
|                 result.append(1)
 | |
|             self.assertIsNotNone(_exit)
 | |
|             stack.close()
 | |
|             result.append(2)
 | |
|         self.assertEqual(result, [1, 2])
 | |
| 
 | |
|     def test_pop_all(self):
 | |
|         result = []
 | |
|         with self.exit_stack() as stack:
 | |
|             @stack.callback
 | |
|             def _exit():
 | |
|                 result.append(3)
 | |
|             self.assertIsNotNone(_exit)
 | |
|             new_stack = stack.pop_all()
 | |
|             result.append(1)
 | |
|         result.append(2)
 | |
|         new_stack.close()
 | |
|         self.assertEqual(result, [1, 2, 3])
 | |
| 
 | |
|     def test_exit_raise(self):
 | |
|         with self.assertRaises(ZeroDivisionError):
 | |
|             with self.exit_stack() as stack:
 | |
|                 stack.push(lambda *exc: False)
 | |
|                 1/0
 | |
| 
 | |
|     def test_exit_suppress(self):
 | |
|         with self.exit_stack() as stack:
 | |
|             stack.push(lambda *exc: True)
 | |
|             1/0
 | |
| 
 | |
|     def test_exit_exception_chaining_reference(self):
 | |
|         # Sanity check to make sure that ExitStack chaining matches
 | |
|         # actual nested with statements
 | |
|         class RaiseExc:
 | |
|             def __init__(self, exc):
 | |
|                 self.exc = exc
 | |
|             def __enter__(self):
 | |
|                 return self
 | |
|             def __exit__(self, *exc_details):
 | |
|                 raise self.exc
 | |
| 
 | |
|         class RaiseExcWithContext:
 | |
|             def __init__(self, outer, inner):
 | |
|                 self.outer = outer
 | |
|                 self.inner = inner
 | |
|             def __enter__(self):
 | |
|                 return self
 | |
|             def __exit__(self, *exc_details):
 | |
|                 try:
 | |
|                     raise self.inner
 | |
|                 except:
 | |
|                     raise self.outer
 | |
| 
 | |
|         class SuppressExc:
 | |
|             def __enter__(self):
 | |
|                 return self
 | |
|             def __exit__(self, *exc_details):
 | |
|                 type(self).saved_details = exc_details
 | |
|                 return True
 | |
| 
 | |
|         try:
 | |
|             with RaiseExc(IndexError):
 | |
|                 with RaiseExcWithContext(KeyError, AttributeError):
 | |
|                     with SuppressExc():
 | |
|                         with RaiseExc(ValueError):
 | |
|                             1 / 0
 | |
|         except IndexError as exc:
 | |
|             self.assertIsInstance(exc.__context__, KeyError)
 | |
|             self.assertIsInstance(exc.__context__.__context__, AttributeError)
 | |
|             # Inner exceptions were suppressed
 | |
|             self.assertIsNone(exc.__context__.__context__.__context__)
 | |
|         else:
 | |
|             self.fail("Expected IndexError, but no exception was raised")
 | |
|         # Check the inner exceptions
 | |
|         inner_exc = SuppressExc.saved_details[1]
 | |
|         self.assertIsInstance(inner_exc, ValueError)
 | |
|         self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
 | |
| 
 | |
|     def test_exit_exception_chaining(self):
 | |
|         # Ensure exception chaining matches the reference behaviour
 | |
|         def raise_exc(exc):
 | |
|             raise exc
 | |
| 
 | |
|         saved_details = None
 | |
|         def suppress_exc(*exc_details):
 | |
|             nonlocal saved_details
 | |
|             saved_details = exc_details
 | |
|             return True
 | |
| 
 | |
|         try:
 | |
|             with self.exit_stack() as stack:
 | |
|                 stack.callback(raise_exc, IndexError)
 | |
|                 stack.callback(raise_exc, KeyError)
 | |
|                 stack.callback(raise_exc, AttributeError)
 | |
|                 stack.push(suppress_exc)
 | |
|                 stack.callback(raise_exc, ValueError)
 | |
|                 1 / 0
 | |
|         except IndexError as exc:
 | |
|             self.assertIsInstance(exc.__context__, KeyError)
 | |
|             self.assertIsInstance(exc.__context__.__context__, AttributeError)
 | |
|             # Inner exceptions were suppressed
 | |
|             self.assertIsNone(exc.__context__.__context__.__context__)
 | |
|         else:
 | |
|             self.fail("Expected IndexError, but no exception was raised")
 | |
|         # Check the inner exceptions
 | |
|         inner_exc = saved_details[1]
 | |
|         self.assertIsInstance(inner_exc, ValueError)
 | |
|         self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
 | |
| 
 | |
|     def test_exit_exception_non_suppressing(self):
 | |
|         # http://bugs.python.org/issue19092
 | |
|         def raise_exc(exc):
 | |
|             raise exc
 | |
| 
 | |
|         def suppress_exc(*exc_details):
 | |
|             return True
 | |
| 
 | |
|         try:
 | |
|             with self.exit_stack() as stack:
 | |
|                 stack.callback(lambda: None)
 | |
|                 stack.callback(raise_exc, IndexError)
 | |
|         except Exception as exc:
 | |
|             self.assertIsInstance(exc, IndexError)
 | |
|         else:
 | |
|             self.fail("Expected IndexError, but no exception was raised")
 | |
| 
 | |
|         try:
 | |
|             with self.exit_stack() as stack:
 | |
|                 stack.callback(raise_exc, KeyError)
 | |
|                 stack.push(suppress_exc)
 | |
|                 stack.callback(raise_exc, IndexError)
 | |
|         except Exception as exc:
 | |
|             self.assertIsInstance(exc, KeyError)
 | |
|         else:
 | |
|             self.fail("Expected KeyError, but no exception was raised")
 | |
| 
 | |
|     def test_exit_exception_with_correct_context(self):
 | |
|         # http://bugs.python.org/issue20317
 | |
|         @contextmanager
 | |
|         def gets_the_context_right(exc):
 | |
|             try:
 | |
|                 yield
 | |
|             finally:
 | |
|                 raise exc
 | |
| 
 | |
|         exc1 = Exception(1)
 | |
|         exc2 = Exception(2)
 | |
|         exc3 = Exception(3)
 | |
|         exc4 = Exception(4)
 | |
| 
 | |
|         # The contextmanager already fixes the context, so prior to the
 | |
|         # fix, ExitStack would try to fix it *again* and get into an
 | |
|         # infinite self-referential loop
 | |
|         try:
 | |
|             with self.exit_stack() as stack:
 | |
|                 stack.enter_context(gets_the_context_right(exc4))
 | |
|                 stack.enter_context(gets_the_context_right(exc3))
 | |
|                 stack.enter_context(gets_the_context_right(exc2))
 | |
|                 raise exc1
 | |
|         except Exception as exc:
 | |
|             self.assertIs(exc, exc4)
 | |
|             self.assertIs(exc.__context__, exc3)
 | |
|             self.assertIs(exc.__context__.__context__, exc2)
 | |
|             self.assertIs(exc.__context__.__context__.__context__, exc1)
 | |
|             self.assertIsNone(
 | |
|                        exc.__context__.__context__.__context__.__context__)
 | |
| 
 | |
|     def test_exit_exception_with_existing_context(self):
 | |
|         # Addresses a lack of test coverage discovered after checking in a
 | |
|         # fix for issue 20317 that still contained debugging code.
 | |
|         def raise_nested(inner_exc, outer_exc):
 | |
|             try:
 | |
|                 raise inner_exc
 | |
|             finally:
 | |
|                 raise outer_exc
 | |
|         exc1 = Exception(1)
 | |
|         exc2 = Exception(2)
 | |
|         exc3 = Exception(3)
 | |
|         exc4 = Exception(4)
 | |
|         exc5 = Exception(5)
 | |
|         try:
 | |
|             with self.exit_stack() as stack:
 | |
|                 stack.callback(raise_nested, exc4, exc5)
 | |
|                 stack.callback(raise_nested, exc2, exc3)
 | |
|                 raise exc1
 | |
|         except Exception as exc:
 | |
|             self.assertIs(exc, exc5)
 | |
|             self.assertIs(exc.__context__, exc4)
 | |
|             self.assertIs(exc.__context__.__context__, exc3)
 | |
|             self.assertIs(exc.__context__.__context__.__context__, exc2)
 | |
|             self.assertIs(
 | |
|                  exc.__context__.__context__.__context__.__context__, exc1)
 | |
|             self.assertIsNone(
 | |
|                 exc.__context__.__context__.__context__.__context__.__context__)
 | |
| 
 | |
|     def test_body_exception_suppress(self):
 | |
|         def suppress_exc(*exc_details):
 | |
|             return True
 | |
|         try:
 | |
|             with self.exit_stack() as stack:
 | |
|                 stack.push(suppress_exc)
 | |
|                 1/0
 | |
|         except IndexError as exc:
 | |
|             self.fail("Expected no exception, got IndexError")
 | |
| 
 | |
|     def test_exit_exception_chaining_suppress(self):
 | |
|         with self.exit_stack() as stack:
 | |
|             stack.push(lambda *exc: True)
 | |
|             stack.push(lambda *exc: 1/0)
 | |
|             stack.push(lambda *exc: {}[1])
 | |
| 
 | |
|     def test_excessive_nesting(self):
 | |
|         # The original implementation would die with RecursionError here
 | |
|         with self.exit_stack() as stack:
 | |
|             for i in range(10000):
 | |
|                 stack.callback(int)
 | |
| 
 | |
|     def test_instance_bypass(self):
 | |
|         class Example(object): pass
 | |
|         cm = Example()
 | |
|         cm.__exit__ = object()
 | |
|         stack = self.exit_stack()
 | |
|         self.assertRaises(AttributeError, stack.enter_context, cm)
 | |
|         stack.push(cm)
 | |
|         self.assertIs(stack._exit_callbacks[-1][1], cm)
 | |
| 
 | |
|     def test_dont_reraise_RuntimeError(self):
 | |
|         # https://bugs.python.org/issue27122
 | |
|         class UniqueException(Exception): pass
 | |
|         class UniqueRuntimeError(RuntimeError): pass
 | |
| 
 | |
|         @contextmanager
 | |
|         def second():
 | |
|             try:
 | |
|                 yield 1
 | |
|             except Exception as exc:
 | |
|                 raise UniqueException("new exception") from exc
 | |
| 
 | |
|         @contextmanager
 | |
|         def first():
 | |
|             try:
 | |
|                 yield 1
 | |
|             except Exception as exc:
 | |
|                 raise exc
 | |
| 
 | |
|         # The UniqueRuntimeError should be caught by second()'s exception
 | |
|         # handler which chain raised a new UniqueException.
 | |
|         with self.assertRaises(UniqueException) as err_ctx:
 | |
|             with self.exit_stack() as es_ctx:
 | |
|                 es_ctx.enter_context(second())
 | |
|                 es_ctx.enter_context(first())
 | |
|                 raise UniqueRuntimeError("please no infinite loop.")
 | |
| 
 | |
|         exc = err_ctx.exception
 | |
|         self.assertIsInstance(exc, UniqueException)
 | |
|         self.assertIsInstance(exc.__context__, UniqueRuntimeError)
 | |
|         self.assertIsNone(exc.__context__.__context__)
 | |
|         self.assertIsNone(exc.__context__.__cause__)
 | |
|         self.assertIs(exc.__cause__, exc.__context__)
 | |
| 
 | |
| 
 | |
| class TestExitStack(TestBaseExitStack, unittest.TestCase):
 | |
|     exit_stack = ExitStack
 | |
| 
 | |
| 
 | |
| class TestRedirectStream:
 | |
| 
 | |
|     redirect_stream = None
 | |
|     orig_stream = None
 | |
| 
 | |
|     @support.requires_docstrings
 | |
|     def test_instance_docs(self):
 | |
|         # Issue 19330: ensure context manager instances have good docstrings
 | |
|         cm_docstring = self.redirect_stream.__doc__
 | |
|         obj = self.redirect_stream(None)
 | |
|         self.assertEqual(obj.__doc__, cm_docstring)
 | |
| 
 | |
|     def test_no_redirect_in_init(self):
 | |
|         orig_stdout = getattr(sys, self.orig_stream)
 | |
|         self.redirect_stream(None)
 | |
|         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
 | |
| 
 | |
|     def test_redirect_to_string_io(self):
 | |
|         f = io.StringIO()
 | |
|         msg = "Consider an API like help(), which prints directly to stdout"
 | |
|         orig_stdout = getattr(sys, self.orig_stream)
 | |
|         with self.redirect_stream(f):
 | |
|             print(msg, file=getattr(sys, self.orig_stream))
 | |
|         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
 | |
|         s = f.getvalue().strip()
 | |
|         self.assertEqual(s, msg)
 | |
| 
 | |
|     def test_enter_result_is_target(self):
 | |
|         f = io.StringIO()
 | |
|         with self.redirect_stream(f) as enter_result:
 | |
|             self.assertIs(enter_result, f)
 | |
| 
 | |
|     def test_cm_is_reusable(self):
 | |
|         f = io.StringIO()
 | |
|         write_to_f = self.redirect_stream(f)
 | |
|         orig_stdout = getattr(sys, self.orig_stream)
 | |
|         with write_to_f:
 | |
|             print("Hello", end=" ", file=getattr(sys, self.orig_stream))
 | |
|         with write_to_f:
 | |
|             print("World!", file=getattr(sys, self.orig_stream))
 | |
|         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
 | |
|         s = f.getvalue()
 | |
|         self.assertEqual(s, "Hello World!\n")
 | |
| 
 | |
|     def test_cm_is_reentrant(self):
 | |
|         f = io.StringIO()
 | |
|         write_to_f = self.redirect_stream(f)
 | |
|         orig_stdout = getattr(sys, self.orig_stream)
 | |
|         with write_to_f:
 | |
|             print("Hello", end=" ", file=getattr(sys, self.orig_stream))
 | |
|             with write_to_f:
 | |
|                 print("World!", file=getattr(sys, self.orig_stream))
 | |
|         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
 | |
|         s = f.getvalue()
 | |
|         self.assertEqual(s, "Hello World!\n")
 | |
| 
 | |
| 
 | |
| class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
 | |
| 
 | |
|     redirect_stream = redirect_stdout
 | |
|     orig_stream = "stdout"
 | |
| 
 | |
| 
 | |
| class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
 | |
| 
 | |
|     redirect_stream = redirect_stderr
 | |
|     orig_stream = "stderr"
 | |
| 
 | |
| 
 | |
| class TestSuppress(unittest.TestCase):
 | |
| 
 | |
|     @support.requires_docstrings
 | |
|     def test_instance_docs(self):
 | |
|         # Issue 19330: ensure context manager instances have good docstrings
 | |
|         cm_docstring = suppress.__doc__
 | |
|         obj = suppress()
 | |
|         self.assertEqual(obj.__doc__, cm_docstring)
 | |
| 
 | |
|     def test_no_result_from_enter(self):
 | |
|         with suppress(ValueError) as enter_result:
 | |
|             self.assertIsNone(enter_result)
 | |
| 
 | |
|     def test_no_exception(self):
 | |
|         with suppress(ValueError):
 | |
|             self.assertEqual(pow(2, 5), 32)
 | |
| 
 | |
|     def test_exact_exception(self):
 | |
|         with suppress(TypeError):
 | |
|             len(5)
 | |
| 
 | |
|     def test_exception_hierarchy(self):
 | |
|         with suppress(LookupError):
 | |
|             'Hello'[50]
 | |
| 
 | |
|     def test_other_exception(self):
 | |
|         with self.assertRaises(ZeroDivisionError):
 | |
|             with suppress(TypeError):
 | |
|                 1/0
 | |
| 
 | |
|     def test_no_args(self):
 | |
|         with self.assertRaises(ZeroDivisionError):
 | |
|             with suppress():
 | |
|                 1/0
 | |
| 
 | |
|     def test_multiple_exception_args(self):
 | |
|         with suppress(ZeroDivisionError, TypeError):
 | |
|             1/0
 | |
|         with suppress(ZeroDivisionError, TypeError):
 | |
|             len(5)
 | |
| 
 | |
|     def test_cm_is_reentrant(self):
 | |
|         ignore_exceptions = suppress(Exception)
 | |
|         with ignore_exceptions:
 | |
|             pass
 | |
|         with ignore_exceptions:
 | |
|             len(5)
 | |
|         with ignore_exceptions:
 | |
|             with ignore_exceptions: # Check nested usage
 | |
|                 len(5)
 | |
|             outer_continued = True
 | |
|             1/0
 | |
|         self.assertTrue(outer_continued)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 |