test_ok1/py/execnet/rsync.py

166 lines
6.5 KiB
Python
Raw Normal View History

import py, os, stat, md5
from Queue import Queue
class RSync(object):
""" This class allows to synchronise files and directories
with one or multiple remote filesystems.
An RSync instance allows to dynamically add remote targets
and then synchronizes the remote filesystems with
any provided source directory.
There is limited support for symlinks, which means that symlinks
pointing to the sourcetree will be send "as is" while external
symlinks will be just copied (regardless of existance of such
a path on remote side).
"""
def __init__(self, callback=None, **options):
for name in options:
assert name in ('delete')
self._options = options
self._callback = callback
self._channels = {}
self._receivequeue = Queue()
self._links = []
def filter(self, path):
return True
def add_target(self, gateway, destdir, finishedcallback=None):
""" Adds a remote target specified via a 'gateway'
and a remote destination directory.
"""
def itemcallback(req):
self._receivequeue.put((channel, req))
channel = gateway.remote_exec(REMOTE_SOURCE)
channel.setcallback(itemcallback, endmarker = None)
channel.send((str(destdir), self._options))
self._channels[channel] = finishedcallback
def send(self, sourcedir):
""" Sends a sourcedir to all added targets.
"""
self._sourcedir = str(sourcedir)
# normalize a trailing '/' away
self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, 'x'))
# send directory structure and file timestamps/sizes
self._send_directory_structure(self._sourcedir)
# paths and to_send are only used for doing
# progress-related callbacks
self._paths = {}
self._to_send = {}
# send modified file to clients
while self._channels:
channel, req = self._receivequeue.get()
if req is None:
# end-of-channel
if channel in self._channels:
# too early! we must have got an error
channel.waitclose()
# or else we raise one
raise IOError('connection unexpectedly closed: %s ' % (
channel.gateway,))
else:
command, data = req
if command == "links":
for link in self._links:
channel.send(link)
# completion marker, this host is done
channel.send(42)
elif command == "done":
finishedcallback = self._channels.pop(channel)
if finishedcallback:
finishedcallback()
elif command == "ack":
if self._callback:
self._callback("ack", self._paths[data], channel)
elif command == "list_done":
# sum up all to send
if self._callback:
s = sum([self._paths[i] for i in self._to_send[channel]])
self._callback("list", s, channel)
elif command == "send":
modified_rel_path, checksum = data
modifiedpath = os.path.join(self._sourcedir, *modified_rel_path)
try:
f = open(modifiedpath, 'rb')
data = f.read()
except IOError:
data = None
# provide info to progress callback function
modified_rel_path = "/".join(modified_rel_path)
if data is not None:
self._paths[modified_rel_path] = len(data)
else:
self._paths[modified_rel_path] = 0
if channel not in self._to_send:
self._to_send[channel] = []
self._to_send[channel].append(modified_rel_path)
if data is not None:
f.close()
if checksum is not None and checksum == md5.md5(data).digest():
data = None # not really modified
else:
# ! there is a reason for the interning:
# sharing multiple copies of the file's data
data = intern(data)
print '%s <= %s' % (
channel.gateway.remoteaddress,
modified_rel_path)
channel.send(data)
del data
else:
assert "Unknown command %s" % command
def _broadcast(self, msg):
for channel in self._channels:
channel.send(msg)
def _send_link(self, basename, linkpoint):
self._links.append(("link", basename, linkpoint))
def _send_directory_structure(self, path):
try:
st = os.lstat(path)
except OSError:
self._broadcast((0, 0))
return
if stat.S_ISREG(st.st_mode):
# regular file: send a timestamp/size pair
self._broadcast((st.st_mtime, st.st_size))
elif stat.S_ISDIR(st.st_mode):
# dir: send a list of entries
names = []
subpaths = []
for name in os.listdir(path):
p = os.path.join(path, name)
if self.filter(p):
names.append(name)
subpaths.append(p)
self._broadcast(names)
for p in subpaths:
self._send_directory_structure(p)
elif stat.S_ISLNK(st.st_mode):
linkpoint = os.readlink(path)
basename = path[len(self._sourcedir) + 1:]
if not linkpoint.startswith(os.sep):
# relative link, just send it
# XXX: do sth with ../ links
self._send_link(basename, linkpoint)
elif linkpoint.startswith(self._sourcedir):
self._send_link(basename, linkpoint[len(self._sourcedir) + 1:])
else:
self._send_link(basename, linkpoint)
self._broadcast(None)
else:
raise ValueError, "cannot sync %r" % (path,)
REMOTE_SOURCE = py.path.local(__file__).dirpath().\
join('rsync_remote.py').open().read() + "\nf()"