Sunday, January 18, 2015

Testing WSO2 BAM 2.5.0 Kafka Input Event Adaptor

WSO2 BAM 2.5.0 now supports processing data streams based on the Kafka Event Adaptor.

Apache Kafka is a fast, scalable and distributed publish-subscribe messaging system.
It maintains topics which contain message feeds. These messages are written to topics by Producers and read by Consumers.
These topics are partitioned and replicated across multiple nodes, thereby making Kafka a distributed system.
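
To make the topic, partition, producer and consumer terms concrete, here is a toy in-memory sketch. It is not Kafka's API: the `ToyTopic` class and its methods are hypothetical names used only for illustration, and replication across nodes is left out entirely.

```python
class ToyTopic:
    """In-memory stand-in for a topic split into partitions (illustration only)."""

    def __init__(self, name, partitions=1):
        self.name = name
        # Each partition is an append-only list of messages.
        self.partitions = [[] for _ in range(partitions)]

    def produce(self, message, key=None):
        # Pick a partition from the key; unkeyed messages go to partition 0 here.
        index = hash(key) % len(self.partitions) if key is not None else 0
        self.partitions[index].append(message)
        return index

    def consume(self, partition, offset=0):
        # A consumer reads one partition, starting at an offset it tracks itself.
        return self.partitions[partition][offset:]


topic = ToyTopic("kafkaTestTopic1", partitions=1)
topic.produce('{"event": {}}')
print(topic.consume(partition=0))
```

With a single partition, as used later in this post, every message lands in partition 0 and is read back in the order it was written.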

Let's see how to configure a Kafka-based input adaptor in WSO2 BAM 2.5.0 and capture attributes from a message published to a Kafka topic.

Setting up Kafka:

Kafka can be downloaded from here

Once downloaded, extract the distribution as follows.
tar xvf kafka_2.10-

Navigate to the folder the archive was extracted to, as follows.

cd kafka_2.10-

Execute the following command to start the ZooKeeper server.
bin/ config/

Then open another console, navigate to the Kafka folder and execute the following command to start the Kafka server
bin/ config/

Now open another console, navigate to the Kafka folder and execute the following command to create a topic
 bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaTestTopic1

The topic name given here is 'kafkaTestTopic1'.

A producer needs to be started to send messages to the created topic.
Open another console, navigate to the Kafka folder and execute the following command.
bin/ --broker-list localhost:9092 --topic kafkaTestTopic1

Setting up WSO2 BAM 2.5.0:

Download and extract WSO2 BAM 2.5.0 from here

Copy the following .jar files from <Kafka_Home>/lib to <BAM_HOME>/repository/components/lib


Navigate to <BAM_HOME>/bin and start the server as follows.

Log in to the Management Console of BAM and navigate to Configure --> Event Processor Configs --> Input Event Adaptors

Click on 'Add Input Event Adaptor', specify the input adaptor details as follows and create an input adaptor.

Next, navigate to Main --> Event processor --> Event streams.
Click on 'Add Event Stream' and specify an event stream to capture the data required.
Specify the payload attributes and their types to be captured.

Click on 'Add Event Stream' and specify the option 'Custom Event Builder' in the event builder options that appear.

Specify the event builder configurations as follows and add an event builder through the pop-up window that appears.

When specifying the event builder configurations, you need to add the name of the topic to listen to.
Here I have added the topic created in the Kafka setup above, i.e. kafkaTestTopic1.
The input mapping type is specified as JSON.

Now a message needs to be sent to the topic.
Go back to the producer console started earlier and paste the following JSON string.

{"event": { "payloadData": {"kafkaAtt1": "4","kafkaAtt2": "TestString"}}}
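
If you would rather generate this message than type it by hand, the following is a minimal sketch in Python. The attribute names and values are the ones from the message above; the helper name `build_event` is hypothetical. The output is kept on a single line because the console producer treats each line of input as one message.

```python
import json

# Attribute names and values taken from the message shown above.
payload = {"kafkaAtt1": "4", "kafkaAtt2": "TestString"}


def build_event(payload_data):
    """Wrap payload attributes in the event/payloadData envelope used above."""
    return {"event": {"payloadData": payload_data}}


# Compact separators keep the whole message on one line for pasting.
message = json.dumps(build_event(payload), separators=(",", ":"))
print(message)
```

The printed line can be pasted directly into the producer console.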

Once this is done, the values of the attributes specified in kafkaEventStream will be captured, and an entry will be made for the stream kafkaEventStream in the Cassandra keyspace EVENT_KS.
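
As a rough illustration of which values end up in the stream entry, the sketch below pulls the payload attributes out of the JSON message. This is not how BAM implements its JSON input mapping; `extract_payload` is a hypothetical helper, useful mainly for sanity-checking a message before sending it.

```python
import json

# The message pasted into the producer console.
raw = '{"event": { "payloadData": {"kafkaAtt1": "4","kafkaAtt2": "TestString"}}}'


def extract_payload(message, expected=("kafkaAtt1", "kafkaAtt2")):
    """Return the expected payload attributes, raising if one is missing."""
    payload = json.loads(message)["event"]["payloadData"]
    missing = [name for name in expected if name not in payload]
    if missing:
        raise KeyError(f"missing payload attributes: {missing}")
    return {name: payload[name] for name in expected}


captured = extract_payload(raw)
print(captured)
```

If an attribute name in the message does not match the one defined in the event stream, a check like this fails early instead of leaving you wondering why nothing shows up in Cassandra.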

To view this navigate to Tools --> Cassandra Explorer --> Connect to Cluster
and specify the Cassandra connection details.

Once connected, you will be able to see kafkaEventStream under EVENT_KS.

Click on 'kafkaEventStream'.
The entry made for the captured data will be displayed.

Click on the 'View more' option.

A detailed version of the stream entry will be displayed.
You can see the captured attributes and their values.