Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1713
Change subject: Fix continue ingestion bug when exception happens
......................................................................
Fix continue ingestion bug when exception happens
1. Fix the bug where, after an exception happens, the localfs adapter could
not pick up a new file to continue the ingestion.
2. Change the exception handling from matching the message string to checking the error code.
Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
5 files changed, 11 insertions(+), 7 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/13/1713/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9de9dde..702cb0a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -117,7 +117,7 @@
public static final int
INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_NULL_IN_NON_OPTIONAL = 3018;
public static final int
INPUT_RECORD_RECORD_WITH_METADATA_AND_PK_CANNT_GET_PKEY = 3019;
public static final int FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED = 3020;
- public static final int
FEED_MANAGEMENT_FEED_EVENT_REGISTER_INTAKE_JOB_FAIL = 3021;
+ public static final int RECORD_READER_MALFORMED_INPUT_STREAM = 3021;
public static final int PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE =
3022;
public static final int
PROVIDER_DATASOURCE_FACTORY_UNKNOWN_INPUT_STREAM_FACTORY = 3023;
public static final int
UTIL_EXTERNAL_DATA_UTILS_FAIL_CREATE_STREAM_FACTORY = 3024;
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index b6423f6..fa64037 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -103,7 +103,7 @@
3018 = Field %1$s of meta record is not an optional type so it cannot accept
null value.
3019 = Can't get PK from record part
3020 = This operation cannot be done when Feed %1$s is alive.
-3021 = Could not register feed intake job [%1$s] for feed %2$s
+3021 = Malformed Input Stream.
3022 = Unknown data source type: %1$s
3023 = Unknown input stream factory: %1$s
3024 = Failed to create stream factory
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 1b12dc1..d01859e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -177,7 +177,7 @@
if (!recordReader.handleException(th)) {
finish();
}
- return closed.get();
+ return !closed.get();
}
public IRecordReader<T> getReader() {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 4d6d004..7614e6e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,7 +20,9 @@
import java.io.IOException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -101,7 +103,7 @@
// corrupted file. clear the buffer and stop reading
reader.reset();
bufferPosn = bufferLength = 0;
- throw new IOException("Malformed input stream");
+ throw new
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
}
}
}
@@ -141,7 +143,7 @@
} catch (IOException e) {
reader.reset();
bufferPosn = bufferLength = 0;
- throw new IOException("Malformed input stream");
+ throw new
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
}
}
} while (!hasFinished);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 3c3b8fb..eb099f1 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -22,6 +22,8 @@
import java.io.FileInputStream;
import java.io.IOException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -154,8 +156,8 @@
return false;
}
if (th instanceof IOException) {
- // TODO: Change from string check to exception type
- if (th.getCause().getMessage().contains("Malformed input stream"))
{
+ if (th instanceof RuntimeDataException
+ && ((RuntimeDataException) th).getErrorCode() ==
ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
if (currentFile != null) {
try {
logManager.logRecord(currentFile.getAbsolutePath(),
"Corrupted input file");
--
To view, visit https://asterix-gerrit.ics.uci.edu/1713
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie8656a4d1afabbc1b481eb97509a861b22478676
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>