Re: type error with generics ..

2019-08-26 Thread Debasish Ghosh
Thanks for the clear explanation ..

On Mon, Aug 26, 2019 at 10:34 PM Seth Wiesman wrote:
> Hi Debasish,
>
> As it seems you're aware, TypeInformation is Flink's internal type system
> used for serialization between tasks and in/out of state backends.
>
> The issue you are seeing is because you are

Re: type error with generics ..

2019-08-26 Thread Debasish Ghosh
Actually the Scala and Java code are completely separate - in fact they are part of separate test suites. We have both Scala and Java APIs in our application, but they are completely separate .. and yeah, in Scala the implicits did the trick, while I had to pass the TypeInformation explicitly with

Re: type error with generics ..

2019-08-26 Thread Rong Rong
Glad that you sorted it out, and sorry for the late reply. Yes, I think the problem is how your `TypeInformation` for `Data` is being passed to the DataStreamSource construct. As for why the Scala side works but the Java side does not, it might have something to do with the implicit variable passing for your

Re: type error with generics ..

2019-08-26 Thread Debasish Ghosh
Looks like using the following overload of StreamExecutionEnvironment.addSource, which takes a TypeInformation as well, does the trick ..

  env.addSource(
    FlinkSource.collectionSourceFunction(data),
    TypeInformation.of(Data.class)
  )

regards.

On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh
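Why handing over the class explicitly helps can be illustrated without Flink. Java erases generic type arguments at run time, so a generic source cannot report its element type unless some token is passed alongside it. The following is only a minimal plain-Java sketch of that idea: `TypedSource`, this `addSource`, and `sameRuntimeClass` are hypothetical stand-ins invented for illustration, not Flink's API, and `Data` is a placeholder for the class in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ExplicitTypeSketch {

    /** Placeholder element type, standing in for the thread's Data class. */
    static class Data {
        final String name;
        Data(String name) { this.name = name; }
    }

    /** Hypothetical source wrapper that keeps the element class next to the elements. */
    static class TypedSource<T> {
        final List<T> elements;
        final Class<T> elementType;
        TypedSource(List<T> elements, Class<T> elementType) {
            this.elements = elements;
            this.elementType = elementType;
        }
    }

    /** Hypothetical analogue of an overload that takes explicit type information. */
    static <T> TypedSource<T> addSource(List<T> elements, Class<T> elementType) {
        return new TypedSource<>(elements, elementType);
    }

    /** True when both lists share a runtime class; element types play no part. */
    static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        List<Data> data = new ArrayList<>(Arrays.asList(new Data("a"), new Data("b")));
        List<String> strings = new ArrayList<>();
        // Erasure: both lists are plain ArrayList at run time, so this prints "true".
        System.out.println(sameRuntimeClass(data, strings));
        // The explicit class token survives erasure, so this prints "Data".
        System.out.println(addSource(data, Data.class).elementType.getSimpleName());
    }
}
```

The design point is that the caller, who knows the element type statically, supplies it, so the framework does not have to guess it from an erased generic signature.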

Re: type error with generics ..

2019-08-25 Thread Debasish Ghosh
oh .. and I am using Flink 1.8 ..

On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh wrote:
> Thanks for the feedback .. here are the details ..
>
> Just to give you some background, the original API is a Scala API as follows ..
>
> final def readStream[In: TypeInformation:

Re: type error with generics ..

2019-08-25 Thread Debasish Ghosh
Thanks for the feedback .. here are the details ..

Just to give you some background, the original API is a Scala API as follows ..

  final def readStream[In: TypeInformation: DeserializationSchema](inlet: CodecInlet[In]): DataStream[In] =
    context.readStream(inlet)

and the *Scala version of the

Re: type error with generics ..

2019-08-25 Thread Rong Rong
I am not sure how the function `readStream` is implemented (also, which version of Flink are you using?). Can you share more information on your code blocks and exception logs? Also, to answer your question, the DataStream return type is determined by its underlying transformation, so you cannot set it

Re: type error with generics ..

2019-08-24 Thread Debasish Ghosh
Thanks .. I tried this ..

  DataStream<Data> ins = readStream(in, Data.class, serdeData).map((Data d) -> d).returns(new TypeHint<Data>(){}.getTypeInfo());

But I still get the same error on this line .. (BTW I am not sure how to invoke returns on a DataStream and hence had to do a fake map - any suggestions
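The anonymous-subclass form of the TypeHint idiom above is the general Java "super type token" pattern: a subclass records its generic superclass in class metadata, and that record survives erasure. Below is a minimal plain-Java sketch of the pattern only. `TypeToken`, `capturedListOfData`, and `rawTokenIsRejected` are hypothetical names made up for illustration, and nothing here is Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

final class TypeTokenSketch {

    /** Hypothetical type token: an anonymous subclass records T in its generic superclass. */
    abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                // A raw "new TypeToken() {}" leaves nothing to read back.
                throw new IllegalStateException("TypeToken needs a type argument");
            }
            this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        Type getType() { return type; }
    }

    /** Placeholder element type, standing in for the thread's Data class. */
    static class Data { }

    /** Captures List<Data>, including the type argument that erasure would drop. */
    static Type capturedListOfData() {
        return new TypeToken<List<Data>>() { }.getType();
    }

    /** Shows that a token created without a type argument is rejected by this sketch. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawTokenIsRejected() {
        try {
            new TypeToken() { };
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(capturedListOfData().getTypeName());
        System.out.println(rawTokenIsRejected());
    }
}
```

In this sketch the token only works when the type argument is written out at the creation site; a raw token carries no information to recover.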

Re: type error with generics ..

2019-08-24 Thread Rong Rong
Hi Debasish,

I think the error refers to the output of your source rather than the result of your map function. E.g.

  DataStream<Data> ins = readStream(in, Data.class, serdeData)*.returns(new TypeInformation);*
  DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
    .returns(new