[ 
https://issues.apache.org/jira/browse/BEAM-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-2308.
----------------------------------------
       Resolution: Won't Fix
         Assignee: Jean-Baptiste Onofré  (was: Amit Sela)
    Fix Version/s: Not applicable

{{HDFSFileSource}} has been deleted from Beam 2.0.0 and replaced by filesystem 
that you can  directly use with {{TextIO}}.

> run beam on spark runner successfully with small data,but fail with a big data
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-2308
>                 URL: https://issues.apache.org/jira/browse/BEAM-2308
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark, sdk-java-core
>         Environment: spark jar 1.6.2
>            Reporter: liyuntian
>            Assignee: Jean-Baptiste Onofré
>             Fix For: Not applicable
>
>
> run beam on spark runner successfully with small data,but fail with a big 
> data about 1G.my spark configuration:--num-executors 3  --executor-memory 4G  
> --executor-cores 5.
> this is my code:
> Read.Bounded<KV<LongWritable, Text>> source = 
> Read.from(HDFSFileSource.from(inputPath, TextInputFormat.class, 
> LongWritable.class, Text.class).withConfiguration(config));
>             PCollection<KV<LongWritable, Text>> recordsFromHdfs = 
> pipeline.apply(source);
>             PCollection<List<String>> recordsList = 
> recordsFromHdfs.apply(ParDo.of(new InputHdfsFileFn(delimit, 
> firstTableColumnsSize)));
>             //convert to flow
>             String nextOutputTable;
>             Map<String, ComponentPara> map = beamTable.row(firstTableName);
>             ComponentPara component = (ComponentPara) 
> map.values().toArray()[0];
>             PCollection<List<String>> nextPCollection = 
> ComponentConvert.convert(component,recordsList);
>             //write result to hdfs
>             PCollection<String> recordsToHdfs = 
> nextPCollection.apply(ParDo.of(new OutputHdfsFileFn(delimit)));
>             HiveTable.deleteBeamFileOnHdfs(outputPath);
>             logger.info("输出文件位置:"+outputPath);
>             
> recordsToHdfs.apply(Write.to(HDFSFileSink.<String>toText(outputPath).withConfiguration(config)));
>             pipeline.run().waitUntilFinish();
> this is error:
> 17/05/16 21:31:57 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 
> (TID 13, etl-dev-02): java.util.NoSuchElementException
>       at 
> org.apache.beam.sdk.io.hdfs.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:510)
>       at 
> org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
>       at 
> org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
>       at 
> scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
>       at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>       at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>       at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>       at 
> scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
>       at 
> org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
>       at 
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:89)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> 17/05/16 21:31:57 INFO scheduler.TaskSetManager: Starting task 3.1 in stage 
> 1.0 (TID 20, etl-dev-02, partition 3,PROCESS_LOCAL, 42372 bytes)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to