Re: Failure Restart Strategy leads to error

2022-01-26 Thread Siddhesh Kalgaonkar
Hi Yun and Oran,

Thanks for your time. Much appreciated!

Below are my configs:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)
env.enableCheckpointing(2000)

//env.setDefaultSavepointDirectory("file:home/siddhesh/Desktop/savepoints/")

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new FsStateBackend("file:home/siddhesh/Desktop/flink/"))
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) // Gap after which the next checkpoint can be written
env.getCheckpointConfig.setCheckpointTimeout(4000) // Checkpoints have to complete within 4 secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // Only 1 checkpoint can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // Checkpoints are retained if the job is cancelled explicitly
/*
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,                                                             // max failures per unit
  org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay
))
*/

Consumer properties:

val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), getProperties()) // Setting up the consumer properties


def getProperties(): Properties = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("isolation.level", "read_committed")
  properties
}
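
For reference, a minimal sketch of getProperties() with a consumer group added, in line
with Yun Tang's hint below that group.id is the missing piece (the group name here is
only a placeholder, not something from this thread):

import java.util.Properties

def getProperties(): Properties = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "flink-kafka-example") // placeholder consumer group; the Kafka consumer needs one
  properties.setProperty("isolation.level", "read_committed")
  properties
}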

Also, you can see that I have commented out setting my own savepoint
directory. It was also leading to an error and causing the job to end
abruptly. Do I need to set it up while running via the CLI, or is there
something I am missing for the failure restart strategy and the savepoint
directory?
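
For what it's worth, the commented-out path may simply be a malformed URI: a local
directory normally needs the file:/// scheme with three slashes. A hedged sketch of the
same calls with well-formed URIs (paths unchanged):

env.setDefaultSavepointDirectory("file:///home/siddhesh/Desktop/savepoints/")
env.setStateBackend(new FsStateBackend("file:///home/siddhesh/Desktop/flink/"))

When triggering savepoints from the CLI, a target directory can also be passed there
(flink savepoint <jobId> <targetDirectory>) or configured once via state.savepoints.dir
in flink-conf.yaml.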

Thanks,
Sid


On Wed, Jan 26, 2022 at 1:52 PM Yun Tang  wrote:

> Hi Siddhesh,
>
> The root cause is that the group.id configuration is missing for the
> Flink program. The restart strategy configuration has no relationship
> with this.
>
> I think you should pay attention to the Kafka-related configurations.
>
>
> Best
> Yun Tang
> --
> *From:* Siddhesh Kalgaonkar 
> *Sent:* Wednesday, January 26, 2022 3:17
> *To:* user 
> *Subject:* Failure Restart Strategy leads to error
>
> I have Flink Kafka Consumer in place which works fine until I add the
> below lines:
>
> env.setRestartStrategy(RestartStrategies.failureRateRestart(
>   3,                            // max failures per unit
>   Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
>   Time.of(10, TimeUnit.SECONDS) // delay
> ))
>
> It gives me the below error stack trace:
>
> DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) -
> Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
> org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink
> Kafka Example(b425ae91bfb0e81980b878b3e4392137).
> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> DEBUG [flink-akka.actor.default-dispatcher-12]
> (DefaultJobLeaderIdS

Failure Restart Strategy leads to error

2022-01-25 Thread Siddhesh Kalgaonkar
I have Flink Kafka Consumer in place which works fine until I add the below
lines:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,                            // max failures per unit
  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
))

It gives me the below error stack trace:

DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) -
Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink
Kafka Example(b425ae91bfb0e81980b878b3e4392137).
at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
at
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DEBUG [flink-akka.actor.default-dispatcher-12]
(DefaultJobLeaderIdService.java:148) - Remove job
b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
 INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047)
- Disconnect job manager
a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3
for job b425ae91bfb0e81980b878b3e4392137 from the resource manager.
DEBUG [flink-akka.actor.default-dispatcher-12]
(DefaultResourceTracker.java:80) - Initiating tracking of resources for job
b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-12]
(DefaultResourceTracker.java:60) - Stopping tracking of resources for job
b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-14] (AkkaRpcActor.java:131) -
The RpcEndpoint jobmanager_3 terminated successfully.
 INFO [flink-akka.actor.default-dispatcher-8]
(DefaultJobLeaderService.java:136) - Stop job leader service.
 INFO [flink-akka.actor.default-dispatcher-8]
(TaskExecutorLocalStateStoresManager.java:231) - Shutting down
TaskExecutorLocalStateStoresManager.
DEBUG [flink-akka.actor.default-dispatcher-8] (IOManagerAsync.java:121) -
Shutting down I/O manager.
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at 

Re: Async IO code not working

2022-01-12 Thread Siddhesh Kalgaonkar
Hi Chesnay,

Thanks for your time. Much appreciated.

I am getting error on the below line:

val res: DataStream[String] =
  AsyncDataStream.unorderedWait(goodRecords, new CassandraAsyncSink(), 1000, TimeUnit.SECONDS, 100)

Yes, earlier it was a wrong import but it is still giving me the below
error:

type mismatch;
 found   : KafkaAsSource.CassandraAsyncSink
 required:
org.apache.flink.streaming.api.scala.async.AsyncFunction[?,String]


P.S.: I am not sure whether this makes any difference, but in the code of
asyncInvoke I am not using any ResultFuture or the template given in the
official doc for the AsyncIO example. The method's return type is Unit, so
I am just calling session.execute("query") inside it, the way it is used in
a RichSinkFunction.

What else am I missing?
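
For reference, a hedged sketch of what the async function might look like when written
against the Scala API, completing the ResultFuture explicitly. The class name just mirrors
the one above, the Cassandra call itself is omitted, and it assumes the Scala-API
RichAsyncFunction (org.apache.flink.streaming.api.scala.async.RichAsyncFunction) is
available in the Flink version in use:

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

class CassandraAsyncSink extends RichAsyncFunction[String, String] {
  // a dedicated executor service would be preferable in a real job
  implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def open(parameters: Configuration): Unit = {
    // open the Cassandra session here
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    Future {
      // session.execute(...) would go here; echoing the input keeps the sketch self-contained
      input
    }.onComplete {
      case Success(value) => resultFuture.complete(Iterable(value)) // hand the result back to Flink
      case Failure(err)   => resultFuture.completeExceptionally(err)
    }
  }

  override def close(): Unit = {
    // close the session here
  }
}

// val res: DataStream[String] =
//   AsyncDataStream.unorderedWait(goodRecords, new CassandraAsyncSink(), 1000, TimeUnit.SECONDS, 100)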



On Wed, Jan 12, 2022 at 4:47 PM Chesnay Schepler  wrote:

> It would have been good to clarify which line causes the error; as is, I can
> only guess.
>
> Please make sure you use the scala variant of the AsyncDataStream
> (org.apache.flink.streaming.api.scala.AsyncDataStream).
>
>
> On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote:
> > I am using below code to get the data from the side output which has
> > filtered records.
> > So, it goes like this:
> >
> > val filterRecords: DataStream[String] = src.process(new
> > ProcessFunction()).getSideOutput(filteredOutputTag)
> >
> > It has filtered records in it.
> >
> > Now, I want to add these records to the db asynchronously. Therefore,
> > I wrote below code using documentation reference:
> >
> > val asyncFunction:AsyncFunction[String,String]=new DBAsyncSink() //SO
> > reference
> > AsyncDataStream.unorderedWait(goodRecords,new DBAsyncSink(), 1000,
> > TimeUnit.SECONDS, 100) //Documentation Reference
> >
> > and the class for the "DBAsyncSink" is as follows:
> >
> > class DBAsyncSink extends RichAsyncFunction[String,String] {
> >
> >   override def open(parameters: Configuration): Unit = {
> >
> >   }
> >
> >   override def asyncInvoke(input:String, resultFuture:
> > ResultFuture[String]): Unit = {
> >
> >   }
> >
> >   override def close(): Unit = {
> > session.close()
> >   }
> >
> > }
> >
> > I am getting below error:
> >
> > type mismatch;
> >  found   : org.apache.flink.streaming.api.scala.DataStream[String]
> >  required: org.apache.flink.streaming.api.datastream.DataStream[?]
> >
> > What am I missing over here? I tried a couple of examples but it
> > didn't work.
> >
> > Thanks,
> > Sid
>
>
>


Async IO code not working

2022-01-11 Thread Siddhesh Kalgaonkar
I am using below code to get the data from the side output which has
filtered records.
So, it goes like this:

val filterRecords: DataStream[String] =
  src.process(new ProcessFunction()).getSideOutput(filteredOutputTag)

It has filtered records in it.

Now, I want to add these records to the db asynchronously. Therefore, I
wrote below code using documentation reference:

val asyncFunction: AsyncFunction[String, String] = new DBAsyncSink() // SO reference
AsyncDataStream.unorderedWait(goodRecords, new DBAsyncSink(), 1000, TimeUnit.SECONDS, 100) // Documentation reference

and the class for the "DBAsyncSink" is as follows:

class DBAsyncSink extends RichAsyncFunction[String,String] {

  override def open(parameters: Configuration): Unit = {

  }

  override def asyncInvoke(input:String, resultFuture:
ResultFuture[String]): Unit = {

  }

  override def close(): Unit = {
session.close()
  }

}

I am getting below error:

type mismatch;
 found   : org.apache.flink.streaming.api.scala.DataStream[String]
 required: org.apache.flink.streaming.api.datastream.DataStream[?]

What am I missing over here? I tried a couple of examples but it didn't
work.
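
As Chesnay's reply in the thread above points out, the mismatch comes from mixing the Java
and Scala APIs: with a Scala DataStream, the Scala AsyncDataStream has to be used, and the
async function has to implement the Scala-API AsyncFunction. A hedged sketch of the call
site under that assumption:

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.AsyncDataStream

// goodRecords: DataStream[String], e.g. the filtered side output obtained above;
// DBAsyncSink is assumed to extend org.apache.flink.streaming.api.scala.async.RichAsyncFunction[String, String]
val withDbWrites: DataStream[String] =
  AsyncDataStream.unorderedWait(goodRecords, new DBAsyncSink(), 1000, TimeUnit.SECONDS, 100)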

Thanks,
Sid


Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Siddhesh Kalgaonkar
Hi Piotr,

Thanks for the reply. I was looking into how to create a DataStream inside a
process function, since from there I had to call something else, but I came
across one of Fabian's posts where he mentioned that this way of creating a
DS is not "encouraged and tested". So I figured out an alternative way using
a side output, and now I can do what I was aiming for.

Thanks,
Sid.

On Mon, Jan 10, 2022 at 5:29 PM Piotr Nowojski  wrote:

> Hi Sid,
>
> I don't see in the Stack Overflow post an explanation of what you are trying
> to do here (no mention of MapFunction or a tuple).
>
> If you want to create a `DataStream<String>` from some pre-existing/static
> Tuple of Strings, the easiest thing would be to convert the tuple to a
> collection/iterator and use
> `StreamExecutionEnvironment#fromCollection(...)`.
> If you already have a `DataStream<Tuple>` (for example your source
> produces a tuple) and you want to flatten it to `DataStream<String>`, then
> you need a simple `FlatMapFunction<Tuple, String>` (or
> `RichFlatMapFunction<Tuple, String>`), that would do the flattening
> via:
>
> public void flatMap(Tuple value, Collector<String> out) throws
> Exception {
>   out.collect(value.f0);
>   out.collect(value.f1);
>   ...;
>   out.collect(value.fN);
> }
>
> Best,
> Piotrek
>
> pt., 7 sty 2022 o 07:05 Siddhesh Kalgaonkar 
> napisał(a):
>
>> Hi Francis,
>>
>> What I am trying to do is you can see over here
>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375
>>
>>
>> On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy <
>> francis.con...@switchdin.com> wrote:
>>
>>> Hi Siddhesh,
>>>
>>> How are you getting this tuple of strings into the system? I think this
>>> is the important question, you can create a DataStream in many ways, from a
>>> collection, from a source, etc but all of these rely on the
>>> ExecutionEnvironment you're using.
>>> A RichMapFunction doesn't produce a datastream directly, it's used in
>>> the context of the StreamExecutionEnvironment to create a stream i.e.
>>> DataStream.map([YourRichMapFunction]) this implies that you already need a
>>> datastream to transform a datastream using a mapFunction
>>> (MapFunction/RichMapFunction)
>>> Francis
>>>
>>> On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> As I am new and I am facing one issue so I came across RichMapFunction.
>>>> How can I use RichMapFunction to convert a tuple of strings to datastream?
>>>> If not how can I do it apart from using StreamExecutionEnvironment?
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>
>>>
>>


Creating DS in low level conversion operators i.e ProcessFunctions

2022-01-08 Thread Siddhesh Kalgaonkar
Hey Team,

I have a flow like Kafka source DataStream -> ProcessFunction (separate
class) -> DBSink (separate class).

The ProcessFunction returns its output as a string, and now I want to create
a DataStream out of that string variable so that I can call something like
ds.addSink(new DBSink()). For that, I used the StreamExecutionEnvironment
variable as a global/method variable, but I am not able to create it properly.

What is happening is: if I don't create the data stream properly, the sink
is never invoked properly, because the methods under the Sink class are not
executed, which matches Fabian's answer in the post below:

https://stackoverflow.com/questions/41210266/apache-flink-use-values-from-a-data-stream-to-dynamically-create-a-streaming-d

How can I fix this? Otherwise I will have to create plain classes and call
the DB methods directly, which is inappropriate because it would open and
close a connection per record, which is again an expensive operation in
practice.
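
The pattern Fabian's answer points at can be sketched roughly like this: keep the whole
pipeline in the main program and attach the sink to the existing stream (or its side
output), instead of building a new environment or stream inside the process function.
Class names below stand in for the separate classes mentioned above:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val src: DataStream[String] = env.fromElements("""{"id": 1}""")          // stands in for the Kafka source
val processed: DataStream[String] = src.process(new MyProcessFunction()) // the separate process-function class
processed.addSink(new DBSink())                                          // open()/invoke()/close() are then driven by Flink
// records routed to a side output can be sunk the same way:
// processed.getSideOutput(outputTag).addSink(new DBSink())
env.execute("process-then-sink")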

Thanks,
Sid


Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hey Yun,

Thanks for your quick response. Much appreciated. I have replied to your
answer on SO and I will continue with my doubts over there.

Thanks,
Sid

On Fri, Jan 7, 2022 at 9:05 PM Yun Gao  wrote:

> Hi Siddhesh,
>
> I answered on the stackoverflow and I also copied the answers here for
> reference:
>
> For the consumer side, the Flink Kafka consumer bookkeeps the current
> offset in the distributed checkpoint, and if the consumer task fails, it
> is restarted from the latest checkpoint and re-emits from the offset
> recorded in that checkpoint. For example, suppose the latest checkpoint
> records offset 3, and after that Flink continues to emit 4, 5 and then
> fails over; Flink would then continue to emit records from 4. Note that
> this does not cause duplication, since the state of all the operators
> also falls back to the state after record 3 was processed.
>
>
> For the producer side, Flink uses a two-phase commit [1] to achieve
> exactly-once. Roughly, the Flink producer relies on Kafka's transactions
> to write data, and only commits the data formally after the transaction
> is committed. Users can use Semantic.EXACTLY_ONCE to enable this
> functionality.
>
>
> You are warmly welcome to reach out to the community for help, and many
> thanks to everyone for participating in the community :) I think David and
> Martijn are also trying to make us work together more efficiently. Many
> thanks for the understanding~
>
>
> Best,
>
> Yun
>
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance
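
A hedged sketch of the producer wiring Yun describes above (topic name, schema class and
property values are placeholders, not code from this thread):

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// Minimal serialization schema turning each String into a Kafka record for one topic.
class StringRecordSchema(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
// keep this at or below the broker's transaction.max.timeout.ms
producerProps.setProperty("transaction.timeout.ms", "60000")

val producer = new FlinkKafkaProducer[String](
  "output_topic",                           // default target topic (placeholder)
  new StringRecordSchema("output_topic"),
  producerProps,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // two-phase commit backed by Kafka transactions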
>
>
>
> --
> From:Siddhesh Kalgaonkar 
> Send Time:2022 Jan. 7 (Fri.) 23:25
> To:Martijn Visser 
> Cc:"David Morávek" ; user 
> Subject:Re: Exactly Once Semantics
>
> Hi Martijn,
>
> Understood. If possible please help me out with the problem.
>
> Thanks,
> Sid
>
> On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser 
> wrote:
> Hi Siddesh,
>
> The purpose of both Stack Overflow and the mailing list is to solve a
> question or a problem; the mailing list is not for getting attention. This
> amounts to crossposting, which we would rather avoid. As David mentioned,
> time is limited and we all try to spend it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
> Hi David,
>
> It's actually better in my opinion. Because people who are not aware of
> the ML thread can Google and check the SO posts when they come across any
> similar problems. The reason behind posting on ML is to get attention.
> Because few questions are unanswered for multiple days and since we are
> beginners, the only things which we have are SO and ML.  I won't say
> "Duplication" but more kind of "Availability of similar problems".
>
> It's okay if you don't want to help.
>
> Cheers!
>
> Sid
>
> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
> Hi Siddhesh,
>
> can you please focus your questions on one channel only? (either SO or the
> ML)
>
> this could lead to unnecessary work duplication (which would be shame,
> because the community has limited resources) as people answering on SO
> might not be aware of the ML thread
>
> D.
>
> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
> I am trying to achieve exactly one semantics using Flink and Kafka. I have
> explained my scenario thoroughly in this post
>
> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>
> Any help is much appreciated!
>
> Thanks,
> Sid


Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hi Martijn,

Understood. If possible please help me out with the problem.

Thanks,
Sid

On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser  wrote:

> Hi Siddesh,
>
> The purpose of both Stack Overflow and the mailing list is to solve a
> question or a problem; the mailing list is not for getting attention. This
> amounts to crossposting, which we would rather avoid. As David mentioned,
> time is limited and we all try to spend it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
>
>> Hi David,
>>
>> It's actually better in my opinion. Because people who are not aware of
>> the ML thread can Google and check the SO posts when they come across any
>> similar problems. The reason behind posting on ML is to get attention.
>> Because few questions are unanswered for multiple days and since we are
>> beginners, the only things which we have are SO and ML.  I won't say
>> "Duplication" but more kind of "Availability of similar problems".
>>
>> It's okay if you don't want to help.
>>
>> Cheers!
>>
>> Sid
>>
>> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
>>
>>> Hi Siddhesh,
>>>
>>> can you please focus your questions on one channel only? (either SO or
>>> the ML)
>>>
>>> this could lead to unnecessary work duplication (which would be shame,
>>> because the community has limited resources) as people answering on SO
>>> might not be aware of the ML thread
>>>
>>> D.
>>>
>>> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
>>>> I am trying to achieve exactly one semantics using Flink and Kafka. I
>>>> have explained my scenario thoroughly in this post
>>>>
>>>> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>>>>
>>>> Any help is much appreciated!
>>>>
>>>> Thanks,
>>>> Sid
>>>>


Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hi David,

It's actually better in my opinion, because people who are not aware of the
ML thread can Google the problem and find the SO post when they come across
something similar. The reason behind posting on the ML is to get attention,
because a few questions stay unanswered for multiple days, and since we are
beginners, the only things we have are SO and the ML. I wouldn't call it
"duplication" but rather "availability of similar problems".

It's okay if you don't want to help.

Cheers!

Sid

On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:

> Hi Siddhesh,
>
> can you please focus your questions on one channel only? (either SO or the
> ML)
>
> this could lead to unnecessary work duplication (which would be a shame,
> because the community has limited resources) as people answering on SO
> might not be aware of the ML thread
>
> D.
>
> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> I am trying to achieve exactly one semantics using Flink and Kafka. I
>> have explained my scenario thoroughly in this post
>>
>> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>>
>> Any help is much appreciated!
>>
>> Thanks,
>> Sid
>>
>


Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
I am trying to achieve exactly-once semantics using Flink and Kafka. I have
explained my scenario thoroughly in this post
https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

Any help is much appreciated!

Thanks,
Sid


Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-06 Thread Siddhesh Kalgaonkar
Hi Francis,

What I am trying to do is you can see over here
https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375


On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy 
wrote:

> Hi Siddhesh,
>
> How are you getting this tuple of strings into the system? I think this is
> the important question. You can create a DataStream in many ways (from a
> collection, from a source, etc.), but all of these rely on the
> ExecutionEnvironment you're using.
> A RichMapFunction doesn't produce a DataStream directly; it's used in the
> context of the StreamExecutionEnvironment to create a stream, i.e.
> DataStream.map([YourRichMapFunction]). This implies that you already need a
> DataStream in order to transform it with a map function
> (MapFunction/RichMapFunction)
> Francis
>
> On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> Hi,
>>
>> As I am new and I am facing one issue so I came across RichMapFunction.
>> How can I use RichMapFunction to convert a tuple of strings to datastream?
>> If not how can I do it apart from using StreamExecutionEnvironment?
>>
>> Thanks,
>> Sid
>>
>
>


RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-06 Thread Siddhesh Kalgaonkar
Hi,

As I am new, I am facing one issue, and I came across RichMapFunction. How
can I use RichMapFunction to convert a tuple of strings to a DataStream? If
not, how can I do it apart from using StreamExecutionEnvironment?

Thanks,
Sid


Re: Passing msg and record to the process function

2022-01-06 Thread Siddhesh Kalgaonkar
I was able to modify the code and get the tuple in case of Success. How do
I pass the tuple to the Failure part?

Try {
  //
  // some processing

  if (!validationMessages.isEmpty) {
    (parsedJson.toString(), validationMessages.foreach(x => {
      val msg: String = x.getMessage
      msg
    }).toString())
  } else {
    (parsedJson.toString(), "Good Record...")
  }

} match {
  case Success(x) => {
    Right(x)
  }
  case Failure(err) => {
    Left(json)
  }
}
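
One hedged way to carry a (record, message) pair on both sides is to widen the return type
and let the Failure branch build its tuple from the raw JSON plus the error message.
Reusing the names from the quoted code below (ObjectMapper, schema, etc.), a sketch might
look like:

import com.fasterxml.jackson.databind.ObjectMapper
import com.networknt.schema.JsonSchema
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

def parseJson(json: String, schema: JsonSchema): Either[(String, String), (String, String)] =
  Try {
    val parsedJson = new ObjectMapper().readTree(json)
    val validationMessages = schema.validate(parsedJson).asScala
    // require() fails the Try and puts the validation summary into the exception message
    require(validationMessages.isEmpty, validationMessages.map(_.getMessage).mkString("; "))
    (parsedJson.toString, "Good Record...")
  } match {
    case Success(parsedAndMsg) => Right(parsedAndMsg)          // (parsed record, message)
    case Failure(err)          => Left((json, err.getMessage)) // (raw record, failure reason)
  }

The side output's OutputTag would then need to carry (String, String) as well to match the
Left side.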


On Thu, Jan 6, 2022 at 1:43 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:

> Thanks, Caizhi for your explanation. It helped me to understand where I
> went wrong.
>
> On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng  wrote:
>
>> Hi!
>>
>> The last expression in your try block is
>>
>> if(validationMessages.isEmpty) {
>>   (parsedJson.toString(),
>> validationMessages.foreach((msg=>msg.getMessage.toString)))
>> } else {
>>   (parsedJson.toString(), "Format is correct...")
>> }
>>
>> The first one produces a (String, Unit) type while the second one
>> produces a (String, String) type, so the whole if expression produces
>> (String, Any) type. However your parseJson should return Either[String,
>> String], thus causing this issue.
>>
>>
>> Siddhesh Kalgaonkar  wrote on Wed, Jan 5, 2022 at 19:04:
>>
>>> I have written a process function where I am parsing the JSON and if it
>>> is not according to the expected format it passes as Failure to the process
>>> function and I print the records which are working fine. Now, I was trying
>>> to print the message and the record in case of Success and Failure. I
>>> implemented the below code and it gave me the error. What exactly I am
>>> missing?
>>>
>>> package KafkaAsSource
>>>
>>> import com.fasterxml.jackson.databind.ObjectMapper
>>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>>> import org.apache.flink.api.scala.createTypeInformation
>>> import org.apache.flink.streaming.api.functions.ProcessFunction
>>> import org.apache.flink.streaming.api.scala.OutputTag
>>> import org.apache.flink.util.Collector
>>> import scala.jdk.CollectionConverters._
>>> import scala.util.{Failure, Success, Try}
>>>
>>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>>   val outputTag = new OutputTag[String]("failed")
>>>
>>>   def parseJson(json: String): Either[String, String] = {
>>> val schemaJsonString =
>>>   """
>>> {
>>> "$schema": "http://json-schema.org/draft-04/schema#;,
>>> "title": "Product",
>>> "description": "A product from the catalog",
>>> "type": "object",
>>> "properties": {
>>> "id": {
>>> "description": "The unique identifier for a product",
>>> "type": "integer"
>>> },
>>> "premium": {
>>> "description": "Annual Premium",
>>> "type": "integer"
>>> },
>>> "eventTime": {
>>> "description": "Timestamp at which record has arrived at source 
>>> / generated",
>>> "type": "string"
>>> }
>>> },
>>> "required": ["id", "premium","eventTime"]
>>> }
>>> """
>>> Try {
>>>   val schema = 
>>> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>>   // You can read a JSON object from String, a file, URL, etc.
>>>   val parsedJson = new ObjectMapper().readTree(json)
>>>   val validationMessages = schema.validate(parsedJson).asScala
>>>   //validationMessages.foreach(msg => println(msg.getMessage))
>>>   require(validationMessages.isEmpty)
>>>   //parsedJson.toString()
>>>   if(validationMessages.isEmpty)
>>> {
>>>   
>>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>>> }
>>>   else
>>> {
>>>   (parsedJson.toString(),"Format is correct...")
>>> }
>>>
>>> }
>>> match {
>>>   case Success(x) => {
>>> println("Good: " + x)
>>> Right(x)
>>>   }
>>>   case Failure(err) => {
>>> println("Bad:  " + json)
>>> Left(json)
>>>   }
>>> }
>>>   }
>>>   override def processElement(i: String, context: ProcessFunction[String, 
>>> String]#Context, collector: Collector[String]): Unit = {
>>> parseJson(i) match {
>>>   case Right(data) => {
>>> collector.collect(data)
>>> println("Good Records: " + data)
>>>   }
>>>   case Left(json) => {
>>> context.output(outputTag, json)
>>> println("Bad Records: " + json)
>>>   }
>>> }
>>>   }
>>> }
>>>
>>>
>>> Error:
>>>
>>> type mismatch;
>>>  found   : (String, Any)
>>>  required: String
>>> Right(x)
>>>
>>>


Re: Passing msg and record to the process function

2022-01-06 Thread Siddhesh Kalgaonkar
Thanks, Caizhi for your explanation. It helped me to understand where I
went wrong.

On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng  wrote:

> Hi!
>
> The last expression in your try block is
>
> if(validationMessages.isEmpty) {
>   (parsedJson.toString(),
> validationMessages.foreach((msg=>msg.getMessage.toString)))
> } else {
>   (parsedJson.toString(), "Format is correct...")
> }
>
> The first one produces a (String, Unit) type while the second one produces
> a (String, String) type, so the whole if expression produces (String, Any)
> type. However your parseJson should return Either[String, String], thus
> causing this issue.
>
>
> Siddhesh Kalgaonkar  wrote on Wed, Jan 5, 2022 at 19:04:
>
>> I have written a process function where I am parsing the JSON and if it
>> is not according to the expected format it passes as Failure to the process
>> function and I print the records which are working fine. Now, I was trying
>> to print the message and the record in case of Success and Failure. I
>> implemented the below code and it gave me the error. What exactly I am
>> missing?
>>
>> package KafkaAsSource
>>
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>> import org.apache.flink.api.scala.createTypeInformation
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.streaming.api.scala.OutputTag
>> import org.apache.flink.util.Collector
>> import scala.jdk.CollectionConverters._
>> import scala.util.{Failure, Success, Try}
>>
>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>   val outputTag = new OutputTag[String]("failed")
>>
>>   def parseJson(json: String): Either[String, String] = {
>> val schemaJsonString =
>>   """
>> {
>> "$schema": "http://json-schema.org/draft-04/schema#;,
>> "title": "Product",
>> "description": "A product from the catalog",
>> "type": "object",
>> "properties": {
>> "id": {
>> "description": "The unique identifier for a product",
>> "type": "integer"
>> },
>> "premium": {
>> "description": "Annual Premium",
>> "type": "integer"
>> },
>> "eventTime": {
>> "description": "Timestamp at which record has arrived at source 
>> / generated",
>> "type": "string"
>> }
>> },
>> "required": ["id", "premium","eventTime"]
>> }
>> """
>> Try {
>>   val schema = 
>> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>   // You can read a JSON object from String, a file, URL, etc.
>>   val parsedJson = new ObjectMapper().readTree(json)
>>   val validationMessages = schema.validate(parsedJson).asScala
>>   //validationMessages.foreach(msg => println(msg.getMessage))
>>   require(validationMessages.isEmpty)
>>   //parsedJson.toString()
>>   if(validationMessages.isEmpty)
>> {
>>   
>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>> }
>>   else
>> {
>>   (parsedJson.toString(),"Format is correct...")
>> }
>>
>> }
>> match {
>>   case Success(x) => {
>> println("Good: " + x)
>> Right(x)
>>   }
>>   case Failure(err) => {
>> println("Bad:  " + json)
>> Left(json)
>>   }
>> }
>>   }
>>   override def processElement(i: String, context: ProcessFunction[String, 
>> String]#Context, collector: Collector[String]): Unit = {
>> parseJson(i) match {
>>   case Right(data) => {
>> collector.collect(data)
>> println("Good Records: " + data)
>>   }
>>   case Left(json) => {
>> context.output(outputTag, json)
>> println("Bad Records: " + json)
>>   }
>> }
>>   }
>> }
>>
>>
>> Error:
>>
>> type mismatch;
>>  found   : (String, Any)
>>  required: String
>> Right(x)
>>
>>


Re: extending RichSinkFunction doesn't force to implement any of its methods

2022-01-06 Thread Siddhesh Kalgaonkar
Hi Caizhi,

Thanks for your reply. Much appreciated. I understood the difference now.
Also, I have a flow like Kafka source DataStream -> ProcessFunction
(separate class) -> Cassandra sink (separate class).

The ProcessFunction returns its output as a string, and now I want to create
a DataStream out of that string variable so that I can call something like
ds.addSink(new CassandraSink()). For that, I used the StreamExecutionEnvironment
variable as a global/method variable, but I am not able to create it
properly. Could you please refer to my StackOverflow post mentioned in the
main thread?
What is happening is: if I don't create the data stream properly, the sink
is never invoked properly, because the methods under the Cassandra sink
class are not executed.

What should I do?
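
For reference, a hedged sketch of the shape Caizhi describes below: nothing in
RichSinkFunction is abstract, so the work has to go into overrides of
open()/invoke()/close() (signatures as in recent 1.13/1.14 releases; the Cassandra calls
are placeholders):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class CassandraSink extends RichSinkFunction[String] {

  override def open(parameters: Configuration): Unit = {
    // build the cluster/session once per parallel instance
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    // session.execute(...) for each incoming record
  }

  override def close(): Unit = {
    // release the session/cluster
  }
}

The sink's methods only run once it is wired into an executed job, e.g.
someStream.addSink(new CassandraSink()) followed by env.execute().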

On Thu, Jan 6, 2022 at 7:58 AM Caizhi Weng  wrote:

> Hi!
>
> This is because ProcessFunction#processElement is a must while all methods
> in SinkFunction are not mandatory (for example you can create a sink which
> just discards all records by directly implementing SinkFunction). However
> if you want your sink to be more useful you'll have to see which methods in
> SinkFunction you need to implement. For example you can deal with the
> records fed to the sink in the invoke method or clean up the resources in
> the finish method.
>
> Siddhesh Kalgaonkar  wrote on Thu, Jan 6, 2022 at 03:11:
>
>> I have implemented a Cassandra sink and when I am trying to call it from
>> another class via DataStream it is not calling any of the methods. I tried
>> extending other interfaces like ProcessFunction and it is forcing me to
>> implement its methods whereas. when it comes to RichSinkFunction it doesn't
>> force me to do it. Is my problem due to this? or there is something else to
>> it?
>>
>>
>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375
>>
>


extending RichSinkFunction doesn't force to implement any of its methods

2022-01-05 Thread Siddhesh Kalgaonkar
I have implemented a Cassandra sink, and when I try to call it from
another class via a DataStream it does not call any of the sink's methods. I
tried extending other interfaces like ProcessFunction, and they force me to
implement their methods, whereas RichSinkFunction doesn't force me to
implement anything. Is my problem due to this, or is there something else to
it?

https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375


Passing msg and record to the process function

2022-01-05 Thread Siddhesh Kalgaonkar
I have written a process function in which I parse the JSON; if a record is
not in the expected format it is passed as a Failure to the process
function, and I print those records, which works fine. Now I was trying to
print both the message and the record in the Success and Failure cases. I
implemented the code below and it gave me the error. What exactly am I
missing?

package KafkaAsSource

import com.fasterxml.jackson.databind.ObjectMapper
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

class JSONParsingProcessFunction extends ProcessFunction[String, String] {
  val outputTag = new OutputTag[String]("failed")

  def parseJson(json: String): Either[String, String] = {
    val schemaJsonString =
      """
        {
          "$schema": "http://json-schema.org/draft-04/schema#",
          "title": "Product",
          "description": "A product from the catalog",
          "type": "object",
          "properties": {
            "id": {
              "description": "The unique identifier for a product",
              "type": "integer"
            },
            "premium": {
              "description": "Annual Premium",
              "type": "integer"
            },
            "eventTime": {
              "description": "Timestamp at which record has arrived at source / generated",
              "type": "string"
            }
          },
          "required": ["id", "premium", "eventTime"]
        }
      """
    Try {
      val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(json)
      val validationMessages = schema.validate(parsedJson).asScala
      //validationMessages.foreach(msg => println(msg.getMessage))
      require(validationMessages.isEmpty)
      //parsedJson.toString()
      if (validationMessages.isEmpty) {
        (parsedJson.toString(), validationMessages.foreach(msg => msg.getMessage.toString))
      } else {
        (parsedJson.toString(), "Format is correct...")
      }
    } match {
      case Success(x) => {
        println("Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }

  override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
    parseJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}


Error:

type mismatch;
 found   : (String, Any)
 required: String
Right(x)
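
Following Caizhi's diagnosis in the reply threads earlier in this digest, a hedged sketch
of the fix: make both branches of the if produce a (String, String), e.g. with
map/mkString instead of foreach, and widen parseJson's declared return type (for example
to Either[String, (String, String)]) to match:

if (validationMessages.isEmpty)
  (parsedJson.toString, "Format is correct...")
else
  (parsedJson.toString, validationMessages.map(_.getMessage).mkString("; "))

With that return type the Right side becomes a tuple, so the success branch in
processElement (and the Collector's element type) would have to be adjusted accordingly.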


Error while writing process functions

2022-01-04 Thread Siddhesh Kalgaonkar
After a lot of struggle with the pure Jackson library, which doesn't have a
strict mode, I wasn't able to validate the JSON schema with it alone. I
finally found one way of doing it, but now I am not able to map the correct
Success and Failure messages in order to call the process function.

Below is my code:

case class Premium(id: String, premium: Long, eventTime: String)

class Splitter extends ProcessFunction[String, Premium] {
  val outputTag = new OutputTag[String]("failed")

  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(sampleJsonString)
      val validationMessages = schema.validate(parsedJson).asScala
      validationMessages.foreach(msg => println(msg.getMessage))
    } match {
      case Success(x) => {
        println(" Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }

  override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
    fromJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}

Error:

type mismatch;
 found   : x.type (with underlying type Unit)
 required: T
Right(x)
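
A hedged sketch of one way around this: the Try block has to end in a value (the foreach
returns Unit), so validate and then yield the parsed JSON, typing fromJson as
Either[String, String]; mapping onto the Premium case class would then be a separate step.
It assumes the same imports and schemaJsonString as in the related "Passing msg and
record" post above:

def fromJson(json: String): Either[String, String] =
  Try {
    val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
    val parsedJson = new ObjectMapper().readTree(json) // parse the incoming record, not a sample string
    val validationMessages = schema.validate(parsedJson).asScala
    require(validationMessages.isEmpty, validationMessages.map(_.getMessage).mkString("; "))
    parsedJson.toString // last expression: the value that Success will carry
  } match {
    case Success(parsed) => Right(parsed)
    case Failure(_)      => Left(json)
  }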


Re: TypeInformation | Flink

2021-12-30 Thread Siddhesh Kalgaonkar
Hi Team,

Dominik has answered the question, but when I try to debug the code, being
new to this, I am not able to understand it. I think something still needs
to be changed in his answer. Can somebody help me understand that snippet?
The user who answered does not seem to be very active. I tried testing his
answer but it didn't work as expected.

Thanks,
Sid

On Wed, Dec 29, 2021 at 10:58 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:

> Hi David,
>
> Yes, I already mentioned that I am a newbie to Flink and Scala. I am
> making progress as the day progresses. I have modified my question again.
> But I am not sure how to use it. Could you please correct it? or add
> something if I missed something?
>
> On Wed, Dec 29, 2021 at 10:53 PM David Morávek  wrote:
>
>> Hi Siddhesh,
>>
>> You can not change the method signature when you're implementing an
>> interface.
>>
>> I'm not really sure this belongs to the ML anymore as this is getting
>> more into Scala / Java fundamentals. There are some great learning
>> resources online for Scala [1], I'd recommend starting from there. Also if
>> you are not familiar with Scala I'd highly recommend starting with Java
>> API first as it's way more intuitive to use with Flink as you don't have to
>> deal with Scala / Java interoperability.
>>
>> [1]
>> https://docs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala
>>
>> On Wed, Dec 29, 2021 at 5:59 PM Siddhesh Kalgaonkar <
>> kalgaonkarsiddh...@gmail.com> wrote:
>>
>>> I have modified my question based on Dominik's inputs. Can somebody help
>>> to take it forward?
>>>
>>> Thanks,
>>> Siddhesh
>>>
>>> On Wed, Dec 29, 2021 at 3:32 PM David Morávek  wrote:
>>>
>>>> Please always try to include user@f.a.o in your reply, so other can
>>>> participate in the discussion and learn from your findings.
>>>>
>>>> I think Dominik has already given you pretty good hint. The JSON
>>>> parsing in this case is not any different as with any other java
>>>> application (with jackson / gson / ...). You can then simply split the
>>>> parsed elements into good and bad records.
>>>>
>>>> D.
>>>>
>>>> On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <
>>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks for the clarification. I will check the link you shared. Also,
>>>>> as mentioned by Dominik, can you help me with the process functions. How
>>>>> can I use it for my use case?
>>>>>
>>>>> Thanks,
>>>>> Siddhesh
>>>>>
>>>>> On Wed, Dec 29, 2021 at 2:50 PM David Morávek  wrote:
>>>>>
>>>>>> Hi Siddhesh,
>>>>>>
>>>>>> it seems that the question is already being answered in the SO
>>>>>> thread, so let's keep the discussion focused there.
>>>>>>
>>>>>> Looking at the original question, I think it's important to
>>>>>> understand, that the TypeInformation is not meant to be used for 
>>>>>> "runtime"
>>>>>> matching, but to address the type erasure [1] limitation for the UDFs 
>>>>>> (user
>>>>>> defined functions), so Flink can pick the correct serializer / 
>>>>>> deserializer.
>>>>>>
>>>>>> [1]
>>>>>> https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
>>>>>>
>>>>>> Best,
>>>>>> D.
>>>>>>
>>>>>> On Tue, Dec 28, 2021 at 9:21 PM Siddhesh Kalgaonkar <
>>>>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I am a newbie to Flink and Scala and trying my best to learn
>>>>>>> everything I can. I doing a practice where  I am getting incoming JSON 
>>>>>>> data
>>>>>>> from the Kafka topic and want to perform a data type check on it.
>>>>>>> For that, I came across TypeInformation of Flink. Please read my
>>>>>>> problem in detail from the below link:
>>>>>>>
>>>>>>> Flink Problem
>>>>>>> <https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>
>>>>>>>
>>>>>>> I went through the documentation but didn't come across any relevant
>>>>>>> examples. Any suggestions would help.
>>>>>>>
>>>>>>> Looking forward to hearing from you.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Siddhesh
>>>>>>>
>>>>>>


Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
Hi David,

Yes, I already mentioned that I am a newbie to Flink and Scala. I am making
progress as the days go by. I have modified my question again, but I am
not sure how to use it. Could you please correct it, or add something if I
missed anything?

On Wed, Dec 29, 2021 at 10:53 PM David Morávek  wrote:

> Hi Siddhesh,
>
> You can not change the method signature when you're implementing an
> interface.
>
> I'm not really sure this belongs to the ML anymore as this is getting more
> into Scala / Java fundamentals. There are some great learning resources
> online for Scala [1], I'd recommend starting from there. Also if you're are
> not familiar with Scala I'd highly recommend starting with Java API first
> as it's way more intuitive to use with Flink as you don't have to deal with
> Scala / Java interoperability.
>
> [1]
> https://docs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala
>
> On Wed, Dec 29, 2021 at 5:59 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> I have modified my question based on Dominik's inputs. Can somebody help
>> to take it forward?
>>
>> Thanks,
>> Siddhesh
>>
>> On Wed, Dec 29, 2021 at 3:32 PM David Morávek  wrote:
>>
>>> Please always try to include user@f.a.o in your reply, so other can
>>> participate in the discussion and learn from your findings.
>>>
>>> I think Dominik has already given you pretty good hint. The JSON parsing
>>> in this case is not any different as with any other java application (with
>>> jackson / gson / ...). You can then simply split the parsed elements into
>>> good and bad records.
>>>
>>> D.
>>>
>>> On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thanks for the clarification. I will check the link you shared. Also,
>>>> as mentioned by Dominik, can you help me with the process functions. How
>>>> can I use it for my use case?
>>>>
>>>> Thanks,
>>>> Siddhesh
>>>>
>>>> On Wed, Dec 29, 2021 at 2:50 PM David Morávek  wrote:
>>>>
>>>>> Hi Siddhesh,
>>>>>
>>>>> it seems that the question is already being answered in the SO thread,
>>>>> so let's keep the discussion focused there.
>>>>>
>>>>> Looking at the original question, I think it's important to
>>>>> understand, that the TypeInformation is not meant to be used for "runtime"
>>>>> matching, but to address the type erasure [1] limitation for the UDFs 
>>>>> (user
>>>>> defined functions), so Flink can pick the correct serializer / 
>>>>> deserializer.
>>>>>
>>>>> [1] https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Tue, Dec 28, 2021 at 9:21 PM Siddhesh Kalgaonkar <
>>>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I am a newbie to Flink and Scala and trying my best to learn
>>>>>> everything I can. I doing a practice where  I am getting incoming JSON 
>>>>>> data
>>>>>> from the Kafka topic and want to perform a data type check on it.
>>>>>> For that, I came across TypeInformation of Flink. Please read my
>>>>>> problem in detail from the below link:
>>>>>>
>>>>>> Flink Problem
>>>>>> <https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>
>>>>>>
>>>>>> I went through the documentation but didn't come across any relevant
>>>>>> examples. Any suggestions would help.
>>>>>>
>>>>>> Looking forward to hearing from you.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Siddhesh
>>>>>>
>>>>>


Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
I have modified my question based on Dominik's inputs. Can somebody help to
take it forward?

Thanks,
Siddhesh

On Wed, Dec 29, 2021 at 3:32 PM David Morávek  wrote:

> Please always try to include user@f.a.o in your reply, so other can
> participate in the discussion and learn from your findings.
>
> I think Dominik has already given you pretty good hint. The JSON parsing
> in this case is not any different as with any other java application (with
> jackson / gson / ...). You can then simply split the parsed elements into
> good and bad records.
>
> D.
>
> On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> Hi David,
>>
>> Thanks for the clarification. I will check the link you shared. Also, as
>> mentioned by Dominik, can you help me with the process functions. How can I
>> use it for my use case?
>>
>> Thanks,
>> Siddhesh
>>
>> On Wed, Dec 29, 2021 at 2:50 PM David Morávek  wrote:
>>
>>> Hi Siddhesh,
>>>
>>> it seems that the question is already being answered in the SO thread,
>>> so let's keep the discussion focused there.
>>>
>>> Looking at the original question, I think it's important to understand,
>>> that the TypeInformation is not meant to be used for "runtime" matching,
>>> but to address the type erasure [1] limitation for the UDFs (user defined
>>> functions), so Flink can pick the correct serializer / deserializer.
>>>
>>> [1] https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Dec 28, 2021 at 9:21 PM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am a newbie to Flink and Scala and trying my best to learn everything
>>>> I can. I doing a practice where  I am getting incoming JSON data from the
>>>> Kafka topic and want to perform a data type check on it.
>>>> For that, I came across TypeInformation of Flink. Please read my
>>>> problem in detail from the below link:
>>>>
>>>> Flink Problem
>>>> <https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>
>>>>
>>>> I went through the documentation but didn't come across any relevant
>>>> examples. Any suggestions would help.
>>>>
>>>> Looking forward to hearing from you.
>>>>
>>>>
>>>> Thanks,
>>>> Siddhesh
>>>>
>>>


Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
Okay, David. Let me try it and will let you know.

On Wed, Dec 29, 2021 at 3:32 PM David Morávek  wrote:

> Please always try to include user@f.a.o in your reply, so other can
> participate in the discussion and learn from your findings.
>
> I think Dominik has already given you pretty good hint. The JSON parsing
> in this case is not any different as with any other java application (with
> jackson / gson / ...). You can then simply split the parsed elements into
> good and bad records.
>
> D.
>
> On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> Hi David,
>>
>> Thanks for the clarification. I will check the link you shared. Also, as
>> mentioned by Dominik, can you help me with the process functions. How can I
>> use it for my use case?
>>
>> Thanks,
>> Siddhesh
>>
>> On Wed, Dec 29, 2021 at 2:50 PM David Morávek  wrote:
>>
>>> Hi Siddhesh,
>>>
>>> it seems that the question is already being answered in the SO thread,
>>> so let's keep the discussion focused there.
>>>
>>> Looking at the original question, I think it's important to understand,
>>> that the TypeInformation is not meant to be used for "runtime" matching,
>>> but to address the type erasure [1] limitation for the UDFs (user defined
>>> functions), so Flink can pick the correct serializer / deserializer.
>>>
>>> [1] https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Dec 28, 2021 at 9:21 PM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am a newbie to Flink and Scala and trying my best to learn everything
>>>> I can. I doing a practice where  I am getting incoming JSON data from the
>>>> Kafka topic and want to perform a data type check on it.
>>>> For that, I came across TypeInformation of Flink. Please read my
>>>> problem in detail from the below link:
>>>>
>>>> Flink Problem
>>>> <https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>
>>>>
>>>> I went through the documentation but didn't come across any relevant
>>>> examples. Any suggestions would help.
>>>>
>>>> Looking forward to hearing from you.
>>>>
>>>>
>>>> Thanks,
>>>> Siddhesh
>>>>
>>>


Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
Hi David,

Thanks for the clarification. I will check the link you shared. Also, as
mentioned by Dominik, can you help me with the process functions. How can I
use it for my use case?

Thanks,
Siddhesh


On Wed, Dec 29, 2021 at 3:22 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:

> Hi David,
>
> Thanks for the clarification. I will check the link you shared. Also, as
> mentioned by Dominik, can you help me with the process functions. How can I
> use it for my use case?
>
> Thanks,
> Siddhesh
>
> On Wed, Dec 29, 2021 at 2:50 PM David Morávek  wrote:
>
>> Hi Siddhesh,
>>
>> it seems that the question is already being answered in the SO thread, so
>> let's keep the discussion focused there.
>>
>> Looking at the original question, I think it's important to understand,
>> that the TypeInformation is not meant to be used for "runtime" matching,
>> but to address the type erasure [1] limitation for the UDFs (user defined
>> functions), so Flink can pick the correct serializer / deserializer.
>>
>> [1] https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
>>
>> Best,
>> D.
>>
>> On Tue, Dec 28, 2021 at 9:21 PM Siddhesh Kalgaonkar <
>> kalgaonkarsiddh...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I am a newbie to Flink and Scala and trying my best to learn everything
>>> I can. I doing a practice where  I am getting incoming JSON data from the
>>> Kafka topic and want to perform a data type check on it.
>>> For that, I came across TypeInformation of Flink. Please read my problem
>>> in detail from the below link:
>>>
>>> Flink Problem
>>> <https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>
>>>
>>> I went through the documentation but didn't come across any relevant
>>> examples. Any suggestions would help.
>>>
>>> Looking forward to hearing from you.
>>>
>>>
>>> Thanks,
>>> Siddhesh
>>>
>>


TypeInformation | Flink

2021-12-28 Thread Siddhesh Kalgaonkar
Hi Team,

I am a newbie to Flink and Scala and am trying my best to learn everything I
can. I am doing a practice project where I am getting incoming JSON data from
the Kafka topic and want to perform a data type check on it.
For that, I came across Flink's TypeInformation. Please read my problem in
detail at the link below:

Flink Problem
<https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>

I went through the documentation but didn't come across any relevant
examples. Any suggestions would help.

Looking forward to hearing from you.


Thanks,
Siddhesh
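
Tying this back to David's replies earlier in this digest: TypeInformation exists so Flink
can pick serializers for UDF results despite Java's type erasure, not for runtime type
checks on incoming JSON. In the Scala API it is normally derived implicitly; a hedged
sketch (the Premium case class is borrowed from the process-function threads above):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // brings the createTypeInformation macro into scope

case class Premium(id: String, premium: Long, eventTime: String)

// derived at compile time and handed to Flink so it can pick the right serializer
val premiumInfo: TypeInformation[Premium] = createTypeInformation[Premium]

Runtime validation of the JSON itself is a separate concern, typically handled by parsing
each record (e.g. with Jackson) and routing bad records to a side output, as the
process-function threads above do.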