Re: --jars from spark-submit on master on YARN don't get added properly to the executors - ClassNotFoundException

2017-08-09 Thread Mikhailau, Alex
Thanks, Marcelo. Will give it a shot tomorrow.

-Alex

On 8/9/17, 5:59 PM, "Marcelo Vanzin"  wrote:

Jars distributed using --jars are not added to the system classpath,
so log4j cannot see them.

To work around that, you need to manually add the jar *name* to the
driver and executor classpaths:

spark.driver.extraClassPath=some.jar
spark.executor.extraClassPath=some.jar

In client mode you should use spark.yarn.dist.jars instead of --jars,
and change the driver classpath above to point to the local copy of
the jar.


On Wed, Aug 9, 2017 at 2:52 PM, Mikhailau, Alex  
wrote:
> I have log4j json layout jars added via spark-submit on EMR
>
>
>
> /usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn --jars
> 
/home/hadoop/lib/jsonevent-layout-1.7.jar,/home/hadoop/lib/json-smart-1.1.1.jar
> --driver-java-options "-XX:+AlwaysPreTouch -XX:MaxPermSize=6G" --class
> com.mlbam.emr.XXX  s3://xxx/aa/jars/ spark-job-assembly-1.4.1-SNAPSHOT.jar
> ActionOnFailure=CONTINUE
>
>
>
>
>
> this is the process running on the executor:
>
>
>
> /usr/lib/jvm/java-1.8.0/bin/java -server -Xmx8192m -XX:+AlwaysPreTouch
> -XX:MaxPermSize=6G
> 
-Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/tmp
> -Dspark.driver.port=32869 -Dspark.history.ui.port=18080 -Dspark.ui.port=0
> 
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1502310393755_0003/container_1502310393755_0003_01_05
> -XX:OnOutOfMemoryError=kill %p
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> spark://CoarseGrainedScheduler@10.202.138.158:32869 --executor-id 3
> --hostname ip-10-202-138-98.mlbam.qa.us-east-1.bamgrid.net --cores 8
> --app-id application_1502310393755_0003 --user-class-path
> 
file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/__app__.jar
> --user-class-path
> 
file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/jsonevent-layout-1.7.jar
> --user-class-path
> 
file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/json-smart-1.1.1.jar
>
>
>
> I see that jsonevent-layout-1.7.jar is passed as –user-class-path to the 
job
> (see the above process), yet, I see the following log exception in my
> stderr:
>
>
>
> log4j:ERROR Could not instantiate class
> [net.logstash.log4j.JSONEventLayoutV1].
>
> java.lang.ClassNotFoundException: net.logstash.log4j.JSONEventLayoutV1
>
>
>
>
>
> Am I doing something wrong?
>
>
>
> Thank you,
>
>
>
> Alex



-- 
Marcelo






Re: --jars from spark-submit on master on YARN don't get added properly to the executors - ClassNotFoundException

2017-08-09 Thread Marcelo Vanzin
Jars distributed using --jars are not added to the system classpath,
so log4j cannot see them.

To work around that, you need to manually add the jar *name* to the
driver and executor classpaths:

spark.driver.extraClassPath=some.jar
spark.executor.extraClassPath=some.jar

In client mode you should use spark.yarn.dist.jars instead of --jars,
and change the driver classpath above to point to the local copy of
the jar.
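
For example, with the spark-submit command quoted below, that could look
roughly like this (jar names taken from that command; in cluster mode the jars
shipped with --jars land in each YARN container's working directory, which is
why the bare file names are enough on the classpath):

/usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn \
  --jars /home/hadoop/lib/jsonevent-layout-1.7.jar,/home/hadoop/lib/json-smart-1.1.1.jar \
  --conf spark.driver.extraClassPath=jsonevent-layout-1.7.jar:json-smart-1.1.1.jar \
  --conf spark.executor.extraClassPath=jsonevent-layout-1.7.jar:json-smart-1.1.1.jar \
  --class com.mlbam.emr.XXX ...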


On Wed, Aug 9, 2017 at 2:52 PM, Mikhailau, Alex  wrote:
> I have log4j json layout jars added via spark-submit on EMR
>
>
>
> /usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn --jars
> /home/hadoop/lib/jsonevent-layout-1.7.jar,/home/hadoop/lib/json-smart-1.1.1.jar
> --driver-java-options "-XX:+AlwaysPreTouch -XX:MaxPermSize=6G" --class
> com.mlbam.emr.XXX  s3://xxx/aa/jars/ spark-job-assembly-1.4.1-SNAPSHOT.jar
> ActionOnFailure=CONTINUE
>
>
>
>
>
> this is the process running on the executor:
>
>
>
> /usr/lib/jvm/java-1.8.0/bin/java -server -Xmx8192m -XX:+AlwaysPreTouch
> -XX:MaxPermSize=6G
> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/tmp
> -Dspark.driver.port=32869 -Dspark.history.ui.port=18080 -Dspark.ui.port=0
> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1502310393755_0003/container_1502310393755_0003_01_05
> -XX:OnOutOfMemoryError=kill %p
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> spark://CoarseGrainedScheduler@10.202.138.158:32869 --executor-id 3
> --hostname ip-10-202-138-98.mlbam.qa.us-east-1.bamgrid.net --cores 8
> --app-id application_1502310393755_0003 --user-class-path
> file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/__app__.jar
> --user-class-path
> file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/jsonevent-layout-1.7.jar
> --user-class-path
> file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/json-smart-1.1.1.jar
>
>
>
> I see that jsonevent-layout-1.7.jar is passed as –user-class-path to the job
> (see the above process), yet, I see the following log exception in my
> stderr:
>
>
>
> log4j:ERROR Could not instantiate class
> [net.logstash.log4j.JSONEventLayoutV1].
>
> java.lang.ClassNotFoundException: net.logstash.log4j.JSONEventLayoutV1
>
>
>
>
>
> Am I doing something wrong?
>
>
>
> Thank you,
>
>
>
> Alex



-- 
Marcelo




--jars from spark-submit on master on YARN don't get added properly to the executors - ClassNotFoundException

2017-08-09 Thread Mikhailau, Alex
I have log4j json layout jars added via spark-submit on EMR

/usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn --jars 
/home/hadoop/lib/jsonevent-layout-1.7.jar,/home/hadoop/lib/json-smart-1.1.1.jar 
--driver-java-options "-XX:+AlwaysPreTouch -XX:MaxPermSize=6G" --class 
com.mlbam.emr.XXX  s3://xxx/aa/jars/ spark-job-assembly-1.4.1-SNAPSHOT.jar 
ActionOnFailure=CONTINUE


this is the process running on the executor:

/usr/lib/jvm/java-1.8.0/bin/java -server -Xmx8192m -XX:+AlwaysPreTouch 
-XX:MaxPermSize=6G 
-Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/tmp
 -Dspark.driver.port=32869 -Dspark.history.ui.port=18080 -Dspark.ui.port=0 
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1502310393755_0003/container_1502310393755_0003_01_05
 -XX:OnOutOfMemoryError=kill %p 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
spark://CoarseGrainedScheduler@10.202.138.158:32869 --executor-id 3 --hostname 
ip-10-202-138-98.mlbam.qa.us-east-1.bamgrid.net --cores 8 --app-id 
application_1502310393755_0003 --user-class-path 
file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/__app__.jar
 --user-class-path 
file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/jsonevent-layout-1.7.jar
 --user-class-path 
file:/mnt/yarn/usercache/hadoop/appcache/application_1502310393755_0003/container_1502310393755_0003_01_05/json-smart-1.1.1.jar

I see that jsonevent-layout-1.7.jar is passed as –user-class-path to the job 
(see the above process), yet, I see the following log exception in my stderr:

log4j:ERROR Could not instantiate class [net.logstash.log4j.JSONEventLayoutV1].
java.lang.ClassNotFoundException: net.logstash.log4j.JSONEventLayoutV1


Am I doing something wrong?

Thank you,

Alex


Has anyone come across incorta, which relies on Spark, Parquet and open source libraries?

2017-08-09 Thread Mich Talebzadeh
Hi,

There is a tool called incorta that uses Spark, Parquet and open source big
data analytics libraries. Its aim is to accelerate analytics. It claims to
incorporate Direct Data Mapping to deliver near real-time analytics on top of
original, intricate, transactional data such as ERP systems. Direct Data
Mapping executes real-time joins with aggregations. It is designed to eradicate
cumbersome, time-consuming ETL routines, dimensional data stores and
traditional OLAP semantic layers.

So a lot of talk but very little light. It claims that there is no need for
star schemas and other DW design schemes. So I was wondering whether anyone has
come across it?


Some stuff here

http://www.jenunderwood.com/2017/04/11/accelerating-analytics-incorta-direct-data-mapping/


Thanks,

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-09 Thread Cody Koeninger
org.apache.spark.streaming.kafka.KafkaCluster has methods
getLatestLeaderOffsets and getEarliestLeaderOffsets
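
Putting those together, a rough, untested sketch for the 0.8 direct API in
Spark 1.x (broker and topic names are placeholders, and the Either results are
unwrapped with .right.get only to keep the sketch short):

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaCluster, KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker
val topic = "mytopic"                                             // placeholder topic

val kc = new KafkaCluster(kafkaParams)
val partitions = kc.getPartitions(Set(topic)).right.get
val earliest = kc.getEarliestLeaderOffsets(partitions).right.get
val latest = kc.getLatestLeaderOffsets(partitions).right.get

// one OffsetRange per partition, covering earliest..latest
val offsetRanges = partitions.toArray.map { tp =>
  OffsetRange(tp.topic, tp.partition, earliest(tp).offset, latest(tp).offset)
}

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)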

On Mon, Aug 7, 2017 at 11:37 PM, shyla deshpande
 wrote:
> Thanks TD.
>
> On Mon, Aug 7, 2017 at 8:59 PM, Tathagata Das 
> wrote:
>>
>> I dont think there is any easier way.
>>
>> On Mon, Aug 7, 2017 at 7:32 PM, shyla deshpande 
>> wrote:
>>>
>>> Thanks TD for the response. I forgot to mention that I am not using
>>> structured streaming.
>>>
>>> I was looking into KafkaUtils.createRDD, and looks like I need to get the
>>> earliest and the latest offset for each partition to build the
>>> Array(offsetRange). I wanted to know if there was a easier way.
>>>
>>> 1 reason why we are hesitating to use structured streaming is because I
>>> need to persist the data in Cassandra database which I believe is not
>>> production ready.
>>>
>>>
>>> On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das
>>>  wrote:

 Its best to use DataFrames. You can read from as streaming or as batch.
 More details here.


 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries

 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

 On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande
  wrote:
>
> Hi all,
>
> What is the easiest way to read all the data from kafka in a batch
> program for a given topic?
> I have 10 kafka partitions, but the data is not much. I would like to
> read  from the earliest from all the partitions for a topic.
>
> I appreciate any help. Thanks


>>>
>>
>




Spark SVD benchmark for dense matrices

2017-08-09 Thread Jose Francisco Saray Villamizar
Hi everyone,

I am trying to invert a 5000 x 5000 dense matrix (99% non-zeros) by using
SVD, with an approach similar to:

https://stackoverflow.com/questions/29969521/how-to-compute-the-inverse-of-a-rowmatrix-in-apache-spark

The time I'm getting with SVD is close to 10 minutes, which is very long for
me.
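
For reference, a rough sketch of that SVD-based inversion in MLlib (a random
dense matrix is used only to keep the example self-contained; as the comments
note, the final result still needs one transpose):

import org.apache.spark.mllib.linalg.{DenseMatrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val n = 5000
// hypothetical dense input: one n-element random row per record
val rows = sc.parallelize(0 until n, 100).map { _ =>
  Vectors.dense(Array.fill(n)(scala.util.Random.nextDouble()))
}
val mat = new RowMatrix(rows)

// full SVD; computeU = true is needed to rebuild the (pseudo-)inverse
val svd = mat.computeSVD(n, computeU = true)
val invS = DenseMatrix.diag(Vectors.dense(svd.s.toArray.map(1.0 / _)))

// A^-1 = V * S^-1 * U^T; computing U * S^-1 * V^T as below yields its
// transpose, so one final (local or BlockMatrix) transpose is still required
val inverseTransposed: RowMatrix = svd.U.multiply(invS).multiply(svd.V.transpose)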

A benchmark for SVD is already given here

https://databricks.com/blog/2014/07/21/distributing-the-singular-value-decomposition-with-spark.html

However, it seems they are using sparse matrices, which is why they get short
times. Has any of you tried to perform an SVD on a very dense, big matrix?

Is this time normal?

Thank you.

-- 
-- 
Buen dia, alegria !!
José Francisco Saray Villamizar
cel +33 6 13710693
Lyon, France


StreamingQueryListner spark structered Streaming

2017-08-09 Thread purna pradeep
I'm working on a structured streaming application in which I read from Kafka
as a stream, and for each batch I need to perform a lookup against an S3 file
(which is nearly 200 GB) to fetch some attributes. So I'm using df.persist()
(basically caching the lookup), but I need to refresh the dataframe because the
S3 lookup data changes frequently. I'm using the code below:


class RefreshcachedDF(sparkSession: SparkSession) extends StreamingQueryListener {

  override def onQueryStarted(
      event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryTerminated(
      event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(
      event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val currTime = System.currentTimeMillis()
    if (currTime > latestRefreshTime) { // latestRefreshTime is kept in a global temp view
      // oldDF is a cached DataFrame created from a global temp view, about 150 GB in size.
      oldDF.unpersist() // I guess this is an async call; should I use the blocking
                        // unpersist(true) instead, and is that safe?
      val inputDf: DataFrame = readFile(spec, sparkSession)
      val recreateddf = inputDf.persist()
      val count = recreateddf.count()
    }
  }
}


Is the above approach a good solution for refreshing a cached dataframe? The
trigger for this refresh is the cache expiry date for the S3 data, stored in a
global temp view.

Note: S3 is one lookup source, but I do have other sources with data sizes of
20 to 30 GB.

 - So the question is: is this the right place to refresh the cached df?
 - If yes, should I use the blocking or the non-blocking unpersist method,
given that the data is huge (15 GB)?
 - For a similar issue I see the response below from TD, with the subject "Re:
Refreshing a persisted RDD":

"Yes, you will have to recreate the streaming Dataframe along with the
static Dataframe, and restart the query. There isnt a currently
feasible to
do this without a query restart. But restarting a query WITHOUT
restarting
the whole application + spark cluster, is reasonably fast. If your
applicatoin can tolerate 10 second latencies, then stopping and
restarting
a query within the same Spark application is a reasonable solution."

[http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser]



So if that's the better solution, should I restart the query as below?

query.processAllavaialble()
query.stop()
df.unpersist()
val inputDf: DataFrame = readFile(spec, sparkSession) //read file from S3
or anyother source
val recreateddf = inputDf.persist()
query.start()


When I looked into the Spark documentation of the above methods:

void processAllAvailable()   // the documentation says this method is intended for testing
Blocks until all available data in the source has been processed and
committed to the sink. This method is intended for testing. Note that in
the case of continually arriving data, this method may block forever.
Additionally, this method is only guaranteed to block until data that has
been synchronously appended data to a Source prior to invocation. (i.e.
getOffset must immediately reflect the addition).

stop()
Stops the execution of this query if it is running. This method blocks
until the threads performing execution has stopped.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable()

Please suggest a better approach to refresh the cache.


Re[4]: Trying to connect Spark 1.6 to Hive

2017-08-09 Thread toletum
Yes... I know... but
The cluster is not administered by me
On Wed., Aug. 9, 2017 at 13:46, Gourav Sengupta  wrote: 
Hi,

Just out of sheer curiosity - why are you using SPARK 1.6? Since then SPARK has 
made significant advancement and improvement, why not take advantage of  that?
Regards,
Gourav
On Wed, Aug 9, 2017 at 10:41 AM, toletum  wrote:

Thanks Matteo
I fixed it
Regards,
JCS
On Wed., Aug. 9, 2017 at 11:22, Matteo Cossu  wrote:

 Hello,
try to use these options when starting Spark:
--conf "spark.driver.userClassPathFirst=true" --conf 
"spark.executor.userClassPathFirst=true"  

In this way you will be sure that the executor and the driver of Spark will use 
the classpath you define.

Best Regards,
Matteo Cossu
On 5 August 2017 at 23:04, toletum  wrote:
Hi everybody
I'm trying to connect Spark to Hive. 
Hive uses Derby Server for metastore_db. 
$SPARK_HOME/conf/hive-site.xml

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby://derby:1527/metastore_db;create=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.apache.derby.jdbc.ClientDriver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
</configuration>
I have copied to $SPARK_HOME/lib derby.jar, derbyclient.jar, derbytools.jar
Added to CLASSPATH the 3 jars too
$SPARK_HOMElib/derby.jar:$SPARK_HOME/lib/derbytools.jar:$SPARK_HOME/lib/derbyclient.jar
But spark-sql says:
org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
The specified datastore driver ("org.apache.derby.jdbc.ClientDriver") was not 
found in the CLASSPATH. Please check your CLASSPATH specification, and the name 
of the driver.
java finds the class
java org.apache.derby.jdbc.ClientDriver
Error: Main method not found in class org.apache.derby.jdbc.ClientDriver, 
please define the main method as:
   public static void main(String[] args)
or a JavaFX application class must extend javafx.application.Application
It seems Spark can't find the driver


Re: Re[2]: Trying to connect Spark 1.6 to Hive

2017-08-09 Thread Gourav Sengupta
Hi,

Just out of sheer curiosity - why are you using SPARK 1.6? Since then SPARK
has made significant advancement and improvement, why not take advantage
of  that?


Regards,
Gourav

On Wed, Aug 9, 2017 at 10:41 AM, toletum  wrote:

> Thanks Matteo
>
> I fixed it
>
> Regards,
> JCS
>
> On Wed., Aug. 9, 2017 at 11:22, Matteo Cossu  wrote:
>
> Hello,
> try to use these options when starting Spark:
>
> *--conf "spark.driver.userClassPathFirst=true" --conf
> "spark.executor.userClassPathFirst=true"  *
> In this way you will be sure that the executor and the driver of Spark
> will use the classpath you define.
>
> Best Regards,
> Matteo Cossu
>
>
> On 5 August 2017 at 23:04, toletum  wrote:
>
> Hi everybody
>
> I'm trying to connect Spark to Hive.
>
> Hive uses Derby Server for metastore_db.
>
> $SPARK_HOME/conf/hive-site.xml
>
> <configuration>
>   <property>
>     <name>javax.jdo.option.ConnectionURL</name>
>     <value>jdbc:derby://derby:1527/metastore_db;create=true</value>
>     <description>JDBC connect string for a JDBC metastore</description>
>   </property>
>
>   <property>
>     <name>javax.jdo.option.ConnectionDriverName</name>
>     <value>org.apache.derby.jdbc.ClientDriver</value>
>     <description>Driver class name for a JDBC metastore</description>
>   </property>
> </configuration>
>
> I have copied to $SPARK_HOME/lib derby.jar, derbyclient.jar, derbytools.jar
>
> Added to CLASSPATH the 3 jars too
>
> $SPARK_HOMElib/derby.jar:$SPARK_HOME/lib/derbytools.jar:$SPARK_HOME/lib/derbyclient.jar
>
> But spark-sql says:
>
> org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
> The specified datastore driver ("org.apache.derby.jdbc.ClientDriver") was
> not found in the CLASSPATH. Please check your CLASSPATH specification, and
> the name of the driver.
>
> java finds the class
>
> java org.apache.derby.jdbc.ClientDriver
> Error: Main method not found in class org.apache.derby.jdbc.ClientDriver,
> please define the main method as:
>public static void main(String[] args)
> or a JavaFX application class must extend javafx.application.Application
>
> It seems Spark can't find the driver
>
>
>
>
>


Re[2]: Trying to connect Spark 1.6 to Hive

2017-08-09 Thread toletum
Thanks Matteo
I fixed it
Regards,
JCS
On Wed., Aug. 9, 2017 at 11:22, Matteo Cossu  wrote: Hello,
try to use these options when starting Spark:
--conf "spark.driver.userClassPathFirst=true" --conf 
"spark.executor.userClassPathFirst=true"  

In this way you will be sure that the executor and the driver of Spark will use 
the classpath you define.

Best Regards,
Matteo Cossu
On 5 August 2017 at 23:04, toletum  wrote:
Hi everybody
I'm trying to connect Spark to Hive. 
Hive uses Derby Server for metastore_db. 
$SPARK_HOME/conf/hive-site.xml

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby://derby:1527/metastore_db;create=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.apache.derby.jdbc.ClientDriver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
</configuration>
I have copied to $SPARK_HOME/lib derby.jar, derbyclient.jar, derbytools.jar
Added to CLASSPATH the 3 jars too
$SPARK_HOMElib/derby.jar:$SPARK_HOME/lib/derbytools.jar:$SPARK_HOME/lib/derbyclient.jar
But spark-sql says:
org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: 
The specified datastore driver ("org.apache.derby.jdbc.ClientDriver") was not 
found in the CLASSPATH. Please check your CLASSPATH specification, and the name 
of the driver.
java finds the class
java org.apache.derby.jdbc.ClientDriver
Error: Main method not found in class org.apache.derby.jdbc.ClientDriver, 
please define the main method as:
   public static void main(String[] args)
or a JavaFX application class must extend javafx.application.Application
It seems Spark can't find the driver


Reply: Re: Re: Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread luohui20001
Thank you guys, I got my code working like below:

val record75df = sc.parallelize(listForRule75, numPartitions)
  .map(x => x.replace("|", ","))
  .map(_.split(","))
  .map(x => Mycaseclass4(x(0).toInt, x(1).toInt, x(2).toFloat, x(3).toInt))
  .toDF()
val userids = 1 to 1
val uiddf = sc.parallelize(userids, numPartitions).toDF("userid")
record75df.registerTempTable("b")
uiddf.registerTempTable("a")
val rule75df = sqlContext.sql("select a.*,b.* from a join b")
rule75df.show




 

ThanksBest regards!
San.Luo

- Original Message -
From: Ryan
To: ayan guha
Cc: Riccardo Ferrari, luohui20...@sina.com, user
Subject: Re: Re: Is there an operation to create multi record for every element in a RDD?
Date: 2017-08-09 17:32

rdd has a cartesian method

On Wed, Aug 9, 2017 at 5:12 PM, ayan guha  wrote:
If you use join without any condition in becomes cross join. In sql, it looks 
like
Select a.*,b.* from a join b
On Wed, 9 Aug 2017 at 7:08 pm,  wrote:
Riccardo and Ryan, thank you for your ideas. It seems that crossJoin is a new
Dataset API after Spark 2.x. My Spark version is 1.6.3. Is there a comparable
API to do a cross join? Thank you.




 

ThanksBest regards!
San.Luo

- Original Message -
From: Riccardo Ferrari
To: Ryan
Cc: luohui20...@sina.com, user
Subject: Re: Is there an operation to create multi record for every element in a RDD?
Date: 2017-08-09 16:54

Depends on your Spark version, have you considered the Dataset api?
You can do something like:

val df1 = rdd1.toDF("userid")

val listRDD = sc.parallelize(listForRule77)

val listDF = listRDD.toDF("data")

df1.crossJoin(listDF).orderBy("userid").show(60, truncate=false)
+--+--+
|userid|data  |
+--+--+
|1 |1,1,100.00|1483891200,|
|1 |1,1,100.00|1483804800,|
...
|1 |1,1,100.00|1488902400,|
|1 |1,1,100.00|1489075200,|
|1 |1,1,100.00|1488470400,|
...

On Wed, Aug 9, 2017 at 10:44 AM, Ryan  wrote:
It's just sort of inner join operation... If the second dataset isn't very 
large it's ok(btw, you can use flatMap directly instead of map followed by 
flatmap/flattern), otherwise you can register the second one as a rdd/dataset, 
and join them on user id.
On Wed, Aug 9, 2017 at 4:29 PM,   wrote:
hello guys:  I have a simple rdd like :val userIDs = 1 to 1val rdd1 = 
sc.parallelize(userIDs , 16)   //this rdd has 1 user id  And I have a 
List[String] like below:scala> listForRule77
res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800, 
1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000, 
1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200, 
1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800, 
1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400, 
1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600, 
1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800, 
1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000, 
1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200, 
1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)
scala> listForRule77.length
res77: Int = 29
  I need to create a rdd containing  29 records. for every userid in 
rdd1 , I need to create 29 records according to listForRule77, each record 
start with the userid, for example 1(the userid),1,1,100.00|1483286400.   
My idea is like below:1.write a udfto add the userid to the beginning of every 
string element of listForRule77.2.use val rdd2 = rdd1.map{x=> 
List_udf(x))}.flatmap(), the result rdd2 maybe what I need.
  My question: Are there any problems in my idea? Is there a better way to 
do this ? 



 

ThanksBest regards!
San.Luo





-- 
Best Regards,
Ayan Guha






Re: Reusing dataframes for streaming (spark 1.6)

2017-08-09 Thread Tathagata Das
There is a DStream.transform() that does exactly this.
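
For the DataFrame-reuse part of the question, a rough Spark 1.6 sketch in
Scala (the PySpark DStream API has the same foreachRDD/transform hooks); the
input directory, single-column schema and Parquet sink are placeholder
assumptions, and DStream.transform is the variant to use when the result
should stay a DStream instead of being written out per batch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("reuse-pipeline"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val ssc = new StreamingContext(sc, Seconds(10))

// hypothetical pipeline: any DataFrame => DataFrame function can be plugged in here
def pipeline(input: DataFrame): DataFrame = input.filter("length(value) > 0")

val lines = ssc.textFileStream("/tmp/incoming")          // hypothetical input directory

lines.foreachRDD { rdd =>
  val inputDf = rdd.toDF("value")                        // micro-batch RDD -> DataFrame
  val outputDf = pipeline(inputDf)                       // reuse the batch pipeline as-is
  outputDf.write.mode("append").parquet("/tmp/out")      // hypothetical sink
}

ssc.start()
ssc.awaitTermination()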

On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju  wrote:

> Hi,
>
> We've built a batch application on Spark 1.6.1. I'm looking into how to
> run the same code as a streaming (DStream based) application. This is using
> pyspark.
>
> In the batch application, we have a sequence of transforms that read from
> file, do dataframe operations, then write to file. I was hoping to swap out
> the read from file with textFileStream, then use the dataframe operations
> as is. This would mean that if we change the batch pipeline, so long as it
> is a sequence of dataframe operations, the streaming version can just reuse
> the code.
>
> Looking at the sql_network_wordcount
> 
> example, it looks like I'd have to do DStream.foreachRDD, convert the
> passed in RDD into a dataframe and then do my sequence of dataframe
> operations. However, that list of dataframe operations looks to be
> hardcoded into the process method, is there any way to pass in a function
> that takes a dataframe as input and returns a dataframe?
>
> what i see from the example:
>
> words.foreachRDD(process)
>
> def process(time, rdd):
> # create dataframe from RDD
> # hardcoded operations on the dataframe
>
> what i would like to do instead:
> def process(time, rdd):
> # create dataframe from RDD - input_df
> # output_df = dataframe_pipeline_fn(input_df)
>
> -ashwin
>
>
>
>


Re: Multiple queries on same stream

2017-08-09 Thread Tathagata Das
It's important to note that running multiple streaming queries, as of today,
would read the input data that many times. So there is a trade-off between the
two approaches. So even though scenario 1 won't get great catalyst
optimization, it may be more efficient overall in terms of resource usage.

There may be an hybrid solution possible. You could craft multiple rules
using sql dsl. For N rules, you can have N boolean columns added with value
set based on each rule expressed through sql functions. Finally, the
foreach would take appropriate actions. A rough example would be.

dataframe
  .withColumn("rule1", when(...).otherwise(...))
  .withColumn("rule2", when(...).otherwise(...))
  ...
  .filter(...)                        // filter out data where no rules were matched
  .as[RuleMatches].foreach { matches =>
    // take action for each rule matched
  }

This would evaluate the rules with catalyst optimization, and apply the
non-optimized foreach function ONLY on rows that matched some rule (which
is hopefully << total rows).
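
As a concrete (hypothetical) instance of that pattern, assuming a streaming
Dataset/DataFrame df with columns "device" and "temp", spark.implicits._ in
scope, and println standing in for the real per-rule actions:

import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.functions.col

case class RuleMatches(device: String, temp: Double, rule1: Boolean, rule2: Boolean)

val flagged = df
  .withColumn("rule1", col("temp") > 80.0)             // rule 1: overheating
  .withColumn("rule2", col("device") === "sensor-1")   // rule 2: a watched device
  .filter(col("rule1") || col("rule2"))                // drop rows that matched no rule
  .as[RuleMatches]

val query = flagged.writeStream
  .foreach(new ForeachWriter[RuleMatches] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(m: RuleMatches): Unit = {
      if (m.rule1) println(s"overheating: ${m.device}")
      if (m.rule2) println(s"watched device: ${m.device}")
    }
    def close(errorOrNull: Throwable): Unit = ()
  })
  .outputMode("append")
  .start()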



On Tue, Aug 8, 2017 at 11:12 PM, Jörn Franke  wrote:

> This is not easy to say without testing. It depends on type of computation
> etc. it also depends on the Spark version. Generally vectorization / SIMD
> could be much faster if it is applied by Spark / the JVM in scenario 2.
>
> > On 9. Aug 2017, at 07:05, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
> >
> > I am using structured streaming to evaluate multiple rules on same
> running stream.
> > I have two options to do that. One is to use forEach and evaluate all
> the rules on the row..
> > The other option is to express rules in spark sql dsl and run multiple
> queries.
> > I was wondering if option 1 will result in better performance even
> though I can get catalyst optimization in option 2.
> >
> > Thanks
> > Raghav
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Re: Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread Ryan
rdd has a cartesian method
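
For Spark 1.6, a minimal sketch of that approach, reusing the names from the
question quoted below (rdd1 holding the user ids, listForRule77 the rule
strings); the comma separator is just an assumption about the desired output
format:

val listRDD = sc.parallelize(listForRule77)
val rdd2 = rdd1.cartesian(listRDD).map { case (userid, rule) => s"$userid,$rule" }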

On Wed, Aug 9, 2017 at 5:12 PM, ayan guha  wrote:

> If you use join without any condition in becomes cross join. In sql, it
> looks like
>
> Select a.*,b.* from a join b
>
> On Wed, 9 Aug 2017 at 7:08 pm,  wrote:
>
>> Riccardo and Ryan
>>Thank you for your ideas.It seems that crossjoin is a new dataset api
>> after spark2.x.
>> my spark version is 1.6.3. Is there a relative api to do crossjoin?
>> thank you.
>>
>>
>>
>> 
>>
>> ThanksBest regards!
>> San.Luo
>>
>> - 原始邮件 -
>> 发件人:Riccardo Ferrari 
>> 收件人:Ryan 
>> 抄送人:luohui20...@sina.com, user 
>> 主题:Re: Is there an operation to create multi record for every element in
>> a RDD?
>> 日期:2017年08月09日 16点54分
>>
>> Depends on your Spark version, have you considered the Dataset api?
>>
>> You can do something like:
>>
>> val df1 = rdd1.toDF("userid")
>>
>> val listRDD = sc.parallelize(listForRule77)
>>
>> val listDF = listRDD.toDF("data")
>>
>> df1.crossJoin(listDF).orderBy("userid").show(60, truncate=false)
>>
>> +--+--+
>>
>> |userid|data  |
>>
>> +--+--+
>>
>> |1 |1,1,100.00|1483891200,|
>>
>> |1 |1,1,100.00|1483804800,|
>>
>> ...
>>
>> |1 |1,1,100.00|1488902400,|
>>
>> |1 |1,1,100.00|1489075200,|
>>
>> |1 |1,1,100.00|1488470400,|
>>
>> ...
>>
>> On Wed, Aug 9, 2017 at 10:44 AM, Ryan  wrote:
>>
>> It's just sort of inner join operation... If the second dataset isn't
>> very large it's ok(btw, you can use flatMap directly instead of map
>> followed by flatmap/flattern), otherwise you can register the second one as
>> a rdd/dataset, and join them on user id.
>>
>> On Wed, Aug 9, 2017 at 4:29 PM,  wrote:
>>
>> hello guys:
>>   I have a simple rdd like :
>> val userIDs = 1 to 1
>> val rdd1 = sc.parallelize(userIDs , 16)   //this rdd has 1 user id
>>   And I have a List[String] like below:
>> scala> listForRule77
>> res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800,
>> 1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000,
>> 1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200,
>> 1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800,
>> 1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400,
>> 1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600,
>> 1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800,
>> 1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000,
>> 1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200,
>> 1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)
>>
>> scala> listForRule77.length
>> res77: Int = 29
>>
>>   I need to create a rdd containing  29 records. for every userid
>> in rdd1 , I need to create 29 records according to listForRule77, each
>> record start with the userid, for example 1(the
>> userid),1,1,100.00|1483286400.
>>   My idea is like below:
>> 1.write a udf
>> to add the userid to the beginning of every string element
>> of listForRule77.
>> 2.use
>> val rdd2 = rdd1.map{x=> List_udf(x))}.flatmap()
>> , the result rdd2 maybe what I need.
>>
>>   My question: Are there any problems in my idea? Is there a better
>> way to do this ?
>>
>>
>>
>> 
>>
>> ThanksBest regards!
>> San.Luo
>>
>>
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Trying to connect Spark 1.6 to Hive

2017-08-09 Thread Matteo Cossu
Hello,
try to use these options when starting Spark:

*--conf "spark.driver.userClassPathFirst=true" --conf
"spark.executor.userClassPathFirst=true"  *
In this way you will be sure that the executor and the driver of Spark will
use the classpath you define.
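
A sketch of how that might look on the command line, using the jar locations
from the quoted message below (illustrative only, not a verified fix):

spark-sql \
  --jars $SPARK_HOME/lib/derbyclient.jar,$SPARK_HOME/lib/derby.jar,$SPARK_HOME/lib/derbytools.jar \
  --conf "spark.driver.userClassPathFirst=true" \
  --conf "spark.executor.userClassPathFirst=true"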

Best Regards,
Matteo Cossu


On 5 August 2017 at 23:04, toletum  wrote:

> Hi everybody
>
> I'm trying to connect Spark to Hive.
>
> Hive uses Derby Server for metastore_db.
>
> $SPARK_HOME/conf/hive-site.xml
>
> 
> 
>   javax.jdo.option.ConnectionURL
>   jdbc:derby://derby:1527/metastore_db;create=true
>   JDBC connect string for a JDBC metastore
> 
>
> 
>   javax.jdo.option.ConnectionDriverName
>   org.apache.derby.jdbc.ClientDriver
>   Driver class name for a JDBC metastore
> 
> 
>
> I have copied to $SPARK_HOME/lib derby.jar, derbyclient.jar, derbytools.jar
>
> Added to CLASSPATH the 3 jars too
>
> $SPARK_HOMElib/derby.jar:$SPARK_HOME/lib/derbytools.jar:
> $SPARK_HOME/lib/derbyclient.jar
>
> But spark-sql saids:
>
> org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
> The specified datastore driver ("org.apache.derby.jdbc.ClientDriver") was
> not found in the CLASSPATH. Please check your CLASSPATH specification, and
> the name of the driver.
>
> java finds the class
>
> java org.apache.derby.jdbc.ClientDriver
> Error: Main method not found in class org.apache.derby.jdbc.ClientDriver,
> please define the main method as:
>public static void main(String[] args)
> or a JavaFX application class must extend javafx.application.Application
>
> It seems Spark can't find the driver
>
>
>
>


Re: [Structured Streaming] Recommended way of joining streams

2017-08-09 Thread Tathagata Das
Writing streams into some sink (preferably fault-tolerant, exactly once
sink, see docs) and then joining is definitely a possible way. But you will
likely incur higher latency. If you want lower latency, then stream-stream
joins is the best approach, which we are working on right now. Spark 2.3 is
likely to have stream-stream joins (no release date). For now, the best way
would be to use mapGroupsWithState (spark 2.2, scala/java). The rough idea
of how to implement inner join is as follows.

case class Type1(...)    // fields in first stream
case class Type2(...)    // fields in second stream
case class CombinedType(first: Type1, second: Type2)   // a combined type that can hold data from both streams

val streamingDataset1 = streamingDF1.as[Type1].map { first => CombinedType(first, null) }     // first stream as common typed dataset
val streamingDataset2 = streamingDF2.as[Type2].map { second => CombinedType(null, second) }   // second stream as common typed dataset

val combinedDataset = streamingDataset1.union(streamingDataset2)

combinedDataset
  .groupByKey { x => getKey(x) }  // group by common id
  .flatMapGroupsWithState { case (key, values, state) =>
    // update state for the key using the values, and possibly output an object
  }
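
To make the last step concrete, here is a rough, untested expansion of that
outline for an inner join (Spark 2.2 APIs; the fields in Type1/Type2, the
string key, and the unbounded state are simplifying assumptions, and a real
implementation would also need timeouts / state cleanup):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
// assumes spark.implicits._ is in scope for the case class encoders

case class Type1(id: String, a: Int)            // hypothetical fields
case class Type2(id: String, b: String)         // hypothetical fields
case class CombinedType(first: Type1, second: Type2)
case class JoinState(firsts: List[Type1], seconds: List[Type2])

val joined = combinedDataset
  .groupByKey(c => if (c.first != null) c.first.id else c.second.id)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
    (key: String, values: Iterator[CombinedType], state: GroupState[JoinState]) =>
      val old = state.getOption.getOrElse(JoinState(Nil, Nil))
      val batch = values.toList
      val newFirsts  = batch.flatMap(c => Option(c.first))
      val newSeconds = batch.flatMap(c => Option(c.second))
      // pair each new arrival with everything already seen on the other side
      // (and new-with-new once), so every (first, second) pair is emitted exactly once
      val out =
        newFirsts.flatMap(f => (old.seconds ++ newSeconds).map(s => CombinedType(f, s))) ++
        newSeconds.flatMap(s => old.firsts.map(f => CombinedType(f, s)))
      state.update(JoinState(old.firsts ++ newFirsts, old.seconds ++ newSeconds))
      out.iterator
  }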




On Wed, Aug 9, 2017 at 12:05 AM, Priyank Shrivastava  wrote:

> I have streams of data coming in from various applications through Kafka.
> These streams are converted into dataframes in Spark.  I would like to join
> these dataframes on a common ID they all contain.
>
> Since  joining streaming dataframes is currently not supported, what is
> the current recommended way to join two dataFrames  in a streaming context.
>
>
> Is it recommended to keep writing the streaming dataframes into some sink
> to convert them into static dataframes which can then be joined?  Would
> this guarantee end-to-end exactly once and fault tolerant guarantees?
>
> Priyank
>


Re: Re: Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread ayan guha
If you use join without any condition in becomes cross join. In sql, it
looks like

Select a.*,b.* from a join b

On Wed, 9 Aug 2017 at 7:08 pm,  wrote:

> Riccardo and Ryan
>Thank you for your ideas.It seems that crossjoin is a new dataset api
> after spark2.x.
> my spark version is 1.6.3. Is there a relative api to do crossjoin?
> thank you.
>
>
>
> 
>
> ThanksBest regards!
> San.Luo
>
> - 原始邮件 -
> 发件人:Riccardo Ferrari 
> 收件人:Ryan 
> 抄送人:luohui20...@sina.com, user 
> 主题:Re: Is there an operation to create multi record for every element in a
> RDD?
> 日期:2017年08月09日 16点54分
>
> Depends on your Spark version, have you considered the Dataset api?
>
> You can do something like:
>
> val df1 = rdd1.toDF("userid")
>
> val listRDD = sc.parallelize(listForRule77)
>
> val listDF = listRDD.toDF("data")
>
> df1.crossJoin(listDF).orderBy("userid").show(60, truncate=false)
>
> +--+--+
>
> |userid|data  |
>
> +--+--+
>
> |1 |1,1,100.00|1483891200,|
>
> |1 |1,1,100.00|1483804800,|
>
> ...
>
> |1 |1,1,100.00|1488902400,|
>
> |1 |1,1,100.00|1489075200,|
>
> |1 |1,1,100.00|1488470400,|
>
> ...
>
> On Wed, Aug 9, 2017 at 10:44 AM, Ryan  wrote:
>
> It's just sort of inner join operation... If the second dataset isn't very
> large it's ok(btw, you can use flatMap directly instead of map followed by
> flatmap/flattern), otherwise you can register the second one as a
> rdd/dataset, and join them on user id.
>
> On Wed, Aug 9, 2017 at 4:29 PM,  wrote:
>
> hello guys:
>   I have a simple rdd like :
> val userIDs = 1 to 1
> val rdd1 = sc.parallelize(userIDs , 16)   //this rdd has 1 user id
>   And I have a List[String] like below:
> scala> listForRule77
> res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800,
> 1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000,
> 1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200,
> 1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800,
> 1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400,
> 1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600,
> 1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800,
> 1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000,
> 1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200,
> 1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)
>
> scala> listForRule77.length
> res77: Int = 29
>
>   I need to create a rdd containing  29 records. for every userid
> in rdd1 , I need to create 29 records according to listForRule77, each
> record start with the userid, for example 1(the
> userid),1,1,100.00|1483286400.
>   My idea is like below:
> 1.write a udf
> to add the userid to the beginning of every string element
> of listForRule77.
> 2.use
> val rdd2 = rdd1.map{x=> List_udf(x))}.flatmap()
> , the result rdd2 maybe what I need.
>
>   My question: Are there any problems in my idea? Is there a better
> way to do this ?
>
>
>
> 
>
> ThanksBest regards!
> San.Luo
>
>
>
> --
Best Regards,
Ayan Guha


Reply: Re: Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread luohui20001
Riccardo and Ryan, thank you for your ideas. It seems that crossJoin is a new
Dataset API after Spark 2.x. My Spark version is 1.6.3. Is there a comparable
API to do a cross join? Thank you.




 

ThanksBest regards!
San.Luo

- Original Message -
From: Riccardo Ferrari
To: Ryan
Cc: luohui20...@sina.com, user
Subject: Re: Is there an operation to create multi record for every element in a RDD?
Date: 2017-08-09 16:54

Depends on your Spark version, have you considered the Dataset api?
You can do something like:

val df1 = rdd1.toDF("userid")

val listRDD = sc.parallelize(listForRule77)

val listDF = listRDD.toDF("data")

df1.crossJoin(listDF).orderBy("userid").show(60, truncate=false)
+--+--+
|userid|data  |
+--+--+
|1 |1,1,100.00|1483891200,|
|1 |1,1,100.00|1483804800,|
...
|1 |1,1,100.00|1488902400,|
|1 |1,1,100.00|1489075200,|
|1 |1,1,100.00|1488470400,|
...

On Wed, Aug 9, 2017 at 10:44 AM, Ryan  wrote:
It's just sort of inner join operation... If the second dataset isn't very 
large it's ok(btw, you can use flatMap directly instead of map followed by 
flatmap/flattern), otherwise you can register the second one as a rdd/dataset, 
and join them on user id.
On Wed, Aug 9, 2017 at 4:29 PM,   wrote:
hello guys:  I have a simple rdd like :val userIDs = 1 to 1val rdd1 = 
sc.parallelize(userIDs , 16)   //this rdd has 1 user id  And I have a 
List[String] like below:scala> listForRule77
res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800, 
1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000, 
1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200, 
1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800, 
1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400, 
1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600, 
1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800, 
1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000, 
1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200, 
1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)
scala> listForRule77.length
res77: Int = 29
  I need to create a rdd containing  29 records. for every userid in 
rdd1 , I need to create 29 records according to listForRule77, each record 
start with the userid, for example 1(the userid),1,1,100.00|1483286400.   
My idea is like below:1.write a udfto add the userid to the beginning of every 
string element of listForRule77.2.use val rdd2 = rdd1.map{x=> 
List_udf(x))}.flatmap(), the result rdd2 maybe what I need.
  My question: Are there any problems in my idea? Is there a better way to 
do this ? 



 

ThanksBest regards!
San.Luo







Re: Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread Riccardo Ferrari
Depends on your Spark version, have you considered the Dataset api?

You can do something like:

val df1 = rdd1.toDF("userid")

val listRDD = sc.parallelize(listForRule77)

val listDF = listRDD.toDF("data")

df1.crossJoin(listDF).orderBy("userid").show(60, truncate=false)

+--+--+

|userid|data  |

+--+--+

|1 |1,1,100.00|1483891200,|

|1 |1,1,100.00|1483804800,|

...

|1 |1,1,100.00|1488902400,|

|1 |1,1,100.00|1489075200,|

|1 |1,1,100.00|1488470400,|

...

On Wed, Aug 9, 2017 at 10:44 AM, Ryan  wrote:

> It's just sort of inner join operation... If the second dataset isn't very
> large it's ok(btw, you can use flatMap directly instead of map followed by
> flatmap/flattern), otherwise you can register the second one as a
> rdd/dataset, and join them on user id.
>
> On Wed, Aug 9, 2017 at 4:29 PM,  wrote:
>
>> hello guys:
>>   I have a simple rdd like :
>> val userIDs = 1 to 1
>> val rdd1 = sc.parallelize(userIDs , 16)   //this rdd has 1 user id
>>   And I have a List[String] like below:
>> scala> listForRule77
>> res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800,
>> 1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000,
>> 1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200,
>> 1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800,
>> 1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400,
>> 1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600,
>> 1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800,
>> 1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000,
>> 1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200,
>> 1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)
>>
>> scala> listForRule77.length
>> res77: Int = 29
>>
>>   I need to create a rdd containing  29 records. for every userid
>> in rdd1 , I need to create 29 records according to listForRule77, each
>> record start with the userid, for example 1(the
>> userid),1,1,100.00|1483286400.
>>   My idea is like below:
>> 1.write a udf
>> to add the userid to the beginning of every string element
>> of listForRule77.
>> 2.use
>> val rdd2 = rdd1.map{x=> List_udf(x))}.flatmap()
>> , the result rdd2 maybe what I need.
>>
>>   My question: Are there any problems in my idea? Is there a better
>> way to do this ?
>>
>>
>>
>> 
>>
>> ThanksBest regards!
>> San.Luo
>>
>
>


Re: Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread Ryan
It's just sort of an inner join operation... If the second dataset isn't very
large it's OK (btw, you can use flatMap directly instead of map followed by
flatmap/flatten), otherwise you can register the second one as an
rdd/dataset and join them on user id.

On Wed, Aug 9, 2017 at 4:29 PM,  wrote:

> hello guys:
>   I have a simple rdd like :
> val userIDs = 1 to 1
> val rdd1 = sc.parallelize(userIDs , 16)   //this rdd has 1 user id
>   And I have a List[String] like below:
> scala> listForRule77
> res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800,
> 1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000,
> 1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200,
> 1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800,
> 1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400,
> 1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600,
> 1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800,
> 1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000,
> 1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200,
> 1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)
>
> scala> listForRule77.length
> res77: Int = 29
>
>   I need to create a rdd containing  29 records. for every userid
> in rdd1 , I need to create 29 records according to listForRule77, each
> record start with the userid, for example 1(the
> userid),1,1,100.00|1483286400.
>   My idea is like below:
> 1.write a udf
> to add the userid to the beginning of every string element
> of listForRule77.
> 2.use
> val rdd2 = rdd1.map{x=> List_udf(x))}.flatmap()
> , the result rdd2 maybe what I need.
>
>   My question: Are there any problems in my idea? Is there a better
> way to do this ?
>
>
>
> 
>
> ThanksBest regards!
> San.Luo
>


Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread luohui20001
hello guys:
  I have a simple rdd like:

val userIDs = 1 to 1
val rdd1 = sc.parallelize(userIDs , 16)   //this rdd has 1 user id

  And I have a List[String] like below:

scala> listForRule77
res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800,
1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000,
1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200,
1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800,
1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400,
1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600,
1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800,
1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000,
1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200,
1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)

scala> listForRule77.length
res77: Int = 29

  I need to create a rdd containing  29 records: for every userid in rdd1,
I need to create 29 records according to listForRule77, each record starting
with the userid, for example 1(the userid),1,1,100.00|1483286400.
  My idea is like below:
1. write a udf to add the userid to the beginning of every string element of
listForRule77.
2. use val rdd2 = rdd1.map{x => List_udf(x)}.flatmap(); the result rdd2 may be
what I need.

  My question: Are there any problems in my idea? Is there a better way to
do this?



 

ThanksBest regards!
San.Luo


[Structured Streaming] Recommended way of joining streams

2017-08-09 Thread Priyank Shrivastava
I have streams of data coming in from various applications through Kafka.
These streams are converted into dataframes in Spark.  I would like to join
these dataframes on a common ID they all contain.

Since  joining streaming dataframes is currently not supported, what is the
current recommended way to join two dataFrames  in a streaming context.

Is it recommended to keep writing the streaming dataframes into some sink
to convert them into static dataframes which can then be joined?  Would
this guarantee end-to-end exactly once and fault tolerant guarantees?

Priyank


Re: Multiple queries on same stream

2017-08-09 Thread Jörn Franke
This is not easy to say without testing. It depends on type of computation etc. 
it also depends on the Spark version. Generally vectorization / SIMD could be 
much faster if it is applied by Spark / the JVM in scenario 2.

> On 9. Aug 2017, at 07:05, Raghavendra Pandey  
> wrote:
> 
> I am using structured streaming to evaluate multiple rules on same running 
> stream. 
> I have two options to do that. One is to use forEach and evaluate all the 
> rules on the row.. 
> The other option is to express rules in spark sql dsl and run multiple 
> queries. 
> I was wondering if option 1 will result in better performance even though I 
> can get catalyst optimization in option 2.
> 
> Thanks 
> Raghav 
