Re: Any way to improve list state get performance

2022-11-21 Thread tao xiao
Any suggestion is highly appreciated.

On Tue, Nov 15, 2022 at 8:50 PM tao xiao  wrote:

> Hi team,
>
> I have a Flink job that joins two streams, let's say A and B, followed by a
> keyed process function. In the keyed process function the job inserts
> elements from the B stream into a list state if the corresponding element
> from the A stream hasn't arrived yet. I am wondering if there is any way to
> skip the liststate.get() call that checks whether there are elements in the
> list state when an A element arrives, to reduce calls to the underlying
> state backend (RocksDB).
>
> Here is the code snippet
>
> keyfunction {
>
>   process(in, ctx, collector) {
>     if (in is A stream)
>       // any way to check if the list state is empty so that we don't need to call get()?
>       for (b : liststate.get()) {
>         ...
>       }
>
>     if (in is B stream)
>       liststate.add(in)
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao


Any way to improve list state get performance

2022-11-15 Thread tao xiao
Hi team,

I have a Flink job that joins two streams, let's say A and B, followed by a
keyed process function. In the keyed process function the job inserts
elements from the B stream into a list state if the corresponding element
from the A stream hasn't arrived yet. I am wondering if there is any way to
skip the liststate.get() call that checks whether there are elements in the
list state when an A element arrives, to reduce calls to the underlying
state backend (RocksDB).

Here is the code snippet

keyfunction {

  process(in, ctx, collector) {
    if (in is A stream)
      // any way to check if the list state is empty so that we don't need to call get()?
      for (b : liststate.get()) {
        ...
      }

    if (in is B stream)
      liststate.add(in)
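
For reference, a concrete rendering of the pseudocode above as a
KeyedCoProcessFunction (a hedged sketch only; String stands in for the key
and for the real A/B event and output types, which are not given in the
thread):

```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the pattern in the pseudocode above; String stands in for the key
// and for the real A/B event and output types.
public class JoinAB extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ListState<String> bufferedB;

    @Override
    public void open(Configuration parameters) {
        bufferedB = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-b", String.class));
    }

    // A-stream side.
    @Override
    public void processElement1(String a, Context ctx, Collector<String> out) throws Exception {
        // The question in this thread: can this read against RocksDB be skipped
        // when nothing from the B stream has been buffered for this key yet?
        Iterable<String> buffered = bufferedB.get();
        if (buffered != null) {
            for (String b : buffered) {
                out.collect(a + "/" + b); // placeholder join logic
            }
            bufferedB.clear(); // whether to clear here depends on the join semantics
        }
    }

    // B-stream side: buffer until the A element arrives.
    @Override
    public void processElement2(String b, Context ctx, Collector<String> out) throws Exception {
        bufferedB.add(b);
    }
}

// Usage sketch: streamA.connect(streamB).keyBy(keyOfA, keyOfB).process(new JoinAB())
```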


-- 
Regards,
Tao


Re: Pojo state schema evolution not working correctly

2022-08-09 Thread tao xiao
Not exactly reproducible. I cannot reproduce the problem in a test
environment with the same settings; it is only reproducible in production
(PRD). I guess it has something to do with the state itself.


Where did you add the debugging log? In RocksDBListState#get()?
>
I added a debug log in
org.apache.flink.contrib.streaming.state.RocksDBListState#migrateSerializedValue
that prints the message "before migration object size 480 : after object size : 481",
and another debug log in
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.ListElementFilter#nextUnexpiredOffset
that prints "object size 480".

If I understand the code correctly, the first debug log verifies that the
object has been successfully serialized back to RocksDB with the new schema;
however, the second debug log suggests the object is still in the old schema
when the state is queried.
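
For context, a hedged illustration of the kind of POJO change involved here
(the class names and the original field are assumptions; only the added int
field "hr" comes from the error message in the original post below):

```
// Version originally written into the list state (class and fields are illustrative).
public class EventV1 {
    public long timestamp;
}

// Version after the change: one new int field, which is what triggers POJO schema
// evolution when the job is restored from the checkpoint. The field name "hr" is
// borrowed from the IllegalArgumentException in the original post.
class EventV2 {
    public long timestamp;
    public int hr;
}
```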

On Mon, Aug 8, 2022 at 12:28 PM Hangxiang Yu  wrote:

> Hi,
> IIUC, the conditions to reproduce it are:
> 1. Using RocksDBStateBackend with incremental strategy
> 2. Using ListState in the stateful operator
> 3. enabling TTL with cleanupInRocksdbCompactFilter
> 4. adding a field to make the job trigger schema evolution
> Then the exception will be thrown, right?
>
> As for the next question about why the value is not updated in RocksDB,
> Where did you add the debugging log? In RocksDBListState#get()?
>
> Best,
> Hangxiang.
>
> On Wed, Aug 3, 2022 at 5:32 PM tao xiao  wrote:
>
>> Hi team,
>>
>> I encountered the exception below after I added a new field to a POJO used
>> in list state and resumed the job from a checkpoint.
>>
>>
>>
>>> [error occurred during error reporting , id 0xb]
>>>
>>> #
>>> #   http://bugreport.java.com/bugreport/crash.jsp
>>> # If you would like to submit a bug report, please visit:
>>> #
>>> # /opt/flink/hs_err_pid1.log
>>> # An error report file with more information is saved as:
>>> #
>>> # Core dump written. Default location: /opt/flink/core or core.1
>>> #
>>> # V  [libjvm.so+0x57595f]  Exceptions::_throw(Thread*, char const*, int, Handle, char const*)+0x1ef
>>> # Problematic frame:
>>> # Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
>>> # JRE version: OpenJDK Runtime Environment (8.0_302-b08) (build 1.8.0_302-b08)
>>> #
>>> #  SIGSEGV (0xb) at pc=0x7f6a2ad5d95f, pid=1, tid=0x7f69bac9f700
>>> #
>>> # A fatal error has been detected by the Java Runtime Environment:
>>> #
>>>
>>> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:193)
>>> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:243)
>>> org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:412)
>>> at java.lang.reflect.Field.set(Field.java:764)
>>> at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:80)
>>> at sun.reflect.UnsafeFieldAccessorImpl.throw

Pojo state schema evolution not working correctly

2022-08-03 Thread tao xiao
Hi team,

I encountered the exception below after I added a new field to a POJO used
in list state and resumed the job from a checkpoint.



> [error occurred during error reporting , id 0xb]
>
> #
> #   http://bugreport.java.com/bugreport/crash.jsp
> # If you would like to submit a bug report, please visit:
> #
> # /opt/flink/hs_err_pid1.log
> # An error report file with more information is saved as:
> #
> # Core dump written. Default location: /opt/flink/core or core.1
> #
> # V  [libjvm.so+0x57595f]  Exceptions::_throw(Thread*, char const*, int, Handle, char const*)+0x1ef
> # Problematic frame:
> # Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
> # JRE version: OpenJDK Runtime Environment (8.0_302-b08) (build 1.8.0_302-b08)
> #
> #  SIGSEGV (0xb) at pc=0x7f6a2ad5d95f, pid=1, tid=0x7f69bac9f700
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
>
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:193)
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:243)
> org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:412)
> at java.lang.reflect.Field.set(Field.java:764)
> at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:80)
> at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
> at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
> Caused by: java.lang.IllegalArgumentException: Can not set int field x.hr to null value
> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:214)
> Exception in thread "Thread-23" org.apache.flink.util.FlinkRuntimeException: Failed to deserialize list element for TTL compaction filter
>

I verified that Flink recognized the state change

Performing state migration for state ListStateDescriptor{name=novimp,
> defaultValue=null,
> serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6fe249e5}
> because the state serializer's schema, i.e. serialization format, has
> changed.
>

and successfully migrated the state with the new POJO class (I added my own
debug logging to the Flink source code):

before migration object size 480 : after object size : 481


However, when the exception occurred, the object read from state had the
original byte length, not the updated one. It appears the state migration
performed during the recovery phase didn't take effect or wasn't persisted
to RocksDB. Can you please give me some pointers to debug this further?

> object size 480
>


Flink version is 1.13.2
RocksDB state backend with incremental, aligned checkpoints
List state with TTL (24h) enabled
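
For reference, a minimal sketch of the kind of list-state TTL configuration
described above (the surrounding function, the POJO and its fields are
illustrative assumptions; only the state name "novimp" and the newly added
int field appear in the logs above):

```
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative stand-in for the real POJO; the int field is the kind of newly
// added field that triggered the schema evolution described in this thread.
public class MyPojo {
    public long timestamp;
    public int hr;
}

// Hedged sketch of the state setup described above (normally a separate file):
// a POJO list state named "novimp" with a 24h TTL cleaned up by the RocksDB
// compaction filter, applied on a keyed stream.
class NovimpFunction extends RichFlatMapFunction<MyPojo, MyPojo> {

    private transient ListState<MyPojo> listState;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24))
                .cleanupInRocksdbCompactFilter(1000) // query interval is an assumption
                .build();

        ListStateDescriptor<MyPojo> descriptor =
                new ListStateDescriptor<>("novimp", MyPojo.class);
        descriptor.enableTimeToLive(ttlConfig);

        listState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(MyPojo value, Collector<MyPojo> out) throws Exception {
        listState.add(value);
        out.collect(value);
    }
}
```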

-- 
Regards,
Tao


Re: Incorrect checkpoint id used when job is recovering

2022-05-19 Thread tao xiao
Hi team,

Can anyone shed some light?

On Sat, May 14, 2022 at 8:56 AM tao xiao  wrote:

> Hi team,
>
> Does anyone have any ideas?
>
> On Thu, May 12, 2022 at 9:20 PM tao xiao  wrote:
>
>> Forgot to mention the Flink version is 1.13.2 and we use kubernetes
>> native mode
>>
>> On Thu, May 12, 2022 at 9:18 PM tao xiao  wrote:
>>
>>> Hi team,
>>>
>>> I met a weird issue when a job tries to recover from JM failure.  The
>>> success checkpoint before JM crashed is 41205
>>>
>>> ```
>>>
>>> {"log":"2022-05-10 14:55:40,663 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
>>> checkpoint 41205 for job  (9453840 bytes in 
>>> 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}
>>>
>>> ```
>>>
>>> However JM tries to recover the job with an old checkpoint 41051 which
>>> doesn't exist that leads to unrecoverable state
>>>
>>> ```
>>>
>>> "2022-05-10 14:59:38,949 INFO  
>>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
>>> Trying to retrieve checkpoint 41051.\n"
>>>
>>> ```
>>>
>>> Full log attached
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao


Re: Incorrect checkpoint id used when job is recovering

2022-05-13 Thread tao xiao
Hi team,

Does anyone have any ideas?

On Thu, May 12, 2022 at 9:20 PM tao xiao  wrote:

> Forgot to mention the Flink version is 1.13.2 and we use kubernetes native
> mode
>
> On Thu, May 12, 2022 at 9:18 PM tao xiao  wrote:
>
>> Hi team,
>>
>> I met a weird issue when a job tries to recover from JM failure.  The
>> success checkpoint before JM crashed is 41205
>>
>> ```
>>
>> {"log":"2022-05-10 14:55:40,663 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
>> checkpoint 41205 for job  (9453840 bytes in 
>> 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}
>>
>> ```
>>
>> However JM tries to recover the job with an old checkpoint 41051 which
>> doesn't exist that leads to unrecoverable state
>>
>> ```
>>
>> "2022-05-10 14:59:38,949 INFO  
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
>> Trying to retrieve checkpoint 41051.\n"
>>
>> ```
>>
>> Full log attached
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao


Re: Incorrect checkpoint id used when job is recovering

2022-05-12 Thread tao xiao
Forgot to mention the Flink version is 1.13.2 and we use kubernetes native
mode

On Thu, May 12, 2022 at 9:18 PM tao xiao  wrote:

> Hi team,
>
> I met a weird issue when a job tries to recover from JM failure.  The
> success checkpoint before JM crashed is 41205
>
> ```
>
> {"log":"2022-05-10 14:55:40,663 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 41205 for job  (9453840 bytes in 
> 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}
>
> ```
>
> However JM tries to recover the job with an old checkpoint 41051 which
> doesn't exist that leads to unrecoverable state
>
> ```
>
> "2022-05-10 14:59:38,949 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 41051.\n"
>
> ```
>
> Full log attached
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao


Incorrect checkpoint id used when job is recovering

2022-05-12 Thread tao xiao
Hi team,

I met a weird issue when a job tried to recover from a JM failure. The last
successful checkpoint before the JM crashed was 41205:

```

{"log":"2022-05-10 14:55:40,663 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Completed checkpoint 41205 for job 
(9453840 bytes in 1922
ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}

```

However, the JM tried to recover the job with an old checkpoint, 41051, which
no longer exists, leaving the job in an unrecoverable state:

```

"2022-05-10 14:59:38,949 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore []
- Trying to retrieve checkpoint 41051.\n"

```

Full log attached

-- 
Regards,
Tao


jm.log
Description: Binary data


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-16 Thread tao xiao
> Your upstream is not inflating the record size?
No, this is simply a dedup function.

On Thu, Dec 16, 2021 at 2:49 PM Arvid Heise  wrote:

> Ah yes I see it now as well. Yes you are right, each record should be
> replicated 9 times to send to one of the instances each. Your upstream is
> not inflating the record size? The number of records seems to work
> decently. @pnowojski  FYI.
>
> On Thu, Dec 16, 2021 at 2:20 AM tao xiao  wrote:
>
>> Hi Arvid
>>
>> The second picture shows the metrics of the upstream operator. The
>> upstream has 150 parallelisms as you can see in the first picture. I expect
>> the bytes sent is about 9 * bytes received as we have 9 downstream
>> operators connecting.
>>
>> Hi Caizhi,
>> Let me create a minimal reproducible DAG and update here
>>
>> On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise  wrote:
>>
>>> Hi,
>>>
>>> Could you please clarify which operator we see in the second picture?
>>>
>>> If you are showing the upstream operator, then this has only parallelism
>>> 1, so there shouldn't be multiple subtasks.
>>> If you are showing the downstream operator, then the metric would refer
>>> to the HASH and not REBALANCE.
>>>
>>> On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng 
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> This doesn't seem to be the expected behavior. Rebalance shuffle should
>>>> send records to one of the parallelism, not all.
>>>>
>>>> If possible could you please explain what your Flink job is doing and
>>>> preferably share your user code so that others can look into this case?
>>>>
>>>>> On Sat, Dec 11, 2021 at 01:11 tao xiao  wrote:
>>>>
>>>>> Hi team,
>>>>>
>>>>> I have one operator that is connected to another 9 downstream
>>>>> operators using rebalance. Each operator has 150 parallelisms[1]. I assume
>>>>> each message in the upstream operation is sent to one of the parallel
>>>>> instances of the 9 receiving operators so the total bytes sent should be
>>>>> roughly 9 times of bytes received in the upstream operator metric. However
>>>>> the Flink UI shows the bytes sent is much higher than 9 times. It is about
>>>>> 150 * 9 * bytes received[2]. This looks to me like every message is
>>>>> duplicated to each parallel instance of all receiving operators like what
>>>>> broadcast does.  Is this correct?
>>>>>
>>>>>
>>>>>
>>>>> [1] https://imgur.com/cGyb0QO
>>>>> [2] https://imgur.com/SFqPiJA
>>>>> --
>>>>> Regards,
>>>>> Tao
>>>>>
>>>>
>>
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-15 Thread tao xiao
Hi Arvid

The second picture shows the metrics of the upstream operator. The upstream
has a parallelism of 150, as you can see in the first picture. I expect
bytes sent to be about 9 * bytes received, as we have 9 downstream operators
connected.

Hi Caizhi,
Let me create a minimal reproducible DAG and update here

On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise  wrote:

> Hi,
>
> Could you please clarify which operator we see in the second picture?
>
> If you are showing the upstream operator, then this has only parallelism
> 1, so there shouldn't be multiple subtasks.
> If you are showing the downstream operator, then the metric would refer to
> the HASH and not REBALANCE.
>
> On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng  wrote:
>
>> Hi!
>>
>> This doesn't seem to be the expected behavior. Rebalance shuffle should
>> send records to one of the parallelism, not all.
>>
>> If possible could you please explain what your Flink job is doing and
>> preferably share your user code so that others can look into this case?
>>
>> On Sat, Dec 11, 2021 at 01:11 tao xiao  wrote:
>>
>>> Hi team,
>>>
>>> I have one operator that is connected to another 9 downstream operators
>>> using rebalance. Each operator has 150 parallelisms[1]. I assume each
>>> message in the upstream operation is sent to one of the parallel instances
>>> of the 9 receiving operators so the total bytes sent should be roughly 9
>>> times of bytes received in the upstream operator metric. However the Flink
>>> UI shows the bytes sent is much higher than 9 times. It is about 150 * 9 *
>>> bytes received[2]. This looks to me like every message is duplicated to
>>> each parallel instance of all receiving operators like what broadcast
>>> does.  Is this correct?
>>>
>>>
>>>
>>> [1] https://imgur.com/cGyb0QO
>>> [2] https://imgur.com/SFqPiJA
>>> --
>>> Regards,
>>> Tao
>>>
>>

-- 
Regards,
Tao


Confusion about rebalance bytes sent metric in Flink UI

2021-12-10 Thread tao xiao
Hi team,

I have one operator that is connected to 9 downstream operators using
rebalance. Each operator has a parallelism of 150 [1]. I assume each message
from the upstream operator is sent to one of the parallel instances of each
of the 9 receiving operators, so the total bytes sent should be roughly 9
times the bytes received in the upstream operator metric. However, the Flink
UI shows that bytes sent is much higher than 9 times; it is about 150 * 9 *
bytes received [2]. This looks to me like every message is duplicated to
each parallel instance of all receiving operators, like what broadcast does.
Is this correct?



[1] https://imgur.com/cGyb0QO
[2] https://imgur.com/SFqPiJA
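
For reference, a minimal sketch of the kind of DAG described above (one
upstream operator fanned out to 9 downstream operators via rebalance). The
source and the map bodies are illustrative assumptions, not the real job:

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class RebalanceFanOutJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(150);

        // Placeholder upstream: the real job runs a dedup function here.
        DataStream<Long> upstream = env.fromSequence(0, Long.MAX_VALUE)
                .map(x -> x).returns(Types.LONG)
                .name("upstream");

        // Nine downstream operators, each connected to the upstream via rebalance,
        // mirroring the topology in question.
        for (int i = 0; i < 9; i++) {
            upstream.rebalance()
                    .map(x -> x).returns(Types.LONG)
                    .name("downstream-" + i)
                    .addSink(new DiscardingSink<>());
        }

        env.execute("rebalance-metric-repro");
    }
}
```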
-- 
Regards,
Tao


Re: RocksDB state not cleaned up

2021-09-17 Thread tao xiao
Thanks for the feedback! However, the TTL experiment already shows that the
state cannot be cleaned up on time due to too many levels built up in
RocksDB.

Hi @Yun Tang, do you have any suggestions for tuning RocksDB to accelerate
the compaction progress?
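
For what it's worth, a hedged sketch of how the "periodic compaction" option
mentioned in David's reply below could be wired in through a custom options
factory. This assumes Flink 1.14+ and that the bundled RocksDB build exposes
ColumnFamilyOptions#setPeriodicCompactionSeconds; it is untested in this
thread:

```
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Hedged sketch only: ask RocksDB to rewrite SST files that have not been
// compacted for ~2 days, so expired entries sitting in high levels eventually
// get dropped by the TTL compaction filter.
public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        return currentOptions.setPeriodicCompactionSeconds(2 * 24 * 3600L);
    }
}

// Usage sketch (EmbeddedRocksDBStateBackend is the 1.13+ API):
// EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// backend.setRocksDBOptions(new PeriodicCompactionOptionsFactory());
// env.setStateBackend(backend);
```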

On Fri, Sep 17, 2021 at 8:01 PM David Morávek  wrote:

> Cleaning up with timers should solve this. Both approaches have some
> advantages and disadvantages though.
>
> Timers:
> - No "side effects".
> - Can be set in event time. Deletes are regular tombstones that will get
> compacted later on.
>
> TTL:
> - Performance. This costs literally nothing compared to an extra state for
> timer + writing a tombstone marker.
> - Has "side-effects", because it works in processing time. This is just
> something to keep in mind eg. when bootstraping the state from historical
> data. (large event time / processing time skew)
>
> With 1.14 release, we've bumped the RocksDB version so it may be possible
> to use a "periodic compaction" [1], but nobody has tried that so far. In
> the meantime I think there is non real workaround because we don't expose a
> way to trigger manual compaction.
>
> I'm off to vacation until 27th and I won't be responsive during that time.
> I'd like to pull Yun into the conversation as he's super familiar with the
> RocksDB state backend.
>
> [1]
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction
>
> Best,
> D.
>
> On Fri, Sep 17, 2021 at 5:17 AM tao xiao  wrote:
>
>> Hi David,
>>
>> Confirmed with RocksDB log Stephan's observation is the root cause that
>> compaction doesn't clean up the high level sst files fast enough.  Do you
>> think manual clean up by registering a timer is the way to go or any
>> RocksDB parameter can be tuned to mitigate this issue?
>>
>> On Wed, Sep 15, 2021 at 12:10 AM tao xiao  wrote:
>>
>>> Hi David,
>>>
>>> If I read Stephan's comment correctly TTL doesn't work well for cases
>>> where we have too many levels, like fast growing state,  as compaction
>>> doesn't clean up high level SST files in time, Is this correct? If yes
>>> should we register a timer with TTL time and manual clean up the state
>>> (state.clear() ) when the timer fires?
>>>
>>> I will turn on RocksDB logging as well as compaction logging [1] to
>>> verify this
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>
>>>
>>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek  wrote:
>>>
>>>> Hi Tao,
>>>>
>>>> my intuition is that the compaction of SST files is not triggering. By
>>>> default, it's only triggered by the size ratios of different levels [1] and
>>>> the TTL mechanism has no effect on it.
>>>>
>>>> Some reasoning from Stephan:
>>>>
>>>> It's very likely to have large files in higher levels that haven't been
>>>>> compacted in a long time and thus just stay around.
>>>>>
>>>>> This might be especially possible if you insert a lot in the beginning
>>>>> (build up many levels) and then have a moderate rate of modifications, so
>>>>> the changes and expiration keep happening purely in the merges /
>>>>> compactions of the first levels. Then the later levels may stay unchanged
>>>>> for quite some time.
>>>>>
>>>>
>>>> You should be able to see compaction details by setting RocksDB logging
>>>> to INFO [2]. Can you please check these and validate whether this really is
>>>> the case?
>>>>
>>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>>> [2]
>>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:
>>>>
>>>>> Hi team
>>>>>
>>>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>>>> value state doesn't exist and we never update it again after the state is
>>>>> not empty. The key of the value state is timestamp. My understanding of
>>>>> such TTL settings is that the size of all SST files remains flat (let's
>>>>> disregard 

Re: RocksDB state not cleaned up

2021-09-16 Thread tao xiao
Hi David,

Confirmed with the RocksDB log that Stephan's observation is the root cause:
compaction doesn't clean up the high-level SST files fast enough. Do you
think manual clean-up by registering a timer is the way to go, or can some
RocksDB parameter be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao  wrote:

> Hi David,
>
> If I read Stephan's comment correctly TTL doesn't work well for cases
> where we have too many levels, like fast growing state,  as compaction
> doesn't clean up high level SST files in time, Is this correct? If yes
> should we register a timer with TTL time and manual clean up the state
> (state.clear() ) when the timer fires?
>
> I will turn on RocksDB logging as well as compaction logging [1] to verify
> this
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>
>
> On Tue, Sep 14, 2021 at 5:38 PM David Morávek  wrote:
>
>> Hi Tao,
>>
>> my intuition is that the compaction of SST files is not triggering. By
>> default, it's only triggered by the size ratios of different levels [1] and
>> the TTL mechanism has no effect on it.
>>
>> Some reasoning from Stephan:
>>
>> It's very likely to have large files in higher levels that haven't been
>>> compacted in a long time and thus just stay around.
>>>
>>> This might be especially possible if you insert a lot in the beginning
>>> (build up many levels) and then have a moderate rate of modifications, so
>>> the changes and expiration keep happening purely in the merges /
>>> compactions of the first levels. Then the later levels may stay unchanged
>>> for quite some time.
>>>
>>
>> You should be able to see compaction details by setting RocksDB logging
>> to INFO [2]. Can you please check these and validate whether this really is
>> the case?
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>> [2]
>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>
>> Best,
>> D.
>>
>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:
>>
>>> Hi team
>>>
>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>> value state doesn't exist and we never update it again after the state is
>>> not empty. The key of the value state is timestamp. My understanding of
>>> such TTL settings is that the size of all SST files remains flat (let's
>>> disregard the impact space amplification brings) after 1 day as the daily
>>> data volume is more or less the same. However the RocksDB native metrics
>>> show that the SST files continue to grow since I started the job. I check
>>> the SST files in local storage and I can see SST files with age 1 months
>>> ago (when I started the job). What is the possible reason for the SST files
>>> not cleaned up?.
>>>
>>> The Flink version is 1.12.1
>>> State backend is RocksDB with incremental checkpoint
>>> All default configuration for RocksDB
>>> Per job mode in Yarn and checkpoint to S3
>>>
>>>
>>> Here is the code to set value state
>>>
>>> public void open(Configuration parameters) {
>>> StateTtlConfig ttlConfigClick = StateTtlConfig
>>> .newBuilder(Time.days(1))
>>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>> 
>>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>> .cleanupInRocksdbCompactFilter(300_000)
>>> .build();
>>> ValueStateDescriptor clickStateDescriptor = new 
>>> ValueStateDescriptor<>("click", Click.class);
>>> clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>> clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>
>>> StateTtlConfig ttlConfigAds = StateTtlConfig
>>> .newBuilder(Time.days(1))
>>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>> 
>>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>> .cleanupInRocksdbCompactFilter(30_000_000)
>>> .build();
>>> ValueStateDescriptor adsStateDescriptor = new 
>>> ValueStateDescriptor<>("ads", slimAdsClass);
>>> adsStateDescriptor

Re: RocksDB state not cleaned up

2021-09-14 Thread tao xiao
Hi David,

If I read Stephan's comment correctly, TTL doesn't work well for cases where
we have too many levels, such as fast-growing state, because compaction
doesn't clean up high-level SST files in time. Is this correct? If so,
should we register a timer with the TTL duration and manually clean up the
state (state.clear()) when the timer fires?

I will turn on RocksDB logging as well as compaction logging [1] to verify
this.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
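
As a rough illustration of the timer-based cleanup floated above, a hedged
sketch (the key and value types are placeholders, and the one-day interval
mirrors the TTL used in this job):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hedged sketch: in addition to (or instead of) TTL, register a processing-time
// timer one day after the first write and clear the state explicitly when it
// fires. The resulting deletes are tombstones that compaction can drop.
public class TimerCleanupFunction extends KeyedProcessFunction<Long, String, String> {

    private static final long ONE_DAY_MS = 24 * 60 * 60 * 1000L;

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("payload", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (state.value() == null) {
            state.update(value);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + ONE_DAY_MS);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        state.clear();
    }
}
```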


On Tue, Sep 14, 2021 at 5:38 PM David Morávek  wrote:

> Hi Tao,
>
> my intuition is that the compaction of SST files is not triggering. By
> default, it's only triggered by the size ratios of different levels [1] and
> the TTL mechanism has no effect on it.
>
> Some reasoning from Stephan:
>
> It's very likely to have large files in higher levels that haven't been
>> compacted in a long time and thus just stay around.
>>
>> This might be especially possible if you insert a lot in the beginning
>> (build up many levels) and then have a moderate rate of modifications, so
>> the changes and expiration keep happening purely in the merges /
>> compactions of the first levels. Then the later levels may stay unchanged
>> for quite some time.
>>
>
> You should be able to see compaction details by setting RocksDB logging to
> INFO [2]. Can you please check these and validate whether this really is
> the case?
>
> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
> [2]
> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>
> Best,
> D.
>
> On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:
>
>> Hi team
>>
>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>> The TTL update type is OnCreateAndWrite. We set the value state when the
>> value state doesn't exist and we never update it again after the state is
>> not empty. The key of the value state is timestamp. My understanding of
>> such TTL settings is that the size of all SST files remains flat (let's
>> disregard the impact space amplification brings) after 1 day as the daily
>> data volume is more or less the same. However the RocksDB native metrics
>> show that the SST files continue to grow since I started the job. I check
>> the SST files in local storage and I can see SST files with age 1 months
>> ago (when I started the job). What is the possible reason for the SST files
>> not cleaned up?.
>>
>> The Flink version is 1.12.1
>> State backend is RocksDB with incremental checkpoint
>> All default configuration for RocksDB
>> Per job mode in Yarn and checkpoint to S3
>>
>>
>> Here is the code to set value state
>>
>> public void open(Configuration parameters) {
>> StateTtlConfig ttlConfigClick = StateTtlConfig
>> .newBuilder(Time.days(1))
>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> 
>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> .cleanupInRocksdbCompactFilter(300_000)
>> .build();
>> ValueStateDescriptor clickStateDescriptor = new 
>> ValueStateDescriptor<>("click", Click.class);
>> clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>> clickState = getRuntimeContext().getState(clickStateDescriptor);
>>
>> StateTtlConfig ttlConfigAds = StateTtlConfig
>> .newBuilder(Time.days(1))
>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>> 
>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>> .cleanupInRocksdbCompactFilter(30_000_000)
>> .build();
>> ValueStateDescriptor adsStateDescriptor = new 
>> ValueStateDescriptor<>("ads", slimAdsClass);
>> adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>> adsState = getRuntimeContext().getState(adsStateDescriptor);
>> }
>>
>> @Override
>> public void processElement(Tuple3 tuple, Context ctx, 
>> Collector collector) throws Exception {
>> if (tuple.f1 != null) {
>> Click click = tuple.f1;
>>
>> if (clickState.value() != null) {
>> return;
>> }
>>
>> clickState.update(click);
>>
>> A adsFromState = adsState.value();
>> if (adsFromState != null) {
>> collector.collect(adsFromState);
>> }
>> } else {
>> A ads = tuple.f2;

RocksDB state not cleaned up

2021-09-13 Thread tao xiao
Hi team

We have a job that uses value state with RocksDB and a TTL set to 1 day. The
TTL update type is OnCreateAndWrite. We set the value state when it doesn't
exist yet and never update it again once it is non-empty. The key of the
value state is a timestamp. My understanding of such TTL settings is that
the total size of the SST files should remain flat (disregarding the impact
of space amplification) after 1 day, as the daily data volume is more or
less the same. However, the RocksDB native metrics show that the SST files
have continued to grow since I started the job. I checked the SST files in
local storage and I can see SST files that are one month old (from when I
started the job). What could be the reason the SST files are not cleaned up?

The Flink version is 1.12.1
State backend is RocksDB with incremental checkpoint
All default configuration for RocksDB
Per job mode in Yarn and checkpoint to S3


Here is the code to set value state

public void open(Configuration parameters) {
    StateTtlConfig ttlConfigClick = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(300_000)
            .build();
    ValueStateDescriptor<Click> clickStateDescriptor =
            new ValueStateDescriptor<>("click", Click.class);
    clickStateDescriptor.enableTimeToLive(ttlConfigClick);
    clickState = getRuntimeContext().getState(clickStateDescriptor);

    StateTtlConfig ttlConfigAds = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(30_000_000)
            .build();
    ValueStateDescriptor<A> adsStateDescriptor =
            new ValueStateDescriptor<>("ads", slimAdsClass);
    adsStateDescriptor.enableTimeToLive(ttlConfigAds);
    adsState = getRuntimeContext().getState(adsStateDescriptor);
}

@Override
public void processElement(Tuple3 tuple, Context ctx, Collector<A> collector) throws Exception {
    if (tuple.f1 != null) {
        Click click = tuple.f1;

        if (clickState.value() != null) {
            return;
        }

        clickState.update(click);

        A adsFromState = adsState.value();
        if (adsFromState != null) {
            collector.collect(adsFromState);
        }
    } else {
        A ads = tuple.f2;

        if (adsState.value() != null) {
            return;
        }

        adsState.update(ads);

        Click clickFromState = clickState.value();
        if (clickFromState != null) {
            collector.collect(ads);
        }
    }
}


Here is the snippet of sst files in local storage

[root@ db]# ll | head -n10
total 76040068
-rw-r- 1 hadoop yarn0 Aug 16 08:46 03.log
-rw-r- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
-rw-r- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
-rw-r- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
-rw-r- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
-rw-r- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
-rw-r- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
-rw-r- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
-rw-r- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
-- 
Regards,
Tao


Re: How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-10 Thread tao xiao
Thanks David for the tips. We have been running Flink on EMR (which is
EBS-backed) for more than a year with no performance degradation observed,
so we believe we can expect the same performance on Kubernetes.

On Sat, Sep 11, 2021 at 3:13 AM David Morávek  wrote:

> OT: Beware that even if you manage to solve this, EBS is replicated
> network storage, therefore rocksdb performance will be affected
> significantly.
>
> Best,
> D.
>
> On Fri 10. 9. 2021 at 16:19, tao xiao  wrote:
>
>> The use case we have is to store the RocksDB sst files in EBS. The EC2
>> instance type (m5) we use doesn't provide local disk storage therefore EBS
>> is the only option to store the local sst file.
>>
>> On Fri, Sep 10, 2021 at 7:10 PM Yang Wang  wrote:
>>
>>> I am afraid Flink could not support creating dedicated PVC for each
>>> TaskManager pod now.
>>> But I think it might be a reasonable requirement.
>>>
>>> Could you please share why you need to mount a persistent volume claim
>>> per TaskManager?
>>> AFAIK, the TaskManager will be deleted once it fails. You expect the PVC
>>> to also be deleted. Right?
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> On Fri, Sep 10, 2021 at 2:37 PM Xiaolong Wang  wrote:
>>>
>>>> Hi,
>>>>
>>>>I'm facing a tough question. I want to start a Flink Native
>>>> Kubernetes job with each of the task manager pod mounted with an aws-ebs
>>>> PVC.
>>>>
>>>>   The first thought is to use the pod-template file to do this, but it
>>>> soon went to a dead end. Since the pod-template on each of the task manager
>>>> pod is the same, how can I mount different PVCs ?
>>>>
>>>>This issue is quite puzzling, will you please help me ?
>>>>
>>>> Thanks in advance !
>>>>
>>>
>>
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao


Re: How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-10 Thread tao xiao
The use case we have is to store the RocksDB SST files in EBS. The EC2
instance type (m5) we use doesn't provide local disk storage, so EBS is the
only option for storing the local SST files.

On Fri, Sep 10, 2021 at 7:10 PM Yang Wang  wrote:

> I am afraid Flink could not support creating dedicated PVC for each
> TaskManager pod now.
> But I think it might be a reasonable requirement.
>
> Could you please share why you need to mount a persistent volume claim per
> TaskManager?
> AFAIK, the TaskManager will be deleted once it fails. You expect the PVC
> to also be deleted. Right?
>
>
> Best,
> Yang
>
> On Fri, Sep 10, 2021 at 2:37 PM Xiaolong Wang  wrote:
>
>> Hi,
>>
>>I'm facing a tough question. I want to start a Flink Native Kubernetes
>> job with each of the task manager pod mounted with an aws-ebs PVC.
>>
>>   The first thought is to use the pod-template file to do this, but it
>> soon went to a dead end. Since the pod-template on each of the task manager
>> pod is the same, how can I mount different PVCs ?
>>
>>This issue is quite puzzling, will you please help me ?
>>
>> Thanks in advance !
>>
>

-- 
Regards,
Tao


Re: Exception in snapshotState suppresses subsequent checkpoints

2021-07-01 Thread tao xiao
Thanks for the pointer. Let me try upgrading Flink.

On Thu, Jul 1, 2021 at 5:29 PM Yun Tang  wrote:

> Hi Tao,
>
> I run your program with Flink-1.12.1 and found the problem you described
> really existed. And things would go normal if switching to Flink-1.12.2
> version.
>
> After dig into the root cause, I found this is caused by a fixed bug
> [1]: If a legacy source task fails outside of the legacy thread, the legacy
> thread blocks proper cancellation (completion future never completed). As
> you throw the NPE within the source operator, it will never exit and cannot
> handle subsequent checkpoint requirements then. That's why you see all
> subsequent checkpoints cannot finish.
>
>
> [1]
> https://github.com/apache/flink/commit/b332ce40d88be84d9cf896f446c7c6e26dbc8b6a#diff-54e9ce3b15d6badcc9376ab144df066eb46c4e516d6ee31ef8eb38e2d4359042
>
> Best
> Yun Tang
> --
> From: Matthias Pohl 
> Sent: Thursday, July 1, 2021 16:41
> To: tao xiao 
> Cc: Yun Tang ; user ; Roman
> Khachatryan 
> Subject: Re: Exception in snapshotState suppresses subsequent
> checkpoints
>
> Hi Tao,
> it looks like it should work considering that you have a sleep of 1 second
> before each emission. I'm going to add Roman to this thread. Maybe he sees
> something obvious which I'm missing.
> Could you run the job with the log level set to debug and provide the logs
> once more? Additionally, having both the TaskManager's and the
> JobManager's logs available would help in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Jun 30, 2021 at 6:14 PM tao xiao  wrote:
>
> Hi team,
>
> Does anyone have a clue?
>
> On Mon, Jun 28, 2021 at 3:27 PM tao xiao  wrote:
>
> My job is very simple as you can see from the code I pasted. I simply
> print out the number to stdout. If you look at the log the number continued
> to print out after checkpoint 1 which indicated no back pressure was
> happening.  It is very easy to reproduce this if you run the code I
> provided in IDE
>
>
> LOG
>
> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
> Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
> Checkpoint was declined.
> (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Sink: Print to
> Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
>

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-30 Thread tao xiao
Hi team,

Does anyone have a clue?

On Mon, Jun 28, 2021 at 3:27 PM tao xiao  wrote:

> My job is very simple as you can see from the code I pasted. I simply
> print out the number to stdout. If you look at the log the number continued
> to print out after checkpoint 1 which indicated no back pressure was
> happening.  It is very easy to reproduce this if you run the code I
> provided in IDE
>
>
> LOG
>
> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
> [2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator
> Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason:
> Checkpoint was declined.
> (org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Sink: Print to
> Std. Out (1/1)#0. Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
> ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> [flink-runtime_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> [flink-runtime_2.11-1.12.1.jar:1.12.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
> Caused by: org.apache.flink.util.SerializedThrowable: npe
> at
> com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
> ~[classes/:?]
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-28 Thread tao xiao
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
... 20 more
[2021-06-26 16:08:55,357] INFO Checkpoint 1 of job
afde4a82f41e8284cb0bfff20497a5cc expired before completing.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
output
[2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
33
34
35
[2021-06-26 16:09:15,349] INFO Checkpoint 2 of job
afde4a82f41e8284cb0bfff20497a5cc expired before completing.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
36
37
38


Main function
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);

env.addSource(new FromElementsFunctionT())
.setParallelism(1)
.print()
.setParallelism(1);
env.execute("Demo");

Source function
package sample.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A stream source function that returns a sequence of elements.
 *
 * Upon construction, this source function serializes the elements using Flink's type
 * information. That way, any object transport using Java serialization will not be affected by the
 * serializability of the elements.
 *
 * NOTE: This source has a parallelism of 1.
 */
@PublicEvolving
public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    /** The number of elements emitted already. */
    private volatile int numElementsEmitted;

    /** Flag to make the source cancelable. */
    private volatile boolean isRunning = true;

    private transient ListState<Integer> checkpointedState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState == null,
                "The " + getClass().getSimpleName() + " has already been initialized.");

        this.checkpointedState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "from-elements-state", IntSerializer.INSTANCE));

        if (context.isRestored()) {
            List<Integer> retrievedStates = new ArrayList<>();
            for (Integer entry : this.checkpointedState.get()) {
                retrievedStates.add(entry);
            }

            // given that the parallelism of the function is 1, we can only have 1 state
            Preconditions.checkArgument(
                    retrievedStates.size() == 1,
                    getClass().getSimpleName() + " retrieved invalid state.");

            this.numElementsEmitted = retrievedStates.get(0);
        }
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
            Thread.sleep(1000);
            synchronized (lock) {
                ctx.collect(numElementsEmitted++);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // ------------------------------------------------------------------------
    //  Checkpointing
    // ------------------------------------------------------------------------

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(
                this.checkpointedState != null,
                "The " + getClass().getSimpleName() + " has not been properly initialized.");

        this.checkpointedState.clear();
        this.checkpointedState.add(this.numElementsEmitted);
        throw new NullPointerException("npe");
    }
}




On Mon, Jun 28, 2021 at 2:36 PM Yun Tang  wrote:

> Hi Tao,
>
> I'm afraid that your Flink job continues to be under high backpressure and
> all subsequent checkpoints never ran
> 'FromElementsFunctionT#snapshotState' which means your code to throw
> exception never be executed. You could check those expired checkpoints to
> see whether your tasks containing 'FromElementsFunctionT' has ever been
> completed.
>
> Best
> Yun Tang
> --
> From: tao xiao 
> Sent: Saturday, June 26, 2021 16:40
> To: user 
> Subject: Re: Exception in snapshotState suppresses subsequent
> checkpoints
>
> Btw here is the checkpoint related log
>
> [2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1624694932345 for

Exception in snapshotState suppresses subsequent checkpoints

2021-06-26 Thread tao xiao
Hi team,

I am running a simple Flink 1.12.1 job in the IDE with
TolerableCheckpointFailureNumber set, where I intentionally throw an
exception in the source function's snapshotState to verify how Flink
behaves. What I find is that the first checkpoint throws the exception and
eventually times out while the main flow continues to work. This is
expected; however, all subsequent checkpoints never reach the exception
anymore and simply report a timeout when the timeout is reached. I want to
know whether it is expected behavior that all later checkpoints cannot
finish once one checkpoint has thrown an exception.

Below is the code the reproduce the behavior
main

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);

env.addSource(new FromElementsFunctionT())
.setParallelism(1)
.print()
.setParallelism(1);
env.execute("Demo");


Source function

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.smartnews.dp.kafka.sample.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/**
 * A stream source function that returns a sequence of elements.
 *
 * Upon construction, this source function serializes the elements using Flink's type
 * information. That way, any object transport using Java serialization will not be affected by the
 * serializability of the elements.
 *
 * NOTE: This source has a parallelism of 1.
 *
 */
@PublicEvolving
public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {

private static final long serialVersionUID = 1L;

/** The number of elements emitted already. */
private volatile int numElementsEmitted;

/** Flag to make the source cancelable. */
private volatile boolean isRunning = true;

    private transient ListState<Integer> checkpointedState;

@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
Preconditions.checkState(
this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already
been initialized.");

this.checkpointedState =
context.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(
"from-elements-state",
IntSerializer.INSTANCE));

if (context.isRestored()) {
            List<Integer> retrievedStates = new ArrayList<>();
for (Integer entry : this.checkpointedState.get()) {
retrievedStates.add(entry);
}

// given that the parallelism of the function is 1, we can
only have 1 state
Preconditions.checkArgument(
retrievedStates.size() == 1,
getClass().getSimpleName() + " retrieved invalid state.");

this.numElementsEmitted = retrievedStates.get(0);
}
}

@Override
    public void run(SourceContext<Integer> ctx) throws Exception {
final Object lock = ctx.getCheckpointLock();

while (isRunning && numElementsEmitted < Integer.MAX_VALUE) {
Thread.sleep(1000);
synchronized (lock) {
ctx.collect(numElementsEmitted++);
}
}
}

@Override
public void cancel() {
isRunning = false;
}

    // ------------------------------------------------------------------------
    //  Checkpointing
    // ------------------------------------------------------------------------

Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-26 Thread tao xiao
[2021-06-26 16:09:12,347] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
1624694952346 for job afde4a82f41e8284cb0bfff20497a5cc.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
33
34
35
[2021-06-26 16:09:15,349] INFO Checkpoint 2 of job
afde4a82f41e8284cb0bfff20497a5cc expired before completing.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)

On Sat, Jun 26, 2021 at 4:36 PM tao xiao  wrote:

> Hi team,
>
> I run a simple Flink 1.12.1 job in the IDE with
> TolerableCheckpointFailureNumber set, where I intentionally throw an exception in the
> source function's snapshotState to verify how Flink behaves. What I
> find is that the first checkpoint throws the exception and eventually times out
> while the main flow continues to work. This is expected; however, all
> subsequent checkpoints never reach the exception anymore and simply report a timeout
> when the checkpoint timeout is reached. I want to know whether this is expected
> behavior, i.e. that later checkpoints cannot finish once one checkpoint has thrown
> an exception.
>
> Below is the code to reproduce the behavior
> main
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new FsStateBackend("file:///tmp/chpk", true));
> env.enableCheckpointing(20_000, CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(3_000);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);
>
> env.addSource(new FromElementsFunctionT())
> .setParallelism(1)
> .print()
> .setParallelism(1);
> env.execute("Demo");
>
>
> Source function
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package sample.flink;
>
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.annotation.PublicEvolving;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.typeutils.base.IntSerializer;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.util.Preconditions;
>
> /**
>  * A stream source function that returns a sequence of elements.
>  *
>  * Upon construction, this source function serializes the elements using 
> Flink's type
>  * information. That way, any object transport using Java serialization will 
> not be affected by the
>  * serializability of the elements.
>  *
>  * NOTE: This source has a parallelism of 1.
>  *
>  */
> @PublicEvolving
> public class FromElementsFunctionT implements SourceFunction<Integer>, CheckpointedFunction {
>
> private static final long serialVersionUID = 1L;
>
> /** The number of elements emitted already. */
> private volatile int numElementsEmitted;
>
> /** Flag to make the source cancelable. */
> private volatile boolean isRunning = true;
>
> private transient ListState<Integer> checkpointedState;
>
> @Override
> public void initializeState(FunctionInitializationContext context) throws 
> Exception {
> Preconditions.checkState(
> this.checkpointedState == null,
> "The " + getClass().getSimpleName() + " has already been 
> initialized.");
>
> this.checkpointedState =
> context.getOperatorStateStore()
> .getListState(
> new ListStateDescriptor<>(
> "from-elements-state", 
> IntSerializer.INSTANCE));
>
> if (context.isRestored()) {
> List<Integer> retrievedStates = new ArrayList<>();
> for (Integer entry : this.checkpointedState.get()) {
> retrievedStates.add(entry);
>

Is it possible to customize avro schema name when using SQL

2021-06-06 Thread tao xiao
Hi team,

I want to use avro-confluent to encode the data using SQL, but the schema
registered by the encoder hard-codes the schema name to 'record'. Is it
possible to specify a different name?
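
For context, a minimal Java sketch of the kind of sink table in question, with placeholder topic, broker, and registry addresses and option keys matching those used elsewhere in this archive; writing to such a table is what registers the Avro record schema whose name the question is about:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        StreamExecutionEnvironment.getExecutionEnvironment());

// Placeholder DDL: the value format derives an Avro schema from f1/f2 and
// registers it under the subject for 'topic'.
tableEnv.executeSql(
        "CREATE TABLE sink_t (\n"
        + "  `f1` STRING,\n"
        + "  `f2` STRING\n"
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'topic',\n"
        + "  'properties.bootstrap.servers' = 'kafka:9092',\n"
        + "  'value.format' = 'avro-confluent',\n"
        + "  'value.avro-confluent.schema-registry.url' = 'http://registry:8081'\n"
        + ")");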

-- 
Regards,
Tao


Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
JIRA created: https://issues.apache.org/jira/browse/FLINK-22858, but I cannot
assign it to myself. Can you please assign it to me?

On Wed, Jun 2, 2021 at 11:00 PM Fabian Paul 
wrote:

> Hi Tao,
>
> I was browsing the code a bit and I think this is currently not support
> but it seems to be not too
> difficult to implement. You would need to allow a map of configurations
> and finally pass it to [1]
>
> Can you create a ticket in our JIRA?
> Would you be willing to contribute this feature?
>
> Best,
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/1db4e560d1b46fac27a18bce9556fec646f063d9/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java#L54
>
> On 2. Jun 2021, at 13:57, tao xiao  wrote:
>
> Hi Fabian,
>
> Unfortunately this will not work in our environment where we implement our
> own 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider
> which does the login and supplies the JWT to authorization HTTP header. The
> only way it will work is to pass the schema registry
> config BEARER_AUTH_CREDENTIALS_SOURCE [1] to table format factory
>
> [1]
> https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java#L85
>
> On Wed, Jun 2, 2021 at 5:27 PM Fabian Paul 
> wrote:
>
>> Hi Tao,
>>
>> Thanks for reaching out. Have you tried the following
>>
>>  'value.avro-confluent.schema-registry.url' = 
>> 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
>>
>>
>>
>> It may be possible to provide basic HTTP authentication by adding your
>> username and password to the URL. There is already a similar ticket open
>> unfortunately without much progress. [1]
>> Please let me know if this works for  you otherwise we can try to find a
>> different solution.
>>
>> Best,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22763
>>
>>
>> On 2. Jun 2021, at 10:58, tao xiao  wrote:
>>
>> Hi team,
>>
>> Confluent schema registry supports HTTP basic authentication[1] but I
>> don't find the corresponding configs in Flink documentation[2]. Is this
>> achievable in Flink avro-confluent?
>>
>>
>> [1]
>> https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
>> --
>> Regards,
>> Tao
>>
>>
>>
>
> --
> Regards,
> Tao
>
>
>

-- 
Regards,
Tao


Re: avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
Hi Fabian,

Unfortunately this will not work in our environment, where we implement our own
io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider
that performs the login and supplies the JWT in the Authorization HTTP header. The
only way it will work is to pass the schema registry
config BEARER_AUTH_CREDENTIALS_SOURCE [1] to the table format factory.

[1]
https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java#L85
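
For reference, a minimal sketch of the kind of registry-client configuration involved; the credentials-source key is the one referenced in [1], while the STATIC_TOKEN value and the token key are shown only as an illustration of the general mechanism:

import java.util.HashMap;
import java.util.Map;

// Schema-registry client properties. "bearer.auth.credentials.source" is the
// key behind BEARER_AUTH_CREDENTIALS_SOURCE in [1]; the STATIC_TOKEN value and
// "bearer.auth.token" are shown for illustration only. In our case the source
// would instead select the in-house BearerAuthCredentialProvider.
Map<String, Object> registryConfigs = new HashMap<>();
registryConfigs.put("bearer.auth.credentials.source", "STATIC_TOKEN");
registryConfigs.put("bearer.auth.token", "<jwt-token>");

// The ask in this thread is a way to hand a map like this to the avro-confluent
// format factory so it reaches the underlying schema registry client.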

On Wed, Jun 2, 2021 at 5:27 PM Fabian Paul 
wrote:

> Hi Tao,
>
> Thanks for reaching out. Have you tried the following
>
>  'value.avro-confluent.schema-registry.url' = 
> 'https://{SR_API_KEY}:{SR_API_SECRET}@psrc-lzvd0.ap-southeast-2.aws.confluent.cloud',
>
>
>
> It may be possible to provide basic HTTP authentication by adding your
> username and password to the URL. There is already a similar ticket open
> unfortunately without much progress. [1]
> Please let me know if this works for  you otherwise we can try to find a
> different solution.
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-22763
>
>
> On 2. Jun 2021, at 10:58, tao xiao  wrote:
>
> Hi team,
>
> Confluent schema registry supports HTTP basic authentication[1] but I
> don't find the corresponding configs in Flink documentation[2]. Is this
> achievable in Flink avro-confluent?
>
>
> [1]
> https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
> --
> Regards,
> Tao
>
>
>

-- 
Regards,
Tao


avro-confluent supports authentication enabled schema registry

2021-06-02 Thread tao xiao
Hi team,

Confluent schema registry supports HTTP basic authentication[1] but I don't
find the corresponding configs in Flink documentation[2]. Is this
achievable in Flink avro-confluent?


[1]
https://docs.confluent.io/platform/current/confluent-security-plugins/schema-registry/install.html#authentication-mechanisms
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/#format-options
-- 
Regards,
Tao


Re: classloader.resolve-order is not honored when submitting job to a remote cluster

2021-06-01 Thread tao xiao
Hi Till,

The PR covers the problem and will fix the inconsistent class-loading issue.

On Tue, Jun 1, 2021 at 9:55 PM Till Rohrmann  wrote:

> Hi Tao,
>
> I think this looks like a bug to me. Could it be that this problem is
> covered by [1, 2]? Maybe you can review this PR and check whether it solves
> the problem. If yes, then let's quickly get it in.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21445
> [2] https://github.com/apache/flink/pull/15020
>
> Cheers,
> Till
>
> On Sun, May 30, 2021 at 9:41 AM tao xiao  wrote:
>
>> Hi team,
>>
>> I discovered that child first class loader is always used to initialize
>> the main program when submitting the job to a yarn cluster using
>> application mode regardless of what value classloader.resolve-order is set
>> in flink-conf.yaml. But this is not the case if I submit the same job with
>> the same config to the local cluster which honors the config and use the
>> correct class loader to load the main program. Here is the log from local
>> cluster
>>
>> 2021-05-30 15:01:16,372 INFO  org.apache.flink.client.cli.CliFrontend
>>  [] -
>> 
>> 2021-05-30 15:01:16,375 INFO  org.apache.flink.client.cli.CliFrontend
>>  [] -  Starting Command Line Client (Version: 1.12.1,
>> Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
>> [trim down the log]
>> *2021-05-30 15:01:16,616 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: classloader.resolve-order, parent-first*
>> 2021-05-30 15:01:16,763 WARN  org.apache.flink.runtime.util.HadoopUtils
>>  [] - Could not find Hadoop configuration via any of the
>> supported methods (Flink configuration, environment variables).
>> [trim down the log]
>> 2021-05-30 15:01:16,830 INFO  org.apache.flink.client.ClientUtils
>>  [] - Starting program (detached: false)
>> *2021-05-30 15:01:16,871 INFO  io.demo.flink.WordCount
>>[] - Loaded by
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6*
>>
>> Here is the log from yarn cluster
>> 2021-05-30 07:20:14,434 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>> 
>> 2021-05-30 07:20:14,438 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>>  Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11,
>> Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
>> [trim down the log]
>> 2021-05-30 07:20:15,205 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: taskmanager.memory.process.size, 2048m
>> *2021-05-30 07:20:15,205 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: classloader.resolve-order, parent-first*
>> 2021-05-30 07:20:15,205 INFO
>>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: metrics.scope.jm, flink.jobmanager
>> [trim down the log]
>> *2021-05-30 07:20:21,383 INFO  io.demo.flink.WordCount
>>[] - Loaded by
>> org.apache.flink.util.ChildFirstClassLoader@3da30852*
>>
>> Here is the job to reproduce the problem
>>
>> public static void main(String[] args) throws Exception {
>>
>>  // set up the execution environment
>>  final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>  LOG.info("Loaded by {}", WordCount.class.getClassLoader());
>>  // get input data
>>  DataStreamSource text = env.fromElements(
>>"To be, or not to be,--that is the question:--",
>>"Whether 'tis nobler in the mind to suffer",
>>"The slings and arrows of outrageous fortune",
>>"Or to take arms against a sea of troubles,"
>>);
>>
>>text.print();
>>  env.execute("demo job");
>>
>> }
>>
>>
>> Flink version 1.12.1
>>
>> I believe the inconsistency is the result of user defined flink-conf not
>> passed to PackageProgram which uses default config instead
>>
>>
>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109
>>
>> Not sure if this is expected behavior that we never assume the main
>> program is loaded with the configured class loader
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao


classloader.resolve-order is not honored when submitting job to a remote cluster

2021-05-30 Thread tao xiao
Hi team,

I discovered that the child-first class loader is always used to initialize the
main program when submitting the job to a YARN cluster in application mode,
regardless of the value that classloader.resolve-order is set to in
flink-conf.yaml. This is not the case when I submit the same job with the
same config to the local cluster, which honors the config and uses the
correct class loader to load the main program. Here is the log from the local
cluster

2021-05-30 15:01:16,372 INFO  org.apache.flink.client.cli.CliFrontend
   [] -

2021-05-30 15:01:16,375 INFO  org.apache.flink.client.cli.CliFrontend
   [] -  Starting Command Line Client (Version: 1.12.1, Scala:
2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[trim down the log]
*2021-05-30 15:01:16,616 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: classloader.resolve-order, parent-first*
2021-05-30 15:01:16,763 WARN  org.apache.flink.runtime.util.HadoopUtils
   [] - Could not find Hadoop configuration via any of the
supported methods (Flink configuration, environment variables).
[trim down the log]
2021-05-30 15:01:16,830 INFO  org.apache.flink.client.ClientUtils
   [] - Starting program (detached: false)
*2021-05-30 15:01:16,871 INFO  io.demo.flink.WordCount
 [] - Loaded by
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6*

Here is the log from yarn cluster
2021-05-30 07:20:14,434 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2021-05-30 07:20:14,438 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
 Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11,
Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[trim down the log]
2021-05-30 07:20:15,205 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.memory.process.size, 2048m
*2021-05-30 07:20:15,205 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: classloader.resolve-order, parent-first*
2021-05-30 07:20:15,205 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: metrics.scope.jm, flink.jobmanager
[trim down the log]
*2021-05-30 07:20:21,383 INFO  io.demo.flink.WordCount
 [] - Loaded by
org.apache.flink.util.ChildFirstClassLoader@3da30852*

Here is the job to reproduce the problem

public static void main(String[] args) throws Exception {

 // set up the execution environment
 final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

 LOG.info("Loaded by {}", WordCount.class.getClassLoader());
 // get input data
 DataStreamSource<String> text = env.fromElements(
   "To be, or not to be,--that is the question:--",
   "Whether 'tis nobler in the mind to suffer",
   "The slings and arrows of outrageous fortune",
   "Or to take arms against a sea of troubles,"
   );

   text.print();
 env.execute("demo job");

}


Flink version 1.12.1

I believe the inconsistency is the result of the user-defined flink-conf not
being passed to PackagedProgram, which uses the default config instead:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109

I am not sure whether this is expected behavior, i.e. that the main program
is not guaranteed to be loaded with the configured class loader.
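
A small hedged check that could sit next to the existing log line to compare the configured resolve order with the class loader actually used; GlobalConfiguration and CoreOptions are standard Flink classes, while LOG and WordCount come from the snippet above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;

// Loads flink-conf.yaml from the usual config directory and logs the effective setting.
Configuration conf = GlobalConfiguration.loadConfiguration();
LOG.info("classloader.resolve-order = {}",
        conf.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER));
LOG.info("Main class loaded by {}", WordCount.class.getClassLoader());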
-- 
Regards,
Tao


Re: No result shown when submitting the SQL in cli

2021-05-11 Thread tao xiao
Thanks for the reply. I finally got it working. The output is actually
printed out on the client side, not on the TM side.
On Tue, May 11, 2021 at 11:47 PM Jeff Zhang  wrote:

> The result is printed in TM.
> It is local mode in IDE, so TM runs in your local jvm that's why you see
> the result
> While it is distributed mode (either yarn or standalone mode) when you are
> in sql-client, you should be able to see the result in TM logs.
>
>
> tao xiao  于2021年5月11日周二 下午11:40写道:
>
>> Does anyone help with this question?
>>
>> On Thu, May 6, 2021 at 3:26 PM tao xiao  wrote:
>>
>>> Hi team,
>>>
>>> I wrote a simple SQL job to select data from Kafka. I can see results
>>> printing out in IDE but when I submit the job to a standalone cluster in
>>> CLI there is no result shown. I am sure the job is running well in the
>>> cluster with debug log suggesting that the kafka consumer is fetching data
>>> from Kafka. I enabled debug log in CLI and I don't see any obvious log.
>>> Here is the job code snippet
>>>
>>> public static void main(String[] args) throws Exception {
>>>   StreamTableEnvironment tableEnv = StreamTableEnvironment
>>>   
>>> .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));
>>>
>>>   String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
>>>   splitIgnoreQuota(sqls, ';').forEach(sql -> {
>>> TableResult tableResult = tableEnv.executeSql(sql);
>>> tableResult.print();
>>>   });
>>> }
>>>
>>> It simply parses a sql file and execute the statements
>>>
>>> Here is the SQL statements
>>>
>>> CREATE TABLE t1 (
>>>   `f1` STRING,
>>>   `f2` STRING
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'topic',
>>>   'properties.group.id' = 'test1',
>>>   'properties.max.partition.fetch.bytes' = '16384',
>>>   'properties.enable.auto.commit' = 'false',
>>>   'properties.bootstrap.servers' = 'kafka:9092',
>>>   'scan.startup.mode' = 'earliest-offset',
>>>   'format' = 'json'
>>> );
>>>
>>> SELECT * FROM t1
>>>
>>>
>>> Below is the result I got from IDE
>>> | +I | b8f5 |   abcd |
>>> | +I | b8f5 |   abcd |
>>>
>>> And this is the result from CLI
>>> bin/flink run  -m localhost:8081 -c kafka.sample.flink.SQLSample
>>> ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar
>>> /sample.sql
>>> ++
>>> | result |
>>> ++
>>> | OK |
>>> ++
>>> 1 row in set
>>> Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
>>> ++++
>>> | op |   uuid |ots |
>>> ++++
>>>
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Regards,
Tao


Re: No result shown when submitting the SQL in cli

2021-05-11 Thread tao xiao
Can anyone help with this question?

On Thu, May 6, 2021 at 3:26 PM tao xiao  wrote:

> Hi team,
>
> I wrote a simple SQL job to select data from Kafka. I can see results
> printing out in IDE but when I submit the job to a standalone cluster in
> CLI there is no result shown. I am sure the job is running well in the
> cluster with debug log suggesting that the kafka consumer is fetching data
> from Kafka. I enabled debug log in CLI and I don't see any obvious log.
> Here is the job code snippet
>
> public static void main(String[] args) throws Exception {
>   StreamTableEnvironment tableEnv = StreamTableEnvironment
>   
> .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));
>
>   String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
>   splitIgnoreQuota(sqls, ';').forEach(sql -> {
> TableResult tableResult = tableEnv.executeSql(sql);
> tableResult.print();
>   });
> }
>
> It simply parses a sql file and execute the statements
>
> Here is the SQL statements
>
> CREATE TABLE t1 (
>   `f1` STRING,
>   `f2` STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic',
>   'properties.group.id' = 'test1',
>   'properties.max.partition.fetch.bytes' = '16384',
>   'properties.enable.auto.commit' = 'false',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
>
> SELECT * FROM t1
>
>
> Below is the result I got from IDE
> | +I | b8f5 |   abcd |
> | +I | b8f5 |   abcd |
>
> And this is the result from CLI
> bin/flink run  -m localhost:8081 -c kafka.sample.flink.SQLSample
> ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar
> /sample.sql
> ++
> | result |
> ++
> | OK |
> ++
> 1 row in set
> Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
> ++++
> | op |   uuid |ots |
> ++++
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao


No result shown when submitting the SQL in cli

2021-05-06 Thread tao xiao
Hi team,

I wrote a simple SQL job to select data from Kafka. I can see results
printing out in the IDE, but when I submit the job to a standalone cluster via
the CLI there is no result shown. I am sure the job is running well in the
cluster, with debug logs suggesting that the Kafka consumer is fetching data
from Kafka. I enabled debug logging in the CLI and don't see anything obvious.
Here is the job code snippet

public static void main(String[] args) throws Exception {
  StreamTableEnvironment tableEnv = StreamTableEnvironment
  
.create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));

  String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
  splitIgnoreQuota(sqls, ';').forEach(sql -> {
TableResult tableResult = tableEnv.executeSql(sql);
tableResult.print();
  });
}

It simply parses a SQL file and executes the statements

Here is the SQL statements

CREATE TABLE t1 (
  `f1` STRING,
  `f2` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.group.id' = 'test1',
  'properties.max.partition.fetch.bytes' = '16384',
  'properties.enable.auto.commit' = 'false',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

SELECT * FROM t1


Below is the result I got from IDE
| +I | b8f5 |   abcd |
| +I | b8f5 |   abcd |

And this is the result from CLI
bin/flink run  -m localhost:8081 -c kafka.sample.flink.SQLSample
~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar
/sample.sql
++
| result |
++
| OK |
++
1 row in set
Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
++++
| op |   uuid |ots |
++++


-- 
Regards,
Tao


Re: Rich Function Thread Safety

2020-05-07 Thread tao xiao
As the Javadoc suggests, it seems the operator methods and the checkpoint
snapshot are invoked by two different threads:

https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java#L39-L62
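
A minimal sketch of the pattern that Javadoc prescribes: the source thread holds the checkpoint lock around emission and state updates so that snapshotState, driven by a different thread under the same lock, always sees a consistent view. isRunning and counter are placeholder fields:

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
    final Object lock = ctx.getCheckpointLock();
    while (isRunning) {
        synchronized (lock) {
            // Emission and state mutation happen atomically with respect to
            // snapshotState, which the runtime invokes under the same lock.
            ctx.collect(counter++);
        }
    }
}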

On Thu, May 7, 2020 at 1:22 AM Joey Echeverria 
wrote:

> I’ve seen a few mailing list posts (including this one) that say Flink
> guarantees there is no concurrent access to operator methods (e.g. flatMap,
> snapshotState, etc.) and thus synchronization isn’t needed when writing
> operators that support checkpointing. I was trying to find a place in the
> official docs where this was called out, but was coming up empty.
>
> Is there a section of the docs that covers this topic?
>
> Thanks!
>
> -Joey
>
> On Dec 18, 2019, at 9:38 PM, Zhu Zhu  wrote:
>
> [--- This email originated from outside of the organization. Do not click
> links or open attachments unless you recognize the sender and know the
> content is safe. ---]
>
> Hi Aaron,
>
> It is thread safe since the state snapshot happens in the same thread with
> the user function.
>
> Thanks,
> Zhu Zhu
>
> Aaron Langford  于2019年12月19日周四 上午11:25写道:
>
>> Hello Flink Community,
>>
>> I'm hoping to verify some understanding:
>>
>> If I have a function with managed state, I'm wondering if a
>> checkpoint will ever be taken while a function is mutating state. I'll try
>> to illustrate the situation I'm hoping to be safe from:
>>
>> Happy Path:
>> t0 -> processFunction invoked with el1
>> t1 -> set A to 5
>> t2 -> set B to 10
>> t3 -> function returns
>>
>> Unhappy path:
>> t0 -> processFunction invoked with el1
>> t1 -> set A to 5
>> t2 -> function interrupted, checkpoint taken (A = 5, B = 1)
>> t3 -> set B to 10
>> t4 -> function returns
>> ...
>> tn -> flink application fails, restart from prev checkpoint (A=5, B=1)
>> tn+1 -> recovery begins somewhere, but state is torn anyway, so we're
>> going to have a bad time
>>
>> I don't think this could happen given that checkpoints effectively are
>> messages in the pipeline, and the checkpoint is only taken when an operator
>> sees the checkpoint barrier.
>>
>> Hoping to make sure this is correct!
>>
>> Aaron
>>
>
>

-- 
Regards,
Tao


Re: Unable to serialize org.apache.kafka.common.config.types.Password

2018-12-25 Thread tao xiao
Thanks, it works
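
For the record, a minimal sketch of the String-based approach suggested in the reply quoted below; the login module, credentials, topic, and broker address are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.common.config.SaslConfigs;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "demo-group");
// Pass the JAAS entry as a plain String; the Kafka client parses it into a
// Password internally, so the consumer properties stay serializable.
props.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"test\" password=\"test\";");

FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("topic", new SimpleStringSchema(), props);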

On Wed, 26 Dec 2018 at 10:07 fudian.fd  wrote:

> The exception is very clear that the SourceFunction should be
> serializable. Password is not serializable. You can try to set the kafka
> consumer properties such as this:
>
> props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required
> subject=\"test\" secret=\"test\";");
>
> The String value will be parsed to Password object.(refer to the method
> org.apache.kafka.common.config.ConfigDef.parseType)
>
> Regards,
> Dian
>
>
> 在 2018年12月25日,下午11:04,tao xiao  写道:
>
> Hi team,
>
> I am passing a security enabled kafka consumer properties to
> FlinkKafkaConsumer but keep getting this
> error java.io.NotSerializableException? what is the best way to handle this?
>
> I use Flink 1.7.1 and here is the consumer property that produces the
> exception
>
> props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required
> subject=\"test\" secret=\"test\";"));
>
> stacktrace
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: The implementation of
> the FlinkKafkaConsumerBase is not serializable. The object probably
> contains or references non serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
> at
> org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
> Caused by: java.io.NotSerializableException:
> org.apache.kafka.common.config.types.Password
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.Hashtable.writeObject(Hashtable.java:1157)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 5 more
>
>
>


Unable to serialize org.apache.kafka.common.config.types.Password

2018-12-25 Thread tao xiao
Hi team,

I am passing security-enabled Kafka consumer properties to
FlinkKafkaConsumer but keep getting a java.io.NotSerializableException.
What is the best way to handle this?

I use Flink 1.7.1 and here is the consumer property that produces the
exception

props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required
subject=\"test\" secret=\"test\";"));

stacktrace
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The implementation of
the FlinkKafkaConsumerBase is not serializable. The object probably
contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
at
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
Caused by: java.io.NotSerializableException:
org.apache.kafka.common.config.types.Password
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.Hashtable.writeObject(Hashtable.java:1157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 5 more


Register a user scope metric in window reduce function

2017-01-18 Thread tao xiao
Hi team,

Is there any way I can register a metric in a window reduce function?
As per the Flink docs, getRuntimeContext is only available in a RichFunction,
but the window operator doesn't allow a RichFunction to be chained. Is there
any way to work around this?
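
One possible workaround, sketched against later Flink versions where reduce() accepts a rich ProcessWindowFunction as a second argument; stream, the Event type, the key selector, and the window setup are placeholders:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

stream
    .keyBy(Event::getKey)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .reduce(
        (a, b) -> a,  // placeholder reduce logic
        new ProcessWindowFunction<Event, Event, String, TimeWindow>() {
            private transient Counter firedWindows;

            @Override
            public void open(Configuration parameters) {
                // ProcessWindowFunction is a rich function, so the metric group is reachable here.
                firedWindows = getRuntimeContext().getMetricGroup().counter("firedWindows");
            }

            @Override
            public void process(String key, Context ctx, Iterable<Event> reduced,
                                Collector<Event> out) {
                firedWindows.inc();
                out.collect(reduced.iterator().next());
            }
        });

The reduce function itself still cannot be rich, but the window-level metric lives in the second function, which sees only the pre-aggregated element per window.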


Re: Restart the job from a checkpoint

2017-01-16 Thread tao xiao
Hi Ufuk,

Thank you for the reply. What is the difference between
state.backend.fs.checkpoint.dir and state.checkpoints.dir in this case? Does
state.checkpoints.dir store the metadata that points to the checkpoint data
stored in state.backend.fs.checkpoint.dir?
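
A minimal sketch of the externalized-checkpoint setup referred to in the reply quoted below (Flink 1.2 era API; paths are placeholders). The checkpoint data files live under the state backend's directory, while the retained metadata is written under state.checkpoints.dir and is what gets passed back on restore:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint data goes to the state backend's directory (state.backend.fs.checkpoint.dir).
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60_000);

// Keep the checkpoint and write its metadata under state.checkpoints.dir,
// so it survives job cancellation and can be used like a savepoint.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Restore later by pointing the run command at the retained metadata, e.g.:
//   bin/flink run -s hdfs:///flink/ext-checkpoints/<checkpoint-metadata-path> ...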

On Mon, 16 Jan 2017 at 19:24 Ufuk Celebi <u...@apache.org> wrote:

> Hey!
>
> This is possible with the upcoming 1.2 version of Flink (also in the
> current snapshot version):
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html#externalized-checkpoints
>
> You have to manually activate it via the checkpoint config (see docs).
>
> Ping me if you have any questions.
>
> – Ufuk
>
>
> On Mon, Jan 16, 2017 at 5:51 AM, tao xiao <xiaotao...@gmail.com> wrote:
> > Hi team,
> >
> > Can we restart a flink job from previous successful checkpoint? I know we
> > can start a flink from a savepoint but I wonder if I can do it similar by
> > passing the checkpoint path to the flink run command to restore the job
> from
> > checkpoint.
>


Restart the job from a checkpoint

2017-01-15 Thread tao xiao
Hi team,

Can we restart a Flink job from a previous successful checkpoint? I know we
can start a Flink job from a savepoint, but I wonder if I can do something
similar by passing the checkpoint path to the flink run command to restore
the job from a checkpoint.


Re: Kafka topic partition skewness causes watermark not being emitted

2017-01-14 Thread tao xiao
The case I described was for experiment only, but data skewness does happen
in production. The current implementation blocks watermark emission
downstream until all partitions move forward, which has a big impact on
latency. It may be a good idea to expose an API that lets users decide how
best to control watermark emission.
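
For readers on much newer Flink releases: the idle-partition case raised here can be handled with WatermarkStrategy#withIdleness. A minimal sketch reusing SessionEvent and MyOwnSchema from the original post; props and the durations are placeholders:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

FlinkKafkaConsumer<SessionEvent> consumer =
        new FlinkKafkaConsumer<>("topic4", new MyOwnSchema(), props);

consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<SessionEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(
                        (event, ts) -> (long) event.get(SessionEvent.SESSION_START_DT))
                // Partitions that stop receiving data are marked idle after one minute
                // and no longer hold back the overall watermark.
                .withIdleness(Duration.ofMinutes(1)));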

On Fri, 13 Jan 2017 at 21:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> This is expected behaviour due to how the per-partition watermarks are
> designed in the Kafka consumer, but I think it’s probably a good idea to
> handle idle partitions also when the Kafka consumer itself emits
> watermarks. I’ve filed a JIRA issue for this:
> https://issues.apache.org/jira/browse/FLINK-5479.
>
> For the time being, I don’t think there will be an easy way to avoid this
> with the existing APIs, unfortunately. Is the skewed partition data
> intentional, or only for experimental purposes?
>
> Best,
> Gordon
>
> On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:
>
> Hi team,
>
> I have a topic with 2 partitions in Kafka. I produced all data to
> partition 0 and no data to partition 1. I created a Flink job with
> parallelism to 1 that consumes that topic and count the events with session
> event window (5 seconds gap). It turned out that the session event window
> was never closed even I sent a message with 10 minutes gap. After digging
> into the source code, AbstractFetcher[1] that is responsible for sending
> watermark to downstream calculates the min watermark of all partitions. Due
> to the fact that we don't have data in partition 1, the watermark returned
> from partition 1is always Long.MIN_VALUE therefore AbstractFetcher never
> fires the watermark to downstream.
>
> I want to know if this is expected behavior or a bug. If this is expected
> behavior how do I avoid the delay of watermark firing when data is not
> evenly distributed to all partitions?
>
> This is the timestamp extractor I used
>
> public class ExactTimestampExtractor implements
> AssignerWithPeriodicWatermarks {
>
> private long currentMaxTimestamp = Long.MIN_VALUE;
>
> @Nullable
> @Override
> public Watermark getCurrentWatermark() {
> return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ?
> Long.MIN_VALUE : currentMaxTimestamp - 1);
> }
>
> @Override
> public long extractTimestamp(SessionEvent element, long
> previousElementTimestamp) {
> long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
> if (eventStartTime > currentMaxTimestamp) {
> currentMaxTimestamp = eventStartTime;
> }
>
> return eventStartTime;
> }
> }
>
> and this is the Flink topo
>
> // get input data
> FlinkKafkaConsumer010 consumer = new
> FlinkKafkaConsumer010<>("topic4",
> new MyOwnSchema()
> consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
> DataStream input = env.addSource(consumer);
>
> input.
> keyBy("id").
> window(EventTimeSessionWindows.withGap(Time.seconds(5))).
> reduce(new Reducer(), new WindowFunction()).
> print();
>
> //// execute program
> env.execute("a job");
>
> I used the latest code in github
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539
>
>


Kafka topic partition skewness causes watermark not being emitted

2017-01-12 Thread tao xiao
Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition
0 and no data to partition 1. I created a Flink job with parallelism 1 that
consumes that topic and counts the events with a session event window (5
second gap). It turned out that the session event window was never closed,
even when I sent a message with a 10 minute gap. After digging into the
source code, AbstractFetcher[1], which is responsible for sending watermarks
downstream, calculates the minimum watermark across all partitions. Because
there is no data in partition 1, the watermark returned from partition 1 is
always Long.MIN_VALUE, and therefore AbstractFetcher never emits a watermark
downstream.

I want to know whether this is expected behavior or a bug. If it is expected
behavior, how do I avoid the delay in watermark emission when data is not
evenly distributed across all partitions?

This is the timestamp extractor I used

public class ExactTimestampExtractor implements AssignerWithPeriodicWatermarks<SessionEvent> {

private long currentMaxTimestamp = Long.MIN_VALUE;

@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE
: currentMaxTimestamp - 1);
}

@Override
public long extractTimestamp(SessionEvent element, long
previousElementTimestamp) {
long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
if (eventStartTime > currentMaxTimestamp) {
currentMaxTimestamp = eventStartTime;
}

return eventStartTime;
}
}

and this is the Flink topo

// get input data
FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>("topic4",
new MyOwnSchema()
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

input.
keyBy("id").
window(EventTimeSessionWindows.withGap(Time.seconds(5))).
reduce(new Reducer(), new WindowFunction()).
print();

//// execute program
env.execute("a job");

I used the latest code in github

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539


Re: window function outputs two different values

2017-01-09 Thread tao xiao
Hi team,

Any suggestions on the topic below?

I have a requirement to output two different values from a time
window reduce function. Here is the basic workflow:

1. fetch data from Kafka
2. flow the data into an event session window: kafka source -> keyBy ->
session window -> reduce
3. inside the reduce function, count the number of elements and also emit the
data itself to another operator for further processing

Since the reduce function can only emit the count, I want to know how to also
emit the data itself.



On Sat, 7 Jan 2017 at 20:30 tao xiao <xiaotao...@gmail.com> wrote:

> Hi team,
>
> I have a requirement that wants to output two different values from a time
> window reduce function. Here is basic workflow
>
> 1. fetch data from Kafka
> 2. flow the data to a event session window. kafka source -> keyBy ->
> session window -> reduce
> 3. inside the reduce function, count the number of data and also emit the
> data itself to another operator for further processing
>
> As the reduce function can only emit the count, I want to know how to also
> emit the data as well?
>
>


window function outputs two different values

2017-01-07 Thread tao xiao
Hi team,

I have a requirement to output two different values from a time
window reduce function. Here is the basic workflow:

1. fetch data from Kafka
2. flow the data into an event session window: kafka source -> keyBy ->
session window -> reduce
3. inside the reduce function, count the number of elements and also emit the
data itself to another operator for further processing

Since the reduce function can only emit the count, I want to know how to also
emit the data itself.
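
One possible approach, sketched against newer Flink APIs, is a ProcessWindowFunction that sends the count to the main output and the raw data to a side output; input, the SessionEvent type, the key field, and the tag name are assumptions carried over from the other threads in this archive. The trade-off versus a plain reduce is that the window buffers all of its elements until it fires:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<SessionEvent> dataTag = new OutputTag<SessionEvent>("window-data") {};

SingleOutputStreamOperator<Long> counts = input
        .keyBy("id")
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .process(new ProcessWindowFunction<SessionEvent, Long, Tuple, TimeWindow>() {
            @Override
            public void process(Tuple key, Context ctx, Iterable<SessionEvent> elements,
                                Collector<Long> out) {
                long count = 0;
                for (SessionEvent e : elements) {
                    ctx.output(dataTag, e);  // the data itself, for further processing
                    count++;
                }
                out.collect(count);          // the count, on the main output
            }
        });

DataStream<SessionEvent> windowData = counts.getSideOutput(dataTag);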