chenshangan created GOBBLIN-500:
-----------------------------------

             Summary: Failed to use the correct previous offset when OffsetOutOfRange
                 Key: GOBBLIN-500
                 URL: https://issues.apache.org/jira/browse/GOBBLIN-500
             Project: Apache Gobblin
          Issue Type: Bug
          Components: gobblin-kafka
            Reporter: chenshangan
            Assignee: Shirshanka Das

When the previous offset is out of range, offsets.startAt(previousOffset) throws StartOffsetOutOfRangeException, so the startOffset in offsets is never updated. The following logic therefore must not use offsets.getStartOffset() in place of previousOffset:

{code:java}
try {
  offsets.startAt(previousOffset);
} catch (StartOffsetOutOfRangeException e) {

  // Increment counts, which will be reported as job metrics
  if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
    this.offsetTooEarlyCount.incrementAndGet();
  } else {
    this.offsetTooLateCount.incrementAndGet();
  }

  // When previous offset is out of range, either start at earliest, latest or nearest offset, or skip the
  // partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
  String offsetOutOfRangeMsg = String.format(
      "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
      partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
  String offsetOption =
      state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
  if (offsetOption.equals(LATEST_OFFSET)
      || (offsetOption.equals(NEAREST_OFFSET) && offsets.getStartOffset() >= offsets.getLatestOffset())) {
    LOG.warn(
        offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
    offsets.startAtLatestOffset();
  } else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
    LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: "
        + offsets.getEarliestOffset());
    offsets.startAtEarliestOffset();
  } else {
    LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
    return createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
  }
}
{code}
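
One possible shape of a fix, as a minimal self-contained sketch rather than the actual Gobblin patch: compare against previousOffset inside the catch block, since the start offset held by offsets is stale after the exception. Everything here is a hypothetical stand-in written for illustration: the Offsets and StartOffsetOutOfRangeException classes, the resolveStartOffset method, the SKIP_PARTITION marker, the assumed initial start offset of 0, and the option strings "latest", "earliest" and "nearest".

```java
import java.util.concurrent.atomic.AtomicInteger;

class OffsetResetSketch {
    // Hypothetical marker returned when the partition should be skipped.
    static final long SKIP_PARTITION = -1L;

    // Hypothetical stand-in for the exception named in the issue.
    static final class StartOffsetOutOfRangeException extends Exception {
        StartOffsetOutOfRangeException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the offsets holder; only what the sketch needs.
    static final class Offsets {
        private final long earliestOffset;
        private final long latestOffset;
        private long startOffset = 0L; // assumed initial value, left unchanged if startAt throws

        Offsets(long earliestOffset, long latestOffset) {
            this.earliestOffset = earliestOffset;
            this.latestOffset = latestOffset;
        }

        void startAt(long offset) throws StartOffsetOutOfRangeException {
            if (offset < earliestOffset || offset > latestOffset) {
                throw new StartOffsetOutOfRangeException("Start offset out of range: " + offset);
            }
            this.startOffset = offset;
        }

        void startAtEarliestOffset() { this.startOffset = earliestOffset; }
        void startAtLatestOffset() { this.startOffset = latestOffset; }
        long getStartOffset() { return startOffset; }
        long getEarliestOffset() { return earliestOffset; }
        long getLatestOffset() { return latestOffset; }
    }

    final AtomicInteger offsetTooEarlyCount = new AtomicInteger();
    final AtomicInteger offsetTooLateCount = new AtomicInteger();

    // Returns the offset the partition will start from, or SKIP_PARTITION.
    long resolveStartOffset(Offsets offsets, long previousOffset, String resetOption) {
        try {
            offsets.startAt(previousOffset);
            return offsets.getStartOffset();
        } catch (StartOffsetOutOfRangeException e) {
            // Compare previousOffset, not offsets.getStartOffset(): startAt threw,
            // so the start offset inside offsets was never updated.
            if (previousOffset <= offsets.getLatestOffset()) {
                offsetTooEarlyCount.incrementAndGet();
            } else {
                offsetTooLateCount.incrementAndGet();
            }

            String option = resetOption.toLowerCase();
            if (option.equals("latest")
                    || (option.equals("nearest") && previousOffset >= offsets.getLatestOffset())) {
                offsets.startAtLatestOffset();
            } else if (option.equals("earliest") || option.equals("nearest")) {
                offsets.startAtEarliestOffset();
            } else {
                return SKIP_PARTITION;
            }
            return offsets.getStartOffset();
        }
    }

    public static void main(String[] args) {
        OffsetResetSketch sketch = new OffsetResetSketch();
        Offsets offsets = new Offsets(100L, 200L);
        // Previous offset 250 lies past the latest offset 200.
        System.out.println(sketch.resolveStartOffset(offsets, 250L, "nearest"));
    }
}
```

In this sketch, with earliest offset 100 and latest offset 200, a previous offset of 250 under the "nearest" option resolves to the latest offset and is counted as too late. Comparing the stale start offset instead (0 under the assumption above) would count it as too early and reset the partition to the earliest offset, which is the behavior this issue reports.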