Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2321

Change subject: [ASTERIXDB-2264][ING] Introduce Http Feed
......................................................................

[ASTERIXDB-2264][ING] Introduce Http Feed

1. HttpServer Interface refactoring
2. Add authentication server
3. Test case refactoring
4. Http Feed

Change-Id: I9c03bddd0df59b69a1f2d6e989b8ee93cf50a6c0
---
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M 
asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
A 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
A 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
A 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
A 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
27 files changed, 785 insertions(+), 122 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/21/2321/1

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index c3c450c..2422cd8 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -38,6 +38,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -57,6 +58,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
+import io.netty.handler.codec.base64.Base64Encoder;
 import org.apache.asterix.api.http.server.QueryServiceServlet;
 import org.apache.asterix.app.external.IExternalUDFLibrarian;
 import org.apache.asterix.common.api.Duration;
@@ -86,9 +88,11 @@
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+import org.apache.http.message.BasicHeader;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.util.EntityUtils;
 import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StringUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -124,6 +128,8 @@
     private static final Pattern HTTP_BODY_PATTERN = 
Pattern.compile("body=(.*)", Pattern.MULTILINE);
     private static final Pattern HTTP_STATUSCODE_PATTERN = 
Pattern.compile("statuscode (.*)", Pattern.MULTILINE);
     private static final Pattern MAX_RESULT_READS_PATTERN = 
Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
+    private static final Pattern HTTP_AUTHORIZATION_PATTERN = 
Pattern.compile("auth=(.*)");
+
     public static final int TRUNCATE_THRESHOLD = 16384;
 
     public static final String DELIVERY_ASYNC = "async";
@@ -618,7 +624,8 @@
         return builder.build();
     }
 
