[Beginner][StructuredStreaming] Console sink is not working as expected

2018-05-22 Thread karthikjay
I have the following code to read and process Kafka data using Structured
Streaming:

import org.apache.spark.sql.{ForeachWriter, SparkSession}

object ETLTest {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {

    val spark = SparkSession
      .builder
      .appName("Test JOB")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "...")
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)", "CAST(topic AS STRING)")

    val sdvWriter = new ForeachWriter[record] {
      def open(partitionId: Long, version: Long): Boolean = {
        true
      }
      def process(record: record): Unit = {
        println("record:: " + record)
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    // DOES NOT WORK
    /*val query = sdvDF
      .writeStream
      .format("console")
      .start()
      .awaitTermination()*/

    // WORKS
    /*val query = sdvDF
      .writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()*/

  }

}

I am running this code from the IntelliJ IDEA IDE. When I use
foreach(sdvWriter), I can see the records consumed from Kafka, but when I
use .writeStream.format("console") I do not see any records. I assume that
the console write stream is maintaining some sort of checkpoint and assumes
it has processed all the records. Is that the case? Am I missing something
obvious here?
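For reference, a minimal sketch of how the console sink is usually wired up, with an explicit output mode and an explicit, fresh checkpoint location (the path below is an assumption added for illustration). If a stale checkpoint from an earlier run is suspected, pointing the query at a new checkpointLocation rules that out:

// Minimal sketch (assumed path/values): console sink with explicit options.
// A fresh checkpointLocation avoids resuming from offsets of a previous run.
val query = sdvDF
  .writeStream
  .format("console")
  .outputMode("append")                                        // print each new micro-batch
  .option("truncate", "false")                                 // show full column values
  .option("checkpointLocation", "/tmp/etl-test-console-chk")   // hypothetical path
  .start()

query.awaitTermination()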







RE: spark sql in-clause problem

2018-05-22 Thread Shiva Prashanth Vallabhaneni
Assuming the list of values in the “IN” clause is small, you could try:

sparkSqlContext.sql("select * from mytable where key = 1 and ((X, Y) = (1, 2) OR (X, Y) = (3, 4))")

Another solution could be to load the possible values for X & Y into a table
and then use this table in a sub-query:

Table coordinates (
Integer X,
Integer Y
)

sparkSqlContext.sql("select * from mytable where key = 1 and (X, Y) IN (select X, Y from coordinates)")
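A hedged end-to-end sketch of that second suggestion in Spark/Scala, assuming a SparkSession named spark (sparkSqlContext.sql works the same way); the temp-view registration and the hard-coded sample coordinates are assumptions added for illustration, and mytable is assumed to already be registered as a view:

// Hedged sketch: load the candidate (X, Y) pairs into a view and reference it
// from a sub-query instead of a literal IN-list.
import spark.implicits._

val coordinates = Seq((1, 2), (3, 4)).toDF("X", "Y")
coordinates.createOrReplaceTempView("coordinates")

val result = spark.sql(
  "select * from mytable where key = 1 and (X, Y) in (select X, Y from coordinates)")

If the multi-column IN sub-query gives trouble, an equivalent inner join on X and Y against the coordinates view is another option.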

From: onmstester onmstester 
Sent: Wednesday, May 23, 2018 10:33 AM
To: user 
Subject: spark sql in-clause problem

I'm reading from this table in Cassandra:

Table mytable (
Integer Key,
Integer X,
Integer Y
)

Using:

sparkSqlContext.sql("select * from mytable where key = 1 and (X, Y) in ((1, 2), (3, 4))")

Encountered error:

StructType(StructField((X,IntegerType,true),StructField((Y,IntegerType,true)) 
!= 
StructType(StructField((X,IntegerType,false),StructField((Y,IntegerType,false))








Re: testing frameworks

2018-05-22 Thread umargeek
Hi Steve,

You can try out the pytest-spark plugin if you're writing programs using
PySpark; please find the link below for reference.

https://github.com/malexer/pytest-spark
  

Thanks,
Umar






Alternative for numpy in Spark MLlib

2018-05-22 Thread umargeek
Hi Folks,

I am planning to rewrite one of my Python modules, which computes entropy
using NumPy, in Spark MLlib so that it can be processed in a distributed
manner.

Can you please advise on the feasibility of this approach, or any
alternatives.
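For what it's worth, a hedged sketch of one way to compute Shannon entropy without NumPy, using plain DataFrame aggregations (which already run distributed) rather than MLlib proper; the column name and the use of log base 2 are assumptions for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, log2, sum}

// Hedged sketch: entropy of a single column, computed as -sum(p * log2(p))
// over the relative frequencies of its distinct values.
def shannonEntropy(df: DataFrame, column: String): Double = {
  val total = df.count().toDouble                            // total number of rows
  df.groupBy(col(column)).count()                            // distributed frequency count
    .select((col("count") / total).as("p"))                  // relative frequencies
    .select(sum(col("p") * -log2(col("p"))).as("entropy"))   // -sum(p * log2 p)
    .first()
    .getDouble(0)
}

Whether MLlib itself is needed depends on what the original module computes; for plain entropy, DataFrame aggregations like the above may be enough.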

Thanks,
Umar






spark sql in-clause problem

2018-05-22 Thread onmstester onmstester
I'm reading from this table in Cassandra:

Table mytable (
Integer Key,
Integer X,
Integer Y
)

Using:

sparkSqlContext.sql("select * from mytable where key = 1 and (X, Y) in ((1, 2), (3, 4))")



Encountered error: 



StructType(StructField((X,IntegerType,true),StructField((Y,IntegerType,true)) 
!= 
StructType(StructField((X,IntegerType,false),StructField((Y,IntegerType,false))












How to validate orc vectorization is working within spark application?

2018-05-22 Thread umargeek
Hi Folks,

I have enabled the configurations listed below within my Spark Streaming
application, but I did not gain any performance benefit after setting these
parameters. Can you please help me: is there a way to validate whether
vectorization is working as expected / enabled correctly?

Note: I am using Spark 2.3 and have converted all the data within my
application to ORC format.

sparkSqlCtx.setConf("spark.sql.orc.filterPushdown", "true")
sparkSqlCtx.setConf("spark.sql.orc.enabled", "true")
sparkSqlCtx.setConf("spark.sql.hive.convertMetastoreOrc", "true")
sparkSqlCtx.setConf("spark.sql.orc.char.enabled", "true")
sparkSqlCtx.setConf("spark.sql.orc.impl","native")
sparkSqlCtx.setConf("spark.sql.orc.enableVectorizedReader","true")

Thanks,
Umar






Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Tathagata Das
Just to be clear, these screenshots are about the memory consumption of the
driver. So this has nothing to do with streaming aggregation state, which is
kept in the memory of the executors, not the driver.

On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim  wrote:

> 1. Could you share your Spark version?
> 2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it
> helps? This configuration is available in 2.3.0, and default value is 1000.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Tue, May 22, 2018 at 4:29 PM, weand wrote:
>
>> You can see it even better on this screenshot:
>>
>> TOP Entries Collapsed #2 (screenshot: file/t8542/27_001.png)
>>
>> Sorry for the spam, attached a not so perfect screen in the mail before.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark driver pod eviction Kubernetes

2018-05-22 Thread Anirudh Ramanathan
I think a pod disruption budget might actually work here. It can select the
spark driver pod using a label. Using that with a minAvailable value that's
appropriate here could do it.

In a more general sense, we do plan on some future work to support driver
recovery which should help long running jobs to restart without losing
progress.

On Tue, May 22, 2018, 7:55 AM purna pradeep  wrote:

> Hi,
>
> What would be the recommended approach to wait for the Spark driver pod to
> complete the currently running job before it gets evicted to new nodes
> while maintenance on the current node is going on (kernel upgrade, hardware
> maintenance, etc.) using the drain command?
>
> I don’t think I can use a PodDisruptionBudget, as the Spark pods’
> deployment yaml(s) is taken by Kubernetes.
>
> Please suggest !
>
>
>


Re: [EXTERNAL] - Re: testing frameworks

2018-05-22 Thread Joel D
We’ve developed our own testing framework consisting of different areas of
checking, sometimes providing expected data and comparing it with the
resultant data from the data object.

Cheers.

On Tue, May 22, 2018 at 1:48 PM Steve Pruitt  wrote:

> Something more along the lines of integration, I believe: run one or more
> Spark jobs and verify the output results, if that makes sense.
>
>
>
> I am very new to the world of Spark.  We want to include pipeline testing
> from the get go.  I will check out spark-testing-base.
>
>
>
>
>
> Thanks.
>
>
>
> *From:* Holden Karau [mailto:hol...@pigscanfly.ca]
> *Sent:* Monday, May 21, 2018 11:32 AM
> *To:* Steve Pruitt 
> *Cc:* user@spark.apache.org
> *Subject:* [EXTERNAL] - Re: testing frameworks
>
>
>
> So I’m biased as the author of spark-testing-base but I think it’s pretty
> ok. Are you looking for unit or integration or something else?
>
>
>
> On Mon, May 21, 2018 at 5:24 AM Steve Pruitt  wrote:
>
> Hi,
>
>
>
> Can anyone recommend testing frameworks suitable for Spark jobs.
> Something that can be integrated into a CI tool would be great.
>
>
>
> Thanks.
>
>
>
> --
>
> Twitter: https://twitter.com/holdenkarau
> 
>


Re: Spark UNEVENLY distributing data

2018-05-22 Thread Saad Mufti
I think TableInputFormat will try to maintain as much locality as possible,
assigning one Spark partition per region and trying to assign that
partition to a YARN container/executor on the same node (assuming you're
using Spark over YARN). So the reason for the uneven distribution could be
that your HBase is not balanced to begin with and has too many regions on
the same region server corresponding to your largest bar. It all depends on
what HBase balancer you have configured and tuned. Assuming that is
properly configured, try to balance your HBase cluster before running the
Spark job. There are commands in the HBase shell to do it manually if required.
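To make the skew concrete, a hedged sketch (Scala for brevity, assuming the Scala RDD equivalent of the hBaseRDD in the original post) that prints how many records each Spark partition, i.e. each region, contributes:

// Hedged sketch: per-partition record counts reveal which regions are oversized.
val perPartitionCounts = hBaseRDD
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()

perPartitionCounts.sortBy(-_._2).take(10).foreach { case (idx, n) =>
  println(s"partition $idx -> $n records")
}

If a handful of partitions dominate, that points back at region balance on the HBase side rather than at Spark.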

Hope this helps.


Saad


On Sat, May 19, 2018 at 6:40 PM, Alchemist 
wrote:

> I am trying to parallelize a simple Spark program processes HBASE data in
> parallel.
>
> // Get Hbase RDD
> JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
> .newAPIHadoopRDD(conf, TableInputFormat.class,
> ImmutableBytesWritable.class, Result.class);
> long count = hBaseRDD.count();
>
> Only two lines I see in the logs.  Zookeeper starts and Zookeeper stops
>
>
> The problem is my program is as SLOW as the largest bar. Found that ZK is 
> taking long time before shutting.
> 18/05/19 17:26:55 INFO zookeeper.ClientCnxn: Session establishment complete 
> on server :2181, sessionid = 0x163662b64eb046d, negotiated timeout = 4 
> 18/05/19
> 17:38:00 INFO zookeeper.ZooKeeper: Session: 0x163662b64eb046d closed
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


RE: [EXTERNAL] - Re: testing frameworks

2018-05-22 Thread Steve Pruitt
Something more along the lines of integration, I believe: run one or more Spark
jobs and verify the output results, if that makes sense.

I am very new to the world of Spark.  We want to include pipeline testing from
the get-go.  I will check out spark-testing-base.
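For reference, a hedged sketch of what a spark-testing-base test can look like (Scala/ScalaTest; the transformation under test and all names below are hypothetical stand-ins, not anything from this thread). DataFrameSuiteBase spins up a local SparkSession for the suite and provides assertDataFrameEquals for comparing expected and actual output:

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.scalatest.FunSuite

object SampleJob {
  // Hypothetical stand-in for the pipeline step whose output we want to verify.
  def addDoubled(df: DataFrame): DataFrame = df.withColumn("doubled", col("n") * 2)
}

class SampleJobSpec extends FunSuite with DataFrameSuiteBase {
  test("addDoubled produces the expected output") {
    val ss = spark
    import ss.implicits._

    val input    = Seq(1, 2, 3).toDF("n")
    val expected = Seq((1, 2), (2, 4), (3, 6)).toDF("n", "doubled")

    // Schema and row-by-row comparison provided by DataFrameSuiteBase.
    assertDataFrameEquals(expected, SampleJob.addDoubled(input))
  }
}

Something along these lines runs from the sbt/Maven test phase and therefore plugs into most CI tools.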


Thanks.

From: Holden Karau [mailto:hol...@pigscanfly.ca]
Sent: Monday, May 21, 2018 11:32 AM
To: Steve Pruitt 
Cc: user@spark.apache.org
Subject: [EXTERNAL] - Re: testing frameworks

So I’m biased as the author of spark-testing-base but I think it’s pretty ok. 
Are you looking for unit or integration or something else?

On Mon, May 21, 2018 at 5:24 AM Steve Pruitt 
> wrote:
Hi,

Can anyone recommend testing frameworks suitable for Spark jobs.  Something 
that can be integrated into a CI tool would be great.

Thanks.

--
Twitter: 
https://twitter.com/holdenkarau


Re: Encounter 'Could not find or load main class' error when submitting spark job on kubernetes

2018-05-22 Thread Marcelo Vanzin
On Tue, May 22, 2018 at 12:45 AM, Makoto Hashimoto
 wrote:
> local:///usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar

Is that the path of the jar inside your docker image? The default
image puts that in /opt/spark IIRC.

-- 
Marcelo




Spark driver pod eviction Kubernetes

2018-05-22 Thread purna pradeep
Hi,

What would be the recommended approach to wait for the Spark driver pod to
complete the currently running job before it gets evicted to new nodes
while maintenance on the current node is going on (kernel upgrade, hardware
maintenance, etc.) using the drain command?

I don’t think I can use a PodDisruptionBudget, as the Spark pods’ deployment
yaml(s) is taken by Kubernetes.

Please suggest!


Re: [structured-streaming]How to reset Kafka offset in readStream and read from beginning

2018-05-22 Thread Bowden, Chris
You can delete the write ahead log directory you provided to the sink via the 
“checkpointLocation” option.
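A hedged sketch of where that option lives, for context (the path and DataFrame name are illustrative assumptions). Once the directory is deleted, or the query is pointed at a fresh path, the source falls back to its startingOffsets setting:

// Hedged sketch: the directory to delete is whatever was passed here.
val query = df
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/my-job-checkpoint")  // delete this to start over
  .start()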

From: karthikjay 
Sent: Tuesday, May 22, 2018 7:24:45 AM
To: user@spark.apache.org
Subject: [structured-streaming]How to reset Kafka offset in readStream and read 
from beginning

I have the following readStream in Spark Structured Streaming, reading data
from Kafka:

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "testtopic")
  .option("failOnDataLoss", "false")
  .option("startingOffsets","earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

As far as I know, every time I start the job, underneath the covers, Spark
creates a new consumer and consumer group, retrieves the last successful
offset for the job (using the job name?), seeks to that offset, and starts
reading from there. Is that the case? If yes, how do I reset the offset and
force my job to read from the beginning?






[structured-streaming]How to reset Kafka offset in readStream and read from beginning

2018-05-22 Thread karthikjay
I have the following readStream in Spark Structured Streaming, reading data
from Kafka:

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "testtopic")
  .option("failOnDataLoss", "false")
  .option("startingOffsets","earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

As far as I know, every time I start the job, underneath the covers, Spark
creates a new consumer and consumer group, retrieves the last successful
offset for the job (using the job name?), seeks to that offset, and starts
reading from there. Is that the case? If yes, how do I reset the offset and
force my job to read from the beginning?






Re: Spark Worker Re-register to Master

2018-05-22 Thread sushil.chaudhary
Can anyone please have a look and share your thoughts here?






problem with saving RandomForestClassifier model - Saprk Java

2018-05-22 Thread Donni Khan
Hi Spark users,

I built a random forest model using Spark 1.6 with Java. I'm getting the
following exception:

User class threw exception: java.lang.UnsupportedOperationException:
Pipeline write will fail on this Pipeline because it contains a stage which
does not implement Writable.


Does anyone know how I can fix it?

Many thanks,
Donni


Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Jungtaek Lim
1. Could you share your Spark version?
2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it
helps? This configuration is available in 2.3.0, and default value is 1000.
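For reference, a hedged sketch of how that setting can be supplied when the session is built (the value 100 and the app name are placeholders; it can equally be passed to spark-submit as a --conf):

import org.apache.spark.sql.SparkSession

// Hedged sketch: cap the number of SQL executions retained for the web UI.
val spark = SparkSession
  .builder
  .appName("my-streaming-app")                      // hypothetical
  .config("spark.sql.ui.retainedExecutions", "100") // default is 1000 in 2.3.0
  .getOrCreate()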

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, May 22, 2018 at 4:29 PM, weand wrote:

> You can see it even better on this screenshot:
>
> TOP Entries Collapsed #2
> 
>
>
> Sorry for the spam, attached a not so perfect screen in the mail before.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Encounter 'Could not find or load main class' error when submitting spark job on kubernetes

2018-05-22 Thread Makoto Hashimoto
Hi,

I am trying to run a Spark job on Kubernetes. Running the Spark job locally
works fine, as follows:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi
--master local[4] examples/jars/spark-examples_2.11-2.3.0.jar 100
..
2018-05-20 21:49:02 INFO  DAGScheduler:54 - Job 0 finished: reduce at
SparkPi.scala:38, took 2.459637 s
Pi is roughly 3.1418607141860715
2018-05-20 21:49:02 INFO  AbstractConnector:318 - Stopped
Spark@41bb8c78{HTTP/1.1,[http/1.1]}{localhost:4040}
2018-05-20 21:49:02 INFO  SparkUI:54 - Stopped Spark web UI at
http://localhost:4040
2018-05-20 21:49:02 INFO  MapOutputTrackerMasterEndpoint:54 -
MapOutputTrackerMasterEndpoint stopped!
2018-05-20 21:49:02 INFO  MemoryStore:54 - MemoryStore cleared
2018-05-20 21:49:02 INFO  BlockManager:54 - BlockManager stopped
2018-05-20 21:49:02 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-05-20 21:49:02 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 -
OutputCommitCoordinator stopped!
2018-05-20 21:49:02 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-05-20 21:49:02 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-05-20 21:49:02 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-ad68e56c-7991-4c6c-b3c5-99ab481a1449
2018-05-20 21:49:02 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-bbcce77f-70a4-4ec1-ad05-e8819fd3ba7a

When I submitted the Spark job on Kubernetes, it ended with an error.

$ bin/spark-submit --master k8s://https://192.168.99.100:8443
--deploy-mode cluster --name spark-pi --class
org.apache.spark.examples.SparkPi --conf spark.executor.instances=5
 --conf spark.kubernetes.container.image=tokoma1/spark:1.0 --conf
spark.kubernetes.driver.pod.name=spark-pi-driver
local:///usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
100
...
 Container name: spark-kubernetes-driver
 Container image: tokoma1/spark:1.0
 Container state: Terminated
 Exit code: 1
2018-05-20 21:59:02 INFO  Client:54 - Application spark-pi finished.
2018-05-20 21:59:02 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-05-20 21:59:02 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-485f73a5-7416-4caa-acb2-49b0bde5eb80

I checked the status of the pod as follows:

$ kubectl get pods

NAME  READY STATUSRESTARTS   AGE
spark-pi-driver   0/1   Error 0  1m

This means it ended with an error.

I checked log.

$ kubectl -n=default logs -f spark-pi-driver++ id -u
+ myuid=0
++ id -g
+ mygid=0
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=driver
+ '[' -z driver ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n 
/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
']'
+ 
SPARK_CLASSPATH=':/opt/spark/jars/*:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp
"$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS
$SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java
-Dspark.driver.port=7078
-Dspark.master=k8s://https://192.168.99.100:8443
-Dspark.kubernetes.driver.pod.name=spark-pi-driver
-Dspark.driver.blockManager.port=7079
-Dspark.kubernetes.container.image=tokoma1/spark:1.0
-Dspark.jars=/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar,/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
-Dspark.app.name=spark-pi
-Dspark.app.id=spark-9762ba052680404a9220f451d99ba818
-Dspark.submit.deployMode=cluster -Dspark.executor.instances=5
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-01f873a813323a4a85eb7a2464949141
-Dspark.driver.host=spark-pi-01f873a813323a4a85eb7a2464949141-driver-svc.default.svc
-cp 
':/opt/spark/jars/*:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar:/usr/local/oss/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar'
-Xms1g -Xmx1g -Dspark.driver.bindAddress=172.17.0.4
org.apache.spark.examples.SparkPi 100
Error: Could not find or load main class org.apache.spark.examples.SparkPi

Has anybody encountered the same error as I experienced, and do you know the
resolution?

Thanks,


Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread weand
You can see it even better on this screenshot:

TOP Entries Collapsed #2 (screenshot link not preserved)

Sorry for the spam, attached a not so perfect screen in the mail before.






Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread weand
Instances of org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper are not
cleaned up, see TOP Entries Collapsed #2:

TOP Entries All (screenshot link not preserved)

TOP Entries Collapsed #1 (screenshot link not preserved)

TOP Entries Collapsed #2 (screenshot link not preserved)


