Oops, sorry for not sending the reply to everyone
and thanks David for reposting it here.
Great to hear that you solved your issue!

Kostas



On Wed, Jan 15, 2020 at 1:57 PM David Magalhães <speeddra...@gmail.com> wrote:
>
> Sorry, I only saw the replies today.
>
> Regarding my previous email,
>
>> Still, there is something missing in this solution to close a window with a 
>> given timeout, so it can write the last events into the sink if no more 
>> events are sent.
>
>
> I've fixed this using a custom trigger,
>
> val flag = ctx.getPartitionedState(valueStateDescriptor).value()
>
> // Flag only used to register one trigger per window. Flag is cleaned when
> // the FIRE action is executed.
> if (!flag) {
>   val delay = window.getEnd - window.getStart
>   ctx.getPartitionedState(valueStateDescriptor).update(true)
>   ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
>   ctx.registerEventTimeTimer(window.maxTimestamp())
> }
>
> TriggerResult.CONTINUE
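>
> Roughly, the full trigger looks something like the sketch below (class and 
> state names are illustrative, and the flag's null check and the FIRE/cleanup 
> handling are simplified rather than my exact code):
>
> import org.apache.flink.api.common.state.ValueStateDescriptor
> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>
> // Fires the window at its event-time end, or after a processing-time delay
> // equal to the window size, so the last events are flushed even if no more
> // elements (and therefore no new watermark) arrive.
> class TimeoutTrigger[T] extends Trigger[T, TimeWindow] {
>
>   private val valueStateDescriptor =
>     new ValueStateDescriptor[java.lang.Boolean]("timersRegistered", classOf[java.lang.Boolean])
>
>   override def onElement(element: T, timestamp: Long, window: TimeWindow,
>                          ctx: Trigger.TriggerContext): TriggerResult = {
>     val flag = ctx.getPartitionedState(valueStateDescriptor).value()
>     // Register the two timers only once per window.
>     if (flag == null || !flag) {
>       val delay = window.getEnd - window.getStart
>       ctx.getPartitionedState(valueStateDescriptor).update(true)
>       ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
>       ctx.registerEventTimeTimer(window.maxTimestamp())
>     }
>     TriggerResult.CONTINUE
>   }
>
>   override def onEventTime(time: Long, window: TimeWindow,
>                            ctx: Trigger.TriggerContext): TriggerResult = {
>     if (time == window.maxTimestamp()) {
>       ctx.getPartitionedState(valueStateDescriptor).clear()
>       TriggerResult.FIRE
>     } else {
>       TriggerResult.CONTINUE
>     }
>   }
>
>   override def onProcessingTime(time: Long, window: TimeWindow,
>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>     ctx.getPartitionedState(valueStateDescriptor).clear()
>     TriggerResult.FIRE
>   }
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
>     // Simplification: the processing-time timer's exact timestamp is not
>     // tracked here, so only the event-time timer and the flag are cleaned up.
>     ctx.deleteEventTimeTimer(window.maxTimestamp())
>     ctx.getPartitionedState(valueStateDescriptor).clear()
>   }
> }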
>
> Leonard, by "duplicated events" I mean store the same event on different 
> parquet files, since the file format was "part-X-Y". So, if I start to 
> process the same stream again (from a point in the past) I couldn't overwrite 
> the files with exactly the same name.
>
> I think I've read a blog post about them (Pinterest); I will check the video.
>
> Kostas replied only to me, so I'm adding his response here.
>
>> Hi David,
>> I skimmed through the solution with the window before the sink.
>> If this solution fits your needs, I think you could:
>> 1)  just specify a BucketAssigner instead of writing a custom sink;
>> this will allow you to not lose any functionality from the
>> StreamingFileSink
>> 2)  for the timeout requirement, you could use a (keyed) process
>> function with map state to hold your event-time windows. The key will
>> be the window start (or interval) and you can register timers to fire
>> at the end of the window or after a certain period of inactivity. I
>> think that [1] can be a good starting point.
>> I hope this helps,
>> Kostas
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
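>
> For reference, a rough sketch of what (2) could look like (illustrative 
> names, a simple event type with an epoch-millis timestamp, and a naive 
> inactivity timeout; in the end the custom trigger above solved it for me):
>
> import java.util.{ArrayList => JArrayList, List => JList}
>
> import scala.collection.JavaConverters._
>
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.streaming.api.TimeDomain
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
> import org.apache.flink.util.Collector
>
> case class Event(accountId: String, timestamp: Long, payload: String)
>
> // Buffers events per event-time window in MapState (keyed by window start)
> // and emits each buffered batch when a timer fires, either at the window end
> // (event time) or after a period of inactivity (processing time).
> class WindowingProcessFunction(windowSizeMs: Long)
>     extends KeyedProcessFunction[String, Event, (Long, JList[Event])] {
>
>   private lazy val windows: MapState[java.lang.Long, JList[Event]] =
>     getRuntimeContext.getMapState(
>       new MapStateDescriptor("windows", classOf[java.lang.Long], classOf[JList[Event]]))
>
>   override def processElement(
>       event: Event,
>       ctx: KeyedProcessFunction[String, Event, (Long, JList[Event])]#Context,
>       out: Collector[(Long, JList[Event])]): Unit = {
>     val windowStart = event.timestamp - (event.timestamp % windowSizeMs)
>     val buffered = Option(windows.get(windowStart)).getOrElse(new JArrayList[Event]())
>     buffered.add(event)
>     windows.put(windowStart, buffered)
>     // End-of-window timer plus a (naive) inactivity timer; a real version
>     // would track and delete superseded processing-time timers.
>     ctx.timerService().registerEventTimeTimer(windowStart + windowSizeMs - 1)
>     ctx.timerService().registerProcessingTimeTimer(
>       ctx.timerService().currentProcessingTime() + windowSizeMs)
>   }
>
>   override def onTimer(
>       timestamp: Long,
>       ctx: KeyedProcessFunction[String, Event, (Long, JList[Event])]#OnTimerContext,
>       out: Collector[(Long, JList[Event])]): Unit = {
>     // Processing-time (inactivity) timers flush everything still buffered;
>     // event-time timers flush only the windows that have ended.
>     val flushAll = ctx.timeDomain() == TimeDomain.PROCESSING_TIME
>     val toFlush = windows.keys().asScala.toList
>       .filter(start => flushAll || start + windowSizeMs - 1 <= timestamp)
>     toFlush.foreach { start =>
>       out.collect((start.longValue(), windows.get(start)))
>       windows.remove(start)
>     }
>   }
> }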
>
>
> I think I can only define the partition name in the BucketAssigner, because I 
> don't want to have many partitions (currently I have accountId and yyyyMM 
> (year and month)). I've checked that in Flink 1.10 [1] we will be able to 
> configure a prefix and suffix for the filename. I could add the day and hour 
> to the prefix, and when I need to store the same events again I could start 
> from a specific time (probably matched to a Kafka offset) and remove the 
> files whose prefix date is newer than that time.
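>
> Roughly, against the 1.10 API as I understand it, that should look something 
> like this (the bucket path, prefix format and names are illustrative, not my 
> actual job):
>
> import org.apache.avro.Schema
> import org.apache.avro.generic.GenericRecord
> import org.apache.flink.core.fs.Path
> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
> import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, OutputFileConfig, StreamingFileSink}
>
> object BackfillFriendlyNaming {
>   // The Avro schema and the existing account_id/yyyyMM bucket assigner are
>   // assumed to be defined elsewhere in the job.
>   def buildSink(schema: Schema,
>                 assigner: BucketAssigner[GenericRecord, String],
>                 runPrefix: String): StreamingFileSink[GenericRecord] = {
>     // Part files become <runPrefix>-<subtask>-<counter>.parquet, so a
>     // run-specific prefix (e.g. "backfill-2020011510" for day + hour) makes
>     // them easy to find and delete before re-processing.
>     val fileConfig = OutputFileConfig
>       .builder()
>       .withPartPrefix(runPrefix)
>       .withPartSuffix(".parquet")
>       .build()
>
>     StreamingFileSink
>       .forBulkFormat(new Path("s3://my-bucket/events"), ParquetAvroWriters.forGenericRecord(schema))
>       .withBucketAssigner(assigner)
>       .withOutputFileConfig(fileConfig)
>       .build()
>   }
> }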
>
> The only scenario for this is when, for some reason, Flink is writing bad 
> files (events with wrong information) that need to be stored (processed) 
> again.
>
> For 2), my implementation with the trigger solved this.
>
> [1] 
> https://github.com/apache/flink/blob/master/docs/dev/connectors/streamfile_sink.md
>
> On Tue, Jan 14, 2020 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi David,
>>
>> I'm pulling in Kostas who worked on the StreamingFileSink and might be able 
>> to answer some of your questions.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 13, 2020 at 2:45 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>
>>> Hi, David
>>>
>>> For your first description, I'm a little confused about the duplicated 
>>> records when backfilling. Could you describe your usage scenario/code in 
>>> more detail?
>>>
>>> I remember a backfill solution from Pinterest which is very similar to 
>>> yours and also uses Flink [1]; hope that can help you.
>>>
>>> Best,
>>> Leonard
>>>
>>> [1] 
>>> https://www.youtube.com/watch?v=3-X6FJ5JS4E&list=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz&index=64
>>>
>>> On Jan 10, 2020, at 12:14, David Magalhães <speeddra...@gmail.com> wrote:
>>>
>>> Hi, I'm working with Flink for the first time and I'm trying to create a 
>>> solution that will store events from Kafka into Parquet files in S3. It 
>>> should also support re-injection of events from the Parquet files into a 
>>> Kafka topic.
>>>
>>> Here is the code with a simple usage of StreamingFileSink with a bulk 
>>> encoder that takes the events and stores them in parquet files. The files 
>>> are partitioned by account_id and by year and month (yyyyMM). The issue 
>>> with this approach is that, when running a backfill from a certain point in 
>>> time, it is hard not to generate duplicated events, since we will not 
>>> overwrite the same files, as the filename is generated as 
>>> "part-<subtask_id>-<sequential_number>".
>>>
>>> To add predictability, I've used a tumbling window to aggregate multiple 
>>> GenericRecord instances, in order to write the parquet file with a list of 
>>> them. For that I've created a custom file sink, but I'm not sure which 
>>> properties I am going to lose compared to the StreamingFileSink. Here is 
>>> the code. Still, there is something missing in this solution to close a 
>>> window with a given timeout, so it can write the last events into the sink 
>>> if no more events are sent.
>>>
>>> Another workaround would be to create a StreamingFileSink with a row 
>>> encoder that receives a List of GenericRecord, and to write a custom 
>>> Encoder with AvroParquetWriter to write to a file. This way I have access 
>>> to a custom rolling policy, but this looks truly inefficient. Here is the 
>>> code.
>>>
>>> Am I overthinking this solution? I know there are some issues (recently 
>>> closed) about the StreamingFileSink supporting more custom rolling policies 
>>> with bulk encoding, like https://issues.apache.org/jira/browse/FLINK-13027, 
>>> but I only noticed that now.
>>>
>>>
