abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/567
Change subject: Improvement on Cursor for Delimited Data
......................................................................
Improvement on Cursor for Delimited Data
This change allows the parser to parse records in addition to streams.
Change-Id: I84ff40db664633c633277e9cc0ffa534cda9f26a
---
M
hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
M
hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
M
hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
M pom.xml
4 files changed, 102 insertions(+), 41 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/67/567/1
diff --git
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
index 7f287cc..a93de4c 100644
---
a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
+++
b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
@@ -43,7 +43,7 @@
* - Stream to write data to.
*/
public void serialize(T instance, DataOutput out) throws
HyracksDataException;
-
+
/*
* TODO: Add a new method:
* T deserialize(DataInput in, T mutable)
diff --git
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index e7f53bc..d2be3a0 100644
---
a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++
b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -25,45 +25,51 @@
public class FieldCursorForDelimitedDataParser {
private enum State {
- INIT,
- IN_RECORD,
- EOR,
- CR,
- EOF
+ INIT, //initial state
+ IN_RECORD, //cursor is inside record
+ EOR, //cursor is at end of record
+ CR, //cursor at carriage return
+ EOF //end of stream reached
}
- // public variables will be used by delimited data parser
- public char[] buffer;
- public int fStart;
- public int fEnd;
- public int recordCount;
- public int fieldCount;
- public int doubleQuoteCount;
- public boolean isDoubleQuoteIncludedInThisField;
+ public char[] buffer; //buffer to holds the input coming form the
underlying input stream
+ public int fStart; //start position for field
+ public int fEnd; //end position for field
+ public int recordCount; //count of records
+ public int fieldCount; //count of fields in current record
+ public int doubleQuoteCount; //count of double quotes
+ public boolean isDoubleQuoteIncludedInThisField; //does current field
include double quotes
- private static final int INITIAL_BUFFER_SIZE = 4096;
- private static final int INCREMENT = 4096;
+ private static final int INITIAL_BUFFER_SIZE = 4096;//initial buffer size
+ private static final int INCREMENT = 4096; //increment size
- private final Reader in;
+ private Reader in; //the underlying buffer
- private int start;
- private int end;
- private State state;
+ private int start; //start of valid buffer area
+ private int end; //end of valid buffer area
+ private State state; //state (see states above)
- private int lastQuotePosition;
- private int lastDoubleQuotePosition;
- private int lastDelimiterPosition;
- private int quoteCount;
- private boolean startedQuote;
+ private int lastQuotePosition; //position of last quote
+ private int lastDoubleQuotePosition; //position of last double quote
+ private int lastDelimiterPosition; //position of last delimiter
+ private int quoteCount; //count of single quotes
+ private boolean startedQuote; //whether a quote has been started
- private char quote;
- private char fieldDelimiter;
+ private char quote; //the quote character
+ private char fieldDelimiter; //the delimiter
+ private boolean hasBuffer;
public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter,
char quote) {
this.in = in;
- buffer = new char[INITIAL_BUFFER_SIZE];
+ if (in != null) {
+ buffer = new char[INITIAL_BUFFER_SIZE];
+ hasBuffer = true;
+ end = 0;
+ } else {
+ hasBuffer = false;
+ end = Integer.MAX_VALUE;
+ }
start = 0;
- end = 0;
state = State.INIT;
this.quote = quote;
this.fieldDelimiter = fieldDelimiter;
@@ -76,6 +82,30 @@
isDoubleQuoteIncludedInThisField = false;
recordCount = 0;
fieldCount = 0;
+ }
+
+ public String getRecordInfo() {
+ if (hasBuffer) {
+ return "Line Number:" + recordCount;
+ } else {
+ int length = 0;
+ while (true) {
+ length++;
+ if (buffer[length] == '\n' || buffer[length] == '\r') {
+ break;
+ }
+ }
+ return "Line Number:" + recordCount + ", Record:" +
String.copyValueOf(buffer, 0, length);
+ }
+ }
+
+ public void nextRecord(char[] buffer, int recordLength) throws IOException
{
+ recordCount++;
+ fieldCount = 0;
+ start = 0;
+ end = recordLength;
+ state = State.IN_RECORD;
+ this.buffer = buffer;
}
public boolean nextRecord() throws IOException {
@@ -224,12 +254,8 @@
startedQuote = true;
} else {
// In this case, we don't have a quote in the
beginning of a field.
- throw new IOException(
- "At record: "
- + recordCount
- + ", field#: "
- + fieldCount
- + " - a quote enclosing a
field needs to be placed in the beginning of that field.");
+ throw new IOException("At record: " +
recordCount + ", field#: " + fieldCount
+ + " - a quote enclosing a field needs
to be placed in the beginning of that field.");
}
}
// Check double quotes - "". We check [start != p-2]
@@ -362,4 +388,36 @@
}
}
}
-}
+
+ public void setInputStream(Reader reader, char[] buffer) {
+ this.in = reader;
+ this.buffer = buffer;
+ hasBuffer = true;
+ end = 0;
+ start = 0;
+ state = State.INIT;
+ lastDelimiterPosition = -99;
+ lastQuotePosition = -99;
+ lastDoubleQuotePosition = -99;
+ quoteCount = 0;
+ doubleQuoteCount = 0;
+ startedQuote = false;
+ isDoubleQuoteIncludedInThisField = false;
+ }
+
+ public void removeInputStream() {
+ this.in = null;
+ this.buffer = null;
+ hasBuffer = false;
+ end = Integer.MAX_VALUE;
+ start = 0;
+ state = State.INIT;
+ lastDelimiterPosition = -99;
+ lastQuotePosition = -99;
+ lastDoubleQuotePosition = -99;
+ quoteCount = 0;
+ doubleQuoteCount = 0;
+ startedQuote = false;
+ isDoubleQuoteIncludedInThisField = false;
+ }
+}
\ No newline at end of file
diff --git
a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index fa6ed72..c28c740 100644
---
a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++
b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -34,7 +34,6 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -48,7 +47,6 @@
* The scheduler conduct data-local scheduling for data reading on HDFS. This
* class works for Hadoop old API.
*/
-@SuppressWarnings("deprecation")
public class Scheduler {
private static final Logger LOGGER =
Logger.getLogger(Scheduler.class.getName());
@@ -75,6 +73,7 @@
* @param ncNameToNcInfos
* @throws HyracksException
*/
+
public Scheduler(String ipAddress, int port) throws HyracksException {
try {
IHyracksClientConnection hcc = new HyracksConnection(ipAddress,
port);
@@ -127,7 +126,8 @@
* the hyracks cluster toplogy
* @throws HyracksException
*/
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos,
ClusterTopology topology) throws HyracksException {
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos,
ClusterTopology topology)
+ throws HyracksException {
this(ncNameToNcInfos);
this.ncCollectionBuilder = topology == null ? new
IPProximityNcCollectionBuilder()
: new RackAwareNcCollectionBuilder(topology);
@@ -274,8 +274,8 @@
* @throws UnknownHostException
*/
private void scheduleLocalSlots(InputSplit[] splits, int[] workloads,
String[] locations, int slots, Random random,
- boolean[] scheduled, final Map<String, IntWritable>
locationToNumSplits) throws IOException,
- UnknownHostException {
+ boolean[] scheduled, final Map<String, IntWritable>
locationToNumSplits)
+ throws IOException, UnknownHostException {
/** scheduling candidates will be ordered inversely according to their
popularity */
PriorityQueue<String> scheduleCadndiates = new
PriorityQueue<String>(3, new Comparator<String>() {
diff --git a/pom.xml b/pom.xml
index fbe936d..c7bd65e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,9 @@
<exclude>**/target/**</exclude>
<exclude>**/output/**</exclude>
<exclude>**/*.iml</exclude>
+ <exclude>**/*.prefs</exclude>
+ <exclude>**/*.classpath</exclude>
+ <exclude>**/*.project</exclude>
</excludes>
</configuration>
</plugin>
--
To view, visit https://asterix-gerrit.ics.uci.edu/567
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I84ff40db664633c633277e9cc0ffa534cda9f26a
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>