Hi everyone.  I wrote a HTTP client using netty.  But it can only send 100
requests despite how many requests are written to the channel.  For example
if I send 500 requests, an callback print the number on write complete
tells that 500 requests are written out, but nginx only get 100 of them and
returned 100 responses.  Not sure what I missed.  This seems to be some
configuration I didn't notice or there is a bug in my code I can't see.


My JDK and OS:
Apache Maven 3.3.9
Java version: 1.8.0_112, vendor: Oracle Corporation
OS name: "windows 7", version: "6.1", arch: "amd64", family: "dos"
Netty: 4.1.6.Final

How to test:
1. Run ab -c10 -n10000 http://localhost/ proves nginx is fine.
2. Run the code against nginx and check program's log, find log line "written
499" means 500 requests are written.  Find "200 OK 100" means only 100
respons with http status code 200.  But check nginx access_log, only 100
request are received by nginx.

Here is my code:

package demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class HttpClient {
    private EventLoopGroup group;
    private Channel channel;

    public static void main(String[] args) throws InterruptedException,
IOException {
        String host = "127.0.0.1";
        int port = 80;
        int count = 500;

        EventLoopGroup group = new NioEventLoopGroup(1, new
DefaultThreadFactory("http-client-v1"));
        HttpClient client = new HttpClient(group);
        client.open(host, port);
        for (int i = 0; i < count; i++) {
            if (!client.isWritable()) {
                Thread.sleep(5000);
            }

            if (client.isWritable()) {
                System.out.println("send " + i);

                FullHttpRequest getRequest = new DefaultFullHttpRequest(
                        HttpVersion.HTTP_1_1,
                        HttpMethod.GET,
                        "/");
                getRequest.headers().set(HttpHeaderNames.CONTENT_TYPE,
HttpHeaderValues.TEXT_PLAIN);
                getRequest.headers().set(HttpHeaderNames.HOST, host + ":" +
"port");
                client.write(getRequest);
            }
        }

        System.out.println("done");
        System.in.read();
        client.close();
        System.out.println("close");
    }

    public HttpClient(EventLoopGroup group) {
        this.group = group;
    }

    public void open(String host, int port) {
        Bootstrap b = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_SNDBUF, 128 * 1024 * 1024)
                .option(ChannelOption.SO_RCVBUF, 128 * 1024 * 1024)
                .option(ChannelOption.SO_REUSEADDR, false)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(32 * 1024 * 1024, 128 * 1024 * 1024))
                .remoteAddress(host, port)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws
Exception {
                        ch.pipeline()
                                .addLast(new LogHandler())
                                .addLast(new HttpClientCodec())
                                .addLast(new HttpObjectAggregator(128 *
1024 * 1024))
                                .addLast(new ClientHandler());
                    }
                });
        channel = b.connect().syncUninterruptibly().channel();
        System.out.println("connected");
    }

    public void close() {
        try {
            channel.closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean isWritable() {
        return channel.isWritable();
    }

    public ChannelFuture write(FullHttpRequest request) {
        return channel.writeAndFlush(request);
    }

    private static class ClientHandler extends
SimpleChannelInboundHandler<FullHttpResponse> {
        AtomicInteger id = new AtomicInteger();

        @Override
        protected void channelRead0(ChannelHandlerContext ctx,
FullHttpResponse msg) throws Exception {
            System.out.println(msg.status() + ", " + id.incrementAndGet());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) throws Exception {
            cause.getCause().printStackTrace();
        }
    }

    private static class LogHandler extends ChannelDuplexHandler {
        AtomicInteger id = new AtomicInteger();

        @Override
        public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
            super.write(ctx, msg, promise.addListener(new
GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future)
throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("written " +
id.incrementAndGet());
                    }
                }
            }));
        }
    }
}

-- 
You received this message because you are subscribed to the Google Groups 
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/netty/CAAmVRwaoZ173RUUh15Pi6qyJFgfdFbWA2_cwg1ZwJdAm3tdjaw%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to