Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-15 Thread Dominik Wosiński
Hey,
Thanks for the info, I hadn't noticed that.
I was just going through older messages with no responses.

Best Regards,
Dom.


Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-15 Thread Chesnay Schepler

This issue was already resolved in another thread by the same author.

On 15.11.2018 10:52, Dominik Wosiński wrote:

Hey,

Could you please show some sample data that you want to process? This 
would help in verifying the issue.


Best Regards,
Dom.

On Tue, Nov 13, 2018 at 13:58, Jeff Zhang wrote:


Hi,

I hit the following error when I try to use the kafka connector in
the flink table api. There's very little documentation about how to use
the kafka connector in the flink table api, so could anyone help me with
that? Thanks

Exception in thread "main"
org.apache.flink.table.api.ValidationException: Field 'event_ts'
could not be resolved by the field mapping.
at

org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
at

org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at

org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at

scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at

scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at

scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at

org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
at

org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
at

org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
at

org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
at

org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
at

org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
at

org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)

And here's the source code:



  case class Record(status: String, direction: String, var event_ts: Timestamp)


  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setParallelism(1)
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val data: DataStream[Record] = ...
    val tEnv = TableEnvironment.getTableEnvironment(senv)
    tEnv
      // declare the external system to connect to
      .connect(
        new Kafka()
          .version("0.11")
          .topic("processed5.events")
          .startFromEarliest()
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092"))
      .withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema()
      )
      .withSchema(
        new Schema()
          .field("status", Types.STRING)
          .field("direction", Types.STRING)
          .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
            new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
      )

      // specify the update-mode for streaming tables
      .inAppendMode()

      // register as source, sink, or both and under a name
      .registerTableSourceAndSink("MyUserTable");

    tEnv.fromDataStream(data).insertInto("MyUserTable")





Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-15 Thread Dominik Wosiński
Hey,

Could you please show some sample data that you want to process? This would
help in verifying the issue.

Best Regards,
Dom.

On Tue, Nov 13, 2018 at 13:58, Jeff Zhang wrote:

