This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch chore in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit a4d703cd9aee4565f292c45169a1db7d79f0a417 Author: Wu Sheng <[email protected]> AuthorDate: Mon Dec 5 11:25:55 2022 +0800 Polish codes. --- .../exporter/provider/grpc/GRPCMetricsExporter.java | 11 ----------- .../exporter/provider/kafka/log/KafkaLogExporter.java | 11 ----------- .../provider/kafka/trace/KafkaTraceExporter.java | 11 ----------- .../core/analysis/worker/MetricsAggregateWorker.java | 17 ++++------------- .../core/analysis/worker/MetricsPersistentWorker.java | 10 ---------- .../oap/server/core/analysis/worker/TopNWorker.java | 14 ++------------ .../oap/server/core/remote/client/GRPCRemoteClient.java | 13 ++----------- .../server/library/datacarrier/consumer/IConsumer.java | 7 ++++--- .../oap/server/library/datacarrier/DataCarrierTest.java | 14 -------------- .../library/datacarrier/consumer/ConsumerTest.java | 11 ----------- .../library/datacarrier/consumer/SampleConsumer.java | 11 ----------- .../storage/plugin/jdbc/common/dao/JDBCBatchDAO.java | 14 ++------------ 12 files changed, 14 insertions(+), 130 deletions(-) diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java index cb1b49dca5..4693fc51a9 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java @@ -22,7 +22,6 @@ import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -131,11 +130,6 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues } } - @Override - public void init(final Properties properties) { - - } - @Override public void consume(List<ExportData> data) { GRPCStreamStatus status = new GRPCStreamStatus(); @@ -239,11 +233,6 @@ public class GRPCMetricsExporter extends MetricFormatter implements MetricValues log.error(t.getMessage(), t); } - @Override - public void onExit() { - - } - private boolean eventTypeMatch(ExportEvent.EventType eventType, org.apache.skywalking.oap.server.exporter.grpc.EventType subscriptionType) { return (ExportEvent.EventType.INCREMENT.equals(eventType) && EventType.INCREMENT.equals(subscriptionType)) diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java index 033cbdf4f5..095a364ee7 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java @@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.exporter.provider.kafka.log; import com.google.protobuf.InvalidProtocolBufferException; import java.util.List; -import java.util.Properties; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.utils.Bytes; @@ -94,11 +93,6 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe return setting.isEnableKafkaLog(); } - @Override - public void init(final Properties properties) { - - } - @Override public void consume(final List<LogRecord> data) { for (LogRecord logRecord : data) { @@ -131,11 +125,6 @@ public class KafkaLogExporter extends KafkaExportProducer implements LogExportSe } - @Override - public void onExit() { - - } - private LogData transLogData(LogRecord logRecord) throws InvalidProtocolBufferException { LogData.Builder builder = LogData.newBuilder(); LogDataBody.Builder bodyBuilder = LogDataBody.newBuilder(); diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java index 243b09ea02..273fddc609 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java @@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.exporter.provider.kafka.trace; import com.google.protobuf.InvalidProtocolBufferException; import java.util.List; -import java.util.Properties; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.utils.Bytes; @@ -85,11 +84,6 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo return setting.isEnableKafkaTrace(); } - @Override - public void init(final Properties properties) { - - } - @Override public void consume(final List<SegmentRecord> data) { for (SegmentRecord segmentRecord : data) { @@ -122,9 +116,4 @@ public class KafkaTraceExporter extends KafkaExportProducer implements TraceExpo public void onError(final List<SegmentRecord> data, final Throwable t) { } - - @Override - public void onExit() { - - } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 046029cff4..61576fc9cf 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -19,16 +19,15 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.List; -import java.util.Properties; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; @@ -130,10 +129,6 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { } private class AggregatorConsumer implements IConsumer<Metrics> { - @Override - public void init(final Properties properties) { - } - @Override public void consume(List<Metrics> data) { MetricsAggregateWorker.this.onWork(data); @@ -144,10 +139,6 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { log.error(t.getMessage(), t); } - @Override - public void onExit() { - } - @Override public void nothingToConsume() { flush(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 169332a168..403e2b8c67 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.Properties; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.CoreModule; @@ -401,11 +400,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { * ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual. */ private class PersistentConsumer implements IConsumer<Metrics> { - @Override - public void init(final Properties properties) { - - } - @Override public void consume(List<Metrics> data) { MetricsPersistentWorker.this.onWork(data); @@ -415,9 +409,5 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { public void onError(List<Metrics> data, Throwable t) { log.error(t.getMessage(), t); } - - @Override - public void onExit() { - } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java index 25c5144399..4c80ef9961 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -21,16 +21,15 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Properties; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData; import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache; import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; +import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -97,10 +96,6 @@ public class TopNWorker extends PersistenceWorker<TopN> { } private class TopNConsumer implements IConsumer<TopN> { - @Override - public void init(final Properties properties) { - } - @Override public void consume(List<TopN> data) { TopNWorker.this.onWork(data); @@ -110,10 +105,5 @@ public class TopNWorker extends PersistenceWorker<TopN> { public void onError(List<TopN> data, Throwable t) { log.error(t.getMessage(), t); } - - @Override - public void onExit() { - - } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java index 7284dc22af..a1a324309f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -24,17 +24,16 @@ import io.grpc.stub.StreamObserver; import io.netty.handler.ssl.SslContext; import java.util.List; import java.util.Objects; -import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; +import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; @@ -154,10 +153,6 @@ public class GRPCRemoteClient implements RemoteClient { } class RemoteMessageConsumer implements IConsumer<RemoteMessage> { - @Override - public void init(final Properties properties) { - } - @Override public void consume(List<RemoteMessage> remoteMessages) { try { @@ -177,10 +172,6 @@ public class GRPCRemoteClient implements RemoteClient { public void onError(List<RemoteMessage> remoteMessages, Throwable t) { log.error(t.getMessage(), t); } - - @Override - public void onExit() { - } } /** diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/IConsumer.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/IConsumer.java index b47a31942a..bb5db59ef7 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/IConsumer.java +++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/IConsumer.java @@ -22,19 +22,20 @@ import java.util.List; import java.util.Properties; public interface IConsumer<T> { - void init(final Properties properties); + default void init(final Properties properties) { + } void consume(List<T> data); void onError(List<T> data, Throwable t); - void onExit(); + default void onExit() { + } /** * Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger * reaction if the queue has no element. */ default void nothingToConsume() { - return; } } diff --git a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierTest.java b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierTest.java index 3cc615ebf9..db64c26fba 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierTest.java +++ b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierTest.java @@ -20,14 +20,12 @@ package org.apache.skywalking.oap.server.library.datacarrier; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.oap.server.library.datacarrier.buffer.Channels; import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer; import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.library.datacarrier.partition.ProducerThreadPartitioner; import org.apache.skywalking.oap.server.library.datacarrier.partition.SimpleRollingPartitioner; - import org.junit.Assert; import org.junit.Test; import org.powermock.api.support.membermodification.MemberModifier; @@ -119,13 +117,6 @@ public class DataCarrierTest { e.printStackTrace(); } IConsumer<SampleData> consumer = new IConsumer<SampleData>() { - int i = 0; - - @Override - public void init(final Properties properties) { - - } - @Override public void consume(List<SampleData> data) { @@ -135,11 +126,6 @@ public class DataCarrierTest { public void onError(List<SampleData> data, Throwable t) { } - - @Override - public void onExit() { - - } }; carrier.consume(consumer, 1); }).start(); diff --git a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerTest.java b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerTest.java index 03b3409d2c..c759e3d2d4 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerTest.java +++ b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerTest.java @@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.library.datacarrier.consumer; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; import org.apache.skywalking.oap.server.library.datacarrier.SampleData; @@ -104,11 +103,6 @@ public class ConsumerTest { public boolean onError = false; - @Override - public void init(final Properties properties) { - - } - @Override public void consume(List<SampleData> data) { if (onError) { @@ -120,11 +114,6 @@ public class ConsumerTest { public void onError(List<SampleData> data, Throwable t) { IS_OCCUR_ERROR = true; } - - @Override - public void onExit() { - - } } private IConsumer getConsumer(DataCarrier<SampleData> carrier) throws IllegalAccessException { diff --git a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/SampleConsumer.java b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/SampleConsumer.java index 0a242ad3ee..4d433fc713 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/SampleConsumer.java +++ b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/SampleConsumer.java @@ -19,17 +19,11 @@ package org.apache.skywalking.oap.server.library.datacarrier.consumer; import java.util.List; -import java.util.Properties; import org.apache.skywalking.oap.server.library.datacarrier.SampleData; public class SampleConsumer implements IConsumer<SampleData> { public int i = 1; - @Override - public void init(final Properties properties) { - - } - @Override public void consume(List<SampleData> data) { for (SampleData one : data) { @@ -42,9 +36,4 @@ public class SampleConsumer implements IConsumer<SampleData> { public void onError(List<SampleData> data, Throwable t) { } - - @Override - public void onExit() { - - } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCBatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCBatchDAO.java index 51df2e944a..d5aceebcd2 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCBatchDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCBatchDAO.java @@ -22,19 +22,18 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; +import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor; import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor; @@ -110,11 +109,6 @@ public class JDBCBatchDAO implements IBatchDAO { this.h2BatchDAO = h2BatchDAO; } - @Override - public void init(final Properties properties) { - - } - @Override public void consume(List<PrepareRequest> prepareRequests) { h2BatchDAO.flush(prepareRequests); @@ -124,9 +118,5 @@ public class JDBCBatchDAO implements IBatchDAO { public void onError(List<PrepareRequest> prepareRequests, Throwable t) { log.error(t.getMessage(), t); } - - @Override - public void onExit() { - } } }
