Re: TypeInformation | Flink

2021-12-31 Thread Timo Walther

Hi Siddhesh,

How to use a ProcessFunction is documented here:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/

.process() is similar to .map() but gives you access to more Flink-specific 
functionality (for example side outputs, timers, and state). Anyway, a simple 
map() should also do the job. But the base architecture mentioned on Stack 
Overflow is valid.
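
For illustration, a minimal sketch of the .process() variant in Scala could 
look like the following (the Parsed case class, the parse() helper, and the 
fixed-element source are placeholders for your actual Kafka input and JSON 
parsing):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import scala.util.{Failure, Success, Try}

// Hypothetical record type for successfully parsed events.
case class Parsed(id: Int, name: String)

object SplitGoodBadJob {

  // Side output that carries the records that fail parsing.
  val badTag = new OutputTag[String]("bad-records")

  // Placeholder parser; swap in your real JSON parsing here.
  def parse(raw: String): Try[Parsed] = Try {
    val Array(id, name) = raw.split(",")
    Parsed(id.trim.toInt, name.trim)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the Kafka source, so the sketch stays self-contained.
    val raw: DataStream[String] = env.fromElements("1, alice", "oops, bob")

    val good: DataStream[Parsed] = raw.process(new ProcessFunction[String, Parsed] {
      override def processElement(value: String,
                                  ctx: ProcessFunction[String, Parsed]#Context,
                                  out: Collector[Parsed]): Unit =
        parse(value) match {
          case Success(p) => out.collect(p)             // good record -> main output
          case Failure(_) => ctx.output(badTag, value)  // bad record  -> side output
        }
    })

    val bad: DataStream[String] = good.getSideOutput(badTag)

    good.print()
    bad.print()
    env.execute("split good and bad records")
  }
}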


Here are some resources that might help you during development:

https://github.com/twalthr/flink-api-examples

https://github.com/ververica/flink-training (this has Scala examples)

I hope this helps.

Regards,
Timo




Re: TypeInformation | Flink

2021-12-30 Thread Siddhesh Kalgaonkar
Hi Team,

Dominik has answered the question, but since I am new I am not able to
understand his code well enough to debug it. I think something still needs to
be changed in his answer; I tried testing it but it didn't work as expected.
Can somebody help me understand that snippet? The user who answered does not
seem to be very active.

Thanks,
Sid



Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
Hi David,

Yes, I already mentioned that I am a newbie to Flink and Scala, but I am making
progress day by day. I have modified my question again. However, I am not sure
how to use it. Could you please correct it, or add something if I missed
anything?



Re: TypeInformation | Flink

2021-12-29 Thread David Morávek
Hi Siddhesh,

You cannot change the method signature when you're implementing an
interface.

I'm not really sure this belongs on the mailing list anymore, as this is getting
more into Scala / Java fundamentals. There are some great learning resources
online for Scala [1]; I'd recommend starting from there. Also, if you are not
familiar with Scala, I'd highly recommend starting with the Java API first, as
it's much more intuitive to use with Flink and you don't have to deal with
Scala / Java interoperability.

[1]
https://docs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala
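
To illustrate the point about signatures (a sketch only; the LengthMapper name
is made up): when a Scala class implements a Java interface such as Flink's
MapFunction, the overriding method has to keep exactly the signature declared
by the interface.

import org.apache.flink.api.common.functions.MapFunction

// The interface declares `O map(I value)`, so the Scala override must match it
// exactly; changing the parameter list or return type will not compile.
class LengthMapper extends MapFunction[String, Int] {
  override def map(value: String): Int = value.length
}

// Usage, assuming an existing DataStream[String] called `lines`:
//   val lengths = lines.map(new LengthMapper)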




Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
I have modified my question based on Dominik's input. Can somebody help take it
forward?

Thanks,
Siddhesh



Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
Okay, David. Let me try it and I will let you know.



Re: TypeInformation | Flink

2021-12-29 Thread David Morávek
Please always try to include user@f.a.o in your reply, so others can
participate in the discussion and learn from your findings.

I think Dominik has already given you a pretty good hint. The JSON parsing in
this case is no different than in any other Java application (with Jackson /
Gson / ...). You can then simply split the parsed elements into good and bad
records.
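
For example (a rough sketch only; the Event case class and its fields are made
up, and it assumes jackson-module-scala is on the classpath), the parsing step
could produce an Either that you later route into the good and bad outputs:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import scala.util.Try

// Hypothetical target type for one Kafka message.
case class Event(id: Int, name: String)

object JsonParsing {
  // Jackson ObjectMapper with Scala support; reuse it across records.
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Left = bad record (raw payload + error), Right = good record.
  def parse(raw: String): Either[(String, String), Event] =
    Try(mapper.readValue(raw, classOf[Event]))
      .toEither
      .left.map(e => (raw, e.getMessage))

  def main(args: Array[String]): Unit = {
    println(parse("""{"id": 1, "name": "alice"}"""))  // Right(Event(1,alice))
    println(parse("""{"id": "oops"}"""))              // Left((raw payload, parse error))
  }
}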

D.



Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
Hi David,

Thanks for the clarification. I will check the link you shared. Also, as
mentioned by Dominik, can you help me with process functions? How can I use
them for my use case?

Thanks,
Siddhesh




Re: TypeInformation | Flink

2021-12-29 Thread David Morávek
Hi Siddhesh,

It seems that the question is already being answered in the SO thread, so
let's keep the discussion focused there.

Looking at the original question, I think it's important to understand that
TypeInformation is not meant to be used for "runtime" matching, but to address
the type erasure [1] limitation for UDFs (user-defined functions), so that
Flink can pick the correct serializer / deserializer.

[1] https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
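
For what it's worth, here is a small Scala sketch of where TypeInformation
enters the picture (the Event case class is just a placeholder): the implicit
provided by the org.apache.flink.streaming.api.scala._ import, or an explicit
createTypeInformation call, gives Flink the type details erasure would
otherwise hide so it can pick a serializer; it is not a per-record runtime
check.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Placeholder element type.
case class Event(id: Int, name: String)

object TypeInfoSketch {
  def main(args: Array[String]): Unit = {
    // Materialize the TypeInformation Flink would otherwise pick up implicitly;
    // this is what drives serializer selection, not runtime type matching.
    val info: TypeInformation[Event] = createTypeInformation[Event]
    println(info)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("1,alice", "2,bob")
      .map { line =>
        val Array(id, name) = line.split(",")
        Event(id.toInt, name)
      }
      .print()

    env.execute("TypeInformation sketch")
  }
}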

Best,
D.



TypeInformation | Flink

2021-12-28 Thread Siddhesh Kalgaonkar
Hi Team,

I am a newbie to Flink and Scala and am trying my best to learn everything I
can. As practice, I am consuming incoming JSON data from a Kafka topic and want
to perform a data type check on it.
For that, I came across Flink's TypeInformation. Please read my problem in
detail at the link below:

Flink Problem


I went through the documentation but didn't come across any relevant
examples. Any suggestions would help.

Looking forward to hearing from you.


Thanks,
Siddhesh