[jira] [Created] (SPARK-22605) OutputMetrics empty for DataFrame writes

2017-11-24 Thread Jason White (JIRA)
Jason White created SPARK-22605:
---

 Summary: OutputMetrics empty for DataFrame writes
 Key: SPARK-22605
 URL: https://issues.apache.org/jira/browse/SPARK-22605
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Jason White
Priority: Minor


I am trying to use the SparkListener interface to hook up some custom 
monitoring for some of our critical jobs. Among the first metrics I would like 
are an output row count and an output size metric. I'm using PySpark and the 
Py4J interface to implement the listener.

I am able to see the recordsRead and bytesRead metrics via the 
taskEnd.taskMetrics().inputMetrics().recordsRead() and .bytesRead() methods. 
taskEnd.taskMetrics().outputMetrics().recordsWritten() and .bytesWritten() are 
always 0. I see similar output if I use the stageCompleted event instead.
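
For reference, the access pattern is roughly the following (a minimal sketch - 
the Py4J listener wiring itself is elided):
{code}
# Minimal sketch of the metric access described above; assumes a Py4J
# callback-based SparkListener is already registered with the SparkContext.
def on_task_end(task_end):
    metrics = task_end.taskMetrics()
    records_read = metrics.inputMetrics().recordsRead()         # populated
    bytes_read = metrics.inputMetrics().bytesRead()             # populated
    records_written = metrics.outputMetrics().recordsWritten()  # always 0 for df.write
    bytes_written = metrics.outputMetrics().bytesWritten()      # always 0 for df.write
    return records_read, bytes_read, records_written, bytes_written
{code}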

To trigger execution, I am using df.write.parquet(path). If I use 
df.rdd.saveAsTextFile(path) instead, the counts and bytes are correct.

Another clue that this bug lies deeper in Spark SQL: the Spark Application 
Master doesn't show the Output Size / Records column with df.write.parquet or 
df.write.text, but does with df.rdd.saveAsTextFile. Since the Spark Application 
Master also gets its data via the listener interface, this seems related.

There is a related issue, SPARK-21882 
(https://issues.apache.org/jira/browse/SPARK-21882), but I believe this to be a 
distinct problem.






[jira] [Commented] (SPARK-19950) nullable ignored when df.load() is executed for file-based data source

2017-03-21 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934719#comment-15934719
 ] 

Jason White commented on SPARK-19950:
-

Without a way to read using the nullability as it exists on disk, we end up 
doing:
{code}
df = spark.read.parquet(path)
return spark.createDataFrame(df.rdd, schema)
{code}

This is obviously not desirable. We would much rather rely on the schema as 
defined by the file format (Parquet in our case), or on a user-supplied schema 
- preferably both.

> nullable ignored when df.load() is executed for file-based data source
> --
>
> Key: SPARK-19950
> URL: https://issues.apache.org/jira/browse/SPARK-19950
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kazuaki Ishizaki
>
> This problem is reported in [Databricks 
> forum|https://forums.databricks.com/questions/7123/nullable-seemingly-ignored-when-reading-parquet.html].
> When we execute the following code, a schema for "id" in {{dfRead}} has 
> {{nullable = true}}. It should be {{nullable = false}}.
> {code:java}
> val field = "id"
> val df = spark.range(0, 5, 1, 1).toDF(field)
> val fmt = "parquet"
> val path = "/tmp/parquet"
> val schema = StructType(Seq(StructField(field, LongType, false)))
> df.write.format(fmt).mode("overwrite").save(path)
> val dfRead = spark.read.format(fmt).schema(schema).load(path)
> dfRead.printSchema
> {code}






[jira] [Created] (SPARK-19561) Pyspark Dataframes don't allow timestamps near epoch

2017-02-11 Thread Jason White (JIRA)
Jason White created SPARK-19561:
---

 Summary: Pyspark Dataframes don't allow timestamps near epoch
 Key: SPARK-19561
 URL: https://issues.apache.org/jira/browse/SPARK-19561
 Project: Spark
  Issue Type: Bug
  Components: PySpark, SQL
Affects Versions: 2.1.0, 2.0.1
Reporter: Jason White


PySpark does not allow timestamps at or near the epoch to be created in a 
DataFrame. Related issue: https://issues.apache.org/jira/browse/SPARK-19299

TimestampType.toInternal converts a datetime object to a number representing 
microseconds since the epoch. For all times more than 2148 seconds before or 
after 1970-01-01T00:00:00+00:00, this number's magnitude exceeds 2^31 and Py4J 
automatically serializes it as a long.

However, for times within this range (~35 minutes before or after the epoch), 
Py4J serializes it as an int. When creating the object on the Scala side, ints 
are not recognized and the value goes to null. This leads to null values in 
non-nullable fields, and corrupted Parquet files.

The solution is trivial - force TimestampType.toInternal to always return a 
long.
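
A minimal sketch of that change, assuming Python 2's int/long split and the 
current body of toInternal (the actual patch may differ):
{code}
import calendar
import time

# Sketch only: always return a Python long so Py4J serializes a Java long,
# never an int, even for timestamps within ~35 minutes of the epoch.
def toInternal(self, dt):
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return long(int(seconds) * 1000000 + dt.microsecond)
{code}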






[jira] [Comment Edited] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-20 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832195#comment-15832195
 ] 

Jason White edited comment on SPARK-19299 at 1/20/17 6:14 PM:
--

These seem like two or three separate issues:
- A Python long doesn't fit into a Scala long, resulting in null even in a 
non-nullable field
- A Python datetime near the epoch doesn't fit (?) into a Scala timestamp, 
resulting in null even in a non-nullable field
- DataFrame serialization to disk assumes, but doesn't verify, that no nulls 
exist in non-nullable fields. This assumption is obviously fragile. (Also 
tested with JSON serialization - the column simply disappears from the JSON 
output for that row)


was (Author: jason.white):
These seem like two or three separate issues.
- Python long doesn't fit into Scala long, resulting in Null even in a 
non-nullable field
- Python datetime near the epoch doesn't fit ( ? ) into a Scala timestamp, 
resulting in Null even in a non-nullable field
- DataFrame serialization to disk doesn't assumes, but doesn't verify, no nulls 
exist in non-nullable fields. This assumption is obviously fragile. (Also 
tested with JSON serialization - the column simply disappears in the JSON 
output for that row)

> Nulls in non nullable columns causes data corruption in parquet
> ---
>
> Key: SPARK-19299
> URL: https://issues.apache.org/jira/browse/SPARK-19299
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Franklyn Dsouza
>Priority: Critical
>
> The problem we're seeing is that if a null occurs in a non-nullable field 
> and is written down to Parquet, the resulting file gets corrupted and cannot 
> be read back correctly.
> One way this can occur is when a long value in Python is too big to fit into 
> a SQL LongType: it gets cast to null.
> We're also seeing that the behaviour differs depending on whether or not the 
> vectorized reader is enabled.
> Here's an example in PySpark:
> {code}
> from datetime import datetime
> from pyspark.sql import types
> data = [
>   (1, 6),
>   (2, 7),
>   (3, 2 ** 64), # value overflows sql LongType
>   (4, 8),
>   (5, 9)
> ]
> schema = types.StructType([
>   types.StructField("index", types.LongType(), False),
>   types.StructField("long", types.LongType(), False),
> ])
> df = sqlCtx.createDataFrame(data, schema)
> df.collect()
> df.write.parquet("corrupt_parquet")
> df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet")
> df_parquet.collect()
> {code}
> with the vectorized reader enabled this causes
> {code}
> In [2]: df.collect()
> Out[2]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=None),
>  Row(index=4, long=8),
>  Row(index=5, long=9)]
> In [3]: df_parquet.collect()
> Out[3]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=8),
>  Row(index=4, long=9),
>  Row(index=5, long=5)]
> {code}
> As you can see, reading the data back from disk causes values to get 
> shifted up and across columns.
> With the vectorized reader disabled, we are completely unable to read the 
> file:
> {code}
> Py4JJavaError: An error occurred while calling o143.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 
> (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not 
> read value at 4 in block 0 in file 
> file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-0-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>   at 
> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>   at 
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> 


[jira] [Commented] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-20 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832006#comment-15832006
 ] 

Jason White commented on SPARK-19299:
-

Also seeing this same behaviour in Spark 2.0.1 when creating a DataFrame with a 
timestamp at or near the epoch.

My computer is in Eastern time, so 1969-12-31T19:00:00-0500 is unix timestamp 0.

{code}
>>> from datetime import datetime
>>> dt = datetime(1969, 12, 31, 19, 0, 0)

>>> from pyspark.sql import SQLContext
>>> sql = SQLContext(sc)

>>> from pyspark.sql.types import StructType, StructField, TimestampType
>>> schema = StructType([StructField('ts', TimestampType(), False)])
>>> df = sql.createDataFrame([(dt,)], schema)

>>> df.schema
StructType(List(StructField(ts,TimestampType,false)))

>>> df.collect()
[Row(ts=None)]
{code}

Weirdly, this continues on for over half an hour after the epoch:
{code}
>>> dt = datetime(1969, 12, 31, 19, 35, 47)
>>> df = sql.createDataFrame([(dt,)], schema)
>>> df.collect()
[Row(ts=None)]
>>> dt = datetime(1969, 12, 31, 19, 35, 48)
>>> df = sql.createDataFrame([(dt,)], schema)
>>> df.collect()
[Row(ts=datetime.datetime(1969, 12, 31, 19, 35, 48))]
{code}
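
For what it's worth, that cutoff lines up with Py4J's int/long boundary at 
2^31 microseconds (a quick check, using the UTC-5 offset above):
{code}
from datetime import datetime

# unix timestamp 0 in my local time (UTC-5), per the comment above
epoch_local = datetime(1969, 12, 31, 19, 0, 0)
# first timestamp that comes back non-null
cutoff = datetime(1969, 12, 31, 19, 35, 48)

micros = int((cutoff - epoch_local).total_seconds()) * 1000000
print(micros, micros > 2**31)  # 2148000000 True (2**31 = 2147483648)
{code}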

> Nulls in non nullable columns causes data corruption in parquet
> ---
>
> Key: SPARK-19299
> URL: https://issues.apache.org/jira/browse/SPARK-19299
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Franklyn Dsouza
>
> The problem we're seeing is that if a null occurs in a non-nullable field 
> and is written down to Parquet, the resulting file gets corrupted and cannot 
> be read back correctly.
> One way this can occur is when a long value in Python is too big to fit into 
> a SQL LongType: it gets cast to null.
> We're also seeing that the behaviour differs depending on whether or not the 
> vectorized reader is enabled.
> Here's an example in PySpark:
> {code}
> from datetime import datetime
> from pyspark.sql import types
> data = [
>   (1, 6),
>   (2, 7),
>   (3, 2 ** 64), # value overflows sql LongType
>   (4, 8),
>   (5, 9)
> ]
> schema = types.StructType([
>   types.StructField("index", types.LongType(), False),
>   types.StructField("long", types.LongType(), False),
> ])
> df = sqlCtx.createDataFrame(data, schema)
> df.collect()
> df.write.parquet("corrupt_parquet")
> df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet")
> df_parquet.collect()
> {code}
> with the vectorized reader enabled this causes
> {code}
> In [2]: df.collect()
> Out[2]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=None),
>  Row(index=4, long=8),
>  Row(index=5, long=9)]
> In [3]: df_parquet.collect()
> Out[3]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=8),
>  Row(index=4, long=9),
>  Row(index=5, long=5)]
> {code}
> As you can see, reading the data back from disk causes values to get 
> shifted up and across columns.
> With the vectorized reader disabled, we are completely unable to read the 
> file:
> {code}
> Py4JJavaError: An error occurred while calling o143.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 
> (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not 
> read value at 4 in block 0 in file 
> file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-0-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>   at 
> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>   at 
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> 

[jira] [Commented] (SPARK-10915) Add support for UDAFs in Python

2016-10-20 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592831#comment-15592831
 ] 

Jason White commented on SPARK-10915:
-

At the moment, we use .repartitionAndSortWithinPartitions to give us a strictly 
ordered iterable that we can process one element at a time. We don't have a 
Python list sitting in memory; instead, we rely on ExternalSort to order in a 
memory-safe way.

I don't yet have enough experience with DataFrames to know if we will have the 
same or similar problems there. It's possible that collect_list will perform 
better - I'll give that a try when we get there and report back on this ticket 
if it's a suitable approach for our use case.

> Add support for UDAFs in Python
> ---
>
> Key: SPARK-10915
> URL: https://issues.apache.org/jira/browse/SPARK-10915
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Reporter: Justin Uang
>
> This should support python defined lambdas.






[jira] [Commented] (SPARK-10915) Add support for UDAFs in Python

2016-10-20 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592534#comment-15592534
 ] 

Jason White commented on SPARK-10915:
-

That's unfortunate. Materializing a list somewhere is exactly what we're trying 
to avoid. The lists can get unpredictably long for a small number of keys, and 
this approach tends to blow past our memory ceiling, at least when using RDDs. 
It's why we don't use .groupByKey unless absolutely necessary.

> Add support for UDAFs in Python
> ---
>
> Key: SPARK-10915
> URL: https://issues.apache.org/jira/browse/SPARK-10915
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Reporter: Justin Uang
>
> This should support python defined lambdas.






[jira] [Commented] (SPARK-10915) Add support for UDAFs in Python

2016-10-20 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591706#comment-15591706
 ] 

Jason White commented on SPARK-10915:
-

We would also very much like Python UDAFs. In particular, we have situations 
where value ordering matters, e.g. a state machine. .reduceByKey can't be used 
here (the operation isn't associative), so we've come up with our own function, 
.overByKey, which makes use of .repartitionAndSortWithinPartitions and applies 
a function to the sorted values for each key.

We'd like to move more of our logic over to DataFrames and minimize the number 
of times we need to drop down into RDDs. This issue is one of the primary 
reasons we keep going back to RDDs.
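
For reference, .overByKey is roughly along the following lines (a hedged 
sketch reconstructed from the description above - the name, argument layout, 
and partitioning details are assumptions, not our actual code):
{code}
from itertools import groupby

from pyspark.rdd import portable_hash

def over_by_key(rdd, func, num_partitions=None):
    # rdd contains ((key, sort_field), value) pairs. Partition by key only,
    # but sort by the full (key, sort_field) composite within each partition.
    parts = rdd.repartitionAndSortWithinPartitions(
        numPartitions=num_partitions,
        partitionFunc=lambda composite: portable_hash(composite[0]))

    def apply_per_key(iterator):
        # Keys arrive sorted, so each key's values are contiguous and ordered;
        # func consumes them as a generator, one element at a time.
        for key, group in groupby(iterator, key=lambda kv: kv[0][0]):
            yield key, func(v for _, v in group)

    return parts.mapPartitions(apply_per_key)
{code}
With func written as a generator consumer (e.g. a state machine fold), only one 
value needs to be in memory at a time.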

> Add support for UDAFs in Python
> ---
>
> Key: SPARK-10915
> URL: https://issues.apache.org/jira/browse/SPARK-10915
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Reporter: Justin Uang
>
> This should support python defined lambdas.






[jira] [Created] (SPARK-17679) Remove unnecessary Py4J ListConverter patch

2016-09-26 Thread Jason White (JIRA)
Jason White created SPARK-17679:
---

 Summary: Remove unnecessary Py4J ListConverter patch
 Key: SPARK-17679
 URL: https://issues.apache.org/jira/browse/SPARK-17679
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.0.0
Reporter: Jason White
Priority: Minor


In SPARK-6949, davies documented a couple of bugs in Py4J that prevented Spark 
from registering a converter for date and datetime objects, and patched around 
them in https://github.com/apache/spark/pull/5570.

Specifically, https://github.com/bartdag/py4j/issues/160 dealt with 
ListConverter automatically converting bytearrays into ArrayList instead of 
leaving them alone.

Py4J #160 has since been fixed upstream, as of the 0.9 release a couple of 
months after Spark PR #5570. According to spark-core's pom.xml, we're now on 
0.10.3.

We should remove this patch on ListConverter since the upstream package no 
longer has this issue.






[jira] [Created] (SPARK-14700) PySpark Row equality operator is not overridden

2016-04-18 Thread Jason White (JIRA)
Jason White created SPARK-14700:
---

 Summary: PySpark Row equality operator is not overridden
 Key: SPARK-14700
 URL: https://issues.apache.org/jira/browse/SPARK-14700
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 1.6.1
Reporter: Jason White


The pyspark.sql.Row class doesn't override the equality operator. As a result, 
it uses the equality operator of its superclass, tuple. This is insufficient, 
because the order of the elements in the tuple is only meaningful in 
combination with the private `__fields__` member.

This makes it difficult to write proper unit tests for PySpark DataFrames, and 
leads to seemingly illogical conditions such as:
{code}
Row(a=1) == Row(b=1)      # True, since column names aren't considered
r1 = Row('b', 'a')(2, 1)  # Row(b=2, a=1)
r1 == Row(b=2, a=1)       # False, since kwargs are sorted alphabetically
                          # in the Row constructor
r1 == Row(a=2, b=1)       # True, since the tuple for each is (2, 1)
{code}

Indeed, a few bugs in existing Spark code were exposed when I patched this. PR 
incoming.
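
A sketch of the direction of the fix - methods on Row, and a guess on my part; 
the actual PR may differ:
{code}
# Hedged sketch, not the actual patch: equality that also considers the
# private __fields__ member, so that Row(a=1) != Row(b=1).
def __eq__(self, other):
    if not isinstance(other, Row):
        return NotImplemented
    return (getattr(self, "__fields__", None) == getattr(other, "__fields__", None)
            and tuple.__eq__(self, other))

def __ne__(self, other):
    # Python 2 doesn't derive != from ==, so define the inverse explicitly.
    result = self.__eq__(other)
    return result if result is NotImplemented else not result
{code}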






[jira] [Created] (SPARK-12073) Backpressure causes individual Kafka partitions to lag

2015-12-01 Thread Jason White (JIRA)
Jason White created SPARK-12073:
---

 Summary: Backpressure causes individual Kafka partitions to lag
 Key: SPARK-12073
 URL: https://issues.apache.org/jira/browse/SPARK-12073
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.2
Reporter: Jason White


We're seeing a growing lag on two individual Kafka partitions, on a topic with 
32 partitions. Our individual batches are completing in 5-7s, with a batch 
window of 30s, so there's plenty of room for Streaming to catch up, but it 
looks to be intentionally limiting itself. These two partitions are 
experiencing unbalanced load (higher than most of the others).

What I believe is happening is that maxMessagesPerPartition calculates an 
appropriate limit for the total message rate across all partitions, and then 
divides by the number of partitions to determine how many messages to retrieve 
from each. The problem with this approach is that when one partition is behind 
by millions of records (due to random Kafka issues) or is experiencing heavy 
load, the messages to be retrieved shouldn't be split evenly among the 
partitions. In this scenario, if the rate estimator calculates that only 100k 
total messages can be retrieved, each partition (out of, say, 32) retrieves at 
most 100k/32 = 3125 messages.

Under some conditions, this results in the backpressure keeping the lagging 
partition from recovering. The PIDRateEstimator doesn't increase the number of 
messages to retrieve enough to recover, and we stabilize at a point where these 
individual partitions slowly grow.

I have a PR in progress on our fork that allocates maxMessagesPerPartition by 
weighting each partition's share by the lag that partition is currently 
experiencing.
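
A minimal sketch of that weighting (illustrative only - the names are 
assumptions, and the real change belongs in the Scala direct-stream code 
rather than Python):
{code}
# Allocate a rate-limited budget proportionally to each partition's lag,
# instead of total_budget / num_partitions for every partition.
def max_messages_per_partition(total_budget, lag_by_partition):
    total_lag = sum(lag_by_partition.values())
    if total_lag == 0:
        n = len(lag_by_partition)
        return {p: total_budget // n for p in lag_by_partition}
    return {p: total_budget * lag // total_lag
            for p, lag in lag_by_partition.items()}
{code}
With a 100k budget and one partition millions of records behind, that 
partition now receives nearly the whole budget instead of being capped at 
100k/32 = 3125 messages.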






[jira] [Created] (SPARK-11437) createDataFrame shouldn't .take() when provided schema

2015-10-31 Thread Jason White (JIRA)
Jason White created SPARK-11437:
---

 Summary: createDataFrame shouldn't .take() when provided schema
 Key: SPARK-11437
 URL: https://issues.apache.org/jira/browse/SPARK-11437
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Reporter: Jason White


When creating a DataFrame from an RDD in PySpark, `createDataFrame` calls 
`.take(10)` to verify that the first 10 rows of the RDD match the provided 
schema. This is similar to https://issues.apache.org/jira/browse/SPARK-8070, 
but that issue affected cases where a schema was not provided.

Verifying the first 10 rows is of limited utility and causes the DAG to be 
executed non-lazily. If necessary, I believe this verification should be done 
lazily on all rows. However, since the caller is providing a schema to follow, 
I think it's acceptable to simply fail if the schema is incorrect.

https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L321-L325
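
To illustrate the lazy alternative (a sketch only - it leans on the private 
`_verify_type` helper from pyspark.sql.types, whose name and signature vary by 
version, and the surrounding plumbing is elided):
{code}
# Sketch: verify each row against the schema at execution time, inside the
# conversion map, instead of eagerly driving the DAG with rdd.take(10).
from pyspark.sql.types import _verify_type  # private helper; assumed available

def to_verified_internal(schema):
    def convert(row):
        _verify_type(row, schema)      # raises lazily, on the executors
        return schema.toInternal(row)
    return convert

internal_rdd = rdd.map(to_verified_internal(schema))
{code}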






[jira] [Commented] (SPARK-11437) createDataFrame shouldn't .take() when provided schema

2015-10-31 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14984009#comment-14984009
 ] 

Jason White commented on SPARK-11437:
-

[~marmbrus] We briefly discussed this at SparkSummitEU this week.

> createDataFrame shouldn't .take() when provided schema
> --
>
> Key: SPARK-11437
> URL: https://issues.apache.org/jira/browse/SPARK-11437
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Reporter: Jason White
>
> When creating a DataFrame from an RDD in PySpark, `createDataFrame` calls 
> `.take(10)` to verify that the first 10 rows of the RDD match the provided 
> schema. This is similar to https://issues.apache.org/jira/browse/SPARK-8070, 
> but that issue affected cases where a schema was not provided.
> Verifying the first 10 rows is of limited utility and causes the DAG to be 
> executed non-lazily. If necessary, I believe this verification should be done 
> lazily on all rows. However, since the caller is providing a schema to 
> follow, I think it's acceptable to simply fail if the schema is incorrect.
> https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L321-L325






[jira] [Created] (SPARK-8453) Unioning two RDDs in PySpark doesn't spill to disk

2015-06-18 Thread Jason White (JIRA)
Jason White created SPARK-8453:
--

 Summary: Unioning two RDDs in PySpark doesn't spill to disk
 Key: SPARK-8453
 URL: https://issues.apache.org/jira/browse/SPARK-8453
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 1.4.0
Reporter: Jason White


When unioning 2 RDDs together in PySpark, spill limits do not seem to be 
recognized. Our YARN containers are frequently killed for exceeding memory 
limits for this reason.

I have been able to reproduce this in the following simple scenario:
- spark.executor.instances: 1, spark.executor.memory: 512m, 
spark.executor.cores: 20, spark.python.worker.reuse: false, 
spark.shuffle.spill: true, spark.yarn.executor.memoryOverhead: 5000
(I recognize this is not a good setup - I set things up this way to explore 
this problem and make the symptom easier to isolate)

I have a 1-billion-row dataset, split up evenly into 1000 partitions. Each 
partition contains exactly 1 million rows. Each row contains approximately 250 
characters, +/- 10.

I executed the following in a PySpark shell:
```
profiler = sc.textFile('/user/jasonwhite/profiler')
profiler_2 = sc.textFile('/user/jasonwhite/profiler')
profiler.count()
profiler_2.count()
```
Total container memory utilization was between 2500 & 2800 MB over the total 
execution, with no spill. No problem.

Then I executed:
```
z = profiler.union(profiler_2)
z.count()
```
Memory utilization spiked immediately to between 4700 & 4900 MB over the course 
of execution, also with no spill. Big problem. Since we are setting our 
container memory sizes based in part on the Python spill limit, when these 
spill limits are not properly recognized, our containers are unexpectedly 
killed.






[jira] [Commented] (SPARK-8453) Unioning two RDDs in PySpark doesn't spill to disk

2015-06-18 Thread Jason White (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592541#comment-14592541
 ] 

Jason White commented on SPARK-8453:


Interestingly, if you repartition the RDDs to the same number of partitions 
before unioning them, there are no memory spikes at all:
```
profiler = sc.textFile('/user/jasonwhite/profiler')
profiler = profiler.repartition(profiler.getNumPartitions())
profiler_2 = sc.textFile('/user/jasonwhite/profiler')
profiler_2 = profiler_2.repartition(profiler_2.getNumPartitions())
z = profiler.union(profiler_2)
z.count()
```
Obviously I'd rather not repartition every dataset after being loaded via 
`sc.textFile`. Any ideas as to what could be the issue?

> Unioning two RDDs in PySpark doesn't spill to disk
> --
>
> Key: SPARK-8453
> URL: https://issues.apache.org/jira/browse/SPARK-8453
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.4.0
>Reporter: Jason White
>  Labels: memory, union
>
> When unioning 2 RDDs together in PySpark, spill limits do not seem to be 
> recognized. Our YARN containers are frequently killed for exceeding memory 
> limits for this reason.
> I have been able to reproduce this in the following simple scenario:
> - spark.executor.instances: 1, spark.executor.memory: 512m, 
> spark.executor.cores: 20, spark.python.worker.reuse: false, 
> spark.shuffle.spill: true, spark.yarn.executor.memoryOverhead: 5000
> (I recognize this is not a good setup - I set things up this way to explore 
> this problem and make the symptom easier to isolate)
> I have a 1-billion-row dataset, split up evenly into 1000 partitions. Each 
> partition contains exactly 1 million rows. Each row contains approximately 
> 250 characters, +/- 10.
> I executed the following in a PySpark shell:
> ```
> profiler = sc.textFile('/user/jasonwhite/profiler')
> profiler_2 = sc.textFile('/user/jasonwhite/profiler')
> profiler.count()
> profiler_2.count()
> ```
> Total container memory utilization was between 2500 & 2800 MB over the total 
> execution, with no spill. No problem.
> Then I executed:
> ```
> z = profiler.union(profiler_2)
> z.count()
> ```
> Memory utilization spiked immediately to between 4700 & 4900 MB over the 
> course of execution, also with no spill. Big problem. Since we are setting 
> our container memory sizes based in part on the Python spill limit, when 
> these spill limits are not properly recognized, our containers are 
> unexpectedly killed.


