Hi,

there are multiple ways to create a table for testing:

- use the datagen connector
- use the filesystem connector with CSV data
- and, starting with Flink 1.13, your code snippets become much simpler
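Here is a rough sketch of the filesystem/CSV option. It assumes that the CSV format in your Flink version accepts a nested ROW column; if it does not, the json format is an alternative. The helper writes hard-coded rows to a temporary directory and builds a DDL that declares both the nested column and the watermark directly, so no DataStream plumbing is needed. `CsvTestTable` and `createDdl` are hypothetical names. Only the `'connector'`, `'path'` and `'format'` options come from Flink's filesystem connector.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

class CsvTestTable {

    // Hypothetical helper: writes the given CSV lines into a fresh temporary
    // directory and returns Flink SQL DDL for a filesystem table reading them.
    static String createDdl(String tableName, List<String> csvLines) {
        try {
            Path dir = Files.createTempDirectory("flink-test-input");
            Files.write(dir.resolve("events.csv"), csvLines);
            return "CREATE TABLE " + tableName + " (\n"
                    // Nested column: accessible in SQL as created.group_id
                    + "  created ROW<group_id STRING>,\n"
                    + "  event_time TIMESTAMP(3),\n"
                    // Declaring a watermark turns event_time into a rowtime attribute
                    + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE\n"
                    + ") WITH (\n"
                    + "  'connector' = 'filesystem',\n"
                    + "  'path' = '" + dir.toUri() + "',\n"
                    + "  'format' = 'csv'\n"
                    + ")";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // One CSV line per row: the single nested field, then the timestamp
        List<String> rows = List.of(
                "group123,2021-02-03 11:36:20",
                "group123,2021-02-03 11:38:20",
                "group123,2021-02-03 11:40:20");
        System.out.println(createDdl("post_events_kafka", rows));
    }
}
```

The returned string would be passed to `tableEnv.executeSql(...)`. After that, the `postCreated10min` view can read `post_events_kafka` unchanged. A similar DDL with `'connector' = 'datagen'` needs no file, but it produces random values. That makes it better suited to smoke tests than to assertions on exact results.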

Regards,
Timo

On 29.04.21 20:35, Svend wrote:
I found an answer to my own question!

For future reference, the snippet below creates a SQL table with a nested field and a watermark, filled with hard-coded values, which is all I need to test SQL expressions.

It's quite a mouthful, though. Is there a more succinct way to express the same thing?


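import java.sql.Timestamp;
import java.time.Duration;
import java.util.List;

import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

// Hard-coded input rows: a nested "created" row plus an event timestamp
var testData = List.of(
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:36:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:38:20")),
    Row.of(Row.of("group123"), Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
    // Explicit type information tells Flink the (nested) field names and types of each Row
    .fromCollection(testData,
        Types.ROW_NAMED(new String[] {"created", "event_time"},
            Types.ROW_NAMED(new String[] {"group_id"}, Types.STRING),
            Types.SQL_TIMESTAMP))
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Row>forBoundedOutOfOrderness(Duration.ofMinutes(10))
        // Use the event_time field (index 1) as the record's event timestamp
        .withTimestampAssigner(
            TimestampAssignerSupplier.of((row, previousTimestamp) -> ((Timestamp) row.getField(1)).getTime())));
// Expose event_time as the rowtime attribute of the table
var testDataTable = tableEnv.fromDataStream(testStream, $("created"), $("event_time").rowtime());
tableEnv.createTemporaryView("post_events_kafka", testDataTable);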
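To see why the windows in such a test fire at all, here is a small, Flink-free re-implementation of the watermark arithmetic. It assumes the behaviour of Flink's bounded-out-of-orderness generator: the watermark equals the highest timestamp seen so far, minus the bound, minus 1 ms. `WatermarkTrace` and its methods are hypothetical names. With the three sample events and a 10-minute bound, the highest watermark the generator itself produces is 11:30:19.999. That is earlier than the end of the first window at 11:40, so the windowed results in this test presumably come from the final watermark that a bounded source such as `fromCollection` emits once its input is exhausted.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;

class WatermarkTrace {

    // Re-implements the arithmetic of a bounded-out-of-orderness generator:
    // the watermark trails the largest timestamp seen so far by the bound
    // minus one millisecond. The start value avoids underflow when no
    // events have been seen yet.
    static long watermarkAfter(List<Long> timestampsMillis, Duration bound) {
        long boundMillis = bound.toMillis();
        long maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        for (long ts : timestampsMillis) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return maxTimestamp - boundMillis - 1;
    }

    // Interprets a SQL-style timestamp literal as UTC epoch milliseconds,
    // so the result does not depend on the JVM's default time zone.
    static long millis(String sqlTimestamp) {
        return LocalDateTime.parse(sqlTimestamp.replace(' ', 'T'))
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        List<Long> events = List.of(
                millis("2021-02-03 11:36:20"),
                millis("2021-02-03 11:38:20"),
                millis("2021-02-03 11:40:20"));
        long wm = watermarkAfter(events, Duration.ofMinutes(10));
        // Prints 2021-02-03T11:30:19.999
        System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(wm), ZoneOffset.UTC));
    }
}
```

A smaller bound, or extra events with later timestamps, would let the first window close before the end of input.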





On Thu, 29 Apr 2021, at 7:04 PM, Svend wrote:
I'm trying to write a Java unit test for a Flink SQL application using the Flink mini cluster, but I can't manage to create an input table with both nested fields and time attributes.

I had a look at the documentation and examples below, but I'm still struggling:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java


Consider, for example, this simple view that I want to test. It depends on the nested field "created.group_id" and expects "event_time" to be the rowtime attribute:


var createTableDDl = ""
        + " CREATE TEMPORARY VIEW postCreated10min \n"
        + " AS \n"
        + " SELECT \n"
        + "   created.group_id as groupId, \n"
        + "   TUMBLE_END(event_time, INTERVAL '10' MINUTES) as metricTime, \n"
        + "   TUMBLE_ROWTIME(event_time, INTERVAL '10' MINUTES) as rowTime, \n"
        + "   count(1) as metricValue \n"
        + " FROM post_events_kafka \n"
        + " GROUP BY \n"
        + "   created.group_id, \n"
        + "   TUMBLE(event_time, INTERVAL '10' MINUTES) \n";
tableEnv.executeSql(createTableDDl);
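For the test assertions, the expected output of `postCreated10min` can be derived by hand. Here is a minimal sketch. It assumes epoch-aligned tumbling windows with no offset, and that every window fires once the input ends. The hypothetical helper maps each event to the end of its 10-minute window (the value TUMBLE_END reports) and counts events per window. For the three sample rows it predicts two result rows for group123: metricTime 11:40 with metricValue 2, and metricTime 11:50 with metricValue 1.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ExpectedTumbleCounts {

    // Maps each tumbling-window end (what TUMBLE_END reports) to the number
    // of events falling into that window; empty windows are omitted.
    static Map<LocalDateTime, Long> countPerWindowEnd(List<LocalDateTime> eventTimes, Duration size) {
        long sizeMillis = size.toMillis();
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime t : eventTimes) {
            long ts = t.toInstant(ZoneOffset.UTC).toEpochMilli();
            // Windows are aligned to the epoch: [start, start + size)
            long start = ts - Math.floorMod(ts, sizeMillis);
            LocalDateTime end = LocalDateTime.ofInstant(Instant.ofEpochMilli(start + sizeMillis), ZoneOffset.UTC);
            counts.merge(end, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<LocalDateTime> events = List.of(
                LocalDateTime.parse("2021-02-03T11:36:20"),
                LocalDateTime.parse("2021-02-03T11:38:20"),
                LocalDateTime.parse("2021-02-03T11:40:20"));
        // Prints {2021-02-03T11:40=2, 2021-02-03T11:50=1}
        System.out.println(countPerWindowEnd(events, Duration.ofMinutes(10)));
    }
}
```

The event at 11:40:20 lands in the second window because window ends are exclusive.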


In a unit test, the following syntax lets me create test input data with nested fields, but I have not found how to specify a rowtime attribute or watermarks with this approach:


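import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;

import static org.apache.flink.table.api.Expressions.row;

// Nested field names and types come from the explicit row type
Table testTable = tableEnv.fromValues(
  DataTypes.ROW(
    DataTypes.FIELD("created",
      DataTypes.ROW(
        DataTypes.FIELD("group_id", DataTypes.STRING())
      )
    ),
    DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3))
  ),
  row(row("group123"), "2021-02-03 11:36:20"),
  row(row("group123"), "2021-02-03 11:38:20"),
  row(row("group123"), "2021-02-03 11:40:20")
);
tableEnv.createTemporaryView("post_events_kafka", testTable);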


I have also tried the following syntax, which lets me specify a watermark and a rowtime attribute, but I have not found how to create a nested field with this approach:


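import java.sql.Timestamp;
import java.time.Duration;
import java.util.List;

import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;

import static org.apache.flink.table.api.Expressions.$;

var testData = List.of(
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:36:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:38:20")),
  Tuple2.of("group123", Timestamp.valueOf("2021-02-03 11:40:20"))
);
var testStream = streamEnv
  .fromCollection(testData)
  .assignTimestampsAndWatermarks(WatermarkStrategy
    .<Tuple2<String, Timestamp>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
    .withTimestampAssigner(
      TimestampAssignerSupplier.of((t2, t) -> t2.f1.getTime()))
  );
// Position-based mapping: f0 -> group_id, f1 -> true_as_of;
// event_time is appended as a rowtime attribute from the record timestamps
var testDataTable = tableEnv.fromDataStream(
  testStream,
  $("group_id"), $("true_as_of"), $("event_time").rowtime()
);
tableEnv.createTemporaryView("post_events_kafka", testDataTable);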



What am I missing?

