HIVE-16124: Drop the segments data as soon it is pushed to HDFS (Slim 
Bouguerra, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/33e7d606
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/33e7d606
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/33e7d606

Branch: refs/heads/hive-14535
Commit: 33e7d6066580c857b157e4ffe7f2b941e26e8b80
Parents: 1d159ff
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Wed Mar 8 11:45:24 2017 +0000
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Wed Mar 8 11:45:35 2017 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/druid/io/DruidRecordWriter.java   | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/33e7d606/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index e2c5b9d..8d22df6 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -158,6 +158,8 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
                 new 
LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
         );
         pushSegments(Lists.newArrayList(currentOpenSegment));
+        LOG.info("Creating new partition for segment {}, partition num {}",
+                retVal.getIdentifierAsString(), 
retVal.getShardSpec().getPartitionNum());
         currentOpenSegment = retVal;
         return retVal;
       }
@@ -169,6 +171,7 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
               new LinearShardSpec(0)
       );
       pushSegments(Lists.newArrayList(currentOpenSegment));
+      LOG.info("Creating segment {}", retVal.getIdentifierAsString());
       currentOpenSegment = retVal;
       return retVal;
     }
@@ -187,7 +190,6 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
                 .makeSegmentDescriptorOutputPath(pushedSegment, 
segmentsDescriptorDir);
         DruidStorageHandlerUtils
                 .writeSegmentDescriptor(fileSystem, pushedSegment, 
segmentDescriptorOutputPath);
-
         LOG.info(
                 String.format(
                         "Pushed the segment [%s] and persisted the descriptor 
located at [%s]",
@@ -217,6 +219,10 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
                 Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
         ));
       }
+      for (SegmentIdentifier dataSegmentId : segmentsToPush) {
+        LOG.info("Dropping segment {}", dataSegmentId.toString());
+        appenderator.drop(dataSegmentId).get();
+      }
 
       LOG.info(String.format("Published [%,d] segments.", 
segmentsToPush.size()));
     } catch (InterruptedException e) {

Reply via email to