[jira] [Updated] (SPARK-19629) Partitioning of Parquet is not considered correctly at loading in local[X] mode
[ https://issues.apache.org/jira/browse/SPARK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Navige updated SPARK-19629:
---
Description:

Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.

h1. What does the example do?
- The code creates an example dataframe with random data.
- The dataframe is repartitioned and stored to disk.
- Then the dataframe is re-read from disk.
- The number of partitions of the re-read dataframe is checked.

h1. What is the/my expected behaviour?
The number of partitions after re-loading the dataframe from disk should be the same as the number specified when storing it.

h1. Differences in Spark 1.6 and Spark 2
On Spark 1.6 the partitioning is kept, i.e., the code example will return 10 partitions, as specified using npartitions; on Spark 2.1 the number of partitions will equal the number of local worker threads X specified when starting Spark (using local[X] as master).

Looking at the data produced, the number of files in the parquet directory is the same in both Spark versions. Spark 2 therefore writes as many files as there are partitions when storing, but when the data is read back in Spark 2, the number of partitions no longer matches.
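One possible explanation, stated here as an assumption and not confirmed anywhere in this report: the Spark 2.x file-based reader may size its read partitions from the total input size and the default parallelism, and not from the number of files written. The sketch below is a simplified, Spark-free model of that idea. The constants follow the documented defaults of spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes; the object name, the function name, and the 1000-byte file sizes are hypothetical.

```scala
// Simplified model of how a file-based scan could pack files into read partitions.
// Assumption: this approximates Spark 2.x behaviour; it is not Spark's actual code.
object SplitPackingSketch {
  val maxPartitionBytes: Long = 128L * 1024 * 1024 // default of spark.sql.files.maxPartitionBytes
  val openCostInBytes: Long = 4L * 1024 * 1024     // default of spark.sql.files.openCostInBytes

  /** Estimated number of read partitions for the given file sizes and default parallelism. */
  def estimatePartitions(fileSizes: Seq[Long], defaultParallelism: Int): Int = {
    // Every file is charged its length plus a fixed "open cost".
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitBytes = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))

    var partitions = 0
    var currentSize = 0L
    var currentFiles = 0
    // Largest files first. Assumes each file is smaller than maxSplitBytes,
    // so no single file has to be split into several chunks.
    for (size <- fileSizes.sortBy(-_)) {
      if (currentFiles > 0 && currentSize + size > maxSplitBytes) {
        // Current partition is full: close it and start a new one.
        partitions += 1
        currentSize = 0L
        currentFiles = 0
      }
      currentSize += size + openCostInBytes
      currentFiles += 1
    }
    if (currentFiles > 0) partitions += 1
    partitions
  }

  def main(args: Array[String]): Unit = {
    val files = Seq.fill(10)(1000L) // ten small parquet files; the sizes are assumed
    for (cores <- Seq(1, 2, 4, 8)) {
      println(s"local[$cores] -> ${estimatePartitions(files, cores)} partitions")
    }
  }
}
```

In this model, ten small files yield 1, 2, and 4 read partitions for local[1], local[2], and local[4], which would be consistent with the behaviour described above. For larger X the model's count need not equal X (it gives 5 for local[8]), so the model only approximates the reported observation.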
h1. Minimal code example
{code:none}
// run on Spark 1.6
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
{code}
{code:none}
// run on Spark 2.1
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = spark.sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
{code}

h1. What could other solutions be, if this is not a bug?
If this is intended, what about introducing a parameter at reading time that specifies whether the data should truly be repartitioned (depending on the number of nodes) or should be read "as-is"?

was:

Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.

h1. What does the example do?
- The code creates an example dataframe with random data.
- The dataframe is repartitioned and stored to disk.
- Then the dataframe is re-read from disk.
- The number of partitions of the re-read dataframe is checked.

h1. What is the expected behaviour?
The number of partitions after re-loading the dataframe from disk should be the same as the number specified when storing it.
On Spark 1.6 the partitioning is kept, i.e., the code example will return 10 partitions, as specified using npartitions; on Spark 2.1 the number of partitions will equal the number of local worker threads X specified when starting Spark (using local[X] as master).

h1. Minimal code example
{code:none}
// run on Spark 1.6
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
{code}
{code:none}
// run on Spark 2.1
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = spark.sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
{code}

> Partitioning of Parquet is not considered correctly at loading in local[X] mode
> ---
>
>                 Key: SPARK-19629
>                 URL: https://issues.apache.org/jira/browse/SPARK-19629
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 2.0.0, 2.1.0
>         Environment:
[jira] [Updated] (SPARK-19629) Partitioning of Parquet is not considered correctly at loading in local[X] mode
[ https://issues.apache.org/jira/browse/SPARK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Navige updated SPARK-19629:
---
Description:

Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.

h1. What does the example do?
- The code creates an example dataframe with random data.
- The dataframe is repartitioned and stored to disk.
- Then the dataframe is re-read from disk.
- The number of partitions of the re-read dataframe is checked.

h1. What is the expected behaviour?
The number of partitions after re-loading the dataframe from disk should be the same as the number specified when storing it.
On Spark 1.6 the partitioning is kept, i.e., the code example will return 10 partitions, as specified using npartitions; on Spark 2.1 the number of partitions will equal the number of local worker threads X specified when starting Spark (using local[X] as master).

h1. Minimal code example
{code:none}
// run on Spark 1.6
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
{code}
{code:none}
// run on Spark 2.1
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = spark.sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
{code}

was:

Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.

What does the example do?
===
- The code creates an example dataframe with random data.
- The dataframe is repartitioned and stored to disk.
- Then the dataframe is re-read from disk.
- The number of partitions of the re-read dataframe is checked.

What is the expected behaviour?
===
The number of partitions after re-loading the dataframe from disk should be the same as the number specified when storing it.
On Spark 1.6 the partitioning is kept, i.e., the code example will return 10 partitions, as specified using npartitions; on Spark 2.1 the number of partitions will equal the number of local worker threads X specified when starting Spark (using local[X] as master).

Minimal code example
===
{code:none}
// run on Spark 1.6
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
{code}
{code:none}
// run on Spark 2.1
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = spark.sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1
{code}

> Partitioning of Parquet is not considered correctly at loading in local[X] mode
> ---
>
>                 Key: SPARK-19629
>                 URL: https://issues.apache.org/jira/browse/SPARK-19629
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: Tested using docker run gettyimages/spark:1.6.1-hadoop-2.6 and docker run gettyimages/spark:2.1.0-hadoop-2.7.
>            Reporter: Navige
>            Priority: Minor
>
> Running the following two examples will lead to different results depending on whether the code is run using Spark 1.6 or Spark 2.1.
> h1. What does the example do?
> - The code creates an example dataframe with random data.
> - The dataframe is repartitioned and stored to disk.
> - Then the datafr