Spark: Transformations with Examples

FILTER

List<String> strList = Arrays.asList("qqqqqqqqwwwww", "eeeeerrrrrrrrr", "ttttttttyyyyyyyyy");

static SparkConf conf = new SparkConf();
static JavaSparkContext jsc = new JavaSparkContext(conf);

JavaRDD<String> strRDD = jsc.parallelize(strList);   // build the RDD from the list above

JavaRDD<String> filterStrRDD = strRDD.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String arg0) throws Exception {
        return arg0.contains("q");
    }
});

MAP

JavaRDD<Integer> intRDD = jsc.parallelize(Arrays.asList(1, 2, 3, 4));   // example input (assumed; not shown in the original)

JavaRDD<Integer> mapIntRDD = intRDD.map(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer arg0) throws Exception {
        return arg0 * arg0;   // square each element
    }
});
System.out.println("________________________" + mapIntRDD.first() + "_______________________");
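With the assumed input of 1, 2, 3 and 4, mapIntRDD holds the squares 1, 4, 9 and 16, so the println prints 1 (the first element).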

FLATMAP

static void flatMapTest() {
    JavaRDD<String> flatmaprdd = jsc.parallelize(Arrays.asList(
            "We illustrate the difference between in Figure 3-3",
            "of lists we have an RDD"));
    JavaRDD<String> flatmaprddres = flatmaprdd.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) {
            return Arrays.asList(x.split(" "));
        }
    });
    for (String y : flatmaprddres.collect()) {
        System.out.println("OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO: " + y);
    }
}

………..


Spark: Actions with Examples

REDUCE

  • Reduces the elements of this RDD using the specified commutative and associative binary operator.
  • reduce(func) takes a Function2: it accepts two elements of the RDD's element type as arguments and returns a single element of the same type.

Example:-

JavaRDD<Integer> jrdd1 = ctx.parallelize(Arrays.asList(1, 2, 3, 4));

Integer sumRDD = jrdd1.reduce(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

System.out.println("SUM:" + sumRDD);
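For the RDD above (1, 2, 3, 4), this prints SUM:10.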

 

FOLD

  • Aggregates the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value".
  • fold() is similar to reduce() except that it takes a 'zero value' (think of it as an initial value) which is used in the initial call on each partition.
  • Both fold() and reduce() require that the return type of our result be the same as the element type of the RDD we are operating over.

JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2, 3, 4));
System.out.println("Num Partitions: " + rdd.partitions().size());
Integer result = rdd.fold(0,
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) {
return x + y;
}
});
System.out.println("FOLD Result:" + result);
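For the RDD above this also prints 10. Note that the zero value is applied once per partition (and once more when the partial results are merged), so it should be the identity of the operation: 0 for addition, 1 for multiplication.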

 

AGGREGATE

  • aggregate() frees us from the constraint that the return type must be the same as the element type of the RDD we are working on.
  • With aggregate(), like fold(), we supply an initial zero value of the type we want to return.

First, supply an initial zero value of the type we want to return:

AvgCount initial = new AvgCount(0, 0);

Next, supply a function to combine the elements from our RDD with the accumulator:

Function2<AvgCount, Integer, AvgCount> addAndCount =
    new Function2<AvgCount, Integer, AvgCount>() {
        public AvgCount call(AvgCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
        }
    };

Consider the RDD

JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5));

The function above builds an AvgCount object holding the total of the values in the RDD (15) and the number of elements (5).

Then, supply a second function to merge two accumulators, since each node accumulates its own results locally:

Function2<AvgCount, AvgCount, AvgCount> combine =
    new Function2<AvgCount, AvgCount, AvgCount>() {
        public AvgCount call(AvgCount a, AvgCount b) {
            a.total += b.total;
            a.num += b.num;
            return a;
        }
    };

The function above merges the per-partition AvgCount accumulators into one.

Finally, pass them to aggregate(), after which the average can be computed from the result.

 

AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);

Code:-

static void aggregateTest() {
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    Function2<AvgCount, Integer, AvgCount> addAndCount =
        new Function2<AvgCount, Integer, AvgCount>() {
            public AvgCount call(AvgCount a, Integer x) {
                a.total += x;
                a.num += 1;
                return a;
            }
        };
    Function2<AvgCount, AvgCount, AvgCount> combine =
        new Function2<AvgCount, AvgCount, AvgCount>() {
            public AvgCount call(AvgCount a, AvgCount b) {
                a.total += b.total;
                a.num += b.num;
                return a;
            }
        };
    AvgCount initial = new AvgCount(0, 0);
    AvgCount result = rdd.aggregate(initial, addAndCount, combine);
    System.out.println("AGGREGATE: " + result.avg() + "**********************************************");
}

class AvgCount implements Serializable {
    int total;
    int num;

    public AvgCount(int total, int num) {
        this.total = total;
        this.num = num;
    }

    double avg() {
        return total / (double) num;   // cast to double to avoid integer division
    }
}
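For the RDD above, result.total ends up as 15 and result.num as 5, so result.avg() returns 3.0.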

 

PERSIST

  • A faster way to use the same RDD multiple times
  • Avoids computing an RDD multiple times
  • During persist, the nodes that compute the RDD store their partitions.
  • If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed.
  • It is possible to replicate the data on multiple nodes.
  • Call unpersist() to remove RDDs from the cache (see the sketch below).
  • If you try to cache more data than fits in memory, Spark automatically evicts old partitions using a Least Recently Used (LRU) policy.
rdd1.persist(StorageLevel.MEMORY_ONLY());
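For illustration, here is a minimal sketch of the full persist/unpersist lifecycle (assuming jsc is an existing JavaSparkContext, as in the aggregate example above; the input values are arbitrary):

import org.apache.spark.storage.StorageLevel;

JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
rdd1.persist(StorageLevel.MEMORY_ONLY());         // only marks the RDD for caching; nothing is computed yet
System.out.println("count 1: " + rdd1.count());   // first action computes the RDD and caches its partitions
System.out.println("count 2: " + rdd1.count());   // second action reads the cached partitions instead of recomputing
rdd1.unpersist();                                  // removes the RDD from the cache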

Level                 Space used   CPU time   In memory   On disk   Comments
MEMORY_ONLY           High         Low        Y           N
MEMORY_ONLY_SER       Low          High       Y           N
MEMORY_AND_DISK       High         Medium     Some        Some      Spills to disk if there is too much data to fit in memory.
MEMORY_AND_DISK_SER   Low          High       Some        Some      Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory.
DISK_ONLY             Low          High       N           Y

 

 

..


Spark: Resolving Errors

java.lang.OutOfMemoryError: Java heap space

Fix: Increase the driver memory using --driver-memory.
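For example (the memory size and the application class/jar here are only illustrative):

spark-submit --driver-memory 4g --class com.example.MyApp myapp.jar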


Use Case: Automate data flow into HDFS / Hive using Oozie

I am planning to publish a few use cases relating to Big Data that will be helpful to any industry.

Intro

Automate the data load process into HDFS, Hive and Hive ORC tables!

Suppose you receive daily feeds from some source (a database, log files or anything else) and need to make them available in HDFS, and in turn in Hive, for analytics. Here is a simple solution using Oozie: schedule the process to run every business day at a specific time. The following features are included:

  • SFTP data
  • Receiving the feeds
  • Loading into the staging table
  • Backing up the loaded data
  • Cleaning the daily feed location
  • Creating partitions
  • Incremental imports into the ORC.
  • Email Notification

You will get success or failure notifications in your inbox after the daily run.

How to SFTP a file onto the machine attached to the Hadoop cluster

sftp user@hostname

sftp>

Switch directories on the local machine:
 lcd testdir

Switch directories on the remote machine:
 cd testdestdir

Upload files: put filename
 Download files: get filename

Schedule using Oozie SSH Action

coordinator.xml

This lets you schedule the load process Monday through Friday at the time you specify.

Runs at 20:00 GMT.

"currentDate" is grabbed from the coordinator and passed as a parameter to create HDFS directories and Hive partitions.

<coordinator-app name="coordinator-name" frequency="0 20 * * MON-FRI " start="2015-10-24T15:00Z" end="2099-09-19T18:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.2">

<action>

<workflow>
  <app-path>${workflowxml}</app-path>
  <configuration>
    <property>
      <name>currentDate</name>
      <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), "yyyyMMdd")}</value>
    </property>
  </configuration>
</workflow>

</action>

</coordinator-app>

 

workflow.xml

<workflow-app xmlns = "uri:oozie:workflow:0.4" name = "">
 
 <credentials>
 <credential name="hiveCreds" type="hive2">
 <property>
 <name>hive2.jdbc.url</name>
 <value>jdbc:hive2://<>:10000/default</value>
 </property>
 <property>
 <name>hive2.server.principal</name>
 <value>hive/<>@.....COM</value>
 </property>
 </credential>
 </credentials>

<start to="sshAction"/>



 <action name="sshAction">
 <ssh xmlns="uri:oozie:ssh-action:0.1">
 <host>${focusNode}</host>
 <command>${sh1_script}</command>
 <args>${currentDate}</args>

<capture-output/>
 </ssh>
 <ok to="hivejob"/>
 <error to="sendFailEmail"/>
 </action>
 
 <action name="hivejob" cred="hiveCreds">
 
 <hive2 xmlns="uri:oozie:hive2-action:0.1">
 <job-tracker>${jobTracker}</job-tracker>
 <name-node>${nameNode}</name-node>
 
 <prepare>
 <!-- <mkdir path="hdfs://<>:8020/user/.../${wf:actionData('sshAction')['valu']}"/>-->
 
 </prepare>
 
 <jdbc-url>jdbc:hive2://<>:10000/default</jdbc-url>
 <script>test.sql</script>

<param>partitiondir=${currentDate}</param>
 </hive2>
 
 <ok to="hiveInsertOrc"/>
 <error to="sendFailEmail"/>
 </action>
 
 <action name="hiveInsertOrc" cred="hiveCreds">
 
 <hive2 xmlns="uri:oozie:hive2-action:0.1">
 <job-tracker>${jobTracker}</job-tracker>
 <name-node>${nameNode}</name-node>
 

 
 <jdbc-url>jdbc:hive2://<>:10000/default</jdbc-url>
 <script>insertOrc.sql</script>

<param>partitiondir=${currentDate}</param>
 </hive2>
 
 <ok to="sendSuccessEmail"/>
 <error to="sendFailEmail"/>
 </action>


<action name="sendSuccessEmail"> 
 <email xmlns="uri:oozie:email-action:0.1"> 
 <to>${emailToAddress}</to> 
 <subject>SUCCESS</subject> 
 <body>Loaded data successfully for the date ${currentDate} 
| Ref Workflow Id: ${wf:id()} </body> 
 </email> 
 <ok to="end"/> 
 <error to="killAction"/> 
</action>

<action name="sendFailEmail"> 
 <email xmlns="uri:oozie:email-action:0.1"> 
 <to>${emailFailToAddress}</to> 
 <subject>FAILED</subject> 
 <body>Load Failed
Date : ${currentDate} 
Ref Workflow Id: ${wf:id()} </body> 
 </email> 
 <ok to="killAction"/> 
 <error to="killAction"/> 
</action>
 
 <kill name="killAction"> 
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill> 
 <end name="end"/> 
</workflow-app>

 

coordinator.properties

nameNode = hdfs://<>:8020
 jobTracker = <>:8050
 focusNode = user@hostname.com
 oozie.use.system.libpath = true
 baseDir = hdfs://<>:8020/user/<>
 oozie.coord.application.path=${baseDir}
 oozie.wf.rerun.failnodes=true
 userName = <>@domain.com
 sh1_script =<machine local>/shell1_script.sh
 oozie.libpath=<>
 workflowxml=${baseDir}/workflow.xml
 emailToAddress = <>
 emailFailToAddress = <>

shell script

#!/bin/bash
today=$1

#today='test'
hdfs dfs -mkdir /........./$today              # create today's directory in HDFS
hdfs dfs -put /....../*.txt /....../$today/    # load the received feed files into it
mv /..../*.txt /........./BACKUP/              # back up the local copies of the feed
touch $today                                   # local marker file for the day's run

test.sql

ALTER TABLE <tablename> ADD PARTITION (dt='${partitiondir}') location '/....../${partitiondir}';

insertOrc.sql

INSERT INTO TABLE <table>_orc
 SELECT
 constant ,
 from_unixtime(unix_timestamp(batch_date, 'yyyy-MM-dd'), 'yyyy-MM-dd HH:mm:ss'),

..............

dt
 FROM <tablename>
 where dt = '${partitiondir}'
 ;

 

Commands

export JAVA_HOME=/usr/......./jdk1.8.0_112
export OOZIE_URL=https://<>:11443/oozie

hdfs dfs -put /...../orc-workflow /user/<hdfsdir>/orc-workflow

oozie job -run -config /...orc-workflow/coordinator.properties

 

Congrats, you have automated the data load process into HDFS, Hive and Hive ORC tables!

Feel free to comment in case of any questions.

Stay tuned, Have a good one !

 


Oozie: CRON style scheduling in Oozie

  • Cron scheduling adds a lot of flexibility while scheduling jobs using the Oozie coordinator.
  • It's a bit tricky, but once you are familiar with it, it pays off.
  • Here, just focus on the frequency attribute in your coordinator.xml:

<coordinator-app name="oozie-coordinator" frequency="0/10 1/2 * * *" start="2017-10-05T:00Z" end="2099-09-19T13:00Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">

frequency="01 19 * * *"

A cron expression has 5 fields (the two frequency values above are decoded after the list):

  • Minutes: Accepts values 0-59
  • Hours: 0-23
  • Day of the Month: 1-31
  • Month: JAN-DEC or 1-12
  • Day of the Week: SUN-SAT or 1-7
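Reading the two frequency values above against these fields: 0/10 1/2 * * * fires every 10 minutes (minutes 0, 10, 20, …) during every second hour starting at 01:00 (hours 1, 3, 5, …), every day; 01 19 * * * fires once a day at 19:01.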

 

stay tuned…


Oozie: How to pass current date to Work Flow

Coordinator.xml

<coordinator-app name="oozie-coordinator" frequency="1440" start="2017-10-06T15:00Z" end="2099-09-19T13:00Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">
<action>
<workflow>
<app-path>${workflowxml}</app-path>

<configuration>
<property>
<name>currentDate</name>
<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0, 'DAY'), "yyyyMMdd")}</value>
</property>
</configuration>

</workflow>
</action>
</coordinator-app>

** Change the "0" offset to -1 if you prefer the PREVIOUS date, as shown below.
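That is, the value element would read:

${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyyMMdd")}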

Access currentDate in your workflow.xml and pass it to your shell / ssh action

<ssh xmlns="uri:oozie:ssh-action:0.1">

<host>${focusNode}</host>

<command>${sh1_script}</command>

<args>${currentDate}</args>

<capture-output/>
</ssh>

 

Use it in your shell script

#!/bin/bash

touch $1   # creates a file named after the date that was passed in


Tips: Hive

Mask a Column

Create a table and insert values into it

CREATE TABLE IF NOT EXISTS employee_test1 ( eid String, name String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

INSERT INTO TABLE employee_test1 VALUES ('1', 'abc1'), ('3', 'bcd1');

INSERT INTO TABLE employee_test1 VALUES ('2', 'abc2'), ('4', 'bcd1');

Create a View

CREATE VIEW employee_test1_vw
AS
SELECT mask_hash(eid) AS eid, name
FROM employee_test1;
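Querying the view now returns a hash in place of the raw eid (mask_hash is one of Hive's masking UDFs, available in recent Hive releases):

SELECT eid, name FROM employee_test1_vw;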

 

Create Surrogate Key / Unique rows

CREATE TABLE test(
r_id string,
seq_nbr int,
dt string)
STORED AS ORC TBLPROPERTIES (
………………..
);

Initial Load

INSERT INTO TABLE test
SELECT T.r_id, ROW_NUMBER () OVER(ORDER BY T.r_id) AS seq_nbr
FROM test1 T limit 10;

Subsequent Loads

Tip: Get the max of seq_nbr, add it to the row number

SELECT max(seq_nbr) FROM test;

INSERT INTO TABLE test
SELECT T.r_id, ROW_NUMBER () OVER(ORDER BY T.r_id)+<max of seq_nbr> AS seq_nbr
FROM test1 T ;
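For example, if the current max(seq_nbr) is 10, the next load assigns 11, 12, 13, … to the new rows, so seq_nbr stays unique across loads.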
