Address-full-policy is "BLOCK." I'm attaching a broker.xml and a C++ app that we use to recreate this problem. The program is invoked as:
ProtonSender tcp://<IP>:<PORT> ring_static_block::ring_static_block_1000 That starts sending to the ring_static_block_1000 queue. We expect it to fill up and then block the sender, but the actual behavior is as described in my last email. Thanks for any help. -----Original Message----- From: Justin Bertram <[email protected]> Sent: Wednesday, November 12, 2025 6:02 PM To: [email protected] Subject: Re: Flow control with Artemis, AMQP, & Qpid Proton External Email Alert This email has been sent from an account outside of the BAE Systems network. Please treat the email with caution, especially if you are requested to click on a link, decrypt/open an attachment, or enable macros. For further information on how to spot phishing, access “Cybersecurity OneSpace Page” and report phishing by clicking the button “Report Phishing” on the Outlook toolbar. What's your address-full-policy? Do you have a test-case that demonstrates the issue that you can share? Justin On Wed, Nov 12, 2025 at 2:44 PM Patrick, Alton (US) <[email protected]> wrote: > We are using an Artemis broker with clients that speak AMQP with the > Qpid Proton library. We're trying to implement blocking on the > producer as described here: > > https://activemq.apache.org/components/artemis/documentation/latest/flow-control.html#blocking-amqp-producers > > We have set both max-size-bytes and max-size-bytes-reject-threshold on > the > address: > <max-size-bytes>1048576</max-size-bytes> > > <max-size-bytes-reject-threshold>524288</max-size-bytes-reject-threshold> > > What we expect to see: When the broker has received > max-size-bytes-reject-threshold bytes of data, it should start > rejecting messages. > > But we aren't seeing any messages get rejected. Instead, it continues > accepting messages until max-size-bytes have been received. The > message that pushes the buffer over that limit is dropped. After that, > our producer stops getting credits from the broker and is effectively > blocked. 
However, we do lose one message. > > I suppose we could add logic in the producer to figure out the message > was dropped and retransmit it... but that doesn't seem like the way > this is supposed to work. Is it? Are we doing something wrong? Has > anyone successfully implemented blocking of AMQP producers with qpid > proton without losing messages? > > -Alton > > >
broker.xml
Description: broker.xml
#include <proton/delivery.hpp>
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
#include <proton/source.hpp>
#include <proton/work_queue.hpp>
#include <atomic>
#include <chrono>
#include <csignal>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
// Process-wide shutdown flag. It is written by the signal handler and by
// main(), and read by main()'s receive loop and by receiver::receive(), so it
// must be atomic: a plain bool shared this way is a data race.
static std::atomic<bool> shouldQuit{false};
// A blocking receiver built on a Proton messaging_handler.
//
// Proton callbacks (on_receiver_open, on_message, on_error) are invoked by the
// container; receive(), close() and check_message_number() are called by the
// application. State shared between the two sides is guarded by lock_.
// The link is opened with credit_window(0) and credit is issued one message at
// a time: one credit on open, and one more each time receive() hands a
// message to the caller.
class receiver : private proton::messaging_handler
{
    proton::receiver receiver_;
    std::mutex lock_;
    // Null until on_receiver_open() has run.
    proton::work_queue* work_queue_ = nullptr;
    std::queue<proton::message> buffer_;
    std::condition_variable can_receive_;
    // Must start out false: receive() and close() read it before anything
    // else writes it.
    bool closed_ = false;
    bool first_message_received_ = false;
    int expected_message_number_ = 0;

public:
    // Opens a receiver on "<url>/<address>" using this object as the handler.
    receiver(proton::container& cont, const std::string& url,
             const std::string& address)
    {
        proton::connection_options connectionOptions;
        connectionOptions.handler(*this);
        connectionOptions.user("artemis");
        connectionOptions.password("artemis");
        connectionOptions.sasl_enabled(true);
        connectionOptions.sasl_allow_insecure_mechs(true);
        connectionOptions.sasl_allowed_mechs("PLAIN");
        cont.open_receiver(url + "/" + address,
                           proton::receiver_options().credit_window(0),
                           connectionOptions);
    }

