Parse RDD[Seq[String]] to DataFrame with types.

2019-07-17 Thread Guillermo Ortiz Fernández
I'm trying to parse an RDD[Seq[String]] into a DataFrame.
Although it's a Seq of Strings, the values could have more specific types
such as Int, Boolean, Double, String and so on.
For example, a line could be:
"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...

The first column is always going to be a String, the second an Int, and so
on, and it stays that way for the whole run. On the other hand, in one
execution the sequences could have five elements and in another 2000, so it
depends on the execution, but in each execution I know the type of each
"column" or element of the sequence.

To do it, I could have something like this:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

//I could have a parameter to generate the StructType dynamically.
def getSchema(): StructType = {
  val schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
  schemaArray += StructField("col1", IntegerType, true)
  schemaArray += StructField("col2", StringType, true)
  schemaArray += StructField("col3", DoubleType, true)
  StructType(schemaArray)
}

//Array of Any?? it doesn't seem the best option!!
val l1: Seq[Any] = Seq(1, "2", 1.1)
val rdd1 = sc.parallelize(Seq(l1)).map(Row.fromSeq(_))

val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema
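
For the comment above about generating the StructType dynamically, a minimal
sketch could look like this (schemaFor is a hypothetical helper; the list of
types would come from whatever describes each execution):

import org.apache.spark.sql.types._

def schemaFor(colTypes: Seq[DataType]): StructType =
  StructType(colTypes.zipWithIndex.map { case (t, i) =>
    StructField(s"col${i + 1}", t, nullable = true)
  })

// e.g. schemaFor(Seq(IntegerType, StringType, DoubleType)) for the example above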

I don't like having a Seq of Any at all, but it's really what I have. Is
there another option?

On the other hand, I was thinking that since I have something similar to a
CSV, I could create one. With Spark there is a library to read a CSV and
return a DataFrame where the types are inferred. Is it possible to call it
if I already have an RDD[String]?
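
For the last question, a hedged sketch of what that could look like, assuming
Spark 2.2+ (where DataFrameReader.csv accepts a Dataset[String]); here spark is
the SparkSession and rdd stands for the RDD[Seq[String]] above:

import org.apache.spark.rdd.RDD
import spark.implicits._

val lines: RDD[String] = rdd.map(_.mkString(","))  // join each Seq[String] with commas
val ds = spark.createDataset(lines)
val df = spark.read
  .option("inferSchema", "true")
  .csv(ds)                                         // column types are inferred from the data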


Re: Putting record in HBase with Spark - error get regions.

2019-05-28 Thread Guillermo Ortiz Fernández
After a while it's possible to see this error too:

19/05/28 11:11:18 ERROR executor.Executor: Exception in task 35.1 in
stage 0.0 (TID 265)
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException:
Failed 122 actions: my_table: 122 times,
at 
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:258)
at 
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2000(AsyncProcess.java:238)
at 
org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1810)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:240)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/05/28 11:11:18 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 369


On Tue, May 28, 2019 at 12:12, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) wrote:

> I'm executing a load process into HBase with Spark (around 150M records).
> At the end of the process there are a lot of failed tasks.
>
> I get this error:
>
> 19/05/28 11:02:31 ERROR client.AsyncProcess: Failed to get region location
> org.apache.hadoop.hbase.TableNotFoundException: my_table
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1417)
>   at 
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1211)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:410)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:359)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:238)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
>   at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
>   at 
> example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
>   at 
> example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> When I execute a scan from the HBase shell, it works. What could the
> reason be? I'm not sure whether it's more an error from HBase or from Spark.
>
>


Putting record in HBase with Spark - error get regions.

2019-05-28 Thread Guillermo Ortiz Fernández
I'm executing a load process into HBase with Spark (around 150M records).
At the end of the process there are a lot of failed tasks.

I get this error:

19/05/28 11:02:31 ERROR client.AsyncProcess: Failed to get region location
org.apache.hadoop.hbase.TableNotFoundException: my_table
at 
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1417)
at 
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1211)
at 
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:410)
at 
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:359)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:238)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:146)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1092)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:25)
at 
example.bigdata.v360.dsl.UpsertDsl$$anonfun$writeToHBase$1.apply(UpsertDsl.scala:19)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


When I execute a scan from the HBase shell, it works. What could the reason
be? I'm not sure whether it's more an error from HBase or from Spark.
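
For context, a hedged sketch of the kind of write path the stack trace points
at (foreachPartition plus a BufferedMutator); the namespace, column family and
the RDD of key/value pairs are illustrative assumptions, not the actual
UpsertDsl code:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

rdd.foreachPartition { rows =>
  val conf = HBaseConfiguration.create()  // must pick up the cluster's hbase-site.xml
  val connection = ConnectionFactory.createConnection(conf)
  val mutator = connection.getBufferedMutator(TableName.valueOf("my_namespace", "my_table"))
  try {
    rows.foreach { case (rowKey, value) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
      mutator.mutate(put)
    }
    mutator.flush()
  } finally {
    mutator.close()
    connection.close()
  }
}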


Trying to improve performance of the driver.

2018-09-13 Thread Guillermo Ortiz Fernández
I have a Spark Streaming process whose micro-batches take 2 seconds. When I
check where the time is spent, I see about 0.8s-1s of processing time,
although the global time is 2s. That extra second is spent in the driver.
I reviewed the code executed by the driver and commented out some of it
with the same result, so I have no idea where the time is spent.

Right now I'm executing in client mode from one of the nodes inside the
cluster, so I can't set the number of cores for the driver (although I
don't think that is going to make the difference).

How could I find out where the driver is spending the time? I'm not sure
whether it's possible to improve performance at this point, or whether that
second is mainly spent scheduling the graph of each micro-batch.
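
One hedged way to see where that extra second goes is a StreamingListener that
prints the scheduling delay versus the processing time of each batch (ssc below
stands for the application's StreamingContext):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"scheduling=${info.schedulingDelay.getOrElse(-1L)}ms " +
      s"processing=${info.processingDelay.getOrElse(-1L)}ms " +
      s"total=${info.totalDelay.getOrElse(-1L)}ms")
  }
})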


Re: deploy-mode cluster. FileNotFoundException

2018-09-05 Thread Guillermo Ortiz Fernández
I'm using a standalone cluster and the final command I'm trying is:
spark-submit --verbose --deploy-mode cluster --driver-java-options
"-Dlogback.configurationFile=conf/i${1}Logback.xml" \
--class com.example.Launcher --driver-class-path
lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-0.10.0.1.jar
\
--files conf/${1}Conf.json iris-core-0.0.1-SNAPSHOT.jar conf/${1}Conf.json

On Wed, Sep 5, 2018 at 11:11, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) wrote:

> I want to execute my processes in cluster mode. As I don't know where the
> driver is going to be executed, I have to make all the files it needs
> available. I understand that there are two options: copy all the files to
> all the nodes, or copy them to HDFS.
>
> My doubt is: if I want to put all the files in HDFS, isn't that automatic
> with the --files and --jars parameters of the spark-submit command, or do
> I have to copy them to HDFS manually?
>
> My idea is to execute something like:
> spark-submit --driver-java-options
> "-Dlogback.configurationFile=conf/${1}Logback.xml" \
> --class com.example.Launcher --driver-class-path
> lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-1.0.0.jar \
> --files /conf/${1}Conf.json example-0.0.1-SNAPSHOT.jar conf/${1}Conf.json
> I have tried with --files hdfs:// without copying anything to HDFS and it
> doesn't work either.
>
>


deploy-mode cluster. FileNotFoundException

2018-09-05 Thread Guillermo Ortiz Fernández
I want to execute my processes in cluster mode. As I don't know where the
driver is going to be executed, I have to make all the files it needs
available. I understand that there are two options: copy all the files to
all the nodes, or copy them to HDFS.

My doubt is: if I want to put all the files in HDFS, isn't that automatic
with the --files and --jars parameters of the spark-submit command, or do I
have to copy them to HDFS manually?

My idea is to execute something like:
spark-submit --driver-java-options
"-Dlogback.configurationFile=conf/${1}Logback.xml" \
--class com.example.Launcher --driver-class-path
lib/spark-streaming-kafka-0-10_2.11-2.0.2.jar:lib/kafka-clients-1.0.0.jar \
--files /conf/${1}Conf.json example-0.0.1-SNAPSHOT.jar conf/${1}Conf.json
I have tried with --files hdfs:// without copying anything to HDFS and it
doesn't work either.
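
As a hedged aside, a file shipped with --files is usually resolved at runtime
through SparkFiles rather than a local path; a minimal sketch (the file name
and the reading code are illustrative, at submit time it would be the basename
of conf/${1}Conf.json):

import org.apache.spark.SparkFiles
import scala.io.Source

val confPath = SparkFiles.get("myConf.json")  // hypothetical name: only the basename is used
val confJson = Source.fromFile(confPath).mkString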


Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I can't... do you think it could be a bug in this version, either in Spark
or in Kafka?

On Wed, Aug 29, 2018 at 22:28, Cody Koeninger ()
wrote:

> Are you able to try a recent version of spark?
>
> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>  wrote:
> > I'm using Spark Streaming 2.0.1 with Kafka 0.10; sometimes I get this
> > exception and Spark dies.
> >
> > I couldn't see any error or problem on the machines. Does anybody know
> > the reason for this error?
> >
> >
> > java.lang.IllegalStateException: This consumer has already been closed.
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.Option.orElse(Option.scala:289)
> ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > ~[scala-library-2.11.11.jar:na]
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > ~[scala-library-2.11.11.jar:na]
> > at
> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> > ~[spark-streaming_2.

java.lang.OutOfMemoryError: Java heap space - Spark driver.

2018-08-29 Thread Guillermo Ortiz Fernández
I got this error from the Spark driver; it seems that I should increase the
driver memory, although it's 5g (and 4 cores) right now. It seems weird to
me because I'm not using Kryo or broadcast in this process, but in the log
there are references to Kryo and broadcast.
How could I figure out the reason for this OutOfMemoryError? Is it normal
that there are references to Kryo and broadcasting when I'm not using them?
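
As a hedged aside, a tiny driver-side heap probe like the sketch below (the
helper name is mine) can show whether the driver heap grows steadily between
micro-batches before the OOM hits; a heap dump taken with the standard JVM
option -XX:+HeapDumpOnOutOfMemoryError on the driver is another common way to
see which objects dominate.

def logDriverHeap(tag: String): Unit = {
  val rt = Runtime.getRuntime
  val usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024)
  val maxMb = rt.maxMemory() / (1024 * 1024)
  println(s"[$tag] driver heap: $usedMb MB used of $maxMb MB")
}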

05:11:19.110 [streaming-job-executor-0] WARN
c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' <->
com.datastax.driver.dse.search.DateRange] because it collides with
previously registered codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' <->
com.datastax.driver.dse.search.DateRange]
05:11:26.806 [dag-scheduler-event-loop] WARN  org.apache.spark.util.Utils -
Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
~[na:1.8.0_162]
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_162]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
~[lz4-1.3.0.jar:na]
at
net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
~[lz4-1.3.0.jar:na]
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
~[kryo-3.0.3.jar:na]
at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
~[kryo-3.0.3.jar:na]
at
org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:209)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:238)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1012)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:936)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:935)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at scala.collection.immutable.List.foreach(List.scala:392)
[scala-library-2.11.11.jar:na]
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:935)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:873)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1630)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[spark-core_2.11-2.0.2.15.jar:2.0.2.15]
05:40:53.535 [dse-app-client-thread-pool-0] WARN
c.datastax.driver.core.CodecRegistry - Ignoring codec DateRangeCodec
['org.apache.cassandra.db.marshal.DateRangeType' 

Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I'm using Spark Streaming 2.0.1 with Kafka 0.10; sometimes I get this
exception and Spark dies.

I couldn't see any error or problem on the machines. Does anybody know the
reason for this error?


java.lang.IllegalStateException: This consumer has already been closed.
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
~[scala-library-2.11.11.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]


Refresh broadcast variable when it isn't the value.

2018-08-19 Thread Guillermo Ortiz Fernández
Hello,

I want to put data in a broadcast (Map) variable in Spark. Sometimes there
is new data, so I have to update/refresh the values, but I'm not sure how I
could do this.

My idea is to use an accumulator as a flag when a cache error occurs; at
that point I could read the data again and reload the broadcast variable
for the next micro-batch, and finally reset the accumulator to 0.
I don't know if there is a better solution or other ideas to do this.

Has anyone faced this problem?
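
A hedged sketch of that idea (loadReferenceData, the Map type and sc are
placeholders; the refresh runs on the driver between micro-batches, e.g. at
the start of foreachRDD):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

var reference: Broadcast[Map[String, String]] = sc.broadcast(loadReferenceData())
val cacheErrors = sc.longAccumulator("cacheErrors")  // executors add 1 on a cache error

def refreshIfNeeded(sc: SparkContext): Unit =
  if (cacheErrors.value > 0L) {
    reference.unpersist(blocking = false)            // drop the stale copy on the executors
    reference = sc.broadcast(loadReferenceData())    // re-broadcast the fresh data
    cacheErrors.reset()
  }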


Reset the offsets, Kafka 0.10 and Spark

2018-06-07 Thread Guillermo Ortiz Fernández
I'm consuming data from Kafka with createDirectStream and storing the
offsets in Kafka (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself
).

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))



My Spark version is 2.0.2 with Kafka 0.10. This solution works well, and
when I restart the Spark process it starts from the last offset Spark
consumed, but sometimes I need to reprocess the whole topic from the
beginning.

I have seen that I could reset the offsets with a Kafka script, but it's
not enabled in Kafka 0.10...

kafka-consumer-groups --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute


Another possibility is to pass another Kafka parameter to createDirectStream
with a map of the offsets, but how could I get the first offset of each
partition? I have checked the API of the new consumer and I don't see any
method to get these offsets.

Is there any other way? I could start with another groupId as well, but it
doesn't seem a very clean option for production.
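
A hedged sketch of the map-of-offsets approach, assuming a kafka-clients
version that provides beginningOffsets (0.10.1+); topics and kafkaParams are
the same values used for the stream above:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
val partitions = topics.toSeq.flatMap { t =>
  consumer.partitionsFor(t).asScala.map(p => new TopicPartition(p.topic, p.partition))
}
val fromOffsets = consumer.beginningOffsets(partitions.asJava).asScala
  .map { case (tp, offset) => tp -> offset.longValue }.toMap
consumer.close()

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, fromOffsets))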


Measure performance time in some spark transformations.

2018-05-12 Thread Guillermo Ortiz Fernández
I want to measure how long some different transformations in Spark take,
such as map, joinWithCassandraTable and so on. What is the best approach to
do it?

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
  result
}


Could I use something like this? I guess that System.nanoTime will be
executed in the driver before and after the workers execute the maps/joins
and so on. Is that right? Any other idea?
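
As a hedged note on the sketch above: transformations are lazy, so wrapping
only the map/join in time measures how long it takes to define it on the
driver; to capture the work done on the executors, the timed block has to end
in an action (rdd and expensiveTransform below are placeholders):

val n = time {
  rdd.map(expensiveTransform).count()  // count() forces the transformation to actually run
}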