[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r397030268 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); Review comment: You've mentioned "to get rid of the next few lines", but 
this Tree data structure is further used as a container that is filled using some conditional logic and returned from the method; it is not just about having the input entries sorted. Do you propose to rewrite it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
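To make the point about the container concrete, here is a minimal sketch of the pattern under discussion: the sorted set is not only a way to order the input, it is the result that is filled conditionally and returned. The class name, the two regular expressions, and the sample keys are assumptions for illustration; the real `reporterListPattern` and `reporterClassPattern` are not shown in the diff, and logging is omitted.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class EnabledReportersSketch {
    // Assumed patterns; the actual patterns used by ReporterSetup are not part of the quoted diff.
    private static final Pattern LIST_PATTERN = Pattern.compile("\\s*,\\s*");
    private static final Pattern CLASS_KEY_PATTERN =
        Pattern.compile("metrics\\.reporter\\.([^.]+)\\.class");

    static Set<String> findEnabledReporters(Set<String> configKeys, String includedReportersString) {
        // Splitting an empty string may yield one empty element, so filter it out.
        Set<String> included = LIST_PATTERN.splitAsStream(includedReportersString)
            .filter(r -> !r.isEmpty())
            .collect(Collectors.toSet());

        // The TreeSet is the result container: filled conditionally, returned in sorted order.
        Set<String> enabled = new TreeSet<>();
        for (String key : configKeys) {
            Matcher matcher = CLASS_KEY_PATTERN.matcher(key);
            if (matcher.matches()) {
                String reporterName = matcher.group(1);
                if (included.isEmpty() || included.contains(reporterName)) {
                    enabled.add(reporterName);
                }
            }
        }
        return enabled;
    }

    public static void main(String[] args) {
        Set<String> keys = new HashSet<>(Arrays.asList(
            "metrics.reporter.prom.class",
            "metrics.reporter.jmx.class",
            "metrics.reporter.jmx.port"));
        System.out.println(findEnabledReporters(keys, ""));     // every reporter that has a class key
        System.out.println(findEnabledReporters(keys, "prom")); // only the explicitly listed reporter
    }
}
```

Replacing the first `Collectors.toSet()` with a sorted collector would therefore not remove the later loop, because the filtering by configuration key still has to happen somewhere.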
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r396809894 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -120,24 +122,51 @@ public static void checkOS() { public final DownloadCache downloadCache = DownloadCache.get(); @Test - public void testReporter() throws Exception { - dist.copyOptJarsToLib("flink-metrics-prometheus"); + public void reporterWorksWhenFoundInLibsViaReflection() throws Exception { + dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX); Review comment: @zentol It seems that this updated approach that got merged into master does not support the kinds of tests that we would need to do for the supported scenarios (see reporterWorksWhenFoundInLibsViaReflection, reporterWorksWhenFoundInPluginsViaReflection, reporterWorksWhenFoundBothInPluginsAndLibsViaFactories in this PR). How should we proceed? I can either bend the FlinkResource implementation back to the state where it supports modifications of the underlying resources after creation and keep the initialization in `@Rule` (a hack), or reinitialize FlinkResource in every test. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r396809894 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -120,24 +122,51 @@ public static void checkOS() { public final DownloadCache downloadCache = DownloadCache.get(); @Test - public void testReporter() throws Exception { - dist.copyOptJarsToLib("flink-metrics-prometheus"); + public void reporterWorksWhenFoundInLibsViaReflection() throws Exception { + dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX); Review comment: @zentol It seems that this updated approach that got merged into master does not support the kinds of tests that we would need to do for the supported scenarios (see reporterWorksWhenFoundInLibsViaReflection, reporterWorksWhenFoundInPluginsViaReflection, reporterWorksWhenFoundBothInPluginsAndLibsViaFactories in this PR). How should we proceed? I can either bend the FlinkResource implementation back to the state where it supports modifications of the underlying resources after creation and keep the initialization in `@Rule` (a hack), or reinitialize FlinkResource in every test (probably a bad idea from performance and overhead perspectives). What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395661258 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf return namedOrderedReporters; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); - + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.info("Prepare reporter factories (from both SPIs and Plugins):"); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null){ + reporterFactories.put(factoryClassName, factory); + LOG.info("Found reporter factory {} at {} ", + factoryClassName, + new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath()); + } else { + //TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib +// String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// LOG.warn("Multiple implementations of the same reporter were found: \n {} and \n{}", jarPath1, jarPath2); + LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName); Review comment: I will leave it as "and/or" here, and we can then comb through the docs and sources to address it separately. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
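The duplicate handling in the hunk above can be sketched in isolation as follows. This is only an illustration under assumptions: the class and method names are hypothetical, plain objects stand in for `MetricReporterFactory` instances, and the warning is collected into a list instead of being logged. It keeps the explicit iterator loop so that a `ServiceConfigurationError` raised for one entry does not abort the remaining ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceConfigurationError;

class FactoryDedupSketch {
    // Keeps the first instance seen per class name and records the names that occur more than once.
    static <T> Map<String, T> firstPerClassName(Iterator<T> candidates, List<String> duplicates) {
        Map<String, T> byClassName = new HashMap<>();
        while (candidates.hasNext()) {
            try {
                T candidate = candidates.next();
                String className = candidate.getClass().getName();
                if (byClassName.putIfAbsent(className, candidate) != null) {
                    // Same class name found again, for example once in lib and once in plugins.
                    duplicates.add(className);
                }
            } catch (ServiceConfigurationError e) {
                // A broken service entry should not prevent loading the remaining entries.
                System.err.println("Skipping broken service entry: " + e.getMessage());
            }
        }
        return byClassName;
    }

    public static void main(String[] args) {
        List<String> duplicates = new ArrayList<>();
        Map<String, Object> found =
            firstPerClassName(Arrays.<Object>asList("first", 42, "second").iterator(), duplicates);
        System.out.println(found.keySet());
        System.out.println(duplicates);
    }
}
```

Keying on the class name alone is what makes the "and/or" wording necessary: without the jar path, the check can tell that two implementations exist but not which directories they came from.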
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395376334 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException { } public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException { - final Optional reporterJarOptional; - try (Stream logFiles = Files.walk(opt)) { - reporterJarOptional = logFiles + copyOptJars(jarNamePrefix, lib); + } + + public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException { Review comment: I assume you mean to modify `FlinkDistribution#mapJarLocationToPath` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395375550 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException { } public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException { - final Optional reporterJarOptional; - try (Stream logFiles = Files.walk(opt)) { - reporterJarOptional = logFiles + copyOptJars(jarNamePrefix, lib); + } + + public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException { Review comment: > and modify FlinkDistribution#moveJar to handle this location appropriately. could you clarify? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395375423 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws IOException { } public void copyOptJarsToLib(String jarNamePrefix) throws FileNotFoundException, IOException { - final Optional reporterJarOptional; - try (Stream logFiles = Files.walk(opt)) { - reporterJarOptional = logFiles + copyOptJars(jarNamePrefix, lib); + } + + public void copyOptJarsToPlugins(String jarNamePrefix) throws FileNotFoundException, IOException { Review comment: But this is not really "copying jars", right? It will actually move the file from opt to lib or plugins. The problem is that one of the cases I would like to test requires the jar to actually be copied to both. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
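The difference between moving and copying matters for the "found in both lib and plugins" scenario, since a moved jar can only end up in one place. A minimal sketch of a true copy, assuming a hypothetical helper `copyJarByPrefix` and a made-up directory layout (this is not the actual `FlinkDistribution` code), could look like this:

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.stream.Stream;

class JarCopySketch {
    // Copies the first jar whose file name starts with the prefix; the source file stays in place.
    static Path copyJarByPrefix(Path sourceDir, String jarNamePrefix, Path targetDir) throws IOException {
        final Optional<Path> jar;
        try (Stream<Path> files = Files.walk(sourceDir)) {
            jar = files
                .filter(Files::isRegularFile)
                .filter(path -> path.getFileName().toString().startsWith(jarNamePrefix))
                .findFirst();
        }
        Path source = jar.orElseThrow(
            () -> new FileNotFoundException("No jar with prefix " + jarNamePrefix + " in " + sourceDir));
        Files.createDirectories(targetDir);
        return Files.copy(source, targetDir.resolve(source.getFileName()), StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical layout: opt, lib, and a per-plugin subdirectory under plugins.
        Path dist = Files.createTempDirectory("dist");
        Path opt = Files.createDirectories(dist.resolve("opt"));
        Files.createFile(opt.resolve("flink-metrics-prometheus-test.jar"));

        Path inLib = copyJarByPrefix(opt, "flink-metrics-prometheus", dist.resolve("lib"));
        Path inPlugins = copyJarByPrefix(
            opt, "flink-metrics-prometheus", dist.resolve("plugins").resolve("prometheus"));
        System.out.println(Files.exists(inLib) && Files.exists(inPlugins));
    }
}
```

Because the source in opt is left untouched, the same helper can be called twice with different targets, which is what the combined lib-and-plugins test case needs.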
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395348055 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); Review comment: @AHeise I have applied this refactoring but then 
realized that I probably did not understand what you are actually proposing. Could you please clarify? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395368073 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
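For readers of the archive, the scan in `findEnabledReportersInConfiguration` quoted above can be reduced to the following minimal sketch. The class name `EnabledReporters` and the key pattern are assumptions made for illustration (the real `reporterClassPattern` is not shown in this thread). Logging of duplicates and exclusions is left out.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EnabledReporters {

    // Assumed key layout: metrics.reporter.<name>.class or metrics.reporter.<name>.factory.class
    private static final Pattern REPORTER_CLASS_KEY =
        Pattern.compile("metrics\\.reporter\\.([^.\\s]+)\\.(class|factory\\.class)");

    /** Returns the reporter names found in the keys, restricted to 'included' unless it is empty. */
    static Set<String> find(Collection<String> configKeys, Set<String> included) {
        // A TreeSet keeps the names in natural String order, so the result is deterministic.
        Set<String> names = new TreeSet<>();
        for (String key : configKeys) {
            Matcher matcher = REPORTER_CLASS_KEY.matcher(key);
            if (matcher.matches()) {
                String name = matcher.group(1);
                // An empty include list means "all configured reporters".
                if (included.isEmpty() || included.contains(name)) {
                    names.add(name);
                }
            }
        }
        return names;
    }
}
```

Keys such as `metrics.reporter.prom.port` do not match the pattern, so only keys that name a reporter class or factory class contribute a reporter name.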
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395366353 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -160,13 +161,14 @@ public void startCluster() throws ClusterEntrypointException { LOG.info("Starting {}.", getClass().getSimpleName()); try { - - configureFileSystems(configuration); + //TODO: push down filesystem initialization into runCluster - initializeServices (?) Review comment: If it is something non-trivial and hard to make a call about, I would propose to skip this refactoring for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395365114 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf return namedOrderedReporters; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); - + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.info("Prepare reporter factories (from both SPIs and Plugins):"); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null){ + reporterFactories.put(factoryClassName, factory); + LOG.info("Found reporter factory {} at {} ", + factoryClassName, + new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath()); + } else { + //TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib +// String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// LOG.warn("Multiple implementations of the same reporter were found: \n {} and \n{}", jarPath1, jarPath2); + LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName); Review comment: I think the problem is that we have too many variants to describe with a single coordinating conjunction like "or" (plus "or" can unfortunately be both inclusive and exclusive). Cases: (xx) _ (); () _ (xx); (x) _ (x); (xx) _ (xx). Alternatives like "a or b or both" also do not work, because in this particular case they can easily be interpreted as if the "(x) _ (x)" case is not an issue (because of "Multiple" in the beginning). It seems to me that the best way to indicate this ambiguity is to use "and/or", so that people are aware of the multiple ways this can happen.
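The duplicate check discussed in this comment can be sketched as follows. This is an illustration under stated assumptions, not the code from the PR: `ReporterFactory`, `ExampleFactory` and `DuplicateFactoryCheck` are hypothetical stand-ins, and a plain iterator takes the place of the `ServiceLoader`/`PluginManager` iterator. Only standard JDK calls are used.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.security.CodeSource;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceConfigurationError;

/** Hypothetical stand-in for MetricReporterFactory; not the Flink interface. */
interface ReporterFactory {}

/** Hypothetical factory used only to demonstrate the duplicate case. */
final class ExampleFactory implements ReporterFactory {}

final class DuplicateFactoryCheck {

    /** Returns a printable origin (JAR or directory) of the class, or "unknown" if none is exposed. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "unknown"; // e.g. classes loaded by the bootstrap class loader
        }
        try {
            return new File(source.getLocation().toURI()).getPath();
        } catch (URISyntaxException | IllegalArgumentException e) {
            // Location is not a plain file URI; fall back to its string form.
            return source.getLocation().toString();
        }
    }

    /** Keeps the first factory per class name and records the class names seen more than once. */
    static Map<String, ReporterFactory> collect(Iterator<ReporterFactory> factories, List<String> duplicates) {
        Map<String, ReporterFactory> byClassName = new HashMap<>();
        // An explicit iterator loop lets a failure on one element be caught without aborting the scan.
        while (factories.hasNext()) {
            try {
                ReporterFactory factory = factories.next();
                String name = factory.getClass().getName();
                if (byClassName.putIfAbsent(name, factory) != null) {
                    duplicates.add(name);
                }
            } catch (ServiceConfigurationError e) {
                // A broken service entry is skipped; the remaining factories are still processed.
            }
        }
        return byClassName;
    }
}
```

Because the map is keyed by class name, two copies of the same factory class are reported as a duplicate wherever they were loaded from, which matches all four cases listed in the comment.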
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395365114 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf return namedOrderedReporters; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); - + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.info("Prepare reporter factories (from both SPIs and Plugins):"); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null){ + reporterFactories.put(factoryClassName, factory); + LOG.info("Found reporter factory {} at {} ", + factoryClassName, + new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath()); + } else { + //TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib +// String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// LOG.warn("Multiple implementations of the same reporter were found: \n {} and \n{}", jarPath1, jarPath2); + LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName); Review comment: I think the problem is that we have too many variants to describe with a single coordinating conjunction like "or" (plus "or" can unfortunately be both inclusive and exclusive). Cases: (xx) _ (); () _ (xx); (x) _ (x); (xx) _ (xx). Alternatives like "a or b or both" also do not work, because in this particular case they can easily be interpreted as if the "(x) _ (x)" case is not an issue (because of "Multiple" in the beginning). It seems to me that the best way to indicate this ambiguity is to use "and/or", so that people are aware of what kinds of problems to look for.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395365114 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf return namedOrderedReporters; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); - + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.info("Prepare reporter factories (from both SPIs and Plugins):"); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null){ + reporterFactories.put(factoryClassName, factory); + LOG.info("Found reporter factory {} at {} ", + factoryClassName, + new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath()); + } else { + //TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib +// String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// LOG.warn("Multiple implementations of the same reporter were found: \n {} and \n{}", jarPath1, jarPath2); + LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName); Review comment: I think the problem is that we have too many variants to describe with a single coordinating conjunction like "or" (plus "or" can unfortunately be both inclusive and exclusive). Cases: (xx) _ (); () _ (xx); (x) _ (x); (xx) _ (xx). Alternatives like "x or y or both" also do not work, because in this particular case they can easily be interpreted as if the "(x) _ (x)" case is not an issue (because of "Multiple" in the beginning). It seems to me that the best way to indicate this ambiguity is to use "and/or", so that people are aware of what kinds of problems to look for.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395360427 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -120,24 +122,51 @@ public static void checkOS() { public final DownloadCache downloadCache = DownloadCache.get(); @Test - public void testReporter() throws Exception { - dist.copyOptJarsToLib("flink-metrics-prometheus"); + public void reporterWorksWhenFoundInLibsViaReflection() throws Exception { + dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX); + testReporter(false); + } + + @Test + public void reporterWorksWhenFoundInPluginsViaReflection() throws Exception { + dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX); + testReporter(false); + } + + @Test + public void reporterWorksWhenFoundInPluginsViaFactories() throws Exception { + dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX); + testReporter(true); + } + @Test + public void reporterWorksWhenFoundBothInPluginsAndLibsViaFactories() throws Exception { + dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX); + dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX); + testReporter(true); + } + + private void testReporter(boolean useFactory) throws Exception { final Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + + if (useFactory) { + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, PrometheusReporterFactory.class.getName()); + } else { + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName()); + } + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100"); dist.appendConfiguration(config); final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus"); - final Path prometheusArchive = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz"); final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME); final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml"); final Path prometheusBinary = prometheusBinDir.resolve("prometheus"); Files.createDirectory(tmpPrometheusDir); - downloadCache.getOrDownload( - "https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName(), + final Path prometheusArchive = downloadCache.getOrDownload( Review comment: Split as requested.
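The two configuration variants exercised by `testReporter(boolean useFactory)` differ only in the key suffix. A minimal sketch of how the keys are composed, assuming the literal values of the `ConfigConstants` fields are `metrics.reporter.`, `class` and `factory.class` (these literals are not quoted in this thread), with `ReporterConfigKeys` as a hypothetical helper:

```java
final class ReporterConfigKeys {

    // Assumed literal values of the ConfigConstants fields used in the test above.
    static final String PREFIX = "metrics.reporter.";
    static final String CLASS_SUFFIX = "class";
    static final String FACTORY_CLASS_SUFFIX = "factory.class";

    /** Builds the key that selects a reporter either by factory class or by reporter class. */
    static String classKey(String reporterName, boolean useFactory) {
        return PREFIX + reporterName + "." + (useFactory ? FACTORY_CLASS_SUFFIX : CLASS_SUFFIX);
    }
}
```

Under these assumptions, the factory variant of the test sets `metrics.reporter.prom.factory.class`, and the reflection variant sets `metrics.reporter.prom.class`.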
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395348055 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); Review comment: @AHeise I have applied this refactoring but then 
realized that I probably did not understand what you actually proposed. Could you please clarify?
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395348055 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); Review comment: @AHeise I have applied this refactoring but then 
realized that I probably did not understand what you actually proposed. Could you clarify?
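The `filter(r -> !r.isEmpty())` step in the quoted diff guards against a JDK difference noted in its inline comment: on JDK 9+ splitting an empty string yields a single empty element. A minimal sketch of that parsing step, with `ReporterListParsing` as a hypothetical class name and the separator pattern as an assumption (the actual `reporterListPattern` is not shown in this thread):

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class ReporterListParsing {

    // Assumed separator: a comma with optional surrounding whitespace.
    private static final Pattern LIST_PATTERN = Pattern.compile("\\s*,\\s*");

    /** Parses a comma-separated reporter list; an empty string yields an empty set. */
    static Set<String> parse(String reporterList) {
        return LIST_PATTERN.splitAsStream(reporterList)
            // Drop the empty element that some JDK versions produce for an empty input.
            .filter(name -> !name.isEmpty())
            .collect(Collectors.toSet());
    }
}
```

With the filter in place, an unset reporter list is treated the same way on every JDK version: the set is empty, which the surrounding code interprets as "include all configured reporters".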
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395337241 ## File path: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.core.plugin.Plugin; +import org.apache.flink.metrics.reporter.MetricReporterFactory; + +import java.util.Properties; + +/** + * {@link MetricReporterFactory} for {@link PrometheusReporter}. + */ +public class PrometheusReporterFactory implements MetricReporterFactory, Plugin { Review comment: Added. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395335333 ## File path: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.prometheus; + +import org.apache.flink.core.plugin.Plugin; +import org.apache.flink.metrics.reporter.MetricReporterFactory; + +import java.util.Properties; + +/** + * {@link MetricReporterFactory} for {@link PrometheusReporter}. + */ +public class PrometheusReporterFactory implements MetricReporterFactory, Plugin { + + @Override + public PrometheusReporter createMetricReporter(Properties properties) { + return new PrometheusReporter(); Review comment: It seems that this would require a rather large refactoring, because of the call to `super.open(config)` in the `open` methods and because of having to reconcile different configuration containers - `Properties` vs `MetricConfig`. I would prefer to address it in a separate refactoring PR, if possible.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395326037 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf return namedOrderedReporters; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); - + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.info("Prepare reporter factories (from both SPIs and Plugins):"); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services contains an entry to a non-existing factory class while (factoryIterator.hasNext()) { try { MetricReporterFactory factory = factoryIterator.next(); - reporterFactories.put(factory.getClass().getName(), factory); + String factoryClassName = factory.getClass().getName(); + MetricReporterFactory existingFactory = reporterFactories.get(factoryClassName); + if (existingFactory == null){ + reporterFactories.put(factoryClassName, factory); + LOG.info("Found reporter factory {} at {} ", + factoryClassName, + new File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath()); + } else { + //TODO: use path information below, when Plugin Classloader stops always prioritizing factories from /lib +// String jarPath1 = new File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation() +// .toURI()).getCanonicalPath(); +// LOG.warn("Multiple implementations of the same reporter were found: \n {} and \n{}", jarPath1, jarPath2); + LOG.warn("Multiple implementations of the same reporter were found in 'lib' and/or 'plugins' directories for {}. It is recommended to remove redundant reporter JARs to resolve used versions' ambiguity.", factoryClassName); Review comment: https://en.wikipedia.org/wiki/And/or `It is used as an inclusive "or" (as in logic and mathematics), while an "or" in spoken language might be inclusive or exclusive.` Seems to me like something that reduces ambiguity, as @zentol said.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395323032 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { Review comment: Done.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393563258 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -160,13 +161,14 @@ public void startCluster() throws ClusterEntrypointException { LOG.info("Starting {}.", getClass().getSimpleName()); try { - - configureFileSystems(configuration); + //TODO: push down filesystem initialization into runCluster - initializeServices (?) Review comment: @AHeise I wanted to ask if what is written in TODO is a good idea in your opinion. I am not sure about the implications of initializing FileSystems within `runSecured`. I do not quite like that initialization of services (initializeServices) and file systems happen in different places.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395299396 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); reporter.open(metricConfig); return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List fromConfiguration(final Configuration configuration) { + public static List fromConfiguration(final Configuration configuration, final PluginManager pluginManager) { + LOG.debug("Initializing Reporters from Configuration: {}", configuration); String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); - Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) Review comment: As this did not come up in the second round I consider the new split of commits as appropriate. Resolving. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r395298324 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { Review comment: @zentol Ok, I find it a bit strange, but I am not setting the rules here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393950683 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); Review comment: Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393950579 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); reporter.open(metricConfig); return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List fromConfiguration(final Configuration configuration) { + public static List fromConfiguration(final Configuration configuration, final PluginManager pluginManager) { + LOG.debug("Initializing Reporters from Configuration: {}", configuration); String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); - Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) - .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ - .collect(Collectors.toSet()); - // use a TreeSet to make the reporter order deterministic, which is useful for testing - Set namedReporters = new TreeSet<>(String::compareTo); - // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations - for (String key : configuration.keySet()) { - if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { - Matcher matcher = reporterClassPattern.matcher(key); - if (matcher.matches()) { - String reporterName = matcher.group(1); - if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { - if (namedReporters.contains(reporterName)) { - LOG.warn("Duplicate class configuration detected for 
reporter {}.", reporterName); - } else { - namedReporters.add(reporterName); - } - } else { - LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); - } - } - } - } + Set namedReporters = findEnabledReportersInConfiguration(configuration, + includedReportersString); if (namedReporters.isEmpty()) { return Collections.emptyList(); } - List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + final Map reporterFactories = loadAvailableReporterFactories(pluginManager); + LOG.debug("Loaded Reporter Factories: {}", reporterFactories); + final List> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters); + LOG.debug("Loaded Reporter Configurations: {}", reporterConfigurations); Review comment: Addressed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393950508 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); reporter.open(metricConfig); return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List fromConfiguration(final Configuration configuration) { + public static List fromConfiguration(final Configuration configuration, final PluginManager pluginManager) { + LOG.debug("Initializing Reporters from Configuration: {}", configuration); String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); - Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) - .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ - .collect(Collectors.toSet()); - // use a TreeSet to make the reporter order deterministic, which is useful for testing - Set namedReporters = new TreeSet<>(String::compareTo); - // scan entire configuration for "metric.reporter" keys and parse individual reporter configurations - for (String key : configuration.keySet()) { - if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { - Matcher matcher = reporterClassPattern.matcher(key); - if (matcher.matches()) { - String reporterName = matcher.group(1); - if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { - if (namedReporters.contains(reporterName)) { - LOG.warn("Duplicate class configuration detected for 
reporter {}.", reporterName); - } else { - namedReporters.add(reporterName); - } - } else { - LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); - } - } - } - } + Set namedReporters = findEnabledReportersInConfiguration(configuration, + includedReportersString); if (namedReporters.isEmpty()) { return Collections.emptyList(); } - List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + final Map reporterFactories = loadAvailableReporterFactories(pluginManager); + LOG.debug("Loaded Reporter Factories: {}", reporterFactories); + final List> reporterConfigurations = loadReporterConfigurations(configuration, namedReporters); + LOG.debug("Loaded Reporter Configurations: {}", reporterConfigurations); - for (String namedReporter: namedReporters) { - DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( - configuration, - ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + List reporterSetups = setupReporters(reporterFactories, reporterConfigurations); + LOG.debug("All initialized Reporters:"); + reporterSetups.forEach(i -> LOG.debug("{} - {}", i.getName(), i.getConfiguration())); Review comment: Addressed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393950810 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); reporter.open(metricConfig); return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List fromConfiguration(final Configuration configuration) { + public static List fromConfiguration(final Configuration configuration, final PluginManager pluginManager) { + LOG.debug("Initializing Reporters from Configuration: {}", configuration); Review comment: Removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393848386 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); Review comment: Thanks, changed as proposed. 
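The `.filter(r -> !r.isEmpty())` step in the hunk above guards against an unset reporter list. A minimal, standalone sketch of that behaviour follows; the class name, method name and the separator regex are assumptions made for illustration and are not taken from `ReporterSetup`.

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class ReporterListSplitSketch {

    // Assumed separator: a comma with optional surrounding whitespace.
    private static final Pattern LIST_PATTERN = Pattern.compile("\\s*,\\s*");

    // Splits a comma-separated reporter list and drops empty entries, so an
    // unset option ("") yields an empty set rather than a set containing "".
    static Set<String> parseIncludedReporters(String value) {
        return LIST_PATTERN.splitAsStream(value)
            .filter(r -> !r.isEmpty())
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(parseIncludedReporters(""));
        System.out.println(parseIncludedReporters("jmx,,prometheus"));
    }
}
```

With the filter in place the result for an empty option value is an empty set on any JDK, which is what the later `includedReporters.isEmpty()` check in the diff relies on.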
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393843746 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -202,9 +204,11 @@ private SecurityContext installSecurityContext(Configuration configuration) thro return SecurityUtils.getInstalledContext(); } - private void runCluster(Configuration configuration) throws Exception { + private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception { synchronized (lock) { - initializeServices(configuration); + + //TODO: Ask why FileSystem is not initialized here too. Review comment: See above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393842876 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { Review comment: Seems like a silent skip: a key that starts with the reporter prefix but does not match the pattern is ignored without any log message. This is old code, maybe @zentol could comment.
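To make the "silent skip" concrete, here is a small standalone sketch of the scan. The prefix value, the simplified pattern and the `skipped` list are assumptions for illustration only; the real `reporterClassPattern` is not shown in this thread, and the quoted code does not collect skipped keys at all.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ReporterKeyScanSketch {

    // Assumed value of the reporter key prefix.
    static final String PREFIX = "metrics.reporter.";

    // Assumed, simplified pattern: only "<prefix><name>.class" and
    // "<prefix><name>.factory.class" declare a reporter.
    static final Pattern CLASS_PATTERN = Pattern.compile(
        Pattern.quote(PREFIX) + "([^.\\s]+)\\.(class|factory\\.class)");

    // Returns the declared reporter names in sorted order. Keys that carry the
    // prefix but do not match the pattern are added to 'skipped'; in the code
    // under review they are dropped without a trace.
    static Set<String> findReporters(Iterable<String> keys, List<String> skipped) {
        Set<String> names = new TreeSet<>();
        for (String key : keys) {
            if (!key.startsWith(PREFIX)) {
                continue; // unrelated option
            }
            Matcher matcher = CLASS_PATTERN.matcher(key);
            if (matcher.matches()) {
                names.add(matcher.group(1));
            } else {
                skipped.add(key);
            }
        }
        return names;
    }
}
```

Under these assumptions a key such as `metrics.reporter.jmx.port` lands in `skipped`. That is expected for ordinary per-reporter options, so the skip is probably intentional, but it also hides a misspelled `class` key.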
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393840952 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); reporter.open(metricConfig); return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List fromConfiguration(final Configuration configuration) { + public static List fromConfiguration(final Configuration configuration, final PluginManager pluginManager) { + LOG.debug("Initializing Reporters from Configuration: {}", configuration); String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); - Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393840337 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); reporter.open(metricConfig); return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List fromConfiguration(final Configuration configuration) { + public static List fromConfiguration(final Configuration configuration, final PluginManager pluginManager) { + LOG.debug("Initializing Reporters from Configuration: {}", configuration); String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); - Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393839754 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); Review comment: Ping @AHeise @zentol - could we agree on something here? I am always "pro extensive logging", but this could be professional deformation. Being able to see what is going on "on demand" is very valuable for production systems. We could declare somewhere for Flink in general that if you choose to run with the debug log level, some potentially sensitive information could leak into the logs. My arguments are: 1) If someone has uncontrolled access to the log files on your machine in production, the content of those files is probably not the biggest of your problems. 2) Running with the debug level is not a "normal" scenario - it is intended for hands-on investigation of issues. The log level for potentially compromisable external systems could be explicitly set to trace in such cases. 3) We have been "leaking" this data in the current versions at info (!) level without much concern.
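One possible middle ground between dropping the log statement and printing the whole `MetricConfig` would be to mask values whose keys look sensitive before they reach the logger. The sketch below is illustrative only: the class, the method and the list of key fragments are hypothetical and are not part of this pull request or of any Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class LogMaskingSketch {

    // Hypothetical key fragments that mark a value as sensitive.
    private static final List<String> SENSITIVE_FRAGMENTS =
        Arrays.asList("password", "secret", "token");

    // Returns a sorted copy of the configuration in which the values of
    // sensitive-looking keys are replaced by a fixed placeholder.
    static Map<String, String> maskForLogging(Map<String, String> config) {
        Map<String, String> masked = new TreeMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey().toLowerCase(Locale.ROOT);
            boolean sensitive = SENSITIVE_FRAGMENTS.stream().anyMatch(key::contains);
            masked.put(entry.getKey(), sensitive ? "******" : entry.getValue());
        }
        return masked;
    }
}
```

A call site could then log `maskForLogging(...)` at debug level instead of the raw configuration, which keeps the diagnostic value argued for above while limiting what ends up in the log files.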
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393832500 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { Review comment: This whole code block was already "touched" anyhow, because of refactoring, so I think it should be OK to do such things, unless you have a strong opinion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393580453 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
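The code comment in the hunk above says to avoid streams and for-each loops so that a single `ServiceConfigurationError` can be caught per factory. A minimal sketch of that iteration pattern follows, assuming a plain `Iterator` in place of the one returned by `getAllReporterFactories(pluginManager)`; the class `FactoryLoadingSketch`, the helper `flakyIterator`, and the reporter names are hypothetical and exist only to simulate one broken service entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;

final class FactoryLoadingSketch {

    /**
     * Drains the iterator one element at a time so that a failure while
     * loading one element does not prevent the remaining ones from loading.
     */
    static <T> List<T> loadAll(Iterator<T> iterator) {
        List<T> loaded = new ArrayList<>();
        while (iterator.hasNext()) {
            try {
                loaded.add(iterator.next());
            } catch (ServiceConfigurationError e) {
                // Skip only the broken entry and continue with the next one.
                System.err.println("Error while loading factory: " + e.getMessage());
            }
        }
        return loaded;
    }

    /** Hypothetical iterator that fails on exactly one element. */
    static Iterator<String> flakyIterator() {
        Iterator<String> names = Arrays.asList("jmx", "broken", "prometheus").iterator();
        return new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return names.hasNext();
            }

            @Override
            public String next() {
                String name = names.next();
                if (name.equals("broken")) {
                    throw new ServiceConfigurationError("simulated bad service entry");
                }
                return name;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(loadAll(flakyIterator()));
    }
}
```

A for-each loop or a stream would propagate the first error out of the whole traversal, which is why the try/catch sits around the single `next()` call here.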
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393578320 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -77,6 +77,7 @@ private Path conf; Review comment: Addressed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393570847 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
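The reporter discovery in `findEnabledReportersInConfiguration` can be sketched in isolation as below. This is an illustration under assumptions, not the code from the PR: the prefix value `metrics.reporter.`, both regular expressions, and the class name `EnabledReportersSketch` are stand-ins, and a plain collection of keys replaces Flink's `Configuration`. Logging of duplicates and exclusions is left out.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class EnabledReportersSketch {

    // Assumed values; the real constants live in ConfigConstants and ReporterSetup.
    static final String PREFIX = "metrics.reporter.";
    static final Pattern LIST_PATTERN = Pattern.compile("\\s*,\\s*");
    static final Pattern CLASS_PATTERN =
        Pattern.compile(Pattern.quote(PREFIX) + "([^.\\s]+)\\.class");

    static Set<String> findEnabledReporters(Collection<String> keys, String includedReportersString) {
        // Filtering empty tokens makes an empty include list mean "no restriction",
        // regardless of what splitting an empty string yields on a given JDK.
        Set<String> included = LIST_PATTERN.splitAsStream(includedReportersString)
            .filter(r -> !r.isEmpty())
            .collect(Collectors.toSet());

        // TreeSet gives a deterministic (sorted) reporter order.
        Set<String> ordered = new TreeSet<>();
        for (String key : keys) {
            Matcher matcher = CLASS_PATTERN.matcher(key);
            if (matcher.matches()) {
                String name = matcher.group(1);
                if (included.isEmpty() || included.contains(name)) {
                    ordered.add(name);
                }
            }
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
            "metrics.reporter.prom.class",
            "metrics.reporter.jmx.class",
            "metrics.reporter.prom.port");
        System.out.println(findEnabledReporters(keys, ""));
        System.out.println(findEnabledReporters(keys, "prom"));
    }
}
```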
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393563258 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -160,13 +161,14 @@ public void startCluster() throws ClusterEntrypointException { LOG.info("Starting {}.", getClass().getSimpleName()); try { - - configureFileSystems(configuration); + //TODO: push down filesystem initialization into runCluster - initializeServices (?) Review comment: @AHeise I wanted to ask whether what the TODO describes is a good idea in your opinion. I am not sure about the implications of initializing FileSystems within `runSecured`. I do not quite like that the initialization of services (`initializeServices`) and of file systems happens in different places.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r393256771 ## File path: flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java ## @@ -69,7 +68,7 @@ public static PluginLoader create(PluginDescriptor pluginDescriptor, ClassLoader * @param Type of the requested plugin service. * @return An iterator of all implementations of the given service interface that could be loaded from the plugin. */ - public Iterator load(Class service) { + public Iterator load(Class service) { Review comment: Marking as resolved.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386649079 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386682800 ## File path: flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java ## @@ -185,6 +214,7 @@ public void testReporter() throws Exception { checkMetricAvailability(client, "flink_jobmanager_numRegisteredTaskManagers"); checkMetricAvailability(client, "flink_taskmanager_Status_Network_TotalMemorySegments"); + Review comment: done
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386644501 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ## @@ -119,7 +120,7 @@ private boolean shutdown; - public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception { + public TaskManagerRunner(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception { Review comment: I am not sure what the expected contract should be here. ReporterSetup currently works with a null pluginManager only if no reporters are configured (`namedReporters.isEmpty()`). We could add a `checkNotNull` in ReporterSetup right after that first early return for empty namedReporters. (TaskManagerRunnerTest is now fixed to initialize the PluginManager properly.)
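The contract proposed in the comment above (a null plugin manager is tolerated only when no reporters are configured) could look roughly like the following sketch. Everything here is hypothetical: `PluginManagerStub` stands in for Flink's `PluginManager`, `fromConfiguration` is simplified to take the reporter names directly, and `Objects.requireNonNull` is used where Flink code would more likely use its own `Preconditions` helper.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;

final class ReporterSetupContractSketch {

    /** Hypothetical stand-in for Flink's PluginManager. */
    interface PluginManagerStub {}

    static List<String> fromConfiguration(Set<String> namedReporters, PluginManagerStub pluginManager) {
        if (namedReporters.isEmpty()) {
            // Nothing to load, so a null plugin manager is harmless here.
            return Collections.emptyList();
        }
        // Past the early return, factories must be discoverable via the plugin manager.
        Objects.requireNonNull(pluginManager, "pluginManager must not be null when reporters are configured");
        // Placeholder for the actual factory lookup and reporter instantiation.
        return new ArrayList<>(namedReporters);
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration(Collections.emptySet(), null));
    }
}
```

Placing the check after the early return keeps callers that configure no reporters working unchanged, while a misconfigured caller fails fast with a clear message instead of a later `NullPointerException` during factory loading.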
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386675530 ## File path: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java ## @@ -35,7 +35,7 @@ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus. */ @PublicEvolving -public class PrometheusReporter extends AbstractPrometheusReporter { +public class PrometheusReporter extends AbstractPrometheusReporter implements MetricReporter { Review comment: I missed that `AbstractPrometheusReporter` already implements it. Fixed.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386663744 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386662966 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386662377 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -273,3 +319,4 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf return Optional.of((MetricReporter) reporterClass.newInstance()); } } + Review comment: done
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386646692 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386644501 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ## @@ -119,7 +120,7 @@ private boolean shutdown; - public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception { + public TaskManagerRunner(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception { Review comment: I am not sure what the expected contract should be here. ReporterSetup currently only works with a null pluginManager if no reporters are configured (namedReporters.isEmpty()). We could add a checkNonNull in ReporterSetup right after the early return for empty namedReporters. (TaskManagerRunnerTest is now fixed to initialize the PluginManager properly.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
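A minimal sketch of the contract proposed in the comment above: a null plugin manager is tolerated only when no reporters are configured, and is rejected immediately after that early return. This is not the actual ReporterSetup code. `PluginManagerStub`, `fromConfiguration` and the returned list of names are hypothetical stand-ins, and `Objects.requireNonNull` stands in for the "checkNonNull" mentioned in the comment.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class ReporterSetupContractSketch {

    /** Hypothetical placeholder for Flink's PluginManager; not the real interface. */
    interface PluginManagerStub {}

    /**
     * Returns the reporter names that would be set up, in sorted order.
     * The real method returns ReporterSetup instances; names are used here
     * only to keep the sketch self-contained.
     */
    static List<String> fromConfiguration(Set<String> namedReporters, PluginManagerStub pluginManager) {
        // Early return: with no configured reporters the plugin manager is never used,
        // so a null value is acceptable on this path.
        if (namedReporters.isEmpty()) {
            return Collections.emptyList();
        }

        // From here on factories would be loaded through the plugin manager,
        // so fail fast with a clear message instead of a late NullPointerException.
        Objects.requireNonNull(pluginManager, "pluginManager must not be null when reporters are configured");

        return namedReporters.stream().sorted().collect(Collectors.toList());
    }
}
```

With this shape, a test that configures no reporters can still pass null, while any configuration that names a reporter fails at the check rather than somewhere inside factory loading.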
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386630929 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t); } } - return reporterArguments; + return reporterSetups; } - private static Map loadReporterFactories() { - final ServiceLoader serviceLoader = ServiceLoader.load(MetricReporterFactory.class); + private static List> loadReporterConfigurations(Configuration configuration, Set namedReporters) { + final List> reporterConfigurations = new ArrayList<>(namedReporters.size()); + + for (String namedReporter: namedReporters) { + DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration( + configuration, + ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.'); + reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration)); + } + return reporterConfigurations; + } + + private static Set findEnabledReportersInConfiguration(Configuration configuration, String includedReportersString) { + Set includedReporters = reporterListPattern.splitAsStream(includedReportersString) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + + // use a TreeSet to make the reporter order deterministic, which is 
useful for testing + Set namedOrderedReporters = new TreeSet<>(String::compareTo); + + // scan entire configuration for keys starting with METRICS_REPORTER_PREFIX and determine the set of enabled reporters + for (String key : configuration.keySet()) { + if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) { + Matcher matcher = reporterClassPattern.matcher(key); + if (matcher.matches()) { + String reporterName = matcher.group(1); + if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) { + if (namedOrderedReporters.contains(reporterName)) { + LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName); + } else { + namedOrderedReporters.add(reporterName); + } + } else { + LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString); + } + } + } + } + return namedOrderedReporters; + } + + private static Map loadAvailableReporterFactories(PluginManager pluginManager) { final Map reporterFactories = new HashMap<>(2); - final Iterator factoryIterator = serviceLoader.iterator(); + final Iterator factoryIterator = getAllReporterFactories(pluginManager); + LOG.debug("All available factories (from both SPIs and Plugins):"); + getAllReporterFactories(pluginManager).forEachRemaining(i -> LOG.debug(i.toString())); // do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors // such an error might be caused if the META-INF/services
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386628387 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java ## @@ -38,11 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; Review comment: fixed
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386620764 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java ## @@ -92,7 +92,7 @@ private static Configuration createConfiguration() { } private static TaskManagerRunner createTaskManagerRunner(final Configuration configuration) throws Exception { - TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, ResourceID.generate()); + TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, ResourceID.generate(), null); Review comment: adjusted
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386613981 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { Review comment: Is this an accepted style in Flink? I mostly see the "classic" variant with `} catch ...` (including in the loadReporterFactories() method of this same class).
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386613981 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String reporterName, MetricConf metricReporterOptional.ifPresent(reporter -> { MetricConfig metricConfig = new MetricConfig(); reporterConfig.addAllToProperties(metricConfig); - - reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter)); + reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter)); }); - } - catch (Throwable t) { + } catch (Throwable t) { Review comment: Is this an accepted style in Flink? I mostly see the "classic" variant with `} catch ...`, including in the loadReporterFactories() method of this same class.
[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters URL: https://github.com/apache/flink/pull/11195#discussion_r386604078 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java ## @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String reporterName, MetricConfig metric } private static ReporterSetup createReporterSetup(String reporterName, MetricConfig metricConfig, MetricReporter reporter) { - LOG.info("Configuring {} with {}.", reporterName, metricConfig); + LOG.debug("Configuring {} with {}.", reporterName, metricConfig); Review comment: Good point. I am not 100% sure about it - do we have a Flink-wide way to handle such cases? I guess the cleanest approach would be to have a special set of keys that are considered sensitive, which have to be obfuscated prior to logging. Seeing which other config values are used during the initialization could be generally pretty useful for debugging.
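The "special set of sensitive keys" idea from the comment above could look roughly like the sketch below. It is only an illustration under stated assumptions, not an existing Flink facility: the class name, the key fragments in `SENSITIVE_FRAGMENTS`, the `MASK` value and the example configuration keys are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

public class SensitiveConfigMaskingSketch {

    // Hypothetical fragments that mark a key as sensitive; a real list would be project policy.
    private static final List<String> SENSITIVE_FRAGMENTS =
            Arrays.asList("password", "secret", "token", "apikey");

    // Hypothetical replacement text written to the log instead of the real value.
    private static final String MASK = "******";

    /** A key is treated as sensitive if it contains any listed fragment, ignoring case. */
    static boolean isSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        for (String fragment : SENSITIVE_FRAGMENTS) {
            if (lower.contains(fragment)) {
                return true;
            }
        }
        return false;
    }

    /** Returns a sorted copy of the config that is safe to log; the input map is not modified. */
    static Map<String, String> maskForLogging(Map<String, String> config) {
        Map<String, String> masked = new TreeMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            masked.put(entry.getKey(), isSensitive(entry.getKey()) ? MASK : entry.getValue());
        }
        return masked;
    }
}
```

Logging the masked copy would keep the non-sensitive values visible for debugging, which is the benefit the comment points out, while credentials stay out of the log.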