Another update on this issue. I observe the same with bounded SDFs when running in streaming mode. The general pipeline is [unbounded watcher SDF] -> [ParDo with side input from FileIO] -> [bounded SDF] -> [ParDo] ...
This also fails with the "conflicting bucketing functions" error message. When I remove the FileIO side input, the pipeline executes again (on Dataflow). This one hurts us a bit because we use FileIO side inputs to feed the pipeline with config settings, so it is not trivial for us to remove them.

Best,
Kjetil

On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <[email protected]> wrote:

> Perfect, thanks.
>
> I did some more testing, and it seems to narrow down to using FileIO.match -> readMatches to drive the upstream side input. I have attached a pipeline that reproduces the error. When I run it with Beam 2.17 or 2.18, it fails on Dataflow. I have not tested with 2.19 due to the blocker on Win Java.
>
> Please let me know if there is anything else I can do to help. I am very motivated to get this sorted out, as we have lots of scenarios lined up.
>
> Best,
> Kjetil
>
> On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <[email protected]> wrote:
>
>> That doesn't sound like it should be an issue and sounds like a bug in Dataflow.
>>
>> If you're willing to share a minimal pipeline that gets this error, I can get an issue opened up internally and assigned.
>>
>> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <[email protected]> wrote:
>>
>>> Thank you for the tip about "--dataflowJobFile". I wasn't aware of it, and it was of great help in interpreting the error message from Dataflow.
>>>
>>> I found the error/bug in an upstream DoFn (executed before the SDF) with a side input. Both the main input to the DoFn and the side input were bounded and used the default window and trigger (i.e. no windowing or trigger specified in the job).
>>>
>>> When I moved that particular DoFn downstream of the SDF, the job started working.
>>>
>>> Maybe this is by design and I just hadn't registered that one cannot have a side-input DoFn upstream of an unbounded SDF?
>>>
>>> In any case, thank you for the patience and willingness to help out.
>>>
>>> Best,
>>> Kjetil
>>>
>>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <[email protected]> wrote:
>>>
>>>>
>>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <[email protected]> wrote:
>>>>
>>>>> Thanks for looking into this. I have been distracted by a separate (Beam) feature this past week, so it took me some time to make progress. In any case, I have run new tests on Dataflow with a minimal pipeline, unfortunately with the same result: "Step s22 has conflicting bucketing functions". More info inline below.
>>>>>
>>>>> Best,
>>>>> Kjetil
>>>>>
>>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> The bucketing "error" is likely related to the windowing strategy/pipeline shape you have. Have you tried running your SDF inside an empty pipeline, possibly followed by a ParDo that logs the records you are seeing?
>>>>>>
>>>>>
>>>>> I slimmed the pipeline down to just this SDF plus a MapElements that logs the records. No windowing definitions and no trigger definitions. The results were exactly the same: the job fails somewhere in the startup/verification phase in Dataflow (i.e. after compile/upload from the client, but as part of the Dataflow startup procedure) with "Step s22 has conflicting bucketing functions".
>>>>>
>>>>
>>>> The error is because the windowing fns on the GBKs are different. You can dump and inspect the JSON job description using the flag --dataflowJobFile=/path/to/dump/file.json
>>>>
>>>>>
>>>>>>
>>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks for the willingness to help out. The general context is that we are developing a set of new Beam-based connectors/readers.
>>>>>>>
>>>>>>> I had hoped that SDF was ready for use with Dataflow, simply because the interface is nice to work with. In general, would you recommend that we look at the legacy source APIs for building our connectors/readers?
>>>>>>>
>>>>>>
>>>>>> I would not. A few contributors have been making rapid progress over the past few months to finish SDFs. Python is done from an API standpoint (there is some additional integration/scaling testing going on); Java is missing progress reporting and watermark estimation in the API, but I was hoping to finish those API pieces this month; and Go has started on the batch API implementation.
>>>>>>
>>>>>
>>>>> Great, I am happy to hear that. Would love to just keep investing in the SDF implementations we started.
>>>>>
>>>>>>
>>>>>>>
>>>>>>> Anyway, I have pasted the skeleton of the SDF below (I apologize for the bad formatting; I am still getting to grips with communicating code via e-mail). We have used the overall pattern from the file watcher, i.e. the SDF creates "poll requests" at regular intervals, which a downstream ParDo executes. The SDF uses the built-in OffsetRange as the basis for the restriction tracker.
>>>>>>>
>>>>>>> I am happy to receive any pointers on improvements, changes, or debugging paths.
>>>>>>>
>>>>>>> /**
>>>>>>>  * This function generates an unbounded stream of source queries.
>>>>>>>  */
>>>>>>> @DoFn.UnboundedPerElement
>>>>>>> public class GenerateTsPointRequestsUnboundFn extends DoFn<RequestParameters, RequestParameters> {
>>>>>>>
>>>>>>>     @Setup
>>>>>>>     public void setup() {
>>>>>>>         validate();
>>>>>>>     }
>>>>>>>
>>>>>>>     @ProcessElement
>>>>>>>     public ProcessContinuation processElement(@Element RequestParameters inputElement,
>>>>>>>                                               RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>                                               OutputReceiver<RequestParameters> out,
>>>>>>>                                               ProcessContext context) throws Exception {
>>>>>>>
>>>>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>>>>
>>>>>>>         while (startRange < (System.currentTimeMillis() - readerConfig.getPollOffset().get().toMillis())) {
>>>>>>>             // Set the query's max end to current time - offset.
>>>>>>>             if (endRange > (System.currentTimeMillis() - readerConfig.getPollOffset().get().toMillis())) {
>>>>>>>                 endRange = (System.currentTimeMillis() - readerConfig.getPollOffset().get().toMillis());
>>>>>>>             }
>>>>>>>
>>>>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>>>
>>>>>> Why do you try and claim to the endRange here? Shouldn't you claim subranges, so [start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end)?
>>>>>>
>>>>>> Also, if start is significantly smaller than the current time, you could implement the @SplitRestriction method.
>>>>>>
>>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>>>>
>>>>>
>>>>> Good points! My original thinking was to have a second (bounded) SDF that splits the ranges and executes the actual reads from the source, similar to the "watch + read" pattern. That way I can reuse most of the code between the unbounded and bounded scenarios. Maybe that's a sub-optimal approach?
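A minimal sketch of the subrange claiming Luke describes, in plain Java with no Beam dependency so it runs standalone. It first clamps the restriction's end to "now minus poll offset" (as the skeleton does) and then cuts [from, end) into half-open chunks of at most `pollSize`. The class `SubrangeSketch`, its `Range` record, and the parameters `nowMillis`, `pollOffsetMillis` and `pollSize` are hypothetical names for illustration, not part of Beam or of the original code. In a real SDF each chunk would roughly correspond to one `tryClaim(chunk.to() - 1)` call, or, inside a `@SplitRestriction` method, to one initial restriction; Beam's `OffsetRange` also has a `split(...)` helper that is worth checking in its Javadoc before hand-rolling this.

```java
import java.util.ArrayList;
import java.util.List;

public class SubrangeSketch {

    /** Half-open range [from, to); a plain-Java stand-in for Beam's OffsetRange (hypothetical). */
    record Range(long from, long to) {}

    /**
     * Splits [from, min(to, nowMillis - pollOffsetMillis)) into consecutive
     * chunks of at most pollSize. Returns an empty list if nothing is claimable yet.
     */
    static List<Range> subranges(long from, long to, long nowMillis, long pollOffsetMillis, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        // Never read past "now - offset", mirroring the skeleton's clamp of endRange.
        long end = Math.min(to, nowMillis - pollOffsetMillis);
        List<Range> result = new ArrayList<>();
        for (long start = from; start < end; start += pollSize) {
            // The last chunk may be shorter than pollSize.
            result.add(new Range(start, Math.min(start + pollSize, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Restriction [0, 10000), "now" = 7500 ms, offset = 500 ms, chunks of 2000 ms.
        for (Range r : subranges(0, 10_000, 7_500, 500, 2_000)) {
            System.out.println("[" + r.from() + ", " + r.to() + ")");
        }
    }
}
```

Running `main` prints `[0, 2000)`, `[2000, 4000)`, `[4000, 6000)` and `[6000, 7000)`; the last chunk is shorter because the clamp stops at 7000.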
>>>>>
>>>>
>>>> Following a watch + read pattern works well.
>>>>
>>>> And claiming the entire range when writing a generator function makes sense.
>>>>
>>>>>
>>>>>>
>>>>>>>
>>>>>>>                 context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>>                 out.outputWithTimestamp(buildOutputElement(inputElement, startRange, endRange),
>>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>>
>>>>>>>                 // Update the start and end range for the next iteration
>>>>>>>                 startRange = endRange;
>>>>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>>>>             } else {
>>>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to checkpointing or splitting.");
>>>>>>>                 return ProcessContinuation.stop();
>>>>>>>             }
>>>>>>>
>>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>>>                 LOG.info(localLoggingPrefix + "Completed the request time range. Will stop the reader.");
>>>>>>>                 return ProcessContinuation.stop();
>>>>>>>             }
>>>>>>>
>>>>>>>             return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>>>>>         }
>>>>>>>
>>>>>>>         return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>>>>>     }
>>>>>>>
>>>>>>>     private RequestParameters buildOutputElement(RequestParameters element, long start, long end) {
>>>>>>>         return element
>>>>>>>                 .withParameter(START_KEY, start)
>>>>>>>                 .withParameter(END_KEY, end);
>>>>>>>     }
>>>>>>>
>>>>>>>     @GetInitialRestriction
>>>>>>>     public OffsetRange getInitialRestriction(RequestParameters element) throws Exception {
>>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> SplittableDoFn has experimental support within Dataflow, so the way you may be using it could be correct but unsupported.
>>>>>>>>
>>>>>>>> Can you provide snippets/details of your splittable DoFn implementation?
>>>>>>>>
>>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <[email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am looking for pointers on a Dataflow runner error message: "Workflow failed. Causes: Step s22 has conflicting bucketing functions".
>>>>>>>>>
>>>>>>>>> This happens at the very startup of the job execution, and I am unable to find any pointer as to where in the code/job definition the conflict originates. The same job runs just fine in the DirectRunner.
>>>>>>>>>
>>>>>>>>> The job contains an (unbounded) splittable DoFn, and I have tried it both with and without a windowing transform; both fail with the same result on Dataflow.
>>>>>>>>>
>>>>>>>>> This is my first foray into splittable DoFn territory, so I am sure I have just made some basic missteps.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kjetil
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> *Kjetil Halvorsen*
>>>>>>>>> Chief Architect, Enterprise Integration
>>>>>>>>> +47 48 01 13 75 | [email protected]
>>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
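Luke's question about `tryClaim(endRange - 1)` in the quoted skeleton turns on how offset claims work. The sketch below is a toy tracker written for illustration only, loosely modeled on the semantics of Beam's `OffsetRangeTracker`: claims must strictly increase, must not precede the range start, and fail once they reach the exclusive end. It is not the Beam class, and `ToyOffsetTracker` and `claimedUntil()` are hypothetical names. It shows that a single claim of `endRange - 1` marks the whole window [from, endRange) as claimed, which is why a generator-style SDF can claim its entire range at once.

```java
/**
 * Toy model of offset-range claiming, for illustration only.
 * Loosely modeled on Beam's OffsetRangeTracker but NOT that class.
 */
public class ToyOffsetTracker {
    private final long from; // inclusive start of the restriction
    private final long to;   // exclusive end of the restriction
    private Long lastAttempted = null;
    private Long lastClaimed = null;

    public ToyOffsetTracker(long from, long to) {
        this.from = from;
        this.to = to;
    }

    /**
     * Attempts to claim every offset up to and including {@code offset}.
     * Returns false once {@code offset} reaches the exclusive end, which an SDF
     * would treat as "stop processing this restriction".
     */
    public boolean tryClaim(long offset) {
        if (lastAttempted != null && offset <= lastAttempted) {
            throw new IllegalArgumentException("Claims must strictly increase");
        }
        if (offset < from) {
            throw new IllegalArgumentException("Offset precedes the start of the range");
        }
        lastAttempted = offset;
        if (offset >= to) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    /** Exclusive end of the claimed work: [from, claimedUntil()) has been claimed. */
    public long claimedUntil() {
        return lastClaimed == null ? from : lastClaimed + 1;
    }

    public static void main(String[] args) {
        ToyOffsetTracker tracker = new ToyOffsetTracker(0, 10_000);
        long endRange = 7_000; // e.g. the clamped "now - pollOffset" from the skeleton
        // One claim of endRange - 1 covers the whole window [0, 7000).
        System.out.println(tracker.tryClaim(endRange - 1)); // true
        System.out.println(tracker.claimedUntil());         // 7000
        // Claiming at or past the exclusive end fails.
        System.out.println(tracker.tryClaim(10_000));       // false
    }
}
```

In the skeleton, the `else` branch that returns `ProcessContinuation.stop()` corresponds to a `false` result like the last one here.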
