The KafkaSink is the last step in my program, after the 2nd deduplication. I could not yet track down where the duplicates show up. That's a bit difficult to find out... but I'm trying.
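
To narrow it down, I'm thinking of putting something like the following directly in front of the KafkaSink, so I can see whether the duplicates already exist at that point or only appear behind the sink. This is just a rough sketch, not code from my job: the key extraction via toString() is a placeholder for a real message id, and the operator only works as a duplicate detector if both copies of an element reach the same parallel instance (e.g. with setParallelism(1) on it, or after partitioning by the same key).

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Debug-only operator: forwards every element unchanged, but logs a warning
 * the second time the same key is seen by this parallel instance.
 */
public class DuplicateTracer<T> extends RichFlatMapFunction<T, T> {

    private static final Logger LOG = LoggerFactory.getLogger(DuplicateTracer.class);

    // keys seen so far by this parallel instance (grows unbounded, debugging only)
    private transient Set<String> seen;

    @Override
    public void open(Configuration parameters) {
        seen = new HashSet<String>();
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        String key = value.toString(); // placeholder: derive a real message id here
        if (!seen.add(key)) {
            LOG.warn("duplicate just before the sink: {}", key);
        }
        out.collect(value);
    }
}

If the warnings already show up there, the duplicates are produced upstream; if not, the sink side would be the next suspect.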
> On 03.09.2015 at 14:14, Stephan Ewen <se...@apache.org> wrote:
>
> Can you tell us where the KafkaSink comes into play? At what point do the
> duplicates come up?
>
>> On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>>
>> No. I mean the KafkaSink.
>>
>> A bit more insight into my program: I read from a Kafka topic with
>> FlinkKafkaConsumer082, then hash-partition the data, then I do a
>> deduplication (which does not eliminate all duplicates, though). Then some
>> computation, and afterwards another deduplication (group by message in a
>> window of the last 2 seconds).
>>
>> Of course the last deduplication is not perfect.
>>
>> Cheers, Rico.
>>
>>> On 03.09.2015 at 13:29, Stephan Ewen <se...@apache.org> wrote:
>>>
>>> Do you mean the KafkaSource?
>>>
>>> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the
>>> KafkaSource?
>>>
>>>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>>>>
>>>> Hi!
>>>>
>>>> Testing it with the current 0.10 snapshot is not easily possible at the moment.
>>>>
>>>> But I deactivated checkpointing in my program and still get duplicates in
>>>> my output. So it seems not to come only from the checkpointing feature, right?
>>>>
>>>> Maybe the KafkaSink is responsible for this? (Just my guess.)
>>>>
>>>> Cheers, Rico.
>>>>
>>>>> On 01.09.2015 at 15:37, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> Hi Rico,
>>>>> unfortunately the 0.9 branch still seems to have problems with exactly-once
>>>>> processing and checkpointed operators. We reworked how the checkpoints are
>>>>> handled for the 0.10 release, so it should work well there.
>>>>>
>>>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
>>>>> problems persist there?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann
>>>>>> <i...@ricobergmann.de> wrote:
>>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> I still have an issue... I was now using 0.9.1 and the new
>>>>>> KafkaConnector. But I still get duplicates in my Flink program. Here's the
>>>>>> relevant part:
>>>>>>
>>>>>> final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
>>>>>>         kafkaTopicIn, new SimpleStringSchema(), properties);
>>>>>>
>>>>>> DataStream<String> start = env.addSource(kafkaSrc)
>>>>>>         .setParallelism(numReadPartitions); // numReadPartitions = 2
>>>>>>
>>>>>> DataStream<JSONObject> jsonized = start
>>>>>>         .flatMap(new ExtractAndFilterJSON());
>>>>>>
>>>>>> DataStream<Session> sessions = jsonized
>>>>>>         .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>>>             /**
>>>>>>              * partition by session id
>>>>>>              */
>>>>>>             @Override
>>>>>>             public String getKey(JSONObject value) throws Exception {
>>>>>>                 try {
>>>>>>                     return /*session id*/;
>>>>>>                 } catch (Exception e) {
>>>>>>                     LOG.error("no session could be retrieved", e);
>>>>>>                 }
>>>>>>                 return "";
>>>>>>             }
>>>>>>         }).flatMap(new StatefulSearchSessionizer());
>>>>>>
>>>>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>>>>> sure that the Kafka topic I'm reading from does not contain any
>>>>>> duplicates. So it must be in the Flink program...
>>>>>>
>>>>>> Any ideas?
>>>>>>
>>>>>> Cheers, Rico.
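
PS: To clarify what I mean by the second deduplication ("group by message in a window of the last 2 seconds"): conceptually it behaves roughly like the sketch below. This is simplified and not the real operator; it uses processing time and keeps its map per parallel instance without checkpointing, which is also why it cannot be perfect.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Emits each message at most once within any 2-second interval and drops
 * repetitions that arrive while the message is still remembered.
 */
public class TwoSecondDedup extends RichFlatMapFunction<String, String> {

    private static final long WINDOW_MS = 2000L;

    // message -> time it was last emitted (per parallel instance, not checkpointed)
    private transient Map<String, Long> lastEmitted;

    @Override
    public void open(Configuration parameters) {
        lastEmitted = new HashMap<String, Long>();
    }

    @Override
    public void flatMap(String msg, Collector<String> out) {
        long now = System.currentTimeMillis();

        // evict entries older than the 2-second window so the map stays small
        Iterator<Map.Entry<String, Long>> it = lastEmitted.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() > WINDOW_MS) {
                it.remove();
            }
        }

        // emit only if this message has not been emitted within the window
        if (!lastEmitted.containsKey(msg)) {
            lastEmitted.put(msg, now);
            out.collect(msg);
        }
    }
}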