[jira] [Commented] (SPARK-21551) pyspark's collect fails when getaddrinfo is too slow

2017-10-17 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207220#comment-16207220
 ] 

Frank Rosner commented on SPARK-21551:
--

Do you guys mind if I also backport this to 2.0.x, 2.1.x, and 2.2.x? We have 
some jobs that we don't want to upgrade to 2.3.0 but that fail regularly 
because of this problem.

Which branches would that have to go to? branch-2.0, branch-2.1, and branch-2.2?

> pyspark's collect fails when getaddrinfo is too slow
> 
>
> Key: SPARK-21551
> URL: https://issues.apache.org/jira/browse/SPARK-21551
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.0
>Reporter: peay
>Assignee: peay
>Priority: Critical
> Fix For: 2.3.0
>
>
> Pyspark's {{RDD.collect}}, as well as {{DataFrame.toLocalIterator}} and 
> {{DataFrame.collect}} all work by starting an ephemeral server in the driver, 
> and having Python connect to it to download the data.
> All three are implemented along the lines of:
> {code}
> port = self._jdf.collectToPython()
> return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
> {code}
> The server has **a hardcoded timeout of 3 seconds** 
> (https://github.com/apache/spark/blob/e26dac5feb02033f980b1e69c9b0ff50869b6f9e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L695)
>  -- i.e., the Python process has 3 seconds to connect to it from the very 
> moment the driver server starts.
> In general, that seems fine, but I have been encountering frequent timeouts 
> leading to `Exception: could not open socket`.
> After investigating a bit, it turns out that {{_load_from_socket}} makes a 
> call to {{getaddrinfo}}:
> {code}
> def _load_from_socket(port, serializer):
>     sock = None
>     # Support for both IPv4 and IPv6.
>     # On most of IPv6-ready systems, IPv6 will take precedence.
>     for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC,
>                                   socket.SOCK_STREAM):
>         .. connect ..
> {code}
> I am not sure why, but while most such calls to {{getaddrinfo}} on my machine 
> only take a couple milliseconds, about 10% of them take between 2 and 10 
> seconds, leading to about 10% of jobs failing. I don't think we can always 
> expect {{getaddrinfo}} to return instantly. More generally, Python may 
> sometimes pause for a couple seconds, which may not leave enough time for the 
> process to connect to the server.
> Especially since the server timeout is hardcoded, I think it would be best to 
> set a rather generous value (15 seconds?) to avoid such situations.
> A {{getaddrinfo}}-specific fix could avoid doing the lookup every single 
> time, or do it before starting up the driver server.
>  
> cc SPARK-677 [~davies]
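
As a rough illustration of the {{getaddrinfo}}-specific idea in the description above, the lookup can be done once, timed, and its result reused for the connection, so that a slow lookup does not eat into the server's accept window. This is only a sketch under that assumption, not PySpark's code: {{resolve_localhost}} and {{connect_with_resolved}} are hypothetical names, and the timeout value is whatever the caller chooses.

```python
import socket
import time


def resolve_localhost(port):
    """Resolve "localhost" once and report how long the lookup took (seconds)."""
    start = time.monotonic()
    results = socket.getaddrinfo("localhost", port, socket.AF_UNSPEC,
                                 socket.SOCK_STREAM)
    elapsed = time.monotonic() - start
    return results, elapsed


def connect_with_resolved(results, timeout):
    """Try each pre-resolved address in turn and return the first connected socket."""
    last_error = None
    for af, socktype, proto, _canonname, sockaddr in results:
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(timeout)
            sock.connect(sockaddr)
            return sock
        except OSError as exc:
            # Remember the failure and fall through to the next address.
            last_error = exc
            if sock is not None:
                sock.close()
    raise Exception("could not open socket") from last_error
```

Calling {{resolve_localhost}} before the driver-side server is started, and logging the elapsed time, would also show how often the lookup itself is the slow step.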



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18649) sc.textFile(my_file).collect() raises socket.timeout on large files

2017-10-17 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16207217#comment-16207217
 ] 

Frank Rosner commented on SPARK-18649:
--

Looks like in SPARK-21551 they increased the hard-coded limit to 15 seconds.

> sc.textFile(my_file).collect() raises socket.timeout on large files
> ---
>
> Key: SPARK-18649
> URL: https://issues.apache.org/jira/browse/SPARK-18649
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
> Environment: PySpark version 1.6.2
>Reporter: Erik Cederstrand
>
> I'm trying to load a file into the driver with this code:
> contents = sc.textFile('hdfs://path/to/big_file.csv').collect()
> Loading into the driver instead of creating a distributed RDD is intentional 
> in this case. The file is ca. 6GB, and I have adjusted driver memory 
> accordingly to fit the local data. After some time, my spark-submitted job 
> crashes with the stack trace below.
> I have traced this to pyspark/rdd.py where the _load_from_socket() method 
> creates a socket with a hard-coded timeout of 3 seconds (this code is also 
> present in HEAD although I'm on PySpark 1.6.2). Raising this hard-coded value 
> to e.g. 600 lets me read the entire file.
> Is there any reason that this value does not use e.g. the 
> 'spark.network.timeout' setting instead?
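
The traceback in this report ends in {{recv}}, which suggests the timeout fired while reading data rather than while connecting. A minimal sketch of keeping the two timeouts separate is shown here; it is not the PySpark implementation. {{read_all}} and {{serve_once}} are hypothetical helpers, and {{serve_once}} is only a stand-in for the driver-side server that sends its payload after a delay.

```python
import socket
import threading
import time


def read_all(port, connect_timeout, read_timeout):
    """Connect with one timeout, then read until EOF with another (None blocks)."""
    sock = socket.create_connection(("127.0.0.1", port), timeout=connect_timeout)
    try:
        sock.settimeout(read_timeout)
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
        return b"".join(chunks)
    finally:
        sock.close()


def serve_once(payload, delay):
    """Hypothetical stand-in for the driver: accept one client, wait, then send."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(("127.0.0.1", 0))
    srv.listen(1)
    port = srv.getsockname()[1]

    def run():
        conn, _ = srv.accept()
        with conn:
            time.sleep(delay)
            try:
                conn.sendall(payload)
            except OSError:
                pass  # the client may already have given up
        srv.close()

    threading.Thread(target=run, daemon=True).start()
    return port
```

With a short {{read_timeout}} and a slow sender, {{read_all}} raises {{socket.timeout}}, as in the report; with {{read_timeout=None}} the same read completes. The read timeout could equally be taken from a configuration value, which is what the question above asks for.
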
> Traceback (most recent call last):
>   File "my_textfile_test.py", line 119, in <module>
> contents = sc.textFile('hdfs://path/to/file.csv').collect()
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", 
> line 772, in collect
>   File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/rdd.py", 
> line 142, in _load_from_socket
>   File 
> "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 517, in load_stream
>   File 
> "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 511, in loads
>   File "/usr/lib/python2.7/socket.py", line 380, in read
> data = self._sock.recv(left)
> socket.timeout: timed out
> 16/11/30 13:33:14 WARN Utils: Suppressing exception in finally: Broken pipe
> java.net.SocketException: Broken pipe
>   at java.net.SocketOutputStream.socketWrite0(Native Method)
>   at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>   at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>   at 
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>   at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>   at java.io.DataOutputStream.flush(DataOutputStream.java:123)
>   at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:650)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1248)
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
>   Suppressed: java.net.SocketException: Broken pipe
>   at java.net.SocketOutputStream.socketWrite0(Native Method)
>   at 
> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>   at 
> java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>   at 
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>   at 
> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>   at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>   at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>   ... 3 more
> 16/11/30 13:33:14 ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset
>   at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
>   at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>   at 
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>   at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>   at java.io.DataOutputStream.write(DataOutputStream.java:107)
>   at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>   at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
>   at 
> org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
>   at 
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
>   at 
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at 
> 

[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (silent corruption due to system timezone setting)

2017-05-15 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010251#comment-16010251
 ] 

Frank Rosner commented on SPARK-20489:
--

The problem is that {{java.util.Date}} is an instant, not a date. I think 
these problems are inherent in working with instants instead of real dates 
while implicitly converting between the two. One should be explicit when 
converting between instants and dates, and specify the time zone.

Where in your code does it actually break?
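
To illustrate the point about instants versus dates, here is a small Python analogy (not Spark or Java code): the same instant falls on different calendar dates depending on the time zone used for the conversion. {{local_date}} is a hypothetical helper, and the fixed UTC offsets are assumptions chosen for the example.

```python
from datetime import datetime, timedelta, timezone


def local_date(epoch_seconds, utc_offset_hours):
    """Calendar date of an instant as seen at a fixed UTC offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(epoch_seconds, tz).date()


# One instant: 2017-04-25 23:30 UTC.
instant = datetime(2017, 4, 25, 23, 30, tzinfo=timezone.utc).timestamp()

print(local_date(instant, 0))  # 2017-04-25
print(local_date(instant, 2))  # 2017-04-26
```

Passing the offset explicitly is what makes the conversion reproducible; relying on each machine's default time zone does not.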

> Different results in local mode and yarn mode when working with dates (silent 
> corruption due to system timezone setting)
> 
>
> Key: SPARK-20489
> URL: https://issues.apache.org/jira/browse/SPARK-20489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
> Environment: yarn-client mode in Zeppelin, Cloudera 
> Spark2-distribution
>Reporter: Rick Moritz
>Priority: Critical
>
> Running the following code (in Zeppelin, or spark-shell), I get different 
> results, depending on whether I am using local[*] mode or yarn-client mode:
> {code:title=test case|borderStyle=solid}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import spark.implicits._
> val counter = 1 to 2
> val size = 1 to 3
> val sampleText = spark.createDataFrame(
>     sc.parallelize(size)
>       .map(Row(_)),
>     StructType(Array(StructField("id", IntegerType, nullable=false))
>     )
>   )
>   .withColumn("loadDTS",lit("2017-04-25T10:45:02.2"))
> 
> val rddList = counter.map(
>   count => sampleText
>     .withColumn("loadDTS2", 
>       date_format(date_add(col("loadDTS"),count),"yyyy-MM-dd'T'HH:mm:ss.SSS"))
>     .drop(col("loadDTS"))
>     .withColumnRenamed("loadDTS2","loadDTS")
>     .coalesce(4)
>     .rdd
> )
> val resultText = spark.createDataFrame(
>   spark.sparkContext.union(rddList),
>   sampleText.schema
> )
> val testGrouped = resultText.groupBy("id")
> val timestamps = testGrouped.agg(
>   max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as "timestamp"
> )
> val loadDateResult = resultText.join(timestamps, "id")
> val filteredresult = loadDateResult.filter($"timestamp" === 
>   unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
> filteredresult.count
> {code}
> The expected result, *3*, is what I obtain in local mode, but as soon as I 
> run fully distributed, I get *0*. If I increase size to {{1 to 32000}}, I do 
> get some results (depending on the size of counter), none of which make any 
> sense.
> Up to the application of the last filter, at first glance everything looks 
> okay, but then something goes wrong. Potentially this is due to lingering 
> re-use of SimpleDateFormats, but I can't get it to happen in a 
> non-distributed mode. The generated execution plan is the same in each case, 
> as expected.
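
One way such a mismatch could arise, assuming (as the issue title suggests) that the machines involved use different default time zones: parsing the same timestamp string under two zones gives two different epoch values, so an equality filter on them would not match. The following is a Python analogy only, not the Spark code path; {{to_epoch}} is a hypothetical helper and the offsets are made up for the example.

```python
from datetime import datetime, timedelta, timezone

FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


def to_epoch(text, utc_offset_hours):
    """Parse a zone-less timestamp string, interpreting it at a fixed UTC offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.strptime(text, FORMAT).replace(tzinfo=tz).timestamp()


text = "2017-04-25T10:45:02.2"
# The same string read as UTC and as UTC+2 is two hours apart as an instant.
difference = to_epoch(text, 0) - to_epoch(text, 2)
print(round(difference))  # 7200
```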






[jira] [Comment Edited] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-04-28 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988350#comment-15988350
 ] 

Frank Rosner edited comment on SPARK-20489 at 4/28/17 7:25 AM:
---

In 
[datetimeExpressions.scala#L569|https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569]
 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function being called 
multi-threaded: 
[datetimeExpressions.scala#L591|https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591]?

If so, the results are completely non-deterministic and possibly wrong: 
https://twitter.com/FRosnerd/status/856994959425236992

The reason is that {{SimpleDateFormat}} is mutable and its parsing is not 
synchronized. Could that be related?
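
If a shared mutable formatter does turn out to be the cause, one common remedy is to give each thread its own instance. The sketch below shows that pattern in Python as an analogy only; it is not Spark's code. {{StatefulParser}} is a hypothetical class whose per-instance scratch state loosely stands in for the internal state of a {{SimpleDateFormat}}.

```python
import threading


class StatefulParser:
    """Hypothetical parser that keeps scratch state on the instance."""

    def __init__(self):
        self._fields = {}

    def parse(self, text):
        # "2017-04-25" -> (2017, 4, 25), going through mutable instance state.
        self._fields.clear()
        for name, part in zip(("year", "month", "day"), text.split("-")):
            self._fields[name] = int(part)
        return (self._fields["year"], self._fields["month"], self._fields["day"])


_local = threading.local()


def thread_parser():
    """Return the calling thread's own parser, creating it on first use."""
    parser = getattr(_local, "parser", None)
    if parser is None:
        parser = _local.parser = StatefulParser()
    return parser


def parse_date(text):
    """Parse with a parser that is never shared between threads."""
    return thread_parser().parse(text)
```

Because no two threads ever touch the same {{StatefulParser}}, no locking is needed around {{parse}}.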


was (Author: frosner):
In 
[datetimeExpressions.scala#L569|https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569]
 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function being called 
multi-threaded: 
[datetimeExpressions.scala#L591|https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591]?

If so, the results are completely non-deterministic and possibly wrong: 
https://twitter.com/FRosnerd/status/856994959425236992

Reason is that {{SimpleDateFormat}} is not mutable and not synchronized. Could 
that be related?




[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-04-28 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988350#comment-15988350
 ] 

Frank Rosner commented on SPARK-20489:
--

In 
https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569
 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function being called 
multi-threaded: 
https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591?

If so, the results are completely non-deterministic and possibly wrong: 
https://twitter.com/FRosnerd/status/856994959425236992

Reason is that {{SimpleDateFormat}} is not mutable and not synchronized. Could 
that be related?




[jira] [Comment Edited] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)

2017-04-28 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988350#comment-15988350
 ] 

Frank Rosner edited comment on SPARK-20489 at 4/28/17 7:24 AM:
---

In 
[datetimeExpressions.scala#L569|https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569]
 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function being called 
multi-threaded: 
[datetimeExpressions.scala#L591|https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591]?

If so, the results are completely non-deterministic and possibly wrong: 
https://twitter.com/FRosnerd/status/856994959425236992

Reason is that {{SimpleDateFormat}} is not mutable and not synchronized. Could 
that be related?


was (Author: frosner):
In 
https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L569
 we reuse a {{SimpleDateFormat}}. Is this {{eval}} function being called 
multi-threaded: 
https://github.com/apache/spark/blob/760c8d088df1d35d7b8942177d47bc1677daf143/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L591?

If so, the results are completely non-deterministic and possibly wrong: 
https://twitter.com/FRosnerd/status/856994959425236992

Reason is that {{SimpleDateFormat}} is not mutable and not synchronized. Could 
that be related?




[jira] [Commented] (SPARK-17933) Shuffle fails when driver is on one of the same machines as executor

2016-10-14 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15575232#comment-15575232
 ] 

Frank Rosner commented on SPARK-17933:
--

Thanks [~srowen]. I know there have been a lot of discussions about the 
driver's advertised address etc., but I am not sure whether this is a 
configuration problem or a bug. The ticket you reference talks about the 
driver IP, but I am having trouble with the executor (I highlighted the line 
in the screenshot).

Any ideas how to address this problem?
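
The trace quoted below ends in {{UnresolvedAddressException}} for {{10-250-20-140:44042}}, which suggests the fetching side could not resolve that host name. A quick way to check this on a given machine is sketched here in Python; {{can_resolve}} is a hypothetical helper, and whether name resolution is really the cause in this cluster is an assumption.

```python
import socket


def can_resolve(host, port=0):
    """Return True if the local resolver can turn host into an address."""
    try:
        socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        return True
    except socket.gaierror:
        return False
```

Running {{can_resolve("10-250-20-140")}} on the machine hosting each executor would show which of them cannot resolve the name that is being advertised.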

> Shuffle fails when driver is on one of the same machines as executor
> 
>
> Key: SPARK-17933
> URL: https://issues.apache.org/jira/browse/SPARK-17933
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.6.2
>Reporter: Frank Rosner
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> h4. Problem
> When I run a job that requires some shuffle, some tasks fail because the 
> executor cannot fetch the shuffle blocks from another executor.
> {noformat}
> org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
> 10-250-20-140:44042
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to connect to 10-250-20-140:44042
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   ... 3 more
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:209)

[jira] [Updated] (SPARK-17933) Shuffle fails when driver is on one of the same machines as executor

2016-10-14 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-17933:
-
Attachment: (was: screenshot-1.png)

> Shuffle fails when driver is on one of the same machines as executor
> 
>
> Key: SPARK-17933
> URL: https://issues.apache.org/jira/browse/SPARK-17933
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.6.2
>Reporter: Frank Rosner
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> h4. Problem
> When I run a job that requires some shuffle, some tasks fail because the 
> executor cannot fetch the shuffle blocks from another executor.
> {noformat}
> org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
> 10-250-20-140:44042
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to connect to 10-250-20-140:44042
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   ... 3 more
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:209)
>   at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:207)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1097)
>   at 
> 

[jira] [Updated] (SPARK-17933) Shuffle fails when driver is on one of the same machines as executor

2016-10-14 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-17933:
-
Attachment: screenshot-1.png

> Shuffle fails when driver is on one of the same machines as executor
> 
>
> Key: SPARK-17933
> URL: https://issues.apache.org/jira/browse/SPARK-17933
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.6.2
>Reporter: Frank Rosner
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> h4. Problem
> When I run a job that requires some shuffle, some tasks fail because the 
> executor cannot fetch the shuffle blocks from another executor.
> {noformat}
> org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
> 10-250-20-140:44042
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to connect to 10-250-20-140:44042
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   ... 3 more
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:209)
>   at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:207)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1097)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:471)
>   

[jira] [Updated] (SPARK-17933) Shuffle fails when driver is on one of the same machines as executor

2016-10-14 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-17933:
-
Description: 
h4. Problem

When I run a job that requires some shuffle, some tasks fail because the 
executor cannot fetch the shuffle blocks from another executor.

{noformat}
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
10-250-20-140:44042
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to 10-250-20-140:44042
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at 
io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:209)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:207)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1097)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:471)
at 
io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:456)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelOutboundHandlerAdapter.java:47)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:471)
at 
io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:456)
at 
io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
at 

[jira] [Created] (SPARK-17933) Shuffle fails when driver is on one of the same machines as executor

2016-10-14 Thread Frank Rosner (JIRA)
Frank Rosner created SPARK-17933:


 Summary: Shuffle fails when driver is on one of the same machines 
as executor
 Key: SPARK-17933
 URL: https://issues.apache.org/jira/browse/SPARK-17933
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.6.2
Reporter: Frank Rosner
 Attachments: screenshot-1.png, screenshot-2.png

h4. Problem

When I run a job that requires some shuffle, some tasks fail because the 
executor cannot fetch the shuffle blocks from another executor.

{noformat}
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
10-250-20-140:44042
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to 10-250-20-140:44042
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at 
io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:209)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:207)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1097)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:471)
at 
io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:456)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelOutboundHandlerAdapter.java:47)
at 
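
The innermost cause in the trace above is {{java.nio.channels.UnresolvedAddressException}}, which the JDK raises in {{SocketChannel.connect}} when the target {{InetSocketAddress}} has no resolved IP address. The following is a minimal sketch of that failure mode, assuming the host name {{10-250-20-140}} cannot be resolved on the connecting machine. That condition is simulated here with {{InetSocketAddress.createUnresolved}}; the object and method names are illustrative and not part of Spark.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SocketChannel, UnresolvedAddressException}

// Illustrative sketch only; not Spark code.
object UnresolvedAddressSketch {
  // Returns true if connecting to an unresolved address fails with
  // UnresolvedAddressException, the innermost cause in the reported trace.
  def failsAsUnresolved(host: String, port: Int): Boolean = {
    // createUnresolved skips the DNS lookup, standing in for a host name
    // that the connecting machine cannot resolve (an assumption).
    val address = InetSocketAddress.createUnresolved(host, port)
    val channel = SocketChannel.open()
    try {
      channel.connect(address)
      false
    } catch {
      case _: UnresolvedAddressException => true
    } finally {
      channel.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(failsAsUnresolved("10-250-20-140", 44042))
  }
}
```

If this is what happens on the executor, checking whether the advertised host name resolves from every node in the cluster would be a reasonable first diagnostic step.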

[jira] [Updated] (SPARK-17933) Shuffle fails when driver is on one of the same machines as executor

2016-10-14 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-17933:
-
Attachment: screenshot-2.png
screenshot-1.png

> Shuffle fails when driver is on one of the same machines as executor
> 
>
> Key: SPARK-17933
> URL: https://issues.apache.org/jira/browse/SPARK-17933
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.6.2
>Reporter: Frank Rosner
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> h4. Problem
> When I run a job that requires some shuffle, some tasks fail because the 
> executor cannot fetch the shuffle blocks from another executor.
> {noformat}
> org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
> 10-250-20-140:44042
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to connect to 10-250-20-140:44042
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   ... 3 more
> Caused by: java.nio.channels.UnresolvedAddressException
>   at sun.nio.ch.Net.checkAddress(Net.java:101)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:209)
>   at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:207)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1097)
>   at 
> 

[jira] [Commented] (SPARK-12823) Cannot create UDF with StructType input

2016-01-20 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15108689#comment-15108689
 ] 

Frank Rosner commented on SPARK-12823:
--

Ok :( :D

> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
> data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, 
> however, 'kv' is of struct type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually instead of using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, which 
> exposes the otherwise package-private {{UserDefinedFunction}} constructor as 
> public. However, you then have to work with a {{Row}}, because the row is not 
> automatically converted to a case class / tuple.
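
The {{ClassCastException}} in the reproduction, and the reason the workaround has to operate on a {{Row}}, can be illustrated without Spark. The following is a minimal sketch under stated assumptions: {{GenericRow}} is a hypothetical stand-in for the generic row object Spark passes to the function (it is not Spark's class), and {{castToKV}} and {{valueOf}} are illustrative helpers. It shows that a positional row cannot be cast to the {{KV}} case class, so the function has to read the fields by position itself.

```scala
// Spark-free sketch; GenericRow is a hypothetical stand-in, not Spark's Row.
case class KV(key: Long, value: String)

// A struct column value modelled as a generic, positionally indexed row.
final case class GenericRow(values: Seq[Any]) {
  def getLong(i: Int): Long = values(i).asInstanceOf[Long]
  def getString(i: Int): String = values(i).asInstanceOf[String]
}

object StructUdfSketch {
  // Mirrors what a function typed on KV runs into: the runtime object
  // is a generic row, so the cast to KV throws ClassCastException.
  def castToKV(row: Any): KV = row.asInstanceOf[KV]

  // What a Row-based function does instead: extract the field by position.
  def valueOf(row: GenericRow): String = row.getString(1)

  def main(args: Array[String]): Unit = {
    val row = GenericRow(Seq(1L, "a"))
    val castFailed =
      try { castToKV(row); false }
      catch { case _: ClassCastException => true }
    println(s"cast failed: $castFailed")
    println(s"value by position: ${valueOf(row)}")
  }
}
```

The trade-off is the one noted in the description: reading fields by position works, but the type safety of the case class is lost.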



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12823) Cannot create UDF with StructType input

2016-01-20 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109034#comment-15109034
 ] 

Frank Rosner commented on SPARK-12823:
--

Ok :( :D

> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
> data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, 
> however, 'kv' is of struct type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually instead of using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, which 
> exposes the otherwise package-private {{UserDefinedFunction}} constructor as 
> public. However, you then have to work with a {{Row}}, because the row is not 
> automatically converted to a case class / tuple.




[jira] [Issue Comment Deleted] (SPARK-12823) Cannot create UDF with StructType input

2016-01-20 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-12823:
-
Comment: was deleted

(was: Ok :( :D)

> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
> data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, 
> however, 'kv' is of struct type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually instead of using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, which 
> exposes the otherwise package-private {{UserDefinedFunction}} constructor as 
> public. However, you then have to work with a {{Row}}, because the row is not 
> automatically converted to a case class / tuple.




[jira] [Commented] (SPARK-12823) Cannot create UDF with StructType input

2016-01-20 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15108342#comment-15108342
 ] 

Frank Rosner commented on SPARK-12823:
--

Any thoughts on this one [~srowen]?

> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
> data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, 
> however, 'kv' is of struct type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually, not using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, which 
> exposes the {{UserDefinedFunction}} constructor (public from package 
> private). However, then you have to work with a {{Row}}, because it does not 
> automatically convert the row to a case class / tuple.
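
The manual conversion mentioned in the workaround can be sketched in plain Scala, without Spark on the classpath. {{GenericRow}} below is a hypothetical stand-in for a positionally indexed row and is not Spark's {{Row}} class; {{RowWorkaroundSketch}}, {{toKV}} and {{valueOf}} are likewise names made up for illustration. The sketch only shows the idea of reading fields by position and rebuilding the case class by hand; it is not the code in the linked repository.

```scala
// Hypothetical stand-in for a generic, positionally indexed row (NOT Spark's Row).
final case class GenericRow(values: Vector[Any]) {
  def getLong(i: Int): Long = values(i).asInstanceOf[Long]
  def getString(i: Int): String = values(i).asInstanceOf[String]
}

// Same shape as the KV case class in the report.
final case class KV(key: Long, value: String)

object RowWorkaroundSketch {
  // Manual conversion: read each field by position and rebuild the case class.
  def toKV(row: GenericRow): KV = KV(row.getLong(0), row.getString(1))

  // What a row-based function body would do: accept the generic row,
  // convert it by hand, then use the typed value.
  val valueOf: GenericRow => String = row => toKV(row).value

  def main(args: Array[String]): Unit = {
    val rows = List(GenericRow(Vector(1L, "a")), GenericRow(Vector(5L, "b")))
    println(rows.map(valueOf)) // prints List(a, b)
  }
}
```

The conversion is explicit here because nothing maps field positions to case class fields automatically, which is the inconvenience the workaround paragraph describes.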






[jira] [Updated] (SPARK-12823) Cannot create UDF with StructType input

2016-01-18 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-12823:
-
Description: 
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html

h5. Possible Workaround

If you create a {{UserDefinedFunction}} manually, not using the {{udf}} helper 
functions, it works. See https://github.com/FRosner/struct-udf, which exposes 
the {{UserDefinedFunction}} constructor (public from package private). However, 
then you have to work with Row, because it does not automatically convert the 
row to a case class / tuple.

  was:
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html

h5. Possible Workaround

If you create a {{UserDefinedFunction}} manually, not using the {{udf}} helper 
functions, it works. See https://github.com/FRosner/struct-udf, which exposes 
the {{UserDefinedFunction}} constructor (public from package private).


> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException:
> // org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
> // to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to
> // data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type,
> // however, 'kv' is of struct<key:bigint,value:string> type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually, not using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, 

[jira] [Updated] (SPARK-12823) Cannot create UDF with StructType input

2016-01-18 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-12823:
-
Description: 
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html

h5. Possible Workaround

If you create a {{UserDefinedFunction}} manually, not using the {{udf}} helper 
functions, it works. See https://github.com/FRosner/struct-udf, which exposes 
the {{UserDefinedFunction}} constructor (public from package private). However, 
then you have to work with a {{Row}}, because it does not automatically convert 
the row to a case class / tuple.

  was:
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html

h5. Possible Workaround

If you create a {{UserDefinedFunction}} manually, not using the {{udf}} helper 
functions, it works. See https://github.com/FRosner/struct-udf, which exposes 
the {{UserDefinedFunction}} constructor (public from package private). However, 
then you have to work with Row, because it does not automatically convert the 
row to a case class / tuple.


> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException:
> // org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
> // to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to
> // data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type,
> // however, 'kv' is of struct<key:bigint,value:string> type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a 

[jira] [Updated] (SPARK-12823) Cannot create UDF with StructType input

2016-01-18 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-12823:
-
Description: 
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html

h5. Possible Workaround

If you create a {{UserDefinedFunction}} manually, not using the {{udf}} helper 
functions, it works. See https://github.com/FRosner/struct-udf, which exposes 
the UserDefinedFunction constructor (public from package private).

  was:
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html


> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException:
> // org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
> // to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to
> // data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type,
> // however, 'kv' is of struct<key:bigint,value:string> type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually, not using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, which 
> exposes the UserDefinedFunction constructor (public from package private).






[jira] [Updated] (SPARK-12823) Cannot create UDF with StructType input

2016-01-18 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-12823:
-
Description: 
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html

h5. Possible Workaround

If you create a {{UserDefinedFunction}} manually, not using the {{udf}} helper 
functions, it works. See https://github.com/FRosner/struct-udf, which exposes 
the {{UserDefinedFunction}} constructor (public from package private).

  was:
h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html

h5. Possible Workaround

If you create a {{UserDefinedFunction}} manually, not using the {{udf}} helper 
functions, it works. See https://github.com/FRosner/struct-udf, which exposes 
the UserDefinedFunction constructor (public from package private).


> Cannot create UDF with StructType input
> ---
>
> Key: SPARK-12823
> URL: https://issues.apache.org/jira/browse/SPARK-12823
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException:
> // org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
> // to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to
> // data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type,
> // however, 'kv' is of struct<key:bigint,value:string> type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually, not using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, which 
> exposes the {{UserDefinedFunction}} constructor (public from package private).




[jira] [Created] (SPARK-12823) Cannot create UDF with StructType input

2016-01-14 Thread Frank Rosner (JIRA)
Frank Rosner created SPARK-12823:


 Summary: Cannot create UDF with StructType input
 Key: SPARK-12823
 URL: https://issues.apache.org/jira/browse/SPARK-12823
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.5.2
Reporter: Frank Rosner


h5. Problem

It is not possible to apply a UDF to a column that has a struct data type. Two 
previous requests to the mailing list remained unanswered.

h5. How-To-Reproduce

{code}
val sql = new org.apache.spark.sql.SQLContext(sc)
import sql.implicits._

case class KV(key: Long, value: String)
case class Row(kv: KV)
val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF

val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
df.select(udf1(df("kv"))).show
// java.lang.ClassCastException:
// org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
// to $line78.$read$$iwC$$iwC$KV

val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
df.select(udf2(df("kv"))).show
// org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to data
// type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, however,
// 'kv' is of struct<key:bigint,value:string> type.;
{code}

h5. Mailing List Entries

- 
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E

- https://www.mail-archive.com/user@spark.apache.org/msg43092.html






[jira] [Commented] (SPARK-11258) Remove quadratic runtime complexity for converting a Spark DataFrame into an R data.frame

2015-10-23 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14971025#comment-14971025
 ] 

Frank Rosner commented on SPARK-11258:
--

Actually I am pretty confused now. Thinking about it, a for loop and a map 
should not access every element more than once. However, it still seems more 
complex than required to me. Let me try to reproduce the fact that we could not 
load it with the old function but could with the new one. Maybe the .toArray 
method is a memory problem, as it first recreates the whole shebang and then 
copies it to another array?

> Remove quadratic runtime complexity for converting a Spark DataFrame into an 
> R data.frame
> -
>
> Key: SPARK-11258
> URL: https://issues.apache.org/jira/browse/SPARK-11258
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 1.5.1
>Reporter: Frank Rosner
>
> h4. Introduction
> We tried to collect a DataFrame with > 1 million rows and a few hundred 
> columns in SparkR. This took a huge amount of time (much more than in the 
> Spark REPL). When looking into the code, I found that the 
> {{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method has quadratic run 
> time complexity (it goes through the complete data set _m_ times, where _m_ 
> is the number of columns).
> h4. Problem
> The {{dfToCols}} method transposes the row-wise representation of the 
> Spark DataFrame (array of rows) into a column-wise representation (array of 
> columns), which is then put into a data frame. This is done in a very 
> inefficient way, leading to huge performance (and possibly also memory) 
> problems when collecting bigger data frames.
> h4. Solution
> Directly transpose the row-wise representation to the column-wise 
> representation in one pass through the data. I will create a pull request 
> for this.
> h4. Runtime comparison
> On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
> method takes 2267 ms on average to complete. My implementation takes only 
> 554 ms on average. The more columns you have, the bigger this effect gets.
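
The single-pass transpose described under "Solution" might look roughly like the following plain Scala sketch. It is not the code from the pull request: {{TransposeSketch}}, {{rowsToCols}}, and the choice to model rows as {{Array[Array[Any]]}} are assumptions made for illustration only.

```scala
object TransposeSketch {
  // Turns row-major data into column-major data, visiting each row exactly once.
  // Assumes every row holds at least numCols values.
  def rowsToCols(rows: Array[Array[Any]], numCols: Int): Array[Array[Any]] = {
    val numRows = rows.length
    // Preallocate one output array per column, each numRows long.
    val cols = Array.ofDim[Any](numCols, numRows)
    var r = 0
    while (r < numRows) {
      val row = rows(r)
      var c = 0
      while (c < numCols) {
        cols(c)(r) = row(c) // cell (r, c) of the input lands at (c, r) of the output
        c += 1
      }
      r += 1
    }
    cols
  }

  def main(args: Array[String]): Unit = {
    val rows = Array(Array[Any](1L, "a"), Array[Any](5L, "b"))
    rowsToCols(rows, 2).foreach(col => println(col.mkString(", ")))
  }
}
```

Preallocating the column arrays means no intermediate collections are built and copied, which is one plausible way to keep memory pressure down on large inputs.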






[jira] [Updated] (SPARK-11258) Converting a Spark DataFrame into an R data.frame is slow / requires a lot of memory

2015-10-23 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-11258:
-
Description: 
h4. Problem

We tried to collect a DataFrame with > 1 million rows and a few hundred columns 
in SparkR. This took a huge amount of time (much more than in the Spark REPL). 
When looking into the code, I found that the 
{{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method does some map and then 
{{.toArray}} which might cause the problem.

h4. Solution

Directly transpose the row-wise representation to the column-wise 
representation in one pass through the data. I will create a pull request for 
this.

h4. Runtime comparison

On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
method takes 2267 ms on average to complete. My implementation takes only 
554 ms on average. This effect might be due to garbage collection, especially 
if you consider that the old implementation didn't complete on an even bigger 
data frame.

  was:
h4. Problem

We tried to collect a DataFrame with > 1 million rows and a few hundred columns 
in SparkR. This took a huge amount of time (much more than in the Spark REPL). 
When looking into the code, I found that the 
{{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method does some map and then 
{{.toArray}} which might cause the problem.

h4. Solution

Directly transpose the row-wise representation to the column-wise 
representation in one pass through the data. I will create a pull request for 
this.

h4. Runtime comparison

On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
method takes 2267 ms on average to complete. My implementation takes only 
554 ms on average. The more columns you have, the bigger this effect gets.


> Converting a Spark DataFrame into an R data.frame is slow / requires a lot of 
> memory
> 
>
> Key: SPARK-11258
> URL: https://issues.apache.org/jira/browse/SPARK-11258
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 1.5.1
>Reporter: Frank Rosner
>
> h4. Problem
> We tried to collect a DataFrame with > 1 million rows and a few hundred 
> columns in SparkR. This took a huge amount of time (much more than in the 
> Spark REPL). When looking into the code, I found that the 
> {{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method does some map and 
> then {{.toArray}} which might cause the problem.
> h4. Solution
> Directly transpose the row-wise representation to the column-wise 
> representation in one pass through the data. I will create a pull request 
> for this.
> h4. Runtime comparison
> On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
> method takes 2267 ms on average to complete. My implementation takes only 
> 554 ms on average. This effect might be due to garbage collection, especially 
> if you consider that the old implementation didn't complete on an even bigger 
> data frame.






[jira] [Updated] (SPARK-11258) Converting a Spark DataFrame into an R data.frame is slow / requires a lot of memory

2015-10-23 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-11258:
-
Description: 
h4. Problem

We tried to collect a DataFrame with > 1 million rows and a few hundred columns 
in SparkR. This took a huge amount of time (much more than in the Spark REPL). 
When looking into the code, I found that the 
{{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method does some map and then 
{{.toArray}} which might cause the problem.

h4. Solution

Directly transpose the row-wise representation to the column-wise 
representation in one pass through the data. I will create a pull request for 
this.

h4. Runtime comparison

On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
method takes 2267 ms on average to complete. My implementation takes only 
554 ms on average. The more columns you have, the bigger this effect gets.

  was:
h4. Introduction

We tried to collect a DataFrame with > 1 million rows and a few hundred columns 
in SparkR. This took a huge amount of time (much more than in the Spark REPL). 
When looking into the code, I found that the 
{{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method has quadratic run time 
complexity (it goes through the complete data set _m_ times, where _m_ is the 
number of columns).

h4. Problem

The {{dfToCols}} method transposes the row-wise representation of the Spark 
DataFrame (array of rows) into a column-wise representation (array of columns), 
which is then put into a data frame. This is done in a very inefficient way, 
leading to huge performance (and possibly also memory) problems when 
collecting bigger data frames.

h4. Solution

Directly transpose the row-wise representation to the column-wise 
representation in one pass through the data. I will create a pull request for 
this.

h4. Runtime comparison

On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
method takes 2267 ms on average to complete. My implementation takes only 
554 ms on average. The more columns you have, the bigger this effect gets.


> Converting a Spark DataFrame into an R data.frame is slow / requires a lot of 
> memory
> 
>
> Key: SPARK-11258
> URL: https://issues.apache.org/jira/browse/SPARK-11258
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 1.5.1
>Reporter: Frank Rosner
>
> h4. Problem
> We tried to collect a DataFrame with > 1 million rows and a few hundred 
> columns in SparkR. This took a huge amount of time (much more than in the 
> Spark REPL). When looking into the code, I found that the 
> {{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method does some map and 
> then {{.toArray}} which might cause the problem.
> h4. Solution
> Directly transpose the row-wise representation to the column-wise 
> representation in one pass through the data. I will create a pull request 
> for this.
> h4. Runtime comparison
> On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
> method takes 2267 ms on average to complete. My implementation takes only 
> 554 ms on average. The more columns you have, the bigger this effect gets.






[jira] [Updated] (SPARK-11258) Converting a Spark DataFrame into an R data.frame is slow / requires a lot of memory

2015-10-23 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-11258:
-
Summary: Converting a Spark DataFrame into an R data.frame is slow / 
requires a lot of memory  (was: Remove quadratic runtime complexity for 
converting a Spark DataFrame into an R data.frame)

> Converting a Spark DataFrame into an R data.frame is slow / requires a lot of 
> memory
> 
>
> Key: SPARK-11258
> URL: https://issues.apache.org/jira/browse/SPARK-11258
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 1.5.1
>Reporter: Frank Rosner
>
> h4. Introduction
> We tried to collect a DataFrame with > 1 million rows and a few hundred 
> columns in SparkR. This took a huge amount of time (much more than in the 
> Spark REPL). When looking into the code, I found that the 
> {{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method has quadratic run 
> time complexity (it goes through the complete data set _m_ times, where _m_ 
> is the number of columns).
> h4. Problem
> The {{dfToCols}} method is transposing the row-wise representation of the 
> Spark DataFrame (array of rows) into a column wise representation (array of 
> columns) to then be put into a data frame. This is done in a very inefficient 
> way, leading to huge performance (and possibly also memory) problems when 
> collecting bigger data frames.
> h4. Solution
> Directly transpose the row wise representation to the column wise 
> representation with one pass through the data. I will create a pull request 
> for this.
> h4. Runtime comparison
> On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
> method takes 2267 ms on average to complete. My implementation takes only 554 ms 
> on average. The difference gets even bigger the more columns you have.






[jira] [Commented] (SPARK-11258) Converting a Spark DataFrame into an R data.frame is slow / requires a lot of memory

2015-10-23 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971155#comment-14971155
 ] 

Frank Rosner commented on SPARK-11258:
--

I adjusted the description to be more general. I will see if I can get some 
memory profiling done. Maybe I can also provide a reproducible example.

> Converting a Spark DataFrame into an R data.frame is slow / requires a lot of 
> memory
> 
>
> Key: SPARK-11258
> URL: https://issues.apache.org/jira/browse/SPARK-11258
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 1.5.1
>Reporter: Frank Rosner
>
> h4. Problem
> We tried to collect a DataFrame with > 1 million rows and a few hundred 
> columns in SparkR. This took a huge amount of time (much more than in the 
> Spark REPL). When looking into the code, I found that the 
> {{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method does some map and 
> then {{.toArray}} which might cause the problem.
> h4. Solution
> Directly transpose the row wise representation to the column wise 
> representation with one pass through the data. I will create a pull request 
> for this.
> h4. Runtime comparison
> On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
> method takes 2267 ms on average to complete. My implementation takes only 554 ms 
> on average. The difference gets even bigger the more columns you have.






[jira] [Created] (SPARK-11258) Remove quadratic runtime complexity for converting a Spark DataFrame into an R data.frame

2015-10-22 Thread Frank Rosner (JIRA)
Frank Rosner created SPARK-11258:


 Summary: Remove quadratic runtime complexity for converting a 
Spark DataFrame into an R data.frame
 Key: SPARK-11258
 URL: https://issues.apache.org/jira/browse/SPARK-11258
 Project: Spark
  Issue Type: Improvement
  Components: SparkR
Affects Versions: 1.5.1
Reporter: Frank Rosner


h4. Introduction

We tried to collect a DataFrame with > 1 million rows and a few hundred columns 
in SparkR. This took a huge amount of time (much more than in the Spark REPL). 
When looking into the code, I found that the 
{{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method has quadratic run time 
complexity (it goes through the complete data set _m_ times, where _m_ is the 
number of columns).

h4. Problem

The {{dfToCols}} method is transposing the row-wise representation of the Spark 
DataFrame (array of rows) into a column wise representation (array of columns) 
to then be put into a data frame. This is done in a very inefficient way, 
leading to huge performance (and possibly also memory) problems when 
collecting bigger data frames.
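
To make the cost concrete, here is a minimal sketch of a column-by-column conversion of the kind described above. It is not the actual {{SQLUtils.dfToCols}} code: the object {{ColumnByColumn}}, the method {{toCols}}, and the use of {{Array[Array[Any]]}} as a stand-in for collected Spark rows are all assumptions made for illustration.

```scala
object ColumnByColumn {
  // Hypothetical stand-in for an array of collected rows:
  // each inner array holds the values of one row.
  type Rows = Array[Array[Any]]

  // Builds each column by mapping over all rows, so the row array is
  // traversed once per column (m traversals for m columns), and every
  // traversal allocates a fresh intermediate array.
  def toCols(rows: Rows, numCols: Int): Array[Array[Any]] =
    (0 until numCols).map { c =>
      rows.map(row => row(c))
    }.toArray

  def main(args: Array[String]): Unit = {
    val rows: Rows = Array(Array(1, "a"), Array(2, "b"), Array(3, "c"))
    toCols(rows, 2).foreach(col => println(col.mkString(", ")))
  }
}
```

In this sketch the number of traversals of {{rows}} grows with the number of columns, which is the behaviour the description attributes to the old method.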

h4. Solution

Directly transpose the row wise representation to the column wise 
representation with one pass through the data. I will create a pull request for 
this.
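
A one-pass transpose along these lines could look roughly like the following sketch. It only illustrates the idea and is not the code from the pull request: {{OnePassTranspose}}, {{toCols}}, and the {{Array[Array[Any]]}} row representation are hypothetical names chosen for this example.

```scala
object OnePassTranspose {
  // Hypothetical stand-in for collected rows: each inner array is one row.
  def toCols(rows: Array[Array[Any]], numCols: Int): Array[Array[Any]] = {
    val numRows = rows.length
    // Allocate all columns up front so no intermediate collections are built.
    val cols = Array.ofDim[Any](numCols, numRows)
    var r = 0
    while (r < numRows) {
      val row = rows(r)
      var c = 0
      while (c < numCols) {
        // Each row is visited exactly once; its values are written
        // directly into their final position in the column arrays.
        cols(c)(r) = row(c)
        c += 1
      }
      r += 1
    }
    cols
  }

  def main(args: Array[String]): Unit = {
    val rows = Array(Array[Any](1, "a"), Array[Any](2, "b"), Array[Any](3, "c"))
    toCols(rows, 2).foreach(col => println(col.mkString(", ")))
  }
}
```

The {{while}} loops are a deliberate choice here: they avoid closures and intermediate collections, at the price of more verbose code.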

h4. Runtime comparison

On a test data frame with 1 million rows and 22 columns, the old `dfToCols` 
method takes 2267 ms on average to complete. My implementation takes only 554 ms 
on average. The difference gets even bigger the more columns you have.






[jira] [Updated] (SPARK-11258) Remove quadratic runtime complexity for converting a Spark DataFrame into an R data.frame

2015-10-22 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-11258:
-
Description: 
h4. Introduction

We tried to collect a DataFrame with > 1 million rows and a few hundred columns 
in SparkR. This took a huge amount of time (much more than in the Spark REPL). 
When looking into the code, I found that the 
{{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method has quadratic run time 
complexity (it goes through the complete data set _m_ times, where _m_ is the 
number of columns).

h4. Problem

The {{dfToCols}} method is transposing the row-wise representation of the Spark 
DataFrame (array of rows) into a column wise representation (array of columns) 
to then be put into a data frame. This is done in a very inefficient way, 
leading to huge performance (and possibly also memory) problems when 
collecting bigger data frames.

h4. Solution

Directly transpose the row wise representation to the column wise 
representation with one pass through the data. I will create a pull request for 
this.

h4. Runtime comparison

On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
method takes 2267 ms on average to complete. My implementation takes only 554 ms 
on average. The difference gets even bigger the more columns you have.

  was:
h4. Introduction

We tried to collect a DataFrame with > 1 million rows and a few hundred columns 
in SparkR. This took a huge amount of time (much more than in the Spark REPL). 
When looking into the code, I found that the 
{{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method has quadratic run time 
complexity (it goes through the complete data set _m_ times, where _m_ is the 
number of columns).

h4. Problem

The {{dfToCols}} method is transposing the row-wise representation of the Spark 
DataFrame (array of rows) into a column wise representation (array of columns) 
to then be put into a data frame. This is done in a very inefficient way, 
leading to huge performance (and possibly also memory) problems when 
collecting bigger data frames.

h4. Solution

Directly transpose the row wise representation to the column wise 
representation with one pass through the data. I will create a pull request for 
this.

h4. Runtime comparison

On a test data frame with 1 million rows and 22 columns, the old `dfToCols` 
method takes 2267 ms on average to complete. My implementation takes only 554 ms 
on average. The difference gets even bigger the more columns you have.


> Remove quadratic runtime complexity for converting a Spark DataFrame into an 
> R data.frame
> -
>
> Key: SPARK-11258
> URL: https://issues.apache.org/jira/browse/SPARK-11258
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 1.5.1
>Reporter: Frank Rosner
>
> h4. Introduction
> We tried to collect a DataFrame with > 1 million rows and a few hundred 
> columns in SparkR. This took a huge amount of time (much more than in the 
> Spark REPL). When looking into the code, I found that the 
> {{org.apache.spark.sql.api.r.SQLUtils.dfToCols}} method has quadratic run 
> time complexity (it goes through the complete data set _m_ times, where _m_ 
> is the number of columns).
> h4. Problem
> The {{dfToCols}} method is transposing the row-wise representation of the 
> Spark DataFrame (array of rows) into a column wise representation (array of 
> columns) to then be put into a data frame. This is done in a very inefficient 
> way, leading to huge performance (and possibly also memory) problems when 
> collecting bigger data frames.
> h4. Solution
> Directly transpose the row wise representation to the column wise 
> representation with one pass through the data. I will create a pull request 
> for this.
> h4. Runtime comparison
> On a test data frame with 1 million rows and 22 columns, the old {{dfToCols}} 
> method takes 2267 ms on average to complete. My implementation takes only 554 ms 
> on average. The difference gets even bigger the more columns you have.






[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-08 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14735598#comment-14735598
 ] 

Frank Rosner commented on SPARK-10493:
--

Thanks for submitting the issue, [~glenn.strycker] :)

Can you provide a minimal example so we can try to reproduce the issue? It 
should also include the submit command (or are you using the shell?).

> reduceByKey not returning distinct results
> --
>
> Key: SPARK-10493
> URL: https://issues.apache.org/jira/browse/SPARK-10493
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Glenn Strycker
>
> I am running Spark 1.3.0 and creating an RDD by unioning several earlier RDDs 
> (using zipPartitions), partitioning by a hash partitioner, and then applying 
> a reduceByKey to summarize statistics by key.
> Since my set before the reduceByKey consists of records such as (K, V1), (K, 
> V2), (K, V3), I expect the results after reduceByKey to be just (K, 
> f(V1,V2,V3)), where the function f is appropriately associative, commutative, 
> etc.  Therefore, the results after reduceByKey ought to be distinct, correct? 
>  I am running counts of my RDD and finding that adding an additional 
> .distinct after my .reduceByKey is changing the final count!!
> Here is some example code:
> rdd3 = tempRDD1.
>zipPartitions(tempRDD2, true)((iter, iter2) => iter++iter2).
>partitionBy(new HashPartitioner(numPartitions)).
>reduceByKey((a,b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2, 
> math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
> println(rdd3.count)
> rdd4 = rdd3.distinct
> println(rdd4.count)
> I am using persistence, checkpointing, and other stuff in my actual code that 
> I did not paste here, so I can paste my actual code if it would be helpful.
> This issue may be related to SPARK-2620, except I am not using case classes, 
> to my knowledge.
> See also 
> http://stackoverflow.com/questions/32466176/apache-spark-rdd-reducebykey-operation-not-returning-correct-distinct-results
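
The expectation stated in the description can be checked against a purely local model. The sketch below is not Spark code: {{LocalReduceByKey}} and its {{reduceByKey}} helper are hypothetical names that mimic the operation on an in-memory {{Seq}}. The sketch only shows why one record per key, and therefore an unchanged count after {{distinct}}, is the expected outcome.

```scala
object LocalReduceByKey {
  // Local, single-machine model of reduceByKey: all values that share a key
  // are combined with f, so the result holds exactly one record per key.
  def reduceByKey[K, V](records: Seq[(K, V)])(f: (V, V) => V): Seq[(K, V)] =
    records.groupBy(_._1).toSeq.map { case (key, kvs) =>
      (key, kvs.map(_._2).reduce(f))
    }

  def main(args: Array[String]): Unit = {
    val records = Seq(("k1", 1), ("k1", 2), ("k2", 5), ("k1", 3))
    val reduced = reduceByKey(records)(_ + _)
    // Keys are unique after the reduction, so distinct removes nothing.
    println(reduced.size == reduced.distinct.size)
  }
}
```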






[jira] [Commented] (SPARK-9971) MaxFunction not working correctly with columns containing Double.NaN

2015-08-14 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696889#comment-14696889
 ] 

Frank Rosner commented on SPARK-9971:
-

Ok so shall we close it as a wontfix?

 MaxFunction not working correctly with columns containing Double.NaN
 

 Key: SPARK-9971
 URL: https://issues.apache.org/jira/browse/SPARK-9971
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.1
Reporter: Frank Rosner
Priority: Minor

 h4. Problem Description
 When using the {{max}} function on a {{DoubleType}} column that contains 
 {{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 
 This is because it compares all values with the running maximum. However, 
 {{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
 {{x < Double.NaN}}.
 h4. How to Reproduce
 {code}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.types._
 val sql = new SQLContext(sc)
 val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
 val dataFrame = sql.createDataFrame(rdd, StructType(List(
   StructField("col", DoubleType, false)
 )))
 dataFrame.select(max("col")).first
 // returns org.apache.spark.sql.Row = [NaN]
 {code}
 h4. Solution
 The {{max}} and {{min}} functions should ignore NaN values, as they are not 
 numbers. If a column contains only NaN values, then the maximum and minimum 
 are not defined.






[jira] [Created] (SPARK-9971) MaxFunction not working correctly with columns containing Double.NaN

2015-08-14 Thread Frank Rosner (JIRA)
Frank Rosner created SPARK-9971:
---

 Summary: MaxFunction not working correctly with columns containing 
Double.NaN
 Key: SPARK-9971
 URL: https://issues.apache.org/jira/browse/SPARK-9971
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.1
Reporter: Frank Rosner


h5. Problem Description

When using the {{max}} function on a {{DoubleType}} column that contains 
{{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 

This is because it compares all values with the running maximum. However, 
{{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
{{x < Double.NaN}}.
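
The comparison behaviour can be seen without Spark. The following sketch uses plain Scala doubles. {{NaNComparison}} and {{runningMax}} are hypothetical names, and the running-maximum loop is a simplified model of the behaviour described above, not the Spark implementation.

```scala
object NaNComparison {
  // Keeps a running maximum and replaces it only when the next value
  // compares greater than the current maximum.
  def runningMax(xs: Seq[Double]): Double =
    xs.reduce((currentMax, x) => if (x > currentMax) x else currentMax)

  def main(args: Array[String]): Unit = {
    val nan = Double.NaN
    println(1.0 > nan)  // false
    println(1.0 < nan)  // false
    // NaN is the first value, and no later value compares greater than it,
    // so it is never replaced.
    println(runningMax(Seq(nan, -10d, 0d))) // NaN
  }
}
```

In this simplified model the result also depends on the position of the NaN: {{runningMax(Seq(-10d, Double.NaN, 0d))}} returns {{0.0}}, because the NaN never compares greater than the current maximum either.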

h5. How to Reproduce

{code}
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types._

val sql = new SQLContext(sc)
val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
val dataFrame = sql.createDataFrame(rdd, StructType(List(
  StructField("col", DoubleType, false)
)))
dataFrame.select(max("col")).first
// returns org.apache.spark.sql.Row = [NaN]
{code}

h5. Solution

The {{max}} and {{min}} functions should ignore NaN values, as they are not 
numbers. If a column contains only NaN values, then the maximum and minimum are 
not defined.
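
As a rough illustration of the proposed semantics on plain Scala collections (not a patch for Spark's aggregate functions), the sketch below skips NaN values and reports an undefined result as {{None}}. The names {{NaNAwareExtremes}}, {{maxIgnoringNaN}}, and {{minIgnoringNaN}} are made up for this example.

```scala
object NaNAwareExtremes {
  // Drops NaN values first; returns None when nothing is left, i.e. when the
  // input is empty or contains only NaN values.
  def maxIgnoringNaN(xs: Seq[Double]): Option[Double] =
    xs.filterNot(_.isNaN).reduceOption((a, b) => math.max(a, b))

  def minIgnoringNaN(xs: Seq[Double]): Option[Double] =
    xs.filterNot(_.isNaN).reduceOption((a, b) => math.min(a, b))

  def main(args: Array[String]): Unit = {
    val mixed = Seq(Double.NaN, -10d, 0d)
    println(maxIgnoringNaN(mixed))           // Some(0.0)
    println(minIgnoringNaN(mixed))           // Some(-10.0)
    println(maxIgnoringNaN(Seq(Double.NaN))) // None
  }
}
```

Returning {{Option}} is one way to express "not defined" in Scala. In a DataFrame the closest equivalent would presumably be a null result.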






[jira] [Updated] (SPARK-9971) MaxFunction not working correctly with columns containing Double.NaN

2015-08-14 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-9971:

Priority: Minor  (was: Major)

 MaxFunction not working correctly with columns containing Double.NaN
 

 Key: SPARK-9971
 URL: https://issues.apache.org/jira/browse/SPARK-9971
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.1
Reporter: Frank Rosner
Priority: Minor

 h5. Problem Description
 When using the {{max}} function on a {{DoubleType}} column that contains 
 {{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 
 This is because it compares all values with the running maximum. However, 
 {{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
 {{x < Double.NaN}}.
 h5. How to Reproduce
 {code}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.types._
 val sql = new SQLContext(sc)
 val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
 val dataFrame = sql.createDataFrame(rdd, StructType(List(
   StructField("col", DoubleType, false)
 )))
 dataFrame.select(max("col")).first
 // returns org.apache.spark.sql.Row = [NaN]
 {code}
 h5. Solution
 The {{max}} and {{min}} functions should ignore NaN values, as they are not 
 numbers. If a column contains only NaN values, then the maximum and minimum 
 are not defined.






[jira] [Updated] (SPARK-9971) MaxFunction not working correctly with columns containing Double.NaN

2015-08-14 Thread Frank Rosner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Rosner updated SPARK-9971:

Description: 
h4. Problem Description

When using the {{max}} function on a {{DoubleType}} column that contains 
{{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 

This is because it compares all values with the running maximum. However, 
{{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
{{x < Double.NaN}}.

h4. How to Reproduce

{code}
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types._

val sql = new SQLContext(sc)
val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
val dataFrame = sql.createDataFrame(rdd, StructType(List(
  StructField("col", DoubleType, false)
)))
dataFrame.select(max("col")).first
// returns org.apache.spark.sql.Row = [NaN]
{code}

h4. Solution

The {{max}} and {{min}} functions should ignore NaN values, as they are not 
numbers. If a column contains only NaN values, then the maximum and minimum are 
not defined.

  was:
h5. Problem Description

When using the {{max}} function on a {{DoubleType}} column that contains 
{{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 

This is because it compares all values with the running maximum. However, 
{{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
{{x < Double.NaN}}.

h5. How to Reproduce

{code}
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types._

val sql = new SQLContext(sc)
val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
val dataFrame = sql.createDataFrame(rdd, StructType(List(
  StructField("col", DoubleType, false)
)))
dataFrame.select(max("col")).first
// returns org.apache.spark.sql.Row = [NaN]
{code}

h5. Solution

The {{max}} and {{min}} functions should ignore NaN values, as they are not 
numbers. If a column contains only NaN values, then the maximum and minimum are 
not defined.


 MaxFunction not working correctly with columns containing Double.NaN
 

 Key: SPARK-9971
 URL: https://issues.apache.org/jira/browse/SPARK-9971
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.1
Reporter: Frank Rosner
Priority: Minor

 h4. Problem Description
 When using the {{max}} function on a {{DoubleType}} column that contains 
 {{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 
 This is because it compares all values with the running maximum. However, 
 {{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
 {{x < Double.NaN}}.
 h4. How to Reproduce
 {code}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.types._
 val sql = new SQLContext(sc)
 val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
 val dataFrame = sql.createDataFrame(rdd, StructType(List(
   StructField("col", DoubleType, false)
 )))
 dataFrame.select(max("col")).first
 // returns org.apache.spark.sql.Row = [NaN]
 {code}
 h4. Solution
 The {{max}} and {{min}} functions should ignore NaN values, as they are not 
 numbers. If a column contains only NaN values, then the maximum and minimum 
 are not defined.






[jira] [Commented] (SPARK-9971) MaxFunction not working correctly with columns containing Double.NaN

2015-08-14 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696760#comment-14696760
 ] 

Frank Rosner commented on SPARK-9971:
-

I would like to provide a patch to make the following unit tests in 
{{DataFrameFunctionsSuite}} succeed:

{code}
  test("max function ignoring Double.NaN") {
    val df = Seq(
      (Double.NaN, Double.NaN),
      (-10d, Double.NaN),
      (10d, Double.NaN)
    ).toDF("col1", "col2")
    // col1 mixes NaN with regular values, so the NaN is skipped
    checkAnswer(
      df.select(max("col1")),
      Seq(Row(10d))
    )
    // col2 contains only NaN values
    checkAnswer(
      df.select(max("col2")),
      Seq(Row(Double.NaN))
    )
  }

  test("min function ignoring Double.NaN") {
    val df = Seq(
      (Double.NaN, Double.NaN),
      (-10d, Double.NaN),
      (10d, Double.NaN)
    ).toDF("col1", "col2")
    checkAnswer(
      df.select(min("col1")),
      Seq(Row(-10d))
    )
    // col2 contains only NaN values
    checkAnswer(
      df.select(min("col2")),
      Seq(Row(Double.NaN))
    )
  }
{code}

 MaxFunction not working correctly with columns containing Double.NaN
 

 Key: SPARK-9971
 URL: https://issues.apache.org/jira/browse/SPARK-9971
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.1
Reporter: Frank Rosner
Priority: Minor

 h4. Problem Description
 When using the {{max}} function on a {{DoubleType}} column that contains 
 {{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 
 This is because it compares all values with the running maximum. However, 
 {{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
 {{x < Double.NaN}}.
 h4. How to Reproduce
 {code}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.types._
 val sql = new SQLContext(sc)
 val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
 val dataFrame = sql.createDataFrame(rdd, StructType(List(
   StructField("col", DoubleType, false)
 )))
 dataFrame.select(max("col")).first
 // returns org.apache.spark.sql.Row = [NaN]
 {code}
 h4. Solution
 The {{max}} and {{min}} functions should ignore NaN values, as they are not 
 numbers. If a column contains only NaN values, then the maximum and minimum 
 are not defined.






[jira] [Commented] (SPARK-9971) MaxFunction not working correctly with columns containing Double.NaN

2015-08-14 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696766#comment-14696766
 ] 

Frank Rosner commented on SPARK-9971:
-

[~srowen] I get your point. But given your weird examples, I don't think that 
being consistent with the Scala collection library is desirable. When I work on 
a data set that contains values which are not numbers (e.g. from a division by zero) 
and I want to compute the maximum, I find it convenient not to get NaN back. NaN 
is not a number, so by definition it cannot be the maximum.

 MaxFunction not working correctly with columns containing Double.NaN
 

 Key: SPARK-9971
 URL: https://issues.apache.org/jira/browse/SPARK-9971
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.1
Reporter: Frank Rosner
Priority: Minor

 h4. Problem Description
 When using the {{max}} function on a {{DoubleType}} column that contains 
 {{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 
 This is because it compares all values with the running maximum. However, 
 {{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
 {{x < Double.NaN}}.
 h4. How to Reproduce
 {code}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.types._
 val sql = new SQLContext(sc)
 val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
 val dataFrame = sql.createDataFrame(rdd, StructType(List(
   StructField("col", DoubleType, false)
 )))
 dataFrame.select(max("col")).first
 // returns org.apache.spark.sql.Row = [NaN]
 {code}
 h4. Solution
 The {{max}} and {{min}} functions should ignore NaN values, as they are not 
 numbers. If a column contains only NaN values, then the maximum and minimum 
 are not defined.






[jira] [Comment Edited] (SPARK-9971) MaxFunction not working correctly with columns containing Double.NaN

2015-08-14 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696760#comment-14696760
 ] 

Frank Rosner edited comment on SPARK-9971 at 8/14/15 9:34 AM:
--

I would like to provide a patch to make the following unit tests in 
{{DataFrameFunctionsSuite}} succeed:

{code}
  test("max function ignoring Double.NaN") {
    val df = Seq(
      (Double.NaN, Double.NaN),
      (-10d, Double.NaN),
      (10d, Double.NaN)
    ).toDF("col1", "col2")
    // col1 mixes NaN with regular values, so the NaN is skipped
    checkAnswer(
      df.select(max("col1")),
      Seq(Row(10d))
    )
    // col2 contains only NaN values, so the maximum is undefined
    checkAnswer(
      df.select(max("col2")),
      Seq(Row(null))
    )
  }

  test("min function ignoring Double.NaN") {
    val df = Seq(
      (Double.NaN, Double.NaN),
      (-10d, Double.NaN),
      (10d, Double.NaN)
    ).toDF("col1", "col2")
    checkAnswer(
      df.select(min("col1")),
      Seq(Row(-10d))
    )
    // col2 contains only NaN values, so the minimum is undefined
    checkAnswer(
      df.select(min("col2")),
      Seq(Row(null))
    )
  }
{code}


was (Author: frosner):
I would like to provide a patch to make the following unit tests in 
{{DataFrameFunctionsSuite}} succeed:

{code}
  test("max function ignoring Double.NaN") {
    val df = Seq(
      (Double.NaN, Double.NaN),
      (-10d, Double.NaN),
      (10d, Double.NaN)
    ).toDF("col1", "col2")
    checkAnswer(
      df.select(max("col1")),
      Seq(Row(10d))
    )
    checkAnswer(
      df.select(max("col2")),
      Seq(Row(Double.NaN))
    )
  }

  test("min function ignoring Double.NaN") {
    val df = Seq(
      (Double.NaN, Double.NaN),
      (-10d, Double.NaN),
      (10d, Double.NaN)
    ).toDF("col1", "col2")
    checkAnswer(
      df.select(min("col1")),
      Seq(Row(-10d))
    )
    checkAnswer(
      df.select(min("col2")),
      Seq(Row(Double.NaN))
    )
  }
{code}

 MaxFunction not working correctly with columns containing Double.NaN
 

 Key: SPARK-9971
 URL: https://issues.apache.org/jira/browse/SPARK-9971
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.4.1
Reporter: Frank Rosner
Priority: Minor

 h4. Problem Description
 When using the {{max}} function on a {{DoubleType}} column that contains 
 {{Double.NaN}} values, the returned maximum value will be {{Double.NaN}}. 
 This is because it compares all values with the running maximum. However, 
 {{x > Double.NaN}} always evaluates to false for all {{x: Double}}, and so does 
 {{x < Double.NaN}}.
 h4. How to Reproduce
 {code}
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.types._
 val sql = new SQLContext(sc)
 val rdd = sc.makeRDD(List(Row(Double.NaN), Row(-10d), Row(0d)))
 val dataFrame = sql.createDataFrame(rdd, StructType(List(
   StructField("col", DoubleType, false)
 )))
 dataFrame.select(max("col")).first
 // returns org.apache.spark.sql.Row = [NaN]
 {code}
 h4. Solution
 The {{max}} and {{min}} functions should ignore NaN values, as they are not 
 numbers. If a column contains only NaN values, then the maximum and minimum 
 are not defined.






[jira] [Commented] (SPARK-6480) histogram() bucket function is wrong in some simple edge cases

2015-03-26 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381588#comment-14381588
 ] 

Frank Rosner commented on SPARK-6480:
-

[~srowen] will do today!

 histogram() bucket function is wrong in some simple edge cases
 --

 Key: SPARK-6480
 URL: https://issues.apache.org/jira/browse/SPARK-6480
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Sean Owen
Assignee: Sean Owen

 (Credit to a customer report here) This test would fail now: 
 {code}
 val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
 assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
 {code}
 Because it returns 3, 1, 0. The problem ultimately traces to the 'fast' 
 bucket function that judges buckets based on a multiple of the gap between 
 first and second elements. Errors multiply and the end of the final bucket 
 fails to include the max.
 Fairly plausible use case actually.
 This can be tightened up easily with a slightly better expression. It will 
 also fix this test, which is actually expecting the wrong answer:
 {code}
 val rdd = sc.parallelize(6 to 99)
 val (histogramBuckets, histogramResults) = rdd.histogram(9)
 val expectedHistogramResults =
   Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
 {code}
 (Should be {{Array(11, 10, 10, 11, 10, 10, 11, 10, 11)}})
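
One way to read the "slightly better expression" mentioned above is to compute the bucket from a value's position in the whole range instead of from the width of the first bucket, and to clamp the maximum into the last bucket. The sketch below follows that reading on plain Scala collections. {{EvenBuckets}}, {{bucketIndex}}, and {{histogram}} are hypothetical names, and the thread does not say whether the eventual patch takes this exact form.

```scala
object EvenBuckets {
  // Maps a value to one of `count` evenly spaced buckets over [min, max].
  // The index is derived from the full range, and the raw index of the
  // maximum (which equals `count`) is clamped into the last bucket.
  def bucketIndex(min: Double, max: Double, count: Int)(e: Double): Option[Int] =
    if (e.isNaN || e < min || e > max) None
    else {
      val raw = (((e - min) / (max - min)) * count).toInt
      Some(math.min(raw, count - 1))
    }

  // Counts how many values fall into each of `count` evenly spaced buckets
  // between the smallest and the largest value.
  def histogram(values: Seq[Double], count: Int): Array[Long] = {
    val lo = values.reduce((a, b) => math.min(a, b))
    val hi = values.reduce((a, b) => math.max(a, b))
    val counts = new Array[Long](count)
    values.foreach { v =>
      bucketIndex(lo, hi, count)(v).foreach(i => counts(i) += 1)
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    val values = Seq(1, 1, 1, 2, 3, 3).map(_.toDouble)
    println(histogram(values, 3).mkString(", ")) // 3, 1, 2
  }
}
```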






[jira] [Commented] (SPARK-6480) histogram() bucket function is wrong in some simple edge cases

2015-03-24 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377611#comment-14377611
 ] 

Frank Rosner commented on SPARK-6480:
-

Thanks for picking it up [~srowen]!

 histogram() bucket function is wrong in some simple edge cases
 --

 Key: SPARK-6480
 URL: https://issues.apache.org/jira/browse/SPARK-6480
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Sean Owen
Assignee: Sean Owen

 (Credit to a customer report here) This test would fail now: 
 {code}
 val rdd = sc.parallelize(Seq(1, 1, 1, 2, 3, 3))
 assert(Array(3, 1, 2) === rdd.map(_.toDouble).histogram(3)._2)
 {code}
 Because it returns 3, 1, 0. The problem ultimately traces to the 'fast' 
 bucket function that judges buckets based on a multiple of the gap between 
 first and second elements. Errors multiply and the end of the final bucket 
 fails to include the max.
 Fairly plausible use case actually.
 This can be tightened up easily with a slightly better expression. It will 
 also fix this test, which is actually expecting the wrong answer:
 {code}
 val rdd = sc.parallelize(6 to 99)
 val (histogramBuckets, histogramResults) = rdd.histogram(9)
 val expectedHistogramResults =
   Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
 {code}
 (Should be {{Array(11, 10, 10, 11, 10, 10, 11, 10, 11)}})






[jira] [Commented] (SPARK-2620) case class cannot be used as key for reduce

2015-01-21 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14285381#comment-14285381
 ] 

Frank Rosner commented on SPARK-2620:
-

Why does the Spark REPL wrap user code in classes instead of objects? Is 
this related to how the code gets sent to the nodes? Can't we change it?

 case class cannot be used as key for reduce
 ---

 Key: SPARK-2620
 URL: https://issues.apache.org/jira/browse/SPARK-2620
 Project: Spark
  Issue Type: Bug
  Components: Spark Shell
Affects Versions: 1.0.0, 1.1.0
 Environment: reproduced on spark-shell local[4]
Reporter: Gerard Maas
Assignee: Tobias Schlatter
Priority: Critical
  Labels: case-class, core

 Using a case class as a key doesn't seem to work properly on Spark 1.0.0
 A minimal example:
 case class P(name: String)
 val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
 sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), 
 (P(bob),1), (P(abe),1), (P(charly),1))
 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
 groupByKey and distinct also present the same behavior.






[jira] [Commented] (SPARK-2620) case class cannot be used as key for reduce

2015-01-19 Thread Frank Rosner (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282323#comment-14282323
 ] 

Frank Rosner commented on SPARK-2620:
-

The issue is caused by the fact that pattern matching of case classes does not 
work in the Spark shell.

Reducing by key (like grouping, distinct, etc.) relies on equality 
checking of objects. Case classes implement equality checking by using pattern 
matching to perform type checking and conversion. When I implement a custom 
equals method for the case class that checks the type with {{isInstanceOf}} and 
casts with {{asInstanceOf}}, then {{==}} works and so do distinct and key-based 
operations.
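
For reference, the workaround described above could look roughly like the following sketch. It only shows the shape of the custom {{equals}}; it does not reproduce the REPL problem, and it uses plain Scala collections instead of an RDD so that it runs without Spark. The {{EqualsWorkaroundSketch}} object and its {{countByKey}} helper are hypothetical names introduced for illustration.

```scala
// Case class with a hand-written equals that uses an explicit type test
// and cast instead of the compiler-generated implementation.
case class P(name: String) {
  override def equals(other: Any): Boolean =
    other.isInstanceOf[P] && other.asInstanceOf[P].name == name

  // Keep hashCode consistent with equals so that hash-based grouping works.
  override def hashCode: Int = name.hashCode
}

object EqualsWorkaroundSketch {
  // Local stand-in for a key-based aggregation such as
  // map(x => (x, 1)).reduceByKey(_ + _) on an RDD (hypothetical helper).
  def countByKey[K](keys: Seq[K]): Map[K, Int] =
    keys.groupBy(identity).map { case (k, vs) => (k, vs.size) }

  def main(args: Array[String]): Unit = {
    val ps = Seq(P("alice"), P("bob"), P("charly"), P("bob"))
    println(countByKey(ps))
  }
}
```

Overriding {{hashCode}} together with {{equals}} matters here because key-based operations typically hash the key before comparing it.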

Does it make sense to break this down and rework this issue to cover the actual 
problem rather than a symptom?

Are there any plans to fix this issue? It requires extra effort when 
working with case classes in the REPL, and in particular it blocks rapid data 
exploration and analytics. I will try to dig into the code and see whether I 
can find a solution, but this seems to have been under discussion for quite a 
while now.

 case class cannot be used as key for reduce
 ---

 Key: SPARK-2620
 URL: https://issues.apache.org/jira/browse/SPARK-2620
 Project: Spark
  Issue Type: Bug
  Components: Spark Shell
Affects Versions: 1.0.0, 1.1.0
 Environment: reproduced on spark-shell local[4]
Reporter: Gerard Maas
Assignee: Tobias Schlatter
Priority: Critical
  Labels: case-class, core

 Using a case class as a key doesn't seem to work properly on Spark 1.0.0
 A minimal example:
 case class P(name: String)
 val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
 sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), 
 (P(bob),1), (P(abe),1), (P(charly),1))
 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey((x, y) => x + y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
 groupByKey and distinct also present the same behavior.


