Hi,

I'm trying to split one large multi-field text file into many single-field
text files.

My code looks like this (somewhat simplified):

    final Broadcast<ColSchema> bcSchema = sc.broadcast(schema);
    final String outputPathName = env.outputPathName;

    sc.textFile(env.inputFileName)
    .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>()
    {
        @Override
        public Iterator<String> call(Integer partitionIndex, Iterator<String> itor) throws Exception
        {
            ColSchema schema = bcSchema.value();
            FileSystem outputFs = FileSystem.get(new URI(outputPathName), new Configuration());
            PrintStream[] outss = new PrintStream[schema.getColCount()];
            try
            {
                while (itor.hasNext())
                {
                    String cols[] = itor.next().split("\t", -1);

                    for (int i = 0; i < schema.getColCount(); i++)
                    {
                        String value = cols[i];
                        if (value.isEmpty())
                            continue;

                        if (outss[i] == null)
                            outss[i] = new PrintStream(outputFs.create(new Path(
                                outputPathName + "/" + schema.getColName(i)
                                + ".tsv/part-" + String.format("%05d", partitionIndex))));

                        outss[i].println(value);
                    }
                }
            }
            finally
            {
                for (PrintStream outs : outss)
                    if (outs != null)
                        outs.close();
            }
            return new ArrayList<String>().iterator();  // dummy.
        }
    }, true)
    .count();  // just to invoke mapPartitionsWithIndex().

    bcSchema.unpersist();

Basically, it uses mapPartitionsWithIndex() to write multiple single-field files at once, partition by partition.
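
Stripped of the Spark and HDFS plumbing, the per-partition logic is essentially the standalone sketch below (plain local Java; the colNames array and outputDir parameters are hypothetical stand-ins for ColSchema and outputPathName): one output stream per column, opened lazily the first time a non-empty value shows up.

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.util.Arrays;
    import java.util.Iterator;

    public class ColumnSplitSketch
    {
        // Writes one file per column, e.g. "<outputDir>/name.tsv-part-00000",
        // mirroring the call() body above but against the local file system.
        static void splitPartition(Iterator<String> lines, String[] colNames,
                                   String outputDir, int partitionIndex) throws IOException
        {
            PrintStream[] outss = new PrintStream[colNames.length];
            try
            {
                while (lines.hasNext())
                {
                    String[] cols = lines.next().split("\t", -1);
                    for (int i = 0; i < colNames.length; i++)
                    {
                        String value = cols[i];
                        if (value.isEmpty())
                            continue;  // empty cells are skipped entirely

                        if (outss[i] == null)  // open the column's stream lazily
                            outss[i] = new PrintStream(new FileOutputStream(
                                outputDir + "/" + colNames[i] + ".tsv-part-"
                                + String.format("%05d", partitionIndex)));

                        outss[i].println(value);
                    }
                }
            }
            finally
            {
                for (PrintStream outs : outss)
                    if (outs != null)
                        outs.close();
            }
        }

        public static void main(String[] args) throws IOException
        {
            Iterator<String> lines = Arrays.asList("a\t1", "b\t", "\t3").iterator();
            splitPartition(lines, new String[] { "name", "count" }, ".", 0);
        }
    }

The real job differs only in that the output streams go through FileSystem.create() to HDFS and the input iterator comes from Spark.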

Eventually the job succeeds.

But occasionally while it is running, the following exception is thrown and a task fails (Spark automatically retries the task, which then succeeds).

The exception is thrown at the itor.hasNext() call:

14/07/28 19:10:47 WARN TaskSetManager: Lost TID 154 (task 10.0:142)
14/07/28 19:10:47 WARN TaskSetManager: Loss was due to org.apache.hadoop.hdfs.BlockMissingException
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1127695701-10.254.0.11-1405426572227:blk_1073930972_190153 file=/user/test/Data/big.tsv/part-00142
       at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:880)
       at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:560)
       at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790)
       at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
       at java.io.DataInputStream.read(DataInputStream.java:100)
       at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
       at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
       at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
       at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
       at com.test.tester.Splitter$17.call(Splitter.java:504)
       at com.test.tester.Splitter$17.call(Splitter.java:495)
       at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:744)
       at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:81)
       at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:81)
       at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:569)
       at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:569)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
       at org.apache.spark.scheduler.Task.run(Task.scala:51)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:744)

Since the retried task eventually succeeds, the input file cannot be corrupt.

What can cause this exception?

Thanks.