[jira] [Commented] (SPARK-22223) ObjectHashAggregate introduces unnecessary shuffle

2017-10-10 Thread Michele Costantino Soccio (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199831#comment-16199831
 ] 

Michele Costantino Soccio commented on SPARK-22223:
---

[~maropu] Not sure I understand your observation.

In the hash-based aggregate implementation, Spark repartitions and shuffles at 
each {{groupBy}}.

In the sort-based one, Spark does not repartition or shuffle.

Which aggregation strategy Spark prefers, sort-based or hash-based, is not my 
concern; I just do not want Spark to shuffle and repartition when there is no 
need for it.
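
A minimal sketch of how the two plans can be compared, using the names from the 
issue description below plus a hypothetical {{buildAggregatedDF()}} helper that 
rebuilds the {{groupBy}}/{{collect_list}} pipeline (the physical plan is cached 
once computed, so the DataFrame is rebuilt after each conf change):

{code}
// Hedged sketch: compare the physical plans with ObjectHashAggregate enabled vs disabled.
// buildAggregatedDF() is a hypothetical helper wrapping the pipeline from the report.
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")
val sortBasedPlan = buildAggregatedDF().queryExecution.executedPlan

spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "true")
val hashBasedPlan = buildAggregatedDF().queryExecution.executedPlan

println(sortBasedPlan)  // no Exchange beyond the initial repartition(col("Country"))
println(hashBasedPlan)  // extra Exchange hashpartitioning(...) before each aggregate
{code}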

> ObjectHashAggregate introduces unnecessary shuffle
> --
>
> Key: SPARK-22223
> URL: https://issues.apache.org/jira/browse/SPARK-22223
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.2.0
> Environment: Spark 2.2.0 and following.
> {{spark.sql.execution.useObjectHashAggregateExec = true}}
>Reporter: Michele Costantino Soccio
>
> Since Spark 2.2, {{groupBy}} plus {{collect_list}} introduces an unnecessary 
> shuffle when the partitioning from the previous step is based on looser 
> criteria than the current {{groupBy}} keys.
> For example:
> {code:java}
> //sample data from 
> https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
> //Read the data and repartition it by "Country"
> val retailDF = spark.sql("Select * from online_retail")
> .repartition(col("Country"))
> //Group the data and collect.
> val aggregatedDF = retailDF
>   .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
>   .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
>   .agg(collect_list("Good").as("Goods"))
>   .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
>   .groupBy("Country", "CustomerID")
>   .agg(collect_list("Invoice").as("Invoices"))
>   .withColumn("Customer", expr("(CustomerID, Invoices)"))
>   .groupBy("Country")
>   .agg(collect_list("Customer").as("Customers"))
> {code}
> Without disabling the {{ObjectHashAggregate}} one gets the following physical 
> plan:
> {noformat}
> == Physical Plan ==
> ObjectHashAggregate(keys=[Country#14], 
> functions=[finalmerge_collect_list(merge buf#317) AS 
> collect_list(Customer#299, 0, 0)#310])
> +- Exchange hashpartitioning(Country#14, 200)
>+- ObjectHashAggregate(keys=[Country#14], 
> functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
>   +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, 
> Invoices, Invoices#294) AS Customer#299]
>  +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], 
> functions=[finalmerge_collect_list(merge buf#319) AS 
> collect_list(Invoice#278, 0, 0)#293])
> +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
>+- ObjectHashAggregate(keys=[Country#14, CustomerID#13], 
> functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>   +- *Project [Country#14, CustomerID#13, 
> named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, 
> Goods#271) AS Invoice#278]
>  +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, 
> InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge 
> buf#321) AS collect_list(Good#249, 0, 0)#270])
> +- Exchange hashpartitioning(Country#14, 
> CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
>+- ObjectHashAggregate(keys=[Country#14, 
> CustomerID#13, InvoiceNo#7, InvoiceDate#11], 
> functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>   +- *Project [InvoiceNo#7, InvoiceDate#11, 
> CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, 
> UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>  +- Exchange hashpartitioning(Country#14, 200)
> +- *FileScan csv 
> default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14]
>  Batched: false, Format: CSV, Location: 
> InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], 
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct {noformat}
> With Spark 2.1.0 or when {{ObjectHashAggregate}} is disabled, one gets a more 
> efficient:
> {noformat}
> == Physical Plan ==
> SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge 
> buf#3834) AS collect_list(Customer#299, 0, 0)#310])
> +- SortAggregate(key=[Country#14], 
> functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
>+- *Project [Country#14, named_struct(CustomerID, CustomerID#13, 

[jira] [Comment Edited] (SPARK-22236) CSV I/O: does not respect RFC 4180

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199825#comment-16199825
 ] 

Hyukjin Kwon edited comment on SPARK-22236 at 10/11/17 5:51 AM:


Yup, what is said above ^ all looks correct.

I think it would be better to eventually align all the defaults with 
Univocity's (in the far future), if possible and if there were no cost at all 
to changing them (at least partly because it is sometimes difficult to prepare 
a reproducer and report a bug to Univocity, and I carefully guess the Univocity 
author feels similarly, judging from a few comments).

Anyway, for now, they happen to be set differently in the first place. At a 
quick look, the root cause is {{escape}} being {{\}} in Spark, whereas the 
Univocity parser has {{"}} by default.
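
For reference, a minimal sketch (paths are placeholders) of opting into RFC 
4180-style quoting today by overriding the {{escape}} option on both read and 
write:

{code}
// Hedged sketch: treat a doubled quote as the escape on both read and write,
// instead of Spark's default backslash escape. Paths are placeholders.
val df = spark.read
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("testfile.csv")

df.write
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("testout_rfc4180")
{code}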


was (Author: hyukjin.kwon):
Yup, what is said ^ looks all correct. I think it'd be rather better to set all 
the default as are in Univocity in the (far) future if possible and if there 
were no cost at all to change them (at least partly because it was sometimes 
difficult to prepare a reproducer and report a bug into Univocity and I 
carefully guess the Univocity author also feels in a similar way assuming few 
comments).

They happen to be set differently in the first place. In a quick look, the root 
cause is due to {{escape}} being {{\}} in Spark where Univocity parser has 
{{"}} by default.

> CSV I/O: does not respect RFC 4180
> --
>
> Key: SPARK-22236
> URL: https://issues.apache.org/jira/browse/SPARK-22236
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.2.0
>Reporter: Ondrej Kokes
>Priority: Minor
>
> When reading or writing CSV files with Spark, double quotes are escaped with 
> a backslash by default. However, the appropriate behaviour as set out by RFC 
> 4180 (and adhered to by many software packages) is to escape using a second 
> double quote.
> This piece of Python code demonstrates the issue
> {code}
> import csv
> with open('testfile.csv', 'w') as f:
> cw = csv.writer(f)
> cw.writerow(['a 2.5" drive', 'another column'])
> cw.writerow(['a "quoted" string', '"quoted"'])
> cw.writerow([1,2])
> with open('testfile.csv') as f:
> print(f.read())
> # "a 2.5"" drive",another column
> # "a ""quoted"" string","""quoted"""
> # 1,2
> spark.read.csv('testfile.csv').collect()
> # [Row(_c0='"a 2.5"" drive"', _c1='another column'),
> #  Row(_c0='"a ""quoted"" string"', _c1='"""quoted"""'),
> #  Row(_c0='1', _c1='2')]
> # explicitly stating the escape character fixed the issue
> spark.read.option('escape', '"').csv('testfile.csv').collect()
> # [Row(_c0='a 2.5" drive', _c1='another column'),
> #  Row(_c0='a "quoted" string', _c1='"quoted"'),
> #  Row(_c0='1', _c1='2')]
> {code}
> The same applies to writes, where reading the file written by Spark may 
> result in garbage.
> {code}
> df = spark.read.option('escape', '"').csv('testfile.csv') # reading the file 
> correctly
> df.write.format("csv").save('testout.csv')
> with open('testout.csv/part-csv') as f:
> cr = csv.reader(f)
> print(next(cr))
> print(next(cr))
> # ['a 2.5\\ drive"', 'another column']
> # ['a \\quoted\\" string"', '\\quoted\\""']
> {code}
> While it's possible to work with CSV files in a "compatible" manner, it would 
> be useful if Spark had sensible defaults that conform to the above-mentioned 
> RFC (as well as W3C recommendations). I realise this would be a breaking 
> change and thus if accepted, it would probably need to result in a warning 
> first, before moving to a new default.






[jira] [Comment Edited] (SPARK-22236) CSV I/O: does not respect RFC 4180

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199825#comment-16199825
 ] 

Hyukjin Kwon edited comment on SPARK-22236 at 10/11/17 5:47 AM:


Yup, what is said ^ looks all correct. I think it'd be rather better to set all 
the default as are in Univocity in the (far) future if possible and if there 
were no cost at all to change them (at least partly because it was sometimes 
difficult to prepare a reproducer and report a bug into Univocity and I 
carefully guess the Univocity author also feels in a similar way assuming few 
comments).

They happen to be set differently in the first place. In a quick look, the root 
cause is due to {{escape}} being {{\}} in Spark where Univocity parser has 
{{"}} by default.


was (Author: hyukjin.kwon):
Yup, what is said ^ looks all correct. I think we should set all the default as 
are in Univocity in the future if possible. They happen to be set differently 
in the first place. In a quick look, the root cause is due to {{escape}} being 
{{\}} in Spark where Univocity parser has {{"}} by default.

> CSV I/O: does not respect RFC 4180
> --
>
> Key: SPARK-22236
> URL: https://issues.apache.org/jira/browse/SPARK-22236
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.2.0
>Reporter: Ondrej Kokes
>Priority: Minor
>
> When reading or writing CSV files with Spark, double quotes are escaped with 
> a backslash by default. However, the appropriate behaviour as set out by RFC 
> 4180 (and adhered to by many software packages) is to escape using a second 
> double quote.
> This piece of Python code demonstrates the issue
> {code}
> import csv
> with open('testfile.csv', 'w') as f:
> cw = csv.writer(f)
> cw.writerow(['a 2.5" drive', 'another column'])
> cw.writerow(['a "quoted" string', '"quoted"'])
> cw.writerow([1,2])
> with open('testfile.csv') as f:
> print(f.read())
> # "a 2.5"" drive",another column
> # "a ""quoted"" string","""quoted"""
> # 1,2
> spark.read.csv('testfile.csv').collect()
> # [Row(_c0='"a 2.5"" drive"', _c1='another column'),
> #  Row(_c0='"a ""quoted"" string"', _c1='"""quoted"""'),
> #  Row(_c0='1', _c1='2')]
> # explicitly stating the escape character fixed the issue
> spark.read.option('escape', '"').csv('testfile.csv').collect()
> # [Row(_c0='a 2.5" drive', _c1='another column'),
> #  Row(_c0='a "quoted" string', _c1='"quoted"'),
> #  Row(_c0='1', _c1='2')]
> {code}
> The same applies to writes, where reading the file written by Spark may 
> result in garbage.
> {code}
> df = spark.read.option('escape', '"').csv('testfile.csv') # reading the file 
> correctly
> df.write.format("csv").save('testout.csv')
> with open('testout.csv/part-csv') as f:
> cr = csv.reader(f)
> print(next(cr))
> print(next(cr))
> # ['a 2.5\\ drive"', 'another column']
> # ['a \\quoted\\" string"', '\\quoted\\""']
> {code}
> While it's possible to work with CSV files in a "compatible" manner, it would 
> be useful if Spark had sensible defaults that conform to the above-mentioned 
> RFC (as well as W3C recommendations). I realise this would be a breaking 
> change and thus if accepted, it would probably need to result in a warning 
> first, before moving to a new default.






[jira] [Commented] (SPARK-22236) CSV I/O: does not respect RFC 4180

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199825#comment-16199825
 ] 

Hyukjin Kwon commented on SPARK-22236:
--

Yup, what is said ^ looks all correct. I think we should set all the default as 
are in Univocity in the future if possible. They happen to be set differently 
in the first place. In a quick look, the root cause is due to {{escape}} being 
{{\}} in Spark where Univocity parser has {{"}} by default.

> CSV I/O: does not respect RFC 4180
> --
>
> Key: SPARK-22236
> URL: https://issues.apache.org/jira/browse/SPARK-22236
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.2.0
>Reporter: Ondrej Kokes
>Priority: Minor
>
> When reading or writing CSV files with Spark, double quotes are escaped with 
> a backslash by default. However, the appropriate behaviour as set out by RFC 
> 4180 (and adhered to by many software packages) is to escape using a second 
> double quote.
> This piece of Python code demonstrates the issue
> {code}
> import csv
> with open('testfile.csv', 'w') as f:
> cw = csv.writer(f)
> cw.writerow(['a 2.5" drive', 'another column'])
> cw.writerow(['a "quoted" string', '"quoted"'])
> cw.writerow([1,2])
> with open('testfile.csv') as f:
> print(f.read())
> # "a 2.5"" drive",another column
> # "a ""quoted"" string","""quoted"""
> # 1,2
> spark.read.csv('testfile.csv').collect()
> # [Row(_c0='"a 2.5"" drive"', _c1='another column'),
> #  Row(_c0='"a ""quoted"" string"', _c1='"""quoted"""'),
> #  Row(_c0='1', _c1='2')]
> # explicitly stating the escape character fixed the issue
> spark.read.option('escape', '"').csv('testfile.csv').collect()
> # [Row(_c0='a 2.5" drive', _c1='another column'),
> #  Row(_c0='a "quoted" string', _c1='"quoted"'),
> #  Row(_c0='1', _c1='2')]
> {code}
> The same applies to writes, where reading the file written by Spark may 
> result in garbage.
> {code}
> df = spark.read.option('escape', '"').csv('testfile.csv') # reading the file 
> correctly
> df.write.format("csv").save('testout.csv')
> with open('testout.csv/part-csv') as f:
> cr = csv.reader(f)
> print(next(cr))
> print(next(cr))
> # ['a 2.5\\ drive"', 'another column']
> # ['a \\quoted\\" string"', '\\quoted\\""']
> {code}
> While it's possible to work with CSV files in a "compatible" manner, it would 
> be useful if Spark had sensible defaults that conform to the above-mentioned 
> RFC (as well as W3C recommendations). I realise this would be a breaking 
> change and thus if accepted, it would probably need to result in a warning 
> first, before moving to a new default.






[jira] [Updated] (SPARK-15474) ORC data source fails to write and read back empty dataframe

2017-10-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-15474:
--
Affects Version/s: 2.2.0

>  ORC data source fails to write and read back empty dataframe
> -
>
> Key: SPARK-15474
> URL: https://issues.apache.org/jira/browse/SPARK-15474
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.1, 2.2.0
>Reporter: Hyukjin Kwon
>
> Currently ORC data source fails to write and read empty data.
> The code below:
> {code}
> val emptyDf = spark.range(10).limit(0)
> emptyDf.write
>   .format("orc")
>   .save(path.getCanonicalPath)
> val copyEmptyDf = spark.read
>   .format("orc")
>   .load(path.getCanonicalPath)
> copyEmptyDf.show()
> {code}
> throws an exception below:
> {code}
> Unable to infer schema for ORC at 
> /private/var/folders/9j/gf_c342d7d150mwrxvkqnc18gn/T/spark-5b7aa45b-a37d-43e9-975e-a15b36b370da.
>  It must be specified manually;
> org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at 
> /private/var/folders/9j/gf_c342d7d150mwrxvkqnc18gn/T/spark-5b7aa45b-a37d-43e9-975e-a15b36b370da.
>  It must be specified manually;
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:352)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:352)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:351)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:130)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:140)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$32$$anonfun$apply$mcV$sp$47.apply(HadoopFsRelationTest.scala:892)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$32$$anonfun$apply$mcV$sp$47.apply(HadoopFsRelationTest.scala:884)
>   at 
> org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:114)
> {code}
> Note that this is a different case from the data below:
> {code}
> val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
> {code}
> In that case, no writer is initialised or created (there are no calls to 
> {{WriterContainer.writeRows()}}).
> For Parquet and JSON, it works but ORC does not.
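
A minimal sketch of the workaround the error message itself points to, reusing 
the names from the snippet above: supply the schema explicitly instead of 
relying on inference over the (empty) ORC directory.

{code}
// Hedged sketch: reading the empty ORC output back with an explicit schema,
// which sidesteps the "Unable to infer schema for ORC" failure above.
val emptyDf = spark.range(10).limit(0)
emptyDf.write.format("orc").save(path.getCanonicalPath)

val copyEmptyDf = spark.read
  .schema(emptyDf.schema)   // avoid schema inference over an empty directory
  .format("orc")
  .load(path.getCanonicalPath)
copyEmptyDf.show()
{code}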






[jira] [Updated] (SPARK-15474) ORC data source fails to write and read back empty dataframe

2017-10-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-15474:
--
Affects Version/s: 2.1.1

>  ORC data source fails to write and read back empty dataframe
> -
>
> Key: SPARK-15474
> URL: https://issues.apache.org/jira/browse/SPARK-15474
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.1
>Reporter: Hyukjin Kwon
>
> Currently ORC data source fails to write and read empty data.
> The code below:
> {code}
> val emptyDf = spark.range(10).limit(0)
> emptyDf.write
>   .format("orc")
>   .save(path.getCanonicalPath)
> val copyEmptyDf = spark.read
>   .format("orc")
>   .load(path.getCanonicalPath)
> copyEmptyDf.show()
> {code}
> throws an exception below:
> {code}
> Unable to infer schema for ORC at 
> /private/var/folders/9j/gf_c342d7d150mwrxvkqnc18gn/T/spark-5b7aa45b-a37d-43e9-975e-a15b36b370da.
>  It must be specified manually;
> org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at 
> /private/var/folders/9j/gf_c342d7d150mwrxvkqnc18gn/T/spark-5b7aa45b-a37d-43e9-975e-a15b36b370da.
>  It must be specified manually;
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:352)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:352)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:351)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:130)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:140)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$32$$anonfun$apply$mcV$sp$47.apply(HadoopFsRelationTest.scala:892)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$32$$anonfun$apply$mcV$sp$47.apply(HadoopFsRelationTest.scala:884)
>   at 
> org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:114)
> {code}
> Note that this is a different case from the data below:
> {code}
> val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
> {code}
> In that case, no writer is initialised or created (there are no calls to 
> {{WriterContainer.writeRows()}}).
> For Parquet and JSON, it works but ORC does not.






[jira] [Comment Edited] (SPARK-21641) Combining windowing (groupBy) and mapGroupsWithState (groupByKey) in Spark Structured Streaming

2017-10-10 Thread kant kodali (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199811#comment-16199811
 ] 

kant kodali edited comment on SPARK-21641 at 10/11/17 4:58 AM:
---

@[~marmbrus] When can we possibly expect this?



was (Author: kant):
@[~marmbrus] When can we possible expect this?


> Combining windowing (groupBy) and mapGroupsWithState (groupByKey) in Spark 
> Structured Streaming
> ---
>
> Key: SPARK-21641
> URL: https://issues.apache.org/jira/browse/SPARK-21641
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Tudor Miu
>
> Given a stream of timestamped data with watermarking, there seems to be no 
> way to combine (1) the {{groupBy}} operation to achieve windowing by the 
> timestamp field and other grouping criteria with (2) the {{groupByKey}} 
> operation in order to apply {{mapGroupsWithState}} to the groups for custom 
> sessionization.
> For context:
> - calling {{groupBy}}, which supports windowing, on a Dataset returns a 
> {{RelationalGroupedDataset}} which does not have {{mapGroupsWithState}}.
> - calling {{groupByKey}}, which supports {{mapGroupsWithState}}, returns a 
> {{KeyValueGroupedDataset}}, but that has no support for windowing.
> The suggestion is to _somehow_ unify the two APIs.
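
A minimal sketch of the two disjoint code paths described above; the {{Event}} 
case class, the column names, and the placeholder streaming source are 
assumptions for illustration, not from the report:

{code}
// Hedged sketch: `events` is assumed to be a streaming Dataset[Event] with an
// event-time column `ts`; the two groupings below cannot be combined today.
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.GroupState
import spark.implicits._

case class Event(key: String, ts: Timestamp, value: Long)
val events: Dataset[Event] = ???  // placeholder for a watermarkable streaming source

// (1) Windowing is available via groupBy, which returns a RelationalGroupedDataset...
val windowedCounts = events
  .withWatermark("ts", "10 minutes")
  .groupBy(window($"ts", "5 minutes"), $"key")
  .count()

// (2) ...while custom state requires groupByKey (a KeyValueGroupedDataset),
// which has no notion of the time window used above.
val runningTotals = events
  .groupByKey(_.key)
  .mapGroupsWithState[Long, (String, Long)] { (key, values, state: GroupState[Long]) =>
    val total = state.getOption.getOrElse(0L) + values.map(_.value).sum
    state.update(total)
    (key, total)
  }
{code}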






[jira] [Comment Edited] (SPARK-21641) Combining windowing (groupBy) and mapGroupsWithState (groupByKey) in Spark Structured Streaming

2017-10-10 Thread kant kodali (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199811#comment-16199811
 ] 

kant kodali edited comment on SPARK-21641 at 10/11/17 4:58 AM:
---

@[~marmbrus] When can we possible expect this?



was (Author: kant):
[~marmbrus]


> Combining windowing (groupBy) and mapGroupsWithState (groupByKey) in Spark 
> Structured Streaming
> ---
>
> Key: SPARK-21641
> URL: https://issues.apache.org/jira/browse/SPARK-21641
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Tudor Miu
>
> Given a stream of timestamped data with watermarking, there seems to be no 
> way to combine (1) the {{groupBy}} operation to achieve windowing by the 
> timestamp field and other grouping criteria with (2) the {{groupByKey}} 
> operation in order to apply {{mapGroupsWithState}} to the groups for custom 
> sessionization.
> For context:
> - calling {{groupBy}}, which supports windowing, on a Dataset returns a 
> {{RelationalGroupedDataset}} which does not have {{mapGroupsWithState}}.
> - calling {{groupByKey}}, which supports {{mapGroupsWithState}}, returns a 
> {{KeyValueGroupedDataset}}, but that has no support for windowing.
> The suggestion is to _somehow_ unify the two APIs.






[jira] [Commented] (SPARK-21641) Combining windowing (groupBy) and mapGroupsWithState (groupByKey) in Spark Structured Streaming

2017-10-10 Thread kant kodali (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199811#comment-16199811
 ] 

kant kodali commented on SPARK-21641:
-

[~marmbrus]


> Combining windowing (groupBy) and mapGroupsWithState (groupByKey) in Spark 
> Structured Streaming
> ---
>
> Key: SPARK-21641
> URL: https://issues.apache.org/jira/browse/SPARK-21641
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Tudor Miu
>
> Given a stream of timestamped data with watermarking, there seems to be no 
> way to combine (1) the {{groupBy}} operation to achieve windowing by the 
> timestamp field and other grouping criteria with (2) the {{groupByKey}} 
> operation in order to apply {{mapGroupsWithState}} to the groups for custom 
> sessionization.
> For context:
> - calling {{groupBy}}, which supports windowing, on a Dataset returns a 
> {{RelationalGroupedDataset}} which does not have {{mapGroupsWithState}}.
> - calling {{groupByKey}}, which supports {{mapGroupsWithState}}, returns a 
> {{KeyValueGroupedDataset}}, but that has no support for windowing.
> The suggestion is to _somehow_ unify the two APIs.






[jira] [Commented] (SPARK-20937) Describe spark.sql.parquet.writeLegacyFormat property in Spark SQL, DataFrames and Datasets Guide

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199808#comment-16199808
 ] 

Hyukjin Kwon commented on SPARK-20937:
--

+1 too.

> Describe spark.sql.parquet.writeLegacyFormat property in Spark SQL, 
> DataFrames and Datasets Guide
> -
>
> Key: SPARK-20937
> URL: https://issues.apache.org/jira/browse/SPARK-20937
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, SQL
>Affects Versions: 2.3.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>
> As a follow-up to SPARK-20297 (and SPARK-10400) in which 
> {{spark.sql.parquet.writeLegacyFormat}} property was recommended for Impala 
> and Hive, Spark SQL docs for [Parquet 
> Files|https://spark.apache.org/docs/latest/sql-programming-guide.html#configuration]
>  should have it documented.
> p.s. It was asked about in [Why can't Impala read parquet files after Spark 
> SQL's write?|https://stackoverflow.com/q/44279870/1305344] on StackOverflow 
> today.
> p.s. It's also covered in [~holden.ka...@gmail.com]'s "High Performance 
> Spark: Best Practices for Scaling and Optimizing Apache Spark" book (in Table 
> 3-10. Parquet data source options) that gives the option some wider publicity.
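
For reference, a minimal sketch of the setting the guide should describe; the 
DataFrame name and output path are placeholders:

{code}
// Hedged sketch: write Parquet in the legacy format so Hive/Impala readers
// (as recommended in SPARK-20297) can consume the files written by Spark.
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
someDF.write.parquet("/tmp/parquet_for_impala")  // someDF and the path are placeholders
{code}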






[jira] [Resolved] (SPARK-20561) Running SparkR with no RHive installed in secured environment

2017-10-10 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-20561.
--
Resolution: Invalid

Questions should usually go to the mailing list instead; see 
https://spark.apache.org/community.html. I hope you can get a better answer 
there.

> Running SparkR with no RHive installed in secured environment 
> --
>
> Key: SPARK-20561
> URL: https://issues.apache.org/jira/browse/SPARK-20561
> Project: Spark
>  Issue Type: Question
>  Components: Examples, Input/Output
>Affects Versions: 2.1.0
> Environment: Hadoop, Spark
>Reporter: Natalie
>
> I need to start running data mining analysis in a secured environment (IP, 
> port, and database name are given), where Spark runs on Hive tables. So I 
> have installed R, SparkR, dplyr, and some other R libraries. 
> Now I understand that I need to point SparkR to that database (with 
> IP/port/name).
> What should my R code be?
> I start by launching R,
> then loading the SparkR library.
> Next I write sc <- sparkR.init()
> and it immediately tells me: spark-submit command not found.
> Do I need to have RHive installed first? 
> Or should I actually point somehow to the Spark library and to that database?
> I couldn't find any documentation on that.
> Thank you






[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199789#comment-16199789
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

[~Jeremy Smith] Thanks for the context. Regarding the versions for the 
`Column`, I have a question:

You said the input `Column` to the lambda represents an element of the array. 
If so, should we call it `Element` instead of `Column`? It does not sound like 
a column at all.



> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>
> At Netflix's algorithm team, we work on ranking problems to find great 
> content that fulfills the unique tastes of our members. Before building 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id in a 
> given country, our data schema looks like this:
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We often need to work on the nested list of structs by applying functions to 
> them; sometimes we drop or add columns inside the nested list of structs. 
> Currently, there is no easy way in open source Apache Spark to perform those 
> operations using SQL primitives; many people just convert the data into an 
> RDD to work on the nested level of data and then reconstruct a new dataframe 
> as a workaround. This is extremely inefficient because optimizations like 
> predicate pushdown in SQL cannot be performed, we cannot leverage the 
> columnar format, and the serialization and deserialization cost becomes huge 
> even when we just want to add a new column at the nested level.
> We built a solution internally at Netflix that we are very happy with, and we 
> plan to open source it in Spark upstream. We would like to socialize the API 
> design to see if we have missed any use cases.  
> The first API we added is *mapItems* on dataframes, which takes a function 
> from *Column* to *Column* and applies it to the nested dataframe. Here 
> is an example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability to apply a function inside the nested dataframe, we can 
> add a new function, *withColumn* on *Column*, to add or replace an existing 
> column that has the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces an existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double 

[jira] [Comment Edited] (SPARK-20528) Add BinaryFileReader and Writer for DataFrames

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199788#comment-16199788
 ] 

Hyukjin Kwon edited comment on SPARK-20528 at 10/11/17 4:22 AM:


BTW, the Scala one in the JIRA description does not seem to work in the 
current master:

{code}
scala> val binaryFilesRDD = sc.binaryFiles("README.md")
binaryFilesRDD: org.apache.spark.rdd.RDD[(String, 
org.apache.spark.input.PortableDataStream)] = mypath BinaryFileRDD[0] at 
binaryFiles at <console>:24

scala> val binaryFilesDF = spark.createDataFrame(binaryFilesRDD)
java.lang.UnsupportedOperationException: No Encoder found for 
org.apache.spark.input.PortableDataStream
- field (class: "org.apache.spark.input.PortableDataStream", name: "_2")
- root class: "scala.Tuple2"
{code}


was (Author: hyukjin.kwon):
The Scala one looks not working BTW in the current master:

{code}
scala> val binaryFilesRDD = sc.binaryFiles("README.md")
binaryFilesRDD: org.apache.spark.rdd.RDD[(String, 
org.apache.spark.input.PortableDataStream)] = mypath BinaryFileRDD[0] at 
binaryFiles at <console>:24

scala> val binaryFilesDF = spark.createDataFrame(binaryFilesRDD)
java.lang.UnsupportedOperationException: No Encoder found for 
org.apache.spark.input.PortableDataStream
- field (class: "org.apache.spark.input.PortableDataStream", name: "_2")
- root class: "scala.Tuple2"
{code}

> Add BinaryFileReader and Writer for DataFrames
> --
>
> Key: SPARK-20528
> URL: https://issues.apache.org/jira/browse/SPARK-20528
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>
> It would be very useful to have a binary data reader/writer for DataFrames, 
> presumably called via {{spark.read.binaryFiles}}, etc.
> Currently, going through RDDs is annoying since it requires different code 
> paths for Scala vs Python:
> Scala:
> {code}
> val binaryFilesRDD = sc.binaryFiles("mypath")
> val binaryFilesDF = spark.createDataFrame(binaryFilesRDD)
> {code}
> Python:
> {code}
> binaryFilesRDD = sc.binaryFiles("mypath")
> binaryFilesRDD_recast = binaryFilesRDD.map(lambda x: (x[0], bytearray(x[1])))
> binaryFilesDF = spark.createDataFrame(binaryFilesRDD_recast)
> {code}
> This is because Scala and Python {{sc.binaryFiles}} return different types, 
> which makes sense in RDD land but not DataFrame land.
> My motivation here is working with images in Spark.






[jira] [Commented] (SPARK-20528) Add BinaryFileReader and Writer for DataFrames

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199788#comment-16199788
 ] 

Hyukjin Kwon commented on SPARK-20528:
--

The Scala one looks not working BTW in the current master:

{code}
scala> val binaryFilesRDD = sc.binaryFiles("README.md")
binaryFilesRDD: org.apache.spark.rdd.RDD[(String, 
org.apache.spark.input.PortableDataStream)] = mypath BinaryFileRDD[0] at 
binaryFiles at <console>:24

scala> val binaryFilesDF = spark.createDataFrame(binaryFilesRDD)
java.lang.UnsupportedOperationException: No Encoder found for 
org.apache.spark.input.PortableDataStream
- field (class: "org.apache.spark.input.PortableDataStream", name: "_2")
- root class: "scala.Tuple2"
{code}

> Add BinaryFileReader and Writer for DataFrames
> --
>
> Key: SPARK-20528
> URL: https://issues.apache.org/jira/browse/SPARK-20528
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>
> It would be very useful to have a binary data reader/writer for DataFrames, 
> presumably called via {{spark.read.binaryFiles}}, etc.
> Currently, going through RDDs is annoying since it requires different code 
> paths for Scala vs Python:
> Scala:
> {code}
> val binaryFilesRDD = sc.binaryFiles("mypath")
> val binaryFilesDF = spark.createDataFrame(binaryFilesRDD)
> {code}
> Python:
> {code}
> binaryFilesRDD = sc.binaryFiles("mypath")
> binaryFilesRDD_recast = binaryFilesRDD.map(lambda x: (x[0], bytearray(x[1])))
> binaryFilesDF = spark.createDataFrame(binaryFilesRDD_recast)
> {code}
> This is because Scala and Python {{sc.binaryFiles}} return different types, 
> which makes sense in RDD land but not DataFrame land.
> My motivation here is working with images in Spark.






[jira] [Commented] (SPARK-20528) Add BinaryFileReader and Writer for DataFrames

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199786#comment-16199786
 ] 

Hyukjin Kwon commented on SPARK-20528:
--

I know the Scala one reads it as a stream ({{PortableDataStream}}) and Python 
reads it as bytes (namely {{str}} in Python 2 and {{bytes}} in Python 3), but I 
think the actual code can be quite similar, as below:

Scala:

{code}
val binaryFilesRDD = sc.binaryFiles("README.md").map { x => (x._1, 
x._2.toArray) }
spark.createDataFrame(binaryFilesRDD)
{code}

Python:

{code}
binaryFilesRDD = sc.binaryFiles("README.md").map(lambda x: (x[0], 
bytearray(x[1])))
spark.createDataFrame(binaryFilesRDD).collect()
{code}

> Add BinaryFileReader and Writer for DataFrames
> --
>
> Key: SPARK-20528
> URL: https://issues.apache.org/jira/browse/SPARK-20528
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Joseph K. Bradley
>
> It would be very useful to have a binary data reader/writer for DataFrames, 
> presumably called via {{spark.read.binaryFiles}}, etc.
> Currently, going through RDDs is annoying since it requires different code 
> paths for Scala vs Python:
> Scala:
> {code}
> val binaryFilesRDD = sc.binaryFiles("mypath")
> val binaryFilesDF = spark.createDataFrame(binaryFilesRDD)
> {code}
> Python:
> {code}
> binaryFilesRDD = sc.binaryFiles("mypath")
> binaryFilesRDD_recast = binaryFilesRDD.map(lambda x: (x[0], bytearray(x[1])))
> binaryFilesDF = spark.createDataFrame(binaryFilesRDD_recast)
> {code}
> This is because Scala and Python {{sc.binaryFiles}} return different types, 
> which makes sense in RDD land but not DataFrame land.
> My motivation here is working with images in Spark.






[jira] [Commented] (SPARK-18355) Spark SQL fails to read data from a ORC hive table that has a new column added to it

2017-10-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199774#comment-16199774
 ] 

Apache Spark commented on SPARK-18355:
--

User 'dongjoon-hyun' has created a pull request for this issue:
https://github.com/apache/spark/pull/19470
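
Until a fix lands, a commonly used mitigation (a hedged sketch only, not the 
change in the PR above) is to keep Spark from converting the metastore ORC 
table, so reads go through the Hive SerDe path instead of the failing 
convertToOrcRelation path:

{code}
// Hedged sketch of a workaround, assuming the ORC metastore conversion is what
// triggers the assertion: disable the conversion, then query again.
sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")
sqlContext.sql("select click_id,search_id from testorc").show
{code}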

> Spark SQL fails to read data from a ORC hive table that has a new column 
> added to it
> 
>
> Key: SPARK-18355
> URL: https://issues.apache.org/jira/browse/SPARK-18355
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2, 2.1.0, 2.2.0
> Environment: Centos6
>Reporter: Sandeep Nemuri
>
> *PROBLEM*:
> Spark SQL fails to read data from an ORC hive table that has a new column 
> added to it.
> Below is the exception:
> {code}
> scala> sqlContext.sql("select click_id,search_id from testorc").show
> 16/11/03 22:17:53 INFO ParseDriver: Parsing command: select 
> click_id,search_id from testorc
> 16/11/03 22:17:54 INFO ParseDriver: Parse Completed
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:38)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:38)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.copy(LogicalRelation.scala:31)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToOrcRelation(HiveMetastoreCatalog.scala:588)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:647)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:643)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> {code}
> *STEPS TO SIMULATE THIS ISSUE*:
> 1) Create table in hive.
> {code}
> CREATE TABLE `testorc`( 
> `click_id` string, 
> `search_id` string, 
> `uid` bigint)
> PARTITIONED BY ( 
> `ts` string, 
> `hour` string) 
> STORED AS ORC; 
> {code}
> 2) Load data into table :
> {code}
> INSERT INTO TABLE testorc PARTITION (ts = '98765',hour = '01' ) VALUES 
> (12,2,12345);
> {code}
> 3) Select through spark shell (This works)
> {code}
> sqlContext.sql("select click_id,search_id from testorc").show
> {code}
> 4) Now add column to hive table
> {code}
> ALTER TABLE testorc ADD COLUMNS (dummy string);
> {code}
> 5) Now again select from spark shell
> {code}
> scala> sqlContext.sql("select click_id,search_id from testorc").show
> 16/11/03 22:17:53 INFO ParseDriver: Parsing command: select 
> click_id,search_id from testorc
> 16/11/03 22:17:54 INFO ParseDriver: Parse Completed
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:38)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:38)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.copy(LogicalRelation.scala:31)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToOrcRelation(HiveMetastoreCatalog.scala:588)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:647)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:643)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> 

[jira] [Assigned] (SPARK-18355) Spark SQL fails to read data from a ORC hive table that has a new column added to it

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18355:


Assignee: Apache Spark

> Spark SQL fails to read data from a ORC hive table that has a new column 
> added to it
> 
>
> Key: SPARK-18355
> URL: https://issues.apache.org/jira/browse/SPARK-18355
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2, 2.1.0, 2.2.0
> Environment: Centos6
>Reporter: Sandeep Nemuri
>Assignee: Apache Spark
>
> *PROBLEM*:
> Spark SQL fails to read data from an ORC hive table that has a new column 
> added to it.
> Below is the exception:
> {code}
> scala> sqlContext.sql("select click_id,search_id from testorc").show
> 16/11/03 22:17:53 INFO ParseDriver: Parsing command: select 
> click_id,search_id from testorc
> 16/11/03 22:17:54 INFO ParseDriver: Parse Completed
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:38)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:38)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.copy(LogicalRelation.scala:31)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToOrcRelation(HiveMetastoreCatalog.scala:588)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:647)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:643)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> {code}
> *STEPS TO SIMULATE THIS ISSUE*:
> 1) Create table in hive.
> {code}
> CREATE TABLE `testorc`( 
> `click_id` string, 
> `search_id` string, 
> `uid` bigint)
> PARTITIONED BY ( 
> `ts` string, 
> `hour` string) 
> STORED AS ORC; 
> {code}
> 2) Load data into table :
> {code}
> INSERT INTO TABLE testorc PARTITION (ts = '98765',hour = '01' ) VALUES 
> (12,2,12345);
> {code}
> 3) Select through spark shell (This works)
> {code}
> sqlContext.sql("select click_id,search_id from testorc").show
> {code}
> 4) Now add column to hive table
> {code}
> ALTER TABLE testorc ADD COLUMNS (dummy string);
> {code}
> 5) Now again select from spark shell
> {code}
> scala> sqlContext.sql("select click_id,search_id from testorc").show
> 16/11/03 22:17:53 INFO ParseDriver: Parsing command: select 
> click_id,search_id from testorc
> 16/11/03 22:17:54 INFO ParseDriver: Parse Completed
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:38)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:38)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.copy(LogicalRelation.scala:31)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToOrcRelation(HiveMetastoreCatalog.scala:588)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:647)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:643)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)

[jira] [Updated] (SPARK-22243) streaming job failed to restart from checkpoint

2017-10-10 Thread StephenZou (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StephenZou updated SPARK-22243:
---
Component/s: (was: Spark Core)
 DStreams

> streaming job failed to restart from checkpoint
> ---
>
> Key: SPARK-22243
> URL: https://issues.apache.org/jira/browse/SPARK-22243
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0, 2.2.0
>Reporter: StephenZou
> Attachments: CheckpointTest.scala
>
>
> My spark-defaults.conf has an item related to the issue: I upload all jars in 
> Spark's jars folder to an HDFS path:
> spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 
> The streaming job fails to restart from a checkpoint; the ApplicationMaster 
> throws "Error: Could not find or load main class 
> org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
> reproducible.
> I examined the SparkConf object recovered from the checkpoint and found that 
> spark.yarn.jars is set to empty, so none of the jars exist on the AM side. The 
> solution is that spark.yarn.jars should be reloaded from the properties files 
> when recovering from a checkpoint. 
> Attached is a demo that reproduces the issue.
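
A minimal sketch of the reporter's suggested direction (not the actual Spark 
change); {{withReloadedYarnJars}} is a hypothetical helper name:

{code}
// Hedged sketch: sys.props carries the values spark-submit loaded from
// spark-defaults.conf, so copy spark.yarn.jars back onto the recovered conf
// before the ApplicationMaster needs it.
import org.apache.spark.SparkConf

def withReloadedYarnJars(recovered: SparkConf): SparkConf = {
  sys.props.get("spark.yarn.jars").foreach(recovered.set("spark.yarn.jars", _))
  recovered
}
{code}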






[jira] [Assigned] (SPARK-18355) Spark SQL fails to read data from a ORC hive table that has a new column added to it

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18355:


Assignee: (was: Apache Spark)

> Spark SQL fails to read data from a ORC hive table that has a new column 
> added to it
> 
>
> Key: SPARK-18355
> URL: https://issues.apache.org/jira/browse/SPARK-18355
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2, 2.1.0, 2.2.0
> Environment: Centos6
>Reporter: Sandeep Nemuri
>
> *PROBLEM*:
> Spark SQL fails to read data from an ORC hive table that has a new column 
> added to it.
> Below is the exception:
> {code}
> scala> sqlContext.sql("select click_id,search_id from testorc").show
> 16/11/03 22:17:53 INFO ParseDriver: Parsing command: select 
> click_id,search_id from testorc
> 16/11/03 22:17:54 INFO ParseDriver: Parse Completed
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:38)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:38)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.copy(LogicalRelation.scala:31)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToOrcRelation(HiveMetastoreCatalog.scala:588)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:647)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:643)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> {code}
> *STEPS TO SIMULATE THIS ISSUE*:
> 1) Create table in hive.
> {code}
> CREATE TABLE `testorc`( 
> `click_id` string, 
> `search_id` string, 
> `uid` bigint)
> PARTITIONED BY ( 
> `ts` string, 
> `hour` string) 
> STORED AS ORC; 
> {code}
> 2) Load data into table :
> {code}
> INSERT INTO TABLE testorc PARTITION (ts = '98765',hour = '01' ) VALUES 
> (12,2,12345);
> {code}
> 3) Select through spark shell (This works)
> {code}
> sqlContext.sql("select click_id,search_id from testorc").show
> {code}
> 4) Now add column to hive table
> {code}
> ALTER TABLE testorc ADD COLUMNS (dummy string);
> {code}
> 5) Now again select from spark shell
> {code}
> scala> sqlContext.sql("select click_id,search_id from testorc").show
> 16/11/03 22:17:53 INFO ParseDriver: Parsing command: select 
> click_id,search_id from testorc
> 16/11/03 22:17:54 INFO ParseDriver: Parse Completed
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$1.apply(LogicalRelation.scala:38)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:38)
>   at 
> org.apache.spark.sql.execution.datasources.LogicalRelation.copy(LogicalRelation.scala:31)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToOrcRelation(HiveMetastoreCatalog.scala:588)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:647)
>   at 
> org.apache.spark.sql.hive.HiveMetastoreCatalog$OrcConversions$$anonfun$apply$2.applyOrElse(HiveMetastoreCatalog.scala:643)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
>   at 
> 

[jira] [Updated] (SPARK-22243) streaming job failed to restart from checkpoint

2017-10-10 Thread StephenZou (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StephenZou updated SPARK-22243:
---
Shepherd:   (was: Shixiong Zhu)

> streaming job failed to restart from checkpoint
> ---
>
> Key: SPARK-22243
> URL: https://issues.apache.org/jira/browse/SPARK-22243
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: StephenZou
> Attachments: CheckpointTest.scala
>
>
> My spark-defaults.conf has an item related to the issue: I upload all jars in 
> Spark's jars folder to an HDFS path:
> spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 
> The streaming job fails to restart from a checkpoint; the ApplicationMaster 
> throws "Error: Could not find or load main class 
> org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
> reproducible.
> I examined the SparkConf object recovered from the checkpoint and found that 
> spark.yarn.jars is set to empty, so none of the jars exist on the AM side. The 
> solution is that spark.yarn.jars should be reloaded from the properties files 
> when recovering from a checkpoint. 
> Attached is a demo that reproduces the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22243) streaming job failed to restart from checkpoint

2017-10-10 Thread StephenZou (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StephenZou updated SPARK-22243:
---
Summary: streaming job failed to restart from checkpoint  (was: job failed 
to restart from checkpoint)

> streaming job failed to restart from checkpoint
> ---
>
> Key: SPARK-22243
> URL: https://issues.apache.org/jira/browse/SPARK-22243
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: StephenZou
> Attachments: CheckpointTest.scala
>
>
> My spark-defaults.conf has an item related to this issue: I upload all jars in 
> Spark's jars folder to an HDFS path and set 
> spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 
> The streaming job fails to restart from the checkpoint; the ApplicationMaster 
> throws "Error: Could not find or load main class 
> org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
> reproducible.
> I examined the SparkConf object recovered from the checkpoint and found that 
> spark.yarn.jars is empty, so none of the jars exist on the AM side. The solution 
> is that spark.yarn.jars should be reloaded from the properties files when 
> recovering from the checkpoint.
> The attachment is a demo that reproduces the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22243) job failed to restart from checkpoint

2017-10-10 Thread StephenZou (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

StephenZou updated SPARK-22243:
---
Attachment: CheckpointTest.scala

the reproducible demo

> job failed to restart from checkpoint
> -
>
> Key: SPARK-22243
> URL: https://issues.apache.org/jira/browse/SPARK-22243
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: StephenZou
> Attachments: CheckpointTest.scala
>
>
> My spark-defaults.conf has an item related to this issue: I upload all jars in 
> Spark's jars folder to an HDFS path and set 
> spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 
> The streaming job fails to restart from the checkpoint; the ApplicationMaster 
> throws "Error: Could not find or load main class 
> org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
> reproducible.
> I examined the SparkConf object recovered from the checkpoint and found that 
> spark.yarn.jars is empty, so none of the jars exist on the AM side. The solution 
> is that spark.yarn.jars should be reloaded from the properties files when 
> recovering from the checkpoint.
> The attachment is a demo that reproduces the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22243) job failed to restart from checkpoint

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22243:


Assignee: (was: Apache Spark)

> job failed to restart from checkpoint
> -
>
> Key: SPARK-22243
> URL: https://issues.apache.org/jira/browse/SPARK-22243
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: StephenZou
>
> My spark-defaults.conf has an item related to this issue: I upload all jars in 
> Spark's jars folder to an HDFS path and set 
> spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 
> The streaming job fails to restart from the checkpoint; the ApplicationMaster 
> throws "Error: Could not find or load main class 
> org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
> reproducible.
> I examined the SparkConf object recovered from the checkpoint and found that 
> spark.yarn.jars is empty, so none of the jars exist on the AM side. The solution 
> is that spark.yarn.jars should be reloaded from the properties files when 
> recovering from the checkpoint.
> The attachment is a demo that reproduces the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22243) job failed to restart from checkpoint

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22243:


Assignee: Apache Spark

> job failed to restart from checkpoint
> -
>
> Key: SPARK-22243
> URL: https://issues.apache.org/jira/browse/SPARK-22243
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: StephenZou
>Assignee: Apache Spark
>
> My spark-defaults.conf has an item related to this issue: I upload all jars in 
> Spark's jars folder to an HDFS path and set 
> spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 
> The streaming job fails to restart from the checkpoint; the ApplicationMaster 
> throws "Error: Could not find or load main class 
> org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
> reproducible.
> I examined the SparkConf object recovered from the checkpoint and found that 
> spark.yarn.jars is empty, so none of the jars exist on the AM side. The solution 
> is that spark.yarn.jars should be reloaded from the properties files when 
> recovering from the checkpoint.
> The attachment is a demo that reproduces the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22243) job failed to restart from checkpoint

2017-10-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199763#comment-16199763
 ] 

Apache Spark commented on SPARK-22243:
--

User 'ChenjunZou' has created a pull request for this issue:
https://github.com/apache/spark/pull/19469

> job failed to restart from checkpoint
> -
>
> Key: SPARK-22243
> URL: https://issues.apache.org/jira/browse/SPARK-22243
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: StephenZou
>
> My spark-defaults.conf has an item related to this issue: I upload all jars in 
> Spark's jars folder to an HDFS path and set 
> spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 
> The streaming job fails to restart from the checkpoint; the ApplicationMaster 
> throws "Error: Could not find or load main class 
> org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
> reproducible.
> I examined the SparkConf object recovered from the checkpoint and found that 
> spark.yarn.jars is empty, so none of the jars exist on the AM side. The solution 
> is that spark.yarn.jars should be reloaded from the properties files when 
> recovering from the checkpoint.
> The attachment is a demo that reproduces the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22243) job failed to restart from checkpoint

2017-10-10 Thread StephenZou (JIRA)
StephenZou created SPARK-22243:
--

 Summary: job failed to restart from checkpoint
 Key: SPARK-22243
 URL: https://issues.apache.org/jira/browse/SPARK-22243
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.0
Reporter: StephenZou


My spark-defaults.conf has an item related to this issue: I upload all jars in 
Spark's jars folder to an HDFS path and set 
spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 

The streaming job fails to restart from the checkpoint; the ApplicationMaster 
throws "Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
reproducible.

I examined the SparkConf object recovered from the checkpoint and found that 
spark.yarn.jars is empty, so none of the jars exist on the AM side. The solution 
is that spark.yarn.jars should be reloaded from the properties files when 
recovering from the checkpoint.

The attachment is a demo that reproduces the issue.
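
A minimal spark-shell-style sketch (the checkpoint path is hypothetical) of how one 
might confirm the observation above: print the recovered SparkConf's spark.yarn.jars 
after StreamingContext.getOrCreate and compare it with a fresh start.

{code}
// Sketch only; checkpointDir is a hypothetical path.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///spark/checkpoints/demo"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointTest")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define the streaming job here, as in the attached CheckpointTest.scala ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)

// On a fresh start this should print the value picked up from spark-defaults.conf;
// after a restart from the checkpoint it prints "<not set>", matching the report above.
println(ssc.sparkContext.getConf.get("spark.yarn.jars", "<not set>"))
{code}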



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22242) job failed to restart from checkpoint

2017-10-10 Thread StephenZou (JIRA)
StephenZou created SPARK-22242:
--

 Summary: job failed to restart from checkpoint
 Key: SPARK-22242
 URL: https://issues.apache.org/jira/browse/SPARK-22242
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.0
Reporter: StephenZou


My spark-defaults.conf has an item related to this issue: I upload all jars in 
Spark's jars folder to an HDFS path and set 
spark.yarn.jars  hdfs:///spark/cache/spark2.2/* 

The streaming job fails to restart from the checkpoint; the ApplicationMaster 
throws "Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher". The problem is always 
reproducible.

I examined the SparkConf object recovered from the checkpoint and found that 
spark.yarn.jars is empty, so none of the jars exist on the AM side. The solution 
is that spark.yarn.jars should be reloaded from the properties files when 
recovering from the checkpoint.

The attachment is a demo that reproduces the issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14286) Empty ORC table join throws exception

2017-10-10 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199757#comment-16199757
 ] 

Dongjoon Hyun commented on SPARK-14286:
---

This is unrelated to `spark.sql.hive.convertMetastoreOrc` as well.

> Empty ORC table join throws exception
> -
>
> Key: SPARK-14286
> URL: https://issues.apache.org/jira/browse/SPARK-14286
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Rajesh Balamohan
>Priority: Minor
>
> When joining with an empty ORC table, Spark throws the following exception. 
> {noformat}
> java.sql.SQLException: java.lang.IllegalArgumentException: orcFileOperator: 
> path /apps/hive/warehouse/test.db/table does not have valid orc files 
> matching the pattern
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-14286) Empty ORC table join throws exception

2017-10-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-14286.
---
Resolution: Cannot Reproduce

Please reopen this with reproducible examples.

> Empty ORC table join throws exception
> -
>
> Key: SPARK-14286
> URL: https://issues.apache.org/jira/browse/SPARK-14286
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Rajesh Balamohan
>Priority: Minor
>
> When joining with an empty ORC table, Spark throws the following exception. 
> {noformat}
> java.sql.SQLException: java.lang.IllegalArgumentException: orcFileOperator: 
> path /apps/hive/warehouse/test.db/table does not have valid orc files 
> matching the pattern
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21751) CodeGenerator.splitExpressions counts code size more precisely

2017-10-10 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-21751.
-
   Resolution: Fixed
 Assignee: Kazuaki Ishizaki
Fix Version/s: 2.3.0

> CodeGenerator.splitExpressions counts code size more precisely
> -
>
> Key: SPARK-21751
> URL: https://issues.apache.org/jira/browse/SPARK-21751
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kazuaki Ishizaki
>Assignee: Kazuaki Ishizaki
>Priority: Minor
> Fix For: 2.3.0
>
>
> Currently, {{CodeGenerator.splitExpressions}} splits statements if their total 
> length is more than 1200 characters, and that length may include comments or empty lines.
> It would be good to exclude comments and empty lines to reduce the number of 
> generated methods in a class. 
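
As a toy illustration only (not Spark's actual implementation), the idea is to count 
just the non-comment, non-blank lines when deciding whether a generated block is big 
enough to split:

{code}
// Toy sketch: measure generated code while skipping comments and blank lines,
// so they do not push a block over the split threshold.
def effectiveCodeSize(code: String): Int =
  code.split("\n")
    .map(_.trim)
    .filter(line => line.nonEmpty && !line.startsWith("//") && !line.startsWith("/*") && !line.startsWith("*"))
    .map(_.length)
    .sum

val generated =
  """// projection for column 0 (comment should not count)
    |int value0 = input.getInt(0);
    |
    |boolean isNull0 = input.isNullAt(0);""".stripMargin

println(effectiveCodeSize(generated))  // counts only the two statement lines
{code}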



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22241) Apache spark giving InvalidSchemaException: Cannot write a schema with an empty group: optional group element {

2017-10-10 Thread Ritika Maheshwari (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199724#comment-16199724
 ] 

Ritika Maheshwari commented on SPARK-22241:
---

I know parquet does not allow empty struct types, but is this something that 
the Encoder should handle when generating the schema?

> Apache spark giving InvalidSchemaException: Cannot write a schema with an 
> empty group: optional group element {
> ---
>
> Key: SPARK-22241
> URL: https://issues.apache.org/jira/browse/SPARK-22241
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Ritika Maheshwari
>Priority: Minor
>
> I have a bean which has a field of type ArrayList of Doubles. Then I do the 
> following
> Encoder beanEncoder = Encoders.bean(jClass);
> Dataset df = spark.createDataset( 
> Collections.singletonList((T)extractedObj),beanEncoder);
> The schema generated says:
> |-- pixelSpacing: array (nullable = true)
> |-- element: struct (containsNull = true)
> Now I try to save this Dataset as parquet 
> df.write().mode(SaveMode.Append).parquet(jClass.getName()+"_parquet");
> and I get the error Caused by: org.apache.parquet.schema.
> InvalidSchemaException: Cannot write a schema with an empty group: optional 
> group element {}
> Kindly advise how to specify an ArrayList of Strings or Doubles in the bean 
> passed to the encoders so that the generated schema can be saved in parquet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22241) Apache spark giving InvalidSchemaException: Cannot write a schema with an empty group: optional group element {

2017-10-10 Thread Ritika Maheshwari (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-22241:
--
Description: 
I have a bean which has a field of type ArrayList of Doubles. Then I do the 
following:

Encoder beanEncoder = Encoders.bean(jClass);
Dataset df = spark.createDataset( 
Collections.singletonList((T)extractedObj),beanEncoder);

The schema generated says:

|-- pixelSpacing: array (nullable = true)
|-- element: struct (containsNull = true)

Now I try to save this Dataset as parquet:

df.write().mode(SaveMode.Append).parquet(jClass.getName()+"_parquet");

and I get the error: Caused by: org.apache.parquet.schema.InvalidSchemaException: 
Cannot write a schema with an empty group: optional group element {}

Kindly advise how to specify an ArrayList of Strings or Doubles in the bean 
passed to the encoders so that the generated schema can be saved in parquet.
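
Not a definitive answer, but a small Scala sketch (all names made up for illustration) 
showing that when the element type is explicit the encoder produces array<double> 
rather than an empty struct, and the parquet write then succeeds:

{code}
// Hedged sketch, not the reporter's Java bean.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("schema-check").getOrCreate()
import spark.implicits._

case class Measurement(pixelSpacing: Seq[Double])   // element type is explicit

val ds = Seq(Measurement(Seq(1.0, 2.5))).toDS()
ds.printSchema()
// root
//  |-- pixelSpacing: array (nullable = true)
//  |    |-- element: double (containsNull = false)

ds.write.mode("append").parquet("/tmp/measurement_parquet")   // no empty-group error
{code}

For the Java bean itself, a parameterized getter (e.g. a java.util.List<Double> return 
type) is presumably what lets the encoder infer the element type, but that has not 
been verified here.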



> Apache spark giving InvalidSchemaException: Cannot write a schema with an 
> empty group: optional group element {
> ---
>
> Key: SPARK-22241
> URL: https://issues.apache.org/jira/browse/SPARK-22241
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Ritika Maheshwari
>Priority: Minor
>
> I have a bean which has a field of type ArrayList of Doubles. Then I do the 
> following
> Encoder beanEncoder = Encoders.bean(jClass);
> Dataset df = spark.createDataset( 
> Collections.singletonList((T)extractedObj),beanEncoder);
> The schema generated says:
> |-- pixelSpacing: array (nullable = true)
> |-- element: struct (containsNull = true)
> Now I try to save this Dataset as parquet 
> df.write().mode(SaveMode.Append).parquet(jClass.getName()+"_parquet");
> and I get the error Caused by: org.apache.parquet.schema.
> InvalidSchemaException: Cannot write a schema with an empty group: optional 
> group element {}
> Kindly advise how to specify an ArrayList of Strings or Doubles in the bean 
> passed to the encoders so that the generated schema can be saved in parquet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22241) Apache spark giving InvalidSchemaException: Cannot write a schema with an empty group: optional group element {

2017-10-10 Thread Ritika Maheshwari (JIRA)
Ritika Maheshwari created SPARK-22241:
-

 Summary: Apache spark giving InvalidSchemaException: Cannot write 
a schema with an empty group: optional group element {
 Key: SPARK-22241
 URL: https://issues.apache.org/jira/browse/SPARK-22241
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Ritika Maheshwari
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21686) spark.sql.hive.convertMetastoreOrc is causing NullPointerException while reading ORC tables

2017-10-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-21686.
---
Resolution: Duplicate

> spark.sql.hive.convertMetastoreOrc is causing NullPointerException while 
> reading ORC tables
> ---
>
> Key: SPARK-21686
> URL: https://issues.apache.org/jira/browse/SPARK-21686
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 1.6.1
> Environment: spark_2_4_2_0_258-1.6.1.2.4.2.0-258.el6.noarch 
> spark_2_4_2_0_258-python-1.6.1.2.4.2.0-258.el6.noarch 
> spark_2_4_2_0_258-yarn-shuffle-1.6.1.2.4.2.0-258.el6.noarch
> RHEL-7 (64-Bit)
> JDK 1.8
>Reporter: Ernani Pereira de Mattos Junior
>
> The issue is very similar to SPARK-10304; 
> Spark Query throws a NullPointerException. 
> >>> sqlContext.sql('select * from core_next.spark_categorization').show(57) 
> 17/06/19 11:26:54 ERROR Executor: Exception in task 2.0 in stage 21.0 (TID 
> 48) 
> java.lang.NullPointerException 
> at 
> org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
>  
> at 
> org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
>  
> at 
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
>  
> at 
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
>  
> Turning off ORC optimizations resolved the issue: 
> sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21686) spark.sql.hive.convertMetastoreOrc is causing NullPointerException while reading ORC tables

2017-10-10 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199695#comment-16199695
 ] 

Dongjoon Hyun commented on SPARK-21686:
---

Hi, [~emattosHWX] and [~viirya].
This is a duplicate of SPARK-18355, which happens when Hive adds a new column 
after the ORC file is created.
I will close this issue.

> spark.sql.hive.convertMetastoreOrc is causing NullPointerException while 
> reading ORC tables
> ---
>
> Key: SPARK-21686
> URL: https://issues.apache.org/jira/browse/SPARK-21686
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 1.6.1
> Environment: spark_2_4_2_0_258-1.6.1.2.4.2.0-258.el6.noarch 
> spark_2_4_2_0_258-python-1.6.1.2.4.2.0-258.el6.noarch 
> spark_2_4_2_0_258-yarn-shuffle-1.6.1.2.4.2.0-258.el6.noarch
> RHEL-7 (64-Bit)
> JDK 1.8
>Reporter: Ernani Pereira de Mattos Junior
>
> The issue is very similar to SPARK-10304; 
> Spark Query throws a NullPointerException. 
> >>> sqlContext.sql('select * from core_next.spark_categorization').show(57) 
> 17/06/19 11:26:54 ERROR Executor: Exception in task 2.0 in stage 21.0 (TID 
> 48) 
> java.lang.NullPointerException 
> at 
> org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
>  
> at 
> org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
>  
> at 
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
>  
> at 
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
>  
> Turning off ORC optimizations resolved the issue: 
> sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "false")



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1911) Warn users if their assembly jars are not built with Java 6

2017-10-10 Thread Swaapnika Guntaka (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199666#comment-16199666
 ] 

Swaapnika Guntaka commented on SPARK-1911:
--

Does this issue still exist with Spark 2.2?

> Warn users if their assembly jars are not built with Java 6
> ---
>
> Key: SPARK-1911
> URL: https://issues.apache.org/jira/browse/SPARK-1911
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Andrew Or
>Assignee: Sean Owen
> Fix For: 1.2.2, 1.3.0
>
>
> The root cause of the problem is detailed in: 
> https://issues.apache.org/jira/browse/SPARK-1520.
> In short, an assembly jar built with Java 7+ is not always accessible by 
> Python or other versions of Java (especially Java 6). If the assembly jar is 
> not built on the cluster itself, this problem may manifest itself in strange 
> exceptions that are not trivial to debug. This is an issue especially for 
> PySpark on YARN, which relies on the python files included within the 
> assembly jar.
> Currently we warn users only in make-distribution.sh, but most users build 
> the jars directly. At the very least we need to emphasize this in the docs 
> (currently missing entirely). The next step is to add a warning prompt in the 
> mvn scripts whenever Java 7+ is detected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-15757) Error occurs when using Spark sql "select" statement on orc file after hive sql "insert overwrite tb1 select * from sourcTb" has been executed on this orc file

2017-10-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-15757.
---
Resolution: Duplicate

This issue only happens when `convertMetastoreOrc` is true. The root cause is 
that ORC doesn't store the correct column names.

{code}
$ hive12 --orcfiledump /user/hive/warehouse/inventory/00_0
Type: struct<_col0:int,_col1:int,_col2:int,_col3:int>
{code}

I can reproduce this, and the following is the workaround. This is a well-known 
issue. I'll close this as a duplicate of SPARK-14387.

{code}
scala> sql("set spark.sql.hive.convertMetastoreOrc=false")
res4: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> sql("select * from inventory").show
+-----------+-----------+----------------+--------------------+
|inv_date_sk|inv_item_sk|inv_warehouse_sk|inv_quantity_on_hand|
+-----------+-----------+----------------+--------------------+
|          1|          2|               3|                   4|
+-----------+-----------+----------------+--------------------+
{code}

> Error occurs when using Spark sql "select" statement on orc file after hive 
> sql "insert overwrite tb1 select * from sourcTb" has been executed on this 
> orc file
> ---
>
> Key: SPARK-15757
> URL: https://issues.apache.org/jira/browse/SPARK-15757
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: marymwu
> Attachments: Result.png
>
>
> Error occurs when using Spark sql "select" statement on orc file after hive 
> sql "insert overwrite tb1 select * from sourcTb" has been executed
> 0: jdbc:hive2://172.19.200.158:40099/default> select * from inventory;
> Error: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 7.0 failed 8 times, most recent failure: Lost task 0.7 in 
> stage 7.0 (TID 2532, smokeslave5.avatar.lenovomm.com): 
> java.lang.IllegalArgumentException: Field "inv_date_sk" does not exist.
>   at 
> org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:252)
>   at 
> org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:252)
>   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>   at scala.collection.AbstractMap.getOrElse(Map.scala:59)
>   at 
> org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:251)
>   at 
> org.apache.spark.sql.hive.orc.OrcRelation$$anonfun$10.apply(OrcRelation.scala:361)
>   at 
> org.apache.spark.sql.hive.orc.OrcRelation$$anonfun$10.apply(OrcRelation.scala:361)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at org.apache.spark.sql.types.StructType.foreach(StructType.scala:94)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at org.apache.spark.sql.types.StructType.map(StructType.scala:94)
>   at 
> org.apache.spark.sql.hive.orc.OrcRelation$.setRequiredColumns(OrcRelation.scala:361)
>   at 
> org.apache.spark.sql.hive.orc.DefaultSource$$anonfun$buildReader$2.apply(OrcRelation.scala:123)
>   at 
> org.apache.spark.sql.hive.orc.DefaultSource$$anonfun$buildReader$2.apply(OrcRelation.scala:112)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:278)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:262)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:114)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>   at 
> 

[jira] [Commented] (SPARK-18278) SPIP: Support native submission of spark jobs to a kubernetes cluster

2017-10-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199610#comment-16199610
 ] 

Apache Spark commented on SPARK-18278:
--

User 'foxish' has created a pull request for this issue:
https://github.com/apache/spark/pull/19468

> SPIP: Support native submission of spark jobs to a kubernetes cluster
> -
>
> Key: SPARK-18278
> URL: https://issues.apache.org/jira/browse/SPARK-18278
> Project: Spark
>  Issue Type: Umbrella
>  Components: Build, Deploy, Documentation, Scheduler, Spark Core
>Affects Versions: 2.3.0
>Reporter: Erik Erlandson
>  Labels: SPIP
> Attachments: SPARK-18278 Spark on Kubernetes Design Proposal Revision 
> 2 (1).pdf
>
>
> A new Apache Spark sub-project that enables native support for submitting 
> Spark applications to a kubernetes cluster. The submitted application runs 
> in a driver executing on a kubernetes pod, and executor lifecycles are also 
> managed as pods.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22240) S3 CSV number of partitions incorrectly computed

2017-10-10 Thread Arthur Baudry (JIRA)
Arthur Baudry created SPARK-22240:
-

 Summary: S3 CSV number of partitions incorrectly computed
 Key: SPARK-22240
 URL: https://issues.apache.org/jira/browse/SPARK-22240
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
 Environment: Running on EMR 5.8.0 with Hadoop 2.7.3 and Spark 2.2.0
Reporter: Arthur Baudry


Reading CSV out of S3 using S3A protocol does not compute the number of 
partitions correctly in Spark 2.2.0.

With Spark 2.2.0 I get only one partition when loading a 14 GB file:
{code:java}
scala> val input = spark.read.format("csv").option("header", 
"true").option("delimiter", "|").option("multiLine", 
"true").load("s3a://")
input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
string ... 36 more fields]

scala> input.rdd.getNumPartitions
res2: Int = 1
{code}

While in Spark 2.0.2 I had:
{code:java}
scala> val input = spark.read.format("csv").option("header", 
"true").option("delimiter", "|").option("multiLine", 
"true").load("s3a://")
input: org.apache.spark.sql.DataFrame = [PARTY_KEY: string, ROW_START_DATE: 
string ... 36 more fields]

scala> input.rdd.getNumPartitions
res2: Int = 115
{code}

This introduces obvious performance issues in Spark 2.2.0. Maybe there is a 
property that should be set to have the number of partitions computed correctly.

I'm aware that .option("multiLine", "true") is not supported in Spark 2.0.2; 
it's not relevant here.
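
One thing that may be worth checking (a guess, not a confirmed cause): with multiLine 
a record can span lines, so each file may have to be read as a whole and would then no 
longer be split. A quick comparison in the spark shell, with a placeholder path:

{code}
// Diagnostic sketch only; "s3a://some-bucket/big.csv" is a hypothetical path.
val path = "s3a://some-bucket/big.csv"

val multiLine = spark.read.format("csv")
  .option("header", "true").option("delimiter", "|")
  .option("multiLine", "true")
  .load(path)

val singleLine = spark.read.format("csv")
  .option("header", "true").option("delimiter", "|")
  .load(path)                     // same read, without multiLine

println(multiLine.rdd.getNumPartitions)   // 1 in the report above
println(singleLine.rdd.getNumPartitions)  // expected to be split into many partitions
{code}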



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21929) Support `ALTER TABLE table_name ADD COLUMNS(..)` for ORC data source

2017-10-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-21929:
--
Description: 
SPARK-19261 implemented `ADD COLUMNS` in Spark 2.2, but the ORC data source is not 
supported due to its limitations.

{code}
scala> sql("CREATE TABLE tab (c1 int, c2 int, c3 int) USING ORC PARTITIONED BY 
(c3)")
scala> sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
org.apache.spark.sql.AnalysisException:
ALTER ADD COLUMNS does not support datasource table with type ORC.
You must drop and re-create the table for adding the new columns. Tables: `tab`;
{code}

  was:SPARK-19261 implemented `ADD COLUMNS` at Spark 2.2, but ORC data source 
is not supported due to its limit.


> Support `ALTER TABLE table_name ADD COLUMNS(..)` for ORC data source
> 
>
> Key: SPARK-21929
> URL: https://issues.apache.org/jira/browse/SPARK-21929
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Dongjoon Hyun
>
> SPARK-19261 implemented `ADD COLUMNS` in Spark 2.2, but the ORC data source is 
> not supported due to its limitations.
> {code}
> scala> sql("CREATE TABLE tab (c1 int, c2 int, c3 int) USING ORC PARTITIONED 
> BY (c3)")
> scala> sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
> org.apache.spark.sql.AnalysisException:
> ALTER ADD COLUMNS does not support datasource table with type ORC.
> You must drop and re-create the table for adding the new columns. Tables: 
> `tab`;
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22223) ObjectHashAggregate introduces unnecessary shuffle

2017-10-10 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199597#comment-16199597
 ] 

Takeshi Yamamuro commented on SPARK-3:
--

The hash-based aggregate implementation requires the data to be partitioned by the 
keys you select in the group-by clause. In the looser case you suggested, it seems 
we need shuffles. Do I misunderstand your suggestion?
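
For reference, one way to compare the two plans discussed here in a spark-shell session; 
the config key is the one listed in the issue's Environment field, and buildAggregatedDF() 
is a hypothetical helper that re-runs the groupBy/collect_list pipeline from the description:

{code}
// Diagnostic sketch: rebuild the DataFrame after flipping the flag so the new
// setting is picked up at planning time. buildAggregatedDF() is hypothetical.
def showPlan(useObjectHash: Boolean): Unit = {
  spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", useObjectHash)
  buildAggregatedDF().explain()
}

showPlan(useObjectHash = true)    // ObjectHashAggregate + Exchange hashpartitioning(...)
showPlan(useObjectHash = false)   // SortAggregate, no extra Exchange after the repartition
{code}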

> ObjectHashAggregate introduces unnecessary shuffle
> --
>
> Key: SPARK-3
> URL: https://issues.apache.org/jira/browse/SPARK-3
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.2.0
> Environment: Spark 2.2.0 and following.
> {{spark.sql.execution.useObjectHashAggregateExec = true}}
>Reporter: Michele Costantino Soccio
>
> Since Spark 2.2 the {{groupBy}} plus {{collect_list}} makes use of 
> unnecessary shuffle when the partitions at previous step are based on looser 
> criteria than the current {{groupBy}}.
> For example:
> {code:java}
> //sample data from 
> https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
> //Read the data and repartitions by "Country"
> val retailDF = spark.sql("Select * from online_retail")
> .repartition(col("Country"))
> //Group the data and collect.
> val aggregatedDF = retailDF
>   .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
>   .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
>   .agg(collect_list("Good").as("Goods"))
>   .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
>   .groupBy("Country", "CustomerID")
>   .agg(collect_list("Invoice").as("Invoices"))
>   .withColumn("Customer", expr("(CustomerID, Invoices)"))
>   .groupBy("Country")
>   .agg(collect_list("Customer").as("Customers"))
> {code}
> Without disabling the {{ObjectHashAggregate}} one gets the following physical 
> plan:
> {noformat}
> == Physical Plan ==
> ObjectHashAggregate(keys=[Country#14], 
> functions=[finalmerge_collect_list(merge buf#317) AS 
> collect_list(Customer#299, 0, 0)#310])
> +- Exchange hashpartitioning(Country#14, 200)
>+- ObjectHashAggregate(keys=[Country#14], 
> functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
>   +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, 
> Invoices, Invoices#294) AS Customer#299]
>  +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], 
> functions=[finalmerge_collect_list(merge buf#319) AS 
> collect_list(Invoice#278, 0, 0)#293])
> +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
>+- ObjectHashAggregate(keys=[Country#14, CustomerID#13], 
> functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>   +- *Project [Country#14, CustomerID#13, 
> named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, 
> Goods#271) AS Invoice#278]
>  +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, 
> InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge 
> buf#321) AS collect_list(Good#249, 0, 0)#270])
> +- Exchange hashpartitioning(Country#14, 
> CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
>+- ObjectHashAggregate(keys=[Country#14, 
> CustomerID#13, InvoiceNo#7, InvoiceDate#11], 
> functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>   +- *Project [InvoiceNo#7, InvoiceDate#11, 
> CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, 
> UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>  +- Exchange hashpartitioning(Country#14, 200)
> +- *FileScan csv 
> default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14]
>  Batched: false, Format: CSV, Location: 
> InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], 
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct {noformat}
> With Spark 2.1.0 or when {{ObjectHashAggregate}} is disabled, one gets a more 
> efficient:
> {noformat}
> == Physical Plan ==
> SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge 
> buf#3834) AS collect_list(Customer#299, 0, 0)#310])
> +- SortAggregate(key=[Country#14], 
> functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
>+- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, 
> Invoices#294) AS Customer#299]
>   +- SortAggregate(key=[Country#14, CustomerID#13], 
> functions=[finalmerge_collect_list(merge buf#319) AS 
> collect_list(Invoice#278, 0, 0)#293])
>  +- 

[jira] [Resolved] (SPARK-19558) Provide a config option to attach QueryExecutionListener to SparkSession

2017-10-10 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-19558.
-
   Resolution: Fixed
 Assignee: Marcelo Vanzin
Fix Version/s: 2.3.0

> Provide a config option to attach QueryExecutionListener to SparkSession
> 
>
> Key: SPARK-19558
> URL: https://issues.apache.org/jira/browse/SPARK-19558
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Salil Surendran
>Assignee: Marcelo Vanzin
> Fix For: 2.3.0
>
>
> Provide a configuration property (just like spark.extraListeners) to attach a 
> QueryExecutionListener to a SparkSession.
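
For context, a minimal sketch of what currently has to be done programmatically on each 
SparkSession, which the resolved change makes possible via configuration instead:

{code}
// Register a listener by hand using the public QueryExecutionListener API.
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val auditListener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName finished in ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
}

spark.listenerManager.register(auditListener)
spark.range(10).count()   // triggers onSuccess for the "count" action
{code}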



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22239) User-defined window functions with pandas udf

2017-10-10 Thread Li Jin (JIRA)
Li Jin created SPARK-22239:
--

 Summary: User-defined window functions with pandas udf
 Key: SPARK-22239
 URL: https://issues.apache.org/jira/browse/SPARK-22239
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 2.2.0
 Environment: 

Reporter: Li Jin


Window functions are another place where we can benefit from vectorized UDFs and 
add another useful function to the pandas_udf suite.

Example usage (preliminary):

{code:python}
w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0)

@pandas_udf(DoubleType())
def ema(v1):
    return v1.ewm(alpha=0.5).mean().iloc[-1]

df.withColumn('v1_ema', ema(df.v1).over(w))
{code}





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22216) Improving PySpark/Pandas interoperability

2017-10-10 Thread Li Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199516#comment-16199516
 ] 

Li Jin commented on SPARK-22216:


[~hyukjin.kwon],

My intention is to keep this open to track all the features that we want to do 
related to PySpark/Pandas interoperability and to keep adding sub-tasks as they 
come up. 

For existing JIRAs, there are the "createDataFrame" speed-up and support for 
complex types.

For new features, there are more functions that I think we can add pandas udf 
support to, for instance, window functions. I haven't created Jiras for them 
yet.

> Improving PySpark/Pandas interoperability
> -
>
> Key: SPARK-22216
> URL: https://issues.apache.org/jira/browse/SPARK-22216
> Project: Spark
>  Issue Type: Epic
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Li Jin
>
> This is an umbrella ticket tracking the general effort of improving 
> performance and interoperability between PySpark and Pandas. The core idea is 
> to use Apache Arrow as the serialization format to reduce the overhead between 
> PySpark and Pandas.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22216) Improving PySpark/Pandas interoperability

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199514#comment-16199514
 ] 

Hyukjin Kwon commented on SPARK-22216:
--

Thanks for updating.

> Improving PySpark/Pandas interoperability
> -
>
> Key: SPARK-22216
> URL: https://issues.apache.org/jira/browse/SPARK-22216
> Project: Spark
>  Issue Type: Epic
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Li Jin
>
> This is an umbrella ticket tracking the general effort of improving 
> performance and interoperability between PySpark and Pandas. The core idea is 
> to use Apache Arrow as the serialization format to reduce the overhead between 
> PySpark and Pandas.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20791) Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame

2017-10-10 Thread Li Jin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-20791:
---
Issue Type: Sub-task  (was: New Feature)
Parent: SPARK-22216

> Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame
> ---
>
> Key: SPARK-20791
> URL: https://issues.apache.org/jira/browse/SPARK-20791
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark, SQL
>Affects Versions: 2.1.1
>Reporter: Bryan Cutler
>
> The current code for creating a Spark DataFrame from a Pandas DataFrame uses 
> `to_records` to convert the DataFrame to a list of records and then converts 
> each record to a list.  Following this, there are a number of calls to 
> serialize and transfer this data to the JVM.  This process is very 
> inefficient and also discards all schema metadata, requiring another pass 
> over the data to infer types.
> Using Apache Arrow, the Pandas DataFrame could be efficiently converted to 
> Arrow data and directly transferred to the JVM to create the Spark DataFrame. 
>  The performance will be better and the Pandas schema will also be used so 
> that the correct types will be used.  
> Issues with the poor type inference have come up before, causing confusion 
> and frustration with users because it is not clear why it fails or doesn't 
> use the same type from Pandas.  Fixing this with Apache Arrow will solve 
> another pain point for Python users and the following JIRAs could be closed:
> * SPARK-17804
> * SPARK-18178



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22216) Improving PySpark/Pandas interoperability

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199502#comment-16199502
 ] 

Hyukjin Kwon commented on SPARK-22216:
--

[~icexelloss] and [~bryanc], do you think we need more sub-tasks, or is it 
resolvable for now (as {{Done}})?

> Improving PySpark/Pandas interoperability
> -
>
> Key: SPARK-22216
> URL: https://issues.apache.org/jira/browse/SPARK-22216
> Project: Spark
>  Issue Type: Epic
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Li Jin
>
> This is an umbrella ticket tracking the general effort of improving 
> performance and interoperability between PySpark and Pandas. The core idea is 
> to use Apache Arrow as the serialization format to reduce the overhead between 
> PySpark and Pandas.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20396) groupBy().apply() with pandas udf in pyspark

2017-10-10 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-20396.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 18732
[https://github.com/apache/spark/pull/18732]

> groupBy().apply() with pandas udf in pyspark
> 
>
> Key: SPARK-20396
> URL: https://issues.apache.org/jira/browse/SPARK-20396
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark, SQL
>Affects Versions: 2.1.0
>Reporter: Li Jin
>Assignee: Li Jin
> Fix For: 2.3.0
>
>
> split-apply-merge is a common pattern when analyzing data. It is implemented 
> in many popular data analyzing libraries such as Spark, Pandas, R, and etc. 
> Split and merge operations in these libraries are similar to each other, 
> mostly implemented by certain grouping operators. For instance, Spark 
> DataFrame has groupBy, Pandas DataFrame has groupby. Therefore, for users 
> familiar with either Spark DataFrame or pandas DataFrame, it is not difficult 
> for them to understand how grouping works in the other library. However, 
> apply is more native to different libraries and therefore, quite different 
> between libraries. A pandas user who knows how to use apply to do a certain 
> transformation in pandas might not know how to do the same using pyspark. 
> Also, the current implementation of passing data from the java executor to 
> the python executor is not efficient; there is an opportunity to speed it up using 
> Apache Arrow. This feature can enable use cases that use Spark's grouping 
> operators such as groupBy, rollUp, cube, window and Pandas's native apply 
> operator.
> Related work:
> SPARK-13534
> This enables faster data serialization between Pyspark and Pandas using 
> Apache Arrow. Our work will be on top of this and use the same serialization 
> for pandas udf.
> SPARK-12919 and SPARK-12922
> These implemented two functions, dapply and gapply, in Spark R, which 
> implement the same split-apply-merge pattern that we want to implement 
> with Pyspark. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22238) EnsureStatefulOpPartitioning shouldn't ask for the child RDD before planning is completed

2017-10-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199451#comment-16199451
 ] 

Apache Spark commented on SPARK-22238:
--

User 'brkyvz' has created a pull request for this issue:
https://github.com/apache/spark/pull/19467

> EnsureStatefulOpPartitioning shouldn't ask for the child RDD before planning 
> is completed
> -
>
> Key: SPARK-22238
> URL: https://issues.apache.org/jira/browse/SPARK-22238
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Burak Yavuz
>Assignee: Burak Yavuz
>
> In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan 
> has the expected partitioning for Streaming Stateful Operators. The problem 
> is that we are not allowed to access this information during planning.
> The reason we added that check was because CoalesceExec could actually create 
> RDDs with 0 partitions. We should fix it such that when CoalesceExec says 
> that there is a SinglePartition, there is in fact an inputRDD of 1 partition 
> instead of 0 partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22238) EnsureStatefulOpPartitioning shouldn't ask for the child RDD before planning is completed

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22238:


Assignee: Burak Yavuz  (was: Apache Spark)

> EnsureStatefulOpPartitioning shouldn't ask for the child RDD before planning 
> is completed
> -
>
> Key: SPARK-22238
> URL: https://issues.apache.org/jira/browse/SPARK-22238
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Burak Yavuz
>Assignee: Burak Yavuz
>
> In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan 
> has the expected partitioning for Streaming Stateful Operators. The problem 
> is that we are not allowed to access this information during planning.
> The reason we added that check was because CoalesceExec could actually create 
> RDDs with 0 partitions. We should fix it such that when CoalesceExec says 
> that there is a SinglePartition, there is in fact an inputRDD of 1 partition 
> instead of 0 partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22238) EnsureStatefulOpPartitioning shouldn't ask for the child RDD before planning is completed

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22238:


Assignee: Apache Spark  (was: Burak Yavuz)

> EnsureStatefulOpPartitioning shouldn't ask for the child RDD before planning 
> is completed
> -
>
> Key: SPARK-22238
> URL: https://issues.apache.org/jira/browse/SPARK-22238
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Burak Yavuz
>Assignee: Apache Spark
>
> In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan 
> has the expected partitioning for Streaming Stateful Operators. The problem 
> is that we are not allowed to access this information during planning.
> The reason we added that check was because CoalesceExec could actually create 
> RDDs with 0 partitions. We should fix it such that when CoalesceExec says 
> that there is a SinglePartition, there is in fact an inputRDD of 1 partition 
> instead of 0 partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22238) EnsureStatefulOpPartitioning shouldn't ask for the child RDD before planning is completed

2017-10-10 Thread Burak Yavuz (JIRA)
Burak Yavuz created SPARK-22238:
---

 Summary: EnsureStatefulOpPartitioning shouldn't ask for the child 
RDD before planning is completed
 Key: SPARK-22238
 URL: https://issues.apache.org/jira/browse/SPARK-22238
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Burak Yavuz
Assignee: Burak Yavuz


In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan has 
the expected partitioning for Streaming Stateful Operators. The problem is that 
we are not allowed to access this information during planning.

The reason we added that check was because CoalesceExec could actually create 
RDDs with 0 partitions. We should fix it such that when CoalesceExec says that 
there is a SinglePartition, there is in fact an inputRDD of 1 partition instead 
of 0 partitions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21988) Add default stats to StreamingExecutionRelation

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21988:


Assignee: Apache Spark  (was: Jose Torres)

> Add default stats to StreamingExecutionRelation
> ---
>
> Key: SPARK-21988
> URL: https://issues.apache.org/jira/browse/SPARK-21988
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Apache Spark
> Fix For: 2.3.0
>
>
> StreamingExecutionRelation currently doesn't implement stats.
> This makes some sense, but unfortunately the LeafNode contract requires that 
> nodes which survive analysis implement stats, and StreamingExecutionRelation 
> can indeed survive analysis when running explain() on a streaming dataframe.
> This value won't ever be used during execution, because 
> StreamingExecutionRelation does *not* survive analysis on the execution path; 
> it's replaced with each batch.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21988) Add default stats to StreamingExecutionRelation

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-21988:


Assignee: Jose Torres  (was: Apache Spark)

> Add default stats to StreamingExecutionRelation
> ---
>
> Key: SPARK-21988
> URL: https://issues.apache.org/jira/browse/SPARK-21988
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Jose Torres
> Fix For: 2.3.0
>
>
> StreamingExecutionRelation currently doesn't implement stats.
> This makes some sense, but unfortunately the LeafNode contract requires that 
> nodes which survive analysis implement stats, and StreamingExecutionRelation 
> can indeed survive analysis when running explain() on a streaming dataframe.
> This value won't ever be used during execution, because 
> StreamingExecutionRelation does *not* survive analysis on the execution path; 
> it's replaced with each batch.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-21988) Add default stats to StreamingExecutionRelation

2017-10-10 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu reopened SPARK-21988:
--

> Add default stats to StreamingExecutionRelation
> ---
>
> Key: SPARK-21988
> URL: https://issues.apache.org/jira/browse/SPARK-21988
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Jose Torres
> Fix For: 2.3.0
>
>
> StreamingExecutionRelation currently doesn't implement stats.
> This makes some sense, but unfortunately the LeafNode contract requires that 
> nodes which survive analysis implement stats, and StreamingExecutionRelation 
> can indeed survive analysis when running explain() on a streaming dataframe.
> This value won't ever be used during execution, because 
> StreamingExecutionRelation does *not* survive analysis on the execution path; 
> it's replaced with each batch.






[jira] [Resolved] (SPARK-21907) NullPointerException in UnsafeExternalSorter.spill()

2017-10-10 Thread Herman van Hovell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hovell resolved SPARK-21907.
---
   Resolution: Fixed
Fix Version/s: 2.3.0

> NullPointerException in UnsafeExternalSorter.spill()
> 
>
> Key: SPARK-21907
> URL: https://issues.apache.org/jira/browse/SPARK-21907
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Juliusz Sompolski
>Assignee: Eyal Farago
> Fix For: 2.3.0
>
>
> I see NPE during sorting with the following stacktrace:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:383)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:63)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:43)
>   at 
> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:270)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:142)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:345)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:173)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:221)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:349)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:400)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:778)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:685)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:259)
>   at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 

[jira] [Assigned] (SPARK-21907) NullPointerException in UnsafeExternalSorter.spill()

2017-10-10 Thread Herman van Hovell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hovell reassigned SPARK-21907:
-

Assignee: Eyal Farago

> NullPointerException in UnsafeExternalSorter.spill()
> 
>
> Key: SPARK-21907
> URL: https://issues.apache.org/jira/browse/SPARK-21907
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Juliusz Sompolski
>Assignee: Eyal Farago
>
> I see NPE during sorting with the following stacktrace:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:383)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:63)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortComparator.compare(UnsafeInMemorySorter.java:43)
>   at 
> org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:270)
>   at org.apache.spark.util.collection.TimSort.sort(TimSort.java:142)
>   at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getSortedIterator(UnsafeInMemorySorter.java:345)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:173)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:221)
>   at 
> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>   at 
> org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:281)
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:90)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:349)
>   at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:400)
>   at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:778)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:685)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$2.advanceNext(SortMergeJoinExec.scala:259)
>   at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> 

[jira] [Commented] (SPARK-22237) Spark submit script should use downloaded files in standalone/local client mode

2017-10-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199296#comment-16199296
 ] 

Apache Spark commented on SPARK-22237:
--

User 'loneknightpy' has created a pull request for this issue:
https://github.com/apache/spark/pull/19466

> Spark submit script should use downloaded files in standalone/local client 
> mode
> ---
>
> Key: SPARK-22237
> URL: https://issues.apache.org/jira/browse/SPARK-22237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu Peng
>
> SPARK-10643 is added to allow spark-submit script to download jars/files from 
> remote hadoop file system in client mode. This is currently broken right now. 
> cc: [~smilegator]






[jira] [Assigned] (SPARK-22237) Spark submit script should use downloaded files in standalone/local client mode

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22237:


Assignee: (was: Apache Spark)

> Spark submit script should use downloaded files in standalone/local client 
> mode
> ---
>
> Key: SPARK-22237
> URL: https://issues.apache.org/jira/browse/SPARK-22237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu Peng
>
> SPARK-10643 is added to allow spark-submit script to download jars/files from 
> remote hadoop file system in client mode. This is currently broken right now. 
> cc: [~smilegator]






[jira] [Assigned] (SPARK-22237) Spark submit script should use downloaded files in standalone/local client mode

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22237:


Assignee: Apache Spark

> Spark submit script should use downloaded files in standalone/local client 
> mode
> ---
>
> Key: SPARK-22237
> URL: https://issues.apache.org/jira/browse/SPARK-22237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu Peng
>Assignee: Apache Spark
>
> SPARK-10643 is added to allow spark-submit script to download jars/files from 
> remote hadoop file system in client mode. This is currently broken right now. 
> cc: [~smilegator]






[jira] [Updated] (SPARK-22237) Spark submit script should use downloaded files in standalone/local client mode

2017-10-10 Thread Yu Peng (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Peng updated SPARK-22237:

Description: 
SPARK-10643 is added to allow spark-submit script to download jars/files from 
remote hadoop file system in client mode. This is currently broken right now. 

cc: [~smilegator]

  was:SPARK-10643 is added to allow spark-submit script to download jars/files 
from remote hadoop file system in client mode. This is currently broken right 
now. 


> Spark submit script should use downloaded files in standalone/local client 
> mode
> ---
>
> Key: SPARK-22237
> URL: https://issues.apache.org/jira/browse/SPARK-22237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Yu Peng
>
> SPARK-10643 is added to allow spark-submit script to download jars/files from 
> remote hadoop file system in client mode. This is currently broken right now. 
> cc: [~smilegator]






[jira] [Created] (SPARK-22237) Spark submit script should use downloaded files in standalone/local client mode

2017-10-10 Thread Yu Peng (JIRA)
Yu Peng created SPARK-22237:
---

 Summary: Spark submit script should use downloaded files in 
standalone/local client mode
 Key: SPARK-22237
 URL: https://issues.apache.org/jira/browse/SPARK-22237
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Yu Peng


SPARK-10643 added support for the spark-submit script to download jars/files from 
a remote Hadoop file system in client mode. This is currently broken.






[jira] [Commented] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2017-10-10 Thread Yuval Degani (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199293#comment-16199293
 ] 

Yuval Degani commented on SPARK-9:
--

We have already published an open-source package that implements an RDMA shuffle 
engine; it is available at https://github.com/Mellanox/SparkRDMA.
We have been working on this project for about a year now, and have a growing 
list of enterprises that use it in production, or intend to do so after 
pre-production testing in the very near future.

I think whether to introduce it into Spark or leave it as an external plugin is 
something of a chicken-and-egg question.
Not having it available in mainstream Spark is currently what is blocking 
widespread adoption.

I limited the goals of this SPIP to the shuffle engine, since I didn't want to 
impose a cross-component change.
Actually, it probably makes more sense to integrate RDMA a little lower in the 
stack by offering an alternative to "NettyBlockTransferService", which implements 
the "BlockTransferService" interface. However, this interface is not pluggable in 
today's Spark code.
By implementing an "RdmaBlockTransferService", we could instantly allow RDMA 
transfers across the board for broadcasts, remote RDDs, task results and, of 
course, shuffles.
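Purely as an illustration of what such pluggability could look like, a 
config-driven hook might be sketched as below. Nothing in this sketch exists in 
Spark today: the trait, the class and the config key are all placeholders.

{code}
import org.apache.spark.SparkConf

// Placeholder for a pluggable transfer API; not a real Spark interface.
trait PluggableBlockTransfer {
  def fetchBlocks(host: String, port: Int, blockIds: Seq[String]): Unit
}

// Stand-in default implementation.
class NettyLikeTransfer extends PluggableBlockTransfer {
  def fetchBlocks(host: String, port: Int, blockIds: Seq[String]): Unit =
    println(s"netty-style fetch of ${blockIds.size} blocks from $host:$port")
}

def loadTransferService(conf: SparkConf): PluggableBlockTransfer = {
  // Hypothetical key: an RDMA plugin would register its own class name here.
  val className = conf.get("spark.shuffle.blockTransferService.class",
    classOf[NettyLikeTransfer].getName)
  Class.forName(className)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[PluggableBlockTransfer]
}
{code}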

My plan is to get those RDMA capabilities up and running for shuffle, and let 
that be the first step in introducing RDMA to Spark.
There is a lot more that RDMA can do in the context of Spark beyond what I have 
already mentioned: RPC messaging over RDMA, GPUDirect (remote access to GPU 
memory on NVIDIA devices - crucial for ML applications, and the main driver for 
implementing RDMA in TensorFlow and Caffe2), and more.

> SPIP: RDMA Accelerated Shuffle Engine
> -
>
> Key: SPARK-9
> URL: https://issues.apache.org/jira/browse/SPARK-9
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yuval Degani
> Attachments: 
> SPARK-9_SPIP_RDMA_Accelerated_Shuffle_Engine_Rev_1.0.pdf
>
>
> An RDMA-accelerated shuffle engine can provide enormous performance benefits 
> to shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
> open-source project ([https://github.com/Mellanox/SparkRDMA]).
> Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
> processing overhead by bypassing the kernel and networking stack as well as 
> avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
> directly by the actual Spark workloads, and help reducing the job runtime 
> significantly. 
> This performance gain is demonstrated with both industry standard HiBench 
> TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive 
> customer applications. 
> SparkRDMA will be presented at Spark Summit 2017 in Dublin 
> ([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).
> Please see attached proposal document for more information.






[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Nathan Kronenfeld (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198861#comment-16198861
 ] 

Nathan Kronenfeld edited comment on SPARK-22231 at 10/10/17 7:39 PM:
-

A couple of related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 

In response to [~jeremyrsmith] below:
re: using withField instead of withColumn - that seems perfectly reasonable (I 
could accept either way); the description of this issue uses withColumn, so that 
would probably need to be updated.
re: mapItems on a row: I was thinking more of it acting on the row itself, 
looking at it as an array of objects.  Changing 'withColumn' to 'withField' may 
make a sharp enough break to make this a non-issue, though.


was (Author: nkronenfeld):
A couple of related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 
In response to [~jeremyrsmith] below:
re: using withField instead of withColumn - that seems perfectly reasonable (I 
could accept either way); the description of this issue uses withColumn, so that 
would probably need to be updated.
re: mapItems on a row: I was thinking more of it acting on the row itself, 
looking at it as an array of objects.  Changing 'withColumn' to 'withField' may 
make a sharp enough break to make this a non-issue, though.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> 

[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Nathan Kronenfeld (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198861#comment-16198861
 ] 

Nathan Kronenfeld edited comment on SPARK-22231 at 10/10/17 7:38 PM:
-

A couple of related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 
In response to [~jeremyrsmith] below:
re: using withField instead of withColumn - that seems perfectly reasonable (I 
could accept either way); the description of this issue uses withColumn, so that 
would probably need to be updated.
re: mapItems on a row: I was thinking more of it acting on the row itself, 
looking at it as an array of objects.  Changing 'withColumn' to 'withField' may 
make a sharp enough break to make this a non-issue, though.


was (Author: nkronenfeld):
A couple of related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |

[jira] [Commented] (SPARK-22236) CSV I/O: does not respect RFC 4180

2017-10-10 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199245#comment-16199245
 ] 

Sean Owen commented on SPARK-22236:
---

Interesting, because the Univocity parser internally seems to default to 
RFC 4180 settings, but the Spark implementation overrides this with a default 
escape of {{\}}. [~hyukjin.kwon] was that for backwards compatibility with 
previous implementations? In any event I'm not sure we'd change the default 
behavior on this side of Spark 3.x, but you can easily configure the writer to 
use a double quote as the escape character.
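For example (assuming an active {{SparkSession}} named {{spark}}, the input file 
from the report, and an arbitrary output path), opting in to RFC 4180-style 
quoting on both read and write is a one-line option:

{code}
// Read and write with double-quote as the escape character (RFC 4180 style).
val df = spark.read.option("escape", "\"").csv("testfile.csv")
df.write.option("escape", "\"").csv("testout_rfc4180.csv")
{code}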

> CSV I/O: does not respect RFC 4180
> --
>
> Key: SPARK-22236
> URL: https://issues.apache.org/jira/browse/SPARK-22236
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.2.0
>Reporter: Ondrej Kokes
>Priority: Minor
>
> When reading or writing CSV files with Spark, double quotes are escaped with 
> a backslash by default. However, the appropriate behaviour as set out by RFC 
> 4180 (and adhered to by many software packages) is to escape using a second 
> double quote.
> This piece of Python code demonstrates the issue
> {code}
> import csv
> with open('testfile.csv', 'w') as f:
> cw = csv.writer(f)
> cw.writerow(['a 2.5" drive', 'another column'])
> cw.writerow(['a "quoted" string', '"quoted"'])
> cw.writerow([1,2])
> with open('testfile.csv') as f:
> print(f.read())
> # "a 2.5"" drive",another column
> # "a ""quoted"" string","""quoted"""
> # 1,2
> spark.read.csv('testfile.csv').collect()
> # [Row(_c0='"a 2.5"" drive"', _c1='another column'),
> #  Row(_c0='"a ""quoted"" string"', _c1='"""quoted"""'),
> #  Row(_c0='1', _c1='2')]
> # explicitly stating the escape character fixed the issue
> spark.read.option('escape', '"').csv('testfile.csv').collect()
> # [Row(_c0='a 2.5" drive', _c1='another column'),
> #  Row(_c0='a "quoted" string', _c1='"quoted"'),
> #  Row(_c0='1', _c1='2')]
> {code}
> The same applies to writes, where reading the file written by Spark may 
> result in garbage.
> {code}
> df = spark.read.option('escape', '"').csv('testfile.csv') # reading the file 
> correctly
> df.write.format("csv").save('testout.csv')
> with open('testout.csv/part-csv') as f:
> cr = csv.reader(f)
> print(next(cr))
> print(next(cr))
> # ['a 2.5\\ drive"', 'another column']
> # ['a \\quoted\\" string"', '\\quoted\\""']
> {code}
> While it's possible to work with CSV files in a "compatible" manner, it would 
> be useful if Spark had sensible defaults that conform to the above-mentioned 
> RFC (as well as W3C recommendations). I realise this would be a breaking 
> change and thus if accepted, it would probably need to result in a warning 
> first, before moving to a new default.






[jira] [Commented] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2017-10-10 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199242#comment-16199242
 ] 

Sean Owen commented on SPARK-9:
---

It's still something that should start as a package. I think that makes it easy 
to demonstrate demand for it and its benefit.

If it's really a whole separate implementation, that's a non-trivial amount of 
code to ask the project to maintain. This has been tricky when proposed in the 
past because it probably means relying on behaviors that may not have been 
guaranteed internally before.

If it's something pretty simple, maybe it need not be a whole different shuffle 
engine? I'm not sure how different it is.


> SPIP: RDMA Accelerated Shuffle Engine
> -
>
> Key: SPARK-9
> URL: https://issues.apache.org/jira/browse/SPARK-9
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yuval Degani
> Attachments: 
> SPARK-9_SPIP_RDMA_Accelerated_Shuffle_Engine_Rev_1.0.pdf
>
>
> An RDMA-accelerated shuffle engine can provide enormous performance benefits 
> to shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
> open-source project ([https://github.com/Mellanox/SparkRDMA]).
> Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
> processing overhead by bypassing the kernel and networking stack as well as 
> avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
> directly by the actual Spark workloads, and help reducing the job runtime 
> significantly. 
> This performance gain is demonstrated with both industry standard HiBench 
> TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive 
> customer applications. 
> SparkRDMA will be presented at Spark Summit 2017 in Dublin 
> ([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).
> Please see attached proposal document for more information.






[jira] [Commented] (SPARK-21988) Add default stats to StreamingExecutionRelation

2017-10-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199182#comment-16199182
 ] 

Apache Spark commented on SPARK-21988:
--

User 'zsxwing' has created a pull request for this issue:
https://github.com/apache/spark/pull/19465

> Add default stats to StreamingExecutionRelation
> ---
>
> Key: SPARK-21988
> URL: https://issues.apache.org/jira/browse/SPARK-21988
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Jose Torres
> Fix For: 2.3.0
>
>
> StreamingExecutionRelation currently doesn't implement stats.
> This makes some sense, but unfortunately the LeafNode contract requires that 
> nodes which survive analysis implement stats, and StreamingExecutionRelation 
> can indeed survive analysis when running explain() on a streaming dataframe.
> This value won't ever be used during execution, because 
> StreamingExecutionRelation does *not* survive analysis on the execution path; 
> it's replaced with each batch.






[jira] [Created] (SPARK-22236) CSV I/O: does not respect RFC 4180

2017-10-10 Thread Ondrej Kokes (JIRA)
Ondrej Kokes created SPARK-22236:


 Summary: CSV I/O: does not respect RFC 4180
 Key: SPARK-22236
 URL: https://issues.apache.org/jira/browse/SPARK-22236
 Project: Spark
  Issue Type: Improvement
  Components: Input/Output
Affects Versions: 2.2.0
Reporter: Ondrej Kokes
Priority: Minor


When reading or writing CSV files with Spark, double quotes are escaped with a 
backslash by default. However, the appropriate behaviour as set out by RFC 4180 
(and adhered to by many software packages) is to escape using a second double 
quote.

This piece of Python code demonstrates the issue


{code}
import csv
with open('testfile.csv', 'w') as f:
cw = csv.writer(f)
cw.writerow(['a 2.5" drive', 'another column'])
cw.writerow(['a "quoted" string', '"quoted"'])
cw.writerow([1,2])

with open('testfile.csv') as f:
print(f.read())

# "a 2.5"" drive",another column
# "a ""quoted"" string","""quoted"""
# 1,2

spark.read.csv('testfile.csv').collect()

# [Row(_c0='"a 2.5"" drive"', _c1='another column'),
#  Row(_c0='"a ""quoted"" string"', _c1='"""quoted"""'),
#  Row(_c0='1', _c1='2')]

# explicitly stating the escape character fixed the issue
spark.read.option('escape', '"').csv('testfile.csv').collect()

# [Row(_c0='a 2.5" drive', _c1='another column'),
#  Row(_c0='a "quoted" string', _c1='"quoted"'),
#  Row(_c0='1', _c1='2')]

{code}

The same applies to writes, where reading the file written by Spark may result 
in garbage.

{code}
df = spark.read.option('escape', '"').csv('testfile.csv') # reading the file 
correctly
df.write.format("csv").save('testout.csv')
with open('testout.csv/part-csv') as f:
cr = csv.reader(f)
print(next(cr))
print(next(cr))

# ['a 2.5\\ drive"', 'another column']
# ['a \\quoted\\" string"', '\\quoted\\""']
{code}

While it's possible to work with CSV files in a "compatible" manner, it would 
be useful if Spark had sensible defaults that conform to the above-mentioned 
RFC (as well as W3C recommendations). I realise this would be a breaking change 
and thus if accepted, it would probably need to result in a warning first, 
before moving to a new default.






[jira] [Assigned] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread DB Tsai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai reassigned SPARK-22231:
---

Assignee: Jeremy Smith

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // +---++--+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---++--+
> {code}
> and the second one adds 

[jira] [Commented] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2017-10-10 Thread Yuval Degani (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199052#comment-16199052
 ] 

Yuval Degani commented on SPARK-9:
--

[~srowen], [~viirya], thanks for your response.

Regarding whether RDMA requires specialized hardware:
RDMA is considered a commodity these days. You will find that most 10Gb/s+ 
network cards support it, and RDMA-capable NICs are sold by many vendors: 
Mellanox, Intel, Broadcom, Chelsio, Cavium, HP, Dell, Emulex and more. As a 
matter of fact, most people are not even aware that their existing setups 
already support RDMA, and this is where we come in and try to make this 
technology accessible and seamless.
Also, cloud provider support is growing fast: Microsoft Azure A- and H-series 
nodes have supported RDMA for a while now.

Regarding the pluggable mechanism:
I think that we, as Spark advocates and enthusiasts, would like to keep Spark 
a framework with uncontested performance.
We see lower-level integration reaching almost every mainstream framework - with 
GPUs and ASICs most recently - and RDMA is now taking its place as well.
RDMA is already supported natively in today's most popular distributed ML 
platforms: TensorFlow, Caffe2 and CNTK, and is being driven into others as well.

I think that in order for Spark to keep up with today's performance challenges, 
we must allow some lower-level integration, especially for mature and proven 
technologies such as RDMA.

> SPIP: RDMA Accelerated Shuffle Engine
> -
>
> Key: SPARK-9
> URL: https://issues.apache.org/jira/browse/SPARK-9
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yuval Degani
> Attachments: 
> SPARK-9_SPIP_RDMA_Accelerated_Shuffle_Engine_Rev_1.0.pdf
>
>
> An RDMA-accelerated shuffle engine can provide enormous performance benefits 
> to shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
> open-source project ([https://github.com/Mellanox/SparkRDMA]).
> Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
> processing overhead by bypassing the kernel and networking stack as well as 
> avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
> directly by the actual Spark workloads, and help reducing the job runtime 
> significantly. 
> This performance gain is demonstrated with both industry standard HiBench 
> TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive 
> customer applications. 
> SparkRDMA will be presented at Spark Summit 2017 in Dublin 
> ([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).
> Please see attached proposal document for more information.






[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Jeremy Smith (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198965#comment-16198965
 ] 

Jeremy Smith commented on SPARK-22231:
--

I implemented these at Netflix, so I wanted to provide some more context and 
answer some of the questions above.

First, [~nkronenfeld]:
1. I named it `withField` on a column, because it makes it clear that it's 
expected to be a struct column and you're dealing with fields of the struct. My 
take was that `withColumn` on a Column is somewhat confusing.
2. Maybe `withFieldRenamed`, following the above reasoning?
3. `mapItems` works at the Dataset level as well. We have extension methods for 
`Column` and `Dataset` to provide `mapItems` and `filterItems`. On `Column` 
there are two versions:

  * `[map|filter]Items(fn: Column => Column)` assumes the column upon which 
it's invoked is an array column, and returns a new array column which is 
updated using the given `Column => Column` lambda. The input `Column` to the 
lambda represents an element of the array column.
  * `[map|filter]Items(fieldName: String)(fn: Column => Column): Column` 
assumes the column upon which it's invoked is a struct, and that `fieldName` is 
an array field of that struct. It applies the `Column => Column` lambda to each 
element of the array field, and returns a new struct column with the 
`fieldName` field updated and all other fields of the original struct preserved.

On `Dataset`, the enrichment provides only the second form of extension methods:

* `[map|filter]Items(columnName: String)(fn: Column => Column): Dataset[Row]` 
assumes the given `columnName` of the `Dataset` upon which it's invoked is an 
array column, and returns a new `Dataset` with that column mapped using the 
given `Column => Column` lambda. All other columns of the original `Dataset` 
are preserved.
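Putting those shapes together, here is a signature-level sketch (names exactly as 
described above; the bodies are deliberately left unimplemented because, as noted 
below, they rely on internal catalyst expressions):

{code}
import org.apache.spark.sql.{Column, Dataset, Row}

object NestedOps {
  implicit class RichColumn(col: Column) {
    // Array column -> array column, element-wise.
    def mapItems(fn: Column => Column): Column = ???
    def filterItems(fn: Column => Column): Column = ???
    // Struct column with an array field -> same struct, that field rewritten.
    def mapItems(fieldName: String)(fn: Column => Column): Column = ???
    def filterItems(fieldName: String)(fn: Column => Column): Column = ???
  }

  implicit class RichDataset(ds: Dataset[Row]) {
    // Named array column of the Dataset rewritten, all other columns preserved.
    def mapItems(columnName: String)(fn: Column => Column): Dataset[Row] = ???
    def filterItems(columnName: String)(fn: Column => Column): Dataset[Row] = ???
  }
}
{code}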

I wanted to also add a bit of context around how these are implemented and why 
we didn't simply make a PR with the code.

`mapItems` and `filterItems` behind the scenes use catalyst expressions for 
their implementation. `mapItems` simply provides an interface to the existing 
`MapObjects` expression, and `filterItems` uses a similar expression 
`FilterObjects` that we implemented. This presents a couple of issues with 
implementation:

1. `MapObjects` does not support `eval`, so if you get into the `eval` codepath 
(i.e. with anything that does `CodegenFallback`) then it will fail. We mitigated 
this by never using the default `CodegenFallback` and instead implementing our 
own `CodegenFallback`-like traits, which use `eval` only for the current 
expression but use codegen to evaluate child expressions. We implemented both 
codegen and eval for `FilterObjects`, and we had originally intended to also 
implement eval for `MapObjects` and submit both in a PR. But it proved very 
difficult to test these in the existing expression test framework in the Spark 
codebase (we used our own test harness to test both code paths of 
`FilterObjects`). So that effort never came to fruition. Note that we're 
currently on Spark 2.0 with some backported patches, so maybe this will be 
easier when we migrate to Spark 2.1?
2. In order to work properly, we had to make sure that the columns given to 
`MapObjects` and `FilterObjects` are fully resolved when the expression is 
created; this was a while ago, but I think the reason was that transformation 
rules during analysis aren't easily extensible and the rules around 
`MapObjects` enforce that it must be fully resolved before analysis. You also 
need a DataType to pass to these, which you can only get from a resolved 
column. So you currently must do something like `df("col")` rather than 
`$"col"`, or you'll get an immediate exception. Another issue with this is that 
if you're doing nested transformations with `mapItems`, you cannot have fully 
resolved columns. We mitigated this by doing an out-of-band "ad-hoc analysis" 
step where we manually traverse the expression tree and replace 
`UnresolvedAlias` and `UnresolvedExtractValue` with their resolved versions, 
purely for the purposes of getting the DataType out. This is obviously not 
ideal, but again it's something that might require less workaround in newer 
Spark versions.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Nathan Kronenfeld (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198861#comment-16198861
 ] 

Nathan Kronenfeld commented on SPARK-22231:
---

A couple of related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 
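For reference, the two patterns being compared look roughly like this; the 
Column-level variant is only the proposed API from this issue, shown as a 
hypothetical signature, not something that exists in Spark today:

{code}
// Dataset API today: column name first, then the expression.
//   def withColumn(colName: String, col: Column): DataFrame
// Column-level pattern proposed in this issue: a single aliased expression.
//   def withColumn(col: Column): Column   // e.g. item.withColumn(item("b") + 1 as "b")
{code}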

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- 

[jira] [Resolved] (SPARK-22212) Some SQL functions in Python fail with string column name

2017-10-10 Thread Jakub Nowacki (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakub Nowacki resolved SPARK-22212.
---
Resolution: Later

Keeping the resolution on hold until a consensus on API unification is reached. 

> Some SQL functions in Python fail with string column name 
> --
>
> Key: SPARK-22212
> URL: https://issues.apache.org/jira/browse/SPARK-22212
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.2.0
>Reporter: Jakub Nowacki
>Priority: Minor
>
> Most of the functions in {{pyspark.sql.functions}} allow usage of both column 
> name string and {{Column}} object. But there are some functions, like 
> {{trim}}, that require to pass only {{Column}}. See below code for 
> explanation.
> {code}
> >>> import pyspark.sql.functions as func
> >>> df = spark.createDataFrame([tuple(l) for l in "abcde"], ["text"])
> >>> df.select(func.trim(df["text"])).show()
> +--+
> |trim(text)|
> +--+
> | a|
> | b|
> | c|
> | d|
> | e|
> +--+
> >>> df.select(func.trim("text")).show()
> [...]
> Py4JError: An error occurred while calling 
> z:org.apache.spark.sql.functions.trim. Trace:
> py4j.Py4JException: Method trim([class java.lang.String]) does not exist
> at 
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
> at 
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
> at py4j.Gateway.invoke(Gateway.java:274)
> at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> This is because most of the Python wrappers convert a column name string to a 
> {{Column}}, but functions created via {{_create_function}} pass the argument 
> through as-is when it is not a {{Column}}.
> I am preparing a PR with the proposed fix.
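
In the meantime, passing a {{Column}} explicitly works around the error on the 
user side; a minimal sketch (this is only a workaround, not the proposed fix):

{code}
from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(c,) for c in "abcde"], ["text"])

# Workaround: wrap the column name with col() so trim() receives a Column
# instead of the bare string "text".
df.select(func.trim(func.col("text"))).show()
{code}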



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20171) Analyzer should include the arity of a function when reporting "AnalysisException: Undefined function"

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198719#comment-16198719
 ] 

Hyukjin Kwon commented on SPARK-20171:
--

I happened to look at this JIRA. It looks like we now throw a different exception:

{code}
scala> sql("SELECT cast(id, INT) FROM range(1)").queryExecution.analyzed
org.apache.spark.sql.AnalysisException: cannot resolve '`INT`' given input 
columns: [id]; line 1 pos 16;
'Project [unresolvedalias('cast(id#1L, 'INT), None)]
+- Range (0, 1, step=1, splits=None)
{code}

> Analyzer should include the arity of a function when reporting 
> "AnalysisException: Undefined function"
> --
>
> Key: SPARK-20171
> URL: https://issues.apache.org/jira/browse/SPARK-20171
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Minor
>
> {{Analyzer}} reports {{AnalysisException}} when the arity of the function to 
> be looked up is incorrect.
> The following {{AnalysisException}} was thrown because the arity of {{cast}} was 
> 2 (not 1), but it said {{cast}} was undefined, which is only partially correct. 
> It'd be much more helpful to say something like {{"Undefined function: 'cast' 
> with 2 params"}}.
> Hint: It'd be even nicer if the exception included candidates like: {{Did you 
> mean "cast with 1 param"?}}
> {code}
> scala> sql("SELECT cast(id, INT) FROM range(1)").queryExecution.analyzed
> org.apache.spark.sql.AnalysisException: Undefined function: 'cast'. This 
> function is neither a registered temporary function nor a permanent function 
> registered in the database 'default'.; line 1 pos 7
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$13$$anonfun$applyOrElse$44.apply(Analyzer.scala:1130)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$13$$anonfun$applyOrElse$44.apply(Analyzer.scala:1130)
>   at 
> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$13.applyOrElse(Analyzer.scala:1129)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$13.applyOrElse(Analyzer.scala:1127)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2017-10-10 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198716#comment-16198716
 ] 

Liang-Chi Hsieh commented on SPARK-9:
-

Since we already have a pluggable mechanism for setting up an external shuffle 
manager, is there any urgent reason to include this in Spark itself?
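
For context, wiring in a third-party shuffle manager is purely a matter of 
configuration; a rough sketch (the jar path and class name below are illustrative 
and should be taken from the plugin's own documentation):

{code}
from pyspark.sql import SparkSession

# Sketch only: plug an external ShuffleManager implementation in via configuration.
# The jar path and class name are assumptions for illustration; check the plugin's
# documentation for the real values.
spark = (SparkSession.builder
         .config("spark.driver.extraClassPath", "/path/to/spark-rdma.jar")
         .config("spark.executor.extraClassPath", "/path/to/spark-rdma.jar")
         .config("spark.shuffle.manager", "org.apache.spark.shuffle.rdma.RdmaShuffleManager")
         .getOrCreate())
{code}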

> SPIP: RDMA Accelerated Shuffle Engine
> -
>
> Key: SPARK-9
> URL: https://issues.apache.org/jira/browse/SPARK-9
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yuval Degani
> Attachments: 
> SPARK-9_SPIP_RDMA_Accelerated_Shuffle_Engine_Rev_1.0.pdf
>
>
> An RDMA-accelerated shuffle engine can provide enormous performance benefits 
> to shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
> open-source project ([https://github.com/Mellanox/SparkRDMA]).
> Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
> processing overhead by bypassing the kernel and networking stack as well as 
> avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
> directly by the actual Spark workloads, and help reduce the job runtime 
> significantly.
> This performance gain has been demonstrated both with the industry-standard 
> HiBench TeraSort benchmark (a 1.5x speedup in sorting) and with shuffle-intensive 
> customer applications.
> SparkRDMA will be presented at Spark Summit 2017 in Dublin 
> ([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).
> Please see attached proposal document for more information.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20162) Reading data from MySQL - Cannot up cast from decimal(30,6) to decimal(38,18)

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198712#comment-16198712
 ] 

Hyukjin Kwon commented on SPARK-20162:
--

Let me resolve this as Cannot Reproduce. Please reopen it if anyone can 
reproduce the problem or can provide steps here to reproduce it.

> Reading data from MySQL - Cannot up cast from decimal(30,6) to decimal(38,18)
> -
>
> Key: SPARK-20162
> URL: https://issues.apache.org/jira/browse/SPARK-20162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Miroslav Spehar
>
> While reading data from MySQL, type conversion doesn't work for the Decimal type 
> when the decimal in the database has lower precision/scale than the one Spark 
> expects.
> Error:
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up 
> cast `DECIMAL_AMOUNT` from decimal(30,6) to decimal(38,18) as it may truncate
> The type path of the target object is:
> - field (class: "org.apache.spark.sql.types.Decimal", name: "DECIMAL_AMOUNT")
> - root class: "com.misp.spark.Structure"
> You can either add an explicit cast to the input data or choose a higher 
> precision type of the field in the target object;
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2119)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2141)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2136)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5$$anonfun$apply$11.apply(TreeNode.scala:360)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:358)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionDown$1(QueryPlan.scala:248)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:258)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:267)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:236)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2136)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2132)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2132)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2117)
>   at 
> 

[jira] [Resolved] (SPARK-20162) Reading data from MySQL - Cannot up cast from decimal(30,6) to decimal(38,18)

2017-10-10 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-20162.
--
Resolution: Cannot Reproduce

> Reading data from MySQL - Cannot up cast from decimal(30,6) to decimal(38,18)
> -
>
> Key: SPARK-20162
> URL: https://issues.apache.org/jira/browse/SPARK-20162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Miroslav Spehar
>
> While reading data from MySQL, type conversion doesn't work for the Decimal type 
> when the decimal in the database has lower precision/scale than the one Spark 
> expects.
> Error:
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up 
> cast `DECIMAL_AMOUNT` from decimal(30,6) to decimal(38,18) as it may truncate
> The type path of the target object is:
> - field (class: "org.apache.spark.sql.types.Decimal", name: "DECIMAL_AMOUNT")
> - root class: "com.misp.spark.Structure"
> You can either add an explicit cast to the input data or choose a higher 
> precision type of the field in the target object;
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2119)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2141)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34$$anonfun$applyOrElse$14.applyOrElse(Analyzer.scala:2136)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5$$anonfun$apply$11.apply(TreeNode.scala:360)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:358)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionDown$1(QueryPlan.scala:248)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:258)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:267)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:236)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2136)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$34.applyOrElse(Analyzer.scala:2132)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2132)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.apply(Analyzer.scala:2117)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at 
> 

[jira] [Commented] (SPARK-19860) DataFrame join get conflict error if two frames has a same name column.

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198704#comment-16198704
 ] 

Hyukjin Kwon commented on SPARK-19860:
--

[~wuchang1989], would you mind providing a self-contained reproducer? As it 
stands, this is hard to understand or reproduce.
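
In case it helps while preparing one: the warning about {{'fdate#42 = fdate#42'}} 
suggests both sides of the join condition resolve to the same column reference, 
and aliasing the frames before the join usually avoids that. A minimal sketch 
with made-up data:

{code}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([("20170223", 7758588)], ["fdate", "in_amount1"])
df2 = spark.createDataFrame([("20170223", 7072120)], ["fdate", "in_amount2"])

# Alias both sides so the join condition refers to two distinct relations
# rather than to the same attribute twice.
joined = (df1.alias("a")
             .join(df2.alias("b"), F.col("a.fdate") == F.col("b.fdate"), "inner"))
joined.show()
{code}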

> DataFrame join get conflict error if two frames has a same name column.
> ---
>
> Key: SPARK-19860
> URL: https://issues.apache.org/jira/browse/SPARK-19860
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: wuchang
>
> {code}
> >>> print df1.collect()
> [Row(fdate=u'20170223', in_amount1=7758588), Row(fdate=u'20170302', 
> in_amount1=7656414), Row(fdate=u'20170207', in_amount1=7836305), 
> Row(fdate=u'20170208', in_amount1=14887432), Row(fdate=u'20170224', 
> in_amount1=16506043), Row(fdate=u'20170201', in_amount1=7339381), 
> Row(fdate=u'20170221', in_amount1=7490447), Row(fdate=u'20170303', 
> in_amount1=11142114), Row(fdate=u'20170202', in_amount1=7882746), 
> Row(fdate=u'20170306', in_amount1=12977822), Row(fdate=u'20170227', 
> in_amount1=15480688), Row(fdate=u'20170206', in_amount1=11370812), 
> Row(fdate=u'20170217', in_amount1=8208985), Row(fdate=u'20170203', 
> in_amount1=8175477), Row(fdate=u'20170222', in_amount1=11032303), 
> Row(fdate=u'20170216', in_amount1=11986702), Row(fdate=u'20170209', 
> in_amount1=9082380), Row(fdate=u'20170214', in_amount1=8142569), 
> Row(fdate=u'20170307', in_amount1=11092829), Row(fdate=u'20170213', 
> in_amount1=12341887), Row(fdate=u'20170228', in_amount1=13966203), 
> Row(fdate=u'20170220', in_amount1=9397558), Row(fdate=u'20170210', 
> in_amount1=8205431), Row(fdate=u'20170215', in_amount1=7070829), 
> Row(fdate=u'20170301', in_amount1=10159653)]
> >>> print df2.collect()
> [Row(fdate=u'20170223', in_amount2=7072120), Row(fdate=u'20170302', 
> in_amount2=5548515), Row(fdate=u'20170207', in_amount2=5451110), 
> Row(fdate=u'20170208', in_amount2=4483131), Row(fdate=u'20170224', 
> in_amount2=9674888), Row(fdate=u'20170201', in_amount2=3227502), 
> Row(fdate=u'20170221', in_amount2=5084800), Row(fdate=u'20170303', 
> in_amount2=20577801), Row(fdate=u'20170202', in_amount2=4024218), 
> Row(fdate=u'20170306', in_amount2=8581773), Row(fdate=u'20170227', 
> in_amount2=5748035), Row(fdate=u'20170206', in_amount2=7330154), 
> Row(fdate=u'20170217', in_amount2=6838105), Row(fdate=u'20170203', 
> in_amount2=9390262), Row(fdate=u'20170222', in_amount2=3800662), 
> Row(fdate=u'20170216', in_amount2=4338891), Row(fdate=u'20170209', 
> in_amount2=4024611), Row(fdate=u'20170214', in_amount2=4030389), 
> Row(fdate=u'20170307', in_amount2=5504936), Row(fdate=u'20170213', 
> in_amount2=7142428), Row(fdate=u'20170228', in_amount2=8618951), 
> Row(fdate=u'20170220', in_amount2=8172290), Row(fdate=u'20170210', 
> in_amount2=8411312), Row(fdate=u'20170215', in_amount2=5302422), 
> Row(fdate=u'20170301', in_amount2=9475418)]
> >>> ht_net_in_df = df1.join(df2,df1.fdate == df2.fdate,'inner')
> 2017-03-08 10:27:34,357 WARN  [Thread-2] sql.Column: Constructing trivially 
> true equals predicate, 'fdate#42 = fdate#42'. Perhaps you need to use aliases.
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/home/spark/python/pyspark/sql/dataframe.py", line 652, in join
> jdf = self._jdf.join(other._jdf, on._jc, how)
>   File "/home/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", 
> line 933, in __call__
>   File "/home/spark/python/pyspark/sql/utils.py", line 69, in deco
> raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> pyspark.sql.utils.AnalysisException: u"
> Failure when resolving conflicting references in Join:
> 'Join Inner, (fdate#42 = fdate#42)
> :- Aggregate [fdate#42], [fdate#42, cast(sum(cast(inoutmoney#47 as double)) 
> as int) AS in_amount1#97]
> :  +- Filter (inorout#44 = A)
> : +- Project [firm_id#40, partnerid#45, inorout#44, inoutmoney#47, 
> fdate#42]
> :+- Filter (((partnerid#45 = pmec) && NOT (firm_id#40 = NULL)) && 
> (NOT (firm_id#40 = -1) && (fdate#42 >= 20170201)))
> :   +- SubqueryAlias history_transfer_v
> :  +- Project [md5(cast(firmid#41 as binary)) AS FIRM_ID#40, 
> fdate#42, ftime#43, inorout#44, partnerid#45, realdate#46, inoutmoney#47, 
> bankwaterid#48, waterid#49, waterstate#50, source#51]
> : +- SubqueryAlias history_transfer
> :+- 
> Relation[firmid#41,fdate#42,ftime#43,inorout#44,partnerid#45,realdate#46,inoutmoney#47,bankwaterid#48,waterid#49,waterstate#50,source#51]
>  parquet
> +- Aggregate [fdate#42], [fdate#42, cast(sum(cast(inoutmoney#47 as double)) 
> as int) AS in_amount2#145]
>+- Filter (inorout#44 = B)
>   +- Project [firm_id#40, partnerid#45, inorout#44, inoutmoney#47, 
> fdate#42]
>  

[jira] [Commented] (SPARK-21737) Create communication channel between arbitrary clients and the Spark AM in YARN mode

2017-10-10 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198674#comment-16198674
 ] 

Saisai Shao commented on SPARK-21737:
-

I was trying to understand how Spark communicates with Mesos, but my knowledge 
of Mesos is quite limited; so far I cannot figure out a way to address this 
problem on Mesos.

If we restrict this to a Spark-on-YARN problem, then it is not an issue any 
more, but my feeling is that focusing only on YARN makes this channel much less 
useful.

Another option is to build an interface for obtaining the endpoint address from 
the cluster manager, with only an on-YARN implementation initially. Later on we 
can support Standalone and Mesos if required. What do you think?

> Create communication channel between arbitrary clients and the Spark AM in 
> YARN mode
> 
>
> Key: SPARK-21737
> URL: https://issues.apache.org/jira/browse/SPARK-21737
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Jong Yoon Lee
>Priority: Minor
>
> In this JIRA, I develop code to create a communication channel between 
> arbitrary clients and a Spark AM on YARN. This code can be used to send 
> commands such as getting status, getting history info from the CLI, killing 
> the application, and pushing new tokens.
> Design Doc:
> https://docs.google.com/document/d/1QMbWhg13ocIoADywZQBRRVj-b9Zf8CnBrruP5JhcOOY/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19325) Running query hang-up 5min

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198668#comment-16198668
 ] 

Hyukjin Kwon commented on SPARK-19325:
--

[~Sephiroth-Lin], do you happen to have a reproducer for this? It looks like I 
cannot reproduce it.

> Running query hang-up 5min
> --
>
> Key: SPARK-19325
> URL: https://issues.apache.org/jira/browse/SPARK-19325
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
> Environment: OS: SUSE  12.1
> JDK: Oracle JDK 1.8.0_112
>Reporter: Weizhong
>Priority: Minor
>
> 1. While running a query, one stage hangs for 5 minutes (only about 1M of data is read)
> 2. The stage is:
> {noformat}
> PhysicalRDD(read from parquet file) --> Filter --> ConvertToUnsafe --> 
> BroadcastHashJoin --> TungstenProject --> BroadcastHashJoin --> 
> TungstenProject --> TungstenExchange
> {noformat}
> 3. When it hangs, we dump the jstack; details:
> {noformat}
> "Executor task launch worker-3" #147 daemon prio=5 os_prio=0 
> tid=0x7fb5481af000 nid=0x3a166 runnable [0x7fb4f18a6000]
>java.lang.Thread.State: RUNNABLE
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.fetchNext(HashJoin.scala:181)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:149)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.fetchNext(HashJoin.scala:181)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:149)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:75)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:42)
>   at org.apache.spark.scheduler.Task.run(Task.scala:90)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>Locked ownable synchronizers:
>   - <0x0002c590e7e0> (a 
> java.util.concurrent.ThreadPoolExecutor$Worker)
> "Executor task launch worker-2" #146 daemon prio=5 os_prio=0 
> tid=0x7fb548b15000 nid=0x3a137 runnable [0x7fb4f31e3000]
>java.lang.Thread.State: RUNNABLE
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.fetchNext(HashJoin.scala:181)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:149)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.fetchNext(HashJoin.scala:181)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:149)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:75)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:42)
>   at org.apache.spark.scheduler.Task.run(Task.scala:90)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>Locked ownable synchronizers:
>   - <0x0002c590fab0> (a 
> java.util.concurrent.ThreadPoolExecutor$Worker)
> "Executor task launch worker-1" #145 daemon prio=5 os_prio=0 
> tid=0x7fb54a4c6000 nid=0x3a12f runnable [0x7fb4f19a7000]
>java.lang.Thread.State: RUNNABLE
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.fetchNext(HashJoin.scala:181)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:149)
>   at 

[jira] [Commented] (SPARK-21737) Create communication channel between arbitrary clients and the Spark AM in YARN mode

2017-10-10 Thread Thomas Graves (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198665#comment-16198665
 ] 

Thomas Graves commented on SPARK-21737:
---

I haven't had time to think about it much since the PR discussion. My initial 
thought was that it's a generic class/API, but you still need resource-manager-specific 
parts. I was thinking it would initially only be supported on YARN until someone 
interested in the other cluster managers looked at it; meaning, for YARN you would 
still use the RM to register the endpoint. Without that you need some other service 
discovery mechanism, as you mention. I'm not a fan of saving this information to HDFS.

Have you looked to see whether Mesos has a similar service?


> Create communication channel between arbitrary clients and the Spark AM in 
> YARN mode
> 
>
> Key: SPARK-21737
> URL: https://issues.apache.org/jira/browse/SPARK-21737
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Jong Yoon Lee
>Priority: Minor
>
> In this JIRA, I develop code to create a communication channel between 
> arbitrary clients and a Spark AM on YARN. This code can be used to send 
> commands such as getting status, getting history info from the CLI, killing 
> the application, and pushing new tokens.
> Design Doc:
> https://docs.google.com/document/d/1QMbWhg13ocIoADywZQBRRVj-b9Zf8CnBrruP5JhcOOY/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19039) UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL

2017-10-10 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-19039:
-
Affects Version/s: 2.3.0

> UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL
> --
>
> Key: SPARK-19039
> URL: https://issues.apache.org/jira/browse/SPARK-19039
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.0, 2.3.0
>Reporter: Joseph K. Bradley
>
> When I try this:
> * Define UDF
> * Apply UDF to get Column
> * Use Column in a DataFrame
> I can find weird behavior in the spark-shell when using paste mode.
> To reproduce this, paste this into the spark-shell:
> {code}
> import org.apache.spark.sql.functions._
> val df = spark.createDataFrame(Seq(
>   ("hi", 1),
>   ("there", 2),
>   ("the", 3),
>   ("end", 4)
> )).toDF("a", "b")
> val myNumbers = Set(1,2,3)
> val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
> val rowHasMyNumber = tmpUDF($"b")
> df.where(rowHasMyNumber).show()
> {code}
> Stack trace for Spark 2.0 (similar for other versions):
> {code}
> org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
>   at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
>   at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>   at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
>   at 
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
>   at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.(:45)
>   at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.(:57)
>   at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.(:59)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw.(:61)
>   at 
> linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(:7)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(:6)
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>   - object not serializable (class: org.apache.spark.sql.Column, value: 
> UDF(b))
>   - field (class: 
> 

[jira] [Commented] (SPARK-19039) UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198655#comment-16198655
 ] 

Hyukjin Kwon commented on SPARK-19039:
--

Still happens in the master:

{code}
scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(
  ("hi", 1),
  ("there", 2),
  ("the", 3),
  ("end", 4)
)).toDF("a", "b")

val myNumbers = Set(1,2,3)
val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }

val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()

// Exiting paste mode, now interpreting.

17/10/10 22:13:56 WARN ObjectStore: Failed to get database global_temp, 
returning NoSuchObjectException
org.apache.spark.SparkException: Task not serializable
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:300)
  at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:290)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2288)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:842)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:841)
  at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:227)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:317)
  at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3105)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2396)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2396)
  at org.apache.spark.sql.Dataset$$anonfun$49.apply(Dataset.scala:3089)
  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3088)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2396)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2610)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:672)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:631)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:640)
  ... 53 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: 
UDF(b))
- field (class: $iw, name: rowHasMyNumber, type: class 
org.apache.spark.sql.Column)
- object (class $iw, $iw@6b351a54)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, )
- field (class: 
org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, 
type: interface scala.Function1)
- object (class 
org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, )
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, 
name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, 
UDF(input[1, int, false]))
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: 
references$1, type: class [Ljava.lang.Object;)
- object (class 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, )
  at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:297)
  ... 84 more
{code}

> UDF ClosureCleaner 

[jira] [Commented] (SPARK-8186) date/time function: date_add

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198651#comment-16198651
 ] 

Hyukjin Kwon commented on SPARK-8186:
-

I think that problem is tracked separately in SPARK-17174.

> date/time function: date_add
> 
>
> Key: SPARK-8186
> URL: https://issues.apache.org/jira/browse/SPARK-8186
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Adrian Wang
> Fix For: 1.5.0
>
>
> date_add(timestamp startdate, int days): timestamp
> date_add(timestamp startdate, interval i): timestamp
> date_add(date date, int days): date
> date_add(date date, interval i): date



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8186) date/time function: date_add

2017-10-10 Thread eugen yushin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198630#comment-16198630
 ] 

eugen yushin commented on SPARK-8186:
-

Unfortunately, the current implementation doesn't match the specification above: 
`date_add` always returns DateType (and truncates the time part of a timestamp 
value).
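
A quick illustration of that behavior (a sketch run against a plain Spark 
session; the literal is arbitrary):

{code}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# date_add applied to a timestamp: the result is DateType and the
# time-of-day part is dropped.
df = spark.sql("SELECT date_add(CAST('2017-10-10 12:34:56' AS TIMESTAMP), 1) AS d")
df.printSchema()
# root
#  |-- d: date (nullable = true)
df.show()
# +----------+
# |         d|
# +----------+
# |2017-10-11|
# +----------+
{code}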

> date/time function: date_add
> 
>
> Key: SPARK-8186
> URL: https://issues.apache.org/jira/browse/SPARK-8186
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Adrian Wang
> Fix For: 1.5.0
>
>
> date_add(timestamp startdate, int days): timestamp
> date_add(timestamp startdate, interval i): timestamp
> date_add(date date, int days): date
> date_add(date date, interval i): date



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18536) Failed to save to hive table when case class with empty field

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198613#comment-16198613
 ] 

Hyukjin Kwon commented on SPARK-18536:
--

The code:

{code}
import scala.collection.mutable.Queue

import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext


case class EmptyC()
case class EmptyCTable(dimensions: EmptyC, timebin: java.lang.Long)


val seq = Seq(EmptyCTable(EmptyC(), 100L))
val rdd = sc.makeRDD[EmptyCTable](seq)
val ssc = new StreamingContext(sc, Seconds(1))

val queue = Queue(rdd)
val s = ssc.queueStream(queue, false);
s.foreachRDD((rdd, time) => {
  if (!rdd.isEmpty) {
    rdd.toDF.write.mode(SaveMode.Overwrite).saveAsTable("empty_table")
  }
})

ssc.start()
ssc.awaitTermination()
{code}


now throws:


{code}
org.apache.spark.sql.AnalysisException: cannot resolve 'named_struct()' due to 
data type mismatch: input to function named_struct requires at least one 
argument;;
'SerializeFromObject [if (isnull(assertnotnull(assertnotnull(input[0, 
$line22.$read$$iw$$iw$EmptyCTable, true])).dimensions)) null else 
named_struct() AS dimensions#3, assertnotnull(assertnotnull(input[0, 
$line22.$read$$iw$$iw$EmptyCTable, true])).timebin.longValue AS timebin#4L]
+- ExternalRDD [obj#2]

at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:95)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:87)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
{code}

> Failed to save to hive table when case class with empty field
> -
>
> Key: SPARK-18536
> URL: https://issues.apache.org/jira/browse/SPARK-18536
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
>Reporter: pin_zhang
>
> {code}
> import scala.collection.mutable.Queue
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.StreamingContext
> {code}
> 1. Test code
> {code}
> case class EmptyC()
> case class EmptyCTable(dimensions: EmptyC, timebin: java.lang.Long)
> object EmptyTest {
>   def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("scala").setMaster("local[2]")
> val ctx = new SparkContext(conf)
> val spark = 
> SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
> val seq = Seq(EmptyCTable(EmptyC(), 100L))
> val rdd = ctx.makeRDD[EmptyCTable](seq)
> val ssc = new StreamingContext(ctx, Seconds(1))
> val queue = Queue(rdd)
> val s = ssc.queueStream(queue, false);
> s.foreachRDD((rdd, time) => {
>   if (!rdd.isEmpty) {
> import spark.sqlContext.implicits._
> rdd.toDF.write.mode(SaveMode.Overwrite).saveAsTable("empty_table")
>   }
> })
> ssc.start()
> ssc.awaitTermination()
>   }
> }
> {code}
> 2. Exception
> {noformat}
> Caused by: java.lang.IllegalStateException: Cannot build an empty group
>   at org.apache.parquet.Preconditions.checkState(Preconditions.java:91)
>   at org.apache.parquet.schema.Types$GroupBuilder.build(Types.java:554)
>   at org.apache.parquet.schema.Types$GroupBuilder.build(Types.java:426)
>   at org.apache.parquet.schema.Types$Builder.named(Types.java:228)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:527)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:321)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$$anonfun$convert$1.apply(ParquetSchemaConverter.scala:313)
>   at 
> 

[jira] [Assigned] (SPARK-20025) Driver fail over will not work, if SPARK_LOCAL* env is set.

2017-10-10 Thread Wenchen Fan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-20025:
---

Assignee: Prashant Sharma

> Driver fail over will not work, if SPARK_LOCAL* env is set.
> ---
>
> Key: SPARK-20025
> URL: https://issues.apache.org/jira/browse/SPARK-20025
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Prashant Sharma
>Assignee: Prashant Sharma
> Fix For: 2.3.0
>
>
> In a bare-metal system with no DNS setup, Spark may be configured with 
> SPARK_LOCAL* environment variables for IP and host properties.
> During a driver failover in cluster deployment mode, SPARK_LOCAL* should be 
> ignored while auto-deploying and should instead be picked up from the target 
> system's local environment.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20025) Driver fail over will not work, if SPARK_LOCAL* env is set.

2017-10-10 Thread Wenchen Fan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-20025.
-
   Resolution: Fixed
Fix Version/s: 2.3.0

> Driver fail over will not work, if SPARK_LOCAL* env is set.
> ---
>
> Key: SPARK-20025
> URL: https://issues.apache.org/jira/browse/SPARK-20025
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Prashant Sharma
>Assignee: Prashant Sharma
> Fix For: 2.3.0
>
>
> In a bare-metal system with no DNS setup, Spark may be configured with 
> SPARK_LOCAL* environment variables for IP and host properties.
> During a driver failover in cluster deployment mode, SPARK_LOCAL* should be 
> ignored while auto-deploying and should instead be picked up from the target 
> system's local environment.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21506) The description of "spark.executor.cores" may be not correct

2017-10-10 Thread Wenchen Fan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-21506:
---

Assignee: liuxian

> The description of "spark.executor.cores" may be not  correct
> -
>
> Key: SPARK-21506
> URL: https://issues.apache.org/jira/browse/SPARK-21506
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: liuxian
>Assignee: liuxian
>Priority: Trivial
> Fix For: 2.3.0
>
>
> The description for "spark.executor.cores" :"The number of cores assigned to 
> each executor is configurable. When this is not explicitly set, only one 
> executor per application will run the same worker "
> I think this is not correct, because if an application is not assigned 
> enough cores in the first `schedule()`, another executor may be launched on 
> the same worker in a later scheduling round.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21506) The description of "spark.executor.cores" may be not correct

2017-10-10 Thread Wenchen Fan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-21506.
-
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 18711
[https://github.com/apache/spark/pull/18711]

> The description of "spark.executor.cores" may be not  correct
> -
>
> Key: SPARK-21506
> URL: https://issues.apache.org/jira/browse/SPARK-21506
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.2.0
>Reporter: liuxian
>Priority: Trivial
> Fix For: 2.3.0
>
>
> The description for "spark.executor.cores" :"The number of cores assigned to 
> each executor is configurable. When this is not explicitly set, only one 
> executor per application will run the same worker "
> I think this is not correct, because if an application is not assigned 
> enough cores in the first `schedule()`, another executor may be launched on 
> the same worker in a later scheduling round.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18479) spark.sql.shuffle.partitions defaults should be a prime number

2017-10-10 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-18479.
--
Resolution: Won't Fix

I am resolving this, assuming there is no explicit objection to the comment above.

> spark.sql.shuffle.partitions defaults should be a prime number
> --
>
> Key: SPARK-18479
> URL: https://issues.apache.org/jira/browse/SPARK-18479
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Hamel Ajay Kothari
>Priority: Minor
>
> For most hash bucketing use cases it is my understanding that a prime value, 
> such as 199, would be a safer value than the existing value of 200. Using a 
> non-prime value makes the likelihood of collisions much higher when the hash 
> function isn't great.
> Consider the case where you've got a Timestamp or Long column with 
> millisecond times at midnight each day. With the default value for 
> spark.sql.shuffle.partitions, you'll end up with 120/200 partitions being 
> completely empty.
> Looking around there doesn't seem to be a good reason why we chose 200 so I 
> don't see a huge risk in changing it. 
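
Even with the default left at 200, the setting is easy to tune per application 
when the skew described above shows up; a minimal sketch:

{code}
from pyspark.sql import SparkSession

# Set the shuffle partition count for this application (199 chosen here as an
# example prime, per the suggestion above).
spark = (SparkSession.builder
         .config("spark.sql.shuffle.partitions", "199")
         .getOrCreate())

# It can also be changed at runtime for an existing session:
spark.conf.set("spark.sql.shuffle.partitions", "199")
{code}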



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21646) Add new type coercion rules to compatible with Hive

2017-10-10 Thread Yuming Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-21646:

Summary: Add new type coercion rules to compatible with Hive  (was: Add new 
type coercion to compatible with Hive)

> Add new type coercion rules to compatible with Hive
> ---
>
> Key: SPARK-21646
> URL: https://issues.apache.org/jira/browse/SPARK-21646
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>
> How to reproduce:
> hive:
> {code:sql}
> $ hive -S
> hive> create table spark_21646(c1 string, c2 string);
> hive> insert into spark_21646 values('92233720368547758071', 'a');
> hive> insert into spark_21646 values('21474836471', 'b');
> hive> insert into spark_21646 values('10', 'c');
> hive> select * from spark_21646 where c1 > 0;
> 92233720368547758071  a
> 10                      c
> 21474836471   b
> hive>
> {code}
> spark-sql:
> {code:sql}
> $ spark-sql -S
> spark-sql> select * from spark_21646 where c1 > 0;
> 10    c
> spark-sql> select * from spark_21646 where c1 > 0L;
> 21474836471    b
> 10    c
> spark-sql> explain select * from spark_21646 where c1 > 0;
> == Physical Plan ==
> *Project [c1#14, c2#15]
> +- *Filter (isnotnull(c1#14) && (cast(c1#14 as int) > 0))
>+- *FileScan parquet spark_21646[c1#14,c2#15] Batched: true, Format: 
> Parquet, Location: 
> InMemoryFileIndex[viewfs://cluster4/user/hive/warehouse/spark_21646], 
> PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: 
> struct
> spark-sql> 
> {code}
> As you can see, Spark automatically casts c1 to int; if the value is out of the 
> integer range, the result differs from Hive.
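
Until the coercion rules change, one way to keep Spark from narrowing the string 
column to int is to make the comparison type explicit; a rough sketch against the 
table from the example above (the decimal precision is just an assumption wide 
enough for the sample values, and Hive-identical results are not guaranteed):

{code}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Cast the string column to a wide decimal before comparing, so the large
# value 92233720368547758071 is not silently narrowed to int.
spark.sql("""
    SELECT *
    FROM spark_21646
    WHERE CAST(c1 AS DECIMAL(38, 0)) > 0
""").show()
{code}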



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22233) filter out empty InputSplit in HadoopRDD

2017-10-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198526#comment-16198526
 ] 

Apache Spark commented on SPARK-22233:
--

User 'liutang123' has created a pull request for this issue:
https://github.com/apache/spark/pull/19464

> filter out empty InputSplit in HadoopRDD
> 
>
> Key: SPARK-22233
> URL: https://issues.apache.org/jira/browse/SPARK-22233
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark version:Spark 2.2
> master: yarn
> deploy-mode: cluster
>Reporter: Lijia Liu
>
> Sometimes Hive will create an empty table backed by many empty files. Spark uses 
> the InputFormat stored in the Hive Metastore and does not combine the empty 
> files, and therefore generates many tasks just to handle these empty files.
> Hive uses CombineHiveInputFormat (hive.input.format) by default, so in this case 
> Spark spends much more resources than Hive.
> Two suggestions:
> 1. Add a configuration to filter out empty InputSplits in HadoopRDD.
> 2. Add a configuration so that users can customize the InputFormat class in 
> HadoopTableReader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22233) filter out empty InputSplit in HadoopRDD

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22233:


Assignee: Apache Spark

> filter out empty InputSplit in HadoopRDD
> 
>
> Key: SPARK-22233
> URL: https://issues.apache.org/jira/browse/SPARK-22233
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark version:Spark 2.2
> master: yarn
> deploy-mode: cluster
>Reporter: Lijia Liu
>Assignee: Apache Spark
>
> Sometimes Hive will create an empty table backed by many empty files. Spark uses 
> the InputFormat stored in the Hive Metastore and does not combine the empty 
> files, and therefore generates many tasks just to handle these empty files.
> Hive uses CombineHiveInputFormat (hive.input.format) by default, so in this case 
> Spark spends much more resources than Hive.
> Two suggestions:
> 1. Add a configuration to filter out empty InputSplits in HadoopRDD.
> 2. Add a configuration so that users can customize the InputFormat class in 
> HadoopTableReader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22233) filter out empty InputSplit in HadoopRDD

2017-10-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22233:


Assignee: (was: Apache Spark)

> filter out empty InputSplit in HadoopRDD
> 
>
> Key: SPARK-22233
> URL: https://issues.apache.org/jira/browse/SPARK-22233
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark version:Spark 2.2
> master: yarn
> deploy-mode: cluster
>Reporter: Lijia Liu
>
> Sometimes Hive will create an empty table backed by many empty files. Spark uses 
> the InputFormat stored in the Hive Metastore and does not combine the empty 
> files, and therefore generates many tasks just to handle these empty files.
> Hive uses CombineHiveInputFormat (hive.input.format) by default, so in this case 
> Spark spends much more resources than Hive.
> Two suggestions:
> 1. Add a configuration to filter out empty InputSplits in HadoopRDD.
> 2. Add a configuration so that users can customize the InputFormat class in 
> HadoopTableReader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21646) Add new type coercion to compatible with Hive

2017-10-10 Thread Yuming Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-21646:

Summary: Add new type coercion to compatible with Hive  (was: 
BinaryComparison shouldn't auto cast string to int/long)

> Add new type coercion to compatible with Hive
> -
>
> Key: SPARK-21646
> URL: https://issues.apache.org/jira/browse/SPARK-21646
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>
> How to reproduce:
> hive:
> {code:sql}
> $ hive -S
> hive> create table spark_21646(c1 string, c2 string);
> hive> insert into spark_21646 values('92233720368547758071', 'a');
> hive> insert into spark_21646 values('21474836471', 'b');
> hive> insert into spark_21646 values('10', 'c');
> hive> select * from spark_21646 where c1 > 0;
> 92233720368547758071  a
> 10c
> 21474836471   b
> hive>
> {code}
> spark-sql:
> {code:sql}
> $ spark-sql -S
> spark-sql> select * from spark_21646 where c1 > 0;
> 10  c 
>   
> spark-sql> select * from spark_21646 where c1 > 0L;
> 21474836471   b
> 10c
> spark-sql> explain select * from spark_21646 where c1 > 0;
> == Physical Plan ==
> *Project [c1#14, c2#15]
> +- *Filter (isnotnull(c1#14) && (cast(c1#14 as int) > 0))
>+- *FileScan parquet spark_21646[c1#14,c2#15] Batched: true, Format: 
> Parquet, Location: 
> InMemoryFileIndex[viewfs://cluster4/user/hive/warehouse/spark_21646], 
> PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: 
> struct
> spark-sql> 
> {code}
> As you can see, Spark automatically casts c1 to int; if the value is outside the 
> integer range, the result differs from Hive.
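
As a hedged workaround sketch, not the proposed fix: comparing against a wide 
decimal avoids the implicit string-to-int cast, so values beyond the int range are 
kept and the result matches the Hive output above.

{code:java}
// Cast c1 explicitly to a wide decimal so the comparison does not truncate
// values that overflow int.
val fixedDF = spark.sql(
  "SELECT * FROM spark_21646 WHERE CAST(c1 AS DECIMAL(38, 0)) > 0")
fixedDF.show(false)
{code}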



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22235) Can not kill job gracefully in spark standalone cluster

2017-10-10 Thread Mariusz Dubielecki (JIRA)
Mariusz Dubielecki created SPARK-22235:
--

 Summary: Can not kill job gracefully in spark standalone cluster
 Key: SPARK-22235
 URL: https://issues.apache.org/jira/browse/SPARK-22235
 Project: Spark
  Issue Type: Bug
  Components: Spark Submit
Affects Versions: 2.1.0
 Environment: Spark standalone cluster
Reporter: Mariusz Dubielecki


There is a problem with killing streaming jobs gracefully in Spark 2.1.0 with 
spark.streaming.stopGracefullyOnShutdown enabled. I have tested killing Spark jobs 
in several ways and reached some conclusions.

# With the command spark-submit --master spark:// --kill driver-id
   It kills the job almost immediately - not gracefully.
# With the REST API: curl -X POST http://localhost:6066/v1/submissions/kill/driverId
   Same as in 1. (I looked at the spark-submit code and it seems this tool just 
calls the REST endpoint.)
# With a Unix kill of the driver process
   It does not kill the job at all (the driver is immediately restarted).

Then I noticed that I had used the --supervise flag, so I repeated all these tests 
without it. Methods 1 and 2 behaved the same as before, but method 3 worked as I 
expected: after killing the driver process, Spark digests all remaining messages 
from Kafka and then shuts the job down gracefully. This is a workaround, but it is 
quite inconvenient, since I must track down the machine running the driver instead 
of using the simple Spark REST endpoint. The second downside is that I cannot use 
the --supervise flag, so whenever the node running the Spark driver fails, the job 
stops.

I also noticed that killing a streaming job with spark-submit does not mark the 
app as completed in the Spark history server.
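
For reference, a minimal sketch of the streaming setup described above (the app 
name, batch interval, and processing are assumptions): with 
stopGracefullyOnShutdown enabled, a plain kill of the driver process lets the job 
drain in-flight batches before stopping, as observed in test 3.

{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable graceful shutdown so a SIGTERM to the driver drains in-flight batches.
val conf = new SparkConf()
  .setAppName("graceful-shutdown-test")  // assumed app name
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(10))
// ... Kafka direct stream and processing would be defined here ...
ssc.start()
ssc.awaitTermination()
{code}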




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22225) wholeTextFilesIterators

2017-10-10 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198335#comment-16198335
 ] 

Hyukjin Kwon commented on SPARK-5:
--

Sure, Thanks [~sams].

> wholeTextFilesIterators
> ---
>
> Key: SPARK-5
> URL: https://issues.apache.org/jira/browse/SPARK-5
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: sam
>
> It is a very common use case to want to preserve a path -> file mapping in an 
> RDD, or to read an entire file in one go, especially for unstructured data and 
> ETL.
> Currently wholeTextFiles is the go-to method for this, but it reads the entire 
> file into memory, which is sometimes an issue (see SPARK-18965). It also 
> precludes the option to process files lazily.
> It would be nice to have a method with the following signature:
> {code}
> def wholeTextFilesIterators(
> path: String,
> minPartitions: Int = defaultMinPartitions,
> delimiter: String = "\n"): RDD[(String, Iterator[String])]
> {code}
> Here each `Iterator[String]` is a lazy file iterator in which each string is 
> delimited by the `delimiter` parameter.
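
A hedged sketch of one way to approximate the proposed signature today, built on 
binaryFiles (my own assumption, not necessarily the approaches referenced in the 
comments, and covering only the default newline delimiter); each PortableDataStream 
is opened lazily per file, and stream closing is omitted for brevity.

{code:java}
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Each file becomes (path, lazy line iterator) without loading whole files into memory.
def wholeTextFileIterators(sc: SparkContext, path: String): RDD[(String, Iterator[String])] =
  sc.binaryFiles(path).mapValues { stream =>
    Source.fromInputStream(stream.open(), "UTF-8").getLines()
  }
{code}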



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22225) wholeTextFilesIterators

2017-10-10 Thread sam (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198326#comment-16198326
 ] 

sam commented on SPARK-5:
-

Thanks [~srowen] and [~hyukjin.kwon], I wasn't aware of either of these 
approaches and indeed they suffice for the community's needs. This ticket will 
make it easy for others to find them by googling in the future.

> wholeTextFilesIterators
> ---
>
> Key: SPARK-5
> URL: https://issues.apache.org/jira/browse/SPARK-5
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: sam
>
> It is a very common use case to want to preserve a path -> file mapping in an 
> RDD, or to read an entire file in one go, especially for unstructured data and 
> ETL.
> Currently wholeTextFiles is the go-to method for this, but it reads the entire 
> file into memory, which is sometimes an issue (see SPARK-18965). It also 
> precludes the option to process files lazily.
> It would be nice to have a method with the following signature:
> {code}
> def wholeTextFilesIterators(
> path: String,
> minPartitions: Int = defaultMinPartitions,
> delimiter: String = "\n"): RDD[(String, Iterator[String])]
> {code}
> Here each `Iterator[String]` is a lazy file iterator in which each string is 
> delimited by the `delimiter` parameter.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


