Kafka Producers and Consumers (Console / Java) using SASL_SSL

Intro

  • Producers send messages to Kafka; consumers receive messages from it
  • SASL provides authentication and SSL provides encryption
  • As part of SASL, JAAS config files tell the client how to read the Kerberos ticket and authenticate
  • Kafka version used in this article: 0.9.0.2

Console Producers and Consumers

Follow the steps below:

Edit kafka_client_jaas.conf file (under /usr/hdp/current/kafka-broker/conf)

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};

Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=false
   serviceName="zk";
};

Edit kafka-env.sh file (under /usr/hdp/current/kafka-broker/conf)

KAFKA_KERBEROS_PARAMS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf"

Create a truststore and add the root CA

The truststore must contain the organization's root CA:

keytool -keystore client.truststore.jks -alias CARoot -import -file <certName>.cer
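
To sanity-check the import, here is a minimal Java sketch that loads the truststore and looks for the alias (the path and password are the same placeholders used in producer.properties below; keytool stores aliases lowercased):

import java.io.FileInputStream;
import java.security.KeyStore;

// load the truststore created with keytool above (throws on a wrong password or corrupt file)
KeyStore ts = KeyStore.getInstance("JKS");
try (FileInputStream in = new FileInputStream("/etc/security/ssl/client.truststore.jks")) {
    ts.load(in, "<password>".toCharArray());
}
System.out.println(ts.containsAlias("caroot"));   // true if the root CA was imported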

Edit producer.properties file

bootstrap.servers=<kafkabrokerserver1>:6667,<kafkabrokerserver2>:6667...
metadata.broker.list=<kafkabrokerserver1>:6667,<kafkabrokerserver2>:6667...
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/etc/security/ssl/client.truststore.jks
ssl.truststore.password=<password>

Edit consumer.properties

ssl.truststore.location=/etc/security/ssl/client.truststore.jks
ssl.truststore.password=<password>
security.protocol=SASL_SSL

Create a Kerberos ticket

kinit user@DOMAIN.COM

Execute Producer

/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <kafkabrokerserver1>:6667 --topic <topicName> --security-protocol SASL_SSL --producer.config /usr/hdp/current/kafka-broker/conf/producer.properties --new-producer

Execute Consumer

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <kafkabrokerserver1>:6667 --topic <topicName> --from-beginning --security-protocol SASL_SSL --consumer.config /usr/hdp/current/kafka-broker/conf/consumer.properties --new-consumer

Messages entered in the producer console will appear in the consumer console.


Using Java API

KafkaProducerNew.java

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafkabrokerserver1>:6667,...");
// serializers must match the ProducerRecord<String, String> below
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put("security.protocol", "SASL_SSL");
prop.put("ssl.truststore.location", "/etc/security/ssl/client.truststore.jks");
prop.put("ssl.truststore.password", "<password>");

Producer<String, String> producer = new KafkaProducer<String, String>(prop);
String topicName = "<topicName>";
String message = "this is a message";
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, message);
producer.send(rec);
producer.close();

KafkaConsumerNew.java

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

String topicName = "<topicName>";
String groupId = "<groupId>";

Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafkabrokerserver1>:6667,...");
// deserializers must match the KafkaConsumer<String, String> below
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
prop.put("security.protocol", "SASL_SSL");
prop.put("ssl.truststore.location", "/etc/security/ssl/client.truststore.jks");
prop.put("ssl.truststore.password", "<password>");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(prop);
kafkaConsumer.subscribe(Arrays.asList(topicName));

try {
    while (true) {
        ConsumerRecords<String, String> recs = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> rec : recs)
            System.out.println(rec.value());
    }
} catch (WakeupException ex) {
    System.err.println(ex.getMessage());
} finally {
    kafkaConsumer.close();
}

Dependencies (pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId></groupId>
  <artifactId>KafkaProducer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.2.4.2.12-1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.2.4.2.12-1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.9</version>
    </dependency>
  </dependencies>
</project>

Execute Producer

java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp <uberJar> <ProducerClassName> <topicName>

Execute Consumer

java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp <uberJar> <ConsumerClassName> <topicName>


Apache Storm

Architecture / Components

  • Nimbus and Supervisor daemons are designed to be
    • fail-fast (process self-destructs whenever any unexpected situation is encountered)
    • stateless (all state is kept in Zookeeper or on disk)
  • Nimbus and Supervisor daemons must be run under supervision using a tool like daemontools or monit.
  • So if the Nimbus or Supervisor daemons die, they restart like nothing happened.

Topology

A graph of computation where the nodes represent individual computations and the edges represent data being passed between nodes.

Tuple

An ordered list of values. Storm provides mechanisms for assigning names to the values.

Stream

An unbounded sequence of tuples.

Spout

A source of streams in a topology.

Bolt

Accepts a tuple from its input stream, performs a computation or transformation, and emits a new tuple to its output stream.

Note: each spout and bolt will have one or many individual instances that perform this processing in parallel.

Stream Grouping

Tells Storm how to send individual tuples between instances of spouts and bolts.

Shuffle Grouping: tuples are emitted to instances of bolts at random (see the sketch below).
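
To make the wiring concrete, here is a minimal sketch of building a topology, assuming Storm 1.x package names (org.apache.storm; older releases used backtype.storm) and two hypothetical components, SentenceSpout and SplitBolt:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

// SentenceSpout and SplitBolt are placeholder spout/bolt implementations
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new SentenceSpout(), 2);   // 2 parallel spout instances
builder.setBolt("split-bolt", new SplitBolt(), 4)             // 4 parallel bolt instances
       .shuffleGrouping("sentence-spout");                    // tuples distributed at random

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("demo", new Config(), builder.createTopology());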

 


How Kerberos works

Terminologies

Principals:

  • Every user and service that participates in the Kerberos authentication protocol requires a principal to uniquely identify itself.
  • There are user principals and service principals.
  • e.g. alice@EXAMPLE.COM

Realm

  • An authentication administrative domain

Key distribution center (KDC):

  • The KDC comprises three components: the Kerberos database, the authentication service (AS), and the ticket-granting service (TGS).
  • e.g. kdc.example.com – the KDC for the Kerberos realm EXAMPLE.COM

Kerberos Workflow

Aim: User needs to access the Service identified by myservice/server1.example.com@EXAMPLE.COM

  • User initiates a request to the AS at kdc.example.com, identifying himself as the principal xyz@EXAMPLE.COM (in code, this is the JAAS login shown in the sketch below)
  • AS responds by providing a TGT that is encrypted using the key (password) for the principal
  • User is now prompted to enter the correct password for the principal in order to decrypt the message
  • User now uses TGT and requests a service ticket from the TGS at kdc.example.com
  • TGS validates the TGT and provides user a service ticket, encrypted with the myservice/server1.example.com@EXAMPLE.COM principal’s key
  • User now presents the service ticket to myservice, which can then decrypt it using the myservice/server1.example.com@EXAMPLE.COM key and validate the ticket.
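
In Java, this login flow can be exercised directly via JAAS. A minimal sketch, assuming a ticket already exists in the cache (via kinit) and the JVM was started with -Djava.security.auth.login.config pointing at the kafka_client_jaas.conf shown earlier:

import javax.security.auth.login.LoginContext;

// "KafkaClient" is the section name in the JAAS file; throws LoginException on failure
LoginContext lc = new LoginContext("KafkaClient");
lc.login();                                          // reads the TGT from the ticket cache (useTicketCache=true)
System.out.println(lc.getSubject().getPrincipals()); // e.g. [alice@EXAMPLE.COM]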

Hands-on Kafka

About Kafka

  • Messaging system
  • It doesn't transform data
  • Messages are organized into Topics
  • Producers push messages
  • Consumers pull messages
  • Kafka runs in a cluster
  • Nodes are called brokers

Why Kafka – Advantages 

  • Large number of Consumers
  • Ad-hoc consumers
  • Batch Consumers
  • Automatic recovery from Broker failures

Limitations

  • Not end-user solution: Need to write Producers and Consumers
  • No data transformations (no encryption, etc.)
  • No authorization or authentication yet

Topic

  • A topic has multiple partitions
  • Each partition can be placed on a separate machine
  • Partitions help parallelize a topic (see the sketch below)
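
How a message lands in a partition: with the 0.9 Java producer, a keyed record is hashed to a partition, while unkeyed records are spread across partitions. A minimal sketch ("myTopic" and "user-42" are placeholder names):

import org.apache.kafka.clients.producer.ProducerRecord;

// with a key: the default partitioner hashes "user-42" to pick a partition,
// so all messages with the same key land in the same partition (per-key ordering)
ProducerRecord<String, String> keyed = new ProducerRecord<String, String>("myTopic", "user-42", "payload");

// without a key: messages are spread across the topic's partitions
ProducerRecord<String, String> unkeyed = new ProducerRecord<String, String>("myTopic", "payload");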

Partitions

  • Inside a partition, each message has an id
  • These ids are called offsets
  • Consumers specify offsets when reading messages

 

  • Each broker has many partitions
  • Each partition has a leader and replicas
  • All reads and writes go to the leader
  • The leader replicates to the replicas
  • Producers / consumers never interact with replicas
  • Kafka retains messages in a topic for a configurable amount of time
  • Consumers are responsible for consuming data before it is deleted (see the offset sketch below)
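
Because retention is time-based rather than consumption-based, a consumer can rewind to any offset still within the retention window. A minimal sketch, assuming a consumer configured like the one in the Java API section (using assign instead of subscribe; the topic name and offset are placeholders):

import java.util.Arrays;
import org.apache.kafka.common.TopicPartition;

// take manual control of partition 0 of the topic and start reading at offset 42
TopicPartition tp = new TopicPartition("<topicName>", 0);
kafkaConsumer.assign(Arrays.asList(tp));   // manual assignment (instead of subscribe)
kafkaConsumer.seek(tp, 42L);               // jump to offset 42, if still retained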

Producers

  • Synchronous Producers
    • Send a message – wait for Kafka to ack
  • Asyc producer:
    • write to the producer
    • doesnt wait for ack
    • buffers in local memory
    • at some point it writes to Kafka
    • suitable for cases which care about performance and losing some data is less relevant
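
A minimal sketch of both styles with the 0.9 Java producer, assuming the producer and rec objects from the earlier example:

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// synchronous: block until the broker acks (or the send fails)
try {
    RecordMetadata md = producer.send(rec).get();
    System.out.println("acked at offset " + md.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

// asynchronous: return immediately; the callback fires on ack or failure
producer.send(rec, new Callback() {
    public void onCompletion(RecordMetadata md, Exception e) {
        if (e != null) e.printStackTrace();
    }
});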

Kafka On Windows

Download Kafka from: http://kafka.apache.org/

Save and untar it under C:\mykafka\kafka_2.11-0.10.0.1

Start Zookeeper

C:\mykafka\kafka_2.11-0.10.0.1\bin\windows>zookeeper-server-start.bat C:\mykafka\kafka_2.11-0.10.0.1\config\zookeeper.properties

Edit server.properties

  • broker.id=1
  • port=9091
  • log.dirs=C:\mykafka\kafka-log-2
  • zookeeper.connect=localhost:2181

Similarly, create a server-1.properties file with a unique broker.id, port, and log.dirs.

Start Broker

kafka-server-start.bat C:\mykafka\kafka_2.11-0.10.0.1\config\server.properties

kafka-server-start.bat C:\mykafka\kafka_2.11-0.10.0.1\config\server-1.properties

Create Topics

Create a topic: first

kafka-topics.bat --zookeeper localhost:2181 --create --topic first --partitions 2 --replication-factor 2

All brokers pick up the change from ZooKeeper.

Producer Console

kafka-console-producer.bat --broker-list localhost:9092 --topic first

Consumer Console

kafka-console-consumer.bat --zookeeper localhost:2181 --topic first

Any message typed in the producer console is immediately available in the consumer console.

Get all messages from the beginning:

C:\mykafka\kafka_2.11-0.10.0.1\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 --topic first --from-beginning

Reference: https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

 


Tips: Unix

Display available space on file system

df -h

Display the disk space used by each subdirectory

du -h

du -sh (summary for the current directory)

Find

Find all files whose names start with pro

find . -name pro\*

Groups
Primary group: the group to which an id belongs (mainly for access control)
Netgroup: a group containing members allowed to access/perform operations on a machine

Get members in a netgroup:

getent netgroup fg_UP20_dev_users

View netgroup entries on a machine:

$ cat /etc/passwd-
$ grep @ /etc/passwd

Get PID of process running in a port 

netstat -anp | grep 8080 (get pid)

Get Process Details

ps aux | grep <pid>

stay tuned..


Tips: Cluster Installation

  1. To keep Ambari from overriding configs, edit the template file corresponding to the service: /var/lib/ambari-agent/cache/stacks/HDP/2.0.6/services/YARN/package/templates/yarn-env.sh.j2

 


Hadoop on Windows

  • Download the binaries (e.g., winutils.exe) required to run Hadoop
  • Download link: https://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip
  • Add them to %HADOOP_HOME%\bin
  • Set HADOOP_HOME and JAVA_HOME under environment variables (see the sketch below)
  • Reference: http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
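
If setting the environment variable is inconvenient, the Hadoop libraries also honor the hadoop.home.dir system property. A minimal sketch, assuming winutils.exe was unpacked to the hypothetical path C:\hadoop\bin:

public class HadoopHomeSetup {
    public static void main(String[] args) {
        // alternative to the HADOOP_HOME environment variable;
        // C:\hadoop is a placeholder – point it at the directory containing bin\winutils.exe
        System.setProperty("hadoop.home.dir", "C:\\hadoop");
        // ... create your Hadoop/Spark context after this point
    }
}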

 

Spark on Windows

  • While running Spark locally, you can refer to a local path on your computer
  • The Spark master needs to be set to local

String inpath = "C:/New/abc.txt";
String outpath = "C:/New/New1";

SparkConf conf = new SparkConf().setAppName("sparkAction").setMaster("local");
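
Putting these pieces together, a minimal end-to-end sketch using the placeholder paths above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkAction {
    public static void main(String[] args) {
        String inpath = "C:/New/abc.txt";
        String outpath = "C:/New/New1";

        // local master: runs Spark in-process, no cluster needed
        SparkConf conf = new SparkConf().setAppName("sparkAction").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // read the local file and write it back out (the output directory must not already exist)
        sc.textFile(inpath).saveAsTextFile(outpath);
        sc.close();
    }
}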

Stay tuned..
