If TypedColumn is a subclass of Column, why can't I apply a function to it in a Dataset?

2017-03-18 Thread Yong Zhang
In the following example, I used "typed.avg" to generate a TypedColumn and then 
tried to apply round on top of it. Why does Spark complain about it? Is it 
because it no longer knows that it is a TypedColumn?


Thanks


Yong



scala> spark.version

res20: String = 2.1.0

scala> case class Token(name: String, productId: Int, score: Double)

defined class Token

scala> val data = Token("aaa", 100, 0.12) :: Token("aaa", 200, 0.29) :: 
Token("bbb", 200, 0.53) :: Token("bbb", 300, 0.42) :: Nil

data: List[Token] = List(Token(aaa,100,0.12), Token(aaa,200,0.29), 
Token(bbb,200,0.53), Token(bbb,300,0.42))

scala> val dataset = data.toDS

dataset: org.apache.spark.sql.Dataset[Token] = [name: string, productId: int 
... 1 more field]

scala> import org.apache.spark.sql.expressions.scalalang._

import org.apache.spark.sql.expressions.scalalang._

scala> dataset.groupByKey(_.productId).agg(typed.avg[Token](_.score)).show

+-----+-----------------------------------------+
|value|TypedAverage($line22.$read$$iw$$iw$Token)|
+-----+-----------------------------------------+
|  300|                                     0.42|
|  100|                                     0.12|
|  200|                      0.41000000000000003|
+-----+-----------------------------------------+

scala> dataset.groupByKey(_.productId).agg(round(typed.avg[Token](_.score)))

<console>:36: error: type mismatch;

found : org.apache.spark.sql.Column

required: org.apache.spark.sql.TypedColumn[Token,?]

dataset.groupByKey(_.productId).agg(round(typed.avg[Token](_.score)))
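A possible workaround (a sketch only, not verified against 2.1.0): agg on a
KeyValueGroupedDataset only accepts TypedColumn, and round(...) returns a plain
Column, so either turn the rounded Column back into a typed one with .as[Double],
or finish the typed aggregation first and apply round on the untyped result.
Both snippets assume the spark-shell session above (Token, dataset, typed and
spark.implicits._ already in scope):

```scala
import org.apache.spark.sql.functions.round

// Option 1: convert the untyped Column back into a TypedColumn via an encoder.
dataset.groupByKey(_.productId)
  .agg(round(typed.avg[Token](_.score)).as[Double])
  .show()

// Option 2: run the typed aggregation first, then round on the DataFrame.
dataset.groupByKey(_.productId)
  .agg(typed.avg[Token](_.score))
  .toDF("productId", "avgScore")
  .select($"productId", round($"avgScore").as("avgScore"))
  .show()
```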



Re: Spark Streaming from Kafka, deal with initial heavy load.

2017-03-18 Thread Mal Edwin

Hi,
You can enable backpressure to handle this.

spark.streaming.backpressure.enabled
spark.streaming.receiver.maxRate
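
A minimal sketch of where these settings go when the StreamingContext is built
(the rate values and batch interval below are illustrative only). Note that with
the direct approach, spark.streaming.kafka.maxRatePerPartition is the setting
that actually caps how much the first batch pulls per Kafka partition;
receiver.maxRate applies to receiver-based streams.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-backlog")
  // Let the rate controller throttle later batches based on scheduling delay.
  .set("spark.streaming.backpressure.enabled", "true")
  // Cap for receiver-based streams (records/sec per receiver).
  .set("spark.streaming.receiver.maxRate", "10000")
  // Cap for the direct Kafka stream (records/sec per Kafka partition);
  // this also bounds the size of the very first batch.
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")

val ssc = new StreamingContext(conf, Seconds(10))
```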

Thanks,
Edwin

On Mar 18, 2017, 12:53 AM -0400, sagarcasual . , wrote:
> Hi, we have Spark 1.6.1 streaming from a Kafka (0.10.1) topic using the direct 
> approach. The streaming part works fine, but when we initially start the job 
> we have to deal with a really huge Kafka message backlog: millions of messages. 
> That first batch runs for over 40 hours; after 12 hours or so it becomes very, 
> very slow. It keeps crunching messages, but at a very low speed. Once the job 
> is all caught up, subsequent batches are quick since the load is really tiny. 
> Any idea how to avoid this problem?




Re: spark streaming executors memory increasing and executor killed by yarn

2017-03-18 Thread Bill Schwanitz
I have had similar issues with some of my Spark jobs, especially when doing 
things like repartitioning.


spark.yarn.driver.memoryOverhead
    Default: driverMemory * 0.10, with a minimum of 384
    The amount of off-heap memory (in megabytes) to be allocated per driver in
    cluster mode. This is memory that accounts for things like VM overheads,
    interned strings, other native overheads, etc. This tends to grow with the
    container size (typically 6-10%).


I bumped the overhead memory as a way to work around the issue. Not sure 
if that is the best way, but it's how I got around it ;)
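
For reference, a sketch of how that might look on the submit command; the
1024 MB values, class name and jar are placeholders, not recommendations:

```
spark-submit --master yarn-cluster \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --class your.streaming.MainClass \
  your-streaming-assembly.jar
```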


darin wrote:

Hi,
I got this exception after the streaming program had run for some hours.

```
User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```

I have googled some solutions, such as disabling the YARN memory monitor or
increasing executor memory, but I don't think that is the right way.


And this is the submit script:
```
spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G \
  --num-executors 6 --executor-cores 3 --executor-memory 3G \
  --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" \
  --conf "spark.kryoserializer.buffer.max=512m" \
  --class com.dtise.data.streaming.ad.DTStreamingStatistics \
  hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar
```

And this is the main code:

```
val originalStream = ssc.textFileStream(rawDataPath)

originalStream
  .repartition(10)
  .mapPartitions(parseAdLog)
  .reduceByKey(_ ++ _)
  .mapWithState(StateSpec.function(countAdLogWithState _))
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val batchTime = Calendar.getInstance.getTimeInMillis

      // Write the per-batch aggregates (dimension sums and names) to Redis.
      val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
      val nameList = rdd.map(_._2).reduce(_ ++ _).toList
      val jedis = RedisUtils.jedis()
      jedis.hmset(joinString("t_ad_dimension_sum", batchTime), dimensionSumMap)
      jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
      jedis.set(joinString("t_ad", batchTime.toString), "OK")
      jedis.close()

      // Forward the processed log lines to Kafka, one producer per partition.
      rdd.flatMap(_._3).foreachPartition(logInfoList => {
        val producter = new StringProducter
        for (logInfo <- logInfoList) {
          val logInfoArr = logInfo.split("\t", -1)
          val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" + logInfoArr(logDateIdx)
          producter.send("cookedLog", kafkaKey, logInfo)
        }
        producter.close()
      })
    }
  })
```

These are the JVM heap MAT results (screenshots omitted in the archive).

Does anybody have any advice about this?
Thanks








Re: [Spark SQL & Core]: RDD to Dataset with 1500 columns of data, createDataFrame() throws a "grows beyond 64 KB" exception

2017-03-18 Thread Kazuaki Ishizaki
Hi,
Here is the latest status of code generation.

When we use master (which will become Spark 2.2), the following exception 
occurs. The latest version fixes the 64KB error in this case. However, we then 
hit another JVM limit: the number of constant pool entries.

Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for 
class 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection
 
has grown past JVM limit of 0x
at 
org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)
at 
org.codehaus.janino.util.ClassFile.addConstantNameAndTypeInfo(ClassFile.java:439)
at 
org.codehaus.janino.util.ClassFile.addConstantMethodrefInfo(ClassFile.java:358)
...

The PR https://github.com/apache/spark/pull/16648 addresses the constant pool 
issue, but it has not been merged yet.

Regards,
Kazuaki Ishizaki



From:   elevy 
To: user@spark.apache.org
Date:   2017/03/18 17:14
Subject:    [Spark SQL & Core]: RDD to Dataset with 1500 columns of data, 
createDataFrame() throws a "grows beyond 64 KB" exception



Hello all, 
I am using the Spark 2.1.0 release.
I am trying to load a BigTable CSV file with more than 1500 columns into our
system.

Our flow for doing that is:

•   First, read the data as an RDD.
•   Generate a continuous record id using zipWithIndex()
    (this operation exists only in the RDD API; the Dataset API has a similar
    option, monotonically_increasing_id(), but it works per partition and
    creates ids that are not sequential, which is not what we need ☹).
•   Convert the RDD to a Dataset using createDataFrame() of SparkSession.
•   This last operation generates code that exceeds the JVM's 64KB method size
    limit.

I searched the web for a solution or even a similar use case and found a few
issues that talked about the 64KB error, but all of those cases were dealing
with around 100 columns and were solved in Spark 2.1.0 by shrinking the
generated code; none of them hit the JVM limitation.

*Any idea from this forum of experts will be very much appreciated.*
There are two types of solution we are looking for:
*1.* How can I overcome the size of the generated code?
*OR* 
*2.* How can I generate a sequential id directly on the Dataset?

Our temporary solution:

•   Read the DS as an RDD.
•   Generate the sequential id.
•   Write the new data as a text file.
•   Read the data back as a Dataset.
This solution costs us about 30% in performance degradation :(

*Code that reproduces the issue*

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import poc.commons.SparkSessionInitializer;

import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class RDDConverter {
    private static final int FIELD_COUNT = 1900;

    private Dataset<Row> createBigSchema(SparkSession sparkSession, int startColName, int fieldNumber) {
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        SQLContext sqlContext = new SQLContext(sparkSession.sparkContext());

        // One very wide row: the column values are just the column indices as strings.
        String[] row = IntStream.range(startColName, fieldNumber)
                .mapToObj(String::valueOf)
                .toArray(String[]::new);
        List<String[]> data = Collections.singletonList(row);
        JavaRDD<Row> rdd = jsc.parallelize(data).map(RowFactory::create);

        // Build a schema with (fieldNumber - startColName) string columns.
        StructField[] structFields = IntStream.range(startColName, fieldNumber)
                .mapToObj(i -> new StructField(String.valueOf(i), DataTypes.StringType, true, Metadata.empty()))
                .toArray(StructField[]::new);
        StructType schema = DataTypes.createStructType(structFields);

        // This is where the generated projection code blows past the 64 KB limit.
        Dataset<Row> dataSet = sqlContext.createDataFrame(rdd, schema);
        dataSet.show();
        return dataSet;
    }

    public static void main(String[] args) {
        SparkSessionInitializer sparkSessionInitializer = new SparkSessionInitializer();
        SparkSession sparkSession = sparkSessionInitializer.init();

        RDDConverter rddConverter = new RDDConverter();
        rddConverter.createBigSchema(sparkSession, 0, FIELD_COUNT);
    }
}

The exception we are getting:

org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 39 common frames omitted

Re: Streaming 2.1.0 - window vs. batch duration

2017-03-18 Thread Dominik Safaric
If I were to set the window duration to 60 seconds, while keeping the batch 
interval at one second and using a slide duration of 59 seconds, I would get the 
desired behaviour.

However, would the Receiver pull messages from Kafka only at the 59-second 
slide interval, or would it constantly pull messages throughout the entire 
60-second window duration?

Thanks,
Dominik

> On 17 Mar 2017, at 16:57, Cody Koeninger  wrote:
> 
> Probably easier if you show some more code, but if you just call
> dstream.window(Seconds(60))
> you didn't specify a slide duration, so it's going to default to your
> batch duration of 1 second.
> So yeah, if you're just using e.g. foreachRDD to output every message
> in the window, every second it's going to output the last 60 seconds
> of messages... which does mean each message will be output a total of
> 60 times.
> 
> Using a smaller window of 5 seconds for an example, 1 message per
> second, notice that message 1489765610 will be output a total of 5
> times
> 
> Window:
> 1489765605
> 1489765606
> 1489765607
> 1489765608
> 1489765609
> Window:
> 1489765606
> 1489765607
> 1489765608
> 1489765609
> 1489765610
> Window:
> 1489765607
> 1489765608
> 1489765609
> 1489765610
> 1489765611
> Window:
> 1489765608
> 1489765609
> 1489765610
> 1489765611
> 1489765612
> Window:
> 1489765609
> 1489765610
> 1489765611
> 1489765612
> 1489765613
> Window:
> 1489765610
> 1489765611
> 1489765612
> 1489765613
> 1489765614
> Window:
> 1489765611
> 1489765612
> 1489765613
> 1489765614
> 1489765615
> 
> On Thu, Mar 16, 2017 at 2:34 PM, Dominik Safaric
>  wrote:
>> Hi all,
>> 
>> As I’ve implemented a streaming application pulling data from Kafka every 1
>> second (batch interval), I am observing some quite strange behaviour (I didn’t
>> use Spark extensively in the past, having used continuous operator based
>> engines instead).
>> 
>> Namely, the dstream.window(Seconds(60)) windowed stream, when written back to
>> Kafka, contains more messages than were consumed (for debugging purposes
>> using a small dataset of a million Kafka byte array deserialized messages).
>> In particular, in total I’ve streamed exactly 1 million messages, whereas
>> upon window expiry 60 million messages are written back to Kafka.
>> 
>> I’ve read on the official docs that both the window and window slide
>> duration must be multiples of the batch interval. Does this mean that, when
>> consuming messages between two windows, the RDDs of a given batch interval t
>> are ingested 59 more times into the windowed stream?
>> 
>> If I would like to achieve this behaviour (batch interval equal to one
>> second, window duration 60 seconds) - how might one achieve this?
>> 
>> I would appreciate if anyone could correct me if I got the internals of
>> Spark’s windowed operations wrong and elaborate a bit.
>> 
>> Thanks,
>> Dominik





Re: Streaming 2.1.0 - window vs. batch duration

2017-03-18 Thread Dominik Safaric
Correct - that is the part that I understood.

However, what alternative transformation might I apply to iterate through the 
RDDs, given a window duration of 60 seconds that I cannot change?
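
One option, as a sketch (assuming a DStream named stream and a 1-second batch
interval): pass the slide duration explicitly so that it equals the window
duration. The windows then no longer overlap (a tumbling window), so each
message is emitted exactly once per 60-second window instead of 60 times.

```scala
import org.apache.spark.streaming.Seconds

// Window length 60s, slide 60s: non-overlapping windows.
val windowed = stream.window(Seconds(60), Seconds(60))

windowed.foreachRDD { rdd =>
  // One minute's worth of records, each appearing in exactly one window.
  rdd.foreach(record => println(record))
}
```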

> On 17 Mar 2017, at 16:57, Cody Koeninger  wrote:
> 
> Probably easier if you show some more code, but if you just call
> dstream.window(Seconds(60))
> you didn't specify a slide duration, so it's going to default to your
> batch duration of 1 second.
> So yeah, if you're just using e.g. foreachRDD to output every message
> in the window, every second it's going to output the last 60 seconds
> of messages... which does mean each message will be output a total of
> 60 times.
> 
> Using a smaller window of 5 seconds for an example, 1 message per
> second, notice that message 1489765610 will be output a total of 5
> times
> 
> Window:
> 1489765605
> 1489765606
> 1489765607
> 1489765608
> 1489765609
> Window:
> 1489765606
> 1489765607
> 1489765608
> 1489765609
> 1489765610
> Window:
> 1489765607
> 1489765608
> 1489765609
> 1489765610
> 1489765611
> Window:
> 1489765608
> 1489765609
> 1489765610
> 1489765611
> 1489765612
> Window:
> 1489765609
> 1489765610
> 1489765611
> 1489765612
> 1489765613
> Window:
> 1489765610
> 1489765611
> 1489765612
> 1489765613
> 1489765614
> Window:
> 1489765611
> 1489765612
> 1489765613
> 1489765614
> 1489765615
> 
> On Thu, Mar 16, 2017 at 2:34 PM, Dominik Safaric
>  wrote:
>> Hi all,
>> 
>> As I’ve implemented a streaming application pulling data from Kafka every 1
>> second (batch interval), I am observing some quite strange behaviour (I didn’t
>> use Spark extensively in the past, having used continuous operator based
>> engines instead).
>> 
>> Namely, the dstream.window(Seconds(60)) windowed stream, when written back to
>> Kafka, contains more messages than were consumed (for debugging purposes
>> using a small dataset of a million Kafka byte array deserialized messages).
>> In particular, in total I’ve streamed exactly 1 million messages, whereas
>> upon window expiry 60 million messages are written back to Kafka.
>> 
>> I’ve read on the official docs that both the window and window slide
>> duration must be multiples of the batch interval. Does this mean that, when
>> consuming messages between two windows, the RDDs of a given batch interval t
>> are ingested 59 more times into the windowed stream?
>> 
>> If I would like to achieve this behaviour (batch interval equal to one
>> second, window duration 60 seconds) - how might one achieve this?
>> 
>> I would appreciate if anyone could correct me if I got the internals of
>> Spark’s windowed operations wrong and elaborate a bit.
>> 
>> Thanks,
>> Dominik





[Spark SQL & Core]: RDD to Dataset with 1500 columns of data, createDataFrame() throws a "grows beyond 64 KB" exception

2017-03-18 Thread elevy
Hello all, 
I am using the Spark 2.1.0 release.
I am trying to load a BigTable CSV file with more than 1500 columns into our
system.

Our flow for doing that is:

•   First, read the data as an RDD.
•   Generate a continuous record id using zipWithIndex()
    (this operation exists only in the RDD API; the Dataset API has a similar
    option, monotonically_increasing_id(), but it works per partition and
    creates ids that are not sequential, which is not what we need ☹).
•   Convert the RDD to a Dataset using createDataFrame() of SparkSession.
•   This last operation generates code that exceeds the JVM's 64KB method size
    limit.

I searched the web for a solution or even a similar use case and found a few
issues that talked about the 64KB error, but all of those cases were dealing
with around 100 columns and were solved in Spark 2.1.0 by shrinking the
generated code; none of them hit the JVM limitation.

*Any idea from this forum of experts will be very much appreciated.*
There are two types of solution we are looking for:
*1.* How can I overcome the size of the generated code?
*OR* 
*2.* How can I generate a sequential id directly on the Dataset? (see the
sketch after the temporary solution below)

Our temporary solution:

•   Read the DS as an RDD.
•   Generate the sequential id.
•   Write the new data as a text file.
•   Read the data back as a Dataset.
This solution costs us about 30% in performance degradation :(
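
For question 2, here is a sketch (Scala for brevity; df and the column names
are placeholders) of getting a gap-free sequential id without leaving the
DataFrame API, and therefore without the extra createDataFrame() call or the
write-to-text round trip. The unpartitioned window funnels all rows through a
single task, so it trades parallelism for a strictly sequential id:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// monotonically_increasing_id() is unique and increasing but not sequential,
// so use it only as an ordering key for row_number(), which is gap-free.
val w = Window.orderBy("_mono_id")

val withSeqId = df
  .withColumn("_mono_id", monotonically_increasing_id())
  .withColumn("row_id", row_number().over(w))
  .drop("_mono_id")
```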

*Code that reproduces the issue*

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import poc.commons.SparkSessionInitializer;

import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class RDDConverter {
    private static final int FIELD_COUNT = 1900;

    private Dataset<Row> createBigSchema(SparkSession sparkSession, int startColName, int fieldNumber) {
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        SQLContext sqlContext = new SQLContext(sparkSession.sparkContext());

        // One very wide row: the column values are just the column indices as strings.
        String[] row = IntStream.range(startColName, fieldNumber)
                .mapToObj(String::valueOf)
                .toArray(String[]::new);
        List<String[]> data = Collections.singletonList(row);
        JavaRDD<Row> rdd = jsc.parallelize(data).map(RowFactory::create);

        // Build a schema with (fieldNumber - startColName) string columns.
        StructField[] structFields = IntStream.range(startColName, fieldNumber)
                .mapToObj(i -> new StructField(String.valueOf(i), DataTypes.StringType, true, Metadata.empty()))
                .toArray(StructField[]::new);
        StructType schema = DataTypes.createStructType(structFields);

        // This is where the generated projection code blows past the 64 KB limit.
        Dataset<Row> dataSet = sqlContext.createDataFrame(rdd, schema);
        dataSet.show();
        return dataSet;
    }

    public static void main(String[] args) {
        SparkSessionInitializer sparkSessionInitializer = new SparkSessionInitializer();
        SparkSession sparkSession = sparkSessionInitializer.init();

        RDDConverter rddConverter = new RDDConverter();
        rddConverter.createBigSchema(sparkSession, 0, FIELD_COUNT);
    }
}

The exception we are getting:

org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 39 common frames omitted
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method
"(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V"
of class
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)







SparkStreaming getActiveOrCreate

2017-03-18 Thread Justin Pihony
The docs on getActiveOrCreate make it seem that you'll get an already-started
context:

> Either return the "active" StreamingContext (that is, started but not
> stopped), or create a new StreamingContext that is

However, as far as I can tell from the code, it is strictly dependent on the
implementation of the create function, and per the tests it is even expected
that the create function will return a non-started context. So this makes for
somewhat awkward code that must check the context's state before deciding
whether to start it. Was this considered when this was added? I couldn't find
anything explicit.
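
For illustration, the pattern ends up looking roughly like this; createContext
is a placeholder for a factory that builds, but does not start, the context:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("example")
  val ssc = new StreamingContext(conf, Seconds(10))
  // ... set up the streams and output operations here, but do NOT call ssc.start()
  ssc
}

// Returns either the already-started active context or a fresh, non-started
// one from createContext, so the caller still has to check the state.
val ssc = StreamingContext.getActiveOrCreate(createContext _)
if (ssc.getState() != StreamingContextState.ACTIVE) {
  ssc.start()
}
ssc.awaitTermination()
```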

-Justin Pihony


