KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.
I am trying to consume AVRO formatted messages through KafkaUtils.createDirectStream. I followed the example below (see link), but the messages are not being fetched by the stream.

http://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser

Is there any code missing that I must add to make the above sample work? Say, I am not sure how the Confluent serializers would know the Avro schema info, as they know only the Schema Registry URL info.

Appreciate your help.

~Muthu
Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.
Yes, I can see the messages. Also, I wrote a quick custom decoder for Avro and it works fine for the following:

>> kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=decoder)

But when I use the Confluent serializers to leverage the Schema Registry (based on the link shown below), it doesn't work for me. I am not sure whether I need to configure any more details to consume the Schema Registry. I can fetch the schema from the Schema Registry based on its IDs, but the decoder method is not returning any values for me.

~Muthu

On 5/16/16, 10:49 AM, "Cody Koeninger" wrote:

>Have you checked to make sure you can receive messages just using a
>byte array for value?
>
>On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman wrote:
>> I am trying to consume AVRO formatted messages through
>> KafkaUtils.createDirectStream. I followed the example below (see
>> link) but the messages are not being fetched by the stream.
>>
>> http://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser
>>
>> Is there any code missing that I must add to make the above sample work?
>> Say, I am not sure how the Confluent serializers would know the Avro schema
>> info as they know only the Schema Registry URL info.
>>
>> Appreciate your help.
>>
>> ~Muthu
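The valueDecoder mentioned above is just a callable that receives the raw message bytes; the actual Avro decoder isn't shown in the thread, so here is a minimal sketch with JSON standing in for Avro to illustrate the shape such a decoder takes:

```python
import json

def decoder(raw_bytes):
    # valueDecoder receives the raw Kafka message value as bytes
    # (or None for a null value) and may return any Python object
    if raw_bytes is None:
        return None
    return json.loads(raw_bytes.decode("utf-8"))
```

It would then be passed exactly as in the snippet above: KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=decoder).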
Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.
Thank you for the input. Apparently, I was referring to an incorrect Schema Registry server. Once the correct Schema Registry server IP was used, the serializer worked for me.

Thanks again,
~Muthu

From: Jan Uyttenhove <j...@insidin.com>
Reply-To: "j...@insidin.com"
Date: Tuesday, May 17, 2016 at 3:18 AM
To: "Ramaswamy, Muthuraman"
Cc: spark users <user@spark.apache.org>
Subject: Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

I think that if the Confluent deserializer cannot fetch the schema for the Avro message (key and/or value), you end up with no data. You should check the logs of the Schema Registry: it should show the HTTP requests it receives, so you can check whether the deserializer can connect to it and, if so, what the response code looks like.

If you use the Confluent serializer, each Avro message is first serialized, and afterwards the schema id is added to it. This way, the Confluent deserializer can fetch the schema id first and use it to look up the schema in the Schema Registry.

--
Jan Uyttenhove
Streaming data & digital solutions architect @ Insidin bvba
j...@insidin.com
+32 474 56 24 39
https://twitter.com/xorto
https://www.linkedin.com/in/januyttenhove
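The framing Jan describes — a schema id prepended to the serialized record — can be sketched as a plain-Python parse. The Schema Registry lookup and the Avro decode itself are left out; this only splits the message into its parts:

```python
import io
import struct

def parse_confluent_message(raw_bytes):
    """Split a Confluent-framed Kafka message into (schema_id, avro_payload).

    Confluent wire format:
      byte 0     magic byte (always 0)
      bytes 1-4  big-endian schema id, the lookup key for the Schema Registry
      bytes 5+   the Avro-encoded record itself
    """
    buf = io.BytesIO(raw_bytes)
    if buf.read(1) != b"\x00":
        raise ValueError("not a Confluent-framed message")
    schema_id = struct.unpack(">I", buf.read(4))[0]
    return schema_id, buf.read()
```

A decoder that returns no values, as described earlier in the thread, is consistent with this parse failing or the subsequent schema lookup getting no response from the registry.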
RE: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.
I am using Spark 1.6.1 and Kafka 0.9+. It works for both receiver and receiver-less modes.

One thing I noticed: when you specify an invalid topic name, KafkaUtils doesn't fetch any messages. So, check that you have specified the topic name correctly.

~Muthu

From: Mail.com [pradeep.mi...@mail.com]
Sent: Monday, May 16, 2016 9:33 PM
To: Ramaswamy, Muthuraman
Cc: Cody Koeninger; spark users
Subject: Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

Hi Muthu,

Are you on Spark 1.4.1 and Kafka 0.8.2? I have a similar issue even for simple string messages. The console producer and consumer work fine, but Spark always returns empty RDDs. I am using the receiver-based approach.

Thanks,
Pradeep

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.
No, I haven't enabled Kerberos. Just the calls as specified in the Stack Overflow thread on how to use the Schema Registry-based serializer.

~Muthu

On 5/19/16, 5:25 PM, "Mail.com" wrote:

>Hi Muthu,
>
>Do you have Kerberos enabled?
>
>Thanks,
>Pradeep
PySpark Structured Streaming : Writing to DB in Python and Foreach Sink.
Hi All,

I am exploring PySpark Structured Streaming, and the documentation says the Foreach sink is not supported in Python and is available only with Java/Scala. Given the unavailability of this sink, what options are there for the following:

1. Will there be support for the Foreach sink in Python in a future Spark Structured Streaming release?
2. What options are there to write streaming query output to a database? In other words, the streaming query output should be written to a database at every trigger interval. I cannot use the Memory sink, as it is recommended for debugging only.

Any suggestions on writing to a database in PySpark Structured Streaming will help. Appreciate your time.

Thank you,
Muthu Ramaswamy
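On question 2, one workaround — assuming Spark 2.4+, where foreachBatch exists for Python — is to hand each micro-batch to ordinary batch code, which can then use the regular JDBC writer. The connection URL, table, and credentials below are hypothetical placeholders:

```python
def write_to_db(batch_df, batch_id):
    # Called once per trigger with the micro-batch as a normal DataFrame,
    # so the ordinary batch JDBC writer applies (all connection details
    # here are hypothetical placeholders)
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://dbhost:5432/mydb")
        .option("dbtable", "stream_output")
        .option("user", "spark")
        .option("password", "secret")
        .mode("append")
        .save())

def start_query(streaming_df):
    # Attach the sink to a streaming DataFrame; write_to_db runs at
    # every trigger interval
    return (streaming_df.writeStream
            .foreachBatch(write_to_db)
            .trigger(processingTime="30 seconds")
            .start())
```

Because foreachBatch exposes a plain DataFrame, any batch sink (JDBC, files, etc.) becomes usable from a streaming query without a dedicated streaming sink.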
Structured Streaming : Custom Source and Sink Development and PySpark.
I would like to develop a custom source and sink. So, I have a couple of questions:

1. Do I have to use Scala or Java to develop these custom sources/sinks?
2. Also, once the source/sink has been developed, do I have to develop any Py4J modules to use it in PySpark/Python?

Any pointers, good documentation, or a GitHub source as a reference will be of great help. Please advise.

Thank you,
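On the Py4J question: a source or sink implemented in Scala/Java against Spark's DataSource API can typically be used from PySpark just by naming it in .format(...) — the short name registered via DataSourceRegister or the fully qualified class name — with no extra Py4J glue on the Python side. A sketch, where the class name and option are hypothetical:

```python
def read_custom_source(spark):
    # The format string is either the source's registered short name or its
    # fully qualified JVM class name; both the class and the option below
    # are hypothetical examples
    return (spark.readStream
            .format("com.example.spark.MyCustomSource")
            .option("endpoint", "https://example.com/feed")
            .load())
```

The jar containing the source would still need to be put on the classpath, e.g. via --jars or --packages on spark-submit.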
PySpark Streaming : Accessing the Remote Secured Kafka
All,

Currently, I am using PySpark Streaming (classic regular DStream style, not Structured Streaming). Now, our remote Kafka is secured with Kerberos. What steps should I perform to enable PySpark Streaming to access the secured Kafka? Can I pass the principal/keytab and JAAS config in the spark-submit command? What are my options?

Any pointers/links on accessing a secured Kafka broker from PySpark Streaming will be helpful. Appreciate your time.

Thank you,
~Muthu
PySpark Streaming and Secured Kafka.
Hi All,

I would like to use PySpark Streaming with secured Kafka as the source stream. What options or arguments should I pass in the spark-submit command? A sample spark-submit command with all the required options/arguments to access a remote secured Kafka will help.

Thank you,
~Muthu R
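A sketch of such a spark-submit invocation, assuming a JAAS file and keytab (all paths, file names, and the principal are hypothetical). The driver reads the JAAS file from a local path, while executors read the copy shipped via --files from their working directory:

```shell
spark-submit \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
  --files /etc/kafka/kafka_client_jaas.conf,/etc/kafka/client.keytab \
  my_streaming_app.py
```

where kafka_client_jaas.conf contains something along the lines of:

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./client.keytab"
  principal="client@EXAMPLE.COM";
};
```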
PySpark Direct Streaming : SASL Security Compatibility Issue
Hi All,

I am using PySpark direct streaming to connect to a remote Kafka broker that is secured with Kerberos authentication. The KafkaUtils.createDirectStream Python call gives me the following warnings:

18/11/27 18:20:05 WARN VerifiableProperties: Property sasl.mechanism is not valid
18/11/27 18:20:05 WARN VerifiableProperties: Property security.protocol is not valid

Does the spark-streaming-kafka-0-8_2.11 library support the SASL_PLAINTEXT security protocol? Is there a compatibility issue with the 0.8 version of the package?

Appreciate your help to resolve the security issue.

Thank you,
Muthu R
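The 0-8 connector is built on Kafka's old consumer API, which predates the security.protocol and sasl.mechanism settings — hence the VerifiableProperties warnings about unrecognized properties. One alternative, sketched here, is the Structured Streaming Kafka source (spark-sql-kafka-0-10), whose underlying new consumer accepts security settings passed with a kafka. prefix; the broker address and topic below are hypothetical:

```python
def read_secured_kafka(spark):
    # Options prefixed with "kafka." are forwarded to the Kafka consumer;
    # the JAAS config still has to be supplied separately, e.g. via
    # java.security.auth.login.config in spark-submit java options
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker.example.com:9093")
            .option("subscribe", "secured_topic")
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.sasl.kerberos.service.name", "kafka")
            .load())
```

This path requires the new consumer protocol on the brokers (Kafka 0.10+), which is also when SASL support landed in the client.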