Replaced subprocess commands by run() wherever possible.

This commit is contained in:
Claude Paroz 2019-08-23 10:53:36 +02:00 committed by Carlton Gibson
parent 7bd9633320
commit 9386586f31
13 changed files with 42 additions and 42 deletions

View File

@ -54,7 +54,7 @@ Compiler library and Java version 6 or later."""
) )
if options.verbose: if options.verbose:
sys.stdout.write("Running: %s\n" % cmd) sys.stdout.write("Running: %s\n" % cmd)
subprocess.call(cmd.split()) subprocess.run(cmd.split())
else: else:
sys.stdout.write("File %s not found. Sure it exists?\n" % to_compress) sys.stdout.write("File %s not found. Sure it exists?\n" % to_compress)

View File

@ -1,7 +1,7 @@
import fnmatch import fnmatch
import os import os
from pathlib import Path from pathlib import Path
from subprocess import PIPE, Popen from subprocess import PIPE, run
from django.apps import apps as installed_apps from django.apps import apps as installed_apps
from django.utils.crypto import get_random_string from django.utils.crypto import get_random_string
@ -17,13 +17,12 @@ def popen_wrapper(args, stdout_encoding='utf-8'):
Return stdout output, stderr output, and OS status code. Return stdout output, stderr output, and OS status code.
""" """
try: try:
p = Popen(args, shell=False, stdout=PIPE, stderr=PIPE, close_fds=os.name != 'nt') p = run(args, stdout=PIPE, stderr=PIPE, close_fds=os.name != 'nt')
except OSError as err: except OSError as err:
raise CommandError('Error executing %s' % args[0]) from err raise CommandError('Error executing %s' % args[0]) from err
output, errors = p.communicate()
return ( return (
output.decode(stdout_encoding), p.stdout.decode(stdout_encoding),
errors.decode(DEFAULT_LOCALE_ENCODING, errors='replace'), p.stderr.decode(DEFAULT_LOCALE_ENCODING, errors='replace'),
p.returncode p.returncode
) )

View File

@ -45,4 +45,4 @@ class DatabaseClient(BaseDatabaseClient):
def runshell(self): def runshell(self):
args = DatabaseClient.settings_to_cmd_args(self.connection.settings_dict) args = DatabaseClient.settings_to_cmd_args(self.connection.settings_dict)
subprocess.check_call(args) subprocess.run(args, check=True)

View File

@ -14,4 +14,4 @@ class DatabaseClient(BaseDatabaseClient):
wrapper_path = shutil.which(self.wrapper_name) wrapper_path = shutil.which(self.wrapper_name)
if wrapper_path: if wrapper_path:
args = [wrapper_path, *args] args = [wrapper_path, *args]
subprocess.check_call(args) subprocess.run(args, check=True)

View File

@ -9,4 +9,4 @@ class DatabaseClient(BaseDatabaseClient):
def runshell(self): def runshell(self):
args = [self.executable_name, args = [self.executable_name,
self.connection.settings_dict['NAME']] self.connection.settings_dict['NAME']]
subprocess.check_call(args) subprocess.run(args, check=True)

View File

@ -227,9 +227,9 @@ def restart_with_reloader():
new_environ = {**os.environ, DJANGO_AUTORELOAD_ENV: 'true'} new_environ = {**os.environ, DJANGO_AUTORELOAD_ENV: 'true'}
args = get_child_arguments() args = get_child_arguments()
while True: while True:
exit_code = subprocess.call(args, env=new_environ, close_fds=False) p = subprocess.run(args, env=new_environ, close_fds=False)
if exit_code != 3: if p.returncode != 3:
return exit_code return p.returncode
class BaseReloader: class BaseReloader:

View File

@ -77,12 +77,12 @@ def get_git_changeset():
so it's sufficient for generating the development version numbers. so it's sufficient for generating the development version numbers.
""" """
repo_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) repo_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
git_log = subprocess.Popen( git_log = subprocess.run(
'git log --pretty=format:%ct --quiet -1 HEAD', 'git log --pretty=format:%ct --quiet -1 HEAD',
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, cwd=repo_dir, universal_newlines=True, shell=True, cwd=repo_dir, universal_newlines=True,
) )
timestamp = git_log.communicate()[0] timestamp = git_log.stdout
try: try:
timestamp = datetime.datetime.utcfromtimestamp(int(timestamp)) timestamp = datetime.datetime.utcfromtimestamp(int(timestamp))
except ValueError: except ValueError:

View File

@ -20,7 +20,7 @@
import os import os
from argparse import ArgumentParser from argparse import ArgumentParser
from subprocess import PIPE, Popen, call from subprocess import PIPE, run
import django import django
from django.conf import settings from django.conf import settings
@ -73,10 +73,9 @@ def _check_diff(cat_name, base_path):
""" """
po_path = '%(path)s/en/LC_MESSAGES/django%(ext)s.po' % { po_path = '%(path)s/en/LC_MESSAGES/django%(ext)s.po' % {
'path': base_path, 'ext': 'js' if cat_name.endswith('-js') else ''} 'path': base_path, 'ext': 'js' if cat_name.endswith('-js') else ''}
p = Popen("git diff -U0 %s | egrep '^[-+]msgid' | wc -l" % po_path, p = run("git diff -U0 %s | egrep '^[-+]msgid' | wc -l" % po_path,
stdout=PIPE, stderr=PIPE, shell=True) stdout=PIPE, stderr=PIPE, shell=True)
output, errors = p.communicate() num_changes = int(p.stdout.strip())
num_changes = int(output.strip())
print("%d changed/added messages in '%s' catalog." % (num_changes, cat_name)) print("%d changed/added messages in '%s' catalog." % (num_changes, cat_name))
@ -122,18 +121,17 @@ def lang_stats(resources=None, languages=None):
po_path = '{path}/{lang}/LC_MESSAGES/django{ext}.po'.format( po_path = '{path}/{lang}/LC_MESSAGES/django{ext}.po'.format(
path=dir_, lang=lang, ext='js' if name.endswith('-js') else '' path=dir_, lang=lang, ext='js' if name.endswith('-js') else ''
) )
p = Popen( p = run(
['msgfmt', '-vc', '-o', '/dev/null', po_path], ['msgfmt', '-vc', '-o', '/dev/null', po_path],
stdout=PIPE, stderr=PIPE, stdout=PIPE, stderr=PIPE,
env={'LANG': 'C'} env={'LANG': 'C'}
) )
output, errors = p.communicate()
if p.returncode == 0: if p.returncode == 0:
# msgfmt output stats on stderr # msgfmt output stats on stderr
print("%s: %s" % (lang, errors.decode().strip())) print("%s: %s" % (lang, p.stderr.decode().strip()))
else: else:
print("Errors happened when checking %s translation for %s:\n%s" % ( print("Errors happened when checking %s translation for %s:\n%s" % (
lang, name, errors.decode())) lang, name, p.stderr.decode()))
def fetch(resources=None, languages=None): def fetch(resources=None, languages=None):
@ -146,11 +144,11 @@ def fetch(resources=None, languages=None):
for name, dir_ in locale_dirs: for name, dir_ in locale_dirs:
# Transifex pull # Transifex pull
if languages is None: if languages is None:
call('tx pull -r %(res)s -a -f --minimum-perc=5' % {'res': _tx_resource_for_name(name)}, shell=True) run('tx pull -r %(res)s -a -f --minimum-perc=5' % {'res': _tx_resource_for_name(name)}, shell=True)
target_langs = sorted(d for d in os.listdir(dir_) if not d.startswith('_') and d != 'en') target_langs = sorted(d for d in os.listdir(dir_) if not d.startswith('_') and d != 'en')
else: else:
for lang in languages: for lang in languages:
call('tx pull -r %(res)s -f -l %(lang)s' % { run('tx pull -r %(res)s -f -l %(lang)s' % {
'res': _tx_resource_for_name(name), 'lang': lang}, shell=True) 'res': _tx_resource_for_name(name), 'lang': lang}, shell=True)
target_langs = languages target_langs = languages
@ -162,9 +160,9 @@ def fetch(resources=None, languages=None):
print("No %(lang)s translation for resource %(name)s" % { print("No %(lang)s translation for resource %(name)s" % {
'lang': lang, 'name': name}) 'lang': lang, 'name': name})
continue continue
call('msgcat --no-location -o %s %s' % (po_path, po_path), shell=True) run('msgcat --no-location -o %s %s' % (po_path, po_path), shell=True)
res = call('msgfmt -c -o %s.mo %s' % (po_path[:-3], po_path), shell=True) msgfmt = run('msgfmt -c -o %s.mo %s' % (po_path[:-3], po_path), shell=True)
if res != 0: if msgfmt.returncode != 0:
errors.append((name, lang)) errors.append((name, lang))
if errors: if errors:
print("\nWARNING: Errors have occurred in following cases:") print("\nWARNING: Errors have occurred in following cases:")

View File

@ -118,12 +118,13 @@ class AdminScriptTestCase(SimpleTestCase):
test_environ['PYTHONPATH'] = os.pathsep.join(python_path) test_environ['PYTHONPATH'] = os.pathsep.join(python_path)
test_environ['PYTHONWARNINGS'] = '' test_environ['PYTHONWARNINGS'] = ''
return subprocess.Popen( p = subprocess.run(
[sys.executable, script] + args, [sys.executable, script] + args,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=self.test_dir, cwd=self.test_dir,
env=test_environ, universal_newlines=True, env=test_environ, universal_newlines=True,
).communicate() )
return p.stdout, p.stderr
def run_django_admin(self, args, settings_file=None): def run_django_admin(self, args, settings_file=None):
script_dir = os.path.abspath(os.path.join(os.path.dirname(django.__file__), 'bin')) script_dir = os.path.abspath(os.path.join(os.path.dirname(django.__file__), 'bin'))

View File

@ -1,3 +1,4 @@
from subprocess import CompletedProcess
from unittest import mock, skipUnless from unittest import mock, skipUnless
from django.db import connection from django.db import connection
@ -9,13 +10,13 @@ from django.test import SimpleTestCase
class OracleDbshellTests(SimpleTestCase): class OracleDbshellTests(SimpleTestCase):
def _run_dbshell(self, rlwrap=False): def _run_dbshell(self, rlwrap=False):
"""Run runshell command and capture its arguments.""" """Run runshell command and capture its arguments."""
def _mock_subprocess_call(*args): def _mock_subprocess_run(*args):
self.subprocess_args = tuple(*args) self.subprocess_args = list(*args)
return 0 return CompletedProcess(self.subprocess_args, 0)
client = DatabaseClient(connection) client = DatabaseClient(connection)
self.subprocess_args = None self.subprocess_args = None
with mock.patch('subprocess.call', new=_mock_subprocess_call): with mock.patch('subprocess.run', new=_mock_subprocess_run):
with mock.patch('shutil.which', return_value='/usr/bin/rlwrap' if rlwrap else None): with mock.patch('shutil.which', return_value='/usr/bin/rlwrap' if rlwrap else None):
client.runshell() client.runshell()
return self.subprocess_args return self.subprocess_args

View File

@ -4,7 +4,7 @@ import stat
import unittest import unittest
from io import StringIO from io import StringIO
from pathlib import Path from pathlib import Path
from subprocess import Popen from subprocess import run
from unittest import mock from unittest import mock
from django.core.management import ( from django.core.management import (
@ -184,7 +184,7 @@ class CompilationErrorHandling(MessageCompilationTests):
# Make sure the output of msgfmt is unaffected by the current locale. # Make sure the output of msgfmt is unaffected by the current locale.
env = os.environ.copy() env = os.environ.copy()
env.update({'LANG': 'C'}) env.update({'LANG': 'C'})
with mock.patch('django.core.management.utils.Popen', lambda *args, **kwargs: Popen(*args, env=env, **kwargs)): with mock.patch('django.core.management.utils.run', lambda *args, **kwargs: run(*args, env=env, **kwargs)):
cmd = MakeMessagesCommand() cmd = MakeMessagesCommand()
if cmd.gettext_version < (0, 18, 3): if cmd.gettext_version < (0, 18, 3):
self.skipTest("python-brace-format is a recent gettext addition.") self.skipTest("python-brace-format is a recent gettext addition.")

View File

@ -355,22 +355,22 @@ def bisect_tests(bisection_label, options, test_labels, parallel, start_at, star
test_labels_b = test_labels[midpoint:] + [bisection_label] test_labels_b = test_labels[midpoint:] + [bisection_label]
print('***** Pass %da: Running the first half of the test suite' % iteration) print('***** Pass %da: Running the first half of the test suite' % iteration)
print('***** Test labels: %s' % ' '.join(test_labels_a)) print('***** Test labels: %s' % ' '.join(test_labels_a))
failures_a = subprocess.call(subprocess_args + test_labels_a) failures_a = subprocess.run(subprocess_args + test_labels_a)
print('***** Pass %db: Running the second half of the test suite' % iteration) print('***** Pass %db: Running the second half of the test suite' % iteration)
print('***** Test labels: %s' % ' '.join(test_labels_b)) print('***** Test labels: %s' % ' '.join(test_labels_b))
print('') print('')
failures_b = subprocess.call(subprocess_args + test_labels_b) failures_b = subprocess.run(subprocess_args + test_labels_b)
if failures_a and not failures_b: if failures_a.returncode and not failures_b.returncode:
print("***** Problem found in first half. Bisecting again...") print("***** Problem found in first half. Bisecting again...")
iteration += 1 iteration += 1
test_labels = test_labels_a[:-1] test_labels = test_labels_a[:-1]
elif failures_b and not failures_a: elif failures_b.returncode and not failures_a.returncode:
print("***** Problem found in second half. Bisecting again...") print("***** Problem found in second half. Bisecting again...")
iteration += 1 iteration += 1
test_labels = test_labels_b[:-1] test_labels = test_labels_b[:-1]
elif failures_a and failures_b: elif failures_a.returncode and failures_b.returncode:
print("***** Multiple sources of failure found") print("***** Multiple sources of failure found")
break break
else: else:

View File

@ -11,6 +11,7 @@ import weakref
import zipfile import zipfile
from importlib import import_module from importlib import import_module
from pathlib import Path from pathlib import Path
from subprocess import CompletedProcess
from unittest import mock, skip, skipIf from unittest import mock, skip, skipIf
from django.apps.registry import Apps from django.apps.registry import Apps
@ -345,7 +346,7 @@ class RestartWithReloaderTests(SimpleTestCase):
executable = '/usr/bin/python' executable = '/usr/bin/python'
def patch_autoreload(self, argv): def patch_autoreload(self, argv):
patch_call = mock.patch('django.utils.autoreload.subprocess.call', return_value=0) patch_call = mock.patch('django.utils.autoreload.subprocess.run', return_value=CompletedProcess(argv, 0))
patches = [ patches = [
mock.patch('django.utils.autoreload.sys.argv', argv), mock.patch('django.utils.autoreload.sys.argv', argv),
mock.patch('django.utils.autoreload.sys.executable', self.executable), mock.patch('django.utils.autoreload.sys.executable', self.executable),