[jira] [Comment Edited] (SPARK-23191) Workers registration failes in case of network drop

2018-03-27 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415194#comment-16415194
 ] 

Sujith Jay Nair edited comment on SPARK-23191 at 3/27/18 8:10 AM:
--

This is a known race condition, in which the reconnection attempt made by the 
worker after the network outage is seen as a request to register a duplicate 
worker on the (same) master and hence, the attempt fails. Details on this can 
be found in SPARK-4592. Although the race condition is resolved for the case in 
which a new master replaces the unresponsive master, it still exists when the 
same old master recovers, which is the case here.


was (Author: suj1th):
This is a known race condition, in which the reconnection attempt made by the 
worker after the network outage is seen as a request to register a duplicate 
worker on the (same) master and hence, the attempt fails. Details on this can 
be found in [#SPARK-4592]. Although the race condition is resolved for the case 
in which a new master replaces the unresponsive master, it still exists when 
the same old master recovers, which is the case here.

> Workers registration failes in case of network drop
> ---
>
> Key: SPARK-23191
> URL: https://issues.apache.org/jira/browse/SPARK-23191
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.3, 2.2.1, 2.3.0
> Environment: OS:- Centos 6.9(64 bit)
>  
>Reporter: Neeraj Gupta
>Priority: Critical
>
> We have a 3 node cluster. We were facing issues of multiple driver running in 
> some scenario in production.
> On further investigation we were able to reproduce iin both 1.6.3 and 2.2.1 
> versions the scenario with following steps:-
>  # Setup a 3 node cluster. Start master and slaves.
>  # On any node where the worker process is running block the connections on 
> port 7077 using iptables.
> {code:java}
> iptables -A OUTPUT -p tcp --dport 7077 -j DROP
> {code}
>  # After about 10-15 secs we get the error on node that it is unable to 
> connect to master.
> {code:java}
> 2018-01-23 12:08:51,639 [rpc-client-1-1] WARN  
> org.apache.spark.network.server.TransportChannelHandler - Exception in 
> connection from 
> java.io.IOException: Connection timed out
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>     at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
>     at 
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
>     at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
>     at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR 
> org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting 
> for master to reconnect...
> 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR 
> org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting 
> for master to reconnect...
> {code}
>  # Once we get this exception we renable the connections to port 7077 using
> {code:java}
> iptables -D OUTPUT -p tcp --dport 7077 -j DROP
> {code}
>  # Worker tries to register again with master but is unable to do so. It 
> gives following error
> {code:java}
> 2018-01-23 12:08:58,657 [worker-register-master-threadpool-2] WARN  
> org.apache.spark.deploy.worker.Worker - Failed to connect to master 
> :7077
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>     at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>     at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
>     at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
>     at 
> 

[jira] [Commented] (SPARK-23191) Workers registration failes in case of network drop

2018-03-27 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415194#comment-16415194
 ] 

Sujith Jay Nair commented on SPARK-23191:
-

This is a known race condition, in which the reconnection attempt made by the 
worker after the network outage is seen as a request to register a duplicate 
worker on the (same) master and hence, the attempt fails. Details on this can 
be found in [#SPARK-4592]. Although the race condition is resolved for the case 
in which a new master replaces the unresponsive master, it still exists when 
the same old master recovers, which is the case here.

> Workers registration failes in case of network drop
> ---
>
> Key: SPARK-23191
> URL: https://issues.apache.org/jira/browse/SPARK-23191
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.3, 2.2.1, 2.3.0
> Environment: OS:- Centos 6.9(64 bit)
>  
>Reporter: Neeraj Gupta
>Priority: Critical
>
> We have a 3 node cluster. We were facing issues of multiple driver running in 
> some scenario in production.
> On further investigation we were able to reproduce iin both 1.6.3 and 2.2.1 
> versions the scenario with following steps:-
>  # Setup a 3 node cluster. Start master and slaves.
>  # On any node where the worker process is running block the connections on 
> port 7077 using iptables.
> {code:java}
> iptables -A OUTPUT -p tcp --dport 7077 -j DROP
> {code}
>  # After about 10-15 secs we get the error on node that it is unable to 
> connect to master.
> {code:java}
> 2018-01-23 12:08:51,639 [rpc-client-1-1] WARN  
> org.apache.spark.network.server.TransportChannelHandler - Exception in 
> connection from 
> java.io.IOException: Connection timed out
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>     at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
>     at 
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
>     at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
>     at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR 
> org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting 
> for master to reconnect...
> 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR 
> org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting 
> for master to reconnect...
> {code}
>  # Once we get this exception we renable the connections to port 7077 using
> {code:java}
> iptables -D OUTPUT -p tcp --dport 7077 -j DROP
> {code}
>  # Worker tries to register again with master but is unable to do so. It 
> gives following error
> {code:java}
> 2018-01-23 12:08:58,657 [worker-register-master-threadpool-2] WARN  
> org.apache.spark.deploy.worker.Worker - Failed to connect to master 
> :7077
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>     at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>     at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
>     at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
>     at 
> org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$tryRegisterAllMasters$1$$anon$1.run(Worker.scala:241)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at 

[jira] [Commented] (SPARK-23437) [ML] Distributed Gaussian Process Regression for MLlib

2018-03-25 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413013#comment-16413013
 ] 

