On 30/01/19 14:03, Panagiotis Sakellariou wrote:
Hi there,

I'm new on qpid proton and I can to work with qpid-proton cpp libs.
I'm running through the provided examples but I'm having the following
issue.

Simple sent  and receive works.
Send receive and reply works as well.
Now I want to have send, receive, reply but on the sender to have a timeout
and if the reply is not receive within a period of time then stop waiting.
I have opened a ticket https://issues.apache.org/jira/browse/PROTON-2000
because I though this is how it should be implemented.

The timeout on the source determines how long any state associated with the subscription is kept when the subscription is not active. It comes from the AMQP spec[1] but the description there is not very helpful either (requires a fair bit of reading between the lines).

[1] http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-complete-v1.0.pdf#subsection.3.5.3

I have the code there so anyone can check it,

Can anyone send me an example or a link where i can find more information
about what I'm trying to do.
Also i think the documentation is not sufficient. So any information links
with example, discussion would be very helpful. :)

Attached is an example that I think shows what you want (i.e. time out if a reply does not arrive within some configured time). There isn't a lot of doc on it I'm afraid. Ideally the tutorial would have a section on scheduled events. The schedule method on container is the primary tool though: https://qpid.apache.org/releases/qpid-proton-0.25.0/proton/cpp/api/classproton_1_1container.html#aa99ede2051ccdf5fe8257d893559ea26


/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "options.hpp"
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/source_options.hpp>
#include <proton/tracker.hpp>
#include <proton/work_queue.hpp>

#include <iostream>
#include <vector>

#include "fake_cpp11.hpp"

using proton::receiver_options;
using proton::source_options;

class client : public proton::messaging_handler {
  private:
    std::string url;
    proton::duration timeout;
    proton::sender sender;
    proton::receiver receiver;
    bool closed;
    proton::work_queue* work_queue;

  public:
    client(const std::string &u, double t) : url(u), timeout(int(t*proton::duration::SECOND.milliseconds())), closed(false), work_queue(0) {}

    void on_container_start(proton::container &c) OVERRIDE {
        sender = c.open_sender(url);
        // Create a receiver requesting a dynamically created queue
        // for the message source.
        receiver_options opts = receiver_options().source(source_options().dynamic(true));
        receiver = sender.connection().open_receiver("", opts);
    }

    void on_receiver_open(proton::receiver &) OVERRIDE {
        proton::message req;
        req.body("Test request");
        req.reply_to(receiver.source().address());
        sender.send(req);
        work_queue = &sender.work_queue();
        // Call this->cancel after timeout.
        sender.container().schedule(timeout, [this]() { this->work_queue->add( [this]() { this->close(); }); });
    }

    void close() {
        if (!closed) {
            std::cout << "Timed out waiting for reply" << std::endl;
            sender.connection().close();
        }
    }

    void on_message(proton::delivery &d, proton::message &response) OVERRIDE {
        std::cout << "Got reply: " << response.body() << std::endl;
        closed = true;
        sender.connection().close();
    }
};

int main(int argc, char **argv) {
    std::string url("127.0.0.1:5672/examples");
    example::options opts(argc, argv);
    double timeout = 5.0;

    opts.add_value(url, 'a', "address", "connect and send to URL", "URL");
    opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T");

    try {
        opts.parse();
        client c(url, timeout);
        proton::container(c).run();

        return 0;
    } catch (const example::bad_option& e) {
        std::cout << opts << std::endl << e.what() << std::endl;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }

    return 1;
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to