RE: Kafka as source for batch job

2018-02-08 Thread Marchant, Hayden
Forgot to mention that my target Kafka version is 0.11.x, with the aim to upgrade
to 1.0 when the 1.0.x fixpack is released.

RE: Kafka as source for batch job

2018-02-08 Thread Marchant, Hayden
Hi Gordon,

Actually, our use case is that we have a start/end timestamp, and we plan on calling
KafkaConsumer.offsetsForTimes to get the corresponding offsets for each partition. So I
guess our logic is different in that we need an 'and' predicate across partitions (every
partition has to reach its end offset), as opposed to the current 'or' behaviour, where
any single partition fulfilling the condition is enough to stop the job.
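
For illustration, a rough (untested) sketch of that per-partition lookup with the plain
Kafka consumer API might look like the following; the topic name and timestamps are
placeholders, and the class name is made up:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeLookup {

    // Resolves, for every partition of 'topic', the offset of the first record at or
    // after 'startTs' and at or after 'endTs', i.e. a [start, end) range per partition.
    // 'consumerProps' must contain bootstrap.servers and key/value deserializers.
    public static Map<TopicPartition, long[]> offsetsForRange(
            Properties consumerProps, String topic, long startTs, long endTs) {

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(topic, p.partition()))
                    .collect(Collectors.toList());

            Map<TopicPartition, Long> startQuery = new HashMap<>();
            Map<TopicPartition, Long> endQuery = new HashMap<>();
            for (TopicPartition tp : partitions) {
                startQuery.put(tp, startTs);
                endQuery.put(tp, endTs);
            }

            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(startQuery);
            Map<TopicPartition, OffsetAndTimestamp> endOffsets = consumer.offsetsForTimes(endQuery);

            Map<TopicPartition, long[]> range = new HashMap<>();
            for (TopicPartition tp : partitions) {
                // offsetsForTimes maps a partition to null if it has no record at or
                // after the timestamp; fall back to the current log-end offset then.
                long logEnd = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
                OffsetAndTimestamp start = startOffsets.get(tp);
                OffsetAndTimestamp end = endOffsets.get(tp);
                range.put(tp, new long[] {
                        start != null ? start.offset() : logEnd,
                        end != null ? end.offset() : logEnd });
            }
            return range;
        }
    }
}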

Either way, I’d still need to figure out when to stop the job.

Would it make more sense to implement an InputFormat that wraps this 'bounded' Kafka
source, and use the DataSet / Batch Table API?

Thanks
Hayden

RE: Kafka as source for batch job

2018-02-08 Thread Tzu-Li (Gordon) Tai
Hi Marchant,

Yes, I agree. In general, the isEndOfStream method has very ill-defined semantics, with
different behaviors across the different Kafka connector versions.
This method will definitely need to be revisited in the future (we are thinking about a
rework of the connector).

What is your target Kafka version? And do you know the ending offsets of _all_ the
partitions of the topic you want to consume only a range of?
I can probably double-check for you whether your specific case is possible, given that
information.

Cheers,
Gordon

RE: Kafka as source for batch job

2018-02-08 Thread Marchant, Hayden
Gordon,

Thanks for the pointer. I did some searching for usages of isEndOfStream and it's a
little confusing. I see that all implementors of DeserializationSchema must implement
this method, but it's not called from anywhere central in the Flink streaming engine;
rather, each source can decide to use it in its own implementation. For example, the
Kafka source stops processing the topic when isEndOfStream returns true. This is nice,
but it localizes the treatment to just that operator, and even though it goes a long
way towards ensuring that I get just my bounded data, it still does not give me the
ability to stop my job when I have finished consuming the elements.

Also, in my case I need to ensure that I have reached a certain offset for each of the
Kafka partitions assigned to each instance of the source function. It seems from the
code that I would need a different implementation of KafkaFetcher.runFetchLoop with
slightly different logic for setting running to false.
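
To make that concrete, something along these lines is what I have in mind: a rough,
untested sketch against the KeyedDeserializationSchema interface, assuming the
per-partition end offsets have been computed up front and handed to the schema (the
class name is made up):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Deserializes values as UTF-8 strings and reports end-of-stream only once every
// partition it has seen has been read up to its (inclusive) target end offset.
public class BoundedOffsetSchema implements KeyedDeserializationSchema<String> {

    private final Map<Integer, Long> endOffsets;                 // partition -> last offset to read
    private final Map<Integer, Long> lastSeen = new HashMap<>(); // partition -> last offset deserialized

    public BoundedOffsetSchema(Map<Integer, Long> endOffsets) {
        this.endOffsets = endOffsets;
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) throws IOException {
        lastSeen.put(partition, offset);
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // 'and' predicate: only signal end-of-stream once all partitions this subtask
        // has seen so far have reached their end offset. Caveat: a partition assigned
        // to this subtask that has not yet delivered its first record is not accounted
        // for here, which is exactly why a modified fetch loop may still be needed.
        // Also, how the connector reacts to 'true' differs between connector versions.
        for (Map.Entry<Integer, Long> seen : lastSeen.entrySet()) {
            Long target = endOffsets.get(seen.getKey());
            if (target != null && seen.getValue() < target) {
                return false;
            }
        }
        return !lastSeen.isEmpty();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}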

What would you recommend in this case?

Re: Kafka as source for batch job

2018-02-08 Thread Tzu-Li (Gordon) Tai
Hi Hayden,

Have you tried looking into `KeyedDeserializationSchema#isEndOfStream` [1]?
I think that could be what you are looking for. It signals the end of the 
stream when consuming from Kafka.

Cheers,
Gordon

On 8 February 2018 at 10:44:59 AM, Marchant, Hayden (hayden.march...@citi.com) 
wrote:

I know that traditionally Kafka is used as a source for a streaming job. In our
particular case, we are looking at extracting records from a Kafka topic from a
particular well-defined offset range (per partition), i.e. from offset X to offset Y.
In this case, we'd somehow want the application to know that it has finished when it
gets to offset Y. This basically changes the Kafka stream into bounded data, as opposed
to the unbounded data of the usual streaming paradigm.

What would be the best approach to do this in Flink? I see a few options, 
though there might be more: 

1. Use a regular streaming job, and have some external service that monitors the
current offsets of the topic's consumer group and stops the job once the group has
finished consuming the range.
Pros - simple wrt Flink. Cons - hacky.

2. Create a batch job, with a new Kafka-based InputFormat that reads the specified
subset of the Kafka topic into the source.
Pros - represents the bounded data from the Kafka topic as a batch source. Cons -
requires implementing the source.

3. Dump the subset of the Kafka topic into a file and then trigger a more
'traditional' Flink batch job that reads from that file.
Pros - simple. Cons - unnecessary I/O.

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this 
before? 
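
For option 1, a rough sketch of what such an external monitor could look like, using
only the plain Kafka consumer API (broker, group and topic names below are placeholders,
and note that with the Flink consumer the committed offsets typically only advance when
checkpoints complete, so they lag the actual read position):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// External monitor for option 1: polls the committed offsets of the job's consumer
// group and exits once every partition has passed its target offset.
public class OffsetRangeMonitor {

    static boolean reachedTargets(Properties props, String topic, Map<Integer, Long> targets) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            for (Map.Entry<Integer, Long> e : targets.entrySet()) {
                OffsetAndMetadata committed =
                        consumer.committed(new TopicPartition(topic, e.getKey()));
                if (committed == null || committed.offset() < e.getValue()) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");    // placeholder
        props.put("group.id", "my-flink-job-group");       // the Flink job's consumer group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Map<Integer, Long> targets = new HashMap<>();       // partition -> end offset (computed elsewhere)
        targets.put(0, 1_000_000L);
        targets.put(1, 1_000_000L);

        while (!reachedTargets(props, "my-topic", targets)) {
            Thread.sleep(10_000L);                           // poll every 10 seconds
        }
        // All partitions have passed their targets: stop the streaming job here,
        // e.g. via 'flink cancel <jobId>' or the Flink REST API.
        System.out.println("Target offsets reached; the job can be stopped.");
    }
}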

Thanks, 
Hayden Marchant