NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job
Hi,

Any reason why we might be getting this error? The code seems to work fine in the non-distributed mode but the same code when run from a Spark job is not able to get to Elastic.

Spark version: 2.0.1 built for Hadoop 2.4, Scala 2.11
Elastic version: 2.3.1

I've verified the Elastic hosts and the cluster name. The spot in the code where this happens is:

    ClusterHealthResponse clusterHealthResponse = client.admin().cluster()
        .prepareHealth()
        .setWaitForGreenStatus()
        .setTimeout(TimeValue.timeValueSeconds(10))
        .get();

Stack trace:

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:902)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:900)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:900)
        at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218)
        at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
        at com.myco.MyDriver$3.call(com.myco.MyDriver.java:214)
        at com.myco.MyDriver$3.call(KafkaSparkStreamingDriver.java:201)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: NoNodeAvailableException[None of the configured nodes are available:
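Since the failure only shows up when the code runs inside a Spark job, one thing that may be worth ruling out is whether the machines running the tasks can open a TCP connection to the Elastic nodes at all. The following is a minimal sketch of such a probe in plain Java. The class name NodeReachability, the timeout, and the idea of calling it from inside the partition function are assumptions for illustration, not something taken from the original post; the host and port to probe would be whatever the transport client is configured with.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper: checks plain TCP reachability from whichever JVM it runs in.
final class NodeReachability {

    private NodeReachability() {}

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Self-check: opens a throwaway local listener and probes it. */
    static boolean probeLocalListener() {
        try (ServerSocket server = new ServerSocket(0)) {
            return canConnect("127.0.0.1", server.getLocalPort(), 1000);
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Demo against a local listener; in a real job the configured Elastic
        // host and transport port would be probed from the executor side.
        System.out.println("local listener reachable: " + probeLocalListener());
    }
}
```

A successful probe only shows that the port is reachable; it says nothing about cluster name or client/server version mismatches, which would have to be checked separately.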
How to set the heap size on consumers?
Hi,

I'm seeing quite a bit of information on Spark memory management. I'm just trying to set the heap size, e.g. Xms as 512m and Xmx as 1g or some such.

Per http://apache-spark-user-list.1001560.n3.nabble.com/Use-of-SPARK-DAEMON-JAVA-OPTS-tt10479.html#a10529:

> SPARK_DAEMON_JAVA_OPTS is not intended for setting memory. Please use SPARK_DAEMON_MEMORY instead. It turns out that java respects only the last -Xms and -Xmx values, and in spark-class we put SPARK_DAEMON_JAVA_OPTS before the SPARK_DAEMON_MEMORY. In general, memory configuration in spark should not be done through any config or environment variable that references java opts.

Is this still applicable for Spark 1.3/1.4? Are there any plans to tackle https://issues.apache.org/jira/browse/SPARK-1264 ?

Thanks.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-the-heap-size-on-consumers-tp23810.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
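The quoted remark that only the last -Xms and -Xmx values take effect can be illustrated with a small sketch. This is plain Java that merely models the "last one wins" rule on a list of option strings; the class name HeapFlags and the sample option list are made up for illustration and are not part of Spark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper that models which of several repeated JVM flags wins.
final class HeapFlags {

    private HeapFlags() {}

    /** Returns the value of the last option that starts with the given flag, e.g. "-Xmx". */
    static Optional<String> lastValue(List<String> jvmOptions, String flag) {
        Optional<String> result = Optional.empty();
        for (String option : jvmOptions) {
            if (option.startsWith(flag)) {
                // A later occurrence overrides any earlier one.
                result = Optional.of(option.substring(flag.length()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed ordering: options from a JAVA_OPTS-style variable first, memory setting last.
        List<String> options = Arrays.asList("-Xms512m", "-Xmx1g", "-Xms2g", "-Xmx2g");
        System.out.println("effective -Xms: " + lastValue(options, "-Xms").orElse("(JVM default)"));
        System.out.println("effective -Xmx: " + lastValue(options, "-Xmx").orElse("(JVM default)"));
        // What the current JVM actually got can be checked at runtime:
        System.out.println("max heap of this JVM in bytes: " + Runtime.getRuntime().maxMemory());
    }
}
```

Logging Runtime.getRuntime().maxMemory() from inside a task is a simple way to confirm which setting actually reached the JVM in question.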
What is a best practice for passing environment variables to Spark workers?
I have about 20 environment variables to pass to my Spark workers. Even though they're in the init scripts on the Linux box, the workers don't see these variables. Does Spark do something to shield itself from what may be defined in the environment?

I see multiple pieces of info on how to pass the env vars into workers, and they seem dated and/or unclear.

Here: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tt5780.html

    SparkConf conf = new SparkConf();
    conf.set("spark.myapp.myproperty", propertyValue);

OR set them in spark-defaults.conf, as in

    spark.config.one value
    spark.config.two value2

In another posting, http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tt3180.html:

    conf.setExecutorEnv("ORACLE_HOME", myOraHome)
    conf.setExecutorEnv("SPARK_JAVA_OPTS", "-Djava.library.path=/my/custom/path")

The configuration guide talks about

> spark.executorEnv.[EnvironmentVariableName] -- Add the environment variable specified by EnvironmentVariableName to the Executor process. The user can specify multiple of these to set multiple environment variables.

Then there are mentions of SPARK_JAVA_OPTS, which seems to be deprecated (?)

What is the easiest/cleanest approach here? Ideally, I'd not want to burden my driver program with explicit knowledge of all the env vars that are needed on the worker side. I'd also like to avoid having to jam them into spark-defaults.conf since they're already set in the system init scripts, so why duplicate them?

I suppose one approach would be to namespace all my vars to start with a well-known prefix, then cycle through the env in the driver and stuff all these variables into the Spark context. If I'm doing that, would I want to

    conf.set("spark.myapp.myproperty", propertyValue);

and is the "spark." prefix necessary, or was that just part of the example? Or would I want to

    conf.setExecutorEnv("MYPREFIX_MY_VAR_1", "some-value");

Thanks.
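The prefix idea from the message above can be sketched without any Spark dependency: walk an environment map, keep the variables that carry the well-known prefix, and turn each into a spark.executorEnv.[EnvironmentVariableName] key as described in the quoted configuration guide text. The class name ExecutorEnvKeys and the prefix MYPREFIX_ are placeholders; how the resulting pairs are applied (for example via conf.set or conf.setExecutorEnv, both mentioned above) is left to the driver.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: derives spark.executorEnv.* settings from prefixed env vars.
final class ExecutorEnvKeys {

    private ExecutorEnvKeys() {}

    /** Maps every variable whose name starts with prefix to a spark.executorEnv.NAME key. */
    static Map<String, String> toExecutorEnvConf(Map<String, String> env, String prefix) {
        Map<String, String> conf = new TreeMap<>(); // sorted for stable output
        for (Map.Entry<String, String> entry : env.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                conf.put("spark.executorEnv." + entry.getKey(), entry.getValue());
            }
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>(System.getenv());
        env.put("MYPREFIX_MY_VAR_1", "some-value"); // sample entry so the demo prints something
        toExecutorEnvConf(env, "MYPREFIX_")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With this shape the driver only needs to know the prefix, not the individual variable names.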
Best practice for using singletons on workers (seems unanswered) ?
Hi,

I am seeing a lot of posts on singletons vs. broadcast variables, such as

* http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
* http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219

What's the best approach to instantiate an object once and have it be reused by the worker(s)? E.g. I have an object that loads some static state, such as a dictionary/map, is part of a 3rd party API, and is not serializable. I can't seem to get it to be a singleton on the worker side, as the JVM appears to be wiped on every request, so I get a new instance. So the singleton doesn't stick.

Is there an approach where I could have this object, or a wrapper of it, be a broadcast var? Can Kryo get me there? Would that basically mean writing a custom serializer? However, the 3rd party object may have a bunch of member vars hanging off it, so serializing it properly may be non-trivial...

Any pointers/hints greatly appreciated.
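For reference, the usual plain-Java way to get "instantiate once per JVM" is the initialization-on-demand holder idiom, sketched below with a small map standing in for the non-serializable third-party object. The names DictionaryHolder and LOADS are invented for this sketch. It only guarantees one instance per JVM and class loader, so it would not help if the worker JVM really is restarted between requests, as the message above suspects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: builds the expensive object at most once per JVM/class loader.
final class DictionaryHolder {

    /** Counts how often the expensive load actually ran (for demonstration only). */
    static final AtomicInteger LOADS = new AtomicInteger();

    private DictionaryHolder() {}

    // The JVM initializes this nested class lazily and thread-safely on first access.
    private static final class Lazy {
        static final Map<String, String> INSTANCE = load();
    }

    private static Map<String, String> load() {
        LOADS.incrementAndGet();
        Map<String, String> dictionary = new HashMap<>();
        dictionary.put("spark", "cluster computing engine"); // stand-in for real static state
        return dictionary;
    }

    /** Returns the shared instance, creating it on the first call. */
    static Map<String, String> get() {
        return Lazy.INSTANCE;
    }

    public static void main(String[] args) {
        Map<String, String> first = get();
        Map<String, String> second = get();
        System.out.println("same instance: " + (first == second));
        System.out.println("number of loads: " + LOADS.get());
    }
}
```

Because nothing is shipped from the driver, the wrapped object never has to be serializable; each JVM that calls get() builds its own copy.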
Any way to retrieve time of message arrival to Kafka topic, in Spark Streaming?
Is there any way to retrieve the time of each message's arrival into a Kafka topic, when streaming in Spark, whether with receiver-based or direct streaming? Thanks.
Re: Registering custom metrics
Hi Gerard, Have there been any responses? Any insights as to what you ended up doing to enable custom metrics? I'm thinking of implementing a custom metrics sink, not sure how doable that is yet... Thanks.
Re: Custom Metrics Sink
Hi, I was wondering if there've been any responses to this? Thanks.
The Initial job has not accepted any resources error; can't seem to set
Hi,

I'm running Spark Standalone on a single node with 16 cores. Master and 4 workers are running. I'm trying to submit two applications via spark-submit and am getting the following error when submitting the second one:

    Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources.

The Web UI shows the first job taking up all the cores. Have tried setting spark.deploy.defaultCores, or spark.cores.max, or both, at the value of 2:

    spark-submit \
      --conf spark.deploy.defaultCores=2 spark.cores.max=2 \
      ...

or

    spark-submit \
      --conf spark.deploy.defaultCores=2 \
      ...

This doesn't seem to get propagated. Or perhaps this is not the way to pass this in? Does spark.executor.cores play into this? I have it set to 2 in spark-defaults.conf.

Thanks.
Re: The Initial job has not accepted any resources error; can't seem to set
I just realized that each --conf takes exactly one key-value pair, so every setting needs its own --conf. And somehow I needed

    --conf spark.cores.max=2 \

However, when it was

    --conf spark.deploy.defaultCores=2 \

then one job would take up all 16 cores on the box.

What's the actual model here? We've got 10 apps we want to submit. These are apps that consume, directly, out of Kafka topics. Now with max=2 I'm lacking a few cores. What should the actual strategy be here? How do the parameters below affect this strategy and each other?

The docs say to set this (max) lower on a shared cluster to prevent users from grabbing the whole cluster by default. But why tie a consumer to 1 or 2 cores only? Isn't the idea to split RDDs into partitions and send them to multiple workers?

spark.cores.max
    Default: not set
    When running on a standalone deploy cluster or a Mesos cluster in coarse-grained sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos.

spark.executor.cores
    Default: 1 in YARN mode, all the available cores on the worker in standalone mode.
    The number of cores to use on each executor. For YARN and standalone mode only. In standalone mode, setting this parameter allows an application to run multiple executors on the same worker, provided that there are enough cores on that worker. Otherwise, only one executor per application will run on each worker.

spark.deploy.defaultCores
    Default: infinite
    Default number of cores to give to applications in Spark's standalone mode if they don't set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default.
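The "lacking a few cores" observation is plain arithmetic on the numbers given in this thread (16 cores, 10 applications, spark.cores.max=2), which a short sketch makes explicit. The class and method names below are invented for illustration, and the model is deliberately naive: it only compares the total cores requested with the cores available and does not try to reproduce how the standalone scheduler actually hands out cores.

```java
// Hypothetical back-of-the-envelope helper for sizing spark.cores.max.
final class CoreBudget {

    private CoreBudget() {}

    /** Largest per-application core cap that lets every application get its full cap at once. */
    static int maxCoresPerApp(int totalCores, int apps) {
        if (apps <= 0) {
            throw new IllegalArgumentException("apps must be positive");
        }
        return totalCores / apps;
    }

    /** Cores requested beyond what the cluster has, if every application asks for its cap. */
    static int shortfall(int totalCores, int apps, int coresMaxPerApp) {
        return Math.max(0, apps * coresMaxPerApp - totalCores);
    }

    public static void main(String[] args) {
        // Numbers from the thread: one 16-core box, 10 applications.
        System.out.println("shortfall with cap 2: " + shortfall(16, 10, 2));     // 10 * 2 - 16 = 4
        System.out.println("cap that fits all apps: " + maxCoresPerApp(16, 10)); // 16 / 10 = 1
    }
}
```

Under this simple model, ten applications capped at 2 cores ask for 20 cores in total, 4 more than the box has, while a cap of 1 fits all ten and leaves 6 cores idle.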
What happens when a streaming consumer job is killed then restarted?
I'd like to understand better what happens when a streaming consumer job (with direct streaming, but also with receiver-based streaming) is killed, is terminated, or crashes. Assuming it was processing a batch of RDD data, what happens when the job is restarted? How much state is maintained within Spark's checkpointing to allow for little or no data loss?

For the direct streaming case, would we need to update offsets in Zookeeper to achieve more fault tolerance?

I'm looking at https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html and it talks about the Write-Ahead Logs. Do they work with direct streaming?

With write-ahead logs in place, e.g. streaming from Kafka, where would a restarted consumer resume processing? E.g. it was processing Message# 25 out of 100 messages in the Kafka topic when it crashed or was terminated.

Thanks.
Custom Spark metrics?
I'm looking at the doc here: https://spark.apache.org/docs/latest/monitoring.html. Is there a way to define custom metrics in Spark, via Coda Hale perhaps, and emit those? Can a custom metrics sink be defined? And, can such a sink collect some metrics, execute some metrics handling logic, then invoke a callback and notify the Spark consumers that had emitted the metrics that that logic has been executed? Thanks.
What is Spark's data retention policy?
What is Spark's data retention policy? As in, the jobs that are sent from the master to the worker nodes, how long do they persist on those nodes? What about the RDD data, how is that cleaned up? Are all RDDs cleaned up at GC time unless they've been .persist()'ed or .cache()'ed?
Split RDD based on criteria
Hi,

I'm gathering that the typical approach for splitting an RDD is to apply several filters to it.

    rdd1 = rdd.filter(func1);
    rdd2 = rdd.filter(func2);
    ...

Is there/should there be a way to create 'buckets' like these in one go?

    List<RDD> rddList = rdd.filter(func1, func2, ..., funcN)

Another angle here is, when applying a filter(func), is there a way to get two RDDs back: one containing all the elements of the original RDD (the one being filtered) for which func returned true, and the other containing all the elements for which func returned false?

    PairRDD pair = rdd.filterTrueFalse(func);

Right now I'm doing

    RDD x = rdd.filter(func);
    RDD y = rdd.filter(reverseOfFunc);

This seems a bit tautological to me, though Spark must be optimizing this out (?)

Thanks.
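For comparison, the "two results from one predicate" shape that the message asks about exists for ordinary Java collections as Collectors.partitioningBy. The sketch below is a local-collection analogue only; it is not a Spark API and says nothing about how Spark evaluates two separate filter calls. The class name SplitByPredicate is invented for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Local-collection analogue of a hypothetical rdd.filterTrueFalse(func).
final class SplitByPredicate {

    private SplitByPredicate() {}

    /** Splits items in one pass: key true holds the matches, key false holds the rest. */
    static <T> Map<Boolean, List<T>> split(List<T> items, Predicate<? super T> predicate) {
        return items.stream().collect(Collectors.partitioningBy(predicate));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> parts = split(Arrays.asList(1, 2, 3, 4, 5), n -> n % 2 == 0);
        System.out.println("matching: " + parts.get(true));  // [2, 4]
        System.out.println("rest:     " + parts.get(false)); // [1, 3, 5]
    }
}
```

The N-bucket variant from the message would correspond to Collectors.groupingBy with a function that returns a bucket label instead of a boolean.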
How to share large resources like dictionaries while processing data with Spark ?
We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDDs? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks.
Objects serialized before foreachRDD/foreachPartition ?
I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically the takeaway is that all objects passed into the code processing RDDs must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDDs, I'd need to think twice about the costs of serializing such objects, it would seem.

In the below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition?

    final Param param = new Param();  // final so the anonymous class can capture it
    param.initialize();
    messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
            ProcessPartitionFunction func = new ProcessPartitionFunction(param);
            rdd.foreachPartition(func);
            return null;
        }
    });
    param.deinitialize();

If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction(), or perhaps in the 'call' method within that function?

I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs.

Thanks.
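One way to weigh initialization cost against serialization cost is to keep only small configuration in the serialized object and mark the heavy state transient, so that it is rebuilt lazily wherever the object ends up. The sketch below demonstrates that pattern with plain Java serialization; PartitionWorker, its fields, and the StringBuilder stand-in for the expensive resource are all invented for illustration, and the sketch does not claim anything about when Spark itself serializes closures.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical function object: small serialized config, heavy state rebuilt on demand.
final class PartitionWorker implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String configPath;        // small, travels with the object
    private transient StringBuilder heavy;  // stand-in for expensive state, never serialized

    PartitionWorker(String configPath) {
        this.configPath = configPath;
    }

    boolean isInitialized() {
        return heavy != null;
    }

    /** Builds the heavy resource on first use in the JVM where this copy lives. */
    StringBuilder resource() {
        if (heavy == null) {
            heavy = new StringBuilder("loaded from " + configPath);
        }
        return heavy;
    }

    /** Serializes and deserializes the worker in memory, as shipping it elsewhere would. */
    static PartitionWorker roundTrip(PartitionWorker worker) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(worker);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PartitionWorker) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PartitionWorker original = new PartitionWorker("dict.txt");
        original.resource(); // initialized on the sending side
        PartitionWorker copy = roundTrip(original);
        System.out.println("copy initialized after transfer: " + copy.isInitialized());
        System.out.println("copy resource: " + copy.resource());
    }
}
```

With this layout the expensive state is never part of the serialized payload; the price is that every deserialized copy pays the initialization cost once on first use.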
StreamingListener, anyone?
Hi,

I've got a Spark Streaming driver job implemented and in it, I register a streaming listener, like so:

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));
    jssc.addStreamingListener(new JobListener(jssc));

where JobListener is defined like so:

    private static class JobListener implements StreamingListener {
        private JavaStreamingContext jssc;

        JobListener(JavaStreamingContext jssc) {
            this.jssc = jssc;
        }

        @Override
        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
            System.out.println("Batch completed.");
            jssc.stop(true);
            System.out.println("The job has been stopped.");
        }
    }

I do not seem to be seeing onBatchCompleted being triggered. Am I doing something wrong?

In this particular case, I was trying to implement a bulk ingest type of logic where the first batch is all we're interested in (reading out of a Kafka topic with offset reset set to smallest).
Behavior of the spark.streaming.kafka.maxRatePerPartition config param?
Hi,

Could someone explain the behavior of the spark.streaming.kafka.maxRatePerPartition parameter? The doc says "An important (configuration) is spark.streaming.kafka.maxRatePerPartition which is the maximum rate at which each Kafka partition will be read by (the) direct API."

What is the default behavior for this parameter? From some testing it appears that with it not being set, the RDD size tends to be quite low. With it set, we're seeing the consumer picking items off the topic much more actively, e.g. -Dspark.streaming.kafka.maxRatePerPartition=1000 in --driver-java-options.

Does this parameter set the RDD size to a very low value? It seems to be defaulting to 0... but what's the effect of that?

    protected val maxMessagesPerPartition: Option[Long] = {
      val ratePerSec = context.sparkContext.getConf.getInt(
        "spark.streaming.kafka.maxRatePerPartition", 0)
      if (ratePerSec > 0) {
        val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
        Some((secsPerBatch * ratePerSec).toLong)
      } else {
        None
      }
    }

    // limits the maximum number of messages per partition
    protected def clamp(
        leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
      maxMessagesPerPartition.map { mmp =>
        leaderOffsets.map { case (tp, lo) =>
          tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
        }
      }.getOrElse(leaderOffsets)
    }

What would we limit by default? And once Spark Streaming does pick up messages, would it be at the maximum value? Does it ever fall below max even if there are max or more than max in the topic?

Thanks.
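The arithmetic in the Scala snippet quoted above can be traced with a small standalone Java port. This is a paraphrase of the quoted logic reduced to a single partition, not Spark code; the class name RateLimit and the sample numbers are made up. As written in the snippet, a rate of 0 (the default passed to getInt) yields no limit at all, so the batch extends to the leader's latest offset.

```java
import java.util.OptionalLong;

// Standalone port of the quoted maxMessagesPerPartition/clamp logic for one partition.
final class RateLimit {

    private RateLimit() {}

    /** Messages allowed per partition per batch, or empty when the rate is unset (<= 0). */
    static OptionalLong maxMessagesPerPartition(int ratePerSec, long batchMillis) {
        if (ratePerSec > 0) {
            double secsPerBatch = batchMillis / 1000.0;
            return OptionalLong.of((long) (secsPerBatch * ratePerSec));
        }
        return OptionalLong.empty();
    }

    /** Upper offset for the next batch: capped by the limit if present, else the leader offset. */
    static long clamp(long currentOffset, long leaderOffset, OptionalLong maxMessages) {
        return maxMessages.isPresent()
            ? Math.min(currentOffset + maxMessages.getAsLong(), leaderOffset)
            : leaderOffset;
    }

    public static void main(String[] args) {
        OptionalLong limited = maxMessagesPerPartition(1000, 2000); // 1000 msg/s, 2 s batches
        OptionalLong unlimited = maxMessagesPerPartition(0, 2000);  // default of 0
        System.out.println(clamp(100, 10_000, limited));   // min(100 + 2000, 10000) = 2100
        System.out.println(clamp(100, 10_000, unlimited)); // no limit: 10000
        System.out.println(clamp(100, 500, limited));      // fewer messages than the cap: 500
    }
}
```

The third call shows that a batch falls below the maximum only when the partition holds fewer new messages than the cap allows.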
How to monitor Spark Streaming from Kafka?
Hi,

What are some of the good/adopted approaches to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that receiver-based streaming is used?

Then the doc says: "Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself."

The code sample, however, seems sparse. What do you need to do here?

    directKafkaStream.foreachRDD(
        new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) throws IOException {
                // The cast applies to the underlying RDD, hence rdd.rdd()
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // offsetRanges.length = # of Kafka partitions being consumed
                ...
                return null;
            }
        }
    );

And if these are updated, will KafkaOffsetMonitor work?

Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap?

Thanks
Spark and logging
I'm wondering how logging works in Spark. I see that there's the log4j.properties.template file in the conf directory. Is it safe to assume Spark is using log4j 1? What's the approach if we're using log4j 2? I've got a log4j2.xml file in the job jar which seems to be working for my log statements, but Spark's logging seems to be taking its own default route despite me setting Spark's log level to 'warn' only.

More interestingly, what happens if file-based loggers are at play? If a log statement is in the driver program, I assume it'll get logged into a log file that's collocated with the driver. What about log statements in the partition processing functions? Will their log statements get logged into a file residing on a given 'slave' machine, or will Spark capture this log output and divert it into the log file of the driver's machine?

Thanks.
Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?
Hi, With the no receivers approach to streaming from Kafka, is there a way to set something like spark.streaming.receiver.maxRate so as not to overwhelm the Spark consumers? What would be some of the ways to throttle the streamed messages so that the consumers don't run out of memory?
Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Hi,

I'm trying to understand if there are design patterns for autoscaling Spark (adding/removing slave machines to/from the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we stream data from would start growing. What are some of the ways to generate the metrics on the number of new messages and the rate at which they are piling up? This perhaps is more of a Kafka question; I see a pretty sparse javadoc with the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has mentioned Mesos...

I see some info on Spark metrics in the Spark monitoring guide https://spark.apache.org/docs/latest/monitoring.html. Do we want to perhaps implement a custom sink that would help us autoscale up or down based on the throughput?
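Whatever the source of the numbers, the "rate they are piling up" part reduces to sampling the backlog (latest offset in the topic minus the offset already consumed) at two points in time and dividing the difference by the elapsed time. The sketch below shows only that arithmetic plus a naive scale-up/scale-down decision; the class name, the thresholds, and the +1/-1 worker step are hypothetical, and obtaining the offsets from Kafka or from Spark metrics is outside its scope.

```java
// Hypothetical backlog arithmetic for an autoscaling decision.
final class BacklogTrend {

    private BacklogTrend() {}

    /** Messages waiting in a partition: latest offset minus consumed offset, never negative. */
    static long backlog(long latestOffset, long consumedOffset) {
        return Math.max(0, latestOffset - consumedOffset);
    }

    /** Backlog growth in messages per second between two samples (negative when draining). */
    static double growthPerSecond(long backlogBefore, long backlogAfter, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return (backlogAfter - backlogBefore) * 1000.0 / elapsedMillis;
    }

    /** Returns +1 to add a worker, -1 to remove one, 0 to leave the cluster alone. */
    static int workerDelta(double growthPerSecond, double scaleUpAbove, double scaleDownBelow) {
        if (growthPerSecond > scaleUpAbove) {
            return 1;
        }
        if (growthPerSecond < scaleDownBelow) {
            return -1;
        }
        return 0;
    }

    public static void main(String[] args) {
        long before = backlog(11_000, 10_000); // 1000 messages waiting
        long after = backlog(19_000, 15_000);  // 4000 messages waiting one minute later
        double growth = growthPerSecond(before, after, 60_000); // (4000 - 1000) / 60 s = 50.0
        System.out.println("growth per second: " + growth);
        System.out.println("worker delta: " + workerDelta(growth, 10.0, -10.0)); // 1
    }
}
```

In practice the growth rate would probably need smoothing over several samples before acting on it, so that a single burst does not trigger a resize.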
Pipelining with Spark
From the performance and scalability standpoint, is it better to plug, say, a multi-threaded pipeliner into a Spark job, or to implement pipelining via Spark's own transformation mechanisms such as map or filter? I'm seeing some reference architectures where things like 'morphlines' are plugged into Spark, but it'd seem that Spark may yield better performance and scalability if each stage within a pipeline is a function in a Spark job - ?
Spark Streaming and reducing latency
I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals, i.e. the consumer has to wait out the interval to be able to get at the new items. If one wants to reduce latency, it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment.

Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level?

If latency is of great concern, is it better to look into streaming from something like Flume, where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data?
How to speed up data ingestion with Spark
Hi,

I'm looking at a data ingestion implementation which streams data out of Kafka with Spark Streaming, then uses a multi-threaded pipeline engine to process the data in each partition. Have folks looked at ways of speeding up this type of ingestion?

Let's say the main part of the ingest process is fetching documents from somewhere and performing text extraction on them. Is this type of processing best done by expressing the pipelining with Spark RDD transformations or by just kicking off a multi-threaded pipeline?

Or, is using a multi-threaded pipeliner per partition a decent strategy, with the performance coming from running in a clustered mode?
Spark and RabbitMQ
Are there existing or under development versions/modules for streaming messages out of RabbitMQ with SparkStreaming, or perhaps a RabbitMQ RDD?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-RabbitMQ-tp22852.html
How to stream all data out of a Kafka topic once, then terminate job?
Hi, I'm wondering about the use case where you're not doing continuous, incremental streaming of data out of Kafka but rather want to publish data once with your Producer(s), consume it once in your Consumer, and then terminate the consumer Spark job.

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(...));

The batchDuration parameter is "the time interval at which streaming data will be divided into batches". Can this be worked somehow to cause Spark Streaming to just get all the available data, let all the RDDs within the Kafka discretized stream get processed, and then be done and terminate, rather than wait another period and try to process any more data from Kafka?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html
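One way to picture "consume once, then terminate" is as a bounded read: note where each partition ends when the job starts, read exactly up to those offsets, and stop. The sketch below only illustrates that idea in plain Java with an in-memory list standing in for a topic; `BoundedReadSketch`, `Range`, `snapshotRanges` and `readOnce` are made-up names, not Spark or Kafka APIs. In Spark itself, I believe the Kafka integration in 1.3 offers a batch-style `KafkaUtils.createRDD` that takes explicit `OffsetRange`s, which looks like the closest fit, but please check the docs for your version.

```java
import java.util.ArrayList;
import java.util.List;

final class BoundedReadSketch {

    /** Hypothetical slice of one partition: offsets [from, until). */
    static final class Range {
        final int partition;
        final int from;
        final int until;

        Range(int partition, int from, int until) {
            this.partition = partition;
            this.from = from;
            this.until = until;
        }
    }

    /** Records the current end of every partition; later appends fall outside these ranges. */
    static List<Range> snapshotRanges(List<List<String>> topic) {
        List<Range> ranges = new ArrayList<>();
        for (int p = 0; p < topic.size(); p++) {
            ranges.add(new Range(p, 0, topic.get(p).size()));
        }
        return ranges;
    }

    /** Reads exactly the snapshotted ranges once and returns; there is no polling loop. */
    static List<String> readOnce(List<List<String>> topic, List<Range> ranges) {
        List<String> out = new ArrayList<>();
        for (Range r : ranges) {
            out.addAll(topic.get(r.partition).subList(r.from, r.until));
        }
        return out;
    }
}
```

Because the ranges are fixed up front, anything published after the snapshot is simply not part of the job, so there is no batch interval to wait out before terminating.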
Apache Spark User List: people's responses not showing in the browser view
Sorry if this is a total noob question but is there a reason why I'm only seeing folks' responses to my posts in emails but not in the browser view under apache-spark-user-list.1001560.n3.nabble.com? Is this a matter of setting your preferences such that your responses only go to email and never to the browser-based view of the list? I don't seem to see such a preference...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-User-List-people-s-responses-not-showing-in-the-browser-view-tp22135.html
Spark and Morphlines, parallelization, multithreading
Still a Spark noob grappling with the concepts... I'm trying to grok the idea of integrating something like the Morphlines pipelining library with Spark (or Spark Streaming). The Kite/Morphlines doc states that the "runtime executes all commands of a given morphline in the same thread... there are no queues, no handoffs among threads, no context switches and no serialization between commands, which minimizes performance overheads." Further: "There is no need for a morphline to manage multiple processes, nodes, or threads because this is already addressed by host systems such as MapReduce, Flume, Spark or Storm."

My question is, how exactly does Spark manage the parallelization and multi-threading aspects of RDD processing? As I understand it, each collection of data is split into partitions and each partition is sent over to a slave machine to perform computations. So, for each data partition, how many processes are created? And for each process, how many threads? Knowing that would help me understand how to structure the following:

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);
    JavaDStream<String> messageBodies = messages.map(
        new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

Would I want to create a morphline in a 'messages.foreachRDD' block, then invoke the morphline on each messageBody? What will Spark be doing behind the scenes as far as multiple processes and multiple threads? Should I rely on it to optimize performance with multiple threads and not worry about plugging in a multi-threaded pipelining engine? Thanks.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Morphlines-parallelization-multithreading-tp22134.html
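As far as I understand Spark's execution model, each partition becomes one task, and an executor JVM runs its tasks on a thread pool, with roughly as many tasks in flight as cores assigned to that executor. Within a task, the records of the partition are processed sequentially by a single thread, which matches the single-threaded assumption quoted from the Morphlines doc. The plain-Java sketch below only imitates that arrangement with an `ExecutorService`; `PartitionThreadsSketch`, `runPartitions` and the `cores` parameter are illustrative names, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class PartitionThreadsSketch {

    /**
     * Runs one task per partition on a pool of `cores` threads.
     * Each task applies the single-threaded pipeline to its records in order.
     */
    static List<String> runPartitions(List<List<String>> partitions,
                                      int cores,
                                      Function<String, String> pipeline) {
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (List<String> partition : partitions) {
                // One task per partition; the loop body runs on a single pool thread.
                pending.add(pool.submit(() -> {
                    List<String> out = new ArrayList<>();
                    for (String record : partition) {
                        out.add(pipeline.apply(record));
                    }
                    return out;
                }));
            }
            // Collect results in partition order.
            List<String> results = new ArrayList<>();
            for (Future<List<String>> f : pending) {
                results.addAll(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

If that picture is right, parallelism comes from the number of partitions and cores rather than from threads inside the pipeline, so a single-threaded pipeline invoked once per partition (for example inside `mapPartitions` or `foreachPartition`) would seem a natural fit.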
NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr
I'm using SolrJ in a Spark program. When I try to send the docs to Solr, I get the NotSerializableException on the DefaultHttpClient. Is there a possible fix or workaround? I'm using Spark 1.2.1 with Hadoop 2.4; SolrJ is version 4.0.0.

    final HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL);
    ...
    JavaRDD<SolrInputDocument> solrDocs = rdd.map(new Function<Row, SolrInputDocument>() {
        public SolrInputDocument call(Row r) {
            return r.toSolrDocument();
        }
    });
    solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
        public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception {
            List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();
            while (solrDocIterator.hasNext()) {
                SolrInputDocument inputDoc = solrDocIterator.next();
                batch.add(inputDoc);
                if (batch.size() >= batchSize) {
                    Utils.sendBatchToSolr(solrServer, solrCollection, batch);
                }
            }
            if (!batch.isEmpty()) {
                Utils.sendBatchToSolr(solrServer, solrCollection, batch);
            }
        }
    });

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:789)
        at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:195)
        at org.apache.spark.api.java.JavaRDD.foreachPartition(JavaRDD.scala:32)
        at com.kona.motivis.spark.proto.SparkProto.execute(SparkProto.java:158)
        at com.kona.motivis.spark.proto.SparkProto.main(SparkProto.java:186)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: org.apache.http.impl.client.DefaultHttpClient
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-org-apache-http-impl-client-DefaultHttpClient-when-trying-to-send-documentsr-tp21713.html
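The trace suggests that the anonymous function passed to foreachPartition captures solrServer, which was built on the driver and holds a DefaultHttpClient, so serializing the function fails. A common workaround is to capture only serializable values (such as the URL string) and construct the client inside the function, once per partition. The sketch below reproduces the effect in plain Java without Spark or SolrJ on the classpath: `FakeClient` is a hypothetical stand-in for HttpSolrServer, `PartitionTask` mimics the shape of Spark's VoidFunction, and `isSerializable` imitates the kind of check that ClosureCleaner performs. None of these names are real Spark or SolrJ APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;

final class ClosureSerializationSketch {

    /** Hypothetical stand-in for a client such as HttpSolrServer: not Serializable. */
    static final class FakeClient {
        final String url;

        FakeClient(String url) {
            this.url = url;
        }

        void send(String doc) {
            // A real client would issue an HTTP request here.
        }
    }

    /** A function object that must be serializable to be shipped to workers. */
    interface PartitionTask extends Serializable {
        void call(Iterator<String> docs);
    }

    /** Tries Java serialization on the task, as a stand-in for Spark's closure check. */
    static boolean isSerializable(Object task) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }

    /** Problem case: the client is created outside and captured by the closure. */
    static PartitionTask capturingClient(String url) {
        final FakeClient client = new FakeClient(url);
        return docs -> {
            while (docs.hasNext()) {
                client.send(docs.next());
            }
        };
    }

    /** Workaround: capture only the URL string and build the client inside the task. */
    static PartitionTask creatingClientInside(String url) {
        return docs -> {
            FakeClient client = new FakeClient(url); // one client per partition
            while (docs.hasNext()) {
                client.send(docs.next());
            }
        };
    }
}
```

Applied to the code above, this would mean moving `new HttpSolrServer(SOLR_SERVER_URL)` into the body of `call(...)`, assuming SOLR_SERVER_URL is a plain String constant.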
Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
I'm getting the error below when running spark-submit on my class. This class has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ 4.10.3 from within the class. This conflicts with the older version, HttpClient 3.1, that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, and also setting it to true in $SPARK_HOME/conf/spark-defaults.conf as

    spark.files.userClassPathFirst true

No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders?

    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
        at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
        at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
        at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
        ...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
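When chasing a conflict like this, it can help to confirm which jar actually supplied the class named in the NoSuchMethodError. Since the trace starts with "Exception in thread main", the failure appears to happen in the driver JVM, so that is where the check would go. The helper below is a small diagnostic sketch using only standard JDK calls; `ClassOriginSketch` and `originOf` are made-up names.

```java
import java.security.CodeSource;

final class ClassOriginSketch {

    /** Returns the jar or directory a class was loaded from, or a marker for JDK classes. */
    static String originOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        return (source == null || source.getLocation() == null)
                ? "(bootstrap class path)"
                : source.getLocation().toString();
    }

    /** Same lookup by class name; reports load failures instead of throwing. */
    static String originOf(String className) {
        try {
            return originOf(Class.forName(className));
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }
}
```

For example, printing `ClassOriginSketch.originOf("org.apache.http.impl.conn.SchemeRegistryFactory")` at the start of the program should show whether the class comes from the application jar or from a jar bundled with Spark or Hadoop.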