""" A Python "serializer". Doesn't do much serializing per se -- just converts to and from basic Python data types (lists, dicts, strings, etc.). Useful as a basis for other serializers. """ from collections import OrderedDict from django.apps import apps from django.core.serializers import base from django.db import DEFAULT_DB_ALIAS, models from django.utils.encoding import is_protected_type class Serializer(base.Serializer): """ Serialize a QuerySet to basic Python objects. """ internal_use_only = True def start_serialization(self): self._current = None self.objects = [] def end_serialization(self): pass def start_object(self, obj): self._current = OrderedDict() def end_object(self, obj): self.objects.append(self.get_dump_object(obj)) self._current = None def get_dump_object(self, obj): data = OrderedDict([('model', str(obj._meta))]) if not self.use_natural_primary_keys or not hasattr(obj, 'natural_key'): data["pk"] = self._value_from_field(obj, obj._meta.pk) data['fields'] = self._current return data def _value_from_field(self, obj, field): value = field.value_from_object(obj) # Protected types (i.e., primitives like None, numbers, dates, # and Decimals) are passed through as is. All other values are # converted to string first. return value if is_protected_type(value) else field.value_to_string(obj) def handle_field(self, obj, field): self._current[field.name] = self._value_from_field(obj, field) def handle_fk_field(self, obj, field): if self.use_natural_foreign_keys and hasattr(field.remote_field.model, 'natural_key'): related = getattr(obj, field.name) if related: value = related.natural_key() else: value = None else: value = self._value_from_field(obj, field) self._current[field.name] = value def handle_m2m_field(self, obj, field): if field.remote_field.through._meta.auto_created: if self.use_natural_foreign_keys and hasattr(field.remote_field.model, 'natural_key'): def m2m_value(value): return value.natural_key() else: def m2m_value(value): return self._value_from_field(value, value._meta.pk) self._current[field.name] = [ m2m_value(related) for related in getattr(obj, field.name).iterator() ] def getvalue(self): return self.objects def Deserializer(object_list, *, using=DEFAULT_DB_ALIAS, ignorenonexistent=False, **options): """ Deserialize simple Python objects back into Django ORM instances. It's expected that you pass the Python objects themselves (instead of a stream or a string) to the constructor """ field_names_cache = {} # Model: for d in object_list: # Look up the model and starting build a dict of data for it. try: Model = _get_model(d["model"]) except base.DeserializationError: if ignorenonexistent: continue else: raise data = {} if 'pk' in d: try: data[Model._meta.pk.attname] = Model._meta.pk.to_python(d.get('pk')) except Exception as e: raise base.DeserializationError.WithData(e, d['model'], d.get('pk'), None) m2m_data = {} if Model not in field_names_cache: field_names_cache[Model] = {f.name for f in Model._meta.get_fields()} field_names = field_names_cache[Model] # Handle each field for (field_name, field_value) in d["fields"].items(): if ignorenonexistent and field_name not in field_names: # skip fields no longer on model continue field = Model._meta.get_field(field_name) # Handle M2M relations if field.remote_field and isinstance(field.remote_field, models.ManyToManyRel): model = field.remote_field.model if hasattr(model._default_manager, 'get_by_natural_key'): def m2m_convert(value): if hasattr(value, '__iter__') and not isinstance(value, str): return model._default_manager.db_manager(using).get_by_natural_key(*value).pk else: return model._meta.pk.to_python(value) else: def m2m_convert(v): return model._meta.pk.to_python(v) try: m2m_data[field.name] = [] for pk in field_value: m2m_data[field.name].append(m2m_convert(pk)) except Exception as e: raise base.DeserializationError.WithData(e, d['model'], d.get('pk'), pk) # Handle FK fields elif field.remote_field and isinstance(field.remote_field, models.ManyToOneRel): model = field.remote_field.model if field_value is not None: try: default_manager = model._default_manager field_name = field.remote_field.field_name if hasattr(default_manager, 'get_by_natural_key'): if hasattr(field_value, '__iter__') and not isinstance(field_value, str): obj = default_manager.db_manager(using).get_by_natural_key(*field_value) value = getattr(obj, field.remote_field.field_name) # If this is a natural foreign key to an object that # has a FK/O2O as the foreign key, use the FK value if model._meta.pk.remote_field: value = value.pk else: value = model._meta.get_field(field_name).to_python(field_value) data[field.attname] = value else: data[field.attname] = model._meta.get_field(field_name).to_python(field_value) except Exception as e: raise base.DeserializationError.WithData(e, d['model'], d.get('pk'), field_value) else: data[field.attname] = None # Handle all other fields else: try: data[field.name] = field.to_python(field_value) except Exception as e: raise base.DeserializationError.WithData(e, d['model'], d.get('pk'), field_value) obj = base.build_instance(Model, data, using) yield base.DeserializedObject(obj, m2m_data) def _get_model(model_identifier): """Look up a model from an "app_label.model_name" string.""" try: return apps.get_model(model_identifier) except (LookupError, TypeError): raise base.DeserializationError("Invalid model identifier: '%s'" % model_identifier)