HBASE-18177 FanOutOneBlockAsyncDFSOutputHelper fails to compile against Hadoop 3
Because ClientProtocol::create has API changes between Hadoop 2/3 Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb5299ae Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb5299ae Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb5299ae Branch: refs/heads/HBASE-18147 Commit: cb5299ae9b3360a6cca93958a74417d663135a60 Parents: 22dce22 Author: Mike Drob <md...@apache.org> Authored: Mon Jun 26 11:29:34 2017 -0500 Committer: zhangduo <zhang...@apache.org> Committed: Wed Jul 12 13:40:05 2017 +0800 ---------------------------------------------------------------------- .../FanOutOneBlockAsyncDFSOutputHelper.java | 63 +++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/cb5299ae/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index d14d4d8..1dbe131 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -67,6 +67,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.Encryptor; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemLinkResolver; import org.apache.hadoop.fs.Path; @@ -195,6 +196,32 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final ChecksumCreater CHECKSUM_CREATER; + // helper class for creating files. + private interface FileCreator { + default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked, + String clientName, EnumSetWritable<CreateFlag> flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions) throws Exception { + try { + return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent, + replication, blockSize, supportedVersions); + } catch (InvocationTargetException e) { + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } + }; + + Object createObject(ClientProtocol instance, String src, FsPermission masked, + String clientName, EnumSetWritable<CreateFlag> flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions) throws Exception; + } + + private static final FileCreator FILE_CREATOR; + private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException { Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); isClientRunningMethod.setAccessible(true); @@ -460,6 +487,39 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf")); } + private static FileCreator createFileCreator3() throws NoSuchMethodException { + Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, + String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, + String.class); + + return (instance, src, masked, clientName, flag, createParent, replication, blockSize, + supportedVersions) -> { + return (HdfsFileStatus) createMethod.invoke(instance, + src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions, + null); + }; + } + + private static FileCreator createFileCreator2() throws NoSuchMethodException { + Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, + String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class); + + return (instance, src, masked, clientName, flag, createParent, replication, blockSize, + supportedVersions) -> { + return (HdfsFileStatus) createMethod.invoke(instance, + src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions); + }; + } + + private static FileCreator createFileCreator() throws NoSuchMethodException { + try { + return createFileCreator3(); + } catch (NoSuchMethodException e) { + LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x"); + } + return createFileCreator2(); + } + // cancel the processing if DFSClient is already closed. static final class CancelOnClose implements CancelableProgressable { @@ -484,6 +544,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); PB_HELPER = createPBHelper(); CHECKSUM_CREATER = createChecksumCreater(); + FILE_CREATOR = createFileCreator(); } catch (Exception e) { String msg = "Couldn't properly initialize access to HDFS internals. Please " + "update your WAL Provider to not make use of the 'asyncfs' provider. See " @@ -679,7 +740,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { ClientProtocol namenode = client.getNamenode(); HdfsFileStatus stat; try { - stat = namenode.create(src, + stat = FILE_CREATOR.create(namenode, src, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), createParent, replication, blockSize, CryptoProtocolVersion.supported());