Hive JDBC Spring Boot Restful Webservice in Pivotal Cloud Foundry


In this post, let's build a Hive JDBC program, expose it as a RESTful web service using Spring Boot, and host it in Pivotal Cloud Foundry.

Under src/main/resources


Include the following files too

hive-jaas.conf (required)

Note: /home/vcap/app/BOOT-INF/classes/ is the home directory in PCF


[libdefaults]  renew_lifetime = 7d  forwardable = true  default_realm = <RealmName>  ticket_lifetime = 24h    dns_lookup_realm = false    dns_lookup_kdc = false    default_ccache_name = /tmp/krb5cc_%{uid}


[logging]  default = FILE:/var/log/krb5kdc.log  admin_server = FILE:/var/log/kadmind.log  kdc = FILE:/var/log/krb5kdc.log
[realms]  <>= {    admin_server =


Imp: Include the keytab file

under src/main/java

Note: The package structure should be consistent across all the classes (e.g., com.example.app); otherwise your Spring Boot application won't be able to scan and detect the web URIs / controller classes.


import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

public class Application {
public static void main(String[] args) {

Application.class, args);


Create necessary BEAN classes

For eg:-

/**
 * Simple data-transfer bean returned by the REST API; serialized to JSON by Spring.
 */
public class TestBean {

    private String testId;
    private String testName;

    /** No-arg constructor required for JSON (de)serialization. */
    public TestBean() {
    }

    public TestBean(String testId, String testName) {
        this.testId = testId;
        this.testName = testName;
    }

    public String getTestId() {
        return testId;
    }

    public void setTestId(String testId) {
        this.testId = testId;
    }

    public String getTestName() {
        return testName;
    }

    public void setTestName(String testName) {
        this.testName = testName;
    }
}



import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

public class HiveConfig {

private Environment env;

public Statement hiveStatement() throws Exception {
Connection con = DriverManager.getConnection(env.getProperty(“db.url”));
return con.createStatement();


Controller Layer

import java.util.List;

import org.slf4j.Logger;import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestMethod;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;





public class SampleController {
private static final Logger logger = LoggerFactory.getLogger(SampleController.class);
private SampleService service;

public SampleController(SampleService service) {        this.service = service;    }

@RequestMapping(method = RequestMethod.GET)

public List<TestBean> getDetails(@RequestParam(“id”) String id) {

return service.getDetails(id);    }

Service Layer


import java.util.List;
import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;

public class SampleService {
private static final Logger logger = LoggerFactory.getLogger(SampleService.class);
private SampleRepository repository;

public SampleService(SampleRepository repository)

{        this.repository = repository;    }

public List<TestBean> getDetails(String aId) {

return repository.getConcurTxOnDefAprvr(aId);



import java.sql.ResultSet;import java.sql.ResultSetMetaData;

import java.sql.SQLException;import java.sql.Statement;

import java.util.ArrayList;

import java.util.List;
//import org.hibernate.JDBCException;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Repository;


public class SampleRepository {

private Statement statement;

public List<TestBean> getDetails(String id) {

List<TestBean> results = new ArrayList<TestBean>();
String sql = “select * from <tablename> where employee_id=” + id;

try {

ResultSet r = statement.executeQuery(sql);

while(             {

//create a bean obj for each row by using the bean constructor

TestBean=new TestBean(r.getString(1), r.getString(2), r.getString(3),r.getString(4), r.getString(5), r.getDouble(6));

results.add(d);                              }


catch (SQLException ex) { //throw new JDBCException(“hive failed”, ex); }
return results;

Rest API by default return a JSON response, use the below function if you want to return a List instead of Json


private String displayRow(ResultSet rs) throws SQLException {private String displayRow(ResultSet rs) throws SQLException { ResultSetMetaData metaData = rs.getMetaData(); int columns = metaData.getColumnCount(); StringBuffer record = new StringBuffer(); for (int i = 1; i <= columns; i++) { record.append(” ” + rs.getString(i) + ” “); } return record.toString(); }


Note: It needs lot of work to get pom.xml working as you will experience lot of conflicts in terms of log4j dependencies

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">



<version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging>

<description>Demo project for Spring Boot</description>




<!– <version>2.0.2.RELEASE</version>–>

<relativePath/> <!– lookup parent from repository –> </parent>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties>


<!– Spring Boot –>

<dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter</artifactId>        </dependency>

<dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>
<dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions>

</dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jetty</artifactId> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>

<!– Hive Dependencies –>


<groupId>org.apache.hive</groupId>            <artifactId>hive-jdbc</artifactId>            <version>2.1.0</version>

<exclusions> <exclusion> <groupId>org.eclipse.jetty.aggregate</groupId> <artifactId>*</artifactId> </exclusion>

<exclusion>                    <groupId>org.apache.logging.log4j</groupId>                    <artifactId>log4j-slf4j-impl</artifactId>                </exclusion>

<exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion>

<exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> <exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion>            </exclusions>        </dependency>

<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>


<repositories> <repository> <snapshots /> <id>snapshots</id> <name>libs-snapshot-local</name> <url>http://&lt;artifactory-host&gt;/artifactory/libs-snapshot/</url> </repository> </repositories>


<pluginRepositories> <pluginRepository> <snapshots /> <id>snapshots</id> <name>plugins-snapshot</name> <url>http://&lt;artifactory-host&gt;/artifactory/plugins-snapshot/</url> </pluginRepository> </pluginRepositories>




Build the application using

mvn clean package


manifest.yml file at the same level as that of pom.xml

– name: springboothive
instances: 1
memory: 2G
path: target/<jarname under target>.jar
buildpack: java_buildpack_offline

Login in to Pivotal Cloud Foundry:

In command prompt

cf login -a <>

cf target -o <ORG name> -s <SPACE name>

Push the application

From the application folder

cf push

Check the PCF UI

UI: https://<>/login

Get the route corresponding to the application

eg;- https://<>/api/xyz

Test after forming the actual URI


You will get the output of the query as a JSON response




Posted in hive, Uncategorized | Tagged , , , , , | Leave a comment

Spark: Transformations with Examples


List<String> strList = Arrays.asList("qqqqqqqqwwwww", "eeeeerrrrrrrrr", "ttttttttyyyyyyyyy");

static SparkConf conf = new SparkConf();
static JavaSparkContext jsc = new JavaSparkContext(conf);

JavaRDD<String> filterStrRDD = strRDD.filter(new Function<String, Boolean>() {
    @Override public Boolean call(String arg0) throws Exception { return arg0.contains("q"); }
});


JavaRDD<Integer> mapIntRDD = intRDD.map(new Function<Integer, Integer>() {
    @Override public Integer call(Integer arg0) throws Exception { return arg0 * arg0; }
});
System.out.println("________________________" + mapIntRDD.first() + "_______________________");


static void flatMapTest() {
    JavaRDD<String> flatmaprdd = jsc.parallelize(Arrays.asList("We illustrate the difference between in Figure 3-3", "of lists we have an RDD"));
    JavaRDD<String> flatmaprddres = flatmaprdd.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
    });
    for (String y : flatmaprddres.collect()) {
        System.out.println("OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO: " + y);
    }
}


Posted in Uncategorized | Tagged , , , , , | Leave a comment

Spark: Actions with Examples


  • Reduces the elements of this RDD using the specified commutative and associative binary operator.
  • reduce(<function type>) takes a Function Type ; which takes 2 elements of RDD Element Type as argument & returns the Element of same type


JavaRDD<Integer> jrdd1 = ctx.parallelize(Arrays.asList(1, 2, 3, 4));

Integer sumRDD = jrdd1.reduce(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

System.out.println("SUM:" + sumRDD);



  • Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral “zero value”.
  • fold() is similar to reduce except that it takes a ‘Zero value‘(Think of it as a kind of initial value) which will be used in the initial call on each Partition.
  • Both fold() and reduce() require that the return type of our result be the same type as that of the elements in the RDD we are operating over.

JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1, 2, 3, 4));
System.out.println("Num Partitions: " + rdd.partitions().size());
Integer result = rdd.fold(0,
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer x, Integer y) {
            return x + y;
        }
    });
System.out.println("FOLD Result:" + result);



  • aggregate() function frees us from the constraint of having the return be the same type as the RDD we are working on.
  • With aggregate(), like fold(), we supply an initial zero value of the type we want to return.

supply an initial zero value of the type we want to return.

AvgCount initial = new AvgCount(0, 0);

supply a function to combine the elements from our RDD with the accumulator

Function2<AvgCount, Integer, AvgCount> addAndCount =
    new Function2<AvgCount, Integer, AvgCount>() {
        public AvgCount call(AvgCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
        }
    };

Consider the RDD

JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5));

Above function creates an object AvgCount with the total of values in the RDD ie, 15 and number of elements ie, 5

supply a second function

to merge two accumulators, given that each node accumulates its own results locally

Function2<AvgCount, AvgCount, AvgCount> combine =
    new Function2<AvgCount, AvgCount, AvgCount>() {
        public AvgCount call(AvgCount a, AvgCount b) {
            a.total += b.total;
            a.num += b.num;
            return a;
        }
    };

Above function combines all the values across all the AvgCount objects

Finally pass them to aggregate function to calculate Average, Mean and so on later.


AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);


static void aggregateTest() { 
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5)); 
Function2<AvgCount, Integer, AvgCount> addAndCount =   new Function2<AvgCount, Integer, AvgCount>() {     public AvgCount call(AvgCount a, Integer x) { += x;       a.num += 1;       return a;   } }; 
Function2<AvgCount, AvgCount, AvgCount> combine =   new Function2<AvgCount, AvgCount, AvgCount>() {   public AvgCount call(AvgCount a, AvgCount b) { +=;     a.num += b.num;     return a;   } }; 
AvgCount initial = new AvgCount(0, 0); AvgCount result = rdd.aggregate(initial, addAndCount, combine); 
System.out.println("AGGREGATE: "+result.avg()+"**********************************************"); }

class AvgCount implements Serializable { public AvgCount(int total, int num) { = total;     this.num = num;   } int total; int num; double avg(){ return total / num; }



  • Faster way to use  the same RDD multiple times
  • Avoid computing an RDD multiple times
  • During persist, the nodes that compute the RDD store their partitions.
  • If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed.
  • Possible to replicate data on multiple nodes
  •  unpersist() to remove RDDs from the cache
  • In case of trying to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy

Level Space used CPU time In memory On disk Comments
MEMORY_AND_DISK High Medium Some Some Spills to disk if there is too much data to fit in memory.
MEMORY_AND_DISK_SER Low High Some Some Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory.




Posted in spark, Uncategorized | Tagged , , , , | Leave a comment

Spark: Resolving Errors


Java heap space

Fix: Increase the driver memory using –driver-memory

Posted in spark, Uncategorized | Tagged , | Leave a comment

Use Case: Automate data flow into HDFS / Hive using Oozie

I am planning to publish few use cases relating to Big data that will be helpful to any industry.


Automate data load process into HDFS, Hive and Hive ORC tables !

Suppose you are receiving daily feeds from any source (database, log files or anything) and need to make it available in HDFS and in turn in Hive for analytics. Here is a simple solution using Oozie. Here, lets schedule the process to run on every Business Day at a specific time. Following features are included:-

  • SFTP data
  • Receiving the feeds
  • Loading into the staging table
  • Backing up the loaded data
  • Cleaning the daily feed location
  • Creating partitions
  • Incremental imports into the ORC.
  • Email Notification

You will get success or failure notifications in your inbox after the daily run.

How to SFTP file into the machine attached to the Hadoop Cluster

sftp user@hostname


Switch directories in local machine
 lcd testdir

Switch directories in remote machine
 cd testdestdir

copy files: put filename
 download files: get filename

Schedule using Oozie SSH Action


This lets you schedule the load process Monday through Friday at the time you specify.

Runs at 20.00 GMT

“currentDate” is grabbed from the coordinator and passed as a parameter to create HDFS directories and Hive partitions.

<coordinator-app name="coordinator-name" frequency="0 20 * * MON-FRI " start="2015-10-24T15:00Z" end="2099-09-19T18:00Z" timezone="UTC" xmlns="uri:oozie:coordinator:0.2">




<name>currentDate</name>                          <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0,                           'DAY'), "yyyyMMdd")}








<workflow-app xmlns = "uri:oozie:workflow:0.4" name = "">
 <credential name="hiveCreds" type="hive2">

<start to="sshAction"/>

 <action name="sshAction">
 <ssh xmlns="uri:oozie:ssh-action:0.1">

 <ok to="hivejob"/>
 <error to="sendFailEmail"/>
 <action name="hivejob" cred="hiveCreds">
 <hive2 xmlns="uri:oozie:hive2-action:0.1">
 <!-- <mkdir path="hdfs://<>:8020/user/.../${wf:actionData('sshAction')['valu']}"/>-->

 <ok to="hiveInsertOrc"/>
 <error to="sendFailEmail"/>
 <action name="hiveInsertOrc" cred="hiveCreds">
 <hive2 xmlns="uri:oozie:hive2-action:0.1">


 <ok to="sendSuccessEmail"/>
 <error to="sendFailEmail"/>

<action name="sendSuccessEmail"> 
 <email xmlns="uri:oozie:email-action:0.1"> 
 <body>Loaded data successfully for the date ${currentDate} 
| Ref Workflow Id: ${wf:id()} </body> 
 <ok to="end"/> 
 <error to="killAction"/> 

<action name="sendFailEmail"> 
 <email xmlns="uri:oozie:email-action:0.1"> 
 <body>Load Failed
Date : ${currentDate} 
Ref Workflow Id: ${wf:id()} </body> 
 <ok to="killAction"/> 
 <error to="killAction"/> 
 <kill name="killAction"> 
 <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 <end name="end"/> 


coordinator.properties

nameNode = hdfs://<>:8020
 jobTracker = <>:8050
 focusNode =
 oozie.use.system.libpath = true
 baseDir = hdfs://<>:8020/user/<>
 userName = <>
 sh1_script =<machine local>/
 emailToAddress = <>
 emailFailToAddress = <>

shell script


hdfs dfs -mkdir /........./$today
hdfs dfs -put /....../*.txt /....../$today/
mv /..../*.txt /........./BACKUP/
touch $today


ALTER TABLE <tablename> ADD PARTITION (dt='${partitiondir}') location '/....../${partitiondir}';


 constant ,
 from_unixtime(unix_timestamp(batch_date, 'yyyy-MM-dd'), 'yyyy-MM-dd HH:mm:ss'),


 FROM <tablename>
 where dt = '${partitiondir}'



export JAVA_HOME=/usr/......./jdk1.8.0_112
export OOZIE_URL=https://<>:11443/oozie

hdfs dfs -put /...../orc-workflow /user/<hdfsdir>/orc-workflow

oozie job -run -config /...orc-workflow/


Congrats you have automated data load process into HDFS, Hive and Hive ORC tables !

Feel Free to comment in case of any questions..

Stay tuned, Have a good one !


Posted in Uncategorized, use case | Tagged , , , , , , , | Leave a comment

Oozie: CRON style scheduling in Oozie

  • Cron scheduling adds a lot of flexibility while scheduling jobs using the Oozie coordinator.
  • It's a bit tricky, but once you familiarize yourself with it, it's going to benefit you a lot.
  • Here, just focus on the frequency part in your coordinator.xml

<coordinator-app name="oozie-coordinator" frequency="0/10 1/2 * * *" start="2017-10-05T00:00Z" end="2099-09-19T13:00Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">

frequency="01 19 * * *"

Cron expression has 5 fields

  • Minutes: Accepts values 0-59
  • Hours: 0-23
  • Day of the Month: 1-31
  • Month: JAN-DEC or 1-12
  • Day of the Week: SUN-SAT or 1-7


stay tuned…

Posted in oozie, Uncategorized | Tagged , , | Leave a comment

Oozie: How to pass current date to Work Flow


<coordinator-app name="oozie-coordinator" frequency="1440" start="2017-10-06T15:00Z" end="2099-09-19T13:00Z" timezone="Canada/Eastern" xmlns="uri:oozie:coordinator:0.1">

<value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), 0,
'DAY'), "yyyyMMdd")}</value>


**Change “0” to -1 if you prefer PREVIOUS DATE

Access currentDate in your workflow.xml and pass it to your shell / ssh action

<ssh xmlns="uri:oozie:ssh-action:0.1">






Use it in your shell script


touch $1

Posted in oozie, Uncategorized | Tagged , , , , , | Leave a comment