Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-23 Thread Holden Karau
You can also look at the shuffle file cleanup tricks we do inside of the
ALS algorithm in Spark.

On Fri, Feb 23, 2018 at 6:20 PM, vijay.bvp  wrote:

> have you looked at
> http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-td23279.html
>
> and the post mentioned there
> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>
> also try compressing the output
> https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization
> spark.shuffle.compress
>
> thanks
> Vijay
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>


-- 
Twitter: https://twitter.com/holdenkarau


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-23 Thread vijay.bvp
have you looked at 
http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-td23279.html

and the post mentioned there
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html

also try compressing the output
https://spark.apache.org/docs/latest/configuration.html#compression-and-serialization
spark.shuffle.compress
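
For reference, a minimal sketch of how those settings can be applied (spark.shuffle.compress is on by default; the codec choice here is just an example):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch only: spark.shuffle.compress defaults to true; spark.io.compression.codec
// controls the codec used for shuffle output and spills.
val conf = new SparkConf()
  .set("spark.shuffle.compress", "true")        // compress map output files
  .set("spark.shuffle.spill.compress", "true")  // compress data spilled during shuffles
  .set("spark.io.compression.codec", "lz4")     // lz4 / lzf / snappy

val spark = SparkSession.builder().config(conf).getOrCreate()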

thanks
Vijay



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Apache Spark - Structured Streaming reading from Kafka some tasks take much longer

2018-02-23 Thread vijay.bvp
Instead of spark-shell, have you tried running it as a job?

How many executors and cores are you using? Can you share the RDD graph and
event timeline from the UI? For the tasks that take longer, did you check
whether there is any GC pressure?

Please look at the UI if you haven't already; it can provide a lot of information.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-23 Thread vijay.bvp
Thanks for adding the RDD lineage graph.
I could see 18 parallel tasks for the HDFS read; was that changed?


What is the Spark job configuration: how many executors, and how many cores per
executor?

I would keep the partitioning a multiple of (number of executors * cores) for
all the RDDs.

If you have 3 executors with 3 cores each assigned to the job, 9 parallel tasks
are possible, so set the repartitioning on the RDDs to a multiple of 9:

spark.read.parquet().repartition(27)
kafka.createDStream().repartition(27)

coalesce with shuffle = false will actually cause problems with upstream
parallelism.
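
A rough sketch of the idea (numbers and names below are illustrative, assuming 3 executors with 3 cores each):

// Illustrative only: with 3 executors * 3 cores = 9 task slots, keep partition
// counts at a multiple of 9 (27 here). The input path and `kafkaDStream` are placeholders.
val parquetDF   = spark.read.parquet("hdfs:///path/to/input").repartition(27)
val kafkaStream = kafkaDStream.repartition(27)   // same idea for the Kafka DStream

// coalesce(n, shuffle = false) only narrows partitions, so it can also shrink the
// parallelism of upstream stages; repartition (shuffle = true) avoids that.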

Please test the above scenario and share your findings.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: sqoop import job not working when spark thrift server is running.

2018-02-23 Thread vijay.bvp
It sure looks like it is not able to get sufficient resources from YARN to start the
containers.
Is it only this import job, or does any other job you submit also fail to start?

As a test, just try to run another Spark job or a MapReduce job and see if
the job can be started.

Reduce the Thrift server executors and check that there is available
cluster capacity overall for new jobs.





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Can spark handle this scenario?

2018-02-23 Thread vijay.bvp
When an HTTP connection is opened, you are opening a connection from one specific
machine (with its IP and NIC) to another specific machine, so it can't be
serialized and reused on another machine.

This isn't a Spark limitation.

I made a simple diagram in case it helps. Objects created at the driver and
passed to the workers need to be serialized; objects created at the workers do
not.

As the diagram shows, you have to create the HTTP connection on the executors,
independently of the driver.
The connection created on Executor-1 can then be used for the partitions P1-P3 of
the RDD available on that executor.

Spark does allow passing objects from the driver to the workers, but when it
reports "Task not serializable" it indicates that some object has a problem.
Mark the class as Serializable if its objects really can be serialized. As I
said at the beginning, not everything is serializable, particularly HTTP
connections, JDBC connections, etc.
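
A minimal sketch of that pattern (the RDD name and endpoint URL below are placeholders):

import java.net.{HttpURLConnection, URL}

// Sketch: anything non-serializable (HTTP client, JDBC connection, ...) is created
// inside mapPartitions, i.e. on the executor that owns the partition, instead of on
// the driver. `rdd` and the endpoint are placeholders.
val results = rdd.mapPartitions { records =>
  val baseUrl = new URL("http://example.com/api")   // placeholder endpoint
  records.map { r =>
    val conn = baseUrl.openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("GET")
      conn.getResponseCode                          // stand-in for real response handling
    } finally conn.disconnect()
  }
}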

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Unsubscribe

2018-02-23 Thread Brindha Sengottaiyan



Apache Spark - Structured Streaming reading from Kafka some tasks take much longer

2018-02-23 Thread M Singh
Hi:
I am working with Spark Structured Streaming (2.2.1) reading data from Kafka
(0.11).

I need to aggregate data ingested every minute, and I am using spark-shell at
the moment. The message ingestion rate is approximately 500k/second. During
some trigger intervals (1 minute), especially when the streaming process is
started, all tasks finish in 20 seconds, but during other triggers it takes 90
seconds.

I have tried reducing the number of partitions (from approximately 300 to 100)
to reduce the number of Kafka consumers, but that has not helped. I also tried
setting kafkaConsumer.pollTimeoutMs to 30 seconds, but then I see a lot of
java.util.concurrent.TimeoutException: Cannot fetch record for offset.
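For reference, the read side roughly looks like the following (broker and topic names are placeholders; kafkaConsumer.pollTimeoutMs is the option mentioned above):

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "events")                       // placeholder topic
  .option("kafkaConsumer.pollTimeoutMs", "30000")      // the 30-second poll timeout I tried
  .load()
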
So I wanted to see if anyone has any thoughts/recommendations.
Thanks




Spark with Kudu behaving unexpectedly when bringing down the Kudu Service

2018-02-23 Thread ravidspark
Hi All,

I am trying to read data from Kafka and ingest it into Kudu using Spark
Streaming. I am not using KuduContext to perform the upsert operation into
Kudu; instead I am using Kudu's native client API to build the PartialRow and
apply the operation for every record from Kafka. I am able to run the Spark
Streaming job and everything looks good: I can see the data in the Kudu
tables. But after processing a few batches, when I bring down the Kudu
service, my executor program becomes a zombie (execution never reaches my
executor class anymore) and the internal threads that establish the connection
to Kudu (which I am not handling in my code) throw exceptions that I cannot
handle, resulting in message loss.
Please find the exception below:


18/02/23 00:16:30 ERROR client.TabletClient: [Peer
bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on
[id: 0x6e13b01f]
java.net.ConnectException: Connection refused:
kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
at
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
at
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
at
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at
org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at
org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Also, below is my executor code (one of the map transformations in the lineage
calls the class below), which establishes the connection to Kudu once per
JVM when the application starts:


package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.List;
import java.util.Map;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.dwh.streaming.kudu.sparkkudustreaming.config.LoadAppConf;
import
org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
import org.dwh.streaming.kudu.sparkkudustreaming.models.Store;

public class KuduProcess {
    private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);

    private static final KuduProcess instance = new KuduProcess();
    private static KuduClient client;
    private static KuduTable table;
    private static KuduSession session;
    private static OperationResponse response;

    private KuduProcess() {
        try {
            Store store = LoadAppConf.loadAppConf();
            client = new KuduClient.KuduClientBuilder(store.getKuduHost()).build();
            table = client.openTable(store.getKuduTable());
            session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        } catch (KuduException e) {
            logger.error("Kudu Exception:" + e.getMessage());
        }
    }

    public static String upsertKudu(Map<String, Object> formattedMap) {
        if (formattedMap.size() != 0) {
            try {
                Upsert upsert = table.newUpsert();
                PartialRow row = upsert.getRow();
                for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
                    if (entry.getValue().getClass().equals(String.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
                            row.setNull(entry.getKey());
                        else
                            row.addString(entry.getKey(), (String) entry.getValue());
                    }
                    else if (entry.getValue().getClass().equals(Long.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
                            row.setNull(entry.getKey());

Reservoir sampling in parallel

2018-02-23 Thread Patrick McCarthy
I have a large dataset composed of scores for several thousand segments,
and the timestamps at which time those scores occurred. I'd like to apply
some techniques like reservoir sampling[1], where for every segment I
process records in order of their timestamps, generate a sample, and then
at intervals compute the quantiles in the sample. Ideally I'd like to write
a pyspark udf to do the sampling/quantizing procedure.

It seems like something I should be doing via rdd.map, but it's not really
clear how I can enforce a function to process records in order within a
partition. Any pointers?

Thanks,
Patrick

[1] https://en.wikipedia.org/wiki/Reservoir_sampling
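
For concreteness, a minimal sketch of the per-partition reservoir step (in Scala; a PySpark mapPartitions version would have the same shape). It assumes the records have already been keyed by segment and sorted by timestamp within each partition, e.g. via repartitionAndSortWithinPartitions:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Algorithm R: after seeing n records, each record is in the k-sized reservoir
// with probability k/n. Quantiles can then be computed from the returned sample.
def reservoirSample[T](records: Iterator[T], k: Int, rng: Random = new Random): Seq[T] = {
  val reservoir = ArrayBuffer.empty[T]
  var seen = 0L
  records.foreach { r =>
    seen += 1
    if (reservoir.size < k) reservoir += r
    else {
      val j = (rng.nextDouble() * seen).toLong   // uniform index in [0, seen)
      if (j < k) reservoir(j.toInt) = r
    }
  }
  reservoir.toSeq
}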


NotSerializableException with Trait

2018-02-23 Thread Jean Rossier
Hello,

I have a few Spark jobs that do the same aggregations, and I want to factor
out the aggregation logic. For that I want to use a trait.
When I run the job extending my trait (over YARN, in client mode), I get
a NotSerializableException (in attachment).
If I change my trait to an object, the job runs fine and I don't get
a NotSerializableException.

Could you explain why? I don't understand this behavior.

Thanks
Jean

--

object SparkJob extends App {

  val conf = new SparkConf()
  val sparkSession: SparkSession = SparkSession.builder()
.appName("aggregateAdTechImpressions")
.config(conf)
.enableHiveSupport()
.getOrCreate()

...

  val impressionsAdtechDF =
MyUtil.prepareAggregationDataFrame(impressionsAdtechRawDF,
"timestamp")

  val impressionsAggregationDF: DataFrame =
MyUtil.aggregateImpressions(impressionsAdtechDF)

...

}

object MyUtil {

  private def parseTs(ts: Int): Int = {
val tsMilli: Long = ts.toLong * 1000L
val date: Date = new Date(tsMilli)
val dateFormat = new SimpleDateFormat("MMdd")
val dateStr = dateFormat.format(date)
if (dateStr == null) 19000101 else dateStr.toInt
  }
  private def udfParseTs: UserDefinedFunction = udf(parseTs _)

  def prepareAggregationDataFrame(rawDF: DataFrame,
timestampColumnName: String): DataFrame = {

rawDF
  .withColumn("original_placement_id", col("placementid"))
  .withColumn("date", udfParseTs(col(timestampColumnName)))
  .withColumn("placement_id", col("placementid") cast StringType)
  .withColumnRenamed("campaignid", "campaign_id")
  .withColumnRenamed("placementSizeTypeId", "size_id")
  .drop("placementid")
  .drop(timestampColumnName)
  }

  def aggregateImpressions(inputDF: DataFrame): DataFrame = {

inputDF.groupBy(
  col("date"),
  col("campaign_id"),
  col("original_placement_id"),
  col("placement_id"),
  col("size_id"))
  .agg(count(lit(1)).alias("cnt"))
  .withColumn("type", lit(1))
  .withColumn("revenue_chf", lit(0) cast DoubleType)
  .withColumn("revenue_eur", lit(0) cast DoubleType)
  .withColumn("source", lit(0)) // 0 for AdTech
  }
}



object SparkJob2 extends App with MyTrait {

  val conf = new SparkConf()
  val sparkSession: SparkSession = SparkSession.builder()
.appName("aggregateAdTechImpressions")
.config(conf)
.enableHiveSupport()
.getOrCreate()

...

  val impressionsAdtechDF =
prepareAggregationDataFrame(impressionsAdtechRawDF, "timestamp")

  val impressionsAggregationDF: DataFrame =
aggregateImpressions(impressionsAdtechDF)

...

}

trait MyTrait {

  private def parseTs(ts: Int): Int = {
val tsMilli: Long = ts.toLong * 1000L
val date: Date = new Date(tsMilli)
val dateFormat = new SimpleDateFormat("MMdd")
val dateStr = dateFormat.format(date)
if (dateStr == null) 19000101 else dateStr.toInt
  }
  private def udfParseTs: UserDefinedFunction = udf(parseTs _)

  def prepareAggregationDataFrame(rawDF: DataFrame,
timestampColumnName: String): DataFrame = {

rawDF
  .withColumn("original_placement_id", col("placementid"))
  .withColumn("date", udfParseTs(col(timestampColumnName)))
  .withColumn("placement_id", col("placementid") cast StringType)
  .withColumnRenamed("campaignid", "campaign_id")
  .withColumnRenamed("placementSizeTypeId", "size_id")
  .drop("placementid")
  .drop(timestampColumnName)
  }

  def aggregateImpressions(inputDF: DataFrame): DataFrame = {

inputDF.groupBy(
  col("date"),
  col("campaign_id"),
  col("original_placement_id"),
  col("placement_id"),
  col("size_id"))
  .agg(count(lit(1)).alias("cnt"))
  .withColumn("type", lit(1))
  .withColumn("revenue_chf", lit(0) cast DoubleType)
  .withColumn("revenue_eur", lit(0) cast DoubleType)
  .withColumn("source", lit(0)) // 0 for AdTech
  }
}


[attachment: spark-NotSerializableException.log]


Spark-Solr -- unresolved dependencies

2018-02-23 Thread Selvam Raman
Hi,

Spark version - EMR 2.0.0

spark-shell --packages com.lucidworks.spark:spark-solr:3.0.1

When I tried the above command, I got the error below:


::

::  UNRESOLVED DEPENDENCIES ::

::

:: org.restlet.jee#org.restlet;2.3.0: not found

:: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found

::



:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved
dependency: org.restlet.jee#org.restlet;2.3.0: not found, unresolved
dependency: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found]
at
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1066)
at
org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:294)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:158)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


What happens if I can't fit data into memory while doing stream-stream join.

2018-02-23 Thread kant kodali
Hi All,

I am experimenting with Spark 2.3.0 stream-stream join feature to see if I
can leverage it to replace some of our existing services.

Imagine I have 3 worker nodes, with *each node* having 16GB RAM and 100GB
SSD. My input dataset, which is in Kafka, is about 250GB per day. Now I want
to do a stream-stream join across 8 data frames with a watermark set to 24
hours over a tumbling window, so I need to hold state for 24 hours and only then
can I clear all the data.
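
For reference, each pairwise join roughly follows this shape (topic and column names are placeholders; the additional streams would be chained the same way):

import org.apache.spark.sql.functions.expr

val impressions = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "impressions")                 // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS impressionId", "timestamp AS impressionTime")
  .withWatermark("impressionTime", "24 hours")

val clicks = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "clicks")
  .load()
  .selectExpr("CAST(value AS STRING) AS clickId", "timestamp AS clickTime")
  .withWatermark("clickTime", "24 hours")

// inner join with a time-range condition so state older than 24 hours can be dropped
val joined = impressions.join(
  clicks,
  expr("clickId = impressionId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 24 hours"))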

Questions:

1) What happens if I can't fit data into memory while doing stream-stream
join?
2) What Storage Level should I choose here for near optimal performance?
3) Any other suggestions?

Thanks!


What's relationship between the TensorflowOnSpark core modules?

2018-02-23 Thread xiaobo
Hi,
After reading the mnist example and the TensorflowOnSpark API, I am somewhat
confused. Here are some questions:
1. What is the relationship between the TFCluster/TFManager/TFNode and TFSparkNode
modules?
2. The conversion guide says we should replace the main function with a
main_fun, but the examples actually define a map function. Are they the same
thing?
3. A map_function parameter question:
   When the map_function is called, argv is sys.argv, but what about the ctx
parameter? What is the type of ctx, and how does its value get assigned? I can
see that the ctx object has various properties and even functions.
4. The conversion guide says there should be a step 3 and a step 4, but the mnist
example does not do these things.
5. By the way, where can I post questions about TFoS? I cannot join the
official Google group.


I am sorry if I posted this in the wrong place.

Re: HBase connector does not read ZK configuration from Spark session

2018-02-23 Thread Deepak Sharma
Hi Dharmin
With the 1st approach, you will have to read the properties from the file
shipped with --files, using something like this:
SparkFiles.get('file.txt')

Alternatively, you can copy the file to HDFS, read it using sc.textFile, and use
the properties within it.

If you add files using --files, they get copied to the executor's working
directory, but you still have to read them and set the properties in the
conf yourself.
Thanks
Deepak

On Fri, Feb 23, 2018 at 10:25 AM, Dharmin Siddesh J <
siddeshjdhar...@gmail.com> wrote:

> I am trying to write a Spark program that reads data from HBase and store
> it in DataFrame.
>
> I am able to run it perfectly with hbase-site.xml in the $SPARK_HOME/conf
> folder, but I am facing few issues here.
>
> Issue 1
>
> The first issue is passing the hbase-site.xml location with the --files
> parameter when submitting in client mode (it works in cluster mode).
>
>
> When I removed hbase-site.xml from $SPARK_HOME/conf and tried to execute
> it in client mode by passing it with the --files parameter over YARN, I keep
> getting an exception (which I think means it is not picking up the
> ZooKeeper configuration from hbase-site.xml):
>
> spark-submit \
>
>   --master yarn \
>
>   --deploy-mode client \
>
>   --files /home/siddesh/hbase-site.xml \
>
>   --class com.orzota.rs.json.HbaseConnector \
>
>   --packages com.hortonworks:shc:1.0.0-2.0-s_2.11 \
>
>   --repositories http://repo.hortonworks.com/content/groups/public/ \
>
>   target/scala-2.11/test-0.1-SNAPSHOT.jar
>
> at org.apache.zookeeper.ClientCnxn$SendThread.run(
> ClientCnxn.java:1125)
>
> 18/02/22 01:43:09 INFO ClientCnxn: Opening socket connection to server
> localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using
> SASL (unknown error)
>
> 18/02/22 01:43:09 WARN ClientCnxn: Session 0x0 for server null, unexpected
> error, closing socket connection and attempting reconnect
>
> java.net.ConnectException: Connection refused
>
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>
> at sun.nio.ch.SocketChannelImpl.finishConnect(
> SocketChannelImpl.java:717)
>
> at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(
> ClientCnxnSocketNIO.java:361)
>
> at org.apache.zookeeper.ClientCnxn$SendThread.run(
> ClientCnxn.java:1125)
>
> However, it works fine when I run it in cluster mode.
>
>
> Issue 2
>
> Passing the HBase configuration details through the Spark session, which I
> can't get to work in either client or cluster mode.
>
>
> Instead of passing the entire hbase-site.xml I am trying to add the
> configuration directly in the code by adding it as a configuration
> parameter in the SparkSession, e.g.:
>
>
> val spark = SparkSession
>
>   .builder()
>
>   .appName(name)
>
>   .config("hbase.zookeeper.property.clientPort", "2181")
>
>   .config("hbase.zookeeper.quorum", "ip1,ip2,ip3")
>
>   .config("spark.hbase.host","zookeeperquorum")
>
>   .getOrCreate()
>
>
> val json_df =
>
>   spark.read.option("catalog",catalog_read).
>
>   format("org.apache.spark.sql.execution.datasources.hbase").
>
>   load()
>
> This is not working in cluster mode either.
>
>
> Can anyone help me with a solution or an explanation of why this is happening?
> Are there any workarounds?
>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net