"""Tests for django.utils.autoreload: module discovery and the reloaders."""
import contextlib
import os
import py_compile
import shutil
import sys
import tempfile
import threading
import time
import types
import weakref
import zipfile
from importlib import import_module
from pathlib import Path
from subprocess import CompletedProcess
from unittest import mock, skip, skipIf

from django.apps.registry import Apps
from django.test import SimpleTestCase
from django.test.utils import extend_sys_path
from django.utils import autoreload
from django.utils.autoreload import WatchmanUnavailable

from .utils import on_macos_with_hfs


class TestIterModulesAndFiles(SimpleTestCase):
    """
    Tests for autoreload.iter_all_python_module_files() and
    autoreload.iter_modules_and_files().
    """

    def import_and_cleanup(self, name):
        """
        Import the module `name` and register cleanups that clear
        sys.path_importer_cache and remove the module from sys.modules.
        """
        import_module(name)
        self.addCleanup(lambda: sys.path_importer_cache.clear())
        self.addCleanup(lambda: sys.modules.pop(name, None))

    def clear_autoreload_caches(self):
        """Reset the cache of autoreload.iter_modules_and_files()."""
        autoreload.iter_modules_and_files.cache_clear()

    def assertFileFound(self, filename):
        """
        Assert that the resolved `filename` is returned by
        iter_all_python_module_files() on an uncached and then a cached call,
        and that exactly one cache hit was recorded.
        """
        # Some temp directories are symlinks. Python resolves these fully while
        # importing.
        resolved_filename = filename.resolve()
        self.clear_autoreload_caches()
        # Test uncached access
        self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        # Test cached access
        self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)

    def assertFileNotFound(self, filename):
        """
        Assert that the resolved `filename` is absent from
        iter_all_python_module_files() on an uncached and then a cached call,
        and that exactly one cache hit was recorded.
        """
        resolved_filename = filename.resolve()
        self.clear_autoreload_caches()
        # Test uncached access
        self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        # Test cached access
        self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
        self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)

    def temporary_file(self, filename):
        """
        Return the path of `filename` inside a new temporary directory. The
        directory is removed at cleanup; the file itself is not created.
        """
        dirname = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, dirname)
        return Path(dirname) / filename

    def test_paths_are_pathlib_instances(self):
        for filename in autoreload.iter_all_python_module_files():
            self.assertIsInstance(filename, Path)

    def test_file_added(self):
        """
        When a file is added, it's returned by iter_all_python_module_files().
        """
        filename = self.temporary_file('test_deleted_removed_module.py')
        filename.touch()

        with extend_sys_path(str(filename.parent)):
            self.import_and_cleanup('test_deleted_removed_module')

        self.assertFileFound(filename.absolute())

    def test_check_errors(self):
        """
        When a file containing an error is imported in a function wrapped by
        check_errors(), iter_all_python_module_files() returns it.
        """
        filename = self.temporary_file('test_syntax_error.py')
        filename.write_text("Ceci n'est pas du Python.")

        with extend_sys_path(str(filename.parent)):
            with self.assertRaises(SyntaxError):
                autoreload.check_errors(import_module)('test_syntax_error')
        self.assertFileFound(filename)

    def test_check_errors_catches_all_exceptions(self):
        """
        Since Python may raise arbitrary exceptions when importing code,
        check_errors() must catch Exception, not just some subclasses.
        """
        filename = self.temporary_file('test_exception.py')
        filename.write_text('raise Exception')

        with extend_sys_path(str(filename.parent)):
            with self.assertRaises(Exception):
                autoreload.check_errors(import_module)('test_exception')
        self.assertFileFound(filename)

    def test_zip_reload(self):
        """
        Modules imported from zipped files have their archive location included
        in the result.
        """
        zip_file = self.temporary_file('zip_import.zip')
        with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('test_zipped_file.py', '')

        with extend_sys_path(str(zip_file)):
            self.import_and_cleanup('test_zipped_file')
        self.assertFileFound(zip_file)

    def test_bytecode_conversion_to_source(self):
        """.pyc and .pyo files are included in the files list."""
        filename = self.temporary_file('test_compiled.py')
        filename.touch()
        compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc'))))
        # Remove the source so that only the compiled file can be imported.
        filename.unlink()
        with extend_sys_path(str(compiled_file.parent)):
            self.import_and_cleanup('test_compiled')
        self.assertFileFound(compiled_file)

    def test_weakref_in_sys_module(self):
        """iter_all_python_module_files() ignores weakref modules."""
        time_proxy = weakref.proxy(time)
        sys.modules['time_proxy'] = time_proxy
        self.addCleanup(lambda: sys.modules.pop('time_proxy', None))
        list(autoreload.iter_all_python_module_files())  # No crash.

    def test_module_without_spec(self):
        module = types.ModuleType('test_module')
        del module.__spec__
        self.assertEqual(autoreload.iter_modules_and_files((module,), frozenset()), frozenset())

    def test_main_module_is_resolved(self):
        main_module = sys.modules['__main__']
        self.assertFileFound(Path(main_module.__file__))

    def test_main_module_without_file_is_not_resolved(self):
        fake_main = types.ModuleType('__main__')
        self.assertEqual(autoreload.iter_modules_and_files((fake_main,), frozenset()), frozenset())

    def test_path_with_embedded_null_bytes(self):
        for path in (
            'embedded_null_byte\x00.py',
            'di\x00rectory/embedded_null_byte.py',
        ):
            with self.subTest(path=path):
                self.assertEqual(
                    autoreload.iter_modules_and_files((), frozenset([path])),
                    frozenset(),
                )


