Hi Timo 
"You don't need to specify the type in .flatMap() explicitly. It will be 
automatically extracted using the generic signature of DataDataConverter.”
It does not. That is the reason why I had to add it there

> Regarding your error. Make sure that you don't mix up the API classes. If you 
> want to use the Java API you should not use 
> "org.apache.flink.streaming.api.scala.DataStream" but the Java one.
I rewrote the class in Java. Thats why I am so confused



Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> 
> From: Timo Walther <twal...@apache.org>
> Subject: Re: Java types
> Date: January 11, 2018 at 3:07:08 AM CST
> To: user@flink.apache.org
> 
> 
> Hi Boris,
> 
> each API is designed language-specific so they might not always be the same. 
> Scala has better type extraction features and let you write code very 
> precisely. Java requires sometime more code to archieve the same.
> 
> You don't need to specify the type in .flatMap() explicitly. It will be 
> automatically extracted using the generic signature of DataDataConverter.
> 
> Regarding your error. Make sure that you don't mix up the API classes. If you 
> want to use the Java API you should not use 
> "org.apache.flink.streaming.api.scala.DataStream" but the Java one.
> 
> Regards,
> Timo
> 
> 
> 
> Am 1/11/18 um 5:13 AM schrieb Boris Lublinsky:
>> More questions
>> In Scala my DataProcessor is defined as
>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, 
>> Double] with CheckpointedFunction {
>> And it is used as follows
>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>   .flatMap(BadDataHandler[ModelToServe])
>>   .keyBy(_.dataType)
>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>   .flatMap(BadDataHandler[WineRecord])
>>   .keyBy(_.dataType)
>> 
>> // Merge streams
>> data
>>   .connect(models)
>>   .process(DataProcessorKeyed())
>> When I am doing the same thing in Java
>> public class DataProcessorKeyed extends 
>> CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements 
>> CheckpointedFunction{
>> Which I am using as follows
>> // Read data from streams
>> DataStream<Tuple2<String, ModelToServe>> models = modelsStream
>>         .flatMap(new ModelDataConverter(), new 
>> TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
>> TypeInformation.of(ModelToServe.class)))
>>         .keyBy(0);
>> DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
>>         .flatMap(new DataDataConverter(), new 
>> TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
>> TypeInformation.of(Winerecord.WineRecord.class)))
>>         .keyBy(0);
>> 
>> // Merge streams
>> data
>>         .connect(models)
>>         .process(new DataProcessorKeyed());
>> I am getting an error
>> 
>> Error:(68, 17) java: no suitable method found for keyBy(int)
>>     method 
>> org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>)
>>  is not applicable
>>       (argument mismatch; int cannot be converted to 
>> scala.collection.Seq<java.lang.Object>)
>>     method 
>> org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>)
>>  is not applicable
>>       (cannot infer type-variable(s) K
>>         (actual and formal argument lists differ in length))
>> So it assumes key/value pairs for the coprocessor
>> 
>> Why is such difference between APIs?
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
>> https://www.lightbend.com/ <https://www.lightbend.com/>
>>> On Jan 10, 2018, at 6:20 PM, Boris Lublinsky <boris.lublin...@lightbend.com 
>>> <mailto:boris.lublin...@lightbend.com>> wrote:
>>> 
>>> I am trying to covert Scala code (which works fine) to Java
>>> The sacral code is:
>>> // create a Kafka consumers
>>> // Data
>>> val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   DATA_TOPIC,
>>>   new ByteArraySchema,
>>>   dataKafkaProps
>>> )
>>> 
>>> // Model
>>> val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   MODELS_TOPIC,
>>>   new ByteArraySchema,
>>>   modelKafkaProps
>>> )
>>> 
>>> // Create input data streams
>>> val modelsStream = env.addSource(modelConsumer)
>>> val dataStream = env.addSource(dataConsumer)
>>> 
>>> // Read data from streams
>>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>>   .flatMap(BadDataHandler[ModelToServe])
>>>   .keyBy(_.dataType)
>>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>>   .flatMap(BadDataHandler[WineRecord])
>>>   .keyBy(_.dataType)
>>> Now I am trying to re write it to Java and fighting with the requirement of 
>>> providing types, where they should be obvious
>>> 
>>> // create a Kafka consumers
>>> // Data
>>> FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
>>>         ModelServingConfiguration.DATA_TOPIC,
>>>         new ByteArraySchema(),
>>>         dataKafkaProps);
>>> 
>>> // Model
>>> FlinkKafkaConsumer010<byte[]>  modelConsumer = new FlinkKafkaConsumer010<>(
>>>         ModelServingConfiguration.MODELS_TOPIC,
>>>         new ByteArraySchema(),
>>>         modelKafkaProps);
>>> 
>>> // Create input data streams
>>> DataStream<byte[]> modelsStream = env.addSource(modelConsumer, 
>>> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> DataStream<byte[]> dataStream = env.addSource(dataConsumer, 
>>> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> // Read data from streams
>>> DataStream<Tuple2<String,ModelToServe>> models = modelsStream
>>>      .flatMap(new ModelConverter(), new 
>>> TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
>>> TypeInformation.of(ModelToServe.class)));
>>> 
>>> Am I missing something similar to import org.apache.flink.api.scala._
>>>  In java?
>>> 
>>> Now if this is an only way, Does this seems right?
>>> 
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
>>> https://www.lightbend.com/ <https://www.lightbend.com/>
>> 

Reply via email to