[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics
ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r553893369

## File path: server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java ##

@@ -108,10 +108,10 @@ public MonitorScheduler getMonitorScheduler(
     return new MonitorScheduler(
         config.get(),
-        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(),
+        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
         emitter,
         monitors,
-        Executors.newCachedThreadPool()
+        Execs.multiThreaded(64, "MonitorThread-%d")

Review comment:
This makes sense, we should reduce the number of monitor threads.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics
ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r553162349

## File path: server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java ##

@@ -108,10 +108,10 @@ public MonitorScheduler getMonitorScheduler(
     return new MonitorScheduler(
         config.get(),
-        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler-%s").build(),
+        CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
         emitter,
         monitors,
-        Executors.newCachedThreadPool()
+        Execs.multiThreaded(64, "MonitorThread-%d")

Review comment:
Hi @jihoonson, sorry for the delay in responding. I think there are currently ~20 monitors, which can run concurrently with the MonitorScheduler class. Suppose the scheduling period is shorter than the time the executor thread takes to run `monitor.monitor(...)` (I am not sure whether this can happen in practice; it is something of an edge case). Tasks can then queue up if there are too few threads. I think we should have at least as many threads as the maximum number of monitors supported. I may be missing something here. What do you think?
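The queuing concern in this comment can be illustrated with a small standalone sketch. It uses a plain `java.util.concurrent.ThreadPoolExecutor` as a stand-in for the pool that `Execs.multiThreaded(...)` would provide (an assumption; Druid's helper is not used here), and a latch-blocked task as a stand-in for a slow `monitor.monitor(...)` call. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MonitorPoolQueueingSketch
{
  /**
   * Submits {@code monitors} tasks that all block until released to a fixed pool of
   * {@code threads} workers, and returns how many tasks are left waiting in the queue.
   */
  static int queuedTasks(int threads, int monitors)
  {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()
    );
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < monitors; i++) {
        pool.execute(() -> {
          try {
            release.await(); // stands in for a slow monitor.monitor(...) call
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      // Every worker is busy, so the remaining tasks sit in the queue.
      return pool.getQueue().size();
    }
    finally {
      release.countDown(); // let all tasks finish
      pool.shutdown();
    }
  }

  public static void main(String[] args)
  {
    System.out.println("2 threads, 20 monitors: queued = " + queuedTasks(2, 20));
    System.out.println("20 threads, 20 monitors: queued = " + queuedTasks(20, 20));
  }
}
```

With as many threads as monitors, nothing waits in the queue in this sketch; with fewer threads, the excess tasks wait until a worker frees up.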
[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics
ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r512608527

## File path: core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java ##

@@ -167,6 +169,50 @@ public void run()
     );
   }
 
+  public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable)
+  {
+    scheduleAtFixedRate(exec, rate, rate, callable);
+  }
+
+  /**
+   * Run callable once every period, after the given initial delay. Uses
+   * {@link CronScheduler} for task scheduling. Exceptions are caught and logged
+   * as errors.
+   */
+  public static void scheduleAtFixedRate(
+      final CronScheduler exec,
+      final Duration initialDelay,
+      final Duration rate,
+      final Callable callable
+  )
+  {
+    log.debug("Scheduling periodically: %s with period %s", callable, rate);
+    Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+    exec.scheduleAt(delayInstance,

Review comment:
Keeping the above points in mind, I think this implementation will do.

    synchronized (lock) {
      monitor.start();
      Long rate = config.getEmitterPeriod().getMillis();
      Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
          rate,
          rate,
          TimeUnit.MILLISECONDS,
          new CronTask()
          {
            // Result of the most recent monitor run submitted to the executor.
            private volatile Future<Boolean> monitorFuture = null;

            @Override
            public void run(long scheduledRunTimeMillis)
            {
              try {
                if (monitorFuture != null && monitorFuture.isDone()
                    && !(monitorFuture.get() && hasMonitor(monitor))) {
                  removeMonitor(monitor);
                  monitor.getScheduledFuture().cancel(false);
                  log.debug("Stopped rescheduling %s (delay %s)", this, rate);
                  return;
                }
                log.trace("Running %s (period %s)", this, rate);
                monitorFuture = executor.submit(new Callable<Boolean>()
                {
                  public Boolean call()
                  {
                    try {
                      return monitor.monitor(emitter);
                    }
                    catch (Throwable e) {
                      log.error(e, "Uncaught exception.");
                      return false;
                    }
                  }
                });
              }
              catch (Throwable e) {
                log.error(e, "Uncaught exception.");
              }
            }
          });
      monitor.setScheduledFuture(scheduledFuture);
    }

In this:
1. Each monitor has a separate future.
2. The cron task is cheap: it only checks a boolean condition. If the condition is true, it cancels scheduling for that particular monitor; otherwise it submits a callable for monitoring to the executor service.
3. There is no race condition when cancelling scheduledFuture on the first iteration.

Please let me know your thoughts on this.
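The hand-off idea in the proposal (a cheap periodic tick that only inspects the previous result and submits the real work to an executor) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the MonitorScheduler code: `HandOffTickSketch`, `tick()`, `awaitLastRun()` and `submissionsBeforeStop()` are hypothetical names, `tick()` is called by hand rather than by a CronScheduler, and a `Callable<Boolean>` stands in for `monitor.monitor(emitter)`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class HandOffTickSketch
{
  private final ExecutorService workers;
  private final Callable<Boolean> work; // stands in for monitor.monitor(emitter)
  private volatile Future<Boolean> lastRun = null;

  HandOffTickSketch(ExecutorService workers, Callable<Boolean> work)
  {
    this.workers = workers;
    this.work = work;
  }

  /** One scheduler tick. Returns false when the caller should cancel further ticks. */
  boolean tick()
  {
    try {
      // Cheap check: only look at the result of a run that has already finished.
      if (lastRun != null && lastRun.isDone() && !lastRun.get()) {
        return false;
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    catch (ExecutionException e) {
      return false; // treat a failed run like a "stop" result
    }
    lastRun = workers.submit(work); // the slow work happens off the ticking thread
    return true;
  }

  /** Demo helper: waits for the most recently submitted run to finish. */
  void awaitLastRun()
  {
    try {
      lastRun.get();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    catch (ExecutionException e) {
      // the failure is picked up by the next tick()
    }
  }

  /** Ticks until told to stop; the work reports success {@code successfulRuns} times. */
  static int submissionsBeforeStop(int successfulRuns)
  {
    ExecutorService workers = Executors.newSingleThreadExecutor();
    AtomicInteger calls = new AtomicInteger();
    HandOffTickSketch task =
        new HandOffTickSketch(workers, () -> calls.incrementAndGet() <= successfulRuns);
    int submitted = 0;
    try {
      while (task.tick()) {
        submitted++;
        task.awaitLastRun();
      }
      return submitted;
    }
    finally {
      workers.shutdown();
    }
  }

  public static void main(String[] args)
  {
    System.out.println("submissions before stop: " + submissionsBeforeStop(2));
  }
}
```

As in the proposal, a tick that finds the previous run still in progress submits another run anyway; whether that is desirable depends on how long a monitor is expected to take relative to the period.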
[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics
ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r511941702

## File path: core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java ##

@@ -167,6 +169,50 @@ public void run()
     );
   }
 
+  public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable)
+  {
+    scheduleAtFixedRate(exec, rate, rate, callable);
+  }
+
+  /**
+   * Run callable once every period, after the given initial delay. Uses
+   * {@link CronScheduler} for task scheduling. Exceptions are caught and logged
+   * as errors.
+   */
+  public static void scheduleAtFixedRate(
+      final CronScheduler exec,
+      final Duration initialDelay,
+      final Duration rate,
+      final Callable callable
+  )
+  {
+    log.debug("Scheduling periodically: %s with period %s", callable, rate);
+    Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+    exec.scheduleAt(delayInstance,

Review comment:
Hey @leventov, do you think the implementation below for the `startMonitor(final Monitor monitor)` method in the `MonitorScheduler` class will suffice? Also, can this cause any inconsistency, since `scheduledFuture` is volatile and is shared amongst all the monitors?

    synchronized (lock) {
      monitor.start();
      Long rate = config.getEmitterPeriod().getMillis();
      scheduledFuture = scheduler.scheduleAtFixedRate(
          rate,
          rate,
          TimeUnit.MILLISECONDS,
          new CronTask()
          {
            @Override
            public void run(long scheduledRunTimeMillis)
            {
              try {
                if (monitor.monitor(emitter) && hasMonitor(monitor)) {
                  log.trace("Running %s (period %s)", this, rate);
                } else {
                  log.debug("Stopping rescheduling %s (delay %s)", this, rate);
                  removeMonitor(monitor);
                  // Wait until the outer assignment to scheduledFuture has happened.
                  while (scheduledFuture == null) {
                    Thread.sleep(1);
                  }
                  scheduledFuture.cancel(false);
                }
              }
              catch (Throwable e) {
                log.error(e, "Uncaught exception.");
              }
            }
          });
    }
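The question about a single `scheduledFuture` field shared by all monitors can be made concrete with a small sketch. It is only an illustration of the concern, not a statement about how MonitorScheduler behaves: it uses `java.util.concurrent.ScheduledExecutorService` in place of CronScheduler, and `SharedFutureSketch`, `start()` and `cancelFirstViaSharedField()` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SharedFutureSketch
{
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  // One field for all monitors: each start() overwrites the previous value.
  private volatile Future<?> sharedFuture;

  // Alternative: one future per monitor, kept here only to inspect the outcome.
  private final Map<String, Future<?>> perMonitor = new ConcurrentHashMap<>();

  void start(String monitorName)
  {
    Future<?> future = scheduler.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
    sharedFuture = future;
    perMonitor.put(monitorName, future);
  }

  /** Starts two monitors, then tries to stop the first one through the shared field. */
  static boolean[] cancelFirstViaSharedField()
  {
    SharedFutureSketch sketch = new SharedFutureSketch();
    try {
      sketch.start("first");
      sketch.start("second");
      sketch.sharedFuture.cancel(false); // intended target: "first"
      return new boolean[]{
          sketch.perMonitor.get("first").isCancelled(),
          sketch.perMonitor.get("second").isCancelled()
      };
    }
    finally {
      sketch.scheduler.shutdownNow();
    }
  }

  public static void main(String[] args)
  {
    boolean[] cancelled = cancelFirstViaSharedField();
    System.out.println("first cancelled: " + cancelled[0] + ", second cancelled: " + cancelled[1]);
  }
}
```

Because the field holds only the most recently assigned future, cancelling through it stops the second schedule and leaves the first one running; keeping one future per monitor avoids that mix-up.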
[GitHub] [druid] ayushkul2910 commented on a change in pull request #10448: Added CronScheduler support as a proof to clock drift while emitting metrics
ayushkul2910 commented on a change in pull request #10448:
URL: https://github.com/apache/druid/pull/10448#discussion_r507186926

## File path: core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java ##

@@ -167,6 +169,50 @@ public void run()
     );
   }
 
+  public static void scheduleAtFixedRate(CronScheduler exec, Duration rate, Callable callable)
+  {
+    scheduleAtFixedRate(exec, rate, rate, callable);
+  }
+
+  /**
+   * Run callable once every period, after the given initial delay. Uses
+   * {@link CronScheduler} for task scheduling. Exceptions are caught and logged
+   * as errors.
+   */
+  public static void scheduleAtFixedRate(
+      final CronScheduler exec,
+      final Duration initialDelay,
+      final Duration rate,
+      final Callable callable
+  )
+  {
+    log.debug("Scheduling periodically: %s with period %s", callable, rate);
+    Instant delayInstance = Instant.now().plusMillis(initialDelay.getMillis());
+    exec.scheduleAt(delayInstance,

Review comment:
That makes sense. I'll change this.