The javadoc on the distinct function <http://twitter.github.io/scalding/index.html#com.twitter.scalding.typed.TypedPipe@distinct(implicitord:Ordering[_>:T]):com.twitter.scalding.typed.TypedPipe[T]> seems to comment on your use case:

"Returns the set of distinct elements in the TypedPipe. This is the same as: .map((_, ())).group.sum.keys

If you want a distinct while joining, consider: instead of

  a.join(b.distinct.asKeys)

manually do the distinct:

  a.join(b.asKeys.sum)

The latter creates 1 map/reduce phase rather than 2."
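In other words (a rough, untested sketch; I'm assuming, as in your snippet, that data1 is already a keyed/grouped pipe and data2 is a TypedPipe of the key type):

  // two map/reduce phases: distinct already does a group/sum under the hood
  // (.map((_, ())).group.sum.keys), and the join then shuffles those keys again
  val joinedTwoPhases = data1.join(data2.distinct.asKeys)

  // one map/reduce phase: asKeys pairs each element with (), and summing the
  // Unit values collapses duplicate keys in the same phase that does the join
  val joinedOnePhase = data1.join(data2.asKeys.sum)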
Could you try that? (val joinedSets = data1.join(data2.asKeys.sum)). I'm not sure whether the HDFS permissions error you're running into happens while temporary job output is being written out between the two MR jobs, or somewhere else.

On Wed, Nov 2, 2016 at 11:54 PM, Nikhil J Joshi <[email protected]> wrote:

> Hi,
>
> I am trying to perform a join with distinct keys in Scalding as
>
>   val joinedSets = data1.join(data2.distinct.asKeys)
>
> The above operation raises an HDFS permissions error (stacktrace below), while the same code without the distinct clause works fine. I am imagining that the clause is forcing some disk I/O at the wrong path. Can anyone suggest a remedy?
>
> Thanks,
> Nikhil
>
> stacktrace:
>
> Job setup failed : org.apache.hadoop.security.AccessControlException: Permission denied: user=username, access=WRITE, inode="/":hdfs:hdfs:drwxr-xr-x
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6630)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6612)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6564)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4368)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4338)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4311)
>   at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)
>   at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2063)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2059)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2057)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>   at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>   at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>   at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2744)
>   at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2713)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
>   at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
>   at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
>   at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
>   at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
>   at org.apache.hadoop.mapred.OutputCommitter.setupJob(OutputCommitter.java:233)
>   at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobSetup(CommitterEventHandler.java:254)
>   at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:234)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=username, access=WRITE, inode="/":hdfs:hdfs:drwxr-xr-x
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6630)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6612)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6564)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4368)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4338)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4311)
>   at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)
>   at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2063)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2059)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2057)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1469)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1400)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>   at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2742)
>   ... 15 more

-- 
- Piyush
