Oozie: CRON style scheduling in Oozie

  • Cron scheduling adds a lot of flexibility when scheduling jobs with the Oozie coordinator.
  • It's a bit tricky at first, but once you're familiar with it, it pays off.
  • Here, just focus on the frequency attribute in your coordinator.xml.

<coordinator-app name="oozie-coordinator" frequency="0/10 1/2 * * *" start="2017-10-05T00:00Z" end="2099-09-19T13:00Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">

frequency="01 19 * * *"

A cron expression has 5 fields:

  • Minutes: 0-59
  • Hours: 0-23
  • Day of the month: 1-31
  • Month: JAN-DEC or 1-12
  • Day of the week: SUN-SAT or 1-7
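Putting the fields together (the first two expressions repeat the examples above; the weekday one is a hypothetical addition, and times are interpreted in the coordinator's timezone):

```xml
<!-- every 10 minutes (0, 10, 20, ...) during every 2nd hour starting at 01:00 -->
frequency="0/10 1/2 * * *"
<!-- every day at 19:01 -->
frequency="01 19 * * *"
<!-- weekdays at 02:00 -->
frequency="0 2 * * MON-FRI"
```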

 

stay tuned…


Oozie: How to pass the current date to a Workflow

Coordinator.xml

<coordinator-app name="oozie-coordinator" frequency="1440" start="2017-10-06T15:00Z" end="2099-09-19T13:00Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">
<action>
<workflow>
<app-path>${workflowxml}</app-path>

<configuration>
<property>
<name>currentDate</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), "yyyyMMdd")}
</value>
</property>
</configuration>

</workflow>
</action>
</coordinator-app>

** Change the offset 0 to -1 if you prefer the PREVIOUS day's date.
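For example, the previous day's date would be:

```xml
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyyMMdd")}</value>
```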

Access currentDate in your workflow.xml and pass it to your shell / ssh action

<ssh xmlns="uri:oozie:ssh-action:0.1">

<host>${focusNode}</host>

<command>${sh1_script}</command>

<args>${currentDate}</args>

<capture-output/>
</ssh>

 

Use it in your shell script

#!/bin/bash

touch "$1"
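As a slightly fuller sketch (the load_*.marker file name is made up for illustration): the script receives the yyyyMMdd date as $1, matching the format string passed to coord:formatTime, and for local testing it can fall back to `date +%Y%m%d`:

```shell
#!/bin/bash
# currentDate (yyyyMMdd) arrives as $1 from the Oozie ssh action;
# fall back to today's date when running the script by hand.
current_date=${1:-$(date +%Y%m%d)}
touch "load_${current_date}.marker"
```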


Tips: Hive

Mask a Column

Create a table, Insert values to it

CREATE TABLE IF NOT EXISTS employee_test1 ( eid String, name String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

INSERT INTO TABLE employee_test1 VALUES ('1', 'abc1'), ('3', 'bcd1');

INSERT INTO TABLE employee_test1 VALUES ('2', 'abc2'), ('4', 'bcd1');

Create a View

CREATE VIEW employee_test1_vw
AS
SELECT mask_hash(eid) AS eid, name
FROM employee_test1;

 

Create Surrogate Key / Unique rows

CREATE TABLE test(
r_id string,
seq_nbr int,
dt string)
STORED AS ORC TBLPROPERTIES (
………………..
);

Initial Load

INSERT INTO TABLE test
SELECT T.r_id, ROW_NUMBER() OVER (ORDER BY T.r_id) AS seq_nbr, T.dt -- dt assumed to come from test1
FROM test1 T LIMIT 10;

Subsequent Loads

Tip: Get the max of seq_nbr, add it to the row number

SELECT max(seq_nbr) FROM test;

INSERT INTO TABLE test
SELECT T.r_id, ROW_NUMBER() OVER (ORDER BY T.r_id) + <max of seq_nbr> AS seq_nbr, T.dt -- dt assumed to come from test1
FROM test1 T;
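The max lookup and the insert can also be combined into a single statement, so nothing has to be substituted by hand (a sketch against the same test/test1 tables; COALESCE covers the very first load, and the dt column is assumed to exist in test1):

```sql
INSERT INTO TABLE test
SELECT T.r_id,
       ROW_NUMBER() OVER (ORDER BY T.r_id) + m.max_seq AS seq_nbr,
       T.dt
FROM test1 T
CROSS JOIN (SELECT COALESCE(MAX(seq_nbr), 0) AS max_seq FROM test) m;
```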


Hadoop Cluster : Run Command on ALL Nodes

It's usually tedious to run the same command on every node of a Hadoop cluster.

Here is a script to do that..

run_command

#!/bin/bash

TPUT='tput -T xterm-color'
txtund=$(${TPUT} sgr 0 1) # Underline
txtbld=$(${TPUT} bold)    # Bold
txtrst=$(${TPUT} sgr0)    # Reset
if [ $# -ne 1 ]; then
echo "Please pass the command to run as the first argument. Use single quotes if the command takes multiple arguments itself"
else
for host in $(cat Hostdetail.txt); do
echo -e "${txtbld}\n######################################################"
echo -e "# Running command $1 on Host: $host"
echo "######################################################${txtrst}"
ssh root@$host COMMAND="'$1'" 'bash -s' << 'END'
$COMMAND
END
done
fi

Hostdetail.txt

server1

server2
...

Usage:

./run_command “ls /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.45.x86_64/jre/bin/”

Edit Hostdetail.txt to list your hosts, one per line.
** Don't include the node from which the command is being executed.
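The script assumes passwordless SSH as root to every host; if that is not yet set up, one common way is (the host name is an example):

```shell
ssh-keygen -t rsa -b 4096   # generate a key pair, accept the defaults
ssh-copy-id root@server1    # repeat for each host in Hostdetail.txt
```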


Some Curl Commands for BigData

Writing to HDFS

curl -i -X PUT -T $file -L "http://$namenode:50070/webhdfs/v1/$file?op=CREATE&user.name=$user"

Reading from HDFS

curl -i -X GET "http://$namenode:50070/webhdfs/v1/$file?op=OPEN"
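Both commands follow the same URL pattern, /webhdfs/v1/<path>?op=...&param=... — a tiny sketch that just assembles such a URL (host, path, and user are made-up example values):

```shell
#!/bin/bash
# Build a WebHDFS CREATE URL from its parts (all values are examples)
namenode=namenode.example.com
file=tmp/demo.txt
user=hdfs
url="http://${namenode}:50070/webhdfs/v1/${file}?op=CREATE&user.name=${user}"
echo "$url"
```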

In a kerberized environment (Writing to HDFS)

curl --negotiate -u : -X PUT -T $file "http://<namenode>:50070/webhdfs/v1/$file?op=CREATE&user.name=<user>"

OR

curl -iku $userName:$password -L -T $file -X PUT "http://<namenode>:50070/webhdfs/v1/$file?op=CREATE&overwrite=true"

From a Windows Machine (with Security)

  • cd to the bin directory where JRE is installed
  • create a kerberos ticket (using kinit)

Make a directory in HDFS

curl --negotiate -u : -i -X PUT "http://<namenode>:50070/webhdfs/v1/<dir>?op=MKDIRS"

Copy File

curl --negotiate -u : -i -X PUT -T $file -L "http://<namenode>:50070/webhdfs/v1/<dir>/$file?op=CREATE"


Truststore & Keystore

In an SSL handshake,

  • a TrustStore is used to verify the other party's credentials;
  • it stores certificates of the third parties your Java application communicates with, or certificates signed by a CA (certificate authority, e.g. Verisign) that can be used to identify those third parties.

  • a KeyStore is used to provide your own credentials;
  • it stores your private keys and the certificates corresponding to their public keys, and is required if you are an SSL server, or a client when the server requires client authentication.
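As an illustration, both stores are typically managed with the JDK's keytool; the file names, aliases, and password below are made-up examples:

```shell
# generate a private key and self-signed certificate into a keystore
keytool -genkeypair -alias myserver -keyalg RSA -keystore keystore.jks \
        -storepass changeit -dname "CN=myserver.example.com"
# import a CA certificate into a truststore
keytool -importcert -alias rootca -file ca.crt -keystore truststore.jks \
        -storepass changeit -noprompt
```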

 


Integrating Kafka and Storm

Intro

  • This article focuses on integrating Storm 0.10.0.2 and Kafka 0.8.2.2
  • Data is not encrypted in this case

Create the client_jaas.conf file (under /usr/hdp/current/storm-client/conf/)

KafkaClient{

com.sun.security.auth.module.Krb5LoginModule required

useTicketCache=true

renewTicket=true

serviceName="kafka";

};
Client{

com.sun.security.auth.module.Krb5LoginModule required

useTicketCache=true

renewTicket=false

serviceName="zk";

};
StormClient{

com.sun.security.auth.module.Krb5LoginModule required

useTicketCache=true

renewTicket=true

serviceName="nimbus";

};

Create storm.yaml (/usr/hdp/current/storm-client/conf/)

nimbus.seeds: ["", ""]
nimbus.thrift.port: 6627
java.security.auth.login.config: "/etc/storm/conf/client_jaas.conf"
storm.thrift.transport: "backtype.storm.security.auth.kerberos.KerberosSaslTransportPlugin"

Create a second client_jaas.conf file (which has to be copied to a local directory on the supervisor nodes)

KafkaClient{

com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="<path to keytab>"
storeKey=true
useTicketCache=false
principal="<user>@DOMAIN"
serviceName="kafka";

};
Client{

com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="<path to keytab>"
storeKey=true
useTicketCache=false
principal="<user>@DOMAIN"
serviceName="zookeeper";
};

serviceName is the Kerberos service ID.

Create key tab file

ktutil
addent -password -p <user>@DOMAIN -k 1 -e aes256-cts
addent -password -p <user>@DOMAIN -k 1 -e arcfour-hmac
wkt user.keytab
quit

Using secure copy (scp), copy the keytab files and the storm client_jaas.conf into the supervisors' local directories.

KafkaSpout and Topology creation

String zkIp = "zookeeperServer1,server2,...";
String zookeepers = "zookeeperServer1:2181,...";
String brokerZkPath = "/kafka/brokers";
String zkRoot = "/kafka-spout";
String topicName = "<topic>";
ZkHosts zkHosts = new ZkHosts(zookeepers, brokerZkPath);
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, "storm");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
Config config = new Config();
config.setMaxTaskParallelism(5);
config.setDebug(true);
config.setNumWorkers(2);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp));
config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx768m -Djava.security.auth.login.config=<client_jaas location local to the supervisor nodes, may be an NFS> -javaagent:/usr/hdp/current/storm-client/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8650,wireformat31x=true,mode=multicast,config=/usr/hdp/current/storm-client/....jmxetric-conf.xml,process=Worker_%ID%_JVM");
config.put(Config.TOPOLOGY_AUTO_CREDENTIALS, Arrays.asList(new String[]{"backtype.storm.security.auth.kerberos.AutoTGT"}));

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spoutName", kafkaSpout, 1);
builder.setBolt("print", new PrintBolt()).shuffleGrouping("spoutName");
try {
StormSubmitter.submitTopologyWithProgressBar("<topologyName>", config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}

Bolt:-

class PrintBolt extends BaseRichBolt {

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}

public void execute(Tuple tuple) {
System.out.println("message: " + tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

}


pom.xml

storm version: 0.10.0.2.3.0.0-2557

storm.kafka.version: 0.10.0.2.3.0.0-2557

kafka.version:0.8.2.2.3.0.0-2557

Exclude the slf4j bindings from each dependency:

groupId: org.slf4j, artifactId: log4j-over-slf4j

groupId: org.slf4j, artifactId: slf4j-log4j12

Create an uber jar and move it to the Edge Node

Run Topology

storm jar <jarName> <className>

List Topologies

storm list

Check the Storm log files (from Supervisor Nodes) to verify the output.

Kill Topology

storm kill <topName>

 
