Thanks for your willingness to help out. The general context is that we are
developing a set of new Beam-based connectors/readers.

I had hoped that SDF was ready for use with Dataflow, simply because the
interface is nice to work with. In general, would you recommend that we
look at the legacy source APIs for building our connectors/readers instead?

Anyway, I have pasted the skeleton of the SDF below (I apologize for the
bad formatting; I am still learning the ropes of communicating code via
e-mail). We have used the overall pattern from the file watcher, i.e. the
SDF creates "poll requests" at regular intervals, which a downstream ParDo
executes. The SDF uses the built-in OffsetRange as the basis for the range
tracker.

I am happy to receive any pointers on improvements, changes, or debugging
paths.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

/**
 * This function generates an unbounded stream of source queries.
 */
@DoFn.UnboundedPerElement
public class GenerateTsPointRequestsUnboundFn
    extends DoFn<RequestParameters, RequestParameters> {

  @Setup
  public void setup() {
    validate();
  }

  @ProcessElement
  public ProcessContinuation processElement(
      @Element RequestParameters inputElement,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<RequestParameters> out,
      ProcessContext context) throws Exception {
    long startRange = tracker.currentRestriction().getFrom();
    long endRange = tracker.currentRestriction().getTo();
    long pollOffsetMillis = readerConfig.getPollOffset().get().toMillis();
    org.joda.time.Duration pollInterval = org.joda.time.Duration.millis(
        readerConfig.getPollInterval().get().toMillis());

    // Every path through the loop body returns, so it runs at most once per call.
    while (startRange < System.currentTimeMillis() - pollOffsetMillis) {
      // Set the query's max end to current time - offset.
      endRange = Math.min(endRange, System.currentTimeMillis() - pollOffsetMillis);

      if (tracker.tryClaim(endRange - 1)) {
        context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
        out.outputWithTimestamp(
            buildOutputElement(inputElement, startRange, endRange),
            org.joda.time.Instant.ofEpochMilli(startRange));
        // Update the start and end range for the next iteration.
        startRange = endRange;
        endRange = tracker.currentRestriction().getTo();
      } else {
        LOG.info(localLoggingPrefix + "Stopping work due to checkpointing or splitting.");
        return ProcessContinuation.stop();
      }

      if (startRange >= tracker.currentRestriction().getTo()) {
        LOG.info(localLoggingPrefix
            + "Completed the request time range. Will stop the reader.");
        return ProcessContinuation.stop();
      }

      return ProcessContinuation.resume().withResumeDelay(pollInterval);
    }

    return ProcessContinuation.resume().withResumeDelay(pollInterval);
  }

  private RequestParameters buildOutputElement(
      RequestParameters element, long start, long end) {
    return element
        .withParameter(START_KEY, start)
        .withParameter(END_KEY, end);
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(RequestParameters element) throws Exception {
    return new OffsetRange(startTimestamp, endTimestamp);
  }
}

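
To make the window arithmetic in processElement easier to check on its own,
here is a minimal plain-Java sketch of it with no Beam dependency. The class
PollWindow and its methods are hypothetical names made up for illustration;
they are not part of Beam or of our connector. I also assume that an empty
restriction should yield no window, which the skeleton above does not check
explicitly.

```java
import java.util.Optional;

/** Hypothetical helper for illustration only; not part of Beam or the connector. */
final class PollWindow {
    final long start; // inclusive, epoch millis
    final long end;   // exclusive, epoch millis

    PollWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** The offset handed to tryClaim: the last offset covered by this window. */
    long claimOffset() {
        return end - 1;
    }

    /**
     * Computes the next poll window for the restriction [from, to).
     * The window end is capped at (now - pollOffset), mirroring the skeleton.
     * Returns empty when nothing is old enough to be polled yet, or when the
     * restriction is already empty (the latter is an assumption of this sketch).
     */
    static Optional<PollWindow> next(long from, long to, long nowMillis, long pollOffsetMillis) {
        long horizon = nowMillis - pollOffsetMillis;
        if (from >= horizon || from >= to) {
            return Optional.empty();
        }
        return Optional.of(new PollWindow(from, Math.min(to, horizon)));
    }

    public static void main(String[] args) {
        // Restriction [0, 1000), now = 600 ms, poll offset = 100 ms.
        PollWindow w = next(0L, 1000L, 600L, 100L).get();
        System.out.println("[" + w.start + ", " + w.end + ") claim " + w.claimOffset());
    }
}
```

With a restriction of [0, 1000), a current time of 600 and an offset of 100,
the window is capped at 500 and the claimed offset is 499. If the claim
succeeds and the tracker checkpoints there, I would expect the residual
restriction to start at 500, so the next call has nothing to do until the
clock passes 600.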
On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
> SplittableDoFn has experimental support within Dataflow so the way you may
> be using it could be correct but unsupported.
>
> Can you provide snippets/details of your splittable dofn implementation?
>
> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
> [email protected]> wrote:
>
>>
>> Hi,
>>
>> I am looking for pointers to a Dataflow runner error message: Workflow
>> failed. Causes: Step s22 has conflicting bucketing functions,
>>
>> This happens at the very startup of the job execution, and I am unable to
>> find any pointer as to where in the code/job definition the origin of the
>> conflict is. The same job runs just fine in the DirectRunner.
>>
>> The job contains a splittable DoFn (unbound) and I have tried it with
>> both a windowing transform and without a windowing transform--both fail
>> with the same result on Dataflow.
>>
>> This is my first foray into splittable DoFn territory so I am sure I have
>> just made some basic missteps.
>>
>> Cheers,
>> Kjetil
--
*Kjetil Halvorsen*
Chief Architect, Enterprise Integration
+47 48 01 13 75 | [email protected]
www.cognite.com | LIBERATE YOUR DATA™