Hello,
I'm fac
// Raw records read from the Kafka topic "test-topic", each decoded as a String.
val stream = env.addSource(
  new FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(), properties))

// Dynamic side: turn each raw record into BidderRawLogs and key by strategy id.
val bidderStream: KeyedStream[BidderRawLogs, Int] =
  stream.flatMap(b => BidderRawLogs(b)).keyBy(b => b.strategyId)

// Static side: every line of the text file is paired with the constant key 1.
// NOTE(review): because the key here is always 1 while the bidder stream is keyed
// by strategyId, only bidder records whose strategyId is 1 share a key with these
// records — confirm this is intended.
val metaStrategy: KeyedStream[(Int, String), Int] =
  env.readTextFile("path").name("Strategy")
    .map((1, _)).keyBy(_._1)

// Type information for both inputs. The Scala API macro is used instead of the
// Java `TypeHint` trick: Java reflection cannot analyse Scala tuples/case classes
// and falls back to a generic type, whereas the macro derives the same kind of
// type information the Scala `map`/`keyBy` calls above already rely on.
val staticTypeInfo = createTypeInformation[(Int, String)]
val dynamicTypeInfo = createTypeInformation[BidderRawLogs]

// Two-input operator that consumes the bidder records and the (Int, String)
// records and emits (key, bidder record, static record) triples.
val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
  (Int, BidderRawLogs, (Int, String))] =
  new JoinOperator[Int, BidderRawLogs, (Int, String)](dynamicTypeInfo, staticTypeInfo)

// Type information for the operator's output triples.
val outTypeInfo = createTypeInformation[(Int, BidderRawLogs, (Int, String))]

val funName = "test"

// Connect both keyed streams and run the custom join operator over them.
// NOTE(review): the parameter list accepted by `transform` on connected streams
// differs between Flink API flavours/versions — confirm this call matches yours.
val joinedStream = bidderStream.connect(metaStrategy)
  .transform(funName, joinOperator, outTypeInfo)