Glad that you sorted it out, and sorry for the late reply.
Yes. I think the problem is how your `TypeInformation` for `Data` is being
passed to the `DataStreamSource` construction.
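
For anyone hitting this later: the underlying mechanics are plain Java type
erasure, nothing Flink-specific. Below is a minimal stdlib-only sketch —
`Box` and `Payload` are made-up stand-ins, not Flink types — showing why
reflection on an instance of a generic class cannot recover `T`, while an
anonymous subclass (the trick Flink's `TypeHint` relies on) can:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Made-up stand-ins: Box plays the role of a generic SourceFunction<T>,
// Payload the role of the element type. Nothing here is Flink API.
class Payload {}
class Box<T> {}

public class ErasureDemo {
    public static void main(String[] args) {
        // Plain instance: T is erased at compile time, so the runtime
        // class is just Box, whose generic superclass is plain Object.
        Box<Payload> plain = new Box<>();
        Type s = plain.getClass().getGenericSuperclass();
        System.out.println(s instanceof ParameterizedType); // false

        // Anonymous subclass: the compiler records Box<Payload> as the
        // generic superclass, so Payload is recoverable via reflection.
        Box<Payload> hinted = new Box<Payload>() {};
        ParameterizedType p =
            (ParameterizedType) hinted.getClass().getGenericSuperclass();
        System.out.println(p.getActualTypeArguments()[0]); // class Payload
    }
}
```

This is exactly why passing the `TypeInformation` explicitly (or using
`new TypeHint<Data>(){}`) works where extraction from the generic source
class fails.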

Regarding why the Scala side works but not the Java one: it likely has
something to do with the implicit parameter passing for your `readStream`,
which is very tricky to get right when mixing with Java code. So I would
avoid mixing them if possible.
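
The exception further down in the thread also mentions a third option:
letting the function implement `ResultTypeQueryable`, so the extractor never
has to guess. Here is only a stdlib sketch of that pattern under assumed
names — `TypeAware`, `ListSource`, and `extractType` are all invented for
illustration; in real Flink the interface is `ResultTypeQueryable<T>` with
`getProducedType()` returning a `TypeInformation<T>`:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of Flink's ResultTypeQueryable pattern:
// instead of the framework recovering T by reflection, the source
// carries its element Class and hands it over on request.
interface TypeAware<T> {         // stand-in for ResultTypeQueryable<T>
    Class<T> producedType();     // stand-in for getProducedType()
}

class ListSource<T> implements TypeAware<T> {
    private final List<T> data;
    private final Class<T> type;

    ListSource(List<T> data, Class<T> type) {
        this.data = data;
        this.type = type;
    }

    List<T> emit() { return data; }

    @Override
    public Class<T> producedType() { return type; }
}

public class QueryableDemo {
    // A "framework" method that never needs reflection: it just asks.
    static <T> Class<T> extractType(TypeAware<T> source) {
        return source.producedType();
    }

    public static void main(String[] args) {
        ListSource<String> src =
            new ListSource<>(Arrays.asList("a", "b"), String.class);
        System.out.println(extractType(src)); // class java.lang.String
    }
}
```

With this shape, the Java call site needs neither Scala implicits nor an
explicit type-information argument.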

--
Rong

On Sun, Aug 25, 2019 at 11:10 PM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Looks like using the following overload of
> StreamExecutionEnvironment.addSource which takes a TypeInformation as well,
> does the trick ..
>
> env.<Data>addSource(
>   FlinkSource.<Data>collectionSourceFunction(data),
>   TypeInformation.<Data>of(Data.class)
> )
>
> regards.
>
> On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> oh .. and I am using Flink 1.8 ..
>>
>> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Thanks for the feedback .. here are the details ..
>>>
>>> Just to give you some background: the original API is a Scala API, as
>>> follows ..
>>>
>>> final def readStream[In: TypeInformation: DeserializationSchema](inlet:
>>> CodecInlet[In]): DataStream[In] =
>>>     context.readStream(inlet)
>>>
>>> and the *Scala version of the code runs fine* .. Here's the Java API
>>> (also written in Scala, though it passes the type information and
>>> deserialization schema explicitly and uses the DataStream class from
>>> Flink's Java API) ..
>>>
>>> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In],
>>> deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>>     context.readStream(inlet)(TypeInformation.of[In](clazz),
>>> deserializationSchema)
>>>       .javaStream
>>>
>>> Here's the Java code for transformation where I get the error ..
>>>
>>> DataStream<Data> ins =
>>>   this.<Data>readStream(in, Data.class, serdeData)
>>>       .map((Data d) -> d)
>>>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>
>>> DataStream<Simple> simples = ins.map((Data d) -> new
>>> Simple(d.getName())); // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>> DataStreamSink<Simple> sink = writeStream(out, simples, Simple.class,
>>> serdeSimple);
>>>
>>> Here's the corresponding Scala code that runs fine ..
>>>
>>> val ins: DataStream[Data] = readStream(in)
>>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>>> writeStream(out, simples)
>>>
>>> Here's the custom source that's also referred in the exception .. the
>>> case class is directly used in Scala while I use the Java API that uses
>>> that case class from Java ..
>>>
>>> object FlinkSource {
>>>   case class CollectionSourceFunction[T](data: Seq[T]) extends
>>> SourceFunction[T] {
>>>     def cancel(): Unit = {}
>>>     def run(ctx: SourceContext[T]): Unit = {
>>>       data.foreach(d ⇒ ctx.collect(d))
>>>     }
>>>   }
>>>
>>>   /**
>>>    * Java API
>>>    */
>>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>>     CollectionSourceFunction(data.asScala.toSeq)
>>> }
>>>
>>> Here's how I use the custom source from Java .. (which gives exception)
>>> .. here data is a java.util.List<Data>
>>>
>>> env.<Data>addSource(
>>>   FlinkSource.<Data>collectionSourceFunction(data)
>>> )
>>>
>>> and here's the Scala version, which runs fine .. here data is a
>>> scala.Seq[Data]
>>>
>>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>>
>>> Here's the complete exception ..
>>>
>>> [info]   org.apache.flink.api.common.functions.InvalidTypesException:
>>> The return type of function 'Custom Source' could not be determined
>>> automatically, due to type erasure. You can give type information hints by
>>> using the returns(...) method on the result of the transformation call, or
>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>> [info]   at
>>> org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>>> [info]   at
>>> org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>>> [info]   at
>>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>>> [info]   at
>>> pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>>> [info]   at
>>> pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>>> [info]   at
>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>>> [info]   at
>>> pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>> [info]   at
>>> pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>>> [info]   at
>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>>> [info]   ...
>>> [info]   Cause:
>>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>>> TypeVariable 'T' in 'class
>>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
>>> determined. This is most likely a type erasure problem. The type extraction
>>> currently supports types with generic variables only in cases where all
>>> variables in the return type can be deduced from the input type(s).
>>> Otherwise the type has to be specified explicitly using type information.
>>> [info]   at
>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>>> [info]   at
>>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>>> [info]   at
>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>>> [info]   at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>>> [info]   at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>>> [info]   at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>>> [info]   at
>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> [info]   at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> [info]   at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> [info]   ...
>>>
>>> regards.
>>>
>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> I am not sure how the function `readStream` is implemented (also which
>>>> version of Flink are you using?).
>>>> Can you share more information on your code blocks and exception logs?
>>>>
>>>> Also, to answer your question: a DataStream's output type is determined
>>>> by its underlying transformation, so you cannot set it directly.
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Thanks .. I tried this ..
>>>>>
>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData).map((Data
>>>>> d) -> d).returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>
>>>>> But still get the same error on this line ..
>>>>>
>>>>> (BTW I am not sure how to invoke `returns` directly on a DataStream and
>>>>> hence had to add a no-op map - any suggestions here?)
>>>>>
>>>>> regards.
>>>>>
>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Debasish,
>>>>>>
>>>>>> I think the error refers to the output of your source instead of your
>>>>>> result of the map function. E.g.
>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>> *.returns(TypeInformation.of(Data.class));*
>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new
>>>>>> Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <
>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello -
>>>>>>>
>>>>>>> I have the following call to addSource where I pass a Custom
>>>>>>> SourceFunction ..
>>>>>>>
>>>>>>> env.<Data>addSource(
>>>>>>>   new CollectionSourceFunctionJ<Data>(data,
>>>>>>> TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>>> )
>>>>>>>
>>>>>>> where data is List<Data> and CollectionSourceFunctionJ is a Scala
>>>>>>> case class ..
>>>>>>>
>>>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti:
>>>>>>> TypeInformation[T]) extends SourceFunction[T] {
>>>>>>>   def cancel(): Unit = {}
>>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> When the following transformation runs ..
>>>>>>>
>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new
>>>>>>> Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>
>>>>>>> I get the following exception in the second line ..
>>>>>>>
>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>>> automatically, due to type erasure. You can give type information 
>>>>>>>> hints by
>>>>>>>> using the returns(...) method on the result of the transformation 
>>>>>>>> call, or
>>>>>>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>>>>>>
>>>>>>>
>>>>>>> Initially the `returns` call was not there and I was getting the same
>>>>>>> exception. After adding the `returns` call, nothing changed.
>>>>>>>
>>>>>>> Any help will be appreciated ..
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>
>
