Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-30 Thread Madhan Neethiraj

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184234
---


Ship it!




Ship It!

- Madhan Neethiraj


On Aug. 31, 2017, 4:23 a.m., Ashutosh Mestry wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> ---
> 
> (Updated Aug. 31, 2017, 4:23 a.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
> https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> ---
> 
> Please refer to 
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background 
> and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method 
> is called at the beginning of almost every method in this class. It checks 
> whether the consumer is closed and, if so, throws _IllegalStateException_.
> 
> Scenario may come about in this way:
> - Shutdown has been initiated. Close on consumer is called.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus acquire sees that consumer is closed and throws the exception (2nd 
> bullet above).
> 
> Please take a look at this scala code. This is _ShutdownableThread_. The 
> thread does the job of handling all exceptions. Upon exception, it manages 
> the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the 
> _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & 
> retry logic:
> - Modified _LOG_ to _debug_. That way logs are not filled during retry.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from 
> _Kafka_ and entity APIs.
> 
> 
> Diffs
> -
> 
>   
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
>   webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/5/
> 
> 
> Testing
> ---
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>



Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-30 Thread Ashutosh Mestry

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
---

(Updated Aug. 31, 2017, 4:23 a.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
---

Updates include:
- Addressed review comments.
- Updated unit test.


Bugs: ATLAS-2047
https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
---

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) 
for background and analysis.

**Background**

The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method is 
called at the beginning of almost every method in this class. It checks whether 
the consumer is closed and, if so, throws _IllegalStateException_.

Scenario may come about in this way:
- Shutdown has been initiated. Close on consumer is called.
- However, the consumer thread is just about to enter another poll cycle.
- Thus acquire sees that consumer is closed and throws the exception (2nd 
bullet above).
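
The race in the bullets above can be illustrated with a small stand-in class. This is only a sketch under stated assumptions: _ClosableConsumerSketch_ and its members are hypothetical names chosen for illustration, and the class is not the Kafka implementation. It only mimics the guard-then-work pattern described above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified stand-in for a consumer that rejects calls after close(); not the Kafka class. */
class ClosableConsumerSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Mirrors the guard described above: every public method calls this first.
    private void acquire() {
        if (closed.get()) {
            throw new IllegalStateException("consumer is closed");
        }
    }

    String poll() {
        acquire();
        return "record"; // placeholder payload, hypothetical
    }

    void close() {
        closed.set(true);
    }
}
```

If shutdown calls _close()_ while the consumer thread is about to call _poll()_ again, that next _poll()_ throws _IllegalStateException_.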

Please take a look at this scala code. This is _ShutdownableThread_. The thread 
does the job of handling all exceptions. Upon exception, it manages the 
_shutdownLatch_ (from yesterday’s bug fix) and gets out of the _isRunning_ loop.
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

**Implementation**

Special treatment is given to _IllegalStateException_ by implementing pause & 
retry logic:
- Modified _LOG_ to _debug_. That way logs are not filled during retry.
- _HookConsumer_ is more resilient. It handles exceptions resulting from 
_Kafka_ and entity APIs.
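
For illustration, the pause & retry idea can be sketched roughly as below. This is a minimal sketch and not the code in the patch: _PauseAndRetrySketch_, _runWithRetry_, the attempt limit, and the fixed pause are all assumptions made for the example, and _java.util.logging_ at _FINE_ level stands in for the debug-level logging mentioned above.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: retries a unit of work when it throws IllegalStateException. */
class PauseAndRetrySketch {
    private static final Logger LOG = Logger.getLogger(PauseAndRetrySketch.class.getName());

    /**
     * Runs the work until it succeeds or maxAttempts is reached.
     * Returns the number of attempts used, or -1 if every attempt failed.
     */
    static int runWithRetry(Runnable work, int maxAttempts, long pauseMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                work.run();
                return attempt;
            } catch (IllegalStateException e) {
                // Logged at a debug-like level so repeated retries do not fill the log.
                LOG.log(Level.FINE, "work failed, pausing before retry", e);
                Thread.sleep(pauseMillis);
            }
        }
        return -1;
    }
}
```

Other exception types are deliberately left uncaught in this sketch, so they still propagate to whatever owns the thread.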


Diffs (updated)
-

  
  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
  webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8


Diff: https://reviews.apache.org/r/61665/diff/5/

Changes: https://reviews.apache.org/r/61665/diff/4-5/


Testing
---

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry



Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-30 Thread Madhan Neethiraj

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184102
---


Fix it, then Ship it!





webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 246 (patched)


Instead of computing timeSinceLastWait here and passing to 
setWaitDurations(), I would suggest the following:

public void pause(Exception ex) {
  setWaitDuration();

  try {
    ...
    Thread.sleep(waitDuration);
    ...
  } catch(...) {
    ...
  }
}

private void setWaitDuration() {
  long now               = System.currentTimeMillis();
  // lastWaitAt will be 0 the first time, which will result in
  // "waitDuration = minDuration" in the following if block
  long timeSinceLastWait = now - lastWaitAt;

  lastWaitAt = now;

  if (timeSinceLastWait > resetInterval) {
    waitDuration = minDuration;
  } else if (waitDuration != maxDuration) {
    waitDuration += increment;

    if (waitDuration > maxDuration) {
      waitDuration = maxDuration;
    }
  }
}
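
A self-contained version of the suggestion above, for illustration only. The class name _AdaptiveWaitSketch_, the injected clock, and the choice of _resetInterval = maxDuration * 2_ (borrowed from the Aug. 25 comment on this review) are assumptions, not the code in the patch.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-alone version of the wait-duration logic suggested above. */
class AdaptiveWaitSketch {
    private final long minDuration;
    private final long maxDuration;
    private final long increment;
    private final long resetInterval;
    private final LongSupplier clock; // injected so the logic can be tested without sleeping

    private long waitDuration;
    private long lastWaitAt; // 0 until the first call

    AdaptiveWaitSketch(long minDuration, long maxDuration, long increment, LongSupplier clock) {
        this.minDuration   = minDuration;
        this.maxDuration   = maxDuration;
        this.increment     = increment;
        this.resetInterval = maxDuration * 2; // assumption, see lead-in
        this.clock         = clock;
        this.waitDuration  = minDuration;
    }

    /** Computes the next wait: resets after a quiet period, otherwise grows up to the maximum. */
    long nextWaitDuration() {
        long now               = clock.getAsLong();
        long timeSinceLastWait = now - lastWaitAt;

        lastWaitAt = now;

        if (timeSinceLastWait > resetInterval) {
            waitDuration = minDuration;
        } else if (waitDuration != maxDuration) {
            waitDuration = Math.min(waitDuration + increment, maxDuration);
        }
        return waitDuration;
    }

    void pause() throws InterruptedException {
        Thread.sleep(nextWaitDuration());
    }
}
```

In production the clock would simply be _System::currentTimeMillis_; passing it in keeps the duration logic testable without real sleeps.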



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 281 (patched)


Use maxWaitDuration from line #127, instead of creating a new one here.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 283 (patched)


500 ==> minWaitDuration


- Madhan Neethiraj


On Aug. 29, 2017, 10:37 p.m., Ashutosh Mestry wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> ---
> 
> (Updated Aug. 29, 2017, 10:37 p.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
> https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> ---
> 
> Please refer to 
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background 
> and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method 
> is called at the beginning of almost every method in this class. It checks 
> whether the consumer is closed and, if so, throws _IllegalStateException_.
> 
> Scenario may come about in this way:
> - Shutdown has been initiated. Close on consumer is called.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus acquire sees that consumer is closed and throws the exception (2nd 
> bullet above).
> 
> Please take a look at this scala code. This is _ShutdownableThread_. The 
> thread does the job of handling all exceptions. Upon exception, it manages 
> the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the 
> _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & 
> retry logic:
> - Modified _LOG_ to _debug_. That way logs are not filled during retry.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from 
> _Kafka_ and entity APIs.
> 
> 
> Diffs
> -
> 
>   
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
>   webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/4/
> 
> 
> Testing
> ---
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>



Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-29 Thread Madhan Neethiraj

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184093
---




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 227 (patched)


Consider marking 'resetInterval' as final.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 257 (patched)


Consider renaming updateDurations() to setWaitDuration() and calling it from 
line #240.

void setWaitDuration() {
  long now               = System.currentTimeMillis();
  long timeSinceLastWait = now - lastWaitAt;

  if (timeSinceLastWait > resetInterval) {
    waitDuration = minDuration; // reset
  } else if (waitDuration != maxDuration) {
    waitDuration += increment;

    if (waitDuration > maxDuration) {
      waitDuration = maxDuration;
    }
  }

  lastWaitAt = now;
}



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 264 (patched)


Shouldn't waitDuration be set to 'maxDuration' here? Else, the next wait 
will start from minDuration - causing more frequent retries.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 275 (patched)


Consider reading minWaitDuration, maxWaitDuration from configuration, right 
next to where consumerRetryInterval is read currently (line #120).

consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration       = applicationProperties.getInt(MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration       = applicationProperties.getInt(MAX_RETRY_INTERVAL, minWaitDuration * 60);  // 30 sec by default
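
The chained defaults above can be tried out in isolation with a small sketch. Everything here is an assumption made for illustration: _java.util.Properties_ stands in for _applicationProperties_, the three key strings are made up (the real property names are not given in this review), and _RetryIntervalConfigSketch_ is a hypothetical class.

```java
import java.util.Properties;

/** Hypothetical holder showing how each default falls back to the previous setting. */
class RetryIntervalConfigSketch {
    // Made-up keys for the example only.
    static final String CONSUMER_RETRY_INTERVAL = "example.consumer.retry.interval";
    static final String MIN_RETRY_INTERVAL      = "example.consumer.min.retry.interval";
    static final String MAX_RETRY_INTERVAL      = "example.consumer.max.retry.interval";

    final int consumerRetryInterval;
    final int minWaitDuration;
    final int maxWaitDuration;

    RetryIntervalConfigSketch(Properties props) {
        consumerRetryInterval = getInt(props, CONSUMER_RETRY_INTERVAL, 500);
        minWaitDuration       = getInt(props, MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
        maxWaitDuration       = getInt(props, MAX_RETRY_INTERVAL, minWaitDuration * 60);  // 30 sec by default
    }

    // Returns the integer value for key, or defaultValue when the key is absent.
    static int getInt(Properties props, String key, int defaultValue) {
        String value = props.getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }
}
```

With no properties set this yields 500, 500 and 30000; overriding only the minimum also shifts the default maximum.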



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 279 (patched)


'pauseDuration' seems to be used only in tests; consider removing this 
member and updating the tests to use minWaitDuration.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Line 246 (original), 306 (patched)


Why not call 'adaptiveWaiter.pause()' here as well?


- Madhan Neethiraj


On Aug. 29, 2017, 9:01 p.m., Ashutosh Mestry wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> ---
> 
> (Updated Aug. 29, 2017, 9:01 p.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
> https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> ---
> 
> Please refer to 
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background 
> and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method 
> is called at the beginning of almost every method in this class. It checks 
> whether the consumer is closed and, if so, throws _IllegalStateException_.
> 
> Scenario may come about in this way:
> - Shutdown has been initiated. Close on consumer is called.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus acquire sees that consumer is closed and throws the exception (2nd 
> bullet above).
> 
> Please take a look at this scala code. This is _ShutdownableThread_. The 
> thread does the job of handling all exceptions. Upon exception, it manages 
> the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the 
> _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & 
> retry logic:
> - Modified _LOG_ to _debug_. That way logs are not filled during retry.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from 
> _Kafka_ and entity APIs.
> 
> 
> Diffs
> -
> 
>   
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
>   webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java

Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-29 Thread Ashutosh Mestry

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
---

(Updated Aug. 29, 2017, 9:01 p.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
---

Updates include:
- _AdaptiveWaiter_ implementation.
- Added unit tests for new implementation.


Bugs: ATLAS-2047
https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
---

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) 
for background and analysis.

**Background**

The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method is 
called at the beginning of almost every method in this class. It checks whether 
the consumer is closed and, if so, throws _IllegalStateException_.

Scenario may come about in this way:
- Shutdown has been initiated. Close on consumer is called.
- However, the consumer thread is just about to enter another poll cycle.
- Thus acquire sees that consumer is closed and throws the exception (2nd 
bullet above).

Please take a look at this scala code. This is _ShutdownableThread_. The thread 
does the job of handling all exceptions. Upon exception, it manages the 
_shutdownLatch_ (from yesterday’s bug fix) and gets out of the _isRunning_ loop.
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

**Implementation**

Special treatment is given to _IllegalStateException_ by implementing pause & 
retry logic:
- Modified _LOG_ to _debug_. That way logs are not filled during retry.
- _HookConsumer_ is more resilient. It handles exceptions resulting from 
_Kafka_ and entity APIs.


Diffs (updated)
-

  
  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
  webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8


Diff: https://reviews.apache.org/r/61665/diff/3/

Changes: https://reviews.apache.org/r/61665/diff/2-3/


Testing
---

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry



Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-25 Thread Madhan Neethiraj

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183832
---




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 274 (patched)


I think it will be useful to abstract this functionality in a class of its 
own.

class AdaptiveWaiter {
  public AdaptiveWaiter(long minWaitTime, long maxWaitTime, long waitIncrement) {
    resetInterval = maxWaitTime * 2;
  }

  public void wait(String message) {
    long now               = System.currentTimeMillis();
    long timeSinceLastWait = now - lastWaitAt;

    // it has been a long time since the last call
    if (timeSinceLastWait > resetInterval) {
      waitTime = minWaitTime;
    } else {
      waitTime = timeSinceLastWait + waitIncrement;

      if (waitTime > maxWaitTime) {
        waitTime = maxWaitTime;
      }
    }

    lastWaitAt = now;

    LOG.debug(...);

    sleep(waitTime);
  }
}


- Madhan Neethiraj


On Aug. 17, 2017, 5:04 a.m., Ashutosh Mestry wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> ---
> 
> (Updated Aug. 17, 2017, 5:04 a.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
> https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> ---
> 
> Please refer to 
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background 
> and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method 
> is called at the beginning of almost every method in this class. It checks 
> whether the consumer is closed and, if so, throws _IllegalStateException_.
> 
> Scenario may come about in this way:
> - Shutdown has been initiated. Close on consumer is called.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus acquire sees that consumer is closed and throws the exception (2nd 
> bullet above).
> 
> Please take a look at this scala code. This is _ShutdownableThread_. The 
> thread does the job of handling all exceptions. Upon exception, it manages 
> the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the 
> _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & 
> retry logic:
> - Modified _LOG_ to _debug_. That way logs are not filled during retry.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from 
> _Kafka_ and entity APIs.
> 
> 
> Diffs
> -
> 
>   
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/2/
> 
> 
> Testing
> ---
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>



Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-16 Thread Ashutosh Mestry

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
---

(Updated Aug. 16, 2017, 10:45 p.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
---

Includes:
- Retry with _slidingWindow_ logic.
- Additional unit tests to verify retry logic.


Bugs: ATLAS-2047
https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
---

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) 
for background and analysis.

**Implementation**

Please take a look at this scala code. This is _ShutdownableThread_. The thread 
does the job of handling all exceptions. Upon exception, it manages the 
_shutdownLatch_ (from yesterday’s bug fix) and gets out of the _isRunning_ loop.
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

Moved _try...catch_ block to leave exception handling to _ShutdownableThread_.


Diffs (updated)
-

  
  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8


Diff: https://reviews.apache.org/r/61665/diff/2/

Changes: https://reviews.apache.org/r/61665/diff/1-2/


Testing
---

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry



Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

2017-08-16 Thread Ashutosh Mestry


> On Aug. 16, 2017, 9:59 a.m., Nixon Rodrigues wrote:
> > Ship It!

Can you please review? I have updated the patch with retry logic.


- Ashutosh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183030
---


On Aug. 15, 2017, 4:58 p.m., Ashutosh Mestry wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> ---
> 
> (Updated Aug. 15, 2017, 4:58 p.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
> https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> ---
> 
> Please refer to 
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background 
> and analysis.
> 
> **Implementation**
> 
> Please take a look at this scala code. This is _ShutdownableThread_. The 
> thread does the job of handling all exceptions. Upon exception, it manages 
> the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the 
> _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> Moved _try...catch_ block to leave exception handling to _ShutdownableThread_.
> 
> 
> Diffs
> -
> 
>   
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/1/
> 
> 
> Testing
> ---
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>