MMM Events API

The MMM eventing system is mainly intended to support the external events workflow. However, the underlying GraphQL API is also useful for a variety of integration tasks. For more information about how external events are configured, see MMM Configuration, section External events. For more information about the Notification Handler in ONE Runtime Server, see Notifications Handler.

This guide explains how to subscribe to MMM events through GraphQL and provides basic usage examples. For a brief overview of some key concepts in GraphQL, see ONE API.

For a complete guide about working with GraphQL, refer to the official GraphQL tutorials: Introduction to GraphQL.

Subscribe to events

To subscribe to MMM external events, use the following structure:

GraphQL operation for subscribing to MMM events
subscription ($id: GID!, $entityType: String, $ackLimit: Int!) {
    _externalEvents(subscriptionId: $id, entityType: $entityType, ackLimit: $ackLimit, chunkSize:10, chunkMaxDelay:2000, excludeOwnModifications:false){
        events{
            id
            timestamp
            entityId
            entityType
            entityStatus
        }
    }
}

You can customize the subscription by defining which events the client subscribes to (entityType) and by limiting the number of round trips the client makes, which is done by batching events into larger chunks (chunkSize and chunkMaxDelay). To prevent the client from being overwhelmed with events, processed events need to be acknowledged. MMM delivers at most ackLimit events beyond the last acknowledged one and continues sending newly created events only after the client acknowledges.

Because the eventing system supports disconnected scenarios, no events are missed if the client goes offline.

The following table provides an overview of subscription parameters:

Parameter | Data Type | Required | Description

Basic Parameters
subscriptionId | UUID | Yes | The unique subscription identifier generated by the client.
lastEventId | Long | No | If specified, this event and all preceding events are considered acknowledged.
excludeOwnModifications | Boolean | No | If set to true, the user does not receive events they triggered themselves. This helps prevent infinite loops of generated events, which could occur when you need to update an entity based on a specific event involving the same entity.

Chunking Parameters
chunkSize | Integer | No | The maximum number of events that the system can send at once.
chunkMaxDelay | Integer | No | The maximum amount of time, in milliseconds, that the system waits before sending a chunk of events, regardless of the current number of events in the chunk.
ackLimit | Integer | No | The maximum number of events that the system delivers after the last acknowledged event. Unless acknowledged, these events might be sent multiple times.

Filtering Parameters
entityId | UUID | No | Enables subscribing to events for a particular MMM entity.
entityType | String | No | Enables subscribing to events for a particular entity type.
entityAncestorId | UUID | No | Enables subscribing to events for child entities of a particular MMM entity.
entityStatus | String | No | Enables subscribing to a particular type of event (CREATED, UPDATED, or DELETED).
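
The filtering parameters can be passed as GraphQL variables instead of being interpolated into the query text. The following Python sketch builds the subscription text and its variables from whichever filters are set. The helper name build_subscription and the variable types in FILTER_TYPES are assumptions inferred from the parameter table, not taken from the MMM schema, so verify them against your environment before use.

```python
# Assumed GraphQL variable types for the optional filters (verify against the schema).
FILTER_TYPES = {
    "entityId": "GID",
    "entityType": "String",
    "entityAncestorId": "GID",
    "entityStatus": "String",
}


def build_subscription(subscription_id, ack_limit, **filters):
    """Return (query_text, variables) for an _externalEvents subscription."""
    unknown = set(filters) - set(FILTER_TYPES)
    if unknown:
        raise ValueError(f"unsupported filter(s): {sorted(unknown)}")

    # Filters set to None are left out of the query entirely.
    active = {name: value for name, value in filters.items() if value is not None}

    var_decls = ["$id: GID!", "$ackLimit: Int!"]
    args = ["subscriptionId: $id", "ackLimit: $ackLimit"]
    for name in FILTER_TYPES:  # iterate in a fixed order for stable output
        if name in active:
            var_decls.append(f"${name}: {FILTER_TYPES[name]}")
            args.append(f"{name}: ${name}")

    query = (
        f"subscription ({', '.join(var_decls)}) {{\n"
        f"    _externalEvents({', '.join(args)}) {{\n"
        "        events { id timestamp entityId entityType entityStatus }\n"
        "    }\n"
        "}"
    )
    variables = {"id": subscription_id, "ackLimit": ack_limit, **active}
    return query, variables
```

For example, build_subscription("ae92c852-43a7-4942-b274-29aa252c205b", 10, entityType="tableAttribute") yields a query that declares $entityType and a variables dictionary with three entries. Using variables avoids quoting problems that can occur when values are pasted into the query string.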

Event data consists of the following elements for each event:

  • id: The identifier of the event, numbered in an increasing sequence.

  • timestamp: The date and time when the event was initiated, in the ISO 8601 format used by XML dateTime values (UTC, as in the example that follows).

  • entityId: The unique identifier of the entity associated with the given event.

  • entityType: The entity type.

  • entityStatus: Describes how the entity was modified (CREATED, UPDATED, DELETED).

Event data example
{
    "id":1,
    "timestamp":"2021-05-25T09:03:23.730010Z",
    "entityId":"cd456b86-157f-4432-b5df-c5602d5b9af5",
    "entityType":"tableAttribute",
    "entityStatus":"UPDATED"
}
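
On the client side, it can be convenient to turn each event into a typed object before processing it. The following Python sketch does this for the fields listed above. The class name ExternalEvent and the function parse_event are hypothetical and not part of any MMM client library. The timestamp handling assumes a UTC value with a trailing Z, as in the example.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ExternalEvent:
    id: int
    timestamp: datetime
    entity_id: str
    entity_type: str
    entity_status: str


def parse_event(raw: dict) -> ExternalEvent:
    """Convert one event dictionary from the subscription payload."""
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11,
    # so replace it with an explicit UTC offset for older versions.
    timestamp = datetime.fromisoformat(raw["timestamp"].replace("Z", "+00:00"))
    return ExternalEvent(
        id=int(raw["id"]),
        timestamp=timestamp,
        entity_id=raw["entityId"],
        entity_type=raw["entityType"],
        entity_status=raw["entityStatus"],
    )


event = parse_event({
    "id": 1,
    "timestamp": "2021-05-25T09:03:23.730010Z",
    "entityId": "cd456b86-157f-4432-b5df-c5602d5b9af5",
    "entityType": "tableAttribute",
    "entityStatus": "UPDATED",
})
print(event.entity_type, event.timestamp.isoformat())
```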

Acknowledge events

After the received events have been processed, they need to be acknowledged, at least once every ackLimit events. Acknowledging an event also acknowledges all events that precede it. Use the following operation:

GraphQL operation for acknowledging events
mutation($id: GID!, $lastEventId: Long!){
    _acknowledgeExternalEvents(subscriptionId: $id, lastEventId: $lastEventId)
}
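
The decision about when to send an acknowledgement can be kept separate from the network code. The following Python sketch counts processed events and reports the event ID to acknowledge once ackLimit events have accumulated. The class AckTracker is a hypothetical helper for illustration. It assumes that event IDs arrive in increasing order, as described in the event data section.

```python
from typing import Iterable, Optional


class AckTracker:
    """Counts processed events and reports when an acknowledgement is due."""

    def __init__(self, ack_limit: int):
        if ack_limit < 1:
            raise ValueError("ack_limit must be positive")
        self.ack_limit = ack_limit
        self.pending = 0  # events processed since the last acknowledgement
        self.last_id: Optional[int] = None

    def record(self, event_ids: Iterable[int]) -> Optional[int]:
        """Record a processed chunk; return the ID to acknowledge, or None."""
        for event_id in event_ids:
            self.pending += 1
            self.last_id = event_id
        if self.pending >= self.ack_limit and self.last_id is not None:
            self.pending = 0
            return self.last_id
        return None


tracker = AckTracker(ack_limit=3)
for chunk in ([1, 2], [3], [], [4, 5, 6, 7]):
    due = tracker.record(chunk)
    if due is not None:
        # This is the point where _acknowledgeExternalEvents would be called.
        print(f"acknowledge up to event {due}")
```

Because the tracker remembers the last processed ID across chunks, an empty chunk cannot cause a lookup on an empty list.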

Unsubscribe

If you no longer want to receive events, unregister the subscription using the following operation:

GraphQL operation for unregistering a subscription
mutation($id: GID!){
    _unsubscribeExternalEvents(subscriptionId: $id)
}

If you do not remove a subscription this way, it is automatically discarded after the period of time configured using the property plugin.external-events.ataccama.one.externalevents.subscribers-retention-period. For more information, see MMM Configuration, section External events.

Retrieve subscription status

The subscription status can be obtained using the following query. In addition to the subscription status (status), the query returns the subscription identifier (subscriptionId) and the number of undelivered events (undeliveredEventCount).

GraphQL query for retrieving subscription status
query status($id: GID!){
    _externalEventsSubscriptionStatus(subscriptionId:$id){
        subscriptionId
        undeliveredEventCount
        status
    }
}
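
One possible use of the status query is to wait until all events have been delivered, for example before unregistering a subscription. The following Python sketch polls a caller-supplied function until undeliveredEventCount reaches zero. The function wait_until_drained and its parameters are hypothetical. The response shape assumes that the result is keyed by the query field name, as in the client example in this guide.

```python
import time
from typing import Callable


def wait_until_drained(
    fetch_status: Callable[[], dict],
    poll_interval: float = 1.0,
    max_polls: int = 10,
) -> bool:
    """Return True once no undelivered events remain, False if polling gives up."""
    for attempt in range(max_polls):
        status = fetch_status()["_externalEventsSubscriptionStatus"]
        if status["undeliveredEventCount"] == 0:
            return True
        if attempt < max_polls - 1:
            time.sleep(poll_interval)  # wait before asking again
    return False


# Stand-in for a real GraphQL call: two canned responses.
canned = iter([
    {"_externalEventsSubscriptionStatus": {"undeliveredEventCount": 3}},
    {"_externalEventsSubscriptionStatus": {"undeliveredEventCount": 0}},
])
print(wait_until_drained(lambda: next(canned), poll_interval=0))
```

In a real client, fetch_status would execute the status query over HTTP and return the resulting data dictionary.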

Client subscription example

The following example shows how to enable an event subscription on the client side.

Client subscription example
import asyncio
import base64
import datetime
from asyncio import CancelledError

from aiohttp import BasicAuth
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.exceptions import TransportQueryError
from gql.transport.websockets import WebsocketsTransport


# https://gql.readthedocs.io/en/latest/usage/index.html

class MyClient:
    def __init__(self, base_url, subscription_id, user_name, password, ack_limit=10):
        self.subscription_id = subscription_id

        http_transport = AIOHTTPTransport(url=f"http://{base_url}/graphql", auth=BasicAuth(user_name, password))
        self.http_client = Client(transport=http_transport, fetch_schema_from_transport=False)

        auth_header = f"Basic {base64.b64encode(f'{user_name}:{password}'.encode('ascii')).decode('ascii')}"
        ws_transport = WebsocketsTransport(url=f"ws://{base_url}/subscriptions", init_payload={
            "authorization": auth_header,
        }, close_timeout=600, ack_timeout=600)

        self.ws_client = Client(transport=ws_transport, fetch_schema_from_transport=False)
        self.ack_limit = ack_limit
        self.event_count_to_ack = 0

    async def subscribe(self, entity_type=None):
        filter_fragment = f' entityType:"{entity_type}",' if entity_type else ""
        # Optional filters narrow the events to specific entities: entityType, entityAncestorId, entityId.
        # Delivery can be tuned with ackLimit, chunkSize, and chunkMaxDelay.
        query_text = f"""subscription ($id: GID!, $ackLimit: Int!) {{
        _externalEvents(subscriptionId: $id, {filter_fragment}ackLimit: $ackLimit, chunkSize:10, chunkMaxDelay:2000, excludeOwnModifications:false){{
            events{{
                id
                timestamp
                entityId
                entityType
                entityStatus
            }}
        }}
    }}"""

        query = gql(query_text)
        try:
            # ackLimit is declared as Int!, so pass an integer rather than a string.
            params = {"id": self.subscription_id, "ackLimit": int(self.ack_limit)}

            async for result in self.ws_client.subscribe_async(query, params):
                await self.__process_events(result)
        except CancelledError:
            pass

    async def __process_events(self, result):
        print(datetime.datetime.now(), result)
        events = result["_externalEvents"]["events"]
        self.event_count_to_ack += len(events)
        if self.event_count_to_ack >= self.ack_limit:
            last_id = events[-1]["id"]
            await self.__acknowledge(last_id)
            self.event_count_to_ack = 0

    async def __acknowledge(self, last_id):
        query = gql("""mutation($id: GID!, $lastEventId: Long!){
      _acknowledgeExternalEvents(subscriptionId: $id, lastEventId: $lastEventId)
    }""")
        params = {"id": self.subscription_id, "lastEventId": str(last_id)}

        try:
            result = await self.http_client.execute_async(query, params)
            print(f"ack ({last_id}), result: [{result}]")
        except TransportQueryError as e:
            print(f"ack failed: {e.errors}")

    async def unregister(self):
        query = gql("""mutation($id: GID!){
      _unsubscribeExternalEvents(subscriptionId: $id)
    }""")
        params = {"id": self.subscription_id}

        try:
            result = await self.http_client.execute_async(query, params)
            print(f"unregister result {result}")
        except TransportQueryError as e:
            print(f"unregister failed: {e.errors}")

    async def get_status(self):
        query = gql("""query status($id: GID!){
    _externalEventsSubscriptionStatus(subscriptionId:$id){
        subscriptionId
        undeliveredEventCount
        status
    }
  }""")
        params = {"id": self.subscription_id}

        try:
            result = await self.http_client.execute_async(query, params)
            print(f"get_status, result: [{result}]")
        except TransportQueryError as e:
            print(f"get_status failed: {e.errors}")


async def main():
    client = MyClient("localhost:8021", "ae92c852-43a7-4942-b274-29aa252c205b", "admin", "admin")
    await client.get_status()
    #await client.unregister()
    print("Listening for events")
    await client.subscribe(entity_type=None)


asyncio.run(main())