Hi Pramod,
Below are the versions I am using for Apex core & Malhar.
<apex.version>3.4.0</apex.version>
<malhar.version>3.6.0</malhar.version>
Code of the file output operator:
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class WriteToHdfs extends AbstractFileOutputOperator<String> {
    String HDFS_FILENAME = "hdfs_data";

    public WriteToHdfs() {}

    public WriteToHdfs(String fileName) {
        HDFS_FILENAME = fileName;
    }

    @Override
    protected String getFileName(String s) {
        return HDFS_FILENAME;
    }

    @Override
    protected byte[] getBytesForTuple(String s) {
        return (s + "\n").getBytes();
    }
}
Regards,
Raja.
From: Pramod Immaneni <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 23, 2017 at 10:08 PM
To: "[email protected]" <[email protected]>
Subject: [EXTERNAL] Re: File output operator - HDFS Write failing
Hi Raja,
What version of Malhar are you using? Are you extending the
AbstractFileOutputOperator, or are you using a stock implementation from Malhar?
Thanks
On Thu, Mar 23, 2017 at 4:57 AM, Raja.Aravapalli <[email protected]> wrote:
Hi Team,
I have a file output operator that writes data into HDFS files. It worked fine
for a day, and then started failing with the exception below:
java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): java.lang.ArrayIndexOutOfBoundsException
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:428)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
    at com.datatorrent.stram.engine.Node.setup(Node.java:187)
    at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
    at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): java.lang.ArrayIndexOutOfBoundsException
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1552)
    at org.apache.hadoop.ipc.Client.call(Client.java:1496)
    at org.apache.hadoop.ipc.Client.call(Client.java:1396)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:270)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1236)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1223)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1211)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1536)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:330)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:782)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.recoverFile(AbstractFileOutputOperator.java:455)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:373)
    ... 5 more
2017-03-23 06:53:47,942 INFO engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(799)) - Undeploy request: [6]
2017-03-23 06:53:47,944 INFO engine.StreamingContainer (StreamingContainer.java:undeploy(561)) - Undeploy complete.
Can someone please help me fix this? It is breaking our production job!
Thanks a lot.
- Raja.