Hi Vishal,

Kostas (in CC) should be able to help here.

Best, Fabian

On Mon, Feb 11, 2019 at 12:05 AM Vishal Santoshi <
vishal.santo...@gmail.com> wrote:

> Anyone?
>
> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> You don't have to. Thank you for the input.
>>
>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com> wrote:
>>
>>> My apologies for misreading your use case. The constraint on the
>>> rolling policy only applies to bulk formats such as Parquet, as
>>> highlighted in the docs.
>>>
>>> As for your questions, I'll have to defer to others more familiar with
>>> it. I mostly use bulk formats such as Avro and Parquet.
>>>
>>> Tim
>>>
>>>
>>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> That said, in the DefaultRollingPolicy the shouldRollOnCheckpoint() check
>>>> seems to be on the file size (mirroring the check in shouldRollOnEvent()).
>>>>
>>>> I guess the questions are:
>>>>
>>>> Is the call to shouldRollOnCheckpoint made by the checkpointing
>>>> thread?
>>>>
>>>> Are the calls to the other two methods, shouldRollOnEvent and
>>>> shouldRollOnProcessingTime, made on the execution thread, i.e. inlined?
>>>>
>>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Thanks for the quick reply.
>>>>>
>>>>> I am confused. If this were a more full-featured BucketingSink, I would
>>>>> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an
>>>>> in-progress file could move to the pending state, and on checkpoint the
>>>>> pending part file would be finalized. For exactly-once, the length of any
>>>>> in-progress file would be snapshotted to the checkpoint and, if the job
>>>>> were resumed from that checkpoint, used to truncate the file (if truncate
>>>>> is supported) or written out as a valid-length file (if it is not), to
>>>>> indicate which part of the file (finalized on resume) is valid. I had
>>>>> always assumed (and nothing in the docs says otherwise) that
>>>>> shouldRollOnCheckpoint would behave like the other two, apart from the
>>>>> fact that it rolls and finalizes in a single step on a checkpoint.
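The lifecycle described above (in-progress, then pending, then finished once the checkpoint that captured the pending file completes, with the in-progress length snapshotted for truncation on restore) can be modeled in plain Java. This is a simplified, hypothetical sketch of BucketingSink-style semantics as described in this message; `BucketModel`, `PartFile`, `PartState` and their methods are illustrative names, not Flink classes, and no real files are written.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of BucketingSink-style part-file semantics.
// No files are written; names are illustrative, not Flink classes.
enum PartState { IN_PROGRESS, PENDING, FINISHED }

class PartFile {
    final String name;
    PartState state = PartState.IN_PROGRESS;
    long length = 0L; // bytes written so far

    PartFile(String name) {
        this.name = name;
    }
}

class BucketModel {
    private final List<PartFile> files = new ArrayList<>();
    // Files rolled since the last snapshot, not yet attached to a checkpoint.
    private final List<PartFile> pendingSinceLastSnapshot = new ArrayList<>();
    // Pending files per checkpoint id, finalized when that checkpoint completes.
    private final Map<Long, List<PartFile>> pendingPerCheckpoint = new HashMap<>();
    private PartFile current;
    private int counter = 0;

    // Append bytes to the in-progress file, opening a new one if needed.
    void write(long bytes) {
        if (current == null) {
            current = new PartFile("part-0-" + counter++);
            files.add(current);
        }
        current.length += bytes;
    }

    // Roll: close the in-progress file and mark it pending (not yet final).
    void roll() {
        if (current != null) {
            current.state = PartState.PENDING;
            pendingSinceLastSnapshot.add(current);
            current = null;
        }
    }

    // Snapshot: remember which pending files belong to this checkpoint and
    // return the valid length of the in-progress file. On restore, the file
    // would be truncated to this length, or a valid-length marker written.
    long snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastSnapshot));
        pendingSinceLastSnapshot.clear();
        return current == null ? 0L : current.length;
    }

    // Checkpoint complete: finalize pending files of this and earlier checkpoints.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<PartFile>>> it = pendingPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<PartFile>> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                for (PartFile f : entry.getValue()) {
                    f.state = PartState.FINISHED;
                }
                it.remove();
            }
        }
    }

    List<PartFile> files() {
        return files;
    }
}
```

Finalizing only the files captured by a completed checkpoint, rather than every pending file, keeps files rolled after the snapshot from being committed early.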
>>>>>
>>>>>
>>>>> Am I better off using BucketingSink? When to use BucketingSink and when
>>>>> to use StreamingFileSink is not clear at all, even though on the surface
>>>>> StreamingFileSink certainly looks like a better version of BucketingSink (or not).
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think the only rolling policy that can be used to ensure exactly-once
>>>>>> is OnCheckpointRollingPolicy.
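If part files are rolled only on checkpoints, every completed checkpoint closes the in-progress file of each subtask that has written to a bucket, so the checkpoint interval bounds both part-file size and part-file count. A back-of-the-envelope sketch, assuming one active bucket, data arriving continuously, and the 10-minute interval from the `env.enableCheckpointing(10 * 60000)` setup later in the thread; `CheckpointRollEstimate` is a hypothetical helper, not Flink API.

```java
// Hypothetical helper: rough upper bound on part files per bucket per day when
// every checkpoint rolls each subtask's in-progress file. Assumes data arrives
// continuously, so every subtask has an in-progress file at every checkpoint.
class CheckpointRollEstimate {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    static long partFilesPerDay(long checkpointIntervalMs, int parallelism) {
        if (checkpointIntervalMs <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("interval and parallelism must be positive");
        }
        // one roll per checkpoint per subtask
        return (DAY_MS / checkpointIntervalMs) * parallelism;
    }
}
```

With 10-minute checkpoints that is 144 part files per subtask per bucket per day, or 576 at a parallelism of 4, which is one reason a size- or time-based policy is attractive for row formats.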
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> Can StreamingFileSink be used instead of BucketingSink
>>>>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html)?
>>>>>>> It looks like it could.
>>>>>>>
>>>>>>>
>>>>>>> Take this code, for example:
>>>>>>>
>>>>>>>
>>>>>>>     StreamingFileSink<KafkaRecord> sink = StreamingFileSink
>>>>>>>             .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>>>>>>>             .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>>             .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>>                 @Override
>>>>>>>                 public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>>>>>                     // never roll just because a checkpoint is taken
>>>>>>>                     return false;
>>>>>>>                 }
>>>>>>>
>>>>>>>                 @Override
>>>>>>>                 public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>>>                                                  KafkaRecord element) throws IOException {
>>>>>>>                     // roll once the part file exceeds 1 GiB
>>>>>>>                     return partFileState.getSize() > 1024L * 1024 * 1024;
>>>>>>>                 }
>>>>>>>
>>>>>>>                 @Override
>>>>>>>                 public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>>>>>>                                                           long currentTime) throws IOException {
>>>>>>>                     // roll after 10 minutes without writes, or 2 hours after creation
>>>>>>>                     return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>>>                             || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>>>                 }
>>>>>>>             })
>>>>>>>             .build();
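To check what the thresholds in this policy actually mean, the three decisions can be pulled out into plain Java and evaluated without a running job. `PartFileStats` and `SizeAndTimePolicy` are hypothetical stand-ins: `PartFileStats` holds only the three values the policy reads from Flink's `PartFileInfo` (size, creation time, last update time), and the logic mirrors the anonymous `RollingPolicy` above: roll past 1 GiB on write, after 10 minutes of inactivity or 2 hours since creation on the processing-time check, and never on checkpoint.

```java
// Hypothetical stand-in for the values the policy reads from PartFileInfo.
// Not the Flink API.
class PartFileStats {
    final long size;
    final long creationTime;
    final long lastUpdateTime;

    PartFileStats(long size, long creationTime, long lastUpdateTime) {
        this.size = size;
        this.creationTime = creationTime;
        this.lastUpdateTime = lastUpdateTime;
    }
}

// Same decisions as the anonymous RollingPolicy, as plain static methods.
class SizeAndTimePolicy {
    static final long MAX_PART_SIZE = 1024L * 1024 * 1024; // 1 GiB
    static final long INACTIVITY_MS = 10L * 60 * 1000;     // 10 minutes
    static final long ROLLOVER_MS = 120L * 60 * 1000;      // 2 hours

    static boolean shouldRollOnCheckpoint(PartFileStats s) {
        return false; // checkpoints never force a roll
    }

    static boolean shouldRollOnEvent(PartFileStats s) {
        return s.size > MAX_PART_SIZE;
    }

    static boolean shouldRollOnProcessingTime(PartFileStats s, long now) {
        return now - s.lastUpdateTime > INACTIVITY_MS
                || now - s.creationTime > ROLLOVER_MS;
    }
}
```

Because `shouldRollOnCheckpoint` always returns false, an in-progress file can span several checkpoints; exactly-once then presumably depends on the sink restoring the in-progress file to its checkpointed state on recovery, rather than on rolling at each checkpoint.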
>>>>>>>
>>>>>>>
>>>>>>> A few things I see and am not sure I follow about the new StreamingFileSink
>>>>>>> vis-à-vis BucketingSink:
>>>>>>>
>>>>>>>
>>>>>>> 1. I never see the in-progress file go to the pending state, i.e. get
>>>>>>> renamed as pending, as was the case in BucketingSink. I would assume it
>>>>>>> would become pending and then be finalized on checkpoint for exactly-once
>>>>>>> semantics?
>>>>>>>
>>>>>>>
>>>>>>> 2. I see dangling in-progress files at the end of the day. I would
>>>>>>> assume that, with withBucketCheckInterval set to 1 minute by default,
>>>>>>> shouldRollOnProcessingTime should kick in?
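Assuming the bucket check fires every `withBucketCheckInterval` milliseconds and evaluates `shouldRollOnProcessingTime` for each in-progress file, an idle file under the 10-minute inactivity rule in the policy above is rolled at the first firing strictly more than 10 minutes after its last write. The sketch below computes that time; `firstRollTime` is a hypothetical helper, and timers are assumed, for simplicity, to fire at exact multiples of the interval. Rolling only closes the file: the StreamingFileSink documentation states that part files are finalized only on successful checkpoints, so with 10-minute checkpoints a rolled file may remain pending for up to another checkpoint interval.

```java
// Hypothetical simulation of the periodic bucket check. Timers are assumed,
// for simplicity, to fire at exact multiples of the check interval.
class BucketCheckSimulation {
    // First timer firing (ms) at which an idle file, last written at
    // lastUpdateMs, satisfies "now - lastUpdate > inactivityMs".
    static long firstRollTime(long lastUpdateMs, long checkIntervalMs, long inactivityMs) {
        long t = (lastUpdateMs / checkIntervalMs + 1) * checkIntervalMs; // next firing
        while (t - lastUpdateMs <= inactivityMs) {
            t += checkIntervalMs; // not idle long enough yet, wait for the next check
        }
        return t;
    }
}
```

For a last write at 30 seconds, a 1-minute check interval and a 10-minute inactivity threshold, the file is rolled at 660,000 ms, i.e. 11 minutes in.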
>>>>>>>
>>>>>>> 3. The in-progress files look like
>>>>>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is
>>>>>>> that additional suffix?
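Judging from the example, the name looks like `.part-<subtask index>-<part counter>.inprogress.<UUID>`: a leading dot that hides the file, the part file's name, and a random UUID. Presumably (an assumption, not confirmed in this thread) the UUID is added by the Hadoop-based recoverable writer so that each in-progress staging file has a unique name, and the file is renamed to `part-4-45` when committed. A small parser, using the hypothetical class `InProgressName`, makes the assumed layout explicit.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for names like ".part-4-45.inprogress.<uuid>".
// The layout is inferred from the example name, not taken from Flink's API.
class InProgressName {
    private static final Pattern PATTERN = Pattern.compile(
            "^\\.part-(\\d+)-(\\d+)\\.inprogress\\.([0-9a-fA-F-]{36})$");

    final int subtaskIndex;
    final long partCounter;
    final String uuid;

    private InProgressName(int subtaskIndex, long partCounter, String uuid) {
        this.subtaskIndex = subtaskIndex;
        this.partCounter = partCounter;
        this.uuid = uuid;
    }

    static InProgressName parse(String fileName) {
        Matcher m = PATTERN.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("unexpected file name: " + fileName);
        }
        return new InProgressName(
                Integer.parseInt(m.group(1)), Long.parseLong(m.group(2)), m.group(3));
    }

    // Assumed name of the same part file once committed: the hiding dot and
    // the ".inprogress.<uuid>" suffix are dropped.
    String committedName() {
        return "part-" + subtaskIndex + "-" + partCounter;
    }
}
```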
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I have the following setup on the env:
>>>>>>>
>>>>>>> env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
>>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>>> // up to 4 restart attempts, 1 minute apart
>>>>>>> env.setRestartStrategy(fixedDelayRestart(4,
>>>>>>>         org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>>
>>>>>>>
>>>>>>> Regards.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
