Repository: beam
Updated Branches:
  refs/heads/master 2d25b6840 -> 28c6fd42e
Fix minor issues on HCatalogIO

- Restrict access level when possible
- Rename Filter to Partition for the write to be coherent with the HCatalog API
- Improve test coverage
- Fix documentation details
- Implement TearDown method for the writer

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c11f0ff5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c11f0ff5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c11f0ff5

Branch: refs/heads/master
Commit: c11f0ff57efca5786fb5da20006d9eb96b44cffe
Parents: 2d25b68
Author: Ismaël Mejía <ieme...@apache.org>
Authored: Fri Jun 9 00:01:55 2017 +0200
Committer: Ismaël Mejía <ieme...@apache.org>
Committed: Wed Jun 21 21:58:14 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 113 ++++++++-----------
 .../io/hcatalog/EmbeddedMetastoreService.java   |   3 +-
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java    |  54 +++++----
 .../sdk/io/hcatalog/HCatalogIOTestUtils.java    |  22 ++--
 4 files changed, 90 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index 07b56e3..1549dab 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -78,11 +78,10 @@ import org.slf4j.LoggerFactory;
  *
  * pipeline
  *   .apply(HCatalogIO.read()
- *       .withConfigProperties(configProperties) //mandatory
- *       .withTable("employee") //mandatory
+ *       .withConfigProperties(configProperties)
  *       .withDatabase("default") //optional, assumes default if none specified
- *       .withFilter(filterString) //optional,
- *       should be specified if the table is partitioned
+ *       .withTable("employee")
+ *       .withFilter(filterString) //optional, may be specified if the table is partitioned
  * }</pre>
  *
  * <h3>Writing using HCatalog</h3>
  *
@@ -100,13 +99,11 @@ import org.slf4j.LoggerFactory;
  * pipeline
  *   .apply(...)
  *   .apply(HiveIO.write()
- *       .withConfigProperties(configProperties) //mandatory
- *       .withTable("employee") //mandatory
+ *       .withConfigProperties(configProperties)
  *       .withDatabase("default") //optional, assumes default if none specified
- *       .withFilter(partitionValues) //optional,
- *       should be specified if the table is partitioned
- *       .withBatchSize(1024L)) //optional,
- *       assumes a default batch size of 1024 if none specified
+ *       .withTable("employee")
+ *       .withPartition(partitionValues) //optional, may be specified if the table is partitioned
+ *       .withBatchSize(1024L)) //optional, assumes a default batch size of 1024 if none specified
  * }</pre>
  */
 @Experimental
@@ -114,14 +111,17 @@ public class HCatalogIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(HCatalogIO.class);
 
+  private static final long BATCH_SIZE = 1024L;
+  private static final String DEFAULT_DATABASE = "default";
+
   /** Write data to Hive. */
   public static Write write() {
-    return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(1024L).build();
+    return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(BATCH_SIZE).build();
   }
 
   /** Read data from Hive. */
   public static Read read() {
-    return new AutoValue_HCatalogIO_Read.Builder().setDatabase("default").build();
+    return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
   }
 
   private HCatalogIO() {}
 
@@ -130,44 +130,26 @@ public class HCatalogIO {
   @VisibleForTesting
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
-    @Nullable
-    abstract Map<String, String> getConfigProperties();
-
-    @Nullable
-    abstract String getDatabase();
-
-    @Nullable
-    abstract String getTable();
-
-    @Nullable
-    abstract String getFilter();
-
-    @Nullable
-    abstract ReaderContext getContext();
-
-    @Nullable
-    abstract Integer getSplitId();
-
+    @Nullable abstract Map<String, String> getConfigProperties();
+    @Nullable abstract String getDatabase();
+    @Nullable abstract String getTable();
+    @Nullable abstract String getFilter();
+    @Nullable abstract ReaderContext getContext();
+    @Nullable abstract Integer getSplitId();
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setConfigProperties(Map<String, String> configProperties);
-
       abstract Builder setDatabase(String database);
-
       abstract Builder setTable(String table);
-
       abstract Builder setFilter(String filter);
-
       abstract Builder setSplitId(Integer splitId);
-
       abstract Builder setContext(ReaderContext context);
-
       abstract Read build();
     }
 
-    /** Sets the configuration properties like metastore URI. This is mandatory */
+    /** Sets the configuration properties like metastore URI. */
     public Read withConfigProperties(Map<String, String> configProperties) {
       return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
     }
@@ -177,12 +159,12 @@ public class HCatalogIO {
       return toBuilder().setDatabase(database).build();
     }
 
-    /** Sets the table name to read from. This is mandatory */
+    /** Sets the table name to read from. */
     public Read withTable(String table) {
       return toBuilder().setTable(table).build();
     }
 
-    /** Sets the filter (partition) details. This is optional, assumes none if not specified */
+    /** Sets the filter details. This is optional, assumes none if not specified */
     public Read withFilter(String filter) {
       return toBuilder().setFilter(filter).build();
     }
@@ -220,7 +202,7 @@ public class HCatalogIO {
   /** A HCatalog {@link BoundedSource} reading {@link HCatRecord} from a given instance. */
   @VisibleForTesting
   static class BoundedHCatalogSource extends BoundedSource<HCatRecord> {
-    private Read spec;
+    private final Read spec;
 
     BoundedHCatalogSource(Read spec) {
       this.spec = spec;
@@ -367,38 +349,24 @@ public class HCatalogIO {
 
   /** A {@link PTransform} to write to a HCatalog managed source. */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<HCatRecord>, PDone> {
-    @Nullable
-    abstract Map<String, String> getConfigProperties();
-
-    @Nullable
-    abstract String getDatabase();
-
-    @Nullable
-    abstract String getTable();
-
-    @Nullable
-    abstract Map getFilter();
-
+    @Nullable abstract Map<String, String> getConfigProperties();
+    @Nullable abstract String getDatabase();
+    @Nullable abstract String getTable();
+    @Nullable abstract Map<String, String> getPartition();
     abstract long getBatchSize();
-
     abstract Builder toBuilder();
 
    @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setConfigProperties(Map<String, String> configProperties);
-
       abstract Builder setDatabase(String database);
-
       abstract Builder setTable(String table);
-
-      abstract Builder setFilter(Map partition);
-
+      abstract Builder setPartition(Map<String, String> partition);
       abstract Builder setBatchSize(long batchSize);
-
       abstract Write build();
     }
 
-    /** Sets the configuration properties like metastore URI. This is mandatory */
+    /** Sets the configuration properties like metastore URI. */
     public Write withConfigProperties(Map<String, String> configProperties) {
       return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
     }
@@ -408,14 +376,14 @@ public class HCatalogIO {
       return toBuilder().setDatabase(database).build();
     }
 
-    /** Sets the table name to write to, the table should exist beforehand. This is mandatory */
+    /** Sets the table name to write to, the table should exist beforehand. */
     public Write withTable(String table) {
       return toBuilder().setTable(table).build();
    }
 
-    /** Sets the filter (partition) details. This is required if the table is partitioned */
-    public Write withFilter(Map filter) {
-      return toBuilder().setFilter(filter).build();
+    /** Sets the partition details. */
+    public Write withPartition(Map<String, String> partition) {
+      return toBuilder().setPartition(partition).build();
     }
 
     /**
@@ -454,7 +422,7 @@ public class HCatalogIO {
       super.populateDisplayData(builder);
       builder.addIfNotNull(DisplayData.item("database", spec.getDatabase()));
       builder.add(DisplayData.item("table", spec.getTable()));
-      builder.addIfNotNull(DisplayData.item("filter", String.valueOf(spec.getFilter())));
+      builder.addIfNotNull(DisplayData.item("partition", String.valueOf(spec.getPartition())));
       builder.add(DisplayData.item("configProperties", spec.getConfigProperties().toString()));
       builder.add(DisplayData.item("batchSize", spec.getBatchSize()));
     }
@@ -465,7 +433,7 @@ public class HCatalogIO {
             new WriteEntity.Builder()
                 .withDatabase(spec.getDatabase())
                 .withTable(spec.getTable())
-                .withPartition(spec.getFilter())
+                .withPartition(spec.getPartition())
                 .build();
         masterWriter = DataTransferFactory.getHCatWriter(entity, spec.getConfigProperties());
         writerContext = masterWriter.prepareWrite();
@@ -506,6 +474,19 @@ public class HCatalogIO {
           hCatRecordsBatch.clear();
         }
       }
+
+      @Teardown
+      public void tearDown() throws Exception {
+        if (slaveWriter != null) {
+          slaveWriter = null;
+        }
+        if (masterWriter != null) {
+          masterWriter = null;
+        }
+        if (writerContext != null) {
+          writerContext = null;
+        }
+      }
     }
   }
 }


http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
index 5792bf6..31e5b1c 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
  * https://github.com/apache/hive/blob/master/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce
  * /HCatBaseTest.java </a>
  */
-public final class EmbeddedMetastoreService implements AutoCloseable {
+final class EmbeddedMetastoreService implements AutoCloseable {
   private final Driver driver;
   private final HiveConf hiveConf;
   private final SessionState sessionState;
@@ -57,7 +57,6 @@ public final class EmbeddedMetastoreService implements AutoCloseable {
     hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, testWarehouseDirPath);
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
     hiveConf.setVar(
         HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,


http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index 49c538f..91671a5 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.io.hcatalog;
 
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_DATABASE;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_FILTER;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE_NAME;
+import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getConfigPropertiesAsMap;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getExpectedRecords;
 import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getHCatRecords;
@@ -69,7 +71,7 @@ import org.junit.runners.model.Statement;
 
 /** Test for HCatalogIO. */
 public class HCatalogIOTest implements Serializable {
-  public static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
+  private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
 
   @ClassRule
   public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
@@ -103,12 +105,12 @@ public class HCatalogIOTest implements Serializable {
   /** Use this annotation to setup complete test data(table populated with records). */
   @Retention(RetentionPolicy.RUNTIME)
   @Target({ElementType.METHOD})
-  @interface NeedsTestData {}
+  private @interface NeedsTestData {}
 
   /** Use this annotation to setup test tables alone(empty tables, no records are populated). */
   @Retention(RetentionPolicy.RUNTIME)
   @Target({ElementType.METHOD})
-  @interface NeedsEmptyTestTables {}
+  private @interface NeedsEmptyTestTables {}
 
   @BeforeClass
   public static void setupEmbeddedMetastoreService () throws IOException {
@@ -117,7 +119,7 @@ public class HCatalogIOTest implements Serializable {
 
   @AfterClass
   public static void shutdownEmbeddedMetastoreService () throws Exception {
-    service.executeQuery("drop table " + TEST_TABLE_NAME);
+    service.executeQuery("drop table " + TEST_TABLE);
     service.close();
   }
 
@@ -130,23 +132,27 @@ public class HCatalogIOTest implements Serializable {
         .apply(
             HCatalogIO.write()
                 .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-                .withTable(TEST_TABLE_NAME));
+                .withDatabase(TEST_DATABASE)
+                .withTable(TEST_TABLE)
+                .withPartition(new java.util.HashMap<String, String>())
+                .withBatchSize(512L));
     defaultPipeline.run();
 
-    PCollection<String> output =
-        readAfterWritePipeline
-            .apply(
-                HCatalogIO.read()
-                    .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-                    .withTable(HCatalogIOTestUtils.TEST_TABLE_NAME))
-            .apply(
-                ParDo.of(
-                    new DoFn<HCatRecord, String>() {
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        c.output(c.element().get(0).toString());
-                      }
-                    }));
+    PCollection<String> output = readAfterWritePipeline
+        .apply(
+            HCatalogIO.read()
+                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                .withDatabase(TEST_DATABASE)
+                .withTable(TEST_TABLE)
+                .withFilter(TEST_FILTER))
+        .apply(
+            ParDo.of(
+                new DoFn<HCatRecord, String>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().get(0).toString());
+                  }
+                }));
     PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT));
     readAfterWritePipeline.run();
   }
@@ -222,7 +228,7 @@ public class HCatalogIOTest implements Serializable {
             HCatalogIO.read()
                 .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
                 .withContext(context)
-                .withTable(TEST_TABLE_NAME);
+                .withTable(TEST_TABLE);
 
     List<String> records = new ArrayList<>();
     for (int i = 0; i < context.numSplits(); i++) {
@@ -246,7 +252,7 @@ public class HCatalogIOTest implements Serializable {
             HCatalogIO.read()
                 .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
                 .withContext(context)
-                .withTable(TEST_TABLE_NAME);
+                .withTable(TEST_TABLE);
 
     BoundedHCatalogSource source = new BoundedHCatalogSource(spec);
     List<BoundedSource<HCatRecord>> unSplitSource = source.split(-1, OPTIONS);
@@ -260,8 +266,8 @@ public class HCatalogIOTest implements Serializable {
   }
 
   private void reCreateTestTable() throws CommandNeedRetryException {
-    service.executeQuery("drop table " + TEST_TABLE_NAME);
-    service.executeQuery("create table " + TEST_TABLE_NAME + "(mycol1 string, mycol2 int)");
+    service.executeQuery("drop table " + TEST_TABLE);
+    service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)");
   }
 
   private void prepareTestData() throws Exception {


http://git-wip-us.apache.org/repos/asf/beam/blob/c11f0ff5/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
index f66e0bc..ae1eb50 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
@@ -35,15 +35,16 @@ import org.apache.hive.hcatalog.data.transfer.WriteEntity;
 import org.apache.hive.hcatalog.data.transfer.WriterContext;
 
 /** Utility class for HCatalogIOTest. */
-public class HCatalogIOTestUtils {
-  public static final String TEST_TABLE_NAME = "mytable";
-
-  public static final int TEST_RECORDS_COUNT = 1000;
+class HCatalogIOTestUtils {
+  static final String TEST_DATABASE = "default";
+  static final String TEST_TABLE = "mytable";
+  static final String TEST_FILTER = "myfilter";
+  static final int TEST_RECORDS_COUNT = 1000;
 
   private static final ReadEntity READ_ENTITY =
-      new ReadEntity.Builder().withTable(TEST_TABLE_NAME).build();
+      new ReadEntity.Builder().withTable(TEST_TABLE).build();
   private static final WriteEntity WRITE_ENTITY =
-      new WriteEntity.Builder().withTable(TEST_TABLE_NAME).build();
+      new WriteEntity.Builder().withTable(TEST_TABLE).build();
 
   /** Returns a ReaderContext instance for the passed datastore config params. */
   static ReaderContext getReaderContext(Map<String, String> config) throws HCatException {
   }
 
@@ -51,17 +52,18 @@ public class HCatalogIOTestUtils {
   /** Returns a WriterContext instance for the passed datastore config params. */
-  static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
+  private static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
     return DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).prepareWrite();
   }
 
   /** Writes records to the table using the passed WriterContext. */
-  static void writeRecords(WriterContext context) throws HCatException {
+  private static void writeRecords(WriterContext context) throws HCatException {
     DataTransferFactory.getHCatWriter(context).write(getHCatRecords(TEST_RECORDS_COUNT).iterator());
   }
 
   /** Commits the pending writes to the database. */
-  static void commitRecords(Map<String, String> config, WriterContext context) throws IOException {
+  private static void commitRecords(Map<String, String> config, WriterContext context)
+      throws IOException {
     DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).commit(context);
   }
 
@@ -100,7 +102,7 @@ public class HCatalogIOTestUtils {
   }
 
   /** returns a DefaultHCatRecord instance for passed value. */
-  static DefaultHCatRecord toHCatRecord(int value) {
+  private static DefaultHCatRecord toHCatRecord(int value) {
     return new DefaultHCatRecord(Arrays.<Object>asList("record " + value, value));
   }
 }
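
A minimal usage sketch of the API as it reads after this commit, for anyone skimming the diff. It is illustrative only: the metastore URI, table names, filter string, and partition values below are placeholder assumptions, not values from the patch. Note the asymmetry the rename makes explicit: the read side keeps withFilter(String), a partition filter expression that HCatalog evaluates, while the write side now takes withPartition(Map<String, String>), mirroring the WriteEntity.Builder#withPartition call the writer delegates to.

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hcatalog.HCatalogIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hive.hcatalog.data.HCatRecord;

public class HCatalogIOUsageSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder metastore configuration; point this at a real Hive metastore.
    Map<String, String> configProperties = new HashMap<>();
    configProperties.put("hive.metastore.uris", "thrift://metastore-host:9083");

    // Read side: the partition predicate is still expressed as a filter string.
    PCollection<HCatRecord> records =
        pipeline.apply(
            HCatalogIO.read()
                .withConfigProperties(configProperties)
                .withDatabase("default")                  // optional, assumes "default"
                .withTable("employee")
                .withFilter("load_date=\"2017-06-09\"")); // optional, for partitioned tables

    // Write side: what used to be withFilter(Map) is now withPartition(Map).
    Map<String, String> partitionValues = new HashMap<>();
    partitionValues.put("load_date", "2017-06-09");

    records.apply(
        HCatalogIO.write()
            .withConfigProperties(configProperties)
            .withDatabase("default")
            .withTable("employee_copy")     // the target table must exist beforehand
            .withPartition(partitionValues) // optional, for partitioned tables
            .withBatchSize(1024L));         // optional, 1024 is the default

    pipeline.run().waitUntilFinish();
  }
}

The point of the Filter to Partition rename is to keep Beam's surface aligned with HCatalog's own vocabulary: a read narrows the input with a partition filter expression, while a write lands records in one concrete partition described by a key/value map.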