[jira] [Commented] (SPARK-19965) DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

2017-03-19 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931689#comment-15931689 ]

Apache Spark commented on SPARK-19965:
--

User 'lw-lin' has created a pull request for this issue:
https://github.com/apache/spark/pull/17346

> DataFrame batch reader may fail to infer partitions when reading 
> FileStreamSink's output
> 
>
> Key: SPARK-19965
> URL: https://issues.apache.org/jira/browse/SPARK-19965
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
>
> Reproducer
> {code}
>   test("partitioned writing and batch reading with 'basePath'") {
>     val inputData = MemoryStream[Int]
>     val ds = inputData.toDS()
>     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
>     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
>     var query: StreamingQuery = null
>     try {
>       query =
>         ds.map(i => (i, i * 1000))
>           .toDF("id", "value")
>           .writeStream
>           .partitionBy("id")
>           .option("checkpointLocation", checkpointDir)
>           .format("parquet")
>           .start(outputDir)
>       inputData.addData(1, 2, 3)
>       failAfter(streamingTimeout) {
>         query.processAllAvailable()
>       }
>       spark.read.option("basePath", outputDir).parquet(outputDir + "/*").show()
>     } finally {
>       if (query != null) {
>         query.stop()
>       }
>     }
>   }
> {code}
> Stack trace
> {code}
> [info] - partitioned writing and batch reading with 'basePath' *** FAILED *** (3 seconds, 928 milliseconds)
> [info]   java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:
> [info]  ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
> [info]  ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
> [info] 
> [info] If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
> [info]   at scala.Predef$.assert(Predef.scala:170)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
> [info]   at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
> [info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
> [info]   at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
> [info]   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
> [info]   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
> [info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
> [info]   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
> [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
> [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19965) DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

2017-03-16 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928657#comment-15928657 ]

Shixiong Zhu commented on SPARK-19965:
--

[~lwlin] I think we can just ignore "_spark_metadata" in InMemoryFileIndex. Could you try it?
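The suggested fix can be sketched outside Spark. The helper name and filtering rule below are illustrative only, not the actual internals of `InMemoryFileIndex`: the idea is to skip the sink's metadata directory before any paths reach partition inference.

```scala
// Hypothetical filter (illustrative, not Spark's real API): decide which
// listed paths should take part in partition inference. FileStreamSink
// writes its log under `_spark_metadata`, and entries starting with `_`
// or `.` are conventionally non-data in Spark output directories.
def isDataPath(path: String): Boolean = {
  val name = path.split('/').last
  !name.startsWith("_") && !name.startsWith(".")
}

val listed = Seq(
  "/tmp/stream.output/id=1",
  "/tmp/stream.output/id=2",
  "/tmp/stream.output/_spark_metadata")
val forInference = listed.filter(isDataPath)
println(forInference)  // only the id=... partition directories remain
```

With the metadata directory filtered out, every remaining leaf shares the same table root, so inference no longer sees conflicting structures.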

[jira] [Commented] (SPARK-19965) DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

2017-03-16 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927572#comment-15927572 ]

Shixiong Zhu commented on SPARK-19965:
--

[~lwlin] Go ahead. I suspect the root cause is using a globbed path to scan a file sink's output. If it's hard to support, we can at least throw a better exception.
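The globbing angle can be modeled without Hadoop: a trailing `/*` matches every direct child of the output directory, the metadata folder included, which is how the mixed listing reaches partition inference in the first place. A small sketch using a `java.nio` matcher as a stand-in for Hadoop's glob:

```scala
import java.nio.file.{FileSystems, Paths}

// A java.nio glob stands in here for Hadoop's path globbing: `*` in
// `outputDir + "/*"` matches every direct child of the sink's output
// directory, so `_spark_metadata` is swept up with the partition dirs.
val matcher = FileSystems.getDefault.getPathMatcher("glob:/tmp/stream.output/*")
val children = Seq(
  "/tmp/stream.output/id=1",
  "/tmp/stream.output/id=2",
  "/tmp/stream.output/_spark_metadata")
val matched = children.filter(p => matcher.matches(Paths.get(p)))
println(matched)  // all three children match, metadata directory included
```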

[jira] [Commented] (SPARK-19965) DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

2017-03-15 Thread Liwei Lin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927431#comment-15927431 ]

Liwei Lin commented on SPARK-19965:
---

Hi [~zsxwing], are you working on a patch? Mind if I pick this up, in case you haven't started yet?

[jira] [Commented] (SPARK-19965) DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

2017-03-15 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927333#comment-15927333 ]

Shixiong Zhu commented on SPARK-19965:
--

This happens because partition inference does not ignore the "_spark_metadata" directory.
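The conflict is mechanical once `_spark_metadata` reaches the inference step: partition inference derives a candidate table root from each leaf directory by stripping trailing `key=value` segments, and `_spark_metadata` has no such suffix, so it yields a second, distinct root. A simplified model (the helper below is illustrative, not Spark's actual PartitioningUtils code):

```scala
// Simplified model of base-path discovery (not Spark's actual
// PartitioningUtils code): strip trailing `key=value` segments from a
// leaf directory to recover the table root that the directory implies.
def impliedRoot(path: String): String =
  path.split('/').reverse.dropWhile(_.matches("[^=/]+=[^=/]+")).reverse.mkString("/")

val leaves = Seq(
  "/tmp/stream.output/id=1",
  "/tmp/stream.output/id=2",
  "/tmp/stream.output/_spark_metadata")
val roots = leaves.map(impliedRoot).distinct
// The partition directories agree on one root, while `_spark_metadata`
// contributes a second one: the same suspicious pair the assertion reports.
println(roots)
```

Had the metadata directory been excluded from the listing, every leaf would reduce to the same root and inference would succeed.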
