[jira] [Commented] (KAFKA-6309) add support for getting topic defaults from AdminClient

2018-06-01 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16498033#comment-16498033
 ] 

Manikumar commented on KAFKA-6309:
--

This functionality is already supported by the KafkaAdminClient.describeConfigs() API.

We can call _"describeConfigs(Collections.singleton(topicResource), new 
DescribeConfigsOptions().includeSynonyms(true))"_ to list all the configured 
values and the precedence used to obtain the currently configured value.

More details: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
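
To illustrate how a precedence-ordered synonym list yields both the value in effect and the value a topic would fall back to, here is a minimal sketch. It is a simplified model, not the Kafka client API: the class `ConfigSynonymSketch`, its `Synonym` type, and the helper methods are hypothetical, and the config values are illustrative. Only the source names and their ordering are taken from KIP-226.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Simplified model of a config entry's synonyms; this is NOT the Kafka client API.
final class ConfigSynonymSketch {

    // Source names follow KIP-226, listed from highest to lowest precedence.
    enum Source {
        DYNAMIC_TOPIC_CONFIG,
        DYNAMIC_BROKER_CONFIG,
        DYNAMIC_DEFAULT_BROKER_CONFIG,
        STATIC_BROKER_CONFIG,
        DEFAULT_CONFIG
    }

    // Hypothetical holder for one synonym: config name, value, and where it came from.
    static final class Synonym {
        final String name;
        final String value;
        final Source source;

        Synonym(String name, String value, Source source) {
            this.name = name;
            this.value = value;
            this.source = source;
        }
    }

    // Orders synonyms so that the highest-precedence source comes first.
    static List<Synonym> byPrecedence(List<Synonym> synonyms) {
        List<Synonym> sorted = new ArrayList<>(synonyms);
        sorted.sort(Comparator.comparing((Synonym s) -> s.source));
        return sorted;
    }

    // The value currently in effect: the highest-precedence synonym.
    static Optional<String> effectiveValue(List<Synonym> synonyms) {
        return byPrecedence(synonyms).stream().map(s -> s.value).findFirst();
    }

    // The value the topic would fall back to if it had no override of its own.
    static Optional<String> topicDefault(List<Synonym> synonyms) {
        return byPrecedence(synonyms).stream()
                .filter(s -> s.source != Source.DYNAMIC_TOPIC_CONFIG)
                .map(s -> s.value)
                .findFirst();
    }

    public static void main(String[] args) {
        // Illustrative values only.
        List<Synonym> retention = Arrays.asList(
                new Synonym("log.retention.ms", "604800000", Source.DEFAULT_CONFIG),
                new Synonym("retention.ms", "3600000", Source.DYNAMIC_TOPIC_CONFIG),
                new Synonym("log.retention.ms", "86400000", Source.STATIC_BROKER_CONFIG));
        System.out.println("effective: " + effectiveValue(retention).orElse("unset"));
        System.out.println("default:   " + topicDefault(retention).orElse("unset"));
    }
}
```

In this model the topic default is simply the first synonym that does not come from the topic's own dynamic config, which is the information the issue asks for.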

> add support for getting topic defaults from AdminClient
> ---
>
> Key: KAFKA-6309
> URL: https://issues.apache.org/jira/browse/KAFKA-6309
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dan norwood
>Assignee: dan norwood
>Priority: Major
>
> kip here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-234%3A+add+support+for+getting+topic+defaults+from+AdminClient



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6309) add support for getting topic defaults from AdminClient

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387310#comment-16387310
 ] 

ASF GitHub Bot commented on KAFKA-6309:
---

guozhangwang closed pull request #4624: KAFKA-6309: Improve task assignor load balance
URL: https://github.com/apache/kafka/pull/4624
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index de8fa57e36a..5b54d08c032 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -20,10 +20,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -106,14 +109,13 @@ private void assignActive() {
         }
 
         // assign any remaining unassigned tasks
-        for (final TaskId taskId : unassigned) {
+        List<TaskId> sortedTasks = new ArrayList<>(unassigned);
+        Collections.sort(sortedTasks);
+        for (final TaskId taskId : sortedTasks) {
             allocateTaskWithClientCandidates(taskId, clients.keySet(), true);
         }
-
     }
 
-
-
     private void allocateTaskWithClientCandidates(final TaskId taskId, final Set clientsWithin, final boolean active) {
         final ClientState client = findClient(taskId, clientsWithin, active);
         taskPairs.addPairs(taskId, client.assignedTasks());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 4f770c86239..ed22e3c30de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -23,11 +23,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -350,6 +352,42 @@ public void shouldAssignMoreTasksToClientWithMoreCapacity() {
         assertThat(clients.get(p1).assignedTaskCount(), equalTo(4));
     }
 
+    @Test
+    public void shouldEvenlyDistributeByTaskIdAndPartition() {
+        createClient(p1, 4);
+        createClient(p2, 4);
+        createClient(p3, 4);
+        createClient(p4, 4);
+
+        final List taskIds = new ArrayList<>();
+        final TaskId[] taskIdArray = new TaskId[16];
+
+        for (int i = 1; i <= 2; i++) {
+            for (int j = 0; j < 8; j++) {
+                taskIds.add(new TaskId(i, j));
+            }
+        }
+
+        Collections.shuffle(taskIds);
+        taskIds.toArray(taskIdArray);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(taskIdArray);
+        taskAssignor.assign(0);
+
+        Collections.sort(taskIds);
+        final Set expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12);
+        final Set expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13);
+        final Set expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14);
+        final Set expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15);
+
+        final Map sortedAssignments = sortClientAssignments(clients);
+
+        assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment));
+        assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment));
+        assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment));
+        assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment));
+    }
+
 
     @Test
     public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
@@ -665,4 +703,21 @@ private void assertActiveTaskTopicGroupIdsEvenlyDistributed() {
 }
 }
 
+private Map 

[jira] [Commented] (KAFKA-6309) add support for getting topic defaults from AdminClient

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378712#comment-16378712
 ] 

ASF GitHub Bot commented on KAFKA-6309:
---

bbejeck opened a new pull request #4624: KAFKA-6309: improve task assignor load balance
URL: https://github.com/apache/kafka/pull/4624
 
 
   Sorting TaskIds on first assignment, so that tasks are distributed evenly 
by `topicGroupId`, should help even out the workload across topologies.  
This PR is an initial "strawman" approach; a follow-up (date yet to be 
determined) will score or assign weights to processing nodes to ensure 
even processing distribution.
   
   Added a new test to the existing unit test class.
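
   The effect of sorting before assignment can be sketched as follows. This is a simplified illustration, not the StickyTaskAssignor code: the class `SortedAssignmentSketch`, its stand-in `TaskId`, and the plain round-robin loop are assumptions made for the example, whereas the real assignor picks a client through `findClient`. With 16 shuffled tasks and four clients, each client ends up with two tasks from each `topicGroupId`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified illustration of "sort, then hand out in turn"; not the Kafka assignor.
final class SortedAssignmentSketch {

    // Hypothetical stand-in for a task id: ordered by topicGroupId, then partition.
    static final class TaskId implements Comparable<TaskId> {
        final int topicGroupId;
        final int partition;

        TaskId(int topicGroupId, int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }

        @Override
        public int compareTo(TaskId other) {
            int byGroup = Integer.compare(topicGroupId, other.topicGroupId);
            return byGroup != 0 ? byGroup : Integer.compare(partition, other.partition);
        }

        @Override
        public String toString() {
            return topicGroupId + "_" + partition;
        }
    }

    // Sorts a copy of the unassigned tasks, then assigns them round-robin.
    static List<List<TaskId>> assign(List<TaskId> unassigned, int numClients) {
        List<TaskId> sortedTasks = new ArrayList<>(unassigned);
        Collections.sort(sortedTasks);

        List<List<TaskId>> assignments = new ArrayList<>();
        for (int c = 0; c < numClients; c++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < sortedTasks.size(); i++) {
            assignments.get(i % numClients).add(sortedTasks.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        List<TaskId> tasks = new ArrayList<>();
        for (int group = 1; group <= 2; group++) {
            for (int partition = 0; partition < 8; partition++) {
                tasks.add(new TaskId(group, partition));
            }
        }
        Collections.shuffle(tasks); // input order must not affect the outcome

        List<List<TaskId>> assignments = assign(tasks, 4);
        for (int c = 0; c < assignments.size(); c++) {
            System.out.println("client " + c + ": " + assignments.get(c));
        }
        // client 0 receives [1_0, 1_4, 2_0, 2_4]
    }
}
```

   Without the sort, the same loop applied to an arbitrarily ordered set could hand one client most of a single topic group's tasks.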
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6309) add support for getting topic defaults from AdminClient

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324854#comment-16324854
 ] 

ASF GitHub Bot commented on KAFKA-6309:
---

guozhangwang opened a new pull request #4421: KAFKA-6309: Return value getter based on KTable materialization status
URL: https://github.com/apache/kafka/pull/4421
 
 
   This is a bug fix composed of two parts:
   
   1. The major part: for every operator that generates a KTable, we 
should construct its value getter based on whether the KTable itself is 
materialized.
   1.a If yes, query the materialized store directly for the value getter.
   1.b If not, hand over to its parent's value getter (recursively) and 
apply the computation to the returned value.
   
   2. The minor part: in KStreamImpl, when joining with a table, we should 
connect with the table's `valueGetterSupplier().storeNames()`, not its 
`internalStoreName()`, because the latter always assumes that the KTable is 
materialized, which is not always true.
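
   The rule in part 1 can be sketched as follows. This is a minimal model under stated assumptions, not the Kafka Streams implementation: `ValueGetterSketch`, `Table`, `ValueGetter`, and `mapValues` here are hypothetical stand-ins, and a plain `Map` plays the role of the materialized store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal model of "value getter depends on materialization"; not Kafka Streams code.
final class ValueGetterSketch {

    interface ValueGetter<K, V> {
        V get(K key);
    }

    // Hypothetical table node: either backed by a store or derived from a parent.
    static final class Table<K, V> {
        private final Map<K, V> store;            // non-null only when materialized
        private final ValueGetter<K, V> derived;  // used when not materialized

        private Table(Map<K, V> store, ValueGetter<K, V> derived) {
            this.store = store;
            this.derived = derived;
        }

        static <K, V> Table<K, V> materialized(Map<K, V> store) {
            return new Table<>(store, null);
        }

        // Produces a non-materialized table whose getter delegates to the parent's
        // getter and applies the computation to whatever the parent returns.
        <R> Table<K, R> mapValues(Function<? super V, ? extends R> mapper) {
            ValueGetter<K, V> parentGetter = valueGetter();
            ValueGetter<K, R> getter = key -> {
                V parentValue = parentGetter.get(key);
                return parentValue == null ? null : mapper.apply(parentValue);
            };
            return new Table<>(null, getter);
        }

        ValueGetter<K, V> valueGetter() {
            if (store != null) {
                return store::get;  // 1.a: materialized, query the store directly
            }
            return derived;         // 1.b: not materialized, recurse through the parent
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        store.put("a", 2);

        Table<String, Integer> source = Table.materialized(store);
        Table<String, Integer> result = source.mapValues(v -> v + 1).mapValues(v -> v * 10);

        System.out.println(result.valueGetter().get("a"));       // (2 + 1) * 10
        System.out.println(result.valueGetter().get("missing")); // no parent value
    }
}
```

   Neither derived table holds a store of its own, so asking one of them for a store name would be meaningless, which is the situation part 2 guards against.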
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> add support for getting topic defaults from AdminClient
> ---
>
> Key: KAFKA-6309
> URL: https://issues.apache.org/jira/browse/KAFKA-6309
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dan norwood
>Assignee: dan norwood
>
> kip here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-234%3A+add+support+for+getting+topic+defaults+from+AdminClient



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)