Source code for ngsildclient.api.entities

#!/usr/bin/env python3

# Software Name: ngsildclient
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: Apache 2.0
#
# This software is distributed under the Apache 2.0;
# see the NOTICE file for more details.
#
# Author: Fabien BATTELLO <fabien.battello@orange.com> et al.

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Union, List

import logging

if TYPE_CHECKING:
    from .client import Client

from .constants import ENDPOINT_ENTITIES, EntityId, JSONLD_CONTEXT
from .exceptions import NgsiAlreadyExistsError, NgsiApiError, rfc7807_error_handle
from ..model.entity import Entity


logger = logging.getLogger(__name__)


[docs]class Entities: """A wrapper for the NGSI-LD API entities endpoint.""" def __init__(self, client: Client, url: str): self._client = client self._session = client.session self.url = url def to_broker_url(self, eid: Union[EntityId, Entity]) -> str: eid = eid.id if isinstance(eid, Entity) else eid return f"http://{self._client.hostname}:{self._client.port}/{ENDPOINT_ENTITIES}/{eid}" @rfc7807_error_handle def create(self, entity: Entity, skip: bool = False, overwrite: bool = False) -> Optional[Entity]: r = self._session.post( f"{self.url}/", json=entity._payload, ) if r.status_code == 409: # already exists if skip: return None elif overwrite or self._client.overwrite: return self.update(entity, check_exists=False) self._client.raise_for_status(r) location = r.headers.get("Location") if location is None: if self._client.ignore_errors: return None else: raise NgsiApiError("Missing Location header") logger.info(f"{r.status_code=}") logger.info(f"{location=}") id_returned_from_broker = location.rsplit("/", 1)[-1] if entity.id != id_returned_from_broker: raise NgsiApiError(f"Broker returned wrong id. Expected={entity.id} Returned={id_returned_from_broker}") return entity @rfc7807_error_handle def get( self, eid: Union[EntityId, Entity], ctx: str = None, asdict: bool = False, **kwargs, ) -> Entity: eid = eid.id if isinstance(eid, Entity) else eid headers = { "Accept": "application/ld+json", "Content-Type": None, } # overrides session headers if ctx is not None: headers["Link"] = f'<{ctx}>; rel="{JSONLD_CONTEXT}"; type="application/ld+json"' r = self._session.get(f"{self.url}/{eid}", headers=headers, **kwargs) self._client.raise_for_status(r) return r.json() if asdict else Entity.from_dict(r.json()) @rfc7807_error_handle def delete(self, eid: Union[EntityId, Entity]) -> bool: eid = eid.id if isinstance(eid, Entity) else eid logger.info(f"{eid=}") logger.info(f"url={self.url}/{eid}") r = self._session.delete(f"{self.url}/{eid}") logger.info(f"requests: {r.request.url}") self._client.raise_for_status(r) return bool(r) @rfc7807_error_handle def exists(self, eid: Union[EntityId, Entity]) -> bool: eid = eid.id if isinstance(eid, Entity) else eid r = self._session.get(f"{self.url}/{eid}") if r: payload = r.json() return "@context" in payload return False @rfc7807_error_handle def upsert(self, entity: Entity) -> Entity: try: return self.create(entity) except NgsiAlreadyExistsError: self.delete(entity) return self.create(entity) @rfc7807_error_handle def update(self, entity: Entity, check_exists: bool = True) -> Optional[Entity]: if check_exists and self.exists(entity): self.delete(entity) return self.create(entity) return None @rfc7807_error_handle def _query( self, type: str = None, q: str = None, gq: str = None, ctx: str = None, limit: int = 0, offset: int = 0 ) -> List[Entity]: params = {} if limit != 0: params |= {"limit": limit} if offset != 0: params |= {"offset": offset} if type is None and q is None: raise ValueError("Must indicate at least a type or a query string") if type: params["type"] = type if q: params["q"] = q if gq: params["geoQ"] = gq headers = { "Accept": "application/ld+json", "Content-Type": None, } # overrides session headers if ctx is not None: headers["Link"] = f'<{ctx}>; rel="{JSONLD_CONTEXT}"; type="application/ld+json"' r = self._session.get( self.url, headers=headers, params=params, ) self._client.raise_for_status(r) entities = r.json() logger.debug(f"{entities=}") return [Entity.from_dict(entity) for entity in entities] @rfc7807_error_handle def count(self, type: str = None, q: str = None, gq: str = None, ctx: str = None) -> int: params = {"limit": 0, "count": "true"} if type is None and q is None: raise ValueError("Must indicate at least a type or a query string") if type: params["type"] = type if q: params["q"] = q if gq: params["geoQ"] = gq headers = { "Accept": "application/json", "Content-Type": None, } # overrides session headers if ctx is not None: headers["Link"] = f'<{ctx}>; rel="{JSONLD_CONTEXT}"; type="application/ld+json"' r = self._session.get( self.url, headers=headers, params=params, ) self._client.raise_for_status(r) count = int(r.headers["NGSILD-Results-Count"]) return count