[ https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuefu Zhang updated SPARK-3693:
-------------------------------
    Description: 
While experimenting with RDD caching, we found that caching a Hadoop RDD causes data 
correctness issues. The following code snippet demonstrates the problem:

{code}
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public final class Test {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("Test");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        ...
        // Read the input through the old Hadoop API and cache the resulting RDD.
        JavaPairRDD<BytesWritable, BytesWritable> input =
                ctx.hadoopRDD(jobConf, CombineHiveInputClass.class,
                        WritableComparable.class, Writable.class);
        input = input.cache();
        // Print every row; with the cache enabled, all values come back identical.
        input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
            @Override
            public void call(Tuple2<BytesWritable, BytesWritable> row) throws Exception {
                if (row._1() != null) {
                    System.out.println("Key: " + row._1());
                }
                if (row._2() != null) {
                    System.out.println("Value: " + row._2());
                }
            }
        });
        ctx.stop();
    }
}
{code}
In this case, row._2() always gives the same value for every row. If we disable caching 
by removing input.cache(), the program prints the expected rows.

Further analysis shows that MemoryStore (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) 
stores references to the (key, value) pairs returned by HadoopRDD.getNext() (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220). 
However, getNext() always returns the same (key, value) object references; each call only 
overwrites the contents of those objects in place. When there are no more records, 
CombineFileRecordReader fills the (key, value) objects with empty values. Since all pairs 
in MemoryStore.vector refer to the same key and value objects, all values become NULL.
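The effect of this reuse can be reproduced outside of Spark. The following standalone 
snippet (a hypothetical ReuseDemo class written purely for illustration, not code taken 
from Spark or Hadoop) stores references to a single reused BytesWritable, which is 
essentially what MemoryStore ends up holding:

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;

// Illustration only: caching references to a reused Writable yields N copies of
// whatever the object last contained, mirroring the HadoopRDD/MemoryStore
// interaction described above.
public final class ReuseDemo {
    public static void main(String[] args) {
        BytesWritable reused = new BytesWritable();       // one object, reused for every "record"
        List<BytesWritable> cache = new ArrayList<BytesWritable>();
        for (byte b = 1; b <= 3; b++) {
            reused.set(new byte[] { b }, 0, 1);           // overwrite contents in place, like getNext()
            cache.add(reused);                            // store the reference, like MemoryStore
        }
        reused.set(new byte[0], 0, 0);                    // end of input: the object is cleared
        for (BytesWritable w : cache) {
            System.out.println(w);                        // all three entries print the same empty value
        }
    }
}
{code}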

MemoryStore should probably store a copy of each <key, value> pair rather than keep a 
reference to it.
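
Until such a change is made, one possible workaround at the application level is to copy 
each Writable into a fresh object before caching, so that the cached tuples no longer 
alias the reader's reused instances. The sketch below continues the snippet above; the 
copied RDD name is only illustrative, and it additionally requires java.util.Arrays and 
org.apache.spark.api.java.function.PairFunction:

{code}
// Sketch of a workaround, not a fix in Spark itself: materialize fresh
// BytesWritable copies before caching so MemoryStore stores distinct objects.
JavaPairRDD<BytesWritable, BytesWritable> copied = input.mapToPair(
        new PairFunction<Tuple2<BytesWritable, BytesWritable>, BytesWritable, BytesWritable>() {
            @Override
            public Tuple2<BytesWritable, BytesWritable> call(
                    Tuple2<BytesWritable, BytesWritable> row) {
                return new Tuple2<BytesWritable, BytesWritable>(
                        new BytesWritable(Arrays.copyOf(row._1().getBytes(), row._1().getLength())),
                        new BytesWritable(Arrays.copyOf(row._2().getBytes(), row._2().getLength())));
            }
        });
copied.cache();
{code}

This trades extra allocation and copying for correctness; a fix inside MemoryStore or 
HadoopRDD would avoid pushing that burden onto every caller.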


> Cached Hadoop RDD always returns rows with the same value
> ---------------------------------------------------------
>
>                 Key: SPARK-3693
>                 URL: https://issues.apache.org/jira/browse/SPARK-3693
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Xuefu Zhang


