Thanks for looking into this. I have been distracted on a separate (Beam)
feature the past week so it took me some time to make progress. In any
case, I have run new tests on Dataflow with a minimal pipeline.
Unfortunately with the same results: "step 22 has conflicting bucketing
functions". More info inline below.

Best,
Kjetil

On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:

> The bucketing "error" is likely related to what windowing
> strategy/pipeline shape you have. Have you tried running your SDF inside an
> empty pipeline possibly followed by a ParDo to log what records you are
> seeing?
>

I slimmed the pipeline down to just being this sdf plus a MapElements that
log the records. No windowing definitions nor any trigger definitions. The
results were exactly the same: The job fails somewhere in the
startup/verification phase in Dataflow (i.e. after compile/upload from the
client, but as a part of the Dataflow startup procedure). "Step 22 has
conflicting bucketing functions".


>
> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
> [email protected]> wrote:
>
>> Thank's for the willingness to help out. The general context is that we
>> are developing a set of new Beam based connectors/readers.
>>
>> I had hoped that SDF was ready for use with Dataflow--just because the
>> interface is nice to work with. In general, would you recommend that we
>> look at the legacy source APIs for building our connectors/readers?
>>
>
> I would not. A few contributors have been making rapid progress over the
> past few months to finish SDFs with Python done from an API standpoint
> (there is some additional integration/scaling testing going on), Java is
> missing progress reporting from the API and watermark estimation but I was
> hoping to finish those API pieces this month and Go has started on the
> batch API implementation.
>

Great, I am happy to hear that. Would love to just keep investing in the
SDF implementations we started.

>
>
>>
>> Anyways, I have pasted the skeleton of the SDF below (I apologize for the
>> bad formatting--still learning the grips of communicating code via e-mail).
>> . We have used the overall pattern from the file watcher. I.e. the SDF
>> creates "poll requests" at regular intervals which a downstream parDo
>> executes. The SDF uses the built-in OffserRange as the basis for the range
>> tracker.
>>
>> I am happy to receive any pointers on improvements, changes, debugging
>> paths.
>>
>> /**
>>  * This function generates an unbounded stream of source queries.
>>  */
>> @DoFn.UnboundedPerElement
>> public class GenerateTsPointRequestsUnboundFn extends
>> DoFn<RequestParameters, RequestParameters> {
>>
>>     @Setup
>>     public void setup() {
>>         validate();
>>     }
>>
>>     @ProcessElement
>>     public ProcessContinuation processElement(@Element Element
>> inputElement,
>>
>> RestrictionTracker<OffsetRange, Long> tracker,
>>
>> OutputReceiver<outputElement> out,
>>                                               ProcessContext context)
>> throws Exception {
>>
>>         long startRange = tracker.currentRestriction().getFrom();
>>         long endRange = tracker.currentRestriction().getTo();
>>
>>         while (startRange < (System.currentTimeMillis() -
>> readerConfig.getPollOffset().get().toMillis())) {
>>             // Set the query's max end to current time - offset.
>>             if (endRange > (System.currentTimeMillis() -
>> readerConfig.getPollOffset().get().toMillis())) {
>>                 endRange = (System.currentTimeMillis() -
>> readerConfig.getPollOffset().get().toMillis());
>>             }
>>
>>             if (tracker.tryClaim(endRange - 1)) {
>>
>
> Why do you try and claim to the endRange here? Shouldn't you claim
> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
> ..., [start+pollsize*N, end)?
>
> Also, if start is significantly smaller then current time, you could
> implement the @SplitRestriction method.
>
> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>

Good points! My original thinking was to have a second (bounded) SDF that
splits the ranges and executes the actual reads from the source. Similar to
the "watch + read" pattern. That way I can reuse most of the code between
the unbounded and bounded scenario. Maybe that's a sub-optimal approach?

>
>
>
>>
>>
>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>                 out.outputWithTimestamp(buildOutputElement(inputElement,
>> startRange, endRange),
>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>
>>                 // Update the start and end range for the next iteration
>>                 startRange = endRange;
>>                 endRange = tracker.currentRestriction().getTo();
>>             } else {
>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>> checkpointing or splitting.");
>>                 return ProcessContinuation.stop();
>>             }
>>
>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>                 LOG.info(localLoggingPrefix + "Completed the request time
>> range. Will stop the reader.");
>>                 return ProcessContinuation.stop();
>>             }
>>
>>             return
>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>                     readerConfig.getPollInterval().get().toMillis()));
>>         }
>>
>>         return
>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>                 readerConfig.getPollInterval().get().toMillis()));
>>     }
>>
>>     private OutputElement buildOutputElement(Element element,
>>                                                      long start,
>>                                                      long end) {
>>         return outputElement
>>                 .withParameter(START_KEY, start)
>>                 .withParameter(END_KEY, end);
>>     }
>>
>>     @GetInitialRestriction
>>     public OffsetRange getInitialRestriction(Element element) throws
>> Exception {
>>         return new OffsetRange(startTimestamp, endTimestamp);
>>     }
>> }
>>
>>
>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>>
>>> SplittableDoFn has experimental support within Dataflow so the way you
>>> may be using it could be correct but unsupported.
>>>
>>> Can you provide snippets/details of your splittable dofn implementation?
>>>
>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>> [email protected]> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>
>>>> This happens at the very startup of the job execution, and I am unable
>>>> to find any pointer as to where in the code/job definition the origin of
>>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>>
>>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>>> both a windowing transform and without a windowing transform--both fail
>>>> with the same result on Dataflow.
>>>>
>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>> have just made some basic missteps.
>>>>
>>>> Cheers,
>>>> Kjetil
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *Kjetil Halvorsen*
>>>> Chief Architect, Enterprise Integration
>>>> +47 48 01 13 75 | [email protected]
>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>
>>>>
>>
>> --
>>
>> *Kjetil Halvorsen*
>> Chief Architect, Enterprise Integration
>> +47 48 01 13 75 | [email protected]
>> www.cognite.com | LIBERATE YOUR DATA™
>>
>>

-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | [email protected]
www.cognite.com | LIBERATE YOUR DATA™

Reply via email to