Thank you very much for the detailed answer. Now I understand a DataStream
can be repartitioned or “joined” (don’t know the exact terminology) with
keyBy.

But another question:
Despite the non-existence of incremental top-k algorithm, I’d like to
incrementally compute the local word count during one hour, probably using
a TreeMap for counting. As soon as the hour finishes, the TreeMap is
converted to a stream of Tuple2 and forwarded to the remaining computation
thereafter. I’m concerned about the memory usage: the TreeMap and the
Tuple2 collection hold a huge amount of items, do I have to do some custom
memory management?

I’m also not sure whether a TreeMap is suitable here. This StackOverflow
question presents a similar approach:
http://stackoverflow.com/questions/34681887/how-apache-flink-deal-with-skewed-data,
but the suggested solution seems rather complicated.

On 8 June 2016 at 08:04, Jamie Grier <ja...@data-artisans.com> wrote:

> Suggestions in-line below...
>
> On Mon, Jun 6, 2016 at 7:26 PM, Yukun Guo <gyk....@gmail.com> wrote:
>
>> Hi,
>>
>> I'm working on a project which uses Flink to compute hourly log statistics
>> like top-K. The logs are fetched from Kafka by a FlinkKafkaProducer and
>> packed
>> into a DataStream.
>>
>> The problem is, I find the computation quite challenging to express with
>> Flink's DataStream API:
>>
>> 1. If I use something like `logs.timeWindow(Time.hours(1))`, suppose that
>> the
>> data volume is really high, e.g., billions of logs might be generated in
>> one
>> hour, will the window grow too large and can't be handled efficiently?
>>
>
> In the general case you can use:
>
>     stream
>         .timeWindow(...)
>         .apply(reduceFunction, windowFunction)
>
> which can take a ReduceFunction and a WindowFunction.  The ReduceFunction
> is used to reduce the state on the fly and thereby keep the total state
> size low.  This can commonly be used in analytics applications to reduce
> the state size that you're accumulating for each window.  In the specific
> case of TopK, however, you cannot do this if you want an exact result.  To
> get an exact result I believe you have to actually keep around all of the
> data and then calculate TopK at the end in your WindowFunction.  If you are
> able to use approximate algorithms for your use case than you can calculate
> a probabilistic incremental TopK based on some sort of sketch-based
> algorithm.
>
>>
>> 2. We have to create a `KeyedStream` before applying `timeWindow`.
>> However,
>> the distribution of some keys are skewed hence using them may compromise
>> the performance due to unbalanced partition loads. (What I want is just
>> rebalance the stream across all partitions.)
>>
>
> A good and simple way to approach this may be to come up with a composite
> key for your data that *is* uniformly distributed.  You can imagine
> something simple like 'natural_key:random_number'.  Then keyBy(natural_key)
> and reduce() again.  For example:
>
>     stream
>         .keyBy(key, rand())      // partition by composite key that is
> uniformly distributed
>         .timeWindow(1 hour)
>         .reduce()                     // pre-aggregation
>         .keyBy(key)                // repartition
>         .timeWindow(1 hour)
>         .reduce()                     // final aggregation
>
>
>>
>> 3. The top-K algorithm can be straightforwardly implemented with
>> `DataSet`'s
>> `mapPartition` and `reduceGroup` API as in
>> [FLINK-2549](https://github.com/apache/flink/pull/1161/), but not so
>> easy if
>> taking the DataStream approach, even with the stateful operators. I still
>> cannot figure out how to reunion streams once they are partitioned.
>>
>>     I'm not sure I know what you're trying to do here.  What do you mean
> by re-union?
>
>
>> 4. Is it possible to convert a DataStream into a DataSet? If yes, how can
>> I
>> make Flink analyze the data incrementally rather than aggregating the
>> logs for
>> one hour before starting to process?
>>
>> There is no direct way to turn a DataStream into a DataSet.  I addressed
> the point about doing the computation incrementally above, though.  You do
> this with a ReduceFunction.  But again, there doesn't exist an exact
> incremental TopK algorithm that I'm aware of.  This can be done with
> sketching, though.
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
>
>

Reply via email to