Hi Philip,

I don't think the ConnectionThrottleFilter would help in your case, as it throttles incoming *connections*, not the bytes being read.

Regarding your requirement, on the server, if you do nothing, it will read as much as it can from the socket, as fast as it can. So you need to find a way to block the server from reading data for a session when the threshold has been met.

There are two things to deal with here:
* blocking the reads when we already have had enough
* unblocking the reads when we are back on our feet


The second point would require some kind of mechanism to 'wake up' a suspended session.

The first point is easier: IoSession.suspendRead() does exactly that.

Starting with the first point, let's check the testSuspendResumeReadWrite() test:



    @Test
    public void testSuspendResumeReadWrite() throws Exception {
        ConnectFuture future = connect(port, new ClientIoHandler());
        future.awaitUninterruptibly();
        IoSession session = future.getSession();

        // We wait until the sessionCreated() event is fired, because we
        // cannot guarantee that it has been invoked already.
        while (session.getAttribute("lock") == null) {
            Thread.yield();
        }

        Object lock = session.getAttribute("lock");
        synchronized (lock) {

            write(session, "1");
            assertEquals('1', read(session));
            assertEquals("1", getReceived(session));
            assertEquals("1", getSent(session));

            session.suspendRead();

            Thread.sleep(100);

            write(session, "2");
            assertFalse(canRead(session));
            assertEquals("1", getReceived(session));
            assertEquals("12", getSent(session));

            session.suspendWrite();

            Thread.sleep(100);

            write(session, "3");
            assertFalse(canRead(session));
            assertEquals("1", getReceived(session));
            assertEquals("12", getSent(session));

            session.resumeRead();

            Thread.sleep(100);
...

As you can see, we call the suspendRead() and resumeRead() methods on the session at will (the rest of the test is not relevant here).

What I can imagine is that your application processes incoming data through the messageReceived() method, and when you've reached your threshold, then you call the session.suspendRead() method.

Now you need a timer to resume the session state.

My idea would be to leverage the ioHandler.sessionIdle() callback to manage the session state: this callback is called every second, so you can compute the time to wait before resuming the read based on how many bytes you have received in a given time frame.
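
To make the computation concrete, here is a minimal sketch in plain Java, with no MINA dependency. ReadBudget, record() and millisUntilResume() are hypothetical names made up for this example, not MINA classes or methods, and the fixed time window is just one possible policy:

```java
/**
 * Hypothetical helper (NOT part of MINA): counts the bytes received in a
 * fixed time window and tells the caller when to suspend reads and how
 * long to wait before resuming them.
 */
final class ReadBudget {
    private final long maxBytesPerWindow;
    private final long windowMillis;
    private long windowStart;
    private long bytesInWindow;

    ReadBudget(long maxBytesPerWindow, long windowMillis, long nowMillis) {
        if (maxBytesPerWindow <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxBytesPerWindow = maxBytesPerWindow;
        this.windowMillis = windowMillis;
        this.windowStart = nowMillis;
    }

    /** Records received bytes; returns true once the threshold is reached. */
    synchronized boolean record(long bytes, long nowMillis) {
        rollWindow(nowMillis);
        bytesInWindow += bytes;
        return bytesInWindow >= maxBytesPerWindow;
    }

    /** Milliseconds to wait before reads may resume; 0 means resume now. */
    synchronized long millisUntilResume(long nowMillis) {
        rollWindow(nowMillis);
        if (bytesInWindow < maxBytesPerWindow) {
            return 0;
        }
        return windowStart + windowMillis - nowMillis;
    }

    /** Starts a fresh window (and resets the counter) once the old one expired. */
    private void rollWindow(long nowMillis) {
        if (nowMillis - windowStart >= windowMillis) {
            windowStart = nowMillis;
            bytesInWindow = 0;
        }
    }
}
```

The idea would be to keep one such object per session (as a session attribute, for instance), call record() from messageReceived() and call session.suspendRead() when it returns true, then call session.resumeRead() from sessionIdle() once millisUntilResume() returns 0. This assumes an idle time has been configured on the session so that sessionIdle() actually fires, and that you can get the size of each message (IoBuffer.remaining() if you have no codec in the chain).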

This is off the top of my head, and as you can see, it's up to the application implementer to deal with the threshold enforcement...

And yes, it would be cool to have such a feature embedded in MINA :-)

Hope it helps...

On 12/09/2024 15:26, Philip Whitehouse wrote:
Hi folks,

We have an application built on top of Apache MINA and we're looking at 
introducing read-throttling (ideally making use of TCP queue behaviour) to 
allow us to handle incoming message volumes on a TCP connection.

From the documentation I can see there's a connection throttle feature - 
implemented in 
https://nightlies.apache.org/mina/mina/2.0.22/apidocs/org/apache/mina/filter/firewall/ConnectionThrottleFilter.html
But there's apparently no read-throttle filter in the docs.

Is there a suggested approach/implementation? Is there a better method than 
adding `Thread.sleep` in messageReceived? Or is there an example/implementation 
already in code I've overlooked?

Best regards,
Philip Whitehouse

--
*Emmanuel Lécharny* P. +33 (0)6 08 33 32 61
elecha...@apache.org
