KARTHIKEYAN RASIPALAYAM DURAIRAJ created SPARK-25014:
--------------------------------------------------------

             Summary: When we tried to read kafka topic through spark streaming spark submit is getting failed with Python worker exited unexpectedly (crashed) error
                 Key: SPARK-25014
                 URL: https://issues.apache.org/jira/browse/SPARK-25014
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.1
            Reporter: KARTHIKEYAN RASIPALAYAM DURAIRAJ
             Fix For: 2.3.2


Hi Team,

 

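# Imports were omitted from the original report. The pyspark ones are the
# standard Spark 2.3 modules; the schema-registry ones are ASSUMED to come
# from the confluent-kafka Avro package.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

TOPIC = 'NBC_APPS.TBL_MS_ADVERTISER'
PARTITION = 0

topicAndPartition = TopicAndPartition(TOPIC, PARTITION)
# Starting offsets per partition (defined but never passed to createDirectStream).
fromOffsets1 = {topicAndPartition: int(PARTITION)}

def handler(message):
    # Collect each batch on the driver and split every record into key and value.
    records = message.collect()
    for record in records:
        value_all = record[1]
        value_key = record[0]
        # print(value_all)

schema_registry_client = CachedSchemaRegistryClient(url='http://localhost:8081')
serializer = MessageSerializer(schema_registry_client)

sc = SparkContext(appName="PythonStreamingAvro")
ssc = StreamingContext(sc, 10)  # 10-second batch interval

kvs = KafkaUtils.createDirectStream(
    ssc,
    [TOPIC],
    {"metadata.broker.list": 'localhost:9092'},
    valueDecoder=serializer.decode_message)
lines = kvs.map(lambda x: x[1])
lines.pprint()
kvs.foreachRDD(handler)

ssc.start()
ssc.awaitTermination()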

 

This is the code we are using to pull data from a Kafka topic. When we run it
through spark-submit, we get the error below:

 

 

2018-08-03 11:10:40 INFO  VerifiableProperties:68 - Property zookeeper.connect is overridden to
2018-08-03 11:10:40 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/worker.py", line 215, in main
    eval_type = read_int(infile)
  File "/Users/KarthikeyanDurairaj/Desktop/Sparkshell/python/lib/pyspark.zip/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError

 at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
 at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
 at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
 at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
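
For context on the traceback: the EOFError is raised while the Python worker reads its first integer from the stream shared with the JVM. The sketch below is a paraphrase of that read logic, not a copy of the Spark source, and the in-memory streams are stand-ins for the real socket. It shows that an empty, already-closed stream produces exactly this exception. One plausible reading is that the JVM side stopped writing before the worker got any input, so the root cause may be an earlier JVM-side exception elsewhere in the log rather than the Python code itself.

```python
import io
import struct


def read_int(stream):
    """Read a 4-byte big-endian signed int; raise EOFError on an empty stream."""
    data = stream.read(4)
    if not data:
        # Nothing left to read: the writer closed the stream early.
        raise EOFError
    return struct.unpack("!i", data)[0]


# A stream that holds one packed int decodes normally.
ok_stream = io.BytesIO(struct.pack("!i", 200))
value = read_int(ok_stream)

# A stream closed before anything was written raises EOFError,
# which is the failure seen in the traceback above.
try:
    read_int(io.BytesIO(b""))
    hit_eof = False
except EOFError:
    hit_eof = True
```

If this reading applies, the executor log lines just before and after the PythonRunner error (for example Kafka fetch or connection failures) are the place to look next.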



