On 10/10/22 11:15, Jiri Daněk wrote:
I want to remove a durable subscriber that I have created on the remote
peer, which is the Artemis messaging broker.

How do I do this correctly? (Also, as a broader question, where do I find
complete instructions for working with durable subscriptions on Artemis
broker using an AMQP client, as opposed to JMS 2.0 client?)

Currently, I am using this code
https://github.com/rh-messaging/cli-java/blob/6c2cfc02984cfd93b44d306973f83422f716aeba/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java#L240-L243

that is, I create a receiver and then call receiver.close(). (If I wanted
to keep the durable subscriber active, I'd do receiver.detach() instead).

This is what happens when I export PN_TRACE_FRM=true

-> SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] SaslMechanisms{saslServerMechanisms=[PLAIN,
ANONYMOUS]}
-> SASL:[1658511941:0] SaslInit{mechanism=PLAIN,
initialResponse="\x00admin\x00admin", hostname='null'}
<- SASL:[1658511941:0] SaslOutcome{code=OK, additionalData=null}
-> AMQP:[1658511941:0] AMQP,0,1,0,0
<- AMQP:[1658511941:0] AMQP,0,1,0,0
-> AMQP:[1658511941:0] Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=65536, channelMax=65535, idleTimeOut=60000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], properties=null}
-> AMQP:[1658511941:0] Close{error=null}
<- AMQP:[1658511941:0] Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
<- AMQP:[1658511941:0] Close{error=null}

Client does not even bother doing Begin/Attach/Detach! So the durable
subscriber stays active.

You need to use the openFuture of the Receiver to wait until the remote has fully opened it and then close the receiver to ensure that you get the behavior you are looking for.  In this case it is quite likely that the connection was still not fully setup when you opened and closed the receiver resulting in the engine deciding there was no need to send the performatives since you opened and closed before the connection likely finished the SASL exchange.  For an operation like this it makes sense to always ensure the receiver fully opened as the open can fail but closing it wouldn't tell you that.



What I want to happen is something similar to what I can get with Qpid JMS,
which is

[1253727474:0] -> SASL
[1253727474:0] <- SASL
[1253727474:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[1253727474:0] -> SaslInit{mechanism=PLAIN,
initialResponse=\x00admin\x00admin, hostname='127.0.0.1'}
[1253727474:0] <- SaslOutcome{_code=OK, _additionalData=null}
[1253727474:0] <- AMQP
[1253727474:0] -> AMQP
[1253727474:0] -> Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=2.1.0,
platform=JVM: 11.0.17-ea, 11.0.17-ea+1, Red Hat, Inc., OS: Linux,
5.19.11-300.fc37.x86_64, amd64}}
[1253727474:0] <- Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
[1253727474:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] <- Begin{remoteChannel=0, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] <- Begin{remoteChannel=1, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Attach{name='ds0', handle=0, role=RECEIVER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null,
target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null,
maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[1253727474:0] <- Attach{name='ds0', handle=0, role=SENDER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='cliId0.ds0', durable=UNSETTLED_STATE,
expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=copy, filter=null, defaultOutcome=null, outcomes=null,
capabilities=[topic]}, target=Target{address='null', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, capabilities=null}, unsettled=null,
incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Detach{handle=0, closed=true, error=null}
[1253727474:0] <- Detach{handle=0, closed=true, error=null}
[1253727474:1] -> End{error=null}
[1253727474:1] <- End{error=null}
[1253727474:0] -> Close{error=null}
[1253727474:0] <- Close{error=null}

As a workaround, I figured I can do

             if (stringToBool(subscriberUnsubscribeString)) {
                 receiver.receive(1, TimeUnit.SECOND);  // add this receive
call
                 receiver.close();
                 return 0;
             }

but that does something different than the Qpid JMS example (it sends flow
and actually would receive messages, if there was any).

When I tried tryReceive instead of receive, the AMQP trace stayed the same
as it was originally, that is, no Begin/Attach/Detach was produced either.


--
Tim Bish


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to