The output is a bunch of files in parquet format. The thing reading them
would be presto, so I can really tell it to ignore some rows but not
others. Not to mention that the files would keep piling making sql queries
super slow.

On Fri, Jan 18, 2019, 10:01 AM Jamie Grier <jgr...@lyft.com wrote:

> Sorry my earlier comment should read: "It would just read all the files in
> order and NOT worry about which data rows are in which files"
>
> On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier <jgr...@lyft.com> wrote:
>
>> Hmm..  I would have to look into the code for the StreamingFileSink more
>> closely to understand the concern but typically you should not be concerned
>> at all with *when* checkpoints happen.  They are meant to be a completely
>> asynchronous background process that has absolutely no bearing on
>> application semantics.  The output should be thought of as a stream rather
>> than a snapshot.
>>
>> Can you rework the downstream consumer of the output data such that you
>> don't have to worry about this?  It would just read all the files in order
>> and worry about which data rows are in which files.
>>
>> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
>> code.  I've cc'd him directly.
>>
>> -Jamie
>>
>>
>> On Fri, Jan 18, 2019 at 9:44 AM Cristian C <cristian.k...@gmail.com>
>> wrote:
>>
>>> Well, the problem is that, conceptually, the way I'm trying to approach
>>> this is ok. But in practice, it has some edge cases.
>>>
>>> So back to my original premise: if you both, trigger and checkpoint
>>> happen around the same time, there is a chance that the streaming file sink
>>> rolls the bucket BEFORE it has received all the data. In other words, it
>>> would create incomplete snapshots of the table.
>>>
>>> Keep in mind that every snapshot is written to a different folder. And
>>> they are supposed to represent the state of the whole table at a point in
>>> time.
>>>
>>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier <jgr...@lyft.com wrote:
>>>
>>>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>>>> PURGES but only FIRES what I said is semantically true.  The window
>>>> contents are never cleared.
>>>>
>>>> What I missed is that in this case since you're using a function that
>>>> incrementally reduces on the fly rather than processing all the data when
>>>> it's triggered your state is always kept to one element per key.  Your'e
>>>> correct but in general with non-incremental window functions the state
>>>> would grow unbounded in this configuration.
>>>>
>>>> So it looks like your approach should work just fine.
>>>>
>>>> -Jamie
>>>>
>>>>
>>>>
>>>> On Thu, Jan 17, 2019 at 10:18 PM knur <cristian.k...@gmail.com> wrote:
>>>>
>>>>> Hello Jamie.
>>>>>
>>>>> Thanks for taking a look at this. So, yes, I want to write only the
>>>>> last
>>>>> data for each key every X minutes. In other words, I want a snapshot
>>>>> of the
>>>>> whole database every X minutes.
>>>>>
>>>>> >  The issue is that the window never get's PURGED so the data just
>>>>> > continues to accumulate in the window.  This will grow without bound.
>>>>>
>>>>> The window not being purged does not necessarily mean that the data
>>>>> will be
>>>>> accumulated indefinitely. How so? Well, Flink has two mechanisms to
>>>>> remove
>>>>> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an
>>>>> evictor.
>>>>>
>>>>> The reduce function has an implicit evictor that automatically removes
>>>>> events from the window pane that are no longer needed. i.e. it keeps in
>>>>> state only the element that was reduced. Here is an example:
>>>>>
>>>>>     env.socketTextStream("localhost", 9999)
>>>>>       .keyBy { it.first().toString() }
>>>>>       .window(GlobalWindows.create())
>>>>>
>>>>>
>>>>> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>>>>       .reduce { left, right ->
>>>>>         println("left: $left, right: $right")
>>>>>         if (left.length > right.length) {
>>>>>           left
>>>>>         } else {
>>>>>           right
>>>>>         }
>>>>>       }
>>>>>       .printToErr()
>>>>>
>>>>> For your claim to hold true, every time the trigger fires one would
>>>>> expect
>>>>> to see ALL the elements by a key being printed over and over again in
>>>>> the
>>>>> reduce function. However, if you run a job similar to this one in your
>>>>> lang
>>>>> of choice, you will notice that the print statement is effectively
>>>>> called
>>>>> only once per event per key.
>>>>>
>>>>> In fact, not using purge is intentional. Because I want to hold every
>>>>> record
>>>>> (the last one by its primary key) of the database in state so that I
>>>>> can
>>>>> write a snapshot of the whole database.
>>>>>
>>>>> So for instance, let's say my table has two columns: id and time. And
>>>>> I have
>>>>> the following events:
>>>>>
>>>>> 1,January
>>>>> 2,February
>>>>> 1,March
>>>>>
>>>>> I want to write to S3 two records: "1,March", and "2,February".
>>>>>
>>>>> Now, let's say two more events come into the stream:
>>>>>
>>>>> 3,April
>>>>> 1,June
>>>>>
>>>>> Then I want to write to S3 three records: "1,June", "2,February" and
>>>>> "3,April".
>>>>>
>>>>> In other words, I can't just purge the windows, because I would lose
>>>>> the
>>>>> record with id 2.
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>

Reply via email to