lianetm commented on code in PR #17972:
URL: https://github.com/apache/kafka/pull/17972#discussion_r1862680816
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -610,6 +616,10 @@ private void resetOffsetPosition(TopicPartition tp) {
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end
offset specified, but tried to seek to end");
+ } else if (strategy.timestamp().isPresent()) {
Review Comment:
I expect this condition to be true here only if strategy == BY_DURATION,
because we already check for earliest and latest above, and those are the only
other strategies that could have a timestamp. If my understanding is right,
couldn't we check for the strategy type being BY_DURATION here? (It seems
clearer, assuming that BY_DURATION will always have a duration after the
validations.)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java:
##########
@@ -39,29 +43,67 @@ public String toString() {
public static final AutoOffsetResetStrategy NONE = new
AutoOffsetResetStrategy(StrategyType.NONE);
private final StrategyType type;
+ private final Optional<Duration> duration;
private AutoOffsetResetStrategy(StrategyType type) {
+ this(type, Optional.empty());
+ }
+
+ private AutoOffsetResetStrategy(StrategyType type, Optional<Duration>
duration) {
Review Comment:
Shouldn't this constructor param be a `Duration`, not an `Optional`? I agree
with keeping the class var `Optional`, but when creating an
`AutoOffsetResetStrategy` I expect we either create one that only has a type
(using the constructor above, e.g. earliest/latest), or one that does need a
duration, which is then not optional (and uses this constructor). We won't
allow a by_duration without a `Duration`, right?
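
A minimal sketch of the constructor shape described above. `ResetStrategySketch`
is a hypothetical stand-in, not the PR's `AutoOffsetResetStrategy`; only the
two-constructor idea is taken from the comment.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in class used only to illustrate the constructor shape.
final class ResetStrategySketch {
    enum StrategyType { EARLIEST, LATEST, NONE, BY_DURATION }

    private final StrategyType type;
    private final Optional<Duration> duration;

    // Type-only strategies never carry a duration.
    ResetStrategySketch(StrategyType type) {
        this.type = type;
        this.duration = Optional.empty();
    }

    // Duration-based strategies must be given a real Duration; the Optional
    // wrapper exists only on the field, not in the parameter list.
    ResetStrategySketch(StrategyType type, Duration duration) {
        this.type = type;
        this.duration = Optional.of(Objects.requireNonNull(duration, "duration"));
    }

    StrategyType type() {
        return type;
    }

    Optional<Duration> duration() {
        return duration;
    }
}
```

With this shape a by_duration instance without a duration cannot be built by
accident, because passing null fails fast in the constructor.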
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java:
##########
@@ -39,29 +43,67 @@ public String toString() {
public static final AutoOffsetResetStrategy NONE = new
AutoOffsetResetStrategy(StrategyType.NONE);
private final StrategyType type;
+ private final Optional<Duration> duration;
private AutoOffsetResetStrategy(StrategyType type) {
+ this(type, Optional.empty());
+ }
+
+ private AutoOffsetResetStrategy(StrategyType type, Optional<Duration>
duration) {
this.type = type;
+ this.duration = duration;
}
+ /**
+ * Returns true if the given string is a valid auto offset reset strategy.
+ */
public static boolean isValid(String offsetStrategy) {
- return
Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
+ if (offsetStrategy == null) {
+ return false;
+ } else if (StrategyType.BY_DURATION.toString().equals(offsetStrategy))
{
+ return false;
Review Comment:
Should we add a log.warn for this missing duration, or propagate the error to
give better visibility on why the duration config is not ok? I expect this
might be a very common mistake initially.
With the current shape, it seems that any wrong value in this config will end
up just showing the generic message IllegalArgumentException("Unknown auto
offset reset strategy: " ...), right?
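
One way this could look, as a sketch only. `DurationConfigSketch`,
`parseByDuration` and the message wording are hypothetical, and the lowercase
`by_duration` prefix is assumed from the error message in this PR.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical helper showing error messages that name the actual problem.
final class DurationConfigSketch {
    static final String PREFIX = "by_duration:";

    // Parses "by_duration:<ISO-8601 duration>" or explains why it cannot.
    static Duration parseByDuration(String value) {
        if ("by_duration".equals(value)) {
            throw new IllegalArgumentException(
                "'by_duration' requires a duration, e.g. 'by_duration:PT1H'");
        }
        if (value == null || !value.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a by_duration strategy: " + value);
        }
        String iso = value.substring(PREFIX.length());
        Duration duration;
        try {
            duration = Duration.parse(iso);
        } catch (DateTimeParseException e) {
            // Keep the cause so the parse position is visible in the stack trace.
            throw new IllegalArgumentException(
                "Invalid ISO-8601 duration '" + iso + "' in '" + value + "'", e);
        }
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Duration must not be negative: " + iso);
        }
        return duration;
    }
}
```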
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java:
##########
@@ -98,7 +159,7 @@ public void ensureValid(String name, Object value) {
String strategy = (String) value;
if (!AutoOffsetResetStrategy.isValid(strategy)) {
throw new ConfigException(name, value, "Invalid value " +
strategy + " for configuration " +
- name + ": the value must be either 'earliest',
'latest', or 'none'.");
+ name + ": the value must be either 'earliest',
'latest', or 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
Review Comment:
```suggestion
name + ": the value must be either 'earliest',
'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java:
##########
@@ -72,23 +114,42 @@ public String name() {
return type.toString();
}
+ /**
+ * Return the timestamp to be used for the ListOffsetsRequest.
+ * @return the timestamp for the OffsetResetStrategy,
+ * if the strategy is EARLIEST or LATEST or duration is provided
+ * else return Optional.empty()
+ */
+ public Optional<Long> timestamp() {
+ if (type == StrategyType.EARLIEST)
+ return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ else if (type == StrategyType.LATEST)
+ return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
+ else if (duration.isPresent()) {
+ Instant now = Instant.now();
+ return Optional.of(now.minus(duration.get()).toEpochMilli());
+ } else
+ return Optional.empty();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
- return Objects.equals(type, that.type);
+ return type == that.type && Objects.equals(duration, that.duration);
}
@Override
public int hashCode() {
- return Objects.hashCode(type);
+ return Objects.hash(type, duration);
}
@Override
public String toString() {
return "AutoOffsetResetStrategy{" +
- "type='" + type + '\'' +
+ "type=" + type +
+ ", duration=" + duration +
Review Comment:
What about something like `duration.isPresent() ? "duration=..." : ""`, just to
avoid the duration noise if the strategy is not concerned with it?
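
A small sketch of that idea, using `Optional.map` instead of an explicit
ternary. `ToStringSketch` is a hypothetical stand-in with a plain `String` type
field, not the PR's class.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in showing a toString that omits an absent duration.
final class ToStringSketch {
    private final String type;
    private final Optional<Duration> duration;

    ToStringSketch(String type, Optional<Duration> duration) {
        this.type = type;
        this.duration = duration;
    }

    @Override
    public String toString() {
        // The duration fragment is appended only when a duration is present.
        return "AutoOffsetResetStrategy{" +
                "type=" + type +
                duration.map(d -> ", duration=" + d).orElse("") +
                '}';
    }
}
```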
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java:
##########
@@ -39,29 +43,67 @@ public String toString() {
public static final AutoOffsetResetStrategy NONE = new
AutoOffsetResetStrategy(StrategyType.NONE);
private final StrategyType type;
+ private final Optional<Duration> duration;
private AutoOffsetResetStrategy(StrategyType type) {
+ this(type, Optional.empty());
+ }
+
+ private AutoOffsetResetStrategy(StrategyType type, Optional<Duration>
duration) {
this.type = type;
+ this.duration = duration;
}
+ /**
+ * Returns true if the given string is a valid auto offset reset strategy.
+ */
public static boolean isValid(String offsetStrategy) {
- return
Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
+ if (offsetStrategy == null) {
+ return false;
+ } else if (StrategyType.BY_DURATION.toString().equals(offsetStrategy))
{
+ return false;
+ } else if
(Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy))
{
+ return true;
+ } else if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
+ String isoDuration =
offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
+ try {
+ Duration duration = Duration.parse(isoDuration);
+ return !duration.isNegative();
+ } catch (Exception e) {
+ return false;
Review Comment:
Similar here: should we give the user better visibility by pointing to the
duration format violation behind the IllegalArgumentException they will get?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java:
##########
@@ -72,23 +114,42 @@ public String name() {
return type.toString();
}
+ /**
+ * Return the timestamp to be used for the ListOffsetsRequest.
+ * @return the timestamp for the OffsetResetStrategy,
+ * if the strategy is EARLIEST or LATEST or duration is provided
+ * else return Optional.empty()
+ */
+ public Optional<Long> timestamp() {
+ if (type == StrategyType.EARLIEST)
+ return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ else if (type == StrategyType.LATEST)
+ return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
+ else if (duration.isPresent()) {
Review Comment:
Just for my understanding: given the validations, we expect we'll never have
StrategyType.BY_DURATION without a duration present, correct? If so, would it
be clearer to check for the type instead, consistent with how it's done for the
other types?
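
A sketch of the type-based check, under stated assumptions: `TimestampSketch`
is hypothetical, the two constants are local stand-ins for
`ListOffsetsRequest.EARLIEST_TIMESTAMP` and `LATEST_TIMESTAMP`, and `now` is
passed in (rather than read from `Instant.now()`) only to keep the example
deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in that dispatches on the strategy type only.
final class TimestampSketch {
    enum StrategyType { EARLIEST, LATEST, NONE, BY_DURATION }

    // Local stand-ins for the ListOffsetsRequest sentinel values.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    static Optional<Long> timestamp(StrategyType type, Optional<Duration> duration, Instant now) {
        switch (type) {
            case EARLIEST:
                return Optional.of(EARLIEST_TIMESTAMP);
            case LATEST:
                return Optional.of(LATEST_TIMESTAMP);
            case BY_DURATION: {
                // Validation should guarantee a duration; fail loudly if it did not.
                Duration d = duration.orElseThrow(
                    () -> new IllegalStateException("BY_DURATION without a duration"));
                return Optional.of(now.minus(d).toEpochMilli());
            }
            default:
                return Optional.empty();
        }
    }
}
```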
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java:
##########
@@ -39,29 +43,67 @@ public String toString() {
public static final AutoOffsetResetStrategy NONE = new
AutoOffsetResetStrategy(StrategyType.NONE);
private final StrategyType type;
+ private final Optional<Duration> duration;
private AutoOffsetResetStrategy(StrategyType type) {
+ this(type, Optional.empty());
+ }
+
+ private AutoOffsetResetStrategy(StrategyType type, Optional<Duration>
duration) {
this.type = type;
+ this.duration = duration;
}
+ /**
+ * Returns true if the given string is a valid auto offset reset strategy.
+ */
public static boolean isValid(String offsetStrategy) {
- return
Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
+ if (offsetStrategy == null) {
+ return false;
+ } else if (StrategyType.BY_DURATION.toString().equals(offsetStrategy))
{
+ return false;
+ } else if
(Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy))
{
+ return true;
+ } else if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
+ String isoDuration =
offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
+ try {
+ Duration duration = Duration.parse(isoDuration);
+ return !duration.isNegative();
+ } catch (Exception e) {
+ return false;
+ }
+ } else {
+ return false;
+ }
}
+ /**
+ * Returns the AutoOffsetResetStrategy from the given string.
+ */
public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
- if (offsetStrategy == null || !isValid(offsetStrategy)) {
+ if (!isValid(offsetStrategy)) {
throw new IllegalArgumentException("Unknown auto offset reset
strategy: " + offsetStrategy);
}
- StrategyType type =
StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
- switch (type) {
- case EARLIEST:
- return EARLIEST;
- case LATEST:
- return LATEST;
- case NONE:
- return NONE;
- default:
- throw new IllegalArgumentException("Unknown auto offset reset
strategy: " + offsetStrategy);
+
+ if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
+ String isoDuration =
offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
+ return new AutoOffsetResetStrategy(StrategyType.BY_DURATION,
Optional.of(Duration.parse(isoDuration)));
Review Comment:
We already had to do this same string processing/parsing to check `isValid`.
Would it make sense to have the validation return the `AutoOffsetResetStrategy`
object or null? It would simplify this `fromString` and keep the logic in one
place: `isValid` could keep returning a boolean (true if a non-null
`AutoOffsetResetStrategy` can be built), and `fromString` could reuse that same
builder.
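
Roughly what that could look like, as a sketch. `SharedParseSketch` and
`tryParse` are hypothetical names, the strategy names are hard-coded strings
here instead of the `StrategyType` enum, and `Optional` is used in place of a
nullable return.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in where one parser backs both isValid and fromString.
final class SharedParseSketch {
    static final List<String> SIMPLE = Arrays.asList("earliest", "latest", "none");
    static final String PREFIX = "by_duration:";

    final String type;
    final Optional<Duration> duration;

    private SharedParseSketch(String type, Optional<Duration> duration) {
        this.type = type;
        this.duration = duration;
    }

    // The only place that knows the string format.
    static Optional<SharedParseSketch> tryParse(String value) {
        if (value == null) {
            return Optional.empty();
        }
        if (SIMPLE.contains(value)) {
            return Optional.of(new SharedParseSketch(value, Optional.empty()));
        }
        if (!value.startsWith(PREFIX)) {
            return Optional.empty(); // also rejects a bare "by_duration"
        }
        try {
            Duration d = Duration.parse(value.substring(PREFIX.length()));
            if (d.isNegative()) {
                return Optional.empty();
            }
            return Optional.of(new SharedParseSketch("by_duration", Optional.of(d)));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    static boolean isValid(String value) {
        return tryParse(value).isPresent();
    }

    static SharedParseSketch fromString(String value) {
        return tryParse(value).orElseThrow(() ->
            new IllegalArgumentException("Unknown auto offset reset strategy: " + value));
    }
}
```

Unlike the code in the PR, this sketch creates a new object for the simple
strategies instead of returning shared constants; that part is a simplification.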