# Source code for ngsildclient.api.temporal

#!/usr/bin/env python3

# Software Name: ngsildclient
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: Apache 2.0
#
# This software is distributed under the Apache 2.0;
# see the NOTICE file for more details.
#
# Author: Fabien BATTELLO <fabien.battello@orange.com> et al.

from __future__ import annotations
from typing import TYPE_CHECKING, Union, List, Optional, Generator, Callable
from dataclasses import dataclass
from datetime import timedelta
from isodate import duration_isoformat
from functools import reduce
from operator import iconcat


import logging

if TYPE_CHECKING:
    from .client import Client

from .constants import EntityId, JSONLD_CONTEXT, AggrMethod
from .helper.temporal import TemporalQuery
from ..model.entity import Entity
from ngsildclient.utils import iso8601, is_pandas_installed

logger = logging.getLogger(__name__)


def _addopt(params: dict, newopt: str):
    if params.get("options", "") == "":
        params["options"] = newopt
    else:
        params["options"] += f",{newopt}"


def _troes_to_dfdict(troes: dict):
    # the result dictionary is independant of the number of attributes !
    d = {}
    if not isinstance(troes, List):
        troes = [troes]
    troe: dict = troes[0]
    nentities = len(troes)
    attrs = [str(k) for k in troe.keys() if k not in ("id", "type", "@context")]
    attr0 = attrs[0]
    datetimes = [x[1] for x in troe[attr0]["values"]]
    nmeasures: int = len(datetimes)
    for attr in attrs[1:]:
        if [x[1] for x in troe[attr]["values"]] != datetimes:
            raise ValueError("Cannot pack result : attributes have distinct observedAt values.")
    etype = troe["type"]
    d[etype] = reduce(iconcat, [[troe["id"].rsplit(":")[-1]] * nmeasures for troe in troes], [])
    d["observed"] = [iso8601.parse(x)[2] for x in datetimes] * nentities
    for attr in attrs:
        d[attr] = []
        for troe in troes:
            for value in troe[attr]["values"]:
                d[attr].append(value[0])
    return d


def troes_to_dataframe(troes: dict):
    """Build a pandas DataFrame from simplified TRoEs.

    The TRoEs are first flattened by ``_troes_to_dfdict``; pandas is imported
    only afterwards, so any error raised while flattening takes precedence.

    Parameters
    ----------
    troes : dict
        The TRoE(s) to export

    Returns
    -------
    pandas.DataFrame
        A dataframe built from the flattened columns

    Raises
    ------
    ValueError
        If pandas cannot be imported.
    """
    columns = _troes_to_dfdict(troes)
    try:
        import pandas as pd
    except ImportError:
        raise ValueError("Cannot export to dataframe : pandas not installed.")
    return pd.DataFrame(columns)





@dataclass
class TemporalResult:
    """The outcome of a single request sent to the temporal endpoint."""

    # Decoded JSON body of the broker response
    result: List[dict]
    # Paging information derived from the response headers, if any
    pagination: Optional["Pagination"] = None
