I would assign timestamps directly at the source.
Timestamps are not stripped off by operators.
Reassigning timestamps somewhere in the middle of a job can cause very
unexpected results.
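To illustrate the advice above, here is a minimal sketch that assigns timestamps right after the source and only then keys and windows the stream. It assumes the Flink 1.x Scala DataStream API; the `MyEvent` fields, the sample elements, the 10-second out-of-orderness bound and the 5-second window are all made up for illustration and do not come from this thread.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type; the field names are assumptions.
case class MyEvent(key: String, timestamp: Long, value: Int)

object TimestampsAtSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Stand-in source with slightly out-of-order event times.
    val stream: DataStream[MyEvent] = env.fromElements(
      MyEvent("a", 1000L, 1),
      MyEvent("b", 500L, 2),
      MyEvent("a", 2000L, 3))

    // Assign timestamps directly after the source, before keyBy.
    // The extractor tolerates elements that are up to 10 seconds late.
    val withTimestamps: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
        override def extractTimestamp(event: MyEvent): Long = event.timestamp
      })

    // keyBy comes last, so the result is a KeyedStream and windowing is available.
    withTimestamps
      .keyBy(_.key)
      .timeWindow(Time.seconds(5))
      .sum("value")
      .print()

    env.execute("timestamps-at-source")
  }
}
```

Because the timestamps travel with the records, the keyed stream can be windowed without reassigning anything after keyBy.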
2016-09-08 9:32 GMT+02:00 Dong-iL, Kim :
> Thanks for replying, pushpendra.
> The
I want to assign timestamps after keyBy,
because the stream is not aligned before keyBy.
I’ve already tested code like yours.
It produced many warnings that timestamp monotony was violated.
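Such warnings are what one would expect when an extractor that assumes ascending timestamps receives elements out of order. The sketch below is a simplified model of that kind of check, not Flink's actual implementation; `MonotonyCheck` and `violations` are hypothetical names, and the sample timestamps are invented.

```scala
// Simplified model of an ascending-timestamp check (an assumption, not Flink code).
object MonotonyCheck {
  /** Returns (timestamp, highestSeenSoFar) for every element that arrives
    * with a timestamp lower than the highest one seen before it. */
  def violations(timestamps: Seq[Long]): Seq[(Long, Long)] = {
    var current = Long.MinValue
    val out = Seq.newBuilder[(Long, Long)]
    for (ts <- timestamps) {
      if (ts >= current) current = ts   // in order: advance the high-water mark
      else out += ((ts, current))       // out of order: would trigger a warning
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Two individually ordered substreams, interleaved after a shuffle.
    val interleaved = Seq(1000L, 3000L, 2000L, 4000L, 3500L)
    violations(interleaved).foreach { case (ts, max) =>
      println(s"timestamp $ts is earlier than the highest timestamp seen so far ($max)")
    }
  }
}
```

In this model every element that is older than its predecessors counts as one violation, so an interleaved stream can produce many of them even when each substream is ordered on its own.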
> On Sep 8, 2016, at 4:32 PM, Dong-iL, Kim wrote:
>
> Thanks for replying, pushpendra.
>
Please refer to
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
for assigning timestamps.
You can do a map after keyBy to assign timestamps,
e.g.:
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
Thanks for replying, pushpendra.
The assignTimestamp method returns a DataStream, not a KeyedStream,
so I cannot use windowing.
Is it possible to cast it to a KeyedStream?
Regards
> On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal
> wrote:
>
> Please refer
>
Hi Fabian,
I am also looking for this solution. Could you help me with two things?
1. How is this different from queryable state?
2. How can this key-value state be queried from DS2, even if it is running in the
same application?
e.g.
val keyedStream = stream.keyBy(_.key)
val otherStream =
Hi Fabian,
First of all, thanks for all your prompt responses. With regard to 2)
multiple lookups, I have to clarify what I mean by that...
DS1 elementKeyStream = stream1.map(String<>); this maps each
of the streaming elements into string mapped value...
DS2 = stream2.xxx();
Hi Frank,
input should be of type DataSet[(BSONWritable, BSONWritable)], so a
Tuple2[BSONWritable, BSONWritable], right?
Something like this should work:
input.map( pair => pair._1.toString )
Pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the key
of the pair.
Alternatively you
Hi Frank,
I didn't try to run the code, but this does not show a compiler error in
IntelliJ:
> input.map( mapfunc2 _ )
Decomposing the Tuple2 into two separate arguments only works with
Scala's pattern matching technique (this is the second approach I posted).
The Java API is not capable
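The variants discussed in this thread can be compared side by side in plain Scala. The sketch below uses a `Seq` as a stand-in for the DataSet and a hypothetical `Doc` case class as a stand-in for BSONWritable, so it runs without Flink or Hadoop on the classpath; whether each form compiles unchanged against the Flink API is not verified here.

```scala
object TupleMapping {
  // Hypothetical stand-in for BSONWritable.
  final case class Doc(json: String) {
    override def toString: String = json
  }

  // A method that takes the whole tuple as a single argument.
  def mapfunc2(pair: (Doc, Doc)): String = pair._1.toString

  // 1. Lambda with positional access to the key.
  def viaLambda(input: Seq[(Doc, Doc)]): Seq[String] =
    input.map(pair => pair._1.toString)

  // 2. The method converted to a function value with a trailing underscore.
  def viaMethod(input: Seq[(Doc, Doc)]): Seq[String] =
    input.map(mapfunc2 _)

  // 3. Pattern matching, which splits the tuple into key and value.
  def viaPattern(input: Seq[(Doc, Doc)]): Seq[String] =
    input.map { case (key, _) => key.toString }

  def main(args: Array[String]): Unit = {
    val input = Seq((Doc("k1"), Doc("v1")), (Doc("k2"), Doc("v2")))
    println(viaLambda(input))
    println(viaMethod(input))
    println(viaPattern(input))
  }
}
```

All three variants receive one tuple per element; only the pattern-matching form gives the key and the value separate names.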
Hello Fabian,
Thanks, your solution does indeed work. However, I don't understand why.
When I replace the lambda with an explicit function
def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]) : String = {
return pair._1.toString
}
input.map mapfunc2
I get the error below, which seemingly