Re: Using Flink's Kubernetes API inside Java

2021-07-02 Thread Alexis Sarda-Espinosa
Hi Roman,

In the operator I mentioned I see logic like the one here: 
https://github.com/wangyang0918/flink-native-k8s-operator/blob/a60a9826d4bcdaa4f23cf296d95954b9f6f328c3/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L169

For instance, DefaultClusterClientServiceLoader is annotated with @Internal: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.html

And just like ApplicationClusterDeployer is in charge of Application Mode
(according to the Javadoc), I was wondering if there's something similar
for Session Mode.

Perhaps I should subscribe to the developer mailing list? Although I guess 
that's part of the question, whether those classes count as User API or 
Developer API.

I took a quick glance at the operator you mentioned, but I'm hoping I can make 
use of Flink's new support for pod templates to make it as similar as possible 
to a native Deployment resource.
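For what it's worth, the session-mode counterpart seems reachable through the same service-loader machinery. A sketch against Flink 1.13 (untested, and it relies on the same @Internal classes discussed above, so the stability caveat applies; the cluster id is a made-up example):

```java
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public class SessionDeploySketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "kubernetes-session" selects the native K8s session target
        conf.set(DeploymentOptions.TARGET, "kubernetes-session");
        conf.setString("kubernetes.cluster-id", "my-session-cluster");

        DefaultClusterClientServiceLoader loader = new DefaultClusterClientServiceLoader();
        ClusterClientFactory<String> factory = loader.getClusterClientFactory(conf);
        try (ClusterDescriptor<String> descriptor = factory.createClusterDescriptor(conf)) {
            // Session-mode counterpart of what ApplicationClusterDeployer does
            descriptor.deploySessionCluster(factory.getClusterSpecification(conf));
        }
    }
}
```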

Regards,
Alexis.

From: Roman Khachatryan 
Sent: Friday, July 2, 2021 9:19 PM
To: Alexis Sarda-Espinosa ; Yang Wang 

Cc: user@flink.apache.org 
Subject: Re: Using Flink's Kubernetes API inside Java

Hi Alexis,

Have you looked at flink-on-k8s-operator [1]?
It seems to have the functionality you need:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/controllers/flinkcluster_reconciler.go#L569

I couldn't find many Flink-specific classes in the operator you
mentioned, but in general classes annotated with Public and
PublicEvolving are unlikely to change if that's your concern.

Also pulling in Yang Wang.

[1]
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/


Regards,
Roman


On Thu, Jul 1, 2021 at 7:49 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello everyone,
>
> I'm testing a custom Kubernetes operator that should fulfill some specific 
> requirements I have for Flink. I know of this WIP project: 
> https://github.com/wangyang0918/flink-native-k8s-operator
>
> I can see that it uses some classes that aren't publicly documented, and I 
> believe it doesn't cover Flink K8s sessions, which I would like to use.
>
> Is there something I could use for Flink K8s sessions? And is it ok if I use 
> these classes knowing that I might need adjustments for future Flink versions?
>
> Regards,
> Alexis.
>


Re: Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-02 Thread Roman Khachatryan
Hi,

Just to clarify, you are recovering from a savepoint, not a retained
checkpoint, right?
And how are you setting the operator IDs?
You mentioned that with allowNonRestoredState set to false recovery fails.
Does it succeed with this flag set to true?

Answering your questions:
Each operator state has the corresponding ID in the snapshot (though
technically the snapshot for the chain is sent as a single object to the
JM).
Probably some intermediate operators have state. How do you verify that
they don't? The exception message could help to identify the
problematic operators.
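For completeness, the flag in question maps to the CLI options below; this is only a sketch, the savepoint path and jar name are placeholders:

```shell
# restore with strict matching (fails if leftover state cannot be mapped)
flink run --fromSavepoint <savepointPath> my-job.jar

# restore while dropping state that no longer maps to any operator
flink run --fromSavepoint <savepointPath> --allowNonRestoredState my-job.jar
```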

Regards,
Roman


On Fri, Jul 2, 2021 at 7:52 PM Jiahui Jiang  wrote:

> Hello Flink,
>
> I'm trying to understand the state recovery mechanism when there are extra
> stateless operators.
>
> I'm using flink-sql, and I tested a 'select `number_col` from source'
> query, where the stream graph looks like:
>
> `source (stateful with fixed uid) -> [several stateless operators
> translated by Flink] -> sink (stateful with fixed uid)`
>
> I have enabled chaining, so these operators are all chained into one task
> vertex.
>
>
> According to Flink's docs, I should be able to start a new job with a
> different job graph as long as all the previous stateful operators can
> still be found in the graph.
> But when I tested recovery from the previous state with a new query
> 'select `1` from source', the generated stream graph had one extra
> stateless operator and failed to recover when allowNonRestoredState was
> set to false.
>
> I'm wondering how Flink stores operator state when chaining is enabled.
> Does it (1) store each operator state separately (source and sink each
> have their own entry in the checkpoint state) or (2) store the state for
> all the operators chained into the same subtask (source, sink, all the
> SQL transformation operators) under the same operator ID?
>
> In this experiment I have fixed the source and sink's uids; why does that
> seem to not have an effect on forcing the stateful operators to recover
> from their own state?
>
> Thank you!
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state
>
>


Re: Using Flink's Kubernetes API inside Java

2021-07-02 Thread Roman Khachatryan
Hi Alexis,

Have you looked at flink-on-k8s-operator [1]?
It seems to have the functionality you need:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/controllers/flinkcluster_reconciler.go#L569

I couldn't find many Flink-specific classes in the operator you
mentioned, but in general classes annotated with Public and
PublicEvolving are unlikely to change if that's your concern.

Also pulling in Yang Wang.

[1]
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/


Regards,
Roman


On Thu, Jul 1, 2021 at 7:49 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello everyone,
>
> I'm testing a custom Kubernetes operator that should fulfill some specific 
> requirements I have for Flink. I know of this WIP project: 
> https://github.com/wangyang0918/flink-native-k8s-operator
>
> I can see that it uses some classes that aren't publicly documented, and I 
> believe it doesn't cover Flink K8s sessions, which I would like to use.
>
> Is there something I could use for Flink K8s sessions? And is it ok if I use 
> these classes knowing that I might need adjustments for future Flink versions?
>
> Regards,
> Alexis.
>


Re: Flink State Processor API Example - Java

2021-07-02 Thread Roman Khachatryan
Hi Sandeep,

Could you provide the error stack trace and Flink version you are using?

Regards,
Roman

On Fri, Jul 2, 2021 at 6:42 PM Sandeep khanzode  wrote:
>
> Hi Guowei,
>
> I followed the document, but somehow I am unable to get a working Java
> example for Avro state.
>
> So, I tried to simply use the Java SpecificRecords created by the Avro Maven
> Plugin and inject them. Now, that works correctly, but I use Avro 1.7.7 since
> it is the last version that I saw which does not put a serialVersionUid in the
> generated SpecificRecord.
>
> How can I use an Avro SpecificRecord generated by 1.8.0 if it uses a
> serialVersionUid? The moment I change something in the Avro schema and
> regenerate the SpecificRecord, I get a class-incompatibility error when
> Flink tries to deserialize.
>
>
> Thanks,
> Sandip
>
>
>
> On 25-Jun-2021, at 10:25 AM, Guowei Ma  wrote:
>
> Hi Sandeep
>
> What I understand is that you want to manipulate the state. So I think you
> could use the old schema to read the state first, and then write it with a new
> schema, instead of using the new schema to read data in the old schema format.
> In addition, I would like to ask whether you want to do "State Schema
> Evolution"? Flink currently supports Avro and POJO schema evolution [1], so
> you don't need to do this manually.
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution
>
> Best,
> Guowei
>
>
> On Fri, Jun 25, 2021 at 3:04 AM Sandeep khanzode  
> wrote:
>>
>> Hello,
>>
>> 1.] Can someone please share a working example of how to read
>> ValueState and MapState from a checkpoint and update it? I tried
>> to assemble a working Java example but there are bits and pieces of info
>> around.
>>
>> 2.] I am using Avro 1.7.7 with Flink for state entities since versions
>> beyond Avro 1.7.7 add a serialVersionUid, and then I cannot replace the class
>> with a new Avro schema seamlessly. If I update the Avro schema and the Avro
>> Maven plugin runs, a new class with a new serialVersionUid is created, and
>> that cannot be replaced in the state: a Java exception states that the
>> local and state copies are different. Any example would be really
>> appreciated.
>>
>> Thanks,
>> Sandip
>
>
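The serialVersionUid behaviour described above can be reproduced without Flink or Avro: when a class does not pin serialVersionUID, Java computes one from the class shape, so a regenerated class no longer matches state written by the old one. A self-contained sketch (the class names are made-up stand-ins for two generations of a generated SpecificRecord):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialUidDemo {
    // Stand-ins for two generations of an Avro-generated record class.
    static class RecordV1 implements Serializable { int id; }
    static class RecordV2 implements Serializable { int id; String extra; }
    // With an explicitly pinned UID, regeneration keeps the identifier stable.
    static class PinnedRecord implements Serializable {
        private static final long serialVersionUID = 1L;
        int id;
    }

    public static void main(String[] args) {
        long v1 = ObjectStreamClass.lookup(RecordV1.class).getSerialVersionUID();
        long v2 = ObjectStreamClass.lookup(RecordV2.class).getSerialVersionUID();
        long pinned = ObjectStreamClass.lookup(PinnedRecord.class).getSerialVersionUID();
        // The computed UIDs differ, which is what surfaces as an
        // incompatible-class error when restoring old state.
        System.out.println(v1 != v2);
        System.out.println(pinned == 1L);
    }
}
```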


Understanding recovering from savepoint / checkpoint with additional operators when chaining

2021-07-02 Thread Jiahui Jiang
Hello Flink,

I'm trying to understand the state recovery mechanism when there are extra 
stateless operators.

I'm using flink-sql, and I tested a 'select `number_col` from source' query, 
where the stream graph looks like:

`source (stateful with fixed uid) -> [several stateless operators translated by 
Flink] -> sink (stateful with fixed uid)`

I have enabled chaining, so these operators are all chained into one task
vertex.


According to Flink's docs, I should be able to start a new job with a
different job graph as long as all the previous stateful operators can still
be found in the graph.
But when I tested recovery from the previous state with a new query 'select
`1` from source', the generated stream graph had one extra stateless
operator and failed to recover when allowNonRestoredState was set to false.

I'm wondering how Flink stores operator state when chaining is enabled.
Does it (1) store each operator state separately (source and sink each have
their own entry in the checkpoint state) or (2) store the state for all the
operators chained into the same subtask (source, sink, all the SQL
transformation operators) under the same operator ID?

In this experiment I have fixed the source and sink's uids; why does that
seem to not have an effect on forcing the stateful operators to recover from
their own state?

Thank you!


https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/#savepoint-state



Re: Flink State Processor API Example - Java

2021-07-02 Thread Sandeep khanzode
Hi Guowei,

I followed the document, but somehow I am unable to get a working Java example
for Avro state.

So, I tried to simply use the Java SpecificRecords created by the Avro Maven
Plugin and inject them. Now, that works correctly, but I use Avro 1.7.7 since
it is the last version that I saw which does not put a serialVersionUid in the
generated SpecificRecord.

How can I use an Avro SpecificRecord generated by 1.8.0 if it uses a
serialVersionUid? The moment I change something in the Avro schema and
regenerate the SpecificRecord, I get a class-incompatibility error when Flink
tries to deserialize.


Thanks,
Sandip



> On 25-Jun-2021, at 10:25 AM, Guowei Ma  wrote:
> 
> Hi Sandeep
> 
> What I understand is that you want to manipulate the state. So I think you
> could use the old schema to read the state first, and then write it with a new
> schema, instead of using the new schema to read data in the old schema format.
> In addition, I would like to ask whether you want to do "State Schema
> Evolution"? Flink currently supports Avro and POJO schema evolution [1], so
> you don't need to do this manually.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution
>  
> 
> 
> Best,
> Guowei
> 
> 
> On Fri, Jun 25, 2021 at 3:04 AM Sandeep khanzode  > wrote:
> Hello,
> 
> 1.] Can someone please share a working example of how to read
> ValueState and MapState from a checkpoint and update it? I tried
> to assemble a working Java example but there are bits and pieces of info
> around.
>
> 2.] I am using Avro 1.7.7 with Flink for state entities since versions beyond
> Avro 1.7.7 add a serialVersionUid, and then I cannot replace the class with a
> new Avro schema seamlessly. If I update the Avro schema and the Avro Maven
> plugin runs, a new class with a new serialVersionUid is created, and that
> cannot be replaced in the state: a Java exception states that the local
> and state copies are different. Any example would be really appreciated.
> 
> Thanks,
> Sandip



Re: Memory Usage - Total Memory Usage on UI and Metric

2021-07-02 Thread Ken Krugler
When we run Flink jobs in EMR (typically batch, though) we disable the pmem
(physical memory) and vmem (virtual memory) checks.

This was initially done for much older versions of Flink (1.6???), where the 
memory model wasn’t so well documented or understood by us.

But I think the pmem check might still have an issue, due to Flink’s use of 
off-heap.

So something like:

[
  {
    "classification": "yarn-site",
    "properties": {
      "yarn.nodemanager.pmem-check-enabled": "false",
      "yarn.nodemanager.vmem-check-enabled": "false"
    }
  }
]


…might help.
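Plugging Hemant's numbers in shows why the pmem check can trip even though the usage metrics look fine: -Xmx plus -XX:MaxDirectMemorySize already adds up to the whole 3GB container, so any metaspace or other native allocation pushes the process over YARN's physical-memory limit. A back-of-the-envelope check (plain Java, numbers taken from the post below):

```java
public class ContainerBudget {
    public static void main(String[] args) {
        int containerMb = 3 * 1024;   // task manager container: 3GB
        int heapMaxMb = 1957;         // -Xmx1957m from the yarn logs
        int directMaxMb = 1115;       // -XX:MaxDirectMemorySize=1115m
        int nonHeapUsedMb = 241;      // metaspace etc. reported when killed

        int jvmCapMb = heapMaxMb + directMaxMb;
        System.out.println(jvmCapMb);                 // already the whole container
        System.out.println(jvmCapMb + nonHeapUsedMb > containerMb); // over the pmem limit
    }
}
```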

— Ken


> On Jul 2, 2021, at 8:36 AM, bat man  wrote:
> 
> Hi,
> 
> I am running a streaming job (Flink 1.9) on EMR on yarn. Flink web UI or 
> metrics reported from prometheus shows total memory usage within specified 
> task manager memory - 3GB. 
> 
> Metrics shows below numbers(in MB) -
> Heap - 577
> Non Heap - 241
> DirectMemoryUsed - 852
> 
> Non-heap does rise gradually, starting around 210MB and reaching 241 when 
> yarn kills the container. Heap fluctuates between 1.x - .6GB, 
> DirectMemoryUsed is constant at 852.
> 
> Based on configurations these are the tm params from yarn logs - 
> -Xms1957m -Xmx1957m -XX:MaxDirectMemorySize=1115m 
> 
> These are other params as configuration in flink-conf
> yarn-cutoff - 270MB
> Managed memory - 28MB
> Network memory - 819MB
> 
> Above memory values are from around the same time the container is killed by
> yarn for "is running beyond physical memory limits".
> 
> Is there anything else which is not reported by Flink in the metrics, or have
> I been misinterpreting them? As seen above, total memory consumed is below 3GB.
> 
> Same behavior is reported when I have run the job with 2GB, 2.7GB and now 3GB 
> task mem. My job does have shuffles as data from one operator is sent to 4 
> other operators after filtering.
> 
> One more thing is I am running this with 3 yarn containers(2 tasks in each 
> container), total parallelism as 6. As soon as one container fails with this 
> error, the job re-starts. However, within minutes other 2 containers also 
> fail with the same error one by one.
> 
> Thanks,
> Hemant

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Memory Usage - Total Memory Usage on UI and Metric

2021-07-02 Thread bat man
Hi,

I am running a streaming job (Flink 1.9) on EMR on yarn. Flink web UI or
metrics reported from prometheus shows total memory usage within specified
task manager memory - 3GB.

Metrics shows below numbers(in MB) -
Heap - 577
Non Heap - 241
DirectMemoryUsed - 852

Non-heap does rise gradually, starting around 210MB and reaching 241 when
yarn kills the container. Heap fluctuates between 1.x - .6GB,
DirectMemoryUsed is constant at 852.

Based on configurations these are the tm params from yarn logs -
-Xms1957m -Xmx1957m -XX:MaxDirectMemorySize=1115m

These are other params as configuration in flink-conf
yarn-cutoff - 270MB
Managed memory - 28MB
Network memory - 819MB

Above memory values are from around the same time the container is killed
by yarn for "is running beyond physical memory limits".

Is there anything else which is not reported by Flink in the metrics, or have
I been misinterpreting them? As seen above, total memory consumed is below
3GB.

Same behavior is reported when I have run the job with 2GB, 2.7GB and now
3GB task mem. My job does have shuffles as data from one operator is sent
to 4 other operators after filtering.

One more thing is I am running this with 3 yarn containers(2 tasks in each
container), total parallelism as 6. As soon as one container fails with
this error, the job re-starts. However, within minutes other 2 containers
also fail with the same error one by one.

Thanks,
Hemant


Re: owasp-dependency-check is flagging flink 1.13 for scala 2.12.7

2021-07-02 Thread Chesnay Schepler
It's unlikely to be relevant for you since the vulnerability only affects
the scaladocs, i.e., documentation.
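If the finding still blocks a build, dependency-check supports suppression files; a sketch scoped to scala-library (the schema URL is the one from the dependency-check docs; note that suppressing by CPE drops all findings for that artifact, so double-check that is acceptable):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<suppressions xmlns="https://jeremylong.github.io/DependencyCheck/dependency-suppression.1.3.xsd">
  <suppress>
    <notes>Vulnerability affects only the Scaladoc output, not the packaged app.</notes>
    <packageUrl regex="true">^pkg:maven/org\.scala-lang/scala-library@.*$</packageUrl>
    <cpe>cpe:/a:scala-lang:scala</cpe>
  </suppress>
</suppressions>
```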


On 7/2/2021 2:10 PM, Debraj Manna wrote:

Hi,

I was running owasp-dependency-check in a Java
application based on flink-1.13.0 (Scala 2.12). Scala 2.12.7 was
getting flagged for this.



Relevant Dependency for this -

[INFO] +- org.apache.flink:flink-streaming-java_2.12:jar:1.13.0:provided
[INFO] |  +- org.apache.flink:flink-file-sink-common:jar:1.13.0:provided
[INFO] |  +- org.apache.flink:flink-runtime_2.12:jar:1.13.0:compile
[INFO] |  |  +- 
org.apache.flink:flink-queryable-state-client-java:jar:1.13.0:compile

[INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.13.0:compile
[INFO] |  |  +- commons-io:commons-io:jar:2.7:compile
[INFO] |  |  +- 
org.apache.flink:flink-shaded-netty:jar:4.1.49.Final-13.0:compile
[INFO] |  |  +- 
org.apache.flink:flink-shaded-jackson:jar:2.12.1-13.0:compile
[INFO] |  |  +- 
org.apache.flink:flink-shaded-zookeeper-3:jar:3.4.14-13.0:compile

[INFO] |  |  +- org.javassist:javassist:jar:3.24.0-GA:compile
[INFO] |  |  +- org.scala-lang:scala-library:jar:2.12.7:compile

Can anyone suggest whether a Flink app is vulnerable to this, or whether it
can safely be ignored?


Thanks





Re: Job Recovery Time on TM Lost

2021-07-02 Thread Till Rohrmann
Could you share the full logs with us for the second experiment, Lu? I
cannot tell from the top of my head why it should take 30s unless you have
configured a restart delay of 30s.

Let's discuss FLINK-23216 on the JIRA ticket, Gen.

I've now implemented FLINK-23209 [1] but it somehow has the problem that in
a flaky environment you might not want to mark a TaskExecutor dead on the
first connection loss. Maybe this is something we need to make configurable
(e.g. introducing a threshold, which admittedly is similar to the heartbeat
timeout) so that the user can configure it for her environment. On the
upside, if you mark the TaskExecutor dead on the first connection loss
(assuming you have a stable network environment), then it can now detect
lost TaskExecutors as fast as the heartbeat interval.

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till
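Both knobs mentioned above live in flink-conf.yaml. A sketch of the relevant options (names and defaults as in the Flink 1.13 docs; the 30 s fixed-delay value is only a hypothetical that would explain a ~30 s phase 1):

```yaml
# Dead-TM detection via heartbeats (1.13 defaults, in ms)
heartbeat.interval: 10000
heartbeat.timeout: 50000

# A fixed-delay restart strategy configured like this would explain
# a ~30 s pause between RESTARTING and RUNNING
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 30 s
```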

On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:

> Thanks for sharing, Till and Yang.
>
> @Lu
> Sorry but I don't know how to explain the new test with the log. Let's
> wait for others' reply.
>
> @Till
> It would be nice if JIRAs could be fixed. Thanks again for proposing them.
>
> In addition, I was tracking an issue where the RM keeps allocating and freeing
> slots after a TM is lost, until its heartbeat times out, when I found the
> recovery taking as long as the heartbeat timeout. That should be a minor bug
> introduced by declarative resource management. I have created a JIRA about
> the problem [1] and we can discuss it there if necessary.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
>
> Lu Niu  于2021年7月2日周五 上午3:13写道:
>
>> Another side question, Shall we add metric to cover the complete
>> restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only
>> covers phase 1. Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>>
>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>> && errorFrenquecyInMin > 0
>>> && System.currentTimeMillis() - lastStartTime >=
>>> errorFrenquecyInMin * 60 * 1000) {
>>>   lastStartTime = System.currentTimeMillis();
>>>   throw new RuntimeException(
>>>   "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and phase 2 dropped
>>> to 1s (because there is no need for container allocation). Why does phase 1
>>> still take 30s even though no TM is lost?
>>>
>>> Related logs:
>>> ```
>>> 2021-06-30 00:55:07,463 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>> 2021-06-30 00:55:07,509 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>>> RESTARTING.
>>> 2021-06-30 00:55:37,596 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>>> RUNNING.
>>> 2021-06-30 00:55:38,678 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
>>> all tasks switch from CREATED to RUNNING)
>>> ```
>>> Best
>>> Lu
>>>
>>>
>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>>>
 Thanks TIll and Yang for help! Also Thanks Till for a quick fix!

 I did another test yesterday. In this test, I intentionally throw
 exception from the source operator:
 ```
 if (runtimeContext.getIndexOfThisSubtask() == 1
 && errorFrenquecyInMin > 0
 && System.currentTimeMillis() - lastStartTime >=
 errorFrenquecyInMin * 60 * 1000) {
   lastStartTime = System.currentTimeMillis();
   throw new RuntimeException(
   "Trigger expected exception at: " + lastStartTime);
 }
 ```
 In this case, I found phase 1 still takes about 30s and Phase 2 dropped
 to 1s (because no need for container allocation).

 Some logs:
 ```
 ```


 On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
 wrote:

> A quick addition, I think with FLINK-23202 it should now also be
> possible to improve the heartbeat mechanism in the general case. We can
> leverage the unreachability exception thrown if a remote target is no
> longer reachable to mark a heartbeat target as no longer reachable [1].
> This can then be considered as if the heartbeat timeout has been 
> triggered.
> That way we should detect lost TaskExecutors as fast as our heartbeat
> interval is.
>
> [1] 

owasp-dependency-check is flagging flink 1.13 for scala 2.12.7

2021-07-02 Thread Debraj Manna
Hi,

I was running owasp-dependency-check in a Java application
based on flink-1.13.0 (Scala 2.12). Scala 2.12.7 was getting flagged for
this.

Relevant Dependency for this -

[INFO] +- org.apache.flink:flink-streaming-java_2.12:jar:1.13.0:provided
[INFO] |  +- org.apache.flink:flink-file-sink-common:jar:1.13.0:provided
[INFO] |  +- org.apache.flink:flink-runtime_2.12:jar:1.13.0:compile
[INFO] |  |  +-
org.apache.flink:flink-queryable-state-client-java:jar:1.13.0:compile
[INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.13.0:compile
[INFO] |  |  +- commons-io:commons-io:jar:2.7:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-netty:jar:4.1.49.Final-13.0:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-jackson:jar:2.12.1-13.0:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-zookeeper-3:jar:3.4.14-13.0:compile
[INFO] |  |  +- org.javassist:javassist:jar:3.24.0-GA:compile
[INFO] |  |  +- org.scala-lang:scala-library:jar:2.12.7:compile

Can anyone suggest whether a Flink app is vulnerable to this, or whether it
can safely be ignored?

Thanks


flink dataset api join error

2021-07-02 Thread Eric Yang


hello,
Sorry to bother everyone. I am using Flink's DataSet API to join several
datasets, e.g. a join b join c, and the job eventually fails with the
following error:

Do I need to specify a JoinHint strategy for the joins? Any help would be
really appreciated!

Eric Yang
ymj7...@163.com



Re: Job Recovery Time on TM Lost

2021-07-02 Thread Gen Luo
Thanks for sharing, Till and Yang.

@Lu
Sorry but I don't know how to explain the new test with the log. Let's wait
for others' reply.

@Till
It would be nice if JIRAs could be fixed. Thanks again for proposing them.

In addition, I was tracking an issue where the RM keeps allocating and freeing
slots after a TM is lost, until its heartbeat times out, when I found the
recovery taking as long as the heartbeat timeout. That should be a minor bug
introduced by declarative resource management. I have created a JIRA about
the problem [1] and we can discuss it there if necessary.

[1] https://issues.apache.org/jira/browse/FLINK-23216

Lu Niu  于2021年7月2日周五 上午3:13写道:

> Another side question, Shall we add metric to cover the complete
> restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only
> covers phase 1. Thanks!
>
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>
>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>> && errorFrenquecyInMin > 0
>> && System.currentTimeMillis() - lastStartTime >=
>> errorFrenquecyInMin * 60 * 1000) {
>>   lastStartTime = System.currentTimeMillis();
>>   throw new RuntimeException(
>>   "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found phase 1 still takes about 30s and phase 2 dropped
>> to 1s (because there is no need for container allocation). Why does phase 1
>> still take 30s even though no TM is lost?
>>
>> Related logs:
>> ```
>> 2021-06-30 00:55:07,463 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>> 2021-06-30 00:55:07,509 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>> RESTARTING.
>> 2021-06-30 00:55:37,596 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>> RUNNING.
>> 2021-06-30 00:55:38,678 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
>> all tasks switch from CREATED to RUNNING)
>> ```
>> Best
>> Lu
>>
>>
>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>>
>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>> && errorFrenquecyInMin > 0
>>> && System.currentTimeMillis() - lastStartTime >=
>>> errorFrenquecyInMin * 60 * 1000) {
>>>   lastStartTime = System.currentTimeMillis();
>>>   throw new RuntimeException(
>>>   "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and Phase 2 dropped
>>> to 1s (because no need for container allocation).
>>>
>>> Some logs:
>>> ```
>>> ```
>>>
>>>
>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
>>> wrote:
>>>
 A quick addition, I think with FLINK-23202 it should now also be
 possible to improve the heartbeat mechanism in the general case. We can
 leverage the unreachability exception thrown if a remote target is no
 longer reachable to mark a heartbeat target as no longer reachable [1].
 This can then be considered as if the heartbeat timeout has been triggered.
 That way we should detect lost TaskExecutors as fast as our heartbeat
 interval is.

 [1] https://issues.apache.org/jira/browse/FLINK-23209

 Cheers,
 Till

 On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:

> Since you are deploying Flink workloads on Yarn, the Flink
> ResourceManager should get the container
> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
> which is 8 seconds by default.
> And Flink ResourceManager will release the dead TaskManager container
> once received the completion event.
> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
>
> I think most of the time cost in Phase 1 might be cancelling the tasks
> on the dead TaskManagers.
>
>
> Best,
> Yang
>
>
> Till Rohrmann  于2021年7月1日周四 下午4:49写道:
>
>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>> the primary means to detect dead TaskManagers. This means that Flink will
>> take at least `heartbeat.timeout` time before the system recovers. Even 
>> if
>> the cancellation happens fast (e.g. by 

flink deployment failure

2021-07-02 Thread 投资未来
Hello, I deployed a Flink K8s session-mode cluster, but the JobManager keeps
restarting automatically, and the TaskManager reports the following error:

fencing token not set: Ignoring message remoteFencedMessage .. sent to
akka.tcp://flink@. because the fencing token is null.

What could be the cause of this?

Versioned view in Flink not advancing watermarks blocks the stream

2021-07-02 Thread 杨光跃
select a.card as card, a.cust as cust, b.city as city,
       cast(a.ts as TIMESTAMP) as ts, c.city
from case3_TOPIC_A a
left join cust_data FOR SYSTEM_TIME AS OF a.ts as b on a.cust = b.cust
left join view_case3_TOPIC_B FOR SYSTEM_TIME AS OF a.ts as c on a.cust = c.cust;


view_case3_TOPIC_B is a versioned view. The problem now is that if the data in
view_case3_TOPIC_B is not updated, case3_TOPIC_A produces no output even when
new data arrives.
How can I make case3_TOPIC_A trigger output as soon as it has data?

杨光跃
yangguangyuem...@163.com