Hmm... it seems it is my turn to implement all this using the Table API. Thanks, Rong!

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
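A minimal sketch of the kind of user-defined aggregate function for the Table API that Rong points to below, written against the Flink 1.8 interface. The class, field, and method names other than the AggregateFunction contract are illustrative, and a plain HashSet stands in for the HyperLogLog sketch Felipe wants, so this version is exact rather than approximate:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.flink.table.functions.AggregateFunction;

    /**
     * Sketch of a Table API aggregate function that counts distinct words.
     * The accumulator keeps an exact HashSet of the words seen so far; swapping
     * the set for a HyperLogLog sketch would turn it into an approximate count.
     */
    public class CountDistinctWords
            extends AggregateFunction<Long, CountDistinctWords.Accumulator> {

        /** Accumulator holding the words seen in the current group/window. */
        public static class Accumulator {
            public Set<String> words = new HashSet<>();
        }

        @Override
        public Accumulator createAccumulator() {
            return new Accumulator();
        }

        /** Called by the framework for every input row (resolved by reflection). */
        public void accumulate(Accumulator acc, String word) {
            if (word != null) {
                acc.words.add(word);
            }
        }

        @Override
        public Long getValue(Accumulator acc) {
            return (long) acc.words.size();
        }
    }

It would be registered with tableEnv.registerFunction("myCountDistinct", new CountDistinctWords()) and used inside a windowed select such as .select("myCountDistinct(word)"); the function name "myCountDistinct" is only a placeholder here.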
On Thu, Jun 13, 2019 at 6:00 PM Rong Rong <walter...@gmail.com> wrote:

> Hi Felipe,
>
> Hequn is right. The problem you are facing is better solved with Table API
> level code instead of dealing with it in the DataStream API. You will have
> more Flink library support to achieve your goal.
>
> In addition, the Flink Table API also supports user-defined aggregate
> functions [1], with which you can implement your HyperLogLog-based
> approximation. In fact, the interface is similar to the AggregateFunction
> interface in the DataStream API [2].
>
> --
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#aggregation-functions
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#aggregatefunction
>
> On Thu, Jun 13, 2019 at 8:55 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Hequn,
>> indeed the ReduceFunction is better than the ProcessWindowFunction. I
>> replaced it and could verify the performance improvement [1]. Thanks for that!
>> I will try a distinct count with the Table API.
>>
>> The question I am facing is that I want to use a HyperLogLog in a UDF for
>> the DataStream API, so that I can compute an approximate distinct count
>> inside a window, like I did here [2]. After having my UDF, I want to have
>> my own operator which processes this approximation of the distinct count.
>> I am not sure whether I can implement my own operator for the Table API. Can I?
>>
>> [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountReduceWindowSocket.java
>> [2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>>
>> Thanks!
>> Felipe
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>> On Thu, Jun 13, 2019 at 8:10 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> From your code, I think you want to get the "count distinct" result
>>> instead of the "distinct count". They have different meanings.
>>>
>>> To improve the performance, you can replace your
>>> DistinctProcessWindowFunction with a DistinctProcessReduceFunction. A
>>> ReduceFunction can aggregate the elements of a window incrementally, while
>>> with a ProcessWindowFunction, elements cannot be incrementally aggregated
>>> and instead need to be buffered internally until the window is considered
>>> ready for processing.
>>>
>>> > Flink does not have a built-in operator which does this computation.
>>> Flink does have built-in operators to solve your problem. You can use the
>>> Table API & SQL to solve it.
>>> The code looks like:
>>>
>>> public static void main(String[] args) throws Exception {
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>
>>>     DataStream ds = env.socketTextStream("localhost", 9000);
>>>     tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");
>>>
>>>     SplitTableFunction splitFunc = new SplitTableFunction();
>>>     tableEnv.registerFunction("splitFunc", splitFunc);
>>>     Table result = tableEnv.scan("sourceTable")
>>>         .joinLateral("splitFunc(line) as word")
>>>         .window(Tumble.over("5.seconds").on("proctime").as("w"))
>>>         .groupBy("w")
>>>         .select("count.distinct(word), collect.distinct(word)");
>>>
>>>     tableEnv.toAppendStream(result, Row.class).print();
>>>     env.execute();
>>> }
>>>
>>> Detailed code can be found here [1].
>>>
>>> At the same time, you can perform a two-stage window aggregation to improve
>>> the performance, i.e., the first window aggregate is used to distinct the
>>> words and the second window is used to compute the final results.
>>>
>>> Documentation about the Table API and SQL can be found here [2][3].
>>>
>>> Best,
>>> Hequn
>>>
>>> [1] https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html
>>>
>>> On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi Rong, I implemented my solution using a ProcessWindowFunction with
>>>> timeWindow and a ReduceFunction with timeWindowAll [1]. The first window
>>>> runs in parallel and the second window merges everything in the Reducer.
>>>> I guess it is the best approach to do a distinct count. Would you suggest
>>>> some improvements?
>>>>
>>>> [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>>>>
>>>> Thanks!
>>>> --
>>>> -- Felipe Gutierrez
>>>> -- skype: felipe.o.gutierrez
>>>> -- https://felipeogutierrez.blogspot.com
>>>>
>>>> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi Rong,
>>>>>
>>>>> thanks for your answer. If I understood correctly, the option would be
>>>>> to use ProcessFunction [1], since it has the onTimer() method, but not
>>>>> ProcessWindowFunction [2], because it does not have onTimer(). I need
>>>>> this method to call out.collect(...) on the Collector<OUT> from onTimer()
>>>>> in order to emit a single value from my distinct count function.
>>>>>
>>>>> Is that reasonable?
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>>>>
>>>>> Kind Regards,
>>>>> Felipe
>>>>>
>>>>> --
>>>>> -- Felipe Gutierrez
>>>>> -- skype: felipe.o.gutierrez
>>>>> -- https://felipeogutierrez.blogspot.com
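For reference, a rough sketch of the two-window DataStream variant described above (a keyed timeWindow for parallel de-duplication, then a timeWindowAll that merges the results), assuming processing-time windows. The class name, socket source, and trivial word splitting are illustrative only, not Felipe's actual code:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class TwoStageDistinctCount {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // split each socket line into words
            DataStream<String> words = env
                    .socketTextStream("localhost", 9000)
                    .flatMap(new FlatMapFunction<String, String>() {
                        @Override
                        public void flatMap(String line, Collector<String> out) {
                            for (String w : line.toLowerCase().split("\\W+")) {
                                if (!w.isEmpty()) {
                                    out.collect(w);
                                }
                            }
                        }
                    });

            words
                    // stage 1: parallel, keyed window that keeps one record per distinct word
                    .keyBy(word -> word)
                    .timeWindow(Time.seconds(5))
                    .reduce((a, b) -> a)
                    // stage 2: non-parallel window that only has to count the de-duplicated words
                    .timeWindowAll(Time.seconds(5))
                    .aggregate(new CountAggregate())
                    .print();

            env.execute("two-stage distinct count");
        }

        /** Counting aggregate: every incoming element is already a distinct word. */
        public static class CountAggregate implements AggregateFunction<String, Long, Long> {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(String value, Long accumulator) { return accumulator + 1; }
            @Override public Long getResult(Long accumulator) { return accumulator; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        }
    }

Note that with processing time the output of stage 1 is emitted at the window boundary and may therefore fall into the next stage-2 window, so the two stages are shifted by one window; event-time windows with watermarks would keep them aligned.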
>>>>> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong <walter...@gmail.com> wrote:
>>>>>
>>>>>> Hi Felipe,
>>>>>>
>>>>>> There are multiple ways to do DISTINCT COUNT in the Table/SQL API; in
>>>>>> fact, there is already a recent thread about it [1]. Based on the
>>>>>> description you provided, it seems like the Table/SQL API might be a
>>>>>> better level to use.
>>>>>>
>>>>>> To answer your questions:
>>>>>> - You should be able to use other TimeCharacteristics. You might want
>>>>>> to try ProcessWindowFunction and see if it fits your use case.
>>>>>> - I am not sure I fully understand the question. Your keyBy should be
>>>>>> done on your distinct key (or a composite key), and if you do the keyBy
>>>>>> correctly then yes, all messages with the same key are processed by the
>>>>>> same TaskManager thread.
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>>>>>
>>>>>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have implemented a Flink data stream application to compute the
>>>>>>> distinct count of words. Flink does not have a built-in operator which
>>>>>>> does this computation. I used a KeyedProcessFunction and I am saving
>>>>>>> the state in a ValueState descriptor.
>>>>>>> Could someone check if my implementation is the best way of doing it?
>>>>>>> Here is my solution:
>>>>>>> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>>>>>>>
>>>>>>> There are some points that I could not understand well:
>>>>>>> - I could only use TimeCharacteristic.IngestionTime.
>>>>>>> - I split the words using "Tuple2<Integer, String>(0, word)", so I
>>>>>>> always have the same key (0). As I understand it, all the events will
>>>>>>> be processed on the same TaskManager, which will not achieve
>>>>>>> parallelism if I run on a cluster.
>>>>>>>
>>>>>>> Kind Regards,
>>>>>>> Felipe
>>>>>>> --
>>>>>>> -- Felipe Gutierrez
>>>>>>> -- skype: felipe.o.gutierrez
>>>>>>> -- https://felipeogutierrez.blogspot.com
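To illustrate the onTimer() pattern discussed in this thread, here is a hedged sketch (not the code from Felipe's Stack Overflow answer): a KeyedProcessFunction that collects words in MapState and emits a single distinct-count value when a processing-time timer fires. The state names, the 5-second interval, and the Tuple2<Integer, String> input shape are assumptions:

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    /**
     * Sketch: collects the distinct words of one key in MapState and emits the
     * current distinct count from onTimer() every 5 seconds of processing time.
     */
    public class DistinctCountFunction
            extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, Long> {

        private transient MapState<String, Boolean> seenWords;
        private transient ValueState<Long> pendingTimer;

        @Override
        public void open(Configuration parameters) {
            seenWords = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("seen-words", String.class, Boolean.class));
            pendingTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pending-timer", Long.class));
        }

        @Override
        public void processElement(Tuple2<Integer, String> value, Context ctx, Collector<Long> out)
                throws Exception {
            seenWords.put(value.f1, Boolean.TRUE);

            // register at most one processing-time timer per key, 5 seconds ahead
            if (pendingTimer.value() == null) {
                long timer = ctx.timerService().currentProcessingTime() + 5000L;
                ctx.timerService().registerProcessingTimeTimer(timer);
                pendingTimer.update(timer);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            long count = 0;
            for (String ignored : seenWords.keys()) {
                count++;
            }
            out.collect(count);   // a single value per timer firing, as asked above
            seenWords.clear();
            pendingTimer.clear();
        }
    }

With the constant key (0) from the original question, all words go to one parallel instance; keying by the word itself, as Rong suggests, spreads the state across TaskManagers, but then a second stage (for example a windowAll) is needed to combine the per-key results into one count.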