HIVE-16124: Drop the segments data as soon as it is pushed to HDFS (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/33e7d606
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/33e7d606
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/33e7d606

Branch: refs/heads/hive-14535
Commit: 33e7d6066580c857b157e4ffe7f2b941e26e8b80
Parents: 1d159ff
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Wed Mar 8 11:45:24 2017 +0000
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Wed Mar 8 11:45:35 2017 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/druid/io/DruidRecordWriter.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/33e7d606/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index e2c5b9d..8d22df6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -158,6 +158,8 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
               new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
       );
       pushSegments(Lists.newArrayList(currentOpenSegment));
+      LOG.info("Creating new partition for segment {}, partition num {}",
+              retVal.getIdentifierAsString(), retVal.getShardSpec().getPartitionNum());
       currentOpenSegment = retVal;
       return retVal;
     }
@@ -169,6 +171,7 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
               new LinearShardSpec(0)
       );
       pushSegments(Lists.newArrayList(currentOpenSegment));
+      LOG.info("Creating segment {}", retVal.getIdentifierAsString());
       currentOpenSegment = retVal;
       return retVal;
     }
@@ -187,7 +190,6 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
                 .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
         DruidStorageHandlerUtils
                 .writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath);
-
         LOG.info(
                 String.format(
                         "Pushed the segment [%s] and persisted the descriptor located at [%s]",
@@ -217,6 +219,10 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
                 Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
         ));
       }
+      for (SegmentIdentifier dataSegmentId : segmentsToPush) {
+        LOG.info("Dropping segment {}", dataSegmentId.toString());
+        appenderator.drop(dataSegmentId).get();
+      }
       LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size()));
     } catch (InterruptedException e) {
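----------------------------------------------------------------------

The patch changes pushSegments() so that each segment is dropped from the local appenderator as soon as it has been pushed to deep storage (HDFS) and its descriptor persisted, instead of keeping the intermediate data on local disk until the writer is closed. The sketch below is a minimal illustration of that push-then-drop pattern; the PushThenDropSketch class and its Appenderator interface are simplified, hypothetical stand-ins and not the real Druid API used by DruidRecordWriter.

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Minimal sketch of the push-then-drop pattern added by this commit:
 * after a batch of segments has been pushed to deep storage and the
 * descriptors written, each segment is dropped from the local
 * appenderator so intermediate data does not pile up on local disk.
 */
public class PushThenDropSketch {

  /** Hypothetical stand-in for the subset of an appenderator used here. */
  interface Appenderator {
    /** Pushes the given segments to deep storage; the future completes when done. */
    CompletableFuture<List<String>> push(List<String> segmentIds);

    /** Releases the locally persisted data for one segment. */
    CompletableFuture<?> drop(String segmentId);
  }

  static void pushSegments(Appenderator appenderator, List<String> segmentsToPush)
          throws Exception {
    // 1. Push to deep storage and block until the push finishes.
    List<String> pushed = appenderator.push(segmentsToPush).get();

    // 2. (The real writer also persists a segment descriptor file per segment here.)

    // 3. Drop each pushed segment from the local appenderator right away;
    //    this is the loop the commit adds.
    for (String segmentId : pushed) {
      System.out.println("Dropping segment " + segmentId);
      appenderator.drop(segmentId).get();
    }
    System.out.printf("Published [%,d] segments.%n", pushed.size());
  }
}

Blocking on each drop, as the diff does with .get(), means the writer's local disk usage stays bounded to the segments that are still open for writing.

----------------------------------------------------------------------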