Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub? It would speed things up a lot.

G
On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com> wrote:

> Here's a very simple reproducer app. I've attached 3 files:
> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
> email as well:
>
> package com.myorg
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>
> import scala.util.{Failure, Success, Try}
>
> object Spark3Test {
>
>   val isLocal = false
>
>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>
>   val START_DATE_INDEX = 21
>   val END_DATE_INDEX = 40
>
>   def main(args: Array[String]) {
>
>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
>     spark.sparkContext.setLogLevel("WARN")
>
>     readKafkaStream(spark)
>       .groupByKey(row => {
>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>       })
>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>       )
>       .filter(row => !row.inProgress)
>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>       .writeStream
>       .format("kafka")
>       .option(
>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>         "10.29.42.141:9092"
>         // "localhost:9092"
>       )
>       .option("topic", "spark3test")
>       .option("checkpointLocation", "/tmp/checkpoint_5")
>       .outputMode("update")
>       .start()
>     manageStreamingQueries(spark)
>   }
>
>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>
>     val stream = sparkSession.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>       .option("subscribe", "inputTopic")
>       .option("startingOffsets", "latest")
>       .option("failOnDataLoss", "false")
>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>       .load()
>       .selectExpr("CAST(value AS STRING)")
>       .as[String](Encoders.STRING)
>     stream
>   }
>
>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
>     if (!oldState.exists) {
>       println(key)
>       val state = MyState(key)
>       oldState.update(state)
>       oldState.setTimeoutDuration("1 minutes")
>       oldState.get
>     } else {
>       if (oldState.hasTimedOut) {
>         oldState.get.inProgress = false
>         val state = oldState.get
>         println("State timed out for key: " + state.dateTime)
>         oldState.remove()
>         state
>       } else {
>         val state = oldState.get
>         state.count = state.count + 1
>         oldState.update(state)
>         oldState.setTimeoutDuration("1 minutes")
>         oldState.get
>       }
>     }
>   }
>
>   def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
>     UserGroupInformation.setLoginUser(
>       UserGroupInformation.createRemoteUser("hduser")
>     )
>
>     val builder = SparkSession
>       .builder()
>       .appName(applicationName)
>
>     if (isLocal) {
>       builder.config("spark.master", "local[2]")
>     }
>
>     builder.getOrCreate()
>   }
>
>   def manageStreamingQueries(spark: SparkSession): Unit = {
>
>     val sparkQueryListener = new QueryListener()
>     spark.streams.addListener(sparkQueryListener)
>
>     val shutdownMarker: String = "/tmp/stop_job"
>
>     val timeoutInMilliSeconds = 60000
>     while (!spark.streams.active.isEmpty) {
>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>         case Success(result) =>
>           if (result) {
>             println("A streaming query was terminated successfully")
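>             // awaitAnyTermination(timeoutMs) returns true when some query in
>             // this session has terminated within the timeout, and false when
>             // the timeout elapsed first. resetTerminated() clears the list of
>             // terminated queries so the next awaitAnyTermination call waits
>             // for a new termination instead of returning immediately.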
>             spark.streams.resetTerminated()
>           }
>         case Failure(e) =>
>           println("Query failed with message: " + e.getMessage)
>           e.printStackTrace()
>           spark.streams.resetTerminated()
>       }
>
>       if (checkMarker(shutdownMarker)) {
>         spark.streams.active.foreach(query => {
>           println(s"Stopping streaming query: ${query.id}")
>           query.stop()
>         })
>         spark.stop()
>         removeMarker(shutdownMarker)
>       }
>     }
>     assert(spark.streams.active.isEmpty)
>     spark.streams.removeListener(sparkQueryListener)
>   }
>
>   def checkMarker(markerFile: String): Boolean = {
>     val fs = FileSystem.get(new Configuration())
>     fs.exists(new Path(markerFile))
>   }
>
>   def removeMarker(markerFile: String): Unit = {
>     val fs = FileSystem.get(new Configuration())
>     fs.delete(new Path(markerFile), true)
>   }
>
> }
>
> case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
>
>
>
> package com.myorg
>
> import org.apache.spark.sql.streaming.StreamingQueryListener
>
> class QueryListener extends StreamingQueryListener {
>
>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
>
>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>     if (event.progress.numInputRows != 0) {
>       println(
>         s"InputRows: ${event.progress.numInputRows}"
>       )
>     }
>   }
>
>   def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
>     println(s"Query with id ${event.id} terminated")
>   }
> }
>
>
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>          http://maven.apache.org/maven-v4_0_0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>com.myorg</groupId>
>   <artifactId>spark-3-conversion</artifactId>
>   <packaging>jar</packaging>
>   <version>1.0-SNAPSHOT</version>
>   <name>spark-3-conversion</name>
>   <url>http://maven.apache.org</url>
>
>   <properties>
>     <spark.version>3.0.0</spark.version>
>     <scala.binary.version>2.12</scala.binary.version>
>     <scala.version>2.12.10</scala.version>
>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>     <skipTests>true</skipTests>
>     <maven.compiler.source>1.5</maven.compiler.source>
>     <maven.compiler.target>1.5</maven.compiler.target>
>     <encoding>UTF-8</encoding>
>   </properties>
>
>   <dependencies>
>     <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-library</artifactId>
>       <version>${scala.version}</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
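>       <!-- Binds SLF4J to Log4j 1.2 at runtime, routing Spark's SLF4J
>            logging to the log4j dependency declared just below. -->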
>       <version>1.7.7</version>
>       <scope>runtime</scope>
>     </dependency>
>     <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.17</version>
>       <scope>compile</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.scalatest</groupId>
>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>       <version>3.0.7</version>
>       <scope>test</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <version>3.8.1</version>
>       <scope>test</scope>
>     </dependency>
>   </dependencies>
>
>   <build>
>     <plugins>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-shade-plugin</artifactId>
>         <version>3.0.0</version>
>         <executions>
>           <!-- Run shade goal on package phase -->
>           <execution>
>             <phase>install</phase>
>             <goals>
>               <goal>shade</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>
>       <!-- Scala Compiler -->
>       <plugin>
>         <groupId>net.alchim31.maven</groupId>
>         <artifactId>scala-maven-plugin</artifactId>
>         <version>3.2.2</version>
>         <executions>
>           <execution>
>             <goals>
>               <goal>compile</goal>
>               <goal>testCompile</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>
>       <plugin>
>         <groupId>org.codehaus.mojo</groupId>
>         <artifactId>build-helper-maven-plugin</artifactId>
>         <version>1.7</version>
>         <executions>
>           <!-- Add src/main/scala to eclipse build path -->
>           <execution>
>             <id>add-source</id>
>             <phase>generate-sources</phase>
>             <goals>
>               <goal>add-source</goal>
>             </goals>
>             <configuration>
>               <sources>
>                 <source>src/main/scala</source>
>               </sources>
>             </configuration>
>           </execution>
>           <!-- Add src/test/scala to eclipse build path -->
>           <execution>
>             <id>add-test-source</id>
>             <phase>generate-test-sources</phase>
>             <goals>
>               <goal>add-test-source</goal>
>             </goals>
>             <configuration>
>               <sources>
>                 <source>src/test/scala</source>
>               </sources>
>             </configuration>
>           </execution>
>         </executions>
>       </plugin>
>
>       <!-- we disable surefire and enable scalatest so that maven can run our tests -->
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-surefire-plugin</artifactId>
>         <version>2.7</version>
>         <configuration>
>           <skipTests>true</skipTests>
>         </configuration>
>       </plugin>
>       <plugin>
>         <groupId>org.scalatest</groupId>
>         <artifactId>scalatest-maven-plugin</artifactId>
>         <version>1.0</version>
>         <configuration>
>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>           <junitxml>.</junitxml>
>           <filereports>WDF TestSuite.txt</filereports>
>         </configuration>
>         <executions>
>           <execution>
>             <id>test</id>
>             <goals>
>               <goal>test</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>org.scalastyle</groupId>
>         <artifactId>scalastyle-maven-plugin</artifactId>
>         <version>1.0.0</version>
>         <configuration>
>           <verbose>false</verbose>
>           <failOnViolation>true</failOnViolation>
>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>           <failOnWarning>false</failOnWarning>
>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>           <configLocation>lib/scalastyle_config.xml</configLocation>
>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>           <outputEncoding>UTF-8</outputEncoding>
>         </configuration>
>         <executions>
>           <execution>
>             <goals>
>               <goal>check</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>org.sonarsource.scanner.maven</groupId>
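>         <!-- SonarQube scanner for Maven; runs code-quality analysis
>              against a SonarQube server when the sonar goal is invoked. -->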
>         <artifactId>sonar-maven-plugin</artifactId>
>         <version>3.6.0.1398</version>
>       </plugin>
>
>     </plugins>
>   </build>
> </project>
>
> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote:
>
>> OK. I will work on creating a reproducible app. Thanks.
>>
>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Just reached this thread. +1 on creating a simple reproducer app, and I
>>> suggest creating a JIRA and attaching the full driver and executor logs.
>>> Ping me on the JIRA and I'll pick this up right away...
>>>
>>> Thanks!
>>>
>>> G
>>>
>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> Would you mind if I ask for a simple reproducer? It would be nice if
>>>> you could create a repository on GitHub and push the code, including
>>>> the build script.
>>>>
>>>> Thanks in advance!
>>>>
>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> wrote:
>>>>
>>>>> I tried both. First I tried 3.0.0. That didn't work, so I tried 3.1.0.
>>>>>
>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Which exact Spark version did you use? Did you make sure the version
>>>>>> of Spark and the version of the spark-sql-kafka artifact are the
>>>>>> same? (I ask because you said you used Spark 3.0, but the
>>>>>> spark-sql-kafka dependency pointed to 3.1.0.)
>>>>>>
>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> wrote:
>>>>>>
>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
>>>>>>> === Streaming Query ===
>>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
>>>>>>> Current Committed Offsets: {}
>>>>>>> Current Available Offsets: {}
>>>>>>> Current State: INITIALIZING
>>>>>>> Thread State: RUNNABLE
>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>   ... 1 more
>>>>>>>
>>>>>>> *Please see the attached image for more information.*
>>>>>>>
>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Can you post the whole message? I'm trying to find what might be
>>>>>>>> causing it. A small reproducible example would be of help too.
>>>>>>>> Thank you.
>>>>>>>>
>>>>>>>> Pozdrawiam,
>>>>>>>> Jacek Laskowski
>>>>>>>> ----
>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>
>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglist...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am trying to port my Spark 2.4 based (Structured) Streaming
>>>>>>>>> application to Spark 3.0. I compiled it using the dependency
>>>>>>>>> given below:
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.spark</groupId>
>>>>>>>>>   <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>   <version>3.1.0</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>> Every time I run it under Spark 3.0, I get this message: *Data
>>>>>>>>> source v2 streaming sinks does not support Update mode*.
>>>>>>>>>
>>>>>>>>> I am using '*mapGroupsWithState*', so as per this link
>>>>>>>>> (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>> the only supported output mode is "*Update*".
>>>>>>>>>
>>>>>>>>> My sink is a Kafka topic, so I am using this:
>>>>>>>>>
>>>>>>>>> .writeStream
>>>>>>>>> .format("kafka")
>>>>>>>>>
>>>>>>>>> What am I missing?
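
For anyone hitting the same error, a minimal sketch of the dependency alignment Jungtaek is asking about: pin spark-sql-kafka to the same spark.version property the reproducer's pom.xml already uses for its other Spark artifacts, instead of hard-coding 3.1.0. This assumes the cluster really runs Spark 3.0.0; whether alignment alone resolves the Update-mode error is not confirmed in this thread.

    <!-- Sketch (assumes a Spark 3.0.0 runtime): reuse the shared
         spark.version property so the Kafka connector version can
         never drift from the Spark runtime version. -->
    <properties>
      <spark.version>3.0.0</spark.version>
      <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>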