Hi Marchant,

Yes, I agree. In general, the isEndOfStream method has very ill-defined 
semantics, and its behavior actually differs across Kafka connector 
versions.
This method will definitely need to be revisited in the future (we are thinking 
about a rework of the connector).

What is your target Kafka version? And do you know the ending offsets of _all_ 
partitions from which you want to consume only a range?
Given that information, I can probably double-check for you whether your 
specific case is possible.
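
To make the question concrete, here is a minimal sketch of the per-partition 
stopping condition under discussion. It is plain Java with no Flink or Kafka 
dependency. The class BoundedRangeTracker and its methods are hypothetical 
names made up for illustration and are not part of any connector API. It only 
shows the bookkeeping: a record is kept if its offset is within the range, and 
consumption as a whole is done only once every partition has reached its 
ending offset.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Flink or Kafka class): tracks whether every
// assigned partition has reached its configured ending offset.
final class BoundedRangeTracker {
    // partition id -> last offset (inclusive) that should be consumed
    private final Map<Integer, Long> endOffsets;
    // partitions that have not yet reached their ending offset
    private final Set<Integer> pending;

    BoundedRangeTracker(Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        this.pending = new HashSet<>(endOffsets.keySet());
    }

    // True if the record lies inside the wanted range and should be kept.
    boolean shouldEmit(int partition, long offset) {
        Long end = endOffsets.get(partition);
        return end != null && offset <= end;
    }

    // Record a consumed offset; returns true once ALL partitions are done.
    boolean onRecord(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end != null && offset >= end) {
            pending.remove(partition);
        }
        return pending.isEmpty();
    }
}
```

Whether a check like this can be driven from isEndOfStream alone depends on 
whether the connector version in use passes the partition and offset to the 
schema and how it reacts to a true result, which is why the target version 
matters.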

Cheers,
Gordon

On 8 February 2018 at 3:22:24 PM, Marchant, Hayden (hayden.march...@citi.com) 
wrote:

Gordon,

 

Thanks for the pointer. I did some searches for usages of isEndOfStream and 
it’s a little confusing. I see that all implementors of DeserializationSchema 
must implement this method, but it’s not called from anywhere central in the 
Flink streaming engine. Instead, each source can decide to use it in its 
own implementation – for example, Kafka stops processing the topic when 
isEndOfStream returns true. This is nice, but it localizes the treatment to 
that operator. Even though it goes a long way toward ensuring that I get just 
my bounded data, it still does not give me the ability to stop my job when I 
have finished consuming the elements.

 

Also, in my case I need to ensure that I have reached a certain offset for each 
of the Kafka partitions that are assigned to this instance of the source 
function. It seems from the code that I need a different implementation of 
KafkaFetcher.runFetchLoop that has slightly different logic for setting 
running to false.

 

What would you recommend in this case?

 

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]  
Sent: Thursday, February 08, 2018 12:24 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT] <hm97...@imceu.eu.ssmb.com>
Subject: Re: Kafka as source for batch job

 

Hi Hayden,

 

Have you tried looking into `KeyedDeserializationSchema#isEndOfStream` [1]?

I think that could be what you are looking for. It signals the end of the 
stream when consuming from Kafka.

 

Cheers,

Gordon

 

On 8 February 2018 at 10:44:59 AM, Marchant, Hayden (hayden.march...@citi.com) 
wrote:

I know that traditionally Kafka is used as a source for a streaming job. In our 
particular case, we are looking at extracting records from a Kafka topic from a 
particular well-defined offset range (per partition) - i.e. from offset X to 
offset Y. In this case, we'd somehow want the application to know that it has 
finished when it gets to offset Y. This basically turns the Kafka stream into 
bounded data, as opposed to unbounded data in the usual stream paradigm.  

What would be the best approach to do this in Flink? I see a few options, 
though there might be more:  

1. Use a regular streaming job, and have some external service that monitors 
the current offsets of the topic's consumer group and stops the job 
once the consumer group has reached the end offsets  
Pros - simple wrt Flink, Cons - hacky  

2. Create a batch job, and a new InputFormat based on Kafka that reads the 
specified subset of the Kafka topic into the source.  
Pros - represent bounded data from Kafka topic as batch source, Cons - requires 
implementation of source.  

3. Dump the subset of Kafka into a file and then trigger a more 'traditional' 
Flink batch job that reads from a file.  
Pros - simple, cons - unnecessary I/O.  

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this 
before?  

Thanks,  
Hayden Marchant
