abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/767

Change subject: Fix Error Message for Unknown Format
......................................................................

Fix Error Message for Unknown Format

This change includes some refactoring of the external
data code. The refactoring makes the code less error-prone
and separates data source selection from data parser
selection.

Change-Id: Ib4aac833e30bd7c5a7706f5c8116383c2362c964
---
M 
asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
M 
asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
M asterix-app/src/test/resources/runtimets/testsuite.xml
M 
asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
A 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
D 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
D 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
D 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
D 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
R 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
A 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
A 
asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M 
asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
M 
asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
M 
asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
24 files changed, 408 insertions(+), 439 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/67/767/1

diff --git 
a/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
 
b/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
index 5a7294c..4ffc5a7 100644
--- 
a/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
+++ 
b/asterix-app/src/test/resources/runtimets/queries/external-indexing/rc-format/rc-format.1.ddl.aql
@@ -36,6 +36,11 @@
 
 create external dataset EmployeeDataset(EmployeeType)
 using hdfs
-(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/external-indexing-test.rc"),("input-format"="rc-input-format"),("format"="binary"),("parser"="hive-parser"),("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
+(("hdfs"="hdfs://127.0.0.1:31888"),
+("path"="/asterix/external-indexing-test.rc"),
+("input-format"="rc-input-format"),
+("format"="hdfs-writable"),
+("parser"="hive-parser"),
+("hive-serde"="org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"));
 
 create index EmployeeAgeIdx on EmployeeDataset(age);
diff --git 
a/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
 
b/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
index 000ef5b..4e306b3 100644
--- 
a/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
+++ 
b/asterix-app/src/test/resources/runtimets/queries/hdfs/large-record/large-record.1.ddl.aql
@@ -37,7 +37,7 @@
 create external dataset EmployeeDataset(EmployeeType)
 using adapter
 (("reader"="hdfs"),
-("parser"="delimited-text"),
+("format"="delimited-text"),
 ("hdfs"="hdfs://127.0.0.1:31888"),
 ("path"="/asterix/large-record.txt"),
 ("input-format"="text-input-format"),
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml 
b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 3a5140c..ae7333e 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -28,6 +28,38 @@
         ResultOffsetPath="results"
         QueryOffsetPath="queries"
         QueryFileExtension=".aql">
+    <test-group name="external-indexing">
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="text-format">
+                <output-dir compare="Text">text-format</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="sequence-format">
+                <output-dir compare="Text">sequence-format</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="rc-format">
+                <output-dir compare="Text">rc-format</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="rtree-index">
+                <output-dir compare="Text">rtree-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="leftouterjoin">
+                <output-dir compare="Text">leftouterjoin</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="external-indexing">
+            <compilation-unit name="leftouterjoin-rtree">
+                <output-dir compare="Text">leftouterjoin-rtree</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group>
     <test-group name="external-library">
         <test-case FilePath="external-library">
             <compilation-unit name="typed_adapter">
@@ -6387,38 +6419,6 @@
         <test-case FilePath="big-object">
             <compilation-unit name="big_object_load">
                 <output-dir compare="Text">big_object_load</output-dir>
-            </compilation-unit>
-        </test-case>
-    </test-group>
-    <test-group name="external-indexing">
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="text-format">
-                <output-dir compare="Text">text-format</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="sequence-format">
-                <output-dir compare="Text">sequence-format</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="rc-format">
-                <output-dir compare="Text">rc-format</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="rtree-index">
-                <output-dir compare="Text">rtree-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="leftouterjoin">
-                <output-dir compare="Text">leftouterjoin</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="external-indexing">
-            <compilation-unit name="leftouterjoin-rtree">
-                <output-dir compare="Text">leftouterjoin-rtree</output-dir>
             </compilation-unit>
         </test-case>
     </test-group>
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index a03ad1a..041f706 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -120,9 +120,14 @@
         this.metaType = metaType;
         this.configuration = configuration;
         dataSourceFactory = 
DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
-
         dataParserFactory = 
ParserFactoryProvider.getDataParserFactory(configuration);
-        prepare();
+        if (dataSourceFactory.isIndexible() && (files != null)) {
+            ((IIndexibleExternalDataSource) 
dataSourceFactory).setSnapshot(files, indexingOp);
+        }
+        dataSourceFactory.configure(configuration);
+        dataParserFactory.setRecordType(recordType);
+        dataParserFactory.setMetaType(metaType);
+        dataParserFactory.configure(configuration);
         
ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, 
dataParserFactory);
         configureFeedLogManager();
         nullifyExternalObjects();
@@ -143,16 +148,6 @@
         if 
(ExternalDataUtils.isExternal(configuration.get(ExternalDataConstants.KEY_PARSER)))
 {
             dataParserFactory = null;
         }
-    }
-
-    private void prepare() throws AsterixException {
-        if (dataSourceFactory.isIndexible() && (files != null)) {
-            ((IIndexibleExternalDataSource) 
dataSourceFactory).setSnapshot(files, indexingOp);
-        }
-        dataSourceFactory.configure(configuration);
-        dataParserFactory.setRecordType(recordType);
-        dataParserFactory.setMetaType(metaType);
-        dataParserFactory.configure(configuration);
     }
 
     @Override
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index de185e0..e78edca 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -25,15 +25,20 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingScheduler;
+import 
org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader;
 import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.io.Writable;
