Re: MetricRegistryTestUtils java class (flink-runtime/metrics) not found in source code version 1.14.3

2022-02-25 Thread Mason Chen
Hi Prasanna,

Why do you need histograms vs summaries? I'm curious about the change and
want to see if it applies to my usage of the PrometheusReporter.

Best,
Mason

On Mon, Jan 31, 2022 at 11:51 PM Martijn Visser 
wrote:

> Hi Prasanna,
>
> Just a quick note that the Github links are all pointing to release
> candidate 1 for 1.14.3. The released version is in
> https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTestUtils.java
>
> Best regards,
>
> Martijn
>
> On Tue, 1 Feb 2022 at 07:35, Prasanna kumar 
> wrote:
>
>> NVM, I was able to find it in a different place:
>>
>>
>> https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTestUtils.java
>>
>> On Tue, Feb 1, 2022 at 11:58 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Team, We are writing our own prometheus reporter to make sure that we
>>> are capturing data in histograms rather than summaries.
>>>
>>> We were able to do it successfully in version 1.12.7.
>>>
>>> But while upgrading to version 1.14.3, we find
>>> that MetricRegistryTestUtils is not available in the source code provided
>>> by Flink on GitHub.
>>>
>>> PrometheusReporterTest.java throws an error that this file is unavailable.
>>>
>>> [image: Screen Shot 2022-02-01 at 11.50.09 AM.png]
>>>
>>> [image: Screen Shot 2022-02-01 at 11.53.17 AM.png]
>>>
>>> The code below exists in 1.12.7, but the method
>>> defaultMetricRegistryConfiguration is deprecated in the latest version.
>>>
>>> [image: Screen Shot 2022-02-01 at 11.51.46 AM.png]
>>>
>>> Looking at the GitHub link
>>> https://github.com/apache/flink/tree/release-1.14.3-rc1/flink-runtime/src/main/java/org/apache/flink/runtime/metrics
>>> also shows that MetricRegistryTestUtils is not available. It is not
>>> available in
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics
>>> on the master branch either.
>>>
>>> [image: Screen Shot 2022-02-01 at 11.55.19 AM.png]
>>>
>>> Could the community please add the class file on GitHub?
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>


Re: Flink job recovery after task manager failure

2022-02-25 Thread Afek, Ifat (Nokia - IL/Kfar Sava)
Hi Zhilong,

I will check the issues you raised.

Thanks for your help,
Ifat

From: Zhilong Hong 
Date: Thursday, 24 February 2022 at 19:58
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" 
Cc: "user@flink.apache.org" 
Subject: Re: Flink job recovery after task manager failure

Hi, Afek

I've read the log you provided. Since you've set restart-strategy to 
exponential-delay and restart-strategy.exponential-delay.initial-backoff to 
10s, every time a failover is triggered the JobManager has to wait 10 seconds 
before it restarts the job. If you'd prefer a quicker restart, you could change 
the restart strategy to fixed-delay and set a small value for 
restart-strategy.fixed-delay.delay.
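For reference, a minimal flink-conf.yaml sketch of the fixed-delay alternative (the attempt count and delay are illustrative values, not recommendations):

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10   # illustrative; tune for your job
restart-strategy.fixed-delay.delay: 1 s     # restart ~1s after a failover
```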

Furthermore, two more failovers happened during the initialization of the 
recovered tasks. During initialization, a task tries to recover its state from 
the last valid checkpoint. A FileNotFoundException occurred during this 
recovery; I'm not quite sure of the reason. Since the recovery succeeded after 
two failovers, I suspect the local disks of your cluster are not stable.

Sincerely,
Zhilong

On Thu, Feb 24, 2022 at 9:04 PM Afek, Ifat (Nokia - IL/Kfar Sava) 
mailto:ifat.a...@nokia.com>> wrote:
Thanks Zhilong.

The first launch of our job is fast, so I don't think that's the issue. I see 
in the Flink JobManager log that there were several exceptions during the 
restart, and the task manager was restarted a few times until it stabilized.

You can find the log here:
jobmanager-log.txt.gz

Thanks,
Ifat

From: Zhilong Hong mailto:zhlongh...@gmail.com>>
Date: Wednesday, 23 February 2022 at 19:38
To: "Afek, Ifat (Nokia - IL/Kfar Sava)" 
mailto:ifat.a...@nokia.com>>
Cc: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Flink job recovery after task manager failure

Hi, Afek!

When a TaskManager is killed, the JobManager is not notified until a heartbeat 
timeout occurs. Currently, the default value of heartbeat.timeout is 50 
seconds [1]. That's why it takes more than 30 seconds for Flink to trigger a 
failover. If you'd like to shorten the time before a failover is triggered in 
this situation, you could decrease the value of heartbeat.timeout in 
flink-conf.yaml. However, if the value is set too small, heartbeat timeouts 
will happen more frequently and the cluster will be unstable. As FLINK-23403 
[2] mentions, if you are using Flink 1.14 or 1.15, you could try setting the 
value to 10s.
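For reference, a flink-conf.yaml sketch of the settings discussed above (the interval value is an illustrative assumption):

```yaml
heartbeat.timeout: 10000    # ms; default is 50000, 10s is viable on 1.14+ per FLINK-23403
heartbeat.interval: 2000    # ms; illustrative, keep it well below the timeout
```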

You mentioned that it takes 5-6 minutes to restart the jobs. That seems a bit 
odd. How long does it take to deploy your job for a brand-new launch? You 
could compress the JobManager log, upload it to Google Drive or OneDrive, and 
attach the sharing link. Maybe we can find out what happened from the log.

Sincerely,
Zhilong

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
[2] https://issues.apache.org/jira/browse/FLINK-23403

On Thu, Feb 24, 2022 at 12:25 AM Afek, Ifat (Nokia - IL/Kfar Sava) 
mailto:ifat.a...@nokia.com>> wrote:
Hi,

I am trying to use Flink's checkpointing in order to support task manager 
recovery. I'm running Flink via Beam with filesystem state storage and the 
following parameters:
checkpointingInterval=3
checkpointingMode=EXACTLY_ONCE.

What I see is that if I kill a task manager pod, it takes Flink about 30 
seconds to identify the failure and another 5-6 minutes to restart the jobs.
Is there a way to shorten the downtime? What is the expected downtime, from 
the task manager being killed until the jobs are recovered? Are there any best 
practices for handling it (e.g. different configuration parameters)?

Thanks,
Ifat



Re: Low Watermark

2022-02-25 Thread Piotr Nowojski
Hi,

It's the minimum watermark among all 10 parallel instances of that Task.

Using the currentInputWatermark metric [1], you can access the watermark of
each of those 10 subtasks individually.

Best,
Piotrek

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
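As a plain-Java illustration (this is not Flink's code; it just mirrors the arithmetic of Flink's bounded-out-of-orderness strategy, where each subtask's watermark trails its maximum seen event timestamp by the configured out-of-orderness plus 1 ms, and the timestamps below are hypothetical):

```java
import java.util.Arrays;

public class LowWatermarkSketch {
    // Per-subtask watermark for a bounded-out-of-orderness strategy:
    // maxSeenTimestamp - outOfOrdernessMillis - 1.
    static long subtaskWatermark(long maxSeenTimestamp, long outOfOrdernessMillis) {
        return maxSeenTimestamp - outOfOrdernessMillis - 1;
    }

    // The "Low Watermark" shown in the UI for a task is the minimum
    // across all of its parallel subtasks.
    static long lowWatermark(long[] maxSeenTimestamps, long outOfOrdernessMillis) {
        return Arrays.stream(maxSeenTimestamps)
                .map(t -> subtaskWatermark(t, outOfOrdernessMillis))
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Hypothetical max event timestamps seen by 4 subtasks.
        long[] maxSeen = {10_000L, 12_500L, 9_000L, 11_000L};
        // With 3s out-of-orderness, the slowest subtask dominates: 9000 - 3000 - 1.
        System.out.println(lowWatermark(maxSeen, 3_000L)); // prints 5999
    }
}
```

This is why a single idle or slow subtask holds the displayed Low Watermark back for the whole task.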

On Fri, 25 Feb 2022 at 14:10, Isidoros Ioannou  wrote:

> Hello, could someone please explain what the Low Watermark indicates in
> the Flink UI in the attached image?
> I have event time enabled with a boundOutOfOrdernessStrategy of 3s for the
> incoming events and I use CEP with a within window of 5 minutes.
>


Re: Possible BUG in 1.15 SQL JSON_OBJECT()

2022-02-25 Thread Seth Wiesman
Thank you for reporting! That is definitely a bug; I have opened a ticket to
fix it, which you can track here:

https://issues.apache.org/jira/browse/FLINK-26374

Seth

On Thu, Feb 24, 2022 at 4:18 PM Jonathan Weaver 
wrote:

> Using the latest SNAPSHOT BUILD.
>
> If I have a column definition as
>
>  .column(
>      "events",
>      DataTypes.ARRAY(
>          DataTypes.ROW(
>              DataTypes.FIELD("status",
>                  DataTypes.STRING().notNull()),
>              DataTypes.FIELD("timestamp",
>                  DataTypes.STRING().notNull()),
>              DataTypes.FIELD("increment_identifier",
>                  DataTypes.STRING().nullable()))))
>
> And a query as
>
> JSON_OBJECT('events' VALUE events) event_json
>
> Will generate JSON correctly ONLY if increment_identifier is NOT NULL but
> will throw a NullPointerException on the first record that has that column
> as null.
>
> Exception is not helpful.
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> at akka.dispatch.OnComplete.internal(Future.scala:300)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> at
> java.base/java.util.con

Low Watermark

2022-02-25 Thread Isidoros Ioannou
Hello, could someone please explain what the Low Watermark indicates in the
Flink UI in the attached image?
I have event time enabled with a boundOutOfOrdernessStrategy of 3s for the
incoming events and I use CEP with a within window of 5 minutes.


Pods are OOMKilled with RocksDB backend after a few checkpoints

2022-02-25 Thread Alexandre Montecucco
Hi all,

I am trying to reduce the memory usage of a Flink app.
There is about 25+Gb of state when persisted to checkpoint/savepoint. And a
fair amount of short lived objects as incoming traffic is fairly high.
So far, I have 8 TMs with 20GB each using Flink 1.12. I would like to reduce
the amount of memory I allocate, as the state will continue growing. I start my
application from an existing savepoint.

Given that CPU is not really an issue, I switched to the RocksDB backend, so
that state is serialized and supposedly much more compact in memory.
I am setting taskmanager.memory.process.size=2m and
taskmanager.memory.managed.size=6000m
(and tried other values ranging from 3000m to 1m).

The issue I observed is that the task manager pod memory is increasing
during each checkpoint and the 4th checkpoint fails because most of the
pods are OOMKilled. There is no java exception in the logs, so I really
suspect it is simply RocksDB using more memory than allocated.
I explicitly set state.backend.rocksdb.memory.managed=true to be sure.
I tried intervals of 2 minutes and 5 minutes for the checkpoint, and it
always seems to fail during the 4th checkpoint.

I tried incremental checkpoints and after 30 checkpoints no sign of failure
so far.
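For reference, the incremental-checkpoint setup described above can be sketched in flink-conf.yaml (keys as of Flink 1.12; the managed-size value echoes the one mentioned earlier):

```yaml
state.backend: rocksdb
state.backend.incremental: true             # only checkpoint the RocksDB delta
state.backend.rocksdb.memory.managed: true  # cap RocksDB by Flink's managed memory
taskmanager.memory.managed.size: 6000m
```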

I tried with a few GB of overhead memory but that only delays the issue a
bit longer.
From the heap usage graph, in all cases, heap usage looks as expected: it goes
back to a few hundred MB after GC, as the only long-lived state is off-heap.
Xmx is about 12GB but peak heap usage is at most 6GB.


Am I misconfiguring anything that could explain the OOMKilled pods?

Also, what is the best single metric to monitor rocksdb memory usage?  (I
tried estimate-live-data-size and size-all-mem-tables but I am not fully
sure yet about their exact meaning).

Best,
Alex




source code build failure

2022-02-25 Thread Edwin
Hi all,


I was trying to build the Flink source code using the mvn clean install -DskipTests
command, and it reported a build failure with the following error info:


[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
(default-testCompile) on project flink-runtime: Compilation failure: 
Compilation failure: 
[ERROR] 
/opt/source-code/flink-master/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java:[45,20]
 sun.net.util.IPAddressUtil is internal proprietary API and may be removed in a 
future release
[ERROR] 
/opt/source-code/flink-master/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java:[45,20]
 sun.net.util.IPAddressUtil is internal proprietary API and may be removed in a 
future release
[ERROR] 
/opt/source-code/flink-master/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java:[45,20]
 sun.net.util.IPAddressUtil is internal proprietary API and may be removed in a 
future release
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn  -rf :flink-runtime


And my system configuration is shown as following:
Maven home: /opt/apache-maven-3.8.4
Java version: 11, vendor: Oracle Corporation, runtime: /usr/java/jdk-11
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "2.6.32-431.el6.x86_64", arch: "amd64", family: 
"unix"


Can anyone suggest anything to help me fix this? Thanks in advance!



Re: streaming mode with both finite and infinite input sources

2022-02-25 Thread Dawid Wysakowicz
This should be supported in 1.14 if you enable checkpointing with 
finished tasks [1], which was added in 1.14. In 1.15 it will be 
enabled by default.


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
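For reference, on 1.14 the feature Dawid mentions is toggled by a single flink-conf.yaml flag (enabled by default from 1.15):

```yaml
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
```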


On 25/02/2022 08:18, Jin Yi wrote:
so we have a streaming job where the main work to be done is 
processing infinite kafka sources.  recently, i added a fromCollection 
(finite) source to simply write some state once upon startup.  this 
all seems to work fine.  the finite source operators all finish, while 
all the infinite source operators continue running with watermarks.


however, the weird thing is that savepointing throws exceptions, and 
there have been no automatic checkpoints at all while the job has been 
running for 90+minutes (checkpoint config is unaligned, exactly once 
every 5m w/ a 1h timeout).


is this mixed finity not a supported setup?




Re: [Flink-1.14.3] Restart of pod due to duplicatejob submission

2022-02-25 Thread Parag Somani
Thank you Yang for your inputs...!

As a workaround I am doing the same by clearing its state from ZK.

I am also evaluating removing ZooKeeper-based HA altogether. Our application
already runs on k8s and the pods are resilient. Also, AFAIK, Flink does not
support active-active HA, so ZK-based HA would be roughly equivalent to
k8s-based resiliency.


On Fri, Feb 25, 2022 at 11:55 AM Yang Wang  wrote:

> This might be related to FLINK-21928, which seems to be fixed in 1.14.0.
> However, the fix has some limitations and users need to manually clean up
> the HA entries.
>
>
> Best,
> Yang
>
> Parag Somani  于2022年2月24日周四 13:42写道:
>
>> Hello,
>>
>> Recently, due to the log4j vulnerabilities, we upgraded to Apache Flink
>> 1.14.3. We are now getting the following exception, and because of it the
>> pod gets into a CrashLoopBackOff. We have seen this issue especially during
>> upgrade or deployment, when an existing pod is already running.
>>
>> What could be causing this issue during deployment? Any assistance or
>> workaround would be much appreciated.
>>
>> Also, I am seeing this issue only after upgrading from 1.14.2 to 1.14.3.
>>
>> Env:
>> Deployed on : k8s
>> Flink version: 1.14.3
>> HA using zookeeper
>>
>> Logs:
>> 2022-02-23 05:13:14.555 ERROR 45 --- [t-dispatcher-17]
>> c.b.a.his.service.FlinkExecutorService   : Failed to execute job
>>
>> org.apache.flink.util.FlinkException: Failed to execute job 'events rates
>> calculation'.
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2056)
>> ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
>> ~[flink-clients_2.12-1.14.0.jar:1.14.0]
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>> ~[flink-clients_2.12-1.14.0.jar:1.14.0]
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
>> ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
>> at
>> com.bmc.ade.his.service.FlinkExecutorService.init(FlinkExecutorService.java:37)
>> ~[health-service-1.0.00.jar:1.0.00]
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method) ~[na:na]
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[na:na]
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[na:na]
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> ~[na:na]
>> at
>> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:602)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:524)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
>> ~[spring-beans-5.3.4.jar:5.3.4]
>> at
>> org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
>> ~[spring-beans-5.3.4.jar:

Re: streaming mode with both finite and infinite input sources

2022-02-25 Thread Yuval Itzchakov
One possible option is the HybridSource, released in Flink 1.14, which may
support your use case:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/

On Fri, Feb 25, 2022, 09:19 Jin Yi  wrote:

> so we have a streaming job where the main work to be done is processing
> infinite kafka sources.  recently, i added a fromCollection (finite) source
> to simply write some state once upon startup.  this all seems to work
> fine.  the finite source operators all finish, while all the infinite
> source operators continue running with watermarks.
>
> however, the weird thing is that savepointing throws exceptions, and there
> have been no automatic checkpoints at all while the job has been running
> for 90+minutes (checkpoint config is unaligned, exactly once every 5m w/ a
> 1h timeout).
>
> is this mixed finity not a supported setup?
>