What about chaining with akka or akka stream and the fair scheduler ? Le 13 sept. 2017 01:51, "Sunita Arvind" <sunitarv...@gmail.com> a écrit :
Hi Michael, I am wondering what I am doing wrong. I get error like: Exception in thread "main" java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it. at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema( DataSource.scala:223) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$ lzycompute(DataSource.scala:87) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo( DataSource.scala:87) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply( StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader. load(DataStreamReader.scala:125) at org.apache.spark.sql.streaming.DataStreamReader. load(DataStreamReader.scala:134) at com.aol.persist.UplynkAggregates$.aggregator( UplynkAggregates.scala:23) at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41) at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke( NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook I tried specifying the schema as well. Here is my code: object Aggregates { val aggregation= """select sum(col1), sum(col2), id, first(name) from enrichedtb group by id """.stripMargin def aggregator(conf:Config)={ implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate() implicit val sqlctx = spark.sqlContext printf("Source path is" + conf.getString("source.path")) val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // Added this as it was complaining about schema. val df=spark.readStream.format("parquet").option("inferSchema", true).schema(schemadf.schema).load(conf.getString("source.path")) df.createOrReplaceTempView("enrichedtb") val res = spark.sql(aggregation) res.writeStream.format("parquet").outputMode("append").option("checkpointLocation",conf.getString("checkpointdir")).start(conf.getString("sink.outputpath")) } def main(args: Array[String]): Unit = { val mainconf = ConfigFactory.load() val conf = mainconf.getConfig(mainconf.getString("pipeline")) print(conf.toString) aggregator(conf) } } I tried to extract schema from static read of the input path and provided it to the readStream API. With that, I get this error: at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109) at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222) While running on the EMR cluster all paths point to S3. In my laptop, they all point to local filesystem. I am using Spark2.2.0 Appreciate your help. regards Sunita On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com> wrote: > If you use structured streaming and the file sink, you can have a > subsequent stream read using the file source. This will maintain exactly > once processing even if there are hiccups or failures. > > On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarv...@gmail.com> > wrote: > >> Hello Spark Experts, >> >> I have a design question w.r.t Spark Streaming. I have a streaming job >> that consumes protocol buffer encoded real time logs from a Kafka cluster >> on premise. My spark application runs on EMR (aws) and persists data onto >> s3. Before I persist, I need to strip header and convert protobuffer to >> parquet (I use sparksql-scalapb to convert from Protobuff to >> Spark.sql.Row). I need to persist Raw logs as is. I can continue the >> enrichment on the same dataframe after persisting the raw data, however, in >> order to modularize I am planning to have a separate job which picks up the >> raw data and performs enrichment on it. Also, I am trying to avoid all in >> 1 job as the enrichments could get project specific while raw data >> persistence stays customer/project agnostic.The enriched data is allowed to >> have some latency (few minutes) >> >> My challenge is, after persisting the raw data, how do I chain the next >> streaming job. The only way I can think of is - job 1 (raw data) >> partitions on current date (YYYYMMDD) and within current date, the job 2 >> (enrichment job) filters for records within 60s of current time and >> performs enrichment on it in 60s batches. >> Is this a good option? It seems to be error prone. When either of the >> jobs get delayed due to bursts or any error/exception this could lead to >> huge data losses and non-deterministic behavior . What are other >> alternatives to this? >> >> Appreciate any guidance in this regard. >> >> regards >> Sunita Koppar >> > >