Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Sorry, I thought I had given an explanation.

The issue you are encountering with incorrect record numbers in the
"ShuffleWrite Size/Records" column in the Spark DAG UI when data is read
from cache/persist is a known limitation. This discrepancy arises due to
the way Spark handles and reports shuffle data when caching is involved.

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".



On Sun, 26 May 2024 at 21:16, Prem Sahoo  wrote:

> Can anyone please assist me ?
>
> On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:
>
>> Does anyone have a clue ?
>>
>> On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:
>>
>>> Hello Team,
>>> in spark DAG UI , we have Stages tab. Once you click on each stage you
>>> can view the tasks.
>>>
>>> In each task we have a column "ShuffleWrite Size/Records " that column
>>> prints wrong data when it gets the data from cache/persist . it
>>> typically will show the wrong record number though the data size is correct
>>> for e.g  3.2G/ 7400 which is wrong .
>>>
>>> please advise.
>>>
>>


Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Just to further clarify: the Shuffle Write Size/Records column in the
Spark UI can be misleading when working with cached/persisted data
because it reflects the shuffled data size and record count, not the
entire cached/persisted dataset. So it is fair to say that this is a
limitation of the UI's display, not necessarily a bug in the Spark
framework itself.

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Sun, 26 May 2024 at 16:45, Mich Talebzadeh  wrote:
>
> Yep, the Spark UI's Shuffle Write Size/Records" column can sometimes show 
> incorrect record counts when data is retrieved from cache or persisted data. 
> This happens because the record count reflects the number of records written 
> to disk for shuffling, and not the actual number of records in the cached or 
> persisted data itself. Add to it, because of lazy evaluation:, Spark may only 
> materialize a portion of the cached or persisted data when a task needs it. 
> The "Shuffle Write Size/Records" might only reflect the materialized portion, 
> not the total number of records in the cache/persistence. While the "Shuffle 
> Write Size/Records" might be inaccurate for cached/persisted data, the 
> "Shuffle Read Size/Records" column can be more reliable. This metric shows 
> the number of records read from shuffle by the following stage, which should 
> be closer to the actual number of records processed.
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my knowledge 
> but of course cannot be guaranteed . It is essential to note that, as with 
> any advice, quote "one test result is worth one-thousand expert opinions 
> (Werner Von Braun)".
>
>
>
> On Thu, 23 May 2024 at 17:45, Prem Sahoo  wrote:
>>
>> Hello Team,
>> in spark DAG UI , we have Stages tab. Once you click on each stage you can 
>> view the tasks.
>>
>> In each task we have a column "ShuffleWrite Size/Records " that column 
>> prints wrong data when it gets the data from cache/persist . it typically 
>> will show the wrong record number though the data size is correct for e.g  
>> 3.2G/ 7400 which is wrong .
>>
>> please advise.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Yep, the Spark UI's "Shuffle Write Size/Records" column can sometimes show
incorrect record counts *when data is retrieved from cache or persisted
data*. This happens because the record count reflects the number of records
written to disk for shuffling, not the actual number of records in the
cached or persisted data itself. In addition, because of lazy evaluation,
Spark may only materialize a portion of the cached or persisted data when a
task needs it, so "Shuffle Write Size/Records" might only reflect the
materialized portion, not the total number of records in the cache. While
"Shuffle Write Size/Records" can be inaccurate for cached/persisted data,
the "Shuffle Read Size/Records" column is more reliable: it shows the number
of records read from the shuffle by the following stage, which should be
closer to the actual number of records processed.
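
As a concrete illustration, here is a minimal sketch (my own example, assuming
a live SparkSession named spark): the cached DataFrame holds 1,000,000
records, but the "Shuffle Write Size/Records" shown for the groupBy stage in
the Stages tab counts only the shuffled aggregate records.

import org.apache.spark.storage.StorageLevel
import spark.implicits._

// Cache one million records and materialise the cache with count().
val df = spark.range(0, 1000000L).toDF("id").persist(StorageLevel.MEMORY_AND_DISK)
println(s"records in cache: ${df.count()}")              // 1,000,000

// The groupBy forces a shuffle; in the UI, the stage's "Shuffle Write
// Size/Records" reflects the shuffled records, not the cached input records.
df.groupBy(($"id" % 100).as("bucket")).count().show()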

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner Von Braun)".


On Thu, 23 May 2024 at 17:45, Prem Sahoo  wrote:

> Hello Team,
> in spark DAG UI , we have Stages tab. Once you click on each stage you can
> view the tasks.
>
> In each task we have a column "ShuffleWrite Size/Records " that column
> prints wrong data when it gets the data from cache/persist . it
> typically will show the wrong record number though the data size is correct
> for e.g  3.2G/ 7400 which is wrong .
>
> please advise.
>


Re: BUG :: UI Spark

2024-05-26 Thread Prem Sahoo
Can anyone please assist me?

On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:

> Does anyone have a clue ?
>
> On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:
>
>> Hello Team,
>> in spark DAG UI , we have Stages tab. Once you click on each stage you
>> can view the tasks.
>>
>> In each task we have a column "ShuffleWrite Size/Records " that column
>> prints wrong data when it gets the data from cache/persist . it
>> typically will show the wrong record number though the data size is correct
>> for e.g  3.2G/ 7400 which is wrong .
>>
>> please advise.
>>
>


Re: BUG :: UI Spark

2024-05-23 Thread Prem Sahoo
Does anyone have a clue?

On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:

> Hello Team,
> in spark DAG UI , we have Stages tab. Once you click on each stage you can
> view the tasks.
>
> In each task we have a column "ShuffleWrite Size/Records " that column
> prints wrong data when it gets the data from cache/persist . it
> typically will show the wrong record number though the data size is correct
> for e.g  3.2G/ 7400 which is wrong .
>
> please advise.
>


BUG :: UI Spark

2024-05-23 Thread Prem Sahoo
Hello Team,
In the Spark DAG UI we have the Stages tab. Once you click on a stage, you
can view its tasks.

Each task has a "Shuffle Write Size/Records" column, and that column prints
wrong data when the stage gets its data from cache/persist. It typically
shows the wrong record number even though the data size is correct, e.g.
3.2G / 7400, which is wrong.

Please advise.


Probable Bug in Spark 3.3.0

2023-08-20 Thread Dipayan Dev
Hi Dev Team,

https://issues.apache.org/jira/browse/SPARK-44884


We have recently upgraded to Spark 3.3.0 in our production Dataproc.
We have a lot of downstream applications that rely on the SUCCESS file.

Please let me know if this is a bug, or whether I need any additional
configuration to fix this in Spark 3.3.0.

Happy to contribute if you can suggest how.

Thanks
Dipayan


-- 



With Best Regards,

Dipayan Dev
Author of *Deep Learning with Hadoop
<https://www.amazon.com/Deep-Learning-Hadoop-Dipayan-Dev/dp/1787124762>*
M.Tech (AI), IISc, Bangalore


[Bug] dataframe.show() prints incorrect numbers

2023-01-04 Thread Belch, Yaakov (Agoda)
What is the right process for reporting this bug in Spark?

SUMMARY: For some small numbers, dataframe.show() chops off the exponent
field, showing wrong numbers instead.

HOW TO REPRODUCE THE BUG: This is a one-liner on the command line:
echo 'Seq((1.8181682328650097E-4)).toDF("x").show()' | docker run -i 
apache/spark /opt/spark/bin/spark-shell

WHAT IS THE OUTPUT:
scala> Seq((1.8181682328650097E-4)).toDF("x").show()
+--------------------+
|                   x|
+--------------------+
|1.818168232865009...|
+--------------------+

WHAT SHOULD BE THE CORRECT OUTPUT:
scala> Seq((1.8181682328650097E-4)).toDF("x").show()
+--------------------+
|                   x|
+--------------------+
|1.818168232865009E-4|
+--------------------+
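
As an aside, a minimal workaround sketch, assuming the default 20-character
cell truncation in show() is what chops off the exponent: passing
truncate = false prints the full value.

Seq(1.8181682328650097E-4).toDF("x").show(false)  // prints 1.8181682328650097E-4 untruncated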


Please advise how to report this bug.

Yaakov Belch








Re: Probable bug in async commit of Kafka offset in DirectKafkaInputDStream

2022-04-08 Thread Dongjoon Hyun
Thank you, Souvik.

Dongjoon.

On Thu, Apr 7, 2022 at 10:59 AM Paul, Souvik  wrote:

> Hi Dongjoon,
>
>
>
> Raised the JIRA at https://issues.apache.org/jira/browse/SPARK-38824
>
>
>
> Thanks,
>
> Souvik
>
>
>
> *From:* Dongjoon Hyun 
> *Sent:* Wednesday, March 30, 2022 4:44 AM
> *To:* Paul, Souvik [Engineering] 
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Probable bug in async commit of Kafka offset in
> DirectKafkaInputDStream
>
>
>
> Hi, Souvik
>
>
>
> Could you file a JIRA issue for that?
>
>
>
> Thanks,
>
> Dongjoon
>
>
>
> On Thu, Mar 24, 2022 at 11:08 AM Paul, Souvik  wrote:
>
> Hi Dev,
>
> I added a few debug statements at the following lines and found few issues.
>
> 1. At line 254 of override def compute(validTime: Time):
> Option[KafkaRDD[K, V]] in DirectKafkaInputDStream.scala:
>
> System.out.print("Called commitAll at time " + validTime + " " +
> commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
>
> 2. At line 454 of test("offset recovery from kafka") in
> DirectKafkaStreamSuite.scala:
>
> print("Called commitAsync at " + time +  " " +
> offsets.mkString("Array(", ", ", ")") + "\n")
>
>
> This shows that the commitAll call is not properly handled. Since, it is
> called inside compute function. There is a chance that during last RDD, we
> will miss the last offset. In the current example we have missed the offset
> commit of range 8->10.
>
> Can someone confirm if this is a design choice or a bug?
>
> The current log is something like this.
>
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, 

RE: Probable bug in async commit of Kafka offset in DirectKafkaInputDStream

2022-04-07 Thread Paul, Souvik
Hi Dongjoon,

Raised the JIRA at https://issues.apache.org/jira/browse/SPARK-38824

Thanks,
Souvik

From: Dongjoon Hyun 
Sent: Wednesday, March 30, 2022 4:44 AM
To: Paul, Souvik [Engineering] 
Cc: dev@spark.apache.org
Subject: Re: Probable bug in async commit of Kafka offset in 
DirectKafkaInputDStream

Hi, Souvik

Could you file a JIRA issue for that?

Thanks,
Dongjoon

On Thu, Mar 24, 2022 at 11:08 AM Paul, Souvik 
mailto:souvik.p...@gs.com>> wrote:
Hi Dev,

I added a few debug statements at the following lines and found few issues.

1. At line 254 of override def compute(validTime: Time): Option[KafkaRDD[K, V]] 
in DirectKafkaInputDStream.scala:

System.out.print("Called commitAll at time " + validTime + " " +
commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")

2. At line 454 of test("offset recovery from kafka") in 
DirectKafkaStreamSuite.scala:

print("Called commitAsync at " + time +  " " + 
offsets.mkString("Array(", ", ", ")") + "\n")


This shows that the commitAll call is not properly handled. Since, it is called 
inside compute function. There is a chance that during last RDD, we will miss 
the last offset. In the current example we have missed the offset commit of 
range 8->10.

Can someone confirm if this is a design choice or a bug?

The current log is something like this.

Called commitAll at time 1645548063100 ms Array()
Called commitAll at time 1645548063200 ms Array()
Called commitAll at time 1645548063300 ms Array()
Called commitAll at time 1645548063400 ms Array()
Called commitAll at time 1645548063500 ms Array()
Called commitAll at time 1645548063600 ms Array()
Called commitAll at time 1645548063700 ms Array()
Called commitAll at time 1645548063800 ms Array()
Called commitAll at time 1645548063900 ms Array()
Called commitAll at time 1645548064000 ms Array()
Called commitAll at time 1645548064100 ms Array()
Called commitAll at time 1645548064200 ms Array()
Called commitAsync at 1645548063100 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [0 -> 4]))
Called commitAsync at 1645548063200 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064300 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063300 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063400 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063500 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064400 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063600 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063700 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063800 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063900 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064500 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064000 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064100 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064200 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064300 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064600 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, ran

Re: Probable bug in async commit of Kafka offset in DirectKafkaInputDStream

2022-03-29 Thread Dongjoon Hyun
Hi, Souvik

Could you file a JIRA issue for that?

Thanks,
Dongjoon

On Thu, Mar 24, 2022 at 11:08 AM Paul, Souvik  wrote:

> Hi Dev,
>
> I added a few debug statements at the following lines and found few issues.
>
> 1. At line 254 of override def compute(validTime: Time):
> Option[KafkaRDD[K, V]] in DirectKafkaInputDStream.scala:
>
> System.out.print("Called commitAll at time " + validTime + " " +
> commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
>
> 2. At line 454 of test("offset recovery from kafka") in
> DirectKafkaStreamSuite.scala:
>
> print("Called commitAsync at " + time +  " " +
> offsets.mkString("Array(", ", ", ")") + "\n")
>
>
> This shows that the commitAll call is not properly handled. Since, it is
> called inside compute function. There is a chance that during last RDD, we
> will miss the last offset. In the current example we have missed the offset
> commit of range 8->10.
>
> Can someone confirm if this is a design choice or a bug?
>
> The current log is something like this.
>
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064200 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064600 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromka

Probable bug in async commit of Kafka offset in DirectKafkaInputDStream

2022-03-24 Thread Paul, Souvik
Hi Dev,

I added a few debug statements at the following lines and found a few issues.

1. At line 254 of override def compute(validTime: Time): Option[KafkaRDD[K, V]] 
in DirectKafkaInputDStream.scala:

System.out.print("Called commitAll at time " + validTime + " " +
commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")

2. At line 454 of test("offset recovery from kafka") in 
DirectKafkaStreamSuite.scala:

print("Called commitAsync at " + time +  " " + 
offsets.mkString("Array(", ", ", ")") + "\n")


This shows that the commitAll call is not properly handled: since it is called
inside the compute function, there is a chance that for the last RDD we will
miss the last offset. In the current example we have missed the offset commit
for the range 8 -> 10.
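
To make the mechanism explicit, here is a simplified model of the pattern
described above (my own sketch, not the actual DirectKafkaInputDStream
source): commitAsync only enqueues offset ranges, and the queue is drained
by commitAll, which runs while computing the next batch, so offsets queued
after the final batch's compute() are never committed.

import java.util.concurrent.ConcurrentLinkedQueue

case class OffsetRangeModel(topic: String, partition: Int, from: Long, until: Long)

class DirectStreamModel {
  private val commitQueue = new ConcurrentLinkedQueue[OffsetRangeModel]

  // Callers hand over offsets here; nothing is committed to Kafka yet.
  def commitAsync(offsets: Seq[OffsetRangeModel]): Unit =
    offsets.foreach(commitQueue.add)

  // Invoked while generating the next batch's RDD: drains whatever is queued.
  def compute(batchTimeMs: Long): Unit = {
    val drained = Iterator.continually(commitQueue.poll()).takeWhile(_ != null).toList
    println(s"commitAll at $batchTimeMs ms commits: $drained")
    // ... build the KafkaRDD for this batch ...
  }
}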

Can someone confirm if this is a design choice or a bug?

The current log is something like this.

Called commitAll at time 1645548063100 ms Array()
Called commitAll at time 1645548063200 ms Array()
Called commitAll at time 1645548063300 ms Array()
Called commitAll at time 1645548063400 ms Array()
Called commitAll at time 1645548063500 ms Array()
Called commitAll at time 1645548063600 ms Array()
Called commitAll at time 1645548063700 ms Array()
Called commitAll at time 1645548063800 ms Array()
Called commitAll at time 1645548063900 ms Array()
Called commitAll at time 1645548064000 ms Array()
Called commitAll at time 1645548064100 ms Array()
Called commitAll at time 1645548064200 ms Array()
Called commitAsync at 1645548063100 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [0 -> 4]))
Called commitAsync at 1645548063200 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064300 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063300 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063400 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063500 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064400 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063600 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063700 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063800 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548063900 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064500 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064000 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064100 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064200 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064300 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAll at time 1645548064600 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 4]))
Called commitAsync at 1645548064400 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [4 -> 8]))
Called commitAsync at 1645548064500 ms Array(OffsetRange(topic: 
'recoveryfromkafka', partition: 0, range: [8 -> 8]))
Called commitAsync at 1645548064600 ms Array(OffsetRange(topic: 
'recoveryfromkafka', p

Re: Help needed to locate the csv parser (for Spark bug reporting/fixing)

2022-02-10 Thread Marnix van den Broek
Thanks, Sean!

It was actually on the Catalyst side of things, but I found where column
pruning pushdown is delegated to univocity, see [1].

I've tried setting the spark configuration
*spark.sql.csv.parser.columnPruning.enabled* to *False* and this prevents
the bug from happening. I am unfamiliar with Java / Scala so I might be
misreading things, but to me everything points to a bug in univocity,
specifically in how the *selectIndexes* parser setting impacts the parsing
of the example in the bug report.

This means that to fix this bug, univocity must be fixed and Spark then
needs to refer to a fixed version, correct? Unless someone thinks this
analysis is off, I'll add this info to the Spark issue and file a bug
report with univocity.

1.
https://github.com/apache/spark/blob/6a59fba248359fb2614837fe8781dc63ac8fdc4c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala#L79
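
For reference, a minimal sketch of the setting mentioned above (assuming a
live SparkSession named spark): disabling CSV column pruning sidesteps the
univocity selectIndexes path, at some performance cost.

spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")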

On Thu, Feb 10, 2022 at 5:39 PM Sean Owen  wrote:

> It starts in org.apache.spark.sql.execution.datasources.csv.CSVDataSource.
> Yes univocity is used for much of the parsing.
> I am not sure of the cause of the bug but it does look like one indeed. In
> one case the parser is asked to read all fields, in the other, to skip one.
> The pushdown helps efficiency but something is going wrong.
>
> On Thu, Feb 10, 2022 at 10:34 AM Marnix van den Broek <
> marnix.van.den.br...@bundlesandbatches.io> wrote:
>
>> hi all,
>>
>> Yesterday I filed a CSV parsing bug [1] for Spark, that leads to data
>> incorrectness when data contains sequences similar to the one in the
>> report.
>>
>> I wanted to take a look at the parsing logic to see if I could spot the
>> error to update the issue with more information and to possibly contribute
>> a PR with a bug fix, but I got completely lost navigating my way down the
>> dependencies in the Spark repository. Can someone point me in the right
>> direction?
>>
>> I am looking for the csv parser itself, which is likely a dependency?
>>
>> The next question might need too much knowledge about Spark internals to
>> know where to look or understand what I'd be looking at, but I am also
>> looking to see if and why the implementation of the CSV parsing is
>> different when columns are projected as opposed to the processing of the
>> full dataframe/ The issue only occurs when projecting columns and this
>> inconsistency is a worry in itself.
>>
>> Many thanks,
>>
>> Marnix
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-38167
>>
>>


Re: Help needed to locate the csv parser (for Spark bug reporting/fixing)

2022-02-10 Thread Sean Owen
It starts in org.apache.spark.sql.execution.datasources.csv.CSVDataSource.
Yes univocity is used for much of the parsing.
I am not sure of the cause of the bug but it does look like one indeed. In
one case the parser is asked to read all fields, in the other, to skip one.
The pushdown helps efficiency but something is going wrong.

On Thu, Feb 10, 2022 at 10:34 AM Marnix van den Broek <
marnix.van.den.br...@bundlesandbatches.io> wrote:

> hi all,
>
> Yesterday I filed a CSV parsing bug [1] for Spark, that leads to data
> incorrectness when data contains sequences similar to the one in the
> report.
>
> I wanted to take a look at the parsing logic to see if I could spot the
> error to update the issue with more information and to possibly contribute
> a PR with a bug fix, but I got completely lost navigating my way down the
> dependencies in the Spark repository. Can someone point me in the right
> direction?
>
> I am looking for the csv parser itself, which is likely a dependency?
>
> The next question might need too much knowledge about Spark internals to
> know where to look or understand what I'd be looking at, but I am also
> looking to see if and why the implementation of the CSV parsing is
> different when columns are projected as opposed to the processing of the
> full dataframe/ The issue only occurs when projecting columns and this
> inconsistency is a worry in itself.
>
> Many thanks,
>
> Marnix
>
> 1. https://issues.apache.org/jira/browse/SPARK-38167
>
>


Help needed to locate the csv parser (for Spark bug reporting/fixing)

2022-02-10 Thread Marnix van den Broek
hi all,

Yesterday I filed a CSV parsing bug [1] for Spark that leads to data
incorrectness when the data contains sequences similar to the one in the
report.

I wanted to take a look at the parsing logic to see if I could spot the
error to update the issue with more information and to possibly contribute
a PR with a bug fix, but I got completely lost navigating my way down the
dependencies in the Spark repository. Can someone point me in the right
direction?

I am looking for the csv parser itself, which is likely a dependency?

The next question might need too much knowledge about Spark internals to
know where to look or understand what I'd be looking at, but I am also
looking to see if, and why, the implementation of the CSV parsing is
different when columns are projected as opposed to processing the full
dataframe. The issue only occurs when projecting columns, and this
inconsistency is a worry in itself.

Many thanks,

Marnix

1. https://issues.apache.org/jira/browse/SPARK-38167


Seems like a bug in Spark dynamic allocation, numberMaxNeededExecutors value is negative in JMX exporter, which is unreasonable

2022-01-13 Thread 徐涛
Hi Experts,
This seems like a bug in ExecutorAllocationManager: the numberMaxNeededExecutors
value reported by the JMX exporter is negative, which is unreasonable.
The Spark version is 3.1.1.



The following is the debug log; numRunningOrPendingTasks gets a negative
value:


22/01/13 09:32:21 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -136, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -171, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -188, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -196, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -209, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -221, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -229, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -234, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -238, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -240, tasksperexecutor: 4

22/01/13 09:32:22 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -246, tasksperexecutor: 4

22/01/13 09:32:23 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -250, tasksperexecutor: 4

22/01/13 09:32:23 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -250, tasksperexecutor: 4

22/01/13 09:32:23 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -250, tasksperexecutor: 4

22/01/13 09:32:23 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -251, tasksperexecutor: 4

22/01/13 09:32:23 DEBUG ExecutorAllocationManager: max needed for rpId: 0 
numpending: -211, tasksperexecutor: 4




The return value of the maxNumExecutorsNeededPerResourceProfile method becomes
incorrect after the Thrift server has been running for some hours.
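
For illustration, a hypothetical sketch of the arithmetic implied by the debug
lines above (not the actual ExecutorAllocationManager code): once the
running-or-pending task counter drifts below zero, the derived executor target
is negative as well.

def maxNeededExecutors(numRunningOrPendingTasks: Int, tasksPerExecutor: Int): Int =
  math.ceil(numRunningOrPendingTasks.toDouble / tasksPerExecutor).toInt

println(maxNeededExecutors(-136, 4))   // -34, an impossible executor count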




Best 

xu

Re: Bug?

2021-02-18 Thread Tyson
I am not sure if the problem persists in 3.x.

On Thu, Feb 18, 2021 at 12:14 PM Dongjoon Hyun 
wrote:

> Thank you for sharing, Tyson.
>
> Spark 2.4.4 looks too old to me. Do you think it will occur at 3.x?
>
> Bests,
> Dongjoon.
>
>
> On Thu, Feb 18, 2021 at 11:07 AM Tyson  wrote:
>
>> We observed an interesting stack trace that I'd like to share with you.
>> The logging level is WARN, but it appears to be causing task failures.
>> Please let me know if anyone has any insights. It appears to be a integer
>> overflow issue from looking at the code in Spark 2.4.4
>>
>> WARN TaskSetManager [task-result-getter-0]: Lost task 3175.0 in stage
>> 518.0 (TID 186951, executor 150): java.lang.NegativeArraySizeException
>>
>>  at 
>> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:438)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getDecimal(UnsafeRow.java:414)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.JoinedRow.getDecimal(JoinedRow.scala:95)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_4$(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:218)
>>  at 
>> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:216)
>>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>  at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>>  at 
>> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>  at 
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>


Re: Bug?

2021-02-18 Thread Dongjoon Hyun
Thank you for sharing, Tyson.

Spark 2.4.4 looks too old to me. Do you think it will occur at 3.x?

Bests,
Dongjoon.


On Thu, Feb 18, 2021 at 11:07 AM Tyson  wrote:

> We observed an interesting stack trace that I'd like to share with you.
> The logging level is WARN, but it appears to be causing task failures.
> Please let me know if anyone has any insights. It appears to be a integer
> overflow issue from looking at the code in Spark 2.4.4
>
> WARN TaskSetManager [task-result-getter-0]: Lost task 3175.0 in stage
> 518.0 (TID 186951, executor 150): java.lang.NegativeArraySizeException
>
>   at 
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:438)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getDecimal(UnsafeRow.java:414)
>   at 
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getDecimal(JoinedRow.scala:95)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_4$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:218)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:216)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>


Bug?

2021-02-18 Thread Tyson
We observed an interesting stack trace that I'd like to share with you. The
logging level is WARN, but it appears to be causing task failures. Please
let me know if anyone has any insights. It appears to be an integer overflow
issue, from looking at the code in Spark 2.4.4.

WARN TaskSetManager [task-result-getter-0]: Lost task 3175.0 in stage 518.0
(TID 186951, executor 150): java.lang.NegativeArraySizeException

at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:438)
at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.getDecimal(UnsafeRow.java:414)
at 
org.apache.spark.sql.catalyst.expressions.JoinedRow.getDecimal(JoinedRow.scala:95)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_4$(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at 
org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:218)
at 
org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:216)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown
Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
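
For what it's worth, a minimal illustration of the suspected mechanism (my
assumption, not the actual UnsafeRow code): a 32-bit size computation that
exceeds Int.MaxValue wraps to a negative Int, and allocating an array with
that size throws java.lang.NegativeArraySizeException.

val wrappedSize: Int = (Int.MaxValue.toLong + 1L).toInt   // wraps to -2147483648
val bytes = new Array[Byte](wrappedSize)                  // throws NegativeArraySizeException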


Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
Thanks for the tip!

But if the first thing you come across is somebody using the trim function
to strip away spaces in /etc/hostnames, like so, from:

127.0.0.1 hostname local

to

127.0.0.1hostnamelocal

then there is a log error message showing the outcome of unnecessarily
using the trim function.

Especially when one of Spark's core functions is to read lines from files
separated by a space or a comma.

Also, have you seen the log4j.properties setting turned to ERROR, and in one
case FATAL, to suppress discrepancies?

May I draw your attention, and the attention of all in the community, to
this page, which recommends turning on compiler WARNINGS before releasing
software, among other software best practices:

"The Power of 10 — NASA's Rules for Coding" by Riccardo Giorato
https://link.medium.com/PUz88PIql3

What impression would you have?



On Sat, 28 Mar 2020, 15:50 Jeff Evans, 
wrote:

> Dude, you really need to chill. Have you ever worked with a large open
> source project before? It seems not. Even so, insinuating there are tons of
> bugs that were left uncovered until you came along (despite the fact that
> the project is used by millions across many different organizations) is
> ludicrous. Learn a little bit of humility
>
> If you're new to something, assume you have made a mistake rather than
> that there is a bug. Lurk a bit more, or even do a simple Google search,
> and you will realize Sean is a very senior committer (i.e. expert) in
> Spark, and has been for many years. He, and everyone else participating in
> these lists, is doing it voluntarily on their own time. They're not being
> paid to handhold you and quickly answer to your every whim.
>
> On Sat, Mar 28, 2020, 10:46 AM Zahid Rahman  wrote:
>
>> So the schema is limited to holding only the DEFINITION of schema. For
>> example as you say  the columns, I.e. first column User:Int 2nd column
>> String:password.
>>
>> Not location of source I.e. csv file with or without header.  SQL DB
>> tables.
>>
>> I am pleased for once I am wrong about being another bug, and it was a
>> design decision adding flexibility.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
>> wrote:
>>
>>> This is probably more of a question for the user support list, but I
>>> believe I understand the issue.
>>>
>>> Schema inside of spark refers to the structure of the output rows, for
>>> example the schema for a particular dataframe could be
>>> (User: Int, Password: String) - Two Columns the first is User of type
>>> int and the second is Password of Type String.
>>>
>>> When you pass the schema from one reader to another, you are only
>>> copyting this structure, not all of the other options associated with the
>>> dataframe.
>>> This is usually useful when you are reading from sources with different
>>> options but data that needs to be read into the same structure.
>>>
>>> The other properties such as "format" and "options" exist independently
>>> of Schema. This is helpful if I was reading from both MySQL and
>>> a comma separated file for example. While the Schema is the same, the
>>> options like ("inferSchema") do not apply to both MySql and CSV and
>>> format actually picks whether to us "JDBC" or "CSV" so copying that
>>> wouldn't be helpful either.
>>>
>>> I hope this clears things up,
>>> Russ
>>>
>>> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman 
>>> wrote:
>>>
>>>> Hi,
>>>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>>>
>>>> As you can see from the code :
>>>>
>>>> STEP 1:  I  create a object of type static frame which holds all the
>>>> information to the datasource (csv files).
>>>>
>>>> STEP 2: Then I create a variable  called staticSchema  assigning the
>>>> information of the schema from the original static data frame.
>>>>
>>>> STEP 3: then I create another variable called val streamingDataFrame of
>>>> type spark.readStream.
>>>> and Into the .schema function parameters I pass the object staticSchema
>>>> which is meant to hold the information to the  csv files including the
>>>> .load(path) function etc.
>>>>
>>>> So then when I am creating val StreamingDataFrame and passing it
>>>> .schema(staticSchema)
>>>> the variable StreamingDataFrame  should have all the information.
>>>> I should only have

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
So the schema is limited to holding only the DEFINITION of the schema: for
example, as you say, the columns, i.e. first column User: Int, second column
Password: String.

Not the location of the source, i.e. a csv file with or without a header, or
SQL DB tables.

I am pleased that for once I am wrong about this being another bug, and that
it was a design decision adding flexibility.









On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
wrote:

> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of spark refers to the structure of the output rows, for
> example the schema for a particular dataframe could be
> (User: Int, Password: String) - Two Columns the first is User of type int
> and the second is Password of Type String.
>
> When you pass the schema from one reader to another, you are only
> copyting this structure, not all of the other options associated with the
> dataframe.
> This is usually useful when you are reading from sources with different
> options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> Schema. This is helpful if I was reading from both MySQL and
> a comma separated file for example. While the Schema is the same, the
> options like ("inferSchema") do not apply to both MySql and CSV and
> format actually picks whether to us "JDBC" or "CSV" so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code :
>>
>> STEP 1:  I  create a object of type static frame which holds all the
>> information to the datasource (csv files).
>>
>> STEP 2: Then I create a variable  called staticSchema  assigning the
>> information of the schema from the original static data frame.
>>
>> STEP 3: then I create another variable called val streamingDataFrame of
>> type spark.readStream.
>> and Into the .schema function parameters I pass the object staticSchema
>> which is meant to hold the information to the  csv files including the
>> .load(path) function etc.
>>
>> So then when I am creating val StreamingDataFrame and passing it
>> .schema(staticSchema)
>> the variable StreamingDataFrame  should have all the information.
>> I should only have to call .option("maxFilePerTrigger",1) and not .format
>> ("csv") .option("header","true").load("/data/retail-data/by-day/*.csv")
>> Otherwise what is the point of passing .schema(staticSchema) to
>> StreamingDataFrame.
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{window,column,desc,col}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // create spark session
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
>> Data").getOrCreate();
>> // set spark runtime  configuration
>> spark.conf.set("spark.sql.shuffle.partitions","5")
>> 
>> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>>
>> // create a static frame
>>   val staticDataFrame = spark.read.format("csv")
>> .option ("header","true")
>> .option("inferschema","true")
>> .load("/data/retail-data/by-day/*.csv")
>>
>>
>> staticDataFrame.createOrReplaceTempView("retail_data")
>> val staticSchema = staticDataFrame.schema
>>
>> staticDataFrame
>>   .selectExpr(
>> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>>   .groupBy(col("CustomerId"),
>> window(col("InvoiceDate"),
>> "1 day"))
>>   .sum("total_cost")
>>   .sort(desc("sum(total_cost)"))
>>   .show(2)
>>
>> val streamingDataFrame = spark.readStream
>>   .schema(staticSchema)
>>   .format("csv")
>>   .option("maxFilesPerTrigger", 1)
>>   .option("header","true")
>>   .load("/data/retail-data/by-day/*.csv")
>>
>>   println(streamingDataFrame.isStreaming)
>>
>> // lazy operation so we will need to call a streaming action to start 
>> the action
>> val purchaseByCustomerPerHour = streamingDataFrame
>> .selectExpr(
>>   "CustomerId",
>>   "(UnitPrice * Quantity) as total_cost",
>>   "InvoiceDate")
>> .groupBy(
>>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> .sum("total_cost")
>>
>> // stream action to write to console
>> purchaseByCustomerPerHour.writeStream
>>   .format("console")
>>   .queryName("customer_purchases")
>>   .outputMode("complete")
>>   .start()
>>
>>   } // main
>>
>> } // object
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> val staticSchema = staticDataFrame.schema
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> <http://www.backbutton.co.uk>
>>
>


Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
Very kind of you.

On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
wrote:

> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of spark refers to the structure of the output rows, for
> example the schema for a particular dataframe could be
> (User: Int, Password: String) - Two Columns the first is User of type int
> and the second is Password of Type String.
>
> When you pass the schema from one reader to another, you are only
> copyting this structure, not all of the other options associated with the
> dataframe.
> This is usually useful when you are reading from sources with different
> options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> Schema. This is helpful if I was reading from both MySQL and
> a comma separated file for example. While the Schema is the same, the
> options like ("inferSchema") do not apply to both MySql and CSV and
> format actually picks whether to us "JDBC" or "CSV" so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code :
>>
>> STEP 1:  I  create a object of type static frame which holds all the
>> information to the datasource (csv files).
>>
>> STEP 2: Then I create a variable  called staticSchema  assigning the
>> information of the schema from the original static data frame.
>>
>> STEP 3: then I create another variable called val streamingDataFrame of
>> type spark.readStream.
>> and Into the .schema function parameters I pass the object staticSchema
>> which is meant to hold the information to the  csv files including the
>> .load(path) function etc.
>>
>> So then when I am creating val StreamingDataFrame and passing it
>> .schema(staticSchema)
>> the variable StreamingDataFrame  should have all the information.
>> I should only have to call .option("maxFilePerTrigger",1) and not .format
>> ("csv") .option("header","true").load("/data/retail-data/by-day/*.csv")
>> Otherwise what is the point of passing .schema(staticSchema) to
>> StreamingDataFrame.
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{window,column,desc,col}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // create spark session
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
>> Data").getOrCreate();
>> // set spark runtime  configuration
>> spark.conf.set("spark.sql.shuffle.partitions","5")
>> 
>> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>>
>> // create a static frame
>>   val staticDataFrame = spark.read.format("csv")
>> .option ("header","true")
>> .option("inferschema","true")
>> .load("/data/retail-data/by-day/*.csv")
>>
>>
>> staticDataFrame.createOrReplaceTempView("retail_data")
>> val staticSchema = staticDataFrame.schema
>>
>> staticDataFrame
>>   .selectExpr(
>> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>>   .groupBy(col("CustomerId"),
>> window(col("InvoiceDate"),
>> "1 day"))
>>   .sum("total_cost")
>>   .sort(desc("sum(total_cost)"))
>>   .show(2)
>>
>> val streamingDataFrame = spark.readStream
>>   .schema(staticSchema)
>>   .format("csv")
>>   .option("maxFilesPerTrigger", 1)
>>   .option("header","true")
>>   .load("/data/retail-data/by-day/*.csv")
>>
>>   println(streamingDataFrame.isStreaming)
>>
>> // lazy operation so we will need to call a streaming action to start 
>> the action
>> val purchaseByCustomerPerHour = streamingDataFrame
>> .selectExpr(
>>   "CustomerId",
>>   "(UnitPrice * Quantity) as total_cost",
>>   "InvoiceDate")
>> .groupBy(
>>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> .sum("total_cost")
>>
>> // stream action to write to console
>> purchaseByCustomerPerHour.writeStream
>>   .format("console")
>>   .queryName("customer_purchases")
>>   .outputMode("complete")
>>   .start()
>>
>>   } // main
>>
>> } // object
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>


Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Russell Spitzer
This is probably more of a question for the user support list, but I
believe I understand the issue.

Schema inside of spark refers to the structure of the output rows, for
example the schema for a particular dataframe could be
(User: Int, Password: String) - two columns: the first is User of type Int
and the second is Password of type String.

When you pass the schema from one reader to another, you are only
copying this structure, not all of the other options associated with the
dataframe.
This is usually useful when you are reading from sources with different
options but data that needs to be read into the same structure.

The other properties such as "format" and "options" exist independently of
Schema. This is helpful if I was reading from both MySQL and
a comma separated file for example. While the Schema is the same, the
options like ("inferSchema") do not apply to both MySql and CSV and
format actually picks whether to use "JDBC" or "CSV" so copying that
wouldn't be helpful either.

I hope this clears things up,
Russ
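
To make the distinction concrete, here is a minimal, self-contained sketch (an illustration added here, not code from the thread; the paths and the User/Password columns are made up) of one StructType being reused by two readers that still keep their own format, options and load path:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaReuseSketch {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("schema reuse").getOrCreate()

    // One output structure, declared once.
    val schema = StructType(Seq(
      StructField("User", IntegerType),
      StructField("Password", StringType)))

    // A CSV reader: needs its own format, options and path.
    val fromCsv = spark.read
      .schema(schema)
      .format("csv")
      .option("header", "true")
      .load("/tmp/users.csv")

    // A JSON reader: same structure, entirely different format/options/path.
    val fromJson = spark.read
      .schema(schema)
      .format("json")
      .load("/tmp/users.json")

    // The only thing the two readers share is the shape of their output rows.
    fromCsv.union(fromJson).show()
  }
}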

On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:

> Hi,
> version: spark-3.0.0-preview2-bin-hadoop2.7
>
> As you can see from the code :
>
> STEP 1: I create an object of type static data frame which holds all the
> information about the data source (CSV files).
>
> STEP 2: Then I create a variable called staticSchema, assigning it the
> schema of the original static data frame.
>
> STEP 3: Then I create another variable called streamingDataFrame via
> spark.readStream, and into the .schema function I pass the object
> staticSchema, which is meant to hold the information about the CSV files,
> including the .load(path) function etc.
>
> So when I create val streamingDataFrame and pass it .schema(staticSchema),
> the variable streamingDataFrame should have all the information.
> I should only have to call .option("maxFilesPerTrigger",1) and not .format
> ("csv") .option("header","true").load("/data/retail-data/by-day/*.csv").
> Otherwise, what is the point of passing .schema(staticSchema) to
> streamingDataFrame?
>
> You can replicate it using the complete code below.
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{window,column,desc,col}
>
> object RetailData {
>
>   def main(args: Array[String]): Unit = {
>
> // create spark session
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
> Data").getOrCreate();
> // set spark runtime  configuration
> spark.conf.set("spark.sql.shuffle.partitions","5")
> 
> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>
> // create a static frame
>   val staticDataFrame = spark.read.format("csv")
> .option ("header","true")
> .option("inferschema","true")
> .load("/data/retail-data/by-day/*.csv")
>
>
> staticDataFrame.createOrReplaceTempView("retail_data")
> val staticSchema = staticDataFrame.schema
>
> staticDataFrame
>   .selectExpr(
> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>   .groupBy(col("CustomerId"),
> window(col("InvoiceDate"),
> "1 day"))
>   .sum("total_cost")
>   .sort(desc("sum(total_cost)"))
>   .show(2)
>
> val streamingDataFrame = spark.readStream
>   .schema(staticSchema)
>   .format("csv")
>   .option("maxFilesPerTrigger", 1)
>   .option("header","true")
>   .load("/data/retail-data/by-day/*.csv")
>
>   println(streamingDataFrame.isStreaming)
>
> // lazy operation so we will need to call a streaming action to start the 
> action
> val purchaseByCustomerPerHour = streamingDataFrame
> .selectExpr(
>   "CustomerId",
>   "(UnitPrice * Quantity) as total_cost",
>   "InvoiceDate")
> .groupBy(
>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
> .sum("total_cost")
>
> // stream action to write to console
> purchaseByCustomerPerHour.writeStream
>   .format("console")
>   .queryName("customer_purchases")
>   .outputMode("complete")
>   .start()
>
>   } // main
>
> } // object
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>


BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-27 Thread Zahid Rahman
Hi,
version: spark-3.0.0-preview2-bin-hadoop2.7

As you can see from the code :

STEP 1: I create an object of type static data frame which holds all the
information about the data source (CSV files).

STEP 2: Then I create a variable called staticSchema, assigning it the
schema of the original static data frame.

STEP 3: Then I create another variable called streamingDataFrame via
spark.readStream, and into the .schema function I pass the object
staticSchema, which is meant to hold the information about the CSV files,
including the .load(path) function etc.

So when I create val streamingDataFrame and pass it .schema(staticSchema),
the variable streamingDataFrame should have all the information.
I should only have to call .option("maxFilesPerTrigger",1) and not .format
("csv") .option("header","true").load("/data/retail-data/by-day/*.csv").
Otherwise, what is the point of passing .schema(staticSchema) to
streamingDataFrame?

You can replicate it using the complete code below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

// create spark session
val spark =
SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail
Data").getOrCreate();
// set spark runtime  configuration
spark.conf.set("spark.sql.shuffle.partitions","5")

spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")

// create a static frame
  val staticDataFrame = spark.read.format("csv")
.option ("header","true")
.option("inferschema","true")
.load("/data/retail-data/by-day/*.csv")


staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema

staticDataFrame
  .selectExpr(
"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"),
window(col("InvoiceDate"),
"1 day"))
  .sum("total_cost")
  .sort(desc("sum(total_cost)"))
  .show(2)

val streamingDataFrame = spark.readStream
  .schema(staticSchema)
  .format("csv")
  .option("maxFilesPerTrigger", 1)
  .option("header","true")
  .load("/data/retail-data/by-day/*.csv")

  println(streamingDataFrame.isStreaming)

// lazy operation so we will need to call a streaming action to
start the action
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
  "CustomerId",
  "(UnitPrice * Quantity) as total_cost",
  "InvoiceDate")
.groupBy(
  col("CustomerId"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")

// stream action to write to console
purchaseByCustomerPerHour.writeStream
  .format("console")
  .queryName("customer_purchases")
  .outputMode("complete")
  .start()

  } // main

} // object

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-slave.sh spark://192.168.0.38:7077
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-master.sh

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 06:12, Zahid Rahman  wrote:

> sbin/start-master.sh
> sbin/start-slave.sh spark://192.168.0.38:7077
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:
>
>> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
>> just include Spark dependency in IntelliJ?
>>
>> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman 
>> wrote:
>>
>>> I have configured  in IntelliJ as external jars
>>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>>
>>> not pulling anything from maven.
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>>
>>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>>
 Which Spark/Scala version do you use?

 On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
 wrote:

>
> with the following sparksession configuration
>
> val spark = SparkSession.builder().master("local[*]").appName("Spark 
> Session take").getOrCreate();
>
> this line works
>
> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>
> however, if I change the master URL like so, to use the IP address, then the
> following error is produced at the position of .take(5)
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
>
> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
> instance of java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in 
> instance
> of org.apache.spark.rdd.MapPartitionsRDD
>
> BUT if I  remove take(5) or change the position of take(5) or insert
> an extra take(5) as illustrated in code then it works. I don't see why the
> position of take(5) should cause such an error or be caused by changing 
> the
> master url
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
> + 5))
>flights.show(5)
>
>
> complete code if you wish to replicate it.
>
> import org.apache.spark.sql.SparkSession
>
> object sessiontest {
>
>   // define specific  data type class then manipulate it using the filter 
> and map functions
>   // this is also known as an Encoder
>   case class flight (DEST_COUNTRY_NAME: String,
>  ORIGIN_COUNTRY_NAME:String,
>  count: BigInt)
>
>
>   def main(args:Array[String]): Unit ={
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
> import spark.implicits._
> val flightDf = 
> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> val flights = flightDf.as[flight]
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME 
> != "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>flights.show(5)
>
>   } // main
> }
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>



Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:

> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
> just include Spark dependency in IntelliJ?
>
> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:
>
>> I have configured  in IntelliJ as external jars
>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>
>> not pulling anything from maven.
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>
>>> Which Spark/Scala version do you use?
>>>
>>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>>> wrote:
>>>

 with the following sparksession configuration

 val spark = SparkSession.builder().master("local[*]").appName("Spark 
 Session take").getOrCreate();

 this line works

 flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)


 however, if I change the master URL like so, to use the IP address, then the
 following error is produced at the position of .take(5)

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();


 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
 instance of java.lang.invoke.SerializedLambda to field
 org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
 of org.apache.spark.rdd.MapPartitionsRDD

 BUT if I  remove take(5) or change the position of take(5) or insert an
 extra take(5) as illustrated in code then it works. I don't see why the
 position of take(5) should cause such an error or be caused by changing the
 master url

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
 + 5))
flights.show(5)


 complete code if you wish to replicate it.

 import org.apache.spark.sql.SparkSession

 object sessiontest {

   // define specific  data type class then manipulate it using the filter 
 and map functions
   // this is also known as an Encoder
   case class flight (DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME:String,
  count: BigInt)


   def main(args:Array[String]): Unit ={

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();

 import spark.implicits._
 val flightDf = 
 spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
 val flights = flightDf.as[flight]

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
 fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
flights.show(5)

   } // main
 }





 Backbutton.co.uk
 ¯\_(ツ)_/¯
 ♡۶Java♡۶RMI ♡۶
 Make Use Method {MUM}
 makeuse.org
 

>>>


Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Wenchen Fan
Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
just include Spark dependency in IntelliJ?

On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:

> I have configured  in IntelliJ as external jars
> spark-3.0.0-preview2-bin-hadoop2.7/jar
>
> not pulling anything from maven.
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>
>> Which Spark/Scala version do you use?
>>
>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>> wrote:
>>
>>>
>>> with the following sparksession configuration
>>>
>>> val spark = SparkSession.builder().master("local[*]").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> this line works
>>>
>>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>
>>> however, if I change the master URL like so, to use the IP address, then the
>>> following error is produced at the position of .take(5)
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>>
>>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
>>> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>>> instance of java.lang.invoke.SerializedLambda to field
>>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>>> of org.apache.spark.rdd.MapPartitionsRDD
>>>
>>> BUT if I  remove take(5) or change the position of take(5) or insert an
>>> extra take(5) as illustrated in code then it works. I don't see why the
>>> position of take(5) should cause such an error or be caused by changing the
>>> master url
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>>> 5))
>>>flights.show(5)
>>>
>>>
>>> complete code if you wish to replicate it.
>>>
>>> import org.apache.spark.sql.SparkSession
>>>
>>> object sessiontest {
>>>
>>>   // define specific  data type class then manipulate it using the filter 
>>> and map functions
>>>   // this is also known as an Encoder
>>>   case class flight (DEST_COUNTRY_NAME: String,
>>>  ORIGIN_COUNTRY_NAME:String,
>>>  count: BigInt)
>>>
>>>
>>>   def main(args:Array[String]): Unit ={
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> import spark.implicits._
>>> val flightDf = 
>>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>>> val flights = flightDf.as[flight]
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>>flights.show(5)
>>>
>>>   } // main
>>> }
>>>
>>>
>>>
>>>
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>


Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
I have configured  in IntelliJ as external jars
spark-3.0.0-preview2-bin-hadoop2.7/jar

not pulling anything from maven.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:

> Which Spark/Scala version do you use?
>
> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman  wrote:
>
>>
>> with the following sparksession configuration
>>
>> val spark = SparkSession.builder().master("local[*]").appName("Spark Session 
>> take").getOrCreate();
>>
>> this line works
>>
>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>
>> however, if I change the master URL like so, to use the IP address, then the
>> following error is produced at the position of .take(5)
>>
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>> Session take").getOrCreate();
>>
>>
>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
>> 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>> instance of java.lang.invoke.SerializedLambda to field
>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>> of org.apache.spark.rdd.MapPartitionsRDD
>>
>> BUT if I  remove take(5) or change the position of take(5) or insert an
>> extra take(5) as illustrated in code then it works. I don't see why the
>> position of take(5) should cause such an error or be caused by changing the
>> master url
>>
>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>   flights.take(5)
>>
>>   flights
>>   .take(5)
>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>> 5))
>>flights.show(5)
>>
>>
>> complete code if you wish to replicate it.
>>
>> import org.apache.spark.sql.SparkSession
>>
>> object sessiontest {
>>
>>   // define specific  data type class then manipulate it using the filter 
>> and map functions
>>   // this is also known as an Encoder
>>   case class flight (DEST_COUNTRY_NAME: String,
>>  ORIGIN_COUNTRY_NAME:String,
>>  count: BigInt)
>>
>>
>>   def main(args:Array[String]): Unit ={
>>
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>> Session take").getOrCreate();
>>
>> import spark.implicits._
>> val flightDf = 
>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>> val flights = flightDf.as[flight]
>>
>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>   flights.take(5)
>>
>>   flights
>>   .take(5)
>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>flights.show(5)
>>
>>   } // main
>> }
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>


Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Wenchen Fan
Which Spark/Scala version do you use?

On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman  wrote:

>
> with the following sparksession configuration
>
> val spark = SparkSession.builder().master("local[*]").appName("Spark Session 
> take").getOrCreate();
>
> this line works
>
> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>
> however, if I change the master URL like so, to use the IP address, then the
> following error is produced at the position of .take(5)
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
>
> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
> 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
> instance of java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
> of org.apache.spark.rdd.MapPartitionsRDD
>
> BUT if I  remove take(5) or change the position of take(5) or insert an
> extra take(5) as illustrated in code then it works. I don't see why the
> position of take(5) should cause such an error or be caused by changing the
> master url
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
> 5))
>flights.show(5)
>
>
> complete code if you wish to replicate it.
>
> import org.apache.spark.sql.SparkSession
>
> object sessiontest {
>
>   // define specific  data type class then manipulate it using the filter and 
> map functions
>   // this is also known as an Encoder
>   case class flight (DEST_COUNTRY_NAME: String,
>  ORIGIN_COUNTRY_NAME:String,
>  count: BigInt)
>
>
>   def main(args:Array[String]): Unit ={
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
> import spark.implicits._
> val flightDf = 
> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> val flights = flightDf.as[flight]
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
> + 5))
>flights.show(5)
>
>   } // main
> }
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>


BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
with the following sparksession configuration

val spark = SparkSession.builder().master("local[*]").appName("Spark
Session take").getOrCreate();

this line works

flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !=
"Canada").map(flight_row => flight_row).take(5)


however, if I change the master URL like so, to use the IP address, then the
following error is produced at the position of .take(5)

val spark = 
SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark
Session take").getOrCreate();


20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
instance of java.lang.invoke.SerializedLambda to field
org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
of org.apache.spark.rdd.MapPartitionsRDD

BUT if I  remove take(5) or change the position of take(5) or insert an
extra take(5) as illustrated in code then it works. I don't see why the
position of take(5) should cause such an error or be caused by changing the
master url

flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !=
"Canada").map(flight_row => flight_row).take(5)

  flights.take(5)

  flights
  .take(5)
  .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
   flights.show(5)


complete code if you wish to replicate it.

import org.apache.spark.sql.SparkSession

object sessiontest {

  // define specific  data type class then manipulate it using the
filter and map functions
  // this is also known as an Encoder
  case class flight (DEST_COUNTRY_NAME: String,
 ORIGIN_COUNTRY_NAME:String,
 count: BigInt)


  def main(args:Array[String]): Unit ={

val spark =
SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark
Session take").getOrCreate();

import spark.implicits._
val flightDf =
spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightDf.as[flight]

flights.take(5).filter(flight_row =>
flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row =>
flight_row).take(5)

  flights.take(5)

  flights
  .take(5)
  .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .map(fr => flight(fr.DEST_COUNTRY_NAME,
fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
   flights.show(5)

  } // main
}
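
One note to add here (an assumption on my part, not something confirmed in this thread): when the master points at a remote standalone cluster but the job is launched straight from the IDE, the class files containing the lambdas passed to filter/map are not on the executors' classpath, and the first action that ships such a closure to an executor, here the trailing take(5), commonly fails with exactly this SerializedLambda ClassCastException. A minimal sketch of one mitigation, replacing the SparkSession construction in the main method above; the jar path is a placeholder for whatever jar the IDE or sbt build produces:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Spark Session take")
  // Ship the application jar (which contains the sessiontest class and its
  // lambdas) to the executors; the path below is a placeholder.
  .config("spark.jars", "/path/to/sessiontest.jar")
  .getOrCreate()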





Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Spark-10848 bug in 2.4.4

2019-10-11 Thread Jatin Puri
Hi.

This bug still exists in 2.4.4: 
https://issues.apache.org/jira/browse/SPARK-10848
The `nullable` value is always set to `true`, at least when reading via `json()`. 
Should I log a new issue?

Is there a temporary workaround?

Regards,
Jatin
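
Not an authoritative answer, but one interim workaround (a sketch under the assumption that you only need the nullability flags corrected, not validated) is to stamp the intended schema back onto the result with createDataFrame, which trusts the nullability you declare; the path and columns below are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object NullableWorkaroundSketch {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("nullable workaround").getOrCreate()

    // The schema we actually want, with nullable = false where intended.
    val wanted = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // json() comes back with nullable = true on every column ...
    val df = spark.read.schema(wanted).json("/tmp/people.json")

    // ... so re-apply the intended schema afterwards. Spark trusts this
    // declaration and does not re-check the data for nulls.
    val fixed = spark.createDataFrame(df.rdd, wanted)
    fixed.printSchema()
  }
}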



Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-13 Thread Felix Cheung
I kinda agree it is confusing when a parameter is not used...


From: Ryan Blue 
Sent: Thursday, April 11, 2019 11:07:25 AM
To: Bruce Robbins
Cc: Dávid Szakállas; Spark Dev List
Subject: Re: Dataset schema incompatibility bug when reading column partitioned 
data


I think the confusion is that the schema passed to spark.read is not a 
projection schema. I don’t think it is even used in this case because the 
Parquet dataset has its own schema. You’re getting the schema of the table. I 
think the correct behavior is to reject a user-specified schema in this case.

On Thu, Apr 11, 2019 at 11:04 AM Bruce Robbins <bersprock...@gmail.com> wrote:
I see a Jira:

https://issues.apache.org/jira/browse/SPARK-21021

On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas <david.szakal...@gmail.com> wrote:
+dev for more visibility. Is this a known issue? Is there a plan for a fix?

Thanks,
David

Begin forwarded message:

From: Dávid Szakállas <david.szakal...@gmail.com>
Subject: Dataset schema incompatibility bug when reading column partitioned data
Date: 2019. March 29. 14:15:27 CET
To: u...@spark.apache.org

We observed the following bug on Spark 2.4.0:


scala> 
spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")

scala> val schema = StructType(Seq(StructField("_1", 
IntegerType),StructField("_2", IntegerType)))

scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
+---+---+
| _2| _1|
+---+---+
|  2|  1|
+---+---+

That is, when reading column partitioned Parquet files, the explicitly specified 
schema is not adhered to; instead, the partitioning columns are appended to the 
end of the column list. This is quite a severe issue, as some operations, such as 
union, fail if columns are in a different order in the two datasets. Thus we have 
to work around the issue with a select:

val columnNames = schema.fields.map(_.name)
ds.select(columnNames.head, columnNames.tail: _*)


Thanks,
David Szakallas
Data Engineer | Whitepages, Inc.



--
Ryan Blue
Software Engineer
Netflix


Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Ryan Blue
I think the confusion is that the schema passed to spark.read is not a
projection schema. I don’t think it is even used in this case because the
Parquet dataset has its own schema. You’re getting the schema of the table.
I think the correct behavior is to reject a user-specified schema in this
case.
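
To make that concrete, a small self-contained sketch (an editor's illustration, not code from the thread) that treats the desired column order as an explicit projection after the read — essentially the select workaround quoted further down:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object PartitionedReadOrderSketch {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("partitioned read order").getOrCreate()
    import spark.implicits._

    spark.createDataset(Seq((1, 2))).write.mode("overwrite").partitionBy("_1").parquet("/tmp/foo.parquet")

    val schema = StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType)))
    val columnNames = schema.fields.map(_.name)

    // The read comes back as (_2, _1); enforce the wanted order with a projection.
    val ds = spark.read.parquet("/tmp/foo.parquet")
      .select(columnNames.head, columnNames.tail: _*)
      .as[(Int, Int)]

    ds.show()   // columns back in _1, _2 order
  }
}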

On Thu, Apr 11, 2019 at 11:04 AM Bruce Robbins 
wrote:

> I see a Jira:
>
> https://issues.apache.org/jira/browse/SPARK-21021
>
> On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas 
> wrote:
>
>> +dev for more visibility. Is this a known issue? Is there a plan for a
>> fix?
>>
>> Thanks,
>> David
>>
>> Begin forwarded message:
>>
>> *From: *Dávid Szakállas 
>> *Subject: **Dataset schema incompatibility bug when reading column
>> partitioned data*
>> *Date: *2019. March 29. 14:15:27 CET
>> *To: *u...@spark.apache.org
>>
>> We observed the following bug on Spark 2.4.0:
>>
>> scala> 
>> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
>>
>> scala> val schema = StructType(Seq(StructField("_1", 
>> IntegerType),StructField("_2", IntegerType)))
>>
>> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
>> +---+---+
>> | _2| _1|
>> +---+---+
>> |  2|  1|
>> +---+---+
>>
>>
>> That is, when reading column partitioned Parquet files, the explicitly
>> specified schema is not adhered to; instead, the partitioning columns are
>> appended to the end of the column list. This is quite a severe issue, as some
>> operations, such as union, fail if columns are in a different order in the two
>> datasets. Thus we have to work around the issue with a select:
>>
>> val columnNames = schema.fields.map(_.name)
>> ds.select(columnNames.head, columnNames.tail: _*)
>>
>>
>> Thanks,
>> David Szakallas
>> Data Engineer | Whitepages, Inc.
>>
>>
>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Bruce Robbins
I see a Jira:

https://issues.apache.org/jira/browse/SPARK-21021

On Thu, Apr 11, 2019 at 9:08 AM Dávid Szakállas 
wrote:

> +dev for more visibility. Is this a known issue? Is there a plan for a fix?
>
> Thanks,
> David
>
> Begin forwarded message:
>
> *From: *Dávid Szakállas 
> *Subject: **Dataset schema incompatibility bug when reading column
> partitioned data*
> *Date: *2019. March 29. 14:15:27 CET
> *To: *u...@spark.apache.org
>
> We observed the following bug on Spark 2.4.0:
>
> scala> 
> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
>
> scala> val schema = StructType(Seq(StructField("_1", 
> IntegerType),StructField("_2", IntegerType)))
>
> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
> +---+---+
> | _2| _1|
> +---+---+
> |  2|  1|
> +---+---+
>
>
> That is, when reading column partitioned Parquet files, the explicitly
> specified schema is not adhered to; instead, the partitioning columns are
> appended to the end of the column list. This is quite a severe issue, as some
> operations, such as union, fail if columns are in a different order in the two
> datasets. Thus we have to work around the issue with a select:
>
> val columnNames = schema.fields.map(_.name)
> ds.select(columnNames.head, columnNames.tail: _*)
>
>
> Thanks,
> David Szakallas
> Data Engineer | Whitepages, Inc.
>
>
>


Fwd: Dataset schema incompatibility bug when reading column partitioned data

2019-04-11 Thread Dávid Szakállas
+dev for more visibility. Is this a known issue? Is there a plan for a fix?

Thanks,
David

> Begin forwarded message:
> 
> From: Dávid Szakállas 
> Subject: Dataset schema incompatibility bug when reading column partitioned 
> data
> Date: 2019. March 29. 14:15:27 CET
> To: u...@spark.apache.org
> 
> We observed the following bug on Spark 2.4.0:
> 
> scala> 
> spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")
> 
> scala> val schema = StructType(Seq(StructField("_1", 
> IntegerType),StructField("_2", IntegerType)))
> 
> scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
> +---+---+
> | _2| _1|
> +---+---+
> |  2|  1|
> +---+---+
> 
> That is, when reading column partitioned Parquet files, the explicitly 
> specified schema is not adhered to; instead, the partitioning columns are 
> appended to the end of the column list. This is quite a severe issue, as some 
> operations, such as union, fail if columns are in a different order in the two 
> datasets. Thus we have to work around the issue with a select:
> 
> val columnNames = schema.fields.map(_.name)
> ds.select(columnNames.head, columnNames.tail: _*)
> 
> 
> Thanks, 
> David Szakallas
> Data Engineer | Whitepages, Inc.



A bug from 1.6.5 to 2.4.0

2019-02-17 Thread Moein Hosseini
Today I faced a bug on an HA master with ZooKeeper on 2.4.0 and found there
is an existing issue for it from 1.6.5, SPARK-15544
<https://issues.apache.org/jira/browse/SPARK-15544>, but it is not assigned and has no PR.
As it was opened in 2016, please check it and consider it.

Best regards

-- 

Moein Hosseini
Data Engineer
mobile: +98 912 468 1859 <+98+912+468+1859>
site: www.moein.xyz
email: moein...@gmail.com
[image: linkedin] <https://www.linkedin.com/in/moeinhm>
[image: twitter] <https://twitter.com/moein7tl>


Re: Possible bug in DatasourceV2

2018-10-11 Thread Hyukjin Kwon
Thanks, Wenchen.

I opened https://github.com/apache/spark/pull/22697
but I believe https://github.com/apache/spark/pull/22688 is still valid as
well in the master branch, since the master branch still supports it; the
readSupport should be made only when it's needed, and it's already open.

On Thu, Oct 11, 2018 at 8:19 PM, Wenchen Fan  wrote:

> Hi Hyukjin, can you open a PR to revert it from 2.4? Now I'm kind of
> convinced this is too breaking and we need more discussion.
>
> + Ryan Blue
> Hi Ryan,
> I think we need to look back at the new write API design and consider data
> sources that don't have a table concept. We should opt in to the schema
> validation of the append operator.
>
> On Thu, Oct 11, 2018 at 8:12 PM Hyukjin Kwon  wrote:
>
>> That's why I initially suggested reverting this part out of Spark 2.4 and
>> having more discussion at 3.0, since one of the design goals of Data Source V2
>> is no behaviour changes for end users.
>>
>> On Thu, Oct 11, 2018 at 7:11 PM, Mendelson, Assaf  wrote:
>>
>>> Actually, it is not just a question of a write only data source. The
>>> issue is that in my case (and I imagine this is true for others), the
>>> schema is not read from the database but is understood from the options.
>>> This means that I have no way of understanding the schema without supplying
>>> the read options. On the other hand, when writing, I have the schema from
>>> the dataframe.
>>>
>>>
>>>
>>> I know the data source V2 API is considered experimental API and I have
>>> no problem with it, however, this means that the change will require a
>>> change in how the end user works with it (they suddenly need to add schema
>>> information which they did not before), not to mention this being a
>>> regression.
>>>
>>>
>>>
>>> As to the pull request, this only handles cases where the save mode is
>>> not append, for the original example (having non existent path but have
>>> append will still fail and according to the documentation of Append, if the
>>> path does not exist it should create it).
>>>
>>>
>>>
>>> I am currently having problems compiling everything so I can’t test it
>>> myself but wouldn’t changing the relation definition in “save”:
>>>
>>>
>>>
>>> val relation = DataSourceV2Relation.create(source, options, None,
>>> Option(df.schema))
>>>
>>>
>>>
>>> and changing create to look like this:
>>>
>>>
>>>
>>> def create(source: DataSourceV2, options: Map[String, String],
>>> tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema:
>>> Option[StructType] = None): DataSourceV2Relation = {
>>>
>>> val schema =
>>> userSpecifiedSchema.getOrElse(source.createReader(options,
>>> userSpecifiedSchema).readSchema())
>>>
>>> val ident = tableIdent.orElse(tableFromOptions(options))
>>>
>>> DataSourceV2Relation(
>>>
>>>   source, schema.toAttributes, options, ident, userSpecifiedSchema)
>>>
>>>   }
>>>
>>>
>>>
>>> Correct this?
>>>
>>>
>>>
>>> Or even creating a new create which simply gets the schema as non
>>> optional?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Assaf
>>>
>>>
>>>
>>> *From:* Hyukjin Kwon [mailto:gurwls...@gmail.com]
>>> *Sent:* Thursday, October 11, 2018 10:24 AM
>>> *To:* Mendelson, Assaf; Wenchen Fan
>>> *Cc:* dev
>>> *Subject:* Re: Possible bug in DatasourceV2
>>>
>>>
>>>
>>> See https://github.com/apache/spark/pull/22688
>>>
>>>
>>>
>>> +Wenchen, here is where the problem was raised. This might have to be
>>> considered as a blocker ...
>>>
>>> On Thu, 11 Oct 2018, 2:48 pm assaf.mendelson, 
>>> wrote:
>>>
>>> Hi,
>>>
>>> I created a datasource writer WITHOUT a reader. When I do, I get an
>>> exception: org.apache.spark.sql.AnalysisException: Data source is not
>>> readable: DefaultSource
>>>
>>> The reason for this is that when save is called, inside the source match
>>> to
>>> WriterSupport we have the following code:
>>>
>>> val sou

Re: Possible bug in DatasourceV2

2018-10-11 Thread Wenchen Fan
Hi Hyukjin, can you open a PR to revert it from 2.4? Now I'm kind of
convinced this is too breaking and we need more discussion.

+ Ryan Blue
Hi Ryan,
I think we need to look back at the new write API design and consider data
sources that don't have a table concept. We should opt in to the schema
validation of the append operator.

On Thu, Oct 11, 2018 at 8:12 PM Hyukjin Kwon  wrote:

> That's why I initially suggested reverting this part out of Spark 2.4 and
> having more discussion at 3.0, since one of the design goals of Data Source V2
> is no behaviour changes for end users.
>
> On Thu, Oct 11, 2018 at 7:11 PM, Mendelson, Assaf  wrote:
>
>> Actually, it is not just a question of a write only data source. The
>> issue is that in my case (and I imagine this is true for others), the
>> schema is not read from the database but is understood from the options.
>> This means that I have no way of understanding the schema without supplying
>> the read options. On the other hand, when writing, I have the schema from
>> the dataframe.
>>
>>
>>
>> I know the data source V2 API is considered experimental API and I have
>> no problem with it, however, this means that the change will require a
>> change in how the end user works with it (they suddenly need to add schema
>> information which they did not before), not to mention this being a
>> regression.
>>
>>
>>
>> As to the pull request, this only handles cases where the save mode is
>> not append, for the original example (having non existent path but have
>> append will still fail and according to the documentation of Append, if the
>> path does not exist it should create it).
>>
>>
>>
>> I am currently having problems compiling everything so I can’t test it
>> myself but wouldn’t changing the relation definition in “save”:
>>
>>
>>
>> val relation = DataSourceV2Relation.create(source, options, None,
>> Option(df.schema))
>>
>>
>>
>> and changing create to look like this:
>>
>>
>>
>> def create(source: DataSourceV2, options: Map[String, String],
>> tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema:
>> Option[StructType] = None): DataSourceV2Relation = {
>>
>> val schema =
>> userSpecifiedSchema.getOrElse(source.createReader(options,
>> userSpecifiedSchema).readSchema())
>>
>> val ident = tableIdent.orElse(tableFromOptions(options))
>>
>> DataSourceV2Relation(
>>
>>   source, schema.toAttributes, options, ident, userSpecifiedSchema)
>>
>>   }
>>
>>
>>
>> Correct this?
>>
>>
>>
>> Or even creating a new create which simply gets the schema as non
>> optional?
>>
>>
>>
>> Thanks,
>>
>> Assaf
>>
>>
>>
>> *From:* Hyukjin Kwon [mailto:gurwls...@gmail.com]
>> *Sent:* Thursday, October 11, 2018 10:24 AM
>> *To:* Mendelson, Assaf; Wenchen Fan
>> *Cc:* dev
>> *Subject:* Re: Possible bug in DatasourceV2
>>
>>
>>
>> See https://github.com/apache/spark/pull/22688
>>
>>
>>
>> +Wenchen, here is where the problem was raised. This might have to be considered
>> as a blocker ...
>>
>> On Thu, 11 Oct 2018, 2:48 pm assaf.mendelson, 
>> wrote:
>>
>> Hi,
>>
>> I created a datasource writer WITHOUT a reader. When I do, I get an
>> exception: org.apache.spark.sql.AnalysisException: Data source is not
>> readable: DefaultSource
>>
>> The reason for this is that when save is called, inside the source match
>> to
>> WriterSupport we have the following code:
>>
>> val source = cls.newInstance().asInstanceOf[DataSourceV2]
>>   source match {
>> case ws: WriteSupport =>
>>   val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
>> source,
>> df.sparkSession.sessionState.conf)
>>   val options = sessionOptions ++ extraOptions
>> -->  val relation = DataSourceV2Relation.create(source, options)
>>
>>   if (mode == SaveMode.Append) {
>> runCommand(df.sparkSession, "save") {
>>   AppendData.byName(relation, df.logicalPlan)
>> }
>>
>>   } else {
>> val writer = ws.createWriter(
>>   UUID.randomUUID.toString,
>> df.logicalPlan.output.toStructType,
>&

Re: Possible bug in DatasourceV2

2018-10-11 Thread Hyukjin Kwon
That's why I initially suggested reverting this part out of Spark 2.4 and
having more discussion at 3.0, since one of the design goals of Data Source V2
is no behaviour changes for end users.

On Thu, Oct 11, 2018 at 7:11 PM, Mendelson, Assaf  wrote:

> Actually, it is not just a question of a write only data source. The issue
> is that in my case (and I imagine this is true for others), the schema is
> not read from the database but is understood from the options. This means
> that I have no way of understanding the schema without supplying the read
> options. On the other hand, when writing, I have the schema from the
> dataframe.
>
>
>
> I know the data source V2 API is considered experimental API and I have no
> problem with it, however, this means that the change will require a change
> in how the end user works with it (they suddenly need to add schema
> information which they did not before), not to mention this being a
> regression.
>
>
>
> As to the pull request, this only handles cases where the save mode is not
> append, for the original example (having non existent path but have append
> will still fail and according to the documentation of Append, if the path
> does not exist it should create it).
>
>
>
> I am currently having problems compiling everything so I can’t test it
> myself but wouldn’t changing the relation definition in “save”:
>
>
>
> val relation = DataSourceV2Relation.create(source, options, None,
> Option(df.schema))
>
>
>
> and changing create to look like this:
>
>
>
> def create(source: DataSourceV2, options: Map[String, String], tableIdent:
> Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] =
> None): DataSourceV2Relation = {
>
> val schema =
> userSpecifiedSchema.getOrElse(source.createReader(options,
> userSpecifiedSchema).readSchema())
>
> val ident = tableIdent.orElse(tableFromOptions(options))
>
> DataSourceV2Relation(
>
>   source, schema.toAttributes, options, ident, userSpecifiedSchema)
>
>   }
>
>
>
> Correct this?
>
>
>
> Or even creating a new create which simply gets the schema as non optional?
>
>
>
> Thanks,
>
> Assaf
>
>
>
> *From:* Hyukjin Kwon [mailto:gurwls...@gmail.com]
> *Sent:* Thursday, October 11, 2018 10:24 AM
> *To:* Mendelson, Assaf; Wenchen Fan
> *Cc:* dev
> *Subject:* Re: Possible bug in DatasourceV2
>
>
>
> See https://github.com/apache/spark/pull/22688
>
>
>
> +Wenchen, here is where the problem was raised. This might have to be considered
> as a blocker ...
>
> On Thu, 11 Oct 2018, 2:48 pm assaf.mendelson, 
> wrote:
>
> Hi,
>
> I created a datasource writer WITHOUT a reader. When I do, I get an
> exception: org.apache.spark.sql.AnalysisException: Data source is not
> readable: DefaultSource
>
> The reason for this is that when save is called, inside the source match to
> WriterSupport we have the following code:
>
> val source = cls.newInstance().asInstanceOf[DataSourceV2]
>   source match {
> case ws: WriteSupport =>
>   val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
> source,
> df.sparkSession.sessionState.conf)
>   val options = sessionOptions ++ extraOptions
> -->  val relation = DataSourceV2Relation.create(source, options)
>
>   if (mode == SaveMode.Append) {
> runCommand(df.sparkSession, "save") {
>   AppendData.byName(relation, df.logicalPlan)
> }
>
>   } else {
> val writer = ws.createWriter(
>   UUID.randomUUID.toString, df.logicalPlan.output.toStructType,
> mode,
>   new DataSourceOptions(options.asJava))
>
> if (writer.isPresent) {
>   runCommand(df.sparkSession, "save") {
> WriteToDataSourceV2(writer.get, df.logicalPlan)
>   }
> }
>   }
>
> but DataSourceV2Relation.create actively creates a reader
> (source.createReader) to extract the schema:
>
> def create(
>   source: DataSourceV2,
>   options: Map[String, String],
>   tableIdent: Option[TableIdentifier] = None,
>   userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation
> = {
> val reader = source.createReader(options, userSpecifiedSchema)
> val ident = tableIdent.orElse(tableFromOptions(options))
> DataSourceV2Relation(
>   source, reader.readSchema().toAttributes, options, ident,
> userSpecifiedSchema)
>   }
>
&

RE: Possible bug in DatasourceV2

2018-10-11 Thread Mendelson, Assaf
Actually, it is not just a question of a write only data source. The issue is 
that in my case (and I imagine this is true for others), the schema is not read 
from the database but is understood from the options. This means that I have no 
way of understanding the schema without supplying the read options. On the 
other hand, when writing, I have the schema from the dataframe.

I know the data source V2 API is considered experimental API and I have no 
problem with it, however, this means that the change will require a change in 
how the end user works with it (they suddenly need to add schema information 
which they did not before), not to mention this being a regression.

As to the pull request, this only handles cases where the save mode is not 
append, for the original example (having non existent path but have append will 
still fail and according to the documentation of Append, if the path does not 
exist it should create it).

I am currently having problems compiling everything so I can’t test it myself 
but wouldn’t changing the relation definition in “save”:

val relation = DataSourceV2Relation.create(source, options, None, 
Option(df.schema))

and changing create to look like this:

def create(source: DataSourceV2, options: Map[String, String], tableIdent: 
Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] = 
None): DataSourceV2Relation = {
val schema = userSpecifiedSchema.getOrElse(source.createReader(options, 
userSpecifiedSchema).readSchema())
val ident = tableIdent.orElse(tableFromOptions(options))
DataSourceV2Relation(
  source, schema.toAttributes, options, ident, userSpecifiedSchema)
  }

Correct this?

Or even creating a new create which simply gets the schema as non optional?

Thanks,
Assaf

From: Hyukjin Kwon [mailto:gurwls...@gmail.com]
Sent: Thursday, October 11, 2018 10:24 AM
To: Mendelson, Assaf; Wenchen Fan
Cc: dev
Subject: Re: Possible bug in DatasourceV2


See https://github.com/apache/spark/pull/22688

+Wenchen, here is where the problem was raised. This might have to be considered as a 
blocker ...

On Thu, 11 Oct 2018, 2:48 pm assaf.mendelson, <assaf.mendel...@rsa.com> wrote:
Hi,

I created a datasource writer WITHOUT a reader. When I do, I get an
exception: org.apache.spark.sql.AnalysisException: Data source is not
readable: DefaultSource

The reason for this is that when save is called, inside the source match to
WriterSupport we have the following code:

val source = cls.newInstance().asInstanceOf[DataSourceV2]
  source match {
case ws: WriteSupport =>
  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source,
df.sparkSession.sessionState.conf)
  val options = sessionOptions ++ extraOptions
-->  val relation = DataSourceV2Relation.create(source, options)

  if (mode == SaveMode.Append) {
runCommand(df.sparkSession, "save") {
  AppendData.byName(relation, df.logicalPlan)
}

  } else {
val writer = ws.createWriter(
  UUID.randomUUID.toString, df.logicalPlan.output.toStructType,
mode,
  new DataSourceOptions(options.asJava))

if (writer.isPresent) {
  runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(writer.get, df.logicalPlan)
  }
}
  }

but DataSourceV2Relation.create actively creates a reader
(source.createReader) to extract the schema:

def create(
  source: DataSourceV2,
  options: Map[String, String],
  tableIdent: Option[TableIdentifier] = None,
  userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation
= {
val reader = source.createReader(options, userSpecifiedSchema)
val ident = tableIdent.orElse(tableFromOptions(options))
DataSourceV2Relation(
  source, reader.readSchema().toAttributes, options, ident,
userSpecifiedSchema)
  }


This makes me a little confused.

First, the schema is defined by the dataframe itself, not by the data
source, i.e. it should be extracted from df.schema and not by
source.createReader

Second, I see that relation is actually only used if the mode is
SaveMode.append (btw this means if it is needed it should be defined inside
the "if"). I am not sure I understand the portion of the AppendData but why
would reading from the source be included?

Am I missing something here?

Thanks,
   Assaf



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: Possible bug in DatasourceV2

2018-10-11 Thread Hyukjin Kwon
See https://github.com/apache/spark/pull/22688

+Wenchen, here is where the problem was raised. This might have to be considered
as a blocker ...


On Thu, 11 Oct 2018, 2:48 pm assaf.mendelson, 
wrote:

> Hi,
>
> I created a datasource writer WITHOUT a reader. When I do, I get an
> exception: org.apache.spark.sql.AnalysisException: Data source is not
> readable: DefaultSource
>
> The reason for this is that when save is called, inside the source match to
> WriterSupport we have the following code:
>
> val source = cls.newInstance().asInstanceOf[DataSourceV2]
>   source match {
> case ws: WriteSupport =>
>   val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
> source,
> df.sparkSession.sessionState.conf)
>   val options = sessionOptions ++ extraOptions
> -->  val relation = DataSourceV2Relation.create(source, options)
>
>   if (mode == SaveMode.Append) {
> runCommand(df.sparkSession, "save") {
>   AppendData.byName(relation, df.logicalPlan)
> }
>
>   } else {
> val writer = ws.createWriter(
>   UUID.randomUUID.toString, df.logicalPlan.output.toStructType,
> mode,
>   new DataSourceOptions(options.asJava))
>
> if (writer.isPresent) {
>   runCommand(df.sparkSession, "save") {
> WriteToDataSourceV2(writer.get, df.logicalPlan)
>   }
> }
>   }
>
> but DataSourceV2Relation.create actively creates a reader
> (source.createReader) to extract the schema:
>
> def create(
>   source: DataSourceV2,
>   options: Map[String, String],
>   tableIdent: Option[TableIdentifier] = None,
>   userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation
> = {
> val reader = source.createReader(options, userSpecifiedSchema)
> val ident = tableIdent.orElse(tableFromOptions(options))
> DataSourceV2Relation(
>   source, reader.readSchema().toAttributes, options, ident,
> userSpecifiedSchema)
>   }
>
>
> This makes me a little confused.
>
> First, the schema is defined by the dataframe itself, not by the data
> source, i.e. it should be extracted from df.schema and not by
> source.createReader
>
> Second, I see that relation is actually only used if the mode is
> SaveMode.append (btw this means if it is needed it should be defined inside
> the "if"). I am not sure I understand the portion of the AppendData but why
> would reading from the source be included?
>
> Am I missing something here?
>
> Thanks,
>Assaf
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Possible bug in DatasourceV2

2018-10-10 Thread assaf.mendelson
Hi,

I created a datasource writer WITHOUT a reader. When I do, I get an
exception: org.apache.spark.sql.AnalysisException: Data source is not
readable: DefaultSource

The reason for this is that when save is called, inside the source match to
WriterSupport we have the following code:

val source = cls.newInstance().asInstanceOf[DataSourceV2]
  source match {
case ws: WriteSupport =>
  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source,
df.sparkSession.sessionState.conf)
  val options = sessionOptions ++ extraOptions
-->  val relation = DataSourceV2Relation.create(source, options)

  if (mode == SaveMode.Append) {
runCommand(df.sparkSession, "save") {
  AppendData.byName(relation, df.logicalPlan)
}

  } else {
val writer = ws.createWriter(
  UUID.randomUUID.toString, df.logicalPlan.output.toStructType,
mode,
  new DataSourceOptions(options.asJava))

if (writer.isPresent) {
  runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(writer.get, df.logicalPlan)
  }
}
  }

but DataSourceV2Relation.create actively creates a reader
(source.createReader) to extract the schema: 

def create(
  source: DataSourceV2,
  options: Map[String, String],
  tableIdent: Option[TableIdentifier] = None,
  userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation
= {
val reader = source.createReader(options, userSpecifiedSchema)
val ident = tableIdent.orElse(tableFromOptions(options))
DataSourceV2Relation(
  source, reader.readSchema().toAttributes, options, ident,
userSpecifiedSchema)
  }


This makes me a little confused.

First, the schema is defined by the dataframe itself, not by the data
source, i.e. it should be extracted from df.schema and not by
source.createReader

Second, I see that relation is actually only used if the mode is
SaveMode.append (btw this means if it is needed it should be defined inside
the "if"). I am not sure I understand the portion of the AppendData but why
would reading from the source be included? 

Am I missing something here?

Thanks,
   Assaf



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Spark data quality bug when reading parquet files from hive metastore

2018-09-07 Thread Long, Andrew
Thanks Fokko,

I will definitely take a look at this.

Cheers Andrew

From: "Driesprong, Fokko" 
Date: Friday, August 24, 2018 at 2:39 AM
To: "reubensaw...@hotmail.com" 
Cc: "dev@spark.apache.org" 
Subject: Re: Spark data quality bug when reading parquet files from hive 
metastore

Hi Andrew,

This blog gives an idea how to schema is resolved: 
https://blog.godatadriven.com/multiformat-spark-partition There is some 
optimisation going on when reading Parquet using Spark. Hope this helps.

Cheers, Fokko


Op wo 22 aug. 2018 om 23:59 schreef t4 
mailto:reubensaw...@hotmail.com>>:
https://issues.apache.org/jira/browse/SPARK-23576 ?



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: 
dev-unsubscr...@spark.apache.org<mailto:dev-unsubscr...@spark.apache.org>


Re: TimSort bug

2018-08-31 Thread Reynold Xin
Thanks for looking into this, Sean! Loved the tl;dr.


On Fri, Aug 31, 2018 at 12:28 PM Sean Owen  wrote:

> TL;DR - We already had the fix from SPARK-5984. The delta from the current
> JDK implementation to Spark's looks actually inconsequential. No action
> required AFAICT.
>
> On Fri, Aug 31, 2018 at 12:30 PM Sean Owen  wrote:
>
>> I looked into this, because it sure sounds like a similar issue from a
>> few years ago that was fixed in
>> https://issues.apache.org/jira/browse/SPARK-5984
>> The change in that JIRA actually looks almost identical to the change
>> mentioned in the JDK bug:
>> http://hg.openjdk.java.net/jdk/jdk/rev/3a6d47df8239
>>
>> Reading the paper
>> http://drops.dagstuhl.de/opus/volltexte/2018/9467/pdf/LIPIcs-ESA-2018-4.pdf 
>> in
>> section 5 a little more, I think they are saying that there were two ways
>> to fix the original problem: a) fix the invariant, or b) increase some data
>> structure size. Java did the latter, it seems, and now they've shown it's
>> actually still busted. However Python and the original paper did the
>> former, and we seem to have copied that fix from
>> http://envisage-project.eu/proving-android-java-and-python-sorting-algorithm-is-broken-and-how-to-fix-it/
>>  My
>> understanding is that this still works, and is what Java *now* implements.
>>
>> The only difference I can see in implementation is an extra check for a
>> negative array index before dereferencing array[n]. We can add that for
>> full consistency with the JVM change, I suppose. I don't think it's
>> relevant to the problem reported in the paper, but could be an issue
>> otherwise. The JVM implementation suggests it thinks this needs to be
>> guarded.
>>
>> I did hack together a crude translation of the paper's bug reproduction
>> at http://igm.univ-mlv.fr/~pivoteau/Timsort/Test.java by copying some
>> Spark test code. It does need a huge amount of memory to run (>32g.. ended
>> up at 44g) so not so feasible to put in the test suite. Running it on Spark
>> master nets a JVM crash:
>>
>> # Problematic frame:
>> # J 10195 C2
>> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(Ljava/lang/Object;IILjava/util/Comparator;)I
>> (198 bytes) @ 0x7ff64dd9a262 [0x7ff64dd99f20+0x342]
>>
>> Thats... not good, but I can't tell if it's really due to this same issue
>> or something else going on in the off-heap code. Making the tiny change I
>> mentioned above doesn't do anything.
>>
>> On Fri, Aug 31, 2018 at 2:37 AM Reynold Xin  wrote:
>>
>>> “As a byproduct of our study, we uncover a bug in the Java
>>> implementation that can cause the sorting method to fail during the
>>> execution.”
>>>
>>> http://drops.dagstuhl.de/opus/volltexte/2018/9467/
>>>
>>> This might impact Spark since we took the Java based TimSort
>>> implementation. I have seen in the wild TimSort failing in the past. Maybe
>>> this is the cause.
>>>
>>


Re: TimSort bug

2018-08-31 Thread Sean Owen
TL;DR - We already had the fix from SPARK-5984. The delta from the current
JDK implementation to Spark's looks actually inconsequential. No action
required AFAICT.

On Fri, Aug 31, 2018 at 12:30 PM Sean Owen  wrote:

> I looked into this, because it sure sounds like a similar issue from a few
> years ago that was fixed in
> https://issues.apache.org/jira/browse/SPARK-5984
> The change in that JIRA actually looks almost identical to the change
> mentioned in the JDK bug:
> http://hg.openjdk.java.net/jdk/jdk/rev/3a6d47df8239
>
> Reading the paper
> http://drops.dagstuhl.de/opus/volltexte/2018/9467/pdf/LIPIcs-ESA-2018-4.pdf in
> section 5 a little more, I think they are saying that there were two ways
> to fix the original problem: a) fix the invariant, or b) increase some data
> structure size. Java did the latter, it seems, and now they've shown it's
> actually still busted. However Python and the original paper did the
> former, and we seem to have copied that fix from
> http://envisage-project.eu/proving-android-java-and-python-sorting-algorithm-is-broken-and-how-to-fix-it/
>  My
> understanding is that this still works, and is what Java *now* implements.
>
> The only difference I can see in implementation is an extra check for a
> negative array index before dereferencing array[n]. We can add that for
> full consistency with the JVM change, I suppose. I don't think it's
> relevant to the problem reported in the paper, but could be an issue
> otherwise. The JVM implementation suggests it thinks this needs to be
> guarded.
>
> I did hack together a crude translation of the paper's bug reproduction at
> http://igm.univ-mlv.fr/~pivoteau/Timsort/Test.java by copying some Spark
> test code. It does need a huge amount of memory to run (>32g.. ended up at
> 44g) so not so feasible to put in the test suite. Running it on Spark
> master nets a JVM crash:
>
> # Problematic frame:
> # J 10195 C2
> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(Ljava/lang/Object;IILjava/util/Comparator;)I
> (198 bytes) @ 0x7ff64dd9a262 [0x7ff64dd99f20+0x342]
>
> Thats... not good, but I can't tell if it's really due to this same issue
> or something else going on in the off-heap code. Making the tiny change I
> mentioned above doesn't do anything.
>
> On Fri, Aug 31, 2018 at 2:37 AM Reynold Xin  wrote:
>
>> “As a byproduct of our study, we uncover a bug in the Java
>> implementation that can cause the sorting method to fail during the
>> execution.”
>>
>> http://drops.dagstuhl.de/opus/volltexte/2018/9467/
>>
>> This might impact Spark since we took the Java based TimSort
>> implementation. I have seen in the wild TimSort failing in the past. Maybe
>> this is the cause.
>>
>


Re: TimSort bug

2018-08-31 Thread Sean Owen
I looked into this, because it sure sounds like a similar issue from a few
years ago that was fixed in https://issues.apache.org/jira/browse/SPARK-5984

The change in that JIRA actually looks almost identical to the change
mentioned in the JDK bug:
http://hg.openjdk.java.net/jdk/jdk/rev/3a6d47df8239

Reading the paper
http://drops.dagstuhl.de/opus/volltexte/2018/9467/pdf/LIPIcs-ESA-2018-4.pdf in
section 5 a little more, I think they are saying that there were two ways
to fix the original problem: a) fix the invariant, or b) increase some data
structure size. Java did the latter, it seems, and now they've shown it's
actually still busted. However Python and the original paper did the
former, and we seem to have copied that fix from
http://envisage-project.eu/proving-android-java-and-python-sorting-algorithm-is-broken-and-how-to-fix-it/
My
understanding is that this still works, and is what Java *now* implements.

The only difference I can see in implementation is an extra check for a
negative array index before dereferencing array[n]. We can add that for
full consistency with the JVM change, I suppose. I don't think it's
relevant to the problem reported in the paper, but could be an issue
otherwise. The JVM implementation suggests it thinks this needs to be
guarded.

I did hack together a crude translation of the paper's bug reproduction at
http://igm.univ-mlv.fr/~pivoteau/Timsort/Test.java by copying some Spark
test code. It does need a huge amount of memory to run (>32g.. ended up at
44g) so not so feasible to put in the test suite. Running it on Spark
master nets a JVM crash:

# Problematic frame:
# J 10195 C2
org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(Ljava/lang/Object;IILjava/util/Comparator;)I
(198 bytes) @ 0x7ff64dd9a262 [0x7ff64dd99f20+0x342]

Thats... not good, but I can't tell if it's really due to this same issue
or something else going on in the off-heap code. Making the tiny change I
mentioned above doesn't do anything.

On Fri, Aug 31, 2018 at 2:37 AM Reynold Xin  wrote:

> “As a byproduct of our study, we uncover a bug in the Java implementation
> that can cause the sorting method to fail during the execution.”
>
> http://drops.dagstuhl.de/opus/volltexte/2018/9467/
>
> This might impact Spark since we took the Java based TimSort
> implementation. I have seen in the wild TimSort failing in the past. Maybe
> this is the cause.
>


TimSort bug

2018-08-31 Thread Reynold Xin
“As a byproduct of our study, we uncover a bug in the Java implementation
that can cause the sorting method to fail during the execution.”

http://drops.dagstuhl.de/opus/volltexte/2018/9467/

This might impact Spark since we took the Java based TimSort
implementation. I have seen in the wild TimSort failing in the past. Maybe
this is the cause.


Re: Spark data quality bug when reading parquet files from hive metastore

2018-08-24 Thread Driesprong, Fokko
Hi Andrew,

This blog gives an idea how to schema is resolved:
https://blog.godatadriven.com/multiformat-spark-partition There is some
optimisation going on when reading Parquet using Spark. Hope this helps.

Cheers, Fokko


Op wo 22 aug. 2018 om 23:59 schreef t4 :

> https://issues.apache.org/jira/browse/SPARK-23576 ?
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Spark data quality bug when reading parquet files from hive metastore

2018-08-22 Thread t4
https://issues.apache.org/jira/browse/SPARK-23576 ?



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Spark data quality bug when reading parquet files from hive metastore

2018-08-22 Thread Long, Andrew
Hello Friends,

I’ve encountered a bug where spark silently corrupts data when reading from a 
parquet hive table where the table schema does not match the file schema.  I’d 
like to give a shot at adding some extra validations to the code to handle this 
corner case and I was wondering if anyone had any suggestions for where to 
start looking in the spark code.

Cheers Andrew


Re: [SS] Bug in StreamExecution? currentBatchId and getBatchDescriptionString for web UI

2017-09-10 Thread Jacek Laskowski
Hi,

Please disregard my finding. It does not seem a bug, but just a small
"dead code" as "init" will never be displayed in web UI = the minimum
batch id can ever be 0 and so getBatchDescriptionString could be a
little "improved".

Sorry for the noise.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Sep 9, 2017 at 9:21 PM, Jacek Laskowski  wrote:
> Hi,
>
> While reviewing StreamExecution and how batches are displayed in web
> UI, I've noticed that currentBatchId is -1 when StreamExecution is
> created [1] and becomes 0 when no offsets are available [2].
>
> That leads to my question about setting the job description for a
> query using getBatchDescriptionString [3]. It branches per
> currentBatchId and when it's -1 gives "init" [4] which never happens
> as showed above.
>
> That leads to the PR for SPARK-20464 "Add a job group and description
> for streaming queries and fix cancellation of running jobs using the
> job group" that sets the job description after populateStartOffsets
> [5].
>
> Shouldn't it be before populateStartOffsets so
> getBatchDescriptionString has a chance of giving "init" and we see no
> two 0s?
>
> Help appreciated.
>
> [1] 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L116
> [2] 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L516
> [3] 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L878-L883
> [4] 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L879
> [5] 
> https://github.com/apache/spark/commit/6fc6cf88d871f5b05b0ad1a504e0d6213cf9d331#diff-6532dd3b63bdab0364fbcf2303e290e4R294
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Spark Structured Streaming (Apache Spark 2.2+)
> https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



[SS] Bug in StreamExecution? currentBatchId and getBatchDescriptionString for web UI

2017-09-09 Thread Jacek Laskowski
Hi,

While reviewing StreamExecution and how batches are displayed in web
UI, I've noticed that currentBatchId is -1 when StreamExecution is
created [1] and becomes 0 when no offsets are available [2].

That leads to my question about setting the job description for a
query using getBatchDescriptionString [3]. It branches per
currentBatchId and when it's -1 gives "init" [4] which never happens
as showed above.

That leads to the PR for SPARK-20464 "Add a job group and description
for streaming queries and fix cancellation of running jobs using the
job group" that sets the job description after populateStartOffsets
[5].

Shouldn't it be before populateStartOffsets so
getBatchDescriptionString has a chance of giving "init" and we see no
two 0s?

Help appreciated.

[1] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L116
[2] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L516
[3] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L878-L883
[4] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala?utf8=%E2%9C%93#L879
[5] 
https://github.com/apache/spark/commit/6fc6cf88d871f5b05b0ad1a504e0d6213cf9d331#diff-6532dd3b63bdab0364fbcf2303e290e4R294

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [SS] Writing a test for a possible bug in StateStoreSaveExec with Append output mode?

2017-09-04 Thread Jacek Laskowski
Hi,

I think I know where the issue surfaces. This is with groupBy
aggregation with Append output mode.

What should happen when a state expires for a event time (in groupBy)
with the new rows for the expired key in a streaming batch exactly
when watermark has been moved up and thus expired the state for the
key?

Example's coming up.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Sep 3, 2017 at 11:04 PM, Jacek Laskowski  wrote:
> Hi,
>
> I may have found a bug in StateStoreSaveExec with Append output mode
> and would love proving myself I'm wrong or help squashing it by
> writing a test for the case.
>
> Is there a test for StateStoreSaveExec with Append output mode? If
> not, is there a streaming test template that could be very close to a
> test and that I could use?
>
> Thanks for any help you may offer!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Spark Structured Streaming (Apache Spark 2.2+)
> https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



[SS] Writing a test for a possible bug in StateStoreSaveExec with Append output mode?

2017-09-03 Thread Jacek Laskowski
Hi,

I may have found a bug in StateStoreSaveExec with Append output mode
and would love proving myself I'm wrong or help squashing it by
writing a test for the case.

Is there a test for StateStoreSaveExec with Append output mode? If
not, is there a streaming test template that could be very close to a
test and that I could use?

Thanks for any help you may offer!

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Possible bug: inconsistent timestamp behavior

2017-08-15 Thread Maciej Szymkiewicz
These two are just not equivalent.

Spark SQL interprets long as seconds when casting between timestamps and
numerics, therefore
lit(148550335L).cast(org.apache.spark.sql.types.TimestampType)
represents 49043-09-23 21:26:400.0. This behavior is intended - see for
example https://issues.apache.org/jira/browse/SPARK-11724

java.sql.Timestamp expects milliseconds as an argument therefore lit(new
java.sql.Timestamp(148550335L)) represents 2017-01-27 08:49:10
.

On 15 August 2017 at 13:16, assaf.mendelson  wrote:

> Hi all,
>
> I encountered weird behavior for timestamp. It seems that when using lit
> to add it to column, the timestamp goes from milliseconds representation to
> seconds representation:
>
>
>
>
>
> scala> spark.range(1).withColumn("a", lit(new java.sql.Timestamp(
> 148550335L)).cast("long")).show()
>
> +---+--+
>
> | id| a|
>
> +---+--+
>
> |  0|1485503350|
>
> +---+--+
>
>
>
>
>
> scala> spark.range(1).withColumn("a", lit(148550335L).cast(org.
> apache.spark.sql.types.TimestampType).cast(org.apache.spark.sql.types.
> LongType)).show()
>
> +---+-----+
>
> | id|a|
>
> +---+-+
>
> |  0|148550335|
>
> +---+-+
>
>
>
>
>
> Is this a bug or am I missing something here?
>
>
>
> Thanks,
>
> Assaf
>
>
>
> --
> View this message in context: Possible bug: inconsistent timestamp
> behavior
> <http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-inconsistent-timestamp-behavior-tp22144.html>
> Sent from the Apache Spark Developers List mailing list archive
> <http://apache-spark-developers-list.1001551.n3.nabble.com/> at
> Nabble.com.
>



-- 

Z poważaniem,
Maciej Szymkiewicz


Possible bug: inconsistent timestamp behavior

2017-08-15 Thread assaf.mendelson
Hi all,
I encountered weird behavior for timestamp. It seems that when using lit to add 
it to column, the timestamp goes from milliseconds representation to seconds 
representation:


scala> spark.range(1).withColumn("a", lit(new 
java.sql.Timestamp(148550335L)).cast("long")).show()
+---+--+
| id| a|
+---+--+
|  0|1485503350|
+---+--+


scala> spark.range(1).withColumn("a", 
lit(148550335L).cast(org.apache.spark.sql.types.TimestampType).cast(org.apache.spark.sql.types.LongType)).show()
+---+-+
| id|a|
+---+-+
|  0|1485503350000|
+---+-+


Is this a bug or am I missing something here?

Thanks,
Assaf





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-inconsistent-timestamp-behavior-tp22144.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re: SQLListener concurrency bug?

2017-06-28 Thread Oleksandr Vayda
Oh, sorry, I was wrong. Concurrent collections in Scala are available since
2.8. Any objections against replacing mutable list and synchronized with a
concurrent collection like or based on TrieMap for instance?

On Wed, Jun 28, 2017, 16:05 Oleksandr Vayda 
wrote:

> Cool. I will be happy to create a PR. The simplest and most obvious
> solution that came to my mind was using Java concurrent collections instead
> of Scala mutable. Don't you mind to have this bit of Java inside Spark? :)
> Or perhaps we could use Scala concurrent collections, but they are only
> available in Scala 2.12 AFAIK.
>
> Alex
>
> On Tue, Jun 27, 2017, 05:42 Shixiong(Ryan) Zhu 
> wrote:
>
>> Right now they are safe because the caller also calls synchronized when
>> using them. This is to avoid copying objects. It's probably a bad design.
>> If you want to refactor them, PR is welcome.
>>
>> On Mon, Jun 26, 2017 at 2:27 AM, Oleksandr Vayda <
>> oleksandr.va...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Reading the source code of the org.apache.spark.sql.execution.ui.
>>> SQLListener, specifically this place -
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L328
>>>
>>> def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
>>> failedExecutions
>>> }
>>> def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
>>> completedExecutions
>>> }
>>> I believe the synchronized block is used here incorrectly. If I get it
>>> right the main purpose here is to synchronize access to the mutable
>>> collections from the UI (read) and the event bus (read/write) threads. But
>>> in the current implementation the "synchronized" blocks return bare
>>> references to mutable collections and in fact nothing gets synchronized.
>>> Is it a bug?
>>>
>>> Sincerely yours,
>>> Oleksandr Vayda
>>>
>>> mobile: +420 604 113 056 <+420%20604%20113%20056>
>>>
>>
>>


Re: SQLListener concurrency bug?

2017-06-28 Thread Oleksandr Vayda
Cool. I will be happy to create a PR. The simplest and most obvious
solution that came to my mind was using Java concurrent collections instead
of Scala mutable. Don't you mind to have this bit of Java inside Spark? :)
Or perhaps we could use Scala concurrent collections, but they are only
available in Scala 2.12 AFAIK.

Alex

On Tue, Jun 27, 2017, 05:42 Shixiong(Ryan) Zhu 
wrote:

> Right now they are safe because the caller also calls synchronized when
> using them. This is to avoid copying objects. It's probably a bad design.
> If you want to refactor them, PR is welcome.
>
> On Mon, Jun 26, 2017 at 2:27 AM, Oleksandr Vayda <
> oleksandr.va...@gmail.com> wrote:
>
>> Hi all,
>>
>> Reading the source code of the org.apache.spark.sql.execution.ui.
>> SQLListener, specifically this place -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L328
>>
>> def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
>> failedExecutions
>> }
>> def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
>> completedExecutions
>> }
>> I believe the synchronized block is used here incorrectly. If I get it
>> right the main purpose here is to synchronize access to the mutable
>> collections from the UI (read) and the event bus (read/write) threads. But
>> in the current implementation the "synchronized" blocks return bare
>> references to mutable collections and in fact nothing gets synchronized.
>> Is it a bug?
>>
>> Sincerely yours,
>> Oleksandr Vayda
>>
>> mobile: +420 604 113 056 <+420%20604%20113%20056>
>>
>
>


Re: SQLListener concurrency bug?

2017-06-26 Thread Shixiong(Ryan) Zhu
Right now they are safe because the caller also calls synchronized when
using them. This is to avoid copying objects. It's probably a bad design.
If you want to refactor them, PR is welcome.

On Mon, Jun 26, 2017 at 2:27 AM, Oleksandr Vayda 
wrote:

> Hi all,
>
> Reading the source code of the org.apache.spark.sql.execution.ui.
> SQLListener, specifically this place - https://github.com/apache/
> spark/blob/master/sql/core/src/main/scala/org/apache/
> spark/sql/execution/ui/SQLListener.scala#L328
>
> def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
> failedExecutions
> }
> def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
> completedExecutions
> }
> I believe the synchronized block is used here incorrectly. If I get it
> right the main purpose here is to synchronize access to the mutable
> collections from the UI (read) and the event bus (read/write) threads. But
> in the current implementation the "synchronized" blocks return bare
> references to mutable collections and in fact nothing gets synchronized.
> Is it a bug?
>
> Sincerely yours,
> Oleksandr Vayda
>
> mobile: +420 604 113 056 <+420%20604%20113%20056>
>


SQLListener concurrency bug?

2017-06-26 Thread Oleksandr Vayda
Hi all,

Reading the source code of the org.apache.spark.sql.execution.ui.SQLListener,
specifically this place -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L328

def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
failedExecutions
}
def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
completedExecutions
}
I believe the synchronized block is used here incorrectly. If I get it
right the main purpose here is to synchronize access to the mutable
collections from the UI (read) and the event bus (read/write) threads. But
in the current implementation the "synchronized" blocks return bare
references to mutable collections and in fact nothing gets synchronized.
Is it a bug?

Sincerely yours,
Oleksandr Vayda

mobile: +420 604 113 056


GraphFrames 0.5.0 - critical bug fix + other improvements

2017-05-19 Thread Joseph Bradley
Hi Spark community,

I'd like to announce a new release of GraphFrames, a Spark Package for
DataFrame-based graphs!

*We strongly encourage all users to use this latest release for the bug fix
described below.*

*Critical bug fix*
This release fixes a bug in indexing vertices.  This may have affected your
results if:
* your graph uses non-Integer IDs and
* you use ConnectedComponents and other algorithms which are wrappers
around GraphX.
The bug occurs when the input DataFrame is non-deterministic. E.g., running
an algorithm on a DataFrame just loaded from disk should be fine in
previous releases, but running that algorithm on a DataFrame produced using
shuffling, unions, and other operators can cause incorrect results. This
issue is fixed in this release.

*New features*
* Python API for aggregateMessages for building custom graph algorithms
* Scala API for parallel personalized PageRank, wrapping the GraphX
implementation. This is only available when using GraphFrames with Spark
2.1+.

Support for Spark 1.6, 2.0, and 2.1

*Special thanks to Felix Cheung for his work as a new committer for
GraphFrames!*

*Full release notes*:
https://github.com/graphframes/graphframes/releases/tag/release-0.5.0
*Docs*: http://graphframes.github.io/
*Spark Package*: https://spark-packages.org/package/graphframes/graphframes
*Source*: https://github.com/graphframes/graphframes

Thanks to all contributors and to the community for feedback!
Joseph

-- 

Joseph Bradley

Software Engineer - Machine Learning

Databricks, Inc.

[image: http://databricks.com] <http://databricks.com/>


Re: A DataFrame cache bug

2017-02-26 Thread Liang-Chi Hsieh


Hi Gen,

I submitted a PR to fix the issue of refreshByPath:
https://github.com/apache/spark/pull/17064

Thanks.



tgbaggio wrote
> Hi, The example that I provided is not very clear. And I add a more clear
> example in jira.
> 
> Thanks
> 
> Cheers
> Gen
> 
> On Wed, Feb 22, 2017 at 3:47 PM, gen tang <

> gen.tang86@

> > wrote:
> 
>> Hi Kazuaki Ishizaki
>>
>> Thanks a lot for your help. It works. However, a more strange bug appears
>> as follows:
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.SparkSession
>>
>> def f(path: String, spark: SparkSession): DataFrame = {
>>   val data = spark.read.option("mergeSchema", "true").parquet(path)
>>   println(data.count)
>>   val df = data.filter("id>10")
>>   df.cache
>>   println(df.count)
>>   val df1 = df.filter("id>11")
>>   df1.cache
>>   println(df1.count)
>>   df1
>> }
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> spark.catalog.refreshByPath(dir)
>> f(dir, spark).count // output 88 which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> spark.catalog.refreshByPath(dir)
>> f(dir, spark).count // output 88 which is incorrect
>>
>> If we move refreshByPath into f(), just before spark.read. The whole code
>> works fine.
>>
>> Any idea? Thanks a lot
>>
>> Cheers
>> Gen
>>
>>
>> On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki <

> ISHIZAKI@.ibm

> >
>> wrote:
>>
>>> Hi,
>>> Thank you for pointing out the JIRA.
>>> I think that this JIRA suggests you to insert
>>> "spark.catalog.refreshByPath(dir)".
>>>
>>> val dir = "/tmp/test"
>>> spark.range(100).write.mode("overwrite").parquet(dir)
>>> val df = spark.read.parquet(dir)
>>> df.count // output 100 which is correct
>>> f(df).count // output 89 which is correct
>>>
>>> spark.range(1000).write.mode("overwrite").parquet(dir)
>>> spark.catalog.refreshByPath(dir)  // insert a NEW statement
>>> val df1 = spark.read.parquet(dir)
>>> df1.count // output 1000 which is correct, in fact other operation
>>> expect
>>> df1.filter("id>10") return correct result.
>>> f(df1).count // output 89 which is incorrect
>>>
>>> Regards,
>>> Kazuaki Ishizaki
>>>
>>>
>>>
>>> From:gen tang <

> gen.tang86@

> >
>>> To:

> dev@.apache

>>> Date:2017/02/22 15:02
>>> Subject:Re: A DataFrame cache bug
>>> --
>>>
>>>
>>>
>>> Hi All,
>>>
>>> I might find a related issue on jira:
>>>
>>> *https://issues.apache.org/jira/browse/SPARK-15678*
>>> <https://issues.apache.org/jira/browse/SPARK-15678>;
>>>
>>> This issue is closed, may be we should reopen it.
>>>
>>> Thanks
>>>
>>> Cheers
>>> Gen
>>>
>>>
>>> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <*

> gen.tang86@

> *
>>> <

> gen.tang86@

> >> wrote:
>>> Hi All,
>>>
>>> I found a strange bug which is related with reading data from a updated
>>> path and cache operation.
>>> Please consider the following code:
>>>
>>> import org.apache.spark.sql.DataFrame
>>>
>>> def f(data: DataFrame): DataFrame = {
>>>   val df = data.filter("id>10")
>>>   df.cache
>>>   df.count
>>>   df
>>> }
>>>
>>> f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is
>>> correct
>>> f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which
>>> is correct
>>>
>>> val dir = "/tmp/test"
>>> spark.range(100).write.mode("overwrite").parquet(dir)
>>> val df = spark.read.parquet(dir)
>>> df.count // output 100 which is correct
>>> f(df).count // output 89 which is correct
>>>
>>> spark.range(1000).write.mode("overwrite").parquet(dir)
>>> val df1 = spark.read.parquet(dir)
>>> df1.count // output 1000 which is correct, in fact other operation
>>> expect
>>> df1.filter("id>10") return correct result.
>>> f(df1).count // output 89 which is incorrect
>>>
>>> In fact when we use df1.filter("id>10"), spark will however use old
>>> cached dataFrame
>>>
>>> Any idea? Thanks a lot
>>>
>>> Cheers
>>> Gen
>>>
>>>
>>>
>>





-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/A-DataFrame-cache-bug-tp21044p21082.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: A DataFrame cache bug

2017-02-22 Thread gen tang
Hi, The example that I provided is not very clear. And I add a more clear
example in jira.

Thanks

Cheers
Gen

On Wed, Feb 22, 2017 at 3:47 PM, gen tang  wrote:

> Hi Kazuaki Ishizaki
>
> Thanks a lot for your help. It works. However, a more strange bug appears
> as follows:
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.SparkSession
>
> def f(path: String, spark: SparkSession): DataFrame = {
>   val data = spark.read.option("mergeSchema", "true").parquet(path)
>   println(data.count)
>   val df = data.filter("id>10")
>   df.cache
>   println(df.count)
>   val df1 = df.filter("id>11")
>   df1.cache
>   println(df1.count)
>   df1
> }
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)
> f(dir, spark).count // output 88 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)
> f(dir, spark).count // output 88 which is incorrect
>
> If we move refreshByPath into f(), just before spark.read. The whole code
> works fine.
>
> Any idea? Thanks a lot
>
> Cheers
> Gen
>
>
> On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki 
> wrote:
>
>> Hi,
>> Thank you for pointing out the JIRA.
>> I think that this JIRA suggests you to insert
>> "spark.catalog.refreshByPath(dir)".
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> val df = spark.read.parquet(dir)
>> df.count // output 100 which is correct
>> f(df).count // output 89 which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> spark.catalog.refreshByPath(dir)  // insert a NEW statement
>> val df1 = spark.read.parquet(dir)
>> df1.count // output 1000 which is correct, in fact other operation expect
>> df1.filter("id>10") return correct result.
>> f(df1).count // output 89 which is incorrect
>>
>> Regards,
>> Kazuaki Ishizaki
>>
>>
>>
>> From:gen tang 
>> To:dev@spark.apache.org
>> Date:2017/02/22 15:02
>> Subject:Re: A DataFrame cache bug
>> --
>>
>>
>>
>> Hi All,
>>
>> I might find a related issue on jira:
>>
>> *https://issues.apache.org/jira/browse/SPARK-15678*
>> <https://issues.apache.org/jira/browse/SPARK-15678>
>>
>> This issue is closed, may be we should reopen it.
>>
>> Thanks
>>
>> Cheers
>> Gen
>>
>>
>> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <*gen.tan...@gmail.com*
>> > wrote:
>> Hi All,
>>
>> I found a strange bug which is related with reading data from a updated
>> path and cache operation.
>> Please consider the following code:
>>
>> import org.apache.spark.sql.DataFrame
>>
>> def f(data: DataFrame): DataFrame = {
>>   val df = data.filter("id>10")
>>   df.cache
>>   df.count
>>   df
>> }
>>
>> f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is
>> correct
>> f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which
>> is correct
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> val df = spark.read.parquet(dir)
>> df.count // output 100 which is correct
>> f(df).count // output 89 which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> val df1 = spark.read.parquet(dir)
>> df1.count // output 1000 which is correct, in fact other operation expect
>> df1.filter("id>10") return correct result.
>> f(df1).count // output 89 which is incorrect
>>
>> In fact when we use df1.filter("id>10"), spark will however use old
>> cached dataFrame
>>
>> Any idea? Thanks a lot
>>
>> Cheers
>> Gen
>>
>>
>>
>


Re: A DataFrame cache bug

2017-02-21 Thread gen tang
Hi Kazuaki Ishizaki

Thanks a lot for your help. It works. However, a more strange bug appears
as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is incorrect

If we move refreshByPath into f(), just before spark.read. The whole code
works fine.

Any idea? Thanks a lot

Cheers
Gen


On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki 
wrote:

> Hi,
> Thank you for pointing out the JIRA.
> I think that this JIRA suggests you to insert "spark.catalog.refreshByPath(
> dir)".
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)  // insert a NEW statement
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct, in fact other operation expect
> df1.filter("id>10") return correct result.
> f(df1).count // output 89 which is incorrect
>
> Regards,
> Kazuaki Ishizaki
>
>
>
> From:gen tang 
> To:dev@spark.apache.org
> Date:2017/02/22 15:02
> Subject:Re: A DataFrame cache bug
> --
>
>
>
> Hi All,
>
> I might find a related issue on jira:
>
> *https://issues.apache.org/jira/browse/SPARK-15678*
> <https://issues.apache.org/jira/browse/SPARK-15678>
>
> This issue is closed, may be we should reopen it.
>
> Thanks
>
> Cheers
> Gen
>
>
> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <*gen.tan...@gmail.com*
> > wrote:
> Hi All,
>
> I found a strange bug which is related with reading data from a updated
> path and cache operation.
> Please consider the following code:
>
> import org.apache.spark.sql.DataFrame
>
> def f(data: DataFrame): DataFrame = {
>   val df = data.filter("id>10")
>   df.cache
>   df.count
>   df
> }
>
> f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is
> correct
> f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which is
> correct
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct, in fact other operation expect
> df1.filter("id>10") return correct result.
> f(df1).count // output 89 which is incorrect
>
> In fact when we use df1.filter("id>10"), spark will however use old cached
> dataFrame
>
> Any idea? Thanks a lot
>
> Cheers
> Gen
>
>
>


Re: A DataFrame cache bug

2017-02-21 Thread Kazuaki Ishizaki
Hi,
Thank you for pointing out the JIRA.
I think that this JIRA suggests you to insert 
"spark.catalog.refreshByPath(dir)".

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)  // insert a NEW statement
val df1 = spark.read.parquet(dir)
df1.count // output 1000 which is correct, in fact other operation expect 
df1.filter("id>10") return correct result.
f(df1).count // output 89 which is incorrect

Regards,
Kazuaki Ishizaki



From:   gen tang 
To: dev@spark.apache.org
Date:   2017/02/22 15:02
Subject:Re: A DataFrame cache bug



Hi All,

I might find a related issue on jira:

https://issues.apache.org/jira/browse/SPARK-15678

This issue is closed, may be we should reopen it.

Thanks 

Cheers
Gen


On Wed, Feb 22, 2017 at 1:57 PM, gen tang  wrote:
Hi All,

I found a strange bug which is related with reading data from a updated 
path and cache operation.
Please consider the following code:

import org.apache.spark.sql.DataFrame

def f(data: DataFrame): DataFrame = {
  val df = data.filter("id>10")
  df.cache
  df.count
  df
}

f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is 
correct
f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which is 
correct

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
val df1 = spark.read.parquet(dir)
df1.count // output 1000 which is correct, in fact other operation expect 
df1.filter("id>10") return correct result.
f(df1).count // output 89 which is incorrect

In fact when we use df1.filter("id>10"), spark will however use old cached 
dataFrame

Any idea? Thanks a lot

Cheers
Gen





Re: A DataFrame cache bug

2017-02-21 Thread gen tang
Hi All,

I might find a related issue on jira:

https://issues.apache.org/jira/browse/SPARK-15678

This issue is closed, may be we should reopen it.

Thanks

Cheers
Gen


On Wed, Feb 22, 2017 at 1:57 PM, gen tang  wrote:

> Hi All,
>
> I found a strange bug which is related with reading data from a updated
> path and cache operation.
> Please consider the following code:
>
> import org.apache.spark.sql.DataFrame
>
> def f(data: DataFrame): DataFrame = {
>   val df = data.filter("id>10")
>   df.cache
>   df.count
>   df
> }
>
> f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is
> correct
> f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which is
> correct
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct, in fact other operation expect
> df1.filter("id>10") return correct result.
> f(df1).count // output 89 which is incorrect
>
> In fact when we use df1.filter("id>10"), spark will however use old cached
> dataFrame
>
> Any idea? Thanks a lot
>
> Cheers
> Gen
>


A DataFrame cache bug

2017-02-21 Thread gen tang
Hi All,

I found a strange bug which is related with reading data from a updated
path and cache operation.
Please consider the following code:

import org.apache.spark.sql.DataFrame

def f(data: DataFrame): DataFrame = {
  val df = data.filter("id>10")
  df.cache
  df.count
  df
}

f(spark.range(100).asInstanceOf[DataFrame]).count // output 89 which is
correct
f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which is
correct

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
val df1 = spark.read.parquet(dir)
df1.count // output 1000 which is correct, in fact other operation expect
df1.filter("id>10") return correct result.
f(df1).count // output 89 which is incorrect

In fact when we use df1.filter("id>10"), spark will however use old cached
dataFrame

Any idea? Thanks a lot

Cheers
Gen


Simple bug fix PR looking for love

2017-02-09 Thread Michael Allman
Hi Guys,

Can someone help move https://github.com/apache/spark/pull/16499 
 along in the review process? This 
PR fixes replicated off-heap storage.

Thanks!

Michael

Re: Possible bug - Java iterator/iterable inconsistency

2017-01-19 Thread Asher Krim
Thanks Sean!

On Thu, Jan 19, 2017 at 6:09 AM, Sean Owen  wrote:

> Yes, confirmed that fixing it unfortunately causes trouble in Java 8. See
> https://issues.apache.org/jira/browse/SPARK-19287 for further discussion.
>
> On Wed, Jan 18, 2017 at 9:00 PM Sean Owen  wrote:
>
>> Hm. Unless I am also totally missing or forgetting something, I think
>> you're right. The equivalent in PairRDDFunctions.scala operations on a
>> function from T to TraversableOnce[U] and a TraversableOnce is most like
>> java.util.Iterator.
>>
>> You can work around it by wrapping it in a faked IteratorIterable.
>>
>> I think this is fixable in the API by deprecating this method and adding
>> a new one that takes a FlatMapFunction. We'd have to triple-check in a test
>> that this doesn't cause an API compatibility problem with respect to Java 8
>> lambdas, but if that's settled, I think this could be fixed without
>> breaking the API.
>>
>> On Wed, Jan 18, 2017 at 8:50 PM Asher Krim  wrote:
>>
>> In Spark 2 + Java + RDD api, the use of iterables was replaced with
>> iterators. I just encountered an inconsistency in `flatMapValues` that may
>> be a bug:
>>
>> `flatMapValues` (https://github.com/apache/spark/blob/master/core/src/
>> main/scala/org/apache/spark/api/java/JavaPairRDD.scala#L677) takes
>> a `FlatMapFunction` (https://github.com/apache/spark/blob/
>> 39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/java/
>> org/apache/spark/api/java/function/FlatMapFunction.java)
>>
>> The problem is that `FlatMapFunction` was changed to return an iterator,
>> but `rdd.flatMapValues` still expects an iterable. Am I using these
>> constructs correctly? Is there a workaround other than converting the
>> iterator to an iterable outside of the function?
>>
>> Thanks,
>> --
>> Asher Krim
>> Senior Software Engineer
>>
>>


-- 
Asher Krim
Senior Software Engineer


Re: Possible bug - Java iterator/iterable inconsistency

2017-01-19 Thread Sean Owen
Yes, confirmed that fixing it unfortunately causes trouble in Java 8. See
https://issues.apache.org/jira/browse/SPARK-19287 for further discussion.

On Wed, Jan 18, 2017 at 9:00 PM Sean Owen  wrote:

> Hm. Unless I am also totally missing or forgetting something, I think
> you're right. The equivalent in PairRDDFunctions.scala operations on a
> function from T to TraversableOnce[U] and a TraversableOnce is most like
> java.util.Iterator.
>
> You can work around it by wrapping it in a faked IteratorIterable.
>
> I think this is fixable in the API by deprecating this method and adding a
> new one that takes a FlatMapFunction. We'd have to triple-check in a test
> that this doesn't cause an API compatibility problem with respect to Java 8
> lambdas, but if that's settled, I think this could be fixed without
> breaking the API.
>
> On Wed, Jan 18, 2017 at 8:50 PM Asher Krim  wrote:
>
> In Spark 2 + Java + RDD api, the use of iterables was replaced with
> iterators. I just encountered an inconsistency in `flatMapValues` that may
> be a bug:
>
> `flatMapValues` (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala#L677)
> takes a `FlatMapFunction` (
> https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
> )
>
> The problem is that `FlatMapFunction` was changed to return an iterator,
> but `rdd.flatMapValues` still expects an iterable. Am I using these
> constructs correctly? Is there a workaround other than converting the
> iterator to an iterable outside of the function?
>
> Thanks,
> --
> Asher Krim
> Senior Software Engineer
>
>


Re: Possible bug - Java iterator/iterable inconsistency

2017-01-18 Thread Sean Owen
Hm. Unless I am also totally missing or forgetting something, I think
you're right. The equivalent in PairRDDFunctions.scala operations on a
function from T to TraversableOnce[U] and a TraversableOnce is most like
java.util.Iterator.

You can work around it by wrapping it in a faked IteratorIterable.

I think this is fixable in the API by deprecating this method and adding a
new one that takes a FlatMapFunction. We'd have to triple-check in a test
that this doesn't cause an API compatibility problem with respect to Java 8
lambdas, but if that's settled, I think this could be fixed without
breaking the API.

On Wed, Jan 18, 2017 at 8:50 PM Asher Krim  wrote:

> In Spark 2 + Java + RDD api, the use of iterables was replaced with
> iterators. I just encountered an inconsistency in `flatMapValues` that may
> be a bug:
>
> `flatMapValues` (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala#L677)
> takes a `FlatMapFunction` (
> https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
> )
>
> The problem is that `FlatMapFunction` was changed to return an iterator,
> but `rdd.flatMapValues` still expects an iterable. Am I using these
> constructs correctly? Is there a workaround other than converting the
> iterator to an iterable outside of the function?
>
> Thanks,
> --
> Asher Krim
> Senior Software Engineer
>


Possible bug - Java iterator/iterable inconsistency

2017-01-18 Thread Asher Krim
In Spark 2 + Java + RDD api, the use of iterables was replaced with
iterators. I just encountered an inconsistency in `flatMapValues` that may
be a bug:

`flatMapValues` (https://github.com/apache/spark/blob/master/core/src/
main/scala/org/apache/spark/api/java/JavaPairRDD.scala#L677) takes
a `FlatMapFunction` (
https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
)

The problem is that `FlatMapFunction` was changed to return an iterator,
but `rdd.flatMapValues` still expects an iterable. Am I using these
constructs correctly? Is there a workaround other than converting the
iterator to an iterable outside of the function?

Thanks,
-- 
Asher Krim
Senior Software Engineer


Re: DataFrame Distinct Sample Bug?

2017-01-03 Thread Reynold Xin
I get the same result every time on Spark 2.1:


Using Python version 2.7.12 (default, Jul  2 2016 17:43:17)
SparkSession available as 'spark'.
>>> from pyspark.sql import functions as F
>>>
>>> d = sqlContext.createDataFrame(sc.parallelize([[x] for x in
range(10)]),
... ['t'])
>>> sampled = d.distinct().sample(False, 0.01, 478)
>>> print sampled.select(F.min('t').alias('t')).collect()
[Row(t=4)]

>>> print sampled.select(F.min('t').alias('t')).collect()
[Row(t=4)]
>>> print sampled.select(F.min('t').alias('t')).collect()
[Row(t=4)]


On Wed, Jan 4, 2017 at 8:15 AM, dstuck  wrote:

> I ran into an issue where I'm getting unstable results after sampling a
> dataframe that has had the distinct function called on it. The following
> code should print different answers each time.
>
> from pyspark.sql import functions as F
> d = sqlContext.createDataFrame(sc.parallelize([[x] for x in
> range(10)]),
> ['t'])
> sampled = d.distinct().sample(False, 0.01, 478)
> print sampled.select(F.min('t').alias('t')).collect()
> print sampled.select(F.min('t').alias('t')).collect()
> print sampled.select(F.min('t').alias('t')).collect()
>
> Removing distinct and caching after sampling fix the problem (as does using
> a smaller dataframe). The spark bug reporting docs dissuaded me from
> creating a JIRA issue without checking with this mailing list that this is
> reproducible.
>
> I'm not familiar enough with the spark code to fix this :\
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/DataFrame-Distinct-
> Sample-Bug-tp20439.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


DataFrame Distinct Sample Bug?

2017-01-03 Thread dstuck
I ran into an issue where I'm getting unstable results after sampling a
dataframe that has had the distinct function called on it. The following
code should print different answers each time.

from pyspark.sql import functions as F
d = sqlContext.createDataFrame(sc.parallelize([[x] for x in range(10)]),
['t'])
sampled = d.distinct().sample(False, 0.01, 478)
print sampled.select(F.min('t').alias('t')).collect()
print sampled.select(F.min('t').alias('t')).collect()
print sampled.select(F.min('t').alias('t')).collect()

Removing distinct and caching after sampling fix the problem (as does using
a smaller dataframe). The spark bug reporting docs dissuaded me from
creating a JIRA issue without checking with this mailing list that this is
reproducible.

I'm not familiar enough with the spark code to fix this :\



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrame-Distinct-Sample-Bug-tp20439.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Diffing execution plans to understand an optimizer bug

2016-11-08 Thread Herman van Hövell tot Westerflier
Replied in the ticket.

On Tue, Nov 8, 2016 at 11:36 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> SPARK-18367 <https://issues.apache.org/jira/browse/SPARK-18367>: limit()
> makes the lame walk again
>
> On Tue, Nov 8, 2016 at 5:00 PM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Hmm, it doesn’t seem like I can access the output of
>> df._jdf.queryExecution().hiveResultString() from Python, and until I can
>> boil the issue down a bit, I’m stuck with using Python.
>>
>> I’ll have a go at using regexes to strip some stuff from the printed
>> plans. The one that’s working for me to strip the IDs is #\d+L?.
>>
>> Nick
>> ​
>>
>> On Tue, Nov 8, 2016 at 4:47 PM Reynold Xin  wrote:
>>
>> If you want to peek into the internals and do crazy things, it is much
>> easier to do it in Scala with df.queryExecution.
>>
>> For explain string output, you can work around the comparison simply by
>> doing replaceAll("#\\d+", "#x")
>>
>> similar to the patch here: https://github.com/apache/spark/commit/
>> fd90541c35af2bccf0155467bec8cea7c8865046#diff-
>> 432455394ca50800d5de508861984ca5R217
>>
>>
>>
>> On Tue, Nov 8, 2016 at 1:42 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>> I’m trying to understand what I think is an optimizer bug. To do that,
>> I’d like to compare the execution plans for a certain query with and
>> without a certain change, to understand how that change is impacting the
>> plan.
>>
>> How would I do that in PySpark? I’m working with 2.0.1, but I can use
>> master if it helps.
>>
>> explain()
>> <http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain>
>> is helpful but is limited in two important ways:
>>
>>1. It prints to screen and doesn’t offer another way to access the
>>plan or capture it.
>>2.
>>
>>The printed plan includes auto-generated IDs that make diffing
>>impossible. e.g.
>>
>> == Physical Plan ==
>> *Project [struct(primary_key#722, person#550, dataset_name#671)
>>
>>
>> Any suggestions on what to do? Any relevant JIRAs I should follow?
>>
>> Nick
>> ​
>>
>>
>>


Re: Diffing execution plans to understand an optimizer bug

2016-11-08 Thread Nicholas Chammas
SPARK-18367 <https://issues.apache.org/jira/browse/SPARK-18367>: limit()
makes the lame walk again

On Tue, Nov 8, 2016 at 5:00 PM Nicholas Chammas 
wrote:

> Hmm, it doesn’t seem like I can access the output of
> df._jdf.queryExecution().hiveResultString() from Python, and until I can
> boil the issue down a bit, I’m stuck with using Python.
>
> I’ll have a go at using regexes to strip some stuff from the printed
> plans. The one that’s working for me to strip the IDs is #\d+L?.
>
> Nick
> ​
>
> On Tue, Nov 8, 2016 at 4:47 PM Reynold Xin  wrote:
>
> If you want to peek into the internals and do crazy things, it is much
> easier to do it in Scala with df.queryExecution.
>
> For explain string output, you can work around the comparison simply by
> doing replaceAll("#\\d+", "#x")
>
> similar to the patch here:
> https://github.com/apache/spark/commit/fd90541c35af2bccf0155467bec8cea7c8865046#diff-432455394ca50800d5de508861984ca5R217
>
>
>
> On Tue, Nov 8, 2016 at 1:42 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
> I’m trying to understand what I think is an optimizer bug. To do that, I’d
> like to compare the execution plans for a certain query with and without a
> certain change, to understand how that change is impacting the plan.
>
> How would I do that in PySpark? I’m working with 2.0.1, but I can use
> master if it helps.
>
> explain()
> <http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain>
> is helpful but is limited in two important ways:
>
>1. It prints to screen and doesn’t offer another way to access the
>plan or capture it.
>2.
>
>The printed plan includes auto-generated IDs that make diffing
>impossible. e.g.
>
> == Physical Plan ==
> *Project [struct(primary_key#722, person#550, dataset_name#671)
>
>
> Any suggestions on what to do? Any relevant JIRAs I should follow?
>
> Nick
> ​
>
>
>


Re: Diffing execution plans to understand an optimizer bug

2016-11-08 Thread Nicholas Chammas
Hmm, it doesn’t seem like I can access the output of
df._jdf.queryExecution().hiveResultString() from Python, and until I can
boil the issue down a bit, I’m stuck with using Python.

I’ll have a go at using regexes to strip some stuff from the printed plans.
The one that’s working for me to strip the IDs is #\d+L?.

Nick
​

On Tue, Nov 8, 2016 at 4:47 PM Reynold Xin  wrote:

> If you want to peek into the internals and do crazy things, it is much
> easier to do it in Scala with df.queryExecution.
>
> For explain string output, you can work around the comparison simply by
> doing replaceAll("#\\d+", "#x")
>
> similar to the patch here:
> https://github.com/apache/spark/commit/fd90541c35af2bccf0155467bec8cea7c8865046#diff-432455394ca50800d5de508861984ca5R217
>
>
>
> On Tue, Nov 8, 2016 at 1:42 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
> I’m trying to understand what I think is an optimizer bug. To do that, I’d
> like to compare the execution plans for a certain query with and without a
> certain change, to understand how that change is impacting the plan.
>
> How would I do that in PySpark? I’m working with 2.0.1, but I can use
> master if it helps.
>
> explain()
> <http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain>
> is helpful but is limited in two important ways:
>
>1. It prints to screen and doesn’t offer another way to access the
>plan or capture it.
>2.
>
>The printed plan includes auto-generated IDs that make diffing
>impossible. e.g.
>
> == Physical Plan ==
> *Project [struct(primary_key#722, person#550, dataset_name#671)
>
>
> Any suggestions on what to do? Any relevant JIRAs I should follow?
>
> Nick
> ​
>
>
>


Re: Diffing execution plans to understand an optimizer bug

2016-11-08 Thread Reynold Xin
If you want to peek into the internals and do crazy things, it is much
easier to do it in Scala with df.queryExecution.

For explain string output, you can work around the comparison simply by
doing replaceAll("#\\d+", "#x")

similar to the patch here:
https://github.com/apache/spark/commit/fd90541c35af2bccf0155467bec8cea7c8865046#diff-432455394ca50800d5de508861984ca5R217
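
For reference, a minimal Scala sketch of that approach, assuming Spark 2.0.x
and an existing DataFrame `df`; the ID-stripping regex follows the suggestions
in this thread:

import org.apache.spark.sql.DataFrame

// Capture the parsed/analyzed/optimized/physical plans as one string and
// normalise the auto-generated expression IDs (#722, #550L, ...) so that
// two plans can be diffed.
def normalizedPlan(df: DataFrame): String =
  df.queryExecution.toString.replaceAll("#\\d+L?", "#x")

// e.g. compute normalizedPlan(...) for the query with and without the
// suspect change, then diff the two strings.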



On Tue, Nov 8, 2016 at 1:42 PM, Nicholas Chammas  wrote:

> I’m trying to understand what I think is an optimizer bug. To do that, I’d
> like to compare the execution plans for a certain query with and without a
> certain change, to understand how that change is impacting the plan.
>
> How would I do that in PySpark? I’m working with 2.0.1, but I can use
> master if it helps.
>
> explain()
> <http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain>
> is helpful but is limited in two important ways:
>
>1. It prints to screen and doesn’t offer another way to access the
>plan or capture it.
>2.
>
>The printed plan includes auto-generated IDs that make diffing
>impossible. e.g.
>
> == Physical Plan ==
> *Project [struct(primary_key#722, person#550, dataset_name#671)
>
>
> Any suggestions on what to do? Any relevant JIRAs I should follow?
>
> Nick
> ​
>


Diffing execution plans to understand an optimizer bug

2016-11-08 Thread Nicholas Chammas
I’m trying to understand what I think is an optimizer bug. To do that, I’d
like to compare the execution plans for a certain query with and without a
certain change, to understand how that change is impacting the plan.

How would I do that in PySpark? I’m working with 2.0.1, but I can use
master if it helps.

explain()
<http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain>
is helpful but is limited in two important ways:

   1. It prints to screen and doesn’t offer another way to access the plan
   or capture it.
   2. The printed plan includes auto-generated IDs that make diffing
   impossible. e.g.

== Physical Plan ==
*Project [struct(primary_key#722, person#550, dataset_name#671)


Any suggestions on what to do? Any relevant JIRAs I should follow?

Nick
​


Re: Ran in to a bug in Broadcast Hash Join

2016-10-22 Thread Michael Armbrust
2.0.0 or 2.0.1?  There are several correctness fixes in the latter.

On Oct 22, 2016 2:14 PM, "Jeremy Davis"  wrote:

>
> Hello, I ran into a bug with Broadcast Hash Join in Spark 2.0. (Running
> on EMR)
> If I just toggle spark.sql.autoBroadcastJoinThreshold=-1 then the join
> works, if I leave it as default it does not work.
> When it doesn’t work, then one of my joined columns is filled with very
> small Doubles.
>
> I’m joining two small tables: (datetime,spx) and (datetime,vix)
> Attached are the plans and debug.
>
>
> 
>
> The (Default) Broken case:
>
>
> +-+---+
> | datetime|spx|
> +-+---+
> |147690720|2144.290039|
> |147682080|2139.600098|
> |147673440| 2126.5|
> |147647520| 2132.97998|
> |147638880|2132.550049|
> |147630240|2139.179932|
> |147621600| 2136.72998|
> |147612960|2163.659912|
> |147587040| 2153.73999|
> |147578400| 2160.77002|
> |147569760| 2159.72998|
> |147561120| 2150.48999|
> |147552480|2161.199951|
> |147526560| 2168.27002|
> |147517920|2151.129883|
> |147509280|2171.370117|
> |147500640|2159.929932|
> |147492000|2146.100098|
> |147466080|2164.689941|
> |147457440|2177.179932|
> +-+---+
> only showing top 20 rows
>
> +-+-+
> | datetime|  vix|
> +-+-+
> |147690720|14.41|
> |147682080|15.28|
> |147673440|16.20|
> |147647520|16.120001|
> |147638880|16.690001|
> |147630240|15.91|
> |147621600|15.36|
> |147612960|13.38|
> |147587040|13.48|
> |147578400|12.84|
> |147569760|12.99|
> |147561120|13.63|
> |147552480|13.57|
> |147526560|13.29|
> |147517920|14.02|
> |147509280|12.39|
> |147500640| 13.1|
> |147492000| 14.5|
> |147466080|12.29|
> |147457440|12.02|
> +-+-+
> only showing top 20 rows
>
> 2016-10-22T20:50:31.345+: [GC (Allocation Failure) [PSYoungGen: 
> 704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times: 
> user=0.29 sys=0.04, real=0.03 secs]
> == Physical Plan ==
> *Project [datetime#34L, spx#25, vix#72]
> +- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
>:- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as 
> decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as 
> decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as 
> decimal(20,0)) as int null else UDF(cast(cast(substring(date#0, 1, 4) as 
> decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) 
> as int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS 
> datetime#34L, cast(Close#4 as double) AS spx#25]
>:  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if 
> (((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || 
> isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || 
> isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int null 
> else UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), 
> cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), 
> cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int
>: +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: 
> s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), 
> Not(EqualTo(Date,Date))], ReadSchema: struct
>+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> true]))
>   +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as 
> decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as 
> decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as 
> decimal(20,0)) as int null else UDF(cast(cast(substring(date#47, 1, 4) as 
> decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) 
> as int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS 
> datetime#81L, cast(Close#51 as double) AS vix#72]
>  +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && 
> isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) 
> as int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as 
> int))) || isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as 
> int null else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as 
> int), cast(cast(substrin

Ran in to a bug in Broadcast Hash Join

2016-10-22 Thread Jeremy Davis

Hello, I ran into a bug with Broadcast Hash Join in Spark 2.0. (Running on EMR)
If I just toggle spark.sql.autoBroadcastJoinThreshold=-1 then the join works, 
if I leave it as default it does not work.
When it doesn’t work, then one of my joined columns is filled with very small 
Doubles.

I’m joining two small tables: (datetime,spx) and (datetime,vix)
Attached are the plans and debug.
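
A minimal sketch of the workaround toggle mentioned above, assuming a
SparkSession named `spark` (the same setting can also be passed at submit time
via --conf spark.sql.autoBroadcastJoinThreshold=-1):

// Disable automatic broadcast hash joins so the join falls back to a
// sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)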


The (Default) Broken case:

+-+---+
| datetime|spx|
+-+---+
|147690720|2144.290039|
|147682080|2139.600098|
|147673440| 2126.5|
|147647520| 2132.97998|
|147638880|2132.550049|
|147630240|2139.179932|
|147621600| 2136.72998|
|147612960|2163.659912|
|147587040| 2153.73999|
|147578400| 2160.77002|
|147569760| 2159.72998|
|147561120| 2150.48999|
|147552480|2161.199951|
|147526560| 2168.27002|
|147517920|2151.129883|
|147509280|2171.370117|
|147500640|2159.929932|
|147492000|2146.100098|
|147466080|2164.689941|
|147457440|2177.179932|
+-+---+
only showing top 20 rows

+-+-+
| datetime|  vix|
+-+-+
|147690720|14.41|
|147682080|15.28|
|147673440|16.20|
|147647520|16.120001|
|147638880|16.690001|
|147630240|15.91|
|147621600|15.36|
|147612960|13.38|
|147587040|13.48|
|147578400|12.84|
|147569760|12.99|
|147561120|13.63|
|147552480|13.57|
|147526560|13.29|
|147517920|14.02|
|147509280|12.39|
|147500640| 13.1|
|147492000| 14.5|
|147466080|12.29|
|147457440|12.02|
+-+-+
only showing top 20 rows

2016-10-22T20:50:31.345+: [GC (Allocation Failure) [PSYoungGen: 
704134K->79382K(945664K)] 823872K->199145K(3089408K), 0.0285894 secs] [Times: 
user=0.29 sys=0.04, real=0.03 secs]
== Physical Plan ==
*Project [datetime#34L, spx#25, vix#72]
+- *BroadcastHashJoin [datetime#34L], [datetime#81L], Inner, BuildRight
   :- *Project [if (((isnull(cast(cast(substring(date#0, 1, 4) as 
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#0, 6, 2) as 
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#0, 9, 2) as 
decimal(20,0)) as int null else UDF(cast(cast(substring(date#0, 1, 4) as 
decimal(20,0)) as int), cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as 
int), cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int)) AS 
datetime#34L, cast(Close#4 as double) AS spx#25]
   :  +- *Filter ((isnotnull(Date#0) && NOT (Date#0 = Date)) && isnotnull(if 
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || 
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || 
isnull(cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int null else 
UDF(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int), 
cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int), 
cast(cast(substring(date#0, 9, 2) as decimal(20,0)) as int
   : +- *Scan csv [Date#0,Close#4] Format: CSV, InputPaths: 
s3n://dataproc-data/data/index/spx, PushedFilters: [IsNotNull(Date), 
Not(EqualTo(Date,Date))], ReadSchema: struct
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
true]))
  +- *Project [if (((isnull(cast(cast(substring(date#47, 1, 4) as 
decimal(20,0)) as int)) || isnull(cast(cast(substring(date#47, 6, 2) as 
decimal(20,0)) as int))) || isnull(cast(cast(substring(date#47, 9, 2) as 
decimal(20,0)) as int null else UDF(cast(cast(substring(date#47, 1, 4) as 
decimal(20,0)) as int), cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as 
int), cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int)) AS 
datetime#81L, cast(Close#51 as double) AS vix#72]
 +- *Filter ((isnotnull(Date#47) && NOT (Date#47 = Date)) && 
isnotnull(if (((isnull(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as 
int)) || isnull(cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int))) 
|| isnull(cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int null 
else UDF(cast(cast(substring(date#47, 1, 4) as decimal(20,0)) as int), 
cast(cast(substring(date#47, 6, 2) as decimal(20,0)) as int), 
cast(cast(substring(date#47, 9, 2) as decimal(20,0)) as int
+- *Scan csv [Date#47,Close#51] Format: CSV, InputPaths: 
s3n://dataproc-data/data/index/vix, PushedFilters: [IsNotNull(Date), 
Not(EqualTo(Date,Date))], ReadSchema: struct

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List('datetime))
:- Project [datetime#34L, spx#25]
:  +- Project [Date#0, Open#1, High#2, Low#3, spx#25, Volume#5, Adj Close#6, if 
(((isnull(cast(cast(substring(date#0, 1, 4) as decimal(20,0)) as int)) || 
isnull(cast(cast(substring(date#0, 6, 2) as decimal(20,0)) as int))) || 
isnull(cast(cast(

Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-22 Thread Xiang Gao
Yes, I mean local here. Thanks for pointing this out. Also thanks for
explaining the problem.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972p19011.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-21 Thread Jakob Odersky
Hi Xiang,

this error also appears in client mode (maybe the situation that you
were referring to and that worked was local mode?), however the error
is expected and is not a bug.

this line in your snippet:
object Main extends A[String] { //...
is, after desugaring, equivalent to:
object Main extends A[String]()(Env.spark.implicits.newStringEncoder) { //...
Essentially, when the singleton object `Main` is initialised, it will
evaluate its superclass constructor arguments, i.e. it will call
`Env.spark.implicits.newStringEncoder`. Since your `main` method is
also defined in this object, it will be initialised as soon as your
application starts, that is, before a Spark session has been started. The
"problem" is that encoders require an active session, and hence you
have an initialisation-order problem. (You can replay the problem
simply by defining a `val x = Env.spark.implicits.newStringEncoder` in
your singleton object.)
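
For concreteness, a minimal sketch of one way to restructure the snippet so
the encoder is only resolved once a session exists. This is illustrative, not
the only fix; the class and object names mirror the snippet above, everything
else is an assumption:

import org.apache.spark.sql.{Encoder, SparkSession}

// No context bound on A, so nothing encoder-related is evaluated when Main is
// initialised; the Encoder is only needed when ds(...) is actually called.
abstract class A[T] {
  def ds(xs: Seq[T])(implicit spark: SparkSession, enc: Encoder[T]) =
    spark.createDataset(xs)
}

object Main extends A[String] {
  def main(args: Array[String]): Unit = {
    implicit val spark = SparkSession.builder().appName("init-order-sketch").getOrCreate()
    import spark.implicits._   // Encoder[String] resolved here, with an active session
    ds(Seq("a", "b", "c")).show()
    spark.stop()
  }
}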

The error message is weird and not so helpful (I think this is due to
the way Spark uses ClassLoaders internally when running a submitted
application), however it isn't a bug in spark.

In local mode you will not experience the issue because you are
starting a session when the session builder is accessed the first time
via `Env.spark`.

Aside from the errors you're getting, there's another subtlety in your
snippet that may bite you later: adding "T : Encoder" to your
super class has no effect in the current setup, which also imports
Env.spark.implicits._

best,
--Jakob


On Sat, Sep 17, 2016 at 8:26 PM, Xiang Gao  wrote:
> Yes. Besides, if you change the "T : Encoder" to "T", it's OK too.
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972p18981.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-17 Thread Xiang Gao
Yes. Besides, if you change the "T : Encoder" to "T", it's OK too.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972p18981.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-17 Thread WangJianfei
If I remove the abstract class A[T : Encoder] {}, it's OK!



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972p18980.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-17 Thread Xiang Gao
spark standalone cluster



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972p18979.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-17 Thread WangJianfei
Do you run this in YARN mode or something else?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972p18978.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-17 Thread Xiang Gao
Besides, if you replace line #14 with:
Env.spark.createDataset(Seq("a","b","c")).rdd.map(func).collect()

You will have the same problem with a different stack trace:

Caused by: java.lang.NoClassDefFoundError: Could not initialize class Main$
at Main$$anonfun$main$1.apply(test.scala:14)
at Main$$anonfun$main$1.apply(test.scala:14)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1918)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1918)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972p18976.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: java.lang.NoClassDefFoundError, is this a bug?

2016-09-17 Thread Jacek Laskowski
Hi,

I'm surprised too. Here's the entire stack trace for reference. I'd
also like to know what causes the issue.

Caused by: java.lang.NoClassDefFoundError: Could not initialize class Main$
at Main$$anonfun$main$1.apply(test.scala:14)
at Main$$anonfun$main$1.apply(test.scala:14)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:277)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Sep 17, 2016 at 11:08 PM, Xiang Gao  wrote:
> Hi,
>
> In my application, I got a weird error message:
> java.lang.NoClassDefFoundError: Could not initialize class X
>
> This happens only when I try to submit my application in cluster mode. It
> works perfectly in client mode.
>
> I'm able to reproduce this error message by a simple 16-line program:
> https://github.com/zasdfgbnm/spark-test1/blob/master/src/main/scala/test.scala
>
> To reproduce it, simply clone this git repo, and then execute command like:
> sbt package && spark-submit --master spark://localhost:7077
> target/scala-2.11/createdataset_2.11-0.0.1-SNAPSHOT.jar
>
> Can anyone check whether this is a bug of spark?
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



java.lang.NoClassDefFoundError, is this a bug?

2016-09-17 Thread Xiang Gao
Hi,

In my application, I got a weird error message:
java.lang.NoClassDefFoundError: Could not initialize class X

This happens only when I try to submit my application in cluster mode. It
works perfectly in client mode.

I'm able to reproduce this error message by a simple 16-line program:
https://github.com/zasdfgbnm/spark-test1/blob/master/src/main/scala/test.scala

To reproduce it, simply clone this git repo, and then execute command like:
sbt package && spark-submit --master spark://localhost:7077
target/scala-2.11/createdataset_2.11-0.0.1-SNAPSHOT.jar

Can anyone check whether this is a bug of spark?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-tp18972.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Possible bug related to [SPARK-5708]

2016-04-07 Thread Mihir Monani
Hi Everyone,

I was not able to get metrics written to a separate file using Slf4jSink with
the configuration below in metrics.properties:

# Enable Slf4jSink for all instances by class name
*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink

# Polling period for Slf4jSink
*.sink.slf4j.period=1
*.sink.slf4j.unit=minutes

The metrics only show up in the root logger's file.

After making the changes below in Slf4jSink.scala, I was able to route the
metrics to a separate file via log4j.properties:


Slf4jSink.scala (only the changes):

import org.slf4j.Logger
import org.slf4j.LoggerFactory

val reporter: Slf4jReporter = Slf4jReporter
  .forRegistry(registry)
  .outputTo(LoggerFactory.getLogger("org.apache.spark.metrics"))
  .convertDurationsTo(TimeUnit.MILLISECONDS)
  .convertRatesTo(TimeUnit.SECONDS)
  .build()

log4j.properties (only the changes):

log4j.logger.org.apache.spark.metrics=INFO, metricFileAppender
log4j.additivity.org.apache.spark.metrics=true

log4j.appender.metricFileAppender=org.apache.log4j.RollingFileAppender
log4j.appender.metricFileAppender.File=logs/metric.log
log4j.appender.metricFileAppender.MaxFileSize=10MB
log4j.appender.metricFileAppender.MaxBackupIndex=10
log4j.appender.metricFileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.metricFileAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

Am I using the wrong configuration, or is there something missing in
Slf4jSink.scala
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala#L51>?

There is one more bug:

Spark prints each metric log line twice, to the file or to the console (if the
root logger is enabled), with or without the above-mentioned changes.

Thanks,
Mihir Monani


Bug in DiskBlockManager subDirs logic?

2016-02-25 Thread Zee Chen
Hi,

I am debugging a situation where SortShuffleWriter sometimes fail to
create a file, with the following stack trace:

16/02/23 11:48:46 ERROR Executor: Exception in task 13.0 in stage
47827.0 (TID 1367089)
java.io.FileNotFoundException:
/tmp/spark-9dd8dca9-6803-4c6c-bb6a-0e9c0111837c/executor-129dfdb8-9422-4668-989e-e789703526ad/blockmgr-dda6e340-7859-468f-b493-04e4162d341a/00/temp_shuffle_69fe1673-9ff2-462b-92b8-683d04669aad
(No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.(FileOutputStream.java:213)
at 
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:110)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


I checked the Linux file system (ext4) and saw that the /00/ subdir was
missing. I went through the heap dump of the
CoarseGrainedExecutorBackend JVM process and found that
DiskBlockManager's subDirs list had more non-null 2-hex subdirs than were
present on the file system! As a test I created all 64 2-hex subdirs
by hand, and then the problem went away.
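
A sketch of that manual test, for reference (the directory path is a
placeholder; the real blockmgr-* path comes from the exception, and the
00..3f naming matches the 64 two-hex subdirs mentioned above):

import java.io.File

// Pre-create the 64 two-character hex subdirectories (00 .. 3f) under a
// block manager directory, mirroring the manual test described above.
val blockMgrDir = new File(sys.env.getOrElse("BLOCKMGR_DIR", "/tmp/blockmgr-test"))
(0 until 64).foreach(i => new File(blockMgrDir, "%02x".format(i)).mkdirs())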

So has anybody else seen this problem? Looking at the relevant logic
in DiskBlockManager, it hasn't changed much since the fix for
https://issues.apache.org/jira/browse/SPARK-6468

My configuration:
spark-1.5.1, hadoop-2.6.0, standalone, oracle jdk8u60

Thanks,
Zee

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Opening a JIRA for QuantileDiscretizer bug

2016-02-23 Thread Sean Owen
Good catch, though probably very slightly simpler to write

math.min(requiredSamples.toDouble ...

Make sure you're logged in to JIRA maybe. If you have any trouble I'll
open it for you. You can file it as a minor bug against ML.

This is how you open a PR and everything else
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark

On Tue, Feb 23, 2016 at 2:45 AM, Pierson, Oliver C  wrote:
> Hello,
>
>   I've discovered a bug in the QuantileDiscretizer estimator.  Specifically,
> for large DataFrames QuantileDiscretizer will only create one split (i.e.
> two bins).
>
>
> The error happens in lines 113 and 114 of QuantileDiscretizer.scala:
>
>
> val requiredSamples = math.max(numBins * numBins, 1)
>
> val fraction = math.min(requiredSamples / dataset.count(), 1.0)
>
>
> After the first line, requiredSamples is an Int.  Therefore, if
> requiredSamples < dataset.count() then fraction is always 0.0.
>
>
> The problem can be simply fixed by replacing the first with:
>
>
>   val requiredSamples = math.max(numBins * numBins, 1.0)
>
>
> I've implemented this change in my fork and all tests passed (except for
> docker integration, but I think that's another issue).  I'm happy to submit
> a PR if it will ease someone else's workload.  However, I'm unsure of how to
> create a JIRA.  I've created an account on the issue tracker
> (issues.apache.org) but when I try to create an issue it asks me to choose a
> "Service Desk".  Which one should I be choosing?
>
>
> Thanks much,
>
> Oliver Pierson
>
>
>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Opening a JIRA for QuantileDiscretizer bug

2016-02-22 Thread Ted Yu
When you click on Create, you're brought to the 'Create Issue' dialog, where
you choose the project "Spark".
The component should be MLlib.

Please see also:
http://search-hadoop.com/m/q3RTtmsshe1W6cH22/spark+pull+template&subj=pull+request+template


On Mon, Feb 22, 2016 at 6:45 PM, Pierson, Oliver C  wrote:

> Hello,
>
>   I've discovered a bug in the QuantileDiscretizer estimator.
> Specifically, for large DataFrames QuantileDiscretizer will only create one
> split (i.e. two bins).
>
>
> The error happens in lines 113 and 114 of QuantileDiscretizer.scala:
>
>
> val requiredSamples = math.max(numBins * numBins, 1)
>
> val fraction = math.min(requiredSamples / dataset.count(), 1.0)
>
>
> After the first line, requiredSamples is an Int.  Therefore, if
> requiredSamples < dataset.count() then fraction is always 0.0.
>
>
> The problem can be simply fixed by replacing the first with:
>
>
>   val requiredSamples = math.max(numBins * numBins, 1.0)
>
>
> I've implemented this change in my fork and all tests passed (except for
> docker integration, but I think that's another issue).  I'm happy to submit
> a PR if it will ease someone else's workload.  However, I'm unsure of how
> to create a JIRA.  I've created an account on the issue tracker (
> issues.apache.org) but when I try to create an issue it asks me to choose
> a "Service Desk".  Which one should I be choosing?
>
>
> Thanks much,
>
> Oliver Pierson
>
>
>
>


Opening a JIRA for QuantileDiscretizer bug

2016-02-22 Thread Pierson, Oliver C
Hello,

  I've discovered a bug in the QuantileDiscretizer estimator.  Specifically, 
for large DataFrames QuantileDiscretizer will only create one split (i.e. two 
bins).


The error happens in lines 113 and 114 of QuantileDiscretizer.scala:


val requiredSamples = math.max(numBins * numBins, 1)

val fraction = math.min(requiredSamples / dataset.count(), 1.0)


After the first line, requiredSamples is an Int.  Therefore, if requiredSamples 
< dataset.count() then fraction is always 0.0.


The problem can be simply fixed by replacing the first with:


  val requiredSamples = math.max(numBins * numBins, 1.0)
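
To make the truncation concrete, a minimal sketch outside of Spark (the
numbers are made up for illustration):

// requiredSamples is an Int and count() returns a Long, so the division below
// is integer division and truncates to 0 whenever count > requiredSamples.
val numBins = 10
val count = 1000000L                                     // a "large" DataFrame
val requiredSamples = math.max(numBins * numBins, 1)     // Int: 100
val fraction = math.min(requiredSamples / count, 1.0)    // 100 / 1000000L == 0L  =>  0.0

// With the proposed fix the division happens in Double:
val requiredSamplesFixed = math.max(numBins * numBins, 1.0)       // Double: 100.0
val fractionFixed = math.min(requiredSamplesFixed / count, 1.0)   // 1.0E-4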


I've implemented this change in my fork and all tests passed (except for docker 
integration, but I think that's another issue).  I'm happy to submit a PR if it 
will ease someone else's workload.  However, I'm unsure of how to create a 
JIRA.  I've created an account on the issue tracker (issues.apache.org) but 
when I try to create an issue it asks me to choose a "Service Desk".  Which one 
should I be choosing?


Thanks much,

Oliver Pierson




Re: Spark 1.6.0 Streaming + Persistance Bug?

2016-02-02 Thread mkhaitman
Actually, disregard! I forgot that
spark.dynamicAllocation.cachedExecutorIdleTimeout defaults to infinity, so
lowering it should solve the problem :)
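
A sketch of the relevant settings for anyone else tuning this (the timeout
values are illustrative; the external shuffle service is assumed to be
available, since dynamic allocation requires it):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // Executors holding cached blocks are normally never released (default: infinity).
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s")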

Mark.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-0-Streaming-Persistance-Bug-tp16190p16191.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Spark 1.6.0 Streaming + Persistance Bug?

2016-02-02 Thread mkhaitman
Calling unpersist on an RDD in a Spark Streaming application does not
actually unpersist the blocks from memory and/or disk. After the RDD has
been processed in a .foreach(rdd) call, I attempt to unpersist the RDD since
it is no longer useful to keep in memory/disk. This mainly causes a problem
with dynamic allocation: after the batch of data has been processed, we
want the idle executors to be released (giving the cores and memory
back to the cluster while waiting for the next batch processing attempt to
occur).

Is this a known issue? It's not major in that it doesn't break anything...
just prevents dynamic allocation from working as well as it could if
streaming is combined with it.

Thanks,
Mark.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-0-Streaming-Persistance-Bug-tp16190.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



A bug in Spark ML? NoSuchElementException while using RandomForest for regression.

2015-12-16 Thread Eugene Morozov
Hi!

I've looked through issues and haven't found anything like that, so I've
created a new one. Everything to reproduce is attached to it:
https://issues.apache.org/jira/browse/SPARK-12367

Could you please take a look and, if possible, advise a workaround.
Thank you in advance.
--
Be well!
Jean Morozov


Re: A bug in Spark standalone? Worker registration and deregistration

2015-12-10 Thread Jacek Laskowski
On Thu, Dec 10, 2015 at 8:10 PM, Shixiong Zhu  wrote:
> Jacek, could you create a JIRA for it? I just reproduced it. It's a bug in
> how Master handles the Worker disconnection.

Hi Shixiong,

I'm saved. I kept thinking I was lost in the sources and seeing ghosts :-)

https://issues.apache.org/jira/browse/SPARK-12267

Pozdrawiam,
Jacek

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: A bug in Spark standalone? Worker registration and deregistration

2015-12-10 Thread Shixiong Zhu
Jacek, could you create a JIRA for it? I just reproduced it. It's a bug in
how Master handles the Worker disconnection.

Best Regards,
Shixiong Zhu

2015-12-10 2:45 GMT-08:00 Jacek Laskowski :

> Hi,
>
> I'm on yesterday's master HEAD.
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
> http://blog.jaceklaskowski.pl
> Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
>
> On Thu, Dec 10, 2015 at 9:50 AM, Sasaki Kai 
> wrote:
> > Hi, Jacek
> >
> > What version of Spark do you use?
> > I started sbin/start-master.sh script as you did against master HEAD.
> But there is no warning log such you pasted.
> > While you can specify hostname with -h option, you can also omit it. The
> master name can be set automatically with
> > the name `hostname` command. You can also try it.
> >
> > Kai Sasaki
> >
> >> On Dec 10, 2015, at 5:22 PM, Jacek Laskowski  wrote:
> >>
> >> Hi,
> >>
> >> While toying with Spark Standalone I've noticed the following messages
> >> in the logs of the master:
> >>
> >> INFO Master: Registering worker 192.168.1.6:59919 with 2 cores, 2.0 GB
> RAM
> >> INFO Master: localhost:59920 got disassociated, removing it.
> >> ...
> >> WARN Master: Removing worker-20151210090708-192.168.1.6-59919 because
> >> we got no heartbeat in 60 seconds
> >> INFO Master: Removing worker worker-20151210090708-192.168.1.6-59919
> >> on 192.168.1.6:59919
> >>
> >> Why does the message "WARN Master: Removing
> >> worker-20151210090708-192.168.1.6-59919 because we got no heartbeat in
> >> 60 seconds" appear when the worker should've been gone already (as
> >> pointed out in "INFO Master: localhost:59920 got disassociated,
> >> removing it.")?
> >>
> >> Could it be that the ids are different - 192.168.1.6:59919 vs
> localhost:59920?
> >>
> >> I started master using "./sbin/start-master.sh -h localhost" and the
> >> workers "./sbin/start-slave.sh spark://localhost:7077".
> >>
> >> p.s. Are such questions appropriate for this mailing list?
> >>
> >> Pozdrawiam,
> >> Jacek
> >>
> >> --
> >> Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
> >> http://blog.jaceklaskowski.pl
> >> Mastering Spark
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
> >> Follow me at https://twitter.com/jaceklaskowski
> >> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
> >>
> >> -
> >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: dev-h...@spark.apache.org
> >>
> >
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>

