Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Alexis Sarda-Espinosa
Hi Roman,

Here's an example of a WindowReaderFunction:

public class StateReaderFunction extends WindowReaderFunction<Integer, Integer, String, TimeWindow> {
    private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
            "descriptorId",
            Integer.class
    );

    @Override
    public void readWindow(String s, Context<TimeWindow> context,
            Iterable<Integer> elements, Collector<Integer> out) throws Exception {
        int count = 0;
        for (Integer i : context.windowState().getListState(LSD).get()) {
            count++;
        }
        out.collect(count);
    }
}

That's for the operator that uses window state. The other readers do something 
similar but with context.globalState(). That should provide the number of state 
entries for each key+window combination, no? And after collecting all results, 
I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
context.windowState().getListState(...).clear().
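
To be concrete, that override looks roughly like this (just a sketch; the element types and descriptor name are placeholders, not the exact ones from my job):

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountingWindowFunction extends ProcessWindowFunction<Integer, Integer, String, TimeWindow> {

    private static final ListStateDescriptor<Integer> LSD =
            new ListStateDescriptor<>("descriptorId", Integer.class);

    @Override
    public void process(String key, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
        // regular window logic that appends to context.windowState().getListState(LSD)
    }

    @Override
    public void clear(Context context) throws Exception {
        // drop the per-window list state when the window is purged
        context.windowState().getListState(LSD).clear();
    }
}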

Side note: in the state processor program I call 
ExecutionEnvironment#setParallelism(1) even though my streaming job runs with 
parallelism=4, this doesn't affect the result, does it?

Regards,
Alexis.


From: Roman Khachatryan 
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev 
> environment that has been running for the last week and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with 
> the largest state (614MB) come from keyed sliding windows. Some attributes of 
> this job’s setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState and no TTL.
> Global window state with MapState> and a TTL of 1 hour 
> (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState where the Pojo has an int and a 
> long, a TTL of 1 hour, and configured with 
> cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old 
> state in addition to configured TTL. The other operator does override and 
> call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and 
> I’ll note here that I see 50 folders named chk-*** even though I don’t set 
> state.checkpoints.num-retained and the default should be 1. I loaded the data 
> from the folder with the highest chk number and I see that my operators have 
> these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(...)
>
> .collect()
>
> .parallelStream()
>
> .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in 
> each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: DataStream request / response

2022-04-08 Thread Jason Thomas
I will dig deeper into Statefun.  Also, yes for now I also can try the
Spring/Kafka solution if Statefun doesn't fit.

Austin - as far as rewriting our microservices in Flink goes, here are some things I
was looking for:

- We need to be able to easily share/transform data with other teams.
Flink SQL seems really nice for this.  We also have use cases for real-time
analytics within our own application.
- If a Flink job is down temporarily due to redeployment, it can just pick
up where it left off.  With microservices, data gets lost/corrupted.
- I'm trying to help improve developer productivity, have better auditing
and logging, improve testing, etc.  An event driven architecture obviously
isn't required to have these things, but it should help.
- My intuition is that Flink will have lower hosting costs, but I haven't
tested this yet.

Thanks everyone for the help, I really appreciate it!

-Jason

On Fri, Apr 8, 2022 at 2:34 PM Roman Khachatryan  wrote:

> It seems to be possible now with RequestReplyHandlers from Java SDK
> [1] (or other SDKs) unless I'm missing something.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/java/#serving-functions
>
> Regards,
> Roman
>
> On Fri, Apr 8, 2022 at 7:45 PM Austin Cawley-Edwards
>  wrote:
> >
> > Good suggestion – though a common misconception with Statefun is that
> HTTP ingestion is possible. Last time I checked it was still under
> theoretical discussion. Do you know the current  state there?
> >
> > Austin
> >
> > On Fri, Apr 8, 2022 at 1:19 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Besides the solution suggested by Austin, you might also want to look
> >> at Stateful Functions [1]. They provide a more convenient programming
> >> model for the use-case I think, while DataStream is a relatively
> >> low-level API.
> >>
> >> [1]
> >> https://nightlies.apache.org/flink/flink-statefun-docs-stable/
> >>
> >> Regards,
> >> Roman
> >>
> >> On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
> >>  wrote:
> >> >
> >> > Hi Jason,
> >> >
> >> > No, there is no HTTP source/ sink support that I know of for Flink.
> Would running the Spring + Kafka solution in front of Flink work for you?
> >> >
> >> > On a higher level, what drew you to migrating the microservice to
> Flink?
> >> >
> >> > Best,
> >> > Austin
> >> >
> >> > On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas <
> katsoftware...@gmail.com> wrote:
> >> >>
> >> >> I'm taking an existing REST based microservice application and
> moving all of the logic into Flink DataStreams.
> >> >>
> >> >> Is there an easy way to get a request/response from a Flink
> DataStream so I can 'call' into it from a REST service?   For example,
> something similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
> >> >>
> >> >> Thanks for any help!
> >> >>
> >> >> -Jason
> >> >>
>


Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Roman Khachatryan
Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?
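
Roughly along these lines (only a sketch; the uid and descriptor are placeholders, and this mainly applies to the operators whose state is registered as plain keyed state, since window state still goes through the window reader):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class CountingKeyedStateReader extends KeyedStateReaderFunction<String, Integer> {

    private transient ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) {
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("descriptorId", Integer.class));
    }

    @Override
    public void readKey(String key, Context context, Collector<Integer> out) throws Exception {
        int count = 0;
        for (Integer ignored : listState.get()) {
            count++;
        }
        // one count per key; summing the resulting data set gives the total number of entries
        out.collect(count);
    }
}

// usage: savepoint.readKeyedState("operator-uid", new CountingKeyedStateReader())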

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev 
> environment that has been running for the last week and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with 
> the largest state (614MB) come from keyed sliding windows. Some attributes of 
> this job’s setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState and no TTL.
> Global window state with MapState> and a TTL of 1 hour 
> (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState where the Pojo has an int and a 
> long, a TTL of 1 hour, and configured with 
> cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old 
> state in addition to configured TTL. The other operator does override and 
> call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and 
> I’ll note here that I see 50 folders named chk-*** even though I don’t set 
> state.checkpoints.num-retained and the default should be 1. I loaded the data 
> from the folder with the highest chk number and I see that my operators have 
> these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(...)
>
> .collect()
>
> .parallelStream()
>
> .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in 
> each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: Error during shutdown of StandaloneApplicationClusterEntryPoint via JVM shutdown hook

2022-04-08 Thread Roman Khachatryan
I'd try to increase the value, so that the timeout doesn't happen
during the shutdown.
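
For example, in flink-conf.yaml (just an illustration; pick something comfortably larger than your worst-case savepoint duration):

rest.async.store-duration: 10 min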

Regards,
Roman

On Fri, Apr 8, 2022 at 7:50 PM Alexey Trenikhun  wrote:
>
> Hi Roman,
> Currently rest.async.store-duration is not set. Are you suggesting to try to 
> decrease value from default or vice-versa?
>
> Thanks,
> Alexey
> 
> From: Roman Khachatryan 
> Sent: Friday, April 8, 2022 5:32:45 AM
> To: Alexey Trenikhun 
> Cc: Flink User Mail List 
> Subject: Re: Error during shutdown of StandaloneApplicationClusterEntryPoint 
> via JVM shutdown hook
>
> Hello,
>
> Unfortunately, it's difficult to name the exact reason why the timeout
> happens because there's no message logged.
> I've opened a ticket to improve the logging [1].
> There, I also listed some code paths that might lead to this situation.
>
> From the described scenario, I'd suppose that it's
> CompletedOperationCache.closeAsync()  that times out. It can be
> verified or maybe mitigated by changing rest.async.store-duration  [2]
> (the default is 5 minutes).
> Could you check that?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-27144
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rest-async-store-duration
>
> Regards,
> Roman
>
> On Wed, Apr 6, 2022 at 5:21 PM Alexey Trenikhun  wrote:
> >
> > Hello,
> >
> > We are using Flink 1.13.6, Application Mode, k8s HA. To upgrade job, we use 
> > POST, 
> > url=http://gsp-jm:8081/jobs//savepoints, 
> > then we wait for up to 5 minutes for completion, periodically polling 
> > status (GET, 
> > url=http://gsp-jm:8081/jobs//savepoints/{triggerId}).
> >  If savepoint is not complete in 5 minutes, we cancel the job (PATCH, 
> > url=http://gsp-jm:8081/jobs/000). Usually it 
> > works well, job stopped one way or another and we proceed with upgrade, but 
> > currently JM exits with code -2, and as result k8s restarts pod. We tried 
> > multiple times, but every time getting -2. JM log is below (newest messages 
> > on top):
> >
> > 2022-04-06T14:21:17.465Z Error during shutdown of 
> > StandaloneApplicationClusterEntryPoint via JVM shutdown hook.
> > java.util.concurrent.CompletionException: 
> > java.util.concurrent.TimeoutException
> >  at 
> > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >  at 
> > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >  at 
> > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661)
> >  at 
> > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> >  at 
> > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> >  at 
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.closeAsync(ClusterEntrypoint.java:379)
> >  at 
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$new$0(ClusterEntrypoint.java:168)
> >  at 
> > org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
> >  at java.lang.Thread.run(Thread.java:750) Caused by: 
> > java.util.concurrent.TimeoutException: null
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
> >  at 
> > org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
> >  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >  at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >  at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >  at 
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >  at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >  ... 1 common frames omitted
> > 2022-04-06T14:21:17.464Z Terminating cluster entrypoint process 
> > StandaloneApplicationClusterEntryPoint with exit code 2.
> > java.util.concurrent.TimeoutException: null
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
> >  at 
> > org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> >  at 
> > org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
> >  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >  at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >  at 
> > 

Re: DataStream request / response

2022-04-08 Thread Roman Khachatryan
It seems to be possible now with RequestReplyHandlers from Java SDK
[1] (or other SDKs) unless I'm missing something.

[1]
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/java/#serving-functions

Regards,
Roman

On Fri, Apr 8, 2022 at 7:45 PM Austin Cawley-Edwards
 wrote:
>
> Good suggestion – though a common misconception with Statefun is that HTTP 
> ingestion is possible. Last time I checked it was still under theoretical 
> discussion. Do you know the current  state there?
>
> Austin
>
> On Fri, Apr 8, 2022 at 1:19 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Besides the solution suggested by Austin, you might also want to look
>> at Stateful Functions [1]. They provide a more convenient programming
>> model for the use-case I think, while DataStream is a relatively
>> low-level API.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-statefun-docs-stable/
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
>>  wrote:
>> >
>> > Hi Jason,
>> >
>> > No, there is no HTTP source/ sink support that I know of for Flink. Would 
>> > running the Spring + Kafka solution in front of Flink work for you?
>> >
>> > On a higher level, what drew you to migrating the microservice to Flink?
>> >
>> > Best,
>> > Austin
>> >
>> > On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas  
>> > wrote:
>> >>
>> >> I'm taking an existing REST based microservice application and moving all 
>> >> of the logic into Flink DataStreams.
>> >>
>> >> Is there an easy way to get a request/response from a Flink DataStream so 
>> >> I can 'call' into it from a REST service?   For example, something 
>> >> similar to this Kafka streams example that uses Spring 
>> >> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
>> >>
>> >> Thanks for any help!
>> >>
>> >> -Jason
>> >>


Re: HDFS streaming source concerns

2022-04-08 Thread Roman Khachatryan
Hi Carlos,

AFAIK, Flink FileSource is capable of checkpointing while reading the
files (at least in Streaming Mode).
As for the watermarks, I think FLIP-182 [1] could solve the problem;
however, it's currently under development.
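
For reference, a rough sketch of a continuously monitoring FileSource (treat the class names as approximate; e.g. the line format is called TextLineFormat in some releases and TextLineInputFormat in others):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsStreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///data/in"))
                // keep scanning for new files instead of finishing after one pass,
                // so checkpoints can complete while the backlog is still being read
                .monitorContinuously(Duration.ofSeconds(30))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "hdfs-files").print();
        env.execute("hdfs-streaming");
    }
}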

I'm also pulling in Arvid and Fabian who are more familiar with the subject.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

Regards,
Roman

On Wed, Apr 6, 2022 at 4:17 PM Carlos Downey  wrote:
>
> Hi,
>
> We have an in-house platform that we want to integrate with external clients 
> via HDFS. They have lots of existing files and they continuously put more 
> data to HDFS. Ideally, we would like to have a Flink job that takes care of 
> ingesting the data, as one of the requirements is to execute SQL on top of these 
> files. We looked at the existing FileSource implementation but we believe it 
> will not be well suited for this use case.
> - firstly, we'd have to ingest all files initially present on HDFS before 
> completing the first checkpoint - this is unacceptable for us as we would have to 
> reprocess all the files again in case of early job failure. Not to mention 
> the state blowing up for aggregations.
> - secondly, we see no way to establish a valid watermark strategy. This is a 
> major pain point that we can't find the right answer for. We don't want to 
> assume too much about the data itself. In general, the only solutions we see 
> require some sort of synchronization across subtasks. On the other hand, the 
> simplest strategy is to delay the watermark. In that case, though, we are 
> afraid of accidentally dropping events.
>
> Given this, we are thinking about implementing our own file source. Has someone in 
> the community already tried solving a similar problem? If not, any suggestions 
> about the concerns we raised would be valuable.


Re: Error during shutdown of StandaloneApplicationClusterEntryPoint via JVM shutdown hook

2022-04-08 Thread Alexey Trenikhun
Hi Roman,
Currently rest.async.store-duration is not set. Are you suggesting to try to 
decrease value from default or vice-versa?

Thanks,
Alexey

From: Roman Khachatryan 
Sent: Friday, April 8, 2022 5:32:45 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Error during shutdown of StandaloneApplicationClusterEntryPoint 
via JVM shutdown hook

Hello,

Unfortunately, it's difficult to name the exact reason why the timeout
happens because there's no message logged.
I've opened a ticket to improve the logging [1].
There, I also listed some code paths that might lead to this situation.

From the described scenario, I'd suppose that it's
CompletedOperationCache.closeAsync()  that times out. It can be
verified or maybe mitigated by changing rest.async.store-duration  [2]
(the default is 5 minutes).
Could you check that?

[1]
https://issues.apache.org/jira/browse/FLINK-27144
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rest-async-store-duration

Regards,
Roman

On Wed, Apr 6, 2022 at 5:21 PM Alexey Trenikhun  wrote:
>
> Hello,
>
> We are using Flink 1.13.6, Application Mode, k8s HA. To upgrade job, we use 
> POST, 
> url=http://gsp-jm:8081/jobs//savepoints, then 
> we wait for up to 5 minutes for completion, periodically polling status (GET, 
> url=http://gsp-jm:8081/jobs//savepoints/{triggerId}).
>  If savepoint is not complete in 5 minutes, we cancel the job (PATCH, 
> url=http://gsp-jm:8081/jobs/000). Usually it 
> works well, job stopped one way or another and we proceed with upgrade, but 
> currently JM exits with code -2, and as result k8s restarts pod. We tried 
> multiple times, but every time getting -2. JM log is below (newest messages 
> on top):
>
> 2022-04-06T14:21:17.465Z Error during shutdown of 
> StandaloneApplicationClusterEntryPoint via JVM shutdown hook.
> java.util.concurrent.CompletionException: 
> java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661)
>  at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>  at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.closeAsync(ClusterEntrypoint.java:379)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$new$0(ClusterEntrypoint.java:168)
>  at 
> org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
>  at java.lang.Thread.run(Thread.java:750) Caused by: 
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 common frames omitted
> 2022-04-06T14:21:17.464Z Terminating cluster entrypoint process 
> StandaloneApplicationClusterEntryPoint with exit code 2.
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:750)
> 2022-04-06T14:21:17.463Z Stopped Akka RPC service.
> 2022-04-06T14:21:17.46Z Stopped Akka RPC service.
> 

Re: DataStream request / response

2022-04-08 Thread Austin Cawley-Edwards
Good suggestion – though a common misconception with Statefun is that HTTP
ingestion is possible. Last time I checked it was still under theoretical
discussion. Do you know the current  state there?

Austin

On Fri, Apr 8, 2022 at 1:19 PM Roman Khachatryan  wrote:

> Hi,
>
> Besides the solution suggested by Austin, you might also want to look
> at Stateful Functions [1]. They provide a more convenient programming
> model for the use-case I think, while DataStream is a relatively
> low-level API.
>
> [1]
> https://nightlies.apache.org/flink/flink-statefun-docs-stable/
>
> Regards,
> Roman
>
> On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
>  wrote:
> >
> > Hi Jason,
> >
> > No, there is no HTTP source/ sink support that I know of for Flink.
> Would running the Spring + Kafka solution in front of Flink work for you?
> >
> > On a higher level, what drew you to migrating the microservice to Flink?
> >
> > Best,
> > Austin
> >
> > On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas 
> wrote:
> >>
> >> I'm taking an existing REST based microservice application and moving
> all of the logic into Flink DataStreams.
> >>
> >> Is there an easy way to get a request/response from a Flink DataStream
> so I can 'call' into it from a REST service?   For example, something
> similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
> >>
> >> Thanks for any help!
> >>
> >> -Jason
> >>
>


Unsubscribe

2022-04-08 Thread Natalie Dunn



Re: DataStream request / response

2022-04-08 Thread Roman Khachatryan
Hi,

Besides the solution suggested by Austin, you might also want to look
at Stateful Functions [1]. They provide a more convenient programming
model for the use-case I think, while DataStream is a relatively
low-level API.

[1]
https://nightlies.apache.org/flink/flink-statefun-docs-stable/

Regards,
Roman

On Fri, Apr 8, 2022 at 6:56 PM Austin Cawley-Edwards
 wrote:
>
> Hi Jason,
>
> No, there is no HTTP source/ sink support that I know of for Flink. Would 
> running the Spring + Kafka solution in front of Flink work for you?
>
> On a higher level, what drew you to migrating the microservice to Flink?
>
> Best,
> Austin
>
> On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas  wrote:
>>
>> I'm taking an existing REST based microservice application and moving all of 
>> the logic into Flink DataStreams.
>>
>> Is there an easy way to get a request/response from a Flink DataStream so I 
>> can 'call' into it from a REST service?   For example, something similar to 
>> this Kafka streams example that uses Spring ReplyingKafkaTemplate - 
>> https://stackoverflow.com/a/58202587.
>>
>> Thanks for any help!
>>
>> -Jason
>>


Re: DataStream request / response

2022-04-08 Thread Austin Cawley-Edwards
Hi Jason,

No, there is no HTTP source/ sink support that I know of for Flink. Would
running the Spring + Kafka solution in front of Flink work for you?
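
Roughly what I had in mind (a sketch only; topic names, brokers and the business logic are placeholders, and Spring's ReplyingKafkaTemplate would still need its correlation headers copied from request to reply):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RequestReplyJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the REST layer writes requests to this topic...
        KafkaSource<String> requests = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("requests")
                .setGroupId("request-reply-job")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // ...and reads the correlated replies from this one
        KafkaSink<String> replies = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("replies")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        env.fromSource(requests, WatermarkStrategy.noWatermarks(), "requests")
                .map(request -> "processed: " + request) // business logic goes here
                .sinkTo(replies);

        env.execute("request-reply");
    }
}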

On a higher level, what drew you to migrating the microservice to Flink?

Best,
Austin

On Fri, Apr 8, 2022 at 12:35 PM Jason Thomas 
wrote:

> I'm taking an existing REST based microservice application and moving all
> of the logic into Flink DataStreams.
>
> Is there an easy way to get a request/response from a Flink DataStream
> so I can 'call' into it from a REST service?   For example, something
> similar to this Kafka streams example that uses Spring
> ReplyingKafkaTemplate - https://stackoverflow.com/a/58202587.
>
> Thanks for any help!
>
> -Jason
>
>


DataStream request / response

2022-04-08 Thread Jason Thomas
I'm taking an existing REST based microservice application and moving all
of the logic into Flink DataStreams.

Is there an easy way to get a request/response from a Flink DataStream so I
can 'call' into it from a REST service?   For example, something similar to
this Kafka streams example that uses Spring ReplyingKafkaTemplate -
https://stackoverflow.com/a/58202587.

Thanks for any help!

-Jason


RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Alexis Sarda-Espinosa
Hello,

I have a streaming job running on Flink 1.14.4 that uses managed state with 
RocksDB with incremental checkpoints as backend. I've been monitoring a dev 
environment that has been running for the last week and I noticed that state 
size and end-to-end duration have been increasing steadily. Currently, duration 
is 11 seconds and size is 917MB (as shown in the UI). The tasks with the 
largest state (614MB) come from keyed sliding windows. Some attributes of this 
job's setup:


  *   Windows are 11 minutes in size.
  *   Slide time is 1 minute.
  *   Throughput is approximately 20 events per minute.

I have 3 operators with these states:


  1.  Window state with ListState and no TTL.
  2.  Global window state with MapState> and a TTL of 1 hour 
(with cleanupInRocksdbCompactFilter(1000L)).
  3.  Global window state with ListState where the Pojo has an int and a 
long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) 
as well.

Both operators with global window state have logic to manually remove old state 
in addition to configured TTL. The other operator does override and call 
clear().
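
For completeness, the TTL setup looks roughly like this (a sketch; the descriptor name is a placeholder, and Pojo is the class with the int and the long mentioned above):

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

ListStateDescriptor<Pojo> descriptor = new ListStateDescriptor<>("globalListState", Pojo.class);

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))
        // let RocksDB's compaction filter drop expired entries, re-reading the current
        // timestamp after every 1000 processed state entries
        .cleanupInRocksdbCompactFilter(1000L)
        .build();

descriptor.enableTimeToLive(ttlConfig);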

I have now analyzed the checkpoint folder with the state processor API, and 
I'll note here that I see 50 folders named chk-*** even though I don't set 
state.checkpoints.num-retained and the default should be 1. I loaded the data 
from the folder with the highest chk number and I see that my operators have 
these amounts respectively:


  1.  10 entries
  2.  80 entries
  3.  200 entries

I got those numbers with something like this:

savepoint
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
.process(...)
.collect()
.parallelStream()
.reduce(0, Integer::sum);

Where my WindowReaderFunction classes just count the number of entries in each 
call to readWindow.

Those amounts cannot possibly account for 614MB, so what am I missing?

Regards,
Alexis.



Re: RocksDB state not cleaned up

2022-04-08 Thread Yun Tang
Hi Alexis,

RocksDB itself supports a manual compaction API [1], but current Flink does not 
support calling these APIs to perform periodic compactions.

If Flink supported such periodic compaction, from my understanding it would be 
somewhat like major compaction in HBase. I am not sure whether this is really 
useful for Flink, as it could push data to the last level, which increases the 
read amplification.
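
For illustration only, the manual compaction API looks like this when RocksDB is used directly (plain rocksdbjni usage, not something Flink currently wires up):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ManualCompaction {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-test")) {
            // full manual compaction over the whole key range, pushing data to the last level
            db.compactRange();
        }
    }
}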

[1] 
https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/RocksDB.html

Best
Yun Tang

From: Alexis Sarda-Espinosa 
Sent: Friday, April 8, 2022 18:54
To: tao xiao ; David Morávek 
Cc: Yun Tang ; user 
Subject: RE: RocksDB state not cleaned up


May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And 
if yes, if it helped with this case.



Regards,

Alexis.



From: tao xiao 
Sent: Samstag, 18. September 2021 05:01
To: David Morávek 
Cc: Yun Tang ; user 
Subject: Re: RocksDB state not cleaned up



Thanks for the feedback! However TTL already proves that the state cannot be 
cleaned up on time due to too many levels built up in RocksDB.



Hi @Yun Tang do you have any suggestions to tune 
RocksDB to accelerate the compaction progress?



On Fri, Sep 17, 2021 at 8:01 PM David Morávek <d...@apache.org> wrote:

Cleaning up with timers should solve this. Both approaches have some advantages 
and disadvantages though.



Timers:

- No "side effects".

- Can be set in event time. Deletes are regular tombstones that will get 
compacted later on.



TTL:

- Performance. This costs literally nothing compared to an extra state for 
timer + writing a tombstone marker.

- Has "side-effects", because it works in processing time. This is just 
something to keep in mind, e.g. when bootstrapping the state from historical data. 
(large event time / processing time skew)



With the 1.14 release, we've bumped the RocksDB version so it may be possible to 
use a "periodic compaction" [1], but nobody has tried that so far. In the 
meantime I think there is no real workaround because we don't expose a way to 
trigger manual compaction.



I'm off to vacation until 27th and I won't be responsive during that time. I'd 
like to pull Yun into the conversation as he's super familiar with the RocksDB 
state backend.



[1] 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction



Best,

D.



On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:

Hi David,



Confirmed with the RocksDB log that Stephan's observation is the root cause: 
compaction doesn't clean up the high-level SST files fast enough. Do you think 
manual cleanup by registering a timer is the way to go, or can any RocksDB 
parameter be tuned to mitigate this issue?



On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:

Hi David,



If I read Stephan's comment correctly, TTL doesn't work well for cases where we 
have too many levels, like fast-growing state, as compaction doesn't clean up 
high-level SST files in time. Is this correct? If yes, should we register a 
timer with the TTL time and manually clean up the state (state.clear()) when the 
timer fires?



I will turn on RocksDB logging as well as compaction logging [1] to verify this



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction





On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:

Hi Tao,



my intuition is that the compaction of SST files is not triggering. By default, 
it's only triggered by the size ratios of different levels [1] and the TTL 
mechanism has no effect on it.



Some reasoning from Stephan:



It's very likely to have large files in higher levels that haven't been 
compacted in a long time and thus just stay around.



This might be especially possible if you insert a lot in the beginning (build 
up many levels) and then have a moderate rate of modifications, so the changes 
and expiration keep happening purely in the merges / compactions of the first 
levels. Then the later levels may stay unchanged for quite some time.



You should be able to see compaction details by setting RocksDB logging to INFO 
[2]. Can you please check these and validate whether this really is the case?



[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction

[2] 
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting



Best,

D.



On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:

Hi team



We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL 
update type is OnCreateAndWrite. We set the value state when the value state 
doesn't exist and we never update it again after the state is not empty. The 
key of the value state is timestamp. My understanding of such TTL settings is 
that the size of 

Reply: Re:Re: yarn api submission error

2022-04-08 Thread Geng Biao
Hi 周涛,
Mang's suggestion is a good one; wrapping and assembling the submission parameters is a fairly common implementation. If you have no special requirements, I recommend considering his suggestion; it also tends to make later Flink version upgrades easier.
If, for some reason, you want to keep going with your current approach: your code is already quite close to the code in YARNApplicationITCase. Note, however, that the ITCase code runs locally and sets up the HADOOP- and Flink-related environment variables through methods like YARNApplicationITCase#startYARNWithConfig.
In a real job, on the server side, the command that YARN eventually uses in the AM to run the Flink job looks roughly like this:
/bin/bash -c /usr/lib/jvm/java-1.8.0/bin/java -Xmx1073741824 -Xms1073741824 
-XX:MaxMetaspaceSize=268435456 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint -D 
jobmanager.memory.off-heap.size=134217728b …
I think you can compare against startYARNWithConfig to check whether some environment variable is not set up correctly, which would make the classpath and other environment variables in the AM wrong and in turn cause the class not to be found.

From: 周涛 <06160...@163.com>
Date: Friday, April 8, 2022, 8:57 PM
To: Mang Zhang 
Cc: user-zh@flink.apache.org 
Subject: Re:Re: yarn api submission error
Many thanks for Mang Zhang's reply.
The full content of the AM log is below; the key message is still that the YarnApplicationClusterEntryPoint class cannot be found:

Log Type: jobmanager.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 107

Error: Could not find or load main class 
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint


Log Type: jobmanager.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 70

Setting up env variables
Setting up job resources
Launching container


In my code I have already registered all the lib jars Flink depends on with YARN; the jars were uploaded to HDFS in advance, and while debugging I can see the jar information being picked up, including flink-dist.jar.
After submitting to YARN, the error is thrown when the JM starts. I still have not found the cause.




Also, which tools provided by Flink do you mean? What I am developing is mainly my own management platform for submitting and managing jobs.














On 2022-04-08 17:57:48, "Mang Zhang"  wrote:




This exception looks like the error happened while starting the AM after the job was submitted to the YARN cluster; you can check the AM log on the cluster,
for example the one the log points to: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
Also, judging from your code, you are submitting the job via the YARN API, so you have to register all the jars the job depends on with YARN yourself,
for example the jars under flink's lib directory, using YarnLocalResourceDescriptor 
to register the dependencies, etc. The process can be fairly involved, and you need to understand YARN's job deployment mechanism first.


I would still recommend submitting jobs directly with the commands and tools Flink provides; the engine has already wrapped up all of these steps.







Best regards







--

Best regards,
Mang Zhang





On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>
>
>
>
>
>
>hi,
>While testing submission of Flink jobs via the Java API I ran into some problems and would like to ask for advice:
>Flink version 1.14.4
>Hadoop version: 3.0.0-cdh6.2.1
>Application mode; submitting with the CLI runs normally, submitting via the API fails
>YARN log of the failed submission:
>   LogType:jobmanager.err
>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>   LogLength:107
>   LogContents:
>   Error: Could not find or load main class 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>   End of LogType:jobmanager.err
>The code is below:
>
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
>//local Flink configuration directory, used to load the Flink configuration
>String configurationDirectory = 
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>//directory holding the Flink cluster jars
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>//user jar
>String userJarPath = 
>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = 
> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>//create the YarnClient
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// set up logging; without this no logs are visible
>YarnClusterInformationRetriever clusterInformationRetriever = 
>YarnClientYarnClusterInformationRetriever
>.create(yarnClient);
>
>//load the Flink configuration
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>configurationDirectory);
>
>
> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
> true);
>
>flinkConfiguration.set(PipelineOptions.JARS, 
> Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>YarnConfigOptions.PROVIDED_LIB_DIRS,
>Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(
>YarnConfigOptions.FLINK_DIST_JAR,
>flinkDistJar);
>
>// set application mode
>flinkConfiguration.set(
>DeploymentOptions.TARGET,
>YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>"flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
> configurationDirectory);
>
>// set the user jar arguments and main class
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>String[]{}, "com.zt.FlinkTest1");
>
>
>final int jobManagerMemoryMB =
>
> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>flinkConfiguration, 
> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>.getTotalProcessMemorySize()
>  

Re:Re:Re: yarn api submission error

2022-04-08 Thread Mang Zhang



There is not much information to go on here; my first impression is that the YARN submission-related information is not set correctly. If you are interested, have a look at https://github.com/hortonworks/simple-yarn-app
to get a clear understanding of how a YARN application works.


Coming back to your underlying goal: you want to build a job management platform. A simple, standard approach is to assemble the parameters yourself and then submit the job by invoking the relevant $FLINK_HOME/bin/flink run
commands, as sketched below.
Your current approach is drifting a bit off track, although that may be because your scenario has other requirements I am not aware of.
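
For example, something along these lines, reusing the paths from your mail (illustrative only):

$FLINK_HOME/bin/flink run-application -t yarn-application \
  -Dyarn.provided.lib.dirs="hdfs://nameservice1/flink/jar/libs/lib/" \
  hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar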


As for a scheduling platform, Apache DolphinScheduler is also a good choice; it is an excellent open-source project from China with complete functionality. You can also refer to
https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/flink-call.html




I hope this helps.




--

Best regards,
Mang Zhang





On 2022-04-08 20:56:33, "周涛" <06160...@163.com> wrote:
>Many thanks for Mang Zhang's reply.
>The full content of the AM log is below; the key message is still that the YarnApplicationClusterEntryPoint class cannot be found:
>
>Log Type: jobmanager.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 107
>
>Error: Could not find or load main class 
>org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>
>
>Log Type: jobmanager.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.err
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 0
>
>Log Type: prelaunch.out
>
>Log Upload Time: Fri Apr 08 09:24:01 +0800 2022
>
>Log Length: 70
>
>Setting up env variables
>Setting up job resources
>Launching container
>
>
>In my code I have already registered all the lib jars Flink depends on with YARN; the jars were uploaded to HDFS in advance, and while debugging I can see the jar information being picked up, including flink-dist.jar.
>After submitting to YARN, the error is thrown when the JM starts. I still have not found the cause.
>
>
>
>
>Also, which tools provided by Flink do you mean? What I am developing is mainly my own management platform for submitting and managing jobs.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>On 2022-04-08 17:57:48, "Mang Zhang"  wrote:
>
>
>
>
>This exception looks like the error happened while starting the AM after the job was submitted to the YARN cluster; you can check the AM log on the cluster,
>for example the one the log points to: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
>Also, judging from your code, you are submitting the job via the YARN API, so you have to register all the jars the job depends on with YARN yourself,
>for example the jars under flink's lib directory, using YarnLocalResourceDescriptor 
>to register the dependencies, etc. The process can be fairly involved, and you need to understand YARN's job deployment mechanism first.
>
>
>I would still recommend submitting jobs directly with the commands and tools Flink provides; the engine has already wrapped up all of these steps.
>
>
>
>
>
>
>
>Best regards
>
>
>
>
>
>
>
>--
>
>Best regards,
>Mang Zhang
>
>
>
>
>
>On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>>
>>
>>
>>
>>
>>
>>hi,
>>While testing submission of Flink jobs via the Java API I ran into some problems and would like to ask for advice:
>>Flink version 1.14.4
>>Hadoop version: 3.0.0-cdh6.2.1
>>Application mode; submitting with the CLI runs normally, submitting via the API fails
>>YARN log of the failed submission:
>>   LogType:jobmanager.err
>>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>>   LogLength:107
>>   LogContents:
>>   Error: Could not find or load main class 
>> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>>   End of LogType:jobmanager.err
>>The code is below:
>>
>>
>> System.setProperty("HADOOP_USER_NAME", "hdfs");
>>//flink的本地配置目录,为了得到flink的配置
>>String configurationDirectory = 
>>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>>
>>//存放flink集群相关的jar包目录
>>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>>//用户jar
>>String userJarPath = 
>>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>>String flinkDistJar = 
>> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>>
>>YarnClientService yarnClientService = new YarnClientService();
>>//yarnclient创建
>>YarnClient yarnClient = yarnClientService.getYarnClient();
>>yarnClient.start();
>>
>>// 设置日志的,没有的话看不到日志
>>YarnClusterInformationRetriever clusterInformationRetriever = 
>>YarnClientYarnClusterInformationRetriever
>>.create(yarnClient);
>>
>>//获取flink的配置
>>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>>configurationDirectory);
>>
>>
>> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>>
>>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
>> true);
>>
>>flinkConfiguration.set(PipelineOptions.JARS, 
>> Collections.singletonList(userJarPath));
>>
>>Path remoteLib = new Path(flinkLibs);
>>flinkConfiguration.set(
>>YarnConfigOptions.PROVIDED_LIB_DIRS,
>>Collections.singletonList(remoteLib.toString()));
>>
>>flinkConfiguration.set(
>>YarnConfigOptions.FLINK_DIST_JAR,
>>flinkDistJar);
>>
>>// 设置为application模式
>>flinkConfiguration.set(
>>DeploymentOptions.TARGET,
>>YarnDeploymentTarget.APPLICATION.getName());
>>
>>// yarn application name
>>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>>"flink-application");
>>
>>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
>> configurationDirectory);
>>
>>// 设置用户jar的参数和主类
>>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>>String[]{}, "com.zt.FlinkTest1");
>>
>>
>>final int jobManagerMemoryMB =
>>
>> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>>flinkConfiguration, 
>> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>>.getTotalProcessMemorySize()
>>.getMebiBytes();
>>final 

Unsubscribe

2022-04-08 Thread swessia



Re:Re: yarn api submission error

2022-04-08 Thread 周涛
Many thanks for Mang Zhang's reply.
The full content of the AM log is below; the key message is still that the YarnApplicationClusterEntryPoint class cannot be found:

Log Type: jobmanager.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 107

Error: Could not find or load main class 
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint


Log Type: jobmanager.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.err

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 0

Log Type: prelaunch.out

Log Upload Time: Fri Apr 08 09:24:01 +0800 2022

Log Length: 70

Setting up env variables
Setting up job resources
Launching container


In my code I have already registered all the lib jars Flink depends on with YARN; the jars were uploaded to HDFS in advance, and while debugging I can see the jar information being picked up, including flink-dist.jar.
After submitting to YARN, the error is thrown when the JM starts. I still have not found the cause.




Also, which tools provided by Flink do you mean? What I am developing is mainly my own management platform for submitting and managing jobs.














On 2022-04-08 17:57:48, "Mang Zhang"  wrote:




This exception looks like the error happened while starting the AM after the job was submitted to the YARN cluster; you can check the AM log on the cluster,
for example the one the log points to: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
Also, judging from your code, you are submitting the job via the YARN API, so you have to register all the jars the job depends on with YARN yourself,
for example the jars under flink's lib directory, using YarnLocalResourceDescriptor 
to register the dependencies, etc. The process can be fairly involved, and you need to understand YARN's job deployment mechanism first.


I would still recommend submitting jobs directly with the commands and tools Flink provides; the engine has already wrapped up all of these steps.







Best regards







--

Best regards,
Mang Zhang





On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>
>
>
>
>
>
>hi,
>While testing submission of Flink jobs via the Java API I ran into some problems and would like to ask for advice:
>Flink version 1.14.4
>Hadoop version: 3.0.0-cdh6.2.1
>Application mode; submitting with the CLI runs normally, submitting via the API fails
>YARN log of the failed submission:
>   LogType:jobmanager.err
>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>   LogLength:107
>   LogContents:
>   Error: Could not find or load main class 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>   End of LogType:jobmanager.err
>The code is below:
>
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
>//flink的本地配置目录,为了得到flink的配置
>String configurationDirectory = 
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>//存放flink集群相关的jar包目录
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>//用户jar
>String userJarPath = 
>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = 
> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>//yarnclient创建
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// 设置日志的,没有的话看不到日志
>YarnClusterInformationRetriever clusterInformationRetriever = 
>YarnClientYarnClusterInformationRetriever
>.create(yarnClient);
>
>//获取flink的配置
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>configurationDirectory);
>
>
> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
> true);
>
>flinkConfiguration.set(PipelineOptions.JARS, 
> Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>YarnConfigOptions.PROVIDED_LIB_DIRS,
>Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(
>YarnConfigOptions.FLINK_DIST_JAR,
>flinkDistJar);
>
>// 设置为application模式
>flinkConfiguration.set(
>DeploymentOptions.TARGET,
>YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>"flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
> configurationDirectory);
>
>// 设置用户jar的参数和主类
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>String[]{}, "com.zt.FlinkTest1");
>
>
>final int jobManagerMemoryMB =
>
> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>flinkConfiguration, 
> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>.getTotalProcessMemorySize()
>.getMebiBytes();
>final int taskManagerMemoryMB =
>TaskExecutorProcessUtils.processSpecFromConfig(
>TaskExecutorProcessUtils
>
> .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>flinkConfiguration,
>
> TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>.getTotalProcessMemorySize()
>.getMebiBytes();
>ClusterSpecification clusterSpecification = new 
> ClusterSpecification.ClusterSpecificationBuilder()
>.setMasterMemoryMB(jobManagerMemoryMB)
>

Re: Error during shutdown of StandaloneApplicationClusterEntryPoint via JVM shutdown hook

2022-04-08 Thread Roman Khachatryan
Hello,

Unfortunately, it's difficult to name the exact reason why the timeout
happens because there's no message logged.
I've opened a ticket to improve the logging [1].
There, I also listed some code paths that might lead to this situation.

From the described scenario, I'd suppose that it's
CompletedOperationCache.closeAsync()  that times out. It can be
verified or maybe mitigated by changing rest.async.store-duration  [2]
(the default is 5 minutes).
Could you check that?

[1]
https://issues.apache.org/jira/browse/FLINK-27144
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rest-async-store-duration

Regards,
Roman

On Wed, Apr 6, 2022 at 5:21 PM Alexey Trenikhun  wrote:
>
> Hello,
>
> We are using Flink 1.13.6, Application Mode, k8s HA. To upgrade job, we use 
> POST, 
> url=http://gsp-jm:8081/jobs//savepoints, then 
> we wait for up to 5 minutes for completion, periodically polling status (GET, 
> url=http://gsp-jm:8081/jobs//savepoints/{triggerId}).
>  If savepoint is not complete in 5 minutes, we cancel the job (PATCH, 
> url=http://gsp-jm:8081/jobs/000). Usually it 
> works well, job stopped one way or another and we proceed with upgrade, but 
> currently JM exits with code -2, and as result k8s restarts pod. We tried 
> multiple times, but every time getting -2. JM log is below (newest messages 
> on top):
>
> 2022-04-06T14:21:17.465Z Error during shutdown of 
> StandaloneApplicationClusterEntryPoint via JVM shutdown hook.
> java.util.concurrent.CompletionException: 
> java.util.concurrent.TimeoutException
>  at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661)
>  at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>  at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.closeAsync(ClusterEntrypoint.java:379)
>  at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$new$0(ClusterEntrypoint.java:168)
>  at 
> org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39)
>  at java.lang.Thread.run(Thread.java:750) Caused by: 
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 common frames omitted
> 2022-04-06T14:21:17.464Z Terminating cluster entrypoint process 
> StandaloneApplicationClusterEntryPoint with exit code 2.
> java.util.concurrent.TimeoutException: null
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
>  at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:750)
> 2022-04-06T14:21:17.463Z Stopped Akka RPC service.
> 2022-04-06T14:21:17.46Z Stopped Akka RPC service.
> 2022-04-06T14:21:17.373Z Remoting shut down.
> 2022-04-06T14:21:17.373Z Remoting shut down.
> 2022-04-06T14:21:17.297Z Remote daemon shut down; proceeding with flushing 
> remote transports.
> 2022-04-06T14:21:17.297Z Remote daemon shut down; proceeding with flushing 
> remote transports.
> 2022-04-06T14:21:17.296Z Shutting down remote daemon.
> 2022-04-06T14:21:17.296Z Shutting down remote 

RE: RocksDB state not cleaned up

2022-04-08 Thread Alexis Sarda-Espinosa
May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And 
if yes, if it helped with this case.
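
In case it helps the discussion, I imagine the wiring would look something like this (an untested sketch; I'm assuming the bundled RocksDB version exposes setPeriodicCompactionSeconds on the column family options):

import java.util.Collection;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // ask RocksDB to pick up files for compaction at least once per day
        return currentOptions.setPeriodicCompactionSeconds(24 * 60 * 60);
    }
}

// usage (sketch):
// EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// backend.setRocksDBOptions(new PeriodicCompactionOptionsFactory());
// env.setStateBackend(backend);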

Regards,
Alexis.

From: tao xiao 
Sent: Samstag, 18. September 2021 05:01
To: David Morávek 
Cc: Yun Tang ; user 
Subject: Re: RocksDB state not cleaned up

Thanks for the feedback! However TTL already proves that the state cannot be 
cleaned up on time due to too many levels built up in RocksDB.

Hi @Yun Tang do you have any suggestions to tune 
RocksDB to accelerate the compaction progress?

On Fri, Sep 17, 2021 at 8:01 PM David Morávek <d...@apache.org> wrote:
Cleaning up with timers should solve this. Both approaches have some advantages 
and disadvantages though.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get 
compacted later on.

TTL:
- Performance. This costs literally nothing compared to an extra state for 
timer + writing a tombstone marker.
- Has "side-effects", because it works in processing time. This is just 
something to keep in mind, e.g. when bootstrapping the state from historical data. 
(large event time / processing time skew)

With the 1.14 release, we've bumped the RocksDB version so it may be possible to 
use a "periodic compaction" [1], but nobody has tried that so far. In the 
meantime I think there is no real workaround because we don't expose a way to 
trigger manual compaction.

I'm off to vacation until 27th and I won't be responsive during that time. I'd 
like to pull Yun into the conversation as he's super familiar with the RocksDB 
state backend.

[1] 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:
Hi David,

Confirmed with the RocksDB log that Stephan's observation is the root cause: 
compaction doesn't clean up the high-level SST files fast enough. Do you think 
manual cleanup by registering a timer is the way to go, or can any RocksDB 
parameter be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:
Hi David,

If I read Stephan's comment correctly, TTL doesn't work well for cases where we 
have too many levels, like fast-growing state, as compaction doesn't clean up 
high-level SST files in time. Is this correct? If yes, should we register a 
timer with the TTL time and manually clean up the state (state.clear()) when the 
timer fires?
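
Something like the following is what I have in mind (sketch only; the types and the 1-day interval just mirror our TTL setting):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerCleanupFunction extends KeyedProcessFunction<Long, Long, Long> {

    private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<Long> value;

    @Override
    public void open(Configuration parameters) {
        value = getRuntimeContext().getState(new ValueStateDescriptor<>("value", Long.class));
    }

    @Override
    public void processElement(Long in, Context ctx, Collector<Long> out) throws Exception {
        if (value.value() == null) {
            value.update(in);
            // schedule explicit cleanup instead of relying only on TTL during compaction
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + ONE_DAY_MS);
        }
        out.collect(in);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        // writes a tombstone that later compactions will remove
        value.clear();
    }
}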

I will turn on RocksDB logging as well as compaction logging [1] to verify this

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction


On Tue, Sep 14, 2021 at 5:38 PM David Morávek 
mailto:d...@apache.org>> wrote:
Hi Tao,

my intuition is that the compaction of SST files is not being triggered. By default, 
compaction is only triggered by the size ratios of the different levels [1], and the TTL 
mechanism has no effect on it.

Some reasoning from Stephan:

It's very likely that there are large files in the higher levels that haven't been 
compacted in a long time and thus just stay around.

This might be especially possible if you insert a lot in the beginning (build 
up many levels) and then have a moderate rate of modifications, so the changes 
and expiration keep happening purely in the merges / compactions of the first 
levels. Then the later levels may stay unchanged for quite some time.

You should be able to see compaction details by setting RocksDB logging to INFO 
[2]. Can you please check these and validate whether this really is the case?

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
[2] 
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
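
A rough, untested sketch of one way to turn RocksDB's LOG output back on through a custom
options factory (the log directory is a placeholder, and option names can vary across
Flink/RocksDB versions); register it on the RocksDB state backend via setRocksDBOptions:

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

import java.util.Collection;

public class VerboseRocksDBOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                // write the LOG file to a stable directory instead of the transient working dir
                .setDbLogDir("/tmp/flink-rocksdb-logs");
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}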

Best,
D.

On Mon, Sep 13, 2021 at 3:18 PM tao xiao 
mailto:xiaotao...@gmail.com>> wrote:
Hi team

We have a job that uses value state with RocksDB and a TTL of 1 day. The TTL 
update type is OnCreateAndWrite. We set the value state only when it doesn't exist yet 
and never update it again once it is non-empty. The key of the value state is a 
timestamp. My understanding of such TTL settings is that the total size of the SST files 
should stay flat after 1 day (disregarding the impact of space amplification), since the 
daily data volume is more or less the same. However, the RocksDB native metrics show 
that the SST files have kept growing since I started the job. I checked the SST files in 
local storage and I can see SST files that are 1 month old (from when I started the job). 
What could be the reason the SST files are not cleaned up?

The Flink version is 1.12.1
State backend is RocksDB with incremental checkpoint
All default configuration for RocksDB
Per job mode in Yarn and checkpoint to S3


Here is the code to set value state

public void open(Configuration parameters) {
StateTtlConfig ttlConfigClick = StateTtlConfig
.newBuilder(Time.days(1))
   

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Martijn Visser
Hi Jin,

Flink is an open source project, so the community works on a best-effort basis.
There's no guaranteed/quick support available. There are companies that
provide commercial support if needed.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 8 Apr 2022 at 12:13, Jin Yi  wrote:

> confirmed that moving back to FlinkKafkaConsumer fixes things.
>
> is there some notification channel/medium that highlights critical
> bugs/issues on the intended features like this pretty readily?
>
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi  wrote:
>
>> based on symptoms/observations on the first operator (LogRequestFilter)
>> watermark and event timestamps, it does seem like it's the bug.  things
>> track fine (timestamp > watermark) for the first batch of events, then the
>> event timestamps go back into the past and are "late".
>>
>> looks like the 1.14 backport just got in 11 days ago (
>> https://github.com/apache/flink/pull/19128).  is there a way to easily
>> test this fix locally?  based on the threads, should i just move back to
>> FlinkKafkaConsumer until 1.14.5?
>>
>> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:
>>
>>> Hi Jin,
>>>
>>> If you are using new FLIP-27 sources like KafkaSource, per-partition
>>> watermark (or per-split watermark) is a default feature integrated in
>>> SourceOperator. You might hit the bug described in FLINK-26018 [1], which
>>> happens during the first fetch of the source that records in the first
>>> split pushes the watermark far away, then records from other splits will be
>>> treated as late events.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-26018
>>>
>>> Best regards,
>>>
>>> Qingsheng
>>>
>>>
>>> > On Apr 8, 2022, at 15:54, Jin Yi  wrote:
>>> >
>>> > how should the code look like to verify we're using per-partition
>>> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
>>> 1.14.4?
>>> >
>>> > we currently have it looking like:
>>> >
>>> > streamExecutionEnvironment.fromSource(
>>> >KafkaSource.builder().build(),
>>> >watermarkStrategy,
>>> >"whatever",
>>> >typeInfo);
>>> >
>>> > when running this job with the streamExecutionEnviornment parallelism
>>> set to 1, and the kafka source having 30 partitions, i'm seeing weird
>>> behaviors where the first operator after this source consumes events out of
>>> order (and therefore, violates watermarks).  the operator simply checks to
>>> see what "type" of event something is and uses side outputs to output the
>>> type-specific messages.  here's a snippet of the event timestamp going back
>>> before the current watermark (first instance of going backwards in time in
>>> bold):
>>> >
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,315 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,316 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
>>> > 2022-04-08 05:47:06,317 WARN
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>>> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
>>> >
>>> >
>>> >
>>> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill 
>>> wrote:
>>> > I dove deeper.  I wasn't actually using per-partition watermarks.
>>> Thank you for the help!
>>> >
>>> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill 
>>> wrote:
>>> > Thanks, Thias and Dongwon.
>>> >

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Jin Yi
confirmed that moving back to FlinkKafkaConsumer fixes things.

is there some notification channel/medium that highlights critical
bugs/issues on the intended features like this pretty readily?
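
For anyone else hitting this, a rough sketch of the fallback is below (topic, group,
bound, and extractEventTime(...) are placeholders; the point is that the deprecated
FlinkKafkaConsumer applies the watermark strategy per Kafka partition inside the source):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // extractEventTime(...) is a placeholder for the job's own timestamp logic
                .withTimestampAssigner((record, ts) -> extractEventTime(record)));

DataStream<String> stream = streamExecutionEnvironment.addSource(consumer);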

On Fri, Apr 8, 2022 at 2:18 AM Jin Yi  wrote:

> based on symptoms/observations on the first operator (LogRequestFilter)
> watermark and event timestamps, it does seem like it's the bug.  things
> track fine (timestamp > watermark) for the first batch of events, then the
> event timestamps go back into the past and are "late".
>
> looks like the 1.14 backport just got in 11 days ago (
> https://github.com/apache/flink/pull/19128).  is there a way to easily
> test this fix locally?  based on the threads, should i just move back to
> FlinkKafkaConsumer until 1.14.5?
>
> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:
>
>> Hi Jin,
>>
>> If you are using new FLIP-27 sources like KafkaSource, per-partition
>> watermark (or per-split watermark) is a default feature integrated in
>> SourceOperator. You might hit the bug described in FLINK-26018 [1], which
>> happens during the first fetch of the source that records in the first
>> split pushes the watermark far away, then records from other splits will be
>> treated as late events.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-26018
>>
>> Best regards,
>>
>> Qingsheng
>>
>>
>> > On Apr 8, 2022, at 15:54, Jin Yi  wrote:
>> >
>> > how should the code look like to verify we're using per-partition
>> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
>> 1.14.4?
>> >
>> > we currently have it looking like:
>> >
>> > streamExecutionEnvironment.fromSource(
>> >KafkaSource.builder().build(),
>> >watermarkStrategy,
>> >"whatever",
>> >typeInfo);
>> >
>> > when running this job with the streamExecutionEnviornment parallelism
>> set to 1, and the kafka source having 30 partitions, i'm seeing weird
>> behaviors where the first operator after this source consumes events out of
>> order (and therefore, violates watermarks).  the operator simply checks to
>> see what "type" of event something is and uses side outputs to output the
>> type-specific messages.  here's a snippet of the event timestamp going back
>> before the current watermark (first instance of going backwards in time in
>> bold):
>> >
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>> > 2022-04-08 05:47:06,315 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
>> > 2022-04-08 05:47:06,316 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
>> > 2022-04-08 05:47:06,317 WARN
>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
>> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
>> >
>> >
>> >
>> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill 
>> wrote:
>> > I dove deeper.  I wasn't actually using per-partition watermarks.
>> Thank you for the help!
>> >
>> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill 
>> wrote:
>> > Thanks, Thias and Dongwon.
>> >
>> > I'll keep debugging this with the idle watermark turned off.
>> >
>> > Next TODOs:
>> > - Verify that we’re using per-partition watermarks.  Our code matches
>> the example but maybe something is disabling it.
>> > - Enable logging of partition-consumer assignment, to see if that is
>> the cause of the problem.
>> > - Look at adding flags to set the source parallelism to see if that
>> fixes the issue.
>> >
>> > Yes, I've seen Flink talks 

RE: Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Alexis Sarda-Espinosa
Hi Roman,

Thanks for the quick response. It wasn't that, but your comment about erasure 
made me realize I should have debugged the code and looked at the types. 
Apparently setting TTL changes the serializer, so I also had to enable TTL on the 
descriptor in the WindowReaderFunction.
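
In case it helps someone else, the shape of the fix is roughly the following (the
List<String> value type and the 1-hour TTL are placeholders; the important part is that
the TTL settings on the reader's descriptor must match the ones used by the streaming job):

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))
        .cleanupInRocksdbCompactFilter(1000L)
        .build();

MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
        "descriptorName",
        TypeInformation.of(Long.class),
        TypeInformation.of(new TypeHint<List<String>>() {}));
// without this, the reader's serializer does not match the TTL-wrapped serializer in the savepoint
msd.enableTimeToLive(ttlConfig);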

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Freitag, 8. April 2022 11:48
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint

Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink consider 
the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers 
(constructed manually)?

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa 
 wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses 
> MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor> msd = new MapStateDescriptor<>(
>
> "descriptorName",
>
> TypeInformation.of(Long.class),
>
> TypeInformation.of(new TypeHint>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with 
> RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this 
> in readWindow:
>
>
>
> MapState> mapState = 
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
> configure the reader function like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(
>
> "my-uid",
>
> new StateReaderFunction(),
>
> Types.STRING,
>
> TypeInformation.of(MyPojo.class),
>
> Types.INT
>
> )
>
> .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not 
> be incompatible with the old state serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: Error when submitting a job via the YARN API

2022-04-08 Thread Mang Zhang



This exception looks like it occurs when the job is submitted to the YARN cluster, at the point where the ApplicationMaster is started; you can check the AM log on the cluster,
for example at the URL shown in the log: http://bigdata-beta2:8088/cluster/app/application_1647875542261_0079
Also, judging from your code, you are submitting the job through YARN's API directly, so you have to register all of the job's dependency jars with YARN yourself,
for example the jars under flink/lib, registering the dependencies with YarnLocalResourceDescriptor 
and so on. The process can be fairly involved and requires understanding YARN's deployment mechanism first.


My suggestion is to submit jobs with the commands and tooling that Flink already provides; the engine has all of these steps wrapped up for you.










--

Best regards,
Mang Zhang





On 2022-04-08 09:53:45, "周涛" <06160...@163.com> wrote:
>
>
>
>
>
>
>hi,
>While testing Flink job submission via the Java API I ran into some problems and would like to ask for advice:
>Flink version: 1.14.4
>Hadoop version: 3.0.0-cdh6.2.1
>Application mode; submitting via the CLI runs normally, submitting via the API fails.
>YARN log of the failed submission:
>   LogType:jobmanager.err
>   LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
>   LogLength:107
>   LogContents:
>   Error: Could not find or load main class 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
>   End of LogType:jobmanager.err
>The code is below:
>
>
> System.setProperty("HADOOP_USER_NAME", "hdfs");
>// Local Flink configuration directory, used to load the Flink configuration
>String configurationDirectory = 
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
>
>// Directory holding the jars required by the Flink cluster
>String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
>// User jar
>String userJarPath = 
>"hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
>String flinkDistJar = 
> "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
>
>YarnClientService yarnClientService = new YarnClientService();
>// Create the YarnClient
>YarnClient yarnClient = yarnClientService.getYarnClient();
>yarnClient.start();
>
>// Configure logging; without this no logs are visible
>YarnClusterInformationRetriever clusterInformationRetriever = 
>YarnClientYarnClusterInformationRetriever
>.create(yarnClient);
>
>// Load the Flink configuration
>Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
>configurationDirectory);
>
>
> flinkConfiguration.setString(ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
>"E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
>
>flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
> true);
>
>flinkConfiguration.set(PipelineOptions.JARS, 
> Collections.singletonList(userJarPath));
>
>Path remoteLib = new Path(flinkLibs);
>flinkConfiguration.set(
>YarnConfigOptions.PROVIDED_LIB_DIRS,
>Collections.singletonList(remoteLib.toString()));
>
>flinkConfiguration.set(
>YarnConfigOptions.FLINK_DIST_JAR,
>flinkDistJar);
>
>// Use application deployment mode
>flinkConfiguration.set(
>DeploymentOptions.TARGET,
>YarnDeploymentTarget.APPLICATION.getName());
>
>// yarn application name
>flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, 
>"flink-application");
>
>YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
> configurationDirectory);
>
>// Set the user jar's arguments and main class
>ApplicationConfiguration appConfig = new ApplicationConfiguration(new 
>String[]{}, "com.zt.FlinkTest1");
>
>
>final int jobManagerMemoryMB =
>
> JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
>flinkConfiguration, 
> JobManagerOptions.TOTAL_PROCESS_MEMORY)
>.getTotalProcessMemorySize()
>.getMebiBytes();
>final int taskManagerMemoryMB =
>TaskExecutorProcessUtils.processSpecFromConfig(
>TaskExecutorProcessUtils
>
> .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
>flinkConfiguration,
>
> TaskManagerOptions.TOTAL_PROCESS_MEMORY))
>.getTotalProcessMemorySize()
>.getMebiBytes();
>ClusterSpecification clusterSpecification = new 
> ClusterSpecification.ClusterSpecificationBuilder()
>.setMasterMemoryMB(jobManagerMemoryMB)
>.setTaskManagerMemoryMB(taskManagerMemoryMB)
>
> .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
>.createClusterSpecification();
>YarnClusterDescriptor yarnClusterDescriptor = new 
> YarnClusterDescriptor(
>flinkConfiguration,
>(YarnConfiguration) yarnClient.getConfig(),
>yarnClient,
>clusterInformationRetriever,
>true);
>
>try {
>ClusterClientProvider clusterClientProvider = 
> yarnClusterDescriptor.deployApplicationCluster(
>clusterSpecification,
>appConfig);
>
>ClusterClient clusterClient = 
> clusterClientProvider.getClusterClient();
>
>ApplicationId applicationId = clusterClient.getClusterId();
>String webInterfaceURL = 

Re: Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Roman Khachatryan
Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink
consider the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing
serializers (constructed manually)?
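
Something along these lines, for example (the serializers live in
org.apache.flink.api.common.typeutils.base; the List<String> value type is only for
illustration, use the element serializer that matches the real value type):

MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
        "descriptorName",
        LongSerializer.INSTANCE,
        new ListSerializer<>(StringSerializer.INSTANCE));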

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa
 wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses 
> MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor> msd = new MapStateDescriptor<>(
>
> "descriptorName",
>
> TypeInformation.of(Long.class),
>
> TypeInformation.of(new TypeHint>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with 
> RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this 
> in readWindow:
>
>
>
> MapState> mapState = 
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
> configure the reader function like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(
>
> "my-uid",
>
> new StateReaderFunction(),
>
> Types.STRING,
>
> TypeInformation.of(MyPojo.class),
>
> Types.INT
>
> )
>
> .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not 
> be incompatible with the old state serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>


Re: The JM blob server in a Flink SQL job keeps reporting java.io.IOException: Unknown operation 71 in the early morning hours

2022-04-08 Thread Mang Zhang



The image didn't come through. If you want to share a picture, upload it to a shared image host and paste the link into the mail,
or paste the exception text directly into the body of the mail.










--

Best regards,
Mang Zhang




On 2022-04-07 16:25:12, "su wenwen"  wrote:

Hi all, I'd like to ask whether anyone has run into this problem on Flink 1.12:
a Flink SQL job running in production keeps reporting the following error in the early morning hours:




My understanding is that the blob server transfers the binary jars from HDFS to the local working directory. I haven't found problems in any other part of the pipeline, and the job's data is not affected.

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Jin Yi
based on symptoms/observations on the first operator (LogRequestFilter)
watermark and event timestamps, it does seem like it's the bug.  things
track fine (timestamp > watermark) for the first batch of events, then the
event timestamps go back into the past and are "late".

looks like the 1.14 backport just got in 11 days ago (
https://github.com/apache/flink/pull/19128).  is there a way to easily test
this fix locally?  based on the threads, should i just move back to
FlinkKafkaConsumer until 1.14.5?

On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:

> Hi Jin,
>
> If you are using new FLIP-27 sources like KafkaSource, per-partition
> watermark (or per-split watermark) is a default feature integrated in
> SourceOperator. You might hit the bug described in FLINK-26018 [1], which
> happens during the first fetch of the source that records in the first
> split pushes the watermark far away, then records from other splits will be
> treated as late events.
>
> [1] https://issues.apache.org/jira/browse/FLINK-26018
>
> Best regards,
>
> Qingsheng
>
>
> > On Apr 8, 2022, at 15:54, Jin Yi  wrote:
> >
> > how should the code look like to verify we're using per-partition
> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
> 1.14.4?
> >
> > we currently have it looking like:
> >
> > streamExecutionEnvironment.fromSource(
> >KafkaSource.builder().build(),
> >watermarkStrategy,
> >"whatever",
> >typeInfo);
> >
> > when running this job with the streamExecutionEnviornment parallelism
> set to 1, and the kafka source having 30 partitions, i'm seeing weird
> behaviors where the first operator after this source consumes events out of
> order (and therefore, violates watermarks).  the operator simply checks to
> see what "type" of event something is and uses side outputs to output the
> type-specific messages.  here's a snippet of the event timestamp going back
> before the current watermark (first instance of going backwards in time in
> bold):
> >
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> > 2022-04-08 05:47:06,317 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> >
> >
> >
> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill  wrote:
> > I dove deeper.  I wasn't actually using per-partition watermarks.  Thank
> you for the help!
> >
> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill  wrote:
> > Thanks, Thias and Dongwon.
> >
> > I'll keep debugging this with the idle watermark turned off.
> >
> > Next TODOs:
> > - Verify that we’re using per-partition watermarks.  Our code matches
> the example but maybe something is disabling it.
> > - Enable logging of partition-consumer assignment, to see if that is the
> cause of the problem.
> > - Look at adding flags to set the source parallelism to see if that
> fixes the issue.
> >
> > Yes, I've seen Flink talks on creating our own watermarks through
> Kafka.  Sounds like a good idea.
> >
> > On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim 
> wrote:
> > I totally agree with Schwalbe that per-partition watermarking allows #
> source tasks < # kafka partitions.
> >
> > Otherwise, Dan, you should suspect other possibilities like what
> Schwalbe said.
> >
> > Best,
> >
> > 

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Qingsheng Ren
Hi Jin,

If you are using new FLIP-27 sources like KafkaSource, per-partition watermarking 
(or per-split watermarking) is a default feature integrated in the SourceOperator. You 
might be hitting the bug described in FLINK-26018 [1]: during the first fetch of the 
source, records in the first split push the watermark far ahead, and records from the 
other splits are then treated as late events.

[1] https://issues.apache.org/jira/browse/FLINK-26018
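
No extra code is needed to enable it; a minimal example looks roughly like this (broker,
topic, group id, and the out-of-orderness bound are placeholders):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

WatermarkStrategy<String> strategy =
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5));

// the strategy passed to fromSource() is applied per split inside the SourceOperator
DataStream<String> stream = env.fromSource(source, strategy, "kafka-source");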

Best regards,

Qingsheng


> On Apr 8, 2022, at 15:54, Jin Yi  wrote:
> 
> how should the code look like to verify we're using per-partition watermarks 
> if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
> 
> we currently have it looking like:
> 
> streamExecutionEnvironment.fromSource(
>KafkaSource.builder().build(),
>watermarkStrategy,
>"whatever",
>typeInfo);
> 
> when running this job with the streamExecutionEnviornment parallelism set to 
> 1, and the kafka source having 30 partitions, i'm seeing weird behaviors 
> where the first operator after this source consumes events out of order (and 
> therefore, violates watermarks).  the operator simply checks to see what 
> "type" of event something is and uses side outputs to output the 
> type-specific messages.  here's a snippet of the event timestamp going back 
> before the current watermark (first instance of going backwards in time in 
> bold):
> 
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> 2022-04-08 05:47:06,317 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> 
> 
> 
> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill  wrote:
> I dove deeper.  I wasn't actually using per-partition watermarks.  Thank you 
> for the help!
> 
> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill  wrote:
> Thanks, Thias and Dongwon.
> 
> I'll keep debugging this with the idle watermark turned off.
> 
> Next TODOs:
> - Verify that we’re using per-partition watermarks.  Our code matches the 
> example but maybe something is disabling it.
> - Enable logging of partition-consumer assignment, to see if that is the 
> cause of the problem.
> - Look at adding flags to set the source parallelism to see if that fixes the 
> issue.
> 
> Yes, I've seen Flink talks on creating our own watermarks through Kafka.  
> Sounds like a good idea.
> 
> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim  wrote:
> I totally agree with Schwalbe that per-partition watermarking allows # source 
> tasks < # kafka partitions. 
> 
> Otherwise, Dan, you should suspect other possibilities like what Schwalbe 
> said.
> 
> Best,
> 
> Dongwon
> 
> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias 
>  wrote:
> Hi San, Dongwon,
> 
>  
> 
> I share the opinion that when per-partition watermarking is enabled, you 
> should observe correct behavior … would be interesting to see why it does not 
> work for you.
> 
>  
> 
> I’d like to clear one tiny misconception here when you write:
> 
>  
> 
> >> - The same issue happens even if I use an idle watermark.
> 
>  
> 
> You would expect to see glitches with watermarking when you enable idleness.
> 
> Idleness sort of trades watermark correctness for reduced latency when 
> processing timers (much 

Re: k8s session cluster (Flink 1.13.6): what is the address printed after creation for?

2022-04-08 Thread Zhanghao Chen
The main difference between the Standalone K8s and Native K8s deployment modes is that in Native K8s mode Flink can talk to the K8s API server 
directly to request the resources it needs and to observe the cluster state, whereas Standalone K8s has no direct awareness of the underlying K8s cluster. This leads to two main differences:


  1.  Deployment: with Standalone K8s you have to create the deployment, configmap, and service the cluster needs by hand; with Native 
K8s you only need to call the Flink CLI.
  2.  Resource allocation: Standalone K8s uses passive resource management - you (or some external system) allocate the resources and Flink 
passively accepts them; Native K8s uses active resource management - after the Flink cluster starts, it requests the resources it needs from K8s based on the properties of the submitted jobs.

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Friday, April 8, 2022 10:52
To: user-zh 
Subject: Re: k8s session cluster (Flink 1.13.6): what is the address printed after creation for?

It seems the official docs have two entry points for Flink on K8s:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#session-mode
and
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/.

They correspond to Resource Providers / Standalone / Kubernetes and Resource
Providers / Native
Kubernetes respectively. Does anyone know the difference? From the docs, the former seems to give you concrete service/deployment yml descriptions so that you create the cluster yourself, while the latter creates everything with a single script. But if that were the only difference, why distinguish between "standalone/kubernetes" and "native
kubernetes" at all?

>
> The cluster runs on 3 physical machines, not minikube.
> I'm not sure whether it's related to the network interface; there were already network problems during the initial setup, since k8s picks the address to listen on based on the IP of the default-route NIC.
> But I don't think that should matter in this case: since it's a ClusterIP service, the message printed after creation should point to the ClusterIP, so why does it show the local NIC's IP instead?
>
> yidan zhao  wrote on Friday, 2022-04-08 at 10:38:
> >
> > Below is the output of describe svc my-first-flink-cluster-rest:
> > Name: my-first-flink-cluster-rest
> > Namespace:default
> > Labels:   app=my-first-flink-cluster
> >   type=flink-native-kubernetes
> > Annotations:  
> > Selector:
> > app=my-first-flink-cluster,component=jobmanager,type=flink-native-kubernetes
> > Type: LoadBalancer
> > IP Family Policy: SingleStack
> > IP Families:  IPv4
> > IP:   192.168.127.57
> > IPs:  192.168.127.57
> > Port: rest  8081/TCP
> > TargetPort:   8081/TCP
> > NodePort: rest  31419/TCP
> > Endpoints:192.168.130.11:8081
> > Session Affinity: None
> > External Traffic Policy:  Cluster
> > Events:   
> >
> > As shown above, the IP is 192.168.127.57; that is the ClusterIP and it is reachable. What I don't understand is why the address printed after creation is not this one, and why the address looked up via
> > -Dkubernetes.cluster-id=my-first-flink-cluster is not the 192.x one either, which means jobs cannot be submitted.
> >
> > yu'an huang  wrote on Friday, 2022-04-08 at 02:11:
> > >
> > > 理论上cluster ip是不可能在集群外访问的,你的Kubernetes环境是怎么搭建的呢?Minikube吗?
> > >
> > > 方便的话可以分享你运行这个命令的结果吗?
> > > 》kubectl describe svc  my-first-flink-cluster-rest
> > >
> > >
> > >
> > > > On 7 Apr 2022, at 4:44 PM, Zhanghao Chen  
> > > > wrote:
> > > >
> > > > What is the kubernetes.rest-service.exposed.type parameter set to in your case?
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > 
> > > > From: yidan zhao 
> > > > Sent: Thursday, April 7, 2022 11:41
> > > > To: user-zh 
> > > > Subject: k8s session cluster (Flink 1.13.6): what is the address printed after creation for?
> > > >
> > > > Referring to 
> > > > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#starting-a-flink-session-on-kubernetes
> > > >
> > > > I created a k8s Flink session cluster (Flink 1.13.6) with the command: ./bin/kubernetes-session.sh
> > > > -Dkubernetes.cluster-id=my-first-flink-cluster . Creation succeeds, and at the end it prints "Create
> > > > flink session cluster my-first-flink-cluster successfully, JobManager
> > > > Web Interface: http://10.227.137.154:8081", but this address is not reachable.
> > > >
> > > > Submitting a job with the command below behaves the same way: it resolves the address above and the submission fails.
> > > > ./bin/flink run \
> > > >--target kubernetes-session \
> > > >-Dkubernetes.cluster-id=my-first-flink-cluster \
> > > >./examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- The following approach does work, though, and I don't understand what the problem is.
> > > > 1. Use kubectl get svc to get the clusterIP:port of my-first-flink-cluster-rest,
> > > > which is 192.168.127.57:8081.
> > > > 2. List jobs:
> > > > flink list  -m 192.168.127.57:8081
> > > > 3. Submit a job:
> > > > flink run  -m 192.168.127.57:8081
> > > > /home/work/flink/examples/streaming/TopSpeedWindowing.jar
> > > >
> > > > --- The difference: the 192.x address is the virtual ClusterIP, while the 10.x one is my machine's IP.
> > >


Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Alexis Sarda-Espinosa
Hi everyone,

I have a ProcessWindowFunction that uses Global window state. It uses MapState 
with a descriptor defined like this:

MapStateDescriptor> msd = new MapStateDescriptor<>(
"descriptorName",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint>() {})
);

Now I'm trying to access a checkpoint's state data to read that (created with 
RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, String, TimeWindow> that defines the same descriptor and calls this in 
readWindow:

MapState> mapState = context.globalState().getMapState(msd);

After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
configure the reader function like this:

savepoint
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
.process(
"my-uid",
new StateReaderFunction(),
Types.STRING,
TypeInformation.of(MyPojo.class),
Types.INT
)
.print();

But I am getting this exception:

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) 
must not be incompatible with the old state serializer 
(org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).

Does someone know what I'm doing wrong in my setup?

Regards,
Alexis.



Re: Is there any way to get the ExecutionConfigurations in Dynamic factory class

2022-04-08 Thread Qingsheng Ren
Hi Anitha,

AFAIK DynamicTableSourceFactory doesn't expose an interface for getting the 
parallelism. Could you elaborate on why you need the parallelism in the table factory? 
Maybe we can find another way to fulfill your requirement.

Best regards, 

Qingsheng

> On Apr 7, 2022, at 16:11, Anitha Thankappan  
> wrote:
> 
> Hi 
> 
> I have developed a BigQuery Flink connector by implementing 
> DynamicTableSourceFactory.
> I have a requirement to :
>get the configured parallelism value of 
> StreamExecutionEnvironment in the Factory class.
>or
>set the parallelism at Factory class or Table source 
> class level.
> Please help me on this.
> 
> 
> Thanks and Regards,
> Anitha Thankappan
> 