Re: Help with Mirror Maker config to produce to all partitions in target cluster

2017-11-01 Thread Manikumar
Are there any exceptions in the MirrorMaker logs? Maybe you can enable MirrorMaker
trace logs.
Could all the messages have the same key? Can you recheck the partition count on
the target cluster?


On Thu, Nov 2, 2017 at 2:45 AM, Chris Neal  wrote:

> Apologies for bumping my own post, but really hoping someone has experience
> with MirrorMaker who might be able to help with my question below. :)
>
> Thanks!
> Chris
>
> On Fri, Oct 27, 2017 at 10:59 AM, Chris Neal  wrote:
>
> > Hi all,
> >
> > I've been struggling with this for awhile and need some help please. :)
> >
> > I have a source cluster running Kafka 0.8.1.1 with about 900 topics.
> > I have a target cluster running Kafka 0.10.2 with those same topics
> > pre-created with the same number of partitions per topic as the source
> > cluster.
> >
> > I'm running Mirror Maker from the 0.8.1.1 Kafka version, and cannot seem
> > to get the produced records to go anywhere besides partition 0 of all these
> > topics. :(
> >
> > I found this page to be helpful WRT performance tuning, but it did not
> > solve my partition producer problem:
> >
> > https://community.hortonworks.com/articles/79891/kafka-mirror-maker-best-practices.html
> >
> > My consumer.config:
> > ==
> > zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
> > group.id=prod_perf_mirrormaker2_new1
> > num.consumer.fetchers=1100
> > client.id=MirrorMakerConsumer_perf
> > ==
> >
> > My producer config:
> > ==
> > metadata.broker.list=kafka1:9092,kafka2:9092,kafka3:9092
> > compression.codec=gzip
> > producer.type=async
> > batch.num.messages=1000
> > message.send.max.retries=2
> > client.id=MirrorMakerProducer_perf
> > ==
> >
> > My start script:
> > ==
> > #!/bin/sh
> >
> > export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
> >
> > /home/chris.neal/kafka_2.9.2-0.8.1.1/bin/kafka-run-class.sh
> > kafka.tools.MirrorMaker \
> > --producer.config /home/chris.neal/mirror_maker/prod_perf/producer.properties \
> > --consumer.config /home/chris.neal/mirror_maker/prod_perf/consumer.properties \
> > --num.streams=10 \
> > --whitelist ''
> > ==
> >
> > I do know that there is some combination of "num.streams" and
> > "num.consumer.fetchers" that should get it working, but I can't figure out
> > what that combination is.
> >
> > All topics have the same setup of 10 partitions per topic.
> >
> > Can anyone shed some light on how to make this go across all 10
> > partitions, and not just partition 0?
> >
> > Thank you SO much for your time and help.
> > Chris
> >
>


Reg. Kafka transactional producer and consumer

2017-11-01 Thread Abhishek Verma

Hi All,

I am trying to make a hello world example for a transactional producer and
trying to consume, all in plain Java.

I can produce, but the consumer is not consuming the messages.

I searched in other places and found that some people have the same problem.

Right now I am using a single broker. I tried the same with 3 brokers, and it
was not working then either.

I don't know what I am missing, or where… :p whether it is something in the
consumer or in the producer.

I have attached the producer and consumer code and console logs, along with my broker logs.

Thanks,
Abhishek


My Broker logs after producing messages

[2017-11-01 18:45:55,000] INFO Updated PartitionLeaderEpoch. New: {epoch:4, 
offset:3}, Current: {epoch:3, offset:0} for Partition: __transaction_state-2. 
Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-11-01 18:46:03,482] INFO [Transaction Coordinator 1001]: Initialized 
transactionalId TXN_ID:0.5031925219291776-156417066 with producerId 4001 and 
producer epoch 0 on partition __transaction_state-2 
(kafka.coordinator.transaction.TransactionCoordinator)


My producer code is

import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonSerializer;
// Note: use the org.apache.kafka.common classes here, not the old
// kafka.common ones, or the catch blocks below will never match.
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;
import java.util.Random;

public class SampleProducer {

    public static String topic = "topic-4";

    public static void main(String[] args) {

        Properties configProperties = new Properties();

        //configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "some-client-id");
        configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "TXN_ID:" + new Random().nextDouble() + new Random().nextInt());
        configProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProperties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        configProperties.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        configProperties.put("value.serializer", JsonSerializer.class);
        configProperties.put("bootstrap.servers", "192.168.41.132:9090");

        KafkaProducer<Integer, DataObject> producer = new KafkaProducer<>(configProperties);

        System.out.println("Init Transaction");
        producer.initTransactions();
        try {
            System.out.println("transaction initialised, going to begin transaction");
            producer.beginTransaction();
            System.out.println("Transaction started");

            ProducerRecord<Integer, DataObject> rec =
                    new ProducerRecord<>(topic, 5, new DataObject(5, "Hello, World!"));

            RecordMetadata metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            producer.commitTransaction();
            System.out.println("Transaction Committed");

        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to
            // close the producer and exit.
            System.out.println("Connection closed but commit failed. We can't recover");
            producer.close();
        } catch (KafkaException e) {
            // For all other Kafka exceptions, just abort the transaction and try again.
            System.out.println("Abort Transaction");
            producer.abortTransaction();
        } catch (Exception e) {
            // send(...).get() can throw InterruptedException/ExecutionException;
            // don't swallow them silently.
            e.printStackTrace();
        }
        producer.close();
        System.out.println("Closed");
    }
}
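One common thing to check when a transactional producer appears to work but the consumer sees nothing is the consumer configuration rather than the producer. A hedged sketch of consumer properties to try (the group id is made up, and bootstrap.servers is taken from the producer code above):

```properties
# Hypothetical consumer.properties for reading the transactional topic above.
bootstrap.servers=192.168.41.132:9090
group.id=txn-hello-world
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Only return messages from committed transactions
# (the default is read_uncommitted):
isolation.level=read_committed
# If the group has no committed offset yet, start from the beginning so the
# already-produced records are actually fetched:
auto.offset.reset=earliest
```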




These are my producer console logs


0[main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - 
ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [192.168.41.132:9090]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768

Re: [ANNOUNCE] Apache Kafka 1.0.0 Released

2017-11-01 Thread Jaikiran Pai
Congratulations Kafka team on the release. Happy to see Kafka reach this 
milestone. It has been a pleasure using Kafka and also interacting with 
the Kafka team.


-Jaikiran


On 01/11/17 7:57 PM, Guozhang Wang wrote:

The Apache Kafka community is pleased to announce the release for Apache
Kafka 1.0.0.

This is a major release of the Kafka project, and is no mere bump of the
version number. The Apache Kafka Project Management Committee has packed a
number of valuable enhancements into the release. Let me summarize a few of
them:

** Since its introduction in version 0.10, the Streams API has become
hugely popular among Kafka users, including the likes of Pinterest,
Rabobank, Zalando, and The New York Times. In 1.0, the API continues to
evolve at a healthy pace. To begin with, the builder API has been improved
(KIP-120). A new API has been added to expose the state of active tasks at
runtime (KIP-130). Debuggability gets easier with enhancements to the
print() and writeAsText() methods (KIP-160). And if that’s not enough,
check out KIP-138 and KIP-161 too. For more on streams, check out the
Apache Kafka Streams documentation (https://kafka.apache.org/documentation/streams/),
including some helpful new tutorial videos.

** Operating Kafka at scale requires that the system remain observable, and
to make that easier, we’ve made a number of improvements to metrics. These
are too many to summarize without becoming tedious, but Connect metrics
have been significantly improved (KIP-196), a litany of new health check
metrics are now exposed (KIP-188), and we now have a global topic and
partition count (KIP-168). Check out KIP-164 and KIP-187 for even more.

** We now support Java 9, leading, among other things, to significantly
faster TLS and CRC32C implementations. Over-the-wire encryption will be
faster now, which will keep Kafka fast and compute costs low when
encryption is enabled.

** In keeping with the security theme, KIP-152 cleans up the error handling
on Simple Authentication Security Layer (SASL) authentication attempts.
Previously, some authentication error conditions were indistinguishable
from broker failures and were not logged in a clear way. This is cleaner
now.

** Kafka can now tolerate disk failures better. Historically, JBOD storage
configurations have not been recommended, but the architecture has
nevertheless been tempting: after all, why not rely on Kafka’s own
replication mechanism to protect against storage failure rather than using
RAID? With KIP-112, Kafka now handles disk failure more gracefully. A
single disk failure in a JBOD broker will not bring the entire broker down;
rather, the broker will continue serving any log files that remain on
functioning disks.

** Since release 0.11.0, the idempotent producer (which is the producer
used in the presence of a transaction, which of course is the producer we
use for exactly-once processing) required max.in.flight.requests.per.connection
to be equal to one. As anyone who has written or tested a wire protocol can
attest, this put an upper bound on throughput. Thanks to KAFKA-5949, this
can now be as large as five, relaxing the throughput constraint quite a bit.


All of the changes in this release can be found in the release notes:

https://dist.apache.org/repos/dist/release/kafka/1.0.0/RELEASE_NOTES.html


You can download the source release from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka-1.0.0-src.tgz

and binary releases from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
(Scala
2.11)
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.12-1.0.0.tgz
(Scala
2.12)



---

Apache Kafka is a distributed streaming platform with four core APIs:

** The Producer API allows an application to publish a stream of records to one
or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more topics
and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming
an input stream from one or more topics and producing an output stream to
one or more output topics, effectively transforming the input streams to
output streams.

** The Connector API allows building and running reusable producers or
consumers
that connect Kafka topics to existing applications or data systems. For
example, a connector to a relational database might capture every change to
a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data between
systems or applications.

** Building real-time streaming applications that transform or react
to the streams
of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, 

Re: [ANNOUNCE] Apache Kafka 1.0.0 Released

2017-11-01 Thread James Cheng
Good job, everyone!

And thanks Guozhang for running the release!

-James

> On Nov 1, 2017, at 7:27 AM, Guozhang Wang  wrote:
> 
> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 1.0.0.
> 
> This is a major release of the Kafka project, and is no mere bump of the
> version number. The Apache Kafka Project Management Committee has packed a
> number of valuable enhancements into the release. Let me summarize a few of
> them:
> 
> ** Since its introduction in version 0.10, the Streams API has become
> hugely popular among Kafka users, including the likes of Pinterest,
> Rabobank, Zalando, and The New York Times. In 1.0, the API continues to
> evolve at a healthy pace. To begin with, the builder API has been improved
> (KIP-120). A new API has been added to expose the state of active tasks at
> runtime (KIP-130). Debuggability gets easier with enhancements to the
> print() and writeAsText() methods (KIP-160). And if that’s not enough,
> check out KIP-138 and KIP-161 too. For more on streams, check out the
> Apache Kafka Streams documentation (https://kafka.apache.org/documentation/streams/),
> including some helpful new tutorial videos.
> 
> ** Operating Kafka at scale requires that the system remain observable, and
> to make that easier, we’ve made a number of improvements to metrics. These
> are too many to summarize without becoming tedious, but Connect metrics
> have been significantly improved (KIP-196), a litany of new health check
> metrics are now exposed (KIP-188), and we now have a global topic and
> partition count (KIP-168). Check out KIP-164 and KIP-187 for even more.
> 
> ** We now support Java 9, leading, among other things, to significantly
> faster TLS and CRC32C implementations. Over-the-wire encryption will be
> faster now, which will keep Kafka fast and compute costs low when
> encryption is enabled.
> 
> ** In keeping with the security theme, KIP-152 cleans up the error handling
> on Simple Authentication Security Layer (SASL) authentication attempts.
> Previously, some authentication error conditions were indistinguishable
> from broker failures and were not logged in a clear way. This is cleaner
> now.
> 
> ** Kafka can now tolerate disk failures better. Historically, JBOD storage
> configurations have not been recommended, but the architecture has
> nevertheless been tempting: after all, why not rely on Kafka’s own
> replication mechanism to protect against storage failure rather than using
> RAID? With KIP-112, Kafka now handles disk failure more gracefully. A
> single disk failure in a JBOD broker will not bring the entire broker down;
> rather, the broker will continue serving any log files that remain on
> functioning disks.
> 
> ** Since release 0.11.0, the idempotent producer (which is the producer
> used in the presence of a transaction, which of course is the producer we
> use for exactly-once processing) required 
> max.in.flight.requests.per.connection
> to be equal to one. As anyone who has written or tested a wire protocol can
> attest, this put an upper bound on throughput. Thanks to KAFKA-5949, this
> can now be as large as five, relaxing the throughput constraint quite a bit.
> 
> 
> All of the changes in this release can be found in the release notes:
> 
> https://dist.apache.org/repos/dist/release/kafka/1.0.0/RELEASE_NOTES.html
> 
> 
> You can download the source release from:
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka-1.0.0-src.tgz
> 
> and binary releases from:
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
> (Scala
> 2.11)
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.12-1.0.0.tgz
> (Scala
> 2.12)
> 
> 
> 
> ---
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> ** The Producer API allows an application to publish a stream of records to one
> or more Kafka topics.
> 
> ** The Consumer API allows an application to subscribe to one or more topics
> and process the stream of records produced to them.
> 
> ** The Streams API allows an application to act as a stream processor,
> consuming
> an input stream from one or more topics and producing an output stream to
> one or more output topics, effectively transforming the input streams to
> output streams.
> 
> ** The Connector API allows building and running reusable producers or
> consumers
> that connect Kafka topics to existing applications or data systems. For
> example, a connector to a relational database might capture every change to
> a table.
> 
> 
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data between
> systems or applications.
> 
> ** Building real-time streaming applications that transform or react
> to the streams
> of data.

Authentication using ssl in kafka

2017-11-01 Thread mascarenhas, jewel
Hi,
I am trying to configure apache kafka version 0.11.0.1 with ssl, but am faing 
the following error while trying to produce:


Could you please suggest as to what can I do such that I can have client 
authentication taking place via ssl, my kafka configrations are given below:

Server.properties file


Producer.properties


Consumer.properties




Awaiting your reply.

Thanks & Regards,
Jewel Mascarenhas





Re: Building news feed of social app using kafka

2017-11-01 Thread Svante Karlsson
Nope, that's the wrong design. It does not scale: you would end up with
something wide and shallow, with too few messages per partition to make sense.
You want many thousands of messages per partition per second to amortize the
consumer-to-broker round trip.
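The usual alternative to a topic per user is one topic keyed by user id, so a million users share a small, fixed set of partitions while each user's feed events stay ordered within their partition. A minimal sketch of the idea (plain `hashCode` as a stand-in for Kafka's actual murmur2 key hashing; class and method names are made up for illustration):

```java
// FeedPartitioning.java — a sketch, not Kafka's real partitioner, showing
// how keying feed events by user id maps many users onto few partitions.
public class FeedPartitioning {

    // Map a user id to a partition. Masking the sign bit keeps the
    // result non-negative even for Integer.MIN_VALUE hash codes.
    public static int partitionFor(String userId, int numPartitions) {
        return (userId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        int[] counts = new int[numPartitions];
        // One million users still land on only 10 partitions, so each
        // partition carries enough traffic to amortize fetch round trips.
        for (int u = 0; u < 1_000_000; u++) {
            counts[partitionFor("user-" + u, numPartitions)]++;
        }
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + counts[p] + " users");
        }
    }
}
```

Producing with the user id as the record key gives the same per-user ordering guarantee a dedicated topic would, without the per-topic overhead.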


On Nov 1, 2017 21:12, "Anshuman Ghosh"  wrote:

> Hello!
>
> I am currently designing a social app (with the whole gamut of users
> following each other and personal feeds - consisting of posts by those you
> follow). To implement this "news feed" for each user, I was considering
> having a Kafka stream/topic per user.
>
> Given that our intention is to get 1M+ users on the app, is this a good
> idea to have 1 topic per user, thus ending up with a million topics?
>
> Thanks and regards,
> Anshuman
>


Re: Help with Mirror Maker config to produce to all partitions in target cluster

2017-11-01 Thread Chris Neal
Apologies for bumping my own post, but really hoping someone has experience
with MirrorMaker who might be able to help with my question below. :)

Thanks!
Chris

On Fri, Oct 27, 2017 at 10:59 AM, Chris Neal  wrote:

> Hi all,
>
> I've been struggling with this for awhile and need some help please. :)
>
> I have a source cluster running Kafka 0.8.1.1 with about 900 topics.
> I have a target cluster running Kafka 0.10.2 with those same topics
> pre-created with the same number of partitions per topic as the source
> cluster.
>
> I'm running Mirror Maker from the 0.8.1.1 Kafka version, and cannot seem
> to get the produced records to go anywhere besides partition 0 of all these
> topics. :(
>
> I found this page to be helpful WRT performance tuning, but it did not
> solve my partition producer problem:
>
> https://community.hortonworks.com/articles/79891/kafka-mirror-maker-best-practices.html
>
> My consumer.config:
> ==
> zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
> group.id=prod_perf_mirrormaker2_new1
> num.consumer.fetchers=1100
> client.id=MirrorMakerConsumer_perf
> ==
>
> My producer config:
> ==
> metadata.broker.list=kafka1:9092,kafka2:9092,kafka3:9092
> compression.codec=gzip
> producer.type=async
> batch.num.messages=1000
> message.send.max.retries=2
> client.id=MirrorMakerProducer_perf
> ==
>
> My start script:
> ==
> #!/bin/sh
>
> export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
>
> /home/chris.neal/kafka_2.9.2-0.8.1.1/bin/kafka-run-class.sh
> kafka.tools.MirrorMaker \
> --producer.config /home/chris.neal/mirror_maker/prod_perf/producer.properties
> \
> --consumer.config /home/chris.neal/mirror_maker/prod_perf/consumer.properties
> \
> --num.streams=10 \
> --whitelist ''
> ==
>
> I do know that there is some combination of "num.streams" and
> "num.consumer.fetchers" that should get it working, but I can't figure out
> what that combination is.
>
> All topics have the same setup of 10 partitions per topic.
>
> Can anyone shed some light on how to make this go across all 10
> partitions, and not just partition 0?
>
> Thank you SO much for your time and help.
> Chris
>


Building news feed of social app using kafka

2017-11-01 Thread Anshuman Ghosh
Hello!

I am currently designing a social app (with the whole gamut of users
following each other and personal feeds - consisting of posts by those you
follow). To implement this "news feed" for each user, I was considering
having a Kafka stream/topic per user.

Given that our intention is to get 1M+ users on the app, is this a good
idea to have 1 topic per user, thus ending up with a million topics?

Thanks and regards,
Anshuman


Re: Client Offset Deleted by Broker without cause

2017-11-01 Thread Elmar Weber

Right, they are:
http://cupenya.com/files/tmp/hgd23121/consumer-logs.png
http://cupenya.com/files/tmp/hgd23121/kafka-broker-logs.png

(it's a bit more than in the pasted mail)

On 11/01/2017 5:35 PM, Ted Yu wrote:

bq. attached screenshots from the log viewer

The screenshots didn't go through. Consider using 3rd party site.

On Wed, Nov 1, 2017 at 9:18 AM, Elmar Weber  wrote:


Hello,

I had this morning the issue that a client offset got deleted from a
broker as it seems.

(Kafka 0.11.0.1 with patch for KAFKA-6030 on top)

It happened in a test environment and the pipeline stage got re-deployed
multiple times. During one restart, when the consumer reconnected, it
didn't get any offset and started from the beginning as per its
configuration. The queue didn't receive any new events in the last 48h, so
any restart should not do anything.

Here the relevant consumer logs:


2017-10-31 17:15:08.453 Successfully joined group
events-inst-agg-stream.aggregation.v1 with generation 5
2017-11-01 14:29:46.554 Successfully joined group
events-inst-agg-stream.aggregation.v1 with generation 7
2017-11-01 14:51:14.639 Successfully joined group
events-inst-agg-stream.aggregation.v1 with generation 9
2017-11-01 14:51:19.068 Committing Map(GroupTopicPartition(events
-inst-agg-stream.aggregation.v1,events.lg,0) -> 3830)
2017-11-01 14:51:24.083 Committing Map(GroupTopicPartition(events
-inst-agg-stream.aggregation.v1,events.lg,0) -> 11339)

You can see the restarts at 17:15 yesterday and 14:29 today were normal.
Then the restart at 14:51 started reading from the beginning (we log
committed offsets).

The relevant leading broker did some "stuff" that was different between
14:29 and 14:51. The full logs are below.
 From what I can see it deleted a segment from the consumer offsets log,
and the next time the consumer connected it got no offset.

I can provide the logs of the other kafka nodes if it is useful. I also
attached screenshots from the log viewer in case it's easier to read.

I found https://issues.apache.org/jira/browse/KAFKA-5600 which looked
related, but it's fixed in 0.11.0.1.

Any other ideas what the issue could be?


[2017-11-01 14:09:46,805] INFO [Group Metadata Manager on Broker 1]:
Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.Group
MetadataManager)
[2017-11-01 14:19:46,805] INFO [Group Metadata Manager on Broker 1]:
Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.Group
MetadataManager)
[2017-11-01 14:24:23,627] INFO [GroupCoordinator 1]: Preparing to
rebalance group events-inst-agg-stream.aggregation.v1 with old generation
5 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:24:23,627] INFO [GroupCoordinator 1]: Group
events-inst-agg-stream.aggregation.v1 with generation 6 is now empty
(__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,540] INFO [GroupCoordinator 1]: Preparing to
rebalance group events-inst-agg-stream.aggregation.v1 with old generation
6 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,546] INFO [GroupCoordinator 1]: Stabilized group
events-inst-agg-stream.aggregation.v1 generation 7 (__consumer_offsets-8)
(kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,551] INFO [GroupCoordinator 1]: Assignment received
from leader for group events-inst-agg-stream.aggregation.v1 for
generation 7 (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,832] INFO Rolled new log segment for
'__consumer_offsets-5' in 26 ms. (kafka.log.Log)
[2017-11-01 14:29:46,833] INFO [Group Metadata Manager on Broker 1]:
Removed 3 expired offsets in 29 milliseconds. (kafka.coordinator.group.Group
MetadataManager)
[2017-11-01 14:30:23,642] INFO [GroupCoordinator 1]: Preparing to
rebalance group key-stats-kafka-stream.stats.v1 with old generation 11
(__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:30:23,644] INFO [GroupCoordinator 1]: Group
key-stats-kafka-stream.stats.v1 with generation 12 is now empty
(__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:30:54,731] INFO Deleting segment 0 from log
__consumer_offsets-5. (kafka.log.Log)
[2017-11-01 14:30:54,767] INFO Deleting index
/var/lib/kafka/data/topics/__consumer_offsets-5/.index.deleted
(kafka.log.OffsetIndex)
[2017-11-01 14:30:54,767] INFO Deleting index
/var/lib/kafka/data/topics/__consumer_offsets-5/.timeindex.deleted
(kafka.log.TimeIndex)
[2017-11-01 14:36:15,590] INFO [GroupCoordinator 1]: Preparing to
rebalance group key-stats-kafka-stream.stats.v1 with old generation 12
(__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:36:15,594] INFO [GroupCoordinator 1]: Stabilized group
key-stats-kafka-stream.stats.v1 generation 13 (__consumer_offsets-5)
(kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:36:15,612] INFO [GroupCoordinator 1]: 
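A possible culprit suggested by the "Removed 3 expired offsets" line in the broker log above (a guess, not a confirmed diagnosis): the broker expires committed offsets for groups with no active members after offsets.retention.minutes, which defaults to 1440 (24 hours) in 0.11.x, and this topic had been idle for 48h across re-deploys. A hedged broker config sketch:

```properties
# Hypothetical server.properties change: keep committed offsets longer than
# the 0.11.x default of 1440 minutes (24 hours) so a group that is briefly
# empty during a re-deploy does not lose its position. The 7-day value is an
# example, not a recommendation.
offsets.retention.minutes=10080
```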

How do I instantiate a metrics reporter in Kafka Streams, with custom config?

2017-11-01 Thread James Cheng
Hi, we have a KafkaStreams app. We specify a custom metric reporter by doing:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"kafka-broker1:9092");
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"com.mycompany.MetricReporter");
config.put("custom-key-for-metric-reporter", "value");

Previously, our metric reporter would get passed the properties object upon 
instantiation, and would retrieve its custom config. It appears that in recent 
releases, that in order to apply my metric reporter to the consumer, that I 
have to specify my config as "consumer.custom-key-for-metric-reporter". And for 
the producer, I have to specify it as 
"producer.custom-key-for-metric-reporter". If I don't prefix it with 
"consumer." or "producer.", it appears it gets stripped out from the 
properties object that is passed to my metric reporter when the 
consumer/producer gets initialized, and so my metric reporter can't get its 
config.

That means that if I have a metrics reporter and I want to collect producer and 
consumer metrics, as well as kafka-streams metrics, that I have to specify my 
custom config 3 times:
1) consumer.custom-key-for-metric-reporter
2) producer.custom-key-for-metric-reporter
3) custom-key-for-metric-reporter

Is that behavior as designed or is that a bug? What is the desired behavior for 
non-recognized keys in the properties object?

And actually, for the metrics.reporter class itself, am I supposed to specify 
it as:

metrics.reporter

or

metric.reporters
producer.metric.reporters
consumer.metric.reporters

Thanks,
-James



Re: Client Offset Deleted by Broker without cause

2017-11-01 Thread Ted Yu
bq. attached screenshots from the log viewer

The screenshots didn't go through. Consider using 3rd party site.

On Wed, Nov 1, 2017 at 9:18 AM, Elmar Weber  wrote:

> Hello,
>
> I had this morning the issue that a client offset got deleted from a
> broker as it seems.
>
> (Kafka 0.11.0.1 with patch for KAFKA-6030 on top)
>
> It happened in a test environment and the pipeline stage got re-deployed
> multiple times. During one restart, when the consumer reconnected, it
> didn't get any offset and started from the beginning as per its
> configuration. The queue didn't receive any new events in the last 48h, so
> any restart should not do anything.
>
> Here the relevant consumer logs:
>
>
> 2017-10-31 17:15:08.453 Successfully joined group
> events-inst-agg-stream.aggregation.v1 with generation 5
> 2017-11-01 14:29:46.554 Successfully joined group
> events-inst-agg-stream.aggregation.v1 with generation 7
> 2017-11-01 14:51:14.639 Successfully joined group
> events-inst-agg-stream.aggregation.v1 with generation 9
> 2017-11-01 14:51:19.068 Committing Map(GroupTopicPartition(events
> -inst-agg-stream.aggregation.v1,events.lg,0) -> 3830)
> 2017-11-01 14:51:24.083 Committing Map(GroupTopicPartition(events
> -inst-agg-stream.aggregation.v1,events.lg,0) -> 11339)
>
> You can see the restarts at 17:15 yesterday and 14:29 today were normal.
> Then the restart at 14:51 started reading from the beginning (we log
> committed offsets).
>
> The relevant leading broker did some "stuff" that was different between
> 14:29 and 14:51. The full logs are below.
> From what I can see it deleted a segment from the consumer offsets log,
> and the next time the consumer connected it got no offset.
>
> I can provide the logs of the other kafka nodes if it is useful. I also
> attached screenshots from the log viewer in case it's easier to read.
>
> I found https://issues.apache.org/jira/browse/KAFKA-5600 which looked
> related, but it's fixed in 0.11.0.1.
>
> Any other ideas what the issue could be?
>
>
> [2017-11-01 14:09:46,805] INFO [Group Metadata Manager on Broker 1]:

Client Offset Deleted by Broker without cause

2017-11-01 Thread Elmar Weber

Hello,

This morning I had an issue where a client offset was, as far as I can
tell, deleted by a broker.


(Kafka 0.11.0.1 with the patch for KAFKA-6030 on top)

It happened in a test environment and the pipeline stage got re-deployed
multiple times. During one restart, when the consumer reconnected, it
didn't get any offset and started from the beginning, as per its
configuration. The topic didn't receive any new events in the last 48
hours, so a restart should not have changed anything.


Here are the relevant consumer logs:


2017-10-31 17:15:08.453 Successfully joined group 
events-inst-agg-stream.aggregation.v1 with generation 5
2017-11-01 14:29:46.554 Successfully joined group 
events-inst-agg-stream.aggregation.v1 with generation 7
2017-11-01 14:51:14.639 Successfully joined group 
events-inst-agg-stream.aggregation.v1 with generation 9
2017-11-01 14:51:19.068 Committing 
Map(GroupTopicPartition(events-inst-agg-stream.aggregation.v1,events.lg,0) 
-> 3830)
2017-11-01 14:51:24.083 Committing 
Map(GroupTopicPartition(events-inst-agg-stream.aggregation.v1,events.lg,0) 
-> 11339)


You can see that the restarts at 17:15 yesterday and 14:29 today were
normal, while the restart at 14:51 started reading from the beginning (we
log the committed offsets).


The relevant leader broker did something different between 14:29 and
14:51; the full logs are below. From what I can see, it deleted a segment
from the consumer offsets log, and the next time the consumer connected it
got no offset.


I can provide the logs of the other Kafka nodes if that is useful. I also
attached screenshots from the log viewer in case they are easier to read.


I found https://issues.apache.org/jira/browse/KAFKA-5600, which looked
related, but it is fixed in 0.11.0.1.


Any other ideas what the issue could be?
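One mechanism consistent with these logs is broker-side offset retention: the broker logged "Removed 3 expired offsets" at 14:29:46, and in 0.11 a committed offset is removed once it is older than offsets.retention.minutes (default 1440, i.e. 24 hours). A minimal sketch of that rule follows; the retention value is assumed to be the default, not read from this cluster's config:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the 0.11 broker-side offset expiry rule: a committed offset is
// dropped once "now - commitTimestamp" exceeds the retention window.
// The default offsets.retention.minutes=1440 (24 h) is assumed here.
public class OffsetExpiryDemo {
    static final long RETENTION_MS = TimeUnit.MINUTES.toMillis(1440);

    public static boolean isExpired(long commitTimestampMs, long nowMs) {
        return nowMs - commitTimestampMs > RETENTION_MS;
    }

    public static void main(String[] args) {
        long commit = 0L; // commit at t=0
        System.out.println(isExpired(commit, TimeUnit.HOURS.toMillis(20))); // false: inside 24 h
        System.out.println(isExpired(commit, TimeUnit.HOURS.toMillis(48))); // true: past 24 h
    }
}
```

On a topic that has been idle for 48 hours, the last commit can age past this window while the group sits empty between deployments, so the next restart finds no stored offset and falls back to auto.offset.reset.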


[2017-11-01 14:09:46,805] INFO [Group Metadata Manager on Broker 1]: 
Removed 0 expired offsets in 1 milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:19:46,805] INFO [Group Metadata Manager on Broker 1]: 
Removed 0 expired offsets in 1 milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:24:23,627] INFO [GroupCoordinator 1]: Preparing to 
rebalance group events-inst-agg-stream.aggregation.v1 with old 
generation 5 (__consumer_offsets-8) 
(kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:24:23,627] INFO [GroupCoordinator 1]: Group 
events-inst-agg-stream.aggregation.v1 with generation 6 is now empty 
(__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,540] INFO [GroupCoordinator 1]: Preparing to 
rebalance group events-inst-agg-stream.aggregation.v1 with old 
generation 6 (__consumer_offsets-8) 
(kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,546] INFO [GroupCoordinator 1]: Stabilized group 
events-inst-agg-stream.aggregation.v1 generation 7 
(__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,551] INFO [GroupCoordinator 1]: Assignment received 
from leader for group events-inst-agg-stream.aggregation.v1 for 
generation 7 (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:29:46,832] INFO Rolled new log segment for 
'__consumer_offsets-5' in 26 ms. (kafka.log.Log)
[2017-11-01 14:29:46,833] INFO [Group Metadata Manager on Broker 1]: 
Removed 3 expired offsets in 29 milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:30:23,642] INFO [GroupCoordinator 1]: Preparing to 
rebalance group key-stats-kafka-stream.stats.v1 with old generation 11 
(__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:30:23,644] INFO [GroupCoordinator 1]: Group 
key-stats-kafka-stream.stats.v1 with generation 12 is now empty 
(__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:30:54,731] INFO Deleting segment 0 from log 
__consumer_offsets-5. (kafka.log.Log)
[2017-11-01 14:30:54,767] INFO Deleting index 
/var/lib/kafka/data/topics/__consumer_offsets-5/.index.deleted 
(kafka.log.OffsetIndex)
[2017-11-01 14:30:54,767] INFO Deleting index 
/var/lib/kafka/data/topics/__consumer_offsets-5/.timeindex.deleted 
(kafka.log.TimeIndex)
[2017-11-01 14:36:15,590] INFO [GroupCoordinator 1]: Preparing to 
rebalance group key-stats-kafka-stream.stats.v1 with old generation 12 
(__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:36:15,594] INFO [GroupCoordinator 1]: Stabilized group 
key-stats-kafka-stream.stats.v1 generation 13 (__consumer_offsets-5) 
(kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:36:15,612] INFO [GroupCoordinator 1]: Assignment received 
from leader for group key-stats-kafka-stream.stats.v1 for generation 13 
(kafka.coordinator.group.GroupCoordinator)
[2017-11-01 14:39:46,805] INFO [Group Metadata Manager on Broker 1]: 
Removed 0 expired offsets in 1 milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)
[2017-11-01 14:49:46,804] INFO [Group Metadata Manager on 

The internals of commitSync failure

2017-11-01 Thread Kristopher Kane
Is it possible that when a consumer's commitSync throws an exception, the
commit internally succeeded for one or more partitions but failed for
others, or is it all or none of the partitions?

Thanks,

Kris
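For context: the broker's OffsetCommitResponse carries an error code per partition, but the Java consumer's commitSync collapses any per-partition failure into a single thrown exception, so the caller cannot tell from the exception alone whether some partitions committed. A self-contained simulation of that shape (class and method names are illustrative, not the real client internals):

```java
import java.util.HashMap;
import java.util.Map;

// Simulates the shape of an offset commit: the response holds a per-partition
// outcome, but a commitSync-style wrapper collapses it into one exception,
// making partial success invisible to the caller. Names are illustrative only.
public class CommitSyncShape {
    /** Per-partition outcomes; true = committed. One partition is forced to fail. */
    public static Map<String, Boolean> commitAll(Map<String, Long> offsets,
                                                 String failingPartition) {
        Map<String, Boolean> results = new HashMap<>();
        for (String tp : offsets.keySet()) {
            results.put(tp, !tp.equals(failingPartition));
        }
        return results;
    }

    /** commitSync-style wrapper: any per-partition failure becomes one exception. */
    public static void commitSync(Map<String, Long> offsets, String failingPartition) {
        Map<String, Boolean> results = commitAll(offsets, failingPartition);
        if (results.containsValue(false)) {
            // Hides which partitions actually succeeded.
            throw new RuntimeException("Commit failed");
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        offsets.put("topic-1", 7L);
        // topic-0 committed, topic-1 did not -- but commitSync would only throw.
        System.out.println(commitAll(offsets, "topic-1"));
    }
}
```

If per-partition certainty matters, one option is to commit each partition in its own commitSync call, at some throughput cost; otherwise a failed commit is best treated as "in doubt" and the processing made idempotent.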


[ANNOUNCE] Apache Kafka 1.0.0 Released

2017-11-01 Thread Guozhang Wang
The Apache Kafka community is pleased to announce the release for Apache
Kafka 1.0.0.

This is a major release of the Kafka project, and is no mere bump of the
version number. The Apache Kafka Project Management Committee has packed a
number of valuable enhancements into the release. Let me summarize a few of
them:

** Since its introduction in version 0.10, the Streams API has become
hugely popular among Kafka users, including the likes of Pinterest,
Rabobank, Zalando, and The New York Times. In 1.0, the API continues to
evolve at a healthy pace. To begin with, the builder API has been improved
(KIP-120). A new API has been added to expose the state of active tasks at
runtime (KIP-130). Debuggability gets easier with enhancements to the
print() and writeAsText() methods (KIP-160). And if that’s not enough,
check out KIP-138 and KIP-161 too. For more on streams, check out the
Apache Kafka Streams documentation
(https://kafka.apache.org/documentation/streams/), including some helpful
new tutorial videos.

** Operating Kafka at scale requires that the system remain observable, and
to make that easier, we’ve made a number of improvements to metrics. These
are too many to summarize without becoming tedious, but Connect metrics
have been significantly improved (KIP-196), a litany of new health check
metrics are now exposed (KIP-188), and we now have a global topic and
partition count (KIP-168). Check out KIP-164 and KIP-187 for even more.

** We now support Java 9, leading, among other things, to significantly
faster TLS and CRC32C implementations. Over-the-wire encryption will be
faster now, which will keep Kafka fast and compute costs low when
encryption is enabled.

** In keeping with the security theme, KIP-152 cleans up the error handling
on Simple Authentication Security Layer (SASL) authentication attempts.
Previously, some authentication error conditions were indistinguishable
from broker failures and were not logged in a clear way. This is cleaner
now.

** Kafka can now tolerate disk failures better. Historically, JBOD storage
configurations have not been recommended, but the architecture has
nevertheless been tempting: after all, why not rely on Kafka’s own
replication mechanism to protect against storage failure rather than using
RAID? With KIP-112, Kafka now handles disk failure more gracefully. A
single disk failure in a JBOD broker will not bring the entire broker down;
rather, the broker will continue serving any log files that remain on
functioning disks.

** Since release 0.11.0, the idempotent producer (which is the producer
used in the presence of a transaction, which of course is the producer we
use for exactly-once processing) required max.in.flight.requests.per.connection
to be equal to one. As anyone who has written or tested a wire protocol can
attest, this put an upper bound on throughput. Thanks to KAFKA-5949, this
can now be as large as five, relaxing the throughput constraint quite a bit.
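A producer configuration exercising this under 1.0.0 might look roughly like the following; the values are a hedged sketch, not taken from the release notes:

```properties
# Sketch: exactly-once-capable producer under Kafka 1.0.0.
# enable.idempotence implies acks=all; KAFKA-5949 lifts the old
# requirement that in-flight requests per connection be 1.
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
```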


All of the changes in this release can be found in the release notes:

https://dist.apache.org/repos/dist/release/kafka/1.0.0/RELEASE_NOTES.html


You can download the source release from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka-1.0.0-src.tgz

and binary releases from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
(Scala
2.11)
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.12-1.0.0.tgz
(Scala
2.12)



---

Apache Kafka is a distributed streaming platform with four core APIs:

** The Producer API allows an application to publish a stream of records to one
or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more topics
and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming
an input stream from one or more topics and producing an output stream to
one or more output topics, effectively transforming the input streams to
output streams.

** The Connector API allows building and running reusable producers or
consumers
that connect Kafka topics to existing applications or data systems. For
example, a connector to a relational database might capture every change to
a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data between
systems or applications.

** Building real-time streaming applications that transform or react
to the streams
of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.


A big thank you for the following 108 contributors to this release!

Abhishek Mendhekar, Xi Hu, Andras Beni, Andrey Dyachkov, Andy Chambers,

Re: WordCount Example using GlobalKStore

2017-11-01 Thread Damian Guy
Count will always use a StateStore, but you can use an in-memory store if
you don't want a persistent one. You can do this with the overloaded
count(StateStoreSupplier) method, using Stores.create(name).inMemory()...
to create the in-memory store.
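The trade-off behind that choice can be illustrated with a plain in-memory counting store; this is a generic sketch with made-up names, not the Streams Stores API itself:

```java
import java.util.HashMap;
import java.util.Map;

// Generic sketch of an in-memory count store: the counts live only in this
// process's heap, avoiding local disk I/O at the cost of heap usage -- the
// trade-off against a persistent (e.g. RocksDB-backed) store.
// Not the Kafka Streams API.
public class InMemoryCountStore {
    private final Map<String, Long> counts = new HashMap<>();

    /** Increment the count for a key and return the new count. */
    public long increment(String key) {
        return counts.merge(key, 1L, Long::sum);
    }

    /** Current count for a key, or null if never seen. */
    public Long get(String key) {
        return counts.get(key);
    }

    public static void main(String[] args) {
        InMemoryCountStore store = new InMemoryCountStore();
        for (String word : "a b a".split(" ")) {
            store.increment(word);
        }
        System.out.println(store.get("a")); // 2
    }
}
```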

On Wed, 1 Nov 2017 at 11:22 pravin kumar  wrote:

> I have created 3 input topics with 10 partitions each and an output topic
> with 10 partitions.
>
> I did the word count example and stored the result in a GlobalKTable.
> I initially stored the counted values in a local state store and then
> copied them to the global state store.
>
> I have attached the code here:
> https://gist.github.com/Pk007790/d46236b1b5c394301f27b96891a94584
>
> and I have supplied the inputs to the producers like this:
> https://gist.github.com/Pk007790/ba934b7bcea42b8b05f4816de3cb84a0
>
> My question is: how do I store the processed information in the global
> state store without a local state store?
>


WordCount Example using GlobalKStore

2017-11-01 Thread pravin kumar
I have created 3 input topics with 10 partitions each and an output topic
with 10 partitions.

I did the word count example and stored the result in a GlobalKTable.
I initially stored the counted values in a local state store and then
copied them to the global state store.

I have attached the code here:
https://gist.github.com/Pk007790/d46236b1b5c394301f27b96891a94584

and I have supplied the inputs to the producers like this:
https://gist.github.com/Pk007790/ba934b7bcea42b8b05f4816de3cb84a0

My question is: how do I store the processed information in the global
state store without a local state store?


GlobalKStore

2017-11-01 Thread pravin kumar
I have created 3 topics with 10 partitions each.

I intend to store the processed information in a GlobalKTable.

Currently I do it with an individual KTable written to an output topic and
then read into a GlobalKTable:

#//

KStreamBuilder builder = new KStreamBuilder();
// Read the three input topics (assuming default String serdes), count by
// word, backed by the local state store INPUT_TABLE.
KStream<String, String> input =
        builder.stream(INPUT_TOPICA, INPUT_TOPICB, INPUT_TOPICC);
KStream<String, Long> counts = input
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey()
        .count(INPUT_TABLE)
        .toStream();
// Publish the counts, then read them back as a GlobalKTable.
counts.to(Serdes.String(), Serdes.Long(), OUTPUT_TOPIC);
GlobalKTable<String, Long> globalCounts = builder.globalTable(OUTPUT_TOPIC);
KafkaStreams kafkaStreams = new KafkaStreams(builder, props);

//#


How do I do this without a local state store?


Re: kafka user

2017-11-01 Thread Kamal Chandraprakash
Please follow the instructions listed here https://kafka.apache.org/contact

You have to send a mail to 'users-subscr...@kafka.apache.org' to subscribe
to the users list.

On Tue, Oct 31, 2017 at 1:06 PM, Karthigeyan 
wrote:

> Hi ,
>
> pls add to the user group.
>
> Thanks ,
>
> Karthigeyan
>
>