Perfect, thanks. I did some more testing, and the problem seems to narrow down to using FileIO.match -> readMatches to drive the upstream side input. I have attached a pipeline that reproduces the error. When I run it with Beam 2.17 or 2.18 it fails on Dataflow. I have not tested with 2.19 due to the blocker on Win Java.
Please let me know if there is anything else I can do to help. I am very motivated to get this sorted out as we have lots of scenarios lined up.

Best,
Kjetil

On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <[email protected]> wrote:

> That doesn't sound like it should be an issue and sounds like a bug in
> Dataflow.
>
> If you're willing to share a minimal pipeline that gets this error, I can
> get an issue opened up internally and assigned.
>
> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
> [email protected]> wrote:
>
>> Thank you for the tip about the "--dataflowJobFile" flag. I wasn't aware of
>> it, and it was of great help in interpreting the error message from Dataflow.
>>
>> I found the error/bug in an upstream DoFn (executed before the SDF) with a
>> side input. Both the main input to the DoFn and the side input were bounded
>> and using the default window and trigger (i.e. no windowing nor trigger
>> specified in the job).
>>
>> When I moved that particular DoFn to be downstream of the SDF, the job
>> started working.
>>
>> Maybe this is by design and I just hadn't registered that one cannot have
>> a side-input DoFn upstream of an unbounded SDF?
>>
>> In any case, thank you for the patience and willingness to help out.
>>
>> Best,
>> Kjetil
>>
>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <[email protected]> wrote:
>>
>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
>>> [email protected]> wrote:
>>>
>>>> Thanks for looking into this. I have been distracted by a separate
>>>> (Beam) feature the past week so it took me some time to make progress. In
>>>> any case, I have run new tests on Dataflow with a minimal pipeline.
>>>> Unfortunately with the same results: "step 22 has conflicting bucketing
>>>> functions". More info inline below.
>>>>
>>>> Best,
>>>> Kjetil
>>>>
>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> The bucketing "error" is likely related to what windowing
>>>>> strategy/pipeline shape you have. Have you tried running your SDF inside
>>>>> an empty pipeline possibly followed by a ParDo to log what records you
>>>>> are seeing?
>>>>
>>>> I slimmed the pipeline down to just this SDF plus a MapElements
>>>> that logs the records. No windowing definitions nor any trigger definitions.
>>>> The results were exactly the same: The job fails somewhere in the
>>>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>>>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>>>> conflicting bucketing functions".
>>>
>>> The error is because the windowing fns on the GBKs are different. You can
>>> dump and inspect the JSON job description using the flag
>>> --dataflowJobFile=/path/to/dump/file.json
>>>
>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thanks for the willingness to help out. The general context is that
>>>>>> we are developing a set of new Beam based connectors/readers.
>>>>>>
>>>>>> I had hoped that SDF was ready for use with Dataflow--just because
>>>>>> the interface is nice to work with. In general, would you recommend that
>>>>>> we look at the legacy source APIs for building our connectors/readers?
>>>>>
>>>>> I would not. A few contributors have been making rapid progress over
>>>>> the past few months to finish SDFs with Python done from an API standpoint
>>>>> (there is some additional integration/scaling testing going on), Java is
>>>>> missing progress reporting from the API and watermark estimation but I was
>>>>> hoping to finish those API pieces this month and Go has started on the
>>>>> batch API implementation.
>>>>
>>>> Great, I am happy to hear that. Would love to just keep investing in
>>>> the SDF implementations we started.
>>>>
>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>>>>> the bad formatting--still getting to grips with communicating code via
>>>>>> e-mail). We have used the overall pattern from the file watcher. I.e.
>>>>>> the SDF creates "poll requests" at regular intervals which a downstream
>>>>>> ParDo executes. The SDF uses the built-in OffsetRange as the basis for
>>>>>> the range tracker.
>>>>>>
>>>>>> I am happy to receive any pointers on improvements, changes,
>>>>>> debugging paths.
>>>>>>
>>>>>> /**
>>>>>>  * This function generates an unbounded stream of source queries.
>>>>>>  */
>>>>>> @DoFn.UnboundedPerElement
>>>>>> public class GenerateTsPointRequestsUnboundFn extends
>>>>>>     DoFn<RequestParameters, RequestParameters> {
>>>>>>
>>>>>>   @Setup
>>>>>>   public void setup() {
>>>>>>     validate();
>>>>>>   }
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public ProcessContinuation processElement(@Element Element inputElement,
>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>       OutputReceiver<outputElement> out,
>>>>>>       ProcessContext context) throws Exception {
>>>>>>
>>>>>>     long startRange = tracker.currentRestriction().getFrom();
>>>>>>     long endRange = tracker.currentRestriction().getTo();
>>>>>>
>>>>>>     while (startRange < (System.currentTimeMillis() -
>>>>>>         readerConfig.getPollOffset().get().toMillis())) {
>>>>>>       // Set the query's max end to current time - offset.
>>>>>>       if (endRange > (System.currentTimeMillis() -
>>>>>>           readerConfig.getPollOffset().get().toMillis())) {
>>>>>>         endRange = (System.currentTimeMillis() -
>>>>>>             readerConfig.getPollOffset().get().toMillis());
>>>>>>       }
>>>>>>
>>>>>>       if (tracker.tryClaim(endRange - 1)) {
>>>>>
>>>>> Why do you try and claim to the endRange here? Shouldn't you claim
>>>>> subranges, so [start, start+pollsize), [start+pollsize,
>>>>> start+pollsize*2), ..., [start+pollsize*N, end)?
>>>>>
>>>>> Also, if start is significantly smaller than current time, you could
>>>>> implement the @SplitRestriction method.
>>>>>
>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>>
>>>> Good points! My original thinking was to have a second (bounded) SDF
>>>> that splits the ranges and executes the actual reads from the source.
>>>> Similar to the "watch + read" pattern. That way I can reuse most of the
>>>> code between the unbounded and bounded scenario. Maybe that's a sub-optimal
>>>> approach?
>>>
>>> Following a watch + read pattern works well.
>>>
>>> And claiming the entire range when writing a generator function makes
>>> sense.
>>>
>>>>>>         context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>
>>>>>>         out.outputWithTimestamp(
>>>>>>             buildOutputElement(inputElement, startRange, endRange),
>>>>>>             org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>
>>>>>>         // Update the start and end range for the next iteration
>>>>>>         startRange = endRange;
>>>>>>         endRange = tracker.currentRestriction().getTo();
>>>>>>       } else {
>>>>>>         LOG.info(localLoggingPrefix
>>>>>>             + "Stopping work due to checkpointing or splitting.");
>>>>>>         return ProcessContinuation.stop();
>>>>>>       }
>>>>>>
>>>>>>       if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>>         LOG.info(localLoggingPrefix
>>>>>>             + "Completed the request time range. Will stop the reader.");
>>>>>>         return ProcessContinuation.stop();
>>>>>>       }
>>>>>>
>>>>>>       return ProcessContinuation.resume().withResumeDelay(
>>>>>>           org.joda.time.Duration.millis(
>>>>>>               readerConfig.getPollInterval().get().toMillis()));
>>>>>>     }
>>>>>>
>>>>>>     return ProcessContinuation.resume().withResumeDelay(
>>>>>>         org.joda.time.Duration.millis(
>>>>>>             readerConfig.getPollInterval().get().toMillis()));
>>>>>>   }
>>>>>>
>>>>>>   private OutputElement buildOutputElement(Element element,
>>>>>>       long start,
>>>>>>       long end) {
>>>>>>     return outputElement
>>>>>>         .withParameter(START_KEY, start)
>>>>>>         .withParameter(END_KEY, end);
>>>>>>   }
>>>>>>
>>>>>>   @GetInitialRestriction
>>>>>>   public OffsetRange getInitialRestriction(Element element)
>>>>>>       throws Exception {
>>>>>>     return new OffsetRange(startTimestamp, endTimestamp);
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> SplittableDoFn has experimental support within Dataflow so the way
>>>>>>> you may be using it could be correct but unsupported.
>>>>>>>
>>>>>>> Can you provide snippets/details of your splittable DoFn
>>>>>>> implementation?
>>>>>>>
>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions.
>>>>>>>>
>>>>>>>> This happens at the very startup of the job execution, and I am
>>>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>>>> origin of the conflict is. The same job runs just fine in the
>>>>>>>> DirectRunner.
>>>>>>>>
>>>>>>>> The job contains a splittable DoFn (unbounded) and I have tried it
>>>>>>>> with both a windowing transform and without a windowing transform--both
>>>>>>>> fail with the same result on Dataflow.
>>>>>>>>
>>>>>>>> This is my first foray into splittable DoFn territory so I am sure
>>>>>>>> I have just made some basic missteps.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kjetil
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> *Kjetil Halvorsen*
>>>>>>>> Chief Architect, Enterprise Integration
>>>>>>>> +47 48 01 13 75 | [email protected]
>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *Kjetil Halvorsen*
>>>>>> Chief Architect, Enterprise Integration
>>>>>> +47 48 01 13 75 | [email protected]
>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>
>>>> --
>>>>
>>>> *Kjetil Halvorsen*
>>>> Chief Architect, Enterprise Integration
>>>> +47 48 01 13 75 | [email protected]
>>>> www.cognite.com | LIBERATE YOUR DATA™
>>
>> --
>>
>> *Kjetil Halvorsen*
>> Chief Architect, Enterprise Integration
>> +47 48 01 13 75 | [email protected]
>> www.cognite.com | LIBERATE YOUR DATA™

--

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | [email protected]
www.cognite.com | LIBERATE YOUR DATA™
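
For reference, here is a rough sketch of the sub-range idea raised in the thread above, i.e. cutting [start, end) into [start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end). It is plain Java with no Beam dependency. The names SubRanges, Range, split and pollSize are made up for illustration and are not part of the Beam API; in a real SDF each piece would presumably be claimed through the restriction tracker or produced from a @SplitRestriction method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: splits a half-open range into poll-sized pieces.
final class SubRanges {

    // Half-open range [from, to). Stand-in for illustration only, not Beam's OffsetRange.
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    // Returns consecutive sub-ranges of at most pollSize that together cover [start, end).
    static List<Range> split(long start, long end, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        long from = start;
        while (from < end) {
            // The last piece may be shorter than pollSize.
            long to = (end - from <= pollSize) ? end : from + pollSize;
            ranges.add(new Range(from, to));
            from = to;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Example: a 25 ms range polled in 10 ms pieces.
        for (Range r : split(0, 25, 10)) {
            System.out.println(r);
        }
    }
}
```

An empty or inverted range (start >= end) simply yields an empty list, so a caller can loop over the result without a special case.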
TsPointsStreamingFail.java
Description: Binary data
