In this case, I was using a harness to test the function. Honestly, I couldn't care less about unit-testing the metrics themselves; I'm much more concerned with having something that actually runs and works as intended within a job. The only real problem I want to solve is building metrics whose labels vary with the incoming data (e.g. keeping track of the events I've seen for a given tenant, or for some other property).
Something like:

    <metric prefix>_events_seen{tenant="tenant-1"} 1.0
    <metric prefix>_events_seen{tenant="tenant-2"} 200.0

if that makes sense. I've used the Prometheus client previously to build these types of metrics, but since I'm fairly new to the Flink world, I was trying to use the built-in constructs (hence the dynamic groups / metrics being added).

On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <ches...@apache.org> wrote:
> Are you actually running a job, or are you using a harness for testing
> your function?
>
> On 3/16/2021 3:24 PM, Rion Williams wrote:
> > Hi Chesnay,
> >
> > Thanks for the prompt response and feedback, it's very much appreciated.
> > Please see my inline responses to your questions below:
> >
> > *Was there anything in the logs (ideally on debug)?*
> >
> > I didn't see anything within the logs that seemed out of the ordinary.
> > I'm currently using a MiniClusterResource for this and attempted to set
> > the logging levels to pick up everything (i.e. ALL), but if there's a way
> > to expose more, I'm not aware of it.
> >
> > *Have you debugged the execution and followed the counter() calls all the
> > way to the reporter?*
> >
> > With the debugger, I traced one of the counter initializations, and it
> > seems that no reporters were being found within the register call in
> > MetricRegistryImpl (i.e.
> > this.reporters has no registered reporters):
> >
> >     if (this.reporters != null) {
> >         for (int i = 0; i < this.reporters.size(); ++i) {
> >             MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
> >                 (MetricRegistryImpl.ReporterAndSettings) this.reporters.get(i);
> >
> >             try {
> >                 if (reporterAndSettings != null) {
> >                     FrontMetricGroup front =
> >                         new FrontMetricGroup(reporterAndSettings.getSettings(), group);
> >                     reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
> >                 }
> >             } catch (Exception var11) {
> >                 LOG.warn("Error while registering metric: {}.", metricName, var11);
> >             }
> >         }
> >     }
> >
> > Perhaps this is an error on my part, as I had assumed the following would
> > be sufficient to register my reporter (within a local / MiniCluster
> > environment):
> >
> >     private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
> >         ConfigConstants.METRICS_REPORTER_PREFIX +
> >             "MockCustomMetricsReporter." +
> >             ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
> >             MockCustomMetricsReporter::class.java.name
> >     ))
> >
> >     @ClassRule
> >     @JvmField
> >     val flink = MiniClusterResource(
> >         MiniClusterResourceConfiguration.Builder()
> >             .setConfiguration(metricsConfiguration)
> >             .setNumberTaskManagers(1)
> >             .setNumberSlotsPerTaskManager(1)
> >             .build()
> >     )
> >
> > However, the reporter is clearly being recognized for the built-in metrics,
> > which do trigger the notifyOfAddedMetric() function within the reporter
> > itself; it just isn't notified about these custom ones.
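For reference, Flink's `ConfigConstants.METRICS_REPORTER_PREFIX` is `"metrics.reporter."` and `METRICS_REPORTER_CLASS_SUFFIX` is `"class"`, so the map entry above amounts to the single key `metrics.reporter.MockCustomMetricsReporter.class`. Factory-based registration, which Chesnay recommends later in the thread, uses the suffix `factory.class` instead. The minimal sketch below copies those values locally so it runs without Flink on the classpath. The helper names `reporterClassKey`, `reporterFactoryKey`, and `reporterConfig` are hypothetical.

```kotlin
// Local copies of the values behind Flink's ConfigConstants.METRICS_REPORTER_PREFIX
// and ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, so this runs without Flink.
const val METRICS_REPORTER_PREFIX = "metrics.reporter."
const val METRICS_REPORTER_CLASS_SUFFIX = "class"

// Suffix used by factory-based registration: metrics.reporter.<name>.factory.class
const val METRICS_REPORTER_FACTORY_CLASS_SUFFIX = "factory.class"

// Hypothetical helper: the key that selects a reporter class by reporter name.
fun reporterClassKey(reporterName: String): String =
    METRICS_REPORTER_PREFIX + reporterName + "." + METRICS_REPORTER_CLASS_SUFFIX

// Hypothetical helper: the key that selects a reporter factory by reporter name.
fun reporterFactoryKey(reporterName: String): String =
    METRICS_REPORTER_PREFIX + reporterName + "." + METRICS_REPORTER_FACTORY_CLASS_SUFFIX

// Hypothetical helper: the plain string map that Configuration.fromMap(...) would receive.
fun reporterConfig(reporterName: String, reporterClass: String): Map<String, String> =
    mapOf(reporterClassKey(reporterName) to reporterClass)
```

The reporter name segment (`MockCustomMetricsReporter` here) is arbitrary. It only groups that reporter's settings under one prefix.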
> >
> > *Do you only see JobManager metrics, or is there somewhere also something
> > about the TaskManager?*
> >
> > It looks like there are metrics coming from both the JobManager and the
> > TaskManagers, judging from examples like the following:
> >
> >     localhost.jobmanager.numRegisteredTaskManagers
> >     .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
> >     .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
> >     localhost.jobmanager.Status.JVM.Memory.Direct.Count
> >
> > I do agree that a factory implementation with a static reporter would
> > likely be a better approach, so I may explore that a bit more. I may also
> > make some changes to the existing, admittedly hacky, implementation for
> > handling the dynamic metrics. I did see several references to a
> > MetricRegistry class, but I wasn't sure whether that was the most
> > appropriate place to add this type of functionality, or whether it was
> > needed at all.
> >
> > Thanks much,
> >
> > Rion
> >
> > On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ches...@apache.org>
> > wrote:
> >
> > > Was there anything in the logs (ideally on debug)?
> > > Have you debugged the execution and followed the counter() calls all the
> > > way to the reporter?
> > > Do you only see JobManager metrics, or is there somewhere also something
> > > about the TaskManager?
> > >
> > > I can see several issues with your code, but none that would fully
> > > explain the issue:
> > >
> > > a) your reporter is not thread-safe
> > > b) you only differentiate metrics by name, which will lead to quite a few
> > > collisions.
> > >
> > > Also be aware that there will be two reporter instances: one for the JM
> > > and one for the TM.
> > > To remedy this, I would recommend creating a factory that returns a
> > > static reporter instance instead; overall this tends to be cleaner.
> > >
> > > Alternatively, when using the testing harnesses, IIRC you can also set
> > > a custom MetricGroup implementation.
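Chesnay's points (a) and (b), plus the two-reporter-instances caveat, can be modelled without Flink. The model is a single JVM-wide, thread-safe store keyed by the full scoped identifier and label variables rather than by the bare metric name. It is a minimal sketch under stated assumptions, not Flink's own test utility.

- `MetricKey` and `RecordedMetrics` are hypothetical names.
- In a real `MetricReporter`, `notifyOfAddedMetric(metric, name, group)` would build the key from `group.getMetricIdentifier(name)` and `group.getAllVariables()` and forward it here.
- A reporter factory could then hand out reporters that all delegate to this one store.
- The angle-bracketed variable key `"<tenant>"` in the test is an assumption about how Flink names key/value-group variables.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical key: the scoped identifier (in Flink, e.g. from
// MetricGroup.getMetricIdentifier(name)) plus the label variables
// (e.g. from MetricGroup.getAllVariables()), so equal names don't collide.
data class MetricKey(val identifier: String, val variables: Map<String, String>)

// One store per JVM, shared by every reporter instance (JM and TM alike).
// `Any` stands in for org.apache.flink.metrics.Metric to keep this Flink-free.
object RecordedMetrics {
    private val metrics = ConcurrentHashMap<MetricKey, Any>()

    // Thread-safe: registrations may arrive concurrently from several threads.
    fun added(key: MetricKey, metric: Any) {
        metrics.putIfAbsent(key, metric)
    }

    fun removed(key: MetricKey) {
        metrics.remove(key)
    }

    // Metrics whose identifier ends in ".<name>" (or equals it) and whose
    // variables contain the requested label value.
    fun find(name: String, labelKey: String, labelValue: String): List<Any> =
        metrics.entries
            .filter { (k, _) ->
                (k.identifier == name || k.identifier.endsWith(".$name")) &&
                    k.variables[labelKey] == labelValue
            }
            .map { it.value }

    // Call between tests so one run's registrations don't leak into the next.
    fun clear() = metrics.clear()

    val size: Int get() = metrics.size
}
```

Because `RecordedMetrics` is an `object`, the JobManager and TaskManager reporter instances would write to the same map.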
> > >
> > > On 3/16/2021 4:13 AM, Rion Williams wrote:
> > > > Hi all,
> > > >
> > > > Recently, I was working on adding some custom metrics to a Flink job that
> > > > required the use of dynamic labels (i.e. capturing various counters that
> > > > were "sliceable" by things like tenant / source, etc.).
> > > >
> > > > I ended up handling it in a very naive fashion that just keeps a
> > > > dictionary of metrics that have already been registered and updates them
> > > > accordingly, which looked something like this:
> > > >
> > > >     class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
> > > >         private lateinit var metrics: CustomMetricsRegistry
> > > >
> > > >         override fun open(parameters: Configuration) {
> > > >             metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
> > > >         }
> > > >
> > > >         override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
> > > >             // Insert calls like metrics.inc("events_seen", "tenant-name", 4) here
> > > >         }
> > > >     }
> > > >
> > > >     class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
> > > >         // Increments the given metric for a tenant by the given amount
> > > >         fun inc(metric: String, tenant: String, amount: Long = 1) {
> > > >             // Build a key for the metric/tenant pair
> > > >             val key = "$metric-$tenant"
> > > >
> > > >             // Register the metric the first time this key is seen
> > > >             if (!registeredMetrics.containsKey(key)) {
> > > >                 registeredMetrics[key] = metricGroup
> > > >                     .addGroup("tenant", tenant)
> > > >                     .counter(metric)
> > > >             }
> > > >
> > > >             // Update the metric by the given amount
> > > >             registeredMetrics[key]!!.inc(amount)
> > > >         }
> > > >
> > > >         companion object {
> > > >             private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
> > > >         }
> > > >     }
> > > >
> > > > Basically, this registers and updates new metrics for tenants as they are
> > > > encountered, and I've seen them emitted as expected by hitting the
> > > > appropriately configured metrics endpoint (using a PrometheusReporter).
> > > >
> > > > However, while I was trying to write a few unit tests for this, I seemed
> > > > to encounter an issue.
> > > > I was following a Stack Overflow post answered by @Chesnay Schepler
> > > > <ches...@apache.org> [0] that described the use of an in-memory/embedded
> > > > Flink cluster and a custom reporter that would statically expose the
> > > > underlying metrics.
> > > >
> > > > So I took a shot at implementing something similar, as follows:
> > > >
> > > > *Flink Cluster Definition*
> > > >
> > > >     private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
> > > >         ConfigConstants.METRICS_REPORTER_PREFIX +
> > > >             "MockCustomMetricsReporter." +
> > > >             ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
> > > >             MockCustomMetricsReporter::class.java.name
> > > >     ))
> > > >
> > > >     @ClassRule
> > > >     @JvmField
> > > >     val flinkCluster = MiniClusterResource(
> > > >         MiniClusterResourceConfiguration.Builder()
> > > >             .setConfiguration(metricsConfiguration)
> > > >             .setNumberTaskManagers(1)
> > > >             .setNumberSlotsPerTaskManager(1)
> > > >             .build()
> > > >     )
> > > >
> > > > *Custom Reporter*
> > > >
> > > >     class MockCustomMetricsReporter : MetricReporter {
> > > >
> > > >         override fun open(metricConfig: MetricConfig) {}
> > > >         override fun close() {}
> > > >
> > > >         override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
> > > >             // Store the metrics that are being registered as we see them
> > > >             if (!registeredCustomMetrics.containsKey(name)) {
> > > >                 registeredCustomMetrics[name] = metric
> > > >             }
> > > >         }
> > > >
> > > >         override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
> > > >             // Do nothing here
> > > >         }
> > > >
> > > >         companion object {
> > > >             // Static reference to metrics as they are registered
> > > >             var registeredCustomMetrics = HashMap<String, Metric>()
> > > >         }
> > > >     }
> > > >
> > > > *Example Test*
> > > >
> > > >     @Test
> > > >     fun `Example Metrics Use Case`() {
> > > >         // Arrange
> > > >         val stream = StreamExecutionEnvironment.getExecutionEnvironment()
> > > >         val events = listOf(
> > > >             eventWithUsers("tenant1", "us...@testing.com"),
> > > >             eventWithUsers("tenant2", "us...@testing.com"),
> > > >         )
> > > >
> > > >         // Act
> > > >         stream
> > > >             .fromCollection(events)
> > > >             .process(MyCustomProcessFunction())
> > > >
> > > >         // Assert
> > > >         stream.execute()
> > > >         assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
> > > >     }
> > > >
> > > > While this test passes, *the problem is that the custom metrics defined
> > > > dynamically (via the CustomMetricsRegistry implementation) do not appear
> > > > within the registeredCustomMetrics collection*. In fact, 21 metrics get
> > > > registered, but all of them appear to be standard out-of-the-box metrics
> > > > such as CPU usage, number of task managers, load, and various other Netty
> > > > and JVM stats; no custom metrics are included.
> > > >
> > > > I've tried multiple configurations, implementations via a custom
> > > > TestHarness, etc., but for some reason the custom metrics being defined
> > > > never trigger the notifyOfAddedMetric function, which would be
> > > > responsible for adding them to the static collection to be asserted
> > > > against.
> > > >
> > > > Any ideas / guidance would be more than welcome. Perhaps a different
> > > > approach? Based on examples I've encountered, the code seems like it
> > > > should "just work".
> > > >
> > > > Thanks much,
> > > >
> > > > Rion
> > > >
> > > > [0]: https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
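Since the goal is labeled metrics that work in a real job, one possible adjustment to the CustomMetricsRegistry above is to keep its cache per instance rather than in a `companion object`. The companion map is shared by every operator instance in the JVM and also survives across test runs. So a key cached by one instance is never registered with another instance's (or a later job's) metric group.

The sketch below models this without Flink, under these assumptions:

- `SimpleGroup` and `SimpleCounter` are hypothetical stand-ins for the `addGroup(key, value)` and `counter(name)` calls on Flink's `MetricGroup` and for `Counter`.
- `TenantMetrics` is a hypothetical per-instance registry.
- `RecordingGroup` is a hypothetical test double in the spirit of Chesnay's suggestion to plug in a custom MetricGroup when using the test harnesses.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-ins for Flink's Counter and MetricGroup, reduced to the
// two calls the registry needs: addGroup(key, value) and counter(name).
interface SimpleCounter {
    fun inc(n: Long)
}

interface SimpleGroup {
    fun addGroup(key: String, value: String): SimpleGroup
    fun counter(name: String): SimpleCounter
}

// Per-instance registry: the cache is an instance field, not a companion
// object, so every operator instance registers counters with its own group.
class TenantMetrics(private val group: SimpleGroup) {
    private val counters = ConcurrentHashMap<Pair<String, String>, SimpleCounter>()

    // Lazily registers metric{tenant=...} on first use, then increments it.
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        counters.computeIfAbsent(metric to tenant) {
            group.addGroup("tenant", tenant).counter(metric)
        }.inc(amount)
    }
}

// Hypothetical test double: records each counter under "<labels>.<name>" in a
// map shared by the root group and all of its child groups.
class RecordingGroup(
    private val path: String = "",
    private val sink: ConcurrentHashMap<String, AtomicLong> = ConcurrentHashMap()
) : SimpleGroup {
    override fun addGroup(key: String, value: String): SimpleGroup =
        RecordingGroup(if (path.isEmpty()) "$key=$value" else "$path,$key=$value", sink)

    override fun counter(name: String): SimpleCounter {
        val value = sink.computeIfAbsent("$path.$name") { AtomicLong() }
        return object : SimpleCounter {
            override fun inc(n: Long) {
                value.addAndGet(n)
            }
        }
    }

    // Current value of a recorded counter, or null if it was never registered.
    fun valueOf(key: String): Long? = sink[key]?.get()
}
```

In a Flink job, `runtimeContext.metricGroup` would take the place of `RecordingGroup`. The PrometheusReporter exports key/value groups created via `addGroup("tenant", tenant)` as labels, which matches the `events_seen{tenant="..."}` shape Rion describes at the top of the thread.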