Hi Rong,

Thank you for reply :-)

which Flink version are you using?

I'm using Flink-1.8.0.

what is the "sourceTable.getSchema().toRowType()" return?

Row(time1: TimeIndicatorTypeInfo(rowtime))

what is the line *".map(a -> a)" *do and can you remove it?

*".map(a->a)"* is just to illustrate a problem.
My actual code contains a process function (instead of .map() in the
snippet) which appends a new field containing watermark to a row.
If there were ways to get watermark inside a scalar UDF, I wouldn't convert
table to datastream and vice versa.

if I am understanding correctly, you are also using "time1" as the rowtime,
> is that want your intension is to use it later as well?

yup :-)

As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine automatically[1].

The reason why I specify *".returns(sourceTable.getSchema().toRowType());"* is
to give a type information hint as you said.
That is needed later when I need to make another table like
   "*Table anotherTable = tEnv.fromDataStream(stream);"*,
Without the type information hint, I've got an error
   "*An input of GenericTypeInfo<Row> cannot be converted to Table. Please
specify the type of the input with a RowTypeInfo."*
That's why I give a type information hint in that way.

Best,

Dongwon

On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:

> Hi Dongwon,
>
> Can you provide a bit more information:
> which Flink version are you using?
> what is the "sourceTable.getSchema().toRowType()" return?
> what is the line *".map(a -> a)" *do and can you remove it?
> if I am understanding correctly, you are also using "time1" as the
> rowtime, is that want your intension is to use it later as well?
>
> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine automatically[1].
>
> Thanks,
> Rong
>
> --
> [1]
> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>
>
> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hello,
>>
>> Consider the following snippet:
>>
>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>
>>> *      .map(a -> a)      .returns(sourceTable.getSchema().toRowType());*
>>>     stream.print();
>>>
>> where sourceTable.printSchema() shows:
>>
>>> root
>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>
>>
>>
>>  This program returns the following exception:
>>
>>> Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at
>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>> at
>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>> at app.metatron.test.Main2.main(Main2.java:231)
>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>>> cast to java.lang.Long*
>>> * at
>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>> ...
>>
>>
>> The row serializer seems to try to deep-copy an instance of
>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>> Could anybody help me?
>>
>> Best,
>>
>> - Dongwon
>>
>> p.s. though removing .returns() makes everything okay, I need to do that
>> as I want to convert DataStream<Row> into another table later.
>> p.s. the source table is created as follows:
>>
>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>       .version("universal")
>>>       .topic("mytopic")
>>>       .property("bootstrap.servers", "localhost:9092")
>>>       .property("group.id", "mygroup")
>>>       .startFromEarliest();
>>>     FormatDescriptor formatDescriptor = new Csv()
>>>       .deriveSchema()
>>>       .ignoreParseErrors()
>>>       .fieldDelimiter(',');
>>>     Schema schemaDescriptor = new Schema()
>>>       .field("time1", SQL_TIMESTAMP())
>>>       .rowtime(
>>>         new Rowtime()
>>>           .timestampsFromField("rowTime")
>>>           .watermarksPeriodicBounded(100)
>>>       );
>>>     tEnv.connect(connectorDescriptor)
>>>       .withFormat(formatDescriptor)
>>>       .withSchema(schemaDescriptor)
>>>       .inAppendMode()
>>>       .registerTableSource("mysrc");
>>>     return tEnv.scan("mysrc");
>>>   }
>>
>>

Reply via email to