Re: Apache Beam Python SDK - Inaccurate session window interval

2020-04-24 Thread Luke Cwik
The session gap duration of 60 minutes tells the runner to close the window
when there is a gap of at least 60 minutes between records for a given key. So
for a given key X, if we had data at timestamps:
1:01
1:30
2:45
2:50
then 1:01 and 1:30 would be in one session while 2:45 and 2:50 would be in
a second session.
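
A minimal DirectRunner sketch of that behavior (the key and timestamps are made
up for illustration; the gap is 60 minutes, matching the example above):

-
import apache_beam as beam
from apache_beam.transforms import window

# Event times in seconds for key "X": 1:01, 1:30, 2:45, 2:50.
events = [("X", 1 * 3600 + 1 * 60), ("X", 1 * 3600 + 30 * 60),
          ("X", 2 * 3600 + 45 * 60), ("X", 2 * 3600 + 50 * 60)]

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create(events)
        | beam.Map(lambda kv: window.TimestampedValue(kv, kv[1]))
        | beam.WindowInto(window.Sessions(60 * 60))  # 60-minute gap
        | beam.GroupByKey()
        # Expect two groups: [1:01, 1:30] and [2:45, 2:50], because the
        # 75-minute gap between 1:30 and 2:45 exceeds the 60-minute gap.
        | beam.Map(print)
    )
-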

If I'm misunderstanding, maybe you could share some data with timestamps
and what you expect the output to be.

On Fri, Apr 24, 2020 at 2:17 AM Yohei Onishi 
wrote:

> Hi,
>
> I also deployed the application to Dataflow and got the same result.
> The actual session intervals were not the same as the given session interval
> (60 minutes).
>
> ---
> new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end
> 2018-10-19 13:00:00)
> new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end
> 2018-10-19 11:03:00)
> new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end
> 2018-10-19 13:00:00)
> ---
>
>
> https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval
>
> On Fri, Apr 24, 2020 at 3:09 PM Yohei Onishi 
> wrote:
>
>> Hi,
>>
>> I am trying to process data with a 60-minute session interval using the
>> Apache Beam Python SDK. But the actual session intervals were inaccurate,
>> such as 3:00:00, 1:01:00, or 1:50:00, when I ran my application locally
>> using the DirectRunner.
>>
>> Would you help me find a solution to fix this issue and process data in
>> 60-minute sessions?
>>
>> I built my pipeline as below.
>>
>> -
>> with Pipeline(options=pipeline_options) as pipeline:
>>     (
>>         pipeline
>>         | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
>>         | "Convert" >> ParDo(Convert())
>>         | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(
>>             x, get_timestamp_from_element(x).timestamp()))
>>         | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
>>         | "Apply Session Window" >> WindowInto(
>>             window.Sessions(known_args.session_interval))
>>         | "Group" >> GroupByKey()
>>         | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
>>     )
>>     result = pipeline.run()
>>     result.wait_until_finish()
>> -
>>
>> The session_interval (60 minutes) is provided as below.
>>
>> -
>> parser.add_argument(
>>     "--session_interval",
>>     help="Interval of each session",
>>     default=60 * 60)  # 60 mins
>> -
>>
>> The WriteToCSV function processes data per session. I logged the session
>> duration, but it was not accurate.
>>
>> -
>> class WriteToCSV(DoFn):
>>     def __init__(self, output_path):
>>         self.output_path = output_path
>>
>>     def process(self, element, window=DoFn.WindowParam):
>>         # element is (user_id, records) after GroupByKey
>>         user_id, records = element
>>         window_start = window.start.to_utc_datetime()
>>         window_end = window.end.to_utc_datetime()
>>         duration = window_end - window_start
>>         logging.info(">>> new %s record(s) in %s session (start %s end %s)",
>>                      len(list(records)), duration, window_start, window_end)
>> 
>> -
>>
>> Then I got these log messages when I ran the application locally with the
>> DirectRunner.
>>
>> -
>> new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end
>> 2018-10-19 05:00:00)
>> new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end
>> 2018-10-19 03:03:00)
>> new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end
>> 2018-10-19 05:00:00)
>> -
>>
>> Thanks.
>>
>>
>>


Re: SparkRunner on k8s

2020-04-24 Thread Kyle Weaver
> In other words, are there options to the job runner that would eventually
translate to ' --volume /storage1:/storage1 ' while the docker container is
being run by Flink? Even if it means code changes and building from source,
it's fine. Please point me in the right direction.

I found an open feature request for this, but unfortunately it looks like
neither of the two attempted implementations ended up being merged:
https://issues.apache.org/jira/browse/BEAM-5440

Sorry, I haven't had much time to look into your issue with the Spark
runner. If you are still interested in trying it, you might use a different
Beam version and see whether the problem persists.

On Wed, Apr 22, 2020 at 7:56 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
buvana.rama...@nokia-bell-labs.com> wrote:

> Hi Kyle,
>
> About FlinkRunner:
>
> "One is you can mount a directory from the Docker host inside the
> container(s). But the more scalable solution is to use a distributed file
> system, such as HDFS, Google Cloud Storage, or Amazon S3"
>
> I am running some benchmarking tests and so I prefer not to use GCS or S3
> (as the network delay can kill the performance).
>
> I would like to focus on the option of the host mounting the volume into
> the containers, but I have not come across a docker command where a host
> can mount volumes into running containers. I do not think 'docker create'
> volume will help here; please correct me if I am wrong.
>
> Is there a way the job runner can tell the Flink cluster to mount certain
> volumes before running the SDK container? And if so, is there a way I can
> tell the job runner to tell Flink to mount these volumes?
>
> In other words, are there options to the job runner that would eventually
> translate to ' --volume /storage1:/storage1 ' while the docker container is
> being run by Flink? Even if it means code changes and building from source,
> it's fine. Please point me in the right direction.
>
> Thanks,
> Buvana
> --
> *From:* Kyle Weaver 
> *Sent:* Monday, April 13, 2020 7:34 PM
> *To:* user@beam.apache.org 
> *Subject:* Re: SparkRunner on k8s
>
> > It appears that the filesystem on the client side is not the same as the
> environment that Flink creates to run the Beam pipeline (I think Flink does
> a docker run of the Python SDK to run the Beam pipeline? In that case, how
> would the container know where to write the file?)
>
>
> You are correct. Beam Python execution takes place within a Docker
> container, or often multiple containers, depending on your pipeline and
> configuration. Multiple containers is probably the cause of the error here.
> The Python SDK doesn't do anything special with local file paths; it just
> writes them to the local file system of the container. So in order to get a
> persistent, shared file system, you have a couple options. One is you can
> mount a directory from the Docker host inside the container(s). But the
> more scalable solution is to use a distributed file system, such as HDFS,
> Google Cloud Storage, or Amazon S3. Check out the Beam programming guide
> for more info:
> https://beam.apache.org/documentation/programming-guide/#pipeline-io
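>
> For example, here is a minimal sketch of the distributed file system option
> (the bucket path is hypothetical, and the workers need the GCS dependencies
> and credentials available):
>
> import apache_beam as beam
>
> with beam.Pipeline() as p:
>     (
>         p
>         | beam.Create(["hello", "world"])
>         # An object-store path instead of a container-local path keeps the
>         # output visible outside the SDK harness container.
>         | beam.io.WriteToText("gs://my-bucket/beam-output/result")  # hypothetical bucket
>     )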
>
> On Mon, Apr 13, 2020 at 6:55 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
> buvana.rama...@nokia-bell-labs.com> wrote:
>
> Kyle,
>
>
>
> Thanks a lot for the pointers. I got interested in running my Beam pipeline
> on the FlinkRunner, set up a local Flink cluster, and verified that a sample
> pipeline works fine.
>
>
>
> I started the Beam job server as follows:
>
> docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master
> $IP:8081 --job-host $IP  --job-port 8099
>
>
>
> I submitted a Beam pipeline, which works totally fine when run with the
> LocalRunner. The last stage of the pipeline code looks as follows:
>
> . . .
> . . .
> . . .
>
> output = (
>     {
>         'Mean Open': mean_open,
>         'Mean Close': mean_close
>     }
>     | beam.CoGroupByKey()
>     | beam.io.WriteToText(args.output)
> )
>
>
>
> So, we are ending the pipeline with an io.WriteToText().
>
>
>
> Now, when I supply a filename, whether residing on a local disk (/tmp) or a
> network-mounted disk (e.g., /nas2), I get the following error:
>
> python test-beam.py --input data/sp500.csv --output /tmp/result.txt
>
>
>
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.6 interpreter.
>
> ERROR:root:java.lang.RuntimeException: Error received from SDK harness for
> instruction 2: Traceback (most recent call last):
>
>   File "apache_beam/runners/common.py", line 883, in
> apache_beam.runners.common.DoFnRunner.process
>
>   File "apache_beam/runners/common.py", line 667, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process
>
>   File "apache_beam/runners/common.py", line 748, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py",
> line 1095, in 

Re: Stateful & Timely Call

2020-04-24 Thread Ismaël Mejía
Sounds like a good addition to the Beam patterns page, Reza :)

On Fri, Apr 24, 2020 at 3:22 AM Aniruddh Sharma  wrote:
>
> Thanks Robert,
>
> This is a lifesaver and it's a great help :). It works like a charm.
>
> Thanks
> Aniruddh
>
> On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw  wrote:
>>
>> I may have misinterpreted your email; I thought you didn't have a need for
>> keys at all. If this is actually the case, you don't need a GroupByKey: just
>> have your DoFn take Rows as input and emit List<Row> as output. That is,
>> it's a DoFn<Row, List<Row>>.
>>
>> You can buffer multiple Rows in an instance variable between process element 
>> calls. For example,
>>
>> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
>>   private List<T> buffer = new ArrayList<>();
>>
>>   @ProcessElement
>>   public void processElement(@Element T elt, OutputReceiver<List<T>> out) {
>>     buffer.add(elt);  // buffer the element itself
>>     if (buffer.size() > 100) {
>>       out.output(buffer);
>>       buffer = new ArrayList<>();
>>     }
>>   }
>>
>>   // @FinishBundle takes a FinishBundleContext rather than an OutputReceiver,
>>   // and needs an explicit timestamp/window when emitting; the global window
>>   // is fine for a batch pipeline like this one.
>>   @FinishBundle
>>   public void finishBundle(FinishBundleContext c) {
>>     if (!buffer.isEmpty()) {
>>       c.output(buffer, GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
>>       buffer = new ArrayList<>();
>>     }
>>   }
>> }
>>
>> See 
>> https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
>>  for more information on the lifetime of DoFns.
>>
>> As for why your GBK is taking so long, yes, this can be a bottleneck. 
>> However, it should be noted that Dataflow (like most other runners) executes 
>> this step in conjunction with other steps as part of a "fused stage." So if 
>> your pipeline looks like
>>
>> Read -> DoFnA -> GBK -> DoFnB -> Write
>>
>> then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up
>> almost immediately), one element at a time, and when that's finished,
>> GBK[part2], DoFnB, and Write will execute concurrently, one element at a time,
>> so you can't just look at the last unfinished stage to determine where the
>> bottleneck is. (One helpful tool, however, is looking at the amount of time
>> spent on each step in the UI.)
>>
>> Hopefully that helps.
>>
>> - Robert
>>
>>
>> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma  
>> wrote:
>>>
>>> Thanks Robert and Luke
>>>
>>> This approach seems good to me. I am trying that; I had to include a
>>> GroupBy to make an Iterable available to the ParDo function to do the same.
>>> Now the GroupBy is a bottleneck: it has been running for the last 2 hours and
>>> has processed only 40 GB of data (still waiting for the rest of the 100s of GB of data).
>>>
>>> Currently I used GroupByKey.create().
>>>
>>> What's the recommended way to choose a key to make it execute faster: the
>>> same key for all rows, a different key for each row, or the same key for a
>>> group of rows?
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik  wrote:

 As Robert suggested, what prevents you from doing:
 ReadFromBQ -> ParDo(BatchInMemory) -> DLP
 where BatchInMemory stores elements in an in-memory list in the @ProcessElement
 method, produces output every time the list is large enough, and emits a final
 output in the @FinishBundle method?

 On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma  
 wrote:
>
> Hi Luke
>
> Sorry, I forgot to mention the functions. Dataflow adds the following
> transforms, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is super
> slow. How do I choose keys to make it faster?
>
>   .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>   .setCoder(
>       KvCoder.of(
>           keyCoder,
>           KvCoder.of(InstantCoder.of(),
>               WindowedValue.getFullCoder(kvCoder, windowCoder))))
>
>   // Group by key and sort by timestamp, dropping windows as they are reified
>   .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>
>   // The GBKO sets the windowing strategy to the global default
>   .setWindowingStrategyInternal(inputWindowingStrategy);
>
> Thanks
> Aniruddh
>
> On 2020/04/23 16:35:58, Aniruddh Sharma  wrote:
> > Thanks Luke for your response.
> >
> > My use case is the following:
> > a) I read data from BQ (TableRow).
> > b) Convert it into (Table.Row) for DLP calls.
> > c) Batch the Table.Row collection up to a max size of 512 KB (i.e.,
> > fit many rows from BQ into a single DLP table) and call DLP.
> >
> > Functionally, I don't have a need for a key or a window, as I just want to
> > fit rows into a DLP table up to a max size.
> >
> > In batch mode, when I call the stateful API,
> > it adds a "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly"
> > step, and this step is super slow: it has been running on a 50-node cluster
> > for 800 GB of data for the last 10 hours.
> >
> > This step is not added when I call Dataflow in streaming mode, but I
> > can't use streaming mode for other reasons.
> >
> > So I am trying to understand the following:
> > a) 

Re: Apache Beam Python SDK - Inaccurate session window interval

2020-04-24 Thread Yohei Onishi
Hi,

I also deployed the application to Dataflow and got the same result.
The actual session intervals were not the same as the given session interval
(60 minutes).

---
new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end
2018-10-19 13:00:00)
new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end
2018-10-19 11:03:00)
new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end
2018-10-19 13:00:00)
---

https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval

On Fri, Apr 24, 2020 at 3:09 PM Yohei Onishi 
wrote:

> Hi,
>
> I am trying to process data with a 60-minute session interval using the
> Apache Beam Python SDK. But the actual session intervals were inaccurate,
> such as 3:00:00, 1:01:00, or 1:50:00, when I ran my application locally
> using the DirectRunner.
>
> Would you help me find a solution to fix this issue and process data in
> 60-minute sessions?
>
> I built my pipeline as below.
>
> -
> with Pipeline(options=pipeline_options) as pipeline:
>     (
>         pipeline
>         | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
>         | "Convert" >> ParDo(Convert())
>         | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(
>             x, get_timestamp_from_element(x).timestamp()))
>         | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
>         | "Apply Session Window" >> WindowInto(
>             window.Sessions(known_args.session_interval))
>         | "Group" >> GroupByKey()
>         | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
>     )
>     result = pipeline.run()
>     result.wait_until_finish()
> -
>
> The session_interval (60 minutes) is provided as below.
>
> -
> parser.add_argument(
>     "--session_interval",
>     help="Interval of each session",
>     default=60 * 60)  # 60 mins
> -
>
> The WriteToCSV function processes data per session. I logged the session
> duration, but it was not accurate.
>
> -
> class WriteToCSV(DoFn):
>     def __init__(self, output_path):
>         self.output_path = output_path
>
>     def process(self, element, window=DoFn.WindowParam):
>         # element is (user_id, records) after GroupByKey
>         user_id, records = element
>         window_start = window.start.to_utc_datetime()
>         window_end = window.end.to_utc_datetime()
>         duration = window_end - window_start
>         logging.info(">>> new %s record(s) in %s session (start %s end %s)",
>                      len(list(records)), duration, window_start, window_end)
> 
> -
>
> Then I got these log messages when I ran the application locally with the
> DirectRunner.
>
> -
> new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19
> 05:00:00)
> new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19
> 03:03:00)
> new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19
> 05:00:00)
> -
>
> Thanks.
>
>
>


Apache Beam Python SDK - Inaccurate session window interval

2020-04-24 Thread Yohei Onishi
Hi,

I am trying to process data with a 60-minute session interval using the Apache
Beam Python SDK. But the actual session intervals were inaccurate, such as
3:00:00, 1:01:00, or 1:50:00, when I ran my application locally using the
DirectRunner.

Would you help me find a solution to fix this issue and process data in
60-minute sessions?

I built my pipeline as below.

-
with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
        | "Convert" >> ParDo(Convert())
        | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(
            x, get_timestamp_from_element(x).timestamp()))
        | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
        | "Apply Session Window" >> WindowInto(
            window.Sessions(known_args.session_interval))
        | "Group" >> GroupByKey()
        | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
    )
    result = pipeline.run()
    result.wait_until_finish()
-

The session_interval (60 minutes) is provided as below.

-
parser.add_argument(
    "--session_interval",
    help="Interval of each session",
    default=60 * 60)  # 60 mins
-

The WriteToCSV function processes data per session. I logged the session
duration, but it was not accurate.

-
class WriteToCSV(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, element, window=DoFn.WindowParam):
        # element is (user_id, records) after GroupByKey
        user_id, records = element
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        duration = window_end - window_start
        logging.info(">>> new %s record(s) in %s session (start %s end %s)",
                     len(list(records)), duration, window_start, window_end)

-

Then I got these log messages when I ran the application locally with the
DirectRunner.

-
new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19
05:00:00)
new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19
03:03:00)
new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19
05:00:00)
-

Thanks.