Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359067
  
    --- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.metrics.prometheus.client.exporter;
    +
    +import io.prometheus.client.Collector;
    +import io.prometheus.client.CollectorRegistry;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.HttpURLConnection;
    +import java.net.InetAddress;
    +import java.net.UnknownHostException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import java.io.BufferedWriter;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +
    +/**
    + * Export metrics via the Prometheus Pushgateway.
    + * <p>
    + * The Prometheus Pushgateway exists to allow ephemeral and
    + * batch jobs to expose their metrics to Prometheus.
    + * Since these kinds of jobs may not exist long enough to be scraped,
    + * they can instead push their metrics to a Pushgateway.
    + * This class allows pushing the contents of a {@link CollectorRegistry} to
    + * a Pushgateway.
    + * <p>
    + * Example usage:
    + * <pre>
    + * {@code
    + *   void executeBatchJob() throws Exception {
    + *     CollectorRegistry registry = new CollectorRegistry();
    + *     Gauge duration = Gauge.build()
    + *         .name("my_batch_job_duration_seconds")
    + *         .help("Duration of my batch job in seconds.")
    + *         .register(registry);
    + *     Gauge.Timer durationTimer = duration.startTimer();
    + *     try {
    + *       // Your code here.
    + *
    + *       // This is only added to the registry after success,
    + *       // so that a previous success in the Pushgateway isn't 
overwritten on failure.
    + *       Gauge lastSuccess = Gauge.build()
    + *           .name("my_batch_job_last_success")
    + *           .help("Last time my batch job succeeded, in unixtime.")
    + *           .register(registry);
    + *       lastSuccess.setToCurrentTime();
    + *     } finally {
    + *       durationTimer.setDuration();
    + *       PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
    + *       pg.pushAdd(registry, "my_batch_job");
    + *     }
    + *   }
    + * }
    + * </pre>
    + * <p>
    + * See <a href="https://github.com/prometheus/pushgateway";>
    + *     https://github.com/prometheus/pushgateway</a>
    + */
    +public class PushGatewayWithTimestamp {
    +
    +    private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
    +    private final String address;
    +    private static final int SECONDS_PER_MILLISECOND = 1000;
    +    /**
    +     * Construct a Pushgateway, with the given address.
    +     * <p>
    +     * @param address  host:port or ip:port of the Pushgateway.
    +     */
    +    public PushGatewayWithTimestamp(String address) {
    +        this.address = address;
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry, String job) throws 
IOException {
    +        doRequest(registry, job, null, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing all those with the same job and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector, String job) throws IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing all those with the same job and grouping key.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(CollectorRegistry registry,
    +                     String job, Map<String, String> groupingKey) throws 
IOException {
    +        doRequest(registry, job, groupingKey, "PUT", null);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the 
same job and grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     */
    +    public void push(Collector collector,
    +                     String job, Map<String, String> groupingKey) throws 
IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        push(registry, job, groupingKey);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name and job 
and no grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,
    +                        String job, String timestamp) throws IOException {
    +        doRequest(registry, job, null, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name and job 
and no grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job) throws 
IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, "");
    +    }
    +
    +    /**
    +     * Pushes all metrics in a registry,
    +     * replacing only previously pushed metrics of the same name, job and 
grouping key.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(CollectorRegistry registry,String job,
    +                        Map<String, String> groupingKey, String timestamp) 
throws IOException {
    +        doRequest(registry, job, groupingKey, "POST", timestamp);
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name, job and 
grouping key.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     */
    +    public void pushAdd(Collector collector, String job,
    +                        Map<String, String> groupingKey) throws 
IOException {
    +        CollectorRegistry registry = new CollectorRegistry();
    +        collector.register(registry);
    +        pushAdd(registry, job, groupingKey, null);
    +    }
    +
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with no grouping key and the provided job.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job) throws IOException {
    +        doRequest(null, job, null, "DELETE", null);
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * Deletes metrics with the provided job and grouping key.
    +     * This uses the DELETE HTTP method.
    +     */
    +    public void delete(String job, Map<String, String> groupingKey) throws 
IOException {
    +        doRequest(null, job, groupingKey, "DELETE", null);
    +    }
    +
    +
    +    /**
    +     * Pushes all metrics in a registry, replacing all those with the same 
job and instance.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(CollectorRegistry, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(CollectorRegistry registry, String job, String 
instance) throws IOException {
    +        push(registry, job, Collections.singletonMap("instance", 
instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector, replacing all those with the 
same job and instance.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the PUT HTTP method.
    +     * @deprecated use {@link #push(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void push(Collector collector, String job, String instance) 
throws IOException {
    +        push(collector, job, Collections.singletonMap("instance", 
instance));
    +    }
    +
    +    /**
    +     * Pushes all metrics in a Collector,
    +     * replacing only previously pushed metrics of the same name.
    +     * <p>
    +     * This is useful for pushing a single Gauge.
    +     * <p>
    +     * This uses the POST HTTP method.
    +     * @deprecated use {@link #pushAdd(Collector, String, Map)}
    +     */
    +    @Deprecated
    +    public void pushAdd(Collector collector, String job, String instance) 
throws IOException {
    +        pushAdd(collector, job, Collections.singletonMap("instance", 
instance));
    +    }
    +
    +    /**
    +     * Deletes metrics from the Pushgateway.
    +     * <p>
    +     * This uses the DELETE HTTP method.
    +     * @deprecated use {@link #delete(String, Map)}
    +     */
    +    @Deprecated
    +    public void delete(String job, String instance) throws IOException {
    +        delete(job, Collections.singletonMap("instance", instance));
    +    }
    +
    +    void doRequest(CollectorRegistry registry, String job, Map<String,
    +            String> groupingKey, String method, String timestamp) throws 
IOException {
    +        String url = address + "/metrics/job/" + URLEncoder.encode(job, 
"UTF-8");
    +        if (groupingKey != null) {
    +            for (Map.Entry<String, String> entry: groupingKey.entrySet()) {
    +                url += "/" + entry.getKey() + "/" + 
URLEncoder.encode(entry.getValue(), "UTF-8");
    +            }
    +        }
    +
    +        logger.info("Sending metrics data to '{}'", url);
    +
    +        HttpURLConnection connection = (HttpURLConnection) new 
URL(url).openConnection();
    +        connection.setRequestProperty("Content-Type", 
TextFormatWithTimestamp.CONTENT_TYPE_004);
    +        if (!method.equals("DELETE")) {
    +            connection.setDoOutput(true);
    +        }
    +        connection.setRequestMethod(method);
    +
    +        connection.setConnectTimeout(10 * SECONDS_PER_MILLISECOND);
    +        connection.setReadTimeout(10 * SECONDS_PER_MILLISECOND);
    +        connection.connect();
    +
    +        try {
    +            if (!method.equals("DELETE")) {
    +                BufferedWriter writer =
    +                        new BufferedWriter(
    +                                new 
OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
    +                TextFormatWithTimestamp.write004(writer,
    +                                                
registry.metricFamilySamples(), timestamp);
    +                writer.flush();
    +                writer.close();
    +            }
    +
    +            int response = connection.getResponseCode();
    +            if (response != HttpURLConnection.HTTP_ACCEPTED) {
    +                throw new IOException("Response code from " + url + " was 
" + response);
    +            }
    +        } catch (Exception ex) {
    +            logger.error("Sending metrics failed due to: ", ex);
    +        }
    +
    --- End diff --
    
    Removed extra line.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to