How to disable pushdown predicate in spark 2.x query

2020-06-22 Thread Mohit Durgapal
Hi All,

I am trying to read a table from a relational database using spark 2.x.

I am using code like the following:

sparkSession.read().jdbc(url, table, connectionProperties)
    .select("SELECT_COLUMN")
    .where(whereClause);


Now, what's happening is that the SQL query which spark is actually
running against the relational db is:

select column,(where_clause_columns) from table WHERE SELECT_COLUMN IS NOT
NULL;

And I guess it is applying the where-clause filter only after fetching
all the rows from the DB where SELECT_COLUMN IS NOT NULL.

I searched about it and found out this is because of the pushdown predicate.
Is there a way to load data into the dataframe using a specific query
instead of this?

I found a suggestion that if we provide the actual query instead of the
table name in the following code, spark should run that query as-is:

table = "(select SELECT_COLUMN from table " + whereClause + ") AS subq";
sparkSession.read().jdbc(url, table, connectionProperties);
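
Spelled out a bit more, here is a minimal Scala sketch of that
subquery-as-table idea (the JDBC URL, credentials, filter, and table/column
names below are illustrative placeholders, not the actual setup):

import java.util.Properties

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("JdbcSubquery").getOrCreate()

val url = "jdbc:mysql://dbhost:3306/mydb"      // illustrative JDBC URL
val whereClause = "WHERE status = 'ACTIVE'"    // illustrative filter

val connectionProperties = new Properties()
connectionProperties.put("user", "dbuser")
connectionProperties.put("password", "dbpassword")

// The subquery has to be wrapped in parentheses and given an alias, because
// Spark substitutes whatever is passed as "table" into its own SELECT.
val query = s"(SELECT SELECT_COLUMN FROM my_table $whereClause) AS subq"

val df = spark.read.jdbc(url, query, connectionProperties)
df.show()

With this form the filter is evaluated inside the database as part of the
subquery rather than after the fetch.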


Does the above seem like a good solution?


Regards,
Mohit


How to split a dataframe into two dataframes based on count

2020-05-18 Thread Mohit Durgapal
Dear All,

I would like to know how, in Spark 2.0, I can split a dataframe into two
dataframes when I know the exact counts the two dataframes should have. I
tried using limit but got quite weird results. Also, I am looking for exact
counts in the child DFs, not an approximate percentage-based split.

*Following is what I have tried:*

var dfParent = spark.read.parquet("somelocation") // let's say it has 4000 rows

I want to split the parent into two dfs with the following counts:

var dfChild1Count = 1000

var dfChild2Count = 3000

*I tried this: *

var dfChild1 = dfParent.limit(dfChild1Count);

var dfChild2 = dfParent.except(dfChild1);

*and wrote that to output hdfs directories:*

dfChild1.write.parquet("/outputfilechild1");

dfChild2.write.parquet("/outputfilechild2");

It turns out this results in some duplicate rows saved across
outputfilechild1 & outputfilechild2. Could anyone explain why they
have duplicates?

When I sorted my parent dataframe before limit, it then worked fine:


*dfParent = dfParent.orderBy(col("unique_col").desc)*

It seems like the limit on the parent is executed twice and returns different
records each time. Not sure why it is executed twice when I called it only
once.

Also, is there a better way to split a DF into multiple DFs when we know the
exact counts of the child DFs?
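
One sketch I am considering instead, which avoids the non-determinism by
imposing a total order and splitting on a row number (it assumes Spark 2.x
and a unique column; the column and output paths are illustrative, and the
window without partitionBy funnels all rows through a single partition):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().appName("SplitByCount").getOrCreate()

val dfParent = spark.read.parquet("somelocation")
val dfChild1Count = 1000

// Assign a stable 1-based row number over a deterministic ordering,
// then split on it so the children are disjoint with exact counts.
val w = Window.orderBy(col("unique_col"))
val numbered = dfParent.withColumn("rn", row_number().over(w))

val dfChild1 = numbered.filter(col("rn") <= dfChild1Count).drop("rn")
val dfChild2 = numbered.filter(col("rn") > dfChild1Count).drop("rn")

dfChild1.write.parquet("/outputfilechild1")
dfChild2.write.parquet("/outputfilechild2")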




Regards,
Mohit


getting error on spark streaming : java.lang.OutOfMemoryError: unable to create new native thread

2016-11-22 Thread Mohit Durgapal
Hi Everyone,


I am getting the following error while running a spark streaming example on
my local machine; the data being ingested is only 506 KB.


*16/11/23 03:05:54 INFO MappedDStream: Slicing from 1479850537180 ms to
1479850537235 ms (aligned to 1479850537180 ms and 1479850537235 ms)*

*Exception in thread "streaming-job-executor-0" java.lang.OutOfMemoryError:
unable to create new native thread*


I looked it up and found that it could be related to ulimit; I even
increased the ulimit to 1, but I still get the same error.


Regards

Mohit


Re: newbie question about RDD

2016-11-22 Thread Mohit Durgapal
Hi Raghav,

Please refer to the following code:

SparkConf sparkConf = new
SparkConf().setMaster("local[2]").setAppName("PersonApp");

//creating java spark context

JavaSparkContext sc = new JavaSparkContext(sparkConf);

//reading the file from HDFS into a spark RDD; the name node is localhost
JavaRDD<String> personStringRDD =
sc.textFile("hdfs://localhost:9000/custom/inputPersonFile.txt");


//Converting from String RDD to Person RDD ...this is just an example,
//you can replace the parsing with better exception-handled code

JavaRDD<Person> personObjectRDD = personStringRDD.map(personRow -> {
    String[] personValues = personRow.split("\t");

    return new Person(Long.parseLong(personValues[0]),
            personValues[1], personValues[2],
            personValues[3]);
});

//finally just printing the count of objects
System.out.println("Person count = "+personObjectRDD.count());


Regards
Mohit


On Tue, Nov 22, 2016 at 11:17 AM, Raghav  wrote:

> Sorry, I forgot to ask: how can I use the spark context here? I have the hdfs
> directory path of the files, as well as the name node of the hdfs cluster.
>
> Thanks for your help.
>
> On Mon, Nov 21, 2016 at 9:45 PM, Raghav  wrote:
>
>> Hi
>>
>> I am extremely new to Spark. I have to read a file from HDFS and get it
>> in memory in RDD format.
>>
>> I have a Java class as follows:
>>
>> class Person {
>> private long UUID;
>> private String FirstName;
>> private String LastName;
>> private String zip;
>>
>>// public methods
>> }
>>
>> The file in HDFS is as follows:
>>
>> UUID   FirstName   LastName   Zip
>> 7462   John        Doll       06903
>> 5231   Brad        Finley     32820
>>
>>
>> Can someone point me to how to get a JavaRDD<Person> object by reading the
>> file in HDFS?
>>
>> Thanks.
>>
>> --
>> Raghav
>>
>
>
>
> --
> Raghav
>


Re: how do you convert directstream into data frames

2015-08-13 Thread Mohit Durgapal
Any idea anyone?

On Fri, Aug 14, 2015 at 10:11 AM, Mohit Durgapal 
wrote:

> Hi All,
>
> After creating a direct stream like below:
>
> val events = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
>
> I would like to convert the above stream into data frames, so that I could
> run hive queries over it. Could anyone please explain how this can be
> achieved? I am using spark version 1.3.0
>


how do you convert directstream into data frames

2015-08-13 Thread Mohit Durgapal
Hi All,

After creating a direct stream like below:

val events = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)


I would like to convert the above stream into data frames, so that I could
run hive queries over it. Could anyone please explain how this can be
achieved? I am using spark version 1.3.0
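
To make it concrete, this is roughly the shape I am hoping for, continuing
from the snippet above (ssc and events as defined there) with Spark 1.3-era
APIs; the Event case class, table name, and query are just illustrative:

import org.apache.spark.sql.SQLContext

case class Event(key: String, value: String)

// One SQLContext (or a HiveContext, for Hive queries) reused across batches.
val sqlContext = new SQLContext(ssc.sparkContext)
import sqlContext.implicits._

events.foreachRDD { rdd =>
  // Each micro-batch RDD of (key, value) pairs becomes a DataFrame.
  val df = rdd.map { case (k, v) => Event(k, v) }.toDF()
  df.registerTempTable("events")
  sqlContext.sql("SELECT key, COUNT(*) AS cnt FROM events GROUP BY key").show()
}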


spark-kafka directAPI vs receivers based API

2015-08-10 Thread Mohit Durgapal
Hi All,

I just wanted to know how the direct API for spark streaming compares with
the earlier receiver-based API. Has anyone used the direct-API-based
approach in production, or is it still only being used for POCs?

Also, since I'm new to spark, could anyone share a starting point from
where I could find working code for both of the above APIs?
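
This is what I have pieced together so far as a minimal skeleton for both,
using the Spark 1.x spark-streaming-kafka module (the broker/ZooKeeper
addresses, group id, and topic name are placeholders I made up); please
correct me if it is off:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaCompare").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

// Receiver-based API: a long-running receiver consumes through ZooKeeper,
// which also tracks the offsets.
val receiverStream = KafkaUtils.createStream(
  ssc, "localhost:2181", "my-consumer-group", Map("my-topic" -> 1))

// Direct API: each batch reads a computed offset range straight from the
// brokers, with no receiver threads.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("my-topic"))

receiverStream.map(_._2).print()
directStream.map(_._2).print()

ssc.start()
ssc.awaitTermination()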

Also, in my use case I want to analyse a data stream(comma separated
string) & aggregate over certain fields based on their types. Ideally I
would like to push that aggregated data to a column family based
datastore(like HBase, we are using it currently). But my first I'd like to
find out how to aggregate that data and how does streaming work, whether It
polls & fetches data in batches or does it continuously listen to the kafka
queue for any new message. And how can I configure my application for
either cases. I hope my questions make sense.


Regards
Mohit


checking

2015-02-06 Thread Mohit Durgapal
Just wanted to know if my emails are reaching the user list.


Regards
Mohit


spark streaming from kafka real time + batch processing in java

2015-02-06 Thread Mohit Durgapal
I want to write a spark streaming consumer for kafka in java. I want to
process the data in real-time as well as store the data in hdfs in
year/month/day/hour/ format. I am not sure how to achieve this. Should I
write separate kafka consumers, one for writing data to HDFS and one for
spark streaming?

Also, I would like to ask what people generally do with the results of
spark streams after aggregating over them. Is it okay to update a NoSQL DB
with aggregated counts per batch interval, or are they generally stored in
hdfs?

Is it possible to store the mini-batch data from spark streaming to HDFS in
a way that the data is aggregated hourly and put into HDFS in its "hour"
folder? I would not want a lot of small files, one per spark mini-batch per
hour; that would be inefficient for running hadoop jobs later.
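
To illustrate the layout I mean, here is a rough sketch I have been
considering (it assumes a DStream[String] called lines from the Kafka
consumer; the path and formats are made-up placeholders, and real hourly
compaction would still need a follow-up batch job):

import java.text.SimpleDateFormat
import java.util.Date

lines.foreachRDD { (rdd, time) =>
  if (rdd.take(1).nonEmpty) {
    // Derive the year/month/day/hour folder from the batch time.
    val fmt = new SimpleDateFormat("yyyy/MM/dd/HH")
    val hourPath = "hdfs:///events/" + fmt.format(new Date(time.milliseconds))
    // Coalesce so each mini-batch writes one file instead of many small ones.
    rdd.coalesce(1).saveAsTextFile(hourPath + "/batch-" + time.milliseconds)
  }
}

Each batch still produces its own sub-directory under the hour folder, so a
small periodic job to merge an hour's batches would still be needed to keep
the file count down.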

Is anyone working on the same problem?

Any help and comments would be great.


Regards

Mohit


connecting spark with ActiveMQ

2015-02-03 Thread Mohit Durgapal
Hi All,

I have a requirement where I need to consume messages from ActiveMQ and do
live stream processing as well as batch processing using Spark. Is there a
spark-plugin or library that can enable this? If not, then do you know any
other way this could be done?
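
One direction I have been wondering about (untested) is a custom Spark
Streaming receiver over JMS, roughly like the sketch below; it assumes the
activemq-client JMS API is on the classpath, and the broker URL and queue
name are placeholders:

import javax.jms.{Connection, Message, MessageListener, Session, TextMessage}
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ActiveMQReceiver(brokerUrl: String, queueName: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  @transient private var connection: Connection = _

  def onStart(): Unit = {
    val factory = new ActiveMQConnectionFactory(brokerUrl)
    connection = factory.createConnection()
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val consumer = session.createConsumer(session.createQueue(queueName))
    consumer.setMessageListener(new MessageListener {
      def onMessage(message: Message): Unit = message match {
        case text: TextMessage => store(text.getText) // hand each message to Spark
        case _ => // ignore non-text messages in this sketch
      }
    })
    connection.start()
  }

  def onStop(): Unit = {
    if (connection != null) connection.close()
  }
}

// Usage: val messages = ssc.receiverStream(new ActiveMQReceiver("tcp://localhost:61616", "events"))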


Regards
Mohit