[jira] [Updated] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24588:

Description: 
In https://github.com/apache/spark/pull/19080, we simplified the distribution/partitioning framework and made all the join-like operators require HashClusteredPartitioning from their children. Unfortunately, the streaming join operator was missed.

This can cause wrong results. Consider:

{code}
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
{code}

The physical plan is

{code}
*(3) Project [a#5, b#6, c#7, c#14]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = , runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(a#12, b#13, 5)
      +- Exchange hashpartitioning(b#13, 5)
         +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
            +- StreamingRelation MemoryStream[value#3], [value#3]
{code}

The left table is hash partitioned by (a, b), while the right table is hash partitioned by b only. This means a pair of matching records can end up in different partitions, so the match should appear in the join output but does not.
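To make the failure mode concrete, below is a minimal, self-contained sketch in plain Scala. The PartitionMismatchDemo object and its partitionOf helper are hypothetical stand-ins for Spark's hash partitioner; the point is only that a record hashed by (a, b) on one side and by b alone on the other will usually land in different partitions, so a per-partition join never sees the matching pair.

{code:scala}
object PartitionMismatchDemo {
  val numPartitions = 5

  // Simplified stand-in for hash partitioning on a tuple of key values.
  // Spark's real partitioner hashes rows differently; what matters here is
  // only that the resulting partition id depends on WHICH keys are hashed.
  def partitionOf(keys: Any*): Int =
    Math.floorMod(keys.toSeq.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val a = 3
    val b = a * 2                            // a matching record: a = 3, b = 6
    val leftPartition  = partitionOf(a, b)   // left side: hashpartitioning(a, b)
    val rightPartition = partitionOf(b)      // right side: hashpartitioning(b)
    println(s"left -> partition $leftPartition, right -> partition $rightPartition")
    // Whenever these two differ, a per-partition join never sees the pair,
    // and the match is silently dropped from the result.
  }
}
{code}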

  was:
In https://github.com/apache/spark/pull/19080, we simplified the distribution/partitioning framework and made all the join-like operators require HashClusteredPartitioning from their children. Unfortunately, the streaming join operator was missed.

This can cause wrong results. Consider:

val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
The physical plan is

*(3) Project [a#5, b#6, c#7, c#14]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = , runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(a#12, b#13, 5)
      +- Exchange hashpartitioning(b#13, 5)
         +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
            +- StreamingRelation MemoryStream[value#3], [value#3]
The left table is hash partitioned by (a, b), while the right table is hash partitioned by b only. This means a pair of matching records can end up in different partitions, so the match should appear in the join output but does not.


> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Blocker
>  Labels: correctness
>
> In https://github.com/apache/spark/pull/19080, we simplified the 
> distribution/partitioning framework, and make all the join-like operators 
> require HashClusteredPartitioning from children. Unfortunately streaming join 
> operator was missed.
> This can cause wrong result. Think about
> {code}
> val input1 = MemoryStream[Int]
> val input2 = MemoryStream[Int]
> val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
> val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
> val joined = df1.join(df2, Seq("a", "b")).select('a)
> {code}
> The physical plan is
> {code}
> *(3) Project [a#5, b#6, c#7, c#14]
> +- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ 
> leftOnly = null, rightOnly = null, both = null, full = null ], state info [ 
> checkpoint = , runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 
> 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
>:- Exchange hashpartitioning(a#5, b#6, 5)
>:  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS 
> c#7]
>: 

[jira] [Updated] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24588:

Description: 
In https://github.com/apache/spark/pull/19080, we simplified the distribution/partitioning framework and made all the join-like operators require HashClusteredPartitioning from their children. Unfortunately, the streaming join operator was missed.

This can cause wrong results. Consider:

val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
The physical plan is

*(3) Project [a#5, b#6, c#7, c#14]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = , runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(a#12, b#13, 5)
      +- Exchange hashpartitioning(b#13, 5)
         +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
            +- StreamingRelation MemoryStream[value#3], [value#3]
The left table is hash partitioned by (a, b), while the right table is hash partitioned by b only. This means a pair of matching records can end up in different partitions, so the match should appear in the join output but does not.

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Blocker
>  Labels: correctness
>
> In https://github.com/apache/spark/pull/19080, we simplified the 
> distribution/partitioning framework, and make all the join-like operators 
> require HashClusteredPartitioning from children. Unfortunately streaming join 
> operator was missed.
> This can cause wrong result. Think about
> val input1 = MemoryStream[Int]
> val input2 = MemoryStream[Int]
> val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
> val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
> val joined = df1.join(df2, Seq("a", "b")).select('a)
> The physical plan is
> *(3) Project [a#5, b#6, c#7, c#14]
> +- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ 
> leftOnly = null, rightOnly = null, both = null, full = null ], state info [ 
> checkpoint = , runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 
> 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
>:- Exchange hashpartitioning(a#5, b#6, 5)
>:  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS 
> c#7]
>: +- StreamingRelation MemoryStream[value#1], [value#1]
>+- Exchange hashpartitioning(a#12, b#13, 5)
>   +- Exchange hashpartitioning(b#13, 5)
>  +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 
> 4) AS c#14]
> +- StreamingRelation MemoryStream[value#3], [value#3]
> The left table is hash partitioned by a, b, while the right table is hash 
> partitioned by b. This means, we may have a matching record that is in 
> different partitions, which should be in the output but not.






[jira] [Updated] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24588:

Target Version/s: 2.3.2, 2.4.0

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Blocker
>  Labels: correctness
>







[jira] [Resolved] (SPARK-24542) Hive UDF series UDFXPathXXXX allow users to pass carefully crafted XML to access arbitrary files

2018-06-18 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-24542.
-
   Resolution: Fixed
Fix Version/s: 2.3.2
   2.4.0

Issue resolved by pull request 21549
[https://github.com/apache/spark/pull/21549]

> Hive UDF series UDFXPath allow users to pass carefully crafted XML to 
> access arbitrary files
> 
>
> Key: SPARK-24542
> URL: https://issues.apache.org/jira/browse/SPARK-24542
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.2, 2.2.1, 2.3.1
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Major
> Fix For: 2.4.0, 2.3.2
>
>
> Hive UDF series UDFXPath allow users to pass carefully crafted XML to 
> access arbitrary files. Spark does not have built-in access control. When 
> users use the external access control library, users might bypass them and 
> access the file contents. 






[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516621#comment-16516621
 ] 

Wenbo Zhao commented on SPARK-24578:


Hi [~irashid], many thanks for clarifying my questions. I tried to compare the logs between Spark 2.2.1 and 2.3.0 to see if there is anything interesting, but no luck so far. I added some logging to benchmark how long it takes to transfer ~500MB of data in 
[https://github.com/apache/spark/blob/v2.3.0/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java#L142].
 In Spark 2.2.1 it only took <= 2s, but in Spark 2.3.0 it is very slow, never finishes within 120s, and thus times out.

Are you able to reproduce the issue?
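As an illustration only, here is a generic Scala sketch of the timing pattern described above; the actual instrumentation was added inside MessageWithHeader.copyByteBuf in Java, and the timed helper below is hypothetical.

{code:scala}
// Hypothetical helper: wrap any block of code and log how long it took.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  val elapsedMs = (System.nanoTime() - start) / 1e6
  println(f"$label took $elapsedMs%.1f ms")
  result
}

// e.g. timed("copy ~500MB chunk") { /* call under test */ }
{code}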

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible for a small cluster of 2 executors (say host-1 
> and host-2) each with 8 cores. Here, the memory of driver and executors are 
> not an import factor here 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516611#comment-16516611
 ] 

Imran Rashid commented on SPARK-24578:
--

btw to answer your initial questions:

{quote}
1. what is the right behavior, should we re-compute or should we transfer block 
from remote?
2. if we should transfer from remote, why the performance is so bad for cache 
block?
{quote}

Spark should try to fetch from the remote, and if that fails for some reason, it should fall back to recomputing. So you should actually see a similar sequence of events in the logs in Spark 2.3.0 as you do in your Spark 2.2.1 snippet -- the difference being that the remote fetch fails in 2.3.0, and so it does the recomputation instead. The real issue here is why the remote fetches are failing. Unfortunately there isn't a ton of info in that stack trace -- are there any other warning messages before that in the logs?

I can see how setting spark.locality.wait lets you work around this, but it's definitely not an ideal solution.

I wouldn't rule out that commit you mentioned as the issue -- it's certainly possible.
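As a conceptual sketch only (not Spark's actual code, which lives in BlockManager / RDD.getOrCompute and is considerably more involved), the behavior described above amounts to:

{code:scala}
// Try the local cache, then a remote copy of the cached block, and only
// recompute the partition if the remote fetch fails.
def readCachedBlock[T](local: Option[T],
                       fetchRemote: () => Option[T],
                       recompute: () => T): T =
  local
    .orElse(fetchRemote())   // remote fetch of the cached block
    .getOrElse(recompute())  // fall back to recomputation on failure
{code}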

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516609#comment-16516609
 ] 

Wenbo Zhao commented on SPARK-24578:


For now, we can reproduce this issue in a completely different environment and with a different distribution of Spark 2.3.0. We also observed that setting spark.locality.wait to a big number, e.g. 60s, helps, but it reduces our system throughput significantly.
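For reference, a minimal sketch of the workaround mentioned above, assuming a SparkSession-based job; the 60s value is just the number from this thread, not a recommendation.

{code:scala}
import org.apache.spark.sql.SparkSession

// Raising spark.locality.wait makes the scheduler wait longer for a
// node-local slot before falling back to a non-local one, so tasks are less
// likely to have to pull large cached blocks from another executor.
val spark = SparkSession
  .builder()
  .appName("locality-wait-workaround")
  .config("spark.locality.wait", "60s")
  .getOrCreate()
{code}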

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible for a small cluster of 2 executors (say host-1 
> and host-2) each with 8 cores. Here, the memory of driver and executors are 
> not an import factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following in some of our production jobs:
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducer on a small cluster of 2 executors (say host-1 and host-2), each with 8 cores. The amount of driver and executor memory is not an important factor here, as long as it is big enough, say 20G.
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of around 7G in size, cache it, and then perform parallel DataFrame operations using `array.par.map`. Because of the parallel computation, with high probability some task will be scheduled on host-2, where it needs to read cached block data from host-1. This follows the code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then tries to transfer a big cached block (~500MB) from host-1 to host-2. Often, this big transfer makes the cluster suffer timeout issues (it will retry 3 times, each with 120s

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516602#comment-16516602
 ] 

Wenbo Zhao commented on SPARK-24578:


[~irashid] We didn't touch "spark.maxRemoteBlockSizeFetchToMem". You are right. After digging into more details, I don't think that commit is relevant.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible for a small cluster of 2 executors (say host-1 
> and host-2) each with 8 cores. Here, the memory of driver and executors are 
> not an import factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", 

[jira] [Commented] (SPARK-24591) Number of cores and executors in the cluster

2018-06-18 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516595#comment-16516595
 ] 

Apache Spark commented on SPARK-24591:
--

User 'MaxGekk' has created a pull request for this issue:
https://github.com/apache/spark/pull/21589

> Number of cores and executors in the cluster
> 
>
> Key: SPARK-24591
> URL: https://issues.apache.org/jira/browse/SPARK-24591
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Maxim Gekk
>Priority: Minor
>
> Need to add 2 new methods. The first one should return total number of CPU 
> cores of all executors in the cluster. The second one should give current 
> number of executors registered in the cluster.
> Main motivations for adding of those methods:
> 1. It is the best practice to manage job parallelism relative to available 
> cores, e.g., df.repartition(5 * sc.coreCount) . In particular, it is an 
> anti-pattern to leave a bunch of cores on large clusters twiddling their 
> thumb & doing nothing. Usually users pass predefined constants for 
> _repartition()_ and _coalesce()_. Selection of the constant is based on 
> current cluster size. If the code runs on another cluster and/or on the 
> resized cluster, they need to modify the constant each time. This happens 
> frequently when a job that normally runs on, say, an hour of data on a small 
> cluster needs to run on a week of data on a much larger cluster.
> 2. *spark.default.parallelism* can be used to get total number of cores in 
> the cluster but it can be redefined by user. The info can be taken via 
> registration of a listener but repeating the same looks ugly. We should 
> follow the DRY principle.
> 3. Regarding to executorsCount(), some jobs, e.g., local node ML training, 
> use a lot of parallelism. It's a common practice to aim to distribute such 
> jobs such that there is one partition for each executor. 
>  
> 4. In some places users collect this info, as well as other settings info 
> together with job timing (at the app level) for analysis. E.g., you can use 
> ML to determine optimal cluster size given different objectives, e.g., 
> fastest throughput vs. lowest cost per unit of processing.
> 5. The simpler argument is that basic cluster properties should be easily 
> discoverable via APIs.






[jira] [Assigned] (SPARK-24591) Number of cores and executors in the cluster

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24591:


Assignee: (was: Apache Spark)

> Number of cores and executors in the cluster
> 
>
> Key: SPARK-24591
> URL: https://issues.apache.org/jira/browse/SPARK-24591
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Maxim Gekk
>Priority: Minor
>
> Need to add 2 new methods. The first one should return total number of CPU 
> cores of all executors in the cluster. The second one should give current 
> number of executors registered in the cluster.
> Main motivations for adding of those methods:
> 1. It is the best practice to manage job parallelism relative to available 
> cores, e.g., df.repartition(5 * sc.coreCount) . In particular, it is an 
> anti-pattern to leave a bunch of cores on large clusters twiddling their 
> thumb & doing nothing. Usually users pass predefined constants for 
> _repartition()_ and _coalesce()_. Selection of the constant is based on 
> current cluster size. If the code runs on another cluster and/or on the 
> resized cluster, they need to modify the constant each time. This happens 
> frequently when a job that normally runs on, say, an hour of data on a small 
> cluster needs to run on a week of data on a much larger cluster.
> 2. *spark.default.parallelism* can be used to get total number of cores in 
> the cluster but it can be redefined by user. The info can be taken via 
> registration of a listener but repeating the same looks ugly. We should 
> follow the DRY principle.
> 3. Regarding to executorsCount(), some jobs, e.g., local node ML training, 
> use a lot of parallelism. It's a common practice to aim to distribute such 
> jobs such that there is one partition for each executor. 
>  
> 4. In some places users collect this info, as well as other settings info 
> together with job timing (at the app level) for analysis. E.g., you can use 
> ML to determine optimal cluster size given different objectives, e.g., 
> fastest throughput vs. lowest cost per unit of processing.
> 5. The simpler argument is that basic cluster properties should be easily 
> discoverable via APIs.






[jira] [Assigned] (SPARK-24591) Number of cores and executors in the cluster

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24591:


Assignee: Apache Spark

> Number of cores and executors in the cluster
> 
>
> Key: SPARK-24591
> URL: https://issues.apache.org/jira/browse/SPARK-24591
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Maxim Gekk
>Assignee: Apache Spark
>Priority: Minor
>
> Need to add 2 new methods. The first one should return total number of CPU 
> cores of all executors in the cluster. The second one should give current 
> number of executors registered in the cluster.
> Main motivations for adding of those methods:
> 1. It is the best practice to manage job parallelism relative to available 
> cores, e.g., df.repartition(5 * sc.coreCount) . In particular, it is an 
> anti-pattern to leave a bunch of cores on large clusters twiddling their 
> thumb & doing nothing. Usually users pass predefined constants for 
> _repartition()_ and _coalesce()_. Selection of the constant is based on 
> current cluster size. If the code runs on another cluster and/or on the 
> resized cluster, they need to modify the constant each time. This happens 
> frequently when a job that normally runs on, say, an hour of data on a small 
> cluster needs to run on a week of data on a much larger cluster.
> 2. *spark.default.parallelism* can be used to get total number of cores in 
> the cluster but it can be redefined by user. The info can be taken via 
> registration of a listener but repeating the same looks ugly. We should 
> follow the DRY principle.
> 3. Regarding to executorsCount(), some jobs, e.g., local node ML training, 
> use a lot of parallelism. It's a common practice to aim to distribute such 
> jobs such that there is one partition for each executor. 
>  
> 4. In some places users collect this info, as well as other settings info 
> together with job timing (at the app level) for analysis. E.g., you can use 
> ML to determine optimal cluster size given different objectives, e.g., 
> fastest throughput vs. lowest cost per unit of processing.
> 5. The simpler argument is that basic cluster properties should be easily 
> discoverable via APIs.






[jira] [Created] (SPARK-24591) Number of cores and executors in the cluster

2018-06-18 Thread Maxim Gekk (JIRA)
Maxim Gekk created SPARK-24591:
--

 Summary: Number of cores and executors in the cluster
 Key: SPARK-24591
 URL: https://issues.apache.org/jira/browse/SPARK-24591
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.1
Reporter: Maxim Gekk


We need to add two new methods. The first should return the total number of CPU cores across all executors in the cluster. The second should give the current number of executors registered in the cluster.

Main motivations for adding these methods:

1. It is best practice to manage job parallelism relative to the available cores, e.g., df.repartition(5 * sc.coreCount) (see the sketch below). In particular, it is an anti-pattern to leave a bunch of cores on large clusters twiddling their thumbs and doing nothing. Usually users pass predefined constants to _repartition()_ and _coalesce()_, chosen based on the current cluster size. If the code runs on another cluster and/or on a resized cluster, they need to modify the constant each time. This happens frequently when a job that normally runs on, say, an hour of data on a small cluster needs to run on a week of data on a much larger cluster.

2. *spark.default.parallelism* can be used to get the total number of cores in the cluster, but it can be redefined by the user. The info can also be obtained by registering a listener, but repeating that everywhere looks ugly. We should follow the DRY principle.

3. Regarding executorsCount(), some jobs, e.g., local-node ML training, use a lot of parallelism. It is common practice to distribute such jobs so that there is one partition per executor.
 
4. In some places users collect this info, as well as other settings, together with job timing (at the app level) for analysis. E.g., you can use ML to determine the optimal cluster size given different objectives, such as fastest throughput vs. lowest cost per unit of processing.

5. The simpler argument is that basic cluster properties should be easily discoverable via APIs.
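A rough, hedged sketch of what such helpers could look like with today's public APIs (not the proposed methods themselves; ClusterInfo, executorCount and coreCount are hypothetical names, and the core count assumes a homogeneous cluster with spark.executor.cores set explicitly):

{code:scala}
import org.apache.spark.SparkContext

object ClusterInfo {
  // Executors currently registered; getExecutorMemoryStatus also includes the
  // driver's block manager, hence the "- 1" (assumes a non-local deployment).
  def executorCount(sc: SparkContext): Int =
    sc.getExecutorMemoryStatus.size - 1

  // Approximate total cores: executor count times spark.executor.cores.
  // Only an illustration; the effective default differs per cluster manager.
  def coreCount(sc: SparkContext): Int =
    executorCount(sc) * sc.getConf.get("spark.executor.cores", "1").toInt
}

// Usage in the spirit of the proposal:
//   df.repartition(5 * ClusterInfo.coreCount(sc))
{code}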






[jira] [Commented] (SPARK-24590) Make Jenkins tests passed with hadoop 3 profile

2018-06-18 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516586#comment-16516586
 ] 

Apache Spark commented on SPARK-24590:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/21588

> Make Jenkins tests passed with hadoop 3 profile
> ---
>
> Key: SPARK-24590
> URL: https://issues.apache.org/jira/browse/SPARK-24590
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Hyukjin Kwon
>Priority: Major
>
> Currently, some tests are being failed with hadoop-3 profile.
> Given PR builder 
> (https://github.com/apache/spark/pull/21441#issuecomment-397818337), it 
> reported:
> {code}
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-8020: set sql conf in 
> spark conf
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-9757 Persist Parquet 
> relation with decimal column
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.ConnectionURL
> org.apache.spark.sql.hive.StatisticsSuite.SPARK-22745 - read Hive's 
> statistics for partition
> org.apache.spark.sql.hive.StatisticsSuite.alter table rename after analyze 
> table
> org.apache.spark.sql.hive.StatisticsSuite.alter table SET TBLPROPERTIES after 
> analyze table
> org.apache.spark.sql.hive.StatisticsSuite.alter table UNSET TBLPROPERTIES 
> after analyze table
> org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a 
> sbt.testing.SuiteSelector)
> org.apache.spark.sql.hive.client.VersionsSuite.success sanity check
> org.apache.spark.sql.hive.client.VersionsSuite.hadoop configuration preserved 
> 75 ms
> org.apache.spark.sql.hive.client.VersionsSuite.*: * (roughly)
> org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite.basic DDL using 
> locale tr - caseSensitive true
> org.apache.spark.sql.hive.execution.HiveDDLSuite.create Hive-serde table and 
> view with unicode columns and comment
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for non-compatible DataSource tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for Hive-compatible DataSource tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for Hive tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE with incompatible schema on Hive-compatible table
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.(It is not a test it is 
> a sbt.testing.SuiteSelector)
> org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from 
> a hive table with a new column - orc
> org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from 
> a hive table with a new column - parquet
> org.apache.spark.sql.hive.orc.HiveOrcSourceSuite.SPARK-19459/SPARK-18220: 
> read char/varchar column written by Hive
> {code}






[jira] [Assigned] (SPARK-24590) Make Jenkins tests passed with hadoop 3 profile

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24590:


Assignee: Apache Spark

> Make Jenkins tests passed with hadoop 3 profile
> ---
>
> Key: SPARK-24590
> URL: https://issues.apache.org/jira/browse/SPARK-24590
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Hyukjin Kwon
>Assignee: Apache Spark
>Priority: Major
>
> Currently, some tests are being failed with hadoop-3 profile.
> Given PR builder 
> (https://github.com/apache/spark/pull/21441#issuecomment-397818337), it 
> reported:
> {code}
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-8020: set sql conf in 
> spark conf
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-9757 Persist Parquet 
> relation with decimal column
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.ConnectionURL
> org.apache.spark.sql.hive.StatisticsSuite.SPARK-22745 - read Hive's 
> statistics for partition
> org.apache.spark.sql.hive.StatisticsSuite.alter table rename after analyze 
> table
> org.apache.spark.sql.hive.StatisticsSuite.alter table SET TBLPROPERTIES after 
> analyze table
> org.apache.spark.sql.hive.StatisticsSuite.alter table UNSET TBLPROPERTIES 
> after analyze table
> org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a 
> sbt.testing.SuiteSelector)
> org.apache.spark.sql.hive.client.VersionsSuite.success sanity check
> org.apache.spark.sql.hive.client.VersionsSuite.hadoop configuration preserved 
> 75 ms
> org.apache.spark.sql.hive.client.VersionsSuite.*: * (roughly)
> org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite.basic DDL using 
> locale tr - caseSensitive true
> org.apache.spark.sql.hive.execution.HiveDDLSuite.create Hive-serde table and 
> view with unicode columns and comment
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for non-compatible DataSource tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for Hive-compatible DataSource tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for Hive tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE with incompatible schema on Hive-compatible table
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.(It is not a test it is 
> a sbt.testing.SuiteSelector)
> org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from 
> a hive table with a new column - orc
> org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from 
> a hive table with a new column - parquet
> org.apache.spark.sql.hive.orc.HiveOrcSourceSuite.SPARK-19459/SPARK-18220: 
> read char/varchar column written by Hive
> {code}






[jira] [Assigned] (SPARK-24590) Make Jenkins tests passed with hadoop 3 profile

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24590:


Assignee: (was: Apache Spark)

> Make Jenkins tests passed with hadoop 3 profile
> ---
>
> Key: SPARK-24590
> URL: https://issues.apache.org/jira/browse/SPARK-24590
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Hyukjin Kwon
>Priority: Major
>
> Currently, some tests are being failed with hadoop-3 profile.
> Given PR builder 
> (https://github.com/apache/spark/pull/21441#issuecomment-397818337), it 
> reported:
> {code}
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-8020: set sql conf in 
> spark conf
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-9757 Persist Parquet 
> relation with decimal column
> org.apache.spark.sql.hive.HiveSparkSubmitSuite.ConnectionURL
> org.apache.spark.sql.hive.StatisticsSuite.SPARK-22745 - read Hive's 
> statistics for partition
> org.apache.spark.sql.hive.StatisticsSuite.alter table rename after analyze 
> table
> org.apache.spark.sql.hive.StatisticsSuite.alter table SET TBLPROPERTIES after 
> analyze table
> org.apache.spark.sql.hive.StatisticsSuite.alter table UNSET TBLPROPERTIES 
> after analyze table
> org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a 
> sbt.testing.SuiteSelector)
> org.apache.spark.sql.hive.client.VersionsSuite.success sanity check
> org.apache.spark.sql.hive.client.VersionsSuite.hadoop configuration preserved 
> 75 ms
> org.apache.spark.sql.hive.client.VersionsSuite.*: * (roughly)
> org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite.basic DDL using 
> locale tr - caseSensitive true
> org.apache.spark.sql.hive.execution.HiveDDLSuite.create Hive-serde table and 
> view with unicode columns and comment
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for non-compatible DataSource tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for Hive-compatible DataSource tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE for Hive tables
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER 
> TABLE with incompatible schema on Hive-compatible table
> org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.(It is not a test it is 
> a sbt.testing.SuiteSelector)
> org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from 
> a hive table with a new column - orc
> org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from 
> a hive table with a new column - parquet
> org.apache.spark.sql.hive.orc.HiveOrcSourceSuite.SPARK-19459/SPARK-18220: 
> read char/varchar column written by Hive
> {code}






[jira] [Created] (SPARK-24590) Make Jenkins tests passed with hadoop 3 profile

2018-06-18 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created SPARK-24590:


 Summary: Make Jenkins tests passed with hadoop 3 profile
 Key: SPARK-24590
 URL: https://issues.apache.org/jira/browse/SPARK-24590
 Project: Spark
  Issue Type: Sub-task
  Components: Build
Affects Versions: 2.4.0
Reporter: Hyukjin Kwon


Currently, some tests are failing with the hadoop-3 profile.

The PR builder (https://github.com/apache/spark/pull/21441#issuecomment-397818337) reported the following failures:

{code}
org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-8020: set sql conf in 
spark conf
org.apache.spark.sql.hive.HiveSparkSubmitSuite.SPARK-9757 Persist Parquet 
relation with decimal column
org.apache.spark.sql.hive.HiveSparkSubmitSuite.ConnectionURL
org.apache.spark.sql.hive.StatisticsSuite.SPARK-22745 - read Hive's statistics 
for partition
org.apache.spark.sql.hive.StatisticsSuite.alter table rename after analyze table
org.apache.spark.sql.hive.StatisticsSuite.alter table SET TBLPROPERTIES after 
analyze table
org.apache.spark.sql.hive.StatisticsSuite.alter table UNSET TBLPROPERTIES after 
analyze table
org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a 
sbt.testing.SuiteSelector)
org.apache.spark.sql.hive.client.VersionsSuite.success sanity check
org.apache.spark.sql.hive.client.VersionsSuite.hadoop configuration preserved   
75 ms
org.apache.spark.sql.hive.client.VersionsSuite.*: * (roughly)
org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite.basic DDL using 
locale tr - caseSensitive true
org.apache.spark.sql.hive.execution.HiveDDLSuite.create Hive-serde table and 
view with unicode columns and comment
org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER TABLE 
for non-compatible DataSource tables
org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER TABLE 
for Hive-compatible DataSource tables
org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER TABLE 
for Hive tables
org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.SPARK-21617: ALTER TABLE 
with incompatible schema on Hive-compatible table
org.apache.spark.sql.hive.execution.Hive_2_1_DDLSuite.(It is not a test it is a 
sbt.testing.SuiteSelector)
org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from a 
hive table with a new column - orc
org.apache.spark.sql.hive.execution.SQLQuerySuite.SPARK-18355 Read data from a 
hive table with a new column - parquet
org.apache.spark.sql.hive.orc.HiveOrcSourceSuite.SPARK-19459/SPARK-18220: read 
char/varchar column written by Hive
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516527#comment-16516527
 ] 

Imran Rashid commented on SPARK-24578:
--

[~wbzhao] [~icexelloss] you're saying this is *without* touching the value of 
"spark.maxRemoteBlockSizeFetchToMem", right?  If you aren't turning on 
fetch-to-disk, then the behavior shouldn't change in 2.3.0 (though of course 
there may be bugs).


cc [~jerryshao] [~attilapiros]
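
For reference, fetch-to-disk is controlled by that setting; a minimal, purely 
illustrative way to enable it explicitly (the value below is an arbitrary 
example, not a recommendation):

{code:scala}
// Illustrative only: fetch-to-disk is opt-in via this size threshold.
// "200m" is an arbitrary example value, not a recommendation.
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.maxRemoteBlockSizeFetchToMem", "200m")
{code}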

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducer for a cluster of 2 executors (say host-1 and 
> host-2), each with 8 cores. The memory of the driver and executors is not an 
> important factor here, as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> 

[jira] [Updated] (SPARK-24552) Task attempt numbers are reused when stages are retried

2018-06-18 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin updated SPARK-24552:
---
Target Version/s: 2.1.3, 2.2.2, 2.3.2

Added some target versions. We should take the chance to fix this now.

> Task attempt numbers are reused when stages are retried
> ---
>
> Key: SPARK-24552
> URL: https://issues.apache.org/jira/browse/SPARK-24552
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1, 2.2.0, 2.2.1, 2.3.0, 2.3.1
>Reporter: Ryan Blue
>Priority: Blocker
>
> When stages are retried due to shuffle failures, task attempt numbers are 
> reused. This causes a correctness bug in the v2 data sources write path.
> Data sources (both the original and v2) pass the task attempt to writers so 
> that writers can use the attempt number to track and clean up data from 
> failed or speculative attempts. In the v2 docs for DataWriterFactory, the 
> attempt number's javadoc states that "Implementations can use this attempt 
> number to distinguish writers of different task attempts."
> When two attempts of a stage use the same (partition, attempt) pair, two 
> tasks can create the same data and attempt to commit. The commit coordinator 
> prevents both from committing and will abort the attempt that finishes last. 
> When using the (partition, attempt) pair to track data, the aborted task may 
> delete data associated with the (partition, attempt) pair. If that happens, 
> the data for the task that committed is also deleted, which is a 
> correctness bug.
> For a concrete example, I have a data source that creates files in place 
> named with {{part---.}}. Because these 
> files are written in place, both tasks create the same file and the one that 
> is aborted deletes the file, leading to data corruption when the file is 
> added to the table.
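
To make the failure mode concrete, here is a small sketch with a made-up naming 
scheme (the reporter's actual pattern is elided above): if the output path 
depends only on the (partition, attemptNumber) pair, two attempts of the same 
stage can produce, and then delete, the same file.

{code:scala}
// Made-up naming scheme (the reporter's actual pattern is elided above): if the
// output path depends only on (partition, attemptNumber), two attempts of the
// same stage can target the same file.
def outputPath(partition: Int, attemptNumber: Int): String =
  s"part-$partition-$attemptNumber.data"

outputPath(3, 0)  // task in stage attempt 0 -> "part-3-0.data"
outputPath(3, 0)  // task in stage attempt 1 reuses attemptNumber 0 -> same path
// Whichever of the two attempts is aborted may delete the file written by the
// attempt that committed.
{code}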



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-24552) Task attempt numbers are reused when stages are retried

2018-06-18 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin updated SPARK-24552:
---
Comment: was deleted

(was: User 'vanzin' has created a pull request for this issue:
https://github.com/apache/spark/pull/21577)

> Task attempt numbers are reused when stages are retried
> ---
>
> Key: SPARK-24552
> URL: https://issues.apache.org/jira/browse/SPARK-24552
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1, 2.2.0, 2.2.1, 2.3.0, 2.3.1
>Reporter: Ryan Blue
>Priority: Blocker
>
> When stages are retried due to shuffle failures, task attempt numbers are 
> reused. This causes a correctness bug in the v2 data sources write path.
> Data sources (both the original and v2) pass the task attempt to writers so 
> that writers can use the attempt number to track and clean up data from 
> failed or speculative attempts. In the v2 docs for DataWriterFactory, the 
> attempt number's javadoc states that "Implementations can use this attempt 
> number to distinguish writers of different task attempts."
> When two attempts of a stage use the same (partition, attempt) pair, two 
> tasks can create the same data and attempt to commit. The commit coordinator 
> prevents both from committing and will abort the attempt that finishes last. 
> When using the (partition, attempt) pair to track data, the aborted task may 
> delete data associated with the (partition, attempt) pair. If that happens, 
> the data for the task that committed is also deleted, which is a 
> correctness bug.
> For a concrete example, I have a data source that creates files in place 
> named with {{part---.}}. Because these 
> files are written in place, both tasks create the same file and the one that 
> is aborted deletes the file, leading to data corruption when the file is 
> added to the table.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24589) OutputCommitCoordinator may allow duplicate commits

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24589:


Assignee: (was: Apache Spark)

> OutputCommitCoordinator may allow duplicate commits
> ---
>
> Key: SPARK-24589
> URL: https://issues.apache.org/jira/browse/SPARK-24589
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Marcelo Vanzin
>Priority: Blocker
>
> This is a sibling bug to SPARK-24552. While investigating the source of that 
> bug, it was found that currently the output committer allows duplicate 
> commits when there are stage retries, and the tasks with the same task attempt 
> number (one in each stage attempt that currently has running tasks) try to 
> commit their output.
> This can lead to duplicate data in the output.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24589) OutputCommitCoordinator may allow duplicate commits

2018-06-18 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516524#comment-16516524
 ] 

Apache Spark commented on SPARK-24589:
--

User 'vanzin' has created a pull request for this issue:
https://github.com/apache/spark/pull/21577

> OutputCommitCoordinator may allow duplicate commits
> ---
>
> Key: SPARK-24589
> URL: https://issues.apache.org/jira/browse/SPARK-24589
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Marcelo Vanzin
>Priority: Blocker
>
> This is a sibling bug to SPARK-24552. While investigating the source of that 
> bug, it was found that currently the output committer allows duplicate 
> commits when there are stage retries, and the tasks with the same task attempt 
> number (one in each stage attempt that currently has running tasks) try to 
> commit their output.
> This can lead to duplicate data in the output.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24589) OutputCommitCoordinator may allow duplicate commits

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24589:


Assignee: Apache Spark

> OutputCommitCoordinator may allow duplicate commits
> ---
>
> Key: SPARK-24589
> URL: https://issues.apache.org/jira/browse/SPARK-24589
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Marcelo Vanzin
>Assignee: Apache Spark
>Priority: Blocker
>
> This is a sibling bug to SPARK-24552. While investigating the source of that 
> bug, it was found that currently the output committer allows duplicate 
> commits when there are stage retries, and the tasks with the same task attempt 
> number (one in each stage attempt that currently has running tasks) try to 
> commit their output.
> This can lead to duplicate data in the output.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24589) OutputCommitCoordinator may allow duplicate commits

2018-06-18 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516523#comment-16516523
 ] 

Marcelo Vanzin commented on SPARK-24589:


[~tgraves] fyi

> OutputCommitCoordinator may allow duplicate commits
> ---
>
> Key: SPARK-24589
> URL: https://issues.apache.org/jira/browse/SPARK-24589
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.3.1
>Reporter: Marcelo Vanzin
>Priority: Blocker
>
> This is a sibling bug to SPARK-24552. While investigating the source of that 
> bug, it was found that currently the output committer allows duplicate 
> commits when there are stage retries, and the tasks with the same task attempt 
> number (one in each stage attempt that currently has running tasks) try to 
> commit their output.
> This can lead to duplicate data in the output.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24552) Task attempt numbers are reused when stages are retried

2018-06-18 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516522#comment-16516522
 ] 

Marcelo Vanzin commented on SPARK-24552:


I forked the output committer issue into SPARK-24589 so that we have a separate 
record of each issue.

> Task attempt numbers are reused when stages are retried
> ---
>
> Key: SPARK-24552
> URL: https://issues.apache.org/jira/browse/SPARK-24552
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1, 2.2.0, 2.2.1, 2.3.0, 2.3.1
>Reporter: Ryan Blue
>Priority: Blocker
>
> When stages are retried due to shuffle failures, task attempt numbers are 
> reused. This causes a correctness bug in the v2 data sources write path.
> Data sources (both the original and v2) pass the task attempt to writers so 
> that writers can use the attempt number to track and clean up data from 
> failed or speculative attempts. In the v2 docs for DataWriterFactory, the 
> attempt number's javadoc states that "Implementations can use this attempt 
> number to distinguish writers of different task attempts."
> When two attempts of a stage use the same (partition, attempt) pair, two 
> tasks can create the same data and attempt to commit. The commit coordinator 
> prevents both from committing and will abort the attempt that finishes last. 
> When using the (partition, attempt) pair to track data, the aborted task may 
> delete data associated with the (partition, attempt) pair. If that happens, 
> the data for the task that committed is also deleted, which is a 
> correctness bug.
> For a concrete example, I have a data source that creates files in place 
> named with {{part---.}}. Because these 
> files are written in place, both tasks create the same file and the one that 
> is aborted deletes the file, leading to data corruption when the file is 
> added to the table.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24589) OutputCommitCoordinator may allow duplicate commits

2018-06-18 Thread Marcelo Vanzin (JIRA)
Marcelo Vanzin created SPARK-24589:
--

 Summary: OutputCommitCoordinator may allow duplicate commits
 Key: SPARK-24589
 URL: https://issues.apache.org/jira/browse/SPARK-24589
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.1, 2.2.1
Reporter: Marcelo Vanzin


This is a sibling bug to SPARK-24552. While investigating the source of that 
bug, it was found that currently the output committer allows duplicate commits 
when there are stage retries, and the tasks with the same task attempt number (one in 
each stage attempt that currently has running tasks) try to commit their output.

This can lead to duplicate data in the output.
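
A minimal model of the race, using hypothetical names rather than Spark's actual 
OutputCommitCoordinator API: if commit permission is tracked only per 
(stage, partition) with an authorized attempt number, tasks from two concurrent 
stage attempts that share an attempt number are indistinguishable.

{code:scala}
// Minimal model of the race, with hypothetical names (not Spark's actual
// OutputCommitCoordinator API): permission is keyed by (stage, partition) plus
// an authorized attemptNumber, with no notion of the stage attempt.
import scala.collection.mutable

val authorized = mutable.Map.empty[(Int, Int), Int]  // (stage, partition) -> attemptNumber

def canCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean =
  authorized.getOrElseUpdate((stage, partition), attemptNumber) == attemptNumber

// Two concurrent attempts of stage 1 each run a task for partition 3, and both
// tasks carry attemptNumber 0 because attempt numbers restart per stage attempt:
canCommit(stage = 1, partition = 3, attemptNumber = 0)  // task from stage attempt 0 -> true
canCommit(stage = 1, partition = 3, attemptNumber = 0)  // task from stage attempt 1 -> also true
// Both tasks are told they may commit; nothing in the key tells the two stage
// attempts apart, so duplicate output can be committed.
{code}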



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-24588:

Affects Version/s: (was: 2.4.0)
   2.3.0
   2.3.1

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Blocker
>  Labels: correctness
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-24588:

Labels: correctness  (was: )

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Blocker
>  Labels: correctness
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-24588:

Priority: Blocker  (was: Major)

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Blocker
>  Labels: correctness
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24588:


Assignee: Wenchen Fan  (was: Apache Spark)

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516504#comment-16516504
 ] 

Apache Spark commented on SPARK-24588:
--

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/21587

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24588:


Assignee: Apache Spark  (was: Wenchen Fan)

> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -
>
> Key: SPARK-24588
> URL: https://issues.apache.org/jira/browse/SPARK-24588
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24588) StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children

2018-06-18 Thread Wenchen Fan (JIRA)
Wenchen Fan created SPARK-24588:
---

 Summary: StreamingSymmetricHashJoinExec should require 
HashClusteredPartitioning from children
 Key: SPARK-24588
 URL: https://issues.apache.org/jira/browse/SPARK-24588
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Wenchen Fan
Assignee: Wenchen Fan






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19084) conditional function: field

2018-06-18 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516482#comment-16516482
 ] 

Marcelo Vanzin commented on SPARK-19084:


(Please ignore my PR above - it should have tagged SPARK-19804, but I 
apparently cannot type.)

> conditional function: field
> ---
>
> Key: SPARK-19084
> URL: https://issues.apache.org/jira/browse/SPARK-19084
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Chenzhao Guo
>Priority: Major
>
> field(str, str1, str2, ...) is a variable-length (>= 2 arguments) function which 
> returns the index of str in the list (str1, str2, ...), or 0 if not found.
> Every parameter is required to be a subtype of AtomicType.
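
A small sketch of the described semantics, assuming 1-based indexing (consistent 
with 0 meaning "not found"); this is a plain Scala illustration, not an existing 
Spark function.

{code:scala}
// Plain Scala sketch of the described semantics, assuming 1-based indexing
// (consistent with 0 meaning "not found"); not an existing Spark function.
def field(str: String, candidates: String*): Int = candidates.indexOf(str) + 1

field("b", "a", "b", "c")  // 2
field("z", "a", "b", "c")  // 0
{code}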



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24586) Upcast should not allow casting from string to other types

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24586:


Assignee: Wenchen Fan  (was: Apache Spark)

> Upcast should not allow casting from string to other types
> --
>
> Key: SPARK-24586
> URL: https://issues.apache.org/jira/browse/SPARK-24586
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24586) Upcast should not allow casting from string to other types

2018-06-18 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516457#comment-16516457
 ] 

Apache Spark commented on SPARK-24586:
--

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/21586

> Upcast should not allow casting from string to other types
> --
>
> Key: SPARK-24586
> URL: https://issues.apache.org/jira/browse/SPARK-24586
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24586) Upcast should not allow casting from string to other types

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24586:


Assignee: Apache Spark  (was: Wenchen Fan)

> Upcast should not allow casting from string to other types
> --
>
> Key: SPARK-24586
> URL: https://issues.apache.org/jira/browse/SPARK-24586
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24587) RDD.takeOrdered uses reduce, pulling all partition data to the driver

2018-06-18 Thread Ryan Deak (JIRA)
Ryan Deak created SPARK-24587:
-

 Summary: RDD.takeOrdered uses reduce, pulling all partition data 
to the driver
 Key: SPARK-24587
 URL: https://issues.apache.org/jira/browse/SPARK-24587
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.1
Reporter: Ryan Deak


*NOTE*: _This is likely a *very* impactful change, and likely only matters when 
{{num}} is large, but without something like the proposed change, algorithms 
based on distributed {{top-K}} don't scale very well._

h2. Description

{{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}}
 uses 
{{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}}
 to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} is 
the size of the returned {{Array}}. Consequently, even when the size of the 
return value is small relative to the driver memory, errors can occur.

An example error is:

{code}
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 28 
tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 29 
tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
...
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 160 
tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
{code}

It's clear from this message that although the final result will be approximately 
*0.3 GB* ({{46.4/160}}), the amount of driver memory required to combine the 
partial results is more than {{46 GB}}.

h2. Proposed Solution

The amount of memory required can be dramatically reduced by using 
{{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}.
For instance, replacing the {{else}} clause with:

{code:language=scala}
else {
  import scala.math.{ceil, log, max}
  val depth = max(2, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)

  mapRDDs.treeReduce(
(queue1, queue2) => queue1 ++= queue2,
depth
  ).toArray.sorted(ord)
}
{code}

This should require less than double the network communication but should scale 
to much larger values of the {{num}} parameter without configuration changes or 
beefier machines.
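
For illustration, the rough arithmetic behind the reported error and the 
proposed {{treeReduce}} depth, sketched using the numbers from the log above 
(~0.3 GB per partition-level queue, 160 partitions):

{code:scala}
// Rough arithmetic behind the log above, using the reported numbers:
// ~0.3 GB per partition-level BoundedPriorityQueue and 160 partitions.
val partitions = 160
val perQueueGB = 0.3
val flatReduceGB = partitions * perQueueGB  // ~48 GB of serialized results sent to the driver

// Depth used by the proposed treeReduce variant:
val depth = math.max(2, math.ceil(math.log(partitions) / math.log(2)).toInt)  // 8
// With treeReduce, queues are merged on executors over ~`depth` rounds, so the
// driver only receives the final ~0.3 GB queue.
{code}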

h2. Code Potentially Impacted

* ML Lib's 
{{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24586) Upcast should not allow casting from string to other types

2018-06-18 Thread Wenchen Fan (JIRA)
Wenchen Fan created SPARK-24586:
---

 Summary: Upcast should not allow casting from string to other types
 Key: SPARK-24586
 URL: https://issues.apache.org/jira/browse/SPARK-24586
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Wenchen Fan
Assignee: Wenchen Fan






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-18 Thread Mridul Muralidharan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516411#comment-16516411
 ] 

Mridul Muralidharan edited comment on SPARK-24375 at 6/18/18 10:17 PM:
---

[~jiangxb1987] A couple of comments based on the document and your elaboration 
above:

* Is the 'barrier' logic pluggable ? Instead of only being a global sync point.
* Dynamic resource allocation (dra) triggers allocation of additional resources 
based on pending tasks - hence the comment _We may add a check of total 
available slots before scheduling tasks from a barrier stage taskset._ does not 
necessarily work in that context.
* Currently DRA in Spark uniformly allocates resources - are we envisioning 
changes as part of this effort to allocate heterogeneous executor resources 
based on pending tasks (at least initially for barrier support for GPUs) ?
* How is fault tolerance handled w.r.t waiting on incorrect barriers ? Any way 
to identify the barrier ? Example:
{code}
try {
  ... snippet A ...
  // Barrier 1
  context.barrier()
  ... snippet B ...
} catch { ... }
... snippet C ...
// Barrier 2
context.barrier()

{code}
** In face of exceptions, some tasks will wait on barrier 2 and others on 
barrier 1 : causing issues.
* Can you elaborate more on leveraging TaskContext.localProperties ? Is it 
expected to be sync'ed after 'barrier' returns ? What guarantees are we 
expecting to provide ?


was (Author: mridulm80):
[~jiangxb1987] A couple of comments based on the document and your elaboration 
above:

* Is the 'barrier' logic pluggable ? Instead of only being a global sync point.
* Dynamic resource allocation (dra) triggers allocation of additional resources 
based on pending tasks - hence the comment _We may add a check of total 
available slots before scheduling tasks from a barrier stage taskset._ does not 
necessarily work in that context.
* Currently DRA in Spark uniformly allocates resources - are we envisioning 
changes as part of this effort to allocate heterogeneous executor resources 
based on pending tasks (at least initially for barrier support for GPUs) ?
* How is fault tolerance handled w.r.t waiting on incorrect barriers ? Any way 
to identify the barrier ? Example:
{code}
try {
  ... snippet A ...
  // Barrier 1
  context.barrier()
  ... snippet B ...
} catch { ... }
... snippet C ...
// Barrier 2
context.barrier()

{code}
** In face of exceptions, some tasks will wait on barrier 2 and others on 
barrier 1 : causing issues.
*

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-18 Thread Mridul Muralidharan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516411#comment-16516411
 ] 

Mridul Muralidharan commented on SPARK-24375:
-


[~jiangxb1987] A couple of comments based on the document and your elaboration 
above:

* Is the 'barrier' logic pluggable ? Instead of only being a global sync point.
* Dynamic resource allocation (dra) triggers allocation of additional resources 
based on pending tasks - hence *We may add a check of total available slots 
before scheduling tasks from a barrier stage taskset.* does not necessarily 
work in that context.
* Currently DRA in Spark uniformly allocates resources - are we envisioning 
changes as part of this effort to allocate heterogeneous executor resources 
based on pending tasks (at least initially for barrier support for GPUs) ?
* How is fault tolerance handled w.r.t waiting on incorrect barriers ? Any way 
to identify the barrier ? Example:
{code}
try {
  ... snippet A ...
  // Barrier 1
  context.barrier()
  ... snippet B ...
} catch { ... }
... snippet C ...
// Barrier 2
context.barrier()

{code}
** In face of exceptions, some tasks will wait on barrier 2 and others on 
barrier 1 : causing issues.
*

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-18 Thread Mridul Muralidharan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516411#comment-16516411
 ] 

Mridul Muralidharan edited comment on SPARK-24375 at 6/18/18 10:15 PM:
---

[~jiangxb1987] A couple of comments based on the document and your elaboration 
above:

* Is the 'barrier' logic pluggable ? Instead of only being a global sync point.
* Dynamic resource allocation (dra) triggers allocation of additional resources 
based on pending tasks - hence the comment _We may add a check of total 
available slots before scheduling tasks from a barrier stage taskset._ does not 
necessarily work in that context.
* Currently DRA in Spark uniformly allocates resources - are we envisioning 
changes as part of this effort to allocate heterogeneous executor resources 
based on pending tasks (at least initially for barrier support for GPUs) ?
* How is fault tolerance handled w.r.t waiting on incorrect barriers ? Any way 
to identify the barrier ? Example:
{code}
try {
  ... snippet A ...
  // Barrier 1
  context.barrier()
  ... snippet B ...
} catch { ... }
... snippet C ...
// Barrier 2
context.barrier()

{code}
** In face of exceptions, some tasks will wait on barrier 2 and others on 
barrier 1 : causing issues.
*


was (Author: mridulm80):

[~jiangxb1987] A couple of comments based on the document and your elaboration 
above:

* Is the 'barrier' logic pluggable ? Instead of only being a global sync point.
* Dynamic resource allocation (dra) triggers allocation of additional resources 
based on pending tasks - hence *We may add a check of total available slots 
before scheduling tasks from a barrier stage taskset.* does not necessarily 
work in that context.
* Currently DRA in Spark uniformly allocates resources - are we envisioning 
changes as part of this effort to allocate heterogeneous executor resources 
based on pending tasks (at least initially for barrier support for GPUs) ?
* How is fault tolerance handled w.r.t waiting on incorrect barriers ? Any way 
to identify the barrier ? Example:
{code}
try {
  ... snippet A ...
  // Barrier 1
  context.barrier()
  ... snippet B ...
} catch { ... }
... snippet C ...
// Barrier 2
context.barrier()

{code}
** In face of exceptions, some tasks will wait on barrier 2 and others on 
barrier 1 : causing issues.
*

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24585) Adding ability to audit file system before and after test to ensure all files are cleaned up.

2018-06-18 Thread David Lewis (JIRA)
David Lewis created SPARK-24585:
---

 Summary: Adding ability to audit file system before and after test 
to ensure all files are cleaned up.
 Key: SPARK-24585
 URL: https://issues.apache.org/jira/browse/SPARK-24585
 Project: Spark
  Issue Type: Test
  Components: Build
Affects Versions: 2.3.1
Reporter: David Lewis


Some Spark tests use temporary files and folders on the file system. This 
proposal is to audit the file system before and after each test to ensure that 
all files created are cleaned up.

Concretely, the proposal adds two flags to SparkFunSuite: one to enable the 
auditing, which logs a warning if there is a discrepancy, and one to throw an 
exception if there is a discrepancy.

This is intended to help prevent file leakage in Spark.
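
A minimal sketch of what such auditing could look like, assuming hypothetical 
flag and method names (enableFileAudit, failOnFileLeak, beforeAudit, afterAudit) 
that are not actual SparkFunSuite members:

{code:scala}
// Hypothetical flag and method names; these are not actual SparkFunSuite members.
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

trait FileAuditing {
  protected def enableFileAudit: Boolean = true   // log a warning on a discrepancy
  protected def failOnFileLeak: Boolean = false   // escalate the warning to an exception

  private val auditRoot: Path = Paths.get(System.getProperty("java.io.tmpdir"))
  private var before: Set[Path] = Set.empty

  private def snapshot(): Set[Path] = {
    val stream = Files.list(auditRoot)
    try stream.iterator().asScala.toSet finally stream.close()
  }

  def beforeAudit(): Unit = if (enableFileAudit) before = snapshot()

  def afterAudit(): Unit = if (enableFileAudit) {
    val leaked = snapshot() -- before
    if (leaked.nonEmpty) {
      val msg = s"Test leaked ${leaked.size} file(s): ${leaked.mkString(", ")}"
      if (failOnFileLeak) throw new IllegalStateException(msg)
      else System.err.println(s"WARNING: $msg")
    }
  }
}
{code}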



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14540) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner

2018-06-18 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516371#comment-16516371
 ] 

Stavros Kontopoulos commented on SPARK-14540:
-

[~srowen] We will prepare a design doc covering what changes are needed for 
cleaning closures and what encoding of lambdas is generated in 2.12. 

> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> 
>
> Key: SPARK-14540
> URL: https://issues.apache.org/jira/browse/SPARK-14540
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Josh Rosen
>Priority: Major
>
> Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running 
> ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures:
> {code}
> [info] - toplevel return statements in closures are identified at cleaning 
> time *** FAILED *** (32 milliseconds)
> [info]   Expected exception 
> org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no 
> exception was thrown. (ClosureCleanerSuite.scala:57)
> {code}
> and
> {code}
> [info] - user provided closures are actually cleaned *** FAILED *** (56 
> milliseconds)
> [info]   Expected ReturnStatementInClosureException, but got 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not 
> serializable: java.io.NotSerializableException: java.lang.Object
> [info]- element of array (index: 0)
> [info]- array (class "[Ljava.lang.Object;", size: 1)
> [info]- field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]- object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class 
> org.apache.spark.util.TestUserClosuresActuallyCleaned$, 
> functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I,
>  implementation=invokeStatic 
> org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I,
>  instantiatedMethodType=(I)I, numCaptured=1])
> [info]- element of array (index: 0)
> [info]- array (class "[Ljava.lang.Object;", size: 1)
> [info]- field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]- object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, 
> functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;,
>  implementation=invokeStatic 
> org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  
> instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  numCaptured=1])
> [info]- field (class "org.apache.spark.rdd.MapPartitionsRDD", name: 
> "f", type: "interface scala.Function3")
> [info]- object (class "org.apache.spark.rdd.MapPartitionsRDD", 
> MapPartitionsRDD[2] at apply at Transformer.scala:22)
> [info]- field (class "scala.Tuple2", name: "_1", type: "class 
> java.lang.Object")
> [info]- root object (class "scala.Tuple2", (MapPartitionsRDD[2] at 
> apply at 
> Transformer.scala:22,org.apache.spark.SparkContext$$Lambda$957/431842435@6e803685)).
> [info]   This means the closure provided by user is not actually cleaned. 
> (ClosureCleanerSuite.scala:78)
> {code}
> We'll need to figure out a closure cleaning strategy which works for 2.12 
> lambdas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24248) [K8S] Use the Kubernetes cluster as the backing store for the state of pods

2018-06-18 Thread Matt Cheah (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516368#comment-16516368
 ] 

Matt Cheah commented on SPARK-24248:


I've summarized what we ended up going with after some iteration on the PR 
here: 
[https://docs.google.com/document/d/1BWTK76k2242spz66JOx8SKKEl5qFV6Cg9ASxCdmIWbY/edit?usp=sharing].
 Recommendations are still welcome and can be worked on in follow-up patches.

> [K8S] Use the Kubernetes cluster as the backing store for the state of pods
> ---
>
> Key: SPARK-24248
> URL: https://issues.apache.org/jira/browse/SPARK-24248
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Matt Cheah
>Priority: Major
> Fix For: 2.4.0
>
>
> We have a number of places in KubernetesClusterSchedulerBackend right now 
> that maintains the state of pods in memory. However, the Kubernetes API can 
> always give us the most up to date and correct view of what our executors are 
> doing. We should consider moving away from in-memory state as much as we can in 
> favor of using the Kubernetes cluster as the source of truth for pod status. 
> Maintaining less state in memory makes it so that there's a lower chance that 
> we accidentally miss updating one of these data structures and breaking the 
> lifecycle of executors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24583) Wrong schema type in InsertIntoDataSourceCommand

2018-06-18 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516332#comment-16516332
 ] 

Apache Spark commented on SPARK-24583:
--

User 'maryannxue' has created a pull request for this issue:
https://github.com/apache/spark/pull/21585

> Wrong schema type in InsertIntoDataSourceCommand
> 
>
> Key: SPARK-24583
> URL: https://issues.apache.org/jira/browse/SPARK-24583
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Major
> Fix For: 2.4.0
>
>
> For a DataSource table whose schema contains a field with "nullable=false", 
> when a user tries to insert a NULL value into this field, the input DataFrame 
> will return an incorrect value or throw a NullPointerException. That's because 
> the schema nullability of the input relation is bluntly overridden with the 
> destination schema by the code below in 
> {{InsertIntoDataSourceCommand}}:
> {code:java}
>   override def run(sparkSession: SparkSession): Seq[Row] = {
> val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
> val data = Dataset.ofRows(sparkSession, query)
> // Apply the schema of the existing table to the new data.
> val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, 
> logicalRelation.schema)
> relation.insert(df, overwrite)
> // Re-cache all cached plans(including this relation itself, if it's 
> cached) that refer to this
> // data source relation.
> sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, 
> logicalRelation)
> Seq.empty[Row]
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24583) Wrong schema type in InsertIntoDataSourceCommand

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24583:


Assignee: Apache Spark

> Wrong schema type in InsertIntoDataSourceCommand
> 
>
> Key: SPARK-24583
> URL: https://issues.apache.org/jira/browse/SPARK-24583
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Assignee: Apache Spark
>Priority: Major
> Fix For: 2.4.0
>
>
> For a DataSource table whose schema contains a field with "nullable=false", 
> when a user tries to insert a NULL value into this field, the input DataFrame 
> will return an incorrect value or throw a NullPointerException. That's because 
> the schema nullability of the input relation is bluntly overridden with the 
> destination schema by the code below in 
> {{InsertIntoDataSourceCommand}}:
> {code:java}
>   override def run(sparkSession: SparkSession): Seq[Row] = {
> val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
> val data = Dataset.ofRows(sparkSession, query)
> // Apply the schema of the existing table to the new data.
> val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, 
> logicalRelation.schema)
> relation.insert(df, overwrite)
> // Re-cache all cached plans(including this relation itself, if it's 
> cached) that refer to this
> // data source relation.
> sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, 
> logicalRelation)
> Seq.empty[Row]
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24583) Wrong schema type in InsertIntoDataSourceCommand

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24583:


Assignee: (was: Apache Spark)

> Wrong schema type in InsertIntoDataSourceCommand
> 
>
> Key: SPARK-24583
> URL: https://issues.apache.org/jira/browse/SPARK-24583
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Major
> Fix For: 2.4.0
>
>
> For a DataSource table whose schema contains a field with "nullable=false", 
> when a user tries to insert a NULL value into this field, the input DataFrame 
> will return an incorrect value or throw a NullPointerException. That's because 
> the schema nullability of the input relation is bluntly overridden with the 
> destination schema by the code below in 
> {{InsertIntoDataSourceCommand}}:
> {code:java}
>   override def run(sparkSession: SparkSession): Seq[Row] = {
> val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
> val data = Dataset.ofRows(sparkSession, query)
> // Apply the schema of the existing table to the new data.
> val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, 
> logicalRelation.schema)
> relation.insert(df, overwrite)
> // Re-cache all cached plans(including this relation itself, if it's 
> cached) that refer to this
> // data source relation.
> sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, 
> logicalRelation)
> Seq.empty[Row]
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24432) Add support for dynamic resource allocation

2018-06-18 Thread Henry Robinson (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516305#comment-16516305
 ] 

Henry Robinson commented on SPARK-24432:


I'm really interested in this feature. What's the current status? Beyond the 
work of porting the code from the fork, are there design questions still to be 
resolved? Anything that I and my colleagues can help drive forward?

> Add support for dynamic resource allocation
> ---
>
> Key: SPARK-24432
> URL: https://issues.apache.org/jira/browse/SPARK-24432
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Yinan Li
>Priority: Major
>
> This is an umbrella ticket for work on adding support for dynamic resource 
> allocation into the Kubernetes mode. This requires a Kubernetes-specific 
> external shuffle service. The feature is available in our fork at 
> github.com/apache-spark-on-k8s/spark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24584) [K8s] More efficient storage of executor pod state

2018-06-18 Thread Matt Cheah (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516300#comment-16516300
 ] 

Matt Cheah commented on SPARK-24584:


Related to https://issues.apache.org/jira/browse/SPARK-24248

> [K8s] More efficient storage of executor pod state
> --
>
> Key: SPARK-24584
> URL: https://issues.apache.org/jira/browse/SPARK-24584
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Matt Cheah
>Priority: Major
>
> Currently we store buffers of snapshots in {{ExecutorPodsSnapshotStore}}, 
> where the snapshots are duplicated per subscriber. With hundreds or maybe 
> thousands of executors, this buffering may become untenable. Investigate 
> storing less state while still maintaining the same level-triggered semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24584) [K8s] More efficient storage of executor pod state

2018-06-18 Thread Matt Cheah (JIRA)
Matt Cheah created SPARK-24584:
--

 Summary: [K8s] More efficient storage of executor pod state
 Key: SPARK-24584
 URL: https://issues.apache.org/jira/browse/SPARK-24584
 Project: Spark
  Issue Type: Improvement
  Components: Kubernetes
Affects Versions: 2.4.0
Reporter: Matt Cheah


Currently we store buffers of snapshots in {{ExecutorPodsSnapshotStore}}, where 
the snapshots are duplicated per subscriber. With hundreds or maybe thousands 
of executors, this buffering may become untenable. Investigate storing less 
state while still maintaining the same level-triggered semantics.
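
One possible direction, sketched below with made-up names and not an actual 
Spark design: keep a single append-only buffer of snapshots and give each 
subscriber only a cursor into it, instead of duplicating the snapshots per 
subscriber.

{code:scala}
// Made-up names, not an actual Spark design: a single append-only buffer of
// snapshots shared by all subscribers, each of which only keeps a cursor.
import scala.collection.mutable

class SharedSnapshotBuffer[T] {
  private val snapshots = mutable.ArrayBuffer.empty[T]
  private val cursors = mutable.Map.empty[String, Int]  // subscriber -> next index to read

  def publish(snapshot: T): Unit = synchronized { snapshots += snapshot }

  // Returns the snapshots the subscriber has not seen yet and advances its cursor.
  def pending(subscriber: String): Seq[T] = synchronized {
    val from = cursors.getOrElse(subscriber, 0)
    cursors(subscriber) = snapshots.length
    snapshots.slice(from, snapshots.length).toList
  }
}
{code}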



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24548) JavaPairRDD to Dataset in SPARK generates ambiguous results

2018-06-18 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-24548:
---

Assignee: Liang-Chi Hsieh

> JavaPairRDD to Dataset in SPARK generates ambiguous results
> 
>
> Key: SPARK-24548
> URL: https://issues.apache.org/jira/browse/SPARK-24548
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, SQL
>Affects Versions: 2.3.0
> Environment: Using Windows 10, on 64bit machine with 16G of ram.
>Reporter: Jackson
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.0
>
>
> I have data in the below JavaPairRDD:
> {quote}JavaPairRDD<String, Tuple2<String, String>> MY_RDD;
> {quote}
> I tried using the below code:
> {quote}Encoder<Tuple2<String, Tuple2<String, String>>> encoder2 =
> Encoders.tuple(Encoders.STRING(), 
> Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
> Dataset<Row> newDataSet = 
> spark.createDataset(JavaPairRDD.toRDD(MY_RDD), encoder2).toDF("value1", "value2");
> newDataSet.printSchema();
> {quote}
> {{root}}
> {{ |-- value1: string (nullable = true)}}
> {{ |-- value2: struct (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> But after asking a StackOverflow question 
> ("https://stackoverflow.com/questions/50834145/javapairrdd-to-datasetrow-in-spark;),
> I got to know that the values in the tuple should have distinct field names, 
> whereas in this case the same name is generated for both. Because of this I 
> cannot select a specific column under value2.
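A possible workaround (a sketch, not from the ticket; shown in Scala for brevity): 
map the pair RDD into case classes with distinct field names before creating the 
Dataset, so the nested struct no longer has two columns both named "value". The 
wrapper type names below are hypothetical.

{code}
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical wrapper types with distinct field names.
case class Inner(first: String, second: String)
case class Outer(value1: String, value2: Inner)

def toDistinctlyNamedDataset(
    spark: SparkSession,
    pairs: JavaPairRDD[String, (String, String)]): Dataset[Outer] = {
  import spark.implicits._
  pairs.rdd
    .map { case (k, (v1, v2)) => Outer(k, Inner(v1, v2)) }
    .toDS()  // schema: value1: string, value2: struct<first: string, second: string>
}
{code}

Columns such as value2.first can then be selected unambiguously.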



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24548) JavaPairRDD to Dataset in SPARK generates ambiguous results

2018-06-18 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-24548.
-
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21576
[https://github.com/apache/spark/pull/21576]

> JavaPairRDD to Dataset in SPARK generates ambiguous results
> 
>
> Key: SPARK-24548
> URL: https://issues.apache.org/jira/browse/SPARK-24548
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, SQL
>Affects Versions: 2.3.0
> Environment: Using Windows 10, on 64bit machine with 16G of ram.
>Reporter: Jackson
>Priority: Major
> Fix For: 2.4.0
>
>
> I have data in the below JavaPairRDD:
> {quote}JavaPairRDD<String, Tuple2<String, String>> MY_RDD;
> {quote}
> I tried using the below code:
> {quote}Encoder<Tuple2<String, Tuple2<String, String>>> encoder2 =
> Encoders.tuple(Encoders.STRING(), 
> Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
> Dataset<Row> newDataSet = 
> spark.createDataset(JavaPairRDD.toRDD(MY_RDD), encoder2).toDF("value1", "value2");
> newDataSet.printSchema();
> {quote}
> {{root}}
> {{ |-- value1: string (nullable = true)}}
> {{ |-- value2: struct (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> But after asking a StackOverflow question 
> ("https://stackoverflow.com/questions/50834145/javapairrdd-to-datasetrow-in-spark;),
> I got to know that the values in the tuple should have distinct field names, 
> whereas in this case the same name is generated for both. Because of this I 
> cannot select a specific column under value2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24423) Add a new option `query` for JDBC sources

2018-06-18 Thread Dilip Biswal (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516071#comment-16516071
 ] 

Dilip Biswal commented on SPARK-24423:
--

[~maropu] Hello, yes. I will open a PR today/tomorrow.

> Add a new option `query` for JDBC sources
> -
>
> Key: SPARK-24423
> URL: https://issues.apache.org/jira/browse/SPARK-24423
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> Currently, our JDBC connector provides the option `dbtable` for users to 
> specify the to-be-loaded JDBC source table. 
> {code} 
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("*dbtable*", "dbName.tableName")
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
>  Normally, users do not fetch the whole JDBC table due to the poor 
> performance/throughput of JDBC. Thus, they normally fetch only a small subset 
> of the table. Advanced users can pass a subquery as the option.   
> {code} 
>  val query = """ (select * from tableName limit 10) as tmp """
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("*dbtable*", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
>  However, this is not straightforward for end users. We should simply allow 
> users to specify the query via a new option `query`. We will handle the 
> complexity for them. 
> {code} 
>  val query = """select * from tableName limit 10"""
>  val jdbcDf = spark.read
>    .format("jdbc")
>    .option("query", query)
>    .options(jdbcCredentials: Map)
>    .load()
> {code} 
>  Users are not allowed to specify query and dbtable at the same time. 
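For context, one plausible way the connector could implement the new option (an 
assumption about the implementation, not the actual PR) is to reduce `query` to 
the existing `dbtable` machinery by wrapping the user's query in a parenthesized 
subquery with a generated alias:

{code}
// Sketch only: the generated alias name is hypothetical.
def queryToDbtable(userQuery: String): String = {
  val alias = "SPARK_GEN_SUBQUERY_0"
  s"(${userQuery.trim.stripSuffix(";")}) $alias"
}

// queryToDbtable("select * from tableName limit 10")
//   returns "(select * from tableName limit 10) SPARK_GEN_SUBQUERY_0"
{code}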



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24583) Wrong schema type in InsertIntoDataSourceCommand

2018-06-18 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24583:
---

 Summary: Wrong schema type in InsertIntoDataSourceCommand
 Key: SPARK-24583
 URL: https://issues.apache.org/jira/browse/SPARK-24583
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue
 Fix For: 2.4.0


For a DataSource table whose schema contains a field with "nullable=false", 
if a user tries to insert a NULL value into this field, the input DataFrame 
will return an incorrect value or throw a NullPointerException. That's 
because the schema nullability of the input relation is bluntly overridden 
with the destination schema by the code below in 
{{InsertIntoDataSourceCommand}}:
{code:java}
  override def run(sparkSession: SparkSession): Seq[Row] = {
    val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
    val data = Dataset.ofRows(sparkSession, query)
    // Apply the schema of the existing table to the new data.
    val df = sparkSession.internalCreateDataFrame(
      data.queryExecution.toRdd, logicalRelation.schema)
    relation.insert(df, overwrite)

    // Re-cache all cached plans (including this relation itself, if it's cached)
    // that refer to this data source relation.
    sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, logicalRelation)

    Seq.empty[Row]
  }
{code}
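One way to avoid the blunt override (a sketch of a possible direction, not 
necessarily the merged fix) is to keep the input query's own schema, so a 
nullable input column is not silently re-labelled as non-nullable:

{code:java}
  override def run(sparkSession: SparkSession): Seq[Row] = {
    val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
    val data = Dataset.ofRows(sparkSession, query)
    // Keep the nullability of the input query's schema instead of overriding
    // it with logicalRelation.schema.
    val df = sparkSession.internalCreateDataFrame(
      data.queryExecution.toRdd, data.schema)
    relation.insert(df, overwrite)

    sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, logicalRelation)

    Seq.empty[Row]
  }
{code}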



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24526) Spaces in the build dir causes failures in the build/mvn script

2018-06-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-24526:


Assignee: Trystan Leftwich

> Spaces in the build dir causes failures in the build/mvn script
> ---
>
> Key: SPARK-24526
> URL: https://issues.apache.org/jira/browse/SPARK-24526
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.3.0
>Reporter: Trystan Leftwich
>Assignee: Trystan Leftwich
>Priority: Minor
> Fix For: 2.4.0
>
>
> If you run make-distribution in a path that contains a space, 
> the build/mvn script will fail:
> {code:bash}
> mkdir /tmp/test\ spaces
> cd /tmp/test\ spaces
> git clone https://github.com/apache/spark.git
> cd spark
> # Remove all mvn references in PATH so the script will download mvn to the 
> local dir
> ./build/mvn -DskipTests clean package{code}
> You will get the following errors:
> {code:bash}
> Using `mvn` from path: /tmp/test spaces/spark/build/apache-maven-3.3.9/bin/mvn
> ./build/mvn: line 157: /tmp/test: No such file or directory
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24526) Spaces in the build dir causes failures in the build/mvn script

2018-06-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24526.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21534
[https://github.com/apache/spark/pull/21534]

> Spaces in the build dir causes failures in the build/mvn script
> ---
>
> Key: SPARK-24526
> URL: https://issues.apache.org/jira/browse/SPARK-24526
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.3.0
>Reporter: Trystan Leftwich
>Assignee: Trystan Leftwich
>Priority: Minor
> Fix For: 2.4.0
>
>
> If you run make-distribution in a path that contains a space, 
> the build/mvn script will fail:
> {code:bash}
> mkdir /tmp/test\ spaces
> cd /tmp/test\ spaces
> git clone https://github.com/apache/spark.git
> cd spark
> # Remove all mvn references in PATH so the script will download mvn to the 
> local dir
> ./build/mvn -DskipTests clean package{code}
> You will get the following errors:
> {code:bash}
> Using `mvn` from path: /tmp/test spaces/spark/build/apache-maven-3.3.9/bin/mvn
> ./build/mvn: line 157: /tmp/test: No such file or directory
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23772) Provide an option to ignore column of all null values or empty map/array during JSON schema inference

2018-06-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23772.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 20929
[https://github.com/apache/spark/pull/20929]

> Provide an option to ignore column of all null values or empty map/array 
> during JSON schema inference
> -
>
> Key: SPARK-23772
> URL: https://issues.apache.org/jira/browse/SPARK-23772
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiangrui Meng
>Assignee: Takeshi Yamamuro
>Priority: Major
> Fix For: 2.4.0
>
>
> It is common that we convert data from a JSON source to a structured format 
> periodically. In the initial batch of JSON data, if a field's values are 
> always null, Spark infers this field as StringType. However, in the second 
> batch, a non-null value appears in this field and its type turns out not to 
> be StringType. Schema merging then fails because of the inconsistency.
> This also applies to empty arrays and empty objects. My proposal is to provide 
> an option in the Spark JSON source to omit those fields until we see a non-null 
> value.
> This is similar to SPARK-12436 but the proposed solution is different.
> cc: [~rxin] [~smilegator]
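A minimal sketch of the failure mode described above (file paths and field names 
are assumed for illustration):

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("json-null-inference").getOrCreate()
import spark.implicits._

// Batch 1: the "score" field is always null, so Spark infers it as StringType.
Seq("""{"id": 1, "score": null}""").toDS
  .write.mode("overwrite").text("/tmp/json_batch1")
// Batch 2: a non-null numeric value appears, so "score" is inferred as DoubleType.
Seq("""{"id": 2, "score": 0.9}""").toDS
  .write.mode("overwrite").text("/tmp/json_batch2")

spark.read.json("/tmp/json_batch1").printSchema()  // score: string
spark.read.json("/tmp/json_batch2").printSchema()  // score: double

// Persisting each batch with its inferred schema leaves outputs whose schemas
// conflict (string vs. double), so a later schema merge fails. The proposed
// option would skip all-null fields at inference time instead.
{code}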



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24433) Add Spark R support

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24433:


Assignee: (was: Apache Spark)

> Add Spark R support
> ---
>
> Key: SPARK-24433
> URL: https://issues.apache.org/jira/browse/SPARK-24433
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Yinan Li
>Priority: Major
>
> This is the ticket to track work on adding support for R binding into the 
> Kubernetes mode. The feature is available in our fork at 
> github.com/apache-spark-on-k8s/spark and needs to be upstreamed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24433) Add Spark R support

2018-06-18 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515971#comment-16515971
 ] 

Apache Spark commented on SPARK-24433:
--

User 'ifilonenko' has created a pull request for this issue:
https://github.com/apache/spark/pull/21584

> Add Spark R support
> ---
>
> Key: SPARK-24433
> URL: https://issues.apache.org/jira/browse/SPARK-24433
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Yinan Li
>Priority: Major
>
> This is the ticket to track work on adding support for R binding into the 
> Kubernetes mode. The feature is available in our fork at 
> github.com/apache-spark-on-k8s/spark and needs to be upstreamed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24433) Add Spark R support

2018-06-18 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24433:


Assignee: Apache Spark

> Add Spark R support
> ---
>
> Key: SPARK-24433
> URL: https://issues.apache.org/jira/browse/SPARK-24433
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Yinan Li
>Assignee: Apache Spark
>Priority: Major
>
> This is the ticket to track work on adding support for R binding into the 
> Kubernetes mode. The feature is available in our fork at 
> github.com/apache-spark-on-k8s/spark and needs to be upstreamed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-18 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng resolved SPARK-24375.
---
Resolution: Done

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-18 Thread Xiangrui Meng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515913#comment-16515913
 ] 

Xiangrui Meng commented on SPARK-24375:
---

I'm closing this Jira in favor of formal design discussions at SPARK-24581 and 
SPARK-24582. Please watch those tickets and provide your inputs there. Thanks!

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24580) List scenarios to be handled by barrier execution mode properly

2018-06-18 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng reassigned SPARK-24580:
-

Assignee: Jiang Xingbo  (was: Xiangrui Meng)

> List scenarios to be handled by barrier execution mode properly
> ---
>
> Key: SPARK-24580
> URL: https://issues.apache.org/jira/browse/SPARK-24580
> Project: Spark
>  Issue Type: Story
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> List scenarios to be handled by barrier execution mode to help the design. We 
> will start with simple scenarios and work up to complex ones.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24580) List scenarios to be handled by barrier execution mode properly

2018-06-18 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-24580:
--
Description: 
List scenarios to be handled by barrier execution mode to help the design. We 
will start with simple scenarios and work up to complex ones.

 

 

  was:List scenarios to be handled by barrier execution mode to help the 
design. We will start with simple scenarios and work up to complex ones.


> List scenarios to be handled by barrier execution mode properly
> ---
>
> Key: SPARK-24580
> URL: https://issues.apache.org/jira/browse/SPARK-24580
> Project: Spark
>  Issue Type: Story
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>
> List scenarios to be handled by barrier execution mode to help the design. We 
> will start with simple scenarios and work up to complex ones.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24582) Design: Barrier execution mode

2018-06-18 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created SPARK-24582:
-

 Summary: Design: Barrier execution mode
 Key: SPARK-24582
 URL: https://issues.apache.org/jira/browse/SPARK-24582
 Project: Spark
  Issue Type: Story
  Components: ML, Spark Core
Affects Versions: 3.0.0
Reporter: Xiangrui Meng
Assignee: Jiang Xingbo


[~jiangxb1987] and [~cloud_fan] outlined a design sketch in SPARK-24375, which 
covers some basic scenarios. This story is for a formal design of the barrier 
execution mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24581) Design: BarrierTaskContext.barrier()

2018-06-18 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created SPARK-24581:
-

 Summary: Design: BarrierTaskContext.barrier()
 Key: SPARK-24581
 URL: https://issues.apache.org/jira/browse/SPARK-24581
 Project: Spark
  Issue Type: Story
  Components: ML, Spark Core
Affects Versions: 3.0.0
Reporter: Xiangrui Meng
Assignee: Jiang Xingbo


We need to provide a communication barrier function to users to help coordinate 
tasks within a barrier stage. This is very similar to the MPI_Barrier function in 
MPI. This story is for its design.

Requirements:
 * Low latency. The tasks should be unblocked soon after all tasks have reached 
this barrier. Latency matters more than CPU cycles here.
 * Support an unlimited timeout with proper logging. DL tasks might take very 
long to converge, so we should support an unlimited timeout with proper logging 
so users know why a task is waiting.
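A hypothetical usage sketch to make the requirements concrete (the API is still 
under design, so the names below, such as barrier() and BarrierTaskContext.get(), 
are assumptions rather than a final interface):

{code}
rdd.barrier().mapPartitions { iter =>
  // Per-task setup, e.g. launch a DL worker process.
  val ctx = BarrierTaskContext.get()
  // Block until every task in this stage has reached the same point;
  // ideally this unblocks with low latency and can wait indefinitely
  // while logging why the task is still waiting.
  ctx.barrier()
  // All tasks proceed together from here.
  iter
}
{code}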



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24580) List scenarios to be handled by barrier execution mode properly

2018-06-18 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created SPARK-24580:
-

 Summary: List scenarios to be handled by barrier execution mode 
properly
 Key: SPARK-24580
 URL: https://issues.apache.org/jira/browse/SPARK-24580
 Project: Spark
  Issue Type: Story
  Components: ML, Spark Core
Affects Versions: 3.0.0
Reporter: Xiangrui Meng
Assignee: Xiangrui Meng


List scenarios to be handled by barrier execution mode to help the design. We 
will start with simple scenarios and work up to complex ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24579) SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks

2018-06-18 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-24579:
--
Labels: Hydrogen  (was: )

> SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks
> 
>
> Key: SPARK-24579
> URL: https://issues.apache.org/jira/browse/SPARK-24579
> Project: Spark
>  Issue Type: Epic
>  Components: ML, PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>  Labels: Hydrogen
> Attachments: [SPARK-24579] SPIP_ Standardize Optimized Data Exchange 
> between Apache Spark and DL%2FAI Frameworks .pdf
>
>
> (see attached SPIP pdf for more details)
> At the crossroads of big data and AI, we see both the success of Apache Spark 
> as a unified
> analytics engine and the rise of AI frameworks like TensorFlow and Apache 
> MXNet (incubating).
> Both big data and AI are indispensable components to drive business 
> innovation and there have
> been multiple attempts from both communities to bring them together.
> We saw efforts from AI community to implement data solutions for AI 
> frameworks like tf.data and tf.Transform. However, with 50+ data sources and 
> built-in SQL, DataFrames, and Streaming features, Spark remains the community 
> choice for big data. This is why we saw many efforts to integrate DL/AI 
> frameworks with Spark to leverage its power, for example, TFRecords data 
> source for Spark, TensorFlowOnSpark, TensorFrames, etc. As part of Project 
> Hydrogen, this SPIP takes a different angle at Spark + AI unification.
> None of the integrations are possible without exchanging data between Spark 
> and external DL/AI frameworks. And the performance matters. However, there 
> doesn’t exist a standard way to exchange data and hence implementation and 
> performance optimization fall into pieces. For example, TensorFlowOnSpark 
> uses Hadoop InputFormat/OutputFormat for TensorFlow’s TFRecords to load and 
> save data and pass the RDD records to TensorFlow in Python. And TensorFrames 
> converts Spark DataFrames Rows to/from TensorFlow Tensors using TensorFlow’s 
> Java API. How can we reduce the complexity?
> The proposal here is to standardize the data exchange interface (or format) 
> between Spark and DL/AI frameworks and optimize data conversion from/to this 
> interface.  So DL/AI frameworks can leverage Spark to load data virtually 
> from anywhere without spending extra effort building complex data solutions, 
> like reading features from a production data warehouse or streaming model 
> inference. Spark users can use DL/AI frameworks without learning specific 
> data APIs implemented there. And developers from both sides can work on 
> performance optimizations independently given the interface itself doesn’t 
> introduce big overhead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515879#comment-16515879
 ] 

Li Jin edited comment on SPARK-24578 at 6/18/18 3:24 PM:
-

cc @gatorsmile [~cloud_fan]

We found this when switching from 2.2.1 to 2.3.0 in one of our applications. 
The implication is pretty bad: the timeouts significantly hurt 
performance (from 20s to several minutes for some jobs). This could affect other 
Spark 2.3 users too because it's pretty easy to reproduce.


was (Author: icexelloss):
cc @gatorsmile

We found this when switching from 2.2.1 to 2.3.0 in one of our applications. 
The implication is pretty bad: the timeouts significantly hurt 
performance (from 20s to several minutes for some jobs). This could affect other 
Spark 2.3 users too because it's pretty easy to reproduce.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible for a small cluster of 2 executors (say host-1 
> 

[jira] [Updated] (SPARK-24579) SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks

2018-06-18 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-24579:
--
Description: 
(see attached SPIP pdf for more details)

At the crossroads of big data and AI, we see both the success of Apache Spark 
as a unified
analytics engine and the rise of AI frameworks like TensorFlow and Apache MXNet 
(incubating).
Both big data and AI are indispensable components to drive business innovation 
and there have
been multiple attempts from both communities to bring them together.

We saw efforts from AI community to implement data solutions for AI frameworks 
like tf.data and tf.Transform. However, with 50+ data sources and built-in SQL, 
DataFrames, and Streaming features, Spark remains the community choice for big 
data. This is why we saw many efforts to integrate DL/AI frameworks with Spark 
to leverage its power, for example, TFRecords data source for Spark, 
TensorFlowOnSpark, TensorFrames, etc. As part of Project Hydrogen, this SPIP 
takes a different angle at Spark + AI unification.

None of the integrations are possible without exchanging data between Spark and 
external DL/AI frameworks. And the performance matters. However, there doesn’t 
exist a standard way to exchange data and hence implementation and performance 
optimization fall into pieces. For example, TensorFlowOnSpark uses Hadoop 
InputFormat/OutputFormat for TensorFlow’s TFRecords to load and save data and 
pass the RDD records to TensorFlow in Python. And TensorFrames converts Spark 
DataFrames Rows to/from TensorFlow Tensors using TensorFlow’s Java API. How can 
we reduce the complexity?

The proposal here is to standardize the data exchange interface (or format) 
between Spark and DL/AI frameworks and optimize data conversion from/to this 
interface.  So DL/AI frameworks can leverage Spark to load data virtually from 
anywhere without spending extra effort building complex data solutions, like 
reading features from a production data warehouse or streaming model inference. 
Spark users can use DL/AI frameworks without learning specific data APIs 
implemented there. And developers from both sides can work on performance 
optimizations independently given the interface itself doesn’t introduce big 
overhead.

> SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks
> 
>
> Key: SPARK-24579
> URL: https://issues.apache.org/jira/browse/SPARK-24579
> Project: Spark
>  Issue Type: Epic
>  Components: ML, PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>  Labels: Hydrogen
> Attachments: [SPARK-24579] SPIP_ Standardize Optimized Data Exchange 
> between Apache Spark and DL%2FAI Frameworks .pdf
>
>
> (see attached SPIP pdf for more details)
> At the crossroads of big data and AI, we see both the success of Apache Spark 
> as a unified
> analytics engine and the rise of AI frameworks like TensorFlow and Apache 
> MXNet (incubating).
> Both big data and AI are indispensable components to drive business 
> innovation and there have
> been multiple attempts from both communities to bring them together.
> We saw efforts from AI community to implement data solutions for AI 
> frameworks like tf.data and tf.Transform. However, with 50+ data sources and 
> built-in SQL, DataFrames, and Streaming features, Spark remains the community 
> choice for big data. This is why we saw many efforts to integrate DL/AI 
> frameworks with Spark to leverage its power, for example, TFRecords data 
> source for Spark, TensorFlowOnSpark, TensorFrames, etc. As part of Project 
> Hydrogen, this SPIP takes a different angle at Spark + AI unification.
> None of the integrations are possible without exchanging data between Spark 
> and external DL/AI frameworks. And the performance matters. However, there 
> doesn’t exist a standard way to exchange data and hence implementation and 
> performance optimization fall into pieces. For example, TensorFlowOnSpark 
> uses Hadoop InputFormat/OutputFormat for TensorFlow’s TFRecords to load and 
> save data and pass the RDD records to TensorFlow in Python. And TensorFrames 
> converts Spark DataFrames Rows to/from TensorFlow Tensors using TensorFlow’s 
> Java API. How can we reduce the complexity?
> The proposal here is to standardize the data exchange interface (or format) 
> between Spark and DL/AI frameworks and optimize data conversion from/to this 
> interface.  So DL/AI frameworks can leverage Spark to load data virtually 
> from anywhere without spending extra effort building complex data solutions, 
> like reading features from a production data warehouse or streaming model 
> inference. Spark users can use DL/AI frameworks 

[jira] [Updated] (SPARK-24579) SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks

2018-06-18 Thread Xiangrui Meng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-24579:
--
Attachment: [SPARK-24579] SPIP_ Standardize Optimized Data Exchange between 
Apache Spark and DL%2FAI Frameworks .pdf

> SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks
> 
>
> Key: SPARK-24579
> URL: https://issues.apache.org/jira/browse/SPARK-24579
> Project: Spark
>  Issue Type: Epic
>  Components: ML, PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
> Attachments: [SPARK-24579] SPIP_ Standardize Optimized Data Exchange 
> between Apache Spark and DL%2FAI Frameworks .pdf
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515879#comment-16515879
 ] 

Li Jin commented on SPARK-24578:


cc @gatorsmile

We found this when switching from 2.2.1 to 2.3.0 in one of our applications. 
The implication is pretty bad: the timeouts significantly hurt 
performance (from 20s to several minutes for some jobs). This could affect other 
Spark 2.3 users too because it's pretty easy to reproduce.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible example for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is 
> not an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", 

[jira] [Created] (SPARK-24579) SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks

2018-06-18 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created SPARK-24579:
-

 Summary: SPIP: Standardize Optimized Data Exchange between Spark 
and DL/AI frameworks
 Key: SPARK-24579
 URL: https://issues.apache.org/jira/browse/SPARK-24579
 Project: Spark
  Issue Type: Epic
  Components: ML, PySpark, SQL
Affects Versions: 3.0.0
Reporter: Xiangrui Meng
Assignee: Xiangrui Meng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following in some of our 
production job
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducible example for a small cluster of 2 executors (say host-1 and 
host-2), each with 8 cores. The memory of the driver and executors is not an 
important factor here as long as it is big enough, say 20G. 
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of around 7G in size, cache 
it, and then perform parallel DataFrame operations using `array.par.map`. 
Because of the parallel computation, it is very likely that some task is 
scheduled to host-2 where it needs to read the cached block data from host-1. 
This follows the code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then tries to transfer a big cached block (~ 500MB) from host-1 to 
host-2. Often, this big transfer makes the cluster suffer timeout issues (it 
will retry 3 times, each with 120s 

[jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515858#comment-16515858
 ] 

Wenbo Zhao commented on SPARK-24578:


An easier reproduciable cluster setting is 10 executors each with 2 cores and 
15G memory.

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible example for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores (the memory of the driver and executors is not an 
> important factor here as long as it is big enough, say 20G).
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Li Jin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-24578:
---
Component/s: (was: Input/Output)
 Spark Core

> Reading remote cache block behavior changes and causes timeout issue
> 
>
> Key: SPARK-24578
> URL: https://issues.apache.org/jira/browse/SPARK-24578
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Wenbo Zhao
>Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production job
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
> chunkIndex=0}, 
> buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
> /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at 
> io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible example for a small cluster of 2 executors (say host-1 
> and host-2), each with 8 cores (the memory of the driver and executors is not an 
> important factor here as long as it is big enough, say 20G).
> {code:java}
> val n = 1
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
> ).withColumn("x1", rand()
> ).withColumn("x2", rand()
> ).withColumn("x3", rand()
> ).withColumn("x4", rand()
> ).withColumn("x5", rand()
> ).withColumn("x6", rand()
> ).withColumn("x7", rand()
> ).withColumn("x8", rand()
> ).withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i => println(i); 
> df.groupBy("x1").agg(count("value")).show() }
> {code}
>  
> 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following in some of our 
production job
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducer for a small cluster of 2 executors (say host-1 and 
host-2), each with 8 cores (the memory of the driver and executors is not an 
important factor here as long as it is big enough, say 20G).
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of around 7G, cache it, 
and then run parallel DataFrame operations via `.toArray.par.map`. Because of 
the parallel computation, with high probability some task will be scheduled on 
host-2 and will need to read cached block data from host-1. This follows the 
code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then tries to transfer a big cached block (~600MB) from host-1 to host-2. 
Often, this big transfer makes the cluster suffer timeout issues (it will retry 
3 times, each with a 120s timeout, and then 
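
A minimal mitigation sketch in Scala, on the assumption that the 120s timeout 
mentioned above is the default of spark.network.timeout (an assumption, not a 
fix confirmed in this ticket):
{code:java}
// Hedged sketch: raising the network timeout only papers over the symptom of the
// slow ~600MB remote cache-block reads; it does not change the new read path itself.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("remote-cache-block-read")
  .config("spark.network.timeout", "600s") // default is 120s
  .getOrCreate()
{code}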

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproducer for a small cluster of 2 executors (say host-1 and 
host-2), each with 8 cores (the memory of the driver and executors is not an 
important factor here as long as it is big enough, say 20G).
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of around 7G, cache it, 
and then run parallel DataFrame operations via `.toArray.par.map`. Because of 
the parallel computation, with high probability some task will be scheduled on 
host-2 and will need to read cached block data from host-1. This follows the 
code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then tries to transfer a big cached block (~600MB) from host-1 to host-2. 
Often, this big transfer makes the cluster suffer timeout issues (it will retry 
3 times, each with a 120s timeout, and then recomputes to put the cache 

[jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24578:
---
Description: 
After Spark 2.3, we observed lots of errors like the following

 
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
Here is a small reproducer for a small cluster of 2 executors (say host-1 and 
host-2), each with 8 cores (the memory of the driver and executors is not an 
important factor here as long as it is big enough, say 10G).

 

 
{code:java}
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); 
df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of around 7G, cache it, 
and then run parallel DataFrame operations via `.toArray.par.map`. Because of 
the parallel computation, with high probability some task will be scheduled on 
host-2, where it needs to read cached block data from host-1. This follows the 
code path of 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
 and then tries to transfer a big cached block (~600MB) from host-1 to host-2. 
Often, this big transfer makes the cluster suffer timeout issues.
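
A quick back-of-the-envelope check of the per-block size, assuming the 
parallelized input ends up in the default 2 x 8 = 16 partitions (an assumption; 
the reproducer does not set the partition count explicitly):
{code:java}
// Rough arithmetic only, not taken from the ticket.
val cachedSizeMiB = 7.0 * 1024 // ~7G cached DataFrame, as described above
val partitions    = 2 * 8      // 2 executors x 8 cores, the assumed default parallelism
val blockMiB      = cachedSizeMiB / partitions
println(f"~$blockMiB%.0f MiB per cached block") // ~448 MiB, same order as the ~600MB transfers
{code}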

We couldn't reproduce the same issue in Spark 2.2.1. From the log of 
[jira] [Created] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue

2018-06-18 Thread Wenbo Zhao (JIRA)
Wenbo Zhao created SPARK-24578:
--

 Summary: Reading remote cache block behavior changes and causes 
timeout issue
 Key: SPARK-24578
 URL: https://issues.apache.org/jira/browse/SPARK-24578
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.3.1, 2.3.0
Reporter: Wenbo Zhao


After Spark 2.3, we observed lots of errors like the following
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, 
chunkIndex=0}, 
buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to 
/172.22.18.7:60865; closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at 
org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at 
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at 
io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at 
io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
Here is a small reproducer for a small cluster of 2 executors, each with 8 
cores (the memory of the driver and executors is not an important factor here 
as long as it is big enough, say 10G).
val n = 1
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())
df.cache; df.count
(1 to 10).toArray.par.map { i => println(i);  
df.groupBy("x1").agg(count("value")).show() }
 

In the above example, we generate a random DataFrame of around 7G, cache it, 
and then run parallel DataFrame operations via `.toArray.par.map`. Because of 
the parallel computation, with high probability some task will be scheduled on 
host-2, where the task needs to 

[jira] [Commented] (SPARK-23858) Need to apply pyarrow adjustments to complex types with DateType/TimestampType

2018-06-18 Thread SemanticBeeng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515809#comment-16515809
 ] 

SemanticBeeng commented on SPARK-23858:
---

Do you have failing tests as specs, Bryan, please?

> Need to apply pyarrow adjustments to complex types with 
> DateType/TimestampType 
> ---
>
> Key: SPARK-23858
> URL: https://issues.apache.org/jira/browse/SPARK-23858
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark, SQL
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Major
>
> Currently, ArrayTypes with DateType and TimestampType need to perform the 
> same adjustments as simple types, e.g.  
> {{_check_series_localize_timestamps}}, and that should work for nested types 
> as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24577) Spark submit fails with documentation example spark-pi

2018-06-18 Thread Kuku1 (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuku1 updated SPARK-24577:
--
Description: 
The Spark-submit example in the [K8s 
documentation|http://spark.apache.org/docs/latest/running-on-kubernetes.html#cluster-mode]
 fails for me.
{code:java}
.\spark-submit.cmd --master k8s://https://my-k8s:8443
--conf spark.kubernetes.namespace=my-namespace --deploy-mode cluster --name 
spark-pi --class org.apache.spark.examples.SparkPi
--conf spark.executor.instances=5
--conf spark.kubernetes.container.image=gcr.io/ynli-k8s/spark:v2.3.0
--conf spark.kubernetes.driver.pod.name=spark-pi-driver 
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
{code}
Error in the driver log:
{code:java}
++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ '[' -z driver ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n 
'/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
 ']'
+ 
SPARK_CLASSPATH=':/opt/spark/jars/*:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" 
-Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY 
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java 
-Dspark.kubernetes.namespace=my-namespace -Dspark.driver.port=7078 
-Dspark.master=k8s://https://my-k8s:8443  
-Dspark.jars=/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar,/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
 -Dspark.driver.blockManager.port=7079 
-Dspark.app.id=spark-311b7351345240fd89d6d86eaabdff6f 
-Dspark.kubernetes.driver.pod.name=spark-pi-driver -Dspark.executor.instances=5 
-Dspark.app.name=spark-pi 
-Dspark.driver.host=spark-pi-ef6be7cac60a3f789f9714b2ebd1c68c-driver-svc.my-namespace.svc
 -Dspark.submit.deployMode=cluster 
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-ef6be7cac60a3f789f9714b2ebd1c68c
 -Dspark.kubernetes.container.image=gcr.io/ynli-k8s/spark:v2.3.0 -cp 
':/opt/spark/jars/*:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
 -Xms1g -Xmx1g -Dspark.driver.bindAddress=172.101.1.40 
org.apache.spark.examples.SparkPi
Error: Could not find or load main class org.apache.spark.examples.SparkPi
{code}
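
One detail that stands out when comparing this failing log with the working 
spark-operator log quoted below: the failing run's SPARK_CLASSPATH entries are 
joined with `;`, while the working run's are joined with `:`. A tiny sketch of 
why submitting via `spark-submit.cmd` from Windows could plausibly produce that 
difference (an observation, not a confirmed root cause):
{code:java}
// The JVM path separator differs per OS, so a classpath assembled on the Windows
// side carries ';' even though the Linux driver container splits on ':'.
object PathSeparatorDemo extends App {
  println(java.io.File.pathSeparator) // ";" on Windows, ":" on Linux
}
{code}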
I am also using the spark-operator to run the example, and that one works for 
me. The spark-operator outputs its spark-submit command as follows:

 
{code:java}
++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ '[' -z driver ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n 
/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
 ']'
+ 
SPARK_CLASSPATH=':/opt/spark/jars/*:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" 
-Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java
-Dspark.kubernetes.driver.label.sparkoperator.k8s.io/app-id=spark-pi-2557211557
-Dspark.kubernetes.container.image=gcr.io/ynli-k8s/spark:v2.3.0
-Dspark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=spark-pi
-Dspark.app.name=spark-pi
-Dspark.executor.instances=7
-Dspark.driver.blockManager.port=7079
-Dspark.driver.cores=0.10
-Dspark.kubernetes.driver.label.version=2.3.0
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-607e0943cf32319883cc3beb2e02be4f
-Dspark.executor.memory=512m
-Dspark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=spark-pi
-Dspark.kubernetes.authenticate.driver.serviceAccountName=spark
-Dspark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true
-Dspark.kubernetes.driver.limit.cores=200m
-Dspark.driver.host=spark-pi-607e0943cf32319883cc3beb2e02be4f-driver-svc.big-data-analytics.svc
-Dspark.kubernetes.driver.pod.name=spark-pi-607e0943cf32319883cc3beb2e02be4f-driver
-Dspark.submit.deployMode=cluster
-Dspark.kubernetes.executor.label.sparkoperator.k8s.io/app-id=spark-pi-2557211557
-Dspark.kubernetes.submission.waitAppCompletion=false

[jira] [Created] (SPARK-24577) Spark submit fails with documentation example spark-pi

2018-06-18 Thread Kuku1 (JIRA)
Kuku1 created SPARK-24577:
-

 Summary: Spark submit fails with documentation example spark-pi
 Key: SPARK-24577
 URL: https://issues.apache.org/jira/browse/SPARK-24577
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.3.1, 2.3.0
Reporter: Kuku1


The Spark-submit example in the [K8s 
documentation|http://spark.apache.org/docs/latest/running-on-kubernetes.html#cluster-mode]
 fails for me.

Error in the driver log:
{code:java}
++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ '[' -z driver ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n 
'/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
 ']'
+ 
SPARK_CLASSPATH=':/opt/spark/jars/*:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" 
-Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY 
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java 
-Dspark.kubernetes.namespace=my-namespace -Dspark.driver.port=7078 
-Dspark.master=k8s://https://my-k8s:8443  
-Dspark.jars=/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar,/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
 -Dspark.driver.blockManager.port=7079 
-Dspark.app.id=spark-311b7351345240fd89d6d86eaabdff6f 
-Dspark.kubernetes.driver.pod.name=spark-pi-driver -Dspark.executor.instances=5 
-Dspark.app.name=spark-pi 
-Dspark.driver.host=spark-pi-ef6be7cac60a3f789f9714b2ebd1c68c-driver-svc.my-namespace.svc
 -Dspark.submit.deployMode=cluster 
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-ef6be7cac60a3f789f9714b2ebd1c68c
 -Dspark.kubernetes.container.image=gcr.io/ynli-k8s/spark:v2.3.0 -cp 
':/opt/spark/jars/*:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
 -Xms1g -Xmx1g -Dspark.driver.bindAddress=172.101.1.40 
org.apache.spark.examples.SparkPi
Error: Could not find or load main class org.apache.spark.examples.SparkPi
{code}
I am also using the spark-operator to run the example, and that one works for 
me. The spark-operator outputs its spark-submit command as follows:

 
{code:java}
++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ '[' -z driver ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n 
/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
 ']'
+ 
SPARK_CLASSPATH=':/opt/spark/jars/*:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" 
-Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java
-Dspark.kubernetes.driver.label.sparkoperator.k8s.io/app-id=spark-pi-2557211557
-Dspark.kubernetes.container.image=gcr.io/ynli-k8s/spark:v2.3.0
-Dspark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=spark-pi
-Dspark.app.name=spark-pi
-Dspark.executor.instances=7
-Dspark.driver.blockManager.port=7079
-Dspark.driver.cores=0.10
-Dspark.kubernetes.driver.label.version=2.3.0
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-607e0943cf32319883cc3beb2e02be4f
-Dspark.executor.memory=512m
-Dspark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=spark-pi
-Dspark.kubernetes.authenticate.driver.serviceAccountName=spark
-Dspark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true
-Dspark.kubernetes.driver.limit.cores=200m
-Dspark.driver.host=spark-pi-607e0943cf32319883cc3beb2e02be4f-driver-svc.big-data-analytics.svc
-Dspark.kubernetes.driver.pod.name=spark-pi-607e0943cf32319883cc3beb2e02be4f-driver
-Dspark.submit.deployMode=cluster
-Dspark.kubernetes.executor.label.sparkoperator.k8s.io/app-id=spark-pi-2557211557
-Dspark.kubernetes.submission.waitAppCompletion=false
-Dspark.kubernetes.driver.annotation.sparkoperator.k8s.io/volumes.test-volume=Cgt0ZXN0LXZvbHVtZRITChEKBC90bXASCURpcmVjdG9yeQ==
-Dspark.driver.port=7078
-Dspark.app.id=spark-a7cdcb5ce1e54879a5286979a197f791

[jira] [Commented] (SPARK-14540) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner

2018-06-18 Thread Lukas Rytz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515648#comment-16515648
 ] 

Lukas Rytz commented on SPARK-14540:


[~skonto] and I (both from Lightbend) are working on this issue now.
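
For context, a minimal sketch (illustrative names only, not from the Spark code 
base) of the kind of closure at stake: under Scala 2.12 the function literal 
below is compiled to an invokedynamic lambda whose captured state ends up in 
SerializedLambda.capturedArgs, as in the serialization trace quoted below, 
rather than in fields of a dedicated anonymous function class as in 2.11.
{code:java}
// Illustrative sketch only.
class Holder(val offset: Int) extends Serializable {
  val unrelated = new Array[Byte](1024)           // serialized too, because the lambda captures `this`
  def addOne: Int => Int = (x: Int) => x + offset // reads `offset` via `this`, so `this` is captured
}
{code}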

> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> 
>
> Key: SPARK-14540
> URL: https://issues.apache.org/jira/browse/SPARK-14540
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Josh Rosen
>Priority: Major
>
> Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running 
> ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures:
> {code}
> [info] - toplevel return statements in closures are identified at cleaning 
> time *** FAILED *** (32 milliseconds)
> [info]   Expected exception 
> org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no 
> exception was thrown. (ClosureCleanerSuite.scala:57)
> {code}
> and
> {code}
> [info] - user provided closures are actually cleaned *** FAILED *** (56 
> milliseconds)
> [info]   Expected ReturnStatementInClosureException, but got 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not 
> serializable: java.io.NotSerializableException: java.lang.Object
> [info]- element of array (index: 0)
> [info]- array (class "[Ljava.lang.Object;", size: 1)
> [info]- field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]- object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class 
> org.apache.spark.util.TestUserClosuresActuallyCleaned$, 
> functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I,
>  implementation=invokeStatic 
> org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I,
>  instantiatedMethodType=(I)I, numCaptured=1])
> [info]- element of array (index: 0)
> [info]- array (class "[Ljava.lang.Object;", size: 1)
> [info]- field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]- object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, 
> functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;,
>  implementation=invokeStatic 
> org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  
> instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  numCaptured=1])
> [info]- field (class "org.apache.spark.rdd.MapPartitionsRDD", name: 
> "f", type: "interface scala.Function3")
> [info]- object (class "org.apache.spark.rdd.MapPartitionsRDD", 
> MapPartitionsRDD[2] at apply at Transformer.scala:22)
> [info]- field (class "scala.Tuple2", name: "_1", type: "class 
> java.lang.Object")
> [info]- root object (class "scala.Tuple2", (MapPartitionsRDD[2] at 
> apply at 
> Transformer.scala:22,org.apache.spark.SparkContext$$Lambda$957/431842435@6e803685)).
> [info]   This means the closure provided by user is not actually cleaned. 
> (ClosureCleanerSuite.scala:78)
> {code}
> We'll need to figure out a closure cleaning strategy which works for 2.12 
> lambdas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24573) SBT Java checkstyle affecting the build

2018-06-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24573.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21579
[https://github.com/apache/spark/pull/21579]

> SBT Java checkstyle affecting the build
> ---
>
> Key: SPARK-24573
> URL: https://issues.apache.org/jira/browse/SPARK-24573
> Project: Spark
>  Issue Type: Bug
>  Components: Project Infra
>Affects Versions: 2.4.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Major
> Fix For: 2.4.0
>
>
> It seems checkstyle affects the build in Jenkins. I can't reproduce it 
> locally; it can only be reproduced in Jenkins.
> When a PR contains Java, compilation consistently fails as below:
> {code}
> [warn] 
> /home/jenkins/workspace/SparkPullRequestBuilder/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala:160:
>  non-variable type argument 
> org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf in type 
> org.apache.spark.deploy.k8s.KubernetesConf[org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf]
>  is unchecked since it is eliminated by erasure
> [warn] if 
> (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
> [warn]   ^
> java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC 
> overhead limit exceeded
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> sbt.ConcurrentRestrictions$$anon$4.take(ConcurrentRestrictions.scala:188)
>   at sbt.Execute.next$1(Execute.scala:85)
>   at sbt.Execute.processAll(Execute.scala:88)
>   at sbt.Execute.runKeep(Execute.scala:68)
>   at sbt.EvaluateTask$.liftedTree1$1(EvaluateTask.scala:359)
>   at sbt.EvaluateTask$.run$1(EvaluateTask.scala:358)
>   at sbt.EvaluateTask$.runTask(EvaluateTask.scala:378)
>   at sbt.Aggregation$$anonfun$3.apply(Aggregation.scala:69)
>   at sbt.Aggregation$$anonfun$3.apply(Aggregation.scala:67)
>   at sbt.EvaluateTask$.withStreams(EvaluateTask.scala:314)
>   at sbt.Aggregation$.timedRun(Aggregation.scala:67)
>   at sbt.Aggregation$.runTasks(Aggregation.scala:76)
>   at sbt.Aggregation$$anonfun$applyTasks$1.apply(Aggregation.scala:37)
>   at sbt.Aggregation$$anonfun$applyTasks$1.apply(Aggregation.scala:36)
>   at 
> sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:61)
>   at 
> sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:61)
>   at 
> sbt.Aggregation$$anonfun$evaluatingParser$4$$anonfun$apply$5.apply(Aggregation.scala:158)
>   at 
> sbt.Aggregation$$anonfun$evaluatingParser$4$$anonfun$apply$5.apply(Aggregation.scala:157)
>   at 
> sbt.Act$$anonfun$sbt$Act$$actParser0$1$$anonfun$sbt$Act$$anonfun$$evaluate$1$1$$anonfun$apply$10.apply(Act.scala:253)
>   at 
> sbt.Act$$anonfun$sbt$Act$$actParser0$1$$anonfun$sbt$Act$$anonfun$$evaluate$1$1$$anonfun$apply$10.apply(Act.scala:250)
>   at sbt.Command$.process(Command.scala:93)
>   at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:96)
>   at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:96)
>   at sbt.State$$anon$1.runCmd$1(State.scala:183)
>   at sbt.State$$anon$1.process(State.scala:187)
>   at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:96)
>   at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:96)
>   at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
>   at sbt.MainLoop$.next(MainLoop.scala:96)
>   at sbt.MainLoop$.run(MainLoop.scala:89)
>   at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:68)
>   at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:63)
>   at sbt.Using.apply(Using.scala:24)
>   at sbt.MainLoop$.runWithNewLog(MainLoop.scala:63)
>   at sbt.MainLoop$.runAndClearLast(MainLoop.scala:46)
>   at sbt.MainLoop$.runLoggedLoop(MainLoop.scala:30)
>   at sbt.MainLoop$.runLogged(MainLoop.scala:22)
>   at sbt.StandardMain$.runManaged(Main.scala:61)
>   at sbt.xMain.run(Main.scala:35)
>   at xsbt.boot.Launch$$anonfun$run$1.apply(Launch.scala:109)
>   at xsbt.boot.Launch$.withContextLoader(Launch.scala:128)
>   at xsbt.boot.Launch$.run(Launch.scala:109)
>   at xsbt.boot.Launch$$anonfun$apply$1.apply(Launch.scala:35)
>   at xsbt.boot.Launch$.launch(Launch.scala:117)
>   at xsbt.boot.Launch$.apply(Launch.scala:18)
>   at xsbt.boot.Boot$.runImpl(Boot.scala:41)
>   at xsbt.boot.Boot$.main(Boot.scala:17)
>   at xsbt.boot.Boot.main(Boot.scala)
> Caused by: 

[jira] [Assigned] (SPARK-24573) SBT Java checkstyle affecting the build

2018-06-18 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-24573:


Assignee: Hyukjin Kwon

> SBT Java checkstyle affecting the build
> ---
>
> Key: SPARK-24573
> URL: https://issues.apache.org/jira/browse/SPARK-24573
> Project: Spark
>  Issue Type: Bug
>  Components: Project Infra
>Affects Versions: 2.4.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Major
> Fix For: 2.4.0
>
>
> It seems checkstyle affects the build in Jenkins. I can't reproduce it 
> locally; it can only be reproduced in Jenkins.
> When a PR contains Java, compilation consistently fails as below:
> {code}
> [warn] 
> /home/jenkins/workspace/SparkPullRequestBuilder/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala:160:
>  non-variable type argument 
> org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf in type 
> org.apache.spark.deploy.k8s.KubernetesConf[org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf]
>  is unchecked since it is eliminated by erasure
> [warn] if 
> (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
> [warn]   ^
> java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC 
> overhead limit exceeded
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> sbt.ConcurrentRestrictions$$anon$4.take(ConcurrentRestrictions.scala:188)
>   at sbt.Execute.next$1(Execute.scala:85)
>   at sbt.Execute.processAll(Execute.scala:88)
>   at sbt.Execute.runKeep(Execute.scala:68)
>   at sbt.EvaluateTask$.liftedTree1$1(EvaluateTask.scala:359)
>   at sbt.EvaluateTask$.run$1(EvaluateTask.scala:358)
>   at sbt.EvaluateTask$.runTask(EvaluateTask.scala:378)
>   at sbt.Aggregation$$anonfun$3.apply(Aggregation.scala:69)
>   at sbt.Aggregation$$anonfun$3.apply(Aggregation.scala:67)
>   at sbt.EvaluateTask$.withStreams(EvaluateTask.scala:314)
>   at sbt.Aggregation$.timedRun(Aggregation.scala:67)
>   at sbt.Aggregation$.runTasks(Aggregation.scala:76)
>   at sbt.Aggregation$$anonfun$applyTasks$1.apply(Aggregation.scala:37)
>   at sbt.Aggregation$$anonfun$applyTasks$1.apply(Aggregation.scala:36)
>   at 
> sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:61)
>   at 
> sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:61)
>   at 
> sbt.Aggregation$$anonfun$evaluatingParser$4$$anonfun$apply$5.apply(Aggregation.scala:158)
>   at 
> sbt.Aggregation$$anonfun$evaluatingParser$4$$anonfun$apply$5.apply(Aggregation.scala:157)
>   at 
> sbt.Act$$anonfun$sbt$Act$$actParser0$1$$anonfun$sbt$Act$$anonfun$$evaluate$1$1$$anonfun$apply$10.apply(Act.scala:253)
>   at 
> sbt.Act$$anonfun$sbt$Act$$actParser0$1$$anonfun$sbt$Act$$anonfun$$evaluate$1$1$$anonfun$apply$10.apply(Act.scala:250)
>   at sbt.Command$.process(Command.scala:93)
>   at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:96)
>   at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:96)
>   at sbt.State$$anon$1.runCmd$1(State.scala:183)
>   at sbt.State$$anon$1.process(State.scala:187)
>   at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:96)
>   at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:96)
>   at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
>   at sbt.MainLoop$.next(MainLoop.scala:96)
>   at sbt.MainLoop$.run(MainLoop.scala:89)
>   at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:68)
>   at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:63)
>   at sbt.Using.apply(Using.scala:24)
>   at sbt.MainLoop$.runWithNewLog(MainLoop.scala:63)
>   at sbt.MainLoop$.runAndClearLast(MainLoop.scala:46)
>   at sbt.MainLoop$.runLoggedLoop(MainLoop.scala:30)
>   at sbt.MainLoop$.runLogged(MainLoop.scala:22)
>   at sbt.StandardMain$.runManaged(Main.scala:61)
>   at sbt.xMain.run(Main.scala:35)
>   at xsbt.boot.Launch$$anonfun$run$1.apply(Launch.scala:109)
>   at xsbt.boot.Launch$.withContextLoader(Launch.scala:128)
>   at xsbt.boot.Launch$.run(Launch.scala:109)
>   at xsbt.boot.Launch$$anonfun$apply$1.apply(Launch.scala:35)
>   at xsbt.boot.Launch$.launch(Launch.scala:117)
>   at xsbt.boot.Launch$.apply(Launch.scala:18)
>   at xsbt.boot.Boot$.runImpl(Boot.scala:41)
>   at xsbt.boot.Boot$.main(Boot.scala:17)
>   at xsbt.boot.Boot.main(Boot.scala)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
>