Source code for ngsildclient.model.ngsidict

#!/usr/bin/env python3

# Software Name: ngsildclient
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: Apache 2.0
#
# This software is distributed under the Apache 2.0;
# see the NOTICE file for more details.
#
# Author: Fabien BATTELLO <fabien.battello@orange.com> et al.

from __future__ import annotations

from typing import Any, Union, List
from functools import reduce
from datetime import datetime
from geojson import Point, LineString, Polygon, MultiPoint

import ngsildclient.model.entity as entity
from ..utils import iso8601, url
from ..utils.urn import Urn
from .constants import *
from .exceptions import *

import json
import operator

"""This module contains the definition of the NgsiDict class.
"""


[docs]class NgsiDict(dict): """This class is a custom dictionary that backs an entity. NgsiDict is used to build and hold the entity properties, as well as the entity's root. It's not exposed to the user but intended to be used by the Entity class. NgsiDict provides methods that allow to build a dictionary compliant with a NGSI-LD structure. See Also -------- model.Entity """ def __init__(self, *args, dtcached: datetime = None, **kwargs): super().__init__(*args, **kwargs) self._dtcached: datetime = dtcached if dtcached else iso8601.utcnow() @classmethod def _from_json(cls, payload: str): d = json.loads(payload) return cls(d) def _cachedate(self, dt: datetime): self._dtcached = dt def _dateauto(self): if self._dtcached is None: self._dtcached = iso8601.utcnow() return self._dtcached def __getitem__(self, element: str): return reduce(dict.__getitem__, element.split("."), self) def __delitem__(self, element: str): try: nested, k = element.rsplit(".", 1) except ValueError: dict.__delitem__(self, element) else: dict.__delitem__(self[nested], k) def __setitem__(self, key: str, value: Any): try: nested, k = key.rsplit(".", 1) except ValueError: dict.__setitem__(self, key, value) else: dict.__setitem__(self[nested], k, value) @classmethod def _load(cls, filename: str): with open(filename, "r") as fp: d = json.load(fp) return cls(d)
[docs] def to_json(self, indent=None) -> str: """Returns the dict in json format""" return json.dumps(self, default=str, ensure_ascii=False, indent=indent)
[docs] def pprint(self, *args, **kwargs) -> None: """Returns the dict pretty-json-formatted""" print(self.to_json(indent=2, *args, **kwargs))
def _save(self, filename: str, indent=2): with open(filename, "w") as fp: json.dump(self, fp, default=str, ensure_ascii=False, indent=indent) def _process_observedat(self, observedat): if observedat is Auto: observedat = self._dateauto() date_str, temporaltype, dt = iso8601.parse(observedat) if temporaltype != TemporalType.DATETIME: raise NgsiDateFormatError(f"observedAt must be a DateTime : {date_str}") self._cachedate(dt) return date_str def _build_property( self, value: Any, unitcode: str = None, observedat: Union[str, datetime, type[Auto]] = None, datasetid: str = None, userdata: NgsiDict = None, escape: bool = False, ) -> NgsiDict: property: NgsiDict = NgsiDict() property["type"] = AttrType.PROP.value # set type if isinstance(value, (int, float, bool, list, dict)): v = value elif isinstance(value, str): v = url.escape(value) if escape else value else: raise NgsiUnmatchedAttributeTypeError(f"Cannot map {type(value)} to NGSI type. {value=}") property["value"] = v # set value if unitcode is not None: property[META_ATTR_UNITCODE] = unitcode if observedat is not None: property[META_ATTR_OBSERVED_AT] = self._process_observedat(observedat) if datasetid is not None: property[META_ATTR_DATASET_ID] = Urn.prefix(datasetid) if userdata: property |= userdata return property def prop(self, name: str, value: str, **kwargs): self[name] = self._build_property(value, **kwargs) return self[name] def _build_geoproperty( self, value: NgsiGeometry, observedat: Union[str, datetime, type[Auto]] = None, datasetid: str = None, ) -> NgsiDict: property: NgsiDict = NgsiDict() property["type"] = AttrType.GEO.value # set type if isinstance(value, (Point, LineString, Polygon, MultiPoint)): geometry = value elif isinstance(value, tuple) and len(value) == 2: # simple way for a location Point lat, lon = value geometry = Point((lon, lat)) else: raise NgsiUnmatchedAttributeTypeError(f"Cannot map {type(value)} to NGSI type. {value=}") property["value"] = geometry # set value if observedat is not None: property[META_ATTR_OBSERVED_AT] = self._process_observedat(observedat) if datasetid is not None: property[META_ATTR_DATASET_ID] = Urn.prefix(datasetid) return property def gprop(self, name: str, value: str, **kwargs): self[name] = self._build_geoproperty(value, **kwargs) return self[name] def _build_temporal_property(self, value: Union[NgsiDate, type[Auto]]) -> NgsiDict: property: NgsiDict = NgsiDict() property["type"] = AttrType.TEMPORAL.value # set type if value is Auto: value = self._dateauto() date_str, temporaltype, dt = iso8601.parse(value) v = { "@type": temporaltype.value, "@value": date_str, } property["value"] = v # set value self._cachedate(dt) return property def tprop(self, name: str, value: str, **kwargs): self[name] = self._build_temporal_property(value, **kwargs) return self[name] def _build_relationship( self, value: Union[str, List[str]], observedat: Union[str, datetime, type[Auto]] = None, datasetid: str = None, userdata: NgsiDict = None, ) -> NgsiDict: property: NgsiDict = NgsiDict() property["type"] = AttrType.REL.value # set types property["object"] = Urn.prefix(value.id) if isinstance(value, entity.Entity) else Urn.prefix(value) if observedat is not None: property[META_ATTR_OBSERVED_AT] = self._process_observedat(observedat) if datasetid is not None: property[META_ATTR_DATASET_ID] = Urn.prefix(datasetid) if userdata: property |= userdata return property def _m_build_relationship( # Multiple Relationship limitation : no metadata self, value: List[str], observedat: Union[str, datetime, type[Auto]] = None, datasetid: str = None, userdata: NgsiDict = None, ) -> NgsiDict: property: List[NgsiDict] = [] if isinstance(observedat, List): if len(observedat) != len(value): raise ValueError("Missing observedAt attributes") else: observedat = [observedat] * len(value) if isinstance(datasetid, List): if len(datasetid) != len(value): raise ValueError("Missing datasetId attributes") else: datasetid = [datasetid] * len(value) if isinstance(userdata, List): if len(userdata) != len(value): raise ValueError("Missing userdata attributes") else: userdata = [userdata] * len(value) for x in zip(value, observedat, datasetid, userdata): property.append(self._build_relationship(*x)) return property def rel(self, name: str, value: str, **kwargs): if isinstance(name, Rel): name = name.value self[name] = self._build_relationship(value, **kwargs) return self[name]