Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
Hi, If I configure batch mode, the application will stop after the job is complete, right? Then k8s will restart the pod and rerun the job. That is not what we want. Thanks, Qihua On Tue, Oct 12, 2021 at 7:27 PM Caizhi Weng wrote: > Hi! > > It seems that you want to run a batch job instead of a

Re: About slot allocation

2021-10-12 Thread yidan zhao
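Empirically, with cluster.evenly-spread-out-slots left at its default of false, one TM is usually filled before the next one is used; only occasionally, for reasons I don't understand, does this problem appear. In this setup I want to isolate jobs completely: if a TM's slots are not all used, other submitted jobs may also land on that TM, so job isolation is insufficient. Then, when a machine has a problem, or a job failure and restart brings down a TM, even more jobs end up restarting. On Wed, Oct 13, 2021 at 10:20 AM Caizhi Weng wrote: > Hi! > > As far as I remember, there is no parameter for a "default mechanism that prefers a single TM". Do you mean setting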

[External] metric collision using datadog and standalone Kubernetes HA mode

2021-10-12 Thread Clemens Valiente
Hi, we are using datadog as our metrics reporter as documented here: https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#datadog our jobmanager scope is metrics.scope.jm: flink.jobmanager metrics.scope.jm.job: flink.jobmanager since datadog doesn't

Re: jdbc connector configuration

2021-10-12 Thread Caizhi Weng
Hi! It seems that you want to run a batch job instead of a streaming job. Call EnvironmentSettings.newInstance().inBatchMode().build() to create your environment settings for a batch job. On Wed, Oct 13, 2021 at 5:50 AM Qihua Yang wrote: > Hi, > > Sorry for asking again. I plan to use JDBC connector to scan

Re: About slot allocation

2021-10-12 Thread Caizhi Weng
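Hi! As far as I remember, there is no parameter for a "default mechanism that prefers a single TM". Do you mean setting cluster.evenly-spread-out-slots to false? If so, slots are chosen arbitrarily from all available slots rather than preferring a single TM. What requirement is behind preferring a single TM? Doing so can leave some machines in the cluster very busy while others sit idle, and the parallel subtasks on the busy machines will be slowed down. On Tue, Oct 12, 2021 at 4:25 PM yidan zhao wrote: > I used to use the mechanism that spreads slots across multiple TMs, and recently tried the default mechanism that prefers a single TM. >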

Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-12 Thread Vijay Bhaskar
Since the state size is small, you can try the FileState backend rather than RocksDB and compare. The rule of thumb is that if FileStateBackend performs worse, RocksDB is the better choice. Regards Bhaskar On Tue, Oct 12, 2021 at 1:47 PM Yun Tang wrote: > Hi Lei, > > RocksDB state-backend's checkpoint is composited

Re: Impossible to get pending file names/paths on checkpoint?

2021-10-12 Thread Preston Price
Thanks for your thoughts here Fabian, I've responded inline but I also want to clarify the reason I need the file paths on commit. The FileSink works as expected in Azure Data Lake with the ABFS connector, but I want to perform an additional step by telling Azure Data Explorer to ingest the

Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
Hi, Sorry for asking again. I plan to use the JDBC connector to scan a database. How do I know when it is done? Are there any metrics I can track? We want to monitor the progress and stop the Flink application when it is done. Thanks, Qihua On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang wrote: > It is pretty

Re: Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-12 Thread Ahmad Alkilani
Thanks Arvid. Getting the easy stuff out of the way, I certainly wait for longer than 10s (typically observe what happens over a few minutes) so the bounded watermark issue isn't in play here. The Async IO as it stands today has timeouts so it doesn't run indefinitely. With that said, I replaced

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Seth Wiesman
Hi Marc, I think you will find this is less efficient than just using keyed state. Remember state backends are local, reading and writing is extremely cheap. HashMapStateBackend is just an in-memory data structure and EmbeddedRocksDBStateBackend only works against local disk. Additionally, the

Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-12 Thread Dongwon Kim
Hi community, Can I get advice on this question? Another user just sent me an email asking whether I found a solution or a workaround for this question, but I'm still stuck there. Any suggestions? Thanks in advance, Dongwon -- Forwarded message - From: Dongwon Kim Date: Mon,

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Marc LEGER
Hello Nicolaus, Unfortunately, I don't really have control over the custom state solution since it is managed by an existing system which cannot be easily modified. What I finally did for the "data state" in my CoFlatMapFunction is to use a list-style operator state to store the partitioned

[no subject]

2021-10-12 Thread Andrew Otto
Hello, I'm trying to use HiveCatalog with Kerberos. Our Hadoop cluster, our Hive Metastore, and our Hive Server are kerberized. I can successfully submit Flink jobs to Yarn authenticated as my users using a cached ticket, as well as using a keytab. However, I can't seem to register a

Exception: SequenceNumber is treated as a generic type

2021-10-12 Thread Ori Popowski
Hi, I have a large backpressure in a somewhat simple Flink application in Scala. Using Flink version 1.12.1. To find the source of the problem, I want to eliminate all classes with generic serialization, so I set pipeline.generic-types=false in order to spot those classes and write a serializer

Re: Reset of transient variables in state to default values.

2021-10-12 Thread Alex Drobinsky
Hi Jing, The job doesn't restart from a checkpoint; it's a brand new clean job, with no exceptions during execution and no restarts :) The state is a Keyed State, so a new key means a new State. In this situation currentFile is equal to null, as expected, and is handled without issues. Before I

Re: Reset of transient variables in state to default values.

2021-10-12 Thread JING ZHANG
Hi Alex, Since you use `FileSystemStateBackend`, I don't think currentFile occasionally becoming null is caused by periodic checkpoints, because if the job is running without failover or restore from a checkpoint, reading/writing value state on `FileSystemStateBackend` does not cause serialization and

Re: Inconsistent parallelism in web UI when using reactive mode

2021-10-12 Thread 陳昌倬
On Tue, Oct 12, 2021 at 10:41:24AM +0200, Chesnay Schepler wrote: > This is a known and documented > > limitation of the AdaptiveScheduler. There is no concrete date yet for when > it will

Re: how to view doc of flink-1.10 in Chinese

2021-10-12 Thread Chesnay Schepler
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ Do note that certain pages haven't been translated. On 09/10/2021 05:08, 杨浩 wrote: Our company uses release-1.10, can we see the zh doc? English Doc: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ Chinese Doc (only

Re: Reset of transient variables in state to default values.

2021-10-12 Thread Yun Tang
Hi Alex, Since you use a custom MultiStorePacketState class as the value state type, Flink should use the Kryo serializer [1] to serialize your class when accessing RocksDB state or when checkpointing via FileSystemStateBackend, and I don't know whether Kryo would serialize your transient field. If you're

Re: Inconsistent parallelism in web UI when using reactive mode

2021-10-12 Thread Chesnay Schepler
This is a known and documented limitation of the AdaptiveScheduler. There is no concrete date yet for when it will be fixed. On 12/10/2021 05:08, ChangZhuo Chen (陳昌倬) wrote: Hi, We

About slot allocation

2021-10-12 Thread yidan zhao
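I used to use the mechanism that spreads slots across multiple TMs, and recently tried the default mechanism that prefers a single TM. But I ran into a problem: each of my TMs currently has 10 slots, and I have a job with parallelism 40 that actually occupied 5 TMs, as 10+10+10+2+8. What is going on here? I would rather have either a completely even spread (by configuring the spread parameter) or one TM filled at a time. The former: I expect balance across machines. The latter: I expect complete isolation between jobs, and I will set each job's parallelism to a multiple of the per-TM slot count (10).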

Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-12 Thread Yun Tang
Hi Lei, A RocksDB state backend checkpoint is composed of RocksDB's own files (unmodified compressed SST files), and incremental checkpointing means Flink does not upload files that were uploaded before. As you can see, incremental checkpoints depend heavily on RocksDB's own

Re: Impossible to get pending file names/paths on checkpoint?

2021-10-12 Thread Fabian Paul
Hi Preston, I just noticed I forgot to cc the user mailing list on my first reply. I have a few thoughts about the design you are describing. > In the meantime I have a nasty hack in place that has unblocked me for now in > getting the target file off the

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Nicolaus Weidner
Hi Marc, thanks for clarifying, I had misunderstood some parts. Unfortunately, I don't think there is a way to update keyed state (for multiple keys even) outside of a keyed context. I will ask if someone else has an idea, but allow me to ask one counter-question first: Did you actually run

Re: Custom Sink Object attribute issue

2021-10-12 Thread Arvid Heise
Hi Jigar, I'm moving your user question to the user ML. The best place to initialize transient fields is in private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException; as described in [1]: Remember that transient fields will be initialized to their
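The readObject advice above can be sketched in plain Java serialization, without any Flink dependency. This is a minimal illustration, not the code from the thread: the class names TransientFieldDemo and BufferingSink and the helper roundTrip are hypothetical, and the byte-array round trip only stands in for an object being serialized and shipped elsewhere.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientFieldDemo {

    /** Hypothetical stand-in for a custom sink object; not a Flink class. */
    static class BufferingSink implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String name;
        // Transient fields are skipped by serialization, and field
        // initializers do not run again when the object is deserialized.
        private transient List<String> buffer = new ArrayList<>();

        BufferingSink(String name) {
            this.name = name;
        }

        // Invoked by Java serialization while the object is being read back.
        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();     // restores the non-transient fields
            buffer = new ArrayList<>(); // re-creates the transient state
        }

        void add(String value) {
            buffer.add(value);
        }

        int size() {
            return buffer.size();
        }

        String name() {
            return name;
        }
    }

    /** Serializes the sink to bytes and reads it back as a new instance. */
    static BufferingSink roundTrip(BufferingSink sink)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (BufferingSink) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        BufferingSink original = new BufferingSink("demo");
        original.add("a");

        BufferingSink copy = roundTrip(original);
        copy.add("b"); // would throw NullPointerException without readObject

        System.out.println(copy.name() + " " + copy.size()); // prints "demo 1"
    }
}
```

The copy starts with an empty buffer because the contents of a transient field are never written; readObject only guarantees that the field is non-null again after deserialization.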

??????flink-1.14 ???? kafkasource ????watermark????

2021-10-12 Thread kcz
??globalWindowtriggertimes.public class PathMonitorJob { private static final String PATH = "path"; private static double THRESHOLD; public static void main(String[] args) throws Exception

Re: Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-12 Thread Arvid Heise
Hi Ahmad, From your description, I'd look in a different direction: Could it be that your Sink/Async IO is not processing data (fast enough)? Since you have a bounded watermark strategy, you'd need to see 10s of data being processed before the first watermark is emitted. To test that, can you