Please also include some test data. I took a quick look through the code, and it appears to require records in a specific format.
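(For illustration only, here is a hypothetical record matching that layout. It assumes characters 21 through 39, i.e. substring(21, 40) per START_DATE_INDEX and END_DATE_INDEX in the reproducer below, hold a 19-character yyyy-MM-dd HH:mm:ss timestamp that serves as the group key; the author has not confirmed this:)

    device-0001,region-7,2021-01-18 12:00:00,payload

(Here "device-0001,region-7," is a 21-character prefix, so the assumed timestamp key starts exactly at index 21.)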
On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonsp...@gmail.com> wrote:

Hi,

Here is the JIRA: https://issues.apache.org/jira/projects/SPARK/summary. Regarding the repo, I believe you can just commit it to your personal repo and that should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglist...@gmail.com> wrote:

Sorry, can you please tell me where to create the JIRA? Also, is there any specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub? It would speed things up a lot.

G

On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml. Copying the contents into the email as well:

SparkTest.scala:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => row.substring(START_DATE_INDEX, END_DATE_INDEX))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092" // "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update")
      .start()
    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
    sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String],
      oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else {
      if (oldState.hasTimedOut) {
        oldState.get.inProgress = false
        val state = oldState.get
        println("State timed out for key: " + state.dateTime)
        oldState.remove()
        state
      } else {
        val state = oldState.get
        state.count = state.count + 1
        oldState.update(state)
        oldState.setTimeoutDuration("1 minutes")
        oldState.get
      }
    }
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    val shutdownMarker: String = "/tmp/stop_job"

    val timeoutInMilliSeconds = 60000
    while (!spark.streams.active.isEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach(query => {
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        })
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }
    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }
}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
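(Not part of the original thread: a minimal sketch of a test-data generator for the input topic, under the same record-layout assumption as above. The broker address, topic name, and record contents are placeholders, not values confirmed by the author.)

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestDataGenerator {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // 21-character prefix, then a 19-character timestamp at positions 21-40
    // (the assumed group key), then an arbitrary payload.
    val record = "device-0001,region-7,2021-01-18 12:00:00,payload"
    (1 to 10).foreach(_ => producer.send(new ProducerRecord("inputTopic", record)))
    producer.close()
  }
}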
QueryListener.scala:

package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(s"InputRows: ${event.progress.numInputRows}")
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}
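(A usage note, not part of the original thread: the marker-file shutdown in Spark3Test above is triggered by creating /tmp/stop_job on whatever filesystem the default Hadoop Configuration resolves to. A minimal sketch mirroring checkMarker:)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StopJob {
  def main(args: Array[String]): Unit = {
    // Create the marker file; the streaming driver polls for it every 60s
    // and shuts its queries down gracefully once it appears.
    val fs = FileSystem.get(new Configuration())
    fs.create(new Path("/tmp/stop_job")).close()
  }
}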
pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <!-- Spark 3.x requires Java 8+ (was 1.5) -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- disable surefire and enable scalatest so that Maven runs our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>
    </plugins>
  </build>
</project>
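(Not from the thread: one hypothetical way to build and run the reproducer. The shade plugin above is bound to the install phase, so mvn install should leave a runnable fat jar in target/; the class name comes from SparkTest.scala, while the master and jar path are placeholders:)

mvn clean install
spark-submit --class com.myorg.Spark3Test --master yarn target/spark-3-conversion-1.0-SNAPSHOT.jar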
On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglist...@gmail.com> wrote:

Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a jira with the full driver and executor logs attached. Ping me on the jira and I'll pick this up right away...

Thanks!

G

On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Would you mind if I ask for a simple reproducer? It would be nice if you could create a repository on GitHub and push the code, including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglist...@gmail.com> wrote:

I tried both. First I tried 3.0.0. That didn't work, so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

Which exact Spark version did you use? Did you make sure the version for Spark and the version for the spark-sql-kafka artifact are the same? (I'm asking because you said you used Spark 3.0, but the spark-sql-kafka dependency pointed to 3.1.0.)
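(An illustration, not part of the thread: pinning the Kafka connector to the same version as the Spark runtime, for example via the pom's existing spark.version property as the reproducer's pom already does, avoids exactly this kind of mismatch:)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
  <!-- keep in lockstep with the Spark version on the cluster -->
  <version>${spark.version}</version>
</dependency>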
On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglist...@gmail.com> wrote:

org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
=== Streaming Query ===
Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
  at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
  ... 1 more

*Please see the attached image for more information.*

On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

Can you post the whole message? I'm trying to find out what might be causing it. A small reproducible example would be of help too. Thank you.

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglist...@gmail.com> wrote:

I'm trying to port my Spark 2.4 based (Structured) Streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
  <version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: *Data source v2 streaming sinks does not support Update mode*

I am using '*mapGroupsWithState*', so as per this link (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) the only supported output mode is "*Update*". My sink is a Kafka topic, so I am using this:

.writeStream
.format("kafka")

What am I missing?
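(A quick diagnostic, not from the thread: logging the Spark version that actually runs makes a runtime/connector mismatch like the one suspected above easy to spot. A minimal sketch:)

// Print the version of the Spark runtime; it should match the
// spark-sql-kafka-0-10 artifact version declared in the pom.
println(s"Runtime Spark version: ${org.apache.spark.SPARK_VERSION}")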
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org