Hi Rong,

Thank you for the reply :-)
> which Flink version are you using?

I'm using Flink 1.8.0.

> what does "sourceTable.getSchema().toRowType()" return?

Row(time1: TimeIndicatorTypeInfo(rowtime))

> what does the line ".map(a -> a)" do, and can you remove it?

".map(a -> a)" is just there to illustrate the problem. My actual code contains a process function (instead of the .map() in the snippet) which appends a new field containing the watermark to each row. If there were a way to get the watermark inside a scalar UDF, I wouldn't convert the table to a DataStream and back.

> if I am understanding correctly, you are also using "time1" as the
> rowtime; is it your intention to use it later as well?

yup :-)

> As far as I know ".returns(sourceTable.getSchema().toRowType());" only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine the type automatically [1].

The reason why I specify ".returns(sourceTable.getSchema().toRowType());" is exactly to give such a type information hint. I need it later when I create another table, e.g. "Table anotherTable = tEnv.fromDataStream(stream);". Without the hint, I get the following error:

"An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo."

That's why I give the type information hint in that way.

Best,

Dongwon

On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <walter...@gmail.com> wrote:

> Hi Dongwon,
>
> Can you provide a bit more information:
> which Flink version are you using?
> what does "sourceTable.getSchema().toRowType()" return?
> what does the line ".map(a -> a)" do, and can you remove it?
> if I am understanding correctly, you are also using "time1" as the
> rowtime; is it your intention to use it later as well?
>
> As far as I know ".returns(sourceTable.getSchema().toRowType());" only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine the type automatically [1].
>
> Thanks,
> Rong
>
> --
> [1]
> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>
>
> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hello,
>>
>> Consider the following snippet:
>>
>>> Table sourceTable = getKafkaSource0(tEnv);
>>> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>     .map(a -> a)
>>>     .returns(sourceTable.getSchema().toRowType());
>>> stream.print();
>>
>> where sourceTable.printSchema() shows:
>>
>>> root
>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>
>> This program throws the following exception:
>>
>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>     at app.metatron.test.Main2.main(Main2.java:231)
>>> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>>>     at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>     ...
>>
>> The row serializer seems to try to deep-copy an instance of java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>> Could anybody help me?
>>
>> Best,
>>
>> - Dongwon
>>
>> p.s.
>> though removing .returns() makes everything okay, I need it because
>> I want to convert the DataStream<Row> into another table later.
>>
>> p.s. the source table is created as follows:
>>
>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>         .version("universal")
>>>         .topic("mytopic")
>>>         .property("bootstrap.servers", "localhost:9092")
>>>         .property("group.id", "mygroup")
>>>         .startFromEarliest();
>>>     FormatDescriptor formatDescriptor = new Csv()
>>>         .deriveSchema()
>>>         .ignoreParseErrors()
>>>         .fieldDelimiter(',');
>>>     Schema schemaDescriptor = new Schema()
>>>         .field("time1", SQL_TIMESTAMP())
>>>         .rowtime(
>>>             new Rowtime()
>>>                 .timestampsFromField("rowTime")
>>>                 .watermarksPeriodicBounded(100)
>>>         );
>>>     tEnv.connect(connectorDescriptor)
>>>         .withFormat(formatDescriptor)
>>>         .withSchema(schemaDescriptor)
>>>         .inAppendMode()
>>>         .registerTableSource("mysrc");
>>>     return tEnv.scan("mysrc");
>>> }
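The ClassCastException in the stack trace can be reproduced without Flink at all. The sketch below is a minimal stand-alone illustration of the mismatch (CastDemo and copyAsLong are hypothetical names, not Flink classes): after toAppendStream() the rowtime field actually holds a java.sql.Timestamp, while the type hint from toRowType() seems to reinstate TimeIndicatorTypeInfo, whose serializer copies the field as a Long.

```java
import java.sql.Timestamp;

// Stand-alone illustration of the failure mode (not Flink code):
// a copy routine that assumes the field is a Long, handed the
// java.sql.Timestamp that the converted stream actually carries.
public class CastDemo {

    // Mirrors what LongSerializer.copy() effectively does with the field.
    static Long copyAsLong(Object field) {
        return (Long) field; // ClassCastException when field is a Timestamp
    }

    public static void main(String[] args) {
        // What the rowtime column holds after toAppendStream(sourceTable, Row.class):
        Object rowtimeField = new Timestamp(System.currentTimeMillis());
        try {
            copyAsLong(rowtimeField);
        } catch (ClassCastException e) {
            System.out.println("Reproduced: " + e);
        }
    }
}
```

The same cast succeeds when the field really is a Long, which is why the pipeline works as long as the serializer and the runtime value agree.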
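One possible workaround (an assumption on my part, not something confirmed in this thread): instead of passing toRowType() — which keeps the TimeIndicatorTypeInfo and hence the Long-based serializer — build a RowTypeInfo in which any time indicator field is replaced by a plain SQL_TIMESTAMP type, matching the java.sql.Timestamp values the stream actually carries. A sketch against the Flink 1.8 APIs (untested here; materializedRowType is a hypothetical helper name):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;

// Hypothetical helper: keep the schema's field names and types, but swap
// every rowtime/proctime indicator type for a plain SQL_TIMESTAMP so the
// serializer matches the java.sql.Timestamp values in the stream.
static RowTypeInfo materializedRowType(Table table) {
    TableSchema schema = table.getSchema();
    // clone() so we don't mutate the schema's own array
    TypeInformation<?>[] fieldTypes = schema.getFieldTypes().clone();
    for (int i = 0; i < fieldTypes.length; i++) {
        if (fieldTypes[i] instanceof TimeIndicatorTypeInfo) {
            fieldTypes[i] = Types.SQL_TIMESTAMP;
        }
    }
    return new RowTypeInfo(fieldTypes, schema.getFieldNames());
}
```

If this works, ".returns(materializedRowType(sourceTable))" would replace ".returns(sourceTable.getSchema().toRowType())", and tEnv.fromDataStream(stream) would then see a proper RowTypeInfo. The trade-off: "time1" comes back as an ordinary timestamp column, so the rowtime attribute would presumably have to be re-declared when building the new table.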