Re: spark df schema to hive schema converter func

2016-08-06 Thread Sumit Khanna
Regarding https://issues.apache.org/jira/browse/SPARK-5236: how would I also
convert something of type DecimalType to int, string, etc.?

Thanks,

On Sun, Aug 7, 2016 at 10:33 AM, Sumit Khanna  wrote:

> Hi,
>
> was wondering if we have something that takes as an argument a Spark
> df type, e.g. DecimalType(12,5), and converts it into the corresponding Hive
> schema type: Double / Decimal / String?
>
> Any ideas.
>
> Thanks,
>


spark df schema to hive schema converter func

2016-08-06 Thread Sumit Khanna
Hi,

was wondering if we have something that takes as an argument a Spark
df type, e.g. DecimalType(12,5), and converts it into the corresponding Hive
schema type: Double / Decimal / String?

Any ideas.

Thanks,
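There doesn't appear to be a public built-in for this; a minimal hand-rolled converter sketch follows — the mappings and the `simpleString` fallback are assumptions to verify against Hive's DDL type grammar:

```scala
import org.apache.spark.sql.types._

// Sketch: map a Spark SQL DataType to a Hive DDL type string.
// The mappings below are assumptions -- check them against Hive's type grammar.
def toHiveType(dt: DataType): String = dt match {
  case d: DecimalType => s"DECIMAL(${d.precision},${d.scale})"
  case IntegerType    => "INT"
  case LongType       => "BIGINT"
  case DoubleType     => "DOUBLE"
  case StringType     => "STRING"
  case BooleanType    => "BOOLEAN"
  case TimestampType  => "TIMESTAMP"
  case other          => other.simpleString // fallback; may not be valid Hive DDL
}

// toHiveType(DecimalType(12, 5)) returns "DECIMAL(12,5)"
```

For the follow-up question (converting a DecimalType *column* to int/string rather than emitting DDL), a plain column cast such as `df.withColumn("c", col("c").cast("string"))` handles that direction.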


Re: Dropping late data in Structured Streaming

2016-08-06 Thread Matei Zaharia
Yes, a built-in mechanism is planned in future releases. You can also drop it 
using a filter for now but the stateful operators will still keep state for old 
windows.

Matei

> On Aug 6, 2016, at 9:40 AM, Amit Sela  wrote:
> 
> I've noticed that when using Structured Streaming with event-time windows 
> (fixed/sliding), all windows are retained. This is clearly how "late" data is 
> handled, but I was wondering if there is some pruning mechanism that I might 
> have missed ? or is this planned in future releases ?
> 
> Thanks,
> Amit
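Matei's interim suggestion — dropping late rows with a filter — might look like the sketch below, assuming `events` is a streaming Dataset with an `eventTime` timestamp column (both names are assumptions, as is the one-hour threshold):

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.expr

// Sketch: discard rows whose event time is more than an hour old before windowing.
// Note (per Matei): stateful operators still keep state for old windows.
def dropLate[T](events: Dataset[T]): Dataset[T] =
  events.where(expr("eventTime > current_timestamp() - INTERVAL 1 HOUR"))
```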


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe / Dataset partition size...

2016-08-06 Thread Muthu Jayakumar
Hello Dr Mich Talebzadeh,

>Can you kindly advise on your number of nodes, the cores for each node and
the RAM for each node.
I have a 32-node (1 executor per node currently) cluster. All of these have
512 GB of memory. Most of them have either 16 or 20 physical cores (without
HT enabled). HDFS is configured to run on another set of nodes (but
all are part of the same rack / subnet).

>Is this a parquet file?
Yes, it is a parquet directory.

>What I don't understand is why you end up with 220 files whereas your
partition says 25
I do have some kind of hack ;) in place that roughly sizes each file to the
block size of my HDFS so that the number of parts created is optimized
for HDFS storage. But I wanted to understand why it allocates a smaller
number of cores during a read cycle.

My current workaround for this problem is to run multiple parallel queries
of this kind :( (basically scala Future / fork-join magic). But this seems
incorrect.

I do have some parquet files that use about 9 partitions (though the part
files number 200).

Here is a sample code from Spark 2.0.0 shell that i tried...

case class Customer(number: Int)
import org.apache.spark.sql._
import spark.implicits._
val parquetFile = "hdfs://myip:port/tmp/dummy.parquet"
spark.createDataset(1 to 1).map(Customer).repartition(200).write.mode(SaveMode.Overwrite).parquet(parquetFile)

scala> spark.read.parquet(parquetFile).toJavaRDD.partitions.size()
res1: Int = 23

scala> spark.read.parquet(parquetFile).toJavaRDD.partitions.size()
res2: Int = 20

Can I suspect something with dynamic allocation perhaps?

Please advise,
Muthu
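One knob worth checking before suspecting dynamic allocation: in Spark 2.0 the file-scan partition count is driven by `spark.sql.files.maxPartitionBytes` (128 MB by default), which packs several small part files into one read partition — consistent with 200 × ~10 MB files coming back as ~20 partitions. A hedged sketch:

```scala
// Sketch: lower the per-partition byte cap so the scan produces more,
// smaller read partitions. 16 MB here is an arbitrary illustration value.
spark.conf.set("spark.sql.files.maxPartitionBytes", 16L * 1024 * 1024)
val df = spark.read.parquet("hdfs://myip:port/tmp/dummy.parquet")
println(df.rdd.partitions.length) // expect noticeably more than the ~20-25 observed above
```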


On Sat, Aug 6, 2016 at 3:23 PM, Mich Talebzadeh 
wrote:

> 720 cores. Wow, that is a hell of a lot of cores, Muthu :)
>
> Ok let us take a step back
>
> Can you kindly advise on your number of nodes, the cores for each node and
> the RAM for each node.
>
> What I don't understand is why you end up with 220 files whereas your
> partition says 25. Now you have 2.2GB in size, so each file only has
> 2.2GB/220 = 10MB. That is a lot of files for nothing. The app has to load
> each file.
> Is this a parquet file?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 August 2016 at 23:09, Muthu Jayakumar  wrote:
>
>> Hello Dr Mich Talebzadeh,
>>
>> Thank you for looking into my question. W.r.t
>> >However, in reality the number of partitions should not exceed the
>> total number of cores in your cluster?
>> I do have 720 cores available in a cluster for this to run. It does run
>> in dynamic provisioning.
>>
>> On a side note, I was expecting the partition count to match up to what
>> you have. But :( my numbers above now ask me to understand the APIs
>> better :).
>>
>> Please advise,
>> Muthu
>>
>>
>>
>> On Sat, Aug 6, 2016 at 1:54 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Muthu.
>>>
>>> Interesting question.
>>>
>>> I have the following:
>>>
>>> scala> val s = HiveContext.table("dummy_parquet").toJavaRDD.partitions.size()
>>> s: Int = 256
>>>
>>> and on HDFS it has
>>>
>>> hdfs dfs -ls  /user/hive/warehouse/oraclehadoop.db/dummy_parquet|wc -l
>>> 16/08/06 21:50:45 WARN util.NativeCodeLoader: Unable to load
>>> native-hadoop library for your platform... using builtin-java classes where
>>> applicable
>>>  257
>>>
>>> Which is somewhat consistent. Its size:
>>>
>>> hdfs dfs -du -h -s  /user/hive/warehouse/oraclehadoop.db/dummy_parquet
>>> 16/08/06 21:51:50 WARN util.NativeCodeLoader: Unable to load
>>> native-hadoop library for your platform... using builtin-java classes where
>>> applicable
>>> 5.9 G  /user/hive/warehouse/oraclehadoop.db/dummy_parquet
>>>
>>> nearly 6GB
>>>
>>> sc.defaultParallelism
>>> res6: Int = 1
>>>
>>>
>>> However, in reality the number of partitions should not exceed the total
>>> number of cores in your cluster?
>>>
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>

Re: Symbol HasInputCol is inaccesible from this place

2016-08-06 Thread Ted Yu
I searched *Suite.scala and found that only the following contains classes
extending Transformer:

./mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala

But HasInputCol is not used.

FYI

On Sat, Aug 6, 2016 at 11:01 AM, janardhan shetty 
wrote:

> Yes, it seems so. Wondering if this can be made public in order to develop
> custom transformers, or are there any other alternatives?
>
> On Sat, Aug 6, 2016 at 10:07 AM, Ted Yu  wrote:
>
>> Is it because HasInputCol is private ?
>>
>> private[ml] trait HasInputCol extends Params {
>>
>> On Thu, Aug 4, 2016 at 1:18 PM, janardhan shetty 
>> wrote:
>>
>>> Version : 2.0.0-preview
>>>
>>> import org.apache.spark.ml.param._
>>> import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
>>>
>>>
>>> class CustomTransformer(override val uid: String) extends Transformer
>>>   with HasInputCol with HasOutputCol with DefaultParamsWritable
>>>
>>> *Error in IntelliJ:*
>>> Symbol HasInputCol is inaccessible from this place
>>> (similarly for HasOutputCol and DefaultParamsWritable)
>>>
>>> Any thoughts on this error? It is not allowing the code to compile.
>>>
>>>
>>>
>>
>
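Until those traits become public, one workaround sketch is to declare equivalent `Param`s directly on the public `Transformer` API instead of mixing in the private `HasInputCol`/`HasOutputCol` (the transform logic below is a placeholder, and there is no public equivalent shown for `DefaultParamsWritable` persistence):

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class CustomTransformer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("customTransformer"))

  // Hand-rolled equivalents of the private HasInputCol / HasOutputCol traits.
  final val inputCol  = new Param[String](this, "inputCol", "input column name")
  final val outputCol = new Param[String](this, "outputCol", "output column name")
  def setInputCol(value: String): this.type  = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  // Placeholder logic: copy the input column to the output column.
  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn($(outputCol), dataset($(inputCol)))

  override def transformSchema(schema: StructType): StructType = schema // simplified

  override def copy(extra: ParamMap): CustomTransformer = defaultCopy(extra)
}
```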


Spark Application Counters Using Rest API

2016-08-06 Thread Muhammad Haris
Hi,
Could anybody please guide me on how to get application- or job-level counters
for CPU and memory in Spark 2.0.0 using the REST API?
I have explored the API's at
http://spark.apache.org/docs/latest/monitoring.html
but did not find anything similar to what MR provides, see the link below:
(
http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html#Job_Counters_API
)

Looking forward to your help.
Regards


Dataframe / Dataset partition size...

2016-08-06 Thread Muthu Jayakumar
Hello there,

I am trying to understand how I could improve (or increase) the parallelism
of tasks that run for a particular spark job.
Here is my observation...

scala> spark.read.parquet("hdfs://somefile").toJavaRDD.partitions.size()
25

> hadoop fs -ls hdfs://somefile | grep 'part-r' | wc -l
200

> hadoop fs -du -h -s hdfs://somefile
2.2 G

I notice that, depending on the repartition / coalesce setting, the number of
part files created in HDFS during the save operation varies accordingly.
Meaning, the number of part files can be tweaked with this parameter.

But how do I control 'partitions.size()'? Meaning, I want this
to be 200 (without having to repartition during the read operation), so
that more tasks would run for this job.
This has a major impact in terms of the time it takes to perform query
operations on this job.

On a side note, I do understand that 200 parquet part files for the above
2.2 G seems overkill for a 128 MB block size. Ideally it should be around 18
parts.

Please advise,
Muthu
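The "~18 parts" figure above falls out of simple arithmetic, assuming the reader packs files up to the default 128 MB cap (`spark.sql.files.maxPartitionBytes` in Spark 2.0):

```scala
// Back-of-envelope: total data size divided by the per-partition byte cap.
val totalBytes        = (2.2 * 1024 * 1024 * 1024).toLong // ~2.2 GB across 200 part files
val maxPartitionBytes = 128L * 1024 * 1024                // default cap in Spark 2.0
val estimatedPartitions = math.ceil(totalBytes.toDouble / maxPartitionBytes).toInt
// estimatedPartitions == 18
```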


Re: Explanation regarding Spark Streaming

2016-08-06 Thread Mich Talebzadeh
Thanks.

This is very confusing, as the thread owner's question does not specify
whether there are windowing operations or not.

Dr Mich Talebzadeh






On 6 August 2016 at 20:35, Mohammed Guller  wrote:

> According to the docs for Spark Streaming, the default for data received
> through receivers is MEMORY_AND_DISK_SER_2. If windowing operations are
> performed, RDDs are persisted with StorageLevel.MEMORY_ONLY_SER.
>
>
>
> http://spark.apache.org/docs/latest/streaming-programming-
> guide.html#data-serialization
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Saturday, August 6, 2016 12:25 PM
> *To:* Mohammed Guller
> *Cc:* Jacek Laskowski; Saurav Sinha; user
>
> *Subject:* Re: Explanation regarding Spark Streaming
>
>
>
> Hi,
>
>
>
> I think the default storage level is MEMORY_ONLY
>
>
>
> HTH
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
>
> On 6 August 2016 at 18:16, Mohammed Guller  wrote:
>
> Hi Jacek,
>
> Yes, I am assuming that data streams in consistently at the same rate (for
> example, 100MB/s).
>
>
>
> BTW, even if the persistence level for streaming data is set to
> MEMORY_AND_DISK_SER_2 (the default), once Spark runs out of memory, data
> will spill to  disk. That will make the application performance even worse.
>
>
>
> Mohammed
>
>
>
> *From:* Jacek Laskowski [mailto:ja...@japila.pl]
> *Sent:* Saturday, August 6, 2016 1:54 AM
> *To:* Mohammed Guller
> *Cc:* Saurav Sinha; user
> *Subject:* RE: Explanation regarding Spark Streaming
>
>
>
> Hi,
>
> Thanks for explanation, but it does not prove Spark will OOM at some
> point. You assume enough data to store but there could be none.
>
> Jacek
>
>
>
> On 6 Aug 2016 4:23 a.m., "Mohammed Guller"  wrote:
>
> Assume the batch interval is 10 seconds and batch processing time is 30
> seconds. So while Spark Streaming is processing the first batch, the
> receiver will have a backlog of 20 seconds worth of data. By the time Spark
> Streaming finishes batch #2, the receiver will have 40 seconds worth of
> data in memory buffer. This backlog will keep growing as time passes
> assuming data streams in consistently at the same rate.
>
> Also keep in mind that windowing operations on a DStream implicitly
> persist every RDD in a DStream in memory.
>
> Mohammed
>
> -Original Message-
> From: Jacek Laskowski [mailto:ja...@japila.pl]
> Sent: Thursday, August 4, 2016 4:25 PM
> To: Mohammed Guller
> Cc: Saurav Sinha; user
> Subject: Re: Explanation regarding Spark Streaming
>
> On Fri, Aug 5, 2016 at 12:48 AM, Mohammed Guller 
> wrote:
> > and eventually you will run out of memory.
>
> Why? Mind elaborating?
>
> Jacek
>
>
>


RE: Explanation regarding Spark Streaming

2016-08-06 Thread Mohammed Guller
According to the docs for Spark Streaming, the default for data received 
through receivers is MEMORY_AND_DISK_SER_2. If windowing operations are 
performed, RDDs are persisted with StorageLevel.MEMORY_ONLY_SER.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization

Mohammed
Author: Big Data Analytics with 
Spark

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Saturday, August 6, 2016 12:25 PM
To: Mohammed Guller
Cc: Jacek Laskowski; Saurav Sinha; user
Subject: Re: Explanation regarding Spark Streaming

Hi,

I think the default storage level is MEMORY_ONLY

HTH




Dr Mich Talebzadeh






On 6 August 2016 at 18:16, Mohammed Guller 
> wrote:
Hi Jacek,
Yes, I am assuming that data streams in consistently at the same rate (for 
example, 100MB/s).

BTW, even if the persistence level for streaming data is set to 
MEMORY_AND_DISK_SER_2 (the default), once Spark runs out of memory, data will 
spill to  disk. That will make the application performance even worse.

Mohammed

From: Jacek Laskowski [mailto:ja...@japila.pl]
Sent: Saturday, August 6, 2016 1:54 AM
To: Mohammed Guller
Cc: Saurav Sinha; user
Subject: RE: Explanation regarding Spark Streaming


Hi,

Thanks for explanation, but it does not prove Spark will OOM at some point. You 
assume enough data to store but there could be none.

Jacek

On 6 Aug 2016 4:23 a.m., "Mohammed Guller" 
> wrote:
Assume the batch interval is 10 seconds and batch processing time is 30 
seconds. So while Spark Streaming is processing the first batch, the receiver 
will have a backlog of 20 seconds worth of data. By the time Spark Streaming 
finishes batch #2, the receiver will have 40 seconds worth of data in memory 
buffer. This backlog will keep growing as time passes assuming data streams in 
consistently at the same rate.

Also keep in mind that windowing operations on a DStream implicitly persist 
every RDD in a DStream in memory.

Mohammed

-Original Message-
From: Jacek Laskowski [mailto:ja...@japila.pl]
Sent: Thursday, August 4, 2016 4:25 PM
To: Mohammed Guller
Cc: Saurav Sinha; user
Subject: Re: Explanation regarding Spark Streaming

On Fri, Aug 5, 2016 at 12:48 AM, Mohammed Guller 
> wrote:
> and eventually you will run out of memory.

Why? Mind elaborating?

Jacek



Re: Explanation regarding Spark Streaming

2016-08-06 Thread Mich Talebzadeh
Hi,

I think the default storage level is MEMORY_ONLY

HTH



Dr Mich Talebzadeh






On 6 August 2016 at 18:16, Mohammed Guller  wrote:

> Hi Jacek,
>
> Yes, I am assuming that data streams in consistently at the same rate (for
> example, 100MB/s).
>
>
>
> BTW, even if the persistence level for streaming data is set to
> MEMORY_AND_DISK_SER_2 (the default), once Spark runs out of memory, data
> will spill to  disk. That will make the application performance even worse.
>
>
>
> Mohammed
>
>
>
> *From:* Jacek Laskowski [mailto:ja...@japila.pl]
> *Sent:* Saturday, August 6, 2016 1:54 AM
> *To:* Mohammed Guller
> *Cc:* Saurav Sinha; user
> *Subject:* RE: Explanation regarding Spark Streaming
>
>
>
> Hi,
>
> Thanks for explanation, but it does not prove Spark will OOM at some
> point. You assume enough data to store but there could be none.
>
> Jacek
>
>
>
> On 6 Aug 2016 4:23 a.m., "Mohammed Guller"  wrote:
>
> Assume the batch interval is 10 seconds and batch processing time is 30
> seconds. So while Spark Streaming is processing the first batch, the
> receiver will have a backlog of 20 seconds worth of data. By the time Spark
> Streaming finishes batch #2, the receiver will have 40 seconds worth of
> data in memory buffer. This backlog will keep growing as time passes
> assuming data streams in consistently at the same rate.
>
> Also keep in mind that windowing operations on a DStream implicitly
> persist every RDD in a DStream in memory.
>
> Mohammed
>
> -Original Message-
> From: Jacek Laskowski [mailto:ja...@japila.pl]
> Sent: Thursday, August 4, 2016 4:25 PM
> To: Mohammed Guller
> Cc: Saurav Sinha; user
> Subject: Re: Explanation regarding Spark Streaming
>
> On Fri, Aug 5, 2016 at 12:48 AM, Mohammed Guller 
> wrote:
> > and eventually you will run out of memory.
>
> Why? Mind elaborating?
>
> Jacek
>


Re: Symbol HasInputCol is inaccesible from this place

2016-08-06 Thread janardhan shetty
Yes, it seems so. Wondering if this can be made public in order to develop
custom transformers, or are there any other alternatives?

On Sat, Aug 6, 2016 at 10:07 AM, Ted Yu  wrote:

> Is it because HasInputCol is private ?
>
> private[ml] trait HasInputCol extends Params {
>
> On Thu, Aug 4, 2016 at 1:18 PM, janardhan shetty 
> wrote:
>
>> Version : 2.0.0-preview
>>
>> import org.apache.spark.ml.param._
>> import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
>>
>>
>> class CustomTransformer(override val uid: String) extends Transformer
>>   with HasInputCol with HasOutputCol with DefaultParamsWritable
>>
>> *Error in IntelliJ:*
>> Symbol HasInputCol is inaccessible from this place
>> (similarly for HasOutputCol and DefaultParamsWritable)
>>
>> Any thoughts on this error? It is not allowing the code to compile.
>>
>>
>>
>


Long running tasks in stages

2016-08-06 Thread Deepak Sharma
I am doing a join over one dataframe and an empty dataframe.
The first dataframe has almost 50k records.
This operation never returns and runs indefinitely.
Is there any solution to get around this?

-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Help testing the Spark Extensions for the Apache Bahir 2.0.0 release

2016-08-06 Thread Luciano Resende
Apache Bahir is voting on its 2.0.0 release, based on Apache Spark 2.0.0.

https://www.mail-archive.com/dev@bahir.apache.org/msg00312.html

We appreciate any help reviewing/testing the release, which contains the
following Apache Spark extensions:

Akka DStream connector
MQTT DStream connector
Twitter DStream connector
ZeroMQ DStream connector

MQTT Structured Streaming

Thanks in advance

-- 
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


RE: Explanation regarding Spark Streaming

2016-08-06 Thread Mohammed Guller
Hi Jacek,
Yes, I am assuming that data streams in consistently at the same rate (for 
example, 100MB/s).

BTW, even if the persistence level for streaming data is set to 
MEMORY_AND_DISK_SER_2 (the default), once Spark runs out of memory, data will 
spill to  disk. That will make the application performance even worse.

Mohammed

From: Jacek Laskowski [mailto:ja...@japila.pl]
Sent: Saturday, August 6, 2016 1:54 AM
To: Mohammed Guller
Cc: Saurav Sinha; user
Subject: RE: Explanation regarding Spark Streaming


Hi,

Thanks for explanation, but it does not prove Spark will OOM at some point. You 
assume enough data to store but there could be none.

Jacek

On 6 Aug 2016 4:23 a.m., "Mohammed Guller" 
> wrote:
Assume the batch interval is 10 seconds and batch processing time is 30 
seconds. So while Spark Streaming is processing the first batch, the receiver 
will have a backlog of 20 seconds worth of data. By the time Spark Streaming 
finishes batch #2, the receiver will have 40 seconds worth of data in memory 
buffer. This backlog will keep growing as time passes assuming data streams in 
consistently at the same rate.

Also keep in mind that windowing operations on a DStream implicitly persist 
every RDD in a DStream in memory.

Mohammed

-Original Message-
From: Jacek Laskowski [mailto:ja...@japila.pl]
Sent: Thursday, August 4, 2016 4:25 PM
To: Mohammed Guller
Cc: Saurav Sinha; user
Subject: Re: Explanation regarding Spark Streaming

On Fri, Aug 5, 2016 at 12:48 AM, Mohammed Guller 
> wrote:
> and eventually you will run out of memory.

Why? Mind elaborating?

Jacek


Re: Symbol HasInputCol is inaccesible from this place

2016-08-06 Thread Ted Yu
Is it because HasInputCol is private ?

private[ml] trait HasInputCol extends Params {

On Thu, Aug 4, 2016 at 1:18 PM, janardhan shetty 
wrote:

> Version : 2.0.0-preview
>
> import org.apache.spark.ml.param._
> import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
>
>
> class CustomTransformer(override val uid: String) extends Transformer
>   with HasInputCol with HasOutputCol with DefaultParamsWritable
>
> *Error in IntelliJ:*
> Symbol HasInputCol is inaccessible from this place
> (similarly for HasOutputCol and DefaultParamsWritable)
>
> Any thoughts on this error? It is not allowing the code to compile.
>
>
>


Re: Symbol HasInputCol is inaccesible from this place

2016-08-06 Thread janardhan shetty
Any thoughts or suggestions on this error?

On Thu, Aug 4, 2016 at 1:18 PM, janardhan shetty 
wrote:

> Version : 2.0.0-preview
>
> import org.apache.spark.ml.param._
> import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
>
>
> class CustomTransformer(override val uid: String) extends Transformer
>   with HasInputCol with HasOutputCol with DefaultParamsWritable
>
> *Error in IntelliJ:*
> Symbol HasInputCol is inaccessible from this place
> (similarly for HasOutputCol and DefaultParamsWritable)
>
> Any thoughts on this error? It is not allowing the code to compile.
>
>
>


Re: Kmeans dataset initialization

2016-08-06 Thread Tony Lane
Can anyone suggest how I can initialize a KMeans structure directly from a
Dataset of Rows?

On Sat, Aug 6, 2016 at 1:03 AM, Tony Lane  wrote:

> I have all the data required for KMeans in a dataset in memory
>
> Standard approach to load this data from a file is
> spark.read().format("libsvm").load(filename)
>
> where the file has data in the format
> 0 1:0.0 2:0.0 3:0.0
>
>
> How do I do this from an in-memory dataset already present?
> Any suggestions ?
>
> -Tony
>
>
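A sketch with the Spark 2.0 `ml` API: build the `Vector` column KMeans expects directly from in-memory data, skipping libsvm entirely (the values are illustrative; "features" is KMeans' default input column name):

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors

// Wrap each feature vector in a Tuple1 so createDataFrame infers one column.
val df = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(0.0, 0.0, 0.0)),
  Tuple1(Vectors.dense(0.1, 0.1, 0.1)),
  Tuple1(Vectors.dense(9.0, 9.0, 9.0))
)).toDF("features")

val model = new KMeans().setK(2).setSeed(1L).fit(df)
model.clusterCenters.foreach(println)
```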


Dropping late data in Structured Streaming

2016-08-06 Thread Amit Sela
I've noticed that when using Structured Streaming with event-time windows
(fixed/sliding), all windows are retained. This is clearly how "late" data
is handled, but I was wondering if there is some pruning mechanism that I
might have missed? Or is this planned in future releases?

Thanks,
Amit


Re: Explanation regarding Spark Streaming

2016-08-06 Thread Mich Talebzadeh
The thread owner's question is:

"Q1. What will happen if a spark streaming job has batchDurationTime as 60
sec and the processing time of the complete pipeline is greater than 60 sec?"


This basically means that you will gradually build up a backlog, and
regardless of whether you are going to blow up the buffer or not, the data
analysis will have a serious flaw!

In example below I have a batch interval of 2 seconds streaming in 10,000
rows/events. The windows length = 4 sec (twice the batch interval) and
sliding window set at 2 sec. I have deliberately set the volume of
streaming in this case high.

As you can see from the Streaming graphs, there are serious issues here, with
an average of 213 events/sec and a scheduling delay of 5 seconds.

[image: Inline images 1]

Technically the app may not crash but its business value is practically
nil. If I was doing this for some form of complex event processing or fraud
detection, I would have to look for a new job :(

So monitor the processing to make sure that it is running smoothly and
ensure that there is no backlog. Also look at your memory usage. For
example, if you are receiving a single stream of 100MB/second, and you want
to do 60 second batches (window length), then you will need a buffer
of 100*60 MB = 6000MB or 6GB at the very least. Note that if you are using
a single receiver, then all the data is coming to a single Spark worker
machine, so each machine should be about that. Add to that other overheads
of running Spark, etc. Accordingly calculate the memory usage, then
double/triple the number to be on the safe side and monitor the processing.
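The buffer sizing above, written out as arithmetic (the figures come straight from the paragraph):

```scala
val rateMBperSec = 100 // single stream at 100 MB/s
val windowSec    = 60  // 60-second window length
val bufferMB     = rateMBperSec * windowSec // 6000 MB, i.e. ~6 GB minimum before Spark overheads
```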

HTH



Dr Mich Talebzadeh






On 6 August 2016 at 09:53, Jacek Laskowski  wrote:

> Hi,
>
> Thanks for explanation, but it does not prove Spark will OOM at some
> point. You assume enough data to store but there could be none.
>
> Jacek
>
> On 6 Aug 2016 4:23 a.m., "Mohammed Guller"  wrote:
>
>> Assume the batch interval is 10 seconds and batch processing time is 30
>> seconds. So while Spark Streaming is processing the first batch, the
>> receiver will have a backlog of 20 seconds worth of data. By the time Spark
>> Streaming finishes batch #2, the receiver will have 40 seconds worth of
>> data in memory buffer. This backlog will keep growing as time passes
>> assuming data streams in consistently at the same rate.
>>
>> Also keep in mind that windowing operations on a DStream implicitly
>> persist every RDD in a DStream in memory.
>>
>> Mohammed
>>
>> -Original Message-
>> From: Jacek Laskowski [mailto:ja...@japila.pl]
>> Sent: Thursday, August 4, 2016 4:25 PM
>> To: Mohammed Guller
>> Cc: Saurav Sinha; user
>> Subject: Re: Explanation regarding Spark Streaming
>>
>> On Fri, Aug 5, 2016 at 12:48 AM, Mohammed Guller 
>> wrote:
>> > and eventually you will run out of memory.
>>
>> Why? Mind elaborating?
>>
>> Jacek
>>
>


Re: submitting spark job with kerberized Hadoop issue

2016-08-06 Thread Wojciech Pituła
What I can say is that we successfully use Spark on YARN with a kerberized
cluster. One of my coworkers also tried using it in the same way as you
are (Spark standalone with a kerberized cluster) but, as far as I remember, he
didn't succeed. I may be wrong, because I was not personally involved in
this use case, but I think he concluded that every executor of a Spark
standalone cluster must also be kinit'd.
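If each standalone executor does indeed need its own Kerberos credentials, one approach sometimes tried is an explicit keytab login via Hadoop's UGI API — a sketch only (principal and keytab path are taken from the thread), and whether this actually suffices for standalone mode is exactly what is in question here:

```scala
import org.apache.hadoop.security.UserGroupInformation

// Sketch: log in from a keytab inside the JVM instead of relying on a
// ticket cache populated by `kinit` on one machine only.
UserGroupInformation.loginUserFromKeytab(
  "spark/hadoop-master@platalyticsrealm", // principal from the thread (assumption)
  "/etc/hadoop/conf/spark.keytab")        // keytab path from the thread (assumption)
```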

On Fri, 5 Aug 2016 at 15:54, Aneela Saleem wrote:

> Hi all,
>
> I'm trying to connect to Kerberized Hadoop cluster using spark job. I have
> kinit'd from command line. When i run the following job i.e.,
>
> *./bin/spark-submit --keytab /etc/hadoop/conf/spark.keytab --principal
> spark/hadoop-master@platalyticsrealm --class
> com.platalytics.example.spark.App --master spark://hadoop-master:7077
> /home/vm6/project-1-jar-with-dependencies.jar
> hdfs://hadoop-master:8020/text*
>
> I get the error:
>
> Caused by: java.io.IOException:
> org.apache.hadoop.security.AccessControlException: Client cannot
> authenticate via:[TOKEN, KERBEROS]
> at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:680)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>
> Following are the contents of *spark-defaults.conf* file:
>
> spark.master spark://hadoop-master:7077
> spark.eventLog.enabled   true
> spark.eventLog.dir   hdfs://hadoop-master:8020/spark/logs
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.yarn.access.namenodes hdfs://hadoop-master:8020/
> spark.yarn.security.tokens.hbase.enabled true
> spark.yarn.security.tokens.hive.enabled true
> spark.yarn.principal yarn/hadoop-master@platalyticsrealm
> spark.yarn.keytab /etc/hadoop/conf/yarn.keytab
>
>
> Also i have added following in *spark-env.sh* file:
>
> HOSTNAME=`hostname -f`
> export SPARK_HISTORY_OPTS="-Dspark.history.kerberos.enabled=true
> -Dspark.history.kerberos.principal=spark/${HOSTNAME}@platalyticsrealm
> -Dspark.history.kerberos.keytab=/etc/hadoop/conf/spark.keytab"
>
>
> Please guide me, how to trace the issue?
>
> Thanks
>
>


Re: submitting spark job with kerberized Hadoop issue

2016-08-06 Thread Jacek Laskowski
Hi Aneela,

I don't really know. I've never been using (or even toying with) Spark
Standalone to access a secured HDFS cluster. I however think the
settings won't work since they are for Spark on YARN (I would not be
surprised to know that it is not supported outside Spark on YARN).

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Aug 6, 2016 at 11:03 AM, Aneela Saleem  wrote:
> Hi Jacek!
>
> Thanks for your response. I am using Spark standalone. I have a secured Hadoop
> cluster. Can you please guide me on what to do if I want to access Hadoop in my
> Spark job?
>
> Thanks
>
> On Sat, Aug 6, 2016 at 12:34 AM, Jacek Laskowski  wrote:
>>
>> Just to make things clear...are you using Spark Standalone and Spark
>> on YARN-specific settings? I don't think it's gonna work.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Fri, Aug 5, 2016 at 3:54 PM, Aneela Saleem 
>> wrote:
>> > Hi all,
>> >
>> > I'm trying to connect to Kerberized Hadoop cluster using spark job. I
>> > have
>> > kinit'd from command line. When i run the following job i.e.,
>> >
>> > ./bin/spark-submit --keytab /etc/hadoop/conf/spark.keytab --principal
>> > spark/hadoop-master@platalyticsrealm --class
>> > com.platalytics.example.spark.App --master spark://hadoop-master:7077
>> > /home/vm6/project-1-jar-with-dependencies.jar
>> > hdfs://hadoop-master:8020/text
>> >
>> > I get the error:
>> >
>> > Caused by: java.io.IOException:
>> > org.apache.hadoop.security.AccessControlException: Client cannot
>> > authenticate via:[TOKEN, KERBEROS]
>> > at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:680)
>> > at java.security.AccessController.doPrivileged(Native Method)
>> > at javax.security.auth.Subject.doAs(Subject.java:415)
>> > at
>> >
>> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>> >
>> > Following are the contents of spark-defaults.conf file:
>> >
>> > spark.master spark://hadoop-master:7077
>> > spark.eventLog.enabled   true
>> > spark.eventLog.dir   hdfs://hadoop-master:8020/spark/logs
>> > spark.serializer
>> > org.apache.spark.serializer.KryoSerializer
>> > spark.yarn.access.namenodes hdfs://hadoop-master:8020/
>> > spark.yarn.security.tokens.hbase.enabled true
>> > spark.yarn.security.tokens.hive.enabled true
>> > spark.yarn.principal yarn/hadoop-master@platalyticsrealm
>> > spark.yarn.keytab /etc/hadoop/conf/yarn.keytab
>> >
>> >
>> > Also i have added following in spark-env.sh file:
>> >
>> > HOSTNAME=`hostname -f`
>> > export SPARK_HISTORY_OPTS="-Dspark.history.kerberos.enabled=true
>> > -Dspark.history.kerberos.principal=spark/${HOSTNAME}@platalyticsrealm
>> > -Dspark.history.kerberos.keytab=/etc/hadoop/conf/spark.keytab"
>> >
>> >
>> > Please guide me, how to trace the issue?
>> >
>> > Thanks
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: submitting spark job with kerberized Hadoop issue

2016-08-06 Thread Aneela Saleem
Hi Jacek!

Thanks for your response. I am using Spark Standalone. I have a secured
Hadoop cluster. Can you please guide me what to do if I want to access
Hadoop in my Spark job?

Thanks

On Sat, Aug 6, 2016 at 12:34 AM, Jacek Laskowski  wrote:

> Just to make things clear...are you using Spark Standalone and Spark
> on YARN-specific settings? I don't think it's gonna work.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Aug 5, 2016 at 3:54 PM, Aneela Saleem 
> wrote:
> > Hi all,
> >
> > I'm trying to connect to Kerberized Hadoop cluster using spark job. I
> have
> > kinit'd from command line. When i run the following job i.e.,
> >
> > ./bin/spark-submit --keytab /etc/hadoop/conf/spark.keytab --principal
> > spark/hadoop-master@platalyticsrealm --class
> > com.platalytics.example.spark.App --master spark://hadoop-master:7077
> > /home/vm6/project-1-jar-with-dependencies.jar
> hdfs://hadoop-master:8020/text
> >
> > I get the error:
> >
> > Caused by: java.io.IOException:
> > org.apache.hadoop.security.AccessControlException: Client cannot
> > authenticate via:[TOKEN, KERBEROS]
> > at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:680)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:415)
> > at
> > org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1628)
> >
> > Following are the contents of spark-defaults.conf file:
> >
> > spark.master spark://hadoop-master:7077
> > spark.eventLog.enabled   true
> > spark.eventLog.dir   hdfs://hadoop-master:8020/spark/logs
> > spark.serializer org.apache.spark.serializer.
> KryoSerializer
> > spark.yarn.access.namenodes hdfs://hadoop-master:8020/
> > spark.yarn.security.tokens.hbase.enabled true
> > spark.yarn.security.tokens.hive.enabled true
> > spark.yarn.principal yarn/hadoop-master@platalyticsrealm
> > spark.yarn.keytab /etc/hadoop/conf/yarn.keytab
> >
> >
> > Also i have added following in spark-env.sh file:
> >
> > HOSTNAME=`hostname -f`
> > export SPARK_HISTORY_OPTS="-Dspark.history.kerberos.enabled=true
> > -Dspark.history.kerberos.principal=spark/${HOSTNAME}@platalyticsrealm
> > -Dspark.history.kerberos.keytab=/etc/hadoop/conf/spark.keytab"
> >
> >
> > Please guide me, how to trace the issue?
> >
> > Thanks
> >
>


Re: Avoid Cartesian product in calculating a distance matrix?

2016-08-06 Thread Sonal Goyal
The general approach to the Cartesian problem is to first block or index
your rows so that similar items fall in the same bucket, and then join
within each bucket. Is that possible in your case?
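
A toy sketch of that blocking idea in plain Python; the bucket function
below is invented purely for illustration, since a real blocking key
depends on the data and the similarity measure (e.g. an LSH bucket or a
coarse feature value):

```python
from collections import defaultdict
from itertools import combinations

def block_pairs(items, bucket_fn):
    """Group items by a blocking key, then emit candidate pairs only
    within each bucket, avoiding the full cartesian product."""
    buckets = defaultdict(list)
    for item in items:
        buckets[bucket_fn(item)].append(item)
    for bucket in buckets.values():
        # combinations() also emits each unordered pair exactly once,
        # which replaces the "filter half the symmetric matrix" step
        yield from combinations(bucket, 2)

# Toy blocking key: integer part of the value (an assumption purely
# for illustration)
data = [1.1, 1.2, 1.9, 5.0, 5.3, 9.7]
pairs = list(block_pairs(data, bucket_fn=lambda x: int(x)))
# 6 items would give 15 unordered pairs in the full product;
# blocking keeps only the within-bucket ones
print(len(pairs))  # -> 4
```

Only within-bucket pairs survive, so the pair count drops from O(n^2) to
roughly the sum of squared bucket sizes; the trade-off is that cross-bucket
pairs are never compared, so the blocking key must keep similar items
together.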

On Friday, August 5, 2016, Paschalis Veskos  wrote:

> Hello everyone,
>
> I am interested in running an application on Spark that at some point
> needs to compare all elements of an RDD against all others to create a
> distance matrix. The RDD is of type  and the Pearson
> correlation is applied to each element against all others, generating
> a matrix with the distance between all possible combinations of
> elements.
>
> I have implemented this by taking the cartesian product of the RDD
> with itself, filtering half the matrix away since it is symmetric,
> then doing a combineByKey to get all other elements that it needs to
> be compared with. I map the output of this over the comparison
> function implementing the Pearson correlation.
>
> You can probably guess this is dead slow. I use Spark 1.6.2, the code
> is written in Java 8. At the rate it is processing in a cluster with 4
> nodes with 16cores and 56gb ram each, for a list with ~15000 elements
> split in 512 partitions, the cartesian operation alone is estimated to
> take about 3000 hours (all cores are maxed out on all machines)!
>
> Is there a way to avoid the cartesian product to calculate what I
> want? Would a DataFrame join be faster? Or is this an operation that
> just requires a much larger cluster?
>
> Thank you,
>
> Paschalis
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>
>

-- 
Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015





Re: Avoid Cartesian product in calculating a distance matrix?

2016-08-06 Thread Yann-Aël Le Borgne
Hi

I also experienced very slow computation times for the cartesian product,
and could not find an efficient way to do this apart from doing my own
implementation. I used the 'balanced' cluster algorithm described here
http://www.ncbi.nlm.nih.gov/pmc/articles/PMC4246436/

I'd be interested to know if there is a more direct way to do this.

Cheers
Yann

On Fri, Aug 5, 2016 at 7:20 PM, Paschalis Veskos  wrote:

> Hello everyone,
>
> I am interested in running an application on Spark that at some point
> needs to compare all elements of an RDD against all others to create a
> distance matrix. The RDD is of type  and the Pearson
> correlation is applied to each element against all others, generating
> a matrix with the distance between all possible combinations of
> elements.
>
> I have implemented this by taking the cartesian product of the RDD
> with itself, filtering half the matrix away since it is symmetric,
> then doing a combineByKey to get all other elements that it needs to
> be compared with. I map the output of this over the comparison
> function implementing the Pearson correlation.
>
> You can probably guess this is dead slow. I use Spark 1.6.2, the code
> is written in Java 8. At the rate it is processing in a cluster with 4
> nodes with 16cores and 56gb ram each, for a list with ~15000 elements
> split in 512 partitions, the cartesian operation alone is estimated to
> take about 3000 hours (all cores are maxed out on all machines)!
>
> Is there a way to avoid the cartesian product to calculate what I
> want? Would a DataFrame join be faster? Or is this an operation that
> just requires a much larger cluster?
>
> Thank you,
>
> Paschalis
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
=
Yann-Aël Le Borgne
Machine Learning Group
Université Libre de Bruxelles

http://mlg.ulb.ac.be
http://www.ulb.ac.be/di/map/yleborgn
=


RE: Explanation regarding Spark Streaming

2016-08-06 Thread Jacek Laskowski
Hi,

Thanks for the explanation, but it does not prove Spark will OOM at some
point. You assume there will be enough incoming data to buffer, but there
could be none.

Jacek

On 6 Aug 2016 4:23 a.m., "Mohammed Guller"  wrote:

> Assume the batch interval is 10 seconds and batch processing time is 30
> seconds. So while Spark Streaming is processing the first batch, the
> receiver will have a backlog of 20 seconds worth of data. By the time Spark
> Streaming finishes batch #2, the receiver will have 40 seconds worth of
> data in memory buffer. This backlog will keep growing as time passes
> assuming data streams in consistently at the same rate.
>
> Also keep in mind that windowing operations on a DStream implicitly
> persist every RDD in a DStream in memory.
>
> Mohammed
>
> -Original Message-
> From: Jacek Laskowski [mailto:ja...@japila.pl]
> Sent: Thursday, August 4, 2016 4:25 PM
> To: Mohammed Guller
> Cc: Saurav Sinha; user
> Subject: Re: Explanation regarding Spark Streaming
>
> On Fri, Aug 5, 2016 at 12:48 AM, Mohammed Guller 
> wrote:
> > and eventually you will run out of memory.
>
> Why? Mind elaborating?
>
> Jacek
>
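
To make the arithmetic above concrete, here is a toy model in plain
Python; it assumes a constant ingest rate and a constant per-batch
processing time, as in Mohammed's example:

```python
def backlog_after(batches_processed, batch_interval_s, processing_time_s):
    """Seconds of unprocessed data buffered at the receiver after the
    given number of batches, assuming a constant ingest rate."""
    elapsed = batches_processed * processing_time_s   # wall-clock time spent
    consumed = batches_processed * batch_interval_s   # data actually processed
    return elapsed - consumed

# batch interval 10s, processing time 30s (the numbers from the thread)
print(backlog_after(1, 10, 30))  # -> 20 (backlog after batch #1)
print(backlog_after(2, 10, 30))  # -> 40 (backlog after batch #2)
```

The backlog grows linearly only while data keeps arriving at the assumed
rate, which is exactly Jacek's objection: with no incoming data, nothing
accumulates.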


mapWithState handle timeout

2016-08-06 Thread 李剑
I got an error:
Cannot update the state that is timing out

Because I set the timeout:
 val newStateDstream =
newActionDstream.mapWithState(StateSpec.function(mappingFunc).timeout(Seconds(3600)).initialState(initialRDD))

In the Spark code:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/State.scala
the doc for the mapping function shows:

/**
 * :: Experimental ::
 * Abstract class for getting and updating the state in mapping function
used in the `mapWithState`
 * operation of a [[org.apache.spark.streaming.dstream.PairDStreamFunctions
pair DStream]] (Scala)
 * or a [[org.apache.spark.streaming.api.java.JavaPairDStream
JavaPairDStream]] (Java).
 *
 * Scala example of using `State`:
 * {{{
 *// A mapping function that maintains an integer state and returns a
String
 *def mappingFunction(key: String, value: Option[Int], state:
State[Int]): Option[String] = {
 *  // Check if state exists
 *  if (state.exists) {
 *val existingState = state.get  // Get the existing state
 *val shouldRemove = ... // Decide whether to remove the
state
 *if (shouldRemove) {
 *  state.remove() // Remove the state
 *} else {
 *  val newState = ...
 *  state.update(newState)// Set the new state
 *}
 *  } else {
 *val initialState = ...
 *state.update(initialState)  // Set the initial state
 *  }
 *  ... // return something
 *}
 *
 * }}}


update() will throw an exception in the batch in which the state times out:

 /**
   * Update the state with a new value.
   *
   * State cannot be updated if it has been already removed (that is,
`remove()` has already been
   * called) or it is going to be removed due to timeout (that is,
`isTimingOut()` is `true`).
   *
   * @throws java.lang.IllegalArgumentException If the state has already
been removed, or is
   *going to be removed
   */
  def update(newState: S): Unit


I wonder how to handle the timeout in mappingFunc without losing the
current batch's data?
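
One commonly suggested pattern is to branch on State.isTimingOut() inside
the mapping function and skip update() in that final invocation: when a
key expires, Spark calls the mapping function one last time with an empty
value; the old state is still readable via get(), but update()/remove()
are disallowed. Below is a plain-Python mock of that guard; the State
class here is a hand-written stand-in for illustration, not Spark's
actual implementation:

```python
class State:
    """Toy stand-in for org.apache.spark.streaming.State, written
    only to illustrate the isTimingOut() guard."""
    def __init__(self, value=None, timing_out=False):
        self._value = value
        self._timing_out = timing_out

    def exists(self):
        return self._value is not None

    def get(self):
        return self._value

    def is_timing_out(self):
        return self._timing_out

    def update(self, new_value):
        if self._timing_out:
            raise ValueError("Cannot update the state that is timing out")
        self._value = new_value

def mapping_func(key, value, state):
    """Guard state.update() behind is_timing_out(), mirroring the
    recommended pattern for mapWithState."""
    if state.is_timing_out():
        # Final invocation for this key: value is empty, state.get() is
        # still readable, and update()/remove() must not be called.
        return (key, "expired", state.get())
    total = (state.get() or 0) + (value or 0)
    state.update(total)
    return (key, "active", total)

print(mapping_func("a", 5, State(value=10)))                      # -> ('a', 'active', 15)
print(mapping_func("a", None, State(value=15, timing_out=True)))  # -> ('a', 'expired', 15)
```

Note that in the timing-out invocation there is no new value for the key;
only previously stored state can be recovered, which is why the sketch
emits state.get() from that branch instead of calling update().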

-- 
http://www.cnblogs.com/hustlijian/
https://github.com/hustlijian


Re: mapWithState handle timeout

2016-08-06 Thread jackerli
any ideas?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mapWithState-handle-timeout-tp27422p27489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org