Re: [structured-streaming]How to reset Kafka offset in readStream and read from beginning
You can delete the write-ahead log directory you provided to the sink via the "checkpointLocation" option.

From: karthikjay
Sent: Tuesday, May 22, 2018 7:24:45 AM
To: user@spark.apache.org
Subject: [structured-streaming]How to reset Kafka offset in readStream and read from beginning

I have the following readStream in Spark Structured Streaming, reading data from Kafka:

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "testtopic")
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

As far as I know, every time I start the job, under the covers Spark creates a new consumer and a new consumer group, retrieves the last successful offset for the job (using the job name?), seeks to that offset, and starts reading from there. Is that the case? If yes, how do I reset the offset and force my job to read from the beginning?

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
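[As the reply notes, consumed offsets are tracked in the checkpoint's write-ahead log rather than in a Kafka consumer group, so "startingOffsets" is honored only when no checkpoint exists. A minimal sketch of the sink side, assuming placeholder paths and a generic file sink:]

```scala
// Offsets live in the checkpoint directory, not in Kafka. To re-read the
// topic from the beginning, either delete the old checkpoint directory or
// point the query at a fresh, empty checkpointLocation; only then does
// startingOffsets = "earliest" take effect again.
val query = kafkaStreamingDF.writeStream
  .format("parquet")                            // any durable sink
  .option("path", "/data/out")                  // placeholder output path
  .option("checkpointLocation", "/chk/job-v2")  // fresh dir => read from earliest
  .start()
```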
Re: [Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes
The primary role of a sink is storing output tuples. Consider groupByKey and map/flatMapGroupsWithState instead.

-Chris

From: karthikjay
Sent: Friday, April 20, 2018 4:49:49 PM
To: user@spark.apache.org
Subject: [Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

Any help appreciated. Please find the question in the link:
https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu
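[A rough sketch of the suggested approach: groupByKey repartitions the stream by key, so each key's stateful processing runs on the executor that owns that partition. The Event/Running case classes, the `events` Dataset, and the update logic are illustrative placeholders:]

```scala
import org.apache.spark.sql.streaming.GroupState

case class Event(userId: String, bytes: Long)   // placeholder input schema
case class Running(total: Long)                 // placeholder state type

// groupByKey shuffles the stream by key; mapGroupsWithState then applies a
// per-key state update on whichever worker owns that key's partition, which
// distributes the processing instead of pushing it into the sink.
val updated = events
  .groupByKey(_.userId)
  .mapGroupsWithState[Running, (String, Long)] {
    (key: String, rows: Iterator[Event], state: GroupState[Running]) =>
      val prev = state.getOption.getOrElse(Running(0L))
      val next = Running(prev.total + rows.map(_.bytes).sum)
      state.update(next)
      (key, next.total)
  }
```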
Re: Writing record once after the watermarking interval in Spark Structured Streaming
The watermark is just a user-provided indicator to Spark that it is OK to drop internal state after some period of time. The watermark "interval" doesn't directly dictate whether hot rows are sent to a sink; think of a hot row as data younger than the watermark. However, the watermark will prevent cold rows (rows older than the watermark) from being fully processed and sent to the sink. There is no notion of requesting that all data be queued and released only after the watermark has advanced past the time-based groups in that queue.

If you want to ensure only one row per time-based group is sent to the sink, you could get fancy with timeouts and flatMapGroupsWithState. Keep in mind that even in this scenario, the same row may be sent more than once if a micro-batch is reprocessed (this is why it is important for sinks to be idempotent: at-least-once delivery plus an idempotent sink is effectively exactly-once). In general, I would assume you care about this fine-grained control because your sink is not idempotent.

-Chris

From: karthikjay
Sent: Thursday, March 29, 2018 5:10:09 PM
To: user@spark.apache.org
Subject: Writing record once after the watermarking interval in Spark Structured Streaming

I have the following query:

    val ds = dataFrame
      .filter(! $"requri".endsWith(".m3u8"))
      .filter(! $"bserver".contains("trimmer"))
      .withWatermark("time", "120 seconds")
      .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
      .agg(sum("bytes") / 100 as "byte_count")

How do I implement a foreach writer so that its process method is triggered only once for every watermarking interval? I.e., in the aforementioned example, I will get the following:

    10.00-10.01 Channel-1 100 (bytes)
    10.00-10.01 Channel-2 120 (bytes)
    10.01-10.02 Channel-1 110 (bytes)
    ...
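[If the goal is simply one finalized row per window, a lighter-weight alternative to custom flatMapGroupsWithState state machinery is running the windowed aggregation in Append output mode, where each window is emitted to the sink once, after the watermark passes the window's end. A sketch, assuming the `ds` aggregation above; the writer body is a placeholder, and the at-least-once caveat from the reply still applies on micro-batch reprocessing:]

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

val query = ds.writeStream
  // In "append" mode a windowed aggregate row is only emitted once the
  // watermark moves past the end of its window, i.e. once per group.
  .outputMode("append")
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(row: Row): Unit = println(row)  // placeholder: write downstream
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()
```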
Re: Structured Streaming Spark 2.3 Query
Use a streaming query listener that tracks repetitive progress events for the same batch id. If x amount of time has elapsed with repeated progress events for the same batch id, the source is not providing new offsets and stream execution is not scheduling new micro-batches. See also: spark.sql.streaming.pollingDelay.

Alternative methods may produce less-than-desirable results due to specific characteristics of a source / sink / workflow. It may be more desirable to express the threshold as a number of repetitive progress events rather than an amount of time, to be more forgiving of implementation details (e.g., the Kafka source internally retries when determining the latest offsets, sleeping between attempts if there is a miss when asked for new data, etc.).

-Chris

From: Aakash Basu
Sent: Thursday, March 22, 2018 10:45:38 PM
To: user
Subject: Structured Streaming Spark 2.3 Query

Hi,

What is the way to stop a Spark Streaming job if there is no data inflow for an arbitrary amount of time (e.g., 2 mins)?

Thanks,
Aakash.
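[The suggested listener could be sketched as follows: count consecutive progress events whose batchId does not advance, and stop the query once a threshold is reached. The class name, threshold, and the assumption that `spark` is in scope are all illustrative:]

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Stops a query after `maxIdleEvents` consecutive progress reports carry the
// same batchId, i.e. no new micro-batch has been scheduled (no new data).
class IdleStopListener(maxIdleEvents: Int) extends StreamingQueryListener {
  @volatile private var lastBatchId = -1L
  @volatile private var idleEvents = 0

  override def onQueryStarted(e: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(e: QueryProgressEvent): Unit = {
    if (e.progress.batchId == lastBatchId) idleEvents += 1
    else { lastBatchId = e.progress.batchId; idleEvents = 0 }
    if (idleEvents >= maxIdleEvents)
      spark.streams.get(e.progress.id).stop()  // assumes `spark` session in scope
  }
}

spark.streams.addListener(new IdleStopListener(maxIdleEvents = 12))
```

Counting events rather than wall-clock time sidesteps the source-specific retry/sleep behavior mentioned above.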
Re: Structured Streaming - Kafka
https://issues.apache.org/jira/browse/SPARK-19853 -- PR by EOW.

From: Shixiong(Ryan) Zhu <shixi...@databricks.com>
Sent: Tuesday, March 7, 2017 2:04:45 PM
To: Bowden, Chris
Cc: user; Gudenkauf, Jack
Subject: Re: Structured Streaming - Kafka

Good catch. Could you create a ticket? You can also submit a PR to fix it if you have time :)

On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris <chris.bow...@hpe.com> wrote:

Potential bug when using startingOffsets = SpecificOffsets with Kafka topics containing uppercase characters? KafkaSourceProvider#L80/86:

    val startingOffsets =
      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
        case Some("latest") => LatestOffsets
        case Some("earliest") => EarliestOffsets
        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
        case None => LatestOffsets
      }

Topic names in the JSON get lowercased, so the underlying assignments in the consumer are incorrect, and the assertion in KafkaSource#L326 triggers:

    private def fetchSpecificStartingOffsets(
        partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
      val result = withRetriesWithoutInterrupt {
        // Poll to get the latest assigned partitions
        consumer.poll(0)
        val partitions = consumer.assignment()
        consumer.pause(partitions)
        assert(partitions.asScala == partitionOffsets.keySet,
          "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
            "Use -1 for latest, -2 for earliest, if you don't care.\n" +
            s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
Structured Streaming - Kafka
Potential bug when using startingOffsets = SpecificOffsets with Kafka topics containing uppercase characters? KafkaSourceProvider#L80/86:

    val startingOffsets =
      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
        case Some("latest") => LatestOffsets
        case Some("earliest") => EarliestOffsets
        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
        case None => LatestOffsets
      }

Topic names in the JSON get lowercased, so the underlying assignments in the consumer are incorrect, and the assertion in KafkaSource#L326 triggers:

    private def fetchSpecificStartingOffsets(
        partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
      val result = withRetriesWithoutInterrupt {
        // Poll to get the latest assigned partitions
        consumer.poll(0)
        val partitions = consumer.assignment()
        consumer.pause(partitions)
        assert(partitions.asScala == partitionOffsets.keySet,
          "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
            "Use -1 for latest, -2 for earliest, if you don't care.\n" +
            s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
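[One possible shape of a fix, shown here only as an illustration (the actual change was tracked under SPARK-19853): lowercase the value solely for the keyword comparison, and keep the raw string when parsing the JSON so topic names preserve their original case:]

```scala
val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
    // Match the "latest"/"earliest" keywords case-insensitively, but pass the
    // untouched value to the JSON parser so uppercase topic names survive.
    case Some(s) if s.toLowerCase == "latest"   => LatestOffsets
    case Some(s) if s.toLowerCase == "earliest" => EarliestOffsets
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }
```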
Catalyst Expression(s) - Cleanup
Is there currently any way to receive a signal when an Expression will no longer receive any rows, so internal resources can be cleaned up? I have seen that Generators are allowed to terminate(), but my Expressions do not need to emit 0..N rows.
FunctionRegistry
Thoughts on exposing FunctionRegistry via ExperimentalMethods? I have functionality which cannot be expressed efficiently via UDFs; consequently, I implement my own Expressions. Currently I have to lift access to FunctionRegistry in my project(s) within org.apache.spark.sql.*. I also have to duplicate a number of functions to reconstruct the behavior of FunctionRegistry.expression. If we allow public modification of strategies and optimizations, is there much risk in allowing direct access to register Expressions that can't be succinctly or efficiently represented as high-level UDFs?