Hi Hequn,
indeed the ReduceFunction is better than the ProcessWindowFunction. I
replaced and could check the improvement performance [1]. Thanks for that!
I will try a distinct count with the Table API.

The question that I am facing is that I want to use a HyperLogLog on a UDF
for DataStream. Thus I will be able to have an approximate distinct count
inside a window, like I did here [2]. After having my UDF I want to have my
own operator which process this approximation of distinct count. So I am
not sure with I can implement my own operator for the TableAPI. Can I?

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountReduceWindowSocket.java
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java

Thanks!
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Thu, Jun 13, 2019 at 8:10 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Felipe,
>
> From your code, I think you want to get the "count distinct" result
> instead of the "distinct count". They contain a different meaning.
>
> To improve the performance, you can replace
> your DistinctProcessWindowFunction to a DistinctProcessReduceFunction. A
> ReduceFunction can aggregate the elements of a window incrementally, while
> for ProcessWindowFunction, elements cannot be incrementally aggregated but
> instead need to be buffered internally until the window is considered ready
> for processing.
>
> > Flink does not have a built-in operator which does this computation.
> Flink does have built-in operators to solve your problem. You can use
> Table API & SQL to solve your problem. The code looks like:
>
> public static void main(String[] args) throws Exception {
>    StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>    DataStream ds = env.socketTextStream("localhost", 9000);
>    tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");
>
>    SplitTableFunction splitFunc = new SplitTableFunction();
>    tableEnv.registerFunction("splitFunc", splitFunc);
>    Table result = tableEnv.scan("sourceTable")
>          .joinLateral("splitFunc(line) as word")
>          .window(Tumble.over("5.seconds").on("proctime").as("w"))
>          .groupBy("w")
>          .select("count.distinct(word), collect.distinct(word)");
>
>    tableEnv.toAppendStream(result, Row.class).print();
>    env.execute();
> }
>
> Detail code can be found here[1].
>
> At the same time, you can perform two-stage window to improve the
> performance, i.e., the first window aggregate is used to distinct words and
> the second window used to get the final results.
>
> Document about Table API and SQL can be found here[2][3].
>
> Best, Hequn
>
> [1]
> https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html
>
>
> On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Rong, I implemented my solution using a ProcessingWindow
>> with timeWindow and a ReduceFunction with timeWindowAll [1]. So for the
>> first window I use parallelism and the second window I use to merge
>> everything on the Reducer. I guess it is the best approach to do
>> DistinctCount. Would you suggest some improvements?
>>
>> [1]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>>
>> Thanks!
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Rong,
>>>
>>> thanks for your answer. If I understood well, the option will be to use
>>> ProcessFunction [1] since it has the method onTimer(). But not the
>>> ProcessWindowFunction [2], because it does not have the method onTimer(). I
>>> will need this method to call Collector<OUT> out.collect(...) from the
>>> onTImer() method in order to emit a single value of my Distinct Count
>>> function.
>>>
>>> Is that reasonable what I am saying?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>>
>>> Kind Regards,
>>> Felipe
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Felipe,
>>>>
>>>> there are multiple ways to do DISTINCT COUNT in Table/SQL API. In fact
>>>> there's already a thread going on recently [1]
>>>> Based on the description you provided, it seems like it might be a
>>>> better API level to use.
>>>>
>>>> To answer your question,
>>>> - You should be able to use other TimeCharacteristic. You might want to
>>>> try WindowProcessFunction and see if this fits your use case.
>>>> - Not sure I fully understand the question, your keyed by should be
>>>> done on your distinct key (or a combo key) and if you do keyby correctly
>>>> then yes all msg with same key is processed by the same TM thread.
>>>>
>>>> --
>>>> Rong
>>>>
>>>>
>>>>
>>>> [1]
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>>>
>>>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have implemented a Flink data stream application to compute distinct
>>>>> count of words. Flink does not have a built-in operator which does this
>>>>> computation. I used KeyedProcessFunction and I am saving the state on a
>>>>> ValueState descriptor.
>>>>> Could someone check if my implementation is the best way of doing it?
>>>>> Here is my solution:
>>>>> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>>>>>
>>>>> I have some points that I could not understand better:
>>>>> - I only could use TimeCharacteristic.IngestionTime.
>>>>> - I split the words using "Tuple2<Integer, String>(0, word)", so I
>>>>> will have always the same key (0). As I understand, all the events will be
>>>>> processed on the same TaskManager which will not achieve parallelism if I
>>>>> am in a cluster.
>>>>>
>>>>> Kind Regards,
>>>>> Felipe
>>>>> *--*
>>>>> *-- Felipe Gutierrez*
>>>>>
>>>>> *-- skype: felipe.o.gutierrez*
>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>
>>>>

Reply via email to