Hi Vishal,

Kostas (in CC) should be able to help here.

Best,
Fabian

On Mon, 11 Feb 2019 at 00:05, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Anyone?
>
> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> You don't have to. Thank you for the input.
>>
>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com> wrote:
>>
>>> My apologies for not seeing your use case properly. The constraint on the
>>> rolling policy applies only to bulk formats such as Parquet, as
>>> highlighted in the docs.
>>>
>>> As for your questions, I'll have to defer to others more familiar with
>>> it. I mostly just use bulk formats such as Avro and Parquet.
>>>
>>> Tim
>>>
>>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> That said, in the DefaultRollingPolicy the check seems to be on the
>>>> file size (mimicking the check in shouldRollOnEvent()).
>>>>
>>>> I guess the question is:
>>>>
>>>> Is the call to shouldRollOnCheckpoint made by the checkpointing
>>>> thread?
>>>>
>>>> Are the calls to the other two methods, shouldRollOnEvent and
>>>> shouldRollOnProcessingTime, made on the execution thread, i.e. inlined?
>>>>
>>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Thanks for the quick reply.
>>>>>
>>>>> I am confused. If this were a more full-featured BucketingSink, I would
>>>>> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime,
>>>>> an in-progress file could go into the pending phase, and on checkpoint
>>>>> the pending part file would be finalized. For exactly-once, any
>>>>> in-progress file would have its length snapshotted to the checkpoint.
>>>>> On a resume from that checkpoint, the length would be used to truncate
>>>>> the file (if truncate is supported) or written to a separate
>>>>> valid-length file (if truncate is not supported) to indicate which part
>>>>> of the finalized file (finalized on resume) was valid.
>>>>> And I had always assumed (and there is no documentation saying
>>>>> otherwise) that shouldRollOnCheckpoint would behave like the other two,
>>>>> except that it does the roll and the finalize in a single step on a
>>>>> checkpoint.
>>>>>
>>>>> Am I better off using BucketingSink? When to use BucketingSink and when
>>>>> to use StreamingFileSink is not clear at all, even though on the surface
>>>>> StreamingFileSink sure looks like a better version of BucketingSink (or
>>>>> not).
>>>>>
>>>>> Regards.
>>>>>
>>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com> wrote:
>>>>>
>>>>>> I think the only rolling policy that can be used to ensure
>>>>>> exactly-once is OnCheckpointRollingPolicy.
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Can StreamingFileSink be used instead of the sink documented at the
>>>>>>> URL below? It looks like it could.
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html
>>>>>>>
>>>>>>> This code, for example:
>>>>>>>
>>>>>>> StreamingFileSink
>>>>>>>     .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>>>>>>>     .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>>     .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>>         @Override
>>>>>>>         public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>>>>>             return false;
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>>>                                          KafkaRecord element) throws IOException {
>>>>>>>             // Roll once the part file exceeds 1 GiB.
>>>>>>>             return partFileState.getSize() > 1024L * 1024 * 1024;
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>>>>>>                                                   long currentTime) throws IOException {
>>>>>>>             // Roll after 10 minutes without writes, or once the file is older than 120 minutes.
>>>>>>>             return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>>>                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>>>         }
>>>>>>>     })
>>>>>>>     .build();
>>>>>>>
>>>>>>> A few things I see and am not sure I follow about the new
>>>>>>> StreamingFileSink vis-à-vis BucketingSink:
>>>>>>>
>>>>>>> 1. I never see the in-progress file go to the pending state, i.e.
>>>>>>> renamed as pending, as was the case in BucketingSink. I would assume
>>>>>>> that it would become pending and then be finalized on checkpoint for
>>>>>>> exactly-once semantics?
>>>>>>>
>>>>>>> 2. I see dangling in-progress files at the end of the day. Given that
>>>>>>> withBucketCheckInterval is 1 minute by default, I would assume that
>>>>>>> shouldRollOnProcessingTime should kick in?
>>>>>>>
>>>>>>> 3. The in-progress files look like
>>>>>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is
>>>>>>> that additional suffix?
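Questions 1 and 2 hinge on the lifecycle that BucketingSink made visible through file renames: a part file is in progress, then pending once rolled, then finished. The following is a minimal, simplified model of that lifecycle as described in this thread, assuming that rolling only moves a file to pending and that a pending file is finalized only when a checkpoint covering it completes. It is not Flink's implementation: the class `PartFileLifecycle` and all of its methods are hypothetical, and it does not model file names, truncation, or recovery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of the part-file lifecycle discussed in this thread:
// in-progress -> pending (on roll) -> finished (when a covering checkpoint completes).
// Not Flink's implementation; file names, truncation and recovery are not modeled.
final class PartFileLifecycle {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        long size;

        PartFile(String name) {
            this.name = name;
        }
    }

    private PartFile inProgress;
    private final List<PartFile> pending = new ArrayList<>();
    // Pending files captured by each checkpoint snapshot, keyed by checkpoint id.
    private final NavigableMap<Long, List<PartFile>> pendingPerCheckpoint = new TreeMap<>();
    private final List<PartFile> finished = new ArrayList<>();
    private int counter;

    // Appends bytes to the current in-progress file, opening a new one if needed.
    void write(long bytes) {
        if (inProgress == null) {
            inProgress = new PartFile("part-0-" + counter++);
        }
        inProgress.size += bytes;
    }

    // Rolling closes the in-progress file and marks it pending; it is not final yet.
    void roll() {
        if (inProgress != null) {
            inProgress.state = State.PENDING;
            pending.add(inProgress);
            inProgress = null;
        }
    }

    // A snapshot optionally rolls the in-progress file, then records which
    // pending files belong to this checkpoint.
    void snapshot(long checkpointId, boolean rollOnCheckpoint) {
        if (rollOnCheckpoint) {
            roll();
        }
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Only when a checkpoint completes are the pending files captured by it
    // (or by an earlier checkpoint) marked finished.
    void checkpointComplete(long checkpointId) {
        NavigableMap<Long, List<PartFile>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<PartFile> files : done.values()) {
            for (PartFile file : files) {
                file.state = State.FINISHED;
                finished.add(file);
            }
        }
        done.clear(); // removes the committed entries from the backing map
    }

    PartFile inProgress() { return inProgress; }

    List<PartFile> finished() { return finished; }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        sink.write(100);
        sink.roll();                                 // e.g. a size or inactivity threshold was hit
        sink.write(50);                              // opens a new in-progress file
        sink.snapshot(1L, false);                    // like shouldRollOnCheckpoint returning false
        System.out.println(sink.finished().size());  // 0: checkpoint 1 has not completed yet
        sink.checkpointComplete(1L);
        System.out.println(sink.finished().size());  // 1: the rolled file is now finished
        System.out.println(sink.inProgress().name);  // part-0-1: still in progress
    }
}
```

Under this model, a file rolled by shouldRollOnEvent or shouldRollOnProcessingTime stays pending until the next completed checkpoint, so the delay between rolling and finalizing is governed by the checkpoint interval rather than by the rolling policy itself.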
>>>>>>>
>>>>>>> I have the following set up on the env:
>>>>>>>
>>>>>>> env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
>>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>>> // up to 4 restart attempts, 1 minute apart
>>>>>>> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>>
>>>>>>> Regards.
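As a rough sanity check on question 2, the sketch below reproduces the thresholds from the rolling policy in the thread as plain Java and estimates how long a file can stay unfinalized after its last write with these settings. It assumes, without confirmation from the thread, that the processing-time check fires once per bucket check interval (1 minute, as quoted) and that a rolled file becomes finished only when the next checkpoint completes (every 10 minutes here, with checkpoint duration ignored). The class `RollTiming` and its methods are hypothetical helpers, not Flink APIs.

```java
import java.time.Duration;

// Hypothetical back-of-envelope helper; not a Flink API.
// Thresholds are copied from the rolling policy quoted in this thread.
final class RollTiming {
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024 * 1024;          // 1 GiB
    static final long INACTIVITY_MS = Duration.ofMinutes(10).toMillis();  // 600,000 ms
    static final long MAX_AGE_MS = Duration.ofMinutes(120).toMillis();    // 7,200,000 ms

    // Same conditions as the thread's shouldRollOnProcessingTime.
    static boolean shouldRollOnProcessingTime(long now, long createdAt, long lastUpdateAt) {
        return now - lastUpdateAt > INACTIVITY_MS || now - createdAt > MAX_AGE_MS;
    }

    // Same condition as the thread's shouldRollOnEvent.
    static boolean shouldRollOnEvent(long sizeBytes) {
        return sizeBytes > MAX_PART_SIZE_BYTES;
    }

    // Rough worst case from the last write until the file is finalized, assuming:
    // the inactivity roll is detected at most one check interval late, and the
    // resulting pending file waits at most one checkpoint interval to be committed.
    static long worstCaseIdleToFinalMs(long checkIntervalMs, long checkpointIntervalMs) {
        return INACTIVITY_MS + checkIntervalMs + checkpointIntervalMs;
    }

    public static void main(String[] args) {
        long check = Duration.ofMinutes(1).toMillis();        // bucket check interval, as quoted
        long checkpoint = Duration.ofMinutes(10).toMillis();  // env.enableCheckpointing(10 * 60000)
        long worst = worstCaseIdleToFinalMs(check, checkpoint);
        System.out.println(Duration.ofMillis(worst).toMinutes() + " minutes"); // 21 minutes
    }
}
```

With these numbers the estimate is about 10 + 1 + 10 = 21 minutes. Under the stated assumptions, an in-progress file that lingers much longer than that would suggest one of the assumptions is not holding (for example, checkpoints not completing), rather than the policy thresholds themselves.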