Tuesday, 22 June 2021

Usage of Kafka Clients: Write a kafka producer that write messages to a kafka topic

Lets say we have created a kafka topic using a command line :

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic twitter-topic --create 

--partitions 3      



Verify the Kafka topic details - by listing in the console:


kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic twitter-topic --describe         


Topic: twitter-topic PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824

Topic: twitter-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Topic: twitter-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0

Topic: twitter-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0  



Now you have a kafka topic - Write a Kafka Producer using Kafka clients dependency:



Dependencies (if in gradle):


dependencies {
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.0'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}

Write a Kafka producer in a normal java class inside main method:
----------------------------------------------------------------------
package com.basic.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithoutKeyDemo {
public static void main(String[] args) {
//Add REQUIRED properties - set up kafka config
Properties properties = new Properties();
//If you don't set the following properties
        //possible exception: org.apache.kafka.common.config.ConfigException
        //If don't not set: exception: No resolvable bootstrap urls given in bootstrap.servers
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//If don't not set: exception:
        //Missing required configuration "key.serializer" which has no default value.
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                            StringSerializer.class.getName());
//If don't not set: exception:
        //Missing required configuration "value.serializer" which has no default value.
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                            StringSerializer.class.getName());
        //create a kafka producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        //send data to kafka topic
        String message = "Hey man";
ProducerRecord<String, String> producerRecord
                    = new ProducerRecord<>("twitter-topic", message);
//send needs a ProducerRecord type - defined without a key.
//without key means - the messages can go to any partition in twitter-topic
producer.send(producerRecord);

//Very important note:
//If you don't flush - message will not go to topic
producer.flush();
//Instead of the above- you flush & close in one step by using .close() method
producer.close();
}
}
----------------------------------------------------------------------
You can test the producer by executing a console consumer on terminal:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic twitter-topic 

No comments:

Post a Comment