This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

The following commit(s) were added to refs/heads/master by this push: new 49ad9db27e PHOENIX-7223 Make Sure Tools Always Close HBase Connections on Exit 49ad9db27e is described below commit 49ad9db27e6a196edfa469274a949954e3ea23a5 Author: Istvan Toth <st...@apache.org> AuthorDate: Tue Feb 20 13:01:19 2024 +0100 PHOENIX-7223 Make Sure Tools Always Close HBase Connections on Exit * don't throw exceptions from Tools, log the error and return non-zero exit code * Close all Phoenix Connections in Tools * Close cached CQSI objects on PhoenixDriver.close() --- .../jdbc/ClusterRoleRecordGeneratorTool.java | 19 ++- .../org/apache/phoenix/jdbc/PhoenixDriver.java | 13 +- .../apache/phoenix/jdbc/PhoenixHAAdminTool.java | 63 ++++---- .../org/apache/phoenix/schema/tool/SchemaTool.java | 29 ++-- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 159 ++++++++++++--------- .../apache/phoenix/mapreduce/OrphanViewTool.java | 2 + .../phoenix/mapreduce/index/IndexUpgradeTool.java | 11 +- .../phoenix/schema/stats/UpdateStatisticsTool.java | 19 ++- .../util/MergeViewIndexIdSequencesTool.java | 17 +-- .../apache/phoenix/end2end/CsvBulkLoadToolIT.java | 41 +++--- .../phoenix/end2end/RegexBulkLoadToolIT.java | 20 +-- 11 files changed, 221 insertions(+), 172 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java index 49ec3db61a..93899f87a2 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java @@ -66,13 +66,18 @@ public class ClusterRoleRecordGeneratorTool extends Configured implements Tool { @Override public int run(String[] args) throws Exception { - String fileName = getConf().get(PHOENIX_HA_GENERATOR_FILE_ATTR); - File file = StringUtils.isEmpty(fileName) - ? 
File.createTempFile("phoenix.ha.cluster.role.records", ".json") - : new File(fileName); - JacksonUtil.getObjectWriterPretty().writeValue(file, listAllRecordsByZk()); - System.out.println("Created JSON file '" + file + "'"); - return 0; + try { + String fileName = getConf().get(PHOENIX_HA_GENERATOR_FILE_ATTR); + File file = StringUtils.isEmpty(fileName) + ? File.createTempFile("phoenix.ha.cluster.role.records", ".json") + : new File(fileName); + JacksonUtil.getObjectWriterPretty().writeValue(file, listAllRecordsByZk()); + System.out.println("Created JSON file '" + file + "'"); + return 0; + } catch (Exception e) { + e.printStackTrace(); + return -1; + } } List<ClusterRoleRecord> listAllRecordsByZk() throws IOException { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java index ca412d5238..8bdc6ea182 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java @@ -144,6 +144,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { } // One entry per cluster here + // TODO that's not true, we can have multiple connections with different configs / principals private final Cache<ConnectionInfo, ConnectionQueryServices> connectionQueryServicesCache = initializeConnectionCache(); @@ -341,8 +342,18 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { services = null; } } + + if (connectionQueryServicesCache != null) { + try { + for (ConnectionQueryServices cqsi : connectionQueryServicesCache.asMap().values()) { + cqsi.close(); + } + } catch (Exception e) { + LOGGER.warn("Failed to close ConnectionQueryServices instance", e); + } + } } - + private enum LockMode { READ, WRITE }; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java index c6bdadc335..e7a9cd7a22 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java @@ -105,38 +105,43 @@ public class PhoenixHAAdminTool extends Configured implements Tool { return RET_ARGUMENT_ERROR; } - if (commandLine.hasOption(HELP_OPT.getOpt())) { - printUsageMessage(); - return RET_SUCCESS; - } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list - String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster - try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { - List<ClusterRoleRecord> records = admin.listAllClusterRoleRecordsOnZookeeper(); - JacksonUtil.getObjectWriterPretty().writeValue(System.out, records); - } - } else if (commandLine.hasOption(MANIFEST_OPT.getOpt())) { // create or update - String fileName = commandLine.getOptionValue(MANIFEST_OPT.getOpt()); - List<ClusterRoleRecord> records = readRecordsFromFile(fileName); - boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt()); - Map<String, List<String>> failedHaGroups = syncClusterRoleRecords(records, forceful); - if (!failedHaGroups.isEmpty()) { - System.out.println("Found following HA groups are failing to write the clusters:"); - failedHaGroups.forEach((k, v) -> - System.out.printf("%s -> [%s]\n", k, String.join(",", v))); - return RET_SYNC_ERROR; - } - } else if (commandLine.hasOption(REPAIR_OPT.getOpt())) { // verify and repair - String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster - try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { - List<String> inconsistentRecord = admin.verifyAndRepairWithRemoteZnode(); - if (!inconsistentRecord.isEmpty()) { - 
System.out.println("Found following inconsistent cluster role records: "); - System.out.print(String.join(",", inconsistentRecord)); - return RET_REPAIR_FOUND_INCONSISTENCIES; + try { + if (commandLine.hasOption(HELP_OPT.getOpt())) { + printUsageMessage(); + return RET_SUCCESS; + } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list + String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster + try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + List<ClusterRoleRecord> records = admin.listAllClusterRoleRecordsOnZookeeper(); + JacksonUtil.getObjectWriterPretty().writeValue(System.out, records); + } + } else if (commandLine.hasOption(MANIFEST_OPT.getOpt())) { // create or update + String fileName = commandLine.getOptionValue(MANIFEST_OPT.getOpt()); + List<ClusterRoleRecord> records = readRecordsFromFile(fileName); + boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt()); + Map<String, List<String>> failedHaGroups = syncClusterRoleRecords(records, forceful); + if (!failedHaGroups.isEmpty()) { + System.out.println("Found following HA groups are failing to write the clusters:"); + failedHaGroups.forEach((k, v) -> + System.out.printf("%s -> [%s]\n", k, String.join(",", v))); + return RET_SYNC_ERROR; + } + } else if (commandLine.hasOption(REPAIR_OPT.getOpt())) { // verify and repair + String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster + try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + List<String> inconsistentRecord = admin.verifyAndRepairWithRemoteZnode(); + if (!inconsistentRecord.isEmpty()) { + System.out.println("Found following inconsistent cluster role records: "); + System.out.print(String.join(",", inconsistentRecord)); + return RET_REPAIR_FOUND_INCONSISTENCIES; + } } } + return RET_SUCCESS; + } catch(Exception e ) { + e.printStackTrace(); + 
return -1; } - return RET_SUCCESS; } /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java index 6bc4922f90..f000b98064 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java @@ -66,19 +66,24 @@ public class SchemaTool extends Configured implements Tool { @Override public int run(String[] args) throws Exception { - populateToolAttributes(args); - SchemaProcessor processor=null; - if(Mode.SYNTH.equals(mode)) { - processor = new SchemaSynthesisProcessor(ddlFile); - } else if(Mode.EXTRACT.equals(mode)) { - conf = HBaseConfiguration.addHbaseResources(getConf()); - processor = new SchemaExtractionProcessor(tenantId, conf, pSchemaName, pTableName); - } else { - throw new Exception(mode+" is not accepted, provide [synth or extract]"); + try { + populateToolAttributes(args); + SchemaProcessor processor=null; + if(Mode.SYNTH.equals(mode)) { + processor = new SchemaSynthesisProcessor(ddlFile); + } else if(Mode.EXTRACT.equals(mode)) { + conf = HBaseConfiguration.addHbaseResources(getConf()); + processor = new SchemaExtractionProcessor(tenantId, conf, pSchemaName, pTableName); + } else { + throw new Exception(mode+" is not accepted, provide [synth or extract]"); + } + output = processor.process(); + LOGGER.info("Effective DDL with " + mode.toString() +": " + output); + return 0; + } catch (Exception e) { + e.printStackTrace(); + return -1; } - output = processor.process(); - LOGGER.info("Effective DDL with " + mode.toString() +": " + output); - return 0; } public String getOutput() { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index 70039aa808..142aea63b7 100644 --- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -182,7 +182,12 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { } catch (IllegalStateException e) { printHelpAndExit(e.getMessage(), getOptions()); } - return loadData(conf, cmdLine); + try { + return loadData(conf, cmdLine); + } catch (Exception e) { + e.printStackTrace(); + return -1; + } } @@ -215,85 +220,99 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { PhoenixTextInputFormat.setSkipHeader(conf); } - final Connection conn = QueryUtil.getConnection(conf); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(), - qualifiedTableName); - } - List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName); - Preconditions.checkNotNull(importColumns); - Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty"); - FormatToBytesWritableMapper.configureColumnInfoList(conf, importColumns); - boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt()); - conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows); - conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY, - SchemaUtil.getEscapedFullTableName(qualifiedTableName)); - // give subclasses their hook - configureOptions(cmdLine, importColumns, conf); - String sName = SchemaUtil.normalizeIdentifier(schemaName); - String tName = SchemaUtil.normalizeIdentifier(tableName); - - String tn = SchemaUtil.getEscapedTableName(sName, tName); - ResultSet rsempty = conn.createStatement().executeQuery("SELECT * FROM " + tn + " LIMIT 1"); - boolean tableNotEmpty = rsempty.next(); - rsempty.close(); - - try { - validateTable(conn, sName, tName); - } finally { - conn.close(); - } - final String inputPaths = 
cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()); final Path outputPath; - if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) { - outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt())); - } else { - outputPath = new Path("/tmp/" + UUID.randomUUID()); - } - List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>(); - PTable table = conn.unwrap(PhoenixConnection.class).getTable(qualifiedTableName); - tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString())); boolean hasLocalIndexes = false; - boolean hasGlobalIndexes = false; - for(PTable index: table.getIndexes()) { - if (index.getIndexType() == IndexType.LOCAL) { - hasLocalIndexes = - qualifiedIndexTableName == null ? true : index.getTableName().getString() - .equals(qualifiedIndexTableName); - if (hasLocalIndexes && hasGlobalIndexes) break; + + try (Connection conn = QueryUtil.getConnection(conf)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(), + qualifiedTableName); + } + List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName); + Preconditions.checkNotNull(importColumns); + Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty"); + FormatToBytesWritableMapper.configureColumnInfoList(conf, importColumns); + boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt()); + conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY, + ignoreInvalidRows); + conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY, + SchemaUtil.getEscapedFullTableName(qualifiedTableName)); + // give subclasses their hook + configureOptions(cmdLine, importColumns, conf); + String sName = SchemaUtil.normalizeIdentifier(schemaName); + String tName = SchemaUtil.normalizeIdentifier(tableName); + + String tn = SchemaUtil.getEscapedTableName(sName, tName); + ResultSet rsempty = + conn.createStatement().executeQuery("SELECT * 
FROM " + tn + " LIMIT 1"); + boolean tableNotEmpty = rsempty.next(); + rsempty.close(); + + try { + validateTable(conn, sName, tName); + } finally { + conn.close(); } - if (IndexUtil.isGlobalIndex(index)) { - hasGlobalIndexes = true; - if (hasLocalIndexes && hasGlobalIndexes) break; + + if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) { + outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt())); + } else { + outputPath = new Path("/tmp/" + UUID.randomUUID()); } - } - if(hasGlobalIndexes && tableNotEmpty && !cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())){ - throw new IllegalStateException("Bulk Loading error: Bulk loading is disabled for non" + - " empty tables with global indexes, because it will corrupt the global index table in most cases.\n" + - "Use the --corruptindexes option to override this check."); - } + PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName); + tablesToBeLoaded.add( + new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString())); + boolean hasGlobalIndexes = false; + for (PTable index : table.getIndexes()) { + if (index.getIndexType() == IndexType.LOCAL) { + hasLocalIndexes = + qualifiedIndexTableName == null ? true + : index.getTableName().getString() + .equals(qualifiedIndexTableName); + if (hasLocalIndexes && hasGlobalIndexes) { + break; + } + } + if (IndexUtil.isGlobalIndex(index)) { + hasGlobalIndexes = true; + if (hasLocalIndexes && hasGlobalIndexes) { + break; + } + } + } - // using conn after it's been closed... 
o.O - tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName)); + if (hasGlobalIndexes && tableNotEmpty + && !cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())) { + throw new IllegalStateException( + "Bulk Loading error: Bulk loading is disabled for non" + + " empty tables with global indexes, because it will corrupt" + + " the global index table in most cases.\n" + + "Use the --corruptindexes option to override this check."); + } - // When loading a single index table, check index table name is correct - if (qualifiedIndexTableName != null){ - TargetTableRef targetIndexRef = null; - for (TargetTableRef tmpTable : tablesToBeLoaded){ - if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) { - targetIndexRef = tmpTable; - break; + // using conn after it's been closed... o.O + tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName)); + + // When loading a single index table, check index table name is correct + if (qualifiedIndexTableName != null) { + TargetTableRef targetIndexRef = null; + for (TargetTableRef tmpTable : tablesToBeLoaded) { + if (tmpTable.getLogicalName() + .compareToIgnoreCase(qualifiedIndexTableName) == 0) { + targetIndexRef = tmpTable; + break; + } } + if (targetIndexRef == null) { + throw new IllegalStateException("Bulk Loader error: index table " + + qualifiedIndexTableName + " doesn't exist"); + } + tablesToBeLoaded.clear(); + tablesToBeLoaded.add(targetIndexRef); } - if (targetIndexRef == null){ - throw new IllegalStateException("Bulk Loader error: index table " + - qualifiedIndexTableName + " doesn't exist"); - } - tablesToBeLoaded.clear(); - tablesToBeLoaded.add(targetIndexRef); } return submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded, hasLocalIndexes); @@ -449,7 +468,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { */ private List<TargetTableRef> getIndexTables(Connection conn, String qualifiedTableName) throws SQLException { - PTable table = 
conn.unwrap(PhoenixConnection.class).getTable(qualifiedTableName); + PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName); List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>(); for(PTable indexTable : table.getIndexes()){ indexTables.add(new TargetTableRef(indexTable.getName().getString(), indexTable diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java index 2d91bce07e..736780ac6e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java @@ -432,6 +432,7 @@ public class OrphanViewTool extends Configured implements Tool { } } finally { if (newConn) { + // TODO can this be rewritten with try-with-resources ? tryClosingConnection(tenantConnection); } } @@ -949,6 +950,7 @@ public class OrphanViewTool extends Configured implements Tool { ExceptionUtils.getStackTrace(ex)); return -1; } finally { + // TODO use try-with-resources at least for the Connection ? 
closeConnectionAndFiles(connection); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java index 96e322154f..ed72a1e783 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java @@ -206,9 +206,14 @@ public class IndexUpgradeTool extends Configured implements Tool { } catch (IllegalStateException e) { printHelpAndExit(e.getMessage(), getOptions()); } - initializeTool(cmdLine); - prepareToolSetup(); - executeTool(); + try { + initializeTool(cmdLine); + prepareToolSetup(); + executeTool(); + } catch (Exception e) { + e.printStackTrace(); + hasFailure = true; + } if (hasFailure) { return -1; } else { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java index ffe7ed5f5d..3d9837215b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java @@ -96,13 +96,18 @@ public class UpdateStatisticsTool extends Configured implements Tool { @Override public int run(String[] args) throws Exception { - parseArgs(args); - preJobTask(); - configureJob(); - TableMapReduceUtil.initCredentials(job); - int ret = runJob(); - postJobTask(); - return ret; + try { + parseArgs(args); + preJobTask(); + configureJob(); + TableMapReduceUtil.initCredentials(job); + int ret = runJob(); + postJobTask(); + return ret; + } catch (Exception e) { + e.printStackTrace(); + return -1; + } } /** diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java 
b/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java index 0bb0fb8549..3eb62ea086 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java @@ -91,25 +91,16 @@ public class MergeViewIndexIdSequencesTool extends Configured implements Tool { @Override public int run(String[] args) throws Exception { int status = 0; - PhoenixConnection conn = null; - try { - parseOptions(args); - - final Configuration config = HBaseConfiguration.addHbaseResources(getConf()); - - conn = ConnectionUtil.getInputConnection(config). - unwrap(PhoenixConnection.class); + parseOptions(args); + final Configuration config = HBaseConfiguration.addHbaseResources(getConf()); + try (PhoenixConnection conn = ConnectionUtil.getInputConnection(config). + unwrap(PhoenixConnection.class)) { UpgradeUtil.mergeViewIndexIdSequences(conn); - } catch (Exception e) { LOGGER.error("Get an error while running MergeViewIndexIdSequencesTool, " + e.getMessage()); status = 1; - } finally { - if (conn != null) { - conn.close(); - } } return status; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java index 4d45568b9f..86acbe1df3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java @@ -37,8 +37,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.phoenix.end2end.index.IndexTestUtil; import org.apache.phoenix.jdbc.PhoenixConnection; import 
org.apache.phoenix.mapreduce.CsvBulkLoadTool; @@ -151,12 +149,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT { "--table", "table1", "--schema", "s", "--zookeeper", zkQuorum}); - fail("Bulk loading error should have happened earlier"); - } catch (Exception e){ - assertTrue(e.getMessage().contains("Bulk Loading error: Bulk loading is disabled for " + - "non empty tables with global indexes, because it will corrupt " + - "the global index table in most cases.\n" + - "Use the --corruptindexes option to override this check.")); + assertTrue("Bulk loading error should have happened earlier", exitCode != 0); + } catch (Exception e) { + fail("Tools should return non-zero exit codes on failure" + + " instead of throwing an exception"); } ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id"); @@ -398,7 +394,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT { + " (FIRST_NAME ASC)" + " INCLUDE (LAST_NAME)"; stmt.execute(ddl); - + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); PrintWriter printWriter = new PrintWriter(outputStream); @@ -598,17 +594,17 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT { CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); csvBulkLoadTool.setConf(getUtility().getConfiguration()); try { - csvBulkLoadTool.run(new String[] { + int exitCode = csvBulkLoadTool.run(new String[] { "--input", "/tmp/input4.csv", "--table", tableName, "--zookeeper", zkQuorum }); - fail(String.format("Table %s not created, hence should fail",tableName)); + assertTrue(String.format("Table %s not created, hence should fail", tableName), + exitCode != 0); } catch (Exception ex) { - assertTrue(ex instanceof IllegalArgumentException); - assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName))); - } + fail("Tools should return non-zero exit codes on failure" + + " instead of throwing an exception"); 
} } - + @Test public void testAlreadyExistsOutputPath() { String tableName = "TABLE9"; @@ -617,7 +613,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT { Statement stmt = conn.createStatement(); stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, " + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); - + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); fs.create(new Path(outputPath)); FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv")); @@ -625,18 +621,21 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT { printWriter.println("1,FirstName 1,LastName 1"); printWriter.println("2,FirstName 2,LastName 2"); printWriter.close(); - + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); csvBulkLoadTool.setConf(getUtility().getConfiguration()); - csvBulkLoadTool.run(new String[] { + int exitCode = csvBulkLoadTool.run(new String[] { "--input", "/tmp/input9.csv", "--output", outputPath, "--table", tableName, "--zookeeper", zkQuorum }); - - fail(String.format("Output path %s already exists. hence, should fail",outputPath)); + + assertTrue( + String.format("Output path %s already exists. 
hence, should fail", outputPath), + exitCode != 0); } catch (Exception ex) { - assertTrue(ex instanceof FileAlreadyExistsException); + fail("Tools should return non-zero exit codes when fail," + + " instead of throwing an exception"); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java index 5c66e125f7..e96f97cd32 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java @@ -33,7 +33,6 @@ import java.sql.Statement; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.phoenix.mapreduce.RegexBulkLoadTool; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -305,15 +304,16 @@ public class RegexBulkLoadToolIT extends BaseOwnClusterIT { RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); regexBulkLoadTool.setConf(getUtility().getConfiguration()); try { - regexBulkLoadTool.run(new String[] { + int exitCode = regexBulkLoadTool.run(new String[] { "--input", "/tmp/input4.csv", "--table", tableName, "--regex", "([^,]*),([^,]*),([^,]*)", "--zookeeper", zkQuorum }); - fail(String.format("Table %s not created, hence should fail",tableName)); + assertTrue(String.format("Table %s not created, hence should fail", tableName), + exitCode != 0); } catch (Exception ex) { - assertTrue(ex instanceof IllegalArgumentException); - assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName))); + fail("Tools should return non-zero exit codes on failure" + + " instead of throwing an exception"); } } @@ -336,16 +336,18 @@ public class RegexBulkLoadToolIT extends BaseOwnClusterIT { RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); 
regexBulkLoadTool.setConf(getUtility().getConfiguration()); - regexBulkLoadTool.run(new String[] { + int exitCode = regexBulkLoadTool.run(new String[] { "--input", "/tmp/input9.csv", "--output", outputPath, "--table", tableName, "--regex", "([^,]*),([^,]*),([^,]*)", "--zookeeper", zkQuorum }); - - fail(String.format("Output path %s already exists. hence, should fail",outputPath)); + assertTrue( + String.format("Output path %s already exists. hence, should fail", outputPath), + exitCode != 0); } catch (Exception ex) { - assertTrue(ex instanceof FileAlreadyExistsException); + fail("Tools should return non-zero exit codes on failure" + + " instead of throwing an exception"); } }