class TestCommonRoots(SimpleTestCase):
    """Tests for autoreload.common_roots()."""

    def test_common_roots(self):
        paths = (
            Path('/first/second'),
            Path('/first/second/third'),
            Path('/first/'),
            Path('/root/first/'),
        )
        results = autoreload.common_roots(paths)
        self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')])


class TestSysPathDirectories(SimpleTestCase):
    """Tests for autoreload.sys_path_directories()."""

    def setUp(self):
        # Each test gets a temporary directory containing one empty file.
        self._directory = tempfile.TemporaryDirectory()
        self.directory = Path(self._directory.name).resolve().absolute()
        self.file = self.directory / 'test'
        self.file.touch()

    def tearDown(self):
        self._directory.cleanup()

    def test_sys_paths_with_directories(self):
        with extend_sys_path(str(self.file)):
            paths = list(autoreload.sys_path_directories())
        self.assertIn(self.file.parent, paths)

    def test_sys_paths_non_existing(self):
        # self.directory is already a Path, so Path(self.directory.name) would
        # only keep its last component and give a path relative to the current
        # working directory. Build the missing path inside the temporary
        # directory so that the parent assertion below is meaningful.
        nonexistent_file = self.directory / 'does_not_exist'
        with extend_sys_path(str(nonexistent_file)):
            paths = list(autoreload.sys_path_directories())
        self.assertNotIn(nonexistent_file, paths)
        self.assertNotIn(nonexistent_file.parent, paths)

    def test_sys_paths_absolute(self):
        paths = list(autoreload.sys_path_directories())
        self.assertTrue(all(p.is_absolute() for p in paths))

    def test_sys_paths_directories(self):
        with extend_sys_path(str(self.directory)):
            paths = list(autoreload.sys_path_directories())
        self.assertIn(self.directory, paths)


class GetReloaderTests(SimpleTestCase):
    """Tests for the reloader class selected by autoreload.get_reloader()."""

    @mock.patch('django.utils.autoreload.WatchmanReloader')
    def test_watchman_unavailable(self, mocked_watchman):
        mocked_watchman.check_availability.side_effect = WatchmanUnavailable
        self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)

    @mock.patch.object(autoreload.WatchmanReloader, 'check_availability')
    def test_watchman_available(self, mocked_available):
        # If WatchmanUnavailable isn't raised, Watchman will be chosen.
        mocked_available.return_value = None
        result = autoreload.get_reloader()
        self.assertIsInstance(result, autoreload.WatchmanReloader)


