Kafka Producers and Consumers (Console / Java) using SASL_SSL

Intro

  • Producers send messages to Kafka topics; consumers receive messages from them
  • SASL provides authentication, and SSL provides encryption on the wire
  • As part of SASL, a JAAS config file tells the client how to pick up the Kerberos ticket and authenticate
  • Kafka version used in this article: 0.9.0 (HDP build 0.9.0.2.4.2.12-1, per the pom.xml below)

Console Producers and Consumers

Follow the steps given below

Edit kafka_client_jaas.conf file (under /usr/hdp/current/kafka-broker/conf)

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=false
    serviceName="zookeeper";
};

Edit kafka-env.sh file (under /usr/hdp/current/kafka-broker/conf)

KAFKA_KERBEROS_PARAMS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf"
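Alternatively, for a one-off console session you can skip editing kafka-env.sh and export the same property through KAFKA_OPTS, which kafka-run-class.sh picks up (this alternative is my addition, not part of the original steps):

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf"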

Create a truststore, add Root CA

The truststore must contain your organization's root CA certificate

keytool -keystore client.truststore.jks -alias CARoot -import -file <certName>.cer
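To confirm the CA was imported, list the truststore contents (keytool will prompt for the truststore password):

keytool -list -v -keystore client.truststore.jks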

Edit producer.properties file

# used by the new producer
bootstrap.servers=<kafkabrokerserver1>:6667,<kafkabrokerserver2>:6667...
# used by the old producer
metadata.broker.list=<kafkabrokerserver1>:6667,<kafkabrokerserver2>:6667...
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/etc/security/ssl/client.truststore.jks
ssl.truststore.password=<password>

Edit consumer.properties

ssl.truststore.location=/etc/security/ssl/client.truststore.jks
ssl.truststore.password=<password>
security.protocol=SASL_SSL

Create a kerberos ticket

kinit user@DOMAIN.COM
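klist should now show a valid ticket-granting ticket for the user:

klist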

Execute Producer

/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <kafkabrokerserver1>:6667 --topic <topicName> --security-protocol SASL_SSL --producer.config /usr/hdp/current/kafka-broker/conf/producer.properties --new-producer

Execute Consumer

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <kafkabrokerserver1>:6667 --topic <topicName> --from-beginning --security-protocol SASL_SSL --consumer.config /usr/hdp/current/kafka-broker/conf/consumer.properties --new-consumer

Messages typed into the producer console should appear in the consumer console
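For example (an illustrative session; prompts, offsets and log lines will differ):

# producer terminal
> hello kafka
> secured with SASL_SSL

# consumer terminal
hello kafka
secured with SASL_SSL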


Using the Java API

KafkaProducerNew.java

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafkabrokerserver1>:6667,...");
// both key and value are Strings, so use StringSerializer for both
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put("security.protocol", "SASL_SSL");
prop.put("ssl.truststore.location", "/etc/security/ssl/client.truststore.jks");
prop.put("ssl.truststore.password", "<password>");

Producer<String, String> producer = new KafkaProducer<String, String>(prop);
String topicName = "<topicName>";
String message = "this is a message";
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, message);
producer.send(rec);

producer.close();
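The send() above is asynchronous and fire-and-forget. If you want confirmation that the broker accepted the record, the 0.9 producer API also accepts a callback; a minimal sketch (the error handling here is illustrative):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

producer.send(rec, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();   // delivery failed (e.g. SSL/Kerberos or broker error)
        } else {
            System.out.println("sent to partition " + metadata.partition()
                    + ", offset " + metadata.offset());
        }
    }
});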

KafkaConsumerNew.java

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

private KafkaConsumer<String, String> kafkaConsumer;
private String topicName = "<topicName>";
private String groupId = "<groupId>";

Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafkabrokerserver1>:6667,...");
// deserializers must match the consumer's <String, String> type parameters
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
prop.put("security.protocol", "SASL_SSL");
prop.put("ssl.truststore.location", "/etc/security/ssl/client.truststore.jks");
prop.put("ssl.truststore.password", "<password>");

kafkaConsumer = new KafkaConsumer<String, String>(prop);
kafkaConsumer.subscribe(Arrays.asList(topicName));

try {
    while (true) {
        ConsumerRecords<String, String> recs = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> rec : recs)
            System.out.println(rec.value());
    }
} catch (WakeupException ex) {
    System.err.println(ex.getMessage());
} finally {
    kafkaConsumer.close();
}
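The WakeupException branch above only fires if another thread calls wakeup() on the consumer; wakeup() is the supported way to break a consumer out of poll(). A minimal sketch using a JVM shutdown hook so the loop exits cleanly on Ctrl-C (the hook itself is my addition, not part of the original article):

final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        kafkaConsumer.wakeup();      // causes poll() to throw WakeupException
        try {
            mainThread.join();       // wait for the finally block to close the consumer
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});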

Dependencies (pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId></groupId>
  <artifactId>KafkaProducer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.2.4.2.12-1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.2.4.2.12-1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.9</version>
    </dependency>
  </dependencies>
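  <!-- Assumption, not in the original post: the HDP-versioned artifacts above
       (0.9.0.2.4.2.12-1) are not on Maven Central and should resolve from the
       Hortonworks public repository; verify the URL for your HDP release -->
  <repositories>
    <repository>
      <id>hortonworks</id>
      <url>http://repo.hortonworks.com/content/repositories/releases/</url>
    </repository>
  </repositories>

  <!-- Assumption: the <uberJar> referenced in the run commands below can be
       built with the maven-shade-plugin, e.g. via "mvn package" -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>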

Execute Producer

java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp <uberJar> <ProducerClassName> <topicName>

Execute Consumer

java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp <uberJar> <ConsumerClassName> <topicName>
