[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2019-10-08 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947200#comment-16947200
 ] 

Vinoth Chandar commented on KAFKA-4696:
---

My bad, you are right. It seems this is tracked as part of the KIP itself.

> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Affects Versions: 0.10.2.0, 0.11.0.0
> Reporter: Damian Guy
> Assignee: Richard Yu
> Priority: Major
>
> Task Assignment is currently not aware of which tasks have State Stores. This 
> can result in uneven balance of standby task assignment as all tasks are 
> assigned, but only those tasks with state-stores are ever created by 
> {{StreamThread}}. So what seems like an optimal strategy during assignment 
> time could be sub-optimal post-assignment.
> For example, let's say we have 4 tasks (2 with state-stores), 2 clients, 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks.  
> One of the clients may end up with both state-store tasks, while the other 
> has none.
> Further to this, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine-grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.
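
The example in the description can be made concrete with a small sketch. This is only an illustration under stated assumptions: the class and method names (`StandbyImbalanceSketch`, `effectiveStandbys`) and the integer task ids are hypothetical and are not Kafka Streams APIs. It counts how many of a client's assigned standby tasks are stateful and would therefore actually be created.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class StandbyImbalanceSketch {

    /** Counts assigned standby tasks that have state stores and so would really be created. */
    static int effectiveStandbys(List<Integer> assignedStandbys, Set<Integer> statefulTasks) {
        int count = 0;
        for (int task : assignedStandbys) {
            if (statefulTasks.contains(task)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // 4 tasks (ids 0..3, assumed), of which tasks 0 and 1 have state stores.
        Set<Integer> stateful = new HashSet<>(Arrays.asList(0, 1));

        // numStandbyReplicas = 1: each client gets 2 standby tasks,
        // the ones that are active on the other client.
        List<Integer> standbysOnA = Arrays.asList(2, 3);
        List<Integer> standbysOnB = Arrays.asList(0, 1);

        // Looks balanced at assignment time (2 vs 2), but is not once
        // stateless standbys are dropped.
        System.out.println("A=" + effectiveStandbys(standbysOnA, stateful)
            + " B=" + effectiveStandbys(standbysOnB, stateful)); // prints A=0 B=2
    }
}
```

Both clients were assigned two standby tasks, yet only client B ends up maintaining any standby state.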



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2019-10-08 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947190#comment-16947190
 ] 

Sophie Blee-Goldman commented on KAFKA-4696:


I don't think they were actually merged, just closed. Anyway, Streams doesn't 
seem to assign in a state-store-aware fashion just yet, but I'd say this ticket 
will be subsumed by KIP-441.



[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2019-10-08 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947183#comment-16947183
 ] 

Vinoth Chandar commented on KAFKA-4696:
---

It looks like the linked PRs have been merged. Should we close this?



[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2019-01-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16731747#comment-16731747
 ] 

ASF GitHub Bot commented on KAFKA-4696:
---

ConcurrencyPractitioner commented on pull request #5012: KAFKA-4696: Streams 
standby task assignment should be state-store aware
URL: https://github.com/apache/kafka/pull/5012
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-06-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512722#comment-16512722
 ] 

Richard Yu commented on KAFKA-4696:
---

Hi, I will probably restart my work on this.



[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-05-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473245#comment-16473245
 ] 

ASF GitHub Bot commented on KAFKA-4696:
---

ConcurrencyPractitioner opened a new pull request #5012: [KAFKA-4696] Streams 
standby task assignment should be state-store aware
URL: https://github.com/apache/kafka/pull/5012
 
 
   




[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-05-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473240#comment-16473240
 ] 

ASF GitHub Bot commented on KAFKA-4696:
---

ConcurrencyPractitioner closed pull request #4615: [KAFKA-4696] Streams standby 
task assignment should be state-store aware
URL: https://github.com/apache/kafka/pull/4615
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index f4c9ce00814..94fce7f9ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -32,10 +32,13 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    /** The number of State Stores in the task. */
+    private int numberOfStateStores;
 
     public TaskId(int topicGroupId, int partition) {
         this.topicGroupId = topicGroupId;
         this.partition = partition;
+        this.setNumberOfStateStores(0);
     }
 
     public String toString() {
@@ -74,13 +77,20 @@ public static TaskId readFrom(DataInputStream in) throws IOException {
         return new TaskId(in.readInt(), in.readInt());
     }
 
-    public void writeTo(ByteBuffer buf) {
+    public void writeTo(ByteBuffer buf, final int version) {
         buf.putInt(topicGroupId);
         buf.putInt(partition);
+        if (version == 2) {
+            buf.putInt(numberOfStateStores);
+        }
     }
 
-    public static TaskId readFrom(ByteBuffer buf) {
-        return new TaskId(buf.getInt(), buf.getInt());
+    public static TaskId readFrom(ByteBuffer buf, final int version) {
+        final TaskId result = new TaskId(buf.getInt(), buf.getInt());
+        if (version == 2) {
+            result.setNumberOfStateStores(buf.getInt());
+        }
+        return result;
     }
 
     @Override
@@ -111,4 +121,12 @@ public int compareTo(TaskId other) {
                         (this.partition > other.partition ? 1 :
                             0)));
     }
+
+    public int numberOfStateStores() {
+        return numberOfStateStores;
+    }
+
+    public void setNumberOfStateStores(int numberOfStateStores) {
+        this.numberOfStateStores = numberOfStateStores;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d9c827fff52..2f414ef11f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -68,6 +68,7 @@
                  final StateDirectory stateDirectory,
                  final StreamsConfig config) {
         this.id = id;
+        this.id.setNumberOfStateStores(topology.stateStores().size());
         this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 2a08308a2fd..271f24dbaa2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -121,8 +121,8 @@ public int hashCode() {
 
         void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
             consumers.add(consumerMemberId);
-            state.addPreviousActiveTasks(info.prevTasks);
-            state.addPreviousStandbyTasks(info.standbyTasks);
+            state.addPreviousActiveTasks(info.prevTasks());
+            state.addPreviousStandbyTasks(info.standbyTasks());
             state.incrementCapacity();
         }
 
@@ -288,11 +288,11 @@ public Subscription subscription(Set topics) {
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
 
             // create the new client metadata if necessary
-            ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
+            ClientMetadata clientMetadata = clientsMetadata.get(info.processId());
 
             if (clientMetadata == null) {
-                clientMetadata = new ClientMetadata(info.userEndPoint);
-                clientsMetadata.put(info.processId, clientMetadat

[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373589#comment-16373589
 ] 

ASF GitHub Bot commented on KAFKA-4696:
---

ConcurrencyPractitioner opened a new pull request #4615: [KAFKA-4696] Streams 
standby task assignment should be state-store aware
URL: https://github.com/apache/kafka/pull/4615
 
 
   




[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-02-21 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372081#comment-16372081
 ] 

Guozhang Wang commented on KAFKA-4696:
--

[~Yohan123] That is a good question. Here are my initial thoughts: we will 
enrich the rebalance protocol metadata (today it already contains the previous 
tasks, in order to enable sticky assignment) to include the number of state 
stores for each task, and use that as the approximate "weight" of the task.

If we want to go even further, we can encode each task's "restoration progress 
indicator", computed as the sum of {{log-end-offset - checkpointed-offset}} 
across all state stores of that task. This indicates how far the task needs to 
be restored before it is ready for processing, and hence can be used as a more 
accurate "weight" of the task.
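
A minimal sketch of the "restoration progress indicator" arithmetic described above. The names here (`RestorationWeightSketch`, `StoreOffsets`, `restorationLag`) are assumptions made up for illustration and are not part of Kafka Streams; the sketch only shows the sum of {{log-end-offset - checkpointed-offset}} over a task's state stores.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; these types are not Kafka Streams classes.
final class RestorationWeightSketch {

    /** Offsets for one state store's changelog partition (assumed shape). */
    static final class StoreOffsets {
        final long logEndOffset;
        final long checkpointedOffset;

        StoreOffsets(long logEndOffset, long checkpointedOffset) {
            this.logEndOffset = logEndOffset;
            this.checkpointedOffset = checkpointedOffset;
        }
    }

    /** Sum of (log-end-offset - checkpointed-offset) across all stores of one task. */
    static long restorationLag(List<StoreOffsets> stores) {
        long lag = 0L;
        for (StoreOffsets store : stores) {
            // Clamp at zero so an inconsistent pair of offsets never lowers the weight.
            lag += Math.max(0L, store.logEndOffset - store.checkpointedOffset);
        }
        return lag;
    }

    public static void main(String[] args) {
        List<StoreOffsets> task = Arrays.asList(
            new StoreOffsets(1_000L, 900L),  // 100 records still to restore
            new StoreOffsets(500L, 500L));   // fully caught up
        System.out.println(restorationLag(task)); // prints 100
    }
}
```

A task without state stores gets a lag of 0 under this scheme, so it would weigh nothing in a restoration-based comparison.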



[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-02-17 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368437#comment-16368437
 ] 

Richard Yu commented on KAFKA-4696:
---

[~damianguy] [~mjsax] While looking through {{StickyTaskAssignor}}, I found 
that {{StickyTaskAssignor#leastLoaded()}} is used to determine the next 
{{ClientState}} to which a task will be assigned (note that this approach is 
used mostly for {{StickyTaskAssignor#assignStandby()}}). In {{leastLoaded()}}, 
the main comparison is done through 
{{ClientState#hasMoreAvailableCapacityThan()}}, which essentially compares the 
number of tasks currently assigned to each client and returns which is lower 
(after dividing by the respective capacities to determine what fraction of each 
{{ClientState}}'s capacity is occupied). However, this Jira would require that 
tasks with StateStores be distinguished from tasks without them. In essence, 
how would one decide which of two ClientStates is lighter?

One solution would be to weight the tasks (e.g. tasks with StateStores have 
weight 2 and tasks without have weight 1). However, that would bring 
complications when dealing with the total percentage of capacity that is 
occupied. What are your thoughts on this approach?
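
To make the weighting idea concrete, here is a rough sketch. It is not Kafka's {{ClientState}} or {{StickyTaskAssignor}}: the `Client` class, the weights 2 and 1, and the cross-multiplied comparison are all assumptions for illustration. It compares weighted load per unit of capacity instead of raw task counts.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; these classes are NOT Kafka's ClientState or StickyTaskAssignor.
final class WeightedLoadSketch {

    static final int STATEFUL_WEIGHT = 2;   // assumed weight for tasks with state stores
    static final int STATELESS_WEIGHT = 1;  // assumed weight for tasks without

    static final class Client {
        final String name;
        final int capacity;      // assumed positive, e.g. number of stream threads
        int assignedWeight = 0;  // sum of the weights of all assigned tasks

        Client(String name, int capacity) {
            this.name = name;
            this.capacity = capacity;
        }

        void assign(boolean stateful) {
            assignedWeight += stateful ? STATEFUL_WEIGHT : STATELESS_WEIGHT;
        }

        /** True if this client's weighted load per unit of capacity is strictly lower. */
        boolean hasMoreAvailableCapacityThan(Client other) {
            // Cross-multiply to compare assignedWeight / capacity without floating point.
            return (long) assignedWeight * other.capacity
                < (long) other.assignedWeight * capacity;
        }
    }

    /** Returns the client with the lowest weighted load; ties keep the earlier client. */
    static Client leastLoaded(List<Client> clients) {
        Client best = clients.get(0);
        for (Client candidate : clients) {
            if (candidate.hasMoreAvailableCapacityThan(best)) {
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Client a = new Client("A", 1);
        Client b = new Client("B", 1);
        a.assign(true);   // one stateful task: weight 2
        b.assign(false);  // one stateless task: weight 1
        System.out.println(leastLoaded(Arrays.asList(a, b)).name); // prints B
    }
}
```

With unweighted counts the two clients above would tie at one task each; the weights break the tie in favour of the client holding only a stateless task.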



[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2017-11-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16250727#comment-16250727
 ] 

Matthias J. Sax commented on KAFKA-4696:


[~damianguy] Is this resolved already? IIRC, we don't create stateless 
StandbyTasks anymore. Can you confirm?

> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Damian Guy
> Fix For: 1.1.0
>
>