PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fc38ace7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fc38ace7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fc38ace7

Branch: refs/heads/omid2
Commit: fc38ace74e4926b25067ff7201631898eecaeb63
Parents: b493797
Author: Geoffrey Jacoby <gjac...@apache.org>
Authored: Fri Nov 16 13:57:45 2018 -0800
Committer: Geoffrey Jacoby <gjac...@apache.org>
Committed: Fri Nov 16 14:31:34 2018 -0800

----------------------------------------------------------------------
 .../mapreduce/index/PhoenixIndexImportDirectMapper.java  | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/fc38ace7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index eb4bc0e..e2ac491 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -68,6 +68,8 @@ public class PhoenixIndexImportDirectMapper extends
     private long batchSizeBytes;
 
     private MutationState mutationState;
+    private int currentBatchCount = 0;
+
 
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
@@ -113,6 +115,7 @@ public class PhoenixIndexImportDirectMapper extends
             throws IOException, InterruptedException {
 
         try {
+            currentBatchCount++;
             final List<Object> values = record.getValues();
             indxWritable.setValues(values);
             indxWritable.write(this.pStatement);
@@ -125,9 +128,8 @@ public class PhoenixIndexImportDirectMapper extends
             }
             // Keep accumulating Mutations till batch size
             mutationState.join(currentMutationState);
-
             // Write Mutation Batch
-            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+            if (currentBatchCount % batchSize == 0) {
                 writeBatch(mutationState, context);
                 mutationState = null;
             }
@@ -136,7 +138,7 @@ public class PhoenixIndexImportDirectMapper extends
             context.progress();
         } catch (SQLException e) {
             LOG.error(" Error {}  while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         }
         context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
@@ -157,6 +159,7 @@ public class PhoenixIndexImportDirectMapper extends
                 mutationPair.getSecond().size());
         }
         connection.rollback();
+        currentBatchCount = 0;
     }
 
     @Override
@@ -173,7 +176,7 @@ public class PhoenixIndexImportDirectMapper extends
             super.cleanup(context);
         } catch (SQLException e) {
             LOG.error(" Error {}  while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         } finally {
             if (connection != null) {

Reply via email to