Re: Show plan in UI not working.

2022-01-24 Thread Ingo Bürk

Hi John,

can you please submit this as an issue in JIRA? If you suspect it is 
related to other issues, just make a note of that in the issue as well. 
Thanks!



Ingo

On 23.01.22 18:05, John Smith wrote:
Just in case: in 1.14.x, regardless of whether the job manager is the leader or 
not, if you click on "Show Plan" before submitting a job it just shows a 
blank window.


I'm assuming it's a similar issue to the deserialization ones.


Upgrade to 1.14.3

2022-01-24 Thread Sweta Kalakuntla
Hi,

We are on Flink 1.13.3 and trying to upgrade the cluster to version 1.14.3.
I see that the job (built against 1.13.3) is unable to start up because it says it
couldn't find the metrics group (inside the FlinkKafkaConsumer class).

- Can I deploy a 1.13.3 job on a 1.14.3 cluster?
- Can I deploy a 1.14.3 job on a 1.13.3 cluster?
- How do I upgrade to a 1.14.3 cluster without losing the state of running apps? I
have even tried taking a savepoint, but that did not revive the job.

Thank you,
Sweta


Re: Apache Flink - Can AllWindowedStream be parallel ?

2022-01-24 Thread M Singh
 
Thanks Yun for your response, but I am not sure why the doc states that in many 
cases it is non-parallel.

On Monday, January 24, 2022, 01:30:27 AM EST, Yun Tang  
wrote:  
 
Hi Singh,
All the output operators transformed by AllWindowedStream would be 
SingleOutputStreamOperator, which cannot be parallel.

[1] 
https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
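
As an illustration (the stream contents and window sizes below are made up, not part of this thread), a minimal sketch contrasting windowAll, which runs in a single task, with a keyed window, which can run in parallel:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WindowAllParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);

        // windowAll: all records are gathered into one task, so this part of
        // the pipeline effectively runs with parallelism 1.
        numbers.countWindowAll(4)
               .reduce(Integer::sum)
               .print();

        // keyBy + window: the same aggregation distributed across subtasks,
        // one group of windows per key.
        numbers.keyBy(n -> n % 2)
               .countWindow(2)
               .reduce(Integer::sum)
               .print();

        env.execute("windowAll vs keyed window sketch");
    }
}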
Best,
Yun Tang

From: M Singh 
Sent: Sunday, January 23, 2022 4:24
To: User-Flink 
Subject: Apache Flink - Can AllWindowedStream be parallel ?

Hi Folks:
The documentation for AllWindowedStream 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#datastream-rarr-allwindowedstream)
 has a note:

This is in many cases a non-parallel transformation. All records will be 
gathered in one task for the windowAll operator.


Does this mean that in some cases it might be parallel ?  If so, is there an 
example of such a scenario ?
Thanks  

Re: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-24 Thread M Singh
 Hi Caizhi:
Thanks for your reply.
I need to aggregate streams based on dynamic groupings.  Not all the groupings 
(keyBy) are known upfront; they can be added or removed after the streaming 
application is started, and I don't want to restart the application or change the 
code.  So, I wanted to find out what the options are to achieve this 
functionality.  Please let me know if you have any advice or recommendations.
Thanks
On Monday, January 24, 2022, 04:28:32 AM EST, Caizhi Weng 
 wrote:  
 
 Hi!
Adding/removing keyed streams will change the topology graph of the job. 
Currently it is not possible to do so without restarting the job and as far as 
I know there is no existing framework/pattern to achieve this.
By the way, why do you need this functionality? Could you elaborate more on 
your use case?
M Singh  wrote on Sat, Jan 22, 2022 at 21:32:

Hi Folks:
I am working on an exploratory project in which I would like to add/remove 
KeyedStreams 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
 without restarting the Flink streaming application.
Is it possible natively in Apache Flink ?  If not, is there any 
framework/pattern which can be used to implement this without restarting the 
application/changing the code ?
Thanks








  

When are TaskManager slots released?

2022-01-24 Thread johnjlong
Hi all, I have a question.
When I proactively release a TM's connection based on its ResourceID, I found that the TM's slots are only marked as free.
They are only truly released when the JobMaster cancels the whole ExecutionGraph; at that point the cancel method is invoked, one by one, on the TM holding each vertex's slot.
But by then the associated TM has already been closed, which triggers an RPC timeout (20s by default), and only after that are the slots released.


My question is: before calling the TaskExecutor's cancelTask, why not check whether the TM is still alive, and if it is not, go straight into the cancel flow instead of waiting for the RPC timeout before moving on?


Log screenshot attached.


johnjlong
johnjl...@163.com

Re: [statefun] client cert auth in remote function

2022-01-24 Thread Filip Karnicki
Cool, thanks! I'll speak to the shared cluster support team to see if they
can install our CA cert on every box. So we've got that side of
authentication sorted - flink can trust that the service is who it says it
is.

How about the other way around? Any thoughts on how I could provide a
*key*store
for the stateful functions job to use while calling remote function
services with TLS? The remote function server (undertow in our case) needs
to authenticate and authorise statefun based on the latter's cert.
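
For what it's worth, a sketch of the Netty API such a keystore-style option would build on; the PEM paths are hypothetical, and statefun's NettySharedResources does not expose this today:

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;

import javax.net.ssl.SSLException;
import java.io.File;

public class MutualTlsClientContextSketch {
    // Builds a client SslContext that presents a client certificate (mutual TLS).
    static SslContext build() throws SSLException {
        return SslContextBuilder.forClient()
                .trustManager(new File("/etc/ssl/ca.pem"))            // CA that signed the server cert
                .keyManager(new File("/etc/ssl/client-cert.pem"),     // client certificate chain
                            new File("/etc/ssl/client-key.pem"))      // client private key
                .build();
    }
}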

Many thanks
Fil

On Mon, 24 Jan 2022 at 21:25, Igal Shilman  wrote:

> Hello Filip,
>
> As far as I know SslContextBuilder.forClient() should use the default
> trust store, so if you will install your self signed certificate in the
> community supported container image this should be picked up[1].
> The following user has reported something similar, and it seems that
> they've gone down a similar path [2].
>
> Cheers,
> Igal.
>
> [1] https://stackoverflow.com/a/35304873/4405470
> [2] https://lists.apache.org/thread/nxf7yk5ctcvndyygnvx7l34gldn0xgj3
>
>
> On Mon, Jan 24, 2022 at 7:08 PM Filip Karnicki 
> wrote:
>
>> Hi All!
>>
>> I was wondering if there's a way to secure a remote function by requiring
>> the client (flink) to use a client cert. Preferably a base64 encoded string
>> from the env properties, but that might be asking for a lot :)
>>
>> I had a look at the code, and NettySharedResources seems to use
>> SslContextBuilder.forClient(), and doesn't really seem to deal with setting
>> any kind of a keystore
>>
>> Also, I don't think that setting
>> -Djavax.net.ssl.trustStore=path/to/truststore.jks does anything, since I
>> keep getting 'unable to find valid certification path to requested target',
>> while an exported .pem from my tuststore works fine as a CA in postman
>>
>> I'm happy to contribute some code if need be, just point me in the right
>> direction
>>
>> Kind regards,
>> Fil
>>
>


Job fails to restore state properly when recovering from a checkpoint

2022-01-24 Thread Li Wang
Hi team,

We have a streaming job running with 1 JM + 4 TMs in our k8s cluster.
Recently one of the TMs encountered some failure, and the job can't be
recovered from the latest state from the checkpoint. From the log we found
something suspicious -

2022-01-21T13:38:41.296Z | FlinkStreamJob | SPNJP1 |
spaas-nj-prod01-r00n23 | namespace=s-art |
pod=flinkfileintegratehbaseindexer-taskmanager-0 | INFO |
pool-16-thread-23 | o.a.f.r.t.Task | Source:
consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4)
(5051444b987b69d69df24df5aa3adeaf) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException:
java.lang.Exception: Could not materialize checkpoint 15931959 for
operator Source:
consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint
15931959 for operator Source:
consuming_topic_kf-ec-alerting-post-matcher-derived-alerts-ny (1/4).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException:
java.io.IOException: Could not flush and close the file system output
stream to 
hdfs:/projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints//chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a
in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file
system output stream to
hdfs:/projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints//chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a
in order to obtain the stream state handle
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
at 
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 common frames omitted
Caused by: org.apache.hadoop.ipc.RemoteException: No lease on
/projects/art/spaas/flink/prod/nj/flinkfileintegratehbaseindexer/checkpoints//chk-15931959/dd64e300-ad59-4a52-9887-308b10092e5a
(inode 34171509797): File does not exist. Holder
DFSClient_NONMAPREDUCE_-946314839_1 does not have any open files.
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3693)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3781)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3748)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:912)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:549)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at 

How to run in IDE?

2022-01-24 Thread John Smith
Hi, using Flink 1.14.3 with Gradle. I explicitly added the Flink client
dependency and the job starts, but it quits with...

In Flink 1.10 the job worked as is. How do I set the number of slots, and are
there any other settings for the IDE?

16:29:50,633 INFO
 org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
 - Received resource requirements from job
3a3e9c46da413071392bce161c39270f:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=2}]
16:29:50,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - Sink: Print to Std. Out (14/16) (d7c4fbf5f23f3118e54998f2b35338c1)
switched from CANCELING to CANCELED.
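
For reference, a minimal sketch of one way to control the slot count of the embedded mini cluster when running from an IDE; the slot count and the sample pipeline are illustrative only, not from this thread:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSlotsSketch {
    public static void main(String[] args) throws Exception {
        // The embedded mini cluster picks up its slot count from the configuration.
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);

        env.fromElements("a", "b", "c").print();
        env.execute("local environment with explicit slot count");
    }
}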


Re: [statefun] client cert auth in remote function

2022-01-24 Thread Igal Shilman
Hello Filip,

As far as I know SslContextBuilder.forClient() should use the default trust
store, so if you will install your self signed certificate in the community
supported container image this should be picked up[1].
The following user has reported something similar, and it seems that
they've gone down a similar path [2].

Cheers,
Igal.

[1] https://stackoverflow.com/a/35304873/4405470
[2] https://lists.apache.org/thread/nxf7yk5ctcvndyygnvx7l34gldn0xgj3


On Mon, Jan 24, 2022 at 7:08 PM Filip Karnicki 
wrote:

> Hi All!
>
> I was wondering if there's a way to secure a remote function by requiring
> the client (flink) to use a client cert. Preferably a base64 encoded string
> from the env properties, but that might be asking for a lot :)
>
> I had a look at the code, and NettySharedResources seems to use
> SslContextBuilder.forClient(), and doesn't really seem to deal with setting
> any kind of a keystore
>
> Also, I don't think that setting
> -Djavax.net.ssl.trustStore=path/to/truststore.jks does anything, since I
> keep getting 'unable to find valid certification path to requested target',
> while an exported .pem from my tuststore works fine as a CA in postman
>
> I'm happy to contribute some code if need be, just point me in the right
> direction
>
> Kind regards,
> Fil
>


[statefun] client cert auth in remote function

2022-01-24 Thread Filip Karnicki
Hi All!

I was wondering if there's a way to secure a remote function by requiring
the client (flink) to use a client cert. Preferably a base64 encoded string
from the env properties, but that might be asking for a lot :)

I had a look at the code, and NettySharedResources seems to use
SslContextBuilder.forClient(), and doesn't really seem to deal with setting
any kind of a keystore

Also, I don't think that setting
-Djavax.net.ssl.trustStore=path/to/truststore.jks does anything, since I
keep getting 'unable to find valid certification path to requested target',
while an exported .pem from my tuststore works fine as a CA in postman

I'm happy to contribute some code if need be, just point me in the right
direction

Kind regards,
Fil


Watermarking with FileSource

2022-01-24 Thread Meghajit Mazumdar
I have a doubt regarding watermarking w.r.t. streaming with a FileSource. I will
really appreciate it if somebody can explain this behavior.

Consider a filesystem with a root folder containing date-wise sub-folders
such as *D1*, *D2*, … and so on. Each of these date folders further has
24 sub-folders corresponding to the data generated for each hour,
i.e. *hr=0, hr=1, hr=2*, … and so on. Each hourly folder has only one file inside it,
so there will be 24 files per day in total. The file structure will look something
like this:


   - ROOT FOLDER
     - D1
       - hr=0
         - somefile.txt
       - hr=1
         - somefile.txt
       - … (more hour folders)
     - D2
       - hr=0
         - somefile.txt
       - hr=1
         - somefile.txt
       - … (more hour folders)
     - … (more date folders)



Let’s assume we are running a Flink job with a FileSource and a parallelism
of 15. Let’s say one task manager has one slot. Also, let’s assume we want
one split per file (NonSplittingRecursiveEnumerator is used).
Also, let’s assume a case where each task manager picks up one file split
at the beginning. For the sake of simplicity, let’s also assume the
splits are generated and picked up in chronological order. So the files from *hr=0*
till *hr=14* will be picked up by the 15 task managers respectively. In total,
15 files are picked for processing.

Now, whether the task managers finish reading their files at the same
time or at different times, the next file split, i.e. *hr=15*, can be picked up by
any task manager.
Unless the task manager that also processed the *hr=14* file picks
up this *hr=15* file as well, rows from this file will always be dropped by
the other task managers unless the watermark interval is really huge, like 1
hour+.

Am I thinking about this correctly ? Is the solution then to keep a
really big watermark interval for a FileSource  ? Or is there an idiomatic
pattern to solve these kinds of problems with FileSource ?
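
For illustration, a sketch of a watermark strategy with a generous out-of-orderness bound and idleness handling; the record type, field name and durations are assumptions for the example, not recommendations:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;

public class FileSourceWatermarkSketch {

    // Hypothetical record type standing in for whatever the hourly files contain.
    public static class LogLine {
        public long eventTimeMillis;
        public String payload;
    }

    // A large out-of-orderness bound plus idleness handling; illustrative values only.
    static WatermarkStrategy<LogLine> strategy() {
        return WatermarkStrategy
                .<LogLine>forBoundedOutOfOrderness(Duration.ofHours(1))
                .withTimestampAssigner((line, previousTimestamp) -> line.eventTimeMillis)
                .withIdleness(Duration.ofMinutes(1));
    }
}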
-- 
*Regards,*
*Meghajit*


Regarding Queryable state in Flink

2022-01-24 Thread Jessy Ping
Hi Team,

We are currently running our streaming application, based on Flink (DataStream
API), on a non-prod cluster, and we are planning to move it to the production
cluster soon. We are keeping certain keyed state backed by RocksDB in the Flink
application. We need a mechanism to query these keyed state values for
debugging and troubleshooting. Is it a good idea to use queryable state for
a single Flink job running in application mode on Kubernetes with an average
load of 10k events/second?
Or is it a better idea to keep these state values in an external key-value
store?

So in short --> Is queryable state stable enough to use in production
systems?
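
For context, a rough sketch of what a query against queryable state looks like from a separate client process; the proxy host, port, job id, key, and state name below are all placeholder assumptions, and the state must have been registered as queryable inside the job itself:

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryableStateClientSketch {
    public static void main(String[] args) throws Exception {
        // Proxy host/port and the job id are placeholders.
        QueryableStateClient client = new QueryableStateClient("query-proxy-host", 9069);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"), // placeholder job id
                "my-queryable-state",                                    // name registered in the job
                "some-key",                                              // key to look up
                BasicTypeInfo.STRING_TYPE_INFO,
                new ValueStateDescriptor<>("my-queryable-state", Long.class));

        System.out.println(future.get().value());
        client.shutdownAndWait();
    }
}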


Thanks
Jessy


Re: ParquetColumnarRowInputFormat - parameter description

2022-01-24 Thread Krzysztof Chmielewski
Hi,
I would like to bump this up a little bit.

The isCaseSensitive flag is rather clear: if it is false, then the columns in the
Parquet file are read case-insensitively.
batchSize - how many records we read from the Parquet file before passing
them to the upper classes, right?

Could someone describe what the timestamp flag (isUtcTimestamp) does, with some examples?

Regards,
Krzysztof Chmielewski


On Mon, 10 Jan 2022 at 14:59, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> I would like to ask for some more details regarding
> three ParquetColumnarRowInputFormat construction parameters.
>
> The parameters are:
> batchSize,
> isUtcTimestamp,
> isCaseSensitive
>
> The parameter names give some hint about their purpose but there is no
> description in the docs (Javadoc, Flink page).
>
> Could you provide me some information about the batching process and other
> two boolean flags?
>
> Regards,
> Krzysztof Chmielewski
>


Re: Is Scala the best language for Flink?

2022-01-24 Thread Yun Tang
Hi Sebastian,

If you are a Flink runtime developer: Flink is already making the runtime code Scala-free 
[1] for maintenance reasons. If you are just a Flink user, I think both 
languages are fine.

[1] https://issues.apache.org/jira/browse/FLINK-14105

Best
Yun Tang

From: seb 
Sent: Monday, January 24, 2022 20:14
To: user@flink.apache.org 
Subject: Is Scala the best language for Flink?

Hi there,

I am getting started with Apache Flink. I am curious whether there is a clear 
winner between developing in either Scala or Java.

It sounds like Flink is typically slower to support new versions of Scala and 
that Java development might have fewer quirks.

What do you think? I have experience coding in Scala, but I am more than happy 
to learn Java.

Thanks in advance for sharing your thoughts!

Best,
Sebastian


Is Scala the best language for Flink?

2022-01-24 Thread seb
Hi there,

I am getting started with Apache Flink. I am curious whether there is a clear 
winner between developing in either Scala or Java.

It sounds like Flink is typically slower to support new versions of Scala and 
that Java development might have fewer quirks.

What do you think? I have experience coding in Scala, but I am more than happy 
to learn Java.

Thanks in advance for sharing your thoughts!

Best,
Sebastian

Unsubscribe

2022-01-24 Thread imnu205...@126.com
Unsubscribe



imnu205...@126.com


Flink job with multiple sink tables can't be started on YARN

2022-01-24 Thread Xuekui
Hi all,


I have one flink job which reads data from one kafka topic and sinks to two 
kafka topics using Flink SQL.


The code is something like this:


tableEnv.executeSql(
"""
create table sink_table1 (
xxx
xxx
) with (
  'connector' = 'kafka',
  'topic' = 'topic1'
)
"""
)


tableEnv.executeSql(
"""
create table sink_table2 (
xxx
xxx
) with (
  'connector' = 'kafka',
  'topic' = 'topic2'
)
"""
)



The code works well in local MiniCluster mode.
But when I deploy it on YARN, one application runs well and the other
fails with the following error:


022-01-24 11:16:25,374 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint   
 [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
 [flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
 [flink-dist_2.11-1.12.2.jar:1.12.2]
Caused by: org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_161]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
 ~[hadoop-common-2.7.3.jar:?]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in 
port range 42888
at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:234)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_161]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_161]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
 ~[hadoop-common-2.7.3.jar:?]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
... 2 more



It seems that the two sink tables trigger two YARN applications, each 
application starts one job manager, and the two job managers are using the 
same rest port on the same container, so only one application can run 
successfully and the other fails.


Is there anything I can do to start both applications?


Thanks
Xuekui
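
For reference, a minimal sketch of grouping several INSERTs into one StatementSet so that they are submitted as a single job; the table names and queries below are placeholders, not the statements from this thread:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkSingleJobSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // DDL for source_table, sink_table1 and sink_table2 is assumed to be
        // issued here via tableEnv.executeSql(...).

        // Both INSERTs go into one StatementSet and are executed as one job.
        StatementSet statements = tableEnv.createStatementSet();
        statements.addInsertSql("INSERT INTO sink_table1 SELECT * FROM source_table");
        statements.addInsertSql("INSERT INTO sink_table2 SELECT * FROM source_table");
        statements.execute();
    }
}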

Re: [DISCUSS] Future of Per-Job Mode

2022-01-24 Thread Matthias Pohl
Hi all,
I agree with Xintong's comment: reducing the number of deployment modes
would help users. There is a clearer distinction between session mode and
the two other deployment modes (i.e. application and per-job mode). The
difference between application and per-job mode is not that easy to grasp for
newcomers, I imagine. It would also help clean up some job-mode-specific
code segments in the source code.

It would be interesting to see whether there are other use cases that are
missed by the application mode (besides the ones already addressed by
Biao). I would second Xintong's proposal of deprecating the per-job mode rather
soon, making users aware of the plans around that deployment mode. That
might help encourage users to speak up in case they are not able to find a
solution to work around the deprecation warnings.

I also agree with Xintong's assessment that dropping it should only be done
after we're sure that all relevant use cases are met also by other
deployment modes considering that (based on the comments above) it is a
widely used deployment mode.

Matthias

On Mon, Jan 24, 2022 at 10:00 AM Xintong Song  wrote:

> Sorry for joining the discussion late.
>
> I'm leaning towards deprecating the per-job mode soonish, and eventually
> dropping it in the long-term.
> - One less deployment mode makes it easier for users (especially
> newcomers) to understand. Deprecating the per-job mode sends the signal
> that it is legacy, not recommended, and in most cases users do not need to
> care about it.
> - For most (if not all) user demands that are satisfied by the per-job
> mode but not by the application mode, AFAICS, they can be either worked around
> or eventually addressed by the application mode. E.g., make application
> mode support shipping local dependencies.
> - I'm not sure about dropping the per-job mode soonish, as many users are
> still working with it. We'd better not force these users to migrate to the
> application mode when upgrading the Flink version.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jan 21, 2022 at 4:30 PM Konstantin Knauf 
> wrote:
>
>> Thanks Thomas & Biao for your feedback.
>>
>> Any additional opinions on how we should proceed with per job-mode? As
>> you might have guessed, I am leaning towards proposing to deprecate per-job
>> mode.
>>
>> On Thu, Jan 13, 2022 at 5:11 PM Thomas Weise  wrote:
>>
>>> Regarding session mode:
>>>
>>> ## Session Mode
>>> * main() method executed in client
>>>
>>> Session mode also supports execution of the main method on Jobmanager
>>> with submission through REST API. That's how Flinkk k8s operators like
>>> [1] work. It's actually an important capability because it allows for
>>> allocation of the cluster resources prior to taking down the previous
>>> job during upgrade when the goal is optimization for availability.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> [1] https://github.com/lyft/flinkk8soperator
>>>
>>> On Thu, Jan 13, 2022 at 12:32 AM Konstantin Knauf 
>>> wrote:
>>> >
>>> > Hi everyone,
>>> >
>>> > I would like to discuss and understand if the benefits of having
>>> Per-Job
>>> > Mode in Apache Flink outweigh its drawbacks.
>>> >
>>> >
>>> > *# Background: Flink's Deployment Modes*
>>> > Flink currently has three deployment modes. They differ in the
>>> following
>>> > dimensions:
>>> > * main() method executed on Jobmanager or Client
>>> > * dependencies shipped by client or bundled with all nodes
>>> > * number of jobs per cluster & relationship between job and cluster
>>> > lifecycle* (supported resource providers)
>>> >
>>> > ## Application Mode
>>> > * main() method executed on Jobmanager
>>> > * dependencies already need to be available on all nodes
>>> > * dedicated cluster for all jobs executed from the same main()-method
>>> > (Note: applications with more than one job, currently still significant
>>> > limitations like missing high-availability). Technically, a session
>>> cluster
>>> > dedicated to all jobs submitted from the same main() method.
>>> > * supported by standalone, native kubernetes, YARN
>>> >
>>> > ## Session Mode
>>> > * main() method executed in client
>>> > * dependencies are distributed from and by the client to all nodes
>>> > * cluster is shared by multiple jobs submitted from different clients,
>>> > independent lifecycle
>>> > * supported by standalone, Native Kubernetes, YARN
>>> >
>>> > ## Per-Job Mode
>>> > * main() method executed in client
>>> > * dependencies are distributed from and by the client to all nodes
>>> > * dedicated cluster for a single job
>>> > * supported by YARN only
>>> >
>>> >
>>> > *# Reasons to Keep** There are use cases where you might need the
>>> > combination of a single job per cluster, but main() method execution
>>> in the
>>> > client. This combination is only supported by per-job mode.
>>> > * It currently exists. Existing users will need to migrate to either
>>> > session or application mode.
>>> >
>>> >
>>> > *# Reasons to Drop** With Per-Job Mode and Application Mode we 

Re: Flink shutdown with exception when run in idea IDE

2022-01-24 Thread Chesnay Schepler
Certain (expected and completely fine) lifecycle events, like the one 
you mentioned, do log a stacktrace on debug level I believe. This one is 
not a cause for concern.


On 24/01/2022 11:02, Caizhi Weng wrote:

Hi!

The exception stack you provided is not complete. Could you please 
provide the whole exception stack (including all "Caused by")? Also 
could you please provide your user code so that others can look into 
this problem?


Shawn Du  wrote on Mon, Jan 24, 2022 at 17:22:

Hi experts,
I am new to Flink and just ran a simple job in the IDE, but many
exceptions are thrown when the job finishes (see below).
The job source is bounded, reads from a local file, and runs in streaming
mode. There is a custom sink as well, which simply writes to a local file.
It seems that each time I run it, I get a different number of output lines. I
am not sure all data is flushed to disk. Please help.

Thanks.

org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:438)
 at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
 at akka.actor.Actor.aroundReceive(Actor.scala:517)
 at akka.actor.Actor.aroundReceive$(Actor.scala:515)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Re: Flink shutdown with exception when run in idea IDE

2022-01-24 Thread Caizhi Weng
Hi!

The exception stack you provided is not complete. Could you please provide
the whole exception stack (including all "Caused by")? Also could you
please provide your user code so that others can look into this problem?

Shawn Du  wrote on Mon, Jan 24, 2022 at 17:22:

> Hi experts,
> I am new to Flink and just ran a simple job in the IDE, but there are many
> exceptions thrown when the job finishes (see below).
> The job source is bounded, reads from a local file, and runs in streaming mode.
> There is a custom sink as well, which simply writes to a local file.
> It seems that each time I run it, I get a different number of output lines. I am not
> sure all data is flushed to disk. Please help.
>
> Thanks.
>
> org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
>
>  at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:438)
>
>  at 
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
>
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
>
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>  at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>  at akka.actor.Actor.aroundReceive(Actor.scala:517)
>  at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>  at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>  at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>  at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>


Re: JDBC read DB causes OutOfMemoryError: Java heap space

2022-01-24 Thread Caizhi Weng
Hi!

Sorry for the late reply. Which Flink version are you using? For current
Flink master there is no JdbcTableSource.

Qihua Yang  wrote on Wed, Jan 19, 2022 at 16:00:

> Should I change the query, something like below, to add a limit? If there is no
> limit, does that mean Flink will read the whole huge table into memory and fetch
> and return 20 records each time?
>
> val query = String.format("SELECT * FROM %s limit 1000", tableName)
>
>
> On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang  wrote:
>
>> Hi Caizhi,
>>
>> Thank you for your reply. The heap size is 512m. Fetching from the DB
>> table is the only costly operation. After fetching from DB, I simply
>> ingested a kafka topic. That should not be the bottleneck.
>> Here is the jdbc configuration. Is that correct config?
>>
>> val query = String.format("SELECT * FROM %s", tableName)
>>
>> val options = JdbcOptions.builder()
>> .setDBUrl(url)
>> .setTableName(tableName)
>> .setDriverName(DRIVER_NAME)
>> .setUsername(userName)
>> .setPassword(password)
>> .build()
>> val readOptions = JdbcReadOptions.builder()
>> .setQuery(query)
>> .setPartitionColumnName(PARTITION_KEY)
>> .setPartitionLowerBound(dbLowerBound)
>> .setPartitionUpperBound(dbUpperBound)
>> .setNumPartitions(50)
>> .setFetchSize(20)
>> .build()
>> val lookupOptions = JdbcLookupOptions.builder()
>> .setCacheMaxSize(-1)
>> .setCacheExpireMs(1000)
>> .setMaxRetryTimes(2)
>> .build()
>> val rawSource = JdbcTableSource.builder()
>> .setOptions(options)
>> .setReadOptions(readOptions)
>> .setLookupOptions(lookupOptions)
>> .setSchema(schema)
>> .build().getDataStream(env)
>>
>>
>> On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng 
>> wrote:
>>
>>> Hi!
>>>
>>> This is not the desired behavior. As you have set fetchSize to 20 there
>>> will be only 20 records in each parallelism of the source. How large is
>>> your heap size? Does your job have any other operations which consume a lot
>>> of heap memory?
>>>
Qihua Yang  wrote on Wed, Jan 19, 2022 at 15:27:
>>>
 Here is the errors
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread "server-timer"
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread "I/O dispatcher 16"
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread "HTTP-Dispatcher"
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread "I/O dispatcher 11"
 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread "I/O dispatcher 9"

 On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang  wrote:

> Hi,
>
> I have a Flink cluster (50 hosts, each host runs a task manager).
> I am using Flink JDBC to consume data from a database. The DB table is
> pretty large, around 18734 rows. I configured the JDBC number of
> partitions to 50 and fetchSize to 20. The Flink application has 50 task managers.
> Does anyone know why I got an OutOfMemoryError? How should I configure it?
>
> Thanks,
> Qihua
>
>


Re: Tuning akka.ask.timeout

2022-01-24 Thread Guowei Ma
Hi
Since 1.14.0, I think Flink should work well even at the 1000x1000 scale with
the 10s akka timeout in the deploy stage.
So thank you for any further feedback after you investigate.

BTW: I think you might look at
https://issues.apache.org/jira/browse/FLINK-24295, which might cause the
problem.

Best,
Guowei



On Mon, Jan 24, 2022 at 4:31 PM Paul Lam  wrote:

> Hi Guowei,
>
> Thanks a lot for your reply.
>
> I’m using 1.14.0. The timeout happens at job deployment time. A subtask
> would run for a short period of `akka.ask.timeout` before failing due to the
> timeout.
>
> I noticed that the jobmanager has a very high CPU usage at that moment, like
> 2000%. I’m reasoning about the cause by profiling.
>
> Best,
> Paul Lam
>
> On Jan 21, 2022, at 09:56, Guowei Ma  wrote:
>
> Hi, Paul
>
> Would you like to share some information such as the Flink version you
> used and the memory of TM and JM.
> And when does the timeout happen? Such as at begin of the job or during
> the running of the job
>
> Best,
> Guowei
>
>
> On Thu, Jan 20, 2022 at 4:45 PM Paul Lam  wrote:
>
>> Hi,
>>
>> I’m tuning a Flink job with 1000+ parallelism, which frequently fails
>> with Akka TimeOutException (it was fine with 200 parallelism).
>>
>> I see some posts recommend increasing `akka.ask.timeout` to 120s. I’m not
>> familiar with Akka but it looks like a very long time compared to the
>> default 10s and as a response timeout.
>>
>> So I’m wondering what’s the reasonable range for this option? And why
>> would the Actor fail to respond in time (the message was dropped due to
>> pressure)?
>>
>> Any input would be appreciated! Thanks a lot.
>>
>> Best,
>> Paul Lam
>>
>>
>


Re: Converting a database table schema to a Flink Schema

2022-01-24 Thread Michael Ran
The Table API has a CatalogTable implementation for this.
On 2022-01-24 16:50:25, "WuKong" wrote:
>hi all:
>   Does anyone know of anything, in the community or elsewhere, that can automatically convert a database DDL statement (e.g. a MySQL CREATE TABLE) into a Flink 
> Schema object (org.apache.flink.table.api.Schema)? Recommendations welcome.
>
>
>
>---
>Best,
>WuKong


Re: Flink job submitted to the cluster fails with "Java heap space" after running for a while

2022-01-24 Thread Caizhi Weng
Hi!

Is your state backend the heap state backend? If so, the state of a running Flink
streaming job is stored on the heap, and checkpoints are stored on the heap as well,
which can indeed cause an OOM. You could try switching to another state backend.

Liu Join wrote on Fri, Jan 21, 2022 at 13:20:

>
> I have already replaced the 5s time window with a 100-element countWindowAll. Concretely, I use an aggregate function to concatenate the data in the window into one SQL statement of the form: replace
> into table (a1,a2,a3,a4,..) values(…)
> But this still has not solved the problem.
> I cannot provide a heap dump for now.
> The taskmanager memory allocation is as follows:
> task heap: 2.76G, network: 343MB, JVM Metaspace: 256MB
>
>
> I run two jobs in total and both show this problem, but a simple data synchronization program I wrote earlier did not fail; it just synchronized 500 tables from one MySQL database to another MySQL database. I don't know in which direction to look for a solution.
>
> When monitoring the job earlier, I noticed that the MySQL source fails first and then brings down the whole job. With checkpointing enabled, the MySQL source and the part before the window form one parallel chain whose checkpoint size stays at 136MB from the start of the job to the end, while the checkpoints of the other operators are under 1MB. Could this be part of the cause?
> Sent from Mail for Windows
>
> From: Caizhi Weng
> Sent: Jan 21, 2022 10:52
> To: flink Chinese user mailing list
> Subject: Re: Flink job submitted to the cluster fails with "Java heap space" after running for a while
>
> Hi!
>
> The 5s window that concatenates SQL statements looks rather suspicious -- how exactly is it
> implemented? You could also take a heap dump of the task manager and see where most of the heap memory goes.
>
> Liu Join wrote on Thu, Jan 20, 2022 at 13:28:
>
> > Environment:
> >
> > Flink 1.13.5, standalone-mode cluster, jobmanager memory 2GB, taskmanager memory 4GB; the cluster consists of one jobmanager and two taskmanagers, each taskmanager with 2 slots.
> >
> > The job reads data from 20,000 tables, one record per table per minute, and every 10 minutes outputs the latest record of each table. The code uses map, filter and watermarks, opens a 10-minute sliding window, and uses reduce to obtain the latest record. Because the sink is MySQL with a modest spec, the latest records are concatenated into batch insert statements before being written to MySQL. A 5s window is used to concatenate the SQL statements.
> >
> > Error message:
> > java.lang.OutOfMemoryError: Java heap space
> >
> > Symptoms:
> > The entire taskmanager memory fills up; after the job fails and restarts, the taskmanager memory is still full, so the job fails again and eventually dies for good. After a while, since the memory is not released, the taskmanager process dies as well.
> > Sent from Mail for Windows
> >
> >
>
>


Re: Flink job fails and taskmanager memory is not released

2022-01-24 Thread Caizhi Weng
Hi!

When you say the taskmanager memory keeps rising, do you mean heap memory? You could take a heap dump and check what exactly is occupying the memory.

Liu Join wrote on Fri, Jan 21, 2022 at 15:21:

> Environment: Flink 1.13.5, standalone mode, taskmanager memory 4GB, two slots
>
>
> The job's data volume is very small, less than 10MB. While the job runs, the taskmanager memory keeps rising until the job errors out and restarts; but because the taskmanager memory is not released after the restart, the job eventually fails completely and the taskmanager service dies as well.
>
> Sent from Mail for Windows
>
>


Re: Window function - flush on job stop

2022-01-24 Thread Caizhi Weng
Hi!

As far as I know there is currently no way to do this. However, if you'd
like to, you can implement it with a custom source. Before you stop the
job you send a signal to this custom source (for example through a
common file on HDFS or just through a socket), and when the custom source
detects the signal, it sends out a record with a very large watermark to cut off
the session.
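
A rough sketch of that idea, with a local marker file standing in for the external signal; the file path, record type and emission loop are assumptions for illustration, not part of the original suggestion:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FlushOnStopSourceSketch implements SourceFunction<String> {

    private static final Path STOP_MARKER = Paths.get("/tmp/flush-and-stop"); // illustrative signal

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                // Normal record emission would happen here.
                ctx.collectWithTimestamp("heartbeat", System.currentTimeMillis());

                if (Files.exists(STOP_MARKER)) {
                    // Push event time to the maximum so all open windows fire
                    // before the job is cancelled.
                    ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
                    running = false;
                }
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}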

Lars Skjærven wrote on Fri, Jan 21, 2022 at 20:01:

> We're doing a stream.keyBy().window().aggregate() to aggregate customer
> feedback into sessions. Every now and then we have to update the job, e.g.
> change the key, so that we can't easily continue from the previous state.
>
> Cancelling the job (without restarting from the last savepoint) will result in
> losing ongoing sessions. So we typically go back a few hours when we
> restart to minimize the loss.
>
> Is there any way of making the job flush it's content (sessions) on job
> cancellation? That will result in splitting ongoing sessions in two, which
> is perfectly fine for our purpose.
>
> Any thoughts ?
>
> Lars
>
>
>


Re: Apache Flink - Adding/Removing KeyedStreams at run time without stopping the application

2022-01-24 Thread Caizhi Weng
Hi!

Adding/removing keyed streams will change the topology graph of the job.
Currently it is not possible to do so without restarting the job and as far
as I know there is no existing framework/pattern to achieve this.

By the way, why do you need this functionality? Could you elaborate more on
your use case?

M Singh  wrote on Sat, Jan 22, 2022 at 21:32:

> Hi Folks:
>
> I am working on an exploratory project in which I would like to add/remove
> KeyedStreams (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#keyby)
> without restarting the Flink streaming application.
>
> Is it possible natively in Apache Flink ?  If not, is there any
> framework/pattern which can be used to implement this without restarting
> the application/changing the code ?
>
> Thanks
>
>
>
>
>
>
>
>
>


Flink shutdown with exception when run in idea IDE

2022-01-24 Thread Shawn Du
Hi experts,
I am new to Flink and just ran a simple job in the IDE, but many exceptions are 
thrown when the job finishes (see below).
The job source is bounded, reads from a local file, and runs in streaming mode. There 
is a custom sink as well, which simply writes to a local file.
It seems that each time I run it, I get a different number of output lines. I am not sure 
all data is flushed to disk.  Please help.

Thanks.

org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:438)
 at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
 at akka.actor.Actor.aroundReceive(Actor.scala:517)
 at akka.actor.Actor.aroundReceive$(Actor.scala:515)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:592)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Re: flink 1.13.2 NullPointerException when computing over Hive warehouse data

2022-01-24 Thread Caizhi Weng
Hi!

This looks like a bug. Could you provide the DDL of the Hive tables and the query you are running, so that people can investigate this problem more easily?

Asahi Lee wrote on Mon, Jan 24, 2022 at 09:53:

> 2022-01-23 04:31:39,568 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - Source:
> HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents -
> Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)])
> (1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from RUNNING to CANCELING.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - Discarding the results produced by task execution
> 07b2cd514c6b6d85f79ab5b953971f82.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n:
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:  :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:  : +- [#3]
> Exchange(distribution=[hash[jobid]])\n:  +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) - Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> - HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> MAX($f14) AS lasbopfunctiontestdate]) - Calc(select=[$f0 AS jobid, $f1
> AS reportno, string($f6) AS reportdate, bigint((nvl($f7, 0) + nvl($f8,
> 0)) + nvl($f9, 0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS
> pobcnt, $f2 AS dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5
> AS futureops, lasboptestdate, lasbopfunctiontestdate]) - Map -
> Sink: Unnamed (1/1) (3c555cbd6bf411a6111cf7eaab527d33) switched from
> CREATED to CANCELING.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  
> [] - MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n:
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:  :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:  : +- [#3]
> Exchange(distribution=[hash[jobid]])\n:  +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) - Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> - HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> MAX($f14) AS lasbopfunctiontestdate]) - 

Re: Query regarding Kafka Source and Kafka Sink in 1.14.3

2022-01-24 Thread Caizhi Weng
Hi!

All properties you set by calling KafkaSource.builder().setProperty() will
also be given to KafkaConsumer (see [1]). However these two properties are
specific to Flink and Kafka does not know them, so Kafka will produce a
warning message. These messages are harmless as long as the properties you
set are actually effective.

About writing timestamps to Kafka: I'm not familiar with Kafka, but from the
code I guess that if you create a Kafka record serializer
with KafkaRecordSerializationSchema.builder(), then by default it will write
the record timestamp to Kafka. You can try out the example in [2] and see if it works.

[1]
https://github.com/apache/flink/blob/e615106b38a289bc624a8554b86c83f9785352d3/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L85
[2]
https://github.com/apache/flink/blob/0bc2234b60d1a0635e238d18990695943158123c/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
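
For illustration, a small sketch of the 1.14 KafkaSource/KafkaSink builders touching both points above; the broker address, topics and group id are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class KafkaSourceSinkSketch {

    // Source: setProperty() entries are handed to the underlying KafkaConsumer,
    // so Flink-specific keys such as partition.discovery.interval.ms show up
    // there as "unknown config" warnings even though Flink itself uses them.
    static KafkaSource<String> source() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")          // placeholder address
                .setTopics("input-topic")                    // placeholder topic
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();
    }

    // Sink: a record serializer built via KafkaRecordSerializationSchema.builder();
    // per the reply above, this path writes the record timestamp by default.
    static KafkaSink<String> sink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}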

Mahima Agarwal  wrote on Mon, Jan 24, 2022 at 15:35:

> Hi Team,
>
> I am trying to set the following properties in Kafka Source API in flink
> 1.14.3 version.
> -> client.id.prefix
> -> partition.discovery.interval.ms
>
> But I am getting the below mentioned warning in taskmanager logs:
>
> 1. WARN  org.apache.kafka.clients.consumer.ConsumerConfig [] -
> The configuration 'client.id.prefix' was supplied but isn't a known config.
> 2. WARN  org.apache.kafka.clients.consumer.ConsumerConfig [] -
> The configuration 'partition.discovery.interval.ms' was supplied but
> isn't a known config.
>
> What could be the reason for this warning?
>
> Also, in Flink version 1.13.2 we were able to write timestamps to Kafka
> using the setWriteTimestampToKafka(true) method of the FlinkKafkaProducer class.
> Similar to this, how can we write timestamps to Kafka using the KafkaSink API in
> Flink 1.14.3?
>
> Any leads would be appreciated.
>
> Thanks and Regards
> Mahima Agarwal
>


Re: [DISCUSS] Future of Per-Job Mode

2022-01-24 Thread Xintong Song
Sorry for joining the discussion late.

I'm leaning towards deprecating the per-job mode soonish, and eventually
dropping it in the long-term.
- One less deployment mode makes it easier for users (especially newcomers)
to understand. Deprecating the per-job mode sends the signal that it is
legacy, not recommended, and in most cases users do not need to care about
it.
- For most (if not all) user demands that are satisfied by the per-job mode
but not by the application mode, AFAICS, they can be either worked around or
eventually addressed by the application mode. E.g., make application mode
support shipping local dependencies.
- I'm not sure about dropping the per-job mode soonish, as many users are
still working with it. We'd better not force these users to migrate to the
application mode when upgrading the Flink version.

Thank you~

Xintong Song



On Fri, Jan 21, 2022 at 4:30 PM Konstantin Knauf  wrote:

> Thanks Thomas & Biao for your feedback.
>
> Any additional opinions on how we should proceed with per job-mode? As you
> might have guessed, I am leaning towards proposing to deprecate per-job
> mode.
>
> On Thu, Jan 13, 2022 at 5:11 PM Thomas Weise  wrote:
>
>> Regarding session mode:
>>
>> ## Session Mode
>> * main() method executed in client
>>
>> Session mode also supports execution of the main method on Jobmanager
>> with submission through REST API. That's how Flinkk k8s operators like
>> [1] work. It's actually an important capability because it allows for
>> allocation of the cluster resources prior to taking down the previous
>> job during upgrade when the goal is optimization for availability.
>>
>> Thanks,
>> Thomas
>>
>> [1] https://github.com/lyft/flinkk8soperator
>>
>> On Thu, Jan 13, 2022 at 12:32 AM Konstantin Knauf 
>> wrote:
>> >
>> > Hi everyone,
>> >
>> > I would like to discuss and understand if the benefits of having Per-Job
>> > Mode in Apache Flink outweigh its drawbacks.
>> >
>> >
>> > *# Background: Flink's Deployment Modes*
>> > Flink currently has three deployment modes. They differ in the following
>> > dimensions:
>> > * main() method executed on Jobmanager or Client
>> > * dependencies shipped by client or bundled with all nodes
>> > * number of jobs per cluster & relationship between job and cluster
>> > lifecycle* (supported resource providers)
>> >
>> > ## Application Mode
>> > * main() method executed on Jobmanager
>> > * dependencies already need to be available on all nodes
>> > * dedicated cluster for all jobs executed from the same main()-method
>> > (Note: applications with more than one job, currently still significant
>> > limitations like missing high-availability). Technically, a session
>> cluster
>> > dedicated to all jobs submitted from the same main() method.
>> > * supported by standalone, native kubernetes, YARN
>> >
>> > ## Session Mode
>> > * main() method executed in client
>> > * dependencies are distributed from and by the client to all nodes
>> > * cluster is shared by multiple jobs submitted from different clients,
>> > independent lifecycle
>> > * supported by standalone, Native Kubernetes, YARN
>> >
>> > ## Per-Job Mode
>> > * main() method executed in client
>> > * dependencies are distributed from and by the client to all nodes
>> > * dedicated cluster for a single job
>> > * supported by YARN only
>> >
>> >
>> > *# Reasons to Keep** There are use cases where you might need the
>> > combination of a single job per cluster, but main() method execution in
>> the
>> > client. This combination is only supported by per-job mode.
>> > * It currently exists. Existing users will need to migrate to either
>> > session or application mode.
>> >
>> >
>> > *# Reasons to Drop** With Per-Job Mode and Application Mode we have two
>> > modes that for most users probably do the same thing. Specifically, for
>> > those users that don't care where the main() method is executed and
>> want to
>> > submit a single job per cluster. Having two ways to do the same thing is
>> > confusing.
>> > * Per-Job Mode is only supported by YARN anyway. If we keep it, we
>> should
>> > work towards support in Kubernetes and Standalone, too, to reduce
>> special
>> > casing.
>> > * Dropping per-job mode would reduce complexity in the code and allow
>> us to
>> > dedicate more resources to the other two deployment modes.
>> > * I believe with session mode and application mode we have to easily
>> > distinguishable and understandable deployment modes that cover Flink's
>> use
>> > cases:
>> >* session mode: olap-style, interactive jobs/queries, short lived
>> batch
>> > jobs, very small jobs, traditional cluster-centric deployment mode (fits
>> > the "Hadoop world")
>> >* application mode: long-running streaming jobs, large scale &
>> > heterogenous jobs (resource isolation!), application-centric deployment
>> > mode (fits the "Kubernetes world")
>> >
>> >
>> > *# Call to Action*
>> > * Do you use per-job mode? If so, why & would you be able to migrate to one
>> > of the other deployment modes?

Convert database table schema to Flink Schema

2022-01-24 Thread WuKong
Hi all:
Does anyone know of a project or tool that can automatically convert a database
table schema (e.g. a MySQL CREATE TABLE statement) into a Flink Schema
(org.apache.flink.table.api.Schema)? I want to generate Flink table schemas
dynamically.
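
For illustration, here is a minimal sketch of this kind of conversion, built by
hand from JDBC metadata. The class name and the type mapping are assumptions for
illustration only (not an existing tool), and the mapping is far from complete:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;

public class JdbcToFlinkSchema {

    // Build a Flink Schema from the JDBC metadata of a single table.
    public static Schema fromJdbc(Connection connection, String tableName) throws SQLException {
        Schema.Builder builder = Schema.newBuilder();
        DatabaseMetaData metaData = connection.getMetaData();
        try (ResultSet columns = metaData.getColumns(null, null, tableName, null)) {
            while (columns.next()) {
                String columnName = columns.getString("COLUMN_NAME");
                int jdbcType = columns.getInt("DATA_TYPE");
                builder.column(columnName, toFlinkType(jdbcType));
            }
        }
        return builder.build();
    }

    // Very rough JDBC -> Flink type mapping, for illustration only.
    private static DataType toFlinkType(int jdbcType) {
        switch (jdbcType) {
            case Types.INTEGER:   return DataTypes.INT();
            case Types.BIGINT:    return DataTypes.BIGINT();
            case Types.DOUBLE:    return DataTypes.DOUBLE();
            case Types.BOOLEAN:   return DataTypes.BOOLEAN();
            case Types.TIMESTAMP: return DataTypes.TIMESTAMP(3);
            default:              return DataTypes.STRING();
        }
    }
}

Another option would be to parse the CREATE TABLE DDL directly, but going through
JDBC metadata avoids writing a SQL parser.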



---
Best,
WuKong


Converting a database table schema to a Flink Schema

2022-01-24 Thread WuKong
Hi all:
   Does anyone know of a project, from the community or elsewhere, that can
automatically convert a database DDL statement (e.g. a MySQL CREATE TABLE) into a
Flink Schema object (org.apache.flink.table.api.Schema)? Recommendations would be
appreciated.



---
Best,
WuKong


Re: Tuning akka.ask.timeout

2022-01-24 Thread Paul Lam
Hi Guowei,

Thanks a lot for your reply.

I’m using 1.14.0. The timeout happens at job deployment time. A subtask would
run for a short period of `akka.ask.timeout` before failing due to the timeout.

I noticed that the jobmanager has very high CPU usage at that moment, like
2000%. I’m investigating the cause by profiling.

Best,
Paul Lam

> On Jan 21, 2022, at 09:56, Guowei Ma wrote:
> 
> Hi, Paul 
> 
> Would you like to share some information such as the Flink version you used 
> and the memory of TM and JM.
> And when does the timeout happen? Such as at begin of the job or during the 
> running of the job
> 
> Best,
> Guowei
> 
> 
> On Thu, Jan 20, 2022 at 4:45 PM Paul Lam wrote:
> Hi,
> 
> I’m tuning a Flink job with 1000+ parallelism, which frequently fails with 
> Akka TimeOutException (it was fine with 200 parallelism). 
> 
> I see some posts recommend increasing `akka.ask.timeout` to 120s. I’m not 
> familiar with Akka, but that looks like a very long time for a response timeout 
> compared to the default 10s.
> 
> So I’m wondering what’s the reasonable range for this option? And why would 
> the Actor fail to respond in time (the message was dropped due to pressure)?
> 
> Any input would be appreciated! Thanks a lot.
> 
> Best,
> Paul Lam
> 



Re: Tuning akka.ask.timeout

2022-01-24 Thread Paul Lam
Hi Zhilong,

Thanks a lot for your very detailed answer!

My setup: Flink 1.14.0 on YARN, jdk1.8_u202

The timeout happens at the job deployment stage. I checked the GC logs; both JM
and TM look good, but the CPU usage of the JM can go up to 2000% for a short time
(cgroups are not turned on).

I’ve set the akka timeout to 60s as a workaround, and the job now runs well.
I’m planning to dig deeper with profiling and will get back to the community if
I find something.
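
For reference, the workaround amounts to a single line in flink-conf.yaml (60 s
is simply the value that happened to work for this job, not a general
recommendation):

akka.ask.timeout: 60 s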

Best,
Paul Lam

> On Jan 20, 2022, at 20:09, Zhilong Hong wrote:
> 
> Hi, Paul:
> 
> Increasing akka.ask.timeout only covers up the issue. Maybe you could try to 
> find the root cause of why the akka timeout happens.
> 
> There are many reasons that could lead to an akka timeout:
> 
> 1. JM/TM cannot respond in time. Maybe the JM/TM is busy with GC. You could 
> analyze the GC behaviour following the documentation [1]. If long GC pauses 
> happen at runtime, you could try to increase the heap memory or 
> increase the parallelism.
> 
> Maybe there exists a machine that has a high CPU load. If the CPU load is too 
> high, the main thread may not be able to process the akka messages in time. 
> You could monitor the CPU usage during the runtime of your jobs with commands 
> like top. 
> 
> I'm wondering what version of Flink you are using? And when did an akka 
> timeout happen? If it happened during the deployment, you could try to 
> upgrade your Flink to 1.14 for better deployment performance.
> 
> 2. Network congestion. If the network in your cluster is congested, the akka 
> messages cannot arrive at their destination in time, and an akka timeout 
> happens. You could monitor the network traffic with the tools mentioned 
> in [2].
> 
> If you are trying to increase the value of akka.ask.timeout, you could 
> increase it by 10 seconds at a time and see whether that works.
> 
> Sincerely,
> Zhilong
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/#analyzing-memory--garbage-collection-behaviour
>  
> 
> [2] 
> https://askubuntu.com/questions/257263/how-to-display-network-traffic-in-the-terminal
>  
> 
> On Thu, Jan 20, 2022 at 4:45 PM Paul Lam wrote:
> Hi,
> 
> I’m tuning a Flink job with 1000+ parallelism, which frequently fails with 
> Akka TimeOutException (it was fine with 200 parallelism). 
> 
> I see some posts recommend increasing `akka.ask.timeout` to 120s. I’m not 
> familiar with Akka, but that looks like a very long time for a response timeout 
> compared to the default 10s.
> 
> So I’m wondering what’s the reasonable range for this option? And why would 
> the Actor fail to respond in time (the message was dropped due to pressure)?
> 
> Any input would be appreciated! Thanks a lot.
> 
> Best,
> Paul Lam
> 



Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and falls back to default /opt/flink/conf/log4j-console.properties

2022-01-24 Thread Yang Wang
>
> I checked the image prior to cluster creation; all the log files are there.
> Once the cluster is deployed, they are missing. (A bug?)


I do not think it is a bug, since we have already shipped all the config
files (log4j properties, flink-conf.yaml) via the ConfigMap.
It is then mounted directly onto an existing path (/opt/flink/conf), which
hides all the existing files.

Of course, we could use a subPath mount to avoid this issue. But then the
volume mount will not receive any updates [1].


[1]. https://kubernetes.io/docs/concepts/storage/volumes/#configmap


Best,
Yang


On Sat, Jan 22, 2022 at 23:18, Tamir Sagi wrote:

> Hey Yang,
>
> I've created the ticket,
> https://issues.apache.org/jira/browse/FLINK-25762
>
> In addition,
>
> The /opt/flink/conf is cleaned up because we are mounting the conf files
> from K8s ConfigMap.
>
> I checked the image prior to cluster creation; all the log files are there.
> Once the cluster is deployed, they are missing. (A bug?)
>
> Best,
> Tamir.
> --
> *From:* Tamir Sagi 
> *Sent:* Friday, January 21, 2022 7:19 PM
> *To:* Yang Wang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored
> and falls back to default /opt/flink/conf/log4j-console.properties
>
> Yes,
>
> Thank you!
> I will handle that.
>
> Best,
> Tamir
> --
> *From:* Yang Wang 
> *Sent:* Friday, January 21, 2022 5:11 AM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored
> and falls back to default /opt/flink/conf/log4j-console.properties
>
>
> *EXTERNAL EMAIL*
>
>
> Changing the order of exec command makes sense to me. Would you please
> create a ticket for this?
>
> The /opt/flink/conf is cleaned up because we are mounting the conf files
> from K8s ConfigMap.
>
>
>
> Best,
> Yang
>
On Tue, Jan 18, 2022 at 17:48, Tamir Sagi wrote:
>
> Hey Yang,
>
> Thank you for confirming it.
>
> IMO, a better approach is to change the order of "log_setting", "ARGS" and
> "FLINK_ENV_JAVA_OPTS" in the exec command.
> That way, user-defined properties take precedence.
>
> From:
>
> exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
> -classpath "`manglePathList
> "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN}
> "${ARGS[@]}"
>
> To
>
> exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList
> "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN}
> "${ARGS[@]}" "${FLINK_ENV_JAVA_OPTS}"
>
> Unless there are system configurations which are not supposed to be overridden
> by the user (in which case having dedicated env variables is better). Does that
> make sense to you?
>
>
> In addition, any idea why /opt/flink/conf gets cleaned up? (Only
> flink-conf.yaml is there.)
>
>
> Best,
> Tamir
>
>
> --
> *From:* Yang Wang 
> *Sent:* Tuesday, January 18, 2022 6:02 AM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored
> and falls back to default /opt/flink/conf/log4j-console.properties
>
>
> *EXTERNAL EMAIL*
>
>
> I think you are right. Before 1.13.0, if the log configuration file does
> not exist, the logging properties would not be added to the start command.
> That is why it could work in 1.12.2.
>
> However, from 1.13.0, we are not using
> "kubernetes.container-start-command-template" to generate the JM/TM start
> command, but jobmanager.sh/taskmanager.sh instead. We do not
> have the same logic in "flink-console.sh".
>
> Maybe we could introduce an environment variable for the log configuration
> file name in "flink-console.sh". The default value could be
> "log4j-console.properties" and it could be configured by users.
> If this makes sense to you, could you please create a ticket?
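>
> A rough sketch of what that could look like in "flink-console.sh" (the variable
> name and the log_setting entry are only placeholders, not a final proposal):
>
> log4j_name=${FLINK_LOG4J_CONF_FILE:-log4j-console.properties}
> log_setting=("-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/${log4j_name}")
>
> Users could then override FLINK_LOG4J_CONF_FILE via the container environment
> without changing the image.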
>
>
> Best,
> Yang
>
On Mon, Jan 17, 2022 at 22:53, Tamir Sagi wrote:
>
> Hey Yang,
>
> thanks for answering,
>
> TL;DR
>
> Assuming I have not missed anything, the way TM and JM are created is
> different between these 2 versions,
> but it does look like flink-console.sh gets called eventually with the
> same exec command.
>
> In 1.12.2, if org.apache.flink.kubernetes.kubeclient.parameters#hasLog4j
> returns false, then the logging args are not added to the startCommand.
>
>
>    1. Why does the config dir get cleaned up once the cluster starts? Even
>    when I pushed log4j-console.properties to the expected location
>    (/opt/flink/conf), the directory includes only flink-conf.yaml.
>    2. I think that by running the exec command "...${FLINK_ENV_JAVA_OPTS}
>    "${log_setting[@]}" "${ARGS[@]}" some properties might be ignored.
>    IMO, it should first look for properties in java.opts provided by the
>    user in flink-conf and fall back to the default in case it's not present.
>
>
> Talking about native Kubernetes mode:
>
> I checked the bash scripts in the flink-dist module; flink-console.sh looks
> similar in both 1.14.2 and 1.12.2 (in 1.14.2 there are more
> cases for the input