This is an automated email from the ASF dual-hosted git repository. fjy pushed a commit to branch 0.18.0 in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.0 by this push: new 325f299 Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610) (#9641) 325f299 is described below commit 325f29977740b1ff446f01b98179e377760e47f4 Author: Maytas Monsereenusorn <52679095+mayt...@users.noreply.github.com> AuthorDate: Tue Apr 7 16:58:27 2020 -1000 Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610) (#9641) * Fix NPE in RemoteTaskRunner event handler causes JVM shutdown * address comments * fix compile * fix checkstyle * fix lgtm * fix merge * fix test * fix tests * change scope * address comments * address comments --- extensions-contrib/ambari-metrics-emitter/pom.xml | 12 ++ .../druid/indexing/overlord/RemoteTaskRunner.java | 232 ++++++++++++--------- .../indexing/overlord/RemoteTaskRunnerTest.java | 36 ++++ licenses.yaml | 2 +- pom.xml | 2 +- .../druid/curator/discovery/DiscoveryModule.java | 12 ++ 6 files changed, 191 insertions(+), 105 deletions(-) diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml index 7cd69ea..fe94d09 100644 --- a/extensions-contrib/ambari-metrics-emitter/pom.xml +++ b/extensions-contrib/ambari-metrics-emitter/pom.xml @@ -131,6 +131,18 @@ <artifactId>JUnitParams</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>${codehaus.jackson.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>${codehaus.jackson.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index fe6e8be..dbaadf9 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -45,6 +45,7 @@ import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.lang.mutable.MutableInt; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.curator.CuratorUtils; @@ -969,116 +970,141 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer ); // Add status listener to the watcher for status changes - zkWorker.addListener( - (client, event) -> { - final String taskId; - final RemoteTaskRunnerWorkItem taskRunnerWorkItem; - synchronized (statusLock) { - try { - switch (event.getType()) { - case CHILD_ADDED: - case CHILD_UPDATED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - final TaskAnnouncement announcement = jsonMapper.readValue( - event.getData().getData(), TaskAnnouncement.class - ); - - log.info( - "Worker[%s] wrote %s status for task [%s] on [%s]", - zkWorker.getWorker().getHost(), - announcement.getTaskStatus().getStatusCode(), - taskId, - announcement.getTaskLocation() - ); - - // Synchronizing state with ZK - statusLock.notifyAll(); - - final RemoteTaskRunnerWorkItem tmp; - if ((tmp = runningTasks.get(taskId)) != null) { - taskRunnerWorkItem = tmp; - } else { - final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( - taskId, - announcement.getTaskType(), - zkWorker.getWorker(), - TaskLocation.unknown(), - announcement.getTaskDataSource() - ); - final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( - taskId, - newTaskRunnerWorkItem - ); - if (existingItem == null) { - log.warn( - "Worker[%s] 
announced a status for a task I didn't know about, adding to runningTasks: %s", - zkWorker.getWorker().getHost(), - taskId - ); - taskRunnerWorkItem = newTaskRunnerWorkItem; - } else { - taskRunnerWorkItem = existingItem; - } - } - - if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) { - taskRunnerWorkItem.setLocation(announcement.getTaskLocation()); - TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); - } + zkWorker.addListener(getStatusListener(worker, zkWorker, retVal)); + zkWorker.start(); + return retVal; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } - if (announcement.getTaskStatus().isComplete()) { - taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus()); - runPendingTasks(); - } - break; - case CHILD_REMOVED: - taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - taskRunnerWorkItem = runningTasks.remove(taskId); - if (taskRunnerWorkItem != null) { - log.info("Task[%s] just disappeared!", taskId); - taskRunnerWorkItem.setResult(TaskStatus.failure(taskId)); - TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId)); - } else { - log.info("Task[%s] went bye bye.", taskId); - } - break; - case INITIALIZED: - if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) { - retVal.set(zkWorker); - } else { - final String message = StringUtils.format( - "WTF?! 
Tried to add already-existing worker[%s]", - worker.getHost() - ); - log.makeAlert(message) - .addData("workerHost", worker.getHost()) - .addData("workerIp", worker.getIp()) - .emit(); - retVal.setException(new IllegalStateException(message)); - } - runPendingTasks(); - break; - case CONNECTION_SUSPENDED: - case CONNECTION_RECONNECTED: - case CONNECTION_LOST: - // do nothing + @VisibleForTesting + PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal) + { + return (client, event) -> { + final String taskId; + final RemoteTaskRunnerWorkItem taskRunnerWorkItem; + synchronized (statusLock) { + try { + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + if (event.getData() == null) { + log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString()); + log.makeAlert("Unexpected null for event.getData() in handle new worker status") + .addData("worker", zkWorker.getWorker().getHost()) + .addData("eventType", event.getType().toString()) + .emit(); + return; + } + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + final TaskAnnouncement announcement = jsonMapper.readValue( + event.getData().getData(), TaskAnnouncement.class + ); + + log.info( + "Worker[%s] wrote %s status for task [%s] on [%s]", + zkWorker.getWorker().getHost(), + announcement.getTaskStatus().getStatusCode(), + taskId, + announcement.getTaskLocation() + ); + + // Synchronizing state with ZK + statusLock.notifyAll(); + + final RemoteTaskRunnerWorkItem tmp; + if ((tmp = runningTasks.get(taskId)) != null) { + taskRunnerWorkItem = tmp; + } else { + final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem( + taskId, + announcement.getTaskType(), + zkWorker.getWorker(), + TaskLocation.unknown(), + announcement.getTaskDataSource() + ); + final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent( + taskId, + 
newTaskRunnerWorkItem + ); + if (existingItem == null) { + log.warn( + "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", + zkWorker.getWorker().getHost(), + taskId + ); + taskRunnerWorkItem = newTaskRunnerWorkItem; + } else { + taskRunnerWorkItem = existingItem; } } - catch (Exception e) { - log.makeAlert(e, "Failed to handle new worker status") + + if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) { + taskRunnerWorkItem.setLocation(announcement.getTaskLocation()); + TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); + } + + if (announcement.getTaskStatus().isComplete()) { + taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus()); + runPendingTasks(); + } + break; + case CHILD_REMOVED: + if (event.getData() == null) { + log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString()); + log.makeAlert("Unexpected null for event.getData() in handle new worker status") .addData("worker", zkWorker.getWorker().getHost()) - .addData("znode", event.getData().getPath()) + .addData("eventType", event.getType().toString()) .emit(); + return; } - } + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + taskRunnerWorkItem = runningTasks.remove(taskId); + if (taskRunnerWorkItem != null) { + log.info("Task[%s] just disappeared!", taskId); + taskRunnerWorkItem.setResult(TaskStatus.failure(taskId)); + TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId)); + } else { + log.info("Task[%s] went bye bye.", taskId); + } + break; + case INITIALIZED: + if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) { + retVal.set(zkWorker); + } else { + final String message = StringUtils.format( + "This should not happen...tried to add already-existing worker[%s]", + worker.getHost() + ); + log.makeAlert(message) + .addData("workerHost", worker.getHost()) + .addData("workerIp", 
worker.getIp()) + .emit(); + retVal.setException(new IllegalStateException(message)); + } + runPendingTasks(); + break; + case CONNECTION_SUSPENDED: + case CONNECTION_RECONNECTED: + case CONNECTION_LOST: + // do nothing } - ); - zkWorker.start(); - return retVal; - } - catch (Exception e) { - throw new RuntimeException(e); - } + } + catch (Exception e) { + String znode = null; + if (event.getData() != null) { + znode = event.getData().getPath(); + } + log.makeAlert(e, "Failed to handle new worker status") + .addData("worker", zkWorker.getWorker().getHost()) + .addData("znode", znode) + .addData("eventType", event.getType().toString()) + .emit(); + } + } + }; } /** diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 0601648..b23e063 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IndexingServiceCondition; @@ -44,6 +45,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.testing.DeadlockDetectingTimeout; +import org.easymock.Capture; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -55,6 +57,7 @@ import org.junit.rules.TestRule; import java.util.ArrayList; import 
java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -944,4 +947,37 @@ public class RemoteTaskRunnerTest Assert.assertTrue(taskFuture2.get().isSuccess()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); } + + @Test + public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception + { + // Set up mock emitter to verify log alert when exception is thrown inside the status listener + Worker worker = EasyMock.createMock(Worker.class); + EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce(); + EasyMock.replay(worker); + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance(); + emitter.emit(EasyMock.capture(capturedArgument)); + EasyMock.expectLastCall().atLeastOnce(); + EmittingLogger.registerEmitter(emitter); + EasyMock.replay(emitter); + + PathChildrenCache cache = new PathChildrenCache(cf, "/test", true); + testStartWithNoWorker(); + cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null)); + cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + + // Status listener will receive event with null data + Assert.assertTrue( + TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1) + ); + + // Verify that the log emitter was called + EasyMock.verify(worker); + EasyMock.verify(emitter); + Map<String, Object> alertDataMap = capturedArgument.getValue().build(null).getDataMap(); + Assert.assertTrue(alertDataMap.containsKey("znode")); + Assert.assertNull(alertDataMap.get("znode")); + // Status listener should successfully complete without throwing exception + } } diff --git a/licenses.yaml b/licenses.yaml index bcdaa8c..3442697 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -1315,7 +1315,7 @@ name: Apache Curator license_category:
binary module: java-core license_name: Apache License version 2.0 -version: 4.1.0 +version: 4.3.0 libraries: - org.apache.curator: curator-client - org.apache.curator: curator-framework diff --git a/pom.xml b/pom.xml index 6d28e2f..391ac8e 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ <java.version>8</java.version> <project.build.resourceEncoding>UTF-8</project.build.resourceEncoding> <aether.version>0.9.0.M2</aether.version> - <apache.curator.version>4.1.0</apache.curator.version> + <apache.curator.version>4.3.0</apache.curator.version> <apache.curator.test.version>2.12.0</apache.curator.test.version> <apache.kafka.version>2.2.2</apache.kafka.version> <avatica.version>1.15.0</avatica.version> diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java index fd89964..78c5669 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java @@ -482,6 +482,18 @@ public class DiscoveryModule implements Module { return this; } + + @Override + public ServiceProviderBuilder<T> executorService(ExecutorService executorService) + { + return this; + } + + @Override + public ServiceProviderBuilder<T> executorService(CloseableExecutorService closeableExecutorService) + { + return this; + } } private static class NoopServiceProvider<T> implements ServiceProvider<T> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org