[GitHub] [incubator-gobblin] vikrambohra commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
vikrambohra commented on a change in pull request #2622: Added random key 
generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280293704
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+super(builder);
+
+this.topic=builder.topic;
+this.namespaceOverride=builder.namespaceOverride;
+Config config = builder.config.get();
+
+if (builder.pusher.isPresent()) {
+  this.pusher = builder.pusher.get();
+} else {
+  Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+  String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+  this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig));
+}
+
+this.closer.register(this.pusher);
+
+randomKey=String.valueOf(new Random().nextInt(100));
 
 Review comment:
   I'm not sure `buildKey` is the right place: it is called for every pushed 
message, and we do not want to generate a new random number per message.
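
   A minimal sketch of one way to reconcile the two concerns: the fallback 
key is drawn once when the reporter is constructed, and `buildKey` only 
decides when to use it. `FallbackKeySketch` and its constructor parameters are 
hypothetical stand-ins, not the Gobblin classes; the metadata is modelled as a 
plain `Map<String, String>`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for the reporter; illustrates the idea only.
class FallbackKeySketch {
  private final String randomKey;   // drawn once per reporter instance
  private final List<String> keys;  // metadata fields forming the key; may be empty

  FallbackKeySketch(List<String> keys, Random random) {
    this.keys = keys;
    this.randomKey = String.valueOf(random.nextInt(100));
  }

  // Called once per message; it never draws a new random number.
  String buildKey(Map<String, String> metadata) {
    if (keys.isEmpty()) {
      return randomKey;
    }
    StringBuilder keyBuilder = new StringBuilder();
    for (String keyPart : keys) {
      String value = metadata.get(keyPart);
      if (value == null) {
        return randomKey;  // a configured field is missing: fall back
      }
      keyBuilder.append(value);
    }
    return keyBuilder.toString();
  }

  public static void main(String[] args) {
    FallbackKeySketch unkeyed =
        new FallbackKeySketch(Collections.<String>emptyList(), new Random());
    Map<String, String> none = Collections.<String, String>emptyMap();
    // Both calls return the same fallback key.
    System.out.println(unkeyed.buildKey(none).equals(unkeyed.buildKey(none)));

    FallbackKeySketch keyed =
        new FallbackKeySketch(Arrays.asList("cluster", "job"), new Random());
    Map<String, String> metadata = new HashMap<>();
    metadata.put("cluster", "c1");
    metadata.put("job", "j9");
    System.out.println(keyed.buildKey(metadata));
  }
}
```

   With this shape the fallback decision lives in `buildKey`, while the random 
draw stays out of the per-message path.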


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284592
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
 ##
 @@ -17,87 +17,170 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Kafka reporting formats enumeration.
  */
 public enum KafkaReportingFormats {
 
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for 
this reporting format.
-   *
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder metricReporterBuilder(Properties properties) 
{
-switch (this) {
-  case AVRO:
-KafkaAvroReporter.Builder builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
-if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-  builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-}
-return builder;
-  case JSON:
-return KafkaReporter.BuilderFactory.newBuilder();
-  default:
-// This should never happen.
-throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");
+  AVRO() {
+
+@Override
+public void buildMetricsScheduledReporter(String brokers, String topic, 
Properties properties) throws IOException {
+
+  KafkaAvroReporter.Builder builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
+  if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+  
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+  }
+  builder.build(brokers, topic, properties);
+
+}
+
+@Override
+public ScheduledReporter buildEventsScheduledReporter(String brokers, 
String topic, MetricContext context, Properties properties) throws IOException {
+
+  KafkaAvroEventReporter.Builder builder = 
KafkaAvroEventReporter.Factory.forContext(context);
+  if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+  
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+  }
+  String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+  ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+  : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+  PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+  builder.withPusherClassName(pusherClassName);
+
+  Config allConfig = ConfigUtils.propertiesToConfig(properties);
+  // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
+  // kafka config
+  Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+  
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+  ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+  builder.withConfig(kafkaConfig);
+
+  return builder.build(brokers, topic);
+
 }
-  }
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} 
for this reporting format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link 

[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284613
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
 ##
 @@ -17,87 +17,170 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Kafka reporting formats enumeration.
  */
 public enum KafkaReportingFormats {
 
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for 
this reporting format.
-   *
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder metricReporterBuilder(Properties properties) 
{
-switch (this) {
-  case AVRO:
-KafkaAvroReporter.Builder builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
-if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-  builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-}
-return builder;
-  case JSON:
-return KafkaReporter.BuilderFactory.newBuilder();
-  default:
-// This should never happen.
-throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");
+  AVRO() {
+
+@Override
+public void buildMetricsScheduledReporter(String brokers, String topic, 
Properties properties) throws IOException {
+
+  KafkaAvroReporter.Builder builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
+  if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+  
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+  }
+  builder.build(brokers, topic, properties);
+
+}
+
+@Override
+public ScheduledReporter buildEventsScheduledReporter(String brokers, 
String topic, MetricContext context, Properties properties) throws IOException {
+
+  KafkaAvroEventReporter.Builder builder = 
KafkaAvroEventReporter.Factory.forContext(context);
+  if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+  
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+  }
+  String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+  ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+  : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+  PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+  builder.withPusherClassName(pusherClassName);
+
+  Config allConfig = ConfigUtils.propertiesToConfig(properties);
+  // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
+  // kafka config
+  Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+  
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+  ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+  builder.withConfig(kafkaConfig);
+
+  return builder.build(brokers, topic);
+
 }
-  }
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} 
for this reporting format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link 

[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284912
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+super(builder);
+
+this.topic=builder.topic;
+this.namespaceOverride=builder.namespaceOverride;
+Config config = builder.config.get();
+
+if (builder.pusher.isPresent()) {
+  this.pusher = builder.pusher.get();
+} else {
+  Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+  String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
 
 Review comment:
   Define a constant for `"pusherClass"`.
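
   As a rough sketch of what this could look like: the literal moves into a 
named constant next to the existing `PUSHER_CONFIG`. The names 
`PUSHER_CLASS_KEY` and `DEFAULT_PUSHER_CLASS`, the placeholder default value, 
and the use of `java.util.Properties` in place of Typesafe `Config` are 
assumptions made to keep the example self-contained.

```java
import java.util.Properties;

// Hypothetical holder class; the constant names are illustrative only.
class PusherConfigKeysSketch {
  static final String PUSHER_CONFIG = "pusherConfig";
  static final String PUSHER_CLASS_KEY = "pusherClass";
  // Placeholder default, not the real Gobblin pusher class name.
  static final String DEFAULT_PUSHER_CLASS = "example.DefaultKeyValuePusher";

  // Look up the pusher class through the constant instead of a string literal.
  static String pusherClassName(Properties properties) {
    return properties.getProperty(PUSHER_CLASS_KEY, DEFAULT_PUSHER_CLASS);
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    System.out.println(pusherClassName(properties));
    properties.setProperty(PUSHER_CLASS_KEY, "com.example.MyPusher");
    System.out.println(pusherClassName(properties));
  }
}
```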




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285071
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
 
 Review comment:
   Install the code style template to format your code. Instructions are here: 
https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285604
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+super(builder);
+
+this.topic=builder.topic;
+this.namespaceOverride=builder.namespaceOverride;
+Config config = builder.config.get();
+
+if (builder.pusher.isPresent()) {
+  this.pusher = builder.pusher.get();
+} else {
+  Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+  String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+  this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig));
+}
+
+this.closer.register(this.pusher);
+
+randomKey=String.valueOf(new Random().nextInt(100));
+String pusherKeys_Key = "pusherKeys";
+if (config.hasPath(pusherKeys_Key)) {
+  List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key));
+  this.keys = Optional.of(keys);
+}else{
+  log.warn("Key not assigned from config. Please set it with property {}", ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS);
+  log.warn("Using generated number " + randomKey + " as key");
+}
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+log.info("Emitting report using KeyValueEventObjectReporter");
+
+List<Pair<String, GenericRecord>> events = Lists.newArrayList();
+GobblinTrackingEvent event;
+
+while(null != (event = queue.poll())){
+  GenericRecord record = AvroUtils.overrideNameAndNamespace(event, this.topic, this.namespaceOverride);
+  events.add(Pair.of(buildKey(event), record));
+}
+
+if (!events.isEmpty()) {
+  this.pusher.pushKeyValueMessages(events);
+}
+  }
+
+  protected String buildKey(GobblinTrackingEvent event) {
+
+String key = randomKey;
+if (this.keys.isPresent()) {
+
+  StringBuilder keyBuilder = new StringBuilder();
+  for (String keyPart : keys.get()) {
+Map<String, String> metadata = event.getMetadata();
+if (metadata.containsKey(keyPart)) {
+  keyBuilder.append(metadata.get(keyPart));
+} else {
+  log.error("{} not found in the GobblinTrackingEvent. Setting key to null.", key);
+  keyBuilder = null;
+  break;
+}
+  }
+
+  key = (keyBuilder == null) ? key : keyBuilder.toString();
+}
+
+return key;
+  }
+
+  public static class Factory {
+/**
+ * Returns a new {@link KeyValueEventObjectReporter.Builder} for {@link KeyValueEventObjectReporter}.
+ * Will automatically add all Context tags to the reporter.
+ *
+ * @param context the {@link MetricContext} to report
+ * 

[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285710
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueMetricObjectReporter.java
 ##
 @@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.reporter.MetricReportReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+@Slf4j
+public class KeyValueMetricObjectReporter extends MetricReportReporter {
 
 Review comment:
   The comments on `KeyValueEventObjectReporter` also apply here.




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285532
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+super(builder);
+
+this.topic=builder.topic;
+this.namespaceOverride=builder.namespaceOverride;
+Config config = builder.config.get();
+
+if (builder.pusher.isPresent()) {
+  this.pusher = builder.pusher.get();
+} else {
+  Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+  String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+  this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig));
+}
+
+this.closer.register(this.pusher);
+
+randomKey=String.valueOf(new Random().nextInt(100));
 
 Review comment:
   This is `buildKey` logic; move it there.




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285097
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
 
 Review comment:
   Install the code style template to format your code. Instructions are here: 
https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284823
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
 ##
 @@ -17,87 +17,170 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Kafka reporting formats enumeration.
  */
 public enum KafkaReportingFormats {
 
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for 
this reporting format.
-   *
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder metricReporterBuilder(Properties properties) 
{
-switch (this) {
-  case AVRO:
-KafkaAvroReporter.Builder builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
-if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-  builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-}
-return builder;
-  case JSON:
-return KafkaReporter.BuilderFactory.newBuilder();
-  default:
-// This should never happen.
-throw new IllegalArgumentException("KafkaReportingFormat not 
recognized.");
+  AVRO() {
+
+@Override
+public void buildMetricsScheduledReporter(String brokers, String topic, 
Properties properties) throws IOException {
+
+  KafkaAvroReporter.Builder builder = 
KafkaAvroReporter.BuilderFactory.newBuilder();
+  if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+  
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+  }
+  builder.build(brokers, topic, properties);
+
+}
+
+@Override
+public ScheduledReporter buildEventsScheduledReporter(String brokers, 
String topic, MetricContext context, Properties properties) throws IOException {
+
+  KafkaAvroEventReporter.Builder builder = 
KafkaAvroEventReporter.Factory.forContext(context);
+  if 
(Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+  
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+  }
+  String pusherClassName = 
properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+  ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+  : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+  PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+  builder.withPusherClassName(pusherClassName);
+
+  Config allConfig = ConfigUtils.propertiesToConfig(properties);
+  // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
+  // kafka config
+  Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+  
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+  ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+  builder.withConfig(kafkaConfig);
+
+  return builder.build(brokers, topic);
+
 }
-  }
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} 
for this reporting format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build 
reporters.
-   * @return {@link 

[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285885
 
 

 ##
 File path: gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
 ##
 @@ -875,11 +876,26 @@ public static Schema decorateRecordSchema(Schema 
inputSchema, @Nonnull List fieldMap,
   Schema outputSchema) {
 GenericRecord outputRecord = new GenericData.Record(outputSchema);
-inputRecord.getSchema().getFields().forEach(
-f -> outputRecord.put(f.name(), inputRecord.get(f.name()))
-);
+inputRecord.getSchema().getFields().forEach(f -> 
outputRecord.put(f.name(), inputRecord.get(f.name())));
 fieldMap.forEach((key, value) -> outputRecord.put(key, value));
 return outputRecord;
   }
 
+  public static GenericRecord overrideNameAndNamespace(GenericRecord event, 
String nameOverride, Optional<Map<String, String>> namespaceOverride) {
 
 Review comment:
   This needs a unit test.




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285404
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+super(builder);
+
+this.topic=builder.topic;
+this.namespaceOverride=builder.namespaceOverride;
+Config config = builder.config.get();
+
+if (builder.pusher.isPresent()) {
+  this.pusher = builder.pusher.get();
+} else {
+  Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
+  String pusherClassName = ConfigUtils.getString(config, "pusherClass", 
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+  this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, 
builder.brokers, builder.topic, Optional.of(pusherConfig));
+}
+
+this.closer.register(this.pusher);
+
+randomKey=String.valueOf(new Random().nextInt(100));
+String pusherKeys_Key = "pusherKeys";
+if (config.hasPath(pusherKeys_Key)) {
+  List<String> keys = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key));
+  this.keys = Optional.of(keys);
+}else{
+  log.warn("Key not assigned from config. Please set it with property {}", 
ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS);
+  log.warn("Using generated number " + randomKey + " as key");
+}
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+log.info("Emitting report using KeyValueEventObjectReporter");
+
+List<Pair<String, GenericRecord>> events = Lists.newArrayList();
+GobblinTrackingEvent event;
+
+while(null != (event = queue.poll())){
+  GenericRecord record = AvroUtils.overrideNameAndNamespace(event, 
this.topic, this.namespaceOverride);
+  events.add(Pair.of(buildKey(event), record));
+}
+
+if (!events.isEmpty()) {
+  this.pusher.pushKeyValueMessages(events);
+}
+  }
+
+  protected String buildKey(GobblinTrackingEvent event) {
+
+String key = randomKey;
+if (this.keys.isPresent()) {
+
+  StringBuilder keyBuilder = new StringBuilder();
+  for (String keyPart : keys.get()) {
+Map<String, String> metadata = event.getMetadata();
+if (metadata.containsKey(keyPart)) {
+  keyBuilder.append(metadata.get(keyPart));
 
 Review comment:
   Should provide a proper separator between key parts.
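
The concern in the comment above can be illustrated with a minimal sketch. Without a separator, the parts ("ab", "c") and ("a", "bc") both concatenate to "abc". The class name `KeyPartJoinSketch`, the `buildKey` signature, and the `|` separator are assumptions made for illustration; they are not taken from the PR, and the review does not say which separator to use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: joins metadata values with a separator.
class KeyPartJoinSketch {

  // Assumed separator; the review comment does not name one.
  static final String SEPARATOR = "|";

  static String buildKey(List<String> keyParts, Map<String, String> metadata, String fallbackKey) {
    List<String> values = new ArrayList<>();
    for (String keyPart : keyParts) {
      if (!metadata.containsKey(keyPart)) {
        // Like the quoted code, fall back to the default key when a part is missing.
        return fallbackKey;
      }
      values.add(metadata.get(keyPart));
    }
    // String.join places the separator only between values, never at the ends.
    return String.join(SEPARATOR, values);
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put("jobName", "ab");
    metadata.put("taskId", "c");
    System.out.println(buildKey(Arrays.asList("jobName", "taskId"), metadata, "42"));  // prints "ab|c"
    System.out.println(buildKey(Arrays.asList("jobName", "missing"), metadata, "42")); // prints "42"
  }
}
```

A separator that cannot occur in the metadata values keeps distinct key-part combinations from colliding; whether `|` satisfies that depends on the actual event metadata.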




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284689
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
 ##
 @@ -44,7 +44,7 @@ public ScheduledReporter newScheduledReporter(MetricRegistry 
registry, Propertie
 ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
   return null;
 }
-log.info("Reporting metrics to Kafka");
 
 Review comment:
   This is a redundant log and can be removed.




[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285611
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+super(builder);
+
+this.topic=builder.topic;
+this.namespaceOverride=builder.namespaceOverride;
+Config config = builder.config.get();
+
+if (builder.pusher.isPresent()) {
+  this.pusher = builder.pusher.get();
+} else {
+  Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
+  String pusherClassName = ConfigUtils.getString(config, "pusherClass", 
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+  this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, 
builder.brokers, builder.topic, Optional.of(pusherConfig));
+}
+
+this.closer.register(this.pusher);
+
+randomKey=String.valueOf(new Random().nextInt(100));
+String pusherKeys_Key = "pusherKeys";
+if (config.hasPath(pusherKeys_Key)) {
+  List<String> keys = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key));
+  this.keys = Optional.of(keys);
+}else{
+  log.warn("Key not assigned from config. Please set it with property {}", 
ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS);
+  log.warn("Using generated number " + randomKey + " as key");
+}
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+log.info("Emitting report using KeyValueEventObjectReporter");
+
+List<Pair<String, GenericRecord>> events = Lists.newArrayList();
+GobblinTrackingEvent event;
+
+while(null != (event = queue.poll())){
+  GenericRecord record = AvroUtils.overrideNameAndNamespace(event, 
this.topic, this.namespaceOverride);
+  events.add(Pair.of(buildKey(event), record));
+}
+
+if (!events.isEmpty()) {
+  this.pusher.pushKeyValueMessages(events);
+}
+  }
+
+  protected String buildKey(GobblinTrackingEvent event) {
+
+String key = randomKey;
+if (this.keys.isPresent()) {
+
+  StringBuilder keyBuilder = new StringBuilder();
+  for (String keyPart : keys.get()) {
+Map<String, String> metadata = event.getMetadata();
+if (metadata.containsKey(keyPart)) {
+  keyBuilder.append(metadata.get(keyPart));
+} else {
+  log.error("{} not found in the GobblinTrackingEvent. Setting key to 
null.", key);
+  keyBuilder = null;
+  break;
+}
+  }
+
+  key = (keyBuilder == null) ? key : keyBuilder.toString();
+}
+
+return key;
+  }
+
+  public static class Factory {
+/**
+ * Returns a new {@link KeyValueEventObjectReporter.Builder} for {@link 
KeyValueEventObjectReporter}.
+ * Will automatically add all Context tags to the reporter.
+ *
+ * @param context the {@link MetricContext} to report
+ * 

[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…

2019-05-01 Thread GitBox
zxcware commented on a change in pull request #2622: Added random key generator 
for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285309
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java
 ##
 @@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+super(builder);
+
+this.topic=builder.topic;
+this.namespaceOverride=builder.namespaceOverride;
+Config config = builder.config.get();
+
+if (builder.pusher.isPresent()) {
+  this.pusher = builder.pusher.get();
+} else {
+  Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
+  String pusherClassName = ConfigUtils.getString(config, "pusherClass", 
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+  this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, 
builder.brokers, builder.topic, Optional.of(pusherConfig));
+}
+
+this.closer.register(this.pusher);
+
+randomKey=String.valueOf(new Random().nextInt(100));
+String pusherKeys_Key = "pusherKeys";
+if (config.hasPath(pusherKeys_Key)) {
+  List<String> keys = 
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key));
+  this.keys = Optional.of(keys);
+}else{
+  log.warn("Key not assigned from config. Please set it with property {}", 
ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS);
+  log.warn("Using generated number " + randomKey + " as key");
+}
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+log.info("Emitting report using KeyValueEventObjectReporter");
+
+List<Pair<String, GenericRecord>> events = Lists.newArrayList();
+GobblinTrackingEvent event;
+
+while(null != (event = queue.poll())){
+  GenericRecord record = AvroUtils.overrideNameAndNamespace(event, 
this.topic, this.namespaceOverride);
+  events.add(Pair.of(buildKey(event), record));
+}
+
+if (!events.isEmpty()) {
+  this.pusher.pushKeyValueMessages(events);
+}
+  }
+
+  protected String buildKey(GobblinTrackingEvent event) {
 
 Review comment:
   Define this as `private`. Withholding the ability to extend at first makes 
any future refactoring easier.




[jira] [Work logged] (GOBBLIN-707) combine & standardize all gobblin scripts into one master script & restructure configs accordingly

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-707?focusedWorklogId=236098&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236098
 ]

ASF GitHub Bot logged work on GOBBLIN-707:
--

Author: ASF GitHub Bot
Created on: 02/May/19 02:42
Start Date: 02/May/19 02:42
Worklog Time Spent: 10m 
  Work Description: jhsenjaliya commented on issue #2578: [GOBBLIN-707] 
rewrite gobblin script to combine all modes and command
URL: 
https://github.com/apache/incubator-gobblin/pull/2578#issuecomment-488537624
 
 
   1 out of 4 test environments has failed; I will look into it.
 



Issue Time Tracking
---

Worklog Id: (was: 236098)
Time Spent: 5h 40m  (was: 5.5h)

> combine & standardize all gobblin scripts into one master script & 
> restructure configs accordingly
> --
>
> Key: GOBBLIN-707
> URL: https://issues.apache.org/jira/browse/GOBBLIN-707
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Jay Sen
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Gobblin supports multiple modes of execution (CLI, Standalone, 
> cluster-master, cluster-worker, AWS, YARN, MR) and various command-line 
> utilities to run CLI and admin commands. There is an individual script for each 
> of them.
> Having individual scripts introduces a lot of issues:
>  # All scripts handle Gobblin variables and user parameters differently, and 
> this handling is highly inconsistent among the various Gobblin scripts.
>  # Functionality around start, stop, status checking, and PID handling, among 
> many other things, varies widely with the implementation of each script.
>  # Features like GC & JVM params, log4j file selection, classpath 
> calculation, etc. exist in some Gobblin scripts but not all, adding to an 
> inconsistent user experience.
>  # Maintaining a total of 13 scripts would be too much effort.
> Also, all the Gobblin scripts share a lot of common code to handle params, 
> starting and stopping services, status checks, PID handling, etc. Combining all the 
> scripts into one not only makes maintenance easier but also brings clarity and 
> consistency.
>  
> Solution:
> 1. There can be one gobblin.sh script to handle all Gobblin commands and 
> deployment options as per the following signature. NOTE: This
> {{gobblin.sh   }}
>  {{gobblin.sh   }}
> {{commands values: admin, cli, statestore-check, statestore-clean, 
> historystore-manager, classpath}}
>  {{service values: standalone, cluster-master, cluster-worker, aws, yarn, mr, 
> service}}
> With the above change, the following become valid commands.
> {code:java}
> # all under GobblinCli class
> gobblin run listQuickApps  –> gobblin cli run listQuickApps
> gobblin run listQuickApps  –> gobblin cli run listQuickApps
> gobblin run  -> gobblin cli run 
> # class: JobStateToJsonConverter
> statestore-checker.sh  -> gobblin statestore-checker 
> # class: StateStoreCleaner
> statestore-clean.sh  -> gobblin statestore-clean 
> # class: DatabaseJobHistoryStoreSchemaManager
> historystore-manager.sh  -> gobblin historystore-manager 
> # class: Cli
> gobblin-admin.sh-> gobblin admin 
> # all gobblin deployment modes
> gobblin-cluster-master.sh   -> gobblin cluster-master start|stop|status
> gobblin-cluster-worker.sh   -> gobblin cluster-master start|stop|status
> gobblin-compaction.sh   -> gobblin cluster-master start|stop|status
> gobblin-env.sh  -> gobblin cluster-master start|stop|status
> gobblin-mapreduce.sh-> gobblin cluster-master start|stop|status
> gobblin-service.sh  -> gobblin cluster-master start|stop|status
> gobblin-standalone.sh   -> gobblin cluster-master start|stop|status
> gobblin-yarn.sh -> gobblin cluster-master start|stop|status
> {code}
>  
> 2. Also, configs need to be structured and deduplicated accordingly to make it 
> clear which config will be picked up for which execution mode.
>  
>  {color:#FF}
>  NOTE: this refactoring to gobblin.sh changes the way all gobblin commands 
> were run before{color}
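
The split between "commands" and "service" values in the description can be sketched as a small dispatcher. This is a hypothetical illustration in Java rather than the actual gobblin.sh logic: the class name `GobblinEntryPointSketch`, the `classify` method, and its return strings are assumptions. Only the value lists and the `start|stop|status` actions are copied from the description.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the proposed entry point; not the real gobblin.sh implementation.
class GobblinEntryPointSketch {

  // Copied from the "commands values" list in the description.
  static final Set<String> COMMANDS = new LinkedHashSet<>(Arrays.asList(
      "admin", "cli", "statestore-check", "statestore-clean", "historystore-manager", "classpath"));

  // Copied from the "service values" list in the description.
  static final Set<String> SERVICES = new LinkedHashSet<>(Arrays.asList(
      "standalone", "cluster-master", "cluster-worker", "aws", "yarn", "mr", "service"));

  // Actions shown in the mapping table (start|stop|status).
  static final Set<String> ACTIONS = new LinkedHashSet<>(Arrays.asList("start", "stop", "status"));

  // Returns a label describing how the first arguments would be routed (assumed format).
  static String classify(String[] args) {
    if (args.length == 0) {
      return "usage";
    }
    String first = args[0];
    if (COMMANDS.contains(first)) {
      return "command:" + first;
    }
    if (SERVICES.contains(first)) {
      // A service needs one of the known actions as its second argument.
      if (args.length >= 2 && ACTIONS.contains(args[1])) {
        return "service:" + first + ":" + args[1];
      }
      return "usage";
    }
    return "unknown:" + first;
  }

  public static void main(String[] args) {
    System.out.println(classify(new String[] {"cli", "run", "listQuickApps"})); // prints "command:cli"
    System.out.println(classify(new String[] {"yarn", "start"}));               // prints "service:yarn:start"
    System.out.println(classify(new String[] {"yarn"}));                        // prints "usage"
  }
}
```

Routing on the first argument keeps parameter handling in one place, which is the consistency the description asks for.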



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [incubator-gobblin] jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to combine all modes and command

2019-05-01 Thread GitBox
jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to 
combine all modes and command
URL: 
https://github.com/apache/incubator-gobblin/pull/2578#issuecomment-488537624
 
 
   1 out of 4 test environments has failed; I will look into it.




[GitHub] [incubator-gobblin] asfgit closed pull request #2625: [GOBBLIN-761]Only instantiate topic-specific configStore object when topic.name is available

2019-05-01 Thread GitBox
asfgit closed pull request #2625: [GOBBLIN-761]Only instantiate topic-specific 
configStore object when topic.name is available
URL: https://github.com/apache/incubator-gobblin/pull/2625
 
 
   




[jira] [Work logged] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-761?focusedWorklogId=236069&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236069
 ]

ASF GitHub Bot logged work on GOBBLIN-761:
--

Author: ASF GitHub Bot
Created on: 02/May/19 00:21
Start Date: 02/May/19 00:21
Worklog Time Spent: 10m 
  Work Description: asfgit commented on pull request #2625: 
[GOBBLIN-761]Only instantiate topic-specific configStore object when topic.name 
is available
URL: https://github.com/apache/incubator-gobblin/pull/2625
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 236069)
Time Spent: 20m  (was: 10m)

> Fix runtime property like Topic.name not available in Compaction when 
> fetching configStore object
> -
>
> Key: GOBBLIN-761
> URL: https://issues.apache.org/jira/browse/GOBBLIN-761
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Lei Sun
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Resolved] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object

2019-05-01 Thread Hung Tran (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hung Tran resolved GOBBLIN-761.
---
   Resolution: Fixed
Fix Version/s: 0.15.0

Issue resolved by pull request #2625
[https://github.com/apache/incubator-gobblin/pull/2625]

> Fix runtime property like Topic.name not available in Compaction when 
> fetching configStore object
> -
>
> Key: GOBBLIN-761
> URL: https://issues.apache.org/jira/browse/GOBBLIN-761
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Lei Sun
>Priority: Major
> Fix For: 0.15.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-761?focusedWorklogId=236059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236059
 ]

ASF GitHub Bot logged work on GOBBLIN-761:
--

Author: ASF GitHub Bot
Created on: 01/May/19 23:35
Start Date: 01/May/19 23:35
Worklog Time Spent: 10m 
  Work Description: autumnust commented on pull request #2625: 
[GOBBLIN-761]Only instantiate topic-specific configStore object when topic.name 
is available
URL: https://github.com/apache/incubator-gobblin/pull/2625
 
 
   … available
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - https://issues.apache.org/jira/browse/GOBBLIN-761
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if 
applicable):
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 236059)
Time Spent: 10m
Remaining Estimate: 0h

> Fix runtime property like Topic.name not available in Compaction when 
> fetching configStore object
> -
>
> Key: GOBBLIN-761
> URL: https://issues.apache.org/jira/browse/GOBBLIN-761
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Lei Sun
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (GOBBLIN-707) combine & standardize all gobblin scripts into one master script & restructure configs accordingly

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-707?focusedWorklogId=236061=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236061
 ]

ASF GitHub Bot logged work on GOBBLIN-707:
--

Author: ASF GitHub Bot
Created on: 01/May/19 23:41
Start Date: 01/May/19 23:41
Worklog Time Spent: 10m 
  Work Description: jhsenjaliya commented on issue #2578: [GOBBLIN-707] 
rewrite gobblin script to combine all modes and command
URL: 
https://github.com/apache/incubator-gobblin/pull/2578#issuecomment-488498198
 
 
   @ibuenros , @autumnust, Would you pls take a look ? Thanks
 



Issue Time Tracking
---

Worklog Id: (was: 236061)
Time Spent: 5.5h  (was: 5h 20m)

> combine & standardize all gobblin scripts into one master script & 
> restructure configs accordingly
> --
>
> Key: GOBBLIN-707
> URL: https://issues.apache.org/jira/browse/GOBBLIN-707
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Jay Sen
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Gobblin supports multiple modes of execution (CLI, standalone, 
> cluster-master, cluster-worker, AWS, YARN, MR) and various command-line 
> utilities to run CLI and admin commands. There is an individual script for each 
> of them.
> Having individual scripts introduces a lot of issues:
>  # The scripts handle Gobblin variables and user parameters differently, and 
> the handling is highly inconsistent across the various Gobblin scripts.
>  # Functionality around start, stop, status checking, and PID handling, among 
> many other things, varies widely with the implementation of each script.
>  # Features like GC and JVM params, log4j file selection, classpath 
> calculation, etc. exist in some Gobblin scripts but not all, adding to an 
> inconsistent user experience.
>  # Maintaining a total of 13 scripts would be too much effort.
> Also, all the Gobblin scripts share a lot of common code to handle params, 
> start and stop services, status checks, PID handling, etc. Combining all the 
> scripts into one not only makes maintenance easier but also brings clarity and 
> consistency.
>  
> Solution:
> 1. There can be one gobblin.sh script to handle all Gobblin commands and 
> deployment options as per the following signature. NOTE: This
> {{gobblin.sh   }}
>  {{gobblin.sh   }}
> {{command values: admin, cli, statestore-check, statestore-clean, 
> historystore-manager, classpath}}
>  {{service values: standalone, cluster-master, cluster-worker, aws, yarn, mr, 
> service}}
> With the above change, the following become valid commands:
> {code:java}
> # all under GobblinCli class
> gobblin run listQuickApps  -> gobblin cli run listQuickApps
> gobblin run listQuickApps  -> gobblin cli run listQuickApps
> gobblin run  -> gobblin cli run 
> # class: JobStateToJsonConverter
> statestore-checker.sh  -> gobblin statestore-checker 
> # class: StateStoreCleaner
> statestore-clean.sh  -> gobblin statestore-clean 
> # class: DatabaseJobHistoryStoreSchemaManager
> historystore-manager.sh  -> gobblin historystore-manager 
> # class: Cli
> gobblin-admin.sh            -> gobblin admin 
> # all gobblin deployment modes
> gobblin-cluster-master.sh   -> gobblin cluster-master start|stop|status
> gobblin-cluster-worker.sh   -> gobblin cluster-master start|stop|status
> gobblin-compaction.sh       -> gobblin cluster-master start|stop|status
> gobblin-env.sh              -> gobblin cluster-master start|stop|status
> gobblin-mapreduce.sh        -> gobblin cluster-master start|stop|status
> gobblin-service.sh          -> gobblin cluster-master start|stop|status
> gobblin-standalone.sh       -> gobblin cluster-master start|stop|status
> gobblin-yarn.sh             -> gobblin cluster-master start|stop|status
> {code}
>  
> 2. Configs also need to be structured and deduplicated accordingly to make it 
> clear which config will be picked up for which execution mode.
>  
>  {color:#FF}
>  NOTE: this refactoring to gobblin.sh changes the way all Gobblin commands 
> were run before.{color}





[GitHub] [incubator-gobblin] jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to combine all modes and command

2019-05-01 Thread GitBox
jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to 
combine all modes and command
URL: 
https://github.com/apache/incubator-gobblin/pull/2578#issuecomment-488498198
 
 
   @ibuenros , @autumnust, Would you pls take a look ? Thanks




With regards,
Apache Git Services


[jira] [Created] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object

2019-05-01 Thread Lei Sun (JIRA)
Lei Sun created GOBBLIN-761:
---

 Summary: Fix runtime property like Topic.name not available in 
Compaction when fetching configStore object
 Key: GOBBLIN-761
 URL: https://issues.apache.org/jira/browse/GOBBLIN-761
 Project: Apache Gobblin
  Issue Type: Improvement
Reporter: Lei Sun








[jira] [Work logged] (GOBBLIN-740) influxDB failes to write due to invalid retentionPolicy

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-740?focusedWorklogId=236023=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236023
 ]

ASF GitHub Bot logged work on GOBBLIN-740:
--

Author: ASF GitHub Bot
Created on: 01/May/19 22:36
Start Date: 01/May/19 22:36
Worklog Time Spent: 10m 
  Work Description: jhsenjaliya commented on issue #2607: [GOBBLIN-740] 
Remove setting retentionPolicy on every Point write
URL: 
https://github.com/apache/incubator-gobblin/pull/2607#issuecomment-488465685
 
 
   @shirshanka , can u or someone pls merge this? thanks
 



Issue Time Tracking
---

Worklog Id: (was: 236023)
Time Spent: 10m
Remaining Estimate: 0h

> influxDB failes to write due to invalid retentionPolicy
> ---
>
> Key: GOBBLIN-740
> URL: https://issues.apache.org/jira/browse/GOBBLIN-740
> Project: Apache Gobblin
>  Issue Type: Bug
>Reporter: Jay Sen
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Gobblin tries to set {{retentionPolicy}} while pushing a {{Point}} in 
> {{InfluxDBPusher}}.
> The latest InfluxDB does not ship with a "default" retention policy, nor can 
> you create one.
> Gobblin does not really have to set the retention policy every time it pushes a 
> {{Point}}. It also has no way to configure a retention policy per metric, 
> so the retention policy can be set completely offline while setting up the 
> InfluxDB database.





[GitHub] [incubator-gobblin] jhsenjaliya commented on issue #2607: [GOBBLIN-740] Remove setting retentionPolicy on every Point write

2019-05-01 Thread GitBox
jhsenjaliya commented on issue #2607: [GOBBLIN-740] Remove setting 
retentionPolicy on every Point write
URL: 
https://github.com/apache/incubator-gobblin/pull/2607#issuecomment-488465685
 
 
   @shirshanka , can u or someone pls merge this? thanks




[jira] [Resolved] (GOBBLIN-756) Add flow template that updates when file system is modified

2019-05-01 Thread Jack Moseley (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jack Moseley resolved GOBBLIN-756.
--
Resolution: Fixed

> Add flow template that updates when file system is modified
> ---
>
> Key: GOBBLIN-756
> URL: https://issues.apache.org/jira/browse/GOBBLIN-756
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Jack Moseley
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (GOBBLIN-756) Add flow template that updates when file system is modified

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-756?focusedWorklogId=235946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235946
 ]

ASF GitHub Bot logged work on GOBBLIN-756:
--

Author: ASF GitHub Bot
Created on: 01/May/19 18:49
Start Date: 01/May/19 18:49
Worklog Time Spent: 10m 
  Work Description: asfgit commented on pull request #2620: [GOBBLIN-756] 
Add flow catalog that updates when filesystem is modified
URL: https://github.com/apache/incubator-gobblin/pull/2620
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 235946)
Time Spent: 4.5h  (was: 4h 20m)

> Add flow template that updates when file system is modified
> ---
>
> Key: GOBBLIN-756
> URL: https://issues.apache.org/jira/browse/GOBBLIN-756
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Jack Moseley
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>






[GitHub] [incubator-gobblin] asfgit closed pull request #2620: [GOBBLIN-756] Add flow catalog that updates when filesystem is modified

2019-05-01 Thread GitBox
asfgit closed pull request #2620: [GOBBLIN-756] Add flow catalog that updates 
when filesystem is modified
URL: https://github.com/apache/incubator-gobblin/pull/2620
 
 
   




[jira] [Resolved] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread Issac Buenrostro (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Issac Buenrostro resolved GOBBLIN-760.
--
   Resolution: Fixed
Fix Version/s: 0.15.0

Issue resolved by pull request #2624
[https://github.com/apache/incubator-gobblin/pull/2624]

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
> Fix For: 0.15.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>






[GitHub] [incubator-gobblin] asfgit closed pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …

2019-05-01 Thread GitBox
asfgit closed pull request #2624: [GOBBLIN-760] Improve retrying behavior of 
throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624
 
 
   




[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235922=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235922
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 18:03
Start Date: 01/May/19 18:03
Worklog Time Spent: 10m 
  Work Description: asfgit commented on pull request #2624: [GOBBLIN-760] 
Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 235922)
Time Spent: 1.5h  (was: 1h 20m)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235907
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 17:35
Start Date: 01/May/19 17:35
Worklog Time Spent: 10m 
  Work Description: htran1 commented on issue #2624: [GOBBLIN-760] Improve 
retrying behavior of throttling client, add more informative …
URL: 
https://github.com/apache/incubator-gobblin/pull/2624#issuecomment-488352670
 
 
   +1
 



Issue Time Tracking
---

Worklog Id: (was: 235907)
Time Spent: 1h 20m  (was: 1h 10m)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[GitHub] [incubator-gobblin] htran1 commented on issue #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …

2019-05-01 Thread GitBox
htran1 commented on issue #2624: [GOBBLIN-760] Improve retrying behavior of 
throttling client, add more informative …
URL: 
https://github.com/apache/incubator-gobblin/pull/2624#issuecomment-488352670
 
 
   +1




Re: Gobblin for Google Season of Docs 2019

2019-05-01 Thread Jayesh Senjaliya
Actually, we just missed the deadline:
https://developers.google.com/season-of-docs/docs/timeline

Maybe next time.

- Jay


On Tue, Apr 30, 2019 at 11:56 AM Abhishek Tiwari  wrote:

> I didn't get a chance to review all requirements, but sounds like something
> we should do.
> How do we get started on this? Who wants to be the mentor?
>
> Abhishek
>
> On Tue, Apr 30, 2019 at 11:54 AM Jay Sen  wrote:
>
> > Did anybody get a chance to look at this? Does anybody think we might have
> > capacity to leverage this opportunity?
> >
> > - Jay
> >
> > On Thu, Apr 18, 2019 at 4:26 PM Jay Sen  wrote:
> >
> > > Hello Everyone
> > >
> > > Google Season of Docs is here and Apache Gobblin can greatly benefit from
> > > this program.
> > >
> > > https://developers.google.com/season-of-docs/docs/get-started/
> > >
> > > It is the program that brings open source projects and technical writers
> > > together. It is similar to Google Summer of Code but
> > > for documentation, and the technical writers are not students
> > > but experienced folks in the field.
> > >
> > > I think it will be very beneficial for Gobblin to participate, as we need
> > > a lot of improvements in docs in general.
> > >
> > > Can we have volunteer mentors to participate in the process? I will be
> > > happy to contribute in whatever way I can.
> > >
> > > Thanks
> > > Jay
> > >
> >
>


[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235869=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235869
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 16:33
Start Date: 01/May/19 16:33
Worklog Time Spent: 10m 
  Work Description: ibuenros commented on pull request #2624: [GOBBLIN-760] 
Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128758
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -196,18 +205,26 @@ private long elapsedMillis(long startTimeNanos) {
   /**
* Send a new permit request to the server.
*/
-  private void maybeSendNewPermitRequest() {
-if (!this.requestSemaphore.tryAcquire()) {
-  return;
+  private synchronized void maybeSendNewPermitRequest() {
+while (!this.requestSemaphore.tryAcquire()) {
+  if (this.currentCallback == null) {
+throw new IllegalStateException("Semaphore is unavailable while callback is null!");
+  }
+  if (this.currentCallback.elapsedTime() > 3) {
 
 Review comment:
   It is not actually derived from that, it's an arbitrary number. Added some 
comments and logs though.
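
One way to make an arbitrary threshold like this easier to read is a named constant that carries its unit and a comment on its origin. The following is a minimal sketch, not the code from this PR: the class name, the constant name `CALLBACK_TIMEOUT_MILLIS`, and the 30-second value are all assumptions chosen for illustration.

```java
import java.util.concurrent.TimeUnit;

public class TimeoutConstantSketch {
  // Hypothetical name and value. The number is arbitrary and is NOT derived
  // from the retry settings; it only bounds how long a callback may stay pending.
  static final long CALLBACK_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30);

  /** Returns true once strictly more than the timeout has elapsed. */
  static boolean hasTimedOut(long startMillis, long nowMillis) {
    return nowMillis - startMillis > CALLBACK_TIMEOUT_MILLIS;
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    System.out.println(hasTimedOut(start, start + CALLBACK_TIMEOUT_MILLIS));     // false
    System.out.println(hasTimedOut(start, start + CALLBACK_TIMEOUT_MILLIS + 1)); // true
  }
}
```

Passing the current time in as a parameter keeps the check testable without sleeping.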
 



Issue Time Tracking
---

Worklog Id: (was: 235869)
Time Spent: 1h 10m  (was: 1h)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[GitHub] [incubator-gobblin] ibuenros commented on a change in pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …

2019-05-01 Thread GitBox
ibuenros commented on a change in pull request #2624: [GOBBLIN-760] Improve 
retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128758
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -196,18 +205,26 @@ private long elapsedMillis(long startTimeNanos) {
   /**
* Send a new permit request to the server.
*/
-  private void maybeSendNewPermitRequest() {
-if (!this.requestSemaphore.tryAcquire()) {
-  return;
+  private synchronized void maybeSendNewPermitRequest() {
+while (!this.requestSemaphore.tryAcquire()) {
+  if (this.currentCallback == null) {
+throw new IllegalStateException("Semaphore is unavailable while callback is null!");
+  }
+  if (this.currentCallback.elapsedTime() > 3) {
 
 Review comment:
   It is not actually derived from that, it's an arbitrary number. Added some 
comments and logs though.




[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235867
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 16:32
Start Date: 01/May/19 16:32
Worklog Time Spent: 10m 
  Work Description: ibuenros commented on pull request #2624: [GOBBLIN-760] 
Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128632
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -364,10 +393,22 @@ public void onSuccess(Response result) {
   }
 }
 
+public long elapsedTime() {
+  return System.currentTimeMillis() - this.startTime;
+}
+
+public synchronized void clearCallback() {
 
 Review comment:
   We want to make sure a late callback and a timeout don't both release the 
semaphore.
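
The concern can be illustrated with a small standalone sketch. It is not the code from this PR (which uses a `synchronized` method on the callback); it swaps in an `AtomicBoolean` guard to show the same idea, and the class and method names (`SingleReleaseSketch`, `GuardedCallback`, `releaseOnce`) are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleReleaseSketch {
  /** Hypothetical stand-in for the request callback; not the actual Gobblin class. */
  static final class GuardedCallback {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    GuardedCallback(Semaphore semaphore) {
      this.semaphore = semaphore;
    }

    /** Releases the permit at most once; returns true only for the caller that released it. */
    boolean releaseOnce() {
      if (released.compareAndSet(false, true)) {
        semaphore.release();
        return true;
      }
      return false;
    }
  }

  public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(1);
    semaphore.tryAcquire(); // a request is now in flight
    GuardedCallback callback = new GuardedCallback(semaphore);

    boolean timeoutReleased = callback.releaseOnce();      // timeout path fires first
    boolean lateResponseReleased = callback.releaseOnce(); // late response arrives afterwards

    // prints "true false 1": the permit was returned exactly once
    System.out.println(timeoutReleased + " " + lateResponseReleased + " " + semaphore.availablePermits());
  }
}
```

Without the guard, both paths would call `release()` and the semaphore would end up with two permits, allowing two concurrent requests.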
 



Issue Time Tracking
---

Worklog Id: (was: 235867)
Time Spent: 50m  (was: 40m)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235868
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 16:32
Start Date: 01/May/19 16:32
Worklog Time Spent: 10m 
  Work Description: ibuenros commented on pull request #2624: [GOBBLIN-760] 
Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128649
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -87,6 +88,7 @@
   private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 6; // 10 minutes
   private static final double MAX_DEPLETION_RATE = 1e20;
   public static final int MAX_GROWTH_REQUEST = 2;
+  private static final long GET_PERMITS_MAX_SLEEP = 1000;
 
 Review comment:
   Fixed
 



Issue Time Tracking
---

Worklog Id: (was: 235868)
Time Spent: 1h  (was: 50m)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>






[GitHub] [incubator-gobblin] ibuenros commented on a change in pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …

2019-05-01 Thread GitBox
ibuenros commented on a change in pull request #2624: [GOBBLIN-760] Improve 
retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128649
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -87,6 +88,7 @@
   private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 6; // 10 minutes
   private static final double MAX_DEPLETION_RATE = 1e20;
   public static final int MAX_GROWTH_REQUEST = 2;
+  private static final long GET_PERMITS_MAX_SLEEP = 1000;
 
 Review comment:
   Fixed




[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235701=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235701
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 06:21
Start Date: 01/May/19 06:21
Worklog Time Spent: 10m 
  Work Description: autumnust commented on pull request #2624: 
[GOBBLIN-760] Improve retrying behavior of throttling client, add more 
informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011104
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -364,10 +393,22 @@ public void onSuccess(Response result) {
   }
 }
 
+public long elapsedTime() {
+  return System.currentTimeMillis() - this.startTime;
+}
+
+public synchronized void clearCallback() {
 
 Review comment:
   Why is this method needed if only one in-flight request (with one 
corresponding callback) exists at a time? Can't it simply be `clearSemaphore`, 
instead of having a `callbackcleared` method for protection? 
 



Issue Time Tracking
---

Worklog Id: (was: 235701)
Time Spent: 40m  (was: 0.5h)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235702=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235702
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 06:21
Start Date: 01/May/19 06:21
Worklog Time Spent: 10m 
  Work Description: autumnust commented on pull request #2624: 
[GOBBLIN-760] Improve retrying behavior of throttling client, add more 
informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011339
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -87,6 +88,7 @@
   private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 6; // 10 minutes
   private static final double MAX_DEPLETION_RATE = 1e20;
   public static final int MAX_GROWTH_REQUEST = 2;
+  private static final long GET_PERMITS_MAX_SLEEP = 1000;
 
 Review comment:
   Add unit in name? 
 



Issue Time Tracking
---

Worklog Id: (was: 235702)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients

2019-05-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235700=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235700
 ]

ASF GitHub Bot logged work on GOBBLIN-760:
--

Author: ASF GitHub Bot
Created on: 01/May/19 06:21
Start Date: 01/May/19 06:21
Worklog Time Spent: 10m 
  Work Description: autumnust commented on pull request #2624: 
[GOBBLIN-760] Improve retrying behavior of throttling client, add more 
informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011772
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -196,18 +205,26 @@ private long elapsedMillis(long startTimeNanos) {
   /**
* Send a new permit request to the server.
*/
-  private void maybeSendNewPermitRequest() {
-if (!this.requestSemaphore.tryAcquire()) {
-  return;
+  private synchronized void maybeSendNewPermitRequest() {
+while (!this.requestSemaphore.tryAcquire()) {
+  if (this.currentCallback == null) {
+throw new IllegalStateException("Semaphore is unavailable while callback is null!");
+  }
+  if (this.currentCallback.elapsedTime() > 3) {
 
 Review comment:
   This number seems to be derived from maxTimeOut / (maxRetry + 1). Shall we 
put the formula here explicitly instead of a plain number?
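
A minimal sketch of what this suggestion could look like, assuming the threshold really is the total timeout budget divided by the number of attempts (the first try plus every retry). The class, method, and parameter names are hypothetical and do not come from the pull request.

```java
/** Hypothetical helper; the names are illustrative and not taken from the pull request. */
final class RetryTimeouts {
  private RetryTimeouts() {}

  /**
   * Splits a total timeout budget evenly across the first attempt and all retries,
   * i.e. maxTimeoutMillis / (maxRetries + 1).
   */
  static long perAttemptTimeoutMillis(long maxTimeoutMillis, int maxRetries) {
    if (maxTimeoutMillis < 0 || maxRetries < 0) {
      throw new IllegalArgumentException("timeout and retry count must be non-negative");
    }
    return maxTimeoutMillis / (maxRetries + 1);
  }
}
```

With placeholder values, `RetryTimeouts.perAttemptTimeoutMillis(30000, 2)` yields 10000, so the comparison in the loop would read against a named expression rather than a bare literal.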
 



Issue Time Tracking
---

Worklog Id: (was: 235700)
Time Spent: 0.5h  (was: 20m)

> Improve retrying behavior of throttling clients
> ---
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
>  Issue Type: Improvement
>Reporter: Issac Buenrostro
>Assignee: Issac Buenrostro
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>






[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …

2019-05-01 Thread GitBox
autumnust commented on a change in pull request #2624: [GOBBLIN-760] Improve 
retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011339
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -87,6 +88,7 @@
   private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 6; // 10 minutes
   private static final double MAX_DEPLETION_RATE = 1e20;
   public static final int MAX_GROWTH_REQUEST = 2;
+  private static final long GET_PERMITS_MAX_SLEEP = 1000;
 
 Review comment:
   Add the unit to the name?
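
Two common ways to act on this, sketched under the assumption that the value is meant to be milliseconds (the diff does not state the unit). The class name and the `_MILLIS` suffix are hypothetical.

```java
import java.time.Duration;

/** Hypothetical holder class; only the constant name stem comes from the diff. */
final class SleepSettings {
  private SleepSettings() {}

  // Option 1: carry the unit in the name. The _MILLIS suffix assumes milliseconds.
  static final long GET_PERMITS_MAX_SLEEP_MILLIS = 1000;

  // Option 2: carry the unit in the type, so callers convert explicitly.
  static final Duration GET_PERMITS_MAX_SLEEP = Duration.ofMillis(1000);
}
```

Either form lets a reader see the unit at the call site, for example `Thread.sleep(SleepSettings.GET_PERMITS_MAX_SLEEP.toMillis())`.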




With regards,
Apache Git Services


[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …

2019-05-01 Thread GitBox
autumnust commented on a change in pull request #2624: [GOBBLIN-760] Improve 
retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011104
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -364,10 +393,22 @@ public void onSuccess(Response result) {
   }
 }
 
+public long elapsedTime() {
+  return System.currentTimeMillis() - this.startTime;
+}
+
+public synchronized void clearCallback() {
 
 Review comment:
   Why is this method needed if only one in-flight request (and its one 
corresponding callback) exists at a time? Can't it simply be `clearSemaphore`, 
instead of having a `callbackcleared` method for protection? 
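
For context on the trade-off being asked about, here is a minimal sketch of one possible reason for such a guard. This is an assumption, not something the pull request states: if a slow request is abandoned and its permit released, a late response from the old callback could release the semaphore a second time. All names below are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical gate that allows a single in-flight request at a time. */
final class SingleFlightGate {
  private final Semaphore requestSemaphore = new Semaphore(1);

  /** Returns a handle for the new in-flight request, or null if one is already running. */
  InFlight tryStart() {
    return requestSemaphore.tryAcquire() ? new InFlight() : null;
  }

  int availablePermits() {
    return requestSemaphore.availablePermits();
  }

  /** Handle tied to one request; it gives the permit back at most once. */
  final class InFlight {
    private final AtomicBoolean released = new AtomicBoolean(false);

    /**
     * Releases the permit only on the first call, so a timeout path and a late
     * completion path cannot both release it. Returns true if this call released it.
     */
    boolean release() {
      if (released.compareAndSet(false, true)) {
        requestSemaphore.release();
        return true;
      }
      return false;
    }
  }
}
```

Without the per-request flag, a plain release of the semaphore from both paths would leave two permits available and allow two concurrent requests. Whether that case can actually occur in `BatchedPermitsRequester` is exactly the question the comment raises.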




[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …

2019-05-01 Thread GitBox
autumnust commented on a change in pull request #2624: [GOBBLIN-760] Improve 
retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011772
 
 

 ##
 File path: 
gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 ##
 @@ -196,18 +205,26 @@ private long elapsedMillis(long startTimeNanos) {
   /**
* Send a new permit request to the server.
*/
-  private void maybeSendNewPermitRequest() {
-if (!this.requestSemaphore.tryAcquire()) {
-  return;
+  private synchronized void maybeSendNewPermitRequest() {
+while (!this.requestSemaphore.tryAcquire()) {
+  if (this.currentCallback == null) {
+throw new IllegalStateException("Semaphore is unavailable while callback is null!");
+  }
+  if (this.currentCallback.elapsedTime() > 3) {
 
 Review comment:
   This number seems to be derived from maxTimeOut / (maxRetry + 1). Shall we 
put the formula here explicitly instead of a plain number?

