Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-13 Thread Felipe Gutierrez
humm.. it seems that it is my turn to implement all this stuff using the Table
API.
Thanks Rong!

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*


On Thu, Jun 13, 2019 at 6:00 PM Rong Rong  wrote:

> Hi Felipe,
>
> Hequn is right. The problem you are facing is better addressed at the Table API
> level instead of dealing with it in the DataStream API. You will have more Flink
> library support to achieve your goal.
>
> In addition, the Flink Table API also supports user-defined aggregate functions [1]
> with which you can implement your HyperLogLog-based approximation. In fact, the
> interface is similar to the one in the DataStream API [2].
>
> --
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#aggregation-functions
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#aggregatefunction
>
> On Thu, Jun 13, 2019 at 8:55 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Hequn,
>> indeed the ReduceFunction is better than the ProcessWindowFunction. I
>> replaced it and could verify the performance improvement [1]. Thanks for that!
>> I will try a distinct count with the Table API.
>>
>> The question that I am facing is that I want to use a HyperLogLog in a
>> UDF for the DataStream API. That way I will be able to get an approximate distinct
>> count inside a window, like I did here [2]. After having my UDF I want to
>> have my own operator which processes this approximation of the distinct count. So
>> I am not sure whether I can implement my own operator for the Table API. Can I?
>>
>> [1]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountReduceWindowSocket.java
>> [2]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>>
>> Thanks!
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com*
>>
>>
>> On Thu, Jun 13, 2019 at 8:10 AM Hequn Cheng  wrote:
>>
>>> Hi Felipe,
>>>
>>> From your code, I think you want to get the "count distinct" result
>>> instead of the "distinct count". They have different meanings.
>>>
>>> To improve the performance, you can replace
>>> your DistinctProcessWindowFunction with a DistinctProcessReduceFunction. A
>>> ReduceFunction can aggregate the elements of a window incrementally, while
>>> for ProcessWindowFunction, elements cannot be incrementally aggregated but
>>> instead need to be buffered internally until the window is considered ready
>>> for processing.
>>>
>>> > Flink does not have a built-in operator which does this computation.
>>> Flink does have built-in operators to solve your problem. You can use the
>>> Table API & SQL. The code looks like:
>>>
>>> public static void main(String[] args) throws Exception {
>>>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>
>>>    DataStream<String> ds = env.socketTextStream("localhost", 9000);
>>>    tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");
>>>
>>>    SplitTableFunction splitFunc = new SplitTableFunction();
>>>    tableEnv.registerFunction("splitFunc", splitFunc);
>>>    Table result = tableEnv.scan("sourceTable")
>>>      .joinLateral("splitFunc(line) as word")
>>>      .window(Tumble.over("5.seconds").on("proctime").as("w"))
>>>      .groupBy("w")
>>>      .select("count.distinct(word), collect.distinct(word)");
>>>
>>>    tableEnv.toAppendStream(result, Row.class).print();
>>>    env.execute();
>>> }
>>>
>>> Detail code can be found here[1].
>>>
>>> At the same time, you can perform a two-stage window aggregation to improve the
>>> performance, i.e., the first window aggregation deduplicates the words and
>>> the second window produces the final results.
>>>
>>> Documentation about the Table API and SQL can be found here [2][3].
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html
>>>
>>>
>>> On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 Hi Rong, I implemented my solution using a ProcessingWindow
 with timeWindow and a ReduceFunction with timeWindowAll [1]. So for the
 first window I use parallelism, and I use the second window to merge everything in the Reducer.

Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-13 Thread Rong Rong
Hi Felipe,

Hequn is right. The problem you are facing is better addressed at the Table API
level instead of dealing with it in the DataStream API. You will have more Flink
library support to achieve your goal.

In addition, the Flink Table API also supports user-defined aggregate functions [1]
with which you can implement your HyperLogLog-based approximation. In fact, the
interface is similar to the one in the DataStream API [2].

--
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#aggregation-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#aggregatefunction
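
As an illustration, a minimal sketch of such a user-defined aggregate function at the
Table API level could look like the following, with a plain HashSet standing in for a
real HyperLogLog sketch (the class and field names are illustrative, not code from this thread):

import org.apache.flink.table.functions.AggregateFunction;

import java.util.HashSet;
import java.util.Set;

// Illustrative accumulator: a plain HashSet stands in for a HyperLogLog sketch.
public class ApproxCountDistinct
        extends AggregateFunction<Long, ApproxCountDistinct.DistinctAccumulator> {

    public static class DistinctAccumulator {
        public Set<String> seen = new HashSet<>();
    }

    @Override
    public DistinctAccumulator createAccumulator() {
        return new DistinctAccumulator();
    }

    // accumulate() is discovered by the Table API runtime via reflection.
    public void accumulate(DistinctAccumulator acc, String word) {
        acc.seen.add(word);
    }

    @Override
    public Long getValue(DistinctAccumulator acc) {
        return (long) acc.seen.size();
    }
}

It could then be registered with tableEnv.registerFunction("approxCountDistinct",
new ApproxCountDistinct()) and used in a windowed select like any built-in aggregate.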

On Thu, Jun 13, 2019 at 8:55 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Hequn,
> indeed the ReduceFunction is better than the ProcessWindowFunction. I
> replaced it and could verify the performance improvement [1]. Thanks for that!
> I will try a distinct count with the Table API.
>
> The question that I am facing is that I want to use a HyperLogLog in a UDF
> for the DataStream API. That way I will be able to get an approximate distinct count
> inside a window, like I did here [2]. After having my UDF I want to have my
> own operator which processes this approximation of the distinct count. So I am
> not sure whether I can implement my own operator for the Table API. Can I?
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountReduceWindowSocket.java
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>
> Thanks!
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com*
>
>
> On Thu, Jun 13, 2019 at 8:10 AM Hequn Cheng  wrote:
>
>> Hi Felipe,
>>
>> From your code, I think you want to get the "count distinct" result
>> instead of the "distinct count". They have different meanings.
>>
>> To improve the performance, you can replace
>> your DistinctProcessWindowFunction with a DistinctProcessReduceFunction. A
>> ReduceFunction can aggregate the elements of a window incrementally, while
>> for ProcessWindowFunction, elements cannot be incrementally aggregated but
>> instead need to be buffered internally until the window is considered ready
>> for processing.
>>
>> > Flink does not have a built-in operator which does this computation.
>> Flink does have built-in operators to solve your problem. You can use the
>> Table API & SQL. The code looks like:
>>
>> public static void main(String[] args) throws Exception {
>>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>
>>    DataStream<String> ds = env.socketTextStream("localhost", 9000);
>>    tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");
>>
>>    SplitTableFunction splitFunc = new SplitTableFunction();
>>    tableEnv.registerFunction("splitFunc", splitFunc);
>>    Table result = tableEnv.scan("sourceTable")
>>      .joinLateral("splitFunc(line) as word")
>>      .window(Tumble.over("5.seconds").on("proctime").as("w"))
>>      .groupBy("w")
>>      .select("count.distinct(word), collect.distinct(word)");
>>
>>    tableEnv.toAppendStream(result, Row.class).print();
>>    env.execute();
>> }
>>
>> Detail code can be found here[1].
>>
>> At the same time, you can perform a two-stage window aggregation to improve the
>> performance, i.e., the first window aggregation deduplicates the words and
>> the second window produces the final results.
>>
>> Documentation about the Table API and SQL can be found here [2][3].
>>
>> Best, Hequn
>>
>> [1]
>> https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html
>>
>>
>> On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Rong, I implemented my solution using a ProcessingWindow
>>> with timeWindow and a ReduceFunction with timeWindowAll [1]. So for the
>>> first window I use parallelism, and I use the second window to merge
>>> everything in the Reducer. I guess it is the best approach to do a
>>> DistinctCount. Would you suggest some improvements?
>>>
>>> [1]
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>>>
>>> Thanks!
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com*

Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-13 Thread Felipe Gutierrez
Hi Hequn,
indeed the ReduceFunction is better than the ProcessWindowFunction. I
replaced it and could verify the performance improvement [1]. Thanks for that!
I will try a distinct count with the Table API.

The question that I am facing is that I want to use a HyperLogLog in a UDF
for the DataStream API. That way I will be able to get an approximate distinct count
inside a window, like I did here [2]. After having my UDF I want to have my
own operator which processes this approximation of the distinct count. So I am
not sure whether I can implement my own operator for the Table API. Can I?

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountReduceWindowSocket.java
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
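
As an illustration, a minimal window aggregate of this kind at the DataStream level
could look like the following, assuming the stream-lib HyperLogLog implementation is
on the classpath (the class name and the log2m value are illustrative, not the code behind [2]):

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import org.apache.flink.api.common.functions.AggregateFunction;

// Illustrative incremental window aggregate: one HyperLogLog sketch per window and key
// instead of buffering every element.
public class HllWindowAggregate implements AggregateFunction<String, HyperLogLog, Long> {

    @Override
    public HyperLogLog createAccumulator() {
        return new HyperLogLog(14); // log2m = 14, roughly 0.8% standard error
    }

    @Override
    public HyperLogLog add(String word, HyperLogLog sketch) {
        sketch.offer(word);
        return sketch;
    }

    @Override
    public Long getResult(HyperLogLog sketch) {
        return sketch.cardinality();
    }

    @Override
    public HyperLogLog merge(HyperLogLog a, HyperLogLog b) {
        try {
            return (HyperLogLog) a.merge(b);
        } catch (Exception e) {
            throw new RuntimeException("failed to merge HyperLogLog sketches", e);
        }
    }
}

It could then be plugged into a window with something like
stream.keyBy(...).timeWindow(Time.seconds(5)).aggregate(new HllWindowAggregate()).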

Thanks!
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*


On Thu, Jun 13, 2019 at 8:10 AM Hequn Cheng  wrote:

> Hi Felipe,
>
> From your code, I think you want to get the "count distinct" result
> instead of the "distinct count". They have different meanings.
>
> To improve the performance, you can replace
> your DistinctProcessWindowFunction with a DistinctProcessReduceFunction. A
> ReduceFunction can aggregate the elements of a window incrementally, while
> for ProcessWindowFunction, elements cannot be incrementally aggregated but
> instead need to be buffered internally until the window is considered ready
> for processing.
>
> > Flink does not have a built-in operator which does this computation.
> Flink does have built-in operators to solve your problem. You can use the
> Table API & SQL. The code looks like:
>
> public static void main(String[] args) throws Exception {
>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>    DataStream<String> ds = env.socketTextStream("localhost", 9000);
>    tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");
>
>    SplitTableFunction splitFunc = new SplitTableFunction();
>    tableEnv.registerFunction("splitFunc", splitFunc);
>    Table result = tableEnv.scan("sourceTable")
>      .joinLateral("splitFunc(line) as word")
>      .window(Tumble.over("5.seconds").on("proctime").as("w"))
>      .groupBy("w")
>      .select("count.distinct(word), collect.distinct(word)");
>
>    tableEnv.toAppendStream(result, Row.class).print();
>    env.execute();
> }
>
> Detail code can be found here[1].
>
> At the same time, you can perform a two-stage window aggregation to improve the
> performance, i.e., the first window aggregation deduplicates the words and
> the second window produces the final results.
>
> Documentation about the Table API and SQL can be found here [2][3].
>
> Best, Hequn
>
> [1]
> https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html
>
>
> On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Rong, I implemented my solution using a ProcessingWindow
>> with timeWindow and a ReduceFunction with timeWindowAll [1]. So for the
>> first window I use parallelism, and I use the second window to merge
>> everything in the Reducer. I guess it is the best approach to do a
>> DistinctCount. Would you suggest some improvements?
>>
>> [1]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>>
>> Thanks!
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com*
>>
>>
>> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Rong,
>>>
>>> thanks for your answer. If I understood well, the option will be to use a
>>> ProcessFunction [1] since it has the method onTimer(), but not the
>>> ProcessWindowFunction [2], because it does not have the method onTimer(). I
>>> will need this method to call out.collect(...) on the Collector from the
>>> onTimer() method in order to emit a single value of my Distinct Count
>>> function.
>>>
>>> Is what I am saying reasonable?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>>
>>> Kind Regards,
>>> Felipe
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> 

Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-13 Thread Hequn Cheng
Hi Felipe,

From your code, I think you want to get the "count distinct" result instead
of the "distinct count". They have different meanings.

To improve the performance, you can replace
your DistinctProcessWindowFunction with a DistinctProcessReduceFunction. A
ReduceFunction can aggregate the elements of a window incrementally, while
for ProcessWindowFunction, elements cannot be incrementally aggregated but
instead need to be buffered internally until the window is considered ready
for processing.
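
As an illustration, a minimal incremental reduce of this kind could look like the
following, assuming the window elements carry the set of distinct words seen so far
(the class name and element type are illustrative, not the actual DistinctProcessReduceFunction):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.HashSet;

// Illustrative element type: (key, set of distinct words seen so far).
// The reduce step unions the sets as elements arrive, so the window keeps
// a single accumulated value instead of buffering every element.
public class DistinctWordsReduce
        implements ReduceFunction<Tuple2<Integer, HashSet<String>>> {

    @Override
    public Tuple2<Integer, HashSet<String>> reduce(
            Tuple2<Integer, HashSet<String>> left,
            Tuple2<Integer, HashSet<String>> right) {
        left.f1.addAll(right.f1);
        return left;
    }
}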

> Flink does not have a built-in operator which does this computation.
Flink does have built-in operators to solve your problem. You can use the Table
API & SQL. The code looks like:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    DataStream<String> ds = env.socketTextStream("localhost", 9000);
    tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");

    SplitTableFunction splitFunc = new SplitTableFunction();
    tableEnv.registerFunction("splitFunc", splitFunc);
    Table result = tableEnv.scan("sourceTable")
        .joinLateral("splitFunc(line) as word")
        .window(Tumble.over("5.seconds").on("proctime").as("w"))
        .groupBy("w")
        .select("count.distinct(word), collect.distinct(word)");

    tableEnv.toAppendStream(result, Row.class).print();
    env.execute();
}

Detail code can be found here[1].
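
For readers who do not open the link, a minimal sketch of such a SplitTableFunction,
assuming a simple whitespace split (the actual implementation is in [1]):

import org.apache.flink.table.functions.TableFunction;

// Illustrative split function: emits one row per word of the incoming line.
public class SplitTableFunction extends TableFunction<String> {
    public void eval(String line) {
        if (line == null) {
            return;
        }
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                collect(word);
            }
        }
    }
}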

At the same time, you can perform a two-stage window aggregation to improve the
performance, i.e., the first window aggregation deduplicates the words and
the second window produces the final results.
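
A minimal DataStream-level sketch of that two-stage pattern, using processing-time
windows (this is an illustration of the idea, not the code from Felipe's repository):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class TwoStageDistinctCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9000)
            // split each line into (word, 1) pairs
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split("\\W+")) {
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                }
            })
            // stage 1: parallel, keyed by word; the reduce keeps one record per distinct word per window
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                    return a;
                }
            })
            // stage 2: single-threaded windowAll that sums the ones, i.e. counts the distinct words
            .timeWindowAll(Time.seconds(5))
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                    return new Tuple2<>("distinct count", a.f1 + b.f1);
                }
            })
            .print();

        env.execute("two-stage distinct count");
    }
}

The first stage scales with the number of key groups, while the second stage only sees
one record per distinct word per window, so its parallelism of 1 is usually not a bottleneck.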

Documentation about the Table API and SQL can be found here [2][3].

Best, Hequn

[1]
https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html


On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Rong, I implemented my solution using a ProcessingWindow
> with timeWindow and a ReduceFunction with timeWindowAll [1]. So for the
> first window I use parallelism, and I use the second window to merge
> everything in the Reducer. I guess it is the best approach to do a
> DistinctCount. Would you suggest some improvements?
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>
> Thanks!
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com*
>
>
> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> thanks for your answer. If I understood well, the option will be to use a
>> ProcessFunction [1] since it has the method onTimer(), but not the
>> ProcessWindowFunction [2], because it does not have the method onTimer(). I
>> will need this method to call out.collect(...) on the Collector from the
>> onTimer() method in order to emit a single value of my Distinct Count
>> function.
>>
>> Is what I am saying reasonable?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>
>> Kind Regards,
>> Felipe
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com*
>>
>>
>> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong  wrote:
>>
>>> Hi Felipe,
>>>
>>> there are multiple ways to do DISTINCT COUNT in the Table/SQL API. In fact,
>>> there's already a thread going on about this recently [1].
>>> Based on the description you provided, it seems like it might be a
>>> better API level to use.
>>>
>>> To answer your questions,
>>> - You should be able to use another TimeCharacteristic. You might want to
>>> try ProcessWindowFunction and see if this fits your use case.
>>> - Not sure I fully understand the question, but your keyBy should be done
>>> on your distinct key (or a combination key), and if you do the keyBy correctly then
>>> yes, all messages with the same key are processed by the same TaskManager thread.
>>>
>>> --
>>> Rong
>>>
>>>
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>>
>>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 Hi all,

 I have implemented a Flink data stream application to compute distinct
 count of words. Flink does not have a built-in operator which does this
 computation.

Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-12 Thread Felipe Gutierrez
Hi Rong, I implemented my solution using a ProcessingWindow with timeWindow
and a ReduceFunction with timeWindowAll [1]. So for the first window I use
parallelism, and I use the second window to merge everything in the Reducer.
I guess it is the best approach to do a DistinctCount. Would you suggest some
improvements?

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java

Thanks!
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*


On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Rong,
>
> thanks for your answer. If I understood well, the option will be to use a
> ProcessFunction [1] since it has the method onTimer(), but not the
> ProcessWindowFunction [2], because it does not have the method onTimer(). I
> will need this method to call out.collect(...) on the Collector from the
> onTimer() method in order to emit a single value of my Distinct Count
> function.
>
> Is what I am saying reasonable?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>
> Kind Regards,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com*
>
>
> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong  wrote:
>
>> Hi Felipe,
>>
>> there are multiple ways to do DISTINCT COUNT in the Table/SQL API. In fact,
>> there's already a thread going on about this recently [1].
>> Based on the description you provided, it seems like it might be a better
>> API level to use.
>>
>> To answer your questions,
>> - You should be able to use another TimeCharacteristic. You might want to
>> try ProcessWindowFunction and see if this fits your use case.
>> - Not sure I fully understand the question, but your keyBy should be done
>> on your distinct key (or a combination key), and if you do the keyBy correctly then
>> yes, all messages with the same key are processed by the same TaskManager thread.
>>
>> --
>> Rong
>>
>>
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>
>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have implemented a Flink data stream application to compute distinct
>>> count of words. Flink does not have a built-in operator which does this
>>> computation. I used KeyedProcessFunction and I am saving the state on a
>>> ValueState descriptor.
>>> Could someone check if my implementation is the best way of doing it?
>>> Here is my solution:
>>> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>>>
>>> I have some points that I could not understand well:
>>> - I could only use TimeCharacteristic.IngestionTime.
>>> - I split the words using "Tuple2<Integer, String>(0, word)", so I will
>>> always have the same key (0). As I understand it, all the events will be
>>> processed on the same TaskManager, which will not achieve parallelism if I
>>> am in a cluster.
>>>
>>> Kind Regards,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com*
>>>
>>


Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-12 Thread Felipe Gutierrez
Hi Rong,

thanks for your answer. If I understood well, the option will be to use a
ProcessFunction [1] since it has the method onTimer(), but not the
ProcessWindowFunction [2], because it does not have the method onTimer(). I
will need this method to call out.collect(...) on the Collector from the
onTimer() method in order to emit a single value of my Distinct Count
function.

Is what I am saying reasonable?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html

Kind Regards,
Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*


On Wed, Jun 12, 2019 at 3:41 AM Rong Rong  wrote:

> Hi Felipe,
>
> there are multiple ways to do DISTINCT COUNT in the Table/SQL API. In fact,
> there's already a thread going on about this recently [1].
> Based on the description you provided, it seems like it might be a better
> API level to use.
>
> To answer your questions,
> - You should be able to use another TimeCharacteristic. You might want to
> try ProcessWindowFunction and see if this fits your use case.
> - Not sure I fully understand the question, but your keyBy should be done
> on your distinct key (or a combination key), and if you do the keyBy correctly then
> yes, all messages with the same key are processed by the same TaskManager thread.
>
> --
> Rong
>
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>
> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have implemented a Flink data stream application to compute distinct
>> count of words. Flink does not have a built-in operator which does this
>> computation. I used KeyedProcessFunction and I am saving the state on a
>> ValueState descriptor.
>> Could someone check if my implementation is the best way of doing it?
>> Here is my solution:
>> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>>
>> I have some points that I could not understand well:
>> - I could only use TimeCharacteristic.IngestionTime.
>> - I split the words using "Tuple2<Integer, String>(0, word)", so I will
>> always have the same key (0). As I understand it, all the events will be
>> processed on the same TaskManager, which will not achieve parallelism if I
>> am in a cluster.
>>
>> Kind Regards,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com*
>>
>


Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-11 Thread Rong Rong
Hi Felipe,

there are multiple ways to do DISTINCT COUNT in the Table/SQL API. In fact,
there's already a thread going on about this recently [1].
Based on the description you provided, it seems like it might be a better
API level to use.

To answer your questions,
- You should be able to use another TimeCharacteristic. You might want to try
ProcessWindowFunction and see if this fits your use case.
- Not sure I fully understand the question, but your keyBy should be done on
your distinct key (or a combination key), and if you do the keyBy correctly then yes,
all messages with the same key are processed by the same TaskManager thread.

--
Rong



[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html

On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi all,
>
> I have implemented a Flink data stream application to compute distinct
> count of words. Flink does not have a built-in operator which does this
> computation. I used KeyedProcessFunction and I am saving the state on a
> ValueState descriptor.
> Could someone check if my implementation is the best way of doing it? Here
> is my solution:
> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>
> I have some points that I could not understand well:
> - I could only use TimeCharacteristic.IngestionTime.
> - I split the words using "Tuple2<Integer, String>(0, word)", so I will
> always have the same key (0). As I understand it, all the events will be
> processed on the same TaskManager, which will not achieve parallelism if I
> am in a cluster.
>
> Kind Regards,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com*
>


How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-11 Thread Felipe Gutierrez
Hi all,

I have implemented a Flink data stream application to compute distinct
count of words. Flink does not have a built-in operator which does this
computation. I used KeyedProcessFunction and I am saving the state on a
ValueState descriptor.
Could someone check if my implementation is the best way of doing it? Here
is my solution:
https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
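
For context, a minimal sketch of a KeyedProcessFunction of this shape, keeping the
distinct words in a ValueState and emitting the count from a processing-time timer
(the names, types, and timer interval are illustrative, not the code from the linked answer):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;

// Illustrative distinct-count function: the ValueState holds the words seen for the
// key, and a processing-time timer fires once per interval to emit the count from onTimer().
public class DistinctCountFunction
        extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, Long> {

    private transient ValueState<HashSet<String>> wordsState;

    @Override
    public void open(Configuration parameters) {
        wordsState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "distinct-words",
                TypeInformation.of(new TypeHint<HashSet<String>>() {})));
    }

    @Override
    public void processElement(Tuple2<Integer, String> value, Context ctx, Collector<Long> out)
            throws Exception {
        HashSet<String> words = wordsState.value();
        if (words == null) {
            words = new HashSet<>();
            // emit a result 10 seconds from now (processing time)
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 10_000L);
        }
        words.add(value.f1);
        wordsState.update(words);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        HashSet<String> words = wordsState.value();
        out.collect(words == null ? 0L : (long) words.size());
        wordsState.clear();
    }
}

The input type assumes the Tuple2(0, word) elements described below, so all words share
one key; keying by the word itself would distribute the state across the cluster.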

I have some points that I could not understand well:
- I could only use TimeCharacteristic.IngestionTime.
- I split the words using "Tuple2<Integer, String>(0, word)", so I will
always have the same key (0). As I understand it, all the events will be
processed on the same TaskManager, which will not achieve parallelism if I
am in a cluster.

Kind Regards,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*