abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/529

Change subject: ASTERIXDB-1044 Allow Reading large records from HDFS
......................................................................

ASTERIXDB-1044 Allow Reading large records from HDFS

This change enable HDFS input stream to stream records larger than 8192.
Before this change, the stream will return 0 bytes if the record doesn't
fit in the reader buffer. The reader used by the delimited data parser
throws an exception if it gets back 0 bytes. With this change, a record
is streamed to the reader in multiple read calls. A test case was added
for this case.

Change-Id: I3cf52be4bc0fd8af2555062eeb421d7235088d98
---
M asterix-app/data/hdfs/external-indexing-test.txt
M asterix-app/src/test/resources/runtimets/testsuite.xml
M 
asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
6 files changed, 72 insertions(+), 48 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/29/529/1

diff --git a/asterix-app/data/hdfs/external-indexing-test.txt 
b/asterix-app/data/hdfs/external-indexing-test.txt
index 555272e..bc2dc1b 100644
--- a/asterix-app/data/hdfs/external-indexing-test.txt
+++ b/asterix-app/data/hdfs/external-indexing-test.txt
@@ -1,5 +1,5 @@
 1|Steve|50
-2|John|23
+2|JohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJoh
 
nJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohn
 
JohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJ
 
ohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJo
 
hnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJoh
 
nJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohn
 
JohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJ
 
ohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJo
 
hnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJoh
 
nJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohn|23
 3|Samuel|22
 4|Mary|29
 5|William|75
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml 
b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 430fd8e..f27744a 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -21,11 +21,7 @@
         <!ENTITY RecordsQueries SYSTEM "queries/records/RecordsQueries.xml">
 
         ]>
-<test-suite
-        xmlns="urn:xml.testframework.asterix.apache.org"
-        ResultOffsetPath="results"
-        QueryOffsetPath="queries"
-        QueryFileExtension=".aql">
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" 
ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
     <test-group name="flwor">
         <test-case FilePath="flwor">
             <compilation-unit name="at00">
@@ -1351,10 +1347,10 @@
         -->
     </test-group>
     <test-group name="dml">
-         <test-case FilePath="dml">
+        <test-case FilePath="dml">
             <compilation-unit name="insert-duplicated-keys-from-query">
                 <output-dir 
compare="Text">insert-duplicated-keys-from-query</output-dir>
-                
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
+                
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
 
                 </expected-error>
             </compilation-unit>
         </test-case>
@@ -1506,7 +1502,7 @@
         <test-case FilePath="dml">
             <compilation-unit name="insert-duplicated-keys">
                 <output-dir compare="Text">insert-duplicated-keys</output-dir>
-                
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
+                
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
 
                 </expected-error>
             </compilation-unit>
         </test-case>
@@ -2962,7 +2958,7 @@
             <test-case FilePath="open-index-enforced/error-checking">
                 <compilation-unit name="enforced-field-name-collision">
                     <output-dir 
compare="Text">enforced-field-name-collision</output-dir>
-                    
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
+                    
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
 
                     </expected-error>
                 </compilation-unit>
             </test-case>
@@ -2975,14 +2971,14 @@
             <test-case FilePath="open-index-enforced/error-checking">
                 <compilation-unit name="missing-enforce-statement">
                     <output-dir 
compare="Text">missing-enforce-statement</output-dir>
-                    
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
+                    
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
 
                     </expected-error>
                 </compilation-unit>
             </test-case>
             <test-case FilePath="open-index-enforced/error-checking">
                 <compilation-unit name="index-on-closed-type">
                     <output-dir 
compare="Text">index-on-closed-type</output-dir>
-                    
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
+                    
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
 
                     </expected-error>
                 </compilation-unit>
             </test-case>
@@ -6190,10 +6186,9 @@
             </compilation-unit>
         </test-case>
         -->
-
     </test-group>
     <test-group name="hdfs">
-      <test-case FilePath="hdfs">
+        <test-case FilePath="hdfs">
             <compilation-unit name="hdfs_shortcircuit">
                 <output-dir compare="Text">hdfs_shortcircuit</output-dir>
             </compilation-unit>
@@ -6768,31 +6763,26 @@
                 <output-dir compare="Text">parse</output-dir>
             </compilation-unit>
         </test-case>
