unsubscribe

2022-03-03 Thread Ramaswamy, Muthuraman
unsubscribe


KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-16 Thread Ramaswamy, Muthuraman
I am trying to consume Avro-formatted messages through
KafkaUtils.createDirectStream. I followed the example linked below, but the
messages are not being fetched by the stream.

http://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser

Is there any code missing that I must add to make the above sample work? In
particular, I am not sure how the Confluent serializers would know the Avro
schema when they are given only the Schema Registry URL.

Appreciate your help.

~Muthu





Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-16 Thread Ramaswamy, Muthuraman
Yes, I can see the messages. Also, I wrote a quick custom decoder for Avro,
and it works fine with the following:

>> kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 
>> brokers}, valueDecoder=decoder)
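
For reference, a minimal sketch of what such a custom valueDecoder can look
like, assuming a fixed writer schema stored in a local file (user.avsc is a
hypothetical name) and the avro Python package:

import io
import avro.schema
from avro.io import BinaryDecoder, DatumReader

schema = avro.schema.parse(open("user.avsc").read())  # hypothetical schema file
reader = DatumReader(schema)

def decoder(msg):
    # Kafka hands the raw value bytes to valueDecoder; pass None through
    if msg is None:
        return None
    return reader.read(BinaryDecoder(io.BytesIO(msg)))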

But when I use the Confluent serializers to leverage the Schema Registry
(based on the link shown below), it doesn't work for me. I am not sure whether
I need to configure any more details to consume the Schema Registry. I can
fetch the schema from the Schema Registry by its ID, but the decoder method is
not returning any values for me.

~Muthu



On 5/16/16, 10:49 AM, "Cody Koeninger"  wrote:

>Have you checked to make sure you can receive messages just using a
>byte array for value?
>
>On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman
> wrote:
>> I am trying to consume Avro-formatted messages through
>> KafkaUtils.createDirectStream. I followed the example linked below, but the
>> messages are not being fetched by the stream.
>>
>> http://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser
>>
>> Is there any code missing that I must add to make the above sample work?
>> In particular, I am not sure how the Confluent serializers would know the
>> Avro schema when they are given only the Schema Registry URL.
>>
>> Appreciate your help.
>>
>> ~Muthu
>>
>>
>>


Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-17 Thread Ramaswamy, Muthuraman
Thank you for the input.

Apparently, I was pointing at the wrong Schema Registry server. Once the
correct Schema Registry server IP was used, the serializer worked for me.

Thanks again,

~Muthu

From: Jan Uyttenhove <j...@insidin.com>
Reply-To: j...@insidin.com
Date: Tuesday, May 17, 2016 at 3:18 AM
To: "Ramaswamy, Muthuraman" <muthuraman.ramasw...@viasat.com>
Cc: spark users <user@spark.apache.org>
Subject: Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent
Serializers as Value Decoder.

I think that if the Confluent deserializer cannot fetch the schema for the
Avro message (key and/or value), you end up with no data. You should check the
logs of the Schema Registry: it shows the HTTP requests it receives, so you
can check whether the deserializer can connect to it and, if so, what the
response code looks like.

If you use the Confluent serializer, each Avro message is first serialized,
and afterwards the schema id is added to it. This way, the Confluent
deserializer can fetch the schema id first and use it to look up the schema in
the Schema Registry.
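
To make that concrete: the Confluent wire format prepends a magic byte (0) and
a 4-byte big-endian schema id to the Avro payload. A minimal sketch of a
valueDecoder that honors this format, assuming the requests and avro packages
and a hypothetical registry address:

import io
import struct
import requests
import avro.schema
from avro.io import BinaryDecoder, DatumReader

REGISTRY_URL = "http://schema-registry:8081"  # hypothetical address
schema_cache = {}

def confluent_decoder(msg):
    if msg is None:
        return None
    # byte 0 is the magic byte, bytes 1-4 are the schema id (big-endian)
    magic, schema_id = struct.unpack(">bI", msg[:5])
    if schema_id not in schema_cache:
        resp = requests.get("%s/schemas/ids/%d" % (REGISTRY_URL, schema_id))
        resp.raise_for_status()  # a failure here is the "no data" case above
        schema_cache[schema_id] = avro.schema.parse(resp.json()["schema"])
    return DatumReader(schema_cache[schema_id]).read(
        BinaryDecoder(io.BytesIO(msg[5:])))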


On Tue, May 17, 2016 at 2:19 AM, Ramaswamy, Muthuraman
<muthuraman.ramasw...@viasat.com> wrote:
Yes, I can see the messages. Also, I wrote a quick custom decoder for Avro,
and it works fine with the following:

>> kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 
>> brokers}, valueDecoder=decoder)

But when I use the Confluent serializers to leverage the Schema Registry
(based on the link shown below), it doesn't work for me. I am not sure whether
I need to configure any more details to consume the Schema Registry. I can
fetch the schema from the Schema Registry by its ID, but the decoder method is
not returning any values for me.

~Muthu



On 5/16/16, 10:49 AM, "Cody Koeninger" <c...@koeninger.org> wrote:

>Have you checked to make sure you can receive messages just using a
>byte array for value?
>
>On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman
><muthuraman.ramasw...@viasat.com> wrote:
>> I am trying to consume Avro-formatted messages through
>> KafkaUtils.createDirectStream. I followed the example linked below, but the
>> messages are not being fetched by the stream.
>>
>> http://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser
>>
>> Is there any code missing that I must add to make the above sample work?
>> In particular, I am not sure how the Confluent serializers would know the
>> Avro schema when they are given only the Schema Registry URL.
>>
>> Appreciate your help.
>>
>> ~Muthu
>>
>>
>>



--
Jan Uyttenhove
Streaming data & digital solutions architect @ Insidin bvba

j...@insidin.com
+32 474 56 24 39

https://twitter.com/xorto
https://www.linkedin.com/in/januyttenhove



RE: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-18 Thread Ramaswamy, Muthuraman
I am using Spark 1.6.1 and Kafka 0.9+. It works in both receiver and
receiver-less (direct) modes.

One thing I noticed: when you specify an invalid topic name, KafkaUtils
doesn't fetch any messages. So, check that you have specified the topic name
correctly.
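
A quick way to verify the topic name against the broker, assuming the
kafka-python package is available (broker address is hypothetical):

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="broker:9092")  # hypothetical broker
print(consumer.topics())  # the set of topic names the cluster knows about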

~Muthu

From: Mail.com [pradeep.mi...@mail.com]
Sent: Monday, May 16, 2016 9:33 PM
To: Ramaswamy, Muthuraman
Cc: Cody Koeninger; spark users
Subject: Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent 
Serializers as Value Decoder.

Hi Muthu,

Are you on Spark 1.4.1 and Kafka 0.8.2? I have a similar issue even for
simple string messages.

Console producer and consumer work fine, but Spark always returns empty RDDs.
I am using the receiver-based approach.

Thanks,
Pradeep

> On May 16, 2016, at 8:19 PM, Ramaswamy, Muthuraman 
>  wrote:
>
> Yes, I can see the messages. Also, I wrote a quick custom decoder for Avro,
> and it works fine with the following:
>
>>> kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 
>>> brokers}, valueDecoder=decoder)
>
> But when I use the Confluent serializers to leverage the Schema Registry
> (based on the link shown below), it doesn't work for me. I am not sure
> whether I need to configure any more details to consume the Schema Registry.
> I can fetch the schema from the Schema Registry by its ID, but the decoder
> method is not returning any values for me.
>
> ~Muthu
>
>
>
>> On 5/16/16, 10:49 AM, "Cody Koeninger"  wrote:
>>
>> Have you checked to make sure you can receive messages just using a
>> byte array for value?
>>
>> On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman
>>  wrote:
>>> I am trying to consume Avro-formatted messages through
>>> KafkaUtils.createDirectStream. I followed the example linked below, but
>>> the messages are not being fetched by the stream.
>>>
>>> http://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser
>>>
>>> Is there any code missing that I must add to make the above sample work?
>>> In particular, I am not sure how the Confluent serializers would know the
>>> Avro schema when they are given only the Schema Registry URL.
>>>
>>> Appreciate your help.
>>>
>>> ~Muthu




Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-20 Thread Ramaswamy, Muthuraman
No, I haven't enabled Kerberos. I am just making the calls as specified in
the Stack Overflow thread on how to use the Schema Registry-based serializer.

~Muthu




On 5/19/16, 5:25 PM, "Mail.com"  wrote:

>Hi Muthu,
>
>Do you have Kerberos enabled?
>
>Thanks,
>Pradeep
>
>> On May 19, 2016, at 12:17 AM, Ramaswamy, Muthuraman 
>>  wrote:
>> 
>> I am using Spark 1.6.1 and Kafka 0.9+. It works in both receiver and
>> receiver-less (direct) modes.
>> 
>> One thing I noticed: when you specify an invalid topic name, KafkaUtils
>> doesn't fetch any messages. So, check that you have specified the topic
>> name correctly.
>> 
>> ~Muthu
>> ________
>> From: Mail.com [pradeep.mi...@mail.com]
>> Sent: Monday, May 16, 2016 9:33 PM
>> To: Ramaswamy, Muthuraman
>> Cc: Cody Koeninger; spark users
>> Subject: Re: KafkaUtils.createDirectStream Not Fetching Messages with 
>> Confluent Serializers as Value Decoder.
>> 
>> Hi Muthu,
>> 
>> Are you on Spark 1.4.1 and Kafka 0.8.2? I have a similar issue even for
>> simple string messages.
>> 
>> Console producer and consumer work fine, but Spark always returns empty
>> RDDs. I am using the receiver-based approach.
>> 
>> Thanks,
>> Pradeep
>> 
>>> On May 16, 2016, at 8:19 PM, Ramaswamy, Muthuraman 
>>>  wrote:
>>> 
>>> Yes, I can see the messages. Also, I wrote a quick custom decoder for
>>> Avro, and it works fine with the following:
>>> 
>>>>> kvs = KafkaUtils.createDirectStream(ssc, [topic], 
>>>>> {"metadata.broker.list": brokers}, valueDecoder=decoder)
>>> 
>>> But when I use the Confluent serializers to leverage the Schema Registry
>>> (based on the link shown below), it doesn't work for me. I am not sure
>>> whether I need to configure any more details to consume the Schema
>>> Registry. I can fetch the schema from the Schema Registry by its ID, but
>>> the decoder method is not returning any values for me.
>>> 
>>> ~Muthu
>>> 
>>> 
>>> 
>>>> On 5/16/16, 10:49 AM, "Cody Koeninger"  wrote:
>>>> 
>>>> Have you checked to make sure you can receive messages just using a
>>>> byte array for value?
>>>> 
>>>> On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman
>>>>  wrote:
>>>>> I am trying to consume Avro-formatted messages through
>>>>> KafkaUtils.createDirectStream. I followed the example linked below, but
>>>>> the messages are not being fetched by the stream.
>>>>> 
>>>>> http://stackoverflow.com/questions/30339636/spark-python-avro-kafka-deserialiser
>>>>> 
>>>>> Is there any code missing that I must add to make the above sample work?
>>>>> In particular, I am not sure how the Confluent serializers would know
>>>>> the Avro schema when they are given only the Schema Registry URL.
>>>>> 
>>>>> Appreciate your help.
>>>>> 
>>>>> ~Muthu


PySpark Structured Streaming : Writing to DB in Python and Foreach Sink.

2018-03-27 Thread Ramaswamy, Muthuraman
Hi All,

I am exploring PySpark Structured Streaming, and the documentation says the
Foreach sink is not supported in Python and is available only with Java/Scala.
Given the unavailability of this sink, I have the following questions:


  1.  Will there be support for the Foreach sink in Python in a future Spark
      Structured Streaming release?
  2.  What options are there to write streaming query output to a database?
      *   In other words, the streaming query output should be written to a
          database at every trigger interval.
      *   I cannot use the Memory sink, as it is recommended only for debugging.

Any suggestions on writing to a database from PySpark Structured Streaming
will help. Appreciate your time.
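
For the record, Spark 2.4 later added both a Python foreach sink and
foreachBatch. A minimal sketch of the foreachBatch route, assuming Spark 2.4+
and hypothetical JDBC connection details:

def write_to_db(batch_df, batch_id):
    # each micro-batch arrives as a plain DataFrame, so the batch JDBC writer
    # applies; this function runs once per trigger interval
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://db-host:5432/mydb")  # hypothetical
        .option("dbtable", "stream_output")                    # hypothetical
        .option("user", "writer")
        .option("password", "secret")
        .mode("append")
        .save())

# stream_df is any streaming DataFrame built with spark.readStream
query = stream_df.writeStream.foreachBatch(write_to_db).start()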

Thank you,

Muthu Ramaswamy


Structured Streaming : Custom Source and Sink Development and PySpark.

2018-08-30 Thread Ramaswamy, Muthuraman
I would like to develop a custom source and sink, so I have a couple of
questions:


  1.  Do I have to use Scala or Java to develop these custom sources/sinks?
  2.  Once the source/sink has been developed, do I have to develop any Py4J
      modules to use it from PySpark/Python? Any pointers, good documentation,
      or GitHub sources as a reference will be of great help.

Please advise.
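
On the second question: a source or sink written in Scala/Java against the
DataSource API is typically usable from PySpark without any extra Py4J code,
by shipping the jar and referencing the provider by name. A sketch with
hypothetical class and jar names:

# submitted with: spark-submit --jars my-connector.jar app.py
df = (spark.readStream
      .format("com.example.MySourceProvider")   # hypothetical provider class
      .option("endpoint", "https://example.com/feed")
      .load())

query = (df.writeStream
         .format("com.example.MySinkProvider")  # hypothetical provider class
         .option("checkpointLocation", "/tmp/chk")
         .start())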

Thank you,





PySpark Streaming : Accessing the Remote Secured Kafka

2018-10-09 Thread Ramaswamy, Muthuraman
All,

Currently, I am using PySpark Streaming (the classic DStream style, not
Structured Streaming). Now, our remote Kafka is secured with Kerberos.

To enable PySpark Streaming to access the secured Kafka, what steps should I
perform? Can I pass the principal/keytab and JAAS config in the spark-submit
command? What are my options? Any pointers/links on accessing a secured Kafka
broker from PySpark Streaming will be helpful.
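
Passing a JAAS configuration via spark-submit is the usual route. A sketch
with hypothetical file names and principal, where --files ships the JAAS file
and keytab to the executors' working directories:

spark-submit \
  --files "jaas.conf,user.keytab" \
  --driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
  my_streaming_app.py

where jaas.conf contains, for example:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="user.keytab"
  principal="user@EXAMPLE.COM";
};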

Appreciate your time.

Thank you,

~Muthu




PySpark Streaming and Secured Kafka.

2018-11-19 Thread Ramaswamy, Muthuraman
Hi All,

I would like to use PySpark Streaming with secured Kafka as the source stream.

What options or arguments should I pass in the spark-submit command?

A sample spark-submit command with all the required options/arguments to
access a remote, secured Kafka cluster will help.

Thank you,

~Muthu R




PySpark Direct Streaming : SASL Security Compatibility Issue

2018-11-27 Thread Ramaswamy, Muthuraman
Hi All,

I am using PySpark Direct Streaming to connect to a remote Kafka broker that
is secured with Kerberos authentication. The KafkaUtils.createDirectStream
Python call gives me the following warnings:

18/11/27 18:20:05 WARN VerifiableProperties: Property sasl.mechanism is not 
valid
18/11/27 18:20:05 WARN VerifiableProperties: Property security.protocol is not 
valid

Does the spark-streaming-kafka-0-8_2.11 library support the SASL_PLAINTEXT
security protocol? Is there any compatibility issue with the 0.8 version of
the package?
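
For what it's worth: SASL/SSL support was introduced in Kafka 0.9, so the
0.8-era consumer that spark-streaming-kafka-0-8 wraps does not recognize those
properties, which matches the warnings above. From Python, one option is the
Structured Streaming Kafka source (spark-sql-kafka-0-10), which forwards any
kafka.-prefixed option to the underlying consumer. A sketch with hypothetical
broker and topic names:

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical
      .option("subscribe", "my_topic")                   # hypothetical
      .option("kafka.security.protocol", "SASL_PLAINTEXT")
      .option("kafka.sasl.kerberos.service.name", "kafka")
      .load())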

Appreciate your help to resolve the security issue.

Thank you,

Muthu R