test_ok2/py/execnet/rsync.py

154 lines
6.1 KiB
Python

import py, os, stat, md5
from Queue import Queue
class RSync(object):
    """Sample 1-to-n rsync protocol built on top of py.execnet.

    Usage: instantiate this class (optionally providing a progress
    callback), then add some targets (gateway + destdir) by calling
    add_target() and finally invoke send(), which sends the given
    source tree to every registered target.

    There is limited support for symlinks: absolute symlinks pointing
    into the source tree are sent relative to the source tree, while
    relative symlinks and symlinks pointing outside of it are sent
    verbatim (regardless of the existence of such a path on the remote
    side).
    """

    def __init__(self, callback=None, **options):
        """Create a syncer without any targets.

        callback -- optional progress callable.  send() invokes it as
            callback("list", total_size, channel) once a target has
            requested all the files it needs, and as
            callback("ack", size, channel) for each "ack" message.
        options -- keyword options passed along to every target; only
            'delete' is accepted.
        """
        for name in options:
            # The one-element tuple matters: ('delete') is a plain string
            # and would turn this check into a substring test.
            assert name in ('delete',), "unknown option %r" % (name,)
        self.options = options
        self.callback = callback
        # channel -> finishedcallback (or None) for targets still syncing
        self.channels = {}
        # (channel, request) pairs pushed by the channel callbacks
        self.receivequeue = Queue()
        # ("link", basename, linkpoint) tuples collected while walking
        self.links = []

    def filter(self, path):
        """Tell whether `path` takes part in the sync.

        Called with the joined path of every directory entry; this
        default implementation accepts everything.
        """
        return True

    def add_target(self, gateway, destdir, finishedcallback=None):
        """ Adds a target for to-be-send data

        gateway -- object whose remote_exec() is used to start
            REMOTE_SOURCE and obtain a channel.
        destdir -- destination directory, sent as str(destdir).
        finishedcallback -- optional callable invoked without arguments
            once this target reports "done".
        """
        def itemcallback(req):
            # `channel` is bound below, before any item can arrive.
            self.receivequeue.put((channel, req))
        channel = gateway.remote_exec(REMOTE_SOURCE)
        channel.setcallback(itemcallback, endmarker=None)
        channel.send((str(destdir), self.options))
        self.channels[channel] = finishedcallback

    def send(self, sourcedir):
        """ Sends a sourcedir to previously prepared targets

        Blocks until every registered target has reported "done".

        Raises IOError if a channel is closed before its target is done
        and ValueError on an unknown command or an unsyncable path.
        """
        self.sourcedir = str(sourcedir)
        # normalize a trailing '/' away
        self.sourcedir = os.path.dirname(os.path.join(self.sourcedir, 'x'))
        # send directory structure and file timestamps/sizes
        self._send_directory_structure(self.sourcedir)

        # paths and to_send are only used for doing
        # progress-related callbacks
        self.paths = {}
        self.to_send = {}

        # send modified file to clients
        while self.channels:
            channel, req = self.receivequeue.get()
            if req is None:
                # end-of-channel
                if channel in self.channels:
                    # too early!  we must have got an error
                    channel.waitclose()
                    # or else we raise one
                    raise IOError('connection unexpectedly closed: %s ' % (
                        channel.gateway,))
                continue

            command, data = req
            if command == "links":
                for link in self.links:
                    channel.send(link)
                # completion marker, this host is done
                channel.send(42)
            elif command == "done":
                finishedcallback = self.channels.pop(channel)
                if finishedcallback:
                    finishedcallback()
            elif command == "ack":
                if self.callback:
                    self.callback("ack", self.paths[data], channel)
            elif command == "list_done":
                # sum up all to send; a target that requested no file
                # at all has no entry in to_send
                if self.callback:
                    requested = self.to_send.get(channel, [])
                    total = sum([self.paths[i] for i in requested])
                    self.callback("list", total, channel)
            elif command == "send":
                modified_rel_path, checksum = data
                self._send_file(channel, modified_rel_path, checksum)
            else:
                # A bare `assert "<message>"` is always true, so fail
                # explicitly instead of silently dropping the message.
                raise ValueError("Unknown command %s" % command)

    def _send_file(self, channel, modified_rel_path, checksum):
        """Answer a "send" request for one file on `channel`.

        modified_rel_path -- sequence of path components relative to
            self.sourcedir.
        checksum -- md5 digest held by the target, or None.

        Sends the file content, or None when the digest matches the
        local content.
        """
        modifiedpath = os.path.join(self.sourcedir, *modified_rel_path)
        f = open(modifiedpath, 'rb')
        try:
            data = f.read()
        finally:
            f.close()

        # provide info to progress callback function
        modified_rel_path = "/".join(modified_rel_path)
        self.paths[modified_rel_path] = len(data)
        self.to_send.setdefault(channel, []).append(modified_rel_path)

        if checksum is not None and checksum == md5.md5(data).digest():
            data = None     # not really modified
        else:
            # ! there is a reason for the interning:
            # sharing multiple copies of the file's data
            data = intern(data)
            print('%s <= %s' % (
                channel.gateway._getremoteaddress(),
                modified_rel_path))
        channel.send(data)

    def _broadcast(self, msg):
        """Send `msg` on every channel that is still registered."""
        for channel in self.channels:
            channel.send(msg)

    def _send_link(self, basename, linkpoint):
        """Record a symlink; send() delivers it on a "links" request."""
        self.links.append(("link", basename, linkpoint))

    def _send_directory_structure(self, path):
        """Broadcast a description of `path`, recursing into directories.

        A regular file is announced as an (mtime, size) tuple, a
        directory as the list of its filter()-accepted entry names
        followed by the description of each entry, and a symlink as
        None after recording it via _send_link().

        Raises ValueError for any other kind of file.
        """
        st = os.lstat(path)
        if stat.S_ISREG(st.st_mode):
            # regular file: send a timestamp/size pair
            self._broadcast((st.st_mtime, st.st_size))
        elif stat.S_ISDIR(st.st_mode):
            # dir: send a list of entries
            names = []
            subpaths = []
            for name in os.listdir(path):
                p = os.path.join(path, name)
                if self.filter(p):
                    names.append(name)
                    subpaths.append(p)
            self._broadcast(names)
            for p in subpaths:
                self._send_directory_structure(p)
        elif stat.S_ISLNK(st.st_mode):
            linkpoint = os.readlink(path)
            basename = path[len(self.sourcedir) + 1:]
            # Only an absolute target that is the source tree itself or
            # lives below it is rewritten.  Comparing against
            # sourcedir + os.sep keeps a sibling such as "/a/foobar" from
            # being mistaken for a path inside "/a/foo".
            inside_sourcedir = linkpoint.startswith(os.sep) and (
                linkpoint == self.sourcedir or
                linkpoint.startswith(self.sourcedir + os.sep))
            if inside_sourcedir:
                linkpoint = linkpoint[len(self.sourcedir) + 1:]
            # relative and external links are sent as they are
            self._send_link(basename, linkpoint)
            self._broadcast(None)
        else:
            raise ValueError("cannot sync %r" % (path,))
# Code executed on each target: the text of rsync_remote.py (located next
# to this module) followed by a call to f() -- presumably the entry point
# defined in that file; confirm against rsync_remote.py.
_remote_file = py.path.local(__file__).dirpath().\
    join('rsync_remote.py').open()
try:
    REMOTE_SOURCE = _remote_file.read() + "\nf()"
finally:
    # close the handle explicitly instead of relying on garbage collection
    _remote_file.close()
del _remote_file