Re: Any way to improve list state get performance

2022-11-21 Thread tao xiao
any suggestion is highly appreciated On Tue, Nov 15, 2022 at 8:50 PM tao xiao wrote: > Hi team, > > I have a Flink job that joins two streams, let's say A and B streams, > followed by a key process function. In the key process function the job > inserts elements from B strea

Any way to improve list state get performance

2022-11-15 Thread tao xiao
Hi team, I have a Flink job that joins two streams, let's say streams A and B, followed by a keyed process function. In the keyed process function the job inserts elements from stream B into a list state if the element from stream A hasn't arrived yet. I am wondering if there is any way to skip the liststat.get() to
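A minimal sketch of one possible approach, not taken from the thread: keep a cheap per-key flag next to the buffer so the list is only read when something was actually buffered. This is a plain-Python model of keyed state, not Flink API; the class `BufferingJoin`, its fields, and the `list_reads` counter are all hypothetical names used for illustration.

```python
from collections import defaultdict


class BufferingJoin:
    """Toy model of a keyed join that buffers B elements until A arrives."""

    def __init__(self):
        self.a_seen = {}                    # key -> A element (models a value state)
        self.b_buffer = defaultdict(list)   # key -> buffered B elements (models a list state)
        self.has_buffered = set()           # keys with a non-empty buffer (models a boolean flag)
        self.list_reads = 0                 # counts how often the buffer is actually read

    def on_b(self, key, b):
        """Handle a B element: join immediately if A is known, otherwise buffer."""
        if key in self.a_seen:
            return [(self.a_seen[key], b)]
        self.b_buffer[key].append(b)        # append only, the buffer is not read here
        self.has_buffered.add(key)
        return []

    def on_a(self, key, a):
        """Handle an A element: flush the buffer only if the flag says it is non-empty."""
        self.a_seen[key] = a
        if key not in self.has_buffered:
            return []                       # nothing buffered, so the list read is skipped
        self.list_reads += 1
        buffered = self.b_buffer.pop(key)
        self.has_buffered.discard(key)
        return [(a, b) for b in buffered]
```

Whether a flag like this helps in a real job depends on how expensive the flag lookup is compared with the list read, so it would need to be measured.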

Re: Pojo state schema evolution not working correctly

2022-08-09 Thread tao xiao
> Then the exception will be thrown, right? > > As for the next question about why the value is not updated in RocksDB, > Where you added the debugging log ? In RocksDBListState#get() ? > > Best, > Hangxiang. > > On Wed, Aug 3, 2022 at 5:32 PM tao xiao wrote: > >

Pojo state schema evolution not working correctly

2022-08-03 Thread tao xiao
Hi team, I encountered the exception below after I added a new field to a POJO used in list state and resumed the job from a checkpoint: > [error occurred during error reporting , id > 0xb]\n","stream":"stdout","time": > \n","stream":"stdout","time": > #\n","stream":"stdout","time": > # http://bugre

Re: Incorrect checkpoint id used when job is recovering

2022-05-19 Thread tao xiao
Hi team, Can anyone shed some light? On Sat, May 14, 2022 at 8:56 AM tao xiao wrote: > Hi team, > > Does anyone have any ideas? > > On Thu, May 12, 2022 at 9:20 PM tao xiao wrote: > >> Forgot to mention the Flink version is 1.13.2 and we use kubernetes >> na

Re: Incorrect checkpoint id used when job is recovering

2022-05-13 Thread tao xiao
Hi team, Does anyone have any ideas? On Thu, May 12, 2022 at 9:20 PM tao xiao wrote: > Forgot to mention the Flink version is 1.13.2 and we use kubernetes native > mode > > On Thu, May 12, 2022 at 9:18 PM tao xiao wrote: > >> Hi team, >> >> I met a weird issue

Re: Incorrect checkpoint id used when job is recovering

2022-05-12 Thread tao xiao
Forgot to mention the Flink version is 1.13.2 and we use Kubernetes native mode On Thu, May 12, 2022 at 9:18 PM tao xiao wrote: > Hi team, > > I met a weird issue when a job tries to recover from JM failure. The > success checkpoint before JM crashed is 41205 > > ``` > >

Incorrect checkpoint id used when job is recovering

2022-05-12 Thread tao xiao
Hi team, I hit a weird issue when a job tries to recover from a JM failure. The successful checkpoint before the JM crashed is 41205 ``` {"log":"2022-05-10 14:55:40,663 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed checkpoint 41205 for job 0

Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-16 Thread tao xiao
is > not inflating the record size? The number of records seems to work > decently. @pnowojski FYI. > > On Thu, Dec 16, 2021 at 2:20 AM tao xiao wrote: > >> Hi Arvid >> >> The second picture shows the metrics of the upstream operator. The >> upstream has 150 par

Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-15 Thread tao xiao
rds to one of the parallelism, not all. >> >> If possible could you please explain what your Flink job is doing and >> preferably share your user code so that others can look into this case? >> >> On Sat, Dec 11, 2021 at 01:11, tao xiao wrote: >> >>> Hi team, >>

Confusion about rebalance bytes sent metric in Flink UI

2021-12-10 Thread tao xiao
Hi team, I have one operator that is connected to another 9 downstream operators using rebalance. Each operator has a parallelism of 150 [1]. I assume each message in the upstream operator is sent to one of the parallel instances of each of the 9 receiving operators so the total bytes sent should be roughly 9
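The assumption in this post can be checked with a small back-of-the-envelope model. This is a sketch in plain Python, not Flink code: it assumes round-robin distribution per downstream operator and ignores serialization overhead and network buffering. The function name `simulate_rebalance` and its parameters are hypothetical.

```python
from itertools import cycle


def simulate_rebalance(record_sizes, downstream_operators=9, parallelism=150):
    """Send every record to exactly one subtask of each downstream operator.

    Returns the total bytes sent and a per-operator, per-subtask byte count.
    """
    # One independent round-robin cursor per downstream operator.
    cursors = [cycle(range(parallelism)) for _ in range(downstream_operators)]
    bytes_per_subtask = [[0] * parallelism for _ in range(downstream_operators)]
    total_bytes = 0
    for size in record_sizes:
        for op_index, cursor in enumerate(cursors):
            subtask = next(cursor)
            bytes_per_subtask[op_index][subtask] += size
            total_bytes += size
    return total_bytes, bytes_per_subtask
```

Under these assumptions the total equals the number of downstream operators times the input bytes, and each subtask receives about 1/150 of one copy of the input.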

Re: RocksDB state not cleaned up

2021-09-17 Thread tao xiao
to trigger manual compaction. > > I'm off to vacation until 27th and I won't be responsive during that time. > I'd like to pull Yun into the conversation as he's super familiar with the > RocksDB state backend. > > [1] > https://github.com/facebook/rocks

Re: RocksDB state not cleaned up

2021-09-16 Thread tao xiao
ed, Sep 15, 2021 at 12:10 AM tao xiao wrote: > Hi David, > > If I read Stephan's comment correctly TTL doesn't work well for cases > where we have too many levels, like fast growing state, as compaction > doesn't clean up high level SST files in time, Is this correct?

Re: RocksDB state not cleaned up

2021-09-14 Thread tao xiao
2]. Can you please check these and validate whether this really is > the case? > > [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction > [2] > https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting > &

RocksDB state not cleaned up

2021-09-13 Thread tao xiao
Hi team, we have a job that uses value state with RocksDB and TTL set to 1 day. The TTL update type is OnCreateAndWrite. We set the value state when the value state doesn't exist and we never update it again after the state is not empty. The key of the value state is a timestamp. My understanding of
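A minimal model of the TTL behaviour described above, assuming the timestamp is refreshed only on create and write and that expired entries are hidden from reads but physically removed later by a separate cleanup step. This is plain Python, not the Flink or RocksDB API; `TtlValueState`, `stored_keys` and `compact` are hypothetical names.

```python
class TtlValueState:
    """Toy value state with an OnCreateAndWrite-style TTL."""

    def __init__(self, ttl_ms):
        self.ttl_ms = ttl_ms
        self._entries = {}  # key -> (value, last_write_timestamp_ms)

    def update(self, key, value, now_ms):
        """Writing (or creating) an entry refreshes its timestamp."""
        self._entries[key] = (value, now_ms)

    def value(self, key, now_ms):
        """Reading does not refresh the timestamp; expired entries are hidden."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_value, written_at = entry
        if now_ms - written_at >= self.ttl_ms:
            return None  # expired for readers, but still occupying storage
        return stored_value

    def stored_keys(self):
        """Number of entries physically stored, expired or not."""
        return len(self._entries)

    def compact(self, now_ms):
        """Models a background cleanup that physically drops expired entries."""
        self._entries = {
            k: (v, ts) for k, (v, ts) in self._entries.items()
            if now_ms - ts < self.ttl_ms
        }
```

The gap between `value()` returning nothing and `stored_keys()` shrinking is the part that matters for state size: until cleanup runs, expired entries still take up space.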

Re: How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-10 Thread tao xiao
ven if you manage to solve this, EBS is replicated > network storage, therefore rocksdb performance will be affected > significantly. > > Best, > D. > > On Fri 10. 9. 2021 at 16:19, tao xiao wrote: > >> The use case we have is to store the RocksDB sst files in EBS. The

Re: How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-10 Thread tao xiao
The use case we have is to store the RocksDB sst files in EBS. The EC2 instance type (m5) we use doesn't provide local disk storage therefore EBS is the only option to store the local sst file. On Fri, Sep 10, 2021 at 7:10 PM Yang Wang wrote: > I am afraid Flink could not support creating dedica

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-07-01 Thread tao xiao
eb46c4e516d6ee31ef8eb38e2d4359042 > > Best > Yun Tang > -- > *From:* Matthias Pohl > *Sent:* Thursday, July 1, 2021 16:41 > *To:* tao xiao > *Cc:* Yun Tang ; user ; Roman > Khachatryan > *Subject:* Re: Exception in snapshotState suppresses subsequent > check

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-30 Thread tao xiao
Hi team, Does anyone have a clue? On Mon, Jun 28, 2021 at 3:27 PM tao xiao wrote: > My job is very simple as you can see from the code I pasted. I simply > print out the number to stdout. If you look at the log the number continued > to print out after checkpoint 1 which indicate

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-28 Thread tao xiao
be under high backpressure and > all subsequent checkpoints never ran > 'FromElementsFunctionT#snapshotState', which means your code that throws the > exception was never executed. You could check those expired checkpoints to > see whether your tasks containing 'FromElemen

Exception in snapshotState suppresses subsequent checkpoints

2021-06-26 Thread tao xiao
Hi team, I run a simple Flink 1.12.1 job in an IDE with TolerableCheckpointFailureNumber set, where I intentionally throw an exception in the source function's snapshotState to verify how Flink behaves. What I find is that the first checkpoint throws the exception and eventually times out while the main flow cont

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-26 Thread tao xiao
97a5cc. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) 33 34 35 [2021-06-26 16:09:15,349] INFO Checkpoint 2 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) On Sat, Jun 26, 2021 at 4:36 PM tao xiao wrote: > Hi team, > > I run a simple 1.12

Is it possible to customize avro schema name when using SQL

2021-06-06 Thread tao xiao
Hi team, I want to use avro-confluent to encode the data using SQL, but the schema registered by the encoder hard-codes the schema name to 'record'. Is it possible to dictate the name? -- Regards, Tao

Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
nk/blob/1db4e560d1b46fac27a18bce9556fec646f063d9/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java#L54 > > On 2. Jun 2021, at 13:57, tao xiao wrote: > > Hi Fabian, > > Unfortunately

Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
> [1] https://issues.apache.org/jira/browse/FLINK-22763 > > > On 2. Jun 2021, at 10:58, tao xiao wrote: > > Hi team, > > Confluent schema registry supports HTTP basic authentication[1] but I > don't find the corresponding configs in Flink documentation[2]. Is this >

avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
Hi team, Confluent schema registry supports HTTP basic authentication[1] but I can't find the corresponding configs in the Flink documentation[2]. Is this achievable in Flink avro-confluent? [1] https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authen

Re: classloader.resolve-order is not honored when submitting job to a remote cluster

2021-06-01 Thread tao xiao
k whether it solves > the problem. If yes, then let's quickly get it in. > > [1] https://issues.apache.org/jira/browse/FLINK-21445 > [2] https://github.com/apache/flink/pull/15020 > > Cheers, > Till > > On Sun, May 30, 2021 at 9:41 AM tao xiao wrote: > >>

classloader.resolve-order is not honored when submitting job to a remote cluster

2021-05-30 Thread tao xiao
Hi team, I discovered that the child-first class loader is always used to initialize the main program when submitting the job to a YARN cluster in application mode, regardless of what value classloader.resolve-order is set to in flink-conf.yaml. But this is not the case if I submit the same job with th

Re: No result shown when submitting the SQL in cli

2021-05-11 Thread tao xiao
> While it is distributed mode (either yarn or standalone mode) when you are > in sql-client, you should be able to see the result in TM logs. > > > On Tue, May 11, 2021 at 11:40 PM, tao xiao wrote: > >> Can anyone help with this question? >> >> On Thu, May 6, 2021 at 3:26 PM

Re: No result shown when submitting the SQL in cli

2021-05-11 Thread tao xiao
Can anyone help with this question? On Thu, May 6, 2021 at 3:26 PM tao xiao wrote: > Hi team, > > I wrote a simple SQL job to select data from Kafka. I can see results > printing out in IDE but when I submit the job to a standalone cluster in > CLI there is no result shown. I

No result shown when submitting the SQL in cli

2021-05-06 Thread tao xiao
Hi team, I wrote a simple SQL job to select data from Kafka. I can see results printing out in IDE but when I submit the job to a standalone cluster in CLI there is no result shown. I am sure the job is running well in the cluster with debug log suggesting that the kafka consumer is fetching data

Re: Rich Function Thread Safety

2020-05-07 Thread tao xiao
As the Javadoc suggests, it seems the operator methods and the checkpoint snapshot are invoked from two different threads: https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java#L39-L62 On Thu, May 7, 2020 at

Re: Unable to serialize org.apache.kafka.common.config.types.Password

2018-12-25 Thread tao xiao
G, "LoginModule required > subject=\"test\" secret=\"test\";"); > > The String value will be parsed to Password object.(refer to the method > org.apache.kafka.common.config.ConfigDef.parseType) > > Regards, > Dian > > > 在 2018年12月25日,下

Unable to serialize org.apache.kafka.common.config.types.Password

2018-12-25 Thread tao xiao
Hi team, I am passing security-enabled Kafka consumer properties to FlinkKafkaConsumer but keep getting a java.io.NotSerializableException. What is the best way to handle this? I use Flink 1.7.1 and here is the consumer property that produces the exception: props.put(SaslConfigs.SASL_J

Register a user scope metric in window reduce function

2017-01-18 Thread tao xiao
Hi team, Is there any way that I can register a metric in a window reduce function? As per the Flink docs, getRuntimeContext is only available in RichFunction, but the window operator doesn't allow a RichFunction to be chained. Is there any way to work around this?

Re: Restart the job from a checkpoint

2017-01-16 Thread tao xiao
ally activate it via the checkpoint config (see docs). > > Ping me if you have any questions. > > – Ufuk > > > On Mon, Jan 16, 2017 at 5:51 AM, tao xiao wrote: > > Hi team, > > > > Can we restart a flink job from previous successful checkpoint? I know we >

Restart the job from a checkpoint

2017-01-15 Thread tao xiao
Hi team, Can we restart a Flink job from a previous successful checkpoint? I know we can start a Flink job from a savepoint, but I wonder if I can do something similar by passing the checkpoint path to the flink run command to restore the job from a checkpoint.

Re: Kafka topic partition skewness causes watermark not being emitted

2017-01-14 Thread tao xiao
on data > intentional, or only for experimental purposes? > > Best, > Gordon > > On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote: > > Hi team, > > I have a topic with 2 partitions in Kafka. I produced all data to > partition 0 and no data to

Kafka topic partition skewness causes watermark not being emitted

2017-01-12 Thread tao xiao
Hi team, I have a topic with 2 partitions in Kafka. I produced all data to partition 0 and no data to partition 1. I created a Flink job with parallelism set to 1 that consumes that topic and counts the events with a session event window (5 seconds gap). It turned out that the session event window was ne
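A minimal sketch of why an empty partition can hold back event-time progress, assuming the source's watermark is the minimum over the watermarks of all partitions it reads. This is a plain-Python model, not the Flink Kafka connector; `NO_DATA`, `combined_watermark` and `session_window_fires` are hypothetical names, and the firing rule is simplified.

```python
NO_DATA = float("-inf")  # watermark of a partition that has never produced a record


def combined_watermark(partition_watermarks):
    """Model the source watermark as the minimum over all partition watermarks."""
    return min(partition_watermarks.values())


def session_window_fires(last_event_ts_ms, gap_ms, watermark_ms):
    """Simplified rule: the session closes once the watermark passes last event + gap."""
    return watermark_ms >= last_event_ts_ms + gap_ms


# Partition 0 has advanced to 60 s of event time, partition 1 never received data.
watermark = combined_watermark({0: 60_000, 1: NO_DATA})
fires = session_window_fires(last_event_ts_ms=10_000, gap_ms=5_000, watermark_ms=watermark)
```

In this model `fires` stays false for as long as partition 1 has no data, because the minimum never advances.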

Re: window function outputs two different values

2017-01-09 Thread tao xiao
nside the reduce function, count the number of data and also emit the data itself to another operator for further processing As the reduce function can only emit the count, I want to know how to also emit the data as well? On Sat, 7 Jan 2017 at 20:30 tao xiao wrote: > Hi team, > > I

window function outputs two different values

2017-01-07 Thread tao xiao
Hi team, I have a requirement to output two different values from a time window reduce function. Here is the basic workflow: 1. fetch data from Kafka 2. flow the data to an event session window. kafka source -> keyBy -> session window -> reduce 3. inside the reduce function, count the number
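One way to get both values out of a single reduce step is to reduce over pairs of (count, collected events) instead of raw events. The sketch below is plain Python using `functools.reduce`, not the Flink window API; `lift`, `combine` and `reduce_window` are hypothetical names, and collecting every event in the accumulator is an assumption that only suits windows small enough to hold in memory.

```python
from functools import reduce


def lift(event):
    """Wrap a single event as an accumulator: a count of 1 and a one-element list."""
    return (1, [event])


def combine(left, right):
    """Merge two accumulators by adding the counts and concatenating the events."""
    return (left[0] + right[0], left[1] + right[1])


def reduce_window(events):
    """Reduce a non-empty window to (count, events) in one pass."""
    return reduce(combine, map(lift, events))


count, items = reduce_window(["a", "b", "c"])
```

A downstream step can then split the pair and route the count and the events to different consumers.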