abdullah alamoudi has submitted this change and it was merged. Change subject: Improvement on Cursor for Delimited Data ......................................................................
Improvment on Cursor for Delimited Data This change allows the parser to parse records in addition to streams. Change-Id: I84ff40db664633c633277e9cc0ffa534cda9f26a Reviewed-on: https://asterix-gerrit.ics.uci.edu/567 Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java M hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java M pom.xml 4 files changed, 52 insertions(+), 41 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Till Westmann: Looks good to me, but someone else must approve Jenkins: Verified diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java index 7f287cc..a93de4c 100644 --- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java +++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java @@ -43,7 +43,7 @@ * - Stream to write data to. 
*/ public void serialize(T instance, DataOutput out) throws HyracksDataException; - + /* * TODO: Add a new method: * T deserialize(DataInput in, T mutable) diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java index e7f53bc..3d96224 100644 --- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java @@ -25,45 +25,48 @@ public class FieldCursorForDelimitedDataParser { private enum State { - INIT, - IN_RECORD, - EOR, - CR, - EOF + INIT, //initial state + IN_RECORD, //cursor is inside record + EOR, //cursor is at end of record + CR, //cursor at carriage return + EOF //end of stream reached } - // public variables will be used by delimited data parser - public char[] buffer; - public int fStart; - public int fEnd; - public int recordCount; - public int fieldCount; - public int doubleQuoteCount; - public boolean isDoubleQuoteIncludedInThisField; + public char[] buffer; //buffer to holds the input coming form the underlying input stream + public int fStart; //start position for field + public int fEnd; //end position for field + public int recordCount; //count of records + public int fieldCount; //count of fields in current record + public int doubleQuoteCount; //count of double quotes + public boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes - private static final int INITIAL_BUFFER_SIZE = 4096; - private static final int INCREMENT = 4096; + private static final int INITIAL_BUFFER_SIZE = 4096;//initial buffer size + private static final int INCREMENT = 4096; //increment size - private final Reader in; + private Reader in; //the underlying buffer - 
private int start; - private int end; - private State state; + private int start; //start of valid buffer area + private int end; //end of valid buffer area + private State state; //state (see states above) - private int lastQuotePosition; - private int lastDoubleQuotePosition; - private int lastDelimiterPosition; - private int quoteCount; - private boolean startedQuote; + private int lastQuotePosition; //position of last quote + private int lastDoubleQuotePosition; //position of last double quote + private int lastDelimiterPosition; //position of last delimiter + private int quoteCount; //count of single quotes + private boolean startedQuote; //whether a quote has been started - private char quote; - private char fieldDelimiter; + private char quote; //the quote character + private char fieldDelimiter; //the delimiter public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter, char quote) { this.in = in; - buffer = new char[INITIAL_BUFFER_SIZE]; + if (in != null) { + buffer = new char[INITIAL_BUFFER_SIZE]; + end = 0; + } else { + end = Integer.MAX_VALUE; + } start = 0; - end = 0; state = State.INIT; this.quote = quote; this.fieldDelimiter = fieldDelimiter; @@ -76,6 +79,15 @@ isDoubleQuoteIncludedInThisField = false; recordCount = 0; fieldCount = 0; + } + + public void nextRecord(char[] buffer, int recordLength) throws IOException { + recordCount++; + fieldCount = 0; + start = 0; + end = recordLength; + state = State.IN_RECORD; + this.buffer = buffer; } public boolean nextRecord() throws IOException { @@ -224,12 +236,8 @@ startedQuote = true; } else { // In this case, we don't have a quote in the beginning of a field. 
- throw new IOException( - "At record: " - + recordCount - + ", field#: " - + fieldCount - + " - a quote enclosing a field needs to be placed in the beginning of that field."); + throw new IOException("At record: " + recordCount + ", field#: " + fieldCount + + " - a quote enclosing a field needs to be placed in the beginning of that field."); } } // Check double quotes - "". We check [start != p-2] @@ -362,4 +370,4 @@ } } } -} +} \ No newline at end of file diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java index fa6ed72..c28c740 100644 --- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java +++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.InputSplit; - import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; @@ -48,7 +47,6 @@ * The scheduler conduct data-local scheduling for data reading on HDFS. This * class works for Hadoop old API. 
*/ -@SuppressWarnings("deprecation") public class Scheduler { private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName()); @@ -75,6 +73,7 @@ * @param ncNameToNcInfos * @throws HyracksException */ + public Scheduler(String ipAddress, int port) throws HyracksException { try { IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port); @@ -127,7 +126,8 @@ * the hyracks cluster toplogy * @throws HyracksException */ - public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException { + public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) + throws HyracksException { this(ncNameToNcInfos); this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder() : new RackAwareNcCollectionBuilder(topology); @@ -274,8 +274,8 @@ * @throws UnknownHostException */ private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random, - boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException, - UnknownHostException { + boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) + throws IOException, UnknownHostException { /** scheduling candidates will be ordered inversely according to their popularity */ PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() { diff --git a/pom.xml b/pom.xml index fbe936d..90f80a1 100644 --- a/pom.xml +++ b/pom.xml @@ -172,6 +172,9 @@ <exclude>**/target/**</exclude> <exclude>**/output/**</exclude> <exclude>**/*.iml</exclude> + <exclude>**/*.prefs</exclude> + <exclude>**/.classpath</exclude> + <exclude>**/.project</exclude> </excludes> </configuration> </plugin> -- To view, visit https://asterix-gerrit.ics.uci.edu/567 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I84ff40db664633c633277e9cc0ffa534cda9f26a 
Gerrit-PatchSet: 4 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
