Hands-on Kafka

About Kafka

  • Messaging system
  • It doesn't transform data
  • Messages are organized into Topics
  • Producers push messages
  • Consumers pull messages
  • Kafka runs in a cluster
  • Nodes are called brokers

Why Kafka – Advantages 

  • Large number of Consumers
  • Ad-hoc consumers
  • Batch Consumers
  • Automatic recovery from Broker failures

Limitations

  • Not an end-user solution: you need to write your own Producers and Consumers
  • No data transformations (no encryption, etc.)
  • No authorization or authentication yet

Topic

  • A topic has multiple partitions
  • Each partition can be placed on a separate machine
  • Partitions allow a topic to be parallelised across brokers

Partitions

  • Inside a partition, each message has an id
  • These ids are called offsets
  • When consuming messages, you specify the offset to start reading from (see the consumer sketch after this list)

 

  • Each Broker has many partitions
  • Each partition has leader and replicas
  • All reads and writes go to the leaders
  • Leader replicates to the replicas
  • Producers / Consumers never interact with replicas
  • Kafka retains a topic's messages only for a configurable retention period
  • Consumers are responsible for reading data before it is deleted
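A minimal sketch of offset-based consumption, using the Java consumer that ships with the Kafka version installed below (the topic name and broker address are taken from the Windows walkthrough later in this post):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
TopicPartition p0 = new TopicPartition("first", 0);
consumer.assign(Arrays.asList(p0));  // manual assignment: no consumer group needed
consumer.seek(p0, 0L);               // specify the offset to start reading from

ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> r : records)
    System.out.println(r.offset() + ": " + r.value());
consumer.close();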

Producers

  • Synchronous producer
    • Sends a message and waits for Kafka to ack
  • Async producer:
    • you write to the producer
    • it doesn't wait for an ack
    • it buffers messages in local memory
    • at some point it writes them to Kafka
    • suitable for cases where performance matters and losing some data is acceptable
  • Both styles are sketched below
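A minimal sketch of both styles using the Java producer from the same distribution (broker address and topic name as in the console examples below; error handling omitted):

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

// Synchronous: block until the broker acknowledges the write
producer.send(new ProducerRecord<String, String>("first", "k1", "hello")).get();

// Asynchronous: returns immediately; messages are batched in local memory
// and the callback fires once the ack (or an error) arrives
producer.send(new ProducerRecord<String, String>("first", "k2", "world"),
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) e.printStackTrace();
        }
    });

producer.close();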

Kafka On Windows

Download Kafka from: http://kafka.apache.org/

Save and untar it under C:\mykafka\kafka_2.11-0.10.0.1

Start Zookeeper

C:\mykafka\kafka_2.11-0.10.0.1\bin\windows>zookeeper-server-start.bat C:\mykafka\kafka_2.11-0.10.0.1\config\zookeeper.properties

Edit server.properties

  • broker.id=1
  • port=9091
  • log.dirs=C:\mykafka\kafka-log-2
  • zookeeper.connect=localhost:2181

Similarly, create a server-1.properties file for the second broker, with a different broker.id, port, and log.dirs.

Start Broker

kafka-server-start.bat C:\mykafka\kafka_2.11-0.10.0.1\config\server.properties

kafka-server-start.bat C:\mykafka\kafka_2.11-0.10.0.1\config\server-1.properties

Create Topics

Create a topic: first

kafka-topics.bat --zookeeper localhost:2181 --create --topic first --partitions 2 --replication-factor 2

All brokers pick up the change from ZooKeeper.

Producer Console

kafka-console-producer.bat --broker-list localhost:9092 --topic first

Consumer Console

kafka-console-consumer.bat --zookeeper localhost:2181 --topic first

Any message typed in the producer console is immediately available in the consumer console.

Get all messages from the beginning:

C:\mykafka\kafka_2.11-0.10.0.1\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 --topic first --from-beginning

Reference: https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

 


Tips: Unix

Display available space on file system

df -h

Display the space used by each subdirectory (human-readable)

du -h

du -sh (summary)

Find

Find all files whose names start with pro

find . -name pro\*

Groups
Primary group: the group to which an id belongs; mainly used for access control
Netgroup: a group of members allowed to access or perform operations on a machine

Get members in a netgroup:

getent netgroup fg_UP20_dev_users

View netgroup on a machine:

$ cat /etc/passwd-
$ grep @ /etc/passwd

Get the PID of the process listening on a port

netstat -anp | grep 8080

Get Process Details

ps aux | grep <pid>

stay tuned..


Tips: Cluster Installation

  1. To keep Ambari from overriding configs, edit the template file corresponding to the service, e.g. /var/lib/ambari-agent/cache/stacks/HDP/2.0.6/services/YARN/package/templates/yarn-env.sh.j2

 


Hadoop on Windows

  • Download the binaries (e.g., winutils.exe) required to run Hadoop on Windows
  • Download link: https://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip
  • Add them to %HADOOP_HOME%\bin
  • Set HADOOP_HOME and JAVA_HOME under environment variables (or set hadoop.home.dir from code, as sketched below)
  • Reference: http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
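If editing the system environment isn't convenient, HADOOP_HOME can also be supplied from code; a minimal sketch, assuming the downloaded bin folder was unpacked under C:\hadoop (a hypothetical path):

// Hadoop resolves winutils.exe through this property when HADOOP_HOME is unset;
// it must be set before the first Hadoop/Spark class is initialised.
System.setProperty("hadoop.home.dir", "C:\\hadoop"); // hypothetical unpack location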

 

Spark on Windows

  • When running Spark locally, you can refer to a local path on your computer
  • The Spark master needs to be set to local

String inpath = "C:/New/abc.txt";
String outpath = "C:/New/New1";

SparkConf conf = new SparkConf().setAppName("sparkAction").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile(inpath);
lines.saveAsTextFile(outpath);

Stay tuned..


Tips: Maven

Terminologies

GroupId: Identifies the project's group, typically the reverse-domain package prefix used in the application (e.g., com.mycompany.app)

ArtifactId: Name of the project's artifact (the JAR), not of a class

 

Create Uber Jar

<project>
....
....

<build>
    <plugins>
      <!-- Maven shade plug-in that creates uber JARs -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Tips: Java

Anonymous Classes

  • They enable you to declare and instantiate a class at the same time.
  • They are like local classes except that they do not have a name.
  • Use them if you need to use a local class only once.
  • They let you provide an implementation of an interface inline (see the sketch below)

http://docs.oracle.com/javase/tutorial/java/javaOO/anonymousclasses.html
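A minimal sketch: supplying a Comparator implementation inline, so the interface is declared and instantiated in a single expression:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

List<String> words = new ArrayList<String>();
Collections.addAll(words, "banana", "fig", "apple");

// The anonymous class body is the Comparator implementation;
// it has no name and exists only at this call site.
Collections.sort(words, new Comparator<String>() {
    public int compare(String a, String b) {
        return a.length() - b.length(); // shortest word first
    }
});
System.out.println(words); // prints [fig, apple, banana]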

 


Spark -> Parquet, ORC

Create Java RDDs

String filePath = "hdfs://<HDFSName>:8020/user…";
String outFile = "hdfs://<HDFSName>:8020/user…";
SparkConf conf = new SparkConf().setAppName("appname");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> inFileRDD = jsc.textFile(filePath);

Remove initial empty lines from a file

import java.util.Iterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.spark.api.java.function.Function2;

Function2<Integer, Iterator<String>, Iterator<String>> removeSpace =
    new Function2<Integer, Iterator<String>, Iterator<String>>() {
  public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception {
    if (ind != 0) return iterator; // only the first partition can hold the file's leading lines
    // Peek at each line without consuming the first non-empty one (Guava ships with Spark)
    PeekingIterator<String> pit = Iterators.peekingIterator(iterator);
    while (pit.hasNext() && pit.peek().trim().isEmpty()) pit.next(); // drop leading empty lines
    return pit;
  }
};
JavaRDD<String> fileWOSpaceRDD = inFileRDD.mapPartitionsWithIndex(removeSpace, false);

ind == 0 restricts the work to the first partition of the RDD; the PeekingIterator lets us test each line for emptiness without accidentally consuming the first non-empty line.

Remove initial non-empty lines (e.g. headers) from a file

import java.util.Iterator;
import org.apache.spark.api.java.function.Function2;

Function2<Integer, Iterator<String>, Iterator<String>> removeLines =
    new Function2<Integer, Iterator<String>, Iterator<String>>() {
  public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception {
    if (ind == 0 && iterator.hasNext()) {
      iterator.next(); // call next() once per leading line you want to drop
    }
    return iterator;
  }
};
JavaRDD<String> fileWOHeaderRDD = inFileRDD.mapPartitionsWithIndex(removeLines, false);

ind == 0 restricts the work to the first partition of the RDD; include iterator.next(); as many times as the number of lines you need to remove.

Store Files in Parquet Format

Create a Java bean class (SparkBean) with the fields age and id.

import org.apache.spark.api.java.function.Function;

JavaRDD<String> inFileRDD = jsc.textFile(filePath);
JavaRDD<SparkBean> sBRDD = inFileRDD.map(new Function<String, SparkBean>() {
  public SparkBean call(String line) throws Exception {
    String[] parts = line.trim().split(",");
    SparkBean sb = new SparkBean();
    sb.setAge(parts[0]); // assuming the first CSV column is age
    sb.setId(parts[1]);  // and the second is id
    return sb;
  }
});

Save as Parquet File

SQLContext sqlCtx = new SQLContext(jsc);
DataFrame sd = sqlCtx.createDataFrame(sBRDD, SparkBean.class);
sd.registerTempTable("testTable");
sd.saveAsParquetFile(outFile); // writes to HDFS
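To verify the write, the Parquet output can be loaded straight back (parquetFile is the Spark 1.3-era API matching the spark-sql version listed below; later releases use sqlCtx.read().parquet(...)):

// Read the Parquet output back and inspect the schema and a few rows
DataFrame check = sqlCtx.parquetFile(outFile);
check.printSchema();
check.show(5);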

Versions and POM File

groupId: org.apache.spark
artifactId: spark-core_2.10
version: 1.6.1

groupId: org.apache.spark
artifactId: spark-sql_2.10
version: 1.3.1

groupId: org.apache.spark
artifactId: spark-hive_2.10
version: 1.4.1

Execute

spark-submit --class <fully qualified classname> --master yarn-client --queue <queuename> \
  --num-executors 3 --driver-memory 3g --executor-memory 1g \
  --conf spark.sql.parquet.compression.codec=uncompressed --executor-cores 3 \
  <jar namewithlocation>

Create HIVE Tables 

  • Field names and types should be the same as in the Parquet data (the bean class)
  • They are not case-sensitive
  • The table name can be different

create external table parquetTable (
age string,
id string)
STORED AS PARQUET
LOCATION '<hdfs file location>';

Hive Commands

describe parquetTable;
select * from parquetTable limit 5;
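The same queries can be issued from Spark as well; a minimal sketch using a HiveContext (constructed the same way as hc in the Save As ORC section below):

// Query the external Hive table that was created over the Parquet files
DataFrame rows = hc.sql("select * from parquetTable limit 5");
rows.show();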

Save As ORC

JavaSparkContext jsc = new JavaSparkContext(conf);
HiveContext hc = new HiveContext(jsc.sc());
DataFrame sd = hc.createDataFrame(sBRDD, SparkBean.class);
sd.save(outFilePathHdfs, "org.apache.spark.sql.hive.orc", SaveMode.Append);

ORC: Create HIVE Tables 

create external table ORCTable (
age string,
id string)
STORED AS ORC
LOCATION '<hdfs file location>'
tblproperties ("orc.compress"="NONE");