Integrating Kafka and Storm

Intro

  • This article focuses on integrating Storm 0.10.0.2 and Kafka 0.8.2.2
  • Data is not encrypted in transit in this setup; only Kerberos authentication is configured

Create a client_jaas.conf file (under /usr/hdp/current/storm-client/conf/)

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
};
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=false
    serviceName="zookeeper";
};
StormClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="nimbus";
};
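
Note: these sections use useTicketCache=true with no keytab, so the user who runs the storm client commands needs a valid Kerberos ticket on this node before submitting anything (the principal below is just a placeholder):

kinit <user>@DOMAIN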

Create storm.yaml (under /usr/hdp/current/storm-client/conf/, the same directory that /etc/storm/conf resolves to on HDP)

nimbus.seeds: ["", ""]
nimbus.thrift.port: 6627
java.security.auth.login.config: "/etc/storm/conf/client_jaas.conf"
storm.thrift.transport: "backtype.storm.security.auth.kerberos.KerberosSaslTransportPlugin"

Create a second client_jaas.conf file (which has to be copied to a local directory on the supervisor nodes)

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab=""
    storeKey=true
    useTicketCache=false
    principal="@DOMAIN"
    serviceName="kafka";
};
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab=""
    storeKey=true
    useTicketCache=false
    principal="@DOMAIN"
    serviceName="zookeeper";
};

The part of the principal before @DOMAIN is the service ID, and keyTab should point to the keytab file created in the next step.

Create the keytab file

ktutil
addent -password -p @DOMAIN -k 1 -e aes256-cts
addent -password -p @DOMAIN -k 1 -e arcfour-hmac
wkt user.keytab
quit

Using secure copy, copy the keytab file and the Storm client_jaas.conf into a local directory on each supervisor node.
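
For example (the supervisor host and target directory are placeholders; repeat for every supervisor node, and use the same directory that TOPOLOGY_WORKER_CHILDOPTS points to later):

scp user.keytab client_jaas.conf <supervisor-host>:<local-dir>/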

KafkaSpout and Topology creation

import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaStormTopology {            // example class name; use your own

    public static void main(String[] args) {
        String[] zkServers = {"zookeeperServer1", "server2"};   // Storm's ZooKeeper quorum hosts
        String zookeepers = "zookeeperServer1:2181,...";        // ZooKeeper connect string used by the Kafka brokers
        String brokerZkPath = "/kafka/brokers";                 // ZK path where the brokers register themselves
        String zkRoot = "/kafka-spout";                         // ZK root where the spout stores its offsets
        String topicName = "<topicName>";                       // Kafka topic to consume (placeholder)

        ZkHosts zkHosts = new ZkHosts(zookeepers, brokerZkPath);
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

        Config config = new Config();
        config.setMaxTaskParallelism(5);
        config.setDebug(true);
        config.setNumWorkers(2);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
        config.put(Config.NIMBUS_THRIFT_PORT, 6627);
        config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
        config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkServers));
        // Point the worker JVMs at the client_jaas.conf copied to the supervisor nodes (may be an NFS location)
        config.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
                "-Xmx768m -Djava.security.auth.login.config=<clientjaas location local to supervisor nodes, may be an NFS>"
              + " -javaagent:/usr/hdp/current/storm-client/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8650,"
              + "wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-client/....jmxetric-conf.xml,process=Worker_%ID%_JVM");
        // Let Storm push the submitter's TGT to the workers
        config.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
                Arrays.asList(new String[]{"backtype.storm.security.auth.kerberos.AutoTGT"}));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spoutName", kafkaSpout, 1);
        builder.setBolt("print", new PrintBolt()).shuffleGrouping("spoutName");

        try {
            StormSubmitter.submitTopologyWithProgressBar("<topologyName>", config, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Bolt:

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

class PrintBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        System.out.println(tuple.getString(0));   // print the message read from Kafka
        collector.ack(tuple);                      // ack so the spout does not replay the tuple
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: no output fields to declare
    }
}

pom.xml

Dependency versions (HDP builds):

  • storm.version: 0.10.0.2.3.0.0-2557
  • storm.kafka.version: 0.10.0.2.3.0.0-2557
  • kafka.version: 0.8.2.2.3.0.0-2557

Exclude the conflicting SLF4J/Log4j bindings (groupId org.slf4j, artifactIds log4j-over-slf4j and slf4j-log4j12) from each dependency, as in the sketch below.
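
A minimal sketch of the relevant pom.xml pieces, assuming the usual storm-core, storm-kafka and kafka_2.10 artifacts (these coordinates are assumptions; adjust them and the versions to whatever your repository provides):

<properties>
  <storm.version>0.10.0.2.3.0.0-2557</storm.version>
  <storm.kafka.version>0.10.0.2.3.0.0-2557</storm.kafka.version>
  <kafka.version>0.8.2.2.3.0.0-2557</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>provided</scope>   <!-- provided by the cluster; keep it out of the uber jar -->
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>${storm.kafka.version}</version>
    <!-- repeat the same org.slf4j exclusions here -->
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>${kafka.version}</version>
    <!-- repeat the same org.slf4j exclusions here -->
  </dependency>
</dependencies>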

Create an uber jar and move it to the Edge Node
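
One way to do this, assuming the maven-shade-plugin (or the assembly plugin) is configured in the pom to produce the uber jar, with placeholder jar and host names:

mvn clean package
scp target/<jarName> <edge-node-host>:~/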

Run Topology

storm jar <jarName> <className>

List Topologies

storm list

Check the Storm worker log files on the supervisor nodes to verify the output.
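
For example, assuming the default HDP Storm log directory on a supervisor node:

tail -f /var/log/storm/*worker*.log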

Kill Topology

storm kill <topName>

 
