Streaming Interface

The Streaming interface processes records from a given source (a JMS queue, a Kafka topic, or a Salesforce channel) and loads them into the MDM Hub. Records are processed in batches: a batch is processed when it reaches a defined number of records or when a specified time elapses, whichever happens first. This micro-batching provides near real-time integration and increased record throughput.

The configured stream consumers are generated into Files/etc/nme-stream.gen.xml, which is referenced in the MDM configuration file (see MDM Configuration).

nme-stream.gen.xml
<stream>
    <consumers>
        <consumer name="Web_Life_JMS_Stream">
            <!-- consumer configuration -->
        </consumer>

        <!-- more consumers -->

    </consumers>
</stream>

Stream Consumer

A stream consumer is responsible for reading messages from a stream, batching them, and sending them for further processing. A consumer consists of the following parts:

  • Stream source

  • Batching settings

  • Data load processor

  • Message transformer

  • Error handling strategy

Sample Consumer
<consumer name="Web_Life_JMS_Stream">
    <source class="com.ataccama.nme.engine.stream.JmsStreamSource">
        <connectionName>esbEvents</connectionName>
        <inputDestination>esb.event.queue</inputDestination>
        <messageSelector>JMSType = 'mdm_change'</messageSelector>
    </source>

    <!-- batching: a batch runs when either the record count is reached or the time has elapsed since the last run (and at least one record is waiting) -->
    <batching>
        <count>20</count>
        <seconds>10</seconds>
    </batching>

    <processor class="com.ataccama.nme.engine.stream.DeltaLoad">
        <transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp"/>
    </processor>

    <errorHandler>
        <!-- See section Error Handling Strategy for further XML configuration -->
    </errorHandler>
</consumer>

Batching Settings

When a stream consumer is active, it receives messages from a configured source and puts them into the batch. Two conditions determine when the batch is processed:

Name | Default value | Description
count | 100 | Maximum number of messages processed in one batch. When this number of messages is received, processing starts.
seconds | 10 | Maximum amount of time allowed between message reception and the start of processing.

When one of the batching conditions is met (count of messages or waiting time), the batch is processed.
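As an illustration, the fragment below sketches a batching block for a stream where lower latency matters more than large batches. The values 500 and 5 are illustrative assumptions, not recommendations.

```xml
<batching>
    <!-- start processing as soon as 500 messages have been collected... -->
    <count>500</count>
    <!-- ...or once 5 seconds have passed, whichever happens first -->
    <seconds>5</seconds>
</batching>
```

With these values, a quiet stream is flushed by the time limit, while a busy stream is flushed by the count limit.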

Processing is asynchronous but serialized:

  • While a batch is being processed, new messages continue to be received, so the timing condition remains precise.

  • If there are enough messages to start a new batch but the previous one is still being processed, the consumer waits for that processing to complete.

Because stream consumers batch multiple messages into one write transaction, duplicate records (coming from multiple messages) can end up in the same batch. In such a case, the consumer automatically chooses the record with the latest source_timestamp, so it is important to provide source_timestamp. If there is no information about time validity, the JMSTimestamp header can be used as a last resort.

Stream Source

The stream source is a connection to a particular stream type. At the moment, JMS, Kafka and Salesforce are supported.

JMS

JMS Source
<source class="com.ataccama.nme.engine.stream.JmsStreamSource">
    <connectionName>esbEvents</connectionName>
    <inputDestination>esb.event.queue</inputDestination>
</source>

Connection properties:

Name | Description
connectionName | Name of the JMS connection defined in the JMS Provider Component (see JMS Provider Component).
inputDestination | Name of the input destination (JMS queue).
messageSelector | JMS consumer message filter, written like an SQL WHERE clause.
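To make the selector syntax concrete, here is a minimal sketch of a JMS source that combines two conditions. JMSType and JMSPriority are standard JMS header fields that selectors may reference; the priority cutoff of 4 is an illustrative assumption. Because the configuration is XML, comparison operators such as > should be escaped.

```xml
<source class="com.ataccama.nme.engine.stream.JmsStreamSource">
    <connectionName>esbEvents</connectionName>
    <inputDestination>esb.event.queue</inputDestination>
    <!-- consume only 'mdm_change' messages with priority above 4; '>' is written as &gt; -->
    <messageSelector>JMSType = 'mdm_change' AND JMSPriority &gt; 4</messageSelector>
</source>
```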

Kafka

Kafka Source
<source class="com.ataccama.nme.engine.stream.KafkaStreamSource">
    <kafkaProperties>
        <kafkaProperty>
            <key>group.id</key>
            <value>id</value>
        </kafkaProperty>
    </kafkaProperties>
    <readFromBeginning>true</readFromBeginning>
    <kafkaResource>kafka_server</kafkaResource>
    <kafkaTopics>
        <topic>MDM_topic</topic>
    </kafkaTopics>
</source>

Connection properties:

Name | Description
kafkaProperty.key | A Kafka property key.
kafkaProperty.value | A Kafka property value.
readFromBeginning | A boolean flag forcing the consumer to read messages in the topic from the very beginning on server startup. This setting is useful for testing and development, when data-reading algorithms are still being configured. Do not use it in production environments.
kafkaResource | The name of the connection to a Kafka server in the runtime configuration (see Runtime Configuration).
kafkaTopics.topic | Kafka topic name.
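The fragment below sketches a Kafka source closer to a production setup. It assumes that several kafkaProperty entries may be listed and that they are handed to the Kafka consumer as-is; max.poll.records is a standard Kafka consumer setting, while the group name and the value 200 are hypothetical.

```xml
<source class="com.ataccama.nme.engine.stream.KafkaStreamSource">
    <kafkaProperties>
        <kafkaProperty>
            <key>group.id</key>
            <value>mdm-hub-consumers</value> <!-- hypothetical consumer group name -->
        </kafkaProperty>
        <kafkaProperty>
            <!-- standard Kafka consumer setting: upper bound on records returned by one poll -->
            <key>max.poll.records</key>
            <value>200</value>
        </kafkaProperty>
    </kafkaProperties>
    <!-- false: do not re-read the topic from the start on server startup -->
    <readFromBeginning>false</readFromBeginning>
    <kafkaResource>kafka_server</kafkaResource>
    <kafkaTopics>
        <topic>MDM_topic</topic>
    </kafkaTopics>
</source>
```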

Salesforce

Salesforce Source
<source class="com.ataccama.nme.engine.stream.SalesforceStreamSource">
    <serverName>SFServer</serverName>
    <channelName>/data/AccountChangeEvent</channelName>
</source>

Connection properties:

Name | Description
serverName | Name of the configured Salesforce server (see Creating a New Server Connection).
channelName | Name of the Salesforce channel.

Data Load Processor

The data load processor is responsible for processing input records, that is, loading them into the hub. To load messages as record updates, use DeltaLoad.

<processor class="com.ataccama.nme.engine.stream.DeltaLoad"> <!-- default: sourceDeletionStrategy="deactivate" -->
    <transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp" />
    <!-- optional -->
    <entities>
        <entity name="party">
            <!-- only the listed columns are processed for this entity -->
            <columns>
                <column>src_type</column>
                <column>src_last_name</column>
            </columns>
        </entity>
        <entity name="address" />

        <!-- more entities -->
    </entities>
</processor>

Message Transformer

The message transformer is responsible for transforming messages (that is, the string stored in the message body) to input records. The input records are then passed to the data load processor.

Implementations

There are several types of message transformers.

Plan Transformer

PlanTransformer relies on a configured plan to read and parse the messages and transform them into MDM-compatible input records.

<transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp">
    <headers>
        <header header="JMSTimestamp" column="hdr_timestamp" />
    </headers>
</transformer>

The plan referenced in PlanTransformer must have:

  • one Integration Input named "in" with one column "message" of type STRING and optionally other columns based on header configuration.

  • one Integration Output per instance entity - same as in batch load plans.

Process Delta Transformer

ProcessDeltaTransformer reuses the native ProcessDelta service (see Native Services) to transform the messages.

Two message formats are supported:

  • XML RPC. Simple XML format. This format requires that source_timestamp is part of the message.

    <transformer class="com.ataccama.nme.engine.stream.ProcessDeltaTransformer">
        <headers>
            <header column="source_timestamp" header="JMSTimestamp"/>
        </headers>
    </transformer>
  • SOAP. XML wrapped in a SOAP envelope. This format requires that the processDelta name is specified, as well as the list of elements to traverse from the root element to the processDelta element.

    <transformer class="com.ataccama.nme.engine.stream.ProcessDeltaTransformer">
        <requestPath>
            <el>Envelope</el>
            <el>Body</el>
            <el>processDelta</el>
        </requestPath>
        <headers>
            <header column="source_timestamp" header="JMSTimestamp"/>
        </headers>
    </transformer>
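For orientation, the requestPath above would correspond to a message shaped roughly like the sketch below. This assumes that each el entry matches an element's local name regardless of namespace prefix; the payload inside processDelta is left out because its structure is defined by the ProcessDelta service.

```xml
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
    <soapenv:Body>
        <processDelta>
            <!-- ProcessDelta request payload goes here (see Native Services) -->
        </processDelta>
    </soapenv:Body>
</soapenv:Envelope>
```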

JMS Headers

The following standard JMS properties are supported (and available as headers):

JMS header name | Data type | Description from JMS standard
JMSTimestamp | DATETIME | The time a message was handed off to a provider to be sent.
JMSMessageID | STRING | A value that uniquely identifies each message sent by a provider.
JMSPriority | INTEGER | The message priority. The JMS API defines ten priority levels, with 0 as the lowest and 9 as the highest.
JMSCorrelationID | STRING | A correlation ID: either a provider-specific message ID or an application-specific string value.
JMSType | STRING | The message type identifier supplied by the client when the message was sent.

Other custom JMS properties are generated under <headers> too.
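A minimal sketch of a header mapping that combines standard headers with a custom property. The property name sourceSystem and the column names hdr_message_id and hdr_source_system are hypothetical; the header and column attributes follow the Plan Transformer example on this page.

```xml
<transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp">
    <headers>
        <header header="JMSTimestamp" column="hdr_timestamp" />
        <header header="JMSMessageID" column="hdr_message_id" />
        <!-- hypothetical custom JMS property set by the sending application -->
        <header header="sourceSystem" column="hdr_source_system" />
    </headers>
</transformer>
```

With a mapping like this, the Integration Input "in" of the referenced plan would need matching columns in addition to "message".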

Entities

Optional configuration. If no entities are specified, all entities are used for streaming. It is also possible to specify the columns that should be processed for each entity, in which case all other columns are ignored. If a listed column is missing from a stream message, the column value in the hub is set to null.

Error Handling Strategy

Configuring an error handling strategy allows you to have control over issues with loading data from the stream consumer. There are 3 parts to configuring the error handler:

  • Reconnection Total Time - If the consumer fails to connect to the streaming source, it will try to reconnect in increasing intervals (first attempt after 1 second, then the interval doubles after every unsuccessful attempt). The maxReconnectTotalTime defines the total time in seconds after which the consumer stops reconnecting. The default value is 600 seconds.

  • Threshold - Defines how many errors are allowed before the consumer stops. It is configured using the windowSize and threshold elements. The window size acts as a sliding window over the most recently consumed messages. For example, if windowSize is set to 20 and threshold to 2, the consumer stops once 2 of the last 20 consumed messages have failed.

  • Error consumers - Destinations where failed messages are forwarded:

    • Log to file - Failed messages are forwarded to a file. It is possible to define datetime in the name of the file.

    • Send to JMS or Send to Kafka - Failed messages are forwarded to the configured destination based on the stream consumer used.

The example below shows a configuration for each error consumer.

Example Error Handling Strategy
<consumer name="Web_Life_JMS_Stream">
    <!-- ... -->
    <errorHandler>
        <maxReconnectTotalTime>600</maxReconnectTotalTime> <!-- number of seconds of total time trying to reconnect - default is 600 = 10 minutes. -->

        <!-- how long history of message load result should be remembered -->
        <windowSize>100</windowSize>
        <!-- if number of messages failed to load (within the window) reaches this threshold, consumer will stop -->
        <threshold>2</threshold>

        <consumers>
            <consumer class="com.ataccama.nme.engine.stream.error.SendToJms">
                <connectionName>testQueue</connectionName>
                <destinationName>dynamicQueues/party.errorHandler.jms</destinationName>
            </consumer>
            <consumer class="com.ataccama.nme.engine.stream.error.LogToFile">
                <!-- logFile can contain a datetime pattern enclosed in ${pattern} -->
                <logFile>jms-errors-${yyyy-MM-dd}.log</logFile>
            </consumer>
            <consumer class="com.ataccama.nme.engine.stream.error.SendToKafka">
                <topic>test-errors</topic>
                <kafkaResource>kafkaConnection1</kafkaResource>
            </consumer>
        </consumers>
    </errorHandler>
</consumer>
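Putting the pieces together, the sketch below shows what a complete Kafka-based consumer might look like, with failed messages forwarded to a Kafka topic and logged to a file. It only recombines elements shown on this page; the consumer name, the log file name, and the assumption that PlanTransformer is used unchanged with a Kafka source are hypothetical.

```xml
<consumer name="Web_Life_Kafka_Stream"> <!-- hypothetical consumer name -->
    <source class="com.ataccama.nme.engine.stream.KafkaStreamSource">
        <kafkaProperties>
            <kafkaProperty>
                <key>group.id</key>
                <value>id</value>
            </kafkaProperty>
        </kafkaProperties>
        <readFromBeginning>false</readFromBeginning>
        <kafkaResource>kafka_server</kafkaResource>
        <kafkaTopics>
            <topic>MDM_topic</topic>
        </kafkaTopics>
    </source>

    <batching>
        <count>100</count>
        <seconds>10</seconds>
    </batching>

    <processor class="com.ataccama.nme.engine.stream.DeltaLoad">
        <transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp"/>
    </processor>

    <errorHandler>
        <!-- give up reconnecting after 600 seconds in total -->
        <maxReconnectTotalTime>600</maxReconnectTotalTime>
        <!-- stop once 2 of the last 20 consumed messages have failed -->
        <windowSize>20</windowSize>
        <threshold>2</threshold>
        <consumers>
            <consumer class="com.ataccama.nme.engine.stream.error.SendToKafka">
                <topic>test-errors</topic>
                <kafkaResource>kafka_server</kafkaResource>
            </consumer>
            <consumer class="com.ataccama.nme.engine.stream.error.LogToFile">
                <logFile>kafka-errors-${yyyy-MM-dd}.log</logFile>
            </consumer>
        </consumers>
    </errorHandler>
</consumer>
```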

Other Practical Information

  • Stream consumers can be started and stopped from the ONE Runtime Server Admin under MD Interfaces (see MD Interfaces).

  • Stream consumers can also be controlled from the command line (see OnlineCtl Commands).

  • Stream consumers are stopped on server startup. This can be changed by the runtime property nme.stream.consumers.active (default value is false).
