[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r192223629

    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -207,13 +238,18 @@ public void writeRecord(Row row) throws IOException {
     		if (batchCount >= batchInterval) {
     			// execute batch
    +			batchLimitReachedMeter.markEvent();
    --- End diff --

    This seems redundant given that `flushMeter` exists. While the job is running `batchLimit == flushMeter`, and at the end `batchLimit == flushMeter - 1`, except in the exceedingly rare case that the total number of rows fits perfectly into the batches.

---
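zentol's arithmetic can be checked with a short, self-contained simulation of the `writeRecord()`/`flush()` flow (plain Java, no Flink dependencies; the static counters stand in for the two meters, and the constants are illustrative):

```java
// Sketch: counts how often the batch-limit path fires versus how often
// flush() runs overall, mirroring the reviewed writeRecord()/flush() logic.
public class FlushCountSketch {
    static int batchLimitEvents = 0; // stands in for batchLimitReachedMeter
    static int flushEvents = 0;      // stands in for flushMeter
    static int batchCount = 0;
    static final int BATCH_INTERVAL = 100;

    static void writeRecord() {
        batchCount++;
        if (batchCount >= BATCH_INTERVAL) {
            batchLimitEvents++; // batchLimitReachedMeter.markEvent()
            flush();
        }
    }

    static void flush() {
        flushEvents++;          // flushMeter.markEvent()
        batchCount = 0;
    }

    static void close() {
        if (batchCount > 0) {
            flush();            // final partial batch flushes WITHOUT a batch-limit event
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 250; i++) {
            writeRecord();
        }
        close();
        // 250 records -> 2 full batches plus 1 partial flush on close,
        // so batchLimitEvents == flushEvents - 1, as the review notes.
        System.out.println(batchLimitEvents + " " + flushEvents); // prints "2 3"
    }
}
```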
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r19445

    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -82,6 +97,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
     		} catch (ClassNotFoundException cnfe) {
     			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
     		}
    +		this.flushMeter = getRuntimeContext()
    +			.getMetricGroup()
    +			.addGroup(FLUSH_SCOPE)
    +			.meter(FLUSH_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    --- End diff --

    These could be replaced with the built-in `MeterView`.

---
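For context, Flink's built-in `MeterView` derives an events-per-second rate from a plain counter, so no Dropwizard classes are needed. The sketch below is a minimal, self-contained approximation of that idea, not Flink's actual implementation (which updates the rate over a sliding time span via a background update thread):

```java
// Self-contained sketch of a counter-backed meter: markEvent() bumps a count,
// getRate() derives events/second. This is what makes the Dropwizard
// dependency unnecessary for a simple rate metric.
public class SimpleMeterSketch {
    private long count = 0;
    private final long startMillis;

    public SimpleMeterSketch(long startMillis) {
        this.startMillis = startMillis;
    }

    public void markEvent() {
        count++;
    }

    public long getCount() {
        return count;
    }

    // Average events per second since creation. Flink's MeterView instead
    // reports the rate over a configurable recent time span.
    public double getRate(long nowMillis) {
        long elapsed = Math.max(1, nowMillis - startMillis);
        return count * 1000.0 / elapsed;
    }

    public static void main(String[] args) {
        SimpleMeterSketch meter = new SimpleMeterSketch(0);
        for (int i = 0; i < 50; i++) {
            meter.markEvent();
        }
        // 50 events over 10 seconds -> 5.0 events/second
        System.out.println(meter.getCount() + " events, rate=" + meter.getRate(10_000));
    }
}
```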
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r192224177

    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -207,13 +238,18 @@ public void writeRecord(Row row) throws IOException {
     		if (batchCount >= batchInterval) {
     			// execute batch
    +			batchLimitReachedMeter.markEvent();
     			flush();
     		}
     	}

     	void flush() {
     		try {
    +			flushMeter.markEvent();
    +			flushBatchCountHisto.update(batchCount);
    +			long before = System.currentTimeMillis();
     			upload.executeBatch();
    +			flushDurationMsHisto.update(System.currentTimeMillis() - before);
    --- End diff --

    This may result in a negative duration.

---
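`System.currentTimeMillis()` reads the wall clock, which can jump backwards (e.g. on an NTP adjustment), so differencing two reads can indeed yield a negative duration. `System.nanoTime()` is monotonic and is the appropriate clock for elapsed-time measurement; a small self-contained sketch:

```java
import java.util.concurrent.TimeUnit;

// Measures elapsed time with the monotonic clock, which cannot go backwards,
// unlike System.currentTimeMillis() as used in the reviewed diff.
public class ElapsedTimeSketch {
    public static long elapsedMillis(Runnable task) {
        long before = System.nanoTime();
        task.run();
        long elapsedNanos = System.nanoTime() - before; // always >= 0
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    public static void main(String[] args) {
        long ms = elapsedMillis(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("elapsed is non-negative: " + (ms >= 0));
    }
}
```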
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r19854

    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -41,6 +46,11 @@ public class JDBCOutputFormat extends RichOutputFormat {
     	private static final long serialVersionUID = 1L;
     	static final int DEFAULT_BATCH_INTERVAL = 5000;
    +	static final String FLUSH_SCOPE = "flush";
    +	static final String FLUSH_RATE_METER_NAME = "rate";
    --- End diff --

    I'm not convinced of the naming scheme. I would replace `FLUSH_SCOPE` with "jdbc", and explicitly prefix the rate and duration metrics with "flush".

---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r19532

    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -58,6 +68,11 @@
     	private int[] typesArray;
    +	private Meter batchLimitReachedMeter;
    +	private Meter flushMeter;
    --- End diff --

    These could be initialized in the constructor and made final.

---
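A sketch of the suggested pattern: construct the metric objects eagerly so the fields can be `final`, and defer only the registration to `open()`. The `Meter` interface below is a stand-in to keep the example self-contained, not Flink's actual interface:

```java
// Final metric fields, assigned once in the constructor; open() would only
// register them on the metric group (elided here), never reassign them.
public class FinalMetricFieldsSketch {
    interface Meter {
        void markEvent();
        long getCount();
    }

    static class CountingMeter implements Meter {
        private long count;
        public void markEvent() { count++; }
        public long getCount() { return count; }
    }

    private final Meter flushMeter;

    public FinalMetricFieldsSketch() {
        this.flushMeter = new CountingMeter();
    }

    // In the real output format this would call
    // getRuntimeContext().getMetricGroup() and register flushMeter by name.
    public void open() { /* registration only, no field assignment */ }

    public Meter getFlushMeter() {
        return flushMeter;
    }

    public static void main(String[] args) {
        FinalMetricFieldsSketch format = new FinalMetricFieldsSketch();
        format.open();
        format.getFlushMeter().markEvent();
        System.out.println(format.getFlushMeter().getCount());
    }
}
```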
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r19379

    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -82,6 +97,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
     		} catch (ClassNotFoundException cnfe) {
     			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
     		}
    +		this.flushMeter = getRuntimeContext()
    +			.getMetricGroup()
    +			.addGroup(FLUSH_SCOPE)
    +			.meter(FLUSH_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    +		this.batchLimitReachedMeter = getRuntimeContext()
    +			.getMetricGroup()
    +			.addGroup(FLUSH_SCOPE)
    +			.meter(BATCH_LIMIT_REACHED_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    +		this.flushDurationMsHisto = getRuntimeContext()
    +			.getMetricGroup()
    +			.addGroup(FLUSH_SCOPE)
    +			.histogram(FLUSH_DURATION_HISTO_NAME, new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(new ExponentiallyDecayingReservoir())));
    --- End diff --

    I recommend staying away from histograms as long as possible. Most metric backends recommend _not_ building histograms in the application, but letting the backend handle it.

---
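One backend-friendly alternative, sketched below with illustrative names: export plain primitives (count, sum, max, last value) instead of an in-application histogram, and let the metrics backend derive distributions from them. Each getter here would correspond to a simple gauge or counter in Flink:

```java
// Self-contained sketch: running aggregates over flush durations, cheap to
// maintain and straightforward for a backend to turn into rates/percentiles.
public class FlushStatsSketch {
    private long flushCount = 0;
    private long totalDurationMs = 0;
    private long maxDurationMs = 0;
    private long lastDurationMs = 0;

    public void recordFlush(long durationMs) {
        flushCount++;
        totalDurationMs += durationMs;
        maxDurationMs = Math.max(maxDurationMs, durationMs);
        lastDurationMs = durationMs;
    }

    // Each of these would back a separate metric in the real format.
    public long getFlushCount() { return flushCount; }
    public long getTotalDurationMs() { return totalDurationMs; }
    public long getMaxDurationMs() { return maxDurationMs; }
    public long getLastDurationMs() { return lastDurationMs; }

    public static void main(String[] args) {
        FlushStatsSketch stats = new FlushStatsSketch();
        stats.recordFlush(12);
        stats.recordFlush(30);
        stats.recordFlush(7);
        // 3 flushes, max 30 ms, total 49 ms
        System.out.println(stats.getFlushCount() + " " + stats.getMaxDurationMs() + " " + stats.getTotalDurationMs());
    }
}
```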
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r19079

    --- Diff: flink-connectors/flink-jdbc/pom.xml ---
    @@ -59,5 +59,11 @@ under the License.
     			<version>10.10.1.1</version>
     			<scope>test</scope>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-metrics-dropwizard</artifactId>
    --- End diff --

    We should exclusively rely on built-in metrics.

---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r142918146

    --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
    @@ -41,6 +46,11 @@ public class JDBCOutputFormat extends RichOutputFormat {
     	private static final long serialVersionUID = 1L;
     	static final int DEFAULT_BATCH_INTERVAL = 5000;
    +	static final String FLUSH_SCOPE = "flush";
    +	static final String FLUSH_RATE_METER_NAME = "rate";
    +	static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "rateGreaterThanBatchInterval";
    --- End diff --

    rename to `batchLimitReachedRate`?

---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4725#discussion_r142919779

    --- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
    @@ -233,6 +255,30 @@ public void testFlush() throws SQLException, IOException {
     		}
     	}

    +	@Test
    +	public void testMetricsSetup() throws IOException {
    --- End diff --

    Can you extend this test to check that the metrics are correctly set (except for the `durationMs` histogram)?

---
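One way such a check could look, reduced to a self-contained sketch: drive the `writeRecord()` path and assert that the counters advance as expected. The stand-in counters here mirror `flushMeter` and `flushBatchCountHisto`; the real test would read the meters registered on the operator's metric group:

```java
// Sketch of metric assertions: 7 records with a batch interval of 3 should
// produce exactly 2 flushes covering 6 rows, with 1 row left unflushed.
public class MetricsAssertionSketch {
    static long flushEvents = 0;  // stands in for flushMeter's count
    static long flushedRows = 0;  // stands in for flushBatchCountHisto updates
    static int batchCount = 0;
    static final int BATCH_INTERVAL = 3;

    static void writeRecord() {
        batchCount++;
        if (batchCount >= BATCH_INTERVAL) {
            flush();
        }
    }

    static void flush() {
        flushEvents++;
        flushedRows += batchCount;
        batchCount = 0;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 7; i++) {
            writeRecord();
        }
        System.out.println(flushEvents + " flushes, " + flushedRows + " rows flushed, "
                + batchCount + " row(s) pending");
    }
}
```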
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
GitHub user asicoe opened a pull request:

    https://github.com/apache/flink/pull/4725

    [FLINK-7689] [Streaming Connectors] Added metrics to JDBCOutputFormat in order to be able to m…

    Added metrics to JDBCOutputFormat in order to be able to measure jdbc batch flush rate, latency and size.

    ## Contribution Checklist

    - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
    - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
    - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
    - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
    - Each pull request should address only one issue, not mix up code from multiple issues.
    - Each commit in the pull request has a meaningful commit message (including the JIRA id).
    - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

    **(The sections below can be removed for hotfixes of typos)**

    ## What is the purpose of the change

    This pull request adds some useful metrics to the flink-jdbc sink.

    ## Brief change log

    - Added metrics to JDBCOutputFormat in order to be able to measure jdbc batch flush rate, latency and size.
    ## Verifying this change

    This change added tests and can be verified as follows: check the Task Metrics section of the Flink UI of a job that uses the flink-jdbc and flink-table 1.4-SNAPSHOT dependencies, creates a JDBCAppendTableSink instance, and uses its emitDataStream method to write out a stream of messages to a JDBC-enabled destination. These metrics have been successfully used in a dev environment running a Flink streaming app for the past month, in conjunction with StatsD/Graphite/Grafana.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): no
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

    ## Documentation

    - Does this pull request introduce a new feature? no
    - If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/asicoe/flink FLINK-7689_instrument_jdbc_sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4725.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4725

commit c5baa1d4fb8947383c601817f84dc2e2cfb7f26a
Author: Alex Sicoe
Date:   2017-09-26T08:01:40Z

    FLINK-7689 Added metrics to JDBCOutputFormat in order to be able to measure jdbc batch flush rate, latency and size.

---