Deduplicate messages from Kafka topic

2017-01-14 Thread ljwagerfield
As I understand it, the Flink Kafka Producer may emit duplicates to Kafka
topics.

How can I deduplicate these messages when reading them back with Flink (via
the Flink Kafka Consumer)?

For example, is there any out-of-the-box support for deduplicating messages,
e.g. by incorporating something like the "idempotent producer" proposed by
Jay Kreps (which, as I understand it, involves maintaining a "high
watermark" on a message-by-message level)?





Re: Kafka topic partition skewness causes watermark not being emitted

2017-01-14 Thread tao xiao
The case I described was for an experiment only, but data skewness does
happen in production. The current implementation blocks watermark emission
to downstream operators until all partitions have moved forward, which has a
great impact on latency. It may be a good idea to expose an API that lets
users decide how watermark emission should be controlled.
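
Until then, the only stopgap I can think of is a periodic assigner that also
advances the watermark based on processing time once no records have arrived
for some timeout. This is just a sketch, not an existing Flink API; MyEvent,
getTimestamp() and the 10-second timeout are placeholders, and it trades
event-time accuracy for progress:

import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: let the watermark follow processing time while the source is idle,
// so that an empty partition does not hold back the overall watermark.
public class IdleTolerantAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

    private static final long MAX_IDLE_MILLIS = 10_000;

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcTime = System.currentTimeMillis();

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long ts = element.getTimestamp();
        maxTimestamp = Math.max(maxTimestamp, ts);
        lastEventProcTime = System.currentTimeMillis();
        return ts;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long eventTimeWm = maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
        boolean idle = System.currentTimeMillis() - lastEventProcTime > MAX_IDLE_MILLIS;
        if (idle) {
            // no data for a while: keep the watermark moving with processing time
            return new Watermark(Math.max(eventTimeWm, System.currentTimeMillis() - MAX_IDLE_MILLIS));
        }
        return new Watermark(eventTimeWm);
    }
}

Since the consumer applies the assigner per partition and then takes the
minimum (as described below), the idle partition's watermark would keep
moving forward instead of staying at Long.MIN_VALUE, at least as far as I
understand the per-partition emission.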

On Fri, 13 Jan 2017 at 21:57 Tzu-Li (Gordon) Tai wrote:

> Hi,
>
> This is expected behaviour due to how the per-partition watermarks are
> designed in the Kafka consumer, but I think it’s probably a good idea to
> handle idle partitions also when the Kafka consumer itself emits
> watermarks. I’ve filed a JIRA issue for this:
> https://issues.apache.org/jira/browse/FLINK-5479.
>
> For the time being, I don’t think there will be an easy way to avoid this
> with the existing APIs, unfortunately. Is the skewed partition data
> intentional, or only for experimental purposes?
>
> Best,
> Gordon
>
> On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:
>
> Hi team,
>
> I have a topic with 2 partitions in Kafka. I produced all data to
> partition 0 and no data to partition 1. I created a Flink job with a
> parallelism of 1 that consumes that topic and counts the events with a
> session event window (5-second gap). It turned out that the session window
> was never closed, even when I sent a message after a 10-minute gap. After
> digging into the source code, AbstractFetcher[1], which is responsible for
> sending watermarks downstream, calculates the minimum watermark across all
> partitions. Since there is no data in partition 1, the watermark returned
> for partition 1 is always Long.MIN_VALUE, so AbstractFetcher never emits
> a watermark downstream.
>
> I want to know whether this is expected behavior or a bug. If it is expected
> behavior, how do I avoid the delay in watermark firing when data is not
> evenly distributed across all partitions?
>
> This is the timestamp extractor I used
>
> public class ExactTimestampExtractor implements
>         AssignerWithPeriodicWatermarks<SessionEvent> {
>
>     private long currentMaxTimestamp = Long.MIN_VALUE;
>
>     @Nullable
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp == Long.MIN_VALUE
>                 ? Long.MIN_VALUE : currentMaxTimestamp - 1);
>     }
>
>     @Override
>     public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
>         long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
>         if (eventStartTime > currentMaxTimestamp) {
>             currentMaxTimestamp = eventStartTime;
>         }
>
>         return eventStartTime;
>     }
> }
>
> and this is the Flink topo
>
> // get input data
> FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>(
>         "topic4", new MyOwnSchema(), kafkaProps);  // kafkaProps: Kafka properties (elided in the original mail)
> consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
> DataStream<SessionEvent> input = env.addSource(consumer);
>
> input
>         .keyBy("id")
>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>         .reduce(new Reducer(), new WindowFunction())
>         .print();
>
> // execute program
> env.execute("a job");
>
> I used the latest code from GitHub.
>
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539
>
>


Re: Queryable State

2017-01-14 Thread Dawid Wysakowicz
Hi Nico,

Recently I've tried queryable state a bit differently: using a ValueState
whose value is a util.ArrayList, together with a ValueSerializer for
util.ArrayList, and it works as expected.

You can browse the non-working example here:
https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
and the working example here:
https://github.com/dawidwys/flink-intro/tree/master
(The QueryableJob is in the module flink-queryable-job and the QueryClient
in flink-state-server.)
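
In short, the working variant does roughly the following. This is a Java
sketch with illustrative names, not the actual code from the repository; I
pass a TypeInformation here instead of the explicit serializer, and the
descriptor API is the Flink 1.2 one:

import java.util.ArrayList;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch: collect the data points per key in a ValueState holding an ArrayList
// and mark the state descriptor as queryable.
public class CollectingFn
        extends RichFlatMapFunction<KeyedDataPoint<Integer>, KeyedDataPoint<Integer>> {

    private transient ValueState<ArrayList<KeyedDataPoint<Integer>>> points;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<ArrayList<KeyedDataPoint<Integer>>> descriptor =
                new ValueStateDescriptor<>(
                        "type-time-series-count",
                        TypeInformation.of(new TypeHint<ArrayList<KeyedDataPoint<Integer>>>() {}));
        descriptor.setQueryable("type-time-series-count");  // visible to the queryable state client
        points = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(KeyedDataPoint<Integer> value,
                        Collector<KeyedDataPoint<Integer>> out) throws Exception {
        ArrayList<KeyedDataPoint<Integer>> list = points.value();
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(value);
        points.update(list);
        out.collect(value);
    }
}

The function is applied after keyBy("key"); on the client side the returned
bytes are then deserialized with the same serializer.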

Sure, I am aware of the downside of the ListState. I need it just for
presentation purposes, but you may be right that there might not be any
production use for this state and that it should be removed.
Maybe the problem is just with the ListState, and removing it would also
resolve my problem :)

Regards
Dawid Wysakowicz


2017-01-13 18:50 GMT+01:00 Nico Kruber :

> Hi Dawid,
> I'll try to reproduce the error in the next couple of days. Can you also
> share the value deserializer you use? Also, have you tried even smaller
> examples in the meantime? Did they work?
>
> As a side note regarding the queryable state "sink" using ListState
> (".asQueryableState(<name>, ListStateDescriptor)"): everything that enters
> this operator is stored forever and never cleaned up. Eventually, it will
> use up too much memory and is thus of limited use. Maybe it should even be
> removed from the API.
>
>
> Nico
>
> On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > Hey Ufuk.
> > Did you maybe have some time to have a look at that problem?
> >
> > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > the course of the day. From a first impression, this seems like a bug
> > > to me.
> > >
> > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz wrote:
> > > > Hi, I was experimenting with the Queryable State feature and I have
> > > > some problems querying the state.
> > > >
> > > > The code which I use to produce the queryable state is:
> > > >
> > > > env.addSource(kafkaConsumer).map(
> > > >   e => e match {
> > > >     case LoginClickEvent(_, t) => ("login", 1, t)
> > > >     case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > >     case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > >   }).keyBy(0).timeWindow(Time.seconds(1))
> > > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > >   .keyBy("key")
> > > >   .asQueryableState(
> > > >     "type-time-series-count",
> > > >     new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > >       "type-time-series-count",
> > > >       classOf[KeyedDataPoint[java.lang.Integer]]))
> > > >
> > > > As you can see, it is a rather simple job in which I try to count
> > > > events of different types in windows and then query by event type.
> > > >
> > > > In the client code I do:
> > > >
> > > > // Query Flink state
> > > > val future = client.getKvState(jobId, "type-time-series-count",
> > > >   key.hashCode, serializedKey)
> > > >
> > > > // Await async result
> > > > val serializedResult: Array[Byte] = Await.result(
> > > >   future, new FiniteDuration(10, duration.SECONDS))
> > > >
> > > > // Deserialize response
> > > > val results = deserializeResponse(serializedResult)
> > > >
> > > > results
> > > > }
> > > >
> > > > private def deserializeResponse(serializedResult: Array[Byte]):
> > > >     util.List[KeyedDataPoint[lang.Integer]] = {
> > > >   KvStateRequestSerializer.deserializeList(serializedResult,
> > > >     getValueSerializer())
> > > > }
> > > >
> > > > As I was trying to debug the issue, I saw that the first element in the
> > > > list gets deserialized correctly, but it fails on the second one. It
> > > > seems like the serialized result is broken. Do you have any idea whether
> > > > I am doing something wrong or whether there is some bug?
> > > >
> > > >
> > > > The exception I get is:
> > > > java.io.EOFException: null
> > > >   at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > >   at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > >   at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > >   at com.dataartisans.stateserver.queryclient.QueryClient.

Re: Terminology: Split, Group and Partition

2017-01-14 Thread Robert Schmidtke
Hi Fabian,

I have opened a ticket for that, thanks.

I have another question: now that I have obtained the proper local
grouping, I did some aggregation of type [T] -> U, where one aggregated
object of type U contains information from zero or more Ts. The Us are
still tied to the hostname and have the property hostname=hostX for the
workers they're executed on, just like before. Is it possible to specify
the grouping/partitioning for DataSets that are not DataSources, as you
suggested before? My guess is that the grouping information is lost when
going from T to U.

Best and thanks for the great help!
Robert

On Fri, Jan 13, 2017 at 8:54 PM, Fabian Hueske  wrote:

> I think so far getExecutionPlan() has only been used for debugging purposes
> and not in programs that would also be executed.
> You can open a JIRA issue if you think that this would be a valuable feature.
>
> Thanks, Fabian
>
> 2017-01-13 16:34 GMT+01:00 Robert Schmidtke :
>
>> Just a side note, I'm guessing there's a bug here:
>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68
>>
>> It should say createProgramPlan("unnamed job", false);
>>
>> Otherwise I'm getting an exception complaining that no new sinks have
>> been added after the last execution. So currently it is not possible for me
>> to first get the execution plan and then execute the program.
>>
>> Robert
>>
>> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke wrote:
>>
>>> Hi Fabian,
>>>
>>> thanks for the quick and comprehensive reply. I'll have a look at the
>>> ExecutionPlan using your suggestion to check what actually gets computed,
>>> and I'll use the properties as well. If I stumble across something else
>>> I'll let you know.
>>>
>>> Many thanks again!
>>> Robert
>>>
>>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske wrote:
>>>
 Hi Robert,

 let me first describe what splits, groups, and partitions are.

 * Partition: This is basically all data that goes through the same task
 instance. If you have an operator with a parallelism of 80, you have 80
 partitions. When you call sortPartition() you'll have 80 sorted streams;
 when you call mapPartition() you iterate over all records of one partition.
 * Split: Splits are a concept of InputFormats. An InputFormat can
 process several splits. All splits that are processed by the same data
 source task make up the partition of that task, so a split is a subset of a
 partition. In your case, where each task reads exactly one split, the split
 is equivalent to the partition.
 * Group: A group is based on the groupBy attribute and is hence
 data-driven; it does not depend on the parallelism. A groupReduce requires
 a partitioning such that all records with the same grouping attribute are
 sent to the same operator, i.e., all are part of the same partition.
 Depending on the number of distinct grouping keys (and the hash function), a
 partition can have zero, one, or more groups.

 Now coming to your use case: you have 80 sources running on 5 machines.
 All sources on the same machine produce records with the same grouping key
 (hostname). You can actually give Flink a hint that the data returned
 by a split is partitioned, grouped, or sorted in a specific way. This works
 as follows:

 // String is the hostname, Integer is the parallel id of the source task
 DataSet<Tuple2<String, Integer>> text = env.createInput(yourFormat);
 SplitDataProperties<Tuple2<String, Integer>> splitProps =
     ((DataSource<Tuple2<String, Integer>>) text).getSplitDataProperties();
 splitProps.splitsGroupedBy(0, 1);
 splitProps.splitsPartitionedBy(0, 1);

 With this info, Flink knows that the data returned by our source is
 partitioned and grouped. Now you can do groupBy(0, 1).groupReduce(XXX) to
 run a local groupReduce operation on each of the 80 tasks (hostname and
 parallel index result in 80 keys) and locally reduce the data.
 The next step would be another groupBy(0).groupReduce(), which gives 16
 groups that are distributed across your tasks.
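
 In code, the two steps would look roughly like this (the reducer
 implementations and the result types are just placeholders):

 DataSet<Tuple2<String, Integer>> perTask = text
     .groupBy(0, 1)                      // hostname + parallel source index
     .groupReduce(new LocalReducer());   // runs locally thanks to the split properties

 DataSet<Tuple2<String, Integer>> perHost = perTask
     .groupBy(0)                         // hostname only
     .groupReduce(new FinalReducer());   // groups are distributed across tasks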

 However, you have to be careful with the SplitDataProperties. If you
 get them wrong, the optimizer makes false assumptions and the resulting plan
 might not compute what you are looking for.
 I'd recommend reading the JavaDocs and playing a bit with this feature to
 see how it behaves. ExecutionEnvironment.getExecutionPlan() can help
 to figure out what is happening.

 Best,
 Fabian


 2017-01-13 12:14 GMT+01:00 Robert Schmidtke :

> Hi all,
>
> I'm having some trouble grasping what the meaning of/difference
> between the following concepts is:
>
> - Split
> - Group
> - Partition
>
> 

Re: Reading compressed XML data

2017-01-14 Thread Robert Metzger
Hi Sebastian,

I'm not aware of a better way of implementing this in Flink. You could
implement your own XmlInputFormat using Flink's InputFormat abstractions,
but you would end up with almost exactly the same code as Mahout / Hadoop.
I wonder why the decompression with the XmlInputFormat doesn't work. Did
you get any exception?
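
For reference (and to make sure we are talking about the same setup), the
usual wiring through the Hadoop compatibility layer looks roughly like the
sketch below. It is untested; the tag values and the path are placeholders,
and the "xmlinput.start" / "xmlinput.end" keys are Mahout's configuration
constants as far as I remember:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.mahout.classifier.bayes.XmlInputFormat;

// (inside some main method that may throw IOException)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Configure the start/end tags that delimit one record.
Job job = Job.getInstance();
job.getConfiguration().set("xmlinput.start", "<page>");
job.getConfiguration().set("xmlinput.end", "</page>");

// Each Tuple2 holds the byte offset and the XML fragment between the tags.
DataSet<Tuple2<LongWritable, Text>> fragments = env.readHadoopFile(
        new XmlInputFormat(), LongWritable.class, Text.class,
        "hdfs:///path/to/dump.xml.bz2", job);

If the call itself succeeds but the compressed file yields no records or a
garbled split, the record reader inside XmlInputFormat is probably reading
the raw compressed bytes instead of going through a decompression codec,
which would explain the difference to TextInputFormat.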

Regards,
Robert


On Wed, Jan 11, 2017 at 4:31 PM, Sebastian Neef <
gehax...@mailbox.tu-berlin.de> wrote:

> Hi,
>
> what's the best way to read a compressed (bz2 / gz) XML file, splitting
> it at a specific XML tag?
>
> So far I've been using Hadoop's TextInputFormat in combination with
> Mahout's XmlInputFormat ([0]) via env.readHadoopFile(). Whereas the
> plain TextInputFormat can handle compressed data, the XmlInputFormat
> can't for some reason.
>
> Is there a flink-ish way to accomplish this?
>
> Best regards,
> Sebastian
>
> [0]:
> https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java
>