| |
|
|
| |
| |
| |
| |
|
|
| import atexit |
| import os |
| import sys |
| import __main__ |
| from contextlib import suppress |
| from io import BytesIO |
|
|
| import dill |
|
|
| session_file = os.path.join(os.path.dirname(__file__), 'session-refimported-%s.pkl') |
|
|
| |
| |
| |
|
|
| def _error_line(error, obj, refimported): |
| import traceback |
| line = traceback.format_exc().splitlines()[-2].replace('[obj]', '['+repr(obj)+']') |
| return "while testing (with refimported=%s): %s" % (refimported, line.lstrip()) |
|
|
| if __name__ == '__main__' and len(sys.argv) >= 3 and sys.argv[1] == '--child': |
| |
| refimported = (sys.argv[2] == 'True') |
| dill.load_module(session_file % refimported, module='__main__') |
|
|
| def test_modules(refimported): |
| |
| |
| |
| |
| |
| |
| try: |
| for obj in ('json', 'url', 'local_mod', 'sax', 'dom'): |
| assert globals()[obj].__name__ in sys.modules |
| assert 'calendar' in sys.modules and 'cmath' in sys.modules |
| import calendar, cmath |
|
|
| for obj in ('Calendar', 'isleap'): |
| assert globals()[obj] is sys.modules['calendar'].__dict__[obj] |
| assert __main__.day_name.__module__ == 'calendar' |
| if refimported: |
| assert __main__.day_name is calendar.day_name |
|
|
| assert __main__.complex_log is cmath.log |
|
|
| except AssertionError as error: |
| error.args = (_error_line(error, obj, refimported),) |
| raise |
|
|
| test_modules(refimported) |
| sys.exit() |
|
|
| |
| |
| |
|
|
| |
|
|
| |
| import json |
| import urllib as url |
| from xml import sax |
| import xml.dom.minidom as dom |
| import test_dictviews as local_mod |
|
|
| |
| from calendar import Calendar, isleap, day_name |
| from cmath import log as complex_log |
|
|
| |
| x = 17 |
| empty = None |
| names = ['Alice', 'Bob', 'Carol'] |
| def squared(x): return x**2 |
| cubed = lambda x: x**3 |
| class Person: |
| def __init__(self, name, age): |
| self.name = name |
| self.age = age |
| person = Person(names[0], x) |
| class CalendarSubclass(Calendar): |
| def weekdays(self): |
| return [day_name[i] for i in self.iterweekdays()] |
| cal = CalendarSubclass() |
| selfref = __main__ |
|
|
| |
| class TestNamespace: |
| test_globals = globals().copy() |
| def __init__(self, **extra): |
| self.extra = extra |
| def __enter__(self): |
| self.backup = globals().copy() |
| globals().clear() |
| globals().update(self.test_globals) |
| globals().update(self.extra) |
| return self |
| def __exit__(self, *exc_info): |
| globals().clear() |
| globals().update(self.backup) |
|
|
| def _clean_up_cache(module): |
| cached = module.__file__.split('.', 1)[0] + '.pyc' |
| cached = module.__cached__ if hasattr(module, '__cached__') else cached |
| pycache = os.path.join(os.path.dirname(module.__file__), '__pycache__') |
| for remove, file in [(os.remove, cached), (os.removedirs, pycache)]: |
| with suppress(OSError): |
| remove(file) |
|
|
| atexit.register(_clean_up_cache, local_mod) |
|
|
| def _test_objects(main, globals_copy, refimported): |
| try: |
| main_dict = __main__.__dict__ |
| global Person, person, Calendar, CalendarSubclass, cal, selfref |
|
|
| for obj in ('json', 'url', 'local_mod', 'sax', 'dom'): |
| assert globals()[obj].__name__ == globals_copy[obj].__name__ |
|
|
| for obj in ('x', 'empty', 'names'): |
| assert main_dict[obj] == globals_copy[obj] |
|
|
| for obj in ['squared', 'cubed']: |
| assert main_dict[obj].__globals__ is main_dict |
| assert main_dict[obj](3) == globals_copy[obj](3) |
|
|
| assert Person.__module__ == __main__.__name__ |
| assert isinstance(person, Person) |
| assert person.age == globals_copy['person'].age |
|
|
| assert issubclass(CalendarSubclass, Calendar) |
| assert isinstance(cal, CalendarSubclass) |
| assert cal.weekdays() == globals_copy['cal'].weekdays() |
|
|
| assert selfref is __main__ |
|
|
| except AssertionError as error: |
| error.args = (_error_line(error, obj, refimported),) |
| raise |
|
|
| def test_session_main(refimported): |
| """test dump/load_module() for __main__, both in this process and in a subprocess""" |
| extra_objects = {} |
| if refimported: |
| |
| from sys import flags |
| extra_objects['flags'] = flags |
|
|
| with TestNamespace(**extra_objects) as ns: |
| try: |
| |
| dill.dump_module(session_file % refimported, refimported=refimported) |
| from dill.tests.__main__ import python, shell, sp |
| error = sp.call([python, __file__, '--child', str(refimported)], shell=shell) |
| if error: sys.exit(error) |
| finally: |
| with suppress(OSError): |
| os.remove(session_file % refimported) |
|
|
| |
| session_buffer = BytesIO() |
| dill.dump_module(session_buffer, refimported=refimported) |
| session_buffer.seek(0) |
| dill.load_module(session_buffer, module='__main__') |
| ns.backup['_test_objects'](__main__, ns.backup, refimported) |
|
|
| def test_session_other(): |
| """test dump/load_module() for a module other than __main__""" |
| import test_classdef as module |
| atexit.register(_clean_up_cache, module) |
| module.selfref = module |
| dict_objects = [obj for obj in module.__dict__.keys() if not obj.startswith('__')] |
|
|
| session_buffer = BytesIO() |
| dill.dump_module(session_buffer, module) |
|
|
| for obj in dict_objects: |
| del module.__dict__[obj] |
|
|
| session_buffer.seek(0) |
| dill.load_module(session_buffer, module) |
|
|
| assert all(obj in module.__dict__ for obj in dict_objects) |
| assert module.selfref is module |
|
|
| def test_runtime_module(): |
| from types import ModuleType |
| modname = '__runtime__' |
| runtime = ModuleType(modname) |
| runtime.x = 42 |
|
|
| mod = dill.session._stash_modules(runtime) |
| if mod is not runtime: |
| print("There are objects to save by referenece that shouldn't be:", |
| mod.__dill_imported, mod.__dill_imported_as, mod.__dill_imported_top_level, |
| file=sys.stderr) |
|
|
| |
| |
| |
| session_buffer = BytesIO() |
| dill.dump_module(session_buffer, module=runtime, refimported=True) |
| session_dump = session_buffer.getvalue() |
|
|
| |
| runtime = ModuleType(modname) |
| return_val = dill.load_module(BytesIO(session_dump), module=runtime) |
| assert return_val is None |
| assert runtime.__name__ == modname |
| assert runtime.x == 42 |
| assert runtime not in sys.modules.values() |
|
|
| |
| session_buffer.seek(0) |
| runtime = dill.load_module(BytesIO(session_dump)) |
| assert runtime.__name__ == modname |
| assert runtime.x == 42 |
| assert runtime not in sys.modules.values() |
|
|
| def test_refimported_imported_as(): |
| import collections |
| import concurrent.futures |
| import types |
| import typing |
| mod = sys.modules['__test__'] = types.ModuleType('__test__') |
| dill.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) |
| mod.Dict = collections.UserDict |
| mod.AsyncCM = typing.AsyncContextManager |
| mod.thread_exec = dill.executor |
|
|
| session_buffer = BytesIO() |
| dill.dump_module(session_buffer, mod, refimported=True) |
| session_buffer.seek(0) |
| mod = dill.load(session_buffer) |
| del sys.modules['__test__'] |
|
|
| assert set(mod.__dill_imported_as) == { |
| ('collections', 'UserDict', 'Dict'), |
| ('typing', 'AsyncContextManager', 'AsyncCM'), |
| ('dill', 'executor', 'thread_exec'), |
| } |
|
|
| def test_load_module_asdict(): |
| with TestNamespace(): |
| session_buffer = BytesIO() |
| dill.dump_module(session_buffer) |
|
|
| global empty, names, x, y |
| x = y = 0 |
| del empty |
| globals_state = globals().copy() |
|
|
| session_buffer.seek(0) |
| main_vars = dill.load_module_asdict(session_buffer) |
|
|
| assert main_vars is not globals() |
| assert globals() == globals_state |
|
|
| assert main_vars['__name__'] == '__main__' |
| assert main_vars['names'] == names |
| assert main_vars['names'] is not names |
| assert main_vars['x'] != x |
| assert 'y' not in main_vars |
| assert 'empty' in main_vars |
|
|
| if __name__ == '__main__': |
| test_session_main(refimported=False) |
| test_session_main(refimported=True) |
| test_session_other() |
| test_runtime_module() |
| test_refimported_imported_as() |
| test_load_module_asdict() |
|
|