Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1591

Change subject: Feed Policy Refactoring
......................................................................

Feed Policy Refactoring

1. Fix the framesize inconsistency in feed dataflow.
2. Add a runtime test case for create feed with policy.
3. Fix the FeedTupleForwarder flush() logic. If there is no record, it
   will not flush. This will avoid empty frame bug.
4. Remove FeedRuntimeInputHander aggresively flush.
5. Refactor FeedPolicyAccessor and BuiltinPolices. Now we only have
   spill and discard polices.
6. Remove PolicyEnforcer. Merge the functionality into
   FeedPolicyAccessor.
7. Revise SocketServerInputStream. Make the expected exception more
   friendly.

Change-Id: Ibc10139925cfedee66d1263990ba80b94675f182
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.1.ddl.aql
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.2.update.aql
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.3.server.aql
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.4.sleep.aql
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.5.update.aql
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.6.query.aql
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.7.server.aql
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.8.ddl.aql
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/create-feed-with-policy/create-feed-with-policy.1.adm
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
D 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
24 files changed, 311 insertions(+), 260 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/91/1591/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 4ea524a..7409cd0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -109,13 +110,15 @@
  */
 public class FeedOperations {
 
+    private static final CompilerProperties compilerProperties = 
AppContextInfo.INSTANCE.getCompilerProperties();
+
     private FeedOperations() {
     }
 
     private static Pair<JobSpecification, IAdapterFactory> 
buildFeedIntakeJobSpec(Feed feed,
             MetadataProvider metadataProvider, FeedPolicyAccessor 
policyAccessor) throws Exception {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
-        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+        spec.setFrameSize(compilerProperties.getFrameSize());
         IAdapterFactory adapterFactory;
         IOperatorDescriptor feedIngestor;
         AlgebricksPartitionConstraint ingesterPc;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.1.ddl.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.1.ddl.aql
new file mode 100644
index 0000000..4e21323
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.1.ddl.aql
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use dataverse experiments;
+
+create type TwitterUserType as closed {
+    screen-name: string,
+    lang: string,
+    friends_count: int32,
+    statuses_count: int32,
+    name: string,
+    followers_count: int32
+}
+
+create type TweetMessageType as closed {
+    tweetid: string,
+    tweetid-copy:string,
+    user: TwitterUserType,
+    sender-location: point,
+    send-time: datetime,
+    send-time-copy:datetime,
+    referred-topics: {{ string }},
+    message-text: string
+}
+
+create dataset Tweets(TweetMessageType) primary key tweetid;
+
+create feed TweetFeed using socket_adapter
+(
+    ("sockets"="127.0.0.1:10001"),
+    ("address-type"="IP"),
+    ("type-name"="TweetMessageType"),
+    ("format"="adm")
+);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.2.update.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.2.update.aql
new file mode 100644
index 0000000..40ebc90
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.2.update.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+set wait-for-completion-feed "false";
+
+connect feed TweetFeed to dataset Tweets using policy Discard;
+
+start feed TweetFeed;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.3.server.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.3.server.aql
new file mode 100644
index 0000000..c78da31
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.3.server.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+start client 10001 file-client 127.0.0.1 
../asterix-app/data/twitter/tw_messages.adm 500 50 1000
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.4.sleep.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.4.sleep.aql
new file mode 100644
index 0000000..7c1ab60
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.4.sleep.aql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+5000
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.5.update.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.5.update.aql
new file mode 100644
index 0000000..167c507
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.5.update.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+stop feed TweetFeed;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.6.query.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.6.query.aql
new file mode 100644
index 0000000..b67042d
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.6.query.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+
+for $x in dataset Tweets
+order by $x.tweetid
+return $x;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.7.server.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.7.server.aql
new file mode 100644
index 0000000..cc02ce6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.7.server.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+stop 10001
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.8.ddl.aql
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.8.ddl.aql
new file mode 100644
index 0000000..406fbbb
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/create-feed-with-policy/create-feed-with-policy.8.ddl.aql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a socket feed with a client that pushes
+ * 10 records. The feed is connected to a dataset that is then
+ * queried for the data.
+ * Expected Res : Success
+ * Date         : 16th Mar 2017
+ */
+
+use dataverse experiments;
+drop dataverse experiments;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
index cbdc907..5ae5aef 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
@@ -1 +1 @@
-788
+800
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
index cbdc907..5ae5aef 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
@@ -1 +1 @@
-788
+800
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/create-feed-with-policy/create-feed-with-policy.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/create-feed-with-policy/create-feed-with-policy.1.adm
new file mode 100644
index 0000000..6466feb
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/create-feed-with-policy/create-feed-with-policy.1.adm
@@ -0,0 +1,10 @@
+{ "tweetid": "1", "tweetid-copy": "1", "user": { "screen-name": 
"RollandEckhardstein#211", "lang": "en", "friends_count": 3657079, 
"statuses_count": 268, "name": "Rolland Eckhardstein", "followers_count": 
3311368 }, "sender-location": point("42.13,80.43"), "send-time": 
datetime("2005-12-05T21:06:41.000Z"), "send-time-copy": 
datetime("2005-12-05T21:06:41.000Z"), "referred-topics": {{ "samsung", "plan" 
}}, "message-text": " love samsung the plan is amazing" }
+{ "tweetid": "10", "tweetid-copy": "10", "user": { "screen-name": 
"Rolldstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 
268, "name": "Rolland Eckhardstful", "followers_count": 3311368 }, 
"sender-location": point("46.94,93.98"), "send-time": 
datetime("2011-04-07T14:08:46.000Z"), "send-time-copy": 
datetime("2011-04-07T14:08:46.000Z"), "referred-topics": {{ "t-mobile", 
"signal" }}, "message-text": " like t-mobile the signal is good" }
+{ "tweetid": "2", "tweetid-copy": "2", "user": { "screen-name": 
"RollandEckhardstein#211", "lang": "en", "friends_count": 3657079, 
"statuses_count": 268, "name": "David Eckhardstein", "followers_count": 3311368 
}, "sender-location": point("28.86,70.44"), "send-time": 
datetime("2007-08-15T06:44:17.000Z"), "send-time-copy": 
datetime("2007-08-15T06:44:17.000Z"), "referred-topics": {{ "sprint", 
"voice-clarity" }}, "message-text": " like sprint its voice-clarity is 
mind-blowing" }
+{ "tweetid": "3", "tweetid-copy": "3", "user": { "screen-name": 
"RollandEckhard#500", "lang": "en", "friends_count": 3657079, "statuses_count": 
268, "name": "Rolland Hetfield", "followers_count": 3311368 }, 
"sender-location": point("39.84,86.48"), "send-time": 
datetime("2008-12-24T00:07:04.000Z"), "send-time-copy": 
datetime("2008-12-24T00:07:04.000Z"), "referred-topics": {{ "verizon", 
"voice-command" }}, "message-text": " can't stand verizon its voice-command is 
terrible:(" }
+{ "tweetid": "4", "tweetid-copy": "4", "user": { "screen-name": 
"RollandEckhardstein#221", "lang": "en", "friends_count": 3657079, 
"statuses_count": 268, "name": "Rolland Eckhardstinz", "followers_count": 
3311368 }, "sender-location": point("27.67,87.32"), "send-time": 
datetime("2007-02-05T16:39:13.000Z"), "send-time-copy": 
datetime("2007-02-05T16:39:13.000Z"), "referred-topics": {{ "t-mobile", 
"customer-service" }}, "message-text": " love t-mobile its customer-service is 
mind-blowing" }
+{ "tweetid": "5", "tweetid-copy": "5", "user": { "screen-name": 
"RollandEcstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 
268, "name": "Rolland Eckhardst", "followers_count": 3311368 }, 
"sender-location": point("27.3,92.77"), "send-time": 
datetime("2010-09-12T06:15:28.000Z"), "send-time-copy": 
datetime("2010-09-12T06:15:28.000Z"), "referred-topics": {{ "t-mobile", 
"customization" }}, "message-text": " like t-mobile the customization is 
amazing:)" }
+{ "tweetid": "6", "tweetid-copy": "6", "user": { "screen-name": 
"Rollkhardstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 
268, "name": "Kirk Hammette ", "followers_count": 3311368 }, "sender-location": 
point("45.62,84.78"), "send-time": datetime("2012-01-23T06:23:13.000Z"), 
"send-time-copy": datetime("2012-01-23T06:23:13.000Z"), "referred-topics": {{ 
"iphone", "network" }}, "message-text": " like iphone its network is awesome:)" 
}
+{ "tweetid": "7", "tweetid-copy": "7", "user": { "screen-name": 
"andEckhardstein#211", "lang": "en", "friends_count": 3657079, 
"statuses_count": 268, "name": "Rolland khardstein", "followers_count": 3311368 
}, "sender-location": point("44.12,81.46"), "send-time": 
datetime("2012-02-17T17:30:26.000Z"), "send-time-copy": 
datetime("2012-02-17T17:30:26.000Z"), "referred-topics": {{ "t-mobile", 
"network" }}, "message-text": " hate t-mobile the network is bad" }
+{ "tweetid": "8", "tweetid-copy": "8", "user": { "screen-name": 
"Rolltein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, 
"name": "Ron Eckhardstein", "followers_count": 3311368 }, "sender-location": 
point("36.86,90.71"), "send-time": datetime("2009-03-12T13:18:04.000Z"), 
"send-time-copy": datetime("2009-03-12T13:18:04.000Z"), "referred-topics": {{ 
"at&t", "touch-screen" }}, "message-text": " dislike at&t its touch-screen is 
OMG" }
+{ "tweetid": "9", "tweetid-copy": "9", "user": { "screen-name": 
"Roldstein#211", "lang": "en", "friends_count": 3657079, "statuses_count": 268, 
"name": "Rolland Eckdstein", "followers_count": 3311368 }, "sender-location": 
point("29.07,97.05"), "send-time": datetime("2012-08-15T20:19:46.000Z"), 
"send-time-copy": datetime("2012-08-15T20:19:46.000Z"), "referred-topics": {{ 
"verizon", "speed" }}, "message-text": " hate verizon its speed is bad" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index 573793f..cbde66d 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets", 
"ReturnType": "TweetType: closed {\n  id: string,\n  username: string,\n  
location: string,\n  text: string,\n  timestamp: string\n}\n", 
"AppliedFunctions": {{ "feed_processor" }}, "PolicyName": "BasicFT" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "DatasetName": "Tweets", 
"ReturnType": "TweetType: closed {\n  id: string,\n  username: string,\n  
location: string,\n  text: string,\n  timestamp: string\n}\n", 
"AppliedFunctions": {{ "feed_processor" }}, "PolicyName": "Basic" }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 4177ea6..7f3722e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -114,6 +114,8 @@
     }
 
     public void flush() throws HyracksDataException {
-        appender.flush(writer);
+        if (appender.getTupleCount() > 0) {
+            appender.flush(writer);
+        }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 9982477..18257b2 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -72,7 +73,6 @@
             IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor 
fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
-
         this.spiller = fpa.spillToDiskOnCongestion()
                 ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + 
connectionId.getDatasetName() + "_"
@@ -84,7 +84,7 @@
         this.framePool = framePool;
         this.inbox = new LinkedBlockingDeque<>();
         this.consumer = new FrameTransporter();
-        this.consumerThread = new Thread(consumer);
+        this.consumerThread = new Thread(consumer, 
"FeedRuntimeInputHandler-FrameTransporter");
         this.consumerThread.start();
         this.initialFrameSize = ctx.getInitialFrameSize();
         this.frameAction = new FrameAction();
@@ -475,7 +475,6 @@
                                 frame = spiller.next();
                             }
                         }
-                        writer.flush();
                         // At this point. We consumed all memory and spilled
                         // We can't assume the next will be in memory. what if 
there is 0 memory?
                         synchronized (mutex) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
index cb722bb..becf41d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
@@ -30,33 +30,12 @@
 
     /**
      * --------------------------
-     * failure configuration
-     * --------------------------
-     **/
-
-    /** continue feed ingestion after a soft (runtime) failure **/
-    public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
-
-    /** log failed tuple to an asterixdb dataset for future reference **/
-    public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
-
-    /** continue feed ingestion after loss of one or more machines (hardware 
failure) **/
-    public static final String HARDWARE_FAILURE_CONTINUE = 
"hardware.failure.continue";
-
-    /** auto-start a loser feed when the asterixdb instance is restarted **/
-    public static final String CLUSTER_REBOOT_AUTO_RESTART = 
"cluster.reboot.auto.restart";
-
-    /** framework provides guarantee that each received feed record will be 
processed through the ingestion pipeline at least once **/
-    public static final String AT_LEAST_ONE_SEMANTICS = 
"atleast.once.semantics";
-
-    /**
-     * --------------------------
      * flow control configuration
      * --------------------------
      **/
 
     /** enable buffering in feeds **/
-    public static final String BUFFERING_ENABLED = "buffering.enabled";
+    public static final String FLOWCONTROL_ENABLED = "flowcontrol.enabled";
 
     /** spill excess tuples to disk if an operator cannot process incoming 
data at its arrival rate **/
     public static final String SPILL_TO_DISK_ON_CONGESTION = 
"spill.to.disk.on.congestion";
@@ -70,17 +49,8 @@
     /** maximum fraction of ingested data that can be discarded **/
     public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
 
-    /** maximum end-to-end delay/latency in persisting a tuple through the 
feed ingestion pipeline **/
-    public static final String MAX_DELAY_RECORD_PERSISTENCE = 
"max.delay.record.persistence";
-
-    /** rate limit the inflow of tuples in accordance with the maximum 
capacity of the pipeline **/
-    public static final String THROTTLING_ENABLED = "throttling.enabled";
-
     /** elasticity **/
     public static final String ELASTIC = "elastic";
-
-    /** statistics **/
-    public static final String TIME_TRACKING = "time.tracking";
 
     /** logging of statistics **/
     public static final String LOGGING_STATISTICS = "logging.statistics";
@@ -101,31 +71,9 @@
         this.feedPolicy = feedPolicy;
     }
 
-    /** Failure recover/reporting **/
-
-    public boolean logDataOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
-    }
-
-    public boolean continueOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
-    }
-
-    public boolean continueOnHardwareFailure() {
-        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
-    }
-
-    public boolean autoRestartOnClusterReboot() {
-        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
-    }
-
-    public boolean atleastOnceSemantics() {
-        return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
-    }
-
     /** flow control **/
