I want to remove a durable subscriber that I have created on the remote
peer, which is the Artemis messaging broker.
How do I do this correctly? (Also, as a broader question, where can I find
complete instructions for working with durable subscriptions on the Artemis
broker using an AMQP client, as opposed to a JMS 2.0 client?)
Currently, I am using this code:
https://github.com/rh-messaging/cli-java/blob/6c2cfc02984cfd93b44d306973f83422f716aeba/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java#L240-L243
That is, I create a receiver and then call receiver.close(). (If I wanted
to keep the durable subscriber active, I'd call receiver.detach() instead.)
This is what happens when I export PN_TRACE_FRM=true:
-> SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] SaslMechanisms{saslServerMechanisms=[PLAIN,
ANONYMOUS]}
-> SASL:[1658511941:0] SaslInit{mechanism=PLAIN,
initialResponse="\x00admin\x00admin", hostname='null'}
<- SASL:[1658511941:0] SaslOutcome{code=OK, additionalData=null}
-> AMQP:[1658511941:0] AMQP,0,1,0,0
<- AMQP:[1658511941:0] AMQP,0,1,0,0
-> AMQP:[1658511941:0] Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=65536, channelMax=65535, idleTimeOut=60000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], properties=null}
-> AMQP:[1658511941:0] Close{error=null}
<- AMQP:[1658511941:0] Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
<- AMQP:[1658511941:0] Close{error=null}
The client does not even bother doing Begin/Attach/Detach! So the durable
subscriber stays active.
What I want to happen is something similar to what I get with Qpid JMS,
which is:
[1253727474:0] -> SASL
[1253727474:0] <- SASL
[1253727474:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[1253727474:0] -> SaslInit{mechanism=PLAIN,
initialResponse=\x00admin\x00admin, hostname='127.0.0.1'}
[1253727474:0] <- SaslOutcome{_code=OK, _additionalData=null}
[1253727474:0] <- AMQP
[1253727474:0] -> AMQP
[1253727474:0] -> Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=2.1.0,
platform=JVM: 11.0.17-ea, 11.0.17-ea+1, Red Hat, Inc., OS: Linux,
5.19.11-300.fc37.x86_64, amd64}}
[1253727474:0] <- Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
[1253727474:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] <- Begin{remoteChannel=0, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] <- Begin{remoteChannel=1, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Attach{name='ds0', handle=0, role=RECEIVER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null,
target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null,
maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[1253727474:0] <- Attach{name='ds0', handle=0, role=SENDER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='cliId0.ds0', durable=UNSETTLED_STATE,
expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=copy, filter=null, defaultOutcome=null, outcomes=null,
capabilities=[topic]}, target=Target{address='null', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, capabilities=null}, unsettled=null,
incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Detach{handle=0, closed=true, error=null}
[1253727474:0] <- Detach{handle=0, closed=true, error=null}
[1253727474:1] -> End{error=null}
[1253727474:1] <- End{error=null}
[1253727474:0] -> Close{error=null}
[1253727474:0] <- Close{error=null}
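To compare the two traces mechanically rather than by eye, a small helper along these lines could be used. This is only a sketch: the class name TraceCheck and its methods are hypothetical (they are not part of ProtonJ2 or Qpid JMS), and it assumes each frame sits on a single unwrapped line using one of the two prefix layouts shown above. It treats a client-sent Attach followed somewhere by a client-sent Detach with closed=true as the unsubscribe handshake.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper: scans frame-trace lines for the unsubscribe handshake. */
final class TraceCheck {

    // Outgoing frames start with "->", optionally preceded by a "[conn:channel]" tag.
    private static final String SENT = "^(\\[[^\\]]*\\]\\s*)?->.*";

    private static final Pattern SENT_ATTACH =
            Pattern.compile(SENT + "\\bAttach\\{");

    // Only a Detach carrying closed=true ends the link for good.
    private static final Pattern SENT_CLOSING_DETACH =
            Pattern.compile(SENT + "\\bDetach\\{[^}]*\\bclosed=true\\b");

    /** True if the client sent an Attach frame. */
    static boolean sentAttach(List<String> lines) {
        return lines.stream().anyMatch(l -> SENT_ATTACH.matcher(l).find());
    }

    /** True if the client sent a Detach frame with closed=true. */
    static boolean sentClosingDetach(List<String> lines) {
        return lines.stream().anyMatch(l -> SENT_CLOSING_DETACH.matcher(l).find());
    }

    /** True if the trace shows both halves of the handshake from the client side. */
    static boolean unsubscribed(List<String> lines) {
        return sentAttach(lines) && sentClosingDetach(lines);
    }

    public static void main(String[] args) {
        // Abbreviated lines taken from the two traces in this message.
        List<String> protonj2Trace = List.of(
                "-> AMQP:[1658511941:0] Open{ containerId='cliId0'}",
                "-> AMQP:[1658511941:0] Close{error=null}",
                "<- AMQP:[1658511941:0] Close{error=null}");
        List<String> qpidJmsTrace = List.of(
                "[1253727474:0] -> Attach{name='ds0', handle=0, role=RECEIVER}",
                "[1253727474:0] <- Attach{name='ds0', handle=0, role=SENDER}",
                "[1253727474:0] -> Detach{handle=0, closed=true, error=null}",
                "[1253727474:0] <- Detach{handle=0, closed=true, error=null}");

        System.out.println(unsubscribed(protonj2Trace)); // prints false
        System.out.println(unsubscribed(qpidJmsTrace));  // prints true
    }
}
```

Matching only on client-sent frames is deliberate: the broker echoing a Detach says nothing about whether the client asked for the link to be closed in the first place.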
As a workaround, I figured I can do
    if (stringToBool(subscriberUnsubscribeString)) {
        receiver.receive(1, TimeUnit.SECONDS); // add this receive call
        receiver.close();
        return 0;
    }
but that does something different from the Qpid JMS example: it sends a
Flow frame and would actually receive messages if there were any.
When I tried tryReceive instead of receive, the AMQP trace stayed the same
as it was originally; that is, no Begin/Attach/Detach was produced either.