Re: IllegalStateException when putting to state store in Transformer implementation

2017-06-16 Thread Matthias J. Sax
That is correct. You need to return a new instance on each call. I am not sure how to improve the docs though: 1) it's a supplier pattern (that should explain it) 2) also the `TransformerSupplier#get()` method JavaDoc says: > /** > * Return a new {@link Transformer} instance. > * >
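The supplier contract can be illustrated with plain Java (the class names below are illustrative, not the actual Kafka Streams interfaces): each `get()` must hand out a fresh instance, because Streams creates one transformer per task, and per-task state must not be shared.

```java
import java.util.function.Supplier;

public class SupplierDemo {
    // Stand-in for a Transformer with per-task state (hypothetical class).
    static class MyTransformer {
        int localState = 0;
    }

    public static void main(String[] args) {
        // Wrong: every get() returns the same instance, so tasks share state.
        MyTransformer shared = new MyTransformer();
        Supplier<MyTransformer> broken = () -> shared;

        // Correct: a new instance on each call, as the JavaDoc demands.
        Supplier<MyTransformer> correct = MyTransformer::new;

        if (broken.get() != broken.get())
            throw new AssertionError("buggy supplier should reuse its instance");
        if (correct.get() == correct.get())
            throw new AssertionError("expected a fresh instance per get()");
        System.out.println("fresh instance per get(): ok");
    }
}
```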

Re: Single Key Aggregation

2017-06-16 Thread Matthias J. Sax
ivate KTable extractLICount(KStream< >> Windowed, >>>>> AdLog> joinedImprLogs) { >>>>>KTable liCount = joinedImprLogs.flatMap((key, >> value) >>>>> -> { >>>>> List> l = new ArrayList<>(); >>>>> i

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Matthias J. Sax
ly makes sense for processing time semantics. For > event-time semantics I find the arguments for "no final aggregation" > totally convincing. > > > Cheers, > > Michał > > > On 16/06/17 00:08, Matthias J. Sax wrote: >> Hi Paolo, >> >> This SO

Re: comment in addStandbyTasks() method of StreamThread has some typos

2017-06-16 Thread Matthias J. Sax
Thanks for reporting this! Would you like to open a MINOR PR to fix it? Don't think we need a Jira for this. -Matthias On 6/16/17 9:26 AM, john cheng wrote: > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1345 >

Re: Kafka 0.11 transactions API question

2017-06-16 Thread Matthias J. Sax
As Michał said. It's not designed for this use case. Kafka's transactions are not the same thing as DB transactions: if you break it down, Kafka allows for atomic (multi-partition) writes, but no 2-phase commit. Also, a transaction is "owned" by a single thread (ie, producer) and cannot be "share

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-15 Thread Matthias J. Sax
Hi Paolo, This SO question might help, too: https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable For Streams, the basic model is based on "change" and we report updates to the "current" result immediately reducing latency to a m

Re: Dropped messages in kstreams?

2017-06-15 Thread Matthias J. Sax
tput 342476 intermediate calculations for 3414 >> distinct windows, 14400 distinct were expected. >> >> Regards, >> Caleb >> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax >> wrote: >> >>> This seems to be related to internal KTable cac

Re: kafka streams first time did not print output

2017-06-14 Thread Matthias J. Sax
Hi, in your first example with .print() "before" and "after" .to(), I want to clarify that the order in which you add operators does not really matter here. The DAG you build will branch out anyway: +--> print() | KTable -> changelog --+--> to()

Re: Dropped messages in kstreams?

2017-06-14 Thread Matthias J. Sax
This seems to be related to internal KTable caches. You can disable them by setting cache size to zero. http://docs.confluent.io/current/streams/developer-guide.html#memory-management -Matthias On 6/14/17 4:08 PM, Caleb Welton wrote: > Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=

Re: KafkaStreams reports RUNNING even though all StreamThreads has crashed

2017-06-13 Thread Matthias J. Sax
I did create https://issues.apache.org/jira/browse/KAFKA-5440 On 5/16/17 8:46 AM, Eno Thereska wrote: > Hi Andreas, > > Thanks for reporting. This sounds like a bug, but could also be a semantic > thing. Couple of questions: > > - which version of Kafka are you using? > - what is the nature o

Re: kafka streams checkpoint and restore understanding

2017-06-10 Thread Matthias J. Sax
case will read checkpoint file when create StreamTask > or StandbyTask > > Ps: I'm writing a Chinese book about kafka-internal. So I have to be sure > all my observations have theory support. > > > 2017-06-10 1:23 GMT+08:00 Matthias J. Sax : > >> Your observation i

Re: Kafka Streams Failed to rebalance error

2017-06-09 Thread Matthias J. Sax
ete(ConsumerCoordinator.java:259) >>>>>>>>>> [kafka-clients-0.10.2.1.jar!/:na] >>>>>>>>>> at >>>>>>>>>> >>>>>>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. >

Re: kafka streams checkpoint and restore understanding

2017-06-09 Thread Matthias J. Sax
Your observation is completely correct and this is also correct behavior. Note that instance1 and instance2 both also have a local RocksDB instance that holds the state. The checkpoint file basically tells Streams what prefix of the changelog topic is already in RocksDB. As Streams loads (no
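A rough model of that restore logic in plain Java (the real code lives in Streams' internal state manager; this sketch only shows the checkpoint acting as a replay start point):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointDemo {
    // Replay only the changelog records at or after the checkpointed offset,
    // on top of whatever the local store already contains.
    static void restore(Map<String, String> localStore,
                        List<String[]> changelog, long checkpoint) {
        for (long offset = checkpoint; offset < changelog.size(); offset++) {
            String[] kv = changelog.get((int) offset);
            localStore.put(kv[0], kv[1]);
        }
    }

    public static void main(String[] args) {
        List<String[]> changelog = Arrays.asList(
                new String[]{"a", "1"}, new String[]{"b", "2"}, new String[]{"a", "3"});
        // Checkpoint 2 means offsets 0..1 are already in the local store,
        // so only offset 2 is replayed after a restart.
        Map<String, String> store = new HashMap<>();
        store.put("a", "1");
        store.put("b", "2");
        restore(store, changelog, 2);
        if (!"3".equals(store.get("a")) || !"2".equals(store.get("b")))
            throw new AssertionError();
        System.out.println("restored store: " + store);
    }
}
```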

Re: kafka streams docs about topology picture only one source node

2017-06-08 Thread Matthias J. Sax
Well. You can also read multiple topics as a single KStream. > builder.stream("topic-1", "topic-2") Of course both topics must contain data with the same key and value types. For this pattern, there is only one source node. There is no 1-to-1 relationship between input topics and source nodes, and th

Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Matthias J. Sax
If you write to a remote DB, keep in mind that this will impact your Streams app, as you lose data locality. Thus, populating the DB from the changelog might be better. It also decouples both systems, which gives you the advantage that your Streams app can still run if the DB has issues. If you write dire

Re: kafka streams changelog topic value

2017-06-07 Thread Matthias J. Sax
ConsoleConsumer by default uses the String deserializer, but the value in the changelog is of type long. For the output topic, the type is converted from long to string though -- thus you can read the output topic without problems. For reading the changelog topic, you need to specify option --property value

Fwd: Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-05 Thread Matthias J. Sax
So I wouldn't recommend relying on it. The option to turn it off is better than forcing it and still being unable to roll back bad pills (as explained before) > > 6. Will add an end-to-end example as Michael suggested. > > Thanks > Eno > > > >> On 4 Jun 2017, at 0

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-05 Thread Matthias J. Sax
wn into the > topology. I haven't followed but I pray you made EoS optional! We don't > need this and we don't want this and we will turn it off if it comes. So > I wouldn't recommend relying on it. The option to turn it off is better > than forcing it and sti

Re: Losing messages in Kafka Streams after upgrading

2017-06-05 Thread Matthias J. Sax
s a small percentage of records to vanish) is far from >>> intuitive, and it somehow worked fine until a few weeks ago. >>> >>> I think your option 3 should work. I'll make a custom timestamp extractor >>> (I actually do have a timestamp in my messages), and

Re: Suppressing intermediate topics feeding (Global)KTable

2017-06-02 Thread Matthias J. Sax
ehow "broadcast" the information -- and Kafka Streams applications use topics to exchange data. Thus, we need a topic anyhow. Does this make sense? So your overall architecture seems to be sound to me. -Matthias On 6/2/17 2:37 PM, Steven Schlansker wrote: > >> On Jun 2, 2017,

Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-02 Thread Matthias J. Sax
te: > >> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax wrote: >> >> I am not sure if I understand the use case correctly. Could you give >> some more context? > > Happily, thanks for thinking about this! > >> >>> backing store whose partitioning is v

Re: Suppressing intermediate topics feeding (Global)KTable

2017-06-02 Thread Matthias J. Sax
Hi, If you want to populate a GlobalKTable you can only do this by reading a topic. So the short answer to your headline is: no, you cannot suppress the intermediate topic. However, I am wondering what the purpose of your secondary index is, and why you are using a GlobalKTable for it. Maybe you can

Re: Finding StreamsMetadata with value-dependent partitioning

2017-06-02 Thread Matthias J. Sax
I am not sure if I understand the use case correctly. Could you give some more context? > backing store whose partitioning is value dependent I infer that you are using a custom store and not the default RocksDB? If yes, what do you use? What does "value dependent" mean in this context? Right now,

Re: Losing messages in Kafka Streams after upgrading

2017-06-02 Thread Matthias J. Sax
Hi Frank, yes, retention policy is based on the embedded record timestamps and not on system time. Thus, if you send messages with an old timestamp, they can trigger log/segment rolling. >> I see that the repartition topics have timestamp.type = CreateTime, does >> that mean it uses the timestamp

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-01 Thread Matthias J. Sax
First a meta comment. KIP discussion should take place on the dev list -- if user list is cc'ed please make sure to reply to both lists. Thanks. Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now. With regard to corrupted state stores

Re: Kafka Streams - possible to know partition?

2017-05-28 Thread Matthias J. Sax
I just added this FAQ: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowcanIaccessrecordmetadata? -Matthias On 5/28/17 12:36 PM, Srimanth G wrote: > Hello, > When using Kafka Streams, is it possible to know which partition the byte[] > are coming from? > > Either in the Serde clas

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Matthias J. Sax
"bad" for this case would mean that we got a `DeserializationException`. I am not sure if any other processing error should be covered? @Eno: this raises one question. Might it be better to allow for two handlers instead of one? One for deserialization exceptions and one for all other excepti

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Matthias J. Sax
w important it is to configure the name (the less config the better -- otherwise it's getting too hard to see what is important/relevant and what not) -Matthias On 5/26/17 2:13 AM, Eno Thereska wrote: > Replying to Avi's and Matthias' questions in one go inline: >

Re: KIP-162: Enable topic deletion by default

2017-05-26 Thread Matthias J. Sax
+1 On 5/26/17 7:03 AM, Gwen Shapira wrote: > Hi Kafka developers, users and friends, > > I've added a KIP to improve our out-of-the-box usability a bit: > KIP-162: Enable topic deletion by default: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-162+-+Enable+topic+deletion+by+default > >

Re: Kafka Streams: "subscribed topics are not assigned to any members in the group"

2017-05-23 Thread Matthias J. Sax
Hi Dmitry, this sounds like a bug to me. Can you share some more details? What is your program structure? How many partitions do you have per topic? How many threads/instances do you run? When does the issue occur exactly? -Matthias On 5/23/17 12:26 PM, Dmitry Minkovsky wrote: > Certain elemen

Re: Order of punctuate() and process() in a stream processor

2017-05-18 Thread Matthias J. Sax
ned advance of stream time for the "missed" punctuations - > > this would ease the processing of burst messages after some silence. I do > not see if KIP-138 may solve this or not. > > Regards > > -Sini > > > > From: "Matthias J. Sax"

Re: Java 8 stream consumer pattern

2017-05-17 Thread Matthias J. Sax
Did you try out Kafka Streams API instead of wrapping the consumer? It does support Lambdas already: https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java#L126 Full docs: http://docs.confluent.io/current/streams

Re: Creating read-only state stores from a compacted topic

2017-05-17 Thread Matthias J. Sax
Hey, at the moment there is no official API for that -- however, using `KStreamBuilder#table()` we internally do the exact same thing -- we don't create an additional changelog topic, but use the original input topic for that. Thus, it might make sense to expose this as an official API at Processor

Re: What purpose serves the repartition topic?

2017-05-17 Thread Matthias J. Sax
e security > configuration there is not final as of now. > > Thanks for the info > > On Tue, May 16, 2017 at 5:28 PM Matthias J. Sax > wrote: > >> João, >> >> in your example, k.toUpperCase() does break partitioning. Assume you >> have two records a

Re: What purpose serves the repartition topic?

2017-05-16 Thread Matthias J. Sax
João, in your example, k.toUpperCase() does break partitioning. Assume you have two records -- both have different keys and might be contained in different partitions. After you do a selectKey(), both have the same key. In order to compute the aggregation correctly, it is required to r
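The effect can be sketched in plain Java (an illustrative hash partitioner; Kafka's real default uses murmur2): two records on different partitions end up with the same key after a selectKey()-style mapping, so an aggregation over the new key would be wrong without a repartition step.

```java
public class RepartitionDemo {
    static final int NUM_PARTITIONS = 4;

    // Illustrative hash partitioner, not Kafka's actual implementation.
    static int partition(String key) {
        return Math.abs(key.hashCode() % NUM_PARTITIONS);
    }

    public static void main(String[] args) {
        // Find two keys that live on different partitions...
        String k1 = "key-0", k2 = null;
        for (int i = 1; i < 100; i++) {
            if (partition("key-" + i) != partition(k1)) { k2 = "key-" + i; break; }
        }
        if (k2 == null) throw new AssertionError("no key pair found");

        // ...then map both to the same new key, selectKey()-style. The records
        // still sit on different partitions, so correct aggregation over the
        // new key requires writing through a repartition topic first.
        String newKey1 = "SAME", newKey2 = "SAME";
        if (!(newKey1.equals(newKey2) && partition(k1) != partition(k2)))
            throw new AssertionError();
        System.out.println(k1 + "@p" + partition(k1) + " and " + k2 + "@p"
                + partition(k2) + " both map to key " + newKey1);
    }
}
```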

Re: Can state stores function as a caching layer for persistent storage

2017-05-16 Thread Matthias J. Sax
inks, thank you. >>> >>> Part of my original misunderstanding was that the at-least-once guarantee >>> was considered fulfilled if the record reached a sink node. >>> >>> Thanks for all the feedback, you may consider my question answered. >>

Re: Kafka Streams Failed to rebalance error

2017-05-16 Thread Matthias J. Sax
ion 10.2.1 and >> currently monitoring the situation. >> Will report back in case I find any errors. Thanks for the help though. >> >> -Sameer. >> >> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax >> wrote: >> >>> Did you see Eno's reply?

Re: Can state stores function as a caching layer for persistent storage

2017-05-14 Thread Matthias J. Sax
gt; >> When I said "local" I meant that the state stores are partial for each >> instance, i.e. they only have the partitions the task is responsible for >> (normal behavior) rather than a global store. >> >> >>> >>>> I read about Kafka C

Re: Debugging Kafka Streams Windowing

2017-05-13 Thread Matthias J. Sax
-4] INFO o.a.k.c.c.i.AbstractCoordinator - > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for > group grp_id > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator - > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group > g

Re: Order of punctuate() and process() in a stream processor

2017-05-12 Thread Matthias J. Sax
; time it seems a bit odd. If you do anything in a punctuate call that is > relatively expensive it's especially bad. > > ________ > From: Matthias J. Sax [matth...@confluent.io] > Sent: Friday, May 12, 2017 1:18 PM > To: users@kafka.apache.org &

Re: Order of punctuate() and process() in a stream processor

2017-05-12 Thread Matthias J. Sax
punctuate() and process() in a stream > processor > > > > I'm a bit troubled by the fact that it fires 3 times despite the stream > time being advanced all at once; is there a scenario when this is > beneficial? > > > F

Re: Can state stores function as a caching layer for persistent storage

2017-05-12 Thread Matthias J. Sax
Hi, I am not sure about your overall setup. Are your stores local (similar to RocksDB) or are you using one global remote store? If you use a global remote store, you would not need to back up your changes in a changelog topic, as the store would not be lost in case of failure. Also (in case that yo

Re: Order of punctuate() and process() in a stream processor

2017-05-12 Thread Matthias J. Sax
Hi Peter, It's by design. Streams internally tracks time progress (so-called "streams time"). "streams time" gets advanced *after* processing a record. Thus, in your case, "streams time" is still at its old value before it processes the first message of the "burst" you send. After that, "streams time"
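A simplified model of this in plain Java (the scheduling variable names are illustrative, not the internal Streams code): stream time advances only after a record is processed, so after a gap, all punctuations that became due in the meantime fire back-to-back.

```java
public class StreamTimeDemo {
    public static void main(String[] args) {
        long interval = 1000, nextPunctuate = 1000, streamTime = 0;
        int punctuations = 0;
        long[] recordTimestamps = {100, 3500}; // silence, then a late record
        for (long ts : recordTimestamps) {
            // process(record) happens here, while streamTime is still old
            streamTime = Math.max(streamTime, ts); // advanced AFTER processing
            while (nextPunctuate <= streamTime) {  // catch up on missed slots
                punctuations++;
                nextPunctuate += interval;
            }
        }
        // Punctuations for 1000, 2000, and 3000 all fire after the 3500 record.
        if (punctuations != 3) throw new AssertionError();
        System.out.println("punctuations fired: " + punctuations);
    }
}
```

This mirrors the observation earlier in the thread that punctuate() fired three times in a row when stream time jumped at once.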

Kafka Streams Usage Patterns

2017-05-09 Thread Matthias J. Sax
Hi, I started a new Wiki page to collect some common usage patterns for Kafka Streams. Right now, it contains a quick example on "how to compute average". Hope we can collect more examples like this! https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns -Matthias

Re: Kafka Stream stops polling new messages

2017-05-09 Thread Matthias J. Sax
n. > > Thanks for the help once again > > JP > On Tue, May 9, 2017 at 4:36 AM Eno Thereska wrote: > >> Yeah that's a good point, I'm not taking action then. >> >> Eno >> >> On Mon, May 8, 2017 at 10:38 PM, Matthias J. Sax >> wrote

Re: KafkaStreams pause specific topic partition consumption

2017-05-08 Thread Matthias J. Sax
r.java#L94 -Matthias On 5/8/17 4:01 PM, Timur Yusupov wrote: > That means in order to process filtered out records in a next batch, we > have to seek KafkaStreams back, right? > > On Tue, May 9, 2017 at 1:19 AM, Matthias J. Sax > wrote: > >> I see. >> >> I

Re: How to chain increasing window operations one after another

2017-05-08 Thread Matthias J. Sax
l as the Generic typing on some of these classes > (yea you KeyValueMapper), makes for long code! I had to specify them > everywhere since the key/val's changed. > > > I didn't get enough time to mess with it today, I will wrap up the unit > tests and run it to see how it p

Re: KafkaStreams pause specific topic partition consumption

2017-05-08 Thread Matthias J. Sax
I see. If you do the step of storing the end offsets in your database before starting up Streams, this would work. What you could do as a workaround (even if it might not be a nice solution) is to apply a `transform()` as your first operator. Within `transform()` you get access to the current r

Re: Debugging Kafka Streams Windowing

2017-05-08 Thread Matthias J. Sax
continuously check for > inconsistencies in real time and report if there is any issue. > > No issues have been reported for the last 24 hours. Will update this thread > if we find any issue. > > Thanks for all the support! > > > > On Fri, May 5, 2017 at 3:55

Re: Deduplicating KStream-KStream join

2017-05-08 Thread Matthias J. Sax
; > <https://www.facebook.com/myheritage> > <https://twitter.com/myheritage> <http://blog.myheritage.com/> > <https://www.youtube.com/user/MyHeritageLtd> > > > On Fri, May 5, 2017 at 8:15 PM, Matthias J. Sax > wrote: > >> Not sur

Re: Kafka Stream stops polling new messages

2017-05-08 Thread Matthias J. Sax
method, >>>>> which took ~11h, exactly the time the instance was stuck for. >>>>> Changing the backing state store from "persistent" to "inMemory" solved >>>> my >>>>> issue, at least after several days running, no st

Re: How to chain increasing window operations one after another

2017-05-08 Thread Matthias J. Sax
Michal, that's an interesting idea. In an ideal world, Kafka Streams should have an optimizer that is able to do this automatically under the hood. Too bad we are not there yet. @Garret: did you try this out? This seems to be a question that might affect many users, and it might be worth to docu

Re: [DISCUSS] KIP-156 Add option "dry run" to Streams application reset tool

2017-05-08 Thread Matthias J. Sax
Thanks for the KIP Bharat! I like it. Don't have anything to add so far. Seems to be fairly straight forward. Looking forward to further comments from the community. -Matthias On 5/8/17 11:17 AM, Steven Schlansker wrote: > >> On May 8, 2017, at 11:14 AM, BigData dev wrote: >> >> Hi All, >>

Re: Verify time semantics through topology

2017-05-06 Thread Matthias J. Sax
output Event Time >>> correct?* >>> - When new output records are generated via periodic functions such >>> as punctuate(), the output record timestamp is defined as the current >>>internal time (obtained through context.timestamp()) of the stream

Re: Deduplicating KStream-KStream join

2017-05-05 Thread Matthias J. Sax
ave not missed a feature on > the current DSL. > > I suppose the implementation would not be much different from putting a > transform after the join as a second state store would be required? > > Adrian > > -Original Message- > From: Matthias J. Sax [mailto

Re: Verify time semantics through topology

2017-05-05 Thread Matthias J. Sax
- *This is where I am confused, what operations count as a > punctuate()? Just the low level api? And are these thus Process time?* > - For aggregations, the timestamp of a resulting aggregate update >record will be that of the latest arrived input record that triggered

Re: Verify time semantics through topology

2017-05-04 Thread Matthias J. Sax
Hi, I am not sure if I understand correctly: If you use the default TimestampExtractor, the whole pipeline will be event-time based. However, as you want to compute the AVG, I would recommend a different pattern anyway: FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable In
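The suggested pattern can be sketched in plain Java (windowing omitted, names illustrative): aggregate() maintains a (sum, count) pair per key, and mapValues() turns it into the average at the end.

```java
import java.util.HashMap;
import java.util.Map;

public class AvgDemo {
    // The per-key accumulator an aggregate() step would maintain.
    static final class SumCount { long sum; long count; }

    public static void main(String[] args) {
        // groupByKey() + aggregate(): fold each value into (sum, count)
        Map<String, SumCount> agg = new HashMap<>();
        String[][] feed = {{"sensor-1", "10"}, {"sensor-1", "20"}, {"sensor-2", "5"}};
        for (String[] rec : feed) {
            SumCount sc = agg.computeIfAbsent(rec[0], k -> new SumCount());
            sc.sum += Long.parseLong(rec[1]);
            sc.count++;
        }
        // mapValues(): turn (sum, count) into the average
        Map<String, Double> avg = new HashMap<>();
        agg.forEach((k, sc) -> avg.put(k, (double) sc.sum / sc.count));
        if (avg.get("sensor-1") != 15.0 || avg.get("sensor-2") != 5.0)
            throw new AssertionError();
        System.out.println(avg);
    }
}
```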

Re: Shouldn't the initializer of a stream aggregate accept the key?

2017-05-04 Thread Matthias J. Sax
hink the initializer should only return a value > (as it does right now). > On Thu, May 4, 2017 at 3:02 PM Matthias J. Sax > wrote: > >> Currently, you don't get any hold on the key, because the key must be >> protected from modification. >> >> Thus, the

Re: What is the impact of setting the consumer config max.poll.interval.ms to high value

2017-05-04 Thread Matthias J. Sax
If your consumer fails (ie, the whole process dies), setting the value high is not a problem (because the heartbeat thread dies, too, and the failure will be detected quickly). It's only a problem if your "main processing thread" dies (and everything else is still up and running), or if your main process

Re: Deduplicating KStream-KStream join

2017-05-04 Thread Matthias J. Sax
Hi, we don't believe in triggers ;) https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ -> Thus, it's a BigQuery flaw to not support updates... (IMHO) (We are also considering improving KStream-KStream join though, but that's of course no short term solution for you: https

Re: Debugging Kafka Streams Windowing

2017-05-04 Thread Matthias J. Sax
About > 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator - > Discovered coordinator broker-05:6667 for group group-2. Please upgrade to Streams 0.10.2.1 -- we fixed a couple of bugs and I would assume this issue is fixed, too. If not, please report back. > Another question that I

Re: Record timestamp

2017-05-04 Thread Matthias J. Sax
The timestamp is always stored -- it's a metadata field that was added to the message format in 0.10.0. By default, the Producer uses System.currentTimeMillis() to set the timestamp before it sends the record to the broker. Or you can explicitly set the timestamp yourself. The default TimestampExtr

Re: Kafka Streams Failed to rebalance error

2017-05-04 Thread Matthias J. Sax
Did you see Eno's reply? Please try out Streams 0.10.2.1 -- this should be fixed there. If not, please report back. I would also recommend to subscribe to the list. It's self-service http://kafka.apache.org/contact -Matthias On 5/3/17 10:49 PM, Sameer Kumar wrote: > My brokers are on version 1

Re: Shouldn't the initializer of a stream aggregate accept the key?

2017-05-04 Thread Matthias J. Sax
Currently, you don't get any hold on the key, because the key must be protected from modification. Thus, the initial value must be the same for all keys atm. We are aware that this limits some use cases and there is already a proposal to add keys to some API. https://cwiki.apache.org/confluence/d

Re: Kafka Streams stopped with errors, failed to reinitialize itself

2017-05-03 Thread Matthias J. Sax
What Kafka Streams version are you using? We had some bounce issues that got fixed in 0.10.2.1 (that was released last week). -Matthias On 5/3/17 4:57 AM, Sameer Kumar wrote: > Hi, > > > > I want to report an issue where in addition of a server at runtime in my > streams compute cluster cau

Re: joining two windowed aggregations

2017-05-03 Thread Matthias J. Sax
> Seems like this would be a standard join operation Not sure, if I would consider this a "standard" join... Your windows have different size and thus "move" differently fast. Kafka Stream joins provide sliding join semantics. Similar to a SQL query like this (conceptually): > SELECT * FROM st

Re: Kafka-Streams Offsets Commit - Reprocessing Very Old Messages

2017-05-03 Thread Matthias J. Sax
Hard to say. Couple of things you could try: upgrade to 0.10.2.1 (got released last week) -- it contains a couple of bug fixes with regard to rebalancing and state store locks. Also, when your application "jumps back", is this somewhere in the middle of your input topic or is it "earliest" -- if i

Re: disable KTable caching?

2017-05-03 Thread Matthias J. Sax
You can disable caching by setting cache size to zero. http://docs.confluent.io/current/streams/developer-guide.html#memory-management -Matthias On 5/3/17 4:07 PM, Steven Schlansker wrote: > I'm designing a Streams application that provides an API that acts > on messages. Messages have a send
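In code, assuming the 0.10.x key string `cache.max.bytes.buffering` (the value behind `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`), the setting is one line; shown here with plain `Properties` so the snippet stands alone:

```java
import java.util.Properties;

public class CacheConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Setting the cache size to 0 disables KTable caching, so every
        // update is forwarded downstream instead of being deduplicated.
        props.put("cache.max.bytes.buffering", "0");
        if (!"0".equals(props.getProperty("cache.max.bytes.buffering")))
            throw new AssertionError();
        System.out.println("KTable caching disabled: " + props);
    }
}
```

In a real application these properties would be passed into the `KafkaStreams` constructor alongside the usual application id and bootstrap servers.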

Re: Debugging Kafka Streams Windowing

2017-05-03 Thread Matthias J. Sax
a particular field in the message. So, we just add >> that >>>> particular field's value in the HashSet and then take the size of the >>>> HashSet. >>>> >>>> On our side, we are also investigating and it looks like there might be >> a >&

Re: AW: Consumer with another group.id conflicts with streams()

2017-05-03 Thread Matthias J. Sax
I just confirmed. `KafkaConsumer.close()` should be idempotent. It's a bug in the consumer. https://issues.apache.org/jira/browse/KAFKA-5169 -Matthias On 5/3/17 2:20 PM, Matthias J. Sax wrote: > Yes, Streams might call "close" multiple times, as we assume it's an

Re: AW: Consumer with another group.id conflicts with streams()

2017-05-03 Thread Matthias J. Sax
also an indicator for a bug, because a > streaming app should behave the same, independent of whether one or two > instances are running. > - I added the properties you suggested, but behavior did not change. > > I think this is a bug, consumers of different groups should not inter

Re: Kafka Stream stops polling new messages

2017-05-02 Thread Matthias J. Sax
Did you check the logs? Maybe you need to increase log level to DEBUG to get some more information. Did you double check committed offsets via bin/kafka-consumer-groups.sh? -Matthias On 4/28/17 9:22 AM, João Peixoto wrote: > My stream gets stale after a while and it simply does not receive any n

Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Matthias J. Sax
en meant to be read-in as a ktable to filter against. > > > Am I clearer now? > > > Cheers, > > Michał > > > On 30/04/17 18:14, Matthias J. Sax wrote: >> Your observation is correct. >> >> If you use inner KStream-KTable join, th

Re: Caching in Kafka Streams to ignore garbage message

2017-04-30 Thread Matthias J. Sax
one, does this: > > private void putInternal(byte[] rawKey, byte[] rawValue) { > if (rawValue == null) { > try { > db.delete(wOptions, rawKey); > > But any non-null value would do. > Please correct me if miss-understood. > > Cheers, > Michał &g

Re: Debugging Kafka Streams Windowing

2017-04-29 Thread Matthias J. Sax
Just a follow up (we identified a bug in the "skipped records" metric). The reported value is not correct. On 4/28/17 9:12 PM, Matthias J. Sax wrote: > Ok. That makes sense. > > Question: why do you use .aggregate() instead of .count() ? > > Also, can

Re: Kafka Streams 0.10.0.1 - multiple consumers not receiving messages

2017-04-28 Thread Matthias J. Sax
Henry, you might want to check out the docs, that give an overview of the architecture: http://docs.confluent.io/current/streams/architecture.html#example Also, I am wondering why your application did not crash: I would expect an exception like java.lang.IllegalArgumentException: Assigned partit

Re: Debugging Kafka Streams Windowing

2017-04-28 Thread Matthias J. Sax
(360)) > .mapValues(HashSet::size) > .toStream() > .map((key, value) -> convertToProtobufObject(key, value)) > .to() > > > > > > > On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax > wrote: > >> Thanks for the details (sorry that I for

Re: Consumer with another group.id conflicts with streams()

2017-04-28 Thread Matthias J. Sax
> do that? > > Andreas > > > -Original Message- > From: Matthias J. Sax [mailto:matth...@confluent.io] > Sent: Monday, 24 April 2017 19:18 > To: users@kafka.apache.org > Subject: Re: Consumer with another group.id conflicts with streams() >

Re: Debugging Kafka Streams Windowing

2017-04-27 Thread Matthias J. Sax
Thanks for the details (sorry that I forgot that you did share the output already). Might be a dumb question, but what is the count for missing windows in your second implementation? If there is no data for a window, it should not emit a window with count zero, but nothing at all. Thus, looking at you

Re: Debugging Kafka Streams Windowing

2017-04-27 Thread Matthias J. Sax
see any errors in the logs. However, the skipped-records-rate has > not come down. > > > > > On Fri, Apr 28, 2017 at 5:17 AM, Matthias J. Sax > wrote: > >> Streams skips records with timestamp -1 >> >> The metric you mentioned, reports the number of sk

Re: Caching in Kafka Streams to ignore garbage message

2017-04-27 Thread Matthias J. Sax
t; Is it possible to send a message w/ the id as the partition key to a topic, > and then use the same id as the key, so the same node which will receive > the data for an id is the one which will process it? > > > On Fri, Apr 28, 2017 at 2:32 AM, Matthias J. Sax > wrote: > >>

Re: Caching in Kafka Streams to ignore garbage message

2017-04-27 Thread Matthias J. Sax
The recommended solution would be to use Kafka Connect to load your DB data into a Kafka topic. With Kafka Streams you read your db-topic as a KTable and do an (inner) KStream-KTable join to look up the IDs. -Matthias On 4/27/17 2:22 PM, Ali Akhtar wrote: > I have a Kafka topic which will receive a l
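The join semantics can be modeled in plain Java (the KTable stand-in is a map; names are illustrative): an inner KStream-KTable join drops every stream record whose key has no table entry, which is exactly the garbage-filtering behavior being recommended.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupJoinDemo {
    public static void main(String[] args) {
        // db-topic materialized as a KTable: id -> attribute (illustrative)
        Map<String, String> table = new HashMap<>();
        table.put("id-1", "valid");
        table.put("id-2", "valid");

        // Inner KStream-KTable join: records without a table entry are dropped.
        String[][] stream = {{"id-1", "payload-a"}, {"id-9", "garbage"}, {"id-2", "payload-b"}};
        List<String> joined = new ArrayList<>();
        for (String[] rec : stream) {
            if (table.containsKey(rec[0])) {
                joined.add(rec[0] + ":" + rec[1]);
            }
        }
        if (joined.size() != 2 || !joined.get(0).equals("id-1:payload-a"))
            throw new AssertionError();
        System.out.println("joined records: " + joined);
    }
}
```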

Re: Deployment of Kafka Stream app

2017-04-27 Thread Matthias J. Sax
If you use a containerized environment, if a container fails it will get restarted -- so you get HA out of the box. For scaling, you have a single docker image, but start the same image multiple times. If you have multiple apps, you should have one image per app and start the number of containers

Re: Debugging Kafka Streams Windowing

2017-04-27 Thread Matthias J. Sax
Streams skips records with timestamp -1 The metric you mentioned, reports the number of skipped record. Are you sure that `getEventTimestamp()` never returns -1 ? -Matthias On 4/27/17 10:33 AM, Mahendra Kariya wrote: > Hey Eno, > > We are using a custom TimeStampExtractor class. All messages
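A simplified model of that skipping behavior in plain Java (not the real Streams code path): any record whose extractor returns -1 is dropped, and only a skipped-records counter moves.

```java
public class SkippedRecordsDemo {
    public static void main(String[] args) {
        // Timestamps a custom TimeStampExtractor might have returned.
        long[] extracted = {1500, -1, 1600, -1, -1};
        int processed = 0, skipped = 0;
        for (long ts : extracted) {
            if (ts < 0) { // what Streams does with timestamp -1
                skipped++;
                continue;
            }
            processed++;
        }
        if (processed != 2 || skipped != 3) throw new AssertionError();
        System.out.println("processed=" + processed + " skipped=" + skipped);
    }
}
```

If the skipped-records metric climbs, the first thing to check is whether the extractor can ever return a negative value for real input.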

Re: KafkaStreams pause specific topic partition consumption

2017-04-27 Thread Matthias J. Sax
Timur, there is no API to pause/resume partitions in Streams, because Streams handles/manages its internal consumer by itself. The "batch processing KIP" is currently delayed -- but I am sure we will pick it up again. Hopefully after 0.11 got released. > So we are considering to just pause spec

Re: [ANNOUNCE] New committer: Rajini Sivaram

2017-04-24 Thread Matthias J. Sax
Congrats! On 4/24/17 4:59 PM, Apurva Mehta wrote: > Congratulations Rajini! > > On Mon, Apr 24, 2017 at 2:06 PM, Gwen Shapira wrote: > >> The PMC for Apache Kafka has invited Rajini Sivaram as a committer and we >> are pleased to announce that she has accepted! >> >> Rajini contributed 83 patch

Re: Consumer with another group.id conflicts with streams()

2017-04-24 Thread Matthias J. Sax
Hi, hard to diagnose. The new consumer should not affect the Streams app though -- even if I am wondering why you need it. > KafkaConsumer (with a UUID as group.id) that reads some historical data from > input topic Maybe using GlobalKTable instead might be a better solution? > (i.e. I feed 10

Re: Unable to consume from remote

2017-04-24 Thread Matthias J. Sax
Can you read/write your topics using > bin/kafka-console-[consumer|producer].sh If not, it's a general configuration problem. If yes, the problem is in your code. I did not open the link for security concerns... Hope you understand that. If you want to share code, please c&p __relevant__ parts int

Re: Version compatibility and flush() in Kafka Producer

2017-04-14 Thread Matthias J. Sax
0.9 clients cannot connect to 0.8 brokers. If you want to upgrade to 0.9, you first need to upgrade your brokers to 0.9 -- your running 0.8 clients will be able to connect to 0.9 brokers. Afterwards, you can update your clients to 0.9, too. Btw: since 0.10.2, clients are backward compatible to 0.

Re: Causes for Kafka messages being unexpectedly delivered more than once? The 'exactly once' semantic

2017-04-13 Thread Matthias J. Sax
aks our internal > bookkeeping of the data processing flows. > > In other words, we would ideally like to see message queueing capabilities > in Kafka with very high exactly-once delivery guarantees... > > Thanks, > - Dmitry > > > On Thu, Apr 13, 2017 at 7:00 PM, Matth

Re: Delayed consumer in a Streams topology

2017-04-13 Thread Matthias J. Sax
Hi, reading a topic twice -- which is the first requirement you have -- is not possible (and not necessary IMHO) with Streams API -- regardless of a "delayed" read. The reason is that Streams uses a single consumer group.id internally and thus, Streams can commit only one offset per topic-partitio

Re: Causes for Kafka messages being unexpectedly delivered more than once? The 'exactly once' semantic

2017-04-13 Thread Matthias J. Sax
Hi, the first question to ask would be, if you get duplicate writes at the producer or duplicate reads at the consumer... For exactly-once: it's work in progress and we aim for 0.11 release (what might still be a beta version). In short, there will be an idempotent producer that will avoid dupli

Re: Kafka Streams Application does not start after 10.1 to 10.2 update if topics need to be auto-created

2017-04-13 Thread Matthias J. Sax
Hi, thanks for reporting this issue. We are aware of a bug in 0.10.2 that seems to be related: https://issues.apache.org/jira/browse/KAFKA-5037 However, I also want to point out, that it is highly recommended to not use auto topic create for Streams, but to manually create all input/output topics

Re: Streams error handling

2017-04-13 Thread Matthias J. Sax
Mike, thanks for your feedback. You are absolutely right that Streams API does not have great support for this atm. And it's very valuable that you report this (you are not the first person). It helps us prioritizing :) For now, there is no better solution as the one you described in your email,

Re: Subscription to this list

2017-04-12 Thread Matthias J. Sax
It's self-service: http://kafka.apache.org/contact -Matthias On 4/11/17 11:17 PM, Vidya Priyadarshni Narayan wrote:

Re: auto.offset.reset for Kafka streams 0.10.2.0

2017-04-10 Thread Matthias J. Sax
Default for Streams is "earliest" cf. https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L405 -Matthias On 4/10/17 9:41 PM, Mahendra Kariya wrote: > This was even my assumption. But I had to explicitly specify > auto.offset.reset=late
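The point of the thread above is that Kafka Streams overrides the plain-consumer default: Streams uses `"earliest"` for `auto.offset.reset`, so to get `"latest"` you must set it explicitly in the Streams config. A minimal sketch, using string config keys and plain `java.util.Properties` so it runs standalone (`StreamsOffsetReset` and the broker address are assumptions; the config key names are the real ones):

```java
import java.util.Properties;

// Hypothetical sketch: override the Streams default of auto.offset.reset=earliest.
public class StreamsOffsetReset {

    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // required by Streams
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // Kafka Streams defaults this to "earliest" (unlike the plain consumer);
        // set it explicitly if you want "latest" behavior on first start.
        props.put("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("auto.offset.reset"));
    }
}
```

Note the reset policy only applies when no committed offset exists for the group; once the app has committed, it resumes from the committed position regardless of this setting.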

Re: Request to subscribe

2017-04-05 Thread Matthias J. Sax
It's more helpful to cc people that are not subscribed yet when you answer :) -Matthias On 4/4/17 11:39 PM, Shuai Lin wrote: > To subscribe you should send a message to users-subscr...@kafka.apache.org. > Please follow the instructions on https://kafka.apache.org/contact . > > On Wed, Apr 5, 2

Re: Which is True? Kafka site vs Confluent 3.2 site upgrade doc details contradiction regarding 0.10.2 clients backward compatible to resp. 0.10.0 vs 0.10.1?

2017-04-01 Thread Matthias J. Sax
The reason Streams API 0.10.2 is not backward compatible to 0.10.0 broker is not related to Producer/Consumer API. Streams API (as of 0.10.2) uses a new AdminClient (cf. KIP-4) for topic management that is not supported by 0.10.0 brokers. 0.10.0 broker topics are managed via ZK client only -- and Z

Re: Difference between with and without state store cleanup streams startup

2017-04-01 Thread Matthias J. Sax
> > And in order to make sure state store rebuilds or builds its state by > reading from changelog topic faster, we need to ensure that change log > topics are compacted efficiently. > > I hope these assumptions are correct. > > Thanks > Sachin > > > On

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-03-31 Thread Matthias J. Sax
Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG -Matthias On 3/31/17 11:32 AM, Sachin Mittal wrote: > Hi, > So I have added the config ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE > and the NotLeaderForPartitionException is gone. > > However we see a new exception especially
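The fix discussed here is a config-only change: producer settings placed in the Streams configuration are forwarded to the internal producer, so raising `request.timeout.ms` (and, per the quoted thread, setting `retries` to `Integer.MAX_VALUE`) can ride out transient broker issues like `NotLeaderForPartitionException`. A standalone sketch using plain `Properties` (the class name, broker address, and the 60000 ms value are assumptions; the key names are the real producer configs):

```java
import java.util.Properties;

// Hypothetical sketch: tune the internal producer of a Streams app.
public class StreamsProducerTuning {

    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // required by Streams
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // Producer configs set on the Streams config are forwarded to the
        // internal producer. Raise the request timeout (default 30000 ms)
        // so slow broker responses do not fail the send.
        props.put("request.timeout.ms", "60000");
        // Retry transient errors (e.g. NotLeaderForPartition during a
        // leader election) instead of failing the stream thread.
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("request.timeout.ms"));
    }
}
```

The trade-off: a very high retry count means a genuinely dead partition blocks progress instead of surfacing an error quickly, so pair it with monitoring on producer send latency.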
