Re: default auto.offset.reset value is reset to none

2020-05-07 Thread Matthias J. Sax
There is a ticket about it: https://issues.apache.org/jira/browse/KAFKA-7480

The current workaround is to manually delete the local copy of the global
state store so the application can restart.
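By default that store lives under `<state.dir>/<application.id>/global`. A minimal sketch of the cleanup follows; the directory layout and the class name are assumptions for illustration (KafkaStreams#cleanUp() would also work, but it wipes all local state, not just the global store):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper: removes <state.dir>/<application.id>/global, the
// directory Kafka Streams uses for the global state store by default.
public class GlobalStoreCleaner {

    public static void deleteGlobalStore(final Path stateDir, final String applicationId) throws IOException {
        final Path globalDir = stateDir.resolve(applicationId).resolve("global");
        if (!Files.exists(globalDir)) {
            return; // nothing to clean up
        }
        try (Stream<Path> files = Files.walk(globalDir)) {
            // delete children before their parent directories
            files.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(final String[] args) throws IOException {
        // demonstrate against a throwaway layout instead of a real state.dir
        final Path stateDir = Files.createTempDirectory("streams-state");
        final Path globalDir = stateDir.resolve("my-app").resolve("global");
        Files.createDirectories(globalDir);
        Files.write(globalDir.resolve("checkpoint"), "0".getBytes());
        deleteGlobalStore(stateDir, "my-app");
        System.out.println("global store present: " + Files.exists(globalDir));
    }
}
```

Run it against the real state.dir only while the application is stopped, or the restore will race with the running instance.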


-Matthias

On 5/6/20 6:14 AM, Alexander Sibiryakov wrote:
> Hello,
> 
> I'm facing an issue in one of our Kafka Streams applications that uses a
> GlobalKTable. The idea is to have a GlobalKTable over a compacted topic
> and be able to re-read it on startup. We have had the consumer group and
> topic for some time; recently I recreated the topic, which requires the
> consumer offsets to be reset and the topic to be consumed from the
> beginning. But the application started to fail with
> OffsetOutOfRangeException and the message "Offsets out of range with no
> configured reset policy for partitions..". I do have auto.offset.reset
> set to "latest" in my configuration, but it is overridden to "none" for
> the global and restore consumers of the Streams application.
> 
> This exception results in a shutdown loop and requires investigation to
> understand what is going on.
> 
> 
> This is the line where the override happens:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1244
> 
> So the question is: is this behavior of overriding the offset reset
> policy intended? If not, please confirm it is a bug, and I will submit a
> patch.
> 
> See the Streams output for detailed configs, traceback and exceptions in
> attachment.
> 
> Thanks,
> A.





default auto.offset.reset value is reset to none

2020-05-06 Thread Alexander Sibiryakov
Hello,

I'm facing an issue in one of our Kafka Streams applications that uses a
GlobalKTable. The idea is to have a GlobalKTable over a compacted topic and
be able to re-read it on startup. We have had the consumer group and topic
for some time; recently I recreated the topic, which requires the consumer
offsets to be reset and the topic to be consumed from the beginning. But
the application started to fail with OffsetOutOfRangeException and the
message "Offsets out of range with no configured reset policy for
partitions..". I do have auto.offset.reset set to "latest" in my
configuration, but it is overridden to "none" for the global and restore
consumers of the Streams application.

This exception results in a shutdown loop and requires investigation to
understand what is going on.


This is the line where the override happens:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1244

So the question is: is this behavior of overriding the offset reset policy
intended? If not, please confirm it is a bug, and I will submit a patch.

See the Streams output for detailed configs, traceback and exceptions in
attachment.

Thanks,
A.
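For anyone reading along: Streams does accept per-consumer overrides through the `main.consumer.`, `restore.consumer.`, and `global.consumer.` config prefixes (see StreamsConfig#mainConsumerPrefix and friends), but per the line linked above, StreamsConfig puts `auto.offset.reset = none` for the restore and global consumers after user overrides are applied, so the prefixed values below would be silently discarded for those two. A sketch, with the application id and broker address as placeholders:

```java
import java.util.Properties;

// Sketch of per-consumer overrides in a Streams configuration.
public class StreamsResetConfig {

    public static Properties build() {
        final Properties props = new Properties();
        props.put("application.id", "my-app");          // placeholder
        props.put("bootstrap.servers", "broker:9092");  // placeholder
        // takes effect: applies to the main consumer only
        props.put("main.consumer.auto.offset.reset", "latest");
        // as of 2.4 these are overwritten with "none" inside StreamsConfig,
        // which is what produces the shutdown loop described above
        props.put("restore.consumer.auto.offset.reset", "earliest");
        props.put("global.consumer.auto.offset.reset", "earliest");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(build().getProperty("main.consumer.auto.offset.reset"));
    }
}
```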
[2020-05-06 13:03:19,360: INFO/main] (AbstractConfig.java:347) - StreamsConfig values:
application.id = *-green-staging
application.server =
bootstrap.servers = [*:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id = *-green-staging
commit.interval.ms = 3
connections.max.idle.ms = 54
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class com.test.LogAndContinueProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
max.task.idle.ms = 0
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 2
retries = 2147483647
retry.backoff.ms = 500
rocksdb.config.setter = null
security.protocol = SASL_SSL
send.buffer.bytes = 131072
state.cleanup.delay.ms = 60
state.dir = /mnt/storage
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 8640

[2020-05-06 13:03:19,386: INFO/main] (KafkaStreams.java:686) - stream-client [*-green-staging] Kafka Streams version: 2.4.0
[2020-05-06 13:03:19,386: INFO/main] (KafkaStreams.java:687) - stream-client [*-green-staging] Kafka Streams commit ID: 77a89fcf8d7fa018
[2020-05-06 13:03:21,446: INFO/main] (AbstractConfig.java:347) - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [*:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = *-green-staging-global-consumer
client.rack =
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = null
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 1000
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 2
retry.backoff.ms = 500
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6