Hi Niels,

I think (not 100% sure) you could also cast the event time attribute to
TIMESTAMP before you emit the table.
This should remove the event time property (and thereby the
TimeIndicatorTypeInfo) and you wouldn't know to fiddle with the output
types.

Best, Fabian

Am Mi., 21. Aug. 2019 um 10:51 Uhr schrieb Niels Basjes <ni...@basjes.nl>:

> Hi,
>
> It has taken me quite a bit of time to figure this out.
> This is the solution I have now (works on my machine).
>
> Please tell me where I can improve this.
>
> Turns out that the schema you provide for registerDataStream only needs
> the 'top level' fields of the Avro datastructure.
> With only the top fields there you can still access nested fields with
> something like "topfield.x.y.z" in the SQL statement.
>
> What I found is that the easiest way to make this all work is to ensure
> the rowtime field in the structure is at the top level (which makes sense
> in general) and generate the fields string where I only need to know the
> name of the "rowtime" field.
>
> So I have
>
>     DataStream<Measurement> inputStream = ...
>
>
> then I register the stream with
>
>
>     TypeInformation<Measurement> typeInformation = 
> TypeInformation.of(Measurement.class);
>     String [] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>
>     List<String> rootSchema = new ArrayList<>();
>     for (String fieldName: fieldNames) {
>         if (rowtimeFieldName.equals(fieldName)) {
>             rootSchema.add(fieldName + ".rowtime");
>         } else {
>             rootSchema.add(fieldName);
>         }
>     }
>
>     tableEnv.registerDataStream("MeasurementStream", inputStream, 
> String.join(",", rootSchema));
>
>
> Now after the actual SQL has been executed I have a
>
>     Table resultTable = ...
>
> Now simply feeding this into a DataStream with something like this fails
> badly.
>
>     TypeInformation<Row> tupleType = new 
> RowTypeInfo(resultTable.getSchema().getFieldTypes());
>     DataStream<Row>      resultSet = tableEnv.toAppendStream(resultTable, 
> tupleType);
>
> will result in
>
>     org.apache.flink.table.api.TableException: The time indicator type is an 
> internal type only.
>        at 
> org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>
> Turns out that the schema of the output contains a field that was created
> by TUMBLE_START which is of type TimeIndicatorTypeInfo
>
> So I have to do it this way (NASTY!):
>
>     final TypeInformation<?>[] fieldTypes = 
> resultTable.getSchema().getFieldTypes();
>     int index;
>     for(index = 0 ; index < fieldTypes.length ; index++) {
>         if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>            fieldTypes[index] = SQL_TIMESTAMP;
>         }
>     }
>     TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
>     DataStream<Row>      resultSet = tableEnv.toAppendStream(resultTable, 
> tupleType);
>
> Which gives me the desired DataStream.
>
>
> Niels Basjes
>
>
>
>
>
> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Niels,
>>
>> if you are coming from DataStream API, all you need to do is to write a
>> timestamp extractor.
>>
>> When you call:
>>
>> tableEnv.registerDataStream("TestStream", letterStream,
>> "EventTime.rowtime, letter, counter");
>>
>> The ".rowtime" means that the framework will extract the rowtime from the
>> stream record timestamp. You don't need to name all fields again but could
>> simply construct a string from letterStream.getTypeInfo().getFieldNames().
>> I hope we can improve this further in the future as part of FLIP-37.
>>
>> Regards,
>> Timo
>>
>> Am 14.08.19 um 17:00 schrieb Niels Basjes:
>>
>> Hi,
>>
>> Experimenting with the StreamTableEnvironment I build something like this:
>>
>> DataStream<Tuple3<Long, String, Long>> letterStream = ...
>> tableEnv.registerDataStream("TestStream", letterStream,
>> "EventTime.rowtime, letter, counter");
>>
>>
>> Because the "EventTime" was tagged with ".rowtime" it is now being used
>> as the rowtime and has the DATETIME so I can do this
>>
>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>
>>
>> So far so good.
>>
>> Working towards a more realistic scenario I have a source that produces a
>> stream of records that have been defined using Apache Avro.
>>
>> So I have a Measurement.avdl that (among other things) contains something
>> like this:
>>
>> record Measurement {
>>    /** The time (epoch in milliseconds since 1970-01-01 UTC) when the
>> event occurred */
>>     long                        timestamp;
>>     string                      letter;
>>     long                        pageviews;
>> }
>>
>>
>> Now because the registerDataStream call can also derive the schema from
>> the provided data I can do this:
>>
>> DataStream<Measurement> inputStream = ...
>> tableEnv.registerDataStream("DataStream", inputStream);
>>
>>
>> This is very nice because any real schema is big (few hundred columns)
>> and changes over time.
>>
>> Now In the SQL the timestamp is a BIGINT and not a DATETIME and as a
>> consequence I get this error
>>
>> Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL
>> MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>
>>
>> So far I have now yet figured how to make the system understand that the
>> timestamp column show be treated as the rowtime.
>> How do I do that?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>>
>>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>

Reply via email to