class RunWithReloaderTests(SimpleTestCase):
    """Tests for autoreload.run_with_reloader()."""

    @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
    @mock.patch('django.utils.autoreload.get_reloader')
    def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
        mocked_get_reloader.side_effect = KeyboardInterrupt()
        autoreload.run_with_reloader(lambda: None)  # No exception

    @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'})
    @mock.patch('django.utils.autoreload.restart_with_reloader')
    def test_calls_sys_exit(self, mocked_restart_reloader):
        mocked_restart_reloader.return_value = 1
        with self.assertRaises(SystemExit) as exc:
            autoreload.run_with_reloader(lambda: None)
        self.assertEqual(exc.exception.code, 1)

    @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
    @mock.patch('django.utils.autoreload.start_django')
    @mock.patch('django.utils.autoreload.get_reloader')
    def test_calls_start_django(self, mocked_reloader, mocked_start_django):
        mocked_reloader.return_value = mock.sentinel.RELOADER
        autoreload.run_with_reloader(mock.sentinel.METHOD)
        self.assertEqual(mocked_start_django.call_count, 1)
        self.assertSequenceEqual(
            mocked_start_django.call_args[0],
            [mock.sentinel.RELOADER, mock.sentinel.METHOD]
        )


class StartDjangoTests(SimpleTestCase):
    """Tests for autoreload.start_django()."""

    @mock.patch('django.utils.autoreload.StatReloader')
    def test_watchman_becomes_unavailable(self, mocked_stat):
        mocked_stat.should_stop.return_value = True
        fake_reloader = mock.MagicMock()
        fake_reloader.should_stop = False
        fake_reloader.run.side_effect = autoreload.WatchmanUnavailable()

        autoreload.start_django(fake_reloader, lambda: None)
        self.assertEqual(mocked_stat.call_count, 1)

    @mock.patch('django.utils.autoreload.ensure_echo_on')
    def test_echo_on_called(self, mocked_echo):
        fake_reloader = mock.MagicMock()
        autoreload.start_django(fake_reloader, lambda: None)
        self.assertEqual(mocked_echo.call_count, 1)

    @mock.patch('django.utils.autoreload.check_errors')
    def test_check_errors_called(self, mocked_check_errors):
        fake_method = mock.MagicMock(return_value=None)
        fake_reloader = mock.MagicMock()
        autoreload.start_django(fake_reloader, fake_method)
        self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])

    @mock.patch('threading.Thread')
    @mock.patch('django.utils.autoreload.check_errors')
    def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
        fake_reloader = mock.MagicMock()
        fake_main_func = mock.MagicMock()
        fake_thread = mock.MagicMock()
        mocked_check_errors.return_value = fake_main_func
        mocked_thread.return_value = fake_thread
        autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
        self.assertEqual(mocked_thread.call_count, 1)
        self.assertEqual(
            mocked_thread.call_args[1],
            {'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}, 'name': 'django-main-thread'}
        )
        self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True])
        self.assertTrue(fake_thread.start.called)


class TestCheckErrors(SimpleTestCase):
    """Tests for the autoreload.check_errors() wrapper."""

    def test_mutates_error_files(self):
        fake_method = mock.MagicMock(side_effect=RuntimeError())
        wrapped = autoreload.check_errors(fake_method)
        with mock.patch.object(autoreload, '_error_files') as mocked_error_files:
            with self.assertRaises(RuntimeError):
                wrapped()
        self.assertEqual(mocked_error_files.append.call_count, 1)


class TestRaiseLastException(SimpleTestCase):
    """Tests for autoreload.raise_last_exception()."""

    @mock.patch('django.utils.autoreload._exception', None)
    def test_no_exception(self):
        # Should raise no exception if _exception is None
        autoreload.raise_last_exception()

    def test_raises_exception(self):
        class MyException(Exception):
            pass

        # Create an exception
        try:
            raise MyException('Test Message')
        except MyException:
            exc_info = sys.exc_info()

        with mock.patch('django.utils.autoreload._exception', exc_info):
            with self.assertRaisesMessage(MyException, 'Test Message'):
                autoreload.raise_last_exception()

    def test_raises_custom_exception(self):
        class MyException(Exception):
            def __init__(self, msg, extra_context):
                super().__init__(msg)
                self.extra_context = extra_context

        # Create an exception.
        try:
            raise MyException('Test Message', 'extra context')
        except MyException:
            exc_info = sys.exc_info()

        with mock.patch('django.utils.autoreload._exception', exc_info):
            with self.assertRaisesMessage(MyException, 'Test Message'):
                autoreload.raise_last_exception()

    def test_raises_exception_with_context(self):
        try:
            raise Exception(2)
        except Exception as e:
            try:
                raise Exception(1) from e
            except Exception:
                exc_info = sys.exc_info()

        with mock.patch('django.utils.autoreload._exception', exc_info):
            with self.assertRaises(Exception) as cm:
                autoreload.raise_last_exception()
            self.assertEqual(cm.exception.args[0], 1)
            self.assertEqual(cm.exception.__cause__.args[0], 2)


