From a55332091d96de4b6b6bf0668268dad9549c152a Mon Sep 17 00:00:00 2001 From: hpk Date: Tue, 23 Sep 2008 16:28:13 +0200 Subject: [PATCH] [svn r58385] * move cross-platform process kill functionality and move it to py.process.kill(pid) * simplify test_auth.py * use new functionality from some tests --HG-- branch : trunk --- py/__init__.py | 5 +- py/execnet/testing/test_gateway.py | 4 +- py/misc/killproc.py | 10 - py/misc/testing/test_oskill.py | 23 --- py/path/svn/testing/test_auth.py | 310 +++++++++++++--------------- py/process/killproc.py | 23 +++ py/process/testing/test_killproc.py | 13 ++ 7 files changed, 179 insertions(+), 209 deletions(-) delete mode 100644 py/misc/killproc.py delete mode 100644 py/misc/testing/test_oskill.py create mode 100644 py/process/killproc.py create mode 100644 py/process/testing/test_killproc.py diff --git a/py/__init__.py b/py/__init__.py index 1c7bbdddd..08c38f068 100644 --- a/py/__init__.py +++ b/py/__init__.py @@ -26,8 +26,8 @@ version = "1.0.0a1" initpkg(__name__, description = "pylib and py.test: agile development and test support library", - revision = int('$LastChangedRevision: 58308 $'.split(':')[1][:-1]), - lastchangedate = '$LastChangedDate: 2008-09-21 17:15:28 +0200 (Sun, 21 Sep 2008) $', + revision = int('$LastChangedRevision: 58385 $'.split(':')[1][:-1]), + lastchangedate = '$LastChangedDate: 2008-09-23 16:28:13 +0200 (Tue, 23 Sep 2008) $', version = version, url = "http://pylib.org", download_url = "http://codespeak.net/py/0.9.2/download.html", @@ -99,6 +99,7 @@ initpkg(__name__, 'process.__doc__' : ('./process/__init__.py', '__doc__'), 'process.cmdexec' : ('./process/cmdexec.py', 'cmdexec'), + 'process.kill' : ('./process/killproc.py', 'kill'), 'process.ForkedFunc' : ('./process/forkedfunc.py', 'ForkedFunc'), # path implementation diff --git a/py/execnet/testing/test_gateway.py b/py/execnet/testing/test_gateway.py index d65fb5db2..94d2ec68f 100644 --- a/py/execnet/testing/test_gateway.py +++ b/py/execnet/testing/test_gateway.py @@ -516,8 +516,6 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): def test_waitclose_on_remote_killed(self): py.test.skip("fix needed: dying remote process does not cause waitclose() to fail") - if not hasattr(py.std.os, 'kill'): - py.test.skip("no os.kill") gw = py.execnet.PopenGateway() channel = gw.remote_exec(""" import os @@ -527,7 +525,7 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): channel.send("#" * 100) """) remotepid = channel.receive() - os.kill(remotepid, 9) + py.process.kill(remotepid) py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)") py.test.raises(EOFError, channel.send, None) py.test.raises(EOFError, channel.receive) diff --git a/py/misc/killproc.py b/py/misc/killproc.py deleted file mode 100644 index 890798766..000000000 --- a/py/misc/killproc.py +++ /dev/null @@ -1,10 +0,0 @@ - -import py -import os, sys - -def killproc(pid): - if sys.platform == "win32": - py.process.cmdexec("taskkill /F /PID %d" %(pid,)) - else: - os.kill(pid, 15) - diff --git a/py/misc/testing/test_oskill.py b/py/misc/testing/test_oskill.py deleted file mode 100644 index 4a46a6f33..000000000 --- a/py/misc/testing/test_oskill.py +++ /dev/null @@ -1,23 +0,0 @@ - -import py, sys - -from py.__.misc.killproc import killproc - -def test_win_killsubprocess(): - if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'): - py.test.skip("you\'re using an older version of windows, which " - "doesn\'t support 'taskkill' - py.misc.killproc is not " - "available") - try: - import subprocess - except ImportError: - py.test.skip("no subprocess module") - tmp = py.test.ensuretemp("test_win_killsubprocess") - t = tmp.join("t.py") - t.write("import time ; time.sleep(100)") - proc = py.std.subprocess.Popen([sys.executable, str(t)]) - assert proc.poll() is None # no return value yet - killproc(proc.pid) - ret = proc.wait() - assert ret != 0 - diff --git a/py/path/svn/testing/test_auth.py b/py/path/svn/testing/test_auth.py index 33f8cc993..fce4817e0 100644 --- a/py/path/svn/testing/test_auth.py +++ b/py/path/svn/testing/test_auth.py @@ -3,7 +3,6 @@ from py.path import SvnAuth import svntestbase from threading import Thread import time -from py.__.misc.killproc import killproc from py.__.conftest import option def make_repo_auth(repo, userdata): @@ -269,6 +268,10 @@ class SvnAuthFunctionalTestBase(object): func_name)) self.auth = py.path.SvnAuth('johnny', 'foo', cache_auth=False, interactive=False) + self.port, self.pid = self._start_svnserve() + + def teardown_method(self, method): + py.process.kill(self.pid) def _start_svnserve(self): make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')}) @@ -279,203 +282,168 @@ class SvnAuthFunctionalTestBase(object): class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase): def test_checkout_constructor_arg(self): - port, pid = self._start_svnserve() - try: - wc = py.path.svnwc(self.temppath, auth=self.auth) - wc.checkout( - 'svn://localhost:%s/%s' % (port, self.repopath.basename)) - assert wc.join('.svn').check() - finally: - # XXX can we do this in a teardown_method too? not sure if that's - # guaranteed to get called... - killproc(pid) + port = self.port + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + assert wc.join('.svn').check() def test_checkout_function_arg(self): - port, pid = self._start_svnserve() - try: - wc = py.path.svnwc(self.temppath, auth=self.auth) - wc.checkout( - 'svn://localhost:%s/%s' % (port, self.repopath.basename)) - assert wc.join('.svn').check() - finally: - killproc(pid) + port = self.port + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + assert wc.join('.svn').check() def test_checkout_failing_non_interactive(self): - port, pid = self._start_svnserve() - try: - auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False, - interactive=False) - wc = py.path.svnwc(self.temppath, auth) - py.test.raises(Exception, - ("wc.checkout('svn://localhost:%s/%s' % " - "(port, self.repopath.basename))")) - finally: - killproc(pid) + port = self.port + auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False, + interactive=False) + wc = py.path.svnwc(self.temppath, auth) + py.test.raises(Exception, + ("wc.checkout('svn://localhost:%s/%s' % " + "(port, self.repopath.basename))")) def test_log(self): - port, pid = self._start_svnserve() - try: - wc = py.path.svnwc(self.temppath, self.auth) - wc.checkout( - 'svn://localhost:%s/%s' % (port, self.repopath.basename)) - foo = wc.ensure('foo.txt') - wc.commit('added foo.txt') - log = foo.log() - assert len(log) == 1 - assert log[0].msg == 'added foo.txt' - finally: - killproc(pid) + port = self.port + wc = py.path.svnwc(self.temppath, self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + foo = wc.ensure('foo.txt') + wc.commit('added foo.txt') + log = foo.log() + assert len(log) == 1 + assert log[0].msg == 'added foo.txt' def test_switch(self): - port, pid = self._start_svnserve() - try: - wc = py.path.svnwc(self.temppath, auth=self.auth) - svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename) - wc.checkout(svnurl) - wc.ensure('foo', dir=True).ensure('foo.txt').write('foo') - wc.commit('added foo dir with foo.txt file') - wc.ensure('bar', dir=True) - wc.commit('added bar dir') - bar = wc.join('bar') - bar.switch(svnurl + '/foo') - assert bar.join('foo.txt') - finally: - killproc(pid) + port = self.port + wc = py.path.svnwc(self.temppath, auth=self.auth) + svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename) + wc.checkout(svnurl) + wc.ensure('foo', dir=True).ensure('foo.txt').write('foo') + wc.commit('added foo dir with foo.txt file') + wc.ensure('bar', dir=True) + wc.commit('added bar dir') + bar = wc.join('bar') + bar.switch(svnurl + '/foo') + assert bar.join('foo.txt') def test_update(self): - port, pid = self._start_svnserve() - try: - wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True), - auth=self.auth) - wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True), - auth=self.auth) - wc1.checkout( - 'svn://localhost:%s/%s' % (port, self.repopath.basename)) - wc2.checkout( - 'svn://localhost:%s/%s' % (port, self.repopath.basename)) - wc1.ensure('foo', dir=True) - wc1.commit('added foo dir') - wc2.update() - assert wc2.join('foo').check() + port = self.port + wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True), + auth=self.auth) + wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True), + auth=self.auth) + wc1.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + wc2.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename)) + wc1.ensure('foo', dir=True) + wc1.commit('added foo dir') + wc2.update() + assert wc2.join('foo').check() - auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) - wc2.auth = auth - py.test.raises(Exception, 'wc2.update()') - finally: - killproc(pid) + auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) + wc2.auth = auth + py.test.raises(Exception, 'wc2.update()') def test_lock_unlock_status(self): - port, pid = self._start_svnserve() - try: - wc = py.path.svnwc(self.temppath, auth=self.auth) - wc.checkout( - 'svn://localhost:%s/%s' % (port, self.repopath.basename,)) - wc.ensure('foo', file=True) - wc.commit('added foo file') - foo = wc.join('foo') - foo.lock() - status = foo.status() - assert status.locked - foo.unlock() - status = foo.status() - assert not status.locked + port = self.port + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename,)) + wc.ensure('foo', file=True) + wc.commit('added foo file') + foo = wc.join('foo') + foo.lock() + status = foo.status() + assert status.locked + foo.unlock() + status = foo.status() + assert not status.locked - auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) - foo.auth = auth - py.test.raises(Exception, 'foo.lock()') - py.test.raises(Exception, 'foo.unlock()') - finally: - killproc(pid) + auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) + foo.auth = auth + py.test.raises(Exception, 'foo.lock()') + py.test.raises(Exception, 'foo.unlock()') def test_diff(self): - port, pid = self._start_svnserve() - try: - wc = py.path.svnwc(self.temppath, auth=self.auth) - wc.checkout( - 'svn://localhost:%s/%s' % (port, self.repopath.basename,)) - wc.ensure('foo', file=True) - wc.commit('added foo file') - wc.update() - rev = int(wc.status().rev) - foo = wc.join('foo') - foo.write('bar') - diff = foo.diff() - assert '\n+bar\n' in diff - foo.commit('added some content') - diff = foo.diff() - assert not diff - diff = foo.diff(rev=rev) - assert '\n+bar\n' in diff + port = self.port + wc = py.path.svnwc(self.temppath, auth=self.auth) + wc.checkout( + 'svn://localhost:%s/%s' % (port, self.repopath.basename,)) + wc.ensure('foo', file=True) + wc.commit('added foo file') + wc.update() + rev = int(wc.status().rev) + foo = wc.join('foo') + foo.write('bar') + diff = foo.diff() + assert '\n+bar\n' in diff + foo.commit('added some content') + diff = foo.diff() + assert not diff + diff = foo.diff(rev=rev) + assert '\n+bar\n' in diff - auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) - foo.auth = auth - py.test.raises(Exception, 'foo.diff(rev=rev)') - finally: - killproc(pid) + auth = py.path.SvnAuth('unknown', 'unknown', interactive=False) + foo.auth = auth + py.test.raises(Exception, 'foo.diff(rev=rev)') class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase): def test_listdir(self): - port, pid = self._start_svnserve() - try: - u = py.path.svnurl( - 'svn://localhost:%s/%s' % (port, self.repopath.basename), - auth=self.auth) - u.ensure('foo') - paths = u.listdir() - assert len(paths) == 1 - assert paths[0].auth is self.auth + port = self.port + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=self.auth) + u.ensure('foo') + paths = u.listdir() + assert len(paths) == 1 + assert paths[0].auth is self.auth - auth = SvnAuth('foo', 'bar', interactive=False) - u = py.path.svnurl( - 'svn://localhost:%s/%s' % (port, self.repopath.basename), - auth=auth) - py.test.raises(Exception, 'u.listdir()') - finally: - killproc(pid) + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=auth) + py.test.raises(Exception, 'u.listdir()') def test_copy(self): - port, pid = self._start_svnserve() - try: - u = py.path.svnurl( - 'svn://localhost:%s/%s' % (port, self.repopath.basename), - auth=self.auth) - foo = u.ensure('foo') - bar = u.join('bar') - foo.copy(bar) - assert bar.check() - assert bar.auth is self.auth + port = self.port + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=self.auth) + foo = u.ensure('foo') + bar = u.join('bar') + foo.copy(bar) + assert bar.check() + assert bar.auth is self.auth - auth = SvnAuth('foo', 'bar', interactive=False) - u = py.path.svnurl( - 'svn://localhost:%s/%s' % (port, self.repopath.basename), - auth=auth) - foo = u.join('foo') - bar = u.join('bar') - py.test.raises(Exception, 'foo.copy(bar)') - finally: - killproc(pid) + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=auth) + foo = u.join('foo') + bar = u.join('bar') + py.test.raises(Exception, 'foo.copy(bar)') def test_write_read(self): - port, pid = self._start_svnserve() + port = self.port + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=self.auth) + foo = u.ensure('foo') + fp = foo.open() try: - u = py.path.svnurl( - 'svn://localhost:%s/%s' % (port, self.repopath.basename), - auth=self.auth) - foo = u.ensure('foo') - fp = foo.open() - try: - data = fp.read() - finally: - fp.close() - assert data == '' - - auth = SvnAuth('foo', 'bar', interactive=False) - u = py.path.svnurl( - 'svn://localhost:%s/%s' % (port, self.repopath.basename), - auth=auth) - foo = u.join('foo') - py.test.raises(Exception, 'foo.open()') + data = fp.read() finally: - killproc(pid) + fp.close() + assert data == '' + + auth = SvnAuth('foo', 'bar', interactive=False) + u = py.path.svnurl( + 'svn://localhost:%s/%s' % (port, self.repopath.basename), + auth=auth) + foo = u.join('foo') + py.test.raises(Exception, 'foo.open()') # XXX rinse, repeat... :| diff --git a/py/process/killproc.py b/py/process/killproc.py new file mode 100644 index 000000000..f3e221677 --- /dev/null +++ b/py/process/killproc.py @@ -0,0 +1,23 @@ +import py +import os, sys + +if sys.platform == "win32": + try: + import ctypes + except ImportError: + def dokill(pid): + py.process.cmdexec("taskkill /F /PID %d" %(pid,)) + else: + def dokill(pid): + PROCESS_TERMINATE = 1 + handle = ctypes.windll.kernel32.OpenProcess( + PROCESS_TERMINATE, False, process.pid) + ctypes.windll.kernel32.TerminateProcess(handle, -1) + ctypes.windll.kernel32.CloseHandle(handle) +else: + def dokill(pid): + os.kill(pid, 15) + +def kill(pid): + """ kill process by id. """ + dokill(pid) diff --git a/py/process/testing/test_killproc.py b/py/process/testing/test_killproc.py new file mode 100644 index 000000000..09cb71b17 --- /dev/null +++ b/py/process/testing/test_killproc.py @@ -0,0 +1,13 @@ + +import py, sys + +def test_kill(): + subprocess = py.test.importorskip("subprocess") + tmp = py.test.ensuretemp("test_kill") + t = tmp.join("t.py") + t.write("import time ; time.sleep(100)") + proc = py.std.subprocess.Popen([sys.executable, str(t)]) + assert proc.poll() is None # no return value yet + py.process.kill(proc.pid) + ret = proc.wait() + assert ret != 0