This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 594da28  [HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
594da28 is described below

commit 594da28fbf64fb20432e718a409577fd10516c4a
Author: Suneel Marthi <smar...@apache.org>
AuthorDate: Tue Feb 4 14:52:03 2020 +0100

    [HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
---
 .../org/apache/hudi/index/hbase/HBaseIndex.java    | 23 ++-------
 .../org/apache/hudi/io/HoodieCommitArchiveLog.java |  6 +--
 .../io/compact/strategy/CompactionStrategy.java    |  8 +--
 .../main/java/org/apache/hudi/metrics/Metrics.java | 17 +++----
 .../src/test/java/org/apache/hudi/TestCleaner.java | 37 +++++++-------
 .../hudi/common/HoodieTestDataGenerator.java       | 14 ++----
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 51 ++++++++++++-------
 .../org/apache/hudi/common/HoodieJsonPayload.java  |  5 +-
 .../hudi/common/table/log/HoodieLogFileReader.java | 33 ++++++------
 .../java/org/apache/hudi/common/util/FSUtils.java  |  2 +-
 .../hudi/common/minicluster/HdfsTestService.java   |  2 +-
 .../hudi/common/table/log/TestHoodieLogFormat.java | 26 +++-------
 .../table/view/TestHoodieTableFileSystemView.java  | 10 ++--
 .../common/util/collection/TestDiskBasedMap.java   |  6 +--
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  |  9 ++--
 .../realtime/TestHoodieRealtimeRecordReader.java   |  5 +-
 .../java/org/apache/hudi/hive/util/SchemaUtil.java | 31 +++++------
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |  4 +-
 .../org/apache/hudi/hive/util/HiveTestService.java | 11 ++--
 .../org/apache/hudi/integ/ITTestHoodieDemo.java    | 58 +++++++++++-----------
 .../apache/hudi/utilities/HDFSParquetImporter.java |  7 ++-
 .../hudi/utilities/perf/TimelineServerPerf.java    | 20 +++-----
 .../sources/helpers/IncrSourceHelper.java          |  4 +-
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  7 +--
 24 files changed, 172 insertions(+), 224 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index 3f79096..12d352d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -205,9 +205,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { } } List<HoodieRecord<T>> taggedRecords = new ArrayList<>(); - HTable hTable = null; - try { - hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName)); + try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName))) { List<Get> statements = new ArrayList<>(); List<HoodieRecord> currentBatchOfRecords = new LinkedList<>(); // Do the tagging.
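The HBaseIndex hunk above swaps a manual try/finally close for try-with-resources: the HTable is declared in the try header, so close() runs automatically even when the body throws, and the finally block removed in the next hunk becomes unnecessary. A minimal standalone sketch of the pattern, assuming any Closeable resource (names here are hypothetical, not Hudi APIs):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    public class TryWithResourcesSketch {
      // Before: declare outside try, close in finally with a nested try/catch.
      // After: declare in the try header; close() is implicit, even on exceptions.
      static String readFirstLine(String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
          return reader.readLine();
        }
      }
    }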
@@ -250,15 +248,6 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { } } catch (IOException e) { throw new HoodieIndexException("Failed to Tag indexed locations because of exception with HBase Client", e); - } finally { - if (hTable != null) { - try { - hTable.close(); - } catch (IOException e) { - // Ignore - } - } - } return taggedRecords.iterator(); }; @@ -444,16 +433,14 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { */ public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut, int maxExecutors, int sleepTimeMs, float qpsFraction) { - int numRSAlive = numRegionServersForTable; - int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer); - int numTasks = numTasksDuringPut; - int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors)); + int maxReqPerSec = (int) (qpsFraction * numRegionServersForTable * maxQpsPerRegionServer); + int maxParallelPuts = Math.max(1, Math.min(numTasksDuringPut, maxExecutors)); int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs; int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec)); LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction); - LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive); + LOG.info("HbaseIndexThrottling: numRSAlive :" + numRegionServersForTable); LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec); - LOG.info("HbaseIndexThrottling: numTasks :" + numTasks); + LOG.info("HbaseIndexThrottling: numTasks :" + numTasksDuringPut); LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors); LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts); LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java index bafbc8d..6847a24 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java @@ -147,9 +147,9 @@ public class HoodieCommitArchiveLog { HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline() .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants(); Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants() - .collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> { - if (i.getValue().size() > maxCommitsToKeep) { - return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep); + .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> { + if (hoodieInstants.size() > maxCommitsToKeep) { + return hoodieInstants.subList(0, hoodieInstants.size() - minCommitsToKeep); } else { return new ArrayList<HoodieInstant>(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java index 4c03116..dd17212 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java @@ -62,10 +62,10 @@ public abstract class CompactionStrategy implements Serializable { public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, 
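The getBatchSize() cleanup above inlines the numRSAlive and numTasks locals without changing the arithmetic. As a worked example of the throttling formula with made-up inputs (a sketch, not Hudi's defaults):

    public class BatchSizeSketch {
      private static final int MILLI_SECONDS_IN_A_SECOND = 1000;

      static int batchSize(int regionServers, int maxQpsPerRegionServer, int numTasks,
                           int maxExecutors, int sleepTimeMs, float qpsFraction) {
        int maxReqPerSec = (int) (qpsFraction * regionServers * maxQpsPerRegionServer);
        int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
        int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
        return Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
      }

      public static void main(String[] args) {
        // 10 region servers at 1000 QPS each with a 0.5 QPS fraction gives 5000 req/s;
        // 100 tasks capped at 20 executors, 100ms sleep => 20 puts * 10 req/s => batch 25.
        System.out.println(batchSize(10, 1000, 100, 20, 100, 0.5f)); // prints 25
      }
    }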
Option<HoodieBaseFile> dataFile, String partitionPath, List<HoodieLogFile> logFiles) { Map<String, Double> metrics = Maps.newHashMap(); - Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize(); + long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize(); // Total size of all the log files Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0) - .reduce((size1, size2) -> size1 + size2).orElse(0L); + .reduce(Long::sum).orElse(0L); // Total read will be the base file + all the log files Long totalIORead = FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L) + totalLogFileSize); @@ -73,11 +73,11 @@ public abstract class CompactionStrategy implements Serializable { Long totalIOWrite = FSUtils.getSizeInMB(dataFile.isPresent() ? dataFile.get().getFileSize() : defaultMaxParquetFileSize); // Total IO will the the IO for read + write - Long totalIO = totalIORead + totalIOWrite; + long totalIO = totalIORead + totalIOWrite; // Save these metrics and we will use during the filter metrics.put(TOTAL_IO_READ_MB, totalIORead.doubleValue()); metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue()); - metrics.put(TOTAL_IO_MB, totalIO.doubleValue()); + metrics.put(TOTAL_IO_MB, (double) totalIO); metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue()); metrics.put(TOTAL_LOG_FILES, (double) logFiles.size()); return metrics; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java index 4b19441..533208f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -49,17 +49,14 @@ public class Metrics { } // reporter.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - reporter.report(); - Closeables.close(reporter.getReporter(), true); - } catch (Exception e) { - e.printStackTrace(); - } + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + reporter.report(); + Closeables.close(reporter.getReporter(), true); + } catch (Exception e) { + e.printStackTrace(); } - }); + })); } public static Metrics getInstance() { diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java index 24aa9cd..662273a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java @@ -68,6 +68,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -106,7 +107,7 @@ public class TestCleaner extends TestHoodieClientBase { Function2<List<HoodieRecord>, String, Integer> recordGenFunction, Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception { - /** + /* * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages * in insert(), if the implementation diverges.) 
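The Metrics change above compiles because Thread has a Thread(Runnable) constructor and Runnable is a functional interface, so the anonymous subclass collapses into a lambda. A compact sketch (the println stands in for flushing a metrics reporter):

    public class ShutdownHookSketch {
      public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
          try {
            System.out.println("flushing metrics before JVM exit");
          } catch (Exception e) {
            e.printStackTrace();
          }
        }));
      }
    }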
*/ @@ -606,8 +607,8 @@ public class TestCleaner extends TestHoodieClientBase { String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2; List<String> deletePathPatterns1 = Arrays.asList(filePath1, filePath2); - List<String> successDeleteFiles1 = Arrays.asList(filePath1); - List<String> failedDeleteFiles1 = Arrays.asList(filePath2); + List<String> successDeleteFiles1 = Collections.singletonList(filePath1); + List<String> failedDeleteFiles1 = Collections.singletonList(filePath2); // create partition1 clean stat. HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, @@ -630,7 +631,8 @@ public class TestCleaner extends TestHoodieClientBase { // map with relative path. Map<String, Tuple3> newExpected = new HashMap<>(); - newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Arrays.asList(fileName1), Arrays.asList(fileName2))); + newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Collections.singletonList(fileName1), + Collections.singletonList(fileName2))); newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2)); HoodieCleanMetadata metadata = @@ -1079,19 +1081,18 @@ public class TestCleaner extends TestHoodieClientBase { }); // Test for progress (Did we clean some files ?) - long numFilesUnderCompactionDeleted = hoodieCleanStats.stream().flatMap(cleanStat -> { - return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()) - .map(fileIdWithCommitTime -> { - if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) { - Assert.assertTrue("Deleted instant time must be less than pending compaction", - HoodieTimeline.compareTimestamps( - fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()), - fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER)); - return true; - } - return false; - }); - }).filter(x -> x).count(); + long numFilesUnderCompactionDeleted = hoodieCleanStats.stream() + .flatMap(cleanStat -> convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()) + .map(fileIdWithCommitTime -> { + if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) { + Assert.assertTrue("Deleted instant time must be less than pending compaction", + HoodieTimeline.compareTimestamps( + fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()), + fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER)); + return true; + } + return false; + })).filter(x -> x).count(); long numDeleted = hoodieCleanStats.stream().mapToLong(cleanStat -> cleanStat.getDeletePathPatterns().size()).sum(); // Tighter check for regression @@ -1123,7 +1124,7 @@ public class TestCleaner extends TestHoodieClientBase { * @throws IOException in case of error */ private int getTotalTempFiles() throws IOException { - RemoteIterator itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true); + RemoteIterator<?> itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true); int count = 0; while (itr.hasNext()) { count++; diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 56b3c07..e0d2a53 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -210,13 +210,10 @@ public 
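The TestCleaner edits above prefer Collections.singletonList(x) to Arrays.asList(x) for one-element lists: same contents, no backing array allocation, and clearer intent. Sketch:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SingletonListSketch {
      public static void main(String[] args) {
        List<String> a = Arrays.asList("only");             // wraps a 1-element array
        List<String> b = Collections.singletonList("only"); // immutable one-element view
        System.out.println(a.equals(b)); // true: equal contents, cheaper construction for b
      }
    }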
class HoodieTestDataGenerator { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName()); FileSystem fs = FSUtils.getFs(basePath, configuration); - FSDataOutputStream os = fs.create(commitFile, true); - HoodieCompactionPlan workload = new HoodieCompactionPlan(); - try { + try (FSDataOutputStream os = fs.create(commitFile, true)) { + HoodieCompactionPlan workload = new HoodieCompactionPlan(); // Write empty commit metadata os.writeBytes(new String(AvroUtils.serializeCompactionPlan(workload).get(), StandardCharsets.UTF_8)); - } finally { - os.close(); } } @@ -225,13 +222,10 @@ public class HoodieTestDataGenerator { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeSavePointFileName(commitTime)); FileSystem fs = FSUtils.getFs(basePath, configuration); - FSDataOutputStream os = fs.create(commitFile, true); - HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - try { + try (FSDataOutputStream os = fs.create(commitFile, true)) { + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); // Write empty commit metadata os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - } finally { - os.close(); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 15a8af7..c605654 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -42,6 +42,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -246,13 +247,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5)); String filename0 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Arrays.asList(record1), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1), + schema, null, false); String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), + schema, null, false); String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record2), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2), + schema, null, false); String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4), + schema, null, false); // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up metaClient = HoodieTableMetaClient.reload(metaClient); @@ -265,21 +270,29 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table); for (HoodieRecord record : taggedRecordRDD.collect()) { - if (record.getRecordKey().equals("000")) { - 
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0))); - assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData()); - } else if (record.getRecordKey().equals("001")) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); - assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData()); - } else if (record.getRecordKey().equals("002")) { - assertTrue(!record.isCurrentLocationKnown()); - assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData()); - } else if (record.getRecordKey().equals("003")) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); - assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData()); - } else if (record.getRecordKey().equals("004")) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); - assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData()); + switch (record.getRecordKey()) { + case "000": + assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename0)); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData()); + break; + case "001": + assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2)); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData()); + break; + case "002": + assertFalse(record.isCurrentLocationKnown()); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData()); + break; + case "003": + assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3)); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData()); + break; + case "004": + assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3)); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData()); + break; + default: + throw new IllegalArgumentException("Unknown Key: " + record.getRecordKey()); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java index 9e95fd8..1c15c66 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java @@ -86,11 +86,8 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload> } private String unCompressData(byte[] data) throws IOException { - InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data)); - try { + try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) { return FileIOUtils.readAsUTFString(iis, dataSize); - } finally { - iis.close(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 354f809..40a5243 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Objects; 
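The bloom-index test above turns an if/else-if chain on record keys into a switch on String (legal since Java 7), with a default branch that fails loudly on unexpected keys instead of silently skipping them. Minimal sketch:

    public class StringSwitchSketch {
      static String describe(String key) {
        switch (key) {
          case "000":
            return "first record";
          case "001":
            return "second record";
          default:
            throw new IllegalArgumentException("Unknown Key: " + key);
        }
      }

      public static void main(String[] args) {
        System.out.println(describe("001")); // second record
      }
    }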
/** * Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit @@ -107,25 +108,22 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { * Close the inputstream if not closed when the JVM exits. */ private void addShutDownHook() { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - close(); - } catch (Exception e) { - LOG.warn("unable to close input stream for log file " + logFile, e); - // fail silently for any sort of exception - } + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + close(); + } catch (Exception e) { + LOG.warn("unable to close input stream for log file " + logFile, e); + // fail silently for any sort of exception } - }); + })); } // TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows // for max of Integer size private HoodieLogBlock readBlock() throws IOException { - int blocksize = -1; - int type = -1; + int blocksize; + int type; HoodieLogBlockType blockType = null; Map<HeaderMetadataType, String> header = null; @@ -190,7 +188,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { // 9. Read the log block end position in the log file long blockEndPos = inputStream.getPos(); - switch (blockType) { + switch (Objects.requireNonNull(blockType)) { // based on type read the block case AVRO_DATA_BLOCK: if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) { @@ -278,10 +276,10 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { } } - @Override - /** + /* * hasNext is not idempotent. TODO - Fix this. It is okay for now - PR */ + @Override public boolean hasNext() { try { return readMagic(); @@ -315,10 +313,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { long pos = inputStream.getPos(); // 1. 
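The readBlock() change above wraps the switch operand in Objects.requireNonNull, so an unresolved block type fails immediately with a clear message rather than as a bare NullPointerException inside the switch. Sketch with a hypothetical enum:

    import java.util.Objects;

    public class RequireNonNullSketch {
      enum BlockType { AVRO_DATA_BLOCK, DELETE_BLOCK }

      static String dispatch(BlockType type) {
        // Fails fast, with a message, if type was never assigned.
        switch (Objects.requireNonNull(type, "block type must be resolved before dispatch")) {
          case AVRO_DATA_BLOCK:
            return "data";
          case DELETE_BLOCK:
            return "delete";
          default:
            return "unknown";
        }
      }
    }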
Read magic header from the start of the block inputStream.readFully(MAGIC_BUFFER, 0, 6); - if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) { - return false; - } - return true; + return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java index 43b0030..87925c7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java @@ -193,7 +193,7 @@ public class FSUtils { return partitions; } - public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs, + public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs, String markerDir) throws IOException { List<String> dataFiles = new LinkedList<>(); processFiles(fs, markerDir, (status) -> { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java index 9bc9a8d..7fb3bfd 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java @@ -79,7 +79,7 @@ public class HdfsTestService { // Configure and start the HDFS cluster // boolean format = shouldFormatDFSCluster(localDFSLocation, clean); - hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort, + configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort, datanodePort, datanodeIpcPort, datanodeHttpPort); miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true) .checkDataNodeHostConfig(true).build(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java index 1b9667c..6c01d4a 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java @@ -285,7 +285,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } } - /** + /* * This is actually a test on concurrent append and not recovery lease. Commenting this out. 
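readMagic() above now hands the Arrays.equals result straight back instead of the if/return-false/return-true detour. Sketch (the 6-byte magic value here is a placeholder, not necessarily Hudi's actual constant):

    import java.util.Arrays;

    public class BooleanReturnSketch {
      private static final byte[] MAGIC = {'#', 'H', 'U', 'D', 'I', '#'};

      // Before: if (!Arrays.equals(buf, MAGIC)) { return false; } return true;
      static boolean isMagic(byte[] buf) {
        return Arrays.equals(buf, MAGIC);
      }
    }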
* https://issues.apache.org/jira/browse/HUDI-117 */ @@ -337,7 +337,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { assertEquals(2, statuses.length); } - @SuppressWarnings("unchecked") @Test public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException { Writer writer = @@ -366,7 +365,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { reader.close(); } - @SuppressWarnings("unchecked") @Test public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException { Writer writer = @@ -434,7 +432,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { reader.close(); } - @SuppressWarnings("unchecked") @Test public void testBasicAppendAndScanMultipleFiles() throws IOException, URISyntaxException, InterruptedException { Writer writer = @@ -911,11 +908,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header); writer = writer.appendBlock(dataBlock); - List<String> originalKeys = - copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) - .collect(Collectors.toList()); - - // Delete 50 keys // Delete 50 keys List<HoodieKey> deletedKeys = copyOfRecords1.stream() .map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), @@ -1127,8 +1119,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { * duplicate data. * */ - private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2) - throws IOException, URISyntaxException, InterruptedException { + private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2) { try { // Write one Data block with same InstantTime (written in same batch) Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1178,8 +1169,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @Test - public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt() - throws IOException, URISyntaxException, InterruptedException { + public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt() { /* * FIRST_ATTEMPT_FAILED: * Original task from the stage attempt failed, but subsequent stage retry succeeded. @@ -1188,8 +1178,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @Test - public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt() - throws IOException, URISyntaxException, InterruptedException { + public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt() { /* * SECOND_ATTEMPT_FAILED: * Original task from stage attempt succeeded, but subsequent retry attempt failed. @@ -1198,8 +1187,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @Test - public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts() - throws IOException, URISyntaxException, InterruptedException { + public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts() { /* * BOTH_ATTEMPTS_SUCCEEDED: * Original task from the stage attempt and duplicate task from the stage retry succeeded. 
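Several TestHoodieLogFormat methods above also shed throws clauses for checked exceptions their bodies can no longer raise; narrowing the clause keeps callers from handling phantom exceptions. A sketch of the idea:

    public class ThrowsNarrowingSketch {
      // Before: static void merge(int a, int b) throws IOException, URISyntaxException
      // After the body stopped doing I/O, nothing checked can escape, so the clause
      // is dropped and callers no longer need try/catch or their own rethrows.
      static void merge(int a, int b) {
        System.out.println("merging " + (a + b) + " records");
      }
    }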
@@ -1207,7 +1195,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { testAvroLogRecordReaderMergingMultipleLogFiles(100, 100); } - @SuppressWarnings("unchecked") @Test public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException { Writer writer = @@ -1335,7 +1322,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { reader.close(); } - @SuppressWarnings("unchecked") @Test public void testBasicAppendAndTraverseInReverse() throws IOException, URISyntaxException, InterruptedException { Writer writer = @@ -1392,7 +1378,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @Test - public void testV0Format() throws IOException, InterruptedException, URISyntaxException { + public void testV0Format() throws IOException, URISyntaxException { // HoodieLogFormatVersion.DEFAULT_VERSION has been deprecated so we cannot // create a writer for it. So these tests are only for the HoodieAvroDataBlock // of older version. diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 2b8f04f..0a910e9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -221,7 +221,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { */ public Stream<FileSlice> getLatestRawFileSlices(String partitionPath) { return fsView.getAllFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlicesIncludingInflight) - .filter(fileSliceOpt -> fileSliceOpt.isPresent()).map(Option::get); + .filter(Option::isPresent).map(Option::get); } /** @@ -322,7 +322,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { assertEquals("Expect only valid data-file", dataFileName, dataFiles.get(0).getFileName()); } - /** Merge API Tests **/ + // Merge API Tests List<FileSlice> fileSliceList = rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList()); assertEquals("Expect file-slice to be merged", 1, fileSliceList.size()); @@ -355,7 +355,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName()); assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); - /** Data Files API tests */ + // Data Files API tests dataFiles = roView.getLatestBaseFiles().collect(Collectors.toList()); if (skipCreatingDataFile) { assertEquals("Expect no data file to be returned", 0, dataFiles.size()); @@ -385,7 +385,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { dataFiles.forEach(df -> assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1)); } - /** Inflight/Orphan File-groups needs to be in the view **/ + // Inflight/Orphan File-groups needs to be in the view // There is a data-file with this inflight file-id final String inflightFileId1 = UUID.randomUUID().toString(); @@ -507,7 +507,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName()); assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); - /** Data Files API tests */ + // Data Files API tests 
dataFiles = roView.getLatestBaseFiles().collect(Collectors.toList()); assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); dataFiles.forEach(df -> assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java index 76d7e06..2cc726e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java @@ -167,7 +167,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness { schema = SchemaTestUtil.getSimpleSchema(); List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1); hoodieRecords = - indexedRecords.stream().map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"), + indexedRecords.stream().map(r -> new HoodieRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"), new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList()); payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), new HoodieRecordSizeEstimator(schema)); assertTrue(payloadSize > 0); @@ -176,7 +176,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness { final Schema simpleSchemaWithMetadata = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1); hoodieRecords = indexedRecords.stream() - .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"), + .map(r -> new HoodieRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"), new AvroBinaryTestPayload( Option.of(HoodieAvroUtils.rewriteRecord((GenericRecord) r, simpleSchemaWithMetadata))))) .collect(Collectors.toList()); @@ -193,7 +193,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness { // Test sizeEstimatorPerformance with simpleSchema Schema schema = SchemaTestUtil.getSimpleSchema(); List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema); - HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator(schema); + HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator<>(schema); HoodieRecord record = hoodieRecords.remove(0); long startTime = System.currentTimeMillis(); SpillableMapUtils.computePayloadSize(record, sizeEstimator); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index 506b6cf..0c3f141 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -333,8 +333,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend if (o instanceof CombinePathInputFormat) { CombinePathInputFormat mObj = (CombinePathInputFormat) o; return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName)) - && (deserializerClassName == null ? 
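The TestDiskBasedMap edits above add the diamond operator so generic arguments are inferred from the declaration instead of falling back to raw types (which trigger unchecked warnings). Sketch:

    import java.util.HashMap;
    import java.util.Map;

    public class DiamondSketch {
      public static void main(String[] args) {
        // Pre-Java 7: new HashMap<String, Long>(); raw: new HashMap() with warnings.
        Map<String, Long> counts = new HashMap<>(); // diamond: inferred from the left side
        counts.put("records", 1L);
        System.out.println(counts);
      }
    }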
(mObj.deserializerClassName == null) - : deserializerClassName.equals(mObj.deserializerClassName)); + && (Objects.equals(deserializerClassName, mObj.deserializerClassName)); } return false; } @@ -353,16 +352,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend init(job); Map<Path, ArrayList<String>> pathToAliases = mrwork.getPathToAliases(); Map<String, Operator<? extends OperatorDesc>> aliasToWork = mrwork.getAliasToWork(); - /** MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/ + /* MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/ HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim combine = - new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim(); + new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim<>(); InputSplit[] splits; if (combine.getInputPathsShim(job).length == 0) { throw new IOException("No input paths specified in job"); } - ArrayList<InputSplit> result = new ArrayList<>(); + List<InputSplit> result = new ArrayList<>(); // combine splits only from same tables and same partitions. Do not combine splits from multiple // tables or multiple partitions. diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 89b7168..0586bc4 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -210,7 +210,7 @@ public class TestHoodieRealtimeRecordReader { action.equals(HoodieTimeline.ROLLBACK_ACTION) ? 
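The equals() rewrite above collapses the hand-rolled null-safe comparison into Objects.equals, which covers the both-null and one-null cases with the same truth table. Sketch:

    import java.util.Objects;

    public class NullSafeEqualsSketch {
      public static void main(String[] args) {
        String a = null;
        String b = "deser";
        // Before: (a == null ? b == null : a.equals(b))
        System.out.println(Objects.equals(a, b));       // false
        System.out.println(Objects.equals(null, null)); // true
      }
    }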
String.valueOf(baseInstantTs + logVersion - 2) : instantTime; - HoodieLogFormat.Writer writer = null; + HoodieLogFormat.Writer writer; if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) { writer = writeRollback(partitionDir, schema, "fileid0", baseInstant, instantTime, String.valueOf(baseInstantTs + logVersion - 1), logVersion); @@ -317,7 +317,7 @@ public class TestHoodieRealtimeRecordReader { numRecordsAtCommit2++; Assert.assertTrue(gotKey > firstBatchLastRecordKey); Assert.assertTrue(gotKey <= secondBatchLastRecordKey); - assertEquals((int) gotKey, lastSeenKeyFromLog + 1); + assertEquals(gotKey, lastSeenKeyFromLog + 1); lastSeenKeyFromLog++; } else { numRecordsAtCommit1++; @@ -491,7 +491,6 @@ public class TestHoodieRealtimeRecordReader { writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, "101", 1); logFilePaths.add(writer.getLogFile().getPath().toString()); writer.close(); - assertTrue("block - size should be > 0", size > 0); InputFormatTestUtil.deltaCommit(basePath, newCommitTime); // create a split with baseFile (parquet file written earlier) and new log file(s) diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java index d85778a..6ca9957 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java @@ -18,6 +18,10 @@ package org.apache.hudi.hive.util; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; @@ -26,11 +30,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.SchemaDifference; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -46,7 +45,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; /** * Schema Utilities. 
@@ -367,10 +365,9 @@ public class SchemaUtil { return true; } else if (prevType.equalsIgnoreCase("float") && newType.equalsIgnoreCase("double")) { return true; - } else if (prevType.contains("struct") && newType.toLowerCase().contains("struct")) { - return true; + } else { + return prevType.contains("struct") && newType.toLowerCase().contains("struct"); } - return false; } public static String generateSchemaString(MessageType storageSchema) throws IOException { @@ -403,18 +400,17 @@ public class SchemaUtil { .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString()); } - String partitionsStr = partitionFields.stream().collect(Collectors.joining(",")); + String partitionsStr = String.join(",", partitionFields); StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS "); - sb = sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER) + sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER) .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER); - sb = sb.append("( ").append(columns).append(")"); + sb.append("( ").append(columns).append(")"); if (!config.partitionFields.isEmpty()) { - sb = sb.append(" PARTITIONED BY (").append(partitionsStr).append(")"); + sb.append(" PARTITIONED BY (").append(partitionsStr).append(")"); } - sb = sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'"); - sb = sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'"); - sb = sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath) - .append("'"); + sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'"); + sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'"); + sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'"); return sb.toString(); } @@ -433,7 +429,6 @@ public class SchemaUtil { * * @return */ - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); HoodieAvroDataBlock lastBlock = null; diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 50bb0e5..49692f5 100644 --- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -38,7 +38,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; -import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -47,7 +46,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@SuppressWarnings("ConstantConditions") @RunWith(Parameterized.class) public class TestHiveSyncTool { @@ -64,7 +62,7 @@ public class TestHiveSyncTool { } @Before - public void setUp() throws IOException, InterruptedException, URISyntaxException { + public void setUp() throws IOException, InterruptedException { TestUtil.setUp(); } diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java index d82c33b..fc7675f 100644 --- 
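Two SchemaUtil simplifications above: stream().collect(Collectors.joining(",")) becomes String.join(",", list), and the sb = sb.append(...) reassignments lose the redundant sb =, since StringBuilder.append already returns the same builder for chaining. Sketch:

    import java.util.Arrays;
    import java.util.List;

    public class JoinAndBuilderSketch {
      public static void main(String[] args) {
        List<String> fields = Arrays.asList("dt", "hh");
        String joined = String.join(",", fields); // same output as Collectors.joining(",")

        StringBuilder sb = new StringBuilder("PARTITIONED BY (");
        sb.append(joined).append(")"); // append mutates and returns sb; no reassignment
        System.out.println(sb); // PARTITIONED BY (dt,hh)
      }
    }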
a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java +++ b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java @@ -265,11 +265,11 @@ public class HiveTestService { ? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory()) : new TUGIContainingTransport.Factory(); - processor = new TUGIBasedProcessor<IHMSHandler>(handler); + processor = new TUGIBasedProcessor<>(handler); LOG.info("Starting DB backed MetaStore Server with SetUGI enabled"); } else { transFactory = useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory(); - processor = new TSetIpAddressProcessor<IHMSHandler>(handler); + processor = new TSetIpAddressProcessor<>(handler); LOG.info("Starting DB backed MetaStore Server"); } @@ -278,12 +278,7 @@ public class HiveTestService { .minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads); final TServer tServer = new TThreadPoolServer(args); - executorService.submit(new Runnable() { - @Override - public void run() { - tServer.serve(); - } - }); + executorService.submit(tServer::serve); return tServer; } catch (Throwable x) { throw new IOException(x); diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index 7d11414..f61028e 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -33,35 +33,35 @@ import java.util.List; */ public class ITTestHoodieDemo extends ITTestBase { - private static String HDFS_DATA_DIR = "/usr/hive/data/input"; - private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json"; - private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json"; - private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands"; - private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands"; - private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands"; - - private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json"; - private static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands"; - private static String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands"; - private static String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json"; - private static String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands"; - - private static String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow"; - private static String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor"; - private static String COW_TABLE_NAME = "stock_ticks_cow"; - private static String MOR_TABLE_NAME = "stock_ticks_mor"; - - private static String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh"; - private static String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh"; - private static String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh"; - private static String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands"; - private static String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands"; - private static String SPARKSQL_BATCH2_COMMANDS = HOODIE_WS_ROOT + 
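The HiveTestService change above replaces an anonymous Runnable with the method reference tServer::serve; any no-arg void method fits Runnable's single abstract method. Sketch with a hypothetical server type:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class MethodRefSketch {
      static class Server {
        void serve() {
          System.out.println("serving");
        }
      }

      public static void main(String[] args) {
        Server server = new Server();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(server::serve); // same as new Runnable() { public void run() { server.serve(); } }
        pool.shutdown();
      }
    }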
"/docker/demo/sparksql-batch2.commands"; - private static String SPARKSQL_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-incremental.commands"; - private static String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands"; - private static String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands"; - private static String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands"; - private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands"; + private static final String HDFS_DATA_DIR = "/usr/hive/data/input"; + private static final String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json"; + private static final String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json"; + private static final String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands"; + private static final String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands"; + private static final String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands"; + + private static final String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json"; + private static final String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands"; + private static final String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands"; + private static final String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json"; + private static final String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands"; + + private static final String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow"; + private static final String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor"; + private static final String COW_TABLE_NAME = "stock_ticks_cow"; + private static final String MOR_TABLE_NAME = "stock_ticks_mor"; + + private static final String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh"; + private static final String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh"; + private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh"; + private static final String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands"; + private static final String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands"; + private static final String SPARKSQL_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch2.commands"; + private static final String SPARKSQL_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-incremental.commands"; + private static final String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands"; + private static final String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands"; + private static final String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands"; + private static final String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands"; private static String HIVE_SYNC_CMD_FMT = " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 " diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index 2cfc914..aaddee7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -260,14 +260,13 @@ public class HDFSParquetImporter implements Serializable { public int parallelism = 1; @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true) public String schemaFile = null; - @Parameter(names = {"--format", "-f"}, description = "Format for the input data.", required = false, - validateValueWith = FormatValidator.class) + @Parameter(names = {"--format", "-f"}, description = "Format for the input data.", validateValueWith = FormatValidator.class) public String format = null; - @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false) + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master") public String sparkMaster = null; @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true) public String sparkMemory = null; - @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false) + @Parameter(names = {"--retry", "-rt"}, description = "number of retries") public int retry = 0; @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + "hoodie client for importing") diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java index 7fa0da5..e9a5f80 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java @@ -114,7 +114,7 @@ public class TimelineServerPerf implements Serializable { d2.close(); System.out.println("\n\n\nDumping all File Slices"); - selected.stream().forEach(p -> fsView.getAllFileSlices(p).forEach(s -> System.out.println("\tMyFileSlice=" + s))); + selected.forEach(p -> fsView.getAllFileSlices(p).forEach(s -> System.out.println("\tMyFileSlice=" + s))); // Waiting for curl queries if (!useExternalTimelineServer && cfg.waitForManualQueries) { @@ -131,17 +131,16 @@ public class TimelineServerPerf implements Serializable { public List<PerfStats> runLookups(JavaSparkContext jsc, List<String> partitionPaths, SyncableFileSystemView fsView, int numIterations, int concurrency) { - List<PerfStats> perfStats = jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> { + return jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(100); final List<PerfStats> result = new ArrayList<>(); final List<ScheduledFuture<PerfStats>> futures = new ArrayList<>(); List<FileSlice> slices = fsView.getLatestFileSlices(p).collect(Collectors.toList()); String fileId = slices.isEmpty() ? 
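The HDFSParquetImporter edits above drop required = false from several @Parameter annotations; to my understanding, required defaults to false in JCommander, so only required = true carries information. Sketch of the trimmed form:

    import com.beust.jcommander.Parameter;

    public class ParamsSketch {
      // "required = false" removed: it restates JCommander's default.
      @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
      public int retry = 0;
    }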
"dummyId" : slices.get(new Random(Double.doubleToLongBits(Math.random())).nextInt(slices.size())).getFileId(); - IntStream.range(0, concurrency).forEach(i -> { - futures.add(executor.schedule(() -> runOneRound(fsView, p, fileId, i, numIterations), 0, TimeUnit.NANOSECONDS)); - }); - futures.stream().forEach(x -> { + IntStream.range(0, concurrency).forEach(i -> futures.add(executor.schedule(() -> runOneRound(fsView, p, fileId, + i, numIterations), 0, TimeUnit.NANOSECONDS))); + futures.forEach(x -> { try { result.add(x.get()); } catch (InterruptedException | ExecutionException e) { @@ -149,12 +148,9 @@ public class TimelineServerPerf implements Serializable { } }); System.out.println("SLICES are="); - slices.stream().forEach(s -> { - System.out.println("\t\tFileSlice=" + s); - }); + slices.forEach(s -> System.out.println("\t\tFileSlice=" + s)); return result.iterator(); }).collect(); - return perfStats; } private static PerfStats runOneRound(SyncableFileSystemView fsView, String partition, String fileId, int id, @@ -194,7 +190,7 @@ public class TimelineServerPerf implements Serializable { } public void dump(List<PerfStats> stats) { - stats.stream().forEach(x -> { + stats.forEach(x -> { String row = String.format("%s,%d,%d,%d,%f,%f,%f,%f\n", x.partition, x.id, x.minTime, x.maxTime, x.meanTime, x.medianTime, x.p75, x.p95); System.out.println(row); @@ -260,7 +256,7 @@ public class TimelineServerPerf implements Serializable { @Parameter(names = {"--num-iterations", "-i"}, description = "Number of iterations for each partitions") public Integer numIterations = 10; - @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false) + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master") public String sparkMaster = "local[2]"; @Parameter(names = {"--server-port", "-p"}, description = " Server Port") diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index 54ea0f3..9787bab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -38,7 +38,7 @@ public class IncrSourceHelper { private static String getStrictlyLowerTimestamp(String timestamp) { long ts = Long.parseLong(timestamp); Preconditions.checkArgument(ts > 0, "Timestamp must be positive"); - Long lower = ts - 1; + long lower = ts - 1; return "" + lower; } @@ -73,7 +73,7 @@ public class IncrSourceHelper { Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y)); - return Pair.of(beginInstantTime, nthInstant.map(instant -> instant.getTimestamp()).orElse(beginInstantTime)); + return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)); } /** diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index a92a441..4ad8855 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -94,8 +94,7 @@ public class KafkaOffsetGen { // Create initial offset ranges for each 'to' 
partition, with from = to offsets. OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()]; - toOffsetMap.entrySet().stream().map(e -> { - TopicPartition tp = e.getKey(); + toOffsetMap.keySet().stream().map(tp -> { long fromOffset = fromOffsetMap.getOrDefault(tp, 0L); return OffsetRange.create(tp, fromOffset, fromOffset); }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges); @@ -208,9 +207,7 @@ public class KafkaOffsetGen { maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE) ? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka; long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit; - OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); - - return offsetRanges; + return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); } // check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
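The KafkaOffsetGen change above streams keySet() instead of entrySet() because the map's values were never read; only the partition keys feed the offset ranges. Sketch with types simplified to String/Long:

    import java.util.HashMap;
    import java.util.Map;

    public class KeySetSketch {
      public static void main(String[] args) {
        Map<String, Long> toOffsets = new HashMap<>();
        toOffsets.put("topic-0", 100L);
        toOffsets.put("topic-1", 200L);
        Map<String, Long> fromOffsets = new HashMap<>();
        fromOffsets.put("topic-0", 50L);

        // toOffsets values are unused, so iterate the keys directly.
        toOffsets.keySet().stream()
            .map(tp -> tp + "@" + fromOffsets.getOrDefault(tp, 0L))
            .sorted()
            .forEach(System.out::println); // topic-0@50, topic-1@0
      }
    }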