679 lines
30 KiB
Python
679 lines
30 KiB
Python
import contextlib
|
|
import os
|
|
import py_compile
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import weakref
|
|
import zipfile
|
|
from importlib import import_module
|
|
from pathlib import Path
|
|
from unittest import mock, skip
|
|
|
|
from django.apps.registry import Apps
|
|
from django.test import SimpleTestCase
|
|
from django.test.utils import extend_sys_path
|
|
from django.utils import autoreload
|
|
from django.utils.autoreload import WatchmanUnavailable
|
|
|
|
|
|
class TestIterModulesAndFiles(SimpleTestCase):
|
|
def import_and_cleanup(self, name):
|
|
import_module(name)
|
|
self.addCleanup(lambda: sys.path_importer_cache.clear())
|
|
self.addCleanup(lambda: sys.modules.pop(name, None))
|
|
|
|
def clear_autoreload_caches(self):
|
|
autoreload.iter_modules_and_files.cache_clear()
|
|
|
|
def assertFileFound(self, filename):
|
|
# Some temp directories are symlinks. Python resolves these fully while
|
|
# importing.
|
|
resolved_filename = filename.resolve()
|
|
self.clear_autoreload_caches()
|
|
# Test uncached access
|
|
self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
# Test cached access
|
|
self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
|
|
|
|
def assertFileNotFound(self, filename):
|
|
resolved_filename = filename.resolve()
|
|
self.clear_autoreload_caches()
|
|
# Test uncached access
|
|
self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
# Test cached access
|
|
self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
|
|
|
|
def temporary_file(self, filename):
|
|
dirname = tempfile.mkdtemp()
|
|
self.addCleanup(shutil.rmtree, dirname)
|
|
return Path(dirname) / filename
|
|
|
|
def test_paths_are_pathlib_instances(self):
|
|
for filename in autoreload.iter_all_python_module_files():
|
|
self.assertIsInstance(filename, Path)
|
|
|
|
def test_file_added(self):
|
|
"""
|
|
When a file is added, it's returned by iter_all_python_module_files().
|
|
"""
|
|
filename = self.temporary_file('test_deleted_removed_module.py')
|
|
filename.touch()
|
|
|
|
with extend_sys_path(str(filename.parent)):
|
|
self.import_and_cleanup('test_deleted_removed_module')
|
|
|
|
self.assertFileFound(filename.absolute())
|
|
|
|
def test_check_errors(self):
|
|
"""
|
|
When a file containing an error is imported in a function wrapped by
|
|
check_errors(), gen_filenames() returns it.
|
|
"""
|
|
filename = self.temporary_file('test_syntax_error.py')
|
|
filename.write_text("Ceci n'est pas du Python.")
|
|
|
|
with extend_sys_path(str(filename.parent)):
|
|
with self.assertRaises(SyntaxError):
|
|
autoreload.check_errors(import_module)('test_syntax_error')
|
|
self.assertFileFound(filename)
|
|
|
|
def test_check_errors_catches_all_exceptions(self):
|
|
"""
|
|
Since Python may raise arbitrary exceptions when importing code,
|
|
check_errors() must catch Exception, not just some subclasses.
|
|
"""
|
|
filename = self.temporary_file('test_exception.py')
|
|
filename.write_text('raise Exception')
|
|
with extend_sys_path(str(filename.parent)):
|
|
with self.assertRaises(Exception):
|
|
autoreload.check_errors(import_module)('test_exception')
|
|
self.assertFileFound(filename)
|
|
|
|
def test_zip_reload(self):
|
|
"""
|
|
Modules imported from zipped files have their archive location included
|
|
in the result.
|
|
"""
|
|
zip_file = self.temporary_file('zip_import.zip')
|
|
with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
zipf.writestr('test_zipped_file.py', '')
|
|
|
|
with extend_sys_path(str(zip_file)):
|
|
self.import_and_cleanup('test_zipped_file')
|
|
self.assertFileFound(zip_file)
|
|
|
|
def test_bytecode_conversion_to_source(self):
|
|
""".pyc and .pyo files are included in the files list."""
|
|
filename = self.temporary_file('test_compiled.py')
|
|
filename.touch()
|
|
compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc'))))
|
|
filename.unlink()
|
|
with extend_sys_path(str(compiled_file.parent)):
|
|
self.import_and_cleanup('test_compiled')
|
|
self.assertFileFound(compiled_file)
|
|
|
|
def test_weakref_in_sys_module(self):
|
|
"""iter_all_python_module_file() ignores weakref modules."""
|
|
time_proxy = weakref.proxy(time)
|
|
sys.modules['time_proxy'] = time_proxy
|
|
self.addCleanup(lambda: sys.modules.pop('time_proxy', None))
|
|
list(autoreload.iter_all_python_module_files()) # No crash.
|
|
|
|
|
|
class TestCommonRoots(SimpleTestCase):
|
|
def test_common_roots(self):
|
|
paths = (
|
|
Path('/first/second'),
|
|
Path('/first/second/third'),
|
|
Path('/first/'),
|
|
Path('/root/first/'),
|
|
)
|
|
results = autoreload.common_roots(paths)
|
|
self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')])
|
|
|
|
|
|
class TestSysPathDirectories(SimpleTestCase):
|
|
def setUp(self):
|
|
self._directory = tempfile.TemporaryDirectory()
|
|
self.directory = Path(self._directory.name).resolve().absolute()
|
|
self.file = self.directory / 'test'
|
|
self.file.touch()
|
|
|
|
def tearDown(self):
|
|
self._directory.cleanup()
|
|
|
|
def test_sys_paths_with_directories(self):
|
|
with extend_sys_path(str(self.file)):
|
|
paths = list(autoreload.sys_path_directories())
|
|
self.assertIn(self.file.parent, paths)
|
|
|
|
def test_sys_paths_non_existing(self):
|
|
nonexistant_file = Path(self.directory.name) / 'does_not_exist'
|
|
with extend_sys_path(str(nonexistant_file)):
|
|
paths = list(autoreload.sys_path_directories())
|
|
self.assertNotIn(nonexistant_file, paths)
|
|
self.assertNotIn(nonexistant_file.parent, paths)
|
|
|
|
def test_sys_paths_absolute(self):
|
|
paths = list(autoreload.sys_path_directories())
|
|
self.assertTrue(all(p.is_absolute() for p in paths))
|
|
|
|
def test_sys_paths_directories(self):
|
|
with extend_sys_path(str(self.directory)):
|
|
paths = list(autoreload.sys_path_directories())
|
|
self.assertIn(self.directory, paths)
|
|
|
|
|
|
class GetReloaderTests(SimpleTestCase):
|
|
@mock.patch('django.utils.autoreload.WatchmanReloader')
|
|
def test_watchman_unavailable(self, mocked_watchman):
|
|
mocked_watchman.check_availability.side_effect = WatchmanUnavailable
|
|
self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)
|
|
|
|
@mock.patch.object(autoreload.WatchmanReloader, 'check_availability')
|
|
def test_watchman_available(self, mocked_available):
|
|
# If WatchmanUnavailable isn't raised, Watchman will be chosen.
|
|
mocked_available.return_value = None
|
|
result = autoreload.get_reloader()
|
|
self.assertIsInstance(result, autoreload.WatchmanReloader)
|
|
|
|
|
|
class RunWithReloaderTests(SimpleTestCase):
|
|
@mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
|
|
@mock.patch('django.utils.autoreload.get_reloader')
|
|
def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
|
|
mocked_get_reloader.side_effect = KeyboardInterrupt()
|
|
autoreload.run_with_reloader(lambda: None) # No exception
|
|
|
|
@mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'})
|
|
@mock.patch('django.utils.autoreload.restart_with_reloader')
|
|
def test_calls_sys_exit(self, mocked_restart_reloader):
|
|
mocked_restart_reloader.return_value = 1
|
|
with self.assertRaises(SystemExit) as exc:
|
|
autoreload.run_with_reloader(lambda: None)
|
|
self.assertEqual(exc.exception.code, 1)
|
|
|
|
@mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
|
|
@mock.patch('django.utils.autoreload.start_django')
|
|
@mock.patch('django.utils.autoreload.get_reloader')
|
|
def test_calls_start_django(self, mocked_reloader, mocked_start_django):
|
|
mocked_reloader.return_value = mock.sentinel.RELOADER
|
|
autoreload.run_with_reloader(mock.sentinel.METHOD)
|
|
self.assertEqual(mocked_start_django.call_count, 1)
|
|
self.assertSequenceEqual(
|
|
mocked_start_django.call_args[0],
|
|
[mock.sentinel.RELOADER, mock.sentinel.METHOD]
|
|
)
|
|
|
|
|
|
class StartDjangoTests(SimpleTestCase):
|
|
@mock.patch('django.utils.autoreload.StatReloader')
|
|
def test_watchman_becomes_unavailable(self, mocked_stat):
|
|
mocked_stat.should_stop.return_value = True
|
|
fake_reloader = mock.MagicMock()
|
|
fake_reloader.should_stop = False
|
|
fake_reloader.run.side_effect = autoreload.WatchmanUnavailable()
|
|
|
|
autoreload.start_django(fake_reloader, lambda: None)
|
|
self.assertEqual(mocked_stat.call_count, 1)
|
|
|
|
@mock.patch('django.utils.autoreload.ensure_echo_on')
|
|
def test_echo_on_called(self, mocked_echo):
|
|
fake_reloader = mock.MagicMock()
|
|
autoreload.start_django(fake_reloader, lambda: None)
|
|
self.assertEqual(mocked_echo.call_count, 1)
|
|
|
|
@mock.patch('django.utils.autoreload.check_errors')
|
|
def test_check_errors_called(self, mocked_check_errors):
|
|
fake_method = mock.MagicMock(return_value=None)
|
|
fake_reloader = mock.MagicMock()
|
|
autoreload.start_django(fake_reloader, fake_method)
|
|
self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])
|
|
|
|
@mock.patch('threading.Thread')
|
|
@mock.patch('django.utils.autoreload.check_errors')
|
|
def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
|
|
fake_reloader = mock.MagicMock()
|
|
fake_main_func = mock.MagicMock()
|
|
fake_thread = mock.MagicMock()
|
|
mocked_check_errors.return_value = fake_main_func
|
|
mocked_thread.return_value = fake_thread
|
|
autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
|
|
self.assertEqual(mocked_thread.call_count, 1)
|
|
self.assertEqual(
|
|
mocked_thread.call_args[1],
|
|
{'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}}
|
|
)
|
|
self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True])
|
|
self.assertTrue(fake_thread.start.called)
|
|
|
|
|
|
class TestCheckErrors(SimpleTestCase):
|
|
def test_mutates_error_files(self):
|
|
fake_method = mock.MagicMock(side_effect=RuntimeError())
|
|
wrapped = autoreload.check_errors(fake_method)
|
|
with mock.patch.object(autoreload, '_error_files') as mocked_error_files:
|
|
with self.assertRaises(RuntimeError):
|
|
wrapped()
|
|
self.assertEqual(mocked_error_files.append.call_count, 1)
|
|
|
|
|
|
class TestRaiseLastException(SimpleTestCase):
|
|
@mock.patch('django.utils.autoreload._exception', None)
|
|
def test_no_exception(self):
|
|
# Should raise no exception if _exception is None
|
|
autoreload.raise_last_exception()
|
|
|
|
def test_raises_exception(self):
|
|
class MyException(Exception):
|
|
pass
|
|
|
|
# Create an exception
|
|
try:
|
|
raise MyException('Test Message')
|
|
except MyException:
|
|
exc_info = sys.exc_info()
|
|
|
|
with mock.patch('django.utils.autoreload._exception', exc_info):
|
|
with self.assertRaises(MyException, msg='Test Message'):
|
|
autoreload.raise_last_exception()
|
|
|
|
|
|
class RestartWithReloaderTests(SimpleTestCase):
|
|
executable = '/usr/bin/python'
|
|
|
|
def patch_autoreload(self, argv):
|
|
patch_call = mock.patch('django.utils.autoreload.subprocess.call', return_value=0)
|
|
patches = [
|
|
mock.patch('django.utils.autoreload.sys.argv', argv),
|
|
mock.patch('django.utils.autoreload.sys.executable', self.executable),
|
|
mock.patch('django.utils.autoreload.sys.warnoptions', ['all']),
|
|
]
|
|
for p in patches:
|
|
p.start()
|
|
self.addCleanup(p.stop)
|
|
mock_call = patch_call.start()
|
|
self.addCleanup(patch_call.stop)
|
|
return mock_call
|
|
|
|
def test_manage_py(self):
|
|
argv = ['./manage.py', 'runserver']
|
|
mock_call = self.patch_autoreload(argv)
|
|
autoreload.restart_with_reloader()
|
|
self.assertEqual(mock_call.call_count, 1)
|
|
self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall'] + argv)
|
|
|
|
def test_python_m_django(self):
|
|
main = '/usr/lib/pythonX.Y/site-packages/django/__main__.py'
|
|
argv = [main, 'runserver']
|
|
mock_call = self.patch_autoreload(argv)
|
|
with mock.patch('django.__main__.__file__', main):
|
|
autoreload.restart_with_reloader()
|
|
self.assertEqual(mock_call.call_count, 1)
|
|
self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:])
|
|
|
|
|
|
class ReloaderTests(SimpleTestCase):
|
|
RELOADER_CLS = None
|
|
|
|
def setUp(self):
|
|
self._tempdir = tempfile.TemporaryDirectory()
|
|
self.tempdir = Path(self._tempdir.name).resolve().absolute()
|
|
self.existing_file = self.ensure_file(self.tempdir / 'test.py')
|
|
self.nonexistant_file = (self.tempdir / 'does_not_exist.py').absolute()
|
|
self.reloader = self.RELOADER_CLS()
|
|
|
|
def tearDown(self):
|
|
self._tempdir.cleanup()
|
|
self.reloader.stop()
|
|
|
|
def ensure_file(self, path):
|
|
path.parent.mkdir(exist_ok=True, parents=True)
|
|
path.touch()
|
|
# On Linux and Windows updating the mtime of a file using touch() will set a timestamp
|
|
# value that is in the past, as the time value for the last kernel tick is used rather
|
|
# than getting the correct absolute time.
|
|
# To make testing simpler set the mtime to be the observed time when this function is
|
|
# called.
|
|
self.set_mtime(path, time.time())
|
|
return path.absolute()
|
|
|
|
def set_mtime(self, fp, value):
|
|
os.utime(str(fp), (value, value))
|
|
|
|
def increment_mtime(self, fp, by=1):
|
|
current_time = time.time()
|
|
self.set_mtime(fp, current_time + by)
|
|
|
|
@contextlib.contextmanager
|
|
def tick_twice(self):
|
|
ticker = self.reloader.tick()
|
|
next(ticker)
|
|
yield
|
|
next(ticker)
|
|
|
|
|
|
class IntegrationTests:
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_file(self, mocked_modules, notify_mock):
|
|
self.reloader.watch_file(self.existing_file)
|
|
with self.tick_twice():
|
|
self.increment_mtime(self.existing_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_nonexistant_file(self, mocked_modules, notify_mock):
|
|
self.reloader.watch_file(self.nonexistant_file)
|
|
with self.tick_twice():
|
|
self.ensure_file(self.nonexistant_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [self.nonexistant_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_nonexistant_file_in_non_existing_directory(self, mocked_modules, notify_mock):
|
|
non_existing_directory = self.tempdir / 'non_existing_dir'
|
|
nonexistant_file = non_existing_directory / 'test'
|
|
self.reloader.watch_file(nonexistant_file)
|
|
with self.tick_twice():
|
|
self.ensure_file(nonexistant_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_glob(self, mocked_modules, notify_mock):
|
|
non_py_file = self.ensure_file(self.tempdir / 'non_py_file')
|
|
self.reloader.watch_dir(self.tempdir, '*.py')
|
|
with self.tick_twice():
|
|
self.increment_mtime(non_py_file)
|
|
self.increment_mtime(self.existing_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_glob_non_existing_directory(self, mocked_modules, notify_mock):
|
|
non_existing_directory = self.tempdir / 'does_not_exist'
|
|
nonexistant_file = non_existing_directory / 'test.py'
|
|
self.reloader.watch_dir(non_existing_directory, '*.py')
|
|
with self.tick_twice():
|
|
self.ensure_file(nonexistant_file)
|
|
self.set_mtime(nonexistant_file, time.time())
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_multiple_globs(self, mocked_modules, notify_mock):
|
|
self.ensure_file(self.tempdir / 'x.test')
|
|
self.reloader.watch_dir(self.tempdir, '*.py')
|
|
self.reloader.watch_dir(self.tempdir, '*.test')
|
|
with self.tick_twice():
|
|
self.increment_mtime(self.existing_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_overlapping_globs(self, mocked_modules, notify_mock):
|
|
self.reloader.watch_dir(self.tempdir, '*.py')
|
|
self.reloader.watch_dir(self.tempdir, '*.p*')
|
|
with self.tick_twice():
|
|
self.increment_mtime(self.existing_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_glob_recursive(self, mocked_modules, notify_mock):
|
|
non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file')
|
|
py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
with self.tick_twice():
|
|
self.increment_mtime(non_py_file)
|
|
self.increment_mtime(py_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [py_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
|
|
non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt')
|
|
py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
self.reloader.watch_dir(self.tempdir, '**/*.txt')
|
|
self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
with self.tick_twice():
|
|
self.increment_mtime(non_py_file)
|
|
self.increment_mtime(py_file)
|
|
self.assertEqual(notify_mock.call_count, 2)
|
|
self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_nested_glob_recursive(self, mocked_modules, notify_mock):
|
|
inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
self.reloader.watch_dir(inner_py_file.parent, '**/*.py')
|
|
with self.tick_twice():
|
|
self.increment_mtime(inner_py_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])
|
|
|
|
@mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
@mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
|
|
py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
self.reloader.watch_dir(self.tempdir, '**/*.p*')
|
|
self.reloader.watch_dir(self.tempdir, '**/*.py*')
|
|
with self.tick_twice():
|
|
self.increment_mtime(py_file)
|
|
self.assertEqual(notify_mock.call_count, 1)
|
|
self.assertCountEqual(notify_mock.call_args[0], [py_file])
|
|
|
|
|
|
class BaseReloaderTests(ReloaderTests):
|
|
RELOADER_CLS = autoreload.BaseReloader
|
|
|
|
def test_watch_without_absolute(self):
|
|
with self.assertRaisesMessage(ValueError, 'test.py must be absolute.'):
|
|
self.reloader.watch_file('test.py')
|
|
|
|
def test_watch_with_single_file(self):
|
|
self.reloader.watch_file(self.existing_file)
|
|
watched_files = list(self.reloader.watched_files())
|
|
self.assertIn(self.existing_file, watched_files)
|
|
|
|
def test_watch_with_glob(self):
|
|
self.reloader.watch_dir(self.tempdir, '*.py')
|
|
watched_files = list(self.reloader.watched_files())
|
|
self.assertIn(self.existing_file, watched_files)
|
|
|
|
def test_watch_files_with_recursive_glob(self):
|
|
inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py')
|
|
self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
watched_files = list(self.reloader.watched_files())
|
|
self.assertIn(self.existing_file, watched_files)
|
|
self.assertIn(inner_file, watched_files)
|
|
|
|
def test_run_loop_catches_stopiteration(self):
|
|
def mocked_tick():
|
|
yield
|
|
|
|
with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
|
|
self.reloader.run_loop()
|
|
self.assertEqual(tick.call_count, 1)
|
|
|
|
def test_run_loop_stop_and_return(self):
|
|
def mocked_tick(*args):
|
|
yield
|
|
self.reloader.stop()
|
|
return # Raises StopIteration
|
|
|
|
with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
|
|
self.reloader.run_loop()
|
|
|
|
self.assertEqual(tick.call_count, 1)
|
|
|
|
def test_wait_for_apps_ready_checks_for_exception(self):
|
|
app_reg = Apps()
|
|
app_reg.ready_event.set()
|
|
# thread.is_alive() is False if it's not started.
|
|
dead_thread = threading.Thread()
|
|
self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))
|
|
|
|
def test_wait_for_apps_ready_without_exception(self):
|
|
app_reg = Apps()
|
|
app_reg.ready_event.set()
|
|
thread = mock.MagicMock()
|
|
thread.is_alive.return_value = True
|
|
self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))
|
|
|
|
|
|
def skip_unless_watchman_available():
|
|
try:
|
|
autoreload.WatchmanReloader.check_availability()
|
|
except WatchmanUnavailable as e:
|
|
return skip('Watchman unavailable: %s' % e)
|
|
return lambda func: func
|
|
|
|
|
|
@skip_unless_watchman_available()
|
|
class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
|
|
RELOADER_CLS = autoreload.WatchmanReloader
|
|
|
|
def test_watch_glob_ignores_non_existing_directories_two_levels(self):
|
|
with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
|
|
self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*'])
|
|
self.assertFalse(mocked_subscribe.called)
|
|
|
|
def test_watch_glob_uses_existing_parent_directories(self):
|
|
with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
|
|
self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*'])
|
|
self.assertSequenceEqual(
|
|
mocked_subscribe.call_args[0],
|
|
[
|
|
self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir,
|
|
['anyof', ['match', 'does_not_exist/*', 'wholename']]
|
|
]
|
|
)
|
|
|
|
def test_watch_glob_multiple_patterns(self):
|
|
with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
|
|
self.reloader._watch_glob(self.tempdir, ['*', '*.py'])
|
|
self.assertSequenceEqual(
|
|
mocked_subscribe.call_args[0],
|
|
[
|
|
self.tempdir, 'glob:%s' % self.tempdir,
|
|
['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']]
|
|
]
|
|
)
|
|
|
|
def test_watched_roots_contains_files(self):
|
|
paths = self.reloader.watched_roots([self.existing_file])
|
|
self.assertIn(self.existing_file.parent, paths)
|
|
|
|
def test_watched_roots_contains_directory_globs(self):
|
|
self.reloader.watch_dir(self.tempdir, '*.py')
|
|
paths = self.reloader.watched_roots([])
|
|
self.assertIn(self.tempdir, paths)
|
|
|
|
def test_watched_roots_contains_sys_path(self):
|
|
with extend_sys_path(str(self.tempdir)):
|
|
paths = self.reloader.watched_roots([])
|
|
self.assertIn(self.tempdir, paths)
|
|
|
|
def test_check_server_status(self):
|
|
self.assertTrue(self.reloader.check_server_status())
|
|
|
|
def test_check_server_status_raises_error(self):
|
|
with mock.patch.object(self.reloader.client, 'query') as mocked_query:
|
|
mocked_query.side_effect = Exception()
|
|
with self.assertRaises(autoreload.WatchmanUnavailable):
|
|
self.reloader.check_server_status()
|
|
|
|
@mock.patch('pywatchman.client')
|
|
def test_check_availability(self, mocked_client):
|
|
mocked_client().capabilityCheck.side_effect = Exception()
|
|
with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'):
|
|
self.RELOADER_CLS.check_availability()
|
|
|
|
@mock.patch('pywatchman.client')
|
|
def test_check_availability_lower_version(self, mocked_client):
|
|
mocked_client().capabilityCheck.return_value = {'version': '4.8.10'}
|
|
with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'):
|
|
self.RELOADER_CLS.check_availability()
|
|
|
|
def test_pywatchman_not_available(self):
|
|
with mock.patch.object(autoreload, 'pywatchman') as mocked:
|
|
mocked.__bool__.return_value = False
|
|
with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'):
|
|
self.RELOADER_CLS.check_availability()
|
|
|
|
def test_update_watches_raises_exceptions(self):
|
|
class TestException(Exception):
|
|
pass
|
|
|
|
with mock.patch.object(self.reloader, '_update_watches') as mocked_watches:
|
|
with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status:
|
|
mocked_watches.side_effect = TestException()
|
|
mocked_server_status.return_value = True
|
|
with self.assertRaises(TestException):
|
|
self.reloader.update_watches()
|
|
self.assertIsInstance(mocked_server_status.call_args[0][0], TestException)
|
|
|
|
|
|
class StatReloaderTests(ReloaderTests, IntegrationTests):
|
|
RELOADER_CLS = autoreload.StatReloader
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
# Shorten the sleep time to speed up tests.
|
|
self.reloader.SLEEP_TIME = 0.01
|
|
|
|
def test_snapshot_files_ignores_missing_files(self):
|
|
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]):
|
|
self.assertEqual(dict(self.reloader.snapshot_files()), {})
|
|
|
|
def test_snapshot_files_updates(self):
|
|
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
|
|
snapshot1 = dict(self.reloader.snapshot_files())
|
|
self.assertIn(self.existing_file, snapshot1)
|
|
self.increment_mtime(self.existing_file)
|
|
snapshot2 = dict(self.reloader.snapshot_files())
|
|
self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file])
|
|
|
|
def test_does_not_fire_without_changes(self):
|
|
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \
|
|
mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
|
|
mtime = self.existing_file.stat().st_mtime
|
|
initial_snapshot = {self.existing_file: mtime}
|
|
second_snapshot = self.reloader.loop_files(initial_snapshot, time.time())
|
|
self.assertEqual(second_snapshot, {})
|
|
notifier.assert_not_called()
|
|
|
|
def test_fires_when_created(self):
|
|
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]), \
|
|
mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
|
|
self.nonexistant_file.touch()
|
|
mtime = self.nonexistant_file.stat().st_mtime
|
|
second_snapshot = self.reloader.loop_files({}, mtime - 1)
|
|
self.assertCountEqual(second_snapshot.keys(), [self.nonexistant_file])
|
|
notifier.assert_called_once_with(self.nonexistant_file)
|
|
|
|
def test_fires_with_changes(self):
|
|
with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \
|
|
mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
|
|
initial_snapshot = {self.existing_file: 1}
|
|
second_snapshot = self.reloader.loop_files(initial_snapshot, time.time())
|
|
notifier.assert_called_once_with(self.existing_file)
|
|
self.assertCountEqual(second_snapshot.keys(), [self.existing_file])
|