Spark: Actions with Examples

REDUCE

  • Reduces the elements of this RDD using the specified commutative and associative binary operator.
  • reduce(func) takes a function that accepts two elements of the RDD's element type as arguments and returns a single element of the same type.

Example:-

JavaRDD<Integer> jrdd1 = ctx.parallelize(Arrays.asList(1, 2, 3, 4));

Integer sumRDD = jrdd1.reduce(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

System.out.println("SUM: " + sumRDD);
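The same reduce can be written more compactly with a Java 8 lambda; a minimal sketch, assuming the same JavaSparkContext ctx as above:

// Lambda form of the reduce above; Function2 has a single method, so a lambda fits.
JavaRDD<Integer> numbers = ctx.parallelize(Arrays.asList(1, 2, 3, 4));
Integer sum = numbers.reduce((v1, v2) -> v1 + v2);
System.out.println("SUM: " + sum); // 10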

 

FOLD

  • Aggregates the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value".
  • fold() is similar to reduce(), except that it takes a "zero value" (think of it as an initial value) which is used in the initial call on each partition.
  • Both fold() and reduce() require that the return type of our result be the same type as that of the elements in the RDD we are operating over.

JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2, 3, 4));
System.out.println("Num Partitions: " + rdd.partitions().size());

Integer result = rdd.fold(0, new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) {
        return x + y;
    }
});

System.out.println("FOLD Result: " + result);
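Because the zero value is applied once per partition and once more when the partition results are merged, it should normally be the identity for the operation (0 for addition). A small sketch, assuming the same ctx as above, that makes this visible by deliberately using a non-identity value:

// With zero value 1 and 2 partitions: each partition adds 1, and the final
// merge adds 1 again, so the sum 10 becomes 10 + 2 + 1 = 13.
JavaRDD<Integer> rdd2 = ctx.parallelize(Arrays.asList(1, 2, 3, 4), 2);
Integer folded = rdd2.fold(1, new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) {
        return x + y;
    }
});
System.out.println("FOLD with zero value 1: " + folded); // 13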

 

AGGREGATE

  • The aggregate() function frees us from the constraint of having the return type be the same as the type of the RDD we are working on.
  • With aggregate(), like fold(), we supply an initial zero value of the type we want to return.

First, supply an initial zero value of the type we want to return:

AvgCount initial = new AvgCount(0, 0);

Next, supply a function to combine the elements from our RDD with the accumulator:

Function2<AvgCount, Integer, AvgCount> addAndCount =
    new Function2<AvgCount, Integer, AvgCount>() {
        public AvgCount call(AvgCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
        }
    };

Consider the RDD

JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5));

The function above builds up an AvgCount object holding the total of the values in the RDD, i.e. 15, and the number of elements, i.e. 5.

Finally, supply a second function to merge two accumulators, given that each node accumulates its own results locally:

Function2<AvgCount, AvgCount, AvgCount> combine =
    new Function2<AvgCount, AvgCount, AvgCount>() {
        public AvgCount call(AvgCount a, AvgCount b) {
            a.total += b.total;
            a.num += b.num;
            return a;
        }
    };

The function above merges the totals and counts across the AvgCount accumulators.

Finally, pass both functions to aggregate(); the resulting AvgCount can then be used to calculate the average.

 

AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);

Code:-

static void aggregateTest() {
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    Function2<AvgCount, Integer, AvgCount> addAndCount =
        new Function2<AvgCount, Integer, AvgCount>() {
            public AvgCount call(AvgCount a, Integer x) {
                a.total += x;
                a.num += 1;
                return a;
            }
        };

    Function2<AvgCount, AvgCount, AvgCount> combine =
        new Function2<AvgCount, AvgCount, AvgCount>() {
            public AvgCount call(AvgCount a, AvgCount b) {
                a.total += b.total;
                a.num += b.num;
                return a;
            }
        };

    AvgCount initial = new AvgCount(0, 0);
    AvgCount result = rdd.aggregate(initial, addAndCount, combine);
    System.out.println("AGGREGATE: " + result.avg());
}

class AvgCount implements Serializable {
    int total;
    int num;

    public AvgCount(int total, int num) {
        this.total = total;
        this.num = num;
    }

    // Cast to double to avoid integer division.
    double avg() {
        return (double) total / num;
    }
}
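For comparison, the same aggregation can be written with Java 8 lambdas; a minimal sketch, assuming the jsc context and the AvgCount class defined above:

// Lambda version of aggregateTest(): the first function folds an element into the
// accumulator, the second merges the per-partition accumulators.
JavaRDD<Integer> values = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
AvgCount avg = values.aggregate(
    new AvgCount(0, 0),
    (a, x) -> { a.total += x; a.num += 1; return a; },
    (a, b) -> { a.total += b.total; a.num += b.num; return a; });
System.out.println("AGGREGATE: " + avg.avg()); // 15 / 5 = 3.0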

 

PERSIST

  • A faster way to use the same RDD multiple times
  • Avoids computing an RDD multiple times
  • When an RDD is persisted, the nodes that compute it store its partitions.
  • If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed.
  • It is possible to replicate the data on multiple nodes
  • Call unpersist() to remove RDDs from the cache (a short usage sketch follows the storage-level table below)
  • If we try to cache more data than fits in memory, Spark automatically evicts old partitions using a Least Recently Used (LRU) cache policy
rdd1.persist(StorageLevel.MEMORY_ONLY());

Level               | Space used | CPU time | In memory | On disk | Comments
MEMORY_ONLY         | High       | Low      | Y         | N       |
MEMORY_ONLY_SER     | Low        | High     | Y         | N       |
MEMORY_AND_DISK     | High       | Medium   | Some      | Some    | Spills to disk if there is too much data to fit in memory.
MEMORY_AND_DISK_SER | Low        | High     | Some      | Some    | Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory.
DISK_ONLY           | Low        | High     | N         | Y       |
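As a usage sketch (the RDD name here is hypothetical), persist an RDD once, reuse it across several actions, and release the cache when done:

// Persist with a serialized, spill-to-disk storage level, reuse, then unpersist.
JavaRDD<Integer> numbers = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5));
numbers.persist(StorageLevel.MEMORY_AND_DISK_SER());

long count = numbers.count();                   // first action computes and caches the partitions
Integer sum = numbers.reduce((a, b) -> a + b);  // later actions read from the cache

numbers.unpersist();                            // remove the cached partitions when no longer needed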

 

 
