Re: Question on how to get appended data from structured streaming

2017-08-20 Thread Yanpeng Lin
I am currently trying to implement some online algorithms on top of Structured
Streaming.
My requirement is to fetch only the delta data from memory at each trigger and,
at the same time, compute and update global variables.
Here are two points I find difficult:
1. With a foreach writer I can get the appended data on each worker, but there
is no way to pass in and update global variables.
2. A foreach writer only deals with one row at a time; is there any plan to
support processing a whole mini-batch at a time? (A minimal sketch of the
per-row contract follows below.)
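
For reference, a minimal sketch of the ForeachWriter contract that point 1
refers to (the method bodies are illustrative, not a working sink):

import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  // Called once per partition and trigger; `version` identifies the batch.
  def open(partitionId: Long, version: Long): Boolean = true
  // Called once per appended row, on the worker that owns the partition.
  def process(row: Row): Unit = { /* e.g. push the row to an external store */ }
  def close(errorOrNull: Throwable): Unit = {}
}

// streamingDF is assumed to be a streaming DataFrame, e.g. from a Kafka source:
// streamingDF.writeStream.outputMode("append").foreach(writer).start()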

I don't know whether I am using it the right way, or whether there is any best
practice for implementing online algorithms with Spark.

Thanks.
Yanpeng

On Mon, Aug 21, 2017 at 10:05 AM, Michael Armbrust 
wrote:

> What is your end goal?  Right now the foreach writer is the way to do
> arbitrary processing on the data produced by various output modes.
>
>
> On Sun, Aug 20, 2017 at 12:23 PM, Yanpeng Lin  wrote:
>
>> Hello,
>>
>> I am new to Spark.
>> It would be appreciated if anyone could help me understand how to get
>> appended data from structured streaming. According to the document
>> ,
>> data stream could be treated as new rows appended to unbounded table. I
>> want to know besides writing out data to external storage to get appended
>> data only at every time, is there any other way to get appended data? like
>> from memory directly.
>>
>> Here is my case. I had a Kafka source keeping publish data to Spark with
>> `test` topic.
>>
>> val source = spark.readStream.format("kafka")
>>   .option("kafka.bootstrap.servers", "broker:9092")
>>   .option("subscribe", "test")
>>   .load()
>>
>> I tried that write stream with format `memory` like the following:
>>
>> val query = source.writeStream.format("memory")
>>   .trigger(ProcessingTime("3 seconds"))
>>   .queryName("tests")
>>   .outputMode(OutputMode.Append)
>>   .start()
>> spark.sql("select topic, value from tests")
>> The result table `tests` contains all data from the beginning of stream.
>> like
>>
>> Trigger Time, Topic, Value
>> t1 test,   1
>> t1 test,   2
>> t2 test,   3
>> t3 test,   4
>>
>> By appended data I mean only the delta data after each trigger. For
>> example, after trigger time t1, rows of value 1 and 2 are newly appended.
>> After trigger time t2, row of value 3 will be treated as newly appended.
>> And after t3, row of value 4 could be fetched as newly appended.
>> I understand each appended data could be processed using `ForeachWriter`,
>> but if I want to fetch all newly appended data after any trigger time,
>> is there any way to do that directly from dataframe?
>>
>> Thanks!
>> Yanpeng
>>
>
>


Re: Question on how to get appended data from structured streaming

2017-08-20 Thread Michael Armbrust
What is your end goal?  Right now the foreach writer is the way to do
arbitrary processing on the data produced by various output modes.

On Sun, Aug 20, 2017 at 12:23 PM, Yanpeng Lin  wrote:

> Hello,
>
> I am new to Spark.
> It would be appreciated if anyone could help me understand how to get
> appended data from structured streaming. According to the document
> ,
> data stream could be treated as new rows appended to unbounded table. I
> want to know besides writing out data to external storage to get appended
> data only at every time, is there any other way to get appended data? like
> from memory directly.
>
> Here is my case. I had a Kafka source keeping publish data to Spark with
> `test` topic.
>
> val source = spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", "broker:9092")
>   .option("subscribe", "test")
>   .load()
>
> I tried that write stream with format `memory` like the following:
>
> val query = source.writeStream.format("memory")
>   .trigger(ProcessingTime("3 seconds"))
>   .queryName("tests")
>   .outputMode(OutputMode.Append)
>   .start()
> spark.sql("select topic, value from tests")
> The result table `tests` contains all data from the beginning of stream.
> like
>
> Trigger Time, Topic, Value
> t1 test,   1
> t1 test,   2
> t2 test,   3
> t3 test,   4
>
> By appended data I mean only the delta data after each trigger. For
> example, after trigger time t1, rows of value 1 and 2 are newly appended.
> After trigger time t2, row of value 3 will be treated as newly appended.
> And after t3, row of value 4 could be fetched as newly appended.
> I understand each appended data could be processed using `ForeachWriter`,
> but if I want to fetch all newly appended data after any trigger time,
> is there any way to do that directly from dataframe?
>
> Thanks!
> Yanpeng
>


a set of practice and LAB

2017-08-20 Thread Mohsen Pahlevanzadeh

Dear All,


I need a set of practice exercises and labs for Spark and Hadoop. I would
appreciate your help.


Yours,
Mohsen

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
I tried all the approaches.

1. The Hive table is partitioned by year, month, day with Parquet format; the
table was created in Impala.
2. The dataset from Hive is not partitioned. I used insert overwrite
hivePartitonedTable partition(year,month,day) select * from
tempViewOFDataset. I also tried
Dataset.write.mode(overwrite).insertInto(hivePartitonedTable).
3. I tried repartitioning the dataset before inserting into the Hive table, as
below (a fuller sketch follows this list):
unionedDS.repartition(unionedDS("year"),unionedDS("month"),unionedDS("day"))

None of these approaches helped with performance.
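
For reference, a sketch of approaches 2 and 3 above (table, view and column
names are the ones from this thread; the dynamic-partition settings are an
assumption about what the SQL insert needs):

// Approach 2: dynamic-partition insert through Spark SQL.
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
unionedDS.createOrReplaceTempView("tempViewOFDataset")
spark.sql(
  """INSERT OVERWRITE TABLE hivePartitonedTable PARTITION (year, month, day)
    |SELECT * FROM tempViewOFDataset""".stripMargin)

// Approach 3: repartition by the partition columns before insertInto, so each
// task writes to fewer partition directories.
import org.apache.spark.sql.SaveMode
unionedDS
  .repartition(unionedDS("year"), unionedDS("month"), unionedDS("day"))
  .write.mode(SaveMode.Overwrite)
  .insertInto("hivePartitonedTable")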


On Sun, Aug 20, 2017 at 1:35 PM, ayan guha  wrote:

> Just curious - is your dataset partitioned on your partition columns?
>
> On Mon, 21 Aug 2017 at 3:54 am, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> We are in cloudera CDH5.10 and we are using spark 2 that comes with
>> cloudera.
>>
>> Coming to second solution, creating a temporary view on dataframe but it
>> didnt improve my performance too.
>>
>> I do remember performance was very fast when doing whole overwrite table
>> without partitons but the problem started after using partitions.
>>
>> On Sun, Aug 20, 2017 at 12:46 PM, Jörn Franke 
>> wrote:
>>
>>> Ah i see then I would check also directly in Hive if you have issues to
>>> insert data in the Hive table. Alternatively you can try to register
>>> the df as temptable and do a insert into the Hive table from the temptable
>>> using Spark sql ("insert into table hivetable select * from temptable")
>>>
>>>
>>> You seem to use Cloudera so you probably have a very outdated Hive
>>> version. So you could switch to a distribution having a recent version of
>>> Hive 2 with Tez+llap - these are much more performant with much more
>>> features.
>>>
>>> Alternatively you can try to register the df as temptable and do a
>>> insert into the Hive table from the temptable using Spark sql ("insert into
>>> table hivetable select * from temptable")
>>>
>>> On 20. Aug 2017, at 18:47, KhajaAsmath Mohammed 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I have created hive table in impala first with storage format as
>>> parquet. With dataframe from spark I am tryinig to insert into the same
>>> table with below syntax.
>>>
>>> Table is partitoned by year,month,day
>>> ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")
>>>
>>> https://issues.apache.org/jira/browse/SPARK-20049
>>>
>>> I saw something in the above link not sure if that is same thing in my
>>> case.
>>>
>>> Thanks,
>>> Asmath
>>>
>>> On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke 
>>> wrote:
>>>
 Have you made sure that the saveastable stores them as parquet?

 On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed <
 mdkhajaasm...@gmail.com> wrote:

 we are using parquet tables, is it causing any performance issue?

 On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke 
 wrote:

> Improving the performance of Hive can be also done by switching to
> Tez+llap as an engine.
> Aside from this : you need to check what is the default format that it
> writes to Hive. One issue for the slow storing into a hive table could be
> that it writes by default to csv/gzip or csv/bzip2
>
> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
> >
> > Yes we tried hive and want to migrate to spark for better
> performance. I am using paraquet tables . Still no better performance 
> while
> loading.
> >
> > Sent from my iPhone
> >
> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke 
> wrote:
> >>
> >> Have you tried directly in Hive how the performance is?
> >>
> >> In which Format do you expect Hive to write? Have you made sure it
> is in this format? It could be that you use an inefficient format (e.g. 
> CSV
> + bzip2).
> >>
> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I have written spark sql job on spark2.0 by using scala . It is
> just pulling the data from hive table and add extra columns , remove
> duplicates and then write it back to hive again.
> >>>
> >>> In spark ui, it is taking almost 40 minutes to write 400 go of
> data. Is there anything that I need to improve performance .
> >>>
> >>> Spark.sql.partitions is 2000 in my case with executor memory of
> 16gb and dynamic allocation enabled.
> >>>
> >>> I am doing insert overwrite on partition by
> >>> Da.write.mode(overwrite).insertinto(table)
> >>>
> >>> Any suggestions please ??
> >>>
> >>> Sent from my iPhone
> >>> 
> -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
>


>>>
>> --
> 

Re: Spark hive overwrite is very very slow

2017-08-20 Thread ayan guha
Just curious - is your dataset partitioned on your partition columns?

On Mon, 21 Aug 2017 at 3:54 am, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> We are in cloudera CDH5.10 and we are using spark 2 that comes with
> cloudera.
>
> Coming to second solution, creating a temporary view on dataframe but it
> didnt improve my performance too.
>
> I do remember performance was very fast when doing whole overwrite table
> without partitons but the problem started after using partitions.
>
> On Sun, Aug 20, 2017 at 12:46 PM, Jörn Franke 
> wrote:
>
>> Ah i see then I would check also directly in Hive if you have issues to
>> insert data in the Hive table. Alternatively you can try to register the
>> df as temptable and do a insert into the Hive table from the temptable
>> using Spark sql ("insert into table hivetable select * from temptable")
>>
>>
>> You seem to use Cloudera so you probably have a very outdated Hive
>> version. So you could switch to a distribution having a recent version of
>> Hive 2 with Tez+llap - these are much more performant with much more
>> features.
>>
>> Alternatively you can try to register the df as temptable and do a insert
>> into the Hive table from the temptable using Spark sql ("insert into table
>> hivetable select * from temptable")
>>
>> On 20. Aug 2017, at 18:47, KhajaAsmath Mohammed 
>> wrote:
>>
>> Hi,
>>
>> I have created hive table in impala first with storage format as parquet.
>> With dataframe from spark I am tryinig to insert into the same table with
>> below syntax.
>>
>> Table is partitoned by year,month,day
>> ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")
>>
>> https://issues.apache.org/jira/browse/SPARK-20049
>>
>> I saw something in the above link not sure if that is same thing in my
>> case.
>>
>> Thanks,
>> Asmath
>>
>> On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke 
>> wrote:
>>
>>> Have you made sure that the saveastable stores them as parquet?
>>>
>>> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed 
>>> wrote:
>>>
>>> we are using parquet tables, is it causing any performance issue?
>>>
>>> On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke 
>>> wrote:
>>>
 Improving the performance of Hive can be also done by switching to
 Tez+llap as an engine.
 Aside from this : you need to check what is the default format that it
 writes to Hive. One issue for the slow storing into a hive table could be
 that it writes by default to csv/gzip or csv/bzip2

 > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <
 mdkhajaasm...@gmail.com> wrote:
 >
 > Yes we tried hive and want to migrate to spark for better
 performance. I am using paraquet tables . Still no better performance while
 loading.
 >
 > Sent from my iPhone
 >
 >> On Aug 20, 2017, at 2:24 AM, Jörn Franke 
 wrote:
 >>
 >> Have you tried directly in Hive how the performance is?
 >>
 >> In which Format do you expect Hive to write? Have you made sure it
 is in this format? It could be that you use an inefficient format (e.g. CSV
 + bzip2).
 >>
 >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
 mdkhajaasm...@gmail.com> wrote:
 >>>
 >>> Hi,
 >>>
 >>> I have written spark sql job on spark2.0 by using scala . It is
 just pulling the data from hive table and add extra columns , remove
 duplicates and then write it back to hive again.
 >>>
 >>> In spark ui, it is taking almost 40 minutes to write 400 go of
 data. Is there anything that I need to improve performance .
 >>>
 >>> Spark.sql.partitions is 2000 in my case with executor memory of
 16gb and dynamic allocation enabled.
 >>>
 >>> I am doing insert overwrite on partition by
 >>> Da.write.mode(overwrite).insertinto(table)
 >>>
 >>> Any suggestions please ??
 >>>
 >>> Sent from my iPhone
 >>>
 -
 >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
 >>>

>>>
>>>
>>
> --
Best Regards,
Ayan Guha


Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
We are on Cloudera CDH 5.10 and we are using the Spark 2 that comes with
Cloudera.

Coming to the second solution: I tried creating a temporary view on the
dataframe, but it didn't improve my performance either.

I do remember performance was very fast when overwriting the whole table
without partitions, but the problem started after using partitions.

On Sun, Aug 20, 2017 at 12:46 PM, Jörn Franke  wrote:

> Ah i see then I would check also directly in Hive if you have issues to
> insert data in the Hive table. Alternatively you can try to register the
> df as temptable and do a insert into the Hive table from the temptable
> using Spark sql ("insert into table hivetable select * from temptable")
>
>
> You seem to use Cloudera so you probably have a very outdated Hive
> version. So you could switch to a distribution having a recent version of
> Hive 2 with Tez+llap - these are much more performant with much more
> features.
>
> Alternatively you can try to register the df as temptable and do a insert
> into the Hive table from the temptable using Spark sql ("insert into table
> hivetable select * from temptable")
>
> On 20. Aug 2017, at 18:47, KhajaAsmath Mohammed 
> wrote:
>
> Hi,
>
> I have created hive table in impala first with storage format as parquet.
> With dataframe from spark I am tryinig to insert into the same table with
> below syntax.
>
> Table is partitoned by year,month,day
> ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")
>
> https://issues.apache.org/jira/browse/SPARK-20049
>
> I saw something in the above link not sure if that is same thing in my
> case.
>
> Thanks,
> Asmath
>
> On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke 
> wrote:
>
>> Have you made sure that the saveastable stores them as parquet?
>>
>> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed 
>> wrote:
>>
>> we are using parquet tables, is it causing any performance issue?
>>
>> On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke 
>> wrote:
>>
>>> Improving the performance of Hive can be also done by switching to
>>> Tez+llap as an engine.
>>> Aside from this : you need to check what is the default format that it
>>> writes to Hive. One issue for the slow storing into a hive table could be
>>> that it writes by default to csv/gzip or csv/bzip2
>>>
>>> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>> >
>>> > Yes we tried hive and want to migrate to spark for better performance.
>>> I am using paraquet tables . Still no better performance while loading.
>>> >
>>> > Sent from my iPhone
>>> >
>>> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke 
>>> wrote:
>>> >>
>>> >> Have you tried directly in Hive how the performance is?
>>> >>
>>> >> In which Format do you expect Hive to write? Have you made sure it is
>>> in this format? It could be that you use an inefficient format (e.g. CSV +
>>> bzip2).
>>> >>
>>> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I have written spark sql job on spark2.0 by using scala . It is just
>>> pulling the data from hive table and add extra columns , remove duplicates
>>> and then write it back to hive again.
>>> >>>
>>> >>> In spark ui, it is taking almost 40 minutes to write 400 go of data.
>>> Is there anything that I need to improve performance .
>>> >>>
>>> >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb
>>> and dynamic allocation enabled.
>>> >>>
>>> >>> I am doing insert overwrite on partition by
>>> >>> Da.write.mode(overwrite).insertinto(table)
>>> >>>
>>> >>> Any suggestions please ??
>>> >>>
>>> >>> Sent from my iPhone
>>> >>> 
>>> -
>>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >>>
>>>
>>
>>
>


Re: Spark hive overwrite is very very slow

2017-08-20 Thread Jörn Franke
Ah, I see. Then I would also check directly in Hive whether you have issues
inserting data into the Hive table.

You seem to use Cloudera, so you probably have a very outdated Hive version.
You could switch to a distribution that has a recent version of Hive 2 with
Tez+LLAP - these are much more performant and have many more features.

Alternatively, you can try to register the df as a temp table and do an insert
into the Hive table from the temp table using Spark SQL ("insert into table
hivetable select * from temptable").
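
A sketch of that suggestion (table and view names are illustrative):

// Register the DataFrame as a temporary view, then let Spark SQL do the insert.
df.createOrReplaceTempView("temptable")
spark.sql("insert into table hivetable select * from temptable")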

> On 20. Aug 2017, at 18:47, KhajaAsmath Mohammed  
> wrote:
> 
> Hi,
> 
> I have created hive table in impala first with storage format as parquet. 
> With dataframe from spark I am tryinig to insert into the same table with 
> below syntax.
> 
> Table is partitoned by year,month,day 
> ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")
> 
> https://issues.apache.org/jira/browse/SPARK-20049
> 
> I saw something in the above link not sure if that is same thing in my case.
> 
> Thanks,
> Asmath
> 
>> On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke  wrote:
>> Have you made sure that the saveastable stores them as parquet?
>> 
>>> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed  
>>> wrote:
>>> 
>>> we are using parquet tables, is it causing any performance issue?
>>> 
 On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke  wrote:
 Improving the performance of Hive can be also done by switching to 
 Tez+llap as an engine.
 Aside from this : you need to check what is the default format that it 
 writes to Hive. One issue for the slow storing into a hive table could be 
 that it writes by default to csv/gzip or csv/bzip2
 
 > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed 
 >  wrote:
 >
 > Yes we tried hive and want to migrate to spark for better performance. I 
 > am using paraquet tables . Still no better performance while loading.
 >
 > Sent from my iPhone
 >
 >> On Aug 20, 2017, at 2:24 AM, Jörn Franke  wrote:
 >>
 >> Have you tried directly in Hive how the performance is?
 >>
 >> In which Format do you expect Hive to write? Have you made sure it is 
 >> in this format? It could be that you use an inefficient format (e.g. 
 >> CSV + bzip2).
 >>
 >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed 
 >>>  wrote:
 >>>
 >>> Hi,
 >>>
 >>> I have written spark sql job on spark2.0 by using scala . It is just 
 >>> pulling the data from hive table and add extra columns , remove 
 >>> duplicates and then write it back to hive again.
 >>>
 >>> In spark ui, it is taking almost 40 minutes to write 400 go of data. 
 >>> Is there anything that I need to improve performance .
 >>>
 >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb 
 >>> and dynamic allocation enabled.
 >>>
 >>> I am doing insert overwrite on partition by
 >>> Da.write.mode(overwrite).insertinto(table)
 >>>
 >>> Any suggestions please ??
 >>>
 >>> Sent from my iPhone
 >>> -
 >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
 >>>
>>> 
> 


Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
Hi,

I have created the Hive table in Impala first, with Parquet as the storage
format. From a Spark dataframe I am trying to insert into the same table with
the syntax below.

The table is partitioned by year, month, day:
ds.write.mode(SaveMode.Overwrite).insertInto("db.parqut_table")

https://issues.apache.org/jira/browse/SPARK-20049

I saw something in the above link; I'm not sure if it is the same issue in my case.

Thanks,
Asmath

On Sun, Aug 20, 2017 at 11:42 AM, Jörn Franke  wrote:

> Have you made sure that the saveastable stores them as parquet?
>
> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed 
> wrote:
>
> we are using parquet tables, is it causing any performance issue?
>
> On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke  wrote:
>
>> Improving the performance of Hive can be also done by switching to
>> Tez+llap as an engine.
>> Aside from this : you need to check what is the default format that it
>> writes to Hive. One issue for the slow storing into a hive table could be
>> that it writes by default to csv/gzip or csv/bzip2
>>
>> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>> >
>> > Yes we tried hive and want to migrate to spark for better performance.
>> I am using paraquet tables . Still no better performance while loading.
>> >
>> > Sent from my iPhone
>> >
>> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke  wrote:
>> >>
>> >> Have you tried directly in Hive how the performance is?
>> >>
>> >> In which Format do you expect Hive to write? Have you made sure it is
>> in this format? It could be that you use an inefficient format (e.g. CSV +
>> bzip2).
>> >>
>> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have written spark sql job on spark2.0 by using scala . It is just
>> pulling the data from hive table and add extra columns , remove duplicates
>> and then write it back to hive again.
>> >>>
>> >>> In spark ui, it is taking almost 40 minutes to write 400 go of data.
>> Is there anything that I need to improve performance .
>> >>>
>> >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb
>> and dynamic allocation enabled.
>> >>>
>> >>> I am doing insert overwrite on partition by
>> >>> Da.write.mode(overwrite).insertinto(table)
>> >>>
>> >>> Any suggestions please ??
>> >>>
>> >>> Sent from my iPhone
>> >>> -
>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>>
>>
>
>


Re: Spark hive overwrite is very very slow

2017-08-20 Thread Jörn Franke
Have you made sure that saveAsTable stores them as Parquet?

> On 20. Aug 2017, at 18:07, KhajaAsmath Mohammed  
> wrote:
> 
> we are using parquet tables, is it causing any performance issue?
> 
>> On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke  wrote:
>> Improving the performance of Hive can be also done by switching to Tez+llap 
>> as an engine.
>> Aside from this : you need to check what is the default format that it 
>> writes to Hive. One issue for the slow storing into a hive table could be 
>> that it writes by default to csv/gzip or csv/bzip2
>> 
>> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed  
>> > wrote:
>> >
>> > Yes we tried hive and want to migrate to spark for better performance. I 
>> > am using paraquet tables . Still no better performance while loading.
>> >
>> > Sent from my iPhone
>> >
>> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke  wrote:
>> >>
>> >> Have you tried directly in Hive how the performance is?
>> >>
>> >> In which Format do you expect Hive to write? Have you made sure it is in 
>> >> this format? It could be that you use an inefficient format (e.g. CSV + 
>> >> bzip2).
>> >>
>> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed 
>> >>>  wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have written spark sql job on spark2.0 by using scala . It is just 
>> >>> pulling the data from hive table and add extra columns , remove 
>> >>> duplicates and then write it back to hive again.
>> >>>
>> >>> In spark ui, it is taking almost 40 minutes to write 400 go of data. Is 
>> >>> there anything that I need to improve performance .
>> >>>
>> >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb and 
>> >>> dynamic allocation enabled.
>> >>>
>> >>> I am doing insert overwrite on partition by
>> >>> Da.write.mode(overwrite).insertinto(table)
>> >>>
>> >>> Any suggestions please ??
>> >>>
>> >>> Sent from my iPhone
>> >>> -
>> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>>
> 


Question on how to get appended data from structured streaming

2017-08-20 Thread Yanpeng Lin
Hello,

I am new to Spark.
It would be appreciated if anyone could help me understand how to get the
appended data from Structured Streaming. According to the documentation,
a data stream can be treated as new rows appended to an unbounded table. I
want to know: besides writing the data out to external storage to get only the
appended data each time, is there any other way to get the appended data, e.g.
directly from memory?

Here is my case. I have a Kafka source that keeps publishing data to Spark on
the `test` topic.

val source = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "test")
  .load()

I tried writing the stream with the `memory` format, like the following:

val query = source.writeStream.format("memory")
  .trigger(ProcessingTime("3 seconds"))
  .queryName("tests")
  .outputMode(OutputMode.Append)
  .start()

spark.sql("select topic, value from tests")
The result table `tests` contains all data from the beginning of the stream,
like:

Trigger Time  Topic  Value
t1            test   1
t1            test   2
t2            test   3
t3            test   4

By appended data I mean only the delta data after each trigger. For
example, after trigger time t1, the rows with values 1 and 2 are newly appended.
After trigger time t2, the row with value 3 is treated as newly appended,
and after t3 the row with value 4 could be fetched as newly appended.
I understand that the appended data can be processed using `ForeachWriter`,
but if I want to fetch all of the newly appended rows after a given trigger,
is there any way to do that directly as a DataFrame?
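
For comparison, a sketch of the console sink, which in append mode prints only
the rows appended in each trigger (reusing `source` from above); it shows the
per-trigger delta but does not return it as a DataFrame:

import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}

val debugQuery = source.writeStream
  .format("console")
  .trigger(ProcessingTime("3 seconds"))
  .outputMode(OutputMode.Append)
  .start()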

Thanks!
Yanpeng


Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
we are using parquet tables, is it causing any performance issue?

On Sun, Aug 20, 2017 at 9:09 AM, Jörn Franke  wrote:

> Improving the performance of Hive can be also done by switching to
> Tez+llap as an engine.
> Aside from this : you need to check what is the default format that it
> writes to Hive. One issue for the slow storing into a hive table could be
> that it writes by default to csv/gzip or csv/bzip2
>
> > On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed 
> wrote:
> >
> > Yes we tried hive and want to migrate to spark for better performance. I
> am using paraquet tables . Still no better performance while loading.
> >
> > Sent from my iPhone
> >
> >> On Aug 20, 2017, at 2:24 AM, Jörn Franke  wrote:
> >>
> >> Have you tried directly in Hive how the performance is?
> >>
> >> In which Format do you expect Hive to write? Have you made sure it is
> in this format? It could be that you use an inefficient format (e.g. CSV +
> bzip2).
> >>
> >>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I have written spark sql job on spark2.0 by using scala . It is just
> pulling the data from hive table and add extra columns , remove duplicates
> and then write it back to hive again.
> >>>
> >>> In spark ui, it is taking almost 40 minutes to write 400 go of data.
> Is there anything that I need to improve performance .
> >>>
> >>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb
> and dynamic allocation enabled.
> >>>
> >>> I am doing insert overwrite on partition by
> >>> Da.write.mode(overwrite).insertinto(table)
> >>>
> >>> Any suggestions please ??
> >>>
> >>> Sent from my iPhone
> >>> -
> >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>
>


Re: [BlockMatrix] multiply is an action or a transformation ?

2017-08-20 Thread Yanbo Liang
BlockMatrix.multiply will return another BlockMatrix. Inside this function
there are many RDD operations, but most of them are transformations. If you
don't trigger anything that obtains the blocks (an RDD of ((Int, Int), Matrix))
of the result BlockMatrix, the job will not run.
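
A small sketch of this from the Scala shell (the matrix entries are illustrative):

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val m1 = new CoordinateMatrix(sc.parallelize(Seq(
  MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0),
  MatrixEntry(1, 0, 3.0), MatrixEntry(1, 1, 4.0)))).toBlockMatrix()
val m2 = new CoordinateMatrix(sc.parallelize(Seq(
  MatrixEntry(0, 0, 5.0), MatrixEntry(1, 1, 6.0)))).toBlockMatrix()

val product = m1.multiply(m2)  // transformations only: builds the RDD lineage
product.blocks.count()         // acting on the blocks RDD is what runs the job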

Thanks
Yanbo

On Sun, Aug 13, 2017 at 10:30 PM, Jose Francisco Saray Villamizar <
jsa...@gmail.com> wrote:

> Hi Everyone,
>
> Sorry if the question is simple or confusing, but I have not seen the
> answer anywhere in the documentation:
>
> Is the multiply method in BlockMatrix a transformation or an action?
> I mean, in order for the multiplication to actually be done, is it enough
> to call:
>
> m1.multiply(m2),
>
> or do I have to do something like m1.multiply(m2).count()?
>
> Thanks.
>
> --
> --
> Buen dia, alegria !!
> José Francisco Saray Villamizar
> cel +33 6 13710693 <+33%206%2013%2071%2006%2093>
> Lyon, France
>
>


Re: Spark hive overwrite is very very slow

2017-08-20 Thread Jörn Franke
Improving the performance of Hive can also be done by switching to Tez+LLAP as
an engine.
Aside from this: you need to check what the default format is that Spark
writes to Hive. One cause of slow writes into a Hive table could be that it
writes by default to CSV/gzip or CSV/bzip2.
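
One way to check what format a table created from Spark actually uses (table
name is illustrative):

spark.sql("DESCRIBE FORMATTED mydb.mytable").show(100, false)
// Look at the InputFormat / OutputFormat / SerDe entries in the output.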

> On 20. Aug 2017, at 15:52, KhajaAsmath Mohammed  
> wrote:
> 
> Yes we tried hive and want to migrate to spark for better performance. I am 
> using paraquet tables . Still no better performance while loading. 
> 
> Sent from my iPhone
> 
>> On Aug 20, 2017, at 2:24 AM, Jörn Franke  wrote:
>> 
>> Have you tried directly in Hive how the performance is? 
>> 
>> In which Format do you expect Hive to write? Have you made sure it is in 
>> this format? It could be that you use an inefficient format (e.g. CSV + 
>> bzip2).
>> 
>>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed  
>>> wrote:
>>> 
>>> Hi,
>>> 
>>> I have written spark sql job on spark2.0 by using scala . It is just 
>>> pulling the data from hive table and add extra columns , remove duplicates 
>>> and then write it back to hive again.
>>> 
>>> In spark ui, it is taking almost 40 minutes to write 400 go of data. Is 
>>> there anything that I need to improve performance .
>>> 
>>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb and 
>>> dynamic allocation enabled.
>>> 
>>> I am doing insert overwrite on partition by
>>> Da.write.mode(overwrite).insertinto(table)
>>> 
>>> Any suggestions please ??
>>> 
>>> Sent from my iPhone
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark hive overwrite is very very slow

2017-08-20 Thread KhajaAsmath Mohammed
Yes, we tried Hive and want to migrate to Spark for better performance. I am
using Parquet tables. Still no better performance while loading.

Sent from my iPhone

> On Aug 20, 2017, at 2:24 AM, Jörn Franke  wrote:
> 
> Have you tried directly in Hive how the performance is? 
> 
> In which Format do you expect Hive to write? Have you made sure it is in this 
> format? It could be that you use an inefficient format (e.g. CSV + bzip2).
> 
>> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed  
>> wrote:
>> 
>> Hi,
>> 
>> I have written spark sql job on spark2.0 by using scala . It is just pulling 
>> the data from hive table and add extra columns , remove duplicates and then 
>> write it back to hive again.
>> 
>> In spark ui, it is taking almost 40 minutes to write 400 go of data. Is 
>> there anything that I need to improve performance .
>> 
>> Spark.sql.partitions is 2000 in my case with executor memory of 16gb and 
>> dynamic allocation enabled.
>> 
>> I am doing insert overwrite on partition by
>> Da.write.mode(overwrite).insertinto(table)
>> 
>> Any suggestions please ??
>> 
>> Sent from my iPhone
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Huber regression in PySpark?

2017-08-20 Thread Yanbo Liang
Hi Jeff,

Actually I have had an implementation of robust regression with Huber loss for
a long time (https://github.com/apache/spark/pull/14326). It is a fairly
straightforward port of scikit-learn's HuberRegressor.
The PR implements Huber regression as a separate Estimator, but we found it can
be merged into LinearRegression.
I will update this PR ASAP, and I'm looking forward to your reviews and
comments.
After the Scala implementation is merged, it will be very easy to add the
corresponding PySpark API, and then you can use it to train Huber regression
models in a distributed environment.
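
Once merged, the Scala API would look roughly like the sketch below (this
assumes the final form of the PR, where Huber loss becomes an option on
LinearRegression; parameter values are illustrative):

import org.apache.spark.ml.regression.LinearRegression

val huber = new LinearRegression()
  .setLoss("huber")    // "squaredError" is the default
  .setEpsilon(1.35)    // Huber threshold, as in scikit-learn's HuberRegressor
  .setRegParam(0.1)

// val model = huber.fit(trainingDF)  // trainingDF with features/label columns assumed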

Thanks
Yanbo

On Sun, Aug 20, 2017 at 3:19 PM, Jeff Gates  wrote:

> Hi guys,
>
> Is there huber regression in PySpark? We are using sklearn HuberRegressor (
> http://scikit-learn.org/stable/modules/generated/sklearn.
> linear_model.HuberRegressor.html) to train our model, but with some
> bottleneck in single node.
> If no, is there any obstacle to implement it in PySpark?
>
> Jeff
>


Re: Does Spark SQL uses Calcite?

2017-08-20 Thread kant kodali
Hi Jules,

I am looking to connect to Spark via JDBC so I can run Spark SQL queries over
JDBC, not to use Spark SQL to connect to other JDBC data sources.
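
A sketch of what that looks like against the Spark Thrift Server: it speaks the
HiveServer2 wire protocol, so the Hive JDBC driver (org.apache.hive:hive-jdbc)
must be on the classpath; host, port and table name are illustrative.

import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT count(*) FROM some_table")
while (rs.next()) println(rs.getLong(1))
rs.close(); stmt.close(); conn.close()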

Thanks!

On Sat, Aug 19, 2017 at 5:54 PM, Jules Damji  wrote:

> Try this link to see how you may connect  https://docs.databricks.com/
> spark/latest/data-sources/sql-databases.html
>
> Cheers
> Jules
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Aug 19, 2017, at 5:27 PM, kant kodali  wrote:
>
> Hi Russell,
>
> I went through this https://jaceklaskowski.gitbooks.io/mastering-apache-
> spark/spark-sql-thrift-server.html and I am still a bit confused on what
> hive is doing in here ? Is there any example I can look at on how to talk
> to Spark using Spark SQL JDBC driver alone and not hive ?
>
> Thanks,
> kant
>
>
>
>
>
> On Sat, Aug 12, 2017 at 6:07 AM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> You don't have to go through hive. It's just spark sql. The application
>> is just a forked hive thrift server.
>>
>> On Fri, Aug 11, 2017 at 8:53 PM kant kodali  wrote:
>>
>>> @Ryan it looks like if I enable thrift server I need to go through hive.
>>> I was talking more about having JDBC connector for Spark SQL itself other
>>> words not going through hive.
>>>
>>> On Fri, Aug 11, 2017 at 6:50 PM, kant kodali  wrote:
>>>
 @Ryan Does it work with Spark SQL 2.1.1?

 On Fri, Aug 11, 2017 at 12:53 AM, Ryan  wrote:

> the thrift server is a jdbc server, Kanth
>
> On Fri, Aug 11, 2017 at 2:51 PM,  wrote:
>
>> I also wonder why there isn't a jdbc connector for spark sql?
>>
>> Sent from my iPhone
>>
>> On Aug 10, 2017, at 2:45 PM, Jules Damji  wrote:
>>
>> Yes, it's more used in Hive than Spark
>>
>> Sent from my iPhone
>> Pardon the dumb thumb typos :)
>>
>> On Aug 10, 2017, at 2:24 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>> I think it is for hive dependency.
>> On Thu, Aug 10, 2017 at 4:14 PM kant kodali 
>> wrote:
>>
>>> Since I see a calcite dependency in Spark I wonder where Calcite is
>>> being used?
>>>
>>> On Thu, Aug 10, 2017 at 1:30 PM, Sathish Kumaran Vairavelu <
>>> vsathishkuma...@gmail.com> wrote:
>>>
 Spark SQL doesn't use Calcite

 On Thu, Aug 10, 2017 at 3:14 PM kant kodali 
 wrote:

> Hi All,
>
> Does Spark SQL uses Calcite? If so, what for? I thought the Spark
> SQL has catalyst which would generate its own logical plans, physical 
> plans
> and other optimizations.
>
> Thanks,
> Kant
>

>>>
>

>>>
>


Spark billing on shared Clusters

2017-08-20 Thread Jorge Machado
Hi everyone,

I was wondering how it is possible to do Spark / YARN accounting on a shared
cluster based on resource usage. I found out that there is no built-in way to
do that.

So I developed hbilling to deal with this. Is anyone interested in a quick demo?

More info under: www.hbilling.io

Example:

Dom is an enterprise architect at a company, SuperX. SuperX has a new Hadoop
cluster up and running, with four departments sharing the hardware. Now Dom
wants to charge each department by cluster usage. After some research he finds
out that hbilling is the first software that addresses exactly this problem.



Best Regards

Jorge Machado
www.jmachado.me 
jo...@jmachado.me 







Re: Spark hive overwrite is very very slow

2017-08-20 Thread Jörn Franke
Have you tried directly in Hive how the performance is?

In which format do you expect Hive to write? Have you made sure it is in that
format? It could be that you are using an inefficient format (e.g. CSV + bzip2).
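
A sketch of making the format explicit when the table is created from Spark
(table name is illustrative); note that insertInto() into an existing table
keeps that table's declared storage format:

import org.apache.spark.sql.SaveMode

ds.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .saveAsTable("mydb.my_parquet_table")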

> On 20. Aug 2017, at 03:18, KhajaAsmath Mohammed  
> wrote:
> 
> Hi,
> 
> I have written spark sql job on spark2.0 by using scala . It is just pulling 
> the data from hive table and add extra columns , remove duplicates and then 
> write it back to hive again.
> 
> In spark ui, it is taking almost 40 minutes to write 400 go of data. Is there 
> anything that I need to improve performance .
> 
> Spark.sql.partitions is 2000 in my case with executor memory of 16gb and 
> dynamic allocation enabled.
> 
> I am doing insert overwrite on partition by
> Da.write.mode(overwrite).insertinto(table)
> 
> Any suggestions please ??
> 
> Sent from my iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Huber regression in PySpark?

2017-08-20 Thread Jeff Gates
Hi guys,

Is there Huber regression in PySpark? We are using sklearn's HuberRegressor
(http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.HuberRegressor.html)
to train our model, but we hit a bottleneck on a single node.
If not, is there any obstacle to implementing it in PySpark?

Jeff