Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Robert Bradshaw via user
On Fri, Apr 12, 2024 at 1:39 PM Ruben Vargas 
wrote:

> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >
> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>

or abs(hash(id)) % 100, in case the first character of your id is not well
distributed.
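
For reference, a minimal Python sketch of the hash-bucket keying (the transform
and variable names here are illustrative, not from the thread):

import apache_beam as beam

NUM_BUCKETS = 100

def key_by_bucket(msg):
    # Key by a hash bucket rather than the raw id, so one batch mixes many ids.
    # (Python's string hash is salted per process, which is fine here since the
    # key only needs to spread load, not be stable across runs.)
    return abs(hash(msg["id"])) % NUM_BUCKETS, msg

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{"id": f"id-{i}"} for i in range(1000)])
        | "Key by bucket" >> beam.Map(key_by_bucket)
        | "Batch per key" >> beam.GroupIntoBatches(100)
        | "Drop key" >> beam.MapTuple(lambda _bucket, batch: list(batch))
        # Each output element is a list of up to 100 messages; call the
        # enrichment endpoint once per list.
    )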


> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> >
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >>
> >> How about just keeping track of a buffer, flushing it after 100 messages,
> and also flushing any remaining buffer in finish_bundle?
> >>
> >>
>
> If this is in memory, it could lead to potential loss of data. That is
> why state is used, or at least that is my understanding. But maybe
> there is a way to do this with state?
>

Bundles are the unit of commitment in Beam [1], so finish_bundle won't drop
any data. A possible downside is that, especially in streaming, bundles may be
small, which would cap the amount of batching you get.

[1] https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence
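
For what it's worth, a minimal sketch of the bundle-buffering approach described
above (class name and size are illustrative); the built-in BatchElements utility
transform takes a similar approach:

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue

class BufferUpTo(beam.DoFn):
    """Buffers elements within a bundle and emits lists of up to `size`."""

    def __init__(self, size=100):
        self._size = size

    def start_bundle(self):
        self._buffer = []

    def process(self, element):
        self._buffer.append(element)
        if len(self._buffer) >= self._size:
            yield list(self._buffer)
            self._buffer = []

    def finish_bundle(self):
        # finish_bundle must emit WindowedValues explicitly; the timestamp and
        # window chosen here matter if anything downstream is windowed.
        if self._buffer:
            yield WindowedValue(list(self._buffer), MIN_TIMESTAMP, [GlobalWindow()])
            self._buffer = []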


> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >>>
> >>> Hello guys
> >>>
> >>> Maybe this question was already answered, but I cannot find it  and
> >>> want some more input on this topic.
> >>>
> >>> I have some messages that don't have any particular key candidate,
> >>> except the ID,  but I don't want to use it because the idea is to
> >>> group multiple IDs in the same batch.
> >>>
> >>> This is my use case:
> >>>
> >>> I have an endpoint to which I'm going to send the message ID; this endpoint
> >>> returns certain information which I will use to enrich my
> >>> message. In order to avoid calling the endpoint per message, I want to
> >>> batch the messages in groups of 100 and send the 100 IDs in one request (the
> >>> endpoint supports it). I was thinking of using GroupIntoBatches.
> >>>
> >>> - If I choose the ID as the key, my understanding is that it won't
> >>> work in the way I want (because it will form batches of the same ID).
> >>> - Using a constant will be a problem for parallelism, is that correct?
> >>>
> >>> Then my question is, what should I use as a key? Maybe something
> >>> regarding the timestamp? so I can have groups of messages that arrive
> >>> at a certain second?
> >>>
> >>> Any suggestions would be appreciated
> >>>
> >>> Thanks.
>


Re: Hot update in dataflow without losing messages

2024-04-15 Thread Robert Bradshaw via user
Are you draining [1] your pipeline or simply canceling it and starting a new
one? Draining should close open windows and attempt to flush all in-flight
data before shutting down. For PubSub you may also need to read from
subscriptions rather than topics to ensure each message is processed by either
the old pipeline or the new one.

[1] https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain

On Mon, Apr 15, 2024 at 9:33 AM Juan Romero  wrote:

> Hi guys. Good morning.
>
> I have been doing some tests in Apache Beam on Dataflow in order to see if
> I can do a hot update or hot swap while the pipeline is processing a
> bunch of messages that fall in a time window of 10 minutes. What I saw is
> that when I do a hot update on the pipeline while there are some
> messages in the time window (before sending them to the target), the
> current job is shut down and Dataflow creates a new one. The problem is that
> the messages that were being processed in the old job seem to be lost and
> are not taken over by the new one, which implies we are
> losing data.
>
> Can you help me or recommend any strategy to me?
>
> Thanks!!
>


Re: Fails to run two multi-language pipelines locally?

2024-03-08 Thread Robert Bradshaw via user
The way that cross-language pipelines work is that each transform has
an attached "environment" in which its workers should be instantiated.
By default these are identified as docker images + a possible set of
dependencies. Transforms with the same environment can be colocated.

There is a tension between having large environments that can be
shared and more targeted ones that have fewer dependencies.

By default the environment for SQL relies on Beam's jar built as
"sdks:java:extensions:sql:expansion-service:shadowJar" and the
environment for Kafka relies on Beam's jar for
"sdks:java:io:expansion-service:shadowJar" To use a single jar you
could create a jar with both of these (plus whatever else you need)
and use that (the easiest would be to pass the flag

--beamServices='{
  "sdks:java:extensions:sql:expansion-service:shadowJar": "/path/to/uber.jar",
  "sdks:java:io:expansion-service:shadowJar": "/path/to/uber.jar"
}'

That being said, it should work just fine with distinct jars on an
arbitrary setup as well (and the fact that it works locally hints at
something being up with that setup). Not sure what your Flink master
configuration is like, but maybe it's a docker-in-docker issue?

On Wed, Mar 6, 2024 at 8:30 PM Jaehyeon Kim  wrote:
>
> It may be a configuration issue. It works if I don't specify flink_master, 
> which uses an embedded flink cluster.
>
> On Thu, 7 Mar 2024 at 12:47, Jaehyeon Kim  wrote:
>>
>> It works fine if I only use Kafka read/write, as I only see a single container 
>> - two transforms (read and write) but a single container.
>>
>> If I add SqlTransform, however, another container is created and errors begin 
>> to appear. My speculation is that the containers don't recognise each 
>> other and get killed by the Flink task manager. I see containers being 
>> repeatedly created and killed.
>>
>> Does every multi-language pipeline run in a separate container?
>>
>> On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, 
>>  wrote:
>>>
>>> Oh, sorry, I didn't see that.
>>>
>>> I would look earlier in the logs and see why it failed to bring up the
>>> containers (or, if they started, look in the container logs to see why
>>> they died).
>>>
>>> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim  wrote:
>>> >
>>> > I am not using the python local runner but the flink runner. A flink 
>>> > cluster is started locally.
>>> >
>>> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user 
>>> >  wrote:
>>> >>
>>> >> Streaming portable pipelines are not yet supported on the Python local 
>>> >> runner.
>>> >>
>>> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim  wrote:
>>> >> >
>>> >> > Hello,
>>> >> >
>>> >> > I use the python SDK and my pipeline reads messages from Kafka and 
>>> >> > transforms via SQL. I see two containers are created but it seems that 
>>> >> > they don't communicate with each other so that the Flink task manager 
>>> >> > keeps killing the containers. The Flink cluster runs locally. Is there 
>>> >> > a way to run two multi-language pipelines (running on Docker) 
>>> >> > communicating with each other?
>>> >> >
>>> >> > Cheers,
>>> >> > Jaehyeon
>>> >> >
>>> >> >
>>> >> >
>>> >> > def run():
>>> >> > parser = argparse.ArgumentParser(
>>> >> > description="Process statistics by user from website visit 
>>> >> > event"
>>> >> > )
>>> >> > parser.add_argument(
>>> >> > "--inputs",
>>> >> > default="inputs",
>>> >> > help="Specify folder name that event records are saved",
>>> >> > )
>>> >> > parser.add_argument(
>>> >> > "--runner", default="FlinkRunner", help="Specify Apache Beam 
>>> >> > Runner"
>>> >> > )
>>> >> > opts = parser.parse_args()
>>> >> >
>>> >> > options = PipelineOptions()
>>> >> > pipeline_opts = {
>>> >> > "runner": opts.runner,
>>> >> > "flink_master": "localhost:8081",

Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Robert Bradshaw via user
The Python Local Runner has limited support for streaming pipelines.
For the time being I would recommend using Dataflow or Flink (the latter
can be run locally) to try out streaming pipelines.
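
A minimal sketch of trying this on the FlinkRunner (with no flink_master it
brings up an embedded local Flink cluster, which needs a local Java
installation; LOOPBACK keeps the SDK harness in the submitting process):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="FlinkRunner",         # embedded local cluster when no flink_master is given
    environment_type="LOOPBACK",
    streaming=True,
)

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)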

On Fri, Mar 8, 2024 at 2:11 PM Puertos tavares, Jose J (Canada) via
user  wrote:
>
> Hello  Hu:
>
>
>
> Not really. This one, as you have coded it, finishes as per
> stop_timestamp=time.time() + 16; after it finishes emitting, everything
> else gets output and the pipeline terminates in batch mode.
>
>
>
> You can rule out STDOUT issues and confirm this behavior by adding, after the
> GroupBy, a ParDo that throws an exception, writes temporary files, or makes
> HTTP requests. This ParDo won't be executed until your PeriodicImpulse
> terminates (you can extend it to +60 and see that it is not triggered on your
> 4-second window, but only once it stops generating).
>
>
>
> I am looking for something that is really streaming and executes continuously,
> so that in this case, every 4 seconds the window would process the elements
> in the window and then wait for the next window to accumulate.
>
>
>
> Regards,
>
> JP
>
>
> From: XQ Hu 
> Sent: Friday, March 8, 2024 3:51 PM
> To: user@beam.apache.org
> Cc: Puertos tavares, Jose J (Canada) 
> Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
>
>
>
> Is this what you are looking for?
>
>
>
> import random
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import trigger, window
> from apache_beam.transforms.periodicsequence import PeriodicImpulse
> from apache_beam.utils.timestamp import Timestamp
>
> with beam.Pipeline() as p:
>     input = (
>         p
>         | PeriodicImpulse(
>             start_timestamp=time.time(),
>             stop_timestamp=time.time() + 16,
>             fire_interval=1,
>             apply_windowing=False,
>         )
>         | beam.Map(lambda x: random.random())
>         | beam.WindowInto(window.FixedWindows(4))
>         | beam.GroupBy()
>         | "Print Windows"
>         >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
>     )
>
>
>
> On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user 
>  wrote:
>
> Hello Beam Users!
>
>
>
> I was looking into a simple example in Python of an unbounded (--streaming
> flag) pipeline that generates random numbers, applies a fixed window (let's
> say 5 seconds) and then applies a group-by operation (reshuffle) and prints
> the result, just to check.
>
>
>
> I notice that this seems to work as long as there is no grouping operation
> (reshuffle, groupBy, etc.) that would leverage the windowing semantics.
>
>
>
> #Get Parameters from Command Line for the Pipeline
>
> known_args, pipeline_options = parser.parse_known_args(argv)
>
> pipeline_options = PipelineOptions(flags=argv)
>
>
>
> #Create pipeline
>
> p = beam.Pipeline(options=pipeline_options)
>
>
>
>
>
> #Execute Pipeline
>
> (p | "Start pipeline " >> beam.Create([0])
>
> | "Get values"  >> beam.ParDo(RandomNumberGenerator())
>
> | 'Applied fixed windows ' >> beam.WindowInto( window.FixedWindows(1*5) )
>
> | 'Reshuffle ' >> beam.Reshuffle()
>
> |  "Print" >> beam.Map(lambda x: print ("{} - {} ".format(os.getpid(), x) 
> ,flush=True ) )
>
> )
>
>
>
> result = p.run()
>
> result.wait_until_finish()
>
>
>
>
>
> Even though the random generator is unbounded and tagged as such with the
> decorator, it seems to get stuck. If I make that step finite (i.e. adding a
> counter and exiting) then the code works in regular batch mode.
>
>
>
> # 
> =
>
> # Class for Splittable Do  Random Generatered numbers
>
> # 
> =
>
>
>
> @beam.transforms.core.DoFn.unbounded_per_element()
>
> class RandomNumberGenerator(beam.DoFn):
>
>
>
> @beam.transforms.core.DoFn.unbounded_per_element()
>
> def process(self, element ):
>
> import random
>
> import time
>
>
>
> counter=0
>
>
>
>
>
> while True:
>
>
>
> #if counter>5:
>
> #break
>
> nmb = random.randint(0, 1000)
>
> wait = random.randint(0, 5)
>
> rnow = time.time()
>
>
>
>
>
> print("Randy random", nmb)
>
>
>
> yield beam.window.TimestampedValue(nmb, rnow)
>
> time.sleep(wait)
>
> counter+=1
>
>
>
> I have tried to implement as per documentation the tracker and watermark, but 
> it seems that none of that seems to work either for the DirectRunner or 
> 

Re: Fails to run two multi-language pipelines locally?

2024-03-06 Thread Robert Bradshaw via user
Oh, sorry, I didn't see that.

I would look earlier in the logs and see why it failed to bring up the
containers (or, if they started, look in the container logs to see why
they died).

On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim  wrote:
>
> I am not using the python local runner but the flink runner. A flink cluster 
> is started locally.
>
> On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user  
> wrote:
>>
>> Streaming portable pipelines are not yet supported on the Python local 
>> runner.
>>
>> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim  wrote:
>> >
>> > Hello,
>> >
>> > I use the python SDK and my pipeline reads messages from Kafka and 
>> > transforms via SQL. I see two containers are created but it seems that 
>> > they don't communicate with each other so that the Flink task manager 
>> > keeps killing the containers. The Flink cluster runs locally. Is there a 
>> > way to run two multi-language pipelines (running on Docker) communicating 
>> > with each other?
>> >
>> > Cheers,
>> > Jaehyeon
>> >
>> >
>> >
>> > def run():
>> > parser = argparse.ArgumentParser(
>> > description="Process statistics by user from website visit event"
>> > )
>> > parser.add_argument(
>> > "--inputs",
>> > default="inputs",
>> > help="Specify folder name that event records are saved",
>> > )
>> > parser.add_argument(
>> > "--runner", default="FlinkRunner", help="Specify Apache Beam 
>> > Runner"
>> > )
>> > opts = parser.parse_args()
>> >
>> > options = PipelineOptions()
>> > pipeline_opts = {
>> > "runner": opts.runner,
>> > "flink_master": "localhost:8081",
>> > "job_name": "traffic-agg-sql",
>> > "environment_type": "LOOPBACK",
>> > "streaming": True,
>> > "parallelism": 3,
>> > "experiments": [
>> > "use_deprecated_read"
>> > ],  ## https://github.com/apache/beam/issues/20979
>> > "checkpointing_interval": "6",
>> > }
>> > options = PipelineOptions([], **pipeline_opts)
>> > # Required, else it will complain that when importing worker functions
>> > options.view_as(SetupOptions).save_main_session = True
>> >
>> > query = """
>> > WITH cte AS (
>> > SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>> > FROM PCOLLECTION
>> > )
>> > SELECT
>> > CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS 
>> > window_start,
>> > CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS 
>> > window_end,
>> > COUNT(*) AS page_view
>> > FROM cte
>> > GROUP BY
>> > TUMBLE(ts, INTERVAL '10' SECOND), id
>> > """
>> >
>> > p = beam.Pipeline(options=options)
>> > (
>> > p
>> > | "Read from Kafka"
>> > >> kafka.ReadFromKafka(
>> > consumer_config={
>> > "bootstrap.servers": os.getenv(
>> > "BOOTSTRAP_SERVERS",
>> > "host.docker.internal:29092",
>> > ),
>> > "auto.offset.reset": "earliest",
>> > # "enable.auto.commit": "true",
>> > "group.id": "traffic-agg",
>> > },
>> > topics=["website-visit"],
>> > )
>> > | "Decode messages" >> beam.Map(decode_message)
>> > | "Parse elements" >> 
>> > beam.Map(parse_json).with_output_types(EventLog)
>> > | "Format timestamp" >> 
>> > beam.Map(format_timestamp).with_output_types(EventLog)
>> > | "Count per minute" >> SqlTransform(query)
>> > | beam.Map(print)
>> > )
>> >
>> > logging.

Re: Fails to run two multi-language pipelines locally?

2024-03-06 Thread Robert Bradshaw via user
Streaming portable pipelines are not yet supported on the Python local runner.

On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim  wrote:
>
> Hello,
>
> I use the python SDK and my pipeline reads messages from Kafka and transforms 
> via SQL. I see two containers are created but it seems that they don't 
> communicate with each other so that the Flink task manager keeps killing the 
> containers. The Flink cluster runs locally. Is there a way to run two 
> multi-language pipelines (running on Docker) communicating with each other?
>
> Cheers,
> Jaehyeon
>
>
>
> def run():
> parser = argparse.ArgumentParser(
> description="Process statistics by user from website visit event"
> )
> parser.add_argument(
> "--inputs",
> default="inputs",
> help="Specify folder name that event records are saved",
> )
> parser.add_argument(
> "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
> )
> opts = parser.parse_args()
>
> options = PipelineOptions()
> pipeline_opts = {
> "runner": opts.runner,
> "flink_master": "localhost:8081",
> "job_name": "traffic-agg-sql",
> "environment_type": "LOOPBACK",
> "streaming": True,
> "parallelism": 3,
> "experiments": [
> "use_deprecated_read"
> ],  ## https://github.com/apache/beam/issues/20979
> "checkpointing_interval": "6",
> }
> options = PipelineOptions([], **pipeline_opts)
> # Required, else it will complain that when importing worker functions
> options.view_as(SetupOptions).save_main_session = True
>
> query = """
> WITH cte AS (
> SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
> FROM PCOLLECTION
> )
> SELECT
> CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS 
> window_start,
> CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
> COUNT(*) AS page_view
> FROM cte
> GROUP BY
> TUMBLE(ts, INTERVAL '10' SECOND), id
> """
>
> p = beam.Pipeline(options=options)
> (
> p
> | "Read from Kafka"
> >> kafka.ReadFromKafka(
> consumer_config={
> "bootstrap.servers": os.getenv(
> "BOOTSTRAP_SERVERS",
> "host.docker.internal:29092",
> ),
> "auto.offset.reset": "earliest",
> # "enable.auto.commit": "true",
> "group.id": "traffic-agg",
> },
> topics=["website-visit"],
> )
> | "Decode messages" >> beam.Map(decode_message)
> | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> | "Format timestamp" >> 
> beam.Map(format_timestamp).with_output_types(EventLog)
> | "Count per minute" >> SqlTransform(query)
> | beam.Map(print)
> )
>
> logging.getLogger().setLevel(logging.INFO)
> logging.info("Building pipeline ...")
>
> p.run().wait_until_finish()
>
> Here is the error message from the flink UI.
>
> 2024-03-07 12:01:41
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.IllegalStateException: No container running for id 
> cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:458)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:443)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> at 
> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
> at 
> 

Re: Roadmap of Calcite support on Beam SQL?

2024-03-04 Thread Robert Bradshaw via user
There is no longer a huge amount of active development going on here,
but implementing a missing function seems like an easy contribution
(lots of examples to follow). Otherwise, definitely worth filing a
feature request as a useful signal for prioritization.

On Mon, Mar 4, 2024 at 4:33 PM Jaehyeon Kim  wrote:
>
> Hi Wiśniowski
>
> Thank you so much for your reply. The query works for me. As I'm new to Beam 
> SQL, I'd spend some more time before issuing a feature request.
>
> Cheers,
> Jaehyeon
>
> On Mon, 4 Mar 2024 at 23:03, Wiśniowski Piotr 
>  wrote:
>>
>> Hi,
>>
>> 1. I do not have up-to-date knowledge, but Beam SQL was missing quite a lot 
>> of things regarding full Calcite support. I think the current way is to 
>> create a feature request on the repository and gather votes and interest. I 
>> definitely would vote for your initiative ;)
>>
>> 2. Regarding the query itself I got it working for something like this:
>> ```
>>
>> WITH cte AS (
>> SELECT CAST(event_datetime AS TIMESTAMP) AS ts
>> FROM PCOLLECTION
>> )
>> SELECT
>> CAST(TUMBLE_START(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS start_time,
>> CAST(TUMBLE_END(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS end_time,
>> COUNT(*) AS page_views
>> FROM cte
>> GROUP BY
>> TUMBLE(cte.ts, INTERVAL '1' MINUTE)
>> ;
>>
>> ```
>>
>> Maybe it would be useful for you. Note that I am not up to date with recent 
>> versions of Beam SQL, but I will need to catch up (the syntax for defining 
>> window on table is cool).
>>
>> Best
>>
>> Wiśniowski Piotr
>>
>> On 4.03.2024 05:27, Jaehyeon Kim wrote:
>>
>> Hello,
>>
>> I just tried a simple tumbling window but failed with the following error
>>
>> RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable 
>> to parse query
>> WITH cte AS (
>> SELECT TO_TIMESTAMP(event_datetime) AS ts FROM PCOLLECTION
>> )
>> SELECT
>> CAST(window_start AS VARCHAR) AS start_time,
>> CAST(window_end AS VARCHAR) AS end_time,
>> COUNT(*) AS page_views
>> FROM TABLE(
>> TUMBLE(TABLE cte, DESCRIPTOR(ts), 'INTERVAL 1 MINUTE')
>> )
>> GROUP BY
>> window_start, window_end
>>
>> I guess it is because TO_TIMESTAMP is not implemented. When I check the 
>> documentation, it is missing lots of functions. Is there any roadmap for 
>> Calcite support in Beam SQL?
>>
>> Cheers,
>> Jaehyeon


Re: ParDo(DoFn) with multiple context.output vs FlatMapElements

2024-01-26 Thread Robert Bradshaw via user
There is no difference; FlatMapElements is implemented in terms of a
DoFn that invokes context.output multiple times. And, yes, Dataflow
will fuse consecutive operations automatically. So if you have
something like

... -> DoFnA -> DoFnB -> GBK -> DoFnC -> ...

Dataflow will fuse DoFnA and DoFnB together, and if DoFnA produces a
lot of data for DoFnB to consume then more workers will be allocated
to handle the (DoFnA + DoFnB) combination. If the fanout is so huge
that a single worker would not be expected to handle the output DoFnA
produces from a single input, you could look into making DoFnA into a
SplittableDoFn https://beam.apache.org/blog/splittable-do-fn-is-available
. If DoFnB is just really expensive, you can also decouple the
parallelism between the two with a Reshuffle. Most of the time neither
of these is needed.
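
For reference, the same equivalence in the Python SDK (names here are
illustrative); a Reshuffle dropped between two such steps is what decouples
their parallelism:

import apache_beam as beam

class Explode(beam.DoFn):
    def process(self, line):
        # Emitting multiple outputs per input: each yield is one output element.
        for word in line.split():
            yield word

with beam.Pipeline() as p:
    lines = p | beam.Create(["a b c", "d e"])
    via_pardo = lines | "ParDo" >> beam.ParDo(Explode())
    # Equivalent result; FlatMap wraps a DoFn that does exactly the above.
    via_flatmap = lines | "FlatMap" >> beam.FlatMap(str.split)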

On Wed, Dec 27, 2023 at 5:44 PM hsy...@gmail.com  wrote:
>
> Hello
>
> I have a question. I have a transform that, for each input, will emit 1 or
> many outputs (to the same collection). I can do it with ParDo + DoFn, calling
> context.output multiple times in the processElement method for each input, or
> I can do it with FlatMapElements; is there any difference? Does Dataflow fuse
> the downstream transform automatically? Eventually I want more workers for the
> downstream transform because it needs to handle more data. How am I supposed
> to do that?
>
> Regards,
> Siyuan


Re: Downloading and executing addition jar file when using Python API

2024-01-24 Thread Robert Bradshaw via user
On Wed, Jan 24, 2024 at 10:48 AM Mark Striebeck
 wrote:
>
> If I point Beam to the local jar, will Beam start and also stop the expansion
> service?

Yes it will.

> Thanks
>  Mark
>
> On Wed, 24 Jan 2024 at 08:30, Robert Bradshaw via user  
> wrote:
>>
>> You can also manually designate a replacement jar to be used rather
>> than fetching the jar from maven, either as a pipeline option or (as
>> of the next release) as an environment variable. The format is a json
>> mapping from gradle targets (which is how we identify these jars) to
>> local files (or urls). For example, pass
>>
>>   --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
>> "/path/to/your/copy.jar"}'
>>
>> to use the local jar to automatically expand your SQL transforms.
>>
>> See the docs at
>> https://github.com/apache/beam/blob/7e95776a8d08ef738be49ef47842029c306f2bf5/sdks/python/apache_beam/options/pipeline_options.py#L587
>>
>> On Tue, Jan 23, 2024 at 5:59 PM Chamikara Jayalath via user
>>  wrote:
>> >
>> > The expansion service jar is needed since sql.py includes cross-language 
>> > transforms that use the Java implementation under the hood.
>> >
>> > Once downloaded, the jar is cached, and subsequent jobs should use the jar 
>> > from that location.
>> >
>> > If you want to use a locally available jar, you can manually startup an 
>> > expansion service [1] and point the Python SQL transform to that [2].
>> >
>> > Thanks,
>> > Cham
>> >
>> > [1] 
>> > https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service
>> > [2] 
>> > https://github.com/apache/beam/blob/7ff25d896250508570b27683bc76523ac2fe3210/sdks/python/apache_beam/transforms/sql.py#L84
>> >
>> > On Tue, Jan 23, 2024 at 3:57 PM Mark Striebeck  
>> > wrote:
>> >>
>> >> Hi,
>> >>
>> >> Sorry, this question seems so obvious that I'm sure it came up before. 
>> >> But I couldn't find anything in the docs or the mail archives. Feel free 
>> >> to point me in the right direction...
>> >>
>> >> We are using the Python API for Beam. Recently we started using Beam SQL 
>> >> - which apparently needs a jar file that is not provided with the Python 
>> >> Pip package. When I run tests,I can see that Beam downloads 
>> >> beam-sdks-java-extensions-sql-expansion-service-2.52.0.jar and unpacks it 
>> >> into ~/.apache_beam and uses it to start an RPC server.
>> >>
>> >> While this works for local testing, I am trying to figure out how to work 
>> >> this into our CI and deployment process.
>> >>
>> >> Preferably would be to download a pip package that has this jar (and 
>> >> others) in it and just uses it.
>> >>
>> >> If that doesn't exist (I couldn't find it), then we'd need to check this 
>> >> jar file into our source tree, so that we can use it for CI but then also 
>> >> make it part of the docker image that we use to run our Beam pipelines on 
>> >> GCP Dataflow. How could I tell Beam to use that file instead of 
>> >> downloading it? I tried obvious settings like CLASSPATH environment 
>> >> variable - but nothing works. Beam always tries to fetch the file from 
>> >> maven.
>> >>
>> >> Again, feel free to point me to any relevant mail discussion or web page.
>> >>
>> >> Thanks
>> >>  Mark


Re: Downloading and executing addition jar file when using Python API

2024-01-24 Thread Robert Bradshaw via user
You can also manually designate a replacement jar to be used rather
than fetching the jar from maven, either as a pipeline option or (as
of the next release) as an environment variable. The format is a json
mapping from gradle targets (which is how we identify these jars) to
local files (or urls). For example, pass

  --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
"/path/to/your/copy.jar"}'

to use the local jar to automatically expand your SQL transforms.

See the docs at
https://github.com/apache/beam/blob/7e95776a8d08ef738be49ef47842029c306f2bf5/sdks/python/apache_beam/options/pipeline_options.py#L587
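
A sketch of the same flag passed programmatically when constructing pipeline
options (the jar path is a placeholder for wherever CI checks in or builds the
expansion-service jar):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--beam_services='
    '{":sdks:java:extensions:sql:expansion-service:shadowJar": "/path/to/your/copy.jar"}',
])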

On Tue, Jan 23, 2024 at 5:59 PM Chamikara Jayalath via user
 wrote:
>
> The expansion service jar is needed since sql.py includes cross-language 
> transforms that use the Java implementation under the hood.
>
> Once downloaded, the jar is cached, and subsequent jobs should use the jar 
> from that location.
>
> If you want to use a locally available jar, you can manually startup an 
> expansion service [1] and point the Python SQL transform to that [2].
>
> Thanks,
> Cham
>
> [1] 
> https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service
> [2] 
> https://github.com/apache/beam/blob/7ff25d896250508570b27683bc76523ac2fe3210/sdks/python/apache_beam/transforms/sql.py#L84
>
> On Tue, Jan 23, 2024 at 3:57 PM Mark Striebeck  
> wrote:
>>
>> Hi,
>>
>> Sorry, this question seems so obvious that I'm sure it came up before. But I 
>> couldn't find anything in the docs or the mail archives. Feel free to point 
>> me in the right direction...
>>
>> We are using the Python API for Beam. Recently we started using Beam SQL - 
>> which apparently needs a jar file that is not provided with the Python Pip 
>> package. When I run tests,I can see that Beam downloads 
>> beam-sdks-java-extensions-sql-expansion-service-2.52.0.jar and unpacks it 
>> into ~/.apache_beam and uses it to start an RPC server.
>>
>> While this works for local testing, I am trying to figure out how to work 
>> this into our CI and deployment process.
>>
>> Preferably would be to download a pip package that has this jar (and others) 
>> in it and just uses it.
>>
>> If that doesn't exist (I couldn't find it), then we'd need to check this jar 
>> file into our source tree, so that we can use it for CI but then also make 
>> it part of the docker image that we use to run our Beam pipelines on GCP 
>> Dataflow. How could I tell Beam to use that file instead of downloading it? 
>> I tried obvious settings like CLASSPATH environment variable - but nothing 
>> works. Beam always tries to fetch the file from maven.
>>
>> Again, feel free to point me to any relevant mail discussion or web page.
>>
>> Thanks
>>  Mark


Re: TypeError: '_ConcatSequence' object is not subscriptable

2024-01-22 Thread Robert Bradshaw via user
This is probably because you're trying to index into the result of the
GroupByKey in your AnalyzeSession as if it were a list. All that is
promised is that it is an iterable. If it is large enough to merit
splitting over multiple fetches, it won't be a list. (If you need to
index, explicitly convert it into a list first, assuming it fits into
memory. Otherwise just stick to re-iterating over it.)
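
A sketch of what that fix might look like inside the DoFn (the body here is
illustrative, not the original AnalyzeSession):

import apache_beam as beam

class AnalyzeSession(beam.DoFn):
    def process(self, element):
        session_id, events = element
        # The grouped values are only guaranteed to be an iterable; materialize
        # them before using len() or indexing, assuming a session fits in memory.
        events = list(events)
        if events:
            yield {"id": session_id, "count": len(events), "first": events[0]}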

On Mon, Jan 22, 2024 at 7:21 AM Nimrod Shory  wrote:
>
> Hello everyone,
> We encounter a weird issue while running a Python + Beam streaming job on 
> Google Cloud Dataflow.
> The job listens to a PubSub subscription of events, and my pipeline looks 
> like this:
>
>> messages = (
>>  p | "Read Topic" >> 
>> beam.io.ReadFromPubSub(subscription=options.subscription.get())
>>| "JSON" >> beam.Map(json.loads)
>> )
>> sessions = (
>> messages | "Add Keys" >> beam.WithKeys(lambda x: x["id"])
>> | "Session Window" >> 
>> beam.WindowInto(beam.window.Sessions(SESSION_TIMEOUT))
>> | beam.GroupByKey()
>> | "Analyze Session" >> beam.ParDo(AnalyzeSession())
>> )
>> sessions | beam.io.WriteToPubSub(topic=options.session_topic.get())
>
>
>
> After it runs for some time without any issues, I suddenly start getting the 
> following error:
>
>> TypeError: '_ConcatSequence' object is not subscriptable
>
>
> Instead of getting the expected key value pair I usually get:
>>
>> ('ID123', [{...},{...},{...}])
>
>
> I start getting:
>>
>> ('ID234', <_ConcatSequence object at 0x7feca40d1d90>)
>
>
> I suspect this happens due to a heavy load, but I could not find any 
> information on why it could happen and how to mitigate it.
>
> Any help would be much appreciated!
> Thanks.


Re: Does withkeys transform enforce a reshuffle?

2024-01-19 Thread Robert Bradshaw via user
Reshuffle is perfectly fine to use if the goal is just to redistribute
work. It's only deprecated as a "checkpointing" mechanism.
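
A sketch of that pattern (placeholder transforms standing in for the
PubsubLiteIO read, the expensive ParDo, and the BigQuery write mentioned below):

import apache_beam as beam

class ExpensiveDoFn(beam.DoFn):  # placeholder for the real per-element work
    def process(self, element):
        yield element

with beam.Pipeline() as p:
    _ = (
        p
        | "Read" >> beam.Create(range(10))    # stands in for the PubsubLiteIO read
        | "Redistribute" >> beam.Reshuffle()  # prevents fusion with the read
        | "Expensive work" >> beam.ParDo(ExpensiveDoFn())
        | "Write" >> beam.Map(print)          # stands in for BigQueryIO.write
    )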

On Fri, Jan 19, 2024 at 9:44 AM Danny McCormick via user
 wrote:
>
> For runners that support Reshuffle, it should be safe to use. It's been 
> "deprecated" for 7 years, but is still heavily used and often the recommended 
> way to do things like this. I actually just added a PR to undeprecate it 
> earlier today. Looks like you're using Dataflow, which also has always 
> supported ReShuffle.
>
> > Also I looked at the code, reshuffle seems doing some groupby work 
> > internally. But I don't really need groupby
>
> Groupby is basically an implementation detail that creates the desired 
> shuffling behavior in many runners (runners can also override transform 
> implementations if needed for some primitives like this, but that's another 
> can of worms). Basically, in order to prevent fusion you need some operation 
> that does this and GroupBy is one option.
>
> Given that you're using DataFlow, I'd also recommend checking out 
> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#prevent_fusion 
> which describes how to do this in more detail.
>
> Thanks,
> Danny
>
> On Fri, Jan 19, 2024 at 12:36 PM hsy...@gmail.com  wrote:
>>
>> Also I looked at the code, reshuffle seems doing some groupby work 
>> internally. But I don't really need groupby
>>
>> On Fri, Jan 19, 2024 at 9:35 AM hsy...@gmail.com  wrote:
>>>
>>> ReShuffle is deprecated
>>>
>>> On Fri, Jan 19, 2024 at 8:25 AM XQ Hu via user  wrote:

 I do not think it enforces a reshuffle by just checking the doc here: 
 https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys

 Have you tried to just add ReShuffle after PubsubLiteIO?

 On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com  wrote:
>
> Hey guys,
>
> I have a question, does withkeys transformation enforce a reshuffle?
>
> My pipeline basically look like this PubsubLiteIO -> ParDo(..) -> ParDo() 
> -> BigqueryIO.write()
>
> The problem is PubsubLiteIO -> ParDo(..) -> ParDo() always fused 
> together. But The ParDo is expensive and I want dataflow to have more 
> workers to work on that, what's the best way to do that?
>
> Regards,
>


Re: How to debug ArtifactStagingService ?

2024-01-05 Thread Robert Bradshaw via user
Nothing problematic is standing out for me in those logs. A job
service and artifact staging service is spun up to allow the job (and
its artifacts) to be submitted, then they are shut down. What are the
actual errors that you are seeing?

On Wed, Jan 3, 2024 at 7:39 AM Lydian  wrote:
>
>
> Hi,
>
> We are running Beam 2.41.0 with the portable flink runner using python SDK. 
> However, we suddenly noticed that all our jobs are now failing with error 
> like this:
> ```
> 2024-01-03 15:35:30,067 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - 
> ArtifactStagingService started on localhost:41047
> 2024-01-03 15:35:31,640 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - Java 
> ExpansionService started on localhost:35299
> 2024-01-03 15:35:31,676 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - JobService 
> started on localhost:42519
> 2024-01-03 15:35:31,677 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - Job server 
> now running, terminate with Ctrl+C
> 2024-01-03 15:35:31,996 INFO  
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint  [] - Started 
> driver program
> 2024-01-03 15:35:43,899 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Staging artifacts for job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.
> 2024-01-03 15:35:43,899 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Resolving artifacts for 
> job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.ref_Environment_d
> efault_environment_2.
> 2024-01-03 15:35:43,902 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Getting 0 artifacts for 
> job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.external_1beam:en
> v:process:v1.
> 2024-01-03 15:35:43,902 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Resolving artifacts for 
> job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.external_1beam:en
> v:process:v1.
> 2024-01-03 15:35:43,903 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Getting 1 artifacts for job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.null.
> 2024-01-03 15:36:02,047 INFO  
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint  [] - Stopping 
> job service
> 2024-01-03 15:36:02,050 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - JobServer 
> stopped on localhost:42519
> ```
> It seems like the error is related to the ArtifactStagingService, but I am 
> having trouble identifying the root cause. Wondering if someone would be able 
> to help me figure out how to pull more informative debug logging to fix this 
> issue. Thanks!
>
> Sincerely,
> Lydian Lee
>


Re: Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Robert Bradshaw via user
And should it be a list of strings, rather than a string?
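
Putting both suggestions together, a minimal sketch (the package path is a
placeholder, and the other Dataflow options from the original message still
apply):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    # plus project/region/temp_location etc. as in the original message
    extra_packages=["/path/to/uplight-telemetry-1.0.0.tar.gz"],  # plural key, list value
)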

On Tue, Dec 19, 2023 at 10:10 AM Anand Inguva via user 
wrote:

> Can you try passing `extra_packages` instead of `extra_package` when
> passing pipeline options as a dict?
>
> On Tue, Dec 19, 2023 at 12:26 PM Sumit Desai via user <
> user@beam.apache.org> wrote:
>
>> Hi all,
>> I have created a Dataflow pipeline in batch mode using Apache beam Python
>> SDK. I am using one non-public dependency 'uplight-telemetry'. I have
>> specified it using parameter extra_package while creating pipeline_options
>> object. However, the pipeline loading is failing with an error *No
>> module named 'uplight_telemetry'*.
>> The code to create pipeline_options is as following-
>>
>> def __create_pipeline_options_dataflow(job_name):
>> # Set up the Dataflow runner options
>> gcp_project_id = os.environ.get(GCP_PROJECT_ID)
>> current_dir = os.path.dirname(os.path.abspath(__file__))
>> print("current_dir=", current_dir)
>> setup_file_path = os.path.join(current_dir, '..', '..', 'setup.py')
>> print("Set-up file path=", setup_file_path)
>> #TODO:Move file to proper location
>> uplight_telemetry_tar_file_path=os.path.join(current_dir, '..', 
>> '..','..','non-public-dependencies', 'uplight-telemetry-1.0.0.tar.gz')
>> # TODO:Move to environmental variables
>> pipeline_options = {
>> 'project': gcp_project_id,
>> 'region': "us-east1",
>> 'job_name': job_name,  # Provide a unique job name
>> 'temp_location': 
>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>> 'staging_location': 
>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>> 'runner': 'DataflowRunner',
>> 'save_main_session': True,
>> 'service_account_email': os.environ.get(SERVICE_ACCOUNT),
>> # 'network': f'projects/{gcp_project_id}/global/networks/default',
>> 'subnetwork': os.environ.get(SUBNETWORK_URL),
>> 'setup_file': setup_file_path,
>> 'extra_package': uplight_telemetry_tar_file_path
>> # 'template_location': 
>> 'gcr.io/dataflow-templates-base/python310-template-launcher-base'
>> }
>> print("Pipeline created for job-name", job_name)
>> logger.debug(f"pipeline_options created as {pipeline_options}")
>> return pipeline_options
>>
>> Why is it not trying to install this package from extra_package?
>>
>


Re: Streaming management exception in the sink target.

2023-12-05 Thread Robert Bradshaw via user
Currently error handling is implemented on sinks in an ad-hoc basis
(if at all) but John (cc'd) is looking at improving things here.
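
For failures raised in your own transforms ahead of the sink, the
with_exception_handling dead-letter pattern mentioned in the question looks
roughly like this (a sketch; the parse function and outputs are placeholders,
and whether a given sink itself surfaces per-element failures varies, as noted
above):

import json
import apache_beam as beam

def parse(msg_bytes):
    return json.loads(msg_bytes)  # may raise on bad input

with beam.Pipeline() as p:
    messages = p | beam.Create([b'{"ok": 1}', b"not json"])
    good, bad = messages | "Parse" >> beam.Map(parse).with_exception_handling()
    _ = good | "To sink" >> beam.Map(print)
    # Each failed element comes with its exception info; route it to a
    # dead-letter/error topic instead of failing the pipeline.
    _ = bad | "To error topic" >> beam.Map(print)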

On Mon, Dec 4, 2023 at 10:25 AM Juan Romero  wrote:
>
> Hi guys. I want to ask you how to deal with the scenario where the target
> sink (e.g. JDBC, Kafka, BigQuery, Pub/Sub, etc.) fails for any reason: I don't
> want to lose the message or create a bottleneck with many errors due to a
> hypothetical target sink problem, and I want to use with_exception_handling in
> order to capture the messages that fail to reach the target and send them to
> another error topic. Any idea how to solve this scenario?


Re: [QUESTION] Why no auto labels?

2023-10-20 Thread Robert Bradshaw via user
I'll take another look at the PR. My inclination is still to use uuids
to uniquify. I think that's worth the cost of the readability hit (I'm
OK reducing this down to 6-8 hex digits, which will still give very low
chances of collisions, though it doesn't solve the first issue). If
someone cares about names more than this they can set them manually.
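
In the meantime, the workaround is explicit labels, e.g. (reusing the
identity_filter example from earlier in the thread; a minimal sketch):

import apache_beam as beam

def identity_filter(x):
    return True

with beam.Pipeline() as p:
    pcoll = p | beam.Create([1, 2, 3])
    # Distinct labels keep two otherwise-identical applications unique.
    first = pcoll | "Filter once" >> beam.Filter(identity_filter)
    second = first | "Filter again" >> beam.Filter(identity_filter)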

On Fri, Oct 20, 2023 at 9:30 AM Joey Tran  wrote:
>
> Just want to bump this discussion again. I'm introducing Beam to other 
> developers at Schrodinger now and the first (of hopefully many!) developers 
> has started migrating our internal workflows to Beam. As I suspected though, 
> he's complained about the iteration cycles spent from using the same 
> transform without specifying a label and from using multiple assert_that's 
> without unique labels.
>
> From the java code[1], it looks like the same naming scheme is used (I think, 
> it's been a decade since I've read java so apologies if I'm misreading) as 
> the PR I posted
>
> [1] 
> https://github.com/apache/beam/blob/e7a6405800a83dd16437b8b1b372e020e010a042/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java#L630
>
> On Fri, Oct 13, 2023 at 1:32 PM Joey Tran  wrote:
>>
>>
>>
>> On Fri, Oct 13, 2023 at 1:18 PM Robert Bradshaw  wrote:
>>>
>>> On Fri, Oct 13, 2023 at 10:08 AM Joey Tran  
>>> wrote:
>>>>
>>>> Are there places on the SDK side that expect unique labels? Or in 
>>>> non-updateable runners?
>>>
>>>
>>> That's a good question. The label eventually ends up here: 
>>> https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L142
>>>  which is technically a violation of the spec if these are not unique 
>>> (though I guess Java already does this?). More importantly, though, the id 
>>> of the transform (the proto contains a map of strings -> transforms) must 
>>> be unique.
>>>
>>
>> Doesn't that suggest not raising an exception won't be a sufficient enough 
>> solution? But I suppose if the Java SDK does this already it's fine..?
>>
>>>
>>> Another option is to make the suffix a uuid rather than a single counter. 
>>> (This would still have issues with the first application possibly getting 
>>> mixed up with a "different" first application unless it was always 
>>> appended.)
>>>
>>
>> Is the marginal benefit (only the first application may be confused instead 
>> of possibly other applications) valuable enough to justify the marginal cost 
>> of the readability hit of adding uuids to the labels?
>>
>>
>>
>>
>>>> On Fri, Oct 13, 2023 at 12:52 PM Robert Bradshaw  
>>>> wrote:
>>>>>
>>>>>
>>>>> Thanks for the PR.
>>>>>
>>>>> I think we should follow Java and allow non-unique labels, but not 
>>>>> provide automatic uniquification, In particular, the danger of using a 
>>>>> counter is that one can get accidental (and potentially hard to check) 
>>>>> off-by-one collisions. As a concrete example, imagine one partitions a 
>>>>> dataset into two collections, each followed by a similarly-named 
>>>>> transform.
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A
>>>>>  \
>>>>>--> B
>>>>>
>>>>> Uniquification would give something like
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A
>>>>>  \
>>>>>--> B_2
>>>>>
>>>>> Suppose one then realizes there's a third case to handle, giving
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A --> B
>>>>>  \
>>>>>--> B
>>>>>
>>>>> But this would be uniquified to
>>>>>
>>>>> --> B
>>>>>   /
>>>>> A --> B_2
>>>>>  \
>>>>>--> B_3
>>>>>
>>>>> where the old B_2 got renamed to B_3 and a new B_2 got put in its place. 
>>>>> This is bad because an updating runner would then attribute old B_2's 
>>>>> state to the new B_2 (and also possibly mis-direct any inflight 
>>>>> messages). At least with the old, intersecting names we can detect this 
>>>>> problem rather than silently give corrupt data.
>>>>>

Re: Advanced Composite Transform Documentation

2023-10-19 Thread Robert Bradshaw via user
On Thu, Oct 19, 2023 at 2:00 PM Joey Tran  wrote:
>
> For the python SDK, is there somewhere where we document more "advance" 
> composite transform operations?

I'm not sure, but
https://beam.apache.org/documentation/programming-guide/ is the
canonical palace information like this should probaby be. Maybe this
users list will serve as a searchable resource at least. (Stack
overflow can be good sometimes as well.)

> e.g. I've been stumbling with questions like "How do I use a transform that 
> expects a PBegin in a composite transform",

As you mentioned, you do "pipeline | Transform," and you can get the
pipeline object from any PCollection you have in hand.

> "What's the proper way to return multiple output pcollections?",

You can return them as a(n ordinary) tuple or a dict (with string
keys). This is best expressed with the typescript implementation
(https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/pvalue.ts#L172
) but works for Python too.

> "What's the proper way to typehint multiple output pcollections?"

Typehinting for multiple outputs is still a work in progress, but I
would just add standard Python typehints to the expand method (which
is where we'd pick them up).
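
A small sketch pulling those answers together (names are illustrative): a
composite that returns multiple outputs as a dict, with ordinary typehints on
expand; a PBegin-expecting transform could be applied inside it via
pcoll.pipeline.

from typing import Dict

import apache_beam as beam
from apache_beam.pvalue import PCollection

class SplitEvenOdd(beam.PTransform):
    def expand(self, numbers: PCollection[int]) -> Dict[str, PCollection[int]]:
        # A root (PBegin-expecting) transform could be applied here as
        # `numbers.pipeline | SomeSourceTransform()`.
        return {
            "even": numbers | "Even" >> beam.Filter(lambda n: n % 2 == 0),
            "odd": numbers | "Odd" >> beam.Filter(lambda n: n % 2 == 1),
        }

with beam.Pipeline() as p:
    outputs = p | beam.Create(range(10)) | SplitEvenOdd()
    _ = outputs["even"] | "Print evens" >> beam.Map(print)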

> ChatGPT helped me figure out the first question (use `pcoll.pipeline`), the 
> second question I guessed and the third question I'm still unsure about.
>
> Tried looking for these answers in the documentation but might just be 
> missing it.
>
> Best,
> Joey


Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via user
On Fri, Oct 13, 2023 at 10:08 AM Joey Tran 
wrote:

> That makes sense. Would you suggest the new option simply suppress the
> RuntimeError and use the non-unique label?
>

Yes. (Or, rather, not raise it.)


> Are there places on the SDK side that expect unique labels? Or in
> non-updateable runners?
>

That's a good question. The label eventually ends up here:
https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L142
which is technically a violation of the spec if these are not unique
(though I guess Java already does this?). More importantly, though, the id
of the transform (the proto contains a map of strings -> transforms) must
be unique.


> Would making `--auto_unique_labels` a mutually exclusive option with
> streaming be a reasonable option? Not sure if stream/batch-specific options
> are a good idea or not... but if there's precedent, then it'd be an easy
> option
>

It's not always possible to even tell at pipeline construction whether the
pipeline will be run in streaming mode. (I suppose one could check later?)
It's generally something we try to avoid though.

Another option is to make the suffix a uuid rather than a single counter.
(This would still have issues with the first application possibly getting
mixed up with a "different" first application unless it was always
appended.)


> On Fri, Oct 13, 2023 at 12:52 PM Robert Bradshaw 
> wrote:
>
>>
>> Thanks for the PR.
>>
>> I think we should follow Java and allow non-unique labels, but not
>> provide automatic uniquification, In particular, the danger of using a
>> counter is that one can get accidental (and potentially hard to check)
>> off-by-one collisions. As a concrete example, imagine one partitions a
>> dataset into two collections, each followed by a similarly-named transform.
>>
>> --> B
>>   /
>> A
>>  \
>>--> B
>>
>> Uniquification would give something like
>>
>> --> B
>>   /
>> A
>>  \
>>--> B_2
>>
>> Suppose one then realizes there's a third case to handle, giving
>>
>> --> B
>>   /
>> A --> B
>>  \
>>--> B
>>
>> But this would be uniquified to
>>
>> --> B
>>   /
>> A --> B_2
>>  \
>>--> B_3
>>
>> where the old B_2 got renamed to B_3 and a new B_2 got put in its place.
>> This is bad because an updating runner would then attribute old B_2's state
>> to the new B_2 (and also possibly mis-direct any inflight messages). At
>> least with the old, intersecting names we can detect this problem
>> rather than silently give corrupt data.
>>
>>
>> On Fri, Oct 13, 2023 at 7:15 AM Joey Tran 
>> wrote:
>>
>>> For posterity: https://github.com/apache/beam/pull/28984
>>>
>>> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> I would definitely support a PR making this an option. Changing the
>>>> default would be a rather big change that would require more thought.
>>>>
>>>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran 
>>>> wrote:
>>>>
>>>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
>>>>> Apache Beam at my company and I'm trying to foresee parts of the API they
>>>>> might find inconvenient.
>>>>>
>>>>> If there's a conclusion to make the behavior similar to java, I'm
>>>>> happy to put up a PR
>>>>>
>>>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd
>>>>>> be a very sticky toggle since it'd be easy for PTransforms to 
>>>>>> accidentally
>>>>>> rely on it.
>>>>>>
>>>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>>>>>> wrote:
>>>>>>
>>>>>>> Huh. This used to be a hard error in Java, but I guess it's
>>>>>>> togglable with an option now. We should probably add the option to 
>>>>>>> toggle
>>>>>>> Python too. (Unclear what the default should be, but this probably ties
>>>>>>> into re-thinking how pipeline update should work.)
>>>>>>>
>>>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>>>>>> wrote:
>>>>>>>
>>>>>

Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via user
Thanks for the PR.

I think we should follow Java and allow non-unique labels, but not provide
automatic uniquification. In particular, the danger of using a counter is
that one can get accidental (and potentially hard to check) off-by-one
collisions. As a concrete example, imagine one partitions a dataset into
two collections, each followed by a similarly-named transform.

    --> B
   /
  A
   \
    --> B

Uniquification would give something like

    --> B
   /
  A
   \
    --> B_2

Suppose one then realizes there's a third case to handle, giving

    --> B
   /
  A --> B
   \
    --> B

But this would be uniquified to

    --> B
   /
  A --> B_2
   \
    --> B_3

where the old B_2 got renamed to B_3 and a new B_2 got put in its place.
This is bad because an updating runner would then attribute old B_2's state
to the new B_2 (and also possibly mis-direct any inflight messages). At
least with the old, intersecting names we can detect this problem
rather than silently give corrupt data.


On Fri, Oct 13, 2023 at 7:15 AM Joey Tran  wrote:

> For posterity: https://github.com/apache/beam/pull/28984
>
> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw 
> wrote:
>
>> I would definitely support a PR making this an option. Changing the
>> default would be a rather big change that would require more thought.
>>
>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran 
>> wrote:
>>
>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
>>> Apache Beam at my company and I'm trying to foresee parts of the API they
>>> might find inconvenient.
>>>
>>> If there's a conclusion to make the behavior similar to java, I'm happy
>>> to put up a PR
>>>
>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran 
>>> wrote:
>>>
>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd
>>>> be a very sticky toggle since it'd be easy for PTransforms to accidentally
>>>> rely on it.
>>>>
>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Huh. This used to be a hard error in Java, but I guess it's togglable
>>>>> with an option now. We should probably add the option to toggle Python 
>>>>> too.
>>>>> (Unclear what the default should be, but this probably ties into
>>>>> re-thinking how pipeline update should work.)
>>>>>
>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Makes sense that the requirement is the same, but is the label
>>>>>> auto-generation behavior the same? I modified the BeamJava
>>>>>> wordcount example[1] to do the regex filter twice in a row, and unlike 
>>>>>> the
>>>>>> BeamPython example I posted before, it just warns instead of throwing an
>>>>>> exception.
>>>>>>
>>>>>> Tangentially, is it expected that the Beam playground examples don't
>>>>>> have a way to see the outputs of a run example? I have a vague memory 
>>>>>> that
>>>>>> there used to be a way to navigate to an output file after it's generated
>>>>>> but not sure if I just dreamt that up. Playing with the examples, I 
>>>>>> wasn't
>>>>>> positive if my runs were actually succeeding or not based on the stdout
>>>>>> alone.
>>>>>>
>>>>>> [1] https://play.beam.apache.org/?sdk=java=mI7WUeje_r2
>>>>>> <https://play.beam.apache.org/?sdk=java=mI7WUeje_r2>
>>>>>> [2] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
>>>>>>
>>>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>>> BeamJava and BeamPython have the exact same behavior:
>>>>>>> transform names within must be distinct [1]. This is because we do not
>>>>>>> necessarily know at pipeline construction time if the pipeline will be
>>>>>>> streaming or batch, or if it will be updated in the future, so the 
>>>>>>> decision
>>>>>>> was made to impose this restriction up front. Both will auto-generate a
>>>>>>> name for you if one is not given, but will do so deterministically (not
>>>>>>> depending on some global context) to avoid potential update problems.
>>>>>>>
>>>>>>> [1] Not

Re: [QUESTION] Why no auto labels?

2023-10-05 Thread Robert Bradshaw via user
Huh. This used to be a hard error in Java, but I guess it's togglable
with an option now. We should probably add the option to toggle Python too.
(Unclear what the default should be, but this probably ties into
re-thinking how pipeline update should work.)

On Thu, Oct 5, 2023 at 4:58 AM Joey Tran  wrote:

> Makes sense that the requirement is the same, but is the label
> auto-generation behavior the same? I modified the BeamJava
> wordcount example[1] to do the regex filter twice in a row, and unlike the
> BeamPython example I posted before, it just warns instead of throwing an
> exception.
>
> Tangentially, is it expected that the Beam playground examples don't have
> a way to see the outputs of a run example? I have a vague memory that there
> used to be a way to navigate to an output file after it's generated but not
> sure if I just dreamt that up. Playing with the examples, I wasn't positive
> if my runs were actually succeeding or not based on the stdout alone.
>
> [1] https://play.beam.apache.org/?sdk=java=mI7WUeje_r2
> <https://play.beam.apache.org/?sdk=java=mI7WUeje_r2>
> [2] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
>
> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> BeamJava and BeamPython have the exact same behavior: transform names
>> within must be distinct [1]. This is because we do not necessarily know at
>> pipeline construction time if the pipeline will be streaming or batch, or
>> if it will be updated in the future, so the decision was made to impose
>> this restriction up front. Both will auto-generate a name for you if one is
>> not given, but will do so deterministically (not depending on some global
>> context) to avoid potential update problems.
>>
>> [1] Note that this applies to the fully qualified transform name, so the
>> naming only has to be distinct within a composite transform (or at the top
>> level--the pipeline itself is isomorphic to a single composite transform).
>>
>> On Wed, Oct 4, 2023 at 3:43 AM Joey Tran 
>> wrote:
>>
>>> Cross posting this thread to dev@ to see if this is intentional
>>> behavior or if it's something worth changing for the python SDK
>>>
>>> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user 
>>> wrote:
>>>
>>>> That suggests the default label is created as that, which indeed causes
>>>> the duplication error.
>>>>
>>>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>>>> wrote:
>>>>
>>>>> Not sure what that suggests
>>>>>
>>>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>>>> wrote:
>>>>>
>>>>>> Looks like this is the current behaviour. If you have `t =
>>>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>>>> `Filter(identity_filter)`.
>>>>>>
>>>>>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran 
>>>>>> wrote:
>>>>>>
>>>>>>> You don't have to specify the names if the callable you pass in is
>>>>>>> /different/ for two `beam.Map`s, but  if the callable is the same you 
>>>>>>> must
>>>>>>> specify a label. For example, the below will raise an exception:
>>>>>>>
>>>>>>> ```
>>>>>>> | beam.Filter(identity_filter)
>>>>>>> | beam.Filter(identity_filter)
>>>>>>> ```
>>>>>>>
>>>>>>> Here's an example on playground that shows the error message you get
>>>>>>> [1]. I marked every line I added with a "# ++".
>>>>>>>
>>>>>>> It's a contrived example, but using a map or filter at the same
>>>>>>> pipeline level probably comes up often, at least in my inexperience. For
>>>>>>> example, you might have a pipeline that partitions a pcoll into three
>>>>>>> different pcolls, runs some transforms on them, and then runs the same 
>>>>>>> type
>>>>>>> of filter on each of them.
>>>>>>>
>>>>>>> The case that happens most often for me is using the `assert_that`
>>>>>>> [2] testing transform. In this case, I think often users will really 
>>>>>>> have
>>>>>>> no need for a disambiguating label as they're often just writing unit 
>>>>>>> tests
>>>>>>> that test a few different properties of their workflow.

Re: [QUESTION] Why no auto labels?

2023-10-04 Thread Robert Bradshaw via user
BeamJava and BeamPython have the exact same behavior: transform names
within must be distinct [1]. This is because we do not necessarily know at
pipeline construction time if the pipeline will be streaming or batch, or
if it will be updated in the future, so the decision was made to impose
this restriction up front. Both will auto-generate a name for you if one is
not given, but will do so deterministically (not depending on some global
context) to avoid potential update problems.

[1] Note that this applies to the fully qualified transform name, so the
naming only has to be distinct within a composite transform (or at the top
level--the pipeline itself is isomorphic to a single composite transform).
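To make the scoping concrete, here is a small Python sketch (illustrative
names only): the inner label "Pair" is reused in two composites without a
collision because the fully qualified names differ.

```python
import apache_beam as beam

class CountWords(beam.PTransform):
    def expand(self, pcoll):
        # Fully qualified names become CountWords/Pair and CountWords/Sum.
        return (pcoll
                | "Pair" >> beam.Map(lambda w: (w, 1))
                | "Sum" >> beam.CombinePerKey(sum))

class CountChars(beam.PTransform):
    def expand(self, pcoll):
        # The same inner labels are fine here: CountChars/Pair, CountChars/Sum.
        return (pcoll
                | "Pair" >> beam.Map(lambda w: (w, len(w)))
                | "Sum" >> beam.CombinePerKey(sum))

with beam.Pipeline() as p:
    words = p | beam.Create(["to", "be", "or", "not", "to", "be"])
    words | "CountWords" >> CountWords()
    words | "CountChars" >> CountChars()
```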

On Wed, Oct 4, 2023 at 3:43 AM Joey Tran  wrote:

> Cross posting this thread to dev@ to see if this is intentional behavior
> or if it's something worth changing for the python SDK
>
> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user  wrote:
>
>> That suggests the default label is created as that, which indeed causes
>> the duplication error.
>>
>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>> wrote:
>>
>>> Not sure what that suggests
>>>
>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>> wrote:
>>>
 Looks like this is the current behaviour. If you have `t =
 beam.Filter(identity_filter)`, `t.label` is defined as
 `Filter(identity_filter)`.

 On Mon, Oct 2, 2023 at 9:25 AM Joey Tran 
 wrote:

> You don't have to specify the names if the callable you pass in is
> /different/ for two `beam.Map`s, but  if the callable is the same you must
> specify a label. For example, the below will raise an exception:
>
> ```
> | beam.Filter(identity_filter)
> | beam.Filter(identity_filter)
> ```
>
> Here's an example on playground that shows the error message you get
> [1]. I marked every line I added with a "# ++".
>
> It's a contrived example, but using a map or filter at the same
> pipeline level probably comes up often, at least in my inexperience. For
> example, you might have a pipeline that partitions a pcoll into three
> different pcolls, runs some transforms on them, and then runs the same 
> type
> of filter on each of them.
>
> The case that happens most often for me is using the `assert_that` [2]
> testing transform. In this case, I think often users will really have no
> need for a disambiguating label as they're often just writing unit tests
> that test a few different properties of their workflow.
>
> [1] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
> [2]
> https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.testing.util.html#apache_beam.testing.util.assert_that
>
> On Mon, Oct 2, 2023 at 9:08 AM Bruno Volpato via user <
> user@beam.apache.org> wrote:
>
>> If I understand the question correctly, you don't have to specify
>> those names.
>>
>> As Reuven pointed out, it is probably a good idea so you have a
>> stable / deterministic graph.
>> But in the Python SDK, you can simply use pcollection | map_fn,
>> instead of pcollection | 'Map' >> map_fn.
>>
>> See an example here
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/group_with_coder.py#L100-L116
>>
>>
>> On Sun, Oct 1, 2023 at 9:08 PM Joey Tran 
>> wrote:
>>
>>> Hmm, I'm not sure what you mean by "updating pipelines in place".
>>> Can you elaborate?
>>>
>>> I forgot to mention my question is posed from the context of a
>>> python SDK user, and afaict, there doesn't seem to be an obvious way to
>>> autogenerate names/labels. Hearing that the java SDK supports it makes 
>>> me
>>> wonder if the python SDK could support it as well though... (If so, I'd 
>>> be
>>> happy to implement it). Currently, it's fairly tedious to have to 
>>> name
>>> every instance of a transform that you might reuse in a pipeline, e.g. 
>>> when
>>> reapplying the same Map on different pcollections.
>>>
>>> On Sun, Oct 1, 2023 at 8:12 PM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
 Are you talking about transform names? The main reason was because
 for runners that support updating pipelines in place, the only way to 
 do so
 safely is if the runner can perfectly identify which transforms in the 
 new
 graph match the ones in the old graph. There's no good way to auto 
 generate
 names that will stay stable across updates - even small changes to the
 pipeline might change the order of nodes in the graph, which could 
 result
 in a corrupted update.

 However, if you don't care about update, Beam can auto generate
 these names for you! When you call PCollection.apply (if using 
 BeamJava),
 

Re: UDF/UADF over complex structures

2023-09-28 Thread Robert Bradshaw via user
Yes, for sure. This is one of the areas where Beam excels vs. simpler tools
like SQL. You can write arbitrary code to iterate over arbitrary structures
in the typical Java/Python/Go/Typescript/Scala/[pick your language] way. In
the Beam nomenclature, UDFs correspond to DoFns and UDAFs correspond to
CombineFns.
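As a rough illustration (in Python here, but the same shape applies to Java
DoFns/CombineFns; the nested record layout below is made up for the example):

```python
import apache_beam as beam

class TotalQuantityFn(beam.CombineFn):
    # UDAF-style aggregation that drills into a nested field.
    def create_accumulator(self):
        return 0
    def add_input(self, total, order):
        return total + sum(item["qty"] for item in order["items"])
    def merge_accumulators(self, totals):
        return sum(totals)
    def extract_output(self, total):
        return total

with beam.Pipeline() as p:
    orders = p | beam.Create([
        {"user": "a", "items": [{"sku": "x", "qty": 2}, {"sku": "y", "qty": 1}]},
        {"user": "b", "items": [{"sku": "x", "qty": 5}]},
    ])
    # UDF-style: arbitrary code walking the nested structure and emitting
    # whatever (possibly nested) structure you like.
    skus = orders | beam.FlatMap(
        lambda order: (item["sku"] for item in order["items"]))
    total = orders | beam.CombineGlobally(TotalQuantityFn())
```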

On Thu, Sep 28, 2023 at 4:23 AM Balogh, György  wrote:

> Hi,
> I've complex nested structure in my input data. Is it possible to have
> UDF/UDAF taking nested structure as input? I'm using java.
> Outputting nested structure is also a question.
> Thank you,
> Gyorgy
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342 <+36%2030%20270%208342>
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>


Re: [Question] Side Input pattern

2023-09-15 Thread Robert Bradshaw via user
Yes, if you block that bundle will not progress. Generally streaming
pipelines are processing many (hundreds) of bundles in parallel (e.g. one
per key on dataflow), but at the source there may not be as much available
parallelism and it's better to return than wait if there are no elements
left to read.

On Fri, Sep 15, 2023 at 10:42 AM Ruben Vargas 
wrote:

> Hello thanks for the reply
>
> I was digging into the UnboundedReader interface, and I observed that some
> implementations block the entire progress of the other inputs when they get
> blocked into the advance() method, (probably waiting if there are new
> elements or not), an example of this is the AWS SQSIO  implementation. if I
> return true or false immediately the progress of the main input continues,
> but If I wait for results on the advance() method, all of other inputs get
> blocked
>
>
> Is that assumption correct?
>
>
>
> On Fri, Sep 15, 2023 at 10:59 AM, Robert Bradshaw via
> user  wrote:
>
>> Beam will block on side inputs until at least one value is available (or
>> the watermark has advanced such that we can be sure one will never become
>> available, which doesn't really apply to the global window case).
>> After that, workers generally cache the side input value (for performance
>> reasons) but may periodically re-fetch it (the exact cadence probably
>> depends on the runner implementation).
>>
>> On Tue, Sep 12, 2023 at 10:34 PM Ruben Vargas 
>> wrote:
>>
>>> Hello Everyone
>>>
>>> I have a question, I have on my pipeline one side input that
>>> fetches some configurations from an API endpoint each 30 seconds, my
>>> question is this.
>>>
>>>
>>> I have something similar to what is showed in the side input patterns
>>> documentation
>>>
>>>  PCollectionView> map =
>>> p.apply(GenerateSequence.from(0).withRate(1,
>>> Duration.standardSeconds(5L)))
>>> .apply(
>>> ParDo.of(
>>> new DoFn>() {
>>>
>>>   @ProcessElement
>>>   public void process(
>>>   @Element Long input,
>>>   @Timestamp Instant timestamp,
>>>   OutputReceiver> o) {
>>> call HTTP endpoint here!!
>>>   }
>>> }))
>>> .apply(
>>> Window.>into(new GlobalWindows())
>>>
>>> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>>> .discardingFiredPanes())
>>> .apply(Latest.globally())
>>> .apply(View.asSingleton());
>>>
>>> What happens if for example the HTTP endpoint takes time to respond due
>>> some network issues and/or the amount of data. Is this gonna introduce
>>> delays on my main pipeline? Is the main pipeline blocked until the pardo
>>> that processes the side input ends?
>>>
>>> I don't care too much about the consistency here, I mean if the
>>> configuration changed in the Time T1 I don't care if some registries with
>>> T2 timestamp are processed with the configuration version of T1.
>>>
>>>
>>> Regards.
>>>
>>>


Re: [Question] Side Input pattern

2023-09-15 Thread Robert Bradshaw via user
Beam will block on side inputs until at least one value is available (or
the watermark has advanced such that we can be sure one will never become
available, which doesn't really apply to the global window case).
After that, workers generally cache the side input value (for performance
reasons) but may periodically re-fetch it (the exact cadence probably
depends on the runner implementation).

On Tue, Sep 12, 2023 at 10:34 PM Ruben Vargas 
wrote:

> Hello Everyone
>
> I have a question, I have on my pipeline one side input that fetches some
> configurations from an API endpoint each 30 seconds, my question is this.
>
>
> I have something similar to what is showed in the side input patterns
> documentation
>
>  PCollectionView> map =
> p.apply(GenerateSequence.from(0).withRate(1,
> Duration.standardSeconds(5L)))
> .apply(
> ParDo.of(
> new DoFn>() {
>
>   @ProcessElement
>   public void process(
>   @Element Long input,
>   @Timestamp Instant timestamp,
>   OutputReceiver> o) {
> call HTTP endpoint here!!
>   }
> }))
> .apply(
> Window.>into(new GlobalWindows())
>
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
> .discardingFiredPanes())
> .apply(Latest.globally())
> .apply(View.asSingleton());
>
> What happens if for example the HTTP endpoint takes time to respond due
> some network issues and/or the amount of data. Is this gonna introduce
> delays on my main pipeline? Is the main pipeline blocked until the pardo
> that processes the side input ends?
>
> I don't care too much about the consistency here, I mean if the
> configuration changed in the Time T1 I don't care if some registries with
> T2 timestamp are processed with the configuration version of T1.
>
>
> Regards.
>
>


Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Robert Bradshaw via user
On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user 
wrote:

> Creating composite DoFns is tricky today due to how they are implemented
> (via annotated methods).
>

Note that this depends on the language. This should be really easy to do
from Python.
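For instance, something along these lines already works as a user-level
wrapper (a rough sketch only; it forwards just setup/teardown/process and
ignores start_bundle, side inputs, state, etc.):

```python
import apache_beam as beam

class ComposedDoFn(beam.DoFn):
    """Wraps another DoFn and post-processes each of its outputs."""
    def __init__(self, inner, post_process):
        self._inner = inner
        self._post_process = post_process

    def setup(self):
        self._inner.setup()

    def process(self, element, *args, **kwargs):
        # Delegate to the wrapped DoFn, then transform each output.
        for out in self._inner.process(element, *args, **kwargs) or ():
            yield self._post_process(out)

    def teardown(self):
        self._inner.teardown()

# Usage sketch: pcoll | beam.ParDo(ComposedDoFn(MyDoFn(), str.upper))
```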


> However providing such a method to compose DoFns would be very useful IMO.
>

+1


> On Fri, Sep 15, 2023 at 9:33 AM Joey Tran 
> wrote:
>
>> Yeah for (1) the concern would be adding a shuffle/fusion break and (2)
>> sounds like the likely solution, was just hoping there'd be one that could
>> wrap at the PTransform level but I realize now the PTransform abstraction
>> is too general as you mentioned to do something like that.
>>
>> (2) will be likely what we do, though now I'm wondering if it might be
>> possible to create a ParDo wrapper that can take a ParDo, extract it's
>> dofn, wrap it, and return a new ParDo
>>
>> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>>
>>> +1 to looking at composite transforms. You could even have a composite
>>> transform that takes another transform as one of its construction arguments
>>> and whose expand method does pre- and post-processing to the inputs/outputs
>>> before/after applying the transform in question. (You could even implement
>>> this as a Python decorator if you really wanted, either decorating the
>>> expand method itself or the full class...)
>>>
>>> One of the difficulties is that for a general transform there isn't
>>> necessarily a 1:N relationship between outputs and inputs as one has for a
>>> DoFn (especially if there is any aggregation involved). There are, however,
>>> two partial solutions that might help.
>>>
>>> (1) You can do a CombineGlobally with a CombineFn (like Sample) that
>>> returns at most N elements. You could do this with a CombinePerKey if you
>>> can come up with a reasonable key (e.g. the id of your input elements) that
>>> the limit should be applied to. Note that this may cause a lot of data to
>>> be shuffled (though due to combiner lifting, no more than N per bundle).
>>>
>>> (2) You could have a DoFn that limits to N per bundle by initializing a
>>> counter in its start_bundle and passing elements through until the counter
>>> reaches a threshold. (Again, one could do this per id if one is available.)
>>> It wouldn't stop production of the elements, but if things get fused it
>>> would still likely be fairly cheap.
>>>
>>> Both of these could be prepended to the problematic consuming PTransform
>>> as well.
>>>
>>> - Robert
>>>
>>>
>>>
>>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran 
>>> wrote:
>>>
>>>> I'm aware of composite transforms and of the distributed nature of
>>>> PTransforms. I'm not suggesting limiting the entire set and my example was
>>>> more illustrative than the actual use case.
>>>>
>>>> My actual use case is basically: I have multiple PTransforms, and let's
>>>> say most of them average ~100 generated outputs for a single input. Most of
>>>> these PTransforms will occasionally run into an input though that might
>>>> output maybe 1M outputs. This can cause issues if for example there are
>>>> transforms that follow it that require a lot of compute per input.
>>>>
>>>> The simplest way to deal with this is to modify the `DoFn`s in our
>>>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
>>>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
>>>> our transforms, but it'd be much cleaner if we could lift up this limiting
>>>> logic out of the application logic and have some generic wrapper that
>>>> extends our transforms.
>>>>
>>>> Thanks for the discussion!
>>>>
>>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>>
>>>>> I don’t think it’s possible to extend in a way that you are asking
>>>>> (like, Java classes “*extend*"). Though, you can create your own
>>>>> composite PTransform that will incorporate one or several others inside
>>>>> *“expand()”* method. Actually, most of the Beam native PTransforms
>>>>> are composite transforms. Please, take a look on doc and examples [1]
>>>>>
>>>>> Regarding your example, please, be aware 

Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Robert Bradshaw via user
+1 to looking at composite transforms. You could even have a composite
transform that takes another transform as one of its construction arguments
and whose expand method does pre- and post-processing to the inputs/outputs
before/after applying the transform in question. (You could even implement
this as a Python decorator if you really wanted, either decorating the
expand method itself or the full class...)

One of the difficulties is that for a general transform there isn't
necessarily a 1:N relationship between outputs and inputs as one has for a
DoFn (especially if there is any aggregation involved). There are, however,
two partial solutions that might help.

(1) You can do a CombineGlobally with a CombineFn (like Sample) that
returns at most N elements. You could do this with a CombinePerKey if you
can come up with a reasonable key (e.g. the id of your input elements) that
the limit should be applied to. Note that this may cause a lot of data to
be shuffled (though due to combiner lifting, no more than N per bundle).

(2) You could have a DoFn that limits to N per bundle by initializing a
counter in its start_bundle and passing elements through until the counter
reaches a threshold. (Again, one could do this per id if one is available.)
It wouldn't stop production of the elements, but if things get fused it
would still likely be fairly cheap.

Both of these could be prepended to the problematic consuming PTransform as
well.
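Roughly, (2) looks like the following (an untested sketch; the limit and the
transform names are placeholders):

```python
import apache_beam as beam

class LimitPerBundle(beam.DoFn):
    # Passes through at most `limit` elements per bundle; anything beyond
    # that is dropped for the remainder of the bundle.
    def __init__(self, limit):
        self._limit = limit

    def start_bundle(self):
        self._count = 0

    def process(self, element):
        if self._count < self._limit:
            self._count += 1
            yield element

# (2) prepended to the problematic transform:
#   limited = pcoll | beam.ParDo(LimitPerBundle(1000)) | ExpensiveTransform()
# (1) using an existing CombineFn that keeps at most N elements globally:
#   sampled = pcoll | beam.combiners.Sample.FixedSizeGlobally(1000)
```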

- Robert



On Fri, Sep 15, 2023 at 8:13 AM Joey Tran  wrote:

> I'm aware of composite transforms and of the distributed nature of
> PTransforms. I'm not suggesting limiting the entire set and my example was
> more illustrative than the actual use case.
>
> My actual use case is basically: I have multiple PTransforms, and let's
> say most of them average ~100 generated outputs for a single input. Most of
> these PTransforms will occasionally run into an input though that might
> output maybe 1M outputs. This can cause issues if for example there are
> transforms that follow it that require a lot of compute per input.
>
> The simplest way to deal with this is to modify the `DoFn`s in our
> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
> our transforms, but it'd be much cleaner if we could lift up this limiting
> logic out of the application logic and have some generic wrapper that
> extends our transforms.
>
> Thanks for the discussion!
>
> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> I don’t think it’s possible to extend in a way that you are asking (like,
>> Java classes “*extend*"). Though, you can create your own composite
>> PTransform that will incorporate one or several others inside
>> *“expand()”* method. Actually, most of the Beam native PTransforms are
>> composite transforms. Please, take a look on doc and examples [1]
>>
>> Regarding your example, please, be aware that all PTransforms are
>> supposed to be executed in distributed environment and the order of records
>> is not guaranteed. So, limiting the whole output by fixed number of records
>> can be challenging - you’d need to make sure that it will be processed on
>> only one worker, that means that you’d need to shuffle all your records by
>> the same key and probably sort the records in way that you need.
>>
>> Did you consider to use “*org.apache.beam.sdk.transforms.Top*” for that?
>> [2]
>>
>> If it doesn’t work for you, could you provide more details of your use
>> case? Then we probably can propose the more suitable solutions for that.
>>
>> [1]
>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>> [2]
>> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
>>
>> —
>> Alexey
>>
>> On 15 Sep 2023, at 14:22, Joey Tran  wrote:
>>
>> Is there a way to extend already defined PTransforms? My question is
>> probably better illustrated with an example. Let's say I have a PTransform
>> that generates a very variable number of outputs. I'd like to "wrap" that
>> PTransform such that if it ever creates more than say 1,000 outputs, then I
>> just take the first 1,000 outputs without generating the rest of the
>> outputs.
>>
>> It'd be trivial if I have access to the DoFn, but what if the PTransform
>> in question doesn't expose the `DoFn`?
>>
>>
>>


Re: Options for visualizing the pipeline DAG

2023-09-01 Thread Robert Bradshaw via user
(As an aside, I think all of these options would make for a great blog post
if anyone is interested in authoring one of those...)

On Fri, Sep 1, 2023 at 9:26 AM Robert Bradshaw  wrote:

> You can also use Python's RenderRunner, e.g.
>
>   python -m apache_beam.examples.wordcount --output out.txt \
> --runner=apache_beam.runners.render.RenderRunner \
> --render_output=pipeline.svg
>
> This also has an interactive mode, triggered by passing --port=N (where 0
> can be used to pick an unused port) which vends the graph as a local web
> service. This allows one to expand/collapse composites for easier
> exploration. Any --render_output arguments that are passed will get
> re-rendered as you edit the graph. (It uses graphviz under the hood, so can
> render any of those supported formats.)
>
> For rendering non-Python pipelines, one can start this up as a local
> portable "runner"
>
>   python -m apache_beam.runners.render
>
> and then "submit" this job from your other SDK over the jobs API to view
> it.
>
> [image: pipeline.png]
>
>
>
> On Fri, Sep 1, 2023 at 7:13 AM Joey Tran 
> wrote:
>
>> Perfect, `pipeline_graph` python module in the stack overflow post [1]
>> was exactly what I was looking for. The dependencies I'm working with are a
>> bit heavyweight and likely difficult to install into a notebook, so I was
>> looking for something I could do on my local machine.
>>
>> Thanks!
>> Joey
>>
>> [1] -
>> https://stackoverflow.com/questions/72592971/way-to-visualize-beam-pipeline-run-with-directrunner
>>
>> On Fri, Sep 1, 2023 at 8:40 AM Danny McCormick via user <
>> user@beam.apache.org> wrote:
>>
>>> Hey Joey,
>>>
>>> Dataflow and Beam playground are 2 options as you mentioned, locally
>>> many SDKs have local runner options with a visual component. For example,
>>> in Python you can use the interactive runner with the
>>> apache-beam-jupyterlab-sidepanel extension
>>> <https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#visualize_the_data_through_the_interactive_beam_inspector>
>>> to view pipelines visually locally (this is similar to what the notebooks
>>> you reference are doing). You can also just call some of these pieces
>>> directly
>>> <https://stackoverflow.com/questions/72592971/way-to-visualize-beam-pipeline-run-with-directrunner>
>>> without an extension. Go has a dot runner
>>> <https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.50.0/go/pkg/beam/runners/dot>
>>> that produces a visual representation of a pipeline. Java has a similar dot
>>> renderer <https://mehmandarov.com/apache-beam-pipeline-graph/>.
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Thu, Aug 31, 2023 at 6:38 PM Joey Tran 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> What're all the current options for visualizing a pipeline? I'm
>>>> guessing Dataflow has a visualization. I saw that there are also Apache
>>>> Beam notebooks through GCP, and I'm aware of the Beam playground, but is
>>>> there an easy way to create and view the visualization locally? For
>>>> example, I might have a large codebase that's used to construct and run a
>>>> pipeline, and in this case I don't think any of those three solutions would
>>>> be very easy to use to visualize my pipeline (though I could be wrong)
>>>>
>>>> Best,
>>>> Joey
>>>>
>>>> --
>>>>
>>>> Joey Tran | Senior Developer Il | AutoDesigner TL
>>>>
>>>> *he/him*
>>>>
>>>> [image: Schrödinger, Inc.] <https://schrodinger.com/>
>>>>
>>>


Re: Options for visualizing the pipeline DAG

2023-09-01 Thread Robert Bradshaw via user
You can also use Python's RenderRunner, e.g.

  python -m apache_beam.examples.wordcount --output out.txt \
--runner=apache_beam.runners.render.RenderRunner \
--render_output=pipeline.svg

This also has an interactive mode, triggered by passing --port=N (where 0
can be used to pick an unused port) which vends the graph as a local web
service. This allows one to expand/collapse composites for easier
exploration. Any --render_output arguments that are passed will get
re-rendered as you edit the graph. (It uses graphviz under the hood, so can
render any of those supported formats.)

For rendering non-Python pipelines, one can start this up as a local
portable "runner"

  python -m apache_beam.runners.render

and then "submit" this job from your other SDK over the jobs API to view
it.

[image: pipeline.png]



On Fri, Sep 1, 2023 at 7:13 AM Joey Tran  wrote:

> Perfect, `pipeline_graph` python module in the stack overflow post [1] was
> exactly what I was looking for. The dependencies I'm working with are a bit
> heavyweight and likely difficult to install into a notebook, so I was
> looking for something I could do on my local machine.
>
> Thanks!
> Joey
>
> [1] -
> https://stackoverflow.com/questions/72592971/way-to-visualize-beam-pipeline-run-with-directrunner
>
> On Fri, Sep 1, 2023 at 8:40 AM Danny McCormick via user <
> user@beam.apache.org> wrote:
>
>> Hey Joey,
>>
>> Dataflow and Beam playground are 2 options as you mentioned, locally many
>> SDKs have local runner options with a visual component. For example, in
>> Python you can use the interactive runner with the
>> apache-beam-jupyterlab-sidepanel extension
>> to view pipelines visually locally (this is similar to what the notebooks
>> you reference are doing). You can also just call some of these pieces
>> directly without an extension. Go has a dot runner
>> that produces a visual representation of a pipeline. Java has a similar dot
>> renderer.
>>
>> Thanks,
>> Danny
>>
>> On Thu, Aug 31, 2023 at 6:38 PM Joey Tran 
>> wrote:
>>
>>> Hi all,
>>>
>>> What're all the current options for visualizing a pipeline? I'm guessing
>>> Dataflow has a visualization. I saw that there are also Apache Beam
>>> notebooks through GCP, and I'm aware of the Beam playground, but is there
>>> an easy way to create and view the visualization locally? For example, I
>>> might have a large codebase that's used to construct and run a pipeline,
>>> and in this case I don't think any of those three solutions would be very
>>> easy to use to visualize my pipeline (though I could be wrong)
>>>
>>> Best,
>>> Joey
>>>
>>> --
>>>
>>> Joey Tran | Senior Developer Il | AutoDesigner TL
>>>
>>> *he/him*
>>>
>>> [image: Schrödinger, Inc.] 
>>>
>>


Re: [Request for Feedback] Swift SDK Prototype

2023-08-24 Thread Robert Bradshaw via user
I would like to figure out a way to get the stream-y interface to work, as
I think it's more natural overall.

One hypothesis is that if any elements are carried over loop iterations,
there will likely be some that are carried over beyond the loop (after all
the callee doesn't know when the loop is supposed to end). We could reject
"plain" elements that are emitted after this point, requiring one to emit
timestamp-windowed-values.

Related to this, we could enforce that the only (user-accessible) way to
get such a timestamped value is to start with one, e.g. a
WindowedValue.withValue(O) produces a WindowedValue with the same
metadata but a new value. Thus a user wanting to do anything "fancy" would
have to explicitly request iteration over these windowed values rather than
over the raw elements. (This is also forward compatible with expanding the
metadata that can get attached, e.g. pane infos, and makes the right thing
the easiest/most natural.)
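(For comparison, this is roughly what the failure mode looks like in Python
today: once a DoFn buffers elements, the implicit 1:1 metadata propagation is
gone and the timestamps/windows have to be re-attached by hand. A hedged
sketch, assuming the global window:)

```python
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.windowed_value import WindowedValue

class BufferingFn(beam.DoFn):
    # Buffering breaks the 1:1 input/output relationship, so the timestamp
    # (and, here, an assumed global window) must be carried explicitly.
    def start_bundle(self):
        self._buffer = []

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        self._buffer.append((element, timestamp))

    def finish_bundle(self):
        for element, timestamp in self._buffer:
            yield WindowedValue(element, timestamp, [window.GlobalWindow()])
        self._buffer = []
```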

On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis  wrote:

> Ah, that is a good point—being element-wise would make managing windows
> and time stamps easier for the user. Fortunately it’s a fairly easy change
> to make and maybe even less typing for the user. I was originally thinking
> side inputs and metrics would happen outside the loop, but I think you want
> a class and not a closure at that point for sanity.
>
> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw 
> wrote:
>
>> Ah, I see.
>>
>> Yeah, I've thought about using an iterable for the whole bundle rather
>> than start/finish bundle callbacks, but one of the questions is how that
>> would impact implicit passing of the timestamp (and other) metadata from
>> input elements to output elements. (You can of course attach the metadata
>> to any output that happens in the loop body, but it's very easy to
>> implicitly to break the 1:1 relationship here (e.g. by doing buffering or
>> otherwise modifying local state) and this would be hard to detect. (I
>> suppose trying to output after the loop finishes could require
>> something more explicit).
>>
>>
>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis 
>> wrote:
>>
>>> Oh, I also forgot to mention that I included element-wise collection
>>> operations like "map" that eliminate the need for pardo in many cases. the
>>> groupBy command is actually a map + groupByKey under the hood. That was to
>>> be more consistent with Swift's collection protocol (and is also why
>>> PCollection and PCollectionStream are different types... PCollection
>>> implements map and friends as pipeline construction operations whereas
>>> PCollectionStream is an actual stream)
>>>
>>> I just happened to push some "IO primitives" that uses map rather than
>>> pardo in a couple of places to do a true wordcount using good ol'
>>> Shakespeare and very very primitive GCS IO.
>>>
>>> Best,
>>> B
>>>
>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis 
>>> wrote:
>>>
>>>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit
>>>> before settling on where I ended up. Ultimately I decided to go with
>>>> something that felt more Swift-y than anything else which means that rather
>>>> than dealing with a single element like you do in the other SDKs you're
>>>> dealing with a stream of elements (which of course will often be of size
>>>> 1). That's a really natural paradigm in the Swift world especially with the
>>>> async / await structures. So when you see something like:
>>>>
>>>> pardo(name:"Read Files") { filenames,output,errors in
>>>>
>>>> for try await (filename,_,_) in filenames {
>>>>   ...
>>>>   output.emit(data)
>>>>
>>>> }
>>>>
>>>> filenames is the input stream and then output and errors are both
>>>> output streams. In theory you can have as many output streams as you like
>>>> though at the moment there's a compiler bug in the new type pack feature
>>>> that limits it to "as many as I felt like supporting". Presumably this will
>>>> get fixed before the official 5.9 release which will probably be in the
>>>> October timeframe if history is any guide)
>>>>
>>>> If you had parameterization you wanted to send that would look like
>>>> pardo("Parameter") { param,filenames,output,error in ... } where "param"
>>>> would take on the value of "Parameter." All of this is being typechecked at
>>>>

Re: [Request for Feedback] Swift SDK Prototype

2023-08-23 Thread Robert Bradshaw via user
Neat.

Nothing like writing an SDK to actually understand how the FnAPI works :).
I like the use of groupBy. I have to admit I'm a bit mystified by the
syntax for parDo (I don't know Swift at all, which is probably tripping me
up). The addition of external (cross-language) transforms could let you
steal everything (e.g. IOs) pretty quickly from other SDKs.

On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user 
wrote:

> For everyone who is interested, here's the draft PR:
>
> https://github.com/apache/beam/pull/28062
>
> I haven't had a chance to test it on my M1 machine yet though (there's a
> good chance there are a few places that need to properly address
> endianness. Specifically timestamps in windowed values and length in
> iterable coders as those both use specifically bigendian representations)
>
>
> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis  wrote:
>
>> Thanks Cham,
>>
>> Definitely happy to open a draft PR so folks can comment---there's not as
>> much code as it looks like since most of the LOC is just generated
>> protobuf. As for the support, I definitely want to add external transforms
>> and may actually add that support before adding the ability to make
>> composites in the language itself. With the way the SDK is laid out adding
>> composites to the pipeline graph is a separate operation than defining a
>> composite.
>>
>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath 
>> wrote:
>>
>>> Thanks Byron. This sounds great. I wonder if there is interest in Swift
>>> SDK from folks currently subscribed to the +user 
>>>  list.
>>>
>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev 
>>> wrote:
>>>
 Hello everyone,

 A couple of months ago I decided that I wanted to really understand how
 the Beam FnApi works and how it interacts with the Portable Runner. For me
 at least that usually means I need to write some code so I can see things
 happening in a debugger and to really prove to myself I understood what was
 going on I decided I couldn't use an existing SDK language to do it since
 there would be the temptation to read some code and convince myself that I
 actually understood what was going on.

 One thing led to another and it turns out that to get a minimal FnApi
 integration going you end up writing a fair bit of an SDK. So I decided to
 take things to a point where I had an SDK that could execute a word count
 example via a portable runner backend. I've now reached that point and
 would like to submit my prototype SDK to the list for feedback.

 It's currently living in a branch on my fork here:

 https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift

 At the moment it runs via the most recent XCode Beta using Swift 5.9 on
 Intel Macs, but should also work using beta builds of 5.9 for Linux running
 on Intel hardware. I haven't had a chance to try it on ARM hardware and
 make sure all of the endian checks are complete. The
 "IntegrationTests.swift" file contains a word count example that reads some
 local files (as well as a missing file to exercise DLQ functionality) and
 output counts through two separate group by operations to get it past the
 "map reduce" size of pipeline. I've tested it against the Python Portable
 Runner. Since my goal was to learn FnApi there is no Direct Runner at this
 time.

 I've shown it to a couple of folks already and incorporated some of
 that feedback already (for example pardo was originally called dofn when
 defining pipelines). In general I've tried to make the API as "Swift-y" as
 possible, hence the heavy reliance on closures and while there aren't yet
 composite PTransforms there's the beginnings of what would be needed for a
 SwiftUI-like declarative API for creating them.

 There are of course a ton of missing bits still to be implemented, like
 counters, metrics, windowing, state, timers, etc.

>>>
>>> This should be fine and we can get the code documented without these
>>> features. I think support for composites and adding an external transform
>>> (see Java, Python, Go, TypeScript)
>>> to add support for multi-lang will bring in a lot of features (for example,
>>> I/O connectors) for free.
>>>
>>>

 Any and all feedback welcome and happy to submit a PR if folks are
 interested, though the "Swift Way" would be to have it in its own repo so
 that 

Re: Getting Started With Implementing a Runner

2023-07-24 Thread Robert Bradshaw via user
I took a first pass at
https://github.com/apache/beam/blob/be19140f3e9194721f36e57f4a946adc6c43971a/website/www/site/content/en/contribute/runner-guide.md

https://github.com/apache/beam/blob/1cfc0fdc6ff27ad70365683fdc8264f42642f6e9/sdks/python/apache_beam/runners/trivial_runner.py
may also be of interest.

On Fri, Jul 21, 2023 at 7:25 AM Joey Tran  wrote:
>
> Could you let me know when you update it? I would be interested in rereading 
> after the rewrite.
>
> Thanks!
> Joey
>
> On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw  wrote:
>>
>> I'm taking an action item to update that page, as it is *way* out of date.
>>
>> On Thu, Jul 13, 2023 at 6:54 PM Joey Tran  wrote:
>>>
>>> I see. I guess I got a little confused since these are mentioned in the 
>>> Authoring a Runner docs page which implied to me that they'd be safe to 
>>> use. I'll check out the bundle_processor. Thanks!
>>>
>>> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw  wrote:
>>>>
>>>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran  wrote:
>>>>>
>>>>> Working on this on and off now and getting some pretty good traction.
>>>>>
>>>>> One thing I'm a little worried about is all the classes that are marked 
>>>>> "internal use only". A lot of these seem either very useful or possibly 
>>>>> critical to writing a runner. How strictly should I interpret these 
>>>>> private implementation labels?
>>>>>
>>>>> A few bits that I'm interested in using ordered by how surprised I was to 
>>>>> find that they're internal only.
>>>>>
>>>>>  - apache_beam.pipeline.AppliedPTransform
>>>>>  - apache_beam.pipeline.PipelineVisitor
>>>>>  - apache_beam.runners.common.DoFnRunner
>>>>
>>>>
>>>> The public API is the protos. You should not have to interact with 
>>>> AppliedPTransform and PipelineVisitor directly (and while you can reach in 
>>>> and do so, there are no promises here and these are subject to change). As 
>>>> for DoFnRunner, if you're trying to reach in at this level you're probably 
>>>> going to have to be replicating a bunch of surrounding infrastructure as 
>>>> well. I would recommend using a BundleProcessor [1] to coordinate the work 
>>>> (which will internally wire up the chain of DoFns correctly and take them 
>>>> through their proper lifecycle). As mentioned above, you can directly 
>>>> borrow the translations in fn_api_runner to go from a full Pipeline graph 
>>>> (proto) to a set of fused DoFns to execute in topological order (as 
>>>> ProcessBundleDescriptor protos, which is what BundleProcessor accepts).
>>>>
>>>> [1] 
>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>>>
>>>>>
>>>>> Thanks again for the help,
>>>>> Joey
>>>>>
>>>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath  
>>>>> wrote:
>>>>>>
>>>>>> Another advantage of a portable runner would be that it will be using 
>>>>>> well defined and backwards compatible Beam portable APIs to communicate 
>>>>>> with SDKs. I think this is specially important for runners that do not 
>>>>>> live in the Beam repo since otherwise future SDK releases could break 
>>>>>> your runner in subtle ways. Also portability gives you more flexibility 
>>>>>> when it comes to choosing an SDK to define the pipeline and will allow 
>>>>>> you to execute transforms in any SDK via cross-language.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user 
>>>>>>  wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran  
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature 
>>>>>>>>> set you mention above. 
>>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>>>  is a good starting point as to what the relationship between a 
>>>>>>

Re: Growing checkpoint size with Python SDF for reading from Redis streams

2023-07-20 Thread Robert Bradshaw via user
Your SDF looks fine. I wonder if there is an issue with how Flink is
implementing SDFs (e.g. not garbage collecting previous remainders).

On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran
 wrote:
>
> Hello,
>
> I am running a pipeline built in the Python SDK that reads from a Redis 
> stream via an SDF, in the following environment:
>
> Python 3.11
> Apache Beam 2.48.0
> Flink 1.16
> Checkpoint interval: 60s
> state.backend (Flink): hashmap
> state_backend (Beam): filesystem
>
> The issue that I am observing is that the checkpoint size keeps growing, even 
> when there are no items to read on the Redis stream. Since there are no items 
> to read on the Redis stream, the Redis stream SDF is simply doing the 
> following steps repeatedly, as part of DoFn.process, i.e. the pattern 
> described in the user-initiated checkpoint pattern in the Apache Beam 
> programming guide to handle polling for new items with some delay, if the 
> last poll returned no items:
>
> Make the call to the Redis client to read items from the Redis stream
> Receive no items from the Redis stream, and hence,
> Call tracker.defer_remainder(Duration.of(5)) and return-ing to defer 
> execution for 5 seconds. That code is located here.
> Go back to step 1.
>
> This checkpoint size growth happens regardless of whether I'm using 
> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows 
> large enough to cause the task manager to crash, due to exhausting Java heap 
> space. The rate of checkpoint size growth is proportional to the number of 
> tracker.defer_remainder() calls I have done, i.e. increasing parallelism 
> and/or decreasing the timeout used in tracker.defer_remainder will increase 
> the rate of checkpoint growth.
>
> I took a look at the heap-based checkpoint files that I observed were getting 
> larger with each checkpoint (just using the less command) and noticed that 
> many copies of the residual restriction were present, which seemed like a red 
> flag. The residual restriction here is the one that results from calling 
> tracker.defer_remainder(), which results in a tracker.try_split(0.0).
>
> I've included the SDF code and jobmanager logs showing growing checkpoint 
> size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. 
> I've included the restriction provider/tracker and other pieces for 
> completeness, but the SDF is towards the bottom.
>
> Any help would be appreciated! 
>
> Thanks,
> --
> Nimalan Mahendran
> ML Engineer at Liminal Insights


Re: Getting Started With Implementing a Runner

2023-07-14 Thread Robert Bradshaw via user
I'm taking an action item to update that page, as it is *way* out of date.

On Thu, Jul 13, 2023 at 6:54 PM Joey Tran  wrote:

> I see. I guess I got a little confused since these are mentioned in the 
> Authoring
> a Runner
> <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs
> page which implied to me that they'd be safe to use. I'll check out the
> bundle_processor. Thanks!
>
> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw 
> wrote:
>
>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran 
>> wrote:
>>
>>> Working on this on and off now and getting some pretty good traction.
>>>
>>> One thing I'm a little worried about is all the classes that are marked
>>> "internal use only". A lot of these seem either very useful or possibly
>>> critical to writing a runner. How strictly should I interpret these private
>>> implementation labels?
>>>
>>> A few bits that I'm interested in using ordered by how surprised I was
>>> to find that they're internal only.
>>>
>>>  - apache_beam.pipeline.AppliedPTransform
>>>  - apache_beam.pipeline.PipelineVisitor
>>>  - apache_beam.runners.common.DoFnRunner
>>>
>>
>> The public API is the protos. You should not have to interact
>> with AppliedPTransform and PipelineVisitor directly (and while you can
>> reach in and do so, there are no promises here and these are subject to
>> change). As for DoFnRunner, if you're trying to reach in at this level
>> you're probably going to have to be replicating a bunch of surrounding
>> infrastructure as well. I would recommend using a BundleProcessor [1] to
>> coordinate the work (which will internally wire up the chain of DoFns
>> correctly and take them through their proper lifecycle). As mentioned
>> above, you can directly borrow the translations in fn_api_runner to go from
>> a full Pipeline graph (proto) to a set of fused DoFns to execute in
>> topological order (as ProcessBundleDescriptor protos, which is
>> what BundleProcessor accepts).
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>
>>
>>> Thanks again for the help,
>>> Joey
>>>
>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> Another advantage of a portable runner would be that it will be using
>>>> well defined and backwards compatible Beam portable APIs to communicate
>>>> with SDKs. I think this is specially important for runners that do not live
>>>> in the Beam repo since otherwise future SDK releases could break your
>>>> runner in subtle ways. Also portability gives you more flexibility when it
>>>> comes to choosing an SDK to define the pipeline and will allow you to
>>>> execute transforms in any SDK via cross-language.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>> set you mention above.
>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>  is
>>>>>>> a good starting point as to what the relationship between a Runner and 
>>>>>>> the
>>>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>>>> perspective of an SDK, but the story is largely about the interface 
>>>>>>> which
>>>>>>> is directly applicable).
>>>>>>
>>>>>>
>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>
>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>>> Runner all the way down. Is the Fn API the only way to implement a 
>>>>>> runner?
>>>>>> Our execution environment is a bit constrained in such a way that we 
>>>>>> can't
>>>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>>>> basi

Re: Pandas 2 Timeline Estimate

2023-07-12 Thread Robert Bradshaw via user
Contributions welcome! I don't think we're at the point we can stop
supporting Pandas 1.x though, so we'd have to do it in such a way as to
support both.

On Wed, Jul 12, 2023 at 4:53 PM XQ Hu via user  wrote:

> https://github.com/apache/beam/issues/27221#issuecomment-1603626880
>
> This tracks the progress.
>
> On Wed, Jul 12, 2023 at 7:37 PM Adlae D'Orazio 
> wrote:
>
>> Hello,
>>
>> I am currently trying to use Interactive Beam to run my pipelines through
>> a Jupyter notebook, but I
>> have internal packages depending on Pandas 2. When I install interactive
>> beam, it overrides these dependencies. I was wondering whether there's a
>> timeframe for moving Beam to Pandas 2.v2? I know it has only been out
>> since late April, so it's not surprising that ya'll haven't supported it
>> yet. Just curious. Thank you!
>>
>> Best,
>> Adlae
>>
>


Re: Getting Started With Implementing a Runner

2023-07-10 Thread Robert Bradshaw via user
On Sun, Jul 9, 2023 at 9:22 AM Joey Tran  wrote:

> Working on this on and off now and getting some pretty good traction.
>
> One thing I'm a little worried about is all the classes that are marked
> "internal use only". A lot of these seem either very useful or possibly
> critical to writing a runner. How strictly should I interpret these private
> implementation labels?
>
> A few bits that I'm interested in using ordered by how surprised I was to
> find that they're internal only.
>
>  - apache_beam.pipeline.AppliedPTransform
>  - apache_beam.pipeline.PipelineVisitor
>  - apache_beam.runners.common.DoFnRunner
>

The public API is the protos. You should not have to interact
with AppliedPTransform and PipelineVisitor directly (and while you can
reach in and do so, there are no promises here and these are subject to
change). As for DoFnRunner, if you're trying to reach in at this level
you're probably going to have to be replicating a bunch of surrounding
infrastructure as well. I would recommend using a BundleProcessor [1] to
coordinate the work (which will internally wire up the chain of DoFns
correctly and take them through their proper lifecycle). As mentioned
above, you can directly borrow the translations in fn_api_runner to go from
a full Pipeline graph (proto) to a set of fused DoFns to execute in
topological order (as ProcessBundleDescriptor protos, which is
what BundleProcessor accepts).

[1]
https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851


> Thanks again for the help,
> Joey
>
> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath 
> wrote:
>
>> Another advantage of a portable runner would be that it will be using
>> well defined and backwards compatible Beam portable APIs to communicate
>> with SDKs. I think this is specially important for runners that do not live
>> in the Beam repo since otherwise future SDK releases could break your
>> runner in subtle ways. Also portability gives you more flexibility when it
>> comes to choosing an SDK to define the pipeline and will allow you to
>> execute transforms in any SDK via cross-language.
>>
>> Thanks,
>> Cham
>>
>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>>
>>>
>>>
>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran 
>>> wrote:
>>>
>>>> Totally doable by one person, especially given the limited feature set
>>>>> you mention above.
>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>  is
>>>>> a good starting point as to what the relationship between a Runner and the
>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>> perspective of an SDK, but the story is largely about the interface which
>>>>> is directly applicable).
>>>>
>>>>
>>>> Great slides, I really appreciate the illustrations.
>>>>
>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>> imagined that once the Runner started execution of a workflow, it was
>>>> Runner all the way down. Is the Fn API the only way to implement a runner?
>>>> Our execution environment is a bit constrained in such a way that we can't
>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>> basically only have the ability to start a worker either with a known
>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with
>>>> a bundle of data to process and return the outputs for. We're constrained
>>>> from really any additional communication with a worker beyond that.
>>>>
>>>
>>> The "worker" abstraction gives the ability to wrap any user code in a
>>> way that it can be called from any runner. If you're willing to constrain
>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can
>>> be a logical, rather than physical, concept.
>>>
>>> Another way to look at it is that in practice, the "runner" often has
>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>
>>>

Re: Getting Started With Implementing a Runner

2023-06-23 Thread Robert Bradshaw via user
On Fri, Jun 23, 2023 at 1:43 PM Joey Tran  wrote:

> Totally doable by one person, especially given the limited feature set you
>> mention above.
>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>  is
>> a good starting point as to what the relationship between a Runner and the
>> SDK is at a level of detail sufficient for implementation (told from the
>> perspective of an SDK, but the story is largely about the interface which
>> is directly applicable).
>
>
> Great slides, I really appreciate the illustrations.
>
> I hadn't realized there was a concept of an "SDK Worker", I had imagined
> that once the Runner started execution of a workflow, it was Runner all the
> way down. Is the Fn API the only way to implement a runner? Our execution
> environment is a bit constrained in such a way that we can't expose the
> APIs required to implement the Fn API. To be forthright, we basically only
> have the ability to start a worker either with a known Pub/Sub topic to
> expect data from and a Pub/Sub topic to write to; or with a bundle of data
> to process and return the outputs for. We're constrained from really any
> additional communication with a worker beyond that.
>

The "worker" abstraction gives the ability to wrap any user code in a way
that it can be called from any runner. If you're willing to constrain the
code you're wrapping (e.g. "Python DoFns only") then this "worker" can be a
logical, rather than physical, concept.

Another way to look at it is that in practice, the "runner" often has its
own notion of "workers" which wrap (often in a 1:1 way) the logical "SDK
Worker" (which in turn invokes the actual DoFns). This latter may be
inlined (e.g. if it's 100% Python on both sides). See, for example,
https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350


> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw 
> wrote:
>
>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran 
>> wrote:
>>
>>> Thanks all for the responses!
>>>
>>> If Beam Runner Authoring Guide is rather high-level for you, then, at
>>>> fist, I’d suggest to answer two questions for yourself:
>>>> - Am I going to implement a portable runner or native one?
>>>>
>>>
>>> Portable sounds great, but the answer depends on how much additional
>>> cost it'd require to implement portable over non-portable, even considering
>>> future deprecation (unless deprecation is happening tomorrow). I'm not
>>> familiar enough to know what the additional cost is so I don't have a firm
>>> answer.
>>>
>>
>> I would say it would not be that expensive to write it in a "portable
>> compatible" way (i.e. it uses the publicly-documented protocol as the
>> interface rather than reaching into internal details) even if it doesn't
>> use gRPC and fire up separate processes/docker images for the workers
>> (preferring to do all of that inline like the Python portable direct
>> runner does).
>>
>>
>>> - Which SDK I should use for this runner?
>>>>
>>> I'd be developing this runner against the python SDK and if the runner
>>> only worked with the python SDK that'd be okay in the short term
>>>
>>
>> Yes. And if you do it the above way, it should be easy to extend (or not)
>> if/when the need arises.
>>
>>
>>> Also, we don’t know if this new runner will be contributed back to Beam,
>>>> what is a runtime and what actually is a final goal of it.
>>>
>>> Likely won't be contributed back to Beam (not sure if it'd actually be
>>> useful to a wide audience anyways).
>>>
>>> The context is we've been developing an in-house large-scale pipeline
>>> framework that encapsulates both the programming model and the
>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>> just reimplementing features and abstractions Beam has already implemented,
>>> so I wanted to explore adopting Beam. Our execution environment is very
>>> particular though and our workflows require it (due to the way we license
>>> our software), so my plan was to try to create a very basic runner that
>>> uses our execution environment. The runner could have very few features
>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>>> introduce a shim for some of our internally implemented transforms and
>>> assess from there.
>>>
>>> Not sure if 

Re: Getting Started With Implementing a Runner

2023-06-23 Thread Robert Bradshaw via user
On Fri, Jun 23, 2023 at 11:15 AM Joey Tran 
wrote:

> Thanks all for the responses!
>
> If Beam Runner Authoring Guide is rather high-level for you, then, at
>> fist, I’d suggest to answer two questions for yourself:
>> - Am I going to implement a portable runner or native one?
>>
>
> Portable sounds great, but the answer depends on how much additional cost
> it'd require to implement portable over non-portable, even considering
> future deprecation (unless deprecation is happening tomorrow). I'm not
> familiar enough to know what the additional cost is so I don't have a firm
> answer.
>

I would say it would not be that expensive to write it in a "portable
compatible" way (i.e. it uses the publicly-documented protocol as the
interface rather than reaching into internal details) even if it doesn't
use gRPC and fire up separate processes/docker images for the workers
(preferring to do all of that inline like the Python portable direct
runner does).


> - Which SDK I should use for this runner?
>>
> I'd be developing this runner against the python SDK and if the runner
> only worked with the python SDK that'd be okay in the short term
>

Yes. And if you do it the above way, it should be easy to extend (or not)
if/when the need arises.


> Also, we don’t know if this new runner will be contributed back to Beam,
>> what is a runtime and what actually is a final goal of it.
>
> Likely won't be contributed back to Beam (not sure if it'd actually be
> useful to a wide audience anyways).
>
> The context is we've been developing an in-house large-scale pipeline
> framework that encapsulates both the programming model and the
> runner/execution of data workflows. As it's grown, I keep finding myself
> just reimplementing features and abstractions Beam has already implemented,
> so I wanted to explore adopting Beam. Our execution environment is very
> particular though and our workflows require it (due to the way we license
> our software), so my plan was to try to create a very basic runner that
> uses our execution environment. The runner could have very few features
> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
> introduce a shim for some of our internally implemented transforms and
> assess from there.
>
> Not sure if this is a lofty goal or not, so happy to hear your thoughts as
> to whether this seems reasonable and achievable without a large concerted
> effort or even if the general idea makes any sense. (I recognize that it
> might not be *easy*, but I don't have the resources to dedicate more than
> myself to work on a PoC)
>

Totally doable by one person, especially given the limited feature set you
mention above.
https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
is a good starting point as to what the relationship between a Runner and
the SDK is at a level of detail sufficient for implementation (told from
the perspective of an SDK, but the story is largely about the interface
which is directly applicable).

Given the limited feature set you proposed, this is similar to the original
Python portable runner which took a week or two to put together (granted a
lot has been added since then), or the typescript direct runner (
https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
) which was done (in its basic form, no support for side inputs and such)
in less than a week. Granted, as these are local runners, this illustrates
only the Beam-side complexity of things (not the work of actually
implementing a distributed shuffle, starting and assigning work to multiple
workers, etc. but presumably that's the kind of thing your execution
environment already takes care of).

As for some more concrete pointers, you could probably leverage a lot of
what's there by invoking create_stages

https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362

which will do optimization, fusion, etc. and then implementing your own
version of run_stages

https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392

to execute these in topological order on your compute infrastructure. (If
you're not doing streaming, this is much more straightforward than all the
bundler scheduler stuff that currently exists in that code).
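
For a rough idea of how those pieces fit together, here is a minimal sketch
(the InHouseRunner name and submit_stage_to_cluster call are illustrative
assumptions, not Beam APIs; the create_stages return shape follows the
2.48.0 source linked above and may differ in other versions):

```python
from apache_beam.runners.portability.fn_api_runner import fn_runner
from apache_beam.runners.runner import PipelineResult, PipelineRunner, PipelineState


class InHouseRunner(PipelineRunner):
    """Sketch of a batch-only runner that reuses Beam's fusion/optimization."""

    def run_pipeline(self, pipeline, options):
        pipeline_proto = pipeline.to_runner_api()
        # Let Beam do the proto-level optimization and fusion described above.
        stage_context, stages = fn_runner.FnApiRunner().create_stages(pipeline_proto)
        for stage in stages:
            # Hypothetical hook: ship each fused stage to the in-house
            # execution environment, analogous to run_stages above.
            submit_stage_to_cluster(stage_context, stage)
        return PipelineResult(PipelineState.DONE)
```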



>
>
>
>
>
> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>>
>>
>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user 
>> wrote:
>>
>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko 
>> wrote:
>>
>>> If Beam Runner Authoring Guide is rather high-level for you, then, at

Re: Getting Started With Implementing a Runner

2023-06-23 Thread Robert Bradshaw via user
On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko 
wrote:

> If Beam Runner Authoring Guide is rather high-level for you, then, at
> fist, I’d suggest to answer two questions for yourself:
> - Am I going to implement a portable runner or native one?
>

The answer to this should be portable, as non-portable ones will be
deprecated.

- Which SDK I should use for this runner?
>

The answer to the above question makes this one moot :).

On a more serious note, could you tell us a bit more about the runner
you're looking at implementing?


> Then, depending on answers, I’d suggest to take as an example one of the
> most similar Beam runners and use it as a more detailed source of
> information along with Beam runner doc mentioned before.
>
> —
> Alexey
>
> On 22 Jun 2023, at 14:39, Joey Tran  wrote:
>
> Hi Beam community!
>
> I'm interested in trying to implement a runner with my company's execution
> environment but I'm struggling to get started. I've read the docs page on
> implementing a runner but it's quite high level. Anyone have any concrete
> suggestions on getting started?
>
> I've started by cloning and running the hello world example. I've then
> subclassed `PipelineRunner`
> to create my own custom runner but at this point I'm a bit stuck. My custom
> runner just looks like
>
> class CustomRunner(runner.PipelineRunner):
>     def run_pipeline(self, pipeline, options):
>         self.visit_transforms(pipeline, options)
>
> And when using it I get an error about not having implemented "Impulse"
>
> NotImplementedError: Execution of []
> not implemented in runner .
>
> Am I going about this the right way? Are there tests I can run my custom
> runner against to validate it beyond just running the hello world example?
> I'm finding myself just digging through the beam source to try to piece
> together how a runner works and I'm struggling to get a foothold. Any
> guidance would be greatly appreciated, especially if anyone has any
> experience implementing their own python runner.
>
> Thanks in advance! Also, could I get a Slack invite?
> Cheers,
> Joey
>
>
>


Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

2023-05-25 Thread Robert Bradshaw via user
The key in GroupIntoBatches is actually not semantically meaningful, and
for a batch pipeline the use of state/timers is not needed either. If all
you need to do is batch elements into groups of (at most) N, you can write
a DoFn that collects things in its process method and emits them when the
batch is full (and also in the finish bundle method, though some care needs
to be taken to handle windowing correctly). On the other hand, if you're
trying to limit the parallelism across all workers you'd likely need to
limit the number of concurrently-processed keys (which would require a
grouping of some sort onto a finite number of keys, unless you want to cap
your entire pipeline at a certain number of workers).
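
For reference, a minimal sketch of that collect-and-emit DoFn for batch
pipelines (the class name and batch size are illustrative):

```python
import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue


class BatchInBundleFn(beam.DoFn):
    """Groups elements into lists of at most batch_size, per bundle and window."""

    def __init__(self, batch_size=100):
        self._batch_size = batch_size

    def start_bundle(self):
        self._batches = {}  # window -> buffered elements

    def process(self, element, window=beam.DoFn.WindowParam):
        batch = self._batches.setdefault(window, [])
        batch.append(element)
        if len(batch) >= self._batch_size:
            self._batches[window] = []
            yield batch

    def finish_bundle(self):
        # Leftovers must be emitted here as window-aware values.
        for window, batch in self._batches.items():
            if batch:
                yield WindowedValue(batch, window.max_timestamp(), [window])
```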

On Thu, May 25, 2023 at 2:34 PM Evan Galpin  wrote:

> Understood, thanks for the clarification, I'll need to look more in-depth
> at my pipeline code then.  I'm definitely observing that all steps
> downstream from the Stateful step in my pipeline do not start until steps
> upstream of the Stateful step are fully completed.  The Stateful step is a
> RateLimit[1] transform which borrows heavily from GroupIntoBatches.
>
> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>
> On Thu, May 25, 2023 at 2:25 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> The GbkBeforeStatefulParDo is an implementation detail used to send all
>> elements with the same key to the same worker (so that they can share
>> state, which is itself partitioned by worker). This does cause a global
>> barrier in batch pipelines.
>>
>> On Thu, May 25, 2023 at 2:15 PM Evan Galpin  wrote:
>>
>>> Hi all,
>>>
>>> I'm running into a scenario where I feel that Dataflow Overrides
>>> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
>>> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
>>> needs to have processed all the data in a window before it can output.
>>>
>>> Is it strictly required that GbkBeforeStatefulParDo must run before any
>>> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
>>> to protect against, and how can it be bypassed/disabled while still using
>>> DataflowRunner?
>>>
>>> Thanks,
>>> Evan
>>>
>>


Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

2023-05-25 Thread Robert Bradshaw via user
The GbkBeforeStatefulParDo is an implementation detail used to send all
elements with the same key to the same worker (so that they can share
state, which is itself partitioned by worker). This does cause a global
barrier in batch pipelines.

On Thu, May 25, 2023 at 2:15 PM Evan Galpin  wrote:

> Hi all,
>
> I'm running into a scenario where I feel that Dataflow Overrides
> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
> needs to have processed all the data in a window before it can output.
>
> Is it strictly required that GbkBeforeStatefulParDo must run before any
> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
> to protect against, and how can it be bypassed/disabled while still using
> DataflowRunner?
>
> Thanks,
> Evan
>


Re: [EXTERNAL] Re: Vulnerabilities in Transitive dependencies

2023-05-02 Thread Robert Bradshaw via user
Generally these types of vulnerabilities are only exploitable when
processing untrusted data and/or exposing a public service to the
internet. This is not the typical use of Beam (especially the latter),
but that's not to say Beam can't be used in this way. That being said,
it's preferable to simply update libraries rather than have to do this
kind of analysis. On that note, +1 to the removal of Avro from core as
a mitigation for this vulnerability, though that's sometimes easier
said than done.

On Tue, May 2, 2023 at 7:35 AM Brule, Joshua L. (Josh), CISSP via user
 wrote:
>
> The SnakeYAML analysis is exactly what I was looking for. The library of 
> concern is org.codehaus.jackson jackson-mapper-asl 1.9.13.

Are you looking at
https://security.snyk.io/package/maven/org.codehaus.jackson:jackson-mapper-asl/1.9.13
?

I see NOTE: "This vulnerability is only exploitable when the
non-default UNWRAP_SINGLE_VALUE_ARRAYS feature is enabled" which a
quick grep through our codebase indicates we do not use.

> Our scanner is reporting ~20 CVEs with a CVSS of >= 7 and ~60 CVEs total.
>
>
>
> Thank you,
>
> Josh
>
>
>
> From: Bruno Volpato 
> Date: Monday, May 1, 2023 at 9:04 PM
> To: user@beam.apache.org , Brule, Joshua (Josh) L., 
> CISSP 
> Subject: [EXTERNAL] Re: Vulnerabilities in Transitive dependencies
>
> Hi Joshua,
>
>
>
> It may take a lot of effort and knowledge to review whether CVEs are 
> exploitable or not.
>
> I have seen this kind of analysis done in a few cases, such as SnakeYAML 
> recently: https://s.apache.org/beam-and-cve-2022-1471 
> (https://github.com/apache/beam/issues/25449)
>
>
>
> If there is a patch available, I believe we should err on the side of caution 
> and update them (if possible).
>
>
>
> For the example that you mentioned, there is some work started by Alexey 
> Romanenko to remove/decouple Avro from Beam core, so we can upgrade to newer 
> versions: https://github.com/apache/beam/issues/25252.
>
> Another recent progress is Beam releasing a new version of its vendored gRPC 
> to move past some CVEs originated from protobuf-java: 
> https://github.com/apache/beam/issues/25746
>
>
>
>
>
> Is there any other particular dependency that you are concerned about?
>
> Please consider filing an issue at https://github.com/apache/beam/issues so 
> we can start tracking it.
>
>
>
>
>
> Best,
>
> Bruno
>
>
>
>
>
>
>
> On Mon, May 1, 2023 at 5:28 PM Brule, Joshua L. (Josh), CISSP via user 
>  wrote:
>
> Hello,
>
>
>
> I am hoping you could help me with our vulnerability remediation process. We 
> have several development teams using Apache Beam in their projects. When 
> performing our Software Composition Analysis (Third-Party Software) scan, 
> projects utilizing Apache Beam have an incredible number of CVEs, Jackson 
> Data Mapper being an extreme outlier.
>
>
>
> Jackson Data Mapper is a transitive dependency via Avro, but I am wondering: 
> Has the Apache Beam team reviewed these CVEs and found them NOT EXPLOITABLE 
> as implemented. Or if exploitable implemented mitigations pre/post usage of 
> the library?
>
>
>
> Thank you for your time,
>
> Josh
>
>
>
> Joshua Brule | Sr Information Security Engineer


Re: How Beam Pipeline Handle late events

2023-04-24 Thread Robert Bradshaw via user
On Fri, Apr 21, 2023 at 3:37 AM Pavel Solomin  wrote:
>
> Thank you for the information.
>
> I'm assuming you had a unique ID in records, and you observed some IDs 
> missing in Beam output comparing with Spark, and not just some duplicates 
> produced by Spark.
>
> If so, I would suggest to create a P1 issue at 
> https://github.com/apache/beam/issues

+1, ideally with enough information to reproduce. As far as I
understand, what you have should just work (but I'm not a flink
expert).
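
A sketch of admitting late data instead of dropping it, for comparison
(window size, lateness, and trigger values are illustrative; names follow
the Python SDK's window and trigger modules):

```python
import apache_beam as beam
from apache_beam.transforms import trigger

windowed = (
    events  # an existing PCollection with event-time timestamps
    | beam.WindowInto(
        beam.window.FixedWindows(60),
        allowed_lateness=12 * 60 * 60,  # seconds of lateness to accept
        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING))
```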

> Also, did you try setting --checkpointingMode=AT_LEAST_ONCE ?
>
> Unfortunately, I can't be more helpful here, but let me share some of the 
> gotchas I had from my previous experience of running Beam on top of Flink for 
> similar use-case (landing of data from messaging system into files):
>
> (1) https://github.com/apache/beam/issues/26041 - I've solved that by adding 
> a runId into file names which is re-generated between app (re) starts
>
> (2) I used processing time watermarks and simple window without lateness set 
> up - combining it with (1) achieved no data loss
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>
>
>
>
>
>
> On Thu, 20 Apr 2023 at 02:18, Lydian  wrote:
>>
>> Yes, we did enabled this in our pipeline.
>>
>> On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin  wrote:
>>>
>>> Thank you
>>>
>>> Just to confirm: how did you configure Kafka offset commits? Did you have 
>>> this flag enabled?
>>>
>>>
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>>>
>>>
>>> On Thursday, 20 April 2023, Trevor Burke  wrote:
>>> > Hi Pavel,
>>> > Thanks for the reply.
>>> > No, the event losses are not consistent. While we've been running our 
>>> > pipelines in parallel (Beam vs Spark) we are seeing some days with no 
>>> > event loss and some days with some, but it's always less than 0.05%
>>> >
>>> >
>>> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin  
>>> > wrote:
>>> >>
>>> >> Hello Lydian,
>>> >> Do you always observe data loss? Or - maybe, it happens only when you 
>>> >> restart your pipeline from a Flink savepoint? If you lose data only 
>>> >> between restarts - is you issue similar to 
>>> >> https://github.com/apache/beam/issues/26041 ?
>>> >>
>>> >> Best Regards,
>>> >> Pavel Solomin
>>> >>
>>> >> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Tue, 18 Apr 2023 at 18:58, Lydian  wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> We are using Beam (Python SDK + Flink Runner) to backup our streaming 
>>> >>> data from Kafka to S3. To avoid hitting the s3 threshold, we use 1 
>>> >>> minute fixed window to group messages.  We've had similar pipeline in 
>>> >>> spark that we want to replace it with this new pipeline.  However, the 
>>> >>> Beam pipeline seems always having events missing, which we are thinking 
>>> >>> could be due to late events (because the number of missing events get 
>>> >>> lower when having higher allow_lateness)
>>> >>>
>>> >>> We've tried the following approach to avoid late events, but none of 
>>> >>> them are working:
>>> >>> 1.  Use Processing timestamp instead of event time. Ideally if 
>>> >>> windowing is using the processing timestamp, It shouldn't consider any 
>>> >>> event as late. But this doesn't seem to work at all.
>>> >>> 2.  Configure allow_lateness to 12 hour.  Given that approach 1 seems 
>>> >>> not working as expected, we've also configured the allow_lateness. But 
>>> >>> it still have missing events compared to our old spark pipelines.
>>> >>>
>>> >>> Here's the simplified code we have
>>> >>> ```
>>> >>>
>>> >>> def add_timestamp(event: Any) -> Any:
>>> >>>     import time
>>> >>>     from apache_beam import window
>>> >>>     return window.TimestampedValue(event, time.time())
>>> >>>
>>> >>> (pipeline
>>> >>>     | "Kafka Read" >> ReadFromKafka(topic="test-topic", consumer_config=consumer_config)
>>> >>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>>> >>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>> >>>         beam.window.FixedWindows(fixed_window_size),
>>> >>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness)
>>> >>>     )
>>> >>>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
>>> >>>
>>> >>> ```
>>> >>>
>>> >>> I am wondering:
>>> >>> 1. Is the add_timestamp approach correctly marked it to use processing 
>>> >>> time for windowing?  If so, why there still late event consider we are 
>>> >>> using processing time and not event time?
>>> >>> 2.  Are there are any other approaches to avoid dropping any late event 
>>> >>> besides ` allowed_lateness`?  In flink you can output those late events 
>>> >>> as side output, wondering if we can do similar thing in Beam as well? 
>>> >>> Would someone 

Re: [Question] - Time series - cumulative sum in right order with python api in a batch process

2023-04-24 Thread Robert Bradshaw via user
You are correct in that the data may arrive in an unordered way.
However, once a window finishes, you are guaranteed to have seen all
the data up to that point (modulo late data) and can then confidently
compute your ordered cumulative sum.

You could do something like this:

def cumulative_sums(key, timestamped_values):
  running = 0
  for _, x in sorted(timestamped_values):  # sorted by timestamp
    running += x
    yield running

sums = (timestamped_data
  | beam.Map(lambda x, t=beam.DoFn.TimestampParam: ('sum_key', (t, x)))  # one key, as below
  | beam.WindowInto(...)
  | beam.GroupByKey()
  | beam.FlatMapTuple(cumulative_sums))



On Mon, Apr 24, 2023 at 8:23 AM Guagliardo, Patrizio via user
 wrote:
>
> Hi together,
>
>
>
> I want to create a cumulative sum over a time series in a bounded batch 
> process in Apache Beam with the Python API. What you can do is write a 
> cumulative sum with a stateful DoFn, but the problem you would still face is 
> that you cannot handle it this way when the data is unordered, which is the 
> case in a PCollection. Is there a way to make the cumulative sum over time in 
> a batch process? This is what I did (without ordering):
>
> import apache_beam as beam
> from apache_beam import TimeDomain
> from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec
> from apache_beam.transforms.window import FixedWindows, GlobalWindows
>
>
> class TimestampedSumAccumulator(beam.DoFn):
>     SUM_STATE = 'sum'
>
>     def process(
>         self, element,
>         timestamp=beam.DoFn.TimestampParam,
>         sum_state=beam.DoFn.StateParam(ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))
>     ):
>         sum_value = sum_state.read() or 0.0
>         # print(element)
>         sum_value += element[1]
>         sum_state.write(sum_value)
>         yield beam.transforms.window.TimestampedValue(sum_value, timestamp)
>
>
> with beam.Pipeline() as p:
>     sums = (p
>             | 'Create' >> beam.Create([
>                 (3.1, 3),
>                 (1.5, 1),
>                 (4.2, 4),
>                 (5.4, 5),
>                 (2.3, 2)
>             ])
>             | 'AddTimestamps' >> beam.Map(lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
>             | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
>             | 'Window' >> beam.WindowInto(FixedWindows(10))
>             | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
>             | 'Print' >> beam.Map(print))
>
>
>
> How could that be done to make the cumulative sum in the “right” order?
>
>
>
> Thank you very much in advance.
>
>
>
>
> 


Re: Avoid using docker when I use a external transformation

2023-04-18 Thread Robert Bradshaw via user
Docker is not necessary to expand the transform (indeed, by default it
should just pull the jar and invoke it directly to start the expansion
service), but it is used as the environment in which to execute the
expanded transform.

It would be in theory possible to run the worker without docker as well.
This would involve manually starting up a worker in Java, manually starting
up an expansion service that points to this worker as its environment, and
then using that expansion service from Python. I've never done that myself,
so I don't know how easy it would be, but the "LOOPBACK" runner in Java
could give some insight into how this could be done.
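
For what it's worth, pointing the transform at a manually started expansion
service looks roughly like the sketch below; the port, topic, and bootstrap
server are assumptions, the jar can be started with something like
`java -jar beam-sdks-java-io-expansion-service-<version>.jar 8097`, and the
expanded transform still needs a Java execution environment as noted above.

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    messages = p | ReadFromKafka(
        consumer_config={'bootstrap.servers': 'localhost:9092'},  # assumption
        topics=['my-topic'],                                      # assumption
        expansion_service='localhost:8097')  # the manually started service
```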



On Tue, Apr 18, 2023 at 5:22 PM Juan Romero  wrote:

> Hi.
>
> I have an issue when I try to run a kafka io pipeline in python on my
> local machine, because in my local machine it is not possible to install
> docker. It seems that Beam tries to use docker to pull and start the Beam
> Java SDK in order to start the expansion service. I tried to start the
> expansion service manually and define the expansion service url in the
> connector properties, but it still keeps asking for a docker process. My
> question is whether we can run a pipeline with external transformations
> without installing docker.
>
> Looking forward to it. Thanks!!
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-18 Thread Robert Bradshaw via user
Yeah, I don't think we have a good per-operator API for this. If we were to
add it, it probably belongs in ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax  wrote:

> Looking at FlinkPipelineOptions, there is a parallelism option you can
> set. I believe this sets the default parallelism for all Flink operators.
>
> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:
>
>> Thanks Holden, this would work for Spark, but Flink doesn't have such
>> kind of mechanism, so I am looking for a general solution on the beam side.
>>
>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
>> wrote:
>>
>>> To a (small) degree Sparks “new” AQE might be able to help depending on
>>> what kind of operations Beam is compiling it down to.
>>>
>>> Have you tried setting spark.sql.adaptive.enabled &
>>> spark.sql.adaptive.coalescePartitions.enabled
>>>
>>>
>>>
>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> I see. Robert - what is the story for parallelism controls on GBK with
>>>> the Spark or Flink runners?
>>>>
>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:
>>>>
>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>
>>>>>
>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>>>>>
>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>> operators.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:
>>>>>>
>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>> user@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>
>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in operator
>>>>>>>>> level. And the input size of the operator is unknown at compiling 
>>>>>>>>> stage if
>>>>>>>>> it is not a source
>>>>>>>>>  operator,
>>>>>>>>>
>>>>>>>>> Here's an example of flink
>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>>> and reduceByKey):
>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> The maximum parallelism is always determined by the parallelism
>>>>>>>>>> of your data. If you do a GroupByKey for example, the number of keys 
>>>>>>>>>> in
>>>>>>>>>> your data determines the maximum parallelism.
>>>>>>>>>>
>>>>>>>>>> Beyond the limitations in your data, it depends on your execution
>>>>>>>>>> engine. If you're using Dataflow, Dataflow is designed to 
>>>>>>>>>> automatically
>>>>>>>>>> determine the parallelism (e.g. work will be dynamically split and 
>>>>>>>>>> moved
>>>>>>>>>> around between workers, the number of workers will autoscale, etc.), 
>>>>>>>>>> so
>>>>>>>>>> there's no need to explicitly set the parallelism of the execution.
>>>>>>>>>>
>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Besides the global parallelism of beam job, is there any way to
>>>>>>>>>>> set parallelism for individual operators like group by and join? I
>>>>>>>>>>> understand the parallelism setting depends on the underlying 
>>>>>>>>>>> execution
>>>>>>>>>>> engine, but it is very common to set parallelism like group by and 
>>>>>>>>>>> join in
>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Jeff Zhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>> Books (Learning Spark, High Performance Spark, etc.):
>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-15 Thread Robert Bradshaw via user
What are you trying to achieve by setting the parallelism?

On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:

> Thanks Reuven, what I mean is to set the parallelism in operator level.
> And the input size of the operator is unknown at compiling stage if it is
> not a source
>  operator,
>
> Here's an example of flink
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
> Spark also support to set operator level parallelism (see groupByKey and
> reduceByKey):
> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>
>
> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user 
> wrote:
>
>> The maximum parallelism is always determined by the parallelism of your
>> data. If you do a GroupByKey for example, the number of keys in your data
>> determines the maximum parallelism.
>>
>> Beyond the limitations in your data, it depends on your execution engine.
>> If you're using Dataflow, Dataflow is designed to automatically determine
>> the parallelism (e.g. work will be dynamically split and moved around
>> between workers, the number of workers will autoscale, etc.), so there's no
>> need to explicitly set the parallelism of the execution.
>>
>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:
>>
>>> Besides the global parallelism of beam job, is there any way to set
>>> parallelism for individual operators like group by and join? I
>>> understand the parallelism setting depends on the underlying execution
>>> engine, but it is very common to set parallelism like group by and join in
>>> both spark & flink.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Message guarantees

2023-04-14 Thread Robert Bradshaw via user
That is correct.

On Tue, Apr 11, 2023 at 5:44 AM Hans Hartmann  wrote:

>
> Hello,
>
> i'm wondering if Apache Beam is using the message guarantees of the
> execution engines, that the pipeline is running on.
>
> So if i use the SparkRunner the consistency guarantees are exactly-once?
>
> Have a good day and thanks,
>
> Hans Hartmann
>
>


Re: Why is FlatMap different from composing Flatten and Map?

2023-03-15 Thread Robert Bradshaw via user
On Mon, Mar 13, 2023 at 11:33 AM Godefroy Clair 
wrote:

> Hi,
> I am wondering about the way `Flatten()` and `FlatMap()` are implemented
> in Apache Beam Python.
> In most functional languages, FlatMap() is the same as composing
> `Flatten()` and `Map()` as indicated by the name, so Flatten() and
> Flatmap() have the same input.
> But in Apache Beam, Flatten() is using _iterable of PCollections_ while
> FlatMap() is working with _PCollection of Iterables_.
>
> If I am not wrong, the signature of Flatten, Map and FlatMap are :
> ```
> Flatten:: Iterable[PCollections[A]] -> PCollection[A]
> Map:: (PCollection[A], (A-> B)) -> PCollection[B]
> FlatMap:: (PCollection[Iterable[A]], (A->B)) -> [A]
>

FlatMap is actually (PCollection[A], (A->Iterable[B])) -> PCollection[B].


> ```
>
> So my question is is there another "Flatten-like" function  with this
> signature :
> ```
> anotherFlatten:: PCollection[Iterable[A]] -> PCollection[A]
> ```
>
> One of the reason this would be useful, is that when you just want to
> "flatten" a `PCollection` of `iterable` you have to use `FlatMap()`with an
> identity function.
>
> So instead of writing:
> `FlatMap(lambda e: e)`
> I would like to use a function
> `anotherFlatten()`
>

As Reuven mentions, Beam's Flatten could have been called Union, in which
case we'd free up the name Flatten for the PCollection[Iterable[A]] ->
PCollection[A] operation. It's Flatten for historical reasons, and would be
difficult to change now.

FlumeJava uses static constructors to provide Flatten.Iterables:
PCollection[Iterable[A]] -> PCollection[A] vs.  Flatten.PCollections:
Iterable[PCollection[A]] -> PCollection[A].

If you want a FlattenIterables in Python, you could easily implement it as
a composite transform [2] whose implementation is passing the identity
function to FlatMap.

[1]
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Flatten.html
[2]
https://beam.apache.org/documentation/programming-guide/#composite-transforms
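
For example, such a composite could look like this (the name
FlattenIterables is illustrative, not part of the Beam Python API):

```python
import apache_beam as beam


class FlattenIterables(beam.PTransform):
    """PCollection[Iterable[A]] -> PCollection[A]."""

    def expand(self, pcoll):
        # Passing the identity function to FlatMap flattens one level.
        return pcoll | beam.FlatMap(lambda iterable: iterable)


# Usage sketch: [[1, 2], [3]] becomes 1, 2, 3.
# flattened = nested | FlattenIterables()
```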


Re: Deduplicate usage

2023-03-02 Thread Robert Bradshaw via user
Whenever state is used, the runner will arrange such that the same
keys will all go to the same worker, which often involves injecting a
shuffle-like operation if the keys are spread out among many workers
in the input. (An alternative implementation could involve storing the
state in a distributed transactional store with the appropriate
locks.) There is no need for you to do anything before calling the
Deduplicate transform.
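
So usage can be as simple as the following sketch (Python SDK names; the
10-minute deduplication window is illustrative, and the parameter names
should be checked against your Beam version):

```python
import apache_beam as beam
from apache_beam.transforms.deduplicate import DeduplicatePerKey
from apache_beam.utils.timestamp import Duration

deduped = (
    keyed_events  # an existing PCollection of (key, value) pairs
    | DeduplicatePerKey(processing_time_duration=Duration(seconds=10 * 60)))
```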

On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van  wrote:
>
> Thanks Reuven,
>
> I got the idea of the state is per key and keys are distributed across 
> workers but I am trying to understand where/how the distribution part is 
> implemented so that elements with the same keys will go to the same worker. 
> Do I need to do this before calling `Deduplicate` transform? If not then 
> where is it being implemented?
>
> Thanks
> -Binh
>
>
> On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user  
> wrote:
>>
>> State is per-key, and keys are distributed across workers. Two workers 
>> should not be working on the same state.
>>
>> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van  wrote:
>>>
>>> Thank you Ankur,
>>>
>>> This is the current source code of Deduplicate transform.
>>>
>>>   Boolean seen = seenState.read();
>>>   // Seen state is either set or not set so if it has been set then it 
>>> must be true.
>>>   if (seen == null) {
>>> // We don't want the expiry timer to hold up watermarks.
>>> expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>> seenState.write(true);
>>> receiver.output(element);
>>>   }
>>>
>>> Could you please explain the synchronization for the following scenario?
>>>
>>> There are two workers.
>>> Both workers read the same state at the same time and the state was not set 
>>> yet. In this case, both will get null in the response (I believe)
>>> Both of them will try to set the state and send the output out.
>>>
>>> What will happen in this scenario?
>>>
>>> Thank you
>>> -Binh
>>>
>>>
>>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka  wrote:

 Hi Binh, The Deduplicate transform uses state api to do the de-duplication 
 which should do the needful operations to work across multiple concurrent 
 workers.

 Thanks,
 Ankur

 On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>
> Hi,
>
> I am writing a pipeline and want to apply deduplication. I look at 
> Deduplicate transform that Beam provides and wonder about its usage. Do I 
> need to shuffle input collection by key before calling this 
> transformation? I look at its source code and it doesn’t do any shuffle 
> so wonder how it works when let’s say there are duplicates and the 
> duplicated elements are processed concurrently on multiple workers.
>
> Thank you
> -Binh


Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-07 Thread Robert Bradshaw via user
Seems reasonable to me.

On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user  wrote:
>
> As per [1], the JDK8 and JDK11 containers that Apache Beam uses have stopped 
> being built and supported since July 2022. I have filed [2] to track the 
> resolution of this issue.
>
> Based upon [1], almost everyone is swapping to the eclipse-temurin 
> container[3] as their base based upon the linked issues from the deprecation 
> notice[1]. The eclipse-temurin container is released under these licenses:
> Apache License, Version 2.0
> Eclipse Distribution License 1.0 (BSD)
> Eclipse Public License 2.0
> 一 (Secondary) GNU General Public License, version 2 with OpenJDK Assembly 
> Exception
> 一 (Secondary) GNU General Public License, version 2 with the GNU Classpath 
> Exception
>
> I propose that we swap all our containers to the eclipse-temurin 
> containers[3].
>
> Open to other ideas and also would be great to hear about your experience in 
> any other projects that you have had to make a similar decision.
>
> 1: https://github.com/docker-library/openjdk/issues/505
> 2: https://github.com/apache/beam/issues/25371
> 3: https://hub.docker.com/_/eclipse-temurin


Re: How to submit beam python pipeline to GKE flink cluster

2023-02-03 Thread Robert Bradshaw via user
You should be able to omit the environment_type and environment_config
variables and they will be populated automatically. For running
locally, the flink_master parameter is not needed either (one will be
started up automatically).
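
In other words, something like the following should be enough; the
jobmanager address is an assumption about how the GKE Flink service is
exposed, and dropping flink_master entirely runs against a local cluster.

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.14",
    "--flink_master=flink-jobmanager.flink.svc.cluster.local:8081",  # assumption
])
```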

On Fri, Feb 3, 2023 at 12:51 PM Talat Uyarer via user
 wrote:
>
> Hi,
>
> Do you use Flink operator or manually deployed session cluster ?
>
> Thanks
>
> On Fri, Feb 3, 2023, 4:32 AM P Singh  wrote:
>>
>> Hi Team,
>>
>> I have set up a flink cluster on GKE and am trying to submit a beam pipeline 
>> with below options. I was able to run this on a local machine but I don't 
>> understand what would be the environment_config? What should I do? what to 
>> put here instead of localhost:5
>>
>> Please help.
>> options = PipelineOptions([
>> "--runner=FlinkRunner",
>> "--flink_version=1.14",
>> "--flink_master=localhost:8081",
>> "--environment_type=EXTERNAL", #EXTERNAL
>> "--environment_config=localhost:5",
>> ])


Re: Dataflow and mounting large data sets

2023-01-30 Thread Robert Bradshaw via user
I'm also not sure it's part of the contract that the containerization
technology we use will always have these capabilities.

On Mon, Jan 30, 2023 at 10:53 AM Chad Dombrova  wrote:
>
> Hi Valentyn,
>
>>
>> Beam SDK docker containers on Dataflow VMs are currently launched in 
>> privileged mode.
>
>
> Does this only apply to stock sdk containers?  I'm asking because we use a 
> custom sdk container that we build.  We've tried various ways of running 
> mount from within our custom beam container in Dataflow and we could not get 
> it to work, while the same thing succeeds in local tests and in our CI 
> (gitlab).  The assessment at the time (this was maybe a year ago) was that 
> the container was not running in privileged mode, but if you think that's 
> incorrect we can revisit this and report back with some error logs.
>
> -chad
>


Re: Dataflow and mounting large data sets

2023-01-30 Thread Robert Bradshaw via user
Different idea: is it possible to serve this data via another protocol
(e.g. sftp) rather than requiring a mount?

On Mon, Jan 30, 2023 at 9:26 AM Chad Dombrova  wrote:
>
> Hi Robert,
> I know very little about the FileSystem classes, but I don’t think it’s 
> possible for a process running in docker to create an NFS mount without 
> running in privileged [1] mode, which cannot be done with Dataflow. The other 
> ways of gaining access to a mount are:
>
> A. the node running docker has the NFS mount itself and passes it along using 
> docker run --volume.
> B. the mount is created within the container by using docker run --mount.
>
> Neither of these are possible with Dataflow.
>
> Here’s a full example of how an NFS mount can be created when running docker:
>
> docker run -it --network=host \
>--mount 
> 'type=volume,src=pipe-nfs-test,dst=/Volumes/pipe-nfs-test,volume-driver=local,volume-opt=type=nfs,volume-opt=device=:/pipe,"volume-opt=o=addr=turbohal.luma.mel,vers=3"'
>   \
>luma/pipe-shell -- bash
>
> In my ideal world, I would make a PR to add support for the docker --mount 
> flag to Beam for the runners that I can control, and the Dataflow team would 
> add support on their end.
>
> Let me know if I'm missing anything.
>
> https://docs.docker.com/engine/reference/run/#runtime-privilege-and-linux-capabilities
>
> thanks,
> -chad


Re: Dataflow and mounting large data sets

2023-01-30 Thread Robert Bradshaw via user
If it's your input/output data, presumably you could implement a
https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/FileSystem.html
for nfs. (I don't know what all that would entail...)

On Mon, Jan 30, 2023 at 9:04 AM Chad Dombrova  wrote:
>
> Hi Israel,
> Thanks for responding.
>
>> And could not the dataset be accessed from Cloud Storage? Does it need to be 
>> specifically NFS?
>
>
> No unfortunately it can't be accessed from Cloud Storage.   Our data resides 
> on high performance Isilon [1] servers using a posix filesystem, and NFS is 
> the tried and true protocol for this.  This configuration cannot be changed 
> for a multitude of reasons, not least of which is that fact that these 
> servers outperform cloud storage at a fraction of the cost of cloud offerings 
> (which is a very big difference for multiple petabytes of storage.  If you'd 
> like more details on why this is not possible I'm happy to explain, but for 
> now let's just say that it's been investigated and it's not practical).  The 
> use of fast posix filers over NFS is fairly ubiquitous in the media and 
> entertainment industry (if you want to know more about how we use Beam, I 
> gave a talk at the Beam Summit a few years ago[2]).
>
> thanks!
> -chad
>
> [1] https://www.dell.com/en-hk/dt/solutions/media-entertainment.htm
> [2] https://www.youtube.com/watch?v=gvbQI3I03a8&t=644s&ab_channel=ApacheBeam
>


Re: [Python] Heterogeneous TaggedOutput Type Hints

2021-09-09 Thread Robert Bradshaw
Huh, that's strange. Yes, the exact error on the service would be helpful.

On Wed, Sep 8, 2021 at 10:12 AM Evan Galpin  wrote:
>
> Thanks for the response. I've created a gist here to demonstrate a minimal 
> repro: https://gist.github.com/egalpin/2d6ad2210cf9f66108ff48a9c7566ebc
>
> It seemed to run fine both on DirectRunner and PortableRunner (embed mode), 
> but Dataflow v2 runner raised an error at runtime seemingly associated with 
> the Shuffle service?  I have job IDs and trace links if those are helpful as 
> well.
>
> Thanks,
> Evan
>
> On Tue, Sep 7, 2021 at 4:35 PM Robert Bradshaw  wrote:
>>
>> This is not yet supported. Using a union for now is the way to go. (If
>> only the last value of the union was used, that sounds like a bug. Do
>> you have a minimal repro?)
>>
>> On Tue, Sep 7, 2021 at 1:23 PM Evan Galpin  wrote:
>> >
>> > Hi all,
>> >
>> > What is the recommended way to write type hints for a tagged output DoFn 
>> > where the outputs to different tags have different types?
>> >
>> > I tried using a Union to describe each of the possible output types, but 
>> > that resulted in mismatched coder errors where only the last entry in the 
>> > Union was used as the assumed type.  Is there a way to associate a type 
>> > hint to a tag or something like that?
>> >
>> > Thanks,
>> > Evan


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
Awesome, thanks!

On Mon, Jun 14, 2021 at 5:36 PM Evan Galpin  wrote:
>
> I’ll try to create something as small as possible from the pipeline I 
> mentioned  I should have time this week to do so.
>
> Thanks,
> Evan
>
> On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw  wrote:
>>
>> Is it possible to post the code? (Or the code of a similar, but
>> minimal, pipeline that exhibits the same issues?)
>>
>> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin  wrote:
>> >
>> > @robert I have a pipeline which consistently shows a major slowdown (10 
>> > seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be 
>> > boiled down to:
>> >
>> > - Read GCS file patterns from PubSub
>> > - Window into Fixed windows (repeating every 15 seconds)
>> > - Deduplicate/distinct (have tried both)
>> > - Read GCS blobs via patterns from the first step
>> > - Write file contents to sink
>> >
>> > It doesn't seem to matter if there are 0 messages in a subscription or 50k 
>> > messages at startup. The rate of new messages however is very low. Not 
>> > sure if those are helpful details, let me know if there's anything else 
>> > specific which would help.
>> >
>> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw  
>> > wrote:
>> >>
>> >> +1, we'd really like to get to the bottom of this, so clear
>> >> instructions on a pipeline/conditions that can reproduce it would be
>> >> great.
>> >>
>> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > you are probably hitting a not-yet discovered bug in SDF implementation 
>> >> > in FlinkRunner that (under some currently unknown conditions) seems to 
>> >> > stop advancing the watermark. This has been observed in one other 
>> >> > instance (that I'm aware of). I think we don't yet have a tracking JIRA 
>> >> > for that, would you mind filling it? It would be awesome if you could 
>> >> > include estimations of messages per sec throughput that causes the 
>> >> > issue in your case.
>> >> >
>> >> > +Tobias Kaymak
>> >> >
>> >> > Tobias, could you please confirm that the case you had with Flink 
>> >> > stopping progressing watermark resembled this one?
>> >> >
>> >> > Thanks.
>> >> >
>> >> >  Jan
>> >> >
>> >> > On 6/14/21 4:11 PM, Eddy G wrote:
>> >> >
>> >> > Hi Jan,
>> >> >
>> >> > I've added --experiments=use_deprecated_read and it seems to work 
>> >> > flawlessly (with my current Window and the one proposed by Evan).
>> >> >
>> >> > Why is this? Do Splittable DoFn now break current implementations? Are 
>> >> > there any posts of possible breaking changes?
>> >> >
>> >> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > answers inline.
>> >> >
>> >> > On 6/14/21 3:05 PM, Eddy G wrote:
>> >> >
>> >> > Hi Jan,
>> >> >
>> >> > Thanks for replying so fast!
>> >> >
>> >> > Regarding your questions,
>> >> >
>> >> > - "Does your data get buffered in a state?"
>> >> > Yes, I do have a state within a stage prior ParquetIO writing together 
>> >> > with a Timer with PROCESSING_TIME.
>> >> >
>> >> > The stage which contains the state does send bytes to the next one 
>> >> > which is the ParquetIO writing. Seems the @OnTimer doesn't get 
>> >> > triggered and it's not clearing the state. This however does work under 
>> >> > normal circumstances without having too much data queued waiting to be 
>> >> > processed.
>> >> >
>> >> > OK, this suggests, that the watermark is for some reason "stuck". If you
>> >> > checkpoints enabled, you should see the size of the checkpoint to grow
>> >> > over time.
>> >> >
>> >> > - "Do you see watermark being updated in your Flink WebUI?"
>> >> > The stages that do have a watermark don't get updated. The same 
>> >> > watermark value has been constant

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
Is it possible to post the code? (Or the code of a similar, but
minimal, pipeline that exhibits the same issues?)

On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin  wrote:
>
> @robert I have a pipeline which consistently shows a major slowdown (10 
> seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be 
> boiled down to:
>
> - Read GCS file patterns from PubSub
> - Window into Fixed windows (repeating every 15 seconds)
> - Deduplicate/distinct (have tried both)
> - Read GCS blobs via patterns from the first step
> - Write file contents to sink
>
> It doesn't seem to matter if there are 0 messages in a subscription or 50k 
> messages at startup. The rate of new messages however is very low. Not sure 
> if those are helpful details, let me know if there's anything else specific 
> which would help.
>
> On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw  wrote:
>>
>> +1, we'd really like to get to the bottom of this, so clear
>> instructions on a pipeline/conditions that can reproduce it would be
>> great.
>>
>> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
>> >
>> > Hi Eddy,
>> >
>> > you are probably hitting a not-yet discovered bug in SDF implementation in 
>> > FlinkRunner that (under some currently unknown conditions) seems to stop 
>> > advancing the watermark. This has been observed in one other instance 
>> > (that I'm aware of). I think we don't yet have a tracking JIRA for that, 
>> > would you mind filling it? It would be awesome if you could include 
>> > estimations of messages per sec throughput that causes the issue in your 
>> > case.
>> >
>> > +Tobias Kaymak
>> >
>> > Tobias, could you please confirm that the case you had with Flink stopping 
>> > progressing watermark resembled this one?
>> >
>> > Thanks.
>> >
>> >  Jan
>> >
>> > On 6/14/21 4:11 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > I've added --experiments=use_deprecated_read and it seems to work 
>> > flawlessly (with my current Window and the one proposed by Evan).
>> >
>> > Why is this? Do Splittable DoFn now break current implementations? Are 
>> > there any posts of possible breaking changes?
>> >
>> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
>> >
>> > Hi Eddy,
>> >
>> > answers inline.
>> >
>> > On 6/14/21 3:05 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > Thanks for replying so fast!
>> >
>> > Regarding your questions,
>> >
>> > - "Does your data get buffered in a state?"
>> > Yes, I do have a state within a stage prior ParquetIO writing together 
>> > with a Timer with PROCESSING_TIME.
>> >
>> > The stage which contains the state does send bytes to the next one which 
>> > is the ParquetIO writing. Seems the @OnTimer doesn't get triggered and 
>> > it's not clearing the state. This however does work under normal 
>> > circumstances without having too much data queued waiting to be processed.
>> >
>> > OK, this suggests, that the watermark is for some reason "stuck". If you
>> > checkpoints enabled, you should see the size of the checkpoint to grow
>> > over time.
>> >
>> > - "Do you see watermark being updated in your Flink WebUI?"
>> > The stages that do have a watermark don't get updated. The same watermark 
>> > value has been constant since the pipeline started.
>> >
>> > If no lateness is set, any late data should be admitted right?
>> >
>> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
>> > means that data that arrive after end-of-window will be dropped.
>> >
>> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
>> > neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to 
>> > be a Dataflow specific metric right?
>> >
>> > Should not be Dataflow specific. But if you don't see it, it means it
>> > could be zero. So, we can rule this out.
>> >
>> > We're using KinesisIO for reading messages.
>> >
>> > Kinesis uses UnboundedSource, which is expended to SDF starting from
>> > Beam 2.25.0. The flag should change that as well. Can you try the
>> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
>> > (should not contain Impulse transform at the beginning) and if it solves
>> 

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
+1, we'd really like to get to the bottom of this, so clear
instructions on a pipeline/conditions that can reproduce it would be
great.

On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
>
> Hi Eddy,
>
> you are probably hitting a not-yet discovered bug in SDF implementation in 
> FlinkRunner that (under some currently unknown conditions) seems to stop 
> advancing the watermark. This has been observed in one other instance (that 
> I'm aware of). I think we don't yet have a tracking JIRA for that, would you 
> mind filling it? It would be awesome if you could include estimations of 
> messages per sec throughput that causes the issue in your case.
>
> +Tobias Kaymak
>
> Tobias, could you please confirm that the case you had with Flink stopping 
> progressing watermark resembled this one?
>
> Thanks.
>
>  Jan
>
> On 6/14/21 4:11 PM, Eddy G wrote:
>
> Hi Jan,
>
> I've added --experiments=use_deprecated_read and it seems to work flawlessly 
> (with my current Window and the one proposed by Evan).
>
> Why is this? Do Splittable DoFn now break current implementations? Are there 
> any posts of possible breaking changes?
>
> On 2021/06/14 13:19:39, Jan Lukavský  wrote:
>
> Hi Eddy,
>
> answers inline.
>
> On 6/14/21 3:05 PM, Eddy G wrote:
>
> Hi Jan,
>
> Thanks for replying so fast!
>
> Regarding your questions,
>
> - "Does your data get buffered in a state?"
> Yes, I do have a state within a stage prior ParquetIO writing together with a 
> Timer with PROCESSING_TIME.
>
> The stage which contains the state does send bytes to the next one which is 
> the ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
> clearing the state. This however does work under normal circumstances without 
> having too much data queued waiting to be processed.
>
> OK, this suggests, that the watermark is for some reason "stuck". If you
> checkpoints enabled, you should see the size of the checkpoint to grow
> over time.
>
> - "Do you see watermark being updated in your Flink WebUI?"
> The stages that do have a watermark don't get updated. The same watermark 
> value has been constant since the pipeline started.
>
> If no lateness is set, any late data should be admitted right?
>
> If no lateness is set, it means allowed lateness of Duration.ZERO, which
> means that data that arrive after end-of-window will be dropped.
>
> Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
> neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to be a 
> Dataflow specific metric right?
>
> Should not be Dataflow specific. But if you don't see it, it means it
> could be zero. So, we can rule this out.
>
> We're using KinesisIO for reading messages.
>
> Kinesis uses UnboundedSource, which is expended to SDF starting from
> Beam 2.25.0. The flag should change that as well. Can you try the
> --experiments=use_deprecated_read and see if you Pipeline DAG changes
> (should not contain Impulse transform at the beginning) and if it solves
> your issues?
>
> On 2021/06/14 12:48:58, Jan Lukavský  wrote:
>
> Hi Eddy,
>
> does your data get buffered in a state - e.g. does the size of the state
> grow over time? Do you see watermark being updated in your Flink WebUI?
> When a stateful operation (and GroupByKey is a stateful operation) does
> not output any data, the first place to look at is if watermark
> correctly progresses. If it does not progress, then the input data must
> be buffered in state and the size of the state should grow over time. If
> it progresses, then it might be the case, that the data is too late
> after the watermark (the watermark estimator might need tuning) and the
> data gets dropped (note you don't set any allowed lateness, which
> _might_ cause issues). You could see if your pipeline drops data in
> "droppedDueToLateness" metric. The size of you state would not grow much
> in that situation.
>
> Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> using "--experiments=use_deprecated_read" on command line (which you
> then must pass to PipelineOptionsFactory). There is some suspicion that
> SDF wrapper for Kafka might not work as expected in certain situations
> with Flink.
>
> Please feel free to share any results,
>
> Jan
>
> On 6/14/21 1:39 PM, Eddy G wrote:
>
> As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with 
> late data (intentionally stopped my consumer so data has been accumulating 
> for several days now). Now, with the following Window... I'm using Beam 2.27 
> and Flink 1.12.
>
>   
> Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>
> And several parsing stages after, once it's time to write within the 
> ParquetIO stage...
>
>   FileIO
>   .writeDynamic()
>   .by(...)
>   .via(...)
>   .to(...)
>   

Re: Issues running Kafka streaming pipeline in Python

2021-06-04 Thread Robert Bradshaw
Glad you were able to figure it out.

Maybe it's moot with runner v2 becoming the default, but we really
should give a clearer error in this case.
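
For reference, the flag spelling that matters here, as a sketch with the
other Dataflow options omitted (per the thread below, passing "runner_v2"
instead of "use_runner_v2" did not actually enable Runner v2):

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=DataflowRunner",
    "--streaming",
    "--experiments=use_runner_v2",
])
```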

On Wed, Jun 2, 2021 at 8:16 PM Chamikara Jayalath  wrote:
>
> Great :)
>
> On Wed, Jun 2, 2021 at 8:15 PM Alex Koay  wrote:
>>
>> Finally figured out the issue.
>> Can confirm that the kafka_taxi job is working as expected now.
>> The issue was that I ran the Dataflow job with an invalid experiments flag 
>> (runner_v2 instead of use_runner_v2), and I was getting logging messages (on 
>> 2.29) that said that I was using Runner V2 even though it seems that I 
>> wasn't.
>> Setting the correct flag fixes the issue (and so I get to see the correctly 
>> expanded transforms in the graph).
>> Thanks for your help Cham!
>>
>> Cheers
>> Alex
>>
>> On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath  
>> wrote:
>>>
>>> Can you mention the Job Logs you see in the Dataflow Cloud Console page for 
>>> your job ? Can you also mention the pipeline and configs you used for 
>>> Dataflow (assuming it's different from what's given in the example) ?
>>> Make sure that you used Dataflow Runner v2 (as given in the example).
>>> Are you providing null keys by any chance ? There's a known issue related 
>>> to that (but if you are just running the example, it should generate 
>>> appropriate keys).
>>>
>>> Unfortunately for actually debugging your job, I need a Dataflow customer 
>>> support ticket.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay  wrote:

 CC-ing Chamikara as he got omitted from the reply all I did earlier.

 On Thu, Jun 3, 2021 at 12:43 AM Alex Koay  wrote:
>
> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled 
> upon several threads saying so.
>
> On Dataflow, I've encountered a few different kinds of issues.
> 1. For the kafka_taxi example, the pipeline would start, the PubSub to 
> Kafka would run, but nothing gets read from Kafka (this seems to get 
> expanded as Dataflow shows KafkaIO.Read + Remove Kafka Metadata 
> sub-transforms.
> 2. For the snippet I shared above, I would vary it either with a "log" 
> transform or a direct "write" back to Kafka. Neither seems to work (and 
> the steps don't get expanded unlike the kafka_taxi example). With the 
> "write" step, I believe it didn't get captured in the Dataflow graph a 
> few times.
> 3. No errors appear in both Job Logs and Worker Logs, except for one 
> message emitted from the "log" step if that happens.
>
> All this is happening while I am producing ~4 messages/sec in Kafka. I 
> can verify that Kafka is working normally remotely and all (ran into some 
> issues setting it up).
> I've also tested the KafkaIO.read transform in Java and can confirm that 
> it works as expected.
>
> As an aside, I put together an ExternalTransform for MqttIO which you can 
> find here: 
> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
> I can confirm that it works in batch mode, but given that I couldn't get 
> Kafka to work with Dataflow, I don't have much confidence in getting this 
> to work.
>
> Thanks for your help.
>
> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath  
> wrote:
>>
>> What error did you run into with Dataflow ? Did you observe any errors 
>> in worker logs ?
>> If you follow the steps given in the example here it should work. Make 
>> sure Dataflow workers have access to Kafka bootstrap servers you provide.
>>
>> Portable DirectRunner currently doesn't support streaming mode so you 
>> need to convert your pipeline to a batch pipeline and provide 
>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a 
>> batch source.
>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>
>> Also portable runners (Flink, Spark etc.) have a known issue related to 
>> SDF checkpointing in streaming mode which results in messages not being 
>> pushed to subsequent steps. This is tracked in 
>> https://issues.apache.org/jira/browse/BEAM-11998.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay  wrote:
>>>
>>> /cc @Boyuan Zhang for kafka @Chamikara Jayalath for multi language 
>>> might be able to help.
>>>
>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay  wrote:

 Hi all,

 I have created a simple snippet as such:

 import apache_beam as beam
 from apache_beam.io.kafka import ReadFromKafka
 from apache_beam.options.pipeline_options import PipelineOptions

 import logging
 logging.basicConfig(level=logging.WARNING)

 opts = direct_opts
 with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner", 
 "--streaming"])) as p:

Re: Is there a way (seetings) to limit the number of element per worker machine

2021-06-02 Thread Robert Bradshaw
On Wed, Jun 2, 2021 at 11:18 AM Vincent Marquez
 wrote:
>
> On Wed, Jun 2, 2021 at 11:11 AM Robert Bradshaw  wrote:
>>
>> If you want to control the total number of elements being processed
>> across all workers at a time, you can do this by assigning random keys
>> of the form RandomInteger() % TotalDesiredConcurrency followed by a
>> GroupByKey.
>>
>> If you want to control the number of elements being processed in
>> parallel per VM, you can use the fact that Dataflow assigns one work
>> item per core, so an n1-standard-4 would process 4 elements in
>> parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
>>
>> You could also control this explicitly by using a global (per worker)
>> semaphore in your code. If you do this you may want to proceed your
>> rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
>> distribution. This should be much easier than trying to coordinate
>> multiple parallel pipelines.
>>
>
> Is there a risk here of having an OOM error due to 'build up' of in memory 
> elements from a streaming input?  Or do the runners have some concept of 
> throttling bundles based on progress of stages further down the pipeline?

For streaming pipelines, hundreds of threads (aka work items) are
allocated for each worker, so limiting the number of concurrent items
per worker is harder there.

>> On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
>>  wrote:
>> >
>> > Thanks Robert.
>> > I found the following explanation for the number of threads for 4 cores:
>> > You have 4 CPU sockets, each CPU can have up to 12 cores, and each core
>> > can have two threads. Your max thread count is 4 CPUs x 12 cores x 2
>> > threads per core, so 12 x 4 x 2 = 96.
>> > Can I limit the threads using the pipeline options in some way? 10-20 
>> > elements per worker will work for me.
>> >
>> > My current practice to work around that issue is to limit the number of
>> > elements in each Dataflow pipeline (providing ~10 elements for each
>> > pipeline).
>> > Once I have completed processing around 200 elements = 20 pipelines
>> > (Google does not allow more than 25 Dataflow pipelines per region) with 10
>> > elements each, I launch the next 20 pipelines.
>> >
>> > This is of course missing the benefit of serverless.
>> >
>> > Any idea, how to work around this?
>> >
>> > Best,
>> > Eila
>> >
>> >
>> > On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw  
>> > wrote:
>> >>
>> >> Note that workers generally process one element per thread at a time. The 
>> >> number of threads defaults to the number of cores of the VM that you're 
>> >> using.
>> >>
>> >> On Mon, May 17, 2021 at 10:18 AM Brian Hulette  
>> >> wrote:
>> >>>
>> >>> What type of files are you reading? If they can be split and read by 
>> >>> multiple workers this might be a good candidate for a Splittable DoFn 
>> >>> (SDF).
>> >>>
>> >>> Brian
>> >>>
>> >>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research 
>> >>>  wrote:
>> >>>>
>> >>>> Hi,
>> >>>> I am running out of resources on the worker machines.
>> >>>> The reasons are:
>> >>>> 1. Every pcollection is a reference to a LARGE file that is copied into 
>> >>>> the worker
>> >>>> 2. The worker makes calculations on the copied file using a software 
>> >>>> library that consumes memory / storage / compute resources
>> >>>>
>> >>>> I have changed the workers' CPUs and memory size. At some point, I am 
>> >>>> running out of resources with this method as well
>> >>>> I am looking to limit the number of pCollection / elements that are 
>> >>>> being processed in parallel on each worker at a time.
>> >>>>
>> >>>> Many thanks for any advice,
>> >>>> Best wishes,
>> >>>> --
>> >>>> Eila
>> >>>>
>> >>>> Meetup
>> >
>> >
>> >
>> > --
>> > Eila
>> >
>> > Meetup
>
>
>
> ~Vincent


Re: Is there a way (seetings) to limit the number of element per worker machine

2021-06-02 Thread Robert Bradshaw
If you want to control the total number of elements being processed
across all workers at a time, you can do this by assigning random keys
of the form RandomInteger() % TotalDesiredConcurrency followed by a
GroupByKey.

If you want to control the number of elements being processed in
parallel per VM, you can use the fact that Dataflow assigns one work
item per core, so an n1-standard-4 would process 4 elements in
parallel, an n1-highmem-2 would process 2 elements in parallel, etc.

You could also control this explicitly by using a global (per worker)
semaphore in your code. If you do this you may want to proceed your
rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
distribution. This should be much easier than trying to coordinate
multiple parallel pipelines.
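
For illustration, a minimal Python sketch of the random-key approach described
above; the source, the do_expensive_work() function, and the concurrency
constant are invented placeholders rather than anything from this thread:

import random

import apache_beam as beam

def do_expensive_work(element):
    # Placeholder for the real resource-hungry per-element processing.
    return element

TOTAL_DESIRED_CONCURRENCY = 20  # assumed target: at most this many groups

with beam.Pipeline() as p:
    _ = (
        p
        | 'Create' >> beam.Create(['a', 'b', 'c'])  # stand-in for the real source
        # Assign one of N random keys so the data lands in at most N groups.
        | 'RandomKey' >> beam.Map(
            lambda x: (random.randrange(TOTAL_DESIRED_CONCURRENCY), x))
        | 'GroupByKey' >> beam.GroupByKey()
        # Each group is processed as a unit, bounding global parallelism.
        | 'Process' >> beam.FlatMap(
            lambda kv: [do_expensive_work(e) for e in kv[1]]))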

On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
 wrote:
>
> Thanks Robert.
> I found the following explanation for the number of threads for 4 cores:
> You have 4 CPU sockets, each CPU can have up to 12 cores, and each core can
> have two threads. Your max thread count is 4 CPUs x 12 cores x 2 threads per
> core, so 12 x 4 x 2 = 96.
> Can I limit the threads using the pipeline options in some way? 10-20 
> elements per worker will work for me.
>
> My current practice to work around that issue is to limit the number of
> elements in each Dataflow pipeline (providing ~10 elements for each pipeline).
> Once I have completed processing around 200 elements = 20 pipelines (Google
> does not allow more than 25 Dataflow pipelines per region) with 10 elements
> each, I launch the next 20 pipelines.
>
> This is of course missing the benefit of serverless.
>
> Any idea, how to work around this?
>
> Best,
> Eila
>
>
> On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw  wrote:
>>
>> Note that workers generally process one element per thread at a time. The 
>> number of threads defaults to the number of cores of the VM that you're 
>> using.
>>
>> On Mon, May 17, 2021 at 10:18 AM Brian Hulette  wrote:
>>>
>>> What type of files are you reading? If they can be split and read by 
>>> multiple workers this might be a good candidate for a Splittable DoFn (SDF).
>>>
>>> Brian
>>>
>>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research 
>>>  wrote:
>>>>
>>>> Hi,
>>>> I am running out of resources on the worker machines.
>>>> The reasons are:
>>>> 1. Every pcollection is a reference to a LARGE file that is copied into 
>>>> the worker
>>>> 2. The worker makes calculations on the copied file using a software 
>>>> library that consumes memory / storage / compute resources
>>>>
>>>> I have changed the workers' CPUs and memory size. At some point, I am 
>>>> running out of resources with this method as well
>>>> I am looking to limit the number of pCollection / elements that are being 
>>>> processed in parallel on each worker at a time.
>>>>
>>>> Many thanks for any advice,
>>>> Best wishes,
>>>> --
>>>> Eila
>>>>
>>>> Meetup
>
>
>
> --
> Eila
>
> Meetup


Re: Is there a way (seetings) to limit the number of element per worker machine

2021-05-17 Thread Robert Bradshaw
Note that workers generally process one element per thread at a time. The
number of threads defaults to the number of cores of the VM that you're
using.

On Mon, May 17, 2021 at 10:18 AM Brian Hulette  wrote:

> What type of files are you reading? If they can be split and read by
> multiple workers this might be a good candidate for a Splittable DoFn (SDF).
>
> Brian
>
> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research <
> e...@orielresearch.org> wrote:
>
>> Hi,
>> I am running out of resources on the worker machines.
>> The reasons are:
>> 1. Every pcollection is a reference to a LARGE file that is copied into
>> the worker
>> 2. The worker makes calculations on the copied file using a software
>> library that consumes memory / storage / compute resources
>>
>> I have changed the workers' CPUs and memory size. At some point, I am
>> running out of resources with this method as well
>> I am looking to limit the number of pCollection / elements that are being
>> processed in parallel on each worker at a time.
>>
>> Many thanks for any advice,
>> Best wishes,
>> --
>> Eila
>> 
>> Meetup 
>>
>


Re: [EXT] Re: [EXT] Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-05-13 Thread Robert Bradshaw
Sharding is determined by the distribution of work. Each worker writes to
its own shard, and in the case of dynamic partitioning, etc. workers may
end up processing more than one "bundle" of items and hence produce more
than one shard. See also
https://beam.apache.org/documentation/runtime/model/
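
If explicit control over the number of output files is needed, one option not
mentioned above is to drop down from the dataframe API to the plain
WriteToParquet transform, which accepts a num_shards argument. A hedged sketch;
the schema, records and output path are invented for illustration:

import pyarrow
import apache_beam as beam
from apache_beam.io.parquetio import WriteToParquet

schema = pyarrow.schema([('key1', pyarrow.int64()),
                         ('key2', pyarrow.int64()),
                         ('feature_1', pyarrow.string())])

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'key1': 1, 'key2': 2, 'feature_1': 'x'}])
        | WriteToParquet(
            'gs://my-bucket/output/part',  # assumed path prefix
            schema,
            num_shards=10,                 # fixed number of output files
            file_name_suffix='.parquet'))

Note that fixing num_shards typically adds a reshuffle and can limit write
parallelism, so it is usually best left at the default unless a fixed file
count is a hard requirement.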

On Thu, May 13, 2021 at 3:58 PM Wenbing Bai 
wrote:

> Hi team,
>
> I have another question when using Beam Dataframe IO connector. I tried
> to_parquet, and my data are written to several different files. I am
> wondering how I can control the number of files (shards) or how the
> sharding is done for to_parquet and other Beam Dataframe IO APIs?
>
> Thank you!
> Wenbing
>
> On Tue, May 11, 2021 at 12:20 PM Kenneth Knowles  wrote:
>
>> +dev 
>>
>> In the Beam Java ecosystem, this functionality is provided by the Sorter
>> library (
>> https://beam.apache.org/documentation/sdks/java-extensions/#sorter). I'm
>> curious what people think about various options:
>>
>>  - Python version of the transform(s)
>>  - Expose sorter as xlang transform(s)
>>  - Convenience transforms (that use pandas in DoFns?) to just do it for
>> small data per key to achieve compatibility
>>  - Beam model extension so that runners can do it as part of GBK
>>
>> Kenn
>>
>> On Mon, May 10, 2021 at 5:26 PM Wenbing Bai 
>> wrote:
>>
>>> Hi Robert and Brian,
>>>
>>> I don't know why I didn't catch your replies. But thank you so much for
>>> looking at this.
>>>
>>> My parquet files will be consumed by downstream processes which
>>> require data points with the same "key1" that are sorted by "key2". The
>>> downstream process, for example, will make a rolling window with size N
>>> that reads N records together at one time. But note, the rolling window
>>> will not cross different "key1".
>>>
>>> So that is saying, 1) I don't need to sort the whole dataset. 2) all
>>> data with the same "key1" should be located together.
>>>
>>> I am not sure if I explain the use case clearly. Let me know what you
>>> think.
>>>
>>> Wenbing
>>>
>>>
>>> On Tue, Apr 20, 2021 at 5:01 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> It would also be helpful to understand what your overall objective is
>>>> with this output. Is there a reason you need it sorted/partitioned in a
>>>> certain way?
>>>>
>>>> On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette 
>>>> wrote:
>>>>
>>>>> Hi Wenbing,
>>>>> Sorry for taking so long to get back to you on this.
>>>>> I discussed this with Robert offline and we came up with a potential
>>>>> workaround - you could try writing out the Parquet file from within the
>>>>> groupby.apply method. You can use beam's FileSystems abstraction to open a
>>>>> Python file object referencing a cloud storage file, and pass that file
>>>>> object directly to the pandas to_parquet. It would look something like 
>>>>> this:
>>>>>
>>>>>   df.groupby('key1').apply(lambda df:
>>>>> df.sort_values(by='key2').to_parquet(FileSystems.open("gs://bucket/file.pq")))
>>>>>
>>>>> If writing out sorted, partitioned parquet files is a common use-case
>>>>> we should think about making this easier though. At the very least
>>>>> partition_cols should work, I filed BEAM-12201 [1] for this. That alone
>>>>> won't be enough as our implementation will likely reshuffle the dataset to
>>>>> enforce the partitioning, removing any sorting that you've applied, so 
>>>>> we'd
>>>>> also need to think about how to optimize the pipeline to avoid that 
>>>>> shuffle.
>>>>>
>>>>> Brian
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/BEAM-12201
>>>>>
>>>>> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai 
>>>>> wrote:
>>>>>
>>>>>> Thank you, Brian. I tried `partition_cols`, but it is not working. I
>>>>>> tried pure pandas, it does work, so I am not sure if anything is wrong with
>>>>>> Beam.
>>>>>>
>>>>>> Wenbing
>>>>>>
>>>>>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette 
>>>>>> wrote:
>>>>>>
>>>>>>> Hm, to_par

Re: Question on printing out a PCollection

2021-04-30 Thread Robert Bradshaw
Sorry, no Java versions of this stuff (though it may be possible to
use cross-language to invoke your Java pipeline from Python and get
the benefits that way).

On Fri, Apr 30, 2021 at 11:30 AM Tao Li  wrote:
>
> Thanks @Ning Kang.
>
> @Robert Bradshaw I assume you are referring to 
> https://beam.apache.org/releases/pydoc/2.22.0/apache_beam.runners.interactive.interactive_beam.html.
>  Is there a java version for it?
>
>
>
> On 4/30/21, 11:00 AM, "Robert Bradshaw"  wrote:
>
> You can also use interactive Beam's collect, to get the PCollection as
> a Dataframe, and then print it or do whatever else with it as you
> like.
>
> On Fri, Apr 30, 2021 at 10:24 AM Ning Kang  wrote:
> >
> > Hi Tao,
> >
> > The `show()` API works with any IPython notebook runtimes, including 
> Colab, Jupyter Lab and pre-lab Jupyter Notebooks, as long as you have `%pip 
> install apache-beam[interactive]`.
> >
> > Additionally, the `show_graph()` API needs GraphViz binary installed, 
> details can be found in the README.
> >
> > If you've created an Apache Beam notebook instance on Google Cloud, 
> there is an example notebook "Examples/Visualize_Data.ipynb" demonstrating 
> how to visualize data of PCollections with different libraries:
> >
> > Native Interactive Beam Visualization
> > Pandas DataFrame
> > Matplotlib
> > Seaborn
> > Bokeh
> > D3.js
> >
> > Hope this helps!
> >
> > Ning
> >
> > On Fri, Apr 30, 2021 at 9:24 AM Brian Hulette  
> wrote:
> >>
> >> +Ning Kang +Sam Rohde
> >>
> >> On Thu, Apr 29, 2021 at 6:13 PM Tao Li  wrote:
> >>>
> >>> Hi Beam community,
> >>>
> >>>
> >>>
> >>> The notebook console from Google Cloud defines a show() API to 
> display a PCollection which is very neat: 
> https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development
> >>>
> >>>
> >>>
> >>> If we are using a regular jupyter notebook to run beam app, how can 
> we print out a PCollection easily? What’s the best practice? Thanks!
> >>>
> >>>
>


Re: Question on printing out a PCollection

2021-04-30 Thread Robert Bradshaw
You can also use interactive Beam's collect, to get the PCollection as
a Dataframe, and then print it or do whatever else with it as you
like.
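
A minimal sketch of that collect() flow in a plain Jupyter/IPython session; the
data and step names are invented for illustration:

import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

p = beam.Pipeline(InteractiveRunner())
words = p | 'Create' >> beam.Create(['beam', 'flink', 'beam'])
counts = (words
          | 'Pair' >> beam.Map(lambda w: (w, 1))
          | 'Count' >> beam.CombinePerKey(sum))

ib.watch(locals())       # make local PCollection variables visible to Interactive Beam

df = ib.collect(counts)  # materializes the PCollection as a pandas DataFrame
print(df)

ib.show(counts)          # or render it directly in the notebook output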

On Fri, Apr 30, 2021 at 10:24 AM Ning Kang  wrote:
>
> Hi Tao,
>
> The `show()` API works with any IPython notebook runtimes, including Colab, 
> Jupyter Lab and pre-lab Jupyter Notebooks, as long as you have `%pip install 
> apache-beam[interactive]`.
>
> Additionally, the `show_graph()` API needs GraphViz binary installed, details 
> can be found in the README.
>
> If you've created an Apache Beam notebook instance on Google Cloud, there is 
> an example notebook "Examples/Visualize_Data.ipynb" demonstrating how to 
> visualize data of PCollections with different libraries:
>
> Native Interactive Beam Visualization
> Pandas DataFrame
> Matplotlib
> Seaborn
> Bokeh
> D3.js
>
> Hope this helps!
>
> Ning
>
> On Fri, Apr 30, 2021 at 9:24 AM Brian Hulette  wrote:
>>
>> +Ning Kang +Sam Rohde
>>
>> On Thu, Apr 29, 2021 at 6:13 PM Tao Li  wrote:
>>>
>>> Hi Beam community,
>>>
>>>
>>>
>>> The notebook console from Google Cloud defines a show() API to display a 
>>> PCollection which is very neat: 
>>> https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development
>>>
>>>
>>>
>>> If we are using a regular jupyter notebook to run beam app, how can we 
>>> print out a PCollection easily? What’s the best practice? Thanks!
>>>
>>>


Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-04-20 Thread Robert Bradshaw
It would also be helpful to understand what your overall objective is with
this output. Is there a reason you need it sorted/partitioned in a
certain way?

On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette  wrote:

> Hi Wenbing,
> Sorry for taking so long to get back to you on this.
> I discussed this with Robert offline and we came up with a potential
> workaround - you could try writing out the Parquet file from within the
> groupby.apply method. You can use beam's FileSystems abstraction to open a
> Python file object referencing a cloud storage file, and pass that file
> object directly to the pandas to_parquet. It would look something like this:
>
>   df.groupby('key1').apply(lambda df:
> df.sort_values(by='key2').to_parquet(FileSystems.open("gs://bucket/file.pq")))
>
> If writing out sorted, partitioned parquet files is a common use-case we
> should think about making this easier though. At the very least
> partition_cols should work, I filed BEAM-12201 [1] for this. That alone
> won't be enough as our implementation will likely reshuffle the dataset to
> enforce the partitioning, removing any sorting that you've applied, so we'd
> also need to think about how to optimize the pipeline to avoid that shuffle.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-12201
>
> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai 
> wrote:
>
>> Thank you, Brian. I tried `partition_cols`, but it is not working. I
>> tried pure pandas, it does work, so I am not sure if anything is wrong with
>> Beam.
>>
>> Wenbing
>>
>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette  wrote:
>>
>>> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
>>> through [2]. It would be interesting to see what  `partition_cols='key1'`
>>> does - I suspect it won't work perfectly though.
>>>
>>> Do you have any thoughts here Robert?
>>>
>>> [1]
>>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>>> [2]
>>> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>>>
>>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai 
>>> wrote:
>>>
>>>> Hi Robert and Brian,
>>>>
>>>> I tried groupby in my case. Here is my pipeline code. I do see all the
>>>> data in the final parquet file are sorted in each group. However, I'd like
>>>> to write each partition (group) to an individual file, how can I achieve
>>>> it? In addition, I am using the master of Apache Beam SDK, how can I test
>>>> the pipeline with DataflowRunner considering there is no dataflow worker
>>>> image available?
>>>>
>>>> data = [
>>>> {
>>>> "key1": 1000 + i % 10,
>>>> "key2": randrange(1),
>>>> "feature_1": "somestring{}".format(i)
>>>> } for i in range(1)
>>>> ]
>>>>
>>>> class TestRow(typing.NamedTuple):
>>>> key1: int
>>>> key2: int
>>>> feature_1: str
>>>>
>>>> with beam.Pipeline() as p:
>>>> pcoll = (
>>>> p
>>>> | beam.Create(data)
>>>> | beam.Map(lambda x:x).with_output_types(TestRow)
>>>> )
>>>>
>>>> df = to_dataframe(pcoll)
>>>> sorted_df = df.groupby('key1').apply(lambda df: df.sort_values(by=
>>>> 'key2'))
>>>> sorted_df.to_parquet('test_beam_dataframe{}.parquet'.format(str
>>>> (uuid.uuid4())[:8]), engine='pyarrow', index=False)
>>>>
>>>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai 
>>>> wrote:
>>>>
>>>>> Thank you, Robert and Brian.
>>>>>
>>>>> I'd like to try this out. I am trying to distribute my dataset to
>>>>> nodes, sort each partition by some key and then store each partition to 
>>>>> its
>>>>> own file.
>>>>>
>>>>> Wenbing
>>>>>
>>>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette 
>>>>> wrote:
>>>>>
>>>>>> Note groupby.apply [1] in particular should be able to do what you
>>>>>> want, something like:
>>>>>>
>>>>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>>>
>>>>>> But as Robert noted we don't make any guarantees about preserving
>>>>>> this ordering later in the pipeline. For

Re: Beam Dataframe - sort and grouping

2021-04-02 Thread Robert Bradshaw
Thanks for trying this out.

Better support for groupby (e.g. https://github.com/apache/beam/pull/13843
, https://github.com/apache/beam/pull/13637) will be available in the next
Beam release (2.29, in progress, but you could try out head if you want).
Note, however, that Beam PCollections are by definition unordered, so
unless you sort a partition and immediately do something with it that
ordering may not be preserved. If you could let us know what you're trying
to do with this ordering that would be helpful.

- Robert


On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai 
wrote:

> Hi Beam users,
>
> I have a user case to partition my PCollection by some key, and then sort
> my rows within the same partition by some other key.
>
> I feel Beam Dataframe could be a candidate solution, but I cannot figure
> out how to make it work. Specifically, I tried df.groupby where I expect my
> data will be distributed to different nodes. I also tried df.sort_values,
> but it will sort my whole dataset, which is not what I need.
>
> Can someone shed some light on this?
>
>
>
>
>
> Wenbing Bai
>
> Senior Software Engineer
>
> Data Infrastructure, Cruise
>
> Pronouns: She/Her
>
>
>


Re: [Question] Need to write a pipeline in Go consuming events from Kafka

2021-03-29 Thread Robert Bradshaw
On Wed, Mar 24, 2021 at 4:24 AM Đức Trần Tiến 
wrote:

>
> And the last question: Could I write that pipeline in Java and invoke that
> pipeline from Go? :D
>

That is exactly the story we're trying to pursue for getting the large set
of Java connectors available to Go:

https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines


Cc'ing some folks that can comment on the status.


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
Yeah, the entire input is not always what is needed, and can generally be
achieved via

input -> wait(side input of write) -> do something with the input

Of course one could also do

entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
CombineGlobally(TrivialCombineFn)

to reduce this to a more minimal set with at least one element per Window.

The file writing operations emit the actual files that were written, which
can be handy. My suggestion of PCollection was just so that we can emit
something usable, and decide exactly what is most useful later.
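
In the Java SDK this is what the Wait transform expresses; purely for
illustration, the same idea written directly with a side input in Python (the
"write" here is a stand-in Map, since the real write would come from an IO that
returns a usable PCollection):

import apache_beam as beam

with beam.Pipeline() as p:
    records = p | 'Records' >> beam.Create(['r1', 'r2'])

    # Hypothetical first write whose output PCollection doubles as the signal.
    write_signal = records | 'FirstWrite' >> beam.Map(lambda x: 'wrote:' + x)

    # Consuming the signal as a side input means these elements are only
    # processed once the signal's window is complete, i.e. after the write.
    after = records | 'AfterWrite' >> beam.Map(
        lambda x, _done: x, _done=beam.pvalue.AsIter(write_signal))

    after | 'SecondWrite' >> beam.Map(print)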


On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:

> I believe that the Wait transform turns this output into a side input, so
> outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles  wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn I see a big problem:
>>
>> @Setup
>> public void setup() {
>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>> }
>>
>> @ProcessElement
>>   public void processElement(ProcessContext c) throws
>> ExecutionException, InterruptedException {
>>   writer.mutate(c.element());
>> }
>>
>> @Teardown
>> public void teardown() throws Exception {
>>   writer.close();
>>   writer = null;
>> }
>>
>> It is only in writer.close() that all async writes are waited on. This
>> needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the element as
>> "has been written". And you cannot output from the @TearDown method which
>> is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:
>>
>>> How about a PCollection containing every element which was successfully
>>> written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after its been successfully written
>>> to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:
>>>>
>>>>> +dev
>>>>>
>>>>> Since we all agree that we should return something different than
>>>>> PDone the real question is what should we return.
>>>>>
>>>>
>>>> My proposal is that one returns a PCollection that consists,
>>>> internally, of something contentless like nulls. This is future compatible
>>>> with returning something more meaningful based on the source or write
>>>> process itself, but at least this would be followable.
>>>>
>>>>
>>>>> As a reminder we had a pretty interesting discussion about this
>>>>> already in the past but uniformization of our return values has not
>>>>> happened.
>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>> contribute Write transforms that return.
>>>>>
>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>
>>>>
>>>> Yeah, we should go ahead and finally do something.
>>>>
>>>>
>>>>>
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible.
>>>>>
>>>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>>>> up to the maintainers to judge whether the upgrade to return something
>>>>> different from PDone is worth it; in that case we can deprecate and remove
>>>>> the previous signature in a short time (2 releases was the average for
>>>>> previous cases).
>>>>>
>>>>>
>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>  wrote:
>>>>> >
>>>>> > I thought that was said about returning a PCollection of write
>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>> _additional_ write methods, like “withWrit

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:

> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>

My proposal is that one returns a PCollection that consists, internally,
of something contentless like nulls. This is future compatible
with returning something more meaningful based on the source or write
process itself, but at least this would be followable.


> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
>
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


Yeah, we should go ahead and finally do something.


>
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible.
>
> Periodic reminder: most IOs are still Experimental, so I suppose it is
> up to the maintainers to judge whether the upgrade to return something
> different from PDone is worth it; in that case we can deprecate and remove
> the previous signature in a short time (2 releases was the average for
> previous cases).
>
>
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>  wrote:
> >
> > I thought that was said about returning a PCollection of write results
> as it’s done in other IOs (as I mentioned as examples) that have
> _additional_ write methods, like “withWriteResults()” etc, that return
> PTransform<…, PCollection>.
> > In this case, we keep backwards compatibility and just add new
> funtionality. Though, we need to follow the same pattern for user API and
> maybe even naming for this feature across different IOs (like we have for
> "readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw  wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible. PRs to add non-PDone returning
> variants (probably as another option to the builders) that compose well
> with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it
> was mentioned before, it doesn’t work with PDone, only with PCollection as
> a signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great
> to contribute it back to Beam in the way as it was done for other IOs (for
> example, JdbcIO [1] or BigtableIO [2])
> >>
> >> In general, I think we need to have it for all IOs, at least to use
> with “Wait” because this pattern it's quite often required.
> >>
> >> [1]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
> >> [2]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez 
> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has
> >> been successfully written to a database.  So "record by record" is fine, or even
> "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>>
> >>> Do you want to wait until ALL records are written to Cassandra and then
> >>> write all successfully written records to PubSub, or should it be performed
> >>> "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez 
> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>                            |
> >>>                            -> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items written to Pubsub have
> successfully finished a (quorum) write.
> >>>
> >>> Since CassandraIO.write is a PTransform I can't actually use
> it here so I often roll my own 'writer', but maybe there is a recommended
> way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent
> >>>
> >>>
> >>
> >
>


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
Returning PDone is an anti-pattern that should be avoided, but changing it
now would be backwards incompatible. PRs to add non-PDone returning
variants (probably as another option to the builders) that compose well
with Wait, etc. would be welcome.

On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko 
wrote:

> In this way, I think “Wait” PTransform should work for you but, as it was
> mentioned before, it doesn’t work with PDone, only with PCollection as a
> signal.
>
> Since you already adjusted your own writer for that, it would be great to
> contribute it back to Beam in the way as it was done for other IOs (for
> example, JdbcIO [1] or BigtableIO [2])
>
> In general, I think we need to have it for all IOs, at least to use with
> “Wait” because this pattern it's quite often required.
>
> [1]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
> [2]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>
> On 24 Mar 2021, at 18:01, Vincent Marquez 
> wrote:
>
> No, it only needs to ensure that one record seen on Pubsub has
> been successfully written to a database.  So "record by record" is fine, or even
> "bundle".
>
> *~Vincent*
>
>
> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko 
> wrote:
>
>> Do you want to wait until ALL records are written to Cassandra and then
>> write all successfully written records to PubSub, or should it be performed
>> "record by record"?
>>
>> On 24 Mar 2021, at 04:58, Vincent Marquez 
>> wrote:
>>
>> I have a common use case where my pipeline looks like this:
>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>>
>> I do NOT want my pipeline to look like the following:
>>
>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>                            |
>>                            -> PubsubIO.write
>>
>> Because I need to ensure that only items written to Pubsub have
>> successfully finished a (quorum) write.
>>
>> Since CassandraIO.write is a PTransform I can't actually use it
>> here so I often roll my own 'writer', but maybe there is a recommended way
>> of doing this?
>>
>> Thanks in advance for any help.
>>
>> *~Vincent*
>>
>>
>>
>


Re: [DISCUSS] Drop support for Flink 1.8 and 1.9

2021-03-12 Thread Robert Bradshaw
Do we now support 1.8 through 1.12?

Unless there are specific objections, makes sense to me.

On Fri, Mar 12, 2021 at 8:29 AM Alexey Romanenko 
wrote:

> +1 too but are there any potential objections for this?
>
> On 12 Mar 2021, at 11:21, David Morávek  wrote:
>
> +1
>
> D.
>
> On Thu, Mar 11, 2021 at 8:33 PM Ismaël Mejía  wrote:
>
>> +user
>>
>> > Should we add a warning or something to 2.29.0?
>>
>> Sounds like a good idea.
>>
>>
>>
>>
>> On Thu, Mar 11, 2021 at 7:24 PM Kenneth Knowles  wrote:
>> >
>> > Should we add a warning or something to 2.29.0?
>> >
>> > On Thu, Mar 11, 2021 at 10:19 AM Ismaël Mejía 
>> wrote:
>> >>
>> >> Hello,
>> >>
>> >> We have been supporting older versions of Flink that we had agreed in
>> previous
>> >> discussions where we said we will be supporting only the latest three
>> releases
>> >> [1].
>> >>
>> >> I would like to propose that for Beam 2.30.0 we stop supporting Flink
>> 1.8 and
>> >> 1.9 [2].  I prepared a PR for this [3] but of course I wanted to bring
>> the
>> >> subject here (and to user@) for your attention and in case someone
>> has a
>> >> different opinion or reason to still support the older versions.
>> >>
>> >> WDYT?
>> >>
>> >> Regards,
>> >> Ismael
>> >>
>> >> [1]
>> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
>> >> [2] https://issues.apache.org/jira/browse/BEAM-11948
>> >> [3] https://github.com/apache/beam/pull/14203
>>
>
>


Re: Overwrite support from ParquetIO

2021-01-27 Thread Robert Bradshaw
Fortunately making deleting files idempotent is much easier than writing
them :). But one needs to handle the case of concurrent execution as well
as sequential re-execution due to possible zombie workers.
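
As an illustration of the idempotency point, a minimal Python sketch of a
pre-write cleanup step along the lines Chamikara describes further down the
thread; the glob, step names and DoFn are invented, and the thread itself
concerns the Java SDK, where org.apache.beam.sdk.io.FileSystems plays the same
role:

import apache_beam as beam
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems

class PurgeOutputDir(beam.DoFn):
    """Deletes everything matching a glob; already-missing files are fine."""
    def process(self, glob):
        paths = [m.path
                 for match in FileSystems.match([glob])
                 for m in match.metadata_list]
        try:
            if paths:
                FileSystems.delete(paths)
        except BeamIOError:
            # A concurrent or earlier attempt may have removed some files.
            pass
        yield glob  # pass the glob through as a "cleanup done" signal

with beam.Pipeline() as p:
    cleaned = (p
               | beam.Create(['gs://my-bucket/output/*.parquet'])  # assumed dir
               | beam.ParDo(PurgeOutputDir())
               | beam.Reshuffle())  # keep the delete from fusing with the write
    # ... use `cleaned` (e.g. as a side input) to gate the actual write step.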

On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax  wrote:

> Keep in mind that DoFns might be reexecuted (even if you think they have
> completed successfully). This makes DoFns with side effects such as
> deleting files tricky to write correctly.
>
> On Wed, Jan 27, 2021 at 4:36 PM Tao Li  wrote:
>
>> Thanks @Chamikara Jayalath  I think it’s a good
>> idea to define a DoFn for this deletion operation, or maybe a composite
>> PTransform that does deletion first followed by ParquetIO.Write.
>>
>>
>>
>> *From: *Chamikara Jayalath 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Wednesday, January 27, 2021 at 3:45 PM
>> *To: *user 
>> *Cc: *Alexey Romanenko 
>> *Subject: *Re: Overwrite support from ParquetIO
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Jan 27, 2021 at 12:06 PM Tao Li  wrote:
>>
>> @Alexey Romanenko  thanks for your response.
>> Regarding your questions:
>>
>>
>>
>>1. Yes I can purge this directory (e.g. using s3 client from aws sdk)
>>before using ParquetIO to save files. The caveat is that this deletion
>>operation is not part of the beam pipeline, so it will kick off before the
>>pipeline starts. More ideally, this purge operation could be baked into 
>> the
>>write operation with ParquetIO so we will have the deletion happen right
>>before the file writes.
>>2. Regarding the naming strategy, yes the old files will be
>>overwritten by the new files if they have the same file names. However 
>> this
>>does not always guarantee that all the old files in this directory are
>>wiped out (which is actually my requirement). For example we may change 
>> the
>>shard count (through withNumShards() method) in different pipeline runs 
>> and
>>there could be old files from previous run that won’t get overwritten in
>>the current run.
>>
>>
>>
>> In general, Beam file-based sinks are intended  for writing new files. So
>> I don't think existing file-based sinks (including Parquet) will work out
>> of the box for replacing existing files or for appending to such files.
>>
>> But you should be able to delete existing files separately, for example.
>>
>> (1) As a function that is performed before executing the pipeline.
>>
>> (2) As a function that is performed from a ParDo step that is executed
>> before the ParquetIO.Write step. Also you will have to make sure that the
>> runner does not fuse the ParDo step and the Write step. Usually, this can
>> be done by performing it in a side-input step (to a ParDo that precedes
>> sink) or by adding a GBK/Reshuffle between the two steps.
>>
>>
>>
>> Thanks,
>>
>> Cham
>>
>>
>>
>>
>>
>>
>>1.
>>
>>
>>
>> Please let me know if this makes sense to you. Thanks!
>>
>>
>>
>>
>>
>> *From: *Alexey Romanenko 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Wednesday, January 27, 2021 at 9:10 AM
>> *To: *"user@beam.apache.org" 
>> *Subject: *Re: Overwrite support from ParquetIO
>>
>>
>>
>> What do you mean by “wipe out all existing parquet files before a write
>> operation”? Are these all files that already exist in the same output
>> directory? Can you purge this directory before or just use a new output
>> directory for every pipeline run?
>>
>>
>>
>> To write Parquet files you need to use ParquetIO.sink()
>> with FileIO.write() and I don’t think it will clean up the output directory
>> before write. Though, if there are the name collisions between existing and
>> new output files (it depends on used naming strategy) then I think the old
>> files will be overwritten by new ones.
>>
>>
>>
>>
>>
>>
>>
>> On 25 Jan 2021, at 19:10, Tao Li  wrote:
>>
>>
>>
>> Hi Beam community,
>>
>>
>>
>> Does ParquetIO support an overwrite behavior when saving files? More
>> specifically, I would like to wipe out all existing parquet files before a
>> write operation. Is there a ParquetIO API to support that? Thanks!
>>
>>
>>
>>


Re: Is there an array explode function/transform?

2021-01-14 Thread Robert Bradshaw
I don't see any other reasonable interpretation. (One could use this as an
argument to only support one field at a time, to make the potential
explosion in data size all the more obvious.)

On Thu, Jan 14, 2021 at 11:30 AM Reuven Lax  wrote:

> And the result is essentially a cross product of all the different array
> elements?
>
> On Thu, Jan 14, 2021 at 11:25 AM Robert Bradshaw 
> wrote:
>
>> I think it makes sense to allow specifying more than one, if desired.
>> This is equivalent to just stacking multiple Unnests. (Possibly one could
>> even have a special syntax like "*" for all array fields.)
>>
>> On Thu, Jan 14, 2021 at 10:05 AM Reuven Lax  wrote:
>>
>>> Should Unnest be allowed to specify multiple array fields, or just one?
>>>
>>> On Wed, Jan 13, 2021 at 11:59 PM Manninger, Matyas <
>>> matyas.mannin...@veolia.com> wrote:
>>>
>>>> I would also not unnest arrays nested in arrays just the top-level
>>>> array of the specified fields.
>>>>
>>>> On Wed, 13 Jan 2021 at 20:58, Reuven Lax  wrote:
>>>>
>>>>> Nested fields are not part of standard SQL AFAIK. Beam goes further
>>>>> and supports array of array, etc.
>>>>>
>>>>> On Wed, Jan 13, 2021 at 11:42 AM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>> Just the fields specified, IMO. When in doubt, copy SQL. (and I mean
>>>>>> SQL generally, not just Beam SQL)
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Jan 13, 2021 at 11:17 AM Reuven Lax  wrote:
>>>>>>
>>>>>>> Definitely could be a top-level transform. Should it automatically
>>>>>>> unnest all arrays, or just the fields specified?
>>>>>>>
>>>>>>> We do have to define the semantics for nested arrays as well.
>>>>>>>
>>>>>>> On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw <
>>>>>>> rober...@google.com> wrote:
>>>>>>>
>>>>>>>> Ah, thanks for the clarification. UNNEST does sound like what you
>>>>>>>> want here, and would likely make sense as a top-level relational 
>>>>>>>> transform
>>>>>>>> as well as being supported by SQL.
>>>>>>>>
>>>>>>>> On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:
>>>>>>>>
>>>>>>>>> @Kyle Weaver  sure thing! So the
>>>>>>>>> input/output definition for the Flatten.Iterables
>>>>>>>>> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
>>>>>>>>> is:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Input: PCollection
>>>>>>>>>
>>>>>>>>> Output: PCollection
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The input/output for a explode transform would look like this:
>>>>>>>>>
>>>>>>>>> Input:  PCollection The row schema has a field which is an
>>>>>>>>> array of T
>>>>>>>>>
>>>>>>>>> Output: PCollection The array type field from input schema is
>>>>>>>>> replaced with a new field of type T. The elements from the array type 
>>>>>>>>> field
>>>>>>>>> are flattened into multiple rows in the new table (other fields of 
>>>>>>>>> input
>>>>>>>>> table are just duplicated.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hope this clarification helps!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From: *Kyle Weaver 
>>>>>>>>> *Reply-To: *"user@beam.apache.org" 
>>>>>>>>> *Date: *Tuesday, January 12, 2021 at 4:58 PM
>>>>>>>>> *To: *"user@beam.apache.org" 
>>>>>>>>> *Cc: *Reuven Lax 
>>>>>>>>> *Subject: *Re: Is there an array explode function/

Re: Is there an array explode function/transform?

2021-01-14 Thread Robert Bradshaw
I think it makes sense to allow specifying more than one, if desired. This
is equivalent to just stacking multiple Unnests. (Possibly one could even
have a special syntax like "*" for all array fields.)

On Thu, Jan 14, 2021 at 10:05 AM Reuven Lax  wrote:

> Should Unnest be allowed to specify multiple array fields, or just one?
>
> On Wed, Jan 13, 2021 at 11:59 PM Manninger, Matyas <
> matyas.mannin...@veolia.com> wrote:
>
>> I would also not unnest arrays nested in arrays just the top-level array
>> of the specified fields.
>>
>> On Wed, 13 Jan 2021 at 20:58, Reuven Lax  wrote:
>>
>>> Nested fields are not part of standard SQL AFAIK. Beam goes further and
>>> supports array of array, etc.
>>>
>>> On Wed, Jan 13, 2021 at 11:42 AM Kenneth Knowles 
>>> wrote:
>>>
>>>> Just the fields specified, IMO. When in doubt, copy SQL. (and I mean
>>>> SQL generally, not just Beam SQL)
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jan 13, 2021 at 11:17 AM Reuven Lax  wrote:
>>>>
>>>>> Definitely could be a top-level transform. Should it automatically
>>>>> unnest all arrays, or just the fields specified?
>>>>>
>>>>> We do have to define the semantics for nested arrays as well.
>>>>>
>>>>> On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> Ah, thanks for the clarification. UNNEST does sound like what you
>>>>>> want here, and would likely make sense as a top-level relational 
>>>>>> transform
>>>>>> as well as being supported by SQL.
>>>>>>
>>>>>> On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:
>>>>>>
>>>>>>> @Kyle Weaver  sure thing! So the input/output
>>>>>>> definition for the Flatten.Iterables
>>>>>>> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
>>>>>>> is:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Input: PCollection
>>>>>>>
>>>>>>> Output: PCollection
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The input/output for a explode transform would look like this:
>>>>>>>
>>>>>>> Input:  PCollection The row schema has a field which is an
>>>>>>> array of T
>>>>>>>
>>>>>>> Output: PCollection The array type field from input schema is
>>>>>>> replaced with a new field of type T. The elements from the array type 
>>>>>>> field
>>>>>>> are flattened into multiple rows in the new table (other fields of input
>>>>>>> table are just duplicated.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hope this clarification helps!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From: *Kyle Weaver 
>>>>>>> *Reply-To: *"user@beam.apache.org" 
>>>>>>> *Date: *Tuesday, January 12, 2021 at 4:58 PM
>>>>>>> *To: *"user@beam.apache.org" 
>>>>>>> *Cc: *Reuven Lax 
>>>>>>> *Subject: *Re: Is there an array explode function/transform?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> @Reuven Lax  yes I am aware of that transform,
>>>>>>> but that’s different from the explode operation I was referring to:
>>>>>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> How is it different? It'd help if you could provide the signature
>>>>>>> (input and output PCollection types) of the transform you have in mind.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>>>>>>>
>>>>>>> @Reuven Lax  yes I am aware of that transform,
>>>>>>> but that’s different from the explode operation I was referring to:
>>>>>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From: *Reuven Lax 
>>>>>>> *Reply-To: *"user@beam.apache.org" 
>>>>>>> *Date: *Tuesday, January 12, 2021 at 2:04 PM
>>>>>>> *To: *user 
>>>>>>> *Subject: *Re: Is there an array explode function/transform?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Have you tried Flatten.iterables
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>>>>>>>
>>>>>>> Hi community,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Is there a beam function to explode an array (similarly to spark
>>>>>>> sql’s explode())? I did some research but did not find anything.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> BTW I think we can potentially use FlatMap to implement the explode
>>>>>>> functionality, but a Beam provided function would be very handy.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks a lot!
>>>>>>>
>>>>>>>


Re: Is there an array explode function/transform?

2021-01-13 Thread Robert Bradshaw
Ah, thanks for the clarification. UNNEST does sound like what you want
here, and would likely make sense as a top-level relational transform as
well as being supported by SQL.
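
Until such a transform exists, the FlatMap route Tao mentions further down the
thread is straightforward; a minimal Python sketch with an invented schema
('id' plus an array field 'tags'):

import apache_beam as beam

def explode_tags(row):
    # One output row per element of the 'tags' array; other fields are copied.
    for tag in row.tags:
        yield beam.Row(id=row.id, tags=tag)

with beam.Pipeline() as p:
    rows = p | beam.Create([
        beam.Row(id='a', tags=['x', 'y']),
        beam.Row(id='b', tags=['z']),
    ])
    exploded = rows | 'Explode' >> beam.FlatMap(explode_tags)
    exploded | beam.Map(print)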

On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:

> @Kyle Weaver  sure thing! So the input/output
> definition for the Flatten.Iterables
> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/transforms/Flatten.Iterables.html>
> is:
>
>
>
> Input: PCollection<Iterable<T>>
>
> Output: PCollection<T>
>
>
>
> The input/output for an explode transform would look like this:
>
> Input:  PCollection<Row>, where the row schema has a field which is an array of T
>
> Output: PCollection<Row>, where the array-type field from the input schema is
> replaced with a new field of type T. The elements from the array-type field
> are flattened into multiple rows in the new table (other fields of the input
> table are just duplicated).
>
>
>
> Hope this clarification helps!
>
>
>
> *From: *Kyle Weaver 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, January 12, 2021 at 4:58 PM
> *To: *"user@beam.apache.org" 
> *Cc: *Reuven Lax 
> *Subject: *Re: Is there an array explode function/transform?
>
>
>
> @Reuven Lax  yes I am aware of that transform, but
> that’s different from the explode operation I was referring to:
> https://spark.apache.org/docs/latest/api/sql/index.html#explode
> 
>
>
>
> How is it different? It'd help if you could provide the signature (input
> and output PCollection types) of the transform you have in mind.
>
>
>
> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>
> @Reuven Lax  yes I am aware of that transform, but
> that’s different from the explode operation I was referring to:
> https://spark.apache.org/docs/latest/api/sql/index.html#explode
> 
>
>
>
> *From: *Reuven Lax 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, January 12, 2021 at 2:04 PM
> *To: *user 
> *Subject: *Re: Is there an array explode function/transform?
>
>
>
> Have you tried Flatten.iterables
>
>
>
> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>
> Hi community,
>
>
>
> Is there a beam function to explode an array (similarly to spark sql’s
> explode())? I did some research but did not find anything.
>
>
>
> BTW I think we can potentially use FlatMap to implement the explode
> functionality, but a Beam provided function would be very handy.
>
>
>
> Thanks a lot!
>
>


Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-21 Thread Robert Bradshaw
I agree. Borrowing the mutation detection from the direct runner as an
intermediate point sounds like a good idea.

On Mon, Dec 21, 2020 at 8:57 AM Kenneth Knowles  wrote:

> I really think we should make a plan to make this the default. If you test
> with the DirectRunner it will do mutation checking and catch pipelines that
> depend on the runner cloning every element. (also the DirectRunner doesn't
> clone). Since the cloning is similar in cost to the mutation detection,
> could we actually add some mutation detection to FlinkRunner pipelines and
> also directly warn if a pipeline is depending on it?
>
> Kenn
>
> On Mon, Dec 21, 2020 at 5:04 AM Teodor Spæren 
> wrote:
>
>> Hey! My option is not default as of now, since it can break pipelines
>> which rely on the faulty flink implementation. I'm creating my own
>> benchmarks locally and will run against those, but the idea of adding it
>> to the official benchmark runs sounds interesting, thanks for bringing
>> it up!
>>
>> Teodor
>>
>> On Tue, Dec 15, 2020 at 06:51:38PM -0800, Ahmet Altay wrote:
>> >Hi Teodor,
>> >
>> >Thank you for working on this. If I remember correctly, there were some
>> >opportunities to improve in the previous paper (e.g. not focusing on
>> >deprecated runners, long-running benchmarks, varying data sizes). And I
>> am
>> >excited that you are keeping the community as part of your research
>> process
>> >and we will be happy to help you where we can.
>> >
>> >Related to your question. Was the new option used by default? If that
>> >is the case you will probably see its impact on the metrics dashboard
>> [1].
>> >And if it is not on by default, you can add your variant as a new
>> benchmark
>> >and compare the difference across many runs in a controlled benchmarking
>> >environment. Would that help?
>> >
>> >Ahmet
>> >
>> >[1] http://metrics.beam.apache.org/d/1/getting-started?orgId=1
>> >
>> >
>> >On Tue, Dec 15, 2020 at 5:48 AM Teodor Spæren > >
>> >wrote:
>> >
>> >> Hey!
>> >>
>> >> Yeah, that paper was what prompted my master thesis! I definitivly will
>> >> post here, once I get more data :)
>> >>
>> >> Teodor
>> >>
>> >> On Mon, Dec 14, 2020 at 06:56:30AM -0600, Rion Williams wrote:
>> >> >Hi Teodor,
>> >> >
>> >> >Although I’m sure you’ve come across it, this might have some valuable
>> >> resources or methodologies to consider as you explore this a bit more:
>> >> >
>> >> >https://arxiv.org/pdf/1907.08302.pdf
>> >> >
>> >> >I’m looking forward to reading about your finding, especially using a
>> >> more recent iteration of Beam!
>> >> >
>> >> >Rion
>> >> >
>> >> >> On Dec 14, 2020, at 6:37 AM, Teodor Spæren <
>> teodor_spae...@riseup.net>
>> >> wrote:
>> >> >>
>> >> >> Just bumping this so people see it now that 2.26.0 is out :)
>> >> >>
>> >> >>> On Wed, Nov 25, 2020 at 11:09:52AM +0100, Teodor Spæren wrote:
>> >> >>> Hey!
>> >> >>>
>> >> >>> My name is Teodor Spæren and I'm writing a master thesis
>> investigating
>> >> the performance overhead of using Beam instead of using the underlying
>> >> systems directly. My focus has been on Flink and I've made a discovery
>> >> about some unnecessary copying between operators in the Flink
>> runner[1][2].
>> >> I wrote a fix for this and it got accepted and merged,
>> >> >>> and will be in the upcoming 2.26.0 release[3].
>> >> >>>
>> >> >>> I'm writing this email to ask if anyone on these mailing lists
>> would
>> >> be willing to send me some result of applying this option when the new
>> >> version of beam releases. Anything will be very much appreciated,
>> stories,
>> >> screenshots of performance monitoring before and after, hard numbers,
>> >> anything! If you include the cluster size and the workload that would
>> be
>> >> awesome too! My master thesis is set to be complete the coming summer,
>> so
>> >> there is no real hurry :)
>> >> >>>
>> >> >>> The thesis will be freely accessible[4] and I hope that these
>> findings
>> >> will be of help to the beam community. If anyone wishes to submit
>> stories,
>> >> but remain anonymous that is also ok :)
>> >> >>>
>> >> >>> The best way to contact me would be to send an email my way here,
>> or
>> >> on teod...@mail.uio.no.
>> >> >>>
>> >> >>> Any help is appreciated, thanks for your attention!
>> >> >>>
>> >> >>> Best regards,
>> >> >>> Teodor Spæren
>> >> >>>
>> >> >>>
>> >> >>> [1]:
>> >>
>> https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E
>> >> >>> [2]: https://issues.apache.org/jira/browse/BEAM-11146
>> >> >>> [3]: https://github.com/apache/beam/pull/13240
>> >> >>> [4]: https://www.duo.uio.no/
>> >>
>>
>


Re: is apache beam go sdk supported by spark runner?

2020-11-25 Thread Robert Bradshaw
Yes, it should be for batch (just like for Python). There is ongoing
work to make it work for Streaming as well.

On Sat, Nov 21, 2020 at 2:57 PM Meriem Sara  wrote:
>
> Hello everyone. I am trying to use apache beam with Golang to execute a data 
> processing workflow using apache Spark. However, I am unsure whether the go SDK 
> is supported by apache Spark. Could you please provide us with more 
> information?
>
> Thank you


Re: Support for Flink 1.11

2020-10-16 Thread Robert Bradshaw
Support for Flink 1.11 is
https://issues.apache.org/jira/browse/BEAM-10612 . It has been
implemented and will be included in the next release (Beam 2.25). In
the meantime, you could try building yourself from head.

On Fri, Oct 16, 2020 at 4:39 AM Kishor Joshi  wrote:
>
> Hi Team,
>
> Beam has released version 2.24, but it seems the highest Flink version 
> supported in Beam 2.24 (>= 2.21) is still 1.10.
> So I would like to know the plan for releasing Beam with support for the 
> latest Flink, i.e. 1.11.x.
>
> Thanks & Regards,
> Kishor


Re: Upload third party runtime dependencies for expanding transform like KafkaIO.Read in Python Portable Runner

2020-10-02 Thread Robert Bradshaw
If you make sure that these extra jars are in your path when you
execute your pipeline, they should get picked up when invoking the
expansion service (though this may not be the case long term).

The cleanest way would be to provide your own expansion service. If
you build a jar that consists of Beam's IO expansion service plus any
necessary dependencies, you should be able to do

ReadFromKafka(
    [ordinary params],
    expansion_service=BeamJarExpansionService('path/to/your/jar'))

to use this "custom" expansion service. See
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py
An alternative is to pass a pipeline option

--beam_services={"sdks:java:io:expansion-service:shadowJar":
"path/to/your/jar"}

which will override the default. (You can pass "host:port" rather than
a path as well if you manually start the expansion service.)
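For example, a rough (untested) sketch of the manually-started variant. The
port, topic, and bootstrap servers are placeholders, and the jar is assumed to
bundle Beam's Kafka IO expansion service plus your extra runtime dependencies:

    # Started separately, assuming the jar exposes Beam's ExpansionService
    # main class (which takes the port as its argument), e.g.:
    #   java -jar your-expansion-service.jar 8097
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as p:
        (p
         | ReadFromKafka(
             consumer_config={'bootstrap.servers': 'localhost:9092'},
             topics=['mytopic'],
             # Address of the expansion service you started yourself.
             expansion_service='localhost:8097')
         | beam.Map(print))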

Exactly how to specify at a top level a set of extra dependencies to
be applied to a particular subset of other-language transforms is
still an open problem. Alternatively we could try to make expansion
services themselves trivially easy to build, customize, and use.

Hopefully that helps.

- Robert




On Fri, Oct 2, 2020 at 5:57 PM Kobe Feng  wrote:
>
> Thanks Robert, yes, our Kafka requires JAAS configuration (sasl.jaas.config) 
> at the client side for security check with the corresponding LoginModule 
> which requires additional classes:
> ==
> Caused by: javax.security.auth.login.LoginException: unable to find 
> LoginModule class: io.${}.kafka.security.iaf.IAFLoginModule
> at 
> javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
> at 
> javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
> at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
> at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
> at java.security.AccessController.doPrivileged(Native Method)
> at 
> javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
> at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> at 
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
> at 
> org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
> at 
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:76)
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
> ... 42 more
>
> at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> at 
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>
> On Fri, Oct 2, 2020 at 5:14 PM Robert Bradshaw  wrote:
>>
>> Could you clarify a bit exactly what you're trying to do? When using 
>> KafkaIO, the provided jar should have all the necessary dependencies to 
>> construct and execute the kafka read/write. Is there some reason you need to 
>> inject additional dependencies into the environment provided by kafka?
>>
>> On Fri, Oct 2, 2020 at 3:20 PM Kobe Feng  wrote:
>>>
>>> Just a follow-up since no one has replied yet.
>>> My understanding is that for any expanded transforms Beam wants the environment 
>>> to be self-described.
>>> So I updated the boot script and Dockerfile for the Java harness environment and 
>>> used --sdk_harness_container_image_overrides in the portable runner, but failed 
>>> to see the updated image loaded (still the default). From glancing at the code, 
>>> I guess only the Dataflow runner supports it, but I believe it's the correct way 
>>> and I just need to dive deeper into the code when I get back to it; then I will 
>>> update this thread too.
>>>
>>> Kobe
>>> On Wed, Sep 30, 2020 at 1:26 PM Kobe Feng  wrote:
>>>>
>>>> Hi everyone,
>>>> Is there any recommended way to upload a third party jar (runtime scope) 
>>>> for an expanded transform like KafkaIO.Read when using the Python portable 
>>>> runner? Thank you!
>>>>
>>>> I tried --experiments=jar_packages=abc.jar,d.jar but only found those 
>>>> artifacts in the Python harness with the provision info, and the Java harness 
>>>> just uses its default environment's dependencies after the transform is 
>>>> expanded by the gRPC expansion-service jar for reading Kafka 
>>>> messages.
>>>>
>>>> I also noticed that the above option will be removed in the future, so I tried 
>>>> --files_to_stage, but that option only exists in the Java SDK pipeline options.
>>>>
>>>> --
>>>> Yours Sincerely
>>>> Kobe Feng
>>>
>>>
>>>
>>> --
>>> Yours Sincerely
>>> Kobe Feng
>
>
>
> --
> Yours Sincerely
> Kobe Feng


Re: [DISCUSS] Deprecation of AWS SDK v2 IO connectors

2020-09-15 Thread Robert Bradshaw
Thanks for clarifying the state of things. +1 to deprecating once we have
parity. If the v2 ones are better, perhaps a blog post would be a good way
to advertise (and document) their existence and advantages too.

On Tue, Sep 15, 2020 at 2:15 PM Ismaël Mejía  wrote:

> The reason why most people are using AWSv1 IOs is probably because they are
> in Beam since 2017 instead of just added in the last year which is the
> case of
> the AWSv2 ones.
>
> Alexey mentions that maintaining both versions is becoming painful, and I
> would like to expand on that: we now have duplicated work for new features.
> For example, someone contributing a small improvement does it in one of the
> two versions, we try to encourage them to do it in both, and general
> confusion and lots of extra work go into keeping them aligned. And for more
> complex IOs like Kinesis this might prove harder in the future.
>
> Notice that the migration path is incremental because users can have both
> Amazon
> SDKs in the same classpath without conflicts. And Alexey's proposal is
> about
> deprecating AWSv1 IOs to reduce the maintenance burden, not removing them
> from
> the codebase. This could help to raise awareness about the AWSv2 IOs so
> users
> migrate and diminish the extra overhead for contributors and maintainers.
>
> One minor comment to the proposal is that if we proceed with this plan we
> should
> deprecate a v1 IO ONLY when we have full feature parity in the v2 version.
> I think we don't have a replacement for AWSv1 S3 IO so that one should not
> be
> deprecated.
>
> On Tue, Sep 15, 2020 at 6:07 PM Robert Bradshaw 
> wrote:
> >
> > The 10x-100x ratio looks like an answer right there about
> (non-)suitability for deprecation. The new question would be *why* people
> are using the v1 APIs. Is it because it was the original, or that it's been
> around longer, or it has more features?
> >
>


Re: Info needed - pmc mailing list

2020-08-25 Thread Robert Bradshaw
Try priv...@beam.apache.org.

On Tue, Aug 25, 2020 at 6:18 AM D, Anup (Nokia - IN/Bangalore)
 wrote:
>
> Hi,
>
>
>
> We would like to know if there is a way to reach out to members of the pmc 
> group.
>
> We tried sending email to p...@beam.apache.org but it got bounced.
>
>
>
> Thanks
>
> Anup


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Robert Bradshaw
As for the question of writing tests in the face of non-determinism,
you should look into TestStream. MyStatefulDoFn still needs to be
updated to not assume an ordering. (This can be done by setting timers
that provide guarantees that (modulo late data) one has seen all data
up to a certain timestamp.)
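
For what it's worth, here is a minimal sketch of that idea using the Python
SDK's TestStream (the Java SDK's TestStream works the same way); the keys,
values, timestamps, and the 10-second window are made up purely for
illustration:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
    from apache_beam.testing.test_stream import TestStream
    from apache_beam.transforms.window import FixedWindows, TimestampedValue

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True

    # Elements and watermark advances are replayed in a deterministic order,
    # so ordering and lateness behavior can be exercised reproducibly.
    stream = (TestStream()
              .add_elements([TimestampedValue(('key', 1), 0),
                             TimestampedValue(('key', 2), 5)])
              .advance_watermark_to(10)   # the first 10s window is now complete
              .add_elements([TimestampedValue(('key', 3), 12)])
              .advance_watermark_to_infinity())

    with beam.Pipeline(options=options) as p:
        (p
         | stream
         | beam.WindowInto(FixedWindows(10))
         | beam.CombinePerKey(sum)
         | beam.Map(print))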

On Mon, Aug 24, 2020 at 8:56 AM Reuven Lax  wrote:
>
> Generally you should not rely on PCollection being ordered, though there have 
> been discussions about adding some time-ordering semantics.
>
>
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>>
>> Current Beam model does not guarantee an ordering after a GBK (i.e. 
>> Combine.perKey() in your). So you cannot expect that the C step sees 
>> elements in a specific order.
>>
>> As I recall on Dataflow runner, there is very limited ordering support. Hi 
>> +Reuven Lax can share your insights about it?
>>
>>
>> -Rui
>>
>>
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:
>>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>>> It roughly looks like below:
>>> p.apply(KafkaIO.read() ...)   // (A-1)
>>>   .apply(WithKeys.of(...).withKeyType(...))
>>>   .apply(Window.into(FixedWindows.of(...)))
>>>   .apply(Combine.perKey(...))  // (B)
>>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in 
>>> (C)
>>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>>> Note that (C) has its own state which is expected to be fetched and updated 
>>> by window results (B) in order of event-time.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):

 p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>>
>>> "text.txt" contains samples having a single key.
>>>
>>> I get a wrong result and it turns out that window results didn't feed into 
>>> (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How to prevent this from happening?
>>> Q2. How do you guys usually write an integration test for an unbounded one 
>>> with stateful function?
>>>
>>> Best,
>>>
>>> Dongwon


Re: Resource Consumption increase With TupleTag

2020-08-19 Thread Robert Bradshaw
Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
would be 4kps total), or only 2kps coming out of KafkaIO and
MessageExtractor?

Though it /shouldn't/ matter, due to sibling fusion, there's a chance
things are getting fused poorly and you could write Filter1 and
Filter2 instead as a DoFn with multiple outputs (see
https://beam.apache.org/documentation/programming-guide/#additional-outputs).
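
For what it's worth, a sketch of that multi-output pattern (written with the
Python SDK here just for brevity; the Java equivalent is
ParDo.of(...).withOutputTags(...) with TupleTags, as in the docs above). The
record shape, tag names, and print sinks are placeholders:

    import apache_beam as beam

    class ExtractAndRoute(beam.DoFn):
        # Stand-in for MessageExtractor: inspect each record once and emit it
        # to exactly one tagged output instead of running two separate filters.
        def process(self, record):
            if record['type'] == 'a':
                yield beam.pvalue.TaggedOutput('type_a', record)
            else:
                yield beam.pvalue.TaggedOutput('type_b', record)

    with beam.Pipeline() as p:
        results = (p
                   | beam.Create([{'type': 'a', 'v': 1}, {'type': 'b', 'v': 2}])
                   | beam.ParDo(ExtractAndRoute()).with_outputs('type_a', 'type_b'))
        results.type_a | 'SinkA' >> beam.Map(print)   # WriteGCS in the real job
        results.type_b | 'SinkB' >> beam.Map(print)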

- Robert

On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
 wrote:
>
> Hi,
>
> I have a very simple DAG on my dataflow job. (KafkaIO->Filter->WriteGCS). 
> When I submit this Dataflow job per topic it has 4kps per instance processing 
> speed. However I want to consume two different topics in one DF job. I used 
> TupleTag. I created TupleTags per message type. Each topic has different 
> message types and also needs different filters. So my pipeline turned into the 
> DAG below. MessageExtractor is a very simple step checking the header of Kafka 
> messages and writing to the correct TupleTag. However, after starting to use this 
> new DAG, Dataflow can only process 2kps per instance.
>
>  |--->Filter1-->WriteGCS
> KafkaIO->MessageExtractor-> |
>  |--->Filter2-->WriteGCS
>
> Do you have any idea why my data process speed decreased ?
>
> Thanks


Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

2020-08-17 Thread Robert Bradshaw
I checked Java, it looks like the way things are structured we do not
have that bug there.

On Mon, Aug 17, 2020 at 3:31 PM Robert Bradshaw  wrote:
>
> +1
>
> Thanks, Eugene, for finding and fixing this!
>
> FWIW, most use of Python from the Python Portable Runner used the
> embedded environment (this is the default direct runner), so
> dependencies are already present.
>
> On Mon, Aug 17, 2020 at 3:19 PM Daniel Oliveira  
> wrote:
> >
> > Normally I'd say not to cherry-pick this since the issue is only affecting 
> > one runner and isn't really a regression, but given that it's the last Py2 
> > release and there won't be a follow-up release that will be able to include 
> > this fix, I think it's worth making an exception this time. There should be 
> > at least one release with a working portable runner for Py2 users, given 
> > that the portable runner is included in examples on the website. Plus, I'm 
> > already waiting for some other cherry-picks, so it won't even delay 
> > anything.
> >
> > So yes, Eugene if you could create a cherry-pick of this change into the 
> > release-2.24.0 branch, I'll review and merge it.
> >
> > On Mon, Aug 17, 2020 at 11:27 AM Valentyn Tymofieiev  
> > wrote:
> >>
> >> Will defer to the release manager; one reason to cherry-pick is that 
> >> 2.24.0 will be the last release with Python 2 support, so Py2 users of 
> >> Portable Python Local Runner might appreciate the fix, since they won't be 
> >> able to use the next release.
> >>
> >> On Thu, Aug 13, 2020 at 6:28 PM Eugene Kirpichov  
> >> wrote:
> >>>
> >>> +Daniel as in charge of 2.24 per dev@ thread.
> >>>
> >>> On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov  
> >>> wrote:
> >>>>
> >>>> The PR is merged.
> >>>>
> >>>> Do folks think this warrants being cherrypicked into v2.24? My hunch is 
> >>>> yes, cause basically one of the runners (local portable python runner) 
> >>>> is broken for any production workload (works only if your pipeline has 
> >>>> no dependencies).
> >>>>
> >>>> On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov  
> >>>> wrote:
> >>>>>
> >>>>> FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571
> >>>>>
> >>>>> However, I'm not up to date on the portable test infrastructure and 
> >>>>> would appreciate guidance on what tests I can add for this.
> >>>>>
> >>>>> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov  
> >>>>> wrote:
> >>>>>>
> >>>>>> (FYI Sam +sbrot...@gmail.com)
> >>>>>>
> >>>>>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov 
> >>>>>>  wrote:
> >>>>>>>
> >>>>>>> Ok I found the bug, and now I don't understand how it could have 
> >>>>>>> possibly ever worked. And if this was never tested, then I don't 
> >>>>>>> understand why it works after fixing this one bug :)
> >>>>>>>
> >>>>>>> Basically the Python ArtifactStaging/RetrievalService uses 
> >>>>>>> FileSystems.open() to read the artifacts to be staged, and 
> >>>>>>> FileSystems.open() by default decompresses compressed files based on 
> >>>>>>> their extension.
> >>>>>>> I found two of such services - in Python and in Java. Is the Python 
> >>>>>>> used with an embedded job endpoint and the java one otherwise? I 
> >>>>>>> haven't inspected the Java one, but fixing Python does the trick.
> >>>>>>>
> >>>>>>> The fix is this patch:
> >>>>>>>
> >>>>>>> diff --git 
> >>>>>>> a/sdks/python/apache_beam/runners/portability/artifact_service.py 
> >>>>>>> b/sdks/python/apache_beam/runners/portability/artifact_service.py
> >>>>>>> index f2bbf534c3..1f3ec1c0b0 100644
> >>>>>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
> >>>>>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
> >>>>>>> @@ -41,6 +41,7 @@ import grpc
> >>>>>>>  from future.moves.urllib.request import urlopen
> >

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

2020-08-17 Thread Robert Bradshaw
+1

Thanks, Eugene, for finding and fixing this!

FWIW, most use of Python from the Python Portable Runner used the
embedded environment (this is the default direct runner), so
dependencies are already present.

On Mon, Aug 17, 2020 at 3:19 PM Daniel Oliveira  wrote:
>
> Normally I'd say not to cherry-pick this since the issue is only affecting 
> one runner and isn't really a regression, but given that it's the last Py2 
> release and there won't be a follow-up release that will be able to include 
> this fix, I think it's worth making an exception this time. There should be 
> at least one release with a working portable runner for Py2 users, given that 
> the portable runner is included in examples on the website. Plus, I'm already 
> waiting for some other cherry-picks, so it won't even delay anything.
>
> So yes, Eugene if you could create a cherry-pick of this change into the 
> release-2.24.0 branch, I'll review and merge it.
>
> On Mon, Aug 17, 2020 at 11:27 AM Valentyn Tymofieiev  
> wrote:
>>
>> Will defer to the release manager; one reason to cherry-pick is that 2.24.0 
>> will be the last release with Python 2 support, so Py2 users of Portable 
>> Python Local Runner might appreciate the fix, since they won't be able to 
>> use the next release.
>>
>> On Thu, Aug 13, 2020 at 6:28 PM Eugene Kirpichov  
>> wrote:
>>>
>>> +Daniel as in charge of 2.24 per dev@ thread.
>>>
>>> On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov  
>>> wrote:

 The PR is merged.

 Do folks think this warrants being cherrypicked into v2.24? My hunch is 
 yes, cause basically one of the runners (local portable python runner) is 
 broken for any production workload (works only if your pipeline has no 
 dependencies).

 On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov  
 wrote:
>
> FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571
>
> However, I'm not up to date on the portable test infrastructure and would 
> appreciate guidance on what tests I can add for this.
>
> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov  
> wrote:
>>
>> (FYI Sam +sbrot...@gmail.com)
>>
>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov  
>> wrote:
>>>
>>> Ok I found the bug, and now I don't understand how it could have 
>>> possibly ever worked. And if this was never tested, then I don't 
>>> understand why it works after fixing this one bug :)
>>>
>>> Basically the Python ArtifactStaging/RetrievalService uses 
>>> FileSystems.open() to read the artifacts to be staged, and 
>>> FileSystems.open() by default decompresses compressed files based on 
>>> their extension.
>>> I found two of such services - in Python and in Java. Is the Python 
>>> used with an embedded job endpoint and the java one otherwise? I 
>>> haven't inspected the Java one, but fixing Python does the trick.
>>>
>>> The fix is this patch:
>>>
>>> diff --git 
>>> a/sdks/python/apache_beam/runners/portability/artifact_service.py 
>>> b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>> index f2bbf534c3..1f3ec1c0b0 100644
>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>> @@ -41,6 +41,7 @@ import grpc
>>>  from future.moves.urllib.request import urlopen
>>>
>>>  from apache_beam.io import filesystems
>>> +from apache_beam.io.filesystems import CompressionTypes
>>>  from apache_beam.portability import common_urns
>>>  from apache_beam.portability.api import beam_artifact_api_pb2
>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>>>  self._root = root
>>>
>>>def file_reader(self, path):
>>> -return filesystems.FileSystems.open(path)
>>> +return filesystems.FileSystems.open(
>>> +path, compression_type=CompressionTypes.UNCOMPRESSED)
>>>
>>>def file_writer(self, name=None):
>>>  full_path = filesystems.FileSystems.join(self._root, name)
>>> diff --git 
>>> a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>  
>>> b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>> index 5bf3282250..2684235be0 100644
>>> --- 
>>> a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>> +++ 
>>> b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>> @@ -45,6 +45,7 @@ from typing import overload
>>>  import grpc
>>>
>>>  from apache_beam.io import filesystems
>>> +from apache_beam.io.filesystems import CompressionTypes
>>>  from apache_beam.portability import common_urns
>>>  from apache_beam.portability import python_urns
>>>  

Re: Scio 0.9.3 released

2020-08-05 Thread Robert Bradshaw
Thanks for the update!

On Wed, Aug 5, 2020 at 11:46 AM Neville Li  wrote:
>
> Hi all,
>
> We just released Scio 0.9.3. This bumps Beam SDK to 2.23.0 and includes a lot 
> of improvements & bug fixes.
>
> Cheers,
> Neville
>
> https://github.com/spotify/scio/releases/tag/v0.9.3
>
> "Petrificus Totalus"
>
> There are no breaking changes in this release, but some were introduced with 
> v0.9.0:
>
> See v0.9.0 Migration Guide for detailed instructions.
>
> Improvements
>
> Allow user-supplied filename prefix for smb writes/reads (#3215)
> Refactor SortedBucketTransform into a BoundedSource + reuse merge logic 
> (#3097)
> Add keyGroupFilter optimization to scio-smb (#3160)
> Add error message to BaseAsyncLookupDoFn preconditions check (#3176)
> Add Elasticsearch 5,6,7 add/update alias on multiple indices ops (#3134)
> Add initial update alias op to ES7(#2920)
> Add ScioContext#applyTransform (#3146)
> Allow SCollection#transform name override (#3142)
> Allow setting default name through SCollection#applyTransform (#3144)
> Update 0.9 migration doc and add Bigquery Type read schema 
> documentation(#3148)
>
> Bug Fixes
>
> AvroBucketMetadata should validate keyPath (fix #3038) (#3140)
> Allow union types in non leaf field for key (#3187)
> Fix issue with union type as non-leaf field of smb key (#3193)
> Fix ContextAndArgs#typed overloading issue (#3199)
> Fix error propagation on Scala Future onSuccess callback (#3178)
> Fix ByteBuffer should be readOnly (#3220)
> Fix compiler warnings (#3183)
> Fix JdbcShardedReadOptions.fetchSize description (#3209)
> Fix FAQ typo (#3194)
> Fix scalafix error in SortMergeBucketScioContextSyntax (#3158)
> Add scalafix ExplicitReturnType and ProcedureSyntax rules (#3179)
> Cleanup a few more unused and unchecked params (#3223)
> Use GcpOptions#getWorkerZone instead of deprecated GcpOptions#getZone (#3224)
> Use raw coder in SCollection#applyKvTransform (#3171)
> Add raw beam coder materializer (#3164)
> Avoid circular dep between SCollection and PCollectionWrapper (#3163)
> Remove unused param of internal partitionFn (#3166)
> Remove unused CoderRegistry (#3165)
> Remove defunct scio-bench (#3150)
> Reuse applyTransform (#3162)
> Make multijoin.py python3
> Use TextIO#withCompression (#3145)
>
> Dependency Updates
>
> Update Beam SDK to 2.23.0 (#3197)
> Update dependencies to be inline with 2.23.0 (#3225)
> Update to scala 2.12.12 (#3157)
> Update auto-value to 1.7.4 (#3147)
> Update breeze to 1.1 (#3211)
> Update cassandra-all to 3.11.7 (#3186)
> Update cassandra-driver-core to 3.10.0 (#3195)
> Update commons-lang3 to 3.11 (#3161)
> Update commons-text to 1.9 (#3185)
> Update contributing guidelines with current tools (#3149)
> Update elasticsearch-rest-client, ... to 7.8.1 (#3192)
> Update elasticsearch, ... to 6.8.11 (#3188)
> Update jackson-module-scala to 2.10.5 (#3169)
> Update jna to 5.6.0 (#3156)
> Update magnolify to 0.2.2 (#3154)
> Update mysql-connector-java to 8.0.21 (#3153)
> Update pprint to 0.6.0 (#3203)
> Update protobuf version to 3.11.4 (#3200)
> Update sbt-scalafix to 0.9.18 (#3138)
> Update sbt-sonatype to 3.9.4 (#3136)
> Update scalafmt-core to 2.6.2 (#3139)
> Update scalafmt-core to 2.6.3 (#3152)
> Update scalafmt-core to 2.6.4 (#3167)
> Update sparkey to 3.1.0 (#3204)
> Fix conflicting gcsio dependency (#3180)


Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

2020-07-22 Thread Robert Bradshaw
On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath 
wrote:

>
>
> On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <1705ay...@gmail.com> wrote:
>
>> Thank you guys for the reply. I am really stuck and could not proceed
>> further.
>> Yes, the previous trial published message had null key.
>> But when I send key:value pair through producer using
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>> mytopic --property *"parse.key=true" --property "key.separator=:"*
>> > tryKey:tryValue
>>
>> I do not get any error but beam does not print the received message. Here
>> is what my pipeline looks like:
>> result = (
>> pipeline
>>
>> | "Read from kafka" >> ReadFromKafka(
>> consumer_config={
>> "bootstrap.servers": 'localhost:9092',
>> },
>> topics=['mytopic'],
>> expansion_service='localhost:8097')
>>
>> | "print" >> beam.Map(print)
>> )
>>
>>
> I suspect DirectRunner in LOOPBACK mode might not be working for
> cross-language transforms today.
>

When running a Streaming pipeline, the DirectRunner falls back to the old
runner that does not support cross-language.
https://issues.apache.org/jira/browse/BEAM-7514
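
So, as a sketch only (assuming the Flink job server from step 2 of the
original message is still listening on its default port 8099), options along
these lines should exercise the cross-language path, with Docker-based workers
instead of LOOPBACK:

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=DOCKER",
        "--streaming",
    ])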

Please note that cross-language transforms framework is fairly new [1] and
> we are adding support for various runners and environment configurations.
> Can you try with Flink in DOCKER mode ?
>
>
>> If this is not the way we make beam and kafka communicate then please
>> share a working example which showcases how a message published in kafka
>> gets received by beam while streaming.
>>
>
> I'm adding an example but I've only tested this with Dataflow yet. I hope
> to test that example for more runners and add additional instructions
> there.
> https://github.com/apache/beam/pull/12188
>
> Thanks,
> Cham
>
> [1] https://beam.apache.org/roadmap/connectors-multi-sdk/
>
>>
>> Regards,
>> Ayush Sharma
>>
>> On Fri, Jul 17, 2020 at 11:39 PM Chamikara Jayalath 
>> wrote:
>>
>>> Yes, seems like this is due to the key being null. XLang KafkaIO has to
>>> be updated to support this. You should not run into this error if you
>>> publish keys and values that are not null.
>>>
>>>
>>>
>>>
>>> On Fri, Jul 17, 2020 at 8:04 PM Luke Cwik  wrote:
>>>
 +dev 

 On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik  wrote:

> +Heejong Lee  +Chamikara Jayalath
> 
>
> Do you know if your trial record has an empty key or value?
> If so, then you hit a bug and it seems as though there was a miss
> supporting this usecase.
>
> Heejong and Cham,
> It looks like the Javadoc for ByteArrayDeserializer and other
> Deserializers can return null[1, 2] and we aren't using
> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
> non XLang KafkaIO does this correctly in its regular coder inference
> logic[4]. I flied BEAM-10529[5]
>
> 1:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
> 2:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
> 3:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
> 4:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
> 5: https://issues.apache.org/jira/browse/BEAM-10529
>
>
> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to build a streaming beam pipeline in python which should
>> capture messages from kafka and then execute further stages of data
>> fetching from other sources and aggregation. The step-by-step process of
>> what I have built till now is:
>>
>>1.
>>
>>Running Kafka instance on localhost:9092
>>
>>./bin/kafka-server-start.sh ./config/server.properties
>>2.
>>
>>Run beam-flink job server using docker
>>
>>docker run --net=host apache/beam_flink1.10_job_server:latest
>>3.
>>
>>Run beam-kafka pipeline
>>
>> import apache_beam as beam
>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>
>> if __name__ == '__main__':
>> options = PipelineOptions([
>> "--job_endpoint=localhost:8099",
>> "--environment_type=LOOPBACK",
>> "--streaming",
>> "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>> ])
>>
>> options = options.view_as(StandardOptions)
>> 

Re: Testing Apache Beam pipelines / python SDK

2020-07-21 Thread Robert Bradshaw
If you don't want to write to an actual file, the example with the Check
transform should allow you to use Check(...) as you would a sink. (I
realize this should have been

  run_my_pipeline(
      beam.Create([...]),
      "Check1" >> Check(equal_to([...])),
      "Check2" >> Check(any_callable_that_validates_result2))

to give distinct labels.)

As for BigQuery, yes, these are integration tests that write to real
BigQuery. You can alternatively check that the PCollection has the right
things you wanted to write.


On Tue, Jul 21, 2020 at 1:32 PM Sofia’s World  wrote:

> Hello Robert
>  could you point me to a test sample where a 'mock' sink is used?
> do you guys have a testing package which provides an in-memory sink where,
> for example, I can dump the result of
> my pipeline (as opposed to writing to a file)?
> Additionally, what is the best way to test writing to BigQuery?
> I have seen this file
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
> but it appears it writes to real big query?
>
> kind regards
>  Marco
>
>
>
>
>
>
>
> On Fri, Jul 17, 2020 at 11:05 PM Robert Bradshaw 
> wrote:
>
>> If want a full end-to-end integration test of your pipeline, what you can
>> do is:
>>
>> 1) Write your input data to temporary files.
>> 2) Run your pipeline, which writes its output somewhere (ideally a
>> temp location as well).
>> 3) Open up the outputs and see if it was as expected.
>>
>> This is similar to the test at
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py
>> , but a bit heavy weight.
>>
>> Another way to validate your pipeline is to refactor the code so the
>> inputs and outputs are pluggable. For example, you could write
>>
>> def run_my_pipeline(input):
>>[all your pipeline logic goes here]
>>[this could also be wrapped up as a PTransform]
>>return result1, result2
>>
>> def main(...):
>>   with beam.Pipeline(...) as p:
>> input = p | beam.io.ReadFromText(...)
>> result1, result2 = run_my_pipeline(input)
>> result1 | beam.io.WriteToSomewhere(...)
>> result2 | beam.io.WriteToSomewhereElse(...)
>>
>> def test():
>>   with beam.Pipeline(...) as p:
>> input = p | beam.Create(...)
>> result1, result2 = run_my_pipeline(input)
>> assert_that(result1, equal_to([...]))
>> assert_that(result2, any_callable_that_validates_result2,
>> label="Check2")
>>
>> You could also parameterize things on your sinks and sources, e.g.
>>
>> def run_my_pipeline(source, sink1, sink2):
>>with beam.Pipeline(...) as p:
>>  input = p | source
>>  ...
>>  result1 | sink1
>>  result2 | sink2
>>
>> def main(...):
>>   run_my_pipeline(
>>   beam.io.ReadFromText(...),
>>   beam.io.WriteToSomewhere(...),
>>   beam.io.WriteToSomewhereElse(...))
>>
>> def test():
>>
>>   class Check(beam.PTransform):
>> def __init__(self, checker):
>>   self._checker = checker
>> def expand(self, pcoll):
>>   assert_that(pcoll, self._checker)
>>
>>   run_my_pipeline(
>>   beam.Create([...]),
>>   Check1(equal_to([...])),
>>   Check2(any_callable_that_validates_result2))
>>
>> or various permutations thereof.
>>
>> Is that more what you're looking for?
>>
>>
>>
>> On Fri, Jul 17, 2020 at 2:46 PM Sofia’s World 
>> wrote:
>> >
>> > Hello Robert
>> >  thanks but i think i am either missing the point or not expressing
>> clearly my goal.
>> > I had a look at the util_test.py, and i see that in those tests
>> pipelines are being created as part of tests., and  in these tests what are
>> being tested are beam functions - eg beam.Map  etc.
>> > I am after testing a pipeline as a whole. Taking this example,
>> >
>> > p = beam.Pipeline(options=pipeline_options)
>> > lines = (p
>> >  | 'Get List of Tickers' >> ReadFromText(input_file)
>> >  | 'Split fields'  >> beam.Map(split_fields)
>> >  | 'Map to String' >> beam.Map(add_year)
>> >
>> > what i am trying to do is to test a full pipeline run, like in the test
>> example below
>> >
>> > from mypackage.email_pipeline import run
>> >
>> > @patch('testing.email_pipeline.ReadFromText')
>> > def test_create_pipelne(self, mock_read_from_text):

Re: Testing Apache Beam pipelines / python SDK

2020-07-17 Thread Robert Bradshaw
If want a full end-to-end integration test of your pipeline, what you can do is:

1) Write your input data to temporary files.
2) Run your pipeline, which writes its output somewhere (ideally a
temp location as well).
3) Open up the outputs and see if it was as expected.

This is similar to the test at
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py
, but a bit heavy weight.

Another way to validate your pipeline is to refactor the code so the
inputs and outputs are pluggable. For example, you could write

def run_my_pipeline(input):
   [all your pipeline logic goes here]
   [this could also be wrapped up as a PTransform]
   return result1, result2

def main(...):
  with beam.Pipeline(...) as p:
    input = p | beam.io.ReadFromText(...)
    result1, result2 = run_my_pipeline(input)
    result1 | beam.io.WriteToSomewhere(...)
    result2 | beam.io.WriteToSomewhereElse(...)

def test():
  with beam.Pipeline(...) as p:
    input = p | beam.Create(...)
    result1, result2 = run_my_pipeline(input)
    assert_that(result1, equal_to([...]))
    assert_that(result2, any_callable_that_validates_result2, label="Check2")

You could also parameterize things on your sinks and sources, e.g.

def run_my_pipeline(source, sink1, sink2):
  with beam.Pipeline(...) as p:
    input = p | source
    ...
    result1 | sink1
    result2 | sink2

def main(...):
  run_my_pipeline(
  beam.io.ReadFromText(...),
  beam.io.WriteToSomewhere(...),
  beam.io.WriteToSomewhereElse(...))

def test():

  class Check(beam.PTransform):
    def __init__(self, checker):
      self._checker = checker
    def expand(self, pcoll):
      assert_that(pcoll, self._checker)

  run_my_pipeline(
      beam.Create([...]),
      Check1(equal_to([...])),
      Check2(any_callable_that_validates_result2))

or various permutations thereof.

Is that more what you're looking for?



On Fri, Jul 17, 2020 at 2:46 PM Sofia’s World  wrote:
>
> Hello Robert
> Thanks, but I think I am either missing the point or not expressing my 
> goal clearly.
> I had a look at util_test.py, and I see that in those tests pipelines are 
> being created as part of the tests, and what is being tested are Beam 
> functions - e.g. beam.Map etc.
> I am after testing a pipeline as a whole. Taking this example,
>
> p = beam.Pipeline(options=pipeline_options)
> lines = (p
>  | 'Get List of Tickers' >> ReadFromText(input_file)
>  | 'Split fields'  >> beam.Map(split_fields)
>  | 'Map to String' >> beam.Map(add_year)
>
> what i am trying to do is to test a full pipeline run, like in the test 
> example below
>
> from mypackage.email_pipeline import run
>
> @patch('testing.email_pipeline.ReadFromText')
> def test_create_pipelne(self, mock_read_from_text):
> mock_read_from_text.return_value = ['One',
> 'Two',
> 'Three']
>
> test_pipeline = TestPipeline(is_integration_test=True)
> pipeline_verifiers = [
> PipelineStateMatcher(),
> ]
> extra_opts = {
> 'input_table': 'testtable',
> 'num_records': 1,
> 'beam_bq_source': 'source',
> 'on_success_matcher': all_of(*pipeline_verifiers)
> }
> result = run(
> test_pipeline.get_full_options_as_args(**extra_opts))
>
> print(result)
>
> Basically, i would expect a PCollection as result of the pipeline, and i 
> would be testing the content of the PCollection
>
> Running this results in this message
>
> IT is skipped because --test-pipeline-options is not specified
>
> Would you be able to advise on this?
>
> kind regards
>
>  Marco
>
>
>
>
>
>
>
>
>
>
>
>
> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw  wrote:
>>
>> You can use apache_beam.testing.util.assert_that to write tests of
>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>
>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World  wrote:
>> >
>> > Hi all
>> >  i was wondering if anyone could provide pointers on how  to test a 
>> > pipeline in python.
>> > I have the following pipeline
>> >
>> > lines = (p
>> >  | 'Get List of Tickers' >> beam.Map(get_tickers)
>> >  | 'Split fields'  >> beam.Map(split_fields)
>> >  | 'Map to String' >> beam.Map(add_year)
>> >  )
>> > result = p.run()
>> >
>> > Now i can easily test each individual function for each step (get_tickers, 
>> > split_fields, add_year)
>> >
>> > but is there a way to test the pipeline 'as a whole' ?#
>> >
>> > Could anyone point me to some examples?
>> >
>> > kind regards
>> >
>> >
>> >
>> >
>> >


Re: Testing Apache Beam pipelines / python SDK

2020-07-13 Thread Robert Bradshaw
You can use apache_beam.testing.util.assert_that to write tests of
Beam pipelines. This is what Beam uses for its tests, e.g.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80

On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World  wrote:
>
> Hi all
>  i was wondering if anyone could provide pointers on how  to test a pipeline 
> in python.
> I have the following pipeline
>
> lines = (p
>  | 'Get List of Tickers' >> beam.Map(get_tickers)
>  | 'Split fields'  >> beam.Map(split_fields)
>  | 'Map to String' >> beam.Map(add_year)
>  )
> result = p.run()
>
> Now i can easily test each individual function for each step (get_tickers, 
> split_fields, add_year)
>
> but is there a way to test the pipeline 'as a whole' ?#
>
> Could anyone point me to some examples?
>
> kind regards
>
>
>
>
>


Re: Not able to see WordCount output in docker /tmp/...

2020-07-07 Thread Robert Bradshaw
Does it work when you write to a distributed filesystem? (One issue
with Docker is that the manager and each of the workers have their
own local filesystems.)

On Tue, Jul 7, 2020 at 2:17 PM Avijit Saha  wrote:
>
> While trying to run the Beam WordCount example on the Flink runner using the Job 
> Manager and Taskmanager docker images with the input pom.xml file, I can see that 
> the task manager creates the following files, but there is no output inside the 
> counts- file as expected. Any pointer on what might be wrong here?
>
> Thanks
>
> bash-4.4$ ls -lrt /tmp
> total 4
> drwxr-xr-x1 flinkflink2 Jul  7 21:15 hsperfdata_flink
> -rw-r--r--1 flinkflink 1179 Jul  7 21:15 
> jaas-3210126497662256938.conf
> drwxr-xr-x1 flinkflink0 Jul  7 21:15 
> flink-io-5922e9cc-91ee-4ae3-bee3-90006db36aaa
> drwxr-xr-x1 flinkflink0 Jul  7 21:15 
> blobStore-1f336cf6-8885-4aff-bd36-ddf38f11c0a0
> drwxr-xr-x1 flinkflink0 Jul  7 21:15 
> blobStore-13b09c7f-e371-430f-90aa-802aec50735e
> drwxr-xr-x1 flinkflink0 Jul  7 21:15 
> flink-dist-cache-6330c1d8-3ab8-4baa-812b-485d6d850c1d
> -rw-r--r--1 flinkflink0 Jul  7 21:15 counts-0-of-1
> drwxr-xr-x1 flinkflink0 Jul  7 21:15 
> flink-netty-shuffle-69803f1d-c641-41b0-a090-7dc542ed98f4
> drwxr-xr-x1 flinkflink0 Jul  7 21:15 localState


Re: Understanding combiner's distribution logic

2020-07-01 Thread Robert Bradshaw
On Tue, Jun 30, 2020 at 3:32 PM Julien Phalip  wrote:

> Thanks Luke!
>
> One part I'm still a bit unclear about is how exactly the PreCombine stage
> works. In particular, I'm wondering how it can perform the combination
> before the GBK. Is it because it can already compute the combination on
> adjacent elements that happen to share the same key?
>

Yes, exactly. Even non-adjacent elements within the same bundle (e.g. a
runner might instruct one worker to process the first half of a file, and
another worker the second half; these two halves would each be a "bundle").

Basically what happens is a "DoFn" gets inserted right before the GBK that,
upon receipt of a KV, buffers elements by key in a hashtable in case it
sees the key again and can do some pre-emptive combining, and then emits
them all when the bundle finishes. Essentially:

public void processElement(KV<K, V> element) {
  if (this.buffer.containsKey(element.getKey())) {
    // merge element.getValue() into this.buffer.get(element.getKey())
  } else {
    // (well, really creates a new accumulator here)
    this.buffer.put(element.getKey(), element.getValue());
  }
}

public void finishBundle(Context context) {
  for (Map.Entry<K, V> kv : this.buffer.entrySet()) {
    context.output(KV.of(kv.getKey(), kv.getValue()));
  }
}

(The actual logic is a bit more complicated to avoid unbounded growth in
the table, etc.)

Could you also clarify the term "lifting" in this context? Does that refer
> to the act of pushing a partial combination before the GBK?
>

Lifting is just the terminology used to designate that when the graph looks
like

... -> (GroupByKey -> Combine) -> ...

the runner may turn it into

... -> PartialCombine -> GroupByKey -> FinishCombine -> ...
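
If it helps to see the hint itself, here is a small sketch with the Python SDK
(the Java equivalent is Combine.perKey(...).withHotKeyFanout(...)); the keys
and the fanout value of 16 are arbitrary:

    import apache_beam as beam

    with beam.Pipeline() as p:
        (p
         | beam.Create([('hot', 1)] * 10000 + [('cold', 1)] * 10)
         # Hint that some keys are hot: the runner may split each hot key 16
         # ways for an extra pre-combine round before the final per-key combine.
         | beam.CombinePerKey(sum).with_hot_key_fanout(16)
         | beam.Map(print))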


> On Tue, Jun 30, 2020 at 12:34 PM Luke Cwik  wrote:
>
>> Your reasoning is correct around the withHotkeyFanout hint and it is to
>> help runners know that there is likely one or more keys that will have
>> significantly more data then the others but the logic around how it is
>> broken up is runner dependent and whether they rely on the hint or not is
>> also runner dependent. If a runner was smart enough, it wouldn't need the
>> hint and could automatically detect hotkeys and do the right thing. I would
>> take a look at this doc[1] to learn about how the optimization can work
>> from a runners perspective. Some runners never perform the PreCombine,
>> while others may have multiple rounds of it but the most common case is
>> that there is only a single PreCombine (assuming it is allowed).
>>
>> 1: https://s.apache.org/beam-runner-api-combine-model
>>
>> On Tue, Jun 30, 2020 at 10:56 AM Julien Phalip  wrote:
>>
>>> Hi,
>>>
>>> I had a question about how combiners work, particularly on how the
>>> combined PCollection's subsets are initially formed.
>>>
>>> I understand that, according to the documentation
>>> , a
>>> combiner allows parallelizing the computation to multiple workers by
>>> breaking up the PCollection into subsets. I like the database analogy given
>>> in this post
>>> ,
>>> which says that it is similar to pushing down a predicate.
>>>
>>> I also understand that it is possible to use withFanout or
>>> withHotkeyFanout to provide some explicit logic as a hint on how to
>>> manage the distribution.
>>>
>>> What is unclear to me, however, is whether by default the runner already
>>> plans the distribution of the computation, even when no explicit hints are
>>> provided. I'm guessing perhaps it always breaks up the PCollection into
>>> bundles
>>> 
>>> (similar to DoFns), then the combiner runs the combination on each bundle,
>>> saves the result into intermediary accumulators, and those results then
>>> bubble up recursively to the top? If that's the case, then I assume that
>>> the purpose of withFanout and withHotKeyFanout is to further break up
>>> those initially pre-created bundles into even smaller subsets? Or am I
>>> guessing this wrong? :)
>>>
>>> I couldn't find a clear description in the documentation on how the
>>> PCollection subsets are initially formed. Please let me know if you have
>>> some details on that, or if it is already documented somewhere.
>>>
>>> Thank you!
>>>
>>> Julien
>>>
>>


Re: PaneInfo showing UNKNOWN State

2020-05-26 Thread Robert Bradshaw
To clarify, PaneInfo is supported on the FnAPI local runner, but not on the
bundle based one. Unfortunately, Streaming is not supported on the FnAPI
one (yet), but work there is ongoing.

On Tue, May 26, 2020 at 11:46 AM Pablo Estrada  wrote:

> Hi Jayadeep,
> Unfortunately, it seems that PaneInfo is not well supported yet on the
> local runners: https://issues.apache.org/jira/browse/BEAM-3759
>
> Can you share more about your use case, and what you'd like to do with the
> PaneInfo?
>
> On Sat, May 23, 2020 at 10:03 AM Jay  wrote:
>
>> Hi All -
>>
>> Below is a sample code written in Python which reads data from Pub/Sub
>> and tries to determine the PaneInfo for different elements
>>
>> There are 3 rows of data as shown below
>>
>> {"country":"USA","user_id": 1,"ts": 0},
>>> {"country":"USA","user_id": 2,"ts": 0},
>>> {"country":"USA","user_id": 3,"ts": 8}
>>
>>
>> Below is the sample code which modifies the event timestamp for the
>> messages
>>
>>> class AddTimestampDoFn(beam.DoFn):
>>>   def process(self, element):
>>> unix_timestamp = datetime.datetime.now().timestamp() + element["ts"]
>>> yield beam.window.TimestampedValue(element, unix_timestamp)
>>
>>
>> Below is a Helper class to retrieve the timestamp. This has no importance
>> apart from checking to see if the timestamp has been set correctly
>>
>>> class AddTimestamp(beam.DoFn):
>>>   def process(self, element, timestamp=beam.DoFn.TimestampParam):
>>> yield (timestamp.to_utc_datetime(), element)
>>
>>
>> Code below reads the records from Pub/Sub and runs the ParDo's mentioned
>> above
>>
>> data = p | "read"  >>
>>> beam.io.ReadFromPubSub(subscription=subscription)
>>>  | "JsonConvert"   >> beam.Map(json.loads)
>>
>> sliding_windows = (
>>>   data | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>>>| 'AddTimestamp'  >> beam.ParDo(AddTimestamp())
>>> )
>>
>>
>> Below is my trigger definition and the implementation
>>
>>>
>>>
>>> class ProcessRecord(beam.DoFn):
>>>   def process(self, element,
>>> window=beam.DoFn.WindowParam,pane_info=beam.DoFn.PaneInfoParam):
>>>  # access pane info e.g pane_info.is_first, pane_info.is_last,
>>> pane_info.timing
>>>  yield (element,
>>> datetime.datetime.now(),window.start.to_utc_datetime(),
>>> window.end.to_utc_datetime(), pane_info.timing, pane_info.is_first,
>>> pane_info.is_last)
>>
>>
>>
>>> window_fn  = beam.window.FixedWindows(10)
>>> trigger_fn = beam.transforms.trigger.AfterWatermark(early=AfterCount(1))
>>> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>>> new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>>> window_fn,
>>> trigger= trigger_fn,
>>> accumulation_mode=acc_dis_fn
>>> ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
>>
>>
>>
>>  When I look at the output I see the below
>>
>> 0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA',
>> 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00 
>> 2020-05-23
>> 16:44:10 3 True True
>> 1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA',
>> 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00 
>> 2020-05-23
>> 16:44:10 3 True True
>> 2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country':
>> 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23
>> 16:44:10 2020-05-23 16:44:20 3 True True
>> As can be observed above there are two Windows that have been defined
>> which is inline with the data and the FixedWindow strategy
>> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
>> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
>>
>> Couple of questions which I am not able to understand
>> 1. Why is the PaneInfo.Timing value returned as "3" (UNKNOWN) instead of
>> (EARLY, ON_TIME) ?
>> 2. Shouldn't for Window1 and Window 2 there should be two firings one
>> EARLY and one ON_TIME ?
>> 3. The last two boolean values are is_first and is_last again both have
>> been set to TRUE which doesn't look right.
>>
>>
>> Can someone suggest on what can be the issue ?
>>
>> Thanks,
>> Jayadeep
>>
>


Re: Timers exception on "Job Drain" while using stateful beam processing in global window

2020-05-18 Thread Robert Bradshaw
Glad you were able to get this working; thanks for following up.

On Mon, May 18, 2020 at 10:35 AM Mohil Khare  wrote:

> Hi,
> On another note, I think I was unnecessarily complicating things by
> applying a sliding window here and then an extra global window to remove
> duplicates.  I replaced the *sliding window with a session window *(*as I
> know that my transaction consisting of recordA logs and recordB logs for a
> key "MyKey" won't last for more than 60-90 secs*), and my use case seems
> to be working fine. Even DRAIN is working successfully.
>
> Thanks
> Mohil
>
> On Sun, May 17, 2020 at 3:37 PM Mohil Khare  wrote:
>
>> Hello,
>>
>> I have a use case where I have two sets of PCollections (RecordA and
>> RecordB) coming from a real time streaming source like Kafka.
>>
>> Both Records are correlated with a common key, let's say KEY.
>>
>> The purpose is to enrich RecordA with RecordB's data for which I am using
>> CoGbByKey. Since RecordA and RecordB for a common key can come within 1-2
>> minutes of event time, I am maintaining a sliding window for both records
>> and then do CoGpByKey for both PCollections.
>>
>> The sliding windows that will find both RecordA and RecordB for a common
>> key KEY, will emit enriched output. Now, since multiple sliding windows can
>> emit the same output, I finally remove duplicate results by feeding
>> aforementioned outputs to a global window where I maintain a state to check
>> whether output has already been processed or not. Since it is a global
>> window, I maintain a Timer on state (for GC) to let it expire after 10
>> minutes have elapsed since state has been written.
>>
>> This is working perfectly fine w.r.t the expected results. However, I am
>> unable to stop job gracefully i.e. Drain the job gracefully. I see
>> following exception:
>>
>> java.lang.IllegalStateException:
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
>> state cleanup timer for window
>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
>> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>> java.lang.IllegalStateException:
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10 received
>> state cleanup timer for window
>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
>> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>
>> 

Re: Portable Runner performance optimisation

2020-05-15 Thread Robert Bradshaw
I don't think this is being worked on, but given that Java already supports
the LOOPBACK environment (which is a special case of EXTERNAL) it would
just be a matter of properly parsing the flags.

On Fri, May 15, 2020 at 9:52 AM Alexey Romanenko 
wrote:

> Thanks! It looks like this is exactly what I need, though mostly for the Java
> SDK.
> Do you know if anyone is working on this Jira?
>
> On 15 May 2020, at 18:01, Kyle Weaver  wrote:
>
> > Yes, you can start docker containers beforehand using the worker_pool
> option:
>
> However, it only works for Python. Java doesn't have it yet:
> https://issues.apache.org/jira/browse/BEAM-8137
>
> On Fri, May 15, 2020 at 12:00 PM Kyle Weaver  wrote:
>
>> > 2. Is it possible to pre-run SDK Harness containers and reuse them for
>> every Portable Runner pipeline? I could win quite a lot of time on this for
>> more complicated pipelines.
>>
>> Yes, you can start docker containers beforehand using the worker_pool
>> option:
>>
>> docker run -p=50000:50000 apachebeam/python3.7_sdk --worker_pool # or
>> some other port publishing
>>
>> and then in your pipeline options set:
>>
>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>
>> On Fri, May 15, 2020 at 11:47 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I’m trying to optimize my pipeline runtime while using it with Portable
>>> Runner and I have some related questions.
>>>
>>> This is a cross-language pipeline, written in Java SDK, and which
>>> executes some Python code through “External.of()” transform and my custom
>>> Python Expansion Service. I use Docker-based SDK Harness for Java and
>>> Python. In a primitive form the pipeline would look like this:
>>>
>>>
>>> [Source (Java)] -> [MyTransform1 (Java)] ->  [External (Execute Python
>>> code with Python SDK) ] - >  [MyTransform2 (Java SDK)]
>>>
>>>
>>>
>>> While running this pipeline with the Portable Spark Runner, I see that we
>>> spend quite a lot of time on artifact staging (in our case, the real pipeline
>>> has quite a lot of artifacts) and on launching a Docker container for
>>> every Spark stage. So, my questions are the following:
>>>
>>> 1. Is there any internal Beam functionality to pre-stage or, at least
>>> cache, already staged artifacts? Since the same pipeline will be executed
>>> many times in a row, there is no reason to stage the same artifacts every
>>> run.
>>>
>>> 2. Is it possible to pre-run SDK Harness containers and reuse them for
>>> every Portable Runner pipeline? I could win quite a lot of time on this for
>>> more complicated pipelines.
>>>
>>>
>>>
>>> Well, I guess I can find some workarounds for that but I wished to ask
>>> before that perhaps there is a better way to do that in Beam.
>>>
>>>
>>> Regards,
>>> Alexey
>>
>>
>


Re: adding apt-get to setup.py fails passing apt-get commands

2020-05-04 Thread Robert Bradshaw
Printing the result out shouldn't matter, but as mentioned in the docs,
Popen.communicate is not intended to be used when the amount of output is
large. If you need to just run the process, I would recommend a simple
subprocess.check_output().
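Something along these lines (a sketch only, reusing the same command-list
shape; stderr is folded into the captured output so apt-get failures are
visible):

    import subprocess

    CUSTOM_COMMANDS = [
        ['apt-get', 'update'],
        ['apt-get', 'install', '-y', 'unzip'],
    ]

    for command in CUSTOM_COMMANDS:
        # Raises CalledProcessError (with the captured output attached) on a
        # non-zero exit code, instead of silently swallowing stderr.
        output = subprocess.check_output(command, stderr=subprocess.STDOUT)
        print('Command output: %s' % output)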

On Mon, May 4, 2020 at 9:00 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> I have updated the setup.py to print the error from the installation and
> it is working now. I have no idea why that would change anything. Happy to
> learn if someone knows the reason
>
> The code before:
> stdout_data, _ = p.communicate()
> print('Command output: %s' % stdout_data)
> The code now:
> stdout_data, stderr_data = p.communicate()
> print('Command output: %s' % stdout_data)
> print('Command error data : %s' % stderr_data)
>
> So that issue is resolved for me.
> Thanks,
> Eila
>
> On Sat, May 2, 2020 at 11:27 PM OrielResearch Eila Arich-Landkof <
> e...@orielresearch.org> wrote:
>
>> Hi all,
>>
>> I have experienced some very odd behaviour
>> when executing setup.py with the following CUSTOM_COMMANDS:
>> CUSTOM_COMMANDS = [['echo', 'Custom command worked!'],
>>['apt-get', 'update'],
>>['apt-get', 'install', '-y', 'unzip']]
>> everything works great.
>> when executing:
>> CUSTOM_COMMANDS = [['echo', 'Custom command worked!'],
>>['apt-get', 'update'],
>>['apt-get', 'install', '-y', 'unzip'],
>>['apt-get', 'install', '-y', 'perl'],
>>['apt-get', 'install', '-y', 'default-jre']]
>> the unzip install returns an error:
>> RuntimeError: Command ['apt-get', 'install', '-y', 'unzip'] failed: exit
>> code: 100
>>
>> Any idea what the issue is? It is either something super simple, or a
>> workaround is needed for some issue that I am not aware of.
>> I am using the format of
>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset
>> Thanks,
>> --
>> Eila
>> 
>> Meetup 
>>
>
>
> --
> Eila
> 
> Meetup 
>


Re: Kafka IO: value of expansion_service

2020-04-28 Thread Robert Bradshaw
https://github.com/apache/beam/pull/11557

On Tue, Apr 28, 2020 at 9:28 AM Robert Bradshaw  wrote:

> Java dependencies are not yet fully propagated over the expansion service,
> which might be what you're running into. I'm actually in the process of
> putting together a PR to fix this; I'll let you know when it's ready.
>
> On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver  wrote:
>
>> I'm not sure about the org.springframework.expression.EvaluationContext
>> issue, but "local class incompatible" usually happens when using Beam
>> components built from different sources. Make sure to rebuild everything
>> from the same commit.
>>
>> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk 
>> wrote:
>>
>>> After syncing to:
>>>
>>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
>>> origin/master, origin/HEAD)
>>> Author: Ning Kang 
>>> Date:   Fri Apr 24 10:58:07 2020 -0700
>>>
>>> The new error is:
>>>
>>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize
>>> Custom DoFn With Execution Info
>>> at
>>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>> at
>>> org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>>> at
>>> org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.InvalidClassException:
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class
>>> incompatible: stream classdesc serialVersionUID = 7311199418509482705,
>>> local class serialVersionUID = 5488866827627794770
>>> at
>>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>> at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>> at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)

Re: Kafka IO: value of expansion_service

2020-04-28 Thread Robert Bradshaw
Java dependencies are not yet fully propagated over the expansion service,
which might be what you're running into. I'm actually in the process of
putting together a PR to fix this; I'll let you know when it's ready.

On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver  wrote:

> I'm not sure about the org.springframework.expression.EvaluationContext
> issue, but "local class incompatible" usually happens when using Beam
> components built from different sources. Make sure to rebuild everything
> from the same commit.
>
> On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk 
> wrote:
>
>> After syncing to:
>>
>> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
>> origin/master, origin/HEAD)
>> Author: Ning Kang 
>> Date:   Fri Apr 24 10:58:07 2020 -0700
>>
>> The new error is:
>>
>> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize
>> Custom DoFn With Execution Info
>> at
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>> at
>> org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
>> at
>> org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
>> at
>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
>> at
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>> at
>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.InvalidClassException:
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class
>> incompatible: stream classdesc serialVersionUID = 7311199418509482705,
>> local class serialVersionUID = 5488866827627794770
>> at
>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>> at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>> at
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
>> ... 18 more
>>
>> I am not sure it is related to
>> https://issues.apache.org/jira/browse/BEAM-9745.
>>
>> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk 
>> wrote:
>>
>>> Here is an error I am getting when using DirectRunner:
>>>
>>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for
>>> the bundle bundle_1 to finish.
>>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
>>> 

Re: Stateful & Timely Call

2020-04-23 Thread Robert Bradshaw
I may have misinterpreted your email; I thought you didn't have a need for
keys at all. If this is actually the case, you don't need a GroupByKey: just
have your DoFn take Rows as input and emit List<Row> as output. That is, it's
a DoFn<Row, List<Row>>.

You can buffer multiple Rows in an instance variable between process
element calls. For example,

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
  private List<T> buffer = new ArrayList<>();

  @ProcessElement
  public void processElement(@Element T elt, OutputReceiver<List<T>> out) {
    buffer.add(elt);  // buffer the element itself, not the output receiver
    if (buffer.size() >= 100) {
      out.output(buffer);
      buffer = new ArrayList<>();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    // Flush whatever is still buffered when the runner closes the bundle.
    // (Assumes the global window, as in a typical batch pipeline.)
    if (!buffer.isEmpty()) {
      ctx.output(buffer, GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
      buffer = new ArrayList<>();
    }
  }
}

See
https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
for
more information on the lifetime of DoFns.

As for why your GBK is taking so long, yes, this can be a bottleneck.
However, it should be noted that Dataflow (like most other runners)
executes this step in conjunction with other steps as part of a "fused
stage." So if your pipeline looks like

Read -> DoFnA -> GBK -> DoFnB -> Write

then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up
almost immediately), one element at a time, and when that's finished,
GBK[part2], DoFnB, and Write will execute concurrently, one element at a
time, so you can't just look at the last unfinished stage to determine where the
bottleneck is. (One helpful tool, however, is looking at the amount of time
spent on each step in the UI.)

Hopefully that helps.

- Robert


On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma 
wrote:

> Thanks Robert and Luke
>
> This approach seems good to me. I am trying that; I have to include a
> GroupBy to make an Iterable available to the ParDo function to do the same.
> Now GroupBy is a bottleneck: it has been running for the last 2 hours and has
> processed only 40 GB of data (still waiting for the rest of hundreds of GB of
> data).
>
> Currently I used GroupByKey.create().
>
> What's the recommended way to choose a key so it executes faster: the same
> key for all rows, a different key for each row, or the same key for a group
> of rows?
>
> Thanks
> Aniruddh
>
> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik  wrote:
>
>> As Robert suggested, what prevents you from doing:
>> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
>> where BatchInMemory stores elements in the @ProcessElement method in an
>> in-memory list and produces output every time the list is large enough, with
>> a final output in the @FinishBundle method?
>>
>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma 
>> wrote:
>>
>>> Hi Luke
>>>
>>> Sorry, I forgot to mention the functions. Dataflow adds the following
>>> step, ["PartitionKeys", new GroupByKeyAndSortValuesOnly], and it is super
>>> slow. How do I choose keys to make it faster?
>>>
>>>  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>>>   .setCoder(
>>>   KvCoder.of(
>>>   keyCoder,
>>>   KvCoder.of(InstantCoder.of(),
>>> WindowedValue.getFullCoder(kvCoder, windowCoder))))
>>>
>>>   // Group by key and sort by timestamp, dropping windows as
>>> they are reified
>>>   .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>>>
>>>   // The GBKO sets the windowing strategy to the global default
>>>   .setWindowingStrategyInternal(inputWindowingStrategy);
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On 2020/04/23 16:35:58, Aniruddh Sharma  wrote:
>>> > Thanks Luke for your response.
>>> >
>>> > My use case is the following:
>>> > a) I read data from BQ (TableRow).
>>> > b) Convert it into (Table.Row) for DLP calls.
>>> > c) Have to batch the Table.Row collection up to a max size of 512 KB (i.e.
>>> fit many rows from BQ into a single DLP table) and call DLP.
>>> >
>>> > Functionally, I don't have a need for a key or a window, as I just want to
>>> fit rows into a DLP table up to a max size.
>>> >
>>> > In batch mode, when I call the stateful API,
>>> > it adds a "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly"
>>> step and this step is super slow: it has been running on a 50-node cluster
>>> for 800 GB of data for the last 10 hours.
>>> >
>>> > This step is not added when I call Dataflow in streaming mode. But I
>>> can't call it in Streaming mode for other reasons.
>>> >
>>> > So I am trying to understand the following:
>>> > a) Can I somehow give the Dataflow runner a hint not to add the
>>> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all?
>>> Then I don't have any issues.
>>> > b) If it does add this step, how should I choose my artificially
>>> created keys so that the step can execute as fast as possible? It does a
>>> SORT BY on record timestamps. As I don't have any functional key
>>> requirement, shall I choose the same key for all rows, a random key for
>>> some rows, or a random key for each row? And what timestamp shall I add
>>> (the same for all rows?) to make this step run faster.
>>> >
>>> > Thanks
>>> > Aniruddh
>>> 
