> On 24 May 2021, at 10:43, Sozonoff Serge <se...@sozonoff.com> wrote:
> 
> OK thanks.  Just to clarify, in my case the message throughput is zero when I 
> start the Beam pipeline up and it will still crash once all file handles are 
> consumed even if I don't send a single message to the Kafka topic.

This sounds like a bug to me, even if it happens only with DirectRunner.  Would 
you mind providing the pipeline code and the command you run to reproduce this 
issue?

—
Alexey

> 
> Thanks,
> Serge
> 
> On 24 May 2021 at 10:14:33, Jan Lukavský (je...@seznam.cz 
> <mailto:je...@seznam.cz>) wrote:
> 
>> It is not 100 consumers; a checkpoint is created every 100 records. So, if 
>> your message throughput is high enough, consumers might be created 
>> really often. But most importantly, DirectRunner is really not intended for 
>> performance-sensitive applications. You should use a different runner for 
>> that.
>> 
>> Best,
>> 
>>  Jan
>> 
>> On 5/24/21 10:03 AM, Sozonoff Serge wrote:
>>> Hi Jan,
>>> 
>>> So if I read your SO answer correctly, and looking at the GitHub link you 
>>> provided, we are talking about ~100 consumers? Since I am developing 
>>> locally with a dockerized minimal Kafka broker, it is possible that this is 
>>> enough to hit the max open files limit.
>>> 
>>> Depending on your definition of “limited”, I would say there are more than a 
>>> limited number present at the same time. If you look at the log extract 
>>> below, every one of those “Kafka version: 2.5.0” lines corresponds to a 
>>> Kafka consumer instantiation, and that’s within a very short period of 
>>> time!
>>> 
>>> Thanks,
>>> Serge
>>> 
>>> 
>>> 
>>> [INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.803 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.815 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.864 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.871 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.955 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.969 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.052 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.113 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.128 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.231 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.236 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.278 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.281 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.316 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.321 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.435 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.444 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.486 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.494 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.564 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.575 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.662 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.668 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.725 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.730 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.776 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.782 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.863 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.876 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.935 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.940 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.976 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.979 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.026 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.038 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.107 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.130 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.165 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.169 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.201 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.205 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.261 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.276 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.339 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.343 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.375 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.378 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.409 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.417 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.498 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.509 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.559 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.562 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.589 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.591 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.624 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.628 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.693 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.704 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.775 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.778 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.806 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.808 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.862 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.870 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.940 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.950 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.988 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:50.990 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.018 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.020 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.046 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.048 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.077 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.083 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.156 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.167 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.226 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.232 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On 24 May 2021 at 09:35:50, Jan Lukavský (je...@seznam.cz 
>>> <mailto:je...@seznam.cz>) wrote:
>>> 
>>>> Hi Serge,
>>>> 
>>>> I posted an answer to the SO question; I hope it helps. One question: 
>>>> frequent creation of consumers is to be expected with DirectRunner, but 
>>>> there should be only a limited number of them at any one time. Do you see 
>>>> many of them present simultaneously? Or are they correctly closed and released?
>>>> 
>>>>  Jan
>>>> 
>>>> On 5/23/21 8:40 AM, Sozonoff Serge wrote:
>>>>> Hi,
>>>>> 
>>>>> I would like to refer to the following Stack Overflow question I found.
>>>>> 
>>>>> https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer
>>>>> 
>>>>> I have the very same issue when developing my pipeline. Originally the 
>>>>> pipeline was bounded and would read and process a CSV file whose name came 
>>>>> from a parameter. After reading up on various patterns for processing 
>>>>> new incoming files automatically, I added a KafkaIO read 
>>>>> to the front of my pipeline to listen for messages that contain the name 
>>>>> of a file to be processed, and I then pass this on to FileIO etc. As 
>>>>> such, my pipeline is now unbounded.
>>>>> 
>>>>> My pipeline fails with DirectRunner once it reaches the maximum 
>>>>> number of open files! Looking at the logs, I see a very large number 
>>>>> of threads (consumers) that seem to be connecting to the Kafka broker, 
>>>>> which makes no sense. I have a topic with a single partition!
>>>>> 
>>>>> 
>>>>> There are literally hundreds of these. Note the pool thread numbers and 
>>>>> consumer client IDs:
>>>>> 
>>>>> [INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1] 
>>>>> kafka.clients.consumer.internals.SubscriptionState - [Consumer 
>>>>> clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75,
>>>>>  groupId=Reader-0_offset_consumer_1203867505_report-processor-] Seeking 
>>>>> to LATEST offset of partition my_topic-0
>>>>> …..
>>>>> [INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1] 
>>>>> kafka.clients.consumer.internals.SubscriptionState - [Consumer 
>>>>> clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor--127,
>>>>>  groupId=Reader-0_offset_consumer_1204976634_report-processor-] Seeking 
>>>>> to LATEST offset of partition my_topic-0
>>>>> ….
>>>>> [INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1] 
>>>>> kafka.clients.consumer.internals.SubscriptionState - [Consumer 
>>>>> clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41,
>>>>>  groupId=Reader-0_offset_consumer_283343789_report-processor-] Seeking to 
>>>>> LATEST offset of partition my_topic-0
>>>>> ….
>>>>> etc ….
>>>>> 
>>>>> 
>>>>> So my issue resembles the one described in the Stack Overflow question, and 
>>>>> I can confirm that switching to the Flink runner resolves the problem, but 
>>>>> surely there is an explanation? Is there a known bug in DirectRunner?
>>>>> 
>>>>> Kind thanks,
>>>>> Serge
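
The rate of consumer creation discussed in this thread can be measured from the log itself. The following is a minimal sketch, not part of Beam or Kafka: it assumes the log layout quoted above and, as Serge notes, that each "Kafka version" line corresponds to one consumer instantiation. The names `ENTRY`, `strip_quote` and `consumers_per_second` are hypothetical helpers introduced only for this illustration.

```python
import re
from collections import Counter

# Matches one "Kafka version" entry and captures its timestamp to the second.
# The pattern follows the log layout quoted in this thread; a different
# logging configuration would need a different expression.
ENTRY = re.compile(
    r"\[INFO \] (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d{3} "
    r"\[[^\]]+\]\s+\S*AppInfoParser\$AppInfo - Kafka version:"
)


def strip_quote(line):
    """Remove leading mail quote markers ('>') and surrounding whitespace."""
    return re.sub(r"^[>\s]+", "", line).rstrip()


def consumers_per_second(text):
    """Count 'Kafka version' entries per second of log time."""
    # Entries are wrapped over two lines in the mail, so the lines are
    # unquoted and joined back together before matching.
    unquoted = " ".join(strip_quote(line) for line in text.splitlines())
    return Counter(match.group(1) for match in ENTRY.finditer(unquoted))


if __name__ == "__main__":
    # Three entries copied from the log extract in this thread.
    sample = "\n".join([
        ">>> [INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker]",
        ">>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0",
        ">>> [INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker]",
        ">>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0",
        ">>> [INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker]",
        ">>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0",
    ])
    for second, count in sorted(consumers_per_second(sample).items()):
        print(second, count)
```

Run against a complete log, the per-second counts show how quickly consumers are being created. They do not show whether earlier consumers were closed, so they only partly answer Jan's question about how many exist simultaneously.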
