Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2093

Change subject: [WIP] Add external dataset for feed logs
......................................................................

[WIP] Add external dataset for feed logs

Change-Id: Id4ab4d39ac8d00e4ea94da83b0c09b3869fb0131
---
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.18.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.5.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.11.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.12.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.14.server.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.15.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.3.server.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.4.server.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.5.sleep.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.7.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.9.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
M asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
43 files changed, 775 insertions(+), 70 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/93/2093/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 264e9bc..fad49db 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -87,6 +87,10 @@
         return runtimes.get(runtimeId);
     }
 
+    public String getNodeId() {
+        return this.nodeId;
+    }
+
     @Override
     public String toString() {
         return ActiveManager.class.getSimpleName() + "[" + nodeId + "]";
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
index 7e6d54b..a4889ce 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
@@ -76,8 +76,10 @@
             }
             long currentTime = System.currentTimeMillis();
             for (int iter1 = 0; iter1 < listeners.length; iter1++) {
-                resNode.putPOJO(listeners[iter1].getDisplayName(),
-                        constructNode(OBJECT_MAPPER, listeners[iter1], 
currentTime, expireTime));
+                if (listeners[iter1].isActive()) {
+                    resNode.putPOJO(listeners[iter1].getDisplayName(),
+                            constructNode(OBJECT_MAPPER, listeners[iter1], 
currentTime, expireTime));
+                }
             }
             // Construct Response
             
responseWriter.write(OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(resNode));
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 73d0840..d544fff 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -51,6 +51,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -65,6 +66,7 @@
     private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, 
Kind.STATE_CHANGED, null, null);
     private static final EnumSet<ActivityState> TRANSITION_STATES = 
EnumSet.of(ActivityState.RESUMING,
             ActivityState.STARTING, ActivityState.STOPPING, 
ActivityState.RECOVERING);
+    private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}";
     // finals
     protected final IClusterStateManager clusterStateManager;
     protected final ActiveNotificationHandler handler;
@@ -113,7 +115,7 @@
         this.statsTimestamp = -1;
         this.isFetchingStats = false;
         this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, 
entityId, null);
-        this.stats = "{\"Stats\":\"N/A\"}";
+        this.stats = DEFAULT_ACTIVE_STATS;
         this.runtimeName = runtimeName;
         this.locations = locations;
         this.numRegistered = 0;
@@ -261,13 +263,7 @@
     }
 
     public String formatStats(List<String> responses) {
-        StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append("{\"Stats\": [").append(responses.get(0));
-        for (int i = 1; i < responses.size(); i++) {
-            strBuilder.append(", ").append(responses.get(i));
-        }
-        strBuilder.append("]}");
-        return strBuilder.toString();
+        return "[" + StringUtils.join(responses, ",") + "]";
     }
 
     @SuppressWarnings("unchecked")
