Source code for ngsildclient.api.asyn.client

#!/usr/bin/env python3

# Software Name: ngsildclient
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: Apache 2.0
#
# This software is distributed under the Apache 2.0;
# see the NOTICE file for more details.
#
# Author: Fabien BATTELLO <fabien.battello@orange.com> et al.

import logging
import httpx
from httpx._types import AuthTypes
from dataclasses import dataclass
from typing import Optional, Tuple, Generator, List, Union, overload, Callable
from math import ceil

from ...model.entity import Entity
from ..constants import *
from ..exceptions import *
from .entities import Entities
from .batch import BatchOp
from .types import Types
from .contexts import Contexts
from .subscriptions import Subscriptions
from .temporal import Temporal, TemporalResult
from ..helper.temporal import TemporalQuery

logger = logging.getLogger(__name__)

"""This module contains the definition of the AsyncClient class.
"""


[docs]class AsyncClient: """An implementation of the NGSI-LD client API. It allows to connect to a NGSI-LD Context Broker, check the connection and try to identify the vendor. As for now it focuses on the /entities/{entityId} endpoint. Allowed operations are : - create(), update(), upsert() - get(), exists() - delete() Update() and upsert() operations are not atomic, as they aren't provided as-is by the API, but require chaining 2 API calls. When encountering errors the Client throws enriched Exceptions, as NGSI-LD API supports ProblemDetails [IETF RFC 7807]. Example ------- >>> from ngsildclient import * >>> # Here we don't build our own NGSI-LD entity, but grab an example from SmartDatamodels >>> farm = Entity.load(SmartDatamodels.SmartAgri.Agrifood.AgriFarm) >>> # farm.pprint() >>> # Connect to the Context Broker and upsert the entity >>> # As we don't provide any arguments to the Client constructor >>> # It connects to localhost and default port (without any authentication) >>> with Client() as client: >>> client.upsert(farm) """ def __init__( self, hostname: str = "localhost", port: int = NGSILD_DEFAULT_PORT, port_temporal: int = NGSILD_TEMPORAL_PORT, secure: bool = False, useragent: str = UA, tenant: str = None, overwrite: bool = False, ignore_errors: bool = False, proxy: str = None, custom_auth: AuthTypes = None, ): """Create a Client instance to interact with the Context Broker. The Client allows to retrieve or send entities (model.Entity instances) to the Context Broker. For example, one can retrieve an entity from the Context Broker, modify it (update/delete/add properties), and send it back to the Context Broker. Parameters ---------- hostname : str, optional the hostname to connect to, by default "localhost" port : int, optional the TCP port to connect to, by default NGSILD_DEFAULT_PORT secure : bool, optional whether to use TLS, by default False useragent : str, optional the User Agent string sent in the HTTP headers, by default UA tenant : str, optional the tenant string in case you make use of multi-tenancy, by default None overwrite : bool, optional if set create() will behave like upsert(), by default False ignore_errors : bool, optional if set tests the connection at init time and raisesan exception if failed, by default False proxy : str, optional proxies all requests to the provided proxy string (for debugging purpose), by default None See Also -------- api.model.Entity """ self.hostname = hostname self.port = port self.port_temporal = port_temporal self.secure = secure self.scheme = "https" if secure else "http" self.url = f"{self.scheme}://{hostname}:{port}" self.basepath = f"{self.url}/{NGSILD_PATH}" self.useragent = useragent self.tenant = tenant self.overwrite = overwrite self.ignore_errors = ignore_errors self.proxy = proxy headers = { "User-Agent": self.useragent, "Accept": "application/ld+json", # "Content-Type": "application/ld+json", } if tenant is not None: headers["NGSILD-Tenant"] = tenant proxies = {proxy} if proxy else None logger.info("Connecting client ...") self.client = httpx.AsyncClient(auth=custom_auth, headers=headers, proxies=proxies) self._entities = Entities(self, f"{self.url}/{ENDPOINT_ENTITIES}") self._batch = BatchOp(self, f"{self.url}/{ENDPOINT_BATCH}") self._types = Types(self, f"{self.url}/{ENDPOINT_TYPES}") self._contexts = Contexts(self, f"{self.url}/{ENDPOINT_CONTEXTS}") self._subscriptions = Subscriptions(self, f"{self.url}/{ENDPOINT_SUBSCRIPTIONS}") url_temporal = f"{self.scheme}://{hostname}:{port_temporal}" temporal_path = ( f"{url_temporal}/{NGSILD_BASEPATH}/temporal/entities" if port_temporal == port # temporal share the same port and basepath else f"{url_temporal}/temporal/entities" ) self._temporal = Temporal(self, temporal_path)
[docs] def raise_for_status(self, r: httpx.Response): """Raises an exception depending on the API response. Parameters ---------- r : Response Response from the Context Broker """ if not self.ignore_errors: r.raise_for_status()
@property def entities(self): return self._entities @property def batch(self): return self._batch @property def types(self): return self._types @property def contexts(self): return self._contexts @property def subscriptions(self): return self._subscriptions @property def temporal(self): return self._temporal
[docs] async def close(self): """Terminates the client. Closes the underlying httpx.AsyncClient. """ await self.client.aclose()
@overload async def create(self, entity: Entity, skip: bool = False, overwrite: bool = False) -> Entity: """Create an entity. Facade method for Entities.create(). Parameters ---------- entity : Entity the entity to be created by the Context Broker skip : bool, optional if set, skips creation (do nothing) if already exists, by default False overwrite : bool, optional if set, force upsert the entity if already exists, by default False Returns ------- Entity the entity succesfully created """ ... @overload async def create(self, entities: List[Entity], skip: bool = False, overwrite: bool = False): """Create a batch of entities. Facade method for Batch.create(). Parameters ---------- entities : List[Entity] the entity to be created by the Context Broker skip : bool, optional if set, skips creation (do nothing) if already exists, by default False overwrite : bool, optional if set, force upsert the entity if already exists, by default False Returns ------- BatchOperationResult TODO """ ... async def create( self, _entities: Union[Entity, List[Entity]], skip: bool = False, overwrite: bool = False, ) -> Optional[Entity]: if isinstance(_entities, Entity): entity = _entities return await self.entities.create(entity, skip, overwrite) else: return await self.batch.create(_entities, skip, overwrite)
[docs] async def get( self, eid: Union[EntityId, Entity], ctx: str = None, asdict: bool = False, **kwargs, ) -> Entity: """Retrieve an entity given its id. Facade method for Entities.retrieve(). If already dealing with an entity instance one can provide the entity itself instead of its id. Parameters ---------- eid : Union[EntityId, Entity] The entity identifier or the entity instance ctx : str The context asdict : bool, optional If set (instead of returning an Entity) returns the raw API response (a Python dict that represents the JSON response), by default False Returns ------- Entity The retrieved entity """ return await self.entities.get(eid, ctx, asdict, **kwargs)
@overload async def delete(self, eid: Union[EntityId, Entity]) -> bool: """Delete an entity given its id. Facade method for Entities.delete(). If already dealing with an entity instance one can provide the entity itself instead of its id. Parameters ---------- eid : Union[EntityId, Entity] The entity identifier or the entity instance Returns ------- bool True if the entity has been succefully deleted """ ... @overload async def delete(self, eids: List[Union[EntityId, Entity]]) -> bool: """Delete entities given its id. Facade method for Batch.delete(). If already dealing with entity instances one can provide the entities instead of ids. Parameters ---------- eids : List[Union[EntityId, Entity]] The entities ids or instances Returns ------- bool True if the entity has been succefully deleted """ ... async def delete(self, eids: Union[Union[EntityId, Entity], List[Union[EntityId, Entity]]]) -> bool: if isinstance(eids, list): return await self.batch.delete(eids) else: eid = eids return await self.entities.delete(eid)
[docs] async def delete_from_file(self, filename: str) -> Union[Entity, dict]: """Delete in the broker all entities present in the JSON file. Parameters ---------- filename : str Points to the JSON input file that contains entities. """ entities = await Entity.load_async(filename) return await self.delete(entities)
[docs] async def exists(self, eid: Union[EntityId, Entity]) -> bool: """Tests if an entity exists. Facade method for Entities.exists(). If already dealing with an entity instance one can provide the entity itself instead of its id. Parameters ---------- eid : Union[EntityId, Entity] The entity identifier or the entity instance Returns ------- bool True if the entity exists """ return await self.entities.exists(eid)
@overload async def upsert(self, entity: Entity) -> Entity: """Upsert the entity or update it if already exists. Facade method for Entities.upsert(). Parameters ---------- entity : Entity The entity to be upserted by the Context Broker Returns ------- Entity The entity successfully upserted """ ... @overload async def upsert(self, entities: List[Entity]) -> dict: """Upsert a batch of entities. Facade method for Batch.upsert(). Parameters ---------- entity : Entity The entity to be upserted by the Context Broker Returns ------- Entity The entities successfully upserted """ ... async def upsert(self, entities: Union[Entity, List[Entity]]) -> Union[Entity, dict]: if isinstance(entities, Entity): entity = entities return await self.entities.upsert(entity) else: return await self.batch.upsert(entities)
[docs] async def bulk_import(self, filename: str) -> Union[Entity, dict]: """Upsert all entities from a JSON file. Parameters ---------- filename : str Points to the JSON input file that contains entities. """ entities = await Entity.load_async(filename) return await self.upsert(entities)
@overload async def update(self, entity: Entity) -> Optional[Entity]: """Update the entity. Facade method for Entities.update(). Parameters ---------- entity : Entity The entity to be updated by the Context Broker Returns ------- Optional[Entity] The entity successfully updated (or None if not found) """ ... @overload async def update(self, entities: List[Entity]) -> dict: """Update a batch of entities. Facade method for Batch.update(). Parameters ---------- entities : List[Entity] The entities to be updated by the Context Broker Returns ------- Optional[Entity] The entity successfully updated (or None if not found) """ ... async def update(self, entities: Union[Entity, List[Entity]]) -> Union[Optional[Entity], dict]: if isinstance(entities, Entity): entity = entities return await self.entities.update(entity) else: return await self.batch.update(entities)
[docs] async def query_head( self, type: str = None, q: str = None, gq: str = None, ctx: str = None, n: int = 5 ) -> List[Entity]: """Retrieve entities given its type and/or query string. Retrieve up to PAGINATION_LIMIT_MAX entities. Use query_all() to retrieve all entities. Use entities.query() to deal with limit and offset on your own. Parameters ---------- etype : str The entity's type q: str The query string (NGSI-LD Query Language) gq: str The geoquery string (NGSI-LD Geoquery Language) ctx: str The context n: int The first n entities to be retrieved Returns ------- list[Entity] Retrieved entities matching the given type and/or query string Example ------- >>> with AsyncClient() as client: >>> await client.query(type="AgriFarm") # match a given type >>> with AsyncClient() as client: >>> await client.query(type="AgriFarm", q='contactPoint[email]=="wheatfarm@email.com"') # match type and query """ return await self.entities._query(type, q, gq, ctx, limit=n)
[docs] async def query( self, type: str = None, q: str = None, gq: str = None, ctx: str = None, limit: int = PAGINATION_LIMIT_MAX, max: int = 1_000_000, ) -> List[Entity]: """Retrieve entities given its type and/or query string. Retrieve all entities by sending as many requests as needed, using pagination. Assume data hold in memory. Should not be an issue except for very large datasets. Parameters ---------- etype : str The entity's type q: str The query string (NGSI-LD Query Language) gq: str The geoquery string (NGSI-LD Geoquery Language) ctx: str The context limit: int The number of entities retrieved in each request Returns ------- list[Entity] Retrieved entities matching the given type and/or query string Example ------- >>> with AsyncClient() as client: >>> await client.query(type="AgriFarm") # match a given type >>> with AsyncClient() as client: >>> await client.query(type="AgriFarm", q='contactPoint[email]=="wheatfarm@email.com"') # match type and query """ entities: list[Entity] = [] count = await self.entities.count(type, q, gq, ctx=ctx) if count > max: raise NgsiClientTooManyResultsError(f"{count} results exceed maximum {max}") for page in range(ceil(count / limit)): entities.extend(await self.entities._query(type, q, gq, ctx, limit, page * limit)) return entities
[docs] async def query_generator( self, type: str = None, q: str = None, gq: str = None, ctx: str = None, limit: int = PAGINATION_LIMIT_MAX, batch: bool = False, ) -> Generator[Entity, None, None]: """Retrieve (as a generator) entities given its type and/or query string. By returning a generator it allows to process entities on the fly without any risk of exhausting memory. Parameters ---------- etype : str The entity's type q: str The query string (NGSI-LD Query Language) gq: str The geoquery string (NGSI-LD Geoquery Language) ctx: str The context limit: int The number of entities retrieved in each request Returns ------- list[Entity] Retrieved a generator of entities (matching the given type and/or query string) Example ------- >>> with AsyncClient() as client: >>> async for entity in await client.query_handle(type="AgriFarm"): print(entity) """ count = await self.entities.count(type, q, gq) for page in range(ceil(count / limit)): entities = await self.entities._query(type, q, gq, ctx, limit, page * limit) if batch: yield entities else: for entity in entities: yield entity
[docs] async def query_handle( self, type: str = None, q: str = None, gq: str = None, ctx: str = None, limit: int = PAGINATION_LIMIT_MAX, *, callback: Callable[[Entity], None], ) -> None: """Apply a callback function on entity of the query result. Parameters ---------- etype : str The entity's type q: str The query string (NGSI-LD Query Language) gq: str The geoquery string (NGSI-LD Geoquery Language) ctx: str The context limit: int The number of entities retrieved in each request callback: Callable[Entity] The function to be called on each entity of the result Example ------- >>> with AsyncClient() as client: >>> await client.query_handle(type="AgriFarm", lambda e: print(e)) """ async for entity in self.query_generator(type, q, gq, ctx, limit, False): callback(entity)
[docs] async def count(self, type: str = None, q: str = None, gq: str = None) -> int: """Return number of entities matching type and/or query string. Facade method for Entities.count(). Parameters ---------- etype : str The entity's type q: str The query string (NGSI-LD Query Language) gq: str The geoquery string (NGSI-LD Geoquery Language) Returns ------- int The number of matching entities Example ------- >>> with AsyncClient() as client: >>> await client.count(type="AgriFarm") # match a given type >>> with AsyncClient() as client: >>> await client.count(type="AgriFarm", query='contactPoint[email]=="wheatfarm@email.com"') # match type and query """ return await self.entities.count(type, q)
[docs] async def delete_where(self, type: str = None, q: str = None, **kwargs): """Batch delete entities matching type and/or query string. Parameters ---------- etype : str The entity's type query: str The query string (NGSI-LD Query Language) Example ------- >>> with AsyncClient() as client: >>> await client.delete_where(type="AgriFarm", query='contactPoint[email]=="wheatfarm@email.com"') # match type and query """ g = self.query_generator(type, q, batch=True, **kwargs) async for batch in g: await self.batch.delete(batch)
[docs] async def drop(self, *types: str) -> None: """Batch delete entities matching the given type. Parameters ---------- type : str The entity's type Example ------- >>> with AsyncClient() as client: >>> await client.drop("AgriFarm") """ for t in types: await self.delete_where(type=t)
[docs] async def purge(self) -> None: """Batch delete all entities. Example ------- >>> with AsyncClient() as client: >>> await client.purge() """ for type in await self.types.list(): await self.drop(type)
[docs] async def flush_all(self) -> None: """Batch delete all entities and remove all contexts. Example ------- >>> with AsyncClient() as client: >>> await client.purge() """ await self.purge(type) await self.contexts.cleanup()
# below the context manager methods async def __aenter__(self): return self async def __aexit__(self, type, value, traceback): await self.close()