On 2020/03/26 13:42:51, Kjetil Halvorsen <[email protected]> wrote: 
> Another update on this issue. I observe the same with bounded SDFs when
> running in streaming mode. The general pipeline is [unbounded watcher, sdf]
> -> [ParDo with side input from FileIO] -> [bounded sdf] -> [ParDo]...
> 
> This also fails with the conflicting bucketing function error message. When
> I remove the FileIO side input, the pipeline executes again (on Dataflow).
> 
> This one hurts us a bit because we use the FileIO side inputs to feed the
> pipeline with config settings, so it is not trivial for us to remove it.
> 
> Best,
> Kjetil
> 
> On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <
> [email protected]> wrote:
> 
> > Perfect, thanks.
> >
> > I did some more testing, and the problem seems to narrow down to using
> > FileIO.match -> readMatches to drive the upstream side input. I have attached a
> > pipeline that reproduces the error. When I run it with Beam 2.17 or 2.18 it
> > will fail on Dataflow. I have not tested with 2.19 due to the blocker on
> > Win Java.
> >
> > Please let me know if there is anything else I can do to help. I am very
> > motivated to get this sorted out as we have lots of scenarios lined up.
> >
> > Best,
> > Kjetil
> >
> > On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <[email protected]> wrote:
> >
> >> That doesn't sound like it should be an issue and sounds like a bug in
> >> Dataflow.
> >>
> >> If you're willing to share a minimal pipeline that gets this error, I can
> >> get an issue opened up internally and assigned.
> >>
> >> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
> >> [email protected]> wrote:
> >>
> >>> Thank you for the tip about "--dataflowJobFile". I wasn't aware of
> >>> it, and it was of great help in interpreting the error message from Dataflow.
> >>>
> >>> I found the error/bug in an upstream DoFn (executed before the SDF) with
> >>> a side input. Both the main input to the DoFn and the side input were
> >>> bounded and using the default window and trigger (i.e. no windowing nor
> >>> trigger specified in the job).
> >>>
> >>> When I moved that particular DoFn to be downstream of the SDF, the job
> >>> started working.
> >>>
> >>> Maybe this is by design and I just hadn't registered that one cannot
> >>> have a side-input DoFn upstream of an unbounded SDF?
> >>>
> >>> In any case, thank you for the patience and willingness to help out.
> >>>
> >>> Best,
> >>> Kjetil
> >>>
> >>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <[email protected]> wrote:
> >>>
> >>>>
> >>>>
> >>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
> >>>> [email protected]> wrote:
> >>>>
> >>>>> Thanks for looking into this. I have been distracted by a separate
> >>>>> (Beam) feature the past week, so it took me some time to make progress.
> >>>>> In any case, I have run new tests on Dataflow with a minimal pipeline,
> >>>>> unfortunately with the same result: "step 22 has conflicting bucketing
> >>>>> functions". More info inline below.
> >>>>>
> >>>>> Best,
> >>>>> Kjetil
> >>>>>
> >>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:
> >>>>>
> >>>>>> The bucketing "error" is likely related to what windowing
> >>>>>> strategy/pipeline shape you have. Have you tried running your SDF
> >>>>>> inside an empty pipeline, possibly followed by a ParDo to log what
> >>>>>> records you are seeing?
> >>>>>>
> >>>>>
> >>>>> I slimmed the pipeline down to just this SDF plus a MapElements that
> >>>>> logs the records. No windowing definitions nor any trigger definitions.
> >>>>> The results were exactly the same: the job fails somewhere in the
> >>>>> startup/verification phase in Dataflow (i.e. after compile/upload from
> >>>>> the client, but as a part of the Dataflow startup procedure). "Step 22
> >>>>> has conflicting bucketing functions".
> >>>>>
> >>>>
> >>>> The error is because the windowing fns on the GBKs are different. You
> >>>> can dump and inspect the JSON job description using the flag
> >>>> --dataflowJobFile=/path/to/dump/file.json
> >>>>
> >>>>
> >>>>>
> >>>>>>
> >>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
> >>>>>> [email protected]> wrote:
> >>>>>>
> >>>>>>> Thanks for the willingness to help out. The general context is that
> >>>>>>> we are developing a set of new Beam-based connectors/readers.
> >>>>>>>
> >>>>>>> I had hoped that SDF was ready for use with Dataflow, just because
> >>>>>>> the interface is nice to work with. In general, would you recommend
> >>>>>>> that we look at the legacy source APIs for building our
> >>>>>>> connectors/readers?
> >>>>>>>
> >>>>>>
> >>>>>> I would not. A few contributors have been making rapid progress over
> >>>>>> the past few months to finish SDFs. Python is done from an API
> >>>>>> standpoint (there is some additional integration/scaling testing going
> >>>>>> on). Java is missing progress reporting from the API and watermark
> >>>>>> estimation, but I was hoping to finish those API pieces this month. Go
> >>>>>> has started on the batch API implementation.
> >>>>>>
> >>>>>
> >>>>> Great, I am happy to hear that. Would love to just keep investing in
> >>>>> the SDF implementations we started.
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize
> >>>>>>> for the bad formatting; I am still getting to grips with communicating
> >>>>>>> code via e-mail). We have used the overall pattern from the file
> >>>>>>> watcher, i.e. the SDF creates "poll requests" at regular intervals
> >>>>>>> which a downstream ParDo executes. The SDF uses the built-in
> >>>>>>> OffsetRange as the basis for the range tracker.
> >>>>>>>
> >>>>>>> I am happy to receive any pointers on improvements, changes,
> >>>>>>> debugging paths.
> >>>>>>>
> >>>>>>> /**
> >>>>>>>  * This function generates an unbounded stream of source queries.
> >>>>>>>  */
> >>>>>>> @DoFn.UnboundedPerElement
> >>>>>>> public class GenerateTsPointRequestsUnboundFn
> >>>>>>>         extends DoFn<RequestParameters, RequestParameters> {
> >>>>>>>
> >>>>>>>     @Setup
> >>>>>>>     public void setup() {
> >>>>>>>         validate();
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     @ProcessElement
> >>>>>>>     public ProcessContinuation processElement(
> >>>>>>>             @Element Element inputElement,
> >>>>>>>             RestrictionTracker<OffsetRange, Long> tracker,
> >>>>>>>             OutputReceiver<OutputElement> out,
> >>>>>>>             ProcessContext context) throws Exception {
> >>>>>>>
> >>>>>>>         long startRange = tracker.currentRestriction().getFrom();
> >>>>>>>         long endRange = tracker.currentRestriction().getTo();
> >>>>>>>
> >>>>>>>         while (startRange < (System.currentTimeMillis()
> >>>>>>>                 - readerConfig.getPollOffset().get().toMillis())) {
> >>>>>>>             // Set the query's max end to current time - offset.
> >>>>>>>             if (endRange > (System.currentTimeMillis()
> >>>>>>>                     - readerConfig.getPollOffset().get().toMillis())) {
> >>>>>>>                 endRange = (System.currentTimeMillis()
> >>>>>>>                         - readerConfig.getPollOffset().get().toMillis());
> >>>>>>>             }
> >>>>>>>
> >>>>>>>             if (tracker.tryClaim(endRange - 1)) {
> >>>>>>>
> >>>>>>
> >>>>>> Why do you try to claim up to the endRange here? Shouldn't you claim
> >>>>>> subranges, so [start, start+pollsize), [start+pollsize,
> >>>>>> start+pollsize*2), ..., [start+pollsize*N, end)?
> >>>>>>
> >>>>>> Also, if start is significantly smaller than current time, you could
> >>>>>> implement the @SplitRestriction method.
> >>>>>>
> >>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
> >>>>>>
> >>>>>
> >>>>> Good points! My original thinking was to have a second (bounded) SDF
> >>>>> that splits the ranges and executes the actual reads from the source,
> >>>>> similar to the "watch + read" pattern. That way I can reuse most of the
> >>>>> code between the unbounded and bounded scenarios. Maybe that's a
> >>>>> sub-optimal approach?
> >>>>>
> >>>>
> >>>> Following a watch + read pattern works well.
> >>>>
> >>>> And claiming the entire range when writing a generator function makes
> >>>> sense.
> >>>>
> >>>>
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>                 context.updateWatermark(
> >>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
> >>>>>>>                 out.outputWithTimestamp(
> >>>>>>>                         buildOutputElement(inputElement, startRange, endRange),
> >>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
> >>>>>>>
> >>>>>>>                 // Update the start and end range for the next iteration.
> >>>>>>>                 startRange = endRange;
> >>>>>>>                 endRange = tracker.currentRestriction().getTo();
> >>>>>>>             } else {
> >>>>>>>                 LOG.info(localLoggingPrefix
> >>>>>>>                         + "Stopping work due to checkpointing or splitting.");
> >>>>>>>                 return ProcessContinuation.stop();
> >>>>>>>             }
> >>>>>>>
> >>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
> >>>>>>>                 LOG.info(localLoggingPrefix
> >>>>>>>                         + "Completed the request time range. Will stop the reader.");
> >>>>>>>                 return ProcessContinuation.stop();
> >>>>>>>             }
> >>>>>>>
> >>>>>>>             return ProcessContinuation.resume().withResumeDelay(
> >>>>>>>                     org.joda.time.Duration.millis(
> >>>>>>>                             readerConfig.getPollInterval().get().toMillis()));
> >>>>>>>         }
> >>>>>>>
> >>>>>>>         return ProcessContinuation.resume().withResumeDelay(
> >>>>>>>                 org.joda.time.Duration.millis(
> >>>>>>>                         readerConfig.getPollInterval().get().toMillis()));
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     private OutputElement buildOutputElement(Element element,
> >>>>>>>                                              long start,
> >>>>>>>                                              long end) {
> >>>>>>>         // Build the output from the input element passed in.
> >>>>>>>         return element
> >>>>>>>                 .withParameter(START_KEY, start)
> >>>>>>>                 .withParameter(END_KEY, end);
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     @GetInitialRestriction
> >>>>>>>     public OffsetRange getInitialRestriction(Element element)
> >>>>>>>             throws Exception {
> >>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
> >>>>>>>     }
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> SplittableDoFn has experimental support within Dataflow so the way
> >>>>>>>> you may be using it could be correct but unsupported.
> >>>>>>>>
> >>>>>>>> Can you provide snippets/details of your splittable dofn
> >>>>>>>> implementation?
> >>>>>>>>
> >>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
> >>>>>>>> [email protected]> wrote:
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I am looking for pointers to a Dataflow runner error message:
> >>>>>>>>> Workflow failed. Causes: Step s22 has conflicting bucketing functions,
> >>>>>>>>>
> >>>>>>>>> This happens at the very startup of the job execution, and I am
> >>>>>>>>> unable to find any pointer as to where in the code/job definition
> >>>>>>>>> the origin of the conflict is. The same job runs just fine in the
> >>>>>>>>> DirectRunner.
> >>>>>>>>>
> >>>>>>>>> The job contains a splittable DoFn (unbounded) and I have tried it
> >>>>>>>>> both with and without a windowing transform; both fail with the same
> >>>>>>>>> result on Dataflow.
> >>>>>>>>>
> >>>>>>>>> This is my first foray into splittable DoFn territory so I am sure
> >>>>>>>>> I have just made some basic missteps.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Kjetil
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>> *Kjetil Halvorsen*
> >>>>>>>>> Chief Architect, Enterprise Integration
> >>>>>>>>> +47 48 01 13 75 | [email protected]
> >>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
> >>>>>>>>>
> >>>>>>>>>
> 
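
For reference, the subrange scheme Luke suggests in the quoted thread ([start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end)) could be sketched roughly as below. This is plain Java with no Beam dependency, and it is only an illustration: PollRangeSplitter and split are made-up names, and the long[] pairs stand in for OffsetRange. In an actual SDF the equivalent logic would presumably live in the @SplitRestriction method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not from the thread): splits [from, to) into poll-sized subranges. */
final class PollRangeSplitter {

    private PollRangeSplitter() {}

    /**
     * Returns half-open subranges {start, end} that cover [from, to) in order.
     * Every subrange spans pollSize offsets except possibly the last, which may be shorter.
     */
    static List<long[]> split(long from, long to, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        long start = from;
        while (start < to) {
            // Clamp the final subrange to the end of the restriction.
            long end = (to - start > pollSize) ? start + pollSize : to;
            ranges.add(new long[] {start, end});
            start = end;
        }
        return ranges;
    }
}
```

Calling PollRangeSplitter.split(0, 25, 10) gives three subranges, the last one shorter than the poll size; an empty restriction (from == to) gives an empty list.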
