[jira] [Comment Edited] (SPARK-23191) Workers registration failes in case of network drop
[ https://issues.apache.org/jira/browse/SPARK-23191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415194#comment-16415194 ] Sujith Jay Nair edited comment on SPARK-23191 at 3/27/18 8:10 AM: -- This is a known race condition, in which the reconnection attempt made by the worker after the network outage is seen as a request to register a duplicate worker on the (same) master and hence, the attempt fails. Details on this can be found in SPARK-4592. Although the race condition is resolved for the case in which a new master replaces the unresponsive master, it still exists when the same old master recovers, which is the case here. was (Author: suj1th): This is a known race condition, in which the reconnection attempt made by the worker after the network outage is seen as a request to register a duplicate worker on the (same) master and hence, the attempt fails. Details on this can be found in [#SPARK-4592]. Although the race condition is resolved for the case in which a new master replaces the unresponsive master, it still exists when the same old master recovers, which is the case here. > Workers registration failes in case of network drop > --- > > Key: SPARK-23191 > URL: https://issues.apache.org/jira/browse/SPARK-23191 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.3, 2.2.1, 2.3.0 > Environment: OS:- Centos 6.9(64 bit) > >Reporter: Neeraj Gupta >Priority: Critical > > We have a 3-node cluster. We were facing issues with multiple drivers running in > some scenarios in production. > On further investigation we were able to reproduce in both the 1.6.3 and 2.2.1 > versions the scenario with the following steps: > # Set up a 3-node cluster. Start the master and slaves. > # On any node where the worker process is running, block the connections on > port 7077 using iptables. 
> {code:java} > iptables -A OUTPUT -p tcp --dport 7077 -j DROP > {code} > # After about 10-15 seconds we get the error on the node that it is unable to > connect to the master. > {code:java} > 2018-01-23 12:08:51,639 [rpc-client-1-1] WARN > org.apache.spark.network.server.TransportChannelHandler - Exception in > connection from > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > at sun.nio.ch.IOUtil.read(IOUtil.java:192) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) > at > io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) > at > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) > at > io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) > at java.lang.Thread.run(Thread.java:745) > 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR > org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting > for master to reconnect... > 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR > org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting > for master to reconnect... 
> {code} > # Once we get this exception we re-enable the connections to port 7077 using > {code:java} > iptables -D OUTPUT -p tcp --dport 7077 -j DROP > {code} > # Worker tries to register again with master but is unable to do so. It > gives the following error > {code:java} > 2018-01-23 12:08:58,657 [worker-register-master-threadpool-2] WARN > org.apache.spark.deploy.worker.Worker - Failed to connect to master > :7077 > org.apache.spark.SparkException: Exception thrown in awaitResult: > at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) > at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100) > at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108) > at >
[jira] [Commented] (SPARK-23191) Workers registration failes in case of network drop
[ https://issues.apache.org/jira/browse/SPARK-23191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415194#comment-16415194 ] Sujith Jay Nair commented on SPARK-23191: - This is a known race condition, in which the reconnection attempt made by the worker after the network outage is seen as a request to register a duplicate worker on the (same) master and hence, the attempt fails. Details on this can be found in [#SPARK-4592]. Although the race condition is resolved for the case in which a new master replaces the unresponsive master, it still exists when the same old master recovers, which is the case here. > Workers registration failes in case of network drop > --- > > Key: SPARK-23191 > URL: https://issues.apache.org/jira/browse/SPARK-23191 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.3, 2.2.1, 2.3.0 > Environment: OS:- Centos 6.9(64 bit) > >Reporter: Neeraj Gupta >Priority: Critical > > We have a 3-node cluster. We were facing issues with multiple drivers running in > some scenarios in production. > On further investigation we were able to reproduce in both the 1.6.3 and 2.2.1 > versions the scenario with the following steps: > # Set up a 3-node cluster. Start the master and slaves. > # On any node where the worker process is running, block the connections on > port 7077 using iptables. > {code:java} > iptables -A OUTPUT -p tcp --dport 7077 -j DROP > {code} > # After about 10-15 seconds we get the error on the node that it is unable to > connect to the master. 
> {code:java} > 2018-01-23 12:08:51,639 [rpc-client-1-1] WARN > org.apache.spark.network.server.TransportChannelHandler - Exception in > connection from > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > at sun.nio.ch.IOUtil.read(IOUtil.java:192) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) > at > io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) > at > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) > at > io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) > at java.lang.Thread.run(Thread.java:745) > 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR > org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting > for master to reconnect... > 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR > org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting > for master to reconnect... > {code} > # Once we get this exception we re-enable the connections to port 7077 using > {code:java} > iptables -D OUTPUT -p tcp --dport 7077 -j DROP > {code} > # Worker tries to register again with master but is unable to do so. 
It > gives the following error > {code:java} > 2018-01-23 12:08:58,657 [worker-register-master-threadpool-2] WARN > org.apache.spark.deploy.worker.Worker - Failed to connect to master > :7077 > org.apache.spark.SparkException: Exception thrown in awaitResult: > at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) > at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100) > at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108) > at > org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$tryRegisterAllMasters$1$$anon$1.run(Worker.scala:241) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at
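The race condition described in the comments can be sketched in a few lines. The following is a hypothetical Python model for illustration only, not the actual org.apache.spark.deploy.master.Master logic: the master still considers the worker alive after the network drop, so the worker's re-registration attempt is rejected as a duplicate.

```python
# Toy model of the duplicate-registration race described above.
# All names here are illustrative; this is not Spark's Master code.

class Master:
    def __init__(self):
        self.workers = {}  # worker_id -> state, e.g. "ALIVE"

    def register_worker(self, worker_id):
        # A master that has not yet marked the worker DEAD after the
        # network drop treats the reconnection as a duplicate registration.
        if self.workers.get(worker_id) == "ALIVE":
            return "RegisterWorkerFailed: duplicate worker " + worker_id
        self.workers[worker_id] = "ALIVE"
        return "RegisteredWorker"

master = Master()
first = master.register_worker("worker-1")   # initial registration succeeds
# Network drops; the worker times out, but the same old master still sees
# the worker as ALIVE, so the reconnection attempt fails:
second = master.register_worker("worker-1")
```

Here `first` is `"RegisteredWorker"` while `second` starts with `"RegisterWorkerFailed"`, mirroring why the worker can never re-register against the same recovered master.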
[jira] [Commented] (SPARK-23437) [ML] Distributed Gaussian Process Regression for MLlib
[ https://issues.apache.org/jira/browse/SPARK-23437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413013#comment-16413013 ] Sujith Jay Nair commented on SPARK-23437: - +1 for this initiative. To garner support for it, we need to come up with strong reasons why GPs are needed as part of Spark ML. This could be done as part of the documentation of your implementation. You do mention GPflow as an example of the TensorFlow ecosystem supporting linear-time GPs; however, that still is a third-party library. If anything, it vouches for the opinion that this functionality should be kept separate from core Spark ML. Like Seth Henderson mentions above, it would help tremendously to showcase more packages that have this algorithm implemented. > [ML] Distributed Gaussian Process Regression for MLlib > -- > > Key: SPARK-23437 > URL: https://issues.apache.org/jira/browse/SPARK-23437 > Project: Spark > Issue Type: New Feature > Components: ML, MLlib >Affects Versions: 2.2.1 >Reporter: Valeriy Avanesov >Assignee: Apache Spark >Priority: Major > > Gaussian Process Regression (GP) is a well-known black-box non-linear > regression approach [1]. For years the approach remained inapplicable to > large samples due to its cubic computational complexity; however, more recent > techniques (Sparse GP) allow for linear complexity. The field > continues to attract the interest of researchers – several papers devoted to > GP were presented at NIPS 2017. > Unfortunately, non-parametric regression techniques coming with mllib are > restricted to tree-based approaches. > I propose to create and include an implementation (which I am going to work > on) of the so-called robust Bayesian Committee Machine proposed and investigated > in [2]. > [1] Carl Edward Rasmussen and Christopher K. I. Williams. 2005. _Gaussian > Processes for Machine Learning (Adaptive Computation and Machine Learning)_. > The MIT Press. > [2] Marc Peter Deisenroth and Jun Wei Ng. 2015. 
Distributed Gaussian > processes. In _Proceedings of the 32nd International Conference on > International Conference on Machine Learning - Volume 37_ (ICML'15), Francis > Bach and David Blei (Eds.), Vol. 37. JMLR.org 1481-1490. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
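As a rough illustration of the proposal, the robust Bayesian Committee Machine of [2] combines the Gaussian predictions of independent experts by precision-weighted averaging. Below is a minimal sketch, assuming the standard rBCM weights beta_k = 0.5 * (log prior variance - log expert variance); it is a toy aggregation of scalar predictions, not a distributed MLlib implementation:

```python
import math

def rbcm_combine(mus, variances, prior_var):
    # robust BCM aggregation (sketch): each expert k gets weight
    # beta_k = 0.5 * (log prior_var - log var_k), i.e. more confident
    # experts (lower predictive variance) contribute more.
    betas = [0.5 * (math.log(prior_var) - math.log(v)) for v in variances]
    # Combined precision: weighted expert precisions plus a correction
    # term that falls back to the prior when the experts are uninformative.
    precision = sum(b / v for b, v in zip(betas, variances)) \
        + (1.0 - sum(betas)) / prior_var
    var = 1.0 / precision
    mu = var * sum(b * m / v for b, m, v in zip(betas, mus, variances))
    return mu, var

# Two confident experts that roughly agree: the combined mean lands
# between their estimates, with lower variance than either expert.
mu, var = rbcm_combine(mus=[1.0, 1.2], variances=[0.1, 0.1], prior_var=1.0)
```

In a distributed setting each expert would be a sparse GP trained on one data partition, and only the per-expert means and variances need to be aggregated at the driver.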
[jira] [Commented] (SPARK-8682) Range Join for Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-8682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339169#comment-16339169 ] Sujith Jay Nair commented on SPARK-8682: The ticket description refers to the implementation of range-joins in the ADAM project. I believe this is the [ADAM pull-request|https://github.com/bigdatagenomics/adam/pull/117] being referred to, for your reference. > Range Join for Spark SQL > > > Key: SPARK-8682 > URL: https://issues.apache.org/jira/browse/SPARK-8682 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Herman van Hovell >Priority: Major > Attachments: perf_testing.scala > > > Currently Spark SQL uses a Broadcast Nested Loop join (or a filtered > Cartesian Join) when it has to execute the following range query: > {noformat} > SELECT A.*, >B.* > FROM tableA A >JOIN tableB B > ON A.start <= B.end > AND A.end > B.start > {noformat} > This is horribly inefficient. The performance of this query can be greatly > improved, when one of the tables can be broadcasted, by creating a range > index. A range index is basically a sorted map containing the rows of the > smaller table, indexed by both the high and low keys. Using this structure, > the complexity of the query would go from O(N * M) to O(N * 2 * LOG(M)), N = > number of records in the larger table, M = number of records in the smaller > (indexed) table. > I have created a pull request for this. According to the [Spark SQL: > Relational Data Processing in > Spark|http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf] > paper similar work (page 11, section 7.2) has already been done by the ADAM > project (cannot locate the code though). > Any comments and/or feedback are greatly appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
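The range-index idea in the description can be sketched with a sorted list and binary search. This toy version (neither Spark's nor ADAM's code; rows are bare (start, end) pairs) joins on the ticket's predicate, A.start <= B.end AND A.end > B.start:

```python
import bisect

def range_join(large, small):
    # Build a "range index" on the (broadcast) smaller table:
    # its rows sorted by start key, with the start keys extracted
    # for binary search.
    small_sorted = sorted(small, key=lambda r: r[0])
    starts = [r[0] for r in small_sorted]
    out = []
    for a_start, a_end in large:
        # Binary search: only rows of `small` with start < a_end can
        # satisfy A.end > B.start ...
        hi = bisect.bisect_left(starts, a_end)
        for b_start, b_end in small_sorted[:hi]:
            # ... and among those, keep rows satisfying A.start <= B.end.
            if a_start <= b_end:
                out.append(((a_start, a_end), (b_start, b_end)))
    return out

pairs = range_join(large=[(0, 5), (10, 12)], small=[(3, 8), (11, 20), (30, 40)])
# pairs == [((0, 5), (3, 8)), ((10, 12), (11, 20))]
```

The binary search prunes the candidate set per probe row, which is where the O(N * M) to roughly O(N * log(M)) improvement in the description comes from when matches are sparse.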
[jira] [Commented] (SPARK-17833) 'monotonicallyIncreasingId()' should be deterministic
[ https://issues.apache.org/jira/browse/SPARK-17833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16309411#comment-16309411 ] Sujith Jay Nair commented on SPARK-17833: - This issue is resolved in 2.0, as mentioned in [SPARK-14241 | https://issues.apache.org/jira/browse/SPARK-14241] > 'monotonicallyIncreasingId()' should be deterministic > - > > Key: SPARK-17833 > URL: https://issues.apache.org/jira/browse/SPARK-17833 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Kevin Ushey >Priority: Critical > > Right now, it's (IMHO) too easy to shoot yourself in the foot using > 'monotonicallyIncreasingId()', as it's easy to expect the generated numbers > to function as a 'stable' primary key, for example, and then go on to use > that key in e.g. 'joins' and so on. > Is there any reason why this function can't be made deterministic? Or, could > a deterministic analogue of this function be added (e.g. > 'withPrimaryKey(columnName = ...)')? > A solution is to immediately cache / persist the table after calling > 'monotonicallyIncreasingId()'; it's also possible that the documentation > should spell that out loud and clear. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
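The non-determinism discussed above follows from how the IDs are constructed: per Spark's documentation, the upper bits encode the partition ID and the lower 33 bits the record position within the partition. A minimal sketch (assuming that bit layout, with partitions modeled as plain lists) shows that the same data partitioned differently yields different IDs:

```python
def monotonically_increasing_ids(partitions):
    # Sketch of the documented ID scheme: (partition ID << 33) + row
    # position within the partition. `partitions` is a list of lists.
    return [(pid << 33) + i
            for pid, rows in enumerate(partitions)
            for i, _ in enumerate(rows)]

# Same three rows, two different partitionings => different IDs.
# This is why the generated numbers are not a stable primary key
# unless the result is cached/persisted immediately.
ids_a = monotonically_increasing_ids([["x", "y"], ["z"]])
ids_b = monotonically_increasing_ids([["x"], ["y", "z"]])
# ids_a == [0, 1, 2**33] while ids_b == [0, 2**33, 2**33 + 1]
```

Any recomputation that changes how rows land in partitions (a shuffle, a re-read of a non-deterministic source) therefore reassigns the IDs, which is exactly the foot-gun the reporter describes.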
[jira] [Commented] (SPARK-22714) Spark API Not responding when Fatal exception occurred in event loop
[ https://issues.apache.org/jira/browse/SPARK-22714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16309366#comment-16309366 ] Sujith Jay Nair commented on SPARK-22714: - Hi [~todesking], is this reproducible outside of the Spark REPL? Trying to understand if this is specific to the Spark shell. > Spark API Not responding when Fatal exception occurred in event loop > > > Key: SPARK-22714 > URL: https://issues.apache.org/jira/browse/SPARK-22714 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: todesking >Priority: Critical > > To reproduce, let Spark throw an OOM exception in the event loop: > {noformat} > scala> spark.sparkContext.getConf.get("spark.driver.memory") > res0: String = 1g > scala> val a = new Array[Int](4 * 1000 * 1000) > scala> val ds = spark.createDataset(a) > scala> ds.rdd.zipWithIndex > [Stage 0:> (0 + 0) / > 3]Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: > Java heap space > [Stage 0:> (0 + 0) / > 3] > // Spark is not responding > {noformat} > While not responding, Spark is waiting on a Promise which is never completed. > The promise depends on some processing in the event-loop thread, but that thread is dead > once the fatal exception is thrown. 
> {noformat} > "main" #1 prio=5 os_prio=31 tid=0x7ffc9300b000 nid=0x1703 waiting on > condition [0x70216000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007ad978eb8> (a > scala.concurrent.impl.Promise$CompletionLatch) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) > at > org.apache.spark.rdd.ZippedWithIndexRDD.(ZippedWithIndexRDD.scala:50) > at > org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293) > at > org.apache.spark.rdd.RDD$$anonfun$zipWithIndex$1.apply(RDD.scala:1293) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at org.apache.spark.rdd.RDD.zipWithIndex(RDD.scala:1292) > {noformat} > I don't know how to fix it properly, but it seems we need to add Fatal error > handling to EventLoop.run() in core/EventLoop.scala -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
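The fix suggested at the end of the description, adding fatal-error handling to EventLoop.run(), can be sketched as follows. This is a hypothetical Python model, not the Scala core/EventLoop.scala code: the point is that the loop records the fatal error and signals waiters instead of letting the thread die silently while callers block forever.

```python
import queue
import threading

class EventLoop:
    # Toy event loop illustrating the suggested fix: catch *all*
    # throwables in the loop thread and wake up waiters with the error,
    # rather than hanging them on a promise that is never completed.
    def __init__(self, handler):
        self.events = queue.Queue()
        self.error = None
        self.done = threading.Event()
        self.handler = handler
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        try:
            while True:
                event = self.events.get()
                if event is None:          # shutdown sentinel
                    return
                self.handler(event)
        except BaseException as exc:       # includes fatal errors like OOM
            self.error = exc               # record the failure ...
        finally:
            self.done.set()                # ... and wake up anyone waiting

    def post(self, event):
        self.events.put(event)

def boom(event):
    raise MemoryError("simulated OOM in event loop")

loop = EventLoop(boom)
loop.post("job-submitted")
loop.done.wait(timeout=5)  # returns with the error recorded, not a hang
```

Without the `except`/`finally`, the MemoryError would kill the loop thread and `done.wait()` (the analogue of awaiting the DAGScheduler's promise) would block indefinitely, which is the reported symptom.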
[jira] [Commented] (SPARK-22465) Cogroup of two disproportionate RDDs could lead into 2G limit BUG
[ https://issues.apache.org/jira/browse/SPARK-22465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292581#comment-16292581 ] Sujith Jay Nair commented on SPARK-22465: - Would something along the lines of 'add a safety-check that ignores the partitioner if the number of partitions on the RDDs are very different in magnitude', as the reporter suggests, be a satisfactory solution? Any pointers here would be very helpful. > Cogroup of two disproportionate RDDs could lead into 2G limit BUG > - > > Key: SPARK-22465 > URL: https://issues.apache.org/jira/browse/SPARK-22465 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.2.1, 1.2.2, > 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, > 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.2.0 >Reporter: Amit Kumar >Priority: Critical > > While running my spark pipeline, it failed with the following exception > {noformat} > 2017-11-03 04:49:09,776 [Executor task launch worker for task 58670] ERROR > org.apache.spark.executor.Executor - Exception in task 630.0 in stage 28.0 > (TID 58670) > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) > at > org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) > at > org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303) > at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:469) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:705) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at 
org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > After debugging I found that the issue lies with how spark handles cogroup of > two RDDs. > Here is the relevant code from apache spark > {noformat} > /** >* For each key k in `this` or `other`, return a resulting RDD that > contains a tuple with the >* list of values for that key in `this` as well as `other`. >*/ > def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = > self.withScope { > cogroup(other, defaultPartitioner(self, other)) > } > /** >* Choose a partitioner to use for a cogroup-like operation between a > number of RDDs. >* >* If any of the RDDs already has a partitioner, choose that one. >* >* Otherwise, we use a default HashPartitioner. For the number of > partitions, if >* spark.default.parallelism is set, then we'll use the value from > SparkContext >* defaultParallelism, otherwise we'll use the max number of upstream > partitions. >* >* Unless spark.default.parallelism is set, the number of partitions will > be the >* same as the number of partitions in the largest upstream RDD, as this > should >* be least likely to cause out-of-memory errors. >* >* We use two method parameters (rdd, others) to enforce callers passing at > least 1 RDD. 
>*/ > def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { > val rdds = (Seq(rdd) ++ others) > val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > > 0)) > if (hasPartitioner.nonEmpty) { > hasPartitioner.maxBy(_.partitions.length).partitioner.get > } else { > if (rdd.context.conf.contains("spark.default.parallelism")) { > new HashPartitioner(rdd.context.defaultParallelism) > } else { > new HashPartitioner(rdds.map(_.partitions.length).max) > } > } > } > {noformat} > Given this, suppose we have two pair RDDs. > RDD1: A small RDD which has less data and fewer partitions > RDD2: A huge RDD which has loads of data and partitions > Now in the code, if we were to have a cogroup > {noformat} > val RDD3 = RDD1.cogroup(RDD2) > {noformat} > there is a case where this could lead to the SPARK-6235 bug, which is if RDD1 > has a partitioner when it is passed into a cogroup. This is because the > cogroup's partitions are then decided by that partitioner and could lead to the > huge
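The safety check discussed in the comment can be sketched on a toy model of defaultPartitioner. Everything below is illustrative, not Spark's code: RDDs are modeled as (num_partitions, partitioner_or_None) pairs, and the one-order-of-magnitude threshold is an assumed heuristic.

```python
def default_partitioner(rdds, default_parallelism=None):
    # Toy model of defaultPartitioner plus the proposed safety check:
    # ignore an existing partitioner whose partition count is orders of
    # magnitude smaller than the largest upstream RDD's.
    max_partitions = max(n for n, _ in rdds)
    with_partitioner = [(n, p) for n, p in rdds if p is not None and n > 0]
    if with_partitioner:
        n, partitioner = max(with_partitioner, key=lambda t: t[0])
        # Safety check (assumed threshold): reuse the partitioner only if
        # it is within one order of magnitude of the largest RDD.
        if n * 10 >= max_partitions:
            return partitioner
    # Otherwise fall back to a hash partitioner, as the Scala code does.
    if default_parallelism is not None:
        return ("hash", default_parallelism)
    return ("hash", max_partitions)

# A 4-partition RDD1's partitioner is ignored when cogrouped with a
# 10000-partition RDD2, avoiding the shuffle of the huge RDD into a
# handful of oversized (>2G) partitions.
chosen = default_partitioner([(4, "rdd1-partitioner"), (10000, None)])
# chosen == ("hash", 10000)
```

When the partition counts are comparable (say 5000 vs 10000), the existing partitioner is still reused, so the common fast path that avoids a shuffle is preserved.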
[jira] [Commented] (SPARK-22465) Cogroup of two disproportionate RDDs could lead into 2G limit BUG
[ https://issues.apache.org/jira/browse/SPARK-22465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292327#comment-16292327 ] Sujith Jay Nair commented on SPARK-22465: - Hi [~tgraves], is there a plan to resolve this behaviour of cogroup, outside of the umbrella ticket for fixing 2G limit ([SPARK-6235]). I wish to chip in if that is the case. Thank you. > Cogroup of two disproportionate RDDs could lead into 2G limit BUG > - > > Key: SPARK-22465 > URL: https://issues.apache.org/jira/browse/SPARK-22465 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.2.1, 1.2.2, > 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, > 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.2.0 >Reporter: Amit Kumar >Priority: Critical > > While running my spark pipeline, it failed with the following exception > {noformat} > 2017-11-03 04:49:09,776 [Executor task launch worker for task 58670] ERROR > org.apache.spark.executor.Executor - Exception in task 630.0 in stage 28.0 > (TID 58670) > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) > at > org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) > at > org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303) > at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:469) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:705) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > After debugging I found that the issue lies with how spark handles cogroup of > two RDDs. > Here is the relevant code from apache spark > {noformat} > /** >* For each key k in `this` or `other`, return a resulting RDD that > contains a tuple with the >* list of values for that key in `this` as well as `other`. >*/ > def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = > self.withScope { > cogroup(other, defaultPartitioner(self, other)) > } > /** >* Choose a partitioner to use for a cogroup-like operation between a > number of RDDs. >* >* If any of the RDDs already has a partitioner, choose that one. >* >* Otherwise, we use a default HashPartitioner. For the number of > partitions, if >* spark.default.parallelism is set, then we'll use the value from > SparkContext >* defaultParallelism, otherwise we'll use the max number of upstream > partitions. >* >* Unless spark.default.parallelism is set, the number of partitions will > be the >* same as the number of partitions in the largest upstream RDD, as this > should >* be least likely to cause out-of-memory errors. >* >* We use two method parameters (rdd, others) to enforce callers passing at > least 1 RDD. 
>*/ > def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { > val rdds = (Seq(rdd) ++ others) > val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > > 0)) > if (hasPartitioner.nonEmpty) { > hasPartitioner.maxBy(_.partitions.length).partitioner.get > } else { > if (rdd.context.conf.contains("spark.default.parallelism")) { > new HashPartitioner(rdd.context.defaultParallelism) > } else { > new HashPartitioner(rdds.map(_.partitions.length).max) > } > } > } > {noformat} > Given this, suppose we have two pair RDDs. > RDD1: A small RDD which has less data and fewer partitions > RDD2: A huge RDD which has loads of data and partitions > Now in the code, if we were to have a cogroup > {noformat} > val RDD3 = RDD1.cogroup(RDD2) > {noformat} > there is a case where this could lead to the SPARK-6235 bug, which is if RDD1 > has a partitioner when it is passed into a cogroup. This is because the > cogroup's partitions are then decided by that partitioner and could lead to the > huge RDD2 being shuffled into a small number of partitions. > One way is