Re: [PROPOSAL] Mean Fn without_defaults()

2020-06-04 Thread Robert Bradshaw
Thanks for looking at this. I think it makes sense. If you could make a
pull request that would be great; it'd be much easier to discuss there.



[PROPOSAL] Mean Fn without_defaults()

2020-06-04 Thread Iñigo San Jose
Hi all!

Some time ago I was tinkering with Apache Beam and ran into the typical
error when using global combiners together with windows (the need for
without_defaults()). I was using the Mean function and noticed that
Mean.Globally() does not let me use the without_defaults() option, so I had
to use "CombineGlobally(MeanCombineFn()).without_defaults()" instead.
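
To make the behaviour concrete, here is a minimal plain-Python sketch of
what "with defaults" versus "without defaults" means for a global mean over
one window. It is a toy model under my own assumptions, not Beam's
implementation: ToyMeanCombineFn and toy_combine_globally are hypothetical
names, and only the method names mirror the CombineFn interface.

```python
# Toy model only: plain Python, NOT Apache Beam's implementation.
# ToyMeanCombineFn and toy_combine_globally are hypothetical names.


class ToyMeanCombineFn:
    """Mean combiner that keeps a (sum, count) accumulator."""

    def create_accumulator(self):
        return (0, 0)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return (total + element, count + 1)

    def merge_accumulators(self, accumulators):
        # Sum the partial sums and the partial counts separately.
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        # An empty accumulator has no meaningful mean; report NaN.
        return total / count if count else float("nan")


def toy_combine_globally(combine_fn, elements, with_defaults=True):
    """Combine the elements of one window into a list of 0 or 1 outputs."""
    if not elements and not with_defaults:
        # "Without defaults": an empty input yields an empty output.
        return []
    accumulator = combine_fn.create_accumulator()
    for element in elements:
        accumulator = combine_fn.add_input(accumulator, element)
    # "With defaults": even an empty input yields one (default) value.
    return [combine_fn.extract_output(accumulator)]


print(toy_combine_globally(ToyMeanCombineFn(), [1000, 2000, 3000]))
print(toy_combine_globally(ToyMeanCombineFn(), [], with_defaults=False))
```

In this sketch a window with no elements produces a single NaN by default,
and nothing at all when with_defaults=False; the latter is the behaviour I
want to be able to request on Mean.Globally().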

I checked the Python code [1] and I made a "fix" to be able to use
Mean.Globally().without_defaults() without running into errors. Please find
attached both the quick tests I ran and the modified version of the file
[1].

I was considering making a pull request to Apache Beam, but I would like to
discuss it with someone first in case there is a reason for not supporting
`without_defaults()` there. I would highly appreciate your feedback on
this.

Thanks a lot,
Iñigo

PS: I also tried the same approach with Count, but I couldn't make it work
(I think because of the accumulators).

PS2: This was done with Apache Beam 2.20.0, but it should also work in 2.21.0.


[1]
https://beam.apache.org/releases/pydoc/2.20.0/_modules/apache_beam/transforms/combiners.html#Mean


-- 

Iñigo San Jose | josein...@google.coom
Big Data Technical Solutions Engineer T2
Google Cloud Platform
Google, Dublin
import logging
import random
import time
import datetime


import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import Create, FlatMap, Map, ParDo, Filter, Flatten, Partition
from apache_beam import Keys, Values, GroupByKey, CoGroupByKey, CombineGlobally, CombinePerKey
from apache_beam import pvalue, window, WindowInto
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.util import WithKeys
from apache_beam.transforms.combiners import Top, Mean, Count, MeanCombineFn, CountCombineFn


with beam.Pipeline() as p:
    scores = [
        {"player": "Juan", "score": 1000, "timestamp": "2020-04-30 15:35"},
        {"player": "Marina", "score": 1500, "timestamp": "2020-04-30 16:10"},
        {"player": "Cristina", "score": 2000, "timestamp": "2020-04-30 15:00"},
        {"player": "Cristina", "score": 3500, "timestamp": "2020-04-30 15:45"},
        {"player": "Marina", "score": 500, "timestamp": "2020-04-30 16:30"},
        {"player": "Juan", "score": 4000, "timestamp": "2020-04-30 15:15"},
        {"player": "Cristina", "score": 3000, "timestamp": "2020-04-30 16:50"},
        {"player": "Juan", "score": 2000, "timestamp": "2020-04-30 16:59"},
    ]

    def date2unix(string):
        # Parse "YYYY-MM-DD HH:MM" and convert to a Unix timestamp (local time).
        unix = int(time.mktime(datetime.datetime.strptime(string, "%Y-%m-%d %H:%M").timetuple()))
        return unix

    def toKV(element):
        return (element['player'], element['score'])

    create = (p | "Create" >> Create(scores)
              | "Add timestamps" >> Map(lambda x: window.TimestampedValue(x, date2unix(x['timestamp'])))
              )
    mean_pk = (create | "To KV" >> Map(toKV)
               | "FixedWindow" >> WindowInto(window.FixedWindows(60 * 60))
               | "Mean Per Key" >> Mean.PerKey()
               | Map(lambda x: print("Mean per Player: {} ".format(x)))
               )

    add_score = (create | "Get Score" >> Map(lambda x: x['score']))

    add_window = add_score | "SlidingWindow" >> WindowInto(window.SlidingWindows(60 * 60, 60 * 20))

    # Global window: Mean.Globally() works without any changes.
    (add_score | "Modified No window" >> Mean.Globally()
     | "Print avg no window" >> Map(lambda x: print("No Window Average: {} ".format(x)))
     )

    # Existing workaround for windowed input: wrap the CombineFn directly.
    (add_window | "Original" >> CombineGlobally(MeanCombineFn()).without_defaults()
     | "Print avg og" >> Map(lambda x: print("Original Average: {} ".format(x)))
     )

    # Proposed usage: needs the modified combiners.py attached to this message.
    (add_window | "Modified" >> Mean.Globally().without_defaults()
     | "Print avg modified" >> Map(lambda x: print("Modified Average: {} ".format(x)))
     )
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A library of basic combiner PTransform subclasses."""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import division

import heapq
import operator
import random
import sys
import warnings
from builtins import object
from builtins import zip
from typing import Any
from typing import Dict
from