Re: Switching kafka brokers
Hi Lars,

That sounds like a painful process. Since the offsets are inconsistent, I would suggest resetting the Kafka source state: change the source's `uid`, set the source to start from the earliest offset if you haven't already, make the bootstrap server change, and restart your job with allowNonRestoredState enabled. This effectively retains all Flink state except that of the Kafka source.

Here is a similar question, where Martijn gave the same answer, but about resetting Kafka topics: https://lists.apache.org/thread/xcfjm23xk7xy9nh887pvsxbw9z649p3q.

If you are interested, I talked about this exact problem at Flink Forward and how we are trying to solve it with FLIP-246: https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source. The discussion thread is linked at the top of the FLIP if you want to give feedback.

Best,
Mason

On Thu, Oct 6, 2022 at 9:40 AM Lars Skjærven wrote:
> Hello,
>
> What is the recommended approach for migrating Flink jobs to a new Kafka
> server? I was naively hoping to use Kafka MirrorMaker to sync the old
> server with the new server, and simply continue from a savepoint with updated
> URLs. Unfortunately, the Kafka offsets are not identical for log-compacted
> topics when using MirrorMaker. Any tips?
>
> L
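Why these three steps fit together can be shown with a small toy model, assuming Flink's documented behaviour that savepoint state is matched to operators by their uid and that unmatched savepoint state is rejected unless allowNonRestoredState is set. This is plain Java, not Flink's restore code; `SavepointRestoreModel` and the uids `kafka-source`, `kafka-source-v2`, and `aggregator` are hypothetical. In a real job the corresponding knobs are `.uid(...)` on the source stream, `OffsetsInitializer.earliest()` on the `KafkaSource` builder, and the `--allowNonRestoredState` (`-n`) option of `flink run -s <savepointPath>`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink's implementation): savepoint state is keyed by operator uid. */
final class SavepointRestoreModel {

    /**
     * Returns the state each operator of the new job receives.
     * Operators whose uid is absent from the savepoint start empty; a Kafka
     * source in that situation falls back to its configured starting offsets.
     * Savepoint state that no operator claims is an error unless
     * allowNonRestoredState is true, in which case it is silently dropped.
     */
    static Map<String, String> restore(Map<String, String> savepoint,
                                       List<String> newJobUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (String uid : newJobUids) {
            if (savepoint.containsKey(uid)) {
                restored.put(uid, savepoint.get(uid));
            }
        }
        for (String uid : savepoint.keySet()) {
            if (!newJobUids.contains(uid) && !allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state for uid '" + uid + "' cannot be mapped to the new job");
            }
        }
        return restored;
    }
}
```

Restoring a savepoint holding `kafka-source` and `aggregator` state into a job whose source uid is now `kafka-source-v2` keeps the aggregator's state and leaves the source empty, so it reads the new cluster from its starting offsets; without the flag the restore is rejected.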
Re: Can temporal table functions only be registered using the table API?
I was wrong about this. The AS OF style processing-time join has been disabled at a higher level, in org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator.

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson wrote:
> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
> The operator to temporal join a stream on processing time.
>
>> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
> [1] https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
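The two temporal-join semantics discussed in this thread can be contrasted with a minimal in-memory sketch. It is a toy model in plain Java, not Flink's TemporalProcessTimeJoinOperator, and `TemporalJoinModel` and its methods are hypothetical names. A processing-time join, like Salva's `AbstractFactDimTableJoin` CoProcessFunction, joins each fact with the most recently received version of the dimension row. An event-time join, like the `MAX(rowtime) ... r2.rowtime <= o.rowtime` rewrite quoted in this thread, picks the version that was valid at the fact's rowtime. Both are modeled as inner joins, so a fact without a matching version produces no output.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Toy model contrasting processing-time and event-time temporal join lookups. */
final class TemporalJoinModel {

    // Processing time: only the latest received rate per currency is kept,
    // like the single ValueState<Dimension> in the CoProcessFunction.
    private final Map<String, Double> latestRate = new HashMap<>();

    // Event time: every version is kept, ordered by its rowtime.
    private final Map<String, TreeMap<Long, Double>> rateHistory = new HashMap<>();

    /** Records a new rate version (the dimension side of the join). */
    void onRate(String currency, long rowtime, double rate) {
        latestRate.put(currency, rate);
        rateHistory.computeIfAbsent(currency, c -> new TreeMap<>()).put(rowtime, rate);
    }

    /** Processing-time lookup: multiply by whatever rate arrived last. */
    Optional<Double> joinProcTime(String currency, double amount) {
        Double rate = latestRate.get(currency);
        return rate == null ? Optional.empty() : Optional.of(amount * rate);
    }

    /** Event-time lookup: the version with the greatest rowtime <= the order's rowtime. */
    Optional<Double> joinEventTime(String currency, long orderRowtime, double amount) {
        TreeMap<Long, Double> versions = rateHistory.get(currency);
        if (versions == null) {
            return Optional.empty();
        }
        Map.Entry<Long, Double> version = versions.floorEntry(orderRowtime);
        return version == null ? Optional.empty() : Optional.of(amount * version.getValue());
    }
}
```

With rates `EUR@10 = 1.0` and `EUR@20 = 2.0` already received, an order of 100.0 at rowtime 15 yields 100.0 under the event-time lookup but 200.0 under the processing-time lookup. The correlated-subquery rewrite corresponds to the event-time variant, not to a processing-time join.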
Switching kafka brokers
Hello,

What is the recommended approach for migrating Flink jobs to a new Kafka server? I was naively hoping to use Kafka MirrorMaker to sync the old server with the new server, and simply continue from a savepoint with updated URLs. Unfortunately, the Kafka offsets are not identical for log-compacted topics when using MirrorMaker. Any tips?

L
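One way the offsets drift apart can be shown with a toy model. It assumes the mirror re-produces only the records that survived compaction and that the target broker assigns its own contiguous offsets. This is not MirrorMaker's code, and `CompactedMirrorModel`, `Rec`, `compact`, `mirror`, and `at` are hypothetical names. A Flink savepoint stores source-cluster offsets, so reusing them against the target can point at different records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not MirrorMaker's implementation) of offset drift on compacted topics. */
final class CompactedMirrorModel {

    /** One record in a partition: its offset plus key and value. */
    record Rec(long offset, String key, String value) {}

    /** Compaction keeps only the latest record per key; survivors keep their original offsets. */
    static List<Rec> compact(List<Rec> log) {
        Map<String, Long> latestOffset = new HashMap<>();
        for (Rec r : log) {
            latestOffset.put(r.key(), r.offset());
        }
        List<Rec> survivors = new ArrayList<>();
        for (Rec r : log) {
            if (latestOffset.get(r.key()) == r.offset()) {
                survivors.add(r);
            }
        }
        return survivors;
    }

    /** Re-producing records into a new topic: the target assigns fresh offsets from 0. */
    static List<Rec> mirror(List<Rec> source) {
        List<Rec> target = new ArrayList<>();
        long next = 0;
        for (Rec r : source) {
            target.add(new Rec(next++, r.key(), r.value()));
        }
        return target;
    }

    /** Returns the record stored at the given offset, or null if there is none. */
    static Rec at(List<Rec> log, long offset) {
        for (Rec r : log) {
            if (r.offset() == offset) {
                return r;
            }
        }
        return null;
    }
}
```

For the log `a=1@0, b=1@1, a=2@2, c=1@3, b=2@4`, compaction keeps offsets 2, 3 and 4. After mirroring those records sit at 0, 1 and 2, so a saved position of 3 (record `c` on the old cluster) lands past the end of the mirrored partition.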
Sometimes checkpoints to S3 fail
Hello all.

I can’t figure out an intermittent problem: sometimes checkpoints stop completing, and sometimes they start completing only every other time. Flink 1.14.4 in Kubernetes application mode.

2022-10-06 09:08:04,731 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 18314 (type=CHECKPOINT) @ 1665047284716 for job .
2022-10-06 09:11:29,130 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline checkpoint 18314 by task 048169f0e3c2efd473d3cef9c9d2cd70 of job at job-name-taskmanager-3-1 @ 10.109.0.168 (dataPort=43795).
org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:301) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 18314 for operator Process rec last clicks -> Cast rec last clicks type (30/44)#0.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to s3p://flink-checkpoints/k8s-checkpoint-job-name//shared/7c09fcf1-49b9-4b72-b756-81cd7778e396 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: Could not flush to file and close the file system output stream to s3p://flink-checkpoints/k8s-checkpoint-job-name//shared/7c09fcf1-49b9-4b72-b756-81cd7778e396 in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:373) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:143) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:101) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: com.amazonaws.services.s3.model.AmazonS3Exception: This multipart completion is already in progress (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: tx000ced9f8-00633e9bc1-18489a52-default; S3 Extended Request ID: 18489a52-default-default; Proxy: null), S3 Extended Request ID: 18489a52-default-default
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1278) ~[?:?]
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:1226) ~[?:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?]
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
Re: Can temporal table functions only be registered using the table API?
As for your original question, the documentation states that a temporal table function can only be registered via the Table API, and I believe this is true.

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson wrote:
> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
> The operator to temporal join a stream on processing time.
>
>> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
> [1] https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
Re: Can temporal table functions only be registered using the table API?
Salva,

Have you tried doing an AS OF style processing time temporal join? I know the documentation leads one to believe this isn't supported, but I think it actually works. I'm basing this on this comment [1] in the code for the TemporalProcessTimeJoinOperator:

> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.

[1] https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38

Regards,
David

On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara wrote:
> I've found more examples here:
>
> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>
> where a fact table is enriched using several dimension tables, but again
> the temporal table functions are registered using Table API like so:
>
> ```java
> tEnv.registerFunction(
>     "dimension_table1",
>     tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
> ```
>
> It's not exactly the same application, since this example covers a lookup
> join, but the SQL query is also relying on the LATERAL TABLE + temporal
> table functions:
>
> ```
> SELECT
>   D1.col1 AS A,
>   D1.col2 AS B
> FROM
>   fact_table,
>   LATERAL TABLE (dimension_table1(f_proctime)) AS D1
> WHERE
>   fact_table.dim1 = D1.id
> ```
>
> In particular, this produces a job which is equivalent to
>
> ```
> private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>         extends CoProcessFunction<IN1, Dimension, OUT> {
>     private static final long serialVersionUID = 1L;
>
>     protected transient ValueState<Dimension> dimState;
>
>     @Override
>     public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
>             throws Exception {
>         Dimension dim = dimState.value();
>         if (dim == null) {
>             return;
>         }
>         out.collect(join(value, dim));
>     }
>
>     abstract OUT join(IN1 value, Dimension dim);
>
>     @Override
>     public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
>             throws Exception {
>         dimState.update(value);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ValueStateDescriptor<Dimension> dimStateDesc =
>                 new ValueStateDescriptor<>("dimstate", Dimension.class);
>         this.dimState = getRuntimeContext().getState(dimStateDesc);
>     }
> }
> ```
>
> I'm basically interested in rewriting these types of DIY joins (based on
> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
> possible, otherwise I would like to know which limitations there are.
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara wrote:
>> By looking at the docs for older versions of Flink, e.g.,
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>
>> it seems that it's possible to rewrite this query
>>
>> ```
>> SELECT
>>   o.amount * r.rate AS amount
>> FROM
>>   Orders AS o,
>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>> WHERE r.currency = o.currency
>> ```
>>
>> as
>>
>> ```
>> SELECT
>>   SUM(o.amount * r.rate) AS amount
>> FROM Orders AS o,
>>   RatesHistory AS r
>> WHERE r.currency = o.currency
>> AND r.rowtime = (
>>   SELECT MAX(rowtime)
>>   FROM RatesHistory AS r2
>>   WHERE r2.currency = o.currency
>>   AND r2.rowtime <= o.rowtime);
>> ```
>>
>> This would be a way to accomplish this task in SQL without using a
>> temporal table function.
>>
>> Would this rewrite be equivalent in terms of the final generated job?
>> Obviously I very much prefer the LATERAL TABLE query but this requires
>> using a temporal table function which can only be registered using the
>> Table API (apparently).
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara wrote:
>>> It doesn't seem the case with processing time unless I'm mistaken:
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>
>>> This case seems to require a different syntax based on LATERAL TABLE and
>>> a temporal table function (FOR SYSTEM_TIME is not supported). From the docs
>>> too, it seems that temporal table functions can only be registered via the
>>> table API. Am I missing/misunderstanding something?
>>>
>>> Salva
>>>
>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser wrote:
>>>> Hi Salva,
>>>>
>>>> The examples for temporal table