Sujith Jay Nair commented on SPARK-23437:
-

+1 for the this initiative. To garner support for this initiative, we need to 
come up with strong reasons why GPs are needed as part of Spark ML. This could 
be done as part of the documentation of your implementation. 
You do mention GPflow as an example of the TensorFlow ecosystem supporting 
linear-time GPs; however, that still is a third-party library. If anything, it 
vouches for the opinion that this functionality should be kept separate from 
core Spark ML. Like Seth Henderson mentions above, it would help tremendously 
to showcase more packages which have this algo implemented.

> [ML] Distributed Gaussian Process Regression for MLlib
> --
>
> Key: SPARK-23437
> URL: https://issues.apache.org/jira/browse/SPARK-23437
> Project: Spark
>  Issue Type: New Feature
>  Components: ML, MLlib
>Affects Versions: 2.2.1
>Reporter: Valeriy Avanesov
>Assignee: Apache Spark
>Priority: Major
>
> Gaussian Process Regression (GP) is a well known black box non-linear 
> regression approach [1]. For years the approach remained inapplicable to 
> large samples due to its cubic computational complexity, however, more recent 
> techniques (Sparse GP) allowed for only linear complexity. The field 
> continues to attracts interest of the researches – several papers devoted to 
> GP were present on NIPS 2017. 
> Unfortunately, non-parametric regression techniques coming with mllib are 
> restricted to tree-based approaches.
> I propose to create and include an implementation (which I am going to work 
> on) of so-called robust Bayesian Committee Machine proposed and investigated 
> in [2].
> [1] Carl Edward Rasmussen and Christopher K. I. Williams. 2005. _Gaussian 
> Processes for Machine Learning (Adaptive Computation and Machine Learning)_. 
> The MIT Press.
> [2] Marc Peter Deisenroth and Jun Wei Ng. 2015. Distributed Gaussian 
> processes. In _Proceedings of the 32nd International Conference on 
> International Conference on Machine Learning - Volume 37_ (ICML'15), Francis 
> Bach and David Blei (Eds.), Vol. 37. JMLR.org 1481-1490.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8682) Range Join for Spark SQL

2018-01-25 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339169#comment-16339169
 ] 

Sujith Jay Nair commented on SPARK-8682:


The ticket description refers to the implementation of range-joins in the ADAM 
project. I believe this is the [ADAM 
pull-request|https://github.com/bigdatagenomics/adam/pull/117] which is been 
referred to. FYR.

> Range Join for Spark SQL
> 
>
> Key: SPARK-8682
> URL: https://issues.apache.org/jira/browse/SPARK-8682
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Herman van Hovell
>Priority: Major
> Attachments: perf_testing.scala
>
>
> Currently Spark SQL uses a Broadcast Nested Loop join (or a filtered 
> Cartesian Join) when it has to execute the following range query:
> {noformat}
> SELECT A.*,
>B.*
> FROM   tableA A
>JOIN tableB B
> ON A.start <= B.end
>  AND A.end > B.start
> {noformat}
> This is horribly inefficient. The performance of this query can be greatly 
> improved, when one of the tables can be broadcasted, by creating a range 
> index. A range index is basically a sorted map containing the rows of the 
> smaller table, indexed by both the high and low keys. using this structure 
> the complexity of the query would go from O(N * M) to O(N * 2 * LOG(M)), N = 
> number of records in the larger table, M = number of records in the smaller 
> (indexed) table.
> I have created a pull request for this. According to the [Spark SQL: 
> Relational Data Processing in 
> Spark|http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf] 
> paper similar work (page 11, section 7.2) has already been done by the ADAM 
> project (cannot locate the code though). 
> Any comments and/or feedback are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17833) 'monotonicallyIncreasingId()' should be deterministic

2018-01-03 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16309411#comment-16309411
 ] 

Sujith Jay Nair commented on SPARK-17833:
-

