I am trying to understand how to use streaming SQL for a task very similar to the
example from the documentation: count the number of page clicks per user in a
given period of time.
I'm trying to solve the problem using both the SQL API and the Table API.

My sample input stream looks like this (timestamp, user, url); I'm using event time:
1530229582338,John,./reports?id=311
1530229584339,Bob,./cart
1530229587339,Mary,./prod?id=1
1530229592340,Liz,./home
1530229598340,John,./reports?id=312
1530229599341,John,./reports?id=313
1530229600342,Bob,./prod?id=3
1530229601342,Mary,./prod?id=7
1530229604343,Liz,./prod?id=4
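The `TableStreamMapper` used in the code in this post is not shown. A minimal, JDK-only sketch of the parsing it presumably performs (turning each `timestamp,user,url` line into a `(long, String, String)` triple) could look like this. The class and method names are hypothetical, and a plain holder class stands in for Flink's `Tuple3<Long,String,String>` so the snippet runs without Flink on the classpath.

```java
/**
 * Hypothetical stand-in for the parsing done by TableStreamMapper
 * (not shown in the post): "timestamp,user,url" -> (long, String, String).
 */
public class PageViewParser {

    /** One parsed record; mirrors the fields of Tuple3<Long,String,String>. */
    static final class PageView {
        final long timestamp; // epoch milliseconds, later used as event time
        final String user;
        final String url;

        PageView(long timestamp, String user, String url) {
            this.timestamp = timestamp;
            this.user = user;
            this.url = url;
        }
    }

    /** Parses one input line; throws IllegalArgumentException on malformed input. */
    static PageView parse(String line) {
        // limit = 3 so that any comma inside the URL stays in the last field
        String[] fields = line.split(",", 3);
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected timestamp,user,url but got: " + line);
        }
        // Long.parseLong throws NumberFormatException, a subclass of IllegalArgumentException
        return new PageView(Long.parseLong(fields[0].trim()), fields[1].trim(), fields[2].trim());
    }

    public static void main(String[] args) {
        PageView v = parse("1530229582338,John,./reports?id=311");
        // prints 1530229582338 John ./reports?id=311
        System.out.println(v.timestamp + " " + v.user + " " + v.url);
    }
}
```

Splitting with a limit keeps the URL intact even if it ever contains a comma; the timestamp field is what the timestamp assigner would later read as event time.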

My SQL API solution source code is below.
Everything is working, but I have a question:
the window-end timestamp (wend) comes back as a java.sql.Timestamp.
*Is there a way to retrieve it from the query as a long instead?*

When I examine the schema of the table returned by the SQL query, I get this:
root
 |-- wend: TimeIndicatorTypeInfo(rowtime)
 |-- username: String
 |-- viewcount: Long

I would prefer to get the schema back like this instead:
root
 |-- wend: Long
 |-- username: String
 |-- viewcount: Long


// Get Our Data Stream
DataStream<Tuple3<Long,String,String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());


// Register Table
// Dynamic Table From Stream
tableEnvironment.registerDataStream("pageViews", eventStream,
        "pageViewTime.rowtime, username, url");

// Continuous Query
String continuousQuery =
        "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
        "username, COUNT(url) as viewcount FROM pageViews " +
        "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";

// Dynamic Table from Continuous Query
Table rows = tableEnvironment.sqlQuery(continuousQuery);
rows.printSchema();

// Convert Results to DataStream
Table resultTable = rows
        .select("wend, username, viewcount");


TupleTypeInfo<Tuple3<Timestamp,String,Long>> tupleTypeInfo = new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.LONG);
DataStream<Tuple3<Timestamp,String,Long>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleTypeInfo);
resultDataStream.print();
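One option that leaves the query unchanged is to convert the field after `toAppendStream`: `java.sql.Timestamp#getTime()` returns the epoch milliseconds as a `long`. In the Flink job this would presumably be a `MapFunction<Tuple3<Timestamp,String,Long>, Tuple3<Long,String,Long>>` applied to `resultDataStream` that returns `Tuple3.of(t.f0.getTime(), t.f1, t.f2)`; writing it as an anonymous class rather than a lambda can avoid type-extraction trouble with generic tuple types. The sketch below shows only the conversion itself, under the assumption that plain JDK classes (hypothetical names) may stand in for `Tuple3` so it runs without Flink.

```java
import java.sql.Timestamp;

/**
 * Sketch: turning a (Timestamp wend, username, viewcount) result row into
 * (long wend, username, viewcount). Plain classes stand in for Flink's Tuple3.
 */
public class WindowEndAsLong {

    /** Row shaped like Tuple3<Timestamp,String,Long> from the SQL query. */
    static final class TimestampRow {
        final Timestamp wend;
        final String username;
        final long viewcount;

        TimestampRow(Timestamp wend, String username, long viewcount) {
            this.wend = wend;
            this.username = username;
            this.viewcount = viewcount;
        }
    }

    /** Same row with the window end as epoch milliseconds. */
    static final class LongRow {
        final long wend;
        final String username;
        final long viewcount;

        LongRow(long wend, String username, long viewcount) {
            this.wend = wend;
            this.username = username;
            this.viewcount = viewcount;
        }
    }

    /** What the body of a MapFunction over resultDataStream would do. */
    static LongRow toLongRow(TimestampRow row) {
        return new LongRow(row.wend.getTime(), row.username, row.viewcount);
    }

    public static void main(String[] args) {
        // Illustrative values: end of the first 1-minute window in the sample data
        TimestampRow in = new TimestampRow(new Timestamp(1530229620000L), "John", 3L);
        LongRow out = toLongRow(in);
        // prints 1530229620000,John,3
        System.out.println(out.wend + "," + out.username + "," + out.viewcount);
    }
}
```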



Question #2 (Table API solution)
================================
I'm trying to implement the same logic using the Table API.
*Question: how do I express that I want count(url) rather than just the url?*
I don't know how to do that with the Table API.
The source code is below:

// Get Our Data Stream
DataStream<Tuple3<Long,String,String>> eventStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new TableStreamMapper())
        .assignTimestampsAndWatermarks(new MyEventTimestampAssigner());


// Dynamic Table From Stream
Table dynamicTable = tableEnvironment.fromDataStream(eventStream,
        "pageViewTime.rowtime, username, url");


// Continuous Query: Table API
WindowedTable windowedTable =
        dynamicTable.window(Tumble.over("1.minutes").on("pageViewTime").as("clickWindow"));
windowedTable.table().printSchema();


Table resultTable = windowedTable.table()
        .select("pageViewTime, username, url");
TupleTypeInfo<Tuple3<Timestamp,String,String>> tupleType = new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP,
        Types.STRING,
        Types.STRING);
DataStream<Tuple3<Timestamp,String,String>> resultDataStream =
        tableEnvironment.toAppendStream(resultTable, tupleType);
resultDataStream.print();
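Regarding the count: as far as I understand the Flink 1.5 Table API, a windowed table has to be grouped by the window alias (plus any other keys) before aggregates can appear in `select`, roughly `windowedTable.groupBy("clickWindow, username").select("clickWindow.end as wend, username, url.count as viewcount")`. Please treat that expression string as an untested assumption and check it against the Table API documentation for your version. With event time, each window's result is only emitted once the watermark passes the window end. To have something to compare the job's output against, here is a plain-Java (JDK-only, not Flink) sketch that performs the same aggregation on the nine sample records: assign each record to its epoch-aligned 1-minute tumbling window, group by window end and user, and count. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * JDK-only sketch of GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username
 * followed by COUNT(url), applied to the sample input from this post.
 */
public class TumblingCountSketch {

    static final long WINDOW_MILLIS = 60_000L; // INTERVAL '1' MINUTE

    static final List<String> SAMPLE = Arrays.asList(
            "1530229582338,John,./reports?id=311",
            "1530229584339,Bob,./cart",
            "1530229587339,Mary,./prod?id=1",
            "1530229592340,Liz,./home",
            "1530229598340,John,./reports?id=312",
            "1530229599341,John,./reports?id=313",
            "1530229600342,Bob,./prod?id=3",
            "1530229601342,Mary,./prod?id=7",
            "1530229604343,Liz,./prod?id=4");

    /** Exclusive end of the epoch-aligned tumbling window containing ts. */
    static long windowEnd(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MILLIS) + WINDOW_MILLIS;
    }

    /** Maps "windowEnd,username" to the number of page views in that window. */
    static Map<String, Long> countPerWindowAndUser(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(",", 3))               // [timestamp, user, url]
                .collect(Collectors.groupingBy(
                        f -> windowEnd(Long.parseLong(f[0])) + "," + f[1],
                        TreeMap::new,                           // sorted keys for stable output
                        Collectors.counting()));                // COUNT(url)
    }

    public static void main(String[] args) {
        countPerWindowAndUser(SAMPLE)
                .forEach((key, count) -> System.out.println(key + "," + count));
    }
}
```

All nine sample records fall into the window ending at 1530229620000, so the sketch prints:

    1530229620000,Bob,2
    1530229620000,John,3
    1530229620000,Liz,2
    1530229620000,Mary,2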

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/