[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2018-05-31 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r192223629
  
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -207,13 +238,18 @@ public void writeRecord(Row row) throws IOException {
 
if (batchCount >= batchInterval) {
// execute batch
+   batchLimitReachedMeter.markEvent();
--- End diff --

this seems redundant given that `flushMeter` exists. While the job is running
`batchLimit == flushMeter`, and at the end `batchLimit == flushMeter - 1`,
except in the exceedingly rare case that the total number of rows fits
perfectly into the batches.


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2018-05-31 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r19445
  
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -82,6 +97,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
+   this.flushMeter = getRuntimeContext()
+   .getMetricGroup()
+   .addGroup(FLUSH_SCOPE)
+   .meter(FLUSH_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
--- End diff --

These could be replaced with the built-in `MeterView`.
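
For illustration, a minimal sketch of what that could look like (the
60-second time span is an arbitrary choice, not something prescribed by the
PR; `MeterView` and `SimpleCounter` are part of Flink's own metrics API, so
the Dropwizard wrapper and the extra dependency become unnecessary):

    import org.apache.flink.metrics.MeterView;
    import org.apache.flink.metrics.SimpleCounter;

    // in open(): register a Flink-provided meter instead of the Dropwizard wrapper
    this.flushMeter = getRuntimeContext()
            .getMetricGroup()
            .addGroup(FLUSH_SCOPE)
            .meter(FLUSH_RATE_METER_NAME, new MeterView(new SimpleCounter(), 60));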


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2018-05-31 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r192224177
  
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -207,13 +238,18 @@ public void writeRecord(Row row) throws IOException {
 
if (batchCount >= batchInterval) {
// execute batch
+   batchLimitReachedMeter.markEvent();
flush();
}
}
 
void flush() {
try {
+   flushMeter.markEvent();
+   flushBatchCountHisto.update(batchCount);
+   long before = System.currentTimeMillis();
upload.executeBatch();
+   flushDurationMsHisto.update(System.currentTimeMillis() - before);
--- End diff --

This may result in a negative duration.
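
A sketch of one way to avoid that (not part of the PR): measure with the
monotonic `System.nanoTime()` instead of the wall clock.

    // System.nanoTime() is monotonic, so the measured difference cannot go negative
    long before = System.nanoTime();
    upload.executeBatch();
    flushDurationMsHisto.update((System.nanoTime() - before) / 1_000_000);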


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2018-05-31 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r19854
  
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -41,6 +46,11 @@
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+   static final String FLUSH_SCOPE = "flush";
+   static final String FLUSH_RATE_METER_NAME = "rate";
--- End diff --

I'm not convinced of the naming scheme. I would replace `FLUSH_SCOPE` with 
"jdbc", and explicitly prefix the rate and duration metrics with "flush".


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2018-05-31 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r19532
  
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -58,6 +68,11 @@
 
private int[] typesArray;
 
+   private Meter batchLimitReachedMeter;
+   private Meter flushMeter;
--- End diff --

These could be initialized in the constructor and made final.


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2018-05-31 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r19379
  
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -82,6 +97,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
+   this.flushMeter = getRuntimeContext()
+   .getMetricGroup()
+   .addGroup(FLUSH_SCOPE)
+   .meter(FLUSH_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
+   this.batchLimitReachedMeter = getRuntimeContext()
+   .getMetricGroup()
+   .addGroup(FLUSH_SCOPE)
+   .meter(BATCH_LIMIT_REACHED_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
+   this.flushDurationMsHisto = getRuntimeContext()
+   .getMetricGroup()
+   .addGroup(FLUSH_SCOPE)
+   .histogram(FLUSH_DURATION_HISTO_NAME, new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(new ExponentiallyDecayingReservoir())));
--- End diff --

I recommend staying away from histograms as long as possible. Most metric
backends recommend _not_ building histograms in the application, but letting
the backend handle it.
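
As an illustrative sketch of that approach (field and metric names here are
hypothetical, not from the PR): expose the raw measurements as gauges and let
the metrics backend derive distributions and percentiles.

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.metrics.Gauge;

    class FlushMetricsSketch {
        // last observed values, updated from flush(); names are hypothetical
        private volatile long lastFlushDurationMs;
        private volatile int lastFlushBatchCount;

        void registerMetrics(RuntimeContext ctx) {
            ctx.getMetricGroup().gauge("lastFlushDurationMs", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return lastFlushDurationMs;
                }
            });
            ctx.getMetricGroup().gauge("lastFlushBatchCount", new Gauge<Integer>() {
                @Override
                public Integer getValue() {
                    return lastFlushBatchCount;
                }
            });
        }
    }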


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2018-05-31 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r19079
  
--- Diff: flink-connectors/flink-jdbc/pom.xml ---
@@ -59,5 +59,11 @@ under the License.
			<version>10.10.1.1</version>
			<scope>test</scope>
		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-dropwizard</artifactId>
--- End diff --

We should exclusively rely on built-in metrics.


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r142918146
  
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -41,6 +46,11 @@
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+   static final String FLUSH_SCOPE = "flush";
+   static final String FLUSH_RATE_METER_NAME = "rate";
+   static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "rateGreaterThanBatchInterval";
--- End diff --

rename to `batchLimitReachedRate`?
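
In code, that would be roughly (a sketch of the suggested rename; the later
revision of the PR visible above uses a similarly named
`BATCH_LIMIT_REACHED_RATE_METER_NAME` constant):

    static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "batchLimitReachedRate";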


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2017-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4725#discussion_r142919779
  
--- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
@@ -233,6 +255,30 @@ public void testFlush() throws SQLException, IOException {
}
}
 
+   @Test
+   public void testMetricsSetup() throws IOException {
--- End diff --

Can you extend this test to check that the metrics are correctly set 
(except for the `durationMs` histogram)?
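
A rough sketch of what such a check could look like, assuming hypothetical
package-private accessors on the format (e.g. getFlushMeter(),
getBatchLimitReachedMeter()) that the PR would need to add:

    // write exactly two full batches, then inspect the meters (accessors are hypothetical)
    jdbcOutputFormat.open(0, 1);
    for (int i = 0; i < 2 * JDBCOutputFormat.DEFAULT_BATCH_INTERVAL; i++) {
        jdbcOutputFormat.writeRecord(toRow(i)); // toRow(..) stands in for building a test Row
    }
    assertEquals(2, jdbcOutputFormat.getFlushMeter().getCount());
    assertEquals(2, jdbcOutputFormat.getBatchLimitReachedMeter().getCount());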


---


[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...

2017-09-26 Thread asicoe
GitHub user asicoe opened a pull request:

https://github.com/apache/flink/pull/4725

[FLINK-7689] [Streaming Connectors] Added metrics to JDBCOutputFormat in 
order to be able to m…

Added metrics to JDBCOutputFormat in order to be able to measure jdbc batch 
flush rate, latency and size.

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of
the pull request", where *FLINK-XXXX* should be replaced by the actual issue
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change
This pull request adds some useful metrics to the flink-jdbc sink.

## Brief change log

- Added metrics to JDBCOutputFormat in order to be able to measure jdbc 
batch flush rate, latency and size.

## Verifying this change

This change added tests and can be verified as follows:

Check the Task Metrics section of the Flink UI for a job that uses the
flink-jdbc and flink-table 1.4-SNAPSHOT dependencies, creates a
JDBCAppendTableSink instance, and uses its emitDataStream method to write a
stream of messages to a JDBC-enabled destination.

These metrics have been successfully used in a dev environment running a
Flink streaming app for the past month, in conjunction with
StatsD/Graphite/Grafana.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/asicoe/flink FLINK-7689_instrument_jdbc_sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4725.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4725


commit c5baa1d4fb8947383c601817f84dc2e2cfb7f26a
Author: Alex Sicoe 
Date:   2017-09-26T08:01:40Z

FLINK-7689 Added metrics to JDBCOutputFormat in order to be able to measure 
jdbc batch flush rate, latency and size.




---