@@ -275,9 +271,12 @@
     public void refreshStats(long timeout) throws HyracksDataException {
         LOGGER.log(level, "refreshStats called");
         synchronized (this) {
-            if (state != ActivityState.RUNNING || isFetchingStats) {
-                LOGGER.log(level,
-                        "returning immediately since state = " + state + " and 
fetchingStats = " + isFetchingStats);
+            if (state != ActivityState.RUNNING) {
+                LOGGER.log(level, "returning immediately since state = " + 
state);
+                notifySubscribers(statsUpdatedEvent);
+                return;
+            } else if (isFetchingStats) {
+                LOGGER.log(level, "returning immediately since fetchingStats = 
" + isFetchingStats);
                 return;
             } else {
                 isFetchingStats = true;
@@ -426,6 +425,8 @@
         } else {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
         }
+        this.stats = DEFAULT_ACTIVE_STATS;
+        notifySubscribers(statsUpdatedEvent);
     }
 
     public RecoveryTask getRecoveryTask() {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6b4483c..e2de8d5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -76,7 +77,9 @@
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.watch.StatsSubscriber;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
@@ -126,6 +129,7 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.dataset.hints.DatasetHints;
 import 
org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -205,6 +209,9 @@
 import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 
 /*
  * Provides functionality for executing a batch of Query statements (queries 
included)
@@ -554,7 +561,8 @@
                 throw new AlgebricksException(": type " + itemTypeName + " 
could not be found.");
             }
             String ngName = ngNameId != null ? ngNameId.getValue()
-                    : configureNodegroupForDataset(appCtx, dd.getHints(), 
dataverseName, datasetName, metadataProvider);
+                    : DatasetUtil.configureNodegroupForDataset(appCtx, 
dd.getHints(), dataverseName, datasetName,
+                            metadataProvider);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -711,33 +719,6 @@
             throw new CompilationException("Dataset " + 
dataset.getDataverseName() + "." + dataset.getDatasetName()
                     + " is currently being " + "fed into by the following 
active entities.\n" + builder.toString());
         }
-    }
-
-    protected static String configureNodegroupForDataset(ICcApplicationContext 
appCtx, Map<String, String> hints,
-            String dataverseName, String datasetName, MetadataProvider 
metadataProvider) throws Exception {
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-        Set<String> allNodes = csm.getParticipantNodes(true);
-        Set<String> selectedNodes = new LinkedHashSet<>();
-        String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME);
-        if (hintValue == null) {
-            selectedNodes.addAll(allNodes);
-        } else {
-            int nodegroupCardinality;
-            final Pair<Boolean, String> validation = 
DatasetHints.validate(appCtx, DatasetNodegroupCardinalityHint.NAME,
-                    hints.get(DatasetNodegroupCardinalityHint.NAME));
-            boolean valid = validation.first;
-            if (!valid) {
-                throw new CompilationException(
-                        "Incorrect use of hint '" + 
DatasetNodegroupCardinalityHint.NAME + "': " + validation.second);
-            } else {
-                nodegroupCardinality = 
Integer.parseInt(hints.get(DatasetNodegroupCardinalityHint.NAME));
-            }
-            List<String> allNodeList = new ArrayList<>(allNodes);
-            Collections.shuffle(allNodeList);
-            selectedNodes.addAll(allNodeList.subList(0, nodegroupCardinality));
-        }
-        // Creates the associated node group for the dataset.
-        return DatasetUtil.createNodeGroupForNewDataset(dataverseName, 
datasetName, selectedNodes, metadataProvider);
     }
 
     protected void handleCreateIndexStatement(MetadataProvider 
metadataProvider, Statement stmt,
@@ -2014,6 +1995,21 @@
             }
             doDropFeed(hcc, metadataProvider, feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            // drop log dataset
+            if 
(feed.getAdapterConfiguration().containsKey(ExternalDataConstants.LOG_EXTERNAL_DATASET))
 {
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                String failedDatasetName = 
feed.getAdapterConfiguration().get(ExternalDataConstants.LOG_EXTERNAL_DATASET);
+                MetadataLockUtil.dropDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
+                        dataverseName + "." + failedDatasetName);
+                Dataset ds = metadataProvider.findDataset(dataverseName, 
failedDatasetName);
+                if (ds != null) {
+                    ds.drop(metadataProvider, new MutableObject<>(mdTxnCtx), 
new ArrayList<>(), new MutableBoolean(true),
+                            new 
MutableObject<>(JobUtils.ProgressState.NO_PROGRESS), hcc, true);
+                }
+                mdTxnCtx = metadataProvider.getMetadataTxnContext();
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            }
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -2078,11 +2074,11 @@
         String dataverseName = getActiveDataverse(sfs.getDataverseName());
         String feedName = sfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
         boolean committed = false;
         MetadataLockUtil.startFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + feedName);
         try {
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
             // Runtime handler
             EntityId entityId = new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName);
             // Feed & Feed Connections
@@ -2116,6 +2112,13 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             committed = true;
             listener.start(metadataProvider);
+
+            if 
(feed.getAdapterConfiguration().containsKey(ExternalDataConstants.LOG_EXTERNAL_DATASET))
 {
+                String failedDatasetName = feed.getAdapterConfiguration()
+                        .get(ExternalDataConstants.LOG_EXTERNAL_DATASET);
+                ((StartFeedStatement) stmt).addLogDataset(appCtx, 
failedDatasetName, lockManager, metadataProvider,
+                        hcc, listener, dataverseName);
+            }
         } catch (Exception e) {
             if (!committed) {
                 abort(e, e, mdTxnCtx);
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.1.ddl.sqlpp
new file mode 100644
index 0000000..1aeb983
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.1.ddl.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TwitterUserType as closed {
+    `screen-name`: string,
+    lang: string,
+    friends_count: int32,
+    statuses_count: int32,
+    name: string,
+    followers_count: int32
+};
+
+create type TweetMessageType as closed {
+    tweetid: string,
+    `tweetid-copy`:string,
+    user: TwitterUserType,
+    `sender-location`: point,
+    `send-time`: datetime,
+    `send-time-copy`:datetime,
+    `referred-topics`: {{ string }},
+    `message-text`: string
+};
+
+create dataset Tweets1(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="asterix_nc1:10001"),
+    ("address-type"="NC"),
+    ("type-name"="TweetMessageType"),
+    ("format"="adm"),
+    ("log-dataset"="TweetLog")
+);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.18.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.18.ddl.sqlpp
new file mode 100644
index 0000000..e4d2615
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.18.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.2.update.sqlpp
new file mode 100644
index 0000000..03d6d6f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+
+connect feed TweetFeed to dataset Tweets1;
+
+start feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.query.sqlpp
new file mode 100644
index 0000000..b63ce54
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+
+select t.DataverseName, t.DatasetName, t.DatatypeDataverseName, 
t.DatatypeName, t.DatasetType, t.GroupName, t.CompactionPolicy
+from Metadata.`Dataset` t
+where t.DatasetName = 'TweetLog';
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.query.sqlpp
new file mode 100644
index 0000000..0100bfa
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+
+select t.FailedRecordErrorMessage, t.FailedRecord from TweetLog t;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.5.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.5.update.sqlpp
new file mode 100644
index 0000000..d957482
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.5.update.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+stop feed TweetFeed;
+drop feed TweetFeed;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.query.sqlpp
new file mode 100644
index 0000000..b63ce54
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+
+select t.DataverseName, t.DatasetName, t.DatatypeDataverseName, 
t.DatatypeName, t.DatasetType, t.GroupName, t.CompactionPolicy
+from Metadata.`Dataset` t
+where t.DatasetName = 'TweetLog';
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.1.ddl.sqlpp
new file mode 100644
index 0000000..bf6cd58
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.1.ddl.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
+
+create type TwitterUserType as closed {
+    `screen-name`: string,
+    lang: string,
+    friends_count: int32,
+    statuses_count: int32,
+    name: string,
+    followers_count: int32
+};
+
+create type TweetMessageType as closed {
+    tweetid: string,
+    `tweetid-copy`:string,
+    user: TwitterUserType,
+    `sender-location`: point,
+    `send-time`: datetime,
+    `send-time-copy`:datetime,
+    `referred-topics`: {{ string }},
+    `message-text`: string
+};
+
+create dataset Tweets1(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="asterix_nc1:10001, asterix_nc2:10002"),
+    ("address-type"="NC"),
+    ("type-name"="TweetMessageType"),
+    ("format"="adm"),
+    ("log-dataset"="TweetLog")
+);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.query.sqlpp
new file mode 100644
index 0000000..0100bfa
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+
+select t.FailedRecordErrorMessage, t.FailedRecord from TweetLog t;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.11.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.11.update.sqlpp
new file mode 100644
index 0000000..c32f572
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.11.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+stop feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.12.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.12.update.sqlpp
new file mode 100644
index 0000000..9c516e5
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.12.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use  experiments;
+drop feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.query.sqlpp
new file mode 100644
index 0000000..2d66776
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+
+select * from Metadata.`Dataset` t where t.DatasetName = 'TweetLog';
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.14.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.14.server.sqlpp
new file mode 100644
index 0000000..c3ba795
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.14.server.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+stop 10001
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.15.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.15.ddl.sqlpp
new file mode 100644
index 0000000..e4d2615
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.15.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.2.update.sqlpp
new file mode 100644
index 0000000..03d6d6f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use experiments;
+
+connect feed TweetFeed to dataset Tweets1;
+
+start feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.3.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.3.server.sqlpp
new file mode 100644
index 0000000..ccf9b5b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.3.server.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+start client 10001 file-client 127.0.0.1 ../asterix-app/data/twitter/mixed_tweets.adm 500 50 1000
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.4.server.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.4.server.sqlpp
new file mode 100644
index 0000000..d8aafe6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.4.server.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+start client 10002 file-client 127.0.0.1 ../asterix-app/data/twitter/mixed_tweets.adm 500 50 1000
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.5.sleep.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.5.sleep.sqlpp
new file mode 100644
index 0000000..6559ae8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.5.sleep.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+2000
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.query.sqlpp
new file mode 100644
index 0000000..0100bfa
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+
+select t.FailedRecordErrorMessage, t.FailedRecord from TweetLog t;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.7.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.7.update.sqlpp
new file mode 100644
index 0000000..c32f572
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.7.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+stop feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.query.sqlpp
new file mode 100644
index 0000000..0100bfa
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+
+select t.FailedRecordErrorMessage, t.FailedRecord from TweetLog t;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.9.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.9.update.sqlpp
new file mode 100644
index 0000000..4d104ef
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.9.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use experiments;
+start feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.adm
new file mode 100644
index 0000000..1b448c3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.3.adm
@@ -0,0 +1 @@
+{ "DataverseName": "experiments", "DatasetName": "TweetLog", "DatatypeDataverseName": "Metadata", "DatatypeName": "AnyObject", "DatasetType": "EXTERNAL", "GroupName": "experiments.TweetLog", "CompactionPolicy": "prefix" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.4.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-metadata/feed-with-log-dataset-metadata.6.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.adm
new file mode 100644
index 0000000..7700f26
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.10.adm
@@ -0,0 +1,6 @@
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928896, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United States\", 
\"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928891, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United States\", 
\"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928892, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United States\", 
\"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928896, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United States\", 
\"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928891, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United States\", 
\"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928892, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United States\", 
\"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.13.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.adm
new file mode 100644
index 0000000..7700f26
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.6.adm
@@ -0,0 +1,6 @@
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928896, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928891, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928892, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928896, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928891, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928892, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.adm
new file mode 100644
index 0000000..7700f26
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-log-dataset-runtime/feed-with-log-dataset-runtime.8.adm
@@ -0,0 +1,6 @@
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928896, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928891, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928892, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928896, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928891, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
+{ "FailedRecordErrorMessage": "Parser failed to parse record", "FailedRecord": 
"{ \"create_at\": datetime(\"2012-05-01T09:15:07.000Z\"), \"id\": 
197358499422928892, \"text\": \"I hate writing document\", 
\"in_reply_to_status\": -1, \"in_reply_to_user\": -1, \"favorite_count\": -1, 
\"coordinate\": point(\"-81.6430449, 38.3092672\"), \"retweet_count\": 0, 
\"lang\": \"null\", \"is_retweet\": false, \"hashtags\": {{ \"hate\", 
\"document\" }}, \"user_mentions\": null, \"user\": { \"id\": 331998689, 
\"name\": \"ImAGlenardenNigga\", \"screen_name\": \"WhiteBoyTurntUp\", 
\"lang\": \"en\", \"location\": \"Glenarden D $M$ V \", \"create_at\": 
date(\"2011-07-08\"), \"description\": \"#TeamNAS #TeamTatted #TeamTakin 
#TeamLightSkin #TeamRollUp #TeamGDHU #TeamGlenarden #Follow My Folk's 
@EfffYou_PayMe & Go Follow My Short Stuff @_Chinkyy \", \"followers_count\": 
1629, \"friends_count\": 1542, \"statues_count\": 40754 }, \"place\": { 
\"country\": \"United States\", \"country_code\": \"United State
 s\", \"full_name\": \"Charleston, WV\", \"id\": \"44439f1538ac3ca0\", 
\"name\": \"Charleston\", \"place_type\": \"city\", \"bounding_box\": 
rectangle(\"-81.727777,38.281139 -81.559673,38.405759\") }, \"geo_tag\": { 
\"stateID\": 54, \"stateName\": \"West Virgnia\", \"countyID\": 54039, 
\"countyName\": \"Kanawha\", \"cityID\": 5414600, \"cityName\": \"Charleston\" 
} }" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index abbcaaa..00dbcd5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -296,6 +296,16 @@
         <expected-error>Function fundv.test_func0@1 is being used. It cannot 
be dropped</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="feed-with-log-dataset-metadata">
+        <output-dir compare="Text">feed-with-log-dataset-metadata</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="feed-with-log-dataset-runtime">
+        <output-dir compare="Text">feed-with-log-dataset-runtime</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="upsert">
     <test-case FilePath="upsert">
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 824f51a..afc4cf0 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -18,11 +18,13 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import com.sun.tools.doclets.internal.toolkit.util.Extern;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
@@ -36,8 +38,6 @@
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowController {
-    public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = 
"incoming-records-count";
-    public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = 
"failed-at-parser-records-count";
 
     public enum State {
         CREATED,
@@ -281,7 +281,10 @@
 
     @Override
     public String getStats() {
-        return "{\"" + INCOMING_RECORDS_COUNT_FIELD_NAME + "\": " + 
incomingRecordsCount + ", \"" +
-                FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + 
failedRecordsCount + "}";
+        return "\"" + ExternalDataConstants.INCOMING_RECORDS_COUNT_FIELD_NAME 
+ "\": " + incomingRecordsCount + ", \""
+                + 
ExternalDataConstants.FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + 
failedRecordsCount + ", \""
+                + ExternalDataConstants.FAILED_RECORD_LOG_LOCATION_NAME + "\": 
\""
+                + feedLogManager.getDir().toAbsolutePath() + File.separator + 
FeedLogManager.BAD_RECORDS_FILE_NAME
+                + "\"";
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 7907e69..befd2bc 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -143,10 +144,8 @@
 
     @Override
     public String getStats() {
-        if (adapter != null) {
-            return "{\"adapter-stats\": " + adapter.getStats() + "}";
-        } else {
-            return "\"Runtime stats is not available.\"";
-        }
+        return "{\""+ ExternalDataConstants.PARTITION_ID_NAME + "\": " + 
this.runtimeId.getPartition() + ", "
+                + "\"" + ExternalDataConstants.FEED_NODE_ID_NAME + "\": \"" + 
this.activeManager.getNodeId() + "\", "
+                + adapter.getStats() + "}";
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 3b6e7ff..700acd3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -224,6 +224,17 @@
     public static final String KEY_READER_FACTORY = "reader-factory";
     public static final String READER_RSS = "rss_feed";
     public static final String FORMAT_CSV = "csv";
+    public static final String LOG_EXTERNAL_DATASET = "log-dataset";
+
+
+    /**
+     * External Runtime stats attributes
+     * */
+    public static final String PARTITION_ID_NAME = "adapter-partition-id";
+    public static final String FEED_NODE_ID_NAME = "node_id";
+    public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = 
"incoming-records-count";
+    public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = 
"failed-at-parser-records-count";
+    public static final String FAILED_RECORD_LOG_LOCATION_NAME = 
"failed-record-log-file-location";
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse 
record";
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index 9d887b6..2d887cb 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -29,11 +29,14 @@
 import java.nio.file.StandardOpenOption;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
 import java.util.Date;
 import java.util.TreeSet;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 
 public class FeedLogManager {
 
@@ -50,6 +53,12 @@
     public static final String START_PREFIX = "s:";
     public static final String END_PREFIX = "e:";
     private static final String DATE_FORMAT_STRING = "MM/dd/yyyy HH:mm:ss";
+
+    public static final String FIELD_NAME_FAILED_RECORD_ID = "FailedRecordID";
+    public static final String FIELD_NAME_FAILED_RECORD_ERROR_MSG = 
"FailedRecordErrorMessage";
+    public static final String FIELD_NAME_FAILED_RECORD_TIME = 
"FailedTimestamp";
+    public static final String FIELD_NAME_FAILED_RECORD_RECORD = 
"FailedRecord";
+
     public static final int PREFIX_SIZE = START_PREFIX.length() + 
DATE_FORMAT_STRING.length() + 1;
     private String currentPartition;
     private final TreeSet<String> completed;
@@ -60,6 +69,8 @@
     private final StringBuilder stringBuilder = new StringBuilder();
     private int count = 0;
     private static final DateFormat df = new 
SimpleDateFormat(DATE_FORMAT_STRING);
+    private final ObjectMapper logRecordMapper = new ObjectMapper();
+    private final ObjectNode errorRecordNode = 
logRecordMapper.createObjectNode();
 
     public FeedLogManager(File file) throws HyracksDataException {
         try {
@@ -171,13 +182,13 @@
 
     public synchronized void logRecord(String record, String errorMessage) 
throws IOException {
         stringBuilder.setLength(0);
-        stringBuilder.append(record);
-        stringBuilder.append(ExternalDataConstants.LF);
-        stringBuilder.append(df.format((new Date())));
-        stringBuilder.append(' ');
-        stringBuilder.append(errorMessage);
-        stringBuilder.append(ExternalDataConstants.LF);
-        recordLogger.write(stringBuilder.toString());
+        Date logTime = new Date();
+        errorRecordNode.removeAll();
+        errorRecordNode.put(FIELD_NAME_FAILED_RECORD_ID, logTime.getTime());
+        errorRecordNode.put(FIELD_NAME_FAILED_RECORD_TIME, df.format(logTime));
+        errorRecordNode.put(FIELD_NAME_FAILED_RECORD_ERROR_MSG, errorMessage);
+        errorRecordNode.put(FIELD_NAME_FAILED_RECORD_RECORD, record);
+        recordLogger.write(errorRecordNode.toString() + 
ExternalDataConstants.LF);
         recordLogger.flush();
     }
 
@@ -188,4 +199,8 @@
     public synchronized boolean isSplitRead(String split) {
         return completed.contains(split);
     }
+
+    public Path getDir() {
+        return this.dir;
+    }
 }
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj 
b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 0cc6789..d18df81 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -462,9 +462,7 @@
     ( <HINTS> hints = Properties() )?
     ( <USING> <COMPACTION> <POLICY> compactionPolicy = CompactionPolicy() 
(compactionPolicyProperties = Configuration())? )?
       {
-        ExternalDetailsDecl edd = new ExternalDetailsDecl();
-        edd.setAdapter(adapterName);
-        edd.setProperties(properties);
+        ExternalDetailsDecl edd = new ExternalDetailsDecl(adapterName, 
properties);
         dsetDecl = new DatasetDecl(nameComponents.first,
                                    nameComponents.second,
                                    typeComponents.first,
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
index b15579c..bcc56eb 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
@@ -21,14 +21,11 @@
 import java.util.Map;
 
 public class ExternalDetailsDecl implements IDatasetDetailsDecl {
-    private Map<String, String> properties;
-    private String adapter;
+    private final Map<String, String> properties;
+    private final String adapter;
 
-    public void setAdapter(String adapter) {
+    public ExternalDetailsDecl(String adapter, Map<String, String> properties) 
{
         this.adapter = adapter;
-    }
-
-    public void setProperties(Map<String, String> properties) {
         this.properties = properties;
     }
 
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
index b3452b5..141399c 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/StartFeedStatement.java
@@ -19,11 +19,40 @@
 
 package org.apache.asterix.lang.common.statement;
 
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.external.feed.watch.StatsSubscriber;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
+import org.apache.asterix.metadata.utils.MetadataUtil;
+import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 public class StartFeedStatement implements Statement {
 
@@ -58,4 +87,65 @@
     public Identifier getFeedName() {
         return feedName;
     }
+
+    public void addLogDataset(ICcApplicationContext appCtx, String 
failedDatasetName, IMetadataLockManager lockManager,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IActiveEntityEventsListener listener, String dvName) throws 
Exception {
+
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        // drop the previous failed record dataset. there will be a case the 
restart feed, partition changed.
+        // so, the datset must be removed.
+        MetadataLockUtil.dropDatasetBegin(lockManager, 
metadataProvider.getLocks(), dvName,
+                dvName + "." + failedDatasetName);
+        Dataset ds = metadataProvider.findDataset(dvName, failedDatasetName);
+        if (ds != null) {
+            ds.drop(metadataProvider, new MutableObject<>(mdTxnCtx), new 
ArrayList<>(), new MutableBoolean(true),
+                    new MutableObject<>(JobUtils.ProgressState.NO_PROGRESS), 
hcc, true);
+            mdTxnCtx = metadataProvider.getMetadataTxnContext();
+        }
+
+        // get failed record log file
+        StatsSubscriber feedStatsSubscriber = new StatsSubscriber(listener);
+        listener.refreshStats(2000);
+        feedStatsSubscriber.sync();
+        String feedStats = listener.getStats();
+
+        ObjectMapper statsMapper = new ObjectMapper();
+        JsonNode statsNode = statsMapper.readTree(feedStats);
+
+        StringBuilder failedRecordsFilePath = new StringBuilder();
+
+        // Add all log files for the external dataset
+        for (int iter1 = 0; iter1 < statsNode.size(); iter1++) {
+            if (iter1 > 0) {
+                failedRecordsFilePath.append(',');
+            }
+            JsonNode partitionNode = statsNode.get(iter1);
+            
failedRecordsFilePath.append(partitionNode.get(ExternalDataConstants.FEED_NODE_ID_NAME).getTextValue()
+                    + "://" + 
partitionNode.get(ExternalDataConstants.FAILED_RECORD_LOG_LOCATION_NAME).getTextValue());
+        }
+
+        // create a new failed record dataset
+        MetadataLockUtil.createDatasetBegin(lockManager, 
metadataProvider.getLocks(), dvName, "Metadata",
+                "Metadata" + "." + 
MetadataBuiltinEntities.ANY_OBJECT_RECORD_TYPE.getTypeName(), dvName,
+                dvName + "." + null, null, null, dvName + "." + 
failedDatasetName, true);
+        Map<String, String> failedDatasetProp = new LinkedHashMap<>();
+        failedDatasetProp.put(ExternalDataConstants.KEY_FORMAT, 
ExternalDataConstants.FORMAT_JSON);
+        failedDatasetProp.put(ExternalDataConstants.KEY_PATH, 
failedRecordsFilePath.toString());
+        Map<String, String> hints = new HashMap<>();
+        String ngName = DatasetUtil.configureNodegroupForDataset(appCtx, 
hints, dvName, failedDatasetName,
+                metadataProvider);
+        metadataProvider.findDataset(dvName, failedDatasetName);
+        Dataset failedRecordDataset = new Dataset(dvName, failedDatasetName, 
"Metadata",
+                MetadataBuiltinEntities.ANY_OBJECT_RECORD_TYPE.getTypeName(), 
ngName,
+                GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME, 
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES,
+                new 
ExternalDatasetDetails(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, 
failedDatasetProp, new Date(),
+                        DatasetConfig.TransactionState.COMMIT),
+                hints, DatasetConfig.DatasetType.EXTERNAL, 
DatasetIdFactory.generateDatasetId(),
+                MetadataUtil.PENDING_ADD_OP);
+        
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), 
failedRecordDataset);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+    }
 }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj 
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index d3a64d8..5fef8e4 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -513,9 +513,7 @@
     ( <HINTS> hints = Properties() )?
     ( <USING> <COMPACTION> <POLICY> compactionPolicy = CompactionPolicy() 
(LOOKAHEAD(1) compactionPolicyProperties = Configuration())? )?
       {
-        ExternalDetailsDecl edd = new ExternalDetailsDecl();
-        edd.setAdapter(adapterName);
-        edd.setProperties(properties);
+        ExternalDetailsDecl edd = new ExternalDetailsDecl(adapterName, 
properties);
         dsetDecl = new DatasetDecl(nameComponents.first,
                                    nameComponents.second,
                                    typeComponents.first,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index e4a6ca8..10c4348 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -21,6 +21,8 @@
 import java.io.DataOutput;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +31,7 @@
 
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -37,6 +40,7 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.JobId;
@@ -45,6 +49,7 @@
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.dataset.hints.DatasetHints;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -559,4 +564,32 @@
         MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new 
NodeGroup(nodeGroup, new ArrayList<>(ncNames)));
         return nodeGroup;
     }
+
+    /**
+     * Selects the nodes that will host a new dataset and creates the matching node group.
+     * <p>
+     * Without a node-group cardinality hint, every node returned by the cluster state manager is used.
+     * With the hint, the value is validated through {@link DatasetHints#validate} and that many nodes are
+     * picked at random from the same node set.
+     *
+     * @param appCtx           application context providing the cluster state manager
+     * @param hints            dataset hints; only the node-group cardinality hint is read
+     * @param dataverseName    dataverse of the new dataset
+     * @param datasetName      name of the new dataset
+     * @param metadataProvider metadata provider passed on to the node group creation
+     * @return the value returned by {@code createNodeGroupForNewDataset} for the selected nodes
+     * @throws CompilationException if the hint fails validation or asks for more nodes than are available
+     * @throws Exception            if the node group cannot be created
+     */
+    public static String configureNodegroupForDataset(ICcApplicationContext appCtx, Map<String, String> hints,
+            String dataverseName, String datasetName, MetadataProvider metadataProvider) throws Exception {
+        final String hintName = DatasetHints.DatasetNodegroupCardinalityHint.NAME;
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        Set<String> allNodes = csm.getParticipantNodes(true);
+        // LinkedHashSet keeps the selection order stable for the node group that is created below.
+        Set<String> selectedNodes = new LinkedHashSet<>();
+        String hintValue = hints.get(hintName);
+        if (hintValue == null) {
+            selectedNodes.addAll(allNodes);
+        } else {
+            final Pair<Boolean, String> validation = DatasetHints.validate(appCtx, hintName, hintValue);
+            if (!validation.first) {
+                throw new CompilationException("Incorrect use of hint '" + hintName + "': " + validation.second);
+            }
+            int nodegroupCardinality = Integer.parseInt(hintValue);
+            // subList below would otherwise fail with a bare IndexOutOfBoundsException when the node set
+            // read here is smaller than the requested cardinality.
+            if (nodegroupCardinality < 0 || nodegroupCardinality > allNodes.size()) {
+                throw new CompilationException("Incorrect use of hint '" + hintName + "': cardinality "
+                        + nodegroupCardinality + " is not between 0 and the number of available nodes ("
+                        + allNodes.size() + ")");
+            }
+            List<String> shuffledNodes = new ArrayList<>(allNodes);
+            Collections.shuffle(shuffledNodes);
+            selectedNodes.addAll(shuffledNodes.subList(0, nodegroupCardinality));
+        }
+        // Creates the associated node group for the dataset.
+        return DatasetUtil.createNodeGroupForNewDataset(dataverseName, datasetName, selectedNodes, metadataProvider);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2093
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id4ab4d39ac8d00e4ea94da83b0c09b3869fb0131
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to