Github user stoader commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r167359085 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java --- @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +/** + * Export metrics via the Prometheus Pushgateway. + * <p> + * The Prometheus Pushgateway exists to allow ephemeral and + * batch jobs to expose their metrics to Prometheus. + * Since these kinds of jobs may not exist long enough to be scraped, + * they can instead push their metrics to a Pushgateway. + * This class allows pushing the contents of a {@link CollectorRegistry} to + * a Pushgateway. 
+ * <p> + * Example usage: + * <pre> + * {@code + * void executeBatchJob() throws Exception { + * CollectorRegistry registry = new CollectorRegistry(); + * Gauge duration = Gauge.build() + * .name("my_batch_job_duration_seconds") + * .help("Duration of my batch job in seconds.") + * .register(registry); + * Gauge.Timer durationTimer = duration.startTimer(); + * try { + * // Your code here. + * + * // This is only added to the registry after success, + * // so that a previous success in the Pushgateway isn't overwritten on failure. + * Gauge lastSuccess = Gauge.build() + * .name("my_batch_job_last_success") + * .help("Last time my batch job succeeded, in unixtime.") + * .register(registry); + * lastSuccess.setToCurrentTime(); + * } finally { + * durationTimer.setDuration(); + * PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091"); + * pg.pushAdd(registry, "my_batch_job"); + * } + * } + * } + * </pre> + * <p> + * See <a href="https://github.com/prometheus/pushgateway"> + * https://github.com/prometheus/pushgateway</a> + */ +public class PushGatewayWithTimestamp { + + private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class); + private final String address; + private static final int SECONDS_PER_MILLISECOND = 1000; + /** + * Construct a Pushgateway, with the given address. + * <p> + * @param address host:port or ip:port of the Pushgateway. + */ + public PushGatewayWithTimestamp(String address) { + this.address = address; + } + + /** + * Pushes all metrics in a registry, + * replacing all those with the same job and no grouping key. + * <p> + * This uses the PUT HTTP method. + */ + public void push(CollectorRegistry registry, String job) throws IOException { + doRequest(registry, job, null, "PUT", null); + } + + /** + * Pushes all metrics in a Collector, + * replacing all those with the same job and no grouping key. + * <p> + * This is useful for pushing a single Gauge. 
+ * <p> + * This uses the PUT HTTP method. + */ + public void push(Collector collector, String job) throws IOException { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + push(registry, job); + } + + /** + * Pushes all metrics in a registry, + * replacing all those with the same job and grouping key. + * <p> + * This uses the PUT HTTP method. + */ + public void push(CollectorRegistry registry, + String job, Map<String, String> groupingKey) throws IOException { + doRequest(registry, job, groupingKey, "PUT", null); + } + + /** + * Pushes all metrics in a Collector, replacing all those with the same job and grouping key. + * <p> + * This is useful for pushing a single Gauge. + * <p> + * This uses the PUT HTTP method. + */ + public void push(Collector collector, + String job, Map<String, String> groupingKey) throws IOException { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + push(registry, job, groupingKey); + } + + /** + * Pushes all metrics in a registry, + * replacing only previously pushed metrics of the same name and job and no grouping key. + * <p> + * This uses the POST HTTP method. + */ + public void pushAdd(CollectorRegistry registry, + String job, String timestamp) throws IOException { + doRequest(registry, job, null, "POST", timestamp); + } + + /** + * Pushes all metrics in a Collector, + * replacing only previously pushed metrics of the same name and job and no grouping key. + * <p> + * This is useful for pushing a single Gauge. + * <p> + * This uses the POST HTTP method. + */ + public void pushAdd(Collector collector, String job) throws IOException { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + pushAdd(registry, job, ""); + } + + /** + * Pushes all metrics in a registry, + * replacing only previously pushed metrics of the same name, job and grouping key. + * <p> + * This uses the POST HTTP method. 
+ */ + public void pushAdd(CollectorRegistry registry,String job, + Map<String, String> groupingKey, String timestamp) throws IOException { + doRequest(registry, job, groupingKey, "POST", timestamp); + } + + /** + * Pushes all metrics in a Collector, + * replacing only previously pushed metrics of the same name, job and grouping key. + * <p> + * This is useful for pushing a single Gauge. + * <p> + * This uses the POST HTTP method. + */ + public void pushAdd(Collector collector, String job, + Map<String, String> groupingKey) throws IOException { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + pushAdd(registry, job, groupingKey, null); + } + + + /** + * Deletes metrics from the Pushgateway. + * <p> + * Deletes metrics with no grouping key and the provided job. + * This uses the DELETE HTTP method. + */ + public void delete(String job) throws IOException { + doRequest(null, job, null, "DELETE", null); + } + + /** + * Deletes metrics from the Pushgateway. + * <p> + * Deletes metrics with the provided job and grouping key. + * This uses the DELETE HTTP method. + */ + public void delete(String job, Map<String, String> groupingKey) throws IOException { + doRequest(null, job, groupingKey, "DELETE", null); + } + + + /** + * Pushes all metrics in a registry, replacing all those with the same job and instance. + * <p> + * This uses the PUT HTTP method. + * @deprecated use {@link #push(CollectorRegistry, String, Map)} + */ + @Deprecated --- End diff -- Removed deprecated methods.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org