In this case, I was using a harness to test the function. Honestly, though,
I couldn't care less about the unit-test metrics themselves; I'm much more
concerned with having something that will actually run and work as intended
within a job. The only real problem I want to solve is building metrics that
may vary based on the incoming data from a "label" perspective (e.g. keeping
track of the events I've seen for a given tenant, or some other properties).

Something like:

<metric prefix>_events_seen { tenant = "tenant-1" } 1.0
<metric prefix>_events_seen { tenant = "tenant-2" } 200.0

If that makes sense. I've used the Prometheus client previously to
accomplish these types of metrics, but since I'm fairly new to the Flink
world, I was trying to use the built-in constructs available (thus the
dynamic groups / metrics being added).
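In Flink terms, the closest built-in equivalent I've found to those Prometheus-style labels is a key/value user scope added via addGroup; a minimal sketch (the `events_seen` / `tenant` names here are just illustrative, not from the actual job):

```kotlin
import org.apache.flink.metrics.Counter
import org.apache.flink.metrics.MetricGroup

// Sketch only: addGroup("tenant", tenant) adds a key/value user scope,
// which the PrometheusReporter exports as a tenant="..." label on the
// resulting <metric prefix>_events_seen series.
fun eventsSeenCounter(group: MetricGroup, tenant: String): Counter =
    group.addGroup("tenant", tenant).counter("events_seen")
```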

On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <ches...@apache.org> wrote:

> Are you actually running a job, or are you using a harness for testing
> your function?
>
> On 3/16/2021 3:24 PM, Rion Williams wrote:
>
> Hi Chesnay,
>
> Thanks for the prompt response and feedback, it's very much appreciated.
> Please see the inline responses below to your questions:
>
> *Was there anything in the logs (ideally on debug)?*
>
>
> I didn't see anything within the logs that seemed to indicate anything out
> of the ordinary. I'm currently using a MiniClusterResource for this and
> attempted to set the logging levels to pick up everything (i.e. ALL), but
> if there's a way to expose more, I'm not aware of it.
>
> *Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?*
>
>
> With the debugger, I traced one of the counter initializations and it
> seems that no reporters were being found within the register call in
> MetricRegistryImpl (i.e. this.reporters has no registered reporters):
>
> if (this.reporters != null) {
>     for (int i = 0; i < this.reporters.size(); ++i) {
>         MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
>             (MetricRegistryImpl.ReporterAndSettings) this.reporters.get(i);
>
>         try {
>             if (reporterAndSettings != null) {
>                 FrontMetricGroup front =
>                     new FrontMetricGroup(reporterAndSettings.getSettings(), group);
>                 reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
>             }
>         } catch (Exception var11) {
>             LOG.warn("Error while registering metric: {}.", metricName, var11);
>         }
>     }
> }
>
>  Perhaps this is an error on my part as I had assumed the following would
> be sufficient to register my reporter (within a local / minicluster
> environment):
>
> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>     ConfigConstants.METRICS_REPORTER_PREFIX +
>         "MockCustomMetricsReporter." +
>         ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
>         MockCustomMetricsReporter::class.java.name))
>
> @ClassRule
> @JvmField
> val flink = MiniClusterResource(
>     MiniClusterResourceConfiguration.Builder()
>         .setConfiguration(metricsConfiguration)
>         .setNumberTaskManagers(1)
>         .setNumberSlotsPerTaskManager(1)
>         .build()
> )
>
> However, the reporter is clearly being recognized for the built-in
> metrics, since those are triggering the notifyOfAddedMetric() function
> within the reporter itself; it's just the custom ones being registered
> that never reach it.
>
> *Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?*
>
>
> It looks like there are metrics coming from both the JobManager and
> TaskManagers from the following examples that were coming out:
>
> localhost.jobmanager.numRegisteredTaskManagers
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
> localhost.jobmanager.Status.JVM.Memory.Direct.Count
>
> I do agree that a factory implementation with a static reporter would
> likely be a better approach, so I may explore that a bit more, along with
> some changes to the existing, admittedly crude, implementation for
> handling the dynamic metrics. I did see several references to a
> MetricRegistry class, but I wasn't sure whether that was the most
> appropriate place to add this type of functionality, or whether it was
> needed at all.
>
> Thanks much,
>
> Rion
>
>
>
> On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Was there anything in the logs (ideally on debug)?
>> Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?
>> Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?
>>
>> I can see several issues with your code, but none that would fully
>> explain the issue:
>>
>> a) your reporter is not thread-safe
>> b) you only differentiate metrics by name, which will lead to quite a few
>> collisions.
>>
>> Be also aware that there will be 2 reporter instances; one for the JM and
>> one for the TM.
>> To remedy this, I would recommend creating a factory that returns a
>> static reporter instance instead; overall this tends to be cleaner.
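A rough sketch of that factory suggestion, which also addresses points (a) and (b): the class names below are hypothetical, but the factory interface (org.apache.flink.metrics.reporter.MetricReporterFactory) and MetricGroup.getMetricIdentifier() are real Flink APIs.

```kotlin
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import org.apache.flink.metrics.Metric
import org.apache.flink.metrics.MetricConfig
import org.apache.flink.metrics.MetricGroup
import org.apache.flink.metrics.reporter.MetricReporter
import org.apache.flink.metrics.reporter.MetricReporterFactory

// A reporter that is (a) thread-safe, via ConcurrentHashMap, and
// (b) keys metrics by their fully-scoped identifier rather than the
// bare name, avoiding collisions between operators/subtasks.
class StaticMetricsReporter : MetricReporter {
    override fun open(config: MetricConfig) {}
    override fun close() {}

    override fun notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup) {
        // getMetricIdentifier includes the full scope (host, task, operator, ...)
        metrics[group.getMetricIdentifier(name)] = metric
    }

    override fun notifyOfRemovedMetric(metric: Metric, name: String, group: MetricGroup) {
        metrics.remove(group.getMetricIdentifier(name))
    }

    companion object {
        // Shared across the JM and TM reporter instances in a MiniCluster,
        // so tests can assert against a single collection.
        val metrics = ConcurrentHashMap<String, Metric>()
    }
}

// The factory always hands out the same instance, per the suggestion above.
class StaticMetricsReporterFactory : MetricReporterFactory {
    companion object {
        private val shared = StaticMetricsReporter()
    }

    override fun createMetricReporter(properties: Properties): MetricReporter = shared
}
```

In recent Flink versions this would presumably be wired up via the `metrics.reporter.<name>.factory.class` configuration option rather than the class-based suffix used elsewhere in this thread.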
>>
>> Alternatively, when using the testing harnesses, IIRC you can also set
>> a custom MetricGroup implementation.
>>
>> On 3/16/2021 4:13 AM, Rion Williams wrote:
>>
>> Hi all,
>>
>> Recently, I was working on adding some custom metrics to a Flink job that
>> required the use of dynamic labels (i.e. capturing various counters that
>> were "slicable" by things like tenant / source, etc.).
>>
>> I ended up handling it in a very naive fashion that would just keep a
>> dictionary of metrics that had already been registered and update them
>> accordingly which looked something like this:
>>
>> class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
>>     private lateinit var metrics: CustomMetricsRegistry
>>
>>     override fun open(parameters: Configuration) {
>>         metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
>>     }
>>
>>     override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
>>         // Insert calls like metrics.inc("tenant-name", 4) here
>>     }
>> }
>>
>> class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
>>     // Increments a given metric by key
>>     fun inc(metric: String, tenant: String, amount: Long = 1) {
>>         // Store a key for the metric
>>         val key = "$metric-$tenant"
>>
>>         // Store/register the metric
>>         if (!registeredMetrics.containsKey(key)) {
>>             registeredMetrics[key] = metricGroup
>>                 .addGroup("tenant", tenant)
>>                 .counter(metric)
>>         }
>>
>>         // Update the metric by a given amount
>>         registeredMetrics[key]!!.inc(amount)
>>     }
>>
>>     companion object {
>>         private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
>>     }
>> }
>>
>> Basically registering and updating new metrics for tenants as they are
>> encountered, which I've seen being emitted as expected via hitting the
>> appropriately configured metrics endpoint (using a PrometheusReporter).
>>
>> However, while I was trying to write a few unit tests for this, I seemed
>> to encounter an issue. I was following a Stack Overflow post that was
>> answered by @Chesnay Schepler <ches...@apache.org> [0] that described
>> the use of an in-memory/embedded Flink cluster and a custom reporter that
>> would statically expose the underlying metrics.
>>
>> So I took a shot at implementing something similar as follows:
>>
>> *Flink Cluster Definition*
>>
>> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>>     ConfigConstants.METRICS_REPORTER_PREFIX +
>>         "MockCustomMetricsReporter." +
>>         ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
>>         MockCustomMetricsReporter::class.java.name))
>>
>> @ClassRule
>> @JvmField
>> val flinkCluster = MiniClusterResource(
>>     MiniClusterResourceConfiguration.Builder()
>>         .setConfiguration(metricsConfiguration)
>>         .setNumberTaskManagers(1)
>>         .setNumberSlotsPerTaskManager(1)
>>         .build()
>> )
>>
>> *Custom Reporter*
>>
>> class MockCustomMetricsReporter : MetricReporter {
>>
>>     override fun open(metricConfig: MetricConfig) {}
>>     override fun close() {}
>>
>>     override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>         // Store the metrics that are being registered as we see them
>>         if (!registeredCustomMetrics.containsKey(name)) {
>>             registeredCustomMetrics[name] = metric
>>         }
>>     }
>>
>>     override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>         // Do nothing here
>>     }
>>
>>     companion object {
>>         // Static reference to metrics as they are registered
>>         var registeredCustomMetrics = HashMap<String, Metric>()
>>     }
>> }
>>
>> *Example Test*
>>
>> @Test
>> fun `Example Metrics Use Case`() {
>>     // Arrange
>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>     val events = listOf(
>>         eventWithUsers("tenant1", "us...@testing.com"),
>>         eventWithUsers("tenant2", "us...@testing.com"),
>>     )
>>
>>     // Act
>>     stream
>>         .fromCollection(events)
>>         .process(MyCustomProcessFunction())
>>
>>     // Assert
>>     stream.execute()
>>     assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
>> }
>>
>> While this test will pass, *the problem is that the custom metrics
>> defined dynamically (via the CustomMetricsRegistry implementation) do not
>> appear within the registeredCustomMetrics collection*. In fact, there
>> are 21 metrics that get registered but all of them appear to be classic
>> out-of-the-box metrics such as CPU usage, number of task managers, load,
>> various other Netty and JVM stats, but no custom metrics are included.
>>
>> I've tried multiple different configurations, implementations via a
>> custom TestHarness, etc. but for some reason the custom metrics being
>> defined are never triggering the notifyOfAddedMetric function which
>> would be responsible for adding them to the static collection to be
>> asserted against.
>>
>> Any ideas / guidance would be more than welcome. Perhaps a different
>> approach? Based off examples I've encountered, the code seems like it
>> should "just work".
>>
>> Thanks much,
>>
>> Rion
>>
>> [0] :
>> https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
>>
>>
>>
>>
>
