The KafkaSink is the last step in my program after the 2nd deduplication. 

I could not yet track down where duplicates show up. That's a bit difficult to 
find out... But I'm trying to find it...



> Am 03.09.2015 um 14:14 schrieb Stephan Ewen <se...@apache.org>:
> 
> Can you tell us where the KafkaSink comes into play? At what point do the 
> duplicates come up?
> 
>> On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> No. I mean the KafkaSink. 
>> 
>> A bit more insight to my program: I read from a Kafka topic with 
>> flinkKafkaConsumer082, then hashpartition the data, then I do a 
>> deduplication (does not eliminate all duplicates though). Then some 
>> computation, afterwards again deduplication (group by message in a window of 
>> last 2 seconds). 
>> 
>> Of course the last deduplication is not perfect.
>> 
>> Cheers. Rico. 
>> 
>> 
>> 
>>> Am 03.09.2015 um 13:29 schrieb Stephan Ewen <se...@apache.org>:
>>> 
>>> Do you mean the KafkaSource?
>>> 
>>> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the 
>>> KafkaSource?
>>> 
>>>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>>>> Hi!
>>>> 
>>>> Testing it with the current 0.10 snapshot is not easily possible atm
>>>> 
>>>> But I deactivated checkpointing in my program and still get duplicates in 
>>>> my output. So it seems not only to come from the checkpointing feature, or?
>>>> 
>>>> May be the KafkaSink is responsible for this? (Just my guess)
>>>> 
>>>> Cheers Rico. 
>>>> 
>>>> 
>>>> 
>>>>> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>>> 
>>>>> Hi Rico,
>>>>> unfortunately the 0.9 branch still seems to have problems with exactly 
>>>>> once processing and checkpointed operators. We reworked how the 
>>>>> checkpoints are handled for the 0.10 release so it should work well 
>>>>> there. 
>>>>> 
>>>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
>>>>> problems persist there?
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> 
>>>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann 
>>>>>> <i...@ricobergmann.de> wrote:
>>>>>> Hi!
>>>>>> 
>>>>>> I still have an issue... I was now using 0.9.1 and the new
>>>>>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>>>>>> relevant part:
>>>>>> 
>>>>>>          final FlinkKafkaConsumer082<String> kafkaSrc = new
>>>>>> FlinkKafkaConsumer082<String>(
>>>>>>              kafkaTopicIn, new SimpleStringSchema(), properties);
>>>>>> 
>>>>>>          DataStream<String> start = env.addSource(kafkaSrc)
>>>>>>              .setParallelism(numReadPartitions); //numReadPartitions = 2
>>>>>> 
>>>>>>          DataStream<JSONObject> jsonized = start
>>>>>>              .flatMap(new ExtractAndFilterJSON());
>>>>>> 
>>>>>>          DataStream<Session> sessions = jsonized
>>>>>>              .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>>>              /**
>>>>>>               * partition by session id
>>>>>>               */
>>>>>>              @Override
>>>>>>              public String getKey(JSONObject value)
>>>>>>                  throws Exception {
>>>>>>                  try {
>>>>>>                  return /*session id*/;
>>>>>>                  } catch (Exception e) {
>>>>>>                  LOG.error("no session could be retrieved", e);
>>>>>>                  }
>>>>>>                  return "";
>>>>>>              }
>>>>>>              }).flatMap(new StatefulSearchSessionizer());
>>>>>> 
>>>>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>>>>> sure that the kafka topic I'm reading from does not contain any
>>>>>> duplicates. So it must be in the flink program ...
>>>>>> 
>>>>>> Any ideas?
>>>>>> 
>>>>>> Cheers, Rico.
> 

Reply via email to