test_ok2/py/test/dsession/mypickle.py

176 lines
5.8 KiB
Python

"""
Pickling support for two processes that want to exchange
*immutable* object instances. Immutable in the sense
that the receiving side of an object can modify its
copy but when it sends it back the original sending
side will continue to see its unmodified version
(and no actual state will go over the wire).
This module also implements an experimental
execnet pickling channel using this idea.
"""
from cStringIO import StringIO
from pickle import Pickler, Unpickler
import py
from py.__.execnet.channel import Channel
import os
#debug = open("log-mypickle-%d" % os.getpid(), 'w')
class MyPickler(Pickler):
""" Pickler with a custom memoize()
to take care of unique ID creation.
See the usage in ImmutablePickler
XXX we could probably extend Pickler
and Unpickler classes to directly
update the other'S memos.
"""
def __init__(self, file, protocol, uneven):
Pickler.__init__(self, file, protocol)
self.uneven = uneven
def memoize(self, obj):
if self.fast:
return
assert id(obj) not in self.memo
memo_len = len(self.memo)
key = memo_len * 2 + self.uneven
self.write(self.put(key))
self.memo[id(obj)] = key, obj
class ImmutablePickler:
def __init__(self, uneven, protocol=0):
""" ImmutablePicklers are instantiated in Pairs.
The two sides need to create unique IDs
while pickling their objects. This is
done by using either even or uneven
numbers, depending on the instantiation
parameter.
"""
self._picklememo = {}
self._unpicklememo = {}
self._protocol = protocol
self.uneven = uneven and 1 or 0
def selfmemoize(self, obj):
# this is for feeding objects to ourselfes
# which be the case e.g. if you want to pickle
# from a forked process back to the original
f = StringIO()
pickler = MyPickler(f, self._protocol, uneven=self.uneven)
pickler.memo = self._picklememo
pickler.memoize(obj)
self._updateunpicklememo()
def dumps(self, obj):
f = StringIO()
pickler = MyPickler(f, self._protocol, uneven=self.uneven)
pickler.memo = self._picklememo
pickler.dump(obj)
self._updateunpicklememo()
#print >>debug, "dumped", obj
#print >>debug, "picklememo", self._picklememo
return f.getvalue()
def loads(self, string):
f = StringIO(string)
unpickler = Unpickler(f)
unpickler.memo = self._unpicklememo
res = unpickler.load()
self._updatepicklememo()
#print >>debug, "loaded", res
#print >>debug, "unpicklememo", self._unpicklememo
return res
def _updatepicklememo(self):
for x, obj in self._unpicklememo.items():
self._picklememo[id(obj)] = (int(x), obj)
def _updateunpicklememo(self):
for key,obj in self._picklememo.values():
key = str(key)
if key in self._unpicklememo:
assert self._unpicklememo[key] is obj
self._unpicklememo[key] = obj
NO_ENDMARKER_WANTED = object()
class UnpickleError(Exception):
""" Problems while unpickling. """
def __init__(self, formatted):
self.formatted = formatted
Exception.__init__(self, formatted)
def __str__(self):
return self.formatted
class PickleChannel(object):
""" PickleChannels wrap execnet channels
and allow to send/receive by using
"immutable pickling".
"""
_unpicklingerror = None
def __init__(self, channel):
self._channel = channel
# we use the fact that each side of a
# gateway connection counts with uneven
# or even numbers depending on which
# side it is (for the purpose of creating
# unique ids - which is what we need it here for)
uneven = channel.gateway._channelfactory.count % 2
self._ipickle = ImmutablePickler(uneven=uneven)
self.RemoteError = channel.RemoteError
def send(self, obj):
if not isinstance(obj, Channel):
pickled_obj = self._ipickle.dumps(obj)
self._channel.send(pickled_obj)
else:
self._channel.send(obj)
def receive(self):
pickled_obj = self._channel.receive()
return self._unpickle(pickled_obj)
def _unpickle(self, pickled_obj):
if isinstance(pickled_obj, self._channel.__class__):
return pickled_obj
return self._ipickle.loads(pickled_obj)
def _getremoteerror(self):
return self._unpicklingerror or self._channel._getremoteerror()
def close(self):
return self._channel.close()
def isclosed(self):
return self._channel.isclosed()
def waitclose(self, timeout=None):
return self._channel.waitclose(timeout=timeout)
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
if endmarker is NO_ENDMARKER_WANTED:
def unpickle_callback(pickled_obj):
obj = self._unpickle(pickled_obj)
callback(obj)
self._channel.setcallback(unpickle_callback)
return
uniqueendmarker = object()
def unpickle_callback(pickled_obj):
if pickled_obj is uniqueendmarker:
return callback(endmarker)
try:
obj = self._unpickle(pickled_obj)
except KeyboardInterrupt:
raise
except:
excinfo = py.code.ExceptionInfo()
formatted = str(excinfo.getrepr(showlocals=True,funcargs=True))
self._unpicklingerror = UnpickleError(formatted)
callback(endmarker)
else:
callback(obj)
self._channel.setcallback(unpickle_callback, uniqueendmarker)