Robert,

So removing the setUnbounded(OffsetsInitializer.latest()) call fixed the issue.
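
For reference, a minimal sketch of the working builder, with the setUnbounded call removed so the source keeps Flink's default unbounded behavior for streaming jobs. FluentdMessage, FluentdRecordDeserializer, and kafkaServer are taken from the snippet quoted below; the group id value here is a hypothetical placeholder:

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Sketch only: with no setBounded()/setUnbounded() call, the KafkaSource
// keeps its default unbounded boundedness, so the streaming job stays
// open and consumes new records as they arrive.
KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics("fluentd")
        .setGroupId("fluentd-consumer")   // hypothetical non-empty group id
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(new FluentdRecordDeserializer())
        .build();
```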
Thanks!

On Wed, Sep 22, 2021 at 11:51 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> What happens if you do not set any boundedness on the KafkaSource?
> For a DataStream job in streaming mode, the Kafka source should be
> unbounded.
>
> From reading the code, it seems that setting unbounded(latest) should not
> trigger the behavior you mention ... but the Flink docs are not clearly
> written [1], as it says that you can make a Kafka source bounded by calling
> "setUnbounded" ... which is weird, because "setUnbounded" should not make
> something bounded?!
>
> Are there any log messages from the Source that can give us any hints?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness
>
> On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen <cinquate...@gmail.com>
> wrote:
>
>> I have an unbounded kafka source that has records written to it every
>> second.  Instead of the job waiting to process the new messages it closes.
>> How do I keep the stream open?
>>
>> KafkaSource<FluentdMessage> dataSource = KafkaSource
>>         .<FluentdMessage>builder()
>>         .setBootstrapServers(kafkaServer)
>>         .setTopics(Arrays.asList("fluentd"))
>>         .setGroupId("")
>>         .setDeserializer(new FluentdRecordDeserializer())
>>         //.setStartingOffsets(OffsetsInitializer.earliest())
>>         //.setBounded(OffsetsInitializer.latest())
>>         .setUnbounded(OffsetsInitializer.latest())
>>         .build();
>>
>>
>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490