Source code for ngsildclient.api.subscriptions

#!/usr/bin/env python3

# Software Name: ngsildclient
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: Apache 2.0
#
# This software is distributed under the Apache 2.0;
# see the NOTICE file for more details.
#
# Author: Fabien BATTELLO <fabien.battello@orange.com> et al.

from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from functools import partialmethod
from hashlib import sha1

import re
import json

from ..model.constants import CORE_CONTEXT
from .exceptions import NgsiApiError

if TYPE_CHECKING:
    from .client import Client
from .exceptions import NgsiResourceNotFoundError, rfc7807_error_handle


[docs]class Subscriptions: """A wrapper for the NGSI-LD API subscriptions endpoint.""" def __init__(self, client: Client, url: str): self._client = client self._session = client.session self.url = url @rfc7807_error_handle def create(self, subscr: dict, raise_on_conflict: bool = True) -> bool: if raise_on_conflict: conflicts = self.conflicts(subscr) if conflicts: raise ValueError(f"Some subscriptions already exist with same target : {conflicts}") r = self._session.post(f"{self.url}/", json=subscr) self._client.raise_for_status(r) location = r.headers.get("Location") if location is None: if self._client.ignore_errors: return None else: raise NgsiApiError("Missing Location header") id_returned_from_broker = location.rsplit("/", 1)[-1] id = subscr.get("id") if id is not None and id != id_returned_from_broker: raise NgsiApiError( f"Broker returned wrong id. Expected={id} Returned={id_returned_from_broker}" ) return id_returned_from_broker @rfc7807_error_handle def list(self, pattern: str = None, ctx: str = CORE_CONTEXT) -> Optional[dict]: headers = { "Accept": "application/ld+json", "Content-Type": None, } # overrides session headers if ctx is not None: headers[ "Link" ] = f'<{ctx}>; rel="{CORE_CONTEXT}"; type="application/ld+json"' r = self._session.get(f"{self.url}") subscriptions = r.json() if pattern is not None: subscriptions = [ x for x in subscriptions if re.search( pattern, x.get("name", "") + x.get("description", ""), re.IGNORECASE ) ] return subscriptions @staticmethod def _criteria_only(subscr: dict): params = subscr.copy() params.pop("id", None) params.pop("name", None) params.pop("description", None) params.pop("isActive", None) return params @staticmethod def _hash(subscr: dict): criteria = Subscriptions._criteria_only(subscr) return sha1(json.dumps(criteria, sort_keys=True).encode("utf-8")).digest() @rfc7807_error_handle def conflicts(self, subscr: dict, ctx: str = CORE_CONTEXT) -> list: hashref = Subscriptions._hash(subscr) headers = { "Accept": "application/ld+json", "Content-Type": None, } # overrides session headers if ctx is not None: headers[ "Link" ] = f'<{ctx}>; rel="{CORE_CONTEXT}"; type="application/ld+json"' r = self._session.get(f"{self.url}") return [x for x in r.json() if Subscriptions._hash(x) == hashref] @rfc7807_error_handle def get(self, id: str, ctx: str = CORE_CONTEXT) -> dict: headers = { "Accept": "application/ld+json", "Content-Type": None, } # overrides session headers if ctx is not None: headers[ "Link" ] = f'<{ctx}>; rel="{CORE_CONTEXT}"; type="application/ld+json"' r = self._session.get(f"{self.url}/{id}", headers=headers) self._client.raise_for_status(r) return r.json() @rfc7807_error_handle def exists(self, id: str, ctx: str = CORE_CONTEXT) -> bool: try: payload = self.get(id, ctx) if payload: return "@context" in payload except NgsiResourceNotFoundError: return False return False @rfc7807_error_handle def _delete(self, id: str) -> bool: r = self._session.delete(f"{self.url}/{id}") self._client.raise_for_status(r) return bool(r) @rfc7807_error_handle def delete(self, pattern: str) -> bool: deleted = False for subscription in self.list(pattern): deleted |= self._delete(subscription["id"]) return deleted purge = partialmethod(delete, pattern=None)