code hangs in local master mode

2016-01-14 Thread Kai Wei
Hi list,

I ran into an issue which I think could be a bug.

I have a Hive table stored as Parquet files. Let's say it's called
testtable. I found that the code below gets stuck forever in spark-shell
with a local master (driver and executor in the same JVM):
sqlContext.sql("select * from testtable").rdd.cache.zipWithIndex().count

But it works if I use a standalone master.

I also tried several different variants:
don't cache the RDD (works):
sqlContext.sql("select * from testtable").rdd.zipWithIndex().count

cache the RDD after zipWithIndex (works):
sqlContext.sql("select * from testtable").rdd.zipWithIndex().cache.count

use the Parquet file reader directly (doesn't work):
sqlContext.read.parquet("hdfs://localhost:8020/user/hive/warehouse/testtable").rdd.cache.zipWithIndex().count

use Parquet files on the local file system (works):
sqlContext.read.parquet("/tmp/testtable").rdd.cache.zipWithIndex().count

I read the code of zipWithIndex() and examined the DAG visualization. I
think the function causes Spark to first retrieve and cache the first n-1
partitions of the target table, and then fetch the last partition.
Something must be going wrong when the driver/executor tries to read the
last partition from HDFS.
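For reference, the bookkeeping zipWithIndex does can be sketched in plain
Scala (no Spark here; the partition contents are made up for illustration).
Spark's implementation first runs a job that counts the elements of
partitions 0 to n-2 (the last partition's size is never needed, since a
partition's offset only depends on the partitions before it), then runs a
second job that attaches the global indices:

```scala
object ZipWithIndexSketch {
  // Start index of each partition: a prefix sum over the sizes of the
  // first n-1 partitions. This mirrors the first job zipWithIndex runs.
  def startIndices(partitionSizes: Seq[Long]): Seq[Long] =
    partitionSizes.dropRight(1).scanLeft(0L)(_ + _)

  // Second pass: number elements within each partition, shifted by the
  // partition's start index, yielding globally unique indices.
  def zipWithIndex[T](partitions: Seq[Seq[T]]): Seq[Seq[(T, Long)]] = {
    val offsets = startIndices(partitions.map(_.length.toLong))
    partitions.zip(offsets).map { case (part, offset) =>
      part.zipWithIndex.map { case (elem, i) => (elem, offset + i) }
    }
  }
}
```

That first counting job is why the last partition is read separately from
the others, which matches the behaviour above.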

I am using spark-1.5.2-bin-hadoop-2.6 on the Cloudera QuickStart VM 5.4.2.

-- 
Kai Wei
Big Data Developer

Pythian - love your data

w...@pythian.com
Tel: +1 613 565 8696 x1579
Mobile: +61 403 572 456

Re: code hangs in local master mode

2016-01-14 Thread Kai Wei
Thanks for your reply, Ted.

Below is the stack dump for all threads:

Thread dump for executor driver

Updated at 2016/01/14 20:35:41

Thread 89: Executor task launch worker-0 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 90: Executor task launch worker-1 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 91: Executor task launch worker-2 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 92: Executor task launch worker-3 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 37: BLOCK_MANAGER cleanup timer (WAITING)

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:503)
java.util.TimerThread.mainLoop(Timer.java:526)
java.util.TimerThread.run(Timer.java:505)

Thread 38: BROADCAST_VARS cleanup timer (WAITING)

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:503)
java.util.TimerThread.mainLoop(Timer.java:526)
java.util.TimerThread.run(Timer.java:505)

Thread 61: dag-scheduler-event-loop (WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:489)
java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:678)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:46)

Thread 62: driver-heartbeater (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 3: Finalizer (WAITING)

java.lang.Object.wait(Native Method)
java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

Thread 59: heartbeat-receiver-event-loop-thread (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)

Re: code hangs in local master mode

2016-01-14 Thread Ted Yu
Can you capture one or two stack traces of the local master process and
pastebin them?

Thanks
