[structured-streaming][parquet] readStream files order in Parquet

2018-06-14 Thread karthikjay
My Parquet files are partitioned first by environment and then by date, like:

env=testing/
  date=2018-03-04/
    part1.parquet
    part2.parquet
    part3.parquet
  date=2018-03-05/
    part1.parquet
    part2.parquet
    part3.parquet
  date=2018-03-06/
    part1.parquet
    part2.parquet
    part3.parquet
In our read stream, I do the following:

val tunerParquetDF = spark
  .readStream
  .schema(...)
  .format("parquet")
  .option("basePath", basePath)
  .option("path", basePath+"/env*")
  .option("maxFilesPerTrigger", 5)
  .load()

The expected behavior is that readStream will read the files in date order, but the
observed behavior is that the files are read in a seemingly random order.
How do I force readStream to read the Parquet files in date order?
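
For what it's worth, as far as I understand the file source picks up new files ordered by
modification timestamp, not by partition value, and the latestFirst option only flips that
ordering; within a micro-batch there is no ordering guarantee at all. A minimal sketch of
the relevant options, assuming the layout above and that the file modification times
roughly follow the date= partitions:

// Sketch only: latestFirst=false (the default) makes the source take the
// oldest files (by modification time) first; rows inside a batch are still
// not guaranteed to arrive in date order.
val tunerParquetDF = spark
  .readStream
  .schema(tunerSchema)                     // placeholder for the real schema
  .format("parquet")
  .option("basePath", basePath)
  .option("path", basePath + "/env*")
  .option("maxFilesPerTrigger", 5)
  .option("latestFirst", "false")
  .load()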







Re: [Beginner][StructuredStreaming] Using Spark aggregation - WithWatermark on old data

2018-05-24 Thread karthikjay
My data looks like this:


{
  "ts2" : "2018/05/01 00:02:50.041",
  "serviceGroupId" : "123",
  "userId" : "avv-0",
  "stream" : "",
  "lastUserActivity" : "00:02:50",
  "lastUserActivityCount" : "0"
}
{
  "ts2" : "2018/05/01 00:09:02.079",
  "serviceGroupId" : "123",
  "userId" : "avv-0",
  "stream" : "",
  "lastUserActivity" : "00:09:02",
  "lastUserActivityCount" : "0"
}
{
  "ts2" : "2018/05/01 00:09:02.086",
  "serviceGroupId" : "123",
  "userId" : "avv-2",
  "stream" : "",
  "lastUserActivity" : "00:09:02",
  "lastUserActivityCount" : "0"
}
...

And my aggregation is:

val sdvTuneInsAgg1 = df
  .withWatermark("ts2", "10 seconds")
  .groupBy(window(col("ts2"),"10 seconds"))
  .agg(count("*") as "count")
  .as[CountMetric1]

But the only anomaly is that the current date is 2018/05/24 while the records'
ts2 values have old dates. Will the aggregation/count still work in this scenario?
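
One thing worth checking first: withWatermark needs ts2 to be a timestamp column, and in
the sample it is a string in yyyy/MM/dd HH:mm:ss.SSS format. The watermark itself is
tracked against the maximum event time seen so far, not the wall-clock date, so old dates
are fine as long as ts2 keeps (roughly) increasing. A hedged sketch of the parsing step,
reusing the column names from the sample (the format string is read off the sample records):

import org.apache.spark.sql.functions._

// Parse the string ts2 into a proper timestamp before watermarking.
val withEventTime = df
  .withColumn("ts2", to_timestamp(col("ts2"), "yyyy/MM/dd HH:mm:ss.SSS"))

val sdvTuneInsAgg1 = withEventTime
  .withWatermark("ts2", "10 seconds")
  .groupBy(window(col("ts2"), "10 seconds"))
  .agg(count("*") as "count")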






[Beginner][StructuredStreaming] Using Spark aggregation - WithWatermark on old data

2018-05-23 Thread karthikjay
I am doing the following aggregation on the data

val channelChangesAgg = tunerDataJsonDF
  .withWatermark("ts2", "10 seconds")
  .groupBy(
    window(col("ts2"), "10 seconds"),
    col("env"),
    col("servicegroupid"))
  .agg(count("linetransactionid") as "count1")

The only constraint here is that the data is backdated: even though the data
is chronologically ordered, ts2 will be an old date. Given this condition,
will the watermarking and aggregation still work?
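
As far as I understand, the watermark is derived from the event-time column itself
(maximum ts2 observed minus the 10-second delay), never from the processing-time clock,
so backdated data works as long as ts2 keeps moving forward; only records that fall more
than 10 seconds behind the maximum ts2 already seen risk being dropped. A hedged sketch
of the rest of the query in append mode, where each window is emitted once the watermark
passes its end (sink paths are placeholders):

// Sketch only: append mode emits a window exactly once, after the
// event-time watermark passes the window's end; the current date is
// never part of that calculation.
val query = channelChangesAgg
  .writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/tmp/channel_changes")          // placeholder
  .option("checkpointLocation", "/tmp/cc_chkpt")   // placeholder
  .start()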






Re: [structured-streaming]How to reset Kafka offset in readStream and read from beginning

2018-05-23 Thread karthikjay
Chris,

Thank you for responding. I get it. 

But if I am using a console sink without a checkpoint location, I do not see
any messages in the console in the IntelliJ IDEA IDE. I do not explicitly specify
checkpointLocation in this case. How do I clear the working directory data
and force Spark to read the Kafka messages from the beginning?
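
For what it's worth, the offsets a query resumes from live in its checkpoint directory,
so pointing the query at a fresh (or deleted) checkpoint location together with
startingOffsets=earliest makes it re-read the topic from the beginning. A hedged sketch
(broker, topic and path are placeholders):

// Sketch only: a brand-new checkpoint directory has no stored offsets,
// so startingOffsets=earliest takes effect and the topic is re-read.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder
  .option("subscribe", "testtopic")                      // placeholder
  .option("startingOffsets", "earliest")
  .load()

val query = df
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/console-chkpt-" + System.currentTimeMillis)
  .start()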






[Beginner][StructuredStreaming] Console sink is not working as expected

2018-05-22 Thread karthikjay
I have the following code to read and process Kafka data using Structured
Streaming 

  
import org.apache.spark.sql.{ForeachWriter, SparkSession}

object ETLTest {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {

    val spark = SparkSession
      .builder
      .appName("Test JOB")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "...")
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)", "CAST(topic AS STRING)")

    val sdvWriter = new ForeachWriter[record] {
      def open(partitionId: Long, version: Long): Boolean = {
        true
      }
      def process(record: record): Unit = {
        println("record:: " + record)
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    // DOES NOT WORK
    /*val query = sdvDF
      .writeStream
      .format("console")
      .start()
      .awaitTermination()*/

    // WORKS
    /*val query = sdvDF
      .writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()
    */
  }
}

I am running this code from IntellijIdea IDE and when I use the
foreach(sdvWriter), I could see the records consumed from Kafka, but when I
use .writeStream.format("console") I do not see any records. I assume that
the console write stream is maintaining some sort of checkpoint and assumes
it has processed all the records. Is that the case ? Am I missing something
obvious here ?
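
For what it's worth, two things seem worth ruling out: a checkpoint directory left over
from an earlier run (relevant if spark.sql.streaming.checkpointLocation is set, since the
console sink otherwise uses a throwaway temporary checkpoint), and the console sink simply
truncating or limiting the rows it prints. A hedged sketch with an explicit fresh
checkpoint and the console display options (the path is a placeholder):

// Sketch only: explicit fresh checkpoint plus console display options,
// to rule out offset reuse and row truncation.
val query = sdvDF
  .writeStream
  .format("console")
  .option("truncate", "false")   // show full record values
  .option("numRows", "50")       // print up to 50 rows per batch
  .option("checkpointLocation", "/tmp/console-chkpt-" + System.currentTimeMillis)
  .start()

query.awaitTermination()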







[structured-streaming]How to reset Kafka offset in readStream and read from beginning

2018-05-22 Thread karthikjay
I have the following readstream in Spark structured streaming reading data
from Kafka

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "testtopic")
  .option("failOnDataLoss", "false")
  .option("startingOffsets","earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

As far as I know, every time I start the job, under the covers Spark creates a new
consumer and a new consumer group, retrieves the last successful offsets for the job
(using the job name?), seeks to those offsets and starts reading from there. Is that
the case? If yes, how do I reset the offsets and force my job to read from the beginning?
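
As far as I know, the Kafka source does not use Kafka's committed consumer-group offsets
at all: it records the processed offsets in the query's checkpoint, and startingOffsets
only applies when that checkpoint is empty. So re-reading from the beginning means
starting with a fresh checkpoint location, optionally pinning explicit per-partition
offsets. A hedged sketch (the three partitions shown are only an illustration):

// Sketch only: with no prior checkpoint, startingOffsets decides where to
// begin. "earliest" starts at the beginning of every partition; a JSON map
// can pin per-partition offsets instead (-2 = earliest, -1 = latest).
val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "testtopic")
  .option("failOnDataLoss", "false")
  .option("startingOffsets", """{"testtopic":{"0":-2,"1":-2,"2":-2}}""")
  .load()
  .selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)")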






Spark job terminated without any errors

2018-05-18 Thread karthikjay
We have created multiple Spark jobs (as fat JARs) and run them using spark-submit under
nohup. Most of the jobs quit after a while. We tried to comb through the logs for
failures, but the only message that gave us some clue was "18/05/07 18:31:38 INFO
Worker: Executor app-20180507180436-0016/0 finished with state KILLED exitStatus 143".
Any help appreciated.

Worker and master logs -
https://gist.github.com/karthik-arris/57804a2b80a8a89754578ab308084b48#file-master-log







[structured-streaming] foreachPartition alternative in structured streaming.

2018-05-17 Thread karthikjay
I am reading data from Kafka using Structured Streaming and I need to save
the data to InfluxDB. In the regular DStreams-based approach I did this as
follows:

val messages: DStream[(String, String)] = kafkaStream.map(record =>
  (record.topic, record.value))

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val influxService = new InfluxService()
    val connection = influxService.createInfluxDBConnectionWithParams(
      host,
      port,
      username,
      password,
      database
    )
    partitionOfRecords.foreach(record => {
      ABCService.handleData(connection, record._1, record._2)
    })
  }
}

ssc.start()
logger.info("Started Spark-Kafka streaming session")
ssc.awaitTermination()

Note: I create the connection object inside foreachPartition. How do I do this
in Structured Streaming? I tried a connection pooling approach (where I
create a pool of connections on the master node and pass it to the worker
nodes) here, and the workers could not get the connection pool object.
Anything obvious that I am missing here?
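
One option in Structured Streaming (as of 2.3) is a ForeachWriter, which plays roughly
the same role as foreachPartition: open() runs once per partition per trigger and is the
place to create the connection on the executor, process() handles each row, and close()
tears the connection down. A hedged sketch reusing the InfluxService / ABCService names
from the DStream code above; their exact signatures, the connection type and the
(topic, value) dataset are all assumptions:

import org.apache.spark.sql.ForeachWriter

// Sketch only: the connection is created per partition on the executor in
// open(), instead of being built on the driver and shipped to workers.
val influxWriter = new ForeachWriter[(String, String)] {
  var connection: InfluxDBConnection = _   // assumed connection type

  def open(partitionId: Long, version: Long): Boolean = {
    val influxService = new InfluxService()
    connection = influxService.createInfluxDBConnectionWithParams(
      host, port, username, password, database)
    true
  }

  def process(record: (String, String)): Unit =
    ABCService.handleData(connection, record._1, record._2)

  def close(errorOrNull: Throwable): Unit =
    if (connection != null) connection.close()   // assumed close() method
}

val query = kafkaDS                 // assumed Dataset[(String, String)] of (topic, value)
  .writeStream
  .foreach(influxWriter)
  .start()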






[structured-streaming][kafka] Will the Kafka readstream timeout after connections.max.idle.ms 540000 ms ?

2018-05-15 Thread karthikjay
Hi all,

We are running into a scenario where the structured streaming job exits
after a while, specifically when the Kafka topic is not getting any data.
From the job logs, I see connections.max.idle.ms = 540000. Does that
mean the Spark readStream will close when it does not get data for 540000
milliseconds (9 minutes)? If yes, how do I override this?
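
For what it's worth, Kafka consumer properties can be passed through the source by
prefixing them with kafka., so the idle-connection timeout can be overridden even if it
turns out not to be the reason the job exits. A hedged sketch (brokers, topic and the
30-minute value are only examples):

// Sketch only: properties prefixed with "kafka." go straight to the
// underlying consumer; this raises connections.max.idle.ms to 30 minutes.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "...")
  .option("kafka.connections.max.idle.ms", "1800000")
  .load()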






[Structured-Streaming][Beginner] Out of order messages with Spark kafka readstream from a specific partition

2018-05-09 Thread karthikjay
On the producer side, I make sure the data for a specific user lands on the same
partition. On the consumer side, I use a regular Spark Kafka readStream to read the
data, and a console write stream to print out the Kafka DataFrame. What I observe is
that the data for a specific user (even though it is in the same partition) arrives
out of order in the console.

I also verified the data ordering by running a simple Kafka consumer in Java,
and there the data seems to be ordered. What am I missing here?
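
One way to tell whether the data is actually consumed out of order, or merely displayed
out of order by the console sink, is to project the partition and offset metadata columns
the Kafka source already provides and check that the offsets are monotonic within each
partition. A hedged sketch (brokers and topic are placeholders):

// Sketch only: if offsets increase within each partition, consumption order
// is intact and only the console display order differs.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "...")
  .load()
  .selectExpr("partition", "offset", "CAST(key AS STRING)", "CAST(value AS STRING)")

val query = df
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()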

Thanks,
JK






[beginner][StructuredStreaming] Null pointer exception - possible serialization errors.

2018-05-06 Thread karthikjay
I am getting a null pointer exception when trying to implement a connection
pooling mechanism in Apache Spark. Any help appreciated. 

https://stackoverflow.com/questions/50205650/spark-connection-pooling-is-this-the-right-approach







[Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

2018-04-20 Thread karthikjay
Any help appreciated. Please find the question at the link below:

https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu







[Structured Streaming][Kafka] For a Kafka topic with 3 partitions, how does the parallelism work ?

2018-04-20 Thread karthikjay
I have the following code to read data from a Kafka topic using Structured
Streaming. The topic has 3 partitions:

val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val dataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
  .option("subscribe", "partition_test")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING)")

My understanding is that Spark will launch 3 Kafka consumers (one per partition)
and that these 3 consumers will run on the worker nodes. Is my understanding
correct?
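
For what it's worth, my understanding is that the Kafka source creates one Spark
partition per Kafka topic partition, so each micro-batch arrives as 3 tasks; with
master("local[*]") those tasks all run inside the single local JVM rather than on
separate worker nodes. If the downstream processing needs more than 3-way parallelism,
a repartition after the source is one option (12 is just an example):

// Sketch only: repartition spreads the downstream work over more tasks,
// at the cost of a shuffle; the source itself stays at 3 partitions.
val widened = dataFrame.repartition(12)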






Writing record once after the watermarking interval in Spark Structured Streaming

2018-03-29 Thread karthikjay
I have the following query:

val ds = dataFrame
  .filter(! $"requri".endsWith(".m3u8"))
  .filter(! $"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
  .agg(sum("bytes") / 100 as "byte_count")

How do I implement a foreach writer so that its process method is triggered
only once for every watermarking interval? I.e. in the aforementioned
example I would get the following:
10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
...
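
As far as I understand, this is what outputMode("append") gives a watermarked
aggregation: a window's row is emitted exactly once, after the watermark (maximum event
time minus the 120-second delay here) passes the window's end, so the writer's process
method sees each finalized window once. A hedged sketch (the writer body is a placeholder):

import org.apache.spark.sql.{ForeachWriter, Row}

// Sketch only: with append mode and a watermark, each window row reaches
// process() exactly once, after the window can no longer change.
val writer = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true
  def process(row: Row): Unit = println(row)   // placeholder: write to the real sink here
  def close(errorOrNull: Throwable): Unit = {}
}

val query = ds
  .writeStream
  .outputMode("append")
  .foreach(writer)
  .start()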






[Structured Streaming][Parquet] How do specify partition and data when saving to Parquet

2018-03-02 Thread karthikjay
My DataFrame has the following schema:

root
 |-- data: struct (nullable = true)
 |    |-- zoneId: string (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- timeSinceLast: long (nullable = true)
 |-- date: date (nullable = true)

 
How can I do a writeStream with Parquet format that writes the data fields
(zoneId, deviceId, timeSinceLast, but not date) and partitions the data by date?
I tried the following code and the partitionBy clause did not work:

val query1 = df1
  .writeStream
  .format("parquet")
  .option("path", "/Users/abc/hb_parquet/data")
  .option("checkpointLocation", "/Users/abc/hb_parquet/checkpoint")
  .partitionBy("data.zoneId")
  .start()
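
For what it's worth, partitionBy expects top-level column names and cannot address a
field inside a struct, which would explain why "data.zoneId" had no effect. If the goal
is to partition by date and store the fields inside data, one hedged sketch (assuming
spark.implicits._ is in scope; paths reused from the snippet above) is to flatten the
struct first:

// Sketch only: pull the struct fields up to the top level, then partition
// by the top-level date column; date ends up in the directory names
// rather than stored inside the Parquet files.
val query1 = df1
  .select($"data.zoneId", $"data.deviceId", $"data.timeSinceLast", $"date")
  .writeStream
  .format("parquet")
  .option("path", "/Users/abc/hb_parquet/data")
  .option("checkpointLocation", "/Users/abc/hb_parquet/checkpoint")
  .partitionBy("date")
  .start()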






[Structured Streaming] Handling Kakfa Stream messages with different JSON Schemas.

2018-02-28 Thread karthikjay
Hi all,

I have the following code to stream Kafka data, apply a schema called
"hbSchema" to it and then act on the data.

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.102.255.241:9092")
  .option("subscribe", "mabr_avro_json1")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("""deserialize("avro_json1", value) AS message""")

import spark.implicits._

val df1 = df
  .selectExpr("CAST(message AS STRING) AS json")
  .select(from_json($"json", schema = hbSchema).as("data"))
  .select("data.*")

But what if the data in the Kafka topic has different schemas? How do I apply
different schemas based on the data?
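
One hedged approach, assuming each message carries some discriminator that is not shown
in the post (a field such as "type", or the Kafka topic itself): parse only the
discriminator first, then apply the full schema that matches. A sketch in which the
"type" field, schemaA/schemaB and the literal values "A"/"B" are all assumptions:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Assumption: every JSON message has a "type" field identifying its schema.
val typeOnly = new StructType().add("type", StringType)

val tagged = df.withColumn("msgType",
  from_json($"message", typeOnly).getField("type"))

// schemaA and schemaB are the full StructTypes for the two message kinds.
val parsedA = tagged.filter($"msgType" === "A")
  .select(from_json($"message", schemaA).as("data")).select("data.*")

val parsedB = tagged.filter($"msgType" === "B")
  .select(from_json($"message", schemaB).as("data")).select("data.*")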



