Speed up Spark writes to Google Cloud Storage
Hi, Our Spark writes to GCS are slow. As far as I can see, the reason is that the data is first written to a staging directory and then copied to the actual directory in GCS. Below are a few configs and the code. Any suggestions on how to speed this up would be great.

    sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    sparkSession.conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    sparkSession.conf.set("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true")
    sparkSession.conf.set(
      "spark.hadoop.mapred.output.committer.class",
      "org.apache.hadoop.mapred.DirectFileOutputCommitter"
    )
    sparkSession.sparkContext.hadoopConfiguration
      .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    sparkSession.sparkContext.hadoopConfiguration
      .set("spark.speculation", "false")

    snapshotInGCS.write
      .option("header", "true")
      .option("emptyValue", "")
      .option("delimiter", "^")
      .mode(SaveMode.Overwrite)
      .format("csv")
      .partitionBy("date", "id")
      .option("compression", "gzip")
      .save(s"gs://${bucketName}/${folderName}")

Thank you, SK -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
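A hedged starting point, assuming Spark 2.3+ (where `spark.sql.sources.partitionOverwriteMode` exists). As far as we understand, `dynamic` mode writes the job's output under a staging directory and renames it into the final partition directories at job commit; on GCS a rename is an object copy plus delete, which matches the slowdown described. The default `static` mode skips that extra step, but with `SaveMode.Overwrite` it replaces everything under the output path, including partitions absent from the new data, so it only fits if that is acceptable. The sketch also moves `spark.speculation` out of the Hadoop configuration, where Spark itself does not read it. The app name is hypothetical, and the `DirectFileOutputCommitter` settings are left out because, to our knowledge, that class is not part of Apache Hadoop itself.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: committer-related settings supplied when the session is built.
// Keys prefixed with "spark.hadoop." are copied into the Hadoop configuration
// when the SparkContext starts.
val spark = SparkSession.builder()
  .appName("snapshot-to-gcs") // hypothetical app name
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
  // A Spark property, so it belongs in the Spark configuration, not hadoopConfiguration.
  .config("spark.speculation", "false")
  // "static" (the default) avoids the extra staging directory used by "dynamic",
  // but Overwrite then replaces everything under the output path.
  .config("spark.sql.sources.partitionOverwriteMode", "static")
  .getOrCreate()
```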
Issue with file names in writeStream in Structured Streaming
Hi, We are using something like the following to write data to files in Structured Streaming, and the file names we get are part-*, as described in https://stackoverflow.com/questions/51056764/how-to-define-a-spark-structured-streaming-file-sink-file-path-or-file-name. How can we choose the file name for each row in the DataFrame, for example /day/month/id/log.txt?

    df.writeStream
      .format("parquet") // can be "orc", "json", "csv", etc.
      .option("path", "/path/to/save/")
      .partitionBy("year", "month", "day", "hour")
      .start()

Thanks for the help!!!
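As far as we know, the built-in file sink always chooses its own file names (`part-...`); `partitionBy` only controls the directory layout. One workaround is to write the files yourself, grouping each micro-batch's rows by a target path computed per row (for example inside `foreachBatch` on Spark 2.4+, or in a `ForeachWriter`). The sketch covers only the grouping step. `LogRow`, `targetPath` and `groupByTarget` are hypothetical, the layout follows the `/day/month/id/log.txt` example from the question, and the actual file writes (including whether a later micro-batch appends to or replaces `log.txt`) are left out.

```scala
// Hypothetical row shape; adapt to the DataFrame's real columns.
final case class LogRow(day: String, month: String, id: String, line: String)

// Target file for one row, following the /day/month/id/log.txt layout.
def targetPath(base: String, r: LogRow): String =
  s"$base/${r.day}/${r.month}/${r.id}/log.txt"

// Group rows so each target file can be written once with all of its lines
// (groupBy keeps the original relative order of lines within each group).
def groupByTarget(base: String, rows: Seq[LogRow]): Map[String, Seq[String]] =
  rows.groupBy(r => targetPath(base, r)).map { case (path, rs) => path -> rs.map(_.line) }

// Usage with made-up rows:
val rows = Seq(
  LogRow("01", "02", "7", "x"),
  LogRow("01", "02", "7", "y"),
  LogRow("03", "02", "8", "z"))
val grouped = groupByTarget("/path/to/save", rows)
grouped.foreach { case (path, lines) => println(s"$path -> $lines") }
```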
Automatic JSON schema inference in Structured Streaming
Hi, Is there a way to do automatic JSON schema inference in Structured Streaming? I do not want to supply and bind a predefined schema. With Spark Kafka Direct I could use spark.read.json(), but this does not seem to be supported in Structured Streaming. Thanks!
Lag and queued-up batch information in the Structured Streaming UI
Hi, How do we get information such as lag and queued-up batches in Structured Streaming? The following API does not seem to give any lag or queued-batch information for Structured Streaming similar to what we get for DStreams: https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/streaming/scheduler/BatchInfo.html Thanks!
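To our understanding, Structured Streaming does not keep a queue of pending batches the way DStreams do (the next micro-batch starts when the previous one finishes), so lag is usually derived from offsets. `query.lastProgress`, or `StreamingQueryListener.onQueryProgress`, reports each source's `startOffset`/`endOffset` as JSON; for the Kafka source that JSON maps topic to partition to offset. Comparing the end offsets with the latest offsets on the brokers (fetched separately, e.g. with a Kafka consumer) gives the lag. The sketch assumes both have already been parsed into maps; `TopicPartitionKey`, `computeLag` and `totalLag` are hypothetical helpers, not Spark API.

```scala
// Hypothetical stand-in for Kafka's TopicPartition, keeping the sketch dependency-free.
final case class TopicPartitionKey(topic: String, partition: Int)

// Lag per partition = latest broker offset minus the offset the query has reached.
// Partitions missing from `processed` are treated as lagging from offset 0 (an assumption).
def computeLag(
    processed: Map[TopicPartitionKey, Long],
    latest: Map[TopicPartitionKey, Long]): Map[TopicPartitionKey, Long] =
  latest.map { case (tp, end) =>
    val reached = processed.getOrElse(tp, 0L)
    tp -> math.max(0L, end - reached)
  }

def totalLag(lag: Map[TopicPartitionKey, Long]): Long = lag.values.sum

// Usage with made-up offsets:
val processed = Map(TopicPartitionKey("events", 0) -> 100L, TopicPartitionKey("events", 1) -> 250L)
val latest = Map(TopicPartitionKey("events", 0) -> 130L, TopicPartitionKey("events", 1) -> 250L)
val lag = computeLag(processed, latest)
println(s"lag per partition: $lag, total: ${totalLag(lag)}")
```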
How do I implement a ForeachWriter in Structured Streaming so that the connection is created once per partition?
Hi, How do I implement a ForeachWriter in Structured Streaming so that the connection is created once per partition and updates are done in a batch, just like foreachPartition on RDDs? Thanks for the help!
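In Structured Streaming, Spark calls `ForeachWriter.open(partitionId, version)` once per partition per trigger, then `process` for each row, then `close(errorOrNull)`. A connection opened in `open` and released in `close` is therefore shared by all rows of that partition, and rows can be buffered in `process` and flushed in batches. The sketch keeps that three-method shape but does not extend Spark's class, so it runs without Spark; `Connection`, `RecordingConnection`, `BatchingWriter` and `batchSize` are hypothetical. In a real job the class would extend `org.apache.spark.sql.ForeachWriter[String]` (or your row type) and must create its connection inside `open`, since the writer object is serialized to the executors.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical connection abstraction; replace with the real client (JDBC, HBase, ...).
trait Connection {
  def writeBatch(rows: Seq[String]): Unit
  def close(): Unit
}

// Test double that records what was written so the lifecycle can be checked.
final class RecordingConnection extends Connection {
  val batches = ArrayBuffer.empty[Seq[String]]
  var closed = false
  def writeBatch(rows: Seq[String]): Unit = batches += rows
  def close(): Unit = closed = true
}

// Same open/process/close shape as Spark's ForeachWriter: one connection per
// partition, rows flushed every `batchSize` rows and once more on close.
final class BatchingWriter(newConnection: () => Connection, batchSize: Int) {
  private var conn: Connection = null
  private var buffer: ArrayBuffer[String] = null

  def open(partitionId: Long, version: Long): Boolean = {
    conn = newConnection() // created once per partition, on the executor
    buffer = ArrayBuffer.empty[String]
    true // true means: process the rows of this partition
  }

  def process(value: String): Unit = {
    buffer += value
    if (buffer.size >= batchSize) flush()
  }

  def close(errorOrNull: Throwable): Unit = {
    try {
      if (errorOrNull == null) flush() // only write the remainder on success
    } finally {
      if (conn != null) conn.close()
    }
  }

  private def flush(): Unit =
    if (buffer.nonEmpty) {
      conn.writeBatch(buffer.toList)
      buffer.clear()
    }
}

// Usage: simulate one partition with 5 rows and a batch size of 2.
val recorder = new RecordingConnection
val writer = new BatchingWriter(() => recorder, batchSize = 2)
if (writer.open(partitionId = 0L, version = 1L)) {
  Seq("a", "b", "c", "d", "e").foreach(writer.process)
}
writer.close(null)
println(recorder.batches)
```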
Does Structured Streaming support Spark Kafka Direct?
Hi, We have code based on Spark Kafka Direct in production and we want to port it to Structured Streaming. Does Structured Streaming support Spark Kafka Direct? What are the configs for parallelism and scalability in Structured Streaming? In Spark Kafka Direct, the number of Kafka partitions determines the parallelism of consumption. Is it the same with Structured Streaming? Thanks in advance for the help!
How to get the time slice or batch time for which the current micro-batch is running in Spark Streaming
Hi, How do I get the time slice or batch time for which the current micro-batch is running in Spark Streaming? Currently I am using the system time, which prevents the key-clearing feature of reduceByKeyAndWindow from working properly. Thanks, Swetha
Slower performance when running Spark Kafka Direct Streaming against a Kafka 0.10 cluster
Hi, What would be the appropriate settings to run Spark with Kafka 0.10? My job works fine with Spark using Kafka 0.8 against a Kafka 0.8 cluster, but it is very slow with Kafka 0.10 when using Kafka Direct's experimental APIs for Kafka 0.10. I sometimes see the following error. The Kafka parameters and the consumer strategy used to create the stream are shown below. Any suggestions on how to run this with better performance would be of great help.

    java.lang.AssertionError: assertion failed: Failed to get records for test stream1 72 324027964 after polling for 12

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaBrokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "heartbeat.interval.ms" -> Integer.valueOf(2),
      "session.timeout.ms" -> Integer.valueOf(6),
      "request.timeout.ms" -> Integer.valueOf(9),
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "spark.streaming.kafka.consumer.cache.enabled" -> "false",
      "group.id" -> "test1"
    )

    val hubbleStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    )
ReduceByKeyAndWindow checkpoint recovery issues in Spark Streaming
Hi, reduceByKeyAndWindow checkpoint recovery has issues when trying to recover for the second time. Basically, the reduced value of the previous window is lost, while the key is still present in the old values that need to be inverse-reduced, which results in the following error. Does anyone have any idea why it does not recover properly the second time?

    Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently?
        at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:143)
        at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:130)
How to force Spark Kafka Direct to start from the latest offset when the lag is huge in Kafka 0.10?
Hi, How do I force Spark Kafka Direct to start from the latest offset when the lag is huge in Kafka 0.10? It seems to resume from the latest offset stored for the group id. One way to do this is to change the group id, but that would mean providing a new group id every time we need to run the job from the latest offset. Is there a way to force the job to start from the latest offset when needed while still using the same group id? Thanks!
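To our understanding, spark-streaming-kafka-0-10 has a `ConsumerStrategies.Subscribe` overload that also takes a `Map[TopicPartition, Long]` of starting offsets and seeks to them on start, overriding what the group has committed. That would allow keeping the same `group.id`: look up the latest offsets yourself (for example with a plain `KafkaConsumer`, via `seekToEnd` followed by `position`, or `endOffsets` on clients 0.10.1+) and pass them in when you want to skip ahead. This plain-Scala sketch covers only the decision of where to start; `TP` stands in for Kafka's `TopicPartition`, and `chooseStartOffsets` and `maxLag` are hypothetical.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

// Per partition: resume from the committed offset normally, but jump to the
// latest offset when the lag exceeds `maxLag` or nothing has been committed yet.
def chooseStartOffsets(
    committed: Map[TP, Long],
    latest: Map[TP, Long],
    maxLag: Long): Map[TP, Long] =
  latest.map { case (tp, end) =>
    committed.get(tp) match {
      case Some(c) if end - c <= maxLag => tp -> c
      case _                            => tp -> end
    }
  }

// Usage with made-up offsets:
val committed = Map(TP("events", 0) -> 100L, TP("events", 1) -> 900L)
val latest = Map(TP("events", 0) -> 1000L, TP("events", 1) -> 950L, TP("events", 2) -> 40L)
val start = chooseStartOffsets(committed, latest, maxLag = 500L)
println(start)
```

The resulting map, converted to real `TopicPartition` keys, would then be the third argument of `ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams, offsets)`.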
Issues when trying to recover a textFileStream from a checkpoint in Spark Streaming
Hi, I am facing issues while trying to recover a textFileStream from a checkpoint. Basically, it tries to load the files from the beginning of the job, whereas I delete the files after processing them. I have the following configs set, so I expected it not to look for files older than 2 minutes when recovering from the checkpoint. Any suggestions on this would be of great help.

    sparkConf.set("spark.streaming.minRememberDuration","120s")
    sparkConf.set("spark.streaming.fileStream.minRememberDuration","120s")

Thanks, Swetha
Memory consumption and checkpointed data seem to increase incrementally when reduceByKeyAndWindow with an inverse function is used with mapWithState in stateful streaming
Hi, Memory consumption and checkpointed data seem to increase incrementally when reduceByKeyAndWindow with an inverse function is used together with mapWithState. My application uses stateful streaming with mapWithState. The keys generated by mapWithState are then used by reduceByKeyAndWindow to compute rolling counts over 24 hours. The MapWithStateRDD seems to stay persisted forever even though I have checkpointing enabled every 10 minutes, and the ShuffledRDD generated by reduceByKeyAndWindow seems to grow linearly in memory. Any idea why this happens? Is it possible that the ShuffledRDD is caching some data from mapWithState because it depends on it for keys? Thanks, Swetha
Does mapWithState need checkpointing to be specified in Spark Streaming?
Hi, Do we need to specify checkpointing for mapWithState just like we do for updateStateByKey? Thanks, Swetha
Spark Streaming does not seem to clear MapPartitionsRDD and ShuffledRDD that are persisted after the use of updateStateByKey and reduceByKeyAndWindow with inverse functions even after checkpointing the data
Hi, Spark Streaming does not seem to clear the MapPartitionsRDD and ShuffledRDD that are persisted after using updateStateByKey and reduceByKeyAndWindow with inverse functions, even after checkpointing the data. Any idea why this happens? Is there a way to set a timeout so that the persisted data is cleared after a while? It does not clear the cached MapPartitionsRDD and ShuffledRDD even when I explicitly call unpersist and also checkpoint. Thanks!
Partitions cached by updateStateByKey never seem to get evicted
Hi, We use updateStateByKey in our Spark Streaming application. The partitions cached by updateStateByKey do not seem to be getting evicted. They were evicted fine with spark.cleaner.ttl in 1.5.1, but since upgrading to Spark 2.1, partitions are not getting evicted in stateful streaming. Any idea why this happens? Thanks!
How to make sure that Spark Kafka Direct Streaming job maintains the state upon code deployment?
Hi, We use updateStateByKey and reduceByKeyAndWindow and checkpoint the data. We store the offsets in ZooKeeper. How do we make sure that the state of the job is maintained when redeploying the code? Thanks!
How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming
Hi, I have checkpointing enabled in Spark Streaming and I use updateStateByKey and reduceByKeyAndWindow with inverse functions. How do I reduce the amount of data written to the checkpoint, or clear out the data that I don't care about? Thanks!
How do I find the time taken by each step in a stage in a Spark Job
Hi, How do I find the time taken by each step in a stage of a Spark job? Also, how do I find the bottleneck in each step, and whether a stage is skipped because its RDDs are persisted in streaming? I am trying to identify which step of my streaming job is taking time. Thanks!
Spark Streaming reduceByKeyAndWindow with inverse function seems to iterate over all the keys in the window even though they are not present in the current batch
Hi, We use reduceByKeyAndWindow with an inverse function in our streaming job to calculate rolling counts for the past hour and for the past 24 hours. It seems to iterate over all the keys in the window even when they are not present in the current batch, which makes processing times high. My batch size is 1 minute. Is there a way for reduceByKeyAndWindow to iterate only over the keys present in the current batch instead of reducing over all the keys in the window? Typically, updates happen only for the keys present in the current batch. Thanks!
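As far as we can tell from `ReducedWindowedDStream`, each slide cogroups the previous window's RDD with the batches entering and leaving the window, so every key already in the window is visited, not only the keys of the current batch; passing a `filterFunc` that drops keys whose count reaches zero at least keeps that key set from growing. The single-machine model below (a hypothetical `RollingCounts` class, not Spark code) only illustrates the arithmetic of the inverse-function update and that just the entering and leaving keys actually change.

```scala
// Rolling per-key counts over the last `windowBatches` micro-batches, updated
// incrementally: add the entering batch, subtract the batch that leaves.
final class RollingCounts(windowBatches: Int) {
  private var batches = Vector.empty[Map[String, Long]] // per-batch counts, oldest first
  private var window = Map.empty[String, Long]

  def add(batch: Seq[String]): Map[String, Long] = {
    val entering = batch.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }
    window = merge(window, entering, sign = 1L) // "reduce" step
    batches = batches :+ entering
    if (batches.size > windowBatches) {
      window = merge(window, batches.head, sign = -1L) // "inverse reduce" step
      batches = batches.tail
    }
    window
  }

  // Touches only the keys in `delta`; every other key is carried over unchanged.
  private def merge(w: Map[String, Long], delta: Map[String, Long], sign: Long): Map[String, Long] =
    delta.foldLeft(w) { case (acc, (k, n)) =>
      val v = acc.getOrElse(k, 0L) + sign * n
      if (v == 0L) acc - k else acc.updated(k, v) // like a filterFunc dropping zero counts
    }
}

// Usage: a window of 2 batches.
val rc = new RollingCounts(windowBatches = 2)
rc.add(Seq("a", "b", "a"))
rc.add(Seq("b"))
val current = rc.add(Seq("c")) // the first batch has now left the window
println(current)
```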
How to bootstrap Spark Kafka direct with the previous state in case of a code upgrade
Hi, How do we bootstrap the streaming job with the previous state when we do a code change and redeploy? We use updateStateByKey to maintain the state and store session objects and LinkedHashMaps in the checkpoint. Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-bootstrap-Spark-Kafka-direct-with-the-previous-state-in-case-of-a-code-upgrade-tp28775.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Is Structured Streaming ready for production use?
Hi, Is Structured Streaming ready for production use in Spark 2.2? Thanks, Swetha
Exception while using reduceByKeyAndWindow in Spark Streaming
Hi, I see the following error when I use reduceByKeyAndWindow in my Spark Streaming app. I use reduce, invReduce and filterFunction as shown below. Any idea why I get the error?

    java.lang.Exception: Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently?

    def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
      case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
        (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
    }

    def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
      case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
        (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
    }

    def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => (Boolean) = {
      case ((metricName: String, (timeStamp: Long, set: HashSet[String]))) =>
        set.size > 0
    }
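One plausible cause, based on our reading of `ReducedWindowedDStream`: Spark updates a window value by inverse-reducing the batches that leave the window and reducing the batches that enter it, so `invReduce` must exactly undo `reduce`. `set1.diff(set2)` does not: if a user appears in a leaving batch and also in a batch still inside the window, the diff removes that user anyway. The set can then become empty, the filter drops the key, and when an older batch containing the key later leaves the window there is no previous window value for it, which matches the exception text. The sketch swaps the `HashSet` for per-member counts (a multiset) so the inverse is exact; `Agg`, `reduceCounts`, `invReduceCounts`, `keepNonEmpty` and `distinctCount` are hypothetical names, and the `max` timestamp is kept as in the original even though it has no exact inverse.

```scala
// Window value: latest timestamp plus how many times each member was seen.
type Agg = (Long, Map[String, Long])

// Reduce: add counts member by member.
def reduceCounts(a: Agg, b: Agg): Agg = {
  val (ts1, m1) = a
  val (ts2, m2) = b
  val merged = m2.foldLeft(m1) { case (acc, (member, n)) =>
    acc.updated(member, acc.getOrElse(member, 0L) + n)
  }
  (math.max(ts1, ts2), merged)
}

// Inverse reduce: subtract the leaving batch's counts; drop members that reach 0.
def invReduceCounts(current: Agg, leaving: Agg): Agg = {
  val (ts, m) = current
  val remaining = leaving._2.foldLeft(m) { case (acc, (member, n)) =>
    val left = acc.getOrElse(member, 0L) - n
    if (left > 0) acc.updated(member, left) else acc - member
  }
  (ts, remaining) // timestamp kept as-is: max() cannot be undone
}

// Filter: keep a key while at least one member is still inside the window.
def keepNonEmpty(kv: (String, Agg)): Boolean = kv._2._2.nonEmpty

def distinctCount(a: Agg): Int = a._2.size

// Usage: u1 appears in both batches, so it must survive when batch1 leaves.
val batch1: Agg = (1L, Map("u1" -> 1L, "u2" -> 1L))
val batch2: Agg = (2L, Map("u1" -> 1L))
val window = reduceCounts(batch1, batch2)
val afterBatch1Leaves = invReduceCounts(window, batch1)
// The set-based version loses u1 in the same situation:
val setResult = (Set("u1", "u2") ++ Set("u1")).diff(Set("u1", "u2"))
println(s"counts: ${afterBatch1Leaves._2}, set diff: $setResult")
```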
Checkpointing for reduceByKeyAndWindow with window sizes of 1 hour and 24 hours
Hi, What happens if I don't specify checkpointing on a DStream that uses reduceByKeyAndWindow without an inverse function? Would it cause memory to overflow? My window sizes are 1 hour and 24 hours. I cannot provide an inverse function because the aggregation is based on HyperLogLog. My code looks something like the following:

    val logsByPubGeo = messages.map(_._2).filter(_.geo != Constants.UnknownGeo).map { log =>
      val key = PublisherGeoKey(log.publisher, log.geo)
      val agg = AggregationLog(
        timestamp = log.timestamp,
        sumBids = log.bid,
        imps = 1,
        uniquesHll = hyperLogLog(log.cookie.getBytes(Charsets.UTF_8))
      )
      (key, agg)
    }

    val aggLogs = logsByPubGeo.reduceByKeyAndWindow(reduceAggregationLogs, BatchDuration)

    private def reduceAggregationLogs(aggLog1: AggregationLog, aggLog2: AggregationLog) = {
      aggLog1.copy(
        timestamp = math.min(aggLog1.timestamp, aggLog2.timestamp),
        sumBids = aggLog1.sumBids + aggLog2.sumBids,
        imps = aggLog1.imps + aggLog2.imps,
        uniquesHll = aggLog1.uniquesHll + aggLog2.uniquesHll
      )
    }

Please let me know. Thanks!
HyperLogLogMonoid for unique visitor count in Spark Streaming
Hi, We need to calculate unique visitors in Spark Streaming. Can HyperLogLogMonoid be applied to a sliding window in Spark Streaming to calculate unique visitors? Any example of how to do that would be of great help. Thanks, Swetha
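HyperLogLog sketches (Algebird's `HyperLogLogMonoid` among them, to our understanding) merge by taking the register-wise maximum, so a sliding window can keep one sketch per batch and merge the sketches still inside the window. Max has no inverse, so such a window has to be re-merged on each slide rather than updated with an inverse function. The code below is not Algebird's API: it is a toy HyperLogLog (2^p registers, 32-bit MurmurHash3, linear counting for small sets) written only to show merging over a window. `Hll`, `emptyHll`, `sketchOf`, `windowEstimate` and the user ids are hypothetical, and the estimates are approximate by design.

```scala
import scala.util.hashing.MurmurHash3

// Toy HyperLogLog with 2^p registers (illustration only, not a library API).
final case class Hll(p: Int, registers: Vector[Int]) {
  private val m = 1 << p

  def add(item: String): Hll = {
    val h = MurmurHash3.stringHash(item)  // 32-bit hash
    val idx = h >>> (32 - p)              // top p bits choose the register
    val rest = h << p                     // remaining 32 - p bits
    val rank = math.min(Integer.numberOfLeadingZeros(rest), 32 - p) + 1
    if (rank > registers(idx)) copy(registers = registers.updated(idx, rank)) else this
  }

  // Union of two sketches: register-wise max (associative, commutative, idempotent).
  def merge(other: Hll): Hll =
    copy(registers = registers.zip(other.registers).map { case (a, b) => math.max(a, b) })

  def estimate: Double = {
    val alpha = 0.7213 / (1 + 1.079 / m)
    val raw = alpha * m * m / registers.map(r => math.pow(2.0, -r)).sum
    val zeros = registers.count(_ == 0)
    if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) // small-range correction
    else raw
  }
}

def emptyHll(p: Int): Hll = Hll(p, Vector.fill(1 << p)(0))

// One sketch per micro-batch, built from hypothetical user ids.
def sketchOf(users: Range): Hll = users.foldLeft(emptyHll(10))((h, i) => h.add(s"user-$i"))

// Window estimate: merge the sketches of the last `windowSize` batches.
def windowEstimate(batchSketches: Seq[Hll], windowSize: Int): Double =
  batchSketches.takeRight(windowSize).reduce((a, b) => a.merge(b)).estimate

// Usage: three overlapping batches; the last two together contain 1500 distinct users.
val batchSketches = Seq(sketchOf(0 until 1000), sketchOf(500 until 1500), sketchOf(1000 until 2000))
val lastTwo = windowEstimate(batchSketches, windowSize = 2)
println(f"estimated uniques in window: $lastTwo%.0f")
```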
How does Spark provide Hive style bucketing support?
Hi, How does Spark provide Hive-style bucketing support in Spark 2.x? Thanks, Swetha
How to tune groupBy operations in Spark 2.x?
Hi, How do I tune Spark jobs that use groupBy operations? Earlier I used --conf spark.shuffle.memoryFraction=0.8 --conf spark.storage.memoryFraction=0.1 to tune my jobs that use groupBy, but in Spark 2.x these configs seem to have been deprecated. What would be the appropriate config options to tune Spark jobs that use groupBy operations? Thanks, Swetha
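In Spark 2.x, to our understanding, the old `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` keys only apply when `spark.memory.useLegacyMode=true`. The unified memory manager uses `spark.memory.fraction` and `spark.memory.storageFraction` instead, and for DataFrame/SQL `groupBy` the number of shuffle partitions (`spark.sql.shuffle.partitions`) often matters as much; RDD `groupBy` takes its partition count from an explicit argument or `spark.default.parallelism`. A config sketch; the values are placeholders loosely mirroring the old 0.8/0.1 split, not tuned recommendations, and the app name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Placeholder values to tune per job, not recommendations.
val spark = SparkSession.builder()
  .appName("groupby-job") // hypothetical
  // Fraction of (heap - 300MB) shared by execution and storage (2.x default 0.6).
  .config("spark.memory.fraction", "0.8")
  // Part of that region where cached blocks are immune to eviction (default 0.5);
  // a lower value lets shuffles and aggregations claim more before spilling.
  .config("spark.memory.storageFraction", "0.2")
  // Number of partitions for DataFrame/SQL shuffles such as groupBy (default 200).
  .config("spark.sql.shuffle.partitions", "2000")
  .getOrCreate()
```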
How to configure the global_temp database via SparkConf
Hi, How do I configure the global_temp database via SparkConf? I know that it's a system-preserved database. Can it be configured via SparkConf? Thanks, Swetha
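As far as we know, the name comes from the static SQL property `spark.sql.globalTempDatabase` (default `global_temp`), which has to be set before the first SparkSession starts; the database itself is kept in Spark's memory rather than created in the Hive metastore. The `NoSuchObjectException(message:There is no database named global_temp)` entries in the next two threads look like Spark checking at startup that no metastore database already uses this reserved name, so they may be harmless log noise. Sketch; `my_global_temp` and the app name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// spark.sql.globalTempDatabase is a static SQL setting: supply it before the
// first SparkSession (and its shared state) is created.
val spark = SparkSession.builder()
  .appName("global-temp-example")                            // hypothetical
  .config("spark.sql.globalTempDatabase", "my_global_temp") // hypothetical; default "global_temp"
  .enableHiveSupport()
  .getOrCreate()

// Global temporary views are then qualified with the configured database name.
spark.range(5).createGlobalTempView("numbers")
spark.sql("SELECT * FROM my_global_temp.numbers").show()
```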
global_temp database not getting created in Spark 2.x
Hi, The global_temp database is not getting created when I use Spark 2.x. Do I need to create it manually, or do I need any permissions to do so?

    17/02/28 12:08:09 INFO HiveMetaStore.audit: ugi=user12345 ip=unknown-ip-addr cmd=get_database: global_temp
    17/02/28 12:08:09 ERROR metastore.RetryingHMSHandler: NoSuchObjectException(message:There is no database named global_temp)
        at org.apache.hadoop.hive.metastore.ObjectStore.getMDatabase(ObjectStore.java:493)
        at org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:504)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:108)
        at com.sun.proxy.$Proxy10.getDatabase(Unknown Source)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_database(HiveMetaStore.java:842)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
        at com.sun.proxy.$Proxy11.get_database(Unknown Source)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:949)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
        at com.sun.proxy.$Proxy12.getDatabase(Unknown Source)
        at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1165)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabaseOption$1.apply(HiveClientImpl.scala:340)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabaseOption$1.apply(HiveClientImpl.scala:340)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getDatabaseOption(HiveClientImpl.scala:339)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:170)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:170)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:170)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:169)
        at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:111)
        at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
        at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
Error while enabling Hive Support in Spark 2.1
Hi, I have been trying to upgrade my Spark job to 2.x and I see the following error. It seems to be looking for a global_temp database by default. Is it expected behaviour in Spark 2.x to look for a global_temp database by default?

    17/02/27 16:59:09 INFO HiveMetaStore.audit: ugi=user1234 ip=unknown-ip-addr cmd=get_database: global_temp
    17/02/27 16:59:09 ERROR metastore.RetryingHMSHandler: NoSuchObjectException(message:There is no database named global_temp)
        at org.apache.hadoop.hive.metastore.ObjectStore.getMDatabase(ObjectStore.java:493)
        at org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:504)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:108)
        at com.sun.proxy.$Proxy10.getDatabase(Unknown Source)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_database(HiveMetaStore.java:842)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
        at com.sun.proxy.$Proxy11.get_database(Unknown Source)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:949)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
        at com.sun.proxy.$Proxy12.getDatabase(Unknown Source)
        at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1165)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabaseOption$1.apply(HiveClientImpl.scala:340)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getDatabaseOption$1.apply(HiveClientImpl.scala:340)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getDatabaseOption(HiveClientImpl.scala:339)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:170)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:170)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:170)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:169)
        at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:111)
        at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
        at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:101)
        at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:100)
        at org.apache.spark.sql.internal.SessionState.<init>(SessionState.scala:157)
        at org.apache.spark.sql.hive.HiveSessionState.<init>(HiveSessionState.scala:32)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:978)
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:110)
        at
How to set hive configs in Spark 2.1?
Hi, How do I set the Hive configurations in Spark 2.1? I have the following in 1.6. How do I set the Hive-related configs using the new SparkSession?

    sqlContext.sql(s"use ${HIVE_DB_NAME} ")
    sqlContext.setConf("hive.exec.dynamic.partition", "true")
    sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    sqlContext.setConf("hive.exec.max.dynamic.partitions.pernode", "10")
    sqlContext.setConf("hive.exec.max.dynamic.partitions", "10")
    sqlContext.setConf("hive.scratch.dir.permission", "777")
    sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
    sqlContext.setConf("hive.default.fileformat", "Orc")
    sqlContext.setConf("hive.exec.orc.memory.pool", "1.0")
    sqlContext.setConf("hive.optimize.sort.dynamic.partition", "true")
    sqlContext.setConf("hive.exec.reducers.max", "2000")
    sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
    sqlContext.sql("set hive.default.fileformat=Orc ")
    sqlContext.sql("set hive.enforce.bucketing = true; ")
    sqlContext.sql("set hive.enforce.sorting = true; ")
    sqlContext.sql("set hive.auto.convert.join = true; ")
    sqlContext.sql("set hive.optimize.bucketmapjoin = true; ")
    sqlContext.sql("set hive.optimize.insert.dest.volume=true;")
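A sketch of a Spark 2.x equivalent, assuming the Hive classes are on the classpath: `enableHiveSupport()` takes the place of `HiveContext`, settings can be given at build time with `.config(...)`, and `spark.conf.set` or `SET` statements play the role of `sqlContext.setConf` at runtime. Only a subset of the original keys is shown; `HIVE_DB_NAME` and the app name are placeholders.

```scala
import org.apache.spark.sql.SparkSession

val HIVE_DB_NAME = "my_db" // placeholder for the database used in the 1.6 code

// Build-time settings, replacing the HiveContext setup.
val spark = SparkSession.builder()
  .appName("hive-orc-job") // hypothetical
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.orc.filterPushdown", "true")
  .config("spark.sql.shuffle.partitions", "2000")
  .getOrCreate()

// Runtime settings, the counterpart of sqlContext.setConf / "set ..." in 1.6.
spark.conf.set("hive.exec.max.dynamic.partitions.pernode", "10")
spark.conf.set("hive.exec.max.dynamic.partitions", "10")
spark.sql("SET hive.optimize.sort.dynamic.partition=true")
spark.sql(s"USE $HIVE_DB_NAME")
```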
Unable to see an external table created from HiveContext in the list of Hive tables
Hi, I created an external table in Spark SQL using hiveContext, with something like:

    CREATE EXTERNAL TABLE IF NOT EXISTS sampleTable stored as ORC LOCATION ...

I can see the files being created under the location I specified, and I am able to query the table as well, but I don't see it in Hive when I run show tables in Hive. Any idea why this is happening? Thanks!
How to spin up Kafka using Docker and use it for Spark Streaming integration tests
Hi, I need to write integration tests for Spark Streaming. My idea is to spin up Kafka locally using Docker and use it to feed the stream to my streaming job. Any suggestions on how to do this would be of great help. Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-spin-up-Kafka-using-docker-and-use-for-Spark-Streaming-Integration-tests-tp27252.html
Integration tests for Spark Streaming
Hi, I need to write some integration tests for my Spark Streaming app. Any example of how to do this would be of great help. Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Integration-tests-for-Spark-Streaming-tp27246.html
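One way to test the streaming logic without any Kafka broker, sketched under the assumption of Spark 2.x and the DStream API: StreamingContext.queueStream feeds hand-built RDDs in as micro-batches, and foreachRDD (whose body runs on the driver) collects the output so it can be asserted on. The word count and the object name StreamingWordCountIT are hypothetical stand-ins for the real job, and the fixed 5-second wait is a simplification that may need tuning on slow machines.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

import scala.collection.mutable

object StreamingWordCountIT {
  /** The logic under test; a word count stands in for the real job. */
  def wordCounts(lines: DStream[String]): DStream[(String, Int)] =
    lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

  /** Feeds `input` through the job as one micro-batch and returns the summed counts. */
  def run(input: Seq[String]): Map[String, Int] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-it")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each RDD pushed onto the queue becomes one micro-batch.
    val queue = mutable.Queue[RDD[String]]()
    val results = mutable.ArrayBuffer[(String, Int)]()

    // The foreachRDD body runs on the driver, so appending to a local buffer is fine.
    wordCounts(ssc.queueStream(queue, oneAtATime = true)).foreachRDD { rdd =>
      val batch = rdd.collect()
      results.synchronized { results ++= batch }
    }

    ssc.start()
    queue.synchronized { queue += ssc.sparkContext.makeRDD(input) }
    ssc.awaitTerminationOrTimeout(5000) // give a few batch intervals to process the input
    ssc.stop(stopSparkContext = true, stopGracefully = false)

    results.synchronized {
      results.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2).sum }
    }
  }
}
```

The same pattern can later be pointed at a real Kafka topic by swapping queueStream for the Kafka input, while keeping wordCounts unchanged.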
Kryo ClassCastException during Serialization/deserialization in Spark Streaming
Hi, every now and then, after my Spark Streaming job has been running for around 10 hours, I get the following error. I have both classes registered with Kryo as shown below; sampleMap is a field in SampleSession. Any suggestion on how to avoid this would be of great help!

    public class SampleSession implements Serializable, Cloneable {
        private Map sampleMap;
    }

    sparkConf.registerKryoClasses(Array(classOf[SampleSession], classOf[Sample]))

    com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: com.test.Sample cannot be cast to java.lang.String
    Serialization trace:
    sampleMap (com.test.SampleSession)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96)
        at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
        at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
        at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
        at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
        at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
        at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to java.lang.String
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:82)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        ... 37 more

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-ClassCastException-during-Serialization-deserialization-in-Spark-Streaming-tp27219.html
How to insert data into 2000 partitions (directories) of ORC/Parquet at a time using Spark SQL?
Hi, how do I insert data into 2000 partitions (directories) of ORC/Parquet at a time using Spark SQL? Inserting into 2000 Parquet/ORC directories with Spark SQL seems to perform poorly. Has anyone faced this issue? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-into-2000-partitions-directories-of-ORC-parquet-at-a-time-using-Spark-SQL-tp27132.html
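A common mitigation, sketched here assuming Spark 2.x and Parquet output (the sample data, the object name and the (date, id) columns are hypothetical): repartition by the partition columns before partitionBy, so every partition value is written by exactly one task. Without that, each task opens a writer for every partition value it happens to hold, which multiplies open files and small files across thousands of directories.

```scala
import java.nio.file.{Files, Path}

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

import scala.collection.JavaConverters._

object PartitionedWriteExample {
  /** Writes sample rows partitioned by (date, id); returns the number of part files per directory. */
  def write(spark: SparkSession, outDir: Path): Map[String, Int] = {
    import spark.implicits._
    val df = Seq(
      ("2016-06-01", "a", 1), ("2016-06-01", "a", 2),
      ("2016-06-01", "b", 3), ("2016-06-02", "a", 4)
    ).toDF("date", "id", "value")

    df.repartition(col("date"), col("id")) // all rows of one (date, id) pair go to one task
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy("date", "id")
      .parquet(outDir.toString)

    // Count part files per leaf directory, e.g. "date=2016-06-01/id=a" -> 1.
    Files.walk(outDir).iterator().asScala
      .filter(p => Files.isRegularFile(p) && p.getFileName.toString.startsWith("part-"))
      .toList
      .groupBy(p => outDir.relativize(p.getParent).toString)
      .map { case (dir, files) => dir -> files.size }
  }
}
```

The trade-off is skew: a partition value with far more rows than the others becomes one long-running task.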
How to insert data for 100 partitions at a time using Spark SQL
Hi, my Spark SQL insert query writes around 14,000 partitions of data, which seems to be causing memory issues. How can I insert the data 100 partitions at a time to avoid them? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-for-100-partitions-at-a-time-using-Spark-SQL-tp26997.html
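A sketch of writing in slices, assuming Spark 2.x and path-based Parquet output standing in for the Hive table (writeInBatches and its parameters are hypothetical): collect the distinct partition values, then filter and append one group of values per job. If df is expensive to compute, caching it first avoids recomputing it for every batch. Note that isin never matches a null, so rows with a null partition value would need separate handling.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col

object BatchedPartitionInsert {
  /**
   * Appends `df` to Parquet at `path`, partitioned by `partCol`, writing `batchSize`
   * distinct partition values per job. Returns the number of jobs issued.
   */
  def writeInBatches(df: DataFrame, partCol: String, path: String, batchSize: Int): Int = {
    // The distinct partition values are small enough to bring to the driver.
    val values: Seq[Any] = df.select(partCol).distinct().collect().map(_.get(0)).toSeq
    val batches = values.grouped(batchSize).toList

    batches.foreach { batch =>
      df.filter(col(partCol).isin(batch: _*)) // only this slice of partitions
        .write
        .mode(SaveMode.Append)                 // Append keeps the earlier batches
        .partitionBy(partCol)
        .parquet(path)
    }
    batches.size
  }
}
```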
How to set the degree of parallelism in Spark SQL?
Hi, how do I set the degree of parallelism in Spark SQL? I am using the following, but it only seems to allocate two executors at a time.

    sqlContext.sql("SET spark.sql.shuffle.partitions=200")

Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-the-degree-of-parallelism-in-Spark-SQL-tp26996.html
What factors decide the number of executors when doing a Spark SQL insert in Mesos?
Hi, what factors decide the number of executors when doing a Spark SQL insert? Right now, when I submit my job on Mesos, I always see only 2 executors being allocated. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-factors-decide-the-number-of-executors-when-doing-a-Spark-SQL-insert-in-Mesos-tp26990.html
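A sketch of the settings that, as I understand it, govern the executor count in Mesos coarse-grained mode: spark.cores.max caps the total cores the job may take, and on Spark 2.0+ spark.executor.cores lets several executors start on one agent, so the count is roughly cores.max / executor.cores, subject to what Mesos actually offers. On 1.6, coarse-grained mode starts at most one executor per agent, so two executors may simply mean two agents had resources to offer. All values below are hypothetical, and spark.sql.shuffle.partitions only changes the number of tasks, not executors.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object MesosExecutorSizing {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("mesos-insert-job")
      .set("spark.mesos.coarse", "true")  // coarse-grained mode: long-running executors
      .set("spark.cores.max", "40")       // upper bound on cores taken from the whole cluster
      .set("spark.executor.cores", "4")   // per-executor cores (Spark 2.0+), so up to ~10 executors
      .set("spark.executor.memory", "8g") // an agent must offer at least this much to host one
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // ... run the INSERT here; its task count still follows spark.sql.shuffle.partitions ...
    spark.stop()
  }
}
```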
Memory issues when trying to insert data in the form of ORC using Spark SQL
Hi, I see some memory issues when trying to insert data as ORC using Spark SQL. Please find the query and the exception below. Any idea why this is happening?

    sqlContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING, record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING) stored as ORC LOCATION '/user/users'")
    sqlContext.sql("SET orc.compress=SNAPPY")
    sqlContext.sql(
      """from recordsTemp ps
        |insert overwrite table users partition(datePartition, idPartition)
        |select ps.id, ps.record, ps.datePartition, ps.idPartition""".stripMargin)

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0
    org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1: New Memory allocation 1048575 bytes is smaller than the minimum allocation size of 1048576 bytes.
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org$apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:240)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:249)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:112)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: parquet.hadoop.MemoryManager$1: New Memory allocation 1048575 bytes is smaller than the minimum allocation size of 1048576 bytes.
        at parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:125)
        at parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:82)
        at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:104)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
        at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:65)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:246)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-issues-when-trying-to-insert-data-in-the-form-of-ORC-using-Spark-SQL-tp26988.html
What is the default value of rebalance.backoff.ms in Spark Kafka Direct?
Hi, we are getting a lot of LeaderLostExceptions, and our source stream runs with the default rebalance.backoff.ms of 2000. I was thinking of increasing it to 5000. Any suggestions? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
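As far as I know, the direct approach (spark-streaming-kafka for Kafka 0.8) does not read rebalance.backoff.ms, which belongs to the ZooKeeper-based high-level consumer. When a leader is lost, the direct stream sleeps for refresh.leader.backoff.ms (taken from kafkaParams; 200 ms by default in the 0.8 consumer config, if I recall correctly) and the driver retries the offset lookup up to spark.streaming.kafka.maxRetries times. A sketch assuming that 0.8 integration; the broker list and topic name are hypothetical.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamBackoff {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-direct")
      // How many times the driver retries fetching leader offsets before failing a batch.
      .set("spark.streaming.kafka.maxRetries", "3")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092", // hypothetical brokers
      // Wait between leader lookups after a leader is lost.
      "refresh.leader.backoff.ms" -> "5000"
    )
    val topics = Set("events") // hypothetical topic

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```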
How to add an accumulator for a Set in Spark
Hi, how do I add an accumulator for a Set in Spark? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-an-accumulator-for-a-Set-in-Spark-tp26510.html
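A minimal sketch for Spark 2.x, where custom accumulators extend AccumulatorV2 (on 1.6, sc.accumulableCollection(mutable.HashSet[String]()) was the usual route). SetAccumulator is a hypothetical name; it keeps a mutable set internally and exposes an immutable copy as its value.

```scala
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/** Accumulates distinct elements into a set. */
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
  private val set = mutable.HashSet.empty[T]

  override def isZero: Boolean = set.isEmpty

  override def copy(): SetAccumulator[T] = {
    val acc = new SetAccumulator[T]
    acc.set ++= set
    acc
  }

  override def reset(): Unit = set.clear()

  // Called inside tasks for each element.
  override def add(v: T): Unit = set += v

  // Called on the driver to combine per-task copies.
  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = set ++= other.value

  override def value: Set[T] = set.toSet
}
```

Register it with spark.sparkContext.register(acc, "distinctIds") before use; tasks then call acc.add(x), and the driver reads acc.value after the action completes.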
Unit testing framework for Spark Jobs?
Hi, what is a good unit testing framework for Spark batch/streaming jobs? We use core Spark, Spark SQL with DataFrames, and the Streaming API. Is there a good framework that covers unit tests for these APIs? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
How to control the number of Parquet files created under a partition?
Hi, how can I control the number of Parquet files created under a partition? My sqlContext queries to create the table and insert the records are below. They create around 250 Parquet files under each partition, although I expected around 2 or 3. Because of the large number of files, scanning the records takes a long time. Any suggestions on how to control the number of Parquet files under each partition would be of great help.

    sqlContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS testUserDts (userId STRING, savedDate STRING) PARTITIONED BY (partitioner STRING) stored as PARQUET LOCATION '/user/testId/testUserDts'")
    sqlContext.sql(
      """from testUserDtsTemp ps
        |insert overwrite table testUserDts partition(partitioner)
        |select ps.userId, ps.savedDate, ps.partitioner""".stripMargin)

Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-the-number-of-parquet-files-getting-created-under-a-partition-tp26374.html
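The usual fix, as I understand it, is DISTRIBUTE BY on the partition column: all rows for one partitioner value are shuffled to the same task, so each directory receives one file instead of one per task that held some of its rows (a very large partition then becomes one large task). A sketch of the insert from this post, assuming Spark 2.x with Hive support and the same table names; the same statement should also work through a HiveContext on 1.6.

```scala
import org.apache.spark.sql.SparkSession

object FewerFilesPerPartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sql("SET hive.exec.dynamic.partition=true")
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    // DISTRIBUTE BY sends every row with the same partitioner value to one task,
    // so each partition directory is written by a single task.
    spark.sql(
      """FROM testUserDtsTemp ps
        |INSERT OVERWRITE TABLE testUserDts PARTITION (partitioner)
        |SELECT ps.userId, ps.savedDate, ps.partitioner
        |DISTRIBUTE BY partitioner""".stripMargin)

    spark.stop()
  }
}
```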
Error when trying to insert data to a Parquet data source in HiveQL
Hi, I get the following error when I try to insert data into a Parquet data source. Any idea why this is happening?

    org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1: New Memory allocation 1045004 bytes is smaller than the minimum allocation size of 1048576 bytes.
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org$apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:240)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
        at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:249)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:112)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-trying-to-insert-data-to-a-Parquet-data-source-in-HiveQL-tp26365.html
Error when trying to overwrite a partition dynamically in Spark SQL
Hi, I am getting an error when trying to overwrite a partition dynamically. Below are the code and the error. Any idea why this is happening?

    test.write.partitionBy("dtPtn", "idPtn").mode(SaveMode.Overwrite).format("parquet").save("/user/test/sessRecs")

    16/02/28 18:02:55 ERROR input.FileInputFormat: Exception while trying to createSplits
    java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: -1
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:337)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-trying-to-overwrite-a-partition-dynamically-in-Spark-SQL-tp26355.html
How to overwrite data dynamically to specific partitions in Spark SQL
Hi, I need to overwrite data dynamically in specific partitions, depending on filters. How can that be done with sqlContext? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-overwrite-data-dynamically-to-specific-partitions-in-Spark-SQL-tp26338.html
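On Spark 2.3 and later (worth checking against the version in use), setting spark.sql.sources.partitionOverwriteMode to dynamic makes SaveMode.Overwrite replace only the partitions that receive new rows. A sketch with a hypothetical dt partition column and path-based Parquet output; with a Hive table, the same setting applies to df.write.mode(SaveMode.Overwrite).insertInto(...).

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object DynamicPartitionOverwrite {
  /** Writes two dt partitions, then overwrites only one of them; returns dt -> v as read back. */
  def run(spark: SparkSession, path: String): Map[String, Int] = {
    import spark.implicits._
    // "dynamic": Overwrite replaces only the partitions present in the incoming data.
    // The default, "static", would first clear every partition under `path`.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    Seq(("2016-02-27", 1), ("2016-02-28", 2)).toDF("dt", "v")
      .write.mode(SaveMode.Overwrite).partitionBy("dt").parquet(path)

    // Only dt=2016-02-28 is rewritten; dt=2016-02-27 keeps its data.
    Seq(("2016-02-28", 20)).toDF("dt", "v")
      .write.mode(SaveMode.Overwrite).partitionBy("dt").parquet(path)

    spark.read.parquet(path)
      .select($"dt".cast("string"), $"v") // dt may be inferred as a date on read
      .as[(String, Int)]
      .collect()
      .toMap
  }
}
```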
How to join multiple tables and use subqueries in Spark SQL using sqlContext?
Hi, how do I join multiple tables and use subqueries in Spark SQL using sqlContext? Can I do this with sqlContext, or do I have to use HiveContext? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-multiple-tables-and-use-subqueries-in-Spark-SQL-using-sqlContext-tp26315.html
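Joins and subqueries in FROM work through the plain SQL entry point; as far as I know, IN/EXISTS subqueries in WHERE require Spark 2.0 or later. A sketch assuming Spark 2.x, with hypothetical users, orders and tiers temp views registered from small in-memory DataFrames.

```scala
import org.apache.spark.sql.SparkSession

object JoinAndSubqueryExample {
  /** Registers three small temp views and runs a multi-table join with subqueries. */
  def run(spark: SparkSession): Array[(String, Long)] = {
    import spark.implicits._
    Seq((1, "alice"), (2, "bob"), (3, "carol")).toDF("id", "name").createOrReplaceTempView("users")
    Seq((1, 10.0), (1, 5.0), (2, 7.5)).toDF("user_id", "amount").createOrReplaceTempView("orders")
    Seq((1, "gold"), (2, "silver")).toDF("user_id", "tier").createOrReplaceTempView("tiers")

    // Three tables joined, a subquery in FROM, and an IN subquery in WHERE.
    spark.sql(
      """SELECT u.name, o.cnt
        |FROM users u
        |JOIN (SELECT user_id, COUNT(*) AS cnt FROM orders GROUP BY user_id) o
        |  ON u.id = o.user_id
        |JOIN tiers t ON t.user_id = u.id
        |WHERE u.id IN (SELECT user_id FROM tiers WHERE tier = 'gold')
        |ORDER BY u.name""".stripMargin)
      .as[(String, Long)]
      .collect()
  }
}
```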
How to delete a record from parquet files using dataframes
Hi, I am saving my records as Parquet files in HDFS using DataFrames. How do I delete records using DataFrames? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-delete-a-record-from-parquet-files-using-dataframes-tp26242.html
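Parquet files cannot be edited in place, so deleting records generally means rewriting the data without them. A sketch assuming Spark 2.x and an integer id column (deleteIds and both paths are hypothetical). The output goes to a new location because overwriting the directory being read is unsafe with lazy evaluation; note also that this filter drops rows whose id is null.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object DeleteFromParquet {
  /** Rewrites the Parquet data at `inPath` to `outPath` without the given ids; returns rows kept. */
  def deleteIds(spark: SparkSession, inPath: String, outPath: String, ids: Seq[Int]): Long = {
    val remaining = spark.read.parquet(inPath).filter(!col("id").isin(ids: _*))
    // Write elsewhere: the read is lazy, so overwriting inPath directly could delete
    // input files before they have been read.
    remaining.write.mode(SaveMode.Overwrite).parquet(outPath)
    spark.read.parquet(outPath).count()
  }
}
```

Once the new copy is verified, the old directory can be swapped out, for example by pointing the table's location at outPath.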
How to use a custom partitioner in a dataframe in Spark
Hi, how do I use a custom partitioner when I call saveAsTable on a DataFrame? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
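DataFrameWriter has no hook for an org.apache.spark.Partitioner, and the on-disk layout of saveAsTable is controlled by partitionBy/bucketBy anyway. If the goal is in-memory placement before the write, one route is to drop to the RDD, call partitionBy with the custom Partitioner, and rebuild the DataFrame with the original schema; for plain hash partitioning, df.repartition(n, col("id")) is simpler. IdRangePartitioner and its split rule are hypothetical, and the sketch assumes Spark 2.x and an integer id column.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Hypothetical rule: ids below 1000 go to partition 0, the rest are hashed over the others. */
class IdRangePartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions >= 2)

  override def getPartition(key: Any): Int = key match {
    case id: Int if id < 1000 => 0
    case other => 1 + math.abs(other.hashCode % (numPartitions - 1))
  }
}

object CustomPartitionedDataFrame {
  /** Re-partitions the rows of `df` by its `id` column with IdRangePartitioner. */
  def repartitionById(spark: SparkSession, df: DataFrame, numPartitions: Int): DataFrame = {
    val idIndex = df.schema.fieldIndex("id")
    val partitioned = df.rdd
      .map(row => (row.getInt(idIndex), row)) // key each row by its id
      .partitionBy(new IdRangePartitioner(numPartitions))
      .values
    spark.createDataFrame(partitioned, df.schema)
  }
}
```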
Error when doing a saveAsTable on a Spark DataFrame
Hi, I get an error when I call saveAsTable as shown below. I do have write access to the Hive volume. Any idea why this is happening?

    val df = testDF.toDF("id", "rec")
    df.printSchema()
    val options = Map("path" -> "/hive/test.db/")
    df.write.format("parquet").partitionBy("id").options(options).mode(SaveMode.Append).saveAsTable("sessRecs")

    16/02/15 19:04:41 WARN scheduler.TaskSetManager: Lost task 369.0 in stage 2.0 (): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:393)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Failed to commit task
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.commitTask$2(WriterContainer.scala:422)
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:388)
        ... 8 more
    Caused by: java.io.IOException: Error: Read-only file system(30), file: test, user name: test, ID: 12345678

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-doing-a-SaveAstable-on-a-Spark-dataframe-tp26232.html
How to partition a dataframe based on an Id?
Hi, how do I partition a DataFrame of User objects by id so that I can both join on the id and retrieve all the user objects within a time period when queried? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-partition-a-dataframe-based-on-an-Id-tp26228.html
Is predicate push-down supported by default in dataframes?
Hi, is predicate pushdown supported by default for DataFrames, or does it depend on the format in which the DataFrame is stored, such as Parquet? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-predicate-push-down-supported-by-default-in-dataframes-tp26227.html
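Pushdown depends on the data source: Parquet (spark.sql.parquet.filterPushdown, on by default in recent versions) and ORC (spark.sql.orc.filterPushdown) can accept filters, while sources without that support are filtered after a full read. A quick way to check is the physical plan, as in this sketch assuming Spark 2.x; the path, columns and object name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PushdownCheck {
  /** Writes a small Parquet data set, filters it by id, and returns the physical plan text. */
  def filteredScanPlan(spark: SparkSession, path: String): String = {
    import spark.implicits._
    (1 to 100).map(i => (i, s"user$i")).toDF("id", "name")
      .write.mode("overwrite").parquet(path)

    val df = spark.read.parquet(path).filter(col("id") === 42)
    // Filters accepted by the source are listed on the scan node, something like
    // "PushedFilters: [IsNotNull(id), EqualTo(id,42)]"; Spark still re-checks rows afterwards.
    df.queryExecution.executedPlan.toString
  }
}
```

Calling df.explain() prints the same plan interactively.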
How to join an RDD with a hive table?
Hi, how do I join an RDD with a Hive table and retrieve only the records I am interested in? Suppose I have an RDD with 1000 records and a Hive table with 100,000 records. I should be able to join the RDD with the Hive table by id and load only the matching 1000 records from the Hive table, so that there are no memory issues. Also, I was planning to store the data in Hive as Parquet files. Any help on this is greatly appreciated. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-an-RDD-with-a-hive-table-tp26225.html
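A sketch assuming Spark 2.x and a table with an integer id column (the function name and columns are hypothetical): collect the ~1000 distinct ids, filter the table with isin so that, depending on the Spark version, the predicate can reach the Parquet scan, and broadcast the small side of the join so the large table is not shuffled.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col}

object JoinSmallRddWithTable {
  /**
   * Joins a small RDD of (id, payload) pairs with a large table that has an `id` column.
   * `table` stands in for something like spark.table("users").
   */
  def lookup(spark: SparkSession, keys: RDD[(Int, String)], table: DataFrame): DataFrame = {
    import spark.implicits._
    val small = keys.toDF("id", "payload")

    // Bring the (~1000) ids to the driver and filter the table with them, so the
    // predicate can be applied at the scan instead of after a full read.
    val ids = small.select("id").distinct().as[Int].collect().toSeq
    val candidates = table.filter(col("id").isin(ids: _*))

    // Broadcasting the small side avoids shuffling the large one.
    candidates.join(broadcast(small), "id")
  }
}
```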
How to query a hive table from inside a map in Spark
Hi, is it possible to query a Hive table whose data is stored as Parquet files from inside map/mapPartitions in Spark? My requirement is that I have a User table in Hive/HDFS, and for each record in a sessions RDD I should be able to query the User table and, if it already has a record for that userId, fetch the record and do further processing. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-query-a-hive-table-from-inside-a-map-in-Spark-tp26224.html
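SparkSession/SQLContext exist only on the driver, so they cannot be used inside map or mapPartitions; the per-record lookup is usually rewritten as a join. A sketch assuming Spark 2.x, with hypothetical sessions and users DataFrames keyed by userId; a left outer join keeps sessions whose user has no record yet, and knownUser tells the two cases apart.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object EnrichSessionsWithUsers {
  /** Replaces a per-record "query the User table" step with a single left outer join. */
  def enrich(sessions: DataFrame, users: DataFrame): DataFrame =
    sessions.as("s")
      .join(users.as("u"), col("s.userId") === col("u.userId"), "left_outer")
      // knownUser is false when the User table has no row for this userId yet.
      .select(col("s.userId"), col("s.sessionId"), col("u.profile"),
        col("u.userId").isNotNull.as("knownUser"))
}
```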
How to store documents in hdfs and query them by id using Hive/Spark SQL
Hi, we have a requirement to store documents in HDFS. The documents are just JSON strings. We should be able to query them by id using Spark SQL/HiveContext as and when needed. What would be the correct approach to do this? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-documents-in-hdfs-and-query-them-by-id-using-Hive-Spark-SQL-tp26219.html
How to query a Hive table by Id from inside map partitions
Hi, how can I query a Hive table from inside mapPartitions to retrieve a value by id? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-query-a-Hive-table-by-Id-from-inside-map-partitions-tp26220.html
How to do a lookup by id from files in HDFS inside a transformation/action in an RDD
Hi, how do I do a lookup by id against a set of records stored in HDFS from inside a transformation/action of an RDD? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-a-look-up-by-id-from-files-in-hdfs-inside-a-transformation-action-ina-RDD-tp26185.html
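If the records to look up fit in executor memory, one common pattern is to load them once on the driver (for example with sc.textFile(...) followed by a map to key/value pairs and collectAsMap()) and broadcast them, rather than reading HDFS from inside the transformation. A sketch with a hypothetical string-keyed reference map; if the reference data is too large to broadcast, a join is the alternative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BroadcastLookup {
  /** Attaches the reference value (if any) for each record's id using a broadcast map. */
  def enrich(
      sc: SparkContext,
      records: RDD[(String, Int)],
      reference: Map[String, String]): RDD[(String, Int, Option[String])] = {
    val lookup = sc.broadcast(reference) // shipped to each executor once, not per task
    records.mapPartitions { iter =>
      val table = lookup.value
      iter.map { case (id, value) => (id, value, table.get(id)) }
    }
  }
}
```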
How to collect/take arbitrary number of records in the driver?
Hi, how do I get a fixed range of records from an RDD in the driver? Suppose I want records 100 to 1000 and then save them to some external database. I know I can do it from the workers per partition, but I want to avoid that for some reasons. The idea is to collect the data to the driver and save it, although slowly. I am looking for something like take(100, 1000) or take(1000, 2000). Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-collect-take-arbitrary-number-of-records-in-the-driver-tp26184.html
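A sketch using zipWithIndex, which attaches each element's global position (it runs an extra job to count partition sizes when there is more than one partition). Positions are only meaningful if the RDD's order is deterministic, for example after a sortBy. takeRange is a hypothetical helper; for streaming everything through the driver in order, rdd.toLocalIterator is another option.

```scala
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

object TakeRange {
  /** Returns the elements at positions [from, until) of `rdd`, collected to the driver. */
  def takeRange[T: ClassTag](rdd: RDD[T], from: Long, until: Long): Array[T] =
    rdd.zipWithIndex()                                   // (element, global position)
      .filter { case (_, idx) => idx >= from && idx < until }
      .map(_._1)
      .collect()
}
```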
How to use a registered temp table inside mapPartitions of an RDD
Hi, how do I use registerTempTable to register an RDD as a temporary table and use it inside mapPartitions of a different RDD? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-register-temp-table-inside-mapPartitions-of-an-RDD-tp26187.html
How to edit/delete a message posted in Apache Spark User List?
Hi, how do I edit/delete a message posted in the Apache Spark User List? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-edit-delete-a-message-posted-in-Apache-Spark-User-List-tp26160.html
Driver not able to restart the job automatically after the application of Streaming with Kafka Direct went down
Hi, I have the streaming job running in QA and prod. Due to Kafka issues, both jobs went down. After the Kafka issues were resolved and the checkpoint directory was deleted, the driver in the QA job restarted the job automatically and the application UI came up. In the prod job, however, the driver did not restart the application. Any idea why the prod driver is not able to restart the job when everything, including the --supervise option, is the same in QA and prod? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Driver-not-able-to-restart-the-job-automatically-after-the-application-of-Streaming-with-Kafka-Direcn-tp26155.html
java.nio.channels.ClosedChannelException in Spark Streaming Kafka Direct
Hi, I see the following error in Spark Streaming with Kafka Direct. I think this error is related to the Kafka topic. Any suggestions on how to avoid it would be of great help.

    java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-nio-channels-ClosedChannelException-in-Spark-Streaming-KafKa-Direct-tp26124.html
Can a tempTable registered by sqlContext be used inside a foreachRDD?
Hi, Can a temp table registered in sqlContext be queried inside foreachRDD, as shown below? My requirement: I have a set of data stored as Parquet in HDFS, and I need to register it as a temp table using sqlContext and query it inside foreachRDD.

// parquetData (hypothetical name): the DataFrame loaded from the Parquet files in HDFS
parquetData.registerTempTable("tempTable")
messages.foreachRDD { rdd =>
  val message: RDD[String] = rdd.map { y => y._2 }
  sqlContext.sql("SELECT time, To FROM tempTable")
}

Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-a-tempTable-registered-by-sqlContext-be-used-inside-a-forEachRDD-tp25862.html
How to load partial data from HDFS using Spark SQL
Hi, How can I load partial data from HDFS using Spark SQL? Suppose I want to load data based on a filter like "Select * from table where id = " using Spark SQL with DataFrames; how can that be done? The idea is that I do not want to load all of the data into memory when I run the SQL; I only want to load the data that matches the filter. Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-load-partial-data-from-HDFS-using-Spark-SQL-tp25855.html
Can SQLContext be used inside mapPartitions?
Hi, Can SQLContext be used inside mapPartitions? My requirement is to register a set of data from HDFS as a temp table and to look values up in it by key from inside mapPartitions. If that is not supported, is there a different way of doing this? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-SqlContext-be-used-inside-mapPartitions-tp25771.html
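For reference: as far as I know, SQLContext (like SparkContext) exists only on the driver. It cannot be used inside functions that run on executors, such as the one passed to mapPartitions. A common workaround, assuming the lookup data is small enough to fit in memory, is to collect it on the driver, broadcast it as a Map, and look keys up inside the partition function. The sketch below covers only that partition function, written in plain Scala so it runs without a cluster. `Event`, `EnrichedEvent` and `enrichPartition` are hypothetical names, and the Spark wiring appears only in comments.

```scala
// Hypothetical record types used only for this illustration.
final case class Event(key: String, payload: String)
final case class EnrichedEvent(key: String, payload: String, lookupValue: Option[String])

object PartitionLookup {
  // Once `lookup` is supplied, this has the shape mapPartitions expects:
  // Iterator[Event] => Iterator[EnrichedEvent]. Missing keys yield None instead of failing.
  def enrichPartition(lookup: Map[String, String])(events: Iterator[Event]): Iterator[EnrichedEvent] =
    events.map(e => EnrichedEvent(e.key, e.payload, lookup.get(e.key)))

  // Assumed Spark wiring (not executed here):
  //   val lookupMap = lookupDF.collect().map(r => r.getString(0) -> r.getString(1)).toMap // driver
  //   val bc = sc.broadcast(lookupMap)
  //   val enriched = events.mapPartitions(it => enrichPartition(bc.value)(it))           // executors
}
```

The sketch reads `bc.value` inside the lambda rather than passing `bc.value` directly to mapPartitions. That way the Map is not captured in the serialized closure, and executors use their broadcast copy instead.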
Spark batch getting hung up
Hi, My Spark batch job sometimes seems to hang for a long time before it starts the next stage or exits. It happens mainly when a stage contains mapPartitions/foreachPartition. Any idea why this is happening? Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-batch-getting-hung-up-tp25735.html
How to run multiple Spark jobs as a workflow that takes input from a Streaming job in Oozie
Hi, How can I run, as an Oozie workflow, multiple Spark jobs that take Spark Streaming data as their input? We have to run our Streaming job first and then run a workflow of Spark batch jobs to process the data. Any suggestions would be of great help. Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-multiple-Spark-jobs-as-a-workflow-that-takes-input-from-a-Streaming-job-in-Oozie-tp25739.html
Comparisons between Ganglia and Graphite for monitoring the Streaming Cluster?
Hi, How do Ganglia and Graphite compare for monitoring the Streaming cluster? Does one have clear advantages over the other? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Comparisons-between-Ganglia-and-Graphite-for-monitoring-the-Streaming-Cluster-tp25635.html
Re: Can not see any spark metrics on ganglia-web
Hi, Should gmond be installed on all the Spark nodes? What should the host and port be? Should they be the host and port of gmetad?

# Enable GangliaSink for all instances
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.name=hadoop_cluster1
*.sink.ganglia.host=localhost
*.sink.ganglia.port=8653
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
*.sink.ganglia.ttl=1
*.sink.ganglia.mode=multicast

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981p25636.html
RE: Executor metrics in spark application
Hi, Were you able to set up custom metrics in GangliaSink? If so, how did you register them? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p25647.html
Re: Can not see any spark metrics on ganglia-web
Hi, Where does the *.sink.csv.directory directory get created? I cannot see any metrics in the logs. How did you verify ConsoleSink and CsvSink? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981p25643.html
How to get custom metrics using Ganglia Sink?
Hi, How do I configure custom metrics using Ganglia Sink? Thanks!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-custom-metrics-using-Ganglia-Sink-tp25645.html
Re: Can not see any spark metrics on ganglia-web
Hi, I cannot see any metrics either. How did you verify that ConsoleSink and CsvSink work? Where does *.sink.csv.directory get created?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981p25644.html
How to build Spark with Ganglia to enable monitoring using Ganglia
Hi, How do I run a Maven build that enables monitoring with Ganglia? What is the command for that? Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-build-Spark-with-Ganglia-to-enable-monitoring-using-Ganglia-tp25625.html
Task Time is too high in a single executor in Streaming
Hi, In my Streaming job, most of the time seems to be taken by one executor. Shuffle Read Records is 713758 on that one executor but 0 on all the others. That stage has a groupBy followed by updateStateByKey, flatMap, map, reduceByKey and updateStateByKey operations. I suspect that somewhere all the keys are being collected on the same executor. However, the number of partitions seems to be the same for all of them, since they are all in the same stage, and Input Size / Records looks about the same on every executor. What exactly does Shuffle Read Size / Records indicate? Is it the shuffle data going out of that executor, or the shuffle performed by that executor?

Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input Size / Records  Shuffle Read Size / Records
2 s        12           0             12               392.3 KB / 867        18.0 KB / 0
2 s        12           0             12               411.5 KB / 878        17.9 KB / 0
2 s        12           0             12               397.7 KB / 889        18.0 KB / 0
2 s        12           0             12               387.4 KB / 834        18.0 KB / 0
1 s        8            0             8                263.6 KB / 597        11.9 KB / 0
2 s        12           0             12               397.9 KB / 902        18.0 KB / 0
2 s        12           0             12               411.1 KB / 901        18.0 KB / 0
2 s        12           0             12               370.4 KB / 837        18.0 KB / 0
34 s       12           0             12               400.8 KB / 854        349.5 KB / 713758
2 s        12           0             12               393.3 KB / 885        17.9 KB / 0
2 s        12           0             12               390.3 KB / 862        17.9 KB /

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-Time-is-too-high-in-a-single-executor-in-Streaming-tp25614.html
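One possible explanation for a single task reading 713758 shuffle records while the others read 0 is key skew. This is an assumption, not a diagnosis. The default HashPartitioner sends every record with the same key, including null keys, to the same partition, so a dominant key ends up in one task no matter how many partitions there are. The plain-Scala sketch below mirrors that partitioning formula: null goes to partition 0, and any other key goes to a non-negative hashCode modulo the partition count. It also shows how salting a hot key spreads it out. `SkewDemo`, its methods, and the sample keys and counts are hypothetical. Salting only helps for operations whose partial results can be merged in a second step (e.g. reduceByKey), so it does not apply directly to updateStateByKey.

```scala
object SkewDemo {
  // Mirrors the formula of Spark's HashPartitioner.getPartition:
  // null keys go to partition 0, other keys to a non-negative hashCode modulo numPartitions.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k =>
      val raw = k.hashCode % numPartitions
      raw + (if (raw < 0) numPartitions else 0)
  }

  // Number of records that land in each non-empty partition.
  def recordsPerPartition(keys: Seq[Any], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionFor(_, numPartitions)).map { case (p, ks) => (p, ks.size) }

  // Salting: turn one hot key into `buckets` distinct keys, e.g. "hot#0" .. "hot#7".
  def salt(key: String, recordIndex: Int, buckets: Int): String = s"$key#${recordIndex % buckets}"
}
```

With 1000 records for a single key "hot", every one of them lands in one partition. Salting with 8 buckets spreads them over 8 partitions, and a second aggregation step would then merge the per-salt results.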
ClassCastException in Kryo Serialization
Hi, I seem to be getting a ClassCastException in Kryo serialization. The error is below. The parent class has a map field holding Child1 objects, and Child1 has a HashSet field testObjects holding objects of type Object1. The error occurs when Kryo tries to deserialize Object1. Any idea why this is happening?

com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: Object1 cannot be cast to java.lang.String
Serialization trace:
testObjects (Child1)
map (parent)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-in-Kryo-Serialization-tp25575.html
How to identify total schedule delay in a Streaming app using Ganglia?
Hi, How can I identify the total scheduling delay in a Streaming app using Ganglia? Any sample code for integrating Ganglia with a Streaming app to report the delay and the number of batches that failed in the last 5 minutes would be of great help. Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-identify-total-schedule-delay-in-a-Streaming-app-using-Ganglia-tp25576.html
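Spark Streaming reports per-batch delays through `org.apache.spark.streaming.scheduler.StreamingListener`, registered with `ssc.addStreamingListener`. In `onBatchCompleted`, `batchCompleted.batchInfo` exposes `schedulingDelay`, `processingDelay` and `totalDelay` as `Option[Long]` values in milliseconds. The plain-Scala sketch below is one way to keep a 5-minute rolling view of those values once they arrive. `DelayWindow` is a hypothetical helper, and publishing its output to Ganglia is not shown.

```scala
import scala.collection.mutable

// Rolling window of per-batch delays, relative to the newest batch recorded.
final class DelayWindow(windowMs: Long) {
  // (batchTimeMs, delayMs) pairs, oldest first.
  private val samples = mutable.Queue.empty[(Long, Long)]

  def record(batchTimeMs: Long, delayMs: Long): Unit = {
    samples.enqueue((batchTimeMs, delayMs))
    // Drop samples older than windowMs before the newest batch.
    while (samples.nonEmpty && samples.head._1 < batchTimeMs - windowMs) samples.dequeue()
  }

  def maxDelayMs: Option[Long] =
    if (samples.isEmpty) None else Some(samples.iterator.map(_._2).max)

  def avgDelayMs: Option[Double] =
    if (samples.isEmpty) None else Some(samples.iterator.map(_._2).sum.toDouble / samples.size)
}

// Assumed wiring (not executed here):
//   val window = new DelayWindow(5 * 60 * 1000L)
//   ssc.addStreamingListener(new StreamingListener {
//     override def onBatchCompleted(b: StreamingListenerBatchCompleted): Unit =
//       b.batchInfo.schedulingDelay.foreach(d => window.record(b.batchInfo.batchTime.milliseconds, d))
//   })
```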
Higher Processing times in Spark Streaming with Kafka Direct
Hi, Our processing times in Spark Streaming with the Kafka Direct approach have increased considerably as site traffic has grown. Would increasing the number of Kafka partitions reduce the processing times? Any tuning suggestions for reducing processing times would be of great help. Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Higher-Processing-times-in-Spark-Streaming-with-kafka-Direct-tp25571.html
How to modularize Spark Streaming Jobs?
Hi, What is the recommended way to modularize Spark Streaming jobs, along the lines of what Spring XD does? Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-modularize-Spark-Streaming-Jobs-tp25569.html
Effective ways to monitor and identify that a Streaming job has been failing for the last 5 minutes
Hi, We need to detect whether the Streaming job has been failing for the last 5 minutes and restart it if so. In most cases our Spark Streaming job with Kafka Direct fails with leader-lost errors or offsets-not-found errors for a partition. What is the most effective way to monitor and detect that the Streaming job has been failing with an error? The default monitoring provided by Spark does not seem to cover checking whether the job has been failing for a specific length of time. Or am I missing something, and this feature is already available? Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Effective-ways-monitor-and-identify-that-a-Streaming-job-has-been-failing-for-the-last-5-minutes-tp25536.html
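Below is a minimal sketch of the "failing for the last 5 minutes" check. It assumes you can observe each batch's outcome, for example from a StreamingListener's `onBatchCompleted` callback. With Spark 1.6 or later (an assumption about your version), `BatchInfo.outputOperationInfos` carries a `failureReason` for each output operation. `FailureStreak` is a hypothetical helper: it remembers when the current run of consecutive failed batches started and reports whether that run has lasted at least the threshold. What to do when it fires is left out, whether that is alerting or stopping the context so an external supervisor restarts the job.

```scala
// Remembers when the current run of consecutive failed batches started.
// Not thread-safe: assumes all calls come from a single thread.
final class FailureStreak(thresholdMs: Long) {
  private var streakStartMs: Option[Long] = None

  // A successful batch ends the streak; the first failure after a success starts a new one.
  def record(batchTimeMs: Long, failed: Boolean): Unit =
    if (!failed) streakStartMs = None
    else if (streakStartMs.isEmpty) streakStartMs = Some(batchTimeMs)

  // How long (ms) the job has been failing continuously as of nowMs; 0 if it is not failing.
  def failingForMs(nowMs: Long): Long = streakStartMs.fold(0L)(nowMs - _)

  def shouldRestart(nowMs: Long): Boolean =
    streakStartMs.isDefined && failingForMs(nowMs) >= thresholdMs
}

// Assumed wiring (Spark 1.6+), not executed here:
//   override def onBatchCompleted(b: StreamingListenerBatchCompleted): Unit = {
//     val failed = b.batchInfo.outputOperationInfos.values.exists(_.failureReason.isDefined)
//     streak.record(b.batchInfo.batchTime.milliseconds, failed)
//   }
```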
Recovery for Spark Streaming Kafka Direct in case of issues with Kafka
Hi, Our Streaming job fails with the errors below. They are all related to Kafka losing offsets and OffsetOutOfRangeException. What options do we have other than fixing Kafka? We would like to do something like the following. How can we achieve 1 and 2 with Spark Kafka Direct?
1. Skip some offsets if they are not available after the max retries are reached. In that case there might be data loss.
2. Catch that exception and somehow force a "reset" for that partition. And how would that handle the offsets already calculated in the backlog (if there is one)?
3. Track the offsets separately and restart the job by providing the offsets.
4. Or, as a straightforward approach, monitor the log for this error, and if it occurs more than X times, kill the job, remove the checkpoint directory, and restart.

ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([test_stream,5]))
java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8 rejected from java.util.concurrent.ThreadPoolExecutor@543258e0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12112]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage 52.0 (TID 255, 172.16.97.97): UnknownReason
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
Caused by: java.lang.InterruptedException
java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage 33.0 (TID 283, 172.16.97.103): UnknownReason
java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
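Options 1 and 3 both come down to choosing a valid starting offset for each partition when the stored one is no longer available. With the Kafka 0.8 direct integration, `KafkaUtils.createDirectStream` has an overload that takes explicit `fromOffsets: Map[TopicAndPartition, Long]`, so the job can be restarted from offsets you computed yourself. The plain-Scala sketch below shows only the clamping step. `TopicPartitionKey`, `AvailableRange` and `clampOffsets` are hypothetical stand-ins, and the sketch assumes the earliest and latest offsets are fetched from Kafka elsewhere. Moving a stored offset forward to the earliest available one skips the deleted messages, which is the data loss mentioned in option 1.

```scala
// Hypothetical stand-ins for kafka.common.TopicAndPartition and the offset range Kafka reports.
final case class TopicPartitionKey(topic: String, partition: Int)
final case class AvailableRange(earliest: Long, latest: Long)

object OffsetRecovery {
  // For every partition Kafka reports, pick a start offset inside [earliest, latest].
  // Returns (startOffset, messagesSkipped) per partition; skipped > 0 means data loss.
  def clampOffsets(
      stored: Map[TopicPartitionKey, Long],
      available: Map[TopicPartitionKey, AvailableRange]
  ): Map[TopicPartitionKey, (Long, Long)] =
    available.map { case (tp, range) =>
      // No stored offset: start from the earliest available one (an assumption; latest also works).
      val wanted = stored.getOrElse(tp, range.earliest)
      val start =
        if (wanted < range.earliest) range.earliest // messages already removed by retention
        else if (wanted > range.latest) range.latest // stored offset is past the end of the log
        else wanted
      val skipped = math.max(0L, range.earliest - wanted)
      (tp, (start, skipped))
    }
}
```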
Parquet files not getting coalesced to smaller number of files
Hi, I have the following code that saves Parquet files to HDFS in my hourly batch. My idea is to coalesce the output into a smaller number of files (1500). On the first run I get 1500 files in HDFS, but on later runs the number of files keeps increasing even though I coalesce, so the output is not coalesced to 1500 files as I want. At the end I have also included the example I am basing this on. Please let me know if there is a different, more efficient way of doing this.

val job = Job.getInstance()
val filePath = "path"
val metricsPath: Path = new Path(filePath)

// Delete the output path if it already exists
val fs: FileSystem = FileSystem.get(job.getConfiguration)
if (fs.exists(metricsPath)) {
  fs.delete(metricsPath, true)
}

// Configure the ParquetOutputFormat to use Avro as the serialization format
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])

// You need to pass the schema to AvroParquet when you are writing objects but not when you
// are reading them. The schema is saved in the Parquet file for future readers to use.
AvroParquetOutputFormat.setSchema(job, Metrics.SCHEMA$)

// Create a PairRDD with all keys set to null and wrap each Metrics in serializable objects
val metricsToBeSaved = metrics.map(metricRecord =>
  (null, new SerializableMetrics(new Metrics(metricRecord._1, metricRecord._2._1, metricRecord._2._2))))

metricsToBeSaved.coalesce(1500)

// Save the RDD to a Parquet file in our temporary output directory
metricsToBeSaved.saveAsNewAPIHadoopFile(filePath, classOf[Void], classOf[Metrics],
  classOf[ParquetOutputFormat[Metrics]], job.getConfiguration)

https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-not-getting-coalesced-to-smaller-number-of-files-tp25509.html
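One thing worth checking in the snippet as posted: `coalesce` does not modify an RDD in place. It returns a new RDD, and in that code the return value is discarded, so `saveAsNewAPIHadoopFile` writes `metricsToBeSaved` with its original number of partitions. This may not explain everything you see. The local-mode sketch below shows the difference, assuming spark-core is on the classpath; `CoalesceSketch` is a hypothetical name.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceSketch {
  // Returns (partitions of the original RDD, partitions of the coalesced RDD).
  def partitionCounts(): (Int, Int) = {
    val sc = new SparkContext(new SparkConf().setAppName("coalesce-sketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(1 to 100, 30)
      rdd.coalesce(15)                 // returns a new RDD; `rdd` itself still has 30 partitions
      val coalesced = rdd.coalesce(15) // keep the returned RDD and write this one instead
      (rdd.partitions.length, coalesced.partitions.length)
    } finally {
      sc.stop()
    }
  }
}
```

Applied to the posted code, this would mean calling `saveAsNewAPIHadoopFile` on `metricsToBeSaved.coalesce(1500)` rather than on `metricsToBeSaved`.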
Automatic driver restart does not seem to be working in Spark Standalone
Hi, I am submitting my Spark job with the supervise option, as shown below. When I kill the driver and the app from the UI, the driver does not restart automatically. This is in cluster mode. Any suggestion on how to make automatic driver restart work would be of great help.
--supervise
Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Automatic-driver-restart-does-not-seem-to-be-working-in-Spark-Standalone-tp25478.html
Does the receiver-based approach lose any data in case of a leader/broker loss in Spark Streaming?
Hi, Does the receiver-based approach lose any data in case of a leader/broker loss in Spark Streaming? We currently use Kafka Direct for Spark Streaming, and it seems to fail whenever there is a leader loss. We can't guarantee that there won't be any leader loss due to rebalancing. If we switch to the receiver-based approach, would it be able to handle that situation? Thanks, Swetha
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-receiver-based-approach-lose-any-data-in-case-of-a-leader-broker-loss-in-Spark-Streaming-tp25470.html