This issue is resolved in 2.0, as mentioned in [SPARK-14241 | 
https://issues.apache.org/jira/browse/SPARK-14241]

> 'monotonicallyIncreasingId()' should be deterministic
> -
>
> Key: SPARK-17833
> URL: https://issues.apache.org/jira/browse/SPARK-17833
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Kevin Ushey
>Priority: Critical
>
> Right now, it's (IMHO) too easy to shoot yourself in the foot using 
> 'monotonicallyIncreasingId()', as it's easy to expect the generated numbers 
> to function as a 'stable' primary key, for example, and then go on to use 
> that key in e.g. 'joins' and so on.
> Is there any reason why this function can't be made deterministic? Or, could 
> a deterministic analogue of this function be added (e.g. 
> 'withPrimaryKey(columnName = ...)')?
> A solution is to immediately cache / persist the table after calling 
> 'monotonicallyIncreasingId()'; it's also possible that the documentation 
> should spell that out loud and clear.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22714) Spark API Not responding when Fatal exception occurred in event loop

2018-01-03 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16309366#comment-16309366
 ] 

Sujith Jay Nair commented on SPARK-22714:
-

Hi [~todesking], is this reproducible outside of Spark REPL? Trying to 
understand if this is specific to Spark shell.

> Spark API Not responding when Fatal exception occurred in event loop
> 
>
> Key: SPARK-22714
> URL: https://issues.apache.org/jira/browse/SPARK-22714
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: todesking
>Priority: Critical
>
> To reproduce, let Spark to throw an OOM Exception in event loop:
> {noformat}
> scala> spark.sparkContext.getConf.get("spark.driver.memory")
> res0: String = 1g
> scala> val a = new Array[Int](4 * 1000 * 1000)
> scala> val ds = spark.createDataset(a)
> scala> ds.rdd.zipWithIndex
> [Stage 0:>  (0 + 0) / 
> 3]Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: 
> Java heap space
> [Stage 0:>  (0 + 0) / 
> 3]
> // Spark is not responding
> {noformat}
> While not responding, Spark waiting for some Promise, but is never done.
> The promise depends some process in event loop thread, but the thread is dead 
> when Fatal exception is thrown.
> {noformat}
> "main" #1 prio=5 os_prio=31 tid=0x7ffc9300b000 nid=0x1703 waiting on 
> condition [0x70216000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0007ad978eb8> (a 
> scala.concurrent.impl.Promise$CompletionLatch)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at 
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
> at 
> org.apache.spark.rdd.ZippedWithIndexRDD.(ZippedWithIndexRDD.scala:50)
> at 
> org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
> at 
> org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.zipWithIndex(RDD.scala:1292)
> {noformat}
> I don't know how to fix it properly, but it seems we need to add Fatal error 
> handling to EventLoop.run() in core/EventLoop.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22465) Cogroup of two disproportionate RDDs could lead into 2G limit BUG

2017-12-15 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292581#comment-16292581
 ] 

Sujith Jay Nair commented on SPARK-22465:
-

Would something along the lines of  'add a safety-check that ignores the 
partitioner if the number of partitions on the RDDs are very different in 
magnitude', as the reporter suggests, be a satisfactory solution? Any pointers 
here would be very helpful.

> Cogroup of two disproportionate RDDs could lead into 2G limit BUG
> -
>
> Key: SPARK-22465
> URL: https://issues.apache.org/jira/browse/SPARK-22465
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.2.1, 1.2.2, 
> 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 
> 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.2.0
>Reporter: Amit Kumar
>Priority: Critical
>
> While running my spark pipeline, it failed with the following exception
> {noformat}
> 2017-11-03 04:49:09,776 [Executor task launch worker for task 58670] ERROR 
> org.apache.spark.executor.Executor  - Exception in task 630.0 in stage 28.0 
> (TID 58670)
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:469)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:705)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> After debugging I found that the issue lies with how spark handles cogroup of 
> two RDDs.
> Here is the relevant code from apache spark
> {noformat}
>  /**
>* For each key k in `this` or `other`, return a resulting RDD that 
> contains a tuple with the
>* list of values for that key in `this` as well as `other`.
>*/
>   def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = 
> self.withScope {
> cogroup(other, defaultPartitioner(self, other))
>   }
> /**
>* Choose a partitioner to use for a cogroup-like operation between a 
> number of RDDs.
>*
>* If any of the RDDs already has a partitioner, choose that one.
>*
>* Otherwise, we use a default HashPartitioner. For the number of 
> partitions, if
>* spark.default.parallelism is set, then we'll use the value from 
> SparkContext
>* defaultParallelism, otherwise we'll use the max number of upstream 
> partitions.
>*
>* Unless spark.default.parallelism is set, the number of partitions will 
> be the
>* same as the number of partitions in the largest upstream RDD, as this 
> should
>* be least likely to cause out-of-memory errors.
>*
>* We use two method parameters (rdd, others) to enforce callers passing at 
> least 1 RDD.
>*/
>   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
> val rdds = (Seq(rdd) ++ others)
> val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 
> 0))
> if (hasPartitioner.nonEmpty) {
>   hasPartitioner.maxBy(_.partitions.length).partitioner.get
> } else {
>   if (rdd.context.conf.contains("spark.default.parallelism")) {
> new HashPartitioner(rdd.context.defaultParallelism)
>   } else {
> new HashPartitioner(rdds.map(_.partitions.length).max)
>   }
> }
>   }
> {noformat}
> Given this  suppose we have two  pair RDDs.
> RDD1 : A small RDD which fewer data and partitions
> RDD2: A huge RDD which has loads of data and partitions
> Now in the code if we were to have a cogroup
> {noformat}
> val RDD3 = RDD1.cogroup(RDD2)
> {noformat}
> there is a case where this could lead to the SPARK-6235 Bug which is If RDD1 
> has a partitioner when it is being called into a cogroup. This is because the 
> cogroups partitions are then decided by the partitioner and could lead to the 
> huge 

