Data Stream Using Apache Kafka and Camel Application

Apache Kafka is an event streaming platform that was originally developed at LinkedIn and later open-sourced and donated to the Apache Software Foundation. Its primary function is to handle high-volume real-time data streams and provide a scalable and fault-tolerant architecture for creating data pipelines, streaming applications, and microservices.

Kafka employs a publish-subscribe messaging model: data is organized into topics, publishers (producers) send messages to those topics, and subscribers (consumers) receive those messages in near real time. The platform scales by dividing each topic into partitions that are spread across the brokers of a cluster, and it tolerates faults by replicating each partition to several brokers. Data therefore remains available when a broker fails, provided at least one up-to-date replica of each affected partition survives.
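
To make the idea of spreading data across nodes more concrete: Kafka divides a topic into partitions, and records that share a key are routed to the same partition. The following is a minimal sketch of that principle only. The class name `PartitionSketch` and the use of `String.hashCode()` are assumptions made for illustration; Kafka's built-in partitioner hashes the serialized key bytes with murmur2 instead.

```java
import java.util.List;

public class PartitionSketch {

    // Map a record key to a partition index in the range [0, numPartitions).
    // Illustration only: Kafka hashes the serialized key with murmur2, not String.hashCode().
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // The same key always maps to the same partition, which preserves per-key ordering.
        for (String key : List.of("order-1", "order-2", "order-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping depends on the partition count, changing the number of partitions of an existing topic changes where new records for a given key land.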

Kafka’s architecture is based on several essential components, including brokers, producers, consumers, and topics. Brokers store the messages of each topic in durable, append-only logs and serve them to clients, while producers and consumers are responsible for publishing and subscribing to Kafka topics, respectively. Topics function as the communication channels through which messages are sent and received.
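
The interplay of these components can be pictured with a toy in-memory model. The class `TopicLogSketch` below is hypothetical and is not Kafka code: it models a single topic partition as an append-only list in which the list index plays the role of the offset. A real broker persists the log to disk and replicates it, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicLogSketch {

    // A topic partition modeled as an append-only list; the index acts as the offset.
    private final List<String> log = new ArrayList<>();

    // Producer side: append a message and return the offset it was stored at.
    public long publish(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Consumer side: read everything from the given offset onward without removing it.
    public List<String> poll(long fromOffset) {
        return new ArrayList<>(log.subList((int) fromOffset, log.size()));
    }

    public static void main(String[] args) {
        TopicLogSketch topic = new TopicLogSketch();
        topic.publish("a");
        topic.publish("b");

        // Each subscriber tracks its own offset, so reading does not consume the message.
        System.out.println(topic.poll(0)); // prints [a, b]
        System.out.println(topic.poll(1)); // prints [b]
    }
}
```

This is what distinguishes the model from a classic queue: consumers in different consumer groups each keep their own position in the log and can all read the same messages.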

Kafka also provides an extensive range of APIs and tools to manage data streams and build real-time applications. Kafka Connect, one of the most widely used, enables the creation of data pipelines that integrate Kafka with other systems. Kafka Streams, on the other hand, is a high-level library for building stream-processing applications.

In summary, Kafka is a robust and adaptable platform that can be used to construct real-time data pipelines and streaming applications. It has been widely adopted in various sectors, including finance, healthcare, e-commerce, and more.

To create a Kafka data stream using Camel, you can use the Camel Kafka component, which is part of the Apache Camel project and is added to your application as the camel-kafka dependency. Below are the steps to follow for creating a Kafka data stream using Camel:

  1. Prepare a Kafka broker and create a topic for the data stream.
  2. Set up a new Camel project on your IDE and include the required Camel dependencies, including the Camel-Kafka component.
  3. Create a new Camel route within your project that defines the data stream. The route should use the Kafka component and specify the topic to send data to or receive data from.
  4. Select the appropriate data format for the data stream. For instance, if you want to send JSON data, use the Jackson data format to serialize and deserialize the data.
  5. Launch the Camel context and the Kafka producer or consumer to start sending or receiving data.

Overall, using the Camel-Kafka component with Apache Camel is a simple way to create data streams between applications and a Kafka cluster.
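
Step 3 largely comes down to writing a Camel endpoint URI of the form `kafka:<topic>?<options>`. As a small sketch, the hypothetical helper `kafkaUri` below (it is not part of Camel) assembles such a URI from a topic name and a map of options. `brokers` and `groupId` are camel-kafka endpoint options; the values shown are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class KafkaUriSketch {

    // Hypothetical helper (not part of Camel): builds "kafka:<topic>?k1=v1&k2=v2".
    public static String kafkaUri(String topic, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return query.isEmpty() ? "kafka:" + topic : "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("brokers", "localhost:9092");
        options.put("groupId", "my-group"); // placeholder consumer group name
        System.out.println(kafkaUri("my-topic", options));
        // prints kafka:my-topic?brokers=localhost:9092&groupId=my-group
    }
}
```

The helper does no URL encoding, so option values containing characters such as `&` or `+` would need to be encoded before they are passed in.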

Here is the code for reading a table from the database and writing its rows to the Kafka cluster (Apache Camel producer application):

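import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component
public class OracleDBToKafkaRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // Configure Oracle DB endpoint. The camel-jdbc URI names a DataSource bean
        // ("jdbc:<beanName>"), not a JDBC URL. A bean named "dataSource" is assumed here,
        // configured elsewhere with the connection details, e.g.
        // jdbc:oracle:thin:@localhost:1521:orcl, user "username", password "password".
        String oracleDBEndpoint = "jdbc:dataSource";
        String oracleDBTable = "mytable";
        String selectQuery = "SELECT * FROM " + oracleDBTable;

        // Configure Kafka endpoint. Serializers are endpoint options
        // (named keySerializer/valueSerializer in Camel 3.x and later).
        String kafkaSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092"
                + "&keySerializer=" + kafkaSerializer
                + "&valueSerializer=" + kafkaSerializer;

        from("timer:oracleDBPoller?period=5000")

            // Read from Oracle DB: the SQL must be in the body before the JDBC endpoint is called
            .setBody(constant(selectQuery))
            .to(oracleDBEndpoint)
            // The result is a List of Maps (one per row); emit one message per row
            .split(body())

                // Serialize to Kafka: use the ID column as the record key
                // (Oracle reports unquoted column names in upper case)
                .setHeader(KafkaConstants.KEY, simple("${body[ID]}", String.class))
                // Marshal the row to a JSON string (requires the camel-jackson dependency)
                .marshal().json(JsonLibrary.Jackson)
                .convertBodyTo(String.class)
                .to(kafkaEndpoint);
    }
}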

Here is the code for reading a Kafka topic and writing to the Oracle DB table (Apache Camel consumer application):

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class KafkaToOracleDBRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // Configure Kafka endpoint. The deserializer is an endpoint option
        // (named valueDeserializer in Camel 3.x and later), not a Camel data format.
        // "oracle-writer" is a placeholder consumer group name.
        String kafkaDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092"
                + "&groupId=oracle-writer"
                + "&valueDeserializer=" + kafkaDeserializer;

        // Configure Oracle DB endpoint. The camel-jdbc URI names a DataSource bean;
        // a bean named "dataSource" is assumed, configured elsewhere with
        // jdbc:oracle:thin:@localhost:1521:orcl, user "username", password "password".
        String oracleDBEndpoint = "jdbc:dataSource";
        String oracleDBTable = "mytable";

        from(kafkaEndpoint)

            // The record value arrives already deserialized; make sure it is a String
            .convertBodyTo(String.class)
            // One message per line. Each line is assumed to be a comma-separated list
            // of SQL literals in the table's column order.
            .split(body().tokenize("\n"))

                // Write to Oracle DB: build the INSERT first, then call the JDBC endpoint once.
                // Interpolating the body into SQL is open to SQL injection; use only with trusted data.
                .setBody(simple("INSERT INTO " + oracleDBTable + " VALUES(${body})"))
                .to(oracleDBEndpoint);
    }
}
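
Building the INSERT statement by pasting message content into the SQL text is fragile when values contain quotes, and it is unsafe for untrusted data. One possible alternative, sketched here with plain JDBC rather than Camel, binds each value as a statement parameter. The class `SafeInsertSketch` and its methods are hypothetical, and the sketch assumes each line is a simple comma-separated list with no quoted commas.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;

public class SafeInsertSketch {

    // Builds e.g. "INSERT INTO mytable VALUES (?, ?, ?)" for a row with three columns.
    // The table name is still concatenated, so it must come from trusted configuration.
    public static String insertSql(String table, int columnCount) {
        String placeholders = String.join(", ", Collections.nCopies(columnCount, "?"));
        return "INSERT INTO " + table + " VALUES (" + placeholders + ")";
    }

    // Binds each comma-separated value as a parameter instead of pasting it into the SQL text.
    public static void insertRow(Connection connection, String table, String csvLine)
            throws SQLException {
        String[] values = csvLine.split(",", -1); // naive split: no support for quoted commas
        try (PreparedStatement statement =
                connection.prepareStatement(insertSql(table, values.length))) {
            for (int i = 0; i < values.length; i++) {
                statement.setString(i + 1, values[i].trim()); // JDBC parameters are 1-based
            }
            statement.executeUpdate();
        }
    }

    public static void main(String[] args) {
        System.out.println(insertSql("mytable", 3)); // prints INSERT INTO mytable VALUES (?, ?, ?)
    }
}
```

All values are bound as strings here, which relies on the database converting them to the column types; typed setters such as `setInt` or `setTimestamp` would be the stricter choice when the schema is known.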

