[FLINK-4920] Introduce Scala Function Gauge

This closes #3080.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/570dbc8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/570dbc8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/570dbc8d

Branch: refs/heads/master
Commit: 570dbc8d2597ee9688579f399b8743636e70f891
Parents: b36b43b
Author: heytitle <pat.chor...@gmail.com>
Authored: Tue Dec 27 23:21:19 2016 +0100
Committer: zentol <ches...@apache.org>
Committed: Thu Jan 19 23:57:22 2017 +0100

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 22 ++++++++++++
 .../flink/api/scala/metrics/ScalaGauge.scala    | 34 ++++++++++++++++++
 .../api/scala/metrics/ScalaGaugeTest.scala      | 36 ++++++++++++++++++++
 3 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/570dbc8d/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index afbce90..578c926 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -83,6 +83,8 @@ A `Gauge` provides a value of any type on demand. In order to 
use a `Gauge` you
 There is no restriction for the type of the returned value.
 You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
`MetricGroup`.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 
 public class MyMapper extends RichMapFunction<String, Integer> {
@@ -102,6 +104,26 @@ public class MyMapper extends RichMapFunction<String, 
Integer> {
 }
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+public class MyMapper extends RichMapFunction[String,Int] {
+  val valueToExpose = 5
+
+  override def open(parameters: Configuration): Unit = {
+    getRuntimeContext()
+      .getMetricGroup()
+      .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
+  }
+  ...
+}
+
+{% endhighlight %}
+</div>
+
+</div>
 
 Note that reporters will turn the exposed object into a `String`, which means 
that a meaningful `toString()` implementation is required.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/570dbc8d/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala
new file mode 100644
index 0000000..e2f9ebf
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.metrics
+
+import org.apache.flink.metrics.Gauge
+
+/**
+  * This class allows the concise definition of a gauge from Scala using 
function references.
+  */
+class ScalaGauge[T](func: () => T) extends Gauge[T] {
+  override def getValue: T = {
+    func()
+  }
+}
+
+object ScalaGauge {
+  def apply[T](func: () => T): ScalaGauge[T] = new ScalaGauge(func)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/570dbc8d/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala
new file mode 100644
index 0000000..9d53e4c
--- /dev/null
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.metrics
+
+import org.apache.flink.metrics.Gauge
+import org.apache.flink.runtime.metrics.{MetricRegistry, 
MetricRegistryConfiguration}
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class ScalaGaugeTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testGaugeCorrectValue(): Unit = {
+    val myGauge = ScalaGauge[Long](() => 4)
+    assert(myGauge.getValue == 4)
+  }
+
+}

Reply via email to