the two records it checks whether their LogAppendTime values are within 5
> minutes of each other; if so, they get joined.
> Please let me know if I got this part right.
>
> Thanks
> Sachin
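The check being described can be sketched in plain Java (a simplified stand-in for what Streams does internally; the helper name and the 5-minute window are from the example above, not a real Streams API):

```java
import java.time.Duration;

public class JoinWindowCheck {
    // Two records are joinable if their timestamps differ by at most the window size.
    static boolean withinWindow(long leftTs, long rightTs, Duration window) {
        return Math.abs(leftTs - rightTs) <= window.toMillis();
    }

    public static void main(String[] args) {
        long t0 = 0L;
        System.out.println(withinWindow(t0, t0 + 4 * 60_000, Duration.ofMinutes(5))); // true
        System.out.println(withinWindow(t0, t0 + 6 * 60_000, Duration.ofMinutes(5))); // false
    }
}
```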
>
>
>
>
> On Sat, Dec 7, 2019 at 10:43 AM John Roesler wrote:
>
> > Hi Sachin,
> >
Materialized.<..., KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(Serde).withValueSerde(Serde)))
>
> Note that I have custom class for Key and Value.
>
> Thanks
> Sachin
>
>
>
> On Fri, Dec 6, 2019 at 11:02 PM John Roesler wrote:
>
> > Hi Sachin,
> >
> >
> Thank you so much for your help
>
> --
> Alessandro Tagliapietra
>
>
> On Sat, Dec 7, 2019 at 9:14 AM John Roesler wrote:
>
> > Ah, yes. Glad you figured it out!
> >
> > Caching does not reduce EOS guarantees at all. I highly recommend using
> > it. You m
Hi Sachin,
I'd need more information to speculate about why your records are missing, but
it sounds like you're suspecting something to do with the records' timestamps,
so I'll just focus on answering your questions.
Streams always uses the same timestamp for all operations, which is the
Hi again, Sachin,
I highly recommend this tool for helping to understand the topology
description: https://zz85.github.io/kafka-streams-viz/
I think your interpretation of the format is pretty much spot-on.
Hope this helps,
-John
On Fri, Dec 6, 2019, at 12:21, Sachin Mittal wrote:
> Hi,
> I
Hi Sachin,
The way that Java infers generic arguments makes that case particularly
obnoxious.
By the way, the problem you're facing is specifically addressed by these
relatively new features:
*
er now
> having only one entry per key instead of one entry per window per key.
>
> Thanks a lot for helping! I'm now going to setup a prometheus-jmx
> monitoring so we can keep better track of what's going on :)
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Dec 3,
manually.
> Regarding the change in the recovery behavior, with compact cleanup policy
> shouldn't the changelog only keep the last value? That would make the
> recovery faster and "cheaper" as it would only need to read a single value
> per key (if the cleanup just happened) rig
ue for each key.
>
> Hope that makes sense
>
> Thanks again
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Dec 3, 2019 at 3:04 PM John Roesler wrote:
>
> > Hi Alessandro,
> >
> > To take a stab at your question, maybe it first doesn't find it, but then
261, next starting position is 3024262
>
>
> Why does it first say it didn't find the checkpoint and then find it?
> It seems it loaded about 2.7M records (sum of the offset differences in the
> "restoring partition ...." messages), right?
> Maybe should I try to reduce the
Hi Vasily,
Probably, in this case, with the constraints you’re providing, the first branch
would output first, but I wouldn’t depend on it. Any small change in your
program could mess this up, and any change in Streams could alter the exact
execution order as well.
The right way to think
Hi Murilo,
For this case, you don’t have to worry. Kafka Streams provides the guarantee
you want by default.
Let us know if you want/need more information!
Cheers,
John
On Tue, Dec 3, 2019, at 08:59, Murilo Tavares wrote:
> Hi Mathias
> Thank you for your feedback.
> I'm still a bit confused
Hi Alessandro,
I'm sorry to hear that.
The restore process only takes one factor into account: the current offset
position of the changelog topic is stored in a local file alongside the state
stores. On startup, the app checks if the recorded position lags the latest
offset in the changelog.
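That startup decision can be sketched as follows (plain-Java illustration of the logic described above; `recordsToRestore` is a hypothetical helper, not a Streams API):

```java
public class RestoreDecision {
    // If the recorded checkpoint lags the changelog end offset, roughly
    // (endOffset - checkpoint) records must be replayed into the store.
    static long recordsToRestore(Long checkpointOffset, long endOffset) {
        if (checkpointOffset == null) {
            return endOffset; // no checkpoint file: restore from the beginning
        }
        return Math.max(0, endOffset - checkpointOffset);
    }

    public static void main(String[] args) {
        System.out.println(recordsToRestore(3_024_262L, 3_024_262L)); // fully caught up: 0
        System.out.println(recordsToRestore(null, 1_000L));           // cold start: 1000
    }
}
```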
Hi Alessandro,
Sorry if I'm missing some of the context, but could you just keep
retrying the API call inside a loop? This would block any other
processing by the same thread, but it would allow Streams to stay up
in the face of transient failures. Otherwise, I'm afraid that throwing
an exception
Hey all,
If you want to figure it out theoretically, if you print out the
topology description, you'll have some number of state stores listed
in there. The number of Rocks instances should just be
(#global_state_stores +
sum(#partitions_of_topic_per_local_state_store)) . The number of
stream
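The formula above works out like this (plain-Java sketch; the method name is made up, the counts are illustrative):

```java
public class RocksInstances {
    // (#global_state_stores + sum(#partitions_of_topic_per_local_state_store))
    static int count(int globalStores, int[] partitionsPerLocalStore) {
        int total = globalStores;
        for (int p : partitionsPerLocalStore) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // e.g. 1 global store plus two local stores backed by 8-partition topics
        System.out.println(count(1, new int[]{8, 8})); // 17
    }
}
```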
n practice, it may not always be a problem. But then the real-time nature of
> the problem might require that there not be a huge delay between the
> processing of the event and the flush. How does one solve this issue in
> production ? I am wondering why the design did not accommodat
ssible that there is new stream of messages coming in but post-map
> operation, the partitions in the repartitioned topic do not see the same
> thing.
>
> Thanks
> Mohan
>
> On 6/24/19, 7:49 AM, "John Roesler" wrote:
>
> Hey, this is a very apt q
Hey Sendoh,
I think you just overlooked the javadoc in your search, which says:
> @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} or
> directly configure the retention in a store supplier and use {@link
> Materialized#as(WindowBytesStoreSupplier)}.
Sorry for the
ross all the application instances ? Is there a designated instance for a
> particular key ?
>
> In my case, there was only one instance processing records from all
> partitions and it is kind of odd that windows did not expire even though I
> understand why now.
>
> Thanks
>
ion ( I will go back and confirm
> this.
>
> -mohan
>
>
> On 6/21/19, 12:40 PM, "John Roesler" wrote:
>
> Sure, the record cache attempts to save downstream operators from
> unnecessary updates by also buffering for a short amount of time
>
the windows depends only on the event time and not on the key
>
> Are these two statements correct ?
>
> Thanks
> Mohan
>
> On 6/20/19, 9:17 AM, "John Roesler" wrote:
>
> Hi!
>
> In addition to setting the grace period to zero (or some small
>
Hi!
You might also want to set MAX_TASK_IDLE_MS_CONFIG =
"max.task.idle.ms" to a non-zero value. This will instruct Streams to
wait the configured amount of time to buffer incoming events on all
topics before choosing any records to process. In turn, this should
cause records to be processed in
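Setting that config might look like this (illustrative sketch; only the `"max.task.idle.ms"` key is real Kafka Streams configuration, the helper and the 500 ms value are made up):

```java
import java.util.Properties;

public class IdleConfig {
    // Hypothetical helper: builds Streams properties with task idling enabled.
    static Properties withMaxTaskIdle(long millis) {
        Properties props = new Properties();
        // Wait up to `millis` for data on all input partitions before choosing
        // the next record, so records are processed closer to timestamp order.
        props.put("max.task.idle.ms", Long.toString(millis));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withMaxTaskIdle(500).getProperty("max.task.idle.ms")); // 500
    }
}
```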
Hi!
In addition to setting the grace period to zero (or some small
number), you should also consider the delays introduced by record
caches upstream of the suppression. If you're closely watching the
timing of records going into and coming out of the topology, this
might also spoil your
Hi Ashok,
In general, what Ryanne said is correct. For example, if you send an
email, and the send times out (but the email actually did get sent),
your app cannot distinguish this from a failure in which the send
times out before the email makes it out. Therefore, your only option
would be to
Hi Alessandro,
Interesting. I agree, messing with the record timestamp to achieve your
goal sounds too messy.
It should be pretty easy to plug in your own implementation of Windows,
instead of using the built-in TimeWindows, if you want slightly different
windowing behavior.
Does that work for
Hi Ashok,
I think some people may be able to give you advice, but please start a new
thread instead of replying to an existing message. This just helps keep all
the messages organized.
Thanks!
-John
On Thu, Apr 25, 2019 at 6:12 AM ASHOK MACHERLA wrote:
> Hii,
>
> what I asking
>
> I want to
Hey, Jose,
This is an interesting thought that I hadn't considered before. I think
(tentatively) that windowsFor should *not* take the grace period into
account.
What I'm thinking is that the method is supposed to return "all windows
that contain the provided timestamp" . When we keep window1
Hi Pierre,
If you're using a Processor (or Transformer), you might be able to use the
`close` method for this purpose. Streams invokes `close` on the Processor
when it suspends the task at the start of the rebalance, when the
partitions are revoked. (It invokes `init` once the rebalance is
Hi, Murilo,
I found https://issues.apache.org/jira/browse/KAFKA-7970, which sounds like
the answer is currently "yes". Unfortunately, it is still tricky to handle
this case, although the situation may improve soon.
In the meantime, you can try to work around it with the StateListener.
When
to reproduce the problem, and also exactly
the behavior you observe?
Thanks,
-John
On Mon, Mar 4, 2019 at 10:41 PM John Roesler wrote:
> Hi Jonathan,
>
> Sorry to hear that the feature is causing you trouble as well, and that
> the 2.2 release candidate didn't seem to fix it.
>
>
X-KTABLE-SUPPRESS-STATE-STORE-11-changelog-10 with EOS turned
> on. *Reinitializing
> > the task and restore its state from the beginning.*
> >
> > ...
> >
> >
> > I was hoping for this to be fixed, but is not the case, at least for my
> > case.
>
> miss the intended "exactly once" guarantee.
>
> Thanks,
> -John
>
> On Fri, Jan 25, 2019 at 1:51 AM Peter Levart
> wrote:
>
>> Hi John,
>>
>> Haven't been able to reinstate the demo yet, but I have been re-reading
>> the following scenario
Hi Trey,
I think there is a ticket open requesting to be able to re-use the source
topic, so I don't think it's an intentional restriction, just a consequence
of the way the code is structured at the moment.
Is it sufficient to send the update to "calls" and "answered-calls" at the
same time?
ading
> the following scenario of yours
>
> On 1/24/19 11:48 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/24/19 3:18 PM, John Roesler wrote:
> >
> >>
> >> The reason is that, upon restart, the suppression buffer can only
> >> "
9 at 5:11 AM Peter Levart wrote:
> Hi John,
>
> Sorry I haven't had time to prepare the minimal reproducer yet. I still
> have plans to do it though...
>
> On 1/22/19 8:02 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Just to follow up on the actual bug,
e with EOS enabled.
Thanks for your help,
-John
On Mon, Jan 14, 2019 at 1:20 PM John Roesler wrote:
> Hi Peter,
>
> I see your train of thought, but the actual implementation of the
> window store is structured differently from your mental model.
> Unlike Key/Value stores,
I have some
> more questions...
>
> On 1/10/19 5:33 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Regarding retention, I was not referring to log retention, but to the
> > window store retention.
> > Since a new window is created every second (for example), there
Hi Peter,
Regarding retention, I was not referring to log retention, but to the
window store retention.
Since a new window is created every second (for example), there are in
principle an unbounded
number of windows (the longer the application runs, the more windows there
are, with no end).
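The unbounded growth, and how store retention bounds it, can be sketched as (hypothetical helper methods, not Streams APIs; a 1-second window and 1-hour retention are just the example numbers):

```java
public class WindowCount {
    // With a new window every `windowSizeMs`, the number of windows opened so
    // far grows linearly with elapsed time, with no end.
    static long windowsOpened(long elapsedMs, long windowSizeMs) {
        return elapsedMs / windowSizeMs;
    }

    // Only the window store's retention bounds how many windows are kept.
    static long windowsRetained(long elapsedMs, long windowSizeMs, long retentionMs) {
        return Math.min(windowsOpened(elapsedMs, windowSizeMs), retentionMs / windowSizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        System.out.println(windowsOpened(24 * hour, 1_000));          // 86400 after a day
        System.out.println(windowsRetained(24 * hour, 1_000, hour));  // but only 3600 retained
    }
}
```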
their value - but
> many many more that didn't lose their value).
> (I am monitoring those changes using datadog, so I know which keys are
> affected and I can investigate them)
>
> Let me know if you need some more details or if you want me to escalate
> this situation to a jira
Hi Peter,
Sorry, I just now have seen this thread.
You asked if this behavior is unexpected, and the answer is yes.
Suppress.untilWindowCloses is intended to emit only the final result,
regardless of restarts.
You also asked how the suppression buffer can resume after a restart, since
it's not
Hi Nitay,
I'm sorry to hear of these troubles; it sounds frustrating.
No worries about spamming the list, but it does sound like this might be
worth tracking as a bug report in Jira.
Obviously, we do not expect to lose data when instances come and go,
regardless of the frequency, and we do have
streams topology, does it have
> to
> > read back everything it sends to the broker or can it keep the table
> > internally? I hope it is understandable what I mean, otherwise I can try
> > the explain it more clearly.
> >
> > best regards
> >
> > Patrik
>
ense?
>
> Having said this: you _can_ write a KStream into a topic and read it back
> as KTable. But it's semantically questionable to do so, IMHO. Maybe it
> makes sense for your specific application, but in general I don't think
> it does make sense.
>
>
> -Matthias
>
>
Hi Patrik,
Just to drop one observation in... Streaming to a topic and then consuming
it as a table does create overhead, but so does reducing a stream to a
table, and I think it's actually the same in either case.
They both require a store to collect the table state, and in both cases,
the
results when I have more info.
>
> Greets,
> Bart
>
> On Mon, Oct 8, 2018 at 7:36 PM John Roesler wrote:
>
> > Hi Bart,
> >
> > This sounds a bit surprising. Is there any chance you can zip up some
> logs
> > so we can see the assignment protocol on the
Hi Bart,
This sounds a bit surprising. Is there any chance you can zip up some logs
so we can see the assignment protocol on the nodes?
Thanks,
-John
On Mon, Oct 8, 2018 at 4:32 AM Bart Vercammen wrote:
> Hi,
>
> I recently moved some KafkaStreams applications from v0.10.2.1 to v1.1.1
> and
Hey Praveen,
I also suspect that you can get away with far fewer threads. Here's the
general starting point I recommend:
* start with just a little over 1 thread per hardware thread (accounting
for cores and hyperthreading). For example, on my machine, I have 4 cores
with 2 threads of execution
pic).
> What do you think?
>
> On Sat, Sep 15, 2018 at 0:30, John Roesler ()
> wrote:
>
> > Specifically, you can monitor the "records-lag-max" (
> > https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics)
> > metric. (or the mo
Specifically, you can monitor the "records-lag-max" (
https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics)
metric. (or the more granular one per partition).
Once this metric goes to 0, you know that you've caught up with the tail of
the log.
Hope this helps,
-John
On Fri, Sep
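A catch-up check along those lines might be sketched as (plain-Java illustration; the lag supplier stands in for reading the real `records-lag-max` consumer metric, and the helper is made up):

```java
import java.util.function.LongSupplier;

public class CatchUpCheck {
    // Poll a lag metric (e.g. the consumer's "records-lag-max") until it
    // reaches 0, meaning we've read up to the tail of the log.
    static boolean caughtUp(LongSupplier lagMetric, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (lagMetric.getAsLong() == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long[] lags = {42, 7, 0}; // simulated metric readings
        int[] idx = {0};
        System.out.println(caughtUp(() -> lags[Math.min(idx[0]++, lags.length - 1)], 10)); // true
    }
}
```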
> >
> > Kind regards,
> >
> > Liam Clarke
> >
> >> On Tue, Sep 11, 2018 at 1:43 PM, Michael Eugene
> wrote:
> >>
> >> Well to bring up that kafka to 2.0, do I just need for sbt kafka clients
> >> and kafka streams 2.0 for sbt?
Hi Elliot,
This is not currently supported, but I, for one, think it would be awesome.
It's something I have considered tackling in the future.
Feel free to create a Jira ticket asking for it (but please take a minute
to search for preexisting tickets).
Offhand, my #1 concern would be how it
your state store, you may find that
recovery from the changelog is on the slow side. For this reason, it's
probably a good idea to use stateful sets with K8s if possible.
Does this help?
Thanks,
-John
On Wed, Sep 12, 2018 at 7:50 AM Tim Ward wrote:
> From: John Roesler
> > As you noticed,
In addition to the other suggestions, if you're having too much trouble
with the interface, you can always fall back to creating anonymous
Initializer/Aggregator instances the way you would if programming in Java.
This way, you wouldn't need SAM conversion at all (all it's doing is
turning your
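The anonymous-class fallback might look like this (the interfaces here are simplified stand-ins for Kafka Streams' `Initializer` and `Aggregator`, redefined locally so the sketch runs without the library):

```java
public class AnonymousAggregate {
    // Simplified stand-ins for Kafka Streams' Initializer and Aggregator.
    interface Initializer<VA> { VA apply(); }
    interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }

    // Anonymous classes instead of lambdas, so no SAM conversion is needed.
    static long sum(long[] values) {
        Initializer<Long> init = new Initializer<Long>() {
            @Override public Long apply() { return 0L; }
        };
        Aggregator<String, Long, Long> agg = new Aggregator<String, Long, Long>() {
            @Override public Long apply(String key, Long value, Long aggregate) {
                return aggregate + value;
            }
        };
        long total = init.apply();
        for (long v : values) {
            total = agg.apply("A", v, total);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(new long[]{1, 2, 3})); // 6
    }
}
```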
Hi Tim,
From your spec, I think that Kafka Streams has several ways to support
either scenario. Caveat: this is off the cuff, so I might have missed
something. For your context, I'll give you some thoughts on several ways
you could do it, with various tradeoffs.
Scenario 1 (every widget reports
Hi Robin,
The AdminClient is what you want.
"Evolving" is just a heads-up that the API is relatively new and hasn't
stood the test of time, so you shouldn't be *too* surprised to see it
change in the future.
That said, even APIs marked "Evolving" need to go through the KIP process
to be changed,
Hi Meeiling,
It sounds like "formal process" is more like "fill out a form beforehand"
than just "access controlled".
In that case, although it's somewhat of an internal detail, I think the
easiest thing would be to figure out what the name of the internal topics
will be and request their
stream by fusing together records from the two inputs
by key
For example:
input-1:
(A, 1)
(B, 2)
input-2:
(A, 500)
(C, 60)
join((l, r) -> new KeyValue(l, r)): // simplified API
(A, (1, 500) )
(B, (2, null) )
(C, (null, 60) )
merge:
(A, 1)
(A, 500)
(B, 2)
(C, 60)
Does that make sense?
-J
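The difference can be sketched in plain Java (simplified stand-ins; real Streams joins are windowed and stream-based, not map-based):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeVsJoin {
    // join: one output per key, fusing left and right values (null where a side is absent)
    static Map<String, Integer[]> outerJoin(Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, Integer[]> out = new LinkedHashMap<>();
        left.forEach((k, v) -> out.put(k, new Integer[]{v, right.get(k)}));
        right.forEach((k, v) -> out.putIfAbsent(k, new Integer[]{null, v}));
        return out;
    }

    // merge: simply interleaves the records of both streams, no fusing
    static List<Map.Entry<String, Integer>> merge(Map<String, Integer> left, Map<String, Integer> right) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>(left.entrySet());
        out.addAll(right.entrySet());
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> in1 = new LinkedHashMap<>();
        in1.put("A", 1);
        in1.put("B", 2);
        Map<String, Integer> in2 = new LinkedHashMap<>();
        in2.put("A", 500);
        in2.put("C", 60);
        System.out.println(java.util.Arrays.toString(outerJoin(in1, in2).get("A"))); // [1, 500]
        System.out.println(merge(in1, in2).size()); // 4 records: (A,1) (B,2) (A,500) (C,60)
    }
}
```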
here. Could you please clarify this?
>
> Can you please point me to a place in the kafka-streams sources where a
> Change of newValue/oldValue is produced, so I could take a look? I
> found the KTableReduce implementation, but can't find who produces these
> Change values.
> On Fri, Jul 13, 20
art with table aggregation (no app reset)
>
> Below is an interpretation of the adder/subtractor logs for a given
> key/window in chronological order
>
> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> SUB: newVa
Hi Vasily,
Thanks for the email.
To answer your question: you should reset the application basically any
time you change the topology. Some transitions are safe, but others will
result in data loss or corruption. Rather than try to reason about which is
which, it's much safer just to either
both expressed doubt that there
are practical use cases for it outside of final-results.
-John
On Mon, Jul 2, 2018 at 12:27 PM John Roesler wrote:
> Hi again, Guozhang ;) Here's the second part of my response...
>
> It seems like your main concern is: "if I'm a user who wants final u
oes "suppressLateEvents" with parameter Y != X (window retention time)
> for windowed stores make sense in practice?
> 2. Does "suppressLateEvents" with any parameter Y for non-windowed stores
> make sense in practice?
>
>
>
> Guozhang
>
>
>
" with any parameter Y for non-windowed stores
> make sense in practice?
>
>
>
> Guozhang
>
>
> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck wrote:
>
> > Thanks for the explanation, that does make sense. I have some questions
> on
> > operations, but
d also record it in "too-late-records"
> metrics. And also this emit policy would not need any buffering, since the
> original store's cache contains the record context already needed for
> flushing downstream.
>
> My primary motivation is that, from user's perspective, this ma
lly the same as what I
> suggested.
> > So it is good by me.
> >
> > Thanks
> >
> > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler wrote:
> >
> >> Thanks for taking look, Ted,
> >>
> >> I agree this is a departure from the conve
Hello again all,
I realized today that I neglected to include metrics in the proposal. I
have added them just now.
Thanks,
-John
On Tue, Jun 26, 2018 at 3:11 PM John Roesler wrote:
> Hello devs and users,
>
> Please take some time to consider this proposal for Kafka Streams:
>
ructures by supporting `of` ?
>
> Suppression.of(Duration.ofMinutes(10))
>
>
> Cheers
>
>
>
> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler wrote:
>
> > Hello devs and users,
> >
> > Please take some time to consider this proposal for Kafka
Hello devs and users,
Please take some time to consider this proposal for Kafka Streams:
KIP-328: Ability to suppress updates for KTables
link: https://cwiki.apache.org/confluence/x/sQU0BQ
The basic idea is to provide:
* more usable control over update rate (vs the current state store caches)
Hi Sicheng,
I'm also curious about the details.
Let's say you are doing a simple count aggregation with 24-hour windows.
You got three events with key "A" on 2017-06-21, one year ago, so the
windowed key (A,2017-06-21) has a value of 3.
Fast-forward a year later. We get one late event, also for
Hi Sam,
This sounds like a condition I fixed in
https://github.com/apache/kafka/commit/ed51b2cdf5bdac210a6904bead1a2ca6e8411406#diff-8b364ed2d0abd8e8ae21f5d322db6564R221
. I realized that the prior code creates a new Meter, which uses a Total
metric instead of a Count. But that would total all
Hi EC,
Thanks for the very clear report and question. Like Guozhang said this is
expected (but not ideal) behavior.
For an immediate work-around, you can try materializing the KTable and
setting the commit interval and cache size as discussed here (
artion and changelog topics will contain two identical messages, like
> this
>
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","
> claimreporttime":"55948.33110985625","claimcounter":
Hello Artur,
Apologies in advance if I say something incorrect, as I'm still a little
new to this project.
If I followed your example, then I think the scenario is that you're
joining "claims" and "payments", grouping by "claimNumber", and then
building a list for each "claimNumber" of all the
k it is a good cost to pay,
> plus once we start publishing test-util artifacts for other projects like
> client and connect, we may face the same issue and need to do this
> refactoring as well.
>
>
>
> Guozhang
>
>
>
>
> On Fri, Mar 9, 2018 at 9:54 AM, John R
Hi Wim,
One off-the-cuff idea is that you maybe don't need to actually delay
anonymizing the data. Instead, you can just create a separate pathway that
immediately anonymizes the data. Something like this:
(raw-input topic, GDPR retention period set)
|\->[streams apps that needs non-anonymized
hn
On Thu, Mar 8, 2018 at 3:39 PM, Matthias J. Sax <matth...@confluent.io>
wrote:
> Isn't MockProcessorContext in o.a.k.test part of the unit-test package
> but not the main package?
>
> This should resolve the dependency issue.
>
> -Matthias
>
> On 3/8/18 3:32 P
> V value, String childName)` -- should we also throw
> UnsupportedOperationException similar to `schedule(long)` if KIP-251 is
> accepted?
>
>
> -Matthias
>
> On 3/8/18 3:16 PM, John Roesler wrote:
> > Thanks for the review, Guozhang,
> >
> > In response:
>
hird module that depends on both
streams and test-utils. Yuck!
Thanks,
-John
On Thu, Mar 8, 2018 at 3:16 PM, John Roesler <j...@confluent.io> wrote:
> Thanks for the review, Guozhang,
>
> In response:
> 1. I missed that! I'll look into it and update the KIP.
>
> 2.
e punctuators are
> indeed registered, and if people want full auto punctuation testing they
> have to go with TopologyTestDriver.
>
>
>
> Guozhang
>
>
> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler <j...@confluent.io> wrote:
>
> > On Wed, Mar 7, 2018 at 8:0
On Wed, Mar 7, 2018 at 8:03 PM, John Roesler <j...@confluent.io> wrote:
> Thanks Ted,
>
> Sure thing; I updated the example code in the KIP with a little snippet.
>
> -John
>
> On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Looks
Thanks Ted,
Sure thing; I updated the example code in the KIP with a little snippet.
-John
On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Looks good.
>
> See if you can add punctuator into the sample code.
>
> On Wed, Mar 7, 2018 at 7:1
Dear Kafka community,
I am proposing KIP-267 to augment the public Streams test utils API.
The goal is to simplify testing of Kafka Streams applications.
Please find details in the