abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1935

Change subject: Further chunk large chunks
......................................................................

Further chunk large chunks

Change-Id: I08bac47ea28f66502b99df6fb8ff91dd85566d38
---
M hyracks-fullstack/hyracks/hyracks-http/pom.xml
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
A hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
M hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
5 files changed, 137 insertions(+), 31 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/35/1935/1

diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 835cf61..6439adb 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -16,7 +16,8 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.hyracks</groupId>
@@ -27,6 +28,17 @@
   <properties>
     <root.dir>${basedir}/../..</root.dir>
   </properties>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <argLine>-XX:MaxDirectMemorySize=16M</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
   <dependencies>
     <dependency>
       <groupId>io.netty</groupId>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index 4a4c25a..433d5ac 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -88,6 +88,9 @@
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Unhandled exception", e);
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        } catch (Throwable th) { //NOSONAR Just logging and then throwing again
+            LOGGER.log(Level.WARNING, "Unhandled throwable", th);
+            throw th;
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 3456343..b5b526f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -27,6 +27,7 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.internal.OutOfDirectMemoryError;
 
 public class ChunkedNettyOutputStream extends OutputStream {
 
@@ -50,23 +51,27 @@
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > 
b.length)) {
-            throw new IndexOutOfBoundsException();
-        } else if (len == 0) {
-            return;
-        }
-        if (len > buffer.capacity()) {
-            flush();
-            flush(b, off, len);
-        } else {
-            int space = buffer.writableBytes();
-            if (space >= len) {
-                buffer.writeBytes(b, off, len);
-            } else {
-                buffer.writeBytes(b, off, space);
-                flush();
-                buffer.writeBytes(b, off + space, len - space);
+        try {
+            if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > 
b.length)) {
+                throw new IndexOutOfBoundsException();
+            } else if (len == 0) {
+                return;
             }
+            int remaining = len;
+            while (remaining > 0) {
+                int space = buffer.writableBytes();
+                if (space >= remaining) {
+                    buffer.writeBytes(b, off, remaining);
+                    remaining = 0;
+                } else {
+                    buffer.writeBytes(b, off, space);
+                    remaining -= space;
+                    flush();
+                }
+            }
+        } catch (OutOfDirectMemoryError error) {// NOSONAR
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            throw error;
         }
     }
 
@@ -111,18 +116,6 @@
                 response.error(aBuffer);
                 buffer.clear();
             }
-        }
-    }
-
-    private void flush(byte[] buf, int offset, int len) throws IOException {
-        ensureWritable();
-        ByteBuf aBuffer = ctx.alloc().buffer(len);
-        aBuffer.writeBytes(buf, offset, len);
-        if (response.status() == HttpResponseStatus.OK) {
-            response.beforeFlush();
-            ctx.write(new DefaultHttpContent(aBuffer), 
ctx.channel().voidPromise());
-        } else {
-            response.error(aBuffer);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
new file mode 100644
index 0000000..30df003
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.servlet;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class ChattyServlet extends AbstractServlet {
+    private static final Logger LOGGER = 
Logger.getLogger(ChattyServlet.class.getName());
+    private byte[] bytes;
+
+    public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+        String line = "I don't know when to stop talking\n";
+        StringBuilder responseBuilder = new StringBuilder();
+        for (int i = 0; i < 100000; i++) {
+            responseBuilder.append(line);
+        }
+        String responseString = responseBuilder.toString();
+        bytes = responseString.getBytes();
+    }
+
+    @Override
+    protected void get(IServletRequest request, IServletResponse response) 
throws Exception {
+        response.setStatus(HttpResponseStatus.OK);
+        HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, 
HttpUtil.Encoding.UTF8);
+        LOGGER.log(Level.WARNING, "I am about to flood you... and a single 
buffer is " + bytes.length + " bytes");
+        for (int i = 0; i < 100; i++) {
+            response.outputStream().write(bytes);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 2076201..854980e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -41,13 +41,16 @@
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.servlet.ChattyServlet;
 import org.apache.hyracks.http.servlet.SlowServlet;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class HttpServerTest {
+    static final boolean PRINT_TO_CONSOLE = false;
     static final int PORT = 9898;
     static final int NUM_EXECUTOR_THREADS = 16;
     static final int SERVER_QUEUE_SIZE = 16;
@@ -59,6 +62,13 @@
     static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
     static final List<Thread> THREADS = new ArrayList<>();
+
+    @Before
+    public void setUp() {
+        SUCCESS_COUNT.set(0);
+        UNAVAILABLE_COUNT.set(0);
+        OTHER_COUNT.set(0);
+    }
 
     @Test
     public void testOverloadingServer() throws Exception {
@@ -76,6 +86,31 @@
             }
             Assert.assertEquals(32, SUCCESS_COUNT.get());
             Assert.assertEquals(16, UNAVAILABLE_COUNT.get());
+            Assert.assertEquals(0, OTHER_COUNT.get());
+        } finally {
+            webMgr.stop();
+        }
+    }
+
+    @Test
+    public void testChattyServer() throws Exception {
+        int numRequests = 64;
+        int numExecutors = 32;
+        int serverQueueSize = 32;
+        WebManager webMgr = new WebManager();
+        HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, 
numExecutors, serverQueueSize);
+        ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { 
PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            request(numRequests);
+            for (Thread thread : THREADS) {
+                thread.join();
+            }
+            Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
+            Assert.assertEquals(0, UNAVAILABLE_COUNT.get());
             Assert.assertEquals(0, OTHER_COUNT.get());
         } finally {
             webMgr.stop();
@@ -136,8 +171,15 @@
                     } else {
                         OTHER_COUNT.incrementAndGet();
                     }
-                    InputStream responseStream = 
response.getEntity().getContent();
-                    IOUtils.closeQuietly(responseStream);
+                    InputStream in = response.getEntity().getContent();
+                    if (PRINT_TO_CONSOLE) {
+                        BufferedReader reader = new BufferedReader(new 
InputStreamReader(in));
+                        String line = null;
+                        while ((line = reader.readLine()) != null) {
+                            System.out.println(line);
+                        }
+                    }
+                    IOUtils.closeQuietly(in);
                 } catch (Throwable th) {
                     th.printStackTrace();
                 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1935
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I08bac47ea28f66502b99df6fb8ff91dd85566d38
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to