n.BoundedPerElement annotation with some kind of
>> (optional) hint - e.g. @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), the
>> default would be Bounded.FITS_IN_MEMORY (which is the current approach)
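To make the proposed hint concrete, here is a minimal sketch of a runtime annotation that carries such a hint, plus a runner-side check that reads it. Everything in it is hypothetical and only mirrors the proposal: `OutputSizeHint`, `Bounded`, `RunnerPlanner` and the two example function classes are not Beam's API. The proposal itself puts the hint on `@DoFn.BoundedPerElement`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical hint values taken from the proposal text; not part of Beam's API.
enum Bounded { FITS_IN_MEMORY, POSSIBLY_HUGE }

// Hypothetical stand-in for the proposed parameterized annotation.
// FITS_IN_MEMORY is the default, matching the current behaviour.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface OutputSizeHint {
  Bounded value() default Bounded.FITS_IN_MEMORY;
}

// Illustrative user functions: one without a hint, one marked as possibly huge.
class SmallOutputFn {}

@OutputSizeHint(Bounded.POSSIBLY_HUGE)
class FileSplitExpandingFn {}

final class RunnerPlanner {
  // Read the hint reflectively; an absent annotation means the default.
  static Bounded hintFor(Class<?> fnClass) {
    OutputSizeHint hint = fnClass.getAnnotation(OutputSizeHint.class);
    return hint == null ? Bounded.FITS_IN_MEMORY : hint.value();
  }

  // A runner could keep buffering per-element outputs only when that is declared
  // safe, and switch to a streaming or spilling strategy otherwise.
  static boolean mayBufferPerElement(Class<?> fnClass) {
    return hintFor(fnClass) == Bounded.FITS_IN_MEMORY;
  }

  public static void main(String[] args) {
    System.out.println(mayBufferPerElement(SmallOutputFn.class));        // true
    System.out.println(mayBufferPerElement(FileSplitExpandingFn.class)); // false
  }
}
```

With a default value on the annotation, existing functions keep today's behaviour, and only functions that opt in change how the runner treats their output.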
>>
> The approach (3) seems to give more information to all runners in a way
> that doesn't require the runner to support unbounded outputs
> from any individual @ProcessElement downcall.
>
> -Daniel
>
> On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek
> wrote:
>
>> Hello,
>>
>> I am working on an issue which currently limits spar
Hello,
I am working on an issue which currently limits the Spark runner by requiring
the result of processElement to fit in memory [1]. This is problematic e.g.
for a flatMap where the input element is a file split that generates possibly
large output.
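A minimal sketch of the difference between buffering and streaming the outputs of one input element. `FlatMapBuffering` is a hypothetical helper, not any runner's output manager: the eager variant materializes every output before emitting any, so memory grows with the size of the file split, while the lazy variant produces outputs on demand.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

final class FlatMapBuffering {
  // Eager strategy: materialize every output of one input element before
  // handing any of them downstream. Memory grows with the output size.
  static List<String> expandEagerly(int numRecords) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < numRecords; i++) {
      out.add("record-" + i);
    }
    return out;
  }

  // Lazy strategy: produce outputs on demand, so they are not all held at
  // once unless the consumer itself retains them.
  static Iterator<String> expandLazily(int numRecords) {
    return new Iterator<String>() {
      private int next = 0;

      @Override
      public boolean hasNext() {
        return next < numRecords;
      }

      @Override
      public String next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return "record-" + next++;
      }
    };
  }

  // Downstream consumer that counts outputs and passes each one to the sink.
  static long drain(Iterator<String> outputs, Consumer<String> sink) {
    long count = 0;
    while (outputs.hasNext()) {
      sink.accept(outputs.next());
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    List<String> buffered = expandEagerly(1_000);
    System.out.println(buffered.size()); // 1000
    long streamed = drain(expandLazily(1_000_000), s -> {});
    System.out.println(streamed);        // 1000000
  }
}
```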
The intended fix is to add an option to have dofn
I would like to discuss a problem I am facing upgrading Beam 2.24.0 ->
2.33.0.
Running Beam batch jobs on SparkRunner with Spark 2.4.4 stopped showing me
job details on the Spark History Server. The problem is that there are two event
logging listeners running and they step on each other. More details in
Any feedback on this one please?
On Mon, Dec 13, 2021 at 11:02 AM Jozef Vilcek wrote:
> Yes, field is marked as nullable. Here is a test case change to
> illustrate it:
>
> https://github.com/JozoVilcek/beam/commit/5e1c6324868c2fd6145dd2348c7358fdc787ac38
>
> On Sun, Dec 1
Yes, field is marked as nullable. Here is a test case change to
illustrate it:
https://github.com/JozoVilcek/beam/commit/5e1c6324868c2fd6145dd2348c7358fdc787ac38
On Sun, Dec 12, 2021 at 7:28 PM Reuven Lax wrote:
> Is the schema field marked as nullable?
>
> On Sun, Dec 12, 2021 at 4:21
I did notice that the protobuf schema translator supports nullable for proto
fields [1]. E.g. if I want a nullable string, then in proto I can use
`google.protobuf.StringValue` and the schema will look fine.
However, the fromRow creator does not support this and throws an exception if it is
presented with a row
based on which I can evolve my proposal.
On Wed, Jul 7, 2021 at 10:01 AM Jozef Vilcek wrote:
>
>
> On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax wrote:
>
>>
>>
>> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek
>> wrote:
>>
>>> I don't think thi
On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax wrote:
>
>
> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek wrote:
>
>> I don't think this has anything to do with external shuffle services.
>>
>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
It feels ok to tackle those separately.
> Jan
> On 7/2/21 8:23 PM, Reuven Lax wrote:
>
>
>
> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek
> wrote:
>
>>
>> How will @RequiresStableInput prevent this situation when running batch
>> use case?
>>>
`, not checkpoint. Also, I thought cache
is invoked at fork points in the DAG. I have just a join and a map, and in such
cases I thought data is always served by the shuffle service. Am I mistaken?
On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax wrote:
>
>
> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek
f penalty for that case).
The user would have to explicitly ask FileIO to use a specific sharding.
Documentation can educate about the tradeoffs. But I am open to workaround
alternatives.
On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath
wrote:
>
>
> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek
>
t.
How will @RequiresStableInput prevent this situation when running a batch use
case?
On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath
wrote:
>
>
> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek
> wrote:
>
>> Hi,
>>
>> how do we proceed with reviewing MR proposed f
to do the
same. While this is true, as illustrated in the previous comment, it is
neither simple nor convenient to use, and it requires more customization than
exposing the sharding, which is already there.
Are there other negatives to exposing a sharding function?
On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek
your PR. How is this PR different than the by()
> method in FileIO?
>
> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek
> wrote:
>
>> MR for review for this change is here:
>> https://github.com/apache/beam/pull/15051
>>
>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilce
MR for review for this change is here:
https://github.com/apache/beam/pull/15051
On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek wrote:
> I would like this thread to stay focused on sharding FileIO only. Possible
> change to the model is an interesting topic but of a much different scope.
communicate that this
> checkpointing was required for correctness. I think we still have many IOs
> that are written using Reshuffle instead of @RequiresStableInput, and I
> don't know which runners process @RequiresStableInput properly.
> >
> > By the way, I believe the Spark
>> key preserving) which may allow a runner to avoid an additional shuffle if
>> a preceding shuffle can guarantee the sharding requirements?
>>
>> Where X is the shuffle that could be avoided: input -> shuffle (key
>> sharding fn A) -> transform1 (key preserving) -> tr
> preserving) -> X -> fileio (key sharding fn A)
>
> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek
> wrote:
>
>> I would like to extend FileIO with the possibility to specify a custom
>> sharding function:
>> https://issues.apache.org/jira/browse/BEAM-12493
>>
I would like to extend FileIO with the possibility to specify a custom sharding
function:
https://issues.apache.org/jira/browse/BEAM-12493
I have 2 use-cases for this:
1. I need to generate shards which are compatible with Hive bucketing
and therefore need to decide shard assignment based on
s a consistent result.
>
> -Max
>
> On 08.07.20 11:08, Jozef Vilcek wrote:
> > My last question was more towards the graph translation for batch mode.
> >
> > Should DoFn with @RequiresStableInput be translated/expanded in some
> > specific way (e.g. DoFn -> Reshuff
m when @RequiresStableInput.
>
> The intermediate state is to analyze whether input is already stable from
> materialize() and add another materialize() only if it is not stable.
>
> I don't know the current state of the SparkRunner. This may already have
> changed.
>
> Kenn
>
> On T
it in batch?
On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek wrote:
> We have a component which we use in streaming and batch jobs. We run
> streaming on FlinkRunner and batch on SparkRunner. Recently we needed to
> add @RequiresStableInput to that component because of a streaming use case.
We have a component which we use in streaming and batch jobs. We run
streaming on FlinkRunner and batch on SparkRunner. Recently we needed to
add @RequiresStableInput to that component because of a streaming use case.
But now the batch case crashes on SparkRunner with
Caused by:
leNamePolicy than before was used (window + timing + shards).
>> Then, you can find files that contain the original filenames in
>> windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
>> interesting part, because you will find several files with LATE
I am using FileIO and I also observe pane info being dropped on the
Flink runner. It was mentioned in this thread:
https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
It is a result of a different Reshuffle expansion, done for optimization reasons.
However, I did not observe a data
rg/jira/browse/BEAM-9824)
>
> D.
>
> On Mon, May 4, 2020 at 2:48 PM Jozef Vilcek wrote:
>
>> I have a pipeline which
>>
>> 1. Reads from KafkaIO
>> 2. Does stuff with events and writes windowed files via FileIO
>> 3. Applies a stateful DoFn on written files i
I have a pipeline which
1. Reads from KafkaIO
2. Does stuff with events and writes windowed files via FileIO
3. Applies a stateful DoFn on the written files' info
The stateful DoFn does some logic which depends on PaneInfo.Timing, whether it
is EARLY or something else. When testing in DirectRunner, all is
Thanks Ismael!
On Mon, Mar 2, 2020 at 2:15 PM Ismaël Mejía wrote:
> Done, also assigned the issue you mentioned in the previous email to you.
>
> On Mon, Mar 2, 2020 at 12:56 PM Jozef Vilcek
> wrote:
>
>> Recently I had a problem with Beam pipeline unable to start due t
Can I please get permission in JIRA for the `jvilcek` user to self-assign
JIRAs?
Recently I had a problem with a Beam pipeline being unable to start due to an
unhealthy broker in the list of configured bootstrap servers. I have
created a JIRA for it and plan to work on the fix.
https://issues.apache.org/jira/browse/BEAM-9420
Please let me know in case it does not make sense or should
Will do! Thanks Alex.
On Sat, Feb 22, 2020 at 9:23 AM Alex Van Boxel wrote:
> I've assigned it to you. If you create a PR you can add me as reviewer.
>
> _/
> _/ Alex Van Boxel
>
>
> On Sat, Feb 22, 2020 at 9:14 AM Jozef Vilcek
> wrote:
>
>> I have fil
rce that the metadata match.
>
> On Fri, Feb 21, 2020 at 10:38 AM Jozef Vilcek
> wrote:
>
>> Hi,
>>
>> I am playing with Schemas in Beam and conversion between types. I am
>> experimenting with the Convert transform to convert a PCollection of POJOs to
>> Protobu
Hi,
I am playing with Schemas in Beam and conversion between types. I am
experimenting with the Convert transform to convert a PCollection of POJOs to
Protobufs, but I get a failure about the schemas not being compatible.
The root cause is that the FieldTypes are not passing the `equivalent()` check
because of
+1
On Fri, Dec 13, 2019 at 5:58 AM Kenneth Knowles wrote:
> Please vote on the proposal for Beam's mascot to be the Firefly. This
> encompasses the Lampyridae family of insects, without specifying a genus or
> species.
>
> [ ] +1, Approve Firefly being the mascot
> [ ] -1, Disapprove Firefly
On Wed, Nov 20, 2019 at 3:43 AM Kenneth Knowles wrote:
> Please cast your votes of approval [1] for animals you would support as
> Beam mascot. The animal with the most approval will be identified as the
> favorite.
>
> *** Vote for as many as you like, using this checklist as a template
>
Interesting topic :) I also kind of liked Alex's firefly, and the impression it
made on me. To drive it further: hands on hips make a strong / serious pose,
hovering in the air above all.
I would put the logo on him, to become his torso / body or a dress. A logo
with a big B on it almost looks like
f I provide this
data e.g. via a file dump, then the whole job runs OK with a 4GB executor heap.
The run uses about 400 cores for 1 hour, so tripling the heap size for all executors
just for one initial load on one executor is inefficient.
I am not aware of any regression.
>
> All my best, Ryan
>
>
>
>
, in such cases, what is the opinion about BoundedSource vs DoFn as a
source? What is the recommendation to an IO developer who wants to achieve
equivalent execution scalability across runners?
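One reason a splittable source can scale differently from a single DoFn call is that a runner can ask it to divide its work (Beam's BoundedSource exposes a split hook for this), whereas one DoFn invocation that runs a whole query produces every row on one worker. A minimal sketch of such splitting, assuming a hypothetical table with a dense numeric id column; `RangeSplitter` is illustrative and is neither JdbcIO nor Beam code.

```java
import java.util.ArrayList;
import java.util.List;

final class RangeSplitter {
  // Half-open range [start, end) of hypothetical numeric row ids.
  static final class Range {
    final long start;
    final long end;

    Range(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  // Split [start, end) into consecutive chunks of at most maxRowsPerSplit ids,
  // each of which could be read by a separate worker with its own query.
  static List<Range> split(long start, long end, long maxRowsPerSplit) {
    if (maxRowsPerSplit <= 0) {
      throw new IllegalArgumentException("maxRowsPerSplit must be positive");
    }
    List<Range> splits = new ArrayList<>();
    long s = start;
    while (s < end) {
      // Compare remaining size first to avoid overflowing s + maxRowsPerSplit.
      long stop = (end - s <= maxRowsPerSplit) ? end : s + maxRowsPerSplit;
      splits.add(new Range(s, stop));
      s = stop;
    }
    return splits;
  }

  public static void main(String[] args) {
    System.out.println(split(0, 10_000, 3_000));
    // [[0, 3000), [3000, 6000), [6000, 9000), [9000, 10000)]
  }
}
```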
On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek wrote:
> typo in my previous message. I meant to say =>
will help! I'll
> take a closer look at the DoFnOutputManager. In the meantime, is
> there anything particularly about your job that might help
> investigate?
>
> All my best, Ryan
>
> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek
> wrote:
> >
> > I agree I might be too quick to c
I agree I might have been too quick to say that DoFn output needs to fit in memory.
Actually, I am not sure what the Beam model says on this matter and what the output
managers of particular runners do about it.
But SparkRunner definitely has an issue here. I did try setting a small
`fetchSize` for JdbcIO as well as change
Hi,
I need to read a big-ish data set via JdbcIO. This forced me to
bump up memory for my executor (right now using SparkRunner). It seems that
JdbcIO requires all data to fit in memory, as it uses a DoFn to
unfold the query into a list of elements.
A BoundedSource would not face the
We do have Beam 2.15.0 batch jobs running on the Spark runner. I had a somewhat
tricky time with spark.default.parallelism, but in the end it works fine
for us (custom parallelism on source stages and spark.default.parallelism
on all other stages after shuffles).
The tricky part in my case was
Interesting discussion. I think it is very important information that when
a user uses a stateful ParDo, they can run into a situation where it will
not behave correctly in "batch operating mode".
But will some transforms known to Beam, like fixed windows, work fine? Is
there a sorting applied
> On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek wrote:
>
>> Sorry, it took a while. I wanted to actually use this extension for
>> WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
>> PR is at https://github.com/apache/beam/pull/8499
>>
>> On Thu
ryo or?
> >>
> >> -Max
> >>
> >> On 02.05.19 13:15, Robert Bradshaw wrote:
> >> > Thanks for filing those.
> >> >
> >> > As for how not doing a copy is "safe," it's not really. Beam
ook at the PR!
>
> Reuven
>
> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek wrote:
>
>> That coder is added extra as a re-map stage from "original" key to new
>> ShardAwareKey ... But the pipeline might get broken, I guess.
>> Very fair point. I am having a second thou
> > asserts that you MUST NOT mutate your inputs (and direct runners,
> > which are used during testing, do perform extra copies and checks to
> > catch violations of this requirement).
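A simplified illustration of that kind of check, assuming the element is a list of immutable strings: snapshot the input, run the user function, and fail if the input changed. `MutationCheck` is a hypothetical helper; it compares a shallow copy with `equals()` and does not reproduce what the direct runner actually does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MutationCheck {
  // Apply fn to input and fail if fn modified the input. A shallow copy is
  // enough here because the list holds immutable Strings.
  static <R> R applyChecked(List<String> input, Function<List<String>, R> fn) {
    List<String> snapshot = new ArrayList<>(input);
    R result = fn.apply(input);
    if (!snapshot.equals(input)) {
      throw new IllegalStateException(
          "user function mutated its input: " + snapshot + " -> " + input);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> element = new ArrayList<>(List.of("a", "b"));
    System.out.println(applyChecked(element, in -> in.size())); // 2
    try {
      applyChecked(element, in -> in.add("c")); // violates the no-mutation rule
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

Skipping such a copy is only "safe" when every function in a fused chain honours the rule, which is why the check lives in a testing runner rather than in production execution.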
> >
> > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
> wrote:
> >>
I have created
https://issues.apache.org/jira/browse/BEAM-7204
https://issues.apache.org/jira/browse/BEAM-7206
to track these topics further
On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek wrote:
>
>
> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles wrote:
>
>>
>>
> Reuven
>
> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek
> wrote:
>
>> Hm, what would be the scenario? Have version A running with original
>> random sharding and then start version B where I change sharding to some
>> custom function?
>> So I have to enable the
imilar
chaining via this "fusion of stages"? Curious here... how is it different
from chaining, such that the runner can be sure that not doing a copy is "safe"
with respect to user-defined functions and their behaviour over inputs?
>
>> Reuven
>>
>> On Tue, Apr 30, 201
2019 at 8:21 AM Jozef Vilcek
> wrote:
>
>> I have created a PR for enhancing WriteFiles for custom sharding function.
>> https://github.com/apache/beam/pull/8438
>>
>> If this sort of change looks good, then the next step would be to use it in a
>> Flink runner transform o
I have created a PR for enhancing WriteFiles for custom sharding function.
https://github.com/apache/beam/pull/8438
If this sort of change looks good, then the next step would be to use it in a
Flink runner transform override. Let me know what you think.
On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek
ink runner I
> believe had no knowledge of fusion, which was known to make it extremely
> slow. A lot of work went into making the portable runner fusion aware, so
> we don't need to round trip through coders on every ParDo.
>
> Reuven
>
> On Tue, Apr 30, 2019 at 6:58
dug
>> into this performance too deeply. I suspect that there is low-hanging fruit
>> to optimize as a result.
>> >>
>> >> You're right that ReduceFnRunner schedules a timer for each element. I
>> think this code dates back to before Beam; on Dataflow timers
Hello,
I am interested in any knowledge or thoughts on what the overhead of running
Beam pipelines instead of pipelines written on the "bare runner" should be / is.
Is this something which is being tested or investigated by the
community? Is there a consensus on what bounds the overhead should
typically
rds >> num_workers => good spread of the load across workers,
>> but huge number of files
>>
>> Your approach would give users control over the sharding keys such that
>> they could be adjusted to spread load more evenly.
>>
>> I'd like to hear from
Hello,
Right now, if someone needs sharded files via FileIO, there is only one
option, which is random (round-robin) shard assignment per element, and it
always uses ShardedKey as the key for the GBK which follows.
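To illustrate the difference between the current round-robin behaviour and a user-provided, key-based sharding function, here is a minimal sketch. `ShardAssignment` is hypothetical, and `String.hashCode()` is only a stand-in: producing Hive-compatible buckets would require Hive's own hash function.

```java
final class ShardAssignment {
  private int counter = 0;

  // Round-robin: the shard depends only on arrival order (the key is ignored),
  // so records with the same key can end up in different shards.
  int roundRobin(int numShards) {
    int shard = Math.floorMod(counter, numShards);
    counter++;
    return shard;
  }

  // Key-based: a deterministic function of the key, so every record with the
  // same key lands in the same shard. floorMod keeps the result non-negative.
  static int byKeyHash(String key, int numShards) {
    return Math.floorMod(key.hashCode(), numShards);
  }

  public static void main(String[] args) {
    String[] keys = {"user-1", "user-2", "user-1", "user-1"};
    ShardAssignment rr = new ShardAssignment();
    for (String key : keys) {
      System.out.println(key + " -> round-robin " + rr.roundRobin(3)
          + ", by key " + byKeyHash(key, 3));
    }
  }
}
```

In the printed output, the three `user-1` records get different round-robin shards but always the same key-based shard, which is the property a bucketed layout depends on.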
I would like to generalize this and have a possibility to provide some
I want to reach out for opinions on what would be the best way to proceed
with https://issues.apache.org/jira/browse/BEAM-6077
The problem is that when a FlinkRunner job is being restored from a
checkpoint, it needs to resurrect the source and its readers given the
checkpoint state. State element is
also leverage spread of the Kafka partitions.
>
> Thanks,
> Max
>
> On 16.11.18 10:57, Jozef Vilcek wrote:
> > Hi,
> >
> > I want to collect some feedback on rescaling streaming Beam pipeline on
> > Flink runner. Flink seems to be able to re-scale jobs, which in Beam
Hi,
I want to collect some feedback on rescaling streaming Beam pipeline on
Flink runner. Flink seems to be able to rescale jobs, which in Beam terms
means changing the parallelism. However, one has to make sure that
state can rescale as well, up to the predefined MAX parallelism. Max
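A simplified model of why the max parallelism bounds rescaling, assuming the key-group scheme Flink uses for keyed state: every key maps to one of `maxParallelism` key groups, and key groups are assigned to subtasks in contiguous ranges. `KeyGroups` is a hypothetical helper, and its hash is simplified (Flink applies an additional murmur hash to the key's hashCode).

```java
final class KeyGroups {
  // Each key maps to one of maxParallelism key groups. floorMod of hashCode()
  // is a simplification of Flink's hashing.
  static int keyGroupFor(Object key, int maxParallelism) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  // Key groups are handed to subtasks in contiguous ranges, so changing the
  // parallelism only moves whole key groups (and their state) between subtasks.
  static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
    if (parallelism < 1 || parallelism > maxParallelism) {
      throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
    }
    return keyGroup * parallelism / maxParallelism;
  }

  public static void main(String[] args) {
    int maxParallelism = 128;
    int keyGroup = keyGroupFor("user-42", maxParallelism);
    // The key group is fixed; only the subtask that owns it changes on rescale.
    System.out.println("key group " + keyGroup
        + ": subtask " + subtaskFor(keyGroup, maxParallelism, 4) + " of 4, "
        + subtaskFor(keyGroup, maxParallelism, 8) + " of 8");
  }
}
```

Because a key group is the smallest unit of state that can move, a job can never run with more parallel subtasks than it has key groups, hence the predefined upper bound.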
rkers). Probably we should do something
> > similar for the Flink runner.
> >
> > This needs to be done by the runner, as # of workers is a runner
> > concept; the SDK itself has no concept of workers.
> >
> > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek
cify the number of shards in streaming mode?
>
> -Max
>
> On 25.10.18 10:12, Jozef Vilcek wrote:
> > Hm, yes, this makes sense now, but what can be done for my case? I do
> > not want to end up with too many files on disk.
> >
> > I think what I am looking fo
ough keys, the chance
> increases they are equally spread.
>
> This should be similar to what the other Runners do.
>
> On 24.10.18 10:58, Jozef Vilcek wrote:
> >
> > So if I run 5 workers with 50 shards, I end up with:
> >
> > Duration Bytes rec
What I ended up doing, when I could not for any reason rely on Kafka
timestamps but needed to parse them from the message, is:
* have a custom Kafka deserializer which never throws, but returns a message
which is either a success with the parsed data structure plus timestamp, or a
failure with the original Kafka
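A minimal sketch of that pattern, assuming a hypothetical `<epochMillis>|<payload>` UTF-8 record format. In a real pipeline this logic would sit inside an implementation of Kafka's `Deserializer` interface; here it is a plain static method so the sketch runs without Kafka on the classpath. `ParseResult` and `NeverThrowingParser` are illustrative names.

```java
import java.nio.charset.StandardCharsets;

// Either a parsed payload with its event timestamp, or the original bytes plus an error.
final class ParseResult {
  final Long timestampMillis; // null on failure
  final String payload;       // null on failure
  final byte[] rawBytes;      // original record bytes, kept on failure
  final String error;         // null on success

  private ParseResult(Long timestampMillis, String payload, byte[] rawBytes, String error) {
    this.timestampMillis = timestampMillis;
    this.payload = payload;
    this.rawBytes = rawBytes;
    this.error = error;
  }

  static ParseResult success(long timestampMillis, String payload) {
    return new ParseResult(timestampMillis, payload, null, null);
  }

  static ParseResult failure(byte[] rawBytes, String error) {
    return new ParseResult(null, null, rawBytes, error);
  }

  boolean isSuccess() {
    return error == null;
  }
}

final class NeverThrowingParser {
  // Hypothetical wire format: "<epochMillis>|<payload>" encoded as UTF-8.
  static ParseResult parse(byte[] bytes) {
    try {
      String text = new String(bytes, StandardCharsets.UTF_8);
      int sep = text.indexOf('|');
      if (sep < 0) {
        throw new IllegalArgumentException("missing '|' separator");
      }
      long timestamp = Long.parseLong(text.substring(0, sep));
      return ParseResult.success(timestamp, text.substring(sep + 1));
    } catch (RuntimeException e) {
      // Never propagate: a bad record becomes a failure value instead of
      // failing the reader, and its bytes are kept for later inspection.
      return ParseResult.failure(bytes, e.toString());
    }
  }

  public static void main(String[] args) {
    ParseResult ok = parse("1700000000000|hello".getBytes(StandardCharsets.UTF_8));
    ParseResult bad = parse("not-a-record".getBytes(StandardCharsets.UTF_8));
    System.out.println(ok.isSuccess() + " " + ok.timestampMillis + " " + ok.payload); // true 1700000000000 hello
    System.out.println(bad.isSuccess() + " " + bad.error);
  }
}
```

Downstream, successes can be assigned their parsed timestamps while failures are routed to a separate output instead of stopping the read.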
Oct 24, 2018 at 12:28 AM Jozef Vilcek
> wrote:
>
>> cc (dev)
>>
>> I tried to run the example with FlinkRunner in batch mode and again got
>> a bad data spread among the workers.
>>
>> When I tried to remove number of shards for batch mode i
,
AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(AfterPane.elementCountAtLeast(1))
    .withLateFirings(AfterFirst.of(
        Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
        Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(
On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek wrote:
> Hi
Just curious here. What happens when this local JVM context dies for any
reason? How does it work with Dataflow?
On Thu, Oct 4, 2018 at 1:50 AM Scott Wegner wrote:
> Another point that we discussed at ApacheCon is that a difference between
> Dataflow and other runners is Dataflow is
Just for reference, there is a JIRA open for
FileBasedSink.moveToOutputFiles() and filesystem move behavior
https://issues.apache.org/jira/browse/BEAM-5036
On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson
wrote:
> Reuven, I think you might be on to something
>
> The Beam HadoopFileSystem copy()
the new state.
>> >
>> > I am not deeply into the of how Beam and the Flink runner implement
>> > their use of state, but it looks this part is not present, which could
>> > mean that savepoints taken from Beam applications are not backwards
>> > compatible.
Hello,
I am attempting to upgrade a Beam app from 2.5.0 running on Flink 1.4.0 to
Beam 2.6.0 running on Flink 1.5.0. I am not aware of any state migration
changes needed for Flink 1.4.0 -> 1.5.0, so I am just starting a new app
with updated libs from a Flink savepoint captured by the previous version of
Hello,
this commit added export-ignore for some of the Gradle files:
https://github.com/apache/beam/commit/2a0f68b0c743d37c46486b81500043b4b420c825
This means that a downloaded zip archive of the git repository is not buildable
via the `gradlew` command. I am curious about the rationale behind this
Hello, is there a chance this bug makes it into the release?
https://issues.apache.org/jira/browse/BEAM-5028
On Sat, Jul 28, 2018 at 1:38 AM Pablo Estrada wrote:
> Hello all,
> I will start daily updates of progress on the 2.6.0 release.
> As of today, the main release blockers are issues in
>> we would need to make sure that all Filesystems support cross-directory
>> rename.
>>
>> On Thu, Jul 26, 2018 at 9:58 AM Lukasz Cwik wrote:
>>
>>> +dev
>>>
>>> On Thu, Jul 26, 2018 at 2:40 AM Jozef Vilcek
>>> wrote: