Tips: Hive

Mask a Column

Create a table, Insert values to it

CREATE TABLE IF NOT EXISTS employee_test1 (eid String, name String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

INSERT INTO TABLE employee_test1 VALUES ('1', 'abc1'), ('3', 'bcd1');

INSERT INTO TABLE employee_test1 VALUES ('2', 'abc2'), ('4', 'bcd1');

Create a View

CREATE VIEW employee_test1_vw
AS
SELECT mask_hash(eid) AS eid, name
FROM employee_test1;

 

Create Surrogate Key / Unique rows

CREATE TABLE test(
r_id string,
seq_nbr int,
dt string)
STORED AS ORC TBLPROPERTIES (
………………..
);

Initial Load

INSERT INTO TABLE test
-- the target table has three columns (r_id, seq_nbr, dt), so dt must be selected as well; this assumes test1 also has a dt column
SELECT T.r_id, ROW_NUMBER() OVER (ORDER BY T.r_id) AS seq_nbr, T.dt
FROM test1 T LIMIT 10;

Subsequent Loads

Tip: get the current max of seq_nbr and add it to the new row numbers

SELECT max(seq_nbr) FROM test;

INSERT INTO TABLE test
SELECT T.r_id, ROW_NUMBER() OVER (ORDER BY T.r_id) + <max of seq_nbr> AS seq_nbr, T.dt
FROM test1 T;


Hadoop Cluster : Run Command on ALL Nodes

It is usually tedious to run a command on every node of a Hadoop cluster.

Here is a script that does it.

run_command

#!/bin/bash

TPUT='tput -T xterm-color'
txtund=$(${TPUT} sgr 0 1)   # Underline
txtbld=$(${TPUT} bold)      # Bold
txtrst=$(${TPUT} sgr0)      # Reset

if [ $# -ne 1 ]; then
  echo "Please pass the command to run as the first argument. Use single quotes if the command takes multiple arguments itself"
else
  for host in `cat Hostdetail.txt`; do
    echo -e "${txtbld}\n######################################################"
    echo -e "# Running command $1 on Host: $host"
    echo "######################################################${txtrst}"
    ssh root@$host COMMAND="'$1'" 'bash -s' << 'END'
$COMMAND
END
  done
fi

Hostdetail.txt

server1

server2..

Usage:

Make the script executable (chmod +x run_command) and edit Hostdetail.txt to list the target hosts, one per line.
** Don't include the node from which the command is being executed.

Example:

./run_command "ls /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.45.x86_64/jre/bin/"


Some Curl Commands for BigData

Writing to HDFS

curl -i -X PUT -T $file -L "http://$namenode:50070/webhdfs/v1/$file?op=CREATE&user.name=$user"

Reading from HDFS

curl -i -X GET "http://$namenode:50070/webhdfs/v1/$file?op=OPEN"
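
The same read can be issued from Java; below is a minimal illustrative sketch (not from the original post) using HttpURLConnection against an unsecured cluster — the host, path and user.name values are placeholders.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsRead {
    public static void main(String[] args) throws Exception {
        // Same REST call as the curl OPEN example above
        URL url = new URL("http://<namenode>:50070/webhdfs/v1/<path>/file.txt?op=OPEN&user.name=<user>");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");   // OPEN redirects to a datanode; GET redirects are followed automatically
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}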

In a kerberized environment (Writing to HDFS)

curl --negotiate -ku : -T $file -X PUT "http://<namenode>:50070/webhdfs/v1/<path>/$file?op=CREATE&user.name=<username>"

OR

curl -iku $userName:$password -L -T $file -X PUT "http://<namenode>:50070/webhdfs/v1/<path>/$file?op=CREATE&overwrite=true"

From a Windows Machine (with Security)

  • cd to the bin directory where JRE is installed
  • create a kerberos ticket (using kinit)

Make a directory in HDFS

curl --negotiate -iku : -X PUT "http://<namenode>:50070/webhdfs/v1/<path>?op=MKDIRS"

Copy File

curl --negotiate -iku : -X PUT -T $file -L "http://<namenode>:50070/webhdfs/v1/<path>/$file?op=CREATE"


Truststore & Keystore

In an SSL handshake:

  • TrustStore is used to verify credentials: it stores certificates of the third parties your Java application communicates with, or certificates signed by a CA (a certificate authority such as Verisign) that can be used to identify those third parties.
  • KeyStore is used to provide credentials: it stores your private key and the certificates corresponding to its public key, and is needed if you are the SSL server or if the server requires client authentication (see the sketch below).
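
To make the distinction concrete, here is a minimal Java sketch (not from the original post) that loads both stores into an SSLContext; the keystore path and the passwords are placeholders.

import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class SslContextExample {
    public static void main(String[] args) throws Exception {
        // TrustStore: certificates we trust when verifying the remote party
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/etc/security/ssl/client.truststore.jks")) {
            trustStore.load(in, "<truststore password>".toCharArray());
        }
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        // KeyStore: our own private key and certificate, presented as credentials
        KeyStore keyStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream("/etc/security/ssl/client.keystore.jks")) {
            keyStore.load(in, "<keystore password>".toCharArray());
        }
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, "<keystore password>".toCharArray());

        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    }
}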

 


Integrating Kafka and Storm

Intro

  • This article focuses on integrating Storm 0.10.0.2 and Kafka 0.8.2.2
  • Data is not encrypted in this case

Create the client_jaas.conf file (under /usr/hdp/current/storm-client/conf/)

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=false
  serviceName="zk";
};

StormClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="nimbus";
};

Create storm.yaml (/usr/hdp/current/storm-client/conf/)

nimbus.seeds: ["<nimbus host1>", "<nimbus host2>"]
nimbus.thrift.port: 6627
java.security.auth.login.config: "/etc/storm/conf/client_jaas.conf"
storm.thrift.transport: "backtype.storm.security.auth.kerberos.KerberosSaslTransportPlugin"

Create client_jaas.conf file (which has to be copied to a local directory in the supervisor nodes)

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="<keytab path>"
  storeKey=true
  useTicketCache=false
  principal="<principal>@DOMAIN"
  serviceName="kafka";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="<keytab path>"
  storeKey=true
  useTicketCache=false
  principal="<principal>@DOMAIN"
  serviceName="zookeeper";
};

<principal> is the service id

Create key tab file

ktutil
addent -password -p <principal>@DOMAIN -k 1 -e aes256-cts
addent -password -p <principal>@DOMAIN -k 1 -e arcfour-hmac
wkt user.keytab
quit

Using secure copy (scp), copy the keytab files and the storm client_jaas.conf into the supervisors' local directories

KafkaSpout and Topology creation

String zkIp = "zookeeperServer1, server2...";
String zookeepers = "zookeeperServer1:2181,...";
String brokerZkPath = "/kafka/brokers";
String zkRoot = "/kafka-spout";
String topicName = "<topicName>";
ZkHosts zkHosts = new ZkHosts(zookeepers, brokerZkPath);
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, "storm");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
Config config = new Config();
config.setMaxTaskParallelism(5);
config.setDebug(true);
config.setNumWorkers(2);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp));
config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx768m -Djava.security.auth.login.config=<client_jaas location local to supervisor nodes, may be an NFS> -javaagent:/usr/hdp/current/storm-client/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8650,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-client/....jmxetric-conf.xml,process=Worker_%ID%_JVM");
config.put(Config.TOPOLOGY_AUTO_CREDENTIALS, Arrays.asList(new String[]{"backtype.storm.security.auth.kerberos.AutoTGT"}));

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spoutName", kafkaSpout, 1);
builder.setBolt("print", new PrintBolt()).shuffleGrouping("spoutName");
try {
    StormSubmitter.submitTopologyWithProgressBar("<topologyName>", config, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}

Bolt:

class PrintBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        System.out.println("message...");
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt only prints, it emits nothing
    }
}


pom.xml

storm version: 0.10.0.2.3.0.0-2557

storm.kafka.version: 0.10.0.2.3.0.0-2557

kafka.version: 0.8.2.2.3.0.0-2557

Exclude the following from each dependency:

groupId: org.slf4j

artifactId: log4j-over-slf4j

artifactId: slf4j-log4j12

Create an uber jar and move it to the Edge Node

Run Topology

storm jar <jarName> <className>

List Topologies

storm list

Check the Storm log files (from Supervisor Nodes) to verify the output.

Kill Topology

storm kill <topName>

 


Kafka Producers and Consumers (Console / Java) using SASL_SSL

Intro

  • Producers and consumers send messages to / receive messages from Kafka
  • SASL is used for authentication and SSL for encryption
  • JAAS config files are used to read the kerberos ticket and authenticate as part of SASL
  • Kafka version used in this article: 0.9.0.2

Console Producers and Consumers

Follow the steps given below

Edit kafka_client_jaas.conf file (under /usr/hdp/current/kafka-broker/conf)

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=false
  serviceName="zk";
};

Edit kafka-env.sh file (under /usr/hdp/current/kafka-broker/conf)

KAFKA_KERBEROS_PARAMS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf"

Create a truststore, add Root CA

The truststore must contain the organization's root CA certificate

keytool -keystore client.truststore.jks -alias CARoot -import -file <certName>.cer

Edit producer.properties file

bootstrap.servers=<kafkabrokerserver1>:6667,<kafkabrokerserver2>:6667...
metadata.broker.list=<kafkabrokerserver1>:6667,<kafkabrokerserver2>:6667...
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/etc/security/ssl/client.truststore.jks
ssl.truststore.password=<password>

Edit consumer.properties

ssl.truststore.location=/etc/security/ssl/client.truststore.jks
ssl.truststore.password=<password>
security.protocol=SASL_SSL

Create a kerberos ticket

kinit user@DOMAIN.COM

Execute Producer

/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <kafkabrokerserver1>:6667 --topic <topicName> --security-protocol SASL_SSL --producer.config /usr/hdp/current/kafka-broker/conf/producer.properties --new-producer

Execute Consumer

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <kafkabrokerserver1>:6667 --topic <topicName> --from-beginning --security-protocol SASL_SSL --consumer.config /usr/hdp/current/kafka-broker/conf/consumer.properties --new-consumer

Messages entered in the producer console would be received in the consumer console


Using the Java API

KafkaProducerNew.java

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafkabrokerserver1>:6667,...");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put("security.protocol", "SASL_SSL");
prop.put("ssl.truststore.location", "/etc/security/ssl/client.truststore.jks");
prop.put("ssl.truststore.password", "<password>");

Producer<String, String> producer = new KafkaProducer<String, String>(prop);
String topicName = "<topicName>";
String message = "this is a message";
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, message);
producer.send(rec);

producer.close();
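
producer.send() is asynchronous; as a small optional sketch (not in the original post), a callback can be passed to the same send() call to surface delivery errors or log the partition and offset:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

producer.send(rec, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Delivery failed (e.g. authentication or broker error)
            exception.printStackTrace();
        } else {
            System.out.println("sent to partition " + metadata.partition() + " at offset " + metadata.offset());
        }
    }
});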

KafkaConsumerNew.java

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

private KafkaConsumer<String, String> kafkaConsumer;
private String topicName = "<topicName>";
private String groupId = "<groupId>";

Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafkabrokerserver1>:6667,...");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
prop.put("security.protocol", "SASL_SSL");
prop.put("ssl.truststore.location", "/etc/security/ssl/client.truststore.jks");
prop.put("ssl.truststore.password", "<password>");

kafkaConsumer = new KafkaConsumer<String, String>(prop);
kafkaConsumer.subscribe(Arrays.asList(topicName));

try {
    while (true) {
        ConsumerRecords<String, String> recs = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> rec : recs)
            System.out.println(rec.value());
    }
} catch (WakeupException ex) {
    System.err.println(ex.getMessage());
} finally {
    kafkaConsumer.close();
}
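
The WakeupException handler above only fires if another thread calls wakeup() on the consumer. A minimal sketch of such a shutdown hook (an illustrative addition, not part of the original post) would be:

// Registered before the poll loop: on JVM shutdown, wakeup() makes poll()
// throw WakeupException, so the loop exits and the consumer is closed cleanly.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        kafkaConsumer.wakeup();
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});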

Dependencies (pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId></groupId>
<artifactId>KafkaProducer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.2.4.2.12-1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.2.4.2.12-1</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>

</dependencies>
</project>

Execute Producer

java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp <uberJar> <ProducerClassName> <topicName>

Execute Consumer

java -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -cp <uberJar> <ConsumerClassName> <topicName>


Apache Storm

Architecture / Components

  • Nimbus and Supervisor daemons are designed to be
    • fail-fast (process self-destructs whenever any unexpected situation is encountered)
    • stateless (all state is kept in Zookeeper or on disk)
  • Nimbus and Supervisor daemons must be run under supervision using a tool like daemontools or monit.
  • So if the Nimbus or Supervisor daemons die, they restart like nothing happened.

Topology

A graph of computation: the nodes represent individual computations and the edges represent the data passed between them.

Tuple

Ordered list of values

Storm provides mechanisms for assigning names to the values.
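
For example (an illustrative sketch, not from the original post; imports from backtype.storm.tuple and backtype.storm.topology omitted), a component declares names for the values it emits, and a downstream bolt can then read them by name instead of by position:

// In the emitting spout/bolt: name the two values of each output tuple
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
}

// In a downstream bolt: access the values by their declared names
public void execute(Tuple tuple) {
    String word = tuple.getStringByField("word");
    long count = tuple.getLongByField("count");
}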

Stream

Unbounded sequence of Tuples.

Spout

Source of a stream in a topology

Bolt

Accepts tuples from an input stream, performs some computation or transformation, and emits new tuples to its output stream

**Each spout and bolt will have one or many individual instances that perform all of this processing in parallel

Stream Grouping 

Tells Storm how to send individual tuples between instances of the spouts and bolts

Shuffle Grouping: tuples are emitted to instances of bolts at random
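
A short sketch of how groupings are declared on a TopologyBuilder (the spout and bolt classes here are hypothetical placeholders, not from the original post):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new SentenceSpout(), 2);      // 2 parallel spout instances
builder.setBolt("split", new SplitSentenceBolt(), 4)
       .shuffleGrouping("sentences");                        // tuples go to a random "split" instance
builder.setBolt("count", new WordCountBolt(), 4)
       .fieldsGrouping("split", new Fields("word"));         // tuples with the same "word" always go to the same instance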

 
