Re: Any way to improve list state get performance
Any suggestion is highly appreciated. On Tue, Nov 15, 2022 at 8:50 PM tao xiao wrote: > Hi team, > > I have a Flink job that joins two streams, let's say A and B streams, > followed by a key process function. In the key process function the job > inserts elements from B stream to a list state if element from A stream > hasn't arrived yet. I am wondering if there is any way to skip the liststate.get() to > check if there are elements in the list state when A stream arrives to > reduce the calls to the underlying state (RocksDB) > > Here is the code snippet > > keyfunction { > > process(in, ctx, collector) { > if (in is A stream) > // any way to check if list state is empty so that we don't need to call > get()? > for (b : liststate.get()) { > . > } > > if (in is B stream) > liststate.add(in) > > > -- > Regards, > Tao > -- Regards, Tao
Any way to improve list state get performance
Hi team,

I have a Flink job that joins two streams, let's say streams A and B, followed by a keyed process function. In the keyed process function the job inserts elements from stream B into a list state if the element from stream A hasn't arrived yet. I am wondering if there is any way to skip the liststate.get() call that checks whether there are elements in the list state when the stream A element arrives, to reduce the calls to the underlying state backend (RocksDB).

Here is the code snippet (pseudocode):

keyfunction {
  process(in, ctx, collector) {
    if (in is A stream)
      // any way to check if the list state is empty so that we don't need to call get()?
      for (b : liststate.get()) {
        ...
      }

    if (in is B stream)
      liststate.add(in)
  }
}

--
Regards,
Tao
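[Editor's note] One common workaround, sketched below, is to keep a cheap companion ValueState that tracks whether the list is non-empty, so the A side can skip the expensive liststate.get() (which, with RocksDB, merges and deserializes the whole list) when nothing is buffered. This is only an illustration of the idea, not code from the thread; the Event type, isStreamA() helper, and state names are hypothetical:

```java
// Sketch: track list emptiness with a companion ValueState<Integer> count,
// so the A side only iterates the list state when something was buffered.
// Event, isStreamA(), and the state names are hypothetical placeholders.
public class BufferedJoinFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient ListState<Event> buffered;
    private transient ValueState<Integer> bufferedCount; // cheap emptiness marker

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-b", Event.class));
        bufferedCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("buffered-b-count", Integer.class));
    }

    @Override
    public void processElement(Event in, Context ctx, Collector<Event> out) throws Exception {
        if (in.isStreamA()) {
            Integer count = bufferedCount.value();
            if (count == null || count == 0) {
                return; // skip buffered.get() entirely: nothing from B yet
            }
            for (Event b : buffered.get()) {
                out.collect(b);
            }
            buffered.clear();
            bufferedCount.clear();
        } else { // stream B
            buffered.add(in);
            Integer count = bufferedCount.value();
            bufferedCount.update(count == null ? 1 : count + 1);
        }
    }
}
```

Whether this pays off depends on the backend: the ValueState read is still a RocksDB lookup, but it is a single small value rather than a merge-and-deserialize of the full list, so it tends to help when the list is usually empty. A ValueState&lt;Boolean&gt; would suffice if only emptiness (not the count) matters.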
Re: Pojo state schema evolution not working correctly
Not exactly reproducible. I cannot reproduce the problem in a test environment with the same settings; this is only reproducible in PRD. I guess it has something to do with the state itself.

> Where did you add the debugging log? In RocksDBListState#get()?

I added the debug log in org.apache.flink.contrib.streaming.state.RocksDBListState#migrateSerializedValue, which prints the message: before migration object size *480* :after object size :*481*, and another debug log in org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.ListElementFilter#nextUnexpiredOffset, which prints: object size *480*. If I understand the code correctly, the first debug log verifies that the object has been successfully serialized back to RocksDB with the new schema; however, the second debug log suggests the object is still in the old schema when the state is queried.

On Mon, Aug 8, 2022 at 12:28 PM Hangxiang Yu wrote: > Hi, > IIUC, Conditions to reproduce it are: > 1. Using RocksDBStateBackend with incremental strategy > 2. Using ListState in the stateful operator > 3. enabling TTL with cleanupInRocksdbCompactFilter > 4. adding a field to make the job trigger schema evolution > Then the exception will be thrown, right? > > As for the next question about why the value is not updated in RocksDB, > Where did you add the debugging log? In RocksDBListState#get()? > > Best, > Hangxiang.
> > On Wed, Aug 3, 2022 at 5:32 PM tao xiao wrote: > >> Hi team, >> >> I encountered below exception after I added a new field to a POJO used in >> list state and resumed the job from checkpoint >> >> >> >>> [error occurred during error reporting , id >>> 0xb]\n","stream":"stdout","time": >>> \n","stream":"stdout","time": >>> #\n","stream":"stdout","time": >>> # http://bugreport.java.com/bugreport/crash.jsp\n >>> ","stream":"stdout","time": >>> # If you would like to submit a bug report, please >>> visit:\n","stream":"stdout","time": >>> #\n","stream":"stdout","time": >>> # /opt/flink/hs_err_pid1.log\n","stream":"stdout","time": >>> # An error report file with more information is saved >>> as:\n","stream":"stdout","time": >>> #\n","stream":"stdout","time": >>> # Core dump written. Default location: /opt/flink/core or >>> core.1\n","stream":"stdout","time": >>> #\n","stream":"stdout","time": >>> # V [libjvm.so+0x57595f] Exceptions::_throw(Thread*, char const*, int, >>> Handle, char const*)+0x1ef\n","stream":"stdout","time":\# Problematic >>> frame:\n","stream":"stdout","time": >>> # Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 >>> compressed oops)\n","stream":"stdout","time": >>> # JRE version: OpenJDK Runtime Environment (8.0_302-b08) (build >>> 1.8.0_302-b08)\n","stream":"stdout","time": >>> #\n","stream":"stdout","time": >>> # SIGSEGV (0xb) at pc=0x7f6a2ad5d95f, pid=1, >>> tid=0x7f69bac9f700\n","stream":"stdout","time": >>> #\n","stream":"stdout","time": >>> # A fatal error has been detected by the Java Runtime >>> Environment:\n","stream":"stdout","time": >>> #\n","stream":"stdout","time":] >>> >>> >>> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:193) >>> >>> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:243) >>> >>> 
org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) >>> at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:412) >>> at java.lang.reflect.Field.set(Field.java:764) >>> at >>> sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:80) >>> at >>> sun.reflect.UnsafeFieldAccessorImpl.throw
Pojo state schema evolution not working correctly
Hi team, I encountered below exception after I added a new field to a POJO used in list state and resumed the job from checkpoint > [error occurred during error reporting , id > 0xb]\n","stream":"stdout","time": > \n","stream":"stdout","time": > #\n","stream":"stdout","time": > # http://bugreport.java.com/bugreport/crash.jsp\n > ","stream":"stdout","time": > # If you would like to submit a bug report, please > visit:\n","stream":"stdout","time": > #\n","stream":"stdout","time": > # /opt/flink/hs_err_pid1.log\n","stream":"stdout","time": > # An error report file with more information is saved > as:\n","stream":"stdout","time": > #\n","stream":"stdout","time": > # Core dump written. Default location: /opt/flink/core or > core.1\n","stream":"stdout","time": > #\n","stream":"stdout","time": > # V [libjvm.so+0x57595f] Exceptions::_throw(Thread*, char const*, int, > Handle, char const*)+0x1ef\n","stream":"stdout","time":\# Problematic > frame:\n","stream":"stdout","time": > # Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 > compressed oops)\n","stream":"stdout","time": > # JRE version: OpenJDK Runtime Environment (8.0_302-b08) (build > 1.8.0_302-b08)\n","stream":"stdout","time": > #\n","stream":"stdout","time": > # SIGSEGV (0xb) at pc=0x7f6a2ad5d95f, pid=1, > tid=0x7f69bac9f700\n","stream":"stdout","time": > #\n","stream":"stdout","time": > # A fatal error has been detected by the Java Runtime > Environment:\n","stream":"stdout","time": > #\n","stream":"stdout","time":] > > > org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:193) > > org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:243) > > org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) > at > 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:412) > at java.lang.reflect.Field.set(Field.java:764) > at > sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:80) > at > sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171) > at > sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167) > Caused by: java.lang.IllegalArgumentException: Can not set int field > x.hr to null value > > org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:214) > Exception in thread \"Thread-23\" > org.apache.flink.util.FlinkRuntimeException: Failed to deserialize list > element for TTL compaction filter >

I verified that Flink recognized the state change:

> Performing state migration for state ListStateDescriptor{name=novimp, defaultValue=null, serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6fe249e5} because the state serializer's schema, i.e. serialization format, has changed.

and successfully migrated the state with the new POJO class (I updated the Flink source code with my own debugging log):

> before migration object size *480* :after object size :*481*

However, when the exception occurred, the object read from state had the byte length of the original size, not the updated size:

> object size *480*

It appears that the state migration during the state recovery phase didn't take effect or wasn't persisted to RocksDB. Can you please give me some pointers to debug this problem further?

Flink version is 1.13.2
RocksDB state backend with incremental, aligned checkpoint
List state with TTL (24h) enabled

--
Regards,
Tao
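[Editor's note] The `Caused by: java.lang.IllegalArgumentException: Can not set int field x.hr to null value` in the trace is characteristic of adding a *primitive* field to a POJO: when a record written with the old schema is deserialized, the new field has no value, and reflection cannot assign null to a primitive `int`. A hedged sketch of the shape of the change (only the field name `hr` comes from the exception message; everything else is illustrative):

```java
// Hypothetical POJO used in the list state. Only the field name `hr` is
// taken from the exception message; the other fields are illustrative.
public class MyPojo {
    public long id;
    public String name;

    // Newly added field, declared as boxed Integer rather than primitive int:
    // records written with the old schema carry no value for this field, and
    // reflection cannot assign null to a primitive -- which is exactly the
    // "Can not set int field x.hr to null value" error in the stack trace.
    public Integer hr;

    public MyPojo() {} // POJO rules: public no-arg constructor
}
```

This does not explain why the migrated bytes were not persisted, but declaring evolved fields as nullable boxed types may avoid the reflection failure during deserialization of old-schema records.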
Re: Incorrect checkpoint id used when job is recovering
Hi team, Can anyone shed some light? On Sat, May 14, 2022 at 8:56 AM tao xiao wrote: > Hi team, > > Does anyone have any ideas? > > On Thu, May 12, 2022 at 9:20 PM tao xiao wrote: > >> Forgot to mention the Flink version is 1.13.2 and we use kubernetes >> native mode >> >> On Thu, May 12, 2022 at 9:18 PM tao xiao wrote: >> >>> Hi team, >>> >>> I met a weird issue when a job tries to recover from JM failure. The >>> success checkpoint before JM crashed is 41205 >>> >>> ``` >>> >>> {"log":"2022-05-10 14:55:40,663 INFO >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed >>> checkpoint 41205 for job (9453840 bytes in >>> 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"} >>> >>> ``` >>> >>> However JM tries to recover the job with an old checkpoint 41051 which >>> doesn't exist that leads to unrecoverable state >>> >>> ``` >>> >>> "2022-05-10 14:59:38,949 INFO >>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - >>> Trying to retrieve checkpoint 41051.\n" >>> >>> ``` >>> >>> Full log attached >>> >>> -- >>> Regards, >>> Tao >>> >> >> >> -- >> Regards, >> Tao >> > > > -- > Regards, > Tao > -- Regards, Tao
Re: Incorrect checkpoint id used when job is recovering
Hi team, Does anyone have any ideas? On Thu, May 12, 2022 at 9:20 PM tao xiao wrote: > Forgot to mention the Flink version is 1.13.2 and we use kubernetes native > mode > > On Thu, May 12, 2022 at 9:18 PM tao xiao wrote: > >> Hi team, >> >> I met a weird issue when a job tries to recover from JM failure. The >> success checkpoint before JM crashed is 41205 >> >> ``` >> >> {"log":"2022-05-10 14:55:40,663 INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed >> checkpoint 41205 for job (9453840 bytes in >> 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"} >> >> ``` >> >> However JM tries to recover the job with an old checkpoint 41051 which >> doesn't exist that leads to unrecoverable state >> >> ``` >> >> "2022-05-10 14:59:38,949 INFO >> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - >> Trying to retrieve checkpoint 41051.\n" >> >> ``` >> >> Full log attached >> >> -- >> Regards, >> Tao >> > > > -- > Regards, > Tao > -- Regards, Tao
Re: Incorrect checkpoint id used when job is recovering
Forgot to mention the Flink version is 1.13.2 and we use kubernetes native mode On Thu, May 12, 2022 at 9:18 PM tao xiao wrote: > Hi team, > > I met a weird issue when a job tries to recover from JM failure. The > success checkpoint before JM crashed is 41205 > > ``` > > {"log":"2022-05-10 14:55:40,663 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed > checkpoint 41205 for job (9453840 bytes in > 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"} > > ``` > > However JM tries to recover the job with an old checkpoint 41051 which > doesn't exist that leads to unrecoverable state > > ``` > > "2022-05-10 14:59:38,949 INFO > org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - > Trying to retrieve checkpoint 41051.\n" > > ``` > > Full log attached > > -- > Regards, > Tao > -- Regards, Tao
Incorrect checkpoint id used when job is recovering
Hi team,

I met a weird issue when a job tries to recover from a JM failure. The last successful checkpoint before the JM crashed was 41205:

```
{"log":"2022-05-10 14:55:40,663 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed checkpoint 41205 for job (9453840 bytes in 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}
```

However, the JM tries to recover the job with an older checkpoint, 41051, which doesn't exist, leading to an unrecoverable state:

```
"2022-05-10 14:59:38,949 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 41051.\n"
```

Full log attached.

--
Regards,
Tao

jm.log
Description: Binary data
Re: Confusion about rebalance bytes sent metric in Flink UI
> Your upstream is not inflating the record size?

No, this is a simple dedup function.

On Thu, Dec 16, 2021 at 2:49 PM Arvid Heise wrote: > Ah yes I see it now as well. Yes you are right, each record should be > replicated 9 times to send to one of the instances each. Your upstream is > not inflating the record size? The number of records seems to work > decently. @pnowojski FYI. > > On Thu, Dec 16, 2021 at 2:20 AM tao xiao wrote: > >> Hi Arvid >> >> The second picture shows the metrics of the upstream operator. The >> upstream has 150 parallelisms as you can see in the first picture. I expect >> the bytes sent is about 9 * bytes received as we have 9 downstream >> operators connecting. >> >> Hi Caizhi, >> Let me create a minimal reproducible DAG and update here >> >> On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise wrote: >> >>> Hi, >>> >>> Could you please clarify which operator we see in the second picture? >>> >>> If you are showing the upstream operator, then this has only parallelism >>> 1, so there shouldn't be multiple subtasks. >>> If you are showing the downstream operator, then the metric would refer >>> to the HASH and not REBALANCE. >>> >>> On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng >>> wrote: >>> >>>> Hi! >>>> >>>> This doesn't seem to be the expected behavior. Rebalance shuffle should >>>> send records to one of the parallelism, not all. >>>> >>>> If possible could you please explain what your Flink job is doing and >>>> preferably share your user code so that others can look into this case? >>>> >>>> tao xiao wrote on Sat, Dec 11, 2021 at 01:11: >>>> >>>>> Hi team, >>>>> >>>>> I have one operator that is connected to another 9 downstream >>>>> operators using rebalance. Each operator has 150 parallelisms[1]. I assume >>>>> each message in the upstream operation is sent to one of the parallel >>>>> instances of the 9 receiving operators so the total bytes sent should be >>>>> roughly 9 times of bytes received in the upstream operator metric. 
However >>>>> the Flink UI shows the bytes sent is much higher than 9 times. It is about >>>>> 150 * 9 * bytes received[2]. This looks to me like every message is >>>>> duplicated to each parallel instance of all receiving operators like what >>>>> broadcast does. Is this correct? >>>>> >>>>> >>>>> >>>>> [1] https://imgur.com/cGyb0QO >>>>> [2] https://imgur.com/SFqPiJA >>>>> -- >>>>> Regards, >>>>> Tao >>>>> >>>> >> >> -- >> Regards, >> Tao >> > -- Regards, Tao
Re: Confusion about rebalance bytes sent metric in Flink UI
Hi Arvid,

The second picture shows the metrics of the upstream operator. The upstream has a parallelism of 150, as you can see in the first picture. I expect the bytes sent to be about 9 * bytes received, as we have 9 downstream operators connected.

Hi Caizhi,

Let me create a minimal reproducible DAG and update here.

On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise wrote: > Hi, > > Could you please clarify which operator we see in the second picture? > > If you are showing the upstream operator, then this has only parallelism > 1, so there shouldn't be multiple subtasks. > If you are showing the downstream operator, then the metric would refer to > the HASH and not REBALANCE. > > On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng wrote: > >> Hi! >> >> This doesn't seem to be the expected behavior. Rebalance shuffle should >> send records to one of the parallelism, not all. >> >> If possible could you please explain what your Flink job is doing and >> preferably share your user code so that others can look into this case? >> >> tao xiao wrote on Sat, Dec 11, 2021 at 01:11: >> >>> Hi team, >>> >>> I have one operator that is connected to another 9 downstream operators >>> using rebalance. Each operator has 150 parallelisms[1]. I assume each >>> message in the upstream operation is sent to one of the parallel instances >>> of the 9 receiving operators so the total bytes sent should be roughly 9 >>> times of bytes received in the upstream operator metric. However the Flink >>> UI shows the bytes sent is much higher than 9 times. It is about 150 * 9 * >>> bytes received[2]. This looks to me like every message is duplicated to >>> each parallel instance of all receiving operators like what broadcast >>> does. Is this correct? >>> >>> >>> >>> [1] https://imgur.com/cGyb0QO >>> [2] https://imgur.com/SFqPiJA >>> -- >>> Regards, >>> Tao >>> >> -- Regards, Tao
Confusion about rebalance bytes sent metric in Flink UI
Hi team,

I have one operator that is connected to 9 downstream operators using rebalance. Each operator has a parallelism of 150 [1]. I assume each message from the upstream operator is sent to one parallel instance of each of the 9 receiving operators, so the total bytes sent should be roughly 9 times the bytes received in the upstream operator metric. However, the Flink UI shows that the bytes sent is much higher than 9 times: it is about 150 * 9 * bytes received [2]. This looks to me like every message is duplicated to each parallel instance of all receiving operators, like what broadcast does. Is this correct?

[1] https://imgur.com/cGyb0QO
[2] https://imgur.com/SFqPiJA

--
Regards,
Tao
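[Editor's note] The expected relationship between the two partitioning strategies can be checked with back-of-the-envelope arithmetic (the byte counts below are made up for illustration):

```java
// Back-of-the-envelope check of the expected "bytes sent" for the two
// partitioning strategies discussed above (numbers are made up).
public class FanOutMath {
    public static void main(String[] args) {
        long bytesReceived = 1_000_000L; // bytes received by the upstream operator
        int downstreamOperators = 9;     // operators connected via rebalance
        int parallelism = 150;           // subtasks per downstream operator

        // REBALANCE: each record goes to exactly ONE subtask of each of the
        // 9 downstream operators -> sent ~= 9 x received.
        long rebalanceSent = (long) downstreamOperators * bytesReceived;

        // BROADCAST: each record goes to EVERY subtask of every downstream
        // operator -> sent ~= 150 x 9 x received, which is the ratio the
        // UI screenshot appears to show.
        long broadcastSent = (long) parallelism * downstreamOperators * bytesReceived;

        System.out.println("rebalance: " + rebalanceSent); // 9000000
        System.out.println("broadcast: " + broadcastSent); // 1350000000
    }
}
```

So a reading of 150 * 9 * bytes received is consistent with broadcast-like fan-out, not rebalance.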
Re: RocksDB state not cleaned up
Thanks for the feedback! However, the TTL results already show that the state cannot be cleaned up in time due to too many levels built up in RocksDB. Hi @Yun Tang, do you have any suggestions to tune RocksDB to accelerate the compaction progress? On Fri, Sep 17, 2021 at 8:01 PM David Morávek wrote: > Cleaning up with timers should solve this. Both approaches have some > advantages and disadvantages though. > > Timers: > - No "side effects". > - Can be set in event time. Deletes are regular tombstones that will get > compacted later on. > > TTL: > - Performance. This costs literally nothing compared to an extra state for > timer + writing a tombstone marker. > - Has "side-effects", because it works in processing time. This is just > something to keep in mind eg. when bootstraping the state from historical > data. (large event time / processing time skew) > > With 1.14 release, we've bumped the RocksDB version so it may be possible > to use a "periodic compaction" [1], but nobody has tried that so far. In > the meantime I think there is non real workaround because we don't expose a > way to trigger manual compaction. > > I'm off to vacation until 27th and I won't be responsive during that time. > I'd like to pull Yun into the conversation as he's super familiar with the > RocksDB state backend. > > [1] > https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction > > Best, > D. > > On Fri, Sep 17, 2021 at 5:17 AM tao xiao wrote: > >> Hi David, >> >> Confirmed with RocksDB log Stephan's observation is the root cause that >> compaction doesn't clean up the high level sst files fast enough. Do you >> think manual clean up by registering a timer is the way to go or any >> RocksDB parameter can be tuned to mitigate this issue? 
>> >> On Wed, Sep 15, 2021 at 12:10 AM tao xiao wrote: >> >>> Hi David, >>> >>> If I read Stephan's comment correctly TTL doesn't work well for cases >>> where we have too many levels, like fast growing state, as compaction >>> doesn't clean up high level SST files in time, Is this correct? If yes >>> should we register a timer with TTL time and manual clean up the state >>> (state.clear() ) when the timer fires? >>> >>> I will turn on RocksDB logging as well as compaction logging [1] to >>> verify this >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction >>> >>> >>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek wrote: >>> >>>> Hi Tao, >>>> >>>> my intuition is that the compaction of SST files is not triggering. By >>>> default, it's only triggered by the size ratios of different levels [1] and >>>> the TTL mechanism has no effect on it. >>>> >>>> Some reasoning from Stephan: >>>> >>>> It's very likely to have large files in higher levels that haven't been >>>>> compacted in a long time and thus just stay around. >>>>> >>>>> This might be especially possible if you insert a lot in the beginning >>>>> (build up many levels) and then have a moderate rate of modifications, so >>>>> the changes and expiration keep happening purely in the merges / >>>>> compactions of the first levels. Then the later levels may stay unchanged >>>>> for quite some time. >>>>> >>>> >>>> You should be able to see compaction details by setting RocksDB logging >>>> to INFO [2]. Can you please check these and validate whether this really is >>>> the case? >>>> >>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction >>>> [2] >>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting >>>> >>>> Best, >>>> D. 
>>>> >>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao wrote: >>>> >>>>> Hi team >>>>> >>>>> We have a job that uses value state with RocksDB and TTL set to 1 day. >>>>> The TTL update type is OnCreateAndWrite. We set the value state when the >>>>> value state doesn't exist and we never update it again after the state is >>>>> not empty. The key of the value state is timestamp. My understanding of >>>>> such TTL settings is that the size of all SST files remains flat (let's >>>>> disregard
Re: RocksDB state not cleaned up
Hi David,

Confirmed with the RocksDB log that Stephan's observation is the root cause: compaction doesn't clean up the high-level SST files fast enough. Do you think manual cleanup by registering a timer is the way to go, or can any RocksDB parameter be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao wrote: > Hi David, > > If I read Stephan's comment correctly TTL doesn't work well for cases > where we have too many levels, like fast growing state, as compaction > doesn't clean up high level SST files in time, Is this correct? If yes > should we register a timer with TTL time and manual clean up the state > (state.clear() ) when the timer fires? > > I will turn on RocksDB logging as well as compaction logging [1] to verify > this > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction > > > On Tue, Sep 14, 2021 at 5:38 PM David Morávek wrote: > >> Hi Tao, >> >> my intuition is that the compaction of SST files is not triggering. By >> default, it's only triggered by the size ratios of different levels [1] and >> the TTL mechanism has no effect on it. >> >> Some reasoning from Stephan: >> >> It's very likely to have large files in higher levels that haven't been >>> compacted in a long time and thus just stay around. >>> >>> This might be especially possible if you insert a lot in the beginning >>> (build up many levels) and then have a moderate rate of modifications, so >>> the changes and expiration keep happening purely in the merges / >>> compactions of the first levels. Then the later levels may stay unchanged >>> for quite some time. >>> >> >> You should be able to see compaction details by setting RocksDB logging >> to INFO [2]. Can you please check these and validate whether this really is >> the case? 
>> >> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction >> [2] >> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting >> >> Best, >> D. >> >> On Mon, Sep 13, 2021 at 3:18 PM tao xiao wrote: >> >>> Hi team >>> >>> We have a job that uses value state with RocksDB and TTL set to 1 day. >>> The TTL update type is OnCreateAndWrite. We set the value state when the >>> value state doesn't exist and we never update it again after the state is >>> not empty. The key of the value state is timestamp. My understanding of >>> such TTL settings is that the size of all SST files remains flat (let's >>> disregard the impact space amplification brings) after 1 day as the daily >>> data volume is more or less the same. However the RocksDB native metrics >>> show that the SST files continue to grow since I started the job. I check >>> the SST files in local storage and I can see SST files with age 1 months >>> ago (when I started the job). What is the possible reason for the SST files >>> not cleaned up?. 
>>> >>> The Flink version is 1.12.1 >>> State backend is RocksDB with incremental checkpoint >>> All default configuration for RocksDB >>> Per job mode in Yarn and checkpoint to S3 >>> >>> >>> Here is the code to set value state >>> >>> public void open(Configuration parameters) { >>> StateTtlConfig ttlConfigClick = StateTtlConfig >>> .newBuilder(Time.days(1)) >>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >>> >>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >>> .cleanupInRocksdbCompactFilter(300_000) >>> .build(); >>> ValueStateDescriptor clickStateDescriptor = new >>> ValueStateDescriptor<>("click", Click.class); >>> clickStateDescriptor.enableTimeToLive(ttlConfigClick); >>> clickState = getRuntimeContext().getState(clickStateDescriptor); >>> >>> StateTtlConfig ttlConfigAds = StateTtlConfig >>> .newBuilder(Time.days(1)) >>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >>> >>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >>> .cleanupInRocksdbCompactFilter(30_000_000) >>> .build(); >>> ValueStateDescriptor adsStateDescriptor = new >>> ValueStateDescriptor<>("ads", slimAdsClass); >>> adsStateDescriptor
Re: RocksDB state not cleaned up
Hi David,

If I read Stephan's comment correctly, TTL doesn't work well for cases where we have too many levels, like fast-growing state, as compaction doesn't clean up high-level SST files in time. Is this correct? If yes, should we register a timer with the TTL time and manually clean up the state (state.clear()) when the timer fires?

I will turn on RocksDB logging as well as compaction logging [1] to verify this.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction

On Tue, Sep 14, 2021 at 5:38 PM David Morávek wrote: > Hi Tao, > > my intuition is that the compaction of SST files is not triggering. By > default, it's only triggered by the size ratios of different levels [1] and > the TTL mechanism has no effect on it. > > Some reasoning from Stephan: > > It's very likely to have large files in higher levels that haven't been >> compacted in a long time and thus just stay around. >> >> This might be especially possible if you insert a lot in the beginning >> (build up many levels) and then have a moderate rate of modifications, so >> the changes and expiration keep happening purely in the merges / >> compactions of the first levels. Then the later levels may stay unchanged >> for quite some time. >> > > You should be able to see compaction details by setting RocksDB logging to > INFO [2]. Can you please check these and validate whether this really is > the case? > > [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction > [2] > https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting > > Best, > D. > > On Mon, Sep 13, 2021 at 3:18 PM tao xiao wrote: > >> Hi team >> >> We have a job that uses value state with RocksDB and TTL set to 1 day. >> The TTL update type is OnCreateAndWrite. We set the value state when the >> value state doesn't exist and we never update it again after the state is >> not empty. 
The key of the value state is timestamp. My understanding of >> such TTL settings is that the size of all SST files remains flat (let's >> disregard the impact space amplification brings) after 1 day as the daily >> data volume is more or less the same. However the RocksDB native metrics >> show that the SST files continue to grow since I started the job. I check >> the SST files in local storage and I can see SST files with age 1 months >> ago (when I started the job). What is the possible reason for the SST files >> not cleaned up?. >> >> The Flink version is 1.12.1 >> State backend is RocksDB with incremental checkpoint >> All default configuration for RocksDB >> Per job mode in Yarn and checkpoint to S3 >> >> >> Here is the code to set value state >> >> public void open(Configuration parameters) { >> StateTtlConfig ttlConfigClick = StateTtlConfig >> .newBuilder(Time.days(1)) >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >> .cleanupInRocksdbCompactFilter(300_000) >> .build(); >> ValueStateDescriptor clickStateDescriptor = new >> ValueStateDescriptor<>("click", Click.class); >> clickStateDescriptor.enableTimeToLive(ttlConfigClick); >> clickState = getRuntimeContext().getState(clickStateDescriptor); >> >> StateTtlConfig ttlConfigAds = StateTtlConfig >> .newBuilder(Time.days(1)) >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >> .cleanupInRocksdbCompactFilter(30_000_000) >> .build(); >> ValueStateDescriptor adsStateDescriptor = new >> ValueStateDescriptor<>("ads", slimAdsClass); >> adsStateDescriptor.enableTimeToLive(ttlConfigAds); >> adsState = getRuntimeContext().getState(adsStateDescriptor); >> } >> >> @Override >> public void processElement(Tuple3 tuple, Context ctx, >> Collector collector) throws Exception { >> if (tuple.f1 != null) { >> Click click = tuple.f1; >> >> if 
(clickState.value() != null) { >> return; >> } >> >> clickState.update(click); >> >> A adsFromState = adsState.value(); >> if (adsFromState != null) { >> collector.collect(adsFromState); >> } >> } else { >> A ads = tuple.f2;
RocksDB state not cleaned up
Hi team,

We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL update type is OnCreateAndWrite. We set the value state when it doesn't exist and we never update it again once it is non-empty. The key of the value state is a timestamp. My understanding of these TTL settings is that the total size of all SST files should remain flat after 1 day (disregarding the impact of space amplification), as the daily data volume is more or less the same. However, the RocksDB native metrics show that the SST files have kept growing since I started the job. I checked the SST files in local storage and I can see SST files dating back 1 month, to when I started the job. What is the possible reason for the SST files not being cleaned up?

The Flink version is 1.12.1
State backend is RocksDB with incremental checkpoint
All default configuration for RocksDB
Per job mode in Yarn and checkpoint to S3

Here is the code to set value state

public void open(Configuration parameters) {
    StateTtlConfig ttlConfigClick = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(300_000)
            .build();
    ValueStateDescriptor<Click> clickStateDescriptor =
            new ValueStateDescriptor<>("click", Click.class);
    clickStateDescriptor.enableTimeToLive(ttlConfigClick);
    clickState = getRuntimeContext().getState(clickStateDescriptor);

    StateTtlConfig ttlConfigAds = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(30_000_000)
            .build();
    ValueStateDescriptor<A> adsStateDescriptor =
            new ValueStateDescriptor<>("ads", slimAdsClass);
    adsStateDescriptor.enableTimeToLive(ttlConfigAds);
    adsState = getRuntimeContext().getState(adsStateDescriptor);
}

@Override
public void processElement(Tuple3 tuple, Context ctx, Collector collector) throws Exception {
    if (tuple.f1 != null) {
        Click click = tuple.f1;
        if (clickState.value() != null) {
            return;
        }
        clickState.update(click);
        A adsFromState = adsState.value();
        if (adsFromState != null) {
            collector.collect(adsFromState);
        }
    } else {
        A ads = tuple.f2;
        if (adsState.value() != null) {
            return;
        }
        adsState.update(ads);
        Click clickFromState = clickState.value();
        if (clickFromState != null) {
            collector.collect(ads);
        }
    }
}

Here is a snippet of the SST files in local storage

[root@ db]# ll | head -n10
total 76040068
-rw-r- 1 hadoop yarn        0 Aug 16 08:46 03.log
-rw-r- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
-rw-r- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
-rw-r- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
-rw-r- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
-rw-r- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
-rw-r- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
-rw-r- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
-rw-r- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst

--
Regards,
Tao
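One detail worth keeping in mind when reading the config above: cleanupInRocksdbCompactFilter removes expired entries only when RocksDB actually compacts the SST files that contain them, so files that are rarely picked for compaction can hold expired state for a long time. A stdlib-only toy model of that filter decision (the class name, method names, and the map-per-SST-file representation are all illustrative, not Flink's or RocksDB's real types):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of a TTL compaction filter: an entry written at time t with a
// 1-day TTL is physically dropped only when a compaction visits it after
// t + TTL. Until a compaction runs, it still occupies space in the SST file.
public class TtlFilterSketch {
    static final long TTL_MS = 24L * 60 * 60 * 1000; // 1 day, as in the job above

    // Returns the entries of one (simulated) SST file that survive a
    // compaction run at wall-clock time 'now'.
    static Map<String, Long> compact(Map<String, Long> sstFile, long now) {
        return sstFile.entrySet().stream()
                .filter(e -> now < e.getValue() + TTL_MS) // keep only unexpired entries
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Long> file = new HashMap<>();
        file.put("click-1", 0L);         // written at t = 0
        file.put("click-2", TTL_MS / 2); // written half a day later

        // If no compaction ever touches the file, both entries stay on disk
        // no matter how old they are; only a compaction applies the filter.
        System.out.println(compact(file, TTL_MS + 1).keySet()); // [click-2]
    }
}
```

So if compactions are not keeping up with the write rate, the expired-but-not-yet-dropped data accumulates, which would match the growth described above; RocksDB compaction tuning is usually where to look first.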
Re: How to mount PVC volumes using Flink Native Kubernetes ?
Thanks David for the tips. We have been running Flink with no performance degradation observed in EMR (which is EBS attached) for more than 1 year therefore we believe the same performance can be applied in Kubernetes. On Sat, Sep 11, 2021 at 3:13 AM David Morávek wrote: > OT: Beware that even if you manage to solve this, EBS is replicated > network storage, therefore rocksdb performance will be affected > significantly. > > Best, > D. > > On Fri 10. 9. 2021 at 16:19, tao xiao wrote: > >> The use case we have is to store the RocksDB sst files in EBS. The EC2 >> instance type (m5) we use doesn't provide local disk storage therefore EBS >> is the only option to store the local sst file. >> >> On Fri, Sep 10, 2021 at 7:10 PM Yang Wang wrote: >> >>> I am afraid Flink could not support creating dedicated PVC for each >>> TaskManager pod now. >>> But I think it might be a reasonable requirement. >>> >>> Could you please share why you need to mount a persistent volume claim >>> per TaskManager? >>> AFAIK, the TaskManager will be deleted once it fails. You expect the PVC >>> to also be deleted. Right? >>> >>> >>> Best, >>> Yang >>> >>> Xiaolong Wang 于2021年9月10日周五 下午2:37写道: >>> >>>> Hi, >>>> >>>>I'm facing a tough question. I want to start a Flink Native >>>> Kubernetes job with each of the task manager pod mounted with an aws-ebs >>>> PVC. >>>> >>>> The first thought is to use the pod-template file to do this, but it >>>> soon went to a dead end. Since the pod-template on each of the task manager >>>> pod is the same, how can I mount different PVCs ? >>>> >>>>This issue is quite puzzling, will you please help me ? >>>> >>>> Thanks in advance ! >>>> >>> >> >> -- >> Regards, >> Tao >> > -- Regards, Tao
Re: How to mount PVC volumes using Flink Native Kubernetes ?
The use case we have is to store the RocksDB sst files in EBS. The EC2 instance type (m5) we use doesn't provide local disk storage therefore EBS is the only option to store the local sst file. On Fri, Sep 10, 2021 at 7:10 PM Yang Wang wrote: > I am afraid Flink could not support creating dedicated PVC for each > TaskManager pod now. > But I think it might be a reasonable requirement. > > Could you please share why you need to mount a persistent volume claim per > TaskManager? > AFAIK, the TaskManager will be deleted once it fails. You expect the PVC > to also be deleted. Right? > > > Best, > Yang > > Xiaolong Wang 于2021年9月10日周五 下午2:37写道: > >> Hi, >> >>I'm facing a tough question. I want to start a Flink Native Kubernetes >> job with each of the task manager pod mounted with an aws-ebs PVC. >> >> The first thought is to use the pod-template file to do this, but it >> soon went to a dead end. Since the pod-template on each of the task manager >> pod is the same, how can I mount different PVCs ? >> >>This issue is quite puzzling, will you please help me ? >> >> Thanks in advance ! >> > -- Regards, Tao
Re: Exception in snapshotState suppresses subsequent checkpoints
Thanks for the pointer. Let me try upgrading Flink.

On Thu, Jul 1, 2021 at 5:29 PM Yun Tang wrote:
> Hi Tao,
>
> I ran your program with Flink 1.12.1 and found that the problem you described really exists. Things go back to normal when switching to Flink 1.12.2.
>
> After digging into the root cause, I found this is caused by a bug that has since been fixed [1]: if a legacy source task fails outside of the legacy thread, the legacy thread blocks proper cancellation (the completion future is never completed). As you throw the NPE within the source operator, it never exits and cannot handle subsequent checkpoint requests. That's why you see that all subsequent checkpoints cannot finish.
>
> [1] https://github.com/apache/flink/commit/b332ce40d88be84d9cf896f446c7c6e26dbc8b6a#diff-54e9ce3b15d6badcc9376ab144df066eb46c4e516d6ee31ef8eb38e2d4359042
>
> Best
> Yun Tang
> --
> *From:* Matthias Pohl
> *Sent:* Thursday, July 1, 2021 16:41
> *To:* tao xiao
> *Cc:* Yun Tang ; user ; Roman Khachatryan
> *Subject:* Re: Exception in snapshotState suppresses subsequent checkpoints
>
> Hi Tao,
> it looks like it should work considering that you have a sleep of 1 second before each emission. I'm going to add Roman to this thread. Maybe he sees something obvious which I'm missing.
> Could you run the job with the log level set to debug and provide the logs once more? Additionally, having both the TaskManager's and the JobManager's logs available would help in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Jun 30, 2021 at 6:14 PM tao xiao wrote:
>
> Hi team,
>
> Does anyone have a clue?
>
> On Mon, Jun 28, 2021 at 3:27 PM tao xiao wrote:
>
> My job is very simple as you can see from the code I pasted. I simply print out the number to stdout. If you look at the log, the number continued to print out after checkpoint 1, which indicates no back pressure was happening.
It is very easy to reproduce this if you run the code I > provided in IDE > > > LOG > > [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ > 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator > Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: > Checkpoint was declined. > (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl) > org.apache.flink.runtime.checkpoint.CheckpointException: Could not > complete snapshot 1 for operator Source: Custom Source -> Sink: Print to > Std. Out (1/1)#0. Failure reason: Checkpoint was declined. > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) > 
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at >
Re: Exception in snapshotState suppresses subsequent checkpoints
Hi team, Does anyone have a clue? On Mon, Jun 28, 2021 at 3:27 PM tao xiao wrote: > My job is very simple as you can see from the code I pasted. I simply > print out the number to stdout. If you look at the log the number continued > to print out after checkpoint 1 which indicated no back pressure was > happening. It is very easy to reproduce this if you run the code I > provided in IDE > > > LOG > > [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ > 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. > (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) > [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator > Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: > Checkpoint was declined. > (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl) > org.apache.flink.runtime.checkpoint.CheckpointException: Could not > complete snapshot 1 for operator Source: Custom Source -> Sink: Print to > Std. Out (1/1)#0. Failure reason: Checkpoint was declined. 
> at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880) > ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > [flink-streaming-java_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > [flink-runtime_2.11-1.12.1.jar:1.12.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > [flink-runtime_2.11-1.12.1.jar:1.12.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261] > Caused by: org.apache.flink.util.SerializedThrowable: npe > at > com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111) > ~[classes/:?] > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(
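As an aside, the frames above (Mail.run, MailboxProcessor.processMail, MailboxProcessor.runMailboxLoop) show the checkpoint trigger executing as a control message inside the task's mailbox loop: a single task thread consumes both records and control "mails" from one queue. A stripped-down, stdlib-only sketch of that pattern (names are illustrative, not Flink's implementation):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal mailbox loop: the single task thread drains a queue of control
// actions ("mails"), such as a checkpoint trigger, one at a time.
public class MailboxSketch {
    final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    volatile boolean running = true;

    void enqueue(Runnable mail) {
        mailbox.add(mail);
    }

    void runMailboxLoop() {
        while (running) {
            try {
                mailbox.take().run(); // e.g. triggerCheckpoint, delivered as a mail
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        MailboxSketch task = new MailboxSketch();
        task.enqueue(() -> System.out.println("mail: trigger checkpoint"));
        task.enqueue(() -> task.running = false); // a "cancel" mail ends the loop
        task.runMailboxLoop();
    }
}
```

Because everything runs on one thread, a task that never gets back to processing mails also never serves later checkpoint triggers, which is the symptom reported in this thread.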
Re: Exception in snapshotState suppresses subsequent checkpoints
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
... 20 more
[2021-06-26 16:08:55,357] INFO Checkpoint 1 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)

output

[2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @ 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
33
34
35
[2021-06-26 16:09:15,349] INFO Checkpoint 2 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
36
37
38

Main function

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);

env.addSource(new FromElementsFunctionT())
    .setParallelism(1)
    .print()
    .setParallelism(1);
env.execute("Demo");

Source function

package sample.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A stream source function that returns a sequence of elements.
 *
 * Upon construction, this source function serializes the elements using Flink's type
 * information. That way, any object transport using Java serialization will not be affected by the
 * serializability of the elements.
 *
 * NOTE: This source has a parallelism of 1.
 */
@PublicEvolving
public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    /** The number of elements emitted already. */
    private volatile int numElementsEmitted;

    /** Flag to make the source cancelable. */
    private volatile boolean isRunning = true;

    private transient ListState<Integer> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState == null,
                "The " + getClass().getSimpleName() + " has already been initialized.");

        this.checkpointedState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "from-elements-state", IntSerializer.INSTANCE));

        if (context.isRestored()) {
            List<Integer> retrievedStates = new ArrayList<>();
            for (Integer entry : this.checkpointedState.get()) {
                retrievedStates.add(entry);
            }

            // given that the parallelism of the function is 1, we can only have 1 state
            Preconditions.checkArgument(
                    retrievedStates.size() == 1,
                    getClass().getSimpleName() + " retrieved invalid state.");

            this.numElementsEmitted = retrievedStates.get(0);
        }
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final Object lock = ctx.getCheckpointLock();
        while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
            Thread.sleep(1000);
            synchronized (lock) {
                ctx.collect(numElementsEmitted++);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // ------------------------------------------------------------------------
    //  Checkpointing
    // ------------------------------------------------------------------------

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState != null,
                "The " + getClass().getSimpleName() + " has not been properly initialized.");

        this.checkpointedState.clear();
        this.checkpointedState.add(this.numElementsEmitted);
        throw new NullPointerException("npe");
    }
}

On Mon, Jun 28, 2021 at 2:36 PM Yun Tang wrote:
> Hi Tao,
>
> I'm afraid that your Flink job continues to be highly backpressured, and all subsequent checkpoints never ran 'FromElementsFunctionT#snapshotState', which means your code that throws the exception was never executed. You could check those expired checkpoints to see whether your tasks containing 'FromElementsFunctionT' have ever completed.
>
> Best
> Yun Tang
> --
> *From:* tao xiao
> *Sent:* Saturday, June 26, 2021 16:40
> *To:* user
> *Subject:* Re: Exception in snapshotState suppresses subsequent checkpoints
>
> Btw here is the checkpoint related log
>
> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 1624694932345 for
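The failure mode that turned out to cause this (see the reply earlier on this page: a legacy source task failing outside the legacy thread leaves the task's completion future uncompleted, blocking cancellation) can be sketched with plain java.util.concurrent, no Flink involved. The class and method names here are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model of the bug: if the failure path forgets to complete the
// completion future, anything that later waits on it (such as the next
// checkpoint trigger) blocks until it times out.
public class StuckSourceSketch {

    // Waits briefly on the future; returns true if it never completes.
    static boolean neverCompletes(CompletableFuture<Void> f) {
        try {
            f.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (Exception e) {
            return false; // completed exceptionally still counts as completed
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> sourceDone = new CompletableFuture<>();

        // "Legacy source thread" whose failure path forgets the future.
        Thread legacySource = new Thread(() -> {
            try {
                throw new NullPointerException("npe");
            } catch (RuntimeException e) {
                // Buggy path: the error escapes without completing sourceDone.
                // A correct path would call sourceDone.completeExceptionally(e).
            }
        });
        legacySource.start();
        legacySource.join();

        System.out.println(neverCompletes(sourceDone)); // true
    }
}
```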
Exception in snapshotState suppresses subsequent checkpoints
Hi team,

I run a simple Flink 1.12.1 job in my IDE with TolerableCheckpointFailureNumber set, where I intentionally throw an exception in the source function's snapshotState to verify how Flink behaves. What I find is that the first checkpoint throws the exception and eventually times out while the main flow continues to work. This is expected; however, all subsequent checkpoints no longer reach the exception and simply report a timeout once the checkpoint timeout is reached. I want to know if it is expected behavior that all later checkpoints cannot finish after one checkpoint has thrown an exception.

Below is the code to reproduce the behavior

main

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);

env.addSource(new FromElementsFunctionT())
    .setParallelism(1)
    .print()
    .setParallelism(1);
env.execute("Demo");

Source function

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.smartnews.dp.kafka.sample.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A stream source function that returns a sequence of elements.
 *
 * Upon construction, this source function serializes the elements using Flink's type
 * information. That way, any object transport using Java serialization will not be affected by the
 * serializability of the elements.
 *
 * NOTE: This source has a parallelism of 1.
 */
@PublicEvolving
public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    /** The number of elements emitted already. */
    private volatile int numElementsEmitted;

    /** Flag to make the source cancelable. */
    private volatile boolean isRunning = true;

    private transient ListState<Integer> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState == null,
                "The " + getClass().getSimpleName() + " has already been initialized.");

        this.checkpointedState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "from-elements-state", IntSerializer.INSTANCE));

        if (context.isRestored()) {
            List<Integer> retrievedStates = new ArrayList<>();
            for (Integer entry : this.checkpointedState.get()) {
                retrievedStates.add(entry);
            }

            // given that the parallelism of the function is 1, we can only have 1 state
            Preconditions.checkArgument(
                    retrievedStates.size() == 1,
                    getClass().getSimpleName() + " retrieved invalid state.");

            this.numElementsEmitted = retrievedStates.get(0);
        }
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final Object lock = ctx.getCheckpointLock();
        while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
            Thread.sleep(1000);
            synchronized (lock) {
                ctx.collect(numElementsEmitted++);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // ------------------------------------------------------------------------
    //  Checkpointing
    // ------------------------------------------------------------------------
Re: Exception in snapshotState suppresses subsequent checkpoints
KPOINT) @ 1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) 33 34 35 [2021-06-26 16:09:15,349] INFO Checkpoint 2 of job afde4a82f41e8284cb0bfff20497a5cc expired before completing. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator) On Sat, Jun 26, 2021 at 4:36 PM tao xiao wrote: > Hi team, > > I run a simple 1.12.1 Flink job in IDE with > TolerableCheckpointFailureNumber set where I throw an exception in source > function snapshotState intentionally to verify how Flink behaves. What I > find is the first checkpoint throws the exception and eventually time out > while the main flow continues to work. This is expected however all > subsequent checkpoints don't reach the exception anymore and report timeout > when timeout reaches. I want to know if this is expected behavior which all > later checkpoints cannot finish if there is one checkpoint that throws > exception. > > Below is the code the reproduce the behavior > main > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true)); > env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE); > env.getCheckpointConfig().setCheckpointTimeout(3_000); > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000); > > env.addSource(new FromElementsFunctionT()) > .setParallelism(1) > .print() > .setParallelism(1); > env.execute("Demo"); > > > Source function > > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. 
You may obtain a copy of the License at > * > *http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > > package sample.flink; > > import java.util.ArrayList; > import java.util.List; > import org.apache.flink.annotation.PublicEvolving; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.typeutils.base.IntSerializer; > import org.apache.flink.runtime.state.FunctionInitializationContext; > import org.apache.flink.runtime.state.FunctionSnapshotContext; > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.util.Preconditions; > > /** > * A stream source function that returns a sequence of elements. > * > * Upon construction, this source function serializes the elements using > Flink's type > * information. That way, any object transport using Java serialization will > not be affected by the > * serializability of the elements. > * > * NOTE: This source has a parallelism of 1. > * > */ > @PublicEvolving > public class FromElementsFunctionT implements SourceFunction, > CheckpointedFunction { > > private static final long serialVersionUID = 1L; > > /** The number of elements emitted already. */ > private volatile int numElementsEmitted; > > /** Flag to make the source cancelable. 
*/ > private volatile boolean isRunning = true; > > private transient ListState checkpointedState; > > @Override > public void initializeState(FunctionInitializationContext context) throws > Exception { > Preconditions.checkState( > this.checkpointedState == null, > "The " + getClass().getSimpleName() + " has already been > initialized."); > > this.checkpointedState = > context.getOperatorStateStore() > .getListState( > new ListStateDescriptor<>( > "from-elements-state", > IntSerializer.INSTANCE)); > > if (context.isRestored()) { > List retrievedStates = new ArrayList<>(); > for (Integer entry : this.checkpointedState.get()) { > retrievedStates.add(entry); >
Is it possible to customize avro schema name when using SQL
Hi team,

I want to use avro-confluent to encode the data using SQL, but the schema registered by the encoder hard-codes the schema name to 'record'. Is it possible to specify the name?

--
Regards,
Tao
Re: avro-confluent supports authentication enabled schema registry
JIRA created https://issues.apache.org/jira/browse/FLINK-22858 but I cannot assign it to myself. Can you pls assign it to me? On Wed, Jun 2, 2021 at 11:00 PM Fabian Paul wrote: > Hi Tao, > > I was browsing the code a bit and I think this is currently not support > but it seems to be not too > difficult to implement. You would need to allow a map of configurations > and finally pass it to [1] > > Can you create a ticket in our JIRA? > Would you be willing to contribute this feature? > > Best, > Fabian > > > [1] > https://github.com/apache/flink/blob/1db4e560d1b46fac27a18bce9556fec646f063d9/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java#L54 > > On 2. Jun 2021, at 13:57, tao xiao wrote: > > Hi Fabian, > > Unfortunately this will not work in our environment where we implement our > own > io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider > which does the login and supplies the JWT to authorization HTTP header. The > only way it will work is to pass the schema registry > config BEARER_AUTH_CREDENTIALS_SOURCE [1] to table format factory > > [1] > https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java#L85 > > On Wed, Jun 2, 2021 at 5:27 PM Fabian Paul > wrote: > >> Hi Tao, >> >> Thanks for reaching out. Have you tried the following >> >> 'value.avro-confluent.schema-registry.url' = >> 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud', >> >> >> >> It may be possible to provide basic HTTP authentication by adding your >> username and password to the URL. There is already a similar ticket open >> unfortunately without much progress. [1] >> Please let me know if this works for you otherwise we can try to find a >> different solution. 
>> >> Best, >> Fabian >> >> [1] https://issues.apache.org/jira/browse/FLINK-22763 >> >> >> On 2. Jun 2021, at 10:58, tao xiao wrote: >> >> Hi team, >> >> Confluent schema registry supports HTTP basic authentication[1] but I >> don't find the corresponding configs in Flink documentation[2]. Is this >> achievable in Flink avro-confluent? >> >> >> [1] >> https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms >> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options >> -- >> Regards, >> Tao >> >> >> > > -- > Regards, > Tao > > > -- Regards, Tao
Re: avro-confluent supports authentication enabled schema registry
Hi Fabian, Unfortunately this will not work in our environment where we implement our own io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider which does the login and supplies the JWT to authorization HTTP header. The only way it will work is to pass the schema registry config BEARER_AUTH_CREDENTIALS_SOURCE [1] to table format factory [1] https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java#L85 On Wed, Jun 2, 2021 at 5:27 PM Fabian Paul wrote: > Hi Tao, > > Thanks for reaching out. Have you tried the following > > 'value.avro-confluent.schema-registry.url' = > 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud', > > > > It may be possible to provide basic HTTP authentication by adding your > username and password to the URL. There is already a similar ticket open > unfortunately without much progress. [1] > Please let me know if this works for you otherwise we can try to find a > different solution. > > Best, > Fabian > > [1] https://issues.apache.org/jira/browse/FLINK-22763 > > > On 2. Jun 2021, at 10:58, tao xiao wrote: > > Hi team, > > Confluent schema registry supports HTTP basic authentication[1] but I > don't find the corresponding configs in Flink documentation[2]. Is this > achievable in Flink avro-confluent? > > > [1] > https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options > -- > Regards, > Tao > > > -- Regards, Tao
avro-confluent supports authentication enabled schema registry
Hi team, Confluent schema registry supports HTTP basic authentication[1] but I don't find the corresponding configs in Flink documentation[2]. Is this achievable in Flink avro-confluent? [1] https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options -- Regards, Tao
Re: classloader.resolve-order is not honored when submitting job to a remote cluster
Hi Till, The PR covers the problem and it will fix the inconsistent class loading issue On Tue, Jun 1, 2021 at 9:55 PM Till Rohrmann wrote: > Hi Tao, > > I think this looks like a bug to me. Could it be that this problem is > covered by [1, 2]? Maybe you can review this PR and check whether it solves > the problem. If yes, then let's quickly get it in. > > [1] https://issues.apache.org/jira/browse/FLINK-21445 > [2] https://github.com/apache/flink/pull/15020 > > Cheers, > Till > > On Sun, May 30, 2021 at 9:41 AM tao xiao wrote: > >> Hi team, >> >> I discovered that child first class loader is always used to initialize >> the main program when submitting the job to a yarn cluster using >> application mode regardless of what value classloader.resolve-order is set >> in flink-conf.yaml. But this is not the case if I submit the same job with >> the same config to the local cluster which honors the config and use the >> correct class loader to load the main program. Here is the log from local >> cluster >> >> 2021-05-30 15:01:16,372 INFO org.apache.flink.client.cli.CliFrontend >> [] - >> >> 2021-05-30 15:01:16,375 INFO org.apache.flink.client.cli.CliFrontend >> [] - Starting Command Line Client (Version: 1.12.1, >> Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00) >> [trim down the log] >> *2021-05-30 15:01:16,616 INFO >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: classloader.resolve-order, parent-first* >> 2021-05-30 15:01:16,763 WARN org.apache.flink.runtime.util.HadoopUtils >> [] - Could not find Hadoop configuration via any of the >> supported methods (Flink configuration, environment variables). 
>> [trim down the log] >> 2021-05-30 15:01:16,830 INFO org.apache.flink.client.ClientUtils >> [] - Starting program (detached: false) >> *2021-05-30 15:01:16,871 INFO io.demo.flink.WordCount >>[] - Loaded by >> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6* >> >> Here is the log from yarn cluster >> 2021-05-30 07:20:14,434 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - >> >> 2021-05-30 07:20:14,438 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - >> Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11, >> Rev:dc404e2, Date:2021-01-09T14:46:36+01:00) >> [trim down the log] >> 2021-05-30 07:20:15,205 INFO >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: taskmanager.memory.process.size, 2048m >> *2021-05-30 07:20:15,205 INFO >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: classloader.resolve-order, parent-first* >> 2021-05-30 07:20:15,205 INFO >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: metrics.scope.jm, flink.jobmanager >> [trim down the log] >> *2021-05-30 07:20:21,383 INFO io.demo.flink.WordCount >>[] - Loaded by >> org.apache.flink.util.ChildFirstClassLoader@3da30852* >> >> Here is the job to reproduce the problem >> >> public static void main(String[] args) throws Exception { >> >> // set up the execution environment >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> LOG.info("Loaded by {}", WordCount.class.getClassLoader()); >> // get input data >> DataStreamSource text = env.fromElements( >>"To be, or not to be,--that is the question:--", >>"Whether 'tis nobler in the mind to suffer", >>"The slings and arrows of outrageous fortune", >>"Or to take arms against a sea of troubles," >>); >> >>text.print(); >> env.execute("demo job"); >> >> } >> >> >> Flink 
version 1.12.1 >> >> I believe the inconsistency is the result of user defined flink-conf not >> passed to PackageProgram which uses default config instead >> >> >> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109 >> >> Not sure if this is expected behavior that we never assume the main >> program is loaded with the configured class loader >> -- >> Regards, >> Tao >> > -- Regards, Tao
classloader.resolve-order is not honored when submitting job to a remote cluster
Hi team, I discovered that child first class loader is always used to initialize the main program when submitting the job to a yarn cluster using application mode regardless of what value classloader.resolve-order is set in flink-conf.yaml. But this is not the case if I submit the same job with the same config to the local cluster which honors the config and use the correct class loader to load the main program. Here is the log from local cluster 2021-05-30 15:01:16,372 INFO org.apache.flink.client.cli.CliFrontend [] - 2021-05-30 15:01:16,375 INFO org.apache.flink.client.cli.CliFrontend [] - Starting Command Line Client (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00) [trim down the log] *2021-05-30 15:01:16,616 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first* 2021-05-30 15:01:16,763 WARN org.apache.flink.runtime.util.HadoopUtils [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables). 
[trim down the log] 2021-05-30 15:01:16,830 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false) *2021-05-30 15:01:16,871 INFO io.demo.flink.WordCount [] - Loaded by org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6* Here is the log from yarn cluster 2021-05-30 07:20:14,434 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 2021-05-30 07:20:14,438 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00) [trim down the log] 2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 2048m *2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: classloader.resolve-order, parent-first* 2021-05-30 07:20:15,205 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: metrics.scope.jm, flink.jobmanager [trim down the log] *2021-05-30 07:20:21,383 INFO io.demo.flink.WordCount [] - Loaded by org.apache.flink.util.ChildFirstClassLoader@3da30852* Here is the job to reproduce the problem public static void main(String[] args) throws Exception { // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); LOG.info("Loaded by {}", WordCount.class.getClassLoader()); // get input data DataStreamSource text = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles," ); text.print(); env.execute("demo job"); } Flink version 1.12.1 I believe the inconsistency is the result of user defined flink-conf not passed to PackageProgram which uses default config instead 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109 Not sure if this is expected behavior that we never assume the main program is loaded with the configured class loader -- Regards, Tao
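For readers puzzled by the two loader names in the logs (ParentFirstClassLoader vs ChildFirstClassLoader): the difference is only the delegation order when resolving a class. A toy, stdlib-only sketch of what "child-first" means — this is an illustration of the concept, not Flink's actual ChildFirstClassLoader (the real one extends URLClassLoader, searches the user jar, and keeps an "always parent-first" pattern list for java.*, org.apache.flink.*, etc.):

```java
import java.util.ArrayList;
import java.util.List;

public class ChildFirstSketch extends ClassLoader {
    // records which classes this loader was asked to find itself
    final List<String> attempted = new ArrayList<>();

    ChildFirstSketch(ClassLoader parent) {
        super(parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    attempted.add(name);
                    // child is consulted FIRST (no local classpath here, so this throws)
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // only then delegate to the parent (the JDK default does this first)
                    c = super.loadClass(name, resolve);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    public static void main(String[] args) throws Exception {
        ChildFirstSketch cl = new ChildFirstSketch(ChildFirstSketch.class.getClassLoader());
        Class<?> s = cl.loadClass("java.lang.String");
        // JDK classes still resolve via the parent/bootstrap chain after the child fails
        System.out.println("consulted child first for: " + cl.attempted);
        System.out.println("resolved by: " + s.getClassLoader());
    }
}
```

With parent-first, the `attempted` list would stay empty for anything the parent can resolve — which is why the two orders can load different copies of a class that exists both in the user jar and on the cluster classpath.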
Re: No result shown when submitting the SQL in cli
Thanks for the reply. Finally got it working. The output actually gets printed out on the client side, not on the TM side. On Tue, May 11, 2021 at 11:47 PM Jeff Zhang wrote: > The result is printed in TM. > It is local mode in IDE, so TM runs in your local jvm that's why you see > the result > While it is distributed mode (either yarn or standalone mode) when you are > in sql-client, you should be able to see the result in TM logs. > > > tao xiao wrote on Tue, May 11, 2021 at 11:40 PM: > >> Does anyone help with this question? >> >> On Thu, May 6, 2021 at 3:26 PM tao xiao wrote: >> >>> Hi team, >>> >>> I wrote a simple SQL job to select data from Kafka. I can see results >>> printing out in IDE but when I submit the job to a standalone cluster in >>> CLI there is no result shown. I am sure the job is running well in the >>> cluster with debug log suggesting that the kafka consumer is fetching data >>> from Kafka. I enabled debug log in CLI and I don't see any obvious log. >>> Here is the job code snippet >>> >>> public static void main(String[] args) throws Exception { >>> StreamTableEnvironment tableEnv = StreamTableEnvironment >>> >>> .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)); >>> >>> String sqls = new String(Files.readAllBytes(Paths.get(args[0]))); >>> splitIgnoreQuota(sqls, ';').forEach(sql -> { >>> TableResult tableResult = tableEnv.executeSql(sql); >>> tableResult.print(); >>> }); >>> } >>> >>> It simply parses a sql file and execute the statements >>> >>> Here is the SQL statements >>> >>> CREATE TABLE t1 ( >>> `f1` STRING, >>> `f2` STRING >>> ) WITH ( >>> 'connector' = 'kafka', >>> 'topic' = 'topic', >>> 'properties.group.id' = 'test1', >>> 'properties.max.partition.fetch.bytes' = '16384', >>> 'properties.enable.auto.commit' = 'false', >>> 'properties.bootstrap.servers' = 'kafka:9092', >>> 'scan.startup.mode' = 'earliest-offset', >>> 'format' = 'json' >>> ); >>> >>> SELECT * FROM t1 >>> >>> >>> Below is the result I got from IDE >>> | +I | b8f5
| abcd | >>> | +I | b8f5 | abcd | >>> >>> And this is the result from CLI >>> bin/flink run -m localhost:8081 -c kafka.sample.flink.SQLSample >>> ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar >>> /sample.sql >>> ++ >>> | result | >>> ++ >>> | OK | >>> ++ >>> 1 row in set >>> Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701 >>> ++++ >>> | op | uuid |ots | >>> ++++ >>> >>> >>> -- >>> Regards, >>> Tao >>> >> >> >> -- >> Regards, >> Tao >> > > > -- > Best Regards > > Jeff Zhang > -- Regards, Tao
Re: No result shown when submitting the SQL in cli
Does anyone help with this question? On Thu, May 6, 2021 at 3:26 PM tao xiao wrote: > Hi team, > > I wrote a simple SQL job to select data from Kafka. I can see results > printing out in IDE but when I submit the job to a standalone cluster in > CLI there is no result shown. I am sure the job is running well in the > cluster with debug log suggesting that the kafka consumer is fetching data > from Kafka. I enabled debug log in CLI and I don't see any obvious log. > Here is the job code snippet > > public static void main(String[] args) throws Exception { > StreamTableEnvironment tableEnv = StreamTableEnvironment > > .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)); > > String sqls = new String(Files.readAllBytes(Paths.get(args[0]))); > splitIgnoreQuota(sqls, ';').forEach(sql -> { > TableResult tableResult = tableEnv.executeSql(sql); > tableResult.print(); > }); > } > > It simply parses a sql file and execute the statements > > Here is the SQL statements > > CREATE TABLE t1 ( > `f1` STRING, > `f2` STRING > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic', > 'properties.group.id' = 'test1', > 'properties.max.partition.fetch.bytes' = '16384', > 'properties.enable.auto.commit' = 'false', > 'properties.bootstrap.servers' = 'kafka:9092', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > > SELECT * FROM t1 > > > Below is the result I got from IDE > | +I | b8f5 | abcd | > | +I | b8f5 | abcd | > > And this is the result from CLI > bin/flink run -m localhost:8081 -c kafka.sample.flink.SQLSample > ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar > /sample.sql > ++ > | result | > ++ > | OK | > ++ > 1 row in set > Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701 > ++++ > | op | uuid |ots | > ++++ > > > -- > Regards, > Tao > -- Regards, Tao
No result shown when submitting the SQL in cli
Hi team, I wrote a simple SQL job to select data from Kafka. I can see results printing out in IDE but when I submit the job to a standalone cluster in CLI there is no result shown. I am sure the job is running well in the cluster with debug log suggesting that the kafka consumer is fetching data from Kafka. I enabled debug log in CLI and I don't see any obvious log. Here is the job code snippet public static void main(String[] args) throws Exception { StreamTableEnvironment tableEnv = StreamTableEnvironment .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)); String sqls = new String(Files.readAllBytes(Paths.get(args[0]))); splitIgnoreQuota(sqls, ';').forEach(sql -> { TableResult tableResult = tableEnv.executeSql(sql); tableResult.print(); }); } It simply parses a sql file and execute the statements Here is the SQL statements CREATE TABLE t1 ( `f1` STRING, `f2` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'topic', 'properties.group.id' = 'test1', 'properties.max.partition.fetch.bytes' = '16384', 'properties.enable.auto.commit' = 'false', 'properties.bootstrap.servers' = 'kafka:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); SELECT * FROM t1 Below is the result I got from IDE | +I | b8f5 | abcd | | +I | b8f5 | abcd | And this is the result from CLI bin/flink run -m localhost:8081 -c kafka.sample.flink.SQLSample ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar /sample.sql ++ | result | ++ | OK | ++ 1 row in set Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701 ++++ | op | uuid |ots | ++++ -- Regards, Tao
Re: Rich Function Thread Safety
As the Javadoc suggests, it seems the operator methods and the checkpoint snapshot can be accessed by two different threads https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java#L39-L62 On Thu, May 7, 2020 at 1:22 AM Joey Echeverria wrote: > I've seen a few mailing list posts (including this one) that say Flink > guarantees there is no concurrent access to operator methods (e.g. flatMap, > snapshotState, etc.) and thus synchronization isn't needed when writing > operators that support checkpointing. I was trying to find a place in the > official docs where this was called out, but was coming up empty. > > Is there a section of the docs that covers this topic? > > Thanks! > > -Joey > > On Dec 18, 2019, at 9:38 PM, Zhu Zhu wrote: > > Hi Aaron, > > It is thread safe since the state snapshot happens in the same thread with > the user function. > > Thanks, > Zhu Zhu > > Aaron Langford wrote on Thu, Dec 19, 2019 at 11:25 AM: > >> Hello Flink Community, >> >> I'm hoping to verify some understanding: >> >> If I have a function with managed state, I'm wondering if a >> checkpoint will ever be taken while a function is mutating state. I'll try >> to illustrate the situation I'm hoping to be safe from: >> >> Happy Path: >> t0 -> processFunction invoked with el1 >> t1 -> set A to 5 >> t2 -> set B to 10 >> t3 -> function returns >> >> Unhappy path: >> t0 -> processFunction invoked with el1 >> t1 -> set A to 5 >> t2 -> function interrupted, checkpoint taken (A = 5, B = 1) >> t3 -> set B to 10 >> t4 -> function returns >> ... 
>> tn -> flink application fails, restart from prev checkpoint (A=5, B=1) >> tn+1 -> recovery begins somewhere, but state is torn anyway, so we're >> going to have a bad time >> >> I don't think this could happen given that checkpoints effectively are >> messages in the pipeline, and the checkpoint is only taken when an operator >> sees the checkpoint barrier. >> >> Hoping to make sure this is correct! >> >> Aaron >> > > -- Regards, Tao
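The SourceFunction Javadoc linked above describes the one place where two threads are involved: a source's emitting thread and the checkpoint trigger, which must both hold the checkpoint lock. A stdlib-only toy (not Flink code — the field names and the b == 2*a invariant are invented for illustration) of why holding one lock across both the mutation and the snapshot prevents exactly the "torn state" Aaron describes:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckpointLockDemo {
    private final Object checkpointLock = new Object();
    private long a = 0, b = 0;

    void emitOne() {                       // "task thread": mutate both fields under the lock
        synchronized (checkpointLock) {
            a += 5;
            b += 10;                       // invariant: b is always exactly 2*a
        }
    }

    long[] snapshot() {                    // "checkpoint thread": read both fields under the same lock
        synchronized (checkpointLock) {
            return new long[] {a, b};
        }
    }

    static boolean sawTornSnapshot() throws InterruptedException {
        CheckpointLockDemo d = new CheckpointLockDemo();
        AtomicBoolean torn = new AtomicBoolean(false);
        Thread task = new Thread(() -> { for (int i = 0; i < 100_000; i++) d.emitOne(); });
        Thread ckpt = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                long[] s = d.snapshot();
                if (s[1] != 2 * s[0]) torn.set(true);   // a torn read would break the invariant
            }
        });
        task.start(); ckpt.start();
        task.join(); ckpt.join();
        return torn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("torn snapshot observed: " + sawTornSnapshot());
    }
}
```

For non-source operators the same guarantee comes for free because, as Zhu Zhu says, the snapshot runs in the same thread as the user function.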
Re: Unable to serialize org.apache.kafka.common.config.types.Password
Thanks, it works On Wed, 26 Dec 2018 at 10:07 fudian.fd wrote: > The exception is very clear that the SourceFunction should be > serializable. Password is not serializable. You can try to set the kafka > consumer properties such as this: > > props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required > subject=\"test\" secret=\"test\";"); > > The String value will be parsed to Password object.(refer to the method > org.apache.kafka.common.config.ConfigDef.parseType) > > Regards, > Dian > > > 在 2018年12月25日,下午11:04,tao xiao 写道: > > Hi team, > > I am passing a security enabled kafka consumer properties to > FlinkKafkaConsumer but keep getting this > error java.io.NotSerializableException? what is the best way to handle this? > > I use Flink 1.7.1 and here is the consumer property that produces the > exception > > props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required > subject=\"test\" secret=\"test\";")); > > stacktrace > Exception in thread "main" > org.apache.flink.api.common.InvalidProgramException: The implementation of > the FlinkKafkaConsumerBase is not serializable. The object probably > contains or references non serializable fields. 
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397) > at > org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69) > Caused by: java.io.NotSerializableException: > org.apache.kafka.common.config.types.Password > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at java.util.Hashtable.writeObject(Hashtable.java:1157) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > 
at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) > ... 5 more > > >
Unable to serialize org.apache.kafka.common.config.types.Password
Hi team, I am passing a security enabled kafka consumer properties to FlinkKafkaConsumer but keep getting this error java.io.NotSerializableException? what is the best way to handle this? I use Flink 1.7.1 and here is the consumer property that produces the exception props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";")); stacktrace Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields. at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397) at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69) Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at java.util.Hashtable.writeObject(Hashtable.java:1157) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) ... 5 more
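The stack trace can be reproduced without Kafka or Flink at all: the consumer's Properties are Java-serialized along with the function, and Properties (a Hashtable) serializes every value it holds. The class below is a stand-in for Kafka's Password (which does not implement Serializable — the names here are otherwise invented for the demo):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.Properties;

// Stand-in for org.apache.kafka.common.config.types.Password: holds a value
// but is NOT Serializable, so it poisons any Properties object that contains it.
class NonSerializablePassword {
    private final String value;
    NonSerializablePassword(String value) { this.value = value; }
}

public class SerializationDemo {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Properties ok = new Properties();
        ok.put("sasl.jaas.config", "LoginModule required subject=\"test\" secret=\"test\";");
        serialize(ok);   // fine: String is serializable; Kafka later parses it into a Password

        Properties bad = new Properties();
        bad.put("sasl.jaas.config", new NonSerializablePassword("secret"));
        try {
            serialize(bad);
        } catch (NotSerializableException e) {
            System.out.println("caught: " + e);   // same failure mode as the stack trace above
        }
    }
}
```

This is why Dian's suggestion works: passing the JAAS config as a plain String keeps the Properties serializable, and ConfigDef.parseType converts it to a Password on the consumer side.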
Register a user scope metric in window reduce function
Hi team, Is there any way that I can register a metric in a window reduce function? As per the Flink docs, getRuntimeContext is only available in RichFunction, but the window operator doesn't allow a RichFunction to be chained. Any way to work around this?
Re: Restart the job from a checkpoint
Hi Ufuk, Thank you for the reply. What is the difference between state.backend.fs.checkpoint.dir and state.checkpoints.dir in this case? Does state.checkpoints.dir store the metadata that points to the checkpoint data stored in state.backend.fs.checkpoint.dir? On Mon, 16 Jan 2017 at 19:24 Ufuk Celebi <u...@apache.org> wrote: > Hey! > > This is possible with the upcoming 1.2 version of Flink (also in the > current snapshot version): > > https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html#externalized-checkpoints > > You have to manually activate it via the checkpoint config (see docs). > > Ping me if you have any questions. > > – Ufuk > > > On Mon, Jan 16, 2017 at 5:51 AM, tao xiao <xiaotao...@gmail.com> wrote: > > Hi team, > > Can we restart a flink job from previous successful checkpoint? I know we > can start a flink from a savepoint but I wonder if I can do it similar by > passing the checkpoint path to the flink run command to restore the job > from checkpoint. >
Restart the job from a checkpoint
Hi team, Can we restart a Flink job from a previous successful checkpoint? I know we can start a Flink job from a savepoint, but I wonder if I can do it similarly by passing the checkpoint path to the flink run command to restore the job from a checkpoint.
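Following Ufuk's pointer: with externalized checkpoints enabled, the resume step reuses the same -s flag as savepoints, pointed at the retained checkpoint metadata. A hedged sketch of the two steps (the checkpoint path, job class, and jar name below are placeholders):

```shell
# 1) In the job, retain checkpoints when the job is cancelled (Flink 1.2+ API):
#    env.getCheckpointConfig().enableExternalizedCheckpoints(
#        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
#
# 2) Resume from the retained checkpoint metadata, exactly as from a savepoint:
bin/flink run -s hdfs:///checkpoints/<job-id>/chk-42 -c my.Job my-job.jar
```

The path passed to -s is the one written under state.checkpoints.dir, which answers the metadata question above.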
Re: Kafka topic partition skewness causes watermark not being emitted
The case I described was for experimentation only, but data skewness can happen in production. The current implementation blocks watermark emission to downstream operators until all partitions move forward, which has a great impact on latency. It may be a good idea to expose an API that lets users decide the best way to control watermark emission. On Fri, 13 Jan 2017 at 21:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: > Hi, > > This is expected behaviour due to how the per-partition watermarks are > designed in the Kafka consumer, but I think it's probably a good idea to > handle idle partitions also when the Kafka consumer itself emits > watermarks. I've filed a JIRA issue for this: > https://issues.apache.org/jira/browse/FLINK-5479. > > For the time being, I don't think there will be an easy way to avoid this > with the existing APIs, unfortunately. Is the skewed partition data > intentional, or only for experimental purposes? > > Best, > Gordon > > On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote: > > Hi team, > > I have a topic with 2 partitions in Kafka. I produced all data to > partition 0 and no data to partition 1. I created a Flink job with > parallelism to 1 that consumes that topic and count the events with session > event window (5 seconds gap). It turned out that the session event window > was never closed even I sent a message with 10 minutes gap. After digging > into the source code, AbstractFetcher[1] that is responsible for sending > watermark to downstream calculates the min watermark of all partitions. Due > to the fact that we don't have data in partition 1, the watermark returned > from partition 1 is always Long.MIN_VALUE therefore AbstractFetcher never > fires the watermark to downstream. > > I want to know if this is expected behavior or a bug. If this is expected > behavior how do I avoid the delay of watermark firing when data is not > evenly distributed to all partitions? 
> > This is the timestamp extractor I used > > public class ExactTimestampExtractor implements > AssignerWithPeriodicWatermarks { > > private long currentMaxTimestamp = Long.MIN_VALUE; > > @Nullable > @Override > public Watermark getCurrentWatermark() { > return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? > Long.MIN_VALUE : currentMaxTimestamp - 1); > } > > @Override > public long extractTimestamp(SessionEvent element, long > previousElementTimestamp) { > long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT); > if (eventStartTime > currentMaxTimestamp) { > currentMaxTimestamp = eventStartTime; > } > > return eventStartTime; > } > } > > and this is the Flink topo > > // get input data > FlinkKafkaConsumer010 consumer = new > FlinkKafkaConsumer010<>("topic4", > new MyOwnSchema() > consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor()); > DataStream input = env.addSource(consumer); > > input. > keyBy("id"). > window(EventTimeSessionWindows.withGap(Time.seconds(5))). > reduce(new Reducer(), new WindowFunction()). > print(); > > //// execute program > env.execute("a job"); > > I used the latest code in github > > [1] > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539 > >
Kafka topic partition skewness causes watermark not being emitted
Hi team, I have a topic with 2 partitions in Kafka. I produced all data to partition 0 and no data to partition 1. I created a Flink job with a parallelism of 1 that consumes that topic and counts the events with a session event window (5 seconds gap). It turned out that the session event window was never closed even when I sent a message with a 10-minute gap. After digging into the source code, AbstractFetcher[1], which is responsible for sending watermarks to downstream operators, calculates the min watermark of all partitions. Due to the fact that we don't have data in partition 1, the watermark returned from partition 1 is always Long.MIN_VALUE, therefore AbstractFetcher never fires the watermark to downstream. I want to know if this is expected behavior or a bug. If this is expected behavior, how do I avoid the delay of watermark firing when data is not evenly distributed to all partitions? This is the timestamp extractor I used public class ExactTimestampExtractor implements AssignerWithPeriodicWatermarks<SessionEvent> { private long currentMaxTimestamp = Long.MIN_VALUE; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentMaxTimestamp - 1); } @Override public long extractTimestamp(SessionEvent element, long previousElementTimestamp) { long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT); if (eventStartTime > currentMaxTimestamp) { currentMaxTimestamp = eventStartTime; } return eventStartTime; } } and this is the Flink topo // get input data FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>("topic4", new MyOwnSchema()); consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor()); DataStream<SessionEvent> input = env.addSource(consumer); input. keyBy("id"). window(EventTimeSessionWindows.withGap(Time.seconds(5))). reduce(new Reducer(), new WindowFunction()). 
print(); //// execute program env.execute("a job"); I used the latest code in github [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539
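The watermark combination in the fetcher boils down to taking the minimum across partitions, so a partition that never receives data stays at Long.MIN_VALUE and pins the operator-level watermark — which is exactly why the session window never fires. A stdlib-only toy reduction of that computation (not the AbstractFetcher code itself):

```java
public class MinWatermarkDemo {
    // Combine per-partition watermarks the way the fetcher does: take the minimum.
    static long combineWatermarks(long[] perPartitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : perPartitionWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        long active = 1_484_208_000_000L;   // partition 0: watermarks advance with the data
        long idle   = Long.MIN_VALUE;       // partition 1: no data ever arrives
        // The idle partition dominates: the emitted watermark never advances.
        System.out.println(combineWatermarks(new long[] {active, idle}) == Long.MIN_VALUE); // prints true
    }
}
```

FLINK-5479 (filed in Gordon's reply) is about detecting such idle partitions and excluding them from this minimum.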
Re: window function outputs two different values
Hi team, any suggestions on the topic below? I have a requirement to output two different values from a time window reduce function. Here is the basic workflow: 1. fetch data from Kafka 2. flow the data to an event session window. kafka source -> keyBy -> session window -> reduce 3. inside the reduce function, count the number of elements and also emit the data itself to another operator for further processing As the reduce function can only emit the count, I want to know how to also emit the data as well? On Sat, 7 Jan 2017 at 20:30 tao xiao <xiaotao...@gmail.com> wrote: > Hi team, > > I have a requirement that wants to output two different values from a time > window reduce function. Here is basic workflow > > 1. fetch data from Kafka > 2. flow the data to a event session window. kafka source -> keyBy -> > session window -> reduce > 3. inside the reduce function, count the number of data and also emit the > data itself to another operator for further processing > > As the reduce function can only emit the count, I want to know how to also > emit the data as well? > >
window function outputs two different values
Hi team, I have a requirement to output two different values from a time window reduce function. Here is the basic workflow: 1. fetch data from Kafka 2. flow the data to an event session window. kafka source -> keyBy -> session window -> reduce 3. inside the reduce function, count the number of elements and also emit the data itself to another operator for further processing As the reduce function can only emit the count, how can I also emit the data as well?
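One common workaround (hedged: the thread never settles on an API, and the class, field names, and String payload type below are invented for illustration) is to let the reduce accumulator type carry both values — the running count plus the payload — and split them in a downstream map. A plain-Java reduction of the idea:

```java
public class CountAndData {
    final long count;
    final String data;   // the element the window should also forward downstream

    CountAndData(long count, String data) {
        this.count = count;
        this.data = data;
    }

    // The reduce step: sum the counts, keep the latest payload.
    // In the job this would be the body of the ReduceFunction passed to .reduce(...).
    static CountAndData reduce(CountAndData a, CountAndData b) {
        return new CountAndData(a.count + b.count, b.data);
    }

    public static void main(String[] args) {
        CountAndData r = reduce(new CountAndData(1, "first"), new CountAndData(1, "second"));
        System.out.println(r.count + " / " + r.data);   // prints 2 / second
    }
}
```

For later Flink versions (1.3+), side outputs via an OutputTag in a ProcessWindowFunction are another way to emit the data on a second stream while the main output carries the count.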