Hi Pramod,

Below are the versions I am using for Apex core & Malhar.

<apex.version>3.4.0</apex.version>
<malhar.version>3.6.0</malhar.version>


Code of the file output operator:

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class WriteToHdfs extends AbstractFileOutputOperator<String>{

    // Name of the output file; every tuple is written to this one file.
    String HDFS_FILENAME = "hdfs_data";

    public WriteToHdfs() {}

    public WriteToHdfs(String fileName) {
        HDFS_FILENAME = fileName;
    }

    // The file name does not depend on the tuple.
    @Override
    protected String getFileName(String s) {
        return HDFS_FILENAME;
    }

    // Append a newline so that each tuple ends up on its own line.
    @Override
    protected byte[] getBytesForTuple(String s) {
        return (s + "\n").getBytes();
    }

}


Regards,
Raja.

From: Pramod Immaneni
Date: Thursday, March 23, 2017 at 10:08 PM
Subject: [EXTERNAL] Re: File output operator - HDFS Write failing

Hi Raja,

What version of Malhar are you using? Are you extending the
AbstractFileOutputOperator or are you using a stock implementation from Malhar?

Thanks

On Thu, Mar 23, 2017 at 4:57 AM, Raja.Aravapalli wrote:

Hi Team,

I have a file output operator that writes data into HDFS files. It worked
fine for a day, then started to fail with the exception below:

java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): java.lang.ArrayIndexOutOfBoundsException
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:428)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
    at com.datatorrent.stram.engine.Node.setup(Node.java:187)
    at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
    at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
Caused by: org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException): java.lang.ArrayIndexOutOfBoundsException
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1552)
    at org.apache.hadoop.ipc.Client.call(Client.java:1496)
    at org.apache.hadoop.ipc.Client.call(Client.java:1396)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:270)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1236)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1223)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1211)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1536)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:330)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:782)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.recoverFile(AbstractFileOutputOperator.java:455)
    at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:373)
    ... 5 more
2017-03-23 06:53:47,942 INFO engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(799)) - Undeploy request: [6]
2017-03-23 06:53:47,944 INFO engine.StreamingContainer (StreamingContainer.java:undeploy(561)) - Undeploy complete.



Can someone please help me fix this? It is breaking our production job!

Thanks a lot.



- Raja.
