[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-07-10 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r453078872



##
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##
@@ -124,6 +126,7 @@
 protected int idToTaskBase;
 protected String hostname;
 private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+private AtomicBoolean needToRefreshCreds = new AtomicBoolean(true);

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-07-09 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r452306573



##
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##
@@ -124,6 +126,7 @@
 protected int idToTaskBase;
 protected String hostname;
 private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+private boolean needToRefreshCreds = true;

Review comment:
   Since multiple threads change this flag, switching to _AtomicBoolean_.









[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-07-01 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448387076



##
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception {
 
 @Override
 public void accept(Object event) {
+if (this.needToRefreshCreds) {

Review comment:
   Moved this to a separate method that `BoltExecutor` and `SpoutExecutor` 
call at the beginning of each iteration of the async loop. This ensures that 
credentials are updated on the executor even under back pressure or when there 
are no tuples to process. For backward compatibility, and per the agreement 
that _setCredentials_ on the user's _ICredentialsListener_ implementation is 
invoked from the executor thread, delays cannot be avoided if the executor 
thread is stuck in _execute_ or _nextTuple_. Otherwise, the credentials update 
should be picked up on the next iteration.
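
   A minimal sketch of the per-iteration check described above. It is not the 
code from this pull request: `LoopIterationSketch`, `runOneIteration`, `offer` 
and the counters are hypothetical names, the queue is a plain single-threaded 
stand-in for the receive queue, and the flag is an _AtomicBoolean_ as adopted 
later in this review.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a bolt/spout executor loop; not Storm's classes. */
final class LoopIterationSketch {
    // Set by the credentials refresh thread, consumed by the executor thread.
    private final AtomicBoolean needToRefreshCreds = new AtomicBoolean(false);
    // Simplified receive queue; only the executor thread touches it here.
    private final Queue<String> receiveQueue = new ArrayDeque<>();

    int refreshCount = 0;    // how often the credentials refresh step ran
    int processedCount = 0;  // how many tuples were processed

    /** Called when the worker detects new credentials. */
    void credentialsChanged() {
        needToRefreshCreds.set(true);
    }

    /** Enqueue a tuple for processing (illustration only). */
    void offer(String tuple) {
        receiveQueue.add(tuple);
    }

    /** One pass of the loop body. */
    void runOneIteration() {
        // The check comes first, so it runs even when the queue is empty.
        if (needToRefreshCreds.getAndSet(false)) {
            refreshCount++; // a real executor would notify its listeners here
        }
        String tuple = receiveQueue.poll();
        if (tuple != null) {
            processedCount++;
        }
    }
}
```

   Because the check precedes the queue poll, an iteration with no tuples 
still applies a pending refresh. A long-running `execute` or `nextTuple` call 
would still delay the next iteration, as noted above.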









[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-07-01 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448384044



##
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception {
 
 @Override
 public void accept(Object event) {
+if (this.needToRefreshCreds) {
+this.needToRefreshCreds = false;

Review comment:
   The worker's credentials refresh thread could set _needToRefreshCreds_ to 
_true_ at any time after the if condition is evaluated. If there are 
consecutive updates, the executor will either run _setCredentials_ twice or 
run it only once with the latest credentials: no harm, no deadlock.
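
   The reasoning above can be sketched as follows. This is an illustration 
under assumptions, not the code in the pull request: `RefreshFlagSketch`, 
`refreshIfNeeded`, `applied` and `lastApplied` are hypothetical names, and the 
credentials are modelled as a plain `Map<String, String>` held in an 
_AtomicReference_.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for an executor; not Storm's actual Executor class. */
final class RefreshFlagSketch {
    // Set by the credentials refresh thread, cleared by the executor thread.
    private final AtomicBoolean needToRefreshCreds = new AtomicBoolean(true);
    // Latest credentials published by the worker (stand-in for credentialsAtom).
    private final AtomicReference<Map<String, String>> latestCreds;
    // Counts how many times listeners would have been notified.
    final AtomicInteger applied = new AtomicInteger();
    // The credentials most recently handed to listeners.
    volatile Map<String, String> lastApplied;

    RefreshFlagSketch(AtomicReference<Map<String, String>> latestCreds) {
        this.latestCreds = latestCreds;
    }

    /** Called from the refresh thread after it has stored new credentials. */
    void credentialsChanged() {
        needToRefreshCreds.set(true);
    }

    /** Called from the executor thread; returns true if a refresh ran. */
    boolean refreshIfNeeded() {
        // Clear the flag before reading the credentials. If the refresh thread
        // sets it again afterwards, the next call simply refreshes once more.
        if (!needToRefreshCreds.compareAndSet(true, false)) {
            return false;
        }
        lastApplied = latestCreds.get(); // always the latest published value
        applied.incrementAndGet();
        return true;
    }
}
```

   Two back-to-back updates before the executor looks at the flag collapse 
into one refresh that sees the latest credentials. An update that arrives 
after the flag is cleared causes a second refresh, which is redundant at worst.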









[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-07-01 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448381948



##
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
##
@@ -415,13 +414,13 @@ public void updateBlobUpdates() throws IOException {
 
 public void checkCredentialsChanged() {
 Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
-if (!ObjectUtils.equals(newCreds, credentialsAtom.get())) {
+if (!ObjectUtils.equals(newCreds, this.workerState.credentialsAtom.get())) {
 // This does not have to be atomic, worst case we update when one is not needed
 ClientAuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
 for (IRunningExecutor executor : executorsAtom.get()) {
 executor.credentialsChanged(newCreds);
 }
-credentialsAtom.set(newCreds);
+this.workerState.credentialsAtom.set(newCreds);

Review comment:
   Moved this up, before the for loop.
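
   A sketch of the reordering, assuming its purpose is that an executor 
reacting to `credentialsChanged` reads the new value rather than the old one. 
`WorkerOrderingSketch` and `Listener` are hypothetical names, and credentials 
are reduced to a `String` for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the worker-side check; not Storm's Worker class. */
final class WorkerOrderingSketch {
    /** Simplified listener: it is only told that something changed. */
    interface Listener {
        void credentialsChanged();
    }

    // Shared holder that executors read from when they refresh.
    final AtomicReference<String> credentialsAtom = new AtomicReference<>();
    final List<Listener> executors = new ArrayList<>();

    void checkCredentialsChanged(String newCreds) {
        if (!Objects.equals(newCreds, credentialsAtom.get())) {
            // Publish first, so any executor that reacts to the notification
            // below already sees the new credentials.
            credentialsAtom.set(newCreds);
            for (Listener executor : executors) {
                executor.credentialsChanged();
            }
        }
    }
}
```

   With the store placed after the loop, an executor that refreshed right 
after being notified could still read the previous credentials.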









[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-07-01 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r448381511



##
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##
@@ -124,6 +126,7 @@
 protected int idToTaskBase;
 protected String hostname;
 private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+protected boolean needToRefreshCreds = true;

Review comment:
   Addressed

##
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
##
@@ -152,6 +153,7 @@
 private final AtomicLong nextLoadUpdate = new AtomicLong(0);
 private final boolean trySerializeLocal;
 private final Collection autoCredentials;
+AtomicReference credentialsAtom;

Review comment:
   Addressed









[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-06-26 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r446401963



##
File path: storm-client/src/jvm/org/apache/storm/executor/Executor.java
##
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception {
 
 @Override
 public void accept(Object event) {
+if (this.needToRefreshCreds) {
+LOG.info("The credentials are being updated {}.", executorId);
+Credentials creds = this.workerData.getCredentials();
+idToTask.stream().map(Task::getTaskObject).filter(taskObject -> taskObject instanceof ICredentialsListener)
+.forEach(taskObject -> {
+((ICredentialsListener) taskObject).setCredentials(creds == null ? null : creds.get_creds());
+});
+this.needToRefreshCreds = false;

Review comment:
   Addressed.









[GitHub] [storm] kishorvpatil commented on a change in pull request #3295: STORM-3660: Remove use of queues for updating credentials

2020-06-26 Thread GitBox


kishorvpatil commented on a change in pull request #3295:
URL: https://github.com/apache/storm/pull/3295#discussion_r446379664



##
File path: storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
##
@@ -60,16 +60,7 @@ public ExecutorStats renderStats() {
 
 @Override
 public void credentialsChanged(Credentials credentials) {
-TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
-Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
-Constants.CREDENTIALS_CHANGED_STREAM_ID);
-AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
-try {
-executor.getReceiveQueue().publish(addressedTuple);
-executor.getReceiveQueue().flush();
-} catch (InterruptedException e) {
-throw new RuntimeException(e);
-}
+executor.needToRefreshCreds = true;

Review comment:
   Since _SpoutExecutor_ and _BoltExecutor_ use the initial credentials 
during init, the _accept_ method does not need to worry about it.

##
File path: storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
##
@@ -60,16 +60,7 @@ public ExecutorStats renderStats() {
 
 @Override
 public void credentialsChanged(Credentials credentials) {
-TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
-Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
-Constants.CREDENTIALS_CHANGED_STREAM_ID);
-AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
-try {
-executor.getReceiveQueue().publish(addressedTuple);
-executor.getReceiveQueue().flush();
-} catch (InterruptedException e) {
-throw new RuntimeException(e);
-}
+executor.needToRefreshCreds = true;

Review comment:
   e.g. 
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L144-L146
   