[jira] [Commented] (SPARK-22465) Cogroup of two disproportionate RDDs could lead into 2G limit BUG

2017-12-15 Thread Sujith Jay Nair (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292327#comment-16292327
 ] 

Sujith Jay Nair commented on SPARK-22465:
-

Hi [~tgraves], is there a plan to resolve this behaviour of cogroup, outside of 
the umbrella ticket for fixing 2G limit ([SPARK-6235]). I wish to chip in if 
that is the case. Thank you.

> Cogroup of two disproportionate RDDs could lead into 2G limit BUG
> -
>
> Key: SPARK-22465
> URL: https://issues.apache.org/jira/browse/SPARK-22465
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.2.1, 1.2.2, 
> 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 
> 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.2.0
>Reporter: Amit Kumar
>Priority: Critical
>
> While running my spark pipeline, it failed with the following exception
> {noformat}
> 2017-11-03 04:49:09,776 [Executor task launch worker for task 58670] ERROR 
> org.apache.spark.executor.Executor  - Exception in task 630.0 in stage 28.0 
> (TID 58670)
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
>   at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:469)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:705)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> After debugging I found that the issue lies with how spark handles cogroup of 
> two RDDs.
> Here is the relevant code from apache spark
> {noformat}
>  /**
>* For each key k in `this` or `other`, return a resulting RDD that 
> contains a tuple with the
>* list of values for that key in `this` as well as `other`.
>*/
>   def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = 
> self.withScope {
> cogroup(other, defaultPartitioner(self, other))
>   }
> /**
>* Choose a partitioner to use for a cogroup-like operation between a 
> number of RDDs.
>*
>* If any of the RDDs already has a partitioner, choose that one.
>*
>* Otherwise, we use a default HashPartitioner. For the number of 
> partitions, if
>* spark.default.parallelism is set, then we'll use the value from 
> SparkContext
>* defaultParallelism, otherwise we'll use the max number of upstream 
> partitions.
>*
>* Unless spark.default.parallelism is set, the number of partitions will 
> be the
>* same as the number of partitions in the largest upstream RDD, as this 
> should
>* be least likely to cause out-of-memory errors.
>*
>* We use two method parameters (rdd, others) to enforce callers passing at 
> least 1 RDD.
>*/
>   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
> val rdds = (Seq(rdd) ++ others)
> val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 
> 0))
> if (hasPartitioner.nonEmpty) {
>   hasPartitioner.maxBy(_.partitions.length).partitioner.get
> } else {
>   if (rdd.context.conf.contains("spark.default.parallelism")) {
> new HashPartitioner(rdd.context.defaultParallelism)
>   } else {
> new HashPartitioner(rdds.map(_.partitions.length).max)
>   }
> }
>   }
> {noformat}
> Given this  suppose we have two  pair RDDs.
> RDD1 : A small RDD which fewer data and partitions
> RDD2: A huge RDD which has loads of data and partitions
> Now in the code if we were to have a cogroup
> {noformat}
> val RDD3 = RDD1.cogroup(RDD2)
> {noformat}
> there is a case where this could lead to the SPARK-6235 Bug which is If RDD1 
> has a partitioner when it is being called into a cogroup. This is because the 
> cogroups partitions are then decided by the partitioner and could lead to the 
> huge RDD2 being shuffled into a small number of partitions.
> One way is