Thank you for the tip about "--dataflowJobFile". I wasn't aware of it, and it was a great help in interpreting the error message from Dataflow.
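The error names only a step id ("Step s22"), so one quick way to map it back to a transform is to search the file written by --dataflowJobFile for that id. Below is a minimal, JDK-only sketch. It assumes the dumped file is pretty-printed JSON in which step ids appear as quoted strings. The class JobFileStepGrep and its method are hypothetical names, and the code does a plain text search rather than parsing the job's JSON structure.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical debugging helper: lists every line of a dumped Dataflow job file
 * (written via --dataflowJobFile) that mentions a given step id, e.g. "s22".
 * Assumes pretty-printed JSON; this is a text search, not a JSON parser.
 */
final class JobFileStepGrep {

    private JobFileStepGrep() {}

    /** Returns "lineNumber: text" for each line containing the step id as a quoted string. */
    static List<String> findStep(Path jobFile, String stepId) throws IOException {
        // Match the quoted id so that "s22" does not also hit "s221".
        String needle = "\"" + stepId + "\"";
        List<String> hits = new ArrayList<>();
        List<String> lines = Files.readAllLines(jobFile, StandardCharsets.UTF_8);
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).contains(needle)) {
                hits.add((i + 1) + ": " + lines.get(i).trim());
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JobFileStepGrep /path/to/dump/file.json s22
        for (String hit : findStep(Paths.get(args[0]), args[1])) {
            System.out.println(hit);
        }
    }
}
```

If other steps reference s22 by its id, those lines match as well, which may help trace which GroupByKey steps are involved.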
I found the error/bug in an upstream DoFn (executed before the SDF) with a side input. Both the main input to the DoFn and the side input were bounded and used the default window and trigger (i.e. no windowing or trigger was specified in the job). When I moved that particular DoFn downstream of the SDF, the job started working. Maybe this is by design, and I just hadn't registered that one cannot have a side-input DoFn upstream of an unbounded SDF?

In any case, thank you for the patience and willingness to help out.

Best,
Kjetil

On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <[email protected]> wrote:

> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <[email protected]> wrote:
>
>> Thanks for looking into this. I have been distracted by a separate (Beam)
>> feature the past week, so it took me some time to make progress. In any
>> case, I have run new tests on Dataflow with a minimal pipeline.
>> Unfortunately, with the same result: "step 22 has conflicting bucketing
>> functions". More info inline below.
>>
>> Best,
>> Kjetil
>>
>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:
>>
>>> The bucketing "error" is likely related to what windowing
>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>>> empty pipeline, possibly followed by a ParDo to log what records you are
>>> seeing?
>>
>> I slimmed the pipeline down to just this SDF plus a MapElements
>> that logs the records. No windowing definitions nor any trigger definitions.
>> The results were exactly the same: the job fails somewhere in the
>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>> client, but as a part of the Dataflow startup procedure): "Step 22 has
>> conflicting bucketing functions".
>
> The error is because the windowing fns on the GBKs are different.
> You can dump and inspect the JSON job description using the flag
> --dataflowJobFile=/path/to/dump/file.json
>
>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <[email protected]> wrote:
>>>
>>>> Thanks for the willingness to help out. The general context is that we
>>>> are developing a set of new Beam-based connectors/readers.
>>>>
>>>> I had hoped that SDF was ready for use with Dataflow, simply because the
>>>> interface is nice to work with. In general, would you recommend that we
>>>> look at the legacy source APIs for building our connectors/readers?
>>>
>>> I would not. A few contributors have been making rapid progress over the
>>> past few months to finish SDFs. Python is done from an API standpoint
>>> (there is some additional integration/scaling testing going on). Java is
>>> missing progress reporting and watermark estimation from the API, but I was
>>> hoping to finish those API pieces this month. Go has started on the
>>> batch API implementation.
>>
>> Great, I am happy to hear that. Would love to just keep investing in the
>> SDF implementations we started.
>>
>>>> Anyway, I have pasted the skeleton of the SDF below (I apologize for
>>>> the bad formatting; I am still getting to grips with communicating code via
>>>> e-mail). We have used the overall pattern from the file watcher, i.e. the
>>>> SDF creates "poll requests" at regular intervals, which a downstream ParDo
>>>> executes. The SDF uses the built-in OffsetRange as the basis for the range
>>>> tracker.
>>>>
>>>> I am happy to receive any pointers on improvements, changes, or debugging
>>>> paths.
>>>>
>>>> /**
>>>>  * This function generates an unbounded stream of source queries.
>>>>  */
>>>> @DoFn.UnboundedPerElement
>>>> public class GenerateTsPointRequestsUnboundFn
>>>>         extends DoFn<RequestParameters, RequestParameters> {
>>>>
>>>>     @Setup
>>>>     public void setup() {
>>>>         validate();
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public ProcessContinuation processElement(@Element RequestParameters inputElement,
>>>>                                               RestrictionTracker<OffsetRange, Long> tracker,
>>>>                                               OutputReceiver<RequestParameters> out,
>>>>                                               ProcessContext context) throws Exception {
>>>>
>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>
>>>>         while (startRange < (System.currentTimeMillis()
>>>>                 - readerConfig.getPollOffset().get().toMillis())) {
>>>>             // Set the query's max end to current time - offset.
>>>>             if (endRange > (System.currentTimeMillis()
>>>>                     - readerConfig.getPollOffset().get().toMillis())) {
>>>>                 endRange = (System.currentTimeMillis()
>>>>                         - readerConfig.getPollOffset().get().toMillis());
>>>>             }
>>>>
>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>
>>> Why do you try and claim to the endRange here? Shouldn't you claim
>>> subranges, so [start, start+pollsize), [start+pollsize, start+pollsize*2),
>>> ..., [start+pollsize*N, end)?
>>>
>>> Also, if start is significantly smaller than the current time, you could
>>> implement the @SplitRestriction method.
>>>
>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>
>> Good points! My original thinking was to have a second (bounded) SDF that
>> splits the ranges and executes the actual reads from the source, similar to
>> the "watch + read" pattern. That way I can reuse most of the code between
>> the unbounded and bounded scenarios. Maybe that's a sub-optimal approach?
>
> Following a watch + read pattern works well.
>
> And claiming the entire range when writing a generator function makes
> sense.
>
>>>>                 context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>                 out.outputWithTimestamp(buildOutputElement(inputElement, startRange, endRange),
>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>
>>>>                 // Update the start and end range for the next iteration
>>>>                 startRange = endRange;
>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>             } else {
>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to checkpointing or splitting.");
>>>>                 return ProcessContinuation.stop();
>>>>             }
>>>>
>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>                 LOG.info(localLoggingPrefix
>>>>                         + "Completed the request time range. Will stop the reader.");
>>>>                 return ProcessContinuation.stop();
>>>>             }
>>>>
>>>>             return ProcessContinuation.resume().withResumeDelay(
>>>>                     org.joda.time.Duration.millis(readerConfig.getPollInterval().get().toMillis()));
>>>>         }
>>>>
>>>>         return ProcessContinuation.resume().withResumeDelay(
>>>>                 org.joda.time.Duration.millis(readerConfig.getPollInterval().get().toMillis()));
>>>>     }
>>>>
>>>>     private RequestParameters buildOutputElement(RequestParameters element,
>>>>                                                  long start,
>>>>                                                  long end) {
>>>>         return element
>>>>                 .withParameter(START_KEY, start)
>>>>                 .withParameter(END_KEY, end);
>>>>     }
>>>>
>>>>     @GetInitialRestriction
>>>>     public OffsetRange getInitialRestriction(RequestParameters element) throws Exception {
>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>     }
>>>> }
>>>>
>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> SplittableDoFn has experimental support within Dataflow, so the way you
>>>>> are using it may be correct but unsupported.
>>>>>
>>>>> Can you provide snippets/details of your splittable DoFn
>>>>> implementation?
>>>>>
>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am looking for pointers regarding a Dataflow runner error message:
>>>>>> "Workflow failed. Causes: Step s22 has conflicting bucketing functions".
>>>>>>
>>>>>> This happens at the very start of the job execution, and I am
>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>> conflict originates. The same job runs just fine in the
>>>>>> DirectRunner.
>>>>>>
>>>>>> The job contains an unbounded splittable DoFn, and I have tried it both
>>>>>> with and without a windowing transform; both fail with the same result
>>>>>> on Dataflow.
>>>>>>
>>>>>> This is my first foray into splittable DoFn territory, so I am sure I
>>>>>> have just made some basic missteps.
>>>>>>
>>>>>> Cheers,
>>>>>> Kjetil
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *Kjetil Halvorsen*
>>>>>> Chief Architect, Enterprise Integration
>>>>>> +47 48 01 13 75 | [email protected]
>>>>>> www.cognite.com | LIBERATE YOUR DATA™
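Luke's suggestion of claiming subranges [start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end) is essentially what the downstream "read" step of a watch + read pattern would do with each poll request. The following is a minimal, JDK-only sketch of that arithmetic, not Beam code. The class PollRangeSplitter, its methods, and the pollSize parameter are hypothetical names, and it assumes epoch-millisecond timestamps as in the quoted SDF. clampedEnd mirrors the quoted loop's cap of the query end at "current time - offset". In an actual SDF, each chunk could presumably be claimed with tracker.tryClaim(end - 1) on an OffsetRange-based tracker, the same way the quoted code claims endRange - 1.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not a Beam API): splits a half-open time range
 * [from, to) into poll-sized chunks after capping the upper bound
 * at (now - pollOffset), as in the generator SDF discussed above.
 */
final class PollRangeSplitter {

    private PollRangeSplitter() {}

    /** Upper bound a query may read up to: min(restrictionEnd, nowMillis - pollOffsetMillis). */
    static long clampedEnd(long restrictionEnd, long nowMillis, long pollOffsetMillis) {
        return Math.min(restrictionEnd, nowMillis - pollOffsetMillis);
    }

    /**
     * Splits [from, to) into [from, from+size), [from+size, from+2*size), ..., [from+size*N, to).
     * Each element is a {start, end} pair; the list is empty when from >= to.
     */
    static List<long[]> subranges(long from, long to, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        long start = from;
        while (start < to) {
            // The last chunk is truncated so that it never extends past 'to';
            // comparing (to - start) avoids overflowing start + pollSize.
            long end = (to - start > pollSize) ? start + pollSize : to;
            ranges.add(new long[] {start, end});
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        long end = clampedEnd(Long.MAX_VALUE, 10_000L, 1_000L); // 9000
        for (long[] r : subranges(0L, end, 4_000L)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

Running main prints [0, 4000), [4000, 8000) and [8000, 9000), one per line; the last chunk is truncated at the clamped end. The sketch returns long[] pairs only to stay dependency-free; in Beam the chunks would more naturally be OffsetRange restrictions, for example those produced by a @SplitRestriction method.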
