Yes, sorry for my confusing comment. I just meant that it seems like
there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main
> result of the job

Yes, and that's what I have already done. There always seems to be some
data loss at production data volumes if the job has been restarted
on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com>
wrote:

> Hi Juho,
>
> So it is a per-key deduplication job.
>
> Yes, I would wait and check the actual output in s3, because it is the main
> result of the job, and
>
> > The late data around the time of taking savepoint might not be included
> > in the savepoint, but it should be behind the snapshotted offset in Kafka.
>
> is not a bug; it is possible behaviour.
>
> The savepoint is a snapshot of the in-flight data that has already been
> consumed from Kafka.
> Basically, the full contents of the window result are split between the
> savepoint and whatever arrives after the savepointed offset in Kafka, but
> before the window result is written to s3.
>
> Allowed lateness should not affect this; I am just saying that the final
> result in s3 should, in the end, include all of the records.
> That is what should be guaranteed, not the contents of the intermediate
> savepoint.
>
> Cheers,
> Andrey
>
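Andrey's explanation above (a consistent snapshot splits the window contents between the savepoint state and the records after the snapshotted Kafka offset) can be checked with a toy model. This is a hypothetical plain-Java sketch, not Flink code: one Kafka partition is modelled as a list of keys, the "window state" keeps the first offset seen per key, and a restore resumes from the snapshotted offset. All class and method names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model (not Flink) of restoring window state plus a Kafka offset. */
final class SavepointReplaySketch {

    /** Reduces records [from, to) into state, keeping the first offset seen per key. */
    static void consume(List<String> partition, int from, int to, Map<String, Integer> state) {
        for (int offset = from; offset < to; offset++) {
            state.putIfAbsent(partition.get(offset), offset);
        }
    }

    /** Processes the whole partition without a restart. */
    static Map<String, Integer> uninterrupted(List<String> partition) {
        Map<String, Integer> state = new LinkedHashMap<>();
        consume(partition, 0, partition.size(), state);
        return state;
    }

    /**
     * The snapshot holds state covering records [0, stateUpTo) and the offset
     * nextOffset to resume from. A consistent snapshot has stateUpTo == nextOffset.
     */
    static Map<String, Integer> withRestore(List<String> partition, int stateUpTo, int nextOffset) {
        Map<String, Integer> snapshotState = new LinkedHashMap<>();
        consume(partition, 0, stateUpTo, snapshotState);

        // Simulated restart: only the snapshot survives, then replay from nextOffset.
        Map<String, Integer> restored = new LinkedHashMap<>(snapshotState);
        consume(partition, nextOffset, partition.size(), restored);
        return restored;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "a", "c", "b", "d");
        for (int cut = 0; cut <= partition.size(); cut++) {
            boolean same = withRestore(partition, cut, cut).equals(uninterrupted(partition));
            System.out.println("consistent snapshot at " + cut + ": same result = " + same);
        }
        // Offset ahead of state: "c" (offset 3) is never seen after the restore.
        System.out.println("offset ahead of state: " + withRestore(partition, 2, 4).keySet());
    }
}
```

With `stateUpTo == nextOffset` the restored result matches the uninterrupted one at every cut point. Only if the restored offset ran ahead of what the state reflects would records in between be silently lost, which resembles the symptom described in this thread.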
> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>
> Thanks for your answer!
>
> I check for the missing data in the final output on s3: I wait until
> the next day, then run the same logic re-implemented as a batch job, and
> compare the outputs.
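The comparison described here (streaming output against a batch re-run) could be scripted along these lines. This is a hypothetical plain-Java sketch: it assumes both outputs have already been read from s3 and parsed into `Map<String, String>` records, and that the caller can supply an event timestamp in milliseconds per record (the field holding it is not named in the thread). Counting the missing records per event-time minute would show whether the losses cluster around the savepoint time.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

/** Hypothetical helper for diffing the streaming output against a batch re-run. */
final class OutputDiff {

    /** Composite key built like MapKeySelector: a missing field becomes "". */
    static List<String> key(Map<String, String> record, String... fields) {
        return Arrays.stream(fields)
                .map(f -> record.getOrDefault(f, ""))
                .collect(Collectors.toList());
    }

    /** Records whose key is in the batch output but not in the streaming output. */
    static List<Map<String, String>> missingFromStream(Collection<Map<String, String>> batch,
                                                       Collection<Map<String, String>> stream,
                                                       String... fields) {
        Set<List<String>> streamKeys = stream.stream()
                .map(r -> key(r, fields))
                .collect(Collectors.toSet());
        return batch.stream()
                .filter(r -> !streamKeys.contains(key(r, fields)))
                .collect(Collectors.toList());
    }

    /** Counts records per event-time minute (minutes since the epoch), sorted by minute. */
    static TreeMap<Long, Long> countPerMinute(Collection<Map<String, String>> records,
                                              ToLongFunction<Map<String, String>> eventTimeMillis) {
        return records.stream().collect(Collectors.groupingBy(
                r -> eventTimeMillis.applyAsLong(r) / 60_000L,
                TreeMap::new,
                Collectors.counting()));
    }
}
```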
>
> > The late data around the time of taking savepoint might not be included
> > in the savepoint, but it should be behind the snapshotted offset in Kafka.
>
> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>
> > Then it should just come later after the restore and should be reduced
> > within the allowed lateness into the final result which is saved into s3.
>
> As far as I know, allowed lateness doesn't play any role here: I started
> running the job with allowedLateness=0 and still get the data loss, while
> my late data output doesn't receive anything.
>
> > Also, is this `DistinctFunction.reduce` just an example or the actual
> > implementation, basically saving just one of the records inside the 24h
> > window in s3? Then what is missing there?
>
> Yes, it's the actual implementation. Note that there's a keyBy before
> the DistinctFunction, so there's one record for each key (the key being a
> combination of several fields). In practice I've seen that we're missing
> ~2000-4000 elements on each restore, while the total output is much larger
> than that.
>
> Here's the full code for the key selector:
>
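> import java.util.Map;
>
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple;
>
> /** Builds a TupleN key from the given map fields; a missing field becomes "". */
> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>
>     private final String[] fields;
>
>     public MapKeySelector(String... fields) {
>         this.fields = fields;
>     }
>
>     @Override
>     public Object getKey(Map<String, String> event) throws Exception {
>         // Flink tuple classes go up to Tuple25, so at most 25 key fields
>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>         for (int i = 0; i < fields.length; i++) {
>             key.setField(event.getOrDefault(fields[i], ""), i);
>         }
>         return key;
>     }
> }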
>
> And a more exact example of how it's used:
>
>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>                 .timeWindow(Time.days(1))
>                 .reduce(new DistinctFunction())
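For reference, `keyBy(new MapKeySelector(...)).timeWindow(Time.days(1)).reduce(new DistinctFunction())` can be modelled in plain Java as "keep the first record per (key, day window)". This is a hypothetical sketch, not Flink code. It assumes event-time windows with the default zero offset (so days start at midnight UTC), uses a `List<String>` in place of Flink's `Tuple` key, and takes a caller-supplied event-time extractor. Since Flink's reducing state calls `reduce(accumulated, newValue)`, `DistinctFunction` returning `value1` keeps the earliest-arriving record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical plain-Java model of a per-key, per-day distinct (not Flink code). */
final class DistinctPerDaySketch {

    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    /** Start of the 1-day tumbling window containing the timestamp (zero offset, UTC). */
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, DAY_MILLIS);
    }

    /** Same key shape as MapKeySelector: one slot per field, "" when the field is absent. */
    static List<String> key(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>(fields.length);
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    /** One record per (key, window); the first to arrive is kept, like reduce returning value1. */
    static Map<List<Object>, Map<String, String>> distinct(List<Map<String, String>> events,
                                                          ToLongFunction<Map<String, String>> eventTimeMillis,
                                                          String... fields) {
        Map<List<Object>, Map<String, String>> result = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            List<Object> windowedKey =
                    List.<Object>of(key(event, fields), windowStart(eventTimeMillis.applyAsLong(event)));
            result.putIfAbsent(windowedKey, event);
        }
        return result;
    }
}
```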
>
> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com>
> wrote:
>
>> Hi Juho,
>>
>> Where exactly is the data missing? When do you notice it?
>> Do you check it by:
>> - debugging `DistinctFunction.reduce` right after resuming in the middle of
>> the day
>> or
>> - finding that some distinct records are missing in the final output of
>> BucketingSink in s3, after the window result is actually triggered and saved
>> into s3 at the end of the day? Is this the main output?
>>
>> Late data arriving around the time the savepoint is taken might not be
>> included in the savepoint, but it should be behind the snapshotted offset in
>> Kafka. It should then simply arrive after the restore and be reduced, within
>> the allowed lateness, into the final result that is saved to s3.
>>
>> Also, is this `DistinctFunction.reduce` just an example or the actual
>> implementation, basically saving just one of the records inside the 24h
>> window in s3? Then what is missing there?
>>
>> Cheers,
>> Andrey
>>
>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> I changed to allowedLateness=0: no change, still missing data when
>> restoring from a savepoint.
>>
>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> I realized that BucketingSink cannot be playing any role in this problem.
>>> BucketingSink only gets input, in a burst, when the 24-hour window
>>> triggers. Around the state restore point (the middle of the day) it
>>> doesn't get any input, so it can't lose anything either (right?).
>>>
>>> Next I will try removing allowedLateness from the equation entirely.
>>>
>>> In the meantime, please let me know if you have any suggestions for
>>> debugging the lost data, for example which logs to enable.
>>>
>>> We use FlinkKafkaConsumer010, by the way. Are there any known issues with it
>>> that could contribute to lost data when restoring from a savepoint?
>>>
>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>>> Some data is silently lost on my Flink stream job when state is
>>>> restored from a savepoint.
>>>>
>>>> Do you have any debugging hints to find out where exactly the data gets
>>>> dropped?
>>>>
>>>> My job gathers distinct values using a 24-hour window. It doesn't have
>>>> any custom state management.
>>>>
>>>> When I cancel the job with a savepoint and restore from that savepoint,
>>>> some data is missed. Only a small amount of data seems to be lost. The
>>>> event time of the lost data is probably around the time of the savepoint.
>>>> In other words, the rest of the time window is not entirely missed:
>>>> collection also works correctly for (most of) the events that come in
>>>> after restoring.
>>>>
>>>> When the job processes a full 24-hour window without interruptions it
>>>> doesn't miss anything.
>>>>
>>>> Usually the problem doesn't happen in test environments, which have lower
>>>> parallelism and smaller data volumes. But at production volumes the job
>>>> seems to consistently miss at least something on every restore.
>>>>
>>>> This issue has happened consistently since the job was initially
>>>> created. It was first run on an older Flink 1.5-SNAPSHOT build, and it
>>>> still happens on both Flink 1.5.2 and 1.6.0.
>>>>
>>>> I'm wondering whether this could be, for example, a synchronization issue
>>>> between the Kafka consumer offsets and what has been written by
>>>> BucketingSink?
>>>>
>>>> 1. Job content, simplified
>>>>
>>>>         kafkaStream
>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>                 // MapKeySelector takes field names (String...), not indices
>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>                 .timeWindow(Time.days(1))
>>>>                 .allowedLateness(allowedLateness)
>>>>                 .sideOutputLateData(lateDataTag)
>>>>                 .reduce(new DistinctFunction())
>>>>                 .addSink(sink)
>>>>                 // use a fixed number of output partitions
>>>>                 .setParallelism(8);
>>>>
>>>> import java.util.Map;
>>>>
>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>>
>>>> /**
>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>  */
>>>> public class DistinctFunction implements ReduceFunction<Map<String, String>> {
>>>>     @Override
>>>>     public Map<String, String> reduce(Map<String, String> value1,
>>>>                                       Map<String, String> value2) {
>>>>         // value1 is the already-reduced value: keep it, drop the duplicate value2
>>>>         return value1;
>>>>     }
>>>> }
>>>>
>>>> 2. State configuration
>>>>
>>>> boolean enableIncrementalCheckpointing = true;
>>>> String statePath = "s3n://bucket/savepoints";
>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>
>>>> Checkpointing Mode: Exactly Once
>>>> Interval: 1m 0s
>>>> Timeout: 10m 0s
>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>> Maximum Concurrent Checkpoints: 1
>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>
>>>> 3. BucketingSink configuration
>>>>
>>>> We use BucketingSink. I don't think there's anything special here, apart
>>>> from the fact that we're writing to S3.
>>>>
>>>>         String outputPath = "s3://bucket/output";
>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>                 .setBatchSize(batchSize)
>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>         sink.setWriter(new IdJsonWriter());
>>>>
>>>> 4. Kafka & event time
>>>>
>>>> My Flink job reads the data from Kafka, using a
>>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>>>> synchronize watermarks across all Kafka partitions. We also write late
>>>> data to a side output, but nothing is written there. If anything were, it
>>>> could explain the missing data in the main output. (I'm also sure that our
>>>> late data writing works, because we previously had some actual late data
>>>> that ended up there.)
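To illustrate the per-partition watermarking mentioned above, here is a hypothetical toy model in plain Java (not the Flink API). It assumes each Kafka partition tracks the highest event timestamp it has seen, a partition's watermark is that maximum minus the configured out-of-orderness, and the source's watermark is the minimum across its partitions, which is, as far as I understand, how Flink merges per-partition watermarks. One consequence: a partition that has not delivered any data yet holds the watermark back.

```java
import java.util.Arrays;

/** Hypothetical toy model of per-partition bounded-out-of-orderness watermarks (not Flink code). */
final class PartitionWatermarkSketch {

    private final long maxOutOfOrdernessMillis;
    private final long[] maxTimestampPerPartition;

    PartitionWatermarkSketch(int partitions, long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        this.maxTimestampPerPartition = new long[partitions];
        // Before any data arrives, each partition's watermark is Long.MIN_VALUE.
        Arrays.fill(maxTimestampPerPartition, Long.MIN_VALUE + maxOutOfOrdernessMillis);
    }

    /** Records an event; a partition's max timestamp never moves backwards. */
    void onRecord(int partition, long timestampMillis) {
        maxTimestampPerPartition[partition] =
                Math.max(maxTimestampPerPartition[partition], timestampMillis);
    }

    /** Source watermark: min over partitions of (max timestamp seen - max out-of-orderness). */
    long currentWatermark() {
        long watermark = Long.MAX_VALUE;
        for (long maxTimestamp : maxTimestampPerPartition) {
            watermark = Math.min(watermark, maxTimestamp - maxOutOfOrdernessMillis);
        }
        return watermark;
    }
}
```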
>>>>
>>>> 5. allowedLateness
>>>>
>>>> It may or may not be relevant that I have also enabled allowedLateness,
>>>> with 1 minute of lateness, on the 24-hour window.
>>>>
>>>> Would it make sense for me to try removing allowedLateness entirely? That
>>>> would just be to rule out a Flink bug related to restoring state in
>>>> combination with the allowedLateness feature. After all, all of our data
>>>> should be in good enough order not to be late, given the max
>>>> out-of-orderness used in the Kafka consumer's timestamp extractor.
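To make the allowedLateness discussion concrete, the two rules below model when a 24-hour event-time window emits its result and when a record for that window is dropped (and sent to the late-data side output, if one is configured). This is a hypothetical plain-Java sketch based on my understanding of Flink's `WindowOperator` with the default `EventTimeTrigger`: the window fires once the watermark reaches its max timestamp (end - 1), and a record counts as late once the watermark reaches max timestamp + allowed lateness. It assumes a zero window offset.

```java
/** Hypothetical model of 24-hour event-time window firing and lateness (not Flink code). */
final class WindowLatenessSketch {

    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    /** Last timestamp belonging to the day window that contains the timestamp: window end - 1. */
    static long windowMaxTimestamp(long timestampMillis) {
        long start = timestampMillis - Math.floorMod(timestampMillis, DAY_MILLIS);
        return start + DAY_MILLIS - 1;
    }

    /** The window emits its result once the watermark reaches its max timestamp. */
    static boolean hasFired(long timestampMillis, long watermark) {
        return windowMaxTimestamp(timestampMillis) <= watermark;
    }

    /**
     * A record is dropped from its window (side output, if configured) once the
     * watermark reaches the window's max timestamp plus the allowed lateness.
     */
    static boolean isDroppedAsLate(long timestampMillis, long watermark, long allowedLatenessMillis) {
        return windowMaxTimestamp(timestampMillis) + allowedLatenessMillis <= watermark;
    }
}
```

Under this model nothing is emitted for a day until the watermark passes the end of that day, which is consistent with BucketingSink only receiving a burst of input when the window triggers.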
>>>>
>>>> Thank you in advance!
>>>>
>>>
>>>
>>
>>
>
>
