Robert Reid created SPARK-24335:
-----------------------------------

             Summary: Dataset.map schema not applied in some cases
                 Key: SPARK-24335
                 URL: https://issues.apache.org/jira/browse/SPARK-24335
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.3.0, 2.1.0
            Reporter: Robert Reid


In the following code an {color:#808080}UnsupportedOperationException{color} is thrown in the filter() call just after the Dataset.map() call unless withWatermark() is added between them. The error reports `{color:#808080}fieldIndex on a Row without schema is undefined{color}`. I expect the map() method to have applied the schema and for it to be accessible in filter(). Without the extra withWatermark() call my debugger reports that the `row` objects in the filter lambda are `GenericRow`; with the watermark call it reports that they are `GenericRowWithSchema`.
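
For context on why the two Row types behave differently: `Row.getAs(String)` resolves the field name through `fieldIndex()`, which a plain `GenericRow` does not implement, while positional access works on either type. A minimal illustration (index 2 corresponds to "f1" in the schema defined in the code below):

{code:java}
// On a GenericRowWithSchema both calls succeed. On a schema-less GenericRow
// the by-name lookup throws, because getAs(String) delegates to fieldIndex().
Object byName  = row.getAs("f1"); // UnsupportedOperationException without a schema
Object byIndex = row.get(2);      // works either way; index 2 == "f1"
{code}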

I should add that I'm new to working with Structured Streaming, so if I'm overlooking some implied dependency, please fill me in.

I'm encountering this in new code for a new production job; the code presented here is distilled down to demonstrate the problem. While the problem can be worked around simply by adding withWatermark(), I'm concerned that doing so leaves the code in a fragile state. With this simplified code, if the error occurs again it will be easy to identify which change led to it. But in the code I'm writing, where this functionality is delegated to other classes, it is (and has been) very challenging to identify the cause.

 
{code:java}
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder().master("local").getOrCreate();

    sparkSession.conf().set(
            "spark.sql.streaming.checkpointLocation",
            "hdfs://localhost:9000/search_relevance/checkpoint" // for Spark 2.3
            // "spark.sql.streaming.checkpointLocation", "tmp/checkpoint" // for Spark 2.1
    );

    // Schema for the rows produced by map() below.
    StructType inSchema = DataTypes.createStructType(
            new StructField[] {
                    DataTypes.createStructField("id", DataTypes.StringType, false),
                    DataTypes.createStructField("ts", DataTypes.TimestampType, false),
                    DataTypes.createStructField("f1", DataTypes.LongType, true)
            }
    );

    Dataset<Row> rawSet = sparkSession.sqlContext().readStream()
            .format("rate")
            .option("rowsPerSecond", 1)
            .load()
            .map((MapFunction<Row, Row>) raw -> {
                        Object[] fields = new Object[3];
                        fields[0] = "id1";
                        fields[1] = raw.getAs("timestamp");
                        fields[2] = raw.getAs("value");
                        return RowFactory.create(fields);
                    },
                    RowEncoder.apply(inSchema)
            )
            // If withWatermark() is included above the filter() line then this works.
            // Without it we get:
            //     Caused by: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
            // at the row.getAs() call.
            // .withWatermark("ts", "10 seconds")  // <-- Required for row.getAs("f1") to work ???
            .filter((FilterFunction<Row>) row -> !row.getAs("f1").equals(0L))
            .withWatermark("ts", "10 seconds");

    StreamingQuery streamingQuery = rawSet
            .select("*")
            .writeStream()
            .format("console")
            .outputMode("append")
            .start();

    try {
        streamingQuery.awaitTermination(30_000);
    } catch (StreamingQueryException e) {
        System.out.println("Caught exception at 'awaitTermination':");
        e.printStackTrace();
    }
}{code}
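
A sketch of a workaround that avoids the schema dependency entirely: filter by position rather than by name, since `Row.get(int)` does not consult `fieldIndex()` (index 2 matches "f1" in inSchema above):

{code:java}
// Positional access works on a schema-less GenericRow because it never calls
// fieldIndex(); Long.valueOf(0L).equals(...) is null-safe for the nullable "f1".
.filter((FilterFunction<Row>) row -> !Long.valueOf(0L).equals(row.get(2)))
{code}

This should keep the pipeline working with or without the withWatermark() call, though it gives up the readability of named field access.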


