I am trying to understand how to use streaming SQL with a use case very similar to the example from the documentation: count the number of page clicks per user within a given time window. I'm trying to solve the problem using both the SQL API and the Table API.
My input sample stream looks like this: (timestamp, user, url). I'm using EventTime.

    1530229582338,John,./reports?id=311
    1530229584339,Bob,./cart
    1530229587339,Mary,./prod?id=1
    1530229592340,Liz,./home
    1530229598340,John,./reports?id=312
    1530229599341,John,./reports?id=313
    1530229600342,Bob,./prod?id=3
    1530229601342,Mary,./prod?id=7
    1530229604343,Liz,./prod?id=4

My SQL API solution source code is below. Everything is working, but I have a question:

It appears that the datatype of the timestamp is java.sql.Timestamp. *Is there a way to retrieve this from the query as a long instead?*

When I examine the schema for the table returned by the SQL query, I get this:

    root
     |-- wend: TimeIndicatorTypeInfo(rowtime)
     |-- username: String
     |-- viewcount: Long

I would prefer to get the schema back like this instead:

    root
     |-- wend: Long
     |-- username: String
     |-- viewcount: Long

    // Get Our Data Stream
    DataStream<Tuple3<Long,String,String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

    // Register Table
    // Dynamic Table From Stream
    tableEnvironment.registerDataStream("pageViews", eventStream,
        "pageViewTime.rowtime, username, url");

    // Continuous Query
    String continuousQuery =
        "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

    // Dynamic Table from Continuous Query
    Table rows = tableEnvironment.sqlQuery(continuousQuery);
    rows.printSchema();

    // Convert Results to DataStream
    Table resultTable = rows
        .select("wend, username, viewcount");

    TupleTypeInfo<Tuple3<Timestamp,String,Long>> tupleTypeInfo = new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.LONG);
    DataStream<Tuple3<Timestamp,String,Long>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
    resultDataStream.print();

Question # 2 (TableAPI solution)
========

I'm trying to implement the same logic using the Table API.

*Question: how do I express that I want to get the count(url) instead of just the url?* I don't know how to do that using the Table API.

Source code is below:

    // Get Our Data Stream
    DataStream<Tuple3<Long,String,String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());

    // Dynamic Table From Stream
    Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
        "pageViewTime.rowtime, username, url");

    // Continuous Query : Table API
    WindowedTable windowedTable = dynamicTable.window(
        Tumble.over("1.minutes").on("pageViewTime").as("clickWindow"));

    windowedTable.table().printSchema();

    Table resultTable = windowedTable.table()
        .select("pageViewTime,username,url");

    TupleTypeInfo<Tuple3<Timestamp,String,String>> tupleType = new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.STRING);
    DataStream<Tuple3<Timestamp,String,String>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleType);
    resultDataStream.print();
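
To make the expected result concrete, here is a minimal plain-Java sketch (no Flink involved) of what both queries are meant to compute for the sample input above: one row per (window end, username) with the window end as a long and the number of page views as the count. The class name TumblingCountSketch, the SAMPLE constant, and both helper methods are hypothetical names made up for illustration. The sketch assumes one-minute tumbling windows aligned to the epoch, an exclusive window end, and non-negative timestamps; whether that matches the runtime's window alignment exactly is an assumption, not something confirmed here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TumblingCountSketch {

    // Sample input from the post, one "timestamp,user,url" record per entry.
    static final String[] SAMPLE = {
        "1530229582338,John,./reports?id=311",
        "1530229584339,Bob,./cart",
        "1530229587339,Mary,./prod?id=1",
        "1530229592340,Liz,./home",
        "1530229598340,John,./reports?id=312",
        "1530229599341,John,./reports?id=313",
        "1530229600342,Bob,./prod?id=3",
        "1530229601342,Mary,./prod?id=7",
        "1530229604343,Liz,./prod?id=4"
    };

    // Exclusive end (epoch millis) of the epoch-aligned tumbling window that
    // contains the given timestamp. Assumes a non-negative timestamp.
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        long windowStart = timestampMillis - (timestampMillis % windowSizeMillis);
        return windowStart + windowSizeMillis;
    }

    // Counts records per (window end, user). Keys have the form
    // "<windowEndMillis>,<user>" and keep first-seen order.
    static Map<String, Long> countPerWindowAndUser(String[] records, long windowSizeMillis) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String record : records) {
            String[] fields = record.split(",", 3); // timestamp, user, url
            long timestamp = Long.parseLong(fields[0]);
            String user = fields[1];
            String key = windowEnd(timestamp, windowSizeMillis) + "," + user;
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Print one "wend,username,viewcount" line per group.
        countPerWindowAndUser(SAMPLE, 60_000L)
            .forEach((key, count) -> System.out.println(key + "," + count));
    }
}
```

All nine sample timestamps fall into the same one-minute window, so under these assumptions the sketch yields four rows with window end 1530229620000: John with 3 views, and Bob, Mary and Liz with 2 each. This is the shape (Long, String, Long) that the preferred schema above describes.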