wholeTextFiles() API extremely SLOW under high load (How to fix?)

2021-10-02 Thread Rachana Srivastava
Issue: We are using the wholeTextFiles() API to read files from S3, but this API 
is extremely slow for the reasons described below. The question is how to fix this 
issue.
Here is our analysis so far:
We are using Spark's wholeTextFiles() API to read S3 files. The API works in two 
steps. In the first step the driver/master lists all the S3 files; in the second 
step the driver/master splits the list of files and distributes those files to 
the worker nodes/executors for processing.

STEP 1. List all the S3 files in the given paths (we pass this path when we run 
every single gw/device/app step). The issue is that every single batch of every 
single report first lists the files. The main problem we have is that we are 
using S3, where listing files in a bucket is single threaded: the S3 API for 
listing the keys in a bucket only returns keys in chunks of 1000 per call. The 
single-threaded listing fetches 1000 keys at a time, so for a million files we 
are looking at roughly 1000 sequential S3 API calls.

STEP 2. Control the number of splits based on the number of input partitions and 
distribute the load to the worker nodes for processing.
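
A workaround sketch, assuming the listing is done up front outside wholeTextFiles() 
(for example one listing thread per prefix) and only the reads are left to Spark. 
The method name, the keys list, and the s3a:// URIs below are illustrative 
placeholders, not our production code:

    import java.util.List;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    // keys: full object URIs such as "s3a://bucket/prefix/part-000.json", listed up front.
    public static JavaPairRDD<String, String> readWholeFiles(JavaSparkContext jsc, List<String> keys) {
        int numSlices = Math.max(1, keys.size() / 1000);   // roughly 1000 files per task, tune as needed
        return jsc.parallelize(keys, numSlices).mapToPair(key -> {
            // Executed on the executors: each task opens and reads its own files.
            Path path = new Path(key);
            FileSystem fs = path.getFileSystem(new Configuration());
            try (FSDataInputStream in = fs.open(path)) {
                return new Tuple2<>(key, IOUtils.toString(in, "UTF-8"));
            }
        });
    }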


S3-SQS vs Auto Loader With Apache Spark Structured Streaming

2020-12-20 Thread Rachana Srivastava

Problem Statement: I want to read files from S3 and write files to S3 using Spark 
Structured Streaming. I looked at the reference architecture recommended by the 
Spark team, which recommends S3 -> SNS -> SQS using the S3-SQS file source.

Question:
   
   - S3-SQS file source: Is the S3-SQS file source available in Apache Spark? Do we 
need to use Apache Bahir's SQS implementation? 
https://github.com/apache/bahir/tree/master/sql-streaming-sqs
   - Auto Loader: This article recommends that we should use Auto Loader. Is 
Auto Loader available from Apache Spark? 
https://docs.databricks.com/spark/latest/structured-streaming/sqs.html






Need your help!! (URGENT Code works fine when submitted as java main but part of data missing when running as Spark-Submit)

2020-07-21 Thread Rachana Srivastava
I am unable to identify the root cause of why my code is missing data when I 
run it with spark-submit, while the same code works fine when I run it as a Java 
main. Any idea?

OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Rachana Srivastava
Issue: I am trying to periodically process 5000+ gzipped JSON files from S3 
using Structured Streaming code.
Here are the key steps:
   - Read the JSON schema and broadcast it to the executors.
   - Read the stream:
     Dataset<Row> inputDS = sparkSession.readStream().format("text")
         .option("inferSchema", "true").option("header", "true")
         .option("multiLine", true).schema(jsonSchema)
         .option("mode", "PERMISSIVE").json(inputPath + "/*");
   - Process each file in a map:
     Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
   - Write the output to S3:
     StreamingQuery query = ds.coalesce(1).writeStream().outputMode("append")
         .format("csv") ... .start();

maxFilesPerTrigger is set to 500, so I was hoping the stream would pick up only 
that many files per trigger. Why are we getting OOM? If we have more than 
3500 files the system crashes with OOM.
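
For reference, a minimal sketch (not the original code) of where the file-source 
rate limit is applied; maxFilesPerTrigger is the standard file-source option, 
while sparkSession, jsonSchema, and inputPath stand in for the values used above:

    Dataset<Row> inputDS = sparkSession.readStream()
        .format("json")                        // gzipped .json.gz files are decompressed via the Hadoop codecs
        .schema(jsonSchema)                    // explicit schema avoids inference on every batch
        .option("maxFilesPerTrigger", 500)     // cap of 500 new files per micro-batch
        .option("mode", "PERMISSIVE")
        .load(inputPath);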



Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Rachana Srivastava
 Structured Streaming vs Spark Streaming (DStream)?
Which is recommended for system stability? Exactly-once is NOT the first priority; 
the first priority is a STABLE system.
I need to make a decision soon, and I need help. Here is the question again: 
should I go back to DStream-based Spark Streaming, write our own checkpointing, 
and go from there? At least we never encountered these metadata issues there.
Thanks,
Rachana
On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim 
 wrote:  
 
 Just in case anyone prefers ASF projects, there are alternative projects in the 
ASF as well; alphabetically, Apache Hudi [1] and Apache Iceberg [2]. Both recently 
graduated as top-level projects. (DISCLAIMER: I'm not involved in either.)
BTW it would be nice if we made the metadata implementation on the file stream 
source/sink pluggable - from what I've seen, the plugin approach has been chosen 
whenever some part becomes complicated and it is arguable whether that part should 
be handled inside the Spark project or outside, e.g. the checkpoint manager, the 
state store provider, etc. It would give the ecosystem a chance to tackle the 
challenge "without completely re-writing the file stream source and sink", 
focusing on metadata scalability for long-running queries. The alternative 
projects described above provide more, higher-level features and look attractive, 
but sometimes that may be just "using a sledgehammer to crack a nut".
1. https://hudi.apache.org/
2. https://iceberg.apache.org/


On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das  
wrote:

Hello Rachana,
Getting exactly-once semantics on files and making it scale to a very large 
number of files are very hard problems to solve. While Structured Streaming + 
built-in file sink solves the exactly-once guarantee that DStreams could not, 
it is definitely limited in other ways (scaling in terms of files, combining 
batch and streaming writes in the same place, etc). And solving this problem 
requires a holistic solution that is arguably beyond the scope of the Spark 
project. 
There are other projects that are trying to solve this file management issue. 
For example, Delta Lake (full disclosure, I am involved in it) was built to 
exactly solve this problem - get exactly-once and ACID guarantees on files, but 
also scale to handling millions of files. Please consider it as part of your 
solution. 



On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava 
 wrote:

I have written a simple Spark Structured Streaming app to move data from Kafka 
to S3. I found that in order to support the exactly-once guarantee, Spark creates 
a _spark_metadata folder, which ends up growing too large because the streaming 
app is supposed to run forever. When the streaming app runs for a long time, the 
metadata folder grows so big that we start getting OOM errors. The only way to 
resolve the OOM is to delete the checkpoint and metadata folders and lose valuable 
customer data.

Open Spark JIRAs: SPARK-24295, SPARK-29995, and SPARK-30462.
Spark Streaming was NOT broken like this. Is Spark Streaming a BETTER choice?

  

Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Rachana Srivastava
 Frankly speaking I do not care about EXACTLY ONCE... I am OK with AT LEAST ONCE 
as long as the system does not fail every 5 to 7 days with no recovery option.



Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Rachana Srivastava
 Thanks so much TD. Thanks for forwarding your data lake project, but at this 
time we have budget constraints and can only use open source projects.
I just want the Structured Streaming application (or a Spark Streaming DStream 
application) to run without any issue for a long time. I do not want the size of 
the metadata to grow so large that we start getting these OOM issues. The only 
way to resolve the OOM issues is by deleting the checkpoint and metadata folders, 
and that means losing customer data. We have a 60-second batch where we coalesce 
and return only one file per partition, so we do not have a small-files issue 
either.
What do you suggest?  How should we resolve this issue?
We have a very simple 5-step program that reads data from Kafka and outputs data 
to S3.
1. Read records from the Kafka topic:
   Dataset<Row> inputDf = spark
       .readStream()
       .format("kafka")
       .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
       .option("subscribe", "topic1")
       .option("startingOffsets", "earliest")
       .load();
2. Use the from_json API from Spark to extract the data for further transformation 
in a dataset:
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), 
EVENT_SCHEMA).alias("event"))
       .withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table from the above dataset using SQLContext:
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events, since Parquet does not support hierarchical data.
5. Store the output in Parquet format on S3 (see the sketch just below):
   StreamingQuery query = flatDf.writeStream().format("parquet")
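
To make step 5 concrete, a minimal sketch of the sink with its options filled in; 
the output and checkpoint paths and the 60-second trigger below are placeholders 
standing in for the real values, not the original code:

    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.Trigger;

    StreamingQuery query = flatDf
        .coalesce(1)                                            // one output file per partition per batch
        .writeStream()
        .format("parquet")
        .option("path", "s3a://my-bucket/events/")              // hypothetical output location
        .option("checkpointLocation", "s3a://my-bucket/chk/")   // offsets and _spark_metadata live here
        .trigger(Trigger.ProcessingTime("60 seconds"))
        .outputMode("append")
        .start();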


  

How to manage offsets in Spark Structured Streaming?

2020-06-17 Thread Rachana Srivastava
  Background: I have written a simple Spark Structured Streaming app to move 
data from Kafka to S3. I found that in order to support the exactly-once 
guarantee, Spark creates a _spark_metadata folder which ends up growing too 
large; when the streaming app runs for a long time the metadata folder grows so 
big that we start getting OOM errors. I want to get rid of the metadata and 
checkpoint folders of Spark Structured Streaming and manage the offsets myself.
How we managed offsets in Spark Streaming: I used val offsetRanges = 
rdd.asInstanceOf[HasOffsetRanges].offsetRanges to get the offsets in Spark 
Streaming (DStreams). But I want to know how to get the offsets and other 
metadata needed to manage checkpointing ourselves in Spark Structured Streaming. 
Do you have any sample program that implements checkpointing?
How do we manage offsets in Spark Structured Streaming?
Looking at this JIRA https://issues-test.apache.org/jira/browse/SPARK-18258, it 
looks like offsets are not provided. How should we go about it?
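
For what it's worth, one way to observe the per-batch offsets of a Structured 
Streaming query is a StreamingQueryListener. The sketch below is an assumption 
about how this could be wired (not code from the thread); it just prints each 
source's end offsets, which could instead be written to an external store:

    import org.apache.spark.sql.streaming.SourceProgress;
    import org.apache.spark.sql.streaming.StreamingQueryListener;

    // Register on the SparkSession that runs the streaming query.
    sparkSession.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent event) { }

        @Override
        public void onQueryProgress(QueryProgressEvent event) {
            for (SourceProgress source : event.progress().sources()) {
                // endOffset is a JSON string, e.g. {"topic1":{"0":1234,"1":5678}} for Kafka.
                System.out.println(source.description() + " endOffset=" + source.endOffset());
            }
        }

        @Override
        public void onQueryTerminated(QueryTerminatedEvent event) { }
    });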

  

Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Rachana Srivastava
I have written a simple Spark Structured Streaming app to move data from Kafka 
to S3. I found that in order to support the exactly-once guarantee, Spark creates 
a _spark_metadata folder, which ends up growing too large because the streaming 
app is supposed to run forever. When the streaming app runs for a long time, the 
metadata folder grows so big that we start getting OOM errors. The only way to 
resolve the OOM is to delete the checkpoint and metadata folders and lose valuable 
customer data.

Open Spark JIRAs: SPARK-24295, SPARK-29995, and SPARK-30462.
Spark Streaming was NOT broken like this. Is Spark Streaming a BETTER choice?

Not all KafkaReceivers processing the data Why?

2016-09-14 Thread Rachana Srivastava
Hello all,

I have created a Kafka topic with 5 partitions, and I am using the createStream 
receiver API as follows. But somehow only one receiver is getting the input data; 
the rest of the receivers are not processing anything. Can you please help?

JavaPairDStream<String, String> messages = null;

if (sparkStreamCount > 0) {
    // We create an input DStream for each partition of the topic,
    // unify those streams, and then repartition the unified stream.
    List<JavaPairDStream<String, String>> kafkaStreams =
        new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        kafkaStreams.add(KafkaUtils.createStream(jssc,
            contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID),
            kafkaTopicMap));
    }
    messages = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    messages = KafkaUtils.createStream(jssc,
        contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID),
        kafkaTopicMap);
}
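
For comparison, a minimal sketch of the receiver-less direct approach (an 
assumption on my part, not from the original post), which reads every Kafka 
partition in parallel without managing multiple receivers; the broker address 
and topic name below are hypothetical:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Set;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");   // hypothetical broker list
    Set<String> topics = new HashSet<String>();
    topics.add("topic1");                                        // hypothetical topic name

    // Direct stream: one RDD partition per Kafka partition, so all 5 partitions are consumed.
    JavaPairInputDStream<String, String> directMessages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);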









How do we process/scale variable size batches in Apache Spark Streaming

2016-08-23 Thread Rachana Srivastava
I am running a Spark Streaming process where I get a batch of data every n 
seconds. I am using repartition to scale the application. Since the repartition 
size is fixed, we get lots of small files when the batch size is very small. 
Is there any way I can change the partitioning logic based on the input batch 
size in order to avoid lots of small files?
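
A minimal sketch of one option (an assumption, not from the thread): size the 
repartition per batch from the batch's record count inside foreachRDD, so small 
batches produce fewer partitions and therefore fewer output files. The stream 
variable lines, the output path, and recordsPerPartition are hypothetical:

    final long recordsPerPartition = 100000L;   // hypothetical target records per output file

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
            long count = rdd.count();           // extra pass over the batch to size the output
            int numPartitions = (int) Math.max(1L, count / recordsPerPartition);
            rdd.repartition(numPartitions)
               .saveAsTextFile("hdfs://namenode:8020/output/batch-" + System.currentTimeMillis()); // placeholder path
            return null;
        }
    });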


Number of tasks on executors become negative after executor failures

2016-08-15 Thread Rachana Srivastava
Summary:
I am running Spark 1.5 on CDH 5.5.1. Under extreme load I intermittently get 
the connection failure exception below, and later negative task counts on 
executors in the Spark UI.

Exception:
TRACE: org.apache.hadoop.hbase.ipc.AbstractRpcClient - Call: Multi, callTime: 
76ms
INFO : org.apache.spark.network.client.TransportClientFactory - Found inactive 
connection to /xxx.xxx.xxx., creating a new one.
ERROR: org.apache.spark.network.shuffle.RetryingBlockFetcher - Exception while 
beginning fetch of 1 outstanding blocks (after 1 retries)
java.io.IOException: Failed to connect to /xxx.xxx.xxx.
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /xxx.xxx.xxx.
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more


Related Defects:
https://issues.apache.org/jira/browse/SPARK-2319
https://issues.apache.org/jira/browse/SPARK-9591




Missing Exector Logs From Yarn After Spark Failure

2016-07-19 Thread Rachana Srivastava
I am trying to find the root cause of a recent Spark application failure in 
production. When the Spark application is running I can check the NodeManager's 
yarn.nodemanager.log-dir property to locate the Spark executor container logs.

The container has logs for both running Spark applications.

Here is the view of the container logs: drwx--x--- 3 yarn yarn 51 Jul 19 09:04 
application_1467068598418_0209 drwx--x--- 5 yarn yarn 141 Jul 19 09:04 
application_1467068598418_0210

But when the application is killed, both application logs are automatically 
deleted. I have set all the log retention settings etc. in YARN to a very large 
number, but these logs are still deleted as soon as the Spark applications crash.

Question: How can we retain these Spark application logs in YARN for debugging 
when the Spark application has crashed for some reason?


Spark Text Streaming Does not Recognize Folder using RegEx

2016-04-01 Thread Rachana Srivastava
Hello All,

I have written a simple program to get data from a streamed directory:

JavaDStream<String> textStream = jssc.textFileStream(<directory>);
JavaDStream<String> ceRDD = textStream.map(
    new Function<String, String>() {
        public String call(String ceData) throws Exception {
            System.out.println(ceData);
            return ceData;
        }
    });

My code works fine when we pass the complete path of the input directory:
<directory> = 
hdfs://quickstart.cloudera:8020//user/cloudera/CE/Output/OUTPUTYarnClusterCEQ/2016-04-01/4489867359541/
 WORKS fine.
But
<directory> = 
hdfs://quickstart.cloudera:8020/user/cloudera/CE/Output/OUTPUTYarnClusterCEQ/2016-04-01/*/
 DOES NOT work.
When we pass the folder name using a regex, I get the exception below.

Exception

16/04/01 13:48:40 WARN FileInputDStream: Error finding new files
java.io.FileNotFoundException: File 
hdfs://quickstart.cloudera:8020/user/cloudera/CE/Output/OUTPUTYarnClusterCEQ/2016-04-01/*
 does not exist.
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:704)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:762)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:758)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:758)


How to obtain JavaHBaseContext to connect Spark Streaming with HBase

2016-03-09 Thread Rachana Srivastava
I am trying to integrate Spark Streaming with HBase. I am calling the following 
APIs to connect to HBase:

HConnection hbaseConnection = HConnectionManager.createConnection(conf);
hBaseTable = hbaseConnection.getTable(hbaseTable);

Since I cannot get the connection once and broadcast it, each API call to get 
data from HBase is very expensive.

I tried using JavaHBaseContext (JavaHBaseContext hbaseContext = new 
JavaHBaseContext(jsc, conf)) from the hbase-spark library in CDH 5.5, but I 
cannot import the library from Maven. Has anyone been able to successfully 
resolve this issue?

I am trying to use the latest APIs to connect HBase and Spark Streaming on 
Cloudera.

Some of the JIRA items are mentioned here:

http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/

The question is how to get this dependency from a Maven repository:

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-spark</artifactId>
  <version>2.0.0-SNAPSHOT</version>
</dependency>


HADOOP_HOME are not set when try to run spark application in yarn cluster mode

2016-02-09 Thread Rachana Srivastava
I am trying to run an application in yarn cluster mode.

Spark-Submit with YARN Cluster
Here are the settings of the shell script:
spark-submit --class "com.Myclass"  \
--num-executors 2 \
--executor-cores 2 \
--master yarn \
--supervise \
--deploy-mode cluster \
../target/ \

My application is working fine in yarn-client and local mode.

Excerpt of the error when we submit the application with spark-submit in 
yarn-cluster mode.

&& HADOOP HOME correct path logged but still getting the 
error
/usr/lib/hadoop
&& HADOOP_CONF_DIR
/usr/lib/hadoop/etc/hadoop
...
Diagnostics: Exception from container-launch.
Container id: container_1454984479786_0006_02_01
Exit code: 15
Stack trace: ExitCodeException exitCode=15:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)

Further, I am getting the following error:
ERROR DETAILS FROM YARN LOGS APPLICATIONID
INFO : org.apache.spark.deploy.yarn.ApplicationMaster - Registered signal 
handlers for [TERM, HUP, INT]
DEBUG: org.apache.hadoop.util.Shell - Failed to detect a valid hadoop home 
directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:307)
at org.apache.hadoop.util.Shell.(Shell.java:332)
at org.apache.hadoop.util.StringUtils.(StringUtils.java:79)
at 
org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:590)
at 
org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:62)
at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:52)
at 
org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.(YarnSparkHadoopUtil.scala:47)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at 
org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:386)
at 
org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:384)
at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:384)
at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:401)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:623)
at 
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

I tried modifying spark-env.sh as follows, and I see HADOOP_HOME logged, but I 
am still getting the above error.
I added the following entries to spark-env.sh:
export HADOOP_HOME="/usr/lib/hadoop"
echo "&& HADOOP HOME "
echo "$HADOOP_HOME"
export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
echo "&& HADOOP_CONF_DIR "
echo "$HADOOP_CONF_DIR"


Spark process failing to receive data from the Kafka queue in yarn-client mode.

2016-02-05 Thread Rachana Srivastava
I am trying to run the following code in yarn-client mode, but I am getting the 
slow ReadProcessor error mentioned below; the code works just fine in local 
mode. Any pointer is really appreciated.

Line of code to receive data from the Kafka Queue:
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, kafkaTopicMap, StorageLevel.MEMORY_ONLY());

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
        LOG.info("  Input json stream data  " + tuple2._2);
        return tuple2._2();
    }
});


Error Details:
2016-02-05 11:44:00 WARN DFSClient:975 - Slow ReadProcessor read fields took 30011ms (threshold=3ms); ack: seqno: 1960 reply: 0 reply: 0 reply: 0 downstreamAckTimeNanos: 1227280, targets: [DatanodeInfoWithStorage[10.0.0.245:50010,DS-a55d9212-3771-4936-bbe7-02035e7de148,DISK], DatanodeInfoWithStorage[10.0.0.243:50010,DS-231b9915-c2e2-4392-b075-8a52ba1820ac,DISK], DatanodeInfoWithStorage[10.0.0.244:50010,DS-6b8b5814-7dd7-4315-847c-b73bd375af0e,DISK]]
2016-02-05 11:44:00 INFO BlockManager:59 - Removing RDD 1954
2016-02-05 11:44:00 INFO MapPartitionsRDD:59 - Removing RDD 1955 from persisten

RE: Random Forest FeatureImportance throwing NullPointerException

2016-01-14 Thread Rachana Srivastava
Tried using the 1.6 version of Spark, whose API takes numberOfFeatures as a 
fifth argument, but featureImportances is still null.

RandomForestClassifier rfc = getRandomForestClassifier( numTrees,  maxBinSize,  
maxTreeDepth,  seed,  impurity);
RandomForestClassificationModel rfm = 
RandomForestClassificationModel.fromOld(model, rfc, categoricalFeatures, 
numberOfClasses,numberOfFeatures);
System.out.println(rfm.featureImportances());

Stack Trace:
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.spark.ml.tree.impl.RandomForest$.computeFeatureImportance(RandomForest.scala:1152)
at 
org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:)
at 
org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:1108)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
org.apache.spark.ml.tree.impl.RandomForest$.featureImportances(RandomForest.scala:1108)
at 
org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances$lzycompute(RandomForestClassifier.scala:237)
at 
org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances(RandomForestClassifier.scala:237)
at 
com.markmonitor.antifraud.ce.ml.CheckFeatureImportance.main(CheckFeatureImportance.java:49)



Random Forest FeatureImportance throwing NullPointerException

2016-01-13 Thread Rachana Srivastava
I have a Random Forest model for which I am trying to get the featureImportances 
vector.

Map<Object, Object> categoricalFeaturesParam = new HashMap<>();
scala.collection.immutable.Map<Object, Object> categoricalFeatures =
    (scala.collection.immutable.Map<Object, Object>)
    scala.collection.immutable.Map$.MODULE$.apply(JavaConversions.mapAsScalaMap(categoricalFeaturesParam).toSeq());
int numberOfClasses = 2;
RandomForestClassifier rfc = new RandomForestClassifier();
RandomForestClassificationModel rfm =
    RandomForestClassificationModel.fromOld(model, rfc, categoricalFeatures, numberOfClasses);
System.out.println(rfm.featureImportances());

When I run the above code I find featureImportances is null. Do I need to set 
anything specific to get the feature importances for the random forest model?

Thanks,

Rachana


RE: Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Rachana Srivastava
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored 
as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in 
memory on localhost:58397 (size: 2.2 KB, free: 1806.1 MB)
INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at 
DAGScheduler.scala:861
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks 
from ResultStage 1 (MapPartitionsRDD[3] at map at KafkaURLStreaming.java:83)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 
1 tasks
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 
1.0 (TID 1, localhost, ANY, 2026 bytes)
INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 
1)
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, 
partition 0 offsets 37 -> 38
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is 
overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is 
overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #  
Input json stream data  # one test message without saveAs
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 
1). 987 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at 
KafkaURLStreaming.java:90) finished in 0.103 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at 
KafkaURLStreaming.java:90, took 0.151210 s

&&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1





-----Original Message-----
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Tuesday, January 05, 2016 8:21 AM
To: user@spark.apache.org
Subject: Re: Double Counting When Using Accumulators with Spark Streaming

Hi Rachana,

don't you have two messages on the kafka broker ?

Regards
JB






--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com







Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Rachana Srivastava
I have a very simple two-step program: I get input from Kafka, save the input 
to a file, and count the inputs received. My code looks like the following; 
when I run it I get two accumulator counts for each input.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);
final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            accum.add(1);
            return tuple2._2();
        }
    });
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" & COUNT OF ACCUMULATOR IS " + accum.value());
        return null;
    }
});
jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with 
rdd.saveAsTextFile I get multiple accumulator counts for each input.

Thanks,

Rachana
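
A side note on the snippet above (my reading of documented Spark accumulator 
semantics, not something stated in the thread): accumulator updates made inside 
a transformation such as map() are re-applied whenever the lineage is recomputed, 
and here each batch runs at least two actions on the same uncached RDD 
(rdd.isEmpty() and saveAsTextFile), so the upstream map() can execute twice per 
record. A minimal sketch that counts inside the foreachRDD action instead:

    // Sketch only: update the accumulator once per batch on the driver,
    // inside the foreachRDD action, rather than inside the map() transformation.
    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
            rdd.cache();                 // avoid recomputing the Kafka read for the two actions below
            long count = rdd.count();    // action: evaluated exactly once per batch on the driver
            if (count > 0) {
                rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
            }
            accum.add((int) count);
            System.out.println("COUNT OF ACCUMULATOR IS " + accum.value());
            rdd.unpersist();
            return null;
        }
    });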


Spark Streaming Application is Stuck Under Heavy Load Due to DeadLock

2016-01-04 Thread Rachana Srivastava
Hello All,

I am running my application on a Spark cluster, but under heavy load the system 
hangs due to a deadlock. I found a similar issue resolved here 
https://datastax-oss.atlassian.net/browse/JAVA-555 in version 2.1.3, but I am 
running Spark 1.3 and still getting the same issue.

Here is the stack trace for reference:

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
org.apache.spark.streaming.ContextWaiter.waitForStopOrError(ContextWaiter.scala:63)
org.apache.spark.streaming.StreamingContext.awaitTermination(StreamingContext.scala:521)
org.apache.spark.streaming.api.java.JavaStreamingContext.awaitTermination(JavaStreamingContext.scala:592)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:62)
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
org.spark-project.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
org.spark-project.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
java.lang.Thread.run(Thread.java:745)


Thanks,

Rachana



java.lang.NoSuchMethodError while saving a random forest model Spark version 1.5

2015-12-15 Thread Rachana Srivastava
I have recently upgraded the Spark version, but when I try to save a random 
forest model using the model save command I get a NoSuchMethodError. My code 
works fine with the 1.3.x version.



model.save(sc.sc(), "modelsavedir");


ERROR: org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation - 
Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 
(TID 230, localhost): java.lang.NoSuchMethodError: 
parquet.schema.Types$GroupBuilder.addField(Lparquet/schema/Type;)Lparquet/schema/Types$BaseGroupBuilder;
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:517)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:516)
at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at 
scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:516)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:312)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:305)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:305)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at 
org.apache.spark.sql.types.StructType.foreach(StructType.scala:92)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at 
org.apache.spark.sql.types.StructType.map(StructType.scala:92)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:305)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58)
at 
org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:287)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:261)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:233)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)



spark-submit is throwing NPE when trying to submit a random forest model

2015-11-19 Thread Rachana Srivastava
Issue:
I have a random forest model that I am trying to load during streaming using the 
following code. The code works fine when I run it from Eclipse, but I get an NPE 
when running it with spark-submit.

JavaStreamingContext jssc = new JavaStreamingContext(jsc, 
Durations.seconds(duration));
System.out.println("& trying to get the context 
&&& " );
final RandomForestModel model = 
RandomForestModel.load(jssc.sparkContext().sc(), MODEL_DIRECTORY);//line 116 
causing the issue.
System.out.println("& model debug &&& " 
+ model.toDebugString());


Exception Details:
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 2.0, 
whose tasks have all completed, from pool
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$SplitData.toSplit(DecisionTreeModel.scala:144)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$16.apply(DecisionTreeModel.scala:291)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$16.apply(DecisionTreeModel.scala:291)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:291)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:286)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:287)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:286)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructTree(DecisionTreeModel.scala:268)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$12.apply(DecisionTreeModel.scala:251)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$12.apply(DecisionTreeModel.scala:250)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructTrees(DecisionTreeModel.scala:250)
at 
org.apache.spark.mllib.tree.model.TreeEnsembleModel$SaveLoadV1_0$.loadTrees(treeEnsembleModels.scala:340)
at 
org.apache.spark.mllib.tree.model.RandomForestModel$.load(treeEnsembleModels.scala:72)
at 
org.apache.spark.mllib.tree.model.RandomForestModel.load(treeEnsembleModels.scala)
at 
com.markmonitor.antifraud.ce.KafkaURLStreaming.main(KafkaURLStreaming.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Nov 19, 2015 1:10:56 PM WARNING: parquet.hadoop.ParquetRecordReader: Can not 
initialize counter due to context is not a instance of TaskInputOutputContext, 
but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

Spark Source Code:
case class PredictData(predict: Double, prob: Double) {
  def toPredict: Predict = new Predict(predict, prob)
}

Thanks,

Rachana




Frozen exception while dynamically creating classes inside Spark using JavaAssist API

2015-11-03 Thread Rachana Srivastava
I am trying to dynamically create a new class in Spark using the javassist API. 
The code seems very simple, just invoking the makeClass API with a hardcoded 
class name. The code works fine outside the Spark environment, but I get this 
checkNotFrozen exception when I run the code inside Spark.
Code Excerpt:
ClassPool pool = ClassPool.getDefault();
CtClass regExClass = pool.makeClass("TestClass14", baseFeatureProcessor);
Exception Details:
## Exception make Class
java.lang.RuntimeException: TestClass14: frozen class (cannot edit)
at javassist.ClassPool.checkNotFrozen(ClassPool.java:617)
at javassist.ClassPool.makeClass(ClassPool.java:859)
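
For reference, a minimal sketch of one way to cope with the frozen-class error 
(an assumption on my part, not a confirmed fix for this job): check whether the 
class already exists in the pool and defrost it before editing, since "frozen" 
means the CtClass was already converted to bytecode, e.g. by an earlier task or 
batch on the same JVM:

    import javassist.ClassPool;
    import javassist.CtClass;

    ClassPool pool = ClassPool.getDefault();
    CtClass regExClass = pool.getOrNull("TestClass14");
    if (regExClass == null) {
        regExClass = pool.makeClass("TestClass14", baseFeatureProcessor);
    } else {
        regExClass.defrost();   // allow further edits if the class was already materialized
    }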



yarn-cluster mode throwing NullPointerException

2015-10-11 Thread Rachana Srivastava
I am trying to submit a job in yarn-cluster mode using the spark-submit command. 
My code works fine when I use yarn-client mode.

Cloudera Version:
CDH-5.4.7-1.cdh5.4.7.p0.3

Command Submitted:
spark-submit --class "com.markmonitor.antifraud.ce.KafkaURLStreaming"  \
--driver-java-options 
"-Dlog4j.configuration=file:///etc/spark/myconf/log4j.sample.properties" \
--conf 
"spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/myconf/log4j.sample.properties"
 \
--conf 
"spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/myconf/log4j.sample.properties"
 \
--num-executors 2 \
--executor-cores 2 \
../target/mm-XXX-ce-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
yarn-cluster 10 "XXX:2181" "XXX:9092" groups kafkaurl 5 \
"hdfs://ip-10-0-0-XXX.us-west-2.compute.internal:8020/user/ec2-user/urlFeature.properties"
 \
"hdfs://ip-10-0-0-XXX.us-west-2.compute.internal:8020/user/ec2-user/urlFeatureContent.properties"
 \
"hdfs://ip-10-0-0-XXX.us-west-2.compute.internal:8020/user/ec2-user/hdfsOutputNEWScript/OUTPUTYarn2"
  false


Log Details:
INFO : org.apache.spark.SparkContext - Running Spark version 1.3.0
INFO : org.apache.spark.SecurityManager - Changing view acls to: ec2-user
INFO : org.apache.spark.SecurityManager - Changing modify acls to: ec2-user
INFO : org.apache.spark.SecurityManager - SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(ec2-user); users 
with modify permissions: Set(ec2-user)
INFO : akka.event.slf4j.Slf4jLogger - Slf4jLogger started
INFO : Remoting - Starting remoting
INFO : Remoting - Remoting started; listening on addresses 
:[akka.tcp://sparkdri...@ip-10-0-0-xxx.us-west-2.compute.internal:49579]
INFO : Remoting - Remoting now listens on addresses: 
[akka.tcp://sparkdri...@ip-10-0-0-xxx.us-west-2.compute.internal:49579]
INFO : org.apache.spark.util.Utils - Successfully started service 'sparkDriver' 
on port 49579.
INFO : org.apache.spark.SparkEnv - Registering MapOutputTracker
INFO : org.apache.spark.SparkEnv - Registering BlockManagerMaster
INFO : org.apache.spark.storage.DiskBlockManager - Created local directory at 
/tmp/spark-1c805495-c7c4-471d-973f-b1ae0e2c8ff9/blockmgr-fff1946f-a716-40fc-a62d-bacba5b17638
INFO : org.apache.spark.storage.MemoryStore - MemoryStore started with capacity 
265.4 MB
INFO : org.apache.spark.HttpFileServer - HTTP File server directory is 
/tmp/spark-8ed6f513-854f-4ee4-95ea-87185364eeaf/httpd-75cee1e7-af7a-4c82-a9ff-a124ce7ca7ae
INFO : org.apache.spark.HttpServer - Starting HTTP Server
INFO : org.spark-project.jetty.server.Server - jetty-8.y.z-SNAPSHOT
INFO : org.spark-project.jetty.server.AbstractConnector - Started 
SocketConnector@0.0.0.0:46671
INFO : org.apache.spark.util.Utils - Successfully started service 'HTTP file 
server' on port 46671.
INFO : org.apache.spark.SparkEnv - Registering OutputCommitCoordinator
INFO : org.spark-project.jetty.server.Server - jetty-8.y.z-SNAPSHOT
INFO : org.spark-project.jetty.server.AbstractConnector - Started 
SelectChannelConnector@0.0.0.0:4040
INFO : org.apache.spark.util.Utils - Successfully started service 'SparkUI' on 
port 4040.
INFO : org.apache.spark.ui.SparkUI - Started SparkUI at 
http://ip-10-0-0-XXX.us-west-2.compute.internal:4040
INFO : org.apache.spark.SparkContext - Added JAR 
file:/home/ec2-user/CE/correlationengine/scripts/../target/mm-anti-fraud-ce-0.0.1-SNAPSHOT-jar-with-dependencies.jar
 at 
http://10.0.0.XXX:46671/jars/mm-anti-fraud-ce-0.0.1-SNAPSHOT-jar-with-dependencies.jar
 with timestamp 1444620509463
INFO : org.apache.spark.scheduler.cluster.YarnClusterScheduler - Created 
YarnClusterScheduler
ERROR: org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend - 
Application ID is not set.
INFO : org.apache.spark.network.netty.NettyBlockTransferService - Server 
created on 33880
INFO : org.apache.spark.storage.BlockManagerMaster - Trying to register 
BlockManager
INFO : org.apache.spark.storage.BlockManagerMasterActor - Registering block 
manager ip-10-0-0-XXX.us-west-2.compute.internal:33880 with 265.4 MB RAM, 
BlockManagerId(, ip-10-0-0-XXX.us-west-2.compute.internal, 33880)
INFO : org.apache.spark.storage.BlockManagerMaster - Registered BlockManager
INFO : org.apache.spark.scheduler.EventLoggingListener - Logging events to 
hdfs://ip-10-0-0-XXX.us-west-2.compute.internal:8020/user/spark/applicationHistory/spark-application-1444620509497
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:580)
at 
org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
at org.apache.spark.SparkContext.(SparkContext.scala:541)
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
at 
com.markmonitor.antifraud.ce.KafkaURLStreaming.main(KafkaURLStreaming.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 

Where are logs for Spark Kafka Yarn on Cloudera

2015-09-29 Thread Rachana Srivastava
Hello all,

I am trying to test JavaKafkaWordCount on YARN; to make sure YARN is working 
fine I am saving the output to HDFS. The example works fine in local mode but 
not in yarn mode. I cannot see any output logged when I change the mode to 
yarn-client or yarn-cluster, and I cannot find any errors logged. For my 
application id I was looking for logs under /var/log/hadoop-yarn/containers 
(e.g. 
/var/log/hadoop-yarn/containers/application_1439517792099_0010/container_1439517792099_0010_01_03/stderr)
 but I cannot find any useful information. Is that the only location where 
application logs are written?

Also tried setting the log output to spark.yarn.app.container.log.dir but got an 
access denied error.

Question: Do we need any special setup to run Spark Streaming on YARN? How do we 
debug? Where can we find more details on testing streaming on YARN?

Thanks,

Rachana


spark-submit classloader issue...

2015-09-28 Thread Rachana Srivastava
Hello all,

Goal: I want to use APIs from the HttpClient 4.4.1 library. I am using the Maven 
shade plugin to generate the JAR.



Findings: When I run my program as a Java application within Eclipse everything 
works fine, but when I run the program using spark-submit I get the following 
error:

URL content Could not initialize class 
org.apache.http.conn.ssl.SSLConnectionSocketFactory

java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.http.conn.ssl.SSLConnectionSocketFactory



When I tried to find which JAR the class is resolved from, it points to a Hadoop 
JAR; I am assuming this is something set by spark-submit.



ClassLoader classLoader = HttpEndPointClient.class.getClassLoader();

URL resource = 
classLoader.getResource("org/apache/http/message/BasicLineFormatter.class");

Prints following jar:

jar:file:/usr/lib/hadoop/lib/httpcore-4.2.5.jar!/org/apache/http/message/BasicLineFormatter.class



After research I found that I can override --conf 
spark.files.userClassPathFirst=true --conf spark.yarn.user.classpath.first=true



But when I do that I am getting the following error:

ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.io.InvalidClassException: org.apache.spark.scheduler.Task; local class 
incompatible: stream classdesc serialVersionUID = -4703555755588060120, local 
class serialVersionUID = -1589734467697262504
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)



I am running on CDH 5.4. Here is my complete pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test</groupId>
  <artifactId>test</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.5.0</version>
      <exclusions>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.5.0</version>
      <exclusions>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.0</version>
      <exclusions>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
  

JavaRDD using Reflection

2015-09-14 Thread Rachana Srivastava
Hello all,

I am working on a problem that requires us to create different sets of JavaRDDs 
based on different input arguments. We are getting the following error when we 
try to use a factory to create the JavaRDDs. The error message is clear, but I am 
wondering whether there is any workaround.

Question:
How do we create different sets of JavaRDDs dynamically, based on different input 
arguments, trying to implement something like a factory pattern?

Error Message:
RDD transformations and actions can only be invoked by the driver, not inside 
of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) 
is invalid because the values transformation and count action cannot be 
performed inside of the rdd1.map transformation. For more information, see 
SPARK-5063.

Thanks,

Rachana


New JavaRDD Inside JavaPairDStream

2015-09-11 Thread Rachana Srivastava
Hello all,

Can we create a JavaRDD while processing a stream from Kafka, for example? The 
following code is throwing a serialization exception. Not sure if this is 
feasible.

  JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
  JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
  JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
          return tuple2._2();
      }
  });
  JavaPairDStream wordCounts = lines.mapToPair(new PairFunction() {
      public Tuple2 call(String urlString) {
          String propertiesFile =
              "/home/cloudera/Desktop/sample/input/featurelist.properties";
          JavaRDD<String> propertiesFileRDD = jsc.textFile(propertiesFile);
          JavaPairRDD<String, String> featureKeyClassPair = propertiesFileRDD.mapToPair(
              new PairFunction<String, String, String>() {
                  public Tuple2<String, String> call(String property) {
                      return new Tuple2<String, String>(property.split("=")[0], property.split("=")[1]);
                  }
              });
          featureKeyClassPair.count();
          return new Tuple2(urlString, featureScore);
      }
  });
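
A minimal sketch of the usual workaround (an assumption on my part, not an answer 
from the thread): read the small properties file once on the driver and broadcast 
the resulting map, so no RDD has to be created inside the DStream function:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.spark.broadcast.Broadcast;

    // Driver side: load the feature list once and broadcast it.
    String propertiesFile = "/home/cloudera/Desktop/sample/input/featurelist.properties";
    List<String> propertyLines = jsc.textFile(propertiesFile).collect();
    Map<String, String> featureKeyClass = new HashMap<String, String>();
    for (String property : propertyLines) {
        featureKeyClass.put(property.split("=")[0], property.split("=")[1]);
    }
    final Broadcast<Map<String, String>> featureBroadcast = jsc.broadcast(featureKeyClass);

    // Inside mapToPair(...) on the stream, use featureBroadcast.value()
    // instead of calling jsc.textFile(...) on the executors.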



Can Spark Provide Multiple Context Support?

2015-09-08 Thread Rachana Srivastava
Question: How does Spark support multiple contexts?

Background: I have a stream of data coming into Spark from Kafka. For each record 
in the stream I want to download some files from HDFS and process the file data. 
I have written code to process the files from HDFS, and I have code written to 
process the stream data from Kafka using the Spark Streaming API, but I have not 
been able to link the two.

Can you please let me know if it is feasible to create a JavaRDD from a file 
inside a Spark Streaming job's processing step?

Thanks,

Rachana
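
A minimal sketch of one commonly used pattern (an assumption, not an answer from 
the thread): the function passed to foreachRDD executes on the driver, where the 
JavaSparkContext is available, so a file-based RDD can be created there for each 
batch and combined with the batch's RDD. Names such as jsc, messages, and the 
HDFS path below are placeholders:

    messages.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> batchRdd) throws Exception {
            // Runs on the driver once per micro-batch, so using jsc here is legal.
            JavaRDD<String> fileRdd = jsc.textFile("hdfs://namenode:8020/path/to/lookup-file"); // placeholder path
            long matches = batchRdd.intersection(fileRdd).count();  // example combination of the two RDDs
            System.out.println("records also present in the file: " + matches);
            return null;
        }
    });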