This is an automated email from the ASF dual-hosted git repository. jagadish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 380c976 SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring 380c976 is described below commit 380c9762e683d2c755b7d85f484e338d846132a7 Author: strkkk <andreypay...@gmail.com> AuthorDate: Wed Mar 20 12:52:01 2019 -0700 SAMZA-2105: [ELASTICSEARCH, HDFS, KAFKA] code cleanup and refactoring 1. Added/removed modifiers 2. Guava optional -> java optional and removed guava from samza-elasticsearch module dependencies. Guava optional was added in https://issues.apache.org/jira/browse/SAMZA-853 but it is not clear why guava is better than default API. 3. Few code snippets are simplified Author: strkkk <andreypay...@gmail.com> Reviewers: Jagadish <jagad...@apache.org> Closes #921 from strkkk/hdfs_es_kfka --- build.gradle | 1 - .../apache/samza/config/ElasticsearchConfig.java | 22 ++++------- .../system/elasticsearch/BulkProcessorFactory.java | 16 ++++---- .../elasticsearch/ElasticsearchSystemProducer.java | 2 +- .../client/TransportClientFactory.java | 17 ++------- .../indexrequest/DefaultIndexRequestFactory.java | 41 +++++--------------- .../apache/samza/system/hdfs/HdfsSystemAdmin.java | 8 ++-- .../samza/system/hdfs/HdfsSystemConsumer.java | 15 +++----- .../hdfs/descriptors/HdfsSystemDescriptor.java | 44 +++++++++++----------- .../hdfs/partitioner/DirectoryPartitioner.java | 25 ++++-------- .../system/hdfs/partitioner/FileSystemAdapter.java | 8 ++-- .../system/hdfs/reader/MultiFileHdfsReader.java | 4 +- .../system/hdfs/reader/SingleFileHdfsReader.java | 12 +++--- .../apache/samza/config/KafkaConsumerConfig.java | 10 ++--- .../apache/samza/system/kafka/KafkaStreamSpec.java | 8 ++-- .../samza/system/kafka/KafkaSystemAdmin.java | 18 +++------ .../kafka/descriptors/KafkaInputDescriptor.java | 4 +- .../kafka/descriptors/KafkaSystemDescriptor.java | 4 +- .../samza/system/kafka/KafkaConsumerProxy.java | 20 ++++------ 19 files changed, 104 insertions(+), 175 deletions(-) diff --git a/build.gradle b/build.gradle index 29fba39..f5d9f51 100644 --- a/build.gradle +++ b/build.gradle @@ -306,7 +306,6 @@ project(":samza-elasticsearch_$scalaSuffix") { compile project(':samza-api') compile project(":samza-core_$scalaSuffix") compile "org.elasticsearch:elasticsearch:$elasticsearchVersion" - compile "com.google.guava:guava:$guavaVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java index 75bf4c7..b062e24 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java @@ -20,7 +20,7 @@ package org.apache.samza.config; import org.apache.samza.SamzaException; -import com.google.common.base.Optional; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,27 +67,19 @@ public class ElasticsearchConfig extends MapConfig { // Index Request public Optional<String> getIndexRequestFactoryClassName() { - if (containsKey(CONFIG_KEY_INDEX_REQUEST_FACTORY)) { - return Optional.of(get(CONFIG_KEY_INDEX_REQUEST_FACTORY)); - } else { - return Optional.absent(); - } + return Optional.ofNullable(get(CONFIG_KEY_INDEX_REQUEST_FACTORY)); } // Transport client settings public Optional<String> getTransportHost() { - if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_HOST)) { - return Optional.of(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST)); - } else { - return Optional.absent(); - } + return Optional.ofNullable(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST)); } public Optional<Integer> getTransportPort() { if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_PORT)) { return Optional.of(getInt(CONFIG_KEY_CLIENT_TRANSPORT_PORT)); } else { - return Optional.absent(); + return Optional.empty(); } } @@ -96,7 +88,7 @@ public class ElasticsearchConfig extends MapConfig { if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); } else { - return Optional.absent(); + return Optional.empty(); } } @@ -104,7 +96,7 @@ public class ElasticsearchConfig extends MapConfig { if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)); } else { - return Optional.absent(); + return Optional.empty(); } } @@ -112,7 +104,7 @@ public class ElasticsearchConfig extends MapConfig { if (containsKey(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS)) { return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS)); } else { - return Optional.absent(); + return Optional.empty(); } } diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java index 0027531..2ceb899 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java @@ -43,15 +43,13 @@ public class BulkProcessorFactory { // This also means BulkProcessor#flush() is blocking as is also required. builder.setConcurrentRequests(0); - if (config.getBulkFlushMaxActions().isPresent()) { - builder.setBulkActions(config.getBulkFlushMaxActions().get()); - } - if (config.getBulkFlushMaxSizeMB().isPresent()) { - builder.setBulkSize(new ByteSizeValue(config.getBulkFlushMaxSizeMB().get(), ByteSizeUnit.MB)); - } - if (config.getBulkFlushIntervalMS().isPresent()) { - builder.setFlushInterval(TimeValue.timeValueMillis(config.getBulkFlushIntervalMS().get())); - } + config.getBulkFlushMaxActions().ifPresent(builder::setBulkActions); + config.getBulkFlushMaxSizeMB().ifPresent(size -> + builder.setBulkSize(new ByteSizeValue(size, ByteSizeUnit.MB)) + ); + config.getBulkFlushIntervalMS().ifPresent(interval -> + builder.setFlushInterval(TimeValue.timeValueMillis(interval)) + ); return builder.build(); } diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java index 7672ee8..001dabf 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java @@ -67,7 +67,7 @@ public class ElasticsearchSystemProducer implements SystemProducer { private final BulkProcessorFactory bulkProcessorFactory; private final ElasticsearchSystemProducerMetrics metrics; - private Client client; + private final Client client; public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory, Client client, IndexRequestFactory indexRequestFactory, diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java index e336ad9..5920b8f 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java @@ -47,20 +47,11 @@ public class TransportClientFactory implements ClientFactory { public TransportClientFactory(ElasticsearchConfig config) { clientSettings = config.getElasticseachSettings(); + transportHost = config.getTransportHost().orElseThrow(() -> + new SamzaException("You must specify the transport host for TransportClientFactory with the Elasticsearch system.")); - if (config.getTransportHost().isPresent()) { - transportHost = config.getTransportHost().get(); - } else { - throw new SamzaException("You must specify the transport host for TransportClientFactory" - + "with the Elasticsearch system."); - } - - if (config.getTransportPort().isPresent()) { - transportPort = config.getTransportPort().get(); - } else { - throw new SamzaException("You must specify the transport port for TransportClientFactory" - + "with the Elasticsearch system."); - } + transportPort = config.getTransportPort().orElseThrow(() -> + new SamzaException("You must specify the transport port for TransportClientFactory with the Elasticsearch system.")); } @Override diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java index 7f1e884..7befb3f 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java @@ -23,10 +23,10 @@ import org.apache.samza.SamzaException; import org.apache.samza.system.OutgoingMessageEnvelope; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; -import com.google.common.base.Optional; import org.elasticsearch.index.VersionType; import java.util.Map; +import java.util.Optional; /** * The default {@link IndexRequestFactory}. @@ -58,25 +58,10 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory { public IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope) { IndexRequest indexRequest = getRequest(envelope); - Optional<String> id = getId(envelope); - if (id.isPresent()) { - indexRequest.id(id.get()); - } - - Optional<String> routingKey = getRoutingKey(envelope); - if (routingKey.isPresent()) { - indexRequest.routing(routingKey.get()); - } - - Optional<Long> version = getVersion(envelope); - if (version.isPresent()) { - indexRequest.version(version.get()); - } - - Optional<VersionType> versionType = getVersionType(envelope); - if (versionType.isPresent()) { - indexRequest.versionType(versionType.get()); - } + getId(envelope).ifPresent(indexRequest::id); + getRoutingKey(envelope).ifPresent(indexRequest::routing); + getVersion(envelope).ifPresent(indexRequest::version); + getVersionType(envelope).ifPresent(indexRequest::versionType); setSource(envelope, indexRequest); @@ -94,27 +79,19 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory { } protected Optional<String> getId(OutgoingMessageEnvelope envelope) { - Object id = envelope.getKey(); - if (id == null) { - return Optional.absent(); - } - return Optional.of(id.toString()); + return Optional.ofNullable(envelope.getKey()).map(Object::toString); } protected Optional<String> getRoutingKey(OutgoingMessageEnvelope envelope) { - Object partitionKey = envelope.getPartitionKey(); - if (partitionKey == null) { - return Optional.absent(); - } - return Optional.of(partitionKey.toString()); + return Optional.ofNullable(envelope.getPartitionKey()).map(Object::toString); } protected Optional<Long> getVersion(OutgoingMessageEnvelope envelope) { - return Optional.absent(); + return Optional.empty(); } protected Optional<VersionType> getVersionType(OutgoingMessageEnvelope envelope) { - return Optional.absent(); + return Optional.empty(); } protected void setSource(OutgoingMessageEnvelope envelope, IndexRequest indexRequest) { diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java index 0d50f26..7ffbfc7 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java @@ -91,10 +91,10 @@ import org.slf4j.LoggerFactory; public class HdfsSystemAdmin implements SystemAdmin { private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemAdmin.class); - private HdfsConfig hdfsConfig; - private DirectoryPartitioner directoryPartitioner; - private String stagingDirectory; // directory that contains the partition description - private HdfsReaderFactory.ReaderType readerType; + private final HdfsConfig hdfsConfig; + private final DirectoryPartitioner directoryPartitioner; + private final String stagingDirectory; // directory that contains the partition description + private final HdfsReaderFactory.ReaderType readerType; public HdfsSystemAdmin(String systemName, Config config) { hdfsConfig = new HdfsConfig(config); diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java index 92457ab..1ceb5d6 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java @@ -22,7 +22,6 @@ package org.apache.samza.system.hdfs; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -103,9 +102,9 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { * (stream2) -> (P0) -> "hdfs://user/samzauser/2/datafile01.avro" * ... */ - private LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap; - private Map<SystemStreamPartition, MultiFileHdfsReader> readers; - private Map<SystemStreamPartition, Future> readerRunnableStatus; + private final LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap; + private final Map<SystemStreamPartition, MultiFileHdfsReader> readers; + private final Map<SystemStreamPartition, Future> readerRunnableStatus; /** * Whether the {@link org.apache.samza.system.hdfs.HdfsSystemConsumer} is notified @@ -130,8 +129,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { this.consumerMetrics = consumerMetrics; cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() { @Override - public Map<Partition, List<String>> load(String streamName) - throws Exception { + public Map<Partition, List<String>> load(String streamName) { Validate.notEmpty(streamName); if (StringUtils.isBlank(stagingDirectory)) { throw new SamzaException("Staging directory can't be empty. " @@ -151,8 +149,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { public void start() { LOG.info(String.format("HdfsSystemConsumer started with %d readers", readers.size())); executorService = Executors.newCachedThreadPool(); - readers.entrySet().forEach( - entry -> readerRunnableStatus.put(entry.getKey(), executorService.submit(new ReaderRunnable(entry.getValue())))); + readers.forEach((key, value) -> readerRunnableStatus.put(key, executorService.submit(new ReaderRunnable(value)))); } /** @@ -275,7 +272,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { } private class ReaderRunnable implements Runnable { - public MultiFileHdfsReader reader; + public final MultiFileHdfsReader reader; public ReaderRunnable(MultiFileHdfsReader reader) { this.reader = reader; diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java index f4d8566..fd63f79 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java @@ -92,7 +92,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withDatePathFormat(String datePathFormat) { - this.datePathFormat = Optional.of(StringUtils.stripToNull(datePathFormat)); + this.datePathFormat = Optional.ofNullable(StringUtils.stripToNull(datePathFormat)); return this; } @@ -102,7 +102,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withOutputBaseDir(String outputBaseDir) { - this.outputBaseDir = Optional.of(StringUtils.stripToNull(outputBaseDir)); + this.outputBaseDir = Optional.ofNullable(StringUtils.stripToNull(outputBaseDir)); return this; } @@ -135,7 +135,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withWriteCompressionType(String writeCompressionType) { - this.writeCompressionType = Optional.of(StringUtils.stripToNull(writeCompressionType)); + this.writeCompressionType = Optional.ofNullable(StringUtils.stripToNull(writeCompressionType)); return this; } @@ -145,7 +145,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withWriterClassName(String writerClassName) { - this.writerClass = Optional.of(StringUtils.stripToNull(writerClassName)); + this.writerClass = Optional.ofNullable(StringUtils.stripToNull(writerClassName)); return this; } @@ -175,7 +175,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withConsumerWhiteList(String whiteList) { - this.consumerWhiteList = Optional.of(StringUtils.stripToNull(whiteList)); + this.consumerWhiteList = Optional.ofNullable(StringUtils.stripToNull(whiteList)); return this; } @@ -185,7 +185,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withConsumerBlackList(String blackList) { - this.consumerBlackList = Optional.of(StringUtils.stripToNull(blackList)); + this.consumerBlackList = Optional.ofNullable(StringUtils.stripToNull(blackList)); return this; } @@ -195,7 +195,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withConsumerGroupPattern(String groupPattern) { - this.consumerGroupPattern = Optional.of(StringUtils.stripToNull(groupPattern)); + this.consumerGroupPattern = Optional.ofNullable(StringUtils.stripToNull(groupPattern)); return this; } @@ -205,7 +205,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withReaderType(String readerType) { - this.consumerReader = Optional.of(StringUtils.stripToNull(readerType)); + this.consumerReader = Optional.ofNullable(StringUtils.stripToNull(readerType)); return this; } @@ -216,7 +216,7 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> * @return this system descriptor */ public HdfsSystemDescriptor withStagingDirectory(String stagingDirectory) { - this.consumerStagingDirectory = Optional.of(StringUtils.stripToNull(stagingDirectory)); + this.consumerStagingDirectory = Optional.ofNullable(StringUtils.stripToNull(stagingDirectory)); return this; } @@ -225,29 +225,29 @@ public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> Map<String, String> config = new HashMap<>(super.toConfig()); String systemName = getSystemName(); - this.datePathFormat.ifPresent( + datePathFormat.ifPresent( val -> config.put(String.format(HdfsConfig.DATE_PATH_FORMAT_STRING(), systemName), val)); - this.outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val)); - this.writeBatchSizeBytes.ifPresent( + outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val)); + writeBatchSizeBytes.ifPresent( val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_BYTES(), systemName), String.valueOf(val))); - this.writeBatchSizeRecords.ifPresent( + writeBatchSizeRecords.ifPresent( val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_RECORDS(), systemName), String.valueOf(val))); - this.writeCompressionType.ifPresent( + writeCompressionType.ifPresent( val -> config.put(String.format(HdfsConfig.COMPRESSION_TYPE(), systemName), val)); - this.writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val)); + writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val)); - this.consumerBufferCapacity.ifPresent( + consumerBufferCapacity.ifPresent( val -> config.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName), String.valueOf(val))); - this.consumerMaxRetries.ifPresent( + consumerMaxRetries.ifPresent( val -> config.put(String.format(HdfsConfig.CONSUMER_NUM_MAX_RETRIES(), systemName), String.valueOf(val))); - this.consumerWhiteList.ifPresent( + consumerWhiteList.ifPresent( val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), val)); - this.consumerBlackList.ifPresent( + consumerBlackList.ifPresent( val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST(), systemName), val)); - this.consumerGroupPattern.ifPresent( + consumerGroupPattern.ifPresent( val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN(), systemName), val)); - this.consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val)); - this.consumerStagingDirectory.ifPresent( + consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val)); + consumerStagingDirectory.ifPresent( val -> config.put(String.format(HdfsConfig.STAGING_DIRECTORY(), systemName), val)); return config; diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java index 0661139..8244504 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java @@ -20,11 +20,9 @@ package org.apache.samza.system.hdfs.partitioner; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -33,7 +31,6 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.samza.Partition; @@ -63,13 +60,13 @@ public class DirectoryPartitioner { private static final Logger LOG = LoggerFactory.getLogger(DirectoryPartitioner.class); private static final String GROUP_IDENTIFIER = "\\[id]"; - private String whiteListRegex; - private String blackListRegex; - private String groupPattern; - private FileSystemAdapter fileSystemAdapter; + private final String whiteListRegex; + private final String blackListRegex; + private final String groupPattern; + private final FileSystemAdapter fileSystemAdapter; // stream name => partition => partition descriptor - private Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>(); + private final Map<String, Map<Partition, List<String>>> partitionDescriptorMap = new HashMap<>(); public DirectoryPartitioner(String whiteList, String blackList, String groupPattern, FileSystemAdapter fileSystemAdapter) { @@ -93,7 +90,7 @@ public class DirectoryPartitioner { allFiles.stream().filter(file -> file.getPath().matches(whiteListRegex) && !file.getPath().matches(blackListRegex)) .forEach(filteredFiles::add); // sort the files to have a consistent order - filteredFiles.sort((f1, f2) -> f1.getPath().compareTo(f2.getPath())); + filteredFiles.sort(Comparator.comparing(FileMetadata::getPath)); LOG.info(String.format("List of filtered files for %s: %s", streamName, filteredFiles)); return filteredFiles; } @@ -152,7 +149,7 @@ public class DirectoryPartitioner { List<List<FileMetadata>> ret = new ArrayList<>(); // sort the map to guarantee consistent ordering List<String> sortedKeys = new ArrayList<>(map.keySet()); - sortedKeys.sort(Comparator.<String>naturalOrder()); + sortedKeys.sort(Comparator.naturalOrder()); sortedKeys.stream().forEach(key -> ret.add(map.get(key))); return ret; } @@ -175,13 +172,7 @@ public class DirectoryPartitioner { throw new SamzaException("The list of new files is not a super set of the old files. diff = " + oldFileSet.removeAll(newFileSet)); } - Iterator<FileMetadata> iterator = newFileList.iterator(); - while (iterator.hasNext()) { - FileMetadata file = iterator.next(); - if (!oldFileSet.contains(file.getPath())) { - iterator.remove(); - } - } + newFileList.removeIf(file -> !oldFileSet.contains(file.getPath())); return newFileList; } diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java index 5fec4bf..7e35eec 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java @@ -33,11 +33,11 @@ public interface FileSystemAdapter { * @param streamName name of the stream * @return list of <code>FileMetadata</code> for all files associated to the given stream */ - public List<FileMetadata> getAllFiles(String streamName); + List<FileMetadata> getAllFiles(String streamName); - public class FileMetadata { - private String path; - private long length; + class FileMetadata { + private final String path; + private final long length; public FileMetadata(String path, long length) { this.path = path; diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java index eea68bb..cd8cc5e 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java @@ -46,7 +46,7 @@ public class MultiFileHdfsReader { private final HdfsReaderFactory.ReaderType readerType; private final SystemStreamPartition systemStreamPartition; - private List<String> filePaths; + private final List<String> filePaths; private SingleFileHdfsReader curReader; private int curFileIndex = 0; private String curSingleFileOffset; @@ -127,7 +127,7 @@ public class MultiFileHdfsReader { this.filePaths = partitionDescriptors; this.numMaxRetries = numMaxRetries; this.numRetries = 0; - if (partitionDescriptors.size() <= 0) { + if (partitionDescriptors.isEmpty()) { throw new SamzaException( "Invalid number of files based on partition descriptors: " + partitionDescriptors.size()); } diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java index eb8a70d..cbae032 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java @@ -28,35 +28,35 @@ public interface SingleFileHdfsReader { * @param path path of the file to be read * @param offset offset the reader should start from */ - public void open(String path, String offset); + void open(String path, String offset); /** * Seek to a specific offset * @param offset offset the reader should seek to */ - public void seek(String offset); + void seek(String offset); /** * Construct and return the next message envelope * @return constructed IncomeMessageEnvelope */ - public IncomingMessageEnvelope readNext(); + IncomingMessageEnvelope readNext(); /** * Get the next offset, which is the offset for the next message * that will be returned by readNext * @return next offset */ - public String nextOffset(); + String nextOffset(); /** * Whether there are still records to be read * @return true of false based on whether the reader has hit end of file */ - public boolean hasNext(); + boolean hasNext(); /** * Close the reader. */ - public void close(); + void close(); } diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java index 21709be..dea60b3 100644 --- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java @@ -71,8 +71,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { final String groupId = createConsumerGroupId(config); - Map<String, Object> consumerProps = new HashMap<>(); - consumerProps.putAll(subConf); + Map<String, Object> consumerProps = new HashMap<>(subConf); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); @@ -115,8 +114,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { } // Override default max poll config if there is no value - consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); + consumerProps.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); return new KafkaConsumerConfig(consumerProps, systemName); } @@ -194,7 +192,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { return autoOffsetReset; } // translate old kafka consumer values into new ones (SAMZA-1987 top remove it) - String newAutoOffsetReset = null; + String newAutoOffsetReset; switch (autoOffsetReset) { case "largest": newAutoOffsetReset = KAFKA_OFFSET_LATEST; @@ -230,4 +228,4 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { return newAutoOffsetReset; } -} \ No newline at end of file +} diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java index 113dced..d621308 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory; * Extends StreamSpec with the ability to easily get the topic replication factor. */ public class KafkaStreamSpec extends StreamSpec { - private static Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class); private static final int DEFAULT_REPLICATION_FACTOR = 2; @@ -59,10 +59,8 @@ public class KafkaStreamSpec extends StreamSpec { * @return The Map instance. */ private static Map<String, String> propertiesToMap(Properties properties) { - Map<String, String> map = new HashMap<String, String>(); - for (final String name: properties.stringPropertyNames()) { - map.put(name, properties.getProperty(name)); - } + Map<String, String> map = new HashMap<>(); + properties.stringPropertyNames().forEach(name -> map.put(name, properties.getProperty(name))); return map; } diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index 36aa695..f4db408 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -244,9 +244,7 @@ public class KafkaSystemAdmin implements SystemAdmin { List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(streamName); LOG.debug("Stream {} has partitions {}", streamName, partitionInfos); - partitionInfos.forEach(partitionInfo -> { - partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm); - }); + partitionInfos.forEach(partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm)); allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata)); }); @@ -370,9 +368,7 @@ public class KafkaSystemAdmin implements SystemAdmin { } }; - Map<String, SystemStreamMetadata> result = - retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation); - return result; + return retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation); } /** @@ -401,9 +397,7 @@ public class KafkaSystemAdmin implements SystemAdmin { Map<TopicPartition, Long> upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions); LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong); - oldestOffsetsWithLong.forEach((topicPartition, offset) -> { - oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)); - }); + oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset))); upcomingOffsetsWithLong.forEach((topicPartition, offset) -> { upcomingOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)); @@ -511,7 +505,7 @@ public class KafkaSystemAdmin implements SystemAdmin { NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor()); // specify the configs - Map<String, String> streamConfig = new HashMap(streamSpec.getConfig()); + Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig()); // HACK - replication.factor is invalid config for AdminClient.createTopics if (streamConfig.containsKey(REPL_FACTOR)) { String repl = streamConfig.get(REPL_FACTOR); @@ -561,7 +555,7 @@ public class KafkaSystemAdmin implements SystemAdmin { } /** - * Converts a StreamSpec into a KafakStreamSpec. Special handling for coordinator and changelog stream. + * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog stream. * @param spec a StreamSpec object * @return KafkaStreamSpec object */ @@ -615,7 +609,7 @@ public class KafkaSystemAdmin implements SystemAdmin { // get partition info for topic Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) { - Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap(); + Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>(); List<PartitionInfo> partitionInfoList; for (String topic : topics) { partitionInfoList = metadataConsumer.partitionsFor(topic); diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java index d9477e5..54f433f 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java @@ -71,7 +71,7 @@ public class KafkaInputDescriptor<StreamMessageType> * @return this input descriptor */ public KafkaInputDescriptor<StreamMessageType> withConsumerAutoOffsetReset(String consumerAutoOffsetReset) { - this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset)); + this.consumerAutoOffsetResetOptional = Optional.ofNullable(StringUtils.stripToNull(consumerAutoOffsetReset)); return this; } @@ -94,7 +94,7 @@ public class KafkaInputDescriptor<StreamMessageType> @Override public Map<String, String> toConfig() { - HashMap<String, String> configs = new HashMap<>(super.toConfig()); + Map<String, String> configs = new HashMap<>(super.toConfig()); // Note: Kafka configuration needs the topic's physical name, not the stream-id. // We won't have that here if user only specified it in configs, or if it got rewritten // by the planner to something different than what's in this descriptor. diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java index 091c21a..8c4d48b 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java @@ -120,7 +120,7 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto * @return this system descriptor */ public KafkaSystemDescriptor withConsumerAutoOffsetReset(String consumerAutoOffsetReset) { - this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset)); + this.consumerAutoOffsetResetOptional = Optional.ofNullable(StringUtils.stripToNull(consumerAutoOffsetReset)); return this; } @@ -246,4 +246,4 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto producerConfigs.forEach((key, value) -> configs.put(String.format(PRODUCER_CONFIGS_CONFIG_KEY, getSystemName(), key), value)); return configs; } -} \ No newline at end of file +} diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 93b1ab2..ac0a55c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -57,7 +57,7 @@ public class KafkaConsumerProxy<K, V> { private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100; - final Thread consumerPollThread; + private final Thread consumerPollThread; private final Consumer<K, V> kafkaConsumer; private final KafkaSystemConsumer.KafkaConsumerMessageSink sink; private final KafkaSystemConsumerMetrics kafkaConsumerMetrics; @@ -187,7 +187,7 @@ public class KafkaConsumerProxy<K, V> { // creates a separate thread for getting the messages. private Runnable createProxyThreadRunnable() { - Runnable runnable = () -> { + return () -> { isRunning = true; try { @@ -208,8 +208,6 @@ public class KafkaConsumerProxy<K, V> { LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName); } }; - - return runnable; } private void fetchMessages() { @@ -305,11 +303,7 @@ public class KafkaConsumerProxy<K, V> { updateMetrics(record, tp); SystemStreamPartition ssp = topicPartitionToSSP.get(tp); - List<IncomingMessageEnvelope> messages = results.get(ssp); - if (messages == null) { - messages = new ArrayList<>(); - results.put(ssp, messages); - } + List<IncomingMessageEnvelope> messages = results.computeIfAbsent(ssp, k -> new ArrayList<>()); IncomingMessageEnvelope incomingMessageEnvelope = handleNewRecord(record, ssp); messages.add(incomingMessageEnvelope); @@ -359,7 +353,7 @@ public class KafkaConsumerProxy<K, V> { if (lag == null) { throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName); } - long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark + long currentSSPLag = lag; // lag between the current offset and the highwatermark if (currentSSPLag < 0) { return; } @@ -397,7 +391,7 @@ public class KafkaConsumerProxy<K, V> { // populate the MetricNames first time if (perPartitionMetrics.isEmpty()) { - HashMap<String, String> tags = new HashMap<>(); + Map<String, String> tags = new HashMap<>(); tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics for (SystemStreamPartition ssp : ssps) { @@ -429,10 +423,10 @@ public class KafkaConsumerProxy<K, V> { Long lag = latestLags.get(ssp); LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag); if (lag != null && offset != null && lag >= 0) { - long streamEndOffset = offset.longValue() + lag.longValue(); + long streamEndOffset = offset + lag; // update the metrics kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset); - kafkaConsumerMetrics.setLagValue(tp, lag.longValue()); + kafkaConsumerMetrics.setLagValue(tp, lag); } } }