Hi,
    My code is below:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def test(record_list):
    print(list(record_list))
    return record_list

def functionToCreateContext():
    conf = 
SparkConf().setAppName("model_event").setMaster("spark://172.22.9.181:7077")
\
        .set("spark.executor.memory", '6g') \
        .set("spark.executor.cores", '8') \
        .set("spark.deploy.defaultCores", '8') \
        .set("spark.cores.max", '16') \
        .set("spark.streaming.kafka.maxRatePerPartition", 1) \
        .set("spark.streaming.blockInterval", 1) \
        .set("spark.default.parallelism", 8) \
        .set("spark.driver.host", '172.22.9.181') \


    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/spark/checkpoints/model_event_spark")
    return ssc

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate("/spark/checkpoints/model_event_spark",
functionToCreateContext)
    record_dstream =
KafkaUtils.createDirectStream(ssc,topics=["installmentdb_t_bill"],

kafkaParams={"bootstrap.servers":"xxx:9092",

"auto.offset.reset":"smallest",
                                                                },
                                                   )

    record_dstream.checkpoint(5).mapPartitions(test).pprint()
    ssc.start()
    ssc.awaitTermination()


When the scripts starts at the first time,it work well.

But second time started from checkpointDirectory,it has problem like:

2019-07-30 02:48:50,290 ERROR streaming.StreamingContext: Error
starting the context, marking it as stopped
org.apache.spark.SparkException:
org.apache.spark.streaming.api.python.PythonTransformedDStream@319b7bed
has not been initialized
        at 
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
        at 
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
        at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
        at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
        at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils 
... ()
        at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
        at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

What is wrong with my script?

Reply via email to