Source code for ngsildclient.api.helper.subscription
#!/usr/bin/env python3
# Software Name: ngsildclient
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: Apache 2.0
#
# This software is distributed under the Apache 2.0;
# see the NOTICE file for more details.
#
# Author: Fabien BATTELLO <fabien.battello@orange.com> et al.
from copy import deepcopy
from dataclasses import dataclass

from ...model.constants import CORE_CONTEXT
@dataclass
class NotificationParams:
    """Notification settings attached to a subscription.

    Holds the endpoint URI, an optional list of attribute names and the
    notification format, and serializes them through :meth:`to_dict`.
    """

    uri: str
    attrs: list[str] = None
    format: str = "normalized"

    def to_dict(self) -> dict:
        """Return the notification settings as a dictionary.

        The ``attributes`` key is emitted only when ``attrs`` is truthy,
        whereas ``format`` and ``endpoint`` are always present. The endpoint
        entry always advertises ``application/ld+json`` as accepted type.
        """
        endpoint = {"uri": self.uri, "accept": "application/ld+json"}
        # "attributes" has to come first to keep the key order stable.
        payload = {"attributes": self.attrs} if self.attrs else {}
        payload["format"] = self.format
        payload["endpoint"] = endpoint
        return payload
@dataclass
class Subscription:
    """Subscription description that serializes to a dictionary payload.

    Only ``notification`` is required; every other field has a default.
    """

    notification: NotificationParams
    id: str = None
    type: str = "Subscription"
    name: str = None
    description: str = None
    entities: list[dict] = None  # each selector holds id, idPattern, or type
    watched_attrs: list[str] = None
    query: str = None
    active: bool = True
    ctx: str = CORE_CONTEXT

    # Placeholders for fields that are not modelled yet (left disabled):
    # time_interval: int = 0
    # gquery: str = None
    # csf: str = None
    # expiresat: datetime = None
    # throttling: int = 0
    # tquery: str = None
    # scope: str = None
    # lang: str = None

    def to_dict(self) -> dict:
        """Return the subscription as a dictionary.

        ``type``, ``isActive``, ``notification`` and ``@context`` are always
        emitted. ``id``, ``name``, ``description``, ``entities``,
        ``watchedAttributes`` and ``q`` are emitted only when the matching
        field is truthy. ``notification`` is the result of calling
        ``to_dict()`` on the ``notification`` field.
        """
        # (key, value, always emitted) triples, listed in output order.
        slots = (
            ("id", self.id, False),
            ("type", self.type, True),
            ("name", self.name, False),
            ("description", self.description, False),
            ("entities", self.entities, False),
            ("watchedAttributes", self.watched_attrs, False),
            ("q", self.query, False),
            ("isActive", self.active, True),
            ("notification", self.notification.to_dict(), True),
            ("@context", self.ctx, True),
        )
        return {key: value for key, value, always in slots if always or value}
class SubscriptionBuilder:
    """Fluent builder that produces a subscription payload as a dictionary.

    Each setter validates its argument, stores it on the wrapped
    ``Subscription`` and returns the builder itself so calls can be chained.
    ``build()`` runs the final consistency check and serializes the result.
    """

    def __init__(
        self,
        uri: str,
    ):
        """Start a subscription whose notifications are sent to ``uri``."""
        notification = NotificationParams(uri)
        self._subscr = Subscription(notification)
        # Start from an empty list so the select_* methods can append to it.
        self._subscr.entities = []

    @staticmethod
    def _require_str(value, message: str) -> str:
        """Return ``value`` unchanged, raising ValueError(message) if it is not a str."""
        if not isinstance(value, str):
            raise ValueError(message)
        return value

    @staticmethod
    def _require_str_list(value, message: str) -> list:
        """Return a copy of ``value`` after checking it is a non-empty list of str.

        Raises:
            ValueError: with ``message`` if ``value`` is not a list or holds a
                non-string element; with "Empty array is not allowed" if the
                list is empty.
        """
        if not isinstance(value, list):
            raise ValueError(message)
        if not value:
            raise ValueError("Empty array is not allowed")
        if not all(isinstance(item, str) for item in value):
            raise ValueError(message)
        # Copy so later mutations of the caller's list cannot leak in.
        return list(value)

    def id(self, value: str) -> "SubscriptionBuilder":
        """Set the subscription id. Raises ValueError if not a string."""
        self._subscr.id = self._require_str(value, "id shall be a string")
        return self

    def name(self, value: str) -> "SubscriptionBuilder":
        """Set the subscription name. Raises ValueError if not a string."""
        self._subscr.name = self._require_str(value, "name shall be a string")
        return self

    def description(self, value: str) -> "SubscriptionBuilder":
        """Set the subscription description. Raises ValueError if not a string."""
        self._subscr.description = self._require_str(
            value, "description shall be a string"
        )
        return self

    def select_id(self, value: str) -> "SubscriptionBuilder":
        """Append an ``{"id": value}`` entity selector. Raises ValueError if not a string."""
        self._require_str(value, "EntitySelector id shall be a string")
        self._subscr.entities.append({"id": value})
        return self

    def select_idpattern(self, value: str) -> "SubscriptionBuilder":
        """Append an ``{"idPattern": value}`` entity selector. Raises ValueError if not a string."""
        self._require_str(value, "EntitySelector idPattern shall be a string")
        self._subscr.entities.append({"idPattern": value})
        return self

    def select_type(self, value: str) -> "SubscriptionBuilder":
        """Append a ``{"type": value}`` entity selector. Raises ValueError if not a string."""
        self._require_str(value, "EntitySelector type shall be a string")
        self._subscr.entities.append({"type": value})
        return self

    def watch(self, value: list[str]) -> "SubscriptionBuilder":
        """Set the watched attributes.

        Raises:
            ValueError: if ``value`` is not a list, is empty, or contains a
                non-string element.
        """
        self._subscr.watched_attrs = self._require_str_list(
            value, "watchedAttributes shall be a list of strings"
        )
        return self

    def query(self, value: str) -> "SubscriptionBuilder":
        """Set the query string. Raises ValueError if not a string."""
        self._subscr.query = self._require_str(value, "query shall be a string")
        return self

    def notif(self, value: list[str]) -> "SubscriptionBuilder":
        """Set the attribute names carried by the notification parameters.

        Raises:
            ValueError: if ``value`` is not a list, is empty, or contains a
                non-string element.
        """
        self._subscr.notification.attrs = self._require_str_list(
            value, "attribute names shall be a list of strings"
        )
        return self

    def context(self, value: str) -> "SubscriptionBuilder":
        """Set the context. Raises ValueError if not a string."""
        self._subscr.ctx = self._require_str(value, "context shall be a string")
        return self

    def build(self) -> dict:
        """Validate the subscription and return it as a dictionary.

        The returned dictionary is a deep copy, so continuing to use the
        builder afterwards does not alter a payload that was already built.

        Raises:
            ValueError: if no entity selector was added and no watched
                attributes were set.
        """
        if not self._subscr.entities and self._subscr.watched_attrs is None:
            raise ValueError(
                "At least one of (a) entities or (b) watchedAttributes shall be present."
            )
        return deepcopy(self._subscr.to_dict())