[jira] [Commented] (SPARK-21737) Create communication channel between arbitrary clients and the Spark AM in YARN mode

2017-10-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198204#comment-16198204
 ] 

Saisai Shao commented on SPARK-21737:
-

Hi [~tgraves], I'm trying to understand the design of this. As discussed in the PR, 
we planned to switch to a generic client-to-driver communication channel 
instead of a client-to-AM one. But this raises a question: how does a client 
find the RPC endpoint?

In this PR, because it only targets YARN, it leverages the YARN API to report 
the AM RPC host/port to the RM, and a client can get the AM RPC endpoint 
address by querying the RM. But if we're going to build a generic 
client-to-driver channel, how do we figure out the driver RPC endpoint address?

If we restrict the scope to Spark on YARN, the same solution can be used to 
figure out the driver RPC address. But there seems to be no equivalent way to do 
this when running in Standalone/Mesos mode. I thought of different approaches, 
such as saving the driver RPC address to a file on HDFS and letting the client 
read it, but none of them are good enough.

So I'd like to hear your suggestions: do you have a better solution? 
Thanks in advance!
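
For what it's worth, here is a rough Scala sketch of the HDFS-based idea mentioned 
above, just to make the trade-off concrete (the paths and helper names are 
hypothetical, and this is not part of any PR):

{code:scala}
// Hypothetical sketch only: the driver publishes its RPC endpoint to a
// well-known HDFS location, and an external client reads it back.
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def publishDriverEndpoint(appId: String, host: String, port: Int): Unit = {
  val fs = FileSystem.get(new Configuration())
  val out = fs.create(new Path(s"/spark/endpoints/$appId"), true) // overwrite if present
  out.write(s"$host:$port".getBytes(StandardCharsets.UTF_8))
  out.close()
}

def lookupDriverEndpoint(appId: String): (String, Int) = {
  val fs = FileSystem.get(new Configuration())
  val in = fs.open(new Path(s"/spark/endpoints/$appId"))
  val Array(host, port) = scala.io.Source.fromInputStream(in).mkString.trim.split(":")
  in.close()
  (host, port.toInt)
}
{code}

As noted, this kind of approach needs a shared filesystem and cleanup on 
application exit, which is part of why none of these options feel good enough.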



 

> Create communication channel between arbitrary clients and the Spark AM in 
> YARN mode
> 
>
> Key: SPARK-21737
> URL: https://issues.apache.org/jira/browse/SPARK-21737
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.1
>Reporter: Jong Yoon Lee
>Priority: Minor
>
> In this JIRA, I develop code to create a communication channel between 
> arbitrary clients and a Spark AM on YARN. This code can be utilized to send 
> commands such as getting application status, getting history info from the CLI, 
> killing the application, and pushing new tokens.
> Design Doc:
> https://docs.google.com/document/d/1QMbWhg13ocIoADywZQBRRVj-b9Zf8CnBrruP5JhcOOY/edit?usp=sharing






[jira] [Comment Edited] (SPARK-22192) An RDD of nested POJO objects cannot be converted into a DataFrame using SQLContext.createDataFrame API

2017-10-09 Thread Asif Hussain Shahid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191526#comment-16191526
 ] 

Asif Hussain Shahid edited comment on SPARK-22192 at 10/10/17 5:46 AM:
---

The PR opened for this bug is [https://github.com/apache/spark/pull/19421/files]


was (Author: ahshahid):
The PR opened for this bug is [https://github.com/SnappyDataInc/spark/pull/83]

> An RDD of nested POJO objects cannot be converted into a DataFrame using 
> SQLContext.createDataFrame API
> ---
>
> Key: SPARK-22192
> URL: https://issues.apache.org/jira/browse/SPARK-22192
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: Independent of OS / platform
>Reporter: Asif Hussain Shahid
>Priority: Minor
>
> If an RDD contains nested POJO objects, the SQLContext.createDataFrame(RDD, 
> Class) API only handles the top-level POJO object. It throws a scala.MatchError 
> when handling the nested POJO object because the code does not recursively 
> handle the nested POJOs.
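
A minimal Scala reproduction sketch of the reported behavior; the bean classes 
below are hypothetical and exist only to illustrate a nested POJO, and a 
SparkSession named {{spark}} is assumed:

{code:scala}
import scala.beans.BeanProperty

// Hypothetical nested JavaBean-style classes.
class Address(@BeanProperty var city: String) extends Serializable {
  def this() = this(null)
}
class Person(@BeanProperty var name: String,
             @BeanProperty var address: Address) extends Serializable {
  def this() = this(null, null)
}

val rdd = spark.sparkContext.parallelize(
  Seq(new Person("a", new Address("x")), new Person("b", new Address("y"))))

// Per the report, only the top-level bean is handled in 2.2.0; the nested
// Address field hits a scala.MatchError instead of becoming a struct column.
val df = spark.createDataFrame(rdd, classOf[Person])
df.printSchema()
{code}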






[jira] [Created] (SPARK-22233) filter out empty InputSplit in HadoopRDD

2017-10-09 Thread Lijia Liu (JIRA)
Lijia Liu created SPARK-22233:
-

 Summary: filter out empty InputSplit in HadoopRDD
 Key: SPARK-22233
 URL: https://issues.apache.org/jira/browse/SPARK-22233
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.2.0
 Environment: spark version:Spark 2.2
master: yarn
deploy-mode: cluster

Reporter: Lijia Liu


Sometimes Hive creates an empty table backed by many empty files. Spark uses the 
InputFormat stored in the Hive metastore and does not combine the empty files, so 
it generates many tasks just to handle these empty files.
Hive uses CombineHiveInputFormat (hive.input.format) by default.
So, in this case, Spark spends much more resources than Hive.

Three suggestions (a user-side sketch of the first appears below this list):
1. Add a configuration to filter out empty InputSplits in HadoopRDD.
2. Add a configuration so users can customize the InputFormat class in 
HadoopTableReader.
3. Use the InputFormat class configured in the Hive configuration (hive.input.format).
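
As a rough illustration of suggestion 1, a user-side workaround today is to wrap 
the InputFormat and drop zero-length splits; the wrapper class below is 
hypothetical (not an existing Spark option) and assumes an existing SparkContext 
{{sc}}:

{code:scala}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{InputSplit, JobConf, TextInputFormat}

// Hypothetical wrapper that filters out empty InputSplits before Spark sees them.
class NonEmptySplitTextInputFormat extends TextInputFormat {
  override def getSplits(job: JobConf, numSplits: Int): Array[InputSplit] =
    super.getSplits(job, numSplits).filter(_.getLength > 0)
}

// Empty files no longer become tasks.
val rdd = sc.hadoopFile(
  "/warehouse/some_empty_table",
  classOf[NonEmptySplitTextInputFormat],
  classOf[LongWritable],
  classOf[Text])
{code}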






[jira] [Commented] (SPARK-18492) GeneratedIterator grows beyond 64 KB

2017-10-09 Thread Muhammad Ramzan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198192#comment-16198192
 ] 

Muhammad Ramzan commented on SPARK-18492:
-

I am using Spark 2.1.1 in a production environment and am getting the following 
error/warning:

2017-10-09 14:01:36 WARN  Utils:66 - Truncated the string representation of a 
plan since it was too large. This behavior can be adjusted by setting 
'spark.debug.maxToStringFields' in SparkEnv.conf.


2017-10-09 14:01:36 ERROR CodeGenerator:91 - failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method "processNext()V" of 
class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB
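
Not a fix for the code generator itself, but a mitigation sometimes used while the 
underlying issue is addressed, assuming the job can tolerate running without 
whole-stage codegen:

{code:scala}
// Possible mitigation sketch: disable whole-stage codegen so Spark does not emit
// the single huge GeneratedIterator.processNext() method. Note that Spark also
// falls back to interpreted execution when compilation fails, so the ERROR above
// is typically non-fatal, just noisy and slower.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("codegen-64kb-mitigation")
  .config("spark.sql.codegen.wholeStage", "false")
  // Raises the threshold behind the "Truncated the string representation" WARN.
  .config("spark.debug.maxToStringFields", "200")
  .getOrCreate()
{code}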

> GeneratedIterator grows beyond 64 KB
> 
>
> Key: SPARK-18492
> URL: https://issues.apache.org/jira/browse/SPARK-18492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
> Environment: CentOS release 6.7 (Final)
>Reporter: Norris Merritt
>
> spark-submit fails with ERROR CodeGenerator: failed to compile: 
> org.codehaus.janino.JaninoRuntimeException: Code of method 
> "(I[Lscala/collection/Iterator;)V" of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
> grows beyond 64 KB
> Error message is followed by a huge dump of generated source code.
> The generated code declares 1,454 field sequences like the following:
> /* 036 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1;
> /* 037 */   private scala.Function1 project_catalystConverter1;
> /* 038 */   private scala.Function1 project_converter1;
> /* 039 */   private scala.Function1 project_converter2;
> /* 040 */   private scala.Function2 project_udf1;
>   (many omitted lines) ...
> /* 6089 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1454;
> /* 6090 */   private scala.Function1 project_catalystConverter1454;
> /* 6091 */   private scala.Function1 project_converter1695;
> /* 6092 */   private scala.Function1 project_udf1454;
> It then proceeds to emit code for several methods (init, processNext) each of 
> which has totally repetitive sequences of statements pertaining to each of 
> the sequences of variables declared in the class.  For example:
> /* 6101 */   public void init(int index, scala.collection.Iterator inputs[]) {
> The reason the 64 KB JVM limit on a method's bytecode is exceeded is that the 
> code generator uses an incredibly naive strategy.  It emits a sequence like the 
> one shown below for each of the 1,454 groups of variables shown above, in the 
> init() method:
> /* 6132 */ this.project_udf = 
> (scala.Function1)project_scalaUDF.userDefinedFunc();
> /* 6133 */ this.project_scalaUDF1 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10];
> /* 6134 */ this.project_catalystConverter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType());
> /* 6135 */ this.project_converter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType());
> /* 6136 */ this.project_converter2 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType());
> It blows up after emitting 230 such sequences, while trying to emit the 231st:
> /* 7282 */ this.project_udf230 = 
> (scala.Function2)project_scalaUDF230.userDefinedFunc();
> /* 7283 */ this.project_scalaUDF231 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240];
> /* 7284 */ this.project_catalystConverter231 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType());
>   many omitted lines ...
>  Example of repetitive code sequences emitted for processNext method:
> /* 12253 */   boolean project_isNull247 = project_result244 == null;
> /* 12254 */   MapData project_value247 = null;
> /* 12255 */   if (!project_isNull247) {
> /* 12256 */ project_value247 = project_result244;
> /* 12257 */   }
> /* 12258 */   Object project_arg = sort_isNull5 ? null : 
> project_converter489.apply(sort_value5);
> /* 12259 */
> /* 12260 */   ArrayData project_result249 = null;
> /* 12261 */   try {
> /* 12262 */ project_result249 = 
> 

[jira] [Resolved] (SPARK-22225) wholeTextFilesIterators

2017-10-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-22225.
--
Resolution: Won't Fix

I think we are already able to do this with existing APIs, and I don't think it's 
worth adding. Let me resolve it for now (see the sketch below).
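
For example, a rough approximation of a lazy path -> line-iterator RDD with 
existing APIs, assuming UTF-8 text, an existing SparkContext {{sc}}, and that the 
caller fully consumes or closes each stream:

{code:scala}
import scala.io.{Codec, Source}

// PortableDataStream is opened lazily on the executor, so file contents are not
// materialized up front the way wholeTextFiles does.
val perFileLines = sc.binaryFiles("hdfs:///data/unstructured")
  .mapValues(pds => Source.fromInputStream(pds.open())(Codec.UTF8).getLines())
{code}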

> wholeTextFilesIterators
> ---
>
> Key: SPARK-22225
> URL: https://issues.apache.org/jira/browse/SPARK-22225
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: sam
>
> It is a very common use case to want to preserve a path -> file mapping in an 
> RDD, or to read an entire file in one go, especially for unstructured data and 
> ETL.
> Currently wholeTextFiles is the go-to method for this, but it reads the entire 
> file into memory, which is sometimes an issue (see SPARK-18965).  It also 
> precludes the option to lazily process files.
> It would be nice to have a method with the following signature:
> {code}
> def wholeTextFilesIterators(
> path: String,
> minPartitions: Int = defaultMinPartitions,
> delimiter: String = "\n"): RDD[(String, Iterator[String])]
> {code}
> Where each `Iterator[String]` is a lazy file iterator where each string is 
> delimited by the `delimiter` field.






[jira] [Resolved] (SPARK-22222) Fix the ARRAY_MAX in BufferHolder and add a test

2017-10-09 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-22222.
-
   Resolution: Fixed
 Assignee: Feng Liu
Fix Version/s: 2.3.0

> Fix the ARRAY_MAX in BufferHolder and add a test
> 
>
> Key: SPARK-22222
> URL: https://issues.apache.org/jira/browse/SPARK-22222
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Feng Liu
>Assignee: Feng Liu
> Fix For: 2.3.0
>
>
> This is actually a follow-up for SPARK-22033, which set the `ARRAY_MAX` to 
> `Int.MaxValue - 8`. That is not a valid number because it will cause the 
> following line to fail when such a large byte array is allocated: 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java#L86
>  We need to make sure the new length is a multiple of 8.
> We need to add one test for the fix. Note that the test should work 
> independently of the heap size of the test JVM. 
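
A rough sketch of the alignment constraint described above; the actual constant 
and growth logic live in BufferHolder/ByteArrayMethods, so the values here are 
illustrative assumptions only:

{code:scala}
// Cap the array length at a value that is itself a multiple of 8 and leaves room
// for the JVM array header, then always grow in 8-byte-aligned steps.
val ArrayMax: Int = Int.MaxValue - 15            // assumption: 2147483632, a multiple of 8

def alignedLength(neededBytes: Long): Int = {
  val aligned = ((neededBytes + 7) / 8) * 8      // round up to the next multiple of 8
  require(aligned <= ArrayMax, s"Cannot allocate $aligned bytes")
  aligned.toInt
}
{code}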






[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198146#comment-16198146
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

Btw, the capability to work on nested data types looks similar to the 
higher-order functions proposed at 
https://databricks.com/blog/2017/05/24/working-with-nested-data-using-higher-order-functions-in-sql-on-databricks.html.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithm, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema looks like this:
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into an RDD to work on the nested level of data, and then 
> reconstruct a new dataframe as a workaround. This is extremely inefficient 
> because optimizations like predicate pushdown in SQL cannot be performed, we 
> cannot leverage the columnar format, and the serialization and deserialization 
> cost becomes huge even if we just want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe, which takes a function from 
> *Column* to *Column* and applies it to the nested dataframe. Here is an 
> example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can 
> add a new function, *withColumn* on *Column*, to add or replace an existing 
> column that has the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar 

[jira] [Updated] (SPARK-8515) Improve ML attribute API

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-8515:
---
Attachment: (was: SPARK-8515.pdf)

> Improve ML attribute API
> 
>
> Key: SPARK-8515
> URL: https://issues.apache.org/jira/browse/SPARK-8515
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 1.4.0
>Reporter: Xiangrui Meng
>  Labels: advanced
> Attachments: SPARK-8515.pdf
>
>
> In 1.4.0, we introduced the ML attribute API to embed feature/label attribute 
> info inside the DataFrame's schema. However, the API is not very friendly to use. 
> We should revisit this API and see how we can improve it.






[jira] [Updated] (SPARK-8515) Improve ML attribute API

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-8515:
---
Attachment: SPARK-8515.pdf

> Improve ML attribute API
> 
>
> Key: SPARK-8515
> URL: https://issues.apache.org/jira/browse/SPARK-8515
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 1.4.0
>Reporter: Xiangrui Meng
>  Labels: advanced
> Attachments: SPARK-8515.pdf
>
>
> In 1.4.0, we introduced the ML attribute API to embed feature/label attribute 
> info inside the DataFrame's schema. However, the API is not very friendly to use. 
> We should revisit this API and see how we can improve it.






[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198096#comment-16198096
 ] 

Liang-Chi Hsieh edited comment on SPARK-22231 at 10/10/17 3:28 AM:
---

Looks like {{mapItems}} is an API that can work on any array-type column?

When will we detect possible errors in operations on the element? Will we do an 
analysis check, or will we only know at runtime? For example, when we do 
{{item.drop("b")}} but there is no such column in {{item}}, will we know it 
before running the job?


was (Author: viirya):
Looks like `mapItems` is an API can work on any Array type column?

When will we detect possible error of operations on the element? Will we do 
analysis check or we only know it during runtime? For example, when we do 
`item.drop("b")` but there is no such column in `item`, will we know it before 
running the job?

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into an RDD to work on the nested level of data, and then 
> reconstruct a new dataframe as a workaround. This is extremely inefficient 
> because optimizations like predicate pushdown in SQL cannot be performed, we 
> cannot leverage the columnar format, and the serialization and deserialization 
> cost becomes huge even if we just want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe, which takes a function from 
> *Column* to *Column* and applies it to the nested dataframe. Here is an 
> example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can 
> add a new function, *withColumn* on *Column*, to add or replace an existing 
> column that has the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198096#comment-16198096
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

Looks like `mapItems` is an API that can work on any array-type column?

When will we detect possible errors in operations on the element? Will we do an 
analysis check, or will we only know at runtime? For example, when we do 
`item.drop("b")` but there is no such column in `item`, will we know it before 
running the job?

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into an RDD to work on the nested level of data, and then 
> reconstruct a new dataframe as a workaround. This is extremely inefficient 
> because optimizations like predicate pushdown in SQL cannot be performed, we 
> cannot leverage the columnar format, and the serialization and deserialization 
> cost becomes huge even if we just want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe, which takes a function from 
> *Column* to *Column* and applies it to the nested dataframe. Here is an 
> example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can 
> add a new function, *withColumn* on *Column*, to add or replace an existing 
> column that has the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double 

[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198082#comment-16198082
 ] 

Liang-Chi Hsieh edited comment on SPARK-22231 at 10/10/17 3:18 AM:
---

I think there is a typo in the second example to add a new column:
{code}
val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}
{code}


was (Author: viirya):
I think there is a typo in the second example to add a new column:
{code:scala}
val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}
{code}

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into an RDD to work on the nested level of data, and then 
> reconstruct a new dataframe as a workaround. This is extremely inefficient 
> because optimizations like predicate pushdown in SQL cannot be performed, we 
> cannot leverage the columnar format, and the serialization and deserialization 
> cost becomes huge even if we just want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe, which takes a function from 
> *Column* to *Column* and applies it to the nested dataframe. Here is an 
> example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can 
> add a new function, *withColumn* on *Column*, to add or replace an existing 
> column that has the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198082#comment-16198082
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

I think there is a typo in the second example to add a new column:
{code:scala}
val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}
{code}

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into an RDD to work on the nested level of data, and then 
> reconstruct a new dataframe as a workaround. This is extremely inefficient 
> because optimizations like predicate pushdown in SQL cannot be performed, we 
> cannot leverage the columnar format, and the serialization and deserialization 
> cost becomes huge even if we just want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe, which takes a function from 
> *Column* to *Column* and applies it to the nested dataframe. Here is an 
> example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can 
> add a new function, *withColumn* on *Column*, to add or replace an existing 
> column that has the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // 

[jira] [Commented] (SPARK-22232) Row objects in pyspark using the `Row(**kwars)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198068#comment-16198068
 ] 

Bago Amirbekian commented on SPARK-22232:
-

Full trace:

{code:none}
[Row(a=u'a', c=3.0, b=2), Row(a=u'a', c=3.0, b=2)]


---
Py4JJavaError Traceback (most recent call last)
 in ()
 17 
 18 # If we introduce a shuffle we have issues
---> 19 print rdd.repartition(3).toDF(schema).take(2)

/databricks/spark/python/pyspark/sql/dataframe.pyc in take(self, num)
475 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
476 """
--> 477 return self.limit(num).collect()
478 
479 @since(1.3)

/databricks/spark/python/pyspark/sql/dataframe.pyc in collect(self)
437 """
438 with SCCallSiteSync(self._sc) as css:
--> 439 port = self._jdf.collectToPython()
440 return list(_load_from_socket(port, 
BatchedSerializer(PickleSerializer(
441 

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
__call__(self, *args)
   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135 for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o204.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 161.0 failed 4 times, most recent failure: Lost task 0.3 in stage 161.0 
(TID 433, 10.0.195.33, executor 0): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 177, in main
process()
  File "/databricks/spark/python/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 285, in 
dump_stream
vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/sql/session.py", line 520, in prepare
verify_func(obj, schema)
  File "/databricks/spark/python/pyspark/sql/types.py", line 1458, in 
_verify_type
_verify_type(v, f.dataType, f.nullable)
  File "/databricks/spark/python/pyspark/sql/types.py", line 1422, in 
_verify_type
raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, 
type(obj)))
TypeError: FloatType can not accept object 2 in type <type 'int'>

at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at 

[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwars)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because {{Row.__new__}} sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{code:none}
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  # Putting fields in alphabetical order masks the issue
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
{code}


  was:
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because {{Row.__new__}} sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{code:none}
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
{code}



> Row objects in pyspark using the `Row(**kwars)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should 
> be accessed by field name, not by position because {{Row.__new__}} sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> {code:none}
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   # Putting fields in alphabetical order masks the issue
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> {code}






[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwars)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because {{Row.__new__}} sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{code:python}
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
{code}


  was:
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because {{Row.__new__}} sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{{
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
}}



> Row objects in pyspark using the `Row(**kwars)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should 
> be accessed by field name, not by position because {{Row.__new__}} sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> {code:python}
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> {code}






[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwars)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because {{Row.__new__}} sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{code:none}
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
{code}


  was:
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because {{Row.__new__}} sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{code:python}
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
{code}



> Row objects in pyspark using the `Row(**kwars)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should 
> be accessed by field name, not by position because {{Row.__new__}} sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> {code:none}
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> {code}






[jira] [Commented] (SPARK-2243) Support multiple SparkContexts in the same JVM

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198062#comment-16198062
 ] 

ASF GitHub Bot commented on SPARK-2243:
---

GitHub user 561152 opened a pull request:

https://github.com/apache/incubator-predictionio/pull/441

 pio batchpredict error

 pio batchpredict --input /tmp/pio/batchpredict-input.json --output 
/tmp/pio/batchpredict-output.json

[WARN] [ALSModel] Product factor is not cached. Prediction could be slow.
Exception in thread "main" org.apache.spark.SparkException: Only one 
SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, 
set spark.driver.allowMultipleContexts = true. The currently running 
SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:76)

org.apache.predictionio.workflow.WorkflowContext$.apply(WorkflowContext.scala:45)
org.apache.predictionio.workflow.BatchPredict$.run(BatchPredict.scala:160)

org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1$$anonfun$apply$2.apply(BatchPredict.scala:121)

org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1$$anonfun$apply$2.apply(BatchPredict.scala:117)
scala.Option.map(Option.scala:146)

org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1.apply(BatchPredict.scala:117)

org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1.apply(BatchPredict.scala:115)
scala.Option.map(Option.scala:146)
org.apache.predictionio.workflow.BatchPredict$.main(BatchPredict.scala:115)
org.apache.predictionio.workflow.BatchPredict.main(BatchPredict.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)

org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at 
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2278)
at 
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2274)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2274)
at 
org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2353)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:85)
at 
org.apache.predictionio.workflow.WorkflowContext$.apply(WorkflowContext.scala:45)
at 
org.apache.predictionio.workflow.BatchPredict$.run(BatchPredict.scala:183)
at 
org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1$$anonfun$apply$2.apply(BatchPredict.scala:121)
at 
org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1$$anonfun$apply$2.apply(BatchPredict.scala:117)
at scala.Option.map(Option.scala:146)
at 
org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1.apply(BatchPredict.scala:117)
at 
org.apache.predictionio.workflow.BatchPredict$$anonfun$main$1.apply(BatchPredict.scala:115)
at scala.Option.map(Option.scala:146)
at 
org.apache.predictionio.workflow.BatchPredict$.main(BatchPredict.scala:115)
at 
org.apache.predictionio.workflow.BatchPredict.main(BatchPredict.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
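
For reference, the error text above already points at the two usual ways around this; a minimal 
sketch of both, assuming the driver code can be adjusted (option 1 is the safer one):

{code:java}
import org.apache.spark.{SparkConf, SparkContext}

// Option 1 (preferred): reuse the SparkContext that is already running in this JVM
// instead of constructing a second one.
val sc = SparkContext.getOrCreate()

// Option 2: opt in to multiple contexts, as the error message suggests.
// This silences the check, but multiple contexts per JVM are unsupported and fragile.
val conf = new SparkConf()
  .setAppName("pio-batchpredict")   // illustrative; normally set by pio/spark-submit
  .setMaster("local[*]")            // illustrative; normally supplied by spark-submit
  .set("spark.driver.allowMultipleContexts", "true")
val sc2 = new SparkContext(conf)
{code}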


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/incubator-predictionio develop

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-predictionio/pull/441.patch

To close this pull request, make a commit to your master/trunk 

[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwargs)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because {{Row.__new__}} sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{{
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
}}


  was:
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.



{{
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
}}



> Row objects in pyspark using the `Row(**kwargs)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should 
> be accessed by field name, not by position because {{Row.__new__}} sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> {{
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> }}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwargs)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.



{{
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
}}


  was:
bq. The fields in a Row object created from a dict (ie `Row(**kwargs)`) should 
be accessed by field name, not by position because `Row.__new__` sorts the 
fields alphabetically by name. It seems like this promise is not being honored 
when these Row objects are shuffled. I've included an example to help reproduce 
the issue.



{{
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
}}



> Row objects in pyspark using the `Row(**kwargs)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should 
> be accessed by field name, not by position because `Row.__new__` sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> {{
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> }}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwargs)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.



{{
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
}}


  was:
The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.



{{from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)}}



> Row objects in pyspark using the `Row(**kwargs)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
> accessed by field name, not by position because `Row.__new__` sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> {{
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> }}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwargs)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.



{{from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)}}


  was:
The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.



```
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
```


> Row objects in pyspark using the `Row(**kwargs)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
> accessed by field name, not by position because `Row.__new__` sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> {{from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22232) Row objects in pyspark using the `Row(**kwargs)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)
Bago Amirbekian created SPARK-22232:
---

 Summary: Row objects in pyspark using the `Row(**kwargs)` syntax do 
not get serialized/deserialized properly
 Key: SPARK-22232
 URL: https://issues.apache.org/jira/browse/SPARK-22232
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.2.0
Reporter: Bago Amirbekian


The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.


```
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
```
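
A possible workaround sketch (not a fix), assuming the same `sc` and `toRow` as above: declare the 
schema fields in alphabetical order, so that positional and by-name access agree and the shuffled 
case then matches the unshuffled one.

```
# Workaround sketch: schema fields listed alphabetically, matching the order
# Row(**kwargs) stores them in.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

schema_sorted = StructType([
  StructField("a", StringType(),  False),
  StructField("b", IntegerType(), False),
  StructField("c", FloatType(),   False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# With the alphabetical schema, the shuffled case agrees with the unshuffled one.
print rdd.repartition(3).toDF(schema_sorted).take(2)
```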



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22232) Row objects in pyspark using the `Row(**kwargs)` syntax do not get serialized/deserialized properly

2017-10-09 Thread Bago Amirbekian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bago Amirbekian updated SPARK-22232:

Description: 
The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.



```
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
```

  was:
The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
accessed by field name, not by position because `Row.__new__` sorts the fields 
alphabetically by name. It seems like this promise is not being honored when 
these Row objects are shuffled. I've included an example to help reproduce the 
issue.


```
from pyspark.sql.types import *
from pyspark.sql import *

def toRow(i):
  return Row(a="a", c=3.0, b=2)

schema = StructType([
  StructField("a", StringType(),  False),
  StructField("c", FloatType(), False),
  StructField("b", IntegerType(), False),
])

rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))

# As long as we don't shuffle things work fine.
print rdd.toDF(schema).take(2)

# If we introduce a shuffle we have issues
print rdd.repartition(3).toDF(schema).take(2)
```


> Row objects in pyspark using the `Row(**kwargs)` syntax do not get 
> serialized/deserialized properly
> --
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Bago Amirbekian
>
> The fields in a Row object created from a dict (ie `Row(**kwargs)`) should be 
> accessed by field name, not by position because `Row.__new__` sorts the 
> fields alphabetically by name. It seems like this promise is not being 
> honored when these Row objects are shuffled. I've included an example to help 
> reproduce the issue.
> ```
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
>   return Row(a="a", c=3.0, b=2)
> schema = StructType([
>   StructField("a", StringType(),  False),
>   StructField("c", FloatType(), False),
>   StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> ```



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22159) spark.sql.execution.arrow.enable and spark.sql.codegen.aggregate.map.twolevel.enable -> enabled

2017-10-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198052#comment-16198052
 ] 

Apache Spark commented on SPARK-22159:
--

User 'ueshin' has created a pull request for this issue:
https://github.com/apache/spark/pull/19462

> spark.sql.execution.arrow.enable and 
> spark.sql.codegen.aggregate.map.twolevel.enable -> enabled
> ---
>
> Key: SPARK-22159
> URL: https://issues.apache.org/jira/browse/SPARK-22159
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Fix For: 2.3.0
>
>
> We should make the config names consistent. They are supposed to end with 
> ".enabled", rather than "enable".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread DB Tsai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-22231:

Description: 
At Netflix's algorithm team, we work on ranking problems to find great content 
that fulfills the unique tastes of our members. Before building recommendation 
algorithms, we need to prepare the training, testing, and validation datasets in 
Apache Spark. Due to the nature of ranking problems, we have a nested list of 
items to be ranked in one column, while the top-level columns describe the 
context in which a model is to be used (e.g. profile, country, time, device, 
etc.). Here is a blog post describing the details: [Distributed Time Travel for 
Feature 
Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].

To be more concrete, for ranking videos for a given profile_id in a given 
country, our data schema looks like this:
{code:java}
root
 |-- profile_id: long (nullable = true)
 |-- country_iso_code: string (nullable = true)
 |-- items: array (nullable = false)
 ||-- element: struct (containsNull = false)
 |||-- title_id: integer (nullable = true)
 |||-- scores: double (nullable = true)
...
{code}

We often need to work on this nested list of structs by applying functions to 
it, and sometimes we need to drop or add columns inside it. Currently there is 
no easy way in open-source Apache Spark to perform those operations with SQL 
primitives; many people convert the data into an RDD, work on the nested level 
there, and then reconstruct a new dataframe as a workaround (a sketch of that 
round trip follows below). This is extremely inefficient because optimizations 
such as predicate pushdown cannot be performed, we cannot leverage the columnar 
format, and the serialization and deserialization cost becomes huge even when we 
only want to add a single column at the nested level.
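
(A minimal sketch of that RDD round trip, assuming an existing DataFrame {{df}} with 
{{profile_id: long}} and {{items: array<double>}} columns and a SparkSession named {{spark}}; it 
only illustrates the deserialize-and-rebuild cost, not a recommended pattern.)
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val nestedSchema = StructType(Seq(
  StructField("profile_id", LongType),
  StructField("items", ArrayType(DoubleType))))

// Every row is deserialized onto the JVM heap, the nested list is rewritten,
// and a brand-new DataFrame is rebuilt from the resulting RDD.
val patched = spark.createDataFrame(
  df.rdd.map { row =>
    val items = row.getSeq[Double](row.fieldIndex("items"))
    Row(row.getLong(row.fieldIndex("profile_id")), items.map(_ * 2.0))
  },
  nestedSchema)
{code}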

We built a solution internally at Netflix which we're very happy with. We plan 
to open source it in upstream Spark, and we would like to socialize the API 
design to see if we have missed any use cases.

The first API we added is *mapItems* on DataFrame, which takes a function from 
*Column* to *Column* and applies it to the nested dataframe. Here is an 
example:
{code:java}
case class Data(foo: Int, bar: Double, items: Seq[Double])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
  Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
))

val result = df.mapItems("items") {
  item => item * 2.0
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: double (containsNull = true)

result.show()
// +---+++
// |foo| bar|   items|
// +---+++
// | 10|10.0|[20.2, 20.4, 20.6...|
// | 20|20.0|[40.2, 40.4, 40.6...|
// +---+++
{code}

Now, with the ability to apply a function inside the nested dataframe, we can 
add a new method, *withColumn* on *Column*, to add a column or replace an 
existing column of the same name in the nested list of structs. Here are two 
examples demonstrating the API together with *mapItems*; the first one replaces 
an existing column:
{code:java}
case class Item(a: Int, b: Double)

case class Data(foo: Int, bar: Double, items: Seq[Item])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "b")
}

result.printSchema
root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: struct (containsNull = true)
// |||-- a: integer (nullable = true)
// |||-- b: double (nullable = true)

result.show(false)
// +---++--+
// |foo|bar |items |
// +---++--+
// |10 |10.0|[[10,11.0], [11,12.0]]|
// |20 |20.0|[[20,21.0], [21,22.0]]|
// +---++--+
{code}
and the second one adds a new column in the nested dataframe.
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}

result.printSchema
root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: struct (containsNull = true)
// |||-- a: integer (nullable = true)
// |||-- b: double (nullable = true)
// |||-- c: double (nullable = true)

result.show(false)

[jira] [Updated] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread DB Tsai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-22231:

Description: 
At Netflix's algorithm team, we work on ranking problems to find great content 
that fulfills the unique tastes of our members. Before building recommendation 
algorithms, we need to prepare the training, testing, and validation datasets in 
Apache Spark. Due to the nature of ranking problems, we have a nested list of 
items to be ranked in one column, while the top-level columns describe the 
context in which a model is to be used (e.g. profile, country, time, device, 
etc.). Here is a blog post describing the details: [Distributed Time Travel for 
Feature 
Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].

To be more concrete, for ranking videos for a given profile_id in a given 
country, our data schema looks like this:
{code:java}
root
 |-- profile_id: long (nullable = true)
 |-- country_iso_code: string (nullable = true)
 |-- items: array (nullable = false)
 ||-- element: struct (containsNull = false)
 |||-- title_id: integer (nullable = true)
 |||-- scores: double (nullable = true)
...
{code}

We often need to work on this nested list of structs by applying functions to 
it, and sometimes we need to drop or add columns inside it. Currently there is 
no easy way in open-source Apache Spark to perform those operations with SQL 
primitives; many people convert the data into an RDD, work on the nested level 
there, and then reconstruct a new dataframe as a workaround. This is extremely 
inefficient because optimizations such as predicate pushdown cannot be 
performed, we cannot leverage the columnar format, and the serialization and 
deserialization cost becomes huge even when we only want to add a single column 
at the nested level.

We built a solution internally at Netflix which we're very happy with. We plan 
to open source it in upstream Spark, and we would like to socialize the API 
design to see if we have missed any use cases.

The first API we added is *mapItems* on DataFrame, which takes a function from 
*Column* to *Column* and applies it to the nested dataframe. Here is an 
example:
{code:java}
case class Data(foo: Int, bar: Double, items: Seq[Double])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
  Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
))

val result = df.mapItems("items") {
  item => item * 2.0
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: double (containsNull = true)

result.show()
// +---+++
// |foo| bar|   items|
// +---+++
// | 10|10.0|[20.2, 20.4, 20.6...|
// | 20|20.0|[40.2, 40.4, 40.6...|
// +---+++
{code}

Now, with the ability to apply a function inside the nested dataframe, we can 
add a new method, *withColumn* on *Column*, to add a column or replace an 
existing column of the same name in the nested list of structs. Here are two 
examples demonstrating the API together with *mapItems*; the first one replaces 
an existing column:
{code:java}
case class Item(a: Int, b: Double)

case class Data(foo: Int, bar: Double, items: Seq[Item])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "b")
}

result.printSchema
root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: struct (containsNull = true)
// |||-- a: integer (nullable = true)
// |||-- b: double (nullable = true)

result.show(false)
// +---++--+
// |foo|bar |items |
// +---++--+
// |10 |10.0|[[10,11.0], [11,12.0]]|
// |20 |20.0|[[20,21.0], [21,22.0]]|
// +---++--+
{code}
and the second one adds a new column in the nested dataframe.
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}

result.printSchema
root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: struct (containsNull = true)
// |||-- a: integer (nullable = true)
// |||-- b: double (nullable = true)
// |||-- c: double (nullable = true)


[jira] [Assigned] (SPARK-22230) agg(last('attr)) gives weird results for streaming

2017-10-09 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu reassigned SPARK-22230:


Assignee: Jose Torres

> agg(last('attr)) gives weird results for streaming
> --
>
> Key: SPARK-22230
> URL: https://issues.apache.org/jira/browse/SPARK-22230
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Minor
> Fix For: 2.3.0
>
>
> In stream aggregation, last('attr) yields the last value from the first 
> microbatch forever. I'm not sure if it's fair to call this a correctness bug, 
> since last doesn't have strong correctness semantics, but ignoring all rows 
> past the first microbatch is at least weird.
> Simple repro in StreamingAggregationSuite:
> val input = MemoryStream[Int]
> val aggregated = input.toDF().agg(last('value))
> testStream(aggregated, OutputMode.Complete())(
>   AddData(input, 1, 2, 3),
>   CheckLastBatch(3),
>   AddData(input, 4, 5, 6),
>   CheckLastBatch(6) // actually yields 3 again



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22230) agg(last('attr)) gives weird results for streaming

2017-10-09 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu resolved SPARK-22230.
--
Resolution: Fixed

> agg(last('attr)) gives weird results for streaming
> --
>
> Key: SPARK-22230
> URL: https://issues.apache.org/jira/browse/SPARK-22230
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
> Fix For: 2.3.0
>
>
> In stream aggregation, last('attr) yields the last value from the first 
> microbatch forever. I'm not sure if it's fair to call this a correctness bug, 
> since last doesn't have strong correctness semantics, but ignoring all rows 
> past the first microbatch is at least weird.
> Simple repro in StreamingAggregationSuite:
> val input = MemoryStream[Int]
> val aggregated = input.toDF().agg(last('value))
> testStream(aggregated, OutputMode.Complete())(
>   AddData(input, 1, 2, 3),
>   CheckLastBatch(3),
>   AddData(input, 4, 5, 6),
>   CheckLastBatch(6) // actually yields 3 again



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22220) Spark SQL: LATERAL VIEW OUTER null pointer exception with GROUP BY

2017-10-09 Thread Dian Fay (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197899#comment-16197899
 ] 

Dian Fay edited comment on SPARK-22220 at 10/9/17 11:55 PM:


I did find a slightly more detailed mention in the Zeppelin logs, but it's not 
a stack trace:

{code}
WARN [2017-10-09 23:28:39,445] ({pool-2-thread-32} 
NotebookServer.java[afterStatusChange]:2058) - Job 20171006-205907_443943867 is 
finished, status: ERROR, exception: null, result: %text 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 549.0 failed 4 times, most recent failure: Lost task 0.3 in stage 549.0 
(TID 28218, $hostname, executor 1): java.lang.NullPointerException
{code}


was (Author: dmfay):
I did find a slightly more detailed mention in the Zeppelin logs, but it's not 
a stack trace:

WARN [2017-10-09 23:28:39,445] ({pool-2-thread-32} 
NotebookServer.java[afterStatusChange]:2058) - Job 20171006-205907_443943867 is 
finished, status: ERROR, exception: null, result: %text 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 549.0 failed 4 times, most recent failure: Lost task 0.3 in stage 549.0 
(TID 28218, wsdemo-dash03.dequecloud.com, executor 1): 
java.lang.NullPointerException

> Spark SQL: LATERAL VIEW OUTER null pointer exception with GROUP BY
> --
>
> Key: SPARK-22220
> URL: https://issues.apache.org/jira/browse/SPARK-22220
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
> Environment: We have Zeppelin using Spark and Livy (error is 
> reproducible without Livy) on an Ambari cluster.
>Reporter: Dian Fay
>
> Given a DataFrame having the fields name (a string) and tags (an array of 
> strings), the following Spark SQL query fails with a NullPointerException:
> {code}
> SELECT name, tag, COUNT(*)
> FROM records
> LATERAL VIEW OUTER explode(tags) AS tag
> GROUP BY name, tag
> {code}
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 137.0 failed 4 times, most recent failure: Lost task 0.3 in stage 137.0 
> (TID 9109, $hostname, executor 1): java.lang.NullPointerException
> {code}
> The query is successful without the "outer", but obviously this excludes rows 
> with empty tags arrays. A version with outer but without aggregation also 
> succeeds, making it possible to work around this issue with a subquery:
> {code}
> SELECT name, tag
> FROM records
> LATERAL VIEW OUTER explode(tags) AS tag
> {code}
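
For completeness, the subquery workaround mentioned above can be written as a single statement; a 
sketch, assuming the same records table:

{code}
SELECT name, tag, COUNT(*)
FROM (
  SELECT name, tag
  FROM records
  LATERAL VIEW OUTER explode(tags) AS tag
) exploded
GROUP BY name, tag
{code}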



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22220) Spark SQL: LATERAL VIEW OUTER null pointer exception with GROUP BY

2017-10-09 Thread Dian Fay (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197899#comment-16197899
 ] 

Dian Fay commented on SPARK-22220:
--

I did find a slightly more detailed mention in the Zeppelin logs, but it's not 
a stack trace:

WARN [2017-10-09 23:28:39,445] ({pool-2-thread-32} 
NotebookServer.java[afterStatusChange]:2058) - Job 20171006-205907_443943867 is 
finished, status: ERROR, exception: null, result: %text 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 549.0 failed 4 times, most recent failure: Lost task 0.3 in stage 549.0 
(TID 28218, wsdemo-dash03.dequecloud.com, executor 1): 
java.lang.NullPointerException

> Spark SQL: LATERAL VIEW OUTER null pointer exception with GROUP BY
> --
>
> Key: SPARK-22220
> URL: https://issues.apache.org/jira/browse/SPARK-22220
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
> Environment: We have Zeppelin using Spark and Livy (error is 
> reproducible without Livy) on an Ambari cluster.
>Reporter: Dian Fay
>
> Given a DataFrame having the fields name (a string) and tags (an array of 
> strings), the following Spark SQL query fails with a NullPointerException:
> {code}
> SELECT name, tag, COUNT(*)
> FROM records
> LATERAL VIEW OUTER explode(tags) AS tag
> GROUP BY name, tag
> {code}
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 137.0 failed 4 times, most recent failure: Lost task 0.3 in stage 137.0 
> (TID 9109, $hostname, executor 1): java.lang.NullPointerException
> {code}
> The query is successful without the "outer", but obviously this excludes rows 
> with empty tags arrays. A version with outer but without aggregation also 
> succeeds, making it possible to work around this issue with a subquery:
> {code}
> SELECT name, tag
> FROM records
> LATERAL VIEW OUTER explode(tags) AS tag
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22220) Spark SQL: LATERAL VIEW OUTER null pointer exception with GROUP BY

2017-10-09 Thread Dian Fay (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197883#comment-16197883
 ] 

Dian Fay commented on SPARK-22220:
--

Your example does work and mine still fails, but I'm having trouble actually 
finding the stack trace -- I've looked at every log I can think of and I'm not 
seeing anything.

We are using Hortonworks' Spark-HBase connector, but I thought the DataFrame 
would abstract that away. I'll raise this over on their issue tracker and see 
if they have anything to add.

> Spark SQL: LATERAL VIEW OUTER null pointer exception with GROUP BY
> --
>
> Key: SPARK-22220
> URL: https://issues.apache.org/jira/browse/SPARK-22220
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
> Environment: We have Zeppelin using Spark and Livy (error is 
> reproducible without Livy) on an Ambari cluster.
>Reporter: Dian Fay
>
> Given a DataFrame having the fields name (a string) and tags (an array of 
> strings), the following Spark SQL query fails with a NullPointerException:
> {code}
> SELECT name, tag, COUNT(*)
> FROM records
> LATERAL VIEW OUTER explode(tags) AS tag
> GROUP BY name, tag
> {code}
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 137.0 failed 4 times, most recent failure: Lost task 0.3 in stage 137.0 
> (TID 9109, $hostname, executor 1): java.lang.NullPointerException
> {code}
> The query is successful without the "outer", but obviously this excludes rows 
> with empty tags arrays. A version with outer but without aggregation also 
> succeeds, making it possible to work around this issue with a subquery:
> {code}
> SELECT name, tag
> FROM records
> LATERAL VIEW OUTER explode(tags) AS tag
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22230) agg(last('attr)) gives weird results for streaming

2017-10-09 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-22230:
-
Fix Version/s: 2.3.0

> agg(last('attr)) gives weird results for streaming
> --
>
> Key: SPARK-22230
> URL: https://issues.apache.org/jira/browse/SPARK-22230
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
> Fix For: 2.3.0
>
>
> In stream aggregation, last('attr) yields the last value from the first 
> microbatch forever. I'm not sure if it's fair to call this a correctness bug, 
> since last doesn't have strong correctness semantics, but ignoring all rows 
> past the first microbatch is at least weird.
> Simple repro in StreamingAggregationSuite:
> val input = MemoryStream[Int]
> val aggregated = input.toDF().agg(last('value))
> testStream(aggregated, OutputMode.Complete())(
>   AddData(input, 1, 2, 3),
>   CheckLastBatch(3),
>   AddData(input, 4, 5, 6),
>   CheckLastBatch(6) // actually yields 3 again



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread DB Tsai (JIRA)
DB Tsai created SPARK-22231:
---

 Summary: Support of map, filter, withColumn, dropColumn in nested 
list of structures
 Key: SPARK-22231
 URL: https://issues.apache.org/jira/browse/SPARK-22231
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.2.0
Reporter: DB Tsai


At Netflix's algorithm team, we work on ranking problems to find great content 
that fulfills the unique tastes of our members. Before building recommendation 
algorithms, we need to prepare the training, testing, and validation datasets in 
Apache Spark. Due to the nature of ranking problems, we have a nested list of 
items to be ranked in one column, while the top-level columns describe the 
context in which a model is to be used (e.g. profile, country, time, device, 
etc.). Here is a blog post describing the details: [Distributed Time Travel for 
Feature 
Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].

To be more concrete, for ranking videos for a given profile_id in a given 
country, our data schema looks like this:
{code:java}
root
 |-- profile_id: long (nullable = true)
 |-- country_iso_code: string (nullable = true)
 |-- items: array (nullable = false)
 ||-- element: struct (containsNull = false)
 |||-- title_id: integer (nullable = true)
 |||-- scores: double (nullable = true)
...
{code}

We often need to work on the nested level by applying functions to it, and 
sometimes to drop or add columns at that level. Currently there is no easy way 
in open-source Apache Spark to perform those operations with SQL primitives; 
many people we talked to just convert the data into an RDD, work on the nested 
level there, and then reconstruct a new dataframe. This is extremely inefficient 
because optimizations such as predicate pushdown cannot be performed, we cannot 
leverage the columnar format, and the serialization and deserialization cost 
becomes huge even when we only want to add a single column at the nested level.

We built a solution internally at Netflix which we're very happy with. We plan 
to open source it in upstream Spark, and we would like to socialize the API 
design to see if we have missed any use cases.

The first API we added is *mapItems* on DataFrame, which takes a function from 
*Column* to *Column* and applies it to the nested dataframe. Here is an 
example:
{code:java}
case class Data(foo: Int, bar: Double, items: Seq[Double])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
  Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
))

val result = df.mapItems("items") {
  item => item * 2.0
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: double (containsNull = true)

result.show()
// +---+++
// |foo| bar|   items|
// +---+++
// | 10|10.0|[20.2, 20.4, 20.6...|
// | 20|20.0|[40.2, 40.4, 40.6...|
// +---+++
{code}

Now, with the ability to apply a function inside the nested dataframe, we can 
add a new method, *withColumn* on *Column*, to add a column or replace an 
existing column of the same name in the nested list of structs. Here are two 
examples demonstrating the API together with *mapItems*; the first one replaces 
an existing column:
{code:java}
case class Item(a: Int, b: Double)

case class Data(foo: Int, bar: Double, items: Seq[Item])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "b")
}

result.printSchema
root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: struct (containsNull = true)
// |||-- a: integer (nullable = true)
// |||-- b: double (nullable = true)

result.show(false)
// +---++--+
// |foo|bar |items |
// +---++--+
// |10 |10.0|[[10,11.0], [11,12.0]]|
// |20 |20.0|[[20,21.0], [21,22.0]]|
// +---++--+
{code}
and the second one adds a new column in the nested dataframe.
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}

result.printSchema
root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// ||-- element: 

[jira] [Resolved] (SPARK-22170) Broadcast join holds an extra copy of rows in driver memory

2017-10-09 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-22170.
-
   Resolution: Fixed
 Assignee: Ryan Blue
Fix Version/s: 2.3.0

> Broadcast join holds an extra copy of rows in driver memory
> ---
>
> Key: SPARK-22170
> URL: https://issues.apache.org/jira/browse/SPARK-22170
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.1, 2.2.0
>Reporter: Ryan Blue
>Assignee: Ryan Blue
> Fix For: 2.3.0
>
>
> Using a memory profiler, I investigated a driver OOM that occurred while 
> building a large broadcast table, and found that a huge amount of memory is 
> used during the build. This is because [BroadcastExchangeExec uses 
> {{executeCollect}}|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L76].
>  In {{executeCollect}}, all of the partitions are fetched as compressed 
> blocks, then each block is decompressed (with a stream), and each row is 
> copied to a new byte buffer and added to an ArrayBuffer, which is copied to 
> an Array. This results in a huge amount of allocation: a buffer for each row 
> in the broadcast. Those rows are only used to get copied into a 
> {{BytesToBytesMap}} that will be broadcasted, so there is no need to keep 
> them in memory.
> Replacing the array buffer step with an iterator reduces the amount of memory 
> held while creating the map by not requiring all rows to be in memory. It 
> also avoids allocating a large Array for the rows. In practice, a 16MB 
> broadcast table used 100MB less memory with this approach, but the reduction 
> depends on the size of rows and compression (16MB was in Parquet format).
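
(A toy sketch of the "stream rows into the map" pattern the description refers to, not the actual 
Spark internals; {{RowMap}} here is only a stand-in for the real {{BytesToBytesMap}}.)
{code:java}
object BroadcastBuildSketch {
  import scala.collection.mutable

  type RowMap = mutable.LongMap[Array[Byte]]

  // "Before": decode every block, hold every row in one big Array,
  // then copy the rows into the map (all rows live twice at once).
  def buildEager(blocks: Iterator[Array[Byte]],
                 decode: Array[Byte] => Iterator[(Long, Array[Byte])]): RowMap = {
    val allRows = blocks.flatMap(decode).toArray   // extra copy held in driver memory
    val map = mutable.LongMap.empty[Array[Byte]]
    allRows.foreach { case (k, v) => map.update(k, v) }
    map
  }

  // "After": stream rows straight into the map; no intermediate Array of rows.
  def buildStreaming(blocks: Iterator[Array[Byte]],
                     decode: Array[Byte] => Iterator[(Long, Array[Byte])]): RowMap = {
    val map = mutable.LongMap.empty[Array[Byte]]
    blocks.flatMap(decode).foreach { case (k, v) => map.update(k, v) }
    map
  }
}
{code}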



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22170) Broadcast join holds an extra copy of rows in driver memory

2017-10-09 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-22170:

Issue Type: Improvement  (was: Bug)

> Broadcast join holds an extra copy of rows in driver memory
> ---
>
> Key: SPARK-22170
> URL: https://issues.apache.org/jira/browse/SPARK-22170
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.1, 2.2.0
>Reporter: Ryan Blue
>Assignee: Ryan Blue
> Fix For: 2.3.0
>
>
> Using a memory profiler, I investigated a driver OOM that occurred while 
> building a large broadcast table, and found that a huge amount of memory is 
> used during the build. This is because [BroadcastExchangeExec uses 
> {{executeCollect}}|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L76].
>  In {{executeCollect}}, all of the partitions are fetched as compressed 
> blocks, then each block is decompressed (with a stream), and each row is 
> copied to a new byte buffer and added to an ArrayBuffer, which is copied to 
> an Array. This results in a huge amount of allocation: a buffer for each row 
> in the broadcast. Those rows are only used to get copied into a 
> {{BytesToBytesMap}} that will be broadcasted, so there is no need to keep 
> them in memory.
> Replacing the array buffer step with an iterator reduces the amount of memory 
> held while creating the map by not requiring all rows to be in memory. It 
> also avoids allocating a large Array for the rows. In practice, a 16MB 
> broadcast table used 100MB less memory with this approach, but the reduction 
> depends on the size of rows and compression (16MB was in Parquet format).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-20589) Allow limiting task concurrency per stage

2017-10-09 Thread Mani Vijayakumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mani Vijayakumar updated SPARK-20589:
-
Comment: was deleted

(was: A comment with security level 'jira-users' was removed.)

> Allow limiting task concurrency per stage
> -
>
> Key: SPARK-20589
> URL: https://issues.apache.org/jira/browse/SPARK-20589
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Thomas Graves
>
> It would be nice to have the ability to limit the number of concurrent tasks 
> per stage. This is useful when your Spark job accesses another service and you 
> don't want to DoS that service, for instance Spark writing to HBase or doing 
> HTTP PUTs against a service. Many times you want to do this without limiting 
> the number of partitions. 
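
(For reference, today's blunt workaround is to cap the partition count itself, which is exactly the 
coupling this request wants to avoid; a sketch, where putToService is a hypothetical call to the 
external system:)
{code:java}
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("throttled-writes").getOrCreate()
import spark.implicits._

// Hypothetical per-record call to the external system (HBase put, HTTP PUT, ...).
def putToService(row: Row): Unit = ()

val df = (1 to 1000).toDF("id")

// Caps concurrency at 8 tasks for this write, but only by also capping partitions.
df.coalesce(8).rdd.foreachPartition { rows =>
  rows.foreach(putToService)
}
{code}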



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22230) agg(last('attr)) gives weird results for streaming

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22230:


Assignee: Apache Spark

> agg(last('attr)) gives weird results for streaming
> --
>
> Key: SPARK-22230
> URL: https://issues.apache.org/jira/browse/SPARK-22230
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Apache Spark
>Priority: Minor
>
> In stream aggregation, last('attr) yields the last value from the first 
> microbatch forever. I'm not sure if it's fair to call this a correctness bug, 
> since last doesn't have strong correctness semantics, but ignoring all rows 
> past the first microbatch is at least weird.
> Simple repro in StreamingAggregationSuite:
> val input = MemoryStream[Int]
> val aggregated = input.toDF().agg(last('value))
> testStream(aggregated, OutputMode.Complete())(
>   AddData(input, 1, 2, 3),
>   CheckLastBatch(3),
>   AddData(input, 4, 5, 6),
>   CheckLastBatch(6) // actually yields 3 again



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22230) agg(last('attr)) gives weird results for streaming

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22230:


Assignee: (was: Apache Spark)

> agg(last('attr)) gives weird results for streaming
> --
>
> Key: SPARK-22230
> URL: https://issues.apache.org/jira/browse/SPARK-22230
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
>
> In stream aggregation, last('attr) yields the last value from the first 
> microbatch forever. I'm not sure if it's fair to call this a correctness bug, 
> since last doesn't have strong correctness semantics, but ignoring all rows 
> past the first microbatch is at least weird.
> Simple repro in StreamingAggregationSuite:
> val input = MemoryStream[Int]
> val aggregated = input.toDF().agg(last('value))
> testStream(aggregated, OutputMode.Complete())(
>   AddData(input, 1, 2, 3),
>   CheckLastBatch(3),
>   AddData(input, 4, 5, 6),
>   CheckLastBatch(6) // actually yields 3 again



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22230) agg(last('attr)) gives weird results for streaming

2017-10-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197685#comment-16197685
 ] 

Apache Spark commented on SPARK-22230:
--

User 'joseph-torres' has created a pull request for this issue:
https://github.com/apache/spark/pull/19461

> agg(last('attr)) gives weird results for streaming
> --
>
> Key: SPARK-22230
> URL: https://issues.apache.org/jira/browse/SPARK-22230
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
>
> In stream aggregation, last('attr) yields the last value from the first 
> microbatch forever. I'm not sure if it's fair to call this a correctness bug, 
> since last doesn't have strong correctness semantics, but ignoring all rows 
> past the first microbatch is at least weird.
> Simple repro in StreamingAggregationSuite:
> val input = MemoryStream[Int]
> val aggregated = input.toDF().agg(last('value))
> testStream(aggregated, OutputMode.Complete())(
>   AddData(input, 1, 2, 3),
>   CheckLastBatch(3),
>   AddData(input, 4, 5, 6),
>   CheckLastBatch(6) // actually yields 3 again



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22230) agg(last('attr)) gives weird results for streaming

2017-10-09 Thread Jose Torres (JIRA)
Jose Torres created SPARK-22230:
---

 Summary: agg(last('attr)) gives weird results for streaming
 Key: SPARK-22230
 URL: https://issues.apache.org/jira/browse/SPARK-22230
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.0
Reporter: Jose Torres
Priority: Minor


In stream aggregation, last('attr) yields the last value from the first 
microbatch forever. I'm not sure if it's fair to call this a correctness bug, 
since last doesn't have strong correctness semantics, but ignoring all rows 
past the first microbatch is at least weird.

Simple repro in StreamingAggregationSuite:

val input = MemoryStream[Int]

val aggregated = input.toDF().agg(last('value))
testStream(aggregated, OutputMode.Complete())(
  AddData(input, 1, 2, 3),
  CheckLastBatch(3),
  AddData(input, 4, 5, 6),
  CheckLastBatch(6) // actually yields 3 again
)
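
For reference, the behavior can also be observed outside the test harness. Below is a minimal standalone sketch (assuming the Spark 2.2 streaming APIs MemoryStream, writeStream and the memory sink; the query name "last_agg" is illustrative), not code from the suite:

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.last
import org.apache.spark.sql.streaming.OutputMode

object LastAggRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("last-agg-repro").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext

    val input = MemoryStream[Int]
    // Aggregate the whole stream with last('value) and keep the running result
    // in an in-memory table named "last_agg".
    val query = input.toDF().agg(last('value))
      .writeStream
      .outputMode(OutputMode.Complete())
      .format("memory")
      .queryName("last_agg")
      .start()

    input.addData(1, 2, 3)
    query.processAllAvailable()
    spark.table("last_agg").show()   // expected: 3

    input.addData(4, 5, 6)
    query.processAllAvailable()
    spark.table("last_agg").show()   // expected: 6, reportedly still 3

    query.stop()
    spark.stop()
  }
}
{code}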



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-19430) Cannot read external tables with VARCHAR columns if they're backed by ORC files written by Hive 1.2.1

2017-10-09 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-19430.
---
Resolution: Duplicate

This is resolved via SPARK-19459 in Spark 2.2.

> Cannot read external tables with VARCHAR columns if they're backed by ORC 
> files written by Hive 1.2.1
> -
>
> Key: SPARK-19430
> URL: https://issues.apache.org/jira/browse/SPARK-19430
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.0
>Reporter: Sameer Agarwal
>
> Spark throws an exception when trying to read external tables with VARCHAR 
> columns if they're backed by ORC files that were written by Hive 1.2.1 (and 
> possibly other versions of hive).
> Steps to reproduce (credits to [~lian cheng]):
> # Write an ORC table using Hive 1.2.1 with
>{noformat}
> CREATE TABLE orc_varchar_test STORED AS ORC
> AS SELECT CAST('a' AS VARCHAR(10)) AS c0{noformat}
> # Get the raw path of the written ORC file
> # Create an external table pointing to this file and read the table using 
> Spark
>   {noformat}
> val path = "/tmp/orc_varchar_test"
> sql(s"create external table if not exists test (c0 varchar(10)) stored as orc 
> location '$path'")
> spark.table("test").show(){noformat}
> The problem here is that the metadata in the ORC file written by Hive is 
> different from those written by Spark. We can inspect the ORC file written 
> above:
> {noformat}
> $ hive --orcfiledump 
> file:///Users/lian/local/var/lib/hive/warehouse_1.2.1/orc_varchar_test/00_0
> Structure for 
> file:///Users/lian/local/var/lib/hive/warehouse_1.2.1/orc_varchar_test/00_0
> File Version: 0.12 with HIVE_8732
> Rows: 1
> Compression: ZLIB
> Compression size: 262144
> Type: struct<_col0:varchar(10)>
> ...
> {noformat}
> On the other hand, if you create an ORC table using the same DDL and inspect 
> the written ORC file, you'll see:
> {noformat}
> ...
> Type: struct<c0:string>
> ...
> {noformat}
> Note that all tests are done with {{spark.sql.hive.convertMetastoreOrc}} set 
> to {{false}}, which is the default case.
> I've verified that Spark 1.6.x, 2.0.x and 2.1.x all fail with instances of 
> the following error:
> {code}
> java.lang.ClassCastException: 
> org.apache.hadoop.hive.serde2.io.HiveVarcharWritable cannot be cast to 
> org.apache.hadoop.io.Text
> at 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector.getPrimitiveWritableObject(WritableStringObjectInspector.java:41)
> at 
> org.apache.spark.sql.hive.HiveInspectors$$anonfun$unwrapperFor$23.apply(HiveInspectors.scala:529)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:419)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$15.apply(TableReader.scala:419)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:435)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:426)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
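
As a point of comparison for the metadata mismatch described above, the string-typed schema can be reproduced by running the same DDL from Spark itself. A hedged spark-shell sketch (the table name orc_varchar_test_spark is illustrative):

{code}
// Create the equivalent table from Spark, then run `hive --orcfiledump` on the
// resulting file; the dump shows the string-typed schema instead of varchar(10).
spark.sql(
  """CREATE TABLE orc_varchar_test_spark STORED AS ORC
    |AS SELECT CAST('a' AS VARCHAR(10)) AS c0""".stripMargin)
{code}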



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22222) Fix the ARRAY_MAX in BufferHolder and add a test

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22222:


Assignee: Apache Spark

> Fix the ARRAY_MAX in BufferHolder and add a test
> 
>
> Key: SPARK-22222
> URL: https://issues.apache.org/jira/browse/SPARK-22222
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Feng Liu
>Assignee: Apache Spark
>
> This is actually a follow-up to SPARK-22033, which set the `ARRAY_MAX` to 
> `Int.MaxValue - 8`. It is not a valid value because it will cause the 
> following line to fail when such a large byte array is allocated: 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java#L86
>  We need to make sure the new length is a multiple of 8.
> We need to add one test for the fix. Note that the test should work 
> independently of the heap size of the test JVM. 
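
A quick arithmetic check of the alignment point above (a sketch only; the rounding helper is illustrative and is not the BufferHolder code):

{code}
// Int.MaxValue - 8 is not word-aligned, so rounding a requested size up to the
// next multiple of 8 can step past the intended cap.
val oldMax = Int.MaxValue - 8                  // 2147483639
println(oldMax % 8)                            // 7 -> not a multiple of 8
def roundUpToWord(n: Long): Long = (n + 7) / 8 * 8
println(roundUpToWord(oldMax))                 // 2147483640, which is > oldMax
{code}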



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22222) Fix the ARRAY_MAX in BufferHolder and add a test

2017-10-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197620#comment-16197620
 ] 

Apache Spark commented on SPARK-22222:
--

User 'liufengdb' has created a pull request for this issue:
https://github.com/apache/spark/pull/19460

> Fix the ARRAY_MAX in BufferHolder and add a test
> 
>
> Key: SPARK-22222
> URL: https://issues.apache.org/jira/browse/SPARK-22222
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Feng Liu
>
> This is actually a follow-up to SPARK-22033, which set the `ARRAY_MAX` to 
> `Int.MaxValue - 8`. It is not a valid value because it will cause the 
> following line to fail when such a large byte array is allocated: 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java#L86
>  We need to make sure the new length is a multiple of 8.
> We need to add one test for the fix. Note that the test should work 
> independently of the heap size of the test JVM. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22222) Fix the ARRAY_MAX in BufferHolder and add a test

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22222:


Assignee: (was: Apache Spark)

> Fix the ARRAY_MAX in BufferHolder and add a test
> 
>
> Key: SPARK-22222
> URL: https://issues.apache.org/jira/browse/SPARK-22222
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Feng Liu
>
> This is actually a follow-up to SPARK-22033, which set the `ARRAY_MAX` to 
> `Int.MaxValue - 8`. It is not a valid value because it will cause the 
> following line to fail when such a large byte array is allocated: 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java#L86
>  We need to make sure the new length is a multiple of 8.
> We need to add one test for the fix. Note that the test should work 
> independently of the heap size of the test JVM. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22218) spark shuffle services fails to update secret on application re-attempts

2017-10-09 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin updated SPARK-22218:
---
Affects Version/s: (was: 2.2.0)
   2.2.1

> spark shuffle services fails to update secret on application re-attempts
> 
>
> Key: SPARK-22218
> URL: https://issues.apache.org/jira/browse/SPARK-22218
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, YARN
>Affects Versions: 2.2.1
>Reporter: Thomas Graves
>Priority: Blocker
> Fix For: 2.2.1, 2.3.0
>
>
> Running on YARN, if you have any application re-attempts using the Spark 2.2 
> shuffle service, the external shuffle service does not update the credentials 
> properly and the application re-attempts fail with 
> javax.security.sasl.SaslException. 
> A bug was fixed in 2.2 (SPARK-21494) that changed the 
> ShuffleSecretManager to use containsKey 
> (https://git.corp.yahoo.com/hadoop/spark/blob/yspark_2_2_0/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java#L50)
>  , which is the proper behavior; the problem is that between application 
> re-attempts it never removes the key. So when the second attempt starts, the 
> code says it already contains the key (since the application id is the same) 
> and it doesn't update the secret properly.
> To reproduce this, you can run something like a word count and have the output 
> directory already existing.  The first attempt will fail because the output 
> directory exists, and the subsequent attempts will fail with max number of 
> executor failures.   Note that this assumes the second and third attempts 
> run on the same node as the first attempt.
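
The failure mode can be pictured with a plain map standing in for the real ShuffleSecretManager (a hedged sketch; SecretRegistry and the application id below are illustrative, not Spark code):

{code}
import scala.collection.concurrent.TrieMap

// If registration is skipped whenever the appId is already present, a second
// attempt of the same application keeps the first attempt's secret.
object SecretRegistry {
  private val secrets = TrieMap.empty[String, String]

  def register(appId: String, secret: String): Unit =
    if (!secrets.contains(appId)) secrets.put(appId, secret)

  def lookup(appId: String): Option[String] = secrets.get(appId)
}

SecretRegistry.register("application_1507000000000_0001", "secret-from-attempt-1")
SecretRegistry.register("application_1507000000000_0001", "secret-from-attempt-2")
// The re-attempt's secret was ignored, so SASL auth against the new secret fails.
assert(SecretRegistry.lookup("application_1507000000000_0001").contains("secret-from-attempt-1"))
{code}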



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22218) spark shuffle services fails to update secret on application re-attempts

2017-10-09 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-22218.

   Resolution: Fixed
 Assignee: Thomas Graves
Fix Version/s: 2.3.0
   2.2.1

> spark shuffle services fails to update secret on application re-attempts
> 
>
> Key: SPARK-22218
> URL: https://issues.apache.org/jira/browse/SPARK-22218
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, YARN
>Affects Versions: 2.2.1
>Reporter: Thomas Graves
>Assignee: Thomas Graves
>Priority: Blocker
> Fix For: 2.2.1, 2.3.0
>
>
> Running on YARN, if you have any application re-attempts using the Spark 2.2 
> shuffle service, the external shuffle service does not update the credentials 
> properly and the application re-attempts fail with 
> javax.security.sasl.SaslException. 
> A bug was fixed in 2.2 (SPARK-21494) that changed the 
> ShuffleSecretManager to use containsKey 
> (https://git.corp.yahoo.com/hadoop/spark/blob/yspark_2_2_0/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java#L50)
>  , which is the proper behavior; the problem is that between application 
> re-attempts it never removes the key. So when the second attempt starts, the 
> code says it already contains the key (since the application id is the same) 
> and it doesn't update the secret properly.
> To reproduce this, you can run something like a word count and have the output 
> directory already existing.  The first attempt will fail because the output 
> directory exists, and the subsequent attempts will fail with max number of 
> executor failures.   Note that this assumes the second and third attempts 
> run on the same node as the first attempt.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21568) ConsoleProgressBar should only be enabled in shells

2017-10-09 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-21568.

   Resolution: Fixed
 Assignee: Dongjoon Hyun
Fix Version/s: 2.3.0

> ConsoleProgressBar should only be enabled in shells
> ---
>
> Key: SPARK-21568
> URL: https://issues.apache.org/jira/browse/SPARK-21568
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Assignee: Dongjoon Hyun
>Priority: Minor
> Fix For: 2.3.0
>
>
> This is the current logic that enables the progress bar:
> {code}
> _progressBar =
>   if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && 
> !log.isInfoEnabled) {
> Some(new ConsoleProgressBar(this))
>   } else {
> None
>   }
> {code}
> That is based on the logging level; it just happens to align with the default 
> configuration for shells (WARN) and normal apps (INFO).
> But if someone changes the default logging config for their app, this may 
> break; they may silence logs by setting the default level to WARN or ERROR, 
> and a normal application will see a lot of log spam from the progress bar 
> (which is especially bad when output is redirected to a file, as is usually 
> done when running in cluster mode).
> While it's possible to disable the progress bar separately, this behavior is 
> not really expected.
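
As the last sentence notes, the bar can already be turned off explicitly. A minimal sketch of doing that from application code, assuming only the spark.ui.showConsoleProgress setting referenced in the snippet above:

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Opt out of the console progress bar regardless of the logging level.
val conf = new SparkConf()
  .setAppName("no-progress-bar")
  .setMaster("local[*]")
  .set("spark.ui.showConsoleProgress", "false")
val sc = new SparkContext(conf)
{code}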



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20791) Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-20791:


Assignee: Apache Spark

> Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame
> ---
>
> Key: SPARK-20791
> URL: https://issues.apache.org/jira/browse/SPARK-20791
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.1.1
>Reporter: Bryan Cutler
>Assignee: Apache Spark
>
> The current code for creating a Spark DataFrame from a Pandas DataFrame uses 
> `to_records` to convert the DataFrame to a list of records and then converts 
> each record to a list.  Following this, there are a number of calls to 
> serialize and transfer this data to the JVM.  This process is very 
> inefficient and also discards all schema metadata, requiring another pass 
> over the data to infer types.
> Using Apache Arrow, the Pandas DataFrame could be efficiently converted to 
> Arrow data and directly transferred to the JVM to create the Spark DataFrame. 
>  The performance will be better and the Pandas schema will also be used so 
> that the correct types will be used.  
> Issues with the poor type inference have come up before, causing confusion 
> and frustration with users because it is not clear why it fails or doesn't 
> use the same type from Pandas.  Fixing this with Apache Arrow will solve 
> another pain point for Python users and the following JIRAs could be closed:
> * SPARK-17804
> * SPARK-18178



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20791) Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame

2017-10-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197581#comment-16197581
 ] 

Apache Spark commented on SPARK-20791:
--

User 'BryanCutler' has created a pull request for this issue:
https://github.com/apache/spark/pull/19459

> Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame
> ---
>
> Key: SPARK-20791
> URL: https://issues.apache.org/jira/browse/SPARK-20791
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.1.1
>Reporter: Bryan Cutler
>
> The current code for creating a Spark DataFrame from a Pandas DataFrame uses 
> `to_records` to convert the DataFrame to a list of records and then converts 
> each record to a list.  Following this, there are a number of calls to 
> serialize and transfer this data to the JVM.  This process is very 
> inefficient and also discards all schema metadata, requiring another pass 
> over the data to infer types.
> Using Apache Arrow, the Pandas DataFrame could be efficiently converted to 
> Arrow data and directly transferred to the JVM to create the Spark DataFrame. 
>  The performance will be better and the Pandas schema will also be used so 
> that the correct types will be used.  
> Issues with the poor type inference have come up before, causing confusion 
> and frustration with users because it is not clear why it fails or doesn't 
> use the same type from Pandas.  Fixing this with Apache Arrow will solve 
> another pain point for Python users and the following JIRAs could be closed:
> * SPARK-17804
> * SPARK-18178



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20791) Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-20791:


Assignee: (was: Apache Spark)

> Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame
> ---
>
> Key: SPARK-20791
> URL: https://issues.apache.org/jira/browse/SPARK-20791
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SQL
>Affects Versions: 2.1.1
>Reporter: Bryan Cutler
>
> The current code for creating a Spark DataFrame from a Pandas DataFrame uses 
> `to_records` to convert the DataFrame to a list of records and then converts 
> each record to a list.  Following this, there are a number of calls to 
> serialize and transfer this data to the JVM.  This process is very 
> inefficient and also discards all schema metadata, requiring another pass 
> over the data to infer types.
> Using Apache Arrow, the Pandas DataFrame could be efficiently converted to 
> Arrow data and directly transferred to the JVM to create the Spark DataFrame. 
>  The performance will be better and the Pandas schema will also be used so 
> that the correct types will be used.  
> Issues with the poor type inference have come up before, causing confusion 
> and frustration with users because it is not clear why it fails or doesn't 
> use the same type from Pandas.  Fixing this with Apache Arrow will solve 
> another pain point for Python users and the following JIRAs could be closed:
> * SPARK-17804
> * SPARK-18178



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2017-10-09 Thread Yuval Degani (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuval Degani updated SPARK-22229:
-
Attachment: SPARK-22229_SPIP_RDMA_Accelerated_Shuffle_Engine_Rev_1.0.pdf

> SPIP: RDMA Accelerated Shuffle Engine
> -
>
> Key: SPARK-22229
> URL: https://issues.apache.org/jira/browse/SPARK-22229
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yuval Degani
> Attachments: 
> SPARK-22229_SPIP_RDMA_Accelerated_Shuffle_Engine_Rev_1.0.pdf
>
>
> An RDMA-accelerated shuffle engine can provide enormous performance benefits 
> to shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
> open-source project ([https://github.com/Mellanox/SparkRDMA]).
> Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
> processing overhead by bypassing the kernel and networking stack as well as 
> avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
> directly by the actual Spark workloads, and help reduce the job runtime 
> significantly. 
> This performance gain is demonstrated with both industry standard HiBench 
> TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive 
> customer applications. 
> SparkRDMA will be presented at Spark Summit 2017 in Dublin 
> ([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).
> Please see attached proposal document for more information.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2017-10-09 Thread Yuval Degani (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuval Degani updated SPARK-22229:
-
Description: 
An RDMA-accelerated shuffle engine can provide enormous performance benefits to 
shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
open-source project ([https://github.com/Mellanox/SparkRDMA]).
Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
processing overhead by bypassing the kernel and networking stack as well as 
avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
directly by the actual Spark workloads, and help reduce the job runtime 
significantly. 
This performance gain is demonstrated with both industry standard HiBench 
TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive customer 
applications. 
SparkRDMA will be presented at Spark Summit 2017 in Dublin 
([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).

Please see attached proposal document for more information.

  was:
An RDMA-accelerated shuffle engine can provide enormous performance benefits to 
shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
open-source project ([https://github.com/Mellanox/SparkRDMA]).
Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
processing overhead by bypassing the kernel and networking stack as well as 
avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
directly by the actual Spark workloads, and help reduce the job runtime 
significantly. 
This performance gain is demonstrated with both industry standard HiBench 
TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive customer 
applications. 
SparkRDMA will be presented at Spark Summit 2017 in Dublin 
([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/])


> SPIP: RDMA Accelerated Shuffle Engine
> -
>
> Key: SPARK-22229
> URL: https://issues.apache.org/jira/browse/SPARK-22229
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Yuval Degani
>
> An RDMA-accelerated shuffle engine can provide enormous performance benefits 
> to shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
> open-source project ([https://github.com/Mellanox/SparkRDMA]).
> Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
> processing overhead by bypassing the kernel and networking stack as well as 
> avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
> directly by the actual Spark workloads, and help reduce the job runtime 
> significantly. 
> This performance gain is demonstrated with both industry standard HiBench 
> TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive 
> customer applications. 
> SparkRDMA will be presented at Spark Summit 2017 in Dublin 
> ([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/]).
> Please see attached proposal document for more information.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22229) SPIP: RDMA Accelerated Shuffle Engine

2017-10-09 Thread Yuval Degani (JIRA)
Yuval Degani created SPARK-22229:


 Summary: SPIP: RDMA Accelerated Shuffle Engine
 Key: SPARK-22229
 URL: https://issues.apache.org/jira/browse/SPARK-22229
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Yuval Degani


An RDMA-accelerated shuffle engine can provide enormous performance benefits to 
shuffle-intensive Spark jobs, as demonstrated in the “SparkRDMA” plugin 
open-source project ([https://github.com/Mellanox/SparkRDMA]).
Using RDMA for shuffle improves CPU utilization significantly and reduces I/O 
processing overhead by bypassing the kernel and networking stack as well as 
avoiding memory copies entirely. Those valuable CPU cycles are then consumed 
directly by the actual Spark workloads, and help reduce the job runtime 
significantly. 
This performance gain is demonstrated with both industry standard HiBench 
TeraSort (shows 1.5x speedup in sorting) as well as shuffle intensive customer 
applications. 
SparkRDMA will be presented at Spark Summit 2017 in Dublin 
([https://spark-summit.org/eu-2017/events/accelerating-shuffle-a-tailor-made-rdma-solution-for-apache-spark/])



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22228) Add support for Array so from_json can parse

2017-10-09 Thread kant kodali (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kant kodali updated SPARK-22228:

Description: 
{code:java}
val inputDS = Seq("""["foo", "bar"]""").toDF
{code}


{code:java}
inputDS.printSchema()
root
 |-- value: string (nullable = true)

{code}

Input Dataset inputDS

{code:java}
inputDS.show(false)

value
-
["foo", "bar"]
{code}

Expected output dataset outputDS


{code:java}
value
---
"foo" |
"bar" |

{code}

Tried explode function like below but it doesn't quite work


{code:java}
inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
{code}

and got the following error


{code:java}
org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs
{code}

Also tried the following


{code:java}
inputDS.select(explode(col("value")))
{code}

And got the following error


{code:java}
org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`value`)' due 
to data type mismatch: input to function explode should be array or map type, 
not StringType
{code}




  was:
{code:java}
val inputDS = Seq("""["foo", "bar"]""").toDF
{code}


{code:java}
inputDS.printSchema()
root
 |-- value: string (nullable = true)

{code}

Input Dataset inputDS

{code:java}
inputDS.show(false)

value
-
["foo", "bar"]
{code}

Expected output dataset outputDS


{code:java}
value
---
"foo" |
"bar" |

{code}

Tried explode function like below but it doesn't quite work


{code:java}
inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
{code}

and I get the following error


{code:java}
org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs
{code}




> Add support for Array so from_json can parse
> 
>
> Key: SPARK-22228
> URL: https://issues.apache.org/jira/browse/SPARK-22228
> Project: Spark
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 2.2.0
>Reporter: kant kodali
>
> {code:java}
> val inputDS = Seq("""["foo", "bar"]""").toDF
> {code}
> {code:java}
> inputDS.printSchema()
> root
>  |-- value: string (nullable = true)
> {code}
> Input Dataset inputDS
> {code:java}
> inputDS.show(false)
> value
> -
> ["foo", "bar"]
> {code}
> Expected output dataset outputDS
> {code:java}
> value
> ---
> "foo" |
> "bar" |
> {code}
> Tried explode function like below but it doesn't quite work
> {code:java}
> inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
> {code}
> and got the following error
> {code:java}
> org.apache.spark.sql.AnalysisException: cannot resolve 
> 'jsontostructs(`value`)' due to data type mismatch: Input schema string must 
> be a struct or an array of structs
> {code}
> Also tried the following
> {code:java}
> inputDS.select(explode(col("value")))
> {code}
> And got the following error
> {code:java}
> org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`value`)' due 
> to data type mismatch: input to function explode should be array or map type, 
> not StringType
> {code}
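
Until from_json accepts a plain array-of-strings schema, one workaround is to parse the JSON text with a small UDF and explode the result. The spark-shell sketch below is hedged: parseJsonStringArray is an illustrative helper built on the Jackson databind classes that ship with Spark, not on from_json itself.

{code}
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.functions.{explode, udf}

// Parse a JSON array of strings, e.g. ["foo", "bar"], into a Seq[String].
// (The ObjectMapper is created per call to keep the sketch trivially serializable.)
val parseJsonStringArray = udf { (s: String) =>
  new ObjectMapper().readValue(s, classOf[Array[String]]).toSeq
}

val inputDS = Seq("""["foo", "bar"]""").toDF
val outputDS = inputDS.select(explode(parseJsonStringArray($"value")).as("value"))
outputDS.show(false)   // one row per element: foo, bar
{code}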



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22228) Add support for Array so from_json can parse

2017-10-09 Thread kant kodali (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kant kodali updated SPARK-22228:

Description: 
{code:java}
val inputDS = Seq("""["foo", "bar"]""").toDF
{code}


{code:java}
inputDS.printSchema()
root
 |-- value: string (nullable = true)

{code}

Input Dataset inputDS

{code:java}
inputDS.show(false)

value
-
["foo", "bar"]
{code}

Expected output dataset outputDS


{code:java}
value
---
"foo" |
"bar" |

{code}

Tried explode function like below but it doesn't quite work


{code:java}
inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
{code}

and I get the following error


{code:java}
org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs
{code}



  was:
{code:java}
val inputDS = Seq("""["foo", "bar"]""").toDF
{code}

inputDS.printSchema()

root
 |-- value: string (nullable = true)
Input Dataset inputDS

inputDS.show(false)

value
-
["foo", "bar"]
Expected output dataset outputDS

value
---
"foo" |
"bar" |

Tried explode function like below but it doesn't quite work

inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
and I get the following error

org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs



> Add support for Array so from_json can parse
> 
>
> Key: SPARK-22228
> URL: https://issues.apache.org/jira/browse/SPARK-22228
> Project: Spark
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 2.2.0
>Reporter: kant kodali
>
> {code:java}
> val inputDS = Seq("""["foo", "bar"]""").toDF
> {code}
> {code:java}
> inputDS.printSchema()
> root
>  |-- value: string (nullable = true)
> {code}
> Input Dataset inputDS
> {code:java}
> inputDS.show(false)
> value
> -
> ["foo", "bar"]
> {code}
> Expected output dataset outputDS
> {code:java}
> value
> ---
> "foo" |
> "bar" |
> {code}
> Tried explode function like below but it doesn't quite work
> {code:java}
> inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
> {code}
> and I get the following error
> {code:java}
> org.apache.spark.sql.AnalysisException: cannot resolve 
> 'jsontostructs(`value`)' due to data type mismatch: Input schema string must 
> be a struct or an array of structs
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22228) Add support for Array so from_json can parse

2017-10-09 Thread kant kodali (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kant kodali updated SPARK-22228:

Description: 
{code:java}
val inputDS = Seq("""["foo", "bar"]""").toDF
{code}

inputDS.printSchema()

root
 |-- value: string (nullable = true)
Input Dataset inputDS

inputDS.show(false)

value
-
["foo", "bar"]
Expected output dataset outputDS

value
---
"foo" |
"bar" |

Tried explode function like below but it doesn't quite work

inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
and I get the following error

org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs


  was:
`val inputDS = Seq("""["foo", "bar"]""").toDF`

inputDS.printSchema()

root
 |-- value: string (nullable = true)
Input Dataset inputDS

inputDS.show(false)

value
-
["foo", "bar"]
Expected output dataset outputDS

value
---
"foo" |
"bar" |

Tried explode function like below but it doesn't quite work

inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
and I get the following error

org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs



> Add support for Array so from_json can parse
> 
>
> Key: SPARK-22228
> URL: https://issues.apache.org/jira/browse/SPARK-22228
> Project: Spark
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 2.2.0
>Reporter: kant kodali
>
> {code:java}
> val inputDS = Seq("""["foo", "bar"]""").toDF
> {code}
> inputDS.printSchema()
> root
>  |-- value: string (nullable = true)
> Input Dataset inputDS
> inputDS.show(false)
> value
> -
> ["foo", "bar"]
> Expected output dataset outputDS
> value
> ---
> "foo" |
> "bar" |
> Tried explode function like below but it doesn't quite work
> inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
> and I get the following error
> org.apache.spark.sql.AnalysisException: cannot resolve 
> 'jsontostructs(`value`)' due to data type mismatch: Input schema string must 
> be a struct or an array of structs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22228) Add support for Array so from_json can parse

2017-10-09 Thread kant kodali (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kant kodali updated SPARK-22228:

Description: 
`val inputDS = Seq("""["foo", "bar"]""").toDF`

inputDS.printSchema()

root
 |-- value: string (nullable = true)
Input Dataset inputDS

inputDS.show(false)

value
-
["foo", "bar"]
Expected output dataset outputDS

value
---
"foo" |
"bar" |

Tried explode function like below but it doesn't quite work

inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
and I get the following error

org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs


  was:
[val inputDS = Seq("""["foo", "bar"]""").toDF]

inputDS.printSchema()

root
 |-- value: string (nullable = true)
Input Dataset inputDS

inputDS.show(false)

value
-
["foo", "bar"]
Expected output dataset outputDS

value
---
"foo" |
"bar" |

Tried explode function like below but it doesn't quite work

inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
and I get the following error

org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs



> Add support for Array so from_json can parse
> 
>
> Key: SPARK-22228
> URL: https://issues.apache.org/jira/browse/SPARK-22228
> Project: Spark
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 2.2.0
>Reporter: kant kodali
>
> `val inputDS = Seq("""["foo", "bar"]""").toDF`
> inputDS.printSchema()
> root
>  |-- value: string (nullable = true)
> Input Dataset inputDS
> inputDS.show(false)
> value
> -
> ["foo", "bar"]
> Expected output dataset outputDS
> value
> ---
> "foo" |
> "bar" |
> Tried explode function like below but it doesn't quite work
> inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
> and I get the following error
> org.apache.spark.sql.AnalysisException: cannot resolve 
> 'jsontostructs(`value`)' due to data type mismatch: Input schema string must 
> be a struct or an array of structs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22228) Add support for Array so from_json can parse

2017-10-09 Thread kant kodali (JIRA)
kant kodali created SPARK-22228:
---

 Summary: Add support for Array so from_json can 
parse
 Key: SPARK-22228
 URL: https://issues.apache.org/jira/browse/SPARK-22228
 Project: Spark
  Issue Type: Improvement
  Components: Java API
Affects Versions: 2.2.0
Reporter: kant kodali


[val inputDS = Seq("""["foo", "bar"]""").toDF]

inputDS.printSchema()

root
 |-- value: string (nullable = true)
Input Dataset inputDS

inputDS.show(false)

value
-
["foo", "bar"]
Expected output dataset outputDS

value
---
"foo" |
"bar" |

Tried explode function like below but it doesn't quite work

inputDS.select(explode(from_json(col("value"), ArrayType(StringType))))
and I get the following error

org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' 
due to data type mismatch: Input schema string must be a struct or an array of 
structs




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1529) Support DFS based shuffle in addition to Netty shuffle

2017-10-09 Thread Karthik Natarajan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197449#comment-16197449
 ] 

Karthik Natarajan commented on SPARK-1529:
--

Hello [~rkannan82]

Are there any updates for this feature? I was looking for something similar as 
well. Do you happen to have any comparisons between using hdfs to read / write 
shuffle data vs using local disk + netty ?

Thanks.

> Support DFS based shuffle in addition to Netty shuffle
> --
>
> Key: SPARK-1529
> URL: https://issues.apache.org/jira/browse/SPARK-1529
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Patrick Wendell
>Assignee: Kannan Rajah
> Attachments: Spark Shuffle using HDFS.pdf
>
>
> In some environments, like with MapR, local volumes are accessed through the 
> Hadoop filesystem interface. Shuffle is implemented by writing intermediate 
> data to local disk and serving it to remote node using Netty as a transport 
> mechanism. We want to provide an HDFS based shuffle such that data can be 
> written to HDFS (instead of local disk) and served using HDFS API on the 
> remote nodes. This could involve exposing a file system abstraction to Spark 
> shuffle and having 2 modes of running it. In the default mode, it will write to 
> local disk, and in the DFS mode, it will write to HDFS.
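
To make the "file system abstraction with two modes" idea concrete, here is a rough, hedged sketch; ShuffleStorage and both implementations are illustrative names, not taken from the attached design document:

{code}
import java.io.{File, FileInputStream, FileOutputStream, InputStream, OutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Where shuffle files are created and later served from.
trait ShuffleStorage {
  def create(path: String): OutputStream
  def open(path: String): InputStream
}

// Default mode: plain local disk, served over Netty as today.
class LocalDiskShuffleStorage(root: String) extends ShuffleStorage {
  def create(path: String): OutputStream = new FileOutputStream(new File(root, path))
  def open(path: String): InputStream = new FileInputStream(new File(root, path))
}

// DFS mode: write to and read from HDFS through the Hadoop FileSystem API.
class DfsShuffleStorage(root: String, conf: Configuration) extends ShuffleStorage {
  private val fs = FileSystem.get(conf)
  def create(path: String): OutputStream = fs.create(new Path(root, path))
  def open(path: String): InputStream = fs.open(new Path(root, path))
}
{code}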



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20589) Allow limiting task concurrency per stage

2017-10-09 Thread Michael Park (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197402#comment-16197402
 ] 

Michael Park edited comment on SPARK-20589 at 10/9/17 5:56 PM:
---

Pardon my ignorance of the inner workings of task scheduling, but is it not 
possible to provide a way to mark the max-concurrency of a specific RDD? The max 
concurrency of a stage would then be the minimum max-concurrency of all RDDs 
within that stage.

Also, +1 for this not being an obscure use-case. We are seeing a need for this any 
time we attempt to include an external service as part of a generic pipeline. 
Ideally the bottleneck can be limited to a single stage, rather than an entire 
job. 


was (Author: mike.park):
Pardon my ignorance of the inner workings of task scheduling, but is it not 
possible provide a way to mark the max-concurrency of a specific RDD? The max 
concurrency of a stage would the be the minimum max-concurrency of all RDDs 
within that stage.

Also, +1 for not being an obscure use-case. We are seeing a need for this any 
time we attempt include an external service as part of a generic pipeline. 
Ideally the bottleneck can be limited to a single stage, rather then an entire 
job. 

> Allow limiting task concurrency per stage
> -
>
> Key: SPARK-20589
> URL: https://issues.apache.org/jira/browse/SPARK-20589
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Thomas Graves
>
> It would be nice to have the ability to limit the number of concurrent tasks 
> per stage.  This is useful when your spark job might be accessing another 
> service and you don't want to DOS that service.  For instance Spark writing 
> to hbase or Spark doing http puts on a service.  Many times you want to do 
> this without limiting the number of partitions. 
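
The per-RDD idea in the comment above boils down to a simple rule; a tiny illustrative sketch (the caps and the Option encoding are hypothetical, not an existing Spark API):

{code}
// A stage's effective task-concurrency cap is the smallest cap declared on any
// RDD it contains; None means "no cap declared".
val perRddCaps: Seq[Option[Int]] = Seq(None, Some(16), Some(4))
val stageCap: Option[Int] = perRddCaps.flatten.reduceOption(_ min _)
assert(stageCap == Some(4))
{code}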



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20589) Allow limiting task concurrency per stage

2017-10-09 Thread Michael Park (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197402#comment-16197402
 ] 

Michael Park commented on SPARK-20589:
--

Pardon my ignorance of the inner workings of task scheduling, but is it not 
possible provide a way to mark the max-concurrency of a specific RDD? The max 
concurrency of a stage would the be the minimum max-concurrency of all RDDs 
within that stage.

Also, +1 for not being an obscure use-case. We are seeing a need for this any 
time we attempt include an external service as part of a generic pipeline. 
Ideally the bottleneck can be limited to a single stage, rather then an entire 
job. 

> Allow limiting task concurrency per stage
> -
>
> Key: SPARK-20589
> URL: https://issues.apache.org/jira/browse/SPARK-20589
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Thomas Graves
>
> It would be nice to have the ability to limit the number of concurrent tasks 
> per stage.  This is useful when your spark job might be accessing another 
> service and you don't want to DOS that service.  For instance Spark writing 
> to hbase or Spark doing http puts on a service.  Many times you want to do 
> this without limiting the number of partitions. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle

2017-10-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197303#comment-16197303
 ] 

Apache Spark commented on SPARK-22227:
--

User 'superbobry' has created a pull request for this issue:
https://github.com/apache/spark/pull/19458

> DiskBlockManager.getAllBlocks could fail if called during shuffle
> -
>
> Key: SPARK-22227
> URL: https://issues.apache.org/jira/browse/SPARK-22227
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.2.0
>Reporter: Sergei Lebedev
>Priority: Minor
>
> {{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the 
> block manager only contain files corresponding to "valid" block IDs, i.e. 
> those parsable via {{BlockId.apply}}. This is not always the case as 
> demonstrated by the following snippet
> {code}
> object GetAllBlocksFailure {
>   def main(args: Array[String]): Unit = {
> val sc = new SparkContext(new SparkConf()
> .setMaster("local[*]")
> .setAppName("demo"))
> new Thread {
>   override def run(): Unit = {
> while (true) {
>   
> println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
>   Thread.sleep(10)
> }
>   }
> }.start()
> val rdd = sc.range(1, 65536, numSlices = 10)
> .map(x => (x % 4096, x))
> .persist(StorageLevel.DISK_ONLY)
> .reduceByKey { _ + _ }
> .collect()
>   }
> }
> {code}
> We have a thread computing the number of bytes occupied by the block manager 
> on-disk and it frequently crashes due to this assumption being violated. 
> Relevant part of the stacktrace
> {code}
> 2017-10-06 11:20:14,287 ERROR  
> org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
> java.lang.IllegalStateException: Unrecognized BlockId: 
> shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
> at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at 
> org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
> {code}
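
One defensive fix, sketched below, is for getAllBlocks to simply skip file names that do not parse as a BlockId instead of propagating the exception. This is only an illustration of the idea, not necessarily what the linked pull request does:

{code}
import scala.util.Try
import org.apache.spark.storage.BlockId

// Keep only the names that BlockId.apply accepts; temporary files written during
// a shuffle (e.g. *.data.<uuid>) are silently ignored.
def parseableBlockIds(fileNames: Seq[String]): Seq[BlockId] =
  fileNames.flatMap(name => Try(BlockId(name)).toOption)
{code}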



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22227:


Assignee: (was: Apache Spark)

> DiskBlockManager.getAllBlocks could fail if called during shuffle
> -
>
> Key: SPARK-22227
> URL: https://issues.apache.org/jira/browse/SPARK-22227
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.2.0
>Reporter: Sergei Lebedev
>Priority: Minor
>
> {{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the 
> block manager only contain files corresponding to "valid" block IDs, i.e. 
> those parsable via {{BlockId.apply}}. This is not always the case as 
> demonstrated by the following snippet
> {code}
> object GetAllBlocksFailure {
>   def main(args: Array[String]): Unit = {
> val sc = new SparkContext(new SparkConf()
> .setMaster("local[*]")
> .setAppName("demo"))
> new Thread {
>   override def run(): Unit = {
> while (true) {
>   
> println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
>   Thread.sleep(10)
> }
>   }
> }.start()
> val rdd = sc.range(1, 65536, numSlices = 10)
> .map(x => (x % 4096, x))
> .persist(StorageLevel.DISK_ONLY)
> .reduceByKey { _ + _ }
> .collect()
>   }
> }
> {code}
> We have a thread computing the number of bytes occupied by the block manager 
> on-disk and it frequently crashes due to this assumption being violated. 
> Relevant part of the stacktrace
> {code}
> 2017-10-06 11:20:14,287 ERROR  
> org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
> java.lang.IllegalStateException: Unrecognized BlockId: 
> shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
> at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at 
> org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22227:


Assignee: Apache Spark

> DiskBlockManager.getAllBlocks could fail if called during shuffle
> -
>
> Key: SPARK-22227
> URL: https://issues.apache.org/jira/browse/SPARK-22227
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.2.0
>Reporter: Sergei Lebedev
>Assignee: Apache Spark
>Priority: Minor
>
> {{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the 
> block manager only contain files corresponding to "valid" block IDs, i.e. 
> those parsable via {{BlockId.apply}}. This is not always the case as 
> demonstrated by the following snippet
> {code}
> object GetAllBlocksFailure {
>   def main(args: Array[String]): Unit = {
> val sc = new SparkContext(new SparkConf()
> .setMaster("local[*]")
> .setAppName("demo"))
> new Thread {
>   override def run(): Unit = {
> while (true) {
>   
> println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
>   Thread.sleep(10)
> }
>   }
> }.start()
> val rdd = sc.range(1, 65536, numSlices = 10)
> .map(x => (x % 4096, x))
> .persist(StorageLevel.DISK_ONLY)
> .reduceByKey { _ + _ }
> .collect()
>   }
> }
> {code}
> We have a thread computing the number of bytes occupied by the block manager 
> on-disk and it frequently crashes due to this assumption being violated. 
> Relevant part of the stacktrace
> {code}
> 2017-10-06 11:20:14,287 ERROR  
> org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
> java.lang.IllegalStateException: Unrecognized BlockId: 
> shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
> at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at 
> org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) splitExpression can create too many method calls (generating a Constant Pool limit error)

2017-10-09 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197286#comment-16197286
 ] 

Marco Gaido commented on SPARK-22226:
-

Exactly [~kiszk], sorry for the bad initial title of the JIRA. Do you think I 
can/should reopen this JIRA and submit the PR then?

> splitExpression can create too many method calls (generating a Constant Pool 
> limit error)
> -
>
> Key: SPARK-22226
> URL: https://issues.apache.org/jira/browse/SPARK-22226
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because the Constant Pool 
> limit is reached.
> This can happen for many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} instances, but all these methods are called from the main class. Since 
> entries are added to the constant pool for each method invocation, this 
> limits the number of rows and, for very wide datasets, leads to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0xFFFF
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle

2017-10-09 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197275#comment-16197275
 ] 

Sergei Lebedev commented on SPARK-22227:


Sidenote: the trace above is caused by the temporary file created by 
[SortShuffleWriter|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala#L69].
 Sometimes we also saw failures containing {{TempShuffleBlockId}} names.

> DiskBlockManager.getAllBlocks could fail if called during shuffle
> -
>
> Key: SPARK-22227
> URL: https://issues.apache.org/jira/browse/SPARK-22227
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.2.0
>Reporter: Sergei Lebedev
>Priority: Minor
>
> {{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the 
> block manager only contain files corresponding to "valid" block IDs, i.e. 
> those parsable via {{BlockId.apply}}. This is not always the case as 
> demonstrated by the following snippet
> {code}
> object GetAllBlocksFailure {
>   def main(args: Array[String]): Unit = {
> val sc = new SparkContext(new SparkConf()
> .setMaster("local[*]")
> .setAppName("demo"))
> new Thread {
>   override def run(): Unit = {
> while (true) {
>   
> println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
>   Thread.sleep(10)
> }
>   }
> }.start()
> val rdd = sc.range(1, 65536, numSlices = 10)
> .map(x => (x % 4096, x))
> .persist(StorageLevel.DISK_ONLY)
> .reduceByKey { _ + _ }
> .collect()
>   }
> }
> {code}
> We have a thread computing the number of bytes occupied by the block manager 
> on-disk and it frequently crashes due to this assumption being violated. 
> Relevant part of the stacktrace
> {code}
> 2017-10-06 11:20:14,287 ERROR  
> org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
> java.lang.IllegalStateException: Unrecognized BlockId: 
> shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
> at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at 
> org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
> {code}
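
As a side note for anyone hitting this, a minimal sketch of a tolerant variant
(only an illustration of the idea, not the actual fix) is to skip names that do
not parse as block IDs, such as the temporary {{*.data.<uuid>}} files written
during a shuffle, instead of throwing:

{code}
import scala.util.Try

import org.apache.spark.storage.BlockId

// Keep only the file names that parse as block IDs; temporary shuffle files
// like "shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be" are dropped.
def parsableBlockIds(fileNames: Seq[String]): Seq[BlockId] =
  fileNames.flatMap(name => Try(BlockId(name)).toOption)
{code}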



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) splitExpression can create too many method calls (generating a Constant Pool limit error)

2017-10-09 Thread Kazuaki Ishizaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197272#comment-16197272
 ] 

Kazuaki Ishizaki commented on SPARK-6:
--

You are right. [This PR|https://github.com/apache/spark/pull/16648] will not 
solve the issue regarding a lot of split methods. I missed the discussion we 
had [here|https://github.com/apache/spark/pull/19447].

> splitExpression can create too many method calls (generating a Constant Pool 
> limit error)
> -
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle

2017-10-09 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-7:
--

 Summary: DiskBlockManager.getAllBlocks could fail if called during 
shuffle
 Key: SPARK-7
 URL: https://issues.apache.org/jira/browse/SPARK-7
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 2.2.0
Reporter: Sergei Lebedev
Priority: Minor


{{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the 
block manager only contain files corresponding to "valid" block IDs, i.e. 
those parsable via {{BlockId.apply}}. This is not always the case, as 
demonstrated by the following snippet:

{code}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.StorageLevel

object GetAllBlocksFailure {
  def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf()
.setMaster("local[*]")
.setAppName("demo"))

new Thread {
  override def run(): Unit = {
while (true) {
  
println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
  Thread.sleep(10)
}
  }
}.start()

val rdd = sc.range(1, 65536, numSlices = 10)
.map(x => (x % 4096, x))
.persist(StorageLevel.DISK_ONLY)
.reduceByKey { _ + _ }
.collect()
  }
}
{code}

We have a thread computing the number of bytes occupied by the block manager 
on-disk and it frequently crashes due to this assumption being violated. 
Relevant part of the stacktrace

{code}
2017-10-06 11:20:14,287 ERROR  
org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
java.lang.IllegalStateException: Unrecognized BlockId: 
shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
at 
org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
at 
org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22226) splitExpression can create too many method calls (generating a Constant Pool limit error)

2017-10-09 Thread Marco Gaido (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Gaido updated SPARK-6:

Summary: splitExpression can create too many method calls (generating a 
Constant Pool limit error)  (was: Code generation fails for dataframes with 
1 columns)

> splitExpression can create too many method calls (generating a Constant Pool 
> limit error)
> -
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197254#comment-16197254
 ] 

Marco Gaido commented on SPARK-6:
-

[~kiszk] I am not sure that the PR you mentioned solves the same issue; I tried 
it and currently it doesn't.
As you can see in [the branch I 
prepared|https://github.com/mgaido91/spark/tree/SPARK-6], what I am changing 
is different from what is done in that PR. That said, maybe that PR will also 
include a solution to this; of course I don't know yet what it is going to look 
like.
As [~srowen] pointed out, I chose a bad title for the JIRA. I am updating it 
with a better one.

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Kazuaki Ishizaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197164#comment-16197164
 ] 

Kazuaki Ishizaki commented on SPARK-6:
--

[This PR|https://github.com/apache/spark/pull/16648] addresses such an issue.

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197121#comment-16197121
 ] 

Sean Owen commented on SPARK-6:
---

If it's truly different, I'd try to edit the title/description to separate it 
from the issue of just having 1+ cols. I'd also check with the people 
working on the PR about your change, if possible. If it's really something else, 
reopen this one.

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197111#comment-16197111
 ] 

Marco Gaido commented on SPARK-6:
-

I am not sure what the current open PR is going to address: in its current 
state it doesn't solve the problem I am facing and would like to address with 
the PR I have prepared.
I think there are many issues around code generation, and many things in the 
current implementation limit scalability in the number of columns. Therefore I 
guess there are cases which need to be handled differently.
Anyway, since that PR is not yet ready, I am unable to state what it will and 
won't address.
The only thing I can say is that, as you can see from my branch 
(https://github.com/mgaido91/spark/commits/SPARK-6), I am doing something 
completely different from what is done in the open PR.

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-6:
--
Comment: was deleted

(was: Would the resolution to the linked issue not resolve this? because it's 
already pretty far along, I don't know if it's useful to solve specific cases 
differently.)

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197082#comment-16197082
 ] 

Sean Owen commented on SPARK-6:
---

Would the resolution to the linked issue not resolve this? Because it's already 
pretty far along, I don't know if it's useful to solve specific cases 
differently.

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197081#comment-16197081
 ] 

Sean Owen commented on SPARK-6:
---

Would the resolution to the linked issue not resolve this? Because it's already 
pretty far along, I don't know if it's useful to solve specific cases 
differently.

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197036#comment-16197036
 ] 

Marco Gaido commented on SPARK-6:
-

[~srowen] I know that there are many tickets for this, but I wanted to submit a 
PR to solve the specific issue I mentioned in the description. This won't 
completely solve the error above (for datasets with 20,000 columns, for 
instance), but it will allow supporting a larger number of columns than now. 
Thus I created this JIRA for that PR. If this is not the right approach, could 
you please tell me what I should do?
Thanks.

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-6.
---
Resolution: Duplicate

Duplicate of several

> Code generation fails for dataframes with 1 columns
> ---
>
> Key: SPARK-6
> URL: https://issues.apache.org/jira/browse/SPARK-6
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Marco Gaido
>
> Code generation for very wide datasets can fail because of the Constant Pool 
> limit reached.
> This can be caused by many reasons. One of them is that we are currently 
> splitting the definition of the generated methods among several 
> {{NestedClass}} but all these methods are called in the main class. Since we 
> have entries added to the constant pool for each method invocation, this is 
> limiting the number of rows and is leading for very wide dataset to:
> {noformat}
> org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
>  has grown past JVM limit of 0x
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22226) Code generation fails for dataframes with 10000 columns

2017-10-09 Thread Marco Gaido (JIRA)
Marco Gaido created SPARK-6:
---

 Summary: Code generation fails for dataframes with 1 columns
 Key: SPARK-6
 URL: https://issues.apache.org/jira/browse/SPARK-6
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Marco Gaido


Code generation for very wide datasets can fail because of the Constant Pool 
limit reached.

This can be caused by many reasons. One of them is that we are currently 
splitting the definition of the generated methods among several {{NestedClass}} 
but all these methods are called in the main class. Since we have entries added 
to the constant pool for each method invocation, this is limiting the number of 
rows and is leading for very wide dataset to:

{noformat}
org.codehaus.janino.JaninoRuntimeException: Constant pool for class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection
 has grown past JVM limit of 0x
{noformat}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22225) wholeTextFilesIterators

2017-10-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196952#comment-16196952
 ] 

Hyukjin Kwon commented on SPARK-5:
--

Couldn't we do this via {{sc.binaryFiles}} or 
{{spark.read.format("text").load("...").selectExpr("value", "input_file_name()")}}?

For the latter, there is a JIRA for the delimiter support - SPARK-21289.
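
A rough sketch of that second approach (the input path is a placeholder and the
session setup is an assumption, not a tested recipe):

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("per-file-lines")
  .getOrCreate()

// Line-per-row reading that keeps the originating path; grouping by "path"
// recovers a per-file view without materialising each file as a single record.
val lines = spark.read.format("text")
  .load("/path/to/dir")  // placeholder path
  .selectExpr("input_file_name() AS path", "value")

lines.show(5, truncate = false)
{code}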

> wholeTextFilesIterators
> ---
>
> Key: SPARK-5
> URL: https://issues.apache.org/jira/browse/SPARK-5
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: sam
>
> It is a very common use case to want to preserve a path -> file mapping in an 
> RDD, or read an entire file in one go.  Especially for unstructured data and 
> ETL.
> Currently wholeTextFiles is the go-to method for this, but it reads the entire 
> file into memory, which is sometimes an issue (see SPARK-18965). It also 
> precludes the option to lazily process files.
> It would be nice to have a method with the following signature:
> {code}
> def wholeTextFilesIterators(
> path: String,
> minPartitions: Int = defaultMinPartitions,
> delimiter: String = "\n"): RDD[(String, Iterator[String])]
> {code}
> Where each `Iterator[String]` is a lazy file iterator where each string is 
> delimited by the `delimiter` field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18965) wholeTextFiles() is not able to read large files

2017-10-09 Thread sam (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196872#comment-16196872
 ] 

sam commented on SPARK-18965:
-

[~pradeep_misra] [~srowen].  Yes it's a new feature.  What we need is this: 
https://issues.apache.org/jira/browse/SPARK-5

> wholeTextFiles() is not able to read large files
> 
>
> Key: SPARK-18965
> URL: https://issues.apache.org/jira/browse/SPARK-18965
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.2
> Environment: All Platforms
>Reporter: Pradeep Misra
>  Labels: ReadFile
>   Original Estimate: 1,344h
>  Remaining Estimate: 1,344h
>
> While working on wholeTextFiles() of size  134738099 (gz compressed) spark 
> throws an OOM error.
> ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:211)
> at 
> org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
> at 
> org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:81)
> at 
> org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:65)
> at 
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:168)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/30 14:25:36 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Executor task launch worker-0,5,main]
> java.lang.OutOfMemoryError
> at 
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at 
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:211)
> at 
> org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
> at 
> org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:81)
> at 
> org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:65)
> at 
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:168)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1164)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at 

[jira] [Created] (SPARK-22225) wholeTextFilesIterators

2017-10-09 Thread sam (JIRA)
sam created SPARK-5:
---

 Summary: wholeTextFilesIterators
 Key: SPARK-5
 URL: https://issues.apache.org/jira/browse/SPARK-5
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: sam


It is a very common use case to want to preserve a path -> file mapping in an 
RDD, or read an entire file in one go.  Especially for unstructured data and 
ETL.

Currently wholeTextFiles is the go-to method for this, but it reads the entire 
file into memory, which is sometimes an issue (see SPARK-18965). It also 
precludes the option to lazily process files.

It would be nice to have a method with the following signature:

{code}
def wholeTextFilesIterators(
path: String,
minPartitions: Int = defaultMinPartitions,
delimiter: String = "\n"): RDD[(String, Iterator[String])]
{code}

Where each `Iterator[String]` is a lazy file iterator where each string is 
delimited by the `delimiter` field.






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18170) Confusing error message when using rangeBetween without specifying an "orderBy"

2017-10-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196839#comment-16196839
 ] 

Hyukjin Kwon commented on SPARK-18170:
--

{code}
org.apache.spark.sql.AnalysisException: cannot resolve '(RANGE BETWEEN CURRENT 
ROW AND 1L FOLLOWING)' due to data type mismatch: A range window frame cannot 
be used in an unordered window specification.;;
'Project [sum(id#0L) windowspecdefinition(specifiedwindowframe(RangeFrame, 
currentrow$(), 1)) AS sum(id) OVER (RANGE BETWEEN CURRENT ROW AND 1 
FOLLOWING)#4]
+- Range (1, 3, step=1, splits=Some(8))
{code}

Looks like it throws a different message now in the current master.

> Confusing error message when using rangeBetween without specifying an 
> "orderBy"
> ---
>
> Key: SPARK-18170
> URL: https://issues.apache.org/jira/browse/SPARK-18170
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Weiluo Ren
>Priority: Minor
>
> {code}
> spark.range(1,3).select(sum('id) over Window.rangeBetween(0,1)).show
> {code}
> throws runtime exception:
> {code}
> Non-Zero range offsets are not supported for windows with multiple order 
> expressions.
> {code}
> which is confusing in this case because we don't have any order expression 
> here.
> How about add a check on
> {code}
> orderSpec.isEmpty
> {code}
> at 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala#L141
> and throw an exception saying "no order expressions is specified"?
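
For reference, a small sketch of the working form of the snippet above, with an
explicit ordering so the range frame is well defined (session setup assumed):

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("range-window-sketch")
  .getOrCreate()

// With an orderBy on the window, rangeBetween(0, 1) is accepted.
val w = Window.orderBy(col("id")).rangeBetween(0, 1)
spark.range(1, 3).select(sum(col("id")).over(w)).show()
{code}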



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18233) Failed to deserialize the task

2017-10-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196833#comment-16196833
 ] 

Hyukjin Kwon commented on SPARK-18233:
--

Hi [~davies], do you remember how to reproduce this, or do you have more 
details about the symptoms? I would like to look into this when I have some time.

> Failed to deserialize the task
> --
>
> Key: SPARK-18233
> URL: https://issues.apache.org/jira/browse/SPARK-18233
> Project: Spark
>  Issue Type: Bug
>Reporter: Davies Liu
>
> {code}
> 16/11/02 18:36:32 ERROR Executor: Exception in task 652.0 in stage 27.0 (TID 
> 21101)
> java.io.InvalidClassException: org.apache.spark.executor.TaskMet; 
> serializable and externalizable flags conflict
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:698)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:831)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1602)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17952) SparkSession createDataFrame method throws exception for nested JavaBeans

2017-10-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-17952:
-
Affects Version/s: 2.3.0

> SparkSession createDataFrame method throws exception for nested JavaBeans
> -
>
> Key: SPARK-17952
> URL: https://issues.apache.org/jira/browse/SPARK-17952
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0, 2.0.1, 2.3.0
>Reporter: Amit Baghel
>
> As per latest spark documentation for Java at 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection,
>  
> {quote}
> Nested JavaBeans and List or Array fields are supported though.
> {quote}
> However nested JavaBean is not working. Please see the below code.
> SubCategory class
> {code}
> public class SubCategory implements Serializable{
>   private String id;
>   private String name;
>   
>   public String getId() {
>   return id;
>   }
>   public void setId(String id) {
>   this.id = id;
>   }
>   public String getName() {
>   return name;
>   }
>   public void setName(String name) {
>   this.name = name;
>   }   
> }
> {code}
> Category class
> {code}
> public class Category implements Serializable{
>   private String id;
>   private SubCategory subCategory;
>   
>   public String getId() {
>   return id;
>   }
>   public void setId(String id) {
>   this.id = id;
>   }
>   public SubCategory getSubCategory() {
>   return subCategory;
>   }
>   public void setSubCategory(SubCategory subCategory) {
>   this.subCategory = subCategory;
>   }
> }
> {code}
> SparkSample class
> {code}
> public class SparkSample {
>   public static void main(String[] args) throws IOException { 
> 
>   SparkSession spark = SparkSession
>   .builder()
>   .appName("SparkSample")
>   .master("local")
>   .getOrCreate();
>   //SubCategory
>   SubCategory sub = new SubCategory();
>   sub.setId("sc-111");
>   sub.setName("Sub-1");
>   //Category
>   Category category = new Category();
>   category.setId("s-111");
>   category.setSubCategory(sub);
>   //categoryList
>   List categoryList = new ArrayList();
>   categoryList.add(category);
>//DF
>   Dataset dframe = spark.createDataFrame(categoryList, 
> Category.class);  
>   dframe.show();  
>   }
> }
> {code}
> Above code throws below error.
> {code}
> Exception in thread "main" scala.MatchError: com.sample.SubCategory@e7391d 
> (of class com.sample.SubCategory)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1106)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1106)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1104)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$class.toStream(Iterator.scala:1322)
>   at scala.collection.AbstractIterator.toStream(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
>   at scala.collection.AbstractIterator.toSeq(Iterator.scala:1336)
>

[jira] [Commented] (SPARK-17952) SparkSession createDataFrame method throws exception for nested JavaBeans

2017-10-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196814#comment-16196814
 ] 

Hyukjin Kwon commented on SPARK-17952:
--

This still happens in the master.
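
Until that is fixed, one possible workaround sketch is to go through the bean
encoder instead of {{createDataFrame(list, Category.class)}}. This is written in
Scala for brevity, assumes the Category/SubCategory classes from the report are
on the classpath, and whether it covers every nested-bean shape may depend on
the Spark version:

{code}
import java.util.Arrays

import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nested-bean-workaround")
  .getOrCreate()

val sub = new SubCategory(); sub.setId("sc-111"); sub.setName("Sub-1")
val category = new Category(); category.setId("s-111"); category.setSubCategory(sub)

// Encoders.bean infers a nested struct for SubCategory, so createDataset works
// where createDataFrame(categoryList, Category.class) hits the MatchError.
val ds = spark.createDataset(Arrays.asList(category))(Encoders.bean(classOf[Category]))
ds.show()
{code}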

> SparkSession createDataFrame method throws exception for nested JavaBeans
> -
>
> Key: SPARK-17952
> URL: https://issues.apache.org/jira/browse/SPARK-17952
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Amit Baghel
>
> As per latest spark documentation for Java at 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection,
>  
> {quote}
> Nested JavaBeans and List or Array fields are supported though.
> {quote}
> However nested JavaBean is not working. Please see the below code.
> SubCategory class
> {code}
> public class SubCategory implements Serializable{
>   private String id;
>   private String name;
>   
>   public String getId() {
>   return id;
>   }
>   public void setId(String id) {
>   this.id = id;
>   }
>   public String getName() {
>   return name;
>   }
>   public void setName(String name) {
>   this.name = name;
>   }   
> }
> {code}
> Category class
> {code}
> public class Category implements Serializable{
>   private String id;
>   private SubCategory subCategory;
>   
>   public String getId() {
>   return id;
>   }
>   public void setId(String id) {
>   this.id = id;
>   }
>   public SubCategory getSubCategory() {
>   return subCategory;
>   }
>   public void setSubCategory(SubCategory subCategory) {
>   this.subCategory = subCategory;
>   }
> }
> {code}
> SparkSample class
> {code}
> public class SparkSample {
>   public static void main(String[] args) throws IOException { 
> 
>   SparkSession spark = SparkSession
>   .builder()
>   .appName("SparkSample")
>   .master("local")
>   .getOrCreate();
>   //SubCategory
>   SubCategory sub = new SubCategory();
>   sub.setId("sc-111");
>   sub.setName("Sub-1");
>   //Category
>   Category category = new Category();
>   category.setId("s-111");
>   category.setSubCategory(sub);
>   //categoryList
>   List categoryList = new ArrayList();
>   categoryList.add(category);
>//DF
>   Dataset dframe = spark.createDataFrame(categoryList, 
> Category.class);  
>   dframe.show();  
>   }
> }
> {code}
> Above code throws below error.
> {code}
> Exception in thread "main" scala.MatchError: com.sample.SubCategory@e7391d 
> (of class com.sample.SubCategory)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1106)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1106)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
>   at 
> org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1104)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$class.toStream(Iterator.scala:1322)
>   at scala.collection.AbstractIterator.toStream(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
>   at 

[jira] [Resolved] (SPARK-17275) Flaky test: org.apache.spark.deploy.RPackageUtilsSuite.jars that don't exist are skipped and print warning

2017-10-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-17275.
--
Resolution: Not A Problem

Let me resolve this for now, but I will keep an eye on the builds and will 
reopen this and submit a PR to ignore the deletion failure if it is observed 
even once in the future.

> Flaky test: org.apache.spark.deploy.RPackageUtilsSuite.jars that don't exist 
> are skipped and print warning
> --
>
> Key: SPARK-17275
> URL: https://issues.apache.org/jira/browse/SPARK-17275
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Yin Huai
>
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.4/1623/testReport/junit/org.apache.spark.deploy/RPackageUtilsSuite/jars_that_don_t_exist_are_skipped_and_print_warning/
> {code}
> Error Message
> java.io.IOException: Unable to delete directory 
> /home/jenkins/.ivy2/cache/a/mylib.
> Stacktrace
> sbt.ForkMain$ForkError: java.io.IOException: Unable to delete directory 
> /home/jenkins/.ivy2/cache/a/mylib.
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1541)
>   at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
>   at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
>   at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
>   at 
> org.apache.spark.deploy.IvyTestUtils$.purgeLocalIvyCache(IvyTestUtils.scala:394)
>   at 
> org.apache.spark.deploy.IvyTestUtils$.withRepository(IvyTestUtils.scala:384)
>   at 
> org.apache.spark.deploy.RPackageUtilsSuite$$anonfun$3.apply$mcV$sp(RPackageUtilsSuite.scala:103)
>   at 
> org.apache.spark.deploy.RPackageUtilsSuite$$anonfun$3.apply(RPackageUtilsSuite.scala:100)
>   at 
> org.apache.spark.deploy.RPackageUtilsSuite$$anonfun$3.apply(RPackageUtilsSuite.scala:100)
>   at 
> org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:57)
>   at 
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
>   at 
> org.apache.spark.deploy.RPackageUtilsSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(RPackageUtilsSuite.scala:38)
>   at 
> org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
>   at 
> org.apache.spark.deploy.RPackageUtilsSuite.runTest(RPackageUtilsSuite.scala:38)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
>   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>   at 
> org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
>   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
>   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
>   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
>   at org.scalatest.Suite$class.run(Suite.scala:1424)
>   at 
> org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>   at 
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
>   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
>   at 
> org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:29)
>   at 
> org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
>   at 
> org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
>   at 

[jira] [Resolved] (SPARK-17890) scala.ScalaReflectionException

2017-10-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-17890.
--
Resolution: Cannot Reproduce

I can't reproduce this with either {{spark-submit}} or {{spark-shell}}.

For {{spark-submit}}, I followed 
https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
 and used the reproducer here:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


object Main extends App{

  val conf = new SparkConf()
  conf.setMaster("local")
  val session = SparkSession.builder()
.config(conf)
.getOrCreate()

  import session.implicits._

  val df = session.sparkContext.parallelize(List(1,2,3)).toDF

  println("flatmapping ...")
  df.flatMap(_ => Seq.empty[Foo])

  println("mapping...")
  df.map(_ => Seq.empty[Foo]) //spark-submit fails here. Things work if I 
remove the toDF call

}

case class Foo(value:String)
{code}

> scala.ScalaReflectionException
> --
>
> Key: SPARK-17890
> URL: https://issues.apache.org/jira/browse/SPARK-17890
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1
> Environment: x86_64 GNU/Linux
> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
>Reporter: Khalid Reid
>Priority: Minor
>  Labels: newbie
>
> Hello,
> I am seeing an error message in spark-shell when I map a DataFrame to a 
> Seq\[Foo\].  However, things work fine when I use flatMap.  
> {noformat}
> scala> case class Foo(value:String)
> defined class Foo
> scala> val df = sc.parallelize(List(1,2,3)).toDF
> df: org.apache.spark.sql.DataFrame = [value: int]
> scala> df.map{x => Seq.empty[Foo]}
> scala.ScalaReflectionException: object $line14.$read not found.
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
>   at $typecreator1$1.apply(:29)
>   at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>   at 
> org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
>   at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>   at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
>   at 
> org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
>   ... 48 elided
> scala> df.flatMap{_ => Seq.empty[Foo]} //flatMap works
> res2: org.apache.spark.sql.Dataset[Foo] = [value: string]
> {noformat}
> I am seeing the same error reported 
> [here|https://issues.apache.org/jira/browse/SPARK-8465?jql=text%20~%20%22scala.ScalaReflectionException%22]
>  when I use spark-submit.
> I am new to Spark but I don't expect this to throw an exception.
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22192) An RDD of nested POJO objects cannot be converted into a DataFrame using SQLContext.createDataFrame API

2017-10-09 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-22192:
--
Target Version/s:   (was: 2.2.0)

> An RDD of nested POJO objects cannot be converted into a DataFrame using 
> SQLContext.createDataFrame API
> ---
>
> Key: SPARK-22192
> URL: https://issues.apache.org/jira/browse/SPARK-22192
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: Independent of OS / platform
>Reporter: Asif Hussain Shahid
>Priority: Minor
>
> If an RDD contains nested POJO objects, then SQLContext.createDataFrame(RDD, 
> Class) api only handles the top level POJO object. It throws ScalaMatchError 
> exception when handling the nested POJO object as the code does not 
> recursively handle the nested POJOs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22222) Fix the ARRAY_MAX in BufferHolder and add a test

2017-10-09 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-2:
--
Affects Version/s: (was: 2.2.1)
   2.3.0

> Fix the ARRAY_MAX in BufferHolder and add a test
> 
>
> Key: SPARK-2
> URL: https://issues.apache.org/jira/browse/SPARK-2
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Feng Liu
>
> This is actually a followup for SPARK-22033, which set the `ARRAY_MAX` to 
> `Int.MaxValue - 8`. It is not a valid number because it will cause the 
> following line fail when such a large byte array is allocated: 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java#L86
>  We need to make sure the new length is a multiple of 8.
> We need to add one test for the fix. Note that the test should work 
> independently with the heap size of the test JVM. 
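
A small Scala sketch of the alignment arithmetic described above. The headroom
constant is an assumption for illustration only, not necessarily the value the
eventual fix will use:

{code}
// Largest capacity that stays under the practical JVM array-size limit and is
// a multiple of 8, so word-aligned writes into the buffer stay in bounds.
val jvmHeadroom = 15  // assumed margin for array object headers
val maxArrayBytes = Int.MaxValue - jvmHeadroom
val arrayMax: Int = maxArrayBytes - (maxArrayBytes % 8)

// Growing a buffer towards arrayMax: double the required size but never exceed
// the aligned maximum.
def newLength(current: Int, neededExtra: Int): Int = {
  val target = current.toLong + neededExtra
  require(target <= arrayMax, s"cannot grow buffer past $arrayMax bytes")
  math.min(target * 2, arrayMax.toLong).toInt
}
{code}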



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17877) Can not checkpoint connectedComponents resulting graph

2017-10-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-17877:
-
Affects Version/s: 2.3.0

> Can not checkpoint connectedComponents resulting graph
> --
>
> Key: SPARK-17877
> URL: https://issues.apache.org/jira/browse/SPARK-17877
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.5.2, 1.6.2, 2.0.1, 2.3.0
>Reporter: Alexander Pivovarov
>Priority: Minor
>
> The following code demonstrates the issue
> {code}
> import org.apache.spark.graphx._
> val users = sc.parallelize(List(3L -> "lucas", 7L -> "john", 5L -> "matt", 2L 
> -> "kelly"))
> val rel = sc.parallelize(List(Edge(3L, 7L, "collab"), Edge(5L, 3L, 
> "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
> sc.setCheckpointDir("/tmp/check")
> val g = Graph(users, rel)
> g.checkpoint   // /tmp/check/b1f46ba5-357a-4d6d-8f4d-411b64b27c2f appears
> val gg = g.connectedComponents
> gg.checkpoint
> gg.vertices.collect
> gg.edges.collect
> gg.isCheckpointed  // res5: Boolean = false,   /tmp/check still contains only 
> 1 folder b1f46ba5-357a-4d6d-8f4d-411b64b27c2f
> {code}
> I think the last line should return true instead of false



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17877) Can not checkpoint connectedComponents resulting graph

2017-10-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196748#comment-16196748
 ] 

Hyukjin Kwon commented on SPARK-17877:
--

I tested this and the last line returned {{false}}.
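
For anyone debugging this, a small diagnostic sketch (it assumes the {{gg}}
graph from the snippet below) is to compare the graph-level flag with the
underlying vertex and edge RDDs, which is where the checkpoint state actually
lives:

{code}
// The graph-level flag versus the state of the underlying RDDs.
println(s"graph isCheckpointed:    ${gg.isCheckpointed}")
println(s"vertices isCheckpointed: ${gg.vertices.isCheckpointed}")
println(s"edges isCheckpointed:    ${gg.edges.isCheckpointed}")
{code}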

> Can not checkpoint connectedComponents resulting graph
> --
>
> Key: SPARK-17877
> URL: https://issues.apache.org/jira/browse/SPARK-17877
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 1.5.2, 1.6.2, 2.0.1, 2.3.0
>Reporter: Alexander Pivovarov
>Priority: Minor
>
> The following code demonstrates the issue
> {code}
> import org.apache.spark.graphx._
> val users = sc.parallelize(List(3L -> "lucas", 7L -> "john", 5L -> "matt", 2L -> "kelly"))
> val rel = sc.parallelize(List(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
> sc.setCheckpointDir("/tmp/check")
> val g = Graph(users, rel)
> g.checkpoint   // /tmp/check/b1f46ba5-357a-4d6d-8f4d-411b64b27c2f appears
> val gg = g.connectedComponents
> gg.checkpoint
> gg.vertices.collect
> gg.edges.collect
> gg.isCheckpointed  // res5: Boolean = false, /tmp/check still contains only 1 folder: b1f46ba5-357a-4d6d-8f4d-411b64b27c2f
> {code}
> I think the last line should return true instead of false



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17820) Spark sqlContext.sql() performs only first insert for HiveQL "FROM target INSERT INTO dest" command to insert into multiple target tables from same source

2017-10-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196735#comment-16196735
 ] 

Hyukjin Kwon commented on SPARK-17820:
--

Hi [~kmbeyond], would you be able to try this against a newer version?

> Spark sqlContext.sql() performs only first insert for HiveQL "FROM target 
> INSERT INTO dest" command to insert into multiple target tables from same 
> source
> --
>
> Key: SPARK-17820
> URL: https://issues.apache.org/jira/browse/SPARK-17820
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
> Environment: Cloudera Quickstart VM 5.7
>Reporter: Kiran Miryala
>
> I am executing a HiveQL statement in spark-shell, intending to insert records 
> into 2 destination tables from the same source table using a single statement. 
> But it inserts into only the first destination table. My statement:
> {noformat}
> scala>val departmentsData = sqlContext.sql("from sqoop_import.departments 
> insert into sqoop_import.names_count1 select department_name, count(1) where 
> department_id=2 group by department_name insert into 
> sqoop_import.names_count2 select department_name, count(1) where 
> department_id=4 group by department_name")
> {noformat}
> Same query inserts into both destination tables on hive shell:
> {noformat}
> from sqoop_import.departments 
> insert into sqoop_import.names_count1 
> select department_name, count(1) 
> where department_id=2 group by department_name 
> insert into sqoop_import.names_count2 
> select department_name, count(1) 
> where department_id=4 group by department_name;
> {noformat}
> Both target table definitions are:
> {noformat}
> hive>use sqoop_import;
> hive> create table names_count1 (department_name String, count Int);
> hive> create table names_count2 (department_name String, count Int);
> {noformat}
> Not sure why it is skipping the second insert. A workaround sketch follows below.
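
A hedged workaround sketch for the statement above, assuming the same {{sqoop_import}} tables: split the multi-insert into two single-insert statements so each insert is planned on its own. This is a workaround, not an explanation of the dropped branch:

{noformat}
scala> sqlContext.sql("""
  insert into table sqoop_import.names_count1
  select department_name, count(1) from sqoop_import.departments
  where department_id = 2 group by department_name""")

scala> sqlContext.sql("""
  insert into table sqoop_import.names_count2
  select department_name, count(1) from sqoop_import.departments
  where department_id = 4 group by department_name""")
{noformat}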



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13030) Change OneHotEncoder to Estimator

2017-10-09 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196717#comment-16196717
 ] 

Weichen Xu commented on SPARK-13030:


[~bago.amirbekian]
Multi-column means generating separate output columns from multiple input columns: N 
input columns produce N output columns. If we need to join the resulting vectors, we can 
pipeline a `VectorAssembler` afterwards, I think (see the sketch below).
And +1 for the same `handleInvalid` semantics as StringIndexer.
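
A rough sketch of that shape, purely to illustrate the discussion; the estimator name `OneHotEncoderEstimator` and its multi-column setters below are assumptions about the proposed API, not a released one:

{code}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler

// Hypothetical multi-column estimator: N input columns -> N output columns.
val encoder = new OneHotEncoderEstimator()            // assumed name/API for this proposal
  .setInputCols(Array("colA_idx", "colB_idx"))
  .setOutputCols(Array("colA_vec", "colB_vec"))
  .setHandleInvalid("keep")                           // same semantics as StringIndexer

// Joining the per-column vectors is delegated to a downstream VectorAssembler.
val assembler = new VectorAssembler()
  .setInputCols(Array("colA_vec", "colB_vec"))
  .setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(encoder, assembler))
{code}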

> Change OneHotEncoder to Estimator
> -
>
> Key: SPARK-13030
> URL: https://issues.apache.org/jira/browse/SPARK-13030
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 1.6.0
>Reporter: Wojciech Jurczyk
>
> OneHotEncoder should be an Estimator, just like in scikit-learn 
> (http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html).
> In its current form, it is impossible to use when number of categories is 
> different between training dataset and test dataset.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22224) Override toString of KeyValueGroupedDataset & RelationalGroupedDataset

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22224:


Assignee: Apache Spark

> Override toString of KeyValueGroupedDataset & RelationalGroupedDataset 
> ---
>
> Key: SPARK-22224
> URL: https://issues.apache.org/jira/browse/SPARK-22224
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kent Yao
>Assignee: Apache Spark
>Priority: Minor
>
> scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
> words: org.apache.spark.sql.Dataset[String] = [value: string]
> scala> val grouped = words.groupByKey(identity)
> grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = 
> org.apache.spark.sql.KeyValueGroupedDataset@65214862



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22224) Override toString of KeyValueGroupedDataset & RelationalGroupedDataset

2017-10-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22224:


Assignee: (was: Apache Spark)

> Override toString of KeyValueGroupedDataset & RelationalGroupedDataset 
> ---
>
> Key: SPARK-22224
> URL: https://issues.apache.org/jira/browse/SPARK-22224
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kent Yao
>Priority: Minor
>
> scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
> words: org.apache.spark.sql.Dataset[String] = [value: string]
> scala> val grouped = words.groupByKey(identity)
> grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = 
> org.apache.spark.sql.KeyValueGroupedDataset@65214862



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22224) Override toString of KeyValueGroupedDataset & RelationalGroupedDataset

2017-10-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196696#comment-16196696
 ] 

Apache Spark commented on SPARK-22224:
--

User 'yaooqinn' has created a pull request for this issue:
https://github.com/apache/spark/pull/19363

> Override toString of KeyValueGroupedDataset & RelationalGroupedDataset 
> ---
>
> Key: SPARK-22224
> URL: https://issues.apache.org/jira/browse/SPARK-22224
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kent Yao
>Priority: Minor
>
> scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
> words: org.apache.spark.sql.Dataset[String] = [value: string]
> scala> val grouped = words.groupByKey(identity)
> grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = 
> org.apache.spark.sql.KeyValueGroupedDataset@65214862



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22224) Override toString of KeyValueGroupedDataset & RelationalGroupedDataset

2017-10-09 Thread Kent Yao (JIRA)
Kent Yao created SPARK-22224:


 Summary: Override toString of KeyValueGroupedDataset & 
RelationalGroupedDataset 
 Key: SPARK-22224
 URL: https://issues.apache.org/jira/browse/SPARK-22224
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Kent Yao
Priority: Minor



scala> val words = spark.read.textFile("README.md").flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val grouped = words.groupByKey(identity)
grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = 
org.apache.spark.sql.KeyValueGroupedDataset@65214862
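
For illustration only, a plain-Scala stand-in (not the change in the PR) showing the kind of schema-describing toString the REPL output above lacks; the format and field choices are assumptions:

{code}
// Stand-in sketch: describe the key/value schemas instead of ClassName@hash.
final case class GroupedDatasetInfo(keySchema: Seq[(String, String)],
                                    valueSchema: Seq[(String, String)]) {
  override def toString: String = {
    def fmt(fields: Seq[(String, String)]) =
      fields.map { case (name, tpe) => s"$name: $tpe" }.mkString(", ")
    s"KeyValueGroupedDataset: [key: [${fmt(keySchema)}], value: [${fmt(valueSchema)}]]"
  }
}

// For the words example above this would print:
// KeyValueGroupedDataset: [key: [value: string], value: [value: string]]
println(GroupedDatasetInfo(Seq("value" -> "string"), Seq("value" -> "string")))
{code}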




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10802) Let ALS recommend for subset of data

2017-10-09 Thread Nick Pentreath (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16196673#comment-16196673
 ] 

Nick Pentreath commented on SPARK-10802:


SPARK-20679 has been completed for the new ML API. I've closed this as we won't 
be doing it in the RDD API as mentioned above.
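
For reference, a hedged usage sketch of the ML-API route (method name per SPARK-20679; assumes a spark-shell style SparkSession named {{spark}} and a Spark version that includes that change):

{code}
import org.apache.spark.ml.recommendation.ALS
import spark.implicits._

// Tiny illustrative ratings DataFrame.
val ratings = Seq((0, 10, 4.0f), (0, 11, 3.0f), (1, 10, 5.0f), (1, 12, 1.0f))
  .toDF("userId", "movieId", "rating")

val model = new ALS()
  .setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
  .fit(ratings)

// Recommend only for a subset of users, instead of recommendForAllUsers.
val someUsers = ratings.select("userId").distinct().limit(1)
model.recommendForUserSubset(someUsers, 2).show(false)
{code}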

> Let ALS recommend for subset of data
> 
>
> Key: SPARK-10802
> URL: https://issues.apache.org/jira/browse/SPARK-10802
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib
>Affects Versions: 1.5.0
>Reporter: Tomasz Bartczak
>Priority: Minor
>
> Currently MatrixFactorizationModel allows getting recommendations for:
> - a single user
> - a single product
> - all users
> - all products
> Recommendations for all users/products do a cartesian join internally.
> It would be useful in some cases to get recommendations for a subset of 
> users/products by providing an RDD with which MatrixFactorizationModel could 
> do an intersection before the cartesian join. This would make it much 
> faster in situations where recommendations are needed only for a subset of 
> users/products, yet the subset is still too large to make it feasible to 
> recommend one-by-one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