> Hi,
>
> I hit the following error when I try to use the kafka connector in the flink
> table api. There's very little documentation about how to use the kafka
> connector in the flink table api, so could anyone help me with that? Thanks
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field 'event_ts' could not be resolved by the field mapping.
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
> at
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
> at
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
> at
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>
> And here's the source code:
>
>
>
>  case class Record(status: String, direction: String, var event_ts: Timestamp)
>
>
>   def main(args: Array[String]): Unit = {
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val data: DataStream[Record] = ...
> val tEnv = TableEnvironment.getTableEnvironment(senv)
> tEnv
>   // declare the external system to connect to
>   .connect(
>   new Kafka()
> .version("0.11")
> .topic("processed5.events")
> .startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092"))
>   .withFormat(new Json()
> .failOnMissingField(false)
> .deriveSchema()
>   )
>   .withSchema(
> new Schema()
>   .field("status", Types.STRING)
>   .field("direction", Types.STRING)
>   .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>   new 
> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>   )
>
>   // specify the update-mode for streaming tables
>   .inAppendMode()
>
>   // register as source, sink, or both and under a name
>   .registerTableSourceAndSink("MyUserTable");
>
> tEnv.fromDataStream(data).insertInto("MyUserTable")
>
>


Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Jeff Zhang
Thanks Hequn, that is very helpful.

On Wed, Nov 14, 2018 at 2:32 PM Hequn Cheng  wrote:

> Hi Jeff,
>
> We need a different field name for the rowtime indicator, something like:
>
>>   new Schema()
>> .field("status", Types.STRING)
>> .field("direction", Types.STRING)
>> .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>> new
>> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>
>
> Furthermore, we should define another sink schema that contains no rowtime
> definitions, since time attributes and custom field mappings are not yet
> supported for sinks.
>
>> val sinkSchema =
>>   new Schema()
>> .field("status", Types.STRING)
>> .field("direction", Types.STRING)
>> .field("rowtime", Types.SQL_TIMESTAMP)
>
>
> Btw, a unified API for sources and sinks is currently under discussion. More
> details here [1].
>
> Best, Hequn
>
> [1]
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#heading=h.41fd6rs7b3cf
>
>
> On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang  wrote:
>
>>
>> Hi,
>>
>> I hit the following error when I try to use the kafka connector in the
>> flink table api. There's very little documentation about how to use the
>> kafka connector in the flink table api, so could anyone help me with that? Thanks
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Field 'event_ts' could not
>> be resolved by the field mapping.
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>> at
>> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>> at
>> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>>
>> And here's the source code:
>>
>>
>>
>>  case class Record(status: String, direction: String, var event_ts: Timestamp)
>>
>>
>>   def main(args: Array[String]): Unit = {
>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>> senv.setParallelism(1)
>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> val data: DataStream[Record] = ...
>> val tEnv = TableEnvironment.getTableEnvironment(senv)
>> tEnv
>>   // declare the external system to connect to
>>   .connect(
>>   new Kafka()
>> .version("0.11")
>> .topic("processed5.events")
>> .startFromEarliest()
>> .property("zookeeper.connect", "localhost:2181")
>> .property("bootstrap.servers", "localhost:9092"))
>>   .withFormat(new Json()
>> .failOnMissingField(false)
>> .deriveSchema()
>>   )
>>   .withSchema(
>> new Schema()
>>   .field("status", Types.STRING)
>>   .field("direction", Types.STRING)
>>   .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>>   new 
>> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>>   )
>>
>>   // specify the update-mode for streaming tables
>>   .inAppendMode()
>>
>>   // register as source, sink, or both and under a name
>>   .registerTableSourceAndSink("MyUserTable");
>>
>> tEnv.fromDataStream(data).insertInto("MyUserTable")
>>
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Hequn Cheng
Hi Jeff,

We need a different field name for the rowtime indicator, something like:

>   new Schema()
> .field("status", Types.STRING)
> .field("direction", Types.STRING)
> .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
> new
> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())


Furthermore, we should define another sink schema that contains no rowtime
definitions, since time attributes and custom field mappings are not yet
supported for sinks.

> val sinkSchema =
>   new Schema()
> .field("status", Types.STRING)
> .field("direction", Types.STRING)
> .field("rowtime", Types.SQL_TIMESTAMP)


Btw, a unified API for sources and sinks is currently under discussion. More
details here [1].

Best, Hequn

[1]
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#heading=h.41fd6rs7b3cf


On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang  wrote:

>
> Hi,
>
> I hit the following error when I try to use the kafka connector in the flink
> table api. There's very little documentation about how to use the kafka
> connector in the flink table api, so could anyone help me with that? Thanks
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field 'event_ts' could not be resolved by the field mapping.
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
> at
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
> at
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
> at
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>
> And here's the source code:
>
>
>
>  case class Record(status: String, direction: String, var event_ts: Timestamp)
>
>
>   def main(args: Array[String]): Unit = {
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val data: DataStream[Record] = ...
> val tEnv = TableEnvironment.getTableEnvironment(senv)
> tEnv
>   // declare the external system to connect to
>   .connect(
>   new Kafka()
> .version("0.11")
> .topic("processed5.events")
> .startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092"))
>   .withFormat(new Json()
> .failOnMissingField(false)
> .deriveSchema()
>   )
>   .withSchema(
> new Schema()
>   .field("status", Types.STRING)
>   .field("direction", Types.STRING)
>   .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>   new 
> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>   )
>
>   // specify the update-mode for streaming tables
>   .inAppendMode()
>
>   // register as source, sink, or both and under a name
>   .registerTableSourceAndSink("MyUserTable");
>
> tEnv.fromDataStream(data).insertInto("MyUserTable")
>
>
>


Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Jeff Zhang
Hi,

I hit the following error when I try to use the kafka connector in the flink
table api. There's very little documentation about how to use the kafka
connector in the flink table api, so could anyone help me with that? Thanks

Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field 'event_ts' could not be resolved by the field mapping.
at
org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
at
org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at
org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
at
org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
at
org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
at
org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
at
org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
at
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
at
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)

And here's the source code:



  case class Record(status: String, direction: String, var event_ts: Timestamp)


  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setParallelism(1)
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val data: DataStream[Record] = ...
    val tEnv = TableEnvironment.getTableEnvironment(senv)
    tEnv
      // declare the external system to connect to
      .connect(
        new Kafka()
          .version("0.11")
          .topic("processed5.events")
          .startFromEarliest()
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092"))
      .withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema()
      )
      .withSchema(
        new Schema()
          .field("status", Types.STRING)
          .field("direction", Types.STRING)
          .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
            new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
      )

      // specify the update-mode for streaming tables
      .inAppendMode()

      // register as source, sink, or both and under a name
      .registerTableSourceAndSink("MyUserTable");

    tEnv.fromDataStream(data).insertInto("MyUserTable")
