Re: Producer connect timeouts

2016-12-19 Thread Ewen Cheslack-Postava
Yes, this is something that we could consider fixing in Kafka itself.
Pretty much all timeouts can be customized if the OS/network defaults are
larger than makes sense for the system. And given the large default values
for some of these timeouts, we probably don't want to rely on the defaults.
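
Not from the thread, but for reference, a minimal sketch of the producer-side
settings being coordinated here (broker hostname and values are illustrative
assumptions, not recommendations); as noted above, none of these shorten the
OS-level TCP connect timeout:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1.example.com:9092");  // hypothetical broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("request.timeout.ms", "30000");   // bounds in-flight requests, not TCP connect
        props.put("retries", "3");                  // retries for retriable send errors
        props.put("max.block.ms", "60000");         // bounds how long send()/metadata fetches may block
        props.put("metadata.max.age.ms", "60000");  // periodic metadata refresh (the workaround discussed below)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce as usual
        }
    }
}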

-Ewen

On Mon, Dec 19, 2016 at 8:23 AM, Luke Steensen  wrote:

> Makes sense, thanks Ewen.
>
> Is this something we could consider fixing in Kafka itself? I don't think
> the producer is necessarily doing anything wrong, but the end result is
> certainly very surprising behavior. It would also be nice not to have to
> coordinate request timeouts, retries, and the max block configuration with
> system-level configs.
>
>
> On Sat, Dec 17, 2016 at 6:55 PM, Ewen Cheslack-Postava 
> wrote:
>
> > Without having dug back into the code to check, this sounds right.
> > Connection management just fires off a request to connect and then
> > subsequent poll() calls will handle any successful/failed connections.
> The
> > timeouts wrt requests are handled somewhat differently (the connection
> > request isn't explicitly tied to the request that triggered it, so when
> the
> > latter times out, we don't follow up and timeout the connection request
> > either).
> >
> > So yes, you currently will have connection requests tied to your
> underlying
> > TCP timeout request. This tends to be much more of a problem in public
> > clouds where the handshake request will be silently dropped due to
> firewall
> > rules.
> >
> > The metadata.max.age.ms is a workable solution, but agreed that it's not
> > great. If possible, reducing the default TCP connection timeout isn't
> > unreasonable either -- the defaults are set for WAN connections (and
> > arguably set for WAN connections of long ago), so much more aggressive
> > timeouts are reasonable for Kafka clusters.
> >
> > -Ewen
> >
> > On Fri, Dec 16, 2016 at 1:41 PM, Luke Steensen  > braintreepayments.com> wrote:
> >
> > > Hello,
> > >
> > > Is it correct that producers do not fail new connection establishment
> > when
> > > it exceeds the request timeout?
> > >
> > > Running on AWS, we've encountered a problem where certain very low
> volume
> > > producers end up with metadata that's sufficiently stale that they
> > attempt
> > > to establish a connection to a broker instance that has already been
> > > terminated as part of a maintenance operation. I would expect this to
> > fail
> > > and be retried normally, but it appears to hang until the system-level
> > TCP
> > > connection timeout is reached (2-3 minutes), with the writes themselves
> > > being expired before even a single attempt is made to send them.
> > >
> > > We've worked around the issue by setting `metadata.max.age.ms`
> extremely
> > > low, such that these producers are requesting new metadata much faster
> > than
> > > our maintenance operations are terminating instances. While this does
> > work,
> > > it seems like an unfortunate workaround for some very surprising
> > behavior.
> > >
> > > Thanks,
> > > Luke
> > >
> >
>


Re: TLS

2016-12-19 Thread Ewen Cheslack-Postava
Ruben,

There are step-by-step instructions explained here:
http://docs.confluent.io/3.1.1/kafka/security.html
For the purposes of configuring Kafka, the JAAS details basically boil down
to security settings listed in a JAAS configuration file that you pass to the
JVM.
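
As a rough illustration (not from the thread), a broker-side JAAS file often
looks like the sketch below. The section names ("KafkaServer", "Client") are
fixed by Kafka/ZooKeeper, but the login modules and credentials shown are
placeholder assumptions that depend on which SASL mechanism you enable; the
file is supplied to the broker JVM via
-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf.

KafkaServer {
    // broker's own credentials plus the users it accepts (PLAIN shown only as an example)
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="broker"
    password="broker-secret"
    user_broker="broker-secret"
    user_client1="client1-secret";
};

// used by the broker's ZooKeeper client when ZooKeeper SASL authentication is enabled
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="kafka-secret";
};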

-Ewen

On Mon, Dec 19, 2016 at 8:40 AM, Ruben Poveda Teba  wrote:

> Hello,
>
> I'm trying to set up SSL/TLS between Kafka-Zookeeper-Kafka, but in the
> documentation you explain that the security must be set up via JAAS. What is
> JAAS and how do I configure it?
>
> Best regards,
>
>
> Rubén Poveda Teba
> Security Infrastructures
> rpov...@sia.es
>


Re: [VOTE] 0.10.1.1 RC1

2016-12-19 Thread Vahid S Hashemian
Hi Guozhang,

I also verified the quickstart on Ubuntu and Mac. +1 on those.

On Windows OS there are a couple of issues for which the following PRs 
exist:
- https://github.com/apache/kafka/pull/2146 (already merged to trunk)
- https://github.com/apache/kafka/pull/2238 (open)

These issues are not specific to this RC, so the fixes can be included in a 
future release.

Thanks again for running the release.

Regards.
--Vahid




From:   Jun Rao 
To: "users@kafka.apache.org" , 
"d...@kafka.apache.org" 
Date:   12/19/2016 02:47 PM
Subject:Re: [VOTE] 0.10.1.1 RC1



Hi, Guozhang,

Thanks for preparing the release. Verified quickstart. +1

Jun

On Thu, Dec 15, 2016 at 1:29 PM, Guozhang Wang  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second, and hopefully the last candidate for the release of
> Apache Kafka 0.10.1.1 before the break. This is a bug fix release and it
> includes fixes and improvements from 30 JIRAs. See the release notes for
> more details:
>
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Tuesday, 20 December, 8pm PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> NOTE the artifacts include the ones built from Scala 2.12.1 and Java8,
> which are treated a pre-alpha artifacts for the Scala community to try 
and
> test it out:
>
> https://repository.apache.org/content/groups/staging/org/
> apache/kafka/kafka_2.12/0.10.1.1/
>
> We will formally add the scala 2.12 support in future minor releases.
>
>
> * Javadoc:
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> c3638376708ee6c02dfe4e57747acae0126fa6e7
>
>
> Thanks,
> Guozhang
>
> --
> -- Guozhang
>






Re: WARN [ReplicaFetcherThread-1-1011], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5dad549c

2016-12-19 Thread Tony Liu
After further debugging, I found the following error on the affected broker
node (1011).

The error:
[2016-12-19 21:07:38,081] ERROR Error while accepting connection
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:326)
at kafka.network.Acceptor.run(SocketServer.scala:269)
at java.lang.Thread.run(Thread.java:745)
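
Not part of the original report, but a quick way to check whether the broker
process is actually close to its file-descriptor limit on Linux (the process
lookup below is an assumption about how the broker was started):

BROKER_PID=$(pgrep -f kafka.Kafka | head -1)
grep "open files" /proc/$BROKER_PID/limits    # configured soft/hard limit
ls /proc/$BROKER_PID/fd | wc -l               # descriptors currently in use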

On Mon, Dec 19, 2016 at 4:33 PM, Tony Liu  wrote:

> ​Hi Experts,
>
> Has anyone run into this connection error?
>
> [2016-12-17 20:13:32,728] WARN  [ReplicaFetcherThread-1-1011], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5dad549c 
> (kafka.server.ReplicaFetcherThread)java.io.IOException: Connection to 
> 10.137.126.113:9092 (id: 1011 rack: null) failed
>   at 
> kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:83)
>   at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:93)
>   at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:248)
>   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
>
>
> By the way, I checked the source code, and the comment there says there are
> lots of reasons that can cause this error. I am not clear about what the
> 'lots of reasons' are, so I am not sure how to investigate further.
>
> /**
>  * Invokes `client.send` followed by 1 or more `client.poll` invocations 
> until a response is received or a
>  * disconnection happens (which can happen for a number of reasons including 
> a request timeout).
>  *
>  * In case of a disconnection, an `IOException` is thrown.
>  *
>  * This method is useful for implementing blocking behaviour on top of the 
> non-blocking `NetworkClient`, use it with
>  * care.
>  */
> def blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): 
> ClientResponse = {
>   client.send(request, time.milliseconds())
>
>   pollContinuously { responses =>
> val response = responses.find { response =>
>   response.request.request.header.correlationId == 
> request.request.header.correlationId
> }
> response.foreach { r =>
>   if (r.wasDisconnected) {
> val destination = request.request.destination
> throw new IOException(s"Connection to $destination was disconnected 
> before the response was read")
>   }
> }
> response
>   }
>
> }
>
>


Zookeeper configuration with ACL using SSL

2016-12-19 Thread Raghu B
Hi All,


I am trying to enable ACLs with the SSL protocol, and it is giving me the
exceptions below (LEADER_NOT_AVAILABLE):

*[2016-12-19 16:16:47,078] WARN Error while fetching metadata with
correlation id 16 : {my-ssl-topic4=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)*

*[2016-12-19 16:16:47,231] WARN Error while fetching metadata with
correlation id 17 : {my-ssl-topic4=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)*


I did most of the debugging but had no luck. I am able to send and receive
messages using a super user, i.e. when I configure the following in the
server.properties file:


super.users=User:"CN=Unknown,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"


but if I grant specific ACLs from the command line, it is not working, e.g.:

Eg: bin/kafka-acls.sh --authorizer-properties
zookeeper.connect=localhost:2181 --allow-principal User:ANONYMOUS
--allow-host 172.28.91.4 --consumer --topic my-ssl-topic4 --add --group
group4


The permissions were granted correctly, and in kafka-authorizer.log I can see
them:


[2016-12-20 00:16:47,521] DEBUG Principal = User:ANONYMOUS is Allowed
Operation = Describe from host = 172.28.91.4 on resource =
Topic:my-ssl-topic4 (kafka.authorizer.logger)

[2016-12-20 00:16:47,522] DEBUG operation = Create on resource =
Cluster:kafka-cluster from host = 172.28.91.4 is Allow based on acl =
User:ANONYMOUS has Allow permission for operations: All from hosts:
172.28.91.4 (kafka.authorizer.logger)

[2016-12-20 00:16:47,522] DEBUG Principal = User:ANONYMOUS is Allowed
Operation = Create from host = 172.28.91.4 on resource =
Cluster:kafka-cluster (kafka.authorizer.logger)


Do I need to configure any JAAS files for Zookeeper?

Looks like I am missing something between *Kafka & Zookeeper*.

Please suggest how I can enable ACLs with SSL.


Thanks in advance.


WARN [ReplicaFetcherThread-1-1011], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5dad549c

2016-12-19 Thread Tony Liu
​Hi Experts,

Has anyone run into this connection error?

[2016-12-17 20:13:32,728] WARN  [ReplicaFetcherThread-1-1011], Error
in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5dad549c
(kafka.server.ReplicaFetcherThread)java.io.IOException: Connection to
10.137.126.113:9092 (id: 1011 rack: null) failed
at 
kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:83)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:93)
at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:248)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)



By the way, I checked the source code, and the comment there says there are
lots of reasons that can cause this error. I am not clear about what the
'lots of reasons' are, so I am not sure how to investigate further.

/**
 * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
 * disconnection happens (which can happen for a number of reasons including a request timeout).
 *
 * In case of a disconnection, an `IOException` is thrown.
 *
 * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
 * care.
 */
def blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): ClientResponse = {
  client.send(request, time.milliseconds())

  pollContinuously { responses =>
    val response = responses.find { response =>
      response.request.request.header.correlationId == request.request.header.correlationId
    }
    response.foreach { r =>
      if (r.wasDisconnected) {
        val destination = request.request.destination
        throw new IOException(s"Connection to $destination was disconnected before the response was read")
      }
    }
    response
  }

}


Re: Kafka SSL encryption plus external CA

2016-12-19 Thread Stephane Maarek
Thanks Rajini!

Also, I currently have each broker advertising as broker1.mydomain.com,
broker2.mydomain.com, ... broker6.mydomain.com, etc.
I have set up round-robin CNAME records to group brokers by availability
zone, i.e. broker-a.mydomain.com, broker-b.mydomain.com,
broker-c.mydomain.com. I use them as the bootstrap servers so that I get
high resiliency and don't need to change the client code if I add, remove,
or change brokers.

Do I need the bootstrap servers to match the wildcard of the certificate,
or is the SSL verification happening after we get the advertised hostnames
from the brokers?

Kind regards,
Stephane


*Stephane Maarek* | Developer

+61 416 575 980
steph...@simplemachines.com.au
simplemachines.com.au
Level 2, 145 William Street, Sydney NSW 2010

On 20 December 2016 at 4:27:28 am, Rajini Sivaram (rajinisiva...@gmail.com)
wrote:

Stephane,

If you are using a trusted CA like Verisign, clients don't need to specify
a truststore. The host names specified in advertised.listeners in the
broker must match the wildcard DNS names in the certificates if clients
configure ssl.endpoint.identification.algorithm=https. If
ssl.endpoint.identification.algorithm is not specified, by default hostname
is not validated. It should be set to https however to prevent
man-in-the-middle attacks. There is an open JIRA to make this the default
in Kafka.

It makes sense to enable SSL in dev and prod to ensure that the code path
being run in dev is the same as in prod.



On Mon, Dec 19, 2016 at 3:50 AM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Hi,
>
> I have read the docs extensively but yet there are a few answers I can’t
> find. It has to do with external CA
> Please confirm my understanding if possible:
>
> I can create my own CA to sign all the brokers and clients certificates.
> Pros:
> - cheap, easy, automated. I need to find a way to access that CA
> programatically for new brokers if I want to automated their deployment,
> but I could use something like credstash or vault for that.
> Cons:
> - all of my clients needs to trust the CA. That means somehow find a way
> for my clients to get access to the CA using ca-cert and add it to their
> truststore… correct?
>
> I don’t really like the fact that I need to provide the CA cert file to
> every client. That seems quite hard to achieve, and prevents my users
from
> using the Kafka cluster directly. What’s the best way for the Kafka
clients
> to get access to the CA, while my users are doing dev, etc? Most of our
> applications run in Docker, which means we usually pass stuff around
using
> environment variables.
>
>
> My next idea was to use an external CA (like Verisign) to sign my
> certificate with a wildcard *.kafka.mydomain.com (A records pointing to
> internal IPs - the DNS name would be the advertised kafka hostname). My
> goal was then for the clients not to require to trust the CA because it
> would be automatically trusted? Do I have the correct understanding? Or
do
> I still need to add the external CA to the truststore of my clients?
> (basically I’m trying to reproduce the behaviour of what a web browser
> does).
>
>
> Finally, is it recommended to enable SSL in my dev Kafka cluster vs my
prod
> Kafka cluster, or to have SSL on each cluster?
>
> Thanks!
>
> Kind regards,
> Stephane
>



-- 
Regards,

Rajini


Re: Website Update, Part 2

2016-12-19 Thread Guozhang Wang
Hi Gwen,

I am about to have a separate side bar for the subsections, but a
site-level search bar is also a good idea. Just filed a JIRA to keep track
of this.



Guozhang


On Tue, Dec 13, 2016 at 6:16 PM, Gwen Shapira  wrote:

> Hi,
>
> Since we are breaking down the docs, we can no longer use ctrl-f to find
> where to find specific things we are looking for... maybe it is time to add
> a site search bar? I think google has something we can embed.
>
> On Tue, Dec 13, 2016 at 6:12 PM, Guozhang Wang  wrote:
>
> > Folks,
> >
> > We are continuing to improve our website, and one of it is to break the
> > single gigantic "documentation" page:
> >
> > https://kafka.apache.org/documentation/
> >
> > into sub-spaces and sub-pages for better visibility. As the first step of
> > this effort, we will be gradually extract each section of this page into
> a
> > separate page and then grow each one of them in their own sub-space.
> >
> > As of now, we have extract Streams section out of documentation as
> >
> > https://kafka.apache.org/documentation/streams
> >
> > while all the existing hashtags are preserved and re-directed via JS (many
> > thanks to Derrick!) so that we do not lose any SEO. At the same time I
> > have updated the "website doc contributions" wiki a bit with guidance on
> > locally displaying and debugging doc changes with this refactoring:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+
> > Documentation+Changes
> >
> >
> > We are trying to do the same for Connect, Ops, Configs, APIs etc in the
> > near future. Any comments, improvements, and contributions are welcome
> and
> > encouraged.
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter  | blog
> 
>



-- 
-- Guozhang


Re: [VOTE] 0.10.1.1 RC1

2016-12-19 Thread Jun Rao
Hi, Guozhang,

Thanks for preparing the release. Verified quickstart. +1

Jun

On Thu, Dec 15, 2016 at 1:29 PM, Guozhang Wang  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second, and hopefully the last candidate for the release of
> Apache Kafka 0.10.1.1 before the break. This is a bug fix release and it
> includes fixes and improvements from 30 JIRAs. See the release notes for
> more details:
>
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Tuesday, 20 December, 8pm PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> NOTE the artifacts include the ones built from Scala 2.12.1 and Java8,
> which are treated a pre-alpha artifacts for the Scala community to try and
> test it out:
>
> https://repository.apache.org/content/groups/staging/org/
> apache/kafka/kafka_2.12/0.10.1.1/
>
> We will formally add the scala 2.12 support in future minor releases.
>
>
> * Javadoc:
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> c3638376708ee6c02dfe4e57747acae0126fa6e7
>
>
> Thanks,
> Guozhang
>
> --
> -- Guozhang
>


Does kafka support reloading TLS certificates without downtime?

2016-12-19 Thread F21
I want to use Vault[1] to manage my TLS certificates. The certificates 
would be issued by Vault and have a short lifetime of around 72 hours. 
There would be a co-process to request a new certificate before expiry.


Does kafka provide any method to reload the TLS certificate without any 
down time?


Thanks,

Francis

[1]: https://www.vaultproject.io/



Re: KafkaStreams StateStore as EventStore (Event Sourcing)

2016-12-19 Thread Guozhang Wang
Hello Anatoly,

Jay's understanding about the time-space tradeoff in state store HA is
correct. As for your second question on "best practices": the Streams
high-level DSL has a notion of KTable in addition to its first-class
KStream objects; a KTable can be viewed as a materialized view of the
stream, optionally backed by a state store. I think this is a natural fit
for event store use cases: assuming your change events are stored in a
Kafka topic "topic1", you can then create a materialized view in your
Streams app as:


KTable table1 = topologyBuilder.table("topic1", "store1");


Or you can construct an aggregated materialized view from the events of
streams (say in another topic "topic2") as:


KTable table2 =
topologyBuilder.stream("topic2").groupBy(...).aggregate(..., "store2");


Then the backing state stores "store1" and "store2" can be viewed as the
current snapshots of the materialized view of the change events, and users
can query them just as they would query a normal store engine:

https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
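
To make that last point concrete, querying such a store from within the same
application looks roughly like the sketch below (interactive queries,
available since 0.10.1; the store name, String key/value types, and the
`streams` variable are assumptions for illustration):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch: look up the current snapshot for one entity from the running
// KafkaStreams instance that materializes "store1".
static String currentSnapshot(KafkaStreams streams, String entityId) {
    ReadOnlyKeyValueStore<String, String> store =
        streams.store("store1", QueryableStoreTypes.<String, String>keyValueStore());
    return store.get(entityId);
}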


Guozhang

On Fri, Dec 16, 2016 at 11:27 AM, Jay Kreps  wrote:

> Good question! Here's my understanding.
>
> The streams API has a config num.standby.replicas. If this value is set to
> 0, the default, then the local state will have to be recreated by
> re-reading the relevant Kafka partition and replaying that into the state
> store, and as you point out this will take time proportional to the amount
> of data. If you set this value to something more than 0, then a "standby
> task" will be kept on one of the other instances. This standby won't do any
> processing it will just passively replicate the state changes of the
> primary task; in the event of a failure this standby task will be able to
> take over very quickly because it already has the full state pre-created.
>
> So you have a choice of redundancy in either "time" (by replaying data) or
> "space" (by storing multiple copies).
>
> (Hopefully that's correct, I don't have the firmest grasp on how the
> standby tasks work.)
>
> -Jay
>
> On Thu, Dec 15, 2016 at 6:10 PM, Anatoly Pulyaevskiy <
> anatoly.pulyaevs...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > I've been reading a lot about new features in Kafka Streams and
> everything
> > looks very promising. There is even an article on Kafka and Event
> Sourcing:
> > https://www.confluent.io/blog/event-sourcing-cqrs-stream-
> > processing-apache-kafka-whats-connection/
> >
> > There are a couple of things that I'm concerned about though. For Event
> > Sourcing it is assumed that there is a way to fetch all events for a
> > particular object and replay them in order to get "latest snapshot" of
> that
> > object.
> >
> > It seems like (and the article says so) that StateStore in KafkaStreams
> can
> > be used to achieve that.
> >
> > My first question is would it scale well for millions of objects?
> > I understand that StateStore is backed by a compacted Kafka topic so in
> an
> > event of failure KafkaStreams will recover to the latest state by reading
> > all messages from that topic. But my suspicion is that for millions of
> > objects this may take a while (it would need to read the whole partition
> > for each object), is this a correct assumption?
> >
> > My second question is would it make more sense to use an external DB in
> > such case or is there a "best practice" around implementing Event
> Sourcing
> > and using Kafka's internal StateStore as EventStore?
> >
> > Thanks,
> > Anatoly
> >
>



-- 
-- Guozhang


Re: [VOTE] 0.10.1.1 RC1

2016-12-19 Thread Gwen Shapira
+1 (binding)

Validated signatures
Ran tests
Built from source distro
Tested binaries using the quickstart guide

Gwen

On Thu, Dec 15, 2016 at 1:29 PM, Guozhang Wang  wrote:
> Hello Kafka users, developers and client-developers,
>
> This is the second, and hopefully the last candidate for the release of
> Apache Kafka 0.10.1.1 before the break. This is a bug fix release and it
> includes fixes and improvements from 30 JIRAs. See the release notes for
> more details:
>
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Tuesday, 20 December, 8pm PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> NOTE the artifacts include the ones built from Scala 2.12.1 and Java8,
> which are treated a pre-alpha artifacts for the Scala community to try and
> test it out:
>
> https://repository.apache.org/content/groups/staging/org/apache/kafka/kafka_2.12/0.10.1.1/
>
> We will formally add the scala 2.12 support in future minor releases.
>
>
> * Javadoc:
> http://home.apache.org/~guozhang/kafka-0.10.1.1-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c3638376708ee6c02dfe4e57747acae0126fa6e7
>
>
> Thanks,
> Guozhang
>
> --
> -- Guozhang



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: "support" topics for KStreams

2016-12-19 Thread Guozhang Wang
For changelog topics, the retention policy is log compaction by default, and
users can customize it if necessary when creating the state store (this is in
trunk, not in released versions yet).

For repartition topics, the retention policy is whatever the broker's default
retention policy is.

I suspect your main issue of relentless rebalancing is due to state stores
being restored when one or more instances are rolling-bounced, which requires
replaying the state store changelog topic from scratch. So if your changelog
topic is very long, it will take a very long time to restore.

Guozhang

On Thu, Dec 15, 2016 at 12:54 PM, Jon Yeargers 
wrote:

> What's the retention settings for these (-changelog and
> -replication)? Im wondering about the relentless rebalancing issues Im
> facing and wondering if it has anything to do with consumers that lag too
> far behind.
>
> If I delete all the topics associated with a KStream project and restart it
> there are no rebalance issues. Everything is fast and responsive.
>
> Over the course of 6-10 hours of execution the rebalances take longer and
> longer until eventually the app(s) stop responding at all.
>
> Just curious.
>



-- 
-- Guozhang


Re: Consumers in the same group but consuming different topics?

2016-12-19 Thread Guozhang Wang
Hello Avi,

Similar questions have been discussed previously:

https://issues.apache.org/jira/browse/KAFKA-3775

One concern with doing this, though, is that if you want to make the change
on the fly by rolling-bouncing the instances to switch back the assignor
class (i.e. combining steps 5, 6 and 7), then there will be a period of time
where some instances are using the old assignor while others are using the
new one, hence undefined behavior.

But since you are doing steps 5, 6 and 7 consecutively I think it should be fine.


BTW regarding your first email, it actually is OK to have consumers in the
same group subscribing to different topics (e.g. consumerA subscribing to
topics 1, 2, 3 and consumerB subscribing to topics 4, 5, 6), and the topic
partitions will then be assigned correspondingly if you are using the
consumer's default assignor; however, the Streams partition assignor does
not do this yet.
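
A minimal sketch of what is being described with the plain consumer (topic,
group, and host names below are made up for illustration):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Two consumers share group.id "g1" but subscribe to different topics; with the
// consumer's default assignor each one is assigned partitions only from the
// topics it subscribed to.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "g1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(props);
consumerA.subscribe(Arrays.asList("topic1", "topic2", "topic3"));

KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(props);
consumerB.subscribe(Arrays.asList("topic4", "topic5", "topic6"));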


Guozhang

On Thu, Dec 15, 2016 at 10:28 AM, Avi Flax  wrote:

>
> > On Dec 15, 2016, at 11:33, Damian Guy  wrote:
> >
> > Technically you can, but not without writing some code. If you want to
> use
> > consumer groups then you would need to write a custom PartitionAssignor
> and
> > configure it in your Consumer Config, like so:
> > consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
> > YourPartitionAssignor.class.getName());
> >
> > Alternatively you could use manual partition assignment to temporarily
> work
> > around it.
>
> Thanks Damian! Very helpful!
>
> As an alternative, would there be any adverse effects of changing the
> config of a Kafka Streams app to temporarily (say, for 30 minutes) consume
> from a smaller set of topics and then change it right back?
>
> To be clear, the workflow I have in mind is:
>
> 1. Stop app (gracefully)
> 2. Change config to have the app consume from a subset of its current
> topics
> 3. Start app
> 4. Wait ~30 minutes
> 5. Stop app (gracefully)
> 6. Revert config to the full set of topics
> 6. Start app
>
> Would there be any adverse effects, like lost local state, lost offsets,
> etc?
>
> It may be relevant that my app currently creates an instance of
> KafkaStreams for each topic it consumes from.
>
> Thanks!
> Avi
>
> 
> Software Architect @ Park Assist
> We’re hiring! http://tech.parkassist.com/jobs/
>



-- 
-- Guozhang


Re: RocksDB - no locks available exception

2016-12-19 Thread Guozhang Wang
Jon,


Re: "RocksDBException: IO error ... No locks available". We have nailed a
couple of rebalance issues that could cause the rocksDB lock file to be
released not in time recently in trunk, and by checking from your logs I
suspect you were hitting one of these issues. Could you try out the current
trunk in Kafka and see if they can still be re-produced?

Re: "Log end offset should not change while restoring". Unfortunately the
stack trace does not reveal which state store's restoration has this issue,
could you apply this patch and rerun your application (from trunk + this
patch) and send me the stack trace if you encounter this issue again?

https://github.com/apache/kafka/pull/2276

Guozhang

On Thu, Dec 15, 2016 at 6:54 AM, Jon Yeargers 
wrote:

> Attached is a debug log showing this exception.
>
> Question: is it typical to have so many disconnections from brokers?
>
> This log also includes the exception "Log end offset should not change
> while restoring"
>
>
>


-- 
-- Guozhang


Re: Another odd error

2016-12-19 Thread Guozhang Wang
Hi Jon,

When you "copied a new build up to one machine" did you mean that you swipe
in the new jar, and then bounce the instance?

Kafka Streams should naturally support online upgrading by simply rolling
bounce your instances, so I would not expect the scenarios you described to
happen. However, note that if you have lots of state stores it indeed will
cause them to be migrated and restored (i.e. replaying the whole changelog
to get the up-to-date state) on other instances when the current instance
is being bounced and then migrated back, which makes the rebalance itself
very long time.

To validate if the state restoration is taking most of the time in
rebalancing, you can turn on TRACE level logging and see if the restore
consumer is keep fetching records from the changelog topics, as in

"Returning fetched records at offset XXX for assigned partition
YYY-changelog and update position to ZZZ"
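
For reference, one way to get that TRACE output is through the Streams
application's log4j configuration, e.g. something like the line below (the
logger name is an assumption based on the client package that emits this
message):

log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=TRACE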



Guozhang


On Thu, Dec 15, 2016 at 3:54 AM, Jon Yeargers 
wrote:

> Update: the app ran well for several hours.. until I tried to update it. I
> copied a new build up to one machine (of five) and then we went back to
> near-endless-rebalance. After about an hour I ended up killing the other
> four instances and watching the first (new one). It took 90 minutes before
> it started consuming anything.
>
> This morning I copied / started two more. 60 minutes and still waiting for
> rebalance to conclude.
>
> Obviously its impractical to delete the topic(s) before updating the
> consumer software. What am I doing wrong thats causing all this waiting?
>
> On Wed, Dec 14, 2016 at 9:28 AM, Jon Yeargers 
> wrote:
>
> > In a turn of events - this morning I was about to throw in the proverbial
> > towel on Kafka. In a last ditch effort I killed all but one instance of
> my
> > app, put it back to a single thread (why offer the option if it's not
> > advised?) and deleted every last topic that had any relation to this app.
> >
> > I restarted it on a single machine and it magically worked. It's been
> > running for more than an hour now and hasn't been stuck in
> 'rebalance-land'
> > at all.
> >
> > I'll keep watching it and see how it goes.
> >
> > On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy 
> wrote:
> >
> >> We do recommend one thread per instance of the app. However, it should
> >> also
> >> work with multiple threads.
> >> I can't debug the problem any further without the logs from the other
> >> apps.
> >> We'd need to try and see if another instance still has task 1_3 open ( i
> >> suspect it does )
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 14 Dec 2016 at 13:20 Jon Yeargers 
> >> wrote:
> >>
> >> > What should I do about this? One thread per app?
> >> >
> >> > On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy 
> >> wrote:
> >> >
> >> > > That is correct
> >> > >
> >> > > On Wed, 14 Dec 2016 at 12:09 Jon Yeargers  >
> >> > > wrote:
> >> > >
> >> > > > I have the app running on 5 machines. Is that what you mean?
> >> > > >
> >> > > > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy  >
> >> > > wrote:
> >> > > >
> >> > > > > Hi Jon,
> >> > > > >
> >> > > > > Do you have more than one instance of the app running? The
> reason
> >> i
> >> > ask
> >> > > > is
> >> > > > > because the task (task 1_3) that fails with the
> >> > > > > "java.lang.IllegalStateException" in this log is previously
> >> running
> >> > > as a
> >> > > > > Standby Task. This would mean the active task for this store
> would
> >> > have
> >> > > > > been running elsewhere, but i don't see that in the logs. The
> >> > exception
> >> > > > > occurs as StreamThread-1 starts to run task 1_3 as an active
> task.
> >> > The
> >> > > > > exception might indicate that another thread/instance is still
> >> > writing
> >> > > to
> >> > > > > the changelog topic for the State Store.
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Damian
> >> > > > >
> >> > > > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <
> >> jon.yearg...@cedexis.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > As near as I can see it's rebalancing constantly.
> >> > > > > >
> >> > > > > > I'll up that value and see what happens.
> >> > > > > >
> >> > > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <
> >> damian@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi Jon,
> >> > > > > > >
> >> > > > > > > I haven't had much of a chance to look at the logs in detail
> >> too
> >> > > much
> >> > > > > > yet,
> >> > > > > > > but i have noticed that your app seems to be rebalancing
> >> > > frequently.
> >> > > > > It
> >> > > > > > > seems that it is usually around the 300 second mark, which
> >> > usually
> >> > > > > would
> >> > > > > > > mean that poll hasn't been called for at least that long.
> You
> >> > might
> >> > > > > want
> >> > > > > > to
> >> > > > > > > try setting the config 

Re: KStreams app - frequent broker dis/re connects

2016-12-19 Thread Guozhang Wang
Jon,

Could you share the debug logs you observed here? BTW note that if the
application's topology is complex or if you start multiple instances or
threads at the same time, then it may take multiple iterations of rebalances
in the worst case to eventually settle the assignment among all the threads.


Guozhang

On Tue, Dec 13, 2016 at 8:49 AM, Jon Yeargers 
wrote:

> Watching the debug output on an app - wondering why it spends nearly all of
> its time rebalancing. Noticed that it seems to drop / recreate connections
> to brokers pretty frequently. No error messages to speak of though.
>
> Connect / timeout / related settings in the consumer are all default.
>
> How much connection-thrashing is considered typical?
>



-- 
-- Guozhang


Re: Kafka ACL's with SSL Protocol is not working

2016-12-19 Thread Raghu B
Thanks Rajini for the above info, but I want to restrict users from
performing all operations (I think that is what ACLs define); I just want
User_1 to produce messages and User_2 to consume messages.

How can we achieve that?

Thanks in advance
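
For what it's worth, the kind of separation being asked about is usually
expressed with kafka-acls.sh along the lines of the sketch below. The topic
and group names are placeholders, and with SSL the principal has to be the
full certificate DN that the broker derives for each user (the DNs below are
made up):

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal "User:CN=User_1,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" \
  --producer --topic my-ssl-topic4

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal "User:CN=User_2,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" \
  --consumer --topic my-ssl-topic4 --group group4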

On Mon, Dec 19, 2016 at 3:13 AM, Rajini Sivaram 
wrote:

> Raghu,
>
> It could be because the principal used for inter broker communication
> doesn't have all the necessary permissions. If you are using PLAINTEXT for
> inter-broker, the principal is ANONYMOUS, if using SSL, it would be similar
> to the one you are setting for client. You can configure broker principal
> as super.users to give full access.
>
> On Fri, Dec 16, 2016 at 10:16 PM, Raghu B  wrote:
>
> > Thank you Rajani, your suggestion is really helpful.
> >
> >
> > [2016-12-16 21:55:36,720] DEBUG Principal =
> > User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown is
> > Allowed Operation = Create from host = 172.28.89.63 on resource =
> > Cluster:kafka-cluster (kafka.authorizer.logger)
> >
> > Finally I am getting the user as exactly what I set in my SSL-Cert (Not
> > Anonymous).
> >
> > But, I am getting another Error i.e
> >
> >
> > [2016-12-16 13:55:36,449] WARN Error while fetching metadata with
> > correlation id 45 : {my-ssl-topic=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient)
> > [2016-12-16 13:55:36,609] WARN Error while fetching metadata with
> > correlation id 46 : {my-ssl-topic=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient)
> > [2016-12-16 13:55:36,766] WARN Error while fetching metadata with
> > correlation id 47 : {my-ssl-topic=LEADER_NOT_AVAILABLE}
> > (org.apache.kafka.clients.NetworkClient)
> >
> >
> > I created the topic and my kafka node is working without any issues (I
> > restarted several time)
> >
> > [raghu@Kafka-238343-1-33109167 kafka_2.11-0.10.1.0]$
> *bin/kafka-topics.sh
> > --describe --zookeeper localhost:2181 --topic my-ssl-topic*
> >
> > Topic:my-ssl-topic PartitionCount:1 ReplicationFactor:1 Configs:
> > Topic: my-ssl-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> >
> > Thanks in advance,
> > Raghu
> >
> >
> > On Fri, Dec 16, 2016 at 1:30 AM, Rajini Sivaram 
> > wrote:
> >
> > > You need to set ssl.client.auth="required" in server.properties.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > > On Wed, Dec 14, 2016 at 12:12 AM, Raghu B 
> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am trying to enable ACL's in my Kafka cluster with along with SSL
> > > > Protocol.
> > > >
> > > > I tried with each and every parameters but no luck, so I need help to
> > > > enable the SSL(without Kerberos) and I am attaching all the
> > configuration
> > > > details in this.
> > > >
> > > > Kindly Help me.
> > > >
> > > >
> > > > *I tested SSL without ACL, it worked fine
> > > > (listeners=SSL://10.247.195.122:9093 )*
> > > >
> > > >
> > > > *This is my Kafka server properties file:*
> > > >
> > > > *# ACL SETTINGS
> > > #*
> > > >
> > > > *auto.create.topics.enable=true*
> > > >
> > > > *authorizer.class.name
> > > > =kafka.security.auth.
> > SimpleAclAuthorizer*
> > > >
> > > > *security.inter.broker.protocol=SSL*
> > > >
> > > > *#allow.everyone.if.no.acl.found=true*
> > > >
> > > > *#principal.builder.class=CustomizedPrincipalBuilderClass*
> > > >
> > > > *#super.users=User:"CN=writeuser,OU=Unknown,O=
> > > > Unknown,L=Unknown,ST=Unknown,C=Unknown"*
> > > >
> > > > *#super.users=User:Raghu;User:Admin*
> > > >
> > > > *#offsets.storage=kafka*
> > > >
> > > > *#dual.commit.enabled=true*
> > > >
> > > > *listeners=SSL://10.247.195.122:9093 *
> > > >
> > > > *#listeners=PLAINTEXT://10.247.195.122:9092 <
> > http://10.247.195.122:9092
> > > >*
> > > >
> > > > *#listeners=PLAINTEXT://10.247.195.122:9092
> > > > ,SSL://10.247.195.122:9093
> > > > *
> > > >
> > > > *#advertised.listeners=PLAINTEXT://10.247.195.122:9092
> > > > *
> > > >
> > > >
> > > > *
> > > > ssl.keystore.location=/home/raghu/kafka/security/server.
> keystore.jks*
> > > >
> > > > *ssl.keystore.password=123456*
> > > >
> > > > *ssl.key.password=123456*
> > > >
> > > > *
> > > > ssl.truststore.location=/home/raghu/kafka/security/server.
> > > truststore.jks*
> > > >
> > > > *ssl.truststore.password=123456*
> > > >
> > > >
> > > >
> > > > *Set the ACL from Authorizer CLI:*
> > > >
> > > > > *bin/kafka-acls.sh --authorizer-properties
> > > > zookeeper.connect=10.247.195.122:2181 
> > > --list
> > > > --topic ssltopic*
> > > >
> > > > *Current ACLs for resource `Topic:ssltopic`: *
> > > >
> > > > *  User:CN=writeuser, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown,
> > > > C=Unknown has Allow permission for 

Re: Kafka 0.9-Java : consumer skipping offsets during application restart

2016-12-19 Thread Marina
I'm very interested in the answer to this question as well - if the offsets are
not preserved over the app re-start, we could be looking at a sizable data
loss.
Thanks! Marina


  From: Dhyan Muralidharan 
 To: users@kafka.apache.org 
 Sent: Friday, December 16, 2016 8:29 AM
 Subject: Kafka 0.9-Java : consumer skipping offsets during application restart
   
Hi,

  Can some one look at the below question and see if you can help ? I've
tried to explain the issue in stack overflow .

http://stackoverflow.com/q/41177614/3705186

--Dhyan


   

Re: The connection between kafka and zookeeper is often closed by zookeeper, leading to NotLeaderForPartitionException: This server is not the leader for that topic-partition.

2016-12-19 Thread Guozhang Wang
Xiaoyuan,

I am not an expert in ZK so here is what I can tell:

1. "NotLeaderForPartitionException" is not usually thrown when ZK
connection timed out, it is thrown when the produce requests has arrived
the broker but the brokers think themselves as not the leader (any more)
for the requested partitions. So it usually indicates that the partition
leaders has migrated.

2. It is generally suggested not to co-locate Kafka brokers and ZooKeeper on
the same machines, since both are memory- and IO-heavy applications and hence
could suffer long GCs, which can also cause ZK or Kafka broker "stalls". See
this for more details:

https://kafka.apache.org/documentation/#zk

3. Without the version information of the brokers and clients I cannot tell
further what the issue could be.


Guozhang


On Thu, Dec 15, 2016 at 5:17 PM, Xiaoyuan Chen <253441...@qq.com> wrote:

> Any solution?
>
>
>
>
> -- Original Message --
> From: "Xiaoyuan Chen" <253441...@qq.com>;
> Sent: Friday, December 9, 2016, 10:15 AM
> To: "users";
> Subject: The connection between kafka and zookeeper is often closed
> by zookeeper, leading to NotLeaderForPartitionException: This server is not
> the leader for that topic-partition.
>
>
>
> Hi guys,
>
> Situation:
>   3 nodes, each 32G memory, CPU 24 cores, 1T hd.
>   3 brokers on 3 nodes, and 3 zookeeper on these 3 nodes too, all the
> properties are default, start the zookeeper cluster and kafka cluster.
>   Create a topic (replication factor 3, 6 partitions), like below:
> bin/kafka-topics.sh --create --zookeeper hw168:2181
> --replication-factor 3 --partitions 6 --topic test
>   And run the ProducerPerformance given by kafka on the two nodes at the
> same time, it means we have two producers, command like below:
> bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance
> --topic test --num-records 1 --record-size 100 --throughput -1
> --producer-props bootstrap.servers=hw168:9092 buffer.memory=67108864
> batch.size=65536 acks=1
>
> Problem:
>   We can see from the producer, a lot of  NotLeaderForPartitionException:
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> …
>
> Track the process (by using DEBUG):
>   There is a INFO:
> INFO Client session timed out, have not heard from server in 11647ms
> for sessionid 0x258de4a26a4, closing socket connection and attempting
> reconnect (org.apache.zookeeper.ClientCnxn)
>
>   And we found that the connection between the zkClient (which kafka holds)
> and the zookeeper server is closed by the zookeeper server because the
> session timed out. For details:
> [2016-12-08 20:24:00,547] DEBUG Partition [test,5] on broker 1:
> Skipping update high watermark since Old hw 15986847 [8012779 : 1068525112]
> is larger than new hw 15986847 [8012779 : 1068525112] for partition
> [test,5]. All leo's are 16566175 [16025299 : 72477384],15986847 [8012779 :
> 1068525112],16103549 [16025299 : 10485500] (kafka.cluster.Partition)
> [2016-12-08 20:24:00,547] DEBUG Adding index entry 16566161 =>
> 72475508 to 16025299.index. (kafka.log.OffsetIndex)
> [2016-12-08 20:24:11,368] DEBUG [Replica Manager on Broker 1]: Request
> key test-2 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
> [2016-12-08 20:24:11,368] DEBUG Partition [test,2] on broker 1:
> Skipping update high watermark since Old hw 16064424 [16025299 : 5242750]
> is larger than new hw 16064424 [16025299 : 5242750] for partition [test,2].
> All leo's are 16566175 [16025299 : 72477384],16205274 [16025299 :
> 24116650],16064424 [16025299 : 5242750] (kafka.cluster.Partition)
> [2016-12-08 20:24:11,369] DEBUG [Replica Manager on Broker 1]: Produce
> to local log in 10821 ms (kafka.server.ReplicaManager)
> [2016-12-08 20:24:11,369] INFO Client session timed out, have not
> heard from server in 11647ms for sessionid 0x258de4a26a4, closing
> socket connection and attempting reconnect (org.apache.zookeeper.
> ClientCnxn)
>
>   Please watch the time: there is no DEBUG between 20:24:00,547 and
> 20:24:11,368, which already exceeds the session timeout (6000ms), so it
> causes this disconnection. We kept digging:
>   We found that it got stuck in the function:
>   selector.select(waitTimeOut); — in the method doTransport(…) in
> class org.apache.zookeeper.ClientCnxnSocketNIO
>   and that is the time we got no DEBUG.
>   For more details, the call procedure (zookeeper client) is:
>   org.apache.zookeeper.ClientCnxn -> run() -> doTransport(..)
>   In the function run(), every time it will check whether there is a
> timeout; if not, it will run doTransport, but the doTransport costs

Kafka controlled shutdown hangs when there are large number of topics in the cluster

2016-12-19 Thread Robin, Martin (Nokia - IN/Bangalore)
Hi

We have 9 broker instances in a kafka cluster spread across 3 linux machines. 
The 1st machine has 4 broker instances. 2nd  machine has 4 broker instances and 
3rd one has 1 broker instance.  There are around 101 topics created in the 
cluster

We start the broker as follows
All 4 brokers are started on first machine
All 4 brokers are started on 2nd machine
1 broker started on 3rd machine

After the brokers had been running for some time, we tried to shut down the brokers as below:
All 4 brokers are stopped on the 1st machine
4 brokers are stopped on the 2nd machine. While we do this, the Kafka 
controlled shutdown hangs.

This same issue was not seen with 25 topics.

Please let us know if any solution is known to this issue

Thanks
Martin






Re: Kafka ACL's with SSL Protocol is not working

2016-12-19 Thread Rajini Sivaram
Raghu,

It could be because the principal used for inter broker communication
doesn't have all the necessary permissions. If you are using PLAINTEXT for
inter-broker, the principal is ANONYMOUS, if using SSL, it would be similar
to the one you are setting for client. You can configure broker principal
as super.users to give full access.
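
As a rough server.properties sketch of that suggestion (the DN below is a
placeholder and must match the subject of the broker's own certificate):

security.inter.broker.protocol=SSL
super.users=User:"CN=kafka-broker,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"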

On Fri, Dec 16, 2016 at 10:16 PM, Raghu B  wrote:

> Thank you Rajani, your suggestion is really helpful.
>
>
> [2016-12-16 21:55:36,720] DEBUG Principal =
> User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown is
> Allowed Operation = Create from host = 172.28.89.63 on resource =
> Cluster:kafka-cluster (kafka.authorizer.logger)
>
> Finally I am getting the user as exactly what I set in my SSL-Cert (Not
> Anonymous).
>
> But, I am getting another Error i.e
>
>
> [2016-12-16 13:55:36,449] WARN Error while fetching metadata with
> correlation id 45 : {my-ssl-topic=LEADER_NOT_AVAILABLE}
> (org.apache.kafka.clients.NetworkClient)
> [2016-12-16 13:55:36,609] WARN Error while fetching metadata with
> correlation id 46 : {my-ssl-topic=LEADER_NOT_AVAILABLE}
> (org.apache.kafka.clients.NetworkClient)
> [2016-12-16 13:55:36,766] WARN Error while fetching metadata with
> correlation id 47 : {my-ssl-topic=LEADER_NOT_AVAILABLE}
> (org.apache.kafka.clients.NetworkClient)
>
>
> I created the topic and my kafka node is working without any issues (I
> restarted several time)
>
> [raghu@Kafka-238343-1-33109167 kafka_2.11-0.10.1.0]$ *bin/kafka-topics.sh
> --describe --zookeeper localhost:2181 --topic my-ssl-topic*
>
> Topic:my-ssl-topic PartitionCount:1 ReplicationFactor:1 Configs:
> Topic: my-ssl-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
>
> Thanks in advance,
> Raghu
>
>
> On Fri, Dec 16, 2016 at 1:30 AM, Rajini Sivaram 
> wrote:
>
> > You need to set ssl.client.auth="required" in server.properties.
> >
> > Regards,
> >
> > Rajini
> >
> > On Wed, Dec 14, 2016 at 12:12 AM, Raghu B  wrote:
> >
> > > Hi All,
> > >
> > > I am trying to enable ACL's in my Kafka cluster with along with SSL
> > > Protocol.
> > >
> > > I tried with each and every parameters but no luck, so I need help to
> > > enable the SSL(without Kerberos) and I am attaching all the
> configuration
> > > details in this.
> > >
> > > Kindly Help me.
> > >
> > >
> > > *I tested SSL without ACL, it worked fine
> > > (listeners=SSL://10.247.195.122:9093 )*
> > >
> > >
> > > *This is my Kafka server properties file:*
> > >
> > > *# ACL SETTINGS
> > #*
> > >
> > > *auto.create.topics.enable=true*
> > >
> > > *authorizer.class.name
> > > =kafka.security.auth.
> SimpleAclAuthorizer*
> > >
> > > *security.inter.broker.protocol=SSL*
> > >
> > > *#allow.everyone.if.no.acl.found=true*
> > >
> > > *#principal.builder.class=CustomizedPrincipalBuilderClass*
> > >
> > > *#super.users=User:"CN=writeuser,OU=Unknown,O=
> > > Unknown,L=Unknown,ST=Unknown,C=Unknown"*
> > >
> > > *#super.users=User:Raghu;User:Admin*
> > >
> > > *#offsets.storage=kafka*
> > >
> > > *#dual.commit.enabled=true*
> > >
> > > *listeners=SSL://10.247.195.122:9093 *
> > >
> > > *#listeners=PLAINTEXT://10.247.195.122:9092 <
> http://10.247.195.122:9092
> > >*
> > >
> > > *#listeners=PLAINTEXT://10.247.195.122:9092
> > > ,SSL://10.247.195.122:9093
> > > *
> > >
> > > *#advertised.listeners=PLAINTEXT://10.247.195.122:9092
> > > *
> > >
> > >
> > > *
> > > ssl.keystore.location=/home/raghu/kafka/security/server.keystore.jks*
> > >
> > > *ssl.keystore.password=123456*
> > >
> > > *ssl.key.password=123456*
> > >
> > > *
> > > ssl.truststore.location=/home/raghu/kafka/security/server.
> > truststore.jks*
> > >
> > > *ssl.truststore.password=123456*
> > >
> > >
> > >
> > > *Set the ACL from Authorizer CLI:*
> > >
> > > > *bin/kafka-acls.sh --authorizer-properties
> > > zookeeper.connect=10.247.195.122:2181 
> > --list
> > > --topic ssltopic*
> > >
> > > *Current ACLs for resource `Topic:ssltopic`: *
> > >
> > > *  User:CN=writeuser, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown,
> > > C=Unknown has Allow permission for operations: Write from hosts: * *
> > >
> > >
> > > *XXXWMXXX-7:kafka_2.11-0.10.1.0 rbaddam$ bin/kafka-console-producer.sh
> > > --broker-list 10.247.195.122:9093  --topic
> > > ssltopic --producer.config client-ssl.properties*
> > >
> > >
> > > *[2016-12-13 14:53:45,839] WARN Error while fetching metadata with
> > > correlation id 0 : {ssltopic=UNKNOWN_TOPIC_OR_PARTITION}
> > > (org.apache.kafka.clients.NetworkClient)*
> > >
> > > *[2016-12-13 14:53:45,984] WARN Error while fetching metadata with
> > > correlation id 1 : {ssltopic=UNKNOWN_TOPIC_OR_PARTITION}
> > > 

Re: Kafka SSL encryption plus external CA

2016-12-19 Thread Rajini Sivaram
Stephane,

If you are using a trusted CA like Verisign, clients don't need to specify
a truststore. The host names specified in advertised.listeners in the
broker must match the wildcard DNS names in the certificates if clients
configure ssl.endpoint.identification.algorithm=https. If
ssl.endpoint.identification.algorithm is not specified, by default hostname
is not validated. It should be set to  https however to prevent
man-in-the-middle attacks. There is an open JIRA to make this the default
in Kafka.
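
For reference, the client-side settings being described look roughly like the
sketch below (paths are placeholders; the truststore lines are only needed
when the signing CA is not already trusted by the client's JVM):

security.protocol=SSL
ssl.endpoint.identification.algorithm=https
# only needed if the CA is not already in the default trust store:
# ssl.truststore.location=/path/to/client.truststore.jks
# ssl.truststore.password=changeit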

It makes sense to enable SSL in dev and prod to ensure that the code path
being run in dev is the same as in prod.



On Mon, Dec 19, 2016 at 3:50 AM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Hi,
>
> I have read the docs extensively but yet there are a few answers I can’t
> find. It has to do with external CA
> Please confirm my understanding if possible:
>
> I can create my own CA to sign all the brokers and clients certificates.
> Pros:
> - cheap, easy, automated. I need to find a way to access that CA
> programatically for new brokers if I want to automated their deployment,
> but I could use something like credstash or vault for that.
> Cons:
> - all of my clients needs to trust the CA. That means somehow find a way
> for my clients to get access to the CA  using ca-cert and add it to their
> truststore… correct?
>
> I don’t really like the fact that I need to provide the CA cert file to
> every client. That seems quite hard to achieve, and prevents my users from
> using the Kafka cluster directly. What’s the best way for the Kafka clients
> to get access to the CA, while my users are doing dev, etc? Most of our
> applications run in Docker, which means we usually pass stuff around using
> environment variables.
>
>
> My next idea was to use an external CA (like Verisign) to sign my
> certificate with a wildcard *.kafka.mydomain.com (A records pointing to
> internal IPs - the DNS name would be the advertised kafka hostname). My
> goal was then for the clients not to require to trust the CA because it
> would be automatically trusted? Do I have the correct understanding? Or do
> I still need to add the external CA to the truststore of my clients?
> (basically I’m trying to reproduce the behaviour of what a web browser
> does).
>
>
> Finally, is it recommended to enable SSL in my dev Kafka cluster vs my prod
> Kafka cluster, or to have SSL on each cluster?
>
> Thanks!
>
> Kind regards,
> Stephane
>



-- 
Regards,

Rajini


Subscribe.

2016-12-19 Thread Venkata D
Subscribe.


TLS

2016-12-19 Thread Ruben Poveda Teba
Hello,

I'm trying to set up SSL/TLS between Kafka-Zookeeper-Kafka, but in the
documentation you explain that the security must be set up via JAAS. What is
JAAS and how do I configure it?

Best regards,


Rubén Poveda Teba
Security Infrastructures
rpov...@sia.es


Re: Producer connect timeouts

2016-12-19 Thread Luke Steensen
Makes sense, thanks Ewen.

Is this something we could consider fixing in Kafka itself? I don't think
the producer is necessarily doing anything wrong, but the end result is
certainly very surprising behavior. It would also be nice not to have to
coordinate request timeouts, retries, and the max block configuration with
system-level configs.


On Sat, Dec 17, 2016 at 6:55 PM, Ewen Cheslack-Postava 
wrote:

> Without having dug back into the code to check, this sounds right.
> Connection management just fires off a request to connect and then
> subsequent poll() calls will handle any successful/failed connections. The
> timeouts wrt requests are handled somewhat differently (the connection
> request isn't explicitly tied to the request that triggered it, so when the
> latter times out, we don't follow up and timeout the connection request
> either).
>
> So yes, you currently will have connection requests tied to your underlying
> TCP timeout request. This tends to be much more of a problem in public
> clouds where the handshake request will be silently dropped due to firewall
> rules.
>
> The metadata.max.age.ms is a workable solution, but agreed that it's not
> great. If possible, reducing the default TCP connection timeout isn't
> unreasonable either -- the defaults are set for WAN connections (and
> arguably set for WAN connections of long ago), so much more aggressive
> timeouts are reasonable for Kafka clusters.
>
> -Ewen
>
> On Fri, Dec 16, 2016 at 1:41 PM, Luke Steensen  braintreepayments.com> wrote:
>
> > Hello,
> >
> > Is it correct that producers do not fail new connection establishment
> when
> > it exceeds the request timeout?
> >
> > Running on AWS, we've encountered a problem where certain very low volume
> > producers end up with metadata that's sufficiently stale that they
> attempt
> > to establish a connection to a broker instance that has already been
> > terminated as part of a maintenance operation. I would expect this to
> fail
> > and be retried normally, but it appears to hang until the system-level
> TCP
> > connection timeout is reached (2-3 minutes), with the writes themselves
> > being expired before even a single attempt is made to send them.
> >
> > We've worked around the issue by setting `metadata.max.age.ms` extremely
> > low, such that these producers are requesting new metadata much faster
> than
> > our maintenance operations are terminating instances. While this does
> work,
> > it seems like an unfortunate workaround for some very surprising
> behavior.
> >
> > Thanks,
> > Luke
> >
>
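
For completeness, the producer settings mentioned in this thread are ordinary producer
configs; a sketch with illustrative values (the broker address and the numbers are
assumptions, not recommendations from the thread):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // assumed address
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);          // per-request timeout
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);                // max time send() may block
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 30000);            // force periodic metadata refresh

Note that, as discussed above, none of these bound how long the initial TCP connection
attempt to a dead broker may take; that is governed by the OS-level connect timeout, which
is why lowering metadata.max.age.ms (or the system TCP timeout) was used as the workaround.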


Re: How does 'TimeWindows.of().until()' work?

2016-12-19 Thread Damian Guy
Yes, that is one of the methods. It will be available in the 0.10.2 release,
which is due at the beginning of February.

On Mon, 19 Dec 2016 at 12:17 Sachin Mittal  wrote:

> I believe you are talking about this method.
> public <W extends Window, T> KTable<Windowed<K>, T> aggregate(
>     final Initializer<T> initializer,
>     final Aggregator<K, V, T> aggregator,
>     final Windows<W> windows,
>     final StateStoreSupplier storeSupplier)
>
> Will this api be part of next release?
>
> I can go about using this, however if in StateStoreSupplier we add some api
> to update the logConfig, then we can pass all the topic level props as part
> of streams config directly.
>
> Thanks
> Sachin
>
>
>
> On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > I think we have a way of doing what you want already. If you create a
> > custom state store you can call the enableLogging method and pass in any
> > configuration parameters you want: For example:
> >
> > final StateStoreSupplier supplier = Stores.create("store")
> > .withKeys(Serdes.String())
> > .withValues(Serdes.String())
> > .persistent()
> > .enableLogging(Collections.singletonMap("retention.ms", "1000"))
> > .build();
> >
> > You can then use the overloaded methods in the DSL to pass in the
> > StateStoreSupplier to your aggregates (trunk only)
> >
> >
> > On Mon, 19 Dec 2016 at 10:58 Sachin Mittal  wrote:
> >
> > > Hi,
> > > I am working towards adding topic configs as part of streams config.
> > > However I have run into an issue:
> > > Code flow is like this
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > builder.stream(...)
> > > ...
> > > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > > streams.start();
> > >
> > > So we can see we build the topology before building the streams.
> > > While building topology it assigns state store.
> > > That time no topic config props are available.
> > >
> > > So it creates the supplier with empty topic config.
> > >
> > > Further StateStoreSupplier has method just to get the config and not to
> > > update it.
> > > Map<String, String> logConfig()
> > >
> > > One way to implement this is change this interface to be able to update
> > the
> > > log config props too.
> > > And we the props are available to streams we update the topology
> > builder's
> > > state stores too with updated config.
> > >
> > > Other way is to change the KStreamBuilder and make it pass the topic
> > > config.
> > > However in second approach we would be splitting the streams config
> into
> > > two parts.
> > >
> > > Let me know how should one proceed with this.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > > > I agree. We already got multiple requests to add an API for specifying
> > > > topic parameters for internal topics... I am pretty sure we will add it
> > > > if time permits -- feel free to contribute this new feature!
> > > >
> > > > About changing the value of until: that does not work, as the changelog
> > > > topic configuration would not be updated.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > > Hi,
> > > > > I suggest to include topic config as well as part of streams config
> > > > > properties like we do for producer and consumer configs.
> > > > > The topic config supplied would be used for creating internal
> > changelog
> > > > > topics along with certain additional configs which are applied by
> > > > default.
> > > > >
> > > > > This way we don't have to ever create internal topics manually.
> > > > >
> > > > > I had one doubt regarding until.
> > > > > Say I specify one value and run my streams app.
> > > > > Now I stop the app, specify different value and re start the app.
> > > > >
> > > > > Which value for retain would the old (pre existing) windows use.
> > Would
> > > it
> > > > > be the older value or the new value?
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <
> > > matth...@confluent.io
> > > > >
> > > > > wrote:
> > > > >
> > > > >> Understood. Makes sense.
> > > > >>
> > > > >> For this, you should apply Streams configs manually when creating
> > > those
> > > > >> topics. For retention parameter, use the value you specify in
> > > > >> corresponding .until() method for it.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > > >>> I was referring to internal change log topic. I had to create
> them
> > > > >> manually
> > > >>> because in some cases the message size of these topics was greater than
> > > >>> the default ones used by kafka streams.

Re: getting intermittent TimeoutException at producer side in streams application

2016-12-19 Thread Damian Guy
Hi,

You could use this as a starting point:
https://github.com/apache/kafka/blob/trunk/streams/src/test/resources/log4j.properties
You will probably want to change the level from WARN to INFO.
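
A minimal example of such a file, with an assumed console appender and levels to
adjust as needed:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.kafka.streams=INFO

Put it on the application classpath or point the JVM at it with
-Dlog4j.configuration=file:/path/to/log4j.properties.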

Thanks

On Mon, 19 Dec 2016 at 12:49 Sachin Mittal  wrote:

> Hi,
> In order to enable streams logging we would need to add log4j.properties as
> part of our classpath, right?
>
> Is there a sample log properties file we can use?
>
> I think this issue mostly happens when we run streams app in a different DC
> than the brokers.
>
> However would like to see in logs if there is any additional information
> which we can check.
>
> Thanks
> Sachin
>
> On Mon, Dec 19, 2016 at 5:43 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > This would usually indicate that there is a connectivity issue with the
> > brokers. You would need to correlate the logs etc. on the brokers with the
> > streams logs to try and understand what is happening.
> >
> > Thanks,
> > Damian
> >
> > On Sun, 18 Dec 2016 at 07:26 Sachin Mittal  wrote:
> >
> > > Hi all,
> > > I have a simple stream application pipeline
> > > src.filter.aggregateByKey.mapValues.forEach
> > >
> > > From time to time I get the following exception:
> > > Error sending record to topic test-stream-key-table-changelog
> > > org.apache.kafka.common.errors.TimeoutException: Batch containing 2
> > > record(s) expired due to timeout while requesting metadata from brokers
> > for
> > > test-stream-key-table-changelog-0
> > >
> > > What could be causing the issue?
> > > I investigated a bit and saw none of the stage takes a long time. Even
> in
> > > forEach stage where we commit the output to external db takes sub 100
> ms
> > in
> > > worst case.
> > >
> > > I have right now done a workaround of
> > > props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
> > >
> > > Increased the default timeout from 30 seconds to 3 minutes.
> > >
> > > However to dig deep into the issue where can the problem be?
> > >
> > > Is it that some stage is taking beyond 30 seconds to execute. Or is it
> > some
> > > network issue where it is taking a long time to connect to broker
> itself?
> > >
> > > Any logging that I can enable at the streams side to get more complete
> > > stacktraces?
> > >
> > > Note that issue occurs in bunches. Then everything works fine for a
> while
> > > then these exceptions come in bunch and then it works fine for sometime
> > > then again exceptions and so on.
> > >
> > > Note that my version is kafka_2.10-0.10.0.1.
> > >
> > > Thanks
> > > Sachin
> > >
> >
>


Re: getting intermittent TimeoutException at producer side in streams application

2016-12-19 Thread Sachin Mittal
Hi,
In order to enable streams logging we would need to add log4j.properties as
part of our classpath, right?

Is there a sample log properties file we can use?

I think this issue mostly happens when we run the streams app in a different DC
than the brokers.

However, I would like to see in the logs if there is any additional information
we can check.

Thanks
Sachin

On Mon, Dec 19, 2016 at 5:43 PM, Damian Guy  wrote:

> Hi Sachin,
>
> This would usually indicate that there is a connectivity issue with the
> brokers. You would need to correlate the logs etc. on the brokers with the
> streams logs to try and understand what is happening.
>
> Thanks,
> Damian
>
> On Sun, 18 Dec 2016 at 07:26 Sachin Mittal  wrote:
>
> > Hi all,
> > I have a simple stream application pipeline
> > src.filter.aggregateByKey.mapValues.forEach
> >
> > From time to time I get the following exception:
> > Error sending record to topic test-stream-key-table-changelog
> > org.apache.kafka.common.errors.TimeoutException: Batch containing 2
> > record(s) expired due to timeout while requesting metadata from brokers
> for
> > test-stream-key-table-changelog-0
> >
> > What could be causing the issue?
> > I investigated a bit and saw none of the stage takes a long time. Even in
> > forEach stage where we commit the output to external db takes sub 100 ms
> in
> > worst case.
> >
> > I have right now done a workaround of
> > props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
> >
> > Increased the default timeout from 30 seconds to 3 minutes.
> >
> > However to dig deep into the issue where can the problem be?
> >
> > Is it that some stage is taking beyond 30 seconds to execute. Or is it
> some
> > network issue where it is taking a long time to connect to broker itself?
> >
> > Any logging that I can enable at the streams side to get more complete
> > stacktraces?
> >
> > Note that issue occurs in bunches. Then everything works fine for a while
> > then these exceptions come in bunch and then it works fine for sometime
> > then again exceptions and so on.
> >
> > Note that my version is kafka_2.10-0.10.0.1.
> >
> > Thanks
> > Sachin
> >
>


Re: How does 'TimeWindows.of().until()' work?

2016-12-19 Thread Sachin Mittal
I believe you are talking about this method.
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(
        final Initializer<T> initializer,
        final Aggregator<K, V, T> aggregator,
        final Windows<W> windows,
        final StateStoreSupplier storeSupplier)

Will this API be part of the next release?

I can go ahead and use this; however, if we add some API to StateStoreSupplier
to update the logConfig, then we can pass all the topic-level props as part of
the streams config directly.

Thanks
Sachin



On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy  wrote:

> Hi Sachin,
>
> I think we have a way of doing what you want already. If you create a
> custom state store you can call the enableLogging method and pass in any
> configuration parameters you want: For example:
>
> final StateStoreSupplier supplier = Stores.create("store")
> .withKeys(Serdes.String())
> .withValues(Serdes.String())
> .persistent()
> .enableLogging(Collections.singletonMap("retention.ms", "1000"))
> .build();
>
> You can then use the overloaded methods in the DSL to pass in the
> StateStoreSupplier to your aggregates (trunk only)
>
>
> On Mon, 19 Dec 2016 at 10:58 Sachin Mittal  wrote:
>
> > Hi,
> > I am working towards adding topic configs as part of streams config.
> > However I have run into an issue:
> > Code flow is like this
> >
> > KStreamBuilder builder = new KStreamBuilder();
> > builder.stream(...)
> > ...
> > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > streams.start();
> >
> > So we can see we build the topology before building the streams.
> > While building topology it assigns state store.
> > That time no topic config props are available.
> >
> > So it creates the supplier with empty topic config.
> >
> > Further StateStoreSupplier has method just to get the config and not to
> > update it.
> > Map<String, String> logConfig()
> >
> > One way to implement this is change this interface to be able to update
> the
> > log config props too.
> > And we the props are available to streams we update the topology
> builder's
> > state stores too with updated config.
> >
> > Other way is to change the KStreamBuilder and make it pass the topic
> > config.
> > However in second approach we would be splitting the streams config into
> > two parts.
> >
> > Let me know how should one proceed with this.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax 
> > wrote:
> >
> > > I agree. We already got multiple requests to add an API for specifying
> > > topic parameters for internal topics... I am pretty sure we will add it
> > > if time permits -- feel free to contribute this new feature!
> > >
> > > About changing the value of until: that does not work, as the changelog
> > > topic configuration would not be updated.
> > >
> > >
> > > -Matthias
> > >
> > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > Hi,
> > > > I suggest to include topic config as well as part of streams config
> > > > properties like we do for producer and consumer configs.
> > > > The topic config supplied would be used for creating internal
> changelog
> > > > topics along with certain additional configs which are applied by
> > > default.
> > > >
> > > > This way we don't have to ever create internal topics manually.
> > > >
> > > > I had one doubt regarding until.
> > > > Say I specify one value and run my streams app.
> > > > Now I stop the app, specify different value and re start the app.
> > > >
> > > > Which value for retain would the old (pre existing) windows use.
> Would
> > it
> > > > be the older value or the new value?
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > > wrote:
> > > >
> > > >> Understood. Makes sense.
> > > >>
> > > >> For this, you should apply Streams configs manually when creating
> > those
> > > >> topics. For retention parameter, use the value you specify in
> > > >> corresponding .until() method for it.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > >>> I was referring to internal change log topic. I had to create them
> > > >> manually
> > > >>> because in some case the message size of these topic were greater
> > than
> > > >> the
> > > >>> default ones used by kafka streams.
> > > >>>
> > > >>> I think someone in this group recommended to create these topic
> > > >> manually. I
> > > >>> understand that it is better to have internal topics created by
> > streams
> > > >> app
> > > >>> and I will take a second look at these and see if that can be done.
> > > >>>
> > > >>> I just wanted to make sure what all configs are applied to internal
> > > >> 

Re: getting intermittent TimeoutException at producer side in streams application

2016-12-19 Thread Damian Guy
Hi Sachin,

This would usually indicate that there is a connectivity issue with the
brokers. You would need to correlate the logs etc. on the brokers with the
streams logs to try and understand what is happening.

Thanks,
Damian

On Sun, 18 Dec 2016 at 07:26 Sachin Mittal  wrote:

> Hi all,
> I have a simple stream application pipeline
> src.filter.aggregateByKey.mapValues.forEach
>
> From time to time I get the following exception:
> Error sending record to topic test-stream-key-table-changelog
> org.apache.kafka.common.errors.TimeoutException: Batch containing 2
> record(s) expired due to timeout while requesting metadata from brokers for
> test-stream-key-table-changelog-0
>
> What could be causing the issue?
> I investigated a bit and saw that none of the stages takes a long time. Even
> the forEach stage, where we commit the output to an external db, takes sub-100
> ms in the worst case.
>
> I have right now done a workaround of
> props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
>
> Increased the default timeout from 30 seconds to 3 minutes.
>
> However to dig deep into the issue where can the problem be?
>
> Is it that some stage is taking beyond 30 seconds to execute. Or is it some
> network issue where it is taking a long time to connect to broker itself?
>
> Any logging that I can enable at the streams side to get more complete
> stacktraces?
>
> Note that the issue occurs in bunches: everything works fine for a while,
> then these exceptions come in a bunch, then it works fine for some time, then
> exceptions again, and so on.
>
> Note that my version is kafka_2.10-0.10.0.1.
>
> Thanks
> Sachin
>


Re: How does 'TimeWindows.of().until()' work?

2016-12-19 Thread Damian Guy
Hi Sachin,

I think we have a way of doing what you want already. If you create a
custom state store you can call the enableLogging method and pass in any
configuration parameters you want: For example:

final StateStoreSupplier supplier = Stores.create("store")
.withKeys(Serdes.String())
.withValues(Serdes.String())
.persistent()
.enableLogging(Collections.singletonMap("retention.ms", "1000"))
.build();

You can then use the overloaded methods in the DSL to pass in the
StateStoreSupplier to your aggregates (trunk only)
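
To make the wiring concrete, here is a rough sketch of how such a supplier might be
plugged into an aggregation with that trunk API (the topic name, serdes, retention value
and counting logic are made up for illustration; the windowed overload quoted elsewhere
in this thread additionally takes the Windows argument):

import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream(Serdes.String(), Serdes.String(), "events");

// Store with its own changelog topic config (retention.ms here is just an example value).
final StateStoreSupplier countStore = Stores.create("counts-store")
        .withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .enableLogging(Collections.singletonMap("retention.ms", "86400000"))
        .build();

// Count occurrences per key, backed by the custom store defined above.
KTable<String, Long> counts = events
        .groupByKey(Serdes.String(), Serdes.String())
        .aggregate(() -> 0L,
                   (key, value, aggregate) -> aggregate + 1L,
                   countStore);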


On Mon, 19 Dec 2016 at 10:58 Sachin Mittal  wrote:

> Hi,
> I am working towards adding topic configs as part of streams config.
> However I have run into an issue:
> Code flow is like this
>
> KStreamBuilder builder = new KStreamBuilder();
> builder.stream(...)
> ...
> KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> streams.start();
>
> So we can see we build the topology before building the streams.
> While building topology it assigns state store.
> That time no topic config props are available.
>
> So it creates the supplier with empty topic config.
>
> Further, StateStoreSupplier only has a method to get the config, not to
> update it:
> Map<String, String> logConfig()
>
> One way to implement this is to change this interface to be able to update the
> log config props too. Then, when the props are available to streams, we update
> the topology builder's state stores with the updated config.
>
> Other way is to change the KStreamBuilder and make it pass the topic
> config.
> However in second approach we would be splitting the streams config into
> two parts.
>
> Let me know how should one proceed with this.
>
> Thanks
> Sachin
>
>
>
> On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax 
> wrote:
>
> > I agree. We already got multiple requests to add an API for specifying
> > topic parameters for internal topics... I am pretty sure we will add it
> > if time permits -- feel free to contribute this new feature!
> >
> > About changing the value of until: that does not work, as the changelog
> > topic configuration would not be updated.
> >
> >
> > -Matthias
> >
> > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > Hi,
> > > I suggest to include topic config as well as part of streams config
> > > properties like we do for producer and consumer configs.
> > > The topic config supplied would be used for creating internal changelog
> > > topics along with certain additional configs which are applied by
> > default.
> > >
> > > This way we don't have to ever create internal topics manually.
> > >
> > > I had one doubt regarding until.
> > > Say I specify one value and run my streams app.
> > > Now I stop the app, specify different value and re start the app.
> > >
> > > Which value for retain would the old (pre existing) windows use. Would
> it
> > > be the older value or the new value?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > >
> > >> Understood. Makes sense.
> > >>
> > >> For this, you should apply Streams configs manually when creating
> those
> > >> topics. For retention parameter, use the value you specify in
> > >> corresponding .until() method for it.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > >>> I was referring to internal change log topic. I had to create them
> > >> manually
> > >>> because in some case the message size of these topic were greater
> than
> > >> the
> > >>> default ones used by kafka streams.
> > >>>
> > >>> I think someone in this group recommended to create these topic
> > >> manually. I
> > >>> understand that it is better to have internal topics created by
> streams
> > >> app
> > >>> and I will take a second look at these and see if that can be done.
> > >>>
> > >>> I just wanted to make sure what all configs are applied to internal
> > >> topics
> > >>> in order to decide to avoid them creating manually.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> > matth...@confluent.io
> > >>>
> > >>> wrote:
> > >>>
> >  I am wondering about "I create internal topic manually" -- which
> > topics
> >  do you refer in detail?
> > 
> >  Kafka Streams create all kind of internal topics with auto-generated
> >  names. So it would be quite tricky to create all of them manually
> >  (especially because you need to know those name in advance).
> > 
> >  IRRC, if a topic does exist, Kafka Streams does no change it's
> >  configuration. Only if Kafka Streams does create a topic, it will
> >  specify certain config parameters on topic create step.
> > 
> > 
> >  -Matthias
> > 
> > 
> > 
> >  On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > > Hi,
> > > Thanks for the explanation. This 

Re: Halting because log truncation is not allowed for topic __consumer_offsets

2016-12-19 Thread Ben Stopford
Hi Jun

This should only be possible in situations where there is a crash or
something happens to the underlying disks (assuming clean leader election).
I've not come across others. The assumption, as I understand it, is that
the underlying issue stems from KAFKA-1211
(https://issues.apache.org/jira/browse/KAFKA-1211), which is being addressed
in KIP-101.
If you can reproduce it in a more general scenario we would be very
interested.

All the best
B


On Mon, Dec 19, 2016 at 12:35 AM Jun MA  wrote:

> Would be grateful to hear opinions from experts out there. Thanks in
> advance
>
> > On Dec 17, 2016, at 11:06 AM, Jun MA  wrote:
> >
> > Hi,
> >
> > We saw the following FATAL error in 2 of our brokers (3 in total, the
> > active controller doesn’t have this) and they crashed at the same time.
> >
> > [2016-12-16 16:12:47,085] FATAL [ReplicaFetcherThread-0-3], Halting
> because log truncation is not allowed for topic __consumer_offsets, Current
> leader 3's latest offset 5910081 is less than replica 1's latest offset
> 5910082 (kafka.server.ReplicaFetcherThread)
> >
> > Our solution is to set unclean.leader.election.enable=true temporarily on the
> > __consumer_offsets topic, and restart the brokers. In this way we potentially
> > lose offsets for some topics. Are there any better solutions?
> >
> > I saw related tickets https://issues.apache.org/jira/browse/KAFKA-3861 and
> > https://issues.apache.org/jira/browse/KAFKA-3410 and understand why the
> > brokers crashed. But we didn’t see any of the scenarios mentioned in those
> > tickets. Is there any other reason why this could have happened? There’s no
> > broker restart involved in our case. How can we prevent it from happening?
> >
> > We’re using 0.9.0 with unclean.leader.election.enable=false and
> min.insync.replicas=2.
> >
> > Thanks,
> > Jun
>
>
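
For readers of the archive, the temporary per-topic override described above can be applied
(and later removed) with the kafka-configs tool; the ZooKeeper address is an assumption:

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name __consumer_offsets \
  --add-config unclean.leader.election.enable=true

# once the cluster has recovered, remove the override again
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name __consumer_offsets \
  --delete-config unclean.leader.election.enable

As noted above, this trades consistency for availability and can lose committed offsets, so
it should only be a temporary measure.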


Re: How does 'TimeWindows.of().until()' work?

2016-12-19 Thread Sachin Mittal
Hi,
I am working towards adding topic configs as part of streams config.
However I have run into an issue:
Code flow is like this

KStreamBuilder builder = new KStreamBuilder();
builder.stream(...)
...
KafkaStreams streams = new KafkaStreams(builder, streamsProps);
streams.start();

So we can see we build the topology before building the streams instance.
While building the topology it assigns the state stores.
At that time no topic config props are available.

So it creates the supplier with an empty topic config.

Further, StateStoreSupplier only has a method to get the config, not to
update it:
Map<String, String> logConfig()

One way to implement this is to change this interface so that the log config
props can be updated too. Then, when the props are available to the streams
instance, we update the topology builder's state stores with the updated
config.

The other way is to change KStreamBuilder and make it take the topic config.
However, in the second approach we would be splitting the streams config into
two parts.

Let me know how one should proceed with this.

Thanks
Sachin



On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax 
wrote:

> I agree. We already got multiple requests to add an API for specifying
> topic parameters for internal topics... I am pretty sure we will add it
> if time permits -- feel free to contribute this new feature!
>
> About changing the value of until: that does not work, as the changelog
> topic configuration would not be updated.
>
>
> -Matthias
>
> On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > Hi,
> > I suggest to include topic config as well as part of streams config
> > properties like we do for producer and consumer configs.
> > The topic config supplied would be used for creating internal changelog
> > topics along with certain additional configs which are applied by
> default.
> >
> > This way we don't have to ever create internal topics manually.
> >
> > I had one doubt regarding until.
> > Say I specify one value and run my streams app.
> > Now I stop the app, specify different value and re start the app.
> >
> > Which retention value would the old (pre-existing) windows use? Would it
> > be the older value or the new value?
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax  >
> > wrote:
> >
> >> Understood. Makes sense.
> >>
> >> For this, you should apply Streams configs manually when creating those
> >> topics. For retention parameter, use the value you specify in
> >> corresponding .until() method for it.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> >>> I was referring to internal change log topic. I had to create them
> >> manually
> >>> because in some case the message size of these topic were greater than
> >> the
> >>> default ones used by kafka streams.
> >>>
> >>> I think someone in this group recommended to create these topic
> >> manually. I
> >>> understand that it is better to have internal topics created by streams
> >> app
> >>> and I will take a second look at these and see if that can be done.
> >>>
> >>> I just wanted to make sure what all configs are applied to internal
> >> topics
> >>> in order to decide to avoid them creating manually.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>> wrote:
> >>>
>  I am wondering about "I create internal topic manually" -- which topics
>  do you refer to in detail?
> 
>  Kafka Streams creates all kinds of internal topics with auto-generated
>  names. So it would be quite tricky to create all of them manually
>  (especially because you need to know those names in advance).
> 
>  IIRC, if a topic already exists, Kafka Streams does not change its
>  configuration. Only if Kafka Streams creates a topic does it
>  specify certain config parameters at topic creation.
> 
> 
>  -Matthias
> 
> 
> 
>  On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > Hi,
> > Thanks for the explanation. This illustration makes it super easy to
> > understand how until works. Perhaps we can update the wiki with this
> > illustration.
> > It is basically the retention time for a past window.
> > I used to think until creates all the future windows for that period and
> > when time passes it would delete all the past windows. However, until
> > actually retains a window for the specified time. This makes so much more
> > sense.
> >
> > I just had one pending query regarding:
> >
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to the retention.ms param of the topic config?
> > I create the internal topic manually using, say, retention.ms=360.
> > In next release (post kafka_2.10-0.10.0.1) since we support delete of
> > internal changelog topic as well and I want it to be retained for say
>  just