@@ -48,8 +53,7 @@
 import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
 
-public class HDFSDataSourceFactory
-        implements IInputStreamFactory, IRecordReaderFactory<Object>, 
IIndexibleExternalDataSource {
+public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, 
IIndexibleExternalDataSource {
 
     protected static final long serialVersionUID = 1L;
     protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
@@ -69,6 +73,7 @@
     private JobConf conf;
     private InputSplit[] inputSplits;
     private String nodeName;
+    private Format format;
 
     @Override
     public void configure(Map<String, String> configuration) throws 
AsterixException {
@@ -94,10 +99,14 @@
             inputSplitsFactory = new InputSplitsFactory(inputSplits);
             read = new boolean[readSchedule.length];
             Arrays.fill(read, false);
-            if 
(!ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM))
 {
+            String formatString = 
configuration.get(ExternalDataConstants.KEY_FORMAT);
+            if (formatString == null || 
formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
                 RecordReader<?, ?> reader = 
conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
                 this.recordClass = reader.createValue().getClass();
                 reader.close();
+            } else {
+                format = 
StreamRecordReaderProvider.getReaderFormat(configuration);
+                this.recordClass = char[].class;
             }
         } catch (IOException e) {
             throw new AsterixException(e);
@@ -117,8 +126,8 @@
      * 1. when target files are not null, it generates a file aware input 
stream that validate
      * against the files
      * 2. if the data is binary, it returns a generic reader */
-    @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int 
partition) throws HyracksDataException {
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int 
partition, IExternalIndexer indexer)
+            throws HyracksDataException {
         try {
             if (!configured) {
                 conf = confFactory.getConf();
@@ -126,7 +135,7 @@
                 nodeName = 
ctx.getJobletContext().getApplicationContext().getNodeId();
                 configured = true;
             }
-            return new HDFSInputStream(read, inputSplits, readSchedule, 
nodeName, conf, configuration, files);
+            return new HDFSInputStream(read, inputSplits, readSchedule, 
nodeName, conf, configuration, files, indexer);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
@@ -171,14 +180,24 @@
     }
 
     @Override
-    public IRecordReader<? extends Writable> 
createRecordReader(IHyracksTaskContext ctx, int partition)
+    public IRecordReader<? extends Object> 
createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
         try {
+            IExternalIndexer indexer = files == null ? null : 
ExternalIndexerProvider.getIndexer(configuration);
+            if (format != null) {
+                StreamRecordReader streamReader = 
StreamRecordReaderProvider.createRecordReader(format,
+                        createInputStream(ctx, partition, indexer), 
configuration);
+                if (indexer != null) {
+                    return new IndexingStreamRecordReader(streamReader, 
indexer);
+                } else {
+                    return streamReader;
+                }
+            }
             JobConf conf = confFactory.getConf();
             InputSplit[] inputSplits = inputSplitsFactory.getSplits();
             String nodeName = 
ctx.getJobletContext().getApplicationContext().getNodeId();
             return new HDFSRecordReader<Object, Writable>(read, inputSplits, 
readSchedule, nodeName, conf, files,
-                    files == null ? null : 
ExternalIndexerProvider.getIndexer(configuration));
+                    indexer);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
new file mode 100644
index 0000000..2c2dd98
--- /dev/null
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.api.IIndexingDatasource;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Wraps a {@link StreamRecordReader} so that it can also act as an {@link IIndexingDatasource}.
+ * <p>
+ * All {@link IRecordReader} operations are delegated unchanged to the wrapped reader; the wrapper only adds
+ * {@link #getIndexer()}, which returns the indexer supplied at construction.
+ * <p>
+ * NOTE(review): {@code HDFSDataSourceFactory#createRecordReader} also hands this same indexer to the
+ * {@code HDFSInputStream} that feeds the wrapped reader, so per-record positions are presumably tracked
+ * there rather than here - confirm.
+ */
+public class IndexingStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
+
+    private final StreamRecordReader reader;
+    private final IExternalIndexer indexer;
+
+    /**
+     * @param reader
+     *            the stream record reader every record operation is delegated to
+     * @param indexer
+     *            the indexer returned by {@link #getIndexer()}
+     */
+    public IndexingStreamRecordReader(StreamRecordReader reader, IExternalIndexer indexer) {
+        this.reader = reader;
+        this.indexer = indexer;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public IExternalIndexer getIndexer() {
+        return indexer;
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return reader.hasNext();
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        return reader.next();
+    }
+
+    @Override
+    public boolean stop() {
+        return reader.stop();
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        reader.setController(controller);
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+        reader.setFeedLogManager(feedLogManager);
+    }
+
+    /**
+     * This wrapper does not track a file snapshot of its own.
+     * NOTE(review): confirm no indexer relies on this value for stream-based readers.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public List<ExternalFile> getSnapshot() {
+        return null;
+    }
+
+    /**
+     * This wrapper does not track input splits of its own.
+     *
+     * @return always {@code -1}
+     */
+    @Override
+    public int getCurrentSplitIndex() {
+        return -1;
+    }
+
+    /**
+     * This wrapper does not expose a Hadoop {@link RecordReader}.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public RecordReader<?, ? extends Writable> getReader() {
+        return null;
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return reader.handleException(th);
+    }
+
+}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
deleted file mode 100644
index 2c82f47..0000000
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IInputStreamFactory;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class AbstractStreamRecordReaderFactory<T>
-        implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
-
-    private static final long serialVersionUID = 1L;
-    protected IInputStreamFactory inputStreamFactory;
-    protected Map<String, String> configuration;
-
-    public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
-            IInputStreamFactory inputStreamFactory) {
-        this.inputStreamFactory = inputStreamFactory;
-        return this;
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.RECORDS;
-    }
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() 
throws AsterixException {
-        return inputStreamFactory.getPartitionConstraint();
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws 
AsterixException {
-        this.configuration = configuration;
-        inputStreamFactory.configure(configuration);
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return inputStreamFactory.isIndexible();
-    }
-
-    @Override
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
-        ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, 
indexingOp);
-    }
-
-    @Override
-    public boolean isIndexingOp() {
-        if (inputStreamFactory.isIndexible()) {
-            return ((IIndexibleExternalDataSource) 
inputStreamFactory).isIndexingOp();
-        }
-        return false;
-    }
-
-    protected Pair<AsterixInputStream, IExternalIndexer> 
getStreamAndIndexer(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        AsterixInputStream inputStream = 
inputStreamFactory.createInputStream(ctx, partition);
-        IExternalIndexer indexer = null;
-        if (inputStreamFactory.isIndexible()) {
-            if (((IIndexibleExternalDataSource) 
inputStreamFactory).isIndexingOp()) {
-                indexer = ((IIndexingDatasource) inputStream).getIndexer();
-            }
-        }
-        return new Pair<AsterixInputStream, IExternalIndexer>(inputStream, 
indexer);
-    }
-}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 6964a82..aa0451a 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -21,13 +21,12 @@
 import java.io.IOException;
 
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
 
-public class EmptyLineSeparatedRecordReader extends AbstractStreamRecordReader 
{
+public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
 
-    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream, 
IExternalIndexer indexer) {
-        super(inputStream, indexer);
+    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) {
+        super(inputStream);
     }
 
     private boolean prevCharCR;
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
deleted file mode 100644
index 063ed11..0000000
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReaderFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class EmptyLineSeparatedRecordReaderFactory extends 
AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, 
int partition)
-            throws HyracksDataException {
-        final Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = 
getStreamAndIndexer(ctx, partition);
-        return new EmptyLineSeparatedRecordReader(streamAndIndexer.first, 
streamAndIndexer.second);
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 3089295..8572fc7 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -21,19 +21,17 @@
 import java.io.IOException;
 
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class LineRecordReader extends AbstractStreamRecordReader {
+public class LineRecordReader extends StreamRecordReader {
 
     protected boolean prevCharCR;
     protected int newlineLength;
     protected int recordNumber = 0;
 
-    public LineRecordReader(final boolean hasHeader, final AsterixInputStream 
stream, final IExternalIndexer indexer)
-            throws HyracksDataException {
-        super(stream, indexer);
+    public LineRecordReader(final boolean hasHeader, final AsterixInputStream 
stream) throws HyracksDataException {
+        super(stream);
         try {
             if (hasHeader) {
                 if (hasNext()) {
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
deleted file mode 100644
index 4d44001..0000000
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class LineRecordReaderFactory extends 
AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<? extends char[]> 
createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        String quoteString = 
configuration.get(ExternalDataConstants.KEY_QUOTE);
-        boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
-        Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = 
getStreamAndIndexer(ctx, partition);
-        if (quoteString != null) {
-            return new QuotedLineRecordReader(hasHeader, 
streamAndIndexer.first, streamAndIndexer.second, quoteString);
-        } else {
-            return new LineRecordReader(hasHeader, streamAndIndexer.first, 
streamAndIndexer.second);
-        }
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-
-}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index abd2952..515e0e5 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -32,9 +32,9 @@
     private boolean prevCharEscape;
     private boolean inQuote;
 
-    public QuotedLineRecordReader(final boolean hasHeader, final 
AsterixInputStream stream,
-            final IExternalIndexer indexer, final String quoteString) throws 
HyracksDataException {
-        super(hasHeader, stream, indexer);
+    public QuotedLineRecordReader(final boolean hasHeader, final 
AsterixInputStream stream, final String quoteString)
+            throws HyracksDataException {
+        super(hasHeader, stream);
         if ((quoteString == null) || (quoteString.length() != 1)) {
             throw new 
HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
                     ExternalDataConstants.KEY_QUOTE, 
ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 7339bfd..26ac3cb 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,13 +20,13 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
+public class SemiStructuredRecordReader extends StreamRecordReader {
 
     private int depth;
     private boolean prevCharEscape;
@@ -35,13 +35,13 @@
     private char recordEnd;
     private int recordNumber = 0;
 
-    public SemiStructuredRecordReader(AsterixInputStream stream, 
IExternalIndexer indexer, String recStartString,
-            String recEndString) throws AsterixException {
-        super(stream, indexer);
+    public SemiStructuredRecordReader(AsterixInputStream stream, String 
recStartString, String recEndString)
+            throws HyracksDataException {
+        super(stream);
         // set record opening char
         if (recStartString != null) {
             if (recStartString.length() != 1) {
-                throw new AsterixException(
+                throw new HyracksDataException(
                         
ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
                                 ExternalDataConstants.PARAMETER_OF_SIZE_ONE, 
recStartString));
             }
@@ -52,7 +52,7 @@
         // set record ending char
         if (recEndString != null) {
             if (recEndString.length() != 1) {
-                throw new AsterixException(
+                throw new HyracksDataException(
                         
ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
                                 ExternalDataConstants.PARAMETER_OF_SIZE_ONE, 
recEndString));
             }
@@ -67,7 +67,7 @@
     }
 
     @Override
-    public boolean hasNext() throws Exception {
+    public boolean hasNext() throws IOException {
         if (done) {
             return false;
         }
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
deleted file mode 100644
index 0f50204..0000000
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.stream;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class SemiStructuredRecordReaderFactory extends 
AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<? extends char[]> 
createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        Pair<AsterixInputStream, IExternalIndexer> streamAndIndexer = 
getStreamAndIndexer(ctx, partition);
-        try {
-            return new SemiStructuredRecordReader(streamAndIndexer.first, 
streamAndIndexer.second,
-                    configuration.get(ExternalDataConstants.KEY_RECORD_START),
-                    configuration.get(ExternalDataConstants.KEY_RECORD_END));
-        } catch (AsterixException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
similarity index 72%
rename from 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
rename to 
asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 7d6c1f3..57ef3ae 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -19,35 +19,27 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordReader;
 
-public abstract class AbstractStreamRecordReader implements 
IRecordReader<char[]>, IIndexingDatasource {
+public abstract class StreamRecordReader implements IRecordReader<char[]> {
     protected final AsterixInputStreamReader reader;
     protected CharArrayRecord record;
     protected char[] inputBuffer;
     protected int bufferLength = 0;
     protected int bufferPosn = 0;
-    protected final IExternalIndexer indexer;
     protected boolean done = false;
     protected FeedLogManager feedLogManager;
 
-    public AbstractStreamRecordReader(AsterixInputStream inputStream, 
IExternalIndexer indexer) {
+    public StreamRecordReader(AsterixInputStream inputStream) {
         this.reader = new AsterixInputStreamReader(inputStream);
-        this.indexer = indexer;
         record = new CharArrayRecord();
         inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
     }
@@ -66,11 +58,6 @@
     }
 
     @Override
-    public IExternalIndexer getIndexer() {
-        return indexer;
-    }
-
-    @Override
     public boolean stop() {
         try {
             reader.stop();
@@ -82,35 +69,25 @@
     }
 
     @Override
+    public abstract boolean hasNext() throws IOException;
+
+    /**
+     * Records the feed log manager in the protected {@code feedLogManager} field and forwards it to
+     * the underlying {@link AsterixInputStreamReader}.
+     */
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) {
+        this.feedLogManager = feedLogManager;
+        reader.setFeedLogManager(feedLogManager);
+    }
+
+    @Override
     public void setController(AbstractFeedDataFlowController controller) {
         reader.setController(controller);
     }
 
     @Override
-    public void setFeedLogManager(FeedLogManager feedLogManager) {
-        this.feedLogManager = feedLogManager;
-        reader.setFeedLogManager(feedLogManager);
-    }
-
-    @Override
     public boolean handleException(Throwable th) {
         return reader.handleException(th);
-    }
-
-    //TODO: Fix the following method since they don't fit
-    //Already the fix is in another local branch
-    @Override
-    public List<ExternalFile> getSnapshot() {
-        return null;
-    }
-
-    @Override
-    public int getCurrentSplitIndex() {
-        return -1;
-    }
-
-    @Override
-    public RecordReader<?, Writable> getReader() {
-        return null;
     }
 }
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
new file mode 100644
index 0000000..f743a3f
--- /dev/null
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> 
{
+
+    private static final long serialVersionUID = 1L;
+    protected final IInputStreamFactory streamFactory;
+    protected Map<String, String> configuration;
+    protected Format format;
+
+    public StreamRecordReaderFactory(IInputStreamFactory inputStreamFactory) {
+        this.streamFactory = inputStreamFactory;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() 
throws AsterixException {
+        return streamFactory.getPartitionConstraint();
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws 
AsterixException {
+        this.configuration = configuration;
+        streamFactory.configure(configuration);
+        format = StreamRecordReaderProvider.getReaderFormat(configuration);
+    }
+
+    @Override
+    public IRecordReader<? extends char[]> 
createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return StreamRecordReaderProvider.createRecordReader(format, 
streamFactory.createInputStream(ctx, partition),
+                configuration);
+    }
+}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index 063b8fa..997c254 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -28,7 +28,6 @@
 import org.apache.asterix.external.api.IIndexingDatasource;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
-import org.apache.asterix.external.provider.ExternalIndexerProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,8 +62,8 @@
 
     @SuppressWarnings("unchecked")
     public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] 
readSchedule, String nodeName,
-            JobConf conf, Map<String, String> configuration, 
List<ExternalFile> snapshot)
-                    throws IOException, AsterixException {
+            JobConf conf, Map<String, String> configuration, 
List<ExternalFile> snapshot, IExternalIndexer indexer)
+            throws IOException, AsterixException {
         this.read = read;
         this.inputSplits = inputSplits;
         this.readSchedule = readSchedule;
@@ -74,15 +73,13 @@
         this.reader = new EmptyRecordReader<Object, Text>();
         this.snapshot = snapshot;
         this.hdfs = FileSystem.get(conf);
+        this.indexer = indexer;
         nextInputSplit();
         this.value = new Text();
         if (snapshot != null) {
-            this.indexer = ExternalIndexerProvider.getIndexer(configuration);
             if (currentSplitIndex < snapshot.size()) {
                 indexer.reset(this);
             }
-        } else {
-            this.indexer = null;
         }
     }
 
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index f8d64e0..4e4aaf2 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -29,11 +29,10 @@
 import 
org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory;
 import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory;
 import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory;
-import 
org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory;
-import 
org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
-import 
org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
+import 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
 import 
org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 import 
org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
+import 
org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 import 
org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import 
org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -53,21 +52,18 @@
         }
     }
 
-    public static IInputStreamFactory getInputStreamFactory(String 
streamSource,
-            Map<String, String> configuration) throws AsterixException {
+    public static IInputStreamFactory getInputStreamFactory(String 
streamSource, Map<String, String> configuration)
+            throws AsterixException {
         IInputStreamFactory streamSourceFactory;
         if (ExternalDataUtils.isExternal(streamSource)) {
             String dataverse = ExternalDataUtils.getDataverse(configuration);
             streamSourceFactory = 
ExternalDataUtils.createExternalInputStreamFactory(dataverse, streamSource);
         } else {
             switch (streamSource) {
-                case ExternalDataConstants.STREAM_HDFS:
-                    streamSourceFactory = new HDFSDataSourceFactory();
-                    break;
                 case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
                     streamSourceFactory = new LocalFSInputStreamFactory();
                     break;
-                case ExternalDataConstants.STREAM_SOCKET:
+                case ExternalDataConstants.SOCKET:
                 case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
                     streamSourceFactory = new SocketServerInputStreamFactory();
                     break;
@@ -89,59 +85,27 @@
         if (reader.equals(ExternalDataConstants.EXTERNAL)) {
             return 
ExternalDataUtils.createExternalRecordReaderFactory(configuration);
         }
-        String parser = configuration.get(ExternalDataConstants.KEY_PARSER);
-        IInputStreamFactory inputStreamFactory;
-        switch (parser) {
-            case ExternalDataConstants.FORMAT_ADM:
-            case ExternalDataConstants.FORMAT_JSON:
-            case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                inputStreamFactory = 
DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                return new 
SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                inputStreamFactory = 
DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                return new 
EmptyLineSeparatedRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-            case ExternalDataConstants.FORMAT_CSV:
-                inputStreamFactory = 
DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                return new 
LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            case ExternalDataConstants.FORMAT_RECORD_WITH_METADATA:
-                switch (reader) {
-                    case ExternalDataConstants.READER_KV:
-                        return new KVReaderFactory();
-                    case ExternalDataConstants.READER_KV_TEST:
-                        return new KVTestReaderFactory();
-                }
-        }
-        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
-        if (format != null) {
-            switch (format) {
-                case ExternalDataConstants.FORMAT_ADM:
-                case ExternalDataConstants.FORMAT_JSON:
-                case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                    inputStreamFactory = 
DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                    return new 
SemiStructuredRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-                case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                    inputStreamFactory = 
DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                    return new EmptyLineSeparatedRecordReaderFactory()
-                            .setInputStreamFactoryProvider(inputStreamFactory);
-                case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-                case ExternalDataConstants.FORMAT_CSV:
-                    inputStreamFactory = 
DatasourceFactoryProvider.getInputStreamFactory(reader, configuration);
-                    return new 
LineRecordReaderFactory().setInputStreamFactoryProvider(inputStreamFactory);
-            }
-        }
         switch (reader) {
-            case ExternalDataConstants.READER_HDFS:
-                return new HDFSDataSourceFactory();
-            case ExternalDataConstants.READER_TWITTER_PULL:
-            case ExternalDataConstants.READER_TWITTER_PUSH:
-                return new TwitterRecordReaderFactory();
             case ExternalDataConstants.READER_KV:
                 return new KVReaderFactory();
             case ExternalDataConstants.READER_KV_TEST:
                 return new KVTestReaderFactory();
+            case ExternalDataConstants.READER_HDFS:
+                return new HDFSDataSourceFactory();
+            case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
+                return new StreamRecordReaderFactory(new 
LocalFSInputStreamFactory());
+            case ExternalDataConstants.READER_TWITTER_PULL:
+            case ExternalDataConstants.READER_TWITTER_PUSH:
+                return new TwitterRecordReaderFactory();
             case ExternalDataConstants.TEST_RECORD_WITH_PK:
                 return new RecordWithPKTestReaderFactory();
+            case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
+                return new StreamRecordReaderFactory(new 
TwitterFirehoseStreamFactory());
+            case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
+            case ExternalDataConstants.SOCKET:
+                return new StreamRecordReaderFactory(new 
SocketServerInputStreamFactory());
+            case ExternalDataConstants.STREAM_SOCKET_CLIENT:
+                return new StreamRecordReaderFactory(new 
SocketClientInputStreamFactory());
             default:
                 throw new AsterixException("unknown record reader factory: " + 
reader);
         }
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
new file mode 100644
index 0000000..ea8bc98
--- /dev/null
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Selects and creates the character-stream record reader that matches the {@code format}
+ * parameter of an external data source configuration.
+ */
+public class StreamRecordReaderProvider {
+
+    /** Reader families that {@link #createRecordReader} can instantiate. */
+    public enum Format {
+        SEMISTRUCTURED,
+        CSV,
+        LINE_SEPARATED
+    }
+
+    /**
+     * Resolves the reader format from the {@link ExternalDataConstants#KEY_FORMAT} entry:
+     * {@code FORMAT_ADM}, {@code FORMAT_JSON} and {@code FORMAT_SEMISTRUCTURED} map to
+     * {@link Format#SEMISTRUCTURED}, {@code FORMAT_LINE_SEPARATED} to {@link Format#LINE_SEPARATED},
+     * and {@code FORMAT_DELIMITED_TEXT} and {@code FORMAT_CSV} to {@link Format#CSV}.
+     *
+     * @param configuration external data source configuration
+     * @return the reader format to use
+     * @throws AsterixException if the format parameter is missing or names an unsupported format
+     */
+    public static Format getReaderFormat(Map<String, String> configuration) throws AsterixException {
+        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (format == null) {
+            throw new AsterixException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT);
+        }
+        switch (format) {
+            case ExternalDataConstants.FORMAT_ADM:
+            case ExternalDataConstants.FORMAT_JSON:
+            case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
+                return Format.SEMISTRUCTURED;
+            case ExternalDataConstants.FORMAT_LINE_SEPARATED:
+                return Format.LINE_SEPARATED;
+            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
+            case ExternalDataConstants.FORMAT_CSV:
+                return Format.CSV;
+            default:
+                throw new AsterixException("Unknown format: " + format);
+        }
+    }
+
+    /**
+     * Creates a record reader of the given format over an input stream. For {@link Format#CSV} a
+     * {@link QuotedLineRecordReader} is used when {@link ExternalDataConstants#KEY_QUOTE} is set,
+     * otherwise a {@link LineRecordReader}; {@link Format#SEMISTRUCTURED} reads the optional record
+     * start/end characters from {@link ExternalDataConstants#KEY_RECORD_START} and
+     * {@link ExternalDataConstants#KEY_RECORD_END}.
+     *
+     * @param format reader format, normally obtained from {@link #getReaderFormat(Map)}
+     * @param inputStream stream the reader consumes
+     * @param configuration external data source configuration
+     * @return a new record reader over {@code inputStream}
+     * @throws HyracksDataException if {@code format} is null or unknown, or the reader rejects its
+     *             parameters
+     */
+    public static StreamRecordReader createRecordReader(Format format, AsterixInputStream inputStream,
+            Map<String, String> configuration) throws HyracksDataException {
+        if (format == null) {
+            // Without this check the switch below would throw an uninformative NullPointerException.
+            throw new HyracksDataException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT);
+        }
+        switch (format) {
+            case CSV:
+                String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+                boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
+                if (quoteString != null) {
+                    return new QuotedLineRecordReader(hasHeader, inputStream, quoteString);
+                } else {
+                    return new LineRecordReader(hasHeader, inputStream);
+                }
+            case LINE_SEPARATED:
+                return new EmptyLineSeparatedRecordReader(inputStream);
+            case SEMISTRUCTURED:
+                return new SemiStructuredRecordReader(inputStream,
+                        configuration.get(ExternalDataConstants.KEY_RECORD_START),
+                        configuration.get(ExternalDataConstants.KEY_RECORD_END));
+            default:
+                throw new HyracksDataException("Unknown format: " + format);
+        }
+    }
+}
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a02152b..609b894 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -156,7 +156,7 @@
      */
     public static final String STREAM_HDFS = "hdfs";
     public static final String STREAM_LOCAL_FILESYSTEM = "localfs";
-    public static final String STREAM_SOCKET = "socket";
+    public static final String SOCKET = "socket";
     public static final String STREAM_SOCKET_CLIENT = "socket-client";
 
     /**
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 5ab41af..23e12bd 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -60,27 +60,32 @@
 
+    /**
+     * Builds one file split per location of an absolute partition constraint. Each split lives on the
+     * node's first cluster partition under 'storage dir name'/partition_#/dataverse/feed/adapter_#.
+     *
+     * @throws AsterixException if the constraint is a count constraint rather than an absolute one
+     */
     public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
             AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
         }
         File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-        String[] locations = null;
-        locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
+        String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         List<FileSplit> splits = new ArrayList<FileSplit>();
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         int i = 0;
         for (String nd : locations) {
             // Always get the first partition
             ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
             String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
                     nodePartition.getPartitionId());
             // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
             File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
                     + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
             splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
             i++;
         }
         return splits.toArray(new FileSplit[] {});
     }
 
     public static FileReference getAbsoluteFileRef(String relativePath, int 
ioDeviceId, IIOManager ioManager) {
diff --git 
a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
 
b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index d822310..354aedb 100644
--- 
a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ 
b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -73,7 +73,7 @@
                         new FileSplit[] { new FileSplit("",
                                 new 
FileReference(Paths.get(getClass().getResource(path).toURI()).toFile())) },
                         null, null, 0, null, false);
