diff --git a/py/execnet/rsync.py b/py/execnet/rsync.py index 3baab2494..c4979feb8 100644 --- a/py/execnet/rsync.py +++ b/py/execnet/rsync.py @@ -38,6 +38,67 @@ class RSync(object): channel.send((str(destdir), self._options)) self._channels[channel] = finishedcallback + def _end_of_channel(self, channel): + if channel in self._channels: + # too early! we must have got an error + channel.waitclose() + # or else we raise one + raise IOError('connection unexpectedly closed: %s ' % ( + channel.gateway,)) + + def _process_link(self, channel): + for link in self._links: + channel.send(link) + # completion marker, this host is done + channel.send(42) + + def _done(self, channel): + """ Call all callbacks + """ + finishedcallback = self._channels.pop(channel) + if finishedcallback: + finishedcallback() + + def _list_done(self, channel): + # sum up all to send + if self._callback: + s = sum([self._paths[i] for i in self._to_send[channel]]) + self._callback("list", s, channel) + + def _send_item(self, channel, data): + """ Send one item + """ + modified_rel_path, checksum = data + modifiedpath = os.path.join(self._sourcedir, *modified_rel_path) + try: + f = open(modifiedpath, 'rb') + data = f.read() + except IOError: + data = None + + # provide info to progress callback function + modified_rel_path = "/".join(modified_rel_path) + if data is not None: + self._paths[modified_rel_path] = len(data) + else: + self._paths[modified_rel_path] = 0 + if channel not in self._to_send: + self._to_send[channel] = [] + self._to_send[channel].append(modified_rel_path) + + if data is not None: + f.close() + if checksum is not None and checksum == md5.md5(data).digest(): + data = None # not really modified + else: + # ! there is a reason for the interning: + # sharing multiple copies of the file's data + data = intern(data) + print '%s <= %s' % ( + channel.gateway.remoteaddress, + modified_rel_path) + channel.send(data) + def send(self, sourcedir): """ Sends a sourcedir to all added targets. """ @@ -56,63 +117,20 @@ class RSync(object): while self._channels: channel, req = self._receivequeue.get() if req is None: - # end-of-channel - if channel in self._channels: - # too early! we must have got an error - channel.waitclose() - # or else we raise one - raise IOError('connection unexpectedly closed: %s ' % ( - channel.gateway,)) + self._end_of_channel(channel) else: command, data = req if command == "links": - for link in self._links: - channel.send(link) - # completion marker, this host is done - channel.send(42) + self._process_link(channel) elif command == "done": - finishedcallback = self._channels.pop(channel) - if finishedcallback: - finishedcallback() + self._done(channel) elif command == "ack": if self._callback: self._callback("ack", self._paths[data], channel) elif command == "list_done": - # sum up all to send - if self._callback: - s = sum([self._paths[i] for i in self._to_send[channel]]) - self._callback("list", s, channel) + self._list_done(channel) elif command == "send": - modified_rel_path, checksum = data - modifiedpath = os.path.join(self._sourcedir, *modified_rel_path) - try: - f = open(modifiedpath, 'rb') - data = f.read() - except IOError: - data = None - - # provide info to progress callback function - modified_rel_path = "/".join(modified_rel_path) - if data is not None: - self._paths[modified_rel_path] = len(data) - else: - self._paths[modified_rel_path] = 0 - if channel not in self._to_send: - self._to_send[channel] = [] - self._to_send[channel].append(modified_rel_path) - - if data is not None: - f.close() - if checksum is not None and checksum == md5.md5(data).digest(): - data = None # not really modified - else: - # ! there is a reason for the interning: - # sharing multiple copies of the file's data - data = intern(data) - print '%s <= %s' % ( - channel.gateway.remoteaddress, - modified_rel_path) - channel.send(data) + self._send_item(channel, data) del data else: assert "Unknown command %s" % command @@ -124,6 +142,32 @@ class RSync(object): def _send_link(self, basename, linkpoint): self._links.append(("link", basename, linkpoint)) + def _send_directory(self, path): + # dir: send a list of entries + names = [] + subpaths = [] + for name in os.listdir(path): + p = os.path.join(path, name) + if self.filter(p): + names.append(name) + subpaths.append(p) + self._broadcast(names) + for p in subpaths: + self._send_directory_structure(p) + + def _send_link_structure(self, path): + linkpoint = os.readlink(path) + basename = path[len(self._sourcedir) + 1:] + if not linkpoint.startswith(os.sep): + # relative link, just send it + # XXX: do sth with ../ links + self._send_link(basename, linkpoint) + elif linkpoint.startswith(self._sourcedir): + self._send_link(basename, linkpoint[len(self._sourcedir) + 1:]) + else: + self._send_link(basename, linkpoint) + self._broadcast(None) + def _send_directory_structure(self, path): try: st = os.lstat(path) @@ -134,29 +178,9 @@ class RSync(object): # regular file: send a timestamp/size pair self._broadcast((st.st_mtime, st.st_size)) elif stat.S_ISDIR(st.st_mode): - # dir: send a list of entries - names = [] - subpaths = [] - for name in os.listdir(path): - p = os.path.join(path, name) - if self.filter(p): - names.append(name) - subpaths.append(p) - self._broadcast(names) - for p in subpaths: - self._send_directory_structure(p) + self._send_directory(path) elif stat.S_ISLNK(st.st_mode): - linkpoint = os.readlink(path) - basename = path[len(self._sourcedir) + 1:] - if not linkpoint.startswith(os.sep): - # relative link, just send it - # XXX: do sth with ../ links - self._send_link(basename, linkpoint) - elif linkpoint.startswith(self._sourcedir): - self._send_link(basename, linkpoint[len(self._sourcedir) + 1:]) - else: - self._send_link(basename, linkpoint) - self._broadcast(None) + self._send_link_structure(path) else: raise ValueError, "cannot sync %r" % (path,)