Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2321
Change subject: [ASTERIXDB-2264][ING] Introduce Http Feed
......................................................................
[ASTERIXDB-2264][ING] Introduce Http Feed
1. HttpSever Interface refactoring
2. Add authentication server
3. Test case refactoring
4. Http Feed
Change-Id: I9c03bddd0df59b69a1f2d6e989b8ee93cf50a6c0
---
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
A
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M
asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
A
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
A
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
A
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
A
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
27 files changed, 785 insertions(+), 122 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/21/2321/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index c3c450c..2422cd8 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -38,6 +38,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -57,6 +58,7 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;
+import io.netty.handler.codec.base64.Base64Encoder;
import org.apache.asterix.api.http.server.QueryServiceServlet;
import org.apache.asterix.app.external.IExternalUDFLibrarian;
import org.apache.asterix.common.api.Duration;
@@ -86,9 +88,11 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StringUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -124,6 +128,8 @@
private static final Pattern HTTP_BODY_PATTERN =
Pattern.compile("body=(.*)", Pattern.MULTILINE);
private static final Pattern HTTP_STATUSCODE_PATTERN =
Pattern.compile("statuscode (.*)", Pattern.MULTILINE);
private static final Pattern MAX_RESULT_READS_PATTERN =
Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
+ private static final Pattern HTTP_AUTHORIZATION_PATTERN =
Pattern.compile("auth=(.*)");
+
public static final int TRUNCATE_THRESHOLD = 16384;
public static final String DELIVERY_ASYNC = "async";
@@ -618,7 +624,8 @@
return builder.build();
}
- private HttpUriRequest buildRequest(String method, URI uri,
List<Parameter> params, Optional<String> body) {
+ private HttpUriRequest buildRequest(String method, URI uri,
List<Parameter> params, Optional<String> body,
+ String auth) {
RequestBuilder builder = RequestBuilder.create(method);
builder.setUri(uri);
for (Parameter param : params) {
@@ -628,12 +635,16 @@
if (body.isPresent()) {
builder.setEntity(new StringEntity(body.get(),
StandardCharsets.UTF_8));
}
+ if (auth != null) {
+ builder.addHeader("Authorization",
+ "Basic " +
Base64.getEncoder().encodeToString(auth.getBytes()));
+ }
return builder.build();
}
private HttpUriRequest buildRequest(String method, URI uri, OutputFormat
fmt, List<Parameter> params,
- Optional<String> body) {
- HttpUriRequest request = buildRequest(method, uri, params, body);
+ Optional<String> body, String auth) {
+ HttpUriRequest request = buildRequest(method, uri, params, body, auth);
// Set accepted output response type
request.setHeader("Accept", fmt.mimeType());
return request;
@@ -704,21 +715,22 @@
public InputStream executeJSONGet(OutputFormat fmt, URI uri,
List<Parameter> params,
Predicate<Integer> responseCodeValidator) throws Exception {
- return executeJSON(fmt, "GET", uri, params, responseCodeValidator,
Optional.empty());
+ return executeJSON(fmt, "GET", uri, params, responseCodeValidator,
Optional.empty(), null);
}
public InputStream executeJSON(OutputFormat fmt, String method, URI uri,
List<Parameter> params) throws Exception {
- return executeJSON(fmt, method, uri, params, code -> code ==
HttpStatus.SC_OK, Optional.empty());
+ return executeJSON(fmt, method, uri, params, code -> code ==
HttpStatus.SC_OK, Optional.empty(), null);
}
public InputStream executeJSON(OutputFormat fmt, String method, URI uri,
Predicate<Integer> responseCodeValidator)
throws Exception {
- return executeJSON(fmt, method, uri, Collections.emptyList(),
responseCodeValidator, Optional.empty());
+ return executeJSON(fmt, method, uri, Collections.emptyList(),
responseCodeValidator, Optional.empty(), null);
}
public InputStream executeJSON(OutputFormat fmt, String method, URI uri,
List<Parameter> params,
- Predicate<Integer> responseCodeValidator, Optional<String> body)
throws Exception {
- HttpUriRequest request = buildRequest(method, uri, fmt, params, body);
+ Predicate<Integer> responseCodeValidator, Optional<String> body,
String auth)
+ throws Exception {
+ HttpUriRequest request = buildRequest(method, uri, fmt, params, body,
auth);
HttpResponse response = executeAndCheckHttpRequest(request,
responseCodeValidator);
return response.getEntity().getContent();
}
@@ -1163,15 +1175,18 @@
int numResultFiles, String extension, ComparisonEnum compare)
throws Exception {
String handleVar = getHandleVariable(statement);
final String trimmedPathAndQuery = stripAllComments(statement).trim();
+ final String auth = getAuthorizationInfo(statement);
final String variablesReplaced = replaceVarRef(trimmedPathAndQuery,
variableCtx);
final List<Parameter> params = extractParameters(statement);
final Optional<String> body = extractBody(statement);
final Predicate<Integer> statusCodePredicate =
extractStatusCodePredicate(statement);
InputStream resultStream;
if ("http".equals(extension)) {
- resultStream = executeHttp(reqType, variablesReplaced, fmt,
params, statusCodePredicate, body);
+ resultStream = executeHttp(reqType, variablesReplaced, fmt,
params, statusCodePredicate, body,
+ auth);
} else if ("uri".equals(extension)) {
- resultStream = executeURI(reqType, URI.create(variablesReplaced),
fmt, params, statusCodePredicate, body);
+ resultStream = executeURI(reqType, URI.create(variablesReplaced),
fmt, params, statusCodePredicate, body,
+ auth);
} else {
throw new IllegalArgumentException("Unexpected format for method "
+ reqType + ": " + extension);
}
@@ -1394,6 +1409,11 @@
return handleVariableMatcher.find() ? handleVariableMatcher.group(1) :
null;
}
+ protected static String getAuthorizationInfo(String statement) {
+ final Matcher authorizationInfoMatcher =
HTTP_AUTHORIZATION_PATTERN.matcher(statement);
+ return authorizationInfoMatcher.find() ?
authorizationInfoMatcher.group(1) : null;
+ }
+
protected static String replaceVarRef(String statement, Map<String,
Object> variableCtx) {
String tmpStmt = statement;
Matcher variableReferenceMatcher =
VARIABLE_REF_PATTERN.matcher(tmpStmt);
@@ -1449,10 +1469,10 @@
}
protected InputStream executeHttp(String ctxType, String endpoint,
OutputFormat fmt, List<Parameter> params,
- Predicate<Integer> statusCodePredicate, Optional<String> body)
throws Exception {
+ Predicate<Integer> statusCodePredicate, Optional<String> body,
String auth) throws Exception {
String[] split = endpoint.split("\\?");
URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] :
null);
- return executeURI(ctxType, uri, fmt, params, statusCodePredicate,
body);
+ return executeURI(ctxType, uri, fmt, params, statusCodePredicate,
body, auth);
}
private InputStream executeURI(String ctxType, URI uri, OutputFormat fmt,
List<Parameter> params) throws Exception {
@@ -1460,8 +1480,9 @@
}
private InputStream executeURI(String ctxType, URI uri, OutputFormat fmt,
List<Parameter> params,
- Predicate<Integer> responseCodeValidator, Optional<String> body)
throws Exception {
- return executeJSON(fmt, ctxType.toUpperCase(), uri, params,
responseCodeValidator, body);
+ Predicate<Integer> responseCodeValidator, Optional<String> body,
String auth)
+ throws Exception {
+ return executeJSON(fmt, ctxType.toUpperCase(), uri, params,
responseCodeValidator, body, auth);
}
public void killNC(String nodeId, CompilationUnit cUnit) throws Exception {
@@ -1619,6 +1640,7 @@
protected URI createEndpointURI(String path, String query) throws
URISyntaxException {
InetSocketAddress endpoint;
+ int port = -1;
if (!path.startsWith("nc:")) {
int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
endpoint = endpoints.get(endpointIdx);
@@ -1628,10 +1650,15 @@
throw new IllegalArgumentException("Unrecognized http
pattern");
}
String nodeId = tokens[0].substring(3);
+ if (nodeId.contains(":")) {
+ String nodeParts[] = StringUtils.split(nodeId, ':');
+ nodeId = nodeParts[0];
+ port = Integer.valueOf(nodeParts[1]);
+ }
endpoint = getNcEndPoint(nodeId);
path = tokens[1];
}
- URI uri = new URI("http", null, endpoint.getHostString(),
endpoint.getPort(), path, query, null);
+ URI uri = new URI("http", null, endpoint.getHostString(), port == -1 ?
endpoint.getPort() : port, path, query, null);
LOGGER.debug("Created endpoint URI: " + uri);
return uri;
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index f9f57a3..b4fb7d5 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -18,10 +18,17 @@
*/
package org.apache.asterix.test.runtime;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -38,7 +45,16 @@
@BeforeClass
public static void setUp() throws Exception {
- LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ TestExecutor testExecutor = new TestExecutor();
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ final NodeControllerService[] ncs =
ExecutionTestUtil.integrationUtil.ncs;
+ Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, 0));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
}
@AfterClass
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
new file mode 100644
index 0000000..3edf6e3
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.1.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TweetMessageType as open {
+ id : string
+};
+
+create dataset Tweets(TweetMessageType) primary key id;
+
+create feed TweetFeed with {
+ "adapter-name" : "http_adapter",
+ "addresses" : "asterix_nc2:10012",
+ "address-type" : "NC",
+ "type-name" : "TweetMessageType",
+ "format" : "adm",
+ "username" : "till",
+ "password" : "westmann"
+};
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
new file mode 100644
index 0000000..fd41b72
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.2.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+set `wait-for-completion-feed` "false";
+
+connect feed TweetFeed to dataset Tweets;
+
+start feed TweetFeed;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
new file mode 100644
index 0000000..d363e55
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.3.post.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2:10012 /
+--auth=till:westmann
+--body={ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }{ "id":
"nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for
President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
new file mode 100644
index 0000000..5fcb222
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.4.post.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2:10012 /
+--auth=till:eastmann
+--body={ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT
@ken24xavier: Obama tells SOROS - our plan is ALMOST finished
http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
new file mode 100644
index 0000000..7a07621
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.5.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+stop feed TweetFeed;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
new file mode 100644
index 0000000..bcddf7a
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+
+select value count(t) from Tweets as t;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
new file mode 100644
index 0000000..e4d2615
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/http_feed/http_feed.8.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.1.adm
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/http_feed/http_feed.2.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 16fa334..4dfc241 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8403,6 +8403,12 @@
<output-dir compare="Text">connect-feed-with-function</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="http_feed">
+ <output-dir compare="Text">http_feed</output-dir>
+ <expected-error>HTTP/1.1 401 Unauthorized</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="hdfs">
<test-case FilePath="hdfs">
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 4c3b7e6..efa12ef 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -38,7 +38,7 @@
* The data source type indicates whether the data source produces a
continuous stream or
* a set of records
*/
- public enum DataSourceType {
+ enum DataSourceType {
STREAM,
RECORDS
}
@@ -46,7 +46,7 @@
/**
* @return The data source type {STREAM or RECORDS}
*/
- public DataSourceType getDataSourceType();
+ DataSourceType getDataSourceType();
/**
* Specifies on which locations this data source is expected to run.
@@ -54,7 +54,7 @@
* @return
* @throws AsterixException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
throws AlgebricksException;
+ AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws
AlgebricksException;
/**
* Configure the data parser factory. The passed map contains key value
pairs from the
@@ -63,7 +63,7 @@
* @param configuration
* @throws AsterixException
*/
- public void configure(IServiceContext ctx, Map<String, String>
configuration)
+ void configure(IServiceContext ctx, Map<String, String> configuration)
throws AlgebricksException, HyracksDataException;
/**
@@ -71,7 +71,7 @@
*
* @return
*/
- public default boolean isIndexible() {
+ default boolean isIndexible() {
return false;
}
@@ -84,7 +84,7 @@
* @return
* @throws AlgebricksException
*/
- public static AlgebricksAbsolutePartitionConstraint
getPartitionConstraints(ICcApplicationContext appCtx,
+ static AlgebricksAbsolutePartitionConstraint
getPartitionConstraints(ICcApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint constraints, int count)
throws AlgebricksException {
if (constraints == null) {
IClusterStateManager clusterStateManager =
appCtx.getClusterStateManager();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
new file mode 100644
index 0000000..a8fe2af
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
@@ -0,0 +1,174 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.AuthenticatedHttpServer;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.server.authenticator.BasicAuthenticator;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class HttpServerRecordReader implements IRecordReader<char[]> {
+
+ private static String DEFAULT_ENTRY_POINT = "/";
+ private LinkedBlockingQueue<String> inputQ;
+ private GenericRecord<char[]> record;
+ private boolean closed = false;
+ private WebManager webManager;
+ private HttpServer webServer;
+
+ public HttpServerRecordReader(int port, String username, String password,
String entryPoint) throws Exception {
+ this.inputQ = new LinkedBlockingQueue<>();
+ this.record = new GenericRecord<>();
+ webManager = new WebManager();
+ if (username == null || password == null) {
+ webServer = new HttpServer(webManager.getBosses(),
webManager.getWorkers(), port);
+ } else {
+ webServer = new AuthenticatedHttpServer(webManager.getBosses(),
webManager.getWorkers(), port,
+ new BasicAuthenticator(username, password));
+ }
+ webServer.addServlet(new HttpFeedServlet(webServer.ctx(),
+ new String[] { entryPoint == null ? DEFAULT_ENTRY_POINT :
entryPoint }, inputQ));
+ webManager.add(webServer);
+ webManager.start();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !closed;
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
+ String srecord = inputQ.poll();
+ if (srecord == null) {
+ return null;
+ }
+ record.set(srecord.toCharArray());
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ try {
+ close();
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+ // do nothing
+ }
+
+ @Override
+ public void setFeedLogManager(FeedLogManager feedLogManager) throws
HyracksDataException {
+ // do nothing
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (!closed) {
+ System.out.println("RR close requested ");
+ webManager.stop();
+ closed = true;
+ System.out.println("RR closed ");
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private class HttpFeedServlet extends AbstractServlet {
+
+ private LinkedBlockingQueue<String> inputQ;
+
+ private void splitIntoRecords(String admData) throws
InterruptedException {
+ int p = 0, cnt = 0;
+ boolean record = false;
+ char[] charBuff = admData.toCharArray();
+ for (int iter1 = 0; iter1 < charBuff.length; iter1++) {
+ if (charBuff[iter1] == '{') {
+ if (record == false) {
+ p = iter1;
+ record = true;
+ }
+ cnt++;
+ } else if (charBuff[iter1] == '}') {
+ cnt--;
+ }
+ if (cnt == 0) {
+ if (record) {
+ inputQ.put(admData.substring(p, iter1 + 1) + '\n');
+ record = false;
+ }
+ p = iter1;
+ }
+ }
+ }
+
+ public HttpFeedServlet(ConcurrentMap<String, Object> ctx, String[]
paths, LinkedBlockingQueue<String> inputQ) {
+ super(ctx, paths);
+ this.inputQ = inputQ;
+ }
+
+ private void doPost(IServletRequest request) throws
InterruptedException {
+
splitIntoRecords(request.getHttpRequest().content().toString(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void handle(IServletRequest request, IServletResponse response)
{
+ if (request.getHttpRequest().method() == HttpMethod.POST) {
+ try {
+ doPost(request);
+ response.setStatus(HttpResponseStatus.OK);
+ } catch (InterruptedException e) {
+
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
+ } else {
+ response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+ }
+ }
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
new file mode 100644
index 0000000..7532e2b
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.asterix.external.input.record.reader.http;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class HttpServerRecordReaderFactory implements
IRecordReaderFactory<char[]> {
+
+ public static final String KEY_CONFIGURATION_USER_NAME = "username";
+ public static final String KEY_CONFIGURATION_PASSWORD = "password";
+ public static final String KEY_CONFIGURATION_ADDRESSES = "addresses";
+ public static final String KEY_CONFIGURATION_PATH = "path";
+
+ private static final List<String> recordReaderNames = Collections
+
.unmodifiableList(Arrays.asList(ExternalDataConstants.KEY_ADAPTER_NAME_HTTP));
+
+ private String username;
+ private String password;
+ private String entryPoint;
+ private String addrValue;
+ private Map<String, String> configurations;
+ private List<Pair<String, Integer>> serverAddrs;
+
+ @Override
+ public IRecordReader<? extends char[]>
createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ try {
+ return new
HttpServerRecordReader(serverAddrs.get(partition).getRight(), username,
password, entryPoint);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return FeedUtils.addressToAbsolutePartitionConstraints(serverAddrs);
+ }
+
+ private String getConfigurationValue(String key, boolean required) throws
CompilationException {
+ String value = configurations.get(key);
+ if (value == null && required) {
+ throw new CompilationException("Required configuration missing: "
+ key);
+ }
+ return value;
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String>
configuration)
+ throws AlgebricksException {
+ this.configurations = configuration;
+ // necessary configs
+ addrValue = getConfigurationValue(KEY_CONFIGURATION_ADDRESSES, true);
+ serverAddrs =
FeedUtils.extractHostsPorts(getConfigurationValue(ExternalDataConstants.KEY_MODE,
true), ctx,
+ addrValue);
+ // optional configs
+ username = getConfigurationValue(KEY_CONFIGURATION_USER_NAME, false);
+ password = getConfigurationValue(KEY_CONFIGURATION_PASSWORD, false);
+ entryPoint = getConfigurationValue(KEY_CONFIGURATION_PATH, false);
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 0591775..8e2adda 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -24,10 +24,8 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -49,8 +47,8 @@
protected Map<String, String> configuration;
protected Class recordReaderClazz;
private static final List<String> recordReaderNames =
Collections.unmodifiableList(
- Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER,
ExternalDataConstants.ALIAS_SOCKET_ADAPTER,
- ExternalDataConstants.SOCKET,
ExternalDataConstants.STREAM_SOCKET_CLIENT));
+ Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER,
ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET,
+ ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET,
ExternalDataConstants.STREAM_SOCKET_CLIENT));
@Override
public DataSourceType getDataSourceType() {
@@ -71,8 +69,8 @@
String reader = config.get(ExternalDataConstants.KEY_READER);
if (reader.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)) {
streamFactory = new LocalFSInputStreamFactory();
- } else if (reader.equals(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)
- || reader.equals(ExternalDataConstants.SOCKET)) {
+ } else if
(reader.equals(ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET)
+ ||
reader.equals(ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET)) {
streamFactory = new SocketServerInputStreamFactory();
} else if (reader.equals(ExternalDataConstants.STREAM_SOCKET_CLIENT)) {
streamFactory = new SocketClientInputStreamFactory();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 8182dcd..7e5e3c8 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -56,9 +56,9 @@
private transient IServiceContext serviceCtx;
private static final List<String> recordReaderNames =
-
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.READER_TWITTER_PULL,
- ExternalDataConstants.READER_TWITTER_PUSH,
ExternalDataConstants.READER_PUSH_TWITTER,
- ExternalDataConstants.READER_PULL_TWITTER,
ExternalDataConstants.READER_USER_STREAM_TWITTER));
+
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PULL,
+ ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_PUSH,
ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER,
+ ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER,
ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM));
@Override
public DataSourceType getDataSourceType() {
@@ -98,7 +98,7 @@
throw new AsterixException(builder.toString());
}
- if
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_PULL_TWITTER))
{
+ if
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER))
{
if (configuration.get(SearchAPIConstants.QUERY) == null) {
throw new AsterixException(
"parameter " + SearchAPIConstants.QUERY + " not
specified as part of adaptor configuration");
@@ -131,12 +131,12 @@
throws HyracksDataException {
IRecordReader<? extends String> recordReader;
switch (configuration.get(ExternalDataConstants.KEY_READER)) {
- case ExternalDataConstants.READER_PULL_TWITTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER:
recordReader = new
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
configuration.get(SearchAPIConstants.QUERY),
Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
break;
- case ExternalDataConstants.READER_PUSH_TWITTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_PUSH_TWITTER:
FilterQuery query;
try {
query = TwitterUtil.getFilterQuery(configuration);
@@ -149,7 +149,7 @@
throw new HyracksDataException(e);
}
break;
- case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_TWITTER_USER_STREAM:
recordReader = new
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
TwitterUtil.getUserTweetsListener());
break;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 1f1fa5c..86a4db6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -19,28 +19,20 @@
package org.apache.asterix.external.input.stream.factory;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.SocketServerInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -49,66 +41,15 @@
private static final long serialVersionUID = 1L;
private List<Pair<String, Integer>> sockets;
- private Mode mode = Mode.IP;
-
- public static enum Mode {
- NC,
- IP
- }
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration)
- throws AsterixException, CompilationException {
+ throws CompilationException {
try {
- sockets = new ArrayList<>();
- String modeValue =
configuration.get(ExternalDataConstants.KEY_MODE);
- if (modeValue != null) {
- mode = Mode.valueOf(modeValue.trim().toUpperCase());
- }
- String socketsValue =
configuration.get(ExternalDataConstants.KEY_SOCKETS);
- if (socketsValue == null) {
- throw new
CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
- }
- Map<InetAddress, Set<String>> ncMap;
- ncMap = RuntimeUtils.getNodeControllerMap((ICcApplicationContext)
serviceCtx.getApplicationContext());
- List<String> ncs =
- RuntimeUtils.getAllNodeControllers((ICcApplicationContext)
serviceCtx.getApplicationContext());
- String[] socketsArray = socketsValue.split(",");
- Random random = new Random();
- for (String socket : socketsArray) {
- String[] socketTokens = socket.split(":");
- String host = socketTokens[0].trim();
- int port = Integer.parseInt(socketTokens[1].trim());
- Pair<String, Integer> p = null;
- switch (mode) {
- case IP:
- Set<String> ncsOnIp =
ncMap.get(InetAddress.getByName(host));
- if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
- throw new CompilationException(
-
ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "host", host,
- StringUtils.join(ncMap.keySet(), ", "));
- }
- String[] ncArray = ncsOnIp.toArray(new String[] {});
- String nc = ncArray[random.nextInt(ncArray.length)];
- p = new Pair<>(nc, port);
- break;
-
- case NC:
- p = new Pair<>(host, port);
- if (!ncs.contains(host)) {
- throw new CompilationException(
-
ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC, "NC", host,
- StringUtils.join(ncs, ", "));
-
- }
- break;
- }
- sockets.add(p);
- }
+ sockets =
FeedUtils.extractHostsPorts(configuration.get(ExternalDataConstants.KEY_MODE),
+ serviceCtx,
configuration.get(ExternalDataConstants.KEY_SOCKETS));
} catch (CompilationException e) {
throw e;
- } catch (HyracksDataException | UnknownHostException e) {
- throw new AsterixException(e);
} catch (Exception e) {
throw new
CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
}
@@ -121,7 +62,7 @@
Pair<String, Integer> socket = sockets.get(partition);
ServerSocket server;
server = new ServerSocket();
- server.bind(new InetSocketAddress(socket.second));
+ server.bind(new InetSocketAddress(socket.getRight()));
return new SocketServerInputStream(server);
} catch (IOException e) {
throw HyracksDataException.create(e);
@@ -130,11 +71,7 @@
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- List<String> locations = new ArrayList<>();
- for (Pair<String, Integer> socket : sockets) {
- locations.add(socket.first);
- }
- return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new
String[] {}));
+ return FeedUtils.addressToAbsolutePartitionConstraints(sockets);
}
public List<Pair<String, Integer>> getSockets() {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 859c9fd..b35cbb6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -72,11 +72,11 @@
streamSource);
} else {
switch (streamSource) {
- case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
streamSourceFactory = new LocalFSInputStreamFactory();
break;
- case ExternalDataConstants.SOCKET:
- case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+ case ExternalDataConstants.KEY_ADAPTER_NAME_SOCKET:
+ case ExternalDataConstants.KEY_ALIAS_ADAPTER_NAME_SOCKET:
streamSourceFactory = new SocketServerInputStreamFactory();
break;
case ExternalDataConstants.STREAM_SOCKET_CLIENT:
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 494efb5..7814b16 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -101,6 +101,21 @@
public static final String KEY_HTTP_PROXY_PORT = "http-proxy-port";
public static final String KEY_HTTP_PROXY_USER = "http-proxy-user";
public static final String KEY_HTTP_PROXY_PASSWORD = "http-proxy-password";
+
+ /**
+ * Keys for adapter name
+ **/
+ public static final String KEY_ADAPTER_NAME_TWITTER_PUSH = "twitter_push";
+ public static final String KEY_ADAPTER_NAME_PUSH_TWITTER = "push_twitter";
+ public static final String KEY_ADAPTER_NAME_TWITTER_PULL = "twitter_pull";
+ public static final String KEY_ADAPTER_NAME_PULL_TWITTER = "pull_twitter";
+ public static final String KEY_ADAPTER_NAME_TWITTER_USER_STREAM =
"twitter_user_stream";
+ public static final String KEY_ADAPTER_NAME_LOCALFS = "localfs";
+ public static final String KEY_ADAPTER_NAME_SOCKET = "socket";
+ public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET =
"socket_adapter";
+ public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
+
+
/**
* HDFS class names
*/
@@ -122,11 +137,7 @@
* Builtin record readers
*/
public static final String READER_HDFS = "hdfs";
- public static final String READER_TWITTER_PUSH = "twitter_push";
- public static final String READER_PUSH_TWITTER = "push_twitter";
- public static final String READER_TWITTER_PULL = "twitter_pull";
- public static final String READER_PULL_TWITTER = "pull_twitter";
- public static final String READER_USER_STREAM_TWITTER =
"twitter_user_stream";
+
public static final String CLUSTER_LOCATIONS = "cluster-locations";
public static final String SCHEDULER = "hdfs-scheduler";
@@ -157,8 +168,6 @@
* input streams
*/
public static final String STREAM_HDFS = "hdfs";
- public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
- public static final String SOCKET = "socket";
public static final String STREAM_SOCKET_CLIENT = "socket-client";
/**
@@ -168,13 +177,7 @@
public static final String ALIAS_LOCALFS_ADAPTER = "localfs";
public static final String ALIAS_LOCALFS_PUSH_ADAPTER = "push_localfs";
public static final String ALIAS_HDFS_ADAPTER = "hdfs";
- public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter";
public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client";
- public static final String ALIAS_RSS_ADAPTER = "rss";
- public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed";
- public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter";
- public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter";
- public static final String ALIAS_CNN_ADAPTER = "cnn_feed";
public static final String ALIAS_FEED_WITH_META_ADAPTER = "feed_with_meta";
public static final String ALIAS_CHANGE_FEED_WITH_META_ADAPTER =
"change_feed_with_meta";
// for testing purposes
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 8481064..32fe0c9 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -19,19 +19,30 @@
package org.apache.asterix.external.util;
import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -135,4 +146,57 @@
return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
}
+
+ public static List<Pair<String, Integer>> extractHostsPorts(String
modeValue, IServiceContext serviceCtx,
+ String hostsValue) throws AlgebricksException {
+ try {
+ List<Pair<String, Integer>> sockets = new ArrayList<>();
+ String mode = modeValue.trim().toUpperCase();
+ if (hostsValue == null) {
+ throw new
CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED);
+ }
+ Map<InetAddress, Set<String>> ncMap;
+ ncMap = RuntimeUtils.getNodeControllerMap((ICcApplicationContext)
serviceCtx.getApplicationContext());
+ List<String> ncs = RuntimeUtils
+ .getAllNodeControllers((ICcApplicationContext)
serviceCtx.getApplicationContext());
+ String[] socketsArray = hostsValue.split(",");
+ Random random = new Random();
+ for (String socket : socketsArray) {
+ String[] socketTokens = socket.split(":");
+ String host = socketTokens[0].trim();
+ int port = Integer.parseInt(socketTokens[1].trim());
+ Pair<String, Integer> p = null;
+ if (mode.equals("IP")) {
+ Set<String> ncsOnIp =
ncMap.get(InetAddress.getByName(host));
+ if ((ncsOnIp == null) || ncsOnIp.isEmpty()) {
+ throw new
CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+ "host", host, StringUtils.join(ncMap.keySet(),
", "));
+ }
+ String[] ncArray = ncsOnIp.toArray(new String[] {});
+ String nc = ncArray[random.nextInt(ncArray.length)];
+ p = Pair.of(nc, port);
+ } else if (mode.equals("NC")) {
+ p = Pair.of(host, port);
+ if (!ncs.contains(host)) {
+ throw new
CompilationException(ErrorCode.FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC,
+ "NC", host, StringUtils.join(ncs, ", "));
+
+ }
+ }
+ sockets.add(p);
+ }
+ return sockets;
+ } catch (HyracksDataException | UnknownHostException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public static AlgebricksAbsolutePartitionConstraint
addressToAbsolutePartitionConstraints(
+ List<Pair<String, Integer>> sockets) {
+ List<String> locations = new ArrayList<>();
+ for (Pair<String, Integer> socket : sockets) {
+ locations.add(socket.getLeft());
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new
String[] {}));
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 52e28be..0d96658 100644
---
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -19,3 +19,4 @@
org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory
org.apache.asterix.external.input.HDFSDataSourceFactory
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
+org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
new file mode 100644
index 0000000..9fbc11c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IAuthenticator.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.hyracks.http.api;
+
+import io.netty.handler.codec.http.HttpRequest;
+
+public interface IAuthenticator {
+
+ String KEY_REQUEST_AUTHORIZATION = "Authorization";
+
+ boolean validate(HttpRequest request);
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
new file mode 100644
index 0000000..09d9c9c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServer.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.hyracks.http.server;
+
+import org.apache.hyracks.http.api.IAuthenticator;
+
+import io.netty.channel.EventLoopGroup;
+
+public class AuthenticatedHttpServer extends HttpServer {
+
+ private IAuthenticator authenticator;
+
+ public AuthenticatedHttpServer(EventLoopGroup bossGroup, EventLoopGroup
workerGroup, int port, IAuthenticator authenticator) {
+ super(bossGroup, workerGroup, port);
+ this.authenticator = authenticator;
+ }
+
+ @Override
+ protected HttpServerHandler<? extends HttpServer> createHttpHandler(int
chunkSize) {
+ return new AuthenticatedHttpServerHandler(this, chunkSize,
authenticator);
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
new file mode 100644
index 0000000..f2e3bab
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AuthenticatedHttpServerHandler.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.hyracks.http.server;
+
+import java.io.IOException;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.hyracks.http.api.IAuthenticator;
+import org.apache.hyracks.http.api.IServlet;
+
+public class AuthenticatedHttpServerHandler extends HttpServerHandler {
+
+ private IAuthenticator authenticator;
+
+ public AuthenticatedHttpServerHandler(HttpServer server, int chunkSize,
IAuthenticator authenticator) {
+ super(server, chunkSize);
+ this.authenticator = authenticator;
+ }
+
+ @Override
+ protected void submit(ChannelHandlerContext ctx, IServlet servlet,
FullHttpRequest request) throws IOException {
+ if (!authenticator.validate(request)) {
+ respond(ctx, request.protocolVersion(),
HttpResponseStatus.UNAUTHORIZED);
+ } else {
+ super.submit(ctx, servlet, request);
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 9290cdf..0511103 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -82,7 +82,7 @@
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
- private void submit(ChannelHandlerContext ctx, IServlet servlet,
FullHttpRequest request) throws IOException {
+ protected void submit(ChannelHandlerContext ctx, IServlet servlet,
FullHttpRequest request) throws IOException {
IServletRequest servletRequest;
try {
servletRequest = HttpUtil.toServletRequest(request);
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
new file mode 100644
index 0000000..11a8aa7
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/authenticator/BasicAuthenticator.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.hyracks.http.server.authenticator;
+
+import io.netty.handler.codec.http.HttpRequest;
+import org.apache.hyracks.http.api.IAuthenticator;
+
+import java.util.Base64;
+
+public class BasicAuthenticator implements IAuthenticator {
+
+ public static final String SUFFIX_BASIC_AUTHENTICATION = "Basic ";
+
+ private String secret;
+
+ public BasicAuthenticator(String username, String password) {
+ this.secret = SUFFIX_BASIC_AUTHENTICATION
+ + Base64.getEncoder().encodeToString((username + ":" +
password).getBytes());
+
+ }
+
+ @Override
+ public boolean validate(HttpRequest request) {
+ String authorizationInfo =
request.headers().get(KEY_REQUEST_AUTHORIZATION);
+ return (authorizationInfo != null &&
authorizationInfo.equals(this.secret));
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2321
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9c03bddd0df59b69a1f2d6e989b8ee93cf50a6c0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>