Did you get this sorted or do you still need help?

Justin

On Mon, Nov 22, 2021 at 1:58 AM hannes bauer <hannes.bau...@gmx.net> wrote:

> Sorry for causing that confusion. We have switched to Artemis in the
> meantime.
>
> As for our implementation in general.
> We are using one class handling the sending of messages.
> In this class we instantiate an ActiveMQConnectionFactory and set the
> incoming and outgoing interceptor lists.
> Then we create a message producer and send the message.
>
>         String incomingInterceptorClassName = JmsIncomingMessageInterceptor.class.getName();
>         String outgoingInterceptorClassName = JmsOutgoingMessageInterceptor.class.getName();
>         String queueName = "testQueue";
>         ActiveMQConnectionFactory connectionFactory =
>                 new ActiveMQConnectionFactory(<url>, <username>, <password>);
>
>         connectionFactory.getServerLocator().setIncomingInterceptorList(incomingInterceptorClassName);
>         connectionFactory.getServerLocator().setOutgoingInterceptorList(outgoingInterceptorClassName);
>
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>
>         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Queue queue = session.createQueue(queueName);
>
>         MessageProducer messageProducer = session.createProducer(queue);
>
>         MapMessage message = createMessage(messageDTO);
>         messageProducer.send(message);
>
> And now the interceptors.
> The working outgoing one is implemented as follows:
>
> public class JmsOutgoingMessageInterceptor
>         implements org.apache.activemq.artemis.api.core.Interceptor {
>     private static final Logger logger =
>             LoggerFactory.getLogger(JmsOutgoingMessageInterceptor.class);
>
>     @Override
>     public boolean intercept(Packet packet, RemotingConnection remotingConnection)
>             throws ActiveMQException {
>         boolean letPass = true;
>         if (packet instanceof SessionSendMessage) {
>             // custom check to see if messages need to be intercepted (elided here)
>             letPass = false;
>         }
>         // returning false stops the packet, true lets it through
>         return letPass;
>     }
>
> }
>
> The idea for the incoming one is to do the same, just for incoming messages
> instead of outgoing ones. So my understanding was that I need to use
> SessionReceiveMessage.
>
> public class JmsIncomingMessageInterceptor
>         implements org.apache.activemq.artemis.api.core.Interceptor {
>     private static final Logger logger =
>             LoggerFactory.getLogger(JmsIncomingMessageInterceptor.class);
>
>     @Override
>     public boolean intercept(Packet packet, RemotingConnection remotingConnection)
>             throws ActiveMQException {
>         boolean letPass = true;
>         if (packet instanceof SessionReceiveMessage) {
>             // custom check to see if messages need to be intercepted (elided here)
>             letPass = false;
>         }
>         // returning false stops the packet, true lets it through
>         return letPass;
>     }
>
> }
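
A minimal sketch of the type check, in case it helps narrow things down. It records every packet class the interceptor sees, which is one way to produce a list like the one in your earlier mail. Packet, Ping and the Session*Message classes below are stand-ins that only borrow the Artemis names so the snippet compiles on its own; TypeFilteringInterceptor and seenTypes() are hypothetical names, not Artemis API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-ins only: they borrow the Artemis class names used in this thread so
// the sketch compiles without the Artemis jars. They are NOT the real classes.
interface Packet {}
class SessionSendMessage implements Packet {}
class SessionSendMessage_V2 extends SessionSendMessage {}
class SessionReceiveMessage implements Packet {}
class Ping implements Packet {}

// Hypothetical helper: blocks one packet type and remembers every type it saw.
final class TypeFilteringInterceptor {
    private final Class<? extends Packet> blockedType;
    // Concurrent set, since an interceptor may be called from several threads.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    TypeFilteringInterceptor(Class<? extends Packet> blockedType) {
        this.blockedType = blockedType;
    }

    // Same contract as in the thread: false stops the packet, true lets it pass.
    boolean intercept(Packet packet) {
        seen.add(packet.getClass().getSimpleName());
        // isInstance also matches subclasses of the blocked type.
        return !blockedType.isInstance(packet);
    }

    Set<String> seenTypes() {
        return seen;
    }
}
```

In a real interceptor the same test would sit inside intercept(Packet, RemotingConnection). A check against the base class also matches subclasses, which is presumably why SessionSendMessage_V2 is caught by a SessionSendMessage test.
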
>
> Hannes
>
>
>
> Sent: Saturday, 20 November 2021 at 00:43
> From: "Justin Bertram" <jbert...@apache.org>
> To: users@activemq.apache.org
> Subject: Re: Re: pause acceptance and delivery of messages in ActiveMQ 5.16.3
> I'm confused about the nature of your use-case. You originally said that
> you were using ActiveMQ 5.16.3, but in this latest email you mention
> intercepting things like ClusterTopologyChangeMessage_V4,
> CreateSessionMessage_V2, CreateSessionResponseMessage, etc. which are used
> exclusively by ActiveMQ Artemis as part of the core protocol. Can you
> clarify whether you're using ActiveMQ "Classic" (i.e. 5.x) or Artemis? Can
> you elaborate on your interceptor implementation also? Thanks!
>
>
> Justin
>
> On Fri, Nov 19, 2021 at 3:52 PM hannes bauer <hannes.bau...@gmx.net>
> wrote:
>
> > Thank you very much for the possible solutions.
> > I was finally able to try the approach with the interceptor, and it works
> > great for intercepting messages sent to the broker, using
> > SessionSendMessage to filter the messages I want to intercept.
> >
> > Unfortunately, it does not intercept messages sent from the broker to the
> > message driven bean.
> >
> > I configured the interceptor to intercept incoming and outgoing messages
> > and it only “catches” the following types.
> >
> > Ping
> > ClusterTopologyChangeMessage_V4
> > CreateSessionMessage_V2
> > CreateSessionResponseMessage
> > SessionAddMetaDataMessageV2
> > NullResponseMessage_V2
> > PacketImpl
> > SessionQueueQueryMessage
> > SessionQueueQueryResponseMessage_V3
> > SessionBindingQueryResponseMessage_V4
> > SessionQueueQueryResponseMessage_V3
> > SessionRequestProducerCreditsMessage
> > SessionSendMessage_V2
> > NullResponseMessage_V2
> > SessionCloseMessage
> >
> >
> >
> > The SessionReceiveMessage used in the examples is never found.
> >
> > Could that be a problem with the resource connector?
> >
> > Currently we are still using the activemq-rar-5.16.3 resource adapter
> > because I was not able to find an artemis version.
> >
> > Or does the incoming interceptor only work when using a MessageConsumer?
> >
> >
> >
> > Thank you very much for helping me with those options; I am not sure I
> > would have found them on my own in a timely manner.
> >
> > Best regards
> > Hannes
> >
> >
> >
> >
> >
> > On 2021/10/26 12:32:24 Tim Bain wrote:
> > > The success of that approach would depend on how OP's clients are
> > > interacting with the broker.
> > >
> > > If all producers use OpenWire, connect once, and stay connected forever,
> > > with no disconnects and no producers coming and going, then this manual
> > > process could work. If producers come and go (including due to
> > > auto-scaling, producer process instability, or broker restarts), or if
> > > they produce messages by making a new connection each time (including if
> > > messages are produced via STOMP, which I believe is always
> > > connectionless), it won't.
> > >
> > > If you need to be sure it works in the face of any of the things I
> > > listed, I'd continue to recommend implementing this as a plugin. If
> > > you're sure you have a stable environment, or if best-effort is good
> > > enough, then the JMX approach sounds possible.
> > >
> > > Tim
> > >
> > > On Tue, Oct 26, 2021, 2:39 AM Simon Lundström <si...@su.se> wrote:
> > >
> > > > Isn't it also possible to stop the client connector via JMX?
> > > >
> > > > org.apache.activemq:type=Broker,brokerName=esb-test-mq04.it.su.se,connector=clientConnectors,connectorName=ssl
> > > >
> > > > has a stop operation.
> > > >
> > > > Does this not work the way OP intends?
> > > >
> > > > Pause queue(s), stop connector(s). No messages in or out?
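
If the JMX route is good enough, a rough sketch of calling that stop operation from Java could look like the following. It assumes the object name has the shape quoted above; ConnectorControl and its method names are made up for illustration, and how you obtain the MBeanServerConnection depends on your JMX setup.

```java
import java.util.Hashtable;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper; only the javax.management calls are real API.
final class ConnectorControl {

    // Builds the connector MBean name from the key properties quoted in the thread.
    static ObjectName connectorObjectName(String brokerName, String connectorName)
            throws MalformedObjectNameException {
        Hashtable<String, String> keys = new Hashtable<>();
        keys.put("type", "Broker");
        keys.put("brokerName", brokerName);
        keys.put("connector", "clientConnectors");
        keys.put("connectorName", connectorName);
        return new ObjectName("org.apache.activemq", keys);
    }

    // Invokes the no-argument "stop" operation on the connector MBean.
    static void stopConnector(MBeanServerConnection mbeans,
                              String brokerName,
                              String connectorName) throws Exception {
        ObjectName name = connectorObjectName(brokerName, connectorName);
        mbeans.invoke(name, "stop", new Object[0], new String[0]);
    }
}
```

With a remote broker the connection would typically come from JMXConnectorFactory.connect(new JMXServiceURL(...)).getMBeanServerConnection(), using whatever service URL your broker exposes.
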
> > > >
> > > > BR,
> > > > - Simon
> > > >
> > > > On Fri, 2021-10-22 at 04:42:18 +0200, Tim Bain wrote:
> > > > >I believe it would be possible to write a custom interceptor (see
> > > > >https://activemq.apache.org/interceptors) that rejected incoming
> > > > >messages and incoming connections. (Maybe rejecting incoming
> > > > >connections would be enough, because you can't send a message without
> > > > >an established connection.) If you restart the broker when you apply
> > > > >it, that should give you what you need.
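
The core of such an interceptor is just a flag that is checked before a send or a new connection is accepted. A minimal sketch, assuming nothing about the broker API: PauseGate and checkOpen are hypothetical names, and in ActiveMQ Classic the check would have to be called from a BrokerFilter installed via a BrokerPlugin (for example in its send and addConnection overrides).

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the decision an interceptor would make;
// it does not touch any ActiveMQ classes.
final class PauseGate {
    private final AtomicBoolean paused = new AtomicBoolean(false);

    void pause() {
        paused.set(true);
    }

    void resume() {
        paused.set(false);
    }

    boolean isPaused() {
        return paused.get();
    }

    // Call at the start of an intercepted send or connect; throws while paused.
    void checkOpen(String destination) {
        if (paused.get()) {
            throw new IllegalStateException(
                    "Broker is paused; rejecting send to " + destination);
        }
    }
}
```

Throwing from the check is what would give the sender an error instead of a silently dropped message. Whether the producer sees it immediately depends on how it sends (sync vs. async), which this sketch does not model.
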
> > > > >
> > > > >Alternatively, you could put a reverse proxy such as Nginx in front of
> > > > >the broker, and then modify its configuration to disable/re-enable the
> > > > >OpenWire port. Then JMX and web console connections could go through
> > > > >but protocol connections would be stopped.
> > > > >
> > > > >Tim
> > > > >
> > > > >On Thu, Oct 21, 2021, 2:47 AM Bauer, Hannes <h...@tangro.de> wrote:
> > > > >Hello,
> > > > >I am working on an application that is using ActiveMQ 5.16.3 and I am
> > > > >tasked with implementing a feature that should pause the whole message
> > > > >process.
> > > > >
> > > > >In the application we have producers and message driven beans as
> > > > >consumers. The goal is to stop the consumption of messages as well as
> > > > >the acceptance of new messages in the queues.
> > > > >At first, I was hoping that pausing a queue would do exactly that, but
> > > > >I quickly learned that that only stops messages from being delivered.
> > > > >And now I need an option to also prevent queues from accepting new
> > > > >messages.
> > > > >
> > > > >What I am trying to achieve in the end is somewhat like this existing
> > > > >thread:
> > > > >https://lists.apache.org/thread.html/r615e1b6c2cdf6341ac69935f24190a865147b32e3b95dfb35c86383e%40%3Cusers.activemq.apache.org%3E
> > > > >But the proposed solution with multiple brokers and a failover is not
> > > > >applicable/feasible in our use case.
> > > > >
> > > > >So far, the only other option I found is to stop the complete broker
> > > > >by stopping the Windows service.
> > > > >But this obviously creates new challenges since we can no longer get
> > > > >information on the size of the queues, for example.
> > > > >
> > > > >Is there another way to stop the consumption and creation of messages?
> > > > >Ideally with an instant exception when a send is attempted on a queue
> > > > >that is stopped?
> > > > >
> > > > >Any help/hints are greatly appreciated.
> > > > >
> > > > >Regards
> > > > >Hannes
> > > > >
> > > > >Hannes Bauer
> > > > >Development
> > > > >h...@tangro.de
> > > > >Phone +49 6221 1333 666
> > > > >www.tangro.de
> > > > >
> > > > >tangro software components gmbh, Speyerer Straße 4, 69115 Heidelberg
> > > > >Managing Director: Andreas Schumann, Register Court: Mannheim, HRB 336064
> > > > >(Registered office: Heidelberg)
> > > > >
> > > >
> > >
> >
> >
>
>
