Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-02 Thread Jan Filipiak

Hi Bill,

Totally! In the original discussion it was mentioned that the 
overloads are nasty when implementing new features, so we wanted to get 
rid of them. But what I felt was that the
copy & pasted code in the KTableProcessors for maintaining IQ stores was 
as big a hurdle as the overloads.


With this proposal I try to shift things in the direction of getting 
IQ for free if
KTableValueGetterSupplier is properly implemented (just like joins come for 
free then), instead of having the code for maintaining IQ stores in all the 
places. I realized I can do that while also getting rid of the overloads, 
which makes me feel my proposal is superior.
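
For context, "IQ" here means Kafka Streams' interactive queries: a store that is
registered as queryable can be read through the public KafkaStreams API. A minimal
sketch of that user-facing side, assuming a hypothetical store name and an
already-started KafkaStreams instance called 'streams':

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // look up the queryable, read-only view of a store by name
    ReadOnlyKeyValueStore<String, Long> store =
        streams.store("my-ktable-store", QueryableStoreTypes.keyValueStore());
    Long value = store.get("some-key");   // local lookup against that store

The point of the proposal is that the store exposed through a properly implemented
KTableValueGetterSupplier would back this API, instead of every KTableProcessor
maintaining its own IQ store.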


Further, I try to optimize by using as few stores as possible to give the 
user what they need. That should save all sorts of resources while 
allowing faster rebalances.


The ultimate target is to have only KTableSource and the Aggregators 
maintain a store and provide a ValueGetterSupplier.


Does this make sense to you?

Best Jan

On 02.08.2017 18:09, Bill Bejeck wrote:

Hi Jan,

Thanks for the effort in putting your thoughts down on paper.

Comparing what I see from your proposal and what is presented in 
KIP-182, one of the main differences is the exclusion of 
a `Materialized` instance in the `KTable` methods.


Can you go into more detail about why this is so and the specific problems 
it avoids and/or solves with this approach?


Thanks!
Bill

On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy wrote:


Hi Jan,

Thanks for taking the time to put this together, appreciated. For the
benefit of others would you mind explaining a bit about your
motivation?

Cheers,
Damian

On Wed, 2 Aug 2017 at 01:40 Jan Filipiak wrote:

> Hi all,
>
> after some further discussions, the best thing to show my idea of how it
> should evolve would be a bigger mock/interface description.
> The goal is to reduce the store maintaining processors to only the
> Aggregators and the KTableSource, while having the KTableSource optionally
> materialized.
>
> Introducing KTable:copy() will allow users to maintain state twice if
> they really want to. KStream::join*() wasn't touched; I never personally
> used that, so I didn't feel comfortable enough touching it. I'm currently
> still making up my mind. None of the suggestions made it queryable so far.
> Guozhang's 'Buffered' idea seems ideal here.
>
> Please have a look. Looking forward to your opinions.
>
> Best Jan
>
>
>
> On 21.06.2017 17:24, Eno Thereska wrote:
> > (cc’ing user-list too)
> >
> > Given that we already have StateStoreSuppliers that are configurable
> > using the fluent-like API, probably it’s worth discussing the other
> > examples with joins and serdes first since those have many overloads and
> > are in need of some TLC.
> >
> > So following your example, I guess you’d have something like:
> > .join()
> > .withKeySerdes(…)
> > .withValueSerdes(…)
> > .withJoinType(“outer”)
> >
> > etc?
> >
> > I like the approach since it still remains declarative and it’d reduce
> > the number of overloads by quite a bit.
> >
> > Eno
> >
> >> On Jun 21, 2017, at 3:37 PM, Damian Guy wrote:
> >>
> >> Hi,
> >>
> >> I'd like to get a discussion going around some of the API choices we've
> >> made in the DSL. In particular those that relate to stateful operations
> >> (though this could expand).
> >> As it stands we lean heavily on overloaded methods in the API, i.e.,
> >> there are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
> >> and I feel it is only going to get worse as we add more optional params.
> >> In particular we've had some requests to be able to turn caching off, or
> >> change log configs, on a per-operator basis (note this can be done now
> >> if you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >>
> >> So this is a bit of an open question. How can we change the DSL overloads
> >> so that it flows, is simple to use and understand, and is easily extended
> >> in the future?
> >>
> >> One option would be to use a fluent API approach for providing the
> >> optional params, so something like this:
> >>
> >> groupedStream.count()
> >>.withStoreName("name")
> >>.withCachingEnabled(false)
> >>.withLoggingEnabled(config)
> >>.table()
> >>
> >>
> >>
> >> Another option would be to provide a Builder to the count method, so it
> >> would look something like this:
> >> 

Kafka TLS Authentication for brokers and clients (w/o Zookeeper Auth)

2017-08-02 Thread M. Manna
Hello,

From the Kafka documentation I understand that authentication and encryption
can be enabled for inter-broker and broker-client exchanges. By exchanges, I
mean data transfer-related activities.

My questions are:

1) Is it common to secure ONLY inter-broker and broker-client exchanges, but
use plain transfer to ZooKeeper? In other words, is it common to enable
authentication only for brokers and clients, but not for ZooKeeper?

2) If I also want to use authentication for ZooKeeper-to-ZooKeeper exchanges,
are there any known performance issues I should be concerned about?

I would be grateful if someone could provide a dummy example with both
implemented. I can see the Kafka online documentation, which shows self-signed
certificates and keystore usage for inter-broker and broker-client
authentication, but I cannot see much mention of ZooKeeper-to-broker
exchanges (maybe I missed it).
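
For reference, a minimal sketch of the broker/client TLS setup referred to above,
following the standard Kafka SSL configuration. Hostnames, paths and passwords are
placeholders, and the ZooKeeper side is configured separately (it is not covered by
these properties):

    # server.properties (broker): TLS listener, TLS for inter-broker traffic,
    # and client certificate authentication
    listeners=SSL://broker1.example.com:9093
    security.inter.broker.protocol=SSL
    ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit
    ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    ssl.truststore.password=changeit
    ssl.client.auth=required

    # producer/consumer client properties
    security.protocol=SSL
    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    ssl.truststore.password=changeit
    ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit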

Kindest Regards,
M. Manna


Re: "client" and requests in request.timeout.ms

2017-08-02 Thread Guozhang Wang
Hello Stevo,

Thanks for your inputs. I think your comments can be further categorized
into the following points:

1. Does the request.timeout.ms config leak internal implementation details?

Your observation that "request.timeout.ms" is related only to NetworkClient
is right, but I think that does not mean we should not expose this config
to users, just like "send.buffer.bytes" and "receive.buffer.bytes" are also
for the internal NetworkClient.

For client configs we have IMPORTANCE levels, and internal configs like these
are usually marked LOW, meaning that most of the time users would not need to
modify their default values.
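
As a concrete illustration, the same property name exists independently on the
client and broker side; a minimal sketch with placeholder values:

    # producer.properties / consumer.properties -- how long the client waits for a
    # response before retrying or failing the request
    request.timeout.ms=30000

    # server.properties -- the broker-side setting of the same name, used by the
    # broker's own internal clients (e.g. the replica fetchers)
    request.timeout.ms=30000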


2. NetworkClient is actually used both in the broker and in clients. Is the
javadoc outdated?

Yes, I agree. The current NetworkClient is used for controller-to-broker
communication, including sending controlled-shutdown requests, and for replica
fetcher request sending. When a request times out the corresponding
destination node is marked as disconnected, and it's the caller's
responsibility to re-send the request and try to reconnect to the node.

Could you file a JIRA to update the javadoc of NetworkClient?


3. Could we consider having a different request timeout for different types
of requests?

The community has been discussing this (e.g.
https://github.com/apache/kafka/pull/3503). Personally I think it makes
sense to have a longer request timeout for request types that may be
parked by the broker in the purgatory for a long time before responding,
like Produce / Consume / JoinGroup requests, etc.

Guozhang




On Fri, Jul 28, 2017 at 1:12 AM, Stevo Slavić  wrote:

> Hello Apache Kafka community,
>
> In Consumer, Producer, AdminClient and Broker configuration documentation
> there's a common config property, request.timeout.ms, with common
> description part being:
> "The configuration controls the maximum amount of time the client will wait
> for the response of a request. If the response is not received before the
> timeout elapses the client will resend the request if necessary or fail the
> request if retries are exhausted."
>
> If I'm not mistaken "client" term in all the different request.timeout.ms
> config property descriptions actually refers to NetworkClient, which is
> kind of a leaky internal abstraction. It seems there's no mention of
> NetworkClient on the Kafka documentation page. By its javadoc
> NetworkClient is:
> "A network client for asynchronous request/response network i/o. This is an
> internal class used to implement the user-facing producer and consumer
> clients."
> Since it's considered to be an internal class, maybe it could be moved into an
> "internal" package like other internal classes.
> More importantly NetworkClient javadoc (second sentence) is not entirely
> true, since NetworkClient is used on the broker side too, e.g. to exchange
> controlled shutdown request/response, which IMO has nothing to do with
> "user-facing producer and consumer clients". Because NetworkClient
> abstraction is used on the broker side, there's request.timeout.ms config
> property not only for producer/consumer but also in broker configuration.
>
> Can somebody please verify if my understanding of the current situation is
> correct?
>
> There's no mention in the Kafka documentation of which requests will
> be affected by tuning each of the request.timeout.ms config properties, or
> how, if at all, different request timeouts are related.
>
> Specifically I'd like to lower producer/consumer request timeout, so
> user-facing client requests like Produce/Fetch/Metadata should be affected,
> but e.g. controlled shutdown requests on the broker side should not. I'm
> not sure whether broker side request timeout can be left unchanged or if
> there's combination/chain of client and broker side request/responses that
> are related so that the request timeout settings have to be kept in sync. I
> guess maybe client side Produce request and broker side replica Fetch form
> kind of a chain/dependency - depending on acks Produce cannot finish
> successfully until enough of replicas got the message. Producer's "
> request.timeout.ms" description explains relationship with Broker's "
> replica.lag.time.max.ms" (potential produce failure or duplication of
> messages due to retries being negative side-effects) but relationship with
> Broker's "request.timeout.ms" is not covered. Similarly, a Consumer's Fetch
> request to the lead broker seems to only retrieve messages replicated to the
> rest of the ISR set, so there's again a kind of dependency on the replica
> Fetch. This time the dependency has a less negative side-effect: there could
> be more empty reads if the Consumer request timeout is lower than the
> Broker's, which is a tradeoff, lower latency of individual requests vs lower
> load / number of requests.
>
> Is there a reason that the producer/consumer request timeouts are set to the
> same default value as the request timeout default on the broker side?
>
> Additionally, is there a reason 

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-02 Thread Bill Bejeck
Hi Jan,

Thanks for the effort in putting your thoughts down on paper.

Comparing what I see from your proposal and what is presented in KIP-182,
one of the main differences is the exclusion of a `Materialized` instance
in the `KTable` methods.
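
For reference, a rough sketch of the kind of call KIP-182 proposes for a `KTable`
method taking a `Materialized` parameter; class and method names are taken from the
KIP draft and may differ from what is eventually released. It assumes an existing
KTable<String, Long> named 'table' (Bytes, KeyValueStore and Serdes are the usual
Kafka client/Streams types):

    // the Materialized argument names the backing store and configures its serdes
    KTable<String, Long> filtered = table.filter(
        (key, value) -> value != null && value > 0,
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("filtered-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));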

Can you go into more detail about why this is so and the specific problems it
avoids and/or solves with this approach?

Thanks!
Bill

On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy  wrote:

> Hi Jan,
>
> Thanks for taking the time to put this together, appreciated. For the
> benefit of others would you mind explaining a bit about your motivation?
>
> Cheers,
> Damian
>
> On Wed, 2 Aug 2017 at 01:40 Jan Filipiak  wrote:
>
> > Hi all,
> >
> > after some further discussions, the best thing to show my idea of how it
> > should evolve would be a bigger mock/interface description.
> > The goal is to reduce the store maintaining processors to only the
> > Aggregators and the KTableSource, while having the KTableSource optionally
> > materialized.
> >
> > Introducing KTable:copy() will allow users to maintain state twice if
> > they really want to. KStream::join*() wasn't touched. I never personally
> > used that so I didn't feel
> > comfortable enough touching it. Currently still making up my mind. None
> > of the suggestions made it queryable so far. Guozhang's 'Buffered' idea
> > seems ideal here.
> >
> > Please have a look. Looking forward to your opinions.
> >
> > Best Jan
> >
> >
> >
> > On 21.06.2017 17:24, Eno Thereska wrote:
> > > (cc’ing user-list too)
> > >
> > > Given that we already have StateStoreSuppliers that are configurable
> > using the fluent-like API, probably it’s worth discussing the other
> > examples with joins and serdes first since those have many overloads and
> > are in need of some TLC.
> > >
> > > So following your example, I guess you’d have something like:
> > > .join()
> > > .withKeySerdes(…)
> > > .withValueSerdes(…)
> > > .withJoinType(“outer”)
> > >
> > > etc?
> > >
> > > I like the approach since it still remains declarative and it’d reduce
> > the number of overloads by quite a bit.
> > >
> > > Eno
> > >
> > >> On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:
> > >>
> > >> Hi,
> > >>
> > >> I'd like to get a discussion going around some of the API choices
> we've
> > >> made in the DSL. In particular those that relate to stateful
> operations
> > >> (though this could expand).
> > >> As it stands we lean heavily on overloaded methods in the API, i.e.,
> > there
> > >> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
> and I
> > >> feel it is only going to get worse as we add more optional params. In
> > >> particular we've had some requests to be able to turn caching off, or
> > >> change log configs, on a per-operator basis (note this can be done
> now
> > if
> > >> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> > >>
> > >> So this is a bit of an open question. How can we change the DSL
> > overloads
> > >> so that it flows, is simple to use and understand, and is easily
> > extended
> > >> in the future?
> > >>
> > >> One option would be to use a fluent API approach for providing the
> > optional
> > >> params, so something like this:
> > >>
> > >> groupedStream.count()
> > >>.withStoreName("name")
> > >>.withCachingEnabled(false)
> > >>.withLoggingEnabled(config)
> > >>.table()
> > >>
> > >>
> > >>
> > >> Another option would be to provide a Builder to the count method, so
> it
> > >> would look something like this:
> > >> groupedStream.count(new
> > >> CountBuilder("storeName").withCachingEnabled(false).build())
> > >>
> > >> Another option is to say: Hey we don't need this, what are you on
> about!
> > >>
> > >> The above has focussed on state store related overloads, but the same
> > ideas
> > >> could  be applied to joins etc, where we presently have many join
> > methods
> > >> and many overloads.
> > >>
> > >> Anyway, i look forward to hearing your opinions.
> > >>
> > >> Thanks,
> > >> Damian
> >
> >
>


Re: increased response time for OffsetCommit requests

2017-08-02 Thread Gaurav Abbi
Hi Apurva,
For the ProduceRequest,

   - The increase is from 470 ms to around 1.004 s.
   - The average batch size (batch-size-avg) is around 320B.
   - The linger time is 10ms.

However, the 99th percentile for OffsetCommit has increased from 1.08 to
2.8 seconds.

Best Regards,
Gaurav Abbi

On Tue, Aug 1, 2017 at 7:32 PM, Apurva Mehta  wrote:

> Sorry to keep prodding you with questions, but can you quantify the
> increase for the ProduceRequest? What is the workload you are testing
> against: specifically the batch size, message size, and linger time settings
> of the producers in question?
>
> I ask because we benchmarked 0.11.0 against the older 0.10.0 message format
> and found no difference in performance between 0.10.2 on the 0.10
> message format and 0.11.0 on the 0.10 message format.  Could you create a
> topic with the 0.10.0 message format and see if there is any degradation
> for the same workload?
>
> Thanks,
> Apurva
>
>
> On Tue, Aug 1, 2017 at 2:51 AM, Gaurav Abbi  wrote:
>
> > Hi Apurva,
> > There are increases in the *Produce* request also. It is not as
> substantial
> > as compared to *OffsetCommit*. For both of these requests, the major
> > contributor is Remote time.
> > A couple of other metrics that show different behavior post upgrade:
> >
> >1. *LogStartOffset*: It has drastically decreased.
> >2. *NumDelayedOperations*: It has dropped.
> >
> > These could be related, or maybe these are intended good changes in Kafka
> > 0.11.0.0 or one of the previous versions.
> >
> > Best Regards,
> > Gaurav Abbi
> >
> > On Tue, Aug 1, 2017 at 12:11 AM, Apurva Mehta 
> wrote:
> >
> > > Thanks for your response. Is it 200% only for the OffsetCommitRequest,
> or
> > > is it similar for all the requests?
> > >
> > >
> > > On Mon, Jul 31, 2017 at 12:48 PM, Gaurav Abbi 
> > > wrote:
> > >
> > > > Hi Apurva,
> > > > 1. The increase is about 200%.
> > > > 2. There is no increase in throughput. However, this has caused an
> > > > increase in error rate and a decrease in the responses received per
> > > > second.
> > > >
> > > >
> > > > One more thing to mention, we also upgraded to 0.11.0.0 client
> > libraries.
> > > > We are currently using old Producer and consumer APIs.
> > > >
> > > >
> > > >
> > > > Best Regards,
> > > > Gaurav Abbi
> > > >
> > > > On Mon, Jul 31, 2017 at 7:46 PM, Apurva Mehta 
> > > wrote:
> > > >
> > > > > How much is the increase? Is there any increase in throughput?
> > > > >
> > > > > On Mon, Jul 31, 2017 at 8:04 AM, Gaurav Abbi <
> abbi.gau...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > > We recently upgraded to Kafka 0.11.0.0 from 0.10.1.1.
> > > > > > Since then we have been observing increased latencies especially
> > > > > > OffsetCommit requests.
> > > > > > Looking at the server side metrics, it seems the culprit is the
> > > > Follower
> > > > > > time.
> > > > > >
> > > > > > We are using following
> > > > > > inter.broker.protocol.version: 0.11.0.0
> > > > > > log.message.format.version: 0.9.0.1
> > > > > >
> > > > > > Are there some possible pointers that we can explore to
> > troubleshoot
> > > > the
> > > > > > root cause?
> > > > > >
> > > > > > Best Regards,
> > > > > > Gaurav Abbi
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Confluent Kafka 3.2.2 - rebalancing not happenning

2017-08-02 Thread Ismael Juma
Hi Karan,

I noticed that you posted this to the Confluent Google Group as well. Let's
discuss it over there. This mailing list is only for Apache Kafka.

Thanks,
Ismael

On Wed, Aug 2, 2017 at 7:11 AM, karan alang  wrote:

> Hello, here is the update ..
>
> when I ran the kafka-preferred-replica-election script, it did the
> re-election as required.
>
>
> ./bin/kafka-preferred-replica-election --zookeeper localhost:3181
>
> So does that mean that I need to run the script ->
> ./bin/confluent-rebalancer
> to rebalance the data,
> but for the leader election, the script to be run is ->
> ./bin/kafka-preferred-replica-election
>
> The documentation mentions this  (link -
> http://docs.confluent.io/current/kafka/rebalancer/rebalancer.html)
>
> The confluent-rebalancer tool balances data so that the number of leaders
> > and disk usage are even across brokers and racks on a per topic and
> cluster
> > level while minimising data movement.
>
>
> Seems there is a disconnect here; please let me know if anyone has inputs.
>
>
> On Tue, Aug 1, 2017 at 4:45 PM, karan alang  wrote:
>
> > Hi All -
> > I'm trying to rebalance a Kafka topic (refer link ->
> > http://docs.confluent.io/current/kafka/rebalancer/rebalancer.html), and
> > somehow the rebalancing is not working.
> >
> >
> > Here is what i'm doing ->
> > - I've 4 Kafka brokers & I've made changes to the server.properties file
> > to enable Confluent Metrics Reporter.
> > (attached are the server.properties of the 4 brokers)
> >
> > -> Created a topic specifying Replica assignment
> >
> > ./bin/kafka-topics --create --topic topic-a1 --replica-assignment
> > 0:1,0:1,0:1,0:1 --zookeeper nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181
> >
> > -> describe topic
> >
> > ./bin/kafka-topics --describe --topic topic-a1 --zookeeper
> >> nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181
> >> Topic:topic-a1 PartitionCount:4 ReplicationFactor:2 Configs:
> >> Topic: topic-a1 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
> >> Topic: topic-a1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
> >> Topic: topic-a1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
> >> Topic: topic-a1 Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
> >>
> >
> >
> > -> Produce data into topics, using the following command
> >
> > ./bin/kafka-producer-perf-test --topic topic-a1 --num-records 20
> >> --record-size 1000 --throughput 1000 --producer-props
> bootstrap.servers=
> >> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9092,nwk2-bdp-kafka-04.gdcs-
> >> qa.apple.com:9082,nwk2-bdp-kafka-04.gdcs-qa.apple.com:9072,
> >> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9062
> >
> >
> >
> > -> Force Creation of offsets topic, by creating a Consumer (NOT SURE WHAT
> > THIS IS FOR ???) :
> >
> > ./bin/kafka-consumer-perf-test --topic topic-a1 --broker-list
> >> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9092,nwk2-bdp-kafka-
> >> 04.gdcs-qa.apple.com:9082,nwk2-bdp-kafka-04.gdcs-qa.apple.com:9072,
> >> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9062 --messages 10
> >
> >
> >
> > -> run the following command to rebalance
> >
> >>
> >> The plan that is presented does not really do any rebalancing ->
> >
> > ./bin/confluent-rebalancer execute --zookeeper nwk2-bdp-kafka-04.gdcs-qa.
> >> apple.com:3181 --metrics-bootstrap-server nwk2-bdp-kafka-04.gdcs-qa.
> >> apple.com:9092,nwk2-bdp-kafka-04.gdcs-qa.apple.com:9082,nwk2
> >> -bdp-kafka-04.gdcs-qa.apple.com:9072,nwk2-bdp-kafka-04.
> >> gdcs-qa.apple.com:9062 --throttle 1000 --verbose
> >>
> >> Computing the rebalance plan (this may take a while) ...
> >> You are about to move 0 replica(s) for 0 partitions to 0 broker(s) with
> >> total size 0 MB.
> >> The preferred leader for 2 partition(s) will be changed.
> >> In total, the assignment for 2 partitions will be changed.
> >> The minimum free volume space is set to 20.0%.
> >> The following brokers will have less than 40% of free volume space
> during
> >> the rebalance:
> >> Broker  Current Size (MB)  Size During Rebalance (MB)  Free % During Rebalance  Size After Rebalance (MB)  Free % After Rebalance
> >> 0       4,021.1            4,021.1                     14.2                     4,021.1                    14.2
> >> 1       1,240.8            1,240.8                     14.2                     1,240.8                    14.2
> >> 2       620.4              620.4                       14.2                     620.4                      14.2
> >> 3       0                  0                           14.2                     0                          14.2
> >> Min/max stats for brokers (before -> after):
> >> Type  Leader Count                Replica Count                Size (MB)
> >> Min   0 (id: 3) -> 0 (id: 3)      0 (id: 3) -> 0 (id: 3)       0 (id: 3) -> 0 (id: 3)
> >> Max   125 (id: 0) -> 123 (id: 0)  127 (id: 0) -> 127 (id: 0)   4,021.1 (id: 0) -> 4,021.1 (id: 0)
> >> No racks are defined.
> >> Broker stats (before -> after):
> >> Broker Leader 

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-02 Thread Damian Guy
Hi Jan,

Thanks for taking the time to put this together, appreciated. For the
benefit of others would you mind explaining a bit about your motivation?

Cheers,
Damian

On Wed, 2 Aug 2017 at 01:40 Jan Filipiak  wrote:

> Hi all,
>
> after some further discussions, the best thing to show my idea of how it
> should evolve would be a bigger mock/interface description.
> The goal is to reduce the store maintaining processors to only the
> Aggregators and the KTableSource, while having the KTableSource optionally
> materialized.
>
> Introducing KTable:copy() will allow users to maintain state twice if
> they really want to. KStream::join*() wasn't touched. I never personally
> used that so I didn't feel
> comfortable enough touching it. Currently still making up my mind. None
> of the suggestions made it queryable so far. Guozhang's 'Buffered' idea
> seems ideal here.
>
> Please have a look. Looking forward to your opinions.
>
> Best Jan
>
>
>
> On 21.06.2017 17:24, Eno Thereska wrote:
> > (cc’ing user-list too)
> >
> > Given that we already have StateStoreSuppliers that are configurable
> using the fluent-like API, probably it’s worth discussing the other
> examples with joins and serdes first since those have many overloads and
> are in need of some TLC.
> >
> > So following your example, I guess you’d have something like:
> > .join()
> > .withKeySerdes(…)
> > .withValueSerdes(…)
> > .withJoinType(“outer”)
> >
> > etc?
> >
> > I like the approach since it still remains declarative and it’d reduce
> the number of overloads by quite a bit.
> >
> > Eno
> >
> >> On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:
> >>
> >> Hi,
> >>
> >> I'd like to get a discussion going around some of the API choices we've
> >> made in the DSL. In particular those that relate to stateful operations
> >> (though this could expand).
> >> As it stands we lean heavily on overloaded methods in the API, i.e.,
> there
> >> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
> >> feel it is only going to get worse as we add more optional params. In
> >> particular we've had some requests to be able to turn caching off, or
> >> change log configs, on a per-operator basis (note this can be done now
> if
> >> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >>
> >> So this is a bit of an open question. How can we change the DSL
> overloads
> >> so that it flows, is simple to use and understand, and is easily
> extended
> >> in the future?
> >>
> >> One option would be to use a fluent API approach for providing the
> optional
> >> params, so something like this:
> >>
> >> groupedStream.count()
> >>.withStoreName("name")
> >>.withCachingEnabled(false)
> >>.withLoggingEnabled(config)
> >>.table()
> >>
> >>
> >>
> >> Another option would be to provide a Builder to the count method, so it
> >> would look something like this:
> >> groupedStream.count(new
> >> CountBuilder("storeName").withCachingEnabled(false).build())
> >>
> >> Another option is to say: Hey we don't need this, what are you on about!
> >>
> >> The above has focussed on state store related overloads, but the same
> ideas
> >> could  be applied to joins etc, where we presently have many join
> methods
> >> and many overloads.
> >>
> >> Anyway, i look forward to hearing your opinions.
> >>
> >> Thanks,
> >> Damian
>
>


Re: Adding new user to the broker dynamically

2017-08-02 Thread Manikumar
Looks like some config error. Can you upload initial logs for both servers?
One user is sufficient for inter-broker communication.
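
A likely candidate for the config error: the JAAS file quoted below uses the login
module class from the "plain" package, while for SCRAM the module is
org.apache.kafka.common.security.scram.ScramLoginModule. A minimal sketch of a broker
JAAS entry for SCRAM-SHA-256, with placeholder credentials:

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="admin"
        password="admin-secret";
    };

With SCRAM the user credentials live in ZooKeeper (created via kafka-configs.sh, as in
step 1 below), so the user_xxx="..." entries used by the PLAIN mechanism are not
needed.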

On Wed, Aug 2, 2017 at 11:04 AM, Alexei Levashov <
alexei.levas...@arrayent.com> wrote:

> Hello Manikumar,
>
> I appreciate your advice , thank you.
>
> I tried to use SASL_PLAINTEXT with SCRAM enabled hoping that lack of SSL
> will help debugging (will switch to SASL_SSL later).
> I have 3 brokers running on one box with different ports
> listeners = SASL_PLAINTEXT://:9092
> listeners = SASL_PLAINTEXT://:9093
> listeners = SASL_PLAINTEXT://:9094
>
> 0. Changed broker.properties
> listeners = SASL_PLAINTEXT://:9093
>
> sasl.enabled.mechanisms = [SCRAM-SHA-256]
> sasl.mechanism.inter.broker.protocol = SCRAM-SHA-256
> security.inter.broker.protocol = SASL_PLAINTEXT
>
> 1.created admin user for the brokers
>  bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config
> 'SCRAM-SHA-256=password=admin-secret,SCRAM-SHA-512=password=admin-secret'
> --entity-type users --entity-name admin
>
> 2.created jaas.conf file in config dir :config/kafka_server_jaas.conf
>
>  KafkaServer {
> org.apache.kafka.common.security.plain.ScramLoginModule required
> username="admin"
> password="admin-secret"
> user_admin="admin-secret"
> user_alice="alice-secret";
> };
>
> 3. Added export
> KAFKA_OPTS="-Djava.security.auth.login.config=config/
> kafka_server_jaas.conf"
>
> But I can start only one broker; the moment I start the second broker I am
> getting exceptions like these:
>
> [2017-08-02 04:30:36,733] DEBUG [Replica Manager on Broker 0]: Recording
> follower broker 1 log read results:
> ArrayBuffer((TNT_GRP_subgroup_getAttributeList_ACK-1,Fetch Data:
> [FetchDataInfo(0 [0 : 0],[],false,None)], HW: [0], leaderLogStartOffset:
> [0], leaderLogEndOffset: [0], followerLogStartOffset: [0], fetchTimeMs:
> [1501648236733], readSize: [1048576], error: [NONE]))
>  (kafka.server.ReplicaManager)
>
> [2017-08-02 04:30:36,803] DEBUG Accepted connection from /:58816 on
> /:9093 and assigned it to processor 2, sendBufferSize
> [actual|requested]: [102400|102400] recvBufferSize [actual|requested]:
> [102400|102400] (kafka.network.Acceptor)
> [2017-08-02 04:30:36,803] DEBUG Processor 2 listening to new connection
> from /:58816 (kafka.network.Processor)
> [2017-08-02 04:30:36,803] DEBUG Set SASL server state to HANDSHAKE_REQUEST
> (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
> [2017-08-02 04:30:36,803] DEBUG Handle Kafka request METADATA
> (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
> [2017-08-02 04:30:36,803] DEBUG Set SASL server state to FAILED
> (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
> [2017-08-02 04:30:36,803] DEBUG Connection with / disconnected
> (org.apache.kafka.common.network.Selector)
> java.io.IOException:
> org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka
> request of type METADATA during SASL handshake.
> at
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.
> authenticate(SaslServerAuthenticator.java:247)
> at
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:76)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.
> java:374)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
> at kafka.network.Processor.poll(SocketServer.scala:499)
> at kafka.network.Processor.run(SocketServer.scala:435)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.IllegalSaslStateException:
> Unexpected Kafka request of type METADATA during SASL handshake.
> [2017-08-02 04:30:36,905] DEBUG Accepted connection from /:58823 on
> /:9093 and assigned it to processor 0, sendBufferSize
> [actual|requested]: [102400|102400] recvBufferSize [actual|requested]:
> [102400|102400] (kafka.network.Acceptor)
> [2017-08-02 04:30:36,905] DEBUG Processor 0 listening to new connection
> from /:58823 (kafka.network.Processor)
> [2017-08-02 04:30:36,905] DEBUG Set SASL server state to HANDSHAKE_REQUEST
> (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
> [2017-08-02 04:30:36,905] DEBUG Handle Kafka request METADATA
> (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
> [2017-08-02 04:30:36,905] DEBUG Set SASL server state to FAILED
> (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
> [2017-08-02 04:30:36,905] DEBUG Connection with / disconnected
> (org.apache.kafka.common.network.Selector)
> java.io.IOException:
> org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka
> request of type METADATA during SASL handshake.
>
> Adding separate jaas.conf files for each broker with different users didn't
> change anything.
>
> Question - should each broker use a separate user for inter-broker
> communication? Or is the reason for the exceptions that the brokers are set
> up on one IP?
> Any hints would be highly 

Re: Confluent Kafka 3.2.2 - rebalancing not happenning

2017-08-02 Thread karan alang
Hello, here is the update ..

when I ran the kafka-preferred-replica-election script, it did the
re-election as required.


./bin/kafka-preferred-replica-election --zookeeper localhost:3181

So does that mean that I need to run the script -> ./bin/confluent-rebalancer
to rebalance the data,
but for the leader election, the script to be run is ->
./bin/kafka-preferred-replica-election

The documentation mentions this  (link -
http://docs.confluent.io/current/kafka/rebalancer/rebalancer.html)

The confluent-rebalancer tool balances data so that the number of leaders
> and disk usage are even across brokers and racks on a per topic and cluster
> level while minimising data movement.


Seems there is a disconnect here; please let me know if anyone has inputs.


On Tue, Aug 1, 2017 at 4:45 PM, karan alang  wrote:

> Hi All -
> I'm trying to rebalance a Kafka topic (refer link ->
> http://docs.confluent.io/current/kafka/rebalancer/rebalancer.html), and
> somehow the rebalancing is not working.
>
>
> Here is what i'm doing ->
> - I've 4 Kafka brokers & I've made changes to the server.properties file
> to enable Confluent Metrics Reporter.
> (attached are the server.properties of the 4 brokers)
>
> -> Created a topic specifying Replica assignment
>
> ./bin/kafka-topics --create --topic topic-a1 --replica-assignment
> 0:1,0:1,0:1,0:1 --zookeeper nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181
>
> -> describe topic
>
> ./bin/kafka-topics --describe --topic topic-a1 --zookeeper
>> nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181
>> Topic:topic-a1 PartitionCount:4 ReplicationFactor:2 Configs:
>> Topic: topic-a1 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
>> Topic: topic-a1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
>> Topic: topic-a1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
>> Topic: topic-a1 Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
>>
>
>
> -> Produce data into topics, using the following command
>
> ./bin/kafka-producer-perf-test --topic topic-a1 --num-records 20
>> --record-size 1000 --throughput 1000 --producer-props bootstrap.servers=
>> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9092,nwk2-bdp-kafka-04.gdcs-
>> qa.apple.com:9082,nwk2-bdp-kafka-04.gdcs-qa.apple.com:9072,
>> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9062
>
>
>
> -> Force Creation of offsets topic, by creating a Consumer (NOT SURE WHAT
> THIS IS FOR ???) :
>
> ./bin/kafka-consumer-perf-test --topic topic-a1 --broker-list
>> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9092,nwk2-bdp-kafka-
>> 04.gdcs-qa.apple.com:9082,nwk2-bdp-kafka-04.gdcs-qa.apple.com:9072,
>> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9062 --messages 10
>
>
>
> -> run the following command to rebalance
>
>>
>> The plan that is presented does not really do any rebalancing ->
>
> ./bin/confluent-rebalancer execute --zookeeper nwk2-bdp-kafka-04.gdcs-qa.
>> apple.com:3181 --metrics-bootstrap-server nwk2-bdp-kafka-04.gdcs-qa.
>> apple.com:9092,nwk2-bdp-kafka-04.gdcs-qa.apple.com:9082,nwk2
>> -bdp-kafka-04.gdcs-qa.apple.com:9072,nwk2-bdp-kafka-04.
>> gdcs-qa.apple.com:9062 --throttle 1000 --verbose
>>
>> Computing the rebalance plan (this may take a while) ...
>> You are about to move 0 replica(s) for 0 partitions to 0 broker(s) with
>> total size 0 MB.
>> The preferred leader for 2 partition(s) will be changed.
>> In total, the assignment for 2 partitions will be changed.
>> The minimum free volume space is set to 20.0%.
>> The following brokers will have less than 40% of free volume space during
>> the rebalance:
>> Broker  Current Size (MB)  Size During Rebalance (MB)  Free % During Rebalance  Size After Rebalance (MB)  Free % After Rebalance
>> 0       4,021.1            4,021.1                     14.2                     4,021.1                    14.2
>> 1       1,240.8            1,240.8                     14.2                     1,240.8                    14.2
>> 2       620.4              620.4                       14.2                     620.4                      14.2
>> 3       0                  0                           14.2                     0                          14.2
>> Min/max stats for brokers (before -> after):
>> Type  Leader Count                Replica Count                Size (MB)
>> Min   0 (id: 3) -> 0 (id: 3)      0 (id: 3) -> 0 (id: 3)       0 (id: 3) -> 0 (id: 3)
>> Max   125 (id: 0) -> 123 (id: 0)  127 (id: 0) -> 127 (id: 0)   4,021.1 (id: 0) -> 4,021.1 (id: 0)
>> No racks are defined.
>> Broker stats (before -> after):
>> Broker  Leader Count  Replica Count  Size (MB)            Free Space (%)
>> 0       125 -> 123    127 -> 127     4,021.1 -> 4,021.1   14.2 -> 14.2
>> 1       3 -> 5        12 -> 12       1,240.8 -> 1,240.8   14.2 -> 14.2
>> 2       2 -> 2        3 -> 3         620.4 -> 620.4       14.2 -> 14.2
>> 3       0 -> 0        0 -> 0         0 -> 0               14.2 -> 14.2
>> Would you like to continue? (y/n): y
>> The rebalance has been