[GitHub] [incubator-gobblin] vikrambohra commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
vikrambohra commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280293704

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+    super(builder);
+
+    this.topic=builder.topic;
+    this.namespaceOverride=builder.namespaceOverride;
+    Config config = builder.config.get();
+
+    if (builder.pusher.isPresent()) {
+      this.pusher = builder.pusher.get();
+    } else {
+      Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+      String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+      this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig));
+    }
+
+    this.closer.register(this.pusher);
+
+    randomKey=String.valueOf(new Random().nextInt(100));

Review comment:
Not sure if that's the right place. buildKey is called for each push message and we do not want to create a new random number per message.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
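To make the concern in this comment concrete, here is a minimal sketch; the class `FallbackKeyReporter` and its methods are hypothetical and not part of the PR. The fallback key is drawn once, when the reporter is constructed, so every message pushed by that instance shares one key; calling `new Random().nextInt(100)` inside a per-message method would instead draw a fresh key for each message.

```java
import java.util.Random;

// Hypothetical sketch, not the PR's class: the fallback key is drawn once,
// when the reporter is constructed, and reused for every message.
class FallbackKeyReporter {
  private final String randomKey;

  FallbackKeyReporter(Random random) {
    // Same bound as the PR: a value in [0, 100).
    this.randomKey = String.valueOf(random.nextInt(100));
  }

  // Called once per message; it only returns the stored key.
  String buildKey() {
    return randomKey;
  }

  public static void main(String[] args) {
    FallbackKeyReporter reporter = new FallbackKeyReporter(new Random());
    // Two "messages" from the same reporter get the same key.
    System.out.println(reporter.buildKey().equals(reporter.buildKey())); // true
  }
}
```

Assuming the pusher relies on Kafka's default partitioner, which sends equal keys to the same partition, a per-instance key keeps one reporter's messages together while separate reporter instances are still likely to land on different keys.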
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284592

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java ##
@@ -17,87 +17,170 @@
 package org.apache.gobblin.metrics;
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 /**
  * Kafka reporting formats enumeration.
  */
 public enum KafkaReportingFormats {
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format.
-   *
-   * @param properties {@link Properties} containing information to build reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder metricReporterBuilder(Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroReporter.Builder builder = KafkaAvroReporter.BuilderFactory.newBuilder();
-        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return builder;
-      case JSON:
-        return KafkaReporter.BuilderFactory.newBuilder();
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+  AVRO() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, Properties properties) throws IOException {
+
+      KafkaAvroReporter.Builder builder = KafkaAvroReporter.BuilderFactory.newBuilder();
+      if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.build(brokers, topic, properties);
+
+    }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, String topic, MetricContext context, Properties properties) throws IOException {
+
+      KafkaAvroEventReporter.Builder builder = KafkaAvroEventReporter.Factory.forContext(context);
+      if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific keys with a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+          PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+          ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+      return builder.build(brokers, topic);
+    }
-    }
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build reporters.
-   * @return {@link
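The pusher-class lookup in `buildEventsScheduledReporter` resolves in three steps: the events-specific key if it is present, otherwise the general pusher key, otherwise a default class. A minimal sketch of that order over `java.util.Properties` follows; the key strings and the default class name are placeholders, since the real values live in `PusherUtils` and are not shown in the hunk.

```java
import java.util.Properties;

// Hypothetical sketch of the lookup order used for the events pusher class.
// All three strings are placeholders, not Gobblin's actual constants.
class PusherClassResolver {
  static final String EVENTS_KEY = "example.events.pusher.class";
  static final String GENERAL_KEY = "example.pusher.class";
  static final String DEFAULT_CLASS = "com.example.DefaultPusher";

  // Events-specific key first, then the general key, then the default.
  static String resolve(Properties properties) {
    return properties.containsKey(EVENTS_KEY)
        ? properties.getProperty(EVENTS_KEY)
        : properties.getProperty(GENERAL_KEY, DEFAULT_CLASS);
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    System.out.println(resolve(properties)); // com.example.DefaultPusher
    properties.setProperty(GENERAL_KEY, "com.example.GeneralPusher");
    System.out.println(resolve(properties)); // com.example.GeneralPusher
    properties.setProperty(EVENTS_KEY, "com.example.EventsPusher");
    System.out.println(resolve(properties)); // com.example.EventsPusher
  }
}
```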
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284613

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java ##
@@ -17,87 +17,170 @@
 package org.apache.gobblin.metrics;
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 /**
  * Kafka reporting formats enumeration.
  */
 public enum KafkaReportingFormats {
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format.
-   *
-   * @param properties {@link Properties} containing information to build reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder metricReporterBuilder(Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroReporter.Builder builder = KafkaAvroReporter.BuilderFactory.newBuilder();
-        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return builder;
-      case JSON:
-        return KafkaReporter.BuilderFactory.newBuilder();
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+  AVRO() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, Properties properties) throws IOException {
+
+      KafkaAvroReporter.Builder builder = KafkaAvroReporter.BuilderFactory.newBuilder();
+      if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.build(brokers, topic, properties);
+
+    }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, String topic, MetricContext context, Properties properties) throws IOException {
+
+      KafkaAvroEventReporter.Builder builder = KafkaAvroEventReporter.Factory.forContext(context);
+      if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific keys with a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+          PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+          ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+      return builder.build(brokers, topic);
+    }
-    }
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build reporters.
-   * @return {@link
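The comment in this hunk says the Kafka configuration is built from the metrics-reporting keys with a fallback to the shared Kafka config. The sketch below imitates that layering with `java.util.Properties` rather than Typesafe Config's `withFallback`. The prefixes are placeholders and do not reflect the actual values of `PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX` or `ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX`.

```java
import java.util.Properties;

// Hypothetical sketch of "metrics-specific keys with a fallback to the shared
// Kafka config", using java.util.Properties instead of Typesafe Config.
// The prefixes are placeholders, not Gobblin's actual constants.
class PrefixedConfigMerge {
  static final String METRICS_PREFIX = "metrics.reporting.kafka.";
  static final String SHARED_PREFIX = "kafka.shared.";

  // Returns the entries whose key starts with prefix, with the prefix removed.
  static Properties scoped(Properties all, String prefix) {
    Properties scoped = new Properties();
    for (String name : all.stringPropertyNames()) {
      if (name.startsWith(prefix)) {
        scoped.setProperty(name.substring(prefix.length()), all.getProperty(name));
      }
    }
    return scoped;
  }

  // Shared values are loaded first; metrics-specific values then overwrite them.
  static Properties kafkaConfig(Properties all) {
    Properties merged = scoped(all, SHARED_PREFIX);
    merged.putAll(scoped(all, METRICS_PREFIX));
    return merged;
  }

  public static void main(String[] args) {
    Properties all = new Properties();
    all.setProperty(SHARED_PREFIX + "acks", "1");
    all.setProperty(SHARED_PREFIX + "linger.ms", "5");
    all.setProperty(METRICS_PREFIX + "acks", "all");
    Properties kafka = kafkaConfig(all);
    System.out.println(kafka.getProperty("acks"));      // all
    System.out.println(kafka.getProperty("linger.ms")); // 5
  }
}
```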
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284912

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+    super(builder);
+
+    this.topic=builder.topic;
+    this.namespaceOverride=builder.namespaceOverride;
+    Config config = builder.config.get();
+
+    if (builder.pusher.isPresent()) {
+      this.pusher = builder.pusher.get();
+    } else {
+      Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+      String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);

Review comment:
Define a constant for "pusherClass".
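A minimal sketch of what the reviewer is asking for. It assumes a plain `Map<String, String>` in place of the Typesafe `Config` the PR uses; the class name and default class name are hypothetical. Declaring the key once means every call site, test, and piece of documentation refers to the same string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the config key is declared once as a constant instead
// of being repeated as a string literal. A Map stands in for Typesafe Config.
class PusherSettings {
  static final String PUSHER_CLASS_KEY = "pusherClass";
  // Placeholder default; the PR uses PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME.
  static final String DEFAULT_PUSHER_CLASS = "com.example.DefaultKeyValuePusher";

  // Returns the configured pusher class name, or the default if the key is absent.
  static String pusherClassName(Map<String, String> config) {
    return config.getOrDefault(PUSHER_CLASS_KEY, DEFAULT_PUSHER_CLASS);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    System.out.println(pusherClassName(config)); // com.example.DefaultKeyValuePusher
    config.put(PUSHER_CLASS_KEY, "com.example.CustomPusher");
    System.out.println(pusherClassName(config)); // com.example.CustomPusher
  }
}
```

The same treatment would suit the `"pusherKeys"` literal, which the PR stores in the local variable `pusherKeys_Key`.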
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285071

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {

Review comment:
Install the code style template to format your code. Instructions are here: https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285604

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+    super(builder);
+
+    this.topic=builder.topic;
+    this.namespaceOverride=builder.namespaceOverride;
+    Config config = builder.config.get();
+
+    if (builder.pusher.isPresent()) {
+      this.pusher = builder.pusher.get();
+    } else {
+      Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+      String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+      this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig));
+    }
+
+    this.closer.register(this.pusher);
+
+    randomKey=String.valueOf(new Random().nextInt(100));
+    String pusherKeys_Key = "pusherKeys";
+    if (config.hasPath(pusherKeys_Key)) {
+      List keys = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key));
+      this.keys = Optional.of(keys);
+    }else{
+      log.warn("Key not assigned from config. Please set it with property {}", ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS);
+      log.warn("Using generated number " + randomKey + " as key");
+    }
+  }
+
+  @Override
+  public void reportEventQueue(Queue queue) {
+    log.info("Emitting report using KeyValueEventObjectReporter");
+
+    List<Pair<String, GenericRecord>> events = Lists.newArrayList();
+    GobblinTrackingEvent event;
+
+    while(null != (event = queue.poll())){
+      GenericRecord record = AvroUtils.overrideNameAndNamespace(event, this.topic, this.namespaceOverride);
+      events.add(Pair.of(buildKey(event), record));
+    }
+
+    if (!events.isEmpty()) {
+      this.pusher.pushKeyValueMessages(events);
+    }
+  }
+
+  protected String buildKey(GobblinTrackingEvent event) {
+
+    String key = randomKey;
+    if (this.keys.isPresent()) {
+
+      StringBuilder keyBuilder = new StringBuilder();
+      for (String keyPart : keys.get()) {
+        Map metadata = event.getMetadata();
+        if (metadata.containsKey(keyPart)) {
+          keyBuilder.append(metadata.get(keyPart));
+        } else {
+          log.error("{} not found in the GobblinTrackingEvent. Setting key to null.", key);
+          keyBuilder = null;
+          break;
+        }
+      }
+
+      key = (keyBuilder == null) ? key : keyBuilder.toString();
+    }
+
+    return key;
+  }
+
+  public static class Factory {
+    /**
+     * Returns a new {@link KeyValueEventObjectReporter.Builder} for {@link KeyValueEventObjectReporter}.
+     * Will automatically add all Context tags to the reporter.
+     *
+     * @param context the {@link MetricContext} to report
+     *
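Two details in the quoted `buildKey` are worth noting. First, the error log passes `key` (the random fallback) rather than the missing `keyPart`. Second, although the message says the key is being set to null, the method actually falls back to `randomKey`. Below is a hypothetical sketch of the same logic that names the missing part and describes the real fallback. Metadata is modeled as a plain `Map<String, String>`, a key part mapped to null is treated as missing, and logging uses `java.util.logging` instead of SLF4J. As in the PR, key parts are concatenated without a separator.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch of the key-building step. Metadata is a plain
// Map<String, String>; the values of the configured key parts are
// concatenated, and the fallback key is used if any part is missing.
class MetadataKeyBuilder {
  private static final Logger LOG = Logger.getLogger(MetadataKeyBuilder.class.getName());

  private final List<String> keyParts;
  private final String fallbackKey;

  MetadataKeyBuilder(List<String> keyParts, String fallbackKey) {
    this.keyParts = keyParts;
    this.fallbackKey = fallbackKey;
  }

  String buildKey(Map<String, String> metadata) {
    StringBuilder keyBuilder = new StringBuilder();
    for (String keyPart : keyParts) {
      String value = metadata.get(keyPart);
      if (value == null) {
        // Log the part that is actually missing and the key that will be used.
        LOG.warning(keyPart + " not found in event metadata; using fallback key " + fallbackKey);
        return fallbackKey;
      }
      keyBuilder.append(value);
    }
    return keyBuilder.toString();
  }

  public static void main(String[] args) {
    MetadataKeyBuilder builder = new MetadataKeyBuilder(Arrays.asList("jobName", "taskId"), "42");
    Map<String, String> metadata = new HashMap<>();
    metadata.put("jobName", "ingest");
    metadata.put("taskId", "t1");
    System.out.println(builder.buildKey(metadata)); // ingestt1
    metadata.remove("taskId");
    System.out.println(builder.buildKey(metadata)); // 42 (plus a warning on stderr)
  }
}
```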
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285710

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueMetricObjectReporter.java ##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.reporter.MetricReportReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+@Slf4j
+public class KeyValueMetricObjectReporter extends MetricReportReporter {

Review comment:
The comments on `KeyValueEventObjectReporter` also apply here.
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285532

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";
+
+  protected Optional<List<String>> keys = Optional.absent();
+  protected final String randomKey;
+  protected KeyValuePusher pusher;
+  protected Optional<Map<String, String>> namespaceOverride;
+  protected final String topic;
+
+  public KeyValueEventObjectReporter(Builder builder){
+    super(builder);
+
+    this.topic=builder.topic;
+    this.namespaceOverride=builder.namespaceOverride;
+    Config config = builder.config.get();
+
+    if (builder.pusher.isPresent()) {
+      this.pusher = builder.pusher.get();
+    } else {
+      Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config);
+      String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
+      this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig));
+    }
+
+    this.closer.register(this.pusher);
+
+    randomKey=String.valueOf(new Random().nextInt(100));

Review comment:
This is `buildKey` logic; put it there.
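This comment and the author's reply in this thread (that `buildKey` runs once per pushed message) can both be satisfied by keeping the fallback in the key-building code but computing it lazily and caching it. The sketch below is hypothetical: `LazyFallbackKey` is an illustrative name, and the random source is injected as a `Supplier<Integer>` so that it can be tested. It also assumes `buildKey()` is called from a single thread; with concurrent callers, the cached field would need synchronization.

```java
import java.util.Random;
import java.util.function.Supplier;

// Hypothetical sketch: the random fallback is produced inside the
// key-building path, but only on the first call; later calls reuse it.
// Assumes buildKey() is only called from one thread.
class LazyFallbackKey {
  private final Supplier<Integer> randomSource;
  private String cachedRandomKey; // stays null until the first buildKey() call

  LazyFallbackKey(Supplier<Integer> randomSource) {
    this.randomSource = randomSource;
  }

  String buildKey() {
    if (cachedRandomKey == null) {
      cachedRandomKey = String.valueOf(randomSource.get());
    }
    return cachedRandomKey;
  }

  public static void main(String[] args) {
    Random random = new Random();
    LazyFallbackKey keys = new LazyFallbackKey(() -> random.nextInt(100));
    System.out.println(keys.buildKey().equals(keys.buildKey())); // true
  }
}
```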
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285097

## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.reporter.EventReporter;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class KeyValueEventObjectReporter extends EventReporter {
+  private static final String PUSHER_CONFIG = "pusherConfig";

Review comment:
Install the code style template to format your code. Instructions are here: https://gobblin.readthedocs.io/en/latest/developer-guide/CodingStyle/
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo… URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284823 ## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java ## @@ -17,87 +17,170 @@ package org.apache.gobblin.metrics; +import java.io.IOException; import java.util.List; import java.util.Properties; +import com.codahale.metrics.ScheduledReporter; import com.google.common.base.Splitter; +import com.typesafe.config.Config; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter; import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter; import org.apache.gobblin.metrics.kafka.KafkaAvroReporter; import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry; import org.apache.gobblin.metrics.kafka.KafkaEventReporter; +import org.apache.gobblin.metrics.kafka.KeyValueEventObjectReporter; +import org.apache.gobblin.metrics.kafka.KeyValueMetricObjectReporter; import org.apache.gobblin.metrics.kafka.KafkaReporter; +import org.apache.gobblin.metrics.kafka.PusherUtils; +import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; +import org.apache.gobblin.util.ConfigUtils; /** * Kafka reporting formats enumeration. */ public enum KafkaReportingFormats { - AVRO, - AVRO_KEY_VALUE, - JSON; - - /** - * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting format. - * - * @param properties {@link Properties} containing information to build reporters. - * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}. 
- */ - public KafkaReporter.Builder metricReporterBuilder(Properties properties) { -switch (this) { - case AVRO: -KafkaAvroReporter.Builder builder = KafkaAvroReporter.BuilderFactory.newBuilder(); -if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, - ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) { - builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); -} -return builder; - case JSON: -return KafkaReporter.BuilderFactory.newBuilder(); - default: -// This should never happen. -throw new IllegalArgumentException("KafkaReportingFormat not recognized."); + AVRO() { + +@Override +public void buildMetricsScheduledReporter(String brokers, String topic, Properties properties) throws IOException { + + KafkaAvroReporter.Builder builder = KafkaAvroReporter.BuilderFactory.newBuilder(); + if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) { +builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); + } + builder.build(brokers, topic, properties); + +} + +@Override +public ScheduledReporter buildEventsScheduledReporter(String brokers, String topic, MetricContext context, Properties properties) throws IOException { + + KafkaAvroEventReporter.Builder builder = KafkaAvroEventReporter.Factory.forContext(context); + if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) { +builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties)); + } + String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) + ? 
properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS) + : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY, + PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME); + builder.withPusherClassName(pusherClassName); + + Config allConfig = ConfigUtils.propertiesToConfig(properties); + // the kafka configuration is composed of the metrics reporting specific keys with a fallback to the shared + // kafka config + Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, + PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig, + ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX)); + + builder.withConfig(kafkaConfig); + + return builder.build(brokers, topic); + } - } - - /** - * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting format. - * @param context {@link MetricContext} that should be reported. - * @param properties {@link Properties} containing information to build reporters. - * @return {@link
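The pusher-class lookup and the Kafka config layering in `buildEventsScheduledReporter` above can be approximated with plain `java.util`. This is a hedged sketch, not Gobblin code: the property key strings are hypothetical stand-ins for the `PusherUtils` / `ConfigurationKeys` constants, and prefix stripping over `Properties` only approximates Typesafe Config's `getConfigOrEmpty(...).withFallback(...)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hedged sketch; key names below are hypothetical, not the real PusherUtils constants.
final class ReporterConfigSketch {
  static final String EVENTS_PUSHER_KEY = "metrics.reporting.kafka.events.pusherClass"; // hypothetical
  static final String PUSHER_KEY = "metrics.reporting.kafka.pusherClass";               // hypothetical
  static final String DEFAULT_PUSHER = "DefaultKafkaPusher";                            // hypothetical

  // Events-specific key wins; otherwise the general key; otherwise the default.
  static String resolvePusherClassName(Properties props) {
    return props.containsKey(EVENTS_PUSHER_KEY)
        ? props.getProperty(EVENTS_PUSHER_KEY)
        : props.getProperty(PUSHER_KEY, DEFAULT_PUSHER);
  }

  // Keys under specificPrefix override keys under sharedPrefix; prefixes are stripped.
  static Map<String, String> layered(Properties props, String specificPrefix, String sharedPrefix) {
    Map<String, String> result = new HashMap<>();
    copyStripped(props, sharedPrefix, result);   // fallback layer first
    copyStripped(props, specificPrefix, result); // specific layer overwrites duplicates
    return result;
  }

  private static void copyStripped(Properties props, String prefix, Map<String, String> out) {
    String p = prefix + ".";
    for (String name : props.stringPropertyNames()) {
      if (name.startsWith(p)) {
        out.put(name.substring(p.length()), props.getProperty(name));
      }
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(PUSHER_KEY, "GeneralPusher");
    props.setProperty("kafka.shared.acks", "1");
    props.setProperty("kafka.shared.linger.ms", "5");
    props.setProperty("metrics.reporting.kafka.config.acks", "all");

    System.out.println(resolvePusherClassName(props)); // GeneralPusher
    Map<String, String> cfg = layered(props, "metrics.reporting.kafka.config", "kafka.shared");
    System.out.println(cfg.get("acks") + " " + cfg.get("linger.ms")); // all 5
  }
}
```

Because the specific layer is written last, a duplicate key such as `acks` takes the metrics-specific value, while a key present only in the shared layer (`linger.ms`) still comes through.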
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo… URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285885 ## File path: gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java ## @@ -875,11 +876,26 @@ public static Schema decorateRecordSchema(Schema inputSchema, @Nonnull List fieldMap, Schema outputSchema) { GenericRecord outputRecord = new GenericData.Record(outputSchema); -inputRecord.getSchema().getFields().forEach( -f -> outputRecord.put(f.name(), inputRecord.get(f.name())) -); +inputRecord.getSchema().getFields().forEach(f -> outputRecord.put(f.name(), inputRecord.get(f.name()))); fieldMap.forEach((key, value) -> outputRecord.put(key, value)); return outputRecord; } + public static GenericRecord overrideNameAndNamespace(GenericRecord event, String nameOverride, Optional> namespaceOverride) { Review comment: This needs a unit test.
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo… URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285404 ## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.kafka; + +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.tuple.Pair; + +import com.google.common.base.Optional; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.reporter.EventReporter; +import org.apache.gobblin.util.AvroUtils; +import org.apache.gobblin.util.ConfigUtils; + + +@Slf4j +public class KeyValueEventObjectReporter extends EventReporter { + private static final String PUSHER_CONFIG = "pusherConfig"; + + protected Optional> keys = Optional.absent(); + protected final String randomKey; + protected KeyValuePusher pusher; + protected Optional> namespaceOverride; + protected final String topic; + + public KeyValueEventObjectReporter(Builder builder){ +super(builder); + +this.topic=builder.topic; +this.namespaceOverride=builder.namespaceOverride; +Config config = builder.config.get(); + +if (builder.pusher.isPresent()) { + this.pusher = builder.pusher.get(); +} else { + Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config); + String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME); + this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig)); +} + +this.closer.register(this.pusher); + +randomKey=String.valueOf(new Random().nextInt(100)); +String pusherKeys_Key = "pusherKeys"; +if (config.hasPath(pusherKeys_Key)) { + List keys = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key)); 
+ this.keys = Optional.of(keys); +}else{ + log.warn("Key not assigned from config. Please set it with property {}", ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS); + log.warn("Using generated number " + randomKey + " as key"); +} + } + + @Override + public void reportEventQueue(Queue queue) { +log.info("Emitting report using KeyValueEventObjectReporter"); + +List> events = Lists.newArrayList(); +GobblinTrackingEvent event; + +while(null != (event = queue.poll())){ + GenericRecord record = AvroUtils.overrideNameAndNamespace(event, this.topic, this.namespaceOverride); + events.add(Pair.of(buildKey(event), record)); +} + +if (!events.isEmpty()) { + this.pusher.pushKeyValueMessages(events); +} + } + + protected String buildKey(GobblinTrackingEvent event) { + +String key = randomKey; +if (this.keys.isPresent()) { + + StringBuilder keyBuilder = new StringBuilder(); + for (String keyPart : keys.get()) { +Map metadata = event.getMetadata(); +if (metadata.containsKey(keyPart)) { + keyBuilder.append(metadata.get(keyPart)); Review comment: This should provide a proper separator between key parts.
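One way to address the separator comment, sketched in plain Java: join the key parts with an explicit delimiter so that different metadata values cannot collapse into the same key. The `:` separator and the class and method names are hypothetical choices for illustration; `parseKeys` mirrors the quoted `Splitter.on(",").omitEmptyStrings().trimResults()` call without Guava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hedged sketch; SEPARATOR and all names here are hypothetical, not the PR's code.
final class KeyBuilderSketch {
  static final String SEPARATOR = ":";

  // Stdlib stand-in for Splitter.on(",").omitEmptyStrings().trimResults().splitToList(value).
  static List<String> parseKeys(String value) {
    List<String> keys = new ArrayList<>();
    for (String part : value.split(",")) {
      String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        keys.add(trimmed);
      }
    }
    return keys;
  }

  // Joins the metadata value of each key part with SEPARATOR.
  // Returns fallbackKey if any part is missing, as the quoted buildKey does with randomKey.
  static String buildKey(List<String> keyParts, Map<String, String> metadata, String fallbackKey) {
    StringJoiner joiner = new StringJoiner(SEPARATOR);
    for (String keyPart : keyParts) {
      String value = metadata.get(keyPart);
      if (value == null) {
        return fallbackKey;
      }
      joiner.add(value);
    }
    return joiner.toString();
  }
}
```

Without a separator, the values `("ab", "c")` and `("a", "bc")` both produce the key `abc`; with `:` they become `ab:c` and `a:bc`. If the separator can itself occur in metadata values, escaping or a length prefix would be needed to keep keys unambiguous.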
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo… URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284689 ## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java ## @@ -44,7 +44,7 @@ public ScheduledReporter newScheduledReporter(MetricRegistry registry, Propertie ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) { return null; } -log.info("Reporting metrics to Kafka"); Review comment: This is a redundant log and can be removed.
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo… URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285611 ## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.kafka; + +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.tuple.Pair; + +import com.google.common.base.Optional; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.reporter.EventReporter; +import org.apache.gobblin.util.AvroUtils; +import org.apache.gobblin.util.ConfigUtils; + + +@Slf4j +public class KeyValueEventObjectReporter extends EventReporter { + private static final String PUSHER_CONFIG = "pusherConfig"; + + protected Optional> keys = Optional.absent(); + protected final String randomKey; + protected KeyValuePusher pusher; + protected Optional> namespaceOverride; + protected final String topic; + + public KeyValueEventObjectReporter(Builder builder){ +super(builder); + +this.topic=builder.topic; +this.namespaceOverride=builder.namespaceOverride; +Config config = builder.config.get(); + +if (builder.pusher.isPresent()) { + this.pusher = builder.pusher.get(); +} else { + Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config); + String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME); + this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig)); +} + +this.closer.register(this.pusher); + +randomKey=String.valueOf(new Random().nextInt(100)); +String pusherKeys_Key = "pusherKeys"; +if (config.hasPath(pusherKeys_Key)) { + List keys = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key)); 
+ this.keys = Optional.of(keys); +}else{ + log.warn("Key not assigned from config. Please set it with property {}", ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS); + log.warn("Using generated number " + randomKey + " as key"); +} + } + + @Override + public void reportEventQueue(Queue queue) { +log.info("Emitting report using KeyValueEventObjectReporter"); + +List> events = Lists.newArrayList(); +GobblinTrackingEvent event; + +while(null != (event = queue.poll())){ + GenericRecord record = AvroUtils.overrideNameAndNamespace(event, this.topic, this.namespaceOverride); + events.add(Pair.of(buildKey(event), record)); +} + +if (!events.isEmpty()) { + this.pusher.pushKeyValueMessages(events); +} + } + + protected String buildKey(GobblinTrackingEvent event) { + +String key = randomKey; +if (this.keys.isPresent()) { + + StringBuilder keyBuilder = new StringBuilder(); + for (String keyPart : keys.get()) { +Map metadata = event.getMetadata(); +if (metadata.containsKey(keyPart)) { + keyBuilder.append(metadata.get(keyPart)); +} else { + log.error("{} not found in the GobblinTrackingEvent. Setting key to null.", key); + keyBuilder = null; + break; +} + } + + key = (keyBuilder == null) ? key : keyBuilder.toString(); +} + +return key; + } + + public static class Factory { +/** + * Returns a new {@link KeyValueEventObjectReporter.Builder} for {@link KeyValueEventObjectReporter}. + * Will automatically add all Context tags to the reporter. + * + * @param context the {@link MetricContext} to report + *
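The fallback behavior in the quoted `buildKey` can be sketched with the random key drawn once per reporter instance and reused for every message. Two details in the quoted code may deserve a second look: the error log passes `key` (the random fallback) rather than the missing `keyPart`, and the message says the key is set to null although the method actually returns `randomKey`. The sketch below logs the missing part instead. All names are hypothetical, `java.util.logging` stands in for the slf4j logger, and `:` is an assumed separator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.logging.Logger;

// Hedged sketch; class and method names are hypothetical, not the PR's code.
final class FallbackKeySketch {
  private static final Logger LOG = Logger.getLogger(FallbackKeySketch.class.getName());

  private final List<String> keyParts;
  private final String randomKey;

  FallbackKeySketch(List<String> keyParts) {
    this.keyParts = new ArrayList<>(keyParts);
    // Drawn once per instance (as in the quoted constructor), not once per message.
    this.randomKey = String.valueOf(new Random().nextInt(100));
  }

  String randomKey() {
    return randomKey;
  }

  String buildKey(Map<String, String> metadata) {
    if (keyParts.isEmpty()) {
      return randomKey; // no key parts configured
    }
    List<String> values = new ArrayList<>();
    for (String keyPart : keyParts) {
      String value = metadata.get(keyPart);
      if (value == null) {
        // Name the missing part and the key that is actually used.
        LOG.warning(keyPart + " not found in event metadata; using fallback key " + randomKey);
        return randomKey;
      }
      values.add(value);
    }
    return String.join(":", values); // ":" is a hypothetical separator
  }
}
```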
[GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo… URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280285309 ## File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KeyValueEventObjectReporter.java ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.kafka; + +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.tuple.Pair; + +import com.google.common.base.Optional; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.reporter.EventReporter; +import org.apache.gobblin.util.AvroUtils; +import org.apache.gobblin.util.ConfigUtils; + + +@Slf4j +public class KeyValueEventObjectReporter extends EventReporter { + private static final String PUSHER_CONFIG = "pusherConfig"; + + protected Optional> keys = Optional.absent(); + protected final String randomKey; + protected KeyValuePusher pusher; + protected Optional> namespaceOverride; + protected final String topic; + + public KeyValueEventObjectReporter(Builder builder){ +super(builder); + +this.topic=builder.topic; +this.namespaceOverride=builder.namespaceOverride; +Config config = builder.config.get(); + +if (builder.pusher.isPresent()) { + this.pusher = builder.pusher.get(); +} else { + Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, PUSHER_CONFIG).withFallback(config); + String pusherClassName = ConfigUtils.getString(config, "pusherClass", PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME); + this.pusher = PusherUtils.getKeyValuePusher(pusherClassName, builder.brokers, builder.topic, Optional.of(pusherConfig)); +} + +this.closer.register(this.pusher); + +randomKey=String.valueOf(new Random().nextInt(100)); +String pusherKeys_Key = "pusherKeys"; +if (config.hasPath(pusherKeys_Key)) { + List keys = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(config.getString(pusherKeys_Key)); 
+ this.keys = Optional.of(keys); +}else{ + log.warn("Key not assigned from config. Please set it with property {}", ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS); + log.warn("Using generated number " + randomKey + " as key"); +} + } + + @Override + public void reportEventQueue(Queue queue) { +log.info("Emitting report using KeyValueEventObjectReporter"); + +List> events = Lists.newArrayList(); +GobblinTrackingEvent event; + +while(null != (event = queue.poll())){ + GenericRecord record = AvroUtils.overrideNameAndNamespace(event, this.topic, this.namespaceOverride); + events.add(Pair.of(buildKey(event), record)); +} + +if (!events.isEmpty()) { + this.pusher.pushKeyValueMessages(events); +} + } + + protected String buildKey(GobblinTrackingEvent event) { Review comment: Define this as `private`. Start with the narrowest visibility and widen it only if extension is needed later; that keeps future refactoring easy.
[jira] [Work logged] (GOBBLIN-707) combine & standardize all gobblin scripts into one master script & restructure configs accordingly
[ https://issues.apache.org/jira/browse/GOBBLIN-707?focusedWorklogId=236098=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236098 ] ASF GitHub Bot logged work on GOBBLIN-707: -- Author: ASF GitHub Bot Created on: 02/May/19 02:42 Start Date: 02/May/19 02:42 Worklog Time Spent: 10m Work Description: jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to combine all modes and command URL: https://github.com/apache/incubator-gobblin/pull/2578#issuecomment-488537624 1 out of 4 test env has failed, will look into it. Issue Time Tracking --- Worklog Id: (was: 236098) Time Spent: 5h 40m (was: 5.5h) > combine & standardize all gobblin scripts into one master script & > restructure configs accordingly > -- > > Key: GOBBLIN-707 > URL: https://issues.apache.org/jira/browse/GOBBLIN-707 > Project: Apache Gobblin > Issue Type: Improvement >Reporter: Jay Sen >Priority: Major > Time Spent: 5h 40m > Remaining Estimate: 0h > > gobblin supports multiple modes of execution ( CLI, Standalone, > cluster-master, cluster-worker, AWS, YARN, MR ) and various command-line > utilities to run cli and admin commands. There is an individual script for each > of them. > Having individual scripts introduces a lot of issues > # all scripts handle gobblin variables and user parameters differently, and > it's highly inconsistent among the various gobblin scripts > # functionality around start, stop, status checking and handling PIDs, among > a lot of other things, varies vastly with the implementation of the script. > # features like GC & JVM params, log4j file selection, classpath > calculation, etc. exist in some gobblin scripts but not all, adding to an > inconsistent user experience.
> # maintaining 13 scripts in total would be too much effort. > Also all the gobblin scripts share a lot of common code to handle params, > start, stop services, status checks, pid handling, etc. Combining all the > scripts into one not only makes maintenance easier but also brings clarity and > consistency. > > Solution: > 1. there can be one gobblin.sh script to handle all gobblin commands and > deployment options with the following signature. NOTE: This > {{gobblin.sh }} > {{gobblin.sh }} > {{commands values: admin, cli, statestore-check, statestore-clean, > historystore-manager, classpath}} > {{service values: standalone, cluster-master, cluster-worker, aws, yarn, mr, > service}} > with the above change, the following become valid commands. > {code:java} > # all under GobblinCli class > gobblin run listQuickApps –> gobblin cli run listQuickApps > gobblin run listQuickApps –> gobblin cli run listQuickApps > gobblin run -> gobblin cli run > # class: JobStateToJsonConverter > statestore-checker.sh -> gobblin statestore-checker > # class: StateStoreCleaner > statestore-clean.sh -> gobblin statestore-clean > # class: DatabaseJobHistoryStoreSchemaManager > historystore-manager.sh -> gobblin historystore-manager > # class: Cli > gobblin-admin.sh-> gobblin admin > # all gobblin deployment modes > gobblin-cluster-master.sh -> gobblin cluster-master start|stop|status > gobblin-cluster-worker.sh -> gobblin cluster-master start|stop|status > gobblin-compaction.sh -> gobblin cluster-master start|stop|status > gobblin-env.sh -> gobblin cluster-master start|stop|status > gobblin-mapreduce.sh-> gobblin cluster-master start|stop|status > gobblin-service.sh -> gobblin cluster-master start|stop|status > gobblin-standalone.sh -> gobblin cluster-master start|stop|status > gobblin-yarn.sh -> gobblin cluster-master start|stop|status > {code} > > 2. Also configs need to be structured and deduped accordingly to make it > clear which config will be picked up for which execution mode.
> > {color:#FF} > NOTE: this refactoring to gobblin.sh changes the way all gobblin commands > were run before{color} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [incubator-gobblin] jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to combine all modes and command
jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to combine all modes and command URL: https://github.com/apache/incubator-gobblin/pull/2578#issuecomment-488537624 1 out of 4 test env has failed, will look into it.
[GitHub] [incubator-gobblin] asfgit closed pull request #2625: [GOBBLIN-761]Only instantiate topic-specific configStore object when topic.name is available
asfgit closed pull request #2625: [GOBBLIN-761]Only instantiate topic-specific configStore object when topic.name is available URL: https://github.com/apache/incubator-gobblin/pull/2625
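The guard named in the GOBBLIN-761 PR title can be illustrated roughly as follows: build a topic-specific config view only when the runtime property `topic.name` is present, and skip it otherwise. This is a hedged sketch only; `TopicConfigView` is a hypothetical placeholder rather than Gobblin's config-store API, and the PR's actual change is not shown in these messages.

```java
import java.util.Optional;
import java.util.Properties;

// Hedged sketch; TopicConfigView is a hypothetical placeholder, not a Gobblin class.
final class TopicConfigGuardSketch {
  static final String TOPIC_NAME_KEY = "topic.name";

  static final class TopicConfigView {
    final String topic;

    TopicConfigView(String topic) {
      this.topic = topic;
    }
  }

  // Returns a topic-specific view only when topic.name is set and non-blank.
  static Optional<TopicConfigView> maybeTopicView(Properties props) {
    String topic = props.getProperty(TOPIC_NAME_KEY);
    if (topic == null || topic.trim().isEmpty()) {
      return Optional.empty(); // runtime property not available yet: do not instantiate
    }
    return Optional.of(new TopicConfigView(topic));
  }
}
```

Returning an `Optional` makes the "no topic yet" case explicit to callers, which could then fall back to configuration that is not topic-specific.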
[jira] [Work logged] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object
[ https://issues.apache.org/jira/browse/GOBBLIN-761?focusedWorklogId=236069=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236069 ] ASF GitHub Bot logged work on GOBBLIN-761: -- Author: ASF GitHub Bot Created on: 02/May/19 00:21 Start Date: 02/May/19 00:21 Worklog Time Spent: 10m Work Description: asfgit commented on pull request #2625: [GOBBLIN-761]Only instantiate topic-specific configStore object when topic.name is available URL: https://github.com/apache/incubator-gobblin/pull/2625 Issue Time Tracking --- Worklog Id: (was: 236069) Time Spent: 20m (was: 10m) > Fix runtime property like Topic.name not available in Compaction when > fetching configStore object > - > > Key: GOBBLIN-761 > URL: https://issues.apache.org/jira/browse/GOBBLIN-761 > Project: Apache Gobblin > Issue Type: Improvement >Reporter: Lei Sun >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h
[jira] [Resolved] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object
[ https://issues.apache.org/jira/browse/GOBBLIN-761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hung Tran resolved GOBBLIN-761. --- Resolution: Fixed Fix Version/s: 0.15.0 Issue resolved by pull request #2625 [https://github.com/apache/incubator-gobblin/pull/2625] > Fix runtime property like Topic.name not available in Compaction when > fetching configStore object > - > > Key: GOBBLIN-761 > URL: https://issues.apache.org/jira/browse/GOBBLIN-761 > Project: Apache Gobblin > Issue Type: Improvement >Reporter: Lei Sun >Priority: Major > Fix For: 0.15.0 > > Time Spent: 20m > Remaining Estimate: 0h
[jira] [Work logged] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object
[ https://issues.apache.org/jira/browse/GOBBLIN-761?focusedWorklogId=236059=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236059 ] ASF GitHub Bot logged work on GOBBLIN-761: -- Author: ASF GitHub Bot Created on: 01/May/19 23:35 Start Date: 01/May/19 23:35 Worklog Time Spent: 10m Work Description: autumnust commented on pull request #2625: [GOBBLIN-761]Only instantiate topic-specific configStore object when topic.name is available URL: https://github.com/apache/incubator-gobblin/pull/2625 … available Dear Gobblin maintainers, Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below! ### JIRA - https://issues.apache.org/jira/browse/GOBBLIN-761 ### Description - [ ] Here are some details about my PR, including screenshots (if applicable): ### Tests - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: ### Commits - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. Body explains "what" and "why", not "how"
Issue Time Tracking --- Worklog Id: (was: 236059) Time Spent: 10m Remaining Estimate: 0h > Fix runtime property like Topic.name not available in Compaction when > fetching configStore object > - > > Key: GOBBLIN-761 > URL: https://issues.apache.org/jira/browse/GOBBLIN-761 > Project: Apache Gobblin > Issue Type: Improvement >Reporter: Lei Sun >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Work logged] (GOBBLIN-707) combine & standardize all gobblin scripts into one master script & restructure configs accordingly
[ https://issues.apache.org/jira/browse/GOBBLIN-707?focusedWorklogId=236061=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236061 ]

ASF GitHub Bot logged work on GOBBLIN-707:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 23:41
Start Date: 01/May/19 23:41
Worklog Time Spent: 10m
Work Description: jhsenjaliya commented on issue #2578: [GOBBLIN-707] rewrite gobblin script to combine all modes and command
URL: https://github.com/apache/incubator-gobblin/pull/2578#issuecomment-488498198

   @ibuenros, @autumnust, would you pls take a look? Thanks

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 236061)
Time Spent: 5.5h (was: 5h 20m)

> combine & standardize all gobblin scripts into one master script & restructure configs accordingly
> --
>
> Key: GOBBLIN-707
> URL: https://issues.apache.org/jira/browse/GOBBLIN-707
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Jay Sen
> Priority: Major
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Gobblin supports multiple modes of execution (CLI, Standalone, cluster-master, cluster-worker, AWS, YARN, MR) and various command-line utilities to run CLI and admin commands. There is an individual script for each of them.
> Having individual scripts introduces a lot of issues:
> # All scripts handle Gobblin variables and user parameters differently, and the handling is highly inconsistent across the various Gobblin scripts.
> # Functionality around start, stop, status checking and PID handling, among a lot of other things, varies vastly with each script's implementation.
> # Features like GC & JVM params, log4j file selection, classpath calculation, etc. exist in some Gobblin scripts but not all, adding to an inconsistent user experience.
> # Maintaining a total of 13 scripts would be too much effort.
> Also, all the Gobblin scripts share a lot of common code to handle params, start and stop services, check status, handle PIDs, etc. Combining all the scripts into one not only makes maintenance easier but also brings clarity and consistency.
>
> Solution:
> 1. There can be one gobblin.sh script to handle all Gobblin commands and deployment options, per the following signature. NOTE: This
> {{gobblin.sh }}
> {{gobblin.sh }}
> {{commands values: admin, cli, statestore-check, statestore-clean, historystore-manager, classpath}}
> {{service values: standalone, cluster-master, cluster-worker, aws, yarn, mr, service}}
> With the above change, the following become valid commands.
> {code:java}
> # all under GobblinCli class
> gobblin run listQuickApps -> gobblin cli run listQuickApps
> gobblin run -> gobblin cli run
> # class: JobStateToJsonConverter
> statestore-checker.sh -> gobblin statestore-checker
> # class: StateStoreCleaner
> statestore-clean.sh -> gobblin statestore-clean
> # class: DatabaseJobHistoryStoreSchemaManager
> historystore-manager.sh -> gobblin historystore-manager
> # class: Cli
> gobblin-admin.sh -> gobblin admin
> # all gobblin deployment modes
> gobblin-cluster-master.sh -> gobblin cluster-master start|stop|status
> gobblin-cluster-worker.sh -> gobblin cluster-master start|stop|status
> gobblin-compaction.sh -> gobblin cluster-master start|stop|status
> gobblin-env.sh -> gobblin cluster-master start|stop|status
> gobblin-mapreduce.sh -> gobblin cluster-master start|stop|status
> gobblin-service.sh -> gobblin cluster-master start|stop|status
> gobblin-standalone.sh -> gobblin cluster-master start|stop|status
> gobblin-yarn.sh -> gobblin cluster-master start|stop|status
> {code}
>
> 2. Also, configs need to be structured and deduplicated accordingly, to make it clear which config will be picked up for which execution mode.
>
> {color:#FF}
> NOTE: this refactoring to gobblin.sh changes the way all Gobblin commands were run before.{color}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
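The proposal reduces to one entry point that sends the first argument either to a utility's main class or to a service action. Below is a minimal Java sketch of that routing table. It assumes only the command names, service names and simple class names listed in the issue. `GobblinCommandRouter` and `route` are hypothetical names, and the actual change is a shell script, not Java.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical dispatcher illustrating the GOBBLIN-707 idea: one entry point resolves
 * either a utility command or a deployment service plus action. Packages, JVM options
 * and the real gobblin.sh logic are not modelled.
 */
final class GobblinCommandRouter {

  // "gobblin <command> ..." -> simple class name given in the issue.
  // "classpath" is omitted because the issue names no class for it.
  private static final Map<String, String> COMMAND_TO_CLASS = new HashMap<>();

  static {
    COMMAND_TO_CLASS.put("cli", "GobblinCli");
    COMMAND_TO_CLASS.put("statestore-check", "JobStateToJsonConverter");
    COMMAND_TO_CLASS.put("statestore-clean", "StateStoreCleaner");
    COMMAND_TO_CLASS.put("historystore-manager", "DatabaseJobHistoryStoreSchemaManager");
    COMMAND_TO_CLASS.put("admin", "Cli");
  }

  // "gobblin <service> start|stop|status"
  private static final Set<String> SERVICES = Collections.unmodifiableSet(new HashSet<>(
      Arrays.asList("standalone", "cluster-master", "cluster-worker", "aws", "yarn", "mr", "service")));
  private static final Set<String> ACTIONS =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList("start", "stop", "status")));

  private GobblinCommandRouter() {}

  /** Describes what the given arguments would run, or throws on invalid input. */
  static String route(String... args) {
    if (args.length == 0) {
      throw new IllegalArgumentException("usage: gobblin <command|service> [args...]");
    }
    String mainClass = COMMAND_TO_CLASS.get(args[0]);
    if (mainClass != null) {
      return "run " + mainClass; // remaining args would be passed through to main()
    }
    if (SERVICES.contains(args[0])) {
      if (args.length < 2 || !ACTIONS.contains(args[1])) {
        throw new IllegalArgumentException("usage: gobblin " + args[0] + " start|stop|status");
      }
      return args[1] + " " + args[0];
    }
    throw new IllegalArgumentException("unknown command or service: " + args[0]);
  }
}
```

For example, `route("cluster-worker", "status")` yields `"status cluster-worker"`. An unknown first argument fails fast instead of falling through to script-specific behaviour, which is the kind of consistency the issue asks for.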
[jira] [Created] (GOBBLIN-761) Fix runtime property like Topic.name not available in Compaction when fetching configStore object
Lei Sun created GOBBLIN-761:
---------------------------

Summary: Fix runtime property like Topic.name not available in Compaction when fetching configStore object
Key: GOBBLIN-761
URL: https://issues.apache.org/jira/browse/GOBBLIN-761
Project: Apache Gobblin
Issue Type: Improvement
Reporter: Lei Sun
[jira] [Work logged] (GOBBLIN-740) InfluxDB fails to write due to invalid retentionPolicy
[ https://issues.apache.org/jira/browse/GOBBLIN-740?focusedWorklogId=236023=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236023 ]

ASF GitHub Bot logged work on GOBBLIN-740:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 22:36
Start Date: 01/May/19 22:36
Worklog Time Spent: 10m
Work Description: jhsenjaliya commented on issue #2607: [GOBBLIN-740] Remove setting retentionPolicy on every Point write
URL: https://github.com/apache/incubator-gobblin/pull/2607#issuecomment-488465685

   @shirshanka, can u or someone pls merge this? thanks

Issue Time Tracking
-------------------

Worklog Id: (was: 236023)
Time Spent: 10m
Remaining Estimate: 0h

> InfluxDB fails to write due to invalid retentionPolicy
> --
>
> Key: GOBBLIN-740
> URL: https://issues.apache.org/jira/browse/GOBBLIN-740
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Jay Sen
> Priority: Minor
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Gobblin tries to set {{retentionPolicy}} while pushing a {{Point}} in {{InfluxDBPusher}}.
> The latest InfluxDB neither ships with a "default" retention policy nor lets you create one.
> Gobblin doesn't really need to set the retention policy every time it pushes a {{Point}}. It also has no way to configure a retention policy per metric, so the retention policy can be set up entirely offline when setting up the InfluxDB database.
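In the InfluxDB 1.x HTTP API, the `rp` query parameter of the `/write` endpoint is optional. When it is absent, the server writes to the database's default retention policy. The sketch below shows the fix's idea at the URL level: send a retention policy only when one is explicitly configured. It is not the `InfluxDBPusher` code, whose client calls are not shown in this thread. `InfluxWriteUrls` and `writeUrl` are hypothetical names.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds an InfluxDB 1.x write URL with an optional retention policy. */
final class InfluxWriteUrls {

  private InfluxWriteUrls() {}

  /**
   * "db" is required. "rp" is appended only when a retention policy is configured,
   * so by default the server applies the database's own default policy.
   */
  static String writeUrl(String baseUrl, String database, String retentionPolicy) {
    if (database == null || database.isEmpty()) {
      throw new IllegalArgumentException("database is required");
    }
    StringBuilder url = new StringBuilder(baseUrl).append("/write?db=").append(encode(database));
    if (retentionPolicy != null && !retentionPolicy.isEmpty()) {
      url.append("&rp=").append(encode(retentionPolicy));
    }
    return url.toString();
  }

  private static String encode(String value) {
    return URLEncoder.encode(value, StandardCharsets.UTF_8); // Java 10+ overload
  }
}
```

Leaving the policy out by default moves that decision to database setup time, which is what the issue argues for, since Gobblin cannot configure it per metric anyway.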
[jira] [Resolved] (GOBBLIN-756) Add flow template that updates when file system is modified
[ https://issues.apache.org/jira/browse/GOBBLIN-756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jack Moseley resolved GOBBLIN-756.
----------------------------------
Resolution: Fixed

> Add flow template that updates when file system is modified
> --
>
> Key: GOBBLIN-756
> URL: https://issues.apache.org/jira/browse/GOBBLIN-756
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Jack Moseley
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
[jira] [Work logged] (GOBBLIN-756) Add flow template that updates when file system is modified
[ https://issues.apache.org/jira/browse/GOBBLIN-756?focusedWorklogId=235946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235946 ]

ASF GitHub Bot logged work on GOBBLIN-756:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 18:49
Start Date: 01/May/19 18:49
Worklog Time Spent: 10m
Work Description: asfgit commented on pull request #2620: [GOBBLIN-756] Add flow catalog that updates when filesystem is modified
URL: https://github.com/apache/incubator-gobblin/pull/2620

Issue Time Tracking
-------------------

Worklog Id: (was: 235946)
Time Spent: 4.5h (was: 4h 20m)

> Add flow template that updates when file system is modified
> --
>
> Key: GOBBLIN-756
> URL: https://issues.apache.org/jira/browse/GOBBLIN-756
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Jack Moseley
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
[GitHub] [incubator-gobblin] asfgit closed pull request #2620: [GOBBLIN-756] Add flow catalog that updates when filesystem is modified
asfgit closed pull request #2620: [GOBBLIN-756] Add flow catalog that updates when filesystem is modified
URL: https://github.com/apache/incubator-gobblin/pull/2620
[jira] [Resolved] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Issac Buenrostro resolved GOBBLIN-760.
--------------------------------------
Resolution: Fixed
Fix Version/s: 0.15.0

Issue resolved by pull request #2624
[https://github.com/apache/incubator-gobblin/pull/2624]

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Fix For: 0.15.0
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
[GitHub] [incubator-gobblin] asfgit closed pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
asfgit closed pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235922=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235922 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 18:03
Start Date: 01/May/19 18:03
Worklog Time Spent: 10m
Work Description: asfgit commented on pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624

Issue Time Tracking
-------------------

Worklog Id: (was: 235922)
Time Spent: 1.5h (was: 1h 20m)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235907 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 17:35
Start Date: 01/May/19 17:35
Worklog Time Spent: 10m
Work Description: htran1 commented on issue #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#issuecomment-488352670

   +1

Issue Time Tracking
-------------------

Worklog Id: (was: 235907)
Time Spent: 1h 20m (was: 1h 10m)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
Re: Gobblin for Google Season of Docs 2019
Actually, we just missed the deadline:
https://developers.google.com/season-of-docs/docs/timeline

Maybe next time.

- Jay

On Tue, Apr 30, 2019 at 11:56 AM Abhishek Tiwari wrote:

> I didn't get a chance to review all requirements, but it sounds like something
> we should do.
> How do we get started on this? Who wants to be the mentor?
>
> Abhishek
>
> On Tue, Apr 30, 2019 at 11:54 AM Jay Sen wrote:
>
> > Did anybody get a chance to look at this? Does anybody think we might have
> > the capacity to leverage this opportunity?
> >
> > - Jay
> >
> > On Thu, Apr 18, 2019 at 4:26 PM Jay Sen wrote:
> >
> > > Hello Everyone
> > >
> > > Google Season of Docs is here and Apache Gobblin can greatly benefit from
> > > this program.
> > >
> > > https://developers.google.com/season-of-docs/docs/get-started/
> > >
> > > It is a program that brings open source projects and technical writers
> > > together. It is similar to Google Summer of Code but for documentation,
> > > and the technical writers are not students but experienced folks in the field.
> > >
> > > I think it will be very beneficial for Gobblin to participate, as we need
> > > a lot of improvements in docs in general.
> > >
> > > Can we have volunteer mentors to participate in the process? I will be
> > > happy to make whatever contribution I can.
> > >
> > > Thanks
> > > Jay
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235869=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235869 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 16:33
Start Date: 01/May/19 16:33
Worklog Time Spent: 10m
Work Description: ibuenros commented on pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128758

## File path: gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
## @@ -196,18 +205,26 @@ private long elapsedMillis(long startTimeNanos) {
   /**
    * Send a new permit request to the server.
    */
-  private void maybeSendNewPermitRequest() {
-    if (!this.requestSemaphore.tryAcquire()) {
-      return;
+  private synchronized void maybeSendNewPermitRequest() {
+    while (!this.requestSemaphore.tryAcquire()) {
+      if (this.currentCallback == null) {
+        throw new IllegalStateException("Semaphore is unavailable while callback is null!");
+      }
+      if (this.currentCallback.elapsedTime() > 3) {

Review comment:
   It is not actually derived from that, it's an arbitrary number. Added some comments and logs though.

Issue Time Tracking
-------------------

Worklog Id: (was: 235869)
Time Spent: 1h 10m (was: 1h)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
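The change replaces an early `return` with a loop that can reclaim a stuck request. Below is a minimal sketch of that pattern. It assumes a one-permit semaphore, a caller-supplied staleness threshold and an injectable clock. The `3` in the diff is left uninterpreted, and the reviewer's formula is not used. `PermitRequestSender`, `InFlightRequest` and `staleAfterMillis` are hypothetical names that only loosely mirror the diff.

```java
import java.util.concurrent.Semaphore;
import java.util.function.LongSupplier;

/** Hypothetical sketch: at most one permit request in flight; stale requests are reclaimed. */
final class PermitRequestSender {
  private final Semaphore requestSemaphore = new Semaphore(1); // one outstanding request at most
  private final LongSupplier clockMillis;                      // injectable clock for testing
  private final long staleAfterMillis;                         // assumed, caller-supplied threshold
  private InFlightRequest currentCallback;                     // guarded by "this"
  int requestsSent;                                            // illustration/testing only

  PermitRequestSender(LongSupplier clockMillis, long staleAfterMillis) {
    this.clockMillis = clockMillis;
    this.staleAfterMillis = staleAfterMillis;
  }

  /** Tracks one outstanding request and releases the semaphore at most once. */
  final class InFlightRequest {
    private final long startTime = clockMillis.getAsLong();
    private boolean cleared;

    long elapsedTime() {
      return clockMillis.getAsLong() - startTime;
    }

    synchronized void clearCallback() {
      if (!cleared) {
        cleared = true;
        requestSemaphore.release();
      }
    }
  }

  synchronized void maybeSendNewPermitRequest() {
    while (!requestSemaphore.tryAcquire()) {
      if (currentCallback == null) {
        throw new IllegalStateException("Semaphore is unavailable while callback is null!");
      }
      if (currentCallback.elapsedTime() <= staleAfterMillis) {
        return; // a recent request is still in flight; let its callback finish
      }
      currentCallback.clearCallback(); // abandon the stale request, then retry tryAcquire
    }
    currentCallback = new InFlightRequest();
    requestsSent++;
    // A real client would send the request asynchronously here and call
    // clearCallback() from the response's success and failure handlers.
  }
}
```

The loop terminates because this synchronized method is the only place the permit is acquired. Once a stale callback is cleared, the next `tryAcquire()` succeeds.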
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235867 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 16:32
Start Date: 01/May/19 16:32
Worklog Time Spent: 10m
Work Description: ibuenros commented on pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128632

## File path: gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
## @@ -364,10 +393,22 @@ public void onSuccess(Response result) {
       }
     }
 
+    public long elapsedTime() {
+      return System.currentTimeMillis() - this.startTime;
+    }
+
+    public synchronized void clearCallback() {

Review comment:
   We want to make sure a late callback and a timeout don't both release the semaphore.

Issue Time Tracking
-------------------

Worklog Id: (was: 235867)
Time Spent: 50m (was: 40m)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
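This reply guards against a double release. If the timeout path and a late `onSuccess` both call `release()`, a one-permit semaphore ends up with two permits, and two requests can then be in flight. Below is a small sketch of a release-once guard. It uses an `AtomicBoolean` rather than the `synchronized` method in the PR, and `OneShotRelease` is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical release-once guard for the permit held by one in-flight request. */
final class OneShotRelease {
  private final Semaphore semaphore;
  private final AtomicBoolean released = new AtomicBoolean(false);

  OneShotRelease(Semaphore semaphore) {
    this.semaphore = semaphore;
  }

  /**
   * Releases the permit only on the first call, even if the timeout path and a late
   * response call this concurrently; later calls are no-ops and return false.
   */
  boolean release() {
    if (released.compareAndSet(false, true)) {
      semaphore.release();
      return true;
    }
    return false;
  }
}
```

`Semaphore` does not cap permits at their initial count. Calling `release()` twice on a bare `Semaphore(1)` after one acquire therefore leaves two permits available. With the guard, the second call does nothing and `availablePermits()` stays at 1.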
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235868 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 16:32
Start Date: 01/May/19 16:32
Worklog Time Spent: 10m
Work Description: ibuenros commented on pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280128649

## File path: gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
## @@ -87,6 +88,7 @@
   private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 6; // 10 minutes
   private static final double MAX_DEPLETION_RATE = 1e20;
   public static final int MAX_GROWTH_REQUEST = 2;
+  private static final long GET_PERMITS_MAX_SLEEP = 1000;

Review comment:
   Fixed

Issue Time Tracking
-------------------

Worklog Id: (was: 235868)
Time Spent: 1h (was: 50m)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235701=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235701 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 06:21
Start Date: 01/May/19 06:21
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011104

## File path: gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
## @@ -364,10 +393,22 @@ public void onSuccess(Response result) {
       }
     }
 
+    public long elapsedTime() {
+      return System.currentTimeMillis() - this.startTime;
+    }
+
+    public synchronized void clearCallback() {

Review comment:
   Why is this method needed if only one in-flight request (with one corresponding callback) exists at a time? Can't it simply be `clearSemaphore` instead of having a `callbackcleared` method for protection?

Issue Time Tracking
-------------------

Worklog Id: (was: 235701)
Time Spent: 40m (was: 0.5h)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235702=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235702 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 06:21
Start Date: 01/May/19 06:21
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011339

## File path: gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
## @@ -87,6 +88,7 @@
   private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 6; // 10 minutes
   private static final double MAX_DEPLETION_RATE = 1e20;
   public static final int MAX_GROWTH_REQUEST = 2;
+  private static final long GET_PERMITS_MAX_SLEEP = 1000;

Review comment:
   Add unit in name?

Issue Time Tracking
-------------------

Worklog Id: (was: 235702)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
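The reviewer suggests encoding the unit in the constant's name. The sketch below assumes the value is in milliseconds, because neither the diff nor the "Fixed" reply states the unit or the new name. `GET_PERMITS_MAX_SLEEP_MILLIS` and `cappedSleepMillis` are hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical holder showing a unit-suffixed constant and an explicit conversion. */
final class SleepLimits {
  // Assumed to be milliseconds; the suffix makes the unit visible at every call site.
  static final long GET_PERMITS_MAX_SLEEP_MILLIS = 1000;

  private SleepLimits() {}

  /** Converts a requested wait to milliseconds and caps it at the maximum sleep. */
  static long cappedSleepMillis(long requested, TimeUnit unit) {
    return Math.min(unit.toMillis(requested), GET_PERMITS_MAX_SLEEP_MILLIS);
  }
}
```

Converting with `TimeUnit` before comparing means that a caller passing seconds cannot silently exceed the cap by a factor of 1000.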
[jira] [Work logged] (GOBBLIN-760) Improve retrying behavior of throttling clients
[ https://issues.apache.org/jira/browse/GOBBLIN-760?focusedWorklogId=235700=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235700 ]

ASF GitHub Bot logged work on GOBBLIN-760:
------------------------------------------

Author: ASF GitHub Bot
Created on: 01/May/19 06:21
Start Date: 01/May/19 06:21
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2624: [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
URL: https://github.com/apache/incubator-gobblin/pull/2624#discussion_r280011772

## File path: gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
## @@ -196,18 +205,26 @@ private long elapsedMillis(long startTimeNanos) {
   /**
    * Send a new permit request to the server.
    */
-  private void maybeSendNewPermitRequest() {
-    if (!this.requestSemaphore.tryAcquire()) {
-      return;
+  private synchronized void maybeSendNewPermitRequest() {
+    while (!this.requestSemaphore.tryAcquire()) {
+      if (this.currentCallback == null) {
+        throw new IllegalStateException("Semaphore is unavailable while callback is null!");
+      }
+      if (this.currentCallback.elapsedTime() > 3) {

Review comment:
   This number seems to be derived from maxTimeOut / (maxRetry + 1)? Shall we explicitly place a formula here instead of a plain number?

Issue Time Tracking
-------------------

Worklog Id: (was: 235700)
Time Spent: 0.5h (was: 20m)

> Improve retrying behavior of throttling clients
> --
>
> Key: GOBBLIN-760
> URL: https://issues.apache.org/jira/browse/GOBBLIN-760
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>