Made it work e2e. Thank you all for the help!

On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <[email protected]>
wrote:

> Got it, thank you for the clarification.
>
> I tried to run the pipeline locally, with the following main (see full
> source code attached):
>
> public static void main(String[] args) {
>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>   Pipeline pipeline = Pipeline.create(options);
>   logger.info("running");
>
>   PCollection<KV<String, Long>> output =
>       FileProcessing.getData(
>           pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());
>
>   output
>       .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
>       .apply("LogWindowed", Log.ofElements("testGetData"))
>       .apply(Sum.longsPerKey())
>       .apply(
>           "FormatResults",
>           MapElements.into(TypeDescriptors.strings())
>               .via((KV<String, Long> kv) -> String.format("%s,%d", kv.getKey(), kv.getValue())))
>       .apply("LogResults", Log.ofElements("results"))
>       .apply(
>           TextIO.write()
>               .to(Paths.get("/tmp/output/").resolve("Results").toString())
>               .withWindowedWrites()
>               .withNumShards(1));
>
>   pipeline.run();
> }
>
>
> Then I am generating files using:
>
> for i in {01..30}; do
>   echo "handling $i"
>   printf '1\n2\n3\n4\n' > /tmp/input/1985-10-$i
>   sleep 2
>   touch /tmp/input/1985-10-$i.complete
>   sleep 2
> done
>
> I do not see any outputs being generated though. Can you elaborate on why
> that might be? I would expect that once the watermark advances to day+1,
> the results for the previous day are finalized and hence the result for
> that window is emitted.
>
> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <[email protected]> wrote:
>
>> I think you should be using the largest "complete" timestamp from the
>> metadata results and not be setting the watermark if you don't have one.
>>
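Luke's suggestion above can be sketched in plain Java, outside of Beam: pick the largest day that has a `.complete` marker and advance the watermark past it, and report nothing (leave the watermark alone) when no marker has appeared yet. The class and method names here are illustrative, not part of the Beam API:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;

public class WatermarkFromCompleteFiles {
  // The largest day with a ".complete" marker decides the watermark; with no
  // marker at all, return empty rather than inventing a watermark.
  static Optional<Instant> watermarkFor(List<String> filenames) {
    return filenames.stream()
        .filter(f -> f.endsWith(".complete"))
        .map(f -> f.substring(0, f.length() - ".complete".length()))
        .map(day -> Instant.parse(day + "T00:00:00Z"))
        .max(Instant::compareTo)
        // The marked day is finalized, so the watermark may jump to the
        // start of the following day.
        .map(t -> t.plus(Duration.ofDays(1)));
  }
}
```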
>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Thank you so much for the input, that was extremely helpful!
>>>
>>> I changed the pipeline from using FileIO.match() to a custom matching
>>> transform (very similar to FileIO.match()) that looks as follows:
>>>
>>> static PCollection<KV<String, Long>> getData(
>>>     Pipeline pipeline,
>>>     String filepattern,
>>>     Duration pollInterval,
>>>     TerminationCondition terminationCondition) {
>>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>>       Watch.growthOf(
>>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>>>           .withPollInterval(pollInterval)
>>>           .withTerminationPerInput(terminationCondition);
>>>   return pipeline
>>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>>       .apply(Values.create())
>>>       .apply(ParDo.of(new ReadFileFn()));
>>> }
>>>
>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>>
>>>   @Override
>>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>>       throws Exception {
>>>     // Here we only have the filepattern i.e. element, and hence we do not know
>>>     // what the timestamp and/or watermark should be. As a result, we output
>>>     // EPOCH as both the timestamp and the watermark.
>>>     Instant instant = Instant.EPOCH;
>>>     return Watch.Growth.PollResult.incomplete(
>>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>>         .withWatermark(instant);
>>>   }
>>> }
>>>
>>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>>   @Override
>>>   public String apply(MatchResult.Metadata input) {
>>>     return input.resourceId().toString();
>>>   }
>>> }
>>>
>>> The above, together with fixing the bugs that Luke pointed out (thank you,
>>> Luke!), makes the unit test pass.
>>>
>>> Thank you again!
>>>
>>> If you have any feedback on the current code, I would appreciate it. I am
>>> especially interested in whether setting the event time and watermark in
>>> *MatchPollFn* to *EPOCH* is the correct way to go.
>>>
>>>
>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> FYI this is a major limitation in FileIO.match's watermarking ability.
>>>> I believe there is a JIRA issue about this, but nobody has ever worked on
>>>> improving it.
>>>>
>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> FileIO.match doesn't allow one to configure how the watermark advances
>>>>> and it assumes that the watermark during polling is always the current
>>>>> system time[1].
>>>>>
>>>>> Because of this, downstream watermark advancement is limited. When an
>>>>> element and restriction pair starts processing, the furthest you can hold
>>>>> the output watermark back for that pair is the current input watermark. A
>>>>> common choice is the current element's timestamp as the lower bound for all
>>>>> future output; if that element is late, the output you produce may or may
>>>>> not be late, depending on the downstream windowing strategy. Holding this
>>>>> watermark back is important since many of these element and restriction
>>>>> pairs could be processed in parallel at different rates.
>>>>>
>>>>> Based upon your implementation, you wouldn't need to control the
>>>>> watermark from the file-reading splittable DoFn if FileIO.match allowed you
>>>>> to say what the watermark is after each polling round and allowed you to
>>>>> set the timestamp for each match found. This initial setting of the
>>>>> watermark during polling would be properly handled by the runner to block
>>>>> watermark advancement for those elements.
>>>>>
>>>>> Minor comments not related to your issue but would improve your
>>>>> implementation:
>>>>> 1) Typically you set the watermark right before returning. You are
>>>>> missing this from the failed tryClaim loop return.
>>>>> 2) You should structure your loop not based upon the end of the
>>>>> current restriction but continue processing till tryClaim fails. For
>>>>> example:
>>>>>
>>>>>   @ProcessElement
>>>>>   public void processElement(
>>>>>       @Element String fileName,
>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>       OutputReceiver<Integer> outputReceiver) throws IOException {
>>>>>     RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>     seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>>>>     while (tracker.tryClaim(file.getFilePointer())) {
>>>>>       outputReceiver.output(readNextRecord(file));
>>>>>     }
>>>>>   }
>>>>> 3) ensureTimestampWithinBounds is dangerous, as you're masking a possible
>>>>> data issue in which the code parsed some filename incorrectly. It is likely
>>>>> that you copied this from Beam code; it is used there because user
>>>>> implementations of UnboundedSource were incorrectly setting the watermark
>>>>> outside of the bounds and there is no way to fix them.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>
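Regarding point 3, an alternative to clamping is to fail fast when a filename does not parse, surfacing the data issue instead of masking it. A minimal illustrative sketch (this helper is not from the thread's code):

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

public class StrictTimestamp {
  // Reject unexpected filenames loudly instead of silently clamping the
  // timestamp into the window bounds.
  static Instant timestampFromFilename(String day) {
    try {
      return Instant.parse(day + "T00:00:00.000Z");
    } catch (DateTimeParseException e) {
      throw new IllegalArgumentException("Unparseable filename: " + day, e);
    }
  }
}
```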
>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thank you for the quick response. I tried to follow the attached doc and
>>>>>> read existing Beam code that uses splittable DoFns, and I made some
>>>>>> progress.
>>>>>>
>>>>>> I created a simple pipeline that matches a given filepattern and uses a
>>>>>> splittable DoFn to control event times and watermarks. The pipeline
>>>>>> expects files with the following name patterns:
>>>>>>
>>>>>> *yyyy-MM-dd*
>>>>>>
>>>>>> *yyyy-MM-dd.complete*
>>>>>>
>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>
>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp
>>>>>> is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it advances to
>>>>>> the next day.
>>>>>>
>>>>>> I am at the point where the following unit test fails the inWindow()
>>>>>> assertions, while the last assertion passes. It seems that even though I
>>>>>> call watermarkEstimator.setWatermark(), the window is not being closed.
>>>>>>
>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>
>>>>>> Here is a unit test. The function being tested is getData() defined
>>>>>> below.
>>>>>>
>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>>
>>>>>>   IntervalWindow firstWindow =
>>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>>   logger.info("first window {}", firstWindow);
>>>>>>   IntervalWindow secondWindow =
>>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>>   logger.info("second window {}", secondWindow);
>>>>>>
>>>>>>   MatchConfiguration matchConfiguration =
>>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>           .continuously(
>>>>>>               Duration.millis(100),
>>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>
>>>>>>   PCollection<KV<String, Long>> output =
>>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>
>>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>
>>>>>>   Thread writer =
>>>>>>       new Thread(
>>>>>>           () -> {
>>>>>>             try {
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>>
>>>>>>             } catch (IOException | InterruptedException e) {
>>>>>>               throw new RuntimeException(e);
>>>>>>             }
>>>>>>           });
>>>>>>   writer.start();
>>>>>>
>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>   PAssert.that(output)
>>>>>>       .inWindow(firstWindow)
>>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>>
>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>   PAssert.that(output)
>>>>>>       .inWindow(secondWindow)
>>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>>
>>>>>>   // THIS ASSERTION PASSES.
>>>>>>   PAssert.that(output)
>>>>>>       .containsInAnyOrder(
>>>>>>           KV.of("my-key", 1L),
>>>>>>           KV.of("my-key", 2L),
>>>>>>           KV.of("my-key", 3L),
>>>>>>           KV.of("my-key", 4L),
>>>>>>           KV.of("my-key", 5L),
>>>>>>           KV.of("my-key", 6L));
>>>>>>
>>>>>>   p.run();
>>>>>>
>>>>>>   writer.join();
>>>>>> }
>>>>>>
>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to match the
>>>>>> filepattern. Then the file *Metadata* is processed by my custom
>>>>>> splittable DoFn.
>>>>>>
>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>>   PCollection<Metadata> matches =
>>>>>>       pipeline.apply(
>>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>> }
>>>>>>
>>>>>> /**
>>>>>>  * Processes matched files by outputting key-value pairs where the key is
>>>>>>  * equal to "my-key" and the values are Longs corresponding to the lines in
>>>>>>  * the file. If the file does not contain one Long per line, an IOException
>>>>>>  * is thrown.
>>>>>>  */
>>>>>> @DoFn.BoundedPerElement
>>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void processElement(
>>>>>>       ProcessContext c,
>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>       throws IOException {
>>>>>>     Metadata metadata = c.element();
>>>>>>     logger.info(
>>>>>>         "reading {} with restriction {} @ {}",
>>>>>>         metadata,
>>>>>>         tracker.currentRestriction(),
>>>>>>         c.timestamp());
>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>       String line;
>>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>           continue;
>>>>>>         }
>>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>>           return;
>>>>>>         }
>>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>>       }
>>>>>>     }
>>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>>     logger.info("Finished processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>>   }
>>>>>>
>>>>>>   private Instant getTimestamp(String filepath) {
>>>>>>     // The filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>>     if (index != -1) {
>>>>>>       // If it has the suffix, strip it.
>>>>>>       filename = filename.substring(0, index);
>>>>>>     }
>>>>>>     Instant timestamp = Instant.parse(filename + "T00:00:00.000Z");
>>>>>>     if (index != -1) {
>>>>>>       // If it had the suffix, i.e. the day is complete, fast-forward to the next day.
>>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>>     }
>>>>>>     return timestamp;
>>>>>>   }
>>>>>>
>>>>>>   @GetInitialRestriction
>>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>>     long lineCount;
>>>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>       lineCount = stream.count();
>>>>>>     }
>>>>>>     return new OffsetRange(0L, lineCount);
>>>>>>   }
>>>>>>
>>>>>>   @GetInitialWatermarkEstimatorState
>>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>     // Compute and return the initial watermark estimator state for each
>>>>>>     // element and restriction. All subsequent processing of an element and
>>>>>>     // restriction will be restored from the existing state.
>>>>>>     return getTimestamp(filename);
>>>>>>   }
>>>>>>
>>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>     }
>>>>>>     return timestamp;
>>>>>>   }
>>>>>>
>>>>>>   @NewWatermarkEstimator
>>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>     return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> I'm working on a blog post[1] about splittable DoFns that covers
>>>>>>> this topic.
>>>>>>>
>>>>>>> The TL;DR is that FileIO.match() should allow users to control the
>>>>>>> watermark estimator that is used. For your use case, you would hold the
>>>>>>> watermark to some computable value (e.g. the files are generated every
>>>>>>> hour, so once you know the last file has appeared for that hour you
>>>>>>> advance the watermark to the current hour).
>>>>>>>
>>>>>>> 1:
>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>
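For the hourly example Luke mentions, the computable watermark value could look like this in plain Java (names are illustrative; the Beam wiring is omitted):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class HourlyWatermark {
  // Once the last file for an hour is known to have arrived, the watermark
  // can safely advance to the end of that hour.
  static Instant watermarkAfterHour(Instant lastFileTime) {
    return lastFileTime.truncatedTo(ChronoUnit.HOURS).plus(1, ChronoUnit.HOURS);
  }
}
```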
>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am looking into:
>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>> since I would like to create a continuous pipeline that reads from 
>>>>>>>> files
>>>>>>>> and assigns Event Times based on e.g. file metadata or actual data 
>>>>>>>> inside
>>>>>>>> the file. For example:
>>>>>>>>
>>>>>>>> private static void run(String[] args) {
>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>>
>>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>>       .apply(FileIO.match()
>>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>>   matches
>>>>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>>>>>
>>>>>>>>   pipeline.run();
>>>>>>>> }
>>>>>>>>
>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>       String line;
>>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no 
>>>>>>>> earlier
>>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) 
>>>>>>>> minus
>>>>>>>> the allowed skew (0 milliseconds). See the 
>>>>>>>> DoFn#getAllowedTimestampSkew()
>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>
>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>>> call to continuously() makes the PCollection unbounded and assigns a
>>>>>>>> default event time. Without the call to continuously(), I can assign
>>>>>>>> timestamps without problems, either via c.outputWithTimestamp or the
>>>>>>>> WithTimestamps transform.
>>>>>>>>
>>>>>>>> I would like to know how to fix this issue, and whether this use case
>>>>>>>> is currently supported in Beam.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>


-- 
Best regards,
Piotr
