Hi chrisr,

> *Is there a way to retrieve this from the query as a long instead?*
You have to convert the timestamp type to long type. It seems there are no
internal udf to convert timestamp to unix timestamp, however you can write
one and used in your SQL.

> *Question: how do I express that I want to get the count(url) instead of just
the url?*
.groupBy("clickWindow").select("count(url)") can get the count value. There
are more examples here[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#group-windows


On Fri, Jun 29, 2018 at 8:06 AM, chrisr123 <chris.rueg...@gmail.com> wrote:

> I am trying to understand how to use streaming sql, very similar to the
> example
> from the documentation: count the number of pageclicks in a certain period
> of time for each user.
> I'm trying to solve the problem using both the SQL API and the table API
>
> My input sample stream looks like this: (timestamp, user, url)
> I'm using EventTime
> 1530229582338,John,./reports?id=311
> 1530229584339,Bob,./cart
> 1530229587339,Mary,./prod?id=1
> 1530229592340,Liz,./home
> 1530229598340,John,./reports?id=312
> 1530229599341,John,./reports?id=313
> 1530229600342,Bob,./prod?id=3
> 1530229601342,Mary,./prod?id=7
> 1530229604343,Liz,./prod?id=4
>
> My SQL API solution source code is here.
> Everything is working, but I have a question:
> It appears that the datatype of the timestamp is java.sql.Timestamp.
> *Is there a way to retrieve this from the query as a long instead?*
>
> When I examine the schema for the table returned by SQL query, I get this:
> root
>  |-- wend: TimeIndicatorTypeInfo(rowtime)
>  |-- username: String
>  |-- viewcount: Long
>
> I would prefer to get the schema back like this instead:
> root
>  |-- wend: Long
>  |-- username: String
>  |-- viewcount: Long
>
>
> // Get Our Data Stream
> DataStream<Tuple3&lt;Long,String,String>> eventStream = env
>                 .socketTextStream(parms.get("host"), parms.getInt("port"))
>                 .map(new TableStreamMapper())
>                 .assignTimestampsAndWatermarks(new
> MyEventTimestampAssigner());
>
>
> // Register Table
> // Dynamic Table From Stream
> tableEnvironment.registerDataStream("pageViews", eventStream,
> "pageViewTime.rowtime, username, url");
>
> // Continuous Query
> String continuousQuery =
>         "SELECT TUMBLE_END(pageViewTime, INTERVAL '1' MINUTE) as wend, " +
>         "username, COUNT(url) as viewcount FROM pageViews " +
>         "GROUP BY TUMBLE(pageViewTime, INTERVAL '1' MINUTE), username";
>
> // Dynamic Table from Continuous Query
> Table rows = tableEnvironment.sqlQuery(continuousQuery);
> rows.printSchema();
>
> // Convert Results to DataStream
> Table resultTable = rows
>         .select("wend, username,viewcount");
>
>
> TupleTypeInfo<Tuple3&lt;Timestamp,String,Long>> tupleTypeInfo = new
> TupleTypeInfo<>(
>                 Types.SQL_TIMESTAMP,
>                 Types.STRING,
>                 Types.LONG);
> DataStream<Tuple3&lt;Timestamp,String,Long>> resultDataStream =
> tableEnvironment.toAppendStream(resultTable,tupleTypeInfo);
> resultDataStream.print();
>
>
>
> Question # 2  (TableAPI solution)
> ========
> I'm trying to implement the same logic using the Table API.
> *Question: how do I express that I want to get the count(url) instead of
> just the url?*
> I don't know how to do that using the Table API
> Source code is below:
>
>                 // Get Our Data Stream
>                 DataStream<Tuple3&lt;Long,String,String>> eventStream =
> env
>                                 .socketTextStream(parms.get("host"),
> parms.getInt("port"))
>                                 .map(new TableStreamMapper())
>                                 .assignTimestampsAndWatermarks(new
> MyEventTimestampAssigner());
>
>
>                 // Dynamic Table From Stream
>                 Table dynamicTable = tableEnvironment.
> fromDataStream(eventStream,
> "pageViewTime.rowtime, username, url");
>
>
>                 // Continuous Query : Table API
>                 WindowedTable windowedTable =
> dynamicTable.window(Tumble.over("1.minutes").on("pageViewTime").as("
> clickWindow"));
>                 windowedTable.table().printSchema();
>
>
>                 Table resultTable = windowedTable.table()
>                         .select("pageViewTime,username,url");
>                 TupleTypeInfo<Tuple3&lt;Timestamp,String,String>>
> tupleType = new
> TupleTypeInfo<>(
>                                 Types.SQL_TIMESTAMP,
>                                 Types.STRING,
>                                 Types.STRING);
>                 DataStream<Tuple3&lt;Timestamp,String,String>>
> resultDataStream =
>                                 tableEnvironment.toAppendStream(resultTable,
> tupleType);
>                 resultDataStream.print();
>
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Reply via email to