This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 866289c [TE] distribute detection and notification tasks (#4217) 866289c is described below commit 866289cf2671316d73195434c5aabd8e315ba18f Author: Xiaohui Sun <xh...@linkedin.com> AuthorDate: Tue May 21 14:20:17 2019 -0700 [TE] distribute detection and notification tasks (#4217) * [TE] distribute detection and notification tasks * [TE] fix integration test by adding cron explicitly * [TE] revert removing notification flag * [TE] randomization when creating the task * [TE] randomization when creating the task * [TE] Distribute job load to add random delays * [TE] Fix minor bug when logging sleep time --- .../pinot/thirdeye/detection/DetectionPipelineJob.java | 17 ++++++++++++++--- .../thirdeye/detection/alert/DetectionAlertJob.java | 16 ++++++++++++++-- .../yaml/CompositePipelineConfigTranslator.java | 5 +++++ .../yaml/YamlDetectionAlertConfigTranslator.java | 3 ++- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java index 6349466..99f2708 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.pinot.thirdeye.anomaly.task.TaskConstants; @@ -82,9 +83,19 @@ public class DetectionPipelineJob implements Job { taskDTO.setStatus(TaskConstants.TaskStatus.WAITING); taskDTO.setTaskInfo(taskInfoJson); - long taskId = taskDAO.save(taskDTO); - LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId); - + // TODO: revisit it after identifying bottlenecks + // Here will write the task information to mysql. + // Sleep random 0 - 5 seconds to distribute load to mysql. + Random random = new Random(); + try { + int sleepTime = random.nextInt(5000); + LOG.info("Wait for " + sleepTime + " milliseconds."); + Thread.sleep(sleepTime); + long taskId = taskDAO.save(taskDTO); + LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId); + } catch (InterruptedException e) { + LOG.error(e.toString()); + } } private Long getIdFromJobKey(String jobKey) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java index bf89932..41ebcd5 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java @@ -22,6 +22,7 @@ package org.apache.pinot.thirdeye.detection.alert; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; +import java.util.Random; import org.apache.pinot.thirdeye.anomaly.task.TaskConstants; import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.TaskManager; @@ -91,8 +92,19 @@ public class DetectionAlertJob implements Job { taskDTO.setStatus(TaskConstants.TaskStatus.WAITING); taskDTO.setTaskInfo(taskInfoJson); - long taskId = taskDAO.save(taskDTO); - LOG.info("Created subscription task {} with settings {}", taskId, taskDTO); + // TODO: revisit it after identifying bottlenecks + // Here will write the task information to mysql. + // Sleep random 0 - 5 seconds to distribute load to mysql. + Random random = new Random(); + try { + int sleepTime = random.nextInt(5000); + LOG.info("Wait for " + sleepTime + " milliseconds."); + Thread.sleep(sleepTime); + long taskId = taskDAO.save(taskDTO); + LOG.info("Created subscription task {} with settings {}", taskId, taskDTO); + } catch (InterruptedException e) { + LOG.error(e.toString()); + } } private Long getIdFromJobKey(String jobKey) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java index 5c30ae2..dde36f4 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java @@ -401,6 +401,11 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl return properties; } + // Default schedule: + // minute granularity: every 15 minutes, starts at 0 minute + // hourly: every hour, starts at 0 minute + // daily: every day, starts at 2 pm UTC + // others: every day, start at 12 am UTC private String buildCron() { switch (this.datasetConfig.bucketTimeGranularity().getUnit()) { case MINUTES: diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java index 04501ab..6b90e1b 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java @@ -63,7 +63,8 @@ public class YamlDetectionAlertConfigTranslator { static final String PROP_ALERT_SUPPRESSORS = "alertSuppressors"; static final String PROP_REFERENCE_LINKS = "referenceLinks"; static final String PROP_TIME_WINDOWS = "timeWindows"; - static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *"; // Every 5 min + // Every 5 minutes. + static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *"; private static final String PROP_DIMENSION = "dimension"; private static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org