[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r453078872 ## File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java ## @@ -124,6 +126,7 @@ protected int idToTaskBase; protected String hostname; private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1); +private AtomicBoolean needToRefreshCreds = new AtomicBoolean(true); Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r452306573 ## File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java ## @@ -124,6 +126,7 @@ protected int idToTaskBase; protected String hostname; private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1); +private boolean needToRefreshCreds = true; Review comment: Since multiple threads are changing this flag- switching to _AtomicBoolean_. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r448387076 ## File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java ## @@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception { @Override public void accept(Object event) { +if (this.needToRefreshCreds) { Review comment: moved this to separate method and being called fro withing `BoltExecutor` and `SpoutExecutor` call at the beginning of each iteration in the asyncloop. This would ensure that credentials are updated on executor irrespective of back pressure or no tuples to process scenarios. For backward compatibility and agreement about invoking _setCreentials_ on user implementation on _ICredentialsListener_ from executor thread, if Executor thread is stuck in _execute_ or _nextTuple_ methods delays can not be avoided. But otherwise we should be able to get credentials update on next iteration. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r448384044 ## File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java ## @@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception { @Override public void accept(Object event) { +if (this.needToRefreshCreds) { +this.needToRefreshCreds = false; Review comment: The _needToRefreshCreds_ could be updated to _true_ anytime after if condition is evaluated by the worker credentials refresh thread. If there are consecutive updates, it will either execute the _setCredentials_ twice or will execute only once with latest credentials - no harm/no deadlock. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r448381948 ## File path: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ## @@ -415,13 +414,13 @@ public void updateBlobUpdates() throws IOException { public void checkCredentialsChanged() { Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null); -if (!ObjectUtils.equals(newCreds, credentialsAtom.get())) { +if (!ObjectUtils.equals(newCreds, this.workerState.credentialsAtom.get())) { // This does not have to be atomic, worst case we update when one is not needed ClientAuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds()); for (IRunningExecutor executor : executorsAtom.get()) { executor.credentialsChanged(newCreds); } -credentialsAtom.set(newCreds); +this.workerState.credentialsAtom.set(newCreds); Review comment: Moved this up before for loop. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r448381511 ## File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java ## @@ -124,6 +126,7 @@ protected int idToTaskBase; protected String hostname; private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1); +protected boolean needToRefreshCreds = true; Review comment: Addressed ## File path: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ## @@ -152,6 +153,7 @@ private final AtomicLong nextLoadUpdate = new AtomicLong(0); private final boolean trySerializeLocal; private final Collection autoCredentials; +AtomicReference credentialsAtom; Review comment: Addressed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r446401963 ## File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java ## @@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception { @Override public void accept(Object event) { +if (this.needToRefreshCreds) { +LOG.info("The credentials are being updated {}.", executorId); +Credentials creds = this.workerData.getCredentials(); +idToTask.stream().map(Task::getTaskObject).filter(taskObject -> taskObject instanceof ICredentialsListener) +.forEach(taskObject -> { +((ICredentialsListener) taskObject).setCredentials(creds == null ? null : creds.get_creds()); +}); +this.needToRefreshCreds = false; Review comment: Addressed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials
kishorvpatil commented on a change in pull request #3295: URL: https://github.com/apache/storm/pull/3295#discussion_r446379664 ## File path: storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java ## @@ -60,16 +60,7 @@ public ExecutorStats renderStats() { @Override public void credentialsChanged(Credentials credentials) { -TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials), -Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, - Constants.CREDENTIALS_CHANGED_STREAM_ID); -AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); -try { -executor.getReceiveQueue().publish(addressedTuple); -executor.getReceiveQueue().flush(); -} catch (InterruptedException e) { -throw new RuntimeException(e); -} +executor.needToRefreshCreds = true; Review comment: Since the _SpoutExecutor_ and _BoltExecutor_ during init uses the initialCredentials, the _accept_ method does not need to worry about it. ## File path: storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java ## @@ -60,16 +60,7 @@ public ExecutorStats renderStats() { @Override public void credentialsChanged(Credentials credentials) { -TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials), -Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, - Constants.CREDENTIALS_CHANGED_STREAM_ID); -AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); -try { -executor.getReceiveQueue().publish(addressedTuple); -executor.getReceiveQueue().flush(); -} catch (InterruptedException e) { -throw new RuntimeException(e); -} +executor.needToRefreshCreds = true; Review comment: e.g. https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L144-L146 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org