-    public boolean bufferingEnabled() {
-        return getBooleanPropertyValue(BUFFERING_ENABLED, false);
+    public boolean flowControlEnabled() {
+        return getBooleanPropertyValue(FLOWCONTROL_ENABLED, false);
     }
 
     public boolean spillToDiskOnCongestion() {
@@ -136,10 +84,6 @@
         return getMaxFractionDiscard() > 0;
     }
 
-    public boolean throttlingEnabled() {
-        return getBooleanPropertyValue(THROTTLING_ENABLED, false);
-    }
-
     public long getMaxSpillOnDisk() {
         return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
     }
@@ -148,28 +92,9 @@
         return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
     }
 
-    public long getMaxDelayRecordPersistence() {
-        return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, 
Long.MAX_VALUE);
-    }
-
-    /** Elasticity **/
-    public boolean isElastic() {
-        return getBooleanPropertyValue(ELASTIC, false);
-    }
-
-    /** Statistics **/
-    public boolean isTimeTrackingEnabled() {
-        return getBooleanPropertyValue(TIME_TRACKING, false);
-    }
-
-    /** Logging of statistics **/
-    public boolean isLoggingStatisticsEnabled() {
-        return getBooleanPropertyValue(LOGGING_STATISTICS, false);
-    }
-
     private boolean getBooleanPropertyValue(String key, boolean defValue) {
         String v = feedPolicy.get(key);
-        return v == null ? false : Boolean.valueOf(v);
+        return v == null ? defValue : Boolean.valueOf(v);
     }
 
     private long getLongPropertyValue(String key, long defValue) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
