Re: [DISCUSS] KIP-78: Cluster Id

2016-09-04 Thread sumit arrawatia
Hi Dong,

Please find my answers inline.

Hopefully they address your concerns this time !

Sumit

On Sun, Sep 4, 2016 at 4:54 PM, Dong Lin  wrote:

> Hey Ismael,
>
> Thanks for the explanation Ismael. Please see my comment inline.
>
> On Sun, Sep 4, 2016 at 8:58 AM, Ismael Juma  wrote:
>
> > Hi Dong,
> >
> > Sorry for the delay, I was offline for 24 hours. Thankfully Sumit was
> > around to explain the reasoning for the current approach. From reading
> the
> > thread, it seems like it may help to reiterate a few important goals for
> > the auditing use case where there is a requirement to associate a message
> > with a particular cluster:
> >
> >1. Each cluster id should be unique.
> >2. The cluster id should be immutable. If the cluster id can be changed
> >as a matter of course, it's hard to provide any guarantees. Assigning a
> >meaningful name for the id makes it more likely that you may need to change
> >it whereas decoupling the immutable id from the meaningful/human-readable
> >name eliminates the issue.
> >
>
> I don't think having a human-readable name is equivalent to having a
> meaningful name. It is not true that a human-readable name makes it more
> likely that you will want to change it. Every city has a human-readable
> name and we don't worry about changing it. Conference rooms in most
> companies have human-readable names instead of random ids. For the same
> reason you can name a cluster Yosemite and never have to change it.
>
> By immutable I think you are saying that we should prevent people from
> changing cluster.id. However, this KIP doesn't really prevent this from
> happening -- a user can delete the znode and restart Kafka to change
> cluster.id. Therefore the requirement is not satisfied anyway.
>
> Having read the motivation section of this KIP, I am also not sure why you
> want to prevent people from changing cluster.id. Is there any motivation
> or use-case for this requirement?


As I explained before, a stable cluster id is required for the monitoring and
auditing use cases which are the main motivations of this change. If the id
changes, you will need to either update the historical data with the new
cluster id or throw the data away.

BTW, you provide excellent analogies for why the id should not be human
readable and changeable :).

Cities have human readable names, and it is very hard to enforce that city
names are unique, which is why the postal department needs you to provide a
zip code if you want your mail to be delivered correctly. This is analogous
to why we need the cluster id to be stable in an auditing/monitoring use case
(you need a zip code (cluster id) to uniquely identify an address (cluster)
for a message).

Taking the analogy further, we in India recently changed the names of our
biggest cities from their British-given names to Indian names, and nobody had
to change their historical data or business logic because the zip codes
remained the same.

Your conference room analogy is also very helpful in understanding why it
is difficult to manage uniqueness across an organization and why
uniqueness is essential for monitoring and auditing. Let's say your
company has offices all across the world and you want to audit and
monitor their usage. Having a room called "Yosemite" in 3 locations makes
this impossible: you would need to coordinate to make sure conference room
names are unique across offices worldwide, which places a lot of burden on
the teams. To avoid this, you can assign unique ids to conference rooms in
addition to human readable names. And this is in fact what most big
organizations do.

And this is why the monitoring/alerting use cases become much, much harder
too if you don't have unique ids.

And as I explained before, this KIP lays the foundation for the approach of
using ids + human readable tags. As Ismael said earlier, we want to do so
in small incremental steps. So, the current KIP just focuses on cluster id
and a future KIP will add human readable tags.



>


>
>
> >3. Every cluster should have an id. An optional cluster id makes the
> >downstream code more complex and makes the feature less useful.
> >
>
> It is not clear why the downstream code would be more complex and the
> feature less useful if we provide a default cluster.id here. Users who
> are not interested in this feature can use the default cluster.id, and
> downstream applications will not be affected. Users who need this
> feature can configure a unique human-readable cluster.id for their
> clusters. In this case the downstream applications will have the same
> complexity as with the approach in this KIP. Did I miss something?
>
> Here we can assume the user can configure a unique human-readable
> cluster.id in the config, since this is discussed under Concern 1 below.
>
>
>
I would like to point out that it is much easier to develop generic tools
for downstream processing in the community if 

Build failed in Jenkins: kafka-trunk-jdk8 #860

2016-09-04 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4105: Queryable state tests

--
[...truncated 3450 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh PASSED

kafka.integration.SslTopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack STARTED


[jira] [Commented] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-09-04 Thread Greg Fodor (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464171#comment-15464171 ]

Greg Fodor commented on KAFKA-4120:
---

We were able to work around it by just creating a proper Avro class for the
byte data, but I think it would probably be helpful to future users if there
were a way to prevent this from happening -- an exception doesn't seem
unreasonable.

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps a LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special case. However, I have a hard 
> time of thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)
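
For readers who want to see the reference-equality pitfall in isolation, here
is a minimal standalone Java sketch (a plain LinkedHashMap rather than the
store's internal LRUCache, but the identity semantics of byte[] keys are the
same):

  import java.util.LinkedHashMap;
  import java.util.Map;

  public class ByteArrayKeyDemo {
      public static void main(String[] args) {
          Map<byte[], String> cache = new LinkedHashMap<>();
          byte[] k1 = new byte[] {1, 2, 3};
          byte[] k2 = new byte[] {1, 2, 3}; // same contents, different reference

          cache.put(k1, "v1");
          // byte[] inherits Object.equals/hashCode (identity), so k2 misses:
          System.out.println(cache.get(k1)); // v1
          System.out.println(cache.get(k2)); // null -> a second entry is possible
      }
  }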



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-09-04 Thread Guozhang Wang (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464109#comment-15464109 ]

Guozhang Wang commented on KAFKA-4120:
--

Hi [~gfodor], thanks for reporting this issue.

We found this issue some time ago, and took the approach of replacing 
{{byte[]}} with a comparable {{Bytes}} class, which is in the public package 
{{o.a.k.common.utils}}, and you can find its usage in a recent ticket: 
KAFKA-3776. Could you try to use this class in your application as well?
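
For illustration, a minimal sketch of what this change looks like from the
application side (Bytes.wrap is the real API; the surrounding demo is
illustrative only):

  import org.apache.kafka.common.utils.Bytes;
  import java.util.LinkedHashMap;
  import java.util.Map;

  public class BytesKeyDemo {
      public static void main(String[] args) {
          Map<Bytes, String> cache = new LinkedHashMap<>();
          Bytes k1 = Bytes.wrap(new byte[] {1, 2, 3});
          Bytes k2 = Bytes.wrap(new byte[] {1, 2, 3}); // same contents

          cache.put(k1, "v1");
          // Bytes implements equals/hashCode over the wrapped contents:
          System.out.println(cache.get(k2)); // v1 -- a single logical entry
      }
  }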




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4105) Queryable state tests for concurrency and rebalancing

2016-09-04 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464083#comment-15464083 ]

ASF GitHub Bot commented on KAFKA-4105:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1806


> Queryable state tests for concurrency and rebalancing
> -
>
> Key: KAFKA-4105
> URL: https://issues.apache.org/jira/browse/KAFKA-4105
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.10.1.0
>
>
> The queryable state feature (KIP-67) needs more tests on concurrent queries 
> and queries during rebalancing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1806: KAFKA-4105: Queryable state tests

2016-09-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1806


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4105) Queryable state tests for concurrency and rebalancing

2016-09-04 Thread Guozhang Wang (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-4105.
--
Resolution: Fixed

Issue resolved by pull request 1806
[https://github.com/apache/kafka/pull/1806]




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3769) KStream job spending 60% of time writing metrics

2016-09-04 Thread Greg Fodor (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463861#comment-15463861 ]

Greg Fodor commented on KAFKA-3769:
---

I've done some additional profiling, and this problem also seems to crop up
in complex Kafka Streams jobs within the Kafka core Selector class. Should I
open another JIRA?

> KStream job spending 60% of time writing metrics
> 
>
> Key: KAFKA-3769
> URL: https://issues.apache.org/jira/browse/KAFKA-3769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> I've been profiling a complex streams job, and found two major hotspots when 
> writing metrics, which take up about 60% of the CPU time of the job. (!) A PR 
> is attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-78: Cluster Id

2016-09-04 Thread Dong Lin
Hey Ismael,

Thanks for the explanation Ismael. Please see my comment inline.

On Sun, Sep 4, 2016 at 8:58 AM, Ismael Juma  wrote:

> Hi Dong,
>
> Sorry for the delay, I was offline for 24 hours. Thankfully Sumit was
> around to explain the reasoning for the current approach. From reading the
> thread, it seems like it may help to reiterate a few important goals for
> the auditing use case where there is a requirement to associate a message
> with a particular cluster:
>
>1. Each cluster id should be unique.
>2. The cluster id should be immutable. If the cluster id can be changed
>as a matter of course, it's hard to provide any guarantees. Assigning a
>meaningful name for the id makes it more likely that you may need to change
>it whereas decoupling the immutable id from the meaningful/human-readable
>name eliminates the issue.
>

I don't think having a human-readable name is equivalent to having a
meaningful name. It is not true that a human-readable name makes it more
likely that you will want to change it. Every city has a human-readable
name and we don't worry about changing it. Conference rooms in most
companies have human-readable names instead of random ids. For the same
reason you can name a cluster Yosemite and never have to change it.

By immutable I think you are saying that we should prevent people from
changing cluster.id. However, this KIP doesn't really prevent this from
happening -- a user can delete the znode and restart Kafka to change
cluster.id. Therefore the requirement is not satisfied anyway.

Having read the motivation section of this KIP, I am also not sure why you
want to prevent people from changing cluster.id. Is there any motivation
or use-case for this requirement?



>3. Every cluster should have an id. An optional cluster id makes the
>downstream code more complex and makes the feature less useful.
>

It is not clear why the downstream code would be more complex and the
feature less useful if we provide a default cluster.id here. Users who
are not interested in this feature can use the default cluster.id, and
downstream applications will not be affected. Users who need this
feature can configure a unique human-readable cluster.id for their
clusters. In this case the downstream applications will have the same
complexity as with the approach in this KIP. Did I miss something?

Here we can assume the user can configure a unique human-readable cluster.id
in the config, since this is discussed under Concern 1 below.


>4. No new mandatory configs should be introduced. We generally avoid
>adding mandatory configs as they make it harder to get started with
>Kafka and upgrades become more complex. I've added this to the KIP as it
>was previously missing. Thanks for the reminder.
>
> With that in mind, let's look at your proposal.
>
> When Kafka starts, it reads cluster.id from config. And then it reads
> > cluster.id from zookeeper.
> > - if the cluster.id is not specified in zookeeper, create the znode.
> > - if the cluster.id is specified in zookeeper
> >- if the cluster.id in znode is the same as that in config, proceed
> >- Otherwise, broker startup fails and it reports error. Note that we
> > don't make this change after the startup.
>
>
> The concerns I have are:
>
>1. The same id can be given to two different clusters and we have no
>easy way to detect this (as each cluster may be using a completely
>different ZooKeeper ensemble). Affects goal 1.
>

Right, there is no easy way to detect this automatically with Kafka. But
there was no requirement to automatically detect violations of uniqueness
in the first place. SREs can manually make sure that a unique cluster.id
is given to each cluster in the broker config. This doesn't seem
too onerous compared to the other effort SREs put into configuring the
correct ZooKeeper value in the broker config. It should also be less onerous
than the effort needed to track the random cluster.id for every cluster so
that you can recover it if the znode is deleted.

Look, if users want to have a unique cluster.id across a bunch of clusters,
it is probably the case that they want to monitor/audit all these
clusters in a central place. In this case the SRE who manages the
monitor/audit service is probably managing cluster configs in a central
place, i.e. they should have a list of clusters together with their
ZooKeeper URLs and bootstrap server URLs. They probably already have a name
for these clusters so that they can refer to a cluster when talking to each
other. It should be easy to provide a cluster.id in this list.



>2. If users choose a meaningful name for the cluster id, they may end up
>with the choice of having to live with a misleading id or having to change
>it (with the respective bad implications) as cluster usage evolves. Affects
>goal 2.
>

Please see my reply to requirement 2. I don't 

Re: Queryable state client read guarantees

2016-09-04 Thread Mikael Högqvist
Hi Eno,

thanks for the response and sorry for not getting back earlier. I think it
makes sense and the example is great! To make it possible to experiment
with the guarantees/semantics, I've created a tool available at:
https://github.com/mkhq/kafka-qs-verify. Basically it can be used to trace
read requests over time, e.g. one read per second using a client that
queries multiple instances.
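
As a rough sketch of what the tool does on each read (written against the
0.10.1-era API; the retry policy and all names below are mine, not the
tool's):

  import org.apache.kafka.streams.KafkaStreams;
  import org.apache.kafka.streams.state.QueryableStoreTypes;
  import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

  public final class RetryingReader {
      // returns the count for key, or null after all attempts fail
      public static Long get(KafkaStreams streams, String storeName, String key,
                             int attempts, long backoffMs) throws InterruptedException {
          for (int i = 0; i < attempts; i++) {
              ReadOnlyKeyValueStore<String, Long> store =
                  streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
              if (store != null) {                  // null -> StoreUnavailable
                  Long value = store.get(key);      // null -> KeyNotFound
                  if (value != null) return value;
              }
              Thread.sleep(backoffMs);
          }
          return null;
      }
  }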

By doing this I observed a couple of interesting things with the current
implementation. Note that these observations can also be a result of me
using Kafka Streams in the wrong way, e.g. my assumptions, setup or tool
implementation. The topology the tool uses counts strings and stores the
counts in an output table.

The experiments are set up with an input topic containing "hello" 5 times.
StoreUnavailable in the traces below means that the call to
streams.store(table) returned null and KeyNotFound happens when
store.get(key) is null. The first run of the tool creates a trace where the
store is unavailable followed by key not found and finally the correct
value of 5.

> Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> hello -> 5

The second run, i.e. a restart of the java app with the same app-id and
table name as the previous run, creates the following trace:

> Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> hello -> 10

I assumed that two things would be different:

a) KeyNotFound should not happen, since the store was already initialized
and had already associated hello with 5. This could violate the "never read
an older value" rule if an instance restarts.

b) hello should still have the value 5.

Does a) have to do with the start-up sequence of the stream instance? E.g.
replay of the changelog for a table can take longer?

For b) it looks like the stream instance re-processes the input topic data.
Maybe it's related to setup? In this case, the consumer config
"AUTO_OFFSET_RESET_CONFIG" was not defined in the settings. Defining
"latest" results in only KeyNotFound.

I've also tried out a couple of other scenarios when querying multiple
instances with "random" restarts, but maybe it's better if we start with the
simplest cases.

Thanks,
Mikael

> One more thing, there is an example of an end-to-end REST service that 
> demonstrates one possible
> way to query at 
> https://github.com/confluentinc/examples/tree/master/kafka-streams/src/main/java/io/confluent/examples/streams/queryablestate
> .
> The instructions on how to run are in QueryableStateExample.java.
>
> Thanks
> Eno
>
>
> > On 26 Aug 2016, at 18:07, Eno Thereska  wrote:
> >
> > Hi Mikael,
> >
> > Very good question. You are correct about the desired semantics.
> >
> > The semantic of case (a) depends on the local store as you mention. For 
> > case (b), the
> final check is always performed again on get(), and if the store has 
> disappeared between the
> lookup and get, the user will get an exception and will have to retry. The 
> state store in
> A does become invalid when the state is re-assigned. There isn't any other 
> way to detect the
> change, since we wanted to hide the system details (e.g., rebalance) from the 
> user.
> >
> > Does this make sense?
> >
> > Thanks
> > Eno
> >
> >> On 26 Aug 2016, at 16:26, Mikael Högqvist  wrote:
> >>
> >> Hi,
> >>
> >> I've tried to understand the implementation and APIs from KIP-67 and would
> >> like to know the possible semantics for read requests from a client
> >> perspective. As a developer of a queryable state client, the access
> >> semantics I would like to have (I think...) is one where subsequent reads
> >> always return the value from the last read or a newer value (if the state
> >> store is available). This should be independent of the current system
> >> configuration, e.g. re-balancing, failures etc. .
> >>
> >> A client-side get(k) can be implemented by starting with a lookup for the
> >> instances that store k followed by a retrieve of the value associated with
> >> k from the instances returned by the lookup. In the worst case we can
> >> always do scatter+gather over all instances.
> >>
> >> We can start by considering a get(k) under two failure-free cases: a)
> >> single instance and b) a system going from one instance to two instances. 
> >> In
> >> case a) the lookup will always return the same instance and the following
> >> get will read from a local store. The semantics in this case depends on the
> >> local store.
> >>
> >> For case b) the lookup returns instance A, but in between the lookup and
> >> the get, a new instance B is introduced to which k is transferred? Does the
> >> state store on A become invalid when the state is re-assigned? Is there
> >> another 

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
> Processor code should always work, independently of whether caching is
> enabled or not.

If we want to get this, I guess we need a quite different design (see (1)).

The point is that we want to dedup the output, not state updates.

It just happens that our starting point was KTable, for which state
updates and the downstream changelog output are the same thing. Thus, we can
just use the internal KTable state to do the deduplication for the
downstream changelog.

However, from a general point of view (Processor API view), if we dedup
the output, we want dedup/caching for the processor (and not for a state
store). Of course, we need a state to do the dedup. For KTable, both
things merge into a single abstraction, and we use only a single state
instead of two. From a general point of view, we would need two states
though (one for the actual state, and one for dedup -- think Processor
API -- not DSL).


Alternative proposal 1:
(see also (2) -- which might be better than this one)

Thus, it might be a cleaner design to decouple user states and the
dedup state from each other. If a user enables dedup/caching (for a
processor), we add an additional state to do the dedup, and this
dedup state is independent of all user states and context.forward()
works as always. The dedup state could be hidden from the user and could
be a pure in-memory state (no need for any recovery -- only flush on
commit). Internally, a context.forward() would call dedupState.put() and
trigger actual output when the dedup state needs to evict records.

The disadvantage would be that we end up with two states for KTable.
The advantage is that deduplication can be switched on/off without any
Processor code change.
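
To make this concrete, here is a minimal sketch of such a hidden dedup state,
assuming it is just a size-bounded in-memory map that forwards the evicted
(oldest) entry downstream; all names are illustrative, not proposed API:

  import java.util.LinkedHashMap;
  import java.util.Map;
  import java.util.function.BiConsumer;

  public class DedupBuffer<K, V> {
      private final Map<K, V> buffer;

      public DedupBuffer(final int maxEntries, final BiConsumer<K, V> downstream) {
          this.buffer = new LinkedHashMap<K, V>(16, 0.75f, false) {
              @Override
              protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                  if (size() > maxEntries) {
                      // forward the oldest record downstream on eviction
                      downstream.accept(eldest.getKey(), eldest.getValue());
                      return true;
                  }
                  return false;
              }
          };
      }

      // what context.forward() would call internally: overwriting an
      // existing key dedups earlier updates for that key
      public void put(final K key, final V value) {
          buffer.put(key, value);
      }

      // on commit: emit everything and clear (flush-on-commit, no recovery)
      public void flush(final BiConsumer<K, V> downstream) {
          buffer.forEach(downstream);
          buffer.clear();
      }
  }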


Alternative proposal 2:

We basically keep the current KIP design, including not disabling
context.forward() if a cached state is used. Additionally, for cached
states, we rename put() to putAndForward(), which is only available for
cached states. Thus, in processor code, a state must be explicitly cast
into a cached state. We also make the user aware that an update/put to
a state results in downstream output and that context.forward() would be
a "direct/non-cached" output.

The disadvantage of this is that processor code is not independent of
caching, and thus caching cannot just be switched on/off (i.e., we do not
follow the initial statement of this mail). The advantage is that we can
keep a single state for KTable, and this design requires only small changes
to the current KIP.
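
A sketch of what proposal 2 could look like in processor code (the
CachedKeyValueStore name and putAndForward signature are hypothetical, not
part of the KIP):

  import org.apache.kafka.streams.state.KeyValueStore;

  interface CachedKeyValueStore<K, V> extends KeyValueStore<K, V> {
      // like put(), but also schedules a (deduplicated) downstream forward
      void putAndForward(K key, V value);
  }

  // inside process(): the explicit cast makes the caching dependency visible
  // CachedKeyValueStore<String, Long> cached =
  //     (CachedKeyValueStore<String, Long>) context.getStateStore("counts");
  // cached.putAndForward(key, newCount);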



-Matthias


On 09/04/2016 07:10 PM, Matthias J. Sax wrote:
> Sure, you can use a non-cached state. However, if you write code like
> the below for a non-cached state, later learn about caching, and
> think "caching is a cool feature, I want to use it", you would simply
> want to enable caching (without breaking your code).
> 
> Processor code should always work, independently of whether caching is
> enabled or not.
> 
> -Matthias
> 
> On 09/04/2016 06:56 PM, Eno Thereska wrote:
>> Hi Matthias,
>>
>> Thanks for the good questions. 
>>
>> There is still the option of not using cached state. If one uses cached 
>> state it will dedup for stores and forwarding further. But you can always 
>> disable caching and do what you say.
>>
>> Eno
>>
>>> On 4 Sep 2016, at 17:36, Matthias J. Sax  wrote:
>>>
>>> Sorry for not being precise. What I meant by "completely" is for a
>>> single processor. Assume I want to have the following pattern:
>>>
>>>   process(...) {
>>>     if (someCondition) {
>>>       state.put(...)
>>>       context.forward(...);
>>>     } else {
>>>       context.forward(...);
>>>     }
>>>   }
>>>
>>> I.e., for some records I do update the state and emit output records, for
>>> other records I only emit output records. This works in the current design.
>>> However, if a "cached state" would be used, it would not work any more.
>>>
>>>
>>> -Matthias
>>>
>>> On 09/04/2016 05:58 PM, Damian Guy wrote:
 Hi Matthias,

 Thanks for bringing the conversation across to the thread.

 I think a main limitation would be that you cannot mix the 4 patterns
> within a single application anymore (iff you use a "cached state"). If
> you have a processor with a "cached state" this disables direct usage of
> context.forward() completely -- if I understand the design correctly.
> Thus, if a "cached state" is used, forwarding is only possible via state
> updates.
>
>
 The above statement is not correct. Caching doesn't completely disable
 forwarding, it only disables it for Processors that are using State Stores.
 In all other cases context.forward() works as it does now.

 Thanks,
 Damian

>>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-04 Thread Todd Palino
We've been using SSL for produce traffic (mirror makers only right now, but
that's a very large percentage of traffic for us), and we're in the process
of turning it on for inter-broker traffic as well. Our experience is that
this does not present a significant amount of overhead to the brokers.
Specifically with switching over the IBP, we were expecting a lot more of a
hit, and it really only ended up being something like a 5% increase in
system load, and no reduction in the cluster capacity, in our test cluster.
Note that this relies on the fix in KAFKA-4050 and switching the PRNG to
SHA1PRNG.
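
For reference, the knobs involved look roughly like this in server.properties
(a sketch only; paths, ports and passwords are placeholders, and the PRNG
setting is the one added by KAFKA-4050):

  listeners=PLAINTEXT://:9092,SSL://:9093
  security.inter.broker.protocol=SSL
  ssl.keystore.location=/path/to/kafka.server.keystore.jks
  ssl.keystore.password=...
  ssl.truststore.location=/path/to/kafka.server.truststore.jks
  ssl.truststore.password=...
  # KAFKA-4050: choose the PRNG backing the SSL engine
  ssl.secure.random.implementation=SHA1PRNG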

Right now, we're specifically avoiding moving consume traffic to SSL, due
to the zero copy send issue. Now I've been told (but I have not
investigated) that OpenSSL can solve this. It would probably be a good use
of time to look into that further.

That said, switching the message format to the newer option (KIP-31 I
believe?) will result in the brokers not needing to recompress message
batches that are produced. This should result in a significant reduction in
CPU usage, which may offset the cost of SSL. We haven't had a chance to
fully investigate this, however, as changing that config depends on the
clients being updated to support the new format.
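
For the curious, the knob in question is the broker-wide setting below
(overridable per topic via message.format.version); flip it only once the
clients support the new format:

  # server.properties -- requires clients that understand the 0.10 format
  log.message.format.version=0.10.0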

-Todd

On Sunday, September 4, 2016, Jaikiran Pai  wrote:

> We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently
> decided to start using SSL for Kafka communication between broker and
> clients. Right now, we have a pretty basic setup with just 1 broker with
> SSL keystore setup and the Java client(s) communicate using the
> Producer/Consumer APIs against this single broker. There's no client auth
> (intentionally) right now. We also have plain text enabled for the initial
> testing.
>
> What we have noticed is that the consumer/producer performance when SSL is
> enabled is noticeably poor when compared to plain text. I understand that
> there are expected to be performance impacts when SSL is enabled but the
> order of magnitude is too high and in fact it shows up in a very noticeable
> fashion in our product. I do have the numbers, but I haven't been able to
> narrow it down yet (because there's nothing that stands out, except that
> it's slow). Our application code is exactly the same between non-SSL and
> SSL usage.
>
> Furthermore, I'm aware of this specific JIRA in Kafka
> https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a
> similar issue. So what I would like to know is, in context of Kafka 0.10.x
> releases and Java 8 support, are there any timelines that the dev team is
> looking for in terms of improving this performance issue (which I believe
> requires usage of OpenSSL or other non-JDK implementations of SSLEngine)?
> We would like to go for GA of our product in the next couple of months and
> in order to do that, we do plan to have Kafka over SSL working with
> reasonably good performance, but the current performance isn't promising.
> Expecting this to be fixed in the next couple of months and have it
> available in 0.10.x is probably too much to expect, but if we know the
> plans around this, we should be able to come up with a plan of our own for
> our product.
>
>
> -Jaikiran
>


-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
Sure, you can use a non-cached state. However, if you write code like
the below for a non-cached state, later learn about caching, and
think "caching is a cool feature, I want to use it", you would simply
want to enable caching (without breaking your code).

Processor code should always work, independently of whether caching is
enabled or not.

-Matthias

On 09/04/2016 06:56 PM, Eno Thereska wrote:
> Hi Matthias,
> 
> Thanks for the good questions. 
> 
> There is still the option of not using cached state. If one uses cached state 
> it will dedup for stores and forwarding further. But you can always disable 
> caching and do what you say.
> 
> Eno
> 
>> On 4 Sep 2016, at 17:36, Matthias J. Sax  wrote:
>>
>> Sorry for not being precise. What I meant by "completely" is for a
>> single processor. Assume I want to have the following pattern:
>>
>>   process(...) {
>>     if (someCondition) {
>>       state.put(...)
>>       context.forward(...);
>>     } else {
>>       context.forward(...);
>>     }
>>   }
>>
>> I.e., for some records I do update the state and emit output records, for
>> other records I only emit output records. This works in the current design.
>> However, if a "cached state" would be used, it would not work any more.
>>
>>
>> -Matthias
>>
>> On 09/04/2016 05:58 PM, Damian Guy wrote:
>>> Hi Matthias,
>>>
>>> Thanks for bringing the conversation across to the thread.
>>>
>>> I think a main limitation would be that you cannot mix the 4 patterns
 within a single application anymore (iff you use a "cached state"). If
 you have a processor with a "cached state" this disables direct usage of
 context.forward() completely -- if I understand the design correctly.
 Thus, if a "cached state" is used, forwarding is only possible via state
 updates.


>>> The above statement is not correct. Caching doesn't completely disable
>>> forwarding, it only disables it for Processors that are using State Stores.
>>> In all other cases context.forward() works as it does now.
>>>
>>> Thanks,
>>> Damian
>>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Plans to improve SSL performance in Kafka, for 0.10.x?

2016-09-04 Thread Jaikiran Pai
We are using 0.10.0.1 of Kafka and (Java) client libraries. We recently 
decided to start using SSL for Kafka communication between broker and 
clients. Right now, we have a pretty basic setup with just 1 broker with 
SSL keystore setup and the Java client(s) communicate using the 
Producer/Consumer APIs against this single broker. There's no client 
auth (intentionally) right now. We also have plain text enabled for the 
initial testing.
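
For reference, the client side of such a setup is roughly the following
(a sketch only; hosts, paths and passwords are placeholders):

  Properties props = new Properties();
  props.put("bootstrap.servers", "broker1:9093");
  props.put("security.protocol", "SSL");
  props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
  props.put("ssl.truststore.password", "...");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  KafkaProducer<String, String> producer = new KafkaProducer<>(props);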


What we have noticed is that the consumer/producer performance when SSL 
is enabled is noticeably poor when compared to plain text. I understand 
that there are expected to be performance impacts when SSL is enabled 
but the order of magnitude is too high and in fact it shows up in a very 
noticeable fashion in our product. I do have the numbers, but I haven't 
been able to narrow it down yet (because there's nothing that stands 
out, except that it's slow). Our application code is exactly the same 
between non-SSL and SSL usage.


Furthermore, I'm aware of this specific JIRA in Kafka 
https://issues.apache.org/jira/browse/KAFKA-2561 which acknowledges a 
similar issue. So what I would like to know is, in context of Kafka 
0.10.x releases and Java 8 support, are there any timelines that the dev 
team is looking for in terms of improving this performance issue (which 
I believe requires usage of OpenSSL or other non-JDK implementations of 
SSLEngine)? We would like to go for GA of our product in the next couple 
of months and in order to do that, we do plan to have Kafka over SSL 
working with reasonably good performance, but the current performance 
isn't promising. Expecting this to be fixed in the next couple of months 
and have it available in 0.10.x is probably too much to expect, but if 
we know the plans around this, we should be able to come up with a plan 
of our own for our product.



-Jaikiran


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Damian Guy
Thanks for clarifying

On 4 September 2016 at 17:36, Matthias J. Sax  wrote:

> Sorry for not being precise. What I meant by "completely" is for a
> single processor. Assume I want to have the following pattern:
>
>   process(...) {
>     if (someCondition) {
>       state.put(...)
>       context.forward(...);
>     } else {
>       context.forward(...);
>     }
>   }
>
> I.e., for some records I do update the state and emit output records, for
> other records I only emit output records. This works in the current design.
> However, if a "cached state" would be used, it would not work any more.
>
>
> -Matthias
>
> On 09/04/2016 05:58 PM, Damian Guy wrote:
> > Hi Matthias,
> >
> > Thanks for bringing the conversation across to the thread.
> >
> > I think a main limitation would be that you cannot mix the 4 patterns
> >> within a single application anymore (iff you use a "cached state"). If
> >> you have a processor with a "cached state" this disables direct usage of
> >> context.forward() completely -- if I understand the design correctly.
> >> Thus, if a "cached state" is used, forwarding is only possible via state
> >> updates.
> >>
> >>
> > The above statement is not correct. Caching doesn't completely disable
> > forwarding, it only disables it for Processors that are using State
> Stores.
> > In all other cases context.forward() works as it does now.
> >
> > Thanks,
> > Damian
> >
>
>


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Eno Thereska
Hi Matthias,

Thanks for the good questions. 

There is still the option of not using cached state. If one uses cached state 
it will dedup for stores and forwarding further. But you can always disable 
caching and do what you say.

Eno

> On 4 Sep 2016, at 17:36, Matthias J. Sax  wrote:
> 
> Sorry for not being precise. What I meant by "completely" is for a
> single processor. Assume I want to have the following pattern:
> 
>   process(...) {
>     if (someCondition) {
>       state.put(...)
>       context.forward(...);
>     } else {
>       context.forward(...);
>     }
>   }
> 
> I.e., for some records I do update the state and emit output records, for
> other records I only emit output records. This works in the current design.
> However, if a "cached state" would be used, it would not work any more.
> 
> 
> -Matthias
> 
> On 09/04/2016 05:58 PM, Damian Guy wrote:
>> Hi Matthias,
>> 
>> Thanks for bringing the conversation across to the thread.
>> 
>> I think a main limitation would be that you cannot mix the 4 patterns
>>> within a single application anymore (iff you use a "cached state"). If
>>> you have a processor with a "cached state" this disables direct usage of
>>> context.forward() completely -- if I understand the design correctly.
>>> Thus, if a "cached state" is used, forwarding is only possible via state
>>> updates.
>>> 
>>> 
>> The above statement is not correct. Caching doesn't completely disable
>> forwarding, it only disables it for Processors that are using State Stores.
>> In all other cases context.forward() works as it does now.
>> 
>> Thanks,
>> Damian
>> 
> 



Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
Sorry for not being precise. What I meant by "completely" is for a
single processor. Assume I want to have the following pattern:

  process(...) {
    if (someCondition) {
      state.put(...)
      context.forward(...);
    } else {
      context.forward(...);
    }
  }

I.e., for some records I do update the state and emit output records, for
other records I only emit output records. This works in the current design.
However, if a "cached state" would be used, it would not work any more.


-Matthias

On 09/04/2016 05:58 PM, Damian Guy wrote:
> Hi Matthias,
> 
> Thanks for bringing the conversation across to the thread.
> 
> I think a main limitation would be that you cannot mix the 4 patterns
>> within a single application anymore (iff you use a "cached state"). If
>> you have a processor with a "cached state" this disables direct usage of
>> context.forward() completely -- if I understand the design correctly.
>> Thus, if a "cached state" is used, forwarding is only possible via state
>> updates.
>>
>>
> The above statement is not correct. Caching doesn't completely disable
> forwarding, it only disables it for Processors that are using State Stores.
> In all other cases context.forward() works as it does now.
> 
> Thanks,
> Damian
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Commented] (KAFKA-3900) High CPU util on broker

2016-09-04 Thread Michael Saffitz (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15463149#comment-15463149 ]

Michael Saffitz commented on KAFKA-3900:


We're seeing a similar issue: we have a 5-node Kafka cluster, and 4 of the 5
nodes have CPU around 65% but one is persistently pegged at 100%. We get the
same exception as above and see frequent shrinks/expands on the ISRs. Also on
AWS w/ Amazon Linux.

> High CPU util on broker
> ---
>
> Key: KAFKA-3900
> URL: https://issues.apache.org/jira/browse/KAFKA-3900
> Project: Kafka
>  Issue Type: Bug
>  Components: network, replication
>Affects Versions: 0.10.0.0
> Environment: kafka = 2.11-0.10.0.0
> java version "1.8.0_91"
> amazon linux
>Reporter: Andrey Konyaev
>
> I run a Kafka cluster in Amazon on m4.xlarge instances (4 CPUs and 16 GB
> mem, with 14 GB allocated to the Kafka heap). We have three nodes.
> We don't have a high load (6000 messages/sec) and cpu_idle = 70%, but
> sometimes (about once a day) I see this message in server.log:
> [2016-06-24 14:52:22,299] WARN [ReplicaFetcherThread-0-2], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6eaa1034 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
> at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> I know this can be a network glitch, but why does Kafka eat all the CPU time?
> My config:
> inter.broker.protocol.version=0.10.0.0
> log.message.format.version=0.10.0.0
> default.replication.factor=3
> num.partitions=3
> replica.lag.time.max.ms=15000
> broker.id=0
> listeners=PLAINTEXT://:9092
> log.dirs=/mnt/kafka/kafka
> log.retention.check.interval.ms=30
> log.retention.hours=168
> log.segment.bytes=1073741824
> num.io.threads=20
> num.network.threads=10
> num.partitions=1
> num.recovery.threads.per.data.dir=2
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> zookeeper.connection.timeout.ms=6000
> delete.topic.enable = true
> broker.max_heap_size=10 GiB 
>   
> Any ideas?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-78: Cluster Id

2016-09-04 Thread Ismael Juma
Hi Dong,

Sorry for the delay, I was offline for 24 hours. Thankfully Sumit was
around to explain the reasoning for the current approach. From reading the
thread, it seems like it may help to reiterate a few important goals for
the auditing use case where there is a requirement to associate a message
with a particular cluster:

   1. Each cluster id should be unique.
   2. The cluster id should be immutable. If the cluster id can be changed
   as a matter of course, it's hard to provide any guarantees. Assigning a
   meaningful name for the id makes it more likely that you may need to change
   it whereas decoupling the immutable id from the meaningful/human-readable
   name eliminates the issue.
   3. Every cluster should have an id. An optional cluster id makes the
   downstream code more complex and makes the feature less useful.
   4. No new mandatory configs should be introduced. We generally avoid
   adding mandatory configs as they make it harder to get started with
   Kafka and upgrades become more complex. I've added this to the KIP as it
   was previously missing. Thanks for the reminder.

With that in mind, let's look at your proposal.

When Kafka starts, it reads cluster.id from config. And then it reads
> cluster.id from zookeeper.
> - if the cluster.id is not specified in zookeeper, create the znode.
> - if the cluster.id is specified in zookeeper
>- if the cluster.id in znode is the same as that in config, proceed
>- Otherwise, broker startup fails and it reports error. Note that we
> don't make this change after the startup.
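
For concreteness, the proposal amounts to roughly this check at broker
startup (a sketch only; the helper names and the znode path here are made up
for illustration, not actual broker code):

  String configured = config.getString("cluster.id");         // from broker config
  String stored = zkUtils.readClusterIdOrNull("/cluster/id");  // from ZooKeeper

  if (stored == null) {
      zkUtils.createPersistentPath("/cluster/id", configured); // first broker wins
  } else if (!stored.equals(configured)) {
      throw new IllegalStateException("cluster.id mismatch: config=" + configured
          + " zookeeper=" + stored);                           // fail startup
  }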


The concerns I have are:

   1. The same id can be given to two different clusters and we have no
   easy way to detect this (as each cluster may be using a completely
   different ZooKeeper ensemble). Affects goal 1.
   2. If users choose a meaningful name for the cluster id, they may end up
   with the choice of having to live with a misleading id or having to change
   it (with the respective bad implications) as cluster usage evolves. Affects
   goal 2.
   3. If the cluster.id config is optional, it goes against goal 3. On the
   other hand, if it is mandatory, it goes against goal 4.
   4. It is a bit odd to configure a cluster property via a broker config
   from a semantics/architecture perspective. Having an optional cluster.id
   broker config could make sense as a way to validate that the broker is
   connecting to the right cluster[1], but it's weird to _define_ the cluster
   id in this way.

*How can we change cluster.id :*
> - Update kafka broker config to use the new cluster.id
> - Either delete that znode from zookeeper, or update kafka broker config to
> use a new zookeeper which doesn't have that znode
> - Do a rolling bounce


Again, I think changing ids is something to be avoided, but for the cases
where one really wants that, it seems to me that it's easier with the
current KIP as you don't need to update the config in every broker.

Does that help at all?

Thanks,
Ismael

[1] It's listed as one of the possible approaches in point 3 of the
potential future work section (the other is to store the cluster id after
the first connection).

On Sun, Sep 4, 2016 at 2:53 AM, Dong Lin  wrote:

> Hey Sumit,
>
> I have no doubt that there are benefits with using tags. But the usage of
> tags is actually orthogonal to the usage of cluster.id. I am not sure the
> benefits of using tags that you provided can help us decide whether
> randomly generated cluster.id is better than readable cluster.id from
> config.
>
> In addition, it is hard to evaluate your suggested approach until we
> know its goal and implementation details. There are some
> interesting questions regarding your approach. Let me list some of them:
>
> - Using readable cluster.id doesn't rule out using tags. Would it be
> better
> to use readable cluster.id + readable tags than random cluster.id +
> readable tags?
> -  Do you even need cluster-id to distinguish between clusters if you have
> tags?
> - Are you going to include both the random cluster-id and tags in the
> sensor name?
>
> I am happy to discuss this approach in more detail if you can provide the
> goal and motivation in either this KIP or a new KIP.
>
> Thanks,
> Dong
>
>
> On Sat, Sep 3, 2016 at 5:32 PM, sumit arrawatia  >
> wrote:
>
> > Hi Dong,
> >
> > Please find my comments inline.
> >
> > Hopefully they address your concerns.
> >
> > Have a great weekend !
> > Sumit
> >
> > On Sat, Sep 3, 2016 at 3:17 PM, Dong Lin  wrote:
> >
> > > Hi Sumit,
> > >
> > > Please see my comments inline.
> > >
> > > On Sat, Sep 3, 2016 at 10:33 AM, sumit arrawatia <
> > > sumit.arrawa...@gmail.com>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Please see my comments inline.
> > > >
> > > > Sumit
> > > >
> > > > On Sat, Sep 3, 2016 at 9:26 AM, Dong Lin 
> wrote:
> > > >
> > > > > Hey Sumit,
> > > 

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Damian Guy
Hi Matthias,

Thanks for bringing the conversation across to the thread.

I think a main limitation would be that you cannot mix the 4 patterns
> within a single application anymore (iff you use a "cached state"). If
> you have a processor with a "cached state" this disables direct usage of
> context.forward() completely -- if I understand the design correctly.
> Thus, if a "cached state" is used, forwarding is only possible via state
> updates.
>
>
The above statement is not correct. Caching doesn't completely disable
forwarding, it only disables it for Processors that are using State Stores.
In all other cases context.forward() works as it does now.

Thanks,
Damian


Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
I think a main limitation would be that you cannot mix the 4 patterns
within a single application anymore (iff you use a "cached state"). If
you have a processor with a "cached state" this disables direct usage of
context.forward() completely -- if I understand the design correctly.
Thus, if a "cached state" is used, forwarding is only possible via state
updates.

The above described approach is fine from a DSL point of view. The main
question is whether a "cached state" should be a DSL-internal implementation
detail or should be exposed to the user for Processor API reuse. For the
former, the design is fine; for the latter, IMHO it imposes a limitation
and a hard-to-understand usage pattern for a regular user of the Processor API.

-Matthias


On 09/04/2016 05:28 PM, Matthias J. Sax wrote:
> We had a recent discussion about KIP-63, and I just copied the relevant
> comments from the JIRA discussion:
> 
> Damian:
>> During the code walk-through, Matthias raised a very good point about the 
>> use of context().forward being coupled to whether or not caching is enabled. 
>> Now that i've had the chance to think about it I have one potential solution 
>> for making this transparent to uses of the Processor API.
>>
>> We can add another method boolean isCachingEnabled() to the new interface 
>> ForwardingStateStoreSupplier. We also add 2 new methods to ProcessorNode:
>> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled()
>>
>> In TopologyBuilder when we are creating the ProcessorNodeCacheFlushListener 
>> to attach to the ForwardingStateStoreSupplier we can call 
>> ProcessorNode.setStateStoreCachingEnabled(supplier.isStateStoreCachingEnabled())
>>
>> We add an extra boolean parameter to the ProcessorRecordContextImpl forward 
>> this will be set to false when constructed from StreamTask and will be set 
>> to true when constructed from ProcessorNodeCacheFlushListener. Then in 
>> ProcessorRecordContextImpl.forward(..) we add a guard if (shouldForward()) 
>> where shouldForward is return forward || !node.stateStoreCachingEnabled();
>>
>> Now Processors are free to call context().forward(..) whether caching is 
>> enabled or not. If it is enabled the values just wont get forwarded until 
>> the cache evicts/flushes them.
> 
> 
> Matthias:
>> I guess this is a good solution/workaround. I had something like this in my 
>> mind during the call, too.
>>
>> However, thinking about the root cause of this issue again, I am not sure if 
>> the (overall) design of this KIP is optimal or not. My new concern is, that 
>> with this caching strategy, we "merge" two concepts into one; and I am not 
>> sure, if we should to this.
>>
>> Currently, message flow and state is decoupled and independent of each 
>> other. Thus, if there is a state, updates to the state are completely 
>> independent from emitting output records. With the new design, we merge 
>> state updates and record emits, limiting the overall flexibility. I guess, 
>> from a DSL point of view, this would not be problematic, because in an 
>> aggregation and changelog output, each update to the state should result in 
>> a downstream record. However, from a Processor API point of view, there are 
>> other patterns we want to be able to support, too.
>>
>> Basically, for each input record, there a four different patterns that could 
>> be applied by the user:
>>
>> no state updates, no output records
>> only state update
>> only output records
>> state updates and output records
>>
>> Right now, we go with a design that allows to use one of the patterns within 
>> a Processor. However, all 4 pattern could be mixed within a single Processor 
>> (pre KIP design), and this mixture would not be possible any more. If we 
>> want to support all four cases, we might not want to merge both into "a 
>> single abstraction" as we do in the design of this PR. What if a user just 
>> wants to sent a record downstream (without any state manipulation)?
>>
>> Going back to the KIP design, we move the cache from RocksDB into the 
>> processor. However, what we actually wanted to do was to de-duplicate output 
>> records. Thus, the newly introduced cache, could actually go "after the 
>> processor" and could be completely independent from the state. Thus, on each 
>> call to forward() the record is put into the cache, and if the cache is 
>> full, an actual cache eviction and record forwarding happens. This would 
>> make the de-duplication cache independent from the state.
> 
> 
> Eno:
>> it's not entirely true that the flexibility is limited. For example, what's 
>> next in implementation is https://issues.apache.org/jira/browse/KAFKA-3779 
>> where we add the dedup cache to the to operator. That is not implemented yet.
> 
> 
> Damian:
>> i think of the 4 patterns you mentioned only the last one changes, i.e, 
>> state updates and output records.
>> context.forward() still exists so you can just send a record downstream 
>> without any state manipulation, that behaviour hasn't 

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
We had a recent discussion about KIP-63, and I just copied the relevant
parts from the JIRA discussion:

Damian:
> During the code walk-through, Matthias raised a very good point about the use 
> of context().forward being coupled to whether or not caching is enabled. Now 
> that I've had the chance to think about it, I have one potential solution for 
> making this transparent to users of the Processor API.
> 
> We can add another method, boolean isCachingEnabled(), to the new interface 
> ForwardingStateStoreSupplier. We also add 2 new methods to ProcessorNode:
> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled().
> 
> In TopologyBuilder, when we are creating the ProcessorNodeCacheFlushListener 
> to attach to the ForwardingStateStoreSupplier, we can call 
> ProcessorNode.setStateStoreCachingEnabled(supplier.isCachingEnabled()).
> 
> We add an extra boolean parameter, forward, to ProcessorRecordContextImpl; 
> this will be set to false when constructed from StreamTask and to true 
> when constructed from ProcessorNodeCacheFlushListener. Then in 
> ProcessorRecordContextImpl.forward(..) we add a guard, if (shouldForward()), 
> where shouldForward is return forward || !node.isStateStoreCachingEnabled();
> 
> Now Processors are free to call context().forward(..) whether caching is 
> enabled or not. If it is enabled, the values just won't get forwarded until 
> the cache evicts/flushes them.
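
To make the moving parts concrete, here is a minimal Java sketch of the
guard described above. The class shapes are hypothetical (the real Kafka
Streams classes differ); it only shows how the origin-of-call flag and the
per-node caching flag combine in shouldForward():

    // Hypothetical sketch of the proposed guard -- illustrative shapes,
    // not the actual Kafka Streams classes.
    class ProcessorNode {
        private boolean stateStoreCachingEnabled;

        boolean isStateStoreCachingEnabled() { return stateStoreCachingEnabled; }

        void setStateStoreCachingEnabled(boolean enabled) {
            this.stateStoreCachingEnabled = enabled;
        }
    }

    class ProcessorRecordContextImpl {
        private final ProcessorNode node;
        // false when constructed from StreamTask (a normal process() call),
        // true when constructed from the cache flush listener (an eviction/flush)
        private final boolean forward;

        ProcessorRecordContextImpl(ProcessorNode node, boolean forward) {
            this.node = node;
            this.forward = forward;
        }

        private boolean shouldForward() {
            // forward immediately if this call originates from a cache
            // eviction/flush, or if caching is disabled for this node
            return forward || !node.isStateStoreCachingEnabled();
        }

        <K, V> void forward(K key, V value) {
            if (shouldForward()) {
                // ... hand the record to the downstream processor nodes ...
            }
            // otherwise the update stays in the cache until eviction/flush,
            // at which point the flush listener re-enters with forward == true
        }
    }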


Matthias:
> I guess this is a good solution/workaround. I had something like this in my 
> mind during the call, too.
> 
> However, thinking about the root cause of this issue again, I am not sure if 
> the (overall) design of this KIP is optimal or not. My new concern is that 
> with this caching strategy we "merge" two concepts into one, and I am not 
> sure if we should do this.
> 
> Currently, message flow and state are decoupled and independent of each other. 
> Thus, if there is a state, updates to the state are completely independent 
> from emitting output records. With the new design, we merge state updates and 
> record emits, limiting the overall flexibility. I guess, from a DSL point of 
> view, this would not be problematic, because in an aggregation and changelog 
> output, each update to the state should result in a downstream record. 
> However, from a Processor API point of view, there are other patterns we want 
> to be able to support, too.
> 
> Basically, for each input record, there are four different patterns that could 
> be applied by the user:
> 
> no state updates, no output records
> only state update
> only output records
> state updates and output records
> 
> Right now, we go with a design that allows using one of the patterns within 
> a Processor. However, all 4 patterns could be mixed within a single Processor 
> (pre-KIP design), and this mixture would not be possible any more. If we want 
> to support all four cases, we might not want to merge both into "a single 
> abstraction" as we do in the design of this PR. What if a user just wants to 
> send a record downstream (without any state manipulation)?
> 
> Going back to the KIP design, we move the cache from RocksDB into the 
> processor. However, what we actually wanted to do was to de-duplicate output 
> records. Thus, the newly introduced cache, could actually go "after the 
> processor" and could be completely independent from the state. Thus, on each 
> call to forward() the record is put into the cache, and if the cache is full, 
> an actual cache eviction and record forwarding happens. This would make the 
> de-duplication cache independent from the state.
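
To illustrate the "cache after the processor" idea from the last quoted
paragraph, here is a hedged, self-contained Java sketch (all names are
invented for this example and are not Kafka Streams API): forward() writes
into a bounded map keyed by record key, so repeated updates to the same key
collapse, and records only reach downstream on eviction or flush.

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;

    // Hypothetical de-duplication cache sitting "after the processor":
    // not the actual KIP-63 implementation, just the idea sketched above.
    class DedupForwardingCache<K, V> {
        private final int maxEntries;
        private final LinkedHashMap<K, V> cache = new LinkedHashMap<>();
        private final BiConsumer<K, V> downstream; // what forward() would normally do

        DedupForwardingCache(int maxEntries, BiConsumer<K, V> downstream) {
            this.maxEntries = maxEntries;
            this.downstream = downstream;
        }

        // Called on every context.forward(); overwrites any pending record
        // with the same key, which is the de-duplication.
        void forward(K key, V value) {
            cache.put(key, value);
            if (cache.size() > maxEntries) {
                Iterator<Map.Entry<K, V>> it = cache.entrySet().iterator();
                Map.Entry<K, V> oldest = it.next();
                it.remove();
                downstream.accept(oldest.getKey(), oldest.getValue()); // actual forwarding
            }
        }

        // Called on commit/flush: emit everything still pending.
        void flush() {
            cache.forEach(downstream);
            cache.clear();
        }
    }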


Eno:
> it's not entirely true that the flexibility is limited. For example, what's 
> next in implementation is https://issues.apache.org/jira/browse/KAFKA-3779 
> where we add the dedup cache to the to() operator. That is not implemented yet.


Damian:
> I think, of the 4 patterns you mentioned, only the last one changes, i.e., 
> state updates and output records.
> context.forward() still exists so you can just send a record downstream 
> without any state manipulation, that behaviour hasn't changed.






On 08/24/2016 03:35 PM, Eno Thereska wrote:
> Hi folks,
> 
> We've been working on a proof-of-concept for KIP-63, which can now be
> found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776)
> under PR https://github.com/apache/kafka/pull/1752. It is still work in
> progress; however, we are confident that the basic structure is there.
> 
> As part of this work, we've also updated the KIP to clarify several things,
> listed here for convenience:
> 
> - Clarify that the optimization is applicable to aggregations and to()
> operators. It is not applicable to joins.
> - Clarify that for the low-level Processor API, we propose to allow users
> to disable caching on a store-by-store basis using a new
> .enableCaching() call (see the sketch below).
> 
> We'll start the voting process shortly for this KIP.
> 
> Thanks
> Eno
> 
> 
> On Thu, Jun 2, 2016 at 
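
For readers unfamiliar with the store builder referenced in the quoted
update, here is a hedged usage sketch of the proposed per-store switch. The
builder chain follows the 0.10.x Stores API, but treat the exact calls, and
the builder/processor names, as illustrative:

    // Hypothetical usage inside topology setup code:
    StateStoreSupplier countsStore = Stores.create("counts")
            .withStringKeys()
            .withLongValues()
            .persistent()
            .enableCaching()   // the proposed per-store opt-in from KIP-63
            .build();
    builder.addStateStore(countsStore, "my-processor");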

WARN log message flooding broker logs for a pretty typical SSL setup

2016-09-04 Thread Jaikiran Pai
We just started enabling SSL for our Kafka brokers and (Java) clients. 
Among the issues we are running into, one is the flooding of the 
server/broker Kafka logs with these messages:


[2016-09-02 08:07:13,773] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,710] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,711] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)
[2016-09-02 08:07:15,712] WARN SSL peer is not authenticated, returning 
ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer)



They just keep going on and on. In our SSL setup, the broker is 
configured with a keystore and the Java clients with a proper 
truststore, and everything works fine except for these messages 
flooding the logs. We don't have any ACLs set up, nor have we enabled 
client authentication.


Looking at the code which generates this WARN message 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java#L638 
and considering that our setup (where we just enable server/broker 
cert validation) is, IMO, a valid scenario and not some 
exceptional/incorrect setup, I think this log message can be removed 
from the code (or at least logged at a much lower level, given the 
frequency at which it gets logged).
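
In the meantime, assuming the stock log4j setup shipped with the broker, a 
possible stopgap is to raise the threshold for just this class in 
config/log4j.properties (the logger name below assumes the default 
per-class logger naming):

    # Stopgap sketch: silence the per-connection WARN from SslTransportLayer only
    log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR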


Any thoughts on this?

It's a pretty straightforward change, and if it sounds right, I can go 
ahead and submit a PR.


P.S: This is both on 0.9.0.1 and latest 0.10.0.1.

-Jaikiran



Re: [DISCUSS] KIP-78: Cluster Id

2016-09-04 Thread Ismael Juma
Hi Flavio,

Thanks for reviewing the KIP. Comments inline.

On Sat, Sep 3, 2016 at 4:48 PM, Flavio Junqueira  wrote:

> Thanks for the KIP, Ismael, looks great. I have just a couple of comments
> and questions:
>
> - I assume the znode containing the cluster id is persistent. Is there
> ever the need to delete that znode to force a new instance id, and if so,
> how is it expected to happen? I'm thinking about a scenario in which I have
> a cluster, I wipe out intentionally to reset my state, and I want to start
> with a fresh instance of the cluster. In such a case, it should be as
> simple as deleting the znode manually, so I'm just wondering if this is
> what we are going to expect from users. The alternative is to assume that
> the zookeeper ensemble is also going to be wiped out in this case, or at
> least the corresponding sub-tree.
>

Yes, my expectation is that if you want to start from scratch, you'd delete
the corresponding sub-tree (which will include the cluster id).


> - I'm not actually suggesting that we do things manually, but I can see
> some benefit in manually creating the cluster id znode and having brokers
> only check that the znode is there before starting.
>

Yes, I agree that there are some benefits in doing it this way. The reason
we do it automatically is that we didn't want to add an additional step for
people who are starting a Kafka cluster.
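
To make the automatic flow concrete, here is a hedged Java sketch of a
get-or-create against ZooKeeper. The znode path and the URL-safe base64
UUID encoding follow the KIP, but the code shapes are invented for
illustration and assume the parent znode already exists:

    import java.nio.ByteBuffer;
    import java.util.Base64;
    import java.util.UUID;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    // Hypothetical get-or-create of the cluster id (illustration only,
    // not the actual broker code).
    String getOrCreateClusterId(ZooKeeper zk) throws Exception {
        final String path = "/cluster/id";
        while (true) {
            try {
                return new String(zk.getData(path, false, null), "UTF-8");
            } catch (KeeperException.NoNodeException noNode) {
                UUID uuid = UUID.randomUUID();
                byte[] raw = ByteBuffer.allocate(16)
                        .putLong(uuid.getMostSignificantBits())
                        .putLong(uuid.getLeastSignificantBits())
                        .array();
                String id = Base64.getUrlEncoder().withoutPadding()
                        .encodeToString(raw);
                try {
                    zk.create(path, id.getBytes("UTF-8"),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    return id; // this broker won the race
                } catch (KeeperException.NodeExistsException exists) {
                    // another broker created it first -- loop and read it
                }
            }
        }
    }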


> - One of the messages mentions changing names, which is a good point. If
> it isn't necessary for correctness that all brokers have either no id or
> the same id, then I'd think it is fine to just watch that znode for
> changes. Brokers that are connected to ZK will eventually receive a
> notification. But, as you point out, changing the name for a working
> cluster isn't necessarily a desirable feature, so this might not be needed.
>

Indeed, the idea is that this id should never change for a working cluster
(i.e. it should be unique and immutable). There are exceptions like the
scenario you pointed out: a user wants to intentionally reset the state and
start a fresh instance of the cluster (I think one could look at this as
starting a new cluster though).

Ismael