class RestartWithReloaderTests(SimpleTestCase):
    """Tests for the command line built by autoreload.restart_with_reloader()."""

    executable = '/usr/bin/python'

    def patch_autoreload(self, argv):
        """
        Patch sys.argv, sys.executable, and sys.warnoptions as seen by
        autoreload, and replace autoreload's subprocess.run with a mock
        returning a CompletedProcess with return code 0. Every patch is undone
        at cleanup. Return the subprocess.run mock.
        """
        patch_call = mock.patch('django.utils.autoreload.subprocess.run', return_value=CompletedProcess(argv, 0))
        patches = [
            mock.patch('django.utils.autoreload.sys.argv', argv),
            mock.patch('django.utils.autoreload.sys.executable', self.executable),
            mock.patch('django.utils.autoreload.sys.warnoptions', ['all']),
        ]
        for p in patches:
            p.start()
            self.addCleanup(p.stop)
        mock_call = patch_call.start()
        self.addCleanup(patch_call.stop)
        return mock_call

    def test_manage_py(self):
        argv = ['./manage.py', 'runserver']
        mock_call = self.patch_autoreload(argv)
        autoreload.restart_with_reloader()
        self.assertEqual(mock_call.call_count, 1)
        self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall'] + argv)

    def test_python_m_django(self):
        main = '/usr/lib/pythonX.Y/site-packages/django/__main__.py'
        argv = [main, 'runserver']
        mock_call = self.patch_autoreload(argv)
        with mock.patch('django.__main__.__file__', main):
            autoreload.restart_with_reloader()
            self.assertEqual(mock_call.call_count, 1)
            self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:])


class ReloaderTests(SimpleTestCase):
    """
    Base fixtures for reloader tests. Subclasses set RELOADER_CLS to the
    reloader class to instantiate in setUp().
    """
    RELOADER_CLS = None

    def setUp(self):
        self._tempdir = tempfile.TemporaryDirectory()
        self.tempdir = Path(self._tempdir.name).resolve().absolute()
        self.existing_file = self.ensure_file(self.tempdir / 'test.py')
        self.nonexistent_file = (self.tempdir / 'does_not_exist.py').absolute()
        self.reloader = self.RELOADER_CLS()

    def tearDown(self):
        self._tempdir.cleanup()
        self.reloader.stop()

    def ensure_file(self, path):
        """
        Create `path` (and any missing parent directories), set its mtime to
        the current time, and return its absolute path.
        """
        path.parent.mkdir(exist_ok=True, parents=True)
        path.touch()
        # On Linux and Windows updating the mtime of a file using touch() will set a timestamp
        # value that is in the past, as the time value for the last kernel tick is used rather
        # than getting the correct absolute time.
        # To make testing simpler set the mtime to be the observed time when this function is
        # called.
        self.set_mtime(path, time.time())
        return path.absolute()

    def set_mtime(self, fp, value):
        """Set both the access and modification times of `fp` to `value`."""
        os.utime(str(fp), (value, value))

    def increment_mtime(self, fp, by=1):
        """Set the mtime of `fp` to `by` seconds after the current time."""
        current_time = time.time()
        self.set_mtime(fp, current_time + by)

    @contextlib.contextmanager
    def tick_twice(self):
        """
        Advance the reloader's tick() generator once, run the body of the
        `with` block, then advance the generator a second time.
        """
        ticker = self.reloader.tick()
        next(ticker)
        yield
        next(ticker)


