Flink 1.15 Interval Join error after Deduplicate

2022-10-14 Thread liebingyu
I had a problem with Interval Join after using Deduplicate. I'm using Flink 
version 1.15.

I want to use Flink's Interval Join to join two streams, and my first 
table needs to be deduplicated. Here is my sample code.

```
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);


CREATE TEMPORARY TABLE B (
  id INT,
  `start` INT,
  `end` INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY event_time ASC) AS rn
  FROM source
)
WHERE rn = 1;

SELECT *
FROM A, B
WHERE 
A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND 
A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND 
B.event_time + INTERVAL '10' SECOND;
```

For the deduplication I keep the first row, so view A should only produce 
insert rows, but running the SQL above produces the following error.

```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
support consuming update and delete changes which is produced by node 
Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
```

How can I perform an Interval Join after using Deduplicate?
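
For reference, one workaround that is sometimes suggested is to order the deduplication by a processing-time attribute, since a keep-first-row deduplication ordered by processing time emits an insert-only stream that the interval join can consume. A minimal sketch, assuming a `proc_time AS PROCTIME()` computed column is added to `source` (that column is an assumption, not part of the original DDL):

```sql
-- Sketch: order the deduplication by an assumed processing-time attribute
-- so that view A stays insert-only and the interval join accepts it.
CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY proc_time ASC) AS rn
  FROM source  -- requires `proc_time AS PROCTIME()` in the source DDL
)
WHERE rn = 1;
```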

Re: Flink 1.15 Interval Join error after Deduplicate

2022-10-14 Thread 余列冰
Hi!

I ran into a problem doing an Interval Join after using Deduplicate. I'm using Flink 1.15.

I want to use Flink's Interval Join to join two streams, and my first table needs to be deduplicated. Here is my sample code.
```sql
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);


CREATE TEMPORARY TABLE B (
  id INT,
  `start` INT,
  `end` INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY event_time ASC) AS rn
  FROM source
)
WHERE rn = 1;

SELECT *
FROM A, B
WHERE 
A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND 
A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND 
B.event_time + INTERVAL '10' SECOND;
```

For the deduplication I keep the first row, so view A should only produce insert rows, but running the SQL above produces the following error.
```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
support consuming update and delete changes which is produced by node 
Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
```

How can I perform an Interval Join after using Deduplicate?




Re: question about Async IO

2022-10-14 Thread Krzysztof Chmielewski
Hi Galen,
I'll answer from my experience as a Flink user and developer of Flink jobs.



*"if the input to an AsyncFunction is a keyed stream, can I assume that all
input elements with the same key will be handled by the same instance of
the async operator"*
From what I know (and someone can correct me if I'm wrong), this is
possible. However, you have to make sure that there is no rebalance or
reshuffle between those operators. For example, the operators after the
first .keyBy(..) call must have the same parallelism.
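
To illustrate, a minimal sketch of a pipeline where the keyed stage and the
async stage keep the same parallelism, so no rebalance happens in between
(all class and key names here are assumptions, not anything from the thread):

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class SameParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // one uniform parallelism for every operator

        DataStream<String> events = env.fromElements("a:1", "b:2", "a:3");

        // keyBy partitions by key; because the async stage below keeps the
        // same parallelism, records are not rebalanced in between.
        DataStream<String> keyed = events
                .keyBy(s -> s.split(":")[0])
                .map(String::trim); // placeholder for a keyed processing stage

        DataStream<String> enriched = AsyncDataStream.orderedWait(
                keyed, new EnrichFunction(), 30, TimeUnit.SECONDS);

        enriched.print();
        env.execute("same-parallelism-sketch");
    }

    /** Trivial stand-in for a real async lookup (an assumption). */
    public static class EnrichFunction extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> input + "-enriched")
                    .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
        }
    }
}
```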

Regarding:
" I have a situation where I would like to enforce that async operations
associated with a particular key happen sequentially,"

This is also possible as far as I know. In fact, I implemented a streaming
pipeline with a similar requirement:
*"maintaining the order of events within a keyBy group across multiple
operators, including async operators"*.
We achieved it the same way, by making sure that all operators in the
entire pipeline except Source and Sink had exactly the same parallelism.
Another thing to remember is that if you call .keyBy(...) again with a
different key extractor, the original order might not be preserved, since
keyBy triggers a reshuffle/rebalance.

We also used the reinterpretAsKeyedStream feature [1] after the async
operators to avoid calling .keyBy multiple times in the pipeline; calling
.keyBy always has a negative impact on performance.
With reinterpretAsKeyedStream we were able to use keyed operators with
access to keyed state after the async operators, as sketched below.
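
Continuing the sketch above, the reinterpretAsKeyedStream step would look
roughly like this (again, names are assumptions, and the feature is
experimental):

```java
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Re-declare the partitioning of the async output instead of a second keyBy.
// This is only safe because the async stage preserved the partitioning
// (same parallelism, no rebalance in between).
KeyedStream<String, String> rekeyed = DataStreamUtils.reinterpretAsKeyedStream(
        enriched,               // the async stage's output from the sketch above
        s -> s.split(":")[0]);  // must match the original key extractor

// Keyed operators with access to keyed state can now follow the async stage.
rekeyed.print();
```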

Hope that helped.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/

Regards,
Krzysztof Chmielewski

On Fri, Oct 14, 2022 at 19:11, Galen Warren wrote:

> I have a question about Flink's Async IO support: Async I/O | Apache Flink.
>
> I understand that access to state is not supported in an AsyncFunction.
> However, if the input to an AsyncFunction is a keyed stream, can I assume
> that all input elements with the same key will be handled by the same
> instance of the async operator, as would normally be the case with keyed
> streams/operators?
>
> I'm asking because I have a situation where I would like to enforce that
> async operations associated with a particular key happen sequentially, i.e.
> if two elements come through with the same key, I need the async operation
> for the second to happen after the async operation for the first one
> completes. I think I can achieve this using a local map of "in flight"
> async operations in the operator itself, but only if I can rely on all
> input elements with the same key being processed by the same async operator.
>
> If anyone can confirm how this works, I'd appreciate it. Thanks.
>


question about Async IO

2022-10-14 Thread Galen Warren
I have a question about Flink's Async IO support: Async I/O | Apache Flink.

I understand that access to state is not supported in an AsyncFunction.
However, if the input to an AsyncFunction is a keyed stream, can I assume
that all input elements with the same key will be handled by the same
instance of the async operator, as would normally be the case with keyed
streams/operators?

I'm asking because I have a situation where I would like to enforce that
async operations associated with a particular key happen sequentially, i.e.
if two elements come through with the same key, I need the async operation
for the second to happen after the async operation for the first one
completes. I think I can achieve this using a local map of "in flight"
async operations in the operator itself, but only if I can rely on all
input elements with the same key being processed by the same async operator.

If anyone can confirm how this works, I'd appreciate it. Thanks.
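
As a pure sketch of the "local map of in-flight operations" idea from the
question (not an established Flink pattern; every name below is an
assumption), per-key sequencing can be expressed by chaining futures:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch: serialize async operations per key by chaining futures in a local
// map. It relies on all elements of a key reaching the same subtask (keyed
// input, no rebalance) and on asyncInvoke being called from the single task
// thread, so the map itself needs no synchronization.
public class SequentialPerKeyAsync extends RichAsyncFunction<String, String> {

    // key -> tail of the chain of pending operations for that key
    private transient Map<String, CompletableFuture<Void>> inFlight;

    @Override
    public void open(Configuration parameters) {
        inFlight = new HashMap<>();
    }

    @Override
    public void asyncInvoke(String element, ResultFuture<String> resultFuture) {
        String key = element.split(":")[0]; // assumed key extraction
        CompletableFuture<Void> tail =
                inFlight.getOrDefault(key, CompletableFuture.completedFuture(null));

        // Start this element's operation only after the previous operation
        // for the same key has completed.
        CompletableFuture<Void> next = tail.thenCompose(ignored ->
                callExternalService(element)
                        .thenAccept(r -> resultFuture.complete(Collections.singleton(r))));

        inFlight.put(key, next);
        // NOTE: a real implementation would also evict completed entries so
        // the map does not grow without bound.
    }

    // Stand-in for a real asynchronous client call (an assumption).
    private CompletableFuture<String> callExternalService(String element) {
        return CompletableFuture.supplyAsync(() -> element + "-done");
    }
}
```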


SQL Changes between 1.14 and 1.15?

2022-10-14 Thread PACE, JAMES
We've noticed the following difference in SQL behavior when upgrading from Flink 
1.14.5 to 1.15.2, around characters that are escaped in an SQL statement:

This statement:
  tableEnvironment.executeSql("select * from testTable WHERE lower(field1) LIKE 'b\"cd\"e%'");
produces a runtime error in Flink 1.15.2, but executes properly in Flink 1.14.5.

This can be worked around by escaping the backslash, changing the statement to:
  tableEnvironment.executeSql("select * from testTable WHERE lower(field1) LIKE 'b\\\"cd\\\"e%'");

This code illustrates the issue:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestCase3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        TestData testData = new TestData();
        testData.setField1("b\"cd\"e");
        DataStream<TestData> stream = env.fromElements(testData);
        stream.print();
        final StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(env);
        tableEnvironment.createTemporaryView("testTable", stream,
                Schema.newBuilder().build());

        // Works with Flink 1.14.x, Flink runtime errors in 1.15.2.
        // Uncomment to see the runtime trace.
        //tableEnvironment.executeSql("select *, '1' as run from testTable WHERE lower(field1) LIKE 'b\"cd\"e%'").print();
        // Works with 1.15.2
        tableEnvironment.executeSql("select * from testTable WHERE lower(field1) LIKE 'b\\\"cd\\\"e%'").print();

        env.execute("TestCase");
    }

    public static class TestData {
        private String field1;

        public String getField1() { return field1; }
        public void setField1(String field1) { this.field1 = field1; }
    }
}
```

Thanks
Jim


Re: Sometimes checkpoints to s3 fail

2022-10-14 Thread Matthias Pohl via user
Hi Evgeniy,
is it Ceph that you're using as an S3 server? All the Google search results
point to Ceph when looking for that error message. Could it be that there's
a problem with the version of the underlying system? The stacktrace you
provided looks like Flink struggles to close the file and therefore fails
to create the checkpoint.

Best,
Matthias
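
For context, the s3p:// scheme in the stack trace comes from Flink's Presto-based S3 filesystem plugin; a checkpoint setup against a self-hosted S3 endpoint such as Ceph is configured roughly like this (a sketch only; every value below is a placeholder, not taken from the thread):

```yaml
# flink-conf.yaml sketch -- all values are placeholders
state.backend: rocksdb
state.checkpoints.dir: s3p://flink-checkpoints/k8s-checkpoint-job-name
s3.endpoint: http://ceph-rgw.example.com:7480   # assumed Ceph RADOS Gateway
s3.path.style.access: true                      # commonly needed for Ceph/MinIO-style S3
```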

On Thu, Oct 6, 2022 at 11:25 AM Evgeniy Lyutikov 
wrote:

> Hello all.
> I can't figure out this intermittent problem: sometimes checkpoints stop
> completing, and sometimes they only complete every other attempt.
> Flink 1.14.4 in kubernetes application mode.
>
>
> 2022-10-06 09:08:04,731 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 18314 (type=CHECKPOINT) @ 1665047284716 for job
> .
> 2022-10-06 09:11:29,130 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline
> checkpoint 18314 by task 048169f0e3c2efd473d3cef9c9d2cd70 of job
>  at job-name-taskmanager-3-1 @ 10.109.0.168
> (dataPort=43795).
> org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint
> failed.
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:301)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: Could not
> materialize checkpoint 18314 for operator Process rec last clicks -> Cast
> rec last clicks type (30/44)#0.
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException:
> Could not flush to file and close the file system output stream to
> s3p://flink-checkpoints/k8s-checkpoint-job-name//shared/7c09fcf1-49b9-4b72-b756-81cd7778e396
> in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> ~[?:?]
> at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
> at
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: Could not flush to
> file and close the file system output stream to
> s3p://flink-checkpoints/k8s-checkpoint-job-name//shared/7c09fcf1-49b9-4b72-b756-81cd7778e396
> in order to obtain the stream state handle
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:373)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:143)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:101)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> ~[?:?]
> ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable:
> com.amazonaws.services.s3.model.AmazonS3Exception: This multipart
> completion is already in progress (Service: Amazon S3; Status Code: 500;
> Error Code: InternalError; Request ID:
> tx000ced9f8-00633e9bc1-18489a52-default; S3 Extended Request
> ID: 18489a52-default-default; Proxy: null), S3 Extended Request ID:
> 18489a52-default-default
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1278)
> ~[?:?]
> at
> 

Re: jobmaster's fatal error will kill the session cluster

2022-10-14 Thread Jie Han
Thanks for the note.
The root cause is the following:

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the 
operator coordinators
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:169)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:624)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1010)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:927)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:388) 
~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612)
 ~[flink-rpc-akka_db70a2fa-991e-4392-9447-5d060aeb156e.jar:1.15.0]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_db70a2fa-991e-4392-9447-5d060aeb156e.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611)
 ~[flink-rpc-akka_db70a2fa-991e-4392-9447-5d060aeb156e.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185)
 ~[flink-rpc-akka_db70a2fa-991e-4392-9447-5d060aeb156e.jar:1.15.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[flink-scala_2.12-1.15.0.jar:1.15.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-scala_2.12-1.15.0.jar:1.15.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-scala_2.12-1.15.0.jar:1.15.0]
... 13 more
Caused by: java.lang.RuntimeException: java.net.URISyntaxException: Relative 
path in absolute URI: file:~/usr/bin/hudi/tables/t1/.hoodie
at 
org.apache.hudi.common.fs.HoodieWrapperFileSystem.convertPathWithScheme(HoodieWrapperFileSystem.java:156)
 ~[?:?]
at 
org.apache.hudi.common.fs.HoodieWrapperFileSystem.convertToDefaultPath(HoodieWrapperFileSystem.java:961)
 ~[?:?]
at 
org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$getFileStatus$17(HoodieWrapperFileSystem.java:398)
 ~[?:?]
at 
org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
 ~[?:?]
at 
org.apache.hudi.common.fs.HoodieWrapperFileSystem.getFileStatus(HoodieWrapperFileSystem.java:396)
 ~[?:?]
at 
org.apache.hudi.exception.TableNotFoundException.checkTableValidity(TableNotFoundException.java:51)
 ~[?:?]
at 
org.apache.hudi.common.table.HoodieTableMetaClient.(HoodieTableMetaClient.java:128)
 ~[?:?]
at 
org.apache.hudi.common.table.HoodieTableMetaClient.newMetaClient(HoodieTableMetaClient.java:642)
 ~[?:?]
at 
org.apache.hudi.common.table.HoodieTableMetaClient.access$000(HoodieTableMetaClient.java:80)
 ~[?:?]
at 
org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:711)
 ~[?:?]
at 
org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:466)
 ~[?:?]
at 
org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:1122)
 ~[?:?]
at 
org.apache.hudi.util.StreamerUtil.initTableIfNotExists(StreamerUtil.java:323) 
~[?:?]
at 
org.apache.hudi.util.StreamerUtil.initTableIfNotExists(StreamerUtil.java:293) 
~[?:?]
at 
org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:179)
 ~[?:?]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:164)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
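
The immediate trigger is the `file:~/...` path in the cause above: `~` is not expanded inside a Java URI, so the scheme-specific part does not begin with `/` and the URI is rejected as a relative path in an absolute URI. A hedged sketch of the fix is to spell out an absolute path in the table's `path` option (the column list and location here are assumptions for illustration):

```sql
-- Sketch: use an absolute file URI instead of file:~/... , which Java's
-- URI parser rejects; schema and location are assumed for illustration.
CREATE TABLE t1 (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///home/flink/hudi/tables/t1'  -- absolute, no '~'
);
```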

Re: jobmaster's fatal error will kill the session cluster

2022-10-14 Thread Matthias Pohl via user
Hi Jie Han,
welcome to the community. Just a little side note: These kinds of questions
are more suitable to be asked in the user mailing list. The dev mailing
list is rather used for discussing feature development or project-related
topics. See [1] for further details.

About your question: The stacktrace you're providing indicates that
something went wrong while initiating the job execution. Unfortunately, the
actual reason is not clear because that's not included in your stacktrace
(it should be listed as a cause for the JobMasterException in your logs).
You're right in assuming that Flink is able to handle certain kinds of user
code and infrastructure-related errors by restarting the job. But there
might be other Flink cluster internal errors that could cause a Flink
cluster shutdown. It's hard to tell from the logs you provided. Usually,
it's a good habit to share a reasonable amount of logs to make
investigating the issue easier right away.

Let's move the discussion into the user mailing list in case you have
further questions.

Best,
Matthias

[1] https://flink.apache.org/community.html#mailing-lists

On Fri, Oct 14, 2022 at 10:13 AM Jie Han  wrote:

> Hi guys, I'm new to Apache Flink. It's exciting to join the community!
>
> While trying out Flink 1.15.0, I ran into a confusing problem; here is
> the trimmed log:
>
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not
> start RpcEndpoint jobmanager_2.
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:617)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> [flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> [flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> [flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> [?:1.8.0_301]
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
> [?:1.8.0_301]
> at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
> [?:1.8.0_301]
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> [?:1.8.0_301]
> Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could
> not start the JobMaster.
> at
> org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:390)
> ~[flink-dist-1.15.0.jar:1.15.0]
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
> ~[flink-dist-1.15.0.jar:1.15.0]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612)
> ~[flink-rpc-akka_65043be6-9dc5-4303-a760-61bd044fb53a.jar:1.15.0]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> 

Broadcast state restoration for BroadcastProcessFunction

2022-10-14 Thread Alexis Sarda-Espinosa
Hello,

I wrote a test for a broadcast function to check how it handles broadcast
state during retries [1] (the gist only shows a subset of the test, in
Kotlin, but it's hopefully understandable). The test will not pass unless
my function also implements CheckpointedFunction, even though that
interface's method implementations can be empty - in that case the state
stays empty, even though its descriptor is registered with the harness.

Is this requirement specific to the test harness API?
Otherwise, shouldn't BaseBroadcastProcessFunction implement
CheckpointedFunction, maybe with empty default methods?

[1] https://gist.github.com/asardaes/b804b7ed04ace176881189c3d1cf842a

Regards,
Alexis.
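
For illustration, the shape being described above - a broadcast function that additionally implements CheckpointedFunction with empty methods - might look like this in Java (the generic types and names are assumptions based on the gist, not the actual code):

```java
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the workaround described above: the CheckpointedFunction
// methods are intentionally empty, yet implementing the interface is what
// makes the harness restore the registered broadcast state.
public class MyBroadcastFunction
        extends BroadcastProcessFunction<String, String, String>
        implements CheckpointedFunction {

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
        out.collect(value);
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out) {
        // A real function would update broadcast state here via
        // ctx.getBroadcastState(descriptor).put(...)
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) { /* intentionally empty */ }

    @Override
    public void initializeState(FunctionInitializationContext context) { /* intentionally empty */ }
}
```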


Re: Re: flink1.15.1 stop job failure

2022-10-14 Thread yidan zhao
Yes, I tried that before as well; KafkaSource indeed works, it's just FlinkKafkaConsumer that doesn't.

On Fri, Oct 14, 2022 at 14:22, yanfei lei wrote:
>
> Hi yidan && hjw,
> I also reproduced this problem locally with FlinkKafkaConsumer, but
> stop-with-savepoint works fine with KafkaSource. FlinkKafkaConsumer has
> been deprecated since Flink 1.15 [1]; I recommend trying again with the
> new KafkaSource.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sourcefunction
>
> Best,
> Yanfei

Re: Re: flink1.15.1 stop job failure

2022-10-14 Thread yanfei lei
Hi yidan && hjw,
I also reproduced this problem locally with FlinkKafkaConsumer, but
stop-with-savepoint works fine with KafkaSource. FlinkKafkaConsumer has been
deprecated since Flink 1.15 [1]; I recommend trying again with the new KafkaSource.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sourcefunction

Best,
Yanfei
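
For reference, a minimal sketch of the recommended KafkaSource setup (topic, servers, and group id below are placeholders, not values from the thread):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: the new KafkaSource API that replaces the deprecated
// FlinkKafkaConsumer; every connection value below is a placeholder.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```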

On Tue, Aug 23, 2022 at 23:39, hjw <1010445...@qq.com.invalid> wrote:

> I think this problem is caused by the Kafka connector using the old API. It
> can be reproduced locally just by running in IDEA. I have already filed a
> Jira for it: https://issues.apache.org/jira/browse/FLINK-28758. There has
> been no feedback from the community yet.
>
> ------ Original message ------
> From: "user-zh" <hinobl...@gmail.com>
> Sent: Tuesday, Aug 23, 2022, 11:09 PM
> To: "user-zh"
> Subject: Re: Re: flink1.15.1 stop job failure
>
> 1. Most likely it's a problem in the source, or at the savepoint trigger level.
> 2. It may also help to think about the difference between cancel and stop.
> 3. Additional info: my Kafka source uses the old API (I can't use the new
> one; for various reasons I have to use an old Kafka client version).
>
> yidan zhao wrote:
>
> I took a look; there are very few errors.
> In any case, flink cancel -s works and flink stop does not, and it seems to
> fail instantly. From the web UI, savepoint completion is 0/841, so it must
> have failed almost before it started.
> Currently 4 machines:
>
> Machine 1:
> 2022-08-23 22:47:37,093 WARN org.apache.flink.runtime.taskmanager.Task [] -
> Source: JobConfig - Split(JobName_configType)
> (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
> FAILED with failure cause:
> org.apache.flink.util.FlinkRuntimeException: Stop-with-savepoint failed.
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> After that it is just various free-task / unregister messages.
>
> Machine 2:
> ...
> Everything seems to start from "Attempt to cancel"; there doesn't appear to
> be an error before the cancel.
>
> Xuyang wrote:
>
> Hi, are there any error messages on the TM? If so, please paste them so we
> can see what caused the checkpoint failure.
>
> --
> Best!
> Xuyang
>
> At 2022-08-23 20:41:59, "yidan zhao" wrote:
>
> Some additional information:
> Looking at the logs, if the checkpoint is triggered with `flink savepoint xxx`,
> the JM log is simple:
>
> 2022-08-23 20:33:22,307 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
> Triggering savepoint for job 8d231de75b8227a1b715b1aa665caa91.
>
> 2022-08-23 20:33:22,318 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE,
> formatType=CANONICAL}) @ 1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
>
> 2022-08-23 20:33:23,701 INFO org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
> [] - Cannot create recoverable writer due to Recoverable writers on Hadoop are only
> supported for HDFS, will use the ordinary writer.
>
> 2022-08-23 20:33:23,908 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
> (1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
>
> If the job is stopped with `stop xxx`, the JM log (error) looks like this:
>
> 2022-08-23 20:35:01,834 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
> Triggering stop-with-savepoint for job 8d231de75b8227a1b715b1aa665caa91.
>
> 2022-08-23 20:35:01,842 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
> postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
> for job 8d231de75b8227a1b715b1aa665caa91.
>
> 2022-08-23 20:35:02,083 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
> Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
> 8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
> xxx.xxx.com (dataPort=13156).
> (here it looks like the checkpoint was declined; the reason is task