I see that there is an issue with caching of UnboundedReaders in the UnboundedSourceAsSdfWrapper, which the Dataflow Runner v2 uses to execute UnboundedSources (the JMS source is one). This is https://github.com/apache/beam/issues/32968. It could result in more connections, because each DoFn instance maintains its own cache and, on a worker, there is no affinity between cached DoFn instances and the SDF splits scheduled on them. I am sending out https://github.com/apache/beam/pull/33901 to share the cache across DoFn instances of the same step within a worker.
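To illustrate the direction of the PR (this is a rough sketch, not the PR's actual code), assume readers can be modeled as AutoCloseable and keyed by split. The cache lives in a static map keyed by step id, so it outlives individual DoFn instances, and any instance of the step can pick up a reader that another instance cached for the same split. SharedReaderCache, forStep, take, put and evictExpired are hypothetical names, not Beam APIs.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: one reader cache per step, shared by all DoFn instances of
 * that step in the same worker JVM, instead of a private cache per DoFn instance.
 * Keys identify a split; values are readers that own broker connections.
 */
final class SharedReaderCache<K, R extends AutoCloseable> {

  // Static, so it outlives individual DoFn instances and is visible to all of them.
  private static final Map<String, SharedReaderCache<?, ?>> CACHES = new ConcurrentHashMap<>();

  private static final class Entry<T> {
    final T reader;
    final long cachedAtNanos;

    Entry(T reader, long cachedAtNanos) {
      this.reader = reader;
      this.cachedAtNanos = cachedAtNanos;
    }
  }

  private final Map<K, Entry<R>> entries = new ConcurrentHashMap<>();
  private final long ttlNanos;

  private SharedReaderCache(Duration ttl) {
    this.ttlNanos = ttl.toNanos();
  }

  /** Returns the cache for a step; the TTL passed by the first caller wins. */
  @SuppressWarnings("unchecked")
  static <K, R extends AutoCloseable> SharedReaderCache<K, R> forStep(String stepId, Duration ttl) {
    return (SharedReaderCache<K, R>)
        CACHES.computeIfAbsent(stepId, id -> new SharedReaderCache<K, R>(ttl));
  }

  /** Removes and returns the cached reader for a split, or null if there is none or it is stale. */
  R take(K splitKey) {
    Entry<R> e = entries.remove(splitKey);
    if (e == null) {
      return null;
    }
    if (System.nanoTime() - e.cachedAtNanos > ttlNanos) {
      closeQuietly(e.reader); // stale: release its connection rather than reuse it
      return null;
    }
    return e.reader;
  }

  /** Puts a reader back after a bundle; a different reader it displaces is closed. */
  void put(K splitKey, R reader) {
    Entry<R> previous = entries.put(splitKey, new Entry<>(reader, System.nanoTime()));
    if (previous != null && previous.reader != reader) {
      closeQuietly(previous.reader);
    }
  }

  /** Closes readers that have been idle for longer than the TTL. */
  void evictExpired() {
    long now = System.nanoTime();
    entries.forEach((key, e) -> {
      if (now - e.cachedAtNanos > ttlNanos && entries.remove(key, e)) {
        closeQuietly(e.reader);
      }
    });
  }

  int size() {
    return entries.size();
  }

  private static void closeQuietly(AutoCloseable c) {
    try {
      c.close();
    } catch (Exception ignored) {
      // best effort: a failed close should not fail the bundle
    }
  }
}
```

take() removes the entry, so a reader is only ever used by one bundle at a time, and readers left idle past the TTL are closed instead of reused, so idle readers do not hold broker connections indefinitely as long as evictExpired() runs periodically.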
Another issue that I see is that the source hands off the Session and consumer to the checkpoints it generates:
https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java#L763
If the checkpoint is finalized, the session and consumer are closed after acknowledging the message:
https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java#L61
This works well in the happy case where checkpoints are finalized, but it appears that sessions and consumers will leak if the checkpoint is unsuccessful and finalizeCheckpoint() is never called:
https://github.com/apache/beam/blob/9dd1f684a4f7733fac4634c35733d7aa03b72539/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L139
This can occur in Dataflow if the commit to the service fails for various reasons, such as load balancing, autoscaling, retries, etc.

This seems like a limitation of the CheckpointMark interface. SDKs and runners may know explicitly when a checkpoint fails, or may give up on caching a checkpoint for possible finalization after a timeout. In either case, they could explicitly notify the CheckpointMark that it is being abandoned. A workaround would be for JmsIO to manage a static set of pending checkpoint marks itself, with some timeout/nack.

On Tue, Jan 28, 2025 at 10:20 PM Radek Stankiewicz via user <user@beam.apache.org> wrote:
> Zack,
>
> Below you can find some code snippets for the autoscaler.
>
> https://github.com/tilgalas/ibm-mq-sample - my coworker recently released this
>
> https://github.com/tilgalas/ibm-mq-sample/blob/main/jms-pipeline/src/main/java/com/google/dce/pipelines/PipelineBuilders.java#L37 - JmsIO setup, message size observer
>
> https://github.com/tilgalas/ibm-mq-sample/blob/main/jms-pipeline/src/main/java/com/google/dce/autoscaler/RestAutoScaler.java - autoscaler
>
> https://github.com/tilgalas/ibm-mq-sample/blob/main/jms-pipeline/src/main/java/com/google/dce/metrics/AverageMetric.java - moving average
>
> https://github.com/tilgalas/ibm-mq-sample/blob/main/jms-pipeline/src/main/java/com/google/dce/globals/GuiceInitializer.java - a bit of magic/plumbing for calculating the average payload size
>
> Would you be able to try the pipeline with Runner v1?
> Feel free to raise a support case with Google Cloud to get more help with debugging this issue.
>
> Radek
>
>
> On Tue, Jan 28, 2025 at 9:12 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
>> I'm very sorry, I was not active on Beam for a while (my bad). But I'm still happy to help. I will read the whole thread first :)
>>
>> Regards
>> JB
>>
>> On Tue, Jan 28, 2025 at 9:00 PM Radek Stankiewicz <radosl...@google.com> wrote:
>> >
>> > Hi JB,
>> >
>> > Thanks for that!
>> >
>> > Zack, I think you need to implement an autoscaler; otherwise Dataflow won't know how to move splits between the workers.
>> > In your pipeline, have you noticed any imbalance between the workers?
>> >
>> > Radek
>> >
>> > On Tue, Jan 28, 2025, 20:54 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >>
>> >> Hi
>> >>
>> >> Sorry for jumping in late on this thread :)
>> >> As I worked on JmsIO a while ago (I'm the original author :)), I'm happy to help if I can.
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On Tue, Jan 28, 2025 at 6:10 PM Zack Culberson <zack.culber...@albertsons.com> wrote:
>> >> >
>> >> > Hi Radek,
>> >> >
>> >> > I wouldn't say it was empty, but the queue was receiving about 1/3 to 1/4 of the messages one of the other Queue Managers was getting. But those connections were still going up. I am using Dataflow Runner v2 and the 2.61 SDK. We saw the connections in MeshIq, which we use to monitor the queues and to see the connection count and whether there are any stuck messages.
>> >> >
>> >> > Thank you
>> >> >
>> >> > Zack Culberson
>> >> >
>> >> > From: Radek Stankiewicz <radosl...@google.com>
>> >> > Sent: Tuesday, January 28, 2025 9:15 AM
>> >> > To: Ahmet Altay <al...@google.com>
>> >> > Cc: user@beam.apache.org; Yi Hu <ya...@google.com>; Zack Culberson <zack.culber...@albertsons.com>
>> >> > Subject: EXTERNAL Email: Re: JMSIO support
>> >> >
>> >> > Hey Zack,
>> >> >
>> >> > The JmsIO reader reads messages in a loop, and it looks like Beam recreates the connection whenever there is a checkpoint. Maybe it takes some time for the driver and MQ to finalize the connection.
>> >> >
>> >> > Can you confirm which Dataflow runner and SDK you are using?
>> >> >
>> >> > When you observed 2000 connections, do I understand correctly that this QM had empty queues?
>> >> >
>> >> > Were the 2000 connections reported by QM monitoring?
>> >> >
>> >> > Radek
>> >> >
>> >> > On Tue, Jan 28, 2025 at 1:28 AM Ahmet Altay <al...@google.com> wrote:
>> >> >
>> >> > I do not know the answer. Adding @Radek Stankiewicz & @Yi Hu in case they might be able to help.
>> >> >
>> >> > On Mon, Jan 27, 2025 at 2:55 PM Zack Culberson <zack.culber...@albertsons.com> wrote:
>> >> >
>> >> > Hi all,
>> >> >
>> >> > I was wondering if anyone would know: we are using the JmsIO read to read from 3 IBM MQs.
>> >> > We set up the job to have a configurable number of readers, which right now is 9, so three readers per MQ. Our question came up because at one point we saw the number of connections for one of our Queue Managers go up to 2000, while all of the messages were going to another Queue Manager whose connections stayed at a roughly constant 100. Could someone help me understand why that is? Does JmsIO leave the connection open when it connects to read, or does it open and close the connection every time it connects?
>> >> >
>> >> > Thank you
>> >> >
>> >> > Zack Culberson
>> >> >
>> >> > ________________________________
>> >> >
>> >> > Warning: All e-mail sent to this address will be received by the corporate e-mail system, and is subject to archival and review by someone other than the recipient. This e-mail may contain proprietary information and is intended only for the use of the intended recipient(s). If the reader of this message is not the intended recipient(s), you are notified that you have received this message in error and that any review, dissemination, distribution or copying of this message is strictly prohibited. If you have received this message in error, please notify the sender immediately.
>> >> >
>> >> > ________________________________
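Regarding the workaround mentioned at the top of this reply (a static set of pending checkpoint marks with a timeout/nack that JmsIO manages itself), here is a minimal sketch of what that bookkeeping could look like. PendingCheckpoints, register, claim and sweep are hypothetical names, not JmsIO APIs; the AutoCloseable stands in for the Session and consumer that JmsCheckpointMark currently closes only when finalized. It also assumes that closing a session without acknowledging its messages lets the broker redeliver them, which is what would act as the "nack".

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of the workaround: a worker-wide registry of checkpoint marks
 * that still own a JMS session/consumer. A mark that is not finalized within the
 * timeout is treated as abandoned, and its resources are closed.
 */
final class PendingCheckpoints {

  private static final class Pending {
    final AutoCloseable resources; // stands in for the Session + consumer
    final long createdAtNanos;

    Pending(AutoCloseable resources, long createdAtNanos) {
      this.resources = resources;
      this.createdAtNanos = createdAtNanos;
    }
  }

  // Shared by every reader in the worker JVM; the 10-minute timeout is an arbitrary example.
  static final PendingCheckpoints INSTANCE =
      new PendingCheckpoints(Duration.ofMinutes(10), System::nanoTime);

  private final Map<Long, Pending> pending = new ConcurrentHashMap<>();
  private final AtomicLong nextId = new AtomicLong();
  private final long timeoutNanos;
  private final LongSupplier clock;

  PendingCheckpoints(Duration timeout, LongSupplier clock) {
    this.timeoutNanos = timeout.toNanos();
    this.clock = clock;
  }

  /** Called when a new checkpoint mark takes over the session/consumer; returns its id. */
  long register(AutoCloseable resources) {
    long id = nextId.incrementAndGet();
    pending.put(id, new Pending(resources, clock.getAsLong()));
    return id;
  }

  /**
   * Called at the start of finalizeCheckpoint(). True means the caller now owns the
   * resources (acknowledge, then close); false means they were already abandoned.
   */
  boolean claim(long id) {
    return pending.remove(id) != null;
  }

  /** Closes the resources of marks not finalized within the timeout; returns how many. */
  int sweep() {
    long now = clock.getAsLong();
    int abandoned = 0;
    for (Map.Entry<Long, Pending> e : pending.entrySet()) {
      Pending p = e.getValue();
      // remove(key, value) is atomic, so claim() and sweep() cannot both win the same mark
      if (now - p.createdAtNanos > timeoutNanos && pending.remove(e.getKey(), p)) {
        closeQuietly(p.resources);
        abandoned++;
      }
    }
    return abandoned;
  }

  int size() {
    return pending.size();
  }

  private static void closeQuietly(AutoCloseable c) {
    try {
      c.close();
    } catch (Exception ignored) {
      // best effort
    }
  }
}
```

sweep() would have to run periodically (for example from the reader loop or a scheduled executor), and the timeout must be comfortably longer than the runner's normal finalization delay; otherwise live checkpoints would be closed before they can acknowledge, causing redeliveries.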