class IntegrationTests:
    """
    Glob-watching tests mixed into the concrete reloader test cases. They use
    the fixtures and helpers defined by ReloaderTests (self.reloader,
    self.tempdir, self.existing_file, ensure_file(), tick_twice(), ...).
    """

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_glob(self, mocked_modules, notify_mock):
        non_py_file = self.ensure_file(self.tempdir / 'non_py_file')
        self.reloader.watch_dir(self.tempdir, '*.py')
        with self.tick_twice():
            self.increment_mtime(non_py_file)
            self.increment_mtime(self.existing_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_multiple_globs(self, mocked_modules, notify_mock):
        self.ensure_file(self.tempdir / 'x.test')
        self.reloader.watch_dir(self.tempdir, '*.py')
        self.reloader.watch_dir(self.tempdir, '*.test')
        with self.tick_twice():
            self.increment_mtime(self.existing_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_overlapping_globs(self, mocked_modules, notify_mock):
        self.reloader.watch_dir(self.tempdir, '*.py')
        self.reloader.watch_dir(self.tempdir, '*.p*')
        with self.tick_twice():
            self.increment_mtime(self.existing_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_glob_recursive(self, mocked_modules, notify_mock):
        non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file')
        py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        with self.tick_twice():
            self.increment_mtime(non_py_file)
            self.increment_mtime(py_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [py_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
        non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt')
        py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.txt')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        with self.tick_twice():
            self.increment_mtime(non_py_file)
            self.increment_mtime(py_file)
        self.assertEqual(notify_mock.call_count, 2)
        self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_nested_glob_recursive(self, mocked_modules, notify_mock):
        inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        self.reloader.watch_dir(inner_py_file.parent, '**/*.py')
        with self.tick_twice():
            self.increment_mtime(inner_py_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])

    @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
    @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
    def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
        py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
        self.reloader.watch_dir(self.tempdir, '**/*.p*')
        self.reloader.watch_dir(self.tempdir, '**/*.py*')
        with self.tick_twice():
            self.increment_mtime(py_file)
        self.assertEqual(notify_mock.call_count, 1)
        self.assertCountEqual(notify_mock.call_args[0], [py_file])


class BaseReloaderTests(ReloaderTests):
    """Tests for autoreload.BaseReloader."""
    RELOADER_CLS = autoreload.BaseReloader

    def test_watch_dir_with_unresolvable_path(self):
        path = Path('unresolvable_directory')
        with mock.patch.object(Path, 'absolute', side_effect=FileNotFoundError):
            self.reloader.watch_dir(path, '**/*.mo')
        self.assertEqual(list(self.reloader.directory_globs), [])

    def test_watch_with_glob(self):
        self.reloader.watch_dir(self.tempdir, '*.py')
        watched_files = list(self.reloader.watched_files())
        self.assertIn(self.existing_file, watched_files)

    def test_watch_files_with_recursive_glob(self):
        inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py')
        self.reloader.watch_dir(self.tempdir, '**/*.py')
        watched_files = list(self.reloader.watched_files())
        self.assertIn(self.existing_file, watched_files)
        self.assertIn(inner_file, watched_files)

    def test_run_loop_catches_stopiteration(self):
        def mocked_tick():
            yield

        with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
            self.reloader.run_loop()
        self.assertEqual(tick.call_count, 1)

    def test_run_loop_stop_and_return(self):
        def mocked_tick(*args):
            yield
            self.reloader.stop()
            return  # Raises StopIteration

        with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
            self.reloader.run_loop()

        self.assertEqual(tick.call_count, 1)

    def test_wait_for_apps_ready_checks_for_exception(self):
        app_reg = Apps()
        app_reg.ready_event.set()
        # thread.is_alive() is False if it's not started.
        dead_thread = threading.Thread()
        self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))

    def test_wait_for_apps_ready_without_exception(self):
        app_reg = Apps()
        app_reg.ready_event.set()
        thread = mock.MagicMock()
        thread.is_alive.return_value = True
        self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))


def skip_unless_watchman_available():
    """
    Return a decorator that skips the decorated test when
    WatchmanReloader.check_availability() raises WatchmanUnavailable, and
    otherwise leaves it unchanged.
    """
    try:
        autoreload.WatchmanReloader.check_availability()
    except WatchmanUnavailable as e:
        return skip('Watchman unavailable: %s' % e)
    return lambda func: func


