Can't access the data in Kafka Spark Streaming globally

2016-12-22 Thread Sree Eedupuganti
I am trying to stream the data from Kafka to Spark.

JavaPairInputDStream<String, String> directKafkaStream =
    KafkaUtils.createDirectStream(ssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams, topics);

Here I am iterating over the JavaPairInputDStream to process the RDDs.

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreachPartition(items -> {
        while (items.hasNext()) {
            String[] state = items.next()._2.split(",");
            System.out.println(state[2] + "," + state[3] + "," + state[4] + "--");
        }
    });
});


Inside this loop I can access the String array, but when I try to access the
array's data globally (outside the stream processing) I can't. My requirement
is that, if I could access this data globally, I could look it up against
another table I have in Hive and perform an operation on it. Any suggestions
please? Thanks.
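The split step itself can be checked off the cluster; the sketch below mirrors it in plain Java (the sample record is invented). The "global" visibility problem is a separate matter: the lambda given to foreachPartition runs on the executors, so any driver-side variable it updates is only a serialized copy. The usual alternatives are collecting the parsed records back to the driver, or expressing the Hive lookup as a join inside Spark itself.

```java
public class StateParse {
    // Mirrors the per-record logic in the foreachPartition loop:
    // split the message value on commas and pick fields 2..4.
    static String pick(String value) {
        String[] state = value.split(",");
        return state[2] + "," + state[3] + "," + state[4] + "--";
    }

    public static void main(String[] args) {
        // Invented sample record; real Kafka values will differ.
        System.out.println(pick("k0,k1,NY,Albany,12207"));  // NY,Albany,12207--
    }
}
```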

-- 
Best Regards,
Sreeharsha Eedupuganti


How to perform Join operation using JAVARDD

2016-12-17 Thread Sree Eedupuganti
I tried like this,

CrashData_1.csv:

CRASH_KEY    CRASH_NUMBER   CRASH_DATE   CRASH_MONTH
2016899114   2016899114     01/02/2016   12:00:00 AM +

CrashData_2.csv:

CITY_NAME   ZIPCODE   CITY          STATE
1945        704       PARC PARQUE   PR


Code:

JavaRDD<String> firstRDD =
    sc.textFile("/Users/apple/Desktop/CrashData_1.csv");

JavaRDD<String> secondRDD =
    sc.textFile("/Users/apple/Desktop/CrashData_2.csv");

JavaRDD<String> allRDD = firstRDD.union(secondRDD);


Output I am getting:

[CRASH_KEY,CRASH_NUMBER,CRASH_DATE,CRASH_MONTH, 2016899114,2016899114,01/02/2016 12:00:00 AM +
CITY_NAME,ZIPCODE,CITY,STATE, 1945,704,PARC PARQUE,PR]




Any suggestions please. Thanks in advance.
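Note that union only concatenates the two RDDs, which is why the output above is just both files' lines in one list. A join needs each side keyed on a common column first (mapToPair in the Java API), and it only makes sense if the two files actually share a key column, which these samples do not appear to. The keying step itself is plain string work; a sketch keyed on the first column (the column choice is an assumption) looks like this:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class KeyLine {
    // Equivalent of the function passed to JavaRDD.mapToPair:
    // key each CSV line on its first column, keep the rest as the value.
    static Map.Entry<String, String> keyOnFirstColumn(String line) {
        int comma = line.indexOf(',');
        return new SimpleEntry<>(line.substring(0, comma), line.substring(comma + 1));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> pair =
            keyOnFirstColumn("2016899114,2016899114,01/02/2016");
        System.out.println(pair.getKey() + " -> " + pair.getValue());
    }
}
```

With both files keyed this way, firstPairs.join(secondPairs) would produce (key, (left, right)) tuples.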


How to read a Multi Line json object via Spark

2016-11-14 Thread Sree Eedupuganti
I tried from the Spark shell and I am getting the following error:

Here is the test.json file:

{
    "colorsArray": [{
        "red": "#f00",
        "green": "#0f0",
        "blue": "#00f",
        "cyan": "#0ff",
        "magenta": "#f0f",
        "yellow": "#ff0",
        "black": "#000"
    }]
}


scala> val jtex =
sqlContext.read.format("json").option("samplingRatio","1.0").load("/user/spark/test.json")

   jtex: org.apache.spark.sql.DataFrame = [_corrupt_record: string]
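The _corrupt_record column is the expected outcome here: Spark's JSON source (in the 1.x line) requires each input line to be a complete JSON document, and the file above spreads one object over many lines. The mismatch is visible with a plain brace-balance check (plain Java, file contents inlined):

```java
public class LineCheck {
    // A complete JSON document has balanced braces; the first line of
    // the multi-line file above ("{") does not, so a line-at-a-time
    // reader records it as corrupt.
    static boolean bracesBalanced(String line) {
        int depth = 0;
        for (char c : line.toCharArray()) {
            if (c == '{') depth++;
            else if (c == '}') depth--;
        }
        return depth == 0;
    }

    public static void main(String[] args) {
        System.out.println(bracesBalanced("{"));                   // false
        System.out.println(bracesBalanced("{\"red\": \"#f00\"}")); // true
    }
}
```

Common workarounds are to rewrite the file as one JSON object per line, or to read it whole with sc.wholeTextFiles and parse it yourself; from Spark 2.2 onward, .option("multiLine", true) on the JSON reader handles this directly.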


Any suggestions please. Thanks.
-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Convert hive sql to spark sql

2016-10-09 Thread Sree Eedupuganti
Hi users, I need to compare the performance of a query in Hive and in Spark.
Can anyone convert this SQL to Spark SQL? Here is the SQL.


SELECT split(DTD.TRAN_RMKS,'/')[0] AS TRAB_RMK1,
split(DTD.TRAN_RMKS,'/')[1] AS ATM_ID,
DTD.ACID,
G.FORACID,
DTD.REF_NUM,
DTD.TRAN_ID,
DTD.TRAN_DATE,
DTD.VALUE_DATE,
DTD.TRAN_PARTICULAR,
DTD.TRAN_RMKS,
DTD.TRAN_AMT,
SYSDATE_ORA(),
DTD.PSTD_DATE,
DTD.PSTD_FLG,
G.CUSTID,
NULL AS PROC_FLG,
DTD.PSTD_USER_ID,
DTD.ENTRY_USER_ID,
G.schemecode as SCODE
FROM DAILY_TRAN_DETAIL_TABLE2 DTD
JOIN ods_gam G
ON DTD.ACID = G.ACID
where substr(DTD.TRAN_PARTICULAR,1,3) rlike '(PUR|POS).*'
AND DTD.PART_TRAN_TYPE = 'D'
AND DTD.DEL_FLG <> 'Y'
AND DTD.PSTD_FLG = 'Y'
AND G.schemecode IN
('SBPRV','SBPRS','WSSTF','BGFRN','NREPV','NROPV','BSNRE','BSNRO')
AND  (SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,6) IN
('405997','406228','406229','415527','415528','417917','417918','418210','421539','421572','432198','435736','450502','450503','450504','468805','469190','469191','469192','474856','478286','478287','486292','490222','490223','490254','512932','512932','514833','522346','522352','524458','526106','526701','527114','527479','529608','529615','529616','532731','532734','533102','534680','536132','536610','536621','539149','539158','549751','557654','607118','607407','607445','607529','652189','652190','652157')
OR   SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,8)  IN
('53270200','53270201','53270202','60757401','60757402') )
limit 50;
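Most of this query should run unchanged through HiveContext.sql in Spark, since Spark SQL supports split, substr, rlike, and joins; the one thing that will not carry over automatically is SYSDATE_ORA(), which looks like a custom UDF and would need to be registered in Spark as well. As a sanity check, the rlike condition on the first three characters behaves like a Java regex (sample values invented):

```java
import java.util.regex.Pattern;

public class RmkFilter {
    // Mirrors: substr(DTD.TRAN_PARTICULAR, 1, 3) rlike '(PUR|POS).*'
    static boolean keep(String tranParticular) {
        String prefix = tranParticular.substring(0, Math.min(3, tranParticular.length()));
        return Pattern.matches("(PUR|POS).*", prefix);
    }

    public static void main(String[] args) {
        System.out.println(keep("PURCHASE/ATM123")); // true
        System.out.println(keep("DEP/BRANCH"));      // false
    }
}
```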
-- 
Best Regards,
Sreeharsha Eedupuganti


How to convert List into json object / json Array

2016-08-30 Thread Sree Eedupuganti
Here is the snippet of my code :

Dataset<Row> rows_salaries =
    spark.read().json("/Users/Macbook/Downloads/rows_salaries.json");
rows_salaries.createOrReplaceTempView("salaries");
List<Row> df = spark.sql("select * from salaries").collectAsList();

I need to read the JSON data from 'List<Row> df = spark.sql("select * from
salaries").collectAsList();', but I am unable to convert 'df' to either a
JSONObject or a JSONArray. Is the way I am going about it right, or is there
another way to fetch the JSON data? Any suggestions please...
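One route that avoids converting Row objects by hand (an approach, not the only one): Dataset.toJSON() yields one JSON string per row, so collecting those and wrapping them in brackets produces a JSON array without any external JSON library. The wrapping step is plain Java (row strings below are invented):

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class ToJsonArray {
    // Joins per-row JSON strings (e.g. from Dataset.toJSON().collectAsList())
    // into a single JSON array literal.
    static String asJsonArray(List<String> rows) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String row : rows) joiner.add(row);
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("{\"name\":\"a\"}", "{\"name\":\"b\"}");
        System.out.println(asJsonArray(rows));  // [{"name":"a"},{"name":"b"}]
    }
}
```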

Thanks...




How to convert List into json object / json Array

2016-08-30 Thread Sree Eedupuganti
Any suggestions please.


Converting Dataframe to resultSet in Spark Java

2016-08-18 Thread Sree Eedupuganti
I retrieved the data into a DataFrame but I can't convert it into a ResultSet.
Is there any possible way to convert it? Any suggestions please...

Exception in thread "main" java.lang.ClassCastException:
org.apache.spark.sql.DataFrame cannot be cast to
com.datastax.driver.core.ResultSet
-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-14 Thread Sree Eedupuganti
Hi Spark users, I am new to Spark. I am trying to connect to Hive using a Java
SparkContext, but I am unable to connect to the database: by executing the code
below I can see only the "default" database. Can anyone help me out? What I
need is a sample program for querying Hive from a Java SparkContext. I need
to pass values like this:

userDF.registerTempTable("userRecordsTemp")

sqlContext.sql("SET hive.default.fileformat=Orc  ")
sqlContext.sql("set hive.enforce.bucketing = true; ")
sqlContext.sql("set hive.enforce.sorting = true; ")

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local");
    SparkContext ctx = new SparkContext(sparkConf);
    HiveContext hiveql = new org.apache.spark.sql.hive.HiveContext(ctx);
    DataFrame df = hiveql.sql("show databases");
    df.show();
}

Any suggestions please. Thanks.


Content-based Recommendation Engine

2016-05-05 Thread Sree Eedupuganti
Can anyone share the code for Content-based Recommendation Engine to
recommend the user based on E-mail subject.

-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Spark Select Statement

2016-05-04 Thread Sree Eedupuganti
Hello Spark users, can we run a SQL SELECT statement in Spark using Java?
If it is possible, any suggestions please. I tried the code below. How do I
pass the database name?
Here my database name is nimbus and the table name is winbox_opens.

Source Code:

public class Select {
    public static class SquareKey implements Function<Row, Integer> {
        public Integer call(Row row) throws Exception {
            return row.getInt(0) * row.getInt(0);
        }
    }

    public static void main(String[] args) throws Exception {
        SparkConf s = new SparkConf().setMaster("local[2]").setAppName("Select");
        SparkContext sc = new SparkContext(s);
        HiveContext hc = new HiveContext(sc);
        DataFrame rdd = hc.sql("SELECT * FROM winbox_opens");
        JavaRDD<Integer> squaredKeys = rdd.toJavaRDD().map(new SquareKey());
        List<Integer> result = squaredKeys.collect();
        for (Integer elem : result) {
            System.out.println(elem);
        }
    }
}

Error: Exception in thread "main" org.apache.spark.sql.AnalysisException:
no such table winbox_prod_action_logs_1; line 1 pos 14

-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Need a sample code to load XML files into cassandra database using spark streaming

2016-01-26 Thread Sree Eedupuganti
Hello everyone, I am new to Spark Streaming and need a sample code to load XML
files from an AWS S3 server into a Cassandra database. Any suggestions please.
Thanks in advance.

-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


How to migrate spark code to spark streaming ?

2016-01-26 Thread Sree Eedupuganti
Hello everyone,
I am loading XML files from S3 into a database [i.e. Cassandra]. Right now my
code is in Spark Core. I want to migrate my code to Spark Streaming because we
have to load XML files into the database every 15 minutes. So in this case I
need to migrate my code to Spark Streaming. Any suggestions please. Thanks
in advance.
-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


How to send a file to database using spark streaming

2016-01-22 Thread Sree Eedupuganti
I am new to Spark Streaming. My question is: I want to load XML files into a
database [Cassandra] using Spark Streaming. Any suggestions please. Thanks in
advance.

-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Getting an error while submitting spark jar

2016-01-10 Thread Sree Eedupuganti
This is how I submit the jar:

hadoop@localhost:/usr/local/hadoop/spark$ ./bin/spark-submit \
>   --class mllib.perf.TesRunner \
>   --master spark://localhost:7077 \
>   --executor-memory 2G \
>   --total-executor-cores 100 \
>   /usr/local/hadoop/spark/lib/mllib-perf-tests-assembly.jar \
>   1000

And here is my error:

Spark assembly has been built with Hive, including
Datanucleus jars on classpath
java.lang.ClassNotFoundException: mllib.perf.TesRunner
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:538)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
hadoop@localhost:/usr/local/hadoop/spark$

Thanks in Advance

-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited


Getting an error in insertion to mysql through sparkcontext in java..

2015-12-19 Thread Sree Eedupuganti
I had 9 rows in my MySQL table.


options.put("dbtable", "(select * from employee");
options.put("lowerBound", "1");
options.put("upperBound", "8");
options.put("numPartitions", "2");

Error: Parameter index out of range (1 > number of parameters, which is 0)
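A likely culprit (an assumption, not verified against your setup): the dbtable value has an unclosed parenthesis, and Spark's JDBC source requires a subquery to be closed and aliased, since it embeds the value as SELECT * FROM (subquery) alias. Also, lowerBound/upperBound/numPartitions only take effect together with a partitionColumn. A corrected options map (the alias emp and column id are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class JdbcOptions {
    public static Map<String, String> build() {
        Map<String, String> options = new HashMap<>();
        // The subquery must be closed and given an alias, because Spark
        // wraps dbtable as: SELECT * FROM (subquery) alias
        options.put("dbtable", "(select * from employee) as emp");
        options.put("partitionColumn", "id");  // assumed numeric column
        options.put("lowerBound", "1");
        options.put("upperBound", "8");
        options.put("numPartitions", "2");
        return options;
    }

    public static void main(String[] args) {
        System.out.println(build().get("dbtable"));
    }
}
```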

-- 
Best Regards,
Sreeharsha Eedupuganti
Data Engineer
innData Analytics Private Limited