This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch enhance-datacarrier in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit e1b4fc16b9d84afb43ceeb7303ac8e59074267d3 Author: Wu Sheng <[email protected]> AuthorDate: Fri Aug 13 10:44:39 2021 +0800 DataCarrier changes a `#consume` API to add properties as a parameter to initialize consumer. --- CHANGES.md | 5 ++++- .../apm/commons/datacarrier/DataCarrier.java | 11 ++++++++--- .../datacarrier/consumer/ConsumeDriver.java | 22 +++++++++++++++------- .../commons/datacarrier/consumer/IConsumer.java | 3 ++- .../apm/commons/datacarrier/DataCarrierTest.java | 3 ++- .../commons/datacarrier/consumer/ConsumerTest.java | 3 ++- .../datacarrier/consumer/SampleConsumer.java | 3 ++- .../agent/core/remote/LogReportServiceClient.java | 3 ++- .../core/remote/TraceSegmentServiceClient.java | 3 ++- .../core/kafka/KafkaTraceSegmentServiceClient.java | 3 ++- .../exporter/provider/grpc/GRPCExporter.java | 3 ++- .../exporter/provider/grpc/GRPCExporterTest.java | 2 +- .../analysis/worker/MetricsAggregateWorker.java | 3 ++- .../analysis/worker/MetricsPersistentWorker.java | 3 ++- .../server/core/analysis/worker/TopNWorker.java | 3 ++- .../core/remote/client/GRPCRemoteClient.java | 3 ++- .../storage/plugin/jdbc/h2/dao/H2BatchDAO.java | 3 ++- 17 files changed, 54 insertions(+), 25 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e18df0f..4f21273 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,7 +6,10 @@ Release Notes. ------------------ #### Project + * Upgrade jdk 11 in dockerfile and remove unused java_opts. +* DataCarrier changes a `#consume` API to add properties as a parameter to initialize consumer when + use `Class<? extends IConsumer<T>> consumerClass`. #### Java Agent @@ -31,7 +34,7 @@ Release Notes. MacOS. * [Break Change] Remove page path in the browser log query condition. Only support `query by page path id`. * [Break Change] Remove endpoint name in the backend log query condition. Only support `query by endpoint id`. -* [Break Change] Fix typo for a column `page_path_id`(was `pate_path_id`) of storage entity `browser_error_log`. +* [Break Change] Fix typo for a column `page_path_id`(was `pate_path_id`) of storage entity `browser_error_log`. * Add component id for Python falcon plugin. #### UI diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java index 66d3ff8..4f5eabd 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java @@ -18,6 +18,7 @@ package org.apache.skywalking.apm.commons.datacarrier; +import java.util.Properties; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels; import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumeDriver; @@ -90,12 +91,16 @@ public class DataCarrier<T> { * * @param consumerClass class of consumer * @param num number of consumer threads + * @param properties for initializing consumer. */ - public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) { + public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, + int num, + long consumeCycle, + Properties properties) { if (driver != null) { driver.close(channels); } - driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle); + driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle, properties); driver.begin(channels); return this; } @@ -108,7 +113,7 @@ public class DataCarrier<T> { * @param num number of consumer threads */ public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) { - return this.consume(consumerClass, num, 20); + return this.consume(consumerClass, num, 20, new Properties()); } /** diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java index 1ceb765..a6d808f 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java @@ -19,6 +19,7 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer; import java.lang.reflect.InvocationTargetException; +import java.util.Properties; import java.util.concurrent.locks.ReentrantLock; import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels; @@ -31,20 +32,27 @@ public class ConsumeDriver<T> implements IDriver { private Channels<T> channels; private ReentrantLock lock; - public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, - long consumeCycle) { + public ConsumeDriver(String name, + Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, + int num, + long consumeCycle, + Properties properties) { this(channels, num); for (int i = 0; i < num; i++) { - consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle); + consumerThreads[i] = new ConsumerThread( + "DataCarrier." + name + ".Consumer." + i + ".Thread", getNewConsumerInstance(consumerClass, properties), + consumeCycle + ); consumerThreads[i].setDaemon(true); } } public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) { this(channels, num); - prototype.init(); + prototype.init(new Properties()); for (int i = 0; i < num; i++) { - consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle); + consumerThreads[i] = new ConsumerThread( + "DataCarrier." + name + ".Consumer." + i + ".Thread", prototype, consumeCycle); consumerThreads[i].setDaemon(true); } @@ -57,10 +65,10 @@ public class ConsumeDriver<T> implements IDriver { lock = new ReentrantLock(); } - private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) { + private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass, Properties properties) { try { IConsumer<T> inst = consumerClass.getDeclaredConstructor().newInstance(); - inst.init(); + inst.init(properties); return inst; } catch (InstantiationException e) { throw new ConsumerCannotBeCreatedException(e); diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java index 07793eb..6216b39 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java @@ -19,9 +19,10 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer; import java.util.List; +import java.util.Properties; public interface IConsumer<T> { - void init(); + void init(final Properties properties); void consume(List<T> data); diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java index ae496a0..5d20cce 100644 --- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java +++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java @@ -20,6 +20,7 @@ package org.apache.skywalking.apm.commons.datacarrier; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels; import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer; @@ -120,7 +121,7 @@ public class DataCarrierTest { int i = 0; @Override - public void init() { + public void init(final Properties properties) { } diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java index 6da897d..17942d5 100644 --- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java +++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java @@ -21,6 +21,7 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.SampleData; @@ -104,7 +105,7 @@ public class ConsumerTest { public boolean onError = false; @Override - public void init() { + public void init(final Properties properties) { } diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java index b23ef03..4534abd 100644 --- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java +++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/SampleConsumer.java @@ -19,13 +19,14 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer; import java.util.List; +import java.util.Properties; import org.apache.skywalking.apm.commons.datacarrier.SampleData; public class SampleConsumer implements IConsumer<SampleData> { public int i = 1; @Override - public void init() { + public void init(final Properties properties) { } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java index 498af9e..ec9ef86 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java @@ -20,6 +20,7 @@ package org.apache.skywalking.apm.agent.core.remote; import java.util.List; +import java.util.Properties; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; @@ -49,7 +50,7 @@ public class LogReportServiceClient implements BootService, IConsumer<LogData> { } @Override - public void init() { + public void init(final Properties properties) { } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java index c9583cc..a0b8695 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java @@ -21,6 +21,7 @@ package org.apache.skywalking.apm.agent.core.remote; import io.grpc.Channel; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; @@ -80,7 +81,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe } @Override - public void init() { + public void init(final Properties properties) { } diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java index ac5ae33..65181fc 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java @@ -20,6 +20,7 @@ package org.apache.skywalking.apm.agent.core.kafka; import java.util.List; import java.util.Objects; +import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.utils.Bytes; @@ -77,7 +78,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr } @Override - public void init() { + public void init(final Properties properties) { } diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java index 8758584..1221b50 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java @@ -22,6 +22,7 @@ import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -121,7 +122,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS } @Override - public void init() { + public void init(final Properties properties) { } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java index de42ce7..d31bb0e 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java @@ -78,7 +78,7 @@ public class GRPCExporterTest { @Test public void init() { - exporter.init(); + exporter.init(properties); } @Test diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 4e7c59e..268e0ad 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.List; +import java.util.Properties; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; @@ -117,7 +118,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { private class AggregatorConsumer implements IConsumer<Metrics> { @Override - public void init() { + public void init(final Properties properties) { } @Override diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index b216bd0..dbbc451 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; @@ -321,7 +322,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { */ private class PersistentConsumer implements IConsumer<Metrics> { @Override - public void init() { + public void init(final Properties properties) { } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java index 1e9db5e..e224e62 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Properties; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; @@ -100,7 +101,7 @@ public class TopNWorker extends PersistenceWorker<TopN> { private class TopNConsumer implements IConsumer<TopN> { @Override - public void init() { + public void init(final Properties properties) { } @Override diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java index 51fda20..99049a6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -23,6 +23,7 @@ import io.grpc.stub.StreamObserver; import io.netty.handler.ssl.SslContext; import java.util.List; import java.util.Objects; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; @@ -153,7 +154,7 @@ public class GRPCRemoteClient implements RemoteClient { class RemoteMessageConsumer implements IConsumer<RemoteMessage> { @Override - public void init() { + public void init(final Properties properties) { } @Override diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java index 3a799f9..2101e68 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import java.util.Properties; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; @@ -94,7 +95,7 @@ public class H2BatchDAO implements IBatchDAO { } @Override - public void init() { + public void init(final Properties properties) { }
