Re: ARM support

2021-04-04 Thread Rex Fenley
Thanks for all of this info. Highly appreciated!

On Thu, Apr 1, 2021 at 1:17 AM Guowei Ma  wrote:

> Hi, Rex
>
> I think that Flink does not have an official release that supports the ARM
> architecture. There are some efforts and discussions [1][2][3] about
> supporting the architecture. I think you could find some builds at
> openlabtesting [4].
> But AFAIK there is no clear timeline for that (correct me if I missed
> something). There is a discussion [5] where you might find some more
> insight.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13448
> [2]
> https://lists.apache.org/thread.html/a564836a3c7cc5300bec7729c2af1ad9d611d526bb59dd6cca72cc7b%40%3Cdev.flink.apache.org%3E
> [3]
> https://lists.apache.org/thread.html/2399c8a701bced2266f9658719807b98a2e593a99b949f50e9a1ab1a%40%3Cdev.flink.apache.org%3E
> [4] http://status.openlabtesting.org/builds?project=apache%2Fflink
> [5]
> https://lists.apache.org/thread.html/5c4c75a2de979ed7ef1c661c15dd252569e598a374c27042b38d078b%40%3Cdev.flink.apache.org%3E
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 3:55 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> We would like to run Flink on ARM but haven't found any resources
>> indicating that this is possible yet. We are wondering what the timeline is
>> for Flink to support ARM. Given that all MacBooks are moving to ARM and
>> that AWS is enthusiastically supporting ARM, it seems important that Flink
>> also supports running on ARM.
>>
>> Thank you
>>
>>
>



ARM support

2021-03-31 Thread Rex Fenley
Hello,

We would like to run Flink on ARM but haven't found any resources
indicating that this is possible yet. We are wondering what the timeline is
for Flink to support ARM. Given that all MacBooks are moving to ARM and
that AWS is enthusiastically supporting ARM, it seems important that Flink
also supports running on ARM.

Thank you



Re: How to advance Kafka offsets manually with checkpointing enabled on Flink (TableAPI)

2021-03-18 Thread Rex Fenley
Thanks for the info!

On Thu, Mar 18, 2021 at 7:46 AM Dawid Wysakowicz 
wrote:

> Hi Rex,
>
> The approach you described is definitely possible in the DataStream API.
> You could replace the uid of your Kafka source and start your job with your
> checkpoint with the allowNonRestoredState option enabled [1]. I am afraid,
> though, that it is not possible to change the uid in Table API/SQL.
>
> Another approach that you could try is to edit the checkpoint via the
> State Processor API[2] and increase the checkpointed offsets.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#starting-a-job-from-a-savepoint
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
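A rough sketch of the first (uid-swap) approach in the DataStream API, for
anyone who lands here later (the topic, group id, and uid are made-up values,
it assumes Flink 1.12's FlinkKafkaConsumer, and the job would be resumed from
the checkpoint with the -n / --allowNonRestoredState flag):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "my-flink-job")

val consumer = new FlinkKafkaConsumer[String]("events", new SimpleStringSchema(), props)
// With no restored state for this operator, fall back to the offsets
// committed to Kafka for the consumer group.
consumer.setStartFromGroupOffsets()

env
  .addSource(consumer)
  // New uid: restoring with --allowNonRestoredState discards the old
  // source state (the checkpointed offsets) instead of failing.
  .uid("kafka-source-v2")
  .print()

env.execute("skip-poison-pill")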
> On 16/03/2021 20:03, Rex Fenley wrote:
>
> Hello,
>
> I'm wondering how, in the event of a poison pill record on Kafka, to
> advance a partition's checkpointed offsets by 1 when using the TableAPI/SQL.
>
> It is my understanding that when checkpointing is enabled Flink uses its
> own checkpoint committed offsets and not the offsets committed to Kafka
> when starting a job from a checkpoint.
>
> In the event that there is a poison pill record in Kafka that is crashing
> the Flink job, we may want to simply advance our checkpointed offsets by 1
> for the partition, past the poison record, and then continue operation as
> normal. We do not want to lose any other state in Flink however.
>
> I'm wondering how to go about this then. It's easy enough to have Kafka
> advance its committed offsets. Is there a way to tell Flink to ignore
> checkpointed offsets and instead respect the offsets committed to Kafka for
> a consumer group when restoring from a checkpoint?
> If so we could:
> 1. Advance Kafka's offsets.
> 2. Run our job from the checkpoint and have it use Kafka's offsets and
> then checkpoint with new Kafka offsets.
> 3. Stop the job, and rerun it using Flink's committed, now advanced,
> offsets.
>
> Is this possible? Are there any better strategies?
>
> Thanks!
>
>
>



How to advance Kafka offsets manually with checkpointing enabled on Flink (TableAPI)

2021-03-16 Thread Rex Fenley
Hello,

I'm wondering how, in the event of a poison pill record on Kafka, to
advance a partition's checkpointed offsets by 1 when using the TableAPI/SQL.

It is my understanding that when checkpointing is enabled Flink uses its
own checkpoint committed offsets and not the offsets committed to Kafka
when starting a job from a checkpoint.

In the event that there is a poison pill record in Kafka that is crashing
the Flink job, we may want to simply advance our checkpointed offsets by 1
for the partition, past the poison record, and then continue operation as
normal. We do not want to lose any other state in Flink however.

I'm wondering how to go about this then. It's easy enough to have Kafka
advance its committed offsets. Is there a way to tell Flink to ignore
checkpointed offsets and instead respect the offsets committed to Kafka for
a consumer group when restoring from a checkpoint?
If so we could:
1. Advance Kafka's offsets.
2. Run our job from the checkpoint and have it use Kafka's offsets and then
checkpoint with new Kafka offsets.
3. Stop the job, and rerun it using Flink's committed, now advanced,
offsets.

Is this possible? Are there any better strategies?

Thanks!
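For step 1, a hypothetical sketch of nudging the group's committed offset past
the poison record with Kafka's AdminClient (topic, partition, offset, and group
id are made-up values; it assumes a Kafka client >= 2.5 and that the group is
inactive, i.e. the job is stopped, when the call is made):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
val admin = AdminClient.create(props)

val poisonPartition = new TopicPartition("events", 3) // partition holding the poison record
val skipTo = new OffsetAndMetadata(42L + 1)           // one past the poison record's offset
admin
  .alterConsumerGroupOffsets("my-flink-consumer-group",
    Collections.singletonMap(poisonPartition, skipTo))
  .all()
  .get()
admin.close()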



Re: Prefix Seek RocksDB

2021-03-16 Thread Rex Fenley
Thanks for the input, I'll look more into that.

Does your answer then imply that Joins and Aggs do not inherently always
use prefix seeks? I'd imagine that the join key on join and groupby key on
aggs would always be used as prefix keys. Is this not the case?

Also, is there good information on what the correct prefix extractor is for
Flink? This feature is something I only just discovered so I was hoping to
gain clarity.

Thanks

On Mon, Mar 15, 2021 at 8:33 PM Yun Tang  wrote:

> Hi Rex,
>
> You could configure prefix seek via RocksDB's column family options [1].
> Be careful to use the correct prefix extractor.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#passing-options-factory-to-rocksdb
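To make [1] a bit more concrete, an options factory might look roughly like the
following (sketch only, assuming Flink 1.12's RocksDBOptionsFactory and
RocksJava's fixed-length extractor; the 8-byte prefix length and the checkpoint
path are made-up values, and the right extractor depends on how your state keys
are actually serialized):

import java.util
import org.apache.flink.contrib.streaming.state.{RocksDBOptionsFactory, RocksDBStateBackend}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

class PrefixSeekOptionsFactory extends RocksDBOptionsFactory {
  override def createDBOptions(current: DBOptions,
                               handlesToClose: util.Collection[AutoCloseable]): DBOptions =
    current // leave DB-level options untouched

  override def createColumnOptions(current: ColumnFamilyOptions,
                                   handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions =
    current.useFixedLengthPrefixExtractor(8) // hypothetical prefix length
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new RocksDBStateBackend("s3://my-bucket/checkpoints") // hypothetical URI
backend.setRocksDBOptions(new PrefixSeekOptionsFactory)
env.setStateBackend(backend)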
>
>
> Best
>
>
> --
> *From:* Rex Fenley 
> *Sent:* Tuesday, March 16, 2021 8:29
> *To:* user 
> *Cc:* Brad Davis 
> *Subject:* Prefix Seek RocksDB
>
> Hello!
>
> I'm wondering if Flink RocksDB state backend is pre-configured to have
> Prefix Seeks enabled, such as for Joins and Aggs on the TableAPI [1]? If
> not, what's the easiest way to configure this? I'd imagine this would be
> beneficial.
>
> Thanks!
>
> [1] https://github.com/facebook/rocksdb/wiki/Prefix-Seek
>
>




Prefix Seek RocksDB

2021-03-15 Thread Rex Fenley
Hello!

I'm wondering if Flink RocksDB state backend is pre-configured to have
Prefix Seeks enabled, such as for Joins and Aggs on the TableAPI [1]? If
not, what's the easiest way to configure this? I'd imagine this would be
beneficial.

Thanks!

[1] https://github.com/facebook/rocksdb/wiki/Prefix-Seek



Re: Does the Kafka source perform retractions on Key?

2021-03-01 Thread Rex Fenley
Thanks Arvid,

I think my confusion lies in misinterpreting the meaning of CDC. We
basically don't want CDC; we just use it to get data into a compacted Kafka
topic that holds the current state of the world for multiple consumers to
read. You have described pretty thoroughly where we want to go.

One interesting part of your architecture is this "Debezium -> State
collecting Flink job". Is there a way for Debezium to write to Flink? I
thought it required Kafka Connect.

Appreciate your feedback

On Mon, Mar 1, 2021 at 12:43 AM Arvid Heise  wrote:

> > We are rereading the topics, at any time we might want a completely
> different materialized view for a different web service for some new
> application feature. Other jobs / new jobs need to read all the up-to-date
> rows from the databases.
> > I still don't see how this is the case if everything just needs to be
> overwritten by primary key. To re-emphasize, we do not care about
> historical data.
> Why are you reading from a CDC topic and not a log-compacted topic that
> reflects the state then? CDC is all about history and changes.
>
> What i'd imagine an architecture that would work better for you:
>
> For each SQL table (ingress layer):
> SQL Table -> Debezium -> State collecting Flink job -> Kafka state topic
> (compacted)
>
> Analytics (processing layer):
> Kafka state topics (compacted) -> Analytical Flink job -> Kafka state
> topic (compacted)
>
> For each view (egress layer):
> Kafka state topics (compacted) -> Aggregating Flink job -> K/V store(s) ->
> Web application
>
> The ingress layer is only there to provide you log-compacted Kafka topics.
> Then you can do a bunch of analytical queries from Kafka to Kafka. Finally,
> you output your views to K/V stores for high-avail web applications
> (=decoupled from processing layer).
>
> If that's what you already have, then my apology for not picking that up.
> It's really important to stress that no Kafka topics ever contain CDC data
> in this instance since you are not interested in historic data. The only
> CDC exchange is by using the debezium connector of Flink. At this point,
> all discussions of this thread are resolved.
>
>
>
> On Sat, Feb 27, 2021 at 9:06 PM Rex Fenley  wrote:
>
>> Hi Arvid,
>>
>> >If you are not rereading the topics, why do you compact them?
>> We are rereading the topics, at any time we might want a completely
>> different materialized view for a different web service for some new
>> application feature. Other jobs / new jobs need to read all the up-to-date
>> rows from the databases.
>>
>> >correctness depends on compaction < downtime
>> I still don't see how this is the case if everything just needs to be
>> overwritten by primary key. To re-emphasize, we do not care about
>> historical data.
>>
>> >Again, a cloud-native key/value store would perform much better and be
>> much cheaper with better SLAs
>> Is there a cloud-native key/value store which can read from a Postgres
>> WAL or MySQL binlog and then keep an up-to-date read marker for any
>> materialization consumers downstream *besides* Kafka + Debezium?
>>
>> Appreciate all the feedback, though hopefully we can get closer to the
>> same mental model. If there's really a better alternative here I'm all for
>> it!
>>
>>
>> On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> Your initial question was about the impact of compaction on your CDC
>>> application logic. I have been (unsuccessfully) trying to tell you that you
>>> do not need compaction and it's counterproductive.
>>>
>>> If you are not rereading the topics, why do you compact them? It's lost
>>> compute time and I/O on the Kafka brokers (which are both very valuable)
>>> and does not give you anything that an appropriate retention time wouldn't
>>> give you (=lower SSD usage). It makes the mental model more complicated. An
>>> aggressive compaction and a larger backlog (compaction time < application
>>> failure/restart/upgrade time) would lead to incorrect results (in the same
>>> way an inappropriate retention period may cause data loss for the same
>>> reason).
>>>
>>> The only use case for log compaction is if you're using a Kafka topic
>>> for a key/value store to serve a web application (in which case, it's
>>> usually better to take a real key/value store) but then you don't need
>>> retractions anymore but you'd simply overwrite the actual value

Re: Does the Kafka source perform retractions on Key?

2021-02-27 Thread Rex Fenley
Hi Arvid,

>If you are not rereading the topics, why do you compact them?
We are rereading the topics, at any time we might want a completely
different materialized view for a different web service for some new
application feature. Other jobs / new jobs need to read all the up-to-date
rows from the databases.

>correctness depends on compaction < downtime
I still don't see how this is the case if everything just needs to be
overwritten by primary key. To re-emphasize, we do not care about
historical data.

>Again, a cloud-native key/value store would perform much better and be
much cheaper with better SLAs
Is there a cloud-native key/value store which can read from a Postgres WAL
or MySQL binlog and then keep an up-to-date read marker for any
materialization consumers downstream *besides* Kafka + Debezium?

Appreciate all the feedback, though hopefully we can get closer to the same
mental model. If there's really a better alternative here I'm all for it!


On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise  wrote:

> Hi Rex,
>
> Your initial question was about the impact of compaction on your CDC
> application logic. I have been (unsuccessfully) trying to tell you that you
> do not need compaction and it's counterproductive.
>
> If you are not rereading the topics, why do you compact them? It's lost
> compute time and I/O on the Kafka brokers (which are both very valuable)
> and does not give you anything that an appropriate retention time wouldn't
> give you (=lower SSD usage). It makes the mental model more complicated. An
> aggressive compaction and a larger backlog (compaction time < application
> failure/restart/upgrade time) would lead to incorrect results (in the same
> way an inappropriate retention period may cause data loss for the same
> reason).
>
> The only use case for log compaction is if you're using a Kafka topic for
> a key/value store to serve a web application (in which case, it's usually
> better to take a real key/value store) but then you don't need retractions
> anymore but you'd simply overwrite the actual values or use tombstone
> records for deletions.
>
> If you consume the same topic both for web applications and Flink and
> don't want to use another technology for key/value store, then log
> compaction of retractions kinda makes sense to kill 2 birds with one stone.
> However, you have to live with the downsides on the Flink side (correctness
> depends on compaction < downtime) and on web application (deal with
> retractions even though they do not make any sense at that level). Again, a
> cloud-native key/value store would perform much better and be much cheaper
> with better SLAs and solve all issues on the Flink side (final note: it's
> independent of the technology, any stream processor will encounter the same
> issue as it's a conceptual mismatch).
>
> On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley  wrote:
>
>> Hi Arvid,
>>
>> I really appreciate the thorough response but I don't think this
>> contradicts our use case. In servicing web applications we're doing nothing
>> more than taking data from giant databases we use, and performing joins and
>> denormalizing aggs strictly for performance reasons (joining across a lot
>> of stuff on query time is slow) and putting specified results into another
>> database connected to the specified web server. Our Flink jobs are purely
>> used for up-to-date materialized views. We don't care about historical
>> analysis, we only care about what the exact current state of the world is.
>>
>> This is why every row has a primary key, from beginning to end of the job
>> (even though Flink's table api can't seem to detect that after a lot of
>> joins in our plan, but it's logically true since then the join key will be
>> pk). This is also why all we need to do is retract the current row from the
>> Kafka source on the existing primary key that's being overwritten, have
>> that retract propagate downstream to throw away any data transformed from
>> that row, and then process the new row. We don't care what other data
>> changes may have happened in between, it's not applicable to our use case.
>>
>> We're using CDC for nothing more than a way to get the latest rows in
>> real time into Kafka so they can be read by various Flink jobs we hope to
>> build (starting with the one we're currently working on that has ~35
>> stateful operators) which then just transform and forward to another
>> database.
>>
>> 
>>
>> Reading the Upsert Kafka docs [1] "In the physical operator, we will use
>> state to know whether the key is the fir

Re: Does the Kafka source perform retractions on Key?

2021-02-27 Thread Rex Fenley
ause of the retention time. In
> general, it's better to choose a technology such as Pulsar with tiered
> storage that gives you exactly what you want with low overhead: you need
> unlimited retention without compaction but without holding much data in
> expensive storage (SSD) by offloading automatically to cold storage.
>
> If this is not working for you, then please share your requirements with
> me why you'd need compaction + a different retention for
> source/intermediate topics.
>
> For the final topic, from my experience, a real key/value store works much
> better than log compacted topics for serving web applications. Confluent's
> marketing is strongly pushing that Kafka can be used as a database and as a
> key/value store while in reality, it's "just" a good distributed log. I
> can provide pointers that discuss the limitations if there is interest.
> Also note that the final topic should not be in CDC format anymore (so no
> retractions). It should just contain the current state. For both examples
> together it would be
> 1, Gop, 2009
> and no record for person 2.
>
>
> On Sat, Feb 27, 2021 at 3:33 AM Rex Fenley  wrote:
>
>> Digging around, it looks like Upsert Kafka which requires a Primary Key
>> will actually do what I want and uses compaction, but it doesn't look
>> compatible with Debezium format? Is this on the roadmap?
>>
>> In the meantime, we're considering consuming from Debezium Kafka (still
>> compacted) and then writing directly to an Upsert Kafka sink and then
>> reading right back out of a corresponding Upsert Kafka source. Since that
>> little roundabout will key all changes by primary key it should give us a
>> compacted topic to start with initially. Once we get that working we can
>> probably do the same thing with intermediate flink jobs too.
>>
>> Would appreciate any feedback on this approach, thanks!
>>
>> On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley  wrote:
>>
>>> Does this also imply that it's not safe to compact the initial topic
>>> where data is coming from Debezium? I'd think that Flink's Kafka source
>>> would emit retractions on any existing data with a primary key as new data
>>> with the same pk arrived (in our case all data has primary keys). I guess
>>> that goes back to my original question still however, is this not what the
>>> Kafka source does? Is there no way to make that happen?
>>>
>>> We really can't live with the record amplification, it's sometimes
>>> nonlinear and randomly kills RocksDB performance.
>>>
>>> On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise  wrote:
>>>
>>>> Just to clarify, intermediate topics should in most cases not be
>>>> compacted for exactly the reasons if your application depends on all
>>>> intermediate data. For the final topic, it makes sense. If you also consume
>>>> intermediate topics for web application, one solution is to split it into
>>>> two topics (like topic-raw for Flink and topic-compacted for applications)
>>>> and live with some amplification.
>>>>
>>>> On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley  wrote:
>>>>
>>>>> All of our Flink jobs are (currently) used for web applications at the
>>>>> end of the day. We see a lot of latency spikes from record amplification
>>>>> and we were at first hoping we could pass intermediate results through
>>>>> Kafka and compact them to lower the record amplification, but then it hit
>>>>> me that this might be an issue.
>>>>>
>>>>> Thanks for the detailed explanation, though it seems like we'll need
>>>>> to look for a different solution or only compact on records we know will
>>>>> never mutate.
>>>>>
>>>>> On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise  wrote:
>>>>>
>>>>>> Jan's response is correct, but I'd like to emphasize the impact on a
>>>>>> Flink application.
>>>>>>
>>>>>> If the compaction happens before the data arrives in Flink, the
>>>>>> intermediate updates are lost and just the final result appears.
>>>>>> Also if you restart your Flink application and reprocess older data,
>>>>>> it will naturally only see the compacted data save for the active 
>>>>>> segment.
>>>>>>
>>>>>> So how to make it deterministic? Simply drop topic compaction. If
>>>>>> i

Re: How to pass PROCTIME through an aggregate

2021-02-26 Thread Rex Fenley
This is great Timo. Maybe it only works in SQL but not Table API in the
middle of a plan, which is fine. We'll give this a shot, thank you so much.

On Fri, Feb 26, 2021 at 2:00 AM Timo Walther  wrote:

> Hi Rex,
>
> as far as I know, we recently allowed PROCTIME() also at arbitrary
> locations in the query. So you don't have to pass it through the
> aggregate but you can call it afterwards again.
>
> Does that work in your use case? Something like:
>
> SELECT i, COUNT(*) FROM customers GROUP BY i, TUMBLE(PROCTIME(),
> INTERVAL '5' SECOND)
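For illustration, running that from the Table environment might look roughly
like this (sketch; it assumes a registered table named customers with a column
i, and that PROCTIME() is accepted inside the window call as described):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)
// ... create or register the "customers" table here ...
val result = tableEnv.sqlQuery(
  """SELECT i, COUNT(*) AS cnt
    |FROM customers
    |GROUP BY i, TUMBLE(PROCTIME(), INTERVAL '5' SECOND)""".stripMargin)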
>
> Regards,
> Timo
>
>
>
>
> On 24.02.21 14:20, Arvid Heise wrote:
> > Hi Rex,
> >
> > just an idea, wouldn't it be possible to just add
> >
> > |UNIX_TIMESTAMP()
> > |
> >
> > |right before your window operation?|
> >
> > |
> > |
> >
> >
> > On Sat, Feb 20, 2021 at 2:14 AM Rex Fenley  > <mailto:r...@remind101.com>> wrote:
> >
> > Hello,
> >
> > Using the table api, I have a CREATE DDL which adds a PROCTIME()
> > column and I need to use it deep (like 10 operators deep) in our
> > plan. However, I explicitly do not want to use it before that point.
> >
> > The problem I'm running into is for any group by UDAF I use I then
> > lose the proctime column. I do not want to group by proctime there
> > (it is unwindowed at that point) nor do I want to agg over proctime,
> > I just want to pass the proctime column through until later when I
> > actually use it in a window.
> >
> > My question is, how do I forward proctime through group-by UDAFs
> > without interfering with my plan?
> >
> > Thanks!
> >
> >
>
>



Re: Does the Kafka source perform retractions on Key?

2021-02-26 Thread Rex Fenley
Digging around, it looks like Upsert Kafka which requires a Primary Key
will actually do what I want and uses compaction, but it doesn't look
compatible with Debezium format? Is this on the roadmap?

In the meantime, we're considering consuming from Debezium Kafka (still
compacted) and then writing directly to an Upsert Kafka sink and then
reading right back out of a corresponding Upsert Kafka source. Since that
little roundabout will key all changes by primary key it should give us a
compacted topic to start with initially. Once we get that working we can
probably do the same thing with intermediate flink jobs too.

Would appreciate any feedback on this approach, thanks!
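For what it's worth, a sketch of that roundabout in Flink SQL (1.12) might look
roughly like the following; the topic names, brokers, and columns are made up,
and the real Debezium schema would of course be wider:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)

// Changelog coming from Debezium (the possibly compacted topic).
tableEnv.executeSql(
  """CREATE TABLE users_cdc (
    |  user_id BIGINT,
    |  state STRING
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'users-cdc',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'properties.group.id' = 'users-compactor',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'debezium-json'
    |)""".stripMargin)

// Keyed "current state of the world" topic, safe to compact on user_id.
tableEnv.executeSql(
  """CREATE TABLE users_state (
    |  user_id BIGINT,
    |  state STRING,
    |  PRIMARY KEY (user_id) NOT ENFORCED
    |) WITH (
    |  'connector' = 'upsert-kafka',
    |  'topic' = 'users-state',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'key.format' = 'json',
    |  'value.format' = 'json'
    |)""".stripMargin)

tableEnv.executeSql("INSERT INTO users_state SELECT user_id, state FROM users_cdc")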

On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley  wrote:

> Does this also imply that it's not safe to compact the initial topic where
> data is coming from Debezium? I'd think that Flink's Kafka source would
> emit retractions on any existing data with a primary key as new data with
> the same pk arrived (in our case all data has primary keys). I guess that
> goes back to my original question still however, is this not what the Kafka
> source does? Is there no way to make that happen?
>
> We really can't live with the record amplification, it's sometimes
> nonlinear and randomly kills RocksDB performance.
>
> On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise  wrote:
>
>> Just to clarify, intermediate topics should in most cases not be
>> compacted for exactly the reasons if your application depends on all
>> intermediate data. For the final topic, it makes sense. If you also consume
>> intermediate topics for web application, one solution is to split it into
>> two topics (like topic-raw for Flink and topic-compacted for applications)
>> and live with some amplification.
>>
>> On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley  wrote:
>>
>>> All of our Flink jobs are (currently) used for web applications at the
>>> end of the day. We see a lot of latency spikes from record amplification
>>> and we were at first hoping we could pass intermediate results through
>>> Kafka and compact them to lower the record amplification, but then it hit
>>> me that this might be an issue.
>>>
>>> Thanks for the detailed explanation, though it seems like we'll need to
>>> look for a different solution or only compact on records we know will never
>>> mutate.
>>>
>>> On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise  wrote:
>>>
>>>> Jan's response is correct, but I'd like to emphasize the impact on a
>>>> Flink application.
>>>>
>>>> If the compaction happens before the data arrives in Flink, the
>>>> intermediate updates are lost and just the final result appears.
>>>> Also if you restart your Flink application and reprocess older data, it
>>>> will naturally only see the compacted data save for the active segment.
>>>>
>>>> So how to make it deterministic? Simply drop topic compaction. If it's
>>>> coming from CDC and you want to process and produce changelog streams over
>>>> several applications, you probably don't want to use log compactions
>>>> anyways.
>>>>
>>>> Log compaction only makes sense in the snapshot topic that displays the
>>>> current state (KTable), where you don't think in CDC updates anymore but
>>>> just final records, like
>>>> (user_id: 1, state: "california")
>>>> (user_id: 1, state: "ohio")
>>>>
>>>> Usually, if you use CDC in your company, each application is
>>>> responsible for building its own current model by tapping in the relevant
>>>> changes. Log compacted topics would then only appear at the end of
>>>> processing, when you hand it over towards non-analytical applications, such
>>>> as Web Apps.
>>>>
>>>> On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský  wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> If I understand correctly, you are concerned about behavior of Kafka
>>>>> source in the case of compacted topic, right? If that is the case, then
>>>>> this is not directly related to Flink, Flink will expose the behavior
>>>>> defined by Kafka. You can read about it for instance here [1]. TL;TD - 
>>>>> your
>>>>> pipeline is guaranteed to see every record written to topic (every single
>>>>> update, be it later "overwritten" or not) if it processes the record with
>>>>> latency at most 'delete.r

Re: Does the Kafka source perform retractions on Key?

2021-02-26 Thread Rex Fenley
Does this also imply that it's not safe to compact the initial topic where
data is coming from Debezium? I'd think that Flink's Kafka source would
emit retractions on any existing data with a primary key as new data with
the same pk arrived (in our case all data has primary keys). I guess that
goes back to my original question still however, is this not what the Kafka
source does? Is there no way to make that happen?

We really can't live with the record amplification, it's sometimes
nonlinear and randomly kills RocksDB performance.

On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise  wrote:

> Just to clarify, intermediate topics should in most cases not be compacted
> for exactly the reasons if your application depends on all intermediate
> data. For the final topic, it makes sense. If you also consume intermediate
> topics for web application, one solution is to split it into two topics
> (like topic-raw for Flink and topic-compacted for applications) and live
> with some amplification.
>
> On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley  wrote:
>
>> All of our Flink jobs are (currently) used for web applications at the
>> end of the day. We see a lot of latency spikes from record amplification
>> and we were at first hoping we could pass intermediate results through
>> Kafka and compact them to lower the record amplification, but then it hit
>> me that this might be an issue.
>>
>> Thanks for the detailed explanation, though it seems like we'll need to
>> look for a different solution or only compact on records we know will never
>> mutate.
>>
>> On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise  wrote:
>>
>>> Jan's response is correct, but I'd like to emphasize the impact on a
>>> Flink application.
>>>
>>> If the compaction happens before the data arrives in Flink, the
>>> intermediate updates are lost and just the final result appears.
>>> Also if you restart your Flink application and reprocess older data, it
>>> will naturally only see the compacted data save for the active segment.
>>>
>>> So how to make it deterministic? Simply drop topic compaction. If it's
>>> coming from CDC and you want to process and produce changelog streams over
>>> several applications, you probably don't want to use log compactions
>>> anyways.
>>>
>>> Log compaction only makes sense in the snapshot topic that displays the
>>> current state (KTable), where you don't think in CDC updates anymore but
>>> just final records, like
>>> (user_id: 1, state: "california")
>>> (user_id: 1, state: "ohio")
>>>
>>> Usually, if you use CDC in your company, each application is responsible
>>> for building its own current model by tapping in the relevant changes. Log
>>> compacted topics would then only appear at the end of processing, when you
>>> hand it over towards non-analytical applications, such as Web Apps.
>>>
>>> On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský  wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> If I understand correctly, you are concerned about behavior of Kafka
>>>> source in the case of compacted topic, right? If that is the case, then
>>>> this is not directly related to Flink, Flink will expose the behavior
>>>> defined by Kafka. You can read about it for instance here [1]. TL;TD - your
>>>> pipeline is guaranteed to see every record written to topic (every single
>>>> update, be it later "overwritten" or not) if it processes the record with
>>>> latency at most 'delete.retention.ms'. This is configurable per topic
>>>> - default 24 hours. If you want to reprocess the data later, your consumer
>>>> might see only resulting compacted ("retracted") stream, and not every
>>>> record actually written to the topic.
>>>>
>>>>  Jan
>>>>
>>>> [1]
>>>> https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
>>>> On 2/24/21 3:14 AM, Rex Fenley wrote:
>>>>
>>>> Apologies, forgot to finish. If the Kafka source performs its own
>>>> retractions of old data on key (user_id) for every append it receives, it
>>>> should resolve this discrepancy I think.
>>>>
>>>> Again, is this true? Anything else I'm missing?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley  wrote:
>>>>
>>>>> Hi,

Re: Does the Kafka source perform retractions on Key?

2021-02-24 Thread Rex Fenley
All of our Flink jobs are (currently) used for web applications at the end
of the day. We see a lot of latency spikes from record amplification and we
were at first hoping we could pass intermediate results through Kafka and
compact them to lower the record amplification, but then it hit me that
this might be an issue.

Thanks for the detailed explanation, though it seems like we'll need to
look for a different solution or only compact on records we know will never
mutate.

On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise  wrote:

> Jan's response is correct, but I'd like to emphasize the impact on a Flink
> application.
>
> If the compaction happens before the data arrives in Flink, the
> intermediate updates are lost and just the final result appears.
> Also if you restart your Flink application and reprocess older data, it
> will naturally only see the compacted data save for the active segment.
>
> So how to make it deterministic? Simply drop topic compaction. If it's
> coming from CDC and you want to process and produce changelog streams over
> several applications, you probably don't want to use log compactions
> anyways.
>
> Log compaction only makes sense in the snapshot topic that displays the
> current state (KTable), where you don't think in CDC updates anymore but
> just final records, like
> (user_id: 1, state: "california")
> (user_id: 1, state: "ohio")
>
> Usually, if you use CDC in your company, each application is responsible
> for building its own current model by tapping in the relevant changes. Log
> compacted topics would then only appear at the end of processing, when you
> hand it over towards non-analytical applications, such as Web Apps.
>
> On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský  wrote:
>
>> Hi Rex,
>>
>> If I understand correctly, you are concerned about behavior of Kafka
>> source in the case of compacted topic, right? If that is the case, then
>> this is not directly related to Flink, Flink will expose the behavior
>> defined by Kafka. You can read about it for instance here [1]. TL;DR - your
>> pipeline is guaranteed to see every record written to topic (every single
>> update, be it later "overwritten" or not) if it processes the record with
>> latency at most 'delete.retention.ms'. This is configurable per topic -
>> default 24 hours. If you want to reprocess the data later, your consumer
>> might see only resulting compacted ("retracted") stream, and not every
>> record actually written to the topic.
>>
>>  Jan
>>
>> [1]
>> https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
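(For completeness, a hypothetical sketch of raising that per-topic setting with
Kafka's AdminClient; the topic name and the 7-day value are made up:)

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
val admin = AdminClient.create(props)

val topic = new ConfigResource(ConfigResource.Type.TOPIC, "users-cdc")
val raiseRetention = new AlterConfigOp(
  new ConfigEntry("delete.retention.ms", String.valueOf(7L * 24 * 60 * 60 * 1000)),
  AlterConfigOp.OpType.SET)

val changes = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
changes.put(topic, Collections.singletonList(raiseRetention))
admin.incrementalAlterConfigs(changes).all().get()
admin.close()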
>> On 2/24/21 3:14 AM, Rex Fenley wrote:
>>
>> Apologies, forgot to finish. If the Kafka source performs its own
>> retractions of old data on key (user_id) for every append it receives, it
>> should resolve this discrepancy I think.
>>
>> Again, is this true? Anything else I'm missing?
>>
>> Thanks!
>>
>>
>> On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley  wrote:
>>
>>> Hi,
>>>
>>> I'm concerned about the impacts of Kafka's compactions when sending data
>>> between running flink jobs.
>>>
>>> For example, one job produces retract stream records in sequence of
>>> (false, (user_id: 1, state: "california")) -- retract
>>> (true, (user_id: 1, state: "ohio")) -- append
>>> Which is consumed by Kafka and keyed by user_id, this could end up
>>> compacting to just
>>> (true, (user_id: 1, state: "ohio")) -- append
>>> If some other downstream Flink job has a filter on state == "california"
>>> and reads from the Kafka stream, I assume it will miss the retract message
>>> altogether and produce incorrect results.
>>>
>>> Is this true? How do we prevent this from happening? We need to use
>>> compaction since all our jobs are based on CDC and we can't just drop data
>>> after x number of days.
>>>
>>> Thanks
>>>
>>>
>>
>>
>>
>>



Re: Does the Kafka source perform retractions on Key?

2021-02-23 Thread Rex Fenley
Apologies, forgot to finish. If the Kafka source performs its own
retractions of old data on key (user_id) for every append it receives, it
should resolve this discrepancy I think.

Again, is this true? Anything else I'm missing?

Thanks!


On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley  wrote:

> Hi,
>
> I'm concerned about the impacts of Kafka's compactions when sending data
> between running flink jobs.
>
> For example, one job produces retract stream records in sequence of
> (false, (user_id: 1, state: "california")) -- retract
> (true, (user_id: 1, state: "ohio")) -- append
> Which is consumed by Kafka and keyed by user_id, this could end up
> compacting to just
> (true, (user_id: 1, state: "ohio")) -- append
> If some other downstream Flink job has a filter on state == "california"
> and reads from the Kafka stream, I assume it will miss the retract message
> altogether and produce incorrect results.
>
> Is this true? How do we prevent this from happening? We need to use
> compaction since all our jobs are based on CDC and we can't just drop data
> after x number of days.
>
> Thanks
>
>




Does the Kafka source perform retractions on Key?

2021-02-23 Thread Rex Fenley
Hi,

I'm concerned about the impacts of Kafka's compactions when sending data
between running flink jobs.

For example, one job produces retract stream records in sequence of
(false, (user_id: 1, state: "california")) -- retract
(true, (user_id: 1, state: "ohio")) -- append
Which is consumed by Kafka and keyed by user_id, this could end up
compacting to just
(true, (user_id: 1, state: "ohio")) -- append
If some other downstream Flink job has a filter on state == "california"
and reads from the Kafka stream, I assume it will miss the retract message
altogether and produce incorrect results.

Is this true? How do we prevent this from happening? We need to use
compaction since all our jobs are based on CDC and we can't just drop data
after x number of days.

Thanks



How to pass PROCTIME through an aggregate

2021-02-19 Thread Rex Fenley
Hello,

Using the table api, I have a CREATE DDL which adds a PROCTIME() column and
I need to use it deep (like 10 operators deep) in our plan. However, I
explicitly do not want to use it before that point.

The problem I'm running into is for any group by UDAF I use I then lose the
proctime column. I do not want to group by proctime there (it is unwindowed
at that point) nor do I want to agg over proctime, I just want to pass the
proctime column through until later when I actually use it in a window.

My question is, how do I forward proctime through group-by UDAFs without
interfering with my plan?

Thanks!



Re: How is proctime represented?

2021-02-19 Thread Rex Fenley
Thanks yall this is really helpful!

On Fri, Feb 19, 2021 at 2:40 AM Timo Walther  wrote:

> Chesnay is right. The PROCTIME() is lazily evaluated and executed when its
> result is needed as an argument for another expression or function. So
> within the pipeline the column is NULL but when you want to compute
> something e.g. CAST(proctime AS TIMESTAMP(3)) it will be materialized
> into the row. If you want to use ingestion time, you should be able to use:
>
> CREATE TABLE (
>   ingest_ts AS CAST(PROCTIME() AS TIMESTAMP(3))
> )
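Fleshed out, such a DDL might look roughly like this (sketch only; the
connector, topic, and other columns are made up, and the computed column is
exactly the cast suggested above):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)
tableEnv.executeSql(
  """CREATE TABLE events (
    |  user_id BIGINT,
    |  payload STRING,
    |  ingest_ts AS CAST(PROCTIME() AS TIMESTAMP(3))
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'events',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'properties.group.id' = 'events-reader',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'json'
    |)""".stripMargin)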
>
> Regards,
> Timo
>
>
> On 19.02.21 10:23, Chesnay Schepler wrote:
> > hmm...I can now see where that uncertainty comes from.
> >
> > My /impression/ is that PROCTIME is not evaluated eagerly, and instead
> > operators relying on this column generate their own processing
> > timestamp. What throws me off is that I cannot tell how you would tell
> > Flink to store a processing timestamp as is in a row (to essentially
> > create something like ingestion time).
> >
> > I'm looping in Timo to provide some clarity.
> >
> > On 2/19/2021 8:39 AM, Rex Fenley wrote:
> >> Reading the documentation you posted again after posting this
> >> question, it does sound like it's simply a placeholder that only gets
> >> filled in when used by an operator, then again, that's still not
> >> exactly what it says so I only feel 70% confident like that's what is
> >> happening.
> >>
> >> On Thu, Feb 18, 2021 at 10:55 PM Chesnay Schepler  >> <mailto:ches...@apache.org>> wrote:
> >>
> >> Could you check whether this answers your question?
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time
> >> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time
> >
> >>
> >> On 2/19/2021 7:29 AM, Rex Fenley wrote:
> >>> Hello,
> >>>
> >>> When using PROCTIME() in CREATE DDL for a source, is the proctime
> >>> attribute a timestamp generated at the time of row ingestion at
> >>> the source and then forwarded through the graph execution, or is
> >>> proctime attribute a placeholder that says "fill me in with a
> >>> timestamp" once it's being used directly by some operator, by
> >>> some machine?
> >>>
> >>> Thanks!
> >>>
> >>>
> >>
> >>
> >>
> >>
> >
>
>



Re: How is proctime represented?

2021-02-18 Thread Rex Fenley
Reading the documentation you posted again after posting this question, it
does sound like it's simply a placeholder that only gets filled in when
used by an operator. Then again, that's still not exactly what it says, so I
only feel about 70% confident that that's what is happening.

On Thu, Feb 18, 2021 at 10:55 PM Chesnay Schepler 
wrote:

> Could you check whether this answers your question?
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time
>
> On 2/19/2021 7:29 AM, Rex Fenley wrote:
>
> Hello,
>
> When using PROCTIME() in CREATE DDL for a source, is the proctime
> attribute a timestamp generated at the time of row ingestion at the source
> and then forwarded through the graph execution, or is proctime attribute a
> placeholder that says "fill me in with a timestamp" once it's being used
> directly by some operator, by some machine?
>
> Thanks!
>
>
>
>



How is proctime represented?

2021-02-18 Thread Rex Fenley
Hello,

When using PROCTIME() in CREATE DDL for a source, is the proctime attribute
a timestamp generated at the time of row ingestion at the source and then
forwarded through the graph execution, or is proctime attribute a
placeholder that says "fill me in with a timestamp" once it's being used
directly by some operator, by some machine?

Thanks!



Re: Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Following from that, I'm not really sure why I need to provide a proctime
timestamp. There should never be any late data with proctime, when a record
arrives it should just be put into whatever the current window is. So why
is there any requirement to specify a time column in this case?

Thanks!

On Wed, Feb 17, 2021 at 9:33 PM Rex Fenley  wrote:

> Also, as an example, I've tried
> table.window(Tumble over 1.seconds on proctime() as $"w")...
> and it failed.
>
> On Wed, Feb 17, 2021 at 9:30 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> When using streaming api, if I want a tumbling window on proctime all I
>> have to do is the following:
>> table.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))...
>> I don't even need to explicitly create a proctime column.
>>
>> However, adding an intermediate tumbling window on proctime using the
>> table api has proved more difficult.
>>
>> The docs seem to possibly imply that I can only add a proctime column on
>> table creation [1] however this isn't what I want because it adds
>> complexity. I want to only render and use proctime at one intermediate
>> tumbling windowed aggregate in the entire query plan. Therefore, I don't
>> want proctime carried from the beginning of all my tables to where I
>> finally need it, I just want it where I need it. Every combination of
>> things I've tried though has seemed to have failed. Is there any way to do
>> this?
>>
>> Additionally, I don't want to switch to data streams because my tables
>> have retractions and the table api is simpler to use in that sense.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>>
>> Thanks!
>>
>>
>
>
>




Re: Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Also, as an example, I've tried
table.window(Tumble over 1.seconds on proctime() as $"w")...
and it failed.

On Wed, Feb 17, 2021 at 9:30 PM Rex Fenley  wrote:

> Hi,
>
> When using streaming api, if I want a tumbling window on proctime all I
> have to do is the following:
> table.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))...
> I don't even need to explicitly create a proctime column.
>
> However, adding an intermediate tumbling window on proctime using the
> table api has proved more difficult.
>
> The docs seem to possibly imply that I can only add a proctime column on
> table creation [1] however this isn't what I want because it adds
> complexity. I want to only render and use proctime at one intermediate
> tumbling windowed aggregate in the entire query plan. Therefore, I don't
> want proctime carried from the beginning of all my tables to where I
> finally need it, I just want it where I need it. Every combination of
> things I've tried though has seemed to have failed. Is there any way to do
> this?
>
> Additionally, I don't want to switch to data streams because my tables
> have retractions and the table api is simpler to use in that sense.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>
> Thanks!
>
>




Adding proctime column to table api

2021-02-17 Thread Rex Fenley
Hi,

When using streaming api, if I want a tumbling window on proctime all I
have to do is the following:
table.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))...
I don't even need to explicitly create a proctime column.

However, adding an intermediate tumbling window on proctime using the table
api has proved more difficult.

The docs seem to possibly imply that I can only add a proctime column on
table creation [1] however this isn't what I want because it adds
complexity. I want to only render and use proctime at one intermediate
tumbling windowed aggregate in the entire query plan, Therefore, I don't
want proctime carried from the beginning of all my tables to where I
finally need it, I just want it where I need it. Every combination of
things I've tried though has seemed to have failed. Is there any way to do
this?

Additionally, I don't want to switch to data streams because my tables have
retractions and the table api is simpler to use in that sense.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html

Thanks!



Re: Proctime consistency

2021-02-04 Thread Rex Fenley
So if I'm reading this correctly, on checkpoint restore, if current machine
time / proc time > checkpointed window proc time, the window will fire
immediately with all the data it had aggregated. If current machine time <
window proc time, the window will just continue where it left off until it
hits the machine's clock time where it is meant to trigger.

That actually also seems perfectly fine for our use case. I see the concern
that if a lot of proc time windows have built up, a lot of triggers
firing could stress resources on a restore, but I don't think that will
matter for our case, we just want to make sure we don't lose any data or
have any gaps between windows.

Please confirm if I got this right, and thank you much for your reply!

On Tue, Feb 2, 2021 at 3:17 AM Timo Walther  wrote:

> As far as I know, we support ROW_NUMBER in SQL that could give you
> sequence number.
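For reference, the documented deduplication pattern built on ROW_NUMBER looks
roughly like this (sketch; the table and column names are made up):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)
// Keep only the latest row per user_id, ordered on the proctime attribute.
val deduped = tableEnv.sqlQuery(
  """SELECT user_id, doc
    |FROM (
    |  SELECT *,
    |    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY proctime DESC) AS row_num
    |  FROM user_docs
    |)
    |WHERE row_num = 1""".stripMargin)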
>
> Regarding window semantics, the processing time only determines when to
> trigger the evaluation (also mentioned here [1]). A timer is registered
> for the next evaluation. The window content and next timer is part of
> every checkpoint and savepoint. If you restore from a
> checkpoint/savepoint, the stored next timestamp will be checked with the
> current wall clock and an evaluation might be triggered immediately.
> Thus, usually event-time is more useful than processing time. If you
> have a lot of processing time timers set, they might all fire
> immediately during a restore.
>
> So the window will not start over from scratch. But inflight data that
> was about to reach the window operator will be reread from the source
> operator.
>
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#triggers
>
>
> On 01.02.21 20:06, Rex Fenley wrote:
> > We need to aggregate in precisely row order. Is there a safe way to do
> > this? Maybe with some sort of row time sequence number?
> >
> > As written in another email, we're currently doing the following set of
> > operations
> > val compactedUserDocsStream = userDocsStream
> >   .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
> >   .aggregate(new CompactionAggregate())
> >
> > I guess my concern is if we restore from a checkpoint or savepoint I
> > don't understand how the window get's checkpointed and how window
> > alignment works between runs of a job. Will the window just start over
> > from scratch, and re-process any rows that may have been inflight but
> > not finished processing in the previous run's last window?
> >
> > If so then I guess everything will arrive in row order like we want it
> > to. But if a window get's checkpointed with its previous proctime, then
> > it may be misaligned in the next run and drop rows that were in that
> window.
> >
> > On Mon, Feb 1, 2021 at 6:37 AM Timo Walther  > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Rex,
> >
> > processing-time gives you no alignment of operators across nodes.
> Each
> > operation works with its local machine clock that might be
> interrupted
> > by the OS, Java garbage collector, etc. It is always a best effort
> > timing.
> >
> > Regards,
> > Timo
> >
> >
> > On 27.01.21 18:16, Rex Fenley wrote:
> >  > Hello,
> >  >
> >  > I'm looking at ways to deduplicate data and found [1], but does
> > proctime
> >  > get committed with operators? How does this work against clock
> > skew on
> >  > different machines?
> >  >
> >  > Thanks
> >  >
> >  > [1]
> >  >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> >
> >
> >  >
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> >>
> >  >
> >  > --
> >  >
> >  > Rex Fenley|Software Engineer - Mobile and Backend
> >  >
> >  >
> >  > Remind.com <https://www.remind.com/ <https://www.remind.com/>>|
> > BLOG <http://blog.remind.com/ <http://blog.remind.com/>> |
> >  > FOLLOW US <https://twitter.com/remindhq
> > <https://twitter.com/remindhq>> | LIKE US
> >  > <https://www.facebook.com/remindhq
> > <https://www.facebook.com/remindhq>>
> >  >
> >
> >
> >
> > --
> >
> > Rex Fenley|Software Engineer - Mobile and Backend
> >
> >
> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
> >
> >
> >
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
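
For reference, a minimal sketch of the ROW_NUMBER-based deduplication Timo
mentions, following the pattern from the Flink SQL deduplication docs cited
earlier in the thread. Everything here is a placeholder rather than the
actual job: it assumes a registered table user_docs with columns doc_id and
payload plus a processing-time attribute named proctime.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Keep only the most recent row per doc_id; ORDER BY proctime DESC together
// with row_num = 1 is the documented "deduplicate, keep last row" form.
val deduplicated = tableEnv.sqlQuery(
  """
    |SELECT doc_id, payload
    |FROM (
    |  SELECT doc_id, payload,
    |         ROW_NUMBER() OVER (PARTITION BY doc_id ORDER BY proctime DESC) AS row_num
    |  FROM user_docs
    |)
    |WHERE row_num = 1
    |""".stripMargin)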


Re: Proctime consistency

2021-02-01 Thread Rex Fenley
We need to aggregate precisely in row order. Is there a safe way to do
this? Maybe with some sort of row time sequence number?

As written in another email, we're currently doing the following set of
operations
val compactedUserDocsStream = userDocsStream
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.aggregate(new CompactionAggregate())

I guess my concern is if we restore from a checkpoint or savepoint I don't
understand how the window gets checkpointed and how window alignment works
between runs of a job. Will the window just start over from scratch, and
re-process any rows that may have been inflight but not finished processing
in the previous run's last window?

If so then I guess everything will arrive in row order like we want it to.
But if a window gets checkpointed with its previous proctime, then it may
be misaligned in the next run and drop rows that were in that window.

On Mon, Feb 1, 2021 at 6:37 AM Timo Walther  wrote:

> Hi Rex,
>
> processing-time gives you no alignment of operators across nodes. Each
> operation works with its local machine clock that might be interrupted
> by the OS, Java garbage collector, etc. It is always a best effort timing.
>
> Regards,
> Timo
>
>
> On 27.01.21 18:16, Rex Fenley wrote:
> > Hello,
> >
> > I'm looking at ways to deduplicate data and found [1], but does proctime
> > get committed with operators? How does this work against clock skew on
> > different machines?
> >
> > Thanks
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
> >
> >
> > --
> >
> > Rex Fenley|Software Engineer - Mobile and Backend
> >
> >
> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
> >
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
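
On the "row time sequence number" idea above, one possible sketch (not taken
from the thread) is to attach a per-key sequence number with a
KeyedProcessFunction. The counter sits in keyed ValueState, so it is
checkpointed with the job and keeps numbering in arrival order per key after
a restore. All names are made up.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Emits (sequenceNumber, record), numbering records per key in arrival order.
class SequenceNumberFunction[K, T] extends KeyedProcessFunction[K, T, (Long, T)] {

  @transient private var seq: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    seq = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("seq", classOf[java.lang.Long]))
  }

  override def processElement(
      value: T,
      ctx: KeyedProcessFunction[K, T, (Long, T)]#Context,
      out: Collector[(Long, T)]): Unit = {
    val next = Option(seq.value()).map(_.longValue() + 1L).getOrElse(0L)
    seq.update(next)
    out.collect((next, value))
  }
}

Applied directly after the keyBy, every record for a given key then carries a
number that is strictly increasing in the order the operator received it,
which a downstream aggregate can use to detect or restore per-key order.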


Re: What is "Processed (persisted) in-flight data"

2021-01-31 Thread Rex Fenley
Oh, nvm, that's the "Persisted" part which is documented as "*Persisted
in-flight data*: The number of bytes persisted during the alignment (time
between receiving the first and the last checkpoint barrier) over all
acknowledged subtasks. This is > 0 only if the unaligned checkpoints are
enabled."

Thanks!

On Sun, Jan 31, 2021 at 3:10 PM Rex Fenley  wrote:

> Got it, thanks! What is the 0 B part of that?
>
> On Sun, Jan 31, 2021 at 3:43 AM Arvid Heise  wrote:
>
>> Processed in-flight data is the size of data that is processed between
>> the first and last checkpoint barrier in aligned checkpointing. [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html#history-tab
>>
>> On Sun, Jan 31, 2021 at 7:45 AM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> In the latest version I now see two byte measures on checkpoints. There's
>>> *Checkpointed Data Size*
>>> *9.05 GB*
>>> Which I'm assuming means 9.05 GB were written in this incremental
>>> checkpoint.
>>>
>>> But now there is also
>>> *Processed (persisted) in-flight data*
>>> *152 MB (0 B)*
>>> What is that?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
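
As a side note on the "(0 B)" figure: it stays at zero unless unaligned
checkpoints are enabled. A minimal sketch of turning them on (the interval is
only illustrative):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint every 60s; with unaligned checkpoints the in-flight buffers are
// written into the checkpoint and reported as persisted in-flight data.
env.enableCheckpointing(60000L)
env.getCheckpointConfig.enableUnalignedCheckpoints()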


Re: What is "Processed (persisted) in-flight data"

2021-01-31 Thread Rex Fenley
Got it, thanks! What is the 0 B part of that?

On Sun, Jan 31, 2021 at 3:43 AM Arvid Heise  wrote:

> Processed in-flight data is the size of data that is processed between the
> first and last checkpoint barrier in aligned checkpointing. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html#history-tab
>
> On Sun, Jan 31, 2021 at 7:45 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> In the latest version I now see two byte measures on checkpoints. There's
>> *Checkpointed Data Size*
>> *9.05 GB*
>> Which I'm assuming means 9.05 GB were written in this incremental
>> checkpoint.
>>
>> But now there is also
>> *Processed (persisted) in-flight data*
>> *152 MB (0 B)*
>> What is that?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


What is "Processed (persisted) in-flight data"

2021-01-30 Thread Rex Fenley
Hello,

In the latest version I now see two byte measures on checkpoints. There's
*Checkpointed Data Size*
*9.05 GB*
Which I'm assuming means 9.05 GB were written in this incremental
checkpoint.

But now there is also
*Processed (persisted) in-flight data*
*152 MB (0 B)*
What is that?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Deduplicating record amplification

2021-01-29 Thread Rex Fenley
Great, thank you for the confirmation!

On Thu, Jan 28, 2021 at 11:25 PM Arvid Heise  wrote:

> Hi Rex,
>
> there cannot be any late event in processing time by definition (maybe on
> a quantum computer?), so you should be fine. The timestamp of records in
> processing time is monotonously increasing.
>
> Best,
>
> Arvid
>
> On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley  wrote:
>
>> Switching to TumblingProcessingTimeWindows seems to have solved that
>> problem.
>>
>> For my own understanding, this won't have any "late" and therefore
>> dropped records right? We cannot blindly drop a record from the aggregate
>> evaluation, it just needs to take all the records it gets in a window and
>> process them and then the aggregate will take whatever is last in-order.
>>
>> Thanks!
>>
>> On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley  wrote:
>>
>>> It looks like it wants me to call assignTimestampsAndWatermarks but I
>>> already have a timer on my window which I'd expect everything entering this
>>> stream would simply be aggregated during that window
>>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>
>>> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley  wrote:
>>>
>>>> I think I may have been affected by some late night programming.
>>>>
>>>> Slightly revised how I'm using my aggregate
>>>> val userDocsStream =
>>>> this.tableEnv
>>>> .toRetractStream(userDocsTable, classOf[Row])
>>>> .keyBy(_.f1.getField(0))
>>>> val compactedUserDocsStream = userDocsStream
>>>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>> .aggregate(new CompactionAggregate())
>>>> but this now gives me the following exception:
>>>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>>>> timestamp marker). Is the time characteristic set to 'ProcessingTime',
>>>> or did you forget to call
>>>> 'DataStream.assignTimestampsAndWatermarks(...)'?
>>>> at org.apache.flink.streaming.api.windowing.assigners.
>>>> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69
>>>> )
>>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>>> WindowOperator.processElement(WindowOperator.java:295)
>>>> at org.apache.flink.streaming.runtime.tasks.
>>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
>>>> OneInputStreamTask.java:161)
>>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>>> .processElement(StreamTaskNetworkInput.java:178)
>>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>>> .emitNext(StreamTaskNetworkInput.java:153)
>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>>> .processInput(StreamOneInputProcessor.java:67)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .processInput(StreamTask.java:351)
>>>> at org.apache.flink.streaming.runtime.tasks.mailbox.
>>>> MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>>>> at org.apache.flink.streaming.runtime.tasks.mailbox.
>>>> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .runMailboxLoop(StreamTask.java:566)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>> StreamTask.java:536)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>> at java.base/java.lang.Thread.run(Thread.java:829)
>>>>
>>>> Which I'm not at all sure how to interpret
>>>>
>>>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:
>>>>
>>>>> Ok, that sounds like it confirms my expectations.
>>>>>
>>>>> So I tried running my above code and had to slightly edit to using
>>>>> java Tuple2 because our execution environment stuff is all in Java.
>>>>>
>>>>> class CompactionAggregate
>>>>> extends AggregateFunction[
>>>>> Tuple2[java.lang.Boolean, Row],
>>>>> Tuple2[java.lang.Boolean, Row],
>>>>> Tuple2[java.lang.Boolean, Row]
>>>>> ] {
>>>>>
>>>>> override def createAccumulator() = new Tuple2(false, 

Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
Switching to TumblingProcessingTimeWindows seems to have solved that
problem.

For my own understanding, this won't have any "late" and therefore dropped
records, right? We cannot blindly drop a record from the aggregate
evaluation; it just needs to take all the records it gets in a window,
process them, and then the aggregate will take whatever is last in order.

Thanks!

On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley  wrote:

> It looks like it wants me to call assignTimestampsAndWatermarks but I
> already have a timer on my window which I'd expect everything entering this
> stream would simply be aggregated during that window
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>
> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley  wrote:
>
>> I think I may have been affected by some late night programming.
>>
>> Slightly revised how I'm using my aggregate
>> val userDocsStream =
>> this.tableEnv
>> .toRetractStream(userDocsTable, classOf[Row])
>> .keyBy(_.f1.getField(0))
>> val compactedUserDocsStream = userDocsStream
>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>> .aggregate(new CompactionAggregate())
>> but this now gives me the following exception:
>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>> timestamp marker). Is the time characteristic set to 'ProcessingTime', or
>> did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>> at org.apache.flink.streaming.api.windowing.assigners.
>> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator.processElement(WindowOperator.java:295)
>> at org.apache.flink.streaming.runtime.tasks.
>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>> .java:161)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .processElement(StreamTaskNetworkInput.java:178)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:153)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:67)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:351)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxStep(MailboxProcessor.java:191)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:181)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:566)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:536)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> Which I'm not at all sure how to interpret
>>
>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:
>>
>>> Ok, that sounds like it confirms my expectations.
>>>
>>> So I tried running my above code and had to slightly edit to using java
>>> Tuple2 because our execution environment stuff is all in Java.
>>>
>>> class CompactionAggregate
>>> extends AggregateFunction[
>>> Tuple2[java.lang.Boolean, Row],
>>> Tuple2[java.lang.Boolean, Row],
>>> Tuple2[java.lang.Boolean, Row]
>>> ] {
>>>
>>> override def createAccumulator() = new Tuple2(false, null)
>>>
>>> // Just take the latest value to compact.
>>> override def add(
>>> value: Tuple2[java.lang.Boolean, Row],
>>> accumulator: Tuple2[java.lang.Boolean, Row]
>>> ) =
>>> value
>>>
>>> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>> accumulator
>>>
>>> // This is a required function that we don't use.
>>> override def merge(
>>> a: Tuple2[java.lang.Boolean, Row],
>>> b: Tuple2[java.lang.Boolean, Row]
>>> ) =
>>> throw new NotImplementedException()
>>> }
>>>
>>> But when running I get the following error:
>>> >Caused by: java.lang.RuntimeException: Could not extract key from
>>> [redacted row]
>>> >...
>>> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
>>> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
>>> supp

Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
It looks like it wants me to call assignTimestampsAndWatermarks but I
already have a timer on my window which I'd expect everything entering this
stream would simply be aggregated during that window
.window(TumblingEventTimeWindows.of(Time.seconds(1)))

On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley  wrote:

> I think I may have been affected by some late night programming.
>
> Slightly revised how I'm using my aggregate
> val userDocsStream =
> this.tableEnv
> .toRetractStream(userDocsTable, classOf[Row])
> .keyBy(_.f1.getField(0))
> val compactedUserDocsStream = userDocsStream
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
> .aggregate(new CompactionAggregate())
> but this now gives me the following exception:
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> timestamp marker). Is the time characteristic set to 'ProcessingTime', or
> did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
> at org.apache.flink.streaming.api.windowing.assigners.
> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:295)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:161)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:178)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:153)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:351)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxStep(MailboxProcessor.java:191)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:181)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:566)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:536)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> Which I'm not at all sure how to interpret
>
> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:
>
>> Ok, that sounds like it confirms my expectations.
>>
>> So I tried running my above code and had to slightly edit to using java
>> Tuple2 because our execution environment stuff is all in Java.
>>
>> class CompactionAggregate
>> extends AggregateFunction[
>> Tuple2[java.lang.Boolean, Row],
>> Tuple2[java.lang.Boolean, Row],
>> Tuple2[java.lang.Boolean, Row]
>> ] {
>>
>> override def createAccumulator() = new Tuple2(false, null)
>>
>> // Just take the latest value to compact.
>> override def add(
>> value: Tuple2[java.lang.Boolean, Row],
>> accumulator: Tuple2[java.lang.Boolean, Row]
>> ) =
>> value
>>
>> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>> accumulator
>>
>> // This is a required function that we don't use.
>> override def merge(
>> a: Tuple2[java.lang.Boolean, Row],
>> b: Tuple2[java.lang.Boolean, Row]
>> ) =
>> throw new NotImplementedException()
>> }
>>
>> But when running I get the following error:
>> >Caused by: java.lang.RuntimeException: Could not extract key from
>> [redacted row]
>> >...
>> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
>> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
>> supported when converting to an expression.
>>
>> I'm googling around and haven't found anything informative about what
>> might be causing this issue. Any ideas?
>>
>> I'll also take a look at the SQL functions you suggested and see if I can
>> use those.
>>
>> Thanks!
>>
>>
>>
>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> if your keyby (and with join/grouping/windowing) is random or not
>>> depends on the relationship of the join/grouping key with your Kafka
>>> partitioning key.
>>>
>>> Say your partitioning key is document_id. Then, any join/grouping key
>>> that is composed of (or is exactly) document_id, 

Re: Deduplicating record amplification

2021-01-28 Thread Rex Fenley
I think I may have been affected by some late night programming.

Slightly revised how I'm using my aggregate
val userDocsStream =
this.tableEnv
.toRetractStream(userDocsTable, classOf[Row])
.keyBy(_.f1.getField(0))
val compactedUserDocsStream = userDocsStream
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.aggregate(new CompactionAggregate())
but this now gives me the following exception:
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
timestamp marker). Is the time characteristic set to 'ProcessingTime', or
did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.
TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.processElement(WindowOperator.java:295)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:161)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:178)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:153)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:351)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:566)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:829)

Which I'm not at all sure how to interpret

On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley  wrote:

> Ok, that sounds like it confirms my expectations.
>
> So I tried running my above code and had to slightly edit to using java
> Tuple2 because our execution environment stuff is all in Java.
>
> class CompactionAggregate
> extends AggregateFunction[
> Tuple2[java.lang.Boolean, Row],
> Tuple2[java.lang.Boolean, Row],
> Tuple2[java.lang.Boolean, Row]
> ] {
>
> override def createAccumulator() = new Tuple2(false, null)
>
> // Just take the latest value to compact.
> override def add(
> value: Tuple2[java.lang.Boolean, Row],
> accumulator: Tuple2[java.lang.Boolean, Row]
> ) =
> value
>
> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
> accumulator
>
> // This is a required function that we don't use.
> override def merge(
> a: Tuple2[java.lang.Boolean, Row],
> b: Tuple2[java.lang.Boolean, Row]
> ) =
> throw new NotImplementedException()
> }
>
> But when running I get the following error:
> >Caused by: java.lang.RuntimeException: Could not extract key from
> [redacted row]
> >...
> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
> supported when converting to an expression.
>
> I'm googling around and haven't found anything informative about what
> might be causing this issue. Any ideas?
>
> I'll also take a look at the SQL functions you suggested and see if I can
> use those.
>
> Thanks!
>
>
>
> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> if your keyby (and with join/grouping/windowing) is random or not depends
>> on the relationship of the join/grouping key with your Kafka partitioning
>> key.
>>
>> Say your partitioning key is document_id. Then, any join/grouping key
>> that is composed of (or is exactly) document_id, will retain the order. You
>> should always ask yourself the question: can two records coming from the
>> ordered Kafka partition X be processed by two different operator instances.
>> For a join/grouping operator, there is only the strict guarantee that all
>> records with the same key will be shuffled into the same operator instance.
>>
>> Your compaction in general looks good but I'm not deep into Table API.
>> I'm quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table
>> API should already do what you want. [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>
>> On Thu, Jan

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
Ok, that sounds like it confirms my expectations.

So I tried running my above code and had to slightly edit to using java
Tuple2 because our execution environment stuff is all in Java.

class CompactionAggregate
extends AggregateFunction[
Tuple2[java.lang.Boolean, Row],
Tuple2[java.lang.Boolean, Row],
Tuple2[java.lang.Boolean, Row]
] {

override def createAccumulator() = new Tuple2(false, null)

// Just take the latest value to compact.
override def add(
value: Tuple2[java.lang.Boolean, Row],
accumulator: Tuple2[java.lang.Boolean, Row]
) =
value

override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
accumulator

// This is a required function that we don't use.
override def merge(
a: Tuple2[java.lang.Boolean, Row],
b: Tuple2[java.lang.Boolean, Row]
) =
throw new NotImplementedException()
}

But when running I get the following error:
>Caused by: java.lang.RuntimeException: Could not extract key from
[redacted row]
>...
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported
kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
supported when converting to an expression.

I'm googling around and haven't found anything informative about what might
be causing this issue. Any ideas?

I'll also take a look at the SQL functions you suggested and see if I can
use those.

Thanks!



On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:

> Hi Rex,
>
> if your keyby (and with join/grouping/windowing) is random or not depends
> on the relationship of the join/grouping key with your Kafka partitioning
> key.
>
> Say your partitioning key is document_id. Then, any join/grouping key that
> is composed of (or is exactly) document_id, will retain the order. You
> should always ask yourself the question: can two records coming from the
> ordered Kafka partition X be processed by two different operator instances.
> For a join/grouping operator, there is only the strict guarantee that all
> records with the same key will be shuffled into the same operator instance.
>
> Your compaction in general looks good but I'm not deep into Table API. I'm
> quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table API
> should already do what you want. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>
> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley  wrote:
>
>> In addition to those questions, assuming that keyed streams are in order,
>> I've come up with the following solution to compact our records and only
>> pick the most recent one per id before sending to the ES sink.
>>
>> The first item in the Row is the document ID / primary key which we want
>> to compact records on.
>>
>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>> userDocsStream
>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>   .aggregate(new CompactionAggregate())
>>
>> class CompactionAggregate
>>   extends AggregateFunction[
>>     (Boolean, Row),
>>     (Boolean, Row),
>>     (Boolean, Row)
>>   ] {
>>
>>   override def createAccumulator() = (false, null)
>>
>>   // Just take the latest value to compact.
>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>     value
>>
>>   override def getResult(accumulator: (Boolean, Row)) = accumulator
>>
>>   // This is a required function that we don't use.
>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>     throw new NotImplementedException()
>> }
>>
>> I'm hoping that if the last record in the window is an insert it picks
>> that if it's a retract then it picks that and then when we send this to the
>> ES sink we will simply check true or false in the first element of the
>> tuple for an insert or delete request to ES. Does this seem like it will
>> work?
>>
>> Thanks!
>>
>>
>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:
>>
>>> This is great info, thanks!
>>>
>>> My question then becomes, what constitutes a random shuffle? Currently
>>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>>> output a keyed stream of records by join key or is this random? I imagine
>>> that they'd have to have a key for retracts and accumulates to arrive in
>>> order on the next downstream operator. Same with aggs but on the groupBy
>>> key.
>>>
>>> Does this sound correct to you?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>>>
>>>> Hi Rex,
>>>>

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
In addition to those questions, assuming that keyed streams are in order,
I've come up with the following solution to compact our records and only
pick the most recent one per id before sending to the ES sink.

The first item in the Row is the document ID / primary key which we want to
compact records on.

val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
userDocsStream
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

class CompactionAggregate
  extends AggregateFunction[
    (Boolean, Row),
    (Boolean, Row),
    (Boolean, Row)
  ] {

  override def createAccumulator() = (false, null)

  // Just take the latest value to compact.
  override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
    value

  override def getResult(accumulator: (Boolean, Row)) = accumulator

  // This is a required function that we don't use.
  override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
    throw new NotImplementedException()
}

I'm hoping that if the last record in the window is an insert it picks that
if it's a retract then it picks that and then when we send this to the ES
sink we will simply check true or false in the first element of the tuple
for an insert or delete request to ES. Does this seem like it will work?

Thanks!


On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:

> This is great info, thanks!
>
> My question then becomes, what constitutes a random shuffle? Currently
> we're using the Table API with minibatch on flink v1.11.3. Do our joins
> output a keyed stream of records by join key or is this random? I imagine
> that they'd have to have a key for retracts and accumulates to arrive in
> order on the next downstream operator. Same with aggs but on the groupBy
> key.
>
> Does this sound correct to you?
>
> Thanks!
>
> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> indeed these two statements look like they contradict each other, but
>> they are looking at both sides from the same coin.
>> Flink is simply putting records in FIFO in windows. That is, there is no
>> ordering on event time if there are late events. So if your elements arrive
>> ordered, the ordering is retained. If your elements arrive unordered, the
>> same unordered order is retained.
>>
>> However, note that Flink can only guarantee FIFO according to your
>> topology. Consider a source with parallelism 2, each reading data from an
>> ordered kafka partition (k1, k2) respectively. Each partition has records
>> with keys, such that no key appears in both partitions (default behavior if
>> you set keys but no partition while writing to Kafka).
>> 1) Let's assume you do a simple transformation and write them back into
>> kafka with the same key. Then you can be sure that the order of the records
>> is retained.
>>
>> 2) Now you add a random shuffle and have the transformation. Now two
>> successive records may be processed in parallel and there is a race
>> condition who is written first into Kafka. So order is not retained.
>>
>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>> aggregation. Two successive records with the same key will always be
>> processed by the same aggregation operator. So the order is retained for
>> each key (note that this is what you usually want and what Kafka gives you
>> if you don't set the partition explicitly and just provide a key)
>>
>> 4) You shuffle both partitions by a different key. Then two successive
>> Kafka records could be again calculated in parallel such that there is a
>> race condition.
>>
>> Note that windows are a kind of aggregation.
>>
>> So Flink is never going to restore an ordering that is not there (because
>> it's too costly and there are too many unknowns). But you can infer the
>> guarantees by analyzing your topology.
>>
>> ---
>>
>> Please note that there is a common pitfall when you work with Kafka:
>> - Ordering of records in Kafka is only guaranteed if you set
>> *max.in.flight.requests.per.connection* to 1. [1]
>> - Often you also want to set *enable.idempotence* and *acks=all*
>>
>> That is true for the upstream application and if you plan back to write
>> to Kafka you also need to set that in Flink.
>>
>> [1]
>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>
>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> I began reading
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/conce

Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Rex Fenley
Thanks for the clarification.

On Wed, Jan 27, 2021 at 7:24 PM Jark Wu  wrote:

> Hi Rex,
>
> Currently, it is not state compatible, because we will add a new node
> called MiniBatchAssigner after the source which changes the JobGraph , thus
> uid is different.
>
> Best,
> Jark
>
> On Tue, 26 Jan 2021 at 18:33, Dawid Wysakowicz 
> wrote:
>
>> I am pulling in Jark and Godfrey who are more familiar with the internals
>> of the planner.
>> On 21/01/2021 01:43, Rex Fenley wrote:
>>
>> Just tested this and I couldn't restore from a savepoint. If I do a new
>> job from scratch, can I tune the minibatch parameters and restore from a
>> savepoint without having to make yet another brand new job?
>>
>> Thanks
>>
>>
>> On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> Is it safe to convert a non-mini-batch job to a mini-batch job when
>>> restoring from a checkpoint or a savepoint?
>>>
>>> Thanks
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: A few questions about minibatch

2021-01-27 Thread Rex Fenley
Thanks, that all makes sense!

On Wed, Jan 27, 2021 at 7:00 PM Jark Wu  wrote:

> Hi Rex,
>
> Could you share your query here? It would be helpful to identify the root
> cause if we have the query.
>
> 1) watermark
> The framework automatically adds a node (the MiniBatchAssigner) to
> generate watermark events as the mini-batch id to broadcast and trigger
> mini-batch in the pipeline.
>
> 2) MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]
> It generates a new mini-batch id in an interval of 1000ms in system time.
> The mini-batch id is represented by the watermark event.
>
> 3) TWO_PHASE optimization
> If users want to have TWO_PHASE optimization, it requires the aggregate
> functions all support the merge() method and the mini-batch is enabled.
>
> Best,
> Jark
>
>
>
>
> On Tue, 26 Jan 2021 at 19:01, Dawid Wysakowicz 
> wrote:
>
>> I am pulling Jark and Godfrey who are more familiar with the planner
>> internals.
>>
>> Best,
>>
>> Dawid
>> On 22/01/2021 20:11, Rex Fenley wrote:
>>
>> Hello,
>>
>> Does anyone have any more information here?
>>
>> Thanks!
>>
>> On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley  wrote:
>>
>>> Hi,
>>>
>>> Our job was experiencing high write amplification on aggregates so we
>>> decided to give mini-batch a go. There's a few things I've noticed that are
>>> different from our previous job and I would like some clarification.
>>>
>>> 1) Our operators now say they have Watermarks. We never explicitly added
>>> watermarks, and our state is essentially unbounded across all time since it
>>> consumes from Debezium and reshapes our database data into another store.
>>> Why does it say we have Watermarks then?
>>>
>>> 2) In our sources I see MiniBatchAssigner(interval=[1000ms],
>>> mode=[ProcTime], what does that do?
>>>
>>> 3) I don't really see anything else different yet in the shape of our
>>> plan even though we've turned on
>>> configuration.setString(
>>> "table.optimizer.agg-phase-strategy",
>>> "TWO_PHASE"
>>> )
>>> is there a way to check that this optimization is on? We use user
>>> defined aggregate functions, does it work for UDAF?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
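
Collecting the settings discussed in this thread, a sketch of how the
mini-batch and two-phase options are typically set through the TableConfig.
The values are only illustrative and a StreamTableEnvironment named tableEnv
is assumed:

val configuration = tableEnv.getConfig.getConfiguration

// Mini-batch buffering; the allow-latency value is what appears in the plan
// as MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]).
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "1000 ms")
configuration.setString("table.exec.mini-batch.size", "5000")

// Local/global aggregation; it only applies to aggregate functions (including
// UDAFs) that implement merge(), and requires mini-batch to be enabled.
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")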


Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
This is great info, thanks!

My question then becomes, what constitutes a random shuffle? Currently
we're using the Table API with minibatch on flink v1.11.3. Do our joins
output a keyed stream of records by join key or is this random? I imagine
that they'd have to have a key for retracts and accumulates to arrive in
order on the next downstream operator. Same with aggs but on the groupBy
key.

Does this sound correct to you?

Thanks!

On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:

> Hi Rex,
>
> indeed these two statements look like they contradict each other, but they
> are looking at both sides from the same coin.
> Flink is simply putting records in FIFO in windows. That is, there is no
> ordering on event time if there are late events. So if your elements arrive
> ordered, the ordering is retained. If your elements arrive unordered, the
> same unordered order is retained.
>
> However, note that Flink can only guarantee FIFO according to your
> topology. Consider a source with parallelism 2, each reading data from an
> ordered kafka partition (k1, k2) respectively. Each partition has records
> with keys, such that no key appears in both partitions (default behavior if
> you set keys but no partition while writing to Kafka).
> 1) Let's assume you do a simple transformation and write them back into
> kafka with the same key. Then you can be sure that the order of the records
> is retained.
>
> 2) Now you add a random shuffle and have the transformation. Now two
> successive records may be processed in parallel and there is a race
> condition who is written first into Kafka. So order is not retained.
>
> 3) You shuffle both partitions by the Kafka key (keyby) and do some
> aggregation. Two successive records with the same key will always be
> processed by the same aggregation operator. So the order is retained for
> each key (note that this is what you usually want and what Kafka gives you
> if you don't set the partition explicitly and just provide a key)
>
> 4) You shuffle both partitions by a different key. Then two successive
> Kafka records could be again calculated in parallel such that there is a
> race condition.
>
> Note that windows are a kind of aggregation.
>
> So Flink is never going to restore an ordering that is not there (because
> it's too costly and there are too many unknowns). But you can infer the
> guarantees by analyzing your topology.
>
> ---
>
> Please note that there is a common pitfall when you work with Kafka:
> - Ordering of records in Kafka is only guaranteed if you set
> *max.in.flight.requests.per.connection* to 1. [1]
> - Often you also want to set *enable.idempotence* and *acks=all*
>
> That is true for the upstream application and if you plan back to write to
> Kafka you also need to set that in Flink.
>
> [1]
> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>
> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I began reading
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>
>>-
>>
>>*> Redistributing* streams (between *map()* and *keyBy/window*, as
>>well as between *keyBy/window* and *sink*) change the partitioning of
>>streams. Each *operator subtask* sends data to different target
>>subtasks, depending on the selected transformation. Examples are
>>*keyBy()* (re-partitions by hash code), *broadcast()*, or
>>*rebalance()* (random redistribution). In a *redistributing*
>>exchange, order among elements is only preserved for each pair of sending-
>>and receiving task (for example subtask[1] of *map()* and subtask[2]
>>of *keyBy/window*).
>>
>> This makes it sound like ordering on the same partition/key is always
>> maintained. Which is exactly the ordering guarantee that I need. This seems
>> to slightly contradict the statement "Flink provides no guarantees about
>> the order of the elements within a window" for keyed state. So is it true
>> that ordering _is_ guaranteed for identical keys?
>>
>> If I'm not mistaken, the state in the TableAPI is always considered keyed
>> state for a join or aggregate. Or am I missing something?
>>
>> Thanks!
>>
>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley  wrote:
>>
>>> Our data arrives in order from Kafka, so we are hoping to use that same
>>> order for our processing.
>>>
>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley  wrote:
>>>
>>>> Going further, if "Flink provides no guarantees about the order of the
>>

Proctime consistency

2021-01-27 Thread Rex Fenley
Hello,

I'm looking at ways to deduplicate data and found [1], but does proctime
get committed with operators? How does this work against clock skew on
different machines?

Thanks

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
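
Since the deduplication pattern in [1] relies on a processing-time attribute,
a minimal sketch of declaring one as a computed column. The table, columns,
and the datagen connector are placeholders standing in for the real source,
and a StreamTableEnvironment named tableEnv is assumed:

tableEnv.executeSql(
  """
    |CREATE TABLE user_docs (
    |  doc_id BIGINT,
    |  payload STRING,
    |  proctime AS PROCTIME()  -- processing-time attribute used by the dedup query
    |) WITH (
    |  'connector' = 'datagen'
    |)
    |""".stripMargin)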


Re: How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2021-01-26 Thread Rex Fenley
Hello,

I still don't have a good understanding of how UDAF in the Table API
handles deletes. If every row aggregated into one groupBy(key) gets a
retract, meaning nothing should be grouped by that key, will the state get
deleted? Is there a way to delete the state for that row i.e. forward a
retract but not an append and remove the state from RocksDB?

Thanks!

On Fri, Dec 11, 2020 at 9:15 AM Rex Fenley  wrote:

> Hi,
>
> Does this question make sense or am I missing something?
>
> Thanks!
>
> On Thu, Dec 10, 2020 at 10:24 AM Rex Fenley  wrote:
>
>> Ok, that makes sense.
>>
>> > You just need to correct the acc state to what it expects to be (say
>> re-evaluate the acc without the record that needs retraction) when you
>> received  the retraction message.
>>
>> So for example, if I just remove all items from acc.groupIdSet on
>> retraction it will know to clear out the state entirely from rocks?
>>
>> If a user gets deleted altogether (and my groupby is on user_id) what
>> sort of retraction do I need to evaluate then? Because I'm thinking now it
>> will need to just delete the state entirely and pass a full retraction of
>> the state downstream, but I don't know if deleting state from rocks happens
>> automatically or I need to make it do that in the retract method somehow.
>>
>> On Wed, Dec 9, 2020 at 6:16 PM Danny Chan  wrote:
>>
>>> No, the group agg, stream-stream join and rank are all stateful
>>> operators which need a state-backend to bookkeep the acc values.
>>>
>>> But it is only required to emit the retractions when the stateful
>>> operator A has a downstream operator B that is also stateful, because the B
>>> needs the retractions to correct the accs. If B is not stateful, just
>>> emitting the new record to override is enough.
>>>
>>> You just need to correct the acc state to what it expects to be (say
>>> re-evaluate the acc without the record that needs retraction) when you
>>> received  the retraction message.
>>>
>>> Rex Fenley  于2020年12月10日周四 上午2:44写道:
>>>
>>>> So from what I'm understanding, the aggregate itself is not a "stateful
>>>> operator" but one may follow it? How does the aggregate accumulator keep
>>>> old values then? It can't all just live in memory, actually, looking at the
>>>> savepoints it looks like there's state associated with our aggregate
>>>> operator.
>>>>
>>>> To clarify my concern too, in my retract function impl in the aggregate
>>>> function class, all I do is remove a value (a group id) from the
>>>> accumulator set (which is an array). For example, if there is only 1
>>>> group_id left for a user and it gets deleted, that group_id will be removed
>>>> from the accumulator set and the set will be empty. I would hope that at
>>>> that point, given that there are no remaining rows for the aggregate, that
>>>> I could or flink will just delete the associated stored accumulator
>>>> altogether i.e. delete `user_id_1 -> []`. Is it possible that both the
>>>> groups and the user need to be deleted for everything to clear from
>>>> storage? That might make more sense actually..
>>>>
>>>> If this doesn't happen, since users delete themselves and their groups
>>>> all the time, we'll be storing all these empty data sets in rocks for no
>>>> reason. To clarify, we're using Debezium as our source and using Flink as a
>>>> materialization engine, so we never want to explicitly set a timeout on any
>>>> of our data, we just want to scale up predictably with our user growth.
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Dec 9, 2020 at 4:14 AM Danny Chan  wrote:
>>>>
>>>>> Hi, Rex Fenley ~
>>>>>
>>>>> If there is stateful operator as the output of the aggregate function.
>>>>> Then each time the function receives an update (or delete) for the key, 
>>>>> the
>>>>> agg operator would emit 2 messages, one for retracting the old record, one
>>>>> for the new message. For your case, the new message is the DELETE.
>>>>>
>>>>> If there is no stateful operator, the aggregate operator would just
>>>>> emit the update after (the new) message which is the delete.
>>>>>
>>>>> Rex Fenley  于2020年12月9日周三 上午4:30写道:
>>>>>
>>>>>> Hello,
>>>>>

Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Hello,

I began reading
https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows

> *Redistributing* streams (between *map()* and *keyBy/window*, as well as
between *keyBy/window* and *sink*) change the partitioning of streams. Each
*operator subtask* sends data to different target subtasks, depending on the
selected transformation. Examples are *keyBy()* (re-partitions by hash code),
*broadcast()*, or *rebalance()* (random redistribution). In a *redistributing*
exchange, order among elements is only preserved for each pair of sending- and
receiving task (for example subtask[1] of *map()* and subtask[2] of
*keyBy/window*).
This makes it sound like ordering on the same partition/key is always
maintained. Which is exactly the ordering guarantee that I need. This seems
to slightly contradict the statement "Flink provides no guarantees about
the order of the elements within a window" for keyed state. So is it true
that ordering _is_ guaranteed for identical keys?

If I'm not mistaken, the state in the TableAPI is always considered keyed
state for a join or aggregate. Or am I missing something?

Thanks!

On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley  wrote:

> Our data arrives in order from Kafka, so we are hoping to use that same
> order for our processing.
>
> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley  wrote:
>
>> Going further, if "Flink provides no guarantees about the order of the
>> elements within a window" then with minibatch, which I assume uses a window
>> under the hood, any aggregates that expect rows to arrive in order will
>> fail to keep their consistency. Is this correct?
>>
>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>>>
>>> Currently, we have been relentlessly trying to reduce our record
>>> amplification which, when our Elasticsearch index is near fully populated,
>>> completely bottlenecks our write performance. We decided recently to try a
>>> new job using mini-batch. At first this seemed promising but at some point
>>> we began getting huge record amplification in a join operator. It appears
>>> that minibatch may only batch on aggregate operators?
>>>
>>> So we're now thinking that we should have a window before our ES sink
>>> which only takes the last record for any unique document id in the window,
>>> since that's all we really want to send anyway. However, when investigating
>>> turning a table, to a keyed window stream for deduping, and then back into
>>> a table I read the following:
>>>
>>> >Attention Flink provides no guarantees about the order of the elements
>>> within a window. This implies that although an evictor may remove elements
>>> from the beginning of the window, these are not necessarily the ones that
>>> arrive first or last. [1]
>>>
>>> which has put a damper on our investigation.
>>>
>>> I then found the deduplication SQL doc [2], but I have a hard time
>>> parsing what the SQL does and we've never used TemporaryViews or proctime
>>> before.
>>> Is this essentially what we want?
>>> Will just using this SQL be safe for a job that is unbounded and just
>>> wants to deduplicate a document write to whatever the most current one is
>>> (i.e. will restoring from a checkpoint maintain our unbounded consistency
>>> and will deletes work)?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>
>>> Thanks!
>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Our data arrives in order from Kafka, so we are hoping to use that same
order for our processing.

On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley  wrote:

> Going further, if "Flink provides no guarantees about the order of the
> elements within a window" then with minibatch, which I assume uses a window
> under the hood, any aggregates that expect rows to arrive in order will
> fail to keep their consistency. Is this correct?
>
> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>>
>> Currently, we have been relentlessly trying to reduce our record
>> amplification which, when our Elasticsearch index is near fully populated,
>> completely bottlenecks our write performance. We decided recently to try a
>> new job using mini-batch. At first this seemed promising but at some point
>> we began getting huge record amplification in a join operator. It appears
>> that minibatch may only batch on aggregate operators?
>>
>> So we're now thinking that we should have a window before our ES sink
>> which only takes the last record for any unique document id in the window,
>> since that's all we really want to send anyway. However, when investigating
>> turning a table, to a keyed window stream for deduping, and then back into
>> a table I read the following:
>>
>> >Attention Flink provides no guarantees about the order of the elements
>> within a window. This implies that although an evictor may remove elements
>> from the beginning of the window, these are not necessarily the ones that
>> arrive first or last. [1]
>>
>> which has put a damper on our investigation.
>>
>> I then found the deduplication SQL doc [2], but I have a hard time
>> parsing what the SQL does and we've never used TemporaryViews or proctime
>> before.
>> Is this essentially what we want?
>> Will just using this SQL be safe for a job that is unbounded and just
>> wants to deduplicate a document write to whatever the most current one is
>> (i.e. will restoring from a checkpoint maintain our unbounded consistency
>> and will deletes work)?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>
>> Thanks!
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Going further, if "Flink provides no guarantees about the order of the
elements within a window" then with minibatch, which I assume uses a window
under the hood, any aggregates that expect rows to arrive in order will
fail to keep their consistency. Is this correct?

On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley  wrote:

> Hello,
>
> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>
> Currently, we have been relentlessly trying to reduce our record
> amplification which, when our Elasticsearch index is near fully populated,
> completely bottlenecks our write performance. We decided recently to try a
> new job using mini-batch. At first this seemed promising but at some point
> we began getting huge record amplification in a join operator. It appears
> that minibatch may only batch on aggregate operators?
>
> So we're now thinking that we should have a window before our ES sink
> which only takes the last record for any unique document id in the window,
> since that's all we really want to send anyway. However, when investigating
> turning a table, to a keyed window stream for deduping, and then back into
> a table I read the following:
>
> >Attention Flink provides no guarantees about the order of the elements
> within a window. This implies that although an evictor may remove elements
> from the beginning of the window, these are not necessarily the ones that
> arrive first or last. [1]
>
> which has put a damper on our investigation.
>
> I then found the deduplication SQL doc [2], but I have a hard time parsing
> what the SQL does and we've never used TemporaryViews or proctime before.
> Is this essentially what we want?
> Will just using this SQL be safe for a job that is unbounded and just
> wants to deduplicate a document write to whatever the most current one is
> (i.e. will restoring from a checkpoint maintain our unbounded consistency
> and will deletes work)?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>
> Thanks!
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Deduplicating record amplification

2021-01-26 Thread Rex Fenley
Hello,

We have a job from CDC to a large unbounded Flink plan to Elasticsearch.

Currently, we have been relentlessly trying to reduce our record
amplification which, when our Elasticsearch index is near fully populated,
completely bottlenecks our write performance. We decided recently to try a
new job using mini-batch. At first this seemed promising but at some point
we began getting huge record amplification in a join operator. It appears
that minibatch may only batch on aggregate operators?

So we're now thinking that we should have a window before our ES sink which
only takes the last record for any unique document id in the window, since
that's all we really want to send anyway. However, when investigating
turning a table into a keyed window stream for deduping, and then back into
a table, I read the following:

>Attention Flink provides no guarantees about the order of the elements
within a window. This implies that although an evictor may remove elements
from the beginning of the window, these are not necessarily the ones that
arrive first or last. [1]

which has put a damper on our investigation.

I then found the deduplication SQL doc [2], but I have a hard time parsing
what the SQL does and we've never used TemporaryViews or proctime before.
Is this essentially what we want?
Will just using this SQL be safe for a job that is unbounded and just wants
to deduplicate a document write to whatever the most current one is (i.e.
will restoring from a checkpoint maintain our unbounded consistency and
will deletes work)?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
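For readers landing on the same question, below is a minimal sketch of the deduplication pattern that doc [2] describes, adapted to this scenario. Everything in it is an assumption for illustration: a StreamTableEnvironment named tableEnv, an input view named documents with columns doc_id and payload, and a processing-time attribute proc_time. It keeps only the latest row seen per doc_id.

val deduped = tableEnv.sqlQuery(
  """
    |SELECT doc_id, payload
    |FROM (
    |  SELECT *,
    |         ROW_NUMBER() OVER (
    |           PARTITION BY doc_id
    |           ORDER BY proc_time DESC
    |         ) AS row_num
    |  FROM documents
    |) WHERE row_num = 1
    |""".stripMargin)
// register the deduplicated result so it can feed the sink
tableEnv.createTemporaryView("documents_deduped", deduped)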

Thanks!


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Rex Fenley
Oddly, I'm seeing them now. I'm not sure what has changed. Fwiw, we have
modified the scopes per
https://docs.datadoghq.com/integrations/flink/#metric-collection but their
modifications drop the ids as tags. We do need to modify them according to that
documentation - "*Note*: The system scopes must be remapped for your Flink
metrics to be supported, otherwise they are submitted as custom metrics."
Could we instead add host and ids as tags to our metrics?

Thanks for your help!

On Tue, Jan 26, 2021 at 2:49 PM Chesnay Schepler  wrote:

> It is good to know that something from the task executors arrives at
> datadog.
>
> Do you see any metrics for a specific job, like the numRestarts metric of
> the JobManager?
>
> Are you using the default scope formats
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#system-scope>,
> or have you modified them?
> Could you try these instead and report back? (I replaced all
> job/task/operator names with their IDs, in case some special character is
> messing with datadog)
>
> metrics.scope.jm: <host>.jobmanager
> metrics.scope.jm.job: <host>.jobmanager.<job_id>
> metrics.scope.tm: <host>.taskmanager.<tm_id>
> metrics.scope.tm.job: <host>.taskmanager.<tm_id>.<job_id>
> metrics.scope.task:
> <host>.taskmanager.<tm_id>.<job_id>.<task_id>.<subtask_index>
> metrics.scope.operator:
> <host>.taskmanager.<tm_id>.<job_id>.<operator_id>.<subtask_index>
>
>
> On 1/26/2021 9:28 PM, Rex Fenley wrote:
>
> All taskmanager and jobmanager logs show up. Anything specific to an
> operator does not.
> For example, flink.taskmanager.Status.JVM.Memory.Heap.Used shows up, but I
> can't see stats on an individual operator.
>
> I mostly followed a combination of
> https://docs.datadoghq.com/integrations/flink/#metric-collection and
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
> since datadog's documentation was slightly out of date.
>
> Thanks
>
> On Tue, Jan 26, 2021 at 10:28 AM Chesnay Schepler 
> wrote:
>
>> Anything metric that is shown in the Flink UI should also appear in
>> DataDog.
>> If this is not the case then something goes wrong within the reporter.
>>
>> Is there anything suspicious in the Flink logs?
>>
>> Can you give some example of metrics that *do* show up in DataDog?
>>
>> On 1/26/2021 6:32 PM, Rex Fenley wrote:
>>
>> Hi,
>>
>> I need to get a deeper dive into how rocks is performing so I turned on
>> Rocks Native Metrics. However, I don't see any of the metrics in DataDog
>> (though I have other Flink metrics in DataDog). I only see rocks metrics in
>> the operator metrics in Flink UI, and unfortunately I can't really zoom in
>> or out of those metrics or compare against multiple operators at a time
>> which makes it really difficult to get an overview of how rocks is doing.
>>
>> Is there any way to get the Rocks Native Metrics forwarded over to
>> DataDog?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Rex Fenley
All taskmanager and jobmanager logs show up. Anything specific to an
operator does not.
For example, flink.taskmanager.Status.JVM.Memory.Heap.Used shows up, but I
can't see stats on an individual operator.

I mostly followed a combination of
https://docs.datadoghq.com/integrations/flink/#metric-collection and
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
since datadog's documentation was slightly out of date.

Thanks

On Tue, Jan 26, 2021 at 10:28 AM Chesnay Schepler 
wrote:

> Anything metric that is shown in the Flink UI should also appear in
> DataDog.
> If this is not the case then something goes wrong within the reporter.
>
> Is there anything suspicious in the Flink logs?
>
> Can you give some example of metrics that *do* show up in DataDog?
>
> On 1/26/2021 6:32 PM, Rex Fenley wrote:
>
> Hi,
>
> I need to get a deeper dive into how rocks is performing so I turned on
> Rocks Native Metrics. However, I don't see any of the metrics in DataDog
> (though I have other Flink metrics in DataDog). I only see rocks metrics in
> the operator metrics in Flink UI, and unfortunately I can't really zoom in
> or out of those metrics or compare against multiple operators at a time
> which makes it really difficult to get an overview of how rocks is doing.
>
> Is there any way to get the Rocks Native Metrics forwarded over to
> DataDog?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Seeing Rocks Native Metrics in Data Dog

2021-01-26 Thread Rex Fenley
Hi,

I need to get a deeper dive into how rocks is performing so I turned on
Rocks Native Metrics. However, I don't see any of the metrics in DataDog
(though I have other Flink metrics in DataDog). I only see rocks metrics in
the operator metrics in Flink UI, and unfortunately I can't really zoom in
or out of those metrics or compare against multiple operators at a time
which makes it really difficult to get an overview of how rocks is doing.
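For context, a sketch of what turning on the RocksDB native metrics usually looks like: they are individual switches in the cluster configuration (flink-conf.yaml), one per metric. The keys below are a small, illustrative subset of the available options:

state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.cur-size-active-mem-table: true

Each enabled switch is reported through the operator-level metric group (per column family), which is why they show up under operator metrics rather than as plain task- or job-manager metrics.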

Is there any way to get the Rocks Native Metrics forwarded over to
DataDog?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: A few questions about minibatch

2021-01-22 Thread Rex Fenley
Hello,

Does anyone have any more information here?

Thanks!

On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley  wrote:

> Hi,
>
> Our job was experiencing high write amplification on aggregates so we
> decided to give mini-batch a go. There's a few things I've noticed that are
> different from our previous job and I would like some clarification.
>
> 1) Our operators now say they have Watermarks. We never explicitly added
> watermarks, and our state is essentially unbounded across all time since it
> consumes from Debezium and reshapes our database data into another store.
> Why does it say we have Watermarks then?
>
> 2) In our sources I see MiniBatchAssigner(interval=[1000ms],
> mode=[ProcTime]), what does that do?
>
> 3) I don't really see anything else different yet in the shape of our plan
> even though we've turned on
> configuration.setString(
> "table.optimizer.agg-phase-strategy",
> "TWO_PHASE"
> )
> is there a way to check that this optimization is on? We use user defined
> aggregate functions, does it work for UDAF?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


A few questions about minibatch

2021-01-20 Thread Rex Fenley
Hi,

Our job was experiencing high write amplification on aggregates so we
decided to give mini-batch a go. There's a few things I've noticed that are
different from our previous job and I would like some clarification.

1) Our operators now say they have Watermarks. We never explicitly added
watermarks, and our state is essentially unbounded across all time since it
consumes from Debezium and reshapes our database data into another store.
Why does it say we have Watermarks then?

2) In our sources I see MiniBatchAssigner(interval=[1000ms],
mode=[ProcTime]), what does that do?

3) I don't really see anything else different yet in the shape of our plan
even though we've turned on
configuration.setString(
"table.optimizer.agg-phase-strategy",
"TWO_PHASE"
)
is there a way to check that this optimization is on? We use user defined
aggregate functions, does it work for UDAF?
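For reference, a sketch of the full mini-batch setup from the Flink performance-tuning docs, shown next to the snippet above for comparison (assuming a StreamTableEnvironment named tableEnv; the values are only illustrative):

val configuration = tableEnv.getConfig.getConfiguration
// enable mini-batch buffering and give it a latency/size budget
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "1 s")
configuration.setString("table.exec.mini-batch.size", "5000")
// split aggregations into a local and a global phase
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")

Per those docs, the two-phase (local-global) rewrite generally only kicks in when mini-batch is enabled and the aggregate function, including a user-defined one, implements merge(), which is one way to tell whether it can apply to a given UDAF.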

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-20 Thread Rex Fenley
Just tested this and I couldn't restore from a savepoint. If I do a new job
from scratch, can I tune the minibatch parameters and restore from a
savepoint without having to make yet another brand new job?

Thanks


On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:

> Hello,
>
> Is it safe to convert a non-mini-batch job to a mini-batch job when
> restoring from a checkpoint or a savepoint?
>
> Thanks
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-20 Thread Rex Fenley
Hello,

Is it safe to convert a non-mini-batch job to a mini-batch job when
restoring from a checkpoint or a savepoint?

Thanks

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

2021-01-20 Thread Rex Fenley
Thanks!

On Tue, Jan 19, 2021 at 9:47 PM Piotr Nowojski 
wrote:

> Hi Rex,
>
> Sorry, I might have misled you. I think you were right in your previous
> email
>
> > So from the sounds of things, regardless of the consumer group's
> offsets, it will always start from a checkpoint or savepoints offsets if
> there are some (unless checkpointing offsets is turned off).
> >
> > Is this interpretation correct?
>
> I think this is correct. `setStartFromGroupOffsets` and other `setStart*`
> variants take effect only if there are no offsets stored in the state. I
> would suggest you try it out regardless.
>
> If you want to duplicate a job for some testing, each of the duplicated
> jobs will have it's own sets of offsets and they will read records
> independently, but starting from the same starting point (when the job was
> duplicated).
>
> Piotrek
>
> wt., 19 sty 2021 o 20:19 Rex Fenley  napisał(a):
>
>> Thank you,
>>
>> That's unfortunate, because I imagine we often will want to duplicate a
>> job in order to do some testing out-of-bound from the normal job while
>> slightly tweaking / tuning things. Is there any way to transfer offsets
>> between consumer groups?
>>
>> On Tue, Jan 19, 2021 at 5:45 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> > I read this as, "The offsets committed to Kafka are ignored, the
>>> offsets committed within a checkpoint are used".
>>>
>>> yes, exactly
>>>
>>> > So from the sounds of things, regardless of the consumer group's
>>> offsets, it will always start from a checkpoint or savepoints offsets if
>>> there are some (unless checkpointing offsets is turned off).
>>>
>>> Yes. But, keep in mind this part:
>>>
>>> > setStartFromGroupOffsets (default behaviour): Start reading partitions
>>> from the consumer group’s (group.id setting in the consumer properties)
>>> committed offsets in Kafka brokers.* If offsets could not be found for
>>> a partition, the auto.offset.reset setting in the properties will be used.*
>>>
>>> As I understand it, if you are using the default
>>> `setStartFromGroupOffsets`, and you happen to change `group.id` (which
>>> is what I believe you were asking about in the first e-mail), after
>>> changing the `group.id` FlinkKafkaConsumer will not be able to found
>>> previously saved offsets in the Flink's state and it will start reading
>>> from completely new set of offsets. The same way as if this would be a
>>> freshly started new job without any state. Those new offsets would be as
>>> specified/defined via `auto.offset.reset`.
>>>
>>> Piotrek
>>>
>>>
>>> pon., 18 sty 2021 o 18:12 Rex Fenley  napisał(a):
>>>
>>>> Thank you,
>>>>
>>>> Some parts that stick out
>>>> >The Flink Kafka Consumer allows configuring the behaviour of how
>>>> offsets are committed back to Kafka brokers. Note that the Flink Kafka
>>>> Consumer does not rely on the committed offsets for fault tolerance
>>>> guarantees. The committed offsets are only a means to expose the consumer’s
>>>> progress for monitoring purposes.
>>>>
>>>> I read this as, "The offsets committed to Kafka are ignored, the
>>>> offsets committed within a checkpoint are used".
>>>>
>>>> >With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>> offsets, together with the state of other operations. In case of a job
>>>> failure, Flink will restore the streaming program to the state of the
>>>> latest checkpoint and re-consume the records from Kafka, starting from the
>>>> offsets that were stored in the checkpoint.
>>>>
>>>> This seems to say something similar.
>>>>
>>>> So from the sounds of things, regardless of the consumer group's
>>>> offsets, it will always start from a checkpoint or savepoints offsets if
>>>> there are some (unless checkpointing offsets is turned off).
>>>>
>>>> Is this interpretation correct?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski 
>>>> wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> I believe this section answers your que

Re: What is checkpoint start delay?

2021-01-19 Thread Rex Fenley
Ok, this makes sense. I'm guessing loading state from S3 into RocksDB is a
large contributor to start delay then.

Thanks!

On Tue, Jan 19, 2021 at 12:16 PM Piotr Nowojski 
wrote:

> Hi Rex,
>
> start delay is not the same as the alignment time. Start delay is the time
> between creation of the checkpoint barrier and the time a task/subtask sees
> a first checkpoint barrier from any of its inputs. Alignment time is the
> time between receiving the first checkpoint barrier on a given subtask and
> the last one. In other words,
>
> start of the checkpoint TS (on JobManager) + start delay on subtask =
> start of the checkpoint TS (on TaskManager)
> start of the checkpoint TS (on TaskManager) + alignment time on subtask =
> end of the checkpoint TS (on TaskManager)
>
> Maybe something in your job must ramp up and record throughput is slower
> during this time, causing higher back pressure, which in turns is causing
> longer checkpointing time for the first checkpoint after recovery. Maybe
> RocksDB is needs to load it's state from disks.
>
> Piotrek
>
> wt., 19 sty 2021 o 20:11 Rex Fenley  napisał(a):
>
>> Thanks for the input.
>>
>> This seems odd though: if start delay is the same as alignment, then (1)
>> why is it only ever prominent right after recovering from a
>> checkpoint? (2) Why is the first checkpoint during the recovery process 10x
>> as long as every other checkpoint? Something else must be going on that's
>> in addition to the normal alignment process.
>>
>> On Tue, Jan 19, 2021 at 8:14 AM Piotr Nowojski 
>> wrote:
>>
>>> Hey Rex,
>>>
>>> What do you mean by "Start Delay" when recovering from a checkpoint? Did
>>> you mean when taking a checkpoint? If so:
>>>
>>> 1. https://www.google.com/search?q=flink+checkpoint+start+delay
>>> 2. top 3 result (at least for me)
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
>>> > Start Delay: The time it took for the first checkpoint barrier to
>>> reach this subtasks since the checkpoint barrier has been created.
>>>
>>> 3. https://www.google.com/search?q=flink+checkpoint+barrier
>>> 4. top 2 result (at least for me)
>>> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/stateful-stream-processing.html#barriers
>>> > A core element in Flink’s distributed snapshotting are the stream
>>> barriers. These barriers are injected into the data stream and flow with
>>> the records as part of the data stream.
>>>
>>> Long start delay or alignment time means checkpoint barriers are
>>> propagating slowly through the job graph, usually a symptom of a
>>> back-pressure. It's best to solve the back-pressure problem, via optimising
>>> your job or scaling it up.
>>>
>>> Alternatively you could use unaligned checkpoints [1], at a cost of
>>> larger checkpoint size and higher IO usage. Note here that if you are using
>>> Flink 1.12.x, I would refrain from using unaligned checkpoints on the
>>> production because of some bugs [2] that we are fixing right now. On Flink
>>> 1.11.x it should be fine.
>>>
>>> Cheers,
>>> Piotrek
>>>
>>> [1]
>>> https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html
>>> [2] https://issues.apache.org/jira/browse/FLINK-20654
>>>
>>>
>>>
>>> pon., 18 sty 2021 o 21:32 Rex Fenley  napisał(a):
>>>
>>>> Hello,
>>>>
>>>> When we are recovering on a checkpoint it will take multiple minutes.
>>>> The time is usually taken by "Start Delay". What is Start Delay and how can
>>>> we optimize for it?
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

2021-01-19 Thread Rex Fenley
Thank you,

That's unfortunate, because I imagine we often will want to duplicate a job
in order to do some testing out-of-bound from the normal job while slightly
tweaking / tuning things. Is there any way to transfer offsets between
consumer groups?

On Tue, Jan 19, 2021 at 5:45 AM Piotr Nowojski  wrote:

> Hi,
>
> > I read this as, "The offsets committed to Kafka are ignored, the offsets
> committed within a checkpoint are used".
>
> yes, exactly
>
> > So from the sounds of things, regardless of the consumer group's
> offsets, it will always start from a checkpoint or savepoints offsets if
> there are some (unless checkpointing offsets is turned off).
>
> Yes. But, keep in mind this part:
>
> > setStartFromGroupOffsets (default behaviour): Start reading partitions
> from the consumer group’s (group.id setting in the consumer properties)
> committed offsets in Kafka brokers.* If offsets could not be found for a
> partition, the auto.offset.reset setting in the properties will be used.*
>
> As I understand it, if you are using the default
> `setStartFromGroupOffsets`, and you happen to change `group.id` (which is
> what I believe you were asking about in the first e-mail), after changing
> the `group.id` FlinkKafkaConsumer will not be able to found previously
> saved offsets in the Flink's state and it will start reading from
> completely new set of offsets. The same way as if this would be a freshly
> started new job without any state. Those new offsets would be as
> specified/defined via `auto.offset.reset`.
>
> Piotrek
>
>
> pon., 18 sty 2021 o 18:12 Rex Fenley  napisał(a):
>
>> Thank you,
>>
>> Some parts that stick out
>> >The Flink Kafka Consumer allows configuring the behaviour of how offsets
>> are committed back to Kafka brokers. Note that the Flink Kafka Consumer
>> does not rely on the committed offsets for fault tolerance guarantees. The
>> committed offsets are only a means to expose the consumer’s progress for
>> monitoring purposes.
>>
>> I read this as, "The offsets committed to Kafka are ignored, the offsets
>> committed within a checkpoint are used".
>>
>> >With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>> consume records from a topic and periodically checkpoint all its Kafka
>> offsets, together with the state of other operations. In case of a job
>> failure, Flink will restore the streaming program to the state of the
>> latest checkpoint and re-consume the records from Kafka, starting from the
>> offsets that were stored in the checkpoint.
>>
>> This seems to say something similar.
>>
>> So from the sounds of things, regardless of the consumer group's offsets,
>> it will always start from a checkpoint or savepoints offsets if there are
>> some (unless checkpointing offsets is turned off).
>>
>> Is this interpretation correct?
>>
>> Thanks!
>>
>>
>> On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Rex,
>>>
>>> I believe this section answers your question [1]
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> pon., 18 sty 2021 o 09:00 赵一旦  napisał(a):
>>>
>>>> If you changed the consumer group in your new job, the group id will be
>>>> the new one you set.
>>>> The job will continue to consumer the topics from the
>>>> savepoint/checkpoint you specified no matter whether the group id is the
>>>> original one?
>>>>
>>>> Rex Fenley  于2021年1月18日周一 下午12:53写道:
>>>>
>>>>> Hello,
>>>>>
>>>>> When using the Kafka consumer connector, if we restore from a
>>>>> checkpoint or savepoint using a differently named consumer group than the
>>>>> one we originally ran a job with will it still pick up exactly where it
>>>>> left off or are you locked into using the same consumer group as before?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Why use ListView?

2021-01-19 Thread Rex Fenley
Thanks!

On Tue, Jan 19, 2021 at 12:55 AM Timo Walther  wrote:

> As always, this depends on the use case ;-)
>
> In general, you should not get a performance regression in using them.
> But keep in mind that ListViews/MapViews cannot be backed by a state
> backend in every operator, so sometimes they are represented as
> List/Maps on heap.
>
> Regards,
> Timo
>
>
> On 18.01.21 18:28, Rex Fenley wrote:
> > Fascinating, do you have an estimate of what qualifies as a lot of data
> > and therefore when this should be used?
> >
> > Thanks
> >
> > On Mon, Jan 18, 2021 at 12:35 AM Timo Walther  > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Rex,
> >
> > ListView and MapView have been part of Flink for years. However, they
> > were considered as an internal feature and therefore not well
> > documented. MapView is used internally to make distinct aggregates
> work.
> >
> > Because we reworked the type inference of aggregate functions, we
> also
> > added basic documentation for power users.
> >
> > By default an accumulator will be deserialized from state on every
> > access. ListView and MapView are not deserialized entirely on access
> > but
> > delegate directly to a state backend. Thus, only the key that is
> > accessed is deserialized. So if an accumulator stores a lot of data,
> it
> > might be beneficial to use the mentioned abstractions.
> >
> > Regards,
> > Timo
> >
> >
> > On 16.01.21 20:09, Rex Fenley wrote:
> >  > Hello,
> >  >
> >  > In the recent version of Flink docs I read the following [1]:
> >  >  > If an accumulator needs to store large amounts of data,
> >  > |org.apache.flink.table.api.dataview.ListView| and
> >  > |org.apache.flink.table.api.dataview.MapView| provide advanced
> > features
> >  > for leveraging Flink’s state backends in unbounded data scenarios.
> >  > Please see the docs of the corresponding classes for more
> > information
> >  > about this advanced feature.
> >  >
> >  > Our job has unbounded state from Debezium/Kafka, uses RocksDB,
> > and we
> >  > have a number of Aggregators like the following, which group a
> > set of
> >  > ids by some foreign key "group_id". The sets are usually 10-100
> > ids in
> >  > size, but at the largest the sets could at some theoretical point
> > get
> >  > into the tens of thousands of ids (right now largest sets are
> > ~2000 ids).
> >  >
> >  > table.groupBy($"group_id")
> >  >   .aggregate(
> >  >     new IDsAgg()(
> >  >       $"member_id"
> >  >     ) as ("member_ids")
> >  >   )
> >  >   .select($"group_id", $"member_ids")
> >  >
> >  > case class IDsAcc(
> >  >   var IDs: mutable.Set[Long]
> >  > )
> >  > class IDsAgg extends AggregateFunction[Row, IDsAcc] {
> >  >
> >  >   override def createAccumulator(): IDsAcc =
> >  >     IDsAcc(mutable.Set())
> >  >
> >  >   def accumulate(
> >  >     acc: IDsAcc,
> >  >     ID: Long
> >  >   ): Unit = {
> >  >     acc.IDs.add(ID)
> >  >   }
> >  >
> >  >   def retract(acc: IDsAcc, ID: Long): Unit = {
> >  >     acc.IDs.remove(ID)
> >  >   }
> >  >
> >  >   def resetAccumulator(acc: IDsAcc): Unit = {
> >  >     acc.IDs = mutable.Set()
> >  >   }
> >  >
> >  >   override def getValue(acc: IDsAcc): Row = {
> >  >     Row.of(acc.IDs.toArray)
> >  >   }
> >  >
> >  >   override def getResultType: TypeInformation[Row] = {
> >  >     new RowTypeInfo(
> >  >       createTypeInformation[Array[Long]]
> >  >     )
> >  >   }
> >  > }
> >  >
> >  > I read the docs [2] but I don't see it really say anything about
> why
> >  > ListView is better than just using a Set or Array.
> >  > If we were to move from a Set to a ListView what advantages might
> > we see
> >  > in these Aggregates?
> >  >
> >  > I also noticed that ListView existed in 1.11 (we're on 1.11.2),
> > did we
> >  > simply miss this feature? Does it work for 1

Re: What is checkpoint start delay?

2021-01-19 Thread Rex Fenley
Thanks for the input.

This seems odd though: if start delay is the same as alignment, then (1) why
is it only ever prominent right after recovering from a checkpoint?
(2) Why is the first checkpoint during the recovery process 10x as long as
every other checkpoint? Something else must be going on that's in addition
to the normal alignment process.
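For reference, the unaligned checkpoints suggested in the reply quoted below are a one-line switch (a sketch, assuming a StreamExecutionEnvironment named env and Flink 1.11+):

env.getCheckpointConfig.enableUnalignedCheckpoints() // barriers can overtake buffered records, at the cost of larger checkpoints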

On Tue, Jan 19, 2021 at 8:14 AM Piotr Nowojski  wrote:

> Hey Rex,
>
> What do you mean by "Start Delay" when recovering from a checkpoint? Did
> you mean when taking a checkpoint? If so:
>
> 1. https://www.google.com/search?q=flink+checkpoint+start+delay
> 2. top 3 result (at least for me)
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
> > Start Delay: The time it took for the first checkpoint barrier to reach
> this subtasks since the checkpoint barrier has been created.
>
> 3. https://www.google.com/search?q=flink+checkpoint+barrier
> 4. top 2 result (at least for me)
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/stateful-stream-processing.html#barriers
> > A core element in Flink’s distributed snapshotting are the stream
> barriers. These barriers are injected into the data stream and flow with
> the records as part of the data stream.
>
> Long start delay or alignment time means checkpoint barriers are
> propagating slowly through the job graph, usually a symptom of a
> back-pressure. It's best to solve the back-pressure problem, via optimising
> your job or scaling it up.
>
> Alternatively you could use unaligned checkpoints [1], at a cost of larger
> checkpoint size and higher IO usage. Note here that if you are using Flink
> 1.12.x, I would refrain from using unaligned checkpoints on the production
> because of some bugs [2] that we are fixing right now. On Flink 1.11.x it
> should be fine.
>
> Cheers,
> Piotrek
>
> [1]
> https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html
> [2] https://issues.apache.org/jira/browse/FLINK-20654
>
>
>
> pon., 18 sty 2021 o 21:32 Rex Fenley  napisał(a):
>
>> Hello,
>>
>> When we are recovering on a checkpoint it will take multiple minutes. The
>> time is usually taken by "Start Delay". What is Start Delay and how can we
>> optimize for it?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


What is checkpoint start delay?

2021-01-18 Thread Rex Fenley
Hello,

When we are recovering on a checkpoint it will take multiple minutes. The
time is usually taken by "Start Delay". What is Start Delay and how can we
optimize for it?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Elasticsearch config maxes can not be disabled

2021-01-18 Thread Rex Fenley
Will do. Thank you for digging in and finding this!

On Mon, Jan 18, 2021 at 3:01 AM Dawid Wysakowicz 
wrote:

> I've checked it again, with ES 7 this time (I missed that previously and
> checked with ES 6) and actually you're right it fails for value -1. I
> created a JIRA[1] to fix it. Until this is merged unfortunately, I can't
> think of a way to actually disable it.
>
> For a future reference. It would be really really nice if you could
> provide us with a full stack trace instead of just the message, as it is
> much much easier to track back the problem that way.
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-21009
> On 15/01/2021 21:28, Rex Fenley wrote:
>
> Yes, use the same SQL and change '0's to '-1'. We received "Caused by:
> java.lang.IllegalArgumentException: Could not parse value '-1' for key
> 'sink.bulk-flush.max-size'."
>
> On Fri, Jan 15, 2021 at 6:04 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Rex,
>>
>> As I said in my previous email the documentation for
>> sink.bulk-flush.max-actions is wrong. You should be able to disable it with
>> -1. I've just checked it on the 1.11.2 tag and it seems to be working just
>> fine with:
>>
>> CREATE TABLE esTable (
>>
>> a BIGINT NOT NULL,
>> b TIME,
>> c STRING NOT NULL,
>> d FLOAT,
>> e TINYINT NOT NULL,
>> f DATE,
>> g TIMESTAMP NOT NULL,
>> h as a + 2,
>> PRIMARY KEY (a, g) NOT ENFORCED
>> )
>> WITH (
>> 'connector'='elasticsearch-6',
>> 'index'='table-api',
>> 'document-type'='MyType',
>> 'hosts'='http://127.0.0.1:9200',
>> 'sink.flush-on-checkpoint'='false',
>> 'sink.bulk-flush.max-actions'='-1',
>> 'sink.bulk-flush.max-size'='0'
>> )
>>
>> If it still does not work for you with -1 could you share an example how
>> I can reproduce the problem.
>>
>> Best,
>>
>> Dawid
>> On 14/01/2021 18:08, Rex Fenley wrote:
>>
>> Flink 1.11.2
>>
>> CREATE TABLE sink_es (
>> ...
>> ) WITH (
>> 'connector' = 'elasticsearch-7',
>> 'hosts' = '${sys:proxyEnv.ELASTICSEARCH_HOSTS}',
>> 'index' = '${sys:graph.flink.index_name}',
>> 'format' = 'json',
>> 'sink.bulk-flush.max-actions' = '0',
>> 'sink.bulk-flush.max-size' = '0',
>> 'sink.bulk-flush.interval' = '1s',
>> 'sink.bulk-flush.backoff.delay' = '1s',
>> 'sink.bulk-flush.backoff.max-retries' = '4',
>> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>> )
>>
>> On Thu, Jan 14, 2021 at 4:16 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi,
>>>
>>> First of all, what Flink versions are you using?
>>>
>>> You are right it is a mistake in the documentation of the
>>> sink.bulk-flush.max-actions. It should say: Can be set to '-1' to
>>> disable it. I created a ticket[1] to track that. And as far as I can tell
>>> and I quickly checked that it should work. As for the
>>> sink.bulk-flush.max-size you should be able to disable it with a value of
>>> '0'.
>>>
>>> Could you share with us how do you use the connector? Could you also
>>> share the full stack trace for the exception you're getting? Are you
>>> creating the table with a CREATE statement?
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-20979
>>> On 13/01/2021 20:10, Rex Fenley wrote:
>>>
>>> Hello,
>>>
>>> It doesn't seem like we can disable max actions and max size for
>>> Elasticsearch connector.
>>>
>>> Docs:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
>>> sink.bulk-flush.max-actions optional 1000 Integer Maximum number of
>>> buffered actions per bulk request. Can be set to '0' to disable it.
>>> sink.bulk-flush.max-size optional 2mb MemorySize Maximum size in memory
>>> of buffered actions per bulk request. Must be in MB granularity. Can be set
>>> to '0' to disable it.
>>> Rea

Re: Why use ListView?

2021-01-18 Thread Rex Fenley
Fascinating, do you have an estimate of what qualifies as a lot of data and
therefore when this should be used?

Thanks
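For anyone weighing this up, below is an untested sketch of what the IDsAgg accumulator quoted further down might look like if it were backed by a ListView, so the ids live in the state backend instead of being fully deserialized on every access. Everything here is illustrative: the result type is simplified from the original Row to a plain array, the planner may require explicit type information for the accumulator, and a ListView (unlike the original Set) does not deduplicate, so a MapView keyed by id would be the closer equivalent of a set.

import org.apache.flink.table.api.dataview.ListView
import org.apache.flink.table.functions.AggregateFunction

// Accumulator holding the ids in a state-backend-backed view rather than a heap collection.
case class IDsViewAcc(var ids: ListView[java.lang.Long] = new ListView[java.lang.Long]())

class IDsViewAgg extends AggregateFunction[Array[java.lang.Long], IDsViewAcc] {

  override def createAccumulator(): IDsViewAcc = IDsViewAcc()

  def accumulate(acc: IDsViewAcc, id: java.lang.Long): Unit = acc.ids.add(id)

  def retract(acc: IDsViewAcc, id: java.lang.Long): Unit = acc.ids.remove(id)

  override def getValue(acc: IDsViewAcc): Array[java.lang.Long] = {
    import scala.collection.JavaConverters._
    acc.ids.get.asScala.toArray
  }
}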

On Mon, Jan 18, 2021 at 12:35 AM Timo Walther  wrote:

> Hi Rex,
>
> ListView and MapView have been part of Flink for years. However, they
> were considered as an internal feature and therefore not well
> documented. MapView is used internally to make distinct aggregates work.
>
> Because we reworked the type inference of aggregate functions, we also
> added basic documentation for power users.
>
> By default an accumulator will be deserialized from state on every
> access. ListView and MapView are not deserialized entirely on access but
> delegate directly to a state backend. Thus, only the key that is
> accessed is deserialized. So if an accumulator stores a lot of data, it
> might be beneficial to use the mentioned abstractions.
>
> Regards,
> Timo
>
>
> On 16.01.21 20:09, Rex Fenley wrote:
> > Hello,
> >
> > In the recent version of Flink docs I read the following [1]:
> >  > If an accumulator needs to store large amounts of data,
> > |org.apache.flink.table.api.dataview.ListView| and
> > |org.apache.flink.table.api.dataview.MapView| provide advanced features
> > for leveraging Flink’s state backends in unbounded data scenarios.
> > Please see the docs of the corresponding classes for more information
> > about this advanced feature.
> >
> > Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we
> > have a number of Aggregators like the following, which group a set of
> > ids by some foreign key "group_id". The sets are usually 10-100 ids in
> > size, but at the largest the sets could at some theoretical point get
> > into the tens of thousands of ids (right now largest sets are ~2000 ids).
> >
> > table.groupBy($"group_id")
> >   .aggregate(
> >     new IDsAgg()(
> >       $"member_id"
> >     ) as ("member_ids")
> >   )
> >   .select($"group_id", $"member_ids")
> >
> > case class IDsAcc(
> >   var IDs: mutable.Set[Long]
> > )
> > class IDsAgg extends AggregateFunction[Row, IDsAcc] {
> >
> >   override def createAccumulator(): IDsAcc =
> >     IDsAcc(mutable.Set())
> >
> >   def accumulate(
> >     acc: IDsAcc,
> >     ID: Long
> >   ): Unit = {
> >     acc.IDs.add(ID)
> >   }
> >
> >   def retract(acc: IDsAcc, ID: Long): Unit = {
> >     acc.IDs.remove(ID)
> >   }
> >
> >   def resetAccumulator(acc: IDsAcc): Unit = {
> >     acc.IDs = mutable.Set()
> >   }
> >
> >   override def getValue(acc: IDsAcc): Row = {
> >     Row.of(acc.IDs.toArray)
> >   }
> >
> >   override def getResultType: TypeInformation[Row] = {
> >     new RowTypeInfo(
> >       createTypeInformation[Array[Long]]
> >     )
> >   }
> > }
> >
> > I read the docs [2] but I don't see it really say anything about why
> > ListView is better than just using a Set or Array.
> > If we were to move from a Set to a ListView what advantages might we see
> > in these Aggregates?
> >
> > I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we
> > simply miss this feature? Does it work for 1.11.x too?
> >
> > Thanks!
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
> >
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
> > <
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
> >
> >
> > --
> >
> > Rex Fenley|Software Engineer - Mobile and Backend
> >
> >
> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
> >
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Flink ID hashing

2021-01-18 Thread Rex Fenley
This is great info. Looks like it uses murmur hash below the surface too
[1].

Thanks!

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76
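For illustration, a condensed Scala sketch of the assignment logic that [1] points at (the real code lives in KeyGroupRangeAssignment together with MathUtils.murmurHash; the wrapper names below are just for readability, and maxParallelism is the job's configured maximum parallelism):

import org.apache.flink.util.MathUtils

// A key is murmur-hashed into one of maxParallelism key groups...
def assignToKeyGroup(key: Any, maxParallelism: Int): Int =
  MathUtils.murmurHash(key.hashCode()) % maxParallelism

// ...and contiguous key-group ranges are then spread across the operator's subtasks.
def operatorIndexForKeyGroup(maxParallelism: Int, parallelism: Int, keyGroupId: Int): Int =
  keyGroupId * parallelism / maxParallelism

def assignKeyToParallelOperator(key: Any, maxParallelism: Int, parallelism: Int): Int =
  operatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism))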

On Mon, Jan 18, 2021 at 1:38 AM Timo Walther  wrote:

> Hi Rex,
>
> for questions like this, I would recommend to checkout the source code
> as well.
>
> Search for subclasses of `StreamPartitioner`. For example, for keyBy
> Flink uses:
>
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
>
> which uses
>
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
>
>
> Flink tries to avoid redistribution. Basically redistribution only
> occurs when performing a GROUP BY or when having operators with
> different parallelism. For Table API and SQL, you can print the
> shuffling steps via `Table.explain()`. They are indicated with an
> `Exchange` operation
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 16.01.21 19:45, Rex Fenley wrote:
> > Hello,
> >
> > I'm wondering what sort of algorithm flink uses to map an Integer ID to
> > a subtask when distributing data. Also, what operators from the TableAPI
> > cause data to be redistributed? I know Joins will, what about
> > Aggregates, Sources, Filters?
> >
> > Thanks!
> >
> > --
> >
> > Rex Fenley|Software Engineer - Mobile and Backend
> >
> >
> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
> >
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Setting different timeouts for savepoints and checkpoints

2021-01-18 Thread Rex Fenley
Wups, my bad, I should have refreshed. I will close my bug report.

On Mon, Jan 18, 2021 at 9:16 AM Rex Fenley  wrote:

> Here you go https://issues.apache.org/jira/browse/FLINK-21014 .
>
> Thanks!
>
> On Mon, Jan 18, 2021 at 1:28 AM Timo Walther  wrote:
>
>> Hi Rex,
>>
>> feel free to open an issue for this. I could also imagine that
>> checkpoints and savepoints will further divert from each other and a
>> having different timeout might be reasonable.
>>
>> Regards,
>> Timo
>>
>>
>> On 17.01.21 02:43, Rex Fenley wrote:
>> > Thanks for the quick response.
>> >
>> > Is this something that can be added as a feature request? Given that
>> the
>> > time it takes to restore from either is different, and the semantics
>> are
>> > slightly different, it seems like they should have completely separate
>> > configurable timeouts.
>> >
>> > Thanks!
>> >
>> >
>> > On Sat, Jan 16, 2021 at 2:43 PM Khachatryan Roman
>> > mailto:khachatryan.ro...@gmail.com>>
>> wrote:
>> >
>> > Hi Rex,
>> >
>> > Unfortunately not: the same timeout value is used both for
>> > savepoints and checkpoints.
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Sat, Jan 16, 2021 at 9:42 AM Rex Fenley > > <mailto:r...@remind101.com>> wrote:
>> >
>> > Hello,
>> >
>> > I'm wondering if there's a way to set different timeouts for
>> > savepoints and checkpoints. Our savepoints can take a number of
>> > hours to complete, whereas incremental checkpoints at their
>> > slowest take around 10 min. We'd like to timeout a checkpoint on
>> > a significantly smaller duration than a savepoint.
>> >
>> > Thanks!
>> >
>> > --
>> >
>> > Rex Fenley|Software Engineer - Mobile and Backend
>> >
>> >
>> >     Remind.com <https://www.remind.com/>| BLOG
>> > <http://blog.remind.com/> | FOLLOW US
>> > <https://twitter.com/remindhq> | LIKE US
>> > <https://www.facebook.com/remindhq>
>> >
>> >
>> >
>> > --
>> >
>> > Rex Fenley|Software Engineer - Mobile and Backend
>> >
>> >
>> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
>> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
>> > <https://www.facebook.com/remindhq>
>> >
>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Setting different timeouts for savepoints and checkpoints

2021-01-18 Thread Rex Fenley
Here you go https://issues.apache.org/jira/browse/FLINK-21014 .

Thanks!

On Mon, Jan 18, 2021 at 1:28 AM Timo Walther  wrote:

> Hi Rex,
>
> feel free to open an issue for this. I could also imagine that
> checkpoints and savepoints will further divert from each other and a
> having different timeout might be reasonable.
>
> Regards,
> Timo
>
>
> On 17.01.21 02:43, Rex Fenley wrote:
> > Thanks for the quick response.
> >
> > Is this something that can be added as a feature request? Given that the
> > time it takes to restore from either is different, and the semantics are
> > slightly different, it seems like they should have completely separate
> > configurable timeouts.
> >
> > Thanks!
> >
> >
> > On Sat, Jan 16, 2021 at 2:43 PM Khachatryan Roman
> > mailto:khachatryan.ro...@gmail.com>>
> wrote:
> >
> > Hi Rex,
> >
> > Unfortunately not: the same timeout value is used both for
> > savepoints and checkpoints.
> >
> > Regards,
> > Roman
> >
> >
> > On Sat, Jan 16, 2021 at 9:42 AM Rex Fenley  > <mailto:r...@remind101.com>> wrote:
> >
> > Hello,
> >
> > I'm wondering if there's a way to set different timeouts for
> > savepoints and checkpoints. Our savepoints can take a number of
> > hours to complete, whereas incremental checkpoints at their
> > slowest take around 10 min. We'd like to timeout a checkpoint on
> > a significantly smaller duration than a savepoint.
> >
> > Thanks!
> >
> > --
> >
> > Rex Fenley|Software Engineer - Mobile and Backend
> >
> >
> > Remind.com <https://www.remind.com/>| BLOG
> > <http://blog.remind.com/> | FOLLOW US
> > <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
> >
> >
> >
> > --
> >
> > Rex Fenley|Software Engineer - Mobile and Backend
> >
> >
> > Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> > FOLLOW US <https://twitter.com/remindhq> | LIKE US
> > <https://www.facebook.com/remindhq>
> >
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

2021-01-18 Thread Rex Fenley
Thank you,

Some parts that stick out
>The Flink Kafka Consumer allows configuring the behaviour of how offsets
are committed back to Kafka brokers. Note that the Flink Kafka Consumer
does not rely on the committed offsets for fault tolerance guarantees. The
committed offsets are only a means to expose the consumer’s progress for
monitoring purposes.

I read this as, "The offsets committed to Kafka are ignored, the offsets
committed within a checkpoint are used".

>With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
records from a topic and periodically checkpoint all its Kafka offsets,
together with the state of other operations. In case of a job failure,
Flink will restore the streaming program to the state of the latest
checkpoint and re-consume the records from Kafka, starting from the offsets
that were stored in the checkpoint.

This seems to say something similar.

So from the sounds of things, regardless of the consumer group's offsets,
it will always start from a checkpoint or savepoints offsets if there are
some (unless checkpointing offsets is turned off).
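For concreteness, a minimal sketch of the consumer setup this thread is about (topic, group, and broker values are made up; the DataStream-level FlinkKafkaConsumer is shown because that is what the quoted documentation describes):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092")
props.setProperty("group.id", "my-consumer-group") // only consulted when no offsets exist in Flink state

val consumer = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)
consumer.setStartFromGroupOffsets()          // default: group offsets, else auto.offset.reset
consumer.setCommitOffsetsOnCheckpoints(true) // commits back to Kafka for monitoring only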

Is this interpretation correct?

Thanks!


On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski  wrote:

> Hi Rex,
>
> I believe this section answers your question [1]
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> pon., 18 sty 2021 o 09:00 赵一旦  napisał(a):
>
>> If you changed the consumer group in your new job, the group id will be
>> the new one you set.
>> The job will continue to consumer the topics from the
>> savepoint/checkpoint you specified no matter whether the group id is the
>> original one?
>>
>> Rex Fenley  于2021年1月18日周一 下午12:53写道:
>>
>>> Hello,
>>>
>>> When using the Kafka consumer connector, if we restore from a
>>> checkpoint or savepoint using a differently named consumer group than the
>>> one we originally ran a job with will it still pick up exactly where it
>>> left off or are you locked into using the same consumer group as before?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Restoring from a savepoint, constraining factors

2021-01-18 Thread Rex Fenley
This is great information, thank you. It does look like task local recovery
only works for checkpoints, however we do want to bring down our recovery
times so this is useful.
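For reference, the task-local recovery described by [1] in the quoted reply below is a single cluster-configuration switch (set in flink-conf.yaml), at the cost of keeping an extra local copy of checkpoint state on each TaskManager:

state.backend.local-recovery: true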

I'm wondering what all is involved with savepoints though too. Savepoint
restoration must have some repartitioning step too I'd imagine which seems
like it could be fairly involved? Anything else I'm missing?

Thanks!

On Mon, Jan 18, 2021 at 2:49 AM Piotr Nowojski  wrote:

> Hi Rex,
>
> Good that you have found the source of your problem and thanks for
> reporting it back.
>
> Regarding your question about the recovery steps (ignoring scheduling and
> deploying). I think it depends on the used state backend. From your
> other emails I see you are using RocksDB, so I believe this is the big
> picture how it works in the RocksDB case:
>
> 1. Relevant state files are loaded from the DFS to local disks of a
> TaskManager [1].
> 2. I presume RocksDB needs to load at least some meta data from those
> local files in order to finish the recovery process (I doubt it but maybe
> it also needs to load some actual data).
> 3. Task can start processing the records.
>
> Best,
> Piotrek
>
> [1] This step can be avoided if you are using local recovery
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>
> sob., 16 sty 2021 o 06:15 Rex Fenley  napisał(a):
>
>> Aftering looking through lots of graphs and AWS limits. I've come to the
>> conclusion that we're hitting limits on our disk writes. I'm guessing this
>> is backpressuring against the entire restore process. I'm still very
>> curious about all the steps involved in savepoint restoration though!
>>
>> On Fri, Jan 15, 2021 at 7:50 PM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> We have a savepoint that's ~0.5 TiB in size. When we try to restore from
>>> it, we time out because it takes too long (right now checkpoint timeouts
>>> are set to 2 hours which is way above where we want them already).
>>>
>>> I'm curious if it needs to download the entire savepoint to continue.
>>> Or, for further education, what are all the operations that take place
>>> before a job is restored from a savepoint?
>>>
>>> Additionally, the network seems to be a big bottleneck. Our network
>>> should be operating in the GiB/s range per instance, but seems to operate
>>> between 70-100MiB per second when retrieving a savepoint. Are there any
>>> constraining factors in Flink's design that would slow down the network
>>> download of a savepoint this much (from S3)?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Restoring from a checkpoint or savepoint on a different Kafka consumer group

2021-01-17 Thread Rex Fenley
Hello,

When using the Kafka consumer connector, if we restore from a checkpoint
or savepoint using a differently named consumer group than the one we
originally ran a job with will it still pick up exactly where it left off
or are you locked into using the same consumer group as before?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Number of parallel connections for Elasticsearch Connector

2021-01-17 Thread Rex Fenley
Great, thanks!

On Sun, Jan 17, 2021 at 6:24 PM Yangze Guo  wrote:

> Hi, Rex.
>
> > How many connections does the ES connector use to write to Elasticsearch?
> I think the number is equal to your parallelism. Each subtask of an
> Elasticsearch sink will have its own separate Bulk Processor as both
> the Client and the Bulk Processor are class private[1]. The subtasks
> will be placed into different slots and have their own Elasticsearch
> sink instance.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L204
> .
>
> Best,
> Yangze Guo
>
> On Sun, Jan 17, 2021 at 11:33 AM Rex Fenley  wrote:
> >
> > I found the following, indicating that there is no concurrency for the
> Elasticsearch Connector
> https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L382
> >
> > Does each subtask of an Elasticsearch sink have it's own separate Bulk
> Processor to allow for parallel bulk writes?
> >
> > Thanks!
> >
> > On Sat, Jan 16, 2021 at 10:33 AM Rex Fenley  wrote:
> >>
> >> Hello,
> >>
> >> How many connections does the ES connector use to write to
> Elasticsearch? We have a single machine with 16 vCPUs and parallelism of 4
> running our job, with -p 4 I'd expect there to be 4 parallel bulk request
> writers / connections to Elasticsearch. Is there a place in the code to
> confirm this?
> >>
> >> Thanks!
> >>
> >> --
> >>
> >> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>
> >>
> >> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
> >
> >
> >
> > --
> >
> > Rex Fenley  |  Software Engineer - Mobile and Backend
> >
> >
> > Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Number of parallel connections for Elasticsearch Connector

2021-01-16 Thread Rex Fenley
I found the following, indicating that there is no concurrency for the
Elasticsearch Connector
https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L382

Does each subtask of an Elasticsearch sink have it's own separate Bulk
Processor to allow for parallel bulk writes?

Thanks!

On Sat, Jan 16, 2021 at 10:33 AM Rex Fenley  wrote:

> Hello,
>
> How many connections does the ES connector use to write to Elasticsearch?
> We have a single machine with 16 vCPUs and parallelism of 4 running our
> job, with -p 4 I'd expect there to be 4 parallel bulk request writers /
> connections to Elasticsearch. Is there a place in the code to confirm this?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Setting different timeouts for savepoints and checkpoints

2021-01-16 Thread Rex Fenley
Thanks for the quick response.

Is this something that can be added as a feature request? Given that the
time it takes to restore from either is different, and the semantics are
slightly different, it seems like they should have completely separate
configurable timeouts.

Thanks!


On Sat, Jan 16, 2021 at 2:43 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Rex,
>
> Unfortunately not: the same timeout value is used both for savepoints and
> checkpoints.
>
> Regards,
> Roman
>
>
> On Sat, Jan 16, 2021 at 9:42 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I'm wondering if there's a way to set different timeouts for savepoints
>> and checkpoints. Our savepoints can take a number of hours to complete,
>> whereas incremental checkpoints at their slowest take around 10 min. We'd
>> like to timeout a checkpoint on a significantly smaller duration than a
>> savepoint.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Why use ListView?

2021-01-16 Thread Rex Fenley
Hello,

In the recent version of Flink docs I read the following [1]:
> If an accumulator needs to store large amounts of data,
org.apache.flink.table.api.dataview.ListView and
org.apache.flink.table.api.dataview.MapView provide advanced features for
leveraging Flink’s state backends in unbounded data scenarios. Please see
the docs of the corresponding classes for more information about this
advanced feature.

Our job has unbounded state from Debezium/Kafka, uses RocksDB, and we have
a number of Aggregators like the following, which group a set of ids by
some foreign key "group_id". The sets are usually 10-100 ids in size, but
at the largest the sets could at some theoretical point get into the tens
of thousands of ids (right now largest sets are ~2000 ids).

table.groupBy($"group_id")
  .aggregate(
    new IDsAgg()(
      $"member_id"
    ) as ("member_ids")
  )
  .select($"group_id", $"member_ids")

case class IDsAcc(
  var IDs: mutable.Set[Long]
)

class IDsAgg extends AggregateFunction[Row, IDsAcc] {

  override def createAccumulator(): IDsAcc =
    IDsAcc(mutable.Set())

  def accumulate(
    acc: IDsAcc,
    ID: Long
  ): Unit = {
    acc.IDs.add(ID)
  }

  def retract(acc: IDsAcc, ID: Long): Unit = {
    acc.IDs.remove(ID)
  }

  def resetAccumulator(acc: IDsAcc): Unit = {
    acc.IDs = mutable.Set()
  }

  override def getValue(acc: IDsAcc): Row = {
    Row.of(acc.IDs.toArray)
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(
      createTypeInformation[Array[Long]]
    )
  }
}

I read the docs [2], but they don't really say anything about why ListView
is better than just using a Set or Array.
If we were to move from a Set to a ListView what advantages might we see in
these Aggregates?

I also noticed that ListView existed in 1.11 (we're on 1.11.2), did we
simply miss this feature? Does it work for 1.11.x too?

Thanks!

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#mandatory-and-optional-methods
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/table/api/dataview/ListView.html
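
For concreteness, the move we're considering would look roughly like the
sketch below (written against the 1.11 dataview API and untested; the
assumption is that with the blink planner a ListView field in the accumulator
is backed by the state backend rather than fully serialized with the
accumulator on every access):

import java.lang.{Long => JLong}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.table.api.dataview.ListView
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row
import scala.collection.JavaConverters._

// Same aggregate as above, but the ids live in a ListView instead of a mutable.Set.
case class IdsViewAcc(var ids: ListView[JLong])

class IdsViewAgg extends AggregateFunction[Row, IdsViewAcc] {

  override def createAccumulator(): IdsViewAcc =
    IdsViewAcc(new ListView[JLong]())

  def accumulate(acc: IdsViewAcc, id: JLong): Unit =
    acc.ids.add(id)

  def retract(acc: IdsViewAcc, id: JLong): Unit =
    acc.ids.remove(id)

  def resetAccumulator(acc: IdsViewAcc): Unit =
    acc.ids.clear()

  override def getValue(acc: IdsViewAcc): Row =
    Row.of(acc.ids.get.asScala.map(_.longValue).toArray)

  override def getResultType: TypeInformation[Row] =
    new RowTypeInfo(createTypeInformation[Array[Long]])
}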

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Flink ID hashing

2021-01-16 Thread Rex Fenley
Hello,

I'm wondering what sort of algorithm flink uses to map an Integer ID to a
subtask when distributing data. Also, what operators from the TableAPI
cause data to be redistributed? I know Joins will, what about Aggregates,
Sources, Filters?
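
From poking around the runtime code, my rough mental model for keyed
exchanges is the sketch below (assuming the 1.11 KeyGroupRangeAssignment
helpers are what keyed exchanges boil down to); is that about right?

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyRoutingSketch extends App {
  val maxParallelism = 128 // default max parallelism for small jobs
  val parallelism    = 4

  // key group = murmurHash(key.hashCode) % maxParallelism
  val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(42), maxParallelism)

  // subtask = keyGroup * parallelism / maxParallelism
  val subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
    maxParallelism, parallelism, keyGroup)

  println(s"id 42 -> key group $keyGroup -> subtask $subtask")
}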

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Number of parallel connections for Elasticsearch Connector

2021-01-16 Thread Rex Fenley
Hello,

How many connections does the ES connector use to write to Elasticsearch?
We have a single machine with 16 vCPUs and parallelism of 4 running our
job, with -p 4 I'd expect there to be 4 parallel bulk request writers /
connections to Elasticsearch. Is there a place in the code to confirm this?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Setting different timeouts for savepoints and checkpoints

2021-01-16 Thread Rex Fenley
Hello,

I'm wondering if there's a way to set different timeouts for savepoints and
checkpoints. Our savepoints can take a number of hours to complete, whereas
incremental checkpoints at their slowest take around 10 min. We'd like to
timeout a checkpoint on a significantly smaller duration than a savepoint.
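
For context, today we only have the single shared knob, which we set roughly
like this (a sketch of our Scala setup; the values are illustrative):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60 * 1000L)                               // checkpoint every 60 s
// One timeout for both: applies to savepoints as well as checkpoints.
env.getCheckpointConfig.setCheckpointTimeout(2 * 60 * 60 * 1000L) // 2 h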

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Restoring from a savepoint, constraining factors

2021-01-15 Thread Rex Fenley
After looking through lots of graphs and AWS limits, I've come to the
conclusion that we're hitting limits on our disk writes. I'm guessing this
is backpressuring against the entire restore process. I'm still very
curious about all the steps involved in savepoint restoration though!

On Fri, Jan 15, 2021 at 7:50 PM Rex Fenley  wrote:

> Hello,
>
> We have a savepoint that's ~0.5 TiB in size. When we try to restore from
> it, we time out because it takes too long (right now checkpoint timeouts
> are set to 2 hours which is way above where we want them already).
>
> I'm curious if it needs to download the entire savepoint to continue. Or,
> for further education, what are all the operations that take place before a
> job is restored from a savepoint?
>
> Additionally, the network seems to be a big bottleneck. Our network should
> be operating in the GiB/s range per instance, but seems to operate between
> 70-100MiB per second when retrieving a savepoint. Are there any
> constraining factors in Flink's design that would slow down the network
> download of a savepoint this much (from S3)?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Restoring from a savepoint, constraining factors

2021-01-15 Thread Rex Fenley
Hello,

We have a savepoint that's ~0.5 TiB in size. When we try to restore from
it, we time out because it takes too long (right now checkpoint timeouts
are set to 2 hours, which is already way above where we want them).

I'm curious if it needs to download the entire savepoint to continue. Or,
for further education, what are all the operations that take place before a
job is restored from a savepoint?

Additionally, the network seems to be a big bottleneck. Our network should
be operating in the GiB/s range per instance, but seems to operate between
70-100MiB per second when retrieving a savepoint. Are there any
constraining factors in Flink's design that would slow down the network
download of a savepoint this much (from S3)?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Elasticsearch config maxes can not be disabled

2021-01-15 Thread Rex Fenley
Yes, use the same SQL and change '0's to '-1'. We received "Caused by:
java.lang.IllegalArgumentException: Could not parse value '-1' for key
'sink.bulk-flush.max-size'."

On Fri, Jan 15, 2021 at 6:04 AM Dawid Wysakowicz 
wrote:

> Hi Rex,
>
> As I said in my previous email the documentation for
> sink.bulk-flush.max-actions is wrong. You should be able to disable it with
> -1. I've just checked it on the 1.11.2 tag and it seems to be working just
> fine with:
>
> CREATE TABLE esTable (
>
> a BIGINT NOT NULL,
> b TIME,
> c STRING NOT NULL,
> d FLOAT,
> e TINYINT NOT NULL,
> f DATE,
> g TIMESTAMP NOT NULL,
> h as a + 2,
> PRIMARY KEY (a, g) NOT ENFORCED
> )
> WITH (
> 'connector'='elasticsearch-6',
> 'index'='table-api',
> 'document-type'='MyType',
> 'hosts'='http://127.0.0.1:9200',
> 'sink.flush-on-checkpoint'='false',
>     'sink.bulk-flush.max-actions'='-1',
> 'sink.bulk-flush.max-size'='0'
> )
>
> If it still does not work for you with -1 could you share an example how I
> can reproduce the problem.
>
> Best,
>
> Dawid
> On 14/01/2021 18:08, Rex Fenley wrote:
>
> Flink 1.11.2
>
> CREATE TABLE sink_es (
> ...
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = '${sys:proxyEnv.ELASTICSEARCH_HOSTS}',
> 'index' = '${sys:graph.flink.index_name}',
> 'format' = 'json',
> 'sink.bulk-flush.max-actions' = '0',
> 'sink.bulk-flush.max-size' = '0',
> 'sink.bulk-flush.interval' = '1s',
> 'sink.bulk-flush.backoff.delay' = '1s',
> 'sink.bulk-flush.backoff.max-retries' = '4',
> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
> )
>
> On Thu, Jan 14, 2021 at 4:16 AM Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> First of all, what Flink versions are you using?
>>
>> You are right it is a mistake in the documentation of the
>> sink.bulk-flush.max-actions. It should say: Can be set to '-1' to
>> disable it. I created a ticket[1] to track that. And as far as I can tell
>> and I quickly checked that it should work. As for the
>> sink.bulk-flush.max-size you should be able to disable it with a value of
>> '0'.
>>
>> Could you share with us how do you use the connector? Could you also
>> share the full stack trace for the exception you're getting? Are you
>> creating the table with a CREATE statement?
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20979
>> On 13/01/2021 20:10, Rex Fenley wrote:
>>
>> Hello,
>>
>> It doesn't seem like we can disable max actions and max size for
>> Elasticsearch connector.
>>
>> Docs:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
>> sink.bulk-flush.max-actions optional 1000 Integer Maximum number of
>> buffered actions per bulk request. Can be set to '0' to disable it.
>> sink.bulk-flush.max-size optional 2mb MemorySize Maximum size in memory
>> of buffered actions per bulk request. Must be in MB granularity. Can be set
>> to '0' to disable it.
>> Reality:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Max number of buffered actions must be larger than
>> 0.
>>
>> ES code looks like -1 is actually the value for disabling, but when I use
>> -1:
>> Caused by: java.lang.IllegalArgumentException: Could not parse value '-1'
>> for key 'sink.bulk-flush.max-size'.
>>
>> How can I disable these two settings?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Elasticsearch config maxes can not be disabled

2021-01-14 Thread Rex Fenley
Flink 1.11.2

CREATE TABLE sink_es (
...
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '${sys:proxyEnv.ELASTICSEARCH_HOSTS}',
'index' = '${sys:graph.flink.index_name}',
'format' = 'json',
'sink.bulk-flush.max-actions' = '0',
'sink.bulk-flush.max-size' = '0',
'sink.bulk-flush.interval' = '1s',
'sink.bulk-flush.backoff.delay' = '1s',
'sink.bulk-flush.backoff.max-retries' = '4',
'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)

On Thu, Jan 14, 2021 at 4:16 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> First of all, what Flink versions are you using?
>
> You are right it is a mistake in the documentation of the
> sink.bulk-flush.max-actions. It should say: Can be set to '-1' to disable
> it. I created a ticket[1] to track that. And as far as I can tell and I
> quickly checked that it should work. As for the sink.bulk-flush.max-size
> you should be able to disable it with a value of '0'.
>
> Could you share with us how do you use the connector? Could you also share
> the full stack trace for the exception you're getting? Are you creating the
> table with a CREATE statement?
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-20979
> On 13/01/2021 20:10, Rex Fenley wrote:
>
> Hello,
>
> It doesn't seem like we can disable max actions and max size for
> Elasticsearch connector.
>
> Docs:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
> sink.bulk-flush.max-actions optional 1000 Integer Maximum number of
> buffered actions per bulk request. Can be set to '0' to disable it.
> sink.bulk-flush.max-size optional 2mb MemorySize Maximum size in memory
> of buffered actions per bulk request. Must be in MB granularity. Can be set
> to '0' to disable it.
> Reality:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Max number of buffered actions must be larger than
> 0.
>
> ES code looks like -1 is actually the value for disabling, but when I use
> -1:
> Caused by: java.lang.IllegalArgumentException: Could not parse value '-1'
> for key 'sink.bulk-flush.max-size'.
>
> How can I disable these two settings?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Interpretting checkpoint data size

2021-01-13 Thread Rex Fenley
One thing that did just come to mind: possibly, every time I submit a job
from a previous checkpoint with different settings, it has to slowly
re-checkpoint all the previous data, which would mean some warm-up time
before things function in a steady state. Is this possible?

On Wed, Jan 13, 2021 at 6:09 PM Rex Fenley  wrote:

> Hello,
>
> I have incremental checkpoints turned on and there seems to be no relation
> at all to how often the job checkpoints and how much data exists. Whether
> checkpoints are set to every 1 min or every 5 seconds they're still around
> 5 GB in size and checkpoint times are still in minutes. I would expect that
> if the system only runs for 5s then it would have significantly less data
> to checkpoint than if it runs for 1 min.
>
> Would someone mind clarifying the meaning of checkpoint data size when
> incremental checkpoints are turned on? Possibly I'm misinterpreting it.
>
> Thank you!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Interpretting checkpoint data size

2021-01-13 Thread Rex Fenley
Hello,

I have incremental checkpoints turned on, and there seems to be no relation
at all between how often the job checkpoints and how much data each checkpoint contains. Whether
checkpoints are set to every 1 min or every 5 seconds they're still around
5 GB in size and checkpoint times are still in minutes. I would expect that
if the system only runs for 5s then it would have significantly less data
to checkpoint than if it runs for 1 min.

Would someone mind clarifying the meaning of checkpoint data size when
incremental checkpoints are turned on? Possibly I'm misinterpreting it.

Thank you!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Elasticsearch config maxes can not be disabled

2021-01-13 Thread Rex Fenley
Hello,

It doesn't seem like we can disable max actions and max size for the
Elasticsearch connector.

Docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
sink.bulk-flush.max-actions | optional | 1000 | Integer    | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it.
sink.bulk-flush.max-size    | optional | 2mb  | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it.
Reality:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Max number of buffered actions must be larger than 0.

ES code looks like -1 is actually the value for disabling, but when I use
-1:
Caused by: java.lang.IllegalArgumentException: Could not parse value '-1'
for key 'sink.bulk-flush.max-size'.

How can I disable these two settings?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Flink Elasticsearch Async

2021-01-13 Thread Rex Fenley
Hello,

Looking at this documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-interval
it says

> The interval to flush buffered actions. Can be set to '0' to disable it.
Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can
be set to '0' with the flush interval set allowing for complete async
processing of buffered actions.

Does this imply that if either max size or max actions is set to a non-zero
value, bulk indexing is not asynchronous?

We've been investigating socket timeouts for some time now and are looking
for a way to fix them. If things are synchronous, that may be a contributing
factor.

Thanks

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Dead code in ES Sink

2021-01-13 Thread Rex Fenley
>The actual field is never used but it will be used to check the allowed
options when verifying what users specify via "string" options.

Are you saying that this option does get passed along to Elasticsearch
still or that it's just arbitrarily validated? According to [1] it's been
deprecated in ES 6 and removed in ES 7.

[1] https://github.com/elastic/elasticsearch/pull/38085

On Wed, Jan 13, 2021 at 12:50 AM Aljoscha Krettek 
wrote:

> On 2021/01/12 15:04, Rex Fenley wrote:
> >[2]
> >
> https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java#L131
> >
> >Should [2] be removed?
>
> The link seems to not work anymore but I'm guessing you're referring to
> `CONNECTION_MAX_RETRY_TIMEOUT_OPTION`. This is used in the
> `*DynamicSinkFactory` classes, such as [1]. These can be used when
> defining Table API/SQL sources using DDL or the programmatic API. The
> actual field is never used but it will be used to check the allowed
> options when verifying what users specify via "string" options.
>
> [1]
>
> https://github.com/apache/flink/blob/ee653778689023ddfdf007d5bde1daad8fbbc081/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java#L98
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Configuring Elasticsearch Timeouts

2021-01-12 Thread Rex Fenley
For further clarity, we're on Elasticsearch 7.

On Tue, Jan 12, 2021 at 4:53 PM Rex Fenley  wrote:

> Correction, by HTTP timeout I mean BulkProcessor timeout.
>
> On Tue, Jan 12, 2021 at 4:40 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> For the past number of days we've been investigating continuous
>> SocketTimeoutException like the following:
>>
>> 2021-01-12 20:53:42,105 DEBUG org.elasticsearch.client.RestClient
>>- request [POST :/_bulk?timeout=1m 
>> <http://graph-jvm-groups-es.empire/_bulk?timeout=1m>] failed
>> java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection 
>> http-outgoing-2 [ACTIVE]
>> at java.base/java.lang.Thread.run(Thread.java:829)
>> 2021-01-12 20:53:42,106 DEBUG org.elasticsearch.client.RestClient
>>- added [[host=: 
>> <http://graph-jvm-groups-es.empire/>]] to blacklist
>>
>> It appears that we're hitting a socket timeout and then the client adds
>> our ES host to a blacklist which therefore fails any further request to ES
>> and eventually fails our job.
>>
>> Following from information we've gathered over the internet, we should be
>> able to set the SocketTimeout to something higher than the HTTP timeout and
>> it may fix the behavior we're seeing. However, we have not found a way to
>> configure SocketTimeout from flink, what we want to set can be found here
>> [1]. How do we set this from the Elasticsearch SQL connector?
>>
>> [1]
>> https://github.com/elastic/elasticsearch/blob/67be92c72386848ef52f029248b350ddaba0fd7e/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java#L213
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Configuring Elasticsearch Timeouts

2021-01-12 Thread Rex Fenley
Correction, by HTTP timeout I mean BulkProcessor timeout.

On Tue, Jan 12, 2021 at 4:40 PM Rex Fenley  wrote:

> Hello,
>
> For the past number of days we've been investigating continuous
> SocketTimeoutException like the following:
>
> 2021-01-12 20:53:42,105 DEBUG org.elasticsearch.client.RestClient 
>   - request [POST :/_bulk?timeout=1m 
> <http://graph-jvm-groups-es.empire/_bulk?timeout=1m>] failed
> java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection 
> http-outgoing-2 [ACTIVE]
> at java.base/java.lang.Thread.run(Thread.java:829)
> 2021-01-12 20:53:42,106 DEBUG org.elasticsearch.client.RestClient 
>   - added [[host=: 
> <http://graph-jvm-groups-es.empire/>]] to blacklist
>
> It appears that we're hitting a socket timeout and then the client adds
> our ES host to a blacklist which therefore fails any further request to ES
> and eventually fails our job.
>
> Following from information we've gathered over the internet, we should be
> able to set the SocketTimeout to something higher than the HTTP timeout and
> it may fix the behavior we're seeing. However, we have not found a way to
> configure SocketTimeout from flink, what we want to set can be found here
> [1]. How do we set this from the Elasticsearch SQL connector?
>
> [1]
> https://github.com/elastic/elasticsearch/blob/67be92c72386848ef52f029248b350ddaba0fd7e/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java#L213
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Configuring Elasticsearch Timeouts

2021-01-12 Thread Rex Fenley
Hello,

For the past number of days we've been investigating continuous
SocketTimeoutException like the following:

2021-01-12 20:53:42,105 DEBUG org.elasticsearch.client.RestClient
 - request [POST
:/_bulk?timeout=1m
<http://graph-jvm-groups-es.empire/_bulk?timeout=1m>] failed
java.net.SocketTimeoutException: 30,000 milliseconds timeout on
connection http-outgoing-2 [ACTIVE]
at java.base/java.lang.Thread.run(Thread.java:829)
2021-01-12 20:53:42,106 DEBUG org.elasticsearch.client.RestClient
 - added [[host=:
<http://graph-jvm-groups-es.empire/>]] to blacklist

It appears that we're hitting a socket timeout and then the client adds our
ES host to a blacklist which therefore fails any further request to ES and
eventually fails our job.

Following from information we've gathered over the internet, we should be
able to set the SocketTimeout to something higher than the HTTP timeout and
it may fix the behavior we're seeing. However, we have not found a way to
configure SocketTimeout from flink, what we want to set can be found here
[1]. How do we set this from the Elasticsearch SQL connector?

[1]
https://github.com/elastic/elasticsearch/blob/67be92c72386848ef52f029248b350ddaba0fd7e/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java#L213
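
For reference, with the DataStream connector we believe this would be doable
via a custom RestClientFactory, roughly as in the sketch below (elasticsearch7
DataStream sink, not the SQL connector we actually use; hosts, index name, and
timeout values are placeholders). What we're missing is an equivalent knob on
the SQL connector:

import java.util.Collections
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.elasticsearch.client.{Requests, RestClientBuilder}
import org.elasticsearch.common.xcontent.XContentType

// Placeholder sink function: indexes each JSON string into a given index.
class JsonSink(index: String) extends ElasticsearchSinkFunction[String] {
  override def process(doc: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit =
    indexer.add(Requests.indexRequest(index).source(doc, XContentType.JSON))
}

val builder = new ElasticsearchSink.Builder[String](
  Collections.singletonList(new HttpHost("127.0.0.1", 9200, "http")),
  new JsonSink("my-index"))

// Raise the socket (read) timeout above the bulk request timeout so a slow bulk
// response does not trip SocketTimeoutException and blacklist the host.
builder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(b: RestClientBuilder): Unit =
    b.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback {
      override def customizeRequestConfig(cfg: RequestConfig.Builder): RequestConfig.Builder =
        cfg.setConnectTimeout(5 * 1000).setSocketTimeout(2 * 60 * 1000)
    })
})

val esSink = builder.build()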

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Dead code in ES Sink

2021-01-12 Thread Rex Fenley
Hi,

I was looking through ES options trying to diagnose a SocketTimeOut we're
receiving on the ES TableAPI connector. A bug report on elasticsearch's
github[1] indicated I might want to set max retry timeout really high, but
from what I can tell it's not even consumed anywhere [2].

[1] https://github.com/elastic/elasticsearch/issues/33342

[2]
https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java#L131

Should [2] be removed?

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Log length

2021-01-12 Thread Rex Fenley
Got it. Thanks!

On Tue, Jan 12, 2021 at 2:12 AM Chesnay Schepler  wrote:

> A normal FileAppender that does not do any rolling or limiting of the log
> file.
>
> On 1/12/2021 3:09 AM, Rex Fenley wrote:
>
> Thanks, I'll check them out. What's the default in 1.11.2?
>
> On Mon, Jan 11, 2021 at 4:26 PM Chesnay Schepler 
> wrote:
>
>> Have a look at RollingFileAppenders
>> <https://logging.apache.org/log4j/2.x/manual/appenders.html#RollingFileAppender>.
>> These have become the default in 1.12 .
>>
>> On 1/12/2021 12:53 AM, Rex Fenley wrote:
>>
>> Hello,
>>
>> We've collected over 150 MiB of log lines in 5 days. Is there a way to
>> tell Flink to eject log lines after a certain length so we don't eventually
>> run out of disk?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: How does at least once checkpointing work

2021-01-12 Thread Rex Fenley
Thanks!

On Tue, Jan 12, 2021 at 1:56 AM Yuan Mei  wrote:

>
>> It sounds like any state which does not have some form of uniqueness
>> could end up being incorrect.
>>
>> at least once usually works if the use case can tolerate a certain level
> of duplication or the computation is idempotent.
>
>
>> Specifically in my case, all rows passing through the execution graph
>> have unique ids. However, any operator from groupby foreign_key then
>> sum/count could end up with an inconsistent count. Normally a retract (-1)
>> and then insert (+1) would keep the count correct, but with "at least once"
>> a retract (-1) may be from epoch n+1 and therefore played twice, making the
>> count equal less than it should actually be.
>>
>>
> Not completely sure how the "retract (-1)" and "insert (+1)" work in your
> case, but "input data" that leads to a state change (count/sum change) is
> possible to be played twice after a recovery.
>
>
>> Am I understanding this correctly?
>>
>> Thanks!
>>
>> On Mon, Jan 11, 2021 at 10:06 PM Yuan Mei  wrote:
>>
>>> Hey Rex,
>>>
>>> You probably will find the link below helpful; it explains how
>>> at-least-once (does not have alignment) is different
>>> from exactly-once(needs alignment). It also explains how the
>>> alignment phase is skipped in the at-least-once mode.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#exactly-once-vs-at-least-once
>>>
>>> In a high level, at least once mode for a task with multiple input
>>> channels
>>> 1. does NOT block processing to wait for barriers from all inputs,
>>> meaning the task keeps processing data after receiving a barrier even if it
>>> has multiple inputs.
>>> 2. but still, a task takes a snapshot after seeing the checkpoint
>>> barrier from all input channels.
>>>
>>> In this way, a Snapshot N may contain data change coming from Epoch N+1;
>>> that's where "at least once" comes from.
>>>
>>> On Tue, Jan 12, 2021 at 1:03 PM Rex Fenley  wrote:
>>>
>>>> Hello,
>>>>
>>>> We're using the TableAPI and want to optimize for checkpoint alignment
>>>> times. We received some advice to possibly use at-least-once. I'd like to
>>>> understand how checkpointing works in at-least-once mode so I understand
>>>> the caveats and can evaluate whether or not that will work for us.
>>>>
>>>> Thanks!
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: How does at least once checkpointing work

2021-01-11 Thread Rex Fenley
Thanks for the info.

It sounds like any state which does not have some form of uniqueness could
end up being incorrect.

Specifically in my case, all rows passing through the execution graph have
unique ids. However, any operator that does a group-by on a foreign key and
then a sum/count could end up with an inconsistent count. Normally a retract
(-1) followed by an insert (+1) keeps the count correct, but with "at least
once" a retract (-1) may come from epoch n+1 and therefore be played twice,
making the count lower than it should actually be.

Am I understanding this correctly?
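
To make the arithmetic concrete, here is a toy sketch (plain Scala, not Flink
code) of the failure mode I'm worried about:

// A count kept as a sum of +1 / -1 deltas.
val epochN   = Seq(+1, +1) // two inserts covered by snapshot N
val epochN1  = Seq(-1)     // a retract from epoch N+1, partly included in snapshot N
val replayed = Seq(-1)     // the same retract replayed after recovery (at least once)
val count = (epochN ++ epochN1 ++ replayed).sum
println(count) // 0, but the true count is 1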

Thanks!

On Mon, Jan 11, 2021 at 10:06 PM Yuan Mei  wrote:

> Hey Rex,
>
> You probably will find the link below helpful; it explains how
> at-least-once (does not have alignment) is different
> from exactly-once(needs alignment). It also explains how the
> alignment phase is skipped in the at-least-once mode.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#exactly-once-vs-at-least-once
>
> In a high level, at least once mode for a task with multiple input channels
> 1. does NOT block processing to wait for barriers from all inputs, meaning
> the task keeps processing data after receiving a barrier even if it has
> multiple inputs.
> 2. but still, a task takes a snapshot after seeing the checkpoint barrier
> from all input channels.
>
> In this way, a Snapshot N may contain data change coming from Epoch N+1;
> that's where "at least once" comes from.
>
> On Tue, Jan 12, 2021 at 1:03 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> We're using the TableAPI and want to optimize for checkpoint alignment
>> times. We received some advice to possibly use at-least-once. I'd like to
>> understand how checkpointing works in at-least-once mode so I understand
>> the caveats and can evaluate whether or not that will work for us.
>>
>> Thanks!
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


How does at least once checkpointing work

2021-01-11 Thread Rex Fenley
Hello,

We're using the TableAPI and want to optimize for checkpoint alignment
times. We received some advice to possibly use at-least-once. I'd like to
understand how checkpointing works in at-least-once mode so I understand
the caveats and can evaluate whether or not that will work for us.

Thanks!
-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Restoring from checkpoint with different parallism

2021-01-11 Thread Rex Fenley
Thanks! Looks like you can't with unaligned checkpoints, which seems to
imply that you can with the normal checkpointing mechanism.

On Mon, Jan 11, 2021 at 7:56 PM Yun Tang  wrote:

>  Hi Rex,
>
> I think doc [1] should have given some descriptions. Rescaling from
> previous checkpoint is still supported in current Flink version.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>
> Best
> Yun Tang
> ----------
> *From:* Rex Fenley 
> *Sent:* Tuesday, January 12, 2021 11:01
> *To:* user 
> *Cc:* Brad Davis 
> *Subject:* Restoring from checkpoint with different parallism
>
> Hello,
>
> When using the TableAPI, is it safe to run a flink job with a different
> `-p` parallelism while restoring from a checkpoint (not a savepoint) using
> `-s`, without any rescaling of actual machines? I don't seem to find this
> documented anywhere.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Restoring from checkpoint with different parallism

2021-01-11 Thread Rex Fenley
Hello,

When using the TableAPI, is it safe to run a flink job with a different
`-p` parallelism while restoring from a checkpoint (not a savepoint) using
`-s`, without any rescaling of actual machines? I don't seem to find this
documented anywhere.

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Log length

2021-01-11 Thread Rex Fenley
Thanks, I'll check them out. What's the default in 1.11.2?

On Mon, Jan 11, 2021 at 4:26 PM Chesnay Schepler  wrote:

> Have a look at RollingFileAppenders
> <https://logging.apache.org/log4j/2.x/manual/appenders.html#RollingFileAppender>.
> These have become the default in 1.12 .
>
> On 1/12/2021 12:53 AM, Rex Fenley wrote:
>
> Hello,
>
> We've collected over 150 MiB of log lines in 5 days. Is there a way to
> tell Flink to eject log lines after a certain length so we don't eventually
> run out of disk?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Log length

2021-01-11 Thread Rex Fenley
Hello,

We've collected over 150 MiB of log lines in 5 days. Is there a way to tell
Flink to drop or rotate log lines after a certain size so we don't eventually
run out of disk?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


See lag end-to-end

2020-12-20 Thread Rex Fenley
Hello,

Is there some proxy for seeing the relative time it takes records to make it
through an entire job plan? Maybe checkpoint alignment time would be a proxy
for this? Are there metrics for that, or something else that would provide
signal here?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Direct Memory full

2020-12-16 Thread Rex Fenley
Ok, thanks. I've said this in another thread, but everything seems to go
completely idle during checkpoints while waiting on one operator: there's no
CPU usage and hardly any disk usage. I'll assume it's something else then.

On Wed, Dec 16, 2020 at 10:42 AM Robert Metzger  wrote:

> I don't think the direct memory is causing any performance bottlenecks.
> The backpressure is probably caused by something else (high CPU load, slow
> external system, data skew)
>
> On Wed, Dec 16, 2020 at 7:23 PM Steven Wu  wrote:
>
>> if you are running out of direct buffer, you will see 
>> "java.lang.OutOfMemoryError:
>> Direct buffer memory"
>>
>> On Wed, Dec 16, 2020 at 9:47 AM Rex Fenley  wrote:
>>
>>> Thanks for the reply. If what I'm understanding is correct there's no
>>> chance of an OOM, but since direct memory is for I/O, it being completely
>>> filled may be a sign of backpressure? Currently one of our operators takes
>>> a tremendous amount of time to align during a checkpoint. Could increasing
>>> direct memory help checkpointing by improving I/O performance across the
>>> whole plan (assuming I/O is at least part of the bottleneck)?
>>>
>>> On Tue, Dec 15, 2020 at 10:37 PM Robert Metzger 
>>> wrote:
>>>
>>>> Hey Rex,
>>>>
>>>> the direct memory is used for IO. There is no concept of direct memory
>>>> being "full". The only thing that can happen is that you have something in
>>>> place (Kubernetes, YARN) that limits / enforces the memory use of a Flink
>>>> process, and you run out of your memory allowance. The direct memory is
>>>> allocated outside of the heap's upper limit, thus you could run out of the
>>>> budget.
>>>> But Flink is usually properly configuring the memory limits correctly,
>>>> to avoid running into those situations.
>>>>
>>>> tl;dr: you don't need to worry about this.
>>>>
>>>>
>>>> On Tue, Dec 15, 2020 at 8:38 AM Rex Fenley  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Our job consistently shows
>>>>> Outside JVM
>>>>> Type
>>>>> Count
>>>>> Used
>>>>> Capacity
>>>>> *Direct* 32,839 1.03 GB 1.03 GB
>>>>> for direct memory.
>>>>>
>>>>> Is it typical for it to be full? What are the consequences that we may
>>>>> not be noticing of direct memory being full?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Direct Memory full

2020-12-16 Thread Rex Fenley
Thanks for the reply. If what I'm understanding is correct there's no
chance of an OOM, but since direct memory is for I/O, it being completely
filled may be a sign of backpressure? Currently one of our operators takes
a tremendous amount of time to align during a checkpoint. Could increasing
direct memory help checkpointing by improving I/O performance across the
whole plan (assuming I/O is at least part of the bottleneck)?

On Tue, Dec 15, 2020 at 10:37 PM Robert Metzger  wrote:

> Hey Rex,
>
> the direct memory is used for IO. There is no concept of direct memory
> being "full". The only thing that can happen is that you have something in
> place (Kubernetes, YARN) that limits / enforces the memory use of a Flink
> process, and you run out of your memory allowance. The direct memory is
> allocated outside of the heap's upper limit, thus you could run out of the
> budget.
> But Flink is usually properly configuring the memory limits correctly, to
> avoid running into those situations.
>
> tl;dr: you don't need to worry about this.
>
>
> On Tue, Dec 15, 2020 at 8:38 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> Our job consistently shows
>> Outside JVM
>> Type
>> Count
>> Used
>> Capacity
>> *Direct* 32,839 1.03 GB 1.03 GB
>> for direct memory.
>>
>> Is it typical for it to be full? What are the consequences that we may
>> not be noticing of direct memory being full?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Get current kafka offsets for kafka sources

2020-12-15 Thread Rex Fenley
I'll give a look into that approach. Thanks

On Tue, Dec 15, 2020 at 9:48 PM Aeden Jameson 
wrote:

> My understanding is the FlinkKafkaConsumer is a wrapper around the
> Kafka consumer libraries so if you've set the group.id property you
> should be able to see the offsets with something like
> kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe
> --group my-flink-application.
>
> On Tue, Dec 15, 2020 at 9:39 PM Rex Fenley  wrote:
> >
> > Hi,
> >
> > Is there any way to fetch the current kafka topic offsets for the kafka
> sources for flink?
> >
> > Thanks!
> >
> > --
> >
> > Rex Fenley  |  Software Engineer - Mobile and Backend
> >
> >
> > Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>
>
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
> Blah Blah Blah: http://www.twitter.com/daliful
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Get current kafka offsets for kafka sources

2020-12-15 Thread Rex Fenley
Hi,

Is there any way to fetch the current Kafka topic offsets for the Kafka
sources in Flink?
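
To be clear, we know the connector's own checkpointed state is the source of
truth; what we're after is something like reading back the offsets the
consumer commits to Kafka under its group.id, e.g. the sketch below with the
Kafka AdminClient (assumes kafka-clients 2.x, offset committing on checkpoints
enabled, and placeholder group/bootstrap values):

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
val admin = AdminClient.create(props)

// Offsets committed back to Kafka for the job's group.id (monitoring only).
val committed = admin
  .listConsumerGroupOffsets("my-flink-application") // placeholder group.id
  .partitionsToOffsetAndMetadata()
  .get()

committed.asScala.foreach { case (tp, om) => println(s"$tp -> ${om.offset()}") }
admin.close()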

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Direct Memory full

2020-12-14 Thread Rex Fenley
Hello,

Our job consistently shows the following for direct memory (the "Outside JVM"
memory table):

Type   | Count  | Used    | Capacity
Direct | 32,839 | 1.03 GB | 1.03 GB

Is it typical for it to be full? Are there consequences of direct memory
being full that we may not be noticing?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: How to tell when flink is done restoring from a savepoint

2020-12-14 Thread Rex Fenley
Ok, thank you.

On Mon, Dec 14, 2020 at 2:07 AM Chesnay Schepler  wrote:

> I do not believe there is anything in the UI, or the logs for that matter,
> that give a definite answer to that.
> I suppose if a new checkpoint was completed then you can be sure the state
> was restored.
>
> FLINK-19013 <https://issues.apache.org/jira/browse/FLINK-19013>
>
> On 12/14/2020 6:40 AM, Rex Fenley wrote:
>
> Hi,
>
> Every time I restore from a savepoint it looks like it can take 20+ min to
> restore given the network i/o graphs I'm seeing. However, I can't find a
> way to see in the Flink UI if the savepoint is currently restoring or if
> it's finished to be sure. Is there a way to tell if flink is in the middle
> of restoring or not?
>
> Thanks
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


How to tell when flink is done restoring from a savepoint

2020-12-13 Thread Rex Fenley
Hi,

Every time I restore from a savepoint it looks like it can take 20+ min to
restore, judging by the network I/O graphs I'm seeing. However, I can't find
a way in the Flink UI to be sure whether the savepoint is still restoring or
has finished. Is there a way to tell if Flink is in the middle of restoring
or not?
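
One crude check I can think of is polling the checkpointing REST endpoint and
waiting for a newly completed checkpoint after the restart; a sketch (the
address, port, and JSON field names are assumptions taken from the REST API
docs):

import scala.io.Source

val jobId = "<job-id>" // placeholder
val url   = s"http://localhost:8081/jobs/$jobId/checkpoints"
val json  = Source.fromURL(url).mkString

// Inspect counts.completed and latest.restored: once a new checkpoint completes
// after the restart, the restored state is definitely in use.
println(json)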

Thanks

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>


Re: Disk usage during savepoints

2020-12-12 Thread Rex Fenley
Our job just crashed while running a savepoint; it ran out of disk space. I
inspected the disk and found the following:
-rw---  1 yarn   yarn   10139680768 Dec 12 22:14
presto-s3-10125099138119182412.tmp
-rw---  1 yarn   yarn   10071916544 Dec 12 22:14
presto-s3-10363672991943897408.tmp
-rw---  1 yarn   yarn   10276716544 Dec 12 22:14
presto-s3-12109236276406796165.tmp
-rw---  1 yarn   yarn9420505088 Dec 12 22:14
presto-s3-12584127250588531727.tmp
-rw---  1 yarn   yarn   10282295296 Dec 12 22:14
presto-s3-14352553379340277827.tmp
-rw---  1 yarn   yarn9463644160 Dec 12 22:14
presto-s3-14552162277341829612.tmp
-rw---  1 yarn   yarn   10447626240 Dec 12 22:14
presto-s3-14660072789354472725.tmp
-rw---  1 yarn   yarn9420906496 Dec 12 22:14
presto-s3-15982235495935827021.tmp
-rw---  1 yarn   yarn   10268663808 Dec 12 22:14
presto-s3-16188204950210407933.tmp
-rw---  1 yarn   yarn9309986816 Dec 12 22:14
presto-s3-17905518564307248197.tmp
-rw---  1 yarn   yarn9491578880 Dec 12 22:14
presto-s3-1839692230976299010.tmp
-rw---  1 yarn   yarn9308168192 Dec 12 22:14
presto-s3-2488279210497334939.tmp
-rw---  1 yarn   yarn9496961024 Dec 12 22:14
presto-s3-3559445453885492666.tmp
-rw---  1 yarn   yarn9467682816 Dec 12 22:14
presto-s3-4932415031914708987.tmp
-rw---  1 yarn   yarn   10042425344 Dec 12 22:14
presto-s3-5619769647590893462.tmp

So it appears that everything is written locally to one of our disks first,
before being uploaded to S3.

Is there a way to tell Flink or the OS to divide this work up across the
mounted disks so it doesn't all land on one disk?

Thanks!

On Sat, Dec 12, 2020 at 10:12 AM Rex Fenley  wrote:

> Also, small correction from earlier, there are 4 volumes of 256 GiB so
> that's 1 TiB total.
>
> On Sat, Dec 12, 2020 at 10:08 AM Rex Fenley  wrote:
>
>> Our first big test run we wanted to eliminate as many variables as
>> possible, so this is on 1 machine with 1 task manager and 1 parallelism.
>> The machine has 4 disks though, and as you can see, they mostly all use
>> around the same space for storage until a savepoint is triggered.
>>
>> Could it be that given a parallelism of 1, certain operator's states are
>> pinned to specific drives and as it's doing compaction it's moving
>> everything over to that drive into a single file?
>> In which case, would greater parallelism distribute the work more evenly?
>>
>> Thanks!
>>
>>
>> On Sat, Dec 12, 2020 at 2:35 AM David Anderson 
>> wrote:
>>
>>> RocksDB does do compaction in the background, and incremental
>>> checkpoints simply mirror to S3 the set of RocksDB SST files needed by the
>>> current set of checkpoints.
>>>
>>> However, unlike checkpoints, which can be incremental, savepoints are
>>> always full snapshots. As for why one host would have much more state than
>>> the others, perhaps you have significant key skew, and one task manager is
>>> ending up with more than its share of state to manage.
>>>
>>> Best,
>>> David
>>>
>>> On Sat, Dec 12, 2020 at 12:31 AM Rex Fenley  wrote:
>>>
>>>> Hi,
>>>>
>>>> We're using the Rocks state backend with incremental checkpoints and
>>>> savepoints setup for S3. We notice that every time we trigger a savepoint,
>>>> one of the local disks on our host explodes in disk usage.
>>>> What is it that savepoints are doing which would cause so much disk to
>>>> be used?
>>>> Our checkpoints are a few GiB in size, is the savepoint combining all
>>>> the checkpoints together at once on disk?
>>>> I figured that incremental checkpoints would compact over time in the
>>>> background, is that correct?
>>>>
>>>> Thanks
>>>>
>>>> Graph here. Parallelism is 1 and volume size is 256 GiB.
>>>> [image: Screen Shot 2020-12-11 at 2.59.59 PM.png]
>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

