Configuring Stream Consumers

The streaming interface lets MDM consume messages from streaming platforms and transform them into input records for MDM processing. See Features for more information and Streaming Interface for reference documentation on each setting. Integrations are currently implemented for JMS, Kafka, and Salesforce.

You can define several stream consumers, each with its own batching settings and its own list of processed entities.

General stream consumer configuration

  1. Open the MDM project in ONE Desktop.

  2. Right-click Streaming and select New Consumer.

    Adding new consumer
  3. Select the stream source type: JMS, Kafka, or Salesforce.

  4. Fill in a unique Name and provide an optional Description.

  5. Select the Enable option.

  6. Specify the connection details:

    • For JMS, under Stream Source:

      1. In Connection Name, provide the name of a previously defined JMS connection (in the File Explorer perspective under Servers, see Connect to a Server).

      2. In the Input JMS Source field, provide the name of the JMS queue from which messages will be read.

    • For Kafka:

      1. In Resource Name, provide the name of a previously defined Kafka server connection (in the File Explorer perspective under Servers, see Connect to a Server).

      2. In Consumed Topic, provide the name of the Kafka topic from which records will be read.

      3. Specify group.id.

      4. Select Input Format:

        • String - Default string format.

        • Avro - Avro JSON format.

      5. In Properties, specify any other necessary Kafka properties.

        For SSL, add the following properties:

        Key                        Value
        security.protocol          SSL
        ssl.keystore.location      Path to the keystore JKS file.
        ssl.keystore.password      Password for the keystore.
        ssl.key.password           Password for the key.
        ssl.truststore.location    Path to the truststore JKS file.
        ssl.truststore.password    Password for the truststore.

        It is also necessary to add the following property for MDM to consume Kafka messages:

        Key                   Value
        value.deserializer    org.apache.kafka.common.serialization.StringDeserializer

    • For Salesforce:

      1. In Server Name, provide the name of a previously defined Salesforce server connection (in the File Explorer perspective under Servers, see Connect to a Server).

      2. In Channel Name, provide the name of the Salesforce channel from which messages will be read.

  7. For JMS, you can optionally filter incoming messages using Message Selector. The filter is a string whose syntax resembles an SQL WHERE clause, for example, region = 'EU' (where region is an illustrative message property).

  8. Under Message Batching, set the thresholds.

  9. Under Processed Entities:

    1. Specify the entities that should be processed by this stream consumer by doing one of the following:

      • Select All Entities.

      • List the entities that should be parsed from messages by the message transformer.

    2. If required, limit the columns affected by this configuration for all necessary entities.

  10. Under Message Transformer:

    1. Under Transformer Type, add one transformer. For additional configuration of each transformer type, see Configure process delta transformer and Configure plan transformer.

    2. (Optional, JMS only) Under JMS Headers, specify the headers used and map them to input columns used in the plan.

    3. (Optional, JMS only) Under JMS Properties, specify custom JMS properties (including their type) and map them to input columns used in the plan.

  11. Switch to the Error Handler tab to select an error handler or select OK to save your configuration and exit the dialog.

    JMS stream source general settings tab
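
The Kafka SSL properties listed in step 6 can be easier to review when collected in one place before you enter them in the Properties table. The following is a minimal Python sketch, not part of MDM or ONE Desktop: the function names and all paths and passwords are hypothetical, and only the property keys and the deserializer class name come from the tables above.

```python
# Sketch only: collect the key-value pairs that go into the consumer's
# Properties table and check that no SSL property was left empty.
# Function names, paths, and passwords are hypothetical placeholders.

REQUIRED_SSL_KEYS = (
    "security.protocol",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
    "ssl.truststore.location",
    "ssl.truststore.password",
)


def build_kafka_properties(keystore_path, keystore_password, key_password,
                           truststore_path, truststore_password):
    """Return the Kafka properties described in step 6 as a dictionary."""
    return {
        "security.protocol": "SSL",
        "ssl.keystore.location": keystore_path,
        "ssl.keystore.password": keystore_password,
        "ssl.key.password": key_password,
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
        # Listed above as required for MDM to consume Kafka messages.
        "value.deserializer":
            "org.apache.kafka.common.serialization.StringDeserializer",
    }


def missing_ssl_keys(properties):
    """Return the SSL keys that are absent or have an empty value."""
    return [key for key in REQUIRED_SSL_KEYS if not properties.get(key)]


if __name__ == "__main__":
    props = build_kafka_properties(
        "/path/to/keystore.jks", "keystore-secret", "key-secret",
        "/path/to/truststore.jks", "truststore-secret",
    )
    for key, value in props.items():
        print(f"{key} = {value}")
    print("Missing SSL keys:", missing_ssl_keys(props))
```

Each printed pair corresponds to one row (Key, Value) in the Properties table. The group.id value is entered in its own field and is therefore not included here.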

Configure an error handling strategy

The error handling strategy defines how the stream consumer deals with messages that fail to load.

  1. Select the Error Handler tab in your General Stream Consumer Configuration.

  2. To activate this function, select Enable.

    JMS stream source error handler setting tab

There are two parts to configuring the error handler: threshold and error handler consumer.

  1. Window Size and Threshold together define how many failed messages are allowed before the consumer stops. For example, if Window Size is set to 20 and Threshold to 2, the consumer stops when 2 of the last 20 consumed messages have failed. The window slides, so only the 20 most recently consumed messages are taken into account.

    Message Failures

    • A batch of messages is processed by the transformer to produce input records which are then loaded into MDM. In MDM the predefined plan is followed (change detection, cleansing) until the records are committed.

    • If this process fails, the initial message batch load is split, and the smaller batches are processed serially.

      If the process continues to fail, this splitting is repeated until each message is processed individually. This is done to determine which exact message is responsible for the failure and to process as many messages as possible.

    • A single-message batch that still fails is counted as a failed message for the purposes of the error handling strategy, that is, when determining whether the failure threshold has been exceeded within the given window.

  2. Error Handler Consumers define the destinations where the failed messages are forwarded. To configure, select Add next to the Error Handler Consumers table. There are three available error consumers:

    1. Log to File - Failed messages are forwarded to a file similar to a log. To configure this error consumer:

      1. Double-click the row number of the newly created error handler consumer.

      2. Enter the Name and the log file path of the consumer. The log file name can include a date pattern, for example, ../etc/storage/jms-errors-${yyyy-MM-dd}.log.

      3. Select Apply and then OK.

    2. Send to JMS - Failed messages are forwarded to the configured destination of the JMS consumer. To configure this error consumer:

      1. Double-click the row number of the newly created error handler consumer.

      2. Enter the Name.

      3. Press Ctrl+Space to select a Connection Name.

      4. Enter the Destination Name, that is, the name of the target topic or queue.

      5. Select Apply and then OK.

    3. Send to Kafka - Failed messages are forwarded to the configured destination of the Kafka consumer. To configure this error consumer:

      1. Double-click the row number of the newly created error handler consumer.

      2. Enter the Name.

      3. Enter the Topic.

      4. Press Ctrl+Space to select a Resource Name.

      5. Select Apply and then OK.
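
The interplay between batch splitting and the sliding failure window described above can be sketched in Python. This is an illustration under stated assumptions, not the MDM implementation: the names FailureWindow and process_with_splitting are hypothetical, the batch is assumed to be split in halves (the documentation only says it is split into smaller batches), and a batch load is assumed to either succeed or fail as a whole.

```python
from collections import deque


class FailureWindow:
    """Tracks the outcome of the most recently consumed messages."""

    def __init__(self, window_size, threshold):
        # deque(maxlen=...) drops the oldest outcome automatically,
        # which gives the sliding-window behavior.
        self.outcomes = deque(maxlen=window_size)
        self.threshold = threshold

    def record(self, failed):
        self.outcomes.append(bool(failed))

    def should_stop(self):
        # Stop once the number of failures inside the window
        # reaches the threshold.
        return sum(self.outcomes) >= self.threshold


def process_with_splitting(batch, load, window, failed_out):
    """Load a batch; on failure, split it until single messages are isolated.

    `load` is a hypothetical callable that raises an exception when the
    batch cannot be processed. Halving is an assumption of this sketch.
    """
    if not batch:
        return
    try:
        load(batch)
    except Exception:
        if len(batch) == 1:
            # A failing single-message batch counts as one failed message
            # and is handed over to the error handler consumers.
            window.record(True)
            failed_out.append(batch[0])
            return
        middle = len(batch) // 2
        process_with_splitting(batch[:middle], load, window, failed_out)
        process_with_splitting(batch[middle:], load, window, failed_out)
    else:
        for _ in batch:
            window.record(False)


if __name__ == "__main__":
    def load(batch):
        if "bad" in batch:
            raise ValueError("batch could not be loaded")

    window = FailureWindow(window_size=20, threshold=2)
    failed = []
    process_with_splitting(["a", "bad", "b", "c"], load, window, failed)
    print("Failed messages:", failed)
    print("Consumer should stop:", window.should_stop())
```

In this run, one message out of four fails, so the failure count stays below the threshold of 2 and the consumer keeps running. A second failure within the last 20 messages would make should_stop() return True.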

Configure process delta transformer

If you selected Process Delta Transformer as your transformer type:

  1. Double-click the number next to the implementation.

  2. Under Format, select the message format.

  3. If you selected SOAP under Format, specify one of the existing Process Delta services (defined under Services in the model project). If you selected XmlRpc, leave the field empty.

Configure plan transformer

If you selected Plan Transformer as your transformer type, you need to generate and configure the plan that will transform JMS messages.

  1. Right-click the configured consumer and select Generate.

  2. Open the generated plan under Files > engine > stream. The file name is the same as the consumer name.

  3. Configure the plan according to your needs.

Example plan

In this example, the messages coming from the JMS queue contain data in the JSON format, so the Json Parser step is used to transform the messages.

Open the built-in General MDM project - CDI example to click through the plan for details. See MDM Example Project for detailed instructions about opening the plan.
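
To give a rough idea of what such a transformation does, the following Python sketch turns one JSON message into per-entity input records. It is only an analogy for the Json Parser step, not the plan itself: the message layout, the entity names (party, address), and the column names are hypothetical, and the function message_to_records does not exist in MDM.

```python
import json


def message_to_records(message_text):
    """Parse one JSON message into a list of (entity, record) pairs.

    Assumes a hypothetical layout in which each top-level key is an
    entity name mapped to a list of flat records.
    """
    payload = json.loads(message_text)
    records = []
    for entity, rows in payload.items():
        for row in rows:
            records.append((entity, dict(row)))
    return records


if __name__ == "__main__":
    # Hypothetical message body as it might arrive from the JMS queue.
    message = """
    {
      "party":   [{"source_id": "1", "name": "Ann Example"}],
      "address": [{"source_id": "10", "party_id": "1", "city": "Prague"}]
    }
    """
    for entity, record in message_to_records(message):
        print(entity, record)
```

In the actual plan, the equivalent mapping is configured in the Json Parser step, with one output per processed entity.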
