There could very well be a bug in KafkaIO, but what you have described so
far does not necessarily show that. Please file a JIRA with details about
reproducing the problem.

A couple of things you can try:

   - Run your pipeline with 10 files for TextIO.
   - With KafkaIO, remove your DoFn and instead run something like this:

pipeline.apply(KafkaIO.read()...withMaxReadTime(Duration.standardMinutes(10))
                  .withoutMetadata())
        .apply(Values.<String>create())
        .apply(Count.<String>globally())
        .apply(ParDo.of(new DoFn<Long, Void>() {
          @Override
          public void processElement(ProcessContext ctx) {
            LOG.info("Read {} records from Kafka", ctx.element());
          }
        }));
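
If the count logged there matches the number of records you produced, the
duplication is more likely coming from your DoFn or its shared state than
from KafkaIO itself.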


On Tue, Aug 9, 2016 at 9:24 PM, amir bahmanyari <[email protected]> wrote:

> Hi Thomas,
> I removed the KafkaIO() call and replaced it with TextIO() reading data
> records from the file system.
> *Works perfectly* :-( Not sure whether to be happy or sad... all this
> time I had proved Kafka itself was not sending duplicate records.
> But it seems like KafkaIO() has a brain of its own.
>
> Bottom line: the difference is KafkaIO(). It's probably intermittently
> sending duplicates which I could not catch during my testing.
>
> Can anyone suggest a way to prevent KafkaIO() from re-sending to
> processElement(), please?
> Thanks.
>
> p.apply(TextIO.Read.from("/tmp/10m1x1K.dat"))
> .apply("PseudoLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Tuesday, August 9, 2016 1:22 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> ConcurrentHashMaps can be interacted with in ways that do not preserve
> the intended semantics. If you use exclusively atomic mutation
> operations (putIfAbsent(K, V), remove(K, V), replace(K, V, V)), you can
> ensure that the mutation semantics hold; however, using a ConcurrentMap
> like a plain map can cause time-of-check-to-time-of-use (TOCTOU) errors.
> Otherwise, ConcurrentMaps provide only happens-before and visibility
> guarantees.
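>
> As an illustration (a sketch, not from your code; the class and method
> names are hypothetical): the first increment below is a racy
> check-then-act sequence, while the second uses an atomic putIfAbsent so
> concurrent callers cannot lose updates:
>
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ConcurrentMap;
> import java.util.concurrent.atomic.AtomicLong;
>
> public class Counters {
>   private static final ConcurrentMap<String, AtomicLong> COUNTS =
>       new ConcurrentHashMap<>();
>
>   // Racy: two threads can both see null and overwrite each other's count.
>   static void incrementRacy(String key) {
>     AtomicLong c = COUNTS.get(key);        // time of check
>     if (c == null) {
>       COUNTS.put(key, new AtomicLong(1));  // time of use: update can be lost
>     } else {
>       c.incrementAndGet();
>     }
>   }
>
>   // Atomic: putIfAbsent guarantees exactly one AtomicLong per key.
>   static void incrementAtomic(String key) {
>     AtomicLong fresh = new AtomicLong(0);
>     AtomicLong existing = COUNTS.putIfAbsent(key, fresh);
>     (existing == null ? fresh : existing).incrementAndGet();
>   }
> }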
>
> For the second question, this is mainly about interacting with mutable
> per-element state - if you interact with, for example, mutable instance
> fields that have a base and a current state, the base state must be reset
> per-element. It doesn't sound like this is your problem.
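>
> For instance (a hypothetical sketch, not your code): if a DoFn keeps a
> mutable buffer, it must be cleared at the top of processElement, since
> the same instance can be reused for many elements:
>
> new DoFn<String, String>() {
>   // Reused across elements and bundles, so it must be reset per element.
>   private final List<String> buffer = new ArrayList<>();
>
>   @Override
>   public void processElement(ProcessContext ctx) {
>     buffer.clear(); // re-establish the base state before doing any work
>     // ... per-element logic that fills buffer ...
>   }
> }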
>
> On Tue, Aug 9, 2016 at 11:31 AM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Thomas,
> I spent time to digest all of this. I think I understand it to a good
> extent.
> The only hang-up I still have is controlling the execution trajectory by
> persisting state, which you say is not guaranteed in Beam.
> I have some further questions, marked *Q* below, & appreciate your
> valuable time in responding to them. I reiterated your statements in " "
> for quick reference above them.
>
> "We do not encourage sharing objects between DoFn instances, and any
> shared state must be accessed in a thread-safe manner, and modifications to
> shared state should be idempotent, as otherwise retries and speculative
> execution may cause that state to be inconsistent."
> *Q*: I persisted state in (single-instance) Redis. I got varying results
> on each run.
> I then replaced Redis with Java (static) ConcurrentHashMaps, which are
> thread-safe. Interestingly enough, the very first run after this change
> produced a precise result & I thought I GOT IT! I re-ran it, and I have
> been getting varying results again up to the moment I am typing this
> email. How would you suggest accessing "any shared state ... in a
> thread-safe manner" other than by using ConcurrentHashMaps?
>
>
> "A DoFn will be reused for multiple elements across a single bundle, and
> may be reused across multiple bundles - if you require the DoFn to be
> "fresh" per element, it should perform any required setup at the start of
> the ProcessElement method."
> *Q*: What do you suggest for "it should perform any required setup at
> the start of the ProcessElement method"?
> I can think of persisting the DoFn object's hash code at the class level
> (every time ProcessElement is invoked) & comparing it later for
> uniqueness with Object's equals(Obj). I know it gets a little hairy when
> "parallelism" manifests during execution.
> I appreciate your suggestions.
>
>
> Thanks+have a great day.
> Amir
>
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected] ; amir bahmanyari <
> [email protected]>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
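>
> As a sketch of what "idempotent" means here (the helpers extractId and
> compute are hypothetical, not from your code): key every write by a
> stable record ID, so a retried or speculatively re-executed element
> overwrites its own entry instead of double-counting:
>
> private static final ConcurrentMap<String, Long> RESULTS =
>     new ConcurrentHashMap<>();
>
> @Override
> public void processElement(ProcessContext ctx) {
>   String id = extractId(ctx.element());  // hypothetical helper
>   long value = compute(ctx.element());   // hypothetical helper
>   RESULTS.put(id, value); // same element twice => same final state
> }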
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism).
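>
> Concretely, that looks roughly like this (a sketch using the standard
> WithKeys and GroupByKey transforms):
>
> input
>     .apply(WithKeys.<Integer, String>of(0))      // one static key for all
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, String>() {
>       @Override
>       public void processElement(ProcessContext ctx) {
>         for (String record : ctx.element().getValue()) {
>           // process one record at a time, on a single worker
>         }
>       }
>     }));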
>
> [1]
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I don't get duplicates from Kafka.
> However, for some reason, certain parts of my code execute more times
> than the expected number of records, and subsequently produce extra
> resulting data.
> I tried playing with triggering: stretching the window interval,
> discardingFiredPanes(), etc., all kinds of modes.
> Same result. How can I guarantee that one record at a time executes in
> one unique instance of the inner-class object?
> I have all the shared objects synchronized and am using Java
> ConcurrentHashMaps. How can I guarantee synchronized operations amongst
> "parallel pipelines"? Analogous to multiple threads accessing a shared
> object and trying to modify it...
>
> Here is my current KafkaIO() call:
>
> PCollection<String> kafkarecords = p
>     .apply(KafkaIO.read()
>         .withBootstrapServers("kafkahost:9092")
>         .withTopics(topics)
>         .withValueCoder(StringUtf8Coder.of())
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
>     DoFn<String, String>() { // I expect one record at a time per object here
> ------------------------------------------------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
> ==>> No duplicates from Kafka.
> ------------------------------------------------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>> Sorry for my confusing statement. As I mentioned above, I expect
> each record coming from Kafka to be assigned to one instance of the inner
> class, so that one instance of the pipeline executes it in parallel with
> others executing their own unique records.
>
> ------------------------------------------------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected] ; amir bahmanyari <
> [email protected]>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
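>
> (If you do try it, switching runners is roughly this; I'm assuming the
> DirectRunner class name in your Beam version:)
>
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> options.setRunner(DirectRunner.class);
> Pipeline p = Pipeline.create(options);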
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I have now.
> My core code works perfectly & produces the expected result every single
> time when it is not wrapped with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I
> repeated it and it always produced the right result.
> When I include Beam KafkaIO and embed the exact same code in an anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends the precise number of records. No duplicates. All good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
>   // Create Beam options for the Flink runner.
>   FlinkPipelineOptions options =
>       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>   // Set the streaming engine as FlinkRunner.
>   options.setRunner(FlinkPipelineRunner.class);
>   // This is a streaming process (as opposed to batch).
>   options.setStreaming(true);
>   // Create the DAG pipeline for parallel processing of independent LR records.
>   Pipeline p = Pipeline.create(options);
>   // The Kafka broker topic is identified as "lroad".
>   List<String> topics = Arrays.asList("lroad");
>
>   PCollection<String> kafkarecords = p
>       .apply(KafkaIO.read()
>           .withBootstrapServers("kafkahost:9092")
>           .withTopics(topics)
>           .withValueCoder(StringUtf8Coder.of())
>           .withoutMetadata())
>       .apply(Values.<String>create())
>       .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>           .triggering(AfterWatermark.pastEndOfWindow())
>           .withAllowedLateness(Duration.ZERO)
>           .accumulatingFiredPanes());
>
>   kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
>       DoFn<String, String>() {
>
>     public void processElement(ProcessContext ctx) throws Exception {
>       *My core logic code here.*
>     }
>   }));
>   .
>   .
>   p.run(); // Start the Beam pipeline(s) on the Flink cluster
> } // of main
> } // of class
>