    // Blocks until a message is buffered, then returns it and schedules one
    // more credit on the link.
    //
    // Throws std::runtime_error if the receiver has been closed, or if the
    // wait ended because shouldQuit was set while no message was available.
    proton::message receive()
    {
        std::unique_lock<std::mutex> l(lock_);
        while (!closed_ && (!work_queue_ || buffer_.empty()) && !shouldQuit)
        {
            can_receive_.wait(l);
        }
        if (closed_)
        {
            throw std::runtime_error("receiver closed");
        }
        // The loop can also end because shouldQuit was set; in that case
        // there may be nothing to hand out, and front() on an empty queue
        // would be undefined behaviour.
        if (!work_queue_ || buffer_.empty())
        {
            throw std::runtime_error("receive interrupted");
        }
        proton::message m = std::move(buffer_.front());
        buffer_.pop();
        // Credit must be added via the link's work queue, not directly from
        // the caller's thread.
        work_queue_->add([this]() { this->receive_done(); });
        return m;
    }

    // Marks the receiver closed, wakes any thread blocked in receive(), and
    // asks the connection to close. Calling it more than once is harmless.
    void close()
    {
        std::lock_guard<std::mutex> l(lock_);
        if (!closed_)
        {
            closed_ = true;
            can_receive_.notify_all();
            if (work_queue_)
            {
                work_queue_->add([this]() {
                    this->receiver_.connection().close(); });
            }
        }
    }

    // Extracts the sequence number that follows "this is message #" in body
    // and checks that it continues the sequence seen so far, reporting a gap
    // on stderr and resynchronising to the received number.
    //
    // Returns the parsed number, or -1 when the marker is not present.
    // std::stoi throws if the text after the marker is not a number.
    int32_t check_message_number(const std::string& body)
    {
        static const std::string marker("this is message #");
        int32_t retVal = -1;
        const size_t pos = body.find(marker);
        if (pos != std::string::npos)
        {
            const int message_number = std::stoi(body.substr(pos + marker.size()));
            retVal = message_number;
            if (first_message_received_)
            {
                if (message_number != expected_message_number_)
                {
                    std::cerr << "expected message " <<
                        expected_message_number_ << ", but instead received message " << message_number
                        << std::endl;
                    expected_message_number_ = message_number;
                }
            }
            else
            {
                first_message_received_ = true;
                expected_message_number_ = message_number;
            }
            ++expected_message_number_;
        }
        return retVal;
    }

private:
    // Records the link and its work queue, then issues the first credit.
    void on_receiver_open(proton::receiver& r) override
    {
        // receiver_ is also touched from close(), so assign it under the lock.
        std::lock_guard<std::mutex> l(lock_);
        receiver_ = r;
        work_queue_ = &receiver_.work_queue();
        receiver_.add_credit(1);
    }

    // Buffers the message, accepts the delivery and wakes receive().
    void on_message(proton::delivery& d, proton::message& m) override
    {
        std::lock_guard<std::mutex> l(lock_);
        buffer_.push(m);
        d.accept();
        can_receive_.notify_all();
    }

    // Runs on the link's work queue after receive() has consumed a message.
    void receive_done()
    {
        receiver_.add_credit(1);
    }

    // Any error reported to this handler is treated as fatal.
    void on_error(const proton::error_condition& e) override
    {
        std::cerr << "unexpected error: " << e << std::endl;
        exit(1);
    }
};
// The receiver the signal handler should close; null when there is none.
static receiver* receiverPtr = nullptr;

