#!/usr/bin/env python3
# Software Name: ngsildclient
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: Apache 2.0
#
# This software is distributed under the Apache 2.0;
# see the NOTICE file for more details.
#
# Author: Fabien BATTELLO <fabien.battello@orange.com> et al.
import logging
import requests
from requests.auth import AuthBase
from dataclasses import dataclass
from typing import Optional, Tuple, Generator, List, Union, overload, Callable
from math import ceil
from ..utils import is_interactive
from ..model.entity import Entity
from .constants import *
from .entities import Entities
from .batch import BatchOp
from .types import Types
from .contexts import Contexts
from .subscriptions import Subscriptions
from .temporal import Temporal, TemporalResult
from .exceptions import *
from .helper.temporal import TemporalQuery
logger = logging.getLogger(__name__)
"""This module contains the definition of the Client class.
"""
[docs]@dataclass
class Broker:
"""Represent a NGSI-LD Context Broker."""
vendor: Vendor = Vendor.UNKNOWN
version: str = "N/A"
[docs]class Client:
"""An implementation of the NGSI-LD client API.
It allows to connect to a NGSI-LD Context Broker, check the connection and
try to identify the vendor.
As for now it focuses on the /entities/{entityId} endpoint.
Allowed operations are :
- create(), update(), upsert()
- get(), exists()
- delete()
Update() and upsert() operations are not atomic, as they
aren't provided as-is by the API, but require chaining 2 API calls.
When encountering errors the Client throws enriched
Exceptions, as NGSI-LD API supports ProblemDetails [IETF RFC 7807].
Example
-------
>>> from ngsildclient import *
>>> # Here we don't build our own NGSI-LD entity, but grab an example from SmartDatamodels
>>> farm = Entity.load(SmartDatamodels.SmartAgri.Agrifood.AgriFarm)
>>> # farm.pprint()
>>> # Connect to the Context Broker and upsert the entity
>>> # As we don't provide any arguments to the Client constructor
>>> # It connects to localhost and default port (without any authentication)
>>> with Client() as client:
>>> client.upsert(farm)
"""
def __init__(
self,
hostname: str = "localhost",
port: int = NGSILD_DEFAULT_PORT,
port_temporal: int = NGSILD_TEMPORAL_PORT,
secure: bool = False,
useragent: str = UA,
tenant: str = None,
overwrite: bool = False,
ignore_errors: bool = False,
proxy: str = None,
custom_auth: AuthBase = None,
):
"""Create a Client instance to interact with the Context Broker.
The Client allows to retrieve or send entities
(model.Entity instances) to the Context Broker.
For example, one can retrieve an entity from the Context Broker,
modify it (update/delete/add properties),
and send it back to the Context Broker.
Parameters
----------
hostname : str, optional
the hostname to connect to, by default "localhost"
port : int, optional
the TCP port to connect to, by default NGSILD_DEFAULT_PORT
secure : bool, optional
whether to use TLS, by default False
useragent : str, optional
the User Agent string sent in the HTTP headers, by default UA
tenant : str, optional
the tenant string in case you make use of multi-tenancy, by default None
overwrite : bool, optional
if set create() will behave like upsert(), by default False
ignore_errors : bool, optional
if set tests the connection at init time and raises an exception if failed, by default False
proxy : str, optional
proxies all requests to the provided proxy string (for debugging purpose), by default None
See Also
--------
api.model.Entity
"""
self.hostname = hostname
self.port = port
self.port_temporal = port_temporal
self.secure = secure
self.scheme = "https" if secure else "http"
self.url = f"{self.scheme}://{hostname}:{port}"
self.basepath = f"{self.url}/{NGSILD_PATH}"
self.useragent = useragent
self.tenant = tenant
self.overwrite = overwrite
self.ignore_errors = ignore_errors
self.proxy = proxy
self.session = requests.Session()
if custom_auth:
self.session.auth = custom_auth
self.session.headers = {
"User-Agent": self.useragent,
"Accept": "application/ld+json",
"Content-Type": "application/ld+json",
}
if tenant is not None:
self.session.headers["NGSILD-Tenant"] = tenant
if proxy:
self.session.proxies = {proxy}
logger.info("Connecting client ...")
self._entities = Entities(self, f"{self.url}/{ENDPOINT_ENTITIES}")
self._batch = BatchOp(self, f"{self.url}/{ENDPOINT_BATCH}")
self._types = Types(self, f"{self.url}/{ENDPOINT_TYPES}")
self._contexts = Contexts(self, f"{self.url}/{ENDPOINT_CONTEXTS}")
self._subscriptions = Subscriptions(self, f"{self.url}/{ENDPOINT_SUBSCRIPTIONS}")
url_temporal = f"{self.scheme}://{hostname}:{port_temporal}"
temporal_path = (
f"{url_temporal}/{NGSILD_BASEPATH}/temporal/entities"
if port_temporal == port # temporal share the same port and basepath
else f"{url_temporal}/temporal/entities"
)
self._temporal = Temporal(self, temporal_path)
self.broker = Broker(Vendor.UNKNOWN, "N/A")
# get status and retrieve Context Broker information
status = self.is_connected(raise_for_disconnected=True)
if status:
self.broker = Broker(*self.guess_vendor())
if is_interactive():
print(self._welcome_message())
else:
if is_interactive():
print(self._fail_message())
[docs] def raise_for_status(self, r: Response):
"""Raises an exception depending on the API response.
Parameters
----------
r : Response
Response from the Context Broker
"""
if not self.ignore_errors:
r.raise_for_status()
[docs] def is_connected(self, raise_for_disconnected=False) -> bool:
"""Test if connection to Context Broker is established.
Send a valid test request to the Context Broker and expects a result.
Parameters
----------
raise_for_disconnected : bool, optional
throw an exception if connection fails (if not set only return False), by default False
Returns
-------
bool
True if the Context Broker replies
Raises
------
NgsiNotConnectedError
"""
url = f"{self.url}/{ENDPOINT_ENTITIES}"
params = {"type": "None", "limit": 0, "count": "true"}
try:
r = self.session.get(
url,
headers={
"Accept": "application/json",
"Content-Type": None,
}, # overrides session headers
params=params,
)
r.raise_for_status()
except Exception as e:
if is_interactive():
return False
if raise_for_disconnected:
raise NgsiNotConnectedError(f"Cannot connect to Context Broker at {self.hostname}:{self.port}: {e}")
else:
logger.error(e)
return False
return True
@property
def version(self) -> str:
return __version__
@property
def entities(self):
return self._entities
@property
def batch(self):
return self._batch
@property
def types(self):
return self._types
@property
def contexts(self):
return self._contexts
@property
def subscriptions(self):
return self._subscriptions
@property
def temporal(self):
return self._temporal
[docs] def close(self):
"""Terminates the client.
Closes the underlying Requests.Session.
"""
self.session.close()
@overload
def create(self, entity: Entity, skip: bool = False, overwrite: bool = False) -> Entity:
"""Create an entity.
Facade method for Entities.create().
Parameters
----------
entity : Entity
the entity to be created by the Context Broker
skip : bool, optional
if set, skips creation (do nothing) if already exists, by default False
overwrite : bool, optional
if set, force upsert the entity if already exists, by default False
Returns
-------
Entity
the entity succesfully created
"""
...
@overload
def create(self, entities: List[Entity], skip: bool = False, overwrite: bool = False):
"""Create a batch of entities.
Facade method for Batch.create().
Parameters
----------
entities : List[Entity]
the entity to be created by the Context Broker
skip : bool, optional
if set, skips creation (do nothing) if already exists, by default False
overwrite : bool, optional
if set, force upsert the entity if already exists, by default False
Returns
-------
BatchOperationResult
TODO
"""
...
def create(
self,
_entities: Union[Entity, List[Entity]],
skip: bool = False,
overwrite: bool = False,
) -> Optional[Entity]:
if isinstance(_entities, Entity):
entity = _entities
return self.entities.create(entity, skip, overwrite)
else:
return self.batch.create(_entities, skip, overwrite)
[docs] def get(
self,
eid: Union[EntityId, Entity],
ctx: str = None,
asdict: bool = False,
**kwargs,
) -> Entity:
"""Retrieve an entity given its id.
Facade method for Entities.retrieve().
If already dealing with an entity instance one can provide the entity itself instead of its id.
Parameters
----------
eid : Union[EntityId, Entity]
The entity identifier or the entity instance
ctx : str
The context
asdict : bool, optional
If set (instead of returning an Entity) returns the raw API response (a Python dict that represents the JSON response), by default False
Returns
-------
Entity
The retrieved entity
"""
return self.entities.get(eid, ctx, asdict, **kwargs)
@overload
def delete(self, eid: Union[EntityId, Entity]) -> bool:
"""Delete an entity given its id.
Facade method for Entities.delete().
If already dealing with an entity instance one can provide the entity itself instead of its id.
Parameters
----------
eid : Union[EntityId, Entity]
The entity identifier or the entity instance
Returns
-------
bool
True if the entity has been succefully deleted
"""
...
@overload
def delete(self, eids: List[Union[EntityId, Entity]]) -> bool:
"""Delete entities given its id.
Facade method for Batch.delete().
If already dealing with entity instances one can provide the entities instead of ids.
Parameters
----------
eids : List[Union[EntityId, Entity]]
The entities ids or instances
Returns
-------
bool
True if the entity has been succefully deleted
"""
...
def delete(self, eids: Union[Union[EntityId, Entity], List[Union[EntityId, Entity]]]) -> bool:
if isinstance(eids, list):
return self.batch.delete(eids)
else:
eid = eids
return self.entities.delete(eid)
[docs] def delete_from_file(self, filename: str) -> Union[Entity, dict]:
"""Delete in the broker all entities present in the JSON file.
Parameters
----------
filename : str
Points to the JSON input file that contains entities.
"""
entities = Entity.load(filename)
return self.delete(entities)
[docs] def exists(self, eid: Union[EntityId, Entity]) -> bool:
"""Tests if an entity exists.
Facade method for Entities.exists().
If already dealing with an entity instance one can provide the entity itself instead of its id.
Parameters
----------
eid : Union[EntityId, Entity]
The entity identifier or the entity instance
Returns
-------
bool
True if the entity exists
"""
return self.entities.exists(eid)
@overload
def upsert(self, entity: Entity) -> Entity:
"""Upsert the entity or update it if already exists.
Facade method for Entities.upsert().
Parameters
----------
entity : Entity
The entity to be upserted by the Context Broker
Returns
-------
Entity
The entity successfully upserted
"""
...
@overload
def upsert(self, entities: List[Entity]) -> dict:
"""Upsert a batch of entities.
Facade method for Batch.upsert().
Parameters
----------
entity : Entity
The entity to be upserted by the Context Broker
Returns
-------
Entity
The entities successfully upserted
"""
...
def upsert(self, entities: Union[Entity, List[Entity]]) -> Union[Entity, dict]:
if isinstance(entities, Entity):
entity = entities
return self.entities.upsert(entity)
else:
return self.batch.upsert(entities)
[docs] def bulk_import(self, filename: str) -> Union[Entity, dict]:
"""Upsert all entities from a JSON file.
Parameters
----------
filename : str
Points to the JSON input file that contains entities.
"""
entities = Entity.load(filename)
return self.upsert(entities)
@overload
def update(self, entity: Entity) -> Optional[Entity]:
"""Update the entity.
Facade method for Entities.update().
Parameters
----------
entity : Entity
The entity to be updated by the Context Broker
Returns
-------
Optional[Entity]
The entity successfully updated (or None if not found)
"""
...
@overload
def update(self, entities: List[Entity]) -> dict:
"""Update a batch of entities.
Facade method for Batch.update().
Parameters
----------
entities : List[Entity]
The entities to be updated by the Context Broker
Returns
-------
Optional[Entity]
The entity successfully updated (or None if not found)
"""
...
def update(self, entities: Union[Entity, List[Entity]]) -> Union[Optional[Entity], dict]:
if isinstance(entities, Entity):
entity = entities
return self.entities.update(entity)
else:
return self.batch.update(entities)
[docs] def query_head(self, type: str = None, q: str = None, gq: str = None, ctx: str = None, n: int = 5) -> List[Entity]:
"""Retrieve entities given its type and/or query string.
Retrieve up to PAGINATION_LIMIT_MAX entities.
Use query_all() to retrieve all entities.
Use entities.query() to deal with limit and offset on your own.
Parameters
----------
etype : str
The entity's type
q: str
The query string (NGSI-LD Query Language)
gq: str
The geoquery string (NGSI-LD Geoquery Language)
ctx: str
The context
n: int
The first n entities to be retrieved
Returns
-------
list[Entity]
Retrieved entities matching the given type and/or query string
Example
-------
>>> with Client() as client:
>>> client.query(type="AgriFarm") # match a given type
>>> with Client() as client:
>>> client.query(type="AgriFarm", q='contactPoint[email]=="wheatfarm@email.com"') # match type and query
"""
return self.entities._query(type, q, gq, ctx, limit=n)
[docs] def query(
self,
type: str = None,
q: str = None,
gq: str = None,
ctx: str = None,
limit: int = PAGINATION_LIMIT_MAX,
max: int = 1_000_000,
) -> List[Entity]:
"""Retrieve entities given its type and/or query string.
Retrieve all entities by sending as many requests as needed, using pagination.
Assume data hold in memory. Should not be an issue except for very large datasets.
Parameters
----------
etype : str
The entity's type
q: str
The query string (NGSI-LD Query Language)
gq: str
The geoquery string (NGSI-LD Geoquery Language)
ctx: str
The context
limit: int
The number of entities retrieved in each request
Returns
-------
list[Entity]
Retrieved entities matching the given type and/or query string
Example
-------
>>> with Client() as client:
>>> client.query(type="AgriFarm") # match a given type
>>> with Client() as client:
>>> client.query(type="AgriFarm", q='contactPoint[email]=="wheatfarm@email.com"') # match type and query
"""
entities: list[Entity] = []
count = self.entities.count(type, q, gq, ctx=ctx)
if count > max:
raise NgsiClientTooManyResultsError(f"{count} results exceed maximum {max}")
for page in range(ceil(count / limit)):
entities.extend(self.entities._query(type, q, gq, ctx, limit, page * limit))
return entities
[docs] def query_generator(
self,
type: str = None,
q: str = None,
gq: str = None,
ctx: str = None,
limit: int = PAGINATION_LIMIT_MAX,
batch: bool = False,
) -> Generator[Entity, None, None]:
"""Retrieve (as a generator) entities given its type and/or query string.
By returning a generator it allows to process entities on the fly without any risk of exhausting memory.
Parameters
----------
etype : str
The entity's type
q: str
The query string (NGSI-LD Query Language)
gq: str
The geoquery string (NGSI-LD Geoquery Language)
ctx: str
The context
limit: int
The number of entities retrieved in each request
Returns
-------
list[Entity]
Retrieved a generator of entities (matching the given type and/or query string)
Example
-------
>>> with Client() as client:
>>> for entity in client.query_handle(type="AgriFarm"):
print(entity)
"""
count = self.entities.count(type, q)
for page in range(ceil(count / limit)):
if batch:
yield self.entities._query(type, q, gq, ctx, limit, page * limit)
else:
yield from self.entities._query(type, q, gq, ctx, limit, page * limit)
[docs] def query_handle(
self,
type: str = None,
q: str = None,
gq: str = None,
ctx: str = None,
limit: int = PAGINATION_LIMIT_MAX,
*,
callback: Callable[[Entity], None],
) -> None:
"""Apply a callback function on entity of the query result.
Parameters
----------
etype : str
The entity's type
q: str
The query string (NGSI-LD Query Language)
gq: str
The geoquery string (NGSI-LD Geoquery Language)
ctx: str
The context
limit: int
The number of entities retrieved in each request
callback: Callable[Entity]
The function to be called on each entity of the result
Example
-------
>>> with Client() as client:
>>> client.query_handle(type="AgriFarm", lambda e: print(e))
"""
for entity in self.query_generator(type, q, gq, ctx, limit, False):
callback(entity)
[docs] def count(self, type: str = None, q: str = None, gq: str = None) -> int:
"""Return number of entities matching type and/or query string.
Facade method for Entities.count().
Parameters
----------
etype : str
The entity's type
q: str
The query string (NGSI-LD Query Language)
gq: str
The geoquery string (NGSI-LD Geoquery Language)
Returns
-------
int
The number of matching entities
Example
-------
>>> with Client() as client:
>>> client.count(type="AgriFarm") # match a given type
>>> with Client() as client:
>>> client.count(type="AgriFarm", query='contactPoint[email]=="wheatfarm@email.com"') # match type and query
"""
return self.entities.count(type, q, gq)
[docs] def delete_where(self, type: str = None, q: str = None, gq: str = None):
"""Batch delete entities matching type and/or query string.
Parameters
----------
etype : str
The entity's type
q: str
The query string (NGSI-LD Query Language)
gq: str
The geoquery string (NGSI-LD Geoquery Language)
Example
-------
>>> with Client() as client:
>>> client.delete_where(type="AgriFarm", query='contactPoint[email]=="wheatfarm@email.com"') # match type and query
"""
g = self.query_generator(type, q, gq, batch=True)
for batch in g:
self.batch.delete(batch)
[docs] def drop(self, *types: str) -> None:
"""Batch delete entities matching the given type.
Parameters
----------
type : str
The entity's type
Example
-------
>>> with Client() as client:
>>> client.drop("AgriFarm")
"""
for t in types:
self.delete_where(type=t)
[docs] def purge(self) -> None:
"""Batch delete all entities.
Example
-------
>>> with Client() as client:
>>> client.purge()
"""
for type in self.types.list():
self.drop(type)
[docs] def flush_all(self) -> None:
"""Batch delete all entities and remove all contexts.
Example
-------
>>> with Client() as client:
>>> client.purge()
"""
self.purge(type)
self.contexts.cleanup()
[docs] def guess_vendor(self) -> tuple[Vendor, Version]:
"""Try to guess the Context Broker vendor.
According to its own API, by using version or status endpoint.
Returns
-------
tuple[Vendor, Version]
A tuple composed of the Vendor (if identified) and the version.
Example
-------
>>> with Client() as client:
>>> print(client.guess_vendor())
(<Vendor.ORIONLD: 'Orion-LD'>, 'post-v0.8.1')
"""
if broker := self._broker_version_orionld():
return broker
if broker := self._broker_version_java_spring():
return broker
return Vendor.UNKNOWN, "N/A"
def _broker_version_orionld(self) -> Optional[str]:
"""Requests the broker looking for Orion-LD version.
Targets the /version endpoint.
Returns
-------
Optional[str]
The Orion-LD version if found
"""
url = f"{self.url}/version"
headers = {
"Accept": "application/json",
"Content-Type": None,
} # overrides session headers
try:
r = self.session.get(url, headers=headers)
r.raise_for_status()
return Vendor.ORIONLD, r.json()["orionld version"]
except Exception:
return None
def _broker_version_java_spring(self) -> Optional[Tuple[Vendor, str]]:
"""Requests the Java-Spring based broker looking for Vendor and Version.
Targets the /actuator/info endpoint.
Returns
-------
Optional[Tuple[Vendor, str]]
A tuple composed of the Vendor and the broker version
"""
url = f"{self.url}/actuator"
headers = {
"Accept": "application/json",
"Content-Type": None,
} # overrides session headers
try:
r = self.session.get(f"{url}/health", headers=headers)
r.raise_for_status()
except Exception:
return None
try:
r = self.session.get(f"{url}/info", headers=headers)
r.raise_for_status()
build = r.json()["build"]
version = build["version"]
group = build["group"]
if group == "eu.neclab.ngsildbroker":
vendor = Vendor.SCORPIO
elif group == "com.egm.stellio":
vendor = Vendor.STELLIO
else:
return None
return vendor, version
except Exception:
if is_interactive():
print("Java-Spring based Context Broker detected. Try to enable info endpoint.")
return None
def _welcome_message(self) -> str:
return f"Connected to Context Broker at {self.hostname}:{self.port} | vendor={self.broker.vendor.value} | version={self.broker.version}"
def _fail_message(self) -> str:
return f"Failed to connect to Context Broker at {self.hostname}:{self.port}"
def _warn_spring_message(self) -> str:
return "Java-Spring based Context Broker detected. Info endpoint disabled."
# below the context manager methods
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()