@skip_unless_watchman_available()
class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
    """Tests for autoreload.WatchmanReloader; skipped without Watchman."""
    RELOADER_CLS = autoreload.WatchmanReloader

    def setUp(self):
        super().setUp()
        # Shorten the timeout to speed up tests.
        self.reloader.client_timeout = 0.1

    def test_watch_glob_ignores_non_existing_directories_two_levels(self):
        with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
            self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*'])
        self.assertFalse(mocked_subscribe.called)

    def test_watch_glob_uses_existing_parent_directories(self):
        with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
            self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*'])
        self.assertSequenceEqual(
            mocked_subscribe.call_args[0],
            [
                self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir,
                ['anyof', ['match', 'does_not_exist/*', 'wholename']]
            ]
        )

    def test_watch_glob_multiple_patterns(self):
        with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
            self.reloader._watch_glob(self.tempdir, ['*', '*.py'])
        self.assertSequenceEqual(
            mocked_subscribe.call_args[0],
            [
                self.tempdir, 'glob:%s' % self.tempdir,
                ['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']]
            ]
        )

    def test_watched_roots_contains_files(self):
        paths = self.reloader.watched_roots([self.existing_file])
        self.assertIn(self.existing_file.parent, paths)

    def test_watched_roots_contains_directory_globs(self):
        self.reloader.watch_dir(self.tempdir, '*.py')
        paths = self.reloader.watched_roots([])
        self.assertIn(self.tempdir, paths)

    def test_watched_roots_contains_sys_path(self):
        with extend_sys_path(str(self.tempdir)):
            paths = self.reloader.watched_roots([])
        self.assertIn(self.tempdir, paths)

    def test_check_server_status(self):
        self.assertTrue(self.reloader.check_server_status())

    def test_check_server_status_raises_error(self):
        with mock.patch.object(self.reloader.client, 'query') as mocked_query:
            mocked_query.side_effect = Exception()
            with self.assertRaises(autoreload.WatchmanUnavailable):
                self.reloader.check_server_status()

    @mock.patch('pywatchman.client')
    def test_check_availability(self, mocked_client):
        mocked_client().capabilityCheck.side_effect = Exception()
        with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'):
            self.RELOADER_CLS.check_availability()

    @mock.patch('pywatchman.client')
    def test_check_availability_lower_version(self, mocked_client):
        mocked_client().capabilityCheck.return_value = {'version': '4.8.10'}
        with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'):
            self.RELOADER_CLS.check_availability()

    def test_pywatchman_not_available(self):
        with mock.patch.object(autoreload, 'pywatchman') as mocked:
            mocked.__bool__.return_value = False
            with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'):
                self.RELOADER_CLS.check_availability()

    def test_update_watches_raises_exceptions(self):
        class TestException(Exception):
            pass

        with mock.patch.object(self.reloader, '_update_watches') as mocked_watches:
            with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status:
                mocked_watches.side_effect = TestException()
                mocked_server_status.return_value = True
                with self.assertRaises(TestException):
                    self.reloader.update_watches()
                self.assertIsInstance(mocked_server_status.call_args[0][0], TestException)

    @mock.patch.dict(os.environ, {'DJANGO_WATCHMAN_TIMEOUT': '10'})
    def test_setting_timeout_from_environment_variable(self):
        # mock.patch.dict() only alters os.environ while this test runs, so
        # check a reloader instantiated inside the patch rather than the class
        # object (setUp() likewise assigns client_timeout on an instance).
        self.assertEqual(self.RELOADER_CLS().client_timeout, 10)


@skipIf(on_macos_with_hfs(), "These tests do not work with HFS+ as a filesystem")
class StatReloaderTests(ReloaderTests, IntegrationTests):
    """Tests for autoreload.StatReloader."""
    RELOADER_CLS = autoreload.StatReloader

    def setUp(self):
        super().setUp()
        # Shorten the sleep time to speed up tests.
        self.reloader.SLEEP_TIME = 0.01

    @mock.patch('django.utils.autoreload.StatReloader.notify_file_changed')
    def test_tick_does_not_trigger_twice(self, mock_notify_file_changed):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
            ticker = self.reloader.tick()
            next(ticker)
            self.increment_mtime(self.existing_file)
            next(ticker)
            next(ticker)
            self.assertEqual(mock_notify_file_changed.call_count, 1)

    def test_snapshot_files_ignores_missing_files(self):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistent_file]):
            self.assertEqual(dict(self.reloader.snapshot_files()), {})

    def test_snapshot_files_updates(self):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
            snapshot1 = dict(self.reloader.snapshot_files())
            self.assertIn(self.existing_file, snapshot1)
            self.increment_mtime(self.existing_file)
            snapshot2 = dict(self.reloader.snapshot_files())
            self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file])

    def test_snapshot_files_with_duplicates(self):
        with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file, self.existing_file]):
            snapshot = list(self.reloader.snapshot_files())
            self.assertEqual(len(snapshot), 1)
            self.assertEqual(snapshot[0][0], self.existing_file)