Glad to hear it!

Cheers,
Till

On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <cavalla...@hotmail.com> wrote:

> Hi Till,
>
> That's great! thank you so much!!! I have spent one week on this. I'm so
> relieved!
>
> Cheers
>
> s
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* 06 November 2020 17:56
> *To:* Simone Cavallarin <cavalla...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>; Aljoscha Krettek
> <aljos...@apache.org>
> *Subject:* Re: How to use properly the function:
> withTimestampAssigner((event, timestamp) ->..
>
> Hi Simone,
>
> The problem is that the Java 1.8 compiler cannot do type inference when
> chaining methods [1].
>
> The solution would be
>
> WatermarkStrategy<Event> wmStrategy =
>     WatermarkStrategy
>         .<Event>forMonotonousTimestamps()
>         .withTimestampAssigner((event, timestamp) -> {
>             return event.getTime();
>         });
>
> @Aljoscha Krettek <aljos...@apache.org> I think we need to update the
> documentation about it. We have some examples which don't take this into
> account.
>
> [1]
> https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/
>
> Cheers,
> Till
>
> On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <cavalla...@hotmail.com>
> wrote:
>
> Hi,
>
> I'm taking the timestamp from the event payload that I'm receiving from
> Kafka.
>
> I'm struggling to get the time and I'm confused on how I should use the
> function ".withTimestampAssigner()". I'm receiving an error on
> event.getTime() that is telling me: *"cannot resolve method "Get Time" in
> "Object"* and I really don't understand how I can fix it. My class is
> providing a long so the variable itself should be fine. Any help would be
> really appreciated.
>
> *This is my code:*
>
> FlinkKafkaConsumer<Event> kafkaData =
>     new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
> WatermarkStrategy<Event> wmStrategy =
>     WatermarkStrategy
>         .forMonotonousTimestamps()
>         .withTimestampAssigner((event, timestamp) -> {
>             return event.getTime();
>         });
>
> DataStream<Event> stream = env.addSource(
>     kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
> And to give you the idea of the whole project,
>
> *This is the EventDeserializationSchema class:*
>
> public class EventDeserializationSchema implements
>         DeserializationSchema<Event> {
>
>     private static final long serialVersionUID = 1L;
>
>     private static final CsvSchema schema = CsvSchema.builder()
>         .addColumn("firstName")
>         .addColumn("lastName")
>         .addColumn("age", CsvSchema.ColumnType.NUMBER)
>         .addColumn("time")
>         .build();
>
>     private static final ObjectMapper mapper = new CsvMapper();
>
>     @Override
>     public Event deserialize(byte[] message) throws IOException {
>         return mapper.readerFor(Event.class).with(schema).readValue(message);
>     }
>
>     @Override
>     public boolean isEndOfStream(Event nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<Event> getProducedType() {
>         return TypeInformation.of(Event.class);
>     }
> }
>
> *And this is the Event Class:*
>
> public class Event implements Serializable {
>     public String firstName;
>     public String lastName;
>     private int age;
>     public Long time;
>
>     public Event() {
>     }
>
>     public String getFirstName() {
>         return firstName;
>     }
>
>     public void setFirstName(String firstName) {
>         this.firstName = firstName;
>     }
>
>     public String getLastName() {
>         return lastName;
>     }
>
>     public void setLastName(String lastName) {
>         this.lastName = lastName;
>     }
>
>     public int getAge() {
>         return age;
>     }
>
>     public void setAge(int age) {
>         this.age = age;
>     }
>
>     public long getTime() {
>         return time;
>     }
>
>     public void setTime(String kafkaTime) {
>         long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
>         this.time = tn;
>     }
> }
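
For anyone finding this thread later: the inference problem discussed above can be reproduced without Flink. The sketch below is only an illustration. `Strategy`, `forMonotonous`, `withAssigner` and `extract` are hypothetical stand-ins that mimic the shape of the chained call (a generic static factory followed by an instance method taking a lambda); they are not Flink's API, and this `Event` is a cut-down version of the class in the thread.

```java
import java.util.function.ToLongBiFunction;

// Cut-down, hypothetical version of the Event class; only getTime() matters here.
class Event {
    private final long time;

    Event(long time) {
        this.time = time;
    }

    long getTime() {
        return time;
    }
}

// Hypothetical stand-in (NOT Flink's WatermarkStrategy): a generic static
// factory followed by an instance method that accepts a lambda.
class Strategy<T> {
    private final ToLongBiFunction<T, Long> assigner;

    private Strategy(ToLongBiFunction<T, Long> assigner) {
        this.assigner = assigner;
    }

    // T appears only in the return type, so the compiler has to infer it
    // from the context of the call.
    static <T> Strategy<T> forMonotonous() {
        return new Strategy<>((element, previous) -> previous);
    }

    Strategy<T> withAssigner(ToLongBiFunction<T, Long> newAssigner) {
        return new Strategy<>(newAssigner);
    }

    long extract(T element, long previous) {
        return assigner.applyAsLong(element, previous);
    }
}

class TypeWitnessDemo {
    public static void main(String[] args) {
        // Does not compile: as the receiver of a chained call, forMonotonous()
        // gets no target type, so T is inferred as Object and
        // event.getTime() cannot be resolved.
        // Strategy<Event> broken = Strategy.forMonotonous()
        //         .withAssigner((event, ts) -> event.getTime());

        // Option 1: explicit type witness on the factory call.
        Strategy<Event> withWitness = Strategy.<Event>forMonotonous()
                .withAssigner((event, ts) -> event.getTime());

        // Option 2: split the chain so the assignment supplies the target type.
        Strategy<Event> base = Strategy.forMonotonous();
        Strategy<Event> split = base.withAssigner((event, ts) -> event.getTime());

        Event sample = new Event(1604678160000L);
        System.out.println(withWitness.extract(sample, 0L));
        System.out.println(split.extract(sample, 0L));
    }
}
```

Both options give the compiler the element type before it has to type-check the lambda. The type witness keeps everything in one expression, which is why it fits the fluent style used in the thread.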