[svn r58385] * move cross-platform process kill functionality and move it to

py.process.kill(pid)
* simplify test_auth.py
* use new functionality from some tests

--HG--
branch : trunk
This commit is contained in:
hpk 2008-09-23 16:28:13 +02:00
parent 5b21c540b6
commit a55332091d
7 changed files with 179 additions and 209 deletions

View File

@ -26,8 +26,8 @@ version = "1.0.0a1"
initpkg(__name__,
description = "pylib and py.test: agile development and test support library",
revision = int('$LastChangedRevision: 58308 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2008-09-21 17:15:28 +0200 (Sun, 21 Sep 2008) $',
revision = int('$LastChangedRevision: 58385 $'.split(':')[1][:-1]),
lastchangedate = '$LastChangedDate: 2008-09-23 16:28:13 +0200 (Tue, 23 Sep 2008) $',
version = version,
url = "http://pylib.org",
download_url = "http://codespeak.net/py/0.9.2/download.html",
@ -99,6 +99,7 @@ initpkg(__name__,
'process.__doc__' : ('./process/__init__.py', '__doc__'),
'process.cmdexec' : ('./process/cmdexec.py', 'cmdexec'),
'process.kill' : ('./process/killproc.py', 'kill'),
'process.ForkedFunc' : ('./process/forkedfunc.py', 'ForkedFunc'),
# path implementation

View File

@ -516,8 +516,6 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
def test_waitclose_on_remote_killed(self):
py.test.skip("fix needed: dying remote process does not cause waitclose() to fail")
if not hasattr(py.std.os, 'kill'):
py.test.skip("no os.kill")
gw = py.execnet.PopenGateway()
channel = gw.remote_exec("""
import os
@ -527,7 +525,7 @@ class TestPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
channel.send("#" * 100)
""")
remotepid = channel.receive()
os.kill(remotepid, 9)
py.process.kill(remotepid)
py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
py.test.raises(EOFError, channel.send, None)
py.test.raises(EOFError, channel.receive)

View File

@ -1,10 +0,0 @@
import py
import os, sys
def killproc(pid):
if sys.platform == "win32":
py.process.cmdexec("taskkill /F /PID %d" %(pid,))
else:
os.kill(pid, 15)

View File

@ -1,23 +0,0 @@
import py, sys
from py.__.misc.killproc import killproc
def test_win_killsubprocess():
if sys.platform == 'win32' and not py.path.local.sysfind('taskkill'):
py.test.skip("you\'re using an older version of windows, which "
"doesn\'t support 'taskkill' - py.misc.killproc is not "
"available")
try:
import subprocess
except ImportError:
py.test.skip("no subprocess module")
tmp = py.test.ensuretemp("test_win_killsubprocess")
t = tmp.join("t.py")
t.write("import time ; time.sleep(100)")
proc = py.std.subprocess.Popen([sys.executable, str(t)])
assert proc.poll() is None # no return value yet
killproc(proc.pid)
ret = proc.wait()
assert ret != 0

View File

@ -3,7 +3,6 @@ from py.path import SvnAuth
import svntestbase
from threading import Thread
import time
from py.__.misc.killproc import killproc
from py.__.conftest import option
def make_repo_auth(repo, userdata):
@ -269,6 +268,10 @@ class SvnAuthFunctionalTestBase(object):
func_name))
self.auth = py.path.SvnAuth('johnny', 'foo', cache_auth=False,
interactive=False)
self.port, self.pid = self._start_svnserve()
def teardown_method(self, method):
py.process.kill(self.pid)
def _start_svnserve(self):
make_repo_auth(self.repopath, {'johnny': ('foo', 'rw')})
@ -279,203 +282,168 @@ class SvnAuthFunctionalTestBase(object):
class TestSvnWCAuthFunctional(SvnAuthFunctionalTestBase):
def test_checkout_constructor_arg(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
assert wc.join('.svn').check()
finally:
# XXX can we do this in a teardown_method too? not sure if that's
# guaranteed to get called...
killproc(pid)
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
assert wc.join('.svn').check()
def test_checkout_function_arg(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
assert wc.join('.svn').check()
finally:
killproc(pid)
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
assert wc.join('.svn').check()
def test_checkout_failing_non_interactive(self):
port, pid = self._start_svnserve()
try:
auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
interactive=False)
wc = py.path.svnwc(self.temppath, auth)
py.test.raises(Exception,
("wc.checkout('svn://localhost:%s/%s' % "
"(port, self.repopath.basename))"))
finally:
killproc(pid)
port = self.port
auth = py.path.SvnAuth('johnny', 'bar', cache_auth=False,
interactive=False)
wc = py.path.svnwc(self.temppath, auth)
py.test.raises(Exception,
("wc.checkout('svn://localhost:%s/%s' % "
"(port, self.repopath.basename))"))
def test_log(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
foo = wc.ensure('foo.txt')
wc.commit('added foo.txt')
log = foo.log()
assert len(log) == 1
assert log[0].msg == 'added foo.txt'
finally:
killproc(pid)
port = self.port
wc = py.path.svnwc(self.temppath, self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
foo = wc.ensure('foo.txt')
wc.commit('added foo.txt')
log = foo.log()
assert len(log) == 1
assert log[0].msg == 'added foo.txt'
def test_switch(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
wc.checkout(svnurl)
wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
wc.commit('added foo dir with foo.txt file')
wc.ensure('bar', dir=True)
wc.commit('added bar dir')
bar = wc.join('bar')
bar.switch(svnurl + '/foo')
assert bar.join('foo.txt')
finally:
killproc(pid)
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
svnurl = 'svn://localhost:%s/%s' % (port, self.repopath.basename)
wc.checkout(svnurl)
wc.ensure('foo', dir=True).ensure('foo.txt').write('foo')
wc.commit('added foo dir with foo.txt file')
wc.ensure('bar', dir=True)
wc.commit('added bar dir')
bar = wc.join('bar')
bar.switch(svnurl + '/foo')
assert bar.join('foo.txt')
def test_update(self):
port, pid = self._start_svnserve()
try:
wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
auth=self.auth)
wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
auth=self.auth)
wc1.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
wc2.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
wc1.ensure('foo', dir=True)
wc1.commit('added foo dir')
wc2.update()
assert wc2.join('foo').check()
port = self.port
wc1 = py.path.svnwc(self.temppath.ensure('wc1', dir=True),
auth=self.auth)
wc2 = py.path.svnwc(self.temppath.ensure('wc2', dir=True),
auth=self.auth)
wc1.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
wc2.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename))
wc1.ensure('foo', dir=True)
wc1.commit('added foo dir')
wc2.update()
assert wc2.join('foo').check()
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
wc2.auth = auth
py.test.raises(Exception, 'wc2.update()')
finally:
killproc(pid)
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
wc2.auth = auth
py.test.raises(Exception, 'wc2.update()')
def test_lock_unlock_status(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
foo = wc.join('foo')
foo.lock()
status = foo.status()
assert status.locked
foo.unlock()
status = foo.status()
assert not status.locked
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
foo = wc.join('foo')
foo.lock()
status = foo.status()
assert status.locked
foo.unlock()
status = foo.status()
assert not status.locked
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.lock()')
py.test.raises(Exception, 'foo.unlock()')
finally:
killproc(pid)
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.lock()')
py.test.raises(Exception, 'foo.unlock()')
def test_diff(self):
port, pid = self._start_svnserve()
try:
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
wc.update()
rev = int(wc.status().rev)
foo = wc.join('foo')
foo.write('bar')
diff = foo.diff()
assert '\n+bar\n' in diff
foo.commit('added some content')
diff = foo.diff()
assert not diff
diff = foo.diff(rev=rev)
assert '\n+bar\n' in diff
port = self.port
wc = py.path.svnwc(self.temppath, auth=self.auth)
wc.checkout(
'svn://localhost:%s/%s' % (port, self.repopath.basename,))
wc.ensure('foo', file=True)
wc.commit('added foo file')
wc.update()
rev = int(wc.status().rev)
foo = wc.join('foo')
foo.write('bar')
diff = foo.diff()
assert '\n+bar\n' in diff
foo.commit('added some content')
diff = foo.diff()
assert not diff
diff = foo.diff(rev=rev)
assert '\n+bar\n' in diff
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.diff(rev=rev)')
finally:
killproc(pid)
auth = py.path.SvnAuth('unknown', 'unknown', interactive=False)
foo.auth = auth
py.test.raises(Exception, 'foo.diff(rev=rev)')
class TestSvnURLAuthFunctional(SvnAuthFunctionalTestBase):
def test_listdir(self):
port, pid = self._start_svnserve()
try:
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
u.ensure('foo')
paths = u.listdir()
assert len(paths) == 1
assert paths[0].auth is self.auth
port = self.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
u.ensure('foo')
paths = u.listdir()
assert len(paths) == 1
assert paths[0].auth is self.auth
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
py.test.raises(Exception, 'u.listdir()')
finally:
killproc(pid)
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
py.test.raises(Exception, 'u.listdir()')
def test_copy(self):
port, pid = self._start_svnserve()
try:
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
foo = u.ensure('foo')
bar = u.join('bar')
foo.copy(bar)
assert bar.check()
assert bar.auth is self.auth
port = self.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
foo = u.ensure('foo')
bar = u.join('bar')
foo.copy(bar)
assert bar.check()
assert bar.auth is self.auth
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
foo = u.join('foo')
bar = u.join('bar')
py.test.raises(Exception, 'foo.copy(bar)')
finally:
killproc(pid)
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
foo = u.join('foo')
bar = u.join('bar')
py.test.raises(Exception, 'foo.copy(bar)')
def test_write_read(self):
port, pid = self._start_svnserve()
port = self.port
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
foo = u.ensure('foo')
fp = foo.open()
try:
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=self.auth)
foo = u.ensure('foo')
fp = foo.open()
try:
data = fp.read()
finally:
fp.close()
assert data == ''
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
foo = u.join('foo')
py.test.raises(Exception, 'foo.open()')
data = fp.read()
finally:
killproc(pid)
fp.close()
assert data == ''
auth = SvnAuth('foo', 'bar', interactive=False)
u = py.path.svnurl(
'svn://localhost:%s/%s' % (port, self.repopath.basename),
auth=auth)
foo = u.join('foo')
py.test.raises(Exception, 'foo.open()')
# XXX rinse, repeat... :|

23
py/process/killproc.py Normal file
View File

@ -0,0 +1,23 @@
import py
import os, sys
if sys.platform == "win32":
try:
import ctypes
except ImportError:
def dokill(pid):
py.process.cmdexec("taskkill /F /PID %d" %(pid,))
else:
def dokill(pid):
PROCESS_TERMINATE = 1
handle = ctypes.windll.kernel32.OpenProcess(
PROCESS_TERMINATE, False, process.pid)
ctypes.windll.kernel32.TerminateProcess(handle, -1)
ctypes.windll.kernel32.CloseHandle(handle)
else:
def dokill(pid):
os.kill(pid, 15)
def kill(pid):
""" kill process by id. """
dokill(pid)

View File

@ -0,0 +1,13 @@
import py, sys
def test_kill():
subprocess = py.test.importorskip("subprocess")
tmp = py.test.ensuretemp("test_kill")
t = tmp.join("t.py")
t.write("import time ; time.sleep(100)")
proc = py.std.subprocess.Popen([sys.executable, str(t)])
assert proc.poll() is None # no return value yet
py.process.kill(proc.pid)
ret = proc.wait()
assert ret != 0