The javadoc on the distinct function
<http://twitter.github.io/scalding/index.html#com.twitter.scalding.typed.TypedPipe@distinct(implicitord:Ordering[_>:T]):com.twitter.scalding.typed.TypedPipe[T]>
seems to comment on your use case:

"Returns the set of distinct elements in the TypedPipe. This is the same as:
.map((_, ())).group.sum.keys
If you want a distinct while joining, consider, instead of
a.join(b.distinct.asKeys), manually doing the distinct:
a.join(b.asKeys.sum)
The latter creates 1 map/reduce phase rather than 2."
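
To make the javadoc's point concrete, here's a rough sketch. The pipe names
mirror your snippet, but the element types and the .group call are my
assumptions about your pipes' shapes:

    import com.twitter.scalding.typed.TypedPipe

    // Hypothetical inputs, purely for illustration.
    val data1: TypedPipe[(String, Int)] = ???  // the left side of the join
    val data2: TypedPipe[String] = ???         // join keys, possibly duplicated

    // distinct is sugar for .map((_, ())).group.sum.keys, so this plan pays
    // an extra map/reduce phase just to materialize the distinct keys:
    val twoPhases = data1.group.join(data2.distinct.asKeys)

    // asKeys maps each element x to (x, ()); summing the Unit values then
    // deduplicates each key inside the join's own reduce step:
    val onePhase = data1.group.join(data2.asKeys.sum)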

Could you try that, i.e. val joinedSets = data1.join(data2.asKeys.sum)? I'm
not sure whether the HDFS permissions error you're running into occurs while
temporary job output is being written out between the two MR jobs, or
somewhere else.
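
Separately, the trace shows job setup failing on a mkdir under inode="/",
i.e. the filesystem root, which suggests some temp or output path is
resolving to "/". If the intermediate write does turn out to be the culprit,
one possible workaround, as a sketch only (the job class and path are
placeholders; cascading.tmp.dir is Cascading's temp-directory property), is
to point the temp dir at a path your user owns:

    import com.twitter.scalding._

    class MyJob(args: Args) extends Job(args) {
      // Sketch: route intermediate data between the two map/reduce phases
      // to a directory the submitting user can write to.
      override def config: Map[AnyRef, AnyRef] =
        super.config + ("cascading.tmp.dir" -> "/user/username/tmp")
    }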

On Wed, Nov 2, 2016 at 11:54 PM, Nikhil J Joshi <[email protected]> wrote:

> Hi,
> I am trying to perform a join with distinct keys in Scalding, as in
>
> val joinedSets = data1.join(data2.distinct.asKeys)
>
> The above operation raises an HDFS permissions error (stack trace below),
> while the same code without the distinct call works fine. I imagine the
> call is forcing some disk I/O at the wrong path. Can anyone suggest a
> remedy?
>
> Thanks,
> Nikhil
>
> Stack trace:
>
> Job setup failed: org.apache.hadoop.security.AccessControlException:
> Permission denied: user=username, access=WRITE, inode="/":hdfs:hdfs:drwxr-xr-x
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6630)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6612)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6564)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4368)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4338)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4311)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2063)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2059)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2057)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2744)
> at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2713)
> at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
> at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
> at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
> at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
> at org.apache.hadoop.mapred.OutputCommitter.setupJob(OutputCommitter.java:233)
> at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobSetup(CommitterEventHandler.java:254)
> at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:234)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
> Permission denied: user=username, access=WRITE, inode="/":hdfs:hdfs:drwxr-xr-x
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6630)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6612)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6564)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4368)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4338)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4311)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2063)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2059)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2057)
> at org.apache.hadoop.ipc.Client.call(Client.java:1469)
> at org.apache.hadoop.ipc.Client.call(Client.java:1400)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2742)
> ... 15 more
>



-- 
- Piyush
