Fix code style issues for HBaseIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5bdedd2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5bdedd2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5bdedd2 Branch: refs/heads/master Commit: e5bdedd23208e484f6852eda44c59fb873645e8f Parents: cdf050c Author: Ismaël Mejía <ieme...@gmail.com> Authored: Fri Aug 25 10:43:17 2017 +0200 Committer: Ismaël Mejía <ieme...@gmail.com> Committed: Fri Aug 25 23:52:22 2017 +0200 ---------------------------------------------------------------------- .../io/hbase/HBaseCoderProviderRegistrar.java | 8 +- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 1090 +++++++++--------- .../beam/sdk/io/hbase/HBaseMutationCoder.java | 27 +- .../beam/sdk/io/hbase/HBaseResultCoder.java | 6 +- .../beam/sdk/io/hbase/SerializableScan.java | 37 +- .../hbase/HBaseCoderProviderRegistrarTest.java | 4 +- .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 814 +++++++------ .../sdk/io/hbase/HBaseMutationCoderTest.java | 4 +- .../beam/sdk/io/hbase/HBaseResultCoderTest.java | 4 +- .../beam/sdk/io/hbase/SerializableScanTest.java | 6 +- 10 files changed, 987 insertions(+), 1013 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java index 2973d1b..f836ebe 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java @@ -26,15 +26,13 @@ import 
org.apache.beam.sdk.coders.CoderProviders; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.hadoop.hbase.client.Result; -/** - * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}. - */ +/** A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}. */ @AutoService(CoderProviderRegistrar.class) public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar { @Override public List<CoderProvider> getCoderProviders() { return ImmutableList.of( - HBaseMutationCoder.getCoderProvider(), - CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of())); + HBaseMutationCoder.getCoderProvider(), + CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 7f58cef..41ced93 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -71,19 +71,19 @@ import org.slf4j.LoggerFactory; /** * A bounded source and sink for HBase. * - * <p>For more information, see the online documentation at - * <a href="https://hbase.apache.org/">HBase</a>. + * <p>For more information, see the online documentation at <a + * href="https://hbase.apache.org/">HBase</a>. * * <h3>Reading from HBase</h3> * - * <p>The HBase source returns a set of rows from a single table, returning a - * {@code PCollection<Result>}. + * <p>The HBase source returns a set of rows from a single table, returning a {@code + * PCollection<Result>}. 
* - * <p>To configure a HBase source, you must supply a table id and a {@link Configuration} - * to identify the HBase instance. By default, {@link HBaseIO.Read} will read all rows in the - * table. The row range to be read can optionally be restricted using with a {@link Scan} object - * or using the {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} using - * {@link HBaseIO.Read#withFilter}, for example: + * <p>To configure a HBase source, you must supply a table id and a {@link Configuration} to + * identify the HBase instance. By default, {@link HBaseIO.Read} will read all rows in the table. + * The row range to be read can optionally be restricted using with a {@link Scan} object or using + * the {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} using {@link + * HBaseIO.Read#withFilter}, for example: * * <pre>{@code * // Scan the entire table. @@ -118,12 +118,12 @@ import org.slf4j.LoggerFactory; * * <h3>Writing to HBase</h3> * - * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a - * {@link PCollection PCollection<Mutation>}, where each {@link Mutation} represents an - * idempotent transformation on a row. + * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a {@link + * PCollection PCollection<Mutation>}, where each {@link Mutation} represents an idempotent + * transformation on a row. 
* - * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration} - * to identify the HBase instance, for example: + * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration} to identify + * the HBase instance, for example: * * <pre>{@code * Configuration configuration = ...; @@ -137,605 +137,605 @@ import org.slf4j.LoggerFactory; * * <h3>Experimental</h3> * - * <p>The design of the API for HBaseIO is currently related to the BigtableIO one, - * it can evolve or be different in some aspects, but the idea is that users can easily migrate - * from one to the other</p>. + * <p>The design of the API for HBaseIO is currently related to the BigtableIO one, it can evolve or + * be different in some aspects, but the idea is that users can easily migrate from one to the other + * . */ @Experimental(Experimental.Kind.SOURCE_SINK) public class HBaseIO { - private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class); - - /** Disallow construction of utility class. */ - private HBaseIO() { + private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class); + + /** Disallow construction of utility class. */ + private HBaseIO() {} + + /** + * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code Read} must be initialized + * with a {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies the HBase instance, + * and a {@link HBaseIO.Read#withTableId tableId} that specifies which table to read. A {@link + * Filter} may also optionally be specified using {@link HBaseIO.Read#withFilter}. + */ + @Experimental + public static Read read() { + return new Read(null, "", new SerializableScan(new Scan())); + } + + /** + * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for + * more information. 
+ * + * @see HBaseIO + */ + public static class Read extends PTransform<PBegin, PCollection<Result>> { + /** + * Returns a new {@link HBaseIO.Read} that will read from the HBase instance indicated by the + * given configuration. + */ + public Read withConfiguration(Configuration configuration) { + checkNotNull(configuration, "conf"); + return new Read(new SerializableConfiguration(configuration), tableId, serializableScan); } /** - * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code Read} must be - * initialized with a - * {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies - * the HBase instance, and a {@link HBaseIO.Read#withTableId tableId} that - * specifies which table to read. A {@link Filter} may also optionally be specified using - * {@link HBaseIO.Read#withFilter}. + * Returns a new {@link HBaseIO.Read} that will read from the specified table. + * + * <p>Does not modify this object. */ - @Experimental - public static Read read() { - return new Read(null, "", new SerializableScan(new Scan())); + public Read withTableId(String tableId) { + checkNotNull(tableId, "tableId"); + return new Read(serializableConfiguration, tableId, serializableScan); } /** - * A {@link PTransform} that reads from HBase. See the class-level Javadoc on - * {@link HBaseIO} for more information. + * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given + * scan. * - * @see HBaseIO + * <p>Does not modify this object. */ - public static class Read extends PTransform<PBegin, PCollection<Result>> { - /** - * Returns a new {@link HBaseIO.Read} that will read from the HBase instance - * indicated by the given configuration. 
- */ - public Read withConfiguration(Configuration configuration) { - checkNotNull(configuration, "conf"); - return new Read(new SerializableConfiguration(configuration), - tableId, serializableScan); - } + public Read withScan(Scan scan) { + checkNotNull(scan, "scan"); + return new Read(serializableConfiguration, tableId, new SerializableScan(scan)); + } - /** - * Returns a new {@link HBaseIO.Read} that will read from the specified table. - * - * <p>Does not modify this object. - */ - public Read withTableId(String tableId) { - checkNotNull(tableId, "tableId"); - return new Read(serializableConfiguration, tableId, serializableScan); - } + /** + * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given + * row filter. + * + * <p>Does not modify this object. + */ + public Read withFilter(Filter filter) { + checkNotNull(filter, "filter"); + return withScan(serializableScan.get().setFilter(filter)); + } - /** - * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase - * using the given scan. - * - * <p>Does not modify this object. - */ - public Read withScan(Scan scan) { - checkNotNull(scan, "scan"); - return new Read(serializableConfiguration, tableId, new SerializableScan(scan)); - } + /** + * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. + * + * <p>Does not modify this object. + */ + public Read withKeyRange(ByteKeyRange keyRange) { + checkNotNull(keyRange, "keyRange"); + byte[] startRow = keyRange.getStartKey().getBytes(); + byte[] stopRow = keyRange.getEndKey().getBytes(); + return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow)); + } - /** - * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase - * using the given row filter. - * - * <p>Does not modify this object. 
- */ - public Read withFilter(Filter filter) { - checkNotNull(filter, "filter"); - return withScan(serializableScan.get().setFilter(filter)); - } + /** + * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. + * + * <p>Does not modify this object. + */ + public Read withKeyRange(byte[] startRow, byte[] stopRow) { + checkNotNull(startRow, "startRow"); + checkNotNull(stopRow, "stopRow"); + ByteKeyRange keyRange = + ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); + return withKeyRange(keyRange); + } - /** - * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. - * - * <p>Does not modify this object. - */ - public Read withKeyRange(ByteKeyRange keyRange) { - checkNotNull(keyRange, "keyRange"); - byte[] startRow = keyRange.getStartKey().getBytes(); - byte[] stopRow = keyRange.getEndKey().getBytes(); - return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow)); - } + private Read( + SerializableConfiguration serializableConfiguration, + String tableId, + SerializableScan serializableScan) { + this.serializableConfiguration = serializableConfiguration; + this.tableId = tableId; + this.serializableScan = serializableScan; + } - /** - * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. - * - * <p>Does not modify this object. 
- */ - public Read withKeyRange(byte[] startRow, byte[] stopRow) { - checkNotNull(startRow, "startRow"); - checkNotNull(stopRow, "stopRow"); - ByteKeyRange keyRange = - ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); - return withKeyRange(keyRange); - } + @Override + public PCollection<Result> expand(PBegin input) { + HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */); + return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); + } - private Read(SerializableConfiguration serializableConfiguration, String tableId, - SerializableScan serializableScan) { - this.serializableConfiguration = serializableConfiguration; - this.tableId = tableId; - this.serializableScan = serializableScan; - } + @Override + public void validate(PipelineOptions options) { + checkArgument(serializableConfiguration != null, "Configuration not provided"); + checkArgument(!tableId.isEmpty(), "Table ID not specified"); + try (Connection connection = + ConnectionFactory.createConnection(serializableConfiguration.get())) { + Admin admin = connection.getAdmin(); + checkArgument( + admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + } - @Override - public PCollection<Result> expand(PBegin input) { - HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */); - return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("configuration", serializableConfiguration.get().toString())); + builder.add(DisplayData.item("tableId", tableId)); + builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString())); + } - @Override - public void validate(PipelineOptions options) { - 
checkArgument(serializableConfiguration != null, - "Configuration not provided"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); - try (Connection connection = ConnectionFactory.createConnection( - serializableConfiguration.get())) { - Admin admin = connection.getAdmin(); - checkArgument(admin.tableExists(TableName.valueOf(tableId)), - "Table %s does not exist", tableId); - } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - } + public String getTableId() { + return tableId; + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("configuration", - serializableConfiguration.get().toString())); - builder.add(DisplayData.item("tableId", tableId)); - builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString())); - } + public Configuration getConfiguration() { + return serializableConfiguration.get(); + } - public String getTableId() { - return tableId; - } + /** Returns the range of keys that will be read from the table. */ + public ByteKeyRange getKeyRange() { + byte[] startRow = serializableScan.get().getStartRow(); + byte[] stopRow = serializableScan.get().getStopRow(); + return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); + } - public Configuration getConfiguration() { - return serializableConfiguration.get(); - } + private final SerializableConfiguration serializableConfiguration; + private final String tableId; + private final SerializableScan serializableScan; + } - /** - * Returns the range of keys that will be read from the table. 
- */ - public ByteKeyRange getKeyRange() { - byte[] startRow = serializableScan.get().getStartRow(); - byte[] stopRow = serializableScan.get().getStopRow(); - return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); - } + static class HBaseSource extends BoundedSource<Result> { + private final Read read; + @Nullable private Long estimatedSizeBytes; - private final SerializableConfiguration serializableConfiguration; - private final String tableId; - private final SerializableScan serializableScan; + HBaseSource(Read read, @Nullable Long estimatedSizeBytes) { + this.read = read; + this.estimatedSizeBytes = estimatedSizeBytes; } - static class HBaseSource extends BoundedSource<Result> { - private final Read read; - @Nullable private Long estimatedSizeBytes; - - HBaseSource(Read read, @Nullable Long estimatedSizeBytes) { - this.read = read; - this.estimatedSizeBytes = estimatedSizeBytes; - } + HBaseSource withStartKey(ByteKey startKey) throws IOException { + checkNotNull(startKey, "startKey"); + Read newRead = + new Read( + read.serializableConfiguration, + read.tableId, + new SerializableScan( + new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes()))); + return new HBaseSource(newRead, estimatedSizeBytes); + } - HBaseSource withStartKey(ByteKey startKey) throws IOException { - checkNotNull(startKey, "startKey"); - Read newRead = new Read(read.serializableConfiguration, read.tableId, - new SerializableScan( - new Scan(read.serializableScan.get()).setStartRow(startKey.getBytes()))); - return new HBaseSource(newRead, estimatedSizeBytes); - } + HBaseSource withEndKey(ByteKey endKey) throws IOException { + checkNotNull(endKey, "endKey"); + Read newRead = + new Read( + read.serializableConfiguration, + read.tableId, + new SerializableScan( + new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes()))); + return new HBaseSource(newRead, estimatedSizeBytes); + } - HBaseSource withEndKey(ByteKey endKey) throws IOException { - 
checkNotNull(endKey, "endKey"); - Read newRead = new Read(read.serializableConfiguration, read.tableId, - new SerializableScan( - new Scan(read.serializableScan.get()).setStopRow(endKey.getBytes()))); - return new HBaseSource(newRead, estimatedSizeBytes); - } + @Override + public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception { + if (estimatedSizeBytes == null) { + estimatedSizeBytes = estimateSizeBytes(); + LOG.debug( + "Estimated size {} bytes for table {} and scan {}", + estimatedSizeBytes, + read.tableId, + read.serializableScan.get()); + } + return estimatedSizeBytes; + } - @Override - public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception { - if (estimatedSizeBytes == null) { - estimatedSizeBytes = estimateSizeBytes(); - LOG.debug("Estimated size {} bytes for table {} and scan {}", estimatedSizeBytes, - read.tableId, read.serializableScan.get()); + /** + * This estimates the real size, it can be the compressed size depending on the HBase + * configuration. 
+ */ + private long estimateSizeBytes() throws Exception { + // This code is based on RegionSizeCalculator in hbase-server + long estimatedSizeBytes = 0L; + Configuration configuration = this.read.serializableConfiguration.get(); + try (Connection connection = ConnectionFactory.createConnection(configuration)) { + // filter regions for the given table/scan + List<HRegionLocation> regionLocations = getRegionLocations(connection); + + // builds set of regions who are part of the table scan + Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (HRegionLocation regionLocation : regionLocations) { + tableRegions.add(regionLocation.getRegionInfo().getRegionName()); + } + + // calculate estimated size for the regions + Admin admin = connection.getAdmin(); + ClusterStatus clusterStatus = admin.getClusterStatus(); + Collection<ServerName> servers = clusterStatus.getServers(); + for (ServerName serverName : servers) { + ServerLoad serverLoad = clusterStatus.getLoad(serverName); + for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) { + byte[] regionId = regionLoad.getName(); + if (tableRegions.contains(regionId)) { + long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L; + estimatedSizeBytes += regionSizeBytes; } - return estimatedSizeBytes; + } } + } + return estimatedSizeBytes; + } - /** - * This estimates the real size, it can be the compressed size depending on the HBase - * configuration. 
- */ - private long estimateSizeBytes() throws Exception { - // This code is based on RegionSizeCalculator in hbase-server - long estimatedSizeBytes = 0L; - Configuration configuration = this.read.serializableConfiguration.get(); - try (Connection connection = ConnectionFactory.createConnection(configuration)) { - // filter regions for the given table/scan - List<HRegionLocation> regionLocations = getRegionLocations(connection); - - // builds set of regions who are part of the table scan - Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); - for (HRegionLocation regionLocation : regionLocations) { - tableRegions.add(regionLocation.getRegionInfo().getRegionName()); - } - - // calculate estimated size for the regions - Admin admin = connection.getAdmin(); - ClusterStatus clusterStatus = admin.getClusterStatus(); - Collection<ServerName> servers = clusterStatus.getServers(); - for (ServerName serverName : servers) { - ServerLoad serverLoad = clusterStatus.getLoad(serverName); - for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) { - byte[] regionId = regionLoad.getName(); - if (tableRegions.contains(regionId)) { - long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L; - estimatedSizeBytes += regionSizeBytes; - } - } - } - } - return estimatedSizeBytes; - } + private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception { + final Scan scan = read.serializableScan.get(); + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); - private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception { - final Scan scan = read.serializableScan.get(); - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - - final List<HRegionLocation> regionLocations = new ArrayList<>(); - - final boolean scanWithNoLowerBound = startRow.length == 0; - final boolean scanWithNoUpperBound = stopRow.length == 0; - - TableName tableName = 
TableName.valueOf(read.tableId); - RegionLocator regionLocator = connection.getRegionLocator(tableName); - List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations(); - for (HRegionLocation regionLocation : tableRegionInfos) { - final byte[] startKey = regionLocation.getRegionInfo().getStartKey(); - final byte[] endKey = regionLocation.getRegionInfo().getEndKey(); - boolean isLastRegion = endKey.length == 0; - // filters regions who are part of the scan - if ((scanWithNoLowerBound - || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) - && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { - regionLocations.add(regionLocation); - } - } + final List<HRegionLocation> regionLocations = new ArrayList<>(); - return regionLocations; - } + final boolean scanWithNoLowerBound = startRow.length == 0; + final boolean scanWithNoUpperBound = stopRow.length == 0; - private List<HBaseSource> - splitBasedOnRegions(List<HRegionLocation> regionLocations, int numSplits) - throws Exception { - final Scan scan = read.serializableScan.get(); - byte[] startRow = scan.getStartRow(); - byte[] stopRow = scan.getStopRow(); - - final List<HBaseSource> sources = new ArrayList<>(numSplits); - final boolean scanWithNoLowerBound = startRow.length == 0; - final boolean scanWithNoUpperBound = stopRow.length == 0; - - for (HRegionLocation regionLocation : regionLocations) { - final byte[] startKey = regionLocation.getRegionInfo().getStartKey(); - final byte[] endKey = regionLocation.getRegionInfo().getEndKey(); - boolean isLastRegion = endKey.length == 0; - String host = regionLocation.getHostnamePort(); - - final byte[] splitStart = (scanWithNoLowerBound - || Bytes.compareTo(startKey, startRow) >= 0) ? startKey : startRow; - final byte[] splitStop = - (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) - && !isLastRegion ? 
endKey : stopRow; - - LOG.debug("{} {} {} {} {}", sources.size(), host, read.tableId, - Bytes.toString(splitStart), Bytes.toString(splitStop)); - - // We need to create a new copy of the scan and read to add the new ranges - Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop); - Read newRead = new Read(read.serializableConfiguration, read.tableId, - new SerializableScan(newScan)); - sources.add(new HBaseSource(newRead, estimatedSizeBytes)); - } - return sources; + TableName tableName = TableName.valueOf(read.tableId); + RegionLocator regionLocator = connection.getRegionLocator(tableName); + List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations(); + for (HRegionLocation regionLocation : tableRegionInfos) { + final byte[] startKey = regionLocation.getRegionInfo().getStartKey(); + final byte[] endKey = regionLocation.getRegionInfo().getEndKey(); + boolean isLastRegion = endKey.length == 0; + // filters regions who are part of the scan + if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) + && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { + regionLocations.add(regionLocation); } + } + + return regionLocations; + } + + private List<HBaseSource> splitBasedOnRegions( + List<HRegionLocation> regionLocations, int numSplits) throws Exception { + final Scan scan = read.serializableScan.get(); + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + + final List<HBaseSource> sources = new ArrayList<>(numSplits); + final boolean scanWithNoLowerBound = startRow.length == 0; + final boolean scanWithNoUpperBound = stopRow.length == 0; + + for (HRegionLocation regionLocation : regionLocations) { + final byte[] startKey = regionLocation.getRegionInfo().getStartKey(); + final byte[] endKey = regionLocation.getRegionInfo().getEndKey(); + boolean isLastRegion = endKey.length == 0; + String host = regionLocation.getHostnamePort(); + + final byte[] splitStart = + 
(scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0) + ? startKey + : startRow; + final byte[] splitStop = + (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) && !isLastRegion + ? endKey + : stopRow; + + LOG.debug( + "{} {} {} {} {}", + sources.size(), + host, + read.tableId, + Bytes.toString(splitStart), + Bytes.toString(splitStop)); + + // We need to create a new copy of the scan and read to add the new ranges + Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop); + Read newRead = + new Read(read.serializableConfiguration, read.tableId, new SerializableScan(newScan)); + sources.add(new HBaseSource(newRead, estimatedSizeBytes)); + } + return sources; + } @Override public List<? extends BoundedSource<Result>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes); - long estimatedSizeBytes = getEstimatedSizeBytes(options); - int numSplits = 1; - if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) { - numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes); - } - - try (Connection connection = ConnectionFactory.createConnection( - read.getConfiguration())) { - List<HRegionLocation> regionLocations = getRegionLocations(connection); - int realNumSplits = - numSplits < regionLocations.size() ? 
regionLocations.size() : numSplits; - LOG.debug("Suggested {} bundle(s) based on size", numSplits); - LOG.debug("Suggested {} bundle(s) based on number of regions", - regionLocations.size()); - final List<HBaseSource> sources = splitBasedOnRegions(regionLocations, - realNumSplits); - LOG.debug("Split into {} bundle(s)", sources.size()); - if (numSplits >= 1) { - return sources; - } - return Collections.singletonList(this); - } - } - - @Override - public BoundedReader<Result> createReader(PipelineOptions pipelineOptions) - throws IOException { - return new HBaseReader(this); - } - - @Override - public void validate() { - read.validate(null /* input */); - } + LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes); + long estimatedSizeBytes = getEstimatedSizeBytes(options); + int numSplits = 1; + if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) { + numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes); + } + + try (Connection connection = ConnectionFactory.createConnection(read.getConfiguration())) { + List<HRegionLocation> regionLocations = getRegionLocations(connection); + int realNumSplits = numSplits < regionLocations.size() ? 
regionLocations.size() : numSplits; + LOG.debug("Suggested {} bundle(s) based on size", numSplits); + LOG.debug("Suggested {} bundle(s) based on number of regions", regionLocations.size()); + final List<HBaseSource> sources = splitBasedOnRegions(regionLocations, realNumSplits); + LOG.debug("Split into {} bundle(s)", sources.size()); + if (numSplits >= 1) { + return sources; + } + return Collections.singletonList(this); + } + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - read.populateDisplayData(builder); - } + @Override + public BoundedReader<Result> createReader(PipelineOptions pipelineOptions) throws IOException { + return new HBaseReader(this); + } - @Override - public Coder<Result> getOutputCoder() { - return HBaseResultCoder.of(); - } + @Override + public void validate() { + read.validate(null /* input */); } - private static class HBaseReader extends BoundedSource.BoundedReader<Result> { - private HBaseSource source; - private Connection connection; - private ResultScanner scanner; - private Iterator<Result> iter; - private Result current; - private final ByteKeyRangeTracker rangeTracker; - private long recordsReturned; - - HBaseReader(HBaseSource source) { - this.source = source; - Scan scan = source.read.serializableScan.get(); - ByteKeyRange range = ByteKeyRange - .of(ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow())); - rangeTracker = ByteKeyRangeTracker.of(range); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + read.populateDisplayData(builder); + } - @Override - public boolean start() throws IOException { - HBaseSource source = getCurrentSource(); - Configuration configuration = source.read.serializableConfiguration.get(); - String tableId = source.read.tableId; - connection = ConnectionFactory.createConnection(configuration); - TableName tableName = TableName.valueOf(tableId); - Table table = connection.getTable(tableName); - // [BEAM-2319] We have to clone 
the Scan because the underlying scanner may mutate it. - Scan scanClone = new Scan(source.read.serializableScan.get()); - scanner = table.getScanner(scanClone); - iter = scanner.iterator(); - return advance(); - } + @Override + public Coder<Result> getOutputCoder() { + return HBaseResultCoder.of(); + } + } + + private static class HBaseReader extends BoundedSource.BoundedReader<Result> { + private HBaseSource source; + private Connection connection; + private ResultScanner scanner; + private Iterator<Result> iter; + private Result current; + private final ByteKeyRangeTracker rangeTracker; + private long recordsReturned; + + HBaseReader(HBaseSource source) { + this.source = source; + Scan scan = source.read.serializableScan.get(); + ByteKeyRange range = + ByteKeyRange.of( + ByteKey.copyFrom(scan.getStartRow()), ByteKey.copyFrom(scan.getStopRow())); + rangeTracker = ByteKeyRangeTracker.of(range); + } - @Override - public Result getCurrent() throws NoSuchElementException { - return current; - } + @Override + public boolean start() throws IOException { + HBaseSource source = getCurrentSource(); + Configuration configuration = source.read.serializableConfiguration.get(); + String tableId = source.read.tableId; + connection = ConnectionFactory.createConnection(configuration); + TableName tableName = TableName.valueOf(tableId); + Table table = connection.getTable(tableName); + // [BEAM-2319] We have to clone the Scan because the underlying scanner may mutate it. 
+ Scan scanClone = new Scan(source.read.serializableScan.get()); + scanner = table.getScanner(scanClone); + iter = scanner.iterator(); + return advance(); + } - @Override - public boolean advance() throws IOException { - if (!iter.hasNext()) { - return rangeTracker.markDone(); - } - final Result next = iter.next(); - boolean hasRecord = - rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow())) - || rangeTracker.markDone(); - if (hasRecord) { - current = next; - ++recordsReturned; - } - return hasRecord; - } + @Override + public Result getCurrent() throws NoSuchElementException { + return current; + } - @Override - public void close() throws IOException { - LOG.debug("Closing reader after reading {} records.", recordsReturned); - if (scanner != null) { - scanner.close(); - scanner = null; - } - if (connection != null) { - connection.close(); - connection = null; - } - } + @Override + public boolean advance() throws IOException { + if (!iter.hasNext()) { + return rangeTracker.markDone(); + } + final Result next = iter.next(); + boolean hasRecord = + rangeTracker.tryReturnRecordAt(true, ByteKey.copyFrom(next.getRow())) + || rangeTracker.markDone(); + if (hasRecord) { + current = next; + ++recordsReturned; + } + return hasRecord; + } - @Override - public synchronized HBaseSource getCurrentSource() { - return source; - } + @Override + public void close() throws IOException { + LOG.debug("Closing reader after reading {} records.", recordsReturned); + if (scanner != null) { + scanner.close(); + scanner = null; + } + if (connection != null) { + connection.close(); + connection = null; + } + } - @Override - public final Double getFractionConsumed() { - return rangeTracker.getFractionConsumed(); - } + @Override + public synchronized HBaseSource getCurrentSource() { + return source; + } - @Override - public final long getSplitPointsConsumed() { - return rangeTracker.getSplitPointsConsumed(); - } + @Override + public final Double getFractionConsumed() { + return 
rangeTracker.getFractionConsumed(); + } - @Override - @Nullable - public final synchronized HBaseSource splitAtFraction(double fraction) { - ByteKey splitKey; - try { - splitKey = rangeTracker.getRange().interpolateKey(fraction); - } catch (RuntimeException e) { - LOG.info("{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(), - fraction, e); - return null; - } - LOG.info( - "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey); - HBaseSource primary; - HBaseSource residual; - try { - primary = source.withEndKey(splitKey); - residual = source.withStartKey(splitKey); - } catch (Exception e) { - LOG.info( - "{}: Interpolating for fraction {} yielded invalid split key {}.", - rangeTracker.getRange(), - fraction, - splitKey, - e); - return null; - } - if (!rangeTracker.trySplitAtPosition(splitKey)) { - return null; - } - this.source = primary; - return residual; - } + @Override + public final long getSplitPointsConsumed() { + return rangeTracker.getSplitPointsConsumed(); } + @Override + @Nullable + public final synchronized HBaseSource splitAtFraction(double fraction) { + ByteKey splitKey; + try { + splitKey = rangeTracker.getRange().interpolateKey(fraction); + } catch (RuntimeException e) { + LOG.info( + "{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(), fraction, e); + return null; + } + LOG.info("Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey); + HBaseSource primary; + HBaseSource residual; + try { + primary = source.withEndKey(splitKey); + residual = source.withStartKey(splitKey); + } catch (Exception e) { + LOG.info( + "{}: Interpolating for fraction {} yielded invalid split key {}.", + rangeTracker.getRange(), + fraction, + splitKey, + e); + return null; + } + if (!rangeTracker.trySplitAtPosition(splitKey)) { + return null; + } + this.source = primary; + return residual; + } + } + + /** + * Creates an uninitialized {@link HBaseIO.Write}. 
Before use, the {@code Write} must be + * initialized with a {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies the + * destination HBase instance, and a {@link HBaseIO.Write#withTableId tableId} that specifies + * which table to write. + */ + public static Write write() { + return new Write(null /* SerializableConfiguration */, ""); + } + + /** + * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for + * more information. + * + * @see HBaseIO + */ + public static class Write extends PTransform<PCollection<Mutation>, PDone> { /** - * Creates an uninitialized {@link HBaseIO.Write}. Before use, the {@code Write} must be - * initialized with a - * {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies - * the destination HBase instance, and a {@link HBaseIO.Write#withTableId tableId} - * that specifies which table to write. + * Returns a new {@link HBaseIO.Write} that will write to the HBase instance indicated by the + * given Configuration, and using any other specified customizations. + * + * <p>Does not modify this object. */ - public static Write write() { - return new Write(null /* SerializableConfiguration */, ""); + public Write withConfiguration(Configuration configuration) { + checkNotNull(configuration, "conf"); + return new Write(new SerializableConfiguration(configuration), tableId); } /** - * A {@link PTransform} that writes to HBase. See the class-level Javadoc on - * {@link HBaseIO} for more information. + * Returns a new {@link HBaseIO.Write} that will write to the specified table. * - * @see HBaseIO + * <p>Does not modify this object. */ - public static class Write extends PTransform<PCollection<Mutation>, PDone> { - /** - * Returns a new {@link HBaseIO.Write} that will write to the HBase instance - * indicated by the given Configuration, and using any other specified customizations. - * - * <p>Does not modify this object. 
- */ - public Write withConfiguration(Configuration configuration) { - checkNotNull(configuration, "conf"); - return new Write(new SerializableConfiguration(configuration), tableId); - } - - /** - * Returns a new {@link HBaseIO.Write} that will write to the specified table. - * - * <p>Does not modify this object. - */ - public Write withTableId(String tableId) { - checkNotNull(tableId, "tableId"); - return new Write(serializableConfiguration, tableId); - } - - private Write(SerializableConfiguration serializableConfiguration, String tableId) { - this.serializableConfiguration = serializableConfiguration; - this.tableId = tableId; - } - - @Override - public PDone expand(PCollection<Mutation> input) { - input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); - return PDone.in(input.getPipeline()); - } - - @Override - public void validate(PipelineOptions options) { - checkArgument(serializableConfiguration != null, "Configuration not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); - try (Connection connection = ConnectionFactory.createConnection( - serializableConfiguration.get())) { - Admin admin = connection.getAdmin(); - checkArgument(admin.tableExists(TableName.valueOf(tableId)), - "Table %s does not exist", tableId); - } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("configuration", - serializableConfiguration.get().toString())); - builder.add(DisplayData.item("tableId", tableId)); - } - - public String getTableId() { - return tableId; - } - - public Configuration getConfiguration() { - return serializableConfiguration.get(); - } - - private final String tableId; - private final SerializableConfiguration serializableConfiguration; - - private class HBaseWriterFn extends DoFn<Mutation, Void> { - - public 
HBaseWriterFn(String tableId, - SerializableConfiguration serializableConfiguration) { - this.tableId = checkNotNull(tableId, "tableId"); - this.serializableConfiguration = checkNotNull(serializableConfiguration, - "serializableConfiguration"); - } - - @Setup - public void setup() throws Exception { - connection = ConnectionFactory.createConnection(serializableConfiguration.get()); - } - - @StartBundle - public void startBundle(StartBundleContext c) throws IOException { - BufferedMutatorParams params = - new BufferedMutatorParams(TableName.valueOf(tableId)); - mutator = connection.getBufferedMutator(params); - recordsWritten = 0; - } + public Write withTableId(String tableId) { + checkNotNull(tableId, "tableId"); + return new Write(serializableConfiguration, tableId); + } - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - mutator.mutate(c.element()); - ++recordsWritten; - } + private Write(SerializableConfiguration serializableConfiguration, String tableId) { + this.serializableConfiguration = serializableConfiguration; + this.tableId = tableId; + } - @FinishBundle - public void finishBundle() throws Exception { - mutator.flush(); - LOG.debug("Wrote {} records", recordsWritten); - } + @Override + public PDone expand(PCollection<Mutation> input) { + input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); + return PDone.in(input.getPipeline()); + } - @Teardown - public void tearDown() throws Exception { - if (mutator != null) { - mutator.close(); - mutator = null; - } - if (connection != null) { - connection.close(); - connection = null; - } - } + @Override + public void validate(PipelineOptions options) { + checkArgument(serializableConfiguration != null, "Configuration not specified"); + checkArgument(!tableId.isEmpty(), "Table ID not specified"); + try (Connection connection = + ConnectionFactory.createConnection(serializableConfiguration.get())) { + Admin admin = connection.getAdmin(); + checkArgument( + 
admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(Write.this); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("configuration", serializableConfiguration.get().toString())); + builder.add(DisplayData.item("tableId", tableId)); + } - private final String tableId; - private final SerializableConfiguration serializableConfiguration; + public String getTableId() { + return tableId; + } - private Connection connection; - private BufferedMutator mutator; + public Configuration getConfiguration() { + return serializableConfiguration.get(); + } - private long recordsWritten; - } + private final String tableId; + private final SerializableConfiguration serializableConfiguration; + + private class HBaseWriterFn extends DoFn<Mutation, Void> { + + public HBaseWriterFn(String tableId, SerializableConfiguration serializableConfiguration) { + this.tableId = checkNotNull(tableId, "tableId"); + this.serializableConfiguration = + checkNotNull(serializableConfiguration, "serializableConfiguration"); + } + + @Setup + public void setup() throws Exception { + connection = ConnectionFactory.createConnection(serializableConfiguration.get()); + } + + @StartBundle + public void startBundle(StartBundleContext c) throws IOException { + BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableId)); + mutator = connection.getBufferedMutator(params); + recordsWritten = 0; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + mutator.mutate(c.element()); + ++recordsWritten; + } + + @FinishBundle + public void finishBundle() throws Exception { + mutator.flush(); + LOG.debug("Wrote {} records", 
recordsWritten); + } + + @Teardown + public void tearDown() throws Exception { + if (mutator != null) { + mutator.close(); + mutator = null; + } + if (connection != null) { + connection.close(); + connection = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(Write.this); + } + + private final String tableId; + private final SerializableConfiguration serializableConfiguration; + + private Connection connection; + private BufferedMutator mutator; + + private long recordsWritten; } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java index ee83114..e7a36d5 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java @@ -71,30 +71,29 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable { } /** - * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for - * {@link Mutation mutations}. + * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for {@link Mutation + * mutations}. */ static CoderProvider getCoderProvider() { return HBASE_MUTATION_CODER_PROVIDER; } private static final CoderProvider HBASE_MUTATION_CODER_PROVIDER = - new HBaseMutationCoderProvider(); + new HBaseMutationCoderProvider(); - /** - * A {@link CoderProvider} for {@link Mutation mutations}. - */ + /** A {@link CoderProvider} for {@link Mutation mutations}. 
*/ private static class HBaseMutationCoderProvider extends CoderProvider { @Override - public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor, - List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException { + public <T> Coder<T> coderFor( + TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders) + throws CannotProvideCoderException { if (!typeDescriptor.isSubtypeOf(HBASE_MUTATION_TYPE_DESCRIPTOR)) { throw new CannotProvideCoderException( - String.format( - "Cannot provide %s because %s is not a subclass of %s", - HBaseMutationCoder.class.getSimpleName(), - typeDescriptor, - Mutation.class.getName())); + String.format( + "Cannot provide %s because %s is not a subclass of %s", + HBaseMutationCoder.class.getSimpleName(), + typeDescriptor, + Mutation.class.getName())); } try { @@ -106,5 +105,5 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable { } private static final TypeDescriptor<Mutation> HBASE_MUTATION_TYPE_DESCRIPTOR = - new TypeDescriptor<Mutation>() {}; + new TypeDescriptor<Mutation>() {}; } http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java index 1d06635..bce1567 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java @@ -41,14 +41,12 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable { } @Override - public void encode(Result value, OutputStream outputStream) - throws IOException { + public void encode(Result value, OutputStream outputStream) throws IOException { 
ProtobufUtil.toResult(value).writeDelimitedTo(outputStream); } @Override - public Result decode(InputStream inputStream) - throws IOException { + public Result decode(InputStream inputStream) throws IOException { return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java index f3bc7ac..6ed3c51 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java @@ -25,31 +25,28 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -/** - * This is just a wrapper class to serialize HBase {@link Scan} using Protobuf. - */ +/** This is just a wrapper class to serialize HBase {@link Scan} using Protobuf. 
*/ class SerializableScan implements Serializable { - private transient Scan scan; + private transient Scan scan; - public SerializableScan() { - } + public SerializableScan() {} - public SerializableScan(Scan scan) { - if (scan == null) { - throw new NullPointerException("Scan must not be null."); - } - this.scan = scan; + public SerializableScan(Scan scan) { + if (scan == null) { + throw new NullPointerException("Scan must not be null."); } + this.scan = scan; + } - private void writeObject(ObjectOutputStream out) throws IOException { - ProtobufUtil.toScan(scan).writeDelimitedTo(out); - } + private void writeObject(ObjectOutputStream out) throws IOException { + ProtobufUtil.toScan(scan).writeDelimitedTo(out); + } - private void readObject(ObjectInputStream in) throws IOException { - scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in)); - } + private void readObject(ObjectInputStream in) throws IOException { + scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in)); + } - public Scan get() { - return scan; - } + public Scan get() { + return scan; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/e5bdedd2/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java index 5b2e138..25369fc 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java @@ -26,9 +26,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link HBaseCoderProviderRegistrar}. - */ +/** Tests for {@link HBaseCoderProviderRegistrar}. 
*/ @RunWith(JUnit4.class) public class HBaseCoderProviderRegistrarTest { @Test