abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/529
Change subject: ASTERIXDB-1044 Allow Reading large records from HDFS
......................................................................
ASTERIXDB-1044 Allow Reading large records from HDFS
This change enables the HDFS input streams to stream records larger than
8192 bytes. Before this change, a stream returned 0 bytes whenever a record
did not fit in the reader's buffer, and the reader used by the delimited
data parser throws an exception when it gets back 0 bytes. With this change,
a large record is streamed to the reader across multiple read calls. A test
case covering this scenario was added.
Change-Id: I3cf52be4bc0fd8af2555062eeb421d7235088d98
---
M asterix-app/data/hdfs/external-indexing-test.txt
M asterix-app/src/test/resources/runtimets/testsuite.xml
M
asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
6 files changed, 72 insertions(+), 48 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/29/529/1
diff --git a/asterix-app/data/hdfs/external-indexing-test.txt
b/asterix-app/data/hdfs/external-indexing-test.txt
index 555272e..bc2dc1b 100644
--- a/asterix-app/data/hdfs/external-indexing-test.txt
+++ b/asterix-app/data/hdfs/external-indexing-test.txt
@@ -1,5 +1,5 @@
1|Steve|50
-2|John|23
+2|JohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJoh
nJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohn
JohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJ
ohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJo
hnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJoh
nJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohn
JohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJ
ohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJo
hnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJoh
nJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohnJohn|23
3|Samuel|22
4|Mary|29
5|William|75
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml
b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 430fd8e..f27744a 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -21,11 +21,7 @@
<!ENTITY RecordsQueries SYSTEM "queries/records/RecordsQueries.xml">
]>
-<test-suite
- xmlns="urn:xml.testframework.asterix.apache.org"
- ResultOffsetPath="results"
- QueryOffsetPath="queries"
- QueryFileExtension=".aql">
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
<test-group name="flwor">
<test-case FilePath="flwor">
<compilation-unit name="at00">
@@ -1351,10 +1347,10 @@
-->
</test-group>
<test-group name="dml">
- <test-case FilePath="dml">
+ <test-case FilePath="dml">
<compilation-unit name="insert-duplicated-keys-from-query">
<output-dir
compare="Text">insert-duplicated-keys-from-query</output-dir>
-
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
+
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
</expected-error>
</compilation-unit>
</test-case>
@@ -1506,7 +1502,7 @@
<test-case FilePath="dml">
<compilation-unit name="insert-duplicated-keys">
<output-dir compare="Text">insert-duplicated-keys</output-dir>
-
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
+
<expected-error>org.apache.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException
</expected-error>
</compilation-unit>
</test-case>
@@ -2962,7 +2958,7 @@
<test-case FilePath="open-index-enforced/error-checking">
<compilation-unit name="enforced-field-name-collision">
<output-dir
compare="Text">enforced-field-name-collision</output-dir>
-
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
+
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
</expected-error>
</compilation-unit>
</test-case>
@@ -2975,14 +2971,14 @@
<test-case FilePath="open-index-enforced/error-checking">
<compilation-unit name="missing-enforce-statement">
<output-dir
compare="Text">missing-enforce-statement</output-dir>
-
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
+
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="open-index-enforced/error-checking">
<compilation-unit name="index-on-closed-type">
<output-dir
compare="Text">index-on-closed-type</output-dir>
-
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
+
<expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException
</expected-error>
</compilation-unit>
</test-case>
@@ -6190,10 +6186,9 @@
</compilation-unit>
</test-case>
-->
-
</test-group>
<test-group name="hdfs">
- <test-case FilePath="hdfs">
+ <test-case FilePath="hdfs">
<compilation-unit name="hdfs_shortcircuit">
<output-dir compare="Text">hdfs_shortcircuit</output-dir>
</compilation-unit>
@@ -6768,31 +6763,26 @@
<output-dir compare="Text">parse</output-dir>
</compilation-unit>
</test-case>
-
<test-case FilePath="binary">
<compilation-unit name="print">
<output-dir compare="Text">print</output-dir>
</compilation-unit>
</test-case>
-
<test-case FilePath="binary">
<compilation-unit name="concat">
<output-dir compare="Text">concat</output-dir>
</compilation-unit>
</test-case>
-
<test-case FilePath="binary">
<compilation-unit name="subbinary">
<output-dir compare="Text">subbinary</output-dir>
</compilation-unit>
</test-case>
-
<test-case FilePath="binary">
<compilation-unit name="find">
<output-dir compare="Text">find</output-dir>
</compilation-unit>
</test-case>
-
<test-case FilePath="binary">
<compilation-unit name="insert">
<output-dir compare="Text">insert</output-dir>
@@ -6876,4 +6866,4 @@
</compilation-unit>
</test-case>
</test-group>
-</test-suite>
+</test-suite>
\ No newline at end of file
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
index 6642df1..6ff8741 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
@@ -245,8 +245,15 @@
while (!frameProcessed) {
try {
if (!bufferingEnabled) {
- coreOperator.nextFrame(frame); // synchronous
- mBuffer.sendReport(frame);
+ if (frame != null) {
+ coreOperator.nextFrame(frame); // synchronous
+ mBuffer.sendReport(frame);
+ } else {
+ this.finished = true;
+ synchronized (coreOperator) {
+ coreOperator.notifyAll();
+ }
+ }
} else {
DataBucket bucket = pool.getDataBucket();
if (bucket != null) {
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
index 2bd2a95..aa70f69 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
@@ -36,6 +36,7 @@
public abstract class AbstractHDFSLookupInputStream extends InputStream {
protected String pendingValue = null;
+ private int pendingValuePosition = 0;
protected FileSystem fs;
protected int fileNumber = -1;
protected int EOL = "\n".getBytes()[0];
@@ -53,12 +54,17 @@
public int read(byte[] buffer, int offset, int len) throws IOException {
if (pendingValue != null) {
int size = pendingValue.length() + 1;
- if (size > len) {
- return 0;
+ if (size - pendingValuePosition > len) {
+ //copy partial record
+ System.arraycopy(pendingValue.getBytes(),
pendingValuePosition, buffer, offset, len);
+ pendingValuePosition += len;
+ return len;
}
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset,
pendingValue.length());
- buffer[offset + pendingValue.length()] = (byte) EOL;
+ System.arraycopy(pendingValue.getBytes(), pendingValuePosition,
buffer, offset,
+ pendingValue.length() - pendingValuePosition);
+ buffer[offset + pendingValue.length() - pendingValuePosition] =
(byte) EOL;
pendingValue = null;
+ pendingValuePosition = 0;
return size;
}
return -1;
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
index 797b961..36ff5d1 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
@@ -47,6 +47,7 @@
private boolean hasMore = false;
private int EOL = "\n".getBytes()[0];
private Text pendingValue = null;
+ private int pendingValuePosition = 0;
private int currentSplitIndex = 0;
private String fileName;
private long recordOffset;
@@ -182,13 +183,18 @@
int numBytes = 0;
if (pendingValue != null) {
int sizeOfNextTuple = pendingValue.getLength() + 1;
- if (sizeOfNextTuple > len) {
- return 0;
+ if ((sizeOfNextTuple - pendingValuePosition) > len) {
+ //copy partial record
+ System.arraycopy(pendingValue.getBytes(),
pendingValuePosition, buffer, offset, len);
+ pendingValuePosition += len;
+ return len;
}
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset +
numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
+ System.arraycopy(pendingValue.getBytes(), pendingValuePosition,
buffer, offset,
+ pendingValue.getLength() - pendingValuePosition);
+ buffer[offset + pendingValue.getLength() - pendingValuePosition] =
(byte) EOL;
+ numBytes += pendingValue.getLength() - pendingValuePosition + 1;
pendingValue = null;
+ pendingValuePosition = 0;
return numBytes;
}
if (numBytes < len) {
@@ -205,7 +211,10 @@
int sizeOfNextTuple = value.getLength() + 1;
if (numBytes + sizeOfNextTuple > len) {
pendingValue = value;
- return 0;
+ //copy partial record
+ System.arraycopy(pendingValue.getBytes(), 0,
buffer, offset, len);
+ pendingValuePosition += len;
+ return len;
} else {
System.arraycopy(value.getBytes(), 0, buffer,
offset + numBytes, value.getLength());
buffer[offset + numBytes + value.getLength()] =
(byte) EOL;
@@ -218,9 +227,12 @@
} else {
//return the value read
int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple > len) {
+ if (sizeOfNextTuple > len) {
pendingValue = value;
- return 0;
+ //copy partial record
+ System.arraycopy(pendingValue.getBytes(), 0, buffer,
offset, len);
+ pendingValuePosition += len;
+ return len;
} else {
System.arraycopy(value.getBytes(), 0, buffer, offset +
numBytes, value.getLength());
buffer[offset + numBytes + value.getLength()] = (byte) EOL;
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
index 9fe09a2..7e8f49c 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualFullScanDataReader.java
@@ -22,14 +22,13 @@
import java.io.InputStream;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.Counters.Counter;
-
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
public class TextualFullScanDataReader extends InputStream {
@@ -40,6 +39,7 @@
private boolean hasMore = false;
private int EOL = "\n".getBytes()[0];
private Text pendingValue = null;
+ private int pendingValuePosition = 0;
private int currentSplitIndex = 0;
private boolean executed[];
private InputSplit[] inputSplits;
@@ -85,7 +85,7 @@
*/
reader = getRecordReader(currentSplitIndex);
key = reader.createKey();
- value = (Text) reader.createValue();
+ value = reader.createValue();
return true;
}
}
@@ -104,13 +104,18 @@
int numBytes = 0;
if (pendingValue != null) {
int sizeOfNextTuple = pendingValue.getLength() + 1;
- if (sizeOfNextTuple > len) {
- return 0;
+ if ((sizeOfNextTuple - pendingValuePosition) > len) {
+ //copy partial record
+ System.arraycopy(pendingValue.getBytes(),
pendingValuePosition, buffer, offset, len);
+ pendingValuePosition += len;
+ return len;
}
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset +
numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
+ System.arraycopy(pendingValue.getBytes(), pendingValuePosition,
buffer, offset,
+ pendingValue.getLength() - pendingValuePosition);
+ buffer[offset + pendingValue.getLength() - pendingValuePosition] =
(byte) EOL;
+ numBytes += pendingValue.getLength() - pendingValuePosition + 1;
pendingValue = null;
+ pendingValuePosition = 0;
}
while (numBytes < len) {
@@ -134,7 +139,13 @@
// we need to store this for a subsequent read call.
// and return this then.
pendingValue = value;
- break;
+
+ //compute partial copy size
+ int copySize = len - numBytes;
+ //copy partial record
+ System.arraycopy(pendingValue.getBytes(),
pendingValuePosition, buffer, offset + numBytes, copySize);
+ pendingValuePosition += copySize;
+ return len;
} else {
System.arraycopy(value.getBytes(), 0, buffer, offset +
numBytes, value.getLength());
buffer[offset + numBytes + value.getLength()] = (byte) EOL;
@@ -153,13 +164,11 @@
private RecordReader getRecordReader(int splitIndex) throws IOException {
if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
SequenceFileInputFormat format = (SequenceFileInputFormat)
conf.getInputFormat();
- RecordReader reader =
format.getRecordReader((org.apache.hadoop.mapred.FileSplit)
inputSplits[splitIndex],
- conf, getReporter());
+ RecordReader reader =
format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
return reader;
} else {
TextInputFormat format = (TextInputFormat) conf.getInputFormat();
- RecordReader reader =
format.getRecordReader((org.apache.hadoop.mapred.FileSplit)
inputSplits[splitIndex],
- conf, getReporter());
+ RecordReader reader =
format.getRecordReader(inputSplits[splitIndex], conf, getReporter());
return reader;
}
}
@@ -206,4 +215,4 @@
return reporter;
}
-}
+}
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/529
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3cf52be4bc0fd8af2555062eeb421d7235088d98
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>