Hi,
The first selectKey/groupBy/windowedBy/reduce is there to group messages by
the new key and drop duplicates based on it, so that for each 1 hr time
window each key produces only one message. I use suppress() to make sure
that only the latest message per time window is sent.
The
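A minimal sketch of that dedup step, in case it helps (extractKey is a hypothetical stand-in for however the new key is derived; serdes and topic wiring are assumed to come from the surrounding app):

import java.time.Duration
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Suppressed
import org.apache.kafka.streams.kstream.TimeWindows

// Hedged sketch: keep one (latest) record per key per 1 hr window.
fun dedupPerHour(stream: KStream<String, String>, extractKey: (String) -> String) =
    stream
        .selectKey { _, value -> extractKey(value) }  // triggers the repartition step
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofHours(1)))
        .reduce { _, latest -> latest }
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))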
You might want to try temporarily commenting out the suppress() call just to
see if that's the cause of the issue. That said, what is the goal of this
topology? It looks like you're trying to produce a count at the end for a
key. Are the windowedBy() and suppress() calls there just to eliminate
duplicates,
Ok, I think I've found the problem. Looking at
https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
it says:
> On each poll, consumer will try to use the last consumed offset as the
> starting offset and fetch sequentially. The last
The only way this works is if I don't catch the exception, let the consumer
crash, and fully restart it.
Maybe the consumer has internal state that always gets updated when it
receives a message during poll?
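That matches the javadoc quoted above: poll() advances the consumer's in-memory position independently of commits. A quick way to watch the two diverge, as a sketch (the topic/partition names are assumptions):

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// After a poll with no commit, position() (in-memory, advanced by poll)
// runs ahead of committed() (the last committed offset).
fun showDivergence(consumer: KafkaConsumer<String, String>) {
    val tp = TopicPartition("orders", 0)  // assumed topic/partition
    consumer.poll(Duration.ofSeconds(1))
    println("position=${consumer.position(tp)} committed=${consumer.committed(tp)?.offset()}")
}

To re-read a failed record without a full restart, you can seek() back to its offset instead of letting the consumer crash.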
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliapietra
I downloaded a binary package (kafka_2.12-2.3.0), and the index.html in the
site-docs dir doesn't display correctly, as shown in these pics.
If I change my code slightly, trying to manually commit offsets at the
partition level:

while (true) {
    try {
        val records = consumer.poll(Duration.ofHours(1))
        // track the offset to commit per partition
        val partitionOffsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()
        records.forEach {
Hello,
I have one topic with 12 partitions where partition 0 is missing one
node from the ISR... Is there a way I can get it back to work again without
having to do some weird stuff like restarting the cluster?
Because this missing node in the ISR is causing some problems for the
consumers...
Hi Alex,
Thanks for the reply!
Yes. After deploying with the same application ID, the source topic has new
messages and the application is consuming them, but there's no output at the
end.
The suppress call is:
.suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
The topology is like below:
final KStream
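Side note for anyone hitting the same symptom: untilWindowCloses() emits a window's result only after stream time passes window end + grace, and stream time advances only when newer records arrive. A self-contained sketch (topic names, serdes, and the window/grace values are assumptions, not the original code) showing this with the topology test driver:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Suppressed
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.test.ConsumerRecordFactory

fun main() {
    val builder = StreamsBuilder()
    builder.stream<String, String>("input")
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ZERO))
        .reduce { _, latest -> latest }
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .map { k, v -> KeyValue(k.key(), v) }
        .to("output")

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")  // unused by the test driver
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass.name)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass.name)
    }
    val driver = TopologyTestDriver(builder.build(), props)
    val factory = ConsumerRecordFactory("input", StringSerializer(), StringSerializer())
    driver.pipeInput(factory.create("input", "k", "v1", 0L))
    // nothing yet: the 1 hr window that started at t=0 is still open
    println(driver.readOutput("output", StringDeserializer(), StringDeserializer()))
    driver.pipeInput(factory.create("input", "k", "v2", 3_600_001L))
    // the new record advances stream time past window end + grace, so the
    // suppressed result for the first window is finally emitted
    println(driver.readOutput("output", StringDeserializer(), StringDeserializer()))
    driver.close()
}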
Hi Xiyuan, just to clarify: after you restart the application (using the
same application ID as previously) there ARE new messages in the source
topic and your application IS consuming them, but you're not seeing any
output at the end? How are you configuring your suppress() call? Is it
possible
It's still in the topic because it's weeks away from the deletion threshold
(it's today's message and retention is 4 weeks).
So I assume the consumer just moves on to the next one.
As a test I've created this script:
https://gist.github.com/alex88/85ba5c3288a531a107ed6a22751c1088
After running this I
"I'm just saying that a message which processing throws an exception is
gone within minutes."
Is the message no longer in the topic or is your consumer group current
offset just higher than the offset of the message in question?
On Wed, Sep 25, 2019 at 7:38 PM Alessandro Tagliapietra <
I tried your suggestions and was unable to get suppress to emit anything. I can
see the SUPPRESS_STORES are created on the Kafka nodes but nothing gets output.
It looks like the grace period and window closing are not honored for some reason.
I can see a lot of people having difficulty getting suppress
You mean the retention time of the topic? That's one month.
I'm just saying that a message whose processing throws an exception is gone
within minutes.
I will handle duplicates better later; if we can't be sure that we don't
skip/lose these messages, then it's useless to use Kafka.
That's why I'm
How long is your message retention set for? Perhaps you want to increase
that to a large enough value.
I have an almost identical use case, but I would strongly recommend that you
handle duplicates, as they are due to your process (not Kafka duplicates).
Regards,
On Wed, 25 Sep 2019 at 22:37,
Hello folks,
This is a kind reminder of the Bay Area Kafka® meetup next Tuesday 6:30pm,
at Confluent's San Francisco office.
*RSVP and Register* (if you intend to attend in person):
https://www.meetup.com/KafkaBayArea/events/264562779/
*Date*
6:30pm, Tuesday, October 1st, 2019
*Location*
Hello everyone,
I've another problem, unrelated to the previous one, so I'm creating another
thread.
We have a streams application that reads from a topic, reads/writes from 3
different stores, and writes the output to another topic, all with the
exactly-once processing guarantee enabled.
Due to a bug in
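For reference, the exactly-once guarantee mentioned above is a single streams config; a minimal sketch, assuming props is the Properties passed to KafkaStreams:

import org.apache.kafka.streams.StreamsConfig

// Assumed: props is the application's StreamsConfig Properties.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)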
I've disabled auto commit, so what I thought that code would do is:
- it fetches a message
- it processes the message
- if everything went fine, it commits its offset
- if there's an exception, it doesn't commit, so after the error it would
just poll again and get the same message over and over
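That assumption doesn't hold: poll() advances the in-memory position whether or not you commit, so after a caught exception the next poll() returns the following message. To retry the same record you have to seek() back explicitly; a minimal sketch of that pattern (the handler and poll timeout are assumptions):

import java.time.Duration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Commit after success, seek back on failure so the next poll()
// re-delivers the failed record instead of skipping past it.
fun pollOnce(consumer: KafkaConsumer<String, String>,
             process: (ConsumerRecord<String, String>) -> Unit) {
    for (record in consumer.poll(Duration.ofSeconds(1))) {
        val tp = TopicPartition(record.topic(), record.partition())
        try {
            process(record)  // hypothetical handler that may throw, e.g. a failed API call
            consumer.commitSync(mapOf(tp to OffsetAndMetadata(record.offset() + 1)))
        } catch (e: Exception) {
            consumer.seek(tp, record.offset())  // rewind to the failed record
            break  // stop this batch; the next poll() starts here again
        }
    }
}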
Hi,
How are you managing your offset commits?
Also, if it's a duplicate record issue (sounds like a database entry to
me), have you thought about ignoring/handling duplicates?
Thanks,
On Wed, 25 Sep 2019 at 21:28, Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hello
There was a benchmark published on the Confluence wiki. It's slightly old now,
but it has a good spread of data that should help you pinpoint your test
parameters.
Thanks,
On Wed, 25 Sep 2019 at 21:53, Arvind Sharma wrote:
> Hello All,
>
> We are trying to load test Kafka Cluster but we are not able
Hello All,
We are trying to load test a Kafka cluster, but we are not able to come up
with exact maths for producer concurrency, partition count, replication
factor, and number of brokers.
I am just trying to find out the maximum number of concurrent connections a
partition on a one-node Kafka broker can
Hello everyone,
I have a consumer that fetches messages from a topic and, for each message,
makes an API call to our backend. To ensure that a failed message is retried,
I've set max.poll.records to 1 and have code like this:
consumer.subscribe(arrayListOf("orders"))
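For context, a minimal consumer setup matching that description (the broker address and group id are assumptions):

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

// Sketch: one record per poll, manual commits.
val props = Properties().apply {
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // assumed
    put(ConsumerConfig.GROUP_ID_CONFIG, "orders-consumer")          // assumed
    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
}
val consumer = KafkaConsumer<String, String>(props)
consumer.subscribe(arrayListOf("orders"))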
Hi,
If I change the application id, it will start processing new messages, I
assume? The old data will be dropped. But this solution won't work for
production deployments, since we can't change the application id for
each release.
My code looks like below:
builder.stream(topicName)
.mapValues()
Thanks, Jason. I agree we should include this. I'll produce RC1 once this
patch is available.
-David
On Tue, Sep 24, 2019 at 6:02 PM Jason Gustafson wrote:
> Hi David,
>
> Thanks for running the release. I think we should consider getting this bug
> fixed:
Hi,
I think this is accomplished with metadata requests. A client sends this
request (along with a list of topics it's interested in) to a broker in the
cluster and gets back the leader and replicas for each partition. This is
cached and refreshed, and when a client receives the "Not a Leader" error
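You can inspect the same metadata yourself via the AdminClient, which sends a metadata request under the hood; a small sketch (the broker address and topic name are assumptions):

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig

// Print the current leader and ISR for each partition of a topic.
val admin = AdminClient.create(mapOf<String, Any>(
    AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
admin.describeTopics(listOf("orders")).all().get().forEach { (topic, desc) ->
    desc.partitions().forEach { p ->
        println("$topic-${p.partition()} leader=${p.leader()?.id()} isr=${p.isr().map { it.id() }}")
    }
}
admin.close()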
Hey Xiyuan,
It would be easier for us to help you if you could share a full paste of your
application code (a prototype). The fact that changing the application id
works suggests that re-processing all the data shall work; do I understand
that correctly?
Boyang
On Wed, Sep 25, 2019 at 8:16 AM Xiyuan
Hi,
I'm running a Kafka Streams app (v2.1.0) with a windowed function (reduce
and suppress). One thing I noticed is that every time I redeploy or restart
the application, I have to change the application ID to a new one;
otherwise, only the reduce-repartition internal topic has
input traffic (and
Hi,
Can anyone clarify how a client gets to know about a Kafka leader failover
and identifies the new leader?
Thanks.