-
         <test-case FilePath="binary">
             <compilation-unit name="print">
                 <output-dir compare="Text">print</output-dir>
             </compilation-unit>
         </test-case>
-
         <test-case FilePath="binary">
             <compilation-unit name="concat">
                 <output-dir compare="Text">concat</output-dir>
             </compilation-unit>
         </test-case>
-
         <test-case FilePath="binary">
             <compilation-unit name="subbinary">
                 <output-dir compare="Text">subbinary</output-dir>
             </compilation-unit>
         </test-case>
-
         <test-case FilePath="binary">
             <compilation-unit name="find">
                 <output-dir compare="Text">find</output-dir>
             </compilation-unit>
         </test-case>
-
         <test-case FilePath="binary">
             <compilation-unit name="insert">
                 <output-dir compare="Text">insert</output-dir>
@@ -6876,4 +6866,4 @@
             </compilation-unit>
         </test-case>
     </test-group>
-</test-suite>
+</test-suite>
\ No newline at end of file
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
index 6642df1..6ff8741 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
@@ -245,8 +245,15 @@
         while (!frameProcessed) {
             try {
                 if (!bufferingEnabled) {
-                    coreOperator.nextFrame(frame); // synchronous
-                    mBuffer.sendReport(frame);
+                    if (frame != null) {
+                        coreOperator.nextFrame(frame); // synchronous
+                        mBuffer.sendReport(frame);
+                    } else {
+                        this.finished = true;
+                        synchronized (coreOperator) {
+                            coreOperator.notifyAll();
+                        }
+                    }
                 } else {
                     DataBucket bucket = pool.getDataBucket();
                     if (bucket != null) {
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
index 2bd2a95..aa70f69 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
@@ -36,6 +36,7 @@
 public abstract class AbstractHDFSLookupInputStream extends InputStream {
 
     protected String pendingValue = null;
+    private int pendingValuePosition = 0;
     protected FileSystem fs;
     protected int fileNumber = -1;
     protected int EOL = "\n".getBytes()[0];
@@ -53,12 +54,17 @@
     public int read(byte[] buffer, int offset, int len) throws IOException {
         if (pendingValue != null) {
             int size = pendingValue.length() + 1;
-            if (size > len) {
-                return 0;
+            if (size - pendingValuePosition > len) {
+                //copy partial record
+                System.arraycopy(pendingValue.getBytes(), 
pendingValuePosition, buffer, offset, len);
+                pendingValuePosition += len;
+                return len;
             }
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset, 
pendingValue.length());
-            buffer[offset + pendingValue.length()] = (byte) EOL;
+            System.arraycopy(pendingValue.getBytes(), pendingValuePosition, 
buffer, offset,
+                    pendingValue.length() - pendingValuePosition);
+            buffer[offset + pendingValue.length() - pendingValuePosition] = 
(byte) EOL;
             pendingValue = null;
+            pendingValuePosition = 0;
             return size;
         }
         return -1;
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
index 797b961..36ff5d1 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
@@ -47,6 +47,7 @@
     private boolean hasMore = false;
     private int EOL = "\n".getBytes()[0];
     private Text pendingValue = null;
+    private int pendingValuePosition = 0;
     private int currentSplitIndex = 0;
     private String fileName;
     private long recordOffset;
@@ -182,13 +183,18 @@
         int numBytes = 0;
         if (pendingValue != null) {
             int sizeOfNextTuple = pendingValue.getLength() + 1;
-            if (sizeOfNextTuple > len) {
-                return 0;
+            if ((sizeOfNextTuple - pendingValuePosition) > len) {
+                //copy partial record
+                System.arraycopy(pendingValue.getBytes(), 
pendingValuePosition, buffer, offset, len);
+                pendingValuePosition += len;
+                return len;
             }
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + 
numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
+            System.arraycopy(pendingValue.getBytes(), pendingValuePosition, 
buffer, offset,
+                    pendingValue.getLength() - pendingValuePosition);
+            buffer[offset + pendingValue.getLength() - pendingValuePosition] = 
(byte) EOL;
+            numBytes += pendingValue.getLength() - pendingValuePosition + 1;
             pendingValue = null;
+            pendingValuePosition = 0;
             return numBytes;
         }
         if (numBytes < len) {
@@ -205,7 +211,10 @@
                         int sizeOfNextTuple = value.getLength() + 1;
                         if (numBytes + sizeOfNextTuple > len) {
                             pendingValue = value;
-                            return 0;
+                            //copy partial record
+                            System.arraycopy(pendingValue.getBytes(), 0, 
buffer, offset, len);
+                            pendingValuePosition += len;
+                            return len;
                         } else {
                             System.arraycopy(value.getBytes(), 0, buffer, 
offset + numBytes, value.getLength());
                             buffer[offset + numBytes + value.getLength()] = 
(byte) EOL;
@@ -218,9 +227,12 @@
             } else {
                 //return the value read
                 int sizeOfNextTuple = value.getLength() + 1;
-                if (numBytes + sizeOfNextTuple > len) {
+                if (sizeOfNextTuple > len) {
                     pendingValue = value;
-                    return 0;
+                    //copy partial record
+                    System.arraycopy(pendingValue.getBytes(), 0, buffer, 
offset, len);
+                    pendingValuePosition += len;
+                    return len;
                 } else {
                     System.arraycopy(value.getBytes(), 0, buffer, offset + 
numBytes, value.getLength());
                     buffer[offset + numBytes + value.getLength()] = (byte) EOL;
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
index 9fe09a2..7e8f49c 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
@@ -22,14 +22,13 @@
 import java.io.InputStream;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.Counters.Counter;
-
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 
 public class TextualFullScanDataReader extends InputStream {
@@ -40,6 +39,7 @@
     private boolean hasMore = false;
     private int EOL = "\n".getBytes()[0];
     private Text pendingValue = null;
+    private int pendingValuePosition = 0;
     private int currentSplitIndex = 0;
     private boolean executed[];
     private InputSplit[] inputSplits;
@@ -85,7 +85,7 @@
                  */
                 reader = getRecordReader(currentSplitIndex);
                 key = reader.createKey();
-                value = (Text) reader.createValue();
+                value = reader.createValue();
                 return true;
             }
         }
@@ -104,13 +104,18 @@
         int numBytes = 0;
         if (pendingValue != null) {
             int sizeOfNextTuple = pendingValue.getLength() + 1;
-            if (sizeOfNextTuple > len) {
-                return 0;
+            if ((sizeOfNextTuple - pendingValuePosition) > len) {
+                //copy partial record
+                System.arraycopy(pendingValue.getBytes(), 
pendingValuePosition, buffer, offset, len);
+                pendingValuePosition += len;
+                return len;
             }
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + 
numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
+            System.arraycopy(pendingValue.getBytes(), pendingValuePosition, 
buffer, offset,
+                    pendingValue.getLength() - pendingValuePosition);
+            buffer[offset + pendingValue.getLength() - pendingValuePosition] = 
(byte) EOL;
+            numBytes += pendingValue.getLength() - pendingValuePosition + 1;
             pendingValue = null;
+            pendingValuePosition = 0;
         }
 
         while (numBytes < len) {
@@ -134,7 +139,13 @@
                 // we need to store this for a subsequent read call.
                 // and return this then.
                 pendingValue = value;
-                break;
+
+                //compute partial copy size
+                int copySize = len - numBytes;
+                //copy partial record
+                System.arraycopy(pendingValue.getBytes(), 
pendingValuePosition, buffer, offset + numBytes, copySize);
+                pendingValuePosition += copySize;
+                return len;
             } else {
                 System.arraycopy(value.getBytes(), 0, buffer, offset + 
numBytes, value.getLength());
                 buffer[offset + numBytes + value.getLength()] = (byte) EOL;
@@ -153,13 +164,11 @@
     private RecordReader getRecordReader(int splitIndex) throws IOException {
         if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
             SequenceFileInputFormat format = (SequenceFileInputFormat) 
conf.getInputFormat();
-            RecordReader reader = 
format.getRecordReader((org.apache.hadoop.mapred.FileSplit) 
inputSplits[splitIndex],
-                    conf, getReporter());
+            RecordReader reader = 
format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
             return reader;
         } else {
             TextInputFormat format = (TextInputFormat) conf.getInputFormat();
-            RecordReader reader = 
format.getRecordReader((org.apache.hadoop.mapred.FileSplit) 
inputSplits[splitIndex],
-                    conf, getReporter());
+            RecordReader reader = 
format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
             return reader;
         }
     }
@@ -206,4 +215,4 @@
 
         return reporter;
     }
-}
+}
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/529
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3cf52be4bc0fd8af2555062eeb421d7235088d98
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to