Repository: drill Updated Branches: refs/heads/0.7.0 80fc97dec -> d925eab56 refs/heads/master cc3357997 -> d120c399d
DRILL-1843: Support per-batch schema change at RecordBatchLoader

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/08e555e6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/08e555e6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/08e555e6

Branch: refs/heads/master
Commit: 08e555e6946a0ca927a6121e5799e482247eed35
Parents: e536d6b
Author: Hanifi Gunes <hgu...@maprtech.com>
Authored: Thu Dec 11 11:23:12 2014 -0800
Committer: Parth Chandra <pchan...@maprtech.com>
Committed: Thu Dec 11 11:49:08 2014 -0800

----------------------------------------------------------------------
 .../drill/exec/record/RecordBatchLoader.java    | 22 ++++++++++++--------
 1 file changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/08e555e6/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 97756e2..088630c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -78,21 +78,25 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     int bufOffset = 0;
     for (SerializedField fmd : fields) {
       MaterializedField fieldDef = MaterializedField.create(fmd);
-      ValueVector v = oldFields.remove(fieldDef);
-      if(v == null) {
-        // if we arrive here, we didn't have a matching vector.
+      ValueVector vector = oldFields.remove(fieldDef);
+
+      if (vector == null) {
+        schemaChanged = true;
+        vector = TypeHelper.getNewVector(fieldDef, allocator);
+      } else if (!vector.getField().getType().equals(fieldDef.getType())) {
+        // clear previous vector
+        vector.clear();
         schemaChanged = true;
-        v = TypeHelper.getNewVector(fieldDef, allocator);
+        vector = TypeHelper.getNewVector(fieldDef, allocator);
       }
+
       if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) {
-//        v.clear();
-//        v.load(fmd, allocator.buffer(8));
-        AllocationHelper.allocate(v, 0, 0, 0);
+        AllocationHelper.allocate(vector, 0, 0, 0);
       } else {
-        v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+        vector.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
       }
       bufOffset += fmd.getBufferLength();
-      newVectors.add(v);
+      newVectors.add(vector);
     }
     Preconditions.checkArgument(buf == null || bufOffset == buf.capacity());