Re: Is it a bad idea to use periods within a consumer group name? "my-service.topic1_consumer_group"

2016-12-13 Thread Praveen
Not that I know of. We at Flurry have been using periods in our group names
for a while now and haven't encountered any issues b/c of that.



On Tue, Dec 13, 2016 at 5:13 PM, Jeff Widman  wrote:

> I vaguely remember reading somewhere that it's a bad idea to use periods
> within Kafka consumer group names because it can potentially conflict with
> metric names.
>
> I've searched, and not finding anything, so am I just mis-remembering?
>
> It is operationally convenient because zookeeper CLI allows tab completion
> within periods.
>


Consumer/Publisher code is not throwing any exception if my Kafka broker is not running

2016-12-13 Thread Prasad Dls
Hi,

I am new to Kafka. Could you please let me know why my application is not
getting any exception like "refused to connect to such-and-such host" if the
broker is down/unreachable?

*Here is my producer code*

private static Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ProducerTest_client");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
return props;
}

Producer<String, String> producer = new KafkaProducer<>(producerConfigs());
producer.send(new ProducerRecord<>("TEST.TOPIC", "TESTKEY",
"TEST VALUE"));

*Here is my consumer code*

private Properties configProperties() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, <>);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, <>);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, <>);
props.put(ConsumerConfig.GROUP_ID_CONFIG, <>);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, <>);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, <>);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,<>);
return props;
}


KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(getTopics().split(",")));

while (true) {

ConsumerRecords<String, String> records = consumer.poll(100);
}

I need to write some logic in exception block, please suggest with solution

Thanks in Advance
Prasad
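For reference, KafkaProducer.send() is asynchronous: a broker that is down does not make send() throw at the call site; after max.block.ms the failure is reported through the returned Future or through a completion callback. A minimal sketch of surfacing those errors, mirroring the configs above (topic, key, and values are illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerErrorDemo {

    private static Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Fail faster than the 60s default when no broker is reachable.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000L);
        return props;
    }

    public static void main(String[] args) {
        Producer<String, String> producer = new KafkaProducer<>(producerConfigs());

        // Option 1: a callback is invoked with a non-null exception
        // (e.g. a TimeoutException) when the record could not be delivered.
        producer.send(new ProducerRecord<>("TEST.TOPIC", "TESTKEY", "TEST VALUE"),
                (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Send failed: " + exception);
                    }
                });

        // Option 2: block on the returned Future; delivery failures surface as an
        // ExecutionException wrapping the underlying Kafka exception.
        try {
            producer.send(new ProducerRecord<>("TEST.TOPIC", "TESTKEY", "TEST VALUE")).get();
        } catch (ExecutionException e) {
            System.err.println("Send failed: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        producer.close();
    }
}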


Re: How does 'TimeWindows.of().until()' work?

2016-12-13 Thread Sachin Mittal
Hi,
Thanks for the explanation. This illustration makes it super easy to
understand how until() works. Perhaps we can update the wiki with this
illustration.
It is basically the retention time for a past window.
I used to think until() creates all the future windows for that period and,
once time passes that, deletes all the past windows. Actually, until()
retains a window for the specified time. This makes so much more sense.
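As a concrete sketch, this is the kind of window definition being discussed (the same sizes as the test program later in this thread; until() only controls how long a past window is retained):

// 1-minute windows, hopping every 30 seconds; each past window is kept
// around for at least 2 minutes so late records can still be added to it.
TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L)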

I just had one pending query regarding:

> windowstore.changelog.additional.retention.ms

How does this relate to the retention.ms param of the topic config?
I create the internal topic manually using, say, retention.ms=360.
In the next release (post kafka_2.10-0.10.0.1), since we support delete of
the internal changelog topic as well, I want it to be retained for, say, just
1 hour.
So how does the above parameter interact with this topic-level setting?
Or do I now just need to set the above config to 360 and not add
retention.ms=360
while creating the internal topic?

Thanks
Sachin


On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax 
wrote:

> First, windows are only created if there is actual data for a window. So
> you get windows [0, 50), [25, 75), [50, 100) only if there are record
> falling into each window (btw: window start-time is inclusive while
> window end time is exclusive). If you have only 2 record with lets say
> ts=20 and ts=90 you will not have an open window [25,75). Each window is
> physically created each time the first record for it is processed.
>
> If you have above 4 windows and a record with ts=101 arrives, a new
> window [101,151) will be created. Window [0,50) will not be deleted yet,
> because retention is 100 and thus Streams guarantees that all record
> with ts >= 1 (= 101 - 100) are still processed correctly and those
> records would fall into window [0,50).
>
> Thus, window [0,50) can be dropped, if time advanced to TS = 150, but
> not before that.
>
> -Matthias
>
>
> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > Hi,
> > So is until for future or past?
> > Say I get first record at t = 0 and until is 100 and my window size is 50
> > advance by 25.
> > I understand it will create windows (0, 50), (25, 75), (50, 100)
> > Now at t = 101 it will drop
> > (0, 50), (25, 75), (50, 100) and create
> > (101, 150), (125, 175), (150, 200)
> >
> > Please confirm if this understanding us correct. It is not clear how it
> > will handle overlapping windows (75, 125) and (175, 225) and so on?
> >
> > What case is not clear again is that at say t = 102 I get some message
> with
> > timestamp 99. What happens then?
> > Will the result added to previous aggregation of (50, 100) or (75, 125),
> > like it should.
> >
> > Or it will recreate the old window (50, 100) and aggregate the value
> there
> > and then drop it. This would result is wrong aggregated value, as it does
> > not consider the previous aggregated values.
> >
> > So this is the pressing case I am not able to understand. Maybe I am
> wrong
> > at some basic understanding.
> >
> >
> > Next for
> > The parameter
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to rentention.ms param of topic config?
> > I create internal topic manually using say rentention.ms=360.
> > In next release (post kafka_2.10-0.10.0.1) since we support delete of
> > internal changelog topic as well and I want it to be retained for say
> just
> > 1 hour.
> > So how does that above parameter interfere with this topic level setting.
> > Or now I just need to set above config as 360 and not add
> > rentention.ms=360
> > while creating internal topic.
> > This is just another doubt remaining here.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax 
> > wrote:
> >
> >> Sachin,
> >>
> >> There is no reason to have an .until() AND a .retain() -- just increase
> >> the value of .until()
> >>
> >> If you have a window of let's say 1h size and you set .until() also to
> >> 1h -- you can obviously not process any late arriving data. If you set
> >> until() to 2h is this example, you can process data that is up to 1h
> >> delayed.
> >>
> >> So basically, the retention should always be larger than you window
> size.
> >>
> >> The parameter
> >>> windowstore.changelog.additional.retention.ms
> >>
> >> is applies to changelog topics that backup window state stores. Those
> >> changelog topics are compacted. However, the used key does encode an
> >> window ID and thus older data can never be cleaned up by compaction.
> >> Therefore, an additional retention time is applied to those topics, too.
> >> Thus, if an old window is not updated for this amount of time, it will
> >> get deleted eventually preventing this topic to grown infinitely.
> >>
> >> The value will be determined by until(), i.e., whatever you specify in
> >> .until() will be used to set this parameter.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/12/16 1:07 AM, Sachin 

Kafka Errors and Hung Brokers

2016-12-13 Thread Shailesh Hemdev
We are using a 3 node Kafka cluster and are encountering some weird issues.

1) On Each node, when we tail the server.log file under /var/log/kafka we
see continuous errors like these

pic-partition. (kafka.server.ReplicaFetcherThread)
[2016-12-14 02:39:30,747] ERROR [ReplicaFetcherThread-0-441], Error for
partition [dev-core-platform-logging,15] to broker 441:org.apache.kafka.
common.errors.NotLeaderForPartitionException: This server is not the leader
for that topic-partition. (kafka.server.ReplicaFetcherThread)

The broker is up and is showing under ZooKeeper, so it is not clear why
these errors occur.

2) Occasionally we will find a Kafka broker that goes down. We have
adjusted the ulimit to increase open files as well as added 6 GB to the heap.
When the broker goes down, the process itself is up but is deregistered
from ZooKeeper.

Thanks,

*Shailesh *

-- 

*Shailesh Hemdev*
Manager, Software Engineering
shailesh.hem...@foresee.com
p (734) 352-6247






Weird message regarding kafka Cluster

2016-12-13 Thread Chieh-Chun Chang
Hello Sir:

My name is Chieh-Chun Chang, and we have a problem with our Kafka prod
cluster.


Kafka client version
kafka-clients-0.9.0-kafka-2.0.0.jar
Kafka version
kafka_2.10-0.9.0-kafka-2.0.0.jar

Our Kafka broker cluster is experiencing under-replicated partition
problems, and I found this information:
2016-12-13 07:20:24,393 DEBUG org.apache.kafka.common.network.Selector:
Connection with /10.1.205.75 disconnected
java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at kafka.network.Processor.poll(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:412)
at java.lang.Thread.run(Thread.java:745)
And this is source code



The IP address (10.1.205.75) might be a random host machine, a YARN
cluster machine, or a producer machine, and it seems related to the
under-replicated partition problems around that timestamp.

This is my speculation: I did some research on this, and it seems that
0.9.0 does not keep socket connections alive, so this might happen.

But it seems that once it happens, it delays YARN cluster execution
time, so I am wondering if this is expected behavior.

Would you mind commenting on why this might happen?

Thank you very much,
Chieh-Chun Chang


Re: Website Update, Part 2

2016-12-13 Thread Gwen Shapira
Hi,

Since we are breaking up the docs, we can no longer use Ctrl-F to find
the specific things we are looking for... maybe it is time to add
a site search bar? I think Google has something we can embed.

On Tue, Dec 13, 2016 at 6:12 PM, Guozhang Wang  wrote:

> Folks,
>
> We are continuing to improve our website, and one of it is to break the
> single gigantic "documentation" page:
>
> https://kafka.apache.org/documentation/
>
> into sub-spaces and sub-pages for better visibility. As the first step of
> this effort, we will be gradually extract each section of this page into a
> separate page and then grow each one of them in their own sub-space.
>
> As of now, we have extract Streams section out of documentation as
>
> https://kafka.apache.org/documentation/streams
>
> while all the existing hashtags are preserved and re-directed via JS (many
> thanks to Derrick!) so that we do not loose any SEO. At the same time I
> have updated the "website doc contributions" wiki a bit with guidance on
> locally displaying and debugging doc changes with this refactoring:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+
> Documentation+Changes
>
>
> We are trying to do the same for Connect, Ops, Configs, APIs etc in the
> near future. Any comments, improvements, and contributions are welcome and
> encouraged.
>
>
> --
> -- Guozhang
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Website Update, Part 2

2016-12-13 Thread Guozhang Wang
Folks,

We are continuing to improve our website, and one part of that is to break the
single gigantic "documentation" page:

https://kafka.apache.org/documentation/

into sub-spaces and sub-pages for better visibility. As the first step of
this effort, we will gradually extract each section of this page into a
separate page and then grow each one of them in its own sub-space.

As of now, we have extracted the Streams section out of the documentation as

https://kafka.apache.org/documentation/streams

while all the existing hashtags are preserved and redirected via JS (many
thanks to Derrick!) so that we do not lose any SEO. At the same time I
have updated the "website doc contributions" wiki a bit with guidance on
locally displaying and debugging doc changes with this refactoring:

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes


We are trying to do the same for Connect, Ops, Configs, APIs etc in the
near future. Any comments, improvements, and contributions are welcome and
encouraged.


-- 
-- Guozhang


Re: lag for a specific partitions on newly added host

2016-12-13 Thread Jeremy Hansen
The new host has been in place for over a week. Lag is still high on partitions 
that exist on that new host.  Should I attempt another reassign?

Thanks
-jeremy

> On Dec 13, 2016, at 5:43 PM, Apurva Mehta  wrote:
> 
> How did you add the host and when did you measure the lag? If you used the
> reassign-partitions script, it will move partitions to the new host, but
> the data copy will take time. in that period, those partitions will lag.
> However, once the reassign-partitions script finishes, the partitions on
> the new replica should be caught up and no longer demonstrate any lag.
> 
>> On Fri, Dec 9, 2016 at 8:58 PM, Jeremy Hansen  wrote:
>> 
>> Here’s the topic description:
>> 
>> Topic:blumfrub  PartitionCount:15   ReplicationFactor:5 Configs:
>>Topic: blumfrub Partition: 0Leader: 1001Replicas:
>> 1001,0,1,2,3  Isr: 0,1001,1,2,3
>>Topic: blumfrub Partition: 1Leader: 0   Replicas:
>> 0,1,2,3,4 Isr: 0,1,2,3,4
>>Topic: blumfrub Partition: 2Leader: 1   Replicas:
>> 1,2,3,4,1001  Isr: 1001,1,2,3,4
>>Topic: blumfrub Partition: 3Leader: 2   Replicas:
>> 2,3,4,1001,0  Isr: 0,1001,2,3,4
>>Topic: blumfrub Partition: 4Leader: 0   Replicas:
>> 3,4,1001,0,1  Isr: 0,1001,1,3,4
>>Topic: blumfrub Partition: 5Leader: 4   Replicas:
>> 4,1001,0,1,2  Isr: 0,1001,1,2,4
>>Topic: blumfrub Partition: 6Leader: 1001Replicas:
>> 1001,1,2,3,4  Isr: 1001,1,2,3,4
>>Topic: blumfrub Partition: 7Leader: 0   Replicas:
>> 0,2,3,4,1001  Isr: 0,1001,2,3,4
>>Topic: blumfrub Partition: 8Leader: 1   Replicas:
>> 1,3,4,1001,0  Isr: 0,1001,1,3,4
>>Topic: blumfrub Partition: 9Leader: 2   Replicas:
>> 2,4,1001,0,1  Isr: 0,1001,1,2,4
>>Topic: blumfrub Partition: 10   Leader: 0   Replicas:
>> 3,1001,0,1,2  Isr: 0,1001,1,2,3
>>Topic: blumfrub Partition: 11   Leader: 4   Replicas:
>> 4,0,1,2,3 Isr: 0,1,2,3,4
>>Topic: blumfrub Partition: 12   Leader: 1001Replicas:
>> 1001,2,3,4,0  Isr: 0,1001,2,3,4
>>Topic: blumfrub Partition: 13   Leader: 0   Replicas:
>> 0,3,4,1001,1  Isr: 0,1001,1,3,4
>>Topic: blumfrub Partition: 14   Leader: 1   Replicas:
>> 1,4,1001,0,2  Isr: 0,1001,1,2,4
>> 
>> 1001 is the new broker.
>> 
>> -jeremy
>> 
>> 
>> 
>>> On Dec 9, 2016, at 8:55 PM, Jeremy Hansen  wrote:
>>> 
>>> I added a new host to kafka.  Partitions that fall on this new host have
>> a very high lag and I’m trying to understand why this would be and how to
>> fix it.
>>> 
>>> iloveconsuming  blumfrub  0
>> 5434682 7416933 1982251 iloveconsuming_kf0001.host.
>> com-1481225033576-47393b55-0
>>> iloveconsuming  blumfrub  1
>> 7416828 7416875 47  iloveconsuming_kf0001.host.
>> com-1481225033769-17152bca-0
>>> iloveconsuming  blumfrub  2
>> 7416799 7416848 49  iloveconsuming_kf0001.host.
>> com-1481225033791-77a30285-0
>>> iloveconsuming  blumfrub  3
>> 7416898 7416903 5   iloveconsuming_kf0001.host.
>> com-1481225033822-d088f844-0
>>> iloveconsuming  blumfrub  4
>> 7416891 7416925 34  iloveconsuming_kf0001.host.
>> com-1481225033846-78f8e5b5-0
>>> iloveconsuming  blumfrub  5
>> 7416843 7416883 40  iloveconsuming_kf0001.host.
>> com-1481225033869-54027178-0
>>> iloveconsuming  blumfrub  6
>> 5434720 7416896 1982176 iloveconsuming_kf0001.host.
>> com-1481225033891-cc3f6bf6-0
>>> iloveconsuming  blumfrub  7
>> 7416896 7416954 58  iloveconsuming_kf0001.host.
>> com-1481225033915-79b49de8-0
>>> iloveconsuming  blumfrub  8
>> 7416849 7416898 49  iloveconsuming_kf0001.host.
>> com-1481225033939-1fe784c0-0
>>> iloveconsuming  blumfrub  9
>> 7416898 7416917 19  iloveconsuming_kf0001.host.
>> com-1481225033961-40cc3185-0
>>> iloveconsuming  blumfrub  10
>> 354571863545722135  iloveconsuming_kf0001.host.
>> com-1481225033998-a817062e-0
>>> iloveconsuming  blumfrub  11
>> 7416866 7416909 43  iloveconsuming_kf0001.host.
>> com-1481225034020-7a15999e-0
>>> iloveconsuming  blumfrub  12
>> 5434739 7416907 1982168 iloveconsuming_kf0001.host.
>> com-1481225034043-badde97c-0
>>> iloveconsuming  blumfrub  13

Re: lag for a specific partitions on newly added host

2016-12-13 Thread Apurva Mehta
How did you add the host, and when did you measure the lag? If you used the
reassign-partitions script, it will move partitions to the new host, but
the data copy will take time. In that period, those partitions will lag.
However, once the reassign-partitions script finishes, the partitions on
the new replica should be caught up and no longer demonstrate any lag.
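The progress of a reassignment can be checked with the --verify option of the same script; a sketch (the ZooKeeper connect string and JSON file name are illustrative):

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --reassignment-json-file reassignment.json --verify

Once it reports every partition as completed, the new replica should be in sync and no longer lag.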

On Fri, Dec 9, 2016 at 8:58 PM, Jeremy Hansen  wrote:

> Here’s the topic description:
>
> Topic:blumfrub  PartitionCount:15   ReplicationFactor:5 Configs:
> Topic: blumfrub Partition: 0Leader: 1001Replicas:
> 1001,0,1,2,3  Isr: 0,1001,1,2,3
> Topic: blumfrub Partition: 1Leader: 0   Replicas:
> 0,1,2,3,4 Isr: 0,1,2,3,4
> Topic: blumfrub Partition: 2Leader: 1   Replicas:
> 1,2,3,4,1001  Isr: 1001,1,2,3,4
> Topic: blumfrub Partition: 3Leader: 2   Replicas:
> 2,3,4,1001,0  Isr: 0,1001,2,3,4
> Topic: blumfrub Partition: 4Leader: 0   Replicas:
> 3,4,1001,0,1  Isr: 0,1001,1,3,4
> Topic: blumfrub Partition: 5Leader: 4   Replicas:
> 4,1001,0,1,2  Isr: 0,1001,1,2,4
> Topic: blumfrub Partition: 6Leader: 1001Replicas:
> 1001,1,2,3,4  Isr: 1001,1,2,3,4
> Topic: blumfrub Partition: 7Leader: 0   Replicas:
> 0,2,3,4,1001  Isr: 0,1001,2,3,4
> Topic: blumfrub Partition: 8Leader: 1   Replicas:
> 1,3,4,1001,0  Isr: 0,1001,1,3,4
> Topic: blumfrub Partition: 9Leader: 2   Replicas:
> 2,4,1001,0,1  Isr: 0,1001,1,2,4
> Topic: blumfrub Partition: 10   Leader: 0   Replicas:
> 3,1001,0,1,2  Isr: 0,1001,1,2,3
> Topic: blumfrub Partition: 11   Leader: 4   Replicas:
> 4,0,1,2,3 Isr: 0,1,2,3,4
> Topic: blumfrub Partition: 12   Leader: 1001Replicas:
> 1001,2,3,4,0  Isr: 0,1001,2,3,4
> Topic: blumfrub Partition: 13   Leader: 0   Replicas:
> 0,3,4,1001,1  Isr: 0,1001,1,3,4
> Topic: blumfrub Partition: 14   Leader: 1   Replicas:
> 1,4,1001,0,2  Isr: 0,1001,1,2,4
>
> 1001 is the new broker.
>
> -jeremy
>
>
>
> > On Dec 9, 2016, at 8:55 PM, Jeremy Hansen  wrote:
> >
> > I added a new host to kafka.  Partitions that fall on this new host have
> a very high lag and I’m trying to understand why this would be and how to
> fix it.
> >
> > iloveconsuming  blumfrub  0
> 5434682 7416933 1982251 iloveconsuming_kf0001.host.
> com-1481225033576-47393b55-0
> > iloveconsuming  blumfrub  1
> 7416828 7416875 47  iloveconsuming_kf0001.host.
> com-1481225033769-17152bca-0
> > iloveconsuming  blumfrub  2
> 7416799 7416848 49  iloveconsuming_kf0001.host.
> com-1481225033791-77a30285-0
> > iloveconsuming  blumfrub  3
> 7416898 7416903 5   iloveconsuming_kf0001.host.
> com-1481225033822-d088f844-0
> > iloveconsuming  blumfrub  4
> 7416891 7416925 34  iloveconsuming_kf0001.host.
> com-1481225033846-78f8e5b5-0
> > iloveconsuming  blumfrub  5
> 7416843 7416883 40  iloveconsuming_kf0001.host.
> com-1481225033869-54027178-0
> > iloveconsuming  blumfrub  6
> 5434720 7416896 1982176 iloveconsuming_kf0001.host.
> com-1481225033891-cc3f6bf6-0
> > iloveconsuming  blumfrub  7
> 7416896 7416954 58  iloveconsuming_kf0001.host.
> com-1481225033915-79b49de8-0
> > iloveconsuming  blumfrub  8
> 7416849 7416898 49  iloveconsuming_kf0001.host.
> com-1481225033939-1fe784c0-0
> > iloveconsuming  blumfrub  9
> 7416898 7416917 19  iloveconsuming_kf0001.host.
> com-1481225033961-40cc3185-0
> > iloveconsuming  blumfrub  10
>  354571863545722135  iloveconsuming_kf0001.host.
> com-1481225033998-a817062e-0
> > iloveconsuming  blumfrub  11
>  7416866 7416909 43  iloveconsuming_kf0001.host.
> com-1481225034020-7a15999e-0
> > iloveconsuming  blumfrub  12
>  5434739 7416907 1982168 iloveconsuming_kf0001.host.
> com-1481225034043-badde97c-0
> > iloveconsuming  blumfrub  13
>  7416818 7416865 47  iloveconsuming_kf0001.host.
> com-1481225034066-6e77e3dc-0
> > iloveconsuming  blumfrub  14
>  7416901 7416947 46  iloveconsuming_kf0002.host.
> com-1481225107317-32355c51-0
> >
> > Why do the partitions 

Re: kafka commands taking a long time

2016-12-13 Thread Apurva Mehta
That is certainly odd. What's the latency when using the kafka console
producers and consumers? Is it much faster? If it is, I would just strace
the kafka-topics command to see where it is spending the time.
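A sketch of that kind of trace (the output path is illustrative):

strace -f -tt -o /tmp/kafka-topics.strace bin/kafka-topics.sh --list --zookeeper localhost:2181

The per-syscall timestamps from -tt show where the extra seconds are spent.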

On Thu, Dec 8, 2016 at 7:21 AM, Stephen Cresswell <
stephen.cressw...@gmail.com> wrote:

> I followed the quickstart instructions at
> https://kafka.apache.org/quickstart and everything seems to be working ok,
> except that commands take a long time to run, e.g.
>
> $ time bin/kafka-topics.sh --list --zookeeper localhost:2181
>
> real 0m11.751s
> user 0m1.540s
> sys 0m0.273s
>
> The zookeeper logging shows that the request is processed in a few
> milliseconds, so I think it's related to the kafka JVM configuration. If I
> remove com.sun.management.jmxremote it's takes 6 seconds but this is still
> much longer than I would have expected.
>
> Any suggestions on how to speed things up?
>


Kafka ACL's with SSL Protocol is not working

2016-12-13 Thread Raghu B
Hi All,

I am trying to enable ACLs in my Kafka cluster along with the SSL
protocol.

I tried each and every parameter but no luck, so I need help to
enable SSL (without Kerberos), and I am attaching all the configuration
details below.

Kindly help me.


I tested SSL without ACLs, and it worked fine
(listeners=SSL://10.247.195.122:9093).


This is my Kafka server properties file:

# ACL SETTINGS #
auto.create.topics.enable=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SSL
#allow.everyone.if.no.acl.found=true
#principal.builder.class=CustomizedPrincipalBuilderClass
#super.users=User:"CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"
#super.users=User:Raghu;User:Admin
#offsets.storage=kafka
#dual.commit.enabled=true
listeners=SSL://10.247.195.122:9093
#listeners=PLAINTEXT://10.247.195.122:9092
#listeners=PLAINTEXT://10.247.195.122:9092,SSL://10.247.195.122:9093
#advertised.listeners=PLAINTEXT://10.247.195.122:9092
ssl.keystore.location=/home/raghu/kafka/security/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/home/raghu/kafka/security/server.truststore.jks
ssl.truststore.password=123456


Set the ACL from the Authorizer CLI:

> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=10.247.195.122:2181 --list --topic ssltopic

Current ACLs for resource `Topic:ssltopic`:

  User:CN=writeuser, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown,
C=Unknown has Allow permission for operations: Write from hosts: *


XXXWMXXX-7:kafka_2.11-0.10.1.0 rbaddam$ bin/kafka-console-producer.sh
--broker-list 10.247.195.122:9093 --topic ssltopic --producer.config client-ssl.properties

[2016-12-13 14:53:45,839] WARN Error while fetching metadata with
correlation id 0 : {ssltopic=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2016-12-13 14:53:45,984] WARN Error while fetching metadata with
correlation id 1 : {ssltopic=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)


XXXWMXXX-7:kafka_2.11-0.10.1.0 rbaddam$ cat client-ssl.properties

#group.id=sslgroup
security.protocol=SSL
ssl.truststore.location=/Users/rbaddam/Desktop/Dev/kafka_2.11-0.10.1.0/ssl/client.truststore.jks
ssl.truststore.password=123456
#Configure below if you use client auth
ssl.keystore.location=/Users/rbaddam/Desktop/Dev/kafka_2.11-0.10.1.0/ssl/client.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456


XXXWMXXX-7:kafka_2.11-0.10.1.0 rbaddam$ bin/kafka-console-consumer.sh
--bootstrap-server 10.247.195.122:9093 --new-consumer --consumer.config client-ssl.properties
--topic ssltopic --from-beginning

[2016-12-13 14:53:28,817] WARN Error while fetching metadata with
correlation id 1 : {ssltopic=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2016-12-13 14:53:28,819] ERROR Unknown error when running consumer:
(kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized
to access group: console-consumer-52826
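For what it's worth, that last error is about group authorization rather than the topic ACL listed above: the console consumer's group (here console-consumer-52826) also needs a Read ACL. A sketch of granting it (the principal and group name are illustrative and must match your client certificate and consumer group):

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=10.247.195.122:2181 \
    --add --allow-principal "User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" \
    --operation Read --group console-consumer-52826

A Read ACL on the topic itself is needed for consuming as well.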


Thanks in advance,

Raghu - raghu98...@gmail.com


Re: failed to delete kafka topic when building from source

2016-12-13 Thread Apurva Mehta
How are you trying to delete the topic? Next time this occurs, can you
check whether the process has permissions to perform that operation?
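(For reference, the usual CLI path, assuming delete.topic.enable=true on the broker; the ZooKeeper address is illustrative and the topic name is taken from the error below:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test-window-stream

The rename in the error below is performed by the broker process itself, so it is that process whose permissions on D:\tmp\kafka-logs would need checking.)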

On Mon, Dec 12, 2016 at 10:55 PM, Sachin Mittal  wrote:

> Hi,
> I recently built an application from source and I get the following
> exception when trying to delete a topic
>
> kafka.common.KafkaStorageException: Failed to rename log directory from
> D:\tmp\kafka-logs\test-window-stream-0 to
> D:\tmp\kafka-logs\test-window-stream-0.0ce9f915431397d1c2dad4f535
> a3-delete
> at kafka.log.LogManager.asyncDelete(LogManager.scala:451)
> at
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(
> Partition.scala:164)
>
>
> Note that I have set delete.topic.enable=true
>
> Also this was all working fine when using kafka_2.10-0.10.0.1.
> Issue happens with kafka_2.10-0.10.2.0-SNAPSHOT
>
> Thanks
> Sachin
>


Re: How does 'TimeWindows.of().until()' work?

2016-12-13 Thread Matthias J. Sax
> So a given window (with a '.until()' setting) is triggered for closing by
> the presence of a record outside the .until() setting?

Yes (and all windows do have a setting for it -- if you do not call it
in your code, the default is 1 day).

However, in Kafka Streams, there is no notion of "closing a window" as
other systems do. Thus, we do not use the term "closing" but "discarding"
for this case, as no computation will be triggered -- the window is just
dropped. A computation is triggered for each update of the window, i.e.,
for each record that falls into a window.


> If the timestamps for records jump about by a value larger than the .until
> value you could have windows being created / deleted quite a bit then?

Yes. But you have to think differently. A record will not "jump ahead";
rather, all records (with a smaller timestamp than this record) that are
processed after this record are late. And if they are too late, we do
not guarantee that they are processed.

Think of it like this:


1,2,3,4, ...nothing for a long time..., 100,6,7,8

so the record with ts=100 did not jump ahead, but records with ts=6,7,8 are
super late.



-Matthias




On 12/13/16 10:01 AM, Jon Yeargers wrote:
> So a given window (with a '.until()' setting) is triggered for closing by
> the presence of a record outside the .until() setting?
> 
> If the timestamps for records jump about  by a value larger than the .until
> value you could have windows being created / deleted quite a bit then?
> 
> On Tue, Dec 13, 2016 at 9:57 AM, Matthias J. Sax 
> wrote:
> 
>> First, windows are only created if there is actual data for a window. So
>> you get windows [0, 50), [25, 75), [50, 100) only if there are record
>> falling into each window (btw: window start-time is inclusive while
>> window end time is exclusive). If you have only 2 record with lets say
>> ts=20 and ts=90 you will not have an open window [25,75). Each window is
>> physically created each time the first record for it is processed.
>>
>> If you have above 4 windows and a record with ts=101 arrives, a new
>> window [101,151) will be created. Window [0,50) will not be deleted yet,
>> because retention is 100 and thus Streams guarantees that all record
>> with ts >= 1 (= 101 - 100) are still processed correctly and those
>> records would fall into window [0,50).
>>
>> Thus, window [0,50) can be dropped, if time advanced to TS = 150, but
>> not before that.
>>
>> -Matthias
>>
>>
>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
>>> Hi,
>>> So is until for future or past?
>>> Say I get first record at t = 0 and until is 100 and my window size is 50
>>> advance by 25.
>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
>>> Now at t = 101 it will drop
>>> (0, 50), (25, 75), (50, 100) and create
>>> (101, 150), (125, 175), (150, 200)
>>>
>>> Please confirm if this understanding us correct. It is not clear how it
>>> will handle overlapping windows (75, 125) and (175, 225) and so on?
>>>
>>> What case is not clear again is that at say t = 102 I get some message
>> with
>>> timestamp 99. What happens then?
>>> Will the result added to previous aggregation of (50, 100) or (75, 125),
>>> like it should.
>>>
>>> Or it will recreate the old window (50, 100) and aggregate the value
>> there
>>> and then drop it. This would result is wrong aggregated value, as it does
>>> not consider the previous aggregated values.
>>>
>>> So this is the pressing case I am not able to understand. Maybe I am
>> wrong
>>> at some basic understanding.
>>>
>>>
>>> Next for
>>> The parameter
 windowstore.changelog.additional.retention.ms
>>>
>>> How does this relate to rentention.ms param of topic config?
>>> I create internal topic manually using say rentention.ms=360.
>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
>>> internal changelog topic as well and I want it to be retained for say
>> just
>>> 1 hour.
>>> So how does that above parameter interfere with this topic level setting.
>>> Or now I just need to set above config as 360 and not add
>>> rentention.ms=360
>>> while creating internal topic.
>>> This is just another doubt remaining here.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax 
>>> wrote:
>>>
 Sachin,

 There is no reason to have an .until() AND a .retain() -- just increase
 the value of .until()

 If you have a window of let's say 1h size and you set .until() also to
 1h -- you can obviously not process any late arriving data. If you set
 until() to 2h is this example, you can process data that is up to 1h
 delayed.

 So basically, the retention should always be larger than you window
>> size.

 The parameter
> windowstore.changelog.additional.retention.ms

 is applies to changelog topics that backup window state stores. Those
 changelog topics are compacted. However, the used key does encode an
 

Re: ActiveControllerCount is always will be either 0 or 1 in 3 nodes kafka cluster?

2016-12-13 Thread Apurva Mehta
Thanks Sven, I will follow up and ensure that the document is tightened up.

Apurva
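(For reference, one way to sample this metric per broker is the bundled JmxTool; a sketch, assuming JMX is exposed on port 9999:

bin/kafka-run-class.sh kafka.tools.JmxTool \
    --jmx-url service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi \
    --object-name kafka.controller:type=KafkaController,name=ActiveControllerCount

Summing the reported value across all brokers should give exactly 1.)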

On Mon, Dec 12, 2016 at 4:57 AM, Sven Ludwig  wrote:

> Hi,
>
> in JMX each Kafka broker has a value 1 or 0 for ActiveControllerCount. As
> I understood from this thread, the sum of these values across the cluster
> should never be something other than 1. The documentation at
> http://docs.confluent.io/3.1.0/kafka/monitoring.html should be improved
> to make that clear. Currently it is misleading:
>
> kafka.controller:type=KafkaController,name=ActiveControllerCount
> Number of active controllers in the cluster. Alert if value is anything
> other than 1.
>
> Suggested:
>
> kafka.controller:type=KafkaController,name=ActiveControllerCount
> Number of active controllers on a broker. Alert if the aggregated sum
> across all brokers in the cluster is anything other than 1, because in a
> cluster there should only be one broker with an active controller (cluster
> singleton).
>
> Kind Regards,
> Sven
>


Re: How does 'TimeWindows.of().until()' work?

2016-12-13 Thread Jon Yeargers
So a given window (with a '.until()' setting) is triggered for closing by
the presence of a record outside the .until() setting?

If the timestamps for records jump about  by a value larger than the .until
value you could have windows being created / deleted quite a bit then?

On Tue, Dec 13, 2016 at 9:57 AM, Matthias J. Sax 
wrote:

> First, windows are only created if there is actual data for a window. So
> you get windows [0, 50), [25, 75), [50, 100) only if there are record
> falling into each window (btw: window start-time is inclusive while
> window end time is exclusive). If you have only 2 record with lets say
> ts=20 and ts=90 you will not have an open window [25,75). Each window is
> physically created each time the first record for it is processed.
>
> If you have above 4 windows and a record with ts=101 arrives, a new
> window [101,151) will be created. Window [0,50) will not be deleted yet,
> because retention is 100 and thus Streams guarantees that all record
> with ts >= 1 (= 101 - 100) are still processed correctly and those
> records would fall into window [0,50).
>
> Thus, window [0,50) can be dropped, if time advanced to TS = 150, but
> not before that.
>
> -Matthias
>
>
> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > Hi,
> > So is until for future or past?
> > Say I get first record at t = 0 and until is 100 and my window size is 50
> > advance by 25.
> > I understand it will create windows (0, 50), (25, 75), (50, 100)
> > Now at t = 101 it will drop
> > (0, 50), (25, 75), (50, 100) and create
> > (101, 150), (125, 175), (150, 200)
> >
> > Please confirm if this understanding us correct. It is not clear how it
> > will handle overlapping windows (75, 125) and (175, 225) and so on?
> >
> > What case is not clear again is that at say t = 102 I get some message
> with
> > timestamp 99. What happens then?
> > Will the result added to previous aggregation of (50, 100) or (75, 125),
> > like it should.
> >
> > Or it will recreate the old window (50, 100) and aggregate the value
> there
> > and then drop it. This would result is wrong aggregated value, as it does
> > not consider the previous aggregated values.
> >
> > So this is the pressing case I am not able to understand. Maybe I am
> wrong
> > at some basic understanding.
> >
> >
> > Next for
> > The parameter
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to rentention.ms param of topic config?
> > I create internal topic manually using say rentention.ms=360.
> > In next release (post kafka_2.10-0.10.0.1) since we support delete of
> > internal changelog topic as well and I want it to be retained for say
> just
> > 1 hour.
> > So how does that above parameter interfere with this topic level setting.
> > Or now I just need to set above config as 360 and not add
> > rentention.ms=360
> > while creating internal topic.
> > This is just another doubt remaining here.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax 
> > wrote:
> >
> >> Sachin,
> >>
> >> There is no reason to have an .until() AND a .retain() -- just increase
> >> the value of .until()
> >>
> >> If you have a window of let's say 1h size and you set .until() also to
> >> 1h -- you can obviously not process any late arriving data. If you set
> >> until() to 2h is this example, you can process data that is up to 1h
> >> delayed.
> >>
> >> So basically, the retention should always be larger than you window
> size.
> >>
> >> The parameter
> >>> windowstore.changelog.additional.retention.ms
> >>
> >> is applies to changelog topics that backup window state stores. Those
> >> changelog topics are compacted. However, the used key does encode an
> >> window ID and thus older data can never be cleaned up by compaction.
> >> Therefore, an additional retention time is applied to those topics, too.
> >> Thus, if an old window is not updated for this amount of time, it will
> >> get deleted eventually preventing this topic to grown infinitely.
> >>
> >> The value will be determined by until(), i.e., whatever you specify in
> >> .until() will be used to set this parameter.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> We are facing the exact problem as described by Matthias above.
> >>> We are keeping default until which is 1 day.
> >>>
> >>> Our record's times tamp extractor has a field which increases with
> time.
> >>> However for short time we cannot guarantee the time stamp is always
> >>> increases. So at the boundary ie after 24 hrs we can get records which
> >> are
> >>> beyond that windows retention period.
> >>>
> >>> Then it happens like it is mentioned above and our aggregation fails.
> >>>
> >>> So just to sum up when we get record
> >>> 24h + 1 sec (it deletes older window and since the new record belongs
> to
> >>> the new window its gets created)
> >>> Now when we get next record of 24 hs - 1 sec 

reasonable KStream app config settings?

2016-12-13 Thread Jon Yeargers
My app seems to be continuously rebalancing. If I said it processed data
maybe 3 minutes / hour I wouldn't be exaggerating. Surely this isn't normal
behavior.

My config is:

config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");

config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");

config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");

config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60");

Does this seem reasonable / rational? Some default that I shouldn't rely on?


Re: How does 'TimeWindows.of().until()' work?

2016-12-13 Thread Matthias J. Sax
First, windows are only created if there is actual data for a window. So
you get windows [0, 50), [25, 75), [50, 100) only if there are record
falling into each window (btw: window start-time is inclusive while
window end time is exclusive). If you have only 2 record with lets say
ts=20 and ts=90 you will not have an open window [25,75). Each window is
physically created each time the first record for it is processed.

If you have above 4 windows and a record with ts=101 arrives, a new
window [101,151) will be created. Window [0,50) will not be deleted yet,
because retention is 100 and thus Streams guarantees that all record
with ts >= 1 (= 101 - 100) are still processed correctly and those
records would fall into window [0,50).

Thus, window [0,50) can be dropped, if time advanced to TS = 150, but
not before that.

-Matthias


On 12/13/16 12:06 AM, Sachin Mittal wrote:
> Hi,
> So is until for future or past?
> Say I get first record at t = 0 and until is 100 and my window size is 50
> advance by 25.
> I understand it will create windows (0, 50), (25, 75), (50, 100)
> Now at t = 101 it will drop
> (0, 50), (25, 75), (50, 100) and create
> (101, 150), (125, 175), (150, 200)
> 
> Please confirm if this understanding us correct. It is not clear how it
> will handle overlapping windows (75, 125) and (175, 225) and so on?
> 
> What case is not clear again is that at say t = 102 I get some message with
> timestamp 99. What happens then?
> Will the result added to previous aggregation of (50, 100) or (75, 125),
> like it should.
> 
> Or it will recreate the old window (50, 100) and aggregate the value there
> and then drop it. This would result is wrong aggregated value, as it does
> not consider the previous aggregated values.
> 
> So this is the pressing case I am not able to understand. Maybe I am wrong
> at some basic understanding.
> 
> 
> Next for
> The parameter
>> windowstore.changelog.additional.retention.ms
> 
> How does this relate to rentention.ms param of topic config?
> I create internal topic manually using say rentention.ms=360.
> In next release (post kafka_2.10-0.10.0.1) since we support delete of
> internal changelog topic as well and I want it to be retained for say just
> 1 hour.
> So how does that above parameter interfere with this topic level setting.
> Or now I just need to set above config as 360 and not add
> rentention.ms=360
> while creating internal topic.
> This is just another doubt remaining here.
> 
> Thanks
> Sachin
> 
> 
> 
> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax 
> wrote:
> 
>> Sachin,
>>
>> There is no reason to have an .until() AND a .retain() -- just increase
>> the value of .until()
>>
>> If you have a window of let's say 1h size and you set .until() also to
>> 1h -- you can obviously not process any late arriving data. If you set
>> until() to 2h is this example, you can process data that is up to 1h
>> delayed.
>>
>> So basically, the retention should always be larger than you window size.
>>
>> The parameter
>>> windowstore.changelog.additional.retention.ms
>>
>> is applies to changelog topics that backup window state stores. Those
>> changelog topics are compacted. However, the used key does encode an
>> window ID and thus older data can never be cleaned up by compaction.
>> Therefore, an additional retention time is applied to those topics, too.
>> Thus, if an old window is not updated for this amount of time, it will
>> get deleted eventually preventing this topic to grown infinitely.
>>
>> The value will be determined by until(), i.e., whatever you specify in
>> .until() will be used to set this parameter.
>>
>>
>> -Matthias
>>
>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>> Hi,
>>> We are facing the exact problem as described by Matthias above.
>>> We are keeping default until which is 1 day.
>>>
>>> Our record's times tamp extractor has a field which increases with time.
>>> However for short time we cannot guarantee the time stamp is always
>>> increases. So at the boundary ie after 24 hrs we can get records which
>> are
>>> beyond that windows retention period.
>>>
>>> Then it happens like it is mentioned above and our aggregation fails.
>>>
>>> So just to sum up when we get record
>>> 24h + 1 sec (it deletes older window and since the new record belongs to
>>> the new window its gets created)
>>> Now when we get next record of 24 hs - 1 sec since older window is
>> dropped
>>> it does not get aggregated in that bucket.
>>>
>>> I suggest we have another setting next to until call retain which retains
>>> the older windows into next window.
>>>
>>> I think at stream window boundary level it should use a concept of
>> sliding
>>> window. So we can define window like
>>>
>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
>> 1000l).untill(7
>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
>>>
>>> So after 7 days it retains the data covered by windows in last 15 minutes
>>> which rolls over the data in them to 

Re: Kafka windowed table not aggregating correctly

2016-12-13 Thread Matthias J. Sax
Just increase the retention time so the window is not dropped and can
accept late-arriving data.

About your example: the retention time specified via until() is a minimum
retention time! It can happen that a window is kept longer.

-Matthias

On 12/12/16 11:49 PM, Sachin Mittal wrote:
> Hi,
> Well it does help in case you mentioned, but in the case when on 2017 Dec
> 12 12:01 AM if we receive a message stamped 2017 Dec 11 11:59 PM, it will
> either drop this message or create a fresh older window and aggregate the
> message in that, and then drop the window.
> It is not clear which of the case it will do. But here both cases are
> wrong, as ideally it should have aggregated that message into previous
> aggregation and not start a fresh older aggregation (since on Dec 12 12:00
> AM, we drop older windows and create fresh ones.)
> 
> Could you please explain this case.
> 
> I am trying to reproduce this scenario and have written a small java
> program which runs against latest kafka source. Build against trunk git
> commit of 01d58ad8e039181ade742cf896a08199e3cb7483
> 
> Here I am publishing messages with ts
> TS, TS + 5,  TS + 1, TS + 6, TS + 2, TS + 7, TS + 3, TS + 8, TS + 4, + TS +
> 9, TS + 5 ...
> I hope you get an idea where TS is generally increasing but a next TS can
> have value less than previous one.
> 
> My window is
> TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L)
> ie 1 min rolling by 30 seconds and until 2 minutes when we discard the old
> and create new one.
> 
> What I observe is that it always aggregate the result in first bucket it
> creates even after until timestamp is elapsed. So kind of confused here.
> 
> See if you can give me some insight into rolling window. Here is the code
> attached.
> 
> 
> Thanks
> Sachin
> --
> 
> import java.io.ByteArrayOutputStream;
> import java.util.Date;
> import java.util.Map;
> import java.util.Properties;
> import java.util.SortedSet;
> import java.util.TreeSet;
> 
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Aggregator;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.Initializer;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.TimeWindows;
> import org.apache.kafka.streams.kstream.Windowed;
> import org.apache.kafka.streams.processor.TimestampExtractor;
> 
> import com.fasterxml.jackson.core.type.TypeReference;
> import com.fasterxml.jackson.databind.ObjectMapper;
> 
> public class TestKafkaWindowStream {
> 
> public static void main(String[] args) {
> //start the producer
> Producer producerThread = new Producer();
> producerThread.start();
> //aggregate the messages via stream
> final Serde messageSerde = Serdes.serdeFrom(new
> MessageSerializer(), new MessageDeserializer());
> final Serde messagesSerde =
> Serdes.serdeFrom(new Serializer() {
> private ObjectMapper objectMapper = new ObjectMapper();
> public void close() {}
> public void configure(Map paramMap, boolean
> paramBoolean) {}
> public byte[] serialize(String paramString, SortedSet
> messages) {
> if (messages == null) {
> return null;
> }
> try {
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> objectMapper.writeValue(out, messages);
> return out.toByteArray();
> } catch (Exception e) {
> throw new SerializationException("Error serializing
> JSON message", e);
> }
> }
> }, new Deserializer() {
> private ObjectMapper objectMapper = new ObjectMapper();
> public void close() {}
> public void configure(Map paramMap, boolean
> paramBoolean) {}
> public SortedSet deserialize(String paramString,
> byte[] paramArrayOfByte) {
> if (paramArrayOfByte == null) {
> return null;
> }
> SortedSet data = 

Re: checking consumer lag on KStreams app?

2016-12-13 Thread Matthias J. Sax
There is a workaround:

http://stackoverflow.com/questions/41097126/how-to-get-the-group-commit-offset-from-kafka0-10-x

I never had time to follow up with this issue. Will put it back on the
agenda. Should really get fixed.
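In essence (a minimal sketch, not necessarily the exact code behind that link): create a plain consumer whose group.id is the Streams application.id and compare the group's committed offset with the partition's end offset. Topic, partition, group, and addresses are illustrative:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class StreamsLagCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // the Streams application.id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("input-topic", 0);
            OffsetAndMetadata committed = consumer.committed(tp);              // the group's committed offset
            long end = consumer.endOffsets(Collections.singleton(tp)).get(tp); // log-end offset
            long lag = end - (committed == null ? 0L : committed.offset());
            System.out.println("lag for " + tp + " = " + lag);
        }
    }
}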


-Matthias

On 12/13/16 2:03 AM, Sachin Mittal wrote:
> If this is a bug then it is not fixed, because I just built Kafka from
> source and it gave me the reported error.
> 
> On Tue, Dec 13, 2016 at 3:24 PM, Sam Pegler 
> wrote:
> 
>>> You can only check the offsets when there is an active member of the
>> consumer group.
>>
>> This was a bug [1] thats been fixed.  Thanks to Vincent Dautremont for
>> pointing this out to me a while ago.
>>
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%
>> 3CCAD2WViSAgwc9i4-9xEw1oz1xzpsbveFt1%3DSZ0qkHRiFEc3fXbw%40mail.
>> gmail.com%3E
>>
>> __
>>
>> Sam Pegler
>>
>> PRODUCTION ENGINEER
>>
>> T. +44(0) 07 562 867 486
>>
>> 
>> 3-7 Herbal Hill / London / EC1R 5EJ
>> www.infectiousmedia.com
>>
>>
>>
>> On 13 December 2016 at 09:39, Damian Guy  wrote:
>>
>>> Hi Sachin
>>>
>>> That is correct. You can only check the offsets when there is an active
>>> member of the consumer group. In this case that would mean that you have
>> at
>>> least one instance of your streams application running.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Tue, 13 Dec 2016 at 06:58 Sachin Mittal  wrote:
>>>
 Hi,
 I used the following command
 bin\windows\kafka-consumer-groups.bat --bootstrap-server
>> localhost:9092
 --describe --group test
 and I get the following output

 Note: This will only show information about consumers that use the Java
 consumer API (non-ZooKeeper-based consumers).

 Error: Consumer group 'test' has no active members.

 What does this mean.

 It means I can check the offset of consumer only when streams
>> applictaion
 "test" is running.

 Thanks
 Sachin


 On Mon, Dec 12, 2016 at 8:33 PM, Damian Guy 
>>> wrote:

> Hi Sachin,
>
> You should use the kafka-consumer-groups.sh command. The
> ConsumerOffsetChecker is deprecated and is only for the old consumer.
>
> Thanks,
> Damian
>
> On Mon, 12 Dec 2016 at 14:32 Sachin Mittal 
>> wrote:
>
>> Hi,
>> I have a streams application running with application id test.
>> When I try to check consumer lag like you suggested I get the
>>> following
>> issue:
>>
>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
>> --zookeeper
>> localhost:2181 --group test
>> [2016-12-12 10:26:01,348] WARN WARNING: ConsumerOffsetChecker is
>> deprecated and will be dropped in releases following 0.9.0. Use
>> ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>>
>> [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> libs/logback-classic-1.0.3.jar!/org/slf4j/impl/
>>> StaticLoggerBinder.class]
>> SLF4J: Found binding in
>>
>> [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/
>>> StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for
>> an
>> explanation.
>> SLF4J: Actual binding is of type
>> [ch.qos.logback.classic.selector.DefaultContextSelector]
>> Exiting due to: org.apache.zookeeper.KeeperException$
>>> NoNodeException:
>> KeeperErrorCode = NoNode for /consumers/test/owners.
>>
>> Please let me know where I may be going wrong.
>> I have the kafka logs set in folder
>> /data01/testuser/kafka-logs
>>
>> Under kafka-logs I see many folders with name something like
>> consumer_offsets_*
>>
>> I have the stream dir set in folder
>> /data01/testuser/kafka-streams/test
>>
>> Thanks
>> Sachin
>>
>>
>> On Sun, Dec 11, 2016 at 2:19 AM, Matthias J. Sax <
 matth...@confluent.io>
>> wrote:
>>
>>> It's basically just a consumer as any other. The application.id
>> is
> used
>>> as consumer group.id.
>>>
>>> So just use the available tools you do use to check consumer lag.
>>>
>>>
>>> -Matthias
>>>
>>> On 12/9/16 5:49 PM, Jon Yeargers wrote:
 How would this be done?

>>>
>>>
>>
>

>>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: "Log end offset should not change while restoring"

2016-12-13 Thread Matthias J. Sax
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG

It is the only cache parameter. Do you use code auto-completion? It's also
in the docs:

http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters
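A minimal sketch, assuming a config map as in the other threads here:

// Disable the record cache so updates are forwarded downstream immediately.
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);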


-Matthias

On 12/12/16 8:22 PM, Jon Yeargers wrote:
> What is the specific cache config setting?
> 
> On Mon, Dec 12, 2016 at 1:49 PM, Matthias J. Sax 
> wrote:
> 
>> We discovered a few more bugs and a bug fix release 0.10.1.1 is planned
>> already.
>>
>> The voting started for it, and it should get release the next weeks.
>>
>> If you issues is related to this caching problem, disabling the cache
>> via StreamsConfig should fix the problem for now. Just set the cache
>> size to zero.
>>
>>
>> -Matthias
>>
>>
>> On 12/12/16 2:31 AM, Jon Yeargers wrote:
>>> Im seeing this error occur more frequently of late. I ran across this
>>> thread:
>>> https://groups.google.com/forum/#!topic/confluent-platform/AH5QClSNZBw
>>>
>>>
>>> The implication from the thread is that a fix is available. Where can I
>> get
>>> it?
>>>
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: Another odd error

2016-12-13 Thread Jon Yeargers
As near as I can see it's rebalancing constantly.

I'll up that value and see what happens.

On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy  wrote:

> Hi Jon,
>
> I haven't had much of a chance to look at the logs in detail too much yet,
> but i have noticed that your app seems to be rebalancing frequently.  It
> seems that it is usually around the 300 second mark, which usually would
> mean that poll hasn't been called for at least that long. You might want to
> try setting the config ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to
> something higher than 300000 (which is the default).
>
> I'll continue to look at your logs and get back to you.
> Thanks,
> Damian
>
> On Tue, 13 Dec 2016 at 15:02 Jon Yeargers 
> wrote:
>
> > attached is a log with lots of disconnections and a small amount of
> > actual, useful activity.
> >
> >
> >
> > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers 
> > wrote:
> >
> > n/m - I understand the logging issue now. Am generating a new one. Will
> > send shortly.
> >
> > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers 
> > wrote:
> >
> > Yes - saw that one. There were plenty of smaller records available
> though.
> >
> > I sent another log this morning with the level set to DEBUG. Hopefully
> you
> > rec'd it.
> >
> > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy 
> wrote:
> >
> > HI Jon,
> >
> > It looks like you have the logging level for KafkaStreams set to at least
> > WARN. I can only see ERROR level logs being produced from Streams.
> >
> > However, i did notice an issue in the logs (not related to your specific
> > error but you will need to fix anyway):
> >
> > There are lots of messages like:
> > task [2_9] Error sending record to topic
> > PRTMinuteAgg-prt_hour_agg_stream-changelog
> > org.apache.kafka.common.errors.RecordTooLargeException: The message is
> > 2381750 bytes when serialized which is larger than the maximum
> >
> > This means you need to add some extra config to your StreamsConfig:
> > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> > expectedMaximumMessageSizeBytes)
> >
> > You will also possible need to adjust the broker properties and
> > increase message.max.bytes
> > - it will need to be at least as large as the setting above.
> >
> > At the moment all of the change-logs for your state-stores are being
> > dropped due to this issue.
> >
> > Thanks,
> > Damian
> >
> >
> >
> > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
> > wrote:
> >
> > > (am attaching a debug log - note that app terminated with no further
> > > messages)
> > >
> > > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> > >  \-> groupByKey.aggregate(hour) -> foreach
> > >
> > >
> > > config:
> > >
> > > Properties config = new Properties();
> > > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> ZOOKEEPER_IP);
> > > config.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "PRTMinuteAgg" );
> > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > AggKey.class.getName());
> > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass().getName());
> > > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> > > config.put(StreamsConfig.STATE_DIR_CONFIG,
> "/mnt/PRTMinuteAgg");
> > >
> > > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> > >
> > >
> > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang 
> > wrote:
> > >
> > > Jon,
> > >
> > > To help investigating this issue, could you let me know 1) your
> topology
> > > sketch and 2) your app configs? For example did you enable caching in
> > your
> > > apps with the cache.max.bytes.buffering config?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <
> jon.yearg...@cedexis.com>
> > > wrote:
> > >
> > > > I get this one quite a bit. It kills my app after a short time of
> > > running.
> > > > Driving me nuts.
> > > >
> > > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > Not sure about this one.
> > > > >
> > > > > Can you describe what you do exactly? Can you reproduce the issue?
> We
> > > > > definitely want to investigate this.
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > > > >
> > > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> > group
> > > > > > MinuteAgg failed on partition assignment
> > > > > >
> > > > > > java.lang.IllegalStateException: task [1_9] Log end 

Re: Another odd error

2016-12-13 Thread Damian Guy
Hi Jon,

I haven't had much of a chance to look at the logs in detail yet,
but I have noticed that your app seems to be rebalancing frequently. It
seems that it is usually around the 300 second mark, which would usually
mean that poll hasn't been called for at least that long. You might want to
try setting the config ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to something
higher than 300000 ms (which is the default).

I'll continue to look at your logs and get back to you.
Thanks,
Damian
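
As a concrete illustration of the suggestion above (a sketch only; the 600000
value, i.e. 10 minutes, is just an example, not a recommendation):

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Streams forwards consumer-level settings to its internal consumers, so this can
// go into the same Properties object used to build the StreamsConfig.
// Allow up to 10 minutes between poll() calls before the member is considered dead.
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);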

On Tue, 13 Dec 2016 at 15:02 Jon Yeargers  wrote:

> attached is a log with lots of disconnections and a small amount of
> actual, useful activity.
>
>
>
> On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers 
> wrote:
>
> n/m - I understand the logging issue now. Am generating a new one. Will
> send shortly.
>
> On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers 
> wrote:
>
> Yes - saw that one. There were plenty of smaller records available though.
>
> I sent another log this morning with the level set to DEBUG. Hopefully you
> rec'd it.
>
> On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy  wrote:
>
> HI Jon,
>
> It looks like you have the logging level for KafkaStreams set to at least
> WARN. I can only see ERROR level logs being produced from Streams.
>
> However, i did notice an issue in the logs (not related to your specific
> error but you will need to fix anyway):
>
> There are lots of messages like:
> task [2_9] Error sending record to topic
> PRTMinuteAgg-prt_hour_agg_stream-changelog
> org.apache.kafka.common.errors.RecordTooLargeException: The message is
> 2381750 bytes when serialized which is larger than the maximum
>
> This means you need to add some extra config to your StreamsConfig:
> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> expectedMaximumMessageSizeBytes)
>
> You will also possible need to adjust the broker properties and
> increase message.max.bytes
> - it will need to be at least as large as the setting above.
>
> At the moment all of the change-logs for your state-stores are being
> dropped due to this issue.
>
> Thanks,
> Damian
>
>
>
> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
> wrote:
>
> > (am attaching a debug log - note that app terminated with no further
> > messages)
> >
> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> >  \-> groupByKey.aggregate(hour) -> foreach
> >
> >
> > config:
> >
> > Properties config = new Properties();
> > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> > config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
> >
> > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> >
> >
> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang 
> wrote:
> >
> > Jon,
> >
> > To help investigating this issue, could you let me know 1) your topology
> > sketch and 2) your app configs? For example did you enable caching in
> your
> > apps with the cache.max.bytes.buffering config?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers 
> > wrote:
> >
> > > I get this one quite a bit. It kills my app after a short time of
> > running.
> > > Driving me nuts.
> > >
> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Not sure about this one.
> > > >
> > > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > > definitely want to investigate this.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > > >
> > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> group
> > > > > MinuteAgg failed on partition assignment
> > > > >
> > > > > java.lang.IllegalStateException: task [1_9] Log end offset should
> not
> > > > > change while restoring
> > > > >
> > > > > at
>
> > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > > > restoreActiveState(ProcessorStateManager.java:245)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
>
> > > > register(ProcessorStateManager.java:198)
> > > > >
> > > > > at
> > > > > 

consumer throttling based on rate quotas leads to client errors

2016-12-13 Thread Paul Mackles
Hi - We are using kafka_2.11-0.9.0.1. Using the kafka-configs.sh command, we 
set the consumer_byte_rate for a specific client.id that was misbehaving.


The new setting definitely should have led to some throttling.


What we found was that connections from that client.id started failing with the 
following error:


org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 554098, only 64 bytes available


Here is the stack trace:


org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 554098, only 64 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)


Resetting the consumer_byte_rate to the default for that client.id immediately 
fixed the errors.
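
(For anyone reproducing this: per-client quotas in 0.9 are managed with
kafka-configs.sh roughly as below. The client id and rate are placeholders,
not the actual values we used.)

# apply a ~1 MB/s consumer quota to one client.id
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type clients --entity-name misbehaving-client \
  --add-config 'consumer_byte_rate=1048576'

# remove the override, i.e. reset that client back to the default quota
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type clients --entity-name misbehaving-client \
  --delete-config 'consumer_byte_rate'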


I couldn't find anything in Jira that looked similar to this. Has anyone seen 
anything like this before?


Thanks,

Paul



Re: Another odd error

2016-12-13 Thread Jon Yeargers
n/m - I understand the logging issue now. Am generating a new one. Will
send shortly.

On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers 
wrote:

> Yes - saw that one. There were plenty of smaller records available though.
>
> I sent another log this morning with the level set to DEBUG. Hopefully you
> rec'd it.
>
> On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy  wrote:
>
>> HI Jon,
>>
>> It looks like you have the logging level for KafkaStreams set to at least
>> WARN. I can only see ERROR level logs being produced from Streams.
>>
>> However, i did notice an issue in the logs (not related to your specific
>> error but you will need to fix anyway):
>>
>> There are lots of messages like:
>> task [2_9] Error sending record to topic
>> PRTMinuteAgg-prt_hour_agg_stream-changelog
>> org.apache.kafka.common.errors.RecordTooLargeException: The message is
>> 2381750 bytes when serialized which is larger than the maximum
>>
>> This means you need to add some extra config to your StreamsConfig:
>> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
>> expectedMaximumMessageSizeBytes)
>>
>> You will also possible need to adjust the broker properties and
>> increase message.max.bytes
>> - it will need to be at least as large as the setting above.
>>
>> At the moment all of the change-logs for your state-stores are being
>> dropped due to this issue.
>>
>> Thanks,
>> Damian
>>
>>
>> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
>> wrote:
>>
>> > (am attaching a debug log - note that app terminated with no further
>> > messages)
>> >
>> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
>> >  \-> groupByKey.aggregate(hour) -> foreach
>> >
>> >
>> > config:
>> >
>> > Properties config = new Properties();
>> > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>> ZOOKEEPER_IP);
>> > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg"
>> );
>> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > AggKey.class.getName());
>> > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass().getName());
>> > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>> > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
>> > config.put(StreamsConfig.STATE_DIR_CONFIG,
>> "/mnt/PRTMinuteAgg");
>> >
>> > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
>> >
>> >
>> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang 
>> wrote:
>> >
>> > Jon,
>> >
>> > To help investigating this issue, could you let me know 1) your topology
>> > sketch and 2) your app configs? For example did you enable caching in
>> your
>> > apps with the cache.max.bytes.buffering config?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers > >
>> > wrote:
>> >
>> > > I get this one quite a bit. It kills my app after a short time of
>> > running.
>> > > Driving me nuts.
>> > >
>> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <
>> matth...@confluent.io>
>> > > wrote:
>> > >
>> > > > Not sure about this one.
>> > > >
>> > > > Can you describe what you do exactly? Can you reproduce the issue?
>> We
>> > > > definitely want to investigate this.
>> > > >
>> > > > -Matthias
>> > > >
>> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
>> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
>> > > > >
>> > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
>> group
>> > > > > MinuteAgg failed on partition assignment
>> > > > >
>> > > > > java.lang.IllegalStateException: task [1_9] Log end offset
>> should not
>> > > > > change while restoring
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.
>> > > > restoreActiveState(ProcessorStateManager.java:245)
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.
>> > > > register(ProcessorStateManager.java:198)
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.processor.internals.
>> > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
>> > > > RocksDBWindowStore.java:206)
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(
>> > > > MeteredWindowStore.java:66)
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.state.internals.CachingWindowStore.init(
>> > > > CachingWindowStore.java:64)
>> > > > >
>> > > > > at
>> > > > > org.apache.kafka.streams.processor.internals.AbstractTask.
>> > > > 

Re: Another odd error

2016-12-13 Thread Jon Yeargers
Yes - saw that one. There were plenty of smaller records available though.

I sent another log this morning with the level set to DEBUG. Hopefully you
rec'd it.

On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy  wrote:

> HI Jon,
>
> It looks like you have the logging level for KafkaStreams set to at least
> WARN. I can only see ERROR level logs being produced from Streams.
>
> However, i did notice an issue in the logs (not related to your specific
> error but you will need to fix anyway):
>
> There are lots of messages like:
> task [2_9] Error sending record to topic
> PRTMinuteAgg-prt_hour_agg_stream-changelog
> org.apache.kafka.common.errors.RecordTooLargeException: The message is
> 2381750 bytes when serialized which is larger than the maximum
>
> This means you need to add some extra config to your StreamsConfig:
> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> expectedMaximumMessageSizeBytes)
>
> You will also possible need to adjust the broker properties and
> increase message.max.bytes
> - it will need to be at least as large as the setting above.
>
> At the moment all of the change-logs for your state-stores are being
> dropped due to this issue.
>
> Thanks,
> Damian
>
>
> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers 
> wrote:
>
> > (am attaching a debug log - note that app terminated with no further
> > messages)
> >
> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> >  \-> groupByKey.aggregate(hour) -> foreach
> >
> >
> > config:
> >
> > Properties config = new Properties();
> > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> ZOOKEEPER_IP);
> > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg"
> );
> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > AggKey.class.getName());
> > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> > config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
> >
> > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> >
> >
> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang 
> wrote:
> >
> > Jon,
> >
> > To help investigating this issue, could you let me know 1) your topology
> > sketch and 2) your app configs? For example did you enable caching in
> your
> > apps with the cache.max.bytes.buffering config?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers 
> > wrote:
> >
> > > I get this one quite a bit. It kills my app after a short time of
> > running.
> > > Driving me nuts.
> > >
> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > Not sure about this one.
> > > >
> > > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > > definitely want to investigate this.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > > >
> > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> group
> > > > > MinuteAgg failed on partition assignment
> > > > >
> > > > > java.lang.IllegalStateException: task [1_9] Log end offset should
> not
> > > > > change while restoring
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.
> > > > restoreActiveState(ProcessorStateManager.java:245)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.
> > > > register(ProcessorStateManager.java:198)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> > > > RocksDBWindowStore.java:206)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(
> > > > MeteredWindowStore.java:66)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.state.internals.CachingWindowStore.init(
> > > > CachingWindowStore.java:64)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.AbstractTask.
> > > > initializeStateStores(AbstractTask.java:81)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> > > > StreamTask.(StreamTask.java:120)
> > > > >
> > > > > at
> > > > > org.apache.kafka.streams.processor.internals.
> > > > StreamThread.createStreamTask(StreamThread.java:633)
> > > > >
> 

Re: Another odd error

2016-12-13 Thread Damian Guy
Hi Jon,

It looks like you have the logging level for KafkaStreams set to at least
WARN. I can only see ERROR level logs being produced from Streams.

However, I did notice an issue in the logs (not related to your specific
error, but you will need to fix it anyway):

There are lots of messages like:
task [2_9] Error sending record to topic
PRTMinuteAgg-prt_hour_agg_stream-changelog
org.apache.kafka.common.errors.RecordTooLargeException: The message is
2381750 bytes when serialized which is larger than the maximum

This means you need to add some extra config to your StreamsConfig:
config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
expectedMaximumMessageSizeBytes)

You will also possibly need to adjust the broker properties and
increase message.max.bytes
- it will need to be at least as large as the setting above.

At the moment all of the change-logs for your state-stores are being
dropped due to this issue.

Thanks,
Damian
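
A minimal sketch of the two changes described above (3000000 is only an example,
chosen to sit above the ~2.4 MB record from the log; pick a bound that fits your
data):

// Streams application side: let the embedded producer send larger requests
config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 3000000);

# Broker side (server.properties): must be at least as large as max.request.size
message.max.bytes=3000000
# Replica fetchers must also be able to fetch such messages
replica.fetch.max.bytes=3000000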


On Tue, 13 Dec 2016 at 11:32 Jon Yeargers  wrote:

> (am attaching a debug log - note that app terminated with no further
> messages)
>
> topology: kStream -> groupByKey.aggregate(minute) -> foreach
>  \-> groupByKey.aggregate(hour) -> foreach
>
>
> config:
>
> Properties config = new Properties();
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
> config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> AggKey.class.getName());
> config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
>
> config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
>
>
> On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang  wrote:
>
> Jon,
>
> To help investigating this issue, could you let me know 1) your topology
> sketch and 2) your app configs? For example did you enable caching in your
> apps with the cache.max.bytes.buffering config?
>
>
> Guozhang
>
>
> On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers 
> wrote:
>
> > I get this one quite a bit. It kills my app after a short time of
> running.
> > Driving me nuts.
> >
> > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax 
> > wrote:
> >
> > > Not sure about this one.
> > >
> > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > definitely want to investigate this.
> > >
> > > -Matthias
> > >
> > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > >
> > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > > MinuteAgg failed on partition assignment
> > > >
> > > > java.lang.IllegalStateException: task [1_9] Log end offset should not
> > > > change while restoring
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > > restoreActiveState(ProcessorStateManager.java:245)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > > register(ProcessorStateManager.java:198)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > >
> > > > at
> > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> > > RocksDBWindowStore.java:206)
> > > >
> > > > at
> > > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(
> > > MeteredWindowStore.java:66)
> > > >
> > > > at
> > > > org.apache.kafka.streams.state.internals.CachingWindowStore.init(
> > > CachingWindowStore.java:64)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.AbstractTask.
> > > initializeStateStores(AbstractTask.java:81)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > StreamTask.(StreamTask.java:120)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.createStreamTask(StreamThread.java:633)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.addStreamTasks(StreamThread.java:660)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.StreamThread.access$100(
> > > StreamThread.java:69)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.StreamThread$1.
> > > onPartitionsAssigned(StreamThread.java:124)
> > > >
> > > > at
> > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> > 

Re: Another odd error

2016-12-13 Thread Jon Yeargers
(am attaching a debug log - note that app terminated with no further
messages)

topology: kStream -> groupByKey.aggregate(minute) -> foreach
 \-> groupByKey.aggregate(hour) -> foreach


config:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" );
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
AggKey.class.getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");

config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");


On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang  wrote:

> Jon,
>
> To help investigating this issue, could you let me know 1) your topology
> sketch and 2) your app configs? For example did you enable caching in your
> apps with the cache.max.bytes.buffering config?
>
>
> Guozhang
>
>
> On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers 
> wrote:
>
> > I get this one quite a bit. It kills my app after a short time of
> running.
> > Driving me nuts.
> >
> > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax 
> > wrote:
> >
> > > Not sure about this one.
> > >
> > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > definitely want to investigate this.
> > >
> > > -Matthias
> > >
> > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > >
> > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> group
> > > > MinuteAgg failed on partition assignment
> > > >
> > > > java.lang.IllegalStateException: task [1_9] Log end offset should
> not
> > > > change while restoring
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > > restoreActiveState(ProcessorStateManager.java:245)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > > register(ProcessorStateManager.java:198)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > >
> > > > at
> > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> > > RocksDBWindowStore.java:206)
> > > >
> > > > at
> > > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(
> > > MeteredWindowStore.java:66)
> > > >
> > > > at
> > > > org.apache.kafka.streams.state.internals.CachingWindowStore.init(
> > > CachingWindowStore.java:64)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.AbstractTask.
> > > initializeStateStores(AbstractTask.java:81)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > StreamTask.(StreamTask.java:120)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.createStreamTask(StreamThread.java:633)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.addStreamTasks(StreamThread.java:660)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> StreamThread.access$100(
> > > StreamThread.java:69)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.StreamThread$1.
> > > onPartitionsAssigned(StreamThread.java:124)
> > > >
> > > > at
> > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> > > onJoinComplete(ConsumerCoordinator.java:228)
> > > >
> > > > at
> > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > >
> > > > at
> > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > ensureActiveGroup(AbstractCoordinator.java:277)
> > > >
> > > > at
> > > > org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.poll(
> > > ConsumerCoordinator.java:259)
> > > >
> > > > at
> > > > org.apache.kafka.clients.consumer.KafkaConsumer.
> > > pollOnce(KafkaConsumer.java:1013)
> > > >
> > > > at
> > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > KafkaConsumer.java:979)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > StreamThread.java:407)
> > > >
> > > > at
> > > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.run(StreamThread.java:242)
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: checking consumer lag on KStreams app?

2016-12-13 Thread Sam Pegler
Is the following PR present https://github.com/apache/kafka/pull/1336?

__

Sam Pegler

PRODUCTION ENGINEER

T. +44(0) 07 562 867 486


3-7 Herbal Hill / London / EC1R 5EJ
www.infectiousmedia.com

This email and any attachments are confidential and may also be privileged.
If you
are not the intended recipient, please notify the sender immediately, and
do not
disclose the contents to another person, use it for any purpose, or store,
or copy
the information in any medium. Please also destroy and delete the message
from
your computer.


On 13 December 2016 at 10:03, Sachin Mittal  wrote:

> If this is a bug then it is not fixed because I just build kafka from
> source and it gave me the reported error.
>
> On Tue, Dec 13, 2016 at 3:24 PM, Sam Pegler  com>
> wrote:
>
> > >You can only check the offsets when there is an active member of the
> > consumer group.
> >
> > This was a bug [1] thats been fixed.  Thanks to Vincent Dautremont for
> > pointing this out to me a while ago.
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%
> > 3CCAD2WViSAgwc9i4-9xEw1oz1xzpsbveFt1%3DSZ0qkHRiFEc3fXbw%40mail.
> > gmail.com%3E
> >
> > __
> >
> > Sam Pegler
> >
> > PRODUCTION ENGINEER
> >
> > T. +44(0) 07 562 867 486
> >
> > 
> > 3-7 Herbal Hill / London / EC1R 5EJ
> > www.infectiousmedia.com
> >
> > This email and any attachments are confidential and may also be
> privileged.
> > If you
> > are not the intended recipient, please notify the sender immediately, and
> > do not
> > disclose the contents to another person, use it for any purpose, or
> store,
> > or copy
> > the information in any medium. Please also destroy and delete the message
> > from
> > your computer.
> >
> >
> > On 13 December 2016 at 09:39, Damian Guy  wrote:
> >
> > > Hi Sachin
> > >
> > > That is correct. You can only check the offsets when there is an active
> > > member of the consumer group. In this case that would mean that you
> have
> > at
> > > least one instance of your streams application running.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 13 Dec 2016 at 06:58 Sachin Mittal  wrote:
> > >
> > > > Hi,
> > > > I used the following command
> > > > bin\windows\kafka-consumer-groups.bat --bootstrap-server
> > localhost:9092
> > > > --describe --group test
> > > > and I get the following output
> > > >
> > > > Note: This will only show information about consumers that use the
> Java
> > > > consumer API (non-ZooKeeper-based consumers).
> > > >
> > > > Error: Consumer group 'test' has no active members.
> > > >
> > > > What does this mean.
> > > >
> > > > It means I can check the offset of consumer only when streams
> > applictaion
> > > > "test" is running.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Mon, Dec 12, 2016 at 8:33 PM, Damian Guy 
> > > wrote:
> > > >
> > > > > Hi Sachin,
> > > > >
> > > > > You should use the kafka-consumer-groups.sh command. The
> > > > > ConsumerOffsetChecker is deprecated and is only for the old
> consumer.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Mon, 12 Dec 2016 at 14:32 Sachin Mittal 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > > I have a streams application running with application id test.
> > > > > > When I try to check consumer lag like you suggested I get the
> > > following
> > > > > > issue:
> > > > > >
> > > > > > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
> > --zookeeper
> > > > > > localhost:2181 --group test
> > > > > > [2016-12-12 10:26:01,348] WARN WARNING: ConsumerOffsetChecker is
> > > > > > deprecated and will be dropped in releases following 0.9.0. Use
> > > > > > ConsumerGroupCommand instead. (kafka.tools.
> ConsumerOffsetChecker$)
> > > > > > SLF4J: Class path contains multiple SLF4J bindings.
> > > > > > SLF4J: Found binding in
> > > > > >
> > > > > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > > > > libs/logback-classic-1.0.3.jar!/org/slf4j/impl/
> > > StaticLoggerBinder.class]
> > > > > > SLF4J: Found binding in
> > > > > >
> > > > > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > > > > libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/
> > > StaticLoggerBinder.class]
> > > > > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for
> > an
> > > > > > explanation.
> > > > > > SLF4J: Actual binding is of type
> > > > > > [ch.qos.logback.classic.selector.DefaultContextSelector]
> > > > > > Exiting due to: org.apache.zookeeper.KeeperException$
> > > NoNodeException:
> > > > > > KeeperErrorCode = NoNode for /consumers/test/owners.
> > > > > >
> > > > > > Please let me know where I may be going wrong.
> > > > > > I have the kafka logs set in folder
> > > > > > /data01/testuser/kafka-logs
> > > > > >
> > > > > > Under kafka-logs I see many folders with name something like
> > > > 

Re: checking consumer lag on KStreams app?

2016-12-13 Thread Sachin Mittal
If this is a bug then it is not fixed, because I just built Kafka from
source and it gave me the reported error.

On Tue, Dec 13, 2016 at 3:24 PM, Sam Pegler 
wrote:

> >You can only check the offsets when there is an active member of the
> consumer group.
>
> This was a bug [1] thats been fixed.  Thanks to Vincent Dautremont for
> pointing this out to me a while ago.
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%
> 3CCAD2WViSAgwc9i4-9xEw1oz1xzpsbveFt1%3DSZ0qkHRiFEc3fXbw%40mail.
> gmail.com%3E
>
> __
>
> Sam Pegler
>
> PRODUCTION ENGINEER
>
> T. +44(0) 07 562 867 486
>
> 
> 3-7 Herbal Hill / London / EC1R 5EJ
> www.infectiousmedia.com
>
> This email and any attachments are confidential and may also be privileged.
> If you
> are not the intended recipient, please notify the sender immediately, and
> do not
> disclose the contents to another person, use it for any purpose, or store,
> or copy
> the information in any medium. Please also destroy and delete the message
> from
> your computer.
>
>
> On 13 December 2016 at 09:39, Damian Guy  wrote:
>
> > Hi Sachin
> >
> > That is correct. You can only check the offsets when there is an active
> > member of the consumer group. In this case that would mean that you have
> at
> > least one instance of your streams application running.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 13 Dec 2016 at 06:58 Sachin Mittal  wrote:
> >
> > > Hi,
> > > I used the following command
> > > bin\windows\kafka-consumer-groups.bat --bootstrap-server
> localhost:9092
> > > --describe --group test
> > > and I get the following output
> > >
> > > Note: This will only show information about consumers that use the Java
> > > consumer API (non-ZooKeeper-based consumers).
> > >
> > > Error: Consumer group 'test' has no active members.
> > >
> > > What does this mean.
> > >
> > > It means I can check the offset of consumer only when streams
> applictaion
> > > "test" is running.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Dec 12, 2016 at 8:33 PM, Damian Guy 
> > wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > You should use the kafka-consumer-groups.sh command. The
> > > > ConsumerOffsetChecker is deprecated and is only for the old consumer.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Mon, 12 Dec 2016 at 14:32 Sachin Mittal 
> wrote:
> > > >
> > > > > Hi,
> > > > > I have a streams application running with application id test.
> > > > > When I try to check consumer lag like you suggested I get the
> > following
> > > > > issue:
> > > > >
> > > > > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
> --zookeeper
> > > > > localhost:2181 --group test
> > > > > [2016-12-12 10:26:01,348] WARN WARNING: ConsumerOffsetChecker is
> > > > > deprecated and will be dropped in releases following 0.9.0. Use
> > > > > ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
> > > > > SLF4J: Class path contains multiple SLF4J bindings.
> > > > > SLF4J: Found binding in
> > > > >
> > > > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > > > libs/logback-classic-1.0.3.jar!/org/slf4j/impl/
> > StaticLoggerBinder.class]
> > > > > SLF4J: Found binding in
> > > > >
> > > > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > > > libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/
> > StaticLoggerBinder.class]
> > > > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for
> an
> > > > > explanation.
> > > > > SLF4J: Actual binding is of type
> > > > > [ch.qos.logback.classic.selector.DefaultContextSelector]
> > > > > Exiting due to: org.apache.zookeeper.KeeperException$
> > NoNodeException:
> > > > > KeeperErrorCode = NoNode for /consumers/test/owners.
> > > > >
> > > > > Please let me know where I may be going wrong.
> > > > > I have the kafka logs set in folder
> > > > > /data01/testuser/kafka-logs
> > > > >
> > > > > Under kafka-logs I see many folders with name something like
> > > > > consumer_offsets_*
> > > > >
> > > > > I have the stream dir set in folder
> > > > > /data01/testuser/kafka-streams/test
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Sun, Dec 11, 2016 at 2:19 AM, Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > It's basically just a consumer as any other. The application.id
> is
> > > > used
> > > > > > as consumer group.id.
> > > > > >
> > > > > > So just use the available tools you do use to check consumer lag.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 12/9/16 5:49 PM, Jon Yeargers wrote:
> > > > > > > How would this be done?
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: checking consumer lag on KStreams app?

2016-12-13 Thread Sam Pegler
>You can only check the offsets when there is an active member of the
consumer group.

This was a bug [1] that's been fixed. Thanks to Vincent Dautremont for
pointing this out to me a while ago.

http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAD2WViSAgwc9i4-9xEw1oz1xzpsbveFt1%3DSZ0qkHRiFEc3fXbw%40mail.gmail.com%3E

__

Sam Pegler

PRODUCTION ENGINEER

T. +44(0) 07 562 867 486


3-7 Herbal Hill / London / EC1R 5EJ
www.infectiousmedia.com

This email and any attachments are confidential and may also be privileged.
If you
are not the intended recipient, please notify the sender immediately, and
do not
disclose the contents to another person, use it for any purpose, or store,
or copy
the information in any medium. Please also destroy and delete the message
from
your computer.


On 13 December 2016 at 09:39, Damian Guy  wrote:

> Hi Sachin
>
> That is correct. You can only check the offsets when there is an active
> member of the consumer group. In this case that would mean that you have at
> least one instance of your streams application running.
>
> Thanks,
> Damian
>
> On Tue, 13 Dec 2016 at 06:58 Sachin Mittal  wrote:
>
> > Hi,
> > I used the following command
> > bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092
> > --describe --group test
> > and I get the following output
> >
> > Note: This will only show information about consumers that use the Java
> > consumer API (non-ZooKeeper-based consumers).
> >
> > Error: Consumer group 'test' has no active members.
> >
> > What does this mean.
> >
> > It means I can check the offset of consumer only when streams applictaion
> > "test" is running.
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Dec 12, 2016 at 8:33 PM, Damian Guy 
> wrote:
> >
> > > Hi Sachin,
> > >
> > > You should use the kafka-consumer-groups.sh command. The
> > > ConsumerOffsetChecker is deprecated and is only for the old consumer.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Mon, 12 Dec 2016 at 14:32 Sachin Mittal  wrote:
> > >
> > > > Hi,
> > > > I have a streams application running with application id test.
> > > > When I try to check consumer lag like you suggested I get the
> following
> > > > issue:
> > > >
> > > > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper
> > > > localhost:2181 --group test
> > > > [2016-12-12 10:26:01,348] WARN WARNING: ConsumerOffsetChecker is
> > > > deprecated and will be dropped in releases following 0.9.0. Use
> > > > ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
> > > > SLF4J: Class path contains multiple SLF4J bindings.
> > > > SLF4J: Found binding in
> > > >
> > > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > > libs/logback-classic-1.0.3.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> > > > SLF4J: Found binding in
> > > >
> > > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > > libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> > > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > > > explanation.
> > > > SLF4J: Actual binding is of type
> > > > [ch.qos.logback.classic.selector.DefaultContextSelector]
> > > > Exiting due to: org.apache.zookeeper.KeeperException$
> NoNodeException:
> > > > KeeperErrorCode = NoNode for /consumers/test/owners.
> > > >
> > > > Please let me know where I may be going wrong.
> > > > I have the kafka logs set in folder
> > > > /data01/testuser/kafka-logs
> > > >
> > > > Under kafka-logs I see many folders with name something like
> > > > consumer_offsets_*
> > > >
> > > > I have the stream dir set in folder
> > > > /data01/testuser/kafka-streams/test
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Sun, Dec 11, 2016 at 2:19 AM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > It's basically just a consumer as any other. The application.id is
> > > used
> > > > > as consumer group.id.
> > > > >
> > > > > So just use the available tools you do use to check consumer lag.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 12/9/16 5:49 PM, Jon Yeargers wrote:
> > > > > > How would this be done?
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>


Re: checking consumer lag on KStreams app?

2016-12-13 Thread Damian Guy
Hi Sachin

That is correct. You can only check the offsets when there is an active
member of the consumer group. In this case that would mean that you have at
least one instance of your streams application running.

Thanks,
Damian
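
Putting the two replies together: once at least one instance of the application is
running, the lag check for this thread's app (application.id "test") is simply the
Linux equivalent of the .bat command quoted below:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test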

On Tue, 13 Dec 2016 at 06:58 Sachin Mittal  wrote:

> Hi,
> I used the following command
> bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092
> --describe --group test
> and I get the following output
>
> Note: This will only show information about consumers that use the Java
> consumer API (non-ZooKeeper-based consumers).
>
> Error: Consumer group 'test' has no active members.
>
> What does this mean.
>
> It means I can check the offset of consumer only when streams applictaion
> "test" is running.
>
> Thanks
> Sachin
>
>
> On Mon, Dec 12, 2016 at 8:33 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > You should use the kafka-consumer-groups.sh command. The
> > ConsumerOffsetChecker is deprecated and is only for the old consumer.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 12 Dec 2016 at 14:32 Sachin Mittal  wrote:
> >
> > > Hi,
> > > I have a streams application running with application id test.
> > > When I try to check consumer lag like you suggested I get the following
> > > issue:
> > >
> > > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper
> > > localhost:2181 --group test
> > > [2016-12-12 10:26:01,348] WARN WARNING: ConsumerOffsetChecker is
> > > deprecated and will be dropped in releases following 0.9.0. Use
> > > ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
> > > SLF4J: Class path contains multiple SLF4J bindings.
> > > SLF4J: Found binding in
> > >
> > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > libs/logback-classic-1.0.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > > SLF4J: Found binding in
> > >
> > > [jar:file:/home/testuser/kafka/kafka_2.10-0.10.0.1/
> > libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > > explanation.
> > > SLF4J: Actual binding is of type
> > > [ch.qos.logback.classic.selector.DefaultContextSelector]
> > > Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
> > > KeeperErrorCode = NoNode for /consumers/test/owners.
> > >
> > > Please let me know where I may be going wrong.
> > > I have the kafka logs set in folder
> > > /data01/testuser/kafka-logs
> > >
> > > Under kafka-logs I see many folders with name something like
> > > consumer_offsets_*
> > >
> > > I have the stream dir set in folder
> > > /data01/testuser/kafka-streams/test
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Sun, Dec 11, 2016 at 2:19 AM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > It's basically just a consumer as any other. The application.id is
> > used
> > > > as consumer group.id.
> > > >
> > > > So just use the available tools you do use to check consumer lag.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/9/16 5:49 PM, Jon Yeargers wrote:
> > > > > How would this be done?
> > > > >
> > > >
> > > >
> > >
> >
>


Re: rocksdb error(s)

2016-12-13 Thread Damian Guy
Hi Jon,

But I don't see any attached logs?

Thanks,
Damian

On Mon, 12 Dec 2016 at 19:35 Jon Yeargers  wrote:

> The attached error log shows several of the problems I've run into. After
> hitting this list (and typically much less) the app dies.
>
> On Mon, Dec 12, 2016 at 4:48 AM, Damian Guy  wrote:
>
> Just set the log level to debug and then run your app until you start
> seeing the problem.
> Thanks
>
> On Mon, 12 Dec 2016 at 12:47 Jon Yeargers 
> wrote:
>
> > I can log whatever you need. Tell me what is useful.
> >
> > On Mon, Dec 12, 2016 at 4:43 AM, Damian Guy 
> wrote:
> >
> > > If you provide the logs from your streams application then we might
> have
> > > some chance of working out what is going on. Without logs then we
> really
> > > don't have much hope of diagnosing the problem.
> > >
> > > On Mon, 12 Dec 2016 at 12:18 Jon Yeargers 
> > > wrote:
> > >
> > > > Im running as many threads as I have partitions on this topic. Just
> > > curious
> > > > if it would make any difference to the seemingly endless rebalancing
> > > woes.
> > > >
> > > > So far no change. In fact, I'll often see all 10 partitions (plus the
> > 2 x
> > > > 10 for the two aggregations) assigned to a single thread.
> > > >
> > > > On Mon, Dec 12, 2016 at 4:15 AM, Jon Yeargers <
> > jon.yearg...@cedexis.com>
> > > > wrote:
> > > >
> > > > > At this moment I have 5 instances each running 2 threads.
> > > > > Single instance / machine.
> > > > >
> > > > > Define 'full logs' ?
> > > > >
> > > > > On Mon, Dec 12, 2016 at 3:54 AM, Damian Guy 
> > > > wrote:
> > > > >
> > > > >> Jon,
> > > > >>
> > > > >> How many StreamThreads do you have running?
> > > > >> How many application instances?
> > > > >> Do you have more than one instance per machine? If yes, are they
> > > sharing
> > > > >> the same State Directory?
> > > > >> Do you have full logs that can be provided so we can try and see
> > > > how/what
> > > > >> is happening?
> > > > >>
> > > > >> Thanks,
> > > > >> Damian
> > > > >>
> > > > >> On Mon, 12 Dec 2016 at 10:17 Jon Yeargers <
> jon.yearg...@cedexis.com
> > >
> > > > >> wrote:
> > > > >>
> > > > >> > No luck here. Moved all state storage to a non-tmp folder and
> > > > restarted.
> > > > >> > Still hitting the 'No locks available' error quite frequently.
> > > > >> >
> > > > >> > On Sun, Dec 11, 2016 at 3:45 PM, Jon Yeargers <
> > > > jon.yearg...@cedexis.com
> > > > >> >
> > > > >> > wrote:
> > > > >> >
> > > > >> > > I moved the state folder to a separate drive and linked out to
> > it.
> > > > >> > >
> > > > >> > > I'll try your suggestion and point directly.
> > > > >> > >
> > > > >> > > On Sun, Dec 11, 2016 at 2:20 PM, Matthias J. Sax <
> > > > >> matth...@confluent.io>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > >> I am not sure, but this might be related with your state
> > > directory.
> > > > >> > >>
> > > > >> > >> You use default directory that is located in /tmp -- could it
> > be,
> > > > >> that
> > > > >> > >> /tmp gets clean up and thus you loose files/directories?
> > > > >> > >>
> > > > >> > >> Try to reconfigure your state directory via StreamsConfig:
> > > > >> > >> http://docs.confluent.io/current/streams/developer-guide.
> > > > >> > >> html#optional-configuration-parameters
> > > > >> > >>
> > > > >> > >>
> > > > >> > >> -Matthias
> > > > >> > >>
> > > > >> > >> On 12/11/16 1:28 AM, Jon Yeargers wrote:
> > > > >> > >> > Seeing this appearing somewhat frequently -
> > > > >> > >> >
> > > > >> > >> > org.apache.kafka.streams.errors.ProcessorStateException:
> > Error
> > > > >> opening
> > > > >> > >> > store minute_agg_stream-201612100812 at location
> > > > >> > >> >
> /tmp/kafka-streams/MinuteAgg/1_9/minute_agg_stream/minute_ag
> > > > >> > >> g_stream-201612100812
> > > > >> > >> >
> > > > >> > >> > at
> > > > >> > >> >
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB
> > > > >> > >> (RocksDBStore.java:196)
> > > > >> > >> >
> > > > >> > >> > at
> > > > >> > >> >
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB
> > > > >> > >> (RocksDBStore.java:158)
> > > > >> > >> >
> > > > >> > >> > at
> > > > >> > >> >
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$
> > > > >> > >> Segment.openDB(RocksDBWindowStore.java:72)
> > > > >> > >> >
> > > > >> > >> > at
> > > > >> > >> >
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > > > >> > >> getOrCreateSegment(RocksDBWindowStore.java:402)
> > > > >> > >> >
> > > > >> > >> > at
> > > > >> > >> >
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > > > >> > >> putInternal(RocksDBWindowStore.java:333)
> > > > >> > >> >
> > > > >> > >> > at
> > > > >> > >> >
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > > > >> > >> access$100(RocksDBWindowStore.java:51)
> > > > >> > >> 

Re: How does 'TimeWindows.of().until()' work?

2016-12-13 Thread Sachin Mittal
Hi,
So is until for the future or the past?
Say I get the first record at t = 0, until is 100, and my window size is 50
with an advance of 25.
I understand it will create windows (0, 50), (25, 75), (50, 100).
Now at t = 101 it will drop
(0, 50), (25, 75), (50, 100) and create
(101, 150), (125, 175), (150, 200).

Please confirm if this understanding is correct. It is also not clear how it
will handle the overlapping windows (75, 125), (175, 225) and so on.

The case that is still not clear is this: say at t = 102 I get some message with
timestamp 99. What happens then?
Will the result be added to the previous aggregation of (50, 100) or (75, 125),
as it should be?

Or will it recreate the old window (50, 100), aggregate the value there
and then drop it? That would produce a wrong aggregated value, as it does
not take the previously aggregated values into account.

So this is the pressing case I am not able to understand. Maybe my basic
understanding is wrong somewhere.


Next, regarding the parameter
> windowstore.changelog.additional.retention.ms

How does this relate to the retention.ms param of the topic config?
I create the internal topic manually using, say, retention.ms=360.
In the next release (post kafka_2.10-0.10.0.1), since we also support delete of
the internal changelog topic, I want it to be retained for, say, just
1 hour.
So how does the above parameter interact with this topic-level setting?
Or do I now just need to set the above config to 360 and not add
retention.ms=360
while creating the internal topic?
This is just another doubt remaining here.

Thanks
Sachin
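
For context, a minimal sketch of a hopping-window aggregation with the numbers from
the question, using the 0.10.1.x DSL. The topic name "events", the store name
"agg-store" and the use of count() are placeholders for the real aggregation; times
are in milliseconds:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.*;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream(Serdes.String(), Serdes.String(), "events");

// Size 50, advance 25, retained for 100: per the reply quoted below, a late record
// with timestamp 99 is still added to the existing windows covering 99 as long as
// their retention time has not passed.
KTable<Windowed<String>, Long> agg = events
    .groupByKey(Serdes.String(), Serdes.String())
    .count(TimeWindows.of(50L).advanceBy(25L).until(100L), "agg-store");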



On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax 
wrote:

> Sachin,
>
> There is no reason to have an .until() AND a .retain() -- just increase
> the value of .until()
>
> If you have a window of let's say 1h size and you set .until() also to
> 1h -- you can obviously not process any late arriving data. If you set
> until() to 2h is this example, you can process data that is up to 1h
> delayed.
>
> So basically, the retention should always be larger than you window size.
>
> The parameter
> > windowstore.changelog.additional.retention.ms
>
> is applies to changelog topics that backup window state stores. Those
> changelog topics are compacted. However, the used key does encode an
> window ID and thus older data can never be cleaned up by compaction.
> Therefore, an additional retention time is applied to those topics, too.
> Thus, if an old window is not updated for this amount of time, it will
> get deleted eventually preventing this topic to grown infinitely.
>
> The value will be determined by until(), i.e., whatever you specify in
> .until() will be used to set this parameter.
>
>
> -Matthias
>
> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > Hi,
> > We are facing the exact problem as described by Matthias above.
> > We are keeping default until which is 1 day.
> >
> > Our record's times tamp extractor has a field which increases with time.
> > However for short time we cannot guarantee the time stamp is always
> > increases. So at the boundary ie after 24 hrs we can get records which
> are
> > beyond that windows retention period.
> >
> > Then it happens like it is mentioned above and our aggregation fails.
> >
> > So just to sum up when we get record
> > 24h + 1 sec (it deletes older window and since the new record belongs to
> > the new window its gets created)
> > Now when we get next record of 24 hs - 1 sec since older window is
> dropped
> > it does not get aggregated in that bucket.
> >
> > I suggest we have another setting next to until call retain which retains
> > the older windows into next window.
> >
> > I think at stream window boundary level it should use a concept of
> sliding
> > window. So we can define window like
> >
> > TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> 1000l).untill(7
> > * 24 * 3600 * 1000l).retain(900 * 1000l)
> >
> > So after 7 days it retains the data covered by windows in last 15 minutes
> > which rolls over the data in them to next window. This way streams work
> > continuously.
> >
> > Please let us know your thoughts on this.
> >
> > On another side question on this there is a setting:
> >
> > windowstore.changelog.additional.retention.ms
> > I is not clear what is does. Is this the default for until?
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax  >
> > wrote:
> >
> >> Windows are created on demand, ie, each time a new record arrives and
> >> there is no window yet for it, a new window will get created.
> >>
> >> Windows are accepting data until their retention time (that you can
> >> configure via .until()) passed. Thus, you will have many windows being
> >> open in parallel.
> >>
> >> If you read older data, they will just be put into the corresponding
> >> windows (as long as window retention time did not pass). If a window was
> >> discarded already, a new window with this single (later arriving) record
> >> will get created, the computation will