-    private HttpUriRequest buildRequest(String method, URI uri, 
List<Parameter> params, Optional<String> body) {
+    private HttpUriRequest buildRequest(String method, URI uri, 
List<Parameter> params, Optional<String> body,
+            String auth) {
         RequestBuilder builder = RequestBuilder.create(method);
         builder.setUri(uri);
         for (Parameter param : params) {
@@ -628,12 +635,16 @@
         if (body.isPresent()) {
             builder.setEntity(new StringEntity(body.get(), 
StandardCharsets.UTF_8));
         }
+        if (auth != null) {
+            builder.addHeader("Authorization",
+                    "Basic " + 
Base64.getEncoder().encodeToString(auth.getBytes()));
+        }
         return builder.build();
     }
 
     private HttpUriRequest buildRequest(String method, URI uri, OutputFormat 
fmt, List<Parameter> params,
-            Optional<String> body) {
-        HttpUriRequest request = buildRequest(method, uri, params, body);
+            Optional<String> body, String auth) {
+        HttpUriRequest request = buildRequest(method, uri, params, body, auth);
         // Set accepted output response type
         request.setHeader("Accept", fmt.mimeType());
         return request;
@@ -704,21 +715,22 @@
 
     public InputStream executeJSONGet(OutputFormat fmt, URI uri, 
List<Parameter> params,
             Predicate<Integer> responseCodeValidator) throws Exception {
-        return executeJSON(fmt, "GET", uri, params, responseCodeValidator, 
Optional.empty());
+        return executeJSON(fmt, "GET", uri, params, responseCodeValidator, 
Optional.empty(), null);
     }
 
     public InputStream executeJSON(OutputFormat fmt, String method, URI uri, 
List<Parameter> params) throws Exception {
-        return executeJSON(fmt, method, uri, params, code -> code == 
HttpStatus.SC_OK, Optional.empty());
+        return executeJSON(fmt, method, uri, params, code -> code == 
HttpStatus.SC_OK, Optional.empty(), null);
     }
 
     public InputStream executeJSON(OutputFormat fmt, String method, URI uri, 
Predicate<Integer> responseCodeValidator)
             throws Exception {
-        return executeJSON(fmt, method, uri, Collections.emptyList(), 
responseCodeValidator, Optional.empty());
+        return executeJSON(fmt, method, uri, Collections.emptyList(), 
responseCodeValidator, Optional.empty(), null);
     }
 
     public InputStream executeJSON(OutputFormat fmt, String method, URI uri, 
List<Parameter> params,
-            Predicate<Integer> responseCodeValidator, Optional<String> body) 
throws Exception {
-        HttpUriRequest request = buildRequest(method, uri, fmt, params, body);
+            Predicate<Integer> responseCodeValidator, Optional<String> body, 
String auth)
+            throws Exception {
+        HttpUriRequest request = buildRequest(method, uri, fmt, params, body, 
auth);
         HttpResponse response = executeAndCheckHttpRequest(request, 
responseCodeValidator);
         return response.getEntity().getContent();
     }
@@ -1163,15 +1175,18 @@
             int numResultFiles, String extension, ComparisonEnum compare) 
throws Exception {
         String handleVar = getHandleVariable(statement);
         final String trimmedPathAndQuery = stripAllComments(statement).trim();
+        final String auth = getAuthorizationInfo(statement);
         final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, 
variableCtx);
         final List<Parameter> params = extractParameters(statement);
         final Optional<String> body = extractBody(statement);
         final Predicate<Integer> statusCodePredicate = 
extractStatusCodePredicate(statement);
         InputStream resultStream;
         if ("http".equals(extension)) {
-            resultStream = executeHttp(reqType, variablesReplaced, fmt, 
params, statusCodePredicate, body);
+            resultStream = executeHttp(reqType, variablesReplaced, fmt, 
params, statusCodePredicate, body,
+                    auth);
         } else if ("uri".equals(extension)) {
-            resultStream = executeURI(reqType, URI.create(variablesReplaced), 
fmt, params, statusCodePredicate, body);
+            resultStream = executeURI(reqType, URI.create(variablesReplaced), 
fmt, params, statusCodePredicate, body,
+                    auth);
         } else {
             throw new IllegalArgumentException("Unexpected format for method " 
+ reqType + ": " + extension);
         }
@@ -1394,6 +1409,11 @@
         return handleVariableMatcher.find() ? handleVariableMatcher.group(1) : 
null;
     }
 
+    protected static String getAuthorizationInfo(String statement) {
+        final Matcher authorizationInfoMatcher = 
HTTP_AUTHORIZATION_PATTERN.matcher(statement);
+        return authorizationInfoMatcher.find() ? 
authorizationInfoMatcher.group(1) : null;
+    }
+
     protected static String replaceVarRef(String statement, Map<String, 
Object> variableCtx) {
         String tmpStmt = statement;
         Matcher variableReferenceMatcher = 
VARIABLE_REF_PATTERN.matcher(tmpStmt);
@@ -1449,10 +1469,10 @@
     }
 
     protected InputStream executeHttp(String ctxType, String endpoint, 
OutputFormat fmt, List<Parameter> params,
-            Predicate<Integer> statusCodePredicate, Optional<String> body) 
throws Exception {
+            Predicate<Integer> statusCodePredicate, Optional<String> body, 
String auth) throws Exception {
         String[] split = endpoint.split("\\?");
         URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] : 
null);
-        return executeURI(ctxType, uri, fmt, params, statusCodePredicate, 
body);
+        return executeURI(ctxType, uri, fmt, params, statusCodePredicate, 
body, auth);
     }
 
     private InputStream executeURI(String ctxType, URI uri, OutputFormat fmt, 
List<Parameter> params) throws Exception {
@@ -1460,8 +1480,9 @@
     }
 
     private InputStream executeURI(String ctxType, URI uri, OutputFormat fmt, 
List<Parameter> params,
-            Predicate<Integer> responseCodeValidator, Optional<String> body) 
throws Exception {
-        return executeJSON(fmt, ctxType.toUpperCase(), uri, params, 
responseCodeValidator, body);
+            Predicate<Integer> responseCodeValidator, Optional<String> body, 
String auth)
+            throws Exception {
+        return executeJSON(fmt, ctxType.toUpperCase(), uri, params, 
responseCodeValidator, body, auth);
     }
 
     public void killNC(String nodeId, CompilationUnit cUnit) throws Exception {
@@ -1619,6 +1640,7 @@
 
     protected URI createEndpointURI(String path, String query) throws 
URISyntaxException {
         InetSocketAddress endpoint;
+        int port = -1;
         if (!path.startsWith("nc:")) {
             int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
             endpoint = endpoints.get(endpointIdx);
@@ -1628,10 +1650,15 @@
                 throw new IllegalArgumentException("Unrecognized http 
pattern");
             }
             String nodeId = tokens[0].substring(3);
+            if (nodeId.contains(":")) {
+                String nodeParts[] = StringUtils.split(nodeId, ':');
+                nodeId = nodeParts[0];
+                port = Integer.valueOf(nodeParts[1]);
+            }
             endpoint = getNcEndPoint(nodeId);
             path = tokens[1];
         }
-        URI uri = new URI("http", null, endpoint.getHostString(), 
endpoint.getPort(), path, query, null);
+        URI uri = new URI("http", null, endpoint.getHostString(), port == -1 ? 
endpoint.getPort() : port, path, query, null);
         LOGGER.debug("Created endpoint URI: " + uri);
         return uri;
     }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index f9f57a3..b4fb7d5 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -18,10 +18,17 @@
  */
 package org.apache.asterix.test.runtime;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,7 +45,16 @@
 
     @BeforeClass
     public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+        TestExecutor testExecutor = new TestExecutor();
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        final NodeControllerService[] ncs = 
ExecutionTestUtil.integrationUtil.ncs;
+        Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+        final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+        for (NodeControllerService nc : ncs) {
+            final String nodeId = nc.getId();
+            ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, 0));
+        }
+        testExecutor.setNcEndPoints(ncEndPoints);
     }
 
     @AfterClass
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
new file mode 100644
index 0000000..3edf6e3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TweetMessageType as open {
+    id : string
+};
+
+create dataset Tweets(TweetMessageType) primary key id;
+
+create feed TweetFeed with {
+  "adapter-name" : "http_adapter",
+  "addresses" : "asterix_nc2:10012",
+  "address-type" : "NC",
+  "type-name" : "TweetMessageType",
+  "format" : "adm",
+  "username" : "till",
+  "password" : "westmann"
+};
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
new file mode 100644
index 0000000..fd41b72
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+set `wait-for-completion-feed` "false";
+
+connect feed TweetFeed to dataset Tweets;
+
+start feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
new file mode 100644
index 0000000..d363e55
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2:10012 /
+--auth=till:westmann
+--body={ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT 
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished 
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }{ "id": 
"nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for 
President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
new file mode 100644
index 0000000..5fcb222
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2:10012 /
+--auth=till:eastmann
+--body={ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT 
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished 
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
new file mode 100644
index 0000000..7a07621
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+stop feed TweetFeed;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
new file mode 100644
index 0000000..bcddf7a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+
+select value count(t) from Tweets as t;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
new file mode 100644
index 0000000..e4d2615
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 16fa334..4dfc241 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8403,6 +8403,12 @@
         <output-dir compare="Text">connect-feed-with-function</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="http_feed">
+        <output-dir compare="Text">http_feed</output-dir>
+        <expected-error>HTTP/1.1 401 Unauthorized</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="hdfs">
     <test-case FilePath="hdfs">
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 4c3b7e6..efa12ef 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -38,7 +38,7 @@
      * The data source type indicates whether the data source produces a 
continuous stream or
      * a set of records
      */
-    public enum DataSourceType {
+    enum DataSourceType {
         STREAM,
         RECORDS
     }
@@ -46,7 +46,7 @@
     /**
      * @return The data source type {STREAM or RECORDS}
      */
-    public DataSourceType getDataSourceType();
+    DataSourceType getDataSourceType();
 
     /**
      * Specifies on which locations this data source is expected to run.
@@ -54,7 +54,7 @@
      * @return
      * @throws AsterixException
      */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() 
throws AlgebricksException;
+    AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws 
AlgebricksException;
 
     /**
      * Configure the data parser factory. The passed map contains key value 
pairs from the
@@ -63,7 +63,7 @@
      * @param configuration
      * @throws AsterixException
      */
-    public void configure(IServiceContext ctx, Map<String, String> 
configuration)
+    void configure(IServiceContext ctx, Map<String, String> configuration)
             throws AlgebricksException, HyracksDataException;
 
     /**
@@ -71,7 +71,7 @@
      *
      * @return
      */
-    public default boolean isIndexible() {
+    default boolean isIndexible() {
         return false;
     }
 
@@ -84,7 +84,7 @@
      * @return
      * @throws AlgebricksException
      */
-    public static AlgebricksAbsolutePartitionConstraint 
getPartitionConstraints(ICcApplicationContext appCtx,
+    static AlgebricksAbsolutePartitionConstraint 
getPartitionConstraints(ICcApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint constraints, int count) 
throws AlgebricksException {
         if (constraints == null) {
             IClusterStateManager clusterStateManager = 
appCtx.getClusterStateManager();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
new file mode 100644
index 0000000..a8fe2af
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
@@ -0,0 +1,174 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.AuthenticatedHttpServer;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.server.authenticator.BasicAuthenticator;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Record reader backed by an embedded HTTP server. Every top-level <code>{...}</code> object found
+ * in the body of a POST request is queued and later handed out by {@link #next()} as one record.
+ */
+public class HttpServerRecordReader implements IRecordReader<char[]> {
+
+    private static final String DEFAULT_ENTRY_POINT = "/";
+    private final LinkedBlockingQueue<String> inputQ;
+    private final GenericRecord<char[]> record;
+    private final WebManager webManager;
+    private final HttpServer webServer;
+    // NOTE(review): stop()/close() are presumably called from a different thread than the one
+    // polling hasNext(); volatile keeps the flag visible across threads either way.
+    private volatile boolean closed = false;
+
+    /**
+     * Creates the reader and immediately starts an HTTP server on {@code port}.
+     *
+     * @param port
+     *            port the server listens on
+     * @param username
+     *            user name for basic authentication; authentication is only enabled when both
+     *            {@code username} and {@code password} are non-null
+     * @param password
+     *            password for basic authentication
+     * @param entryPoint
+     *            servlet path accepting the POST requests; {@code "/"} is used when null
+     * @throws Exception
+     *             if the server cannot be set up or started
+     */
+    public HttpServerRecordReader(int port, String username, String password, String entryPoint) throws Exception {
+        this.inputQ = new LinkedBlockingQueue<>();
+        this.record = new GenericRecord<>();
+        webManager = new WebManager();
+        if (username == null || password == null) {
+            webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), port);
+        } else {
+            webServer = new AuthenticatedHttpServer(webManager.getBosses(), webManager.getWorkers(), port,
+                    new BasicAuthenticator(username, password));
+        }
+        String path = entryPoint == null ? DEFAULT_ENTRY_POINT : entryPoint;
+        webServer.addServlet(new HttpFeedServlet(webServer.ctx(), new String[] { path }, inputQ));
+        webManager.add(webServer);
+        webManager.start();
+    }
+
+    /**
+     * @return true until the reader has been closed; it does not tell whether a record is queued
+     */
+    @Override
+    public boolean hasNext() {
+        return !closed;
+    }
+
+    /**
+     * Non-blocking fetch of the next queued record.
+     *
+     * @return the shared record instance holding the next record, or null when the queue is empty
+     */
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        String srecord = inputQ.poll();
+        if (srecord == null) {
+            return null;
+        }
+        record.set(srecord.toCharArray());
+        return record;
+    }
+
+    /**
+     * @return true if the reader was closed cleanly, false if closing failed
+     */
+    @Override
+    public boolean stop() {
+        try {
+            close();
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        // do nothing
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+        // do nothing
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /**
+     * Stops the web manager (and with it the HTTP server). Calling it again after a successful
+     * close is a no-op.
+     *
+     * @throws IOException
+     *             wrapping any failure reported while stopping the web manager
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+        try {
+            webManager.stop();
+            // only flagged as closed once the server really stopped, so a failed stop can be retried
+            closed = true;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Servlet that accepts POST requests and pushes the records found in the body into the queue.
+     * Static because it does not use any state of the enclosing reader.
+     */
+    private static class HttpFeedServlet extends AbstractServlet {
+
+        private final LinkedBlockingQueue<String> inputQ;
+
+        public HttpFeedServlet(ConcurrentMap<String, Object> ctx, String[] paths, LinkedBlockingQueue<String> inputQ) {
+            super(ctx, paths);
+            this.inputQ = inputQ;
+        }
+
+        /**
+         * Splits the payload into top-level <code>{...}</code> spans by counting braces and queues each
+         * span followed by a newline. Text outside of any object and an unterminated trailing
+         * object are dropped.
+         * NOTE(review): braces inside string literals are counted as well, so a value containing
+         * an unbalanced brace still splits the record at the wrong place.
+         */
+        private void splitIntoRecords(String admData) throws InterruptedException {
+            int start = 0;
+            int depth = 0;
+            for (int i = 0; i < admData.length(); i++) {
+                char c = admData.charAt(i);
+                if (c == '{') {
+                    if (depth == 0) {
+                        start = i;
+                    }
+                    depth++;
+                } else if (c == '}' && depth > 0) {
+                    // a stray '}' outside of any object is ignored instead of driving the depth
+                    // negative, which used to corrupt the record that follows
+                    depth--;
+                    if (depth == 0) {
+                        inputQ.put(admData.substring(start, i + 1) + '\n');
+                    }
+                }
+            }
+        }
+
+        private void doPost(IServletRequest request) throws InterruptedException {
+            splitIntoRecords(request.getHttpRequest().content().toString(StandardCharsets.UTF_8));
+        }
+
+        /**
+         * Answers 200 once the POSTed records are queued, 500 if queuing was interrupted and 405
+         * for every other HTTP method.
+         */
+        @Override
+        public void handle(IServletRequest request, IServletResponse response) {
+            if (!HttpMethod.POST.equals(request.getHttpRequest().method())) {
+                response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+                return;
+            }
+            try {
+                doPost(request);
+                response.setStatus(HttpResponseStatus.OK);
+            } catch (InterruptedException e) {
+                // restore the interrupt flag so the owning thread can still observe the interruption
+                Thread.currentThread().interrupt();
+                response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            }
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
new file mode 100644
index 0000000..7532e2b
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for {@link HttpServerRecordReader}. It is configured from the adapter configuration map
+ * and creates one reader per partition, each listening on the port configured for that partition.
+ */
+public class HttpServerRecordReaderFactory implements IRecordReaderFactory<char[]> {
+
+    // same convention as the other data source factories, e.g. SocketServerInputStreamFactory
+    private static final long serialVersionUID = 1L;
+
+    public static final String KEY_CONFIGURATION_USER_NAME = "username";
+    public static final String KEY_CONFIGURATION_PASSWORD = "password";
+    public static final String KEY_CONFIGURATION_ADDRESSES = "addresses";
+    public static final String KEY_CONFIGURATION_PATH = "path";
+
+    private static final List<String> recordReaderNames =
+            Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.KEY_ADAPTER_NAME_HTTP));
+
+    private String username;
+    private String password;
+    private String entryPoint;
+    private Map<String, String> configurations;
+    private List<Pair<String, Integer>> serverAddrs;
+
+    /**
+     * Creates the reader for {@code partition}; the port is the right-hand side of the address
+     * pair stored for that partition.
+     *
+     * @throws HyracksDataException
+     *             wrapping any failure raised while creating the reader
+     */
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            int port = serverAddrs.get(partition).getRight();
+            return new HttpServerRecordReader(port, username, password, entryPoint);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    /**
+     * @return the partition constraint derived from the configured addresses; only valid after
+     *         {@link #configure(IServiceContext, Map)} has been called
+     */
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return FeedUtils.addressToAbsolutePartitionConstraints(serverAddrs);
+    }
+
+    /**
+     * Looks up {@code key} in the configuration map.
+     *
+     * @param key
+     *            configuration key
+     * @param required
+     *            whether a missing value is an error
+     * @return the configured value, or null if it is absent and not required
+     * @throws CompilationException
+     *             if the value is required but missing
+     */
+    private String getConfigurationValue(String key, boolean required) throws CompilationException {
+        String value = configurations.get(key);
+        if (value == null && required) {
+            throw new CompilationException("Required configuration missing: " + key);
+        }
+        return value;
+    }
+
+    /**
+     * Reads the factory settings from {@code configuration}. The addresses and the mode are
+     * required; user name, password and path are optional.
+     *
+     * @throws AlgebricksException
+     *             if a required setting is missing or the addresses cannot be resolved
+     */
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration) throws AlgebricksException {
+        this.configurations = configuration;
+        // necessary configs
+        String addrValue = getConfigurationValue(KEY_CONFIGURATION_ADDRESSES, true);
+        String mode = getConfigurationValue(ExternalDataConstants.KEY_MODE, true);
+        serverAddrs = FeedUtils.extractHostsPorts(mode, ctx, addrValue);
+        // optional configs
+        username = getConfigurationValue(KEY_CONFIGURATION_USER_NAME, false);
+        password = getConfigurationValue(KEY_CONFIGURATION_PASSWORD, false);
+        entryPoint = getConfigurationValue(KEY_CONFIGURATION_PATH, false);
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 0591775..8e2adda 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -24,10 +24,8 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -49,8 +47,8 @@
     protected Map<String, String> configuration;
     protected Class recordReaderClazz;
     private static final List<String> recordReaderNames = 
Collections.unmodifiableList(
-            Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, 
ExternalDataConstants.ALIAS_SOCKET_ADAPTER,
-                    ExternalDataConstants.SOCKET, 
ExternalDataConstants.STREAM_SOCKET_CLIENT));
+            Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, 
ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET,
+                    ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET, 
ExternalDataConstants.STREAM_SOCKET_CLIENT));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -71,8 +69,8 @@
         String reader = config.get(ExternalDataConstants.KEY_READER);
         if (reader.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)) {
             streamFactory = new LocalFSInputStreamFactory();
-        } else if (reader.equals(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)
-                || reader.equals(ExternalDataConstants.SOCKET)) {
+        } else if 
(reader.equals(ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET)
+                || 
reader.equals(ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET)) {
             streamFactory = new SocketServerInputStreamFactory();
         } else if (reader.equals(ExternalDataConstants.STREAM_SOCKET_CLIENT)) {
             streamFactory = new SocketClientInputStreamFactory();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 8182dcd..7e5e3c8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -56,9 +56,9 @@
     private transient IServiceContext serviceCtx;
 
     private static final List<String> recordReaderNames =
-            
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.READER_TWITTER_PULL,
-                    ExternalDataConstants.READER_TWITTER_PUSH, 
ExternalDataConstants.READER_PUSH_TWITTER,
-                    ExternalDataConstants.READER_PULL_TWITTER, 
ExternalDataConstants.READER_USER_STREAM_TWITTER));
+            
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PULL,
+                    ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PUSH, 
ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER,
+                    ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER, 
ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -98,7 +98,7 @@
             throw new AsterixException(builder.toString());
         }
 
-        if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_PULL_TWITTER))
 {
+        if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER))
 {
             if (configuration.get(SearchAPIConstants.QUERY) == null) {
                 throw new AsterixException(
                         "parameter " + SearchAPIConstants.QUERY + " not 
specified as part of adaptor configuration");
@@ -131,12 +131,12 @@
             throws HyracksDataException {
         IRecordReader<? extends String> recordReader;
         switch (configuration.get(ExternalDataConstants.KEY_READER)) {
-            case ExternalDataConstants.READER_PULL_TWITTER:
+            case ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER:
                 recordReader = new 
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
                         configuration.get(SearchAPIConstants.QUERY),
                         
Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
                 break;
-            case ExternalDataConstants.READER_PUSH_TWITTER:
+            case ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER:
                 FilterQuery query;
                 try {
                     query = TwitterUtil.getFilterQuery(configuration);
@@ -149,7 +149,7 @@
                     throw new HyracksDataException(e);
                 }
                 break;
-            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+            case ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM:
                 recordReader = new 
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
                         TwitterUtil.getUserTweetsListener());
                 break;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 1f1fa5c..86a4db6 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -19,28 +19,20 @@
 package org.apache.asterix.external.input.stream.factory;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.SocketServerInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -49,66 +41,15 @@
 
     private static final long serialVersionUID = 1L;
     private List<Pair<String, Integer>> sockets;
-    private Mode mode = Mode.IP;
-
-    public static enum Mode {
-        NC,
-        IP
-    }
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration)
-            throws AsterixException, CompilationException {
+            throws CompilationException {
         try {
-            sockets = new ArrayList<>();
-            String modeValue = 
configuration.get(ExternalDataConstants.KEY_MODE);
-            if (modeValue != null) {
-                mode = Mode.valueOf(modeValue.trim().toUpperCase());
-            }
-            String socketsValue = 
configuration.get(ExternalDataConstants.KEY_SOCKETS);
-            if (socketsValue == null) {
-                throw new 
CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
-            }
-            Map<InetAddress, Set<String>> ncMap;
-            ncMap = RuntimeUtils.getNodeControllerMap((ICcApplicationContext) 
serviceCtx.getApplicationContext());
-            List<String> ncs =
-                    RuntimeUtils.getAllNodeControllers((ICcApplicationContext) 
serviceCtx.getApplicationContext());
-            String[] socketsArray = socketsValue.split(",");
-            Random random = new Random();
-            for (String socket : socketsArray) {
-                String[] socketTokens = socket.split(":");
-                String host = socketTokens[0].trim();
-                int port = Integer.parseInt(socketTokens[1].trim());
-                Pair<String, Integer> p = null;
-                switch (mode) {
-                    case IP:
-                        Set<String> ncsOnIp = 
ncMap.get(InetAddress.getByName(host));
-                        if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
-                            throw new CompilationException(
-                                    
ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "host", host,
-                                    StringUtils.join(ncMap.keySet(), ", "));
-                        }
-                        String[] ncArray = ncsOnIp.toArray(new String[] {});
-                        String nc = ncArray[random.nextInt(ncArray.length)];
-                        p = new Pair<>(nc, port);
-                        break;
-
-                    case NC:
-                        p = new Pair<>(host, port);
-                        if (!ncs.contains(host)) {
-                            throw new CompilationException(
-                                    
ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "NC", host,
-                                    StringUtils.join(ncs, ", "));
-
-                        }
-                        break;
-                }
-                sockets.add(p);
-            }
+            sockets = 
FeedUtils.extractHostsPorts(configuration.get(ExternalDataConstants.KEY_MODE),
+                    serviceCtx, 
configuration.get(ExternalDataConstants.KEY_SOCKETS));
         } catch (CompilationException e) {
             throw e;
-        } catch (HyracksDataException | UnknownHostException e) {
-            throw new AsterixException(e);
         } catch (Exception e) {
             throw new 
CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
         }
@@ -121,7 +62,7 @@
             Pair<String, Integer> socket = sockets.get(partition);
             ServerSocket server;
             server = new ServerSocket();
-            server.bind(new InetSocketAddress(socket.second));
+            server.bind(new InetSocketAddress(socket.getRight()));
             return new SocketServerInputStream(server);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
@@ -130,11 +71,7 @@
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        List<String> locations = new ArrayList<>();
-        for (Pair<String, Integer> socket : sockets) {
-            locations.add(socket.first);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new 
String[] {}));
+        return FeedUtils.addressToAbsolutePartitionConstraints(sockets);
     }
 
     public List<Pair<String, Integer>> getSockets() {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 859c9fd..b35cbb6 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -72,11 +72,11 @@
                     streamSource);
         } else {
             switch (streamSource) {
-                case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
+                case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
                     streamSourceFactory = new LocalFSInputStreamFactory();
                     break;
-                case ExternalDataConstants.SOCKET:
-                case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+                case ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET:
+                case ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET:
                     streamSourceFactory = new SocketServerInputStreamFactory();
                     break;
                 case ExternalDataConstants.STREAM_SOCKET_CLIENT:
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 494efb5..7814b16 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -101,6 +101,21 @@
     public static final String KEY_HTTP_PROXY_PORT = "http-proxy-port";
     public static final String KEY_HTTP_PROXY_USER = "http-proxy-user";
     public static final String KEY_HTTP_PROXY_PASSWORD = "http-proxy-password";
+
+    /**
+     *  Keys for adapter name
+     **/
+    public static final String KEY_ADAPTER_NAME_TWITTER_PUSH = "twitter_push";
+    public static final String KEY_ADAPTER_NAME_PUSH_TWITTER = "push_twitter";
+    public static final String KEY_ADAPTER_NAME_TWITTER_PULL = "twitter_pull";
+    public static final String KEY_ADAPTER_NAME_PULL_TWITTER = "pull_twitter";
+    public static final String KEY_ADAPTER_NAME_TWITTER_USER_STREAM = 
"twitter_user_stream";
+    public static final String KEY_ADAPTER_NAME_LOCALFS = "localfs";
+    public static final String KEY_ADAPTER_NAME_SOCKET = "socket";
+    public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = 
"socket_adapter";
+    public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
+
+
     /**
      * HDFS class names
      */
@@ -122,11 +137,7 @@
      * Builtin record readers
      */
     public static final String READER_HDFS = "hdfs";
-    public static final String READER_TWITTER_PUSH = "twitter_push";
-    public static final String READER_PUSH_TWITTER = "push_twitter";
-    public static final String READER_TWITTER_PULL = "twitter_pull";
-    public static final String READER_PULL_TWITTER = "pull_twitter";
-    public static final String READER_USER_STREAM_TWITTER = 
"twitter_user_stream";
+
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
@@ -157,8 +168,6 @@
      * input streams
      */
     public static final String STREAM_HDFS = "hdfs";
-    public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
-    public static final String SOCKET = "socket";
     public static final String STREAM_SOCKET_CLIENT = "socket-client";
 
     /**
@@ -168,13 +177,7 @@
     public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
     public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
     public static final String ALIAS_HDFS_ADAPTER = "hdfs";
-    public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
     public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
-    public static final String ALIAS_RSS_ADAPTER = "rss";
-    public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
-    public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
-    public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
-    public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
     public static final String ALIAS_FEED_WITH_META_ADAPTER = "feed_with_meta";
     public static final String ALIAS_CHANGE_FEED_WITH_META_ADAPTER = 
"change_feed_with_meta";
     // for testing purposes
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 8481064..32fe0c9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -19,19 +19,30 @@
 package org.apache.asterix.external.util;
 
 import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -135,4 +146,57 @@
         return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
 
     }
+
+    /**
+     * Parses a comma-separated list of {@code host:port} entries into (node controller name, port) pairs.
+     * <p>
+     * In {@code IP} mode the host part of every entry is resolved with {@link InetAddress#getByName(String)} and
+     * replaced by a randomly chosen node controller registered on that address. In {@code NC} mode the host part
+     * must already be the name of a known node controller. The mode is matched case-insensitively.
+     *
+     * @param modeValue
+     *            the addressing mode, either {@code IP} or {@code NC}
+     * @param serviceCtx
+     *            the service context; its application context is cast to {@link ICcApplicationContext}
+     * @param hostsValue
+     *            comma-separated {@code host:port} entries
+     * @return one pair per entry, in the order the entries were given
+     * @throws AlgebricksException
+     *             if the mode or the host list is missing or malformed, if a host does not map to a node controller,
+     *             or if the node controller lookup or host resolution fails
+     */
+    public static List<Pair<String, Integer>> extractHostsPorts(String modeValue, IServiceContext serviceCtx,
+            String hostsValue) throws AlgebricksException {
+        if (modeValue == null || hostsValue == null) {
+            throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
+        }
+        // equalsIgnoreCase is locale-independent, unlike toUpperCase() with the default locale
+        String mode = modeValue.trim();
+        boolean ipMode = "IP".equalsIgnoreCase(mode);
+        if (!ipMode && !"NC".equalsIgnoreCase(mode)) {
+            // an unknown mode used to put null entries into the result, which failed later with an NPE
+            throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
+        }
+        try {
+            ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+            Map<InetAddress, Set<String>> ncMap = RuntimeUtils.getNodeControllerMap(appCtx);
+            List<String> ncs = RuntimeUtils.getAllNodeControllers(appCtx);
+            List<Pair<String, Integer>> sockets = new ArrayList<>();
+            Random random = new Random();
+            for (String socket : hostsValue.split(",")) {
+                String[] socketTokens = socket.split(":");
+                if (socketTokens.length < 2) {
+                    // entry without a port, e.g. "host" or an empty entry
+                    throw new CompilationException(
+                            ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
+                }
+                String host = socketTokens[0].trim();
+                int port = Integer.parseInt(socketTokens[1].trim());
+                if (ipMode) {
+                    Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+                    if (ncsOnIp == null || ncsOnIp.isEmpty()) {
+                        throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+                                "host", host, StringUtils.join(ncMap.keySet(), ", "));
+                    }
+                    // several NCs may share one address; pick one of them at random
+                    String[] ncArray = ncsOnIp.toArray(new String[0]);
+                    sockets.add(Pair.of(ncArray[random.nextInt(ncArray.length)], port));
+                } else {
+                    if (!ncs.contains(host)) {
+                        throw new CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+                                "NC", host, StringUtils.join(ncs, ", "));
+                    }
+                    sockets.add(Pair.of(host, port));
+                }
+            }
+            return sockets;
+        } catch (HyracksDataException | UnknownHostException | NumberFormatException e) {
+            // NumberFormatException: a non-numeric port is a configuration error, not a programming error
+            throw new AlgebricksException(e);
+        }
+    }
+
+    /**
+     * Builds an absolute partition constraint from the left element of every pair, keeping the order of
+     * {@code sockets}.
+     *
+     * @param sockets
+     *            (location, port) pairs; only the location part is used
+     * @return a constraint over the collected locations
+     */
+    public static AlgebricksAbsolutePartitionConstraint addressToAbsolutePartitionConstraints(
+            List<Pair<String, Integer>> sockets) {
+        String[] nodeNames = sockets.stream().map(Pair::getLeft).toArray(String[]::new);
+        return new AlgebricksAbsolutePartitionConstraint(nodeNames);
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 52e28be..0d96658 100644
--- 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -19,3 +19,4 @@
 org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory
 org.apache.asterix.external.input.HDFSDataSourceFactory
 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
+org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
new file mode 100644
index 0000000..9fbc11c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hyracks.http.api;
+
+import io.netty.handler.codec.http.HttpRequest;
+
+/**
+ * Decides whether an incoming HTTP request is accepted by the server.
+ */
+public interface IAuthenticator {
+
+    /** Name of the HTTP request header that carries the credentials. */
+    String KEY_REQUEST_AUTHORIZATION = "Authorization";
+
+    /**
+     * Checks the given request.
+     *
+     * @param request
+     *            the incoming HTTP request
+     * @return {@code true} if the request is accepted, {@code false} otherwise
+     */
+    boolean validate(HttpRequest request);
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
new file mode 100644
index 0000000..09d9c9c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hyracks.http.server;
+
+import org.apache.hyracks.http.api.IAuthenticator;
+
+import io.netty.channel.EventLoopGroup;
+
+/**
+ * An {@link HttpServer} whose request handlers are {@link AuthenticatedHttpServerHandler}s, i.e. every request is
+ * checked by the configured {@link IAuthenticator} before it is submitted to a servlet.
+ */
+public class AuthenticatedHttpServer extends HttpServer {
+
+    // set once in the constructor and handed to every handler this server creates
+    private final IAuthenticator authenticator;
+
+    /**
+     * @param bossGroup
+     *            passed through to {@link HttpServer}
+     * @param workerGroup
+     *            passed through to {@link HttpServer}
+     * @param port
+     *            passed through to {@link HttpServer}
+     * @param authenticator
+     *            the authenticator used by the handlers of this server
+     */
+    public AuthenticatedHttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port,
+            IAuthenticator authenticator) {
+        super(bossGroup, workerGroup, port);
+        this.authenticator = authenticator;
+    }
+
+    @Override
+    protected HttpServerHandler<? extends HttpServer> createHttpHandler(int chunkSize) {
+        return new AuthenticatedHttpServerHandler(this, chunkSize, authenticator);
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
new file mode 100644
index 0000000..f2e3bab
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hyracks.http.server;
+
+import java.io.IOException;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.hyracks.http.api.IAuthenticator;
+import org.apache.hyracks.http.api.IServlet;
+
+/**
+ * A request handler that lets the configured {@link IAuthenticator} check every request before it is submitted to a
+ * servlet. Requests that fail the check are answered with {@link HttpResponseStatus#UNAUTHORIZED}.
+ */
+public class AuthenticatedHttpServerHandler extends HttpServerHandler<HttpServer> {
+
+    private final IAuthenticator authenticator;
+
+    /**
+     * @param server
+     *            passed through to {@link HttpServerHandler}
+     * @param chunkSize
+     *            passed through to {@link HttpServerHandler}
+     * @param authenticator
+     *            the authenticator consulted for every request
+     */
+    public AuthenticatedHttpServerHandler(HttpServer server, int chunkSize, IAuthenticator authenticator) {
+        super(server, chunkSize);
+        this.authenticator = authenticator;
+    }
+
+    @Override
+    protected void submit(ChannelHandlerContext ctx, IServlet servlet, FullHttpRequest request) throws IOException {
+        if (authenticator.validate(request)) {
+            super.submit(ctx, servlet, request);
+        } else {
+            // rejected requests never reach the servlet
+            respond(ctx, request.protocolVersion(), HttpResponseStatus.UNAUTHORIZED);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 9290cdf..0511103 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -82,7 +82,7 @@
         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
-    private void submit(ChannelHandlerContext ctx, IServlet servlet, 
FullHttpRequest request) throws IOException {
+    protected void submit(ChannelHandlerContext ctx, IServlet servlet, 
FullHttpRequest request) throws IOException {
         IServletRequest servletRequest;
         try {
             servletRequest = HttpUtil.toServletRequest(request);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
new file mode 100644
index 0000000..11a8aa7
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hyracks.http.server.authenticator;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.Base64;
+
+import org.apache.hyracks.http.api.IAuthenticator;
+
+import io.netty.handler.codec.http.HttpRequest;
+
+/**
+ * An {@link IAuthenticator} for a single, fixed username/password pair sent with the HTTP "Basic" scheme. A request is
+ * accepted only if its {@code Authorization} header equals {@code "Basic " + base64(username + ":" + password)}.
+ */
+public class BasicAuthenticator implements IAuthenticator {
+
+    // NOTE: despite its name this is the scheme prefix of the header value; the name is kept for compatibility
+    public static final String SUFFIX_BASIC_AUTHENTICATION = "Basic ";
+
+    // expected Authorization header value, kept as bytes for the constant-time comparison in validate()
+    private final byte[] secret;
+
+    /**
+     * @param username
+     *            the only accepted user name
+     * @param password
+     *            the password of that user
+     */
+    public BasicAuthenticator(String username, String password) {
+        // encode with an explicit charset; getBytes() without one depends on the platform default
+        String credentials = username + ":" + password;
+        String expectedHeader = SUFFIX_BASIC_AUTHENTICATION
+                + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+        this.secret = expectedHeader.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public boolean validate(HttpRequest request) {
+        String authorizationInfo = request.headers().get(KEY_REQUEST_AUTHORIZATION);
+        if (authorizationInfo == null) {
+            return false;
+        }
+        // MessageDigest.isEqual does not stop at the first mismatching byte, unlike String.equals
+        return MessageDigest.isEqual(secret, authorizationInfo.getBytes(StandardCharsets.UTF_8));
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2321
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9c03bddd0df59b69a1f2d6e989b8ee93cf50a6c0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to