Hi,

I'm trying to use Spark Streaming with a very simple script like this:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


sc = SparkContext(appName="PythonSparkStreamingKafka")


# 1-second batch interval
ssc = StreamingContext(sc, 1)

# Direct (receiver-less) stream from the broker, starting at the earliest offsets
kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
               "auto.offset.reset": "smallest"}

training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

training.pprint()

ssc.start()
ssc.awaitTermination()
It works locally, but when I run it on the cluster in standalone mode it 
crashes. I have a cluster with 4 machines:

1 machine with the Kafka producer, 1 broker and 1 ZooKeeper
1 machine running the driver
2 machines running the workers.

The strange thing is that when the Kafka producer, broker and ZooKeeper were on 
the same machine as the driver, it worked both locally and on the cluster. But 
for the sake of scalability and modularity I'd obviously like to keep the 
current configuration.
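
To rule out a plain network problem between the workers and the broker, I was 
thinking of running a quick reachability check from each worker node, along 
these lines (broker address taken from the script, timeout chosen arbitrarily):

import socket

# open a raw TCP connection to the Kafka broker from the worker
s = socket.create_connection(("172.31.71.104", 9092), timeout=5)
print("broker reachable from", socket.gethostname())
s.close()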

I'm using Spark 2.4.6, the Kafka streaming integration is 
"spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version I'm 
currently running is kafka_2.11-2.4.1.
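
For completeness, I submit the job to the cluster roughly like this (master URL 
and jar path are illustrative, not my exact values):

./bin/spark-submit \
  --master spark://<spark-master-host>:7077 \
  --jars /path/to/spark-streaming-kafka-0-8-assembly_2.11-2.4.6.jar \
  prova2.py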

The result is the following:

2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 
0.0 (TID 0, 172.31.69.185, executor 0): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 in 
stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 
0.0 (TID 1) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 1]
2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 in 
stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 
0.0 (TID 2) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 2]
2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 in 
stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time 
1595584106000 ms
2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 
in memory on 172.31.79.221:44371 (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time 
1595584107000 ms
2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 
0.0 (TID 3) on 172.31.79.221, executor 1: 
java.nio.channels.ClosedChannelException (null) [duplicate 3]
2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 
failed 4 times; aborting job
2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, 
whose tasks have all completed, from pool
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Killing all running 
tasks in stage 0: Stage cancelled
2020-07-24 09:48:27,174 INFO scheduler.DAGScheduler: ResultStage 0 (runJob at 
PythonRDD.scala:153) failed in 2.943 s due to Job aborted due to stage failure: 
Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
0.0 (TID 3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
2020-07-24 09:48:27,179 INFO scheduler.DAGScheduler: Job 0 failed: runJob at 
PythonRDD.scala:153, took 3.010820 s
2020-07-24 09:48:27,190 INFO scheduler.JobScheduler: Finished job streaming job 
1595584104000 ms.0 from job set of time 1595584104000 ms
2020-07-24 09:48:27,191 INFO scheduler.JobScheduler: Starting job streaming job 
1595584105000 ms.0 from job set of time 1595584105000 ms
2020-07-24 09:48:27,193 ERROR scheduler.JobScheduler: Error running job 
streaming job 1595584104000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", 
line 68, in call
    r = self.func(t, *rdds)
  File 
"/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 
173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1360, 
in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", line 
1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
partitions)
  File 
"/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 
1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", 
line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)


at 
org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at 
org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,211 INFO spark.SparkContext: Starting job: runJob at 
PythonRDD.scala:153
2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Got job 1 (runJob at 
PythonRDD.scala:153) with 1 output partitions
2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 
(runJob at PythonRDD.scala:153)
2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Parents of final stage: 
List()
2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Missing parents: List()
2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Submitting ResultStage 1 
(PythonRDD[9] at RDD at PythonRDD.scala:53), which has no missing parents
2020-07-24 09:48:27,220 INFO memory.MemoryStore: Block broadcast_1 stored as 
values in memory (estimated size 6.9 KB, free 366.3 MB)
2020-07-24 09:48:27,223 INFO memory.MemoryStore: Block broadcast_1_piece0 
stored as bytes in memory (estimated size 4.0 KB, free 366.3 MB)
2020-07-24 09:48:27,225 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 
in memory on ip-172-31-69-46.ec2.internal:41579 (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,226 INFO spark.SparkContext: Created broadcast 1 from 
broadcast at DAGScheduler.scala:1163
2020-07-24 09:48:27,227 INFO scheduler.DAGScheduler: Submitting 1 missing tasks 
from ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:53) (first 15 tasks 
are for partitions Vector(0))
2020-07-24 09:48:27,229 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 
with 1 tasks
2020-07-24 09:48:27,230 INFO scheduler.TaskSetManager: Starting task 0.0 in 
stage 1.0 (TID 4, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:27,248 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 
in memory on 172.31.69.185:44675 (size: 4.0 KB, free: 366.3 MB)
Traceback (most recent call last):
  File "/home/ubuntu/./prova2.py", line 22, in <module>
    ssc.awaitTermination()
  File 
"/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 
192, in awaitTermination
  File 
"/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 
1257, in __call__
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", 
line 328, in get_return_value
2020-07-24 09:48:27,315 INFO scheduler.TaskSetManager: Finished task 0.0 in 
stage 1.0 (TID 4) in 85 ms on 172.31.69.185 (executor 0) (1/1)
2020-07-24 09:48:27,316 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, 
whose tasks have all completed, from pool
2020-07-24 09:48:27,321 INFO python.PythonAccumulatorV2: Connected to 
AccumulatorServer at host: 127.0.0.1 port: 34673
2020-07-24 09:48:27,324 INFO scheduler.DAGScheduler: ResultStage 1 (runJob at 
PythonRDD.scala:153) finished in 0.106 s
2020-07-24 09:48:27,325 INFO scheduler.DAGScheduler: Job 1 finished: runJob at 
PythonRDD.scala:153, took 0.113169 s
2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 20
2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 13
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 3
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 8
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 7
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 10
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 4
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 6
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 11
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 5
2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 25
py4j.protocol.Py4JJavaError: An error occurred while calling 
o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", 
line 68, in call
    r = self.func(t, *rdds)
  File 
"/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 
173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1360, 
in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", line 
1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
partitions)
  File 
"/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 
1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", 
line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3, 172.31.79.221, executor 1): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)


at 
org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at 
org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

2020-07-24 09:48:27,475 INFO storage.BlockManagerInfo: Removed 
broadcast_1_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: 4.0 
KB, free: 366.3 MB)
2020-07-24 09:48:27,477 INFO storage.BlockManagerInfo: Removed 
broadcast_1_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: 366.3 
MB)
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 15
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 23
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 21
2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 12
2020-07-24 09:48:27,511 INFO spark.ContextCleaner: Cleaned accumulator 2
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 18
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 1
2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 14
2020-07-24 09:48:27,514 INFO streaming.StreamingContext: Invoking 
stop(stopGracefully=false) from shutdown hook
2020-07-24 09:48:27,511 INFO scheduler.JobScheduler: Finished job streaming job 
1595584105000 ms.0 from job set of time 1595584105000 ms
2020-07-24 09:48:27,523 INFO scheduler.ReceiverTracker: ReceiverTracker stopped
2020-07-24 09:48:27,523 INFO scheduler.JobGenerator: Stopping JobGenerator 
immediately
2020-07-24 09:48:27,524 INFO scheduler.JobScheduler: Starting job streaming job 
1595584106000 ms.0 from job set of time 1595584106000 ms
2020-07-24 09:48:27,527 INFO scheduler.JobScheduler: Finished job streaming job 
1595584106000 ms.0 from job set of time 1595584106000 ms
2020-07-24 09:48:27,528 INFO storage.BlockManagerInfo: Removed 
broadcast_0_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: 366.3 
MB)
2020-07-24 09:48:27,529 INFO storage.BlockManagerInfo: Removed 
broadcast_0_piece0 on 172.31.79.221:44371 in memory (size: 4.0 KB, free: 366.3 
MB)
2020-07-24 09:48:27,530 INFO storage.BlockManagerInfo: Removed 
broadcast_0_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: 4.0 
KB, free: 366.3 MB)
2020-07-24 09:48:27,531 INFO scheduler.JobScheduler: Starting job streaming job 
1595584107000 ms.0 from job set of time 1595584107000 ms
2020-07-24 09:48:27,532 INFO scheduler.JobScheduler: Finished job streaming job 
1595584107000 ms.0 from job set of time 1595584107000 ms
2020-07-24 09:48:27,532 ERROR scheduler.JobScheduler: Error running job 
streaming job 1595584105000 ms.0
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at 
org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at 
org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null 
response: c
p2
call
L1595584105000
lo96
e

at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
2020-07-24 09:48:27,534 ERROR scheduler.JobScheduler: Error running job 
streaming job 1595584106000 ms.0
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at 
org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at 
org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: py4j.Py4JNetworkException: Error while sending a command: null 
response: c
p2
call
L1595584106000
lo113
e

at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 24 more
2020-07-24 09:48:27,535 ERROR scheduler.JobScheduler: Error running job 
streaming job 1595584107000 ms.0
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at 
org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at 
org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,534 INFO util.RecurringTimer: Stopped timer for 
JobGenerator after time 1595584107000
2020-07-24 09:48:27,540 ERROR python.PythonDStream$$anon$1: Cannot connect to 
Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy18.call(Unknown Source)
at 
org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at 
org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-07-24 09:48:27,544 INFO scheduler.JobGenerator: Stopped JobGenerator
2020-07-24 09:48:27,551 INFO scheduler.JobScheduler: Stopped JobScheduler
2020-07-24 09:48:27,556 INFO handler.ContextHandler: Stopped 
o.s.j.s.ServletContextHandler@6ad44bd5{/streaming,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,557 INFO handler.ContextHandler: Stopped 
o.s.j.s.ServletContextHandler@68ac401f{/streaming/json,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 16
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 9
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 19
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 22
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 17
2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 24
2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped 
o.s.j.s.ServletContextHandler@11792245{/streaming/batch,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped 
o.s.j.s.ServletContextHandler@2e5d35e4{/streaming/batch/json,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,560 INFO handler.ContextHandler: Stopped 
o.s.j.s.ServletContextHandler@2dedce2c{/static/streaming,null,UNAVAILABLE,@Spark}
2020-07-24 09:48:27,562 INFO streaming.StreamingContext: StreamingContext 
stopped successfully
2020-07-24 09:48:27,562 WARN streaming.StreamingContext: StreamingContext has 
already been stopped
2020-07-24 09:48:27,562 INFO spark.SparkContext: Invoking stop() from shutdown 
hook
2020-07-24 09:48:27,569 INFO server.AbstractConnector: Stopped 
Spark@4c8c11ce{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-07-24 09:48:27,572 INFO ui.SparkUI: Stopped Spark web UI at 
http://ip-172-31-69-46.ec2.internal:4040
2020-07-24 09:48:27,575 INFO cluster.StandaloneSchedulerBackend: Shutting down 
all executors
2020-07-24 09:48:27,576 INFO 
cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to 
shut down
2020-07-24 09:48:27,592 INFO spark.MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
2020-07-24 09:48:27,600 INFO memory.MemoryStore: MemoryStore cleared
2020-07-24 09:48:27,600 INFO storage.BlockManager: BlockManager stopped
2020-07-24 09:48:27,604 INFO storage.BlockManagerMaster: BlockManagerMaster 
stopped
2020-07-24 09:48:27,607 INFO 
scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
2020-07-24 09:48:27,613 INFO spark.SparkContext: Successfully stopped 
SparkContext
2020-07-24 09:48:27,614 INFO util.ShutdownHookManager: Shutdown hook called
2020-07-24 09:48:27,615 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-85b5d4dc-0524-4148-aff4-8a77deb6ccca
2020-07-24 09:48:27,617 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7
2020-07-24 09:48:27,620 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7/pyspark-c0db4268-2afd-4336-bf77-9dc7257213d2
I've tried debugging everything, but I have no idea how to solve this problem. 
Do you have any suggestions? Could it be a Kafka configuration problem?
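
For example, I'm wondering whether the broker needs to advertise an address 
that the executors can actually reach, i.e. something along these lines in the 
broker's server.properties (values are just a guess based on my setup):

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://172.31.71.104:9092
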
Thanks in advance,

Davide
