[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352387#comment-15352387
 ] 

Henry Cai commented on KAFKA-3904:
--

Please take a look at this PR for the fix:

https://github.com/apache/kafka/pull/1563

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed that when my application had been running for a long time (> 1 day), 
> I would get a 'Too many open files' error.
> I used 'lsof' to list all the file descriptors used by the process; there were over 
> 32K, but most of them belonged to the .lock file, e.g. the same lock file 
> showed up 2700 times.
> I looked at the code, and I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd is created; we 
> should probably either close or reuse this RandomAccessFile object.
> lsof result:
> java  14799 hcai  740u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  743u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  746u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  755u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
>     2709   24381  319662



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352386#comment-15352386
 ] 

ASF GitHub Bot commented on KAFKA-3904:
---

GitHub user HenryCaiHaiying opened a pull request:

https://github.com/apache/kafka/pull/1563

KAFKA-3904: File descriptor leaking (Too many open files) for long ru…

…nning stream process

I noticed that when my application had been running for more than one day, I would get 
a 'Too many open files' error.

I used 'lsof' to list all the file descriptors used by the process; there were 
over 32K, but most of them belonged to the .lock file, e.g. a single lock file 
showed up 2700 times.

I looked at the code, and I think the problem is in:
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
Each time new RandomAccessFile is called, a new fd is created.

Fix this by caching the FileChannels created so far.
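
For readers skimming the thread, a minimal sketch of the caching idea described above (not the actual PR; the class and method names are made up for illustration):

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class LockChannelCache {

    // One FileChannel per lock-file path, reused on every lock attempt, so that
    // repeated locking does not open a new file descriptor each time.
    private final ConcurrentMap<String, FileChannel> channelsByPath = new ConcurrentHashMap<>();

    public FileChannel getOrCreateChannel(File lockFile) {
        return channelsByPath.computeIfAbsent(lockFile.getAbsolutePath(), path -> {
            try {
                return new RandomAccessFile(lockFile, "rw").getChannel();
            } catch (IOException e) {
                throw new RuntimeException("Cannot open lock file " + path, e);
            }
        });
    }

    // Returns null if the lock is currently held elsewhere.
    public FileLock tryLock(File lockFile) throws IOException {
        return getOrCreateChannel(lockFile).tryLock();
    }
}
{code}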

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HenryCaiHaiying/kafka fd

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1563


commit bb68fb8b820e4a0baef0c549ea1a4a8cfc913187
Author: Henry Cai 
Date:   2016-06-28T05:24:03Z

KAFKA-3904: File descriptor leaking (Too many open files) for long running 
stream process

I noticed that when my application had been running for more than one day, I would get 
a 'Too many open files' error.

I used 'lsof' to list all the file descriptors used by the process; there were 
over 32K, but most of them belonged to the .lock file, e.g. a single lock file 
showed up 2700 times.

I looked at the code, and I think the problem is in:
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
Each time new RandomAccessFile is called, a new fd is created.

Fix this by caching the FileChannels created so far.




> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed that when my application had been running for a long time (> 1 day), 
> I would get a 'Too many open files' error.
> I used 'lsof' to list all the file descriptors used by the process; there were over 
> 32K, but most of them belonged to the .lock file, e.g. the same lock file 
> showed up 2700 times.
> I looked at the code, and I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd is created; we 
> should probably either close or reuse this RandomAccessFile object.
> lsof result:
> java  14799 hcai  740u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  743u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  746u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  755u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
>     2709   24381  319662



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1563: KAFKA-3904: File descriptor leaking (Too many open...

2016-06-27 Thread HenryCaiHaiying
GitHub user HenryCaiHaiying opened a pull request:

https://github.com/apache/kafka/pull/1563

KAFKA-3904: File descriptor leaking (Too many open files) for long ru…

…nning stream process

I noticed that when my application had been running for more than one day, I would get 
a 'Too many open files' error.

I used 'lsof' to list all the file descriptors used by the process; there were 
over 32K, but most of them belonged to the .lock file, e.g. a single lock file 
showed up 2700 times.

I looked at the code, and I think the problem is in:
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
Each time new RandomAccessFile is called, a new fd is created.

Fix this by caching the FileChannels created so far.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HenryCaiHaiying/kafka fd

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1563


commit bb68fb8b820e4a0baef0c549ea1a4a8cfc913187
Author: Henry Cai 
Date:   2016-06-28T05:24:03Z

KAFKA-3904: File descriptor leaking (Too many open files) for long running 
stream process

I noticed that when my application had been running for more than one day, I would get 
a 'Too many open files' error.

I used 'lsof' to list all the file descriptors used by the process; there were 
over 32K, but most of them belonged to the .lock file, e.g. a single lock file 
showed up 2700 times.

I looked at the code, and I think the problem is in:
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
Each time new RandomAccessFile is called, a new fd is created.

Fix this by caching the FileChannels created so far.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe

2016-06-27 Thread Xing Huang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352378#comment-15352378
 ] 

Xing Huang edited comment on KAFKA-3905 at 6/28/16 5:22 AM:


I think an IllegalArgumentException with an error message will be enough. And an 
empty string should cause an exception too. 


was (Author: peoplebike):
I think IllegalArgumentException with an error message will be enough. And, 
empty string cause an exception too. 

> remove null from subscribed topics  in KafkaConsumer#subscribe
> --
>
> Key: KAFKA-3905
> URL: https://issues.apache.org/jira/browse/KAFKA-3905
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Assignee: Rekha Joshi
>Priority: Minor
>
> Currently, KafkaConsumer's subscribe methods accept a Collection<String> as the 
> topics to be subscribed, but a Collection may have null as an element. For 
> example:
> {code}
> String topic = null;
> Collection<String> topics = Arrays.asList(topic);
> consumer.subscribe(topics);
> {code}
> When this happens, consumer will throw a puzzling NullPointerException:
> {code}
>   at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245)
>   at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248)
>   at 
> org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85)
>   at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89)
>   at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244)
>   at 
> org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35)
>   at 
> org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
> {code}
> Maybe it's better to remove null when doing subscription.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe

2016-06-27 Thread Xing Huang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352378#comment-15352378
 ] 

Xing Huang commented on KAFKA-3905:
---

I think an IllegalArgumentException with an error message will be enough. And an 
empty string should cause an exception too. 

> remove null from subscribed topics  in KafkaConsumer#subscribe
> --
>
> Key: KAFKA-3905
> URL: https://issues.apache.org/jira/browse/KAFKA-3905
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Assignee: Rekha Joshi
>Priority: Minor
>
> Currently, KafkaConsumer's subscribe methods accept a Collection<String> as the 
> topics to be subscribed, but a Collection may have null as an element. For 
> example:
> {code}
> String topic = null;
> Collection<String> topics = Arrays.asList(topic);
> consumer.subscribe(topics);
> {code}
> When this happens, consumer will throw a puzzling NullPointerException:
> {code}
>   at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245)
>   at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248)
>   at 
> org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85)
>   at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89)
>   at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244)
>   at 
> org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35)
>   at 
> org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
> {code}
> Maybe it's better to remove null when doing subscription.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3089) VerifiableProducer should do a clean shutdown in stop_node()

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352205#comment-15352205
 ] 

ASF GitHub Bot commented on KAFKA-3089:
---

Github user lindong28 closed the pull request at:

https://github.com/apache/kafka/pull/755


> VerifiableProducer should do a clean shutdown in stop_node()
> 
>
> Key: KAFKA-3089
> URL: https://issues.apache.org/jira/browse/KAFKA-3089
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> VerifiableProducer is closed by SIGKILL when stop_node() is called. For this 
> reason, when stop_producer_and_consumer() is invoked in 
> ProduceConsumeValidateTest, VerifiableProducer is killed immediately without 
> allowing it to wait for acknowledgement. The reported number of messages 
> produced by VerifiableProducer will thus be much smaller than the reported 
> number of messages consumed by consumer, causing confusion to developers.
> For almost all other services, such as VerifiableConsumer and 
> ConsoleConsumer, we send SIGINT when stop_node() is called. It is not clear 
> why VerifiableProducer is different from them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #755: KAFKA-3089; VerifiableProducer should do a clean sh...

2016-06-27 Thread lindong28
Github user lindong28 closed the pull request at:

https://github.com/apache/kafka/pull/755


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format

2016-06-27 Thread Ishita Mandhan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352172#comment-15352172
 ] 

Ishita Mandhan commented on KAFKA-3522:
---

I'd like to work on this once a consensus has been reached and if this is 
something suitable for someone new to Kafka (I might end up asking a lot of 
questions here :) ).
[~guozhang] When you say "version indicator" file, are you thinking of some sort 
of file that simply stores the rocksdb version number as JSON and performs a 
check during startup in KTableStoreSupplier.java? Or a file that stores the 
various configurable values (currently hardcoded) in RocksDBStore.java? Or both?
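
For illustration only, a minimal sketch of what such a "version indicator" file check could look like (the file name, format version string, and where the check would live are all assumptions, not an agreed design):

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public final class StoreVersionCheck {

    private static final String VERSION_FILE = ".version";        // hypothetical file name
    private static final String CURRENT_FORMAT_VERSION = "1";     // hypothetical format version

    // Write the version file when a store directory is first created.
    public static void writeVersion(Path storeDir) throws IOException {
        Files.write(storeDir.resolve(VERSION_FILE),
                CURRENT_FORMAT_VERSION.getBytes(StandardCharsets.UTF_8));
    }

    // On startup, fail fast if the on-disk format does not match what the code expects.
    public static void checkVersion(Path storeDir) throws IOException {
        Path versionFile = storeDir.resolve(VERSION_FILE);
        if (!Files.exists(versionFile)) {
            writeVersion(storeDir);   // treat a missing file as a freshly created store
            return;
        }
        String onDisk = new String(Files.readAllBytes(versionFile), StandardCharsets.UTF_8).trim();
        if (!CURRENT_FORMAT_VERSION.equals(onDisk)) {
            throw new IllegalStateException(
                    "Incompatible store format version " + onDisk + " in " + storeDir);
        }
    }
}
{code}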

> Consider adding version information into rocksDB storage format
> ---
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Ishita Mandhan
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Kafka Streams does not introduce any modifications to the data format in the 
> underlying Kafka protocol, but it does use RocksDB for persistent state 
> storage, and currently its data format is fixed and hard-coded. We want to 
> consider the evolution path for when we change the data format in the future, and 
> hence having some version info stored along with the storage file / directory 
> would be useful.
> This information could even live outside the storage file; for example, we 
> could just use a small "version indicator" file in the rocksdb directory for 
> this purpose. Thoughts? [~enothereska] [~jkreps]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3885) Kafka new producer cannot failover

2016-06-27 Thread wateray (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352163#comment-15352163
 ] 

wateray commented on KAFKA-3885:


[~junrao] Is it a defect? 

> Kafka new producer cannot failover
> --
>
> Key: KAFKA-3885
> URL: https://issues.apache.org/jira/browse/KAFKA-3885
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.0, 0.8.2.2, 0.9.0.1, 0.10.0.0
>Reporter: wateray
>
> This bug can be reproduced by the following steps.
> The cluster has 2 brokers.
>  a) Start a new producer, then send messages; it works well.
>  b) Then kill one broker; it still works well.
>  c) Then restart that broker; it still works well.
>  d) Then kill the other broker; the producer can't fail over.
> The producer then prints this log indefinitely:
> org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) 
> expired due to timeout while requesting metadata from brokers for 
> lwb_test_p50_r2-29
> 
> When the producer sends a message, it detects that the metadata should be updated.
> But in this code (class: NetworkClient, method: leastLoadedNode):
> List<Node> nodes = this.metadataUpdater.fetchNodes();
> nodes only returns one result, and the returned node is the killed node, so 
> the producer cannot fail over!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352126#comment-15352126
 ] 

Henry Cai commented on KAFKA-3904:
--

I took a look at FileChannel.open; it looks like it will still create a file 
descriptor for that channel, so the underlying problem of creating too many 
file descriptors is still there.

I am not a hundred percent sure we can use the new FileChannel.open() since it 
relies on the underlying FileSystemProvider.newFileChannel(), and some of the 
implementations throw UnsupportedOperationException.

I think I will still use the traditional RandomAccessFile.getChannel and post a 
PR for this.
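
For illustration, whichever way the channel is obtained, the underlying file descriptor is only released when the channel is closed, so the fix has to either reuse the channel or close it. A minimal sketch of the close-on-release pattern (not how the PR itself manages task locks):

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

public final class LockAndRelease {

    // try-with-resources guarantees the channel (and its file descriptor)
    // is closed even if the work throws; resources are closed in reverse order.
    public static boolean withLock(File lockFile, Runnable work) throws IOException {
        try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
             FileLock lock = channel.tryLock()) {
            if (lock == null) {
                return false;   // another process holds the lock
            }
            work.run();
            return true;
        }
    }
}
{code}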


> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed that when my application had been running for a long time (> 1 day), 
> I would get a 'Too many open files' error.
> I used 'lsof' to list all the file descriptors used by the process; there were over 
> 32K, but most of them belonged to the .lock file, e.g. the same lock file 
> showed up 2700 times.
> I looked at the code, and I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd is created; we 
> should probably either close or reuse this RandomAccessFile object.
> lsof result:
> java  14799 hcai  740u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  743u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  746u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  755u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
>     2709   24381  319662



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-06-27 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352103#comment-15352103
 ] 

Jan Filipiak commented on KAFKA-3705:
-

I will just shoot a quick reply now; time has somehow become sparse recently. 
Anyhow, the bottom line of our misunderstandings is always the same thing. My 
bad that I didn't see the wiki page; if that range-query interface is addressed, 
that's nice :D.

Point 3 is the one that causes the most confusion, I guess. In the repartition 
case we follow different paths, and I am not sure that I was able to 
communicate mine well enough. I <3 the idea of having everything be a derived 
store. In the end all of this is being used to tail -F mysql-.bin | kafka | XXX | 
redis, so Redis becomes a derived store of MySQL which can be used for 
NoSQL-style reads. I am in fact such a great fan of this concept that I tend to 
treat everything as a derived store. For me this means a repartitioned topic is a 
derived store of the source topic. This stands in contrast to making a changelog 
out of it and materializing the changelog in, say, RocksDB. That leads to the 
"problem" that the changelog topic is not a derived store anymore, which gives 
me a personally bad feeling; it just pushes me out of my comfort zone. 
Confluent peeps seem to be in their comfort zone with change-logging topics. In 
my narrative, shit hits the fan when the property of being a derived store is 
lost. It leads to all the nasty things like needing to change-log your, say, 
RocksDBs, as the intermediate topic won't hold stuff forever. 

In contrast to having a change-logging topic that I re-materialize and then 
change-capture again, I prefer to do the change capturing first and only maintain 
the state of which downstream partitions a record is currently published to. This 
works cleanly and nicely but brings with it what I call "key widening". Say I 
have KTable A and I want to repartition it to A' so that the topic containing 
A' is a derived store and log-compacted. Then I can't use the key to do this, for two 
reasons. The stream partitioner can only access the key to determine the 
partition to delete from (deletes come as null values), which means the fields 
that determine the partition need to be in the key no matter what. Snippet:
{code:java}
topology.addSink(name, repartitionTopicName, new StreamPartitioner<K, VR>() {

    private final Serializer<KL> intermediateSerializer = intermediateSerde.serializer();

    @Override
    public Integer partition(K key, VR value, int numPartitions) {
        KL newKey = intermediateKeyExtractor.apply(key);
        // Copied from the default partitioner; didn't want to create a Cluster object here to reuse it.
        return (Utils.murmur2(intermediateSerializer.serialize(repartitionTopicName, newKey)) % numPartitions) & 0x7fffffff;
    }

}, repartitionProcessorName);
{code}

As you can see, the resulting key K contains KL (the key of the 
non-repartitioned table).

The second reason why this key must be there is that one needs to be able to 
build a derived stream A''. But since in A' a record can "move" from partition 
X to Y, there is a race condition between the "insert" in Y and the delete in X. 
The repartitioner processor repartitioning for A'' needs to treat them as 
different keys; if it were the same key, the delete might wipe the new value. 
This also puts downstream consumers of A'' in the weird position that at 
any point in time there can be as many A-keys with the same value as there are 
A' partitions minus 1, or a specific A key might vanish completely and then reappear, 
which is sometimes awkward to work around in the end application. But there are 
enough strategies to solve at least the multiple-A-keys case, not so much the 
complete-vanish case. I hope this clarifies stuff. 





> Support non-key joining in KTable
> -
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Liquan Pei
>  Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-27 Thread Guozhang Wang
The boolean flag can be reset by a child operator which requires the source
to be materialized; more details can be found in this design wiki:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65143671

But just to give you a concrete idea, if your topology is defined as:

KTable table1 = builder.table("topic1");
table1.filter().mapValues().to("topic2");

then enableSendOldValues will not be set to true and table1 will
not be materialized. If your topology is defined as:

KTable table1 = builder.table("topic1");
KTable table2 = table1.filter().mapValues().groupBy(..).aggregate(..);

then enableSendOldValues will be called from the aggregate() operator,
propagated back to its parent, and then all the way back to the source
table1 (as in KTableSource).
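
To make the two shapes concrete, here is a minimal sketch in the 0.10.0-era DSL
(the topic names, serdes, lambdas and store name are illustrative assumptions,
not taken from this thread):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FilterTopologies {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KTable<String, String> table1 = builder.table(Serdes.String(), Serdes.String(), "topic1");

        // Shape 1: filter/mapValues feeding straight into a sink topic.
        // No downstream operator needs old values, so table1 stays unmaterialized.
        table1.filter((key, value) -> value != null)
              .mapValues(String::toUpperCase)
              .to(Serdes.String(), Serdes.String(), "topic2");

        // Shape 2: the same chain ending in an aggregation. The aggregation needs
        // {old -> new} pairs, so enableSendOldValues propagates back to the source
        // and table1 gets materialized.
        table1.filter((key, value) -> value != null)
              .mapValues(String::toUpperCase)
              .groupBy((key, value) -> KeyValue.pair(value, value),
                       Serdes.String(), Serdes.String())
              .count("value-counts");
    }
}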


Guozhang



On Mon, Jun 27, 2016 at 3:00 PM, Philippe Derome  wrote:

> Then I don't see any simple solution here at least for a novice, especially
> since I don't know what can trigger the boolean flag to true.
> On 27 Jun 2016 5:38 p.m., "Guozhang Wang"  wrote:
>
> > My concern is that, the overhead of requesting the source KTable to be
> > materialized (i.e. creating a state store, and sending the {old -> new}
> > pair instead of the new value only) may be over-whelming compared with
> its
> > potential benefits of reducing the downstream traffic.
> >
> > Guozhang
> >
> > On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome 
> > wrote:
> >
> > > Guozhang,
> > >
> > > would you say it's advisable to initialize KTableFilter.sendOldValues
> to
> > > true instead of false? That's what I see that can trigger your
> described
> > > case 3 to potentially desirable effect, but I didn't include it into
> pull
> > > request. If left to default value of false, I don't know what mechanism
> > > should override it to true.
> > >
> > > Phil
> > >
> > > On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang 
> > > wrote:
> > >
> > > > Thanks! You can follow this step-by-step guidance to contribute to
> > Kafka
> > > > via github.
> > > >
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome  >
> > > > wrote:
> > > >
> > > > > I have a 1 liner solution for this in KTableFilter.java and about
> 5-6
> > > > lines
> > > > > changes to existing unit test
> KTableFilterTest.testSendingOldValue. I
> > > > > included those lines with context in the JIRA. I am struggling a
> bit
> > > with
> > > > > github being new to it and how to do a proper pull request so
> > hopefully
> > > > > that can be followed up by you? I had the streams test suite pass
> > aside
> > > > for
> > > > > a few cases that pertain specifically to this JIRA as assumptions
> > have
> > > > now
> > > > > changed.
> > > > >
> > > > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Hi Philippe,
> > > > > >
> > > > > > Great, since you agree with my reasonings, I have created a JIRA
> > > ticket
> > > > > for
> > > > > > optimizing KTableFilter (feel free to pick it up if you are
> > > interested
> > > > in
> > > > > > contributing):
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-3902
> > > > > >
> > > > > > About case 3-c-1), what I meant is that since "predicate return
> > true
> > > on
> > > > > > both",
> > > > > > the resulted pair would just be the same as the original pair.
> > > > > >
> > > > > > About KIP-63, itself is a rather big story, but it has one
> > > > correspondence
> > > > > > to this JIRA: with caching you can dedup some records with the
> same
> > > > key,
> > > > > > for example in the input records to the KTable is:
> > > > > >
> > > > > > , , , , ,  ...
> > > > > >
> > > > > > And the KTable is materialized into a state store with cache on
> top
> > > of
> > > > > it,
> > > > > > then the resulted downstream could be:
> > > > > >
> > > > > >  1}>,  6}> ...
> > > > > >
> > > > > > Instead of
> > > > > >
> > > > > >  1}>,  2}>,  3}>, ...  6}>
> > ...
> > > > > >
> > > > > > So if it is piped to a filter() operator, then even less data
> will
> > be
> > > > > > produced.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome <
> > phder...@gmail.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Yes, it looks very good. Your detailed explanation appears
> > > compelling
> > > > > > > enough to reveal that some of the details of the complexity of
> a
> > > > > streams
> > > > > > > system are probably inherent complexity (not that I dared
> assume
> > it
> > > > was
> > > > > > > "easy" but I could afford to be conveniently unaware). It took
> me
> > > 30
> > > > > > > minutes to grasp this latest response.
> > > > > > >
> > 

Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

2016-06-27 Thread Vahid S Hashemian
Hi Jason,

Thanks for the thoughtful comments.
Please see my response below.

BTW, I have been trying to update the KIP with some of the recent 
discussions on the mailing list.

Regards,
--Vahid
 



From:   Jason Gustafson 
To: dev@kafka.apache.org
Date:   06/27/2016 12:53 PM
Subject:Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy



Hey Vahid,

Comments below:


> I'm not very clear on the first part of this paragraph. You could clarify
> it for me, but in general balancing out the partitions across consumers in
> a group as much as possible would normally mean balancing the load within
> the cluster, and that's something a user would want to have compared to
> cases where the assignments and therefore the load could be quite
> unbalanced depending on the subscriptions.


I'm just wondering what kind of use cases require differing subscriptions
in a steady state. Usually we expect all consumers in the group to have the
same subscription, in which case the balance provided by round robin should
be even (in terms of the number of assigned partitions). The only case that
comes to mind is a rolling upgrade scenario in which the consumers in the
group are restarted one by one with an updated subscription. It would be
ideal to provide better balance in this situation, but once the upgrade
finishes, the assignment should be balanced again, so it's unclear to me
how significant the gain is. On the other hand, if there are cases which
require differing subscriptions in a long-term state, it would make this
feature more compelling.


I agree that if we care only about a balanced assignment with the same 
subscriptions, the round robin assignment is a good choice. But if we bring 
stickiness into the mix, it won't be guaranteed by the round robin 
assignor. An example (as Andrew mentioned in his earlier note) is elastic 
consumers that come and go automatically depending on the load and how 
much they lag behind. If these consumers maintain state for the partitions 
they consume from, it would be reasonable to want them to stick to their 
assigned partitions rather than having to repeat partition cleanup every 
time the number of consumers changes due to an increase or decrease in 
load. 

I'll also think about it and let you know if I come up with a use case 
with differing subscriptions. If differing subscriptions turns out not to 
be a common use case, the design and implementation of the sticky assignor 
could be modified to a far less complex setting so that 
fairness/stickiness can be guaranteed for same subscriptions. As I 
mentioned before, the current design / implementation is comprehensive and 
can be tweaked towards a less complex solution if further assumptions can 
be made.
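
(For anyone following along, the assignor under discussion would be plugged in the
same way the existing ones are, via partition.assignment.strategy. A minimal sketch;
the sticky assignor class name mentioned in the comment is the proposal's eventual
name, not a class that exists at this point:)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignorConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "elastic-workers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Today: round robin balances evenly when all members share the same subscription.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  RoundRobinAssignor.class.getName());
        // With KIP-54 a sticky assignor would be selected the same way, e.g.
        // "org.apache.kafka.clients.consumer.StickyAssignor".
        return props;
    }
}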


> Since the new consumer is single threaded there is no such problem in its
> round robin strategy. It simply considers consumers one by one for each
> partition assignment, and when one consumer is assigned a partition, the
> next assignment starts with considering the next consumer in the list (and
> not the same consumer that was just assigned). This removes the
> possibility of the issue reported in KAFKA-2019 surfacing in the new
> consumer. In the sticky strategy we do not have this issue either, since
> every time an assignment is about to happen we start with the consumer
> with the least number of assignments. So we will not have a scenario where a
> consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that
> consumer is lagging behind other consumers on the number of partitions
> assigned).


Thanks for checking into this. I think the other factor is that the round
robin assignor sorts the consumers using the id given to them by the
coordinator, which at the moment looks like this: "{clientId}-{uuid}". So
if the group uses a common clientId, then it shouldn't usually be the case
that two consumers on the same host get ordered together. We could actually
change the order of these fields in a compatible way if we didn't like the
dependence on the clientId. It seems anyway that the sticky assignor is not
needed to deal with this problem.


That's correct, and thanks for going into the issue in more details.


> Even though consumer groups are usually stable, it might be the case that
> consumers do not initially join the group at the same time. The sticky
> strategy in that situation lets those who joined earlier stick to their
> partitions to some extent (assuming fairness takes precedence over
> stickiness). In terms of specific use cases, Andrew touched on examples of
> how Kafka can benefit from a sticky assignor. I could add those to the KIP
> if you also think they help building the case in favor of the sticky assignor.
> I agree with you about the downside and I'll make sure I add that to the
> KIP as you suggested.


Yep, I agree that it helps in some situations, but I think the impact is
amortized over the life of the group. It also takes a bit more work to
explain this 

[jira] [Commented] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe

2016-06-27 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352034#comment-15352034
 ] 

Ismael Juma commented on KAFKA-3905:


[~wushujames], I agree.

> remove null from subscribed topics  in KafkaConsumer#subscribe
> --
>
> Key: KAFKA-3905
> URL: https://issues.apache.org/jira/browse/KAFKA-3905
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Assignee: Rekha Joshi
>Priority: Minor
>
> Currently, KafkaConsumer's subscribe methods accept a Collection<String> as the 
> topics to be subscribed, but a Collection may have null as an element. For 
> example:
> {code}
> String topic = null;
> Collection<String> topics = Arrays.asList(topic);
> consumer.subscribe(topics);
> {code}
> When this happens, consumer will throw a puzzling NullPointerException:
> {code}
>   at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245)
>   at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248)
>   at 
> org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85)
>   at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89)
>   at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244)
>   at 
> org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35)
>   at 
> org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
> {code}
> Maybe it's better to remove null when doing subscription.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352025#comment-15352025
 ] 

Ismael Juma commented on KAFKA-3904:


By the way, there's no need to create a `RandomAccessFile` just to get its 
channel. One can simply call `FileChannel.open` instead.
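
For reference, a minimal sketch of the two equivalent ways to obtain the lock-file channel (illustrative only):

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public final class OpenLockChannel {

    // Current approach: the RandomAccessFile exists only to hand out a channel.
    static FileChannel viaRandomAccessFile(File lockFile) throws IOException {
        return new RandomAccessFile(lockFile, "rw").getChannel();
    }

    // NIO.2 equivalent of "rw" plus create-if-missing, without the wrapper object.
    static FileChannel viaFileChannelOpen(File lockFile) throws IOException {
        return FileChannel.open(lockFile.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
    }
}
{code}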

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed that when my application had been running for a long time (> 1 day), 
> I would get a 'Too many open files' error.
> I used 'lsof' to list all the file descriptors used by the process; there were over 
> 32K, but most of them belonged to the .lock file, e.g. the same lock file 
> showed up 2700 times.
> I looked at the code, and I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd is created; we 
> should probably either close or reuse this RandomAccessFile object.
> lsof result:
> java  14799 hcai  740u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  743u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  746u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  755u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
>     2709   24381  319662



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3908) Set SendBufferSize for socket used by Processor

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351977#comment-15351977
 ] 

ASF GitHub Bot commented on KAFKA-3908:
---

GitHub user lindong28 opened a pull request:

https://github.com/apache/kafka/pull/1562

KAFKA-3908; Set SendBufferSize for socket used by Processor



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lindong28/kafka KAFKA-3908

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1562


commit 9ad0909e8cfd821dd7cea19efdc6ce993f95ace7
Author: Dong Lin 
Date:   2016-06-27T22:37:59Z

KAFKA-3908; Set SendBufferSize for socket used by Processor




> Set SendBufferSize for socket used by Processor
> ---
>
> Key: KAFKA-3908
> URL: https://issues.apache.org/jira/browse/KAFKA-3908
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> SRE should be able to control the send buffer size of sockets that are 
> used to send responses to clients, for the same reason we set the send buffer 
> size for all other sockets in the server and client. However, we currently 
> only set the receive buffer size of this socket.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1562: KAFKA-3908; Set SendBufferSize for socket used by ...

2016-06-27 Thread lindong28
GitHub user lindong28 opened a pull request:

https://github.com/apache/kafka/pull/1562

KAFKA-3908; Set SendBufferSize for socket used by Processor



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lindong28/kafka KAFKA-3908

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1562


commit 9ad0909e8cfd821dd7cea19efdc6ce993f95ace7
Author: Dong Lin 
Date:   2016-06-27T22:37:59Z

KAFKA-3908; Set SendBufferSize for socket used by Processor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-3908) Set SendBufferSize for socket used by Processor

2016-06-27 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-3908:
---

 Summary: Set SendBufferSize for socket used by Processor
 Key: KAFKA-3908
 URL: https://issues.apache.org/jira/browse/KAFKA-3908
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin


SRE should be able to control the send buffer size of sockets that are used 
to send responses to clients, for the same reason we set the send buffer size 
for all other sockets in the server and client. However, we currently only set 
the receive buffer size of this socket.
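
For context, a minimal sketch of the kind of change the title implies: configuring the send buffer on an accepted socket (illustrative only, not the actual patch; a non-positive size is treated here as "keep the OS default"):

{code:java}
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public final class AcceptWithBuffers {

    static SocketChannel accept(ServerSocketChannel server,
                                int sendBufferSize, int recvBufferSize) throws IOException {
        SocketChannel channel = server.accept();
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        socket.setTcpNoDelay(true);
        socket.setKeepAlive(true);
        if (recvBufferSize > 0)
            socket.setReceiveBufferSize(recvBufferSize);
        if (sendBufferSize > 0)
            socket.setSendBufferSize(sendBufferSize);   // the call this ticket is about
        return channel;
    }
}
{code}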



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-27 Thread Philippe Derome
Then I don't see any simple solution here at least for a novice, especially
since I don't know what can trigger the boolean flag to true.
On 27 Jun 2016 5:38 p.m., "Guozhang Wang"  wrote:

> My concern is that, the overhead of requesting the source KTable to be
> materialized (i.e. creating a state store, and sending the {old -> new}
> pair instead of the new value only) may be over-whelming compared with its
> potential benefits of reducing the downstream traffic.
>
> Guozhang
>
> On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome 
> wrote:
>
> > Guozhang,
> >
> > would you say it's advisable to initialize KTableFilter.sendOldValues to
> > true instead of false? That's what I see that can trigger your described
> > case 3 to potentially desirable effect, but I didn't include it into pull
> > request. If left to default value of false, I don't know what mechanism
> > should override it to true.
> >
> > Phil
> >
> > On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang 
> > wrote:
> >
> > > Thanks! You can follow this step-by-step guidance to contribute to
> Kafka
> > > via github.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome 
> > > wrote:
> > >
> > > > I have a 1 liner solution for this in KTableFilter.java and about 5-6
> > > lines
> > > > changes to existing unit test KTableFilterTest.testSendingOldValue. I
> > > > included those lines with context in the JIRA. I am struggling a bit
> > with
> > > > github being new to it and how to do a proper pull request so
> hopefully
> > > > that can be followed up by you? I had the streams test suite pass
> aside
> > > for
> > > > a few cases that pertain specifically to this JIRA as assumptions
> have
> > > now
> > > > changed.
> > > >
> > > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hi Philippe,
> > > > >
> > > > > Great, since you agree with my reasonings, I have created a JIRA
> > ticket
> > > > for
> > > > > optimizing KTableFilter (feel free to pick it up if you are
> > interested
> > > in
> > > > > contributing):
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-3902
> > > > >
> > > > > About case 3-c-1), what I meant is that since "predicate return
> true
> > on
> > > > > both",
> > > > > the resulted pair would just be the same as the original pair.
> > > > >
> > > > > About KIP-63, itself is a rather big story, but it has one
> > > correspondence
> > > > > to this JIRA: with caching you can dedup some records with the same
> > > key,
> > > > > for example in the input records to the KTable is:
> > > > >
> > > > > , , , , ,  ...
> > > > >
> > > > > And the KTable is materialized into a state store with cache on top
> > of
> > > > it,
> > > > > then the resulted downstream could be:
> > > > >
> > > > >  1}>,  6}> ...
> > > > >
> > > > > Instead of
> > > > >
> > > > >  1}>,  2}>,  3}>, ...  6}>
> ...
> > > > >
> > > > > So if it is piped to a filter() operator, then even less data will
> be
> > > > > produced.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome <
> phder...@gmail.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Yes, it looks very good. Your detailed explanation appears
> > compelling
> > > > > > enough to reveal that some of the details of the complexity of a
> > > > streams
> > > > > > system are probably inherent complexity (not that I dared assume
> it
> > > was
> > > > > > "easy" but I could afford to be conveniently unaware). It took me
> > 30
> > > > > > minutes to grasp this latest response.
> > > > > >
> > > > > > There might be a typo in your email for case 3.c.1) as I would
> > think
> > > we
> > > > > > should send the most recent pair as opposed to original, in any
> > event
> > > > it
> > > > > > does not materially impact your presentation.
> > > > > >
> > > > > > Your case 3a) is really what triggered my line of questioning
> and I
> > > > found
> > > > > > the current behaviour vexing as it may lead to some undesirable
> and
> > > > > > necessary filter (see Michael G. Noll's fix in
> > > UserRegionLambdaExample
> > > > at
> > > > > > the very end trying to weed out null) used to output to topic to
> > > > console.
> > > > > > Without looking at design, it seemed self-evident to me that the
> > 3a)
> > > > > > behaviour had to be implemented ( from my point of view with the
> > code
> > > > > > example I was looking at, it simply means never say to delete a
> key
> > > > that
> > > > > > was never created, simply don't "create a deleted" key).
> > > > > >
> > > > > > Likewise cases 3 b,c look very reasonable.
> > > > > >
> > > > > > Just out of curiosity, did you effectively just restate the
> essence
> > > of
> > > > > > KIP-63 in a 

[jira] [Commented] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe

2016-06-27 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351906#comment-15351906
 ] 

James Cheng commented on KAFKA-3905:


I think it's better to throw an error instead of silently removing them. The 
user may be unaware they are passing in bad data, and would be confused about why the 
consumer is not acting as they expected.

The NullPointerException is definitely ugly though. Maybe some sort of 
exception (not sure which type) that says "can't subscribe to topic (null)"? 
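
A minimal sketch of the kind of up-front validation being discussed (the exception type and messages are illustrative, not an agreed behaviour):

{code:java}
import java.util.Collection;

final class SubscriptionValidation {

    // Reject null/empty topic names before they reach the network layer, so the
    // caller gets a clear error instead of a deep NullPointerException.
    static void validateTopics(Collection<String> topics) {
        if (topics == null)
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        for (String topic : topics) {
            if (topic == null || topic.trim().isEmpty())
                throw new IllegalArgumentException(
                        "Topic collection to subscribe to cannot contain null or empty topics");
        }
    }
}
{code}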

> remove null from subscribed topics  in KafkaConsumer#subscribe
> --
>
> Key: KAFKA-3905
> URL: https://issues.apache.org/jira/browse/KAFKA-3905
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Assignee: Rekha Joshi
>Priority: Minor
>
> Currently, KafkaConsumer's subscribe methods accept a Collection<String> as the 
> topics to be subscribed, but a Collection may have null as an element. For 
> example:
> {code}
> String topic = null;
> Collection<String> topics = Arrays.asList(topic);
> consumer.subscribe(topics);
> {code}
> When this happens, consumer will throw a puzzling NullPointerException:
> {code}
>   at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245)
>   at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248)
>   at 
> org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85)
>   at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89)
>   at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244)
>   at 
> org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35)
>   at 
> org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
> {code}
> Maybe it's better to remove null when doing subscription.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351901#comment-15351901
 ] 

Guozhang Wang commented on KAFKA-3904:
--

[~h...@pinterest.com] This is a great find; I think we have several problems 
with our lock files, and another example is KAFKA-3812.

As for the fix, could you first submit a PR for it? In your code snippet above 
I'm not sure, for example, what `channels` is. Is that a map from file object 
to fd?

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed that when my application had been running for a long time (> 1 day), 
> I would get a 'Too many open files' error.
> I used 'lsof' to list all the file descriptors used by the process; there were over 
> 32K, but most of them belonged to the .lock file, e.g. the same lock file 
> showed up 2700 times.
> I looked at the code, and I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd is created; we 
> should probably either close or reuse this RandomAccessFile object.
> lsof result:
> java  14799 hcai  740u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  743u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  746u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  755u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
>     2709   24381  319662



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3904:
-
Labels: architecture newbie  (was: api newbie)

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed that when my application had been running for a long time (> 1 day), 
> I would get a 'Too many open files' error.
> I used 'lsof' to list all the file descriptors used by the process; there were over 
> 32K, but most of them belonged to the .lock file, e.g. the same lock file 
> showed up 2700 times.
> I looked at the code, and I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd is created; we 
> should probably either close or reuse this RandomAccessFile object.
> lsof result:
> java  14799 hcai  740u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  743u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  746u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java  14799 hcai  755u  REG  9,0  0  2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
>     2709   24381  319662



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-27 Thread Guozhang Wang
My concern is that, the overhead of requesting the source KTable to be
materialized (i.e. creating a state store, and sending the {old -> new}
pair instead of the new value only) may be over-whelming compared with its
potential benefits of reducing the downstream traffic.

Guozhang

On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome  wrote:

> Guozhang,
>
> would you say it's advisable to initialize KTableFilter.sendOldValues to
> true instead of false? That's what I see can trigger your described
> case 3 to potentially desirable effect, but I didn't include it in the pull
> request. If left at the default value of false, I don't know what mechanism
> should override it to true.
>
> Phil
>
> On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang 
> wrote:
>
> > Thanks! You can follow this step-by-step guidance to contribute to Kafka
> > via github.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
> >
> >
> > Guozhang
> >
> >
> > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome 
> > wrote:
> >
> > > I have a one-liner solution for this in KTableFilter.java and about 5-6
> > > lines of changes to the existing unit test
> > > KTableFilterTest.testSendingOldValue. I included those lines with context
> > > in the JIRA. I am struggling a bit with github, being new to it, and with
> > > how to do a proper pull request, so hopefully that can be followed up by
> > > you? I had the streams test suite pass aside from a few cases that pertain
> > > specifically to this JIRA, as assumptions have now changed.
> > >
> > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang 
> > wrote:
> > >
> > > > Hi Philippe,
> > > >
> > > > Great, since you agree with my reasonings, I have created a JIRA
> ticket
> > > for
> > > > optimizing KTableFilter (feel free to pick it up if you are
> interested
> > in
> > > > contributing):
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3902
> > > >
> > > > About case 3-c-1), what I meant is that since the "predicate returns true on
> > > > both",
> > > > the resulting pair would just be the same as the original pair.
> > > >
> > > > About KIP-63, it is itself a rather big story, but it has one
> > > > correspondence to this JIRA: with caching you can dedup some records
> > > > with the same key. For example, if the input records to the KTable are:
> > > >
> > > > <K1, 1>, <K1, 2>, <K1, 3>, <K1, 4>, <K1, 5>, <K1, 6> ...
> > > >
> > > > And the KTable is materialized into a state store with a cache on top of
> > > > it, then the resulting downstream could be:
> > > >
> > > > <K1, {null -> 1}>, <K1, {1 -> 6}> ...
> > > >
> > > > Instead of
> > > >
> > > > <K1, {null -> 1}>, <K1, {1 -> 2}>, <K1, {2 -> 3}>, ... <K1, {5 -> 6}> ...
> > > >
> > > > So if it is piped to a filter() operator, then even less data will be
> > > > produced.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome  >
> > > > wrote:
> > > >
> > > > > Yes, it looks very good. Your detailed explanation appears
> compelling
> > > > > enough to reveal that some of the details of the complexity of a
> > > streams
> > > > > system are probably inherent complexity (not that I dared assume it
> > was
> > > > > "easy" but I could afford to be conveniently unaware). It took me
> 30
> > > > > minutes to grasp this latest response.
> > > > >
> > > > > There might be a typo in your email for case 3.c.1) as I would
> think
> > we
> > > > > should send the most recent pair as opposed to original, in any
> event
> > > it
> > > > > does not materially impact your presentation.
> > > > >
> > > > > Your case 3a) is really what triggered my line of questioning and I
> > > found
> > > > > the current behaviour vexing as it may lead to some undesirable and
> > > > > necessary filter (see Michael G. Noll's fix in
> > UserRegionLambdaExample
> > > at
> > > > > the very end trying to weed out null) used to output to topic to
> > > console.
> > > > > Without looking at design, it seemed self-evident to me that the
> 3a)
> > > > > behaviour had to be implemented ( from my point of view with the
> code
> > > > > example I was looking at, it simply means never say to delete a key
> > > that
> > > > > was never created, simply don't "create a deleted" key).
> > > > >
> > > > > Likewise cases 3 b,c look very reasonable.
> > > > >
> > > > > Just out of curiosity, did you effectively just restate the essence
> > of
> > > > > KIP-63 in a more approachable language I could understand or is
> > KIP-63
> > > > > really a different beast?
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Hello Philippe,
> > > > > >
> > > > > > Very good points, let me dump my thoughts about "KTable.filter"
> > > > > > specifically and how we can improve on that:
> > > > > >
> > > > > > 1. Some context: when a KTable participates in a downstream
> > operators
> > > > 

[jira] [Created] (KAFKA-3907) Better support for user-specific committing in the Streams DSL

2016-06-27 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-3907:


 Summary: Better support for user-specific committing in the 
Streams DSL
 Key: KAFKA-3907
 URL: https://issues.apache.org/jira/browse/KAFKA-3907
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Guozhang Wang


Currently, to explicitly commit the current processing state, users can make 
use of the {{ProcessorContext}} object, which is exposed in the {{Processor}} 
API. Other than that, the application will also automatically commit the 
processing state based on the configured interval.

Hence in the Streams DSL, if a user wants to explicitly call {{commit}}, she 
needs to use a {{process(ProcessorSupplier)}} API to get a customized processor 
instance in order to access the {{ProcessorContext}}. We should think of a 
better way to support user-specific committing interfaces inside the high-level 
Streams DSL.
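As a rough sketch of the workaround described above (not a proposed new interface), a custom low-level processor that calls {{commit()}} on the {{ProcessorContext}} it receives; the class name and the position of the commit call are illustrative:

{code}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CommittingProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // ... application-specific processing ...
        context.commit();  // explicit commit, reachable today only through the Processor API
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}
{code}

It would be wired into the DSL with something like {{stream.process(CommittingProcessor::new)}}.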



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-06-27 Thread Jun Rao
Rajini,

Thanks for the update. Looks good to me. My only comment is that
instead of /config/users/<user>/clients,
would it be better to represent it as /config/users/<user>/clients/<client-id>
so that it's more consistent?

Jun


On Thu, Jun 23, 2016 at 2:16 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Jun,
>
> Yes, I agree that it makes sense to retain the existing semantics for
> client-id quotas for compatibility. Especially if we can provide the option
> to enable secure client-id quotas for multi-user clusters as well.
>
> I have updated the KIP - each of these levels can have defaults as well as
> specific entries:
>
>    - /config/clients : Insecure <client-id> quotas with the same semantics
>    as now
>    - /config/users: User quotas
>    - /config/users/userA/clients: <client-id> quotas for userA
>    - /config/users/<default>/clients: Default <client-id> quotas
>
> Now it is fully flexible as well as compatible with the current
> implementation. I used /config/users/<default>/clients rather than
> /config/users/clients since "clients" is a valid (unlikely, but still
> possible) user principal. I used <default>, but it could be anything that
> is a valid Zookeeper node name, but not a valid URL-encoded name.
>
> Thank you,
>
> Rajini
>
> On Thu, Jun 23, 2016 at 3:43 PM, Jun Rao  wrote:
>
> > Hi, Rajini,
> >
> > For the following statements, would it be better to allocate the quota to
> > all connections whose client-id is clientX? This way, existing client-id
> > quotas are fully compatible in the new release whether the cluster is in
> a
> > single user or multi-user environment.
> >
> > 4. If client-id quota override is defined for clientX in
> > /config/clients/clientX, this quota is allocated for the sole use of
> > <user, clientX>
> > 5. If dynamic client-id default is configured in /config/clients, this
> > default quota is allocated for the sole use of <user, clientX>
> > 6. If quota.producer.default is configured for the broker in
> > server.properties, this default quota is allocated for the sole use of
> > <user, clientX>
> >
> > We can potentially add a default quota for both user and client at path
> > /config/users/clients?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Jun 22, 2016 at 3:01 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > Ismael, Jun,
> > >
> > > Thank you both for the feedback. Have updated the KIP to add dynamic
> > > default quotas for client-id with deprecation of existing static
> default
> > > properties.
> > >
> > >
> > > On Wed, Jun 22, 2016 at 12:50 AM, Jun Rao  wrote:
> > >
> > > > Yes, for consistency, perhaps we can allow client-id quota to be
> > > configured
> > > > dynamically too and mark the static config in the broker as
> deprecated.
> > > If
> > > > both are set, the dynamic one wins.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Jun 21, 2016 at 3:56 AM, Ismael Juma 
> > wrote:
> > > >
> > > > > On Tue, Jun 21, 2016 at 12:50 PM, Rajini Sivaram <
> > > > > rajinisiva...@googlemail.com> wrote:
> > > > >
> > > > > > It is actually quite tempting to do the same for client-id quotas
> > as
> > > > > well,
> > > > > > but I suppose we can't break existing users who have configured
> > > > defaults
> > > > > in
> > > > > > server.properties and providing two ways of setting client-id
> > > defaults
> > > > > > would be just too confusing.
> > > > > >
> > > > >
> > > > > Using two different approaches for client-id versus user quota
> > defaults
> > > > is
> > > > > also not great. We could deprecate the server.properties default
> > > configs
> > > > > for client-id quotas and remove them in the future. In the
> meantime,
> > we
> > > > > would have to live with 2 level defaults.
> > > > >
> > > > > Jun, what are your thoughts on this?
> > > > >
> > > > > Ismael
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > >
> > > Rajini
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>


[jira] [Commented] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351785#comment-15351785
 ] 

ASF GitHub Bot commented on KAFKA-3905:
---

GitHub user rekhajoshm opened a pull request:

https://github.com/apache/kafka/pull/1561

KAFKA-3905:remove null from subscribed topics in KafkaConsumer#subscribe

KAFKA-3905:remove null from subscribed topics in KafkaConsumer#subscribe

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rekhajoshm/kafka localtrunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1561






> remove null from subscribed topics  in KafkaConsumer#subscribe
> --
>
> Key: KAFKA-3905
> URL: https://issues.apache.org/jira/browse/KAFKA-3905
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Assignee: Rekha Joshi
>Priority: Minor
>
> Currently, KafkaConsumer's subscribe methods accept Collection<String> as 
> topics to be subscribed, but a Collection may have null as its element. For 
> example
> {code}
> String topic = null;
> Collection<String> topics = Arrays.asList(topic);
> consumer.subscribe(topics);
> {code}
> When this happens, consumer will throw a puzzling NullPointerException:
> {code}
>   at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245)
>   at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248)
>   at 
> org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85)
>   at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89)
>   at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244)
>   at 
> org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35)
>   at 
> org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
> {code}
> Maybe it's better to remove null when doing subscription.
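A small illustration of the kind of guard the wish asks for, done on the caller side: drop null (and empty) topic names before handing the collection to subscribe. The helper and topic names are made up, and this is not the change in PR #1561:

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class SubscriptionSanitizer {
    // Returns a copy of the given topics with null and empty entries removed.
    public static List<String> sanitize(Collection<String> topics) {
        List<String> cleaned = new ArrayList<>();
        for (String topic : topics) {
            if (topic != null && !topic.isEmpty())
                cleaned.add(topic);
        }
        return cleaned;
    }

    public static void main(String[] args) {
        Collection<String> topics = Arrays.asList("orders", null, "payments");
        System.out.println(sanitize(topics));  // prints [orders, payments]
        // consumer.subscribe(sanitize(topics)) would then avoid the NullPointerException above.
    }
}
{code}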



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1561: KAFKA-3905:remove null from subscribed topics in K...

2016-06-27 Thread rekhajoshm
GitHub user rekhajoshm opened a pull request:

https://github.com/apache/kafka/pull/1561

KAFKA-3905:remove null from subscribed topics in KafkaConsumer#subscribe

KAFKA-3905:remove null from subscribed topics in KafkaConsumer#subscribe

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rekhajoshm/kafka localtrunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1561






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is back to normal : kafka-trunk-jdk8 #722

2016-06-27 Thread Apache Jenkins Server
See 



Re: [jira] [Commented] (KAFKA-3896) Unstable test KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations

2016-06-27 Thread Philippe Derome
Never mind sorry. I should research this on my own.
On 27 Jun 2016 3:29 p.m., "Philippe Derome"  wrote:

> Guozhang, all on this thread,
>
> I've been spending a fair amount of time looking at this problem very indirectly
> with my PR for KAFKA-3902. I had 3 commits on the PR and the last one fails
> on Jenkins for this unit test here:
> https://github.com/apache/kafka/pull/1556
>
> Additionally, that particular file KStreamRepartitionJoinTest.java is not
> in my local repo and I cannot seem to resolve merge conflicts between my
> local repo and trunk (I admit to be unfamiliar with git commands, which is
> possibly the main issue).
>
> Is it possible for anyone to validate that my PR is actually reasonably
> sensible despite that unit test failure?
>
> Thanks,
>
> Phil
>
> On Mon, Jun 27, 2016 at 2:51 PM, ASF GitHub Bot (JIRA) 
> wrote:
>
>>
>> [
>> https://issues.apache.org/jira/browse/KAFKA-3896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351603#comment-15351603
>> ]
>>
>> ASF GitHub Bot commented on KAFKA-3896:
>> ---
>>
>> Github user asfgit closed the pull request at:
>>
>> https://github.com/apache/kafka/pull/1549
>>
>>
>> > Unstable test
>> KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations
>> >
>> ---
>> >
>> > Key: KAFKA-3896
>> > URL: https://issues.apache.org/jira/browse/KAFKA-3896
>> > Project: Kafka
>> >  Issue Type: Bug
>> >  Components: unit tests
>> >Reporter: Ashish K Singh
>> >Assignee: Damian Guy
>> > Fix For: 0.10.1.0
>> >
>> >
>> >
>> {{KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations}}
>> seems to be unstable. A failure can be found [here|
>> https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4363/]. Could not
>> reproduce the test failure locally though.
>>
>>
>>
>> --
>> This message was sent by Atlassian JIRA
>> (v6.3.4#6332)
>>
>
>


Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

2016-06-27 Thread Jason Gustafson
Hey Vahid,

Comments below:

I'm not very clear on the first part of this paragraph. You could clarify
> it for me, but in general balancing out the partitions across consumers in
> a group as much as possible would normally mean balancing the load within
> the cluster, and that's something a user would want to have compared to
> cases where the assignments and therefore the load could be quite
> unbalanced depending on the subscriptions.


I'm just wondering what kind of use cases require differing subscriptions
in a steady state. Usually we expect all consumers in the group to have the
same subscription, in which case the balance provided by round robin should
be even (in terms of the number of assigned partitions). The only case that
comes to mind is a rolling upgrade scenario in which the consumers in the
group are restarted one by one with an updated subscription. It would be
ideal to provide better balance in this situation, but once the upgrade
finishes, the assignment should be balanced again, so it's unclear to me
how significant the gain is. On the other hand, if there are cases which
require differing subscriptions in a long term state, it would make this
feature more compelling.

Since the new consumer is single threaded there is no such problem in its
> round robin strategy. It simply considers consumers one by one for each
> partition assignment, and when one consumer is assigned a partition, the
> next assignment starts with considering the next consumer in the list (and
> not the same consumer that was just assigned). This removes the
> possibility of the issue reported in KAFKA-2019 surfacing in the new
> consumer. In the sticky strategy we do not have this issue either, since
> every time an assignment is about to happen we start with the consumer
> with least number of assignments. So we will not have a scenario where a
> consumer is repeated assigned partitions as in KAFKA-2019 (unless that
> consumer is lagging behind other consumers on the number of partitions
> assigned).


Thanks for checking into this. I think the other factor is that the round
robin assignor sorts the consumers using the id given them by the
coordinator, which at the moment looks like this: "{clientId}-{uuid}". So
if the group uses a common clientId, then it shouldn't usually be the case
that two consumers on the same host get ordered together. We could actually
change the order of these fields in a compatible way if we didn't like the
dependence on the clientId. It seems anyway that the sticky assignor is not
needed to deal with this problem.

Even though consumer groups are usually stable, it might be the case that
> consumers do not initially join the group at the same time. The sticky
> strategy in that situation lets those who joined earlier stick to their
> partitions to some extent (assuming fairness take precedence over
> stickiness). In terms of specific use cases, Andrew touched on examples of
> how Kafka can benefit from a sticky assignor. I could add those to the KIP
> if you also think they help building the case in favor of sticky assignor.
> I agree with you about the downside and I'll make sure I add that to the
> KIP as you suggested.


Yep, I agree that it helps in some situations, but I think the impact is
amortized over the life of the group. It also takes a bit more work to
explain this to users and may require them to change their usage pattern a
little bit. I think we expect users to do something like the following in
their rebalance listener:

class MyRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
  commitOffsets(partition);
  cleanupState(partition);
}
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
  initializeState(partition);
  initializeOffset(partition);
}
  }
}

This is fairly intuitive, but if you use this pattern, then sticky
assignment doesn't give you anything because you always cleanup state prior
to the rebalance. Instead you need to do something like this:

class MyRebalanceListener {
  Collection<TopicPartition> lastAssignment = Collections.emptyList();

  void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
  commitOffsets(partition);
}
  }

  void onPartitionsAssigned(Collection<TopicPartition> assignment) {
    for (TopicPartition partition : difference(lastAssignment, assignment)) {
  cleanupState(partition);
}

    for (TopicPartition partition : difference(assignment, lastAssignment)) {
  initializeState(partition);
}

for (TopicPartition partition : assignment) {
  initializeOffset(partition);
}

this.lastAssignment = assignment;
  }
}

This seems harder to explain and probably is the reason why Andy was
suggesting that it would be more ideal if we could simply skip the call to
onRevoked() if the partitions remain assigned to the consumer after the
rebalance. 

Re: [jira] [Commented] (KAFKA-3896) Unstable test KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations

2016-06-27 Thread Philippe Derome
Guozhang, all on this thread,

I've been spending a fair amount of time looking at this problem very indirectly
with my PR for KAFKA-3902. I had 3 commits on the PR and the last one fails
on Jenkins for this unit test here:
https://github.com/apache/kafka/pull/1556

Additionally, that particular file KStreamRepartitionJoinTest.java is not
in my local repo and I cannot seem to resolve merge conflicts between my
local repo and trunk (I admit to be unfamiliar with git commands, which is
possibly the main issue).

Is it possible for anyone to validate that my PR is actually reasonably
sensible despite that unit test failure?

Thanks,

Phil

On Mon, Jun 27, 2016 at 2:51 PM, ASF GitHub Bot (JIRA) 
wrote:

>
> [
> https://issues.apache.org/jira/browse/KAFKA-3896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351603#comment-15351603
> ]
>
> ASF GitHub Bot commented on KAFKA-3896:
> ---
>
> Github user asfgit closed the pull request at:
>
> https://github.com/apache/kafka/pull/1549
>
>
> > Unstable test
> KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations
> >
> ---
> >
> > Key: KAFKA-3896
> > URL: https://issues.apache.org/jira/browse/KAFKA-3896
> > Project: Kafka
> >  Issue Type: Bug
> >  Components: unit tests
> >Reporter: Ashish K Singh
> >Assignee: Damian Guy
> > Fix For: 0.10.1.0
> >
> >
> >
> {{KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations}}
> seems to be unstable. A failure can be found [here|
> https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4363/]. Could not
> reproduce the test failure locally though.
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>


[jira] [Commented] (KAFKA-3906) Connect logical types do not support nulls.

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351622#comment-15351622
 ] 

ASF GitHub Bot commented on KAFKA-3906:
---

GitHub user jcustenborder opened a pull request:

https://github.com/apache/kafka/pull/1560

KAFKA-3906 - Connect logical types do not support nulls.

Initial commit with failing unit tests for proposed functionality.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jcustenborder/kafka KAFKA-3906

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1560


commit 5e5d26fcb26988f1f067a91864b74f9da0851871
Author: Jeremy Custenborder 
Date:   2016-06-27T19:02:59Z

Modified method signatures for Date, Time, Timestamp to allow nulls. Added 
tests for logical types with nulls.




> Connect logical types do not support nulls.
> ---
>
> Key: KAFKA-3906
> URL: https://issues.apache.org/jira/browse/KAFKA-3906
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Jeremy Custenborder
>Assignee: Ewen Cheslack-Postava
>
> The logical types for Kafka Connect do not support null data values. Date, 
> Decimal, Time, and Timestamp all will throw null reference exceptions if a 
> null is passed in to their fromLogical and toLogical methods. Date, Time, and 
> Timestamp require signature changes for these methods to support nullable 
> types.  
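For illustration only, a null-tolerant helper wrapped around the existing {{Timestamp}} converter, showing the behaviour a connector might want in the meantime; it is not the signature change proposed in PR #1560:

{code}
import java.util.Date;
import org.apache.kafka.connect.data.Timestamp;

public class NullSafeTimestamps {
    // Returns null for null input instead of letting fromLogical throw.
    public static Long fromLogicalOrNull(Date value) {
        return value == null ? null : Timestamp.fromLogical(Timestamp.SCHEMA, value);
    }

    // Returns null for null input instead of letting toLogical throw.
    public static Date toLogicalOrNull(Long millis) {
        return millis == null ? null : Timestamp.toLogical(Timestamp.SCHEMA, millis);
    }
}
{code}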



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1560: KAFKA-3906 - Connect logical types do not support ...

2016-06-27 Thread jcustenborder
GitHub user jcustenborder opened a pull request:

https://github.com/apache/kafka/pull/1560

KAFKA-3906 - Connect logical types do not support nulls.

Initial commit with failing unit tests for proposed functionality.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jcustenborder/kafka KAFKA-3906

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1560


commit 5e5d26fcb26988f1f067a91864b74f9da0851871
Author: Jeremy Custenborder 
Date:   2016-06-27T19:02:59Z

Modified method signatures for Date, Time, Timestamp to allow nulls. Added 
tests for logical types with nulls.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-3906) Connect logical types do not support nulls.

2016-06-27 Thread Jeremy Custenborder (JIRA)
Jeremy Custenborder created KAFKA-3906:
--

 Summary: Connect logical types do not support nulls.
 Key: KAFKA-3906
 URL: https://issues.apache.org/jira/browse/KAFKA-3906
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.0.0
Reporter: Jeremy Custenborder
Assignee: Ewen Cheslack-Postava


The logical types for Kafka Connect do not support null data values. Date, 
Decimal, Time, and Timestamp all will throw null reference exceptions if a null 
is passed in to their fromLogical and toLogical methods. Date, Time, and 
Timestamp require signature changes for these methods to support nullable 
types.  




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3896) Unstable test KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations

2016-06-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3896.
--
   Resolution: Fixed
Fix Version/s: 0.10.1.0

Issue resolved by pull request 1549
[https://github.com/apache/kafka/pull/1549]

> Unstable test 
> KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations
> ---
>
> Key: KAFKA-3896
> URL: https://issues.apache.org/jira/browse/KAFKA-3896
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Ashish K Singh
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> {{KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations}} 
> seems to be unstable. A failure can be found 
> [here|https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4363/]. Could not 
> reproduce the test failure locally though.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3896) Unstable test KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351603#comment-15351603
 ] 

ASF GitHub Bot commented on KAFKA-3896:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1549


> Unstable test 
> KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations
> ---
>
> Key: KAFKA-3896
> URL: https://issues.apache.org/jira/browse/KAFKA-3896
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Ashish K Singh
>Assignee: Damian Guy
> Fix For: 0.10.1.0
>
>
> {{KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations}} 
> seems to be unstable. A failure can be found 
> [here|https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4363/]. Could not 
> reproduce the test failure locally though.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1549: KAFKA-3896: Unstable test KStreamRepartitionJoinTe...

2016-06-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1549


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3822) Kafka Consumer close() hangs indefinitely if Kafka Broker shutdown while connected

2016-06-27 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351574#comment-15351574
 ] 

Ashish K Singh commented on KAFKA-3822:
---

[~hachikuji] yeah makes sense. Will have a KIP up for review today.

> Kafka Consumer close() hangs indefinitely if Kafka Broker shutdown while 
> connected
> --
>
> Key: KAFKA-3822
> URL: https://issues.apache.org/jira/browse/KAFKA-3822
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: x86 Red Hat 6 (1 broker running zookeeper locally, 
> client running on a separate server)
>Reporter: Alexander Cook
>Assignee: Ashish K Singh
>
> I am using the KafkaConsumer java client to consume messages. My application 
> shuts down smoothly if I am connected to a Kafka broker, or if I never 
> succeed at connecting to a Kafka broker, but if the broker is shut down while 
> my consumer is connected to it, consumer.close() hangs indefinitely. 
> Here is how I reproduce it: 
> 1. Start 0.9.0.1 Kafka Broker
> 2. Start consumer application and consume messages
> 3. Stop 0.9.0.1 Kafka Broker (ctrl-c or stop script)
> 4. Try to stop application...hangs at consumer.close() indefinitely. 
> I also see this same behavior using 0.10 broker and client. 
> This is my first bug reported to Kafka, so please let me know if I should be 
> following a different format. Thanks! 
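A client-side workaround sketch while the API discussion plays out (not from the ticket or the upcoming KIP): run {{close()}} on a helper thread and give up after a timeout so the application can still shut down. The method name and timeout handling are illustrative:

{code}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedClose {
    public static void closeWithTimeout(KafkaConsumer<?, ?> consumer, long timeoutMs)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> close = executor.submit(consumer::close);
        try {
            close.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            close.cancel(true);  // stop waiting; the hung close() thread is abandoned
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}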



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-50 - Enhance Authorizer interface to be aware of supported Principal Types

2016-06-27 Thread Ashish Singh
Hello Ismael,

That is a fair suggestion. As of now, I do not see how Authorizer will be
used by clients and as such I am happy to adopt your suggested change.
Given that this KIP already went through multiple rounds of discussions and
votes, do we want to do another round of vote for this change, provided
there are no major concerns expressed? If the change is small enough, which
I think it is, I can just update the KIP and the associated PR. Let me know.
​

On Fri, Jun 24, 2016 at 12:34 AM, Ismael Juma  wrote:

> I'm really late to this thread, but I want to propose a small change: what
> if we changed the proposed package for the authorizer from
> `org.apache.kafka.common.security.auth` to
> `org.apache.kafka.server.security.auth` (while still living in the
> `clients` jar)?
>
> The advantages are:
> 1. It would be obvious that this is only used by the server.
> 2. We can set the checkstyle rules so that code in the `clients` and
> `common` packages don't call into the `server` package.
> 3. If we ever decide to move these server pluggable classes to their own
> module, we can do it without breaking source/binary compatibility, it would
> only require users to update their build configurations to depend on the
> new module (and in a transition period, we can make the `clients` module
> depend on this new module so that no change is required at all).
>
> The only downside I can see is that it's weird to have a `server` package
> in a `clients` jar, but that is just making explicit what is happening
> either way (we are adding a server-only interface to the `clients` jar).
>
> Thoughts?
>
> Ismael
>
> On Fri, Apr 22, 2016 at 10:56 PM, Ashish Singh 
> wrote:
>
> > Hey Guys,
> >
> > If there are no objections or major modifications to suggest I would like
> > to start a vote thread on this. I will wait till EOD today before
> starting
> > a vote thread.
> >
> > On Thu, Apr 21, 2016 at 4:36 PM, Gwen Shapira  wrote:
> >
> > > I would like to suggest taking the discussion of "how to break Kafka
> > > down into modules" outside the scope of KIP-50 and outside the scope
> > > of the next release.
> > >
> > > I understand that the current location of the authorizer API is not
> > > ideal, but I want to point out that the scope was already expanded
> > > from a new method to a complete rewrite of the authorizer. Is the
> > > current location really bad enough to expand the scope into larger
> > > refactoring of Kafka?
> > >
> > > Gwen
> > >
> > > On Wed, Apr 20, 2016 at 10:43 PM, Ismael Juma 
> wrote:
> > > > Hi Jay,
> > > >
> > > > Thanks for summarising the reasoning for the current approach. On the
> > > topic
> > > > of additional jars, the obvious example that came up recently is
> > sharing
> > > > JSON serializers between connect and streams. Given the desire not to
> > > add a
> > > > Jackson dependency to clients, it seems like adding a
> > > kafka-serializer-json
> > > > (or something like that) may be needed. This is similar to the
> > > > kafka-log4j-appender jar that we have today.
> > > >
> > > > When you look at it this way, then the situation is not as clear-cut
> as
> > > > initially described. Perhaps a way to explain this is that we only
> add
> > > > additional modules when they introduce a new dependency.
> > > >
> > > > Finally, it seems a bit weird to add something to `common` that is,
> in
> > > > fact, not common. Would it not make sense to have a separate package
> > for
> > > > pluggable core/server classes (because they are pluggable we want
> them
> > to
> > > > be in Java and not to be associated with a particular Scala version)?
> > > >
> > > > Ismael
> > > >
> > > > On Wed, Apr 20, 2016 at 4:52 PM, Jay Kreps  wrote:
> > > >
> > > >> Yeah our take when we came up with this approach was pretty much
> what
> > > Gwen
> > > >> is saying:
> > > >> 1. In practice you either need the server or client to do anything
> and
> > > the
> > > >> server depends on the client so bundling common and client doesn't
> > hurt.
> > > >> 2. Our experience with more granular jars (not in Kafka) was that
> > > although
> > > >> it feels "cleaner" the complexity comes quickly for a few reasons.
> > > First it
> > > >> gets hard to detangle the more granular packages (e.g. somebody
> needs
> > to
> > > >> use something in Utils in the authorizer package and then you no
> > longer
> > > >> have a dag). Second people end up mixing and matching in ways you
> > didn't
> > > >> anticipate which causes crazy heisenbugs (e.g. they depend on two
> > > different
> > > >> versions of the client via transitive dependencies and somehow end
> up
> > > with
> > > >> client version x and common version y due to duplicate entries on
> the
> > > class
> > > >> path).
> > > >>
> > > >> I'm not really arguing that this approach is superior, I'm just
> saying
> > > this
> > > >> is the current approach and that is the reason we 

[jira] [Assigned] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe

2016-06-27 Thread Rekha Joshi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rekha Joshi reassigned KAFKA-3905:
--

Assignee: Rekha Joshi

> remove null from subscribed topics  in KafkaConsumer#subscribe
> --
>
> Key: KAFKA-3905
> URL: https://issues.apache.org/jira/browse/KAFKA-3905
> Project: Kafka
>  Issue Type: Wish
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Assignee: Rekha Joshi
>Priority: Minor
>
> Currently, KafkaConsumer's subscribe methods accept Collection<String> as 
> topics to be subscribed, but a Collection may have null as its element. For 
> example
> {code}
> String topic = null;
> Collection<String> topics = Arrays.asList(topic);
> consumer.subscribe(topics);
> {code}
> When this happens, consumer will throw a puzzling NullPointerException:
> {code}
>   at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245)
>   at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248)
>   at 
> org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85)
>   at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89)
>   at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244)
>   at 
> org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35)
>   at 
> org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
> {code}
> Maybe it's better to remove null when doing subscription.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3844) Sort configuration items in log

2016-06-27 Thread Rekha Joshi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rekha Joshi reassigned KAFKA-3844:
--

Assignee: Rekha Joshi

> Sort configuration items in log
> ---
>
> Key: KAFKA-3844
> URL: https://issues.apache.org/jira/browse/KAFKA-3844
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Currently, the output of 
> org.apache.kafka.common.config.AbstractConfig#logAll() is unsorted, so it's 
> not convenient to check related configurations. The configuration items in 
> the log could be sorted so that related items are adjacent. For example, all 
> "log.*" configuration items would be adjacent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3844) Sort configuration items in log

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351494#comment-15351494
 ] 

ASF GitHub Bot commented on KAFKA-3844:
---

GitHub user rekhajoshm opened a pull request:

https://github.com/apache/kafka/pull/1559

KAFKA-3844; Sort configuration items in log

KAFKA-3844; Sort configuration items in log

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rekhajoshm/kafka KAFKA-3844

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1559


commit c9a66992b1095616f87c5748f210b973ebc7eb01
Author: Rekha Joshi 
Date:   2016-05-26T17:48:37Z

Merge pull request #2 from apache/trunk

Apache Kafka trunk pull

commit 8d7fb005cb132440e7768a5b74257d2598642e0f
Author: Rekha Joshi 
Date:   2016-05-30T21:37:43Z

Merge pull request #3 from apache/trunk

Apache Kafka trunk pull

commit fbef9a8fb1411282fbadec46955691c3e7ba2578
Author: Rekha Joshi 
Date:   2016-06-04T23:58:02Z

Merge pull request #4 from apache/trunk

Apache Kafka trunk pull

commit 172db701bf9affda1304b684921260d1cd36ae9e
Author: Rekha Joshi 
Date:   2016-06-06T22:10:31Z

Merge pull request #6 from apache/trunk

Apache Kafka trunk pull

commit 9d18d93745cf2bc9b0ab4bb9b25d9a31196ef918
Author: Rekha Joshi 
Date:   2016-06-07T19:36:45Z

Merge pull request #7 from apache/trunk

Apache trunk pull

commit 882faea01f28aef1977f4ced6567833bcf736840
Author: Rekha Joshi 
Date:   2016-06-13T20:01:43Z

Merge pull request #8 from confluentinc/trunk

Apache kafka trunk pull

commit 851315d39c0c308d79b9575546822aa932c46a09
Author: Rekha Joshi 
Date:   2016-06-27T17:34:54Z

Merge pull request #9 from apache/trunk

Merge Apache kafka trunk

commit d36bbfc8723d2e072ff1ac8b9bb551befa2b1c1a
Author: Joshi 
Date:   2016-06-27T17:59:41Z

KAFKA-3844; Sort configuration items in log




> Sort configuration items in log
> ---
>
> Key: KAFKA-3844
> URL: https://issues.apache.org/jira/browse/KAFKA-3844
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Xing Huang
>Priority: Trivial
>
> Currently, the output of 
> org.apache.kafka.common.config.AbstractConfig#logAll() is unsorted, so it's 
> not convenient to check related configurations. The configuration items in 
> the log could be sorted so that related items are adjacent. For example, all 
> "log.*" configuration items would be adjacent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1559: KAFKA-3844; Sort configuration items in log

2016-06-27 Thread rekhajoshm
GitHub user rekhajoshm opened a pull request:

https://github.com/apache/kafka/pull/1559

KAFKA-3844; Sort configuration items in log

KAFKA-3844; Sort configuration items in log

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rekhajoshm/kafka KAFKA-3844

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1559


commit c9a66992b1095616f87c5748f210b973ebc7eb01
Author: Rekha Joshi 
Date:   2016-05-26T17:48:37Z

Merge pull request #2 from apache/trunk

Apache Kafka trunk pull

commit 8d7fb005cb132440e7768a5b74257d2598642e0f
Author: Rekha Joshi 
Date:   2016-05-30T21:37:43Z

Merge pull request #3 from apache/trunk

Apache Kafka trunk pull

commit fbef9a8fb1411282fbadec46955691c3e7ba2578
Author: Rekha Joshi 
Date:   2016-06-04T23:58:02Z

Merge pull request #4 from apache/trunk

Apache Kafka trunk pull

commit 172db701bf9affda1304b684921260d1cd36ae9e
Author: Rekha Joshi 
Date:   2016-06-06T22:10:31Z

Merge pull request #6 from apache/trunk

Apache Kafka trunk pull

commit 9d18d93745cf2bc9b0ab4bb9b25d9a31196ef918
Author: Rekha Joshi 
Date:   2016-06-07T19:36:45Z

Merge pull request #7 from apache/trunk

Apache trunk pull

commit 882faea01f28aef1977f4ced6567833bcf736840
Author: Rekha Joshi 
Date:   2016-06-13T20:01:43Z

Merge pull request #8 from confluentinc/trunk

Apache kafka trunk pull

commit 851315d39c0c308d79b9575546822aa932c46a09
Author: Rekha Joshi 
Date:   2016-06-27T17:34:54Z

Merge pull request #9 from apache/trunk

Merge Apache kafka trunk

commit d36bbfc8723d2e072ff1ac8b9bb551befa2b1c1a
Author: Joshi 
Date:   2016-06-27T17:59:41Z

KAFKA-3844; Sort configuration items in log




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: delay of producer and consumer in kafka 0.9 is too big to be accepted

2016-06-27 Thread Kafka
Can someone please explain why the latency is so big for me? Thanks.

> On Jun 25, 2016, at 11:16 PM, Jay Kreps wrote:
> 
> Can you sanity check this with the end-to-end latency test that ships with
> Kafka in the tools package?
> 
> https://apache.googlesource.com/kafka/+/1769642bb779921267bd57d3d338591dbdf33842/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
> 
> On Saturday, June 25, 2016, Kafka  wrote:
> 
>> Hi all,
>> My kafka cluster is composed of three brokers, each with an 8-core
>> CPU, 8 GB of memory, and a 1 Gb network card.
>> With the Java async client, I sent 100 messages of 1024
>> bytes each, with a 20 us gap between sends. The consumer's
>> config is like this: fetch.min.bytes is set to 1, fetch.wait.max.ms is set
>> to 100.
>> To avoid discrepancies between two machines, I start the producer
>> and the consumer on the same machine; the machine has enough
>> resources to satisfy these two clients.
>> 
>> I start the consumer before the producer in each test. With the sending
>> timestamp embedded in each message, when the consumer receives the message
>> I can get the consumer delay by subtracting the sending timestamp from the
>> current timestamp.
>> When I set acks to 0 and replicas to 2, the average producer delay
>> is 2.98ms and the average consumer delay is 52.23ms.
>> When I set acks to 1 and replicas to 2, the average producer delay
>> is 3.9ms and the average consumer delay is 44.88ms.
>> When I set acks to -1 and replicas to 2, the average producer
>> delay is 1782ms and the average consumer delay is 1786ms.
>> 
>> I have two questions. The first is why my consumer delay with acks
>> set to 0 is larger than the consumer delay with acks set to 1.
>> The second is why the delay of the producer and consumer is so big when I
>> set acks to -1; I think this delay cannot be accepted.
>> And I found this delay is amplified when sending more messages.
>> 
>>any feedback is appreciated.
>> thanks
>> 
>> 
>> 
>> 
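For what it's worth, a rough sketch of the measurement being described (embed the send timestamp in the message, subtract it on receipt); the broker address, topic name, and configs are made up, and this is not the TestEndToEndLatency tool referenced above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LatencyProbe {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("acks", "1");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "latency-probe");
        c.put("auto.offset.reset", "earliest");
        c.put("fetch.min.bytes", "1");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("latency-test"));
            // Send one record whose value is the send timestamp in milliseconds.
            producer.send(new ProducerRecord<>("latency-test", Long.toString(System.currentTimeMillis())));
            producer.flush();
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                long delayMs = System.currentTimeMillis() - Long.parseLong(record.value());
                System.out.println("end-to-end delay: " + delayMs + " ms");
            }
        }
    }
}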




[GitHub] kafka pull request #1558: org.apache.kafka.streams.errors.StreamsException: ...

2016-06-27 Thread stathmarxis
GitHub user stathmarxis opened a pull request:

https://github.com/apache/kafka/pull/1558

org.apache.kafka.streams.errors.StreamsException: Failed to rebalance

I am running the simple example below in Kafka Streams and I got a weird 
exception which I cannot handle:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"192.168.1.3:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());

// setting offset reset to earliest so that we can re-run the 
demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

builder.stream("streams-file-input").to("streams-pipe-output");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// usually the stream application would be running forever,
// in this example we just let it run for some time and stop 
since the input data is finite.
Thread.sleep(5000L);

streams.close();


[Exception in thread "StreamThread-1" 
org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: 
Error while creating the state manager
at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 

[jira] [Commented] (KAFKA-3900) High CPU util on broker

2016-06-27 Thread Andrey Konyaev (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350909#comment-15350909
 ] 

Andrey Konyaev commented on KAFKA-3900:
---

Any comments?

> High CPU util on broker
> ---
>
> Key: KAFKA-3900
> URL: https://issues.apache.org/jira/browse/KAFKA-3900
> Project: Kafka
>  Issue Type: Bug
> Environment: kafka = 2.11-0.10.0.0
> java version "1.8.0_91"
> amazon linux
>Reporter: Andrey Konyaev
>
> I started a kafka cluster in amazon on m4.xlarge instances (4 CPUs and 16 GB mem, 14 GB 
> allocated for the kafka heap). It has three nodes.
> I don't have a high load (6000 messages/sec) and we have cpu_idle = 70%, but 
> sometimes (about once a day) I see this message in server.log:
> [2016-06-24 14:52:22,299] WARN [ReplicaFetcherThread-0-2], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6eaa1034 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
> at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> I know this can be a network glitch, but why does kafka eat all the cpu time?
> My config:
> inter.broker.protocol.version=0.10.0.0
> log.message.format.version=0.10.0.0
> default.replication.factor=3
> num.partitions=3
> replica.lag.time.max.ms=15000
> broker.id=0
> listeners=PLAINTEXT://:9092
> log.dirs=/mnt/kafka/kafka
> log.retention.check.interval.ms=30
> log.retention.hours=168
> log.segment.bytes=1073741824
> num.io.threads=20
> num.network.threads=10
> num.partitions=1
> num.recovery.threads.per.data.dir=2
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> zookeeper.connection.timeout.ms=6000
> delete.topic.enable = true
> broker.max_heap_size=10 GiB 
>   
> Any ideas?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1889) Refactor shell wrapper scripts

2016-06-27 Thread Carlo Alberto Ferraris (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350715#comment-15350715
 ] 

Carlo Alberto Ferraris commented on KAFKA-1889:
---

An additional note about {{-loggc}}: GC logs generated by the JVM 
(server-jre-8u60-linux-x64) can not be logrotated (see e.g. 
stackoverflow.com/questions/8353401/garbage-collector-log-loggc-file-rotation-with-logrotate-does-not-work-properl)

> Refactor shell wrapper scripts
> --
>
> Key: KAFKA-1889
> URL: https://issues.apache.org/jira/browse/KAFKA-1889
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Francois Saint-Jacques
>Assignee: Francois Saint-Jacques
>Priority: Minor
> Attachments: refactor-scripts-v1.patch, refactor-scripts-v2.patch
>
>
> Shell scripts in bin/ need love.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe

2016-06-27 Thread Xing Huang (JIRA)
Xing Huang created KAFKA-3905:
-

 Summary: remove null from subscribed topics  in 
KafkaConsumer#subscribe
 Key: KAFKA-3905
 URL: https://issues.apache.org/jira/browse/KAFKA-3905
 Project: Kafka
  Issue Type: Wish
  Components: clients
Affects Versions: 0.10.0.0
Reporter: Xing Huang
Priority: Minor


Currently, KafkaConsumer's subscribe methods accept Collection<String> as 
topics to be subscribed, but a Collection may have null as its element. For 
example
{code}
String topic = null;
Collection<String> topics = Arrays.asList(topic);
consumer.subscribe(topics);
{code}
When this happens, consumer will throw a puzzling NullPointerException:
{code}
at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245)
at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248)
at 
org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85)
at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89)
at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244)
at 
org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35)
at 
org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
{code}
Maybe it's better to remove null when doing subscription.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1557: Improved warning to be clear on first read.

2016-06-27 Thread LuboVarga
GitHub user LuboVarga opened a pull request:

https://github.com/apache/kafka/pull/1557

Improved warning to be clear on first read.

 I hope it will make it easier to find the reason the warning is actually 
shown.

Changed the text only. Also fixed a typo (the server parameter is not called 
max.message.bytes; it is message.max.bytes). Consult 
http://kafka.apache.org/documentation.html#configuration if needed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/LuboVarga/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1557


commit de4bb9f0e642f6b72794ffe02fe433bab13a56af
Author: Ľubomír Varga 
Date:   2016-06-27T06:20:51Z

Improved warning to be clear on first read. I hope it will make searching 
for reason it is actualy shown a bit cleaner.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---