Spark process failing to receive data from the Kafka queue in yarn-client mode.

2016-02-05 Thread Rachana Srivastava
I am trying to run the following code in yarn-client mode, but I am getting the 
Slow ReadProcessor warning shown below. The code works just fine in local mode. 
Any pointers are really appreciated.

Line of code to receive data from the Kafka Queue:
JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class,
        StringDecoder.class, kafkaParams, kafkaTopicMap, StorageLevel.MEMORY_ONLY());

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  public String call(Tuple2<String, String> tuple2) {
    LOG.info("  Input json stream data  " + tuple2._2());
    return tuple2._2();
  }
});


Error Details:
2016-02-05 11:44:00 WARN DFSClient:975 - Slow ReadProcessor read fields took 30011ms (threshold=3ms); ack: seqno: 1960 reply: 0 reply: 0 reply: 0 downstreamAckTimeNanos: 1227280, targets: [DatanodeInfoWithStorage[10.0.0.245:50010,DS-a55d9212-3771-4936-bbe7-02035e7de148,DISK], DatanodeInfoWithStorage[10.0.0.243:50010,DS-231b9915-c2e2-4392-b075-8a52ba1820ac,DISK], DatanodeInfoWithStorage[10.0.0.244:50010,DS-6b8b5814-7dd7-4315-847c-b73bd375af0e,DISK]]
2016-02-05 11:44:00 INFO BlockManager:59 - Removing RDD 1954
2016-02-05 11:44:00 INFO MapPartitionsRDD:59 - Removing RDD 1955 from persisten


Preserving partitioning with dataframe select

2016-02-05 Thread Matt Cheah
Hi everyone,

When using raw RDDs, it is possible to indicate that a map() operation preserves 
the RDD's partitioning. This makes it easier to reduce the overhead of shuffles 
by ensuring that RDDs are co-partitioned when they are joined.
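
For reference, a minimal Scala sketch (not from the thread) of that RDD-level 
pattern; the data and partition count are made up, and mapPartitions is used 
here because it is the operation that exposes the preservesPartitioning flag:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object CoPartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("co-partition-sketch"))

    // Hash-partition a pair RDD up front.
    val left = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
      .partitionBy(new HashPartitioner(4))

    // mapPartitions lets the caller assert that keys are untouched, so the
    // partitioner is carried through to the result instead of being dropped.
    val right = left.mapPartitions(
      iter => iter.map { case (k, v) => (k, v.toUpperCase) },
      preservesPartitioning = true)

    // Both sides now share the same partitioner, so the join can avoid a
    // full shuffle of either input.
    left.join(right).collect().foreach(println)

    sc.stop()
  }
}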

When I'm using DataFrames, I'm pre-partitioning the DataFrame using 
DataFrame.partitionBy($"X"), but I then invoke a select statement after the 
partitioning and before joining that DataFrame with another. Roughly speaking, 
I'm doing something like this pseudo-code:

partitionedDataFrame = dataFrame.partitionBy($"X")
groupedDataFrame = partitionedDataFrame.groupBy($"X").agg(aggregations)
// Rename "X" to "Y" to make sure columns are unique
groupedDataFrameRenamed = groupedDataFrame.withColumnRenamed("X", "Y")
// Roughly speaking, join on "X == Y" to get the aggregation results onto every row
joinedDataFrame = partitionedDataFrame.join(groupedDataFrameRenamed, $"X" === $"Y")
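
For concreteness, a rough runnable Scala version of that pseudo-code; the 
column names, the aggregation, and the use of repartition($"X") in place of 
the partitionBy call above are all illustrative assumptions:

import org.apache.spark.sql.functions._
// sqlContext and dataFrame (with columns "X" and "value") are assumed to exist.
import sqlContext.implicits._

val partitionedDataFrame = dataFrame.repartition($"X")

val groupedDataFrame = partitionedDataFrame
  .groupBy($"X")
  .agg(sum($"value").as("total"))

// Rename "X" to "Y" so the two sides of the join have distinct column names.
val groupedDataFrameRenamed = groupedDataFrame.withColumnRenamed("X", "Y")

// Join the per-group aggregates back onto every row on X == Y.
val joinedDataFrame = partitionedDataFrame
  .join(groupedDataFrameRenamed, $"X" === $"Y")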

However, the renaming of the columns maps to a select statement, and to my 
knowledge, selecting the columns throws off the partitioning, which results in 
shuffling both the partitionedDataFrame and the groupedDataFrame.

I have the following questions given this example:

1) Is pre-partitioning the Data Frame effective? In other words, does the 
physical planner recognize when underlying RDDs are co-partitioned and compute 
more efficient joins by reducing the amount of data that is shuffled?
2) If the planner takes advantage of co-partitioning, is the renaming of the 
columns invalidating the partitioning of the grouped Data Frame? When I look at 
the planner's conversion from logical.Project to the physical plan, I only see 
it invoking child.mapPartitions without specifying the preservesPartitioning 
flag.
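
One rough way to check this empirically (assuming the DataFrames from the 
sketch above) is to inspect the plans Spark produces and look for an Exchange 
on either input to the join:

// Print the logical and physical plans; an Exchange operator feeding the join
// means that input will be shuffled, while its absence suggests the existing
// partitioning is being reused.
joinedDataFrame.explain(true)

// The executed plan can also be inspected programmatically.
println(joinedDataFrame.queryExecution.executedPlan)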

Thanks,

-Matt Cheah


Re: Building Spark with Custom Hadoop Version

2016-02-05 Thread Steve Loughran

> On 4 Feb 2016, at 23:11, Ted Yu  wrote:
> 
> Assuming your change is based on hadoop-2 branch, you can use 'mvn install' 
> command which would put artifacts under 2.8.0-SNAPSHOT subdir in your local 
> maven repo.
> 
> Here is an example:
> ~/.m2/repository/org/apache/hadoop/hadoop-hdfs/2.8.0-SNAPSHOT
> 
> Then you can use the following command to build Spark:
> 
> -Pyarn -Phadoop-2.4 -Dhadoop.version=2.8.0-SNAPSHOT
> 

Better to choose the hadoop-2.6 profile, e.g.

mvn test -Pyarn,hadoop-2.6 -Dhadoop.version=2.7.1  -pl yarn -Dtest=m  
-DwildcardSuites=org.apache.spark.deploy.yarn.YarnClusterSuite

(the -Dtest= assignment skips all java tests)

If you are playing with -SNAPSHOT sources:

(a) rebuild them every morning
(b) never do a test run that spans midnight




Re: Building Spark with Custom Hadoop Version

2016-02-05 Thread Steve Loughran

> On 4 Feb 2016, at 23:11, Ted Yu  wrote:
> 
> Assuming your change is based on hadoop-2 branch, you can use 'mvn install' 
> command which would put artifacts under 2.8.0-SNAPSHOT subdir in your local 
> maven repo.
> 


+ Generally, unless you want to run all the Hadoop tests, set -DskipTests on 
the mvn commands. The HDFS ones take a while and can use up all your file 
handles.

mvn install -DskipTests

Here are the aliases I use:


export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m 
-Xms256m -Djava.awt.headless=true"
alias mi="mvn install -DskipTests"
alias mci="mvn clean install -DskipTests"
alias mvt="mvn test"
alias mvct="mvn clean test"
alias mvp="mvn package -DskipTests"
alias mvcp="mvn clean package -DskipTests"
alias mvnsite="mvn site:site -Dmaven.javadoc.skip=true -DskipTests"
alias mvndep="mvn dependency:tree -Dverbose"


mvndep > target/dependencies.txt is my command of choice to start working out 
where some random dependency is coming in from




Re: SparkOscope: Enabling Spark Optimization through Cross-stack Monitoring and Visualization

2016-02-05 Thread Pete Robbins
Yiannis,

I'm interested in what you've done here, as I was looking for ways to allow
the Spark UI to display custom metrics in a pluggable way without having to
modify the Spark source code. It would be good to see if we could modify
your code to add extension points into the UI, so we could configure the
sources of the additional metrics. For instance, rather than creating events
from your HDFS files, I would like to have a module that pulls in system/JVM
metrics from, e.g., Elasticsearch.

Do any of the Spark committers have any thoughts on this?

Cheers,


On 3 February 2016 at 15:26, Yiannis Gkoufas  wrote:

> Hi all,
>
> I just wanted to introduce some of my recent work in IBM Research around
> Spark and especially its Metric System and Web UI.
> As a quick overview of our contributions:
> We have created a new type of Sink for the metrics (HDFSSink), which
> captures the metrics into HDFS.
> We have extended the metrics reported by the Executors to include OS-level
> metrics covering CPU, RAM, disk IO and network IO, utilizing the Hyperic
> Sigar library.
> We have extended the Web UI for completed applications to visualize any of
> the above metrics the user wants to.
> The above functionalities can be configured in the metrics.properties and
> spark-defaults.conf files.
> We have recorded a small demo that shows those capabilities which you can
> find here : https://ibm.app.box.com/s/vyaedlyb444a4zna1215c7puhxliqxdg
> There is a blog post which gives more details on the functionality here:
> www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
> and also there is a public repo where anyone can try it:
> https://github.com/ibm-research-ireland/sparkoscope
>
> I would really appreciate any feedback or advice regarding this work,
> especially if you think it's worth upstreaming to the official Spark
> repository.
>
> Thanks a lot!
>