Developer’s template: Spark

The Developer's Template series is intended to make life easier for Big Data developers by taking away the headache of starting application development from scratch. The following program helps you develop and execute a word-count application using Apache Spark with Java.

Prerequisites

  • Hadoop cluster
  • Eclipse
  • Maven
  • Java

 

Note: This program has been tested with Hortonworks HDP 2.3.2, Spark 1.4.1, and Java 1.7.0_79.

Spark-Java code

package com.abc.sparkTest;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;

public class WordCount {

    // Splits each input line into individual words.
    private static final FlatMapFunction<String, String> WORDS_EXTRACTOR =
            new FlatMapFunction<String, String>() {
                public Iterable<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" "));
                }
            };

    // Maps each word to a (word, 1) pair.
    private static final PairFunction<String, String, Integer> WORDS_MAPPER =
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            };

    // Sums the counts for each word.
    private static final Function2<Integer, Integer, Integer> WORDS_REDUCER =
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer a, Integer b) throws Exception {
                    return a + b;
                }
            };

    public static void main(String[] args) {

        // Use the input/output paths passed on the command line if present,
        // otherwise fall back to the sample HDFS locations.
        String inpath = args.length > 0 ? args[0] : "hdfs://<hdfs>:8020/tmp/extTable/test.csv";
        String outpath = args.length > 1 ? args[1] : "hdfs://<hdfs>:8020/tmp/extTable/out/out1";

        SparkConf conf = new SparkConf().setAppName("sparkAction");
        //.setMaster("yarn-cluster");
        JavaSparkContext context = new JavaSparkContext(conf);

        JavaRDD<String> file = context.textFile(inpath);
        JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR);
        JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER);
        JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER);
        counter.saveAsTextFile(outpath);

        context.stop();
    }
}
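
If you want to sanity-check the job on your workstation before submitting it to YARN, one option is to run the same pipeline against a local master and local files. The snippet below is only a sketch of an alternative main() body; the local file paths are hypothetical placeholders and are not part of the original template.

// Local-mode variant (sketch): reuses the WORDS_* functions defined above.
SparkConf localConf = new SparkConf()
        .setAppName("sparkActionLocal")
        .setMaster("local[*]");              // run on all local cores instead of YARN
JavaSparkContext localContext = new JavaSparkContext(localConf);
localContext.textFile("file:///tmp/test.csv")            // hypothetical local input
        .flatMap(WORDS_EXTRACTOR)
        .mapToPair(WORDS_MAPPER)
        .reduceByKey(WORDS_REDUCER)
        .saveAsTextFile("file:///tmp/wordcount-out");     // hypothetical local output
localContext.stop();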

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.abc</groupId>
  <artifactId>sparkTest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>sparkTest</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <log4j.version>2.0.2</log4j.version>
  </properties>

  <dependencies>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.4.1</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>jdk.tools</groupId>
          <artifactId>jdk.tools</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <configuration>
          <!-- Keep dependency signature files out of the uber jar. -->
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
          <!-- Additional configuration. -->
          <artifactSet>
            <excludes>
              <exclude>org.apache.hadoop:*</exclude>
            </excludes>
          </artifactSet>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <manifestEntries>
                    <Main-Class>com.abc.sparkTest.WordCount</Main-Class>
                    <Build-Number>123</Build-Number>
                  </manifestEntries>
                </transformer>
              </transformers>
              <finalName>uber-${artifactId}-${version}</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>

Generate Jar

mvn clean package
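
With the finalName configured in the shade plugin above, the shaded (uber) jar should land under target/ as uber-sparkTest-0.0.1-SNAPSHOT.jar, alongside the plain sparkTest-0.0.1-SNAPSHOT.jar. Copy the jar you intend to run to HDFS before submitting, for example (the HDFS path here is just the placeholder used in the spark-submit command below):

hdfs dfs -put target/sparkTest-0.0.1-SNAPSHOT.jar hdfs://<hdfs>/sparkSamples/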

Execute

spark-submit --class com.abc.sparkTest.WordCount --master yarn-cluster \
--num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 \
--conf spark.yarn.jar=hdfs://<hdfs>/sparkSamples/spark-assembly-1.2.0.2.2.0.0-82-hadoop2.6.0.2.2.0.0-2041.jar \
--queue <queuename> \
hdfs://<hdfs>/sparkSamples/sparkTest-0.0.1-SNAPSHOT.jar <input dir hdfs> \
<out dir hdfs>
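
Once the application finishes, the word counts are written as part files under the output directory. You can verify them with the HDFS CLI, for example:

hdfs dfs -ls <out dir hdfs>
hdfs dfs -cat <out dir hdfs>/part-*

Note that saveAsTextFile fails if the output directory already exists, so remove it (hdfs dfs -rm -r <out dir hdfs>) before re-running the job.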

 

Stay tuned...
