MMM Events API
The MMM eventing system is mainly intended to support the external events workflow, however, the underlying GraphQL API also has a role in a variety of integration tasks. For more information about how external events are configured, see MMM Configuration, section External events. For more information about the Notification Handler in ONE Runtime Server, see [notifications-handler-and-notification-handler-server-component].
The purpose of the guide is to explain how you can subscribe to MMM events through GraphQL and provide you with basic usage examples. For a brief overview of some key concepts in GraphQL, see ONE API. For a complete guide about working with GraphQL, refer to the official GraphQL tutorials: Introduction to GraphQL. |
Subscribe to events
To subscribe to MMM external events, use the following structure:
subscription ($id: GID!, $ackLimit: Int!) {
_externalEvents(subscriptionId: $id, entity_type: $entityType, ackLimit: $ackLimit, chunkSize:10, chunkMaxDelay:2000, excludeOwnModifications:false){
events{
id
timestamp
entityId
entityType
entityStatus
}
}
}
The subscription can be customized by defining which events the client should subscribe to (entity_type
) as well by limiting the number of roundtrips the client makes, which is done through batching events into larger chunks (chunkSize
and chunkMaxDelay
).
To prevent overwhelming the client with events, processed events need to be acknowledged (ackLimit
), after which MMM can continue to send newly created events.
As the eventing system supports disconnected scenarios, no events are missed in case the client goes offline. |
The following table provides an overview of subscription parameters:
Parameter | Data Type | Required | Description |
---|---|---|---|
Basic Parameters |
|||
|
UUID |
Yes |
The unique subscription identifier generated by the client. |
|
Long |
No |
If specified, this event and all preceding events are considered as acknowledged. |
|
Boolean |
No |
If set to |
Chunking Parameters |
|||
|
Integer |
No |
The maximum number of events that the system can send at once. |
|
Integer |
No |
The maximum amount of time that the system waits before sending a chunk of events, regardless of the current number of events in the chunk. Expressed in milliseconds. |
|
Integer |
No |
The maximum number of events that the system delivers after the last acknowledged event. Unless acknowledged, this number of messages might be sent multiple times. |
Filtering Parameters |
|||
|
UUID |
No |
Enables subscribing to events for a particular MMM entity. |
|
String |
No |
Enables subscribing to events for a particular entity type. |
|
UUID |
No |
Enables subscribing to events for child entities of a particular MMM entity. |
|
String |
No |
Enables subscribing to a particular type of events (either |
Event data consists of the following elements for each event:
-
id
: The identifier of the event, numbered in an increasing sequence. -
timestamp
: The date and time when the event was initiated, in XML format. -
entityId
: The unique identifier of the entity associated with the given event. -
entityType
: The entity type. -
entityStatus
: Describes how the entity was modified (CREATED
,UPDATED
,DELETED
).
{
"id":1,
"timestamp":"2021-05-25T09:03:23.730010Z",
"entityId":"cd456b86-157f-4432-b5df-c5602d5b9af5",
"entityType":"tableAttribute",
"entityStatus":"UPDATED"
}
Acknowledge events
After the received events have been processed, they need to be acknowledged (at least after every ackLimit
number of events).
This can be done using the following operation:
mutation($id: GID!, $lastEventId: Long!){
_acknowledgeExternalEvents(subscriptionId: $id, lastEventId: $lastEventId)
}
Unsubscribe
In case you no longer want to receive events, you can unregister the subscription using the following operation:
mutation($id: GID!){
_unsubscribeExternalEvents(subscriptionId: $id)
}
If you do not remove a subscription this way, it is automatically discarded after the period of time configured using the property plugin.external-events.ataccama.one.externalevents.subscribers-retention-period
.
For more information, see MMM Configuration, section External events.
Retrieve subscription status
The subscription status can be obtained using the following query.
In addition to the subscription status (status
), the query returns the subscription identifier (subscriptionId
) and the number of undelivered events (undeliveredEventCount
).
query status($id: GID!){
_externalEventsSubscriptionStatus(subscriptionId:$id){
subscriptionId
undeliveredEventCount
status
}
}
Client subscription example
The following example shows how to enable an event subscription on the client side.
Client subscription example
import asyncio
import base64
import datetime
from asyncio import CancelledError
from aiohttp import BasicAuth
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.exceptions import TransportQueryError
from gql.transport.websockets import WebsocketsTransport
# https://gql.readthedocs.io/en/latest/usage/index.html
class MyClient:
def __init__(self, base_url, subscription_id, user_name, password, ack_limit=10):
self.subscription_id = subscription_id
http_transport = AIOHTTPTransport(url=f"http://{base_url}/graphql", auth=BasicAuth(user_name, password))
self.http_client = Client(transport=http_transport, fetch_schema_from_transport=False)
auth_header = f"Basic {base64.b64encode(f'{user_name}:{password}'.encode('ascii')).decode('ascii')}"
ws_transport = WebsocketsTransport(url=f"ws://{base_url}/subscriptions", init_payload={
"authorization": auth_header,
}, close_timeout=600, ack_timeout=600)
self.ws_client = Client(transport=ws_transport, fetch_schema_from_transport=False)
self.ack_limit = ack_limit
self.event_count_to_ack = 0
async def subscribe(self, entity_type=None):
filter_fragment = f' entityType:"{entity_type}",' if entity_type else ""
# You can specify subscription parameters to get events about specific entities: entityType/Parent/Id
# You can also specify transport details like ackLimit, chunkSize, chunkMaxDelay
query_text = f"""subscription ($id: GID!, $ackLimit: Int!) {{
_externalEvents(subscriptionId: $id, {filter_fragment}ackLimit: $ackLimit, chunkSize:10, chunkMaxDelay:2000, excludeOwnModifications:false){{
events{{
id
timestamp
entityId
entityType
entityStatus
}}
}}
}}"""
query = gql(query_text)
try:
params = {"id": self.subscription_id, "ackLimit": str(self.ack_limit)}
async for result in self.ws_client.subscribe_async(query, params):
await self.__process_events(result)
except CancelledError:
pass
async def __process_events(self, result):
print(datetime.datetime.now(), result)
events = result["_externalEvents"]["events"]
self.event_count_to_ack += len(events)
if self.event_count_to_ack >= self.ack_limit:
last_id = events[-1]["id"]
await self.__acknowledge(last_id)
self.event_count_to_ack = 0
async def __acknowledge(self, last_id):
query = gql("""mutation($id: GID!, $lastEventId: Long!){
_acknowledgeExternalEvents(subscriptionId: $id, lastEventId: $lastEventId)
}""")
params = {"id": self.subscription_id, "lastEventId": str(last_id)}
try:
result = await self.http_client.execute_async(query, params)
print(f"ack ({last_id}), result: [{result}]")
except TransportQueryError as e:
print(f"ack failed: {e.errors}")
async def unregister(self):
query = gql("""mutation($id: GID!){
_unsubscribeExternalEvents(subscriptionId: $id)
}""")
params = {"id": self.subscription_id}
try:
result = await self.http_client.execute_async(query, params)
print(f"unregister result {result}")
except TransportQueryError as e:
print(f"unregister failed: {e.errors}")
async def get_status(self):
query = gql("""query status($id: GID!){
_externalEventsSubscriptionStatus(subscriptionId:$id){
subscriptionId
undeliveredEventCount
status
}
}""")
params = {"id": self.subscription_id}
try:
result = await self.http_client.execute_async(query, params)
print(f"get_status, result: [{result}]")
except TransportQueryError as e:
print(f"get_status failed: {e.errors}")
async def main():
client = MyClient("localhost:8021", "ae92c852-43a7-4942-b274-29aa252c205b", "admin", "admin")
await client.get_status()
#await client.unregister()
print("Listening for events")
await client.subscribe(entity_type=None)
asyncio.run(main())
Was this page useful?