Re: [Spark SQL] why spark sql hash() returns the same hash value though the keys/exprs are not the same

2018-09-28 Thread Gokula Krishnan D
Hello Jayesh,

I have masked the input values with .
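For what it's worth: Spark SQL's hash() is a 32-bit Murmur3 hash, so occasional
collisions between different keys are expected at scale rather than a bug. If
the hash drives insert/update/delete detection, a wider digest is safer. A
minimal sketch (the DataFrame df and the key columns key1/key2 are assumptions
for illustration, not from this thread):

import org.apache.spark.sql.functions.{col, concat_ws, sha2}

// a 256-bit digest over the concatenated keys; collisions between different
// keys become practically impossible, unlike with the 32-bit hash()
val withDigest = df.withColumn("row_digest",
  sha2(concat_ws("||", col("key1"), col("key2")), 256))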


Thanks & Regards,
Gokula Krishnan* (Gokul)*


On Wed, Sep 26, 2018 at 2:20 PM Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Cannot reproduce your situation.
>
> Can you share Spark version?
>
>
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
>       /_/
>
>
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_92)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
>
>
> scala> spark.sql("select hash('40514X'),hash('41751')").show()
>
> +------------+-----------+
> |hash(40514X)|hash(41751)|
> +------------+-----------+
> | -1898845883|  916273350|
> +------------+-----------+
>
>
>
>
>
> scala> spark.sql("select hash('14589'),hash('40004')").show()
>
> +-----------+-----------+
> |hash(14589)|hash(40004)|
> +-----------+-----------+
> |  777096871|-1593820563|
> +-----------+-----------+
>
>
>
>
>
> scala>
>
>
>
> *From: *Gokula Krishnan D 
> *Date: *Tuesday, September 25, 2018 at 8:57 PM
> *To: *user 
> *Subject: *[Spark SQL] why spark sql hash() returns the same hash value
> though the keys/exprs are not the same
>
>
>
> Hello All,
>
>
>
> I am calculating the hash value of a few columns to determine whether a
> record is an insert/delete/update, but I found a scenario which is a little
> weird: some of the records return the same hash value even though the keys
> are totally different.
>
>
>
> For the instance,
>
>
>
> scala> spark.sql("select hash('40514X'),hash('41751')").show()
>
> +-----------+-----------+
> |hash(40514)|hash(41751)|
> +-----------+-----------+
> |  976573657|  976573657|
> +-----------+-----------+
>
>
>
> scala> spark.sql("select hash('14589'),hash('40004')").show()
>
> +-----------+-----------+
> |hash(14589)|hash(40004)|
> +-----------+-----------+
> |  777096871|  777096871|
> +-----------+-----------+
>
> I do understand that hash() returns an integer; have these reached the max
> value?
>
>
>
> Thanks & Regards,
>
> Gokula Krishnan* (Gokul)*
>


[Spark SQL] why spark sql hash() returns the same hash value though the keys/exprs are not the same

2018-09-25 Thread Gokula Krishnan D
Hello All,

I am calculating the hash value of a few columns to determine whether a
record is an insert/delete/update, but I found a scenario which is a little
weird: some of the records return the same hash value even though the keys
are totally different.

For the instance,

scala> spark.sql("select hash('40514X'),hash('41751')").show()

+-----------+-----------+
|hash(40514)|hash(41751)|
+-----------+-----------+
|  976573657|  976573657|
+-----------+-----------+

scala> spark.sql("select hash('14589'),hash('40004')").show()

+-----------+-----------+
|hash(14589)|hash(40004)|
+-----------+-----------+
|  777096871|  777096871|
+-----------+-----------+
I do understand that hash() returns an integer; have these reached the max
value?

Thanks & Regards,
Gokula Krishnan* (Gokul)*


Re: How to avoid duplicate column names after join with multiple conditions

2018-07-06 Thread Gokula Krishnan D
Nirav,

The withColumnRenamed() API might help, but it does not distinguish which
DataFrame a column came from and renames all occurrences of the given column
name. Alternatively, use the select() API and rename the columns as you want;
see the sketch below for one way to avoid the duplicate.
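For example, a minimal sketch (assuming Spark 2.x, where drop() accepts a
Column) that keeps only one copy of column a after the join:

// join on both conditions, then drop the duplicate key column coming from df2
val joined = df1
  .join(df2, df1("a") === df2("a") && df1("b") === df2("c"))
  .drop(df2("a"))

Aliasing the two DataFrames and selecting explicit columns works just as well
if you need to keep both sides.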



Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Mon, Jul 2, 2018 at 5:52 PM, Nirav Patel  wrote:

> Expr is `df1(a) === df2(a) and df1(b) === df2(c)`
>
> How to avoid duplicate column 'a' in result? I don't see any api that
> combines both. Rename manually?
>
>
>


Re: What factors need to be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-29 Thread Gokula Krishnan D
Do you see any changes or improvements in the *Core API* in Spark 2.x when
compared with Spark 1.6.0?




Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Mon, Sep 25, 2017 at 1:32 PM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> Thanks for the reply. Forgot to mention that, our Batch ETL Jobs are in
> Core-Spark.
>
>
> On Sep 22, 2017, at 3:13 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
> wrote:
>
> 1. 40s is pretty negligible unless you run your job very frequently, there
> can be many factors that influence that.
>
> 2. Try to compare the CPU time instead of the wall-clock time
>
> 3. Check the stages that got slower and compare the DAGs
>
> 4. Test with dynamic allocation disabled
>
> On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D <email2...@gmail.com>
> wrote:
>
>> Hello All,
>>
>> Currently our Batch ETL Jobs are in Spark 1.6.0 and planning to upgrade
>> into Spark 2.1.0.
>>
>> With minor code changes (like configuration and Spark Session.sc) able to
>> execute the existing JOB into Spark 2.1.0.
>>
>> But noticed that JOB completion timings are much better in Spark 1.6.0
>> but no in Spark 2.1.0.
>>
>> For the instance, JOB A completed in 50s in Spark 1.6.0.
>>
>> And with the same input and JOB A completed in 1.5 mins in Spark 2.1.0.
>>
>> Is there any specific factor needs to be considered when switching to
>> Spark 2.1.0 from Spark 1.6.0.
>>
>>
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>
>
>


Re: What factors need to be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-25 Thread Gokula Krishnan D
Thanks for the reply. I forgot to mention that our batch ETL jobs are in
core Spark.


On Sep 22, 2017, at 3:13 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

1. 40s is pretty negligible unless you run your job very frequently, there
can be many factors that influence that.

2. Try to compare the CPU time instead of the wall-clock time

3. Check the stages that got slower and compare the DAGs

4. Test with dynamic allocation disabled
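For point 4, a minimal sketch (executor counts and sizes are placeholder
values) of settings that pin a fixed resource footprint so the 1.6.0 and
2.1.0 runs are comparable:

// disable dynamic allocation and fix the executor footprint for the comparison
val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.instances", "50")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "8g")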

On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> Hello All,
>
> Currently our Batch ETL Jobs are in Spark 1.6.0 and planning to upgrade
> into Spark 2.1.0.
>
> With minor code changes (like configuration and Spark Session.sc) able to
> execute the existing JOB into Spark 2.1.0.
>
> But noticed that JOB completion timings are much better in Spark 1.6.0 but
> no in Spark 2.1.0.
>
> For the instance, JOB A completed in 50s in Spark 1.6.0.
>
> And with the same input and JOB A completed in 1.5 mins in Spark 2.1.0.
>
> Is there any specific factor needs to be considered when switching to
> Spark 2.1.0 from Spark 1.6.0.
>
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>


Re: What factors need to be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-22 Thread Gokula Krishnan D
Thanks for the reply. I forgot to mention that our batch ETL jobs are in
core Spark.


> On Sep 22, 2017, at 3:13 PM, Vadim Semenov <vadim.seme...@datadoghq.com> 
> wrote:
> 
> 1. 40s is pretty negligible unless you run your job very frequently, there 
> can be many factors that influence that.
> 
> 2. Try to compare the CPU time instead of the wall-clock time
> 
> 3. Check the stages that got slower and compare the DAGs
> 
> 4. Test with dynamic allocation disabled
> 
> On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D <email2...@gmail.com 
> <mailto:email2...@gmail.com>> wrote:
> Hello All, 
> 
> Currently our Batch ETL Jobs are in Spark 1.6.0 and planning to upgrade into 
> Spark 2.1.0. 
> 
> With minor code changes (like configuration and Spark Session.sc) able to 
> execute the existing JOB into Spark 2.1.0. 
> 
> But noticed that JOB completion timings are much better in Spark 1.6.0 but no 
> in Spark 2.1.0.
> 
> For the instance, JOB A completed in 50s in Spark 1.6.0. 
> 
> And with the same input and JOB A completed in 1.5 mins in Spark 2.1.0. 
> 
> Is there any specific factor needs to be considered when switching to Spark 
> 2.1.0 from Spark 1.6.0. 
> 
> 
> 
> Thanks & Regards, 
> Gokula Krishnan (Gokul)
> 



What factors need to be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-22 Thread Gokula Krishnan D
Hello All,

Currently our batch ETL jobs are on Spark 1.6.0 and we are planning to
upgrade to Spark 2.1.0.

With minor code changes (such as configuration and SparkSession/sc) we were
able to execute the existing jobs on Spark 2.1.0.

But we noticed that job completion times are much better in Spark 1.6.0
than in Spark 2.1.0.

For instance, job A completed in 50s in Spark 1.6.0, and with the same
input job A completed in 1.5 minutes in Spark 2.1.0.

Are there any specific factors that need to be considered when switching to
Spark 2.1.0 from Spark 1.6.0?



Thanks & Regards,
Gokula Krishnan* (Gokul)*


Re: [Spark-Core] sc.textFile() explicit minPartitions did not work

2017-07-25 Thread Gokula Krishnan D
Thanks Xiayun Sun and Robin East for your inputs. It makes sense to me.
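As a side note, since the second argument of textFile() is only a minimum (it
cannot go below the number of input splits), one hedged way to end up with
roughly 500 partitions is to merge them after reading (inputFile is the same
variable as in the original mail):

val inputRdd = sc.textFile(inputFile)      // ~2290 partitions, one per input split
val inputRdd500 = inputRdd.coalesce(500)   // merge partitions without a shuffle
inputRdd500.count()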

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Tue, Jul 25, 2017 at 9:55 AM, Xiayun Sun <xiayun...@gmail.com> wrote:

> I'm guessing by "part files" you mean files like part-r-0. These are
> actually different from hadoop "block size", which is the value actually
> used in partitions.
>
> Looks like your hdfs block size is the default 128mb: 258.2GB in 500 part
> files -> around 528mb per part file -> each part file would take a little
> more than 4 blocks -> total that would be around 2000 blocks.
>
> You cannot set partitions fewer than blocks, that's why 500 does not work
> (spark doc here
> <http://spark.apache.org/docs/latest/rdd-programming-guide.html>)
>
> The textFile method also takes an optional second argument for
>> controlling the number of partitions of the file. By default, Spark creates
>> one partition for each block of the file (blocks being 128MB by default in
>> HDFS), but you can also ask for a higher number of partitions by passing a
>> larger value. Note that you cannot have fewer partitions than blocks.
>
>
>
> Now as to why 3000 gives you 3070 partitions, spark use hadoop's
> InputFormat.getSplits
> <https://github.com/apache/hadoop/blob/f67237cbe7bc48a1b9088e990800b37529f1db2a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java#L85>,
> and the partition desired is at most a hint, so the final result could be a
> bit different.
>
> On 25 July 2017 at 19:54, Gokula Krishnan D <email2...@gmail.com> wrote:
>
>> Excuse for the too many mails on this post.
>>
>> found a similar issue https://stackoverflow.co
>> m/questions/24671755/how-to-partition-a-rdd
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>> On Tue, Jul 25, 2017 at 8:21 AM, Gokula Krishnan D <email2...@gmail.com>
>> wrote:
>>
>>> In addition to that,
>>>
>>> tried to read the same file with 3000 partitions but it used 3070
>>> partitions. And took more time than previous please refer the attachment.
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan* (Gokul)*
>>>
>>> On Tue, Jul 25, 2017 at 8:15 AM, Gokula Krishnan D <email2...@gmail.com>
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I have a HDFS file with approx. *1.5 Billion records* with 500 Part
>>>> files (258.2GB Size) and when I tried to execute the following I could see
>>>> that it used 2290 tasks but it supposed to be 500 as like HDFS File, isn't
>>>> it?
>>>>
>>>> val inputFile = 
>>>> val inputRdd = sc.textFile(inputFile)
>>>> inputRdd.count()
>>>>
>>>> I was hoping that I can do the same with the fewer partitions so tried
>>>> the following
>>>>
>>>> val inputFile = 
>>>> val inputrddnqew = sc.textFile(inputFile,500)
>>>> inputRddNew.count()
>>>>
>>>> But still it used 2290 tasks.
>>>>
>>>> As per scala doc, it supposed use as like the HDFS file i.e 500.
>>>>
>>>> It would be great if you could throw some insight on this.
>>>>
>>>> Thanks & Regards,
>>>> Gokula Krishnan* (Gokul)*
>>>>
>>>
>>>
>>
>
>
> --
> Xiayun Sun
>
> Home is behind, the world ahead,
> and there are many paths to tread
> through shadows to the edge of night,
> until the stars are all alight.
>


Re: [Spark-Core] sc.textFile() explicit minPartitions did not work

2017-07-25 Thread Gokula Krishnan D
Apologies for the many mails on this post.

found a similar issue
https://stackoverflow.com/questions/24671755/how-to-partition-a-rdd

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Tue, Jul 25, 2017 at 8:21 AM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> In addition to that,
>
> tried to read the same file with 3000 partitions but it used 3070
> partitions. And took more time than previous please refer the attachment.
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>
> On Tue, Jul 25, 2017 at 8:15 AM, Gokula Krishnan D <email2...@gmail.com>
> wrote:
>
>> Hello All,
>>
>> I have a HDFS file with approx. *1.5 Billion records* with 500 Part
>> files (258.2GB Size) and when I tried to execute the following I could see
>> that it used 2290 tasks but it supposed to be 500 as like HDFS File, isn't
>> it?
>>
>> val inputFile = 
>> val inputRdd = sc.textFile(inputFile)
>> inputRdd.count()
>>
>> I was hoping that I can do the same with the fewer partitions so tried
>> the following
>>
>> val inputFile = 
>> val inputrddnqew = sc.textFile(inputFile,500)
>> inputRddNew.count()
>>
>> But still it used 2290 tasks.
>>
>> As per scala doc, it supposed use as like the HDFS file i.e 500.
>>
>> It would be great if you could throw some insight on this.
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>
>


Re: [Spark-Core] sc.textFile() explicit minPartitions did not work

2017-07-25 Thread Gokula Krishnan D
In addition to that,

I tried to read the same file with 3000 partitions, but it used 3070
partitions and took more time than before; please refer to the attachment.

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Tue, Jul 25, 2017 at 8:15 AM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> Hello All,
>
> I have a HDFS file with approx. *1.5 Billion records* with 500 Part files
> (258.2GB Size) and when I tried to execute the following I could see that
> it used 2290 tasks but it supposed to be 500 as like HDFS File, isn't it?
>
> val inputFile = 
> val inputRdd = sc.textFile(inputFile)
> inputRdd.count()
>
> I was hoping that I can do the same with the fewer partitions so tried the
> following
>
> val inputFile = 
> val inputrddnqew = sc.textFile(inputFile,500)
> inputRddNew.count()
>
> But still it used 2290 tasks.
>
> As per scala doc, it supposed use as like the HDFS file i.e 500.
>
> It would be great if you could throw some insight on this.
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>


[Spark-Core] sc.textFile() explicit minPartitions did not work

2017-07-25 Thread Gokula Krishnan D
Hello All,

I have an HDFS file with approx. *1.5 billion records* in 500 part files
(258.2GB in size), and when I tried to execute the following I could see that
it used 2290 tasks, but it was supposed to be 500, the same as the HDFS file,
wasn't it?

val inputFile = 
val inputRdd = sc.textFile(inputFile)
inputRdd.count()

I was hoping that I could do the same with fewer partitions, so I tried the
following:

val inputFile = 
val inputRddNew = sc.textFile(inputFile, 500)
inputRddNew.count()

But it still used 2290 tasks.

As per the Scala doc, it is supposed to use the same number of partitions as
the HDFS file, i.e. 500.

It would be great if you could throw some insight on this.

Thanks & Regards,
Gokula Krishnan* (Gokul)*


Re: Spark on Cloudera Configuration (Scheduler Mode = FAIR)

2017-07-21 Thread Gokula Krishnan D
Mark & Ayan, thanks for the inputs.

*Is there any way we can set up the scheduler mode at the Spark cluster
level, besides the application (SparkContext) level?*

Currently YARN is in FAIR mode and we manually ensure that each Spark
application is also in FAIR mode. However, we noticed that applications do
not release their resources as soon as their tasks are done when we set
dynamic allocation = true and do not specify any explicit executor
allocation.

At this moment, we are specifying the min and max executor allocation at the
Spark application level in order to ensure that all of our ETL Spark
applications can run in parallel without any resource issues.

It would be great if you could throw more insight on how to set up
preemption within YARN and Spark.
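For reference, a minimal sketch of the per-application bounds mentioned above
(all values are assumptions), including the idle timeout that controls how
quickly executors are handed back:

val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")             // required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "5")
  .set("spark.dynamicAllocation.maxExecutors", "25")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s") // release idle executors after a minute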

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Thu, Jul 20, 2017 at 6:46 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> As Mark said, scheduler mode works within application ie within a Spark
> Session and Spark context. This is also clear if you think where you set
> the configuration - in a Spark Config which used to build a context.
>
> If you are using Yarn as resource manager, however, you can set YARN with
> fair scheduler. If you do so, both of your applications will get "Fair"
> treatment from Yarn, ie get resources in round robin manner. If you want
> your App A to give up resources while using them, you need to set
> preemption within Yarn and priority of applications so that preemption can
> kick in.
>
> HTH...
>
> Best, Ayan
>
> On Fri, Jul 21, 2017 at 7:11 AM, Mark Hamstra <m...@clearstorydata.com>
> wrote:
>
>> The fair scheduler doesn't have anything to do with reallocating resource
>> across Applications.
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#sch
>> eduling-across-applications
>> https://spark.apache.org/docs/latest/job-scheduling.html#sch
>> eduling-within-an-application
>>
>> On Thu, Jul 20, 2017 at 2:02 PM, Gokula Krishnan D <email2...@gmail.com>
>> wrote:
>>
>>> Mark, Thanks for the response.
>>>
>>> Let me rephrase my statements.
>>>
>>> "I am submitting a Spark application(*Application*#A) with
>>> scheduler.mode as FAIR and dynamicallocation=true and it got all the
>>> available executors.
>>>
>>> In the meantime, submitting another Spark Application (*Application*
>>> # B) with the scheduler.mode as FAIR and dynamicallocation=true but it got
>>> only one executor. "
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan* (Gokul)*
>>>
>>> On Thu, Jul 20, 2017 at 4:56 PM, Mark Hamstra <m...@clearstorydata.com>
>>> wrote:
>>>
>>>> First, Executors are not allocated to Jobs, but rather to Applications.
>>>> If you run multiple Jobs within a single Application, then each of the
>>>> Tasks associated with Stages of those Jobs has the potential to run on any
>>>> of the Application's Executors. Second, once a Task starts running on an
>>>> Executor, it has to complete before another Task can be scheduled using the
>>>> prior Task's resources -- the fair scheduler is not preemptive of running
>>>> Tasks.
>>>>
>>>> On Thu, Jul 20, 2017 at 1:45 PM, Gokula Krishnan D <email2...@gmail.com
>>>> > wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> We are having cluster with 50 Executors each with 4 Cores so can avail
>>>>> max. 200 Executors.
>>>>>
>>>>> I am submitting a Spark application(JOB A) with scheduler.mode as FAIR
>>>>> and dynamicallocation=true and it got all the available executors.
>>>>>
>>>>> In the meantime, submitting another Spark Application (JOB B) with the
>>>>> scheduler.mode as FAIR and dynamicallocation=true but it got only one
>>>>> executor.
>>>>>
>>>>> Normally this situation occurs when any of the JOB runs with the
>>>>> Scheduler.mode= FIFO.
>>>>>
>>>>> 1) Have your ever faced this issue if so how to overcome this?.
>>>>>
>>>>> I was in the impression that as soon as I submit the JOB B the Spark
>>>>> Scheduler should distribute/release few resources from the JOB A and share
>>>>> it with the JOB A in the Round Robin fashion?.
>>>>>
>>>>> Appreciate your response !!!.
>>>>>
>>>>>
>>>>> Thanks & Regards,
>>>>> Gokula Krishnan* (Gokul)*
>>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark on Cloudera Configuration (Scheduler Mode = FAIR)

2017-07-20 Thread Gokula Krishnan D
Mark, Thanks for the response.

Let me rephrase my statements.

"I am submitting a Spark application(*Application*#A) with scheduler.mode
as FAIR and dynamicallocation=true and it got all the available executors.

In the meantime, submitting another Spark Application (*Application* # B)
with the scheduler.mode as FAIR and dynamicallocation=true but it got only
one executor. "

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Thu, Jul 20, 2017 at 4:56 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> First, Executors are not allocated to Jobs, but rather to Applications. If
> you run multiple Jobs within a single Application, then each of the Tasks
> associated with Stages of those Jobs has the potential to run on any of the
> Application's Executors. Second, once a Task starts running on an Executor,
> it has to complete before another Task can be scheduled using the prior
> Task's resources -- the fair scheduler is not preemptive of running Tasks.
>
> On Thu, Jul 20, 2017 at 1:45 PM, Gokula Krishnan D <email2...@gmail.com>
> wrote:
>
>> Hello All,
>>
>> We are having cluster with 50 Executors each with 4 Cores so can avail
>> max. 200 Executors.
>>
>> I am submitting a Spark application(JOB A) with scheduler.mode as FAIR
>> and dynamicallocation=true and it got all the available executors.
>>
>> In the meantime, submitting another Spark Application (JOB B) with the
>> scheduler.mode as FAIR and dynamicallocation=true but it got only one
>> executor.
>>
>> Normally this situation occurs when any of the JOB runs with the
>> Scheduler.mode= FIFO.
>>
>> 1) Have your ever faced this issue if so how to overcome this?.
>>
>> I was in the impression that as soon as I submit the JOB B the Spark
>> Scheduler should distribute/release few resources from the JOB A and share
>> it with the JOB A in the Round Robin fashion?.
>>
>> Appreciate your response !!!.
>>
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>
>


Spark on Cloudera Configuration (Scheduler Mode = FAIR)

2017-07-20 Thread Gokula Krishnan D
Hello All,

We are having cluster with 50 Executors each with 4 Cores so can avail max.
200 Executors.

I am submitting a Spark application(JOB A) with scheduler.mode as FAIR and
dynamicallocation=true and it got all the available executors.

In the meantime, submitting another Spark Application (JOB B) with the
scheduler.mode as FAIR and dynamicallocation=true but it got only one
executor.

Normally this situation occurs when any of the JOB runs with the
Scheduler.mode= FIFO.

1) Have your ever faced this issue if so how to overcome this?.

I was in the impression that as soon as I submit the JOB B the Spark
Scheduler should distribute/release few resources from the JOB A and share
it with the JOB A in the Round Robin fashion?.

Appreciate your response !!!.


Thanks & Regards,
Gokula Krishnan* (Gokul)*


Spark sc.textFile(): files with more partitions vs. files with fewer partitions

2017-07-20 Thread Gokula Krishnan D
Hello All,

Our Spark applications are designed to process HDFS files (Hive external
tables).

We recently modified the Hive file size by setting the following parameters
to ensure that the files have an average size of 512MB:
set hive.merge.mapfiles=true
set hive.merge.mapredfiles=true
set hive.merge.smallfiles.avgsize=536870912 (512MB)

Now I do see a difference in sc.textFile(HDFS file).count().

Apparently the time has increased drastically since it is reading with
fewer partitions.

*Is it always better to read a file in Spark with more partitions? Based on
this we are planning to revert the Hive settings.*
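For context, a hedged sketch: the larger merged files mean fewer input splits,
but the read-side parallelism can be restored without reverting the Hive
settings by repartitioning after the read (the inputPath variable and the
target of 2000 partitions are assumptions):

val inputRdd = sc.textFile(inputPath)    // fewer partitions after the Hive small-file merge
val widened = inputRdd.repartition(2000) // shuffle to spread the work across more tasks
widened.count()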


Thanks & Regards,
Gokula Krishnan* (Gokul)*


Re: unit testing in spark

2017-04-10 Thread Gokula Krishnan D
Hello Shiv,

Unit testing really helps when you follow a TDD approach. It is a safe way
to develop a program locally, and you can also run those test cases during
the build process using any of the continuous integration tools (Bamboo,
Jenkins). That way you can ensure that artifacts are tested before they are
deployed to the cluster.
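For what it's worth, a minimal sketch of a local-mode unit test (ScalaTest's
FunSuite and the word-split transformation are assumptions for illustration;
spark-testing-base, mentioned later in this thread, is designed to remove much
of this boilerplate):

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite  // ScalaTest 3.0.x style

class WordSplitSuite extends FunSuite {
  test("lines are split into words") {
    // a throwaway local session keeps the test self-contained
    val spark = SparkSession.builder().master("local[2]").appName("unit-test").getOrCreate()
    try {
      val words = spark.sparkContext
        .parallelize(Seq("hello spark", "hello tests"))
        .flatMap(_.split(" "))
      assert(words.count() === 4)
    } finally {
      spark.stop()
    }
  }
}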


Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Wed, Apr 5, 2017 at 7:32 AM, Shiva Ramagopal  wrote:

> Hi,
>
> I've been following this thread for a while.
>
> I'm trying to bring in a test strategy in my team to test a number of data
> pipelines before production. I have watched Lars' presentation and find it
> great. However I'm debating whether unit tests are worth the effort if
> there are good job-level and pipeline-level tests. Does anybody have any
> experiences benefitting from unit-tests in such a case?
>
> Cheers,
> Shiv
>
> On Mon, Dec 12, 2016 at 6:00 AM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi all,
>>
>> I would also would like to participate on that.
>>
>> Greetings,
>>
>> Juan
>>
>> On Fri, Dec 9, 2016 at 6:03 AM, Michael Stratton <
>> michael.strat...@komodohealth.com> wrote:
>>
>>> That sounds great, please include me so I can get involved.
>>>
>>> On Fri, Dec 9, 2016 at 7:39 AM, Marco Mistroni 
>>> wrote:
>>>
 Me too as I spent most of my time writing unit/integ tests  pls
 advise on where I  can start
 Kr

 On 9 Dec 2016 12:15 am, "Miguel Morales" 
 wrote:

> I would be interested in contributing.  Ive created my own library for
> this as well.  In my blog post I talk about testing with Spark in RSpec
> style:
> https://medium.com/@therevoltingx/test-driven-development-w-
> apache-spark-746082b44941
>
> Sent from my iPhone
>
> On Dec 8, 2016, at 4:09 PM, Holden Karau  wrote:
>
> There are also libraries designed to simplify testing Spark in the
> various platforms, spark-testing-base
>  for Scala/Java/Python
> (& video https://www.youtube.com/watch?v=f69gSGSLGrY), sscheck
>  (scala focused property based),
> pyspark.test (python focused with py.test instead of unittest2) (&
> blog post from nextdoor https://engblog.nextd
> oor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b#.jw3bdcej9
>  )
>
> Good luck on your Spark Adventures :)
>
> P.S.
>
> If anyone is interested in helping improve spark testing libraries I'm
> always looking for more people to be involved with spark-testing-base
> because I'm lazy :p
>
> On Thu, Dec 8, 2016 at 2:05 PM, Lars Albertsson 
> wrote:
>
>> I wrote some advice in a previous post on the list:
>> http://markmail.org/message/bbs5acrnksjxsrrs
>>
>> It does not mention python, but the strategy advice is the same. Just
>> replace JUnit/Scalatest with pytest, unittest, or your favourite
>> python test framework.
>>
>>
>> I recently held a presentation on the subject. There is a video
>> recording at https://vimeo.com/192429554 and slides at
>> http://www.slideshare.net/lallea/test-strategies-for-data-pr
>> ocessing-pipelines-67244458
>>
>> You can find more material on test strategies at
>> http://www.mapflat.com/lands/resources/reading-list/index.html
>>
>>
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> https://twitter.com/lalleal
>> +46 70 7687109
>> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/lalle@mapf
>> lat.com
>>
>>
>> On Thu, Dec 8, 2016 at 4:14 PM, pseudo oduesp 
>> wrote:
>> > somone can tell me how i can make unit test on pyspark ?
>> > (book, tutorial ...)
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>
>
>>>
>>
>


Re: Task Deserialization Error

2016-09-21 Thread Gokula Krishnan D
Hello Sumit -

I could see that a SparkConf() specification is not mentioned in your
program, but the rest looks good.



Output: (inline screenshot omitted)

By the way, I have used the README.md template
https://gist.github.com/jxson/1784669
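For reference, a minimal sketch of the missing SparkConf-based setup (the
master and app name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Simple App").setMaster("local[*]")
val sc = new SparkContext(conf)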

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Tue, Sep 20, 2016 at 2:15 AM, Chawla,Sumit 
wrote:

> Hi All
>
> I am trying to test a simple Spark APP using scala.
>
>
> import org.apache.spark.SparkContext
>
> object SparkDemo {
>   def main(args: Array[String]) {
> val logFile = "README.md" // Should be some file on your system
>
> // to run in local mode
> val sc = new SparkContext("local", "Simple App", 
> ""PATH_OF_DIRECTORY_WHERE_COMPILED_SPARK_PROJECT_FROM_GIT")
>
> val logData = sc.textFile(logFile).cache()
> val numAs = logData.filter(line => line.contains("a")).count()
> val numBs = logData.filter(line => line.contains("b")).count()
>
>
> println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
>
>   }
> }
>
>
> When running this demo in IntelliJ, i am getting following error:
>
>
> java.lang.IllegalStateException: unread block data
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2449)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1385)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> I guess its associated with task not being deserializable.  Any help will be 
> appreciated.
>
>
>
> Regards
> Sumit Chawla
>
>


Things to do to learn Cassandra in an Apache Spark environment

2016-08-23 Thread Gokula Krishnan D
Hello All -

Hope, you are doing good.

I have a general question. I am working on Hadoop using Apache Spark.

At this moment, we are not using Cassandra but I would like to know what's
the scope of learning and using it in the Hadoop environment.

It would be great if you could provide a use case to understand it better.

If so, what are the things to do in terms of learning?


Thanks & Regards,
Gokula Krishnan* (Gokul)*


How to view the RDD data based on Partition

2016-01-12 Thread Gokula Krishnan D
Hello All -

I'm just trying to understand aggregate() and in the meantime got a
question.

*Is there any way to view the RDD data based on the partition?*

For instance, the following RDD has 2 partitions:

val multi2s = List(2,4,6,8,10,12,14,16,18,20)
val multi2s_RDD = sc.parallelize(multi2s,2)

Is there any way to view the data based on the partitions (0, 1)?


Thanks & Regards,
Gokula Krishnan* (Gokul)*


Re: How to view the RDD data based on Partition

2016-01-12 Thread Gokula Krishnan D
Hello Prem -

Thanks for sharing. I also found a similar example at
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#aggregate,
but I am trying to understand the actual functionality and behavior.
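To spell out the behavior on the multi2s example from the original mail, a
minimal sketch (glom() is just an alternative view, not something from this
thread):

val multi2s_RDD = sc.parallelize(List(2,4,6,8,10,12,14,16,18,20), 2)

// tag every element with the index of the partition it lives in
multi2s_RDD.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
           .collect()
           .foreach(println)

// or collect each partition as its own array
multi2s_RDD.glom().collect().foreach(p => println(p.mkString(",")))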

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Tue, Jan 12, 2016 at 2:50 PM, Prem Sure <premsure...@gmail.com> wrote:

>  try mapPartitionsWithIndex .. below is an example I used earlier. myfunc
> logic can be further modified as per your need.
> val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
> def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
>   iter.toList.map(x => index + "," + x).iterator
> }
> x.mapPartitionsWithIndex(myfunc).collect()
> res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9)
>
> On Tue, Jan 12, 2016 at 2:06 PM, Gokula Krishnan D <email2...@gmail.com>
> wrote:
>
>> Hello All -
>>
>> I'm just trying to understand aggregate() and in the meantime got an
>> question.
>>
>> *Is there any way to view the RDD databased on the partition ?.*
>>
>> For the instance, the following RDD has 2 partitions
>>
>> val multi2s = List(2,4,6,8,10,12,14,16,18,20)
>> val multi2s_RDD = sc.parallelize(multi2s,2)
>>
>> is there anyway to view the data based on the partitions (0,1).
>>
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>
>


Re: Problem with WINDOW functions?

2015-12-30 Thread Gokula Krishnan D
Hello Vadim -

Alternatively, you can achieve this by using *window functions*, which are
available from 1.4.0.

*code_value.txt (Input)*
=
1000,200,Descr-200,01
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
1000,202,Descr-202,01
1000,202,Descr-202-old,02

*Expected Output(DataFrame):*
==
1000,200,Descr-200-new,02
1000,201,Descr-201,01
1000,202,Descr-202-new,03
==
*Code (Spark-Shell)*
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val sqlSC = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlSC.implicits._

case class data(batch_id:Int,code:String,descr:String,seq:Int)

val input_RDD = sc.textFile("Data/Projects/Spark/Input/code_Value.txt")

val data_RDD =
input_RDD.map(line=>line.split(",")).map(x=>data(x(0).toInt,x(1),x(2),x(3).toInt))
val data_DF  = data_RDD.toDF()
val winSpec =
Window.partitionBy(data_DF("code")).orderBy(data_DF("seq").desc)
{data_DF.select($"batch_id", $"code", $"descr", $"seq",
                rowNumber().over(winSpec).alias("rn"))
        .filter($"rn" <= 1)
        .select($"batch_id", $"code", $"descr", $"seq")
        .show}
==



Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Wed, Dec 30, 2015 at 11:35 AM, Vadim Tkachenko 
wrote:

> Davies,
>
> Thank you, I will wait on 1.6 release.
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tt25833.html
> ?
>
> On Wed, Dec 30, 2015 at 12:06 AM, Davies Liu 
> wrote:
>
>> Window functions are improved in 1.6 release, could you try 1.6-RC4
>> (or wait until next week for the final release)?
>>
>> Even In 1.6, the buffer of rows for window function does not support
>> spilling (also does not use memory efficiently), there is a JIRA for
>> it: https://issues.apache.org/jira/browse/SPARK-12295
>>
>> On Tue, Dec 29, 2015 at 5:28 PM, vadimtk  wrote:
>> > Hi,
>> >
>> > I can't successfully execute a query with WINDOW function.
>> >
>> > The statements are following:
>> >
>> > val orcFile =
>> >
>> sqlContext.read.parquet("/data/flash/spark/dat14sn").filter("upper(project)='EN'")
>> > orcFile.registerTempTable("d1")
>> >  sqlContext.sql("SELECT day,page,dense_rank() OVER (PARTITION BY day
>> ORDER
>> > BY pageviews DESC) as rank FROM d1").filter("rank <=
>> > 20").sort($"day",$"rank").collect().foreach(println)
>> >
>> > with default
>> > spark.driver.memory
>> >
>> > I am getting java.lang.OutOfMemoryError: Java heap space.
>> > The same if I set spark.driver.memory=10g.
>> >
>> > When I set spark.driver.memory=45g (the box has 256GB of RAM) the
>> execution
>> > fails with a different error:
>> >
>> > 15/12/29 23:03:19 WARN HeartbeatReceiver: Removing executor 0 with no
>> recent
>> > heartbeats: 129324 ms exceeds timeout 12 ms
>> >
>> > And I see that GC takes a lot of time.
>> >
>> > What is a proper way to execute statements above?
>> >
>> > I see the similar problems reported
>> >
>> http://stackoverflow.com/questions/32196859/org-apache-spark-shuffle-fetchfailedexception
>> >
>> http://stackoverflow.com/questions/32544478/spark-memory-settings-for-count-action-in-a-big-table
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-WINDOW-functions-tp25833.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>


Re: difference between ++ and Union of a RDD

2015-12-29 Thread Gokula Krishnan D
Ted, thanks for the update. Then it is the same case with sc.parallelize()
and sc.makeRDD(), right?
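A quick check in the shell (a minimal sketch) suggests so: both produce the
same RDD type, and makeRDD delegates to parallelize (it also has an extra
overload that takes preferred locations per element):

val a = sc.parallelize(1 to 5)
val b = sc.makeRDD(1 to 5)
println(a.getClass.getSimpleName)  // ParallelCollectionRDD
println(b.getClass.getSimpleName)  // ParallelCollectionRDD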

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Tue, Dec 29, 2015 at 1:43 PM, Ted Yu  wrote:

> From RDD.scala :
>
>   def ++(other: RDD[T]): RDD[T] = withScope {
> this.union(other)
>
> They should be the same.
>
> On Tue, Dec 29, 2015 at 10:41 AM, email2...@gmail.com  > wrote:
>
>> Hello All -
>>
>> tried couple of operations by using ++ and union on RDD's but realized
>> that
>> the end results are same. Do you know any differences?.
>>
>> val odd_partA  = List(1,3,5,7,9,11,1,3,5,7,9,11,1,3,5,7,9,11)
>> odd_partA: List[Int] = List(1, 3, 5, 7, 9, 11, 1, 3, 5, 7, 9, 11, 1, 3, 5,
>> 7, 9, 11)
>>
>> val odd_partB  = List(1,3,13,15,9)
>> odd_partB: List[Int] = List(1, 3, 13, 15, 9)
>>
>> val odd_partC  = List(15,9,1,3,13)
>> odd_partC: List[Int] = List(15, 9, 1, 3, 13)
>>
>> val odd_partA_RDD = sc.parallelize(odd_partA)
>> odd_partA_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at
>> parallelize at :17
>>
>> val odd_partB_RDD = sc.parallelize(odd_partB)
>> odd_partB_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10]
>> at
>> parallelize at :17
>>
>> val odd_partC_RDD = sc.parallelize(odd_partC)
>> odd_partC_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11]
>> at
>> parallelize at :17
>>
>> val odd_PARTAB_pp = odd_partA_RDD ++(odd_partB_RDD)
>> odd_PARTAB_pp: org.apache.spark.rdd.RDD[Int] = UnionRDD[12] at $plus$plus
>> at
>> :23
>>
>> val odd_PARTAB_union = odd_partA_RDD.union(odd_partB_RDD)
>> odd_PARTAB_union: org.apache.spark.rdd.RDD[Int] = UnionRDD[13] at union at
>> :23
>>
>> odd_PARTAB_pp.count
>> res8: Long = 23
>>
>> odd_PARTAB_union.count
>> res9: Long = 23
>>
>> val odd_PARTABC_pp = odd_partA_RDD ++(odd_partB_RDD) ++ (odd_partC_RDD)
>> odd_PARTABC_pp: org.apache.spark.rdd.RDD[Int] = UnionRDD[15] at $plus$plus
>> at :27
>>
>> val odd_PARTABC_union =
>> odd_partA_RDD.union(odd_partB_RDD).union(odd_partC_RDD)
>> odd_PARTABC_union: org.apache.spark.rdd.RDD[Int] = UnionRDD[17] at union
>> at
>> :27
>>
>> odd_PARTABC_pp.count
>> res10: Long = 28
>>
>> odd_PARTABC_union.count
>> res11: Long = 28
>>
>> Thanks
>> Gokul
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/difference-between-and-Union-of-a-RDD-tp25830.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to Parse & flatten JSON object in a text file using Spark & Scala into Dataframe

2015-12-23 Thread Gokula Krishnan D
You can try this, but I slightly modified the input structure since the
first two columns were not in JSON format.

[image: Inline image 1]
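Since the snippet above survives only as an inline image, here is a minimal
sketch of one way to do the flattening (the file name, and the assumption that
the record has been reshaped into one valid JSON object per line, are mine):

import org.apache.spark.sql.functions.explode
import sqlContext.implicits._

// e.g. {"employeeID":123456,"Name":"Employee1","ProjectDetails":[{"ProjectName":"...","Description":"...","Duration":"...","Role":"..."}]}
val df = sqlContext.read.json("employee.json")

// explode the array so each project becomes its own row, then pull out the nested fields
val flat = df
  .select($"employeeID", $"Name", explode($"ProjectDetails").as("p"))
  .select($"employeeID", $"Name",
          $"p.ProjectName", $"p.Description", $"p.Duration", $"p.Role")

flat.show(false)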

Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Wed, Dec 23, 2015 at 9:46 AM, Eran Witkon  wrote:

> Did you get a solution for this?
>
> On Tue, 22 Dec 2015 at 20:24 raja kbv  wrote:
>
>> Hi,
>>
>> I am new to spark.
>>
>> I have a text file with below structure.
>>
>>
>> (employeeID: Int, Name: String, ProjectDetails: JsonObject{[{ProjectName,
>> Description, Duriation, Role}]})
>> Eg:
>> (123456, Employee1, {“ProjectDetails”:[
>>  {
>> “ProjectName”: “Web Develoement”, “Description” : “Online Sales website”,
>> “Duration” : “6 Months” , “Role” : “Developer”}
>>  {
>> “ProjectName”: “Spark Develoement”, “Description” : “Online Sales
>> Analysis”, “Duration” : “6 Months” , “Role” : “Data Engineer”}
>>  {
>> “ProjectName”: “Scala Training”, “Description” : “Training”, “Duration” :
>> “1 Month” }
>>   ]
>> }
>>
>>
>> Could someone help me to parse & flatten the record as below dataframe
>> using scala?
>>
>> employeeID,Name, ProjectName, Description, Duration, Role
>> 123456, Employee1, Web Develoement, Online Sales website, 6 Months ,
>> Developer
>> 123456, Employee1, Spark Develoement, Online Sales Analysis, 6 Months,
>> Data Engineer
>> 123456, Employee1, Scala Training, Training, 1 Month, null
>>
>>
>> Thank you in advance.
>>
>> Regards,
>> Raja
>>
>


Database does not exist: (Spark-SQL ===> Hive)

2015-12-14 Thread Gokula Krishnan D
Hello All -


I tried to execute a Spark/Scala program in order to create a table in Hive
and faced a couple of errors, so I just tried to execute "show tables" and
"show databases".

I have already created a database named "test_db", but I encountered the
error "Database does not exist".

*Note: I do see a couple of posts related to this error, but nothing was
helpful for me.*


=
name := "ExploreSBT_V1"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies
++=Seq("org.apache.spark"%%"spark-core"%"1.3.0","org.apache.spark"%%"spark-sql"%"1.3.0")
libraryDependencies += "org.apache.spark"%%"spark-hive"%"1.3.0"
=
[image: Inline image 1]
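The snippet itself was attached as an inline image; a minimal sketch of what
the code likely resembled is below. Note that test_db is only visible if
hive-site.xml (pointing at the shared Hive metastore) is on the application's
classpath; otherwise HiveContext creates a local Derby metastore that has no
such database, which matches the error shown.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("ExploreSBT_V1").setMaster("local[*]")
val sc = new SparkContext(conf)
val hiveCtx = new HiveContext(sc)

hiveCtx.sql("show databases").collect().foreach(println)
hiveCtx.sql("use test_db")
hiveCtx.sql("show tables").collect().foreach(println)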

Error: Encountered the following exceptions
:org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution
Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Database
does not exist: test_db
15/12/14 18:49:57 ERROR HiveContext:
==
HIVE FAILURE OUTPUT
==




 OK
FAILED: Execution Error, return code 1 from
org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: test_db

==
END HIVE FAILURE OUTPUT
==


Process finished with exit code 0

Thanks & Regards,
Gokula Krishnan* (Gokul)*


Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Gokula Krishnan D
Hello Prashant -

Can you please try it like this?

For instance, the input file name is "student_detail.txt" and

ID,Name,Sex,Age
===
101,Alfred,Male,30
102,Benjamin,Male,31
103,Charlie,Female,30
104,Julie,Female,30
105,Maven,Male,30
106,Dexter,Male,30
107,Lundy,Male,32
108,Rita,Female,30
109,Aster,Female,30
110,Harrison,Male,15
111,Rita,,30
112,Aster,,30
113,Harrison,,15
114,Rita,Male,20
115,Aster,,30
116,Harrison,,20

[image: Inline image 2]

*Output:*

Total No.of Records without SEX 5
[111,Rita,,30]
[112,Aster,,30]
[113,Harrison,,15]
[115,Aster,,30]
[116,Harrison,,20]

Total No.of Records with AGE <=15 2
[110,Harrison,Male,15]
[113,Harrison,,15]

Thanks & Regards,
Gokula Krishnan* (Gokul)*
Contact :+1 980-298-1740

On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> Already tried it. But getting following error.
>
> overloaded method value filter with alternatives: (conditionExpr:
> String)org.apache.spark.sql.DataFrame  (condition:
> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
> applied to (Boolean)
>
> Also tried:
>
> val req_logs_with_dpid = 
> req_logs.filter(req_logs("req_info.dpid").toString.length
> != 0 )
>
> But getting same error.
>
>
> On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu 
> wrote:
>
>> val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") != "" )
>>
>> Azuryy Yu
>> Sr. Infrastructure Engineer
>>
>> cel: 158-0164-9103
>> wetchat: azuryy
>>
>>
>> On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
>> prashant2006s...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I have two columns in my json which can have null, empty and non-empty
>>> string as value.
>>> I know how to filter records which have non-null value using following:
>>>
>>> val req_logs = sqlContext.read.json(filePath)
>>>
>>> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
>>> req_info.dpid_sha1 is not null")
>>>
>>> But how to filter if value of column is empty string?
>>> --
>>> Regards
>>> Prashant
>>>
>>
>>
>
>
> --
> Regards
> Prashant
>


Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Gokula Krishnan D
OK, then you can slightly change it like this:

[image: Inline image 1]
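Since that change is also only visible as an inline image, a minimal sketch of
a non-empty filter for the req_logs DataFrame from the question (the exact way
the two columns are combined is an assumption):

// keep only rows where dpid or dpid_sha1 is present and non-empty
val req_logs_with_dpid = req_logs.filter(
  "(req_info.dpid is not null and req_info.dpid != '') or " +
  "(req_info.dpid_sha1 is not null and req_info.dpid_sha1 != '')")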

Thanks & Regards,
Gokula Krishnan* (Gokul)*


On Wed, Dec 9, 2015 at 11:09 AM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> I have to do opposite of what you're doing. I have to filter non-empty
> records.
>
> On Wed, Dec 9, 2015 at 9:33 PM, Gokula Krishnan D <email2...@gmail.com>
> wrote:
>
>> Hello Prashant -
>>
>> Can you please try like this :
>>
>> For the instance, input file name is "student_detail.txt" and
>>
>> ID,Name,Sex,Age
>> ===
>> 101,Alfred,Male,30
>> 102,Benjamin,Male,31
>> 103,Charlie,Female,30
>> 104,Julie,Female,30
>> 105,Maven,Male,30
>> 106,Dexter,Male,30
>> 107,Lundy,Male,32
>> 108,Rita,Female,30
>> 109,Aster,Female,30
>> 110,Harrison,Male,15
>> 111,Rita,,30
>> 112,Aster,,30
>> 113,Harrison,,15
>> 114,Rita,Male,20
>> 115,Aster,,30
>> 116,Harrison,,20
>>
>> [image: Inline image 2]
>>
>> *Output:*
>>
>> Total No.of Records without SEX 5
>> [111,Rita,,30]
>> [112,Aster,,30]
>> [113,Harrison,,15]
>> [115,Aster,,30]
>> [116,Harrison,,20]
>>
>> Total No.of Records with AGE <=15 2
>> [110,Harrison,Male,15]
>> [113,Harrison,,15]
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>> Contact :+1 980-298-1740
>>
>> On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
>> prashant2006s...@gmail.com> wrote:
>>
>>> Already tried it. But getting following error.
>>>
>>> overloaded method value filter with alternatives: (conditionExpr:
>>> String)org.apache.spark.sql.DataFrame  (condition:
>>> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
>>> applied to (Boolean)
>>>
>>> Also tried:
>>>
>>> val req_logs_with_dpid = 
>>> req_logs.filter(req_logs("req_info.dpid").toString.length
>>> != 0 )
>>>
>>> But getting same error.
>>>
>>>
>>> On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu <fengdo...@everstring.com>
>>> wrote:
>>>
>>>> val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") !=
>>>> "" )
>>>>
>>>> Azuryy Yu
>>>> Sr. Infrastructure Engineer
>>>>
>>>> cel: 158-0164-9103
>>>> wetchat: azuryy
>>>>
>>>>
>>>> On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
>>>> prashant2006s...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I have two columns in my json which can have null, empty and non-empty
>>>>> string as value.
>>>>> I know how to filter records which have non-null value using following:
>>>>>
>>>>> val req_logs = sqlContext.read.json(filePath)
>>>>>
>>>>> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
>>>>> req_info.dpid_sha1 is not null")
>>>>>
>>>>> But how to filter if value of column is empty string?
>>>>> --
>>>>> Regards
>>>>> Prashant
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Regards
>>> Prashant
>>>
>>
>>
>
>
> --
> Regards
> Prashant
>


Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Gokula Krishnan D
Please refer to the link below; DataFrameNaFunctions provides drop() to
remove rows that contain null values. Hope it also helps.

https://spark.apache.org/docs/1.5.2/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions



Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Wed, Dec 9, 2015 at 11:12 AM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> Ok, then you can slightly change like
>
> [image: Inline image 1]
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>
>
> On Wed, Dec 9, 2015 at 11:09 AM, Prashant Bhardwaj <
> prashant2006s...@gmail.com> wrote:
>
>> I have to do opposite of what you're doing. I have to filter non-empty
>> records.
>>
>> On Wed, Dec 9, 2015 at 9:33 PM, Gokula Krishnan D <email2...@gmail.com>
>> wrote:
>>
>>> Hello Prashant -
>>>
>>> Can you please try like this :
>>>
>>> For the instance, input file name is "student_detail.txt" and
>>>
>>> ID,Name,Sex,Age
>>> ===
>>> 101,Alfred,Male,30
>>> 102,Benjamin,Male,31
>>> 103,Charlie,Female,30
>>> 104,Julie,Female,30
>>> 105,Maven,Male,30
>>> 106,Dexter,Male,30
>>> 107,Lundy,Male,32
>>> 108,Rita,Female,30
>>> 109,Aster,Female,30
>>> 110,Harrison,Male,15
>>> 111,Rita,,30
>>> 112,Aster,,30
>>> 113,Harrison,,15
>>> 114,Rita,Male,20
>>> 115,Aster,,30
>>> 116,Harrison,,20
>>>
>>> [image: Inline image 2]
>>>
>>> *Output:*
>>>
>>> Total No.of Records without SEX 5
>>> [111,Rita,,30]
>>> [112,Aster,,30]
>>> [113,Harrison,,15]
>>> [115,Aster,,30]
>>> [116,Harrison,,20]
>>>
>>> Total No.of Records with AGE <=15 2
>>> [110,Harrison,Male,15]
>>> [113,Harrison,,15]
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan* (Gokul)*
>>> Contact :+1 980-298-1740
>>>
>>> On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
>>> prashant2006s...@gmail.com> wrote:
>>>
>>>> Already tried it. But getting following error.
>>>>
>>>> overloaded method value filter with alternatives: (conditionExpr:
>>>> String)org.apache.spark.sql.DataFrame  (condition:
>>>> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
>>>> applied to (Boolean)
>>>>
>>>> Also tried:
>>>>
>>>> val req_logs_with_dpid = 
>>>> req_logs.filter(req_logs("req_info.dpid").toString.length
>>>> != 0 )
>>>>
>>>> But getting same error.
>>>>
>>>>
>>>> On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu <fengdo...@everstring.com>
>>>> wrote:
>>>>
>>>>> val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") !=
>>>>> "" )
>>>>>
>>>>> Azuryy Yu
>>>>> Sr. Infrastructure Engineer
>>>>>
>>>>> cel: 158-0164-9103
>>>>> wetchat: azuryy
>>>>>
>>>>>
>>>>> On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
>>>>> prashant2006s...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I have two columns in my json which can have null, empty and
>>>>>> non-empty string as value.
>>>>>> I know how to filter records which have non-null value using
>>>>>> following:
>>>>>>
>>>>>> val req_logs = sqlContext.read.json(filePath)
>>>>>>
>>>>>> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null
>>>>>> or req_info.dpid_sha1 is not null")
>>>>>>
>>>>>> But how to filter if value of column is empty string?
>>>>>> --
>>>>>> Regards
>>>>>> Prashant
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards
>>>> Prashant
>>>>
>>>
>>>
>>
>>
>> --
>> Regards
>> Prashant
>>
>
>


How to get the list of available Transformations and actions for an RDD in Spark-Shell

2015-12-04 Thread Gokula Krishnan D
Hello All -

In spark-shell, when we press Tab after ., we can see the
possible list of transformations and actions.

But I am unable to see the full list. Is there any other way to get the rest
of the list? I'm mainly looking for sortByKey().

val sales_RDD = sc.textFile("Data/Scala/phone_sales.txt")
val sales_map = sales_RDD.map(sales=>{val x=sales.split(","); (x(0),x(1))})

The layout of phone_sales.txt is (Brand, #. of phones sold).

When I tab-complete on sales_map or sales_RDD, I can see only sortBy() but
not sortByKey().

By the way, I am using spark 1.3.0 with CDH 5.4

[image: Inline image 1]



Thanks
Gokul


Re: How to get the list of available Transformations and actions for an RDD in Spark-Shell

2015-12-04 Thread Gokula Krishnan D
Thanks, Ayan, for the update.

But in my example, "sales_map" is a pair RDD, isn't it?
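A quick check (a minimal sketch, using the same file as in my original mail):
the key-value operations come in through implicit conversions (sortByKey, for
example, lives in OrderedRDDFunctions), which is why tab completion on the
plain RDD does not list them even though the call compiles:

val sales_RDD = sc.textFile("Data/Scala/phone_sales.txt")
val sales_map = sales_RDD.map { sales => val x = sales.split(","); (x(0), x(1)) }

// sortByKey is available via the implicit conversion even though tab completion hides it
sales_map.sortByKey().collect().foreach(println)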

Thanks & Regards,
Gokula Krishnan* (Gokul)*


On Fri, Dec 4, 2015 at 8:16 AM, ayan guha <guha.a...@gmail.com> wrote:

> sortByKey() is a property of pairRDD as it requires key value pair to
> work. I think in scala their are transformation such as .toPairRDD().
>
> On Sat, Dec 5, 2015 at 12:01 AM, Gokula Krishnan D <email2...@gmail.com>
> wrote:
>
>> Hello All -
>>
>> In spark-shell when we press tab after . ; we could see the
>> possible list of transformations and actions.
>>
>> But unable to see all the list. is there any other way to get the rest of
>> the list. I'm mainly looking for sortByKey()
>>
>> val sales_RDD = sc.textFile("Data/Scala/phone_sales.txt")
>> val sales_map = sales_RDD.map(sales=>{val x=sales.split(",");
>> (x(0),x(1))})
>>
>> Layout of phone_sales.txt is (Brand, #.of Phones sold)
>>
>> I am mainly looking for SortByKey() but when I do Sales_map or sales_RDD,
>> I could see only sortBy() but not SortByKey().
>>
>> By the way, I am using spark 1.3.0 with CDH 5.4
>>
>> [image: Inline image 1]
>>
>>
>>
>> Thanks
>> Gokul
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>