Thanks for your willingness to help out. For context, we are developing a
set of new Beam-based connectors/readers.

I had hoped that SDF was ready for use with Dataflow--just because the
interface is nice to work with. In general, would you recommend that we
look at the legacy source APIs for building our connectors/readers?

Anyway, I have pasted the skeleton of the SDF below (I apologize for the
bad formatting--I am still getting to grips with communicating code via
e-mail). We have used the overall pattern from the file watcher, i.e. the
SDF creates "poll requests" at regular intervals, which a downstream ParDo
executes. The SDF uses the built-in OffsetRange as the basis for the range
tracker.

I am happy to receive any pointers on improvements, changes, or debugging
paths.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

/**
 * This function generates an unbounded stream of source queries.
 */
@DoFn.UnboundedPerElement
public class GenerateTsPointRequestsUnboundFn
        extends DoFn<RequestParameters, RequestParameters> {

    @Setup
    public void setup() {
        validate();
    }

    @ProcessElement
    public ProcessContinuation processElement(
            @Element RequestParameters inputElement,
            RestrictionTracker<OffsetRange, Long> tracker,
            OutputReceiver<RequestParameters> out,
            ProcessContext context) throws Exception {

        long startRange = tracker.currentRestriction().getFrom();
        long endRange = tracker.currentRestriction().getTo();

        // Latest timestamp a query may cover: current time - poll offset.
        long maxEnd = System.currentTimeMillis()
                - readerConfig.getPollOffset().get().toMillis();

        // The body returns on its first pass, so at most one request is
        // emitted per invocation.
        while (startRange < maxEnd) {
            // Set the query's max end to current time - offset.
            endRange = Math.min(endRange, maxEnd);

            if (tracker.tryClaim(endRange - 1)) {
                context.updateWatermark(
                        org.joda.time.Instant.ofEpochMilli(startRange));
                out.outputWithTimestamp(
                        buildOutputElement(inputElement, startRange, endRange),
                        org.joda.time.Instant.ofEpochMilli(startRange));

                // Update the start and end range for the next iteration
                startRange = endRange;
                endRange = tracker.currentRestriction().getTo();
            } else {
                LOG.info(localLoggingPrefix
                        + "Stopping work due to checkpointing or splitting.");
                return ProcessContinuation.stop();
            }

            if (startRange >= tracker.currentRestriction().getTo()) {
                LOG.info(localLoggingPrefix
                        + "Completed the request time range. Will stop the reader.");
                return ProcessContinuation.stop();
            }

            return ProcessContinuation.resume().withResumeDelay(
                    org.joda.time.Duration.millis(
                            readerConfig.getPollInterval().get().toMillis()));
        }

        return ProcessContinuation.resume().withResumeDelay(
                org.joda.time.Duration.millis(
                        readerConfig.getPollInterval().get().toMillis()));
    }

    private RequestParameters buildOutputElement(RequestParameters element,
                                                 long start,
                                                 long end) {
        return element
                .withParameter(START_KEY, start)
                .withParameter(END_KEY, end);
    }

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(RequestParameters element)
            throws Exception {
        return new OffsetRange(startTimestamp, endTimestamp);
    }
}
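
For checking the range arithmetic in processElement above in isolation, a
minimal sketch in plain Java (no Beam dependency) might look like the
following. PollRangePlanner and nextQueryEnd are hypothetical names
introduced here for illustration only; they are not part of Beam or of the
connector. The sketch assumes the restriction is a half-open range
[from, to) of epoch milliseconds, as in the skeleton.

```java
import java.util.OptionalLong;

/**
 * Hypothetical helper (not part of Beam): mirrors how the skeleton caps the
 * end of each poll request at "current time - poll offset".
 */
final class PollRangePlanner {

    /**
     * Returns the exclusive end of the next query range, or empty when no
     * request should be emitted yet. The offset to claim on the tracker
     * would then be (end - 1), leaving [end, to) as the remaining work.
     */
    static OptionalLong nextQueryEnd(long from, long to,
                                     long nowMillis, long pollOffsetMillis) {
        long maxEnd = nowMillis - pollOffsetMillis;
        if (from >= to || from >= maxEnd) {
            // Nothing left in the restriction, or the data is not old enough.
            return OptionalLong.empty();
        }
        return OptionalLong.of(Math.min(to, maxEnd));
    }

    public static void main(String[] args) {
        long now = 10_000L;
        long offset = 1_000L;
        // Open-ended restriction: the end is capped at now - offset.
        System.out.println(nextQueryEnd(0L, Long.MAX_VALUE, now, offset));
        // Bounded restriction that ends before now - offset: end is kept.
        System.out.println(nextQueryEnd(0L, 5_000L, now, offset));
        // Start has already caught up with now - offset: nothing to emit.
        System.out.println(nextQueryEnd(9_000L, 20_000L, now, offset));
    }
}
```

Keeping this calculation in a pure function makes it possible to unit test
the polling boundaries without running a pipeline on any runner.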


On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:

> SplittableDoFn has experimental support within Dataflow so the way you may
> be using it could be correct but unsupported.
>
> Can you provide snippets/details of your splittable dofn implementation?
>
> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <[email protected]> wrote:
>
>>
>> Hi,
>>
>> I am looking for pointers to a Dataflow runner error message: Workflow
>> failed. Causes: Step s22 has conflicting bucketing functions,
>>
>> This happens at the very startup of the job execution, and I am unable to
>> find any pointer as to where in the code/job definition the origin of the
>> conflict is. The same job runs just fine in the DirectRunner.
>>
>> The job contains a splittable DoFn (unbounded), and I have tried it both
>> with and without a windowing transform--both fail with the same result
>> on Dataflow.
>>
>> This is my first foray into splittable DoFn territory so I am sure I have
>> just made some basic missteps.
>>
>> Cheers,
>> Kjetil

-- 

*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | [email protected]
www.cognite.com | LIBERATE YOUR DATA™
