> Where exactly would I see the start/end offset values per batch, is that
in the spark logs?

Yes, it's in the Spark logs. Please see this:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively

On Mon, May 13, 2019 at 10:53 AM Austin Weaver <aus...@flyrlabs.com> wrote:

> Hi Akshay,
>
> Thanks very much for the reply!
>
> 1) The topics have 12 partitions (both input and output)
> 2-3) I read that "trigger" is used for microbatching, but it you would
> like the stream to truly process as a "stream" as quickly as possible, then
> to leave this opted out? In any case, I am using a minimum
> maxOffsetsPerTrigger (varying from 20-200 just to test this argument)
>
> Could you please link me to documentation or an example of the progress
> status display you mention? I see a horizontal status bar on my Spark UI
> for the running job but it doesn't appear to give me any specific metrics
> other than a ui display. Where exactly would I see the start/end offset
> values per batch, is that in the spark logs? Any example (documentation,
> sample log) would be very helpful to point me in the right direction. I
> think debugging the offsets per batch is exactly what I need right now.
>
> Thanks,
> Austin
>
>
> On Wed, May 8, 2019 at 9:59 AM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Austin,
>>
>> A few questions:
>>
>>    1. What is the partition of the kafka topic that used for input and
>>    output data?
>>    2. In the write stream, I will recommend to use "trigger" with a
>>    defined interval, if you prefer micro-batching strategy,
>>    3. along with defining "maxOffsetsPerTrigger" in kafka readStream
>>    options, which lets you choose the amount of messages you want per 
>> trigger.
>>    (Helps in maintaining the expected threshold of executors/memory for the
>>    cluster)
>>
>> For repeated log messages, notice in your logs the streaming query
>> progress published. This progress status displays a lot of metrics that
>> shall be your first diagnosis to identify issues.
>> The progress status with kafka stream displays the "startOffset" and
>> "endOffset" values per batch. This is listed topic-partition wise the start
>> to end offsets per trigger batch of streaming query.
>>
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Tue, May 7, 2019 at 8:02 PM Austin Weaver <aus...@flyrlabs.com> wrote:
>>
>>> Hey Spark Experts,
>>>
>>> After listening to some of you, and the presentations at Spark Summit in
>>> SF, I am transitioning from d-streams to structured streaming however I am
>>> seeing some weird results.
>>>
>>> My use case is as follows: I am reading in a stream from a kafka topic,
>>> transforming a message, and writing the transformed message to another
>>> kafka topic.
>>>
>>> While running my stream, I can see the transformed messages on the
>>> output topic so I know the basic structure of my stream seems to be running
>>> as intended.
>>>
>>> Inside my transformation, I am logging the total transform time as well
>>> as the raw message being transformed. (Java by the way)
>>>
>>> The 2 weird things I am seeing:
>>> 1) I am seeing that the consumer lag for this particular consumer group
>>> on the input topic is increasing. This does not make sense to me - looking
>>> at the transform time from the logs, it should easily be able to handle the
>>> incoming feed. To give an example the transform times are < 10 ms per
>>> record and the sample of data does not contain > 100 messages per second.
>>> The stream should be reducing consumer lag as it runs (especially
>>> considering multiple workers and partitions)
>>>
>>> 2) I am seeing the same log transformation messages over and over on the
>>> dataproc spark cluster logs. For example, I am currently looking at my logs
>>> and the last 20+ log messages are the exact same
>>>
>>> I thought 2 may be due to offsets not being handled correctly, but I am
>>> seeing a reasonable range of transformed messages on the target topic, and
>>> I'm using the built in checkpointing for spark to handle the offsets for me.
>>>
>>> In terms of 1, why would I be seeing the same log messages over and
>>> over? It doesnt make sense to me - wouldnt the message only be transformed
>>> once and it's offset committed?
>>>
>>> If anything stands out as incorrect, or something you've seen please let
>>> me know - this is rather new to me and my code seems to be following the
>>> same as other examples I see across the net
>>>
>>> Here's a redacted snippet of my stream:
>>>
>>> spark.readStream().format("kafka").option("kafka.bootstrap.servers",
>>> "XXXXX")
>>>         .option("kafka.partition.assignment.strategy",
>>> RoundRobinAssignor.class.getName())
>>>         .option("subscribe", ""XXXX"")
>>>         .option("startingOffsets", "earliest")
>>>         .load()
>>>         .select("value")
>>>         .as(Encoders.STRING())
>>>         .map((MapFunction<String, String>) value -> transform(value),
>>> Encoders.STRING())
>>>         .writeStream()
>>>         .format("kafka")
>>>         .option("kafka.bootstrap.servers", "XXXXX")
>>>         .option("topic", ""XXXXX"")
>>>         .outputMode("append")
>>>         .option("checkpointLocation", "/checkpoints/testCheckpoint")
>>>         .start()
>>>         .awaitTermination();
>>>
>>> Thanks!
>>> Austin
>>>
>>
>
> --
> Austin Weaver
> Software Engineer
> FLYR, Inc.   www.flyrlabs.com
>

Reply via email to