// Handler installed for SIGINT and SIGTERM: announces the shutdown, raises
// shouldQuit and closes the registered receiver, if any.
// NOTE(review): writing to std::cout and calling receiver::close() (which
// locks a mutex) are not async-signal-safe operations — confirm this is
// acceptable for this test program.
void handleSigint(int)
{
    std::cout << "Caught Ctrl-C. Terminating..." << std::endl;
    shouldQuit = true;
    if (receiverPtr == nullptr)
    {
        return;
    }
    receiverPtr->close();
}
int main(int argc, char** argv)
{
shouldQuit = false;
signal(SIGTERM, handleSigint);
signal(SIGINT, handleSigint);
if (argc != 3)
{
std::cout << "Usage: <connect_string> <address>" << std::endl;
return -1;
}
const char* CONNECT_STRING = argv[1];
const char* ADDRESS = argv[2];
try
{
proton::container container;
auto container_thread = std::thread([&]() { container.run(); });
receiver recv(container, CONNECT_STRING, ADDRESS);
receiverPtr = &recv;
std::cout << "Connected to server: " << CONNECT_STRING << " address: "
<< ADDRESS << std::endl;
int msg_count = 0;
int32_t msg_number = 0;
while(!shouldQuit)
{
try
{
proton::message m = recv.receive();
msg_count++;
std::string body = proton::to_string(m.body());
msg_number = recv.check_message_number(body);
const int PREFIX_LENGTH = 1024;
std::string bodyNoPrefix(body.substr(PREFIX_LENGTH));
if (msg_number % 1000 == 0)
{
std::cout << bodyNoPrefix << std::endl;
}
}
catch (...)
{
// bury exception and keep going.
}
}
recv.close();
container_thread.join();
std::cout << "Message Count: " << msg_count << " last message number: "
<< msg_number << std::endl;
return 0;
}
catch (const std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 1;
}#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
#include <proton/work_queue.hpp>
#include <iomanip>
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <set>
class sender : private proton::messaging_handler
{
public:
sender(const std::string& connectString,
const std::string& address)
: accepted(0)
, modified(0)
, none(0)
, recieved(0)
, rejected(0)
, released(0)
, sent(0)
, m_workQueue(0)
, m_connectionFailed(false)
, m_currentCredit(1)
, m_queued(0)
{
m_url = connectString;
m_address = address;
m_container.reset(new proton::container(*this));
m_containerRunThread = std::make_shared<std::thread>(
std::bind(&sender::runContainerThread, this, m_container)
);
}
~sender()
{
{
std::lock_guard<std::mutex> lockQueue(m_connectionLock);
if (m_workQueue)
{
m_workQueue->add(
[=]()
{
m_sender.close();
m_sender.connection().close();
}
);
}
}
if (m_containerRunThread)
{
m_containerRunThread->join();
m_containerRunThread.reset();
}
std::cout << "======================================" << std::endl
<< "= accepted: " << accepted << std::endl
<< "= modified: " << modified << std::endl
<< "= none: " << none << std::endl
<< "= received: " << recieved << std::endl
<< "= rejected: " << rejected << std::endl
<< "= released: " << released << std::endl
<< "= sent: " << sent << std::endl
<< "= unacknowledged messages: " << sentTags.size() << std::endl
<< "======================================" << std::endl;
}
void runContainerThread(std::shared_ptr<proton::container> container)
{
try
{
container->run();
}
catch (const std::exception&)
{
std::lock_guard<std::mutex> lockConnection(m_connectionLock);
m_connectionFailed = true;
m_senderReady.notify_all();
}
}
void on_container_start(proton::container& container)
{
proton::connection_options connectionOptions;
connectionOptions.user("artemis");
connectionOptions.password("artemis");
connectionOptions.sasl_enabled(true);
connectionOptions.sasl_allow_insecure_mechs(true);
connectionOptions.sasl_allowed_mechs("PLAIN");
container.connect(m_url, connectionOptions);
}
void on_connection_open(proton::connection& conn)
{
m_connection = conn;
{
std::lock_guard<std::mutex> lockConn(m_connectionLock);
m_connectionFailed = false;
}
m_connection.open_sender(m_address);
}
bool readyToSend()
{
std::unique_lock<std::mutex> lockConn(m_connectionLock);
while (!m_workQueue && !m_connectionFailed)
{
m_senderReady.wait(lockConn);
}
return !m_connectionFailed;
}
bool send(const proton::message& message)
{
if (!readyToSend())
{
while (!readyToSend())
{
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
m_queued++;
{
//std::lock_guard<std::mutex> l(m_connectionLock);
while (m_queued > m_currentCredit)
{
if (m_connectionFailed)
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
}
m_workQueue->add([=]() { do_send(message); });
return true;
}
void on_sendable(proton::sender& s) override
{
std::lock_guard<std::mutex> l(m_connectionLock);
m_currentCredit = s.credit();
sender_ready_.notify_all();
}
void do_send(const proton::message& m)
{
std::lock_guard<std::mutex> l(m_connectionLock);
sent++;
proton::tracker t = m_sender.send(m);
sentTags.insert(t.tag());
m_currentCredit = m_sender.credit();
m_queued--;
sender_ready_.notify_all();
}
void on_sender_open(proton::sender& s)
{
std::lock_guard<std::mutex> lockSend(m_connectionLock);
m_sender = s;
m_workQueue = &s.work_queue();
m_senderReady.notify_all();
}
void on_connection_close(proton::connection& conn)
{
conn.container().stop();
}
void on_transport_error(proton::transport& tport)
{
std::lock_guard<std::mutex> lockConnection(m_connectionLock);
m_connectionFailed = true;
m_senderReady.notify_all();
}
void on_connection_error(proton::connection& conn)
{
std::lock_guard<std::mutex> lockConnection(m_connectionLock);
m_connectionFailed = true;
m_senderReady.notify_all();
}
void on_error(const proton::error_condition& e)
{
std::lock_guard<std::mutex> lockConnection(m_connectionLock);
m_connectionFailed = true;
m_senderReady.notify_all();
}
void on_tracker_accept(proton::tracker& t) override
{
std::lock_guard<std::mutex> l(m_connectionLock);
auto matchingResult = sentTags.find(t.tag());
if (matchingResult == sentTags.end())
{
std::cout << "on_tracker_accept: unknown tag: " << t.tag() <<
std::endl;
}
else
{
// found item
sentTags.erase(t.tag());
switch (t.state())
{
case proton::transfer::state::ACCEPTED:
//pendingMessages.erase(t.tag());
accepted++;
break;
case proton::transfer::state::MODIFIED:
modified++;
break;
case proton::transfer::state::NONE:
none++;
break;
case proton::transfer::state::RECEIVED:
recieved++;
break;
case proton::transfer::state::REJECTED:
rejected++;
break;
case proton::transfer::state::RELEASED:
released++;
break;
}
}
}
private:
std::condition_variable sender_ready_;
std::string m_url;
std::string m_address;
std::mutex m_connectionLock;
std::set<proton::binary> sentTags;
int32_t accepted;
int32_t modified;
int32_t none;
int32_t recieved;
int32_t rejected;
int32_t released;
int32_t sent;
// The router connection
proton::connection m_connection;
bool m_connectionFailed;
int32_t m_currentCredit;
int32_t m_queued;
// The sender for the router
proton::sender m_sender;
// Indicates when the sender is initialized and ready to send messages
std::condition_variable m_senderReady;
// The receiver for the router
proton::receiver m_receiver;
// The container (highest level proton struct for message_handlers) for the
router
std::shared_ptr<proton::container> m_container;
// The thread for running the router container
std::shared_ptr<std::thread> m_containerRunThread;
proton::work_queue* m_workQueue;
};
// Sender program entry point.
//
// Usage: <connect_string> <address>
// Sends 100000 messages, each consisting of a run of 'X' padding followed by
// "this is message #NNNNNN", so that every payload is 1025 bytes long.
// Returns 0 when all messages were handed to the sender, -1 on bad arguments,
// 1 if a send fails or an exception is thrown.
int main(int argc, char** argv)
{
    if (argc != 3)
    {
        std::cout << "Usage: <connect_string> <address>" << std::endl;
        return -1;
    }
    const char* CONNECT_STRING = argv[1];
    const char* ADDRESS = argv[2];
    try
    {
        sender send(CONNECT_STRING, ADDRESS);
        // 23 is the length of "this is message #" plus the 6-digit number.
        const int32_t PREFIX_LENGTH = 1025 - 23;
        const std::string messagePrefix(PREFIX_LENGTH, 'X');
        for (int i = 0; i < 100000; ++i)
        {
            proton::message m;
            std::stringstream ss;
            ss << "this is message #" << std::setw(6) << std::setfill('0') << i;
            const std::string payload(messagePrefix + ss.str());
            m.body(payload);
            // downsample printouts, to ensure log messages to stdout aren't
            // slowing us down.
            if (i % 1000 == 0)
            {
                std::cout << m.body() << " | " << payload.size() << std::endl;
            }
            if (!send.send(m))
            {
                std::cerr << "send failed for message #" << i << std::endl;
                return 1;
            }
        }
        return 0;
    }
    catch (const std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 1;
}
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
