Re: What does "Continuous incremental cleanup" mean in Flink 1.8 release notes

2019-03-10 Thread Tony Wei
Hi Konstantin,

That is really helpful. Thanks.

Another follow-up question: the documentation says "Cleanup in full snapshot"
is not applicable to incremental checkpointing in the RocksDB state backend.
However, when a user manually triggers a savepoint and restarts the job from
it, the expired state should be cleaned up as well, based on Flink 1.6's
implementation. Am I right?

Best,
Tony Wei

Konstantin Knauf  wrote on Sat, Mar 9, 2019 at 7:00 AM:

> Hi Tony,
>
> before Flink 1.8, expired state is only cleaned up when you try to access
> it after expiration, i.e. when user code tries to access the expired state,
> the state value is cleared and "null" is returned. There was also already
> the option to clean up expired state during full snapshots (
> https://github.com/apache/flink/pull/6460). With Flink 1.8, expired state
> is cleaned up continuously in the background, regardless of checkpointing or
> any attempt to access it after expiration.
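>
> For illustration, a minimal sketch of how these cleanup strategies are
> configured via StateTtlConfig (method names as I remember the 1.8-era API;
> the descriptor name and TTL duration are just placeholders):
>
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
>
> StateTtlConfig ttlConfig = StateTtlConfig
>     .newBuilder(Time.days(7))
>     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>     // since 1.6: drop expired entries while taking a full snapshot/savepoint
>     .cleanupFullSnapshot()
>     // since 1.8, heap backend: clean up a few entries incrementally on state access
>     .cleanupIncrementally(10, false)
>     // since 1.8, RocksDB backend: filter out expired entries during compaction
>     // (in 1.8 this additionally needs state.backend.rocksdb.ttl.compaction.filter.enabled
>     // set to true in flink-conf.yaml, if I remember correctly)
>     .cleanupInRocksdbCompactFilter()
>     .build();
>
> ValueStateDescriptor<String> descriptor =
>     new ValueStateDescriptor<>("my-state", String.class);
> descriptor.enableTimeToLive(ttlConfig);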
>
> As a reference, the linked JIRA tickets should be a good starting point.
>
> Hope this helps.
>
> Konstantin
>
>
>
>
> On Fri, Mar 8, 2019 at 10:45 AM Tony Wei  wrote:
>
>> Hi everyone,
>>
>> I read the Flink 1.8 release notes about state [1], and it said
>>
>> *Continuous incremental cleanup of old Keyed State with TTL*
>>> We introduced TTL (time-to-live) for keyed state in Flink 1.6 (FLINK-9510).
>>> This feature allowed expired keyed state entries to be cleaned up and made
>>> inaccessible when they were accessed. In addition, state would also be
>>> cleaned up when writing a savepoint/checkpoint.
>>> Flink 1.8 introduces continuous cleanup of old entries for both the
>>> RocksDB state backend (FLINK-10471) and the heap state backend
>>> (FLINK-10473). This means that old entries (according to the TTL setting)
>>> are continuously being cleaned up.
>>
>>
>> I'm not familiar with the TTL implementation in Flink 1.6 or with the new
>> features introduced in Flink 1.8, and after reading the release notes I
>> don't understand the difference between these two releases. Did they change
>> the outcome of the TTL feature, provide new TTL features, or just change
>> how the TTL mechanism is executed?
>>
>> Could you give me more references to learn about it? A simple example
>> to illustrate it would be much appreciated. Thank you.
>>
>> Best,
>> Tony Wei
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.8.html#state
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


Re: estimate number of keys on rocks db

2019-03-10 Thread Avi Levi
Thanks Yun,
Attached. Please let me know if it is ok.
I made several attempts, including aggregation functions, but couldn't figure
out why the line doesn't just keep going up and why it has those peaks.



On Sun, Mar 10, 2019 at 4:49 PM Yun Tang  wrote:

> Hi Avi
>
> Unfortunately, we cannot see the attached images. By the way, did you ever
> use windows in this job?
>
> Best
> Yun Tang
> --
> *From:* Avi Levi 
> *Sent:* Sunday, March 10, 2019 19:41
> *To:* user
> *Subject:* estimate number of keys on rocks db
>
> Hi,
> I am trying to estimate the number of keys at a given minute.
> I created a graph based on avg_over_time with 1hr and 5m intervals.
> Looking at the graph, you can see that it has high spikes, which doesn't
> make sense (IMO). How can the average have those spikes? After all, since
> I do not delete keys, I would expect it to go up or remain the same.
> Any ideas what could explain such behaviour?
> Attached are the graphs with 5m and 1h intervals.
> [image: Screen Shot 2019-03-10 at 13.37.44.png]
> [image: Screen Shot 2019-03-10 at 13.33.40.png]
>
>
>
>


Re: estimate number of keys on rocks db

2019-03-10 Thread Yun Tang
Hi Avi

Unfortunately, we cannot see the attached images. By the way, did you ever use 
windows in this job?

Best
Yun Tang

From: Avi Levi 
Sent: Sunday, March 10, 2019 19:41
To: user
Subject: estimate number of keys on rocks db

Hi,
I am trying to estimate the number of keys at a given minute.
I created a graph based on avg_over_time with 1hr and 5m intervals.
Looking at the graph, you can see that it has high spikes, which doesn't make
sense (IMO). How can the average have those spikes? After all, since I do not
delete keys, I would expect it to go up or remain the same.
Any ideas what could explain such behaviour?
Attached are the graphs with 5m and 1h intervals.
[Screen Shot 2019-03-10 at 13.37.44.png]
[Screen Shot 2019-03-10 at 13.33.40.png]





estimate number of keys on rocks db

2019-03-10 Thread Avi Levi
Hi,
I am trying to estimate the number of keys at a given minute.
I created a graph based on avg_over_time with 1hr and 5m intervals.
Looking at the graph, you can see that it has high spikes, which doesn't make
sense (IMO). How can the average have those spikes? After all, since I do not
delete keys, I would expect it to go up or remain the same.
Any ideas what could explain such behaviour?
Attached are the graphs with 5m and 1h intervals.
[image: Screen Shot 2019-03-10 at 13.37.44.png]
[image: Screen Shot 2019-03-10 at 13.33.40.png]


Re: S3 parquet sink - failed with S3 connection exception

2019-03-10 Thread Averell
Hi Kostas, and everyone,

Just an update on my issue. I have tried the following:
 * changed the S3-related configuration in Hadoop as suggested by the Hadoop
documentation [1]: increased /fs.s3a.threads.max/ from 10 to 100, and
/fs.s3a.connection.maximum/ from 15 to 120 (see the core-site.xml sketch
below). For reference, I have only 3 S3 sinks, with parallelisms of 4, 4,
and 1.
 * followed AWS's document [2] to increase the EMRFS maxConnections to 200.
However, I doubt that this makes any difference, since I had to use an
"s3a://..." path when creating the S3 parquet bucket sink; "s3://..." does
not seem to be supported by Flink yet.
 * reduced the parallelism of my S3 continuous file reader.
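In case the exact form is useful, this is roughly how those two s3a settings
look in the Hadoop configuration (a core-site.xml sketch; whether Flink's S3
filesystem actually picks them up from there depends on how the Hadoop config
is wired into your EMR/Flink setup):

<configuration>
  <!-- maximum number of threads the S3A filesystem uses for uploads and
       other queued operations -->
  <property>
    <name>fs.s3a.threads.max</name>
    <value>100</value>
  </property>
  <!-- maximum number of simultaneous connections to S3 -->
  <property>
    <name>fs.s3a.connection.maximum</name>
    <value>120</value>
  </property>
</configuration>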

However, the problem still occurs randomly (it varies from one job execution
to another; when it happens, the only solution is to cancel the job and
restart it from the last successful checkpoint).

Thanks and regards,
Averell

[1]  Hadoop-AWS module: Integration with Amazon Web Services
[2]  emr-timeout-connection-wait



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Backoff strategies for async IO functions?

2019-03-10 Thread Shuyi Chen
Hi Konstantin,

(cc Till since he owns the code)

For async I/O, failure and retry is a common and expected pattern: in most
use cases, users will need to deal with IO failures and retries. Therefore, I
think it's better to address the problem in Flink rather than have every user
implement custom retry logic in user code, for a better dev experience. We
have a similar problem in many of our use cases. To enable backoff and retry,
we need to put the failed message into a DLQ (another Kafka topic) and
re-ingest the message from the DLQ topic to retry, which is manual/cumbersome
and requires setting up an extra Kafka topic.

Can we add multiple strategies to handle async IO failure in the
AsyncWaitOperator? I propose the following strategies:


   - FAIL_OPERATOR (default & current behavior)
   - FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N
   times)
   - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)

What do you guys think? Thanks a lot.

Shuyi

On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf 
wrote:

> Hi William,
>
> the AsyncOperator does not have such a setting. It is "merely" a wrapper
> around an asynchronous call, which provides integration with Flink's state
> & time management.
>
> I think the way to go would be to do the exponential back-off in the user
> code and set the timeout of the AsyncOperator to the sum of the timeouts of
> the individual attempts (e.g. 2s + 4s + 8s + 16s), along the lines of the
> sketch below.
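>
> A rough, untested sketch of what that could look like (the AsyncClient
> interface and its query() method are placeholders for whatever asynchronous,
> rate-limited request you are actually making):
>
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
>
> public class RetryingAsyncFunction extends RichAsyncFunction<String, String> {
>
>     /** Placeholder for your rate-limited API client. */
>     public interface AsyncClient {
>         CompletableFuture<String> query(String input);
>     }
>
>     private static final int MAX_RETRIES = 4;
>     private static final long INITIAL_BACKOFF_MILLIS = 2_000L;
>
>     private transient ScheduledExecutorService scheduler;
>     private transient AsyncClient client;
>
>     @Override
>     public void open(Configuration parameters) {
>         scheduler = Executors.newSingleThreadScheduledExecutor();
>         // client = ...;  construct/obtain your API client here
>     }
>
>     @Override
>     public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
>         attempt(input, resultFuture, 0, INITIAL_BACKOFF_MILLIS);
>     }
>
>     private void attempt(String input, ResultFuture<String> resultFuture,
>                          int retriesSoFar, long backoffMillis) {
>         client.query(input).whenComplete((result, error) -> {
>             if (error == null) {
>                 resultFuture.complete(Collections.singleton(result));
>             } else if (retriesSoFar < MAX_RETRIES) {
>                 // wait for the current backoff, then retry with twice the delay
>                 scheduler.schedule(
>                     () -> attempt(input, resultFuture, retriesSoFar + 1, backoffMillis * 2),
>                     backoffMillis, TimeUnit.MILLISECONDS);
>             } else {
>                 resultFuture.completeExceptionally(error);
>             }
>         });
>     }
>
>     @Override
>     public void close() {
>         scheduler.shutdown();
>     }
> }
>
> The timeout you pass to AsyncDataStream.orderedWait/unorderedWait then has to
> cover the whole retry chain (roughly 2s + 4s + 8s + 16s plus the individual
> request timeouts), otherwise the operator will time the element out before
> the last retry completes.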
>
> Cheers,
>
> Konstantin
>
>
> On Thu, Mar 7, 2019 at 5:20 PM William Saar  wrote:
>
>> Hi,
>> Is there a way to specify an exponential backoff strategy for when async
>> function calls fail?
>>
>> I have an async function that does web requests to a rate-limited API.
>> Can you handle that with settings on the async function call?
>>
>> Thanks,
>> William
>>
>>
>>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>