> On 24 May 2021, at 10:43, Sozonoff Serge <se...@sozonoff.com> wrote:
>
> OK thanks. Just to clarify, in my case the message throughput is zero when I
> start the Beam pipeline, and it will still crash once all file handles are
> consumed, even if I don't send a single message to the Kafka topic.

This sounds like a bug to me, even if it happens only with DirectRunner. Would
you mind providing the pipeline code and the run command that reproduce this
issue?

Alexey

> Thanks,
> Serge
>
> On 24 May 2021 at 10:14:33, Jan Lukavský (je...@seznam.cz) wrote:
>
>> It is not 100 consumers; the checkpoint is created every 100 records. So, if
>> your message throughput is high enough, the consumers might be created
>> really often. But most importantly, DirectRunner is really not intended for
>> performance-sensitive applications. You should use a different runner for
>> that.
>>
>> Best,
>>
>> Jan
>>
>> On 5/24/21 10:03 AM, Sozonoff Serge wrote:
>>> Hi Jan,
>>>
>>> So if I read your SO answer correctly, and looking at the GitHub link you
>>> provided, we are talking about ~100 consumers? Since I am developing
>>> locally with a dockerized minimal Kafka broker, it is possible that this is
>>> enough to hit the max open files limit.
>>>
>>> Depending on your definition of “limited”, I would say there are more than a
>>> limited number present at the same time. If you look at the log extract
>>> below, every one of those “Kafka version: 2.5.0” lines corresponds to a
>>> Kafka consumer instantiation, and that’s within a very short period of time!
>>> >>> Thanks, >>> Serge >>> >>> >>> >>> [INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:48.803 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:48.815 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:48.864 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:48.871 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:48.955 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:48.969 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.052 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.113 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.128 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.231 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.236 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.278 [direct-runner-worker] >>> 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.281 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.316 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.321 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.435 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.444 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.486 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.494 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.564 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.575 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.662 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.668 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.725 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.730 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.776 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.782 
[direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.863 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.876 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.935 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.940 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.976 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:49.979 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.026 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.038 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.107 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.130 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.165 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.169 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.201 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.205 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 
09:53:50.261 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.276 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.339 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.343 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.375 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.378 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.409 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.417 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.498 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.509 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.559 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.562 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.589 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.591 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.624 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 
2021-05-24 09:53:50.628 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.693 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.704 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.775 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.778 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.806 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.808 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.862 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.870 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.940 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.950 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.988 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:50.990 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:51.018 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 >>> [INFO ] 2021-05-24 09:53:51.020 [direct-runner-worker] >>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0 
>>> [INFO ] 2021-05-24 09:53:51.046 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.048 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.077 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.083 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.156 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.167 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.226 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:51.232 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>>
>>> On 24 May 2021 at 09:35:50, Jan Lukavský (je...@seznam.cz) wrote:
>>>
>>>> Hi Serge,
>>>>
>>>> I posted an answer to the SO question; I hope that helps. One question:
>>>> frequent creation of consumers should be expected with DirectRunner, but
>>>> there should be only a limited number of them at a time. Do you see many
>>>> of them present simultaneously? Or are they correctly closed and released?
>>>>
>>>> Jan
>>>>
>>>> On 5/23/21 8:40 AM, Sozonoff Serge wrote:
>>>>> Hi,
>>>>>
>>>>> I would like to refer to the following Stack Overflow question I found:
>>>>>
>>>>> https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer
>>>>>
>>>>> I have the very same issue when developing my pipeline.
>>>>> Originally the pipeline was bounded and would read and process a CSV
>>>>> file whose name came from a parameter. After reading up on various
>>>>> patterns for processing a new incoming file automatically, I added a
>>>>> KafkaIO read to the front of my pipeline to listen for messages that
>>>>> contain the name of a file to be processed, and I then pass this on to
>>>>> FileIO, etc. As such, my pipeline is now unbounded.
>>>>>
>>>>> My pipeline fails using DirectRunner once we have reached the maximum
>>>>> number of open files! Looking at the logging, I see a very large number
>>>>> of threads (consumers) which seem to be connecting to the Kafka broker,
>>>>> which makes no sense. I have a topic with a single partition!
>>>>>
>>>>> There are literally hundreds of these. Notice the pool thread numbers
>>>>> and consumer client IDs:
>>>>>
>>>>> [INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1] kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75, groupId=Reader-0_offset_consumer_1203867505_report-processor-] Seeking to LATEST offset of partition my_topic-0
>>>>> …
>>>>> [INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1] kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor--127, groupId=Reader-0_offset_consumer_1204976634_report-processor-] Seeking to LATEST offset of partition my_topic-0
>>>>> …
>>>>> [INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1] kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41, groupId=Reader-0_offset_consumer_283343789_report-processor-] Seeking to LATEST offset of partition my_topic-0
>>>>> …
>>>>> etc. …
>>>>>
>>>>> So my issue resembles the one described in the Stack Overflow question,
>>>>> and I can confirm that switching to the Flink runner resolves the
>>>>> problem, but surely there is an explanation? Is there a known bug with
>>>>> DirectRunner?
>>>>>
>>>>> Kind thanks,
>>>>> Serge
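
The rate of consumer creation discussed in this thread can be read off the log extract as a rough number. Below is a minimal Python sketch, not part of the original messages, that counts "Kafka version" lines per second. It relies on the assumption stated in the thread that each such line corresponds to one Kafka consumer instantiation. The regular expression and the names `KAFKA_VERSION_LINE`, `consumers_per_second` and `SAMPLE` are illustrative and based only on the log layout quoted above; the pattern would need adjusting for a different logging format.

```python
import re
from collections import Counter

# Assumed layout, taken from the log extract quoted in this thread:
#   [INFO ] <date> <time>.<millis> [<thread>] ...AppInfoParser$AppInfo - Kafka version: x.y.z
# The [\s>]* part tolerates entries wrapped across lines with email quote markers.
KAFKA_VERSION_LINE = re.compile(
    r"\[INFO \] (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d{3} "
    r"\[[^\]]+\][\s>]*\S*AppInfoParser\$AppInfo - Kafka version: "
)


def consumers_per_second(log_text: str) -> Counter:
    """Count 'Kafka version' log entries, grouped by second.

    Each entry is treated as one consumer instantiation, which is an
    assumption carried over from the discussion, not something verified here.
    """
    return Counter(m.group(1) for m in KAFKA_VERSION_LINE.finditer(log_text))


# Three entries copied from the extract above, quote markers included.
SAMPLE = """\
>>> [INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker] apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
"""

if __name__ == "__main__":
    # Print one line per second, in chronological order.
    for second, count in sorted(consumers_per_second(SAMPLE).items()):
        print(second, count)
```

A high count per second only shows how often consumers are created. It does not show whether they are closed again, which is the question Jan raises, so it would need to be read alongside the number of open file handles of the running process.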