-                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, null, "[", "]");
+                SemiStructuredRecordReader recordReader = new 
SemiStructuredRecordReader(in, "[", "]");
                 Value val = new Value(objectPool);
                 while (recordReader.hasNext()) {
                     val.reset();
diff --git 
a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
 
b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 851a7e0..fc6e725 100644
--- 
a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ 
b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -86,7 +86,7 @@
                     null, null, 0, null, false);
 
             // create reader record reader
-            QuotedLineRecordReader lineReader = new 
QuotedLineRecordReader(true, inputStream, null,
+            QuotedLineRecordReader lineReader = new 
QuotedLineRecordReader(true, inputStream,
                     ExternalDataConstants.DEFAULT_QUOTE);
             // create csv with json record reader
             CSVToRecordWithMetadataAndPKConverter recordConverter = new 
CSVToRecordWithMetadataAndPKConverter(
diff --git 
a/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
 
b/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
index 7aa1129..e6b7e21 100644
--- 
a/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
+++ 
b/asterix-installer/src/test/resources/transactionts/queries/query_after_restart/external_index/external_index.2.ddl.aql
@@ -36,7 +36,11 @@
 
 create external dataset EmployeeDataset(EmployeeType)
 using hdfs
-(("hdfs"="hdfs://127.0.0.1:31888"),("path"="/asterix/external-indexing-test.txt"),("input-format"="text-input-format"),("format"="delimited-text"),("delimiter"="|"));
+(("hdfs"="hdfs://127.0.0.1:31888"),
+("path"="/asterix/external-indexing-test.txt"),
+("input-format"="text-input-format"),
+("format"="delimited-text"),
+("delimiter"="|"));
 
 create index EmployeeAgeIdx on EmployeeDataset(age);
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/767
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib4aac833e30bd7c5a7706f5c8116383c2362c964
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to