deleted file mode 100644
index 3483da4..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.policy;
-
-import java.util.Map;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedPolicyEnforcer {
-
-    private final FeedConnectionId connectionId;
-    private final FeedPolicyAccessor policyAccessor;
-
-    public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, 
String> feedPolicy) {
-        this.connectionId = feedConnectionId;
-        this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
-    }
-
-    public boolean continueIngestionPostSoftwareFailure(HyracksDataException 
e) throws HyracksDataException {
-        return policyAccessor.continueOnSoftFailure();
-    }
-
-    public FeedPolicyAccessor getFeedPolicyAccessor() {
-        return policyAccessor;
-    }
-
-    public FeedConnectionId getFeedId() {
-        return connectionId;
-    }
-
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 964508f..ea55b5b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -71,7 +71,8 @@
             }
             read = connectionStream.read(b, off, len);
         } catch (IOException e) {
-            e.printStackTrace();
+            // exception is expected when no connection available
+            LOGGER.info("Exhausted all pending connections. Waiting for new 
ones to come.");
             read = -1;
         }
         while (read < 0) {
@@ -155,11 +156,10 @@
     @Override
     public boolean handleException(Throwable th) {
         try {
-            accept();
+            return accept();
         } catch (IOException e) {
             LOGGER.warn("Failed accepting more connections", e);
             return false;
         }
-        return true;
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index bdc11f5..f289361 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -61,7 +61,7 @@
             ActiveRuntimeId runtimeId =
                     new ActiveRuntimeId(connectionId.getFeedId(), 
FeedRuntimeType.COLLECT.toString(), partition);
             FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
-            if (policyAccessor.bufferingEnabled()) {
+            if (policyAccessor.flowControlEnabled()) {
                 writer = new FeedRuntimeInputHandler(ctx, connectionId, 
runtimeId, writer, policyAccessor, tAccessor,
                         activeManager.getFramePool());
             } else {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index fbdbece..6732b15 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -30,7 +30,6 @@
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -55,10 +54,10 @@
     private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
 
     /**
-     * A policy enforcer that ensures dynamic decisions for a feed are taken
+     * A policy accessor that ensures dynamic decisions for a feed are taken
      * in accordance with the associated ingestion policy
      **/
-    private FeedPolicyEnforcer policyEnforcer;
+    private FeedPolicyAccessor policyAccessor;
 
     /**
      * A unique identifier for the feed instance. A feed instance represents
@@ -101,7 +100,7 @@
         this.ctx = ctx;
         this.coreOperator = 
(AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                 .createPushRuntime(ctx, recordDescProvider, partition, 
nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, 
feedPolicyProperties);
+        this.policyAccessor = new FeedPolicyAccessor(feedPolicyProperties);
         this.partition = partition;
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((IAppRuntimeContext) 
ctx.getJobletContext().getServiceContext()
@@ -127,9 +126,9 @@
 
     private void initializeNewFeedRuntime(ActiveRuntimeId runtimeId) throws 
Exception {
         fta = new 
FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(),
 0));
-        FeedPolicyAccessor fpa = policyEnforcer.getFeedPolicyAccessor();
+        FeedPolicyAccessor fpa = policyAccessor;
         coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        if (fpa.bufferingEnabled()) {
+        if (fpa.flowControlEnabled()) {
             writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, 
coreOperator, fpa, fta,
                     feedManager.getFramePool());
         } else {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 0bb27db..f2193af 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -52,10 +52,10 @@
     private AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
 
     /**
-     * A policy enforcer that ensures dyanmic decisions for a feed are taken
+     * A policy accessor that ensures dyanmic decisions for a feed are taken
      * in accordance with the associated ingestion policy
      **/
-    private final FeedPolicyEnforcer policyEnforcer;
+    private final FeedPolicyAccessor policyAccessor;
 
     /**
      * A unique identifier for the feed instance. A feed instance represents
@@ -94,7 +94,7 @@
         this.ctx = ctx;
         this.insertOperator = 
(AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                 .createPushRuntime(ctx, recordDescProvider, partition, 
nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, 
feedPolicyProperties);
+        this.policyAccessor = new FeedPolicyAccessor(feedPolicyProperties);
         this.partition = partition;
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((IAppRuntimeContext) 
ctx.getJobletContext().getServiceContext()
@@ -130,9 +130,9 @@
                 return;
             }
         }
-        if (policyEnforcer.getFeedPolicyAccessor().bufferingEnabled()) {
+        if (policyAccessor.flowControlEnabled()) {
             writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, 
insertOperator,
-                    policyEnforcer.getFeedPolicyAccessor(), fta, 
feedManager.getFramePool());
+                    policyAccessor, fta, feedManager.getFramePool());
         } else {
             writer = new SyncFeedRuntimeInputHandler(ctx, insertOperator, fta);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
index cc21360..9538711 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -69,8 +69,4 @@
     public static final class NamingConstants {
         public static final String LIBRARY_NAME_SEPARATOR = "#";
     }
-
-    public static class JobConstants {
-        public static final int DEFAULT_FRAME_SIZE = 8192;
-    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index d407b8a..3ce5be4 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -100,7 +100,7 @@
     private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill, 
boolean discard, long spillBudget,
             float discardFraction) {
         FeedPolicyAccessor fpa = Mockito.mock(FeedPolicyAccessor.class);
-        Mockito.when(fpa.bufferingEnabled()).thenReturn(true);
+        Mockito.when(fpa.flowControlEnabled()).thenReturn(true);
         Mockito.when(fpa.spillToDiskOnCongestion()).thenReturn(spill);
         Mockito.when(fpa.getMaxSpillOnDisk()).thenReturn(spillBudget);
         Mockito.when(fpa.discardOnCongestion()).thenReturn(discard);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 57284b0..dfa00ab 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -27,26 +27,18 @@
 
 public class BuiltinFeedPolicies {
 
-    public static final FeedPolicyEntity BRITTLE = initializeBrittlePolicy();
-
     public static final FeedPolicyEntity BASIC = initializeBasicPolicy();
-
-    public static final FeedPolicyEntity BASIC_FT = initializeBasicFTPolicy();
-
-    public static final FeedPolicyEntity ADVANCED_FT = 
initializeAdvancedFTPolicy();
 
     public static final FeedPolicyEntity ADVANCED_FT_DISCARD = 
initializeAdvancedFTDiscardPolicy();
 
     public static final FeedPolicyEntity ADVANCED_FT_SPILL = 
initializeAdvancedFTSpillPolicy();
 
-    public static final FeedPolicyEntity ADVANCED_FT_THROTTLE = 
initializeAdvancedFTThrottlePolicy();
-
     public static final FeedPolicyEntity ELASTIC = 
initializeAdvancedFTElasticPolicy();
 
-    public static final FeedPolicyEntity[] policies = new FeedPolicyEntity[] { 
BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
-            ADVANCED_FT_DISCARD, ADVANCED_FT_SPILL, ADVANCED_FT_THROTTLE, 
ELASTIC };
+    public static final FeedPolicyEntity[] policies = new FeedPolicyEntity[] { 
BASIC, ADVANCED_FT_DISCARD,
+            ADVANCED_FT_SPILL, ELASTIC };
 
-    public static final FeedPolicyEntity DEFAULT_POLICY = BASIC_FT;
+    public static final FeedPolicyEntity DEFAULT_POLICY = BASIC;
 
     public static final String CONFIG_FEED_POLICY_KEY = "policy";
 
@@ -59,132 +51,48 @@
         return null;
     }
 
-    //Brittle
-    private static FeedPolicyEntity initializeBrittlePolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "false");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, 
"false");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"false");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
-
-        String description = "Brittle";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"Brittle", description, policyParams);
-    }
-
     //Basic
     private static FeedPolicyEntity initializeBasicPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
 
         String description = "Basic";
         return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"Basic", description, policyParams);
     }
 
-    // BasicFT
-    private static FeedPolicyEntity initializeBasicFTPolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, 
"false");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"true");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, 
"false");
-        policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "1");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
-        policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "false");
-
-        String description = "Basic Monitored Fault-Tolerant";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"BasicFT", description, policyParams);
-    }
-
-    // AdvancedFT
-    private static FeedPolicyEntity initializeAdvancedFTPolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"true");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
-        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "true");
-
-        String description = "Basic Monitored Fault-Tolerant with at least 
once semantics";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"AdvancedFT", description, policyParams);
-    }
-
-    // AdvancedFT_Discard
+    // Discard
     private static FeedPolicyEntity initializeAdvancedFTDiscardPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.FLOWCONTROL_ENABLED, "true");
         policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "false");
         policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "100");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
         policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
 
-        String description = "AdvancedFT 100% Discard during congestion";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"AdvancedFT_Discard", description,
+        String description = "FlowControl 100% Discard during congestion";
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"Discard", description,
                 policyParams);
     }
 
-    // AdvancedFT_Spill
+    // Spill
     private static FeedPolicyEntity initializeAdvancedFTSpillPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.FLOWCONTROL_ENABLED, "true");
         policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + 
Boolean.TRUE);
         policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "" + 
FeedPolicyAccessor.NO_LIMIT);
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
 
-        String description = "AdvancedFT 100% Discard during congestion";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"AdvancedFT_Spill", description,
-                policyParams);
-    }
-
-    // AdvancedFT_Spill
-    private static FeedPolicyEntity initializeAdvancedFTThrottlePolicy() {
-        Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"true");
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + 
Boolean.FALSE);
-        policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "" + 0);
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
-        policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "true");
-
-        String description = "AdvancedFT Throttle during congestion";
-        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"AdvancedFT_Throttle", description,
+        String description = "FlowControl 100% Spill during congestion";
+        return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"Spill", description,
                 policyParams);
     }
 
     // AdvancedFT_Elastic
     private static FeedPolicyEntity initializeAdvancedFTElasticPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, 
"true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
-        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.FLOWCONTROL_ENABLED, "true");
         policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
-
         String description = "Basic Monitored Fault-Tolerant Elastic";
         return new FeedPolicyEntity(MetadataConstants.METADATA_DATAVERSE_NAME, 
"AdvancedFT_Elastic", description,
                 policyParams);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1591
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibc10139925cfedee66d1263990ba80b94675f182
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to