[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-25 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Description: 
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => out)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only
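
As a concrete reference, here is a minimal, self-contained sketch of the pattern and of printing its optimized plan; the {{Record}} case class, the toy data, and the reduce function are illustrative assumptions, not from this ticket:

{code:scala}
import org.apache.spark.sql.SparkSession

// Toy schema, purely for illustration.
case class Record(y: String, n: Long)

object ReduceGroupsPlanDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("reduceGroups-plan").getOrCreate()
    import spark.implicits._

    val ds = Seq(Record("a", 1L), Record("a", 2L), Record("b", 3L)).toDS()

    // The common pattern: reduce each group, then throw the key away.
    val reduced = ds
      .groupByKey(_.y)
      .reduceGroups((a, b) => if (a.n >= b.n) a else b)
      .map(_._2)

    // Print the optimized logical plan; the aggregation's serialized output is
    // deserialized again (DeserializeToObject + MapElements) just to drop the key.
    println(reduced.queryExecution.optimizedPlan.numberedTreeString)

    spark.stop()
  }
}
{code}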

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((a, b) => out)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
or {{Complete}} modes. Since we don't need to emit the keys, which are 
serialized, this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.
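
For illustration only, a signature-level sketch of what the proposed method could look like; this is an assumption about the API shape, not existing Spark code:

{code:scala}
import org.apache.spark.sql.Dataset

// Hypothetical shape of the proposed API: it would sit on KeyValueGroupedDataset[K, V]
// next to reduceGroups, but emit only the reduced values, so the planner never has to
// materialize or serialize the (K, V) tuple.
trait ReduceGroupValuesSupport[K, V] {
  def reduceGroupValues(func: (V, V) => V): Dataset[V]
}
{code}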

  was:
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
or {{Complete}} modes. Since we don't need to emit the keys, which are 
serialized, this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.


> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => out)
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroupValues((a, b) => out)
> {code}
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have the 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Description: 
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only 
the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or 
{{Complete}} modes. Since we don't need to emit the keys, which are serialized, 
this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.

  was:
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only 
the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or 
{{Complete}} modes. Since we don't need to emit the keys, which are serialized, 
this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.


> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroupValues((aVal, bVal) => outVal)
> {code}
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have the 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian Jira

[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Description: 
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
or {{Complete}} modes. Since we don't need to emit the keys, which are 
serialized, this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.

  was:
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only 
the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or 
{{Complete}} modes. Since we don't need to emit the keys, which are serialized, 
this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.


> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroupValues((aVal, bVal) => outVal)
> {code}
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have the 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian 

[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Description: 
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only 
the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or 
{{Complete}} modes. Since we don't need to emit the keys, which are serialized, 
this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.

  was:
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only 
the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or 
{{Complete}} modes. Since we don't need to emit the keys, which are serialized, 
this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.


> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroupValues((aVal, bVal) => outVal)
> {code}
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have the 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: 

[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Issue Type: New Feature  (was: Improvement)

> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have the 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157128#comment-17157128
 ] 

Martin Loncaric edited comment on SPARK-31356 at 7/19/20, 1:07 AM:
---

Actually, there seem to be 4 separate performance issues:
1. unnecessary serialize + deserialize
2. unnecessary map
3. unnecessary appendColumns in the case when the groupByKey function just 
returns a subset of columns (though this is hard to get around in a type safe 
way)
4. the RDD API is actually roughly 2x faster overall, so there might be even 
more room to improve aggregations (see the comparison sketch below)
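
For point 4, a rough, self-contained comparison sketch; the {{Event}} case class, data, and reduce function are illustrative assumptions, and real timing differences will depend on the workload:

{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative schema and data; not from the ticket.
case class Event(y: String, n: Long)

object RddVsDatasetReduce {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rdd-vs-dataset").getOrCreate()
    import spark.implicits._

    val ds = Seq(Event("a", 1L), Event("a", 2L), Event("b", 3L)).toDS()

    // Dataset path under discussion: groupByKey + reduceGroups + map(_._2).
    val viaDataset = ds
      .groupByKey(_.y)
      .reduceGroups((a, b) => if (a.n >= b.n) a else b)
      .map(_._2)

    // RDD path: reduceByKey keeps JVM objects end to end, with no encoder
    // round trip inside the aggregation.
    val viaRdd = ds.rdd
      .map(e => (e.y, e))
      .reduceByKey((a, b) => if (a.n >= b.n) a else b)
      .values

    println(viaDataset.collect().toSeq)
    println(viaRdd.collect().toSeq)

    spark.stop()
  }
}
{code}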


was (Author: mwlon):
Actually, there seem to be 4 separate performance issues:
1. unnecessary serialize + deserialize
2. unnecessary map
3. unnecessary appendColumns when groupByKey function just returns a subset of 
columns (though this is hard to get around in a type safe way)
4. actually the RDD's API is roughly a whole 2x faster. There might be even 
more room to improve aggregations

> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have the 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157128#comment-17157128
 ] 

Martin Loncaric edited comment on SPARK-31356 at 7/19/20, 1:07 AM:
---

Actually, there seem to be 4 separate performance issues:
1. unnecessary serialize + deserialize
2. unnecessary map
3. unnecessary appendColumns when groupByKey function just returns a subset of 
columns (though this is hard to get around in a type safe way)
4. actually the RDD's API is roughly a whole 2x faster. There might be even 
more room to improve aggregations


was (Author: mwlon):
Actually, there seem to be 3 separate performance issues:
1. unnecessary appendColumns when groupByKey function just returns a subset of 
columns (though this is hard to get around in a type safe way)
2. unnecessary serialize + deserialize
3. actually the RDD's API is roughly a whole 2x faster. It seems there's a lot 
of room to improve aggregations

> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have the 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Description: 
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things we can improve with a specialized 
{{.reduceGroupValues}}:

1. avoid the extra serialization (baked into AggregationIterator 
implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
emitting the values only

Proposal:
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only 
the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or 
{{Complete}} modes. Since we don't need to emit the keys, which are serialized, 
this is not too complicated. To make use of this, have the 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
{{CatalystSerde.serialize}}.

  was:
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the .map(_._2) step (taking values and throwing keys away) 
unfortunately often ends up as an unnecessary serialization during aggregation 
step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + 
SerializeFromObject}} in the optimized logical plan. In this example, it would 
be more ideal to either skip the deserialization/serialization or {{Project 
(from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace 
the `.map` is quite tricky, because
* the columns are complicated, like {{[value, 
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets

Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a 
{{SerializeFromObject}} node so that the Optimizer can eliminate the 
serialization when it is redundant. Change aggregations to emit deserialized 
results.

I had 2 ideas for what we could change: either add a new feature to 
{{.reduceGroupValues}} that projects to only the necessary columns, or do this 
improvement. I thought this would be a better solution because
* it will improve the performance of existing Spark applications with no 
modifications
* feature growth is undesirable

Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation 
+ SerializeFromObject that I'm not aware of?


> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into AggregationIterator 
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by 
> emitting the values only
> Proposal:
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} 
> or {{Complete}} modes. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To 

[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

2020-07-18 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Summary: reduceGroupValues function for KeyValueGroupedDataset  (was: 
reduceGroupValues function for Dataset)

> reduceGroupValues function for KeyValueGroupedDataset
> -
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) 
> unfortunately often ends up as an unnecessary serialization during 
> aggregation step, followed by {{DeserializeToObject + MapElements (from (K, 
> V) => V) + SerializeFromObject}} in the optimized logical plan. In this 
> example, it would be more ideal to either skip the 
> deserialization/serialization or {{Project (from (K, V) => V)}}. Even 
> manually doing a {{.select(...).as[T]}} to replace the `.map` is quite 
> tricky, because
> * the columns are complicated, like {{[value, 
> ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
> * it breaks the nice type checking of Datasets
> Proposal:
> Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
> {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and 
> a {{SerializeFromObject}} node so that the Optimizer can eliminate the 
> serialization when it is redundant. Change aggregations to emit deserialized 
> results.
> I had 2 ideas for what we could change: either add a new feature to 
> {{.reduceGroupValues}} that projects to only the necessary columns, or do 
> this improvement. I thought this would be a better solution because
> * it will improve the performance of existing Spark applications with no 
> modifications
> * feature growth is undesirable
> Uncertainties:
> Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
> 3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
> Complications: Are there any hazards in splitting Aggregation into 
> Aggregation + SerializeFromObject that I'm not aware of?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31356) reduceGroupValues function for Dataset

2020-07-18 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Summary: reduceGroupValues function for Dataset  (was: Splitting Aggregate 
node into separate Aggregate and Serialize for Optimizer)

> reduceGroupValues function for Dataset
> --
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) 
> unfortunately often ends up as an unnecessary serialization during 
> aggregation step, followed by {{DeserializeToObject + MapElements (from (K, 
> V) => V) + SerializeFromObject}} in the optimized logical plan. In this 
> example, it would be more ideal to either skip the 
> deserialization/serialization or {{Project (from (K, V) => V)}}. Even 
> manually doing a {{.select(...).as[T]}} to replace the `.map` is quite 
> tricky, because
> * the columns are complicated, like {{[value, 
> ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
> * it breaks the nice type checking of Datasets
> Proposal:
> Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
> {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and 
> a {{SerializeFromObject}} node so that the Optimizer can eliminate the 
> serialization when it is redundant. Change aggregations to emit deserialized 
> results.
> I had 2 ideas for what we could change: either add a new feature to 
> {{.reduceGroupValues}} that projects to only the necessary columns, or do 
> this improvement. I thought this would be a better solution because
> * it will improve the performance of existing Spark applications with no 
> modifications
> * feature growth is undesirable
> Uncertainties:
> Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
> 3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
> Complications: Are there any hazards in splitting Aggregation into 
> Aggregation + SerializeFromObject that I'm not aware of?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31356) Splitting Aggregate node into separate Aggregate and Serialize for Optimizer

2020-07-13 Thread Martin Loncaric (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157128#comment-17157128
 ] 

Martin Loncaric commented on SPARK-31356:
-

Actually, there seem to be 3 separate performance issues:
1. unnecessary appendColumns when groupByKey function just returns a subset of 
columns (though this is hard to get around in a type safe way)
2. unnecessary serialize + deserialize
3. actually the RDD's API is roughly a whole 2x faster. It seems there's a lot 
of room to improve aggregations

> Splitting Aggregate node into separate Aggregate and Serialize for Optimizer
> 
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) 
> unfortunately often ends up as an unnecessary serialization during 
> aggregation step, followed by {{DeserializeToObject + MapElements (from (K, 
> V) => V) + SerializeFromObject}} in the optimized logical plan. In this 
> example, it would be more ideal to either skip the 
> deserialization/serialization or {{Project (from (K, V) => V)}}. Even 
> manually doing a {{.select(...).as[T]}} to replace the `.map` is quite 
> tricky, because
> * the columns are complicated, like {{[value, 
> ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
> * it breaks the nice type checking of Datasets
> Proposal:
> Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
> {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and 
> a {{SerializeFromObject}} node so that the Optimizer can eliminate the 
> serialization when it is redundant. Change aggregations to emit deserialized 
> results.
> I had 2 ideas for what we could change: either add a new feature to 
> {{.reduceGroupValues}} that projects to only the necessary columns, or do 
> this improvement. I thought this would be a better solution because
> * it will improve the performance of existing Spark applications with no 
> modifications
> * feature growth is undesirable
> Uncertainties:
> Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
> 3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
> Complications: Are there any hazards in splitting Aggregation into 
> Aggregation + SerializeFromObject that I'm not aware of?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31356) Splitting Aggregate node into separate Aggregate and Serialize for Optimizer

2020-04-05 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Summary: Splitting Aggregate node into separate Aggregate and Serialize for 
Optimizer  (was: KeyValueGroupedDataset method to reduce and take values only)

> Splitting Aggregate node into separate Aggregate and Serialize for Optimizer
> 
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) 
> unfortunately often ends up as an unnecessary serialization during 
> aggregation step, followed by {{DeserializeToObject + MapElements (from (K, 
> V) => V) + SerializeFromObject}} in the optimized logical plan. In this 
> example, it would be more ideal to either skip the 
> deserialization/serialization or {{Project (from (K, V) => V)}}. Even 
> manually doing a {{.select(...).as[T]}} to replace the `.map` is quite 
> tricky, because
> * the columns are complicated, like {{[value, 
> ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
> * it breaks the nice type checking of Datasets
> Proposal:
> Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
> {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and 
> a {{SerializeFromObject}} node so that the Optimizer can eliminate the 
> serialization when it is redundant. Change aggregations to emit deserialized 
> results.
> I had 2 ideas for what we could change: either add a new feature to 
> {{.reduceGroupValues}} that projects to only the necessary columns, or do 
> this improvement. I thought this would be a better solution because
> * it will improve the performance of existing Spark applications with no 
> modifications
> * feature growth is undesirable
> Uncertainties:
> Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
> 3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
> Complications: Are there any hazards in splitting Aggregation into 
> Aggregation + SerializeFromObject that I'm not aware of?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31356) KeyValueGroupedDataset method to reduce and take values only

2020-04-05 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Description: 
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the .map(_._2) step (taking values and throwing keys away) 
unfortunately often ends up as an unnecessary serialization during aggregation 
step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + 
SerializeFromObject}} in the optimized logical plan. In this example, it would 
be more ideal to either skip the deserialization/serialization or {{Project 
(from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace 
the `.map` is quite tricky, because
* the columns are complicated, like {{[value, 
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets

Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a 
{{SerializeFromObject}} node so that the Optimizer can eliminate the 
serialization when it is redundant. Change aggregations to emit deserialized 
results.
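
As a point of comparison for the {{cogroup}} behavior referenced above, a minimal sketch (toy schema and data are illustrative assumptions) that prints the plans for a {{cogroup}} followed by a typed {{map}}, which is where one would expect the separately planned serializer to be eliminated:

{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative schema; not from the ticket.
case class Rec(y: String, n: Long)

object CogroupPlanDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cogroup-plan").getOrCreate()
    import spark.implicits._

    val left  = Seq(Rec("a", 1L), Rec("b", 2L)).toDS().groupByKey(_.y)
    val right = Seq(Rec("a", 10L)).toDS().groupByKey(_.y)

    // cogroup produces objects and (per the description above) plans its serializer
    // as a separate node, so a following typed map need not pay a full round trip.
    val combined = left
      .cogroup(right) { (key, xs, ys) => Iterator((xs ++ ys).map(_.n).sum) }
      .map(_ * 2L)

    // Compare the analyzed and optimized plans to see whether back-to-back
    // serialize/deserialize nodes between cogroup and map survive optimization.
    combined.explain(true)

    spark.stop()
  }
}
{code}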

I had 2 ideas for what we could change: either add a new feature to 
{{.reduceGroupValues}} that projects to only the necessary columns, or do this 
improvement. I thought this would be a better solution because
* it will improve the performance of existing Spark applications with no 
modifications
* feature growth is undesirable

Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation 
+ SerializeFromObject that I'm not aware of?

  was:
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the .map(_._2) step (taking values and throwing keys away) 
unfortunately often ends up as an unnecessary serialization during aggregation 
step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + 
SerializeFromObject}} in the optimized logical plan. In this example, it would 
be more ideal to either skip the deserialization/serialization or 
{{Project (from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to 
replace the {{.map}} is quite tricky, because
* the columns are complicated, like {{[value, 
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets

Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a 
{{SerializeFromObject}} node so that the Optimizer can eliminate the 
serialization when it is redundant. Change aggregations to emit deserialized 
results.

I had 2 ideas for what we could change: either add a new feature to 
{{.reduceGroupValues}} that projects to only the necessary columns, or do this 
improvement. I thought this would be a better solution because
* it will improve the performance of existing Spark applications with no 
modifications
* feature growth is undesirable

Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation 
+ SerializeFromObject that I'm not aware of?


> KeyValueGroupedDataset method to reduce and take values only
> 
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) 
> unfortunately often ends up as an unnecessary serialization during 
> aggregation step, followed by {{DeserializeToObject + MapElements (from (K, 
> V) => V) + SerializeFromObject}} in the optimized logical plan. In this 
> example, it would be more ideal to either skip the 
> deserialization/serialization or {{Project (from (K, V) => V)}}. Even 
> manually doing a {{.select(...).as[T]}} to replace the `.map` is quite 
> tricky, 

[jira] [Updated] (SPARK-31356) KeyValueGroupedDataset method to reduce and take values only

2020-04-05 Thread Martin Loncaric (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:

Description: 
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the .map(_._2) step (taking values and throwing keys away) 
unfortunately often ends up as an unnecessary serialization during aggregation 
step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + 
SerializeFromObject}} in the optimized logical plan. In this example, it would 
be more ideal to either skip the deserialization/serialization or {{Project 
(from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace 
the `.map` is quite tricky, because
* the columns are complicated, like {{[value, 
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets

Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a 
{{SerializeFromObject}} node so that the Optimizer can eliminate the 
serialization when it is redundant. Change aggregations to emit deserialized 
results.

I had 2 ideas for what we could change: either add a new feature to 
{{.reduceGroupValues}} that projects to only the necessary columns, or do this 
improvement. I thought this would be a better solution because
* it will improve the performance of existing Spark applications with no 
modifications
* feature growth is undesirable

Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation 
+ SerializeFromObject that I'm not aware of?

  was:
Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the .map(_._2) step (taking values and throwing keys away) 
unfortunately often ends up as an unnecessary serialization during aggregation 
step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + 
SerializeFromObject}} in the optimized logical plan. In this example, it would 
be more ideal to either skip the deserialization/serialization or {{Project 
(from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace 
the `.map` is quite tricky, because
* the columns are complicated, like {{[value, 
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets

Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a 
{{SerializeFromObject}} node so that the Optimizer can eliminate the 
serialization when it is redundant. Change aggregations to emit deserialized 
results.

I had 2 ideas for what we could change: either add a new feature to 
{{.reduceGroupValues}} that projects to only the necessary columns, or do this 
improvement. I thought this would be a better solution because
* it will improve the performance of existing Spark applications with no 
modifications
* feature growth is undesirable

Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation 
+ SerializeFromObject that I'm not aware of?


> KeyValueGroupedDataset method to reduce and take values only
> 
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) 
> unfortunately often ends up as an unnecessary serialization during 
> aggregation step, followed by {{DeserializeToObject + MapElements (from (K, 
> V) => V) + SerializeFromObject}} in the optimized logical plan. In this 
> example, it would be more ideal to either skip the 
> deserialization/serialization or {{Project (from (K, V) => V)}}. Even 
> manually doing a {{.select(...).as[T]}} to 

[jira] [Created] (SPARK-31356) KeyValueGroupedDataset method to reduce and take values only

2020-04-05 Thread Martin Loncaric (Jira)
Martin Loncaric created SPARK-31356:
---

 Summary: KeyValueGroupedDataset method to reduce and take values 
only
 Key: SPARK-31356
 URL: https://issues.apache.org/jira/browse/SPARK-31356
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Martin Loncaric


Problem: in Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the .map(_._2) step (taking values and throwing keys away) 
unfortunately often ends up as an unnecessary serialization during aggregation 
step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + 
SerializeFromObject}} in the optimized logical plan. In this example, it would 
be more ideal to either skip the deserialization/serialization or 
{{Project (from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to 
replace the {{.map}} is quite tricky, because
* the columns are complicated, like {{[value, 
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets

Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a 
{{SerializeFromObject}} node so that the Optimizer can eliminate the 
serialization when it is redundant. Change aggregations to emit deserialized 
results.

I had 2 ideas for what we could change: either add a new feature to 
{{.reduceGroupValues}} that projects to only the necessary columns, or do this 
improvement. I thought this would be a better solution because
* it will improve the performance of existing Spark applications with no 
modifications
* feature growth is undesirable

Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation 
+ SerializeFromObject that I'm not aware of?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-05-31 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853245#comment-16853245
 ] 

Martin Loncaric commented on SPARK-27098:
-

After upgrading to Hadoop 2.9 and using 
{{spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2}}, the problem 
is substantially less frequent, but still present. I think this suggests that 
moving files sometimes quietly fails.

> Flaky missing file parts when writing to Ceph without error
> ---
>
> Key: SPARK-27098
> URL: https://issues.apache.org/jira/browse/SPARK-27098
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Attachments: sanitized_stdout_1.txt
>
>
> https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233
> Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
> occasionally a file part will be missing; e.g. part 3 here:
> ```
> > aws s3 ls my-bucket/folder/
> 2019-02-28 13:07:21  0 _SUCCESS
> 2019-02-28 13:06:58   79428651 
> part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:06:59   79586172 
> part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:00   79561910 
> part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:01   79192617 
> part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:07   79364413 
> part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:08   79623254 
> part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79445030 
> part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79474923 
> part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:11   79477310 
> part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:12   79331453 
> part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79567600 
> part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79388012 
> part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:14   79308387 
> part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:15   79455483 
> part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:17   79512342 
> part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79403307 
> part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79617769 
> part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:19   79333534 
> part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:20   79543324 
> part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> ```
> However, the write succeeds and leaves a _SUCCESS file.
> This can be caught by additionally checking afterward whether the number of 
> written file parts agrees with the number of partitions, but Spark should at 
> least fail on its own and leave a meaningful stack trace in this case.






[jira] [Commented] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-05-26 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848516#comment-16848516
 ] 

Martin Loncaric commented on SPARK-26192:
-

[~dongjoon] I made a PR: https://github.com/apache/spark/pull/24713

> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Assignee: Martin Loncaric
>Priority: Minor
> Fix For: 3.0.0
>
>
> There is at least one option accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously 
> misnamed, as referenced in the linked JIRA.






[jira] [Comment Edited] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-03-28 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804294#comment-16804294
 ] 

Martin Loncaric edited comment on SPARK-27098 at 3/28/19 8:53 PM:
--

An INFO of this format also appears for each missing file:

{{19/03/28 16:07:24 INFO SparkHadoopMapRedUtil: No need to commit output of 
task because needsTaskCommit=false: attempt_20190328160411_0004_m_000113_343}}


was (Author: mwlon):
An INFO of this format also appears for each missing file: {{19/03/28 16:07:24 
INFO SparkHadoopMapRedUtil: No need to commit output of task because 
needsTaskCommit=false: attempt_20190328160411_0004_m_000113_343}}

> Flaky missing file parts when writing to Ceph without error
> ---
>
> Key: SPARK-27098
> URL: https://issues.apache.org/jira/browse/SPARK-27098
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Attachments: sanitized_stdout_1.txt
>
>
> https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233
> Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
> occasionally a file part will be missing; e.g. part 3 here:
> ```
> > aws s3 ls my-bucket/folder/
> 2019-02-28 13:07:21  0 _SUCCESS
> 2019-02-28 13:06:58   79428651 
> part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:06:59   79586172 
> part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:00   79561910 
> part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:01   79192617 
> part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:07   79364413 
> part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:08   79623254 
> part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79445030 
> part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79474923 
> part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:11   79477310 
> part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:12   79331453 
> part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79567600 
> part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79388012 
> part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:14   79308387 
> part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:15   79455483 
> part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:17   79512342 
> part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79403307 
> part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79617769 
> part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:19   79333534 
> part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:20   79543324 
> part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> ```
> However, the write succeeds and leaves a _SUCCESS file.
> This can be caught by additionally checking afterward whether the number of 
> written file parts agrees with the number of partitions, but Spark should at 
> least fail on its own and leave a meaningful stack trace in this case.






[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-03-28 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804294#comment-16804294
 ] 

Martin Loncaric commented on SPARK-27098:
-

An INFO of this format also appears for each missing file: {{19/03/28 16:07:24 
INFO SparkHadoopMapRedUtil: No need to commit output of task because 
needsTaskCommit=false: attempt_20190328160411_0004_m_000113_343}}

> Flaky missing file parts when writing to Ceph without error
> ---
>
> Key: SPARK-27098
> URL: https://issues.apache.org/jira/browse/SPARK-27098
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Attachments: sanitized_stdout_1.txt
>
>
> https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233
> Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
> occasionally a file part will be missing; e.g. part 3 here:
> ```
> > aws s3 ls my-bucket/folder/
> 2019-02-28 13:07:21  0 _SUCCESS
> 2019-02-28 13:06:58   79428651 
> part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:06:59   79586172 
> part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:00   79561910 
> part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:01   79192617 
> part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:07   79364413 
> part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:08   79623254 
> part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79445030 
> part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79474923 
> part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:11   79477310 
> part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:12   79331453 
> part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79567600 
> part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79388012 
> part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:14   79308387 
> part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:15   79455483 
> part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:17   79512342 
> part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79403307 
> part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79617769 
> part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:19   79333534 
> part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:20   79543324 
> part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> ```
> However, the write succeeds and leaves a _SUCCESS file.
> This can be caught by additionally checking afterward whether the number of 
> written file parts agrees with the number of partitions, but Spark should at 
> least fail on its own and leave a meaningful stack trace in this case.






[jira] [Comment Edited] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-03-14 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16793082#comment-16793082
 ] 

Martin Loncaric edited comment on SPARK-27098 at 3/14/19 9:28 PM:
--

[~ste...@apache.org] Does this make more sense to you? This seems to suggest a 
bug in either Spark or Hadoop, but do you have a more specific idea of where to 
look?


was (Author: mwlon):
[~ste...@apache.org] Does this make more sense to you? This seems to suggest a 
bug in either Spark or Hadoop, but do you have a better idea of where to look?

> Flaky missing file parts when writing to Ceph without error
> ---
>
> Key: SPARK-27098
> URL: https://issues.apache.org/jira/browse/SPARK-27098
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Attachments: sanitized_stdout_1.txt
>
>
> https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233
> Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
> occasionally a file part will be missing; e.g. part 3 here:
> ```
> > aws s3 ls my-bucket/folder/
> 2019-02-28 13:07:21  0 _SUCCESS
> 2019-02-28 13:06:58   79428651 
> part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:06:59   79586172 
> part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:00   79561910 
> part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:01   79192617 
> part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:07   79364413 
> part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:08   79623254 
> part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79445030 
> part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79474923 
> part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:11   79477310 
> part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:12   79331453 
> part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79567600 
> part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79388012 
> part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:14   79308387 
> part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:15   79455483 
> part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:17   79512342 
> part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79403307 
> part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79617769 
> part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:19   79333534 
> part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:20   79543324 
> part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> ```
> However, the write succeeds and leaves a _SUCCESS file.
> This can be caught by additionally checking afterward whether the number of 
> written file parts agrees with the number of partitions, but Spark should at 
> least fail on its own and leave a meaningful stack trace in this case.






[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-03-14 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16793082#comment-16793082
 ] 

Martin Loncaric commented on SPARK-27098:
-

[~ste...@apache.org] Does this make more sense to you? This seems to suggest a 
bug in either Spark or Hadoop, but do you have a better idea of where to look?

> Flaky missing file parts when writing to Ceph without error
> ---
>
> Key: SPARK-27098
> URL: https://issues.apache.org/jira/browse/SPARK-27098
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Attachments: sanitized_stdout_1.txt
>
>
> https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233
> Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
> occasionally a file part will be missing; e.g. part 3 here:
> ```
> > aws s3 ls my-bucket/folder/
> 2019-02-28 13:07:21  0 _SUCCESS
> 2019-02-28 13:06:58   79428651 
> part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:06:59   79586172 
> part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:00   79561910 
> part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:01   79192617 
> part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:07   79364413 
> part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:08   79623254 
> part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79445030 
> part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79474923 
> part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:11   79477310 
> part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:12   79331453 
> part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79567600 
> part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79388012 
> part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:14   79308387 
> part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:15   79455483 
> part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:17   79512342 
> part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79403307 
> part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79617769 
> part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:19   79333534 
> part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:20   79543324 
> part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> ```
> However, the write succeeds and leaves a _SUCCESS file.
> This can be caught by additionally checking afterward whether the number of 
> written file parts agrees with the number of partitions, but Spark should at 
> least fail on its own and leave a meaningful stack trace in this case.






[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-03-14 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16793080#comment-16793080
 ] 

Martin Loncaric commented on SPARK-27098:
-

I've gotten the debug logs for (1.), but can't make much of them. In this case, 
`part-0-` was missing:

{{Exception in thread "main" java.lang.AssertionError: assertion failed: 
Expected to write dataframe with 20 partitions in s3a://my-bucket/my_folder but 
instead found 19 written parts!
  1552587026347 82681618 
part-1-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587027399 82631123 
part-2-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587028592 82513038 
part-3-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587029544 82325322 
part-4-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587030573 82497917 
part-5-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587031590 82736624 
part-6-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587032449 82573267 
part-7-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587033351 82590538 
part-8-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587034582 82617979 
part-9-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587035817 82430474 
part-00010-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587036808 82688230 
part-00011-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587037744 8252 
part-00012-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587039017 82434976 
part-00013-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587039919 82535772 
part-00014-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587040884 82612890 
part-00015-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587041898 82535110 
part-00016-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587042829 82735449 
part-00017-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587043744 82460648 
part-00018-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  1552587044641 82658185 
part-00019-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
  at scala.Predef$.assert(Predef.scala:170)}}

Looking at stdout for the driver, I find that there is absolutely no mention of 
part-0, but the other parts (e.g. part-1) have various logs, including 
the "rename path" ones you mentioned, like so:

{{2019-03-14 18:10:26 DEBUG S3AFileSystem:449 - Rename path 
s3a://my-bucket/my/folder/_temporary/0/task_20190314180906_0016_m_01/part-1-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
 to 
s3a://my-bucket/my/folder/part-1-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet}}

I have attached all the debug output related to part-1 here. As mentioned, 
there is nothing for the missing part-0 (in other runs, a different part was 
missing, so there is nothing special about 0, just coincidence). 

[^sanitized_stdout_1.txt] 

> Flaky missing file parts when writing to Ceph without error
> ---
>
> Key: SPARK-27098
> URL: https://issues.apache.org/jira/browse/SPARK-27098
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Attachments: sanitized_stdout_1.txt
>
>
> https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233
> Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
> occasionally a file part will be missing; e.g. part 3 here:
> ```
> > aws s3 ls my-bucket/folder/
> 2019-02-28 13:07:21  0 _SUCCESS
> 2019-02-28 13:06:58   79428651 
> part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:06:59   79586172 
> part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:00   79561910 
> part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:01   79192617 
> part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:07   79364413 
> part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:08   79623254 
> part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79445030 
> part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79474923 
> part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:11   79477310 
> part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:12   79331453 
> 

[jira] [Updated] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-03-14 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27098:

Attachment: sanitized_stdout_1.txt

> Flaky missing file parts when writing to Ceph without error
> ---
>
> Key: SPARK-27098
> URL: https://issues.apache.org/jira/browse/SPARK-27098
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Attachments: sanitized_stdout_1.txt
>
>
> https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233
> Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
> occasionally a file part will be missing; e.g. part 3 here:
> ```
> > aws s3 ls my-bucket/folder/
> 2019-02-28 13:07:21  0 _SUCCESS
> 2019-02-28 13:06:58   79428651 
> part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:06:59   79586172 
> part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:00   79561910 
> part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:01   79192617 
> part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:07   79364413 
> part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:08   79623254 
> part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79445030 
> part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:10   79474923 
> part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:11   79477310 
> part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:12   79331453 
> part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79567600 
> part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:13   79388012 
> part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:14   79308387 
> part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:15   79455483 
> part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:17   79512342 
> part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79403307 
> part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:18   79617769 
> part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:19   79333534 
> part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> 2019-02-28 13:07:20   79543324 
> part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
> ```
> However, the write succeeds and leaves a _SUCCESS file.
> This can be caught by additionally checking afterward whether the number of 
> written file parts agrees with the number of partitions, but Spark should at 
> least fail on its own and leave a meaningful stack trace in this case.






[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-13 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16792240#comment-16792240
 ] 

Martin Loncaric commented on SPARK-26555:
-

This is an existing issue with Scala: https://github.com/scala/bug/issues/10766

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-13 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16792086#comment-16792086
 ] 

Martin Loncaric edited comment on SPARK-26555 at 3/13/19 8:26 PM:
--

Update: I have confirmed that the issue lies in thread-safety problems in the 
reflection calls made by org.apache.spark.sql.catalyst.ScalaReflection: 
https://stackoverflow.com/questions/55150590/thread-safety-in-scala-reflection-with-type-matching

Investigating whether this can be fixed with a different usage of the reflection 
library, or whether this is a Scala issue.


was (Author: mwlon):
Update: I have been able to replicate this without Spark at all, using snippets 
from org.apache.spark.sql.catalyst.ScalaReflection: 
https://stackoverflow.com/questions/55150590/thread-safety-in-scala-reflection-with-type-matching

Investigating whether this can be fixed with different usage of the reflection 
library, or whether this is a scala issue.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-13 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16792086#comment-16792086
 ] 

Martin Loncaric commented on SPARK-26555:
-

Update: I have been able to replicate this without Spark at all, using snippets 
from org.apache.spark.sql.catalyst.ScalaReflection: 
https://stackoverflow.com/questions/55150590/thread-safety-in-scala-reflection-with-type-matching

Investigating whether this can be fixed with a different usage of the reflection 
library, or whether this is a Scala issue.
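
For context, a minimal sketch of the kind of concurrent type matching involved 
(hypothetical code written for illustration; it is not the snippet from 
ScalaReflection or the linked StackOverflow post):

{code:scala}
import scala.reflect.runtime.universe._

object ReflectionRaceSketch {
  // The <:< check walks lazily initialized runtime-reflection state, which is
  // where the thread-safety problems are suspected to be.
  def classify(tpe: Type): String =
    if (tpe <:< typeOf[Option[_]]) "option" else "other"

  def main(args: Array[String]): Unit = {
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = println(classify(typeOf[Option[Double]]))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
{code}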

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-09 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788773#comment-16788773
 ] 

Martin Loncaric edited comment on SPARK-26555 at 3/9/19 7:04 PM:
-

You can literally try any dataset with {{Option}}s in the schema and replicate 
this issue. For example:

sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", Some("d"), Some(1.0), Some(2.0))
))

I think the code I left is pretty clear - it fails sometimes. Run it once, and 
it may or may not work. I don't run multiple spark-submits in parallel.


was (Author: mwlon):
You can literally try any dataset and replicate this issue. For example,

sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", Some("d"), Some(1.0), 
Some(2.0))
))

I think the code I left is pretty clear - it fails sometimes. Run it once, and 
it may or may not work. I don't run multiple spark-submit's in parallel.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-09 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788773#comment-16788773
 ] 

Martin Loncaric commented on SPARK-26555:
-

You can literally try any dataset and replicate this issue. For example:

sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", Some("d"), Some(1.0), Some(2.0))
))

I think the code I left is pretty clear - it fails sometimes. Run it once, and 
it may or may not work. I don't run multiple spark-submits in parallel.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Created] (SPARK-27098) Flaky missing file parts when writing to Ceph without error

2019-03-07 Thread Martin Loncaric (JIRA)
Martin Loncaric created SPARK-27098:
---

 Summary: Flaky missing file parts when writing to Ceph without 
error
 Key: SPARK-27098
 URL: https://issues.apache.org/jira/browse/SPARK-27098
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.4.0
Reporter: Martin Loncaric


https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233

Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint, 
occasionally a file part will be missing; e.g. part 3 here:

```
> aws s3 ls my-bucket/folder/
2019-02-28 13:07:21  0 _SUCCESS
2019-02-28 13:06:58   79428651 
part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:06:59   79586172 
part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:00   79561910 
part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:01   79192617 
part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:07   79364413 
part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:08   79623254 
part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:10   79445030 
part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:10   79474923 
part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:11   79477310 
part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:12   79331453 
part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:13   79567600 
part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:13   79388012 
part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:14   79308387 
part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:15   79455483 
part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:17   79512342 
part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:18   79403307 
part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:18   79617769 
part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:19   79333534 
part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
2019-02-28 13:07:20   79543324 
part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet
```

However, the write succeeds and leaves a _SUCCESS file.

This can be caught by additionally checking afterward whether the number of 
written file parts agrees with the number of partitions, but Spark should at 
least fail on its own and leave a meaningful stack trace in this case.
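
A minimal sketch of that workaround (the function name, output-path handling, and 
the assumption that the written part count should equal {{df.rdd.getNumPartitions}} 
are illustrative, not taken from this report):

{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch: after a write, count the part files in the output directory and
// compare against the number of partitions that were supposed to be written.
def assertAllPartsWritten(spark: SparkSession, df: DataFrame, output: String): Unit = {
  val expected = df.rdd.getNumPartitions
  val outPath = new Path(output)
  val fs = outPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  val written = fs.listStatus(outPath).count(_.getPath.getName.startsWith("part-"))
  assert(written == expected,
    s"Expected to write dataframe with $expected partitions in $output " +
      s"but instead found $written written parts!")
}
{code}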






[jira] [Commented] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-03-04 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783944#comment-16783944
 ] 

Martin Loncaric commented on SPARK-27015:
-

Created a PR: https://github.com/apache/spark/pull/23967

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Fix For: 2.5.0, 3.0.0
>
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a 
> b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ 
> b\\$c"{noformat}






[jira] [Commented] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-03-04 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783810#comment-16783810
 ] 

Martin Loncaric commented on SPARK-26192:
-

[~dongjoon] Thanks, I will pay more attention to those fields.

However, I believe this is a bug. It violates behavior specified in the 
https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can 
we merge into 2.4.1 as well?

> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Assignee: Martin Loncaric
>Priority: Minor
> Fix For: 3.0.0
>
>
> There is at least one option accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously 
> misnamed, as referenced in the linked JIRA.






[jira] [Comment Edited] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-03-04 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783810#comment-16783810
 ] 

Martin Loncaric edited comment on SPARK-26192 at 3/4/19 9:51 PM:
-

[~dongjoon] Thanks, I will pay more attention to those fields.

However, I believe this is a bug. It violates behavior specified in 
https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can 
we merge into at least 2.4.1 as well?


was (Author: mwlon):
[~dongjoon] Thanks, I will pay more attention to those fields.

However, I believe this is a bug. It violates behavior specified in 
https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can 
we merge into 2.4.1 as well?

> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Assignee: Martin Loncaric
>Priority: Minor
> Fix For: 3.0.0
>
>
> There is at least one option accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously 
> misnamed, as referenced in the linked JIRA.






[jira] [Commented] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes

2019-03-04 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783811#comment-16783811
 ] 

Martin Loncaric commented on SPARK-27014:
-

Sure, will keep that in mind.

> Support removal of jars and Spark binaries from Mesos driver and executor 
> sandboxes
> ---
>
> Key: SPARK-27014
> URL: https://issues.apache.org/jira/browse/SPARK-27014
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 3.0.0
>Reporter: Martin Loncaric
>Priority: Minor
>
> Currently, each Spark application run on Mesos leaves behind at least 500MB 
> of data in sandbox directories, coming from Spark binaries and copied URIs. 
> These can build up as a disk leak, causing major issues on Mesos clusters 
> unless their grace period for sandbox directories is very short.
> Spark should have a feature to delete these (from both driver and executor 
> sandboxes) on teardown.






[jira] [Comment Edited] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-03-04 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783810#comment-16783810
 ] 

Martin Loncaric edited comment on SPARK-26192 at 3/4/19 9:49 PM:
-

[~dongjoon] Thanks, I will pay more attention to those fields.

However, I believe this is a bug. It violates behavior specified in 
https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can 
we merge into 2.4.1 as well?


was (Author: mwlon):
[~dongjoon] Thanks, I will pay more attention to those fields.

However, I believe this is a bug. It violates behavior specified in the 
https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can 
we merge into 2.4.1 as well?

> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Assignee: Martin Loncaric
>Priority: Minor
> Fix For: 3.0.0
>
>
> There is at least one option accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously 
> misnamed, as referenced in the linked JIRA.






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-03-03 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Affects Version/s: (was: 2.5.0)
   (was: 3.0.0)
   2.3.3
   2.4.0

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a 
> b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ 
> b\\$c"{noformat}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-03-03 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Fix Version/s: 3.0.0
   2.5.0

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Fix For: 2.5.0, 3.0.0
>
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a 
> b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ 
> b\\$c"{noformat}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-03-03 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Affects Version/s: (was: 2.3.3)
   (was: 2.4.0)
   3.0.0
   2.5.0

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.5.0, 3.0.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a 
> b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ 
> b\\$c"{noformat}






[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-03 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782821#comment-16782821
 ] 

Martin Loncaric commented on SPARK-26555:
-

Yes - when I take away any randomness and use the same dataset every time (say, 
with Some(something) for each optional value), I still get this issue.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}
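
One mitigation worth noting (a sketch, assuming the failures really do come from concurrent encoder derivation via Scala reflection, as the stack traces suggest): derive the implicit {{Encoder}} once, up front, on a single thread, and reuse it from the submitted tasks instead of letting each {{createDataset}} call re-derive it.

{code:scala}
// Workaround sketch only: build the Encoder once (single-threaded), then share it.
// Reusing it avoids re-running the reflective derivation inside each task.
import org.apache.spark.sql.{Encoder, Encoders}

object SharedEncoders {
  // Derived once on the main thread, before any tasks are submitted.
  // Main.MyClass refers to the case class from the repro above.
  implicit val myClassEncoder: Encoder[Main.MyClass] = Encoders.product[Main.MyClass]
}

// inside each Runnable, instead of relying on sparkSession.implicits._:
//   sparkSession.createDataset(Seq(MyClass(new Timestamp(1L), "b", "c", d, e, f)))(
//     SharedEncoders.myClassEncoder)
{code}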






[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-03 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782821#comment-16782821
 ] 

Martin Loncaric edited comment on SPARK-26555 at 3/3/19 6:56 PM:
-

Yes - when I take away any randomness and use the same dataset every time (say, 
with Some(something) for each optional value), I still get this issue.

I've run this code in a couple different environments and obtained the same 
result, so you should be able to verify this as well.


was (Author: mwlon):
Yes - when I take away any randomness and use the same dataset every time (say, 
with Some(something) for each optional value), I still get this issue.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-01 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782258#comment-16782258
 ] 

Martin Loncaric commented on SPARK-26555:
-

I can also replicate with different schemas containing Option.

When I remove all Option columns from the schema, the sporadic failure goes 
away. This also never happens when I remove the concurrency.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-01 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782239#comment-16782239
 ] 

Martin Loncaric edited comment on SPARK-26555 at 3/2/19 1:25 AM:
-

I was able to replicate it both with all rows having `Some()` in every optional 
column and with all rows having `None` in every optional column.


was (Author: mwlon):
I was able to replicate with both all rows as `Some()` and all rows as `None`.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-01 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782239#comment-16782239
 ] 

Martin Loncaric commented on SPARK-26555:
-

I was able to replicate with both all rows as `Some()` and all rows as `None`.

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-03-01 Thread Martin Loncaric (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782185#comment-16782185
 ] 

Martin Loncaric commented on SPARK-26555:
-

Will try it out and report back

> Thread safety issue causes createDataset to fail with misleading errors
> ---
>
> Key: SPARK-26555
> URL: https://issues.apache.org/jira/browse/SPARK-26555
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
> import org.apache.spark.sql.SparkSession
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
> object Main {
>   def main(args: Array[String]): Unit = {
> val sparkSession = SparkSession.builder
>   .getOrCreate()
> import sparkSession.implicits._
> val executor = Executors.newFixedThreadPool(1)
> try {
>   implicit val xc: ExecutionContext = 
> ExecutionContext.fromExecutorService(executor)
>   val futures = new ListBuffer[Future[_]]()
>   for (i <- 1 to 3) {
> futures += executor.submit(new Runnable {
>   override def run(): Unit = {
> val d = if (Random.nextInt(2) == 0) Some("d value") else None
> val e = if (Random.nextInt(2) == 0) Some(5.0) else None
> val f = if (Random.nextInt(2) == 0) Some(6.0) else None
> println("DEBUG", d, e, f)
> sparkSession.createDataset(Seq(
>   MyClass(new Timestamp(1L), "b", "c", d, e, f)
> ))
>   }
> })
>   }
>   futures.foreach(_.get())
> } finally {
>   println("SHUTDOWN")
>   executor.shutdown()
>   sparkSession.stop()
> }
>   }
>   case class MyClass(
> a: Timestamp,
> b: String,
> c: String,
> d: Option[String],
> e: Option[Double],
> f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class 
> scala.reflect.internal.Types$ClassArgsTypeRef)
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}
> or
> {code}Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type 
> scala.Option[scala.Double] is not supported
>   at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\\ b\\ c"}}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a\\\ b\\ c"}}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b c"{noformat}

fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ 
c"{noformat}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ 
c"{noformat}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b 
> c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ 
> c"{noformat}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{nolink:} b\\ c"}}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a{nolink:} b\\ 
> c"}}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Issue Type: Bug  (was: New Feature)

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a 
> b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ 
> b\\$c"{noformat}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b$c"{noformat}

fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ 
b\\$c"{noformat}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b c"{noformat}

fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ 
c"{noformat}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a 
> b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ 
> b\\$c"{noformat}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{noformat}\\{noformat} 
b\\ c"}}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a#092; b\\ c"}}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 
> "a{noformat}\\{noformat} b\\ c"}}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ 
c"{noformat}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{noformat}\\{noformat} 
b\\ c"}}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ 
> c"{noformat}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a#092; b\\ c"}}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{\\} b\\ c"}}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a#092; b\\ c"}}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{\\} b\\ c"}}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{nolink:} b\\ c"}}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a{\\} b\\ c"}}






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\\ b\\ c"}}


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}






[jira] [Updated] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27014:

Fix Version/s: 3.0.0
   2.5.0

> Support removal of jars and Spark binaries from Mesos driver and executor 
> sandboxes
> ---
>
> Key: SPARK-27014
> URL: https://issues.apache.org/jira/browse/SPARK-27014
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.5.0
>Reporter: Martin Loncaric
>Priority: Major
> Fix For: 2.5.0, 3.0.0
>
>
> Currently, each Spark application run on Mesos leaves behind at least 500MB 
> of data in sandbox directories, coming from Spark binaries and copied URIs. 
> These can build up as a disk leak, causing major issues on Mesos clusters 
> unless their grace period for sandbox directories is very short.
> Spark should have a feature to delete these (from both driver and executor 
> sandboxes) on teardown.






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

`spark-submit --master mesos://url:port my.jar --arg1 "a b c"`

fails, and instead must be submitted as

`spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"`

  was:
Arguments sent to the dispatcher must be escaped; for instance,
```
spark-submit --master mesos://url:port my.jar --arg1 "a b c"
```
fails, and instead must be submitted as
```
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"
```


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> `spark-submit --master mesos://url:port my.jar --arg1 "a b c"`
> fails, and instead must be submitted as
> `spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"`






[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27015:

Description: 
Arguments sent to the dispatcher must be escaped; for instance,

{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}

fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}

  was:
Arguments sent to the dispatcher must be escaped; for instance,

`spark-submit --master mesos://url:port my.jar --arg1 "a b c"`

fails, and instead must be submitted as

`spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"`


> spark-submit does not properly escape arguments sent to Mesos dispatcher
> 
>
> Key: SPARK-27015
> URL: https://issues.apache.org/jira/browse/SPARK-27015
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.3, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}






[jira] [Created] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher

2019-02-28 Thread Martin Loncaric (JIRA)
Martin Loncaric created SPARK-27015:
---

 Summary: spark-submit does not properly escape arguments sent to 
Mesos dispatcher
 Key: SPARK-27015
 URL: https://issues.apache.org/jira/browse/SPARK-27015
 Project: Spark
  Issue Type: New Feature
  Components: Mesos
Affects Versions: 2.4.0, 2.3.3
Reporter: Martin Loncaric


Arguments sent to the dispatcher must be escaped; for instance,
```
spark-submit --master mesos://url:port my.jar --arg1 "a b c"
```
fails, and instead must be submitted as
```
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"
```






[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26192:

Fix Version/s: 3.0.0
   2.4.1
   2.3.4

> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
> Fix For: 2.3.4, 2.4.1, 3.0.0
>
>
> There is at least one option accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously 
> misnamed, as referenced in the linked JIRA.






[jira] [Updated] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-27014:

Affects Version/s: (was: 2.4.0)
   2.5.0

> Support removal of jars and Spark binaries from Mesos driver and executor 
> sandboxes
> ---
>
> Key: SPARK-27014
> URL: https://issues.apache.org/jira/browse/SPARK-27014
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.5.0
>Reporter: Martin Loncaric
>Priority: Major
>
> Currently, each Spark application run on Mesos leaves behind at least 500MB 
> of data in sandbox directories, coming from Spark binaries and copied URIs. 
> These can build up as a disk leak, causing major issues on Mesos clusters 
> unless their grace period for sandbox directories is very short.
> Spark should have a feature to delete these (from both driver and executor 
> sandboxes) on teardown.






[jira] [Created] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes

2019-02-28 Thread Martin Loncaric (JIRA)
Martin Loncaric created SPARK-27014:
---

 Summary: Support removal of jars and Spark binaries from Mesos 
driver and executor sandboxes
 Key: SPARK-27014
 URL: https://issues.apache.org/jira/browse/SPARK-27014
 Project: Spark
  Issue Type: New Feature
  Components: Mesos
Affects Versions: 2.4.0
Reporter: Martin Loncaric


Currently, each Spark application run on Mesos leaves behind at least 500MB of 
data in sandbox directories, coming from Spark binaries and copied URIs. These 
can build up as a disk leak, causing major issues on Mesos clusters unless 
their grace period for sandbox directories is very short.

Spark should have a feature to delete these (from both driver and executor 
sandboxes) on teardown.






[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-02-28 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26192:

Description: 
There is at least one option accessed in MesosClusterScheduler that should come 
from the submission's configuration instead of the dispatcher's:

spark.mesos.fetcherCache.enable

Coincidentally, the spark.mesos.fetcherCache.enable option was previously 
misnamed, as referenced in the linked JIRA.

  was:
There are at least two options accessed in MesosClusterScheduler that should 
come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetcherCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of 
the form "Driver for (MainClass)" rather than the configured application name, 
and Spark drivers never cache files unless the caching setting is specified on 
dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option 
was misnamed, as referenced in the linked JIRA.


> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> There is at least one option accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously 
> misnamed, as referenced in the linked JIRA.






[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-02-27 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26192:

Description: 
There are at least two options accessed in MesosClusterScheduler that should 
come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetcherCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of 
the form "Driver for (MainClass)" rather than the configured application name, 
and Spark drivers never cache files unless the caching setting is specified on 
dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option 
was misnamed, as referenced in the linked JIRA.

  was:
There are at least two options accessed in MesosClusterScheduler that should 
come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetchCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of 
the form "Driver for (MainClass)" rather than the configured application name, 
and Spark drivers never cache files unless the caching setting is specified on 
dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option 
was misnamed, as referenced in the linked JIRA.


> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> There are at least two options accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.app.name
> spark.mesos.fetcherCache.enable
> This means that all Mesos tasks for Spark drivers have uninformative names of 
> the form "Driver for (MainClass)" rather than the configured application 
> name, and Spark drivers never cache files unless the caching setting is 
> specified on dispatcher as well. Coincidentally, the 
> spark.mesos.fetchCache.enable option was misnamed, as referenced in the 
> linked JIRA.






[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2019-02-07 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26192:

Description: 
There are at least two options accessed in MesosClusterScheduler that should 
come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetchCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of 
the form "Driver for (MainClass)" rather than the configured application name, 
and Spark drivers never cache files unless the caching setting is specified on 
dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option 
was misnamed, as referenced in the linked JIRA.

  was:
There are at least two options accessed in MesosClusterScheduler that should 
come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetchCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of 
the form "Driver for (MainClass)" rather than the configured application name, 
and Spark drivers never cache files. Coincidentally, the 
spark.mesos.fetchCache.enable option is misnamed, as referenced in the linked 
JIRA.


> MesosClusterScheduler reads options from dispatcher conf instead of 
> submission conf
> ---
>
> Key: SPARK-26192
> URL: https://issues.apache.org/jira/browse/SPARK-26192
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>Reporter: Martin Loncaric
>Priority: Major
>
> There are at least two options accessed in MesosClusterScheduler that should 
> come from the submission's configuration instead of the dispatcher's:
> spark.app.name
> spark.mesos.fetchCache.enable
> This means that all Mesos tasks for Spark drivers have uninformative names of 
> the form "Driver for (MainClass)" rather than the configured application 
> name, and Spark drivers never cache files unless the caching setting is 
> specified on dispatcher as well. Coincidentally, the 
> spark.mesos.fetchCache.enable option was misnamed, as referenced in the 
> linked JIRA.






[jira] [Created] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-01-06 Thread Martin Loncaric (JIRA)
Martin Loncaric created SPARK-26555:
---

 Summary: Thread safety issue causes createDataset to fail with 
misleading errors
 Key: SPARK-26555
 URL: https://issues.apache.org/jira/browse/SPARK-26555
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Martin Loncaric


This can be replicated (~2% of the time) with

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
  .getOrCreate()
import sparkSession.implicits._

val executor = Executors.newFixedThreadPool(1)
try {
  implicit val xc: ExecutionContext = 
ExecutionContext.fromExecutorService(executor)
  val futures = new ListBuffer[Future[_]]()

  for (i <- 1 to 3) {
futures += executor.submit(new Runnable {
  override def run(): Unit = {
val d = if (Random.nextInt(2) == 0) Some("d value") else None
val e = if (Random.nextInt(2) == 0) Some(5.0) else None
val f = if (Random.nextInt(2) == 0) Some(6.0) else None
println("DEBUG", d, e, f)
sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", d, e, f)
))
  }
})
  }

  futures.foreach(_.get())
} finally {
  println("SHUTDOWN")
  executor.shutdown()
  sparkSession.stop()
}
  }

  case class MyClass(
a: Timestamp,
b: String,
c: String,
d: Option[String],
e: Option[Double],
f: Option[Double]
  )
}
{code}

causing a variety of possible errors, such as

{code}
Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)
{code}

or

{code}
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
{code}
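
One possible user-side mitigation, assuming the race is in the reflection-based 
encoder derivation rather than in Dataset construction itself: derive the 
encoder once on the main thread and pass it explicitly, so the pool threads 
never run ScalaReflection concurrently. This is only a hedged sketch of a 
workaround, not a confirmed fix:

{code:scala}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

object EncoderWorkaround {
  // Same case class as in the reproduction above.
  case class MyClass(
    a: java.sql.Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )

  // Derive the encoder eagerly on the main thread, before any tasks are
  // submitted. Encoders.product[T] walks the same reflection path as the
  // implicit derivation, so doing it once here keeps ScalaReflection out of
  // the thread pool.
  val myClassEncoder: Encoder[MyClass] = Encoders.product[MyClass]

  // Worker threads reuse the pre-built encoder instead of deriving a new one.
  def buildDataset(spark: SparkSession, rows: Seq[MyClass]): Dataset[MyClass] =
    spark.createDataset(rows)(myClassEncoder)
}
{code}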






[jira] [Updated] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-01-06 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26555:

Description: 
This can be replicated (~2% of the time) with

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
  .getOrCreate()
import sparkSession.implicits._

val executor = Executors.newFixedThreadPool(1)
try {
  implicit val xc: ExecutionContext = 
ExecutionContext.fromExecutorService(executor)
  val futures = new ListBuffer[Future[_]]()

  for (i <- 1 to 3) {
futures += executor.submit(new Runnable {
  override def run(): Unit = {
val d = if (Random.nextInt(2) == 0) Some("d value") else None
val e = if (Random.nextInt(2) == 0) Some(5.0) else None
val f = if (Random.nextInt(2) == 0) Some(6.0) else None
println("DEBUG", d, e, f)
sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", d, e, f)
))
  }
})
  }

  futures.foreach(_.get())
} finally {
  println("SHUTDOWN")
  executor.shutdown()
  sparkSession.stop()
}
  }

  case class MyClass(
a: Timestamp,
b: String,
c: String,
d: Option[String],
e: Option[Double],
f: Option[Double]
  )
}
{code}

So it will usually come up at least once during a repeated run such as

{code:bash}
for i in $(seq 1 200); do
  echo $i
  spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
done
{code}

causing a variety of possible errors, such as

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}

or

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.UnsupportedOperationException: Schema for type 
scala.Option[scala.Double] is not supported
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type 
scala.Option[scala.Double] is not supported
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}

  was:
This can be replicated (~2% of the time) with

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
  .getOrCreate()
import sparkSession.implicits._

val executor = Executors.newFixedThreadPool(1)
try {
  implicit val xc: ExecutionContext = 
ExecutionContext.fromExecutorService(executor)
  val futures = new ListBuffer[Future[_]]()

  for (i <- 1 to 3) {
futures += executor.submit(new Runnable {
  override def run(): Unit = {
val d = if (Random.nextInt(2) == 0) Some("d value") else None
val e = if (Random.nextInt(2) == 0) Some(5.0) else None
val f = if (Random.nextInt(2) == 0) Some(6.0) else None
println("DEBUG", d, e, f)
sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", d, e, f)
))
  }
})
  }

  futures.foreach(_.get())
} finally {
  println("SHUTDOWN")
  executor.shutdown()
  sparkSession.stop()
}
  }

  case class MyClass(
a: Timestamp,
b: String,
c: String,
d: Option[String],
e: Option[Double],
f: Option[Double]
  )
}
{code}

{code:bash}
for i in $(seq 1 200); do
  echo $i
  spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
done
{code}

causing a variety of possible errors, such as

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at 

[jira] [Updated] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-01-06 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26555:

Description: 
This can be replicated (~2% of the time) with

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
  .getOrCreate()
import sparkSession.implicits._

val executor = Executors.newFixedThreadPool(1)
try {
  implicit val xc: ExecutionContext = 
ExecutionContext.fromExecutorService(executor)
  val futures = new ListBuffer[Future[_]]()

  for (i <- 1 to 3) {
futures += executor.submit(new Runnable {
  override def run(): Unit = {
val d = if (Random.nextInt(2) == 0) Some("d value") else None
val e = if (Random.nextInt(2) == 0) Some(5.0) else None
val f = if (Random.nextInt(2) == 0) Some(6.0) else None
println("DEBUG", d, e, f)
sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", d, e, f)
))
  }
})
  }

  futures.foreach(_.get())
} finally {
  println("SHUTDOWN")
  executor.shutdown()
  sparkSession.stop()
}
  }

  case class MyClass(
a: Timestamp,
b: String,
c: String,
d: Option[String],
e: Option[Double],
f: Option[Double]
  )
}
{code}

{code:bash}
for i in $(seq 1 200); do
  echo $i
  spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
done
{code}

causing a variety of possible errors, such as

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}

or

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.UnsupportedOperationException: Schema for type 
scala.Option[scala.Double] is not supported
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type 
scala.Option[scala.Double] is not supported
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}

  was:
This can be replicated (~2% of the time) with

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
  .getOrCreate()
import sparkSession.implicits._

val executor = Executors.newFixedThreadPool(1)
try {
  implicit val xc: ExecutionContext = 
ExecutionContext.fromExecutorService(executor)
  val futures = new ListBuffer[Future[_]]()

  for (i <- 1 to 3) {
futures += executor.submit(new Runnable {
  override def run(): Unit = {
val d = if (Random.nextInt(2) == 0) Some("d value") else None
val e = if (Random.nextInt(2) == 0) Some(5.0) else None
val f = if (Random.nextInt(2) == 0) Some(6.0) else None
println("DEBUG", d, e, f)
sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", d, e, f)
))
  }
})
  }

  futures.foreach(_.get())
} finally {
  println("SHUTDOWN")
  executor.shutdown()
  sparkSession.stop()
}
  }

  case class MyClass(
a: Timestamp,
b: String,
c: String,
d: Option[String],
e: Option[Double],
f: Option[Double]
  )
}
{code}

causing a variety of possible errors, such as

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}

or

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 

[jira] [Updated] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors

2019-01-06 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26555:

Description: 
This can be replicated (~2% of the time) with

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
  .getOrCreate()
import sparkSession.implicits._

val executor = Executors.newFixedThreadPool(1)
try {
  implicit val xc: ExecutionContext = 
ExecutionContext.fromExecutorService(executor)
  val futures = new ListBuffer[Future[_]]()

  for (i <- 1 to 3) {
futures += executor.submit(new Runnable {
  override def run(): Unit = {
val d = if (Random.nextInt(2) == 0) Some("d value") else None
val e = if (Random.nextInt(2) == 0) Some(5.0) else None
val f = if (Random.nextInt(2) == 0) Some(6.0) else None
println("DEBUG", d, e, f)
sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", d, e, f)
))
  }
})
  }

  futures.foreach(_.get())
} finally {
  println("SHUTDOWN")
  executor.shutdown()
  sparkSession.stop()
}
  }

  case class MyClass(
a: Timestamp,
b: String,
c: String,
d: Option[String],
e: Option[Double],
f: Option[Double]
  )
}
{code}

causing a variety of possible errors, such as

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code}

or

{code}Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.UnsupportedOperationException: Schema for type 
scala.Option[scala.Double] is not supported
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type 
scala.Option[scala.Double] is not supported
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code}

  was:
This can be replicated (~2% of the time) with

{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
  .getOrCreate()
import sparkSession.implicits._

val executor = Executors.newFixedThreadPool(1)
try {
  implicit val xc: ExecutionContext = 
ExecutionContext.fromExecutorService(executor)
  val futures = new ListBuffer[Future[_]]()

  for (i <- 1 to 3) {
futures += executor.submit(new Runnable {
  override def run(): Unit = {
val d = if (Random.nextInt(2) == 0) Some("d value") else None
val e = if (Random.nextInt(2) == 0) Some(5.0) else None
val f = if (Random.nextInt(2) == 0) Some(6.0) else None
println("DEBUG", d, e, f)
sparkSession.createDataset(Seq(
  MyClass(new Timestamp(1L), "b", "c", d, e, f)
))
  }
})
  }

  futures.foreach(_.get())
} finally {
  println("SHUTDOWN")
  executor.shutdown()
  sparkSession.stop()
}
  }

  case class MyClass(
a: Timestamp,
b: String,
c: String,
d: Option[String],
e: Option[Double],
f: Option[Double]
  )
}
{code}

causing a variety of possible errors, such as

{{Exception in thread "main" java.util.concurrent.ExecutionException: 
scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class 
scala.reflect.internal.Types$ClassArgsTypeRef)
at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)}}

or

{{Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.UnsupportedOperationException: Schema for type 
scala.Option[scala.Double] is not supported
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: 

[jira] [Created] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf

2018-11-27 Thread Martin Loncaric (JIRA)
Martin Loncaric created SPARK-26192:
---

 Summary: MesosClusterScheduler reads options from dispatcher conf 
instead of submission conf
 Key: SPARK-26192
 URL: https://issues.apache.org/jira/browse/SPARK-26192
 Project: Spark
  Issue Type: Bug
  Components: Mesos
Affects Versions: 2.4.0, 2.3.2, 2.3.1, 2.3.0
Reporter: Martin Loncaric


There are at least two options accessed in MesosClusterScheduler that should 
come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetchCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of 
the form "Driver for (MainClass)" rather than the configured application name, 
and Spark drivers never cache files. Coincidentally, the 
spark.mesos.fetchCache.enable option is misnamed, as referenced in the linked 
JIRA.






[jira] [Updated] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler

2018-11-15 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26082:

Description: 
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
{quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
(example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos 
Fetcher Cache
{quote}

Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver):
{{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}

Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors):
{{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}

This naming discrepancy dates back to version 2.0.0 
([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]).

This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the 
Mesos cache will be used only for executors, and not for drivers.

IMPACT:
Not caching these driver files (typically at least the Spark binaries, the 
application jar, and additional dependencies) adds considerable network traffic 
and startup time when Spark applications are frequently submitted to a Mesos 
cluster. Additionally, with the cache off, extracted archives like 
{{spark-x.x.x-bin-*.tgz}} are copied into the sandbox and left there (rather 
than being extracted directly without an extra copy), which can considerably 
increase disk usage. Users can currently work around this by specifying the 
{{spark.mesos.fetchCache.enable}} option, but that option should at least be 
mentioned in the documentation.

SUGGESTED FIX:
Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2.0 - 2.4, 
and update {{MesosClusterScheduler.scala}} to use {{spark.mesos.fetcherCache.enable}} 
going forward (literally a one-line change).
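
As an illustration of that direction (not the actual patch), a hedged sketch of 
reading the documented key while staying backwards compatible with the misnamed 
one that {{MesosClusterScheduler}} historically used:

{code:scala}
import org.apache.spark.SparkConf

// Prefer the documented key; fall back to the misnamed key so that existing
// dispatcher configurations keep working.
def useFetcherCache(conf: SparkConf): Boolean =
  conf.getOption("spark.mesos.fetcherCache.enable")
    .orElse(conf.getOption("spark.mesos.fetchCache.enable"))
    .exists(_.toBoolean)
{code}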

  was:
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
{quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
(example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos 
Fetcher Cache
{quote}

Currently in {{MesosClusterScheduler.scala}} (which passes parameter to driver):
{{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", 
false)}}

Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos 
caching parameter to executors):
{{private val useFetcherCache = 
conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}

This naming discrepancy dates back to version 2.0.0 
([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]).

This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the 
Mesos cache will be used only for executors, and not for drivers.

IMPACT:
Not caching these driver files (typically including at least spark binaries, 
custom jar, and additional dependencies) adds considerable overhead network 
traffic and startup time when frequently running spark Applications on a Mesos 
cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are 
additionally copied and left in the sandbox with the cache off (rather than 
extracted directly without an extra copy), this can considerably increase disk 
usage. Users CAN currently workaround by specifying the 
{{spark.mesos.fetchCache.enable}} option, but this should at least be specified 
in the documentation.

SUGGESTED FIX:
Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 
2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward (literally 
a one-line change).


> Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler
> ---
>
> Key: SPARK-26082
> URL: https://issues.apache.org/jira/browse/SPARK-26082
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.1.3, 2.2.0, 
> 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.3.2
>Reporter: Martin Loncaric
>Priority: Major
>
> Currently in 
> [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
> {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
> (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the 
> Mesos Fetcher Cache
> {quote}
> Currently in {{MesosClusterScheduler.scala}} (which passes parameter to 
> driver):
> {{private val useFetchCache = 
> conf.getBoolean("spark.mesos.fetchCache.enable", false)}}
> Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos 
> caching parameter to 

[jira] [Updated] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler

2018-11-15 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26082:

Description: 
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
{quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
(example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos 
Fetcher Cache
{quote}

Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver):
{{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}

Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors):
{{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}

This naming discrepancy dates back to version 2.0.0 
([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]).

This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the 
Mesos cache will be used only for executors, and not for drivers.

IMPACT:
Not caching these driver files (typically at least the Spark binaries, the 
application jar, and additional dependencies) adds considerable network traffic 
and startup time when Spark applications are frequently submitted to a Mesos 
cluster. Additionally, with the cache off, extracted archives like 
{{spark-x.x.x-bin-*.tgz}} are copied into the sandbox and left there (rather 
than being extracted directly without an extra copy), which can considerably 
increase disk usage. Users can currently work around this by specifying the 
{{spark.mesos.fetchCache.enable}} option, but that option should at least be 
mentioned in the documentation.

SUGGESTED FIX:
Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 
2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward (literally 
a one-line change).

  was:
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
{quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
(example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos 
Fetcher Cache
{quote}

Currently in {{MesosClusterScheduler.scala}} (which passes parameter to driver):
{{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", 
false)}}

Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos 
caching parameter to executors):
{{private val useFetcherCache = 
conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}

This naming discrepancy dates back to version 2.0.0 
([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]).

This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the 
Mesos cache will be used only for executors, and not for drivers.

IMPACT:
Not caching these driver files (typically including at least spark binaries, 
custom jar, and additional dependencies) adds considerable overhead network 
traffic and startup time when frequently running spark Applications on a Mesos 
cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are 
additionally copied and left in the sandbox with the cache off (rather than 
extracted directly without an extra copy), this can considerably increase disk 
usage. Users CAN currently workaround by specifying the 
{{spark.mesos.fetchCache.enable}} option, but this should at least be specified 
in the documentation.

SUGGESTED FIX:
Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 
2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward.


> Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler
> ---
>
> Key: SPARK-26082
> URL: https://issues.apache.org/jira/browse/SPARK-26082
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.1.3, 2.2.0, 
> 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.3.2
>Reporter: Martin Loncaric
>Priority: Major
>
> Currently in 
> [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
> {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
> (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the 
> Mesos Fetcher Cache
> {quote}
> Currently in {{MesosClusterScheduler.scala}} (which passes parameter to 
> driver):
> {{private val useFetchCache = 
> conf.getBoolean("spark.mesos.fetchCache.enable", false)}}
> Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos 
> caching parameter to executors):
> {{private val useFetcherCache = 
> 

[jira] [Updated] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler

2018-11-15 Thread Martin Loncaric (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-26082:

Description: 
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
{quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
(example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos 
Fetcher Cache
{quote}

Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver):
{{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}

Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors):
{{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}

This naming discrepancy dates back to version 2.0.0 
([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]).

This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the 
Mesos cache will be used only for executors, and not for drivers.

IMPACT:
Not caching these driver files (typically at least the Spark binaries, the 
application jar, and additional dependencies) adds considerable network traffic 
and startup time when Spark applications are frequently submitted to a Mesos 
cluster. Additionally, with the cache off, extracted archives like 
{{spark-x.x.x-bin-*.tgz}} are copied into the sandbox and left there (rather 
than being extracted directly without an extra copy), which can considerably 
increase disk usage. Users can currently work around this by specifying the 
{{spark.mesos.fetchCache.enable}} option, but that option should at least be 
mentioned in the documentation.

SUGGESTED FIX:
Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 
2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward.

  was:
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
{quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
(example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos 
Fetcher Cache
{quote}

Currently in {{MesosClusterScheduler.scala}} (which passes parameter to driver):
{{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", 
false)}}

Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos 
caching parameter to executors):
{{private val useFetcherCache = 
conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}

This naming discrepancy dates back to version 2.0.0 
([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]).

This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the 
Mesos cache will be used only for executors, and not for drivers.

IMPACT:
Not caching these driver files (typically including at least spark binaries, 
custom jar, and additional dependencies) adds considerable network traffic when 
frequently running spark Applications on a Mesos cluster. Additionally, since 
extracted files like {{spark-x.x.x-bin-*.tgz}} are additionally copied and left 
in the sandbox with the cache off (rather than extracted directly without an 
extra copy), this can considerably increase disk usage. Users CAN currently 
workaround by specifying the {{spark.mesos.fetchCache.enable}} option, but this 
should at least be specified in the documentation.

SUGGESTED FIX:
Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 
2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward.


> Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler
> ---
>
> Key: SPARK-26082
> URL: https://issues.apache.org/jira/browse/SPARK-26082
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.1.3, 2.2.0, 
> 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.3.2
>Reporter: Martin Loncaric
>Priority: Major
>
> Currently in 
> [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
> {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
> (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the 
> Mesos Fetcher Cache
> {quote}
> Currently in {{MesosClusterScheduler.scala}} (which passes parameter to 
> driver):
> {{private val useFetchCache = 
> conf.getBoolean("spark.mesos.fetchCache.enable", false)}}
> Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos 
> caching parameter to executors):
> {{private val useFetcherCache = 
> conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}
> This naming 

[jira] [Created] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler

2018-11-15 Thread Martin Loncaric (JIRA)
Martin Loncaric created SPARK-26082:
---

 Summary: Misnaming of spark.mesos.fetch(er)Cache.enable in 
MesosClusterScheduler
 Key: SPARK-26082
 URL: https://issues.apache.org/jira/browse/SPARK-26082
 Project: Spark
  Issue Type: Bug
  Components: Mesos
Affects Versions: 2.3.2, 2.3.1, 2.3.0, 2.2.2, 2.2.1, 2.2.0, 2.1.3, 2.1.2, 
2.1.1, 2.1.0, 2.0.2, 2.0.1, 2.0.0
Reporter: Martin Loncaric


Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]:
{quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs 
(example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos 
Fetcher Cache
{quote}

Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver):
{{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}

Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors):
{{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}

This naming discrepancy dates back to version 2.0.0 
([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]).

This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the 
Mesos cache will be used only for executors, and not for drivers.

IMPACT:
Not caching these driver files (typically at least the Spark binaries, the 
application jar, and additional dependencies) adds considerable network traffic 
when Spark applications are frequently submitted to a Mesos cluster. 
Additionally, with the cache off, extracted archives like 
{{spark-x.x.x-bin-*.tgz}} are copied into the sandbox and left there (rather 
than being extracted directly without an extra copy), which can considerably 
increase disk usage. Users can currently work around this by specifying the 
{{spark.mesos.fetchCache.enable}} option, but that option should at least be 
mentioned in the documentation.

SUGGESTED FIX:
Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 
2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward.


