[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics

2021-01-09 Thread GitBox


ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r553893369



##
File path: 
server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
##
@@ -108,10 +108,10 @@ public MonitorScheduler getMonitorScheduler(
 
     return new MonitorScheduler(
         config.get(),
-        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(),
+        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
         emitter,
         monitors,
-        Executors.newCachedThreadPool()
+        Execs.multiThreaded(64, "MonitorThread-%d")

Review comment:
   This makes sense; we should reduce the number of monitor threads.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org



[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics

2021-01-06 Thread GitBox


ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r553162349



##
File path: 
server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
##
@@ -108,10 +108,10 @@ public MonitorScheduler getMonitorScheduler(
 
     return new MonitorScheduler(
         config.get(),
-        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(),
+        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
         emitter,
         monitors,
-        Executors.newCachedThreadPool()
+        Execs.multiThreaded(64, "MonitorThread-%d")

Review comment:
   Hi @jihoonson, sorry for the delay in responding.
   I think there are currently ~20 monitors, which can run concurrently under
   the MonitorScheduler class. Suppose the scheduling period is shorter than the
   time the executor thread takes to run `monitor.monitor(...)` (I am not sure
   whether this can happen in practice; it is an edge case). Tasks can then queue
   up if there are too few threads. I think we should have at least as many
   threads as the maximum number of monitors supported. I may be missing
   something here. What do you think?
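
   The queuing concern above can be illustrated with a small sketch. It is not
   Druid code: it assumes a plain JDK `ThreadPoolExecutor` as a stand-in for the
   pool created by `Execs.multiThreaded(...)`, and the class and method names
   (`MonitorQueueingSketch`, `queuedTasks`) are hypothetical. Each task blocks on
   a latch to simulate a `monitor.monitor(...)` call that outlasts the
   scheduling period.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: plain JDK classes, not Druid's Execs or MonitorScheduler.
class MonitorQueueingSketch
{
  /**
   * Submits `monitors` blocking tasks to a fixed pool of `threads` workers and
   * returns how many tasks sit in the queue while every worker is busy.
   */
  static int queuedTasks(int threads, int monitors)
  {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()
    );
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < monitors; i++) {
        // Simulates a monitor run that does not finish before the next one is due.
        pool.execute(() -> {
          try {
            release.await();
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      // Workers are still blocked on the latch, so the queue size is stable here.
      return pool.getQueue().size();
    }
    finally {
      release.countDown(); // let the blocked tasks finish
      pool.shutdown();
    }
  }

  public static void main(String[] args)
  {
    System.out.println("queued with 2 threads:  " + queuedTasks(2, 20));
    System.out.println("queued with 20 threads: " + queuedTasks(20, 20));
  }
}
```

   With fewer threads than monitors, the surplus tasks wait in the queue until a
   worker frees up; with one thread per monitor, nothing queues. Whether 64
   threads is more than needed depends on how many monitors are actually
   registered.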






[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics

2020-10-27 Thread GitBox


ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r512608527



##
File path: 
core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
##
@@ -167,6 +169,50 @@ public void run()
 );
   }
 
+  public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable)
+  {
+    scheduleAtFixedRate(exec, rate, rate, callable);
+  }
+
+  /**
+   * Run callable once every period, after the given initial delay. Uses
+   * {@link CronScheduler} for task scheduling. Exceptions are caught and logged
+   * as errors.
+   */
+  public static void scheduleAtFixedRate(
+      final CronScheduler exec,
+      final Duration initialDelay,
+      final Duration rate,
+      final Callable callable
+  )
+  {
+    log.debug("Scheduling periodically: %s with period %s", callable, rate);
+    Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+    exec.scheduleAt(delayInstance,

Review comment:
   Keeping the above points in mind, I think this implementation will do.
   
   synchronized (lock) {
     monitor.start();
     long rate = config.getEmitterPeriod().getMillis();
     Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
         rate,
         rate,
         TimeUnit.MILLISECONDS,
         new CronTask()
         {
           // Result of the most recent monitor run submitted to the executor.
           private volatile Future<Boolean> monitorFuture = null;

           @Override
           public void run(long scheduledRunTimeMillis)
           {
             try {
               // Stop rescheduling once the previous run has finished and either
               // returned false or the monitor is no longer registered.
               if (monitorFuture != null && monitorFuture.isDone()
                   && !(monitorFuture.get() && hasMonitor(monitor))) {
                 removeMonitor(monitor);
                 monitor.getScheduledFuture().cancel(false);
                 log.debug("Stopped rescheduling %s (delay %s)", this, rate);
                 return;
               }

               log.trace("Running %s (period %s)", this, rate);
               monitorFuture = executor.submit(new Callable<Boolean>()
               {
                 @Override
                 public Boolean call()
                 {
                   try {
                     return monitor.monitor(emitter);
                   }
                   catch (Throwable e) {
                     log.error(e, "Uncaught exception.");
                     return false;
                   }
                 }
               });
             }
             catch (Throwable e) {
               log.error(e, "Uncaught exception.");
             }
           }
         });
     monitor.setScheduledFuture(scheduledFuture);
   }
   
   
   In this:
   1.  Each monitor has a separate future.
   2.  The cron task is cheap: it only checks a boolean condition. If the
       condition is true, it cancels scheduling for that monitor; otherwise it
       submits a monitoring callable to the executor service.
   3.  There is no race condition when cancelling scheduledFuture on the first
       iteration.
   
   Please let me know your thoughts on this.
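
   The three points above can be exercised in isolation with a small sketch. It
   is not the PR's code: it assumes plain JDK types, replaces `CronTask` with a
   `Runnable`, replaces `monitor.monitor(emitter)` with a `BooleanSupplier`, and
   leaves out the `hasMonitor` check. The names `MonitorTickSketch`, `Tick` and
   `runsUntilStopped` are hypothetical. Ticks are driven by hand rather than by
   a timer so that the behaviour is deterministic.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the CronTask above, written against plain JDK types.
class MonitorTickSketch
{
  static final class Tick implements Runnable
  {
    private final ExecutorService executor;
    private final BooleanSupplier monitor; // stands in for monitor.monitor(emitter)
    private volatile Future<Boolean> previous = null;
    private volatile boolean stopped = false;

    Tick(ExecutorService executor, BooleanSupplier monitor)
    {
      this.executor = executor;
      this.monitor = monitor;
    }

    @Override
    public void run()
    {
      if (stopped) {
        return;
      }
      try {
        Future<Boolean> last = previous;
        // Cheap check: only look at a result that is already available.
        if (last != null && last.isDone() && !last.get()) {
          stopped = true; // the real task would cancel its own scheduled future here
          return;
        }
        // Hand the potentially slow work to the executor and return immediately.
        Callable<Boolean> work = monitor::getAsBoolean;
        previous = executor.submit(work);
      }
      catch (Exception e) {
        stopped = true;
      }
    }

    boolean isStopped()
    {
      return stopped;
    }

    void awaitLastRun() throws Exception
    {
      Future<Boolean> last = previous;
      if (last != null) {
        last.get();
      }
    }
  }

  /** Drives ticks by hand and returns how many times the monitor body ran. */
  static int runsUntilStopped(int successfulRuns)
  {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicInteger runs = new AtomicInteger();
    // The monitor reports "keep going" for the first `successfulRuns` calls only.
    Tick tick = new Tick(pool, () -> runs.incrementAndGet() <= successfulRuns);
    try {
      while (!tick.isStopped()) {
        tick.run();
        tick.awaitLastRun();
      }
      return runs.get();
    }
    catch (Exception e) {
      throw new IllegalStateException(e);
    }
    finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args)
  {
    System.out.println("monitor ran " + runsUntilStopped(2) + " times before stopping");
  }
}
```

   In this sketch the monitor body runs once more than the number of successful
   runs, because the tick only learns about a `false` result on the following
   tick. The first tick never cancels anything, since there is no previous
   result to inspect.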






[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics

2020-10-26 Thread GitBox


ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r511941702



##
File path: 
core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
##
@@ -167,6 +169,50 @@ public void run()
 );
   }
 
+  public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable)
+  {
+    scheduleAtFixedRate(exec, rate, rate, callable);
+  }
+
+  /**
+   * Run callable once every period, after the given initial delay. Uses
+   * {@link CronScheduler} for task scheduling. Exceptions are caught and logged
+   * as errors.
+   */
+  public static void scheduleAtFixedRate(
+      final CronScheduler exec,
+      final Duration initialDelay,
+      final Duration rate,
+      final Callable callable
+  )
+  {
+    log.debug("Scheduling periodically: %s with period %s", callable, rate);
+    Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+    exec.scheduleAt(delayInstance,

Review comment:
   Hey @leventov, do you think the implementation below for the
   `startMonitor(final Monitor monitor)` method in the `MonitorScheduler` class
   will suffice? Also, can this cause any inconsistency, since `scheduledFuture`
   is volatile and shared among all the monitors?
   
   synchronized (lock) {
     monitor.start();
     long rate = config.getEmitterPeriod().getMillis();
     scheduledFuture = scheduler.scheduleAtFixedRate(
         rate,
         rate,
         TimeUnit.MILLISECONDS,
         new CronTask()
         {
           @Override
           public void run(long scheduledRunTimeMillis)
           {
             try {
               if (monitor.monitor(emitter) && hasMonitor(monitor)) {
                 log.trace("Running %s (period %s)", this, rate);
               } else {
                 log.debug("Stopping rescheduling %s (delay %s)", this, rate);
                 removeMonitor(monitor);
                 // Wait until the assignment to scheduledFuture has happened.
                 while (scheduledFuture == null) {
                   Thread.sleep(1);
                 }
                 scheduledFuture.cancel(false);
               }
             }
             catch (Throwable e) {
               log.error(e, "Uncaught exception.");
             }
           }
         });
   }
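
   The question about the shared `scheduledFuture` field can be checked with a
   small sketch. It is not Druid code: it assumes a JDK
   `ScheduledExecutorService` in place of `CronScheduler`, and the names
   `SharedFutureSketch` and `cancelThroughSharedField` are hypothetical. Two
   tasks are scheduled through one shared field, then the first one tries to
   stop itself through that field.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: a JDK ScheduledExecutorService instead of CronScheduler.
class SharedFutureSketch
{
  // One field shared by every monitor, as in the snippet above.
  private static volatile ScheduledFuture<?> scheduledFuture;

  /**
   * Returns {first cancelled?, second cancelled?} after the FIRST monitor
   * tries to cancel itself through the shared field.
   */
  static boolean[] cancelThroughSharedField()
  {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    try {
      Runnable noop = () -> { };
      // Long delays so that neither task fires during the demonstration.
      scheduledFuture = scheduler.scheduleAtFixedRate(noop, 1, 1, TimeUnit.HOURS);
      ScheduledFuture<?> first = scheduledFuture;
      scheduledFuture = scheduler.scheduleAtFixedRate(noop, 1, 1, TimeUnit.HOURS);
      ScheduledFuture<?> second = scheduledFuture;

      // The first monitor wants to stop, but the field now points at the second.
      scheduledFuture.cancel(false);
      return new boolean[]{first.isCancelled(), second.isCancelled()};
    }
    finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args)
  {
    boolean[] result = cancelThroughSharedField();
    System.out.println("first cancelled:  " + result[0]);
    System.out.println("second cancelled: " + result[1]);
  }
}
```

   Under these assumptions the shared field always refers to the most recently
   started monitor, so a monitor that stops itself may cancel another monitor's
   schedule and leave its own running. Keeping one future per monitor avoids
   this.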







[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics

2020-10-18 Thread GitBox


ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r507186926



##
File path: 
core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
##
@@ -167,6 +169,50 @@ public void run()
 );
   }
 
+  public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable)
+  {
+    scheduleAtFixedRate(exec, rate, rate, callable);
+  }
+
+  /**
+   * Run callable once every period, after the given initial delay. Uses
+   * {@link CronScheduler} for task scheduling. Exceptions are caught and logged
+   * as errors.
+   */
+  public static void scheduleAtFixedRate(
+      final CronScheduler exec,
+      final Duration initialDelay,
+      final Duration rate,
+      final Callable callable
+  )
+  {
+    log.debug("Scheduling periodically: %s with period %s", callable, rate);
+    Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+    exec.scheduleAt(delayInstance,

Review comment:
   That makes sense. I'll change this.




