Thanks for looking into this. I was tied up with a separate (Beam) feature this past week, so it took me some time to make progress. In any case, I have run new tests on Dataflow with a minimal pipeline, unfortunately with the same result: "step 22 has conflicting bucketing functions". More info inline below.
Best,
Kjetil

On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:

> The bucketing "error" is likely related to what windowing
> strategy/pipeline shape you have. Have you tried running your SDF inside an
> empty pipeline possibly followed by a ParDo to log what records you are
> seeing?
>

I slimmed the pipeline down to just this SDF plus a MapElements that logs the records. No windowing definitions nor any trigger definitions. The results were exactly the same: the job fails somewhere in the startup/verification phase in Dataflow (i.e. after compile/upload from the client, but as a part of the Dataflow startup procedure). "Step 22 has conflicting bucketing functions".

>
> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
> [email protected]> wrote:
>
>> Thanks for the willingness to help out. The general context is that we
>> are developing a set of new Beam based connectors/readers.
>>
>> I had hoped that SDF was ready for use with Dataflow--just because the
>> interface is nice to work with. In general, would you recommend that we
>> look at the legacy source APIs for building our connectors/readers?
>>
>
> I would not. A few contributors have been making rapid progress over the
> past few months to finish SDFs with Python done from an API standpoint
> (there is some additional integration/scaling testing going on), Java is
> missing progress reporting from the API and watermark estimation but I was
> hoping to finish those API pieces this month and Go has started on the
> batch API implementation.
>

Great, I am happy to hear that. Would love to just keep investing in the SDF implementations we started.

>
>>
>> Anyways, I have pasted the skeleton of the SDF below (I apologize for the
>> bad formatting--still learning the grips of communicating code via e-mail).
>> We have used the overall pattern from the file watcher. I.e. the SDF
>> creates "poll requests" at regular intervals which a downstream ParDo
>> executes.
>> The SDF uses the built-in OffsetRange as the basis for the range
>> tracker.
>>
>> I am happy to receive any pointers on improvements, changes, debugging
>> paths.
>>
>> /**
>>  * This function generates an unbounded stream of source queries.
>>  */
>> @DoFn.UnboundedPerElement
>> public class GenerateTsPointRequestsUnboundFn extends
>>     DoFn<RequestParameters, RequestParameters> {
>>
>>   @Setup
>>   public void setup() {
>>     validate();
>>   }
>>
>>   @ProcessElement
>>   public ProcessContinuation processElement(@Element Element inputElement,
>>       RestrictionTracker<OffsetRange, Long> tracker,
>>       OutputReceiver<outputElement> out,
>>       ProcessContext context) throws Exception {
>>
>>     long startRange = tracker.currentRestriction().getFrom();
>>     long endRange = tracker.currentRestriction().getTo();
>>
>>     while (startRange < (System.currentTimeMillis() -
>>         readerConfig.getPollOffset().get().toMillis())) {
>>       // Set the query's max end to current time - offset.
>>       if (endRange > (System.currentTimeMillis() -
>>           readerConfig.getPollOffset().get().toMillis())) {
>>         endRange = (System.currentTimeMillis() -
>>             readerConfig.getPollOffset().get().toMillis());
>>       }
>>
>>       if (tracker.tryClaim(endRange - 1)) {
>>
>
> Why do you try and claim to the endRange here? Shouldn't you claim
> subranges, so [start, start+pollsize), [start+pollsize, start+pollsize*2),
> ..., [start+pollsize*N, end)?
>
> Also, if start is significantly smaller than current time, you could
> implement the @SplitRestriction method.
>
> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>

Good points! My original thinking was to have a second (bounded) SDF that splits the ranges and executes the actual reads from the source. Similar to the "watch + read" pattern. That way I can reuse most of the code between the unbounded and bounded scenario. Maybe that's a sub-optimal approach?
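
To make the subrange suggestion quoted above concrete, here is a minimal sketch in plain Java. It is not Beam code: PollRanges, subranges and pollSize are hypothetical names introduced only for illustration, and the timestamps are assumed to be non-negative epoch milliseconds. It only shows how [start, end) would be cut into poll-sized half-open subranges; in the DoFn, each subrange would presumably be claimed with tracker.tryClaim(to - 1) before its request is emitted, mirroring the tryClaim(endRange - 1) call in the skeleton.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Beam): cuts [start, end) into poll-sized subranges. */
final class PollRanges {
    private PollRanges() {}

    /**
     * Returns half-open subranges {from, to} that together cover [start, end):
     * [start, start+pollSize), [start+pollSize, start+2*pollSize), ..., [start+N*pollSize, end).
     * Assumes non-negative timestamps, so end - from cannot overflow.
     */
    static List<long[]> subranges(long start, long end, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        long from = start;
        while (from < end) {
            // The last subrange is cut short so it never extends past end.
            long to = (end - from <= pollSize) ? end : from + pollSize;
            ranges.add(new long[] {from, to});
            from = to;
        }
        return ranges;
    }
}
```

For example, PollRanges.subranges(0, 10, 4) yields [0, 4), [4, 8), [8, 10). Whether this splitting belongs in the unbounded SDF itself or in a second, bounded SDF (the "watch + read" variant) is left open here.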
>
>>
>>         context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>         out.outputWithTimestamp(buildOutputElement(inputElement,
>>             startRange, endRange),
>>             org.joda.time.Instant.ofEpochMilli(startRange));
>>
>>         // Update the start and end range for the next iteration
>>         startRange = endRange;
>>         endRange = tracker.currentRestriction().getTo();
>>       } else {
>>         LOG.info(localLoggingPrefix + "Stopping work due to checkpointing or splitting.");
>>         return ProcessContinuation.stop();
>>       }
>>
>>       if (startRange >= tracker.currentRestriction().getTo()) {
>>         LOG.info(localLoggingPrefix + "Completed the request time range. Will stop the reader.");
>>         return ProcessContinuation.stop();
>>       }
>>
>>       return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>           readerConfig.getPollInterval().get().toMillis()));
>>     }
>>
>>     return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>         readerConfig.getPollInterval().get().toMillis()));
>>   }
>>
>>   private OutputElement buildOutputElement(Element element,
>>       long start,
>>       long end) {
>>     return outputElement
>>         .withParameter(START_KEY, start)
>>         .withParameter(END_KEY, end);
>>   }
>>
>>   @GetInitialRestriction
>>   public OffsetRange getInitialRestriction(Element element) throws Exception {
>>     return new OffsetRange(startTimestamp, endTimestamp);
>>   }
>> }
>>
>>
>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>>
>>> SplittableDoFn has experimental support within Dataflow so the way you
>>> may be using it could be correct but unsupported.
>>>
>>> Can you provide snippets/details of your splittable dofn implementation?
>>>
>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>> [email protected]> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>> failed.
>>>> Causes: Step s22 has conflicting bucketing functions,
>>>>
>>>> This happens at the very startup of the job execution, and I am unable
>>>> to find any pointer as to where in the code/job definition the origin of
>>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>>
>>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>>> both a windowing transform and without a windowing transform--both fail
>>>> with the same result on Dataflow.
>>>>
>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>> have just made some basic missteps.
>>>>
>>>> Cheers,
>>>> Kjetil
>>>>
>>>> --
>>>>
>>>> *Kjetil Halvorsen*
>>>> Chief Architect, Enterprise Integration
>>>> +47 48 01 13 75 | [email protected]
>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>
>>
>> --
>>
>> *Kjetil Halvorsen*
>> Chief Architect, Enterprise Integration
>> +47 48 01 13 75 | [email protected]
>> www.cognite.com | LIBERATE YOUR DATA™
>>

--

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | [email protected]
www.cognite.com | LIBERATE YOUR DATA™
