Perfect, thanks.

I did some more testing, and the problem seems to narrow down to using
FileIO.match -> readMatches to drive the upstream side input. I have
attached a pipeline that reproduces the error. When I run it with Beam 2.17
or 2.18 it fails on Dataflow. I have not tested with 2.19 due to the blocker
on Win Java.

Please let me know if there is anything else I can do to help. I am very
motivated to get this sorted out as we have lots of scenarios lined up.

Best,
Kjetil

On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <[email protected]> wrote:

> That doesn't sound like it should be an issue and sounds like a bug in
> Dataflow.
>
> If you're willing to share a minimal pipeline that gets this error, I can
> get an issue opened up internally and assigned.
>
> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
> [email protected]> wrote:
>
>> Thank you for the tip about the "--dataflowJobFile". I wasn't aware of
>> it, and it was of great help to interpret the error message from Dataflow.
>>
>> I found the error/bug in an upstream DoFn (executed before the SDF) with a
>> side-input. Both the main input to the DoFn and the side input were bounded
>> and using the default window and trigger (i.e. no windowing nor trigger
>> specified in the job).
>>
>> When I moved that particular DoFn to be downstream of the SDF, the job
>> started working.
>>
>> Maybe this is by design and I just hadn't registered that one cannot have
>> a side-input DoFn upstream of an unbounded SDF?
>>
>> In any case, thank you for the patience and willingness to help out.
>>
>> Best,
>> Kjetil
>>
>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <[email protected]> wrote:
>>
>>>
>>>
>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
>>> [email protected]> wrote:
>>>
>>>> Thanks for looking into this. I have been distracted on a separate
>>>> (Beam) feature the past week so it took me some time to make progress. In
>>>> any case, I have run new tests on Dataflow with a minimal pipeline.
>>>> Unfortunately with the same results: "step 22 has conflicting bucketing
>>>> functions". More info inline below.
>>>>
>>>> Best,
>>>> Kjetil
>>>>
>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> The bucketing "error" is likely related to what windowing
>>>>> strategy/pipeline shape you have. Have you tried running your SDF inside
>>>>> an empty pipeline, possibly followed by a ParDo to log what records you
>>>>> are seeing?
>>>>>
>>>>
>>>> I slimmed the pipeline down to just this SDF plus a MapElements
>>>> that logs the records. No windowing definitions nor any trigger definitions.
>>>> The results were exactly the same: The job fails somewhere in the
>>>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>>>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>>>> conflicting bucketing functions".
>>>>
>>>
>>> The error is because the windowing fns on the GBKs are different. You can
>>> dump and inspect the JSON job description using the flag
>>> --dataflowJobFile=/path/to/dump/file.json
>>>
>>>
>>>>
>>>>>
>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thanks for the willingness to help out. The general context is that
>>>>>> we are developing a set of new Beam-based connectors/readers.
>>>>>>
>>>>>> I had hoped that SDF was ready for use with Dataflow--just because
>>>>>> the interface is nice to work with. In general, would you recommend that
>>>>>> we look at the legacy source APIs for building our connectors/readers?
>>>>>>
>>>>>
>>>>> I would not. A few contributors have been making rapid progress over
>>>>> the past few months to finish SDFs. Python is done from an API standpoint
>>>>> (there is some additional integration/scaling testing going on). Java is
>>>>> missing progress reporting from the API and watermark estimation, but I was
>>>>> hoping to finish those API pieces this month. Go has started on the
>>>>> batch API implementation.
>>>>>
>>>>
>>>> Great, I am happy to hear that. Would love to just keep investing in
>>>> the SDF implementations we started.
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>>>>> the bad formatting--still getting to grips with communicating code via
>>>>>> e-mail). We have used the overall pattern from the file watcher, i.e.
>>>>>> the SDF creates "poll requests" at regular intervals which a downstream
>>>>>> ParDo executes. The SDF uses the built-in OffsetRange as the basis for
>>>>>> the range tracker.
>>>>>>
>>>>>> I am happy to receive any pointers on improvements, changes,
>>>>>> debugging paths.
>>>>>>
>>>>>> /**
>>>>>>  * This function generates an unbounded stream of source queries.
>>>>>>  */
>>>>>> @DoFn.UnboundedPerElement
>>>>>> public class GenerateTsPointRequestsUnboundFn extends
>>>>>> DoFn<RequestParameters, RequestParameters> {
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setup() {
>>>>>>         validate();
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public ProcessContinuation processElement(
>>>>>>             @Element Element inputElement,
>>>>>>             RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>             OutputReceiver<OutputElement> out,
>>>>>>             ProcessContext context) throws Exception {
>>>>>>
>>>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>>>
>>>>>>         while (startRange < (System.currentTimeMillis()
>>>>>>                 - readerConfig.getPollOffset().get().toMillis())) {
>>>>>>             // Set the query's max end to current time - offset.
>>>>>>             if (endRange > (System.currentTimeMillis()
>>>>>>                     - readerConfig.getPollOffset().get().toMillis())) {
>>>>>>                 endRange = (System.currentTimeMillis()
>>>>>>                         - readerConfig.getPollOffset().get().toMillis());
>>>>>>             }
>>>>>>
>>>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>>>
>>>>>
>>>>> Why do you try to claim up to the endRange here? Shouldn't you claim
>>>>> subranges, so [start, start+pollsize), [start+pollsize, start+pollsize*2),
>>>>> ..., [start+pollsize*N, end)?
>>>>>
>>>>> Also, if start is significantly smaller than current time, you could
>>>>> implement the @SplitRestriction method.
>>>>>
>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>>>
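A minimal sketch of the subrange idea above, in plain Java with no Beam dependency. The names SubrangeSplitter, Range and split are hypothetical and only illustrate how [start, end) could be cut into pollsize-wide pieces, for example as the ranges a @SplitRestriction method might emit:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Beam. */
final class SubrangeSplitter {
    private SubrangeSplitter() {}

    /** Half-open range [from, to); a plain stand-in for an offset range. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    /**
     * Splits [start, end) into consecutive subranges of at most pollSize
     * offsets each. The last subrange is shorter when the total length is
     * not a multiple of pollSize. Returns an empty list when start >= end.
     */
    static List<Range> split(long start, long end, long pollSize) {
        if (pollSize <= 0) {
            throw new IllegalArgumentException("pollSize must be positive");
        }
        List<Range> result = new ArrayList<>();
        long cursor = start;
        while (cursor < end) {
            // Advance by pollSize, but never past the end of the range.
            long next = (end - cursor > pollSize) ? cursor + pollSize : end;
            result.add(new Range(cursor, next));
            cursor = next;
        }
        return result;
    }
}
```

With these assumptions, split(0, 10, 3) yields [0, 3), [3, 6), [6, 9), [9, 10).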
>>>>
>>>> Good points! My original thinking was to have a second (bounded) SDF
>>>> that splits the ranges and executes the actual reads from the source.
>>>> Similar to the "watch + read" pattern. That way I can reuse most of the
>>>> code between the unbounded and bounded scenario. Maybe that's a sub-optimal
>>>> approach?
>>>>
>>>
>>> Following a watch + read pattern works well.
>>>
>>> And claiming the entire range when writing a generator function makes
>>> sense.
>>>
>>>
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>                 context.updateWatermark(
>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>
>>>>>>                 out.outputWithTimestamp(
>>>>>>                         buildOutputElement(inputElement, startRange, endRange),
>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>>
>>>>>>                 // Update the start and end range for the next iteration
>>>>>>                 startRange = endRange;
>>>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>>>             } else {
>>>>>>                 LOG.info(localLoggingPrefix
>>>>>>                         + "Stopping work due to checkpointing or splitting.");
>>>>>>                 return ProcessContinuation.stop();
>>>>>>             }
>>>>>>
>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>>                 LOG.info(localLoggingPrefix
>>>>>>                         + "Completed the request time range. Will stop the reader.");
>>>>>>                 return ProcessContinuation.stop();
>>>>>>             }
>>>>>>
>>>>>>             return ProcessContinuation.resume().withResumeDelay(
>>>>>>                     org.joda.time.Duration.millis(
>>>>>>                             readerConfig.getPollInterval().get().toMillis()));
>>>>>>         }
>>>>>>
>>>>>>         return ProcessContinuation.resume().withResumeDelay(
>>>>>>                 org.joda.time.Duration.millis(
>>>>>>                         readerConfig.getPollInterval().get().toMillis()));
>>>>>>     }
>>>>>>
>>>>>>     private OutputElement buildOutputElement(Element element,
>>>>>>                                                      long start,
>>>>>>                                                      long end) {
>>>>>>         return outputElement
>>>>>>                 .withParameter(START_KEY, start)
>>>>>>                 .withParameter(END_KEY, end);
>>>>>>     }
>>>>>>
>>>>>>     @GetInitialRestriction
>>>>>>     public OffsetRange getInitialRestriction(Element element)
>>>>>>             throws Exception {
>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>>>     }
>>>>>> }
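The end-of-window computation in processElement above could be isolated into a small pure function, which also reads the clock once per call instead of three times. This is only a sketch under assumptions: PollWindow and nextEnd are hypothetical names, and the extra guard for an already exhausted restriction is not part of the skeleton:

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of Beam or the skeleton. */
final class PollWindow {
    private PollWindow() {}

    /**
     * Returns the exclusive end of the next poll request, or empty when
     * there is nothing to emit yet.
     *
     * @param start            inclusive start of the next request
     * @param restrictionEnd   exclusive end of the current restriction
     * @param nowMillis        current time, read once by the caller
     * @param pollOffsetMillis how far behind "now" the reader must stay
     */
    static OptionalLong nextEnd(long start, long restrictionEnd,
                                long nowMillis, long pollOffsetMillis) {
        // Latest point in time the reader is allowed to query up to.
        long horizon = nowMillis - pollOffsetMillis;
        if (start >= horizon || start >= restrictionEnd) {
            return OptionalLong.empty();
        }
        // Clamp the request end to the horizon, as the skeleton does.
        return OptionalLong.of(Math.min(restrictionEnd, horizon));
    }
}
```

A caller would pass System.currentTimeMillis() once and, when a value is present, claim that value minus one via tracker.tryClaim as in the skeleton.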
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> SplittableDoFn has experimental support within Dataflow so the way
>>>>>>> you may be using it could be correct but unsupported.
>>>>>>>
>>>>>>> Can you provide snippets/details of your splittable dofn
>>>>>>> implementation?
>>>>>>>
>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am looking for pointers to a Dataflow runner error message: "Workflow
>>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions".
>>>>>>>>
>>>>>>>> This happens at the very startup of the job execution, and I am
>>>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>>>> origin of the conflict is. The same job runs just fine in the
>>>>>>>> DirectRunner.
>>>>>>>>
>>>>>>>> The job contains a splittable DoFn (unbounded) and I have tried it
>>>>>>>> with both a windowing transform and without a windowing transform--both
>>>>>>>> fail with the same result on Dataflow.
>>>>>>>>
>>>>>>>> This is my first foray into splittable DoFn territory so I am sure
>>>>>>>> I have just made some basic missteps.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kjetil
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> *Kjetil Halvorsen*
>>>>>>>> Chief Architect, Enterprise Integration
>>>>>>>> +47 48 01 13 75 | [email protected]
>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>


Attachment: TsPointsStreamingFail.java
