[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-24 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r397030268
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   You've mentioned "to get rid of the next few lines", but this Tree data 
structure is further used as a container that is filled using some conditional 
logic and returned from the method, it is not just about having the input 
entries sorted. Do you propose to rewrite it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-24 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r397030268
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   You've mentioned "to get rid of the next few lines", but this Tree data 
structure is further used as a container that is filled with some conditional 
logic and returned, it is not just about having the input entries sorted. Do 
you propose to rewrite it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-24 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r397030268
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   You've mentioned "to get rid of the next few lines", but this Tree data 
structure is further used as a container that is filled using some conditional 
logic and returned, it is not just about having the input entries sorted. Do 
you propose to rewrite it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-23 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r396809894
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -120,24 +122,51 @@ public static void checkOS() {
public final DownloadCache downloadCache = DownloadCache.get();
 
@Test
-   public void testReporter() throws Exception {
-   dist.copyOptJarsToLib("flink-metrics-prometheus");
+   public void reporterWorksWhenFoundInLibsViaReflection() throws 
Exception {
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
 
 Review comment:
   @zentol It seems that this updated approach that got merged into master does 
not support the kinds of tests that we would need to do for the supported 
scenarios (see reporterWorksWhenFoundInLibsViaReflection, 
reporterWorksWhenFoundInPluginsViaReflection, 
reporterWorksWhenFoundBothInPluginsAndLibsViaFactories in this PR). How should 
we proceed? I can either bend the FlinkResource implementation back to the 
state where it supports modifications of the underlying resources after 
creation and keep the initialization in `@Rule` (a hack), or reinitialize 
FlinkResource in every test. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-23 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r396809894
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -120,24 +122,51 @@ public static void checkOS() {
public final DownloadCache downloadCache = DownloadCache.get();
 
@Test
-   public void testReporter() throws Exception {
-   dist.copyOptJarsToLib("flink-metrics-prometheus");
+   public void reporterWorksWhenFoundInLibsViaReflection() throws 
Exception {
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
 
 Review comment:
   @zentol It seems that this updated approach that got merged into master does 
not support the kinds of tests that we would need to do for the supported 
scenarios (see reporterWorksWhenFoundInLibsViaReflection, 
reporterWorksWhenFoundInPluginsViaReflection, 
reporterWorksWhenFoundBothInPluginsAndLibsViaFactories in this PR). How should 
we proceed? I can either bend the FlinkResource implementation back to the 
state where it supports modifications of the underlying resources after 
creation and keep the initialization in `@Rule` (a hack), or reinitizliae 
FlinkResource in every test (probably a bad idea from performance and overhead 
perspectives). What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-23 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r396809894
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -120,24 +122,51 @@ public static void checkOS() {
public final DownloadCache downloadCache = DownloadCache.get();
 
@Test
-   public void testReporter() throws Exception {
-   dist.copyOptJarsToLib("flink-metrics-prometheus");
+   public void reporterWorksWhenFoundInLibsViaReflection() throws 
Exception {
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
 
 Review comment:
   @zentol It seems that this updated approach that got merged into master does 
not support the kinds of tests that we would need to do for the supported 
scenarios (see reporterWorksWhenFoundInLibsViaReflection, 
reporterWorksWhenFoundInPluginsViaReflection, 
reporterWorksWhenFoundBothInPluginsAndLibsViaFactories in this PR). How should 
we proceed? I can either bend the FlinkResource implementation back to the 
state where it supports modifications of the underlying resources after 
creation and keep the initialization in @ Rule (a hack), or reinitizliae 
FlinkResource in every test (probably a bad idea from performance and overhead 
perspectives). What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-23 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r396809894
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -120,24 +122,51 @@ public static void checkOS() {
public final DownloadCache downloadCache = DownloadCache.get();
 
@Test
-   public void testReporter() throws Exception {
-   dist.copyOptJarsToLib("flink-metrics-prometheus");
+   public void reporterWorksWhenFoundInLibsViaReflection() throws 
Exception {
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
 
 Review comment:
   @zentol It seems that this updated approach that got merged into master does 
not support the kinds of tests that we would need to do for the supported 
scenarios (see reporterWorksWhenFoundInLibsViaReflection, 
reporterWorksWhenFoundInPluginsViaReflection, 
reporterWorksWhenFoundBothInPluginsAndLibsViaFactories in this PR). How should 
we proceed? I can either bend the FlinkResource implementation back to the 
state where it supports modifications of the underlying resources after 
creation and keep the initialization in @Rule (a hack), or reinitizliae 
FlinkResource in every test (probably a bad idea from performance and overhead 
perspectives). What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-20 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395661258
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
return namedOrderedReporters;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
-
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.info("Prepare reporter factories (from both SPIs and 
Plugins):");
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 
contains an entry to a non-existing factory class
while (factoryIterator.hasNext()) {
try {
MetricReporterFactory factory = 
factoryIterator.next();
-   
reporterFactories.put(factory.getClass().getName(), factory);
+   String factoryClassName = 
factory.getClass().getName();
+   MetricReporterFactory existingFactory = 
reporterFactories.get(factoryClassName);
+   if (existingFactory == null){
+   reporterFactories.put(factoryClassName, 
factory);
+   LOG.info("Found reporter factory {} at 
{} ",
+   factoryClassName,
+   new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath());
+   } else {
+   //TODO: use path information below, 
when Plugin Classloader stops always prioritizing factories from /lib
+// String jarPath1 = new 
File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// LOG.warn("Multiple implementations of 
the same reporter were found: \n {} and \n{}", jarPath1, jarPath2);
+   LOG.warn("Multiple implementations of 
the same reporter were found in 'lib' and/or 'plugins' directories for {}. It 
is recommended to remove redundant reporter JARs to resolve used versions' 
ambiguity.", factoryClassName);
 
 Review comment:
   I leave it as "and/or" here, and then we comb through the docs and sources 
to address it separately.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395376334
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
}
 
public void copyOptJarsToLib(String jarNamePrefix) throws 
FileNotFoundException, IOException {
-   final Optional reporterJarOptional;
-   try (Stream logFiles = Files.walk(opt)) {
-   reporterJarOptional = logFiles
+   copyOptJars(jarNamePrefix, lib);
+   }
+
+   public void copyOptJarsToPlugins(String jarNamePrefix) throws 
FileNotFoundException, IOException {
 
 Review comment:
   I assume you mean to modify `FlinkDistribution#mapJarLocationToPath` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395375550
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
}
 
public void copyOptJarsToLib(String jarNamePrefix) throws 
FileNotFoundException, IOException {
-   final Optional reporterJarOptional;
-   try (Stream logFiles = Files.walk(opt)) {
-   reporterJarOptional = logFiles
+   copyOptJars(jarNamePrefix, lib);
+   }
+
+   public void copyOptJarsToPlugins(String jarNamePrefix) throws 
FileNotFoundException, IOException {
 
 Review comment:
   > and modify FlinkDistribution#moveJar to handle this location appropriately.
   
   could you clarify? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395376334
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
}
 
public void copyOptJarsToLib(String jarNamePrefix) throws 
FileNotFoundException, IOException {
-   final Optional reporterJarOptional;
-   try (Stream logFiles = Files.walk(opt)) {
-   reporterJarOptional = logFiles
+   copyOptJars(jarNamePrefix, lib);
+   }
+
+   public void copyOptJarsToPlugins(String jarNamePrefix) throws 
FileNotFoundException, IOException {
 
 Review comment:
   I assume you mean to modify FlinkDistribution#mapJarLocationToPath ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395375550
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
}
 
public void copyOptJarsToLib(String jarNamePrefix) throws 
FileNotFoundException, IOException {
-   final Optional reporterJarOptional;
-   try (Stream logFiles = Files.walk(opt)) {
-   reporterJarOptional = logFiles
+   copyOptJars(jarNamePrefix, lib);
+   }
+
+   public void copyOptJarsToPlugins(String jarNamePrefix) throws 
FileNotFoundException, IOException {
 
 Review comment:
   > and modify FlinkDistribution#moveJar to handle this location appropriately.
   - could you clarify? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395375550
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
}
 
public void copyOptJarsToLib(String jarNamePrefix) throws 
FileNotFoundException, IOException {
-   final Optional reporterJarOptional;
-   try (Stream logFiles = Files.walk(opt)) {
-   reporterJarOptional = logFiles
+   copyOptJars(jarNamePrefix, lib);
+   }
+
+   public void copyOptJarsToPlugins(String jarNamePrefix) throws 
FileNotFoundException, IOException {
 
 Review comment:
   > and modify FlinkDistribution#moveJar to handle this location appropriately.
   
   could you clarify? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395375550
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
}
 
public void copyOptJarsToLib(String jarNamePrefix) throws 
FileNotFoundException, IOException {
-   final Optional reporterJarOptional;
-   try (Stream logFiles = Files.walk(opt)) {
-   reporterJarOptional = logFiles
+   copyOptJars(jarNamePrefix, lib);
+   }
+
+   public void copyOptJarsToPlugins(String jarNamePrefix) throws 
FileNotFoundException, IOException {
 
 Review comment:
   > and modify FlinkDistribution#moveJar to handle this location appropriately.
could you clarify? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395375423
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -260,16 +262,26 @@ public void submitSQLJob(SQLJobSubmission job) throws 
IOException {
}
 
public void copyOptJarsToLib(String jarNamePrefix) throws 
FileNotFoundException, IOException {
-   final Optional reporterJarOptional;
-   try (Stream logFiles = Files.walk(opt)) {
-   reporterJarOptional = logFiles
+   copyOptJars(jarNamePrefix, lib);
+   }
+
+   public void copyOptJarsToPlugins(String jarNamePrefix) throws 
FileNotFoundException, IOException {
 
 Review comment:
   But this is not really "copying jars", right? It will actually move the file 
from opt to lib or plugins. The problem is that one of the cases I would like 
to test required the jar to be actually copied to both.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395348055
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   @AHeise I have applied this refactoring but then understood that I probably 
really did not get what you actually propose. Could you please clarify? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395368073
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395366353
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -160,13 +161,14 @@ public void startCluster() throws 
ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
 
try {
-
-   configureFileSystems(configuration);
+   //TODO: push down filesystem initialization into 
runCluster - initializeServices (?)
 
 Review comment:
   If it is something non-trivial and hard to make a call about, I would 
propose to skip this refactoring for now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395365114
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
return namedOrderedReporters;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
-
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.info("Prepare reporter factories (from both SPIs and 
Plugins):");
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 
contains an entry to a non-existing factory class
while (factoryIterator.hasNext()) {
try {
MetricReporterFactory factory = 
factoryIterator.next();
-   
reporterFactories.put(factory.getClass().getName(), factory);
+   String factoryClassName = 
factory.getClass().getName();
+   MetricReporterFactory existingFactory = 
reporterFactories.get(factoryClassName);
+   if (existingFactory == null){
+   reporterFactories.put(factoryClassName, 
factory);
+   LOG.info("Found reporter factory {} at 
{} ",
+   factoryClassName,
+   new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath());
+   } else {
+   //TODO: use path information below, 
when Plugin Classloader stops always prioritizing factories from /lib
+// String jarPath1 = new 
File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// LOG.warn("Multiple implementations of 
the same reporter were found: \n {} and \n{}", jarPath1, jarPath2);
+   LOG.warn("Multiple implementations of 
the same reporter were found in 'lib' and/or 'plugins' directories for {}. It 
is recommended to remove redundant reporter JARs to resolve used versions' 
ambiguity.", factoryClassName);
 
 Review comment:
   I thing the problem is that we have too many variants to describe with a 
single coordinating conjunction like "or" (plus "or" can unfortunately be both 
inclusive and exclusive). Cases:
   (xx) _ ()
   () _ (xx)
   (x) _ (x)
   (xx) _ (xx)
   
   Alternatives like "a or b or both" also do not work, because in this 
particular case they can be easily be interpreted as if the "(x) _ (x)" case is 
not an issue (because of "Multiple" in the beginning). Seems to me like the 
best case to indicate this ambiguity is is to use "and/or" so that people will 
be aware of multiple ways this can happen.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395365114
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
return namedOrderedReporters;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
-
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.info("Prepare reporter factories (from both SPIs and 
Plugins):");
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 
contains an entry to a non-existing factory class
while (factoryIterator.hasNext()) {
try {
MetricReporterFactory factory = 
factoryIterator.next();
-   
reporterFactories.put(factory.getClass().getName(), factory);
+   String factoryClassName = 
factory.getClass().getName();
+   MetricReporterFactory existingFactory = 
reporterFactories.get(factoryClassName);
+   if (existingFactory == null){
+   reporterFactories.put(factoryClassName, 
factory);
+   LOG.info("Found reporter factory {} at 
{} ",
+   factoryClassName,
+   new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath());
+   } else {
+   //TODO: use path information below, 
when Plugin Classloader stops always prioritizing factories from /lib
+// String jarPath1 = new 
File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// LOG.warn("Multiple implementations of 
the same reporter were found: \n {} and \n{}", jarPath1, jarPath2);
+   LOG.warn("Multiple implementations of 
the same reporter were found in 'lib' and/or 'plugins' directories for {}. It 
is recommended to remove redundant reporter JARs to resolve used versions' 
ambiguity.", factoryClassName);
 
 Review comment:
   I thing the problem is that we have too many variants to describe with a 
single coordinating conjunction like "or" (plus "or" can unfortunately be both 
inclusive and exclusive). Cases:
   (xx) _ ()
   () _ (xx)
   (x) _ (x)
   (xx) _ (xx)
   
   Alternatives like "a or b or both" also do not work, because in this 
particular case they can be easily be interpreted as if the "(x) _ (x)" case is 
not an issue (because of "Multiple" in the beginning). Seems to me like the 
best case to indicate this ambiguity is is to use "and/or" so that people will 
be aware for what kinds of problem potentially to look for.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395365114
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
return namedOrderedReporters;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
-
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.info("Prepare reporter factories (from both SPIs and 
Plugins):");
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 
contains an entry to a non-existing factory class
while (factoryIterator.hasNext()) {
try {
MetricReporterFactory factory = 
factoryIterator.next();
-   
reporterFactories.put(factory.getClass().getName(), factory);
+   String factoryClassName = 
factory.getClass().getName();
+   MetricReporterFactory existingFactory = 
reporterFactories.get(factoryClassName);
+   if (existingFactory == null){
+   reporterFactories.put(factoryClassName, 
factory);
+   LOG.info("Found reporter factory {} at 
{} ",
+   factoryClassName,
+   new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath());
+   } else {
+   //TODO: use path information below, 
when Plugin Classloader stops always prioritizing factories from /lib
+// String jarPath1 = new 
File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// LOG.warn("Multiple implementations of 
the same reporter were found: \n {} and \n{}", jarPath1, jarPath2);
+   LOG.warn("Multiple implementations of 
the same reporter were found in 'lib' and/or 'plugins' directories for {}. It 
is recommended to remove redundant reporter JARs to resolve used versions' 
ambiguity.", factoryClassName);
 
 Review comment:
   I thing the problem is that we have too many variants to describe with a 
single coordinating conjunction like "or" (plus "or" can unfortunately be both 
inclusive and exclusive). Cases:
   (xx) _ ()
   () _ (xx)
   (x) _ (x)
   (xx) _ (xx)
   
   Alternatives like "x or y or both" also do not work, because in this 
particular case they can be easily be interpreted as if the "(x) _ (x)" case is 
not an issue (because of "Multiple" in the beginning). Seems to me like the 
best case to indicate this ambiguity is is to use "and/or" so that people will 
be aware for what kinds of problem potentially to look for.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395360427
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -120,24 +122,51 @@ public static void checkOS() {
public final DownloadCache downloadCache = DownloadCache.get();
 
@Test
-   public void testReporter() throws Exception {
-   dist.copyOptJarsToLib("flink-metrics-prometheus");
+   public void reporterWorksWhenFoundInLibsViaReflection() throws 
Exception {
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
+   testReporter(false);
+   }
+
+   @Test
+   public void reporterWorksWhenFoundInPluginsViaReflection() throws 
Exception {
+   dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX);
+   testReporter(false);
+   }
+
+   @Test
+   public void reporterWorksWhenFoundInPluginsViaFactories() throws 
Exception {
+   dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX);
+   testReporter(true);
+   }
 
+   @Test
+   public void reporterWorksWhenFoundBothInPluginsAndLibsViaFactories() 
throws Exception {
+   dist.copyOptJarsToPlugins(PROMETHEUS_JAR_PREFIX);
+   dist.copyOptJarsToLib(PROMETHEUS_JAR_PREFIX);
+   testReporter(true);
+   }
+
+   private void testReporter(boolean useFactory) throws Exception {
final Configuration config = new Configuration();
-   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+
+   if (useFactory) {
+   
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + 
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
PrometheusReporterFactory.class.getName());
+   } else {
+   
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
PrometheusReporter.class.getCanonicalName());
+   }
+
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"prom.port", "9000-9100");
 
dist.appendConfiguration(config);
 
final Path tmpPrometheusDir = 
tmp.newFolder().toPath().resolve("prometheus");
-   final Path prometheusArchive = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz");
final Path prometheusBinDir = 
tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
final Path prometheusConfig = 
prometheusBinDir.resolve("prometheus.yml");
final Path prometheusBinary = 
prometheusBinDir.resolve("prometheus");
Files.createDirectory(tmpPrometheusDir);
 
-   downloadCache.getOrDownload(
-   
"https://github.com/prometheus/prometheus/releases/download/v; + 
PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName(),
+   final Path prometheusArchive = downloadCache.getOrDownload(
 
 Review comment:
   Split as requested.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395348055
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   @AHeise I have applied this refactoring but then understood that I probably 
did not get what you actually propose. Could you please clarify? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395348055
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   @AHeise I have applied this refactoring but then understood that I probably 
did not get what you actually propose. Could you clarify? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395337241
 
 

 ##
 File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.core.plugin.Plugin;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporterFactory} for {@link PrometheusReporter}.
+ */
+public class PrometheusReporterFactory implements MetricReporterFactory, 
Plugin {
 
 Review comment:
   Added.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395335333
 
 

 ##
 File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterFactory.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.core.plugin.Plugin;
+import org.apache.flink.metrics.reporter.MetricReporterFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link MetricReporterFactory} for {@link PrometheusReporter}.
+ */
+public class PrometheusReporterFactory implements MetricReporterFactory, 
Plugin {
+
+   @Override
+   public PrometheusReporter createMetricReporter(Properties properties) {
+   return new PrometheusReporter();
 
 Review comment:
   It seems that this will pull a rather large refactoring with it, because of 
the call to `super.open(config)` in the `open` methods and because of having to 
reconcile different configuration containers - `Properties` vs `MetricsConfig`. 
I would prefer to address it in a separate refactoring PR, if possible.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395326037
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -210,17 +213,31 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
return namedOrderedReporters;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
-
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.info("Prepare reporter factories (from both SPIs and 
Plugins):");
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 
contains an entry to a non-existing factory class
while (factoryIterator.hasNext()) {
try {
MetricReporterFactory factory = 
factoryIterator.next();
-   
reporterFactories.put(factory.getClass().getName(), factory);
+   String factoryClassName = 
factory.getClass().getName();
+   MetricReporterFactory existingFactory = 
reporterFactories.get(factoryClassName);
+   if (existingFactory == null){
+   reporterFactories.put(factoryClassName, 
factory);
+   LOG.info("Found reporter factory {} at 
{} ",
+   factoryClassName,
+   new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation().toURI()).getCanonicalPath());
+   } else {
+   //TODO: use path information below, 
when Plugin Classloader stops always prioritizing factories from /lib
+// String jarPath1 = new 
File(existingFactory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// String jarPath2 = new 
File(factory.getClass().getProtectionDomain().getCodeSource().getLocation()
+// .toURI()).getCanonicalPath();
+// LOG.warn("Multiple implementations of 
the same reporter were found: \n {} and \n{}", jarPath1, jarPath2);
+   LOG.warn("Multiple implementations of 
the same reporter were found in 'lib' and/or 'plugins' directories for {}. It 
is recommended to remove redundant reporter JARs to resolve used versions' 
ambiguity.", factoryClassName);
 
 Review comment:
   https://en.wikipedia.org/wiki/And/or 
   
   `It is used as an inclusive "or" (as in logic and mathematics), while an 
"or" in spoken language might be inclusive or exclusive.`
   
   Seems to me like something that reduces ambiguity, as @zentol said.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395323032
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
 
 Review comment:
   Done.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393563258
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -160,13 +161,14 @@ public void startCluster() throws 
ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
 
try {
-
-   configureFileSystems(configuration);
+   //TODO: push down filesystem initialization into 
runCluster - initializeServices (?)
 
 Review comment:
   @AHeise I wanted to ask if what is written in TODO is a good idea in your 
opinion. I am not sure about the implications of initializing FileSystems 
within `runSecured`. I do not quite like that initialization of services 
(initializeServices) and  file systems happen in different places. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395299396
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
reporter.open(metricConfig);
 
return new ReporterSetup(reporterName, metricConfig, reporter);
}
 
-   public static List fromConfiguration(final Configuration 
configuration) {
+   public static List fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+   LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
-   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
 
 Review comment:
   As this did not come up in the second round I consider the new split of 
commits as appropriate. Resolving.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-19 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r395298324
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
 
 Review comment:
   @zentol Ok, I find it a bit strange, but I am not setting the rules here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393950683
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
 
 Review comment:
   Removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393950579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
reporter.open(metricConfig);
 
return new ReporterSetup(reporterName, metricConfig, reporter);
}
 
-   public static List fromConfiguration(final Configuration 
configuration) {
+   public static List fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+   LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
-   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
-   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
-   .collect(Collectors.toSet());
 
-   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
-   Set namedReporters = new TreeSet<>(String::compareTo);
-   // scan entire configuration for "metric.reporter" keys and 
parse individual reporter configurations
-   for (String key : configuration.keySet()) {
-   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
-   Matcher matcher = 
reporterClassPattern.matcher(key);
-   if (matcher.matches()) {
-   String reporterName = matcher.group(1);
-   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
-   if 
(namedReporters.contains(reporterName)) {
-   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
-   } else {
-   
namedReporters.add(reporterName);
-   }
-   } else {
-   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
-   }
-   }
-   }
-   }
+   Set namedReporters = 
findEnabledReportersInConfiguration(configuration,
+   includedReportersString);
 
if (namedReporters.isEmpty()) {
return Collections.emptyList();
}
 
-   List> reporterConfigurations = 
new ArrayList<>(namedReporters.size());
+   final Map reporterFactories = 
loadAvailableReporterFactories(pluginManager);
+   LOG.debug("Loaded Reporter Factories: {}", reporterFactories);
+   final List> 
reporterConfigurations = loadReporterConfigurations(configuration, 
namedReporters);
+   LOG.debug("Loaded Reporter Configurations: {}", 
reporterConfigurations);
 
 Review comment:
   Addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393950508
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
reporter.open(metricConfig);
 
return new ReporterSetup(reporterName, metricConfig, reporter);
}
 
-   public static List fromConfiguration(final Configuration 
configuration) {
+   public static List fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+   LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
-   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
-   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
-   .collect(Collectors.toSet());
 
-   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
-   Set namedReporters = new TreeSet<>(String::compareTo);
-   // scan entire configuration for "metric.reporter" keys and 
parse individual reporter configurations
-   for (String key : configuration.keySet()) {
-   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
-   Matcher matcher = 
reporterClassPattern.matcher(key);
-   if (matcher.matches()) {
-   String reporterName = matcher.group(1);
-   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
-   if 
(namedReporters.contains(reporterName)) {
-   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
-   } else {
-   
namedReporters.add(reporterName);
-   }
-   } else {
-   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
-   }
-   }
-   }
-   }
+   Set namedReporters = 
findEnabledReportersInConfiguration(configuration,
+   includedReportersString);
 
if (namedReporters.isEmpty()) {
return Collections.emptyList();
}
 
-   List> reporterConfigurations = 
new ArrayList<>(namedReporters.size());
+   final Map reporterFactories = 
loadAvailableReporterFactories(pluginManager);
+   LOG.debug("Loaded Reporter Factories: {}", reporterFactories);
+   final List> 
reporterConfigurations = loadReporterConfigurations(configuration, 
namedReporters);
+   LOG.debug("Loaded Reporter Configurations: {}", 
reporterConfigurations);
 
-   for (String namedReporter: namedReporters) {
-   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
-   configuration,
-   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+   List reporterSetups = 
setupReporters(reporterFactories, reporterConfigurations);
+   LOG.debug("All initialized Reporters:");
+   reporterSetups.forEach(i -> LOG.debug("{} - {}", i.getName(), 
i.getConfiguration()));
 
 Review comment:
   Addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393950810
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
reporter.open(metricConfig);
 
return new ReporterSetup(reporterName, metricConfig, reporter);
}
 
-   public static List fromConfiguration(final Configuration 
configuration) {
+   public static List fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+   LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
 
 Review comment:
   Removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393848386
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   Thanks, changed as proposed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393848386
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
 
 Review comment:
   Thanks, changes as proposed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393843746
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -202,9 +204,11 @@ private SecurityContext 
installSecurityContext(Configuration configuration) thro
return SecurityUtils.getInstalledContext();
}
 
-   private void runCluster(Configuration configuration) throws Exception {
+   private void runCluster(Configuration configuration, PluginManager 
pluginManager) throws Exception {
synchronized (lock) {
-   initializeServices(configuration);
+
+   //TODO: Ask why FileSystem is not initialized here too.
 
 Review comment:
   See above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393842876
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -179,28 +164,82 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
 
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
+
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
 
 Review comment:
   Seems like a silent skip . This is old code, maybe @zentol could comment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393840952
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
reporter.open(metricConfig);
 
return new ReporterSetup(reporterName, metricConfig, reporter);
}
 
-   public static List fromConfiguration(final Configuration 
configuration) {
+   public static List fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+   LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
-   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393840337
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
reporter.open(metricConfig);
 
return new ReporterSetup(reporterName, metricConfig, reporter);
}
 
-   public static List fromConfiguration(final Configuration 
configuration) {
+   public static List fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+   LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
-   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393840337
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
reporter.open(metricConfig);
 
return new ReporterSetup(reporterName, metricConfig, reporter);
}
 
-   public static List fromConfiguration(final Configuration 
configuration) {
+   public static List fromConfiguration(final Configuration 
configuration, final PluginManager pluginManager) {
+   LOG.debug("Initializing Reporters from Configuration: {}", 
configuration);
String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
-   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393839754
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
 
 Review comment:
   Ping @AHeise @zentol - could we agree on something here?
   I am always "pro extensive logging", but this could be professional 
deformation. Being able to "on-demand" see what is going on is very valuable 
for production systems. We could declare somewhere for Flink in general that if 
you choose to run with debug log level, some potentially sensitive information 
could leak into logs. My arguments are:
   1) If someone has uncontrolled access to the log files on your machine in 
production, content of this file is probably not the biggest of your problems.
   2) Running with debug level is not a "normal" scenario - this is intended 
for hands on investigation of issues. Log level for potentially compromisable 
external systems could be explicitly set to trace in such cases.
   3) We have been "leaking" this data in the current versions with info (!) 
level without much concern


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393832500
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
 
 Review comment:
   This whole code block was already "touched" anyhow, because of refactoring, 
so I think it should be OK to do such things, unless you have a strong opinion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393832500
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
 
 Review comment:
   This code was touched anyhow, because of refactoring, so I think it should 
be OK to do such things, unless you have a strong opinion about this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393832500
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
 
 Review comment:
   This code was touched anyhow, because of refactoring, so I think it should 
be OK to do such things, unless you have a strong opinion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393580453
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393580453
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393580453
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393578320
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -77,6 +77,7 @@
private Path conf;
 
 Review comment:
   Addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393570847
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-17 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393563258
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ##
 @@ -160,13 +161,14 @@ public void startCluster() throws 
ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
 
try {
-
-   configureFileSystems(configuration);
+   //TODO: push down filesystem initialization into 
runCluster - initializeServices (?)
 
 Review comment:
   @AHeise I wanted to ask if what is written in TODO is a good idea in your 
opinion. I am not sure about the implications of initializing FileSystems 
within `runSecured`. I do not quite like that initialization if services 
(initializeServices) and  file systems happen in different places. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-16 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r393256771
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
 ##
 @@ -69,7 +68,7 @@ public static PluginLoader create(PluginDescriptor 
pluginDescriptor, ClassLoader
 * @param  Type of the requested plugin service.
 * @return An iterator of all implementations of the given service 
interface that could be loaded from the plugin.
 */
-   public  Iterator load(Class service) {
+   public  Iterator load(Class service) {
 
 Review comment:
   Marking as resolved.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-03 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386649079
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386682800
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
 ##
 @@ -185,6 +214,7 @@ public void testReporter() throws Exception {
 
checkMetricAvailability(client, 
"flink_jobmanager_numRegisteredTaskManagers");
checkMetricAvailability(client, 
"flink_taskmanager_Status_Network_TotalMemorySegments");
+
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386644501
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ##
 @@ -119,7 +120,7 @@
 
private boolean shutdown;
 
-   public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId) throws Exception {
+   public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId, PluginManager pluginManager) throws Exception {
 
 Review comment:
   I am  not sure what should be the expected contract here. ReporterSetup will 
currently only work with the null pluginManager if no reporters are configured 
(namedReporters.isEmpty()). We could maybe add a checkNonNull in  the 
ReporterSetup after that first return due to empty namedReporters.
   
   (TaskManagerRunnerTest is now fixed to initialize the PluginManager 
properly) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386675530
 
 

 ##
 File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
 ##
 @@ -35,7 +35,7 @@
  * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
  */
 @PublicEvolving
-public class PrometheusReporter extends AbstractPrometheusReporter {
+public class PrometheusReporter extends AbstractPrometheusReporter implements 
MetricReporter {
 
 Review comment:
   missed that AbstractPrometheusReporter already implements it. Fixed. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386663744
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386662966
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386662377
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -273,3 +319,4 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
return Optional.of((MetricReporter) 
reporterClass.newInstance());
}
 }
+
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386649079
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386649079
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386646692
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386644501
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ##
 @@ -119,7 +120,7 @@
 
private boolean shutdown;
 
-   public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId) throws Exception {
+   public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId, PluginManager pluginManager) throws Exception {
 
 Review comment:
   I am  not sure what should be the expected contract here. ReporterSetup will 
currently only work with the null pluginManager if no reporters are configured 
(namedReporters.isEmpty()). We could maybe add a checkNonNull in  the 
ReporterSetup after the first return due to empty namedReporters.
   
   (TaskManagerRunnerTest is now fixed to initialize the PluginManager 
properly) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386630929
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
LOG.error("Could not instantiate metrics 
reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
-   return reporterArguments;
+   return reporterSetups;
}
 
-   private static Map 
loadReporterFactories() {
-   final ServiceLoader serviceLoader = 
ServiceLoader.load(MetricReporterFactory.class);
+   private static List> 
loadReporterConfigurations(Configuration configuration, Set 
namedReporters) {
+   final List> 
reporterConfigurations = new ArrayList<>(namedReporters.size());
+
+   for (String namedReporter: namedReporters) {
+   DelegatingConfiguration delegatingConfiguration = new 
DelegatingConfiguration(
+   configuration,
+   ConfigConstants.METRICS_REPORTER_PREFIX + 
namedReporter + '.');
 
+   reporterConfigurations.add(Tuple2.of(namedReporter, 
delegatingConfiguration));
+   }
+   return reporterConfigurations;
+   }
+
+   private static Set 
findEnabledReportersInConfiguration(Configuration configuration, String 
includedReportersString) {
+   Set includedReporters = 
reporterListPattern.splitAsStream(includedReportersString)
+   .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedOrderedReporters = new 
TreeSet<>(String::compareTo);
+
+   // scan entire configuration for keys starting with 
METRICS_REPORTER_PREFIX and determine the set of enabled reporters
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedOrderedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedOrderedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}, not configured in reporter list ({}).", reporterName, 
includedReportersString);
+   }
+   }
+   }
+   }
+   return namedOrderedReporters;
+   }
+
+   private static Map 
loadAvailableReporterFactories(PluginManager pluginManager) {
final Map reporterFactories = 
new HashMap<>(2);
-   final Iterator factoryIterator = 
serviceLoader.iterator();
+   final Iterator factoryIterator = 
getAllReporterFactories(pluginManager);
+   LOG.debug("All available factories (from both SPIs and 
Plugins):");
+   getAllReporterFactories(pluginManager).forEachRemaining(i -> 
LOG.debug(i.toString()));
// do not use streams or for-each loops here because they do 
not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services 

[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386628387
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/FlinkDistribution.java
 ##
 @@ -38,11 +38,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.*;
 
 Review comment:
   fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386620764
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
 ##
 @@ -92,7 +92,7 @@ private static Configuration createConfiguration() {
}
 
private static TaskManagerRunner createTaskManagerRunner(final 
Configuration configuration) throws Exception {
-   TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, ResourceID.generate());
+   TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, ResourceID.generate(), null);
 
 Review comment:
   adjusted


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386613981
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
 
 Review comment:
   Is this an accepted style in Flink? I mostly see "classic" variant with `} 
catch ...` (including the same class in loadReporterFactories() method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386613981
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -178,36 +164,96 @@ private static ReporterSetup createReporterSetup(String 
reporterName, MetricConf
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new 
MetricConfig();

reporterConfig.addAllToProperties(metricConfig);
-
-   
reporterArguments.add(createReporterSetup(reporterName, metricConfig, 
reporter));
+   
reporterSetups.add(createReporterSetup(reporterName, metricConfig, reporter));
});
-   }
-   catch (Throwable t) {
+   } catch (Throwable t) {
 
 Review comment:
   Is this an accepted style in Flink? I mostly see "classic" variant with `} 
catch ...` , including the same class in loadReporterFactories() method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] Use plugins mechanism for initializing MetricReporters

2020-03-02 Thread GitBox
afedulov commented on a change in pull request #11195: [FLINK-16222][runtime] 
Use plugins mechanism for initializing MetricReporters
URL: https://github.com/apache/flink/pull/11195#discussion_r386604078
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 ##
 @@ -120,55 +124,37 @@ public static ReporterSetup forReporter(String 
reporterName, MetricConfig metric
}
 
private static ReporterSetup createReporterSetup(String reporterName, 
MetricConfig metricConfig, MetricReporter reporter) {
-   LOG.info("Configuring {} with {}.", reporterName, metricConfig);
+   LOG.debug("Configuring {} with {}.", reporterName, 
metricConfig);
 
 Review comment:
   Good point. I am not 100% sure about it - do we have a Flink-wide way to 
handle such cases? I guess the cleanest approach would be to have a special set 
of keys that are considered sensitive, which have to be obfuscated prior to 
logging. Seeing which other config values are used during the initialization 
could be generally pretty useful for debugging. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services