Sorry for the previous incomplete email. Didn't realize I hit send!
I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)
It turned out to be due to a difference in API signature between Scala and
Java API. I was refering to javadoc. Is there a scaladoc?
Java API has
public <R> SingleOutputStreamOperator<R> transform(
String functionName,
TypeInformation<R> outTypeInfo,
TwoInputStreamOperator<IN1, IN2, R> operator)
Scala API has
def transform[R: TypeInformation](
functionName: String,
operator: TwoInputStreamOperator[IN1, IN2, R])
Srikanth
On Mon, May 2, 2016 at 7:18 PM, Srikanth <[email protected]> wrote:
> Hello,
>
> I'm fac
>
> val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
> new SimpleStringSchema(), properties))
> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
> BidderRawLogs(b)).keyBy(b => b.strategyId)
>
> val metaStrategy: KeyedStream[(Int, String), Int] =
> env.readTextFile("path").name("Strategy")
> .map((1, _) ).keyBy(_._1)
>
> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
> (Int, BidderRawLogs, (Int, String))] =
> new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
> staticTypeInfo)
> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
> {}.getTypeInfo()
>
> val funName = "test"
> val joinedStream = bidderStream.connect(metaStrategy)
> .transform(funName, joinOperator, outTypeInfo)
>
>