Re: Switching kafka brokers

2022-10-06 Thread Mason Chen
Hi Lars,

That sounds like a painful process. Since the offsets are inconsistent, I
would suggest resetting the Kafka source state: change the source's `uid`,
set the source to start from the earliest offset if you haven't already,
change the bootstrap servers, and restart your job from the savepoint with
allowNonRestoredState enabled. This effectively retains all Flink state
except that of the Kafka source.
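
The reset works because Flink matches savepoint state to operators by their
`uid`. A minimal sketch of that matching follows, as a toy model in plain Java
rather than Flink code. The class `SavepointRestoreSketch`, the `restore`
method, and the uid strings are all hypothetical. In a real job the relevant
pieces would be the `uid(...)` set on the source and the
`--allowNonRestoredState` flag of `flink run`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Toy model (not Flink code) of how savepoint state is matched to operators by uid. */
final class SavepointRestoreSketch {

    /**
     * Returns the state that the operators of the new job start with.
     * Savepoint entries whose uid matches no operator are dropped only if
     * allowNonRestoredState is true; otherwise the restore fails.
     */
    static Map<String, String> restore(Map<String, String> savepoint,
                                       Set<String> operatorUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : savepoint.entrySet()) {
            if (operatorUids.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state for uid '" + entry.getKey() + "' matches no operator");
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("kafka-source", "offsets on the old cluster");
        savepoint.put("enrich", "keyed state");

        // The source was given a new uid, so the stale offsets no longer match it.
        Set<String> newJob = Set.of("kafka-source-v2", "enrich");

        // Only the state of "enrich" survives; the source starts without state.
        System.out.println(restore(savepoint, newJob, true));
    }
}
```

In this model the renamed source simply finds no state and falls back to its
configured starting offsets, which is why setting it to start from earliest
matters.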

Here is a similar question, where Martijn gave the same answer, but about
resetting Kafka topics:
https://lists.apache.org/thread/xcfjm23xk7xy9nh887pvsxbw9z649p3q

If you are interested, I talked about this exact problem at Flink Forward,
and about how we are trying to solve it with FLIP-246:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
The discussion thread is linked at the top if you want to give feedback
on the FLIP.

Best,
Mason



Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
I was wrong about this. The AS OF style processing-time join has been
disabled at a higher level, in
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator.

David


Switching kafka brokers

2022-10-06 Thread Lars Skjærven
Hello,

What is the recommended approach for migrating Flink jobs to a new Kafka
cluster? I was naively hoping to use Kafka MirrorMaker to sync the old
cluster with the new one, and simply continue from a savepoint with updated
URLs. Unfortunately, the Kafka offsets are not identical for log-compacted
topics when using MirrorMaker. Any tips?

L
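
Why the offsets diverge can be illustrated with a toy model. This is plain
Java, not Kafka or MirrorMaker code, and every name in it is hypothetical. It
assumes that compaction keeps the latest record per key at its original
offset, while mirroring re-produces the surviving records so that the target
cluster assigns fresh offsets.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Kafka code) of one partition before and after mirroring. */
final class CompactedOffsetsSketch {

    /** A stored record: its offset in the partition, its key, and its value. */
    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Compaction keeps the latest record per key; survivors keep their offsets. */
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key); // re-insert so iteration follows offset order
            latest.put(r.key, r);
        }
        return new ArrayList<>(latest.values());
    }

    /** Mirroring re-produces each record, so the target numbers them from 0. */
    static List<Rec> mirror(List<Rec> source) {
        List<Rec> target = new ArrayList<>();
        for (Rec r : source) {
            target.add(new Rec(target.size(), r.key, r.value));
        }
        return target;
    }

    /** Collects the offsets of a log in order. */
    static List<Long> offsets(List<Rec> log) {
        List<Long> result = new ArrayList<>();
        for (Rec r : log) {
            result.add(r.offset);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> written = List.of(
                new Rec(0, "a", "1"), new Rec(1, "b", "1"),
                new Rec(2, "a", "2"), new Rec(3, "b", "2"), new Rec(4, "c", "1"));
        List<Rec> oldCluster = compact(written);
        List<Rec> newCluster = mirror(oldCluster);
        System.out.println("old cluster offsets: " + offsets(oldCluster));
        System.out.println("new cluster offsets: " + offsets(newCluster));
    }
}
```

In this model a savepoint that recorded offset 3 on the old cluster would
point past the end of the mirrored partition, which is the kind of mismatch
described above.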


Sometimes checkpoints to s3 fail

2022-10-06 Thread Evgeniy Lyutikov
Hello all.
I'm seeing an intermittent problem that I can't explain: sometimes checkpoints
stop completing altogether, and sometimes only every other checkpoint completes.
We run Flink 1.14.4 in Kubernetes application mode.


2022-10-06 09:08:04,731 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 18314 (type=CHECKPOINT) @ 1665047284716 for job .
2022-10-06 09:11:29,130 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline checkpoint 18314 by task 048169f0e3c2efd473d3cef9c9d2cd70 of job at job-name-taskmanager-3-1 @ 10.109.0.168 (dataPort=43795).
org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:301) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 18314 for operator Process rec last clicks -> Cast rec last clicks type (30/44)#0.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to s3p://flink-checkpoints/k8s-checkpoint-job-name//shared/7c09fcf1-49b9-4b72-b756-81cd7778e396 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: Could not flush to file and close the file system output stream to s3p://flink-checkpoints/k8s-checkpoint-job-name//shared/7c09fcf1-49b9-4b72-b756-81cd7778e396 in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:373) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:143) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:101) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
    ... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: com.amazonaws.services.s3.model.AmazonS3Exception: This multipart completion is already in progress (Service: Amazon S3; Status Code: 500; Error Code: InternalError; Request ID: tx000ced9f8-00633e9bc1-18489a52-default; S3 Extended Request ID: 18489a52-default-default; Proxy: null), S3 Extended Request ID: 18489a52-default-default
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1278) ~[?:?]
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:1226) ~[?:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[?:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101) ~[?:?]
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52) ~[?:?]
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)

Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
As for your original question, the documentation states that a temporal
table function can only be registered via the Table API, and I believe this
is true.

David


Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
Salva,

Have you tried doing an AS OF style processing time temporal join? I know
the documentation leads one to believe this isn't supported, but I think it
actually works. I'm basing this on this comment [1] in the code for
the TemporalProcessTimeJoinOperator:

> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.
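
The inner versus left outer distinction in that comment can be illustrated
with a toy model of a processing-time temporal join. This is a sketch in plain
Java, not the Flink operator. The class `ProcTimeJoinSketch` and its methods
are hypothetical, and it assumes a single string key and string payloads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of a processing-time temporal join. */
final class ProcTimeJoinSketch {

    // Latest dimension row per key, as seen so far in processing time.
    private final Map<String, String> latestDim = new HashMap<>();
    private final boolean leftOuter;
    private final List<String> output = new ArrayList<>();

    ProcTimeJoinSketch(boolean leftOuter) {
        this.leftOuter = leftOuter;
    }

    /** A dimension update replaces the version that later facts will see. */
    void onDimension(String key, String value) {
        latestDim.put(key, value);
    }

    /** A fact joins with whatever dimension version is current when it arrives. */
    void onFact(String key, String fact) {
        String dim = latestDim.get(key);
        if (dim != null || leftOuter) {
            // A left outer join emits the fact with a null dimension side.
            output.add(fact + "|" + dim);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        for (boolean leftOuter : new boolean[] {false, true}) {
            ProcTimeJoinSketch join = new ProcTimeJoinSketch(leftOuter);
            join.onFact("k1", "f1");      // arrives before any dimension row
            join.onDimension("k1", "d1");
            join.onFact("k1", "f2");
            join.onDimension("k1", "d2");
            join.onFact("k1", "f3");
            System.out.println((leftOuter ? "left outer: " : "inner: ") + join.output());
        }
    }
}
```

With the inner variant the first fact is dropped because no dimension row
exists yet, which matches the behaviour of the DIY CoProcessFunction join
quoted further down in this thread.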


[1]
https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38

Regards,
David

On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara wrote:

> I've found more examples here:
>
> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>
> where a fact table is enriched using several dimension tables, but again
> the temporal table functions are registered using Table API like so:
>
> ```java
> tEnv.registerFunction(
>     "dimension_table1",
>     tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
> ```
>
> It's not exactly the same application, since this example covers a lookup
> join, but the SQL query is also relying on the LATERAL TABLE + temporal
> table functions:
>
> ```
> SELECT
>   D1.col1 AS A,
>   D1.col2 AS B
> FROM
>   fact_table,
>   LATERAL TABLE (dimension_table1(f_proctime)) AS D1
> WHERE
>   fact_table.dim1 = D1.id
> ```
>
> In particular, this produces a job which is equivalent to
>
> ```
> private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>     extends CoProcessFunction<IN1, Dimension, OUT> {
>   private static final long serialVersionUID = 1L;
>
>   // Latest dimension row seen for the current key.
>   protected transient ValueState<Dimension> dimState;
>
>   @Override
>   public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
>       throws Exception {
>     Dimension dim = dimState.value();
>     if (dim == null) {
>       // No dimension row has arrived yet for this key: drop the fact.
>       return;
>     }
>     out.collect(join(value, dim));
>   }
>
>   abstract OUT join(IN1 value, Dimension dim);
>
>   @Override
>   public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
>       throws Exception {
>     // Remember the newest dimension row; later facts join against it.
>     dimState.update(value);
>   }
>
>   @Override
>   public void open(Configuration parameters) throws Exception {
>     super.open(parameters);
>     ValueStateDescriptor<Dimension> dimStateDesc =
>         new ValueStateDescriptor<>("dimstate", Dimension.class);
>     this.dimState = getRuntimeContext().getState(dimStateDesc);
>   }
> }
> ```
>
> I'm basically interested in rewriting these types of DIY joins (based on
> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
> possible, otherwise I would like to know which limitations there are.
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara wrote:
>
>> By looking at the docs for older versions of Flink, e.g.,
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>
>> it seems that it's possible to rewrite this query
>>
>> ```
>> SELECT
>>   o.amount * r.rate AS amount
>> FROM
>>   Orders AS o,
>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>> WHERE r.currency = o.currency
>> ```
>>
>> as
>>
>> ```
>> SELECT
>>   SUM(o.amount * r.rate) AS amount
>> FROM Orders AS o,
>>   RatesHistory AS r
>> WHERE r.currency = o.currency
>> AND r.rowtime = (
>>   SELECT MAX(rowtime)
>>   FROM RatesHistory AS r2
>>   WHERE r2.currency = o.currency
>>   AND r2.rowtime <= o.rowtime);
>> ```
>>
>> This would be a way to accomplish this task in SQL without using a
>> temporal table function.
>>
>> Would this rewrite be equivalent in terms of the final generated job?
>> Obviously I very much prefer the LATERAL TABLE query but this requires
>> using a temporal table function which can only be registered using the
>> Table API (apparently).
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara wrote:
>>
>>> It doesn't seem the case with processing time unless I'm mistaken:
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>
>>> This case seems to require a different syntax based on LATERAL TABLE and
>>> a temporal table function (FOR SYSTEM_TIME is not supported). From the docs
>>> too, it seems that temporal table functions can only be registered via the
>>> table API. Am I missing/misunderstanding something?
>>>
>>> Salva
>>>
>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser wrote:
>>>
>>>> Hi Salva,
>>>>
>>>> The examples for temporal table