Hi, I'm trying to use Spark Streaming with a very simple script like this:
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonSparkStreamingKafka")
    ssc = StreamingContext(sc, 1)

    kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
                   "auto.offset.reset": "smallest"}

    training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
    training.pprint()

    ssc.start()
    ssc.awaitTermination()

It works locally, but when I run it on the cluster in standalone mode it crashes. The cluster has 4 machines:

- 1 machine running the Kafka producer, the broker and ZooKeeper
- 1 machine acting as the driver
- 2 machines acting as workers

The strange thing is that when the Kafka producer, broker and ZooKeeper were on the same machine as the driver, it worked both locally and on the cluster. But obviously, for the sake of scalability and modularity, I'd like to keep the current configuration.

I'm using Spark 2.4.6, the Kafka streaming integration is "spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version is kafka_2.11-2.4.1. (The rough submit command I use is at the end of this post.)

The result is the following:

2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 172.31.69.185, executor 0): java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on 172.31.69.185, executor 0: java.nio.channels.ClosedChannelException (null) [duplicate 1]
2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on 172.31.69.185, executor 0: java.nio.channels.ClosedChannelException (null) [duplicate 2]
2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on 172.31.79.221, executor 1: java.nio.channels.ClosedChannelException (null) [duplicate 3]
2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
2020-07-24 09:48:27,174 INFO scheduler.DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:153) failed in 2.943 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
    [same ClosedChannelException stack trace as above]
2020-07-24 09:48:27,193 ERROR scheduler.JobScheduler: Error running job streaming job 1595584104000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1360, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", line 1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
    [same ClosedChannelException stack trace as above]
Driver stacktrace:
    [DAGScheduler / py4j invocation frames omitted]
Caused by: java.nio.channels.ClosedChannelException
    [same ClosedChannelException stack trace as above, followed by the PythonDStream / ForEachDStream / JobScheduler frames]
[routine INFO lines about the next batches and ResultStage 1, which finishes successfully, omitted]
Traceback (most recent call last):
  File "/home/ubuntu/./prova2.py", line 22, in <module>
    ssc.awaitTermination()
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
    [same Python traceback, ClosedChannelException stack trace and driver stacktrace as above]
2020-07-24 09:48:27,514 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
2020-07-24 09:48:27,523 INFO scheduler.ReceiverTracker: ReceiverTracker stopped
2020-07-24 09:48:27,523 INFO scheduler.JobGenerator: Stopping JobGenerator immediately
2020-07-24 09:48:27,532 ERROR scheduler.JobScheduler: Error running job streaming job 1595584105000 ms.0
py4j.Py4JException: Error while sending a command.
    at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
    at com.sun.proxy.$Proxy18.call(Unknown Source)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
    [remaining streaming scheduler frames omitted]
Caused by: py4j.Py4JNetworkException: Error while sending a command: null response: c p2 call L1595584105000 lo96 e
    at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
    ... 24 more
2020-07-24 09:48:27,534 ERROR scheduler.JobScheduler: Error running job streaming job 1595584106000 ms.0
py4j.Py4JException: Error while sending a command.
    [same stack trace as above]
2020-07-24 09:48:27,535 ERROR scheduler.JobScheduler: Error running job streaming job 1595584107000 ms.0
py4j.Py4JException: Cannot obtain a new communication channel
    at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
    [same stack trace as above]
2020-07-24 09:48:27,540 ERROR python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
    [same stack trace as above]
2020-07-24 09:48:27,544 INFO scheduler.JobGenerator: Stopped JobGenerator
2020-07-24 09:48:27,551 INFO scheduler.JobScheduler: Stopped JobScheduler
2020-07-24 09:48:27,562 INFO streaming.StreamingContext: StreamingContext stopped successfully
2020-07-24 09:48:27,572 INFO ui.SparkUI: Stopped Spark web UI at http://ip-172-31-69-46.ec2.internal:4040
2020-07-24 09:48:27,575 INFO cluster.StandaloneSchedulerBackend: Shutting down all executors
2020-07-24 09:48:27,613 INFO spark.SparkContext: Successfully stopped SparkContext
2020-07-24 09:48:27,614 INFO util.ShutdownHookManager: Shutdown hook called
2020-07-24 09:48:27,615 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-85b5d4dc-0524-4148-aff4-8a77deb6ccca
    [remaining shutdown INFO lines omitted]

I've debugged everything I can, but I have no idea how to solve this problem. Do you have any suggestions? Could it be a Kafka configuration problem?

Thanks in advance,
Davide
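EDIT: in case it matters, this is roughly how I submit the job; the master URL and the jar path below are placeholders rather than my exact values, but the real command points at my standalone master and at the assembly jar mentioned above:

    spark-submit \
      --master spark://<master-host>:7077 \
      --jars /path/to/spark-streaming-kafka-0-8-assembly_2.11-2.4.6.jar \
      prova2.py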
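EDIT 2: from the stack trace it seems that the executors themselves open the connections to Kafka (kafka.consumer.SimpleConsumer), so my current guess is that the broker may be advertising an address that the worker machines cannot resolve or reach. If that is the case, I suppose the relevant part of the broker's server.properties would have to look something like the sketch below. This is only what I think I should check, not my actual configuration:

    # server.properties on the Kafka broker machine (sketch, not my real file)
    # Accept connections on every interface
    listeners=PLAINTEXT://0.0.0.0:9092
    # Advertise an address that the Spark workers can resolve and reach
    advertised.listeners=PLAINTEXT://172.31.71.104:9092

Does that sound like the right direction, or should I look somewhere else?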