Thank you for the tip about "--dataflowJobFile". I wasn't aware of it,
and it was a great help in interpreting the error message from Dataflow.

I found the error/bug in an upstream DoFn (executed before the SDF) with a
side input. Both the main input to the DoFn and the side input were bounded
and used the default window and trigger (i.e. neither windowing nor a
trigger was specified in the job).

When I moved that particular DoFn to be downstream of the SDF, the job
started working.

Maybe this is by design and I just hadn't registered that one cannot have a
side-input DoFn upstream of an unbounded SDF?

In any case, thank you for the patience and willingness to help out.

Best,
Kjetil

On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <[email protected]> wrote:

>
>
> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
> [email protected]> wrote:
>
>> Thanks for looking into this. I have been distracted by a separate (Beam)
>> feature the past week, so it took me some time to make progress. In any
>> case, I have run new tests on Dataflow with a minimal pipeline.
>> Unfortunately with the same results: "step 22 has conflicting bucketing
>> functions". More info inline below.
>>
>> Best,
>> Kjetil
>>
>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:
>>
>>> The bucketing "error" is likely related to what windowing
>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>>> empty pipeline possibly followed by a ParDo to log what records you are
>>> seeing?
>>>
>>
>> I slimmed the pipeline down to just this SDF plus a MapElements
>> that logs the records. No windowing definitions and no trigger definitions.
>> The results were exactly the same: The job fails somewhere in the
>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>> conflicting bucketing functions".
>>
>
> The error is because the windowing fns on the GBKs are different. You can
> dump and inspect the JSON job description using the flag
> --dataflowJobFile=/path/to/dump/file.json
>
>
>>
>>>
>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>> [email protected]> wrote:
>>>
>>>> Thanks for the willingness to help out. The general context is that we
>>>> are developing a set of new Beam based connectors/readers.
>>>>
>>>> I had hoped that SDF was ready for use with Dataflow--just because the
>>>> interface is nice to work with. In general, would you recommend that we
>>>> look at the legacy source APIs for building our connectors/readers?
>>>>
>>>
>>> I would not. A few contributors have been making rapid progress over the
>>> past few months to finish SDFs. Python is done from an API standpoint
>>> (there is some additional integration/scaling testing going on). Java is
>>> missing progress reporting and watermark estimation from the API, but I
>>> was hoping to finish those API pieces this month. Go has started on the
>>> batch API implementation.
>>>
>>
>> Great, I am happy to hear that. Would love to just keep investing in the
>> SDF implementations we started.
>>
>>>
>>>
>>>>
>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>>> the bad formatting; I am still getting to grips with communicating code
>>>> via e-mail). We have used the overall pattern from the file watcher, i.e.
>>>> the SDF creates "poll requests" at regular intervals which a downstream
>>>> ParDo executes. The SDF uses the built-in OffsetRange as the basis for the
>>>> range tracker.
>>>>
>>>> I am happy to receive any pointers on improvements, changes, debugging
>>>> paths.
>>>>
>>>> /**
>>>>  * This function generates an unbounded stream of source queries.
>>>>  */
>>>> @DoFn.UnboundedPerElement
>>>> public class GenerateTsPointRequestsUnboundFn
>>>>         extends DoFn<RequestParameters, RequestParameters> {
>>>>
>>>>     @Setup
>>>>     public void setup() {
>>>>         validate();
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public ProcessContinuation processElement(
>>>>             @Element Element inputElement,
>>>>             RestrictionTracker<OffsetRange, Long> tracker,
>>>>             OutputReceiver<OutputElement> out,
>>>>             ProcessContext context) throws Exception {
>>>>
>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>
>>>>         while (startRange < (System.currentTimeMillis()
>>>>                 - readerConfig.getPollOffset().get().toMillis())) {
>>>>             // Set the query's max end to current time - offset.
>>>>             if (endRange > (System.currentTimeMillis()
>>>>                     - readerConfig.getPollOffset().get().toMillis())) {
>>>>                 endRange = (System.currentTimeMillis()
>>>>                         - readerConfig.getPollOffset().get().toMillis());
>>>>             }
>>>>
>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>
>>>
>>> Why do you try to claim up to the endRange here? Shouldn't you claim
>>> subranges, so [start, start+pollsize), [start+pollsize, start+pollsize*2),
>>> ..., [start+pollsize*N, end)?
>>>
>>> Also, if start is significantly smaller than the current time, you could
>>> implement the @SplitRestriction method.
>>>
>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>
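For readers following the subrange suggestion above, here is a minimal sketch of cutting a half-open range into poll-sized pieces. It is plain Java with no Beam dependency. The names SubrangeSketch, Range, split and pollSize are hypothetical and not part of the skeleton in this thread; Range merely stands in for Beam's OffsetRange. A @SplitRestriction method could emit pieces like these, but how that is wired into the DoFn is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

class SubrangeSketch {
    /** Half-open range [from, to); a hypothetical stand-in for OffsetRange. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    /** Splits [start, end) into consecutive subranges of at most pollSize. */
    static List<Range> split(long start, long end, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<Range> result = new ArrayList<>();
        long from = start;
        while (from < end) {
            // Compare against the remaining length to avoid overflow in from + pollSize.
            long to = (end - from > pollSize) ? from + pollSize : end;
            result.add(new Range(from, to));
            from = to;
        }
        return result;
    }

    public static void main(String[] args) {
        for (Range r : split(0, 25, 10)) {
            System.out.println(r);
        }
    }
}
```

Running main prints [0, 10), [10, 20) and [20, 25), one per line. The last piece is shorter because the range length is not a multiple of pollSize.
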
>>
>> Good points! My original thinking was to have a second (bounded) SDF that
>> splits the ranges and executes the actual reads from the source. Similar to
>> the "watch + read" pattern. That way I can reuse most of the code between
>> the unbounded and bounded scenario. Maybe that's a sub-optimal approach?
>>
>
> Following a watch + read pattern works well.
>
> And claiming the entire range when writing a generator function makes
> sense.
>
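To illustrate why a generator can claim the whole range in one call, here is a simplified sketch of offset-claim semantics. SimpleOffsetTracker is a hypothetical stand-in written for this note, not Beam's OffsetRangeTracker. It assumes the semantics that claims must be strictly increasing, that a claim at or beyond the end of the range fails, and that the restriction counts as done once the last attempted position reaches to - 1.

```java
class ClaimSketch {
    /** Hypothetical, simplified tracker over the half-open range [from, to). */
    static final class SimpleOffsetTracker {
        private final long to;
        private long lastAttempted;

        SimpleOffsetTracker(long from, long to) {
            this.to = to;
            this.lastAttempted = from - 1; // nothing attempted yet
        }

        /** Claims every position up to and including the given one. */
        boolean tryClaim(long position) {
            if (position <= lastAttempted) {
                throw new IllegalArgumentException("claims must be increasing");
            }
            lastAttempted = position;
            return position < to; // a claim at or past the end fails
        }

        /** True once the whole range has been attempted. */
        boolean isDone() {
            return lastAttempted >= to - 1;
        }
    }

    public static void main(String[] args) {
        SimpleOffsetTracker tracker = new SimpleOffsetTracker(100, 200);
        System.out.println(tracker.tryClaim(199)); // claims [100, 200) in one step
        System.out.println(tracker.isDone());
        System.out.println(tracker.tryClaim(200)); // past the end
    }
}
```

Running main prints true, true and false. Under these assumptions, a single tryClaim(endRange - 1) covers everything from the start of the restriction up to endRange, which matches the call in the skeleton.
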
>
>>
>>>
>>>
>>>>
>>>>
>>>>                 context.updateWatermark(
>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>
>>>>                 out.outputWithTimestamp(
>>>>                         buildOutputElement(inputElement, startRange, endRange),
>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>
>>>>                 // Update the start and end range for the next iteration
>>>>                 startRange = endRange;
>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>             } else {
>>>>                 LOG.info(localLoggingPrefix
>>>>                         + "Stopping work due to checkpointing or splitting.");
>>>>                 return ProcessContinuation.stop();
>>>>             }
>>>>
>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>                 LOG.info(localLoggingPrefix
>>>>                         + "Completed the request time range. Will stop the reader.");
>>>>                 return ProcessContinuation.stop();
>>>>             }
>>>>
>>>>             return ProcessContinuation.resume().withResumeDelay(
>>>>                     org.joda.time.Duration.millis(
>>>>                             readerConfig.getPollInterval().get().toMillis()));
>>>>         }
>>>>
>>>>         return ProcessContinuation.resume().withResumeDelay(
>>>>                 org.joda.time.Duration.millis(
>>>>                         readerConfig.getPollInterval().get().toMillis()));
>>>>     }
>>>>
>>>>     private OutputElement buildOutputElement(Element element,
>>>>                                                      long start,
>>>>                                                      long end) {
>>>>         return outputElement
>>>>                 .withParameter(START_KEY, start)
>>>>                 .withParameter(END_KEY, end);
>>>>     }
>>>>
>>>>     @GetInitialRestriction
>>>>     public OffsetRange getInitialRestriction(Element element)
>>>>             throws Exception {
>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>     }
>>>> }
>>>>
>>>>
>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> SplittableDoFn has experimental support within Dataflow so the way you
>>>>> may be using it could be correct but unsupported.
>>>>>
>>>>> Can you provide snippets/details of your splittable dofn
>>>>> implementation?
>>>>>
>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>>
>>>>>> This happens at the very startup of the job execution, and I am
>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>> origin of the conflict is. The same job runs just fine in the 
>>>>>> DirectRunner.
>>>>>>
>>>>>> The job contains a splittable DoFn (unbounded) and I have tried it with
>>>>>> both a windowing transform and without a windowing transform--both fail
>>>>>> with the same result on Dataflow.
>>>>>>
>>>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>>>> have just made some basic missteps.
>>>>>>
>>>>>> Cheers,
>>>>>> Kjetil
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *Kjetil Halvorsen*
>>>>>> Chief Architect, Enterprise Integration
>>>>>> +47 48 01 13 75 | [email protected]
>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>
>>>>>>
>>>>
>>

