Parse RDD[Seq[String]] to DataFrame with types.
I'm trying to parse an RDD[Seq[String]] into a DataFrame. Although it's a Seq of Strings, the elements could have a more specific type such as Int, Boolean, Double, String and so on. For example, a line could be:

"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...

The first column is always going to be a String, the second an Int and so on, and it's always going to be this way. On the other hand, one execution could have sequences of five elements and in another the sequences could have 2000, so it depends on the execution, but in each execution I know the types of each "column" or "elem" of the sequence. To do it, I could have something like this:

// I could have a parameter to generate the StructType dynamically.
def getSchema(): StructType = {
  val schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
  schemaArray += StructField("col1", IntegerType, true)
  schemaArray += StructField("col2", StringType, true)
  schemaArray += StructField("col3", DoubleType, true)
  StructType(schemaArray)
}

// A Seq of Any?? It doesn't seem the best option!!
val l1: Seq[Any] = Seq(1, "2", 1.1)
val rdd1 = sc.parallelize(Seq(l1)).map(Row.fromSeq(_))
val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema

I don't like at all having a Seq of Any, but it's really what I have. Is there another way? On the other hand, I was thinking that since I have something similar to a CSV, I could create one. Spark has a library to read a CSV and return a DataFrame where types are inferred. Is it possible to call it if I already have an RDD[String]?
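One way to keep the Seq[Any] out of user code is to drive the parsing from the schema itself: since the i-th column's type is known, each String can be cast before building the Row. A minimal sketch, assuming the input RDD is named rddOfSeqString (a made-up name) and that getSchema() from above is aligned with the columns:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Cast one raw String to the type declared for its column.
def parseValue(raw: String, dt: DataType): Any = dt match {
  case IntegerType => raw.toInt
  case DoubleType  => raw.toDouble
  case BooleanType => raw.toBoolean
  case _           => raw
}

val schema = getSchema()
val rowRdd = rddOfSeqString.map { seq =>
  // Pair each element with its StructField and convert it accordingly.
  Row.fromSeq(seq.zip(schema.fields).map { case (raw, field) =>
    parseValue(raw, field.dataType)
  })
}
val df = sqlContext.createDataFrame(rowRdd, schema)

As for the CSV route: in later Spark versions (2.2+), spark.read.csv accepts a Dataset[String], so schema inference can run over in-memory lines without writing a file first; with an RDD[String] on an older Spark, the per-column cast above avoids that round trip.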
Re: Putting record in HBase with Spark - error get regions.
After a while it's possible to see this error too:

19/05/28 11:11:18 ERROR executor.Executor: Exception in task 35.1 in stage 0.0 (TID 265)
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 122 actions: my_table: 122 times,
    at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:258)
    at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2000(AsyncProcess.java:238)
    at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1810)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:240)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
    at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
    at example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
    at example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/05/28 11:11:18 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 369

On Tue, May 28, 2019 at 12:12, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:
> I'm executing a load process into HBase with Spark (around 150M records).
> At the end of the process there are a lot of failed tasks.
>
> I get this error:
>
> 19/05/28 11:02:31 ERROR client.AsyncProcess: Failed to get region location
> org.apache.hadoop.hbase.TableNotFoundException: my_table
>     at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1417)
>     at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1211)
>     at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:410)
>     at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:359)
>     at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:238)
>     at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
>     at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
>     at example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
>     at example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> When I execute a scan from the HBase shell, it works. What could the reason be?
> I'm not sure whether it's an error from HBase or from Spark.
>
Putting record in HBase with Spark - error get regions.
I'm executing a load process into HBase with Spark (around 150M records). At the end of the process there are a lot of failed tasks. I get this error:

19/05/28 11:02:31 ERROR client.AsyncProcess: Failed to get region location
org.apache.hadoop.hbase.TableNotFoundException: my_table
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1417)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1211)
    at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:410)
    at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:359)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:238)
    at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
    at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
    at example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
    at example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

When I execute a scan from the HBase shell, it works. What could the reason be? I'm not sure whether it's an error from HBase or from Spark.
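Since the scan works from the shell, one thing worth ruling out is how the executors resolve the table: a TableNotFoundException during region location can point at connection or configuration state on the executor side rather than at the table itself. A minimal sketch of a per-partition write that opens and closes its own connection, assuming hbase-site.xml is on the executor classpath; the "cf"/"q" column names and the RDD[(String, String)] shape are illustrative assumptions:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

rdd.foreachPartition { rows =>
  // Build the connection on the executor so region locations are looked up
  // with fresh metadata from hbase:meta, not with state serialized from the driver.
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("my_table"))
  try {
    rows.foreach { case (key, value) =>
      val put = new Put(Bytes.toBytes(key))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
      table.put(put)
    }
  } finally {
    table.close()
    conn.close()
  }
}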
Trying to improve performance of the driver.
I have a process in Spark Streaming whose batches last 2 seconds. When I check where the time is spent I see about 0.8s-1s of processing time, although the global time is 2s. That extra second is spent in the driver. I reviewed the code which is executed by the driver and I commented out some of it, with the same result, so I don't have any idea where the time goes. Right now I'm executing in client mode from one of the nodes inside the cluster, so I can't set the number of cores for the driver (although I don't think that's going to make the difference). How could I know where the driver is spending the time? I'm not sure if it's possible to improve the performance at this point, or whether that second is mainly spent scheduling the graph of each microbatch.
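A hedged way to see where the driver-side second goes is to log timings from inside the application with a SparkListener. The listener below is a sketch (JobTimingListener is a made-up name) that prints the end-to-end wall time of each job as seen by the driver, which can then be compared against the task time shown in the UI:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import scala.collection.concurrent.TrieMap

// Records the wall-clock span of each job between submission and completion.
class JobTimingListener extends SparkListener {
  private val starts = TrieMap.empty[Int, Long]
  override def onJobStart(js: SparkListenerJobStart): Unit =
    starts(js.jobId) = System.nanoTime()
  override def onJobEnd(je: SparkListenerJobEnd): Unit =
    starts.remove(je.jobId).foreach { t0 =>
      println(s"Job ${je.jobId} took ${(System.nanoTime() - t0) / 1e6} ms end to end")
    }
}

sc.addSparkListener(new JobTimingListener)  // sc: the active SparkContext

If the gap doesn't show up per job, taking a few thread dumps of the driver JVM (jstack <driver-pid>) while a batch runs is another low-effort way to catch what it is waiting on.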
Re: deploy-mode cluster. FileNotFoundException
I'm using a standalone cluster and the final command I'm trying is:

spark-submit --verbose --deploy-mode cluster \
  --driver-java-options "-Dlogback.configurationFile=conf/i${1}Logback.xml" \
  --class com.example.Launcher \
  --driver-class-path lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-0.10.0.1.jar \
  --files conf/${1}Conf.json iris-core-0.0.1-SNAPSHOT.jar conf/${1}Conf.json

On Wed, Sep 5, 2018 at 11:11, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:
> I want to execute my processes in cluster mode. As I don't know where the
> driver will be executed, I have to make available all the files it needs. I
> understand that there are two options: copy all the files to all nodes, or
> copy them to HDFS.
>
> My doubt is: if I want to put all the files in HDFS, isn't that automatic
> with the --files and --jars parameters of the spark-submit command? Or do I
> have to copy them to HDFS manually?
>
> My idea is to execute something like:
> spark-submit --driver-java-options "-Dlogback.configurationFile=conf/${1}Logback.xml" \
>   --class com.example.Launcher \
>   --driver-class-path lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-1.0.0.jar \
>   --files /conf/${1}Conf.json example-0.0.1-SNAPSHOT.jar conf/${1}Conf.json
>
> I have tried with --files hdfs:// without copying anything to HDFS
> and it doesn't work either.
>
deploy-mode cluster. FileNotFoundException
I want to execute my processes in cluster mode. As I don't know where the driver will be executed, I have to make available all the files it needs. I understand that there are two options: copy all the files to all nodes, or copy them to HDFS.

My doubt is: if I want to put all the files in HDFS, isn't that automatic with the --files and --jars parameters of the spark-submit command? Or do I have to copy them to HDFS manually?

My idea is to execute something like:

spark-submit --driver-java-options "-Dlogback.configurationFile=conf/${1}Logback.xml" \
  --class com.example.Launcher \
  --driver-class-path lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-1.0.0.jar \
  --files /conf/${1}Conf.json example-0.0.1-SNAPSHOT.jar conf/${1}Conf.json

I have tried with --files hdfs:// without copying anything to HDFS and it doesn't work either.
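One detail that commonly causes a FileNotFoundException in cluster mode: files shipped with --files are downloaded into each container's working directory under their base name, so opening them through the submit-time path conf/${1}Conf.json will fail. A minimal sketch of resolving the shipped file from the driver or an executor, assuming the submit used --files conf/appConf.json (appConf.json standing in for ${1}Conf.json):

import org.apache.spark.SparkFiles

// Look the file up by base name; SparkFiles.get returns the local path where
// spark-submit materialized it for this container.
val confPath = SparkFiles.get("appConf.json")
val confJson = scala.io.Source.fromFile(confPath).mkString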
Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.
I can't... do you think that it's a possible bug in this version? From Spark or from Kafka?

On Wed, Aug 29, 2018 at 22:28, Cody Koeninger () wrote:
> Are you able to try a recent version of Spark?
>
> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
> wrote:
> > I'm using Spark Streaming 2.0.1 with Kafka 0.10; sometimes I get this
> > exception and Spark dies.
> >
> > I couldn't see any error or problem among the machines; does anybody know
> > the reason for this error?
> >
> > java.lang.IllegalStateException: This consumer has already been closed.
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[kafka-clients-1.0.0.jar:na]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091) ~[kafka-clients-1.0.0.jar:na]
> >     ...
java.lang.OutOfMemoryError: Java heap space - Spark driver.
I got this error from the Spark driver; it seems that I should increase the driver memory, although it's already 5g (and 4 cores). It seems weird to me because I'm not using Kryo or broadcast in this process, but in the log there are references to Kryo and broadcast. How could I figure out the reason for this OutOfMemoryError? Is it normal that there are references to Kryo and broadcasting when I'm not using them?

05:11:19.110 [streaming-job-executor-0] WARN c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType' <-> com.datastax.driver.dse.search.DateRange] because it collides with previously registered codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType' <-> com.datastax.driver.dse.search.DateRange]
05:11:26.806 [dag-scheduler-event-loop] WARN org.apache.spark.util.Utils - Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_162]
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_162]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205) ~[lz4-1.3.0.jar:na]
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158) ~[lz4-1.3.0.jar:na]
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:181) ~[kryo-3.0.3.jar:na]
    at com.esotericsoftware.kryo.io.Output.close(Output.java:191) ~[kryo-3.0.3.jar:na]
    at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:209) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:238) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1012) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:936) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:935) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.collection.immutable.List.foreach(List.scala:392) [scala-library-2.11.11.jar:na]
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:935) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:873) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1630) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-core_2.11-2.0.2.15.jar:2.0.2.15]
05:40:53.535 [dse-app-client-thread-pool-0] WARN c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType'
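On the Kryo/broadcast question, the trace itself shows where they come from: DAGScheduler.submitMissingTasks broadcasts each stage's serialized tasks internally (SparkContext.broadcast -> TorrentBroadcast in the frames above), and that internal broadcast goes through the configured serializer and compression codec, which is why Kryo and LZ4 frames appear even when the application never calls broadcast or uses Kryo directly. To see what is actually filling the 5g heap, one hedged option is to have the JVM dump the heap when the error fires and open the dump in a heap analyzer; the flags are standard HotSpot options and the dump path is an assumption:

spark-submit --driver-java-options "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver.hprof" ...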
Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.
I'm using Spark Streaming 2.0.1 with Kafka 0.10; sometimes I get this exception and Spark dies. I couldn't see any error or problem among the machines; does anybody know the reason for this error?

java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.11.jar:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.11.jar:na]
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
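A hedged observation, not a confirmed diagnosis: the failing poll happens on the driver while the JobGenerator is computing the next batch's offsets, so the cached consumer was closed underneath it, which is typical of a shutdown racing the job generation. If the context may be stopped while batches are in flight, stopping it gracefully at least rules that path out; a minimal sketch, assuming ssc is the running StreamingContext:

// Let in-flight batches finish before the Kafka consumer is torn down.
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}

The same effect can be had with the spark.streaming.stopGracefullyOnShutdown=true setting.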
Refresh a broadcast variable when its value is out of date.
Hello, I want to set data in a broadcast (Map) variable in Spark. Sometimes there are new data, so I have to update/refresh the values, but I'm not sure how I could do this. My idea is to use an accumulator as a flag: when a cache error occurs, I could read the data again and reload the broadcast variable for the next microbatch, and finally reset the accumulator to 0. I don't know if there is a better solution or other ideas to do this. Has anyone faced this problem?
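There is no built-in re-broadcast in Spark, but the flag-plus-reload idea can be packaged so the driver swaps the broadcast between micro-batches. A minimal sketch under those assumptions (RefreshableBroadcast and load are made-up names; the accumulator check mirrors the flag described above):

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Holds the current broadcast; refresh() drops it and broadcasts fresh data.
class RefreshableBroadcast[T: ClassTag](sc: SparkContext, load: () => T) {
  @volatile private var bc: Broadcast[T] = sc.broadcast(load())
  def get: Broadcast[T] = bc
  def refresh(): Unit = synchronized {
    bc.unpersist(blocking = true)
    bc = sc.broadcast(load())
  }
}

// Driver-side, once per micro-batch (e.g. at the top of foreachRDD):
// if (cacheErrorFlag.value > 0) { lookup.refresh(); cacheErrorFlag.reset() }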
Reset the offsets, Kafka 0.10 and Spark
I'm consuming data from Kafka with createDirectStream and storing the offsets in Kafka (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself):

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))

My Spark version is 2.0.2 and Kafka is 0.10. This solution works well, and when I restart the Spark process it starts from the last offset Spark consumed, but sometimes I need to reprocess the whole topic from the beginning. I have seen that I could reset the offsets with a Kafka script, but it's not enabled in Kafka 0.10...

kafka-consumer-groups --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute

Another possibility is to pass another Kafka parameter to createDirectStream with a map of offsets, but how could I get the first offset of each partition? I have checked the API of the new consumer and I don't see any method to get these offsets. Any other way? I could start with another groupId as well, but it doesn't seem a very clean option for production.
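On getting the first offset of each partition: the new consumer can do it with seekToBeginning plus position (and, from Kafka 0.10.1 on, with beginningOffsets directly). A sketch reusing the same topics and kafkaParams as above; the topic name "my_topic" is an assumption:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Ask Kafka for the earliest available offset of every partition.
val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
val partitions = consumer.partitionsFor("my_topic").asScala
  .map(p => new TopicPartition(p.topic, p.partition))
consumer.assign(partitions.asJava)
consumer.seekToBeginning(partitions.asJava)
val fromOffsets = partitions.map(tp => tp -> consumer.position(tp)).toMap
consumer.close()

// Hand the offsets to the direct stream so it starts from the beginning.
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))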
Measure performance time in some spark transformations.
I want to measure how long some different transformations take in Spark, such as map, joinWithCassandraTable and so on. Which is the best approach to do it?

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
  result
}

Could I use something like this? I guess that System.nanoTime will be executed in the driver before and after the workers execute the maps/joins and so on. Is that right? Any other idea?
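Roughly yes, with one caveat: transformations are lazy, so timing rdd.map(...) on its own only measures building the DAG on the driver. The block has to end in an action for the workers to actually run, and then the driver-side nanoTime pair measures the end-to-end wall time of that whole job (per-stage and per-task times live in the Spark UI). A small variation of the same function, where parse is a hypothetical transformation:

def time[R](label: String)(block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - t0) / 1e6} ms")
  result
}

val mapped = rdd.map(parse)                     // lazy: returns immediately, nothing runs
val n = time("map + count") { mapped.count() }  // count() forces the map; this is what gets timed

To isolate one transformation from the ones before it, cache the input RDD and force it with a throwaway action first, so the timed run doesn't recompute the upstream lineage.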