import os
import subprocess

from django.core.files.temp import NamedTemporaryFile
from django.db.backends.base.client import BaseDatabaseClient
from django.utils.six import print_


def _escape_pgpass(txt):
    """
    Escape a fragment of a PostgreSQL .pgpass file.

    Backslashes are doubled first, then colons are backslash-escaped, so
    that the backslashes introduced for the colons are not doubled again.
    """
    return txt.replace('\\', '\\\\').replace(':', '\\:')


class DatabaseClient(BaseDatabaseClient):
    """Database client that opens an interactive ``psql`` shell."""

    executable_name = 'psql'

    @classmethod
    def runshell_db(cls, conn_params):
        """
        Run ``cls.executable_name`` with the given connection parameters.

        ``conn_params`` is a mapping that may contain the keys ``'host'``,
        ``'port'``, ``'database'``, ``'user'`` and ``'password'``; a missing
        key is treated as an empty string.

        When a password is given, it is written to a temporary .pgpass file
        whose path is exported through the ``PGPASSFILE`` environment
        variable for the duration of the call. Afterwards the temporary file
        is closed and ``PGPASSFILE`` is put back to the state it had before
        the call (restored to its previous value, or removed if it was not
        set).

        Raises ``subprocess.CalledProcessError`` if the client exits with a
        non-zero status.
        """
        args = [cls.executable_name]

        host = conn_params.get('host', '')
        port = conn_params.get('port', '')
        dbname = conn_params.get('database', '')
        user = conn_params.get('user', '')
        passwd = conn_params.get('password', '')

        if user:
            args += ['-U', user]
        if host:
            args += ['-h', host]
        if port:
            args += ['-p', str(port)]
        # The database name is always the last positional argument, even
        # when it is empty.
        args += [dbname]

        temp_pgpass = None
        # Remember a PGPASSFILE the caller may already have set, so that it
        # is restored instead of being discarded once the shell exits.
        previous_pgpassfile = os.environ.get('PGPASSFILE')
        pgpassfile_overridden = False
        try:
            if passwd:
                # Create temporary .pgpass file.
                temp_pgpass = NamedTemporaryFile(mode='w+')
                try:
                    # One colon-separated entry:
                    # host:port:database:user:password. Empty fields are
                    # written as '*'.
                    print_(
                        _escape_pgpass(host) or '*',
                        str(port) or '*',
                        _escape_pgpass(dbname) or '*',
                        _escape_pgpass(user) or '*',
                        _escape_pgpass(passwd),
                        file=temp_pgpass,
                        sep=':',
                        flush=True,
                    )
                    os.environ['PGPASSFILE'] = temp_pgpass.name
                    pgpassfile_overridden = True
                except UnicodeEncodeError:
                    # If the current locale can't encode the data, we let
                    # the user input the password manually.
                    pass
            subprocess.check_call(args)
        finally:
            if temp_pgpass:
                temp_pgpass.close()
            # Only touch PGPASSFILE if this call changed it; a value that
            # was never overridden (e.g. after UnicodeEncodeError) must be
            # left alone.
            if pgpassfile_overridden:
                if previous_pgpassfile is None:
                    # unit tests need cleanup
                    os.environ.pop('PGPASSFILE', None)
                else:
                    os.environ['PGPASSFILE'] = previous_pgpassfile

    def runshell(self):
        """Open the shell using this client's connection parameters."""
        DatabaseClient.runshell_db(self.connection.get_connection_params())