Re: Kafka producer does not respect added partitions

2017-03-24 Thread Manikumar
The producer refreshes its metadata every metadata.max.age.ms (default
5 min), and it discovers new partitions at that point.
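
If you need new partitions to be picked up sooner, you can lower that
setting on the producer -- a minimal sketch (the 60s value is just an
illustration):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// illustrative value only: refresh cluster metadata every 60s instead of the 5-minute default
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");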

On Sat, Mar 25, 2017 at 2:22 AM, Robert Quinlivan 
wrote:

> Hello,
>
> I have added partitions to a topic. The new partitions appear in the
> consumer assignments and in the topics listing they have the correct number
> of ISRs. However, the producer still does not write to the new partitions.
>
> My producer writes in a round-robin fashion, using the Cluster's reported
> partition count. I have seen no mention of a need to restart or reconfigure
> the producer in order to pick up the added partitions. Is this required?
>
> Thanks
> --
> Robert Quinlivan
> Software Engineer, Signal
>


Kafka producer does not respect added partitions

2017-03-24 Thread Robert Quinlivan
Hello,

I have added partitions to a topic. The new partitions appear in the
consumer assignments and in the topics listing they have the correct number
of ISRs. However, the producer still does not write to the new partitions.

My producer writes in a round-robin fashion, using the Cluster's reported
partition count. I have seen no mention of a need to restart or reconfigure
the producer in order to pick up the added partitions. Is this required?
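
For context, a round-robin partitioner of this kind typically looks like the
simplified sketch below (class name is illustrative, not my actual code).
Note that the Cluster passed in only reflects the producer's cached metadata,
which is refreshed per metadata.max.age.ms:

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class RoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // cycles over whatever partitions the producer's cached metadata reports
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return Utils.toPositive(counter.getAndIncrement()) % partitions.size();
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}

Such a class would be registered via partitioner.class in the producer config.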

Thanks
-- 
Robert Quinlivan
Software Engineer, Signal


Re: YASSQ (yet another state store question)

2017-03-24 Thread Eno Thereska
Hi Jon,

This is expected, see this:
https://groups.google.com/forum/?pli=1#!searchin/confluent-platform/migrated$20to$20another$20instance%7Csort:relevance/confluent-platform/LglWC_dZDKw/qsPuCRT_DQAJ
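
The usual workaround (discussed in that thread) is to retry the store() call
until the store becomes queryable again -- a minimal sketch, assuming your
KafkaStreams instance is called kafkaStreams and using placeholder value
types:

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

ReadOnlyKeyValueStore<String, String> store = null;
while (store == null) {
    try {
        store = kafkaStreams.store("AggStore", QueryableStoreTypes.<String, String>keyValueStore());
    } catch (InvalidStateStoreException e) {
        // store not ready yet, e.g. a rebalance/restore is still in progress
        Thread.sleep(100);   // assumes the enclosing method declares throws InterruptedException
    }
}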
 

Thanks
Eno
> On 24 Mar 2017, at 20:51, Jon Yeargers  wrote:
> 
> I've setup a KTable as follows:
> 
> KTable outTable = sourceStream.groupByKey().
> reduce(rowReducer,
>TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 *
> 1000).until(10 * 60 * 1000L),
>"AggStore");
> 
> I can confirm its presence via 'streams.allMetadata()' (accessible through
> a simple httpserver).
> 
> When I call 'ReadOnlyKeyValueStore store =
> kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'
> 
> I get this exception:
> 
> org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> store, AggStore, may have migrated to another instance.
>at
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:49)
>at
> org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
>at
> com.cedexis.videokafka.videohouraggregator.RequestHandler.handle(RequestHandler.java:97)
>at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>at
> sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>at
> sun.net.httpserver.ServerImpl$DefaultExecutor.execute(ServerImpl.java:158)
>at
> sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
>at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java:396)
>at java.lang.Thread.run(Thread.java:745)
> 
> 
> ... except.. there is only one instance.. running locally.



YASSQ (yet another state store question)

2017-03-24 Thread Jon Yeargers
I've setup a KTable as follows:

KTable outTable = sourceStream.groupByKey().
reduce(rowReducer,
TimeWindows.of(5 * 60 * 1000L).advanceBy(1 * 60 *
1000).until(10 * 60 * 1000L),
"AggStore");

I can confirm its presence via 'streams.allMetadata()' (accessible through
a simple httpserver).

When I call 'ReadOnlyKeyValueStore store =
kafkaStreams.store("AggStore", QueryableStoreTypes.keyValueStore());'

I get this exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state
store, AggStore, may have migrated to another instance.
at
org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:49)
at
org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:378)
at
com.cedexis.videokafka.videohouraggregator.RequestHandler.handle(RequestHandler.java:97)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
at
sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
at
sun.net.httpserver.ServerImpl$DefaultExecutor.execute(ServerImpl.java:158)
at
sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:431)
at sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java:396)
at java.lang.Thread.run(Thread.java:745)


... except.. there is only one instance.. running locally.


Re: Should MirrorMaker produce round robin with this configuration?

2017-03-24 Thread Chris Neal
lol...well, I take it all back.  Now I can't get it to work at all :(

Here's what I have:

*consumer.properties*
zookeeper.connect=[server_list]
# I was changing the group.id each time in case that was causing some issues
group.id=MirrorMakerTest7
client.id=MirrorMakerConsumer

*producer.properties*
metadata.broker.list=[server_list]
compression.codec=gzip
producer.type=async
batch.num.messages=100
message.send.max.retries=2
client.id=MirrorMakerProducer

*script:*
#!/bin/sh

export KAFKA_HEAP_OPTS="-Xmx32M -Xms32M"

/home/chris.neal/kafka_2.10-0.8.1.1/bin/kafka-run-class.sh
kafka.tools.MirrorMaker \
--producer.config /home/chris.neal/mirror_maker/test3/producer.properties \
--consumer.config /home/chris.neal/mirror_maker/test3/consumer.properties \
--num.streams=2 \
--num.producers=2 \
--whitelist 'mytopic'

The topic definition is as follows:

*Source cluster:*
Topic: mytopic PartitionCount:2 ReplicationFactor:2 Configs:
Topic: mytopic Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: mytopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1

*Target cluster:*
Topic: mytopic PartitionCount:2 ReplicationFactor:2 Configs:
Topic: mytopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: mytopic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3

The target cluster has one more broker (3 servers in cluster vs. 2), but that's
the only difference.  I've spent seriously all day trying various configs,
and I can't get this simple thing working...

Sorry to keep bugging you for something that should be quite simple.  :(
Chris



On Fri, Mar 24, 2017 at 12:37 PM, Chris Neal  wrote:

> Again, thank you for the feedback.  That link was very helpful!
>
> I adjusted my consumer/producer configs to be:
>
> consumer:
> zookeeper.connect=[server_list_here]
> group.id=MirrorMaker
> exclude.internal.topics=true
> client.id=MirrorMakerConsumer
>
> producer:
> metadata.broker.list=[server_list_here]
> compression.codec=gzip
> producer.type=async
> batch.size=100
> message.send.max.retries=2
> client.id=MirrorMakerProducer
>
> script file:
> /home/chris.neal/kafka_2.10-0.8.1.1/bin/kafka-run-class.sh
> kafka.tools.MirrorMaker \
> --producer.config /home/chris.neal/mirror_maker/producer.properties \
> --consumer.config /home/chris.neal/mirror_maker/consumer.properties \
> --num.streams=18 \
> --num.producers=18 \
> --whitelist '[topic_list_here]'
>
> The num.streams matches the *total* of all partitions for the topics I am
> consuming.  The topics on both the source/target cluster have the same
> partitions as well.  I did have to adjust one to make this true.
>
> But...The messages still all go to partition 1 for all the topics. :(
>
> The only time I have been able to get it to round-robin is when I pick
> just one topic.  That really makes me think it is still a configuration
> issue that is causing my problems (probably num.streams).
>
> Thanks again for working through this with me.  VERY much appreciate your
> help.
> Chris
>
>
> On Fri, Mar 24, 2017 at 10:48 AM, Manikumar 
> wrote:
>
>> producer distributes the non-keyed messages to available target partitions
>> in a
>> round-robin fashion.
>>
>> You don't need to set num.consumer.fetchers, partition.assignment.strategy
>> props.
>> Use the --num.streams option to specify the number of consumer threads to
>> create.
>>
>> https://community.hortonworks.com/articles/79891/kafka-mirro
>> r-maker-best-practices.html
>>
>> On Fri, Mar 24, 2017 at 8:49 PM, Chris Neal  wrote:
>>
>> > Thanks very much for the reply Manikumar!
>> >
>> > I found that there were a few topics on the source cluster that had more
>> > than two partitions, but all topics on the target cluster had 2
>> > partitions.  I did a test between one topic that had 2 on each, and I
>> did
>> > get messages to both partitions as expected.
>> >
>> > Is there a way to get this same behavior if the number of partitions on
>> the
>> > source and target topic are different?
>> >
>> > What should I set the num.consumer.fetchers property to?  I believe I
>> > remember reading that it needed to be at least equal to the number of
>> > partitions on the topic.
>> >
>> > For example, if I'm reading from:
>> >
>> > topicA:  2 partitions
>> > topicB:  3 partitions
>> > topicC:  1 partition
>> >
>> > should i set  num.consumer.fetchers to 3, or 6? :)
>> >
>> > Again, thank you!!
>> >
>> >
>> > On Fri, Mar 24, 2017 at 12:57 AM, Manikumar 
>> > wrote:
>> >
>> > > Are you sure target cluster topics have more than one partition?
>> > > If you are sending keyed messages, they may be going to the same
>> > partition.
>> > >
>> > > On Thu, Mar 23, 2017 at 11:15 PM, Chris Neal 
>> wrote:
>> > >
>> > > > Hi everyone,
>> > > >
>> > > > I am using MirrorMaker to consume from a 0.8.2.2 cluster and produce
>> > to a
>> > > > 0.10.2 cluster.  All the topics have two partitions on both
>> clusters.
>> > My
>> > > > consumer.properties is:
>> 

how to get parallel processing with kafka and akka

2017-03-24 Thread Laxmi Narayan
Hi,

 Kafka has partitions and Akka can do parallel processing. I have a use
case that is a perfect fit for reading data in parallel. But a partition
does not give me any extra information beyond its number. How do I make
sure that data_x always goes to the same partition x next time, so the
mapping stays consistent? And if new data_k arrives, how do I route it to
its own partition k? Is there any way of associating metadata so that next
time I can push a record back to the same partition, and create a new one
if it is not present?

Maybe I could pre-create 10,000 partitions and keep associating data with
them until all my partitions are used up.

Example:
e.g. I have Twitter handles: how do I make sure that all tweets from the
same handle go to the same partition, and that this is not affected when a
new handle gets added?
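
A minimal sketch of the usual approach -- use the handle as the record key so
that the default partitioner hashes it to a stable partition (the topic name,
serializers, handle, and tweet values below are just placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String handle = "@some_handle";   // placeholder
String tweet  = "...";            // placeholder
// the default partitioner hashes the key, so every tweet for the same handle
// lands on the same partition -- no per-handle partition bookkeeping needed
producer.send(new ProducerRecord<>("tweets", handle, tweet));
producer.close();

Note that if partitions are added to the topic later, the hash-based mapping
for existing keys can change.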

2. In Akka I can keep my consumers ready inside actors and keep listening to
partitions in parallel. But for a newly added handle, how do I create one
more child actor to start listening? I do not want to scan the metadata
entirely (it seems like scanning it every minute is the only way to get
this done).


Keep learning keep moving .


Re: Question about kafka-streams task load balancing

2017-03-24 Thread Guozhang Wang
Hi Karthik,

I think in the current trunk we do effectively load balance across
processes (they are named as "clients" in the partition assignor) already.
More specifically:

1. Consumer clients embed a "client UUID" in their subscription so that
the leader can group them into a single client, whose "capacity" is the
number of threads it has.
2. Suppose there are N tasks in total and M is the total capacity (i.e. the
total number of stream threads): then each client with a capacity of m will
likely get (N / M) * m tasks, no matter whether N > M or N < M.
3. So even in the case of N < M -- say, in the above example, N = 9 and
M = 28 -- each client should get 9 / 28 * 14 = 4.5 tasks.


You could try to build your app from Kafka trunk and see if this is the
case in your scenario. Never the less, Matthias point is still valid that
we do not recommend you ever have N < M since it will result in idle
threads.
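
For reference, the per-instance thread count is set via num.stream.threads --
a minimal sketch with illustrative values (9 tasks spread over two instances
needs at most ~5 threads each):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
// 9 tasks over 2 instances -> ~5 threads per instance is enough; 14 would leave many idle
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);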

Guozhang


On Tue, Mar 21, 2017 at 4:56 PM, Matthias J. Sax 
 wrote:

> Hi,
>
> I guess, it's currently not possible to load balance between different
> machines. It might be a nice optimization to add into Streams though.
>
> Right now, you should reduce the number of threads. Load balancing is
> based on threads, and thus, if Streams places tasks on all threads of one
> machine, it will automatically assign the remaining tasks to the threads
> of the second machine.
>
> Btw: If you have only 9 input partitions, you will most likely get 9
> tasks (might be more, depending on your topology structure) and thus
> you cannot utilize more than 9 threads anyway. Thus, running with 28
> threads will most likely result in many idle threads.
>
> See the docs for more details:
>
>  -
> http://docs.confluent.io/current/streams/architecture.
> html#parallelism-model
>  -
> http://docs.confluent.io/current/streams/architecture.html#threading-model
>
>
>
> -Matthias
>
> On 3/21/17 3:40 PM, Prasad, Karthik wrote:
> > Hey,
> >
> > I have a typical scenario of a kafka-streams application in a production
> environment.
> >
> > We have a kafka-cluster with multiple topics. Messages from one topic are
> being consumed by the kafka-streams application. The topic, currently,
> has 9 partitions. We have configured consumer thread count to 14. We are
> running 2 instances of this stream application on 2 different machines,
> thereby consisting of 28 threads across both machines. The group id for the
> consumers are the same. But, what I observe is that all partitions are
> being assigned to threads on a single machine. Now, I do understand that if
> the task on the active machine fails, then the threads in the other machine
> would take over. My question is that is there a way that kafka-streams can
> auto-balance across instances of the same stream application ? If yes, how
> do I go about doing that ? Please let me know. Thanks,
> >
> > Best,
> > Karthik Prasad
> > Senior Software Engineer
> > Sony Interactive Entertainment
> >
> >
> >
>
>


-- 
Thanks,
Guozhang

*Guozhang Wang | Software Engineer | Confluent | +1 607.339.8352
<607.339.8352> *


Re: APPLICATION_SERVER_CONFIG ?

2017-03-24 Thread Jon Yeargers
You make some great cases for your architecture. To be clear - I've been
proselytizing for kafka since I joined this company last year. I think my
largest issue is rethinking some preexisting notions about streaming to
make them work in the kstream universe.

On Fri, Mar 24, 2017 at 6:07 AM, Michael Noll  wrote:

> > If I understand this correctly: assuming I have a simple aggregator
> > distributed across n-docker instances each instance will _also_ need to
> > support some sort of communications process for allowing access to its
> > statestore (last param from KStream.groupby.aggregate).
>
> Yes.
>
> See
> http://docs.confluent.io/current/streams/developer-
> guide.html#your-application-and-interactive-queries
> .
>
> > - The tombstoning facilities of redis or C* would lend themselves well to
> > implementing a 'true' rolling aggregation
>
> What is a 'true' rolling aggregation, and how could Redis or C* help with
> that in a way that Kafka can't?  (Honest question.)
>
>
> > I get that RocksDB has a small footprint but given the choice of
> > implementing my own RPC / gossip-like process for data sharing and using
> a
> > well tested one (ala C* or redis) I would almost always opt for the
> latter.
> > [...]
> > Just my $0.02. I would love to hear why Im missing the 'big picture'. The
> > kstreams architecture seems rife with potential.
>
> One question is, for example:  can the remote/central DB of your choice
> (Redis, C*) handle as many qps as Kafka/Kafka Streams/RocksDB can handle?
> Over the network?  At the same low latency?  Also, what happens if the
> remote DB is unavailable?  Do you wait and retry?  Discard?  Accept the
> fact that your app's processing latency will now go through the roof?  I
> wrote about some such scenarios at
> https://www.confluent.io/blog/distributed-real-time-joins-
> and-aggregations-on-user-activity-events-using-kafka-streams/
> .
>
> One big advantage of Kafka/Kafka Streams (for many use cases, though not all) is
> that you can leverage fault-tolerant *local* state that may also be
> distributed across app instances.  Local state is much more efficient and
> faster when doing stateful processing such as joins or aggregations.  You
> don't need to worry about an external system, whether it's up and running,
> whether its version is still compatible with your app, whether it can scale
> as much as your app/Kafka Streams/Kafka/the volume of your input data.
>
> Also, note that some users have actually opted to run hybrid setups:  Some
> processing output is sent to a remote data store like Cassandra (e.g. via
> Kafka Connect), some processing output is exposed directly through
> interactive queries. It's not like you're forced to pick only one approach.
>
>
> > - Typical microservices would separate storing / retrieving data
>
> I'd rather argue that for microservices you'd oftentimes prefer to *not*
> use a remote DB, and rather do everything inside your microservice whatever
> the microservice needs to do (perhaps we could relax this to "do everything
> in a way that your microservice is in full, exclusive control", i.e. it
> doesn't necessarily need to be *inside*, but arguably it would be better if
> it actually is).
> See e.g. the article
> https://www.confluent.io/blog/data-dichotomy-rethinking-the-
> way-we-treat-data-and-services/
> that lists some of the reasoning behind this school of thinking.  Again,
> YMMV.
>
> Personally, I think there's no simple true/false here.  The decisions
> depend on what you need, what your context is, etc.  Anyways, since you
> already have some opinions for the one side, I wanted to share some food
> for thought for the other side of the argument. :-)
>
> Best,
> Michael
>
>
>
>
>
> On Fri, Mar 24, 2017 at 1:25 PM, Jon Yeargers 
> wrote:
>
> > If I understand this correctly: assuming I have a simple aggregator
> > distributed across n-docker instances each instance will _also_ need to
> > support some sort of communications process for allowing access to its
> > statestore (last param from KStream.groupby.aggregate).
> >
> > How would one go about substituting a separated db (EG redis) for the
> > statestore?
> >
> > Some advantages to decoupling:
> > - It would seem like having a centralized process like this would
> alleviate
> > the need to execute multiple requests for a given kv pair (IE "who has
> this
> > data?" and subsequent requests to retrieve it).
> > - it would take some pressure off of each node to maintain a large disk
> > store
> > - Typical microservices would separate storing / retrieving data
> > - It would raise some eyebrows if a spec called for a mysql/nosql
> instance
> > to be installed with every docker container
> > - The tombstoning facilities of redis or C* would lend themselves well to
> > implementing a 'true' rolling aggregation
> >
> > I get that RocksDB has a small footprint but given the choice of
> > implementing my own RPC / gossip-like process for 

Re: Should MirrorMaker produce round robin with this configuration?

2017-03-24 Thread Chris Neal
Again, thank you for the feedback.  That link was very helpful!

I adjusted my consumer/producer configs to be:

consumer:
zookeeper.connect=[server_list_here]
group.id=MirrorMaker
exclude.internal.topics=true
client.id=MirrorMakerConsumer

producer:
metadata.broker.list=[server_list_here]
compression.codec=gzip
producer.type=async
batch.size=100
message.send.max.retries=2
client.id=MirrorMakerProducer

script file:
/home/chris.neal/kafka_2.10-0.8.1.1/bin/kafka-run-class.sh
kafka.tools.MirrorMaker \
--producer.config /home/chris.neal/mirror_maker/producer.properties \
--consumer.config /home/chris.neal/mirror_maker/consumer.properties \
--num.streams=18 \
--num.producers=18 \
--whitelist '[topic_list_here]'

The num.streams matches the *total* of all partitions for the topics I am
consuming.  The topics on both the source/target cluster have the same
partitions as well.  I did have to adjust one to make this true.

But...The messages still all go to partition 1 for all the topics. :(

The only time I have been able to get it to round-robin is when I pick just
one topic.  That really makes me think it is still a configuration issue
that is causing my problems (probably num.streams).

Thanks again for working through this with me.  VERY much appreciate your
help.
Chris


On Fri, Mar 24, 2017 at 10:48 AM, Manikumar 
wrote:

> producer distributes the non-keyed messages to available target partitions
> in a
> round-robin fashion.
>
> You don't need to set num.consumer.fetchers, partition.assignment.strategy
> props.
> Use the --num.streams option to specify the number of consumer threads to
> create.
>
> https://community.hortonworks.com/articles/79891/kafka-
> mirror-maker-best-practices.html
>
> On Fri, Mar 24, 2017 at 8:49 PM, Chris Neal  wrote:
>
> > Thanks very much for the reply Manikumar!
> >
> > I found that there were a few topics on the source cluster that had more
> > than two partitions, but all topics on the target cluster had 2
> > partitions.  I did a test between one topic that had 2 on each, and I did
> > get messages to both partitions as expected.
> >
> > Is there a way to get this same behavior if the number of partitions on
> the
> > source and target topic are different?
> >
> > What should I set the num.consumer.fetchers property to?  I believe I
> > remember reading that it needed to be at least equal to the number of
> > partitions on the topic.
> >
> > For example, if I'm reading from:
> >
> > topicA:  2 partitions
> > topicB:  3 partitions
> > topicC:  1 partition
> >
> > should i set  num.consumer.fetchers to 3, or 6? :)
> >
> > Again, thank you!!
> >
> >
> > On Fri, Mar 24, 2017 at 12:57 AM, Manikumar 
> > wrote:
> >
> > > Are you sure target cluster topics have more than one partition?
> > > If you are sending keyed messages, they may be going to the same
> > partition.
> > >
> > > On Thu, Mar 23, 2017 at 11:15 PM, Chris Neal  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I am using MirrorMaker to consume from a 0.8.2.2 cluster and produce
> > to a
> > > > 0.10.2 cluster.  All the topics have two partitions on both clusters.
> > My
> > > > consumer.properties is:
> > > >
> > > > zookeeper.connect=[string of servers]
> > > > group.id=MirrorMaker
> > > > num.consumer.fetchers=2
> > > > partition.assignment.strategy=roundrobin
> > > >
> > > > My producer.properties is:
> > > >
> > > > metadata.broker.list=[string of servers]
> > > > compression.codec=gzip
> > > > producer.type=async
> > > > message.send.max.retries=2
> > > >
> > > > My understanding from the documentation is that this *should* work,
> but
> > > it
> > > > is not.  All my data is going into partition 1 on the target cluster
> > for
> > > > all topics.
> > > >
> > > > Can someone help me understand what I'm missing here?
> > > >
> > > > Thank you for your time!
> > > > Chris
> > > >
> > >
> >
>


Re: Should MirrorMaker produce round robin with this configuration?

2017-03-24 Thread Manikumar
producer distributes the non-keyed messages to available target partitions
in a
round-robin fashion.

You don't need to set num.consumer.fetchers, partition.assignment.strategy
props.
Use the --num.streams option to specify the number of consumer threads to
create.

https://community.hortonworks.com/articles/79891/kafka-mirror-maker-best-practices.html

On Fri, Mar 24, 2017 at 8:49 PM, Chris Neal  wrote:

> Thanks very much for the reply Manikumar!
>
> I found that there were a few topics on the source cluster that had more
> than two partitions, but all topics on the target cluster had 2
> partitions.  I did a test between one topic that had 2 on each, and I did
> get messages to both partitions as expected.
>
> Is there a way to get this same behavior if the number of partitions on the
> source and target topic are different?
>
> What should I set the num.consumer.fetchers property to?  I believe I
> remember reading that it needed to be at least equal to the number of
> partitions on the topic.
>
> For example, if I'm reading from:
>
> topicA:  2 partitions
> topicB:  3 partitions
> topicC:  1 partition
>
> should i set  num.consumer.fetchers to 3, or 6? :)
>
> Again, thank you!!
>
>
> On Fri, Mar 24, 2017 at 12:57 AM, Manikumar 
> wrote:
>
> > Are you sure target cluster topics have more than one partition?
> > If you are sending keyed messages, they may be going to the same
> partition.
> >
> > On Thu, Mar 23, 2017 at 11:15 PM, Chris Neal  wrote:
> >
> > > Hi everyone,
> > >
> > > I am using MirrorMaker to consume from a 0.8.2.2 cluster and produce
> to a
> > > 0.10.2 cluster.  All the topics have two partitions on both clusters.
> My
> > > consumer.properties is:
> > >
> > > zookeeper.connect=[string of servers]
> > > group.id=MirrorMaker
> > > num.consumer.fetchers=2
> > > partition.assignment.strategy=roundrobin
> > >
> > > My producer.properties is:
> > >
> > > metadata.broker.list=[string of servers]
> > > compression.codec=gzip
> > > producer.type=async
> > > message.send.max.retries=2
> > >
> > > My understanding from the documentation is that this *should* work, but
> > it
> > > is not.  All my data is going into partition 1 on the target cluster
> for
> > > all topics.
> > >
> > > Can someone help me understand what I'm missing here?
> > >
> > > Thank you for your time!
> > > Chris
> > >
> >
>


Re: Should MirrorMaker produce round robin with this configuration?

2017-03-24 Thread Chris Neal
Thanks very much for the reply Manikumar!

I found that there were a few topics on the source cluster that had more
than two partitions, but all topics on the target cluster had 2
partitions.  I did a test between one topic that had 2 on each, and I did
get messages to both partitions as expected.

Is there a way to get this same behavior if the number of partitions on the
source and target topic are different?

What should I set the num.consumer.fetchers property to?  I believe I
remember reading that it needed to be at least equal to the number of
partitions on the topic.

For example, if I'm reading from:

topicA:  2 partitions
topicB:  3 partitions
topicC:  1 partition

should i set  num.consumer.fetchers to 3, or 6? :)

Again, thank you!!


On Fri, Mar 24, 2017 at 12:57 AM, Manikumar 
wrote:

> Are you sure target cluster topics have more than one partition?
> If you are sending keyed messages, they may be going to the same partition.
>
> On Thu, Mar 23, 2017 at 11:15 PM, Chris Neal  wrote:
>
> > Hi everyone,
> >
> > I am using MirrorMaker to consume from a 0.8.2.2 cluster and produce to a
> > 0.10.2 cluster.  All the topics have two partitions on both clusters.  My
> > consumer.properties is:
> >
> > zookeeper.connect=[string of servers]
> > group.id=MirrorMaker
> > num.consumer.fetchers=2
> > partition.assignment.strategy=roundrobin
> >
> > My producer.properties is:
> >
> > metadata.broker.list=[string of servers]
> > compression.codec=gzip
> > producer.type=async
> > message.send.max.retries=2
> >
> > My understanding from the documentation is that this *should* work, but
> it
> > is not.  All my data is going into partition 1 on the target cluster for
> > all topics.
> >
> > Can someone help me understand what I'm missing here?
> >
> > Thank you for your time!
> > Chris
> >
>


Re: Relationship fetch.replica.max.bytes and message.max.bytes

2017-03-24 Thread Kostas Christidis
Cool - thanks for clarifying this!
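
For the record, a broker configuration consistent with that advice would then
look like this (the sizes are just placeholders):

# server.properties (placeholder sizes; equal values are fine)
message.max.bytes=1048576
replica.fetch.max.bytes=1048576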

On Thu, Mar 23, 2017 at 10:54 AM, Ismael Juma  wrote:
> Hi Kostas,
>
> Yes, equal is fine. The code that prints an error if replication fails due
> to this:
>
> error(s"Replication is failing due to a message that is greater than
> replica.fetch.max.bytes for partition $topicPartition. " +
> "This generally occurs when the max.message.bytes has been
> overridden to exceed this value and a suitably large " +
> "message has also been sent. To fix this problem increase
> replica.fetch.max.bytes in your broker config to be " +
> "equal or larger than your settings for max.message.bytes, both at
> a broker and topic level.")
>
> Ismael
>
> On Thu, Mar 23, 2017 at 2:37 PM, Kostas Christidis  wrote:
>
>> On Thu, Mar 23, 2017 at 5:04 AM, Ben Stopford  wrote:
>> > Hi Kostas - The docs for replica.fetch.max.bytes should be helpful here:
>> >
>> > The number of bytes of messages to attempt to fetch for each partition.
>> > This is not an absolute maximum, if the first message in the first
>> > non-empty partition of the fetch is larger than this value, the message
>> > will still be returned to ensure that progress can be made.
>>
>> Hi Ben
>>
>> When I'm reading this bit (which I have done before), I am left with
>> the impression that replica.fetch.max.bytes can in fact be equal to
>> message.max.bytes. Am I wrong? This interpretation goes against points
>> 3 and 4 in the original email.
>>
>> Kostas
>>


Re: APPLICATION_SERVER_CONFIG ?

2017-03-24 Thread Michael Noll
> If I understand this correctly: assuming I have a simple aggregator
> distributed across n-docker instances each instance will _also_ need to
> support some sort of communications process for allowing access to its
> statestore (last param from KStream.groupby.aggregate).

Yes.

See
http://docs.confluent.io/current/streams/developer-guide.html#your-application-and-interactive-queries
.
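
For reference, a minimal sketch of the wiring involved -- the application id,
bootstrap servers, host:port, store name, and key below are all placeholders,
not actual values:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.state.StreamsMetadata;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder
// advertise this instance's own RPC endpoint so other instances can route queries to it
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "my-host:7070");   // placeholder host:port

KStreamBuilder builder = new KStreamBuilder();
// ... define your topology on builder ...
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// find the instance that currently hosts a given key of the store "AggStore" (placeholder names)
StreamsMetadata meta =
    streams.metadataForKey("AggStore", "some-key", Serdes.String().serializer());
// then issue your own HTTP/RPC call to meta.hostInfo() to fetch the value from that instance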

> - The tombstoning facilities of redis or C* would lend themselves well to
> implementing a 'true' rolling aggregation

What is a 'true' rolling aggregation, and how could Redis or C* help with
that in a way that Kafka can't?  (Honest question.)


> I get that RocksDB has a small footprint but given the choice of
> implementing my own RPC / gossip-like process for data sharing and using a
> well tested one (ala C* or redis) I would almost always opt for the
latter.
> [...]
> Just my $0.02. I would love to hear why Im missing the 'big picture'. The
> kstreams architecture seems rife with potential.

One question is, for example:  can the remote/central DB of your choice
(Redis, C*) handle as many qps as Kafka/Kafka Streams/RocksDB can handle?
Over the network?  At the same low latency?  Also, what happens if the
remote DB is unavailable?  Do you wait and retry?  Discard?  Accept the
fact that your app's processing latency will now go through the roof?  I
wrote about some such scenarios at
https://www.confluent.io/blog/distributed-real-time-joins-and-aggregations-on-user-activity-events-using-kafka-streams/
.

One big advantage of Kafka/Kafka Streams (for many use cases, though not all) is
that you can leverage fault-tolerant *local* state that may also be
distributed across app instances.  Local state is much more efficient and
faster when doing stateful processing such as joins or aggregations.  You
don't need to worry about an external system, whether it's up and running,
whether its version is still compatible with your app, whether it can scale
as much as your app/Kafka Streams/Kafka/the volume of your input data.

Also, note that some users have actually opted to run hybrid setups:  Some
processing output is sent to a remote data store like Cassandra (e.g. via
Kafka Connect), some processing output is exposed directly through
interactive queries. It's not like you're forced to pick only one approach.


> - Typical microservices would separate storing / retrieving data

I'd rather argue that for microservices you'd oftentimes prefer to *not*
use a remote DB, and rather do everything inside your microservice whatever
the microservice needs to do (perhaps we could relax this to "do everything
in a way that your microservice is in full, exclusive control", i.e. it
doesn't necessarily need to be *inside*, but arguably it would be better if
it actually is).
See e.g. the article
https://www.confluent.io/blog/data-dichotomy-rethinking-the-way-we-treat-data-and-services/
that lists some of the reasoning behind this school of thinking.  Again,
YMMV.

Personally, I think there's no simple true/false here.  The decisions
depend on what you need, what your context is, etc.  Anyways, since you
already have some opinions for the one side, I wanted to share some food
for thought for the other side of the argument. :-)

Best,
Michael





On Fri, Mar 24, 2017 at 1:25 PM, Jon Yeargers 
wrote:

> If I understand this correctly: assuming I have a simple aggregator
> distributed across n-docker instances each instance will _also_ need to
> support some sort of communications process for allowing access to its
> statestore (last param from KStream.groupby.aggregate).
>
> How would one go about substituting a separated db (EG redis) for the
> statestore?
>
> Some advantages to decoupling:
> - It would seem like having a centralized process like this would alleviate
> the need to execute multiple requests for a given kv pair (IE "who has this
> data?" and subsequent requests to retrieve it).
> - it would take some pressure off of each node to maintain a large disk
> store
> - Typical microservices would separate storing / retrieving data
> - It would raise some eyebrows if a spec called for a mysql/nosql instance
> to be installed with every docker container
> - The tombstoning facilities of redis or C* would lend themselves well to
> implementing a 'true' rolling aggregation
>
> I get that RocksDB has a small footprint but given the choice of
> implementing my own RPC / gossip-like process for data sharing and using a
> well tested one (ala C* or redis) I would almost always opt for the latter.
> (Footnote: Our implementations already heavily use redis/memcached for
> deduplication of kafka messages so it would seem a small step to use the
> same to store aggregation results.)
>
> Just my $0.02. I would love to hear why Im missing the 'big picture'. The
> kstreams architecture seems rife with potential.
>
> On Thu, Mar 23, 2017 at 3:17 PM, Matthias J. Sax 
> wrote:
>
> > The config does not "do" 

even if i pass key no change in partition

2017-03-24 Thread Laxmi Narayan
Hi,

I am passing a key in the producer, but the partition still does not change.


I can see the key value in the producer response, but the partition stays
the same.

*This is how my props look:*

props.put("bootstrap.servers",  "localhost:9092");
props.put("group.id",   "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms","1000");
props.put("session.timeout.ms", "3");
props.put("linger.ms",  "1");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class",
"org.apache.kafka.clients.producer.internals.DefaultPartitioner");



*This is my output : *
ConsumerRecord(topic = test, partition = 0, offset = 449, CreateTime =
1490359767112, checksum = 1987036294, serialized key size = 3, serialized
value size = 16, key = 172, value = hello world - 86)
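
For reference, this is essentially what the default partitioner computes for a
non-null key (the key bytes are taken from the record above, and the partition
count is an assumption): if the topic has only one partition, every key
necessarily maps to partition 0.

import org.apache.kafka.common.utils.Utils;

byte[] keyBytes = "172".getBytes();   // the key from the record above
int numPartitions = 1;                // e.g. if the topic "test" was created with one partition
// murmur2 hash of the serialized key, modulo the topic's partition count
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;   // always 0 here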





*Regards,*
*Laxmi Narayan Patel*
*MCA NIT Durgapur (2011-2014)*
*Mob:-9741292048,8345847473*


Re: APPLICATION_SERVER_CONFIG ?

2017-03-24 Thread Jon Yeargers
If I understand this correctly: assuming I have a simple aggregator
distributed across n-docker instances each instance will _also_ need to
support some sort of communications process for allowing access to its
statestore (last param from KStream.groupby.aggregate).

How would one go about substituting a separated db (EG redis) for the
statestore?

Some advantages to decoupling:
- It would seem like having a centralized process like this would alleviate
the need to execute multiple requests for a given kv pair (IE "who has this
data?" and subsequent requests to retrieve it).
- it would take some pressure off of each node to maintain a large disk
store
- Typical microservices would separate storing / retrieving data
- It would raise some eyebrows if a spec called for a mysql/nosql instance
to be installed with every docker container
- The tombstoning facilities of redis or C* would lend themselves well to
implementing a 'true' rolling aggregation

I get that RocksDB has a small footprint but given the choice of
implementing my own RPC / gossip-like process for data sharing and using a
well tested one (ala C* or redis) I would almost always opt for the latter.
(Footnote: Our implementations already heavily use redis/memcached for
deduplication of kafka messages so it would seem a small step to use the
same to store aggregation results.)

Just my $0.02. I would love to hear why I'm missing the 'big picture'. The
kstreams architecture seems rife with potential.

On Thu, Mar 23, 2017 at 3:17 PM, Matthias J. Sax 
wrote:

> The config does not "do" anything. It's metadata that gets broadcasted
> to other Streams instances for the IQ feature.
>
> See this blog post for more details:
> https://www.confluent.io/blog/unifying-stream-processing-
> and-interactive-queries-in-apache-kafka/
>
> Happy to answer any follow up question.
>
>
> -Matthias
>
> On 3/23/17 11:51 AM, Jon Yeargers wrote:
> > What does this config param do?
> >
> > I see it referenced / used in some samples and here (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 67%3A+Queryable+state+for+Kafka+Streams
> > )
> >
>
>


Re: Kafka high cpu usage and disconnects

2017-03-24 Thread Paul van der Linden
If I run 3 brokers in a cluster on localhost the cpu usage is virtually
zero. Not sure why on other environments the minimum usage of each broker
is at least 13% (with zero producers/consumers), that doesn't sound normal.

On Thu, Mar 23, 2017 at 4:48 PM, Paul van der Linden 
wrote:

> Doesn't seem to be the clients indeed. Maybe it already uses 13% of cpu on
> maintaining the cluster. With no connections at all, except zookeeper and
> the other 2 brokers. This is the cpu usage:
>
> CPU SAMPLES BEGIN (total = 86359) Thu Mar 23 16:47:26 2017
> rank   self  accum   count trace method
>1 87.23% 87.23%   75327 300920 sun.nio.ch.EPollArrayWrapper.epollWait
>2 12.48% 99.71%   10780 300518 java.net.PlainSocketImpl.socketAccept
>3  0.06% 99.77%  51 300940 sun.nio.ch.FileDispatcherImpl.write0
>4  0.02% 99.79%  20 301559 sun.nio.ch.FileDispatcherImpl.read0
>5  0.01% 99.80%  10 301567 org.apache.log4j.Category.
> getEffectiveLevel
> CPU SAMPLES END
>
> On Thu, Mar 23, 2017 at 1:02 PM, Jaikiran Pai 
> wrote:
>
>> One thing that you might want to check is the number of consumers that
>> are connected/consuming against this Kafka setup. We have consistently
>> noticed that the CPU usage of the broker is very high even with very few
>> consumers (around 10 Java consumers). There's even a JIRA for it. From what
>> I remember, it had to do with the constant hearbeat and other such network
>> activities that happen between these consumers and the broker. We have had
>> this issue since the 0.8.x days, up to 0.10.0.1. We just migrated to 0.10.2.0 and we
>> will have to see if it is still reproducible in there.
>>
>> I don't mean to say you are running into the same issue, but you can
>> check that aspect as well (maybe shutdown all consumers and see how the
>> broker CPU behaves).
>>
>> -Jaikiran
>>
>>
>> On Thursday 23 March 2017 06:15 PM, Paul van der Linden wrote:
>>
>>> Thanks. I managed to get a cpu dump from staging.
>>>
>>> The output:
>>> THREAD START (obj=5427, id = 24, name="RMI TCP Accept-0",
>>> group="system")
>>> THREAD START (obj=5427, id = 21, name="main", group="main")
>>> THREAD START (obj=5427, id = 25, name="SensorExpiryThread",
>>> group="main")
>>> THREAD START (obj=58e6, id = 26,
>>> name="ThrottledRequestReaper-Fetch", group="main")
>>> THREAD START (obj=58e6, id = 27,
>>> name="ThrottledRequestReaper-Produce", group="main")
>>> THREAD START (obj=5914, id = 28,
>>> name="ZkClient-EventThread-18-zookeeper:2181", group="main")
>>> THREAD START (obj=58e6, id = 29, name="main-SendThread()",
>>> group="main")
>>> THREAD START (obj=5950, id = 200010, name="main-EventThread",
>>> group="main")
>>> THREAD START (obj=5427, id = 200011, name="pool-3-thread-1",
>>> group="main")
>>> THREAD END (id = 200011)
>>> THREAD START (obj=5427, id = 200012,
>>> name="metrics-meter-tick-thread-1", group="main")
>>> THREAD START (obj=5427, id = 200014, name="kafka-scheduler-0",
>>> group="main")
>>> THREAD START (obj=5427, id = 200013, name="kafka-scheduler-1",
>>> group="main")
>>> THREAD START (obj=5427, id = 200015, name="kafka-scheduler-2",
>>> group="main")
>>> THREAD START (obj=5c33, id = 200016, name="kafka-log-cleaner-thread
>>> -0",
>>> group="main")
>>> THREAD START (obj=5427, id = 200017,
>>> name="kafka-network-thread-2-PLAINTEXT-0", group="main")
>>> THREAD START (obj=5427, id = 200018,
>>> name="kafka-network-thread-2-PLAINTEXT-1", group="main")
>>> THREAD START (obj=5427, id = 200019,
>>> name="kafka-network-thread-2-PLAINTEXT-2", group="main")
>>> THREAD START (obj=5427, id = 200020,
>>> name="kafka-socket-acceptor-PLAINTEXT-9092", group="main")
>>> THREAD START (obj=58e6, id = 200021, name="ExpirationReaper-2",
>>> group="main")
>>> THREAD START (obj=58e6, id = 200022, name="ExpirationReaper-2",
>>> group="main")
>>> THREAD START (obj=5427, id = 200023,
>>> name="metrics-meter-tick-thread-2", group="main")
>>> THREAD START (obj=5427, id = 200024, name="kafka-scheduler-3",
>>> group="main")
>>> THREAD START (obj=5427, id = 200025, name="kafka-scheduler-4",
>>> group="main")
>>> THREAD START (obj=5427, id = 200026, name="kafka-scheduler-5",
>>> group="main")
>>> THREAD START (obj=5427, id = 200027, name="kafka-scheduler-6",
>>> group="main")
>>> THREAD START (obj=58e6, id = 200028, name="ExpirationReaper-2",
>>> group="main")
>>> THREAD START (obj=58e6, id = 200029, name="ExpirationReaper-2",
>>> group="main")
>>> THREAD START (obj=58e6, id = 200030, name="ExpirationReaper-2",
>>> group="main")
>>> THREAD START (obj=5427, id = 200031, name="group-metadata-manager-0
>>> ",
>>> group="main")
>>> THREAD START (obj=5427, id = 200032, name="kafka-request-handler-0",
>>> group="main")
>>> THREAD START (obj=5427, id = 200037, name="kafka-request-handler-5",
>>> group="main")
>>> THREAD START 

Re: ORC plugin for Kafka HDFS connector

2017-03-24 Thread Manoj Murumkar
>> It wants to extract Avro schema from ORC record.

Should say: It wants to extract connect schema from ORC record.

On Thu, Mar 23, 2017 at 11:14 PM, Manoj Murumkar 
wrote:

> Hi,
>
> I am developing a connector to support ORC data type in HDFS connector.
> Everything is in place except for hive integration. Specifically, in the
> SchemaFileReader implementation. It wants to extract Avro schema from ORC
> record. However, I am unable to get record name from ORC record in order to
> build the Avro schema. Has anyone implemented this anywhere?
>
> This is what I am using for testing (bold information is missing in ORC):
>
> value.schema='{"type":"record",*"name":"orcrecord"*,"fields":[
> {"name":"name","type":"string"},{"name":"age","type":"int"}]}'
>
> Thanks,
>
> Manoj
>