Hi Dawid, About your exception:
1. When you remove the window start and end selections, using UserSongsStatistics can work well. the sql as follows: SELECT OUNT(1) as cnt, song_name as songName, userId FROM songs WHERE type = 'PLAY' GROUP BY song_name, userId, TUMBLE(t, INTERVAL '3' SECOND)"); 2. When you add window properties in the projections. It's can not work. So, First of all, for your case I suggest you keep using: 【tEnv.toAppendStream(table,TypeInformation.of(Row.class)).print();】 Because Row can include all fields which you want added in UserSongsStatistics. And for express my point clearly, I had write a example according your test case. Please see the example code as follows link: 1. SongEventTableSource.java <https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/SongEventTableSource.java> using `emitWatermark ` `collectWithTimestamp` and the objects you mentioned. 2. StreamSql.java <https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/StreamSql.java> like yours. Secondly, I think you can open a JIRA. relation the error. I hope it can help you and best wish to you. welcome any feedback. :) Thanks, SunJincheng 2017-06-10 12:02 GMT+08:00 jincheng sun <sunjincheng...@gmail.com>: > Hi Dawid, > > For your case I think you can keep using: > > 【tEnv.toAppendStream(table,TypeInformation.of(Row.class)).print();】 > > Because Row can include all fields which you want added in > UserSongsStatistics. And for express my point clearly, I had write a > example according your test case. Please see the example code as follows > link: > > 1. SongEventTableSource.java > <https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/SongEventTableSource.java> > using `emitWatermark ` `collectWithTimestamp` and the objects you mentioned. > 2. StreamSql.java > <https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/StreamSql.java> > like yours. > > I hope it can help you and best wish to you. > welcome any feedback. :) > > Thanks, > SunJincheng > > 2017-06-10 5:02 GMT+08:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>: > >> Thanks a lot Timo, after I added the ResultTypeQueryable interface to my >> mapper it worked. As for the SongEvent the reason I tried remapping it to >> Row is that it has an enum field on which I want to filter, so my first >> approach was to remap it in TableSource to String. What do you think should >> be the way to go in such case? >> >> After successfully producing DataStream[Row] I tried sth like: >> >>> tEnv.toAppendStream(table)(TypeInformation.of(classOf[UserSo >>> ngsStatistics])).print(); >>> >> >> The class UserSongsStatistics is a pojo with fields named the same as >> expressions in SELECT clause. Is such a construct intended to work? Right >> now I get an error: >> >> org.apache.flink.table.api.TableException: The field types of physical >>> and logical row types do not match.This is a bug and should not happen. >>> Please file an issue. >> >> >> Is it really a bug? >> >> Anyway thanks for help. I will file a JIRA for the previous issue >> tomorrow. >> >> Z pozdrowieniami! / Cheers! >> >> Dawid Wysakowicz >> >> *Data/Software Engineer* >> >> Skype: dawid_wys | Twitter: @OneMoreCoder >> >> <http://getindata.com/> >> >> 2017-06-09 22:25 GMT+02:00 Timo Walther <twal...@apache.org>: >> >>> Hi David, >>> >>> I think the problem is that the type of the DataStream produced by the >>> TableSource, does not match the type that is declared in the ` >>> getReturnType()`. A `MapFunction<xxx, Row>` is always a generic type >>> (because Row cannot be analyzed). A solution would be that the mapper >>> implements `ResultTypeQueryable`. I agree that the error should be thrown >>> earlier, not in the CodeGenerator. Can you create an issue for this? >>> >>> Btw the Table API supports nested types, it should work that the >>> TableSource returns ` SongEvent`. >>> >>> Regards, >>> Timo >>> >>> >>> Am 09.06.17 um 20:19 schrieb Dawid Wysakowicz: >>> >>> Sorry forgot to add the link: >>> >>> https://gist.github.com/dawidwys/537d12a6f2355cba728bf93f1af87b45 >>> >>> Z pozdrowieniami! / Cheers! >>> >>> Dawid Wysakowicz >>> >>> *Data/Software Engineer* >>> >>> Skype: dawid_wys | Twitter: @OneMoreCoder >>> >>> <http://getindata.com/> >>> >>> 2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com> >>> : >>> >>>> Hi, >>>> I tried writing a simple sql query with custom StreamTableSource and it >>>> fails with error: >>>> >>>> org.apache.flink.table.codegen.CodeGenException: Arity of result type >>>>>> does not match number of expressions. >>>>> >>>>> at org.apache.flink.table.codegen.CodeGenerator.generateResultE >>>>>> xpression(CodeGenerator.scala:940) >>>>> >>>>> at org.apache.flink.table.codegen.CodeGenerator.generateConvert >>>>>> erResultExpression(CodeGenerator.scala:883) >>>>> >>>>> at org.apache.flink.table.plan.nodes.CommonScan$class.generated >>>>>> ConversionFunction(CommonScan.scala:57) >>>>> >>>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour >>>>>> ceScan.generatedConversionFunction(StreamTableSourceScan.scala:35) >>>>> >>>>> at org.apache.flink.table.plan.nodes.datastream.StreamScan$clas >>>>>> s.convertToInternalRow(StreamScan.scala:48) >>>>> >>>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour >>>>>> ceScan.convertToInternalRow(StreamTableSourceScan.scala:35) >>>>> >>>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour >>>>>> ceScan.translateToPlan(StreamTableSourceScan.scala:107) >>>>> >>>>> >>>> You can check the source code here: >>>> >>>> >>>> Z pozdrowieniami! / Cheers! >>>> >>>> Dawid Wysakowicz >>>> >>>> *Data/Software Engineer* >>>> >>>> Skype: dawid_wys | Twitter: @OneMoreCoder >>>> >>>> <http://getindata.com/> >>>> >>> >>> >>> >> >