chenshangan created GOBBLIN-500:
-----------------------------------

             Summary: Failed to use the correct previous offset when OffsetOutOfRange
                 Key: GOBBLIN-500
                 URL: https://issues.apache.org/jira/browse/GOBBLIN-500
             Project: Apache Gobblin
          Issue Type: Bug
          Components: gobblin-kafka
            Reporter: chenshangan
            Assignee: Shirshanka Das


When the previous offset is out of range, offsets.startAt(previousOffset)
throws StartOffsetOutOfRangeException, so the startOffset in offsets is never
updated.

We therefore cannot use offsets.getStartOffset() in place of previousOffset in
the following logic.

 
{code:java}
try {
  offsets.startAt(previousOffset);
} catch (StartOffsetOutOfRangeException e) {

  // Increment counts, which will be reported as job metrics
  if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
    this.offsetTooEarlyCount.incrementAndGet();
  } else {
    this.offsetTooLateCount.incrementAndGet();
  }

  // When previous offset is out of range, either start at earliest, latest or nearest offset, or skip the
  // partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
  String offsetOutOfRangeMsg = String.format(
      "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
      partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
  String offsetOption =
      state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
  if (offsetOption.equals(LATEST_OFFSET)
      || (offsetOption.equals(NEAREST_OFFSET) && offsets.getStartOffset() >= offsets.getLatestOffset())) {
    LOG.warn(
        offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
    offsets.startAtLatestOffset();
  } else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
    LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: "
        + offsets.getEarliestOffset());
    offsets.startAtEarliestOffset();
  } else {
    LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
    return createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
  }
}
{code}
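
To make the problem concrete, here is a minimal, self-contained sketch of the
behaviour described above. It is not the Gobblin source: the Offsets class
below is a hypothetical stand-in, and it assumes that startOffset defaults to 0
and that startAt() rejects any offset outside [earliest, latest] before
assigning it. The two classify methods and their string results are also
invented for illustration. Under those assumptions, comparing
offsets.getStartOffset() against the latest offset inside the catch block
always compares the stale default, whereas comparing previousOffset gives the
intended result.

```java
// Hypothetical stand-ins for illustration only; not the actual Gobblin classes.
final class OffsetOutOfRangeSketch {

  static class StartOffsetOutOfRangeException extends Exception {
    StartOffsetOutOfRangeException(String message) {
      super(message);
    }
  }

  static class Offsets {
    private final long earliestOffset;
    private final long latestOffset;
    private long startOffset = 0L; // assumed default before startAt() succeeds

    Offsets(long earliestOffset, long latestOffset) {
      this.earliestOffset = earliestOffset;
      this.latestOffset = latestOffset;
    }

    // Throws before assigning, so startOffset keeps its old value on failure.
    void startAt(long offset) throws StartOffsetOutOfRangeException {
      if (offset < earliestOffset || offset > latestOffset) {
        throw new StartOffsetOutOfRangeException("offset " + offset + " is out of range");
      }
      this.startOffset = offset;
    }

    long getStartOffset() {
      return startOffset;
    }

    long getLatestOffset() {
      return latestOffset;
    }
  }

  // Mirrors the reported logic: reads the stale startOffset after the exception.
  static String classifyWithStartOffset(Offsets offsets, long previousOffset) {
    try {
      offsets.startAt(previousOffset);
      return "IN_RANGE";
    } catch (StartOffsetOutOfRangeException e) {
      return offsets.getStartOffset() <= offsets.getLatestOffset() ? "TOO_EARLY" : "TOO_LATE";
    }
  }

  // Possible fix: compare the offset that actually failed, i.e. previousOffset.
  static String classifyWithPreviousOffset(Offsets offsets, long previousOffset) {
    try {
      offsets.startAt(previousOffset);
      return "IN_RANGE";
    } catch (StartOffsetOutOfRangeException e) {
      return previousOffset <= offsets.getLatestOffset() ? "TOO_EARLY" : "TOO_LATE";
    }
  }

  public static void main(String[] args) {
    // earliest = 100, latest = 200, previous offset = 500 (beyond latest)
    System.out.println(classifyWithStartOffset(new Offsets(100L, 200L), 500L));    // prints TOO_EARLY
    System.out.println(classifyWithPreviousOffset(new Offsets(100L, 200L), 500L)); // prints TOO_LATE
  }
}
```

If these assumptions hold for the real Offsets class, the same substitution
would apply to the NEAREST_OFFSET comparison and to the start offset printed in
the log message.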



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
