github-advanced-security[bot] commented on code in PR #49:
URL: https://github.com/apache/iotdb-extras/pull/49#discussion_r1981039321
##
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java:
##
@@ -33,106 +31,119 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class TaskRuntime {
+public class TaskRuntime implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(TaskRuntime.class);
- private static final Map TASK_REPOSITORY_MAP = new
ConcurrentHashMap<>();
+ private final Map tasks = new ConcurrentHashMap<>();
- public Response createTask(
+ public synchronized Response createTask(
final String taskId,
final Map sourceAttribute,
final Map processorAttribute,
final Map sinkAttribute) {
try {
- if (validateTaskIsExist(taskId)) {
+ if (tasks.containsKey(taskId)) {
return Response.status(Response.Status.CONFLICT)
.entity(String.format("task %s has existed", taskId))
.build();
}
- final SinkTask sinkTask = new SinkTask(sinkAttribute);
- final ProcessorTask processorTask = new
ProcessorTask(processorAttribute, sinkTask);
- final SourceTask sourceTask = new PushSourceTask(taskId,
sourceAttribute, processorTask);
- final TaskRepository taskRepository = new TaskRepository(sourceTask,
processorTask, sinkTask);
+ final SinkTask sinkTask = new SinkTask(taskId, sinkAttribute);
+ final ProcessorTask processorTask =
+ new ProcessorTask(taskId, processorAttribute,
sinkTask.makeProducer());
+ final SourceTask sourceTask =
+ SourceTask.construct(taskId, sourceAttribute,
processorTask.makeProducer());
- TASK_REPOSITORY_MAP.put(taskId, taskRepository);
- taskRepository.create();
+ final TaskCombiner taskCombiner = new TaskCombiner(sourceTask,
processorTask, sinkTask);
+ tasks.put(taskId, taskCombiner);
+ taskCombiner.create();
LOGGER.info("Successfully created task {}", taskId);
return Response.status(Response.Status.CREATED)
.entity(String.format("Successfully created task %s", taskId))
.build();
} catch (final Exception e) {
- LOGGER.warn("Failed to create task", e);
+ LOGGER.warn("Failed to create task {} because {}", taskId,
e.getMessage(), e);
return Response.serverError()
.entity(String.format("Failed to create task %s, because %s",
taskId, e.getMessage()))
.build();
}
}
- public boolean alterTask() {
-return true;
- }
-
- public Response startTask(final String taskId) {
-if (!validateTaskIsExist(taskId)) {
+ public synchronized Response startTask(final String taskId) {
+if (!tasks.containsKey(taskId)) {
return Response.status(Response.Status.NOT_FOUND)
.entity(String.format("task %s not found", taskId))
.build();
}
-TASK_REPOSITORY_MAP.get(taskId).start();
-LOGGER.info("Task {} started successfully", taskId);
-return Response.status(Response.Status.OK)
-.entity(String.format("task %s start successfully", taskId))
-.build();
+try {
+ tasks.get(taskId).start();
+
+ LOGGER.info("Task {} start successfully", taskId);
+ return Response.status(Response.Status.OK)
+ .entity(String.format("task %s start successfully", taskId))
+ .build();
+} catch (Exception e) {
+ LOGGER.warn("Failed to start task {} because {}", taskId,
e.getMessage(), e);
+ return Response.serverError()
+ .entity(String.format("Failed to start task %s, because %s", taskId,
e.getMessage()))
+ .build();
+}
}
- public Response stopTask(final String taskId) {
-if (!validateTaskIsExist(taskId)) {
+ public synchronized Response stopTask(final String taskId) {
+if (!tasks.containsKey(taskId)) {
return Response.status(Response.Status.NOT_FOUND)
.entity(String.format("task %s not found", taskId))
.build();
}
try {
- final TaskRepository taskRepository = TASK_REPOSITORY_MAP.get(taskId);
- if (taskRepository != null) {
-taskRepository.stop();
- }
+ tasks.get(taskId).stop();
+
+ LOGGER.info("Task {} stop successfully", taskId);
+ return Response.status(Response.Status.OK)
+ .entity(String.format("task %s stop successfully", taskId))
+ .build();
} catch (final Exception e) {
- LOGGER.warn("Failed to stop task", e);
+ LOGGER.warn("Failed to stop task {} because {}", taskId, e.getMessage(),
e);
Review Comment:
## Log Injection
This log entry depends on a [user-provided value](1).
[Show more details](https://github.com/apache/iotdb-extras/security/code-scanning/24)
##
iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskR