PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fc38ace7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fc38ace7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fc38ace7

Branch: refs/heads/omid2
Commit: fc38ace74e4926b25067ff7201631898eecaeb63
Parents: b493797
Author: Geoffrey Jacoby <gjac...@apache.org>
Authored: Fri Nov 16 13:57:45 2018 -0800
Committer: Geoffrey Jacoby <gjac...@apache.org>
Committed: Fri Nov 16 14:31:34 2018 -0800

----------------------------------------------------------------------
 .../mapreduce/index/PhoenixIndexImportDirectMapper.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fc38ace7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index eb4bc0e..e2ac491 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -68,6 +68,8 @@ public class PhoenixIndexImportDirectMapper extends
     private long batchSizeBytes;
 
     private MutationState mutationState;
+    private int currentBatchCount = 0;
+
 
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
@@ -113,6 +115,7 @@ public class PhoenixIndexImportDirectMapper extends
             throws IOException, InterruptedException {
 
         try {
+            currentBatchCount++;
             final List<Object> values = record.getValues();
             indxWritable.setValues(values);
             indxWritable.write(this.pStatement);
@@ -125,9 +128,8 @@ public class PhoenixIndexImportDirectMapper extends
             }
             // Keep accumulating Mutations till batch size
             mutationState.join(currentMutationState);
-
             // Write Mutation Batch
-            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+            if (currentBatchCount % batchSize == 0) {
                 writeBatch(mutationState, context);
                 mutationState = null;
             }
@@ -136,7 +138,7 @@ public class PhoenixIndexImportDirectMapper extends
             context.progress();
         } catch (SQLException e) {
             LOG.error(" Error {} while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         }
         context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
@@ -157,6 +159,7 @@ public class PhoenixIndexImportDirectMapper extends
                 mutationPair.getSecond().size());
         }
         connection.rollback();
+        currentBatchCount = 0;
     }
 
     @Override
@@ -173,7 +176,7 @@ public class PhoenixIndexImportDirectMapper extends
             super.cleanup(context);
         } catch (SQLException e) {
             LOG.error(" Error {} while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         } finally {
             if (connection != null) {