Re: Flink missing Kafka records

2021-04-29 Thread Dan Hill
Hey Arvid,

I'll try to repro sometime in the next few weeks. I need to make some
larger changes to get a full diff to see what is being dropped.


On Thu, Apr 29, 2021 at 4:03 AM Arvid Heise  wrote:

> Hi Dan,
>
> could you check which records are missing? I suspect they could be
> records emitted from an otherwise idling partition right before the
> bucketing strategy rolls over.
>
> If so, it could indeed be connected to idleness. Idleness tells Flink not
> to wait on that particular partition before advancing the watermark. If a
> record then appears in a previously idle partition with an event timestamp
> behind the watermark of the other partitions, that record is deemed late
> and discarded.
>
> On Tue, Apr 27, 2021 at 2:42 AM Dan Hill  wrote:
>
>> Hey Robert.
>>
>> Nothing weird. I was trying to find recent records (not the latest). No
>> savepoints (the job had only been running for about a day). No checkpoint
>> issues (all successes). I don't know how many records are missing.
>>
>> I removed the withIdleness. The other parts are very basic. The text
>> logs look pretty useless.
>>
>> On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Can you describe under which conditions you are missing records (after a
>>> machine failure, after a Kafka failure, after taking and restoring from a
>>> savepoint, ...)?
>>> Are many records missing? Are "the first records" or the "latest
>>> records" missing? Are individual records missing, or larger blocks of data?
>>>
>>> I don't think there's a bug in Flink or the Kafka connector. Maybe
>>> it's just a configuration or system design issue.
>>>
>>>
>>> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill  wrote:
>>>
>>>> Hi!
>>>>
>>>> Have any other devs noticed issues with Flink missing Kafka records
>>>> with long-running Flink jobs?  When I re-run my Flink job and start from
>>>> the earliest Kafka offset, Flink processes the events correctly.  I'm using
>>>> Flink v1.11.1.
>>>>
>>>> I have a simple job that takes records (Requests) from Kafka and
>>>> serializes them to S3.  Pretty basic.  No related issues in the text logs.
>>>> I'm hoping I just have a configuration issue.  I'm guessing idleness is
>>>> working in a way that I'm not expecting.
>>>>
>>>> Any ideas?
>>>> - Dan
>>>>
>>>>
>>>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>>>
>>>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>>>
>>>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>>>       new FlinkKafkaConsumer<Request>(getInputRequestTopic(),
>>>>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>>>     .uid("source-request")
>>>>     .name("Request")
>>>>     .assignTimestampsAndWatermarks(
>>>>         WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
>>>>             .withIdleness(Duration.ofMinutes(1)));
>>>>
>>>>   executeLogRequest(rawRequestInput);
>>>>   env.execute("log-request");
>>>> }
>>>>
>>>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>>>
>>>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>>>
>>>>   rawRequestInput.addSink(StreamingFileSink
>>>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>>>       .withBucketAssigner(new DateHourBucketAssigner<>(request ->
>>>>           request.getTiming().getEventApiTimestamp()))
>>>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>       .withOutputFileConfig(createOutputFileConfig())
>>>>       .build())
>>>>     .uid("sink-s3-raw-request")
>>>>     .name("S3 Raw Request");
>>>> }
>>>>
>>>>
>>>>
>>>>


Re: Flink missing Kafka records

2021-04-29 Thread Arvid Heise
Hi Dan,

could you check which records are missing? I suspect they could be records
emitted from an otherwise idling partition right before the bucketing
strategy rolls over.

If so, it could indeed be connected to idleness. Idleness tells Flink not to
wait on that particular partition before advancing the watermark. If a record
then appears in a previously idle partition with an event timestamp behind
the watermark of the other partitions, that record is deemed late and
discarded.
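
If it helps, something like this could show exactly which records fall behind
the watermark (just a sketch, not part of your current pipeline; it reuses the
Request type and the rawRequestInput stream from your job below):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LateRecordTagger extends ProcessFunction<Request, Request> {
  // Anonymous subclass so Flink keeps the Request type information.
  public static final OutputTag<Request> LATE = new OutputTag<Request>("late-requests") {};

  @Override
  public void processElement(Request value, Context ctx, Collector<Request> out) {
    Long timestamp = ctx.timestamp();
    long watermark = ctx.timerService().currentWatermark();
    if (timestamp != null && timestamp < watermark) {
      // The record is behind the current watermark; an event-time window
      // downstream would treat it as late and, without allowed lateness, drop it.
      ctx.output(LATE, value);
    } else {
      out.collect(value);
    }
  }
}

// Attach it after the watermark assignment and log the side output to see
// which records are considered late:
SingleOutputStreamOperator<Request> tagged = rawRequestInput.process(new LateRecordTagger());
tagged.getSideOutput(LateRecordTagger.LATE).print();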

On Tue, Apr 27, 2021 at 2:42 AM Dan Hill  wrote:

> Hey Robert.
>
> Nothing weird. I was trying to find recent records (not the latest). No
> savepoints (the job had only been running for about a day). No checkpoint
> issues (all successes). I don't know how many records are missing.
>
> I removed the withIdleness. The other parts are very basic. The text logs
> look pretty useless.
>
> On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger 
> wrote:
>
>> Hi Dan,
>>
>> Can you describe under which conditions you are missing records (after a
>> machine failure, after a Kafka failure, after taking and restoring from a
>> savepoint, ...)?
>> Are many records missing? Are "the first records" or the "latest records"
>> missing? Are individual records missing, or larger blocks of data?
>>
>> I don't think there's a bug in Flink or the Kafka connector. Maybe
>> it's just a configuration or system design issue.
>>
>>
>> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill  wrote:
>>
>>> Hi!
>>>
>>> Have any other devs noticed issues with Flink missing Kafka records with
>>> long-running Flink jobs?  When I re-run my Flink job and start from the
>>> earliest Kafka offset, Flink processes the events correctly.  I'm using
>>> Flink v1.11.1.
>>>
>>> I have a simple job that takes records (Requests) from Kafka and
>>> serializes them to S3.  Pretty basic.  No related issues in the text logs.
>>> I'm hoping I just have a configuration issue.  I'm guessing idleness is
>>> working in a way that I'm not expecting.
>>>
>>> Any ideas?
>>> - Dan
>>>
>>>
>>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>>
>>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>>
>>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>>       new FlinkKafkaConsumer<Request>(getInputRequestTopic(),
>>>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>>     .uid("source-request")
>>>     .name("Request")
>>>     .assignTimestampsAndWatermarks(
>>>         WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
>>>             .withIdleness(Duration.ofMinutes(1)));
>>>
>>>   executeLogRequest(rawRequestInput);
>>>   env.execute("log-request");
>>> }
>>>
>>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>>
>>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>>
>>>   rawRequestInput.addSink(StreamingFileSink
>>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>>       .withBucketAssigner(new DateHourBucketAssigner<>(request ->
>>>           request.getTiming().getEventApiTimestamp()))
>>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>       .withOutputFileConfig(createOutputFileConfig())
>>>       .build())
>>>     .uid("sink-s3-raw-request")
>>>     .name("S3 Raw Request");
>>> }
>>>
>>>
>>>
>>>


Re: Flink missing Kafka records

2021-04-26 Thread Dan Hill
Hey Robert.

Nothing weird. I was trying to find recent records (not the latest). No
savepoints (the job had only been running for about a day). No checkpoint
issues (all successes). I don't know how many records are missing.

I removed the withIdleness. The other parts are very basic. The text logs
look pretty useless.
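
For reference, the source assignment now looks roughly like this (same helper
methods and names as in my original job below, just with the withIdleness call
dropped):

SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
    new FlinkKafkaConsumer<Request>(getInputRequestTopic(),
        getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
  .uid("source-request")
  .name("Request")
  .assignTimestampsAndWatermarks(
      // Without withIdleness, an idle partition holds back the overall watermark
      // instead of being skipped, so its records can no longer fall behind it.
      WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness));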

On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger  wrote:

> Hi Dan,
>
> Can you describe under which conditions you are missing records (after a
> machine failure, after a Kafka failure, after taking and restoring from a
> savepoint, ...)?
> Are many records missing? Are "the first records" or the "latest records"
> missing? Are individual records missing, or larger blocks of data?
>
> I don't think there's a bug in Flink or the Kafka connector. Maybe
> it's just a configuration or system design issue.
>
>
> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill  wrote:
>
>> Hi!
>>
>> Have any other devs noticed issues with Flink missing Kafka records with
>> long-running Flink jobs?  When I re-run my Flink job and start from the
>> earliest Kafka offset, Flink processes the events correctly.  I'm using
>> Flink v1.11.1.
>>
>> I have a simple job that takes records (Requests) from Kafka and
>> serializes them to S3.  Pretty basic.  No related issues in the text logs.
>> I'm hoping I just have a configuration issue.  I'm guessing idleness is
>> working in a way that I'm not expecting.
>>
>> Any ideas?
>> - Dan
>>
>>
>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>
>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>
>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>       new FlinkKafkaConsumer<Request>(getInputRequestTopic(),
>>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>     .uid("source-request")
>>     .name("Request")
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
>>             .withIdleness(Duration.ofMinutes(1)));
>>
>>   executeLogRequest(rawRequestInput);
>>   env.execute("log-request");
>> }
>>
>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>
>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>
>>   rawRequestInput.addSink(StreamingFileSink
>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>       .withBucketAssigner(new DateHourBucketAssigner<>(request ->
>>           request.getTiming().getEventApiTimestamp()))
>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>       .withOutputFileConfig(createOutputFileConfig())
>>       .build())
>>     .uid("sink-s3-raw-request")
>>     .name("S3 Raw Request");
>> }
>>
>>
>>
>>


Re: Flink missing Kafka records

2021-04-26 Thread Robert Metzger
Hi Dan,

Can you describe under which conditions you are missing records (after a
machine failure, after a Kafka failure, after taking and restoring from a
savepoint, ...)?
Are many records missing? Are "the first records" or the "latest records"
missing? Are individual records missing, or larger blocks of data?

I don't think there's a bug in Flink or the Kafka connector. Maybe it's
just a configuration or system design issue.


On Sun, Apr 25, 2021 at 9:56 AM Dan Hill  wrote:

> Hi!
>
> Have any other devs noticed issues with Flink missing Kafka records with
> long-running Flink jobs?  When I re-run my Flink job and start from the
> earliest Kafka offset, Flink processes the events correctly.  I'm using
> Flink v1.11.1.
>
> I have a simple job that takes records (Requests) from Kafka and
> serializes them to S3.  Pretty basic.  No related issues in the text logs.
> I'm hoping I just have a configuration issue.  I'm guessing idleness is
> working in a way that I'm not expecting.
>
> Any ideas?
> - Dan
>
>
> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>
>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>
>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>       new FlinkKafkaConsumer<Request>(getInputRequestTopic(),
>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>     .uid("source-request")
>     .name("Request")
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
>             .withIdleness(Duration.ofMinutes(1)));
>
>   executeLogRequest(rawRequestInput);
>   env.execute("log-request");
> }
>
> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>
>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>
>   rawRequestInput.addSink(StreamingFileSink
>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>       .withBucketAssigner(new DateHourBucketAssigner<>(request ->
>           request.getTiming().getEventApiTimestamp()))
>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>       .withOutputFileConfig(createOutputFileConfig())
>       .build())
>     .uid("sink-s3-raw-request")
>     .name("S3 Raw Request");
> }
>
>
>
>


Flink missing Kafka records

2021-04-25 Thread Dan Hill
Hi!

Have any other devs noticed issues with Flink missing Kafka records with
long-running Flink jobs?  When I re-run my Flink job and start from the
earliest Kafka offset, Flink processes the events correctly.  I'm using
Flink v1.11.1.

I have a simple job that takes records (Requests) from Kafka and serializes
them to S3.  Pretty basic.  No related issues in the text logs.  I'm hoping
I just have a configuration issue.  I'm guessing idleness is working in a
way that I'm not expecting.

Any ideas?
- Dan


void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {

  Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");

  SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
      new FlinkKafkaConsumer<Request>(getInputRequestTopic(),
          getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
    .uid("source-request")
    .name("Request")
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
            .withIdleness(Duration.ofMinutes(1)));

  executeLogRequest(rawRequestInput);
  env.execute("log-request");
}

void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {

  AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);

  rawRequestInput.addSink(StreamingFileSink
      .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
      .withBucketAssigner(new DateHourBucketAssigner<>(request ->
          request.getTiming().getEventApiTimestamp()))
      .withRollingPolicy(OnCheckpointRollingPolicy.build())
      .withOutputFileConfig(createOutputFileConfig())
      .build())
    .uid("sink-s3-raw-request")
    .name("S3 Raw Request");
}
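
One more thing worth keeping in mind about the sink above (a general
StreamingFileSink property, not something specific to this job): with
OnCheckpointRollingPolicy, in-progress part files are only finalized when a
checkpoint completes, so records ingested after the last successful checkpoint
are not yet visible as committed files in S3 and can look "missing". A minimal
sketch of the related setting, assuming a one-minute interval is acceptable:

// Checkpoint (and therefore commit part files) every 60 seconds.
env.enableCheckpointing(60_000);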