Re: Built-in functions to manipulate MULTISET type

2021-09-20 Thread Kai Fu
Hi Seth,

This is really helpful and inspiring, thank you for the information.

On Sun, Sep 19, 2021 at 11:06 PM Seth Wiesman  wrote:

> Hi,
>
> I agree it would be great to see these functions built-in, but you do not
> need to write a UDF for each type. You can overload a UDF's type inference
> and have the same capabilities as built-in functions, which means
> supporting generics.
>
>
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java
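As a rough illustration of that idea, a generic "multiset keys" function
can derive its output type from the actual argument at planning time. This
is a minimal, untested sketch assuming Flink 1.13's TypeInference API; the
class and function names are made up for this example, and the output
conversion of the generic array may need further bridging in practice:

import java.util.Map;
import java.util.Optional;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

// One eval method covers all key types; the custom type inference below
// maps MULTISET<T> to ARRAY<T> instead of requiring one overload per type.
public class MultisetKeysFunction extends ScalarFunction {

    // A MULTISET<T> argument arrives as a Map<T, Integer> (element -> count).
    public Object[] eval(Map<Object, Integer> multiset) {
        return multiset == null ? null : multiset.keySet().toArray();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .inputTypeStrategy(InputTypeStrategies.sequence(InputTypeStrategies.ANY))
                .outputTypeStrategy(callContext -> {
                    DataType arg = callContext.getArgumentDataTypes().get(0);
                    if (arg.getLogicalType().getTypeRoot() != LogicalTypeRoot.MULTISET) {
                        return Optional.empty(); // reject non-MULTISET arguments
                    }
                    DataType element = ((CollectionDataType) arg).getElementDataType();
                    return Optional.of(DataTypes.ARRAY(element));
                })
                .build();
    }
}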
>
> On Sat, Sep 18, 2021 at 7:42 AM Yuval Itzchakov  wrote:
>
>> Hi Jing,
>>
>> I recall there is already an open ticket for built-in aggregate functions
>>
>> On Sat, Sep 18, 2021, 15:08 JING ZHANG  wrote:
>>
>>> Hi Yuval,
>>> You could open a JIRA to track this if you think some functions should
>>> be added as built-in functions in Flink.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Sat, Sep 18, 2021 at 3:33 PM, Yuval Itzchakov wrote:
>>>
>>>> The problem with defining a UDF is that you have to create one overload
>>>> per key type in the MULTISET. It would be very convenient to have functions
>>>> like Snowflake's ARRAY_AGG.
>>>>
>>>> On Sat, Sep 18, 2021, 05:43 JING ZHANG  wrote:
>>>>
>>>>> Hi Kai,
>>>>> AFAIK, there is no built-in function to extract the keys in MULTISET
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/>
>>>>> to be an ARRAY. Defining a UDF is a good solution.
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>> On Sat, Sep 18, 2021 at 7:35 AM, Kai Fu wrote:
>>>>>
>>>>>> Hi team,
>>>>>>
>>>>>> We want to know if there is any built-in function to extract the keys
>>>>>> in MULTISET
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/>
>>>>>> to be an ARRAY. There is no such function as far as we can find, except
>>>>>> to define a simple wrapper UDF for it. Please advise.
>>>>>>
>>>>>> --
>>>>>> *Best wishes,*
>>>>>> *- Kai*
>>>>>>
>>>>>

-- 
*Best wishes,*
*- Kai*


Built-in functions to manipulate MULTISET type

2021-09-17 Thread Kai Fu
Hi team,

We want to know if there is any built-in function to extract the keys in
MULTISET
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/>
to be an ARRAY. There is no such function as far as we can find, except to
define a simple wrapper UDF for it. Please advise.
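For reference, the kind of simple wrapper UDF meant here could look like the
following (an illustrative sketch for STRING keys only; as the replies above
point out, every other key type would need its own overload):

import java.util.Map;
import org.apache.flink.table.functions.ScalarFunction;

// MULTISET<STRING> arrives as Map<String, Integer>; return its keys as ARRAY<STRING>.
public class StringMultisetKeys extends ScalarFunction {
    public String[] eval(Map<String, Integer> multiset) {
        return multiset == null ? null : multiset.keySet().toArray(new String[0]);
    }
}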

-- 
*Best wishes,*
*- Kai*


Inspecting SST state of rocksdb

2021-08-08 Thread Kai Fu
Hi team,

I'm trying to inspect the SST files of Flink's state with SST-related tools
from the RocksDB wiki, such as sst_dump and ldb.
But it seems I'm getting meaningless results, as shown below. The tools I'm
using are built from source off RocksDB's trunk. Am I doing this the right
way, or is there any alternative way to inspect the state? We're aware of
Flink's queryable state, but it seems not well supported for SQL-generated
operators.

$ ./sst_dump --file=../db/30.sst --command=scan --read_num=50
options.env is 0xba33e0
Process ../db/30.sst
Sst file format: block-based
from [] to []
'� => '� => '� => '� => '� => '� => '� => '� => '� => '� => '� =>
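The raw bytes are expected to look opaque: Flink writes RocksDB keys and
values through its own TypeSerializers, so interpreting a dumped value needs
the exact serializer that wrote it. A self-contained sketch of that
round-trip (illustrative only, assuming a ValueState<Long> entry):

import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class StateBytesDemo {
    public static void main(String[] args) throws Exception {
        // Roughly what Flink writes for a ValueState<Long> value.
        DataOutputSerializer out = new DataOutputSerializer(16);
        LongSerializer.INSTANCE.serialize(42L, out);
        byte[] storedBytes = out.getCopyOfBuffer(); // the opaque bytes sst_dump prints

        // Reading the value back requires the same serializer.
        DataInputDeserializer in = new DataInputDeserializer(storedBytes);
        System.out.println(LongSerializer.INSTANCE.deserialize(in)); // prints 42
    }
}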

-- 
*Best wishes,*
*- Kai*


Re: Regarding state access in UDF

2021-07-01 Thread Kai Fu
Hi Ingo,

Thank you for your advice. We haven't actually tried it yet; we just assumed
it might work that way, but apparently it does not. We'll see how our use
case can be matched to the AggregateFunction interface.

On Thu, Jul 1, 2021 at 1:57 PM Ingo Bürk  wrote:

> Hi Kai,
>
> CheckpointedFunction is not an interface meant to be used with UDFs (in
> the Table API / SQL sense[1]), but rather an interface for the DataStream
> API[2]; the term "user-defined function" has a different meaning there. Did
> you actually try it to see if it works? I'd be surprised if it did.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/user_defined_functions/
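For context, a minimal sketch of that DataStream-side interface, along the
lines of the BufferingSink example in the documentation linked above
(illustrative only, not the original poster's code):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A DataStream sink that buffers elements and checkpoints the buffer.
public class BufferingSink implements SinkFunction<Long>, CheckpointedFunction {

    private transient ListState<Long> checkpointedState;
    private final List<Long> buffer = new ArrayList<>();

    @Override
    public void invoke(Long value, Context context) {
        buffer.add(value); // flush logic omitted for brevity
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        checkpointedState.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-elements", Long.class));
        if (context.isRestored()) {
            for (Long element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}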
>
>
> Ingo
>
> On Thu, Jul 1, 2021 at 5:17 AM Kai Fu  wrote:
>
>> Hi Ingo,
>>
>> Thank you for the reply. We actually need more fine-grained control over
>> the state in the UDF. Per our investigation, the state can simply be
>> created/accessed by implementing the `CheckpointedFunction` interface;
>> please advise if there are any side effects to doing that.
>>
>> On Wed, Jun 30, 2021 at 10:33 PM Ingo Bürk  wrote:
>>
>>> Hi Kai,
>>>
>>> AggregateFunction and TableAggregateFunction are both stateful UDF
>>> interfaces. This should cover most scenarios given where they would be
>>> used. If you need more fine-grained control you can also always drop down
>>> into the DataStream API (using #toDataStream) and work there. Table API /
>>> SQL in general are higher-level abstractions where you cannot directly
>>> interact with operators.
>>>
>>> If this doesn't answer your question it would also be great if you could
>>> explain your use case more so we can understand it. Thanks!
>>>
>>>
>>> Best
>>> Ingo
>>>
>>> On Wed, Jun 30, 2021 at 3:37 PM Kai Fu  wrote:
>>>
>>>> Hi team,
>>>>
>>>> We have a use case that needs to create/access state in a UDF, but per
>>>> the documentation
>>>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/#runtime-integration>
>>>> and the UDF interface
>>>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java>,
>>>> there is no way to do that. We want to know if this is by design and
>>>> whether there is any other approach.
>>>>
>>>> --
>>>> *Best wishes,*
>>>> *- Kai*
>>>>
>>>
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*


Re: Regarding state access in UDF

2021-06-30 Thread Kai Fu
Hi Ingo,

Thank you for the reply. We actually need more fine-grained control over the
state in the UDF. Per our investigation, the state can simply be
created/accessed by implementing the `CheckpointedFunction` interface;
please advise if there are any side effects to doing that.

On Wed, Jun 30, 2021 at 10:33 PM Ingo Bürk  wrote:

> Hi Kai,
>
> AggregateFunction and TableAggregateFunction are both stateful UDF
> interfaces. This should cover most scenarios given where they would be
> used. If you need more fine-grained control you can also always drop down
> into the DataStream API (using #toDataStream) and work there. Table API /
> SQL in general are higher-level abstractions where you cannot directly
> interact with operators.
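For illustration, state in the Table API / SQL sense lives in the
accumulator of such a function, which Flink checkpoints automatically. A
minimal sketch (an illustrative counting aggregate, not from the original
thread):

import org.apache.flink.table.functions.AggregateFunction;

public class CountNonNull extends AggregateFunction<Long, CountNonNull.Acc> {

    public static class Acc {
        public long count; // the accumulator is the function's managed state
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String value) {
        if (value != null) {
            acc.count++;
        }
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.count;
    }
}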
>
> If this doesn't answer your question it would also be great if you could
> explain your use case more so we can understand it. Thanks!
>
>
> Best
> Ingo
>
> On Wed, Jun 30, 2021 at 3:37 PM Kai Fu  wrote:
>
>> Hi team,
>>
>> We have a use case that needs to create/access state in a UDF, but per the
>> documentation
>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/#runtime-integration>
>> and the UDF interface
>> <https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java>,
>> there is no way to do that. We want to know if this is by design and
>> whether there is any other approach.
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*


Regarding state access in UDF

2021-06-30 Thread Kai Fu
Hi team,

We have a use case that needs to create/access state in a UDF, but per the
documentation
<https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/#runtime-integration>
and the UDF interface
<https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java>,
there is no way to do that. We want to know if this is by design and
whether there is any other approach.

-- 
*Best wishes,*
*- Kai*


Re: Questions about UPDATE_BEFORE row kind in ElasticSearch sink

2021-06-29 Thread Kai Fu
Thank you for the reply, Jark. In our case, we found that there are no
UPDATE_BEFORE records generated since the join is using -D/+I row kinds.

> Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents
> "UPDATE_AFTER", "-U" represents "UPDATE_BEFORE". We forward input RowKind
> if it is inner join, otherwise, we always send insert and delete for
> simplification. We can optimize this to send -U & +U instead of D & I in
> the future (see FLINK-17337). They are equivalent in this join case. It
> may need some refactoring if we want to send -U & +U, so we still keep
> -D & +I for now for simplification.

On Mon, Jun 28, 2021 at 2:21 PM Jark Wu  wrote:

> UPDATE_BEFORE is required in cases such as Aggregation with Filter. For
> example:
>
> SELECT *
> FROM (
>   SELECT word, count(*) as cnt
>   FROM T
>   GROUP BY word
> ) WHERE cnt < 3;
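To make this concrete, an illustrative changelog trace for that query,
assuming three 'hello' rows arrive in T:

  +I (hello, 1)   -- cnt = 1 passes the filter, row is emitted
  -U (hello, 1)   -- second row: retract the old count ...
  +U (hello, 2)   -- ... and emit the new one, still < 3
  -U (hello, 2)   -- third row: this retraction removes (hello, 2) downstream
  +U (hello, 3)   -- filtered out by cnt < 3, nothing is emitted

Without the UPDATE_BEFORE record -U (hello, 2), the sink would keep the
stale (hello, 2) row forever once the count crosses the threshold.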
>
> There is more discussion in this issue:
> https://issues.apache.org/jira/browse/FLINK-9528
>
> Best,
> Jark
>
> On Mon, 28 Jun 2021 at 13:52, Kai Fu  wrote:
>
>> Hi team,
>>
>> We notice that the UPDATE_BEFORE row kind is not dropped but treated as a
>> DELETE in the code
>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L81-L84>.
>> We're aware that this is useful for retracting output records in some
>> cases, but we cannot come up with such a scenario. Could anyone name a few
>> such cases?
>>
>> The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
>> connector to reduce the sink traffic, since almost all of our records are
>> updates. In our case, the records are generated by joining a couple of
>> upsert-kafka data sources. Only primary keys participate in the join
>> condition in all join cases, with some granularity/cardinality fan-out in
>> the middle. We want to know whether it impacts the correctness of the
>> final result if we drop the records with the UPDATE_BEFORE row kind.
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*


Questions about UPDATE_BEFORE row kind in ElasticSearch sink

2021-06-27 Thread Kai Fu
Hi team,

We notice that the UPDATE_BEFORE row kind is not dropped but treated as a
DELETE in the code
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L81-L84>.
We're aware that this is useful for retracting output records in some cases,
but we cannot come up with such a scenario. Could anyone name a few such
cases?

The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
connector to reduce the sink traffic, since almost all of our records are
updates. In our case, the records are generated by joining a couple of
upsert-kafka data sources. Only primary keys participate in the join
condition in all join cases, with some granularity/cardinality fan-out in
the middle. We want to know whether it impacts the correctness of the final
result if we drop the records with the UPDATE_BEFORE row kind.
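One way to experiment with this (a hedged sketch, not an existing connector
option: the query string and sink wiring are placeholders, and whether
dropping -U is safe is exactly the correctness question above) is to leave
SQL at the end, filter the changelog on RowKind, and attach the sink
manually:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class DropUpdateBefore {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Table result = tEnv.sqlQuery("SELECT ..."); // placeholder join query

        // Drop UPDATE_BEFORE rows before they reach the sink.
        DataStream<Row> withoutUpdateBefore =
                tEnv.toChangelogStream(result)
                        .filter(row -> row.getKind() != RowKind.UPDATE_BEFORE);

        // withoutUpdateBefore.addSink(...); env.execute();
    }
}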

-- 
*Best wishes,*
*- Kai*


Re: Elasticsearch sink connector timeout

2021-06-05 Thread Kai Fu
:1.13.1]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]

2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly
org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly
at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]

On Sat, Jun 5, 2021 at 12:13 PM Kai Fu  wrote:

> Hi team,
>
> We encounter ES sink connector timeouts quite frequently. As far as we can
> tell, the ES cluster is far from loaded (~40% CPU utilization, no queries,
> and a low index rate). We're using the ES-7 connector, with 12 data nodes
> and a parallelism of 32.
>
> The error log is below; we want to know if there is any way to locate the
> issue or configure the timeout parameter.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
>
> 2021-06-05 11:49:10
> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
> at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)

Elasticsearch sink connector timeout

2021-06-04 Thread Kai Fu
Hi team,

We encounter ES sink connector timeouts quite frequently. As far as we can
tell, the ES cluster is far from loaded (~40% CPU utilization, no queries,
and a low index rate). We're using the ES-7 connector, with 12 data nodes
and a parallelism of 32.

The error log is below; we want to know if there is any way to locate the
issue or configure the timeout parameter.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
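For reference, the knobs the 1.13 table connector does expose are the
bulk-flush and backoff options; below is a hedged sketch with placeholder
host/index/values (the underlying REST client's socket timeout was, as far
as we know, not a table option in this version):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsSinkOptions {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Tune batching and retry/backoff behavior of the bulk processor.
        tEnv.executeSql(
                "CREATE TABLE es_sink (" +
                "  user_id STRING," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'elasticsearch-7'," +
                "  'hosts' = 'http://example-host:9200'," + // placeholder
                "  'index' = 'example-index'," +            // placeholder
                "  'sink.bulk-flush.max-actions' = '1000'," +
                "  'sink.bulk-flush.interval' = '1s'," +
                "  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'," +
                "  'sink.bulk-flush.backoff.max-retries' = '3'" +
                ")");
    }
}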

2021-06-05 11:49:10
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at
Re: Flink exported metrics scope configuration

2021-06-04 Thread Kai Fu
Hi Mason,

Thank you for the advice. I tried it; it works and reduces the size a lot.

On Fri, Jun 4, 2021 at 11:45 AM Mason Chen  wrote:

> Hi Kai,
>
> You can use the excluded variables config for the reporter.
>
>    - metrics.reporter.<name>.scope.variables.excludes: (optional) A
>    semi-colon (;) separated list of variables that should be ignored by
>    tag-based reporters (e.g., Prometheus, InfluxDB).
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#reporter
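An illustrative flink-conf.yaml line (the reporter name "prom" is a
placeholder; task_name and operator_name are standard scope variables):

metrics.reporter.prom.scope.variables.excludes: task_name;operator_name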
>
> Best,
> Mason
>
> On Jun 3, 2021, at 9:31 PM, Kai Fu  wrote:
>
> Hi team,
>
> We noticed that the Prometheus metrics exporter exports all of the metrics
> at the most fine-grained level, which is overwhelming for the Prometheus
> server, especially when the parallelism is high. The metrics volume crawled
> from a single host (parallelism 8) is around 40MB for us currently. This is
> due to the *task_name* attribute in the metrics generated by the engine
> being very long. The task_name attribute is auto-generated from the SQL
> job, and it seems to attach all field names to it.
>
> We want to reduce the metrics volume by either dropping task_name or
> reporting at some more coarse-grained level, but I cannot find any related
> documents about this. Any advice?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#list-of-all-variables
>
> --
> *Best wishes,*
> *- Kai*
>
>
>

-- 
*Best wishes,*
*- Kai*


Flink exported metrics scope configuration

2021-06-03 Thread Kai Fu
Hi team,

We noticed that the Prometheus metrics exporter exports all of the metrics
at the most fine-grained level, which is overwhelming for the Prometheus
server, especially when the parallelism is high. The metrics volume crawled
from a single host (parallelism 8) is around 40MB for us currently. This is
due to the *task_name* attribute in the metrics generated by the engine
being very long. The task_name attribute is auto-generated from the SQL
job, and it seems to attach all field names to it.

We want to reduce the metrics volume by either dropping task_name or
reporting at some more coarse-grained level, but I cannot find any related
documents about this. Any advice?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#list-of-all-variables

-- 
*Best wishes,*
*- Kai*


Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-06-01 Thread Kai Fu
Hi Jing,

Thank you for your reply. That cluster has been terminated; we will provide
the logs if it occurs again.

On Wed, Jun 2, 2021 at 11:17 AM JING ZHANG  wrote:

> Hi Kai,
> The reason the job cannot be recovered may not be directly related to
> the exception you mentioned in your email.
> Would you like to provide the complete jobmanager.log and taskmanager.log?
> Maybe we could find some hints there.
>
> Best regards,
> JING ZHANG
>
> On Wed, Jun 2, 2021 at 7:23 AM, Kai Fu wrote:
>
>> HI Till,
>>
>> Thank you for your response. Per my observation, the process lasted for
>> ~1 day and could not be recovered, so we finally killed the cluster.
>>
>> On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Kai,
>>>
>>> The rejection you are seeing should not be serious. The way this can
>>> happen is the following: if Yarn restarts the application master, Flink
>>> will try to recover previously started containers. If this is not
>>> possible, or Yarn only tells it about a subset of the previously allocated
>>> containers, then a container that has not been reported to the new
>>> ResourceManager is rejected when it tries to register, because it is not
>>> known. The idea behind this behaviour is to only accept those resources
>>> which one has knowingly requested, in order to free other resources which
>>> might belong to another Yarn application.
>>>
>>> In any case, the newly started Flink ResourceManager should request new
>>> containers so that there are enough TaskManagers available to run your job
>>> (assuming that the Yarn cluster has enough resources). Hence, the cluster
>>> should recover from this situation and there should not be a lot to worry
>>> about.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, May 30, 2021 at 7:36 AM Kai Fu  wrote:
>>>
>>>> Hi team,
>>>>
>>>> We encountered an issue during recovery from a checkpoint. It is
>>>> recovering because the downstream Kafka sink was full for a while (about
>>>> 4 hours), so the job failed and kept trying to recover. The job cannot
>>>> recover from the checkpoint successfully, even after we scaled up the
>>>> Kafka cluster, and it shows the following exception. Is there any
>>>> guidance on how to locate and avoid this kind of issue?
>>>>
>>>> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
>>>> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>>>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>>>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>>>> at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>> at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>>>> at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[

Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-06-01 Thread Kai Fu
HI Till,

Thank you for your response. Per my observation, the process lasted for
~1 day and could not be recovered, so we finally killed the cluster.

On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann  wrote:

> Hi Kai,
>
> The rejection you are seeing should not be serious. The way this can
> happen is the following: if Yarn restarts the application master, Flink
> will try to recover previously started containers. If this is not possible,
> or Yarn only tells it about a subset of the previously allocated containers,
> then a container that has not been reported to the new ResourceManager is
> rejected when it tries to register, because it is not known. The idea
> behind this behaviour is to only accept those resources which one has
> knowingly requested, in order to free other resources which might belong
> to another Yarn application.
>
> In any case, the newly started Flink ResourceManager should request new
> containers so that there are enough TaskManagers available to run your job
> (assuming that the Yarn cluster has enough resources). Hence, the cluster
> should recover from this situation and there should not be a lot to worry
> about.
>
> Cheers,
> Till
>
> On Sun, May 30, 2021 at 7:36 AM Kai Fu  wrote:
>
>> Hi team,
>>
>> We encountered an issue during recovery from a checkpoint. It is
>> recovering because the downstream Kafka sink was full for a while (about
>> 4 hours), so the job failed and kept trying to recover. The job cannot
>> recover from the checkpoint successfully, even after we scaled up the
>> Kafka cluster, and it shows the following exception. Is there any
>> guidance on how to locate and avoid this kind of issue?
>>
>> 2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
>> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
>> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
>> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
>> at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>> at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>> at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>> at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>> at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
>> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2

Re: Dynamic configuration of Flink checkpoint interval

2021-05-30 Thread Kai Fu
Hi JING,

Here is the issue link: https://issues.apache.org/jira/browse/FLINK-22805

On Mon, May 31, 2021 at 10:21 AM JING ZHANG  wrote:

> Hi Kai,
>
> Happy to hear that.
> Would you please paste the JIRA link in the email after you create it.
> Maybe it could help other users who encounter the same problem. Thanks very
> much.
>
> Best regards,
> JING ZHANG
>
> On Sun, May 30, 2021 at 11:19 PM, Kai Fu wrote:
>
>> Hi Jing,
>>
>> Yup, what you're describing is what I want. I also tried the approach you
>> suggested and it works. I'm going to take that approach for the moment and
>> create a Jira issue for this feature.
>>
>> On Sun, May 30, 2021 at 8:57 PM JING ZHANG  wrote:
>>
>>> Hi Kai,
>>>
>>> Are you trying to find a way to hot-update the checkpoint interval, or to
>>> disable/enable checkpointing, without stopping and restarting the job?
>>> Unfortunately, that is not supported yet, AFAIK.
>>> You're very welcome to create an issue describing your needs in Flink's
>>> Jira <http://issues.apache.org/jira/browse/FLINK>.
>>> At present, you may want to use the following temporary solution:
>>>   1. set a bigger value as the checkpoint interval and start your job
>>>   2. take a savepoint after the cold start is completed
>>>   3. set a normal value as the checkpoint interval and restart the job
>>> from the savepoint
>>>
>>> Best regards,
>>> JING ZHANG
>>>
>>> On Sun, May 30, 2021 at 7:13 PM, Kai Fu wrote:
>>>
>>>> Hi team,
>>>>
>>>> We want to know if Flink has some dynamic configuration of the
>>>> checkpoint interval. Our use case has a cold start phase where the entire
>>>> dataset is replayed from the beginning until the most recent ones.
>>>>
>>>> In the cold start phase, the resources are fully utilized and the
>>>> backpressure is high for all upstream operators, causing checkpoints to
>>>> time out constantly. The real production traffic is far less than that,
>>>> and the currently provisioned resources are capable of handling it.
>>>>
>>>> We're thinking if Flink can support the dynamic checkpoint config to
>>>> bypass the checkpoint operation or make it less frequent on the cold start
>>>> phase to speed up the process, while making the checkpoint normal again
>>>> once the cold start is completed.
>>>>
>>>> --
>>>> *Best wishes,*
>>>> *- Kai*
>>>>
>>>
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*


Re: Dynamic configuration of Flink checkpoint interval

2021-05-30 Thread Kai Fu
Hi Jing,

Yup, what you're describing is what I want. I also tried the approach you
suggested and it works. I'm going to take that approach for the moment and
create a Jira issue for this feature.

On Sun, May 30, 2021 at 8:57 PM JING ZHANG  wrote:

> Hi Kai,
>
> Are you trying to find a way to hot-update the checkpoint interval, or to
> disable/enable checkpointing, without stopping and restarting the job?
> Unfortunately, that is not supported yet, AFAIK.
> You're very welcome to create an issue describing your needs in Flink's
> Jira <http://issues.apache.org/jira/browse/FLINK>.
> At present, you may want to use the following temporary solution:
>   1. set a bigger value as the checkpoint interval and start your job
>   2. take a savepoint after the cold start is completed
>   3. set a normal value as the checkpoint interval and restart the job
> from the savepoint
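Illustrative CLI steps for that workaround (paths, job ID, and interval
values are placeholders):

# job started with a large interval, e.g. execution.checkpointing.interval: 1h
# once the cold start is done, stop with a savepoint:
./bin/flink stop --savepointPath s3://bucket/savepoints <jobId>
# restart from the savepoint with the normal interval restored:
./bin/flink run -s s3://bucket/savepoints/savepoint-xxxx job.jar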
>
> Best regards,
> JING ZHANG
>
> On Sun, May 30, 2021 at 7:13 PM, Kai Fu wrote:
>
>> Hi team,
>>
>> We want to know if Flink has some dynamic configuration of the checkpoint
>> interval. Our use case has a cold start phase where the entire dataset is
>> replayed from the beginning until the most recent ones.
>>
>> In the cold start phase, the resources are fully utilized and the
>> backpressure is high for all upstream operators, causing checkpoints to
>> time out constantly. The real production traffic is far less than that,
>> and the currently provisioned resources are capable of handling it.
>>
>> We're thinking if Flink can support the dynamic checkpoint config to
>> bypass the checkpoint operation or make it less frequent on the cold start
>> phase to speed up the process, while making the checkpoint normal again
>> once the cold start is completed.
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>

-- 
*Best wishes,*
*- Kai*


Dynamic configuration of Flink checkpoint interval

2021-05-30 Thread Kai Fu
Hi team,

We want to know if Flink has some dynamic configuration of the checkpoint
interval. Our use case has a cold start phase where the entire dataset is
replayed from the beginning until the most recent ones.

In the cold start phase, the resources are fully utilized and the
backpressure is high for all upstream operators, causing checkpoints to
time out constantly. The real production traffic is far less than that,
and the currently provisioned resources are capable of handling it.

We're thinking if Flink can support the dynamic checkpoint config to bypass
the checkpoint operation or make it less frequent on the cold start phase
to speed up the process, while making the checkpoint normal again once the
cold start is completed.

-- 
*Best wishes,*
*- Kai*


Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-05-29 Thread Kai Fu
Hi team,

We encountered an issue during recovery from a checkpoint. It is recovering
because the downstream Kafka sink was full for a while (about 4 hours), so
the job failed and kept trying to recover. The job cannot recover from the
checkpoint successfully, even after we scaled up the Kafka cluster, and it
shows the following exception. Is there any guidance on how to locate and
avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30

Re: UniqueKey constraint is lost with multiple sources join in SQL

2021-04-08 Thread Kai Fu
As identified with the community, it's a bug; more information is in
https://issues.apache.org/jira/browse/FLINK-22113

On Sat, Apr 3, 2021 at 8:43 PM Kai Fu  wrote:

> Hi team,
>
> We have a use case of joining multiple data sources to generate a
> continuously updated view. We defined primary key constraints on all the
> input sources, and all the keys are subsets of the join condition. All
> joins are left joins.
>
> In our case, the first two inputs produce a *JoinKeyContainsUniqueKey*
> input spec, which is good and performant. But when it comes to the third
> input source, it is joined with the intermediate output table of the first
> two input tables, and the intermediate table does not carry key-constraint
> information (although the third source input table does), so it results in
> a *NoUniqueKey* input spec. Given that NoUniqueKey inputs have dramatic
> performance implications per the Force Join Unique Key
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651>
> email thread, we want to know if there is any mitigation plan for this.
>
> One solution I can come up with is to write the intermediate result to
> somewhere like Kafka with a unique-key constraint and join with the third
> source, though it requires extra resources. Any other suggestions on this?
> Thanks.
>
> --
> *Best regards,*
> *- Kai*
>


-- 
*Best wishes,*
*- Kai*


Re: Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Kai Fu
Thank you for the clarification, Yun; it helps.

*-- Best wishes*
*Kai*

On Mon, Apr 5, 2021 at 12:03 PM Yun Gao  wrote:

> Hi Kai,
>
> Yes, you are basically right; one minor point is that the start time is
> taken as the time the checkpoint gets initiated on the JM side.
>
> Best,
>  Yun
>
>
> --Original Mail ------
> *Sender:*Kai Fu 
> *Send Date:*Mon Apr 5 09:31:58 2021
> *Recipients:*user 
> *Subject:*Re: Meaning of checkpointStartDelayNanos
>
>> I found its meaning in the code
>> <https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L887-L892>.
>> It means the delay until the checkpoint action happens at the current
>> operator, measured from when the checkpoint was initiated at the source to
>> when the barrier reaches the operator.
>>
>>
>> On Mon, Apr 5, 2021 at 9:21 AM Kai Fu  wrote:
>>
>>> Hi team,
>>>
>>> I'm a little confused by the meaning of *checkpointStartDelayNanos*. I
>>> do not understand what time it exactly measures, but it seems to be a
>>> quite important indicator for checkpointing/backpressure. The explanation
>>> of it on the metrics page
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html>
>>> does not help much. Can someone help to explain it more clearly?
>>>
>>> --
>>> *Best regards,*
>>> *- Kai*
>>>
>>
>>
>> --
>> *Best regards,*
>> *- Kai*
>>
>

-- 
*Best regards,*
*- Kai*


Re: Questions about checkpointAlignmentTime in unaligned checkpoint

2021-04-04 Thread Kai Fu
Hi Yun,

Thank you for the explanation; it clarifies a lot.

*-- Best wishes*
*Kai*

On Mon, Apr 5, 2021 at 12:13 PM Yun Gao  wrote:

> Hi Kai,
>
> Under unaligned checkpoint settings, there is still an alignment process.
> Although the task can snapshot the state of the operators on receiving the
> first barrier and emit barriers to the following tasks, it still needs to
> wait until all the barriers are received before finalizing the checkpoint.
> During this process, it needs to snapshot the buffers that are skipped by
> the barrier, and the final snapshot is composed of both the operator
> snapshots and the snapshots of the skipped buffers.
>
> Therefore, the *checkpointAlignmentTime* metric still exists.
>
> Best,
> Yun
>
>
> ------Original Mail --
> *Sender:*Kai Fu 
> *Send Date:*Mon Apr 5 09:18:39 2021
> *Recipients:*user 
> *Subject:*Questions about checkpointAlignmentTime in unaligned checkpoint
>
>> Hi team,
>>
>> I'm observing that the metrics reporter still emits the
>> *checkpointAlignmentTime* metric in the unaligned checkpoint setting, as
>> shown in the figure below. Is it meaningful for unaligned checkpoints? I
>> had assumed the alignment operation only happens in aligned checkpoints.
>>
>> [image: image.png]
>>
>> --
>> *Best regards,*
>> *- Kai*
>>
>

-- 
*Best regards,*
*- Kai*


Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Kai Fu
I found its meaning in the code
<https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L887-L892>.
It means the delay until the checkpoint action happens at the current
operator, measured from when the checkpoint was initiated at the source to
when the barrier reaches the operator.


On Mon, Apr 5, 2021 at 9:21 AM Kai Fu  wrote:

> Hi team,
>
> I'm a little confused by the meaning of *checkpointStartDelayNanos*. I do
> not understand what time it exactly measures, but it seems to be a quite
> important indicator for checkpointing/backpressure. The explanation of it
> on the metrics page
> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html>
> does not help much. Can someone help to explain it more clearly?
>
> --
> *Best regards,*
> *- Kai*
>


-- 
*Best regards,*
*- Kai*


Meaning of checkpointStartDelayNanos

2021-04-04 Thread Kai Fu
Hi team,

I'm a little confused by the meaning of *checkpointStartDelayNanos*. I do
not understand what time it exactly measures, but it seems to be a quite
important indicator for checkpointing/backpressure. The explanation of it
on the metrics page
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html>
does not help much. Can someone help to explain it more clearly?

-- 
*Best regards,*
*- Kai*


UniqueKey constraint is lost with multiple sources join in SQL

2021-04-03 Thread Kai Fu
Hi team,

We have a use case of joining multiple data sources to generate a
continuously updated view. We defined primary key constraints on all the
input sources, and all the keys are subsets of the join condition. All
joins are left joins.

In our case, the first two inputs produce a *JoinKeyContainsUniqueKey*
input spec, which is good and performant. But when it comes to the third
input source, it is joined with the intermediate output table of the first
two input tables, and the intermediate table does not carry key-constraint
information (although the third source input table does), so it results in
a *NoUniqueKey* input spec. Given that NoUniqueKey inputs have dramatic
performance implications per the Force Join Unique Key
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651>
email thread, we want to know if there is any mitigation plan for this.

One solution I can come up with is to write the intermediate result to
somewhere like Kafka with a unique-key constraint and join with the third
source, though it requires extra resources. Any other suggestions on this?
Thanks.
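A sketch of that workaround (hedged: the table schema, topic, and broker
address are placeholders): materialize the intermediate join result into an
upsert-kafka table whose PRIMARY KEY restores the unique-key information
for the next join:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IntermediateUpsertTable {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE intermediate (" +
                "  k1 STRING, k2 STRING, v STRING," +
                "  PRIMARY KEY (k1, k2) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'intermediate'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");
        // INSERT INTO intermediate SELECT ... (the first two-way join), then
        // join the third source against `intermediate`.
    }
}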

-- 
*Best regards,*
*- Kai*