
Kafka_integration

KNIME Integration with Kafka

This workflow receives records from Kafka, performs operations on them, and publishes the results back to Kafka.

The workflow consumes transactions in JSON format from the Kafka topic "transactions". For the customer ID in each record, it fetches additional customer information from an existing table and appends it to the record; it does the same for each product number. Once all the information is available in a single table, the workflow aggregates the data to determine, for each customer, all the products bought and the total amount spent. This aggregated information is published back to Kafka on the topic "customer_transactions".
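The enrichment and aggregation steps described above can be sketched in plain Python. This is not the workflow itself, which uses KNIME nodes such as Java Snippet, GroupBy, and Table to JSON; it is a minimal illustration under stated assumptions: the customer and product tables are stood in for by dictionaries keyed on CustomerID and ProductNr, the product table is assumed to carry a Price column, and the output field names Products and TotalAmount are hypothetical. The second order number and product number in the demo data are made up.

```python
import json
from collections import defaultdict


def enrich(message, customers, products):
    """Parse one JSON transaction and append customer and product columns.

    Unknown IDs leave the record unchanged (left-join behavior).
    """
    record = json.loads(message)
    record.update(customers.get(record["CustomerID"], {}))
    record.update(products.get(record["ProductNr"], {}))
    return record


def aggregate(records):
    """Group by CustomerID: collect the products bought and sum their prices."""
    grouped = defaultdict(lambda: {"Products": [], "TotalAmount": 0.0})
    for record in records:
        group = grouped[record["CustomerID"]]
        group["Products"].append(record["ProductNr"])
        # "Price" is an assumed column of the product table; missing counts as 0.
        group["TotalAmount"] += record.get("Price", 0.0)
    return [
        {"CustomerID": cid,
         "Products": group["Products"],
         "TotalAmount": round(group["TotalAmount"], 2)}
        for cid, group in grouped.items()
    ]


def to_messages(rows):
    """Serialize each aggregated row as one JSON string for the output topic."""
    return [json.dumps(row) for row in rows]


if __name__ == "__main__":
    # Hypothetical lookup data standing in for the customer and product tables.
    customers = {"69-695-442-229": {"CustomerName": "Example Customer"}}
    products = {"I-163-2017": {"Price": 10.0}, "I-200-2017": {"Price": 5.5}}
    incoming = [
        '{"OrderNumber": 23893756, "Date": "8-28-2015", '
        '"CustomerID": "69-695-442-229", "ProductNr": "I-163-2017"}',
        '{"OrderNumber": 23893757, "Date": "8-28-2015", '
        '"CustomerID": "69-695-442-229", "ProductNr": "I-200-2017"}',
    ]
    enriched = [enrich(m, customers, products) for m in incoming]
    for message in to_messages(aggregate(enriched)):
        print(message)
    # Expected output:
    # {"CustomerID": "69-695-442-229", "Products": ["I-163-2017", "I-200-2017"], "TotalAmount": 15.5}
```

The lookups use left-join behavior so that a record with an unknown ID still appears in its customer's product list; dropping such records instead would mimic an inner join.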

To run this workflow:
1. Start a Kafka cluster on your local system.
2. Create a topic "transactions" and publish JSON records of the following form to it:
   {"OrderNumber": 23893756, "Date": "8-28-2015", "CustomerID": "69-695-442-229", "ProductNr": "I-163-2017"}
3. Open the configuration of the Kafka Consumer node, edit its stop criteria, and change the timestamp value.
4. Run the workflow.
5. When the workflow has finished, run a Kafka consumer on your local system that reads from the topic "customer_transactions". The processed records with the aggregated information should be available on this new topic.
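For step 2, test data of the shape shown above could be produced with a small generator script. This is a sketch under assumptions: apart from the example record's values, the customer IDs and product numbers are hypothetical values in the same format, every record reuses the example's date, and the script only prints records, leaving publication to whatever Kafka client you use.

```python
import json
import random
from datetime import date

# Hypothetical ID pools. Only the first entry of each list comes from the
# example record; the others merely copy its format.
CUSTOMER_IDS = ["69-695-442-229", "12-345-678-901", "98-765-432-109"]
PRODUCT_NRS = ["I-163-2017", "I-164-2017", "I-165-2017"]


def make_record(order_number, day, customer_id, product_nr):
    """Build one transaction with the four fields of the example record."""
    return {
        "OrderNumber": order_number,
        # Month-day-year without zero padding, as in "8-28-2015".
        "Date": f"{day.month}-{day.day}-{day.year}",
        "CustomerID": customer_id,
        "ProductNr": product_nr,
    }


def generate(count, seed=0, start_order=23893756, day=date(2015, 8, 28)):
    """Yield `count` records with consecutive order numbers.

    A fixed seed makes the output reproducible; with more records than
    customer IDs, some customers repeat, giving the aggregation something
    to group.
    """
    rng = random.Random(seed)
    for i in range(count):
        yield make_record(start_order + i, day,
                          rng.choice(CUSTOMER_IDS), rng.choice(PRODUCT_NRS))


if __name__ == "__main__":
    # One JSON object per line, the input format a console producer reads.
    for record in generate(10):
        print(json.dumps(record))
```

Assuming the script is saved under a hypothetical name such as generate_transactions.py and Kafka runs a single local broker on localhost:9092, its output could be piped into the console producer shipped with Kafka: `python generate_transactions.py | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic transactions` (older Kafka releases take `--broker-list` instead of `--bootstrap-server`). For step 5, `bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer_transactions --from-beginning` reads the results.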

Workflow diagram, in three annotated sections: "Consume transaction records from Kafka", "Append customer and product information to each record", and "Perform necessary aggregation and post the result back to Kafka". Node comments: Connect to Kafka cluster; Customer information; Product information; Consume records from "transactions" topic; Parse JSON records; Filter relevant columns; Add customer information; Add product information; For each customer, collect all the products bought and total price; Publish records back to Kafka. Nodes: Kafka Connector (2), Table Reader, SQLite Connector, DB Table Selector, DB Reader, Kafka Consumer, Java Snippet, Column Filter (2), GroupBy, Table to JSON, Kafka Producer.