class Temporal:
    """A wrapper for the NGSI-LD API temporal endpoint."""

    def __init__(self, client: Client, url: str):
        self._client = client
        self._session = client.session
        self.url = url

    @staticmethod
    def _build_headers(ctx: Optional[str]) -> dict:
        """Return the request headers, adding a ``Link`` header when a context is given."""
        headers = {
            "Accept": "application/ld+json",
            "Content-Type": None,
        }  # overrides session headers
        if ctx is not None:
            headers["Link"] = f'<{ctx}>; rel="{JSONLD_CONTEXT}"; type="application/ld+json"'
        return headers

    @staticmethod
    def _add_paging(params: dict, lastn: int, pagesize: int, pageanchor: Optional[str]) -> None:
        """Add ``lastN``, ``pageSize`` and ``pageAnchor`` to ``params`` when they are set."""
        if lastn > 0:
            params["lastN"] = lastn
        if pagesize > 0:
            params["pageSize"] = pagesize
        if pageanchor is not None:
            params["pageAnchor"] = pageanchor

    @staticmethod
    def _ensure_pandas() -> None:
        """Raise ``ValueError`` when pandas is not available for dataframe export."""
        if not is_pandas_installed():
            raise ValueError("Cannot export to dataframe : pandas not installed.")

    def _get(
        self,
        eid: Union[EntityId, Entity],
        attrs: List[str] = None,
        ctx: str = None,
        verbose: bool = False,
        lastn: int = 0,
        pagesize: int = 0,  # default broker pageSize
        pageanchor: str = None,
        count: bool = True,
    ) -> TemporalResult:
        """Send one GET request for the temporal representation of a single entity.

        Returns the decoded JSON body together with the pagination built from
        the response headers. Errors are delegated to ``client.raise_for_status``.
        """
        eid = eid.id if isinstance(eid, Entity) else eid
        headers = self._build_headers(ctx)
        # The parameters are initialised only once : re-creating the dict after
        # adding "count" used to silently drop that option.
        params = {}
        if count:
            _addopt(params, "count")
        if attrs:
            params["attrs"] = ",".join(attrs)
        self._add_paging(params, lastn, pagesize, pageanchor)
        if not verbose:
            _addopt(params, "temporalValues")
        r = self._session.get(f"{self.url}/{eid}", headers=headers, params=params)
        self._client.raise_for_status(r)
        return TemporalResult(r.json(), Pagination.from_headers(r.headers))

    # equivalent to get_all()
    def get(
        self,
        eid: Union[EntityId, Entity],
        attrs: List[str] = None,
        ctx: str = None,
        verbose: bool = False,
        pagesize: int = 0,
        as_dataframe: bool = False,
    ) -> List[dict]:
        """Retrieve the Temporal Representation of (an) Entity (TRoE) given its id.

        If already dealing with an entity instance one can provide the entity itself instead of its id.
        Follows the pagination, sending requests until no next page is reported.

        Parameters
        ----------
        eid : Union[EntityId, Entity]
            The entity identifier or the entity instance
        attrs : List[str]
            The list of the attributes (changing over time) you're interested in
        ctx : str
            The context
        verbose: bool
            Default is False, meaning the result is formatted as simplified TRoE.
        pagesize: int
            Sent as ``pageSize`` when greater than 0. By default the broker pagesize default.
        as_dataframe : bool
            Default is false, meaning it returns JSON TRoE.
            If set returns a pandas dataframe. Requires pandas.

        Returns
        -------
        The JSON payload returned by the broker, or a pandas dataframe if ``as_dataframe`` is set

        Raises
        ------
        ValueError
            If ``as_dataframe`` is set and pandas is not installed.
        """
        if as_dataframe:
            self._ensure_pandas()
            verbose = False  # force simplified representation
        r: TemporalResult = self._get(eid, attrs, ctx, verbose, pagesize=pagesize)
        troes: List[dict] = r.result
        while r.pagination.next_url is not None:
            r = self._get(eid, attrs, ctx, verbose, pagesize=pagesize, pageanchor=r.pagination.next_url)
            troes.extend(r.result)
        return troes_to_dataframe(troes) if as_dataframe else troes

    def _query(
        self,
        eid: Union[EntityId, Entity] = None,
        type: str = None,
        attrs: List[str] = None,
        q: str = None,
        gq: str = None,
        ctx: str = None,
        verbose: bool = False,
        tq: TemporalQuery = None,
        lastn: int = 0,
        pagesize: int = 0,  # default broker pageSize
        pageanchor: str = None,
        count: bool = True,
    ) -> TemporalResult:
        """Send one GET request querying the temporal representation of entities.

        Returns the decoded JSON body together with the pagination built from
        the response headers. Errors are delegated to ``client.raise_for_status``.
        """
        params = {}
        if eid:
            # Accept an entity instance as well as an identifier, as _get() does
            params["id"] = eid.id if isinstance(eid, Entity) else eid
        if type:
            params["type"] = type
        if attrs:
            params["attrs"] = ",".join(attrs)
        if q:
            params["q"] = q
        if gq:
            params["georel"] = gq
        if count:
            _addopt(params, "count")
        if not verbose:
            _addopt(params, "temporalValues")
        if tq is None:
            tq = TemporalQuery().before()
        params |= tq
        self._add_paging(params, lastn, pagesize, pageanchor)
        headers = self._build_headers(ctx)
        r = self._session.get(
            self.url,
            headers=headers,
            params=params,
        )
        self._client.raise_for_status(r)
        return TemporalResult(r.json(), Pagination.from_headers(r.headers))

    def query_head(
        self,
        *,
        eid: Union[EntityId, Entity] = None,
        type: str = None,
        attrs: List[str] = None,
        q: str = None,
        gq: str = None,
        ctx: str = None,
        verbose: bool = False,
        tq: TemporalQuery = None,
        limit: int = 5,
        as_dataframe: bool = False,
    ) -> List[dict]:
        """Retrieve a sample of the TRoEs matching the criteria, using a single request.

        ``limit`` is sent both as ``lastN`` and as ``pageSize``. No pagination is followed.

        Raises
        ------
        ValueError
            If ``as_dataframe`` is set and pandas is not installed.
        """
        if as_dataframe:
            self._ensure_pandas()
            verbose = False  # force simplified representation
        troes = self._query(eid, type, attrs, q, gq, ctx, verbose, tq, lastn=limit, pagesize=limit).result
        return troes_to_dataframe(troes) if as_dataframe else troes

    def query(
        self,
        *,
        eid: Union[EntityId, Entity] = None,
        type: str = None,
        attrs: List[str] = None,
        q: str = None,
        gq: str = None,
        ctx: str = None,
        verbose: bool = False,
        tq: TemporalQuery = None,
        lastn: int = 0,
        pagesize: int = 0,
        as_dataframe: bool = False,
    ) -> List[dict]:
        """Retrieve Temporal Representation of Entities (TRoE) given id, or type and/or query string.

        Retrieve all TRoEs matching the criteria.
        Do the dirty pagination job for you, sending under the hood as many requests as needed.
        Assume data hold in memory. Should not be an issue except for very large datasets.

        Parameters
        ----------
        eid : Union[EntityId, Entity]
            The entity identifier or the entity instance
        type : str
            The entity's type
        attrs : List[str]
            The list of the attributes (changing over time) you're interested in
        q: str
            The query string (NGSI-LD Query Language)
        gq: str
            The geoquery string (NGSI-LD Geoquery Language)
        ctx : str
            The context
        verbose: bool
            Default is False, meaning the result is formatted as simplified TRoE.
        tq: TemporalQuery
            The temporal query as a py:class:: TemporalQuery instance.
            When omitted, ``TemporalQuery().before()`` is used.
        lastn: int
            Among the temporal values, limit the result to the latest <lastn> values.
            By default returns all values.
        pagesize: int
            By default the broker pagesize default.
        as_dataframe : bool
            Default is false, meaning it returns JSON TRoE.
            If set returns a pandas dataframe. Requires pandas.

        Returns
        -------
        list[dict]
            The Temporal Representation of the Entities matching the given criteria,
            or a pandas dataframe if ``as_dataframe`` is set

        Raises
        ------
        ValueError
            If ``as_dataframe`` is set and pandas is not installed.

        Example
        -------
        >>> with Client() as client:
        >>>     troe = client.temporal.query(type="RoomObserved")
        """
        if as_dataframe:
            self._ensure_pandas()
            verbose = False  # force simplified representation
        r: TemporalResult = self._query(eid, type, attrs, q, gq, ctx, verbose, tq, lastn=lastn, pagesize=pagesize)
        troes: List[dict] = r.result
        while r.pagination.next_url is not None:
            r = self._query(
                eid, type, attrs, q, gq, ctx, verbose, tq, lastn=lastn, pagesize=pagesize, pageanchor=r.pagination.next_url
            )
            troes.extend(r.result)
        return troes_to_dataframe(troes) if as_dataframe else troes

    def query_generator(
        self,
        *,
        eid: Union[EntityId, Entity] = None,
        type: str = None,
        attrs: List[str] = None,
        q: str = None,
        gq: str = None,
        ctx: str = None,
        verbose: bool = False,
        tq: TemporalQuery = None,
        pagesize: int = 0,
    ) -> Generator[dict, None, None]:
        """Yield the TRoEs matching the criteria one by one.

        A new page is requested only once the previous one has been consumed.
        """
        r: TemporalResult = self._query(eid, type, attrs, q, gq, ctx, verbose, tq, pagesize=pagesize)
        yield from r.result
        while r.pagination.next_url is not None:
            r = self._query(
                eid, type, attrs, q, gq, ctx, verbose, tq, pagesize=pagesize, pageanchor=r.pagination.next_url
            )
            yield from r.result

    def query_handle(
        self,
        *,
        eid: Union[EntityId, Entity] = None,
        type: str = None,
        attrs: List[str] = None,
        q: str = None,
        gq: str = None,
        ctx: str = None,
        verbose: bool = False,
        tq: TemporalQuery = None,
        pagesize: int = 0,
        callback: Callable[[dict], None],
    ) -> None:
        """Invoke ``callback`` on each TRoE produced by ``query_generator``."""
        # query_generator() only accepts keyword arguments
        for troe in self.query_generator(
            eid=eid, type=type, attrs=attrs, q=q, gq=gq, ctx=ctx, verbose=verbose, tq=tq, pagesize=pagesize
        ):
            callback(troe)

    def aggregate(
        self,
        *,
        type: str = None,
        attrs: List[str] = None,
        q: str = None,
        gq: str = None,
        ctx: str = None,
        tq: TemporalQuery = None,
        lastn: int = 0,
        pagesize: int = 0,  # default broker pageSize
        pageanchor: str = None,
        count: bool = False,
        methods: List[AggrMethod] = None,
        period: timedelta = timedelta(days=1),
    ) -> TemporalResult:
        """Send one GET request asking the broker for aggregated temporal values.

        Parameters
        ----------
        type : str
            The entity's type
        attrs : List[str]
            The list of the attributes you're interested in
        q: str
            The query string
        gq: str
            The geoquery string
        ctx : str
            The context
        tq: TemporalQuery
            The temporal query. When omitted, ``TemporalQuery().before()`` is used.
        lastn: int
            Sent as ``lastN`` when greater than 0
        pagesize: int
            Sent as ``pageSize`` when greater than 0
        pageanchor: str
            Sent as ``pageAnchor`` when provided
        count: bool
            If set, the ``count`` option is added to the request. Default is False.
        methods: List[AggrMethod]
            The aggregation methods, sent comma-separated as ``aggrMethods``.
            Default is ``[AggrMethod.AVERAGE]``.
        period: timedelta
            The aggregation period, sent as an ISO 8601 duration in ``aggrPeriodDuration``.
            Default is one day.

        Returns
        -------
        TemporalResult
            The decoded JSON body together with the pagination built from the response headers
        """
        if methods is None:
            # Avoid a mutable default argument shared between calls
            methods = [AggrMethod.AVERAGE]
        params = {}
        if type:
            params["type"] = type
        if attrs:
            params["attrs"] = ",".join(attrs)
        if q:
            params["q"] = q
        if gq:
            params["georel"] = gq
        _addopt(params, "aggregatedValues")
        if count:
            _addopt(params, "count")
        if tq is None:
            tq = TemporalQuery().before()
        params |= tq
        self._add_paging(params, lastn, pagesize, pageanchor)
        params["aggrMethods"] = ",".join([m.value for m in methods])
        params["aggrPeriodDuration"] = duration_isoformat(period)
        headers = self._build_headers(ctx)
        r = self._session.get(
            self.url,
            headers=headers,
            params=params,
        )
        self._client.raise_for_status(r)
        return TemporalResult(r.json(), Pagination.from_headers(r.headers))