[jira] [Commented] (HELIX-748) ZkClient should not throw Exception when internal ZkConnection is reset
[ https://issues.apache.org/jira/browse/HELIX-748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564371#comment-16564371 ] Jiajun Wang commented on HELIX-748:
---
Sure, I will post the code later. This comment is only for recording the discussion; we haven't started coding yet.

> ZkClient should not throw Exception when internal ZkConnection is reset
> ---
>
> Key: HELIX-748
> URL: https://issues.apache.org/jira/browse/HELIX-748
> Project: Apache Helix
> Issue Type: Task
> Reporter: Jiajun Wang
> Assignee: Jiajun Wang
> Priority: Major
>
> ZkClient has been observed to throw an exception because ZkConnection == null while the connection is being reset.
> This can happen during expired-session handling. By design, a ZkClient operation should wait until the reset is done instead of breaking the retry.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (HELIX-748) ZkClient should not throw Exception when internal ZkConnection is reset
[ https://issues.apache.org/jira/browse/HELIX-748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564149#comment-16564149 ] Jiajun Wang commented on HELIX-748:
---
Good point. We will certainly do that. Besides this concern, we need to resolve another issue as well. Note that in the proposed code, we keep retrying on any exception except ZkException or InterruptedException. This could be dangerous: if any callback logic throws a random exception because of its business logic, the client call will keep retrying forever. So there are 2 options:
# Check all possible exceptions thrown by the ZK operation call, and only throw KeeperExceptions, so we know when to retry and when to stop.
# Change the ZkConnection processing logic to ensure the reference is never null. In that case, any exception must be related to business logic, and we can safely end the retry. To implement this, we can add atomic connection-swap logic so that the ZkConnection reference is always valid.

Based on our investigation, option 2 seems to be the cleaner design. ZkConnection is used everywhere, and any possibility of this reference being null means more error-handling work.
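Option 2 above can be sketched with an atomic reference swap. The following is a minimal, self-contained illustration; `ConnectionHolder` and the `FakeZkConnection` stand-in are hypothetical names for this sketch, not Helix or ZkClient API:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of option 2: make the connection swap atomic so callers never
// observe a null reference during a reset.
public class ConnectionHolder {
    // Minimal stand-in for a ZkConnection; illustrative only.
    public static class FakeZkConnection {
        public final String servers;
        public FakeZkConnection(String servers) { this.servers = servers; }
    }

    private final AtomicReference<FakeZkConnection> connRef;

    public ConnectionHolder(FakeZkConnection initial) {
        if (initial == null) {
            throw new IllegalArgumentException("initial connection must not be null");
        }
        connRef = new AtomicReference<>(initial);
    }

    // Readers always see a valid (non-null) connection, even mid-reset.
    public FakeZkConnection getConnection() {
        return connRef.get();
    }

    // Build the fresh connection first, then swap it in atomically.
    // The previous connection is returned so the caller can close it.
    public FakeZkConnection reset(FakeZkConnection fresh) {
        if (fresh == null) {
            throw new IllegalArgumentException("fresh connection must not be null");
        }
        return connRef.getAndSet(fresh);
    }
}
```

With this shape, a null check in the operation path becomes unnecessary, which is exactly why option 2 removes the "random exception vs. connection reset" ambiguity.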
[jira] [Commented] (HELIX-748) ZkClient should not throw Exception when internal ZkConnection is reset
[ https://issues.apache.org/jira/browse/HELIX-748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558786#comment-16558786 ] Jiajun Wang commented on HELIX-748:
---
Change to something like this:

{code:java}
public <T> T retryUntilConnected(final Callable<T> callable)
    throws IllegalArgumentException, ZkException {
  if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
    throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
  }
  final long operationStartTime = System.currentTimeMillis();
  while (true) {
    if (_closed) {
      throw new IllegalStateException("ZkClient already closed!");
    }
    try {
      final ZkConnection zkConnection = (ZkConnection) getConnection();
      // Validate that the connection is not null before triggering the callback
      if (zkConnection == null || zkConnection.getZookeeper() == null) {
        LOG.debug("ZkConnection is in an invalid state! Retry until timeout or ZkClient closed.");
        // Wait before retrying so we don't busy-spin while the connection is being reset.
        waitForRetry();
      } else {
        return callable.call();
      }
    } catch (InterruptedException e) {
      throw new ZkInterruptedException(e);
    } catch (Exception e) {
      // Give the ZkClient some time to fix the connection issue.
      Thread.yield();
      waitForRetry();
    }
    // Before attempting a retry, check whether the retry timeout has elapsed.
    if (System.currentTimeMillis() - operationStartTime > _operationRetryTimeoutInMillis) {
      throw new ZkTimeoutException("Operation cannot be retried because of retry timeout ("
          + _operationRetryTimeoutInMillis + " milliseconds)");
    }
  }
}
{code}

We still need to check for corner cases and add test cases.
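The retry-until-timeout semantics of the proposal can also be exercised in isolation. Everything below (`RetryLoop` and its parameters) is an illustrative stand-in; in the real code this logic lives inside `retryUntilConnected` with `_operationRetryTimeoutInMillis` and `waitForRetry()`:

```java
import java.util.concurrent.Callable;

// Self-contained sketch of "retry transient failures until an overall
// operation timeout elapses". Not the ZkClient implementation.
public class RetryLoop {
    public static <T> T retry(Callable<T> op, long timeoutMs, long retryIntervalMs)
            throws Exception {
        final long start = System.currentTimeMillis();
        while (true) {
            try {
                return op.call();
            } catch (InterruptedException e) {
                // ZkClient would wrap this in ZkInterruptedException.
                throw e;
            } catch (Exception e) {
                // Treat as transient; fall through to the timeout check.
            }
            if (System.currentTimeMillis() - start > timeoutMs) {
                throw new RuntimeException(
                    "Operation cannot be retried because of retry timeout (" + timeoutMs + " ms)");
            }
            Thread.sleep(retryIntervalMs); // stand-in for waitForRetry()
        }
    }
}
```

A callable that fails a few times and then succeeds returns normally, while one that never succeeds hits the timeout exception.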
[jira] [Created] (HELIX-748) ZkClient should not throw Exception when internal ZkConnection is reset
Jiajun Wang created HELIX-748:
---
Summary: ZkClient should not throw Exception when internal ZkConnection is reset
Key: HELIX-748
URL: https://issues.apache.org/jira/browse/HELIX-748
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang
Assignee: Jiajun Wang

ZkClient has been observed to throw an exception because ZkConnection == null while the connection is being reset. This can happen during expired-session handling. By design, a ZkClient operation should wait until the reset is done instead of breaking the retry.
[jira] [Created] (HELIX-747) Replace org.codehaus.jackson with FasterXML/jackson
Jiajun Wang created HELIX-747:
---
Summary: Replace org.codehaus.jackson with FasterXML/jackson
Key: HELIX-747
URL: https://issues.apache.org/jira/browse/HELIX-747
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang

The JSON lib Helix currently uses is out of date. We should consider replacing it with a well-maintained lib. FasterXML/jackson is compatible with the lib we currently use, so it would be a good candidate.
[jira] [Created] (HELIX-727) Fix resource monitor race condition
Jiajun Wang created HELIX-727:
---
Summary: Fix resource monitor race condition
Key: HELIX-727
URL: https://issues.apache.org/jira/browse/HELIX-727
Project: Apache Helix
Issue Type: Bug
Reporter: Jiajun Wang
Assignee: Jiajun Wang

The async monitor processing may cause resource MBean deletion to fail, leaving unnecessary MBeans in the MBean server.
[jira] [Created] (HELIX-726) Add new monitor metrics for state transitions.
Jiajun Wang created HELIX-726:
---
Summary: Add new monitor metrics for state transitions.
Key: HELIX-726
URL: https://issues.apache.org/jira/browse/HELIX-726
Project: Apache Helix
Issue Type: Bug
Reporter: Jiajun Wang
Assignee: Jiajun Wang

ClusterStatus: MissingMinActiveReplicaPartitionGauge
ClusterStatus: TotalResourceGauge
ClusterStatus/ResourceStatus: PendingStateTransitionsGauge
ClusterStatus: StateTransitionsCounter
[jira] [Created] (HELIX-712) Backward compatible of the rebalance algorithm
Jiajun Wang created HELIX-712:
---
Summary: Backward compatible of the rebalance algorithm
Key: HELIX-712
URL: https://issues.apache.org/jira/browse/HELIX-712
Project: Apache Helix
Issue Type: Bug
Reporter: Jiajun Wang
Assignee: Jiajun Wang

To keep CRUSHed stable, we need to split out the logic changes made for the constraint-based rebalance strategy. Otherwise, some of the improvements would change the original assignment.
[jira] [Created] (HELIX-707) Fix topstate handoff metrics.
Jiajun Wang created HELIX-707:
---
Summary: Fix topstate handoff metrics.
Key: HELIX-707
URL: https://issues.apache.org/jira/browse/HELIX-707
Project: Apache Helix
Issue Type: Bug
Affects Versions: 0.8.x
Reporter: Jiajun Wang
Assignee: Jiajun Wang

We've confirmed a bug in the logic that calculates topstate handoff duration. Because of this issue, if the previous master instance is offline, an older handoff start time may be used to calculate the duration. This results in huge handoff durations in the Helix metrics.
[jira] [Commented] (HELIX-674) Constraint Based Resource Rebalancer
[ https://issues.apache.org/jira/browse/HELIX-674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392019#comment-16392019 ] Jiajun Wang commented on HELIX-674:
---
Due to lack of Apache Wiki privileges, I cannot create a wiki page to post the design doc there. Uploading the design doc as a PDF for now.

> Constraint Based Resource Rebalancer
> ---
>
> Key: HELIX-674
> URL: https://issues.apache.org/jira/browse/HELIX-674
> Project: Apache Helix
> Issue Type: New Feature
> Reporter: Jiajun Wang
> Assignee: Jiajun Wang
> Priority: Major
> Fix For: 0.8.x
>
> Attachments: Constraint-BasedResourceRebalancing-080318-2226-240.pdf
>
> The Helix rebalancer assigns resources according to different strategies. Recently, we optimized the strategy for evenness and minimal movement. However, the evenness here only applies to partition counts. Moreover, we've received more requests for a customizable rebalancer from our users.
> Take partition weight as an example: in reality, partition replicas have different sizes. We use "partition weight" as an abstraction of partition size. It can be network traffic usage, disk usage, or any combination of factors.
> Given that partitions may have different weights, Helix should be able to assign partitions accordingly, so that the distribution is even with regard to weight.
> In this project, we are planning a new rebalancer mechanism that generates the resource partition assignment according to a list of "constraints". The current rebalance strategy can be regarded as one kind of constraint. Moving forward, Helix users will be able to extend the constraint interface with their own logic.
> Some initial discussions are in progress, and we will post a proposal here soon.
[jira] [Created] (HELIX-678) Clear cluster event queue when controller is no longer leader
Jiajun Wang created HELIX-678:
---
Summary: Clear cluster event queue when controller is no longer leader
Key: HELIX-678
URL: https://issues.apache.org/jira/browse/HELIX-678
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang
Assignee: Jiajun Wang

Currently, the controller keeps processing queued events even when it is no longer the leader.
[jira] [Created] (HELIX-677) Change error log to info/warning when replica count cannot be read in ResourceMonitor.
Jiajun Wang created HELIX-677:
---
Summary: Change error log to info/warning when replica count cannot be read in ResourceMonitor.
Key: HELIX-677
URL: https://issues.apache.org/jira/browse/HELIX-677
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang
Assignee: Jiajun Wang

Currently, the error log in this case draws too much attention. Lower the log level to info or warning according to the exception type.
[jira] [Created] (HELIX-676) Controller keeps updating idealstates when there is no real diff.
Jiajun Wang created HELIX-676:
---
Summary: Controller keeps updating idealstates when there is no real diff.
Key: HELIX-676
URL: https://issues.apache.org/jira/browse/HELIX-676
Project: Apache Helix
Issue Type: Bug
Reporter: Jiajun Wang
Assignee: Jiajun Wang

An issue has been confirmed where the controller may keep updating idealstates when PERSIST__STATE is true. This increases ZK traffic a lot and causes performance issues.
[jira] [Created] (HELIX-675) Cluster Monitor may not be closed or init correctly when leadership changes
Jiajun Wang created HELIX-675:
---
Summary: Cluster Monitor may not be closed or init correctly when leadership changes
Key: HELIX-675
URL: https://issues.apache.org/jira/browse/HELIX-675
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang
Assignee: Jiajun Wang

Due to the different possible event processing orders, the controller init event might be processed later or earlier than expected. This causes inconsistency when the event handler thread processes and records information in the cluster monitor.
[jira] [Created] (HELIX-674) Constraint Based Resource Rebalancer
Jiajun Wang created HELIX-674:
---
Summary: Constraint Based Resource Rebalancer
Key: HELIX-674
URL: https://issues.apache.org/jira/browse/HELIX-674
Project: Apache Helix
Issue Type: New Feature
Reporter: Jiajun Wang
Assignee: Jiajun Wang

The Helix rebalancer assigns resources according to different strategies. Recently, we optimized the strategy for evenness and minimal movement. However, the evenness here only applies to partition counts. Moreover, we've received more requests for a customizable rebalancer from our users.

Take partition weight as an example: in reality, partition replicas have different sizes. We use "partition weight" as an abstraction of partition size. It can be network traffic usage, disk usage, or any combination of factors. Given that partitions may have different weights, Helix should be able to assign partitions accordingly, so that the distribution is even with regard to weight.

In this project, we are planning a new rebalancer mechanism that generates the resource partition assignment according to a list of "constraints". The current rebalance strategy can be regarded as one kind of constraint. Moving forward, Helix users will be able to extend the constraint interface with their own logic. Some initial discussions are in progress, and we will post a proposal here soon.
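To illustrate why weight matters for evenness, here is a toy greedy placement that balances total weight per node rather than partition count. This is not the Helix rebalancer or CRUSH; the class and all names are made up for the example:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative weight-aware placement: heaviest partitions first, each
// assigned to the currently least-loaded node.
public class WeightAwarePlacement {
    public static Map<String, List<String>> assign(
            Map<String, Integer> partitionWeights, List<String> nodes) {
        Map<String, List<String>> assignment = new HashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String n : nodes) {
            assignment.put(n, new ArrayList<>());
            load.put(n, 0);
        }
        partitionWeights.entrySet().stream()
            .sorted((a, b) -> b.getValue() - a.getValue()) // heaviest first
            .forEach(e -> {
                // Pick the node with the smallest accumulated weight so far.
                String target = Collections.min(nodes, Comparator.comparingInt(load::get));
                assignment.get(target).add(e.getKey());
                load.put(target, load.get(target) + e.getValue());
            });
        return assignment;
    }
}
```

A count-based strategy could place one heavy and one light partition on each node and call it even; the weight-based variant equalizes the accumulated weight instead.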
[jira] [Created] (HELIX-672) Improve partition distribution strategy for small clusters
Jiajun Wang created HELIX-672:
---
Summary: Improve partition distribution strategy for small clusters
Key: HELIX-672
URL: https://issues.apache.org/jira/browse/HELIX-672
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang
Assignee: Jiajun Wang

Currently, the CRUSH (and multi-round CRUSH) strategy provides even partition distribution based on statistical randomization. All CRUSH-based strategies work well when the cluster is reasonably large. However, in some of our small production clusters, we find the distribution is not as good as expected. The difference between a heavily loaded node and an idle node can be 30% in our case. Given that we assign resources according to the max load, we are wasting 30% of the resources on the idle nodes. It could be worse in other clusters. We need to find a new algorithm, or improve CRUSH, for better evenness on small clusters. Meanwhile, we should keep the good properties such as minimal movement and deterministic calculation results.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
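The 30% figure above can be made concrete with a simple imbalance metric: the gap between the heaviest and lightest node as a fraction of the heaviest node's load. The metric name and formula here are our own illustration, not a Helix API:

```java
import java.util.Collections;
import java.util.List;

// Illustrative imbalance metric. If capacity is provisioned for the
// heaviest node, spread() is the fraction of capacity idle on the
// lightest node.
public class Imbalance {
    public static double spread(List<Integer> nodeLoads) {
        int max = Collections.max(nodeLoads);
        int min = Collections.min(nodeLoads);
        return max == 0 ? 0.0 : (max - min) / (double) max;
    }
}
```

For example, loads of 100 and 70 partitions on two nodes give a spread of 0.30, i.e. the 30% waste described in the issue.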
[jira] [Created] (HELIX-667) Adding statistic measurement for the critical MBean attributes
Jiajun Wang created HELIX-667:
---
Summary: Adding statistic measurement for the critical MBean attributes
Key: HELIX-667
URL: https://issues.apache.org/jira/browse/HELIX-667
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang

Currently, all MBean attributes only support simple counters or gauges. For attributes such as latency, statistical values would be important for understanding the system.
[jira] [Created] (HELIX-666) Adding MBean to monitor HelixCallbacks
Jiajun Wang created HELIX-666:
---
Summary: Adding MBean to monitor HelixCallbacks
Key: HELIX-666
URL: https://issues.apache.org/jira/browse/HELIX-666
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang

We need an MBean to monitor HelixCallbacks in order to understand callback processing latency and frequency.
[jira] [Created] (HELIX-665) Adding MBean to monitor threadpoolexecutor
Jiajun Wang created HELIX-665:
---
Summary: Adding MBean to monitor threadpoolexecutor
Key: HELIX-665
URL: https://issues.apache.org/jira/browse/HELIX-665
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang

We need to add MBeans to monitor the different kinds of ThreadPoolExecutors in order to understand the system workload.
[jira] [Created] (HELIX-664) Adding MBean to monitor ZkClient activity
Jiajun Wang created HELIX-664:
---
Summary: Adding MBean to monitor ZkClient activity
Key: HELIX-664
URL: https://issues.apache.org/jira/browse/HELIX-664
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang

We need to add an MBean to monitor ZkClient for performance evaluation: latency, throughput, etc.
[jira] [Created] (HELIX-663) Adding MBean for monitoring cluster event
Jiajun Wang created HELIX-663:
---
Summary: Adding MBean for monitoring cluster event
Key: HELIX-663
URL: https://issues.apache.org/jira/browse/HELIX-663
Project: Apache Helix
Issue Type: Task
Reporter: Jiajun Wang

We need an MBean to monitor cluster event processing: event count and processing duration.
[jira] [Commented] (HELIX-659) Extend Helix to Support Resource with Multiple States
[ https://issues.apache.org/jira/browse/HELIX-659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080926#comment-16080926 ] Jiajun Wang commented on HELIX-659:
---
h2. Design Details

h3. Register Secondary States Model / Factory

Note that if a secondary state model is a dynamic state, defaultTransitionHandler has to be implemented.

*State Model Factory*
{code:java}
public abstract class DynamicStateModelFactory extends StateModelFactory<DynamicStateModel> {
  ...
}

public abstract class DynamicStateModel extends StateModel {
  static final String DEFAULT_INITIAL_STATE = "UNKNOWN";
  protected String _currentState = DEFAULT_INITIAL_STATE;

  public String getCurrentState() {
    return _currentState;
  }

  // !!! Changed part
  // @transition(from='from', to='to')
  public void defaultTransitionHandler(Message message, NotificationContext context) {
    logger.error("Default transition handler. The idea is to invoke this if no transition method is found. To be implemented");
  }

  public boolean updateState(String newState) {
    _currentState = newState;
    return true;
  }

  public void rollbackOnError(Message message, NotificationContext context, StateTransitionError error) {
    logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
  }

  public void reset() {
    logger.warn("Default reset method invoked. Either because the process no longer owns this resource or the session timed out");
  }

  // !! Internal states such as ERROR will still exist and be supported !!
  // @Transition(to = "DROPPED", from = "ERROR")
  public void onBecomeDroppedFromError(Message message, NotificationContext context) throws Exception {
    logger.info("Default ERROR->DROPPED transition invoked.");
  }
}
{code}

h2. Resource Configuration

Secondary states are conceptually map values. Besides the state itself, each state model may have a different factory name as well, so both the secondary states and their state model factories need to be recorded. We keep the design that:
1. State configurations are at the partition level.
2. State factory configurations are at the resource level.

To allow multiple states to be configured, we propose representing them in JSON string format. Note that the state model name is used as the key, so no duplicate model can be used in one partition.

*Resource config with secondary state VERSION*
{code:json}
{
  "id": "Test_Resource",
  "simpleFields": {
    "SECONDARY_STATE_MODEL_DEF": "{VERSION : VersionStateModelFactory}"
  },
  "mapFields": {
    "partition_1": "{VERSION : 1.0.1}",
    "partition_2": "{VERSION : 1.0.2}"
  }
}
{code}

*Additional APIs to configure secondary states*
{code:java}
/**
 * Set configuration values
 * @param scope
 * @param listProperties
 */
void setConfig(HelixConfigScope scope, Map<String, List<String>> listProperties);

/**
 * Get configuration values
 * @param scope
 * @param keys
 * @return configuration values ordered by the provided keys
 */
Map<String, List<String>> getConfig(HelixConfigScope scope, List<String> keys);
{code}

h3. Partitions with the Secondary States shown in Current State and External View

The current state shows both the secondary state models and states in the same format as the resource configuration.

*Current States*
{code:json}
{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF": "MasterSlave",
    "STATE_MODEL_FACTORY_NAME": "DEFAULT",
    "BUCKET_SIZE": "0",
    "SESSION_ID": "25b2ce5dfbde0fa",
    "SECONDARY_STATE_MODEL_DEF": "{VERSION : VersionStateModelFactory}"
  },
  "listFields": {},
  "mapFields": {
    "partition_1": {
      "CURRENT_STATE": "MASTER",
      "SECONDARY_STATES": "{VERSION : 1.0.1}",
      "INFO": ""
    },
    "partition_2": {
      "CURRENT_STATE": "SLAVE",
      "SECONDARY_STATES": "{VERSION : 1.0.1}",
      "INFO": ""
    }
  }
}
{code}

As for the external view, we have 2 options for showing secondary states.

1. Compress all states by combining the main state with the secondary states, separated by ":".

*Secondary state in External View*
{code:json}
{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF_REF": "MasterSlave",
    "ASSOCIATE_STATE_MODEL_DEF_REFS": "{VERSION : VersionStateModelFactory}"
  },
  "listFields": {},
  "mapFields": {
    "example_resource_0": {
      "lca1-app0004.stg.linkedin.com_11932": "{MasterSlave : MASTER} : {VERSION : 1.0.1}",
      "lca1-app0048.stg.linkedin.com_11932": "{MasterSlave : SLAVE} : {VERSION : 1.0.0}"
    }
  }
}
{code}

2. Add new fields for showing secondary states separately.

*Secondary state in External View*
{code:json}
{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF_REF": "MasterSlave",
    "ASSOCIATE_STATE_MODEL_DEF_REFS": "{VERSION : VersionStateModelFactory}"
  },
  "listFields": {},
  "mapFields": {
    "example_resource_0": {
      "lca1-app0004.stg.linkedin.com_11932": "MASTER",
      "lca1-app0048.stg.linkedin.com_11932": "SLAVE",
      "lca1-app0048.stg.linkedin.com_11932_SECONDARY_STATE": "{VERSION : 1.0.0}",
      "lc
{code}
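The defaultTransitionHandler fallback described above can be illustrated with a small reflective dispatcher. The `onBecome<To>From<From>` method-name convention mirrors Helix state models; the dispatcher and demo model below are otherwise hypothetical:

```java
import java.lang.reflect.Method;

// Sketch: look up an explicit transition method by name; if none exists
// (as with a dynamic state whose values are not pre-defined), fall back
// to defaultTransitionHandler.
public class TransitionDispatcher {
    public static String invokeTransition(Object model, String from, String to) throws Exception {
        String name = "onBecome" + capitalize(to) + "From" + capitalize(from);
        try {
            Method m = model.getClass().getMethod(name);
            return (String) m.invoke(model);
        } catch (NoSuchMethodException e) {
            // No explicit handler registered for this transition.
            Method def = model.getClass().getMethod("defaultTransitionHandler");
            return (String) def.invoke(model);
        }
    }

    private static String capitalize(String s) {
        return s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();
    }

    // Demo model for the sketch: one explicit transition plus the default.
    public static class DemoModel {
        public String onBecomeMasterFromSlave() { return "explicit"; }
        public String defaultTransitionHandler() { return "default"; }
    }
}
```

A SLAVE→MASTER transition hits the explicit handler, while any transition without a matching method (e.g. a new version number) falls through to the default.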
[jira] [Comment Edited] (HELIX-659) Extend Helix to Support Resource with Multiple States
[ https://issues.apache.org/jira/browse/HELIX-659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080926#comment-16080926 ] Jiajun Wang edited comment on HELIX-659 at 7/10/17 7:25 PM:
---
h2. Design Details

h3. Register Secondary States Model / Factory

Note that if a secondary state model is a dynamic state, defaultTransitionHandler has to be implemented.

*State Model Factory*
{code:java}
public abstract class DynamicStateModelFactory extends StateModelFactory<DynamicStateModel> {
  ...
}

public abstract class DynamicStateModel extends StateModel {
  static final String DEFAULT_INITIAL_STATE = "UNKNOWN";
  protected String _currentState = DEFAULT_INITIAL_STATE;

  public String getCurrentState() {
    return _currentState;
  }

  // !!! Changed part
  // @transition(from='from', to='to')
  public void defaultTransitionHandler(Message message, NotificationContext context) {
    logger.error("Default transition handler. The idea is to invoke this if no transition method is found. To be implemented");
  }

  public boolean updateState(String newState) {
    _currentState = newState;
    return true;
  }

  public void rollbackOnError(Message message, NotificationContext context, StateTransitionError error) {
    logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
  }

  public void reset() {
    logger.warn("Default reset method invoked. Either because the process no longer owns this resource or the session timed out");
  }

  // !! Internal states such as ERROR will still exist and be supported !!
  // @Transition(to = "DROPPED", from = "ERROR")
  public void onBecomeDroppedFromError(Message message, NotificationContext context) throws Exception {
    logger.info("Default ERROR->DROPPED transition invoked.");
  }
}
{code}

h2. Resource Configuration

Secondary states are conceptually map values. Besides the state itself, each state model may have a different factory name as well, so both the secondary states and their state model factories need to be recorded. We keep the design that:
1. State configurations are at the partition level.
2. State factory configurations are at the resource level.

To allow multiple states to be configured, we propose representing them in JSON string format. Note that the state model name is used as the key, so no duplicate model can be used in one partition.

*Resource config with secondary state VERSION*
{code:json}
{
  "id": "Test_Resource",
  "simpleFields": {
    "SECONDARY_STATE_MODEL_DEF": "{VERSION : VersionStateModelFactory}"
  },
  "mapFields": {
    "partition_1": "{VERSION : 1.0.1}",
    "partition_2": "{VERSION : 1.0.2}"
  }
}
{code}

*Additional APIs to configure secondary states*
{code:java}
/**
 * Set configuration values
 * @param scope
 * @param listProperties
 */
void setConfig(HelixConfigScope scope, Map<String, List<String>> listProperties);

/**
 * Get configuration values
 * @param scope
 * @param keys
 * @return configuration values ordered by the provided keys
 */
Map<String, List<String>> getConfig(HelixConfigScope scope, List<String> keys);
{code}

h3. Partitions with the Secondary States shown in Current State and External View

The current state shows both the secondary state models and states in the same format as the resource configuration.

*Current States*
{code:json}
{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF": "MasterSlave",
    "STATE_MODEL_FACTORY_NAME": "DEFAULT",
    "BUCKET_SIZE": "0",
    "SESSION_ID": "25b2ce5dfbde0fa",
    "SECONDARY_STATE_MODEL_DEF": "{VERSION : VersionStateModelFactory}"
  },
  "listFields": {},
  "mapFields": {
    "partition_1": {
      "CURRENT_STATE": "MASTER",
      "SECONDARY_STATES": "{VERSION : 1.0.1}",
      "INFO": ""
    },
    "partition_2": {
      "CURRENT_STATE": "SLAVE",
      "SECONDARY_STATES": "{VERSION : 1.0.1}",
      "INFO": ""
    }
  }
}
{code}

As for the external view, we have 2 options for showing secondary states.

1. Compress all states by combining the main state with the secondary states, separated by ":".

*Secondary state in External View*
{code:json}
{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF_REF": "MasterSlave",
    "ASSOCIATE_STATE_MODEL_DEF_REFS": "{VERSION : VersionStateModelFactory}"
  },
  "listFields": {},
  "mapFields": {
    "example_resource_0": {
      "app0004.stg.com_11900": "{MasterSlave : MASTER} : {VERSION : 1.0.1}",
      "app0048.stg.com_11900": "{MasterSlave : SLAVE} : {VERSION : 1.0.0}"
    }
  }
}
{code}

2. Add new fields for showing secondary states separately.

*Secondary state in External View*
{code:json}
{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF_REF": "MasterSlave",
    "ASSOCIATE_STATE_MODEL_DEF_REFS": "{VERSION : VersionStateModelFactory}"
  },
  "listFields": {},
  "mapFields": {
    "example_resource_0": {
      "app0004.stg.com_11900": "MASTER",
      "app0048.stg.com_11900": "SLAVE",
      "app0048.stg.com_11900_SECONDARY_STATE": "{VERSION : 1.0.0}",
      "app0048.stg.com_11900_SEC
{code}
[jira] [Comment Edited] (HELIX-659) Extend Helix to Support Resource with Multiple States
[ https://issues.apache.org/jira/browse/HELIX-659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030755#comment-16030755 ] Jiajun Wang edited comment on HELIX-659 at 7/10/17 7:19 PM:
---
Based on all that is discussed above, let us imagine a resource represented by 3 independent state models: MasterSlave, ReadWrite, and Versions. The following figure shows three possible state transitions for a replica of the resource.

!https://documents.lucidchart.com/documents/e19ab04e-aa06-4ab3-9e57-cfe273554fa1/pages/0_0?a=2213&x=-11&y=422&w=1124&h=396&store=1&accept=image%2F*&auth=LCA%20ef1c4685cd5e2f5bbded5596a92e76f1a84fb390-ts%3D1497894598!

Partition 1 has some internal error, so although it is still the master, it is transitioned to the "Error" state. Meanwhile, its version needs to be upgraded. Partition 2 is changed to "R/W", probably because partition 1 is no longer serving as an "R/W" node. As for partition 3, all of its states are changed.

The difficulties of supporting this request with the current Helix system include, but are not limited to, the following aspects.

*It is hard to define a state machine or transition constraints for all state models using a single state model*

For a dynamic state, a pre-defined state model won't work at all. But even if we only consider regular states, there is still a problem. Based on our existing framework, supporting such a scenario would require a very complex state model that combines all 3 models. The result would be 2 * 3 * 4 = 24 states and around 80 possible transition paths, which would be super hard to code.

*State transitions could be inefficient*

Imagine that each state transition message contains the delta of a single state. The messages would be as follows.

||Partition||State transitions||
|R1|(Online, R/W, 1.0.1) → (Online, Error, 1.0.1), (Online, Error, 1.0.1) → (Online, Error, 1.0.2)|
|R2|(Online, Init, 1.0.1) → (Online, R/W, 1.0.1)|
|R3|(Offline, Init, 1.0.1) → (Online, Init, 1.0.1), (Online, Init, 1.0.1) → (Online, Ready, 1.0.1), (Online, Ready, 1.0.1) → (Online, Ready, 1.0.2)|

Obviously, this strategy increases traffic and makes the whole transition process much slower. A simpler design is for a message to carry all the necessary information.

||Partition||State transitions||
|R1|(Online, R/W, 1.0.1) → (Online, Error, 1.0.2)|
|R2|(Online, Init, 1.0.1) → (Online, R/W, 1.0.1)|
|R3|(Offline, Init, 1.0.1) → (Online, Ready, 1.0.2)|

But this design brings other issues.
# When a participant gets a message, it may report the new states only after finishing all the changes. Among all these states, if one state transition takes considerably longer than the others, the whole process is blocked.
# The controller has less control over how a participant performs the state transitions. That is a problem if a policy like Helix State Transition Priority Support needs to be applied.
# On the other hand, the participant needs to inspect the message and compare statuses. It is hard to ensure backward compatibility.

*Helix is not able to calculate the best possible state for every state model*

With a dynamic state, we allow the application to manage the state transition, so the state model is not defined with complete constraints and requirements, and Helix cannot calculate the best possible states. Moreover, even for a non-dynamic state, the application may want to trigger the transition based on some external factors. In this case, Helix only coordinates the state transition; it does not make the best possible states plan. To let the user define such states, we need to provide a new state model type, and Helix should be able to interpret the definition and generate transition messages correctly.

h2. Additional Case Study

h3. Ambry R/W State

In Ambry, a partition has an "R/W" state in addition to the OnlineOffline state, so a partition can be "ONLINE:READ" or "ONLINE:WRITE". The "R/W" state indicates whether the partition is read-only or writable. There may be state transitions as shown in the following figure.
* The first state transition is conducted by the Ambry application.
* The second one is a regular state transition managed by Helix.

!https://documents.lucidchart.com/documents/e19ab04e-aa06-4ab3-9e57-cfe273554fa1/pages/0_0?a=1272&x=13&y=824&w=647&h=352&store=1&accept=image%2F*&auth=LCA%206f398192ee541fa7519801ec33ae2ae4f6e02bef-ts%3D1496770738!

Note that the "R/W" state model is still a regular model, which means the state is pre-defined and the constraint can still be defined as for a regular state.

h3. Pinot Version State

In Pinot, when a new version of data is ready, the system replaces the old partitions with the new ones. If the replacement is done one partition after another, any read queried during the upgrade period will get inconsistent data. Currently, t
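The 2 * 3 * 4 = 24 combined-state count mentioned earlier is just the Cartesian product of the three models' state sets, which a few lines can verify. The state lists below are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Enumerate the Cartesian product of several state models' state sets,
// joined with ":" the way the combined external-view encoding does.
public class CombinedStates {
    public static List<String> product(List<List<String>> models) {
        List<String> result = new ArrayList<>();
        result.add("");
        for (List<String> states : models) {
            List<String> next = new ArrayList<>();
            for (String prefix : result) {
                for (String s : states) {
                    next.add(prefix.isEmpty() ? s : prefix + ":" + s);
                }
            }
            result = next;
        }
        return result;
    }
}
```

With 2 MasterSlave states, 3 ReadWrite states, and 4 versions, a single flat state machine would need all 24 combined states, which is the complexity the proposal avoids by keeping the models independent.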
[jira] [Comment Edited] (HELIX-659) Support Additional Associate States
[ https://issues.apache.org/jira/browse/HELIX-659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030755#comment-16030755 ] Jiajun Wang edited comment on HELIX-659 at 5/31/17 8:00 AM: h1. Proposal In this document, we propose to introduce an additional layer of state mechanism into Helix. Considering Pinot case, what they need is transiting from "ONLINE:V1" to "ONLINE:V2". Note that "V1" to "V2" transition is in parallel of the existing state transition. It is special in following ways: # The state is not pre-defined. New version numbers may appear after state transition model is registered. # Helix won't understand the internal logic of this additional state. So there is no way that Helix automatically computes idea state. It will rely on application's configuration to update this state. We will take the above 2 points as assumptions. As for expected workflow, still take Pinot partition version as an example: # Pinot needs to register their own logic for version upgrade, which means a new state model (factory name). # Helix provides API to configure resources with additional state ("VERSION"). # Upon resource configuration changed, the controller triggers state transition and sends message to the participants. # Participants handles message by calling corresponding state transition methods. Then update in current state. # Controller listens on current state change. If any update, it processes and reflects the update in the external view. h1. Design h2. Register Associate States Model / Factory Note that since associate states maybe not pre-defined, so defaultTransitionHandler has to be implemented. h3. State Model Factory: public abstract class AssociateStateModelFactory extends StateModelFactory { ... 
}

public abstract class AssociateStateModel extends StateModel {
  static final String DEFAULT_INITIAL_STATE = "UNKNOWN";
  protected String _currentState = DEFAULT_INITIAL_STATE;

  public String getCurrentState() {
    return _currentState;
  }

  // !!! Changed part
  // @transition(from='from', to='to')
  public void defaultTransitionHandler(Message message, NotificationContext context) {
    logger.error("Default transition handler. The idea is to invoke this if no transition method is found. To be implemented");
  }

  public boolean updateState(String newState) {
    _currentState = newState;
    return true;
  }

  public void rollbackOnError(Message message, NotificationContext context, StateTransitionError error) {
    logger.error("Default rollback method invoked on error. Error Code: " + error.getCode());
  }

  public void reset() {
    logger.warn("Default reset method invoked. Either because the process no longer owns this resource or the session timed out");
  }

  @Transition(to = "DROPPED", from = "ERROR")
  public void onBecomeDroppedFromError(Message message, NotificationContext context) throws Exception {
    logger.info("Default ERROR->DROPPED transition invoked.");
  }
}

h2. Resource Configuration
h3. Resource config with associate state VERSION:

{
  "id": "Test_Resource",
  "simpleFields": {},
  "listFields": {
    "ASSOCIATE_STATE_MODEL_DEF_REFS": ["VERSION"],
    "ASSOCIATE_STATE_MODEL_FACTORY_NAMES": ["DEFAULT"],
    "ASSOCIATE_STATES": ["1.0.1"]
  },
  "mapFields": {}
}

h2. Additional APIs to configure associate states

/**
 * Set configuration values
 * @param scope
 * @param listProperties
 */
void setConfig(HelixConfigScope scope, Map<String, List<String>> listProperties);

/**
 * Get configuration values
 * @param scope
 * @param keys
 * @return configuration values ordered by the provided keys
 */
Map<String, List<String>> getConfig(HelixConfigScope scope, List<String> keys);

h2. Partition with the Associate States on the Participant State And EV
h3. 
Current States:

{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF": "MasterSlave",
    "STATE_MODEL_FACTORY_NAME": "DEFAULT",
    "BUCKET_SIZE": "0",
    "SESSION_ID": "25b2ce5dfbde0fa"
  },
  "listFields": {
    "ASSOCIATE_STATE_MODEL_DEF_REFS": ["VERSION"],
    "ASSOCIATE_STATE_MODEL_FACTORY_NAMES": ["DEFAULT"]
  },
  "mapFields": {
    "example_resource_0": {
      "CURRENT_STATE": "MASTER",
      "ASSOCIATE_STATES": "1.0.1", // Split by ":" if multiple associate states are set
      "INFO": ""
    }
  }
}

h3. Associate state in External View:

{
  "id": "example_resource",
  "simpleFields": {
    "STATE_MODEL_DEF_REF": "MasterSlave"
  },
  "listFields": {
    "ASSOCIATE_STATE_MODEL_DEF_REFS": ["VERSION"]
  },
  "mapFields": {
    "example_resource_0": {
      // Given more than one associate state, they are split by ":", and the main state is always the first one.
      "lca1-app0004.stg.linkedin.com_11932": "MASTER:1.0.1",
      "lca1-app0048.stg.linkedin.com_11932": "SLAVE:1.0.0"
    }
  }
}
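To make the proposed model concrete, here is a self-contained sketch of the idea. AssociateStateModel below is a simplified stand-in for the class in the design above (not the real Helix class), and VersionStateModel is a hypothetical application-side subclass, e.g. Pinot handling a "VERSION" associate state:

```java
// Self-contained sketch of the associate-state model idea. All names are
// illustrative, not part of the Helix API proposed above.
public class AssociateStateSketch {

  static abstract class AssociateStateModel {
    static final String DEFAULT_INITIAL_STATE = "UNKNOWN";
    protected String currentState = DEFAULT_INITIAL_STATE;

    public String getCurrentState() {
      return currentState;
    }

    // Associate states are not pre-defined, so the default handler is
    // invoked for any transition without a dedicated transition method.
    public abstract void defaultTransitionHandler(String fromState, String toState);

    public boolean updateState(String newState) {
      currentState = newState;
      return true;
    }
  }

  // Hypothetical Pinot-style model for the "VERSION" associate state.
  static class VersionStateModel extends AssociateStateModel {
    @Override
    public void defaultTransitionHandler(String fromState, String toState) {
      // Application logic (e.g. reloading partition data for the new
      // version) would run here before the new state is recorded.
      updateState(toState);
    }
  }

  public static void main(String[] args) {
    VersionStateModel model = new VersionStateModel();
    model.defaultTransitionHandler(model.getCurrentState(), "1.0.1");
    System.out.println(model.getCurrentState()); // prints "1.0.1"
  }
}
```

The participant would report the resulting current state ("1.0.1") in the ASSOCIATE_STATES field shown above, and the controller would surface it in the external view.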
[jira] [Created] (HELIX-659) Support Additional Associate States
Jiajun Wang created HELIX-659: - Summary: Support Additional Associate States Key: HELIX-659 URL: https://issues.apache.org/jira/browse/HELIX-659 Project: Apache Helix Issue Type: New Feature Components: helix-core Affects Versions: 0.6.x Reporter: Jiajun Wang

Currently, Helix only supports managing a single state for all resources/partitions. However, in the real world, cluster management requirements may be more complicated than that. In Pinot, for example, each partition needs to be assigned a version to ensure data consistency. When a new version comes, the system needs to replace the old partition with the new one, and the replacement is done one partition at a time, so any reads during this period will get inconsistent data. Pinot cannot put the version information directly into the section (partition) state field, because that field is already occupied by the main state (offline-online, for instance) used by the Helix controller. So the Pinot team relies on a workaround to implement their application logic: creating a new resource with the latest version and replacing the old one after the new resource is fully loaded; for the Helix controller, the version is unknown. Another option is for the Pinot team to maintain their own config item or property store item for recording versions. Both ways require the Pinot team to implement version control themselves. Another requirement comes from the Ambry team, where a partition can be "ONLINE:READ" or "ONLINE:WRITE". In both cases, the single-state mechanism is not sufficient for the applications' requirements. It would be very helpful to provide a framework-level feature that supports more than one state for each partition. Benefits:
# The application doesn't need to write additional code for managing additional states.
# Potential conflicts are avoided when multiple state transitions happen concurrently.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (HELIX-657) Fix TestRebalancerPersistAssignments
Jiajun Wang created HELIX-657: - Summary: Fix TestRebalancerPersistAssignments Key: HELIX-657 URL: https://issues.apache.org/jira/browse/HELIX-657 Project: Apache Helix Issue Type: Bug Components: helix-core Reporter: Jiajun Wang Fix the unstable test case TestRebalancerPersistAssignments. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (HELIX-655) Helix per-participant concurrent task throttling
[ https://issues.apache.org/jira/browse/HELIX-655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993887#comment-15993887 ] Jiajun Wang edited comment on HELIX-655 at 5/3/17 5:22 PM: ---

h1. Design
h2. Task Throttling Per Participant

This limitation is conceptually equal to the max thread pool size in TaskStateModelFactory on the participant. If the user constructs TaskStateModelFactory with a customized executor that has a limited-size thread pool, that participant will never execute more tasks than the threshold. The problem is that since the limitation is not known by the controller, tasks will still be assigned to the participant, where they will be queued in the participant thread pool and never re-assigned. It makes more sense to throttle tasks in the controller, at the same time that tasks are assigned to participants. Basically, a participant is configured with a "MaxRunningTasksNumber", and the controller assigns tasks accordingly.

h3. pseudo code

When calculating the best possible state in the JobRebalancer:

Foreach Job in RunnableJobs:
  TaskToParticipantMapping = CalculateAssignment(Job)
  Foreach MappingEntry in TaskToParticipantMapping:
    If Running_task + ToBeAssigned_task exceeds Participant_Task_Threshold:
      TaskToParticipantMapping.remove(MappingEntry)
      [Stretch] Try next applicable participant (consider tasks attached to the resource)

The above logic can be considered a task queue algorithm. However, the original assignment will keep relying on the current logic, so if all participants have enough capacity, tasks will still be evenly dispatched.

h3. participant configuration

{
  "id": "localhost_12918",
  "simpleFields": {
    "HELIX_ENABLED": "true",
    "HELIX_ENABLED_TIMESTAMP": "1493326930182",
    "HELIX_HOST": "localhost",
    "HELIX_PORT": "12918",
    "MAX_RUNNING_TASK": "55"
  }
}

h3. Backward compatibility

For old participants, the controller assumes the thread pool has a default capacity of 40 (equal to the default message handling thread pool size).

h4. 
Assumption

Note that if some tasks have a workload much heavier than others, controlling only the task count won't work. In this design, we assume that tasks have approximately the same workload.

h3. [Stretch] Optimization

The existing JobRebalancer is triggered every time a state change event happens. That means completely sorting all pending jobs/tasks and recalculating the assignment each time. A better strategy is to maintain a job priority queue in the controller: when a job becomes runnable, enqueue it; when a job is complete, dequeue it. On any task state update, check participant capacity and assign tasks from the queue if possible. This refactoring is considered a stretch goal.

h3. Alternative option
h4. "GlobalMaxConcurrentJobNumber" Per Cluster

The Helix controller restricts the number of running jobs. However, with this throttling, once a job is scheduled, it occupies a slot until it finishes. This is bad when all the running jobs are long-running: no new jobs will be scheduled. Moreover, it's harder for an admin to set a reasonable total job count, given that workflows and jobs usually differ widely in their real workload. Comparing these 2 options, "MaxRunningTasksPerParticipant" is directly related to the participant's capacity. Once the controller schedules tasks according to it, we can reliably avoid overloading the instances. Even if we throttle jobs, there is no guarantee about the number of running threads on each participant. Moreover, a user can already control job scheduling by adjusting the frequency of submitting jobs. So "GlobalMaxConcurrentJobNumber" is not necessary.

h2. Job Priority

Given limited resources, which job do we schedule first?

h3. Schedule the jobs with the highest priority first until participants are full

In this design, we propose the simplest solution for priority control. The user can configure job resource priority, or Helix will use "age" (the time the job was scheduled) as the priority. 
If some jobs are assigned a priority and others are not, Helix will assume jobs with a priority setting have higher priority. One issue here is that if the application keeps sending high priority jobs to Helix, lower priority jobs will starve. Since this is controlled by the application (and mostly the desired result), Helix won't apply any additional control for these starving jobs. Our plan is:
Step 1. Support job "start time" based priority
Step 2. Support user-defined priority

h3. Alternative options
h4. Option 1. Using per-job and per-workflow concurrency control to implement priority

WorkflowConfig.ParallelJobs and JobConfig.numConcurrentTasksPerInstance are used to control how many jobs and tasks can be executed in parallel within a single workflow. Given that cluster administrators configure these numbers "correctly", workflows will eventually be assigned the expected resources.
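The per-participant throttling pseudo code above can be sketched in plain Java. This is a simplified model with hypothetical types and method names; the real check would live inside JobRebalancer and operate on Helix's internal assignment structures:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed capacity check: drop task assignments that would
// push a participant past its MAX_RUNNING_TASK threshold. All names here
// are illustrative, not Helix API.
public class TaskThrottleSketch {

  // Keeps only assignments whose target participant has remaining
  // capacity, incrementing the running count for each accepted task.
  static Map<String, String> throttle(Map<String, String> taskToParticipant,
                                      Map<String, Integer> runningTasks,
                                      Map<String, Integer> maxRunningTasks) {
    Map<String, String> accepted = new HashMap<>();
    for (Map.Entry<String, String> e : taskToParticipant.entrySet()) {
      String participant = e.getValue();
      int running = runningTasks.getOrDefault(participant, 0);
      // Old participants without an explicit config get the default
      // capacity of 40 (the default message handling thread pool size).
      int max = maxRunningTasks.getOrDefault(participant, 40);
      if (running < max) {
        accepted.put(e.getKey(), participant);
        runningTasks.put(participant, running + 1);
      }
      // else: the task stays unassigned and is retried in a later
      // rebalancer pipeline run.
    }
    return accepted;
  }

  public static void main(String[] args) {
    Map<String, String> mapping = new HashMap<>();
    mapping.put("task_1", "localhost_12918");
    mapping.put("task_2", "localhost_12918");
    Map<String, Integer> running = new HashMap<>();
    running.put("localhost_12918", 54); // 54 tasks already running
    Map<String, Integer> limits = new HashMap<>();
    limits.put("localhost_12918", 55);  // MAX_RUNNING_TASK from config
    // Only one of the two tasks fits under the threshold.
    System.out.println(throttle(mapping, running, limits).size()); // prints 1
  }
}
```

Because rejected tasks are simply left out of the accepted mapping rather than queued on the participant, they remain eligible for reassignment on the next pipeline run, which is the behavior the design argues for.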
[jira] [Created] (HELIX-655) Helix per-participant concurrent task throttling
Jiajun Wang created HELIX-655: - Summary: Helix per-participant concurrent task throttling Key: HELIX-655 URL: https://issues.apache.org/jira/browse/HELIX-655 Project: Apache Helix Issue Type: New Feature Components: helix-core Affects Versions: 0.6.x Reporter: Jiajun Wang

h1. Overview

Currently, all runnable jobs/tasks in Helix are treated equally and are all scheduled according to the rebalancer algorithm. That means their assignments might differ, but they will all be in the RUNNING state. This can cause an issue when there are too many concurrently runnable jobs. When the Helix controller starts all these jobs, the instances may be overloaded as they allocate resources and execute all the tasks. As a result, the jobs won't be able to finish in a reasonable time window. The issue is even more critical for long-running jobs. According to our meeting with the Gobblin team, when a job is scheduled, they allocate resources for the job. So in the situation described above, more and more resources will be reserved for pending jobs, and the cluster will soon be exhausted. To work around the problem, an application needs to schedule jobs at a relatively low frequency (which is what Gobblin does now), and this can cause low utilization. A better way to fix this issue, at the framework level, is throttling the jobs/tasks that run concurrently, and allowing priorities to be set for different jobs to control total execution time. Given the same amount of jobs, the cluster is then in a better condition, and the jobs running in it have a more controllable execution time. Existing related control mechanisms are:
* ConcurrentTasksPerInstance for each job
* ParallelJobs for each workflow
* The thread pool limitation on the participant, if the user customizes TaskStateModelFactory
But none of them directly helps when the number of concurrent workflows or jobs is large. If an application keeps scheduling jobs/jobQueues, Helix will start any runnable jobs without considering the workload on the participants. 
The application may be able to carefully configure these items to achieve the goal, but it won't be able to easily find the sweet spot, especially since the cluster might be changing (scaling out, etc.).

h2. Problem summary
# All runnable tasks start executing, which may overload the participants.
# Applications need a mechanism to prioritize important jobs (or workflows). Otherwise, important tasks may be blocked by other, less important ones, and allocated resources are wasted.

h3. Features proposed

Based on our discussion, we propose 2 features that can help resolve the issue:
# Running task throttling on each participant, to avoid overload.
# Job priority control that ensures high priority jobs are scheduled earlier.

In addition, applications can leverage workflow/job monitoring items as feedback from Helix to adjust their strategy.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
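The priority rule discussed for HELIX-655 (jobs with an explicit priority outrank jobs without one; among prioritized jobs, higher priority first; otherwise older jobs first) could be expressed as a comparator. This is an illustrative sketch with a hypothetical Job holder class, not Helix code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the proposed job priority ordering. The Job class and field
// names are hypothetical stand-ins for the job configuration described
// in the design discussion.
public class JobPrioritySketch {

  static class Job {
    final String name;
    final Integer priority;   // null = no explicit priority configured
    final long scheduledTime; // "age"-based fallback (earlier = older)

    Job(String name, Integer priority, long scheduledTime) {
      this.name = name;
      this.priority = priority;
      this.scheduledTime = scheduledTime;
    }
  }

  static final Comparator<Job> JOB_ORDER = (a, b) -> {
    if (a.priority != null && b.priority != null) {
      return Integer.compare(b.priority, a.priority); // higher priority first
    }
    if (a.priority != null) return -1; // prioritized jobs outrank the rest
    if (b.priority != null) return 1;
    return Long.compare(a.scheduledTime, b.scheduledTime); // older jobs first
  };

  public static void main(String[] args) {
    List<Job> jobs = new ArrayList<>();
    jobs.add(new Job("old", null, 100L));
    jobs.add(new Job("urgent", 10, 300L));
    jobs.add(new Job("new", null, 200L));
    jobs.sort(JOB_ORDER);
    // "urgent" is scheduled first; the older unprioritized job follows.
    System.out.println(jobs.get(0).name); // prints "urgent"
  }
}
```

Note that, as the discussion points out, such an ordering lets an application starve low priority jobs by continuously submitting high priority ones; the design deliberately leaves that trade-off to the application.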