Hello -

I have the following call to addSource, where I pass a custom
SourceFunction:

env.<Data>addSource(
  new CollectionSourceFunctionJ<Data>(
    data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
)

where data is a List<Data> and CollectionSourceFunctionJ is a Scala case
class:

case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T])
    extends SourceFunction[T] {
  def cancel(): Unit = {}
  def run(ctx: SourceContext[T]): Unit = {
    data.asScala.foreach(d ⇒ ctx.collect(d))
  }
}

When the following transformation runs:

DataStream<Data> ins = readStream(in, Data.class, serdeData);
DataStream<Simple> simples = ins
    .map((Data d) -> new Simple(d.getName()))
    .returns(new TypeHint<Simple>(){}.getTypeInfo());

I get the following exception on the second line:

org.apache.flink.api.common.functions.InvalidTypesException: The return
type of function 'Custom Source' could not be determined automatically, due
to type erasure. You can give type information hints by using the
returns(...) method on the result of the transformation call, or by letting
your function implement the 'ResultTypeQueryable' interface.


Initially the returns call was not there, and I got the same exception.
Adding the returns call changes nothing.
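
For context, here is my understanding of the erasure the message refers to,
as a small Flink-free sketch. GenericSource and Hint are hypothetical
stand-ins I made up for illustration (they are not Flink classes): an
instance of a generic class does not carry its type argument at runtime,
while an anonymous subclass, which I assume is the idea behind TypeHint,
records it in its generic superclass. Note also that the message names
'Custom Source' rather than the map function.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for a generic source function (not a Flink class).
    public static class GenericSource<T> {
        final List<T> data;

        public GenericSource(List<T> data) {
            this.data = data;
        }
    }

    // Hypothetical stand-in for the TypeHint idea: an anonymous subclass
    // such as `new Hint<String>() {}` keeps T in its generic superclass.
    public abstract static class Hint<T> {
        public Type captured() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Returns the generic superclass name as seen by reflection at runtime.
    public static String describeSuperclass(Object o) {
        return o.getClass().getGenericSuperclass().getTypeName();
    }

    public static void main(String[] args) {
        GenericSource<String> source = new GenericSource<>(List.of("a", "b"));

        // Only the plain superclass is visible; the String argument is erased.
        System.out.println(describeSuperclass(source));

        // The anonymous subclass preserves the type argument.
        System.out.println(new Hint<String>() {}.captured().getTypeName());
    }
}
```

If that reading is right, nothing in the CollectionSourceFunctionJ instance
itself exposes Data to reflection, even though I pass a TypeInformation
into its constructor.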

Any help would be appreciated.

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
