Flink-kubernetes-operator with ArgoCD

2023-04-24 Thread Sweta Kalakuntla via user
Hi,

Can anyone who has implemented the flink-kubernetes-operator with ArgoCD
share example code?
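
For context, a rough sketch of the kind of setup I mean: an ArgoCD
Application pointing at a Git path that holds a FlinkDeployment manifest.
All names, the repo URL, and namespaces below are hypothetical placeholders,
and the flink-kubernetes-operator is assumed to already be installed:

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: my-flink-job
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://github.com/example/flink-deployments.git
    targetRevision: main
    path: jobs/my-flink-job
  destination:
    server: https://kubernetes.default.svc
    namespace: flink
  syncPolicy:
    automated:
      prune: true
      selfHeal: true

with the FlinkDeployment manifest itself living at that Git path, e.g.:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job
  namespace: flink
spec:
  image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint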

Thank you,
Sweta

--


Re: Recover watermark from savepoint

2022-07-05 Thread Sweta Kalakuntla
Hi Thias,

Thank you for providing a detailed explanation. We did something similar.

The job allows zero late events: it aggregates every 20 minutes and sends
out the value, and we save the last processed window per key in keyed state.
During aggregation, if the current window is later than the last processed
one, we process it; otherwise we discard it. This check also applies during
savepoint recovery, so any aggregations due to late events are discarded.
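
In rough outline, the guard looks like this (a simplified sketch of our
approach, not the actual job; the key, event, and result types and the
aggregation itself are placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public abstract class GuardedWindowFunction<IN, OUT>
        extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

    private final ValueStateDescriptor<Long> lastWindowEndDesc =
            new ValueStateDescriptor<>("lastWindowEnd", Types.LONG);

    @Override
    public void process(String key, Context ctx, Iterable<IN> events,
                        Collector<OUT> out) throws Exception {
        // per-key state that outlives individual windows (and savepoints)
        ValueState<Long> lastWindowEnd = ctx.globalState().getState(lastWindowEndDesc);
        Long last = lastWindowEnd.value();
        long currentWindowEnd = ctx.window().getEnd();
        if (last != null && currentWindowEnd <= last) {
            return; // window already emitted before the restart: discard
        }
        lastWindowEnd.update(currentWindowEnd);
        out.collect(aggregate(key, events));
    }

    // placeholder for the actual 20-minute aggregation
    protected abstract OUT aggregate(String key, Iterable<IN> events);
}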

Thank you,
Sweta




On Fri, Jun 10, 2022 at 2:38 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Sweta,
>
>
>
> It is actually a sound idea to implement a dedicated process function for
> this purpose, as David suggests.
>
> Especially if you are in a situation where waiting for a valid natural
> watermark after a restore from savepoint is not sufficient.
>
>
>
> We had a situation with input streams of different update frequencies (one
> only updated once a day, and hence only generated watermarks once a day).
>
>
>
> This is how you can approach the specific task (see the sketch below):
>
> - watermark storing:
>   - create a process function
>   - create a map that stores the latest watermark per sub-partition
>     (i.e. there are 128 sub-partitions in a job with a max-parallelism of 128)
>   - store this map into operator state with each checkpoint
>   - create a repeating processing-time timer (with a frequency according to
>     your needs), in order to yield a watermark after savepoint restore
> - watermark restoring:
>   - when restoring from operator state (because there might have been a
>     change in parallelism), determine the lowest watermark among all
>     sub-partitions that belong to the respective subtask
>   - yield this watermark (once) in the processing-time handler of the
>     above timer
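>
> A rough sketch of that approach (illustrative only, not our actual
> implementation): since a plain ProcessFunction cannot emit watermarks, the
> sketch uses a custom stream operator, and it is simplified to one watermark
> per subtask (rather than per sub-partition), kept in union operator state
> so that after a rescale every subtask sees all entries and can take the
> minimum:
>
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.runtime.state.StateInitializationContext;
> import org.apache.flink.runtime.state.StateSnapshotContext;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>
> public class WatermarkPersistingOperator<T> extends AbstractStreamOperator<T>
>         implements OneInputStreamOperator<T, T> {
>
>     private transient ListState<Long> watermarkState;
>     private long currentWatermark = Long.MIN_VALUE;
>     private Long restoredWatermark;
>
>     @Override
>     public void initializeState(StateInitializationContext context) throws Exception {
>         super.initializeState(context);
>         watermarkState = context.getOperatorStateStore().getUnionListState(
>                 new ListStateDescriptor<>("last-watermark", Types.LONG));
>         if (context.isRestored()) {
>             // lowest watermark among all restored entries
>             long min = Long.MAX_VALUE;
>             for (Long wm : watermarkState.get()) {
>                 min = Math.min(min, wm);
>                 restoredWatermark = min;
>             }
>         }
>     }
>
>     @Override
>     public void open() throws Exception {
>         super.open();
>         if (restoredWatermark != null) {
>             // yield the restored watermark once, shortly after startup
>             long toEmit = restoredWatermark;
>             getProcessingTimeService().registerTimer(
>                     getProcessingTimeService().getCurrentProcessingTime() + 1000L,
>                     ts -> output.emitWatermark(new Watermark(toEmit)));
>         }
>     }
>
>     @Override
>     public void processElement(StreamRecord<T> element) {
>         output.collect(element); // records pass through untouched
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         currentWatermark = Math.max(currentWatermark, mark.getTimestamp());
>         super.processWatermark(mark);
>     }
>
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         super.snapshotState(context);
>         watermarkState.clear();
>         watermarkState.add(currentWatermark); // one entry per subtask
>     }
> }
>
> It would sit in front of the event-time logic, attached with something like
> stream.transform("persist-watermarks", stream.getType(),
> new WatermarkPersistingOperator<>()).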
>
>
>
> Feel free to ask for details, I hope this helps … I need to ask my folks
> whether I can share our implementation (20-odd lines of code).
>
>
>
> What do you think?
>
>
>
> Thias
>
>
>
>
>
> *From:* David Anderson 
> *Sent:* Thursday, June 9, 2022 11:35 AM
> *To:* User-Flink 
> *Subject:* Re: Recover watermark from savepoint
>
>
>
> Sweta,
>
>
>
> Flink does not include watermarks in savepoints, nor are they included in
> aligned checkpoints. For what it's worth, I believe that with unaligned
> checkpoints in-flight watermarks are included in checkpoints, but I don't
> believe that would solve the problem, since the watermark strategy's state
> is still lost during a restart.
>
>
>
> I can't think of any way to guarantee that all possibly late events will
> be deterministically identified as late. The commonly used
> bounded-out-of-orderness watermark strategy doesn't guarantee this either,
> even without a restart (because watermarks are delayed by the auto
> watermark interval, rather than being produced at every conceivable
> opportunity).
>
>
>
> If this is a strong requirement, you could decide not to rely on
> watermarks for dropping late events, and implement the logic yourself in a
> process function.
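>
> As a sketch of that idea (illustrative names, not a drop-in solution): a
> KeyedProcessFunction can track the highest event timestamp seen per key in
> keyed state (which, unlike the watermark, survives savepoints) and drop
> anything older than that bound minus an allowed lateness:
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class LateEventFilter<T> extends KeyedProcessFunction<String, T, T> {
>
>     private final long allowedLatenessMs;
>     private transient ValueState<Long> maxTimestamp;
>
>     public LateEventFilter(long allowedLatenessMs) {
>         this.allowedLatenessMs = allowedLatenessMs;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         maxTimestamp = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("maxTimestamp", Types.LONG));
>     }
>
>     @Override
>     public void processElement(T event, Context ctx, Collector<T> out)
>             throws Exception {
>         Long max = maxTimestamp.value();
>         Long ts = ctx.timestamp(); // event timestamp assigned upstream
>         if (ts == null) {
>             out.collect(event);
>             return;
>         }
>         if (max != null && ts < max - allowedLatenessMs) {
>             return; // deterministically late for this key: drop or side-output
>         }
>         if (max == null || ts > max) {
>             maxTimestamp.update(ts);
>         }
>         out.collect(event);
>     }
> }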
>
>
>
> Best,
>
> David
>
>
>
> On Wed, Jun 8, 2022 at 6:10 PM Sweta Kalakuntla 
> wrote:
>
> Hi,
>
>
>
> I want to understand whether Flink saves watermarks in a savepoint and, if
> not, how we can achieve this.
>
> We are seeing an issue where, on recovery, the job processes some late
> events that would have been discarded had the job been running without any
> downtime.
>
>
>
> Thank you,
>
> Sweta
>
>


Re: Slack invite link

2022-06-23 Thread Sweta Kalakuntla
Hi,

Please add me to the Slack channel.

Thanks,
Sweta

On Sun, Jun 5, 2022 at 6:42 AM Jing Ge  wrote:

> Hi,
>
> Invites have been sent. Please check your emails. Thanks!
>
> Regards,
> JIng
>
> On Sun, Jun 5, 2022 at 12:38 PM Jay Ghiya  wrote:
>
>> Requesting the community to share an invite link for me at
>> ghiya6...@gmail.com and jay.gh...@ge.com.
>>
>


Recover watermark from savepoint

2022-06-08 Thread Sweta Kalakuntla
Hi,

I want to understand whether Flink saves watermarks in a savepoint and, if
not, how we can achieve this.

We are seeing an issue where, on recovery, the job processes some late
events that would have been discarded had the job been running without any
downtime.

Thank you,
Sweta


Re: Issue with Flink UI for Flink 1.14.0

2022-02-08 Thread Sweta Kalakuntla
I am facing the same issue. Do we know when 1.14.4 will be released?

Thanks.

On Fri, Jan 21, 2022 at 3:28 AM Chesnay Schepler  wrote:

> While FLINK-24550 was indeed fixed unfortunately a similar bug was also
> introduced (https://issues.apache.org/jira/browse/FLINK-25732).
> On 20/01/2022 21:18, Peter Westermann wrote:
>
> Just tried this again with Flink 1.14.3 since
> https://issues.apache.org/jira/browse/FLINK-24550 is listed as fixed. I
> am running into similar errors when calling the /v1/jobs/overview endpoint
> (without any running jobs):
>
> {"errors":["Internal server error."," side:\norg.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
> Failed to serialize the result for RPC call :
> requestMultipleJobDetails.\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)\n\tat
> java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat
> java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:946)\n\tat
> java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2266)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)\n\tat
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\n\tat
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> akka.actor.Actor.aroundReceive(Actor.scala:537)\n\tat
> akka.actor.Actor.aroundReceive$(Actor.scala:535)\n\tat
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\n\tat
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)\n\tat
> akka.actor.ActorCell.invoke(ActorCell.scala:548)\n\tat
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\n\tat
> akka.dispatch.Mailbox.run(Mailbox.scala:231)\n\tat
> akka.dispatch.Mailbox.exec(Mailbox.scala:243)\n\tat
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)\n\tat
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)\n\tat
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)\n\tat
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)\n\tat
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)\nCaused
> by: java.io.NotSerializableException: java.util.HashMap$Values\n\tat
> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)\n\tat
> java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)\n\tat
> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)\n\tat
> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)\n\tat
> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)\n\tat
> java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)\n\tat
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)\n\t...
> 30 more\n\nEnd of exception on server side>"]}
>
>
>
>
>
>
>
> Peter Westermann
>
> Team Lead – Realtime Analytics
>
> peter.westerm...@genesys.com
>
>
>
>
>
> *From: *Dawid Wysakowicz  
> *Date: *Thursday, October 14, 2021 at 10:00 AM
> *To: *Peter Westermann 
> , user@flink.apache.org 
> 
> *Subject: *Re: Issue with Flink UI for Flink 1.14.0
>
> I am afraid it is a bug in Flink 1.14. I created a ticket for it,
> FLINK-24550 [1]. I believe we should pick it up soonish. Thanks for
> reporting the issue!
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-24550
>
> On 13/10/2021 20:32, Peter Westermann wrote:
>
> Hello,
>
>
>
> I 

Re: Upgrade to 1.14.3

2022-01-25 Thread Sweta Kalakuntla
Hi Ingo,

So basically, I cannot deploy an older version of a Flink job on a 1.14.3
Flink cluster, is that right?

Thanks,
Sweta

On Tue, Jan 25, 2022 at 4:02 AM Ingo Bürk  wrote:

> Hi Sweta,
>
> there was a non-compatible change to SourceReaderContext#metricGroup in
> the 1.14.x release line; I assume this is what you are seeing.
>
> Did you make sure to update the connector (and any other) dependencies
> as well?
>
>
> Best
> Ingo
>
> On 25.01.22 05:36, Sweta Kalakuntla wrote:
> > Hi,
> >
> > We are on Flink 1.13.3 and trying to upgrade the cluster to 1.14.3. I see
> > that the job (on 1.13.3) is unable to start up because it says it couldn't
> > find the metric group (inside the FlinkKafkaConsumer class).
> >
> > - Can I deploy a 1.13.3 job on a 1.14.3 cluster?
> > - Can I deploy a 1.14.3 job on a 1.13.3 cluster?
> > - How do I upgrade to a 1.14.3 cluster without losing running apps' state?
> > I have even tried taking a savepoint, but that did not revive the job.
> >
> > Thank you,
> > Sweta
>


Upgrade to 1.14.3

2022-01-24 Thread Sweta Kalakuntla
Hi,

We are on Flink 1.13.3 and trying to upgrade the cluster to 1.14.3. I see
that the job (on 1.13.3) is unable to start up because it says it couldn't
find the metric group (inside the FlinkKafkaConsumer class).

- Can I deploy a 1.13.3 job on a 1.14.3 cluster?
- Can I deploy a 1.14.3 job on a 1.13.3 cluster?
- How do I upgrade to a 1.14.3 cluster without losing running apps' state? I
have even tried taking a savepoint, but that did not revive the job.

Thank you,
Sweta


Broadcast and watermark

2021-12-02 Thread Sweta Kalakuntla
Hi,

I am using the broadcast pattern for publishing rules and aggregating the
data (https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html);
my use case, and my code, are similar. One thing I wanted to add is
capturing late events, if any, and sending them to a sink. But when there
are unconsumed events already on the Kafka topic and I start the app after
a couple of hours, the output timestamps are messed up:

timestamp: 2021-12-02T04:48:20.324+, watermark:
292269055-12-02T16:47:04.192+, timeService.watermark:
292269055-12-02T16:47:04.192+

I have the watermark strategy set on the Kafka source as:

WatermarkStrategy wmStrategy = WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(config.get(OUT_OF_ORDERNESS)))
        .withTimestampAssigner((cdrRecord, timestamp) -> cdrRecord.getEventTime());
return env
        .addSource(recordSource.assignTimestampsAndWatermarks(wmStrategy))
        .name("records Source")
        .setParallelism(config.get(SOURCE_PARALLELISM));

Please let me know if you need any more information.

Thanks,
Sweta


Re: Upgrade from 1.13.1 to 1.13.2/1.13.3 failing

2021-11-10 Thread Sweta Kalakuntla
Thank you for your response.

On Tue, Nov 9, 2021 at 6:55 AM Dawid Wysakowicz 
wrote:

> Hey Sweta,
>
> Sorry I did not get back to you earlier.
>
> Could you explain how you do the upgrade? Do you try to upgrade your
> cluster through the HA services (e.g. ZooKeeper)? Meaning you bring the
> 1.13.1 cluster down and start a 1.13.2/3 cluster, expecting it to pick up
> the job automatically along with the latest checkpoint? Am I guessing
> correctly? As far as I can tell, we do not support such a way of upgrading.
>
> The way we support upgrades is via a savepoint/checkpoint. I'd suggest
> either taking a savepoint on 1.13.1 and restoring [1] the job on a 1.13.2
> cluster, or using an externalized checkpoint created from 1.13.1.
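>
> In CLI terms, that flow is roughly (the job id and paths are placeholders):
>
> flink savepoint <jobId> <targetDirectory>   # on the 1.13.1 cluster
> flink run -s <savepointPath> <jobJar>       # resubmit on the 1.13.2/3 cluster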
>
> Best,
>
> Dawid
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
> On 22/10/2021 16:39, Chesnay Schepler wrote:
>
> The only suggestion I can offer is to take a savepoint with 1.13.1 and try
> to restore from that.
>
> We will investigate the problem in
> https://issues.apache.org/jira/browse/FLINK-24621; currently we don't
> know why you are experiencing this issue.
>
> On 22/10/2021 16:02, Sweta Kalakuntla wrote:
>
> Hi,
>
> We are seeing an error while upgrading minor versions, from 1.13.1 to
> 1.13.2. The JobManager is unable to recover the checkpoint state. What
> would be the solution to this issue?
>
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
> checkpoint 2844 from state handle under checkpointID-0002844.
> This indicates that the retrieved state handle is broken. Try cleaning the
> state handle store.
> at
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:122)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:317)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ~[?:?]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.io.InvalidClassException:
> org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$No

Upgrade from 1.13.1 to 1.13.2/1.13.3 failing

2021-10-22 Thread Sweta Kalakuntla
Hi,

We are seeing an error while upgrading minor versions, from 1.13.1 to
1.13.2. The JobManager is unable to recover the checkpoint state. What
would be the solution to this issue?

Caused by: org.apache.flink.util.FlinkException: Could not retrieve
checkpoint 2844 from state handle under checkpointID-0002844.
This indicates that the retrieved state handle is broken. Try cleaning the
state handle store.
at
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:122)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:317)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.io.InvalidClassException:
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor;
local class incompatible: stream classdesc serialVersionUID =
-5544173933105855751, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]



Thank you,

Sweta K


Re: Can we share state between different keys in the same window?

2021-07-20 Thread Sweta Kalakuntla
That is my understanding as well, but I wanted to confirm.

My use case: I need to get additional data from an external database (not a
source) for the data I am reading off of the Kafka source, and aggregate
the data over a 10-minute window for a given key. But I did not want to
query for every key in that 10-minute window; I want to query once and
share the data between different keys. Is there any way we can achieve this?

Thanks,
Sweta



On Mon, Jul 19, 2021 at 11:13 PM JING ZHANG  wrote:

> Hi Sweta,
> The state of different keys is isolated. That means you can read/write the
> state of the current key in a
> `ProcessFunction`/`KeyedProcessFunction`/`ProcessWindowFunction`, but it is
> not possible to read/write the state of other keys.
>
> Would you please describe your business requirement? Let's see if there is
> another way to satisfy it.
>
> Best,
> JING ZHANG
>
> Sweta Kalakuntla  于2021年7月20日周二 上午5:04写道:
>
>> Hi,
>>
>> I need to query the database (not as a source, but for additional
>> information) in a ProcessFunction. I want to save the results in state,
>> or in some other way, so that I can use the data for other keys in the
>> same window. What are my options?
>>
>> Thanks,
>> sweta
>>
>


Can we share state between different keys in the same window?

2021-07-19 Thread Sweta Kalakuntla
Hi,

I need to query the database (not as a source, but for additional
information) in a ProcessFunction. I want to save the results in state, or
in some other way, so that I can use the data for other keys in the same
window. What are my options?

Thanks,
sweta


Session cluster configmap removal

2021-06-29 Thread Sweta Kalakuntla
Hi,

We have Flink session clusters in Kubernetes with several long-running
Flink jobs deployed in them, with HA enabled. Since enabling HA, we are
seeing ConfigMaps created for every new job. Whenever we stop/cancel
existing jobs, these ConfigMaps do not get deleted. Is it right that these
ConfigMaps will not be removed unless we shut down the cluster?

Thanks,
Sweta


Flink jobs organization and maintainability

2021-02-23 Thread Sweta Kalakuntla
Hi,

I am going to have to implement many similar jobs. I would appreciate any
guidance or examples you may have for organizing them in a Git repository
without having one repo per job.

Thanks,
SK

--


Flink Kafka Table API for python with JAAS

2020-11-09 Thread Sweta Kalakuntla
Hi,

I am using the Flink 1.11.2 Python Table API to connect to a Kafka topic
using the SASL protocol, but it fails with the following error. I tried the
same properties in the Java version of Flink and was able to connect. Has
anyone faced this issue, and how did you resolve it?

Error:

Caused by: javax.security.auth.login.LoginException: unable to find
LoginModule class:
org.apache.kafka.common.security.plain.PlainLoginModule

Kafka connection:

kafka = Kafka() \
    .version("universal") \
    .topic("test_topic") \
    .property("group.id", "consumer_group") \
    .property("security.protocol", "SASL_PLAINTEXT") \
    .property("sasl.mechanism", "PLAIN") \
    .property("bootstrap.servers", ":9093") \
    .property("sasl.jaas.config",
              "org.apache.kafka.common.security.plain.PlainLoginModule required "
              "username=\"user\" password=\"abc\";") \
    .start_from_latest()


Thank you,
SKala

-- 

Sweta Kalakuntla  •  Software Engineer

900 Main Campus Drive, Raleigh, NC 27606

m: 216-702-1653

e: skalakun...@bandwidth.com