NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job

2017-02-07 Thread dgoldenberg
Hi,

Any reason why we might be getting this error? The code works fine in
non-distributed mode, but the same code, when run from a Spark job, is not
able to reach Elastic.

Spark version: 2.0.1 built for Hadoop 2.4, Scala 2.11
Elastic version: 2.3.1

I've verified the Elastic hosts and the cluster name.

The spot in the code where this happens is:

ClusterHealthResponse clusterHealthResponse = client.admin().cluster()
.prepareHealth()
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(10))
.get();
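
For reference, when running inside the Spark job I'm planning to build the client
on the executors (per partition) rather than ship it from the driver. A rough sketch
of what I mean, with placeholder host/cluster values (the things I'm double-checking
are the cluster.name, the transport port 9300 vs. the 9200 REST port, and that the
client jar version matches the 2.3.1 cluster):

import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

// Built inside foreachPartition / mapPartitions, once per partition, so the client
// is created on the executor and never serialized with the closure.
Settings settings = Settings.settingsBuilder()
    .put("cluster.name", "my-es-cluster")             // placeholder; must match the server-side name
    .put("client.transport.sniff", false)             // sniffing can fail if nodes publish unreachable addresses
    .build();
TransportClient client = TransportClient.builder().settings(settings).build()
    .addTransportAddress(new InetSocketTransportAddress(
        InetAddress.getByName("es-host-1"), 9300));    // placeholder host; transport port, not 9200
try {
    // ... health check / bulk indexing for this partition ...
} finally {
    client.close();
}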

Stack trace:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:900)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:900)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
at com.myco.MyDriver$3.call(com.myco.MyDriver.java:214)
at com.myco.MyDriver$3.call(KafkaSparkStreamingDriver.java:201)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: NoNodeAvailableException[None of the configured nodes are available: 

How to set the heap size on consumers?

2015-07-13 Thread dgoldenberg
Hi,

I'm seeing quite a bit of information on Spark memory management. I'm just
trying to set the heap size, e.g. -Xms at 512m and -Xmx at 1g, or some such.

Per
http://apache-spark-user-list.1001560.n3.nabble.com/Use-of-SPARK-DAEMON-JAVA-OPTS-tt10479.html#a10529:

"SPARK_DAEMON_JAVA_OPTS is not intended for setting memory. Please use
SPARK_DAEMON_MEMORY instead. It turns out that Java respects only the last
-Xms and -Xmx values, and in spark-class we put SPARK_DAEMON_JAVA_OPTS
before the SPARK_DAEMON_MEMORY. In general, memory configuration in Spark
should not be done through any config or environment variable that
references java opts."

Is this still applicable for Spark 1.3/1.4?

Are there any plans to tackle
https://issues.apache.org/jira/browse/SPARK-1264 ?
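
For reference, this is what I'm trying on the application side (a sketch; the app name
and sizes are made up). My understanding is that the executor heap is governed by
spark.executor.memory, while spark.driver.memory has to be set before the driver JVM
starts, i.e. via spark-submit or spark-defaults.conf rather than from SparkConf:

import org.apache.spark.SparkConf;

public class HeapSettingsSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("my-consumer")                 // made-up app name
                .set("spark.executor.memory", "1g");       // becomes -Xmx for each executor JVM
        // spark.driver.memory cannot be set here reliably -- by the time this code runs,
        // the driver JVM already has its heap; pass --driver-memory to spark-submit instead.
        System.out.println(conf.toDebugString());
    }
}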

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-the-heap-size-on-consumers-tp23810.html



What is a best practice for passing environment variables to Spark workers?

2015-07-09 Thread dgoldenberg
I have about 20 environment variables to pass to my Spark workers. Even
though they're in the init scripts on the Linux box, the workers don't see
these variables.

Does Spark do something to shield itself from what may be defined in the
environment?

I see multiple pieces of info on how to pass the env vars into workers and
they seem dated and/or unclear.

Here:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tt5780.html

SparkConf conf = new SparkConf();
conf.set("spark.myapp.myproperty", propertyValue);
OR
set them in spark-defaults.conf, as in
spark.config.one value
spark.config.two value2

In another posting,
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tt3180.html:
conf.setExecutorEnv("ORACLE_HOME", myOraHome)
conf.setExecutorEnv("SPARK_JAVA_OPTS", "-Djava.library.path=/my/custom/path")

The configuration guide talks about spark.executorEnv.[EnvironmentVariableName]:
"Add the environment variable specified by EnvironmentVariableName to the Executor
process. The user can specify multiple of these to set multiple environment
variables."

Then there are mentions of SPARK_JAVA_OPTS which seems to be deprecated (?)

What is the easiest/cleanest approach here?  Ideally, I'd not want to burden
my driver program with explicit knowledge of all the env vars that are
needed on the worker side.  I'd also like to avoid having to jam them into
spark-defaults.conf since they're already set in the system init scripts, so
why duplicate.

I suppose one approach would be to namespace all my vars to start with a
well-known prefix, then cycle through the env in the driver and stuff all
these variables into the Spark context.  If I'm doing that, would I want to 

conf.set("spark.myapp.myproperty", propertyValue);

and is the "spark." prefix necessary, or was that just part of the example?

or would I want to

conf.setExecutorEnv("MYPREFIX_MY_VAR_1", "some-value");
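
In other words, something along these lines (a sketch; the prefix and variable names
are made up):

import java.util.Map;

import org.apache.spark.SparkConf;

public class ExecutorEnvSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("env-var-sketch");   // made-up app name
        // Forward every MYPREFIX_* variable from the driver's environment to the executors
        // via setExecutorEnv (i.e. spark.executorEnv.<name>), without hard-coding each one.
        for (Map.Entry<String, String> e : System.getenv().entrySet()) {
            if (e.getKey().startsWith("MYPREFIX_")) {
                conf.setExecutorEnv(e.getKey(), e.getValue());
            }
        }
        System.out.println(conf.toDebugString());
    }
}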

Thanks.







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-best-practice-for-passing-environment-variables-to-Spark-workers-tp23751.html



Best practice for using singletons on workers (seems unanswered) ?

2015-07-07 Thread dgoldenberg
Hi,

I am seeing a lot of posts on singletons vs. broadcast variables, such as
*
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
*
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219

What's the best approach to instantiate an object once and have it be reused
by the worker(s)?

E.g., I have an object that loads some static state, such as a dictionary/map,
is part of a 3rd-party API, and is not serializable.  I can't seem to get it to
be a singleton on the worker side, as the JVM appears to be wiped on every
request, so I get a new instance and the singleton doesn't stick.

Is there an approach where I could have this object or a wrapper of it be a
broadcast var? Can Kryo get me there? would that basically mean writing a
custom serializer?  However, the 3rd party object may have a bunch of member
vars hanging off it, so serializing it properly may be non-trivial...
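
For reference, the kind of per-JVM holder I have been trying looks roughly like this
(a sketch; ThirdPartyDict and its load method are made-up stand-ins for the 3rd-party API):

// Static state is never serialized with closures; each executor JVM initializes it
// lazily, once, the first time a task on that JVM touches it.
public class DictHolder {
    private static ThirdPartyDict instance;            // hypothetical non-serializable 3rd-party type

    public static synchronized ThirdPartyDict get() {
        if (instance == null) {
            instance = ThirdPartyDict.load("/opt/myapp/dict.bin");   // made-up, expensive init
        }
        return instance;
    }
}

// Inside a map/foreachPartition function running on the executor:
//     String normalized = DictHolder.get().lookup(token);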

Any pointers/hints greatly appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html



Any way to retrieve time of message arrival to Kafka topic, in Spark Streaming?

2015-06-22 Thread dgoldenberg
Is there any way to retrieve the time of each message's arrival into a Kafka
topic, when streaming in Spark, whether with receiver-based or direct
streaming?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Any-way-to-retrieve-time-of-message-arrival-to-Kafka-topic-in-Spark-Streaming-tp23442.html



Re: Registering custom metrics

2015-06-22 Thread dgoldenberg
Hi Gerard,

Have there been any responses? Any insights as to what you ended up doing to
enable custom metrics? I'm thinking of implementing a custom metrics sink,
not sure how doable that is yet...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Registering-custom-metrics-tp17765p23426.html



Re: Custom Metrics Sink

2015-06-22 Thread dgoldenberg
Hi,

I was wondering if there've been any responses to this?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Metrics-Sink-tp10068p23425.html



The Initial job has not accepted any resources error; can't seem to set

2015-06-18 Thread dgoldenberg
Hi,

I'm running Spark Standalone on a single node with 16 cores. Master and 4
workers are running.

I'm trying to submit two applications via spark-submit and am getting the
following error when submitting the second one: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources.

The Web UI shows the first job taking up all the cores. 

Have tried setting spark.deploy.defaultCores, or spark.cores.max, or both,
at the value of 2:
spark-submit \
--conf spark.deploy.defaultCores=2 spark.cores.max=2 \
...
or
spark-submit \
--conf spark.deploy.defaultCores=2 \
...
This doesn't seem to get propagated. Or perhaps this is not the way to pass
this in?

Does spark.executor.cores play into this? I have it set to 2 in
spark-defaults.conf.

Thanks.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/The-Initial-job-has-not-accepted-any-resources-error-can-t-seem-to-set-tp23398.html



Re: The Initial job has not accepted any resources error; can't seem to set

2015-06-18 Thread dgoldenberg
I just realized that --conf takes one key=value pair per --conf flag. And
somehow I needed
--conf spark.cores.max=2 \

However, when it was 
--conf spark.deploy.defaultCores=2 \

then one job would take up all 16 cores on the box.

What's the actual model here?

We've got 10 apps we want to submit. These are apps that consume, directly,
out of Kafka topics. Now with max=2 I'm lacking a few cores. What should the
actual strategy be here?

How do the below parameters affect this strategy and each other?

The docs say to "set this (max) lower on a shared cluster to prevent users from
grabbing the whole cluster by default."  But why tie a consumer to 1 or 2 cores
only? Isn't the idea to split RDDs into partitions and send them to multiple
workers?

spark.cores.max
Default=not set
When running on a standalone deploy cluster or a Mesos cluster in
coarse-grained sharing mode,
the maximum amount of CPU cores to request for the application from across
the cluster
(not from each machine). If not set, the default will be
spark.deploy.defaultCores on
Spark's standalone cluster manager, or infinite (all available cores) on
Mesos.

spark.executor.cores
Default=1 in YARN mode, all the available cores on the worker in standalone
mode.
The number of cores to use on each executor. For YARN and standalone mode
only. In standalone mode,
setting this parameter allows an application to run multiple executors on
the same worker,
provided that there are enough cores on that worker. Otherwise, only one
executor per application
will run on each worker.

spark.deploy.defaultCores
Default=infinite
Default number of cores to give to applications in Spark's standalone mode
if they don't set
spark.cores.max. If not set, applications always get all available cores
unless they configure
spark.cores.max themselves. Set this lower on a shared cluster to prevent
users from grabbing
the whole cluster by default. 
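
To make the numbers concrete, here is the back-of-the-envelope I'm working from,
expressed as a per-app config sketch (the numbers and app name are mine, not from
the docs):

import org.apache.spark.SparkConf;

public class CoresMathSketch {
    public static void main(String[] args) {
        // 16-core standalone box, 10 streaming apps (my numbers):
        //   spark.cores.max=1 per app -> 10 cores used, 6 spare
        //   spark.cores.max=2 per app -> 20 cores needed; the last two apps would wait with
        //   "Initial job has not accepted any resources" until cores free up.
        SparkConf conf = new SparkConf()
                .setAppName("kafka-consumer-1")        // made-up app name
                .set("spark.cores.max", "1")           // total cores this app may take, cluster-wide
                .set("spark.executor.cores", "1");     // cores per executor on a worker
        System.out.println(conf.toDebugString());
    }
}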




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/The-Initial-job-has-not-accepted-any-resources-error-can-t-seem-to-set-tp23398p23399.html



What happens when a streaming consumer job is killed then restarted?

2015-06-16 Thread dgoldenberg
I'd like to understand better what happens when a streaming consumer job
(with direct streaming, but also with receiver-based streaming) is
killed/terminated/crashes.

Assuming it was processing a batch of RDD data, what happens when the job is
restarted?  How much state is maintained within Spark's checkpointing to
allow for little or no data loss?

For the direct streaming case, would we need to update offsets in Zookeeper
to achieve more fault tolerance?

I'm looking at
https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
and it talks about the Write-Ahead Logs. Do they work with direct streaming?

With write ahead logs in place, e.g. streaming from Kafka, where would a
restarted consumer resume processing?  E.g. it was processing Message# 25
out of 100 messages in the Kafka topic when it crashed or was terminated.
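
For context, the restart path I'm experimenting with looks roughly like this (a sketch;
the checkpoint directory and app/master settings are placeholders). As far as I can tell,
the direct approach keeps its Kafka offsets in the checkpoint rather than in a WAL:

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class CheckpointedDriverSketch {
    public static void main(String[] args) {
        final String checkpointDir = "hdfs:///tmp/my-checkpoints";   // made-up path

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "restart-sketch",
                        Durations.seconds(5));
                // ... set up the direct Kafka stream and the processing graph here ...
                jssc.checkpoint(checkpointDir);
                return jssc;
            }
        };

        // On a clean start this calls create(); after a crash/kill it rebuilds the context
        // (including unfinished batches and, for the direct approach, the Kafka offsets)
        // from the checkpoint data.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
        jssc.start();
        jssc.awaitTermination();
    }
}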

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-happens-when-a-streaming-consumer-job-is-killed-then-restarted-tp23348.html



Custom Spark metrics?

2015-06-16 Thread dgoldenberg
I'm looking at the doc here:
https://spark.apache.org/docs/latest/monitoring.html.

Is there a way to define custom metrics in Spark, via Coda Hale perhaps, and
emit those?

Can a custom metrics sink be defined?

And, can such a sink collect some metrics, execute some metrics handling
logic, then invoke a callback and notify the Spark consumers that had
emitted the metrics that that logic has been executed?
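
As a stopgap, I have been sketching something that bypasses Spark's MetricsSystem
entirely and just uses the Coda Hale library directly on the executors (metric names
are made up; a proper Sink wired through metrics.properties would be the more
integrated route):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class CustomMetricsSketch {
    // One registry per executor JVM; names are made up.
    private static final MetricRegistry REGISTRY = new MetricRegistry();
    private static final Meter DOCS_PROCESSED = REGISTRY.meter("myapp.docs.processed");

    public static void start() {
        // A Coda Hale reporter running alongside Spark's own metrics.
        ConsoleReporter.forRegistry(REGISTRY)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build()
                .start(30, TimeUnit.SECONDS);
    }

    public static void markProcessed(long count) {
        DOCS_PROCESSED.mark(count);
    }
}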

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Spark-metrics-tp23350.html



What is Spark's data retention policy?

2015-06-16 Thread dgoldenberg
What is Spark's data retention policy?

As in, the jobs that are sent from the master to the worker nodes, how long
do they persist on those nodes?  What about the RDD data, how is that
cleaned up? Are all RDD's cleaned up at GC time unless they've been
.persist()'ed or .cache()'ed?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-Spark-s-data-retention-policy-tp23349.html



Split RDD based on criteria

2015-06-10 Thread dgoldenberg
Hi,

I'm gathering that the typical approach for splitting an RDD is to apply
several filters to it.

rdd1 = rdd.filter(func1);
rdd2 = rdd.filter(func2);
...

Is there/should there be a way to create 'buckets' like these in one go?

List<RDD> rddList = rdd.filter(func1, func2, ..., funcN)

Another angle here is, when applying a filter(func), is there a way to get
two RDD's back, one for which func returned true for all elements of the
original RDD (the one being filtered), and the other one for which func
returned false for all the elements?

PairRDD pair = rdd.filterTrueFalse(func);

Right now I'm doing

RDD x = rdd.filter(func);
RDD y = rdd.filter(reverseOfFunc);

This seems a bit tautological to me, though Spark must be optimizing this
out (?)
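
A single-pass workaround I've been considering is to tag each element with the
predicate result once, cache the tagged RDD, and then filter cheaply by the tag --
roughly this (a sketch with Java 8 lambdas and made-up data):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SplitRddSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("split-sketch").setMaster("local[2]"));
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        // Evaluate the predicate once per element and cache the tagged RDD;
        // the two filters are then cheap scans over the cached data.
        JavaPairRDD<Boolean, Integer> tagged = rdd
                .mapToPair(x -> new Tuple2<>(x % 2 == 0, x))
                .cache();
        JavaRDD<Integer> evens = tagged.filter(t -> t._1()).values();
        JavaRDD<Integer> odds  = tagged.filter(t -> !t._1()).values();

        System.out.println(evens.collect() + " / " + odds.collect());
        jsc.stop();
    }
}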

Thanks.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Split-RDD-based-on-criteria-tp23254.html



How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread dgoldenberg
We have some pipelines defined where sometimes we need to load potentially
large resources such as dictionaries.

What would be the best strategy for sharing such resources among the
transformations/actions within a consumer?  Can they be shared somehow
across the RDD's?

I'm looking for a way to load such a resource once into the cluster memory
and have it be available throughout the lifecycle of a consumer...
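
The closest thing I've found so far is a broadcast variable, along these lines (a sketch;
the loading logic is elided and the types are made up). It only works if the dictionary
itself is serializable; for non-serializable 3rd-party state, the per-JVM singleton
approach from my other thread seems to be the alternative:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class DictionaryBroadcastSketch {
    public static Broadcast<Map<String, String>> broadcastDictionary(JavaSparkContext jsc) {
        Map<String, String> dict = new HashMap<String, String>();
        // ... load the potentially large dictionary once, on the driver ...
        // Each executor then keeps a single deserialized copy in memory, shared by all
        // tasks and all RDDs for the lifetime of the application.
        return jsc.broadcast(dict);
    }
}

// In a transformation: dictBroadcast.value().get(key);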

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html



Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread dgoldenberg
I'm looking at https://spark.apache.org/docs/latest/tuning.html.  Basically
the takeaway is that all objects passed into the code processing RDD's must
be serializable. So if I've got a few objects that I'd rather initialize
once and deinitialize once outside of the logic processing the RDD's, I'd
need to think twice about the costs of serializing such objects, it would
seem.

In the below, does the Spark serialization happen before calling foreachRDD
or before calling foreachPartition?

Param param = new Param();
param.initialize();
messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
ProcessPartitionFunction func = new ProcessPartitionFunction(param);
rdd.foreachPartition(func);
return null;
  }
});
param.deinitialize();

If param gets initialized to a significant memory footprint, are we better
off creating/initializing it before calling new ProcessPartitionFunction()
or perhaps in the 'call' method within that function?
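
In other words, I'm weighing a variant like the following, where only a small serializable
config travels with the closure and the heavy init/deinit happens once per partition on
the executors (a sketch; Param, ParamConfig and process() are made-up names):

// (imports: java.util.Iterator, org.apache.spark.api.java.JavaRDD,
//  org.apache.spark.api.java.function.Function, org.apache.spark.api.java.function.VoidFunction)
final ParamConfig config = new ParamConfig();         // small and Serializable

messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            @Override
            public void call(Iterator<String> it) throws Exception {
                Param param = new Param(config);      // heavy object built on the executor
                param.initialize();
                try {
                    while (it.hasNext()) {
                        process(param, it.next());    // made-up per-record processing
                    }
                } finally {
                    param.deinitialize();             // once per partition, not per record
                }
            }
        });
        return null;
    }
});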

I'm trying to avoid calling expensive init()/deinit() methods while
balancing against the serialization costs. Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html



StreamingListener, anyone?

2015-06-03 Thread dgoldenberg
Hi,

I've got a Spark Streaming driver job implemented and in it, I register a
streaming listener, like so:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
   Durations.milliseconds(params.getBatchDurationMillis()));
jssc.addStreamingListener(new JobListener(jssc));

where JobListener is defined like so:

private static class JobListener implements StreamingListener {

    private JavaStreamingContext jssc;

    JobListener(JavaStreamingContext jssc) {
        this.jssc = jssc;
    }

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        System.out.println("Batch completed.");
        jssc.stop(true);
        System.out.println("The job has been stopped.");
    }

    // ... the other StreamingListener callbacks are implemented as no-ops ...
}


I do not seem to be seeing onBatchCompleted being triggered.  Am I doing
something wrong?

In this particular case, I was trying to implement a bulk ingest type of
logic where the first batch is all we're interested in (reading out of a
Kafka topic with offset reset set to smallest).
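
One variant I'm considering, in case stopping the context from inside the listener
thread itself is the problem, is to only signal from the callback and do the actual
stop from the main thread (a sketch; the latch is my own addition):

// (import java.util.concurrent.CountDownLatch)
private static final CountDownLatch firstBatchDone = new CountDownLatch(1);

@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
    System.out.println("Batch completed.");
    firstBatchDone.countDown();               // just signal; don't stop from the listener thread
}

// in main(), after jssc.start():
//     firstBatchDone.await();
//     jssc.stop(true);                       // also stops the underlying SparkContext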




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html



Behavior of the spark.streaming.kafka.maxRatePerPartition config param?

2015-06-02 Thread dgoldenberg
Hi,

Could someone explain the behavior of the
spark.streaming.kafka.maxRatePerPartition parameter? The doc says "An
important (configuration) is spark.streaming.kafka.maxRatePerPartition which
is the maximum rate at which each Kafka partition will be read by (the)
direct API."

What is the default behavior for this parameter? From some testing it
appears that with it not being set, the RDD size tends to be quite low. With
it set, we're seeing the consumer pick up items off the topic much more
actively, e.g. -Dspark.streaming.kafka.maxRatePerPartition=1000 in
--driver-java-options.

Does this parameter set the RDD size to a very low value? 

seems to be defaulting to 0... but what's the effect of that?
  protected val maxMessagesPerPartition: Option[Long] = {
    val ratePerSec = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)
    if (ratePerSec > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }
  }

  // limits the maximum number of messages per partition
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }
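
If I'm reading this right, the clamp works out as follows for, say, a 2-second batch
interval with maxRatePerPartition=1000 over 4 partitions (my arithmetic, not from the
docs):

// secsPerBatch      = 2.0                        (2000 ms batch interval)
// per-partition cap = secsPerBatch * ratePerSec  = 2.0 * 1000 = 2000 messages per batch
// per-batch cap     = 2000 * 4 partitions        = 8000 messages
// ratePerSec unset/0 => maxMessagesPerPartition is None, i.e. no clamp at all, and the
//                       first batch can pull everything currently sitting in the topic.
long secsPerBatch = 2, ratePerSec = 1000, partitions = 4;
long perBatchCap = secsPerBatch * ratePerSec * partitions;   // 8000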

what would we limit by default?  And once Spark Streaming does pick up
messages, would it be at the maximum value? does it ever fall below max even
if there are max or more than max in the topic? Thanks.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Behavior-of-the-spark-streaming-kafka-maxRatePerPartition-config-param-tp23117.html



How to monitor Spark Streaming from Kafka?

2015-06-01 Thread dgoldenberg
Hi,

What are some of the good/adopted approached to monitoring Spark Streaming
from Kafka?  I see that there are things like
http://quantifind.github.io/KafkaOffsetMonitor, for example.  Do they all
assume that Receiver-based streaming is used?

Then: "Note that one disadvantage of this approach (Receiverless Approach,
#2) is that it does not update offsets in Zookeeper, hence Zookeeper-based
Kafka monitoring tools will not show progress. However, you can access the
offsets processed by this approach in each batch and update Zookeeper
yourself."

The code sample, however, seems sparse. What do you need to do here? -
 directKafkaStream.foreachRDD(
     new Function<JavaPairRDD<String, String>, Void>() {
         @Override
         public Void call(JavaPairRDD<String, String> rdd) throws IOException {
             OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
             // offsetRanges.length = # of Kafka partitions being consumed
             ...
             return null;
         }
     }
 );

and if these are updated, will KafkaOffsetMonitor work?

Monitoring seems to center around the notion of a consumer group.  But in
the receiverless approach, code on the Spark consumer side doesn't seem to
expose a consumer group parameter.  Where does it go?  Can I/should I just
pass in group.id as part of the kafkaParams HashMap?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html



Spark and logging

2015-05-27 Thread dgoldenberg
I'm wondering how logging works in Spark.

I see that there's the log4j.properties.template file in the conf directory. 
Safe to assume Spark is using log4j 1?  What's the approach if we're using
log4j 2?  I've got a log4j2.xml file in the job jar which seems to be
working for my log statements but Spark's logging seems to be taking its own
default route despite me setting Spark's log to 'warn' only.

More interestingly, what happens if file-based loggers are at play?

If a log statement is in the driver program I assume it'll get logged into a
log file that's collocated with the driver. What about log statements in the
partition processing functions?  Will their log statements get logged into a
file residing on a given 'slave' machine, or will Spark capture this log
output and divert it into the log file of the driver's machine?
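
For what it's worth, the only knob I've found so far for the executor side is
spark.executor.extraJavaOptions (a sketch; the path is a placeholder, and the file has
to exist on each worker or be shipped with --files). My current understanding, which is
partly why I'm asking, is that in standalone mode the executors' stdout/stderr and any
file appenders with relative paths end up under the worker's work/ directory on each
slave machine, not on the driver's machine:

// (import org.apache.spark.SparkConf)
SparkConf conf = new SparkConf()
    .set("spark.executor.extraJavaOptions",
         "-Dlog4j.configuration=file:/opt/myapp/conf/log4j.properties");
// For log4j 2 the property would presumably be -Dlog4j.configurationFile=... instead.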

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-logging-tp23049.html



Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?

2015-05-27 Thread dgoldenberg
Hi,

With the no receivers approach to streaming from Kafka, is there a way to
set something like spark.streaming.receiver.maxRate so as not to overwhelm
the Spark consumers?

What would be some of the ways to throttle the streamed messages so that the
consumers don't run out of memory?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-from-Kafka-no-receivers-and-spark-streaming-receiver-maxRate-tp23061.html



Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread dgoldenberg
Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up? 
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in the Spark monitoring guide
(https://spark.apache.org/docs/latest/monitoring.html).  Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html



Pipelining with Spark

2015-05-21 Thread dgoldenberg
From the performance and scalability standpoint, is it better to plug in, say
a multi-threaded pipeliner into a Spark job, or implement pipelining via
Spark's own transformation mechanisms such as e.g. map or filter?

I'm seeing some reference architectures where things like 'morphlines' are
plugged into Spark but it'd seem that Spark may yield better performance and
scalability if each stage within a pipeline is a function in a Spark job - ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pipelining-with-Spark-tp22976.html



Spark Streaming and reducing latency

2015-05-17 Thread dgoldenberg
I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming. 
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level? 

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html



How to speed up data ingestion with Spark

2015-05-12 Thread dgoldenberg
Hi,

I'm looking at a data ingestion implementation which streams data out of
Kafka with Spark Streaming, then uses a multi-threaded pipeline engine to
process the data in each partition.  Have folks looked at ways of speeding
up this type of ingestion?

Let's say the main part of the ingest process is fetching documents from
somewhere and performing text extraction on them. Is this type of processing
best done by expressing the pipelining with Spark RDD transformations or by
just kicking off a multi-threaded pipeline?

Or is using a multi-threaded pipeliner per partition a decent strategy, with
the performance gains coming from running in a clustered mode?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-speed-up-data-ingestion-with-Spark-tp22859.html



Spark and RabbitMQ

2015-05-11 Thread dgoldenberg
Are there existing or under development versions/modules for streaming
messages out of RabbitMQ with SparkStreaming, or perhaps a RabbitMQ RDD?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-RabbitMQ-tp22852.html



How to stream all data out of a Kafka topic once, then terminate job?

2015-04-28 Thread dgoldenberg
Hi,

I'm wondering about the use-case where you're not doing continuous,
incremental streaming of data out of Kafka but rather want to publish data
once with your Producer(s) and consume it once, in your Consumer, then
terminate the consumer Spark job.

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(...));

The batchDuration parameter is "the time interval at which streaming data
will be divided into batches." Can this be worked somehow to cause Spark
Streaming to just get all the available data, then let all the RDDs within
the Kafka discretized stream get processed, and then just be done and
terminate, rather than wait another period and try to process any more data
from Kafka?
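
An alternative I'm considering is to skip streaming entirely for this use case and do a
one-shot batch read with KafkaUtils.createRDD over explicit offset ranges (a sketch; the
broker, topic and offsets are made up and would really come from the Kafka leaders or a
previous run):

import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class OneShotKafkaReadSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("one-shot-kafka-read"));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "broker1:9092");     // placeholder broker

        OffsetRange[] ranges = new OffsetRange[] {
                OffsetRange.create("my-topic", 0, 0L, 1000L),          // partition 0, offsets [0, 1000)
                OffsetRange.create("my-topic", 1, 0L, 1000L)
        };

        JavaPairRDD<String, String> batch = KafkaUtils.createRDD(
                jsc, String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, ranges);

        System.out.println("Messages read: " + batch.count());        // process instead of count
        jsc.stop();                                                    // done -- no batch interval to wait on
    }
}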



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-stream-all-data-out-of-a-Kafka-topic-once-then-terminate-job-tp22698.html



Apache Spark User List: people's responses not showing in the browser view

2015-03-18 Thread dgoldenberg
Sorry if this is a total noob question but is there a reason why I'm only
seeing folks' responses to my posts in emails but not in the browser view
under apache-spark-user-list.1001560.n3.nabble.com?  Is this a matter of
setting your preferences such that your responses only go to email and never
to the browser-based view of the list? I don't seem to see such a
preference...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-User-List-people-s-responses-not-showing-in-the-browser-view-tp22135.html



Spark and Morphlines, parallelization, multithreading

2015-03-18 Thread dgoldenberg
Still a Spark noob grappling with the concepts...

I'm trying to grok the idea of integrating something like the Morphlines
pipelining library with Spark (or SparkStreaming). The Kite/Morphlines doc
states that the "runtime executes all commands of a given morphline in the same
thread... there are no queues, no handoffs among threads, no context
switches and no serialization between commands, which minimizes performance
overheads."

Further: "There is no need for a morphline to manage multiple processes,
nodes, or threads because this is already addressed by host systems such as
MapReduce, Flume, Spark or Storm."

My question is, how exactly does Spark manage parallelization and
multi-treading aspects of RDD processing?  As I understand it, each
collection of data is split into partitions and each partition is sent over
to a slave machine to perform computations. So, for each data partition, how
many processes are created? And for each process, how many threads?

Knowing that would help me understand how to structure the following:

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet);



JavaDStream<String> messageBodies = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

Would I want to create a morphline in a 'messages.foreachRDD' block, then
invoke the morphline on each messageBody?

What will Spark be doing behind the scenes as far as multiple processes and
multiple threads? Should I rely on it to optimize performance with multiple
threads and not worry about plugging in a multi-threaded pipelining engine?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Morphlines-parallelization-multithreading-tp22134.html



NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr

2015-02-18 Thread dgoldenberg
I'm using Solrj in a Spark program. When I try to send the docs to Solr, I
get the NotSerializableException on the DefaultHttpClient.  Is there a
possible fix or workaround?

I'm using Spark 1.2.1 with Hadoop 2.4, SolrJ is version 4.0.0.

final HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL);
...
JavaRDD<SolrInputDocument> solrDocs = rdd.map(new Function<Row, SolrInputDocument>() {
    public SolrInputDocument call(Row r) {
        return r.toSolrDocument();
    }
});

solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
    public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception {
        List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();

        while (solrDocIterator.hasNext()) {
            SolrInputDocument inputDoc = solrDocIterator.next();
            batch.add(inputDoc);
            if (batch.size() >= batchSize) {
                Utils.sendBatchToSolr(solrServer, solrCollection, batch);
            }
        }
        if (!batch.isEmpty()) {
            Utils.sendBatchToSolr(solrServer, solrCollection, batch);
        }
    }
});
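
The workaround I'm leaning toward is to construct the HttpSolrServer inside
foreachPartition, so the client never has to be serialized with the closure -- only
the URL/collection strings do. Roughly (a sketch building on the code above, plus a
batch.clear() after each send):

solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
    public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception {
        // One client per partition, created on the executor and closed when done.
        HttpSolrServer partitionSolrServer = new HttpSolrServer(SOLR_SERVER_URL);
        try {
            List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();
            while (solrDocIterator.hasNext()) {
                batch.add(solrDocIterator.next());
                if (batch.size() >= batchSize) {
                    Utils.sendBatchToSolr(partitionSolrServer, solrCollection, batch);
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                Utils.sendBatchToSolr(partitionSolrServer, solrCollection, batch);
            }
        } finally {
            partitionSolrServer.shutdown();
        }
    }
});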



Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:789)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:195)
at org.apache.spark.api.java.JavaRDD.foreachPartition(JavaRDD.scala:32)
at com.kona.motivis.spark.proto.SparkProto.execute(SparkProto.java:158)
at com.kona.motivis.spark.proto.SparkProto.main(SparkProto.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.http.impl.client.DefaultHttpClient
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-org-apache-http-impl-client-DefaultHttpClient-when-trying-to-send-documentsr-tp21713.html



Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-17 Thread dgoldenberg
I'm getting the below error when running spark-submit on my class. This class
has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
4.10.3 from within the class.

This is in conflict with the older version, HttpClient 3.1 that's a
dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

I've tried setting spark.files.userClassPathFirst to true in SparkConf in my
program, also setting it to true in $SPARK_HOME/conf/spark-defaults.conf as

spark.files.userClassPathFirst true

No go, I'm still getting the error, as below. Is there anything else I can
try? Are there any plans in Spark to support multiple class loaders?

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
...





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html