Streaming Interface
The Streaming interface processes records from a given source (JMS queue, Kafka topic, Amazon SQS queue, or Salesforce channel) and loads them into the MDM Hub. Records are processed in batches: a batch is processed when it reaches a defined number of records or when a specified time frame elapses, whichever happens first. This micro-batching provides near real-time integration and increased record throughput.
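Most streaming technologies support two modes of message acknowledgement: automatic acknowledgement, in which a message is acknowledged as soon as it is delivered to the consumer, and client acknowledgement, in which the consumer explicitly acknowledges a message after handling it.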
The MDM Streaming interface uses client acknowledgement: MDM first receives messages from the stream source, processes them, and acknowledges them only after the data changes have been processed successfully. If MDM fails to process messages, the failed messages are not acknowledged and remain in the stream source, either for further processing or to be handled by the stream source itself (technology-specific error handling can be in place).
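The client acknowledgement flow can be sketched as follows. This is a minimal Python illustration, not MDM code: the `Message` class, its `acknowledge` method, and `consume_with_client_ack` are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for a message received from a stream source."""
    body: str
    acknowledged: bool = False

    def acknowledge(self) -> None:
        self.acknowledged = True


def consume_with_client_ack(messages, process):
    """Process a batch and acknowledge its messages only on success.

    Returns True if the batch was processed and acknowledged, False otherwise.
    """
    try:
        process([m.body for m in messages])
    except Exception:
        # Nothing is acknowledged, so the messages stay in the stream source.
        return False
    for m in messages:
        m.acknowledge()
    return True
```

The point of the sketch is the ordering: acknowledgement happens strictly after processing, so a failure leaves every message of the batch unacknowledged.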
The configured stream consumers are generated into Files/etc/nme-stream.gen.xml, which is referenced in the MDM configuration file (see MDM Configuration).
<stream>
    <consumers>
        <consumer name="Web_Life_JMS_Stream">
            <!-- consumer configuration -->
        </consumer>
        <!-- more consumers -->
    </consumers>
</stream>
Stream Consumer
A stream consumer is responsible for reading messages from some stream, batching them, and sending them for further processing. A consumer consists of the following parts:
- Stream source
- Batching settings
- Data load processor
- Message transformer
- Error handling strategy
<consumer name="Web_Life_JMS_Stream">
    <source class="com.ataccama.nme.engine.stream.JmsStreamSource">
        <connectionName>esbEvents</connectionName>
        <inputDestination>esb.event.queue</inputDestination>
        <messageSelector>JMSType = 'mdm_change'</messageSelector>
    </source>
    <!-- batching: the batch is run when the record count is reached or when the time has elapsed since the last run (and at least one record is waiting) -->
    <batching>
        <count>20</count>
        <seconds>10</seconds>
    </batching>
    <processor class="com.ataccama.nme.engine.stream.DeltaLoad">
        <transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp"/>
    </processor>
    <errorHandler>
        <!-- See section Error Handling Strategy for further XML configuration -->
    </errorHandler>
</consumer>
Batching Settings
When a stream consumer is active, it receives messages from a configured source and puts them into the batch. Two conditions determine when the batch is processed:
Name | Default value | Description |
---|---|---|
count | 100 | Maximum number of messages processed in one batch. When this number of messages is received, processing starts. |
seconds | 10 | Maximum amount of time allowed between message reception and the start of processing. |
When one of the batching conditions is met (count of messages or waiting time), the batch is processed.
Processing is synchronous, meaning that when the stream consumer is processing a batch of messages, it is not receiving new messages. Once the processing of the batch is finished, the stream consumer goes back to receiving mode and batching conditions apply again.
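The two batching conditions can be illustrated with a small Python sketch. It is not the MDM implementation: the `MicroBatcher` class is hypothetical, time is passed in explicitly so the behavior is easy to follow, and the sketch assumes the timer starts when the first message of a batch is received.

```python
class MicroBatcher:
    """Collects messages and releases a batch on count or elapsed time."""

    def __init__(self, count=100, seconds=10.0):
        self.count = count
        self.seconds = seconds
        self._batch = []
        self._first_received = None  # arrival time of the oldest waiting message

    def add(self, message, now):
        """Add a message; return the batch if a condition is met, else None."""
        if not self._batch:
            self._first_received = now
        self._batch.append(message)
        return self._flush_if_due(now)

    def poll(self, now):
        """Check the time condition without adding a message."""
        return self._flush_if_due(now)

    def _flush_if_due(self, now):
        if not self._batch:
            return None  # an empty batch is never processed
        full = len(self._batch) >= self.count
        timed_out = now - self._first_received >= self.seconds
        if full or timed_out:
            batch, self._batch = self._batch, []
            self._first_received = None
            return batch
        return None
```

With `count=3` and `seconds=10`, three messages arriving within two seconds are released together by the count condition, while a single message is released by `poll` once it has waited ten seconds.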
Because stream consumers batch multiple messages into one write transaction, duplicate records (coming from multiple messages) can end up in the same batch.
In such a case, the consumer automatically chooses the record with the latest source_timestamp.
It is therefore important to provide source_timestamp.
If there is no information about time validity, the JMSTimestamp header can be used as a last resort.
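The duplicate resolution described above amounts to keeping, per record, the version with the latest source_timestamp. The following Python sketch shows the idea under stated assumptions: records are plain dictionaries, the `entity` and `source_id` keys used to identify a record are hypothetical, and on equal timestamps the record that appears later in the batch wins (the source does not specify tie-breaking).

```python
def latest_per_record(records):
    """Keep only the version with the latest source_timestamp per record."""
    latest = {}
    for rec in records:
        key = (rec["entity"], rec["source_id"])  # hypothetical record identity
        current = latest.get(key)
        if current is None or rec["source_timestamp"] >= current["source_timestamp"]:
            latest[key] = rec
    return list(latest.values())
```

If source_timestamp were missing, there would be no reliable way to order the duplicates, which is why the text recommends always providing it.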
Stream Source
The stream source is a connection to a particular stream type. At the moment, JMS, Kafka, Amazon SQS, and Salesforce are supported.
JMS
<source class="com.ataccama.nme.engine.stream.JmsStreamSource">
    <connectionName>esbEvents</connectionName>
    <inputDestination>esb.event.queue</inputDestination>
</source>
Connection properties:
Name | Description |
---|---|
connectionName | Name of the JMS connection defined in JMS Provider Component (see JMS Provider Component). |
inputDestination | Name of the input destination (JMS queue). |
messageSelector | JMS consumer message filter, written like an SQL WHERE clause. |
Kafka
<source class="com.ataccama.nme.engine.stream.KafkaStreamSource">
    <inputFormat>String</inputFormat>
    <kafkaProperties>
        <kafkaProperty>
            <key>group.id</key>
            <value>id</value>
        </kafkaProperty>
    </kafkaProperties>
    <readFromBeginning>true</readFromBeginning>
    <kafkaResource>kafka_server</kafkaResource>
    <kafkaTopics>
        <topic>MDM_topic</topic>
    </kafkaTopics>
</source>
Connection properties:
Name | Description |
---|---|
inputFormat | Streaming input format, for example String. |
kafkaProperty.key | A Kafka property key. |
kafkaProperty.value | A Kafka property value. |
readFromBeginning | A boolean flag forcing the consumer to read messages in the topic from the very beginning on server startup. |
kafkaResource | The name of the connection to a Kafka server in the runtime configuration (see Runtime Configuration). |
kafkaTopics.topic | Kafka topic name. |
Amazon SQS
<source class="com.ataccama.nme.engine.stream.SqsStreamSource">
    <connectionName>awssqsmdm</connectionName>
</source>
Connection properties:
Name | Description |
---|---|
connectionName | Name of the SQS connection. |
Salesforce
<source class="com.ataccama.nme.engine.stream.SalesforceStreamSource">
    <serverName>SFServer</serverName>
    <channelName>/data/AccountChangeEvent</channelName>
</source>
Connection properties:
Name | Description |
---|---|
serverName | Name of the configured Salesforce server (see Connect to a Server). |
channelName | Name of the Salesforce channel. |
Data Load Processor
The load processor is responsible for processing input records, that is, loading them into the hub. To load messages as record updates, DeltaLoad is used.
<processor class="com.ataccama.nme.engine.stream.DeltaLoad"> <!-- default: sourceDeletionStrategy="deactivate" -->
    <transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp" />
    <!-- optional -->
    <entities>
        <entity name="party">
            <columns>
                <column>src_type</column>
                <column>src_last_name</column>
            </columns>
        </entity>
        <entity name="address" />
        <!-- more entities -->
    </entities>
</processor>
Message Transformer
The message transformer is responsible for transforming messages (that is, the string stored in the message body) to input records. The input records are then passed to the data load processor.
Implementations
There are several types of message transformers.
Plan Transformer
PlanTransformer relies on a configured plan to read and parse messages and transform them into MDM-compatible input records.
<transformer class="com.ataccama.nme.dqc.stream.PlanTransformer" planFileName="../engine/stream/web_life.comp">
    <headers>
        <header header="JMSTimestamp" column="hdr_timestamp" />
    </headers>
</transformer>
The plan referenced in PlanTransformer must have:
- One Integration Input named "in" with one column "message" of type STRING and, optionally, other columns based on the header configuration.
- One Integration Output per instance entity, the same as in batch load plans.
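To make the shape of the plan input concrete, the following Python sketch builds one input row from a message body and its headers. It only illustrates the mapping described above: `build_plan_input` is a hypothetical helper, and the dictionary stands in for a row of the "in" Integration Input.

```python
def build_plan_input(body, headers, header_mapping):
    """Build one input row: the 'message' column plus mapped header columns.

    header_mapping maps a header name to the target column name,
    mirroring <header header="JMSTimestamp" column="hdr_timestamp" />.
    """
    record = {"message": body}
    for header_name, column in header_mapping.items():
        # A header that is absent from the message yields None in this sketch.
        record[column] = headers.get(header_name)
    return record
```

Headers that are not listed in the mapping, such as JMSType in a message that carries it, do not appear in the row.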
Process Delta Transformer
ProcessDeltaTransformer reuses the native ProcessDelta service (see Native Services) to transform the messages.
Two message formats are supported:
- XML RPC. Simple XML format. This format requires that source_timestamp is part of the message.
<transformer class="com.ataccama.nme.engine.stream.ProcessDeltaTransformer">
    <headers>
        <header column="source_timestamp" header="JMSTimestamp"/>
    </headers>
</transformer>
- SOAP. XML wrapped in a SOAP envelope. This format requires that the processDelta name is specified, as well as the list of elements to traverse from the root element to the processDelta element.
<transformer class="com.ataccama.nme.engine.stream.ProcessDeltaTransformer">
    <requestPath>
        <el>Envelope</el>
        <el>Body</el>
        <el>processDelta</el>
    </requestPath>
    <headers>
        <header column="source_timestamp" header="JMSTimestamp"/>
    </headers>
</transformer>
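The idea behind requestPath, a list of element names walked from the root down to the processDelta element, can be sketched in Python with the standard library. This is an illustration only: `find_by_path` and `local_name` are hypothetical helpers, and matching elements by local name (ignoring namespaces) is an assumption, not documented MDM behavior.

```python
import xml.etree.ElementTree as ET


def local_name(tag):
    """Strip a '{namespace}' prefix from an ElementTree tag."""
    return tag.rsplit("}", 1)[-1]


def find_by_path(xml_text, path):
    """Walk from the root element along path; return the element or None."""
    root = ET.fromstring(xml_text)
    if not path or local_name(root.tag) != path[0]:
        return None
    node = root
    for name in path[1:]:
        # Take the first child whose local name matches the next path step.
        node = next((c for c in node if local_name(c.tag) == name), None)
        if node is None:
            return None
    return node
```

For a SOAP message, `find_by_path(message, ["Envelope", "Body", "processDelta"])` returns the processDelta element whose content would then be transformed.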
JMS Headers
The following standard JMS properties are supported (and available as headers):
JMS header name | Data type | Description from JMS standard |
---|---|---|
JMSTimestamp | DATETIME | The JMSTimestamp header field contains the time a message was handed off to a provider to be sent. |
JMSMessageID | STRING | The JMSMessageID header field contains a value that uniquely identifies each message sent by a provider. |
JMSPriority | INTEGER | The JMS API defines ten levels of priority values, with 0 as the lowest priority and 9 as the highest. |
JMSCorrelationID | STRING | This method is used to return correlation ID values that are either provider-specific message IDs or application-specific string values. |
JMSType | STRING | Gets the message type identifier supplied by the client when the message was sent. |
Other custom JMS properties are generated under <headers> too.
Entities
Optional configuration. If no entities are specified, all entities are used for streaming. It is also possible to specify the columns that should be processed for each entity. In this case, all other columns will be ignored. If the column is listed and not included in a stream message, the column value in the hub is set to null.
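The effect of listing columns for an entity can be shown with a short Python sketch. It is a simplified model, not MDM code: `apply_column_filter` is a hypothetical helper, records are plain dictionaries, and `None` stands in for a null value in the hub.

```python
def apply_column_filter(message_record, listed_columns):
    """Keep only the listed columns; listed but missing columns become None."""
    return {column: message_record.get(column) for column in listed_columns}
```

For example, with src_type and src_last_name listed, a message containing only src_type and src_first_name results in src_type being loaded, src_last_name being set to null, and src_first_name being ignored.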
Error Handling Strategy
Configuring an error handling strategy gives you control over issues with loading data from the stream consumer. There are three parts to configuring the error handler:
- Reconnection Total Time - If the consumer fails to connect to the streaming source, it tries to reconnect at increasing intervals (first attempt after 1 second, then the interval doubles after every unsuccessful attempt). The maxReconnectTotalTime setting defines the total time in seconds after which the consumer stops reconnecting. The default value is 600 seconds.
- Threshold - Defines how many errors are allowed before the consumer stops. Defined using the windowSize and threshold settings. If windowSize is set to 20 and threshold to 2, the consumer stops when 2 of the last 20 consumed messages have failed. The window size acts as a sliding window over the most recently consumed messages.
- Error consumers - Destinations where failed messages are forwarded:
    - Log to file - Failed messages are written to a file. The file name can contain a datetime pattern.
    - Send to JMS or Send to Kafka - Failed messages are forwarded to the configured destination based on the stream consumer used.
The example below shows a configuration for each error consumer.
<consumer name="Web_Life_JMS_Stream">
    <!-- ... -->
    <errorHandler>
        <maxReconnectTotalTime>600</maxReconnectTotalTime> <!-- number of seconds of total time trying to reconnect - default is 600 = 10 minutes. -->
        <!-- how long history of message load result should be remembered -->
        <windowSize>100</windowSize>
        <!-- if number of messages failed to load (within the window) reaches this threshold, consumer will stop -->
        <threshold>2</threshold>
        <consumers>
            <consumer class="com.ataccama.nme.engine.stream.error.SendToJms">
                <connectionName>testQueue</connectionName>
                <destinationName>dynamicQueues/party.errorHandler.jms</destinationName>
            </consumer>
            <consumer class="com.ataccama.nme.engine.stream.error.LogToFile">
                <!-- logFile can contain datetime pattern enclosed in ${pattern}-->
                <logFile>jms-errors-${yyyy-MM-dd}.log</logFile>
            </consumer>
            <consumer class="com.ataccama.nme.engine.stream.error.SendToKafka">
                <topic>test-errors</topic>
                <kafkaResource>kafkaConnection1</kafkaResource>
            </consumer>
        </consumers>
    </errorHandler>
</consumer>
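The reconnection schedule and the sliding-window threshold can be modeled with a short Python sketch. Both parts are illustrations under assumptions rather than the MDM implementation: `reconnect_delays` and `FailureWindow` are hypothetical names, the sketch assumes reconnecting stops when the next wait would exceed the total time budget, and it assumes the consumer stops when the failure count in the window reaches the threshold.

```python
from collections import deque


def reconnect_delays(max_total_seconds=600):
    """Return the waits (in seconds) before each reconnect attempt.

    Starts at 1 second and doubles after every unsuccessful attempt.
    """
    delays, delay, elapsed = [], 1, 0
    while elapsed + delay <= max_total_seconds:
        delays.append(delay)
        elapsed += delay
        delay *= 2
    return delays


class FailureWindow:
    """Tracks the outcome of the most recently consumed messages."""

    def __init__(self, window_size=20, threshold=2):
        self._results = deque(maxlen=window_size)  # True means "failed"
        self.threshold = threshold

    def record(self, failed):
        """Record one outcome; return True if the consumer should stop."""
        self._results.append(failed)
        return sum(self._results) >= self.threshold
```

Under these assumptions, the default budget of 600 seconds allows nine attempts with waits of 1, 2, 4, up to 256 seconds (511 seconds in total), because a tenth wait of 512 seconds would exceed the budget. In the window, an old failure stops counting once enough newer messages have pushed it out.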
Other Practical Information
- Stream consumers can be started and stopped from the MDM Admin Center (see MDM Web App Admin Center).
- Stream consumers can also be started and stopped using the REST API (see REST API).
- Stream consumers are stopped on server startup. This can be changed with the runtime property nme.stream.consumers.active (default value is false).