Hey Aaron,

thanks for preparing the example. I've checked it out and tried it with a 
similar setup (12 task managers with 1 slot each, running the job with 
parallelism of 12).

I couldn't reproduce the problem. What have you configured in the "slaves" 
file? I think Flink does not allow you to run multiple task managers on a 
single machine with the startup scripts. Can you provide some information on 
how you start the system?
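
For reference, this is what a minimal conf/slaves for a three-machine standalone 
setup would look like (hostnames are placeholders):

```shell
# conf/slaves -- one hostname per line; bin/start-cluster.sh SSHes into
# each entry and starts exactly one TaskManager there.
host-1
host-2
host-3
```

To get more slots per machine you would raise taskmanager.numberOfTaskSlots in 
conf/flink-conf.yaml rather than listing a host twice.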

Thanks for helping out with this.

– Ufuk

On 24 Jun 2015, at 05:09, Aaron Jackson <ajack...@pobox.com> wrote:

> Yes, the task manager continues running.  I have put together a test app to 
> demonstrate the problem and in doing so noticed some oddities.  The problem 
> manifests itself on a simple join (I originally believed it was the distinct, 
> I was wrong).
>       • When the source is generated via fromCollection(), it works fine.
>       • When the source is generated via readCsvFile() where the file URL is 
> of the form file:/// it fails.
>       • When the source is generated via JDBCInputFormat it fails.
>               • My real app uses the JDBCInputFormat but I converted it to 
> work off data that might be in a file.
> In all cases, I stopped the cluster and restarted the cluster.  Then ran the 
> application twice, once to make sure the error occurred on a clean cluster 
> and then once again on a cluster that had previously had a failed job.  You 
> can find the application at 
> https://github.com/ajaxx/flink-examples/tree/master/FlinkErrorWithFile.
> 
> Please let me know if there is anything I can do to help.
> 
> Aaron
> 
> On Tue, Jun 23, 2015 at 1:50 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Aaron,
> 
> thanks for reporting the issue.
> 
> You are right that the Exception is thrown during a shuffle. The receiver 
> initiates a TCP connection to receive all the data for the join. A failing 
> connect usually means that the respective TaskManager is not running.
> 
> Can you check whether all expected task managers are running? You can use the 
> web interface of the job manager for this (http://jm-address:8081).
> 
> Some further questions:
> - Are you running in standalone/cluster mode (e.g. slaves file configured 
> and the bin/start-cluster.sh script used)?
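> 
> Just to illustrate what I mean by standalone mode (paths are relative to the 
> Flink distribution; whether this matches your setup is an assumption):
> 
> ```shell
> # conf/slaves holds one hostname per line; start-cluster.sh starts the
> # JobManager locally and one TaskManager on each listed host via SSH.
> bin/start-cluster.sh
> 
> # Additional TaskManagers would have to be started by hand per machine:
> bin/taskmanager.sh start
> ```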
> - Is this reproducible?
> 
> – Ufuk
> 
> On 23 Jun 2015, at 07:11, Aaron Jackson <ajack...@pobox.com> wrote:
> 
> > Hello,
> >
> > I have a process that works fine with flink 0.8.1 but I decided to test it 
> > against 0.9.0-milestone-1.  I have 12 task managers across 3 machines - so 
> > it's a small setup.
> >
> > The process fails with the following message.  It appears that it's 
> > attempting to do a shuffle in response to my join request.  I checked all 3 
> > machines and there are no issues with the hostname on any of them.  But the 
> > host being reported as "localhost" makes me wonder whether I've missed 
> > something obvious.
> >
> > I noticed this exception in one of the Travis CI builds, so I'm hoping it's 
> > something obvious I've missed.
> >
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:137))(11/12) switched to 
> > RUNNING
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:176))(9/12) switched to 
> > RUNNING
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:176))(12/12) switched to 
> > RUNNING
> > 06/23/2015 05:03:00     Join (Join at run(Job.java:137))(12/12) switched to 
> > FAILED
> > java.lang.Exception: The data preparation for task 'Join (Join at 
> > run(Job.java:137))' , caused an error: Connecting the channel failed: 
> > Connection refused: localhost/127.0.0.1:46229
> >         at 
> > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:472)
> >         at 
> > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >         at 
> > org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> >         at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.io.IOException: Connecting the channel failed: Connection 
> > refused: localhost/127.0.0.1:46229
> >         at 
> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:193)
> >         at 
> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:129)
> >         at 
> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:65)
> >         at 
> > org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:57)
> >         at 
> > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:106)
> >         at 
> > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:305)
> >         at 
> > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:328)
> >         at 
> > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> >         at 
> > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >         at 
> > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >         at 
> > org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:696)
> >         at 
> > org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:440)
> >         at 
> > org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator.open(NonReusingBuildSecondHashMatchIterator.java:85)
> >         at 
> > org.apache.flink.runtime.operators.MatchDriver.prepare(MatchDriver.java:160)
> >         at 
> > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> >         ... 3 more
> >
> > Thanks
> 
> 
