Mark You created FLINK-6862:
-------------------------------

             Summary: Tumble window rowtime not resolved at logical plan validation
                 Key: FLINK-6862
                 URL: https://issues.apache.org/jira/browse/FLINK-6862
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.3.0
            Reporter: Mark You


The following code sample works in version 1.2.1, but fails in 1.3.0:
{code:title=Bar.java|borderStyle=solid}
/**
 * Reproduction sample for the reported failure.
 *
 * <p>Builds a small in-memory stream of {@link Content} records, assigns
 * event-time timestamps from {@code recordTime}, converts the stream to a
 * {@code Table}, and then groups it by a one-hour tumbling window defined on a
 * field named {@code rowtime}.
 *
 * <p>NOTE: several statements are hard-wrapped by the mail formatter (including
 * one string literal). They are kept exactly as reported.
 */
public class TumblingWindow {

    /**
     * Entry point of the reproduction.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception declared on the signature; the report's stack trace
     *         shows a {@code ValidationException} escaping from here
     */
    public static void main(String[] args) throws Exception {
        // Fixed input: (recordTime, content) pairs.
        List<Content> data = new ArrayList<Content>();
        data.add(new Content(1L, "Hi"));
        data.add(new Content(2L, "Hallo"));
        data.add(new Content(3L, "Hello"));
        data.add(new Content(4L, "Hello"));
        data.add(new Content(7L, "Hello"));
        data.add(new Content(8L, "Hello world"));
        data.add(new Content(16L, "Hello world"));

        // Streaming environment switched to event time.
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);

        DataStream<Content> stream = env.fromCollection(data);

        // Timestamps are taken from Content#getRecordTime(); the extractor is
        // constructed with Time.milliseconds(1).
        DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
                new 
BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {

                    /**
                     * Serialization version id of this anonymous extractor.
                     */
                    private static final long serialVersionUID = 
410512296011057717L;

                    /** Returns the element's {@code recordTime} as its timestamp. */
                    @Override
                    public long extractTimestamp(Content element) {
                        return element.getRecordTime();
                    }

                });

        // The stream is converted without any field expressions. The reported
        // exception lists only [content, recordTime] as resolvable input.
        // NOTE(review): presumably 1.3.0 requires the rowtime attribute to be
        // declared explicitly when converting the stream - confirm against the
        // Table API documentation for that version.
        Table table = tableEnv.fromDataStream(stream2);
        
        // One-hour tumbling window on "rowtime", selecting the window start and
        // the count of "content". The resulting Table is not assigned. Per the
        // stack trace in the report, the ValidationException is raised from
        // this select(...) call.
table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start,
 content.count");

        env.execute();
    }

    /**
     * Record type used as stream element: a {@code long} record time plus a
     * {@code String} payload, with a no-argument constructor and
     * getters/setters for both fields.
     */
    public static class Content implements Serializable {

        private long recordTime;
        private String content;

        /** Creates an instance with both fields left at their default values. */
        public Content() {
            super();
        }

        /**
         * Creates an instance with both fields set.
         *
         * @param recordTime value stored in {@code recordTime}
         * @param content value stored in {@code content}
         */
        public Content(long recordTime, String content) {
            super();
            this.recordTime = recordTime;
            this.content = content;
        }

        /** @return the stored record time */
        public long getRecordTime() {
            return recordTime;
        }

        /** @param recordTime new record time */
        public void setRecordTime(long recordTime) {
            this.recordTime = recordTime;
        }

        /** @return the stored content string */
        public String getContent() {
            return content;
        }

        /** @param content new content string */
        public void setContent(String content) {
            this.content = content;
        }

    }

    /**
     * Assigner over {@code Object[]} elements that reads the timestamp from
     * index 0 and returns a watermark equal to each extracted timestamp.
     *
     * <p>NOTE(review): nothing in the visible sample references this class;
     * it appears to be left over and unrelated to the reported failure.
     */
    private class TimestampWithEqualWatermark implements 
AssignerWithPunctuatedWatermarks<Object[]> {

        /**
         * Serialization version id.
         */
        private static final long serialVersionUID = 1L;

        /**
         * Returns {@code element[0]} cast to {@code long};
         * {@code previousElementTimestamp} is not used.
         */
        @Override
        public long extractTimestamp(Object[] element, long 
previousElementTimestamp) {
            // TODO Auto-generated method stub
            return (long) element[0];
        }

        /**
         * Returns a new {@code Watermark} built from {@code extractedTimestamp};
         * {@code lastElement} is not used.
         */
        @Override
        public Watermark checkAndGetNextWatermark(Object[] lastElement, long 
extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }

    }
}
{code}

{noformat}
Exception trace
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Cannot resolve [rowtime] given input [content, recordTime].
        at 
org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
        at 
org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86)
        at 
org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
        at 
org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
        at 
org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
        at 
org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
        at scala.collection.AbstractIterator.to(Iterator.scala:1194)
        at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
        at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
        at 
org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
        at 
org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
        at 
org.apache.flink.table.plan.logical.Project.validate(operators.scala:67)
        at 
org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054)
        at 
org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073)
        at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54)
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to