1.:  I have pasted the full content of the Environment page for the example 
application, as run against the cluster, at the end of this message.
2. and 3.:  Following #2, I could see that the count was incorrectly 0 when 
running against the cluster, and following #3, I got this message:
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4] at count at 
<console>:15(0) has different number of partitions than original RDD 
MappedRDD[3] at textFile at <console>:12(2)

I think I understand: state checkpoints and other file-exchange operations in 
a Spark cluster require a distributed/shared filesystem, even with just a 
single-host cluster and the driver/shell on a second host. Is that correct?
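
If that reading is right, the fix would be to point the streaming checkpoint 
directory at storage that the driver and every worker can reach. Below is a 
minimal sketch modeled on the StatefulNetworkWordCount example. The object 
name, the one-second batch interval, and the example URI in the comment are 
assumptions for illustration, not taken from the shipped example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits on 0.9-era Spark

// Hypothetical object name; a sketch, not the shipped example.
object SharedCheckpointWordCount {
  // Add this batch's counts for a word to its running total.
  def updateCount(values: Seq[Int], state: Option[Int]): Option[Int] =
    Some(values.sum + state.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // args: <master> <host> <port> <checkpointDir>
    val Array(master, host, port, checkpointDir) = args
    val conf = new SparkConf().setMaster(master).setAppName("SharedCheckpointWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumption: checkpointDir is visible to the driver and all executors,
    // e.g. hdfs://namenode:8020/spark/checkpoints (hypothetical address).
    ssc.checkpoint(checkpointDir)

    val counts = ssc.socketTextStream(host, port.toInt)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey[Int](updateCount _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With a plain local path such as /tmp/checkpoints, each host would resolve the 
directory on its own disk, which would be consistent with the driver seeing 0 
checkpoint partitions while the original RDD has 2.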

Thank you,
Paul



NetworkWordCumulativeCountUpdateStateByKey application UI
Environment
Runtime Information

Name    Value
Java Home       /usr/lib/jvm/jdk1.8.0/jre
Java Version    1.8.0 (Oracle Corporation)
Scala Home      
Scala Version   version 2.10.3
Spark Properties

Name    Value
spark.app.name  NetworkWordCumulativeCountUpdateStateByKey
spark.cleaner.ttl       3600
spark.deploy.recoveryMode       ZOOKEEPER
spark.deploy.zookeeper.url      pubsub01:2181
spark.driver.host       10.10.41.67
spark.driver.port       37360
spark.fileserver.uri    http://10.10.41.67:40368
spark.home      /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
spark.httpBroadcast.uri http://10.10.41.67:45440
spark.jars      /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar
spark.master    spark://10.10.41.19:7077
System Properties

Name    Value
awt.toolkit     sun.awt.X11.XToolkit
file.encoding   ANSI_X3.4-1968
file.encoding.pkg       sun.io
file.separator  /
java.awt.graphicsenv    sun.awt.X11GraphicsEnvironment
java.awt.printerjob     sun.print.PSPrinterJob
java.class.version      52.0
java.endorsed.dirs      /usr/lib/jvm/jdk1.8.0/jre/lib/endorsed
java.ext.dirs   /usr/lib/jvm/jdk1.8.0/jre/lib/ext:/usr/java/packages/lib/ext
java.home       /usr/lib/jvm/jdk1.8.0/jre
java.io.tmpdir  /tmp
java.library.path       
java.net.preferIPv4Stack        true
java.runtime.name       Java(TM) SE Runtime Environment
java.runtime.version    1.8.0-b132
java.specification.name Java Platform API Specification
java.specification.vendor       Oracle Corporation
java.specification.version      1.8
java.vendor     Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug     http://bugreport.sun.com/bugreport/
java.version    1.8.0
java.vm.info    mixed mode
java.vm.name    Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name      Java Virtual Machine Specification
java.vm.specification.vendor    Oracle Corporation
java.vm.specification.version   1.8
java.vm.vendor  Oracle Corporation
java.vm.version 25.0-b70
line.separator  
log4j.configuration     conf/log4j.properties
os.arch amd64
os.name Linux
os.version      3.5.0-23-generic
path.separator  :
sun.arch.data.model     64
sun.boot.class.path     /usr/lib/jvm/jdk1.8.0/jre/lib/resources.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/rt.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/sunrsasign.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jsse.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jce.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/charsets.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jfr.jar:/usr/lib/jvm/jdk1.8.0/jre/classes
sun.boot.library.path   /usr/lib/jvm/jdk1.8.0/jre/lib/amd64
sun.cpu.endian  little
sun.cpu.isalist 
sun.io.serialization.extendedDebugInfo  true
sun.io.unicode.encoding UnicodeLittle
sun.java.command        org.apache.spark.streaming.examples.StatefulNetworkWordCount spark://10.10.41.19:7077 localhost 9999
sun.java.launcher       SUN_STANDARD
sun.jnu.encoding        ANSI_X3.4-1968
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel     
sun.os.patch.level      unknown
user.country    US
user.dir        /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
user.home       /home/pmogren
user.language   en
user.name       pmogren
user.timezone   America/New_York
Classpath Entries

Resource        Source
/home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar        System Classpath
/home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/conf        System Classpath
/home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar        System Classpath
http://10.10.41.67:40368/jars/spark-examples_2.10-assembly-0.9.0-incubating.jar        Added By User

From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: Monday, April 07, 2014 7:54 PM
To: user@spark.apache.org
Subject: Re: CheckpointRDD has different number of partitions than original RDD

A few things would be helpful. 

1. Environment settings - you can find them on the Environment tab in the Spark 
application UI.
2. Are you setting the HDFS configuration correctly in your Spark program? For 
example, can you write an HDFS file from a Spark program (say, spark-shell) to 
your HDFS installation and read it back into Spark (i.e., create an RDD)? You 
can test this by writing an RDD as a text file from the shell and then trying 
to read it back from another shell. 
3. If that works, then let's try explicitly checkpointing an RDD. To do this, 
you can take any RDD and do the following. 

myRDD.checkpoint()
myRDD.count()

If there is some issue, then this should reproduce the above error.
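
Steps 2 and 3 could be sketched as one small program along the following 
lines. This is only an illustration: the object name and the directory layout 
under the base directory are hypothetical, and the base directory passed as 
the second argument is assumed to be reachable from both the driver and the 
workers (for example an HDFS URI).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name; a sketch of steps 2 and 3, not an official tool.
object CheckpointRoundTrip {
  def main(args: Array[String]): Unit = {
    // args: <master> <baseDir>; baseDir is assumed to be shared storage.
    val Array(master, baseDir) = args
    val sc = new SparkContext(
      new SparkConf().setMaster(master).setAppName("CheckpointRoundTrip"))

    // Step 2: write an RDD as a text file, then read it back.
    val textDir = baseDir + "/roundtrip-" + System.currentTimeMillis()
    sc.parallelize(1 to 100, 2).map(_.toString).saveAsTextFile(textDir)
    val readBack = sc.textFile(textDir)
    println("lines read back (100 were written): " + readBack.count())

    // Step 3: explicitly checkpoint a fresh RDD; the first action on it
    // triggers the checkpoint and the partition-count check.
    sc.setCheckpointDir(baseDir + "/checkpoints")
    val toCheckpoint = sc.textFile(textDir)
    toCheckpoint.checkpoint()
    println("count after checkpoint: " + toCheckpoint.count())

    sc.stop()
  }
}
```

Running it once with a shared base directory and once with a host-local path 
should show whether the checkpoint location alone accounts for the difference.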

TD

On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren <pmog...@commercehub.com> wrote:
Hello, Spark community!  My name is Paul. I am a Spark newbie, evaluating 
version 0.9.0 without any Hadoop at all, and need some help. I run into the 
following error with the StatefulNetworkWordCount example (and similarly in my 
prototype app, when I use the updateStateByKey operation).  I get this when 
running against my small cluster, but not (so far) against local[2].

61904 [spark-akka.actor.default-dispatcher-2] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1396905956000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take at DStream.scala:586(0) has different number of partitions than original RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
        at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
        at org.apache.spark.rdd.RDD.take(RDD.scala:844)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)


Please let me know what other information would be helpful; I didn't find any 
question submission guidelines.

Thanks,
Paul
