// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
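For comparison, here is a minimal micro-batch sketch (illustrative only, not from the thread below; the brokers, topics, interval and offset cap are placeholders) pairing a processing-time trigger with maxOffsetsPerTrigger on the Kafka source, which is the combination Akshay describes further down:

// Micro-batch sketch (assumptions: placeholder brokers/topics, 10-second
// trigger interval, cap of 200 offsets per trigger across all partitions).
import org.apache.spark.sql.streaming.Trigger

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "XXXXX")
  .option("subscribe", "XXXXX")
  .option("maxOffsetsPerTrigger", "200")   // rate limit per micro-batch
  .load()

df.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "XXXXX")
  .option("topic", "XXXXX")
  .option("checkpointLocation", "/checkpoints/testCheckpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))  // one micro-batch every 10 s
  .outputMode("append")
  .start()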
On Tue, 14 May 2019 at 22:10, suket arora <suket...@gmail.com> wrote:
> Hey Austin,
>
> If you truly want to process as a stream, use continuous streaming in
> Spark with a continuous trigger. By default, Spark uses micro-batches
> if you don't specify a trigger (processing is triggered when the next
> batch arrives).
>
> Regards,
> Suket
>
> On Mon, 13 May 2019 at 14:23, Austin Weaver <aus...@flyrlabs.com> wrote:
>
>> Hi Akshay,
>>
>> Thanks very much for the reply!
>>
>> 1) The topics have 12 partitions (both input and output).
>> 2-3) I read that "trigger" is used for micro-batching, but if you
>> would like the stream to truly process as a "stream" as quickly as
>> possible, should this be left out? In any case, I am using a small
>> maxOffsetsPerTrigger (varying from 20-200 just to test this argument).
>>
>> Could you please link me to documentation or an example of the
>> progress status display you mention? I see a horizontal status bar on
>> my Spark UI for the running job, but it doesn't appear to give me any
>> specific metrics beyond the UI display. Where exactly would I see the
>> start/end offset values per batch - is that in the Spark logs? Any
>> example (documentation, sample log) would be very helpful to point me
>> in the right direction. I think debugging the offsets per batch is
>> exactly what I need right now.
>>
>> Thanks,
>> Austin
>>
>>
>> On Wed, May 8, 2019 at 9:59 AM Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Austin,
>>>
>>> A few questions:
>>>
>>> 1. What is the partition count of the Kafka topics used for the
>>> input and output data?
>>> 2. In the writeStream, I would recommend using a "trigger" with a
>>> defined interval if you prefer a micro-batching strategy,
>>> 3. along with defining "maxOffsetsPerTrigger" in the Kafka
>>> readStream options, which lets you choose the number of messages you
>>> want per trigger. (This helps in maintaining the expected threshold
>>> of executors/memory for the cluster.)
>>>
>>> For the repeated log messages, look in your logs for the streaming
>>> query progress that is published. This progress status displays a
>>> lot of metrics that should be your first diagnostic for identifying
>>> issues. With a Kafka stream, the progress status displays the
>>> "startOffset" and "endOffset" values per batch, listed per
>>> topic-partition, i.e. the start-to-end offsets consumed in each
>>> trigger batch of the streaming query.
>>>
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Tue, May 7, 2019 at 8:02 PM Austin Weaver <aus...@flyrlabs.com>
>>> wrote:
>>>
>>>> Hey Spark Experts,
>>>>
>>>> After listening to some of you, and the presentations at Spark
>>>> Summit in SF, I am transitioning from DStreams to Structured
>>>> Streaming; however, I am seeing some weird results.
>>>>
>>>> My use case is as follows: I am reading a stream from a Kafka
>>>> topic, transforming each message, and writing the transformed
>>>> message to another Kafka topic.
>>>>
>>>> While running my stream, I can see the transformed messages on the
>>>> output topic, so I know the basic structure of my stream seems to
>>>> be running as intended.
>>>>
>>>> Inside my transformation, I am logging the total transform time as
>>>> well as the raw message being transformed. (Java, by the way.)
>>>>
>>>> The 2 weird things I am seeing:
>>>> 1) The consumer lag for this particular consumer group on the input
>>>> topic is increasing. This does not make sense to me - looking at
>>>> the transform times in the logs, the stream should easily be able
>>>> to handle the incoming feed.
>>>> To give an example, the transform times are < 10 ms per record,
>>>> and the data sample does not contain > 100 messages per second.
>>>> The stream should be reducing consumer lag as it runs (especially
>>>> considering multiple workers and partitions).
>>>>
>>>> 2) I am seeing the same transformation log messages over and over
>>>> in the Dataproc Spark cluster logs. For example, I am currently
>>>> looking at my logs and the last 20+ log messages are exactly the
>>>> same.
>>>>
>>>> I thought 2 might be due to offsets not being handled correctly,
>>>> but I am seeing a reasonable range of transformed messages on the
>>>> target topic, and I'm using Spark's built-in checkpointing to
>>>> handle the offsets for me.
>>>>
>>>> In terms of 1, why would I be seeing the same log messages over and
>>>> over? It doesn't make sense to me - wouldn't each message only be
>>>> transformed once and its offset committed?
>>>>
>>>> If anything stands out as incorrect, or is something you've seen
>>>> before, please let me know - this is rather new to me, and my code
>>>> seems to follow the same pattern as other examples I see across the
>>>> net.
>>>>
>>>> Here's a redacted snippet of my stream:
>>>>
>>>> spark.readStream()
>>>>     .format("kafka")
>>>>     .option("kafka.bootstrap.servers", "XXXXX")
>>>>     .option("kafka.partition.assignment.strategy",
>>>>         RoundRobinAssignor.class.getName())
>>>>     .option("subscribe", "XXXX")
>>>>     .option("startingOffsets", "earliest")
>>>>     .load()
>>>>     .select("value")
>>>>     .as(Encoders.STRING())
>>>>     .map((MapFunction<String, String>) value -> transform(value),
>>>>         Encoders.STRING())
>>>>     .writeStream()
>>>>     .format("kafka")
>>>>     .option("kafka.bootstrap.servers", "XXXXX")
>>>>     .option("topic", "XXXXX")
>>>>     .outputMode("append")
>>>>     .option("checkpointLocation", "/checkpoints/testCheckpoint")
>>>>     .start()
>>>>     .awaitTermination();
>>>>
>>>> Thanks!
>>>> Austin
>>>
>>
>> --
>> Austin Weaver
>> Software Engineer
>> FLYR, Inc. www.flyrlabs.com
>
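P.S. On Austin's question about where to see the start/end offset values per batch: a minimal sketch (not from the thread; it assumes `query` is the StreamingQuery returned by start()) of reading the progress metrics Akshay refers to, either on demand or through a listener:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// On demand: the most recent batch's progress as JSON, including the
// per-topic-partition "startOffset"/"endOffset" of the Kafka source.
println(query.lastProgress)

// Or continuously, via a listener registered on the session:
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress.sources(0).startOffset / .endOffset hold the
    // {"topic":{"partition":offset,...}} ranges for each trigger.
    println(event.progress.json)
  }
})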