Re: [structured-streaming]How to reset Kafka offset in readStream and read from beginning

2018-05-22 Thread Bowden, Chris
You can delete the write-ahead log directory you provided to the sink via the 
“checkpointLocation” option.
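
For example, a minimal sketch using the kafkaStreamingDF from the question below (the 
console sink and the /tmp path are placeholders):

val query = kafkaStreamingDF.writeStream
  .format("console")                                         // placeholder sink
  .option("checkpointLocation", "/tmp/checkpoints/my-query") // offsets / WAL are tracked here
  .start()

// Stop the query and delete /tmp/checkpoints/my-query (or point "checkpointLocation"
// at a fresh directory), and the next run will honor startingOffsets = "earliest" again.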

From: karthikjay 
Sent: Tuesday, May 22, 2018 7:24:45 AM
To: user@spark.apache.org
Subject: [structured-streaming]How to reset Kafka offset in readStream and read 
from beginning

I have the following readStream in Spark Structured Streaming, reading data
from Kafka:

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "testtopic")
  .option("failOnDataLoss", "false")
  .option("startingOffsets","earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

As far as I know, every time I start the job, under the covers Spark creates a new
consumer and a new consumer group, retrieves the last successful offset for the job
(using the job name?), seeks to that offset, and starts reading from there. Is that
the case? If yes, how do I reset the offset to the start and force my job to read
from the beginning?






Re: [Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

2018-04-20 Thread Bowden, Chris
The primary role of a sink is storing output tuples. Consider groupByKey and 
map/flatMapGroupsWithState instead.
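
A minimal sketch of that shape, assuming the Kafka values have already been parsed into a
Dataset[Record] (the Record case class and the per-key count are placeholders, with a
SparkSession and import spark.implicits._ in scope):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Record(key: String, value: String)        // hypothetical parsed shape
// assumes `records: Dataset[Record]` parsed from the Kafka value, `import spark.implicits._`

val processed = records
  .groupByKey(_.key)                                 // shuffle by key -> work spread across executors
  .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
    (key: String, rows: Iterator[Record], state: GroupState[Long]) =>
      val seen = state.getOption.getOrElse(0L) + rows.size
      state.update(seen)                             // per-key state lives in the query, not the sink
      (key, seen)                                    // one row per key per trigger (Update output mode)
  }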

-Chris

From: karthikjay 
Sent: Friday, April 20, 2018 4:49:49 PM
To: user@spark.apache.org
Subject: [Structured Streaming] [Kafka] How to repartition the data and 
distribute the processing among worker nodes

Any help appreciated. Please find the question at the link below:

https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu







Re: Writing record once after the watermarking interval in Spark Structured Streaming

2018-03-29 Thread Bowden, Chris
The watermark is just a user-provided indicator to Spark that it's ok to drop 
internal state after some period of time. The watermark "interval" doesn't 
directly dictate whether hot rows are sent to a sink. Think of a hot row as 
data younger than the watermark. However, the watermark will prevent cold rows 
(i.e., rows older than the watermark) from being fully processed and sent to the 
sink. There is no notion of requesting that all data be queued and released 
only after the watermark has advanced past the time-based groups in that queue.


If you want to ensure only one row per time-based group is sent to the sink, 
you could get fancy with timeouts and flatMapGroupsWithState. Keep in mind, 
even in this scenario, the same row may be sent more than once if a micro-batch 
is reprocessed (this is why it is important for sinks to be idempotent, because 
it's really at-least-once, effectively exactly-once).
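
A rough sketch of that flatMapGroupsWithState approach, assuming the stream has been parsed
into a Dataset[Event] that already has .withWatermark("time", "120 seconds") applied, with
import spark.implicits._ in scope (the Event/WindowTotal shapes and the manual 60-second
bucketing are placeholders):

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(time: Timestamp, channelName: String, bytes: Long)             // hypothetical
case class WindowTotal(windowStart: Long, channelName: String, byteCount: Long) // hypothetical

val windowMs = 60 * 1000L

val perWindow = events                        // events: Dataset[Event] with the watermark set
  .groupByKey(e => (e.time.getTime / windowMs * windowMs, e.channelName))
  .flatMapGroupsWithState[Long, WindowTotal](OutputMode.Append, GroupStateTimeout.EventTimeTimeout) {
    case ((windowStart, channel), rows, state) =>
      if (state.hasTimedOut) {
        // Watermark has passed the window: emit one row for the group, then drop the state.
        val total = state.getOption.getOrElse(0L)
        state.remove()
        Iterator(WindowTotal(windowStart, channel, total / 100))
      } else {
        state.update(state.getOption.getOrElse(0L) + rows.map(_.bytes).sum)
        state.setTimeoutTimestamp(windowStart + windowMs)   // fire once the watermark passes the window end
        Iterator.empty
      }
  }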


In general, I would assume you care about this fine-grained control because 
your sink is not idempotent.


-Chris


From: karthikjay 
Sent: Thursday, March 29, 2018 5:10:09 PM
To: user@spark.apache.org
Subject: Writing record once after the watermarking interval in Spark 
Structured Streaming

I have the following query:

val ds = dataFrame
  .filter(! $"requri".endsWith(".m3u8"))
  .filter(! $"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
  .agg(sum("bytes") / 100 as "byte_count")

How do I implement a foreach writer so that its process method is triggered
only once for every watermarking interval? I.e., in the aforementioned
example, I will get the following:

10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
...






Re: Structured Streaming Spark 2.3 Query

2018-03-23 Thread Bowden, Chris
Use a streaming query listener that tracks repetitive progress events for the 
same batch id. If x amount of time has elapsed given repetitive progress events 
for the same batch id, the source is not providing new offsets and stream 
execution is not scheduling new micro-batches. See also: 
spark.sql.streaming.pollingDelay.

Alternative methods may produce less than desirable results due to the specific 
characteristics of a source / sink / workflow. It may be more desirable to 
represent the amount of time as a number of repetitive progress events, to be 
more forgiving of implementation details (e.g., the Kafka source has internal 
retry attempts to determine the latest offsets and sleeps in between attempts 
if there is a miss when asked for new data).
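
A rough sketch of such a listener, assuming `spark` is the active SparkSession and `query`
is the running StreamingQuery; the repeat threshold and the driver-side polling loop are
placeholders for your own policy:

import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class IdleQueryListener(maxRepeats: Int) extends StreamingQueryListener {
  private val lastBatchId = new AtomicLong(-1L)
  private val repeats = new AtomicLong(0L)
  val idle = new AtomicBoolean(false)

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val batchId = event.progress.batchId
    if (lastBatchId.getAndSet(batchId) == batchId) {
      // Same batch id reported again: no new offsets, no new micro-batch scheduled.
      if (repeats.incrementAndGet() >= maxRepeats) idle.set(true)
    } else {
      repeats.set(0L)
    }
  }
}

val listener = new IdleQueryListener(maxRepeats = 12)
spark.streams.addListener(listener)

// Driver side: stop the query once it has been "idle" long enough.
while (query.isActive && !listener.idle.get()) Thread.sleep(10000L)
if (query.isActive) query.stop()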


-Chris


From: Aakash Basu 
Sent: Thursday, March 22, 2018 10:45:38 PM
To: user
Subject: Structured Streaming Spark 2.3 Query

Hi,

What is the way to stop a Spark Streaming job if there is no data inflow for an 
arbitrary amount of time (e.g., 2 minutes)?

Thanks,
Aakash.


Re: Structured Streaming - Kafka

2017-03-07 Thread Bowden, Chris
https://issues.apache.org/jira/browse/SPARK-19853, PR by EOW


From: Shixiong(Ryan) Zhu <shixi...@databricks.com>
Sent: Tuesday, March 7, 2017 2:04:45 PM
To: Bowden, Chris
Cc: user; Gudenkauf, Jack
Subject: Re: Structured Streaming - Kafka

Good catch. Could you create a ticket? You can also submit a PR to fix it if 
you have time :)

On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris 
<chris.bow...@hpe.com<mailto:chris.bow...@hpe.com>> wrote:

Potential bug when using startingOffsets = SpecificOffsets with Kafka topics 
containing uppercase characters?

KafkaSourceProvider#L80/86:

val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    case Some("latest") => LatestOffsets
    case Some("earliest") => EarliestOffsets
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }

Topics in the JSON get lowercased, so the underlying assignments in the consumer are 
incorrect and the assertion in KafkaSource#L326 triggers:

private def fetchSpecificStartingOffsets(
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val result = withRetriesWithoutInterrupt {
    // Poll to get the latest assigned partitions
    consumer.poll(0)
    val partitions = consumer.assignment()
    consumer.pause(partitions)
    assert(partitions.asScala == partitionOffsets.keySet,
      "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
        "Use -1 for latest, -2 for earliest, if you don't care.\n" +
        s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")



Structured Streaming - Kafka

2017-03-07 Thread Bowden, Chris
Potential bug when using startingOffsets = SpecificOffsets with Kafka topics 
containing uppercase characters?

KafkaSourceProvider#L80/86:

val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    case Some("latest") => LatestOffsets
    case Some("earliest") => EarliestOffsets
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }

Topics in the JSON get lowercased, so the underlying assignments in the consumer are 
incorrect and the assertion in KafkaSource#L326 triggers:

private def fetchSpecificStartingOffsets(
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  val result = withRetriesWithoutInterrupt {
    // Poll to get the latest assigned partitions
    consumer.poll(0)
    val partitions = consumer.assignment()
    consumer.pause(partitions)
    assert(partitions.asScala == partitionOffsets.keySet,
      "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
        "Use -1 for latest, -2 for earliest, if you don't care.\n" +
        s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
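
For reference, a minimal sketch of the kind of configuration that trips the assertion
(the topic name and offsets are made up for illustration):

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "TestTopic")
  .option("startingOffsets", """{"TestTopic":{"0":-2}}""")  // "TestTopic" gets lowercased to "testtopic"
  .load()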


Catalyst Expression(s) - Cleanup

2017-01-25 Thread Bowden, Chris
Is there currently any way to receive a signal when an Expression will no 
longer receive any rows, so internal resources can be cleaned up?

I have seen that Generators are allowed to terminate(), but my Expression(s) do not 
need to emit 0..N rows.


FunctionRegistry

2017-01-20 Thread Bowden, Chris
Thoughts on exposing FunctionRegistry via ExperimentalMethods?


I have functionality which cannot be expressed efficiently via UDFs; 
consequently, I implement my own Expressions. Currently I have to place code in my 
project(s) under org.apache.spark.sql.* to get access to FunctionRegistry. I also 
have to duplicate a number of functions to reconstruct FunctionRegistry.expression's 
behavior.


If we allow public modification of strategies and optimizations, is there much 
risk in allowing direct access to register Expressions that can't be succinctly 
or efficiently represented as high-level UDFs?
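
For context, a rough sketch of the workaround described above: the file has to sit under
org.apache.spark.sql to reach the session-internal registry, the String-based
registerFunction overload is an assumption for the 2.x line (it has changed across
releases), and an existing Catalyst expression is registered under a new name just to
keep the sketch concrete.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Expression, Upper}

object MyExpressionRegistrar {
  // Registers the built-in Upper expression under a new name; a custom Expression
  // would be registered the same way once access to FunctionRegistry is "lifted".
  def register(spark: SparkSession): Unit = {
    spark.sessionState.functionRegistry.registerFunction(
      "my_upper",
      (children: Seq[Expression]) => Upper(children.head))
  }
}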