Re: Refreshing a persisted RDD

2017-05-19 Thread Sudhir Menon
Part of the problem here is that a static dataframe is designed to be used as
a read-only abstraction in Spark, so updating it requires the user to drop the
dataframe holding the reference data and recreate it. And in order for the
join to use the recreated dataframe, the query has to be restarted, which
interrupts the stream processing. Jayesh's use case is something we see
frequently when working with real customers who love Spark for a lot of
reasons but want the ease of use that comes from being able to work with
mutable abstractions.
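
For reference, the stock-Spark workaround TD describes further down in this
thread looks roughly like the sketch below (all names, paths, the topic, and
the console sink are illustrative, not taken from the thread):

  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.streaming.StreamingQuery

  // Rebuild the static DataFrame and (re)start the streaming join against it.
  def startJoinQuery(spark: SparkSession, blackListPath: String): StreamingQuery = {
    val dfBlackList: DataFrame = spark.read.csv(blackListPath).toDF("accountName")
    val dfAccount = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "accounts")
      .load()
      .selectExpr("CAST(value AS STRING) AS accountName")
    dfAccount.join(dfBlackList, "accountName")
      .writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/blacklist-join")
      .start()
  }

  // When the blacklist changes, stop the running query and start a new one;
  // the application and the Spark cluster keep running, only the query restarts.
  val spark = SparkSession.builder.appName("Demo").master("local[*]").getOrCreate
  var query = startJoinQuery(spark, "/data/blacklist.csv")
  // ... later, after the blacklist file has been updated ...
  query.stop()
  query = startJoinQuery(spark, "/data/blacklist.csv")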

If you were to do this with SnappyData, since the product supports mutability
of tables (and hence dataframe mutability) out of the box, you would simply
update the information in your "static" data frame. The code snippet below
lets you mutate the blacklist table (which is the only thing the user needs
to do) and keep going; the join then automatically uses the updated dataframe
without any further action from the user.

-

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.streaming.ProcessingTime
  import org.apache.spark.sql.SnappySession

  val spark: SparkSession = SparkSession
    .builder
    .appName("Demo")
    .master("local[*]")
    .getOrCreate

  val session = new SnappySession(spark.sparkContext)
  val snc = session.snappyContext

  // For simplicity, an account has just one column, accountName
  case class Account(accountName: String)

  // The blacklist includes account numbers ranging from 15 to 25
  val rdd = snc.sparkContext.parallelize((15 to 25).map(i => Account(i.toString)))
  val dfBlackList = snc.createDataFrame(rdd) // This can be any DataFrame, e.g. one
                                             // created from S3 via session.read.csv("pathToCSV")

  // Create a SnappyData row table and populate it from the DataFrame
  snc.createTable("blacklist", "row", dfBlackList.schema, Map.empty[String, String])

  import org.apache.spark.sql.snappy._
  dfBlackList.write.putInto("blacklist") // populate the table 'blacklist'

  // This is an updatable table, so you can update/insert like a normal
  // RDBMS table, for example as shown below:
  // val newColumnValues: Row = Row("26")
  // snc.update("blacklist", newColumnValues, "accountName")

  import spark.implicits._

  // Read the accounts from the Kafka source
  val topic = "accounts" // placeholder topic name; substitute your own
  val dfAccount = session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest").load
    .selectExpr("CAST(value AS STRING) accountName").as[String]

  // Simply join the streaming DataFrame with the blacklist DataFrame
  val query = dfAccount.join(dfBlackList, "accountName")
    .writeStream
    .outputMode("append")
    .format("snappy") // results are stored in an in-memory SQL table
    .option("checkpointLocation", "/tmp")
    .queryName("snappyResultTable") // you can save it to SnappyData's column table
    .trigger(ProcessingTime("1 seconds"))
    .start

  query.awaitTermination(timeoutMs = 15000)
  session.sql("select * from snappyResultTable").show
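
As a concrete illustration of the mutability claim above (a sketch only; the
values are arbitrary, and it relies on the row table accepting the usual SQL
DML, as noted in the comments above):

  // While the streaming query is running, mutate the blacklist in place;
  // the next micro-batch joins against the updated contents.
  session.sql("insert into blacklist values ('26')")
  session.sql("update blacklist set accountName = '27' where accountName = '26'")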

You can get the product release that works with this code snippet here.

You can reach out to us with questions here.

I generally avoid responding on this forum with product-specific answers, but
in this case it seems to offer a simpler pattern.


Suds

On Wed, May 3, 2017 at 4:30 PM, Tathagata Das  wrote:

> Yes, you will have to recreate the streaming Dataframe along with the
> static Dataframe, and restart the query. There isn't currently a feasible way
> to do this without a query restart. But restarting a query WITHOUT restarting
> the whole application + Spark cluster is reasonably fast. If your
> application can tolerate 10-second latencies, then stopping and restarting
> a query within the same Spark application is a reasonable solution.
>
> On Wed, May 3, 2017 at 4:13 PM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
>> Thanks, TD for answering this question on the Spark mailing list.
>>
>>
>>
>> A follow-up. So, let’s say we are joining a cached dataframe with a
>> streaming dataframe and we recreate the cached dataframe; do we have to
>> recreate the streaming dataframe too?
>>
>>
>>
>> One possible solution that we have is
>>
>>
>>
>> val dfBlackList = spark.read.csv(….) // batch dataframe… assume that this
>> dataframe has a single column named accountName
>> dfBlackList.createOrReplaceTempView("blacklist")
>> val dfAccount = spark.readStream.kafka(…..) // assume for brevity’s sake
>> that we have parsed the kafka payload and have a data frame here with
>> multiple columns, one of them called accountName
>>
>> dfAccount.createOrReplaceTempView("account")
>>
>> val dfBlackListedAccount = spark.sql("select * from account inner join
>> blacklist on account.accountName = blacklist.accountName")
>>
>> dfBlackListedAccount.writeStream(…..).start() // boom, started
>>
>>
>>
>> Now some time later while the query is running 

Re: RE: Fast write datastore...

2017-03-16 Thread Sudhir Menon
I am extremely leery about pushing product on this forum and have refrained
from it in the past. But since you are talking about loading parquet data
into Spark, running some aggregate queries, and then writing the results to a
fast data store, and are specifically asking for product options, it makes
absolute sense to consider SnappyData. SnappyData turns Spark into a fast
read-write store, and you can do what you are trying to do with a single
cluster that hosts both Spark and the database. It is an in-memory store that
supports high concurrency, fast lookups, and the ability to run queries via
ODBC/JDBC/Thrift. The tables stored in the database are accessible as
dataframes, and you can use the Spark API to access the data.
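
As a rough sketch of that flow, reusing only the SnappyData calls already
shown in the snippet earlier in this digest (the table name, path, and
aggregate columns are illustrative):

  import org.apache.spark.sql.{SnappySession, SparkSession}

  val spark = SparkSession.builder.appName("FastStore").master("local[*]").getOrCreate
  val session = new SnappySession(spark.sparkContext)
  val snc = session.snappyContext

  // Load the parquet data and run an aggregate query (columns are illustrative)
  val raw = session.read.parquet("/data/events.parquet")
  val agg = raw.groupBy("accountName").count()

  // Persist the result into a SnappyData table so it can be hit concurrently
  // over ODBC/JDBC/Thrift or read back as a dataframe
  snc.createTable("agg_results", "row", agg.schema, Map.empty[String, String])
  import org.apache.spark.sql.snappy._
  agg.write.putInto("agg_results")

  // The table is visible to subsequent Spark SQL queries
  session.sql("select * from agg_results").show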

Check it out here. Happy to answer any
questions (there are tons of resources on the site, and you can post
questions on the Slack channel)

On Thu, Mar 16, 2017 at 2:43 AM, yohann jardin wrote:

> Hello everyone,
>
> I'm also really interested in the answers as I will be facing the same
> issue soon.
> Muthu, if you evaluate Apache Ignite again, can you share your results? I
> also noticed Alluxio, which stores Spark results in memory; you might want
> to investigate it.
>
> In my case I want to use them for a real-time dashboard (or one refined
> after waiting only a few seconds), and that use case seems similar to your
> filtering/aggregating of previously computed Spark results.
>
> Regards,
> Yohann
>
>
> --
> *From:* Rick Moritz 
> *Sent:* Thursday, March 16, 2017 10:37
> *To:* user
> *Subject:* Re: RE: Fast write datastore...
>
> If you have enough RAM/SSDs available, maybe tiered HDFS storage and
> Parquet might also be an option. Of course, management-wise it has much
> more overhead than using ES, since you need to manually define partitions
> and buckets, which is suboptimal. On the other hand, for querying, you can
> probably get some decent performance by hooking up Impala or Presto or
> LLAP-Hive, if Spark were too slow/cumbersome.
> Depending on your particular access patterns, this may not be very
> practical, but as a general approach it might be one way to get
> intermediate results quicker, and with less of a storage-zoo than some
> alternatives.
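
A minimal sketch of the partition-and-bucket layout Rick describes above
(column names, bucket count, and the path are illustrative; note that
bucketBy requires writing through saveAsTable):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder.appName("TieredParquet").getOrCreate()
  val df = spark.read.parquet("/data/raw")

  // Manually defined partitions and buckets; partition pruning benefits any
  // engine reading this layout, while the bucketing is mainly exploited by
  // Spark itself.
  df.write
    .partitionBy("event_date")
    .bucketBy(32, "account_id")
    .sortBy("account_id")
    .format("parquet")
    .saveAsTable("events_tiered")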
>
> On Thu, Mar 16, 2017 at 7:57 AM, Shiva Ramagopal 
> wrote:
>
>> I do think Kafka is overkill in this case. There are no streaming
>> use-cases that need a queue to do pub-sub.
>>
>> On 16-Mar-2017 11:47 AM, "vvshvv"  wrote:
>>
>>> Hi,
>>>
>>> >> A slightly over-kill solution may be Spark to Kafka to ElasticSearch?
>>>
>>> I do not think so; in this case you will be able to process Parquet
>>> files as usual, but Kafka will allow your Elasticsearch cluster to stay
>>> stable and survive regardless of the number of rows.
>>>
>>> Regards,
>>> Uladzimir
>>>
>>>
>>>
>>> On jasbir.s...@accenture.com, Mar 16, 2017 7:52 AM wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> Will MongoDB not fit this solution?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Vova Shelgunov [mailto:vvs...@gmail.com]
>>> *Sent:* Wednesday, March 15, 2017 11:51 PM
>>> *To:* Muthu Jayakumar 
>>> *Cc:* vincent gromakowski ; Richard
>>> Siebeling ; user ; Shiva
>>> Ramagopal 
>>> *Subject:* Re: Fast write datastore...
>>>
>>>
>>>
>>> Hi Muthu,
>>>
>>>
>>>
>>> I did not catch from your message: what performance do you expect from
>>> subsequent queries?
>>>
>>>
>>>
>>> Regards,
>>>
>>> Uladzimir
>>>
>>>
>>>
>>> On Mar 15, 2017 9:03 PM, "Muthu Jayakumar"  wrote:
>>>
>>> Hello Uladzimir / Shiva,
>>>
>>>
>>>
>>> From the Elasticsearch documentation (I have to see the logical plan of a
>>> query to confirm), the richness of filters (like regex, ...) is pretty good
>>> compared to Cassandra. As for aggregates, I think Spark DataFrames are
>>> quite rich enough to tackle them.
>>>
>>> Let me know your thoughts.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Muthu
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Mar 15, 2017 at 10:55 AM, vvshvv  wrote:
>>>
>>> Hi muthu,
>>>
>>>
>>>
>>> I agree with Shiva, Cassandra also supports SASI indexes, which can
>>> partially replace Elasticsearch functionality.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Uladzimir
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Sent from my Mi phone
>>>
>>> On Shiva Ramagopal , Mar 15, 2017 5:57 PM wrote:
>>>
>>> Probably Cassandra is a good choice if you are mainly looking for a
>>> datastore that supports fast writes. You can ingest the data into a table
>>> and define one or more materialized views on top of it to support your
>>> queries. Since you mention that your queries are going to be simple, you can
>>> define your indexes in the materialized views according to how you want to
>>> query the data.
>>>
>>> Thanks,
>>>
>>> Shiva
>>>
>>>