Re: Job Recovery Time on TM Lost

2021-06-29 Thread Lu Niu
Hi, Gen

Thanks for replying! The reasoning overall makes sense. But in this case,
when the JM sends a cancel request to a killed TM, why does the call time out
after 30s instead of returning "connection refused" immediately?

Best
Lu

On Tue, Jun 29, 2021 at 7:52 PM Gen Luo  wrote:

> Hi Lu,
>
> We found almost the same thing when we were testing failover in a
> large-scale job. The akka.ask.timeout and heartbeat.timeout were set to 10min
> for the test, and we found that the job would take 10min to recover from a
> TM loss.
>
> We reached the conclusion that the behavior is expected in the Flink
> version we used (1.13), and it should also apply to previous versions.
> Here's how we think things are happening.
>
> Flink JM uses heartbeat to check TM status. When a TM is lost, an upstream
> or downstream TM will first sense it, while JM is not aware until the
> heartbeat of the lost TM times out. The upstream/downstream TM then sends
> an exception to JM, and JM will start canceling the job by sending cancel
> requests to all TMs. However, the lost TM can't respond to the request, and
> the JM has to wait until the cancel request (via akka) times out before it can
> mark the TM failed and continue the failover procedure.
>
> We suppose that the phase from CANCELING to CANCELED takes
> min(akka.ask.timeout, heartbeat.timeout), though not confirmed yet.
>
> Hope it helps. Please let me know if there's anything wrong.
>
> Lu Niu wrote on Wed, Jun 30, 2021 at 8:45 AM:
>
>> Hi, Flink Users
>>
>> We (Pinterest) are trying to speed up recovery when Flink jobs hit
>> one-time exceptions. To establish a baseline, the first test we ran was to
>> randomly kill one TM container and watch how fast the Flink job
>> recovers. We ran this test against multiple jobs, and here are some findings:
>>
>>- The whole recovery stage breaks down into two phases:
>>   - Phase 1: the job state switches from RUNNING -> RESTARTING ->
>>   RUNNING. All tasks switch from RUNNING -> CANCELING -> CANCELED.
>>   - Phase 2: all tasks switch from CREATED -> SCHEDULED -> DEPLOYING
>>   -> RUNNING.
>>- Phase 1 always takes around 30s and Phase 2 takes around 10-15s.
>>
>> Question:
>> Why does Phase 1 always take about 30s? I shared related logs from two
>> jobs showing this. Does it have something to do with the akka config?
>>
>> Our setup:
>> flink version 1.11
>> running yarn per-job mode
>> akka.ask.timeout: 30 s
>> akka.lookup.timeout: 30 s
>> akka.tcp.timeout: 30 s
>>
>> Best
>> Lu
>>
>>


Re: Questions about UPDATE_BEFORE row kind in ElasticSearch sink

2021-06-29 Thread Kai Fu
Thank you for the reply, Jark. In our case, we found that there are no
UPDATE_BEFORE records generated since the join is using -D/+I row kinds.

*> Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents
"UPDATE_AFTER", "-U" represents "UPDATE_BEFORE". We forward input RowKind if it is
inner join, otherwise, we always send insert and delete for simplification. We can
optimize this to send -U & +U instead of D & I in the future (see FLINK-17337). They
are equivalent in this join case. It may need some refactoring if we want to send
-U & +U, so we still keep -D & +I for now for simplification.*

On Mon, Jun 28, 2021 at 2:21 PM Jark Wu  wrote:

> UPDATE_BEFORE is required in cases such as Aggregation with Filter. For
> example:
>
> SELECT *
> FROM (
>   SELECT word, count(*) as cnt
>   FROM T
>   GROUP BY word
> ) WHERE cnt < 3;
>
> There is more discussion in this issue:
> https://issues.apache.org/jira/browse/FLINK-9528
>
> Best,
> Jark
>
> On Mon, 28 Jun 2021 at 13:52, Kai Fu  wrote:
>
>> Hi team,
>>
>> We notice that the UPDATE_BEFORE row kind is not dropped but is treated as
>> DELETE in the connector code.
>> We're aware that this is useful for retracting output records in some cases,
>> but we cannot come up with such a scenario; could anyone name a few such
>> cases?
>>
>> The other thing we want to do is drop the UPDATE_BEFORE row kind in the
>> ES connector to reduce the sink traffic, since almost all of our records are
>> updates. In our case, the records are generated by joining a couple of
>> upsert-kafka data sources. Only the primary key participates in the join
>> condition for all join cases, with some granularity/cardinality fan-out in
>> the middle. We want to know whether it impacts the correctness of the final
>> result if we drop the records with the UPDATE_BEFORE row kind.
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*


How to configure idle sources in Flink SQL

2021-06-29 Thread 杨光跃
In code, you can use .withIdleness(Duration.ofMinutes(1)) to make an idle source still advance watermarks. How is this expressed in SQL?
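For reference, the DataStream-side call the question refers to, plus what looks like
the Table/SQL-side equivalent (a sketch only; MyEvent and tableEnv are placeholders,
and the option name should be verified against the docs of the Flink version in use):

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// DataStream side: an idle source still lets watermarks advance.
WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withIdleness(Duration.ofMinutes(1));

// Table/SQL side: the table.exec.source.idle-timeout option appears to cover
// the same scenario.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "1 min");
```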


杨光跃
yangguangyuem...@163.com

Re: Job Recovery Time on TM Lost

2021-06-29 Thread Gen Luo
Hi Lu,

We found almost the same thing when we were testing failover in a
large-scale job. The akka.ask.timeout and heartbeat.timeout were set to 10min
for the test, and we found that the job would take 10min to recover from a
TM loss.

We reached the conclusion that the behavior is expected in the Flink
version we used (1.13), and it should also apply to previous versions.
Here's how we think things are happening.

Flink JM uses heartbeat to check TM status. When a TM is lost, an upstream
or downstream TM will first sense it, while JM is not aware until the
heartbeat of the lost TM times out. The upstream/downstream TM then sends
an exception to JM, and JM will start canceling the job by sending cancel
requests to all TMs. However, the lost TM can't respond to the request, and
the JM has to wait until the cancel request (via akka) times out before it can
mark the TM failed and continue the failover procedure.

We suppose that the phase from CANCELING to CANCELED takes
min(akka.ask.timeout, heartbeat.timeout), though not confirmed yet.

Hope it helps. Please let me know if there's anything wrong.
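
For anyone who wants to experiment with these timeouts, a minimal sketch (values are
only illustrative, and in a real deployment these keys normally go into
flink-conf.yaml):

```java
import org.apache.flink.configuration.Configuration;

// The two timeouts discussed above; lowering them shortens the CANCELING phase,
// at the cost of being less tolerant of slow RPCs or long GC pauses.
Configuration conf = new Configuration();
conf.setString("akka.ask.timeout", "30 s");    // RPC ask timeout (the ~30s seen in Phase 1)
conf.setString("heartbeat.timeout", "50000");  // JM <-> TM heartbeat timeout, in milliseconds
// In recent Flink versions this Configuration can be passed to
// StreamExecutionEnvironment.getExecutionEnvironment(conf) for local experiments.
```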

Lu Niu wrote on Wed, Jun 30, 2021 at 8:45 AM:

> Hi, Flink Users
>
> We (Pinterest) are trying to speed up recovery when Flink jobs hit
> one-time exceptions. To establish a baseline, the first test we ran was to
> randomly kill one TM container and watch how fast the Flink job
> recovers. We ran this test against multiple jobs, and here are some findings:
>
>- The whole recovery stage breaks down into two phases:
>   - Phase 1: the job state switches from RUNNING -> RESTARTING ->
>   RUNNING. All tasks switch from RUNNING -> CANCELING -> CANCELED.
>   - Phase 2: all tasks switch from CREATED -> SCHEDULED -> DEPLOYING
>   -> RUNNING.
>- Phase 1 always takes around 30s and Phase 2 takes around 10-15s.
>
> Question:
> Why does Phase 1 always take about 30s? I shared related logs from two jobs
> showing this. Does it have something to do with the akka config?
>
> Our setup:
> flink version 1.11
> running yarn per-job mode
> akka.ask.timeout: 30 s
> akka.lookup.timeout: 30 s
> akka.tcp.timeout: 30 s
>
> Best
> Lu
>
>


How can I tell if a record in a bounded job is the last record?

2021-06-29 Thread Yik San Chan
Hi community,

I have a batch job that consumes records from a bounded source (e.g.,
Hive) and walks them through a BufferingSink as described in the [docs](
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction).
In the BufferingSink, I want to flush records to the sink in
1000-record batches.

Given that the source is bounded, I need to flush all remaining records when it
reaches the end, otherwise the buffered records (the bufferedElements variable)
will be lost.

An obvious way of doing so is to flush out all records in the `close`
method. That should work fine.

However, I wonder if it's possible to tell whether a record is the last record
in the `invoke` method? In other words, how would I implement the `isLastRecord`
method below?

```java
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
    bufferedElements.add(value);
    if (bufferedElements.size() == threshold || isLastRecord()) {
        for (Tuple2<String, Integer> element : bufferedElements) {
            // send it to the sink
        }
        bufferedElements.clear();
    }
}
```
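
For completeness, the close()-based flush mentioned above would look roughly like
this (a sketch mirroring the types of the docs example, not tested):

```java
@Override
public void close() throws Exception {
    // A bounded source simply ends the stream, so flush whatever is still buffered
    // when the sink function is torn down.
    for (Tuple2<String, Integer> element : bufferedElements) {
        // send it to the sink system
    }
    bufferedElements.clear();
}
```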

Thanks!

Best,
Yik San


Re: Converting Table API query to Datastream API

2021-06-29 Thread JING ZHANG
Hi Le,
The link you referenced is a bit outdated. Since Flink 1.9, Table API & SQL are no
longer translated to the DataStream API. Table API & SQL and DataStream are at the
same level, and both are translated into a StreamOperator DAG.
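
You can still inspect the resulting plan, for example (a minimal sketch; the exact
output differs between versions):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
// assuming a table T(word, ...) has already been registered
Table result = tableEnv.sqlQuery("SELECT word, COUNT(*) AS cnt FROM T GROUP BY word");
// prints the abstract syntax tree, the optimized plan and the physical execution plan
System.out.println(result.explain());
```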

Best regards,
JING ZHANG

Le Xu wrote on Wed, Jun 30, 2021 at 1:11 AM:

> Hello! I have a basic question about the concept of using the Flink Table API.
> Based on the link here, it seems that if I implement a stream query with the
> Table API, the program is eventually translated to the DataStream API during
> execution. But is there a way to visualize what the resulting DataStream
> program looks like?
>
> Thanks,
>
> Le
>


Kafka fields are not fixed

2021-06-29 Thread 黄润星
I am using Flink SQL to read from Kafka and write to Oracle, with a statement like:
insert into oracle_table(a,b,c,d) select a,b,c,d from kafka_table
Because the Kafka JSON messages do not always contain the same fields (sometimes
only a,b are present, or only c,d), the resulting null values for a,b / c,d
overwrite the previously written values of a,b / c,d.
Is there a way to keep the existing values in Oracle when c,d are null or missing?


黄润星
13418645940
huangrunxingx...@163.com

Session cluster configmap removal

2021-06-29 Thread Sweta Kalakuntla
Hi,

We have Flink session clusters in Kubernetes and several long-running Flink
jobs deployed in them with HA enabled. Since enabling HA, we are
seeing configmaps created for every new job. Whenever we stop/cancel any
existing jobs, these configmaps do not get deleted. Is it correct that these
configmaps will not be removed unless we shut down the cluster?

Thanks,
Sweta


Re: Savepoint failure with operation not found under key

2021-06-29 Thread Rainie Li
I see, in that case more than 5 minutes had passed.
Thanks for the help.

Best regards
Rainie

On Tue, Jun 29, 2021 at 12:29 AM Chesnay Schepler 
wrote:

> How much time has passed between the requests? (You can only query the
> status for about 5 minutes)
>
> On 6/29/2021 6:37 AM, Rainie Li wrote:
>
> Thanks for the context Chesnay.
> Yes, I sent both requests to the same JM.
>
> Best regards
> Rainie
>
> On Mon, Jun 28, 2021 at 8:33 AM Chesnay Schepler 
> wrote:
>
>> Ordinarily this happens because the status request is sent to a different
>> JM than the one who received the request for creating a savepoint.
>> The meta information for such requests is only stored locally on each JM
>> and neither distributed to all JMs nor persisted anywhere.
>>
>> Did you send both requests ( the ones for creating a savepoint and one
>> for querying the status) to the same JM?
>>
>> On 6/26/2021 11:18 PM, Rainie Li wrote:
>>
>> Hi Flink Community,
>>
>> I found this error when I tried to create a savepoint for my flink job.
>> It's in version 1.9.
>> {
>>   "errors": [
>>     "Operation not found under key:
>> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@57b9711e"
>>   ]
>> }
>>
>> Here is the error from the JM log:
>>
>> 2021-06-21 06:49:50,195 ERROR 
>> org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler
>>   - Exception occurred in REST handler: Operation not found under key: 
>> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@56675052
>> 2021-06-21 06:50:50,023 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
>> 47281 of job 8de31918b2f2d983eee0d7f988a8f95a expired before completing.
>>
>> Any idea what could cause it and how to find more debugging info?
>>
>> I'd appreciate any suggestions.
>>
>> Thanks
>>
>> Best regard
>>
>> Rainie
>>
>>
>>
>


Converting Table API query to Datastream API

2021-06-29 Thread Le Xu
Hello! I have a basic question about the concept of using the Flink Table API.
Based on the link here, it seems that if I implement a stream query with the
Table API, the program is eventually translated to the DataStream API during
execution. But is there a way to visualize what the resulting DataStream
program looks like?

Thanks,

Le


Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-29 Thread Sonam Mandal
Hi Matthias,

Thanks for getting back to me. We are trying to build a system where users can 
focus on writing Flink SQL applications (just their SQL and UDF logic) while we 
handle the full lifecycle of their Flink cluster and job. In such an environment, 
we cannot enforce that all users use a single Flink version. We intend to run this 
setup in Kubernetes, where within the same Kubernetes cluster we can create 
multiple Flink clusters to which jobs are submitted.

Due to this, using an interactive shell will not be an option, nor do we want 
to directly expose this to users except for testing purposes.

I see the latest 1.13 release now has an option to pass a SQL file as input to 
the SQL client and it’ll take care of running the job. We will explore this 
option as well. I believe this is a new feature which wasn’t available in 1.12, 
right? Does the planning happen in the SQL client or on the job manager? We ran 
into issues with job graph incompatibility if our code directly submitted the 
SQL to the remote environment or if we used /bin/flink to run this jar that 
does the SQL conversion.

We currently have a POC idea which takes the SQL as a file and we wrote a 
simple job runner which reads this SQL and executes it. We are using Flink REST 
APIs to upload this jar and submit the job so that the job graph generation 
happens on the job manager. We no longer see the job graph incompatibility 
issues.
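
As a rough illustration (simplified, with illustrative names; not our actual code), 
such a runner boils down to:

```java
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Reads a file of semicolon-separated SQL statements and executes them with whatever
// Flink version is bundled on the cluster, so the submitting side never builds a job graph.
public class SqlRunner {
    public static void main(String[] args) throws Exception {
        String script = new String(Files.readAllBytes(Paths.get(args[0])));
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // naive split; it does not handle ';' inside string literals
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tEnv.executeSql(statement.trim());
            }
        }
    }
}
```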

Is there any reason not to use the above approach? We noticed that the Flink 
client (/bin/flink) does the job graph generation itself rather than going through 
the REST API; is there a reason why it doesn't leverage the REST API?

A nice thing about using REST is that we can now run multiple Flink cluster 
versions, and our job submission code doesn't need to know which Flink client 
version to use.

We definitely saw this job graph incompatibility with /bin/flink. We still need 
to test out the sql client with the -f option to assess whether we will require 
keeping multiple versions around should we decide to use this option. So we 
were wondering what the recommendation is within the Flink community on how to 
handle such cases. Hope this clarifies our use case better.

Also, as for the state incompatibility between major Flink versions, I see the 
thread mentions using a tool to rewrite the savepoints. Is this the only 
recommended way to handle this? Is this safe and does it work in all scenarios?

Thanks,
Sonam



From: Matthias Pohl 
Sent: Tuesday, June 29, 2021 02:29
To: Sonam Mandal
Cc: user@flink.apache.org; Jark Wu; Timo Walther
Subject: Re: Recommended way to submit a SQL job via code without getting tied 
to a Flink version?

Hi Sonam,
what's the reason for not using the Flink SQL client? Because of the version 
issue? I only know that FlinkSQL's state is not backwards-compatible between 
major Flink versions [1]. But that seems to be unrelated to what you describe.

I'm gonna add Jark and Timo to this thread. Maybe, they can add more insights.

Matthias

[1] 
https://issues.apache.org/jira/browse/FLINK-20823

On Tue, Jun 22, 2021 at 9:44 PM Sonam Mandal  wrote:
Hello,

We've written a simple tool which takes SQL statements as input and uses a 
StreamTableEnvironment to eventually submit this to the Flink cluster. We've 
noticed that the Flink library versions we depend on must match the Flink 
version running in our Kubernetes cluster for the job submission to be 
successful. If the versions don't match, the job submission goes through but 
the job errors out for various reasons. We do not want to use the SQL shell 
(which I also believe is version specific and must run on the same pod as the 
Job Manager).

Is there any version agnostic way to submit SQL jobs to the Flink cluster?

Thanks,
Sonam


Re: Use Flink to write a Kafka topic to s3 as parquet files

2021-06-29 Thread Arvid Heise
Hi Thomas,

The usual way with Avro would be to generate a class from your schema [1].
Then PlaySession would already be a SpecificRecord and you would avoid the
extra step.

I'm quite positive that the same way works with ParquetAvroWriters.
Note that you would need to use ParquetAvroWriters#forSpecificRecord
instead. You can also directly try
ParquetAvroWriters#forReflectRecord(POJO.class), but it's much slower.

Let me know if that approach doesn't work for you.

[1] https://avro.apache.org/docs/1.10.2/gettingstartedjava.html
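
Roughly, the sink setup would then shrink to something like this (a sketch only; it
assumes PlaySession is the Avro-generated class and reuses the variable names from
the code quoted below):

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// The Kafka source deserializes directly into the generated PlaySession class,
// so no manual GenericRecord conversion is needed.
DataStream<PlaySession> input = ...; // read from Kafka
StreamingFileSink<PlaySession> sink = StreamingFileSink
        .forBulkFormat(new Path(filePath),
                ParquetAvroWriters.forSpecificRecord(PlaySession.class))
        .build();
// For a plain POJO, ParquetAvroWriters.forReflectRecord(PlaySessionEvent.class)
// is the (slower) alternative mentioned above.
input.addSink(sink).uid(operatorUid);
```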

On Wed, Jun 23, 2021 at 12:08 AM Thomas Wang  wrote:

> Hi,
>
> I'm trying to tail a Kafka topic and copy the data to s3 as parquet files.
> I'm using StreamingFileSink with ParquetAvroWriters. It works just fine.
> However, it looks like I have to generate the Avro schema and convert my
> POJO class to GenericRecord first (i.e. convert DataStream<PlaySessionEvent> to
> DataStream<GenericRecord>). Here is my code:
>
> DataStream<PlaySessionEvent> input = ... // read from Kafka
> DataStream<GenericRecord> output =
>     input.map(KafkaToS3::convertPlaySessionEventToGenericRecord).uid(operatorUid);
> Schema schema = new Schema.Parser().parse(schemaStr);
> StreamingFileSink<GenericRecord> streamingFileSink = StreamingFileSink
>     .forBulkFormat(new Path(filePath),
>         ParquetAvroWriters.forGenericRecord(schema))
>     .build();
> output.addSink(streamingFileSink).uid(operatorUid);
>
> Following is the code for
> KafkaToS3::convertPlaySessionEventToGenericRecord:
>
> private static GenericRecord convertPlaySessionEventToGenericRecord(
>         PlaySessionEvent playSessionEvent) throws Exception {
>     Schema schema = PlaySession.getAvroSchema();
>     GenericRecord record = new GenericData.Record(schema);
>     for (Schema.Field field : schema.getFields()) {
>         record.put(field.name(), KafkaToS3.getObjectField(field, playSessionEvent));
>     }
>     return record;
> }
>
> Although the app works just fine, I feel the whole process is
> unnecessarily convoluted. I would appreciate any guideline on this front.
> Am I doing this job right?
>
> Thanks.
>
> Thomas
>


Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-29 Thread Matthias Pohl
Hi Sonam,
what's the reason for not using the Flink SQL client? Because of the
version issue? I only know that FlinkSQL's state is not
backwards-compatible between major Flink versions [1]. But that seems to be
unrelated to what you describe.

I'm gonna add Jark and Timo to this thread. Maybe, they can add more
insights.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-20823

On Tue, Jun 22, 2021 at 9:44 PM Sonam Mandal  wrote:

> Hello,
>
> We've written a simple tool which takes SQL statements as input and uses a
> StreamTableEnvironment to eventually submit this to the Flink cluster.
> We've noticed that the Flink library versions we depend on must match the
> Flink version running in our Kubernetes cluster for the job submission to
> be successful. If the versions don't match, the job submission goes through
> but the job errors out for various reasons. We do not want to use the SQL
> shell (which I also believe is version specific and must run on the same
> pod as the Job Manager).
>
> Is there any version agnostic way to submit SQL jobs to the Flink cluster?
>
> Thanks,
> Sonam
>


Re: Monitoring Exceptions using Bugsnag

2021-06-29 Thread Matthias Pohl
Hi Kevin,
I haven't worked with Bugsnag. So, I cannot give more input on that one.
For Flink, exceptions are handled by the job's scheduler. Flink collects
these exceptions in some bounded queue called the exception history [1]. It
collects task failures but also global failures which make the job fail or
restart. The size of the queue can be set through
web.exception-history-size [2].
Additionally, there's the FatalErrorHandler interface [3] which is used for
fatal (i.e. unrecoverable) errors of the ecosystem. You might want to have
a look at the implementations of this interface.

I hope that helps a bit.
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-exceptions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#web-exception-history-size
[3]
https://github.com/apache/flink/blob/239067c8d6393a273acaa2a3c3f57ad1c5486e3a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java#L22
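
For example, an external monitor could poll the exception-history endpoint from [1]
like this (a rough sketch using JDK 11's HttpClient; host, port, and job id are
placeholders):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

String jobId = "<job-id>"; // placeholder
HttpClient client = HttpClient.newHttpClient();
HttpResponse<String> response = client.send(
        HttpRequest.newBuilder(
                URI.create("http://jobmanager:8081/jobs/" + jobId + "/exceptions"))
                .GET()
                .build(),
        HttpResponse.BodyHandlers.ofString());
// The JSON body contains the root cause plus the bounded exception history,
// which could then be forwarded to Bugsnag or another error tracker.
System.out.println(response.body());
```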

On Mon, Jun 21, 2021 at 3:12 PM Kevin Lam  wrote:

> Hi all,
>
> I'm interested in instrumenting an Apache Flink application so that we can
> monitor exceptions. I was wondering what the best practices are here? Is
> there a good way to observe all the exceptions inside of a Flink
> application, including Flink internals?
>
> We are currently thinking of using Bugsnag, which has some steps to
> integrate with java applications:
> https://docs.bugsnag.com/platforms/java/other/, which works fine for
> uncaught exceptions in the job manager / pipeline driver context, but
> doesn't catch anything outside of that.
>
> We're also interested in reporting on exceptions that occur in the job
> execution context, eg. in task managers.
>
> Any tips/suggestions? I'd love to learn more about exception tracking and
> handling in Flink :)
>
> (reposting because it looks like my other thread got deleted?)
>


Re: NoSuchMethodError - getColumnIndexTruncateLength after upgrading Flink from 1.11.2 to 1.12.1

2021-06-29 Thread Matthias Pohl
Hi Rommel, Hi Thomas,
Apache Parquet was bumped from 1.10.0 to 1.11.1 for Flink 1.12 in
FLINK-19137 [1]. The error you're seeing looks like a dependency issue:
do you perhaps have a version other than 1.11.1
of org.apache.parquet:parquet-column:jar on your classpath?

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19137
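
A quick way to check which parquet-column jar is actually loaded at runtime (purely
a debugging sketch):

```java
// Prints the jar that ParquetProperties was loaded from, which usually reveals
// a stray parquet-column version on the classpath.
System.out.println(
        org.apache.parquet.column.ParquetProperties.class
                .getProtectionDomain()
                .getCodeSource()
                .getLocation());
```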

On Wed, Jun 23, 2021 at 1:50 AM Rommel Holmes 
wrote:

> To give more information:
>
> With parquet-avro version 1.10.0 and Flink 1.11.2, the job was running fine.
>
> Now with Flink 1.12.1, this error message shows up.
>
> Thank you for help.
>
> Rommel
>
>
>
>
>
> On Tue, Jun 22, 2021 at 2:41 PM Thomas Wang  wrote:
>
>> Hi,
>>
>> We recently upgraded our Flink version from 1.11.2 to 1.12.1 and one of
>> our jobs that used to run ok, now sees the following error. This error
>> doesn't seem to be related to any user code. Can someone help me take a
>> look?
>>
>> Thanks.
>>
>> Thomas
>>
>> java.lang.NoSuchMethodError:
>> org.apache.parquet.column.ParquetProperties.getColumnIndexTruncateLength()I
>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:282)
>> ~[?:?]
>> at
>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
>> ~[?:?]
>> at
>> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:90)
>> ~[?:?]
>> at
>> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forGenericRecord$abd75386$1(ParquetAvroWriters.java:65)
>> ~[?:?]
>> at
>> org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)
>> ~[?:?]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:75)
>> ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
>> ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
>> ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>> at
>> 

Re: Savepoint failure with operation not found under key

2021-06-29 Thread Chesnay Schepler
How much time has passed between the requests? (You can only query the 
status for about 5 minutes)
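
For reference, a rough sketch of triggering a savepoint and then polling its status 
against the same JobManager within that window (JDK 11 HttpClient; the REST address 
is a placeholder, the job id is taken from the log below, and the endpoint paths 
follow the Flink REST API):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

String jm = "http://jobmanager:8081";              // placeholder REST address
String jobId = "8de31918b2f2d983eee0d7f988a8f95a"; // job id from the log below

HttpClient client = HttpClient.newHttpClient();

// 1) Trigger the savepoint; the response body contains a "request-id".
HttpResponse<String> trigger = client.send(
        HttpRequest.newBuilder(URI.create(jm + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"cancel-job\": false}"))
                .build(),
        HttpResponse.BodyHandlers.ofString());
System.out.println(trigger.body());

// 2) Poll GET <jm>/jobs/<jobId>/savepoints/<request-id> on the SAME JobManager,
//    within a few minutes of triggering.
```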


On 6/29/2021 6:37 AM, Rainie Li wrote:

Thanks for the context Chesnay.
Yes, I sent both requests to the same JM.

Best regards
Rainie

On Mon, Jun 28, 2021 at 8:33 AM Chesnay Schepler  wrote:


Ordinarily this happens because the status request is sent to a
different JM than the one who received the request for creating a
savepoint.
The meta information for such requests is only stored locally on
each JM and neither distributed to all JMs nor persisted anywhere.

Did you send both requests ( the ones for creating a savepoint and
one for querying the status) to the same JM?

On 6/26/2021 11:18 PM, Rainie Li wrote:

Hi Flink Community,

I found this error when I tried to create a savepoint for my
flink job. It's in version 1.9.
{
  "errors": [
    "Operation not found under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@57b9711e"
  ]
}
Here is the error from the JM log:
2021-06-21 06:49:50,195 ERROR 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler
  - Exception occurred in REST handler: Operation not found under key: 
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@56675052
2021-06-21 06:50:50,023 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
47281 of job 8de31918b2f2d983eee0d7f988a8f95a expired before completing.
Any idea what could cause it and how to find more debugging info?
I'd appreciate any suggestions.
Thanks
Best regard
Rainie