Re: Re: how to extract arraytype data to file

2016-10-18 Thread lk_spark
Thank you, all of you. explode() is helpful:

df.selectExpr("explode(bizs) as e").select("e.*").show()
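
A minimal end-to-end sketch of that approach (the input/output paths are placeholders; the column names follow the schema quoted below):

val df = sqlContext.read.json("path/to/bizs.json")           // root |-- bizs: array of struct<bid, code>
val flat = df.selectExpr("explode(bizs) as e").select("e.bid", "e.code")
flat.show()                                                  // one row per array element
flat.write.json("path/to/output")                            // persist the flattened rows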

2016-10-19 

lk_spark 



From: Hyukjin Kwon 
Sent: 2016-10-19 13:16
Subject: Re: how to extract arraytype data to file
To: "Divya Gehlot"
Cc: "lk_spark", "user.spark"

This reminds me of 
https://github.com/databricks/spark-xml/issues/141#issuecomment-234835577

Maybe using explode() would be helpful.


Thanks!


2016-10-19 14:05 GMT+09:00 Divya Gehlot :

http://stackoverflow.com/questions/33864389/how-can-i-create-a-spark-dataframe-from-a-nested-array-of-struct-element



Hope this helps 




Thanks,
Divya 


On 19 October 2016 at 11:35, lk_spark  wrote:

Hi all:
I want to read a JSON file and query it with SQL.
The data structure should be:
bid: string (nullable = true)
code: string (nullable = true)
and the JSON file data should look like:
 {"bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
 {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
but in fact my JSON file data is:
{"bizs":[{"bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
{"bizs":[{"bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]}

I load it by spark ,data schema shows like this :
root
 |-- bizs: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- bid: string (nullable = true)
 |||-- code: string (nullable = true)

I can select columns with: df.select("bizs.id","bizs.name")
but the column values are arrays:
+++
|  id|code|
+++
|[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
|[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
|[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
|[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
|[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
+++

What I want is to read the columns as normal rows. How can I do that?

2016-10-19


lk_spark 

Re: Deep learning libraries for scala

2016-10-18 Thread Edward Fine
How about https://deeplearning4j.org/ ?


On Wed, Oct 5, 2016 at 9:25 AM janardhan shetty 
wrote:

> Any help from the experts regarding this is appreciated
> On Oct 3, 2016 1:45 PM, "janardhan shetty"  wrote:
>
> Thanks Ben. The current spark ML package has feed forward multilayer
> perceptron algorithm as well and just wondering how different is your
> implementation ?
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>
> On Mon, Oct 3, 2016 at 1:40 PM, Benjamin Kim  wrote:
>
> I got this email a while back in regards to this.
>
> Dear Spark users and developers,
>
> I have released version 1.0.0 of scalable-deeplearning package. This
> package is based on the implementation of artificial neural networks in
> Spark ML. It is intended for new Spark deep learning features that were not
> yet merged to Spark ML or that are too specific to be merged. The package
> provides ML pipeline API, distributed training, optimized numerical
> processing with tensor library, and extensible API for developers. Current
> features are the multilayer perceptron classifier and stacked autoencoder.
>
> As a Spark package:
> https://spark-packages.org/package/avulanov/scalable-deeplearning
>
> The source code: https://github.com/avulanov/scalable-deeplearning
>
> Contributions are very welcome! Please, let me know if you have any
> comment or questions.
>
>
> Hope this helps.
>
> Cheers,
> Ben
>
> On Oct 3, 2016, at 12:05 PM, janardhan shetty 
> wrote:
>
> Any leads in this regard ?
>
> On Sat, Oct 1, 2016 at 1:48 PM, janardhan shetty 
> wrote:
>
> Apparently there are no Neural network implementations in tensorframes
> which we can use right ? or Am I missing something here.
>
> I would like to apply neural networks for an NLP settting is there are any
> implementations which can be looked into ?
>
> On Fri, Sep 30, 2016 at 8:14 PM, Suresh Thalamati <
> suresh.thalam...@gmail.com> wrote:
>
> Tensor frames
>
> https://spark-packages.org/package/databricks/tensorframes
>
> Hope that helps
> -suresh
>
> On Sep 30, 2016, at 8:00 PM, janardhan shetty 
> wrote:
>
> Looking for scala dataframes in particular ?
>
> On Fri, Sep 30, 2016 at 7:46 PM, Gavin Yue  wrote:
>
> Skymind you could try. It is java
>
> I never test though.
>
> > On Sep 30, 2016, at 7:30 PM, janardhan shetty 
> wrote:
> >
> > Hi,
> >
> > Are there any good libraries which can be used for scala deep learning
> models ?
> > How can we integrate tensorflow with scala ML ?
>
>
>
>
>
>
>
>


Re: how to extract arraytype data to file

2016-10-18 Thread Hyukjin Kwon
This reminds me of
https://github.com/databricks/spark-xml/issues/141#issuecomment-234835577

Maybe using explode() would be helpful.

Thanks!

2016-10-19 14:05 GMT+09:00 Divya Gehlot :

> http://stackoverflow.com/questions/33864389/how-can-i-
> create-a-spark-dataframe-from-a-nested-array-of-struct-element
>
> Hope this helps
>
>
> Thanks,
> Divya
>
> On 19 October 2016 at 11:35, lk_spark  wrote:
>
>> hi,all:
>> I want to read a json file and search it by sql .
>> the data struct should be :
>>
>> bid: string (nullable = true)
>> code: string (nullable = true)
>>
>> and the json file data should be like :
>>  {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
>>  {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
>> but in fact my json file data is :
>> {"bizs":[ {bid":"MzI4MTI5MzcyNw==","code
>> ":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
>> {"bizs":[ {bid":"MzI4MTI5Mzcy00==","code
>> ":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]}
>> I load it by spark ,data schema shows like this :
>>
>> root
>>  |-- bizs: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- bid: string (nullable = true)
>>  |||-- code: string (nullable = true)
>>
>>
>> I can select columns by : df.select("bizs.id","bizs.name")
>> but the colume values is in array type:
>> +++
>> |  id|code|
>> +++
>> |[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
>> |[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
>> |[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
>> |[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
>> |[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
>> +++
>>
>> what I want is I can read colum in normal row type. how I can do it ?
>> 2016-10-19
>> --
>> lk_spark
>>
>
>


Re: how to extract arraytype data to file

2016-10-18 Thread Divya Gehlot
http://stackoverflow.com/questions/33864389/how-can-i-create-a-spark-dataframe-from-a-nested-array-of-struct-element

Hope this helps


Thanks,
Divya

On 19 October 2016 at 11:35, lk_spark  wrote:

> hi,all:
> I want to read a json file and search it by sql .
> the data struct should be :
>
> bid: string (nullable = true)
> code: string (nullable = true)
>
> and the json file data should be like :
>  {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
>  {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
> but in fact my json file data is :
> {"bizs":[ {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"
> MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
> {"bizs":[ {bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"
> MzI3MzQ5Nzc201==","code":"西早君"}]}
> I load it by spark ,data schema shows like this :
>
> root
>  |-- bizs: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- bid: string (nullable = true)
>  |||-- code: string (nullable = true)
>
>
> I can select columns by : df.select("bizs.id","bizs.name")
> but the colume values is in array type:
> +++
> |  id|code|
> +++
> |[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
> |[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
> |[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
> |[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
> |[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
> +++
>
> what I want is I can read colum in normal row type. how I can do it ?
> 2016-10-19
> --
> lk_spark
>


RE: how to extract arraytype data to file

2016-10-18 Thread Kappaganthu, Sivaram (ES)
There is an option called Explode for this .

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, October 19, 2016 9:06 AM
To: user.spark
Subject: how to extract arraytype data to file

hi,all:
I want to read a json file and search it by sql .
the data struct should be :
bid: string (nullable = true)
code: string (nullable = true)
and the json file data should be like :
 {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
 {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
but in fact my json file data is :
{"bizs":[ 
{bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
{"bizs":[ 
{bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]}
I load it by spark ,data schema shows like this :
root
 |-- bizs: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- bid: string (nullable = true)
 |||-- code: string (nullable = true)

I can select columns by : df.select("bizs.id","bizs.name")
but the colume values is in array type:
+++
|  id|code|
+++
|[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
|[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
|[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
|[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
|[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
+++

what I want is I can read colum in normal row type. how I can do it ?
2016-10-19

lk_spark

--
This message and any attachments are intended only for the use of the addressee 
and may contain information that is privileged and confidential. If the reader 
of the message is not the intended recipient or an authorized representative of 
the intended recipient, you are hereby notified that any dissemination of this 
communication is strictly prohibited. If you have received this communication 
in error, notify the sender immediately by return email and delete the message 
and any attachments from your system.


how to extract arraytype data to file

2016-10-18 Thread lk_spark
Hi all:
I want to read a JSON file and query it with SQL.
The data structure should be:
bid: string (nullable = true)
code: string (nullable = true)
and the JSON file data should look like:
 {"bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
 {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
but in fact my JSON file data is:
{"bizs":[{"bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
{"bizs":[{"bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]}

I load it by spark ,data schema shows like this :
root
 |-- bizs: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- bid: string (nullable = true)
 |||-- code: string (nullable = true)

I can select columns with: df.select("bizs.id","bizs.name")
but the column values are arrays:
+++
|  id|code|
+++
|[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
|[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
|[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
|[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
|[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
+++

What I want is to read the columns as normal rows. How can I do that?

2016-10-19


lk_spark 

hive.exec.stagingdir not effect in spark2.0.1

2016-10-18 Thread 谭 成灶

Hi,
I have set the property "hive.exec.stagingdir" to the HDFS directory
"/tmp/spark_log/${user.name}/.hive-staging" in hive-site.xml, but it has no effect
in Spark 2.0.1: the staging directories are still created inside the table locations.
It works in Spark 1.6, which creates the hive-staging files under
"/tmp/spark_log/${user.name}/.hive-staging", so I can use a shell script to delete
them in batch.





How does Spark determine in-memory partition count when reading Parquet ~files?

2016-10-18 Thread shea.parkes
When reading a parquet ~file with >50 parts, Spark is giving me a DataFrame
object with far fewer in-memory partitions.

I'm happy to troubleshoot this further, but I don't know Scala well and
could use some help pointing me in the right direction.  Where should I be
looking in the code base to understand how many partitions will result from
reading a parquet ~file?

Thanks,

Shea
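
For anyone digging into the same question, a small sketch (assuming Spark 2.0; the path is a placeholder) of how to observe the partition count and the main knob that influences it for file sources; if memory serves, the splitting logic itself lives around FileSourceStrategy / FileScanRDD in the sql/core module:

val df = spark.read.parquet("/path/to/parquet-dir")
println(df.rdd.getNumPartitions)                        // in-memory partitions actually created

// A smaller target split size generally yields more partitions on re-read.
spark.conf.set("spark.sql.files.maxPartitionBytes", 32 * 1024 * 1024)
val df2 = spark.read.parquet("/path/to/parquet-dir")
println(df2.rdd.getNumPartitions)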



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-Spark-determine-in-memory-partition-count-when-reading-Parquet-files-tp27921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Broadcasting Non Serializable Objects

2016-10-18 Thread Daniel Imberman
Hi Pedro,

Can you please post your code?

Daniel

On Tue, Oct 18, 2016 at 12:27 PM pedroT  wrote:

> Hi guys.
>
> I know this is a well known topic, but reading about (a lot) I'm not sure
> about the answer..
>
> I need to broadcast a complex estructure with a lot of objects as fields,
> including some of external libraries which I can't happily turn in
> serializable ones.
>
> I tried making a static method returning the data, following this post (I
> use Java API):
>
> https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/
> but the same serializable exception arise.
>
> Is there a way to achieve this?
>
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-Non-Serializable-Objects-tp27919.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Why the json file used by sparkSession.read.json must be a valid json object per line

2016-10-18 Thread Hyukjin Kwon
Regarding his recent PR [1], I guess he meant multiple-line JSON.

As far as I know, single-line JSON also complies with the standard. I left a
comment with the RFC in the PR, but please let me know if I am wrong at any
point.

Thanks!

[1]https://github.com/apache/spark/pull/15511
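
For reference, a common workaround sketch for a multiple-line ("pretty-printed") JSON file, assuming each input file is a single JSON document small enough to hold in memory:

val raw = sc.wholeTextFiles("path/to/pretty-printed/*.json").values  // RDD[String], one element per file
val df  = sqlContext.read.json(raw)                                  // parse each whole-file string as JSON
df.printSchema()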

On 19 Oct 2016 7:00 a.m., "Daniel Barclay" 
wrote:

> Koert,
>
> Koert Kuipers wrote:
>
> A single json object would mean for most parsers it needs to fit in memory
> when reading or writing
>
> Note that codlife didn't seem to be asking about *single-object* JSON
> files, but about *standard-format* JSON files.
>
>
> On Oct 15, 2016 11:09, "codlife" <1004910...@qq.com> wrote:
>
>> Hi:
>> I have doubts about the design of spark.read.json: why isn't the JSON
>> file a standard JSON file? Can anyone tell me the internal reason? Any advice is
>> appreciated.
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Why-the-json-file-used-by-sparkSession
>> -read-json-must-be-a-valid-json-object-per-line-tp27907.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: spark with kerberos

2016-10-18 Thread Michael Segel
(Sorry sent reply via wrong account.. )

Steve,

Kinda hijacking the thread, but I promise it's still on topic to the OP's issue.. ;-)

Usually you will end up having a local Kerberos set up per cluster.
So your machine accounts (hive, yarn, hbase, etc …) are going to be local  to 
the cluster.

So you will have to set up some sort of realm trusts between the clusters.

If you’re going to be setting up security (Kerberos … ick! shivers… ;-) you’re 
going to want to keep the machine accounts isolated to the cluster.
And the OP said that he didn’t control the other cluster which makes me believe 
that they are separate.


I would also think that you would have trouble with the credential… isn't it
tied to a user at a specific machine?
(It's been a while since I looked at this, and I drank heavily to forget
Kerberos… so I may be a bit fuzzy here.)

Thx

-Mike
On Oct 18, 2016, at 2:59 PM, Steve Loughran 
> wrote:


On 17 Oct 2016, at 22:11, Michael Segel 
> wrote:

@Steve you are going to have to explain what you mean by ‘turn Kerberos on’.

Taken one way… it could mean making cluster B secure and running Kerberos and 
then you’d have to create some sort of trust between B and C,



I'd imagined making cluster B a kerberized cluster.

I don't think you need to go near trust relations though —ideally you'd just 
want the same accounts everywhere if you can, if not, the main thing is that 
the user submitting the job can get a credential for  that far NN at job 
submission time, and that credential is propagated all the way to the executors.


Did you mean turn on kerberos on the nodes in Cluster B so that each node 
becomes a trusted client that can connect to C

OR

Did you mean to turn on kerberos on the master node (eg edge node) where the 
data persists if you collect() it so its off the cluster on to a single machine 
and then push it from there so that only that machine has to have kerberos 
running and is a trusted server to Cluster C?


Note: In option 3, I hope I said it correctly, but I believe that you would be 
collecting the data to a client (edge node) before pushing it out to the 
secured cluster.





Does that make sense?

On Oct 14, 2016, at 1:32 PM, Steve Loughran 
> wrote:


On 13 Oct 2016, at 10:50, dbolshak 
> wrote:

Hello community,

We've a challenge and no ideas how to solve it.

The problem,

Say we have the following environment:
1. `cluster A`, the cluster does not use kerberos and we use it as a source
of data, important thing is - we don't manage this cluster.
2. `cluster B`, small cluster where our spark application is running and
performing some logic. (we manage this cluster and it does not have
kerberos).
3. `cluster C`, the cluster uses kerberos and we use it to keep results of
our spark application, we manage this cluster

Our requrements and conditions that are not mentioned yet:
1. All clusters are in a single data center, but in the different
subnetworks.
2. We cannot turn on kerberos on `cluster A`
3. We cannot turn off kerberos on `cluster C`
4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
5. Spark app is built on top of RDD and does not depend on spark-sql.

Does anybody know how to write data using RDD api to remote cluster which is
running with Kerberos?

If you want to talk to the secure cluster, C, from code running in cluster B, 
you'll need to turn kerberos on there. Maybe, maybe, you could just get away 
with kerberos being turned off, but you, the user, launching the application 
while logged in to kerberos yourself and so trusted by Cluster C.

one of the problems you are likely to hit with Spark here is that it's only 
going to collect the tokens you need to talk to HDFS at the time you launch the 
application, and by default, it only knows about the cluster FS. You will need 
to tell spark about the other filesystem at launch time, so it will know to 
authenticate with it as you, then collect the tokens needed for the application 
itself to work with kerberos.

spark.yarn.access.namenodes=hdfs://cluster-c:8080

-Steve

ps: https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/






Re: Does the delegator map task of SparkLauncher need to stay alive until Spark job finishes ?

2016-10-18 Thread Marcelo Vanzin
On Tue, Oct 18, 2016 at 3:01 PM, Elkhan Dadashov  wrote:
> Does my map task need to wait until Spark job finishes ?

No...

> Or is there any way, my map task finishes after launching Spark job, and I
> can still query and get status of Spark job outside of map task (or failure
> reason, if it has failed) ? (maybe by querying Spark job id ?)

...but if the SparkLauncher handle goes away, then you lose the
ability to track the app's state, unless you talk directly to the
cluster manager.

> I guess also if i want my Spark job to be killed, if corresponding delegator
> map task is killed, that means my map task needs to stay alive, so i still
> have SparkAppHandle reference ?

Correct, unless you talk directly to the cluster manager.
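
A rough sketch of keeping the handle alive and reacting to state changes (class names and paths below are placeholders):

import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val done = new CountDownLatch(1)
val handle: SparkAppHandle = new SparkLauncher()
  .setAppResource("/path/to/spark-job.jar")
  .setMainClass("com.example.SparkJob")
  .setMaster("yarn")
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit =
      if (h.getState.isFinal) done.countDown()        // FINISHED, FAILED or KILLED
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })

done.await()                                          // the launching JVM stays alive until a final state
println(s"Spark app ${handle.getAppId} ended in state ${handle.getState}")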

-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Does the delegator map task of SparkLauncher need to stay alive until Spark job finishes ?

2016-10-18 Thread Elkhan Dadashov
Hi,

Does the delegator map task of SparkLauncher need to stay alive until Spark
job finishes ?

1)
Currently, I have mapper tasks which launch a Spark job via
SparkLauncher#startApplication().

Does my map task need to wait until Spark job finishes ?

Or is there any way, my map task finishes after launching Spark job, and I
can still query and get status of Spark job outside of map task (or failure
reason, if it has failed) ? (maybe by querying Spark job id ?)

2)
I guess also if i want my Spark job to be killed, if corresponding
delegator map task is killed, that means my map task needs to stay alive,
so i still have SparkAppHandle reference ?

Thanks.


Re: Why the json file used by sparkSession.read.json must be a valid json object per line

2016-10-18 Thread Daniel Barclay

Koert,

Koert Kuipers wrote:


A single json object would mean for most parsers it needs to fit in memory when 
reading or writing


Note that codlife didn't seem to be asking about *single-object* JSON files,
but about *standard-format* JSON files.


On Oct 15, 2016 11:09, "codlife" <1004910...@qq.com > 
wrote:

Hi:
   I have doubts about the design of spark.read.json: why isn't the JSON file
a standard JSON file? Can anyone tell me the internal reason? Any advice is
appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-the-json-file-used-by-sparkSession-read-json-must-be-a-valid-json-object-per-line-tp27907.html
 

Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 






Re: Aggregate UDF (UDAF) in Python

2016-10-18 Thread ayan guha
Is it possible to use the aggregate functions available at the RDD level to do
similar stuff?
On 19 Oct 2016 04:41, "Tobi Bosede"  wrote:

> Thanks Assaf.
>
> This is a lot more complicated than I was expecting...might end up using
> collect if data fits in memory. I was also thinking about using the pivot
> function in pandas, but that wouldn't work in parallel and so would be even
> more inefficient than collect.
>
> On Tue, Oct 18, 2016 at 7:24 AM, Mendelson, Assaf  > wrote:
>
>> A simple example:
>>
>>
>>
>> We have a scala file:
>>
>>
>>
>> package com.myorg.example
>>
>> import org.apache.spark.sql.{Row, SparkSession}
>> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
>> import org.apache.spark.sql.functions.{rand, sum}
>> import org.apache.spark.sql.types.{DataType, DoubleType, StructField, StructType}
>>
>> class PerformSumUDAF() extends UserDefinedAggregateFunction {
>>
>>   def inputSchema: StructType = StructType(Array(StructField("item", DoubleType)))
>>
>>   def bufferSchema: StructType = StructType(Array(StructField("sum", DoubleType)))
>>
>>   def dataType: DataType = DoubleType
>>
>>   def deterministic: Boolean = true
>>
>>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>>     buffer(0) = 0.toDouble
>>   }
>>
>>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>>     buffer(0) = buffer.getDouble(0) + input.getDouble(0)
>>   }
>>
>>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>>     buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
>>   }
>>
>>   def evaluate(buffer: Row): Any = {
>>     buffer.getDouble(0)
>>   }
>> }
>>
>>
>>
>> We place the file under myroot/src/main/scala/com/myor
>> g/example/ExampleUDAF.scala
>>
>> Under myroot we create a pom file (sorry for not cleaning it up, it
>> includes some stuff you probably not need like guava and avro)
>>
>> <*project*>
>>   <*groupId*>edu.berkeley
>>   <*artifactId*>simple-project
>>   <*modelVersion*>4.0.0
>>   <*name*>example packages
>>   <*packaging*>jar
>>   <*version*>1.0
>>   <*properties*>
>> <*project.build.sourceEncoding*>UTF-8
>> <*maven.compiler.source*>1.8
>> <*maven.compiler.target*>1.8
>>   
>>   <*dependencies*>
>>   <*dependency*>
>>   <*groupId*>com.google.guava
>>   <*artifactId*>guava
>>   <*version*>19.0
>>   
>> <*dependency*>
>> *  *<*groupId*>org.apache.spark
>>   <*artifactId*>spark-core_2.11
>>   <*version*>2.0.0
>>   <*scope*>provided
>> 
>> <*dependency*>
>>   <*groupId*>org.postgresql
>>   <*artifactId*>postgresql
>>   <*version*>9.4.1208
>> 
>> <*dependency*>
>>   <*groupId*>com.databricks
>>   <*artifactId*>spark-avro_2.11
>>   <*version*>3.0.0-preview2
>> 
>> <*dependency*>
>>   <*groupId*>org.apache.spark
>>   <*artifactId*>spark-sql_2.11
>>   <*version*>2.0.0
>>   <*scope*>provided
>> 
>> <*dependency*>
>> <*groupId*>org.scala-lang
>> <*artifactId*>scala-library
>> <*version*>2.11.8
>>   <*scope*>provided
>> 
>>
>>   
>> <*build*>
>>   <*plugins*>
>>   <*plugin*>
>>   <*groupId*>org.apache.maven.plugins
>>   <*artifactId*>maven-shade-plugin
>>   <*version*>2.4.3
>>   <*executions*>
>>   <*execution*>
>>   <*phase*>package
>>   <*goals*>
>>   <*goal*>shade
>>   
>>   <*configuration*>
>>   <*relocations*>
>> <*relocation*>
>>   <*pattern*>com.google.common
>>   
>> <*shadedPattern*>com.myorg.shaded.com.google.common
>> 
>>   
>>   
>> <*finalName*>simple-project-1.0-jar-with-dependencies
>>   
>>   
>>   
>>   
>>   <*plugin*>
>> <*groupId*>org.scala-tools
>> <*artifactId*>maven-scala-plugin
>> <*version*>2.15.2
>> <*executions*>
>>   <*execution*>
>> <*goals*>
>>   <*goal*>compile
>> 
>>   
>> 
>>   
>>
>> 
>> 
>> 
>>
>>
>>
>> Now you can compile the scala like so: mvn clean install (I assume you
>> have maven installed).
>>
>>
>>
>> Now we want to call this from python (assuming spark is your spark
>> session):
>>
>> # get a reference dataframe to do the example on:
>>
>> df = spark.range(20)
>>
>>
>>
>> # get the jvm pointer
>>
>> jvm = spark.sparkContext._gateway.jvm
>>
>> # import the class
>>
>> from py4j.java_gateway import java_import
>>
>> java_import(jvm, "com.myorg.example.PerformSumUDAF")
>>
>>
>>
>> #create an object from the class:
>>
>> udafObj = 

Re: jdbcRDD for data ingestion from RDBMS

2016-10-18 Thread Mich Talebzadeh
Hi,

If we are talking about billions of records, then depending on your network
and RDBMS, in my experience it works OK for dimension tables of moderate
size: you can open parallel connections to the RDBMS (assuming the table has
a primary key/unique column) to parallelise the process and read the data
"as is" into Spark over JDBC.
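
A sketch of such a parallel JDBC read (all connection details, table name and bounds below are placeholders):

val dimCustomer = sqlContext.read.format("jdbc").options(Map(
  "url"             -> "jdbc:oracle:thin:@//dbhost:1521/ORCL",
  "dbtable"         -> "SCOTT.DIM_CUSTOMER",
  "partitionColumn" -> "CUSTOMER_ID",   // numeric primary key / unique column
  "lowerBound"      -> "1",
  "upperBound"      -> "10000000",
  "numPartitions"   -> "8",             // 8 concurrent connections, one per range slice
  "user"            -> "scott",
  "password"        -> "tiger"
)).load()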

However the other alternative is to get data into HDFS using Sqoop or even
Spark.

The third option is to use bulk copy to get the data out of the RDBMS table
into a directory (CSV-like), scp it to an HDFS host, put it into HDFS, and
then access it through Hive external tables etc.

A real-time load of data using Spark JDBC makes sense if the RDBMS table
itself is pretty small. Most dimension tables should satisfy this. This
approach is not advisable for FACT tables.

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 October 2016 at 10:35, Teng Qiu  wrote:

> Hi Ninad, i believe the purpose of jdbcRDD is to use RDBMS as an addtional
> data source during the data processing, main goal of spark is still
> analyzing data from HDFS-like file system.
>
> to use spark as a data integration tool to transfer billions of records
> from RDBMS to HDFS etc. could work, but may not be the best tool... Sqoop
> with --direct sounds better, but the configuration costs, sqoop should be
> used for regular data integration tasks.
>
> not sure if your client need transfer billions of records periodically, if
> it is only an initial load, for such an one-off task, maybe a bash script
> with COPY command is more easier and faster :)
>
> Best,
>
> Teng
>
>
> 2016-10-18 4:24 GMT+02:00 Ninad Shringarpure :
>
>>
>> Hi Team,
>>
>> One of my client teams is trying to see if they can use Spark to source
>> data from RDBMS instead of Sqoop.  Data would be substantially large in the
>> order of billions of records.
>>
>> I am not sure reading the documentations whether jdbcRDD by design is
>> going to be able to scale well for this amount of data. Plus some in-built
>> features provided in Sqoop like --direct might give better performance than
>> straight up jdbc.
>>
>> My primary question to this group is if it is advisable to use jdbcRDD
>> for data sourcing and can we expect it to scale. Also performance wise how
>> would it compare to Sqoop.
>>
>> Please let me know your thoughts and any pointers if anyone in the group
>> has already implemented it.
>>
>> Thanks,
>> Ninad
>>
>>
>


spark streaming client program needs to be restarted after few hours of idle time. how can I fix it?

2016-10-18 Thread kant kodali
Hi Guys,

My Spark Streaming client program works fine as long as the receiver
receives data, but say my receiver has no data to receive for a few
hours (4-5 hours) and then starts receiving data again; at that point the
client program doesn't seem to process any data. It needs to be
restarted, after which everything seems to work fine again. I am using
Spark standalone mode and my client program has the following lines at the
end so it runs forever. Any ideas what can go wrong? I have some potential
suspects and will share them after a bit of experimentation on my end.

Thanks!

ssc.start();
ssc.awaitTermination();


Re: About Error while reading large JSON file in Spark

2016-10-18 Thread Steve Loughran

On 18 Oct 2016, at 08:43, Chetan Khatri 
> wrote:

Hello Community members,

I am getting error while reading large JSON file in spark,


the underlying read code can't handle more than 2^31 bytes in a single line:

if (bytesConsumed > Integer.MAX_VALUE) {
  throw new IOException("Too many bytes before newline: " + bytesConsumed);
}

That's because it's trying to split work by line, and of course, there aren't 
lines

you need to move over to reading the JSON by other means, I'm afraid. At a
guess, something involving SparkContext.binaryFiles() streaming the data
straight into a JSON parser.



Code:

val landingVisitor = 
sqlContext.read.json("s3n://hist-ngdp/lvisitor/lvisitor-01-aug.json")

unrelated, but use s3a if you can. It's better, you know.


Error:

16/10/18 07:30:30 ERROR Executor: Exception in task 8.0 in stage 0.0 (TID 8)
java.io.IOException: Too many bytes before newline: 2147483648
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:249)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:135)
at 
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:237)

What would be resolution for the same ?

Thanks in Advance !


--
Yours Aye,
Chetan Khatri.




Re: Making more features in Logistic Regression

2016-10-18 Thread Nick Pentreath
You can use the PolynomialExpansion in Spark ML (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.PolynomialExpansion
)
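
A small sketch of what that looks like (Spark 2.0 ml API; the toy data and column names are made up):

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(2.0, 1.0)),
  (1.0, Vectors.dense(0.5, 3.0))
)).toDF("label", "features")

val poly = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(2)                       // adds squares and the cross term

poly.transform(df).show(false)        // "polyFeatures" can then feed LogisticRegression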

On Tue, 18 Oct 2016 at 21:47 miro  wrote:

> Yes, I was thinking going down this road:
>
>
> http://scikit-learn.org/stable/modules/linear_model.html#polynomial-regression-extending-linear-models-with-basis-functions
>
> http://stats.stackexchange.com/questions/58739/polynomial-regression-using-scikit-learn
>
>
> But I’m not sure if spark actually has polynomial regression implemented
> (I couldn’t find it) - maybe SparkML gurus can help here?
>
> You could take a look also at scikit integration package with Spark (
> https://github.com/databricks/spark-sklearn).
>
> Hope it helped :)
>
> All the best,
> m.
>
>
>
> On 18 Oct 2016, at 20:36, aditya1702  wrote:
>
> 
>
> 
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915p27918.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: spark with kerberos

2016-10-18 Thread Steve Loughran

On 17 Oct 2016, at 22:11, Michael Segel 
> wrote:

@Steve you are going to have to explain what you mean by ‘turn Kerberos on’.

Taken one way… it could mean making cluster B secure and running Kerberos and 
then you’d have to create some sort of trust between B and C,



I'd imagined making cluster B a kerberized cluster.

I don't think you need to go near trust relations though —ideally you'd just 
want the same accounts everywhere if you can, if not, the main thing is that 
the user submitting the job can get a credential for  that far NN at job 
submission time, and that credential is propagated all the way to the executors.


Did you mean turn on kerberos on the nodes in Cluster B so that each node 
becomes a trusted client that can connect to C

OR

Did you mean to turn on kerberos on the master node (eg edge node) where the 
data persists if you collect() it so its off the cluster on to a single machine 
and then push it from there so that only that machine has to have kerberos 
running and is a trusted server to Cluster C?


Note: In option 3, I hope I said it correctly, but I believe that you would be 
collecting the data to a client (edge node) before pushing it out to the 
secured cluster.





Does that make sense?

On Oct 14, 2016, at 1:32 PM, Steve Loughran 
> wrote:


On 13 Oct 2016, at 10:50, dbolshak 
> wrote:

Hello community,

We've a challenge and no ideas how to solve it.

The problem,

Say we have the following environment:
1. `cluster A`, the cluster does not use kerberos and we use it as a source
of data, important thing is - we don't manage this cluster.
2. `cluster B`, small cluster where our spark application is running and
performing some logic. (we manage this cluster and it does not have
kerberos).
3. `cluster C`, the cluster uses kerberos and we use it to keep results of
our spark application, we manage this cluster

Our requrements and conditions that are not mentioned yet:
1. All clusters are in a single data center, but in the different
subnetworks.
2. We cannot turn on kerberos on `cluster A`
3. We cannot turn off kerberos on `cluster C`
4. We can turn on/off kerberos on `cluster B`, currently it's turned off.
5. Spark app is built on top of RDD and does not depend on spark-sql.

Does anybody know how to write data using RDD api to remote cluster which is
running with Kerberos?

If you want to talk to the secure cluster, C, from code running in cluster B, 
you'll need to turn kerberos on there. Maybe, maybe, you could just get away 
with kerberos being turned off, but you, the user, launching the application 
while logged in to kerberos yourself and so trusted by Cluster C.

one of the problems you are likely to hit with Spark here is that it's only 
going to collect the tokens you need to talk to HDFS at the time you launch the 
application, and by default, it only knows about the cluster FS. You will need 
to tell spark about the other filesystem at launch time, so it will know to 
authenticate with it as you, then collect the tokens needed for the application 
itself to work with kerberos.

spark.yarn.access.namenodes=hdfs://cluster-c:8080

-Steve

ps: https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/
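
To make that concrete, a sketch of what the submission might look like (the principal, keytab path and remote NameNode address are placeholders; the exact NameNode RPC port depends on the cluster):

spark-submit \
  --master yarn \
  --principal etl_user@EXAMPLE.REALM \
  --keytab /etc/security/keytabs/etl_user.keytab \
  --conf spark.yarn.access.namenodes=hdfs://cluster-c:8020 \
  --class com.example.WriteToClusterC \
  my-app.jar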




Re: Making more features in Logistic Regression

2016-10-18 Thread miro
Yes, I was thinking going down this road:

http://scikit-learn.org/stable/modules/linear_model.html#polynomial-regression-extending-linear-models-with-basis-functions
 

http://stats.stackexchange.com/questions/58739/polynomial-regression-using-scikit-learn
 



But I’m not sure if spark actually has polynomial regression implemented (I 
couldn’t find it) - maybe SparkML gurus can help here? 

You could take a look also at scikit integration package with Spark 
(https://github.com/databricks/spark-sklearn 
).

Hope it helped :)

All the best,
m.



> On 18 Oct 2016, at 20:36, aditya1702  wrote:
> 
>  
>  
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915p27918.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Broadcasting Non Serializable Objects

2016-10-18 Thread pedroT
Hi guys. 

I know this is a well-known topic, but after reading about it (a lot) I'm still
not sure about the answer.

I need to broadcast a complex structure with a lot of objects as fields,
including some from external libraries which I can't easily make
serializable.

I tried making a static method returning the data, following this post (I
use the Java API):
https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/
but the same serialization exception arises.

Is there a way to achieve this?
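
One common pattern behind that post, sketched here with a stand-in class (everything below is illustrative, not the original poster's code), is to avoid broadcasting entirely and build the object lazily once per executor JVM:

import org.apache.spark.rdd.RDD

// Stand-in for the non-serializable third-party dependency.
class HeavyClient { def process(s: String): Int = s.length }

object HeavyClientHolder {
  // Created on first use in each executor JVM; never shipped with the task closure.
  @transient lazy val instance: HeavyClient = new HeavyClient
}

def lengths(rdd: RDD[String]): RDD[Int] =
  rdd.mapPartitions { iter =>
    val client = HeavyClientHolder.instance
    iter.map(client.process)
  }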








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-Non-Serializable-Objects-tp27919.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Making more features in Logistic Regression

2016-10-18 Thread aditya1702
 
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915p27918.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Making more features in Logistic Regression

2016-10-18 Thread aditya1702

 

 

Here is the graph and the features with their corresponding data



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915p27917.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Making more features in Logistic Regression

2016-10-18 Thread miro
Hi, 

I think it depends on how non-linear your data is. You could add polynomial terms to
your model, but everything depends on your data. If you could share more
details, maybe a scatter plot, it would help to investigate the problem further.

All the best,
Miro


> On 18 Oct 2016, at 19:09, aditya1702  wrote:
> 
> Hello,
> I am trying to solve a problem of Logistic Regression using Spark. I am
> still a newbie to machine learning. I wanted to ask that if I have 2
> features for logistic regression and if the features are non-linear
> (regularized logistic regression) do we have to make more features by
> considering the higher powers of the features?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Aggregate UDF (UDAF) in Python

2016-10-18 Thread Tobi Bosede
Thanks Assaf.

This is a lot more complicated than I was expecting...might end up using
collect if data fits in memory. I was also thinking about using the pivot
function in pandas, but that wouldn't work in parallel and so would be even
more inefficient than collect.

On Tue, Oct 18, 2016 at 7:24 AM, Mendelson, Assaf 
wrote:

> A simple example:
>
>
>
> We have a scala file:
>
>
>
> package com.myorg.example
>
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.functions.{rand, sum}
> import org.apache.spark.sql.types.{DataType, DoubleType, StructField, StructType}
>
> class PerformSumUDAF() extends UserDefinedAggregateFunction {
>
>   def inputSchema: StructType = StructType(Array(StructField("item", DoubleType)))
>
>   def bufferSchema: StructType = StructType(Array(StructField("sum", DoubleType)))
>
>   def dataType: DataType = DoubleType
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = 0.toDouble
>   }
>
>   def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     buffer(0) = buffer.getDouble(0) + input.getDouble(0)
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
>   }
>
>   def evaluate(buffer: Row): Any = {
>     buffer.getDouble(0)
>   }
> }
>
>
>
> We place the file under myroot/src/main/scala/com/
> myorg/example/ExampleUDAF.scala
>
> Under myroot we create a pom file (sorry for not cleaning it up, it
> includes some stuff you probably not need like guava and avro)
>
> <*project*>
>   <*groupId*>edu.berkeley
>   <*artifactId*>simple-project
>   <*modelVersion*>4.0.0
>   <*name*>example packages
>   <*packaging*>jar
>   <*version*>1.0
>   <*properties*>
> <*project.build.sourceEncoding*>UTF-8
> <*maven.compiler.source*>1.8
> <*maven.compiler.target*>1.8
>   
>   <*dependencies*>
>   <*dependency*>
>   <*groupId*>com.google.guava
>   <*artifactId*>guava
>   <*version*>19.0
>   
> <*dependency*>
> *  *<*groupId*>org.apache.spark
>   <*artifactId*>spark-core_2.11
>   <*version*>2.0.0
>   <*scope*>provided
> 
> <*dependency*>
>   <*groupId*>org.postgresql
>   <*artifactId*>postgresql
>   <*version*>9.4.1208
> 
> <*dependency*>
>   <*groupId*>com.databricks
>   <*artifactId*>spark-avro_2.11
>   <*version*>3.0.0-preview2
> 
> <*dependency*>
>   <*groupId*>org.apache.spark
>   <*artifactId*>spark-sql_2.11
>   <*version*>2.0.0
>   <*scope*>provided
> 
> <*dependency*>
> <*groupId*>org.scala-lang
> <*artifactId*>scala-library
> <*version*>2.11.8
>   <*scope*>provided
> 
>
>   
> <*build*>
>   <*plugins*>
>   <*plugin*>
>   <*groupId*>org.apache.maven.plugins
>   <*artifactId*>maven-shade-plugin
>   <*version*>2.4.3
>   <*executions*>
>   <*execution*>
>   <*phase*>package
>   <*goals*>
>   <*goal*>shade
>   
>   <*configuration*>
>   <*relocations*>
> <*relocation*>
>   <*pattern*>com.google.common
>   
> <*shadedPattern*>com.myorg.shaded.com.google.common
> 
>   
>   
> <*finalName*>simple-project-1.0-jar-with-dependencies
>   
>   
>   
>   
>   <*plugin*>
> <*groupId*>org.scala-tools
> <*artifactId*>maven-scala-plugin
> <*version*>2.15.2
> <*executions*>
>   <*execution*>
> <*goals*>
>   <*goal*>compile
> 
>   
> 
>   
>
> 
> 
> 
>
>
>
> Now you can compile the scala like so: mvn clean install (I assume you
> have maven installed).
>
>
>
> Now we want to call this from python (assuming spark is your spark
> session):
>
> # get a reference dataframe to do the example on:
>
> df = spark.range(20)
>
>
>
> # get the jvm pointer
>
> jvm = spark.sparkContext._gateway.jvm
>
> # import the class
>
> from py4j.java_gateway import java_import
>
> java_import(jvm, "com.myorg.example.PerformSumUDAF")
>
>
>
> #create an object from the class:
>
> udafObj = jvm.com.myorg.example.PerformSumUDAF()
>
> # define a python function to do the aggregation.
>
> from pyspark.sql.column import Column, _to_java_column, _to_seq
>
> def pythonudaf(c):
>
> # the _to_seq portion is because we need to convert this to a sequence
> of
>
> # input columns the way scala (java) expects them. The returned
>
> # value must then be 

How to add all jars in a folder to executor classpath?

2016-10-18 Thread nitinkak001
I need to add all the jars in hive/lib to my spark job executor classpath. I
tried this

spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH-5.8.2-1.cdh5.8.2.p0.3/lib/hive/lib
and 
spark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.8.2-1.cdh5.8.2.p0.3/lib/hive/lib/*

but neither adds any jars to the executor classpath. How can I add all the
jars in a folder to the executor or driver classpath, and what if I have
multiple folders? What is the syntax for that?

I am using Spark 1.6.0
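
A sketch of what usually works (the paths are placeholders and must exist on every node): use a directory wildcard, and separate multiple folders with ':' since the value ends up on the JVM classpath; extraLibraryPath is only for native libraries.

spark-submit \
  --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*:/opt/extra/jars/*" \
  --conf "spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*" \
  --class com.example.MyJob my-job.jar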



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-all-jars-in-a-folder-to-executor-classpath-tp27916.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Making more features in Logistic Regression

2016-10-18 Thread aditya1702
Hello,
I am trying to solve a logistic regression problem using Spark. I am
still a newbie to machine learning. I wanted to ask: if I have 2
features for logistic regression and the features are non-linear
(regularized logistic regression), do we have to make more features by
taking higher powers of the existing features?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Making-more-features-in-Logistic-Regression-tp27915.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data

2016-10-18 Thread Sean Owen
Try adding the spark-streaming_2.11 artifact as a dependency too. You will
be directly depending on it.
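
With that dependency in place, a minimal sketch of the counting part (broker address, topic name and batch interval are assumptions):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(new SparkConf().setAppName("qps-monitor"), Seconds(1))

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "qps-monitor",
  "auto.offset.reset"  -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("search-events"), kafkaParams))

stream.count().print()   // events per 1-second batch, i.e. roughly queries per second

ssc.start()
ssc.awaitTermination()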

On Tue, Oct 18, 2016 at 2:16 PM Furkan KAMACI 
wrote:

> Hi,
>
> I have a search application and want to monitor queries per second for it.
> I have Kafka at my backend which acts like a bus for messages. Whenever a
> search request is done I publish the nano time of the current system. I
> want to use Spark Streaming to aggregate such data but I am so new to it.
>
> I wanted to follow that example:
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> I've added that dependencies:
>
> 
> org.apache.spark
> spark-streaming-kafka-0-10_2.11
> 2.0.1
> 
> 
> org.apache.spark
> spark-core_2.10
> 2.0.1
> 
>
> However I cannot see even Duration class at my dependencies. On the other
> hand given documentation is missing and when you click Java there is no
> code at tabs.
>
> Could you guide me how can I implement monitoring such a metric?
>
> Kind Regards,
> Furkan KAMACI
>


Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data

2016-10-18 Thread Furkan KAMACI
Hi,

I have a search application and want to monitor queries per second for it.
I have Kafka at my backend which acts like a bus for messages. Whenever a
search request is done I publish the nano time of the current system. I
want to use Spark Streaming to aggregate such data but I am so new to it.

I wanted to follow that example:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

I've added that dependencies:


org.apache.spark
spark-streaming-kafka-0-10_2.11
2.0.1


org.apache.spark
spark-core_2.10
2.0.1


However, I cannot even see the Duration class in my dependencies. On the other
hand, the linked documentation seems incomplete: when you click the Java tab
there is no code.

Could you guide me how can I implement monitoring such a metric?

Kind Regards,
Furkan KAMACI


Re: mllib model in production web API

2016-10-18 Thread Aseem Bansal
Hi Vincent

I am not sure whether you are asking me or Nicolas. If me, then no we
didn't. Never used Akka and wasn't even aware that it has such
capabilities. Using Java API so we don't have Akka as a dependency right
now.

On Tue, Oct 18, 2016 at 12:47 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Hi
> Did you try applying the model with akka instead of spark ?
> https://spark-summit.org/eu-2015/events/real-time-anomaly-
> detection-with-spark-ml-and-akka/
>
> On 18 Oct 2016 at 5:58 AM, "Aseem Bansal" wrote:
>
>> @Nicolas
>>
>> No, ours is different. We required predictions within 10ms time frame so
>> we needed much less latency than that.
>>
>> Every algorithm has some parameters. Correct? We took the parameters from
>> the mllib and used them to create ml package's model. ml package's model's
>> prediction time was much faster compared to mllib package's transformation.
>> So essentially use spark's distributed machine learning library to train
>> the model, save to S3, load from S3 in a different system and then convert
>> it into the vector based API model for actual predictions.
>>
>> There were obviously some transformations involved but we didn't use
>> Pipeline for those transformations. Instead, we re-wrote them for the
>> Vector based API. I know it's not perfect but if we had used the
>> transformations within the pipeline that would make us dependent on spark's
>> distributed API and we didn't see how we will really reach our latency
>> requirements. Would have been much simpler and more DRY if the
>> PipelineModel had a predict method based on vectors and was not distributed.
>>
>> As you can guess it is very much model-specific and more work. If we
>> decide to use another type of Model we will have to add conversion
>> code/transformation code for that also. Only if spark exposed a prediction
>> method which is as fast as the old machine learning package.
>>
>> On Sat, Oct 15, 2016 at 8:42 PM, Nicolas Long 
>> wrote:
>>
>>> Hi Sean and Aseem,
>>>
>>> thanks both. A simple thing which sped things up greatly was simply to
>>> load our sql (for one record effectively) directly and then convert to a
>>> dataframe, rather than using Spark to load it. Sounds stupid, but this took
>>> us from > 5 seconds to ~1 second on a very small instance.
>>>
>>> Aseem: can you explain your solution a bit more? I'm not sure I
>>> understand it. At the moment we load our models from S3
>>> (RandomForestClassificationModel.load(..) ) and then store that in an
>>> object property so that it persists across requests - this is in Scala. Is
>>> this essentially what you mean?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 12 October 2016 at 10:52, Aseem Bansal  wrote:
>>>
 Hi

 Faced a similar issue. Our solution was to load the model, cache it
 after converting it to a model from mllib and then use that instead of ml
 model.

 On Tue, Oct 11, 2016 at 10:22 PM, Sean Owen  wrote:

> I don't believe it will ever scale to spin up a whole distributed job
> to serve one request. You can look possibly at the bits in mllib-local. 
> You
> might do well to export as something like PMML either with Spark's export
> or JPMML and then load it into a web container and score it, without Spark
> (possibly also with JPMML, OpenScoring)
>
>
> On Tue, Oct 11, 2016, 17:53 Nicolas Long 
> wrote:
>
>> Hi all,
>>
>> so I have a model which has been stored in S3. And I have a Scala
>> webapp which for certain requests loads the model and transforms 
>> submitted
>> data against it.
>>
>> I'm not sure how to run this quickly on a single instance though. At
>> the moment Spark is being bundled up with the web app in an uberjar (sbt
>> assembly).
>>
>> But the process is quite slow. I'm aiming for responses < 1 sec so
>> that the webapp can respond quickly to requests. When I look the Spark 
>> UI I
>> see:
>>
>> Summary Metrics for 1 Completed Tasks
>>
>> Metric                    | Min   | 25th percentile | Median | 75th percentile | Max
>> Duration                  | 94 ms | 94 ms           | 94 ms  | 94 ms           | 94 ms
>> Scheduler Delay           | 0 ms  | 0 ms            | 0 ms   | 0 ms            | 0 ms
>> Task Deserialization Time | 3 s   | 3 s             | 3 s    | 3 s             | 3 s
>> GC Time                   | 2 s   | 2 s             | 2 s    | 2 s             | 2 s
>> Result Serialization Time | 0 ms  | 0 ms            | 0 ms   | 0 ms            | 0 ms
>> Getting Result Time       | 0 ms  | 0 ms            | 0 ms   | 0 ms            | 0 ms
>> Peak Execution Memory     | 0.0 B | 0.0 B           | 0.0 B  | 0.0 B           | 0.0 B
>>
>> I don't really understand why deserialization and GC should take so
>> long when the models are already loaded. Is this evidence I am doing
>> something wrong? And where can I get a better understanding on how Spark
>> works under the hood here, and how best to do a 

RE: Aggregate UDF (UDAF) in Python

2016-10-18 Thread Mendelson, Assaf
A simple example:

We have a scala file:


package com.myorg.example

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.{rand, sum}
import org.apache.spark.sql.types.{DataType, DoubleType, StructField, 
StructType}

class PerformSumUDAF() extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(StructField("item", 
DoubleType)))

  def bufferSchema: StructType = StructType(Array(StructField("sum", 
DoubleType)))

  def dataType: DataType = DoubleType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.toDouble
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getDouble(0) + input.getDouble(0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }

  def evaluate(buffer: Row): Any = {
buffer.getDouble(0)
  }
}


We place the file under 
myroot/src/main/scala/com/myorg/example/ExampleUDAF.scala
Under myroot we create a pom file (sorry for not cleaning it up, it includes 
some stuff you probably not need like guava and avro)


<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example packages</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>19.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>9.4.1208</version>
    </dependency>
    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-avro_2.11</artifactId>
      <version>3.0.0-preview2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <relocations>
                <relocation>
                  <pattern>com.google.common</pattern>
                  <shadedPattern>com.myorg.shaded.com.google.common</shadedPattern>
                </relocation>
              </relocations>
              <finalName>simple-project-1.0-jar-with-dependencies</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Now you can compile the scala like so: mvn clean install (I assume you have 
maven installed).

Now we want to call this from python (assuming spark is your spark session):
# get a reference dataframe to do the example on:
df = spark.range(20)

# get the jvm pointer
jvm = spark.sparkContext._gateway.jvm
# import the class
from py4j.java_gateway import java_import
java_import(jvm, "com.myorg.example.PerformSumUDAF")

#create an object from the class:
udafObj = jvm.com.myorg.example.PerformSumUDAF()
# define a python function to do the aggregation.
from pyspark.sql.column import Column, _to_java_column, _to_seq
def pythonudaf(c):
    # the _to_seq portion is because we need to convert this to a sequence of
    # input columns the way scala (java) expects them. The returned
    # value must then be converted to a pyspark Column
    return Column(udafObj.apply(_to_seq(spark.sparkContext, [c], _to_java_column)))

# now lets use the function
df.agg(pythonudaf(df.id)).show()

Lastly when you run, make sure to use both –jars and --driver-class-path with 
the jar created from scala to make sure it is available in all nodes.



From: Tobi Bosede [mailto:ani.to...@gmail.com]
Sent: Monday, October 17, 2016 10:15 PM
To: Mendelson, Assaf
Cc: Holden Karau; user
Subject: Re: Aggregate UDF (UDAF) in Python

Thanks Assaf. Yes please provide an example of how to wrap code for python. I 
am leaning towards scala.

On Mon, Oct 17, 2016 at 1:50 PM, Mendelson, Assaf 
> wrote:
A possible (bad) workaround would be to use the collect_list function. This 
will give you all the values in an array (list) and you can then create a UDF 
to do the aggregation yourself. This would be very slow and cost a lot of 
memory but it would work if your cluster can handle it.
This is the only workaround I can think of, otherwise you  will need to write 
the UDAF in java/scala and wrap it for python use. If you need an example on 
how to do so I can provide one.
Assaf.

From: Tobi Bosede [mailto:ani.to...@gmail.com]
Sent: Sunday, October 16, 2016 7:49 PM
To: Holden Karau
Cc: user
Subject: Re: Aggregate UDF (UDAF) in Python

OK, I misread the year on the dev list. Can you comment on work arounds? (I.e. 
question about if scala/java are the only option.)

On Sun, Oct 16, 2016 at 12:09 PM, Holden Karau 

Re: Contributing to PySpark

2016-10-18 Thread Holden Karau
Hi Krishna,

Thanks for your interest contributing to PySpark! I don't personally use
either of those IDEs so I'll leave that part for someone else to answer -
but in general you can find the building spark documentation at
http://spark.apache.org/docs/latest/building-spark.html which includes
notes on how to run the Python tests as well. You will also probably want
to check out the contributing to Spark guide at
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark.

Cheers,

Holden :)

On Tue, Oct 18, 2016 at 2:16 AM, Krishna Kalyan 
wrote:

> Hello,
> I am a masters student. Could someone please let me know how set up my dev
> working environment to contribute to pyspark.
> Questions I had were:
> a) Should I use Intellij Idea or PyCharm?.
> b) How do I test my changes?.
>
> Regards,
> Krishna
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: About Error while reading large JSON file in Spark

2016-10-18 Thread Chetan Khatri
Dear Xi Shen,

Thank you for getting back to my question.

The approach I am following is as below:
I have an MSSQL server as the enterprise data lake.

1. Run Java jobs that generate JSON files; every file is almost 6 GB.
*Correct, Spark needs every JSON object on a separate line*, so I ran
sed -e 's/}/}\n/g' -s old-file.json > new-file.json
to get every JSON element onto its own line.
2. Upload to an S3 bucket and read from there using the
sqlContext.read.json() function, which is where I get the above error.

Note: if I run on smaller files whose JSON elements have almost the same
structure, I do not get this error.

*Current approach:*

   - splitting the large JSON (6 GB) into 1 GB pieces, then processing them.

Note: cluster size is 1 master and 2 slaves, each with 4 vcores and 26 GB RAM.


Thanks.




On Tue, Oct 18, 2016 at 2:50 PM, Xi Shen  wrote:

> It is a plain Java IO error. Your line is too long. You should alter your
> JSON schema, so each line is a small JSON object.
>
> Please do not concatenate all the object into an array, then write the
> array in one line. You will have difficulty handling your super large JSON
> array in Spark anyway.
>
> Because one array is one object, it cannot be split into multiple
> partition.
>
>
> On Tue, Oct 18, 2016 at 3:44 PM Chetan Khatri 
> wrote:
>
>> Hello Community members,
>>
>> I am getting error while reading large JSON file in spark,
>>
>> *Code:*
>>
>> val landingVisitor = sqlContext.read.json("s3n://
>> hist-ngdp/lvisitor/lvisitor-01-aug.json")
>>
>> *Error:*
>>
>> 16/10/18 07:30:30 ERROR Executor: Exception in task 8.0 in stage 0.0 (TID
>> 8)
>> java.io.IOException: Too many bytes before newline: 2147483648
>> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:249)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>> at org.apache.hadoop.mapred.LineRecordReader.(
>> LineRecordReader.java:135)
>> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(
>> TextInputFormat.java:67)
>> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:237)
>>
>> What would be resolution for the same ?
>>
>> Thanks in Advance !
>>
>>
>> --
>> Yours Aye,
>> Chetan Khatri.
>>
>> --
>
>
> Thanks,
> David S.
>



-- 
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA

​​Statement of Confidentiality

The contents of this e-mail message and any attachments are confidential
and are intended solely for addressee. The information may also be legally
privileged. This transmission is sent in trust, for the sole purpose of
delivery to the intended recipient. If you have received this transmission
in error, any use, reproduction or dissemination of this transmission is
strictly prohibited. If you are not the intended recipient, please
immediately notify the sender by reply e-mail or phone and delete this
message and its attachments, if any.​​


Re: jdbcRDD for data ingestion from RDBMS

2016-10-18 Thread Teng Qiu
Hi Ninad, I believe the purpose of jdbcRDD is to use an RDBMS as an additional
data source during data processing; the main goal of Spark is still
analyzing data from HDFS-like file systems.

Using Spark as a data integration tool to transfer billions of records
from an RDBMS to HDFS etc. could work, but it may not be the best tool... Sqoop
with --direct sounds better, though it has configuration costs; Sqoop is best
suited for regular data integration tasks.

I am not sure whether your client needs to transfer billions of records
periodically. If it is only an initial load, then for such a one-off task maybe a
bash script with the COPY command is easier and faster :)
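
If they do end up pulling it with Spark, at least use the partitioned JDBC reader so
the extract is parallelised. A rough sketch only (connection details, table and
partition column are all made up):

# hypothetical example: parallel JDBC extract, partitioned on a numeric key column
df = spark.read.jdbc(
    url="jdbc:postgresql://dbhost:5432/sales",   # made-up connection details
    table="public.big_table",
    column="id",                                 # numeric column to split on
    lowerBound=1,
    upperBound=1000000000,
    numPartitions=200,
    properties={"user": "etl", "password": "***"})
df.write.parquet("hdfs:///data/big_table")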

Best,

Teng


2016-10-18 4:24 GMT+02:00 Ninad Shringarpure :

>
> Hi Team,
>
> One of my client teams is trying to see if they can use Spark to source
> data from RDBMS instead of Sqoop.  Data would be substantially large in the
> order of billions of records.
>
> I am not sure reading the documentations whether jdbcRDD by design is
> going to be able to scale well for this amount of data. Plus some in-built
> features provided in Sqoop like --direct might give better performance than
> straight up jdbc.
>
> My primary question to this group is if it is advisable to use jdbcRDD for
> data sourcing and can we expect it to scale. Also performance wise how
> would it compare to Sqoop.
>
> Please let me know your thoughts and any pointers if anyone in the group
> has already implemented it.
>
> Thanks,
> Ninad
>
>


Re: About Error while reading large JSON file in Spark

2016-10-18 Thread Xi Shen
It is a plain Java IO error: your line is too long. You should alter your
JSON layout so that each line is a small JSON object.

Please do not concatenate all the objects into an array and then write the
array on one line. You will have difficulty handling such a super large JSON
array in Spark anyway.

Because one array is one object, it cannot be split into multiple partitions.
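
As a tiny illustration of the expected layout (a sketch; the sample records are
invented, and sc / sqlContext are assumed to be your SparkContext / SQLContext):

# each line is a self-contained JSON object, so the input can be split across partitions
sample = ['{"bid": "a1", "code": "x"}',
          '{"bid": "a2", "code": "y"}']
df = sqlContext.read.json(sc.parallelize(sample))
df.show()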


On Tue, Oct 18, 2016 at 3:44 PM Chetan Khatri 
wrote:

> Hello Community members,
>
> I am getting error while reading large JSON file in spark,
>
> *Code:*
>
> val landingVisitor =
> sqlContext.read.json("s3n://hist-ngdp/lvisitor/lvisitor-01-aug.json")
>
> *Error:*
>
> 16/10/18 07:30:30 ERROR Executor: Exception in task 8.0 in stage 0.0 (TID
> 8)
> java.io.IOException: Too many bytes before newline: 2147483648
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:249)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
> at
> org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:135)
> at
> org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:237)
>
> What would be resolution for the same ?
>
> Thanks in Advance !
>
>
> --
> Yours Aye,
> Chetan Khatri.
>
> --


Thanks,
David S.


Contributing to PySpark

2016-10-18 Thread Krishna Kalyan
Hello,
I am a masters student. Could someone please let me know how to set up my
dev environment to contribute to PySpark?
Questions I had were:
a) Should I use IntelliJ IDEA or PyCharm?
b) How do I test my changes?

Regards,
Krishna


Re: Help in generating unique Id in spark row

2016-10-18 Thread ayan guha
Do you have any primary key or unique identifier in your data, even if
multiple columns make up a composite key? In other words, can your data
have two identical rows that must carry different unique IDs? Also, does the
ID have to be numeric?

You may want to use a hashing algorithm from the SHA family to convert a
single unique column, or a composite of columns, into an ID.
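For example, a deterministic ID from a composite key might look like this in
PySpark (a sketch; "col_a"/"col_b" stand in for your key columns):

from pyspark.sql import functions as F

# derive a stable, reproducible id by hashing the composite business key
df_with_id = df.withColumn(
    "row_id", F.sha2(F.concat_ws("||", F.col("col_a"), F.col("col_b")), 256))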
On 18 Oct 2016 15:32, "Saurav Sinha"  wrote:

> Can any one help me out
>
> On Mon, Oct 17, 2016 at 7:27 PM, Saurav Sinha 
> wrote:
>
>> Hi,
>>
>> I am in situation where I want to generate unique Id for each row.
>>
>> I have use monotonicallyIncreasingId but it is giving increasing values
>> and start generating from start if it fail.
>>
>> I have two question here:
>>
>> Q1. Does this method give me unique id even in failure situation becaue I
>> want to use that ID in my solr id.
>>
>> Q2. If answer to previous question is NO. Then Is there way yo generate
>> UUID for each row which is uniqe and not updatedable.
>>
>> As I have come up with situation where UUID is updated
>>
>>
>> val idUDF = udf(() => UUID.randomUUID().toString)
>> val a = withColumn("alarmUUID", lit(idUDF()))
>> a.persist(StorageLevel.MEMORY_AND_DISK)
>> rawDataDf.registerTempTable("rawAlarms")
>>
>> ///
>> /// I do some joines
>>
>> but as I reach further below
>>
>> I do sonthing like
>> b is transformation of a
>> sqlContext.sql("""Select a.alarmUUID,b.alarmUUID
>>   from a right outer join b on a.alarmUUID =
>> b.alarmUUID""")
>>
>> it give output as
>>
>> +++
>>
>> |   alarmUUID|   alarmUUID|
>> +++
>> |7d33a516-5532-410...|null|
>> |null|2439d6db-16a2-44b...|
>> +++
>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
>
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


Re: Did anybody come across this random-forest issue with spark 2.0.1.

2016-10-18 Thread 市场部
Hi YanBo
Thank you very much.
You are totally correct!

I just looked up the Spark 2.0.1 documentation. It says: "Maximum memory in MB 
allocated to histogram aggregation. If too small, then 1 node will be split per 
iteration, and its aggregates may exceed this size. (default = 256 MB)"

Although this setting is unchanged in Spark 2.0, the warning did not occur with my 
ML source code on Spark 1.6.1. It seems the random forest implementation in Spark 
2.0 uses more memory, so the warning is now triggered even though the default value 
of maxMemoryInMB has not changed.
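
For anyone hitting the same warning, raising the limit looks roughly like this
(PySpark shown as a sketch; 512 is just an illustrative value, and trainingData is
an assumed DataFrame with features/label columns):

from pyspark.ml.classification import RandomForestClassifier

# raise the histogram-aggregation memory limit so more nodes can split per iteration
rf = RandomForestClassifier(featuresCol="features", labelCol="label",
                            numTrees=100, maxMemoryInMB=512)
model = rf.fit(trainingData)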



From: Yanbo Liang
Date: Tuesday, October 18, 2016, 11:55 AM
To: zhangjianxin
Cc: Xi Shen, "user@spark.apache.org"
Subject: Re: Did anybody come across this random-forest issue with spark 2.0.1.

​Please increase the value of "maxMemoryInMB"​ of your RandomForestClassifier 
or RandomForestRegressor.
It's a warning which will not affect the result but may lead your training 
slower.

Thanks
Yanbo

On Mon, Oct 17, 2016 at 8:21 PM, 张建鑫(市场部) 
> wrote:
Hi Xi Shen

The warning message wasn’t  removed after I had upgraded my java to V8,
but  anyway I appreciate your kind help.

Since it’s just a WARN, I suppose I can bear with it and nothing bad would 
really happen. Am I right?


6/10/18 11:12:42 WARN RandomForest: Tree learning is using approximately 
268437864 bytes per iteration, which exceeds requested limit 
maxMemoryUsage=268435456. This allows splitting 80088 nodes in this iteration.
16/10/18 11:13:07 WARN RandomForest: Tree learning is using approximately 
268436304 bytes per iteration, which exceeds requested limit 
maxMemoryUsage=268435456. This allows splitting 80132 nodes in this iteration.
16/10/18 11:13:32 WARN RandomForest: Tree learning is using approximately 
268437816 bytes per iteration, which exceeds requested limit 
maxMemoryUsage=268435456. This allows splitting 80082 nodes in this iteration.



From: zhangjianxin
Date: Monday, October 17, 2016, 8:16 PM
To: Xi Shen
Cc: "user@spark.apache.org"
Subject: Re: Did anybody come across this random-forest issue with spark 2.0.1.

Hi Xi Shen

Not yet.  For the moment my idk for spark is still V7. Thanks for your 
reminding, I will try it out by upgrading java.

From: Xi Shen
Date: Monday, October 17, 2016, 8:00 PM
To: zhangjianxin, "user@spark.apache.org"
Subject: Re: Did anybody come across this random-forest issue with spark 2.0.1.

Did you also upgrade to Java from v7 to v8?

On Mon, Oct 17, 2016 at 7:19 PM 张建鑫(市场部) 
> wrote:

Did anybody encounter this problem before and why it happens , how to solve it? 
 The same training data and same source code work in 1.6.1, however become 
lousy in 2.0.1

[X]
--

Thanks,
David S.



Re: Accessing Hbase tables through Spark, this seems to work

2016-10-18 Thread Mich Talebzadeh
The design really needs to look at other stack as well.

If the visualisation layer is going to use Tableau then you cannot use
Spark functional programming, only Spark SQL or anything else that works
with SQL, like Hive or Phoenix.

Tableau is not a real-time dashboard, so for analytics it maps tables in the
database as it sees them. It has ODBC/JDBC connections to Hive (I don't know
about Phoenix).

So that is the advantage of Hive. As for caching, yes, you can cache some data
in the Tableau Server cache, but we all agree that it is finite. The same is
true for anything that relies on memory: Hive + LLAP, or any in-memory
database (I tried Tableau on Oracle TimesTen); you can only cache a certain
amount of data, and no one is going to splash out on large memory for analytics.

Bear in mind that performance is a deployment issue and you are unlikely to
be able to create the same conditions as PROD in a test environment.



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 October 2016 at 08:18, Jörn Franke  wrote:

> Careful Hbase with Phoenix is only in certain scenarios faster. When it is
> about processing small amounts out of a bigger amount of data (depends on
> node memory, the operation etc).  Hive+tez+orc can  be rather competitive,
> llap makes sense for interactive ad-hoc queries that are rather similar.
> Both Phoenix and hive follow different purposes with a different
> architecture and underlying data structure.
>
> On 18 Oct 2016, at 07:44, Mich Talebzadeh 
> wrote:
>
> yes Hive external table is partitioned on a daily basis (datestamp below)
>
> CREATE EXTERNAL TABLE IF NOT EXISTS ${DATABASE}.externalMarketData (
>  KEY string
>, SECURITY string
>, TIMECREATED string
>, PRICE float
> )
> COMMENT 'From prices Kakfa delivered by Flume location by day'
> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
> STORED AS TEXTFILE
> LOCATION 'hdfs://rhes564:9000/data/prices/'
> --TBLPROPERTIES ("skip.header.line.count"="1")
> ;
> ALTER TABLE ${DATABASE}.externalMarketData set location
> 'hdfs://rhes564:9000/data/prices/${TODAY}';
>
> and there is insert/overwrite into managed table every 15 minutes.
>
> INSERT OVERWRITE TABLE ${DATABASE}.marketData PARTITION (DateStamp =
> "${TODAY}")
> SELECT
>   KEY
> , SECURITY
> , TIMECREATED
> , PRICE
> , 1
> , CAST(from_unixtime(unix_timestamp()) AS timestamp)
> FROM ${DATABASE}.externalMarketData
>
> That works fine. However, Hbase is much faster for data retrieval with
> phoenix
>
> When we get Hive with LLAP, I gather Hive will replace Hbase.
>
> So in summary we have
>
>
>1. raw data delivered to HDFS
>2. data ingested into Hbase via cron
>3. HDFS directory is mapped to Hive external table
>4. There is Hive managed table with added optimisation/indexing (ORC)
>
>
> There are a number of ways of doing it as usual.
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 18 October 2016 at 00:48, ayan guha  wrote:
>
>> I do not see a rationale to have hbase in this scheme of thingsmay be
>> I am missing something?
>>
>> If data is delivered in HDFS, why not just add partition to an existing
>> Hive table?
>>
>> On Tue, Oct 18, 2016 at 8:23 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Mike,
>>>
>>> My test csv data comes as
>>>
>>> UUID, ticker,  timecreated,
>>> price
>>> a2c844ed-137f-4820-aa6e-c49739e46fa6, S01, 2016-10-17T22:02:09,
>>> 53.36665625650533484995
>>> a912b65e-b6bc-41d4-9e10-d6a44ea1a2b0, S02, 2016-10-17T22:02:09,
>>> 86.31917515824627016510
>>> 5f4e3a9d-05cc-41a2-98b3-40810685641e, S03, 2016-10-17T22:02:09,
>>> 95.48298277703729129559
>>>
>>>
>>> And this is my Hbase table with one column family
>>>
>>> create 'marketDataHbase', 'price_info'
>>>
>>> It is populated every 15 minutes 

Re: tutorial for access elements of dataframe columns and column values of a specific rows?

2016-10-18 Thread Divya Gehlot
Can you please elaborate your use case ?

On 18 October 2016 at 15:48, muhammet pakyürek  wrote:

>
>
>
>
>
> --
> *From:* muhammet pakyürek 
> *Sent:* Monday, October 17, 2016 11:51 AM
> *To:* user@spark.apache.org
> *Subject:* rdd and dataframe columns dtype
>
>
> how can i set columns dtype of rdd
>
>
>
>


tutorial for access elements of dataframe columns and column values of a specific rows?

2016-10-18 Thread muhammet pakyürek






From: muhammet pakyürek 
Sent: Monday, October 17, 2016 11:51 AM
To: user@spark.apache.org
Subject: rdd and dataframe columns dtype


how can i set columns dtype of rdd




About Error while reading large JSON file in Spark

2016-10-18 Thread Chetan Khatri
Hello Community members,

I am getting error while reading large JSON file in spark,

*Code:*

val landingVisitor =
sqlContext.read.json("s3n://hist-ngdp/lvisitor/lvisitor-01-aug.json")

*Error:*

16/10/18 07:30:30 ERROR Executor: Exception in task 8.0 in stage 0.0 (TID 8)
java.io.IOException: Too many bytes before newline: 2147483648
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:249)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at
org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:135)
at
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:237)

What would be resolution for the same ?

Thanks in Advance !


-- 
Yours Aye,
Chetan Khatri.


Re: Accessing Hbase tables through Spark, this seems to work

2016-10-18 Thread Jörn Franke
Careful: HBase with Phoenix is only faster in certain scenarios, namely when you 
are processing a small amount out of a bigger amount of data (it depends on node 
memory, the operation, etc.). Hive+Tez+ORC can be rather competitive, and LLAP 
makes sense for interactive ad-hoc queries that are rather similar. Phoenix and 
Hive serve different purposes, with different architectures and underlying data 
structures.

> On 18 Oct 2016, at 07:44, Mich Talebzadeh  wrote:
> 
> yes Hive external table is partitioned on a daily basis (datestamp below)
> 
> CREATE EXTERNAL TABLE IF NOT EXISTS ${DATABASE}.externalMarketData (
>  KEY string
>, SECURITY string
>, TIMECREATED string
>, PRICE float
> )
> COMMENT 'From prices Kakfa delivered by Flume location by day'
> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
> STORED AS TEXTFILE
> LOCATION 'hdfs://rhes564:9000/data/prices/'
> --TBLPROPERTIES ("skip.header.line.count"="1")
> ;
> ALTER TABLE ${DATABASE}.externalMarketData set location 
> 'hdfs://rhes564:9000/data/prices/${TODAY}';
> 
> and there is insert/overwrite into managed table every 15 minutes.
> 
> INSERT OVERWRITE TABLE ${DATABASE}.marketData PARTITION (DateStamp = 
> "${TODAY}")
> SELECT
>   KEY
> , SECURITY
> , TIMECREATED
> , PRICE
> , 1
> , CAST(from_unixtime(unix_timestamp()) AS timestamp)
> FROM ${DATABASE}.externalMarketData
> 
> That works fine. However, Hbase is much faster for data retrieval with phoenix
> 
> When we get Hive with LLAP, I gather Hive will replace Hbase.
> 
> So in summary we have
> 
> raw data delivered to HDFS
> data ingested into Hbase via cron
> HDFS directory is mapped to Hive external table
> There is Hive managed table with added optimisation/indexing (ORC)
> 
> There are a number of ways of doing it as usual.
> 
> Thanks
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
>> On 18 October 2016 at 00:48, ayan guha  wrote:
>> I do not see a rationale to have hbase in this scheme of thingsmay be I 
>> am missing something? 
>> 
>> If data is delivered in HDFS, why not just add partition to an existing Hive 
>> table? 
>> 
>>> On Tue, Oct 18, 2016 at 8:23 AM, Mich Talebzadeh 
>>>  wrote:
>>> Thanks Mike,
>>>  
>>> My test csv data comes as
>>>  
>>> UUID, ticker,  timecreated,  price
>>> a2c844ed-137f-4820-aa6e-c49739e46fa6, S01, 2016-10-17T22:02:09,  
>>> 53.36665625650533484995
>>> a912b65e-b6bc-41d4-9e10-d6a44ea1a2b0, S02, 2016-10-17T22:02:09,  
>>> 86.31917515824627016510
>>> 5f4e3a9d-05cc-41a2-98b3-40810685641e, S03, 2016-10-17T22:02:09,  
>>> 95.48298277703729129559
>>>  
>>>  
>>> And this is my Hbase table with one column family
>>>  
>>> create 'marketDataHbase', 'price_info'
>>>  
>>> It is populated every 15 minutes from test.csv files delivered via Kafka 
>>> and Flume to HDFS
>>>  
>>> Create a fat csv file based on all small csv files for today --> 
>>> prices/2016-10-17
>>> Populate data into Hbase table using 
>>> org.apache.hadoop.hbase.mapreduce.ImportTsv
>>> This is pretty quick using MapReduce
>>>  
>>> That importTsv only appends new rows to Hbase table as the choice of UUID 
>>> as rowKey avoids any issues.
>>>  
>>> So I only have 15 minutes lag in my batch Hbase table.
>>>  
>>> I have both Hive ORC tables and Phoenix views on top of this Hbase tables.
>>>  
>>> Phoenix offers the fastest response if used on top of Hbase. unfortunately, 
>>> Spark 2 with Phoenix is broken
>>> Spark on Hive on Hbase looks OK. This works fine with Spark 2
>>> Spark on Hbase tables directly using key, value DFs for each column. Not as 
>>> fast as 2 but works. I don't think a DF is a good choice for a key, value 
>>> pair?
>>> Now if I use Zeppelin to read from Hbase
>>>  
>>> I can use Phoenix JDBC. That looks very fast
>>> I can use Spark csv directly on HDFS csv files.
>>> I can use Spark on Hive tables
>>>  
>>> If I use Tableau on Hbase data then, only sql like code is useful. Phoenix 
>>> or Hive
>>>  
>>> I don't want to change the design now. But admittedly Hive is the best SQL 
>>> on top of Hbase. Next release of Hive is going to have in-memory database 
>>> (LLAP) so we can cache Hive tables in memory. That will be faster. Many 
>>> people underestimate Hive but I still believe it has a lot to offer besides 
>>> serious ANSI compliant SQL.
>>>  
>>> Regards
>>>  
>>>  Mich
>>>  
>>>  
>>>  
>>>  
>>>  
>>>  
>>>  
>>>  
>>> 

Re: mllib model in production web API

2016-10-18 Thread vincent gromakowski
Hi
Did you try applying the model with akka instead of spark ?
https://spark-summit.org/eu-2015/events/real-time-anomaly-detection-with-spark-ml-and-akka/

On 18 Oct 2016 5:58 AM, "Aseem Bansal"  wrote:

> @Nicolas
>
> No, ours is different. We required predictions within 10ms time frame so
> we needed much less latency than that.
>
> Every algorithm has some parameters. Correct? We took the parameters from
> the mllib and used them to create ml package's model. ml package's model's
> prediction time was much faster compared to mllib package's transformation.
> So essentially use spark's distributed machine learning library to train
> the model, save to S3, load from S3 in a different system and then convert
> it into the vector based API model for actual predictions.
>
> There were obviously some transformations involved but we didn't use
> Pipeline for those transformations. Instead, we re-wrote them for the
> Vector based API. I know it's not perfect but if we had used the
> transformations within the pipeline that would make us dependent on spark's
> distributed API and we didn't see how we will really reach our latency
> requirements. Would have been much simpler and more DRY if the
> PipelineModel had a predict method based on vectors and was not distributed.
>
> As you can guess it is very much model-specific and more work. If we
> decide to use another type of Model we will have to add conversion
> code/transformation code for that also. Only if spark exposed a prediction
> method which is as fast as the old machine learning package.
>
> On Sat, Oct 15, 2016 at 8:42 PM, Nicolas Long 
> wrote:
>
>> Hi Sean and Aseem,
>>
>> thanks both. A simple thing which sped things up greatly was simply to
>> load our sql (for one record effectively) directly and then convert to a
>> dataframe, rather than using Spark to load it. Sounds stupid, but this took
>> us from > 5 seconds to ~1 second on a very small instance.
>>
>> Aseem: can you explain your solution a bit more? I'm not sure I
>> understand it. At the moment we load our models from S3
>> (RandomForestClassificationModel.load(..) ) and then store that in an
>> object property so that it persists across requests - this is in Scala. Is
>> this essentially what you mean?
>>
>>
>>
>>
>>
>>
>> On 12 October 2016 at 10:52, Aseem Bansal  wrote:
>>
>>> Hi
>>>
>>> Faced a similar issue. Our solution was to load the model, cache it
>>> after converting it to a model from mllib and then use that instead of ml
>>> model.
>>>
>>> On Tue, Oct 11, 2016 at 10:22 PM, Sean Owen  wrote:
>>>
 I don't believe it will ever scale to spin up a whole distributed job
 to serve one request. You can look possibly at the bits in mllib-local. You
 might do well to export as something like PMML either with Spark's export
 or JPMML and then load it into a web container and score it, without Spark
 (possibly also with JPMML, OpenScoring)


 On Tue, Oct 11, 2016, 17:53 Nicolas Long  wrote:

> Hi all,
>
> so I have a model which has been stored in S3. And I have a Scala
> webapp which for certain requests loads the model and transforms submitted
> data against it.
>
> I'm not sure how to run this quickly on a single instance though. At
> the moment Spark is being bundled up with the web app in an uberjar (sbt
> assembly).
>
> But the process is quite slow. I'm aiming for responses < 1 sec so
> that the webapp can respond quickly to requests. When I look the Spark UI 
> I
> see:
>
> Summary Metrics for 1 Completed Tasks
>
> Metric                     | Min   | 25th %ile | Median | 75th %ile | Max
> Duration                   | 94 ms | 94 ms     | 94 ms  | 94 ms     | 94 ms
> Scheduler Delay            | 0 ms  | 0 ms      | 0 ms   | 0 ms      | 0 ms
> Task Deserialization Time  | 3 s   | 3 s       | 3 s    | 3 s       | 3 s
> GC Time                    | 2 s   | 2 s       | 2 s    | 2 s       | 2 s
> Result Serialization Time  | 0 ms  | 0 ms      | 0 ms   | 0 ms      | 0 ms
> Getting Result Time        | 0 ms  | 0 ms      | 0 ms   | 0 ms      | 0 ms
> Peak Execution Memory      | 0.0 B | 0.0 B     | 0.0 B  | 0.0 B     | 0.0 B
>
> I don't really understand why deserialization and GC should take so
> long when the models are already loaded. Is this evidence I am doing
> something wrong? And where can I get a better understanding on how Spark
> works under the hood here, and how best to do a standalone/bundled jar
> deployment?
>
> Thanks!
>
> Nic
>

>>>
>>
>