Interact with the Broker
This chapter explains how to use the NGSI-LD Client.
Connect to the broker
Connect
from ngsildclient import Client
client = Client("my.broker.org", 8000)
With no arguments, the Client targets localhost on port 1026, which suits a broker started locally with docker compose up -d.
from ngsildclient import Client
client = Client()
Note
In interactive mode the Client displays additional information, probing the broker to guess the vendor and version.
Disconnect
To free resources, close the client once you are done with it.
from ngsildclient import Client
client = Client()
print(client.is_connected())  # some processing
client.close()
The with statement
Alternatively, use the with statement, which closes the client automatically.
from ngsildclient import Client
with Client() as client:
    print(client.is_connected())  # some processing
Note
The is_connected() method sends a dummy but compliant request to the Context Broker and returns True if the broker answers.
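As a rough illustration of what the with statement guarantees, the sketch below uses a hypothetical stand-in class (DemoClient is not part of ngsildclient) whose close() runs on exit, even when the body raises.

```python
class DemoClient:
    """Hypothetical stand-in for a client; not the ngsildclient implementation."""

    def __init__(self):
        self.closed = False

    def close(self):
        # A real client would release its HTTP session here.
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow exceptions raised in the body


demo = DemoClient()
try:
    with demo:
        raise RuntimeError("processing failed")
except RuntimeError:
    pass
print(demo.closed)  # True: close() ran despite the exception
```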
Entities and contexts
Note
For convenience, common operations such as those on entities are provided directly by the Client class.
from ngsildclient import Client
with Client() as client:
    print(client.list_types())
| submodule | NGSI-LD Resource |
|---|---|
| entities | entities |
| batch | entityOperations |
| types | types |
| contexts | jsonldContexts |
| subscriptions | subscriptions |
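To make the submodule-to-resource mapping concrete, here is a small sketch that builds resource URLs. It assumes the standard /ngsi-ld/v1 base path and the default localhost:1026 broker; the names BASE_URL, RESOURCES and resource_url are illustrative and not part of the library.

```python
# Assumed base path of the NGSI-LD API on the default local broker.
BASE_URL = "http://localhost:1026/ngsi-ld/v1"

# Submodule name -> NGSI-LD resource, copied from the table above.
RESOURCES = {
    "entities": "entities",
    "batch": "entityOperations",
    "types": "types",
    "contexts": "jsonldContexts",
    "subscriptions": "subscriptions",
}


def resource_url(submodule: str) -> str:
    """Return the URL of the resource handled by a submodule."""
    return f"{BASE_URL}/{RESOURCES[submodule]}"


print(resource_url("batch"))
```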
Entities
Entities operations handle Entity objects as defined in ngsildclient.model.entity.
Create a single entity
from ngsildclient import Client, Entity
entity = Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T09:00:00Z").prop("NO2", 8)
with Client() as client:
    client.create(entity)
To replace an entity that already exists, enable the overwrite argument.
client.create(entity, overwrite=True)
This is equivalent to calling the upsert() method.
If you prefer to skip creation instead, enable the skip argument.
client.create(entity, skip=True)
You can also enable the overwrite strategy globally, at the client level, for all operations.
client = Client(overwrite=True)
Create a batch of entities
from ngsildclient import Client, Entity
e1 = Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T09:00:00Z").prop("NO2", 8)
e2 = Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T10:00:00Z").prop("NO2", 9)
entities = [e1, e2]
with Client() as client:
    client.create(entities)
Note
The result distinguishes the success identifiers from the error ones.
Retrieve a single entity
from ngsildclient import Client, Entity
with Client() as client:
    entity = client.get("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
    entity.pprint()
Note
The get() method accepts either an NGSI-LD identifier or an Entity object.
If the entity doesn't exist, an NgsiResourceNotFoundError exception is raised.
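The examples above suggest that the full identifier is the urn:ngsi-ld: prefix followed by the entity type and the short id. The sketch below shows that composition and one way a method could accept either form. DemoEntity, to_identifier and resolve_id are hypothetical helpers, not library functions.

```python
from dataclasses import dataclass
from typing import Union

URN_PREFIX = "urn:ngsi-ld:"


@dataclass
class DemoEntity:
    """Hypothetical stand-in for an entity object; it only carries an id."""
    id: str


def to_identifier(type_: str, short_id: str) -> str:
    """Build a full identifier from an entity type and a short id."""
    return f"{URN_PREFIX}{type_}:{short_id}"


def resolve_id(target: Union[str, DemoEntity]) -> str:
    """Accept either an identifier string or an entity object."""
    return target if isinstance(target, str) else target.id


urn = to_identifier("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
print(urn)
print(resolve_id(urn) == resolve_id(DemoEntity(urn)))
```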
Note
The corresponding batch methods to retrieve a list of entities are known as query methods and prefixed with query_.
Check whether an entity exists
from ngsildclient import Client, Entity
with Client() as client:
    if client.exists("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T09:00:00Z"):
        print("found measure at 9AM")
Upsert a single entity
from ngsildclient import Client, Entity
with Client() as client:
    entity = Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T09:00:00Z").prop("NO2", 8)
    client.upsert(entity)
Note
The upsert() method is not atomic: for an existing entity, it performs a delete operation followed by a create operation.
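The delete-then-create sequence can be pictured with an in-memory stand-in. InMemoryBroker below is hypothetical and only illustrates why the operation is not atomic; it is not how the library or a real broker is implemented.

```python
class InMemoryBroker:
    """Hypothetical in-memory stand-in for a broker, keyed by entity id."""

    def __init__(self):
        self.store = {}
        self.log = []  # operations performed, in order

    def create(self, entity: dict) -> None:
        if entity["id"] in self.store:
            raise KeyError(f"{entity['id']} already exists")
        self.store[entity["id"]] = entity
        self.log.append("create")

    def delete(self, entity_id: str) -> None:
        del self.store[entity_id]
        self.log.append("delete")

    def upsert(self, entity: dict) -> None:
        # Two separate steps: between them the entity does not exist,
        # so a concurrent reader could miss it.
        if entity["id"] in self.store:
            self.delete(entity["id"])
        self.create(entity)


broker = InMemoryBroker()
broker.upsert({"id": "urn:ngsi-ld:AirQualityObserved:demo", "NO2": 8})
broker.upsert({"id": "urn:ngsi-ld:AirQualityObserved:demo", "NO2": 9})
print(broker.log)  # ['create', 'delete', 'create']
```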
Upsert a batch of entities
from ngsildclient import Client, Entity
e1 = Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T09:00:00Z").prop("NO2", 8)
e2 = Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T10:00:00Z").prop("NO2", 9)
entities = [e1, e2]
with Client() as client:
    client.upsert(entities)
Note
The result contains success and errors entries.
Update a single entity
from ngsildclient import Client, Entity
with Client() as client:
    entity = client.get("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
    entity["NO2.value"] += 1
    client.update(entity)
Update a batch of entities
from ngsildclient import Client, Entity
with Client() as client:
    # retrieve the entities inside the with block, where the client exists
    e1 = client.get("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
    e2 = client.get("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T10:00:00Z")
    entities = [e1, e2]
    for e in entities:
        e["NO2.value"] += 1
    client.update(entities)
Note
The result contains success and errors entries.
Query Head
from ngsildclient import Client, Entity
with Client() as client:
    entities = client.query_head(type="AirQualityObserved")
Query All
The query_all() method returns a list of matching entities.
from ngsildclient import Client, Entity
with Client() as client:
    entities = client.query_all(type="AirQualityObserved", q="NO2>40")
    top10 = sorted(entities, reverse=True, key=lambda x: x["NO2.value"])[:10]
    print(top10)
Query Generator
from ngsildclient import Client, Entity
with Client() as client:
    for e in client.query_generator(type="AirQualityObserved"):
        e.pprint()
from ngsildclient import Client, Entity
with Client() as client:
    g = client.query_generator(type="AirQualityObserved")
    g = (e for e in g if e["NO2.value"] > 80)  # generator comprehension
    for e in g:
        e.pprint()
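A generator-based query lets the caller consume results lazily instead of loading everything at once. The sketch below shows the general technique with limit/offset pagination over a hypothetical in-memory data source. paginate and fetch_page are illustrative names, and nothing here claims to reproduce the internals of query_generator().

```python
from typing import Callable, Iterator, List


def paginate(fetch_page: Callable[[int, int], List[dict]], limit: int = 2) -> Iterator[dict]:
    """Yield items one by one, requesting a new page only when needed."""
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        yield from page
        if len(page) < limit:  # short page: nothing left to fetch
            return
        offset += limit


# Hypothetical data source standing in for the broker.
DATA = [{"id": f"probe-{i}", "NO2": v} for i, v in enumerate([12, 95, 40, 81, 7])]
calls = []  # offsets requested, to show that pages are fetched on demand


def fetch_page(offset: int, limit: int) -> List[dict]:
    calls.append(offset)
    return DATA[offset:offset + limit]


high = [e["id"] for e in paginate(fetch_page) if e["NO2"] > 80]
print(high)   # ['probe-1', 'probe-3']
print(calls)  # [0, 2, 4]
```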
Low-Level Query
Count
The count() method returns the number of matching entities.
from ngsildclient import Client, Entity
with Client() as client:
    exceed_threshold: int = client.count(type="AirQualityObserved", q="NO2>80")
    print(f"Values over threshold: {exceed_threshold}")
Delete a single entity
from ngsildclient import Client
with Client() as client:
    client.delete("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
Delete a batch of entities
from ngsildclient import Client, Entity
with Client() as client:
    e1 = client.get("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
    e2 = client.get("urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T10:00:00Z")
    entities = [e1, e2]
    client.delete(entities)
Note
The result contains success and errors entries.
Conditional Delete
from ngsildclient import Client, Entity
with Client() as client:
    client.delete_where(type="AirQualityObserved", q="NO2<0|NO2>1000")
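In the NGSI-LD query language, | means OR and ; means AND, so the filter above selects readings that are negative or above 1000. The sketch below evaluates such a filter locally to show which entities would match. It is a deliberately limited illustration (numeric comparisons only, a single kind of logical operator per query), and match_term and matches are hypothetical helpers, not library functions.

```python
import operator
import re

OPS = {"==": operator.eq, "!=": operator.ne, ">=": operator.ge,
       "<=": operator.le, ">": operator.gt, "<": operator.lt}
TERM = re.compile(r"^(\w+)(==|!=|>=|<=|>|<)(.+)$")


def match_term(term: str, entity: dict) -> bool:
    """Evaluate one numeric comparison such as 'NO2>1000' against an entity."""
    m = TERM.match(term)
    if m is None:
        raise ValueError(f"unsupported term: {term}")
    attr, op, raw = m.groups()
    if attr not in entity:
        return False
    return OPS[op](entity[attr], float(raw))


def matches(q: str, entity: dict) -> bool:
    """Evaluate a flat query joined only by '|' (OR) or only by ';' (AND)."""
    if "|" in q and ";" in q:
        raise ValueError("mixed operators are out of scope for this sketch")
    if "|" in q:
        return any(match_term(t, entity) for t in q.split("|"))
    return all(match_term(t, entity) for t in q.split(";"))


readings = [{"NO2": -3}, {"NO2": 42}, {"NO2": 1500}]
invalid = [r for r in readings if matches("NO2<0|NO2>1000", r)]
print(invalid)  # [{'NO2': -3}, {'NO2': 1500}]
```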
Drop all entities of the same type
from ngsildclient import Client
with Client() as client:
    client.drop("AirQualityObserved")
Purge all entities
from ngsildclient import Client
with Client() as client:
    client.purge()
Caution
purge() removes ALL entities.
Flush all
Remove all entities and all contexts except the Core context.
from ngsildclient import Client
with Client() as client:
    client.flush_all()
Caution
No confirmation is asked.
List types
List available entity types.
from ngsildclient import Client
with Client() as client:
    client.list_types()
Contexts
List contexts
from ngsildclient import Client
with Client() as client:
    contexts = client.contexts.list()
    print(contexts)
It returns a list of strings, each of which is a context URI.
Note
There should be at least one entry: the default Core context.
Retrieve a context
from ngsildclient import Client
with Client() as client:
    ctx_core = client.contexts.get("https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld")
    print(ctx_core)
Delete a context
from ngsildclient import Client
ctx_device = "https://github.com/smart-data-models/dataModel.Device/raw/aba14f18bb6e5f7ee1bd2f3b866d23c7ad630ad8/context.jsonld"
with Client() as client:
    client.contexts.delete(ctx_device)
Delete any context matching a substring
The example below uses the substring device.
from ngsildclient import Client
with Client() as client:
    client.contexts.delete("device")
Note
Matching is case insensitive.
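Case-insensitive substring matching can be pictured as follows. This is a minimal sketch of the matching rule only; matching_contexts is a hypothetical helper and does not delete anything.

```python
from typing import List


def matching_contexts(contexts: List[str], pattern: str) -> List[str]:
    """Return the context URIs that contain pattern, ignoring case."""
    needle = pattern.lower()
    return [uri for uri in contexts if needle in uri.lower()]


contexts = [
    "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld",
    "https://github.com/smart-data-models/dataModel.Device/raw/aba14f18bb6e5f7ee1bd2f3b866d23c7ad630ad8/context.jsonld",
]
# "device" matches "dataModel.Device" because case is ignored.
print(matching_contexts(contexts, "device"))
```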
Cleanup contexts
Remove all contexts except the default core context.
from ngsildclient import Client
with Client() as client:
    client.contexts.cleanup()
Add a context
from ngsildclient import Client
ctx_nimp = {"@context": {"nimp": "https://nimp.org/nimp"}}
with Client() as client:
    client.contexts.add(ctx_nimp)
Note
A ValueError exception is raised if the input dictionary does not contain an @context key.
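The check described in the note amounts to something like the following sketch. check_context is a hypothetical helper written for illustration, not the library's code.

```python
def check_context(ctx: dict) -> dict:
    """Return ctx unchanged, or raise ValueError if it has no @context key."""
    if "@context" not in ctx:
        raise ValueError("missing @context key")
    return ctx


ctx_nimp = {"@context": {"nimp": "https://nimp.org/nimp"}}
check_context(ctx_nimp)  # accepted

try:
    check_context({"nimp": "https://nimp.org/nimp"})  # @context wrapper forgotten
except ValueError as err:
    print(f"rejected: {err}")
```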
Check whether a context exists
from ngsildclient import Client, CORE_CONTEXT
with Client() as client:
    if not client.contexts.exists(CORE_CONTEXT):
        print("Missing default context!")
Subscriptions
Model a Subscription payload
A simple example
from ngsildclient import SubscriptionBuilder
payload = (
    SubscriptionBuilder("http://localhost:8000/air_quality_alerts")
    .description("Notify me of high NO2 level")
    .select_type("AirQualityObserved").watch(["NO2"]).query("NO2>200")
    .build()
)
{
  "type": "Subscription",
  "description": "Notify me of high NO2 level",
  "entities": [
    {
      "type": "AirQualityObserved"
    }
  ],
  "watchedAttributes": [
    "NO2"
  ],
  "q": "NO2>200",
  "isActive": true,
  "notification": {
    "format": "normalized",
    "endpoint": {
      "uri": "http://localhost:8000/air_quality_alerts",
      "accept": "application/ld+json"
    }
  },
  "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"
}
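To show how chained calls can assemble a payload like the one above, here is a minimal fluent-builder sketch using only the standard library. DemoSubscriptionBuilder is a hypothetical class that mimics the call chain for this simple case. It is not the ngsildclient SubscriptionBuilder and covers none of its other options.

```python
import json

CORE_CONTEXT = "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"


class DemoSubscriptionBuilder:
    """Hypothetical fluent builder; not the ngsildclient SubscriptionBuilder."""

    def __init__(self, uri: str):
        self._payload = {
            "type": "Subscription",
            "isActive": True,
            "notification": {
                "format": "normalized",
                "endpoint": {"uri": uri, "accept": "application/ld+json"},
            },
            "@context": CORE_CONTEXT,
        }

    def description(self, text: str):
        self._payload["description"] = text
        return self  # returning self is what allows chaining

    def select_type(self, type_: str):
        self._payload.setdefault("entities", []).append({"type": type_})
        return self

    def watch(self, attrs: list):
        self._payload["watchedAttributes"] = list(attrs)
        return self

    def query(self, q: str):
        self._payload["q"] = q
        return self

    def build(self) -> dict:
        return self._payload


payload = (
    DemoSubscriptionBuilder("http://localhost:8000/air_quality_alerts")
    .description("Notify me of high NO2 level")
    .select_type("AirQualityObserved")
    .watch(["NO2"])
    .query("NO2>200")
    .build()
)
print(json.dumps(payload, indent=2))
```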
A more complex example from the FIWARE tutorial
from ngsildclient import SubscriptionBuilder
NOTIF_URI = "http://tutorial:3000/subscription/low-stock-farm001-ngsild"  # endpoint shown in the payload below
payload = (
    SubscriptionBuilder(NOTIF_URI)
    .description("Notify me of low feedstock on Farm:001")
    .select_type("FillingLevelSensor")
    .watch(["filling"])
    .query("filling>0.4;filling<0.6;controlledAsset==urn:ngsi-ld:Building:farm001")
    .notif(["filling", "controlledAsset"])
    .build()
)
{
  "type": "Subscription",
  "description": "Notify me of low feedstock on Farm:001",
  "entities": [
    {
      "type": "FillingLevelSensor"
    }
  ],
  "watchedAttributes": [
    "filling"
  ],
  "q": "filling>0.4;filling<0.6;controlledAsset==urn:ngsi-ld:Building:farm001",
  "isActive": true,
  "notification": {
    "attributes": [
      "filling",
      "controlledAsset"
    ],
    "format": "normalized",
    "endpoint": {
      "uri": "http://tutorial:3000/subscription/low-stock-farm001-ngsild",
      "accept": "application/ld+json"
    }
  },
  "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"
}
Subscribe (Create a Subscription)
from ngsildclient import Client, SubscriptionBuilder
payload = (
    SubscriptionBuilder("http://subscriber.docker.internal:8000/air_quality_alerts")
    .description("Notify me of high NO2 level")
    .select_type("AirQualityObserved").watch(["NO2"]).query("NO2>200")
    .build()
)
with Client() as client:
    subscr_id = client.subscriptions.create(payload)
Note
A receiver can be started with python simple_alert_receiver_server.py.
from ngsildclient import Client, Entity
e = (
    Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
    .prop("NO2", 300)  # above the NO2>200 threshold of the subscription
)
with Client() as client:
    client.upsert(e)
orion:
  image: fiware/orion-ld:latest
  links:
    - mongo
  ports:
    - "1026:1026"
  command: -dbhost mongo
  extra_hosts:
    # where 192.168.1.42 is the host IP address
    - "subscriber.docker.internal:192.168.1.42"
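For readers without a receiver script at hand, the sketch below shows what a minimal notification receiver could look like using only the standard library. It is not the simple_alert_receiver_server.py script mentioned above. AlertHandler, summarize and make_server are hypothetical names, and the sample payload assumes the normalized notification format with a data array of entities.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def summarize(notification: dict) -> list:
    """Return (entity id, NO2 value) pairs from a normalized notification."""
    return [
        (entity["id"], entity["NO2"]["value"])
        for entity in notification.get("data", [])
        if "NO2" in entity
    ]


class AlertHandler(BaseHTTPRequestHandler):
    """Print an alert line for each entity found in a POSTed notification."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        raw = self.rfile.read(length) if length else b"{}"
        for entity_id, no2 in summarize(json.loads(raw)):
            print(f"ALERT {entity_id}: NO2={no2}")
        self.send_response(200)
        self.end_headers()


def make_server(port: int = 8000) -> HTTPServer:
    """Create (but do not start) a receiver listening on all interfaces."""
    return HTTPServer(("0.0.0.0", port), AlertHandler)


# Sample notification body, as a broker might send it (assumed shape).
sample = {
    "type": "Notification",
    "data": [
        {
            "id": "urn:ngsi-ld:AirQualityObserved:Bordeaux-AirProbe42-2022-03-24T09:00:00Z",
            "type": "AirQualityObserved",
            "NO2": {"type": "Property", "value": 300},
        }
    ],
}
print(summarize(sample))
```

To actually listen for notifications, call make_server().serve_forever() on the subscriber host.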
Note
The create() method accepts a raise_on_conflict boolean argument.
List subscriptions
from ngsildclient import Client
with Client() as client:
    subscriptions = client.subscriptions.list()
You can list only subscriptions whose name and description match a substring.
from ngsildclient import Client
with Client() as client:
    subscriptions = client.subscriptions.list(pattern="farm")
Note
Matching is case insensitive.
You can list subscriptions that use the same criteria as a given subscription payload.
from ngsildclient import Client, SubscriptionBuilder
payload = (
    SubscriptionBuilder("http://localhost:8000/air_quality_alerts")
    .description("Notify me of high NO2 level")
    .select_type("AirQualityObserved").watch(["NO2"]).query("NO2>200")
    .build()
)
with Client() as client:
    subscriptions = client.subscriptions.conflicts(payload)
Retrieve a subscription
from ngsildclient import Client
with Client() as client:
    subscription = client.subscriptions.get("urn:ngsi-ld:Subscription:622f5a9bb5a991ac07791d5a")
Check whether a subscription exists
from ngsildclient import Client
with Client() as client:
    if client.subscriptions.get("urn:ngsi-ld:Subscription:622f5a9bb5a991ac07791d5a"):
        print("Subscription found!")
Handle Errors
Exception Hierarchy
NgsiError
|-- NgsiModelError
|   |-- NgsiMissingIdError
|   |-- NgsiMissingTypeError
|   '-- NgsiDateFormatError
'-- NgsiApiError
    |-- NgsiNotConnectedError
    |-- NgsiHttpError
    '-- NgsiContextBrokerError
        |-- NgsiAlreadyExistsError
        |-- NgsiResourceNotFoundError
        '-- NgsiBadRequestDataError
Mapping to NGSI-LD API errors
| Exception | Error Type |
|---|---|
| NgsiInvalidRequestError | |
| NgsiBadRequestDataError | |
| NgsiAlreadyExistsError | |
| NgsiOperationNotSupportedError | |
| NgsiResourceNotFoundError | |
| NgsiInternalError | |
| NgsiTooComplexQueryError | |
| NgsiTooManyResultsError | |
| NgsiLdContextNotAvailableError | |
| NgsiNoMultiTenantSupportError | |
| NgsiNonexistentTenantError | |
Nominal vs unexpected exceptions
from ngsildclient import Entity, Client, NgsiAlreadyExistsError, NgsiContextBrokerError, NgsiError
try:
    with Client() as client:
        entity = Entity("AirQualityObserved", "Bordeaux-AirProbe42-2022-03-24T09:00:00Z")
        client.create(entity)
except NgsiAlreadyExistsError:
    pass  # nominal case: silently ignore
except NgsiContextBrokerError as e:
    print(e.problemdetails)  # error reported by the broker
except NgsiError as e:
    print(e)  # any other library error
Note
The library provides advanced methods to tackle intricacies of entity creation, such as an upsert() method.
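The handler ordering above works because except clauses are tried top to bottom, so the most specific class must come first. The sketch below illustrates this with stand-in classes and an RFC 7807 problem-details dictionary. The Demo* classes, ERRORS, to_exception and handle are hypothetical, and only two error type URIs from the ETSI namespace are mapped.

```python
class DemoError(Exception):
    """Stand-in for the library's base error."""


class DemoBrokerError(DemoError):
    """Stand-in for an error reported by the broker as problem details."""

    def __init__(self, problemdetails: dict):
        super().__init__(problemdetails.get("title", "broker error"))
        self.problemdetails = problemdetails


class DemoAlreadyExistsError(DemoBrokerError):
    pass


class DemoResourceNotFoundError(DemoBrokerError):
    pass


# Error type URI -> exception class (only two entries, for illustration).
ERRORS = {
    "https://uri.etsi.org/ngsi-ld/errors/AlreadyExists": DemoAlreadyExistsError,
    "https://uri.etsi.org/ngsi-ld/errors/ResourceNotFound": DemoResourceNotFoundError,
}


def to_exception(problemdetails: dict) -> DemoBrokerError:
    """Pick the most specific exception class for a problem-details body."""
    cls = ERRORS.get(problemdetails.get("type"), DemoBrokerError)
    return cls(problemdetails)


def handle(problemdetails: dict) -> str:
    """Mirror the try/except ladder: most specific handler first."""
    try:
        raise to_exception(problemdetails)
    except DemoAlreadyExistsError:
        return "ignored"
    except DemoBrokerError as e:
        return f"broker error: {e.problemdetails['title']}"
    except DemoError as e:
        return f"error: {e}"


print(handle({"type": "https://uri.etsi.org/ngsi-ld/errors/AlreadyExists", "title": "Already Exists"}))
print(handle({"type": "https://uri.etsi.org/ngsi-ld/errors/InternalError", "title": "Internal Error"}))
```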
- [2] IETF RFC 7807: Problem Details for HTTP APIs
- [ETSI_GS_CIM_009_1.5.1] Context Information Management (CIM) NGSI-LD API Guidelines for Modelling with NGSI-LD ETSI Group Specification
- [NGSI_LD_tutorial] NGSI-LD Subscriptions tutorial for the FIWARE system NGSI-LD tutorial