Sorry. Can you please tell me where to create the JIRA? Also, is there a specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.
On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub? It would speed things up a lot.

G

On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Here's a very simple reproducer app. I've attached three files: SparkTest.scala, QueryListener.scala, and pom.xml. Copying the contents in the email as well:

SparkTest.scala:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => row.substring(START_DATE_INDEX, END_DATE_INDEX))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092"
        // "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update")
      .start()

    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
    sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String],
                         oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else if (oldState.hasTimedOut) {
      oldState.get.inProgress = false
      val state = oldState.get
      println("State timed out for key: " + state.dateTime)
      oldState.remove()
      state
    } else {
      val state = oldState.get
      state.count = state.count + 1
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    }
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    val shutdownMarker: String = "/tmp/stop_job"
    val timeoutInMilliSeconds = 60000

    while (spark.streams.active.nonEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach { query =>
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        }
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }

    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }
}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
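(Editor's note: the shutdown-marker loop in manageStreamingQueries polls for /tmp/stop_job and stops the job gracefully once the file appears. A minimal way to trigger that path from another JVM, assuming the same Hadoop configuration is visible; this StopJob helper is hypothetical and not part of the original attachment:)

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: creates the marker file that manageStreamingQueries
// polls for; the driver then stops every active query and deletes the marker.
object StopJob {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.create(new Path("/tmp/stop_job")).close()
  }
}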
QueryListener.scala:

package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(s"InputRows: ${event.progress.numInputRows}")
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}
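(Editor's note: to check whether the state store is actually accumulating and timing out keys, a listener can also surface state-operator metrics. A sketch against the public StreamingQueryProgress API; this StateMetricsListener is an illustration, not part of the original attachment:)

package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

// Hypothetical variant of QueryListener: reports state-store metrics so the
// one-minute timeouts in updateAcrossEvents become visible in the driver log.
class StateMetricsListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    // stateOperators reports, per stateful operator, how many rows the state
    // store holds and how many were updated in the last micro-batch.
    event.progress.stateOperators.foreach { op =>
      println(s"state rows total: ${op.numRowsTotal}, updated: ${op.numRowsUpdated}")
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}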
pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run the shade goal during the install phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to the Eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to the Eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- Disable Surefire and enable ScalaTest so that Maven can run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>
    </plugins>
  </build>
</project>

On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a JIRA and attaching the full driver and executor logs. Ping me on the JIRA and I'll pick this up right away...

Thanks!

G

On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Would you mind if I ask for a simple reproducer? It would be nice if you could create a repository on GitHub and push the code, including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> wrote:

I tried both. First I tried 3.0.0; that didn't work, so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Which exact Spark version did you use? Did you make sure the version for Spark and the version for the spark-sql-kafka artifact are the same? (I ask because you said you used Spark 3.0, but the spark-sql-kafka dependency pointed to 3.1.0.)
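(Editor's note: the version mismatch Jungtaek points at is worth ruling out first. The reproducer pom above pins spark.version to 3.0.0, while the dependency quoted at the bottom of this thread hard-codes 3.1.0. Tying the connector to the same property, as the reproducer pom already does, prevents the two from drifting:)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
  <!-- keep the connector on exactly the Spark version of the runtime -->
  <version>${spark.version}</version>
</dependency>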
On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> wrote:

org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
=== Streaming Query ===
Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
  at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
  ... 1 more

*Please see the attached image for more information.*

On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglist...@gmail.com> wrote:

I'm trying to port my Spark 2.4 based (Structured) Streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
  <version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: *Data source v2 streaming sinks does not support Update mode*

I am using *mapGroupsWithState*, so as per the output-modes section of the Structured Streaming programming guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), the only supported output mode is *Update*.

My sink is a Kafka topic, so I am using this:

.writeStream
.format("kafka")

What am I missing?
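(Editor's note, not a fix confirmed in this thread: if the versions are aligned and the update-mode check still fires, a commonly used escape hatch is foreachBatch, which hands each micro-batch to the batch Kafka writer and sidesteps the v2 streaming-sink output-mode check. A sketch adapted to the reproducer above; the named method avoids the foreachBatch overload ambiguity under Scala 2.12:)

import org.apache.spark.sql.Dataset

def writeBatchToKafka(batch: Dataset[String], batchId: Long): Unit = {
  // A Dataset[String] already exposes its content as a 'value' column,
  // which is all the batch Kafka sink needs.
  batch.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "10.29.42.141:9092")
    .option("topic", "spark3test")
    .save()
}

// ...in main(), replacing the kafka format and topic options on writeStream:
.writeStream
.foreachBatch(writeBatchToKafka _)
.option("checkpointLocation", "/tmp/checkpoint_5")
.outputMode("update")
.start()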