To unsubscribe you need to send an email to

  users-unsubscr...@kafka.apache.org


-Matthias

On 12/3/16 6:13 PM, williamtellme123 wrote:
> Unsubscribe
> 
> 
> Sent via the Samsung Galaxy S7, an AT&T 4G LTE smartphone
> -------- Original message --------
> From: Guozhang Wang <wangg...@gmail.com>
> Date: 12/2/16 5:48 PM (GMT-06:00)
> To: users@kafka.apache.org
> Subject: Re: Kafka windowed table not aggregating correctly
> Sachin,
> 
> One thing to note is that retention of the windowed stores works by keeping
> multiple segments of the store, where each segment covers a time range that
> can potentially span multiple windows. If a new window needs to be created
> that is further out than the oldest segment's time range plus the retention
> period, the oldest segments are dropped. From your code it seems you do not
> override the retention via until(...) on TimeWindows.of("stream-table",
> 10 * 1000L).advanceBy(5 * 1000L), so the default of one day is used.
> 
> So with WallclockTimestampExtractor, since it uses system time, it won't give
> you timestamps that span more than a day within a short period of time; but if
> your own extracted timestamps span more than that, then old segments will be
> dropped immediately and hence the aggregate values will be returned as single
> values.
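> 
> For illustration, a rough sketch using the windows from your code (the 7-day
> value here is just an example, not a recommendation):
> 
>     TimeWindows windows = TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L);
>     // retain window segments for 7 days instead of the default of one day
>     windows.until(7 * 24 * 60 * 60 * 1000L);
> 
> The window definition is then passed to aggregateByKey as before.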
> 
> Guozhang
> 
> 
On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> The extractor is used in
>>
>> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>>
>> Let us know, if you could resolve the problem or need more help.
>>
>> -Matthias
>>
>> On 12/2/16 11:46 AM, Sachin Mittal wrote:
>>> https://github.com/SOHU-Co/kafka-node/ this is the Node.js client I am
>>> using. The version is 0.5.x. Can you please tell me what code in Streams
>>> calls the timestamp extractor? I can look there to see if there is any
>>> issue.
>>>
>>> Again, the issue happens only when producing the messages using a producer
>>> that is compatible with Kafka version 0.8.x. I see that this producer does
>>> not send a record timestamp, as this was introduced in version 0.10 only.
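>>>
>>> As an aside, a defensive variant of the extractor (just a sketch, not the
>>> code I am running) could fall back to wall-clock time when the record
>>> carries no timestamp, since pre-0.10 producers report -1:
>>>
>>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>     import org.apache.kafka.streams.processor.TimestampExtractor;
>>>
>>>     public static class SafeMessageTimestampExtractor implements TimestampExtractor {
>>>         public long extract(ConsumerRecord<Object, Object> record) {
>>>             if (record.value() instanceof Message) {
>>>                 return ((Message) record.value()).ts;  // timestamp embedded in the payload
>>>             }
>>>             long ts = record.timestamp();
>>>             // pre-0.10 producers set no record timestamp; -1 is reported in that case
>>>             return ts >= 0 ? ts : System.currentTimeMillis();
>>>         }
>>>     }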
>>>
>>> Thanks
>>> Sachin
>>>
>>> On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>
>>>> I am not sure what is happening. That's why it would be good to have a
>>>> toy example to reproduce the issue.
>>>>
>>>> What do you mean by "Kafka node version 0.5"?
>>>>
>>>> -Matthias
>>>>
>>>> On 12/2/16 11:30 AM, Sachin Mittal wrote:
>>>>> I can provide the data, but data does not seem to be the issue.
>>>>> If I submit the same data and use the same timestamp extractor with the
>>>>> Java client with Kafka version 0.10.0.1, aggregation works fine.
>>>>> I find the issue only when submitting the data with kafka-node version 0.5.
>>>>> It looks like the stream does not extract the time correctly in that case.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>
>>>>>> Can you provide example input data (including timestamps) and results?
>>>>>> What is the expected result (i.e., what aggregation do you apply)?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>>>>>> Hi,
>>>>>>> After much debugging I found an issue with timestamp extractor.
>>>>>>>
>>>>>>> If I use a custom timestamp extractor with following code:
>>>>>>>      public static class MessageTimestampExtractor implements
>>>>>>> TimestampExtractor {
>>>>>>>          public long extract(ConsumerRecord<Object, Object> record) {
>>>>>>>              if (record.value() instanceof Message) {
>>>>>>>                  return ((Message) record.value()).ts;
>>>>>>>              } else {
>>>>>>>                  return record.timestamp();
>>>>>>>              }
>>>>>>>          }
>>>>>>>      }
>>>>>>>
>>>>>>> Here Message has a long field ts which stores the timestamp, and the
>>>>>>> aggregation does not work.
>>>>>>> Note I have checked that ts has valid timestamp values.
>>>>>>>
>>>>>>> However, if I replace it with, say, WallclockTimestampExtractor,
>>>>>>> aggregation works fine.
>>>>>>>
>>>>>>> I do not understand what could be the issue here.
>>>>>>>
>>>>>>> Also note I am using Kafka Streams version 0.10.0.1, and I am publishing
>>>>>>> messages via https://github.com/SOHU-Co/kafka-node/, whose version is
>>>>>>> quite old (0.5.x).
>>>>>>>
>>>>>>> Let me know if there is some bug in timestamp extraction.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sachin,
>>>>>>>>
>>>>>>>> This is indeed a bit weird, and we'd like to try to reproduce your issue
>>>>>>>> locally. Do you have sample input data for us to try out?
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I fixed that sorted set issue, but I am facing a weird problem which I
>>>>>>>>> am not able to replicate.
>>>>>>>>>
>>>>>>>>> Here is the sample problem that I could isolate:
>>>>>>>>> My class is like this:
>>>>>>>>>      public static class Message implements Comparable<Message> {
>>>>>>>>>          public long ts;
>>>>>>>>>          public String message;
>>>>>>>>>          public String key;
>>>>>>>>>          public Message() {};
>>>>>>>>>          public Message(long ts, String message, String key) {
>>>>>>>>>              this.ts = ts;
>>>>>>>>>              this.key = key;
>>>>>>>>>              this.message = message;
>>>>>>>>>          }
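>>>>>>>>>          // note: never returns 0, so two messages with the same ts are both kept by a TreeSet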
>>>>>>>>>          public int compareTo(Message paramT) {
>>>>>>>>>              long ts1 = paramT.ts;
>>>>>>>>>              return ts > ts1 ? 1 : -1;
>>>>>>>>>          }
>>>>>>>>>      }
>>>>>>>>>
>>>>>>>>> pipeline is like this:
>>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>>   .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>>>>>>>       public KeyValue<String, Message> apply(String key, Message value) {
>>>>>>>>>           return new KeyValue<String, Message>(value.key, value);
>>>>>>>>>       }
>>>>>>>>>   })
>>>>>>>>>   .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>>>>>>>   .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>       public SortedSet<Message> apply() {
>>>>>>>>>           return new TreeSet<Message>();
>>>>>>>>>       }
>>>>>>>>>   }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>       public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>               SortedSet<Message> aggregate) {
>>>>>>>>>           aggregate.add(value);
>>>>>>>>>           return aggregate;
>>>>>>>>>       }
>>>>>>>>>   }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>>   Serdes.String(), messagesSerde)
>>>>>>>>>   .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>       public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>           ...
>>>>>>>>>       }
>>>>>>>>>   });
>>>>>>>>>
>>>>>>>>> So basically I rekey the original message into another topic and then
>>>>>>>>> aggregate it based on that key.
>>>>>>>>> What I have observed is that when I use windowed aggregation, the
>>>>>>>>> aggregator does not use the previously aggregated value.
>>>>>>>>>
>>>>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>> SortedSet<Message> aggregate) {
>>>>>>>>>      aggregate.add(value);
>>>>>>>>>      return aggregate;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> So in the above function the aggregate is an empty set for every value
>>>>>>>>> entering the pipeline. When I remove the windowed aggregation, the
>>>>>>>>> aggregate set retains previously aggregated values.
>>>>>>>>>
>>>>>>>>> I am just not able to wrap my head around it. When I ran this type of
>>>>>>>>> test locally on Windows it worked fine. However, a similar pipeline
>>>>>>>>> setup, when run against production on Linux, behaves strangely and
>>>>>>>>> always gets an empty aggregate set.
>>>>>>>>> Any idea what could be the reason, or where I should look for the
>>>>>>>>> problem? Does the length of the key string matter here? I will later
>>>>>>>>> try to run the same simple setup on Linux and see what happens. But
>>>>>>>>> this is very strange behavior.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Sachin,
>>>>>>>>>>
>>>>>>>>>> In the implementation of SortedSet, if the object implements the
>>>>>>>>>> Comparable interface, that compareTo function is applied in
>>>>>>>>>> "aggregate.add(value);", and hence if it returns 0, the element will
>>>>>>>>>> not be added, since it is a Set.
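>>>>>>>>>>
>>>>>>>>>> A minimal illustration of that behavior (using the Message class from
>>>>>>>>>> your earlier email):
>>>>>>>>>>
>>>>>>>>>>     SortedSet<Message> set = new TreeSet<Message>();
>>>>>>>>>>     set.add(new Message(14L, "first"));
>>>>>>>>>>     set.add(new Message(14L, "second"));  // compareTo returns 0 (same ts), so the TreeSet drops it
>>>>>>>>>>     System.out.println(set.size());       // prints 1, not 2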
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> What I find is that when I use a sorted set for aggregation, it fails
>>>>>>>>>>> to aggregate the values for which compareTo returns 0.
>>>>>>>>>>>
>>>>>>>>>>> My class is like this:
>>>>>>>>>>>      public class Message implements Comparable<Message> {
>>>>>>>>>>>          public long ts;
>>>>>>>>>>>          public String message;
>>>>>>>>>>>          public Message() {};
>>>>>>>>>>>          public Message(long ts, String message) {
>>>>>>>>>>>              this.ts = ts;
>>>>>>>>>>>              this.message = message;
>>>>>>>>>>>          }
>>>>>>>>>>>          public int compareTo(Message paramT) {
>>>>>>>>>>>              long ts1 = paramT.ts;
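>>>>>>>>>>>              // returning 0 for equal ts means a TreeSet treats such messages as duplicates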
>>>>>>>>>>>              return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>>>>>>>          }
>>>>>>>>>>>      }
>>>>>>>>>>>
>>>>>>>>>>> pipeline is like this:
>>>>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>>>>   .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>>>       public SortedSet<Message> apply() {
>>>>>>>>>>>           return new TreeSet<Message>();
>>>>>>>>>>>       }
>>>>>>>>>>>   }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>>>       public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>>               SortedSet<Message> aggregate) {
>>>>>>>>>>>           aggregate.add(value);
>>>>>>>>>>>           return aggregate;
>>>>>>>>>>>       }
>>>>>>>>>>>   }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>>>>   Serdes.String(), messagesSerde)
>>>>>>>>>>>   .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>>>       public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>>>           ...
>>>>>>>>>>>       }
>>>>>>>>>>>   });
>>>>>>>>>>>
>>>>>>>>>>> So any message published between 10 and 20 seconds gets aggregated in
>>>>>>>>>>> the 10-20 bucket, and I print the size of the set.
>>>>>>>>>>> However, the output I get is the following:
>>>>>>>>>>>
>>>>>>>>>>> Published: 14
>>>>>>>>>>> Aggregated: 10  20 -> 1
>>>>>>>>>>>
>>>>>>>>>>> Published: 18
>>>>>>>>>>> Aggregated: 10  20 -> 2
>>>>>>>>>>>
>>>>>>>>>>> Published: 11
>>>>>>>>>>> Aggregated: 10  20 -> 3
>>>>>>>>>>>
>>>>>>>>>>> Published: 17
>>>>>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>>>>>
>>>>>>>>>>> Published: 14
>>>>>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>>>>>
>>>>>>>>>>> Published: 15
>>>>>>>>>>> Aggregated: 10  20 -> 5
>>>>>>>>>>>
>>>>>>>>>>> Published: 12
>>>>>>>>>>> Aggregated: key2  10  20 -> 6
>>>>>>>>>>>
>>>>>>>>>>> Published: 12
>>>>>>>>>>> Aggregated: 10  20 -> 6
>>>>>>>>>>>
>>>>>>>>>>> So as you can see, any message that occurs again in the same second,
>>>>>>>>>>> where compareTo returns 0, fails to get aggregated in the pipeline.
>>>>>>>>>>> Notice the ones published at 14 and 12 seconds.
>>>>>>>>>>>
>>>>>>>>>>> Now I am not sure if the problem is with Java, i.e. whether I should
>>>>>>>>>>> use the Comparator interface and not Comparable for my Message object,
>>>>>>>>>>> or if the problem is with Kafka Streams or with serializing and
>>>>>>>>>>> de-serializing the set of messages. If I replace the Set with a List,
>>>>>>>>>>> all works fine.
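>>>>>>>>>>>
>>>>>>>>>>> One Java-side option I may try (just a sketch, not something I have
>>>>>>>>>>> tested in this pipeline) is a TreeSet with an explicit Comparator that
>>>>>>>>>>> breaks ties on the message field, so equal timestamps are not
>>>>>>>>>>> collapsed; the Initializer would return this instead of a plain
>>>>>>>>>>> new TreeSet<Message>():
>>>>>>>>>>>
>>>>>>>>>>>     SortedSet<Message> aggregate = new TreeSet<Message>(new Comparator<Message>() {
>>>>>>>>>>>         public int compare(Message a, Message b) {
>>>>>>>>>>>             int byTs = Long.compare(a.ts, b.ts);
>>>>>>>>>>>             // tie-break on the message text so messages with equal ts are both kept
>>>>>>>>>>>             return byTs != 0 ? byTs : a.message.compareTo(b.message);
>>>>>>>>>>>         }
>>>>>>>>>>>     });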
>>>>>>>>>>>
>>>>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me see what
>>>>>>>>>>> the best Java practice is here.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Sachin
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
>>>>>>>>>>>>> If the stream app, using the timestamp extractor, puts the message
>>>>>>>>>>>>> in one or more bucket(s), it will get aggregated in those buckets.
>>>>>>>>>>>>> I assume this statement is correct.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Yes.
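>>>>>>>>>>>>
>>>>>>>>>>>> For example (a rough sketch outside the Streams API, just to
>>>>>>>>>>>> illustrate the bucket membership):
>>>>>>>>>>>>
>>>>>>>>>>>>     // 60-minute windows advancing by 15 minutes: a record with timestamp t
>>>>>>>>>>>>     // falls into size/advance = 4 overlapping windows
>>>>>>>>>>>>     long sizeMs = 60 * 60 * 1000L;
>>>>>>>>>>>>     long advanceMs = 15 * 60 * 1000L;
>>>>>>>>>>>>     long t = 1480000000000L;  // example timestamp
>>>>>>>>>>>>     for (long start = (t / advanceMs) * advanceMs; start > t - sizeMs; start -= advanceMs) {
>>>>>>>>>>>>         System.out.println("[" + start + ", " + (start + sizeMs) + ")");
>>>>>>>>>>>>     }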
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, say I restart the streams application; then bucket aggregation
>>>>>>>>>>>>> will resume from the last point of halt.
>>>>>>>>>>>>> I hope this is also correct.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Yes.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> What I noticed is that once a message is placed in one bucket, that
>>>>>>>>>>>>> bucket was not getting new messages.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This should not happen...
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> However, when I ran a small test case replicating that, it works
>>>>>>>>>>>>> properly. There may be some issue in application reset.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>>>>>>>
>>>>>>>>>>>> Do you have any further information that you could share with us so
>>>>>>>>>>>> we can help you better? What's the difference, for example, between
>>>>>>>>>>>> your "normal" use case and the small test case you have been
>>>>>>>>>>>> referring to?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Michael
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
> 
