Sparse vs. Dense vector memory usage

2021-08-02 Thread Gerard Maas
Dear Spark folks,

Is there a guideline somewhere on the density tipping point at which it makes
more sense to use a Spark ML dense vector vs. a sparse vector, with regard to
memory usage on fairly large (image-processing) vectors?
My google-fu didn't turn up anything useful.

Thanks in advance!

Gerard.


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Gerard Maas
Hi Srinivas,

Reading from different brokers is possible but you need to connect to each
Kafka cluster separately.
Trying to mix connections to two different Kafka clusters in one subscriber
is not supported (I'm sure it would give all kinds of weird errors).
The "kafka.bootstrap.servers" option is there to list the potentially many
brokers of the *same* Kafka cluster.

The way to address this is to follow German's suggestion and create a separate
subscription for each Kafka cluster you are talking to.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1,topic2")
  .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3,topicn,topicn+1")
  .load()

After acquiring the DataFrames, you can union them and treat all the data
with a single process.

val unifiedData = df_cluster1.union(df_cluster2)
// apply further transformations on `unifiedData`
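
If the cluster endpoints and topic lists come from configuration, the same idea
generalizes to any number of clusters. A minimal sketch (the endpoints and topic
names below are placeholders, not values from this thread):

// Hypothetical configuration: one entry per Kafka cluster.
val clusters: Seq[(String, String)] = Seq(
  "cluster1_host:cluster1_port" -> "topic1,topic2",
  "cluster2_host:cluster2_port" -> "topic3,topic4"
)

val unifiedData = clusters
  .map { case (servers, topics) =>
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topics)
      .load()
  }
  .reduce(_ union _)   // all Kafka sources share the same schema, so the union is safe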

kr, Gerard.



On Tue, Jun 9, 2020 at 6:30 PM Srinivas V  wrote:

> Thanks for the quick reply. This may work but I have like 5 topics to
> listen to right now, I am trying to keep all topics in an array in a
> properties file and trying to read all at once. This way it is dynamic and
> you have one code block like below and you may add or delete topics from
> the config file without changing code. If someone confirms that it does not
> work, I would have to do something like you have provided.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> "cluster1_host:cluster1_port,cluster2_host:port")
>
> .option("subscribe", "topic1, topic2,topic3,topic4,topic5")
>
>


Re: Spark Streaming not working

2020-04-14 Thread Gerard Maas
Hi,

Could you share the code that you're using to configure the connection to
the Kafka broker?

This is a bread-and-butter feature. My first thought is that there's
something in your particular setup that prevents this from working.

kind regards, Gerard.

On Fri, Apr 10, 2020 at 7:34 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Re: What is the best way to consume parallely from multiple topics in Spark Stream with Kafka

2020-03-04 Thread Gerard Maas
Hi Hrishi,

When using the Direct Kafka stream approach, processing tasks will be
distributed to the cluster.
The level of parallelism is dependent on how many partitions the consumed
topics have.
Why do you think that the processing is not happening in parallel?

I would advise you to get the base scenario working before looking into
advanced features like `concurrentJobs` or a particular scheduler.
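
For reference, a minimal sketch of the base scenario with the Direct Kafka
stream; the topic names, `ssc`, `kafkaParams` and `process` below are
assumptions. Parallelism follows the total number of topic partitions:

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = Seq("topicA", "topicB", "topicC")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.foreachRDD { rdd =>
  // one RDD partition per Kafka topic-partition, processed in parallel across executors
  rdd.foreachPartition(records => records.foreach(r => process(r.value())))
}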

kind regards, Gerard.

On Wed, Mar 4, 2020 at 7:42 PM Hrishikesh Mishra 
wrote:

> Hi
>
> My spark stream job consumes from multiple Kafka topics. How can I process
> parallely? Should I try for *spark.streaming.concurrentJobs,* but it has
> some adverse effects as mentioned by the creator. Is it still valid with
> Spark 2.4 and Direct Kafka Stream? What about FAIR scheduling mode, will it
> help in this scenario. I am not getting any valid links around this.
>
> Regards
> Hrishi
>
>


Re: [StructuredStreaming] HDFSBackedStateStoreProvider is leaking .crc files.

2019-06-12 Thread Gerard Maas
Ooops - linked the wrong JIRA ticket:  (that other one is related)

https://issues.apache.org/jira/browse/SPARK-28025

On Wed, Jun 12, 2019 at 1:21 PM Gerard Maas  wrote:

> Hi!
> I would like to socialize this issue we are currently facing:
> The Structured Streaming default CheckpointFileManager leaks .crc files by
> leaving them behind after users of this class (like
> HDFSBackedStateStoreProvider) apply their cleanup methods.
>
> This results in an unbounded creation of tiny files that eat away storage
> by the block and, in our case, deteriorates the file system performance.
>
> We correlated the processedRowsPerSecond reported by the
> StreamingQueryProgress against a count of the .crc files in the storage
> directory (checkpoint + state store). The performance impact we observe is
> dramatic.
>
> We are running on Kubernetes, using GlusterFS as the shared storage
> provider.
> [image: out processedRowsPerSecond vs. files in storage_process.png]
> I have created a JIRA ticket with additional detail:
>
> https://issues.apache.org/jira/browse/SPARK-17475
>
> This is also related to an earlier discussion about the state store
> unbounded disk-size growth, which was left unresolved back then:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-State-Store-storage-behavior-for-the-Stream-Deduplication-function-td34883.html
>
> If there's any additional detail I should add/research, please let me know.
>
> kind regards, Gerard.
>
>
>


[StructuredStreaming] HDFSBackedStateStoreProvider is leaking .crc files.

2019-06-12 Thread Gerard Maas
Hi!
I would like to socialize this issue we are currently facing:
The Structured Streaming default CheckpointFileManager leaks .crc files by
leaving them behind after users of this class (like
HDFSBackedStateStoreProvider) apply their cleanup methods.

This results in an unbounded creation of tiny files that eat away storage
by the block and, in our case, deteriorates the file system performance.

We correlated the processedRowsPerSecond reported by the
StreamingQueryProgress against a count of the .crc files in the storage
directory (checkpoint + state store). The performance impact we observe is
dramatic.

We are running on Kubernetes, using GlusterFS as the shared storage
provider.
[image: out processedRowsPerSecond vs. files in storage_process.png]
I have created a JIRA ticket with additional detail:

https://issues.apache.org/jira/browse/SPARK-17475

This is also related to an earlier discussion about the state store
unbounded disk-size growth, which was left unresolved back then:
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-State-Store-storage-behavior-for-the-Stream-Deduplication-function-td34883.html

If there's any additional detail I should add/research, please let me know.

kind regards, Gerard.


Re: The following Java MR code works for small dataset but throws(arrayindexoutofBound) error for large dataset

2019-05-09 Thread Gerard Maas
Hi,

I'm afraid you sent this email to the wrong Mailing list.
This is the Spark users mailing list. We could probably tell you how to do
this with Spark, but I think that's not your intention :)

kr, Gerard.


On Thu, May 9, 2019 at 11:03 AM Balakumar iyer S 
wrote:

> Hi All,
>
> I am trying to read a orc file and  perform groupBy operation on it , but
> When i run it on a large data set we are facing the following error
> message.
>
> Input format of INPUT DATA
>
> |178111256|  107125374|
> |178111256|  107148618|
> |178111256|  107175361|
> |178111256|  107189910|
>
> and we are try to group by the first column.
>
> But as per the logic and syntax the code is appropriate but it is  working
> well on small data set. I have attached the code in the text file.
>
> Thank you for your time.
>
> ERROR MESSAGE:
> Error: java.lang.ArrayIndexOutOfBoundsException at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1453)
> at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1349)
> at java.io.DataOutputStream.writeByte(DataOutputStream.java:153) at
> org.apache.hadoop.io.WritableUtils.writeVLong(WritableUtils.java:273) at
> org.apache.hadoop.io.WritableUtils.writeVInt(WritableUtils.java:253) at
> org.apache.hadoop.io.Text.write(Text.java:330) at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
> at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
> at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1149)
> at
> org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:610)
> at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:73) at
> orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:39) at
> org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) at
> org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453) at
> org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) at
> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170) at
> java.security.AccessController.doPrivileged(Native Method) at
> javax.security.auth.Subject.doAs(Subject.java:422) at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
>
>
>
> --
> REGARDS
> BALAKUMAR SEETHARAMAN
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-03 Thread Gerard Maas
James,

How do you create an instance of `RDD[Iterable[MyCaseClass]]` ?
Is it in that first code snippet? > new SparkContext(sc).parallelize(seq)?

kr, Gerard




On Fri, Nov 30, 2018 at 3:02 PM James Starks 
wrote:

> When processing data, I create an instance of RDD[Iterable[MyCaseClass]]
> and I want to convert it to RDD[MyCaseClass] so that it can be further
> converted to dataset or dataframe with toDS() function. But I encounter a
> problem that SparkContext can not be instantiated within SparkSession.map
> function because it already exists, even with allowMultipleContexts set to
> true.
>
> val sc = new SparkConf()
> sc.set("spark.driver.allowMultipleContexts", "true")
> new SparkContext(sc).parallelize(seq)
>
> How can I fix this?
>
> Thanks.
>


Re: Re: About the question of Spark Structured Streaming window output

2018-08-27 Thread Gerard Maas
Hi,

> 1、why the start time is "2018-08-27 09:50:00" not "2018-08-27 09:53:00"?
> When I define the window, the starttime is not set.
When no 'startTime' is defined, windows are aligned to the start of the
next-larger time unit. So if your window is defined in minutes, it is aligned
to the start of the hour, and the first window is the one that contains the
first event.
For a window of 5:00 minutes, the windows are 9:00-9:05, 9:05-9:10,
9:10-9:15, ... 9:45-9:50, 9:50-9:55, ...
The first data point sets the internal 'event time clock', and the window
corresponding to 9:53 is 9:50-9:55.

Also, note that 'startTime' is a very misleading name for that window
parameter. It is actually a start-time *offset*. If you specify 'startTime'
as `1 minute`, the windows become 9:01-9:06, 9:06-9:11, ...
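
A small sketch of how that offset is passed, using the 4-argument form of
window() (assuming a dataframe `df` with 'timestamp' and 'value' columns):

import org.apache.spark.sql.functions.{window, sum}

// 5-minute windows shifted by a 1-minute offset: 9:01-9:06, 9:06-9:11, ...
val shifted = df
  .groupBy(window($"timestamp", "5 minutes", "5 minutes", "1 minute"))
  .agg(sum($"value") as "sumvalue")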

> 2、why the agg result of time "2018-08-27 09:53:00" is not output when the
> batch1 data is coming?
Yes, it is; the resulting value is clearly there:
2018-08-27 09:50:00|2018-08-27 09:55:00|   2|
The two data points that fall in this window are summed and the result is 2.

What were you expecting?

kr, Gerard.


On Mon, Aug 27, 2018 at 5:37 AM z...@zjdex.com  wrote:

> Hi Jungtaek Lim & Gerard Mass:
> Thanks very much.
> When I put three batch data like following :
>
> batch 0:
> 2018-08-27 09:53:00,1
> 2018-08-27 09:53:01,1
>
> batch 1:
> 2018-08-27 11:04:00,1
> 2018-08-27 11:04:01,1
>
> batch 2:
> 2018-08-27 11:17:00,1
> 2018-08-27 11:17:01,1
>
> the agg result of time "2018-08-27 09:53:00" is output like following:
> Batch: 2
> -------------------------------------------
> +-------------------+-------------------+--------+
> |              start|                end|sumvalue|
> +-------------------+-------------------+--------+
> |2018-08-27 09:50:00|2018-08-27 09:55:00|       2|
> +-------------------+-------------------+--------+
>
> For the result, I wonder to know:
> 1、why the start time is "2018-08-27 09:50:00" not "2018-08-27 09:53:00"?
> When I define the window, the starttime is not set.
> 2、why the agg result of time "2018-08-27 09:53:00 " is not output when
> the batch1 data is comming?
>
> Thanks a lot!
>
>
>
> --
> z...@zjdex.com
>
>
> *From:* Jungtaek Lim 
> *Date:* 2018-08-27 11:01
> *To:* z...@zjdex.com
> *CC:* Gerard Maas ; user ;
> 王程浩 
> *Subject:* Re: Re: About the question of Spark Structured Streaming
> window output
> You may want to add streaming listener to your query and see when/how
> watermark is updated. In short, watermark is calculated from previous batch
> and calculated value is applied to current batch. So you may think that the
> result is provided later than expected, maybe a batch.
>
> On Mon, Aug 27, 2018 at 11:56 AM, z...@zjdex.com wrote:
>
>> Hi Gerard Mass:
>> Thanks a lot for your reply.
>> When i use "append" model,  I send the following data:
>> 2018-08-27 09:53:00,1
>> 2018-08-27 09:53:01,1
>> The result (which has only schema, like the following) has received after
>> the batch is end. But when the time of window + watermark is up, there
>> is no result to output. Is there something I misss? Thanks in advance.
>>
>>
>>
>> --
>> z...@zjdex.com
>>
>>
>> *From:* Gerard Maas 
>> *Date:* 2018-08-27 05:00
>> *To:* zrc 
>> *CC:* spark users ; wch 
>> *Subject:* Re: About the question of Spark Structured Streaming window
>> output
>>
>> Hi,
>>
>> When you use a window in Append mode, you need to wait for the end of the
>> window + watermark to see the final record from the "append" mode.
>> This is your query over time. Note the timestamp at the right side of the
>> cell and the data present in it.
>>
>> val windowedCounts = dataSrc
>>   .withWatermark("timestamp", "1 minutes")
>>   .groupBy(window($"timestamp", "5 minutes"))
>>   .agg(sum("value") as "sumvalue")
>>   .select("window.start", "window.end","sumvalue")
>>
>>
>> [image: image.png]
>>
>> Going back to your questions:
>> 1、when I set the append output model,  I send inputdata, but there is no
>> result to output. How to use append model in window aggreate case ?
>> Wait for the window + watermark to expire and you'll see the append
>> record output
>>
>> 2、when I set the update output model, I send inputdata, the result is
>> output every batch .But I want output the result only once 

Re: How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

2018-08-14 Thread Gerard Maas
Hi Aakash,

In Spark Streaming, foreachRDD gives you access to the data in each
micro-batch.
You can transform that RDD into a DataFrame and implement the flow you
describe.

eg.:

var historyRDD: RDD[mytype] = sparkContext.emptyRDD

// create Kafka DStream ...

dstream.foreachRDD { rdd =>
  val allData = historyRDD union rdd
  val df = allData.toDF()  // requires the RDD to be of some structured type, i.e. a case class
  // do something with the dataframe df
  historyRDD = allData  // this needs checkpointing
}
Depending on the volume of data you're dealing with, it might not be
possible to hold all data in memory.
Checkpoint of the historyRDD is mandatory to break up the growing lineage
(union will keep a reference to the previous RDDs and at some point, things
will blow up)
So, while this trick might keep data within the Spark boundaries, you still
need resilient storage to write the checkpoints in order to implement a
reliable streaming job.

As you are using Kafka, another alternative would be to write the transformed
data to Kafka and have the training job consume that topic, replaying the data
from the start.
Confluent has some good resources on how to use Kafka as a storage layer.
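
A rough sketch of that alternative with the plain Kafka producer API (the
broker address and topic name below are assumptions):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // one producer per partition; the topic can later be replayed by the training job
    val producer = new KafkaProducer[String, String](props)
    records.foreach(r => producer.send(new ProducerRecord[String, String]("training-data", r.toString)))
    producer.close()
  }
}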

I  hope this helps.

kr, Gerard.

PS: I'm also not sure why you are initially writing the files to Kafka. It
would be easier to read the files directly from Spark Streaming or
Structured Streaming.





On Tue, Aug 14, 2018 at 9:31 AM Aakash Basu 
wrote:

> Hi all,
>
> The requirement is, to process file using Spark Streaming fed from Kafka
> Topic and once all the transformations are done, make it a batch of static
> dataframe and pass it into a Spark ML Model tuning.
>
> As of now, I had been doing it in the below fashion -
>
> 1) Read the file using Kafka
> 2) Consume it in Spark using a streaming dataframe
> 3) Run spark transformation jobs on streaming data
> 4) Append and write on HDFS.
> 5) Read the transformed file as batch in Spark
> 6) Run Spark ML Model
>
> But, the requirement is to avoid use of HDFS as it may not be installed in
> certain clusters, so, we've to avoid the disk I/O and do it on the fly from
> Kafka to append in a spark static DF and hence pass that DF to the ML Model.
>
> How to go about it?
>
> Thanks,
> Aakash.
>


Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

2018-07-19 Thread Gerard Maas
Hi Chris,

Could you show the code you are using? When you mention "I like to use a
static datasource (JDBC) in the state function" do you refer to a DataFrame
from a JDBC source or an independent JDBC connection?

The key point to consider is that the flatMapGroupsWithState function must
be serializable. Its execution happens in the workers of a Spark job.

If you are using a JDBC connection, you need to make sure the connection is
made in the context of the function. JDBC connections are not serializable.
Likewise, Dataset/DataFrames only function in the driver where they are
defined. They are bound to the Spark Session in the driver and it does not
make sense to access them in a remote executor.
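
For reference, a minimal sketch of the pattern that does work: read the JDBC
data as a driver-defined DataFrame, join it with the stream before the stateful
step, and keep only serializable values inside the state function. The names
below (`kafkaStream`, `EnrichedEvent`, `MyState`, `stateFn`, the JDBC URL) are
assumptions, not code from this thread:

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import spark.implicits._

// Static reference data: defined on the driver, read lazily by Spark itself.
val staticDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")
  .option("dbtable", "reference_table")
  .load()

// Enrich the stream with the static data before the stateful step.
val enriched = kafkaStream
  .join(staticDf, Seq("key"), "left_outer")
  .as[EnrichedEvent]

// stateFn: (String, Iterator[EnrichedEvent], GroupState[MyState]) => Iterator[MyOutput]
val withState = enriched
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout())(stateFn)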

Make sure you check the executor logs as well. There might be a
NullPointerException lurking somewhere in your logs.

met vriendelijke groeten, Gerard.

PS: spark-dev (d...@spark.apache.org) is for discussions about open source
development of the Spark project.
For general questions like this, use the users mailing list
(user@spark.apache.org) (note that I changed that address in the To:).

On Thu, Jul 19, 2018 at 12:51 PM Christiaan Ras <
christiaan@semmelwise.nl> wrote:

> I use the state function flatmapgroupswithstate to track state of a kafka
> stream. To further customize the state function I like to use a static
> datasource (JDBC) in the state function. This datasource contains data I
> like to join with the stream (as Iterator) within flatmapgroupswithstate.
>
>
>
> When I try to access the JDBC source within flatmapgroupswithstate Spark
> execution freezes without any Exceptions or logging.
>
> To verify the JDBC connection works, I also tried to access the source
> outside the state function and that works. So now I join the static source
> with streaming source before feeding it to flatmapgroupswithstate. It seems
> to work so far…
>
>
>
> Any ideas why accessing the JDBC source within flatmapgroupswithstate
> could fail (freezes Spark execution)? Is it wise to use external
> datasources within flatmapgroupswithstate?
>
>
>
> Thanks,
>
> Chris
>
>
>
>
>


Re: How to reduceByKeyAndWindow in Structured Streaming?

2018-06-28 Thread Gerard Maas
Hi,

In Structured Streaming lingo, "ReduceByKeyAndWindow" would be a window
aggregation with a composite key.
Something like:
stream.groupBy($"key", window($"timestamp", "5 minutes"))
   .agg(sum($"value") as "total")

The aggregate could be any supported SQL function.
Is this what you are looking for? Otherwise, share your specific use case
to see how it could be implemented in Structured Streaming.
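
For completeness, a slightly fuller sketch of the same idea, under the
assumption of a schema with key, value and timestamp columns, with a watermark
and a console sink added:

import org.apache.spark.sql.functions.{window, sum}

val totals = stream
  .withWatermark("timestamp", "10 minutes")
  .groupBy($"key", window($"timestamp", "5 minutes"))
  .agg(sum($"value") as "total")

val query = totals.writeStream
  .outputMode("update")
  .format("console")
  .start()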

kr, Gerard.

On Thu, Jun 28, 2018 at 10:21 AM oripwk  wrote:

> In Structured Streaming, there's the notion of event-time windowing:
>
>
>
> However, this is not quite similar to DStream's windowing operations: in
> Structured Streaming, windowing groups the data by fixed time-windows, and
> every event in a time window is associated to its group:
>
>
> And in DStreams it just outputs all the data according to a limited window
> in time (last 10 minutes for example).
>
> The question was asked also  here
> <
> https://stackoverflow.com/questions/49821646/is-there-someway-to-do-the-eqivalent-of-reducebykeyandwindow-in-spark-structured>
>
> , if it makes it clearer.
>
> How the latter can be achieved in Structured Streaming?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Spark Streaming] Measure latency

2018-06-26 Thread Gerard Maas
Hi Daniele,

A pragmatic approach to do that would be to execute the computations in the
scope of a foreachRDD, surrounded by wall-clock timers.
For example:
dstream.foreachRDD { rdd =>
  val t0 = System.currentTimeMillis()
  val aggregates = rdd.map(...).reduce(...)
  // make sure you get a result here, not another RDD.
  // Otherwise you need to do something like rdd.count to materialize it
  val elapsedTime = System.currentTimeMillis() - t0
  println(s"operation took $elapsedTime ms")
}

In the end, this will result in the same performance as the batch spark
engine, so you might want to check the performance there first.
If you want to add the stream ingestion time to this, it gets a bit more
tricky.

kind regards, Gerard.



On Tue, Jun 26, 2018 at 11:49 AM Daniele Foroni 
wrote:

> Hi all,
>
> I am using spark streaming and I need to evaluate the latency of the
> standard aggregations (avg, min, max, …) provided by the spark APIs.
> Any way to do it in the code?
>
> Thanks in advance,
> ---
> Daniele
>
>


Re: Advice on multiple streaming job

2018-05-07 Thread Gerard Maas
Dhaval,

Which Streaming API are you using?
In Structured Streaming, you are able to start several streaming queries
within the same context.
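
A sketch of what that looks like with two queries sharing one SparkSession and
hence one driver UI port (the query names, sinks and paths below are
placeholders):

val q1 = streamA.writeStream
  .queryName("ingest-a")
  .format("parquet")
  .option("path", "/data/out-a")
  .option("checkpointLocation", "/checkpoints/a")
  .start()

val q2 = streamB.writeStream
  .queryName("ingest-b")
  .format("parquet")
  .option("path", "/data/out-b")
  .option("checkpointLocation", "/checkpoints/b")
  .start()

// block the driver while either query is running
spark.streams.awaitAnyTermination()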

kind regards, Gerard.

On Sun, May 6, 2018 at 7:59 PM, Dhaval Modi  wrote:

> Hi Susan,
>
> Thanks for your response.
>
> Will try configuration as suggested.
>
> But still i am looking for answer does Spark support running multiple jobs
> on the same port?
>
> On Sun, May 6, 2018, 20:27 Susan X. Huynh  wrote:
>
>> Hi Dhaval,
>>
>> Not sure if you have considered this: the port 4040 sounds like a driver
>> UI port. By default it will try up to 4056, but you can increase that
>> number with "spark.port.maxRetries". (https://spark.apache.org/
>> docs/latest/configuration.html) Try setting it to "32". This would help
>> if the only conflict is among the driver UI ports (like if you have > 16
>> drivers running on the same host).
>>
>> Susan
>>
>> On Sun, May 6, 2018 at 12:32 AM, vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> Use a scheduler that abstracts the network away with a CNI, for instance,
>>> or other mechanisms (mesos, kubernetes, yarn). The CNI will allow you to always
>>> bind to the same ports because each container will have its own IP. Some
>>> other solutions like mesos and marathon can work without CNI, with host IP
>>> binding, but will manage the ports for you, ensuring there isn't any
>>> conflict.
>>>
>>> On Sat, May 5, 2018 at 17:10, Dhaval Modi  wrote:
>>>
 Hi All,

 Need advice on executing multiple streaming jobs.

 Problem:- We have 100's of streaming job. Every streaming job uses new
 port. Also, Spark automatically checks port from 4040 to 4056, post that it
 fails. One of the workaround, is to provide port explicitly.

 Is there a way to tackle this situation? or Am I missing any thing?

 Thanking you in advance.

 Regards,
 Dhaval Modi
 dhavalmod...@gmail.com

>>>
>>
>>
>> --
>> Susan X. Huynh
>> Software engineer, Data Agility
>> xhu...@mesosphere.com
>>
>


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Gerard Maas
Aakash,

There are two issues here.
The issue with the code on the first question is that the first query
blocks and the code for the second does not get executed. Panagiotis
pointed this out correctly.
In the updated code, the issue is related to netcat (nc) and the way
structured streaming works. As far as I remember, netcat only delivers data
to the first network connection.
On the structured streaming side, each query will issue its own connection.
This results in only the first query getting the data.
If you talked to a TCP server supporting multiple connected clients,
you would see data in both queries.

If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination`  should solve the problem.

-kr, Gerard.



On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu 
wrote:

> Hey Jayesh and Others,
>
> Is there then, any other way to come to a solution for this use-case?
>
> Thanks,
> Aakash.
>
> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
>> Note that what you are trying to do here is join a streaming data frame
>> with an aggregated streaming data frame. As per the documentation, joining
>> an aggregated streaming data frame with another streaming data frame is not
>> supported
>>
>>
>>
>>
>>
>> *From: *spark receiver 
>> *Date: *Friday, April 13, 2018 at 11:49 PM
>> *To: *Aakash Basu 
>> *Cc: *Panagiotis Garefalakis , user <
>> user@spark.apache.org>
>> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>>
>>
>>
>> Hi Panagiotis ,
>>
>>
>>
>> Wondering you solved the problem or not? Coz I met the same issue today.
>> I’d appreciate  so much if you could paste the code snippet  if it’s
>> working .
>>
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
>> On Apr 6, 2018, at 7:40 AM, Aakash Basu  wrote:
>>
>>
>>
>> Hi Panagiotis,
>>
>> I did that, but it still prints the result of the first query and awaits
>> for new data, doesn't even goes to the next one.
>>
>> *Data -*
>>
>> $ nc -lk 9998
>>
>> 1,2
>> 3,4
>> 5,6
>> 7,8
>>
>> *Result -*
>>
>> -------------------------------------------
>> Batch: 0
>> -------------------------------------------
>> +----+
>> |aver|
>> +----+
>> | 3.0|
>> +----+
>>
>> -------------------------------------------
>> Batch: 1
>> -------------------------------------------
>> +----+
>> |aver|
>> +----+
>> | 4.0|
>> +----+
>>
>>
>>
>> *Updated Code -*
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split
>>
>> spark = SparkSession \
>> .builder \
>> .appName("StructuredNetworkWordCount") \
>> .getOrCreate()
>>
>> data = spark \
>> .readStream \
>> .format("socket") \
>> .option("header","true") \
>> .option("host", "localhost") \
>> .option("port", 9998) \
>> .load("csv")
>>
>>
>> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
>> split(data.value, ",").getItem(1).alias("col2"))
>>
>> id_DF.createOrReplaceTempView("ds")
>>
>> df = spark.sql("select avg(col1) as aver from ds")
>>
>> df.createOrReplaceTempView("abcd")
>>
>> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
>> from ds")  # (select aver from abcd)
>>
>> query2 = df \
>> .writeStream \
>> .format("console") \
>> .outputMode("complete") \
>> .trigger(processingTime='5 seconds') \
>> .start()
>>
>> query = wordCounts \
>> .writeStream \
>> .format("console") \
>> .trigger(processingTime='5 seconds') \
>> .start()
>>
>> spark.streams.awaitAnyTermination()
>>
>>
>>
>> Thanks,
>>
>> Aakash.
>>
>>
>>
>> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <
>> panga...@gmail.com> wrote:
>>
>> Hello Aakash,
>>
>>
>>
>> When you use query.awaitTermination you are pretty much blocking there
>> waiting for the current query to stop or throw an exception. In your case
>> the second query will not even start.
>>
>> What you could do instead is remove all the blocking calls and use
>> spark.streams.awaitAnyTermination instead (waiting for either query1 or
>> query2 to terminate). Make sure you do that after the query2.start call.
>>
>>
>>
>> I hope this helps.
>>
>>
>>
>> Cheers,
>>
>> Panagiotis
>>
>>
>>
>> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu 
>> wrote:
>>
>> Any help?
>>
>> Need urgent help. Someone please clarify the doubt?
>>
>>
>>
>> -- Forwarded message --
>> From: *Aakash Basu* 
>> Date: Thu, Apr 5, 2018 at 3:18 PM
>> Subject: [Structured Streaming] More than 1 streaming in a code
>> To: user 
>>
>> Hi,
>>
>> If I have more than one writeStream in a code, which operates on the same
>> readStream data, why does it produce only the first writeStream? I want the
>> second one to be also printed on the console.
>>
>> How to do that?
>>
>>
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split, col
>>
>> class test:
>>
>>
>> spark = SparkSession.builder \
>> .appName("Stream_Col_Oper_Spark") \
>> .getOrCreate()
>>
>> data = 

[Structured Streaming] File source, Parquet format: use of the mergeSchema option.

2018-04-12 Thread Gerard Maas
Hi,

I'm looking into the Parquet format support for the File source in
Structured Streaming.
The docs mention the use of the option 'mergeSchema' to merge the schemas
of the part files found.[1]

What would be the practical use of that in a streaming context?

In its batch counterpart, `mergeSchema` would infer the schema superset of
the part-files found.


When using the File source + parquet format in streaming mode, we must
provide a schema to the readStream.schema(...) builder and that schema is
fixed for the duration of the stream.
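
For reference, a sketch of that streaming setup (the schema and path below are
assumptions):

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", LongType)
  .add("payload", StringType)

val parquetStream = spark.readStream
  .schema(schema)                   // fixed for the lifetime of the stream
  .option("mergeSchema", "true")    // the option under discussion
  .parquet("/data/incoming-parquet")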

My current understanding is that:

- Files containing a subset of the fields declared in the schema will
render null values for the non-existing fields.
- For files containing a superset of the fields, the additional data fields
will be lost.
- Files not matching the schema set on the streaming source, will render
all fields null for each record in the file.

Is the 'mergeSchema' option playing another role? From the user's
perspective, it may look as if this option helps a job cope with schema
evolution at runtime, but that does not seem to be the case.

What is the use of this option?

-kr, Gerard.


[1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376


Re: Scala - Spark for beginners

2018-03-18 Thread Gerard Maas
This is a good start:
https://github.com/deanwampler/JustEnoughScalaForSpark

And the corresponding talk:
https://www.youtube.com/watch?v=LBoSgiLV_NQ

There are many more resources if you search for them.

-kr, Gerard.

On Sun, Mar 18, 2018 at 11:15 AM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> Hi,
>
> Can any one share with me nice tutorials on Spark with Scala like
> videos, blogs for beginners. Mostly focusing on writing scala code.
>
> Thanks in advance.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark StreamingContext Question

2018-03-07 Thread Gerard Maas
Hi,

You can run as many jobs in your cluster as you want, provided you have
enough capacity.
The one-streaming-context constraint applies per job.

You can submit several jobs for Flume and others for Twitter, Kafka,
etc.

If you are getting started with Streaming with Spark, I'd recommend you to
look into Structured Streaming first.
In Structured Streaming, you can have many streaming queries running under
the same spark session.
Yet, that does not mean you need to put them all in the same job. You can
(and should) still submit different jobs for different application concerns.

kind regards, Gerard.



On Wed, Mar 7, 2018 at 4:56 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Hi all,
>
> Understand from documentation that, only one streaming context can be
> active in a JVM at the same time.
>
> Hence in an enterprise cluster, how can we manage/handle multiple users
> are having many different streaming applications, one may be ingesting data
> from Flume, another from Twitter etc? Is this not available now?
>
> Best,
> Passion
>


Re: can HDFS be a streaming source like Kafka in Spark 2.2.0?

2018-01-15 Thread Gerard Maas
Hi,

You can monitor a filesystem directory as a streaming source as long as the
files placed there are atomically copied/moved into the directory.
Updating files in place is not supported.
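
A minimal sketch of such a pipeline (the schema, paths, broker and topic are
assumptions); files must appear atomically, e.g. be written elsewhere and then
moved into the monitored directory:

// stream2: pick up files that stream1 writes (atomically) to HDFS and publish them to Kafka
val stream2 = spark.readStream
  .schema(recordSchema)                          // file sources require an explicit schema
  .json("hdfs:///output-of-stream1")
  .selectExpr("to_json(struct(*)) AS value")     // the Kafka sink expects a 'value' column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "topic-out")
  .option("checkpointLocation", "/checkpoints/stream2")
  .start()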

kr, Gerard.

On Mon, Jan 15, 2018 at 11:41 PM, kant kodali  wrote:

> Hi All,
>
> I am wondering if HDFS can be a streaming source like Kafka in Spark
> 2.2.0? For example can I have stream1 reading from Kafka and writing to
> HDFS and stream2 to read from HDFS and write it back to Kakfa ? such that
> stream2 will be pulling the latest updates written by stream1.
>
> Thanks!
>


Re: Spark Streaming with Confluent

2017-12-13 Thread Gerard Maas
Hi Arkadiusz,

Try 'rooting' your import. It looks like the import is being interpreted as
relative to the previous one: after `import org.apache.spark._`, the name `io`
resolves to the package `org.apache.spark.io`, so `io.confluent...` cannot be
found (see the compiler error below). 'Rooting' is done by adding the
'_root_' prefix to your import:

import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.io.confluent.kafka.serializers.KafkaAvroDecoder

kr, Gerard.

On Wed, Dec 13, 2017 at 6:05 PM, Arkadiusz Bicz 
wrote:

> Hi,
>
> I try to test spark streaming 2.2.0 version with confluent 3.3.0
>
> I have got lot of error during compilation this is my sbt:
>
> lazy val sparkstreaming = (project in file("."))
>   .settings(
>   name := "sparkstreaming",
>   organization := "org.arek",
>   version := "0.1-SNAPSHOT",
>   scalaVersion := "2.11.8",
> libraryDependencies ++=  Seq(
>   "org.apache.spark" %% "spark-streaming" % "2.2.0",
>   "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
>   "io.confluent" % "kafka-avro-serializer" % "3.3.0"
> )
>   )
>
>
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> import io.confluent.kafka.serializers.KafkaAvroDecoder
>
> object Transformation extends Serializable {
>
>   def main(args: Array[String]) = {
> val conf = new SparkConf().setAppName("StreamingTranformation").
> setMaster("local[*]")
> val streamingContext = new StreamingContext(conf, Seconds(1))
>
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> "local:2181",
>   "schema.registry.url" -> "http://local:8081";,
>   "auto.offset.reset" -> "smallest")
>
> val topicSet = Set("GEXPPROD_ROUTE")
> val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
>
> val lines = messages.foreachRDD(rdd => {
>   rdd.foreach({ avroRecord =>
> println(avroRecord)
>   })
> })
>   }
>
>
> [warn] Found version conflict(s) in library dependencies; some are
> suspected to be binary incompatible:
> [warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
> 3.7.0.Final}
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.8
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.6
>  (depends on 3.6.2.Final)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 3.6.2.Final)
> [warn]  * commons-net:commons-net:2.2 is selected over 3.1
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.1)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 3.1)
> [warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
> [warn]  +- org.apache.hadoop:hadoop-yarn-client:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-api:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-common:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.curator:curator-framework:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-client:2.6.0
> (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-recipes:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.htrace:htrace-core:3.0.4   (depends
> on 12.0.1)
> [warn] Run 'evicted' to see detailed eviction warnings
> [info] Compiling 1 Scala source to /home/adminuser/data-
> streaming-platform/sparkstreaming/target/scala-2.11/classes ...
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11:
> object confluent is not a member of package org.apache.spark.io
> [error] import io.confluent.kafka.serializers.KafkaAvroDecoder
> [error]   ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]  ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]
>
>
> When changing to library  "org.apache.spark" %%
> "spark-streaming-kafka-0-10" % "2.2.0" :
>
>
> [warn] Found version conflict(s) i

Re: Union of RDDs Hung

2017-12-12 Thread Gerard Maas
Can you show us the code?

On Tue, Dec 12, 2017 at 9:02 AM, Vikash Pareek 
wrote:

> Hi All,
>
> I am unioning 2 rdds(each of them having 2 records) but this union it is
> getting hang.
> I found a solution to this that is caching both the rdds before performing
> union but I could not figure out the root cause of hanging the job.
>
> Is somebody knows why this happens with union?
>
> Spark version I am using is 1.6.1
>
>
> Best Regards,
> Vikash Pareek
>
>
>
> -
>
> __Vikash Pareek
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-06 Thread Gerard Maas
Hi Kant,

>  but would your answer on .collect() change depending on running the
spark app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali  wrote:

> @Richard I don't see any error in the executor log but let me run again to
> make sure.
>
> @Gerard Thanks much!  but would your answer on .collect() change depending
> on running the spark app in client vs cluster mode?
>
> Thanks!
>
> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas  wrote:
>
>> The general answer to your initial question is that "it depends". If the
>> operation in the rdd.foreach() closure can be parallelized, then you don't
>> need to collect first. If it needs some local context (e.g. a socket
>> connection), then you need to do rdd.collect first to bring the data
>> locally, which has a perf penalty and also is restricted to the memory size
>> to the driver process.
>>
>> Given the further clarification:
>> >Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>
>> If it's writing to Kafka, that operation can be done in a distributed
>> form.
>>
>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>
>> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
>> migrate to structured streaming by already adopting the 'structured' APIs
>> within Spark Streaming:
>>
>> case class KV(key: String, value: String)
>>
>> dstream.map(...).reduce(...).foreachRDD { rdd =>
>>   import spark.implicits._
>>   // needs to be in a (key, value) shape
>>   val kv = rdd.map{e => KV(extractKey(e), extractValue(e))}
>>   val dataFrame = kv.toDF()
>>   dataFrame.write
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>     .option("topic", "topic1")
>>     .save()
>> }
>>
>> -kr, Gerard.
>>
>>
>>
>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:
>>
>>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>>
>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>>> richard.q...@capitalone.com> wrote:
>>>
>>>> Where do you check the output result for both case?
>>>>
>>>> Sent from my iPhone
>>>>
>>>>
>>>> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
>>>> >
>>>> > Hi All,
>>>> >
>>>> > I have a simple stateless transformation using Dstreams (stuck with
>>>> the old API for one of the Application). The pseudo code is rough like this
>>>> >
>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>> >  rdd.collect(),forEach(); // Is this necessary ? Does execute
>>>> fine but a bit slow
>>>> > })
>>>> >
>>>> > I understand collect collects the results back to the driver but is
>>>> that necessary? can I just do something like below? I believe I tried both
>>>> and somehow the below code didn't output any results (It can be issues with
>>>> my env. I am not entirely sure) but I just would like some clarification on
>>>> .collect() since it seems to slow things down for me.
>>>> >
>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>> >  rdd.forEach(() -> {} ); //
>>>> > })
>>>> >
>>>> > Thanks!
>>>> >
>>>> >
>>>> 
>>>>
>>>> The information contained in this e-mail is confidential and/or
>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>> solely in performance of work or services for Capital One. The information
>>>> transmitted herewith is intended only for use by the individual or entity
>>>> to which it is addressed. If the reader of this message is not the intended
>>>> recipient, you are hereby notified that any review, retransmission,
>>>> dissemination, distribution, copying or other use of, or taking of any
>>>> action in reliance upon this information is strictly prohibited. If you
>>>> have received this communication in error, please contact the sender and
>>>> delete the material from your computer.
>>>>
>>>>
>>>
>>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread Gerard Maas
The general answer to your initial question is that "it depends". If the
operation in the rdd.foreach() closure can be parallelized, then you don't
need to collect first. If it needs some local context (e.g. a socket
connection), then you need to do rdd.collect first to bring the data
locally, which has a performance penalty and is also restricted by the memory
available to the driver process.

Given the further clarification:
>Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

If it's writing to Kafka, that operation can be done in a distributed form.

You could use this lib: https://github.com/BenFradet/spark-kafka-writer

Or, if you can upgrade to Spark 2.2 version, you can pave your way to
migrate to structured streaming by already adopting the 'structured' APIs
within Spark Streaming:

case class KV(key: String, value: String)

dstream.map(...).reduce(...).foreachRDD { rdd =>
  import spark.implicits._
  // needs to be in a (key, value) shape
  val kv = rdd.map{e => KV(extractKey(e), extractValue(e))}
  val dataFrame = kv.toDF()
  dataFrame.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic1")
    .save()
}

-kr, Gerard.



On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:

> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard  > wrote:
>
>> Where do you check the output result for both case?
>>
>> Sent from my iPhone
>>
>>
>> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
>> >
>> > Hi All,
>> >
>> > I have a simple stateless transformation using Dstreams (stuck with the
>> old API for one of the Application). The pseudo code is rough like this
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >  rdd.collect(),forEach(); // Is this necessary ? Does execute fine
>> but a bit slow
>> > })
>> >
>> > I understand collect collects the results back to the driver but is
>> that necessary? can I just do something like below? I believe I tried both
>> and somehow the below code didn't output any results (It can be issues with
>> my env. I am not entirely sure) but I just would like some clarification on
>> .collect() since it seems to slow things down for me.
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >  rdd.forEach(() -> {} ); //
>> > })
>> >
>> > Thanks!
>> >
>> >
>> 
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
>>
>


Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Gerard Maas
Hi Arpan,

The error suggests that the streaming context has been started with
streamingContext.start() and after that statement, some other
dstream operations have been attempted.
A suggested pattern to manage the offsets is the following:

var offsetRanges: Array[OffsetRange] = _

// create streaming context, streams, ...
// as the first operation after the stream has been created, capture the offset ranges:

stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

// Then do other desired operations on the streaming data
val resultStream = stream.map(...).filter(...).transform(...)

// materialize the resulting stream
resultStream.foreachRDD { rdd =>
  // do stuff... write to a db, to a kafka topic,... whatever...

  // at the end of the process, commit the offsets (note that I use the
  // original stream instance, not `resultStream`)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
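
For this pattern to manage the offsets itself, the consumer must not
auto-commit. A sketch of the corresponding kafkaParams (the broker address and
group id below are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-streaming-app",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)   // offsets are committed via commitAsync above
)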

I hope this helps,

kr, Gerard.











On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani  wrote:

> Hi all,
>
> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
> store the offsets in Kafka in order to achieve restartability of the
> streaming application. ( Using checkpoints, I already implemented, we will
> require to change code in production hence checkpoint won't work)
>
> Checking Spark Streaming documentation- Storing offsets on Kafka approach
> :
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html#kafka-itself, which describes :
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> Based on this, I modified the code like following:
>
> val kafkaMap:Map[String,Object] = KakfaConfigs
>
> val stream:InputDStream[ConsumerRecord[String,String]] = 
> KafkaUtil.createDirectStream(ssc, PreferConsistent, Subscribe[String,String] 
> (Array("topicName"),kafkaMap))
>
> stream.foreach { rdd =>
> val offsetRangers : Array[OffsetRanger] = 
> rdd.asInstanceOf[HasOffsetRangers].offsetRanges
>
> // Filter out the values which have empty values and get the tuple of type
> // ( topicname, stringValue_read_from_kafka_topic)
> stream.map(x => ("topicName",x.value)).filter(x=> 
> !x._2.trim.isEmpty).foreachRDD(processRDD _)
>
> // Sometime later, after outputs have completed.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> def processRDD(rdd:RDD[(String,String)]) {
>  // Process futher to hdfs
> }
>
> Now, When I try to start Streaming application, it does not start and
> looking at the logs, here is what we see :
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and 
> output operations after starting a context is not supported
> at 
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
> at org.apache.spark.streaming.dstream.DStream.(DStream.scala:65)
>
>
> Can anyone suggest, or help to understand what are we missing here?
>
>
> Regards,
> Arpan
>


Re: Spark Streaming - Multiple Spark Contexts (SparkSQL) Performance

2017-10-01 Thread Gerard Maas
Hammad,

The recommended way to implement this logic would be to:

Create a SparkSession.
Create a StreamingContext using the SparkContext embedded in the
SparkSession.

Use the single SparkSession instance for the SQL operations within the
foreachRDD.
It's important to note that Spark operations can process the complete
dataset. In this case, there's no need for a per-partition or per-element
operation (that would be the case if we were directly using the driver's
API and DB connections).

Reorganizing the code in the question a bit, we should have:

SparkSession sparkSession = SparkSession
    .builder()
    .master("local[2]")
    .appName("TransformerStreamPOC")
    .config("spark.some.config.option", "some-value")
    .getOrCreate();

JavaStreamingContext jssc = new JavaStreamingContext(
    new JavaSparkContext(sparkSession.sparkContext()), Durations.seconds(60));

// this dataset doesn't seem to depend on the received data, so we can load it once

Dataset<Row> baselineData =
    sparkSession.read().jdbc(MYSQL_CONNECTION_URL, "table_name", connectionProperties);

// create dstream

JavaDStream<SomeType> dstream = ...   // SomeType: placeholder for the element type of the stream

// ... operations on dstream ...

dstream.foreachRDD(rdd -> {

    Dataset<Row> incomingData = sparkSession.createDataFrame(rdd, SomeType.class);

    // ... do something with the incoming dataset, e.g. join with the baseline ...

    Dataset<Row> joined = incomingData.join(baselineData, ...);

    // ... do something with joined ...

});


kr, Gerard.

On Sun, Oct 1, 2017 at 7:55 PM, Hammad  wrote:

> Hello,
>
> *Background:*
>
> I have Spark Streaming context;
>
> SparkConf conf = new 
> SparkConf().setMaster("local[2]").setAppName("TransformerStreamPOC");
> conf.set("spark.driver.allowMultipleContexts", "true");   *<== this*
> JavaStreamingContext jssc = new JavaStreamingContext(conf, 
> Durations.seconds(60));
>
>
> that subscribes to certain kafka *topics*;
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> jssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(*topics*, 
> kafkaParams)
> );
>
> when messages arrive in queue, I recursively process them as follows (below 
> code section will repeat in Question statement)
>
> stream.foreachRDD(rdd -> {
> //process here - below two scenarions code is inserted here
>
> });
>
>
> *Question starts here:*
>
> Since I need to apply SparkSQL to received events in Queue - I create 
> SparkSession with two scenarios;
>
> *1) Per partition one sparkSession (after 
> "spark.driver.allowMultipleContexts" set to true); so all events under this 
> partition are handled by same sparkSession*
>
> rdd.foreachPartition(partition -> {
> SparkSession sparkSession = SparkSession
> .builder()
> .appName("Java Spark SQL basic example")
> .config("spark.some.config.option", "some-value")
> .getOrCreate();
>
> while (partition.hasNext()) {
>   Dataset df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, 
> "table_name", connectionProperties);
>
> }}
>
> *2) Per event under each session; so each event under each queue under each 
> stream has one sparkSession;*
>
> rdd.foreachPartition(partition -> {while (partition.hasNext()) {
> SparkSession sparkSession = SparkSession.builder().appName("Java Spark SQL 
> basic example").config("spark.some.config.option", 
> "some-value").getOrCreate();
>
> Dataset df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, 
> "table_name", connectionProperties);
>
> }}
>
>
> Is it good practice to create multiple contexts (lets say 10 or 100)?
> How does number of sparkContext to be allowed vs number of worker nodes
> relate?
> What are performance considerations with respect to scenario1 and
> scenario2?
>
> I am looking for these answers as I feel there is more to what I
> understand of performance w.r.t sparkContexts created by a streaming
> application.
> Really appreciate your support in anticipation.
>
> Hammad
>
>


Re: [StructuredStreaming] multiple queries of the socket source: only one query works.

2017-08-13 Thread Gerard Maas
Hi Shixiong,

Thanks for the explanation.

In my view, this is different from the intuitive understanding of the
Structured Streaming model [1], where incoming data is appended to an
'unbounded table' and queries are run on that. I had expected that all
queries would run on that 'unbounded table view', while I understand from
your explanation that every query maintains its own 'unbounded table' view
of the data stream. Is that correct?

How is that working in the case of Kafka? We have only one declared
consumer, so we should observe a similar behavior. Yet, the Kafka source is
able to deliver multiple output queries.
What is the difference?
Where could I learn more about the internal structured streaming model?

kind regards, Gerard.



[1]
https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#basic-concepts

On Sun, Aug 13, 2017 at 1:22 AM, Shixiong(Ryan) Zhu  wrote:

> Spark creates one connection for each query. The behavior you observed is
> because how "nc -lk" works. If you use `netstat` to check the tcp
> connections, you will see there are two connections when starting two
> queries. However, "nc" forwards the input to only one connection.
>
> On Fri, Aug 11, 2017 at 10:59 PM, Rick Moritz  wrote:
>
>> Hi Gerard, hi List,
>>
>> I think what this would entail is for Source.commit to change its
>> funcationality. You would need to track all streams' offsets there.
>> Especially in the socket source, you already have a cache (haven't looked
>> at Kafka's implementation to closely yet), so that shouldn't be the issue,
>> if at start-time all streams subscribed to a source are known.
>> What I worry about is, that this may need an API-change, to pass a
>> stream-ID into commit. Since different streams can use different Triggers,
>> you can have any number of unforeseeable results, when multiple threads
>> call commit.
>>
>> I'll look into that, since I am in the progress of building a
>> TwitterSource based on the socket source's general functionality, and due
>> to the API restrictions there, it's even more important for multiple
>> streams using one source.
>>
>> What I did observe was that every query did initialize a separate source.
>> This won't work so well with socket, since the socket is in use, once you
>> try to set up a second one. It also won't work so well with Twitter, since
>> usually an API key is limited in how often it can be used somultaneously
>> (likely at 2).
>>
>> An alternative to the socket source issue would be to open a new free
>> socket, but then the user has to figure out where the source is listening.
>>
>> I second Gerard's request for additional information, and confirmation of
>> my theories!
>>
>> Thanks,
>>
>> Rick
>>
>> On Fri, Aug 11, 2017 at 2:53 PM, Gerard Maas 
>> wrote:
>>
>>> Hi,
>>>
>>> I've been investigating this SO question: https://stackoverflo
>>> w.com/questions/45618489/executing-separate-streaming-querie
>>> s-in-spark-structured-streaming
>>>
>>> TL;DR: when using the Socket source, trying to create multiple queries
>>> does not work properly, only one the first query in the start order will
>>> receive data.
>>>
>>> This minimal example reproduces the issue:
>>>
>>> val lines = spark
>>> .readStream
>>> .format("socket")
>>> .option("host", "localhost")
>>> .option("port", "")
>>> .option("includeTimestamp", true)
>>> .load()
>>>
>>> val q1 = lines.writeStream
>>>   .outputMode("append")
>>>   .format("console")
>>>   .start()
>>>
>>> val q2 = lines.withColumn("foo", lit("foo")).writeStream
>>>   .outputMode("append")
>>>   .format("console")
>>>   .start()
>>>
>>> Sample output (spark shell):
>>>
>>> Batch: 0
>>> ---
>>> +-+---+
>>> |value|  timestamp|
>>> +-+---+
>>> |  aaa|2017-08-11 23:37:59|
>>> +-+---+
>>>
>>> ---
>>> Batch: 1
>>> ---
>>> +-+---+
>>> |value|  timestamp|
>>> +-+---+
>>> |

[StructuredStreaming] multiple queries of the socket source: only one query works.

2017-08-11 Thread Gerard Maas
Hi,

I've been investigating this SO question:
https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

TL;DR: when using the Socket source, trying to create multiple queries does
not work properly; only the first query in the start order will receive data.

This minimal example reproduces the issue:

val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "")
  .option("includeTimestamp", true)
  .load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .start()

Sample output (spark shell):

Batch: 0
---
+-+---+
|value|  timestamp|
+-+---+
|  aaa|2017-08-11 23:37:59|
+-+---+

---
Batch: 1
---
+-+---+
|value|  timestamp|
+-+---+
|  aaa|2017-08-11 23:38:00|
+-+---+

q1.stop

scala> ---
Batch: 0
---
+-+---+---+
|value|  timestamp|foo|
+-+---+---+
|b|2017-08-11 23:38:19|foo|
+-+---+---+

---
Batch: 1
---
+-+---+---+
|value|  timestamp|foo|
+-+---+---+
|b|2017-08-11 23:38:19|foo|
+-+---+---+

This is certainly unexpected behavior. Even though the socket source is
marked "not for production", I wouldn't expect it to be so limited.

Am I right to think that the first running query consumes all the data in
the source, and therefore all the other queries do not work (until the
previous ones are stopped)?

Is this a generalized behavior? e.g. each query started on a structured
streaming job fully consumes the source? e.g. the Kafka source can be used
with multiple queries because it can be replayed?

As a workaround, would there be a way to cache the incoming data to
multiplex it? We cannot call `cache` on a streaming dataset, but is there
maybe a way to do that?

Could I have more details on the execution model (I've consumed all I could
find) and what are the (near) future plans?

thanks!

-Gerard.


Re: Need Spark(Scala) Performance Tuning tips

2017-06-09 Thread Gerard Maas
Also, read Holden's newest book, High-Performance Spark:

http://shop.oreilly.com/product/0636920046967.do

On Fri, Jun 9, 2017 at 5:38 PM, Alonso Isidoro Roman 
wrote:

> a quick search on google:
>
> https://www.cloudera.com/documentation/enterprise/5-9-
> x/topics/admin_spark_tuning.html
>
> https://blog.cloudera.com/blog/2015/03/how-to-tune-your-
> apache-spark-jobs-part-1/
>
> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-
> apache-spark-jobs-part-2/
>
> and of course, Jacek`s
> 
>
>
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> 
>
> 2017-06-09 14:50 GMT+02:00 Debabrata Ghosh :
>
>> Hi,
>>  I need some help / guidance in performance tuning
>> Spark code written in Scala. Can you please help.
>>
>> Thanks
>>
>
>


Re: How to perform clean-up after stateful streaming processes an RDD?

2017-06-06 Thread Gerard Maas
It looks like the cleanup should go into the foreachRDD function:

stateUpdateStream.foreachRDD { rdd =>
  // do stuff with the rdd

  stateUpdater.cleanupExternalService  // should work in this position
}

Code within the foreachRDD(*) executes on the driver, so you can keep the
state of the object there.

What will not work is to update the stateUpdater state from a side effect
of the stateUpdateFunction used in the mapWithState transformation and
expect those changes to be visible at the call site sketched above.

kr, Gerard.

(*) a typical construct found in the wild is:
dstream.foreachRDD{rdd =>
   // do some preparation
   rdd.operation{elem => ... }
   ...
   // close/clean/report
}
So the code within the foreachRDD closure executes on the driver, *but* the
code within the rdd.operation{...} closure is a spark operation and
executes distributed on the executors.
One must be careful not to incorrectly mix the scopes, in particular
when holding on to local state.
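
To make that scope separation concrete, here is a minimal, self-contained
sketch (the `process` and `report` helpers are hypothetical, purely for
illustration):

import org.apache.spark.streaming.dstream.DStream

// hypothetical executor-side work and driver-side reporting
def process(record: String): Unit = ()
def report(elapsedMs: Long): Unit = println(s"batch took $elapsedMs ms")

def instrument(dstream: DStream[String]): Unit =
  dstream.foreachRDD { rdd =>
    val batchStart = System.currentTimeMillis()   // driver scope: runs once per batch

    rdd.foreachPartition { records =>
      // executor scope: runs distributed, once per partition;
      // state created or updated here is NOT visible back on the driver
      records.foreach(process)
    }

    // driver scope again: the action above has completed for the whole RDD,
    // so this is where per-batch cleanup or reporting belongs
    report(System.currentTimeMillis() - batchStart)
  }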



On Wed, Jun 7, 2017 at 1:08 AM, David Rosenstrauch 
wrote:

> We have some code we've written using stateful streaming (mapWithState)
> which works well for the most part.  The stateful streaming runs, processes
> the RDD of input data, calls the state spec function for each input record,
> and does all proper adding and removing from the state cache.
>
> However, I have a need to do some cleanup after stateful streaming
> processes the input data RDD, and I can't seem to find any place where we
> can put that code where it will run when it's supposed to.
>
> Essentially our state spec function needs to a) call out to an external
> service, b) hold some data from that service, and then c) inform the
> service to clean up the remaining data once the RDD is complete.
>
> I've gotten to the point where the code looks approximately like this:
>
>
> val eventStream = incomingStream.transform(...)
>
> val stateUpdater = new StateUpdater
> val stateUpdateStream = 
> eventStream.mapWithState(stateUpdater.stateUpdateFunction
> _)
>
> stateUpdateStream.foreachRdd(...) {
> ...
> }
> stateUpdater.cleanupExternalService// DOES NOT WORK!
>
>
> class StateUpdater extends Serializable {
>
> def stateUpdateFunction(key, value, state) {
> if (!state.initalized) {
> state.initialize(externalService)
> }
> ...
> }
>
> def cleanupExternalService {
> externalService.cleanup  // writes some data back to the external service
> }
>
> @transient lazy val externalService = new ExternalService
> }
>
>
> Note that the ExternalService object is holding onto a small bit of state
> that it needs to write back to the external service once we have completed
> running the stateUpdateFunction on every record in the RDD.  However this
> code doesn't actually work.  Because of the way Spark serializes objects on
> the driver and then deserializes them onto the executor, there's no way for
> me to get a hold of the ExternalService object that is being used on each
> RDD partition and clean up its leftover data.  Those objects seem to be
> held internally somewhere in the bowels of stateful streaming (where it
> processes an RDD of incoming data and applies it to the state).  And back
> in the main code where I'm trying to call the cleanup method, I'm actually
> calling it on a totally different object than the one that ran in the RDD
> partitions.  And stateful streaming doesn't provide me with any opportunity
> to perform any cleanup processing - say by calling some "rddDone" method to
> notify me that it just finished doing state processing on an RDD.  It just
> calls only the statespec function over and over, once for every record, and
> never notifying me that we've ended processing an RDD or started a new one.
>
>
> Is there any way out of this conundrum?  I've tried to avoid the problem
> by moving my interactions with the external service outside of the state
> spec function.  But that didn't work:  the interaction with the external
> service is really needed inside of the state spec function, and I caused a
> bug in our code when I tried to move it.
>
> Any suggestions that I might not have thought of on how to fix this issue?
>
> Thanks,
>
> DR
>


[StackOverflow] Size exceeds Integer.MAX_VALUE When Joining 2 Large DFs

2016-11-25 Thread Gerard Maas
This question seems to deserve an escalation from Stack Overflow:

http://stackoverflow.com/questions/40803969/spark-size-exceeds-integer-max-value-when-joining-2-large-dfs

Looks like an important limitation.

-kr, Gerard.

Meta:PS: What do you think would be the best way to escalate from SO? Should
I copy the question contents or just the link?


Re: HiveContext standalone => without a Hive metastore

2016-05-30 Thread Gerard Maas
Michael,  Mitch, Silvio,

Thanks!

The 'own directory' is the issue. We are running the Spark Notebook, which
uses the same dir per server (i.e. for all notebooks), so this issue
prevents us from running 2 notebooks using a HiveContext.
I'll look into a proper Hive installation, and I'm glad to know that this
dependency is gone in 2.0.
Looking forward to 2.1 :-) ;-)

-kr, Gerard.


On Thu, May 26, 2016 at 10:55 PM, Michael Armbrust 
wrote:

> You can also just make sure that each user is using their own directory.
> A rough example can be found in TestHive.
>
> Note: in Spark 2.0 there should be no need to use HiveContext unless you
> need to talk to a metastore.
>
> On Thu, May 26, 2016 at 1:36 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Well make sure than you set up a reasonable RDBMS as metastore. Ours is
>> Oracle but you can get away with others. Check the supported list in
>>
>> hduser@rhes564:: :/usr/lib/hive/scripts/metastore/upgrade> ltr
>> total 40
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 postgres
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mysql
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mssql
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 derby
>> drwxr-xr-x 3 hduser hadoop 4096 May 20 18:44 oracle
>>
>> you have few good ones in the list.  In general the base tables (without
>> transactional support) are around 55  (Hive 2) and don't take much space
>> (depending on the volume of tables). I attached a E-R diagram.
>>
>> HTH
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 26 May 2016 at 19:09, Gerard Maas  wrote:
>>
>>> Thanks a lot for the advice!.
>>>
>>> I found out why the standalone hiveContext would not work:  it was
>>> trying to deploy a derby db and the user had no rights to create the dir
>>> where there db is stored:
>>>
>>> Caused by: java.sql.SQLException: Failed to create database
>>> 'metastore_db', see the next exception for details.
>>>
>>>at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>>> Source)
>>>
>>>at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>>> Source)
>>>
>>>... 129 more
>>>
>>> Caused by: java.sql.SQLException: Directory
>>> /usr/share/spark-notebook/metastore_db cannot be created.
>>>
>>>
>>> Now, the new issue is that we can't start more than 1 context at the
>>> same time. I think we will need to setup a proper metastore.
>>>
>>>
>>> -kind regards, Gerard.
>>>
>>>
>>>
>>>
>>> On Thu, May 26, 2016 at 3:06 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> To use HiveContext witch is basically an sql api within Spark without
>>>> proper hive set up does not make sense. It is a super set of Spark
>>>> SQLContext
>>>>
>>>> In addition simple things like registerTempTable may not work.
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 26 May 2016 at 13:01, Silvio Fiorito 
>>>> wrote:
>>>>
>>>>> Hi Gerard,
>>>>>
>>>>>
>>>>>
>>>>> I’ve never had an issue using the HiveContext without a hive-site.xml
>>>>> configured. However, one issue you may have is if multiple users are
>>>>> starting the HiveContext from the same path, they’ll all be trying to 
>>>>> store
>>>>> the default Derby metastore in the same location. Also, if you want them 
>>>>> to
>>>>> be able to persist permanent table metadata for SparkSQL then you’ll want
>>>>> to

Re: HiveContext standalone => without a Hive metastore

2016-05-26 Thread Gerard Maas
Thanks a lot for the advice!.

I found out why the standalone HiveContext would not work: it was trying
to deploy a Derby db and the user had no rights to create the dir where
the db is stored:

Caused by: java.sql.SQLException: Failed to create database 'metastore_db',
see the next exception for details.

   at
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
Source)

   at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
Source)

   ... 129 more

Caused by: java.sql.SQLException: Directory
/usr/share/spark-notebook/metastore_db cannot be created.


Now, the new issue is that we can't start more than 1 context at the same
time. I think we will need to set up a proper metastore.


-kind regards, Gerard.




On Thu, May 26, 2016 at 3:06 PM, Mich Talebzadeh 
wrote:

> To use HiveContext witch is basically an sql api within Spark without
> proper hive set up does not make sense. It is a super set of Spark
> SQLContext
>
> In addition simple things like registerTempTable may not work.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 26 May 2016 at 13:01, Silvio Fiorito 
> wrote:
>
>> Hi Gerard,
>>
>>
>>
>> I’ve never had an issue using the HiveContext without a hive-site.xml
>> configured. However, one issue you may have is if multiple users are
>> starting the HiveContext from the same path, they’ll all be trying to store
>> the default Derby metastore in the same location. Also, if you want them to
>> be able to persist permanent table metadata for SparkSQL then you’ll want
>> to set up a true metastore.
>>
>>
>>
>> The other thing it could be is Hive dependency collisions from the
>> classpath, but that shouldn’t be an issue since you said it’s standalone
>> (not a Hadoop distro right?).
>>
>>
>>
>> Thanks,
>>
>> Silvio
>>
>>
>>
>> *From: *Gerard Maas 
>> *Date: *Thursday, May 26, 2016 at 5:28 AM
>> *To: *spark users 
>> *Subject: *HiveContext standalone => without a Hive metastore
>>
>>
>>
>> Hi,
>>
>>
>>
>> I'm helping some folks setting up an analytics cluster with  Spark.
>>
>> They want to use the HiveContext to enable the Window functions on
>> DataFrames(*) but they don't have any Hive installation, nor they need one
>> at the moment (if not necessary for this feature)
>>
>>
>>
>> When we try to create a Hive context, we get the following error:
>>
>>
>>
>> > val sqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
>>
>> java.lang.RuntimeException: java.lang.RuntimeException: Unable to
>> instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>>
>>at
>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>>
>>
>>
>> Is my HiveContext failing b/c it wants to connect to an unconfigured
>>  Hive Metastore?
>>
>>
>>
>> Is there  a way to instantiate a HiveContext for the sake of Window
>> support without an underlying Hive deployment?
>>
>>
>>
>> The docs are explicit in saying that that is should be the case: [1]
>>
>>
>>
>> "To use a HiveContext, you do not need to have an existing Hive setup,
>> and all of the data sources available to aSQLContext are still
>> available. HiveContext is only packaged separately to avoid including
>> all of Hive’s dependencies in the default Spark build."
>>
>>
>>
>> So what is the right way to address this issue? How to instantiate a
>> HiveContext with spark running on a HDFS cluster without Hive deployed?
>>
>>
>>
>>
>>
>> Thanks a lot!
>>
>>
>>
>> -Gerard.
>>
>>
>>
>> (*) The need for a HiveContext to use Window functions is pretty obscure.
>> The only documentation of this seems to be a runtime exception: 
>> "org.apache.spark.sql.AnalysisException:
>> Could not resolve window function 'max'. Note that, using window functions
>> currently requires a HiveContext;"
>>
>>
>>
>> [1]
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started
>>
>
>


HiveContext standalone => without a Hive metastore

2016-05-26 Thread Gerard Maas
Hi,

I'm helping some folks setting up an analytics cluster with Spark.
They want to use the HiveContext to enable the Window functions on
DataFrames(*), but they don't have any Hive installation, nor do they need
one at the moment (if it's not necessary for this feature).

When we try to create a Hive context, we get the following error:

> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)

java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

   at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)

Is my HiveContext failing b/c it wants to connect to an unconfigured  Hive
Metastore?

Is there  a way to instantiate a HiveContext for the sake of Window support
without an underlying Hive deployment?

The docs are explicit in saying that this should be the case: [1]

"To use a HiveContext, you do not need to have an existing Hive setup, and
all of the data sources available to a SQLContext are still available.
HiveContext is only packaged separately to avoid including all of Hive’s
dependencies in the default Spark build."

So what is the right way to address this issue? How to instantiate a
HiveContext with spark running on a HDFS cluster without Hive deployed?


Thanks a lot!

-Gerard.

(*) The need for a HiveContext to use Window functions is pretty obscure.
The only documentation of this seems to be a runtime exception: "
org.apache.spark.sql.AnalysisException: Could not resolve window function
'max'. Note that, using window functions currently requires a HiveContext;"


[1]
http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started


Re: Create one DB connection per executor

2016-03-24 Thread Gerard Maas
Hi Manas,

The approach is correct, with one caveat: you may have several tasks
executing in parallel in one executor. Having one single connection per JVM
will either fail, if the connection is not thread-safe, or become a
bottleneck b/c all tasks will be competing for the same resource.
The best approach would be to extend your current idea with a pool of
connections, where you can 'borrow'  a connection and return it after use.
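
A minimal sketch of that idea, using only JDK classes (the JDBC URL,
credentials, pool size and the writeRecord helper are placeholders; a real
pool library would add validation and eviction):

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ArrayBlockingQueue

// Initialized lazily, once per executor JVM (Scala object semantics).
object ConnectionPool {
  private val poolSize = 4
  private lazy val pool: ArrayBlockingQueue[Connection] = {
    val q = new ArrayBlockingQueue[Connection](poolSize)
    (1 to poolSize).foreach { _ =>
      q.put(DriverManager.getConnection("jdbc:postgresql://db-host/mydb", "user", "password"))
    }
    q
  }
  def borrow(): Connection = pool.take()               // blocks while all connections are in use
  def release(conn: Connection): Unit = pool.put(conn)
}

// hypothetical write function
def writeRecord(conn: Connection, r: String): Unit = ()

// given an RDD[String] named rdd: each partition borrows a connection and returns it when done
rdd.foreachPartition { records =>
  val conn = ConnectionPool.borrow()
  try records.foreach(r => writeRecord(conn, r))
  finally ConnectionPool.release(conn)
}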

-kr, Gerard.


On Thu, Mar 24, 2016 at 2:00 PM, Manas  wrote:

> I understand that using foreachPartition I can create one DB connection per
> partition level. Is there a way to create a DB connection per executor
> level
> and share that for all partitions/tasks run within that executor? One
> approach I am thinking is to have a singleton with say a getConnection
> method. The connection object is not created in the driver rather it passes
> to the the singleton object the DB connection detail (host, port, user,
> password etc). In the foreachPartition this singleton object is passed too.
> The getConnection method of the singleton creates the actual connection
> object only the first time it's called and returns the same connection
> instance for all later invocations. I believe that way each executor JVM
> will have one instance of the singleton/connection and thus all
> partitions/tasks running within that executor would share the same
> connection. I'd like to validate this approach with the spark experts. Does
> it have any inherent flaw or is there a better way to create one instance
> of
> an object per executor?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Create-one-DB-connection-per-executor-tp26588.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Evaluating spark streaming use case

2016-02-21 Thread Gerard Maas
It sounds like another window operation on top of the 30-min window will
achieve the desired objective.
Just keep in mind that you'll need to set the cleaner TTL (spark.cleaner.ttl)
to a long enough value, and you will require enough resources (mem & disk)
to keep the required data.
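
A rough sketch of that shape (the key/value extraction and the sum used as
the aggregation are illustrative; the real filter, key and merge functions
come from the pipeline described below):

import org.apache.spark.streaming.Minutes

// filtered: DStream[(String, Long)] built from the Kafka input, keyed per message
// 30-minute aggregates, computed every 30 minutes
val perHalfHour = filtered.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(30), Minutes(30))

// a second window over the 30-minute aggregates covers the last 24 hours, sliding every 30 minutes
val perDay = perHalfHour.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(24 * 60), Minutes(30))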

-kr, Gerard.

On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello Spark users,
>
> I have to aggregate messages from kafka and at some fixed interval (say
> every half hour) update a memory persisted RDD and run some computation.
> This computation uses last one day data. Steps are:
>
> - Read from realtime Kafka topic X in spark streaming batches of 5 seconds
> - Filter the above DStream messages and keep some of them
> - Create windows of 30 minutes on above DStream and aggregate by Key
> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
> - Maintain last N such RDDs in a deque persisting them on disk. While
> adding new RDD, subtract oldest RDD from the combinedRdd.
> - Final step consider last N such windows (of 30 minutes each) and do
> final aggregation
>
> Does the above way of using spark streaming looks reasonable? Is there a
> better way of doing the above?
>
> --
> Thanks
> Jatin
>
>


Hadoop credentials missing in some tasks?

2016-02-05 Thread Gerard Maas
Hi,

We're facing a situation where simple queries to parquet files stored in
Swift through a Hive Metastore sometimes fail with this exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
in stage 58.0 failed 4 times, most recent failure: Lost task 6.3 in stage
58.0 (TID 412, agent-1.mesos.private):
org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing
mandatory configuration option: fs.swift.service.##.auth.url
at 
org.apache.hadoop.fs.swift.http.RestClientBindings.copy(RestClientBindings.java:219)
(...)

Queries requiring a full table scan, like select(count(*)) would fail with
the mentioned exception while smaller chunks of work like " select *
 from... LIMIT 5" would succeed.

The problem seems to relate to the number of tasks scheduled:

If we force a reduction of the number of tasks to 1, the job  succeeds:

dataframe.rdd.coalesce(1).count()

Would return a correct result while

dataframe.count() would fail with the exception mentioned  above.

To me, it looks like credentials are lost somewhere in the serialization
path when the tasks are submitted to the cluster. I have not found an
explanation yet for why a job that requires only one task succeeds.

We are running on Apache Zeppelin for Swift and Spark Notebook for S3.
Both show an equivalent exception within their specific Hadoop filesystem
implementation when the task fails:

Zeppelin + Swift:

org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing
mandatory configuration option: fs.swift.service.##.auth.url

Spark Notebook + S3:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified as the username or password (respectively) of a s3n URL,
or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey
properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)

Valid credentials are being set programmatically through
sc.hadoopConfiguration
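
For reference, a sketch of what that programmatic configuration looks like
(the Swift service name and all values are placeholders; the property names
are the ones appearing in the exceptions above):

// Swift (hadoop-openstack); "myprovider" is an illustrative service name
sc.hadoopConfiguration.set("fs.swift.service.myprovider.auth.url", "https://identity.example.com/v2.0/tokens")

// S3 (s3n)
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access-key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret-key>")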

Our system: Zeppelin or Spark Notebook with Spark 1.5.1 running on Docker,
Docker running on Mesos, Hadoop 2.4.0. One environment running on Softlayer
(Swift) and the other on Amazon EC2 (S3), of similar sizes.

Any ideas on how to address this issue or figure out what's going on??

Thanks,  Gerard.


Re: spark-cassandra

2016-02-03 Thread Gerard Maas
NoSuchMethodError usually refers to a version conflict. Probably your job
was built against a higher version of the Cassandra connector than what's
available at runtime.
Check that the versions are aligned.

-kr, Gerard.

On Wed, Feb 3, 2016 at 1:37 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I am using Spark Jobserver to submit the jobs. I am using spark-cassandra
> connector to connect to Cassandra. I am getting below exception through
> spak jobserver.
>
> If I submit the job through *Spark-Submit *command it is working fine,.
>
> Please let me know how to solve this issue
>
>
> Exception in thread "pool-1-thread-1" java.lang.NoSuchMethodError:
> com.datastax.driver.core.TableMetadata.getIndexes()Ljava/util/List;
> at
> com.datastax.spark.connector.cql.Schema$.getIndexMap(Schema.scala:193)
> at
> com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchPartitionKey(Schema.scala:197)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:239)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:238)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
> at
> com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:238)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:247)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:246)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
> at
> com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:246)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:252)
> at
> com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:249)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
> at
> com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
> at
> com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
> at
> com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
> at
> com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
> at
> com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
> at
> com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
> at
> com.cisco.ss.etl.utils.ETLHelper$class.persistBackupConfigDevicesData(ETLHelper.scala:79)
> at com.cisco.ss.etl.Main$.persistBackupConfigDevicesData(Main.scala:13)
> at
> com.cisco.ss.etl.utils.ETLHelper$class.persistByBacthes(ETLHelper.scala:43)
> at com.cisco.ss.etl.Main$.persistByBacthes(Main.scala:13)
> at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:48)
> at com.cisco.ss.etl.Main$$anonfun$runJob$3.apply(Main.scala:45)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at com.cisco.ss.etl.Main$.runJob(Main.scala:45)
> at com.cisco.ss.etl.Main$.runJob(Main.scala:13)
> at
> spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:274)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecut

Re: Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Gerard Maas
That's precisely what this constructor does:
KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)

Is there a reason to do that yourself?  In that case, look at how it's done
in Spark Streaming for inspiration:
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L204
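
Spelled out with concrete type parameters (the broker list and topic names
are placeholders), the call looks roughly like this with the 0.8 direct API:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaConfig = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("topic1", "topic2")

// offsets and partition metadata are discovered by the stream itself
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConfig, topics)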

-kr, Gerard.




On Mon, Jan 25, 2016 at 5:53 PM, Ashish Soni  wrote:

> Correct what i am trying to achieve is that before the streaming job
> starts query the topic meta data from kafka , determine all the partition
> and provide those to direct API.
>
> So my question is should i consider passing all the partition from command
> line and query kafka and find and provide , what is the correct approach.
>
> Ashish
>
> On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas 
> wrote:
>
>> What are you trying to achieve?
>>
>> Looks like you want to provide offsets but you're not managing them
>> and I'm assuming you're using the direct stream approach.
>>
>> In that case, use the simpler constructor that takes the kafka config and
>> the topics. Let it figure it out the offsets (it will contact kafka and
>> request the partitions for the topics provided)
>>
>> KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
>>
>>  -kr, Gerard
>>
>> On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni 
>> wrote:
>>
>>> Hi All ,
>>>
>>> What is the best way to tell spark streaming job for the no of partition
>>> to to a given topic -
>>>
>>> Should that be provided as a parameter or command line argument
>>> or
>>> We should connect to kafka in the driver program and query it
>>>
>>> Map fromOffsets = new
>>> HashMap();
>>> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>>>
>>> Thanks,
>>> Ashish
>>>
>>
>>
>


Re: Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Gerard Maas
What are you trying to achieve?

Looks like you want to provide offsets but you're not managing them and I'm
assuming you're using the direct stream approach.

In that case, use the simpler constructor that takes the kafka config and
the topics. Let it figure out the offsets (it will contact kafka and
request the partitions for the topics provided).

KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)

 -kr, Gerard

On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni  wrote:

> Hi All ,
>
> What is the best way to tell spark streaming job for the no of partition
> to to a given topic -
>
> Should that be provided as a parameter or command line argument
> or
> We should connect to kafka in the driver program and query it
>
> Map fromOffsets = new HashMap Long>();
> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>
> Thanks,
> Ashish
>


Re: Inconsistent data in Cassandra

2015-12-13 Thread Gerard Maas
Hi Padma,

Have you considered reducing the dataset before writing it to Cassandra? Looks 
like this consistency problem could be avoided by cleaning the dataset of 
unnecessary records before persisting it:

val onlyMax = rddByPrimaryKey.reduceByKey { case (x, y) => max(x, y) }
// `max` stands for your own function: it must pick the right record among
// those attached to the same primary key
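
A concrete version of that reduction, assuming a record type with a
timestamp to decide which value wins (the type and field names are
illustrative):

case class Reading(sensorId: String, ts: Long, value: Double)

// keep only the most recent record per primary key before writing to Cassandra
val rddByPrimaryKey = readings.map(r => (r.sensorId, r))
val onlyMax = rddByPrimaryKey.reduceByKey((x, y) => if (x.ts >= y.ts) x else y)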

-kr, Gerard.


Re: flatMap function in Spark

2015-12-08 Thread Gerard Maas
http://stackoverflow.com/search?q=%5Bapache-spark%5D+flatmap
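
For a quick intuition, a self-contained toy example (assuming a SparkContext
named sc):

val lines = sc.parallelize(Seq("hello world", "hello spark"))

// map yields exactly one output element per input element
val arrays = lines.map(_.split(" "))      // RDD[Array[String]] with 2 elements

// flatMap yields zero or more output elements per input element and flattens them
val words = lines.flatMap(_.split(" "))   // RDD[String]: "hello", "world", "hello", "spark"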

-kr, Gerard.

On Tue, Dec 8, 2015 at 12:04 PM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> Guys... I am new to Spark..
> Please anyone please explain me how flatMap function works with a little
> sample example...
> Thanks in advance...
>


Re: spark streaming count msg in batch

2015-12-01 Thread Gerard Maas
dstream.count()

See: http://spark.apache.org/docs/latest/programming-guide.html#actions
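
For example (assuming a StreamingContext ssc; host and port are placeholders):

val lines = ssc.socketTextStream("localhost", 9999)

// count() produces a DStream with a single element per batch:
// the number of records received in that batch interval
lines.count().print()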

-kr, Gerard.

On Tue, Dec 1, 2015 at 6:32 PM, patcharee  wrote:

> Hi,
>
> In spark streaming how to count the total number of message (from Socket)
> in one batch?
>
> Thanks,
> Patcharee
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark DStream Data stored out of order in Cassandra

2015-11-30 Thread Gerard Maas
Spark Streaming will consume and process data in parallel, so the order of
the output will depend not only on the order of the input but also on the
time it takes for each task to process. Different operations, like
repartitions, sorts and shuffles at the Spark level, will also affect
ordering, so the best way would be to rely on the schema in Cassandra to
ensure the ordering expected by the application.

What is the schema you're using on the Cassandra side? And how is the data
going to be queried? That last question should drive the required ordering.

-kr, Gerard.

On Mon, Nov 30, 2015 at 12:37 PM, Prateek .  wrote:

> Hi,
>
>
>
> I have an time critical spark application, which is taking sensor data
> from kafka stream, storing in case class, applying transformations and then
> storing in cassandra schema. The data needs to be stored in schema, in FIFO
> order.
>
>
>
> The order is maintained at kafka queue but I am observing, out of order
> data in Cassandra schema. Does Spark Streaming provide any functionality to
> retain order. Or do we need do implement some sorting based on timestamp of
> arrival.
>
>
>
>
>
> Regards,
>
> Prateek
> "DISCLAIMER: This message is proprietary to Aricent and is intended solely
> for the use of the individual to whom it is addressed. It may contain
> privileged or confidential information and should not be circulated or used
> for any purpose other than for what it is intended. If you have received
> this message in error, please notify the originator immediately. If you are
> not the intended recipient, you are notified that you are strictly
> prohibited from using, copying, altering, or disclosing the contents of
> this message. Aricent accepts no responsibility for loss or damage arising
> from the use of the information transmitted by this email including damage
> from virus."
>


Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-08 Thread Gerard Maas
Andy,

Using rdd.saveAsTextFile(...) will overwrite the data if your target
is the same file.

If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
suffix), which writes a new set of files at each streaming interval. Note
that this results in many saved files over time. If you want to increase the
file size (usually a good idea in HDFS), you can use a window function over
the dstream and save the 'windowed' dstream instead.
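
A sketch of the windowed variant, assuming a Scala DStream[String] named
`data` (the equivalent of the stream in the question) and an illustrative
10-minute window and output path:

import org.apache.spark.streaming.Minutes

// writing one file per 5-second batch creates many tiny HDFS files;
// grouping 10 minutes of data per output increases file size and reduces file count
data.window(Minutes(10), Minutes(10))
    .saveAsTextFiles("hdfs:///rawStreamingData/batch", "txt")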

kind regards, Gerard.

On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi
>
> I just started a new spark streaming project. In this phase of the system
> all we want to do is save the data we received to hdfs. I after running for
> a couple of days it looks like I am missing a lot of data. I wonder if
> saveAsTextFile("hdfs:///rawSteamingData”); is overwriting the data I
> capture in previous window? I noticed that after running for a couple of
> days  my hdfs file system has 25 file. The names are something like 
> “part-6”. I
> used 'hadoop fs –dus’ to check the total data captured. While the system
> was running I would periodically call ‘dus’ I was surprised sometimes the
> numbers of total bytes actually dropped.
>
>
> Is there a better way to save write my data to disk?
>
> Any suggestions would be appreciated
>
> Andy
>
>
>public static void main(String[] args) {
>
>SparkConf conf = new SparkConf().setAppName(appName);
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
>
> JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
> Duration(5 * 1000));
>
>
> [ deleted code …]
>
>
> data.foreachRDD(new Function, Void>(){
>
> private static final long serialVersionUID =
> -7957854392903581284L;
>
>
> @Override
>
> public Void call(JavaRDD jsonStr) throws Exception {
>
> jsonStr.saveAsTextFile("hdfs:///rawSteamingData”); // 
> /rawSteamingData
> is a directory
>
> return null;
>
> }
>
> });
>
>
>
> ssc.checkpoint(checkPointUri);
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> }
>


Re: How to check whether the RDD is empty or not

2015-10-21 Thread Gerard Maas
As TD mentions, there's no such thing as an 'empty DStream'. Some intervals
of a DStream could be empty, in which case the related RDD will be empty.
This means that you should express such a condition based on the RDDs of the
DStream. Translated into code:

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // ... do stuff ...
  }
}


On Wed, Oct 21, 2015 at 9:00 PM, Tathagata Das  wrote:

> What do you mean by checking when a "DStream is empty"? DStream represents
> an endless stream of data, and at point of time checking whether it is
> empty or not does not make sense.
>
> FYI, there is RDD.isEmpty()
>
>
>
> On Wed, Oct 21, 2015 at 10:03 AM, diplomatic Guru <
> diplomaticg...@gmail.com> wrote:
>
>> I tried below code but still carrying out the action even though there is no 
>> new data.
>>
>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>> LongWritable.class,Text.class, TextInputFormat.class);
>>
>>  if(input != null){
>> //do some action if it is not empty
>> }
>>
>>
>> On 21 October 2015 at 18:00, diplomatic Guru 
>> wrote:
>>
>>>
>>> Hello All,
>>>
>>> I have a Spark Streaming job that should  do some action only if the RDD
>>> is not empty. This can be done easily with the spark batch RDD as I could
>>> .take(1) and check whether it is empty or  not. But this cannot been done
>>> in Spark Streaming DStrem
>>>
>>>
>>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>>> LongWritable.class,Text.class, TextInputFormat.class);
>>>
>>>  if(inputLines!=null){
>>> //do some action if it is not empty
>>> }
>>>
>>> Any ideas please?
>>>
>>>
>>>
>>>
>>
>


Re: Is there a way to create multiple streams in spark streaming?

2015-10-20 Thread Gerard Maas
You can create as many functional derivatives of your original stream by
using transformations. That's exactly the model that Spark Streaming offers.

In your example, that would become something like:

val stream = ssc.socketTextStream("localhost", 9999)  // port is illustrative
val stream1 = stream.map(fun1)
val stream2 = stream.map(fun2)
// you could also:
val stream3 = stream2.filter(predicate).flatMap(fun3)

// Then you need an action on each stream you want materialized, e.g.:
stream1.print()
stream2.saveAsTextFiles("stream2-output")  // prefix is illustrative

-kr, Gerard.


On Tue, Oct 20, 2015 at 12:20 PM, LinQili  wrote:

> Hi all,
> I wonder if there is a way to create some child streaming while using
> spark streaming?
> For example, I create a netcat main stream, read data from a socket, then
> create 3 different child streams on the main stream,
> in stream1, we do fun1 on the input data then print result to screen;
> in stream2, we do fun2 on the input data then print result to screen;
> in stream3, we do fun3 on the input data then print result to screen.
> Is any one some hints?
>


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks! Indeed not a given.

I'm not sure we have the time to wait for nodes within a streaming
interval. I'll explore some alternatives. If I stumble on something
reasonable I'll report back.

-kr, Gerard.


On Wed, Oct 14, 2015 at 9:57 PM, Cody Koeninger  wrote:

> What I'm saying is that it's not a given with spark, even in
> receiver-based mode, because as soon as you lose an executor you'll have a
> rebalance.
>
> Spark's model in general isn't a good fit for pinning work to specific
> nodes.
>
> If you really want to try and fake this, you can override
> getPreferredLocations and set spark.locality.wait to a high value.
>
>
>
> On Wed, Oct 14, 2015 at 2:45 PM, Gerard Maas 
> wrote:
>
>> Hi Cody,
>>
>> I think that I misused the term 'data locality'. I think I should better
>> call it "node affinity"  instead, as this is what I would like to have:
>> For as long as an executor is available, I would like to have the same
>> kafka partition processed by the same node in order to take advantage of
>> local in-memory structures.
>>
>> In the receiver-based mode this was a given. Any ideas how to achieve
>> that with the direct stream approach?
>>
>> -greetz, Gerard.
>>
>>
>> On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger 
>> wrote:
>>
>>> Assumptions about locality in spark are not very reliable, regardless of
>>> what consumer you use.  Even if you have locality preferences, and locality
>>> wait turned up really high, you still have to account for losing executors.
>>>
>>> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas 
>>> wrote:
>>>
>>>> Thanks Saisai, Mishra,
>>>>
>>>> Indeed, that hint will only work on a case where the Spark executor is
>>>> co-located with the Kafka broker.
>>>> I think the answer to my question as stated  is that there's no
>>>> warranty of where the task will execute as it will depend on the scheduler
>>>> and cluster resources available  (Mesos in our case).
>>>> Therefore, any assumptions made about data locality using the
>>>> consumer-based approach need to be reconsidered when migrating to the
>>>> direct stream.
>>>>
>>>> ((In our case, we were using local caches to decide when a given
>>>> secondary index for a record should be produced and written.))
>>>>
>>>> -kr, Gerard.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
>>>> wrote:
>>>>
>>>>> This preferred locality is a hint to spark to schedule Kafka tasks on
>>>>> the preferred nodes, if Kafka and Spark are two separate cluster, 
>>>>> obviously
>>>>> this locality hint takes no effect, and spark will schedule tasks 
>>>>> following
>>>>> node-local -> rack-local -> any pattern, like any other spark tasks.
>>>>>
>>>>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra <
>>>>> rmis...@snappydata.io> wrote:
>>>>>
>>>>>> Hi Gerard,
>>>>>> I am also trying to understand the same issue. Whatever code I have
>>>>>> seen it looks like once Kafka RDD is constructed the execution of that 
>>>>>> RDD
>>>>>> is upto the task scheduler and it can schedule the partitions based on 
>>>>>> the
>>>>>> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK 
>>>>>> it
>>>>>> maps to the Kafka partitions host . So if Kafka and Spark are co hosted
>>>>>> probably this will work. If not, I am not sure how to get data locality 
>>>>>> for
>>>>>> a partition.
>>>>>> Others,
>>>>>> correct me if there is a way.
>>>>>>
>>>>>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
>>>>>> wrote:
>>>>>>
>>>>>>> In the receiver-based kafka streaming model, given that each
>>>>>>> receiver starts as a long-running task, one can rely in a certain 
>>>>>>> degree of
>>>>>>> data locality based on the kafka partitioning:  Data published on a 
>>>>>>> given
>>>>>>> topic/partition will land on the same spark streaming receiving node 
>>>>>>> until
>>>>>>> the receiver dies and needs to be restarted somewhere else.
>>>>>>>
>>>>>>> As I understand, the direct-kafka streaming model just computes
>>>>>>> offsets and relays the work to a KafkaRDD. How is the execution locality
>>>>>>> compared to the receiver-based approach?
>>>>>>>
>>>>>>> thanks, Gerard.
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Regards,
>>>>>> Rishitesh Mishra,
>>>>>> SnappyData . (http://www.snappydata.io/)
>>>>>>
>>>>>> https://in.linkedin.com/in/rishiteshmishra
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody,

I think that I misused the term 'data locality'. I should rather call it
"node affinity" instead, as this is what I would like to have:
for as long as an executor is available, I would like to have the same
Kafka partition processed by the same node in order to take advantage of
local in-memory structures.

In the receiver-based mode this was a given. Any ideas how to achieve that
with the direct stream approach?

-greetz, Gerard.


On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger  wrote:

> Assumptions about locality in spark are not very reliable, regardless of
> what consumer you use.  Even if you have locality preferences, and locality
> wait turned up really high, you still have to account for losing executors.
>
> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas 
> wrote:
>
>> Thanks Saisai, Mishra,
>>
>> Indeed, that hint will only work on a case where the Spark executor is
>> co-located with the Kafka broker.
>> I think the answer to my question as stated  is that there's no warranty
>> of where the task will execute as it will depend on the scheduler and
>> cluster resources available  (Mesos in our case).
>> Therefore, any assumptions made about data locality using the
>> consumer-based approach need to be reconsidered when migrating to the
>> direct stream.
>>
>> ((In our case, we were using local caches to decide when a given
>> secondary index for a record should be produced and written.))
>>
>> -kr, Gerard.
>>
>>
>>
>>
>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao 
>> wrote:
>>
>>> This preferred locality is a hint to spark to schedule Kafka tasks on
>>> the preferred nodes, if Kafka and Spark are two separate cluster, obviously
>>> this locality hint takes no effect, and spark will schedule tasks following
>>> node-local -> rack-local -> any pattern, like any other spark tasks.
>>>
>>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra >> > wrote:
>>>
>>>> Hi Gerard,
>>>> I am also trying to understand the same issue. Whatever code I have
>>>> seen it looks like once Kafka RDD is constructed the execution of that RDD
>>>> is upto the task scheduler and it can schedule the partitions based on the
>>>> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it
>>>> maps to the Kafka partitions host . So if Kafka and Spark are co hosted
>>>> probably this will work. If not, I am not sure how to get data locality for
>>>> a partition.
>>>> Others,
>>>> correct me if there is a way.
>>>>
>>>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
>>>> wrote:
>>>>
>>>>> In the receiver-based kafka streaming model, given that each receiver
>>>>> starts as a long-running task, one can rely in a certain degree of data
>>>>> locality based on the kafka partitioning:  Data published on a given
>>>>> topic/partition will land on the same spark streaming receiving node until
>>>>> the receiver dies and needs to be restarted somewhere else.
>>>>>
>>>>> As I understand, the direct-kafka streaming model just computes
>>>>> offsets and relays the work to a KafkaRDD. How is the execution locality
>>>>> compared to the receiver-based approach?
>>>>>
>>>>> thanks, Gerard.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Regards,
>>>> Rishitesh Mishra,
>>>> SnappyData . (http://www.snappydata.io/)
>>>>
>>>> https://in.linkedin.com/in/rishiteshmishra
>>>>
>>>
>>>
>>
>


Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra,

Indeed, that hint will only work in a case where the Spark executor is
co-located with the Kafka broker.
I think the answer to my question as stated is that there's no guarantee of
where the task will execute, as it will depend on the scheduler and cluster
resources available (Mesos in our case).
Therefore, any assumptions made about data locality using the
consumer-based approach need to be reconsidered when migrating to the
direct stream.

((In our case, we were using local caches to decide when a given secondary
index for a record should be produced and written.))

-kr, Gerard.




On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao  wrote:

> This preferred locality is a hint to spark to schedule Kafka tasks on the
> preferred nodes, if Kafka and Spark are two separate cluster, obviously
> this locality hint takes no effect, and spark will schedule tasks following
> node-local -> rack-local -> any pattern, like any other spark tasks.
>
> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra 
> wrote:
>
>> Hi Gerard,
>> I am also trying to understand the same issue. Whatever code I have seen
>> it looks like once Kafka RDD is constructed the execution of that RDD is
>> upto the task scheduler and it can schedule the partitions based on the
>> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it
>> maps to the Kafka partitions host . So if Kafka and Spark are co hosted
>> probably this will work. If not, I am not sure how to get data locality for
>> a partition.
>> Others,
>> correct me if there is a way.
>>
>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas 
>> wrote:
>>
>>> In the receiver-based kafka streaming model, given that each receiver
>>> starts as a long-running task, one can rely in a certain degree of data
>>> locality based on the kafka partitioning:  Data published on a given
>>> topic/partition will land on the same spark streaming receiving node until
>>> the receiver dies and needs to be restarted somewhere else.
>>>
>>> As I understand, the direct-kafka streaming model just computes offsets
>>> and relays the work to a KafkaRDD. How is the execution locality compared
>>> to the receiver-based approach?
>>>
>>> thanks, Gerard.
>>>
>>
>>
>>
>> --
>>
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>


Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
In the receiver-based Kafka streaming model, given that each receiver
starts as a long-running task, one can rely on a certain degree of data
locality based on the Kafka partitioning: data published on a given
topic/partition will land on the same Spark Streaming receiving node until
the receiver dies and needs to be restarted somewhere else.

As I understand, the direct-kafka streaming model just computes offsets and
relays the work to a KafkaRDD. How is the execution locality compared to
the receiver-based approach?

thanks, Gerard.


Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Gerard Maas
Thanks for the feedback.

Cassandra does not seem to be the issue. The time for writing to Cassandra
is in the same order of magnitude (see below)

The code structure is roughly as follows:

dstream.filter(pred).foreachRDD { rdd =>
  val sparkT0 = currentTimeMs
  val metrics = rdd.mapPartitions { partition =>
    val partitionT0 = currentTimeMs
    partition.foreach { transform andThen storeInCassandra _ }
    val partitionT1 = currentTimeMs
    Seq(Metric("local time", executor, partitionT1 - partitionT0, records)).iterator
  }
  // materialize the rdd
  val allMetrics = metrics.collect()
  val sparkT1 = currentTimeMs
  val totalizedMetrics = ... // group by and reduce with sum
  val sparkT2 = currentTimeMs
  totalizedMetrics.foreach { metric => gmetric.report(metric) }
}

Relating this code with the time table presented before (time in ms):

                         How measured?       Slow Task   Fast Task
executor local           totalizedMetrics    347.6       281.53
spark computation        sparkT1 - sparkT0   6930        263
metric collection        sparkT2 - sparkT1   70          138
wall clock process       sparkT2 - sparkT0   7000        401
total records processed  totalizedMetrics    4297        5002

What we observe is that the largest difference comes from the
materialization of the RDD. This pattern repeats cyclically one on, one off.

Any ideas where to further look?

kr, Gerard.


On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das  wrote:

> Good point!
>
> On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger  wrote:
>
>> I agree getting cassandra out of the picture is a good first step.
>>
>> But if you just do foreachRDD { _.count } recent versions of direct
>> stream shouldn't do any work at all on the executor (since the number of
>> messages in the rdd is known already)
>>
>> do a foreachPartition and println or count the iterator manually.
>>
>> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das 
>> wrote:
>>
>>> Are sure that this is not related to Cassandra inserts? Could you just
>>> do foreachRDD { _.count } instead  to keep Cassandra out of the picture and
>>> then test this agian.
>>>
>>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase 
>>> wrote:
>>>
>>>> Also check if the Kafka cluster is still balanced. Maybe one of the
>>>> brokers manages too many partitions, all the work will stay on that
>>>> executor unless you repartition right after kakfka (and I'm not saying you
>>>> should).
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On 06 Oct 2015, at 22:17, Cody Koeninger  wrote:
>>>>
>>>> I'm not clear on what you're measuring.  Can you post relevant code
>>>> snippets including the measurement code?
>>>>
>>>> As far as kafka metrics, nothing currently.  There is an info-level log
>>>> message every time a kafka rdd iterator is instantiated,
>>>>
>>>> log.info(s"Computing topic ${part.topic}, partition
>>>> ${part.partition} " +
>>>>
>>>>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>>
>>>>
>>>> If you log once you're done with an iterator you should be able to see
>>>> the delta.
>>>>
>>>> The other thing to try is reduce the number of parts involved in the
>>>> job to isolate it ... first thing I'd do there is take cassandra out of the
>>>> equation.
>>>>
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas 
>>>> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> The job is doing ETL from Kafka records to Cassandra. After a
>>>>> single filtering stage on Spark, the 'TL' part is done using the
>>>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>>>
>>>>> We have metrics on the executor work which we collect and add
>>>>> together, indicated here by 'local computation'.  As you can see, we also
>>>>> measure how much it cost us to measure :-)
>>>>> See how 'local work'  times are comparable.  What's not visible is the
>>>>> task scheduling and consuming the data from Kafka which becomes part of 
>>>>> the
>>>>> 'spark computation' part.
>>>>>
>>>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>>>
>>>>> Are there metrics available somehow on the Kafka reading time?
>>>>>
>>>>> Slow Task Fast Task local computation 347.6 281.53 spark computation
>>>>&g

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Gerard Maas
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a
single filtering stage on Spark, the 'TL' part is done using the
dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together,
indicated here by 'local computation'.  As you can see, we also measure how
much it cost us to measure :-)
See how 'local work'  times are comparable.  What's not visible is the task
scheduling and consuming the data from Kafka which becomes part of the
'spark computation' part.

The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...

Are there metrics available somehow on the Kafka reading time?

                         Slow Task   Fast Task
local computation        347.6       281.53
spark computation        6930        263
metric collection        70          138
wall clock process       7000        401
total records processed  4297        5002

(time in ms)

kr, Gerard.


On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger  wrote:

> Can you say anything more about what the job is doing?
>
> First thing I'd do is try to get some metrics on the time taken by your
> code on the executors (e.g. when processing the iterator) to see if it's
> consistent between the two situations.
>
> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas 
> wrote:
>
>> Hi,
>>
>> We recently migrated our streaming jobs to the direct kafka receiver. Our
>> initial migration went quite fine but now we are seeing a weird zig-zag
>> performance pattern we cannot explain.
>> In alternating fashion, one task takes about 1 second to finish and the
>> next takes 7sec for a stable streaming rate.
>>
>> Here are comparable metrics for two successive tasks:
>> *Slow*:
>>
>>
>> ​
>>
>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:3686322 s303
>> 20151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:4381240 s110
>> 1120151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:5994549 s
>> 10010
>> *Fast*:
>>
>> ​
>>
>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:368630.6 s40
>> 420151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:438121 s909
>> 20151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:599451 s110
>> 11
>> We have some custom metrics that measure wall-clock time of execution of
>> certain blocks of the job, like the time it takes to do the local
>> computations (RDD.foreachPartition closure) vs total time.
>> The difference between the slow and fast executing task is on the 'spark
>> computation time' which is wall-clock for the task scheduling
>> (DStream.foreachRDD closure)
>>
>> e.g.
>> Slow task:
>>
>> local computation time: 347.6096849996, *spark computation time:
>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>
>> Fast task:
>> local computation time: 281.539042,* spark computation time: 263*,
>> metric collection: 138, total process: 401, total_records: 5002
>>
>> We are currently running Spark 1.4.1. The load and the work to be done is
>> stable -this is on a dev env with that stuff under control.
>>
>> Any ideas what this behavior could be?
>>
>> thanks in advance,  Gerard.
>>
>>
>>
>>
>>
>>
>>
>


Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Gerard Maas
Hi,

We recently migrated our streaming jobs to the direct kafka receiver. Our
initial migration went quite fine but now we are seeing a weird zig-zag
performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the
next takes 7sec for a stable streaming rate.

Here are comparable metrics for two successive tasks:
*Slow*:



Executor ID                                Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863  22 s       3            0             3
20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812  40 s       11           0             11
20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945  49 s       10           0             10
*Fast*:


Executor ID                                Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0   dnode-3.hdfs.private:36863  0.6 s      4            0             4
20151006-044141-2408867082-5050-21047-S1   dnode-0.hdfs.private:43812  1 s        9            0             9
20151006-044141-2408867082-5050-21047-S4   dnode-5.hdfs.private:59945  1 s        11           0             11
We have some custom metrics that measure wall-clock time of execution of
certain blocks of the job, like the time it takes to do the local
computations (RDD.foreachPartition closure) vs total time.
The difference between the slow and fast executing task is on the 'spark
computation time' which is wall-clock for the task scheduling
(DStream.foreachRDD closure)

e.g.
Slow task:

local computation time: 347.6096849996, *spark computation time: 6930*,
metric collection: 70, total process: 7000, total_records: 4297

Fast task:
local computation time: 281.539042,* spark computation time: 263*, metric
collection: 138, total process: 401, total_records: 5002

We are currently running Spark 1.4.1. The load and the work to be done are
stable - this is on a dev env with that stuff under control.

Any ideas what this behavior could be?

thanks in advance,  Gerard.


Re: Kafka Direct Stream

2015-10-03 Thread Gerard Maas
Hi,

collect(partialFunction) is equivalent to filter(x=>
partialFunction.isDefinedAt(x)).map(partialFunction)  so it's functionally
equivalent to your expression. I favor collect for its more compact form
but that's a personal preference. Use what you feel reads best.
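
A tiny self-contained illustration on a plain Scala collection (the same equivalence
holds on RDDs):

val records = Seq(("topic1", "a"), ("topic2", "b"), ("topic1", "c"))
val pf: PartialFunction[(String, String), String] = { case (t, data) if t == "topic1" => data }

val viaCollect   = records.collect(pf)                     // Seq("a", "c")
val viaFilterMap = records.filter(pf.isDefinedAt).map(pf)  // Seq("a", "c")
assert(viaCollect == viaFilterMap)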

Regarding performance, there will be some overhead from submitting a set of
tasks for every filtered RDD that gets materialized to Cassandra. That's the
reason I proposed the ticket linked above. Have a look whether that would
improve your particular usecase and vote for it if so :-)

-kr, Gerard.

On Sat, Oct 3, 2015 at 3:53 PM, varun sharma 
wrote:

> Thanks Gerard, the code snippet you shared worked, but can you please
> explain/point me to the usage of *collect* here. How is it
> different (performance/readability) from *filter*?
>
>> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2))*
>
>
> I am doing something like this.Please tell if I can improve the *Processing
> time* of this particular code:
>
> kafkaStringStream.foreachRDD{rdd =>
>   val topics = rdd.map(_._1).distinct().collect()
>   if (topics.length > 0) {
> val rdd_value = rdd.take(10).mkString("\n.\n")
> Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all 
> feeds\n$rdd_value"))
>
> topics.foreach { topic =>
>   //rdd.filter(x=> x._1 == topic).map(_._2)
>   val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>   CassandraHelper.saveDataToCassandra(topic, filteredRdd)
> }
> updateOffsetsinZk(rdd)
>   }
>
> }
>
> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas 
> wrote:
>
>> Something like this?
>>
>> I'm making the assumption that your topic name equals your keyspace for
>> this filtering example.
>>
>> dstream.foreachRDD{rdd =>
>>   val topics = rdd.map(_._1).distinct.collect
>>   topics.foreach{topic =>
>>     // do not confuse this collect (with a partial function) with
>>     // rdd.collect(), which brings data to the driver
>>     val filteredRdd = rdd.collect{ case (t, data) if t == topic => data }
>>     filteredRdd.saveToCassandra(topic, "table")
>>   }
>> }
>>
>>
>> I'm wondering: would something like this (
>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>> purposes?
>>
>> -kr, Gerard.
>>
>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma 
>> wrote:
>>
>>> Hi Adrian,
>>>
>>> Can you please give an example of how to achieve this:
>>>
>>>> *I would also look at filtering by topic and saving as different
>>>> Dstreams in your code*
>>>
>>> I have managed to get DStream[(String, String)] which is (
>>> *topic,my_data)* tuple. Lets call it kafkaStringStream.
>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>> DStream[(String,Iterable[String])].
>>> But I want a DStream instead of Iterable in order to apply
>>> saveToCassandra for storing it.
>>>
>>> Please help in how to transform iterable to DStream or any other
>>> workaround for achieving same.
>>>
>>>
>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase  wrote:
>>>
>>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>>> operators for the processing.
>>>>
>>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>>> topics) and the processing is *really* different, I would also look at
>>>> filtering by topic and saving as different Dstreams in your code.
>>>>
>>>> Either way you need to start with Cody’s tip in order to extract the
>>>> topic name.
>>>>
>>>> -adrian
>>>>
>>>> From: Cody Koeninger
>>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>>> To: Udit Mehta
>>>> Cc: user
>>>> Subject: Re: Kafka Direct Stream
>>>>
>>>> You can get the topic for a given partition from the offset range.  You
>>>> can either filter using that; or just have a single rdd and match on topic
>>>> when doing mapPartitions or foreachPartition (which I think is a better
>>>> idea)
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>
>>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using spark direct stream to consume from multiple topics in
>>>>> Kafka. I am able to consume fine but I am stuck at how to separate the 
>>>>> data
>>>>> for each topic since I need to process data differently depending on the
>>>>> topic.
>>>>> I basically want to split the RDD consisting on N topics into N RDD's
>>>>> each having 1 topic.
>>>>>
>>>>> Any help would be appreciated.
>>>>>
>>>>> Thanks in advance,
>>>>> Udit
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: Kafka Direct Stream

2015-10-02 Thread Gerard Maas
Something like this?

I'm making the assumption that your topic name equals your keyspace for
this filtering example.

dstream.foreachRDD{rdd =>
  val topics = rdd.map(_._1).distinct.collect
  topics.foreach{topic =>
    // do not confuse this collect (with a partial function) with
    // rdd.collect(), which brings data to the driver
    val filteredRdd = rdd.collect{ case (t, data) if t == topic => data }
    filteredRdd.saveToCassandra(topic, "table")
  }
}


I'm wondering: would something like this (
https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
purposes?

-kr, Gerard.

On Fri, Oct 2, 2015 at 8:12 PM, varun sharma 
wrote:

> Hi Adrian,
>
> Can you please give an example of how to achieve this:
>
>> *I would also look at filtering by topic and saving as different Dstreams
>> in your code*
>
> I have managed to get DStream[(String, String)] which is (*topic,my_data)*
> tuple. Lets call it kafkaStringStream.
> Now if I do kafkaStringStream.groupByKey() then I would get a
> DStream[(String,Iterable[String])].
> But I want a DStream instead of Iterable in order to apply saveToCassandra
> for storing it.
>
> Please help in how to transform iterable to DStream or any other
> workaround for achieving same.
>
>
> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase  wrote:
>
>> On top of that you could make the topic part of the key (e.g. keyBy in
>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>> operators for the processing.
>>
>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>> topics) and the processing is *really* different, I would also look at
>> filtering by topic and saving as different Dstreams in your code.
>>
>> Either way you need to start with Cody’s tip in order to extract the
>> topic name.
>>
>> -adrian
>>
>> From: Cody Koeninger
>> Date: Thursday, October 1, 2015 at 5:06 PM
>> To: Udit Mehta
>> Cc: user
>> Subject: Re: Kafka Direct Stream
>>
>> You can get the topic for a given partition from the offset range.  You
>> can either filter using that; or just have a single rdd and match on topic
>> when doing mapPartitions or foreachPartition (which I think is a better
>> idea)
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:
>>
>>> Hi,
>>>
>>> I am using spark direct stream to consume from multiple topics in Kafka.
>>> I am able to consume fine but I am stuck at how to separate the data for
>>> each topic since I need to process data differently depending on the topic.
>>> I basically want to split the RDD consisting on N topics into N RDD's
>>> each having 1 topic.
>>>
>>> Any help would be appreciated.
>>>
>>> Thanks in advance,
>>> Udit
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: union streams not working for streams > 3

2015-09-14 Thread Gerard Maas
How many cores are you assigning to your spark streaming job?

On Mon, Sep 14, 2015 at 10:33 PM, Василец Дмитрий 
wrote:

> hello
> I have 4 streams from Kafka and streaming is not working,
> without any errors or logs.
> But with 3 streams everything is perfect.
> Only the number of streams matters; different triple combinations always
> work.
> Any ideas how to debug or fix it?
>
>
>


Re: [streaming] Using org.apache.spark.Logging will silently break task execution

2015-09-06 Thread Gerard Maas
You need to take into consideration 'where' things are executing. The
closure of 'foreachRDD' executes on the driver. Therefore, the log
statements printed during the execution of that part will be found in the
driver logs.
In contrast, the foreachPartition closure executes on the worker nodes. You
will find the '+++ForEachPartition+++' messages printed in the executor log.

So both statements execute, but in different locations of the distributed
computing environment (aka cluster)
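
If you want a line to show up in the executor logs, one option is to obtain the logger
inside the partition closure, so it gets created on the worker JVM. A rough sketch using
slf4j (which is on Spark's classpath); adapt it to your own logging setup:

import org.slf4j.LoggerFactory

source.foreachRDD { rdd =>
  LoggerFactory.getLogger("MyJob").info("+++ForEachRDD+++")   // ends up in the driver log
  rdd.foreachPartition { partitionOfRecords =>
    // created on the executor, so this line ends up in the executor's log
    val log = LoggerFactory.getLogger("MyJob")
    log.info("+++ForEachPartition+++")
  }
}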

-kr, Gerard.

On Sun, Sep 6, 2015 at 10:53 PM, Alexey Ponkin  wrote:

> Hi,
>
> I have the following code
>
> object MyJob extends org.apache.spark.Logging{
> ...
>  val source: DStream[SomeType] ...
>
>  source.foreachRDD { rdd =>
>   logInfo(s"""+++ForEachRDD+++""")
>   rdd.foreachPartition { partitionOfRecords =>
> logInfo(s"""+++ForEachPartition+++""")
>   }
>   }
>
> I was expecting to see both log messages in job log.
> But unfortunately you will never see string '+++ForEachPartition+++' in
> logs, cause block foreachPartition will never execute.
> And also there is no error message or something in logs.
> I wonder is this a bug or known behavior?
> I know that org.apache.spark.Logging is DeveloperAPI, but why it is
> silently fails with no messages?
> What to use instead of org.apache.spark.Logging? in spark-streaming jobs?
>
> P.S. running spark 1.4.1 (on yarn)
>
> Thanks in advance
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Writing streaming data to cassandra creates duplicates

2015-08-04 Thread Gerard Maas
(removing dev from the to: as not relevant)

it would be good to see some sample data and the cassandra schema to have a
more concrete idea of the problem space.

Some thoughts: reduceByKey could still be used to 'pick' one element. An
example of arbitrarily choosing the first one: reduceByKey{ case (e1, e2) => e1 }
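
Spelled out a bit more (sketch: the connector import and the `primaryKey` field are
assumptions about your model, and whether 'keep the first' is the right merge is
exactly the question below):

import com.datastax.spark.connector._   // provides rdd.saveToCassandra

dstream
  .map(msg => (msg.primaryKey, msg))      // key by the Cassandra primary key
  .reduceByKey((m1, m2) => m1)            // arbitrarily keep one message per key
  .map { case (_, msg) => msg }
  .foreachRDD(rdd => rdd.saveToCassandra("my_keyspace", "my_table"))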

The question to be answered is: what should happen to the multiple values
that arrive for 1 key?

And why are they creating duplicates in Cassandra? If they have the same
key, they will result in an overwrite (which is not desirable anyway, due to
tombstones).

-kr, Gerard.



On Tue, Aug 4, 2015 at 1:03 PM, Priya Ch 
wrote:

>
>
>
> Yes...union would be one solution. I am not doing any aggregation hence
> reduceByKey would not be useful. If I use groupByKey, messages with same
> key would be obtained in a partition. But groupByKey is very expensive
> operation as it involves shuffle operation. My ultimate goal is to write
> the messages to cassandra. if the messages with same key are handled by
> different streams...there would be concurrency issues. To resolve this i
> can union dstreams and apply hash parttioner so that it would bring all the
> same keys to a single partition or do a groupByKey which does the same.
>
> As groupByKey is expensive, is there any work around for this ?
>
> On Thu, Jul 30, 2015 at 2:33 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> Just my two cents. I understand your problem is that your problem is that
>> you have messages with the same key in two different dstreams. What I would
>> do would be making a union of all the dstreams with StreamingContext.union
>> or several calls to DStream.union, and then I would create a pair dstream
>> with the primary key as key, and then I'd use groupByKey or reduceByKey (or
>> combineByKey etc) to combine the messages with the same primary key.
>>
>> Hope that helps.
>>
>> Greetings,
>>
>> Juan
>>
>>
>> 2015-07-30 10:50 GMT+02:00 Priya Ch :
>>
>>> Hi All,
>>>
>>>  Can someone throw insights on this ?
>>>
>>> On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch 
>>> wrote:
>>>


 Hi TD,

  Thanks for the info. I have the scenario like this.

  I am reading the data from kafka topic. Let's say kafka has 3
 partitions for the topic. In my streaming application, I would configure 3
 receivers with 1 thread each such that they would receive 3 dstreams (from
 3 partitions of kafka topic) and also I implement partitioner. Now there is
 a possibility of receiving messages with same primary key twice or more,
 one is at the time message is created and other times if there is an update
 to any fields for same message.

 If two messages M1 and M2 with same primary key are read by 2 receivers
 then even the partitioner in spark would still end up in parallel
 processing as there are altogether in different dstreams. How do we address
 in this situation ?

 Thanks,
 Padma Ch

 On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das 
 wrote:

> You have to partition that data on the Spark Streaming by the primary
> key, and then make sure insert data into Cassandra atomically per key, or
> per set of keys in the partition. You can use the combination of the 
> (batch
> time, and partition Id) of the RDD inside foreachRDD as the unique id for
> the data you are inserting. This will guard against multiple attempts to
> run the task that inserts into Cassandra.
>
> See
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
>
> TD
>
> On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>> Hi All,
>>
>>  I have a problem when writing streaming data to cassandra. Or
>> existing product is on Oracle DB in which while wrtiting data, locks are
>> maintained such that duplicates in the DB are avoided.
>>
>> But as spark has parallel processing architecture, if more than 1
>> thread is trying to write same data i.e with same primary key, is there 
>> as
>> any scope to created duplicates? If yes, how to address this problem 
>> either
>> from spark or from cassandra side ?
>>
>> Thanks,
>> Padma Ch
>>
>
>


>>>
>>
>
>


Re: Spark Streaming

2015-07-29 Thread Gerard Maas
A side question: Any reason why you're using window(Seconds(10), Seconds(10))
instead of new StreamingContext(conf, Seconds(10)) ?
Making the micro-batch interval 10 seconds instead of 1 will provide you
the same 10-second window with less complexity.  Of course, this might just
be a test for the window functionality.
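
In code, the difference is roughly this (sketch, reusing the conf from your snippet):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector")

// Option A: 1-second batches plus a 10s/10s window on the stream
val sscA = new StreamingContext(conf, Seconds(1))
// tweets.window(Seconds(10), Seconds(10)).foreachRDD(...)

// Option B: just make the batch interval 10 seconds; no window needed
val sscB = new StreamingContext(conf, Seconds(10))
// tweets.foreachRDD(...)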

-kr, Gerard.

On Wed, Jul 29, 2015 at 10:54 AM, Sadaf  wrote:

> Hi,
>
> I am new to Spark Streaming and writing a code for twitter connector.
> I am facing the following exception.
>
> ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException:
> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
> initialized
> at
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
>
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at
>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at
>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
>
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
> at
>
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
> at
>
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
> at
>
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> at twitter.streamingSpark$.twitterConnector(App.scala:38)
> at twitter.streamingSpark$.main(App.scala:26)
> at twitter.streamingSpark.main(App.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> the relavent code is
>
>  def twitterConnector() :Unit =
>   {
>  val atwitter=managingCredentials()
>
>val
> ssc=StreamingContext.getOrCreate("hdfs://
> 192.168.23.109:9000/home/cloud9/twitterCheckpointDir",()=>
> { managingContext() })
>fetchTweets(ssc, atwitter )
>
>ssc.start() // Start the computation
>ssc.awaitTermination()
>
>   }
>
> def managingContext():StreamingContext =
>   {
>//making spark context
>val conf = new
> SparkConf().setMaster("local[*]").setAppName("twitterConnector")
>val ssc = new StreamingContext(conf, Seconds(1))
>val sqlContext = new
> org.apache.spark.sql.SQLContext(ssc.sparkContext)
>import sqlContext.implicits._
>
>//checkpointing
>
> ssc.checkpoint("hdfs://
> 192.168.23.109:9000/home/cloud9/twitterCheckpointDir")
>ssc
>   }
>def fetchTweets (ssc : StreamingContext , atwitter :
> Option[twitter4j.auth.Authorization]) : Unit = {
>
>
>val tweets
> =TwitterUtils.createStream(ss

Re:

2015-07-07 Thread Gerard Maas
Evo,

I'd let the OP clarify the question. I'm not in a position to clarify his
requirements beyond what's written in the question.

Regarding window vs mutable union: window is a well-supported feature that
accumulates messages over time. The mutable unioning of RDDs is bound to
cause operational trouble, as there are no guarantees about data preservation
and it's unclear how one can produce 'cuts' of that union ready to be served
for some process/computation. Intuitively, it will 'explode' at some point.

-kr, Gerard.



On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov  wrote:

> spark.streaming.unpersist = false // in order for SStreaming to not drop
> the raw RDD data
>
> spark.cleaner.ttl = 
>
>
>
> why is the above suggested, provided the persist/cache operation on the
> constantly unionized Batch RDD will have to be invoked anyway (after every
> union with a DStream RDD)? Besides, it will result in DStream RDDs
> accumulating in RAM unnecessarily for the duration of the TTL
>
>
>
> re
>
>
>
> “A more reliable way would be to do dstream.window(...) for the length of
> time you want to keep the data and then union that data with your RDD for
> further processing using transform.”
>
>
>
> I think the actual requirement here is picking up and adding Specific
> Messages from EVERY DStream RDD  to the Batch RDD rather than “preserving”
> messages from specific  sliding window and adding them to the Batch RDD
>
>
>
> This should be defined as the Frequency of Updates to the Batch RDD and
> then using dstream.window() equal to that frequency
>
>
>
> Can you also elaborate why you consider the dstream.window  approach more
> “reliable”
>
>
>
> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
> *Sent:* Tuesday, July 7, 2015 12:56 PM
> *To:* Anand Nalya
> *Cc:* spark users
> *Subject:* Re:
>
>
>
> Anand,
>
>
>
> AFAIK, you will need to change two settings:
>
>
>
> spark.streaming.unpersist = false // in order for SStreaming to not drop
> the raw RDD data
>
> spark.cleaner.ttl = 
>
>
>
> Also be aware that the lineage of your union RDD will grow with each batch
> interval. You will need to break lineage often with cache(), and rely on
> the ttl for clean up.
>
> You will probably be in some tricky ground with this approach.
>
>
>
> A more reliable way would be to do dstream.window(...) for the length of
> time you want to keep the data and then union that data with your RDD for
> further processing using transform.
>
> Something like:
>
> dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union
> otherRdd)...
>
>
>
> If you need an unbound amount of dstream batch intervals, considering
> writing the data to secondary storage instead.
>
>
>
> -kr, Gerard.
>
>
>
>
>
>
>
> On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya  wrote:
>
> Hi,
>
>
>
> Suppose I have an RDD that is loaded from some file and then I also have a
> DStream that has data coming from some stream. I want to keep union some of
> the tuples from the DStream into my RDD. For this I can use something like
> this:
>
>
>
>   var myRDD: RDD[(String, Long)] = sc.fromText...
>
>   dstream.foreachRDD{ rdd =>
>
> myRDD = myRDD.union(rdd.filter(myfilter))
>
>   }
>
>
>
> My question is: for how long will spark keep the RDDs underlying the
> dstream around? Is there some configuration knob that can control that?
>
>
>
> Regards,
>
> Anand
>
>
>


Re:

2015-07-07 Thread Gerard Maas
Anand,

AFAIK, you will need to change two settings:

spark.streaming.unpersist = false // in order for SStreaming to not drop
the raw RDD data
spark.cleaner.ttl = <ttl in seconds>

Also be aware that the lineage of your union RDD will grow with each batch
interval. You will need to break lineage often with cache(), and rely on
the ttl for clean up.
You will probably be on tricky ground with this approach.

A more reliable way would be to do dstream.window(...) for the length of
time you want to keep the data and then union that data with your RDD for
further processing using transform.
Something like:
dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union
otherRdd)...
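
Slightly expanded (sketch; `otherRdd` is the RDD[(String, Long)] you loaded from file
and `myfilter` your predicate):

val recent = dstream.window(Seconds(900), Seconds(900))   // keep the last 15 minutes

val combined = recent.transform { rdd =>
  rdd.filter(myfilter) union otherRdd
}

combined.foreachRDD { rdd =>
  // further processing on the unioned data for this window
  println(s"combined count: ${rdd.count()}")
}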

If you need an unbounded number of dstream batch intervals, consider
writing the data to secondary storage instead.

-kr, Gerard.



On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya  wrote:

> Hi,
>
> Suppose I have an RDD that is loaded from some file and then I also have a
> DStream that has data coming from some stream. I want to keep union some of
> the tuples from the DStream into my RDD. For this I can use something like
> this:
>
>   var myRDD: RDD[(String, Long)] = sc.fromText...
>   dstream.foreachRDD{ rdd =>
> myRDD = myRDD.union(rdd.filter(myfilter))
>   }
>
> My question is: for how long will spark keep the RDDs underlying the
> dstream around? Is there some configuration knob that can control that?
>
> Regards,
> Anand
>


Re: Time is ugly in Spark Streaming....

2015-06-26 Thread Gerard Maas
Are you sharing the SimpleDateFormat instance? This looks a lot more like
the non-thread-safe behaviour of SimpleDateFormat (which has claimed many
unsuspecting victims over the years) than anything 'ugly' in Spark Streaming. Try
writing the timestamps in millis to Kafka and compare.
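
Alternatively, create the formatter inside the partition closure so it is never shared
between threads. A sketch based on your snippet below (`sendRecord` is a made-up
stand-in for your Kafka output code):

import java.text.SimpleDateFormat
import java.util.Date

dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partition =>
    // one formatter per task: SimpleDateFormat is not thread-safe
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    val date = dateFormat.format(new Date(time.milliseconds))
    partition.foreach(group => sendRecord(group, date))
  }
}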

-kr, Gerard.

On Fri, Jun 26, 2015 at 11:06 AM, Sea <261810...@qq.com> wrote:

> Hi, all
>
> I find a problem in spark streaming, when I use the time in function 
> foreachRDD...
> I find the time is very interesting.
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topicsSet)
>
> dataStream.map(x => createGroup(x._2, 
> dimensions)).groupByKey().foreachRDD((rdd, time) => {
>   try {
> if (!rdd.partitions.isEmpty) {
>   rdd.foreachPartition(partition => {
> handlePartition(partition, timeType, time, dimensions, outputTopic, 
> brokerList)
>   })
> }
>   } catch {
> case e: Exception => e.printStackTrace()
>   }
> })
>
>
> val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
>
> var date = dateFormat.format(new Date(time.milliseconds))
>
>
> Then I insert the 'date' into Kafka , but I found .
>
>
> {"timestamp":"2015-06-00T16:50:02","status":"3","type":"1","waittime":"0","count":17}
>
> {"timestamp":"2015-06-26T16:51:13","status":"1","type":"1","waittime":"0","count":34}
>
> {"timestamp":"2015-06-00T16:50:02","status":"4","type":"0","waittime":"0","count":279}
>
> {"timestamp":"2015-06-26T16:52:00","status":"11","type":"1","waittime":"0","count":9}
> {"timestamp":"0020-06-26T16:50:36
> ","status":"7","type":"0","waittime":"0","count":1722}
>
> {"timestamp":"2015-06-10T16:51:17","status":"0","type":"0","waittime":"0","count":2958}
>
> {"timestamp":"2015-06-26T16:52:00","status":"0","type":"1","waittime":"0","count":114}
>
> {"timestamp":"2015-06-10T16:51:17","status":"11","type":"0","waittime":"0","count":2066}
>
> {"timestamp":"2015-06-26T16:52:00","status":"1","type":"0","waittime":"0","count":1539}
>
>


Re: Spark Streaming reads from stdin or output from command line utility

2015-06-12 Thread Gerard Maas
Would using socketTextStream and `yourApp | nc -lk <port>` work? Not
sure how resilient the socket receiver is, though. I've been playing with it
for a little demo and I don't yet understand its reconnection behavior.
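
The receiving side would then be something like this (sketch, assuming the utility
pipes into `nc -lk 9999` on the same host):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("stdin-stream-demo")
val ssc  = new StreamingContext(conf, Seconds(5))

// reads newline-separated text from the TCP socket fed by: yourApp | nc -lk 9999
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()
ssc.awaitTermination()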

Although I would think that putting some elastic buffer in between would be
a good idea to decouple producer from consumer. Kafka would be my first
choice.

-kr, Gerard.

On Fri, Jun 12, 2015 at 8:46 AM, Heath Guo  wrote:

>  Yes, it is lots of data, and the utility I'm working with prints out
> infinite real time data stream. Thanks.
>
>
>   From: Tathagata Das 
> Date: Thursday, June 11, 2015 at 11:43 PM
>
> To: Heath Guo 
> Cc: user 
> Subject: Re: Spark Streaming reads from stdin or output from command line
> utility
>
>   Is it a lot of data that is expected to come through stdin? I mean is
> it even worth parallelizing the computation using something like Spark
> Streaming?
>
> On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo  wrote:
>
>>   Thanks for your reply! In my use case, it would be stream from only
>> one stdin. Also I'm working with Scala.
>> It would be great if you could talk about multi stdin case as well!
>> Thanks.
>>
>>   From: Tathagata Das 
>> Date: Thursday, June 11, 2015 at 8:11 PM
>> To: Heath Guo 
>> Cc: user 
>> Subject: Re: Spark Streaming reads from stdin or output from command
>> line utility
>>
>>Are you going to receive data from one stdin from one machine, or
>> many stdins on many machines?
>>
>>
>> On Thu, Jun 11, 2015 at 7:25 PM, foobar  wrote:
>>
>>> Hi, I'm new to Spark Streaming, and I want to create a application where
>>> Spark Streaming could create DStream from stdin. Basically I have a
>>> command
>>> line utility that generates stream data, and I'd like to pipe data into
>>> DStream. What's the best way to do that? I thought rdd.pipe() could help,
>>> but it seems that requires an rdd in the first place, which does not
>>> apply.
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
>>> 
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Cassandra Submit

2015-06-08 Thread Gerard Maas
? = <the IP address of one of your Cassandra nodes>

On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya  wrote:

> Hi ,
>
> How can I find spark.cassandra.connection.host? And what should I change ?
> Should I change cassandra.yaml ?
>
> Error says me *"Exception in thread "main" java.io.IOException: Failed to
> open native connection to Cassandra at {127.0.1.1}:9042"*
>
> What should I add *SparkConf sparkConf = new
> SparkConf().setAppName("JavaApiDemo").set(**"spark.driver.allowMultipleContexts",
> "true").set("spark.cassandra.connection.host", ?);*
>
> Best
> yasemin
>
> 2015-06-06 3:04 GMT+03:00 Mohammed Guller :
>
>>  Check your spark.cassandra.connection.host setting. It should be
>> pointing to one of your Cassandra nodes.
>>
>>
>>
>> Mohammed
>>
>>
>>
>> *From:* Yasemin Kaya [mailto:godo...@gmail.com]
>> *Sent:* Friday, June 5, 2015 7:31 AM
>> *To:* user@spark.apache.org
>> *Subject:* Cassandra Submit
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am using cassandraDB in my project. I had that error *Exception in
>> thread "main" java.io.IOException: Failed to open native connection to
>> Cassandra at {127.0.1.1}:9042*
>>
>>
>>
>> I think I have to modify the submit line. What should I add or remove
>> when I submit my project?
>>
>>
>>
>> Best,
>>
>> yasemin
>>
>>
>>
>>
>>
>> --
>>
>> hiç ender hiç
>>
>
>
>
> --
> hiç ender hiç
>


Re: [Streaming] Configure executor logging on Mesos

2015-05-29 Thread Gerard Maas
Hi Tim,

Thanks for the info.   We (Andy Petrella and myself) have been diving a bit
deeper into this log config:

The log line I was referring to is this one (sorry, I provided the others
just for context)

*Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties*

That line comes from Logging.scala [1] where a default config is loaded if
none is found in the classpath upon the startup of the Spark Mesos executor
in the Mesos sandbox. At that point in time, none of the
application-specific resources have been shipped yet as the executor JVM is
just starting up.   To load a custom configuration file we should have it
already on the sandbox before the executor JVM starts and add it to the
classpath on the startup command. Is that correct?

For the classpath customization, it looks like it should be possible to
pass a -Dlog4j.configuration property by using the
'spark.executor.extraClassPath' setting, which will be picked up at [2] and
should be added to the command that starts the executor JVM, but the
resource must be already on the host before we can do that. Therefore we
also need some means of 'shipping' the log4j.configuration file to the
allocated executor.
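
For illustration, assuming the properties file were already present on every slave
(which is exactly the shipping problem), the driver-side configuration we'd try looks
something like this (untested sketch; paths are made up):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-job")
  // directory on each Mesos slave that already contains log4j.properties
  .set("spark.executor.extraClassPath", "/etc/spark/conf")
  // or point log4j at an explicit file with a JVM option on the executors
  .set("spark.executor.extraJavaOptions",
       "-Dlog4j.configuration=file:/etc/spark/conf/log4j.properties")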

This all boils down to your statement on the need of shipping extra files
to the sandbox. Bottom line: It's currently not possible to specify a
config file for your mesos executor. (ours grows several GB/day).

The only workaround I found so far is to open up the Spark assembly,
replace the log4j-defaults.properties and pack it up again.  That would
work, although kind of rudimentary as we use the same assembly for many
jobs.  Probably, accessing the log4j API programmatically should also work
(I didn't try that yet)

Should we open a JIRA for this functionality?

-kr, Gerard.




[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Logging.scala#L128
[2]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L77

On Thu, May 28, 2015 at 7:50 PM, Tim Chen  wrote:

>
> -- Forwarded message --
> From: Tim Chen 
> Date: Thu, May 28, 2015 at 10:49 AM
> Subject: Re: [Streaming] Configure executor logging on Mesos
> To: Gerard Maas 
>
>
> Hi Gerard,
>
> The log line you referred to is not Spark logging but Mesos own logging,
> which is using glog.
>
> Our own executor logs should only contain very few lines though.
>
> Most of the log lines you'll see is from Spark, and it can be controled by
> specifiying a log4j.properties to be downloaded with your Mesos task.
> Alternatively if you are downloading Spark executor via spark.executor.uri,
> you can include log4j.properties in that tar ball.
>
> I think we probably need some more configurations for Spark scheduler to
> pick up extra files to be downloaded into the sandbox.
>
> Tim
>
>
>
>
>
> On Thu, May 28, 2015 at 6:46 AM, Gerard Maas 
> wrote:
>
>> Hi,
>>
>> I'm trying to control the verbosity of the logs on the Mesos executors
>> with no luck so far. The default behaviour is INFO on stderr dump with an
>> unbounded growth that gets too big at some point.
>>
>> I noticed that when the executor is instantiated, it locates a default
>> log configuration in the spark assembly:
>>
>> I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
>> 20150528-063307-780930314-5050-8152-S5
>> Spark assembly has been built with Hive, including Datanucleus jars on
>> classpath
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>>
>> So, no matter what I provide in my job jar files (I also tried
>> spark.executor.extraClassPath=log4j.properties), nothing takes effect in the
>> executor's configuration.
>>
>> How should I configure the log on the executors?
>>
>> thanks, Gerard.
>>
>
>
>


[Streaming] Configure executor logging on Mesos

2015-05-28 Thread Gerard Maas
Hi,

I'm trying to control the verbosity of the logs on the Mesos executors with
no luck so far. The default behaviour is INFO on stderr dump with an
unbounded growth that gets too big at some point.

I noticed that when the executor is instantiated, it locates a default log
configuration in the spark assembly:

I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

So, no matter what I provide in my job jar files (I also tried
spark.executor.extraClassPath=log4j.properties), nothing takes effect in the
executor's configuration.

How should I configure the log on the executors?

thanks, Gerard.


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Gerard Maas
Hi,

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
streaming processes is not supported.


*Longer version.*

I assume that you are talking about Spark Streaming, as the discussion is
about handling Kafka streaming data.

Then you have two things to consider: the Streaming receivers and the Spark
processing cluster.

Currently, the receiving topology is static. One receiver is allocated with
each DStream instantiated and it will use 1 core in the cluster. Once the
StreamingContext is started, this topology cannot be changed, therefore the
number of Kafka receivers is fixed for the lifetime of your DStream.
What we do is to calculate the cluster capacity and use that as a fixed
upper bound (with a margin) for the receiver throughput.

There's work in progress to add a reactive model to the receiver, where
backpressure can be applied to handle overload conditions. See
https://issues.apache.org/jira/browse/SPARK-7398

Once the data is received, it will be processed in a 'classical' Spark
pipeline, so previous posts on spark resource scheduling might apply.

Regarding metrics, the standard metrics subsystem of spark will report
streaming job performance. Check the driver's metrics endpoint to peruse
the available metrics:

<driver host>:<ui port>/metrics/json

-kr, Gerard.


(*) Spark is a project that moves so fast that statements might be
invalidated by new work every minute.

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg 
wrote:

> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling Spark
> (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we
> stream data from would start growing.  What are some of the ways to
> generate
> the metrics on the number of new messages and the rate they are piling up?
> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
> with
> the Metric interface and not much else...
>
> What are some of the ways to expand/contract the Spark cluster? Someone has
> mentioned Mesos...
>
> I see some info on Spark metrics in  the Spark monitoring guide
>   .  Do we want to
> perhaps implement a custom sink that would help us autoscale up or down
> based on the throughput?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Gerard Maas
I'm familiar with the TableWriter code and that log only appears if the
write actually succeeded. (See
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
)

Thinking about the infrastructure: we see that it's always trying to reach
'localhost'. Are you running a 1-node test in local mode?  Otherwise, there's
something wrong with the way you're configuring Cassandra or the connection
to it (always tempted to say "her" :-) ).

-kr, Gerard.

On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov  wrote:

> I think the message that it has written 2 rows is misleading
>
>
>
> If you look further down you will see that it could not initialize a
> connection pool for Cassandra (presumably while trying to write the
> previously mentioned 2 rows)
>
>
>
> Another confirmation of this hypothesis is the phrase “error during
> Transport Initialization” – so all this stuff points in the direction
> of Infrastructure or Configuration issues – check your Cassandra service and
> how you connect to it etc mate
>
>
>
> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
> *Sent:* Sunday, May 10, 2015 11:33 AM
> *To:* Sergio Jiménez Barrio; spark users
> *Subject:* Re: Spark streaming closes with Cassandra Conector
>
>
>
> It successfully writes some data and fails afterwards, like the host or
> connection goes down. Weird.
>
>
>
> Maybe you should post this question on the Spark-Cassandra connector group:
>
>
> https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user
>
>
>
>
>
> -kr, Gerard.
>
>
>
>
>
> On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio <
> drarse.a...@gmail.com> wrote:
>
> This is:
>
>
> 15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in
> 0,016 s.
> 15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host
> 127.0.0.1 (datacenter1)
> 15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
> com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042]
> Unexpected error during transport initialization
> (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error
> writing: Closed channel)
> at
> com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
> at com.datastax.driver.core.Connection.(Connection.java:116)
> at
> com.datastax.driver.core.PooledConnection.(PooledConnection.java:32)
> at
> com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
> at
> com.datastax.driver.core.DynamicConnectionPool.(DynamicConnectionPool.java:74)
> at
> com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
> at
> com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
> at
> com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
> at
> com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
> at
> com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
> at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
> at
> com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
> at
> com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
> at
> com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042]
> Error writing: Closed channel
> at
> com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432)
> at
> org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
> at
> org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
> at
> org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:248)
> at
> org.jboss.net

Re: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Gerard Maas
It successfully writes some data and fails afterwards, like the host or
connection goes down. Weird.

Maybe you should post this question on the Spark-Cassandra connector group:
https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user


-kr, Gerard.


On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio <
drarse.a...@gmail.com> wrote:

> This is:
>
> 15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in
> 0,016 s.
> 15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host
> 127.0.0.1 (datacenter1)
> 15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
> com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042]
> Unexpected error during transport initialization
> (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error
> writing: Closed channel)
> at
> com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
> at com.datastax.driver.core.Connection.(Connection.java:116)
> at
> com.datastax.driver.core.PooledConnection.(PooledConnection.java:32)
> at
> com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
> at
> com.datastax.driver.core.DynamicConnectionPool.(DynamicConnectionPool.java:74)
> at
> com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
> at
> com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
> at
> com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
> at
> com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
> at
> com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
> at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
> at
> com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
> at
> com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
> at
> com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042]
> Error writing: Closed channel
> at
> com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432)
> at
> org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
> at
> org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
> at
> org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:248)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at
> org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
> at
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
> ... 3 more
> 15/05/10 12:20:08 ERROR ControlConnection: [Control connection] Cannot
> connect to any host, scheduling retry in 1000 milliseconds
>
> Thanks!
>
> 2015-05-10 0:58 GMT+02:00 Gerard Maas :
>
>> Hola Sergio,
>>
>> It would help if you added the error message + stack trace.
>>
>> -kr, Gerard.
>>
>> On Sat, May 9, 2015 at 11:32 PM, Sergio Jiménez Barrio <
>> drarse.a...@gmail.com> wrote:
>>
>>> I am trying save some data in Cassandra in app with spark Streaming:
>>>
>>> Messages.foreachRDD {
>>>  . . .
>>> CassandraRDD.saveToCassandra("test","test")
>>> }
>>>
>>> When I run, the app is closes when I recibe data or can't connect with
>>> Cassandra.
>>>
>>> Some idea? Thanks
>>>
>>>
>>> --
>>> Atte. Sergio Jiménez
>>>
>>
>>
>


Re: Spark streaming closes with Cassandra Conector

2015-05-09 Thread Gerard Maas
Hola Sergio,

It would help if you added the error message + stack trace.

-kr, Gerard.

On Sat, May 9, 2015 at 11:32 PM, Sergio Jiménez Barrio <
drarse.a...@gmail.com> wrote:

> I am trying save some data in Cassandra in app with spark Streaming:
>
> Messages.foreachRDD {
>  . . .
> CassandraRDD.saveToCassandra("test","test")
> }
>
> When I run, the app is closes when I recibe data or can't connect with
> Cassandra.
>
> Some idea? Thanks
>
>
> --
> Atte. Sergio Jiménez
>


Re: Map one RDD into two RDD

2015-05-07 Thread Gerard Maas
Hi Bill,

I just found it weird that one would use parallel threads to 'filter', as
filter is lazy in Spark, and multithreading won't have any effect unless
the actions triggering the execution of the lineages containing such filters
are executed on separate threads. One must have very specific
reasons/requirements to do that, beyond 'not traversing the data twice'.
The request for the code was only to help check that.
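
For the record, the multithreading only buys you something when two actions run
concurrently, e.g. something along these lines (sketch; `isTypeA`/`isTypeB` and the
output paths are made up):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val typeA = rdd.filter(isTypeA)   // lazy: nothing runs yet
val typeB = rdd.filter(isTypeB)   // lazy: nothing runs yet

// the filters only execute when an action runs; these two actions run concurrently
val fA = Future { typeA.saveAsTextFile("/out/typeA") }
val fB = Future { typeB.saveAsTextFile("/out/typeB") }

Await.result(Future.sequence(Seq(fA, fB)), 1.hour)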

-kr, Gerard.

On Thu, May 7, 2015 at 7:26 PM, Bill Q  wrote:

> The multi-threading code in Scala is quite simple and you can google it
> pretty easily. We used the Future framework. You can use Akka also.
>
> @Evo My concerns for filtering solution are: 1. Will rdd2.filter run
> before rdd1.filter finish? 2. We have to traverse rdd twice. Any comments?
>
>
>
> On Thursday, May 7, 2015, Evo Eftimov  wrote:
>
>> Scala is a language, Spark is an OO/Functional, Distributed Framework
>> facilitating Parallel Programming in a distributed environment
>>
>>
>>
>> Any “Scala parallelism” occurs within the Parallel Model imposed by the
>> Spark OO Framework – ie it is limited in terms of what it can achieve in
>> terms of influencing the Spark Framework behavior – that is the nature of
>> programming with/for frameworks
>>
>>
>>
>> When RDD1 and RDD2 are partitioned and different Actions applied to them
>> this will result in Parallel Pipelines / DAGs within the Spark Framework
>>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Thursday, May 7, 2015 4:55 PM
>> *To:* Evo Eftimov
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Map one RDD into two RDD
>>
>>
>>
>> Thanks for the replies. We decided to use concurrency in Scala to do the
>> two mappings using the same source RDD in parallel. So far, it seems to be
>> working. Any comments?
>>
>> On Wednesday, May 6, 2015, Evo Eftimov  wrote:
>>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Tuesday, May 5, 2015 10:42 PM
>> *To:* user@spark.apache.org
>> *Subject:* Map one RDD into two RDD
>>
>>
>>
>> Hi all,
>>
>> I have a large RDD that I map a function to it. Based on the nature of
>> each record in the input RDD, I will generate two types of data. I would
>> like to save each type into its own RDD. But I can't seem to find an
>> efficient way to do it. Any suggestions?
>>
>>
>>
>> Many thanks.
>>
>>
>>
>>
>>
>> Bill
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>
>
> --
> Many thanks.
>
>
> Bill
>
>


Re: Map one RDD into two RDD

2015-05-07 Thread Gerard Maas
Hi Bill,

Could you show a snippet of code to illustrate your choice?

-Gerard.

On Thu, May 7, 2015 at 5:55 PM, Bill Q  wrote:

> Thanks for the replies. We decided to use concurrency in Scala to do the
> two mappings using the same source RDD in parallel. So far, it seems to be
> working. Any comments?
>
>
> On Wednesday, May 6, 2015, Evo Eftimov  wrote:
>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Tuesday, May 5, 2015 10:42 PM
>> *To:* user@spark.apache.org
>> *Subject:* Map one RDD into two RDD
>>
>>
>>
>> Hi all,
>>
>> I have a large RDD that I map a function to it. Based on the nature of
>> each record in the input RDD, I will generate two types of data. I would
>> like to save each type into its own RDD. But I can't seem to find an
>> efficient way to do it. Any suggestions?
>>
>>
>>
>> Many thanks.
>>
>>
>>
>>
>>
>> Bill
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>
>
> --
> Many thanks.
>
>
> Bill
>
>


DataFrame DSL documentation

2015-05-06 Thread Gerard Maas
Hi,

Where could I find good documentation on the DataFrame DSL?
I'm struggling trying to combine selects, groupBy and aggregations.
A language definition would also help.

I perused these resources, but still have some gaps in my understanding and
things are not doing what I'd expect:

https://spark.apache.org/docs/1.3.0/sql-programming-guide.html
https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.DataFrame
https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.sql.functions$


Thanks,  Gerard.


Re: How to deal with code that runs before foreach block in Apache Spark?

2015-05-04 Thread Gerard Maas
I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver'
is a singleton, I guess that what's going on when running on a cluster is
that the call to:

 SolrIndexerDriver.solrInputDocumentList.add(elem)

is happening on different singleton instances of the  SolrIndexerDriver on
different JVMs while

SolrIndexerDriver.solrServer.commit

is happening on the driver.

In practical terms, the lists on the executors are being filled-in but they
are never committed and on the driver the opposite is happening.
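
One way out (Scala-ish sketch, not tested; the Solr URL and client class are
placeholders for whatever SolrJ client you use) is to keep both the buffering and the
commit on the executors, inside foreachPartition, so the driver-side singleton never
comes into play:

import org.apache.solr.client.solrj.impl.HttpSolrServer
import org.apache.solr.common.SolrInputDocument

solrInputDocumentDStream.foreachRDD { rdd =>
  rdd.foreachPartition { docs =>
    // everything here runs on the executor that owns this partition
    val buffer = new java.util.ArrayList[SolrInputDocument]()
    docs.foreach(buffer.add)
    if (!buffer.isEmpty) {
      val solrServer = new HttpSolrServer("http://solr-host:8983/solr/collection1")
      solrServer.add(buffer)
      solrServer.commit(true, true)
    }
  }
}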

-kr, Gerard

On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc  wrote:

> I'm trying to deal with some code that runs differently on Spark
> stand-alone mode and Spark running on a cluster. Basically, for each item
> in an RDD, I'm trying to add it to a list, and once this is done, I want to
> send this list to Solr.
>
> This works perfectly fine when I run the following code in stand-alone
> mode of Spark, but does not work when the same code is run on a cluster.
> When I run the same code on a cluster, it is like "send to Solr" part of
> the code is executed before the list to be sent to Solr is filled with
> items. I try to force the execution by solrInputDocumentJavaRDD.collect();
> after foreach, but it seems like it does not have any effect.
>
> // For each RDD
> solrInputDocumentJavaDStream.foreachRDD(
> new Function, Void>() {
>   @Override
>   public Void call(JavaRDD
> solrInputDocumentJavaRDD) throws Exception {
>
> // For each item in a single RDD
> solrInputDocumentJavaRDD.foreach(
> new VoidFunction() {
>   @Override
>   public void call(SolrInputDocument
> solrInputDocument) {
>
> // Add the solrInputDocument to the list of
> SolrInputDocuments
>
> SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
>   }
> });
>
> // Try to force execution
> solrInputDocumentJavaRDD.collect();
>
>
> // After having finished adding every SolrInputDocument to the
> list
> // add it to the solrServer, and commit, waiting for the
> commit to be flushed
> try {
>
>   // Seems like when run in cluster mode, the list size is
> zero,
>  // therefore the following part is never executed
>
>   if (SolrIndexerDriver.solrInputDocumentList != null
>   && SolrIndexerDriver.solrInputDocumentList.size() >
> 0) {
>
> SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
> SolrIndexerDriver.solrServer.commit(true, true);
> SolrIndexerDriver.solrInputDocumentList.clear();
>   }
> } catch (SolrServerException | IOException e) {
>   e.printStackTrace();
> }
>
>
> return null;
>   }
> }
> );
>
>
> What should I do, so that sending-to-Solr part executes after the list of
> SolrDocuments are added to solrInputDocumentList (and works also in cluster
> mode)?
>
>
> --
> Emre Sevinç
>


Re: How to do dispatching in Streaming?

2015-04-17 Thread Gerard Maas
Evo,

In Spark there's a fixed scheduling cost for each task, so more tasks mean
an increased bottom line for the same amount of work being done. The number
of tasks per batch interval should relate to the CPU resources available
for the job, following the same 'rule of thumb' as for Spark: 2-3
times the # of cores.

In that physical model presented before, I think we could consider this
scheduling cost as a form of friction.

-kr, Gerard.

On Thu, Apr 16, 2015 at 11:47 AM, Evo Eftimov  wrote:

> Ooops – what does “more work” mean in a Parallel Programming paradigm and
> does it always translate in “inefficiency”
>
>
>
> Here are a few laws of physics in this space:
>
>
>
> 1.   More Work if done AT THE SAME time AND fully utilizes the
> cluster resources is a GOOD thing
>
> 2.   More Work which can not be done at the same time and has to be
> processed sequentially is a BAD thing
>
>
>
> So the key is whether it is about 1 or 2 and if it is about 1, whether it
> leads to e.g. Higher Throughput and Lower Latency or not
>
>
>
> Regards,
>
> Evo Eftimov
>
>
>
> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
> *Sent:* Thursday, April 16, 2015 10:41 AM
> *To:* Evo Eftimov
> *Cc:* Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie
>
> *Subject:* Re: How to do dispatching in Streaming?
>
>
>
> From experience, I'd recommend using the  dstream.foreachRDD method and
> doing the filtering within that context. Extending the example of TD,
> something like this:
>
>
>
> dstream.foreachRDD { rdd =>
>
>rdd.cache()
>
>messageType.foreach (msgTyp =>
>
>val selection = rdd.filter(msgTyp.match(_))
>
> selection.foreach { ... }
>
> }
>
>rdd.unpersist()
>
> }
>
>
>
> I would discourage the use of:
>
> MessageType1DStream = MainDStream.filter(message type1)
>
> MessageType2DStream = MainDStream.filter(message type2)
>
> MessageType3DStream = MainDStream.filter(message type3)
>
>
>
> Because it will be a lot more work to process on the spark side.
>
> Each DSteam will schedule tasks for each partition, resulting in #dstream
> x #partitions x #stages tasks instead of the #partitions x #stages with the
> approach presented above.
>
>
>
>
>
> -kr, Gerard.
>
>
>
> On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov 
> wrote:
>
> And yet another way is to demultiplex at one point which will yield
> separate DStreams for each message type which you can then process in
> independent DAG pipelines in the following way:
>
>
>
> MessageType1DStream = MainDStream.filter(message type1)
>
> MessageType2DStream = MainDStream.filter(message type2)
>
> MessageType3DStream = MainDStream.filter(message type3)
>
>
>
> Then proceed your processing independently with MessageType1DStream,
> MessageType2DStream and MessageType3DStream ie each of them is a starting
> point of a new DAG pipeline running in parallel
>
>
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Thursday, April 16, 2015 12:52 AM
> *To:* Jianshi Huang
> *Cc:* user; Shao, Saisai; Huang Jie
> *Subject:* Re: How to do dispatching in Streaming?
>
>
>
> It may be worthwhile to do architect the computation in a different way.
>
>
>
> dstream.foreachRDD { rdd =>
>
>rdd.foreach { record =>
>
>   // do different things for each record based on filters
>
>}
>
> }
>
>
>
> TD
>
>
>
> On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang 
> wrote:
>
> Hi,
>
>
>
> I have a Kafka topic that contains dozens of different types of messages.
> And for each one I'll need to create a DStream for it.
>
>
>
> Currently I have to filter the Kafka stream over and over, which is very
> inefficient.
>
>
>
> So what's the best way to do dispatching in Spark Streaming? (one DStream
> -> multiple DStreams)
>
>
>
>
> Thanks,
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>


Re: How to do dispatching in Streaming?

2015-04-16 Thread Gerard Maas
From experience, I'd recommend using the dstream.foreachRDD method and
doing the filtering within that context. Extending the example of TD,
something like this:

dstream.foreachRDD { rdd =>
   rdd.cache()
   messageTypes.foreach { msgType =>
     val selection = rdd.filter(msgType.matches(_))
     selection.foreach { ... }
   }
   rdd.unpersist()
}

I would discourage the use of:

MessageType1DStream = MainDStream.filter(message type1)

MessageType2DStream = MainDStream.filter(message type2)

MessageType3DStream = MainDStream.filter(message type3)

Because it will be a lot more work to process on the spark side.
Each DStream will schedule tasks for each partition, resulting in #dstream x
#partitions x #stages tasks instead of the #partitions x #stages with the
approach presented above.
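
For instance (illustrative numbers): with 10 message types, 40 partitions and
2 stages per batch, the filter-per-type approach schedules 10 x 40 x 2 = 800
tasks per batch interval, versus 40 x 2 = 80 with the foreachRDD approach.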


-kr, Gerard.

On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov  wrote:

> And yet another way is to demultiplex at one point which will yield
> separate DStreams for each message type which you can then process in
> independent DAG pipelines in the following way:
>
>
>
> MessageType1DStream = MainDStream.filter(message type1)
>
> MessageType2DStream = MainDStream.filter(message type2)
>
> MessageType3DStream = MainDStream.filter(message type3)
>
>
>
> Then proceed your processing independently with MessageType1DStream,
> MessageType2DStream and MessageType3DStream ie each of them is a starting
> point of a new DAG pipeline running in parallel
>
>
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Thursday, April 16, 2015 12:52 AM
> *To:* Jianshi Huang
> *Cc:* user; Shao, Saisai; Huang Jie
> *Subject:* Re: How to do dispatching in Streaming?
>
>
>
> It may be worthwhile to do architect the computation in a different way.
>
>
>
> dstream.foreachRDD { rdd =>
>
>rdd.foreach { record =>
>
>   // do different things for each record based on filters
>
>}
>
> }
>
>
>
> TD
>
>
>
> On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang 
> wrote:
>
> Hi,
>
>
>
> I have a Kafka topic that contains dozens of different types of messages.
> And for each one I'll need to create a DStream for it.
>
>
>
> Currently I have to filter the Kafka stream over and over, which is very
> inefficient.
>
>
>
> So what's the best way to do dispatching in Spark Streaming? (one DStream
> -> multiple DStreams)
>
>
>
>
> Thanks,
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>


Re: Writing Spark Streaming Programs

2015-03-19 Thread Gerard Maas
Try writing this Spark Streaming idiom in Java and you'll choose Scala soon
enough:

dstream.foreachRDD{rdd =>
 rdd.foreachPartition( partition => ... )
}

When deciding between Java and Scala for Spark, IMHO Scala has the
upperhand. If you're concerned with readability, have a look at the Scala
coding style recently open sourced by DataBricks:
https://github.com/databricks/scala-style-guide  (btw, I don't agree with a good
part of it, but recognize that it can keep the most complex Scala
constructs out of your code)



On Thu, Mar 19, 2015 at 3:50 PM, James King  wrote:

> Hello All,
>
> I'm using Spark for streaming but I'm unclear one which implementation
> language to use Java, Scala or Python.
>
> I don't know anything about Python, familiar with Scala and have been
> doing Java for a long time.
>
> I think the above shouldn't influence my decision on which language to use
> because I believe the tool should, fit the problem.
>
> In terms of performance Java and Scala are comparable. However Java is OO
> and Scala is FP, no idea what Python is.
>
> If using Scala and not applying a consistent style of programming Scala
> code can become unreadable, but I do like the fact it seems to be possible
> to do so much work with so much less code, that's a strong selling point
> for me. Also it could be that the type of programming done in Spark is best
> implemented in Scala as FP language, not sure though.
>
> The question I would like your good help with is are there any other
> considerations I need to think about when deciding this? are there any
> recommendations you can make in regards to this?
>
> Regards
> jk
>
>
>
>
>
>
>


Re: Partitioning

2015-03-13 Thread Gerard Maas
In spark-streaming, the consumers will fetch data and put it into 'blocks'.
Each block becomes a partition of the rdd generated during that batch
interval.
The size of each block is controlled by the conf
'spark.streaming.blockInterval', i.e. the amount of data the consumer
can collect in that time.

The number of RDD partitions in a streaming interval will then be: batch
interval / spark.streaming.blockInterval * # of consumers.
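
For example (illustrative numbers): with a 10-second batch interval, the
default block interval of 200 ms and 2 receivers, each batch RDD would have
roughly 10000 / 200 * 2 = 100 partitions.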

-kr, Gerard
On Mar 13, 2015 11:18 PM, "Mohit Anchlia"  wrote:

> I still don't follow how spark is partitioning data in multi node
> environment. Is there a document on how spark does portioning of data. For
> eg: in word count eg how is spark distributing words to multiple nodes?
>
> On Fri, Mar 13, 2015 at 3:01 PM, Tathagata Das 
> wrote:
>
>> If you want to access the keys in an RDD that is partition by key, then
>> you can use RDD.mapPartition(), which gives you access to the whole
>> partition as an iterator. You have the option of maintaing the
>> partitioning information or not by setting the preservePartitioning flag in
>> mapPartition (see docs). But use it at your own risk. If you modify the
>> keys, and yet preserve partitioning, the partitioning would not make sense
>> any more as the hash of the keys have changed.
>>
>> TD
>>
>>
>>
>> On Fri, Mar 13, 2015 at 2:26 PM, Mohit Anchlia 
>> wrote:
>>
>>> I am trying to look for a documentation on partitioning, which I can't
>>> seem to find. I am looking at spark streaming and was wondering how does it
>>> partition RDD in a multi node environment. Where are the keys defined that
>>> is used for partitioning? For instance in below example keys seem to be
>>> implicit:
>>>
>>> Which one is key and which one is value? Or is it called a flatMap
>>> because there are no keys?
>>>
>>> // Split each line into words
>>> JavaDStream<String> words = lines.flatMap(
>>>   new FlatMapFunction<String, String>() {
>>> @Override public Iterable<String> call(String x) {
>>>   return Arrays.asList(x.split(" "));
>>> }
>>>   });
>>>
>>>
>>> And are Keys available inside of Function2 in case it's required for a
>>> given use case ?
>>>
>>>
>>> JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
>>>   new Function2<Integer, Integer, Integer>() {
>>> @Override public Integer call(Integer i1, Integer i2) throws
>>> Exception {
>>>   return i1 + i2;
>>> }
>>>   });
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Unable to saveToCassandra while cassandraTable works fine

2015-03-12 Thread Gerard Maas
This: "java.lang.NoSuchMethodError"  almost always indicates a version
conflict somewhere.

It looks like you are using Spark 1.1.1 with the cassandra-spark connector
1.2.0. Try aligning those. Those metrics were introduced recently in the
1.2.0 branch of the cassandra connector.
Either upgrade your spark to 1.2.0 or downgrade the connector to something
compatible with Spark 1.1.1
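
For instance, if you build with sbt, a minimal sketch of aligned versions
could look like this (the exact connector version string is illustrative,
pick a published 1.2.x build that matches your Spark):

// build.sbt (sketch): keep Spark and the connector on matching release lines
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.2.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1"
)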

-kr, Gerard

On Wed, Mar 11, 2015 at 1:42 PM, Tiwari, Tarun 
wrote:

>  Hi,
>
>
>
> I am stuck at this for 3 days now. I am using the
> spark-cassandra-connector with spark and I am able to make RDDs with
> sc.cassandraTable function that means spark is able to communicate with
> Cassandra properly.
>
>
>
> But somehow the saveToCassandra is not working. Below are the steps I am
> doing.
>
> Does it have something to do with my spark-env or spark-defaults? Am I
> missing something critical ?
>
>
>
> scala> import com.datastax.spark.connector._
>
> scala>
> sc.addJar("/home/analytics/Installers/spark-1.1.1/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar")
>
> scala> val myTable = sc.cassandraTable("test2", " words")
>
> scala> myTable.collect()
>
> *--- this works perfectly fine.*
>
>
>
> scala> val data = sc.parallelize(Seq((81, "XXX"), (82, "")))
>
> scala> data.saveToCassandra("test2", "words", SomeColumns("word", "count"))
>
> *--- this fails*
>
>
>
> 15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.192:9042
> added
>
> 15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host
> 10.131.141.192 (datacenter1)
>
> 15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.193:9042
> added
>
> 15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host
> 10.131.141.193 (datacenter1)
>
> 15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.200:9042
> added
>
> 15/03/11 15:16:45 INFO CassandraConnector: Connected to Cassandra cluster:
> wfan_cluster_DB
>
> 15/03/11 15:16:45 INFO SparkContext: Starting job: runJob at
> RDDFunctions.scala:29
>
> 15/03/11 15:16:45 INFO DAGScheduler: Got job 1 (runJob at
> RDDFunctions.scala:29) with 2 output partitions (allowLocal=false)
>
> 15/03/11 15:16:45 INFO DAGScheduler: Final stage: Stage 1(runJob at
> RDDFunctions.scala:29)
>
> 15/03/11 15:16:45 INFO DAGScheduler: Parents of final stage: List()
>
> 15/03/11 15:16:45 INFO DAGScheduler: Missing parents: List()
>
> 15/03/11 15:16:45 INFO DAGScheduler: Submitting Stage 1
> (ParallelCollectionRDD[1] at parallelize at :20), which has no
> missing parents
>
> 15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(7400) called with
> curMem=1792, maxMem=2778778828
>
> 15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1 stored as values in
> memory (estimated size 7.2 KB, free 2.6 GB)
>
> 15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(3602) called with
> curMem=9192, maxMem=2778778828
>
> 15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
> bytes in memory (estimated size 3.5 KB, free 2.6 GB)
>
> 15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.131.141.200:56502 (size: 3.5 KB, free: 2.6 GB)
>
> 15/03/11 15:16:45 INFO BlockManagerMaster: Updated info of block
> broadcast_1_piece0
>
> 15/03/11 15:16:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 1 (ParallelCollectionRDD[1] at parallelize at :20)
>
> 15/03/11 15:16:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
>
> 15/03/11 15:16:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID
> 2, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
>
> 15/03/11 15:16:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID
> 3, 10.131.141.193, PROCESS_LOCAL, 1217 bytes)
>
> 15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.131.141.193:51660 (size: 3.5 KB, free: 267.3 MB)
>
> 15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on 10.131.141.192:32875 (size: 3.5 KB, free: 267.3 MB)
>
> 15/03/11 15:16:45 INFO CassandraConnector: Disconnected from Cassandra
> cluster: wfan_cluster_DB
>
> 15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2,
> 10.131.141.192): java.lang.NoSuchMethodError:
> org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
>
>
> com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
>
>
> com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
>
>
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
>
>
> com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
>
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>
> org.apache.spark.scheduler.Task.run(Task.scala:54)
>
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor

Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-17 Thread Gerard Maas
+1 for TypeSafe config
Our practice is to include all spark properties under a 'spark' entry in
the config file alongside job-specific configuration:

A config file would look like:
spark {
 master = ""
 cleaner.ttl = 123456
 ...
}
job {
context {
src = "foo"
action = "barAction"
}
prop1 = "val1"
}

Then, to create our Spark context, we transparently pass the spark section
to a SparkConf instance.
This idiom will instantiate the context with the spark specific
configuration:

sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

And we can make use of the config object everywhere else.
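
As an illustration, a minimal sketch of that idiom (the helper and the
job-specific keys are just examples, adapt them to your config layout):

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import scala.collection.JavaConverters._

val config: Config = ConfigFactory.load() // merges reference.conf, application.conf and -D overrides

// flatten the "spark" section back into "spark.*" keys for SparkConf
def configToStringSeq(c: Config): Seq[(String, String)] =
  c.entrySet.asScala.toSeq.map(e => e.getKey -> e.getValue.unwrapped.toString)

val sparkConf = new SparkConf()
sparkConf.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

val src = config.getString("job.context.src") // non-Spark settings live in the same file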

We use the override model of the typesafe config: reasonable defaults go in
the reference.conf (within the jar). Environment-specific overrides go in
the application.conf (alongside the job jar) and hacks are passed with
-Dprop=value :-)


-kr, Gerard.


On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc  wrote:

> I've decided to try
>
>   spark-submit ... --conf
> "spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties"
>
> But when I try to retrieve the value of propertiesFile via
>
>System.err.println("propertiesFile : " +
> System.getProperty("propertiesFile"));
>
> I get NULL:
>
>propertiesFile : null
>
> Interestingly, when I run spark-submit with --verbose, I see that it
> prints:
>
>   spark.driver.extraJavaOptions ->
> -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties
>
> I couldn't understand why I couldn't get to the value of "propertiesFile"
> by using standard System.getProperty method. (I can use new
> SparkConf().get("spark.driver.extraJavaOptions")  and manually parse it,
> and retrieve the value, but I'd like to know why I cannot retrieve that
> value using System.getProperty method).
>
> Any ideas?
>
> If I can achieve what I've described above properly, I plan to pass a
> properties file that resides on HDFS, so that it will be available to my
> driver program wherever that program runs.
>
> --
> Emre
>
>
>
>
> On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke 
> wrote:
>
>> I haven't actually tried mixing non-Spark settings into the Spark
>> properties. Instead I package my properties into the jar and use the
>> Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
>> specific) to get at my properties:
>>
>> Properties file: src/main/resources/integration.conf
>>
>> (below $ENV might be set to either "integration" or "prod"[3])
>>
>> ssh -t root@$HOST "/root/spark/bin/spark-shell --jars /root/$JAR_NAME \
>> --conf 'config.resource=$ENV.conf' \
>> --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'"
>>
>> Since the properties file is packaged up with the JAR I don't have to
>> worry about sending the file separately to all of the slave nodes. Typesafe
>> Config is written in Java so it will work if you're not using Scala. (The
>> Typesafe Config also has the advantage of being extremely easy to integrate
>> with code that is using Java Properties today.)
>>
>> If you instead want to send the file separately from the JAR and you use
>> the Typesafe Config library, you can specify "config.file" instead of
>> ".resource"; though I'd point you to [3] below if you want to make your
>> development life easier.
>>
>> 1. https://github.com/typesafehub/config
>> 2. https://github.com/ceedubs/ficus
>> 3.
>> http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/
>>
>>
>>
>> On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm using Spark 1.2.1 and have a module.properties file, and in it I
>>> have non-Spark properties, as well as Spark properties, e.g.:
>>>
>>>job.output.dir=file:///home/emre/data/mymodule/out
>>>
>>> I'm trying to pass it to spark-submit via:
>>>
>>>spark-submit --class com.myModule --master local[4] --deploy-mode
>>> client --verbose --properties-file /home/emre/data/mymodule.properties
>>> mymodule.jar
>>>
>>> And I thought I could read the value of my non-Spark property, namely,
>>> job.output.dir by using:
>>>
>>> SparkConf sparkConf = new SparkConf();
>>> final String validatedJSONoutputDir =
>>> sparkConf.get("job.output.dir");
>>>
>>> But it gives me an exception:
>>>
>>> Exception in thread "main" java.util.NoSuchElementException:
>>> job.output.dir
>>>
>>> Is it not possible to mix Spark and non-Spark properties in a single
>>> .properties file, then pass it via --properties-file and then get the
>>> values of those non-Spark properties via SparkConf?
>>>
>>> Or is there another object / method to retrieve the values for those
>>> non-Spark properties?
>>>
>>>
>>> --
>>> Emre Sevinç
>>>
>>
>
>
> --
> Emre Sevinc
>


Re: Streaming scheduling delay

2015-02-12 Thread Gerard Maas
Hi Tim,

From this: "There are 5 kafka receivers and each incoming stream is split
into 40 partitions" I suspect that you're creating too many tasks for
Spark to process on time.
Could you try some of the 'knobs' I describe here to see if that would help?

http://www.virdata.com/tuning-spark/

-kr, Gerard.

On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith  wrote:

> Just read the thread "Are these numbers abnormal for spark streaming?" and
> I think I am seeing similar results - that is - increasing the window seems
> to be the trick here. I will have to monitor for a few hours/days before I
> can conclude (there are so many knobs/dials).
>
>
>
> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith  wrote:
>
>> On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
>> streaming app that consumes data from Kafka and writes it back to Kafka
>> (different topic). My big problem has been Total Delay. While execution
>> time is usually > minutes to hours(s) (keeps going up).
>>
>> For a little while, I thought I had solved the issue by bumping up the
>> driver memory. Then I expanded my Kafka cluster to add more nodes and the
>> issue came up again. I tried a few things to smoke out the issue and
>> something tells me the driver is the bottleneck again:
>>
>> 1) From my app, I took out the entire write-out-to-kafka piece. Sure
>> enough, execution, scheduling delay and hence total delay fell to sub
>> second. This assured me that whatever processing I do before writing back
>> to kafka isn't the bottleneck.
>>
>> 2) In my app, I had RDD persistence set at different points but my code
>> wasn't really re-using any RDDs so I took out all explicit persist()
>> statements. And added, "spar...unpersist" to "true" in the context. After
>> this, it doesn't seem to matter how much memory I give my executor, the
>> total delay seems to be in the same range. I tried per executor memory from
>> 2G to 12G with no change in total delay so executors aren't memory starved.
>> Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
>> used when per executor memory is set to 2GB, for example.
>>
>> 3) Input rate in the kafka consumer restricts spikes in incoming data.
>>
>> 4) Tried FIFO and FAIR but didn't make any difference.
>>
>> 5) Adding executors beyond a certain points seems useless (I guess excess
>> ones just sit idle).
>>
>> At any given point in time, the SparkUI shows only one batch pending
>> processing. So with just one batch pending processing, why would the
>> scheduling delay run into minutes/hours if execution time is within the
>> batch window duration? There aren't any failed stages or jobs.
>>
>> Right now, I have 100 executors ( i have tried setting executors from
>> 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
>> are 5 kafka receivers and each incoming stream is split into 40 partitions.
>> Per receiver, input rate is restricted to 2 messages per second.
>>
>> Can anyone help me with clues or areas to look into, for troubleshooting
>> the issue?
>>
>> One nugget I found buried in the code says:
>> "The scheduler delay includes the network delay to send the task to the
>> worker machine and to send back the result (but not the time to fetch the
>> task result, if it needed to be fetched from the block manager on the
>> worker)."
>>
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>
>> Could this be an issue with the driver being a bottlneck? All the
>> executors posting their logs/stats to the driver?
>>
>> Thanks,
>>
>> Tim
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: Writing RDD to a csv file

2015-02-03 Thread Gerard Maas
This is more of a Scala question, so next time you'd probably like to
address a Scala forum, e.g. http://stackoverflow.com/questions/tagged/scala

val optArrStr:Option[Array[String]] = ???
optArrStr.map(arr => arr.mkString(",")).getOrElse("")  // empty string or
whatever default value you have for this.
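
Applied to your RDD, a minimal sketch (using the empty-string default) could be:

myrdd.map { case (key, (arr, optArr)) =>
  Seq(key, arr.mkString(","), optArr.map(_.mkString(",")).getOrElse("")).mkString(",")
}.saveAsTextFile("hdfs://...")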

kr, Gerard.

On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar  wrote:

> I have a RDD which is of type
>
> org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))]
>
> I want to write it as a csv file.
>
> Please suggest how this can be done.
>
> myrdd.map(line => (line._1 + "," + line._2._1.mkString(",") + "," +
> line._2._2.mkString(','))).saveAsTextFile("hdfs://...")
>
> Doing mkString on line._2._1 works but does not work for the Option type.
>
> Please suggest how this can be done.
>
>
> Thanks
> Kundan
>
>
>


Re: Spark (Streaming?) holding on to Mesos resources

2015-01-29 Thread Gerard Maas
Thanks a lot.

After reading Mesos-1688, I still don't understand how/why a job will hoard
and hold on to so many resources even in the presence of that bug.
Looking at the release notes, I think this ticket could be relevant to
preventing the behavior we're seeing:
[MESOS-186] - Resource offers should be rescinded after some configurable
timeout

Bottom line, we're following your advice and we're testing Mesos 0.21 on
dev to roll out to our prod platforms later on.

Thanks!!

-kr, Gerard.


On Tue, Jan 27, 2015 at 9:15 PM, Tim Chen  wrote:

> Hi Gerard,
>
> As others has mentioned I believe you're hitting Mesos-1688, can you
> upgrade to the latest Mesos release (0.21.1) and let us know if it resolves
> your problem?
>
> Thanks,
>
> Tim
>
> On Tue, Jan 27, 2015 at 10:39 AM, Sam Bessalah 
> wrote:
>
>> Hi Gerard,
>> isn't this the same issue as this?
>> https://issues.apache.org/jira/browse/MESOS-1688
>>
>> On Mon, Jan 26, 2015 at 9:17 PM, Gerard Maas 
>> wrote:
>>
>>> Hi,
>>>
>>> We are observing with certain regularity that our Spark  jobs, as Mesos
>>> framework, are hoarding resources and not releasing them, resulting in
>>> resource starvation to all jobs running on the Mesos cluster.
>>>
>>> For example:
>>> This is a job that has spark.cores.max = 4 and spark.executor.memory="3g"
>>>
>>> | ID   |Framework  |Host|CPUs  |Mem
>>> …5050-16506-1146497 FooStreaming dnode-4.hdfs.private 7 13.4 GB
>>> …5050-16506-1146495 FooStreaming dnode-0.hdfs.private 1 6.4 GB
>>> …5050-16506-1146491 FooStreaming dnode-5.hdfs.private 7 11.9 GB
>>> …5050-16506-1146449 FooStreaming dnode-3.hdfs.private 7 4.9 GB
>>> …5050-16506-1146247 FooStreaming dnode-1.hdfs.private 0.5 5.9 GB
>>> …5050-16506-1146226 FooStreaming dnode-2.hdfs.private 3 7.9 GB
>>> …5050-16506-1144069 FooStreaming dnode-3.hdfs.private 1 8.7 GB
>>> …5050-16506-1133091 FooStreaming dnode-5.hdfs.private 1 1.7 GB
>>> …5050-16506-1133090 FooStreaming dnode-2.hdfs.private 5 5.2 GB
>>> …5050-16506-1133089 FooStreaming dnode-1.hdfs.private 6.5 6.3 GB
>>> …5050-16506-1133088 FooStreaming dnode-4.hdfs.private 1 251 MB
>>> …5050-16506-1133087 FooStreaming dnode-0.hdfs.private 6.4 6.8 GB
>>> The only way to release the resources is by manually finding the process
>>> in the cluster and killing it. The jobs are often streaming but also batch
>>> jobs show this behavior. We have more streaming jobs than batch, so stats
>>> are biased.
>>> Any ideas of what's up here? Hopefully some very bad ugly bug that has
>>> been fixed already and that will urge us to upgrade our infra?
>>>
>>> Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0
>>>
>>> -kr, Gerard.
>>>
>>
>>
>


Re: how to split key from RDD for compute UV

2015-01-27 Thread Gerard Maas
Hi,

Did you try asking this on StackOverflow?
http://stackoverflow.com/questions/tagged/apache-spark

I'd also suggest adding some sample data to help others understanding your
logic.

-kr, Gerard.



On Tue, Jan 27, 2015 at 1:14 PM, 老赵  wrote:

> Hello All,
>
> I am writing a simple Spark application  to count  UV(unique view) from a
> log file。
>
> Below is my code,it is not right on the red line .
>
> My idea  here is same cookie on a host  only count one .So i want to split
> the host from the previous RDD. But now I don't know how to finish it .
>
> Any suggestion will be appreciate!
>
>
>
> val url_index = args(1).toInt
>
> val cookie_index = args(2).toInt
>
> val textRDD = sc.textFile(args(0))
>
> .map(_.split("\t"))
>
> .map(line => ((new java.net.URL(line(url_index)).getHost) + "\t" +
> line(cookie_index),1))
>
> .reduceByKey(_ + _)
>
> .map(line => (line.split("\t")(0),1))
>
> .reduceByKey(_ + _)
>
> .map(item => item.swap)
>
> .sortByKey(false)
>
> .map(item => item.swap)
>
> 
>
>


Re: Spark (Streaming?) holding on to Mesos Resources

2015-01-26 Thread Gerard Maas
Hi Jörn,

A memory leak on the job would be contained within the resources reserved
for it, wouldn't it?
And the job holding resources is not always the same. Sometimes it's one of
the Streaming jobs, sometimes it's a heavy batch job that runs every hour.
Looks to me that whatever is causing the issue, it's participating in the
resource offer protocol of Mesos and my first suspect would be the Mesos
scheduler in Spark. (The table above is the tab "Offers" from the Mesos UI.)

Are there any other factors involved in the offer acceptance/rejection
between Mesos and a scheduler?

What do you think?

-kr, Gerard.

On Mon, Jan 26, 2015 at 11:23 PM, Jörn Franke  wrote:

> Hi,
>
> What do your jobs do?  Ideally post source code, but some description
> would already helpful to support you.
>
> Memory leaks can have several reasons - it may not be Spark at all.
>
> Thank you.
>
> On Jan 26, 2015 at 22:28, "Gerard Maas"  wrote:
>
> >
> > (looks like the list didn't like a HTML table on the previous email. My
> excuses for any duplicates)
> >
> > Hi,
> >
> > We are observing with certain regularity that our Spark  jobs, as Mesos
> framework, are hoarding resources and not releasing them, resulting in
> resource starvation to all jobs running on the Mesos cluster.
> >
> > For example:
> > This is a job that has spark.cores.max = 4 and spark.executor.memory="3g"
> >
> > | ID   |Framework  |Host|CPUs  |Mem
> > …5050-16506-1146497 FooStreaming dnode-4.hdfs.private 7 13.4 GB
> > …5050-16506-1146495 FooStreamingdnode-0.hdfs.private 1 6.4 GB
> > …5050-16506-1146491 FooStreamingdnode-5.hdfs.private 7 11.9 GB
> > …5050-16506-1146449 FooStreamingdnode-3.hdfs.private 7 4.9 GB
> > …5050-16506-1146247 FooStreamingdnode-1.hdfs.private 0.5 5.9 GB
> > …5050-16506-1146226 FooStreamingdnode-2.hdfs.private 3 7.9 GB
> > …5050-16506-1144069 FooStreamingdnode-3.hdfs.private 1 8.7 GB
> > …5050-16506-1133091 FooStreamingdnode-5.hdfs.private 1 1.7 GB
> > …5050-16506-1133090 FooStreamingdnode-2.hdfs.private 5 5.2 GB
> > …5050-16506-1133089 FooStreamingdnode-1.hdfs.private 6.5 6.3 GB
> > …5050-16506-1133088 FooStreamingdnode-4.hdfs.private 1 251 MB
> > …5050-16506-1133087 FooStreamingdnode-0.hdfs.private 6.4 6.8 GB
> >
> > The only way to release the resources is by manually finding the process
> in the cluster and killing it. The jobs are often streaming but also batch
> jobs show this behavior. We have more streaming jobs than batch, so stats
> are biased.
> > Any ideas of what's up here? Hopefully some very bad ugly bug that has
> been fixed already and that will urge us to upgrade our infra?
> >
> > Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0
> >
> > -kr, Gerard.
>
>


Spark (Streaming?) holding on to Mesos Resources

2015-01-26 Thread Gerard Maas
(looks like the list didn't like a HTML table on the previous email. My
excuses for any duplicates)

Hi,

We are observing with certain regularity that our Spark  jobs, as Mesos
framework, are hoarding resources and not releasing them, resulting in
resource starvation to all jobs running on the Mesos cluster.

For example:
This is a job that has spark.cores.max = 4 and spark.executor.memory="3g"

| ID   |Framework  |Host|CPUs  |Mem
…5050-16506-1146497 FooStreaming dnode-4.hdfs.private 7 13.4 GB
…5050-16506-1146495 FooStreamingdnode-0.hdfs.private 1 6.4 GB
…5050-16506-1146491 FooStreamingdnode-5.hdfs.private 7 11.9 GB
…5050-16506-1146449 FooStreamingdnode-3.hdfs.private 7 4.9 GB
…5050-16506-1146247 FooStreamingdnode-1.hdfs.private 0.5 5.9 GB
…5050-16506-1146226 FooStreamingdnode-2.hdfs.private 3 7.9 GB
…5050-16506-1144069 FooStreamingdnode-3.hdfs.private 1 8.7 GB
…5050-16506-1133091 FooStreamingdnode-5.hdfs.private 1 1.7 GB
…5050-16506-1133090 FooStreamingdnode-2.hdfs.private 5 5.2 GB
…5050-16506-1133089 FooStreamingdnode-1.hdfs.private 6.5 6.3 GB
…5050-16506-1133088 FooStreamingdnode-4.hdfs.private 1 251 MB
…5050-16506-1133087 FooStreamingdnode-0.hdfs.private 6.4 6.8 GB

The only way to release the resources is by manually finding the process in
the cluster and killing it. The jobs are often streaming but also batch
jobs show this behavior. We have more streaming jobs than batch, so stats
are biased.
Any ideas of what's up here? Hopefully some very bad ugly bug that has been
fixed already and that will urge us to upgrade our infra?

Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0

-kr, Gerard.


Spark (Streaming?) holding on to Mesos resources

2015-01-26 Thread Gerard Maas
Hi,

We are observing with certain regularity that our Spark  jobs, as Mesos
framework, are hoarding resources and not releasing them, resulting in
resource starvation to all jobs running on the Mesos cluster.

For example:
This is a job that has spark.cores.max = 4 and spark.executor.memory="3g"

| ID   |Framework  |Host|CPUs  |Mem
…5050-16506-1146497 FooStreaming dnode-4.hdfs.private 7 13.4 GB
…5050-16506-1146495 FooStreaming dnode-0.hdfs.private 1 6.4 GB
…5050-16506-1146491 FooStreaming dnode-5.hdfs.private 7 11.9 GB
…5050-16506-1146449 FooStreaming dnode-3.hdfs.private 7 4.9 GB
…5050-16506-1146247 FooStreaming dnode-1.hdfs.private 0.5 5.9 GB
…5050-16506-1146226 FooStreaming dnode-2.hdfs.private 3 7.9 GB
…5050-16506-1144069 FooStreaming dnode-3.hdfs.private 1 8.7 GB
…5050-16506-1133091 FooStreaming dnode-5.hdfs.private 1 1.7 GB
…5050-16506-1133090 FooStreaming dnode-2.hdfs.private 5 5.2 GB
…5050-16506-1133089 FooStreaming dnode-1.hdfs.private 6.5 6.3 GB
…5050-16506-1133088 FooStreaming dnode-4.hdfs.private 1 251 MB
…5050-16506-1133087 FooStreaming dnode-0.hdfs.private 6.4 6.8 GB
The only way to release the resources is by manually finding the process in
the cluster and killing it. The jobs are often streaming but also batch
jobs show this behavior. We have more streaming jobs than batch, so stats
are biased.
Any ideas of what's up here? Hopefully some very bad ugly bug that has been
fixed already and that will urge us to upgrade our infra?

Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0

-kr, Gerard.


Re: Discourse: A proposed alternative to the Spark User list

2015-01-23 Thread Gerard Maas
+1

On Fri, Jan 23, 2015 at 5:58 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> That sounds good to me. Shall I open a JIRA / PR about updating the site
> community page?
> On Fri, Jan 23, 2015 at 4:37 AM Patrick Wendell 
> wrote:
>
>> Hey Nick,
>>
>> So I think we what can do is encourage people to participate on the
>> stack overflow topic, and this I think we can do on the Spark website
>> as a first class community resource for Spark. We should probably be
>> spending more time on that site given its popularity.
>>
>> In terms of encouraging this explicitly *to replace* the ASF mailing
>> list, that I think is harder to do. The ASF makes a lot of effort to
>> host its own infrastructure that is neutral and not associated with
>> any corporation. And by and large the ASF policy is to consider that
>> as the de-facto forum of communication for any project.
>>
>> Personally, I wish the ASF would update this policy - for instance, by
>> allowing the use of third party lists or communication fora - provided
>> that they allow exporting the conversation if those sites were to
>> change course. However, the state of the art stands as such.
>>
>> - Patrick
>>
>>
>> On Wed, Jan 21, 2015 at 8:43 AM, Nicholas Chammas
>>  wrote:
>> > Josh / Patrick,
>> >
>> > What do y’all think of the idea of promoting Stack Overflow as a place
>> to
>> > ask questions over this list, as long as the questions fit SO’s
>> guidelines
>> > (how-to-ask, dont-ask)?
>> >
>> > The apache-spark tag is very active on there.
>> >
>> > Discussions of all types are still on-topic here, but when possible we
>> want
>> > to encourage people to use SO.
>> >
>> > Nick
>> >
>> > On Wed Jan 21 2015 at 8:37:05 AM Jay Vyas jayunit100.apa...@gmail.com
>> wrote:
>> >>
>> >> Its a very valid  idea indeed, but... It's a tricky  subject since the
>> >> entire ASF is run on mailing lists , hence there are so many different
>> but
>> >> equally sound ways of looking at this idea, which conflict with one
>> another.
>> >>
>> >> > On Jan 21, 2015, at 7:03 AM, btiernay  wrote:
>> >> >
>> >> > I think this is a really great idea for really opening up the
>> >> > discussions
>> >> > that happen here. Also, it would be nice to know why there doesn't
>> seem
>> >> > to
>> >> > be much interest. Maybe I'm misunderstanding some nuance of Apache
>> >> > projects.
>> >> >
>> >> > Cheers
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/
>> Discourse-A-proposed-alternative-to-the-Spark-User-
>> list-tp20851p21288.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >> >
>> >> > 
>> -
>> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> > For additional commands, e-mail: user-h...@spark.apache.org
>> >> >
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>>
>


Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Gerard Maas
Given that the process, and in particular the setup of connections, is
bound to the number of partitions (in x.foreachPartition{ x => ??? }), I
think it would be worth trying to reduce them.
Increasing 'spark.streaming.blockInterval' will do the trick (you can
read the tuning details here:
http://www.virdata.com/tuning-spark/#Partitions)
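
For instance, a minimal sketch (values are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("...")
  // default is 200 ms; with a 15 s batch and 2 receivers, 1000 ms gives
  // 15000 / 1000 * 2 = 30 partitions per batch instead of 150
  .set("spark.streaming.blockInterval", "1000")
val ssc = new StreamingContext(conf, Seconds(15))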

-kr, Gerard.

On Thu, Jan 22, 2015 at 4:28 PM, Gerard Maas  wrote:

> So the system has gone from 7msg in 4.961 secs (median) to 106msgs in
> 4,761 seconds.
> I think there's evidence that setup costs are quite high in this case and
> increasing the batch interval is helping.
>
> On Thu, Jan 22, 2015 at 4:12 PM, Sudipta Banerjee <
> asudipta.baner...@gmail.com> wrote:
>
>> Hi Ashic Mahtab,
>>
>> The Cassandra and the Zookeeper are they installed as a part of Yarn
>> architecture or are they installed in a separate layer with Apache Spark .
>>
>> Thanks and Regards,
>> Sudipta
>>
>> On Thu, Jan 22, 2015 at 8:13 PM, Ashic Mahtab  wrote:
>>
>>> Hi Guys,
>>> So I changed the interval to 15 seconds. There's obviously a lot more
>>> messages per batch, but (I think) it looks a lot healthier. Can you see any
>>> major warning signs? I think that with 2 second intervals, the setup /
>>> teardown per partition was what was causing the delays.
>>>
>>> Streaming
>>>
>>>- *Started at: *Thu Jan 22 13:23:12 GMT 2015
>>>- *Time since start: *1 hour 17 minutes 16 seconds
>>>- *Network receivers: *2
>>>- *Batch interval: *15 seconds
>>>- *Processed batches: *309
>>>- *Waiting batches: *0
>>>
>>>
>>>
>>> Statistics over last 100 processed batchesReceiver Statistics
>>>
>>>- Receiver
>>>
>>>
>>>- Status
>>>
>>>
>>>- Location
>>>
>>>
>>>- Records in last batch
>>>- [2015/01/22 14:40:29]
>>>
>>>
>>>- Minimum rate
>>>- [records/sec]
>>>
>>>
>>>- Median rate
>>>- [records/sec]
>>>
>>>
>>>- Maximum rate
>>>- [records/sec]
>>>
>>>
>>>- Last Error
>>>
>>> RmqReceiver-0ACTIVEVDCAPP53.foo.local2.6 K29106295-RmqReceiver-1ACTIVE
>>> VDCAPP50.bar.local2.6 K29107291-
>>> Batch Processing Statistics
>>>
>>>MetricLast batchMinimum25th percentileMedian75th 
>>> percentileMaximumProcessing
>>>Time4 seconds 812 ms4 seconds 698 ms4 seconds 738 ms4 seconds 761 ms4
>>>seconds 788 ms5 seconds 802 msScheduling Delay2 ms0 ms3 ms3 ms4 ms9
>>>msTotal Delay4 seconds 814 ms4 seconds 701 ms4 seconds 739 ms4
>>>seconds 764 ms4 seconds 792 ms5 seconds 809 ms
>>>
>>>
>>> Regards,
>>> Ashic.
>>> --
>>> From: as...@live.com
>>> To: gerard.m...@gmail.com
>>> CC: user@spark.apache.org
>>> Subject: RE: Are these numbers abnormal for spark streaming?
>>> Date: Thu, 22 Jan 2015 12:32:05 +
>>>
>>>
>>> Hi Gerard,
>>> Thanks for the response.
>>>
>>> The messages get desrialised from msgpack format, and one of the strings
>>> is desrialised to json. Certain fields are checked to decide if further
>>> processing is required. If so, it goes through a series of in mem filters
>>> to check if more processing is required. If so, only then does the "heavy"
>>> work start. That consists of a few db queries, and potential updates to the
>>> db + message on message queue. The majority of messages don't need
>>> processing. The messages needing processing at peak are about three every
>>> other second.
>>>
>>> One possible things that might be happening is the session
>>> initialisation and prepared statement initialisation for each partition. I
>>> can resort to some tricks, but I think I'll try increasing batch interval
>>> to 15 seconds. I'll report back with findings.
>>>
>>> Thanks,
>>> Ashic.
>>>
>>> --
>>> From: gerard.m...@gmail.com
>>> Date: Thu, 22 Jan 2015 12:30:08 +0100
>>> Subject: Re: Are these numbers abnormal for spark streaming?
>>> To: tathagata.das1...@gmail.com
>>> CC: as...@live.com; t...@databricks.com; user@spark.apache.org
>>>
>>> and post the code (if possible).
>>> In 

Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Gerard Maas
So the system has gone from 7 msgs in 4.961 secs (median) to 106 msgs in 4.761
secs.
I think there's evidence that setup costs are quite high in this case and
increasing the batch interval is helping.

On Thu, Jan 22, 2015 at 4:12 PM, Sudipta Banerjee <
asudipta.baner...@gmail.com> wrote:

> Hi Ashic Mahtab,
>
> The Cassandra and the Zookeeper are they installed as a part of Yarn
> architecture or are they installed in a separate layer with Apache Spark .
>
> Thanks and Regards,
> Sudipta
>
> On Thu, Jan 22, 2015 at 8:13 PM, Ashic Mahtab  wrote:
>
>> Hi Guys,
>> So I changed the interval to 15 seconds. There's obviously a lot more
>> messages per batch, but (I think) it looks a lot healthier. Can you see any
>> major warning signs? I think that with 2 second intervals, the setup /
>> teardown per partition was what was causing the delays.
>>
>> Streaming
>>
>>- *Started at: *Thu Jan 22 13:23:12 GMT 2015
>>- *Time since start: *1 hour 17 minutes 16 seconds
>>- *Network receivers: *2
>>- *Batch interval: *15 seconds
>>- *Processed batches: *309
>>- *Waiting batches: *0
>>
>>
>>
>> Statistics over last 100 processed batchesReceiver Statistics
>>
>>- Receiver
>>
>>
>>- Status
>>
>>
>>- Location
>>
>>
>>- Records in last batch
>>- [2015/01/22 14:40:29]
>>
>>
>>- Minimum rate
>>- [records/sec]
>>
>>
>>- Median rate
>>- [records/sec]
>>
>>
>>- Maximum rate
>>- [records/sec]
>>
>>
>>- Last Error
>>
>> RmqReceiver-0ACTIVEVDCAPP53.foo.local2.6 K29106295-RmqReceiver-1ACTIVE
>> VDCAPP50.bar.local2.6 K29107291-
>> Batch Processing Statistics
>>
>>MetricLast batchMinimum25th percentileMedian75th 
>> percentileMaximumProcessing
>>Time4 seconds 812 ms4 seconds 698 ms4 seconds 738 ms4 seconds 761 ms4
>>seconds 788 ms5 seconds 802 msScheduling Delay2 ms0 ms3 ms3 ms4 ms9 
>> msTotal
>>Delay4 seconds 814 ms4 seconds 701 ms4 seconds 739 ms4 seconds 764 ms4
>>seconds 792 ms5 seconds 809 ms
>>
>>
>> Regards,
>> Ashic.
>> --
>> From: as...@live.com
>> To: gerard.m...@gmail.com
>> CC: user@spark.apache.org
>> Subject: RE: Are these numbers abnormal for spark streaming?
>> Date: Thu, 22 Jan 2015 12:32:05 +
>>
>>
>> Hi Gerard,
>> Thanks for the response.
>>
>> The messages get desrialised from msgpack format, and one of the strings
>> is desrialised to json. Certain fields are checked to decide if further
>> processing is required. If so, it goes through a series of in mem filters
>> to check if more processing is required. If so, only then does the "heavy"
>> work start. That consists of a few db queries, and potential updates to the
>> db + message on message queue. The majority of messages don't need
>> processing. The messages needing processing at peak are about three every
>> other second.
>>
>> One possible things that might be happening is the session initialisation
>> and prepared statement initialisation for each partition. I can resort to
>> some tricks, but I think I'll try increasing batch interval to 15 seconds.
>> I'll report back with findings.
>>
>> Thanks,
>> Ashic.
>>
>> --
>> From: gerard.m...@gmail.com
>> Date: Thu, 22 Jan 2015 12:30:08 +0100
>> Subject: Re: Are these numbers abnormal for spark streaming?
>> To: tathagata.das1...@gmail.com
>> CC: as...@live.com; t...@databricks.com; user@spark.apache.org
>>
>> and post the code (if possible).
>> In a nutshell, your processing time > batch interval,  resulting in an
>> ever-increasing delay that will end up in a crash.
>> 3 secs to process 14 messages looks like a lot. Curious what the job
>> logic is.
>>
>> -kr, Gerard.
>>
>> On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>> This is not normal. Its a huge scheduling delay!! Can you tell me more
>> about the application?
>> - cluser setup, number of receivers, whats the computation, etc.
>>
>> On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab  wrote:
>>
>> Hate to do this...but...erm...bump? Would really appreciate input from
>> others using Streaming. Or at least some docs that would tell me if these
>> are expected or not.
>>
>> --
>> From: as...@live.com
>> To: user@spark.apache.org
>> Subject: Are these numbers abnormal for spark streaming?
>> Date: Wed, 21 Jan 2015 11:26:31 +
>>
>>
>> Hi Guys,
>> I've got Spark Streaming set up for a low data rate system (using spark's
>> features for analysis, rather than high throughput). Messages are coming in
>> throughout the day, at around 1-20 per second (finger in the air
>> estimate...not analysed yet).  In the spark streaming UI for the
>> application, I'm getting the following after 17 hours.
>>
>> Streaming
>>
>>- *Started at: *Tue Jan 20 16:58:43 GMT 2015
>>- *Time since start: *18 hours 24 minutes 34 seconds
>>- *Network receivers: *2
>>- *Batch interval: *2 seconds
>>- *Processed batches: *16482
>>- *Waiting batches

Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Gerard Maas
and post the code (if possible).
In a nutshell, your processing time > batch interval,  resulting in an
ever-increasing delay that will end up in a crash.
3 secs to process 14 messages looks like a lot. Curious what the job logic
is.

-kr, Gerard.

On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das  wrote:

> This is not normal. Its a huge scheduling delay!! Can you tell me more
> about the application?
> - cluser setup, number of receivers, whats the computation, etc.
>
> On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab  wrote:
>
>> Hate to do this...but...erm...bump? Would really appreciate input from
>> others using Streaming. Or at least some docs that would tell me if these
>> are expected or not.
>>
>> --
>> From: as...@live.com
>> To: user@spark.apache.org
>> Subject: Are these numbers abnormal for spark streaming?
>> Date: Wed, 21 Jan 2015 11:26:31 +
>>
>>
>> Hi Guys,
>> I've got Spark Streaming set up for a low data rate system (using spark's
>> features for analysis, rather than high throughput). Messages are coming in
>> throughout the day, at around 1-20 per second (finger in the air
>> estimate...not analysed yet).  In the spark streaming UI for the
>> application, I'm getting the following after 17 hours.
>>
>> Streaming
>>
>>- *Started at: *Tue Jan 20 16:58:43 GMT 2015
>>- *Time since start: *18 hours 24 minutes 34 seconds
>>- *Network receivers: *2
>>- *Batch interval: *2 seconds
>>- *Processed batches: *16482
>>- *Waiting batches: *1
>>
>>
>>
>> Statistics over last 100 processed batchesReceiver Statistics
>>
>>- Receiver
>>
>>
>>- Status
>>
>>
>>- Location
>>
>>
>>- Records in last batch
>>- [2015/01/21 11:23:18]
>>
>>
>>- Minimum rate
>>- [records/sec]
>>
>>
>>- Median rate
>>- [records/sec]
>>
>>
>>- Maximum rate
>>- [records/sec]
>>
>>
>>- Last Error
>>
>> RmqReceiver-0ACTIVEF
>> 144727-RmqReceiver-1ACTIVEBR
>> 124726-
>> Batch Processing Statistics
>>
>>MetricLast batchMinimum25th percentileMedian75th 
>> percentileMaximumProcessing
>>Time3 seconds 994 ms157 ms4 seconds 16 ms4 seconds 961 ms5 seconds 3
>>ms5 seconds 171 msScheduling Delay9 hours 15 minutes 4 seconds9 hours
>>10 minutes 54 seconds9 hours 11 minutes 56 seconds9 hours 12 minutes
>>57 seconds9 hours 14 minutes 5 seconds9 hours 15 minutes 4 secondsTotal
>>Delay9 hours 15 minutes 8 seconds9 hours 10 minutes 58 seconds9 hours
>>12 minutes9 hours 13 minutes 2 seconds9 hours 14 minutes 10 seconds9
>>hours 15 minutes 8 seconds
>>
>>
>> Are these "normal". I was wondering what the scheduling delay and total
>> delay terms are, and if it's normal for them to be 9 hours.
>>
>> I've got a standalone spark master and 4 spark nodes. The streaming app
>> has been given 4 cores, and it's using 1 core per worker node. The
>> streaming app is submitted from a 5th machine, and that machine has nothing
>> but the driver running. The worker nodes are running alongside Cassandra
>> (and reading and writing to it).
>>
>> Any insights would be appreciated.
>>
>> Regards,
>> Ashic.
>>
>
>


Re: Discourse: A proposed alternative to the Spark User list

2015-01-22 Thread Gerard Maas
I have been contributing to SO for a while now.  Here are a few
observations I'd like to contribute to the discussion:

The level of questions on SO is often more entry-level. "Harder"
questions (that require expertise in a certain area) remain unanswered for
a while. Same questions here on the list (as they are often cross-posted)
receive faster turnaround.
Roughly speaking, there're two groups of questions: Implementing things on
Spark and Running Spark.  The second one is borderline on SO guidelines as
they often involve cluster setups, long logs and little idea of what's
going on (mind you, often those questions come from people starting with
Spark)

In my opinion, Stack Overflow offers a better Q/A experience, in
particular, they have tooling in place to reduce duplicates, something that
often overloads this list (same "getting started issues" or "how to map,
filter, flatmap" over and over again).  That said, this list offers a
richer forum, where the expertise pool is a lot deeper.
Also, while SO is fairly strict in requiring posters to show a minimal
amount of effort in the question being asked, this list is quite lenient
about that. This is probably an element that makes the list
'lower impedance'.
One additional thing on SO is that the [apache-spark] tag is a 'low rep'
tag. Neither questions nor answers get significant voting, reducing the
'rep gaming' factor  (discouraging participation?)

Thinking about how to improve both platforms: SO[apache-spark] and this ML,
and get back the list to "not overwhelming" message volumes, we could
implement some 'load balancing' policies:
- encourage new users to use Stack Overflow, in particular, redirect newbie
questions to SO the friendly way: "did you search SO already?" or link to
an existing question.
  - most how to "map, flatmap, filter, aggregate, reduce, ..." would fall
under  this category
- encourage domain experts to hang out on SO more often  (my impression is that
MLLib, GraphX are fairly underserved)
- have an 'escalation process' in place, where we could post
'interesting/hard/bug' questions from SO back to the list (or encourage the
poster to do so)
- update our "community guidelines" on [
http://spark.apache.org/community.html] to implement such policies.

Those are just some ideas on how to improve the community and better serve
the newcomers while avoiding overload of our existing expertise pool.

kr, Gerard.


On Thu, Jan 22, 2015 at 10:42 AM, Sean Owen  wrote:

> Yes, there is some project business like votes of record on releases that
> needs to be carried on in standard, simple accessible place and SO is not
> at all suitable.
>
> Nobody is stuck with Nabble. The suggestion is to enable a different
> overlay on the existing list. SO remains a place you can ask questions too.
> So I agree with Nick's take.
>
> BTW are there perhaps plans to split this mailing list into
> subproject-specific lists? That might also help tune in/out the subset of
> conversations of interest.
> On Jan 22, 2015 10:30 AM, "Petar Zecevic"  wrote:
>
>>
>> Ok, thanks for the clarifications. I didn't know this list has to remain
>> as the only official list.
>>
>> Nabble is really not the best solution in the world, but we're stuck with
>> it, I guess.
>>
>> That's it from me on this subject.
>>
>> Petar
>>
>>
>> On 22.1.2015. 3:55, Nicholas Chammas wrote:
>>
>>  I think a few things need to be laid out clearly:
>>
>>1. This mailing list is the “official” user discussion platform. That
>>is, it is sponsored and managed by the ASF.
>>2. Users are free to organize independent discussion platforms
>>focusing on Spark, and there is already one such platform in Stack 
>> Overflow
>>under the apache-spark and related tags. Stack Overflow works quite
>>well.
>>3. The ASF will not agree to deprecating or migrating this user list
>>to a platform that they do not control.
>>4. This mailing list has grown to an unwieldy size and discussions
>>are hard to find or follow; discussion tooling is also lacking. We want to
>>improve the utility and user experience of this mailing list.
>>5. We don’t want to fragment this “official” discussion community.
>>6. Nabble is an independent product not affiliated with the ASF. It
>>offers a slightly better interface to the Apache mailing list archives.
>>
>> So to respond to some of your points, pzecevic:
>>
>> Apache user group could be frozen (not accepting new questions, if that’s
>> possible) and redirect users to Stack Overflow (automatic reply?).
>>
>> From what I understand of the ASF’s policies, this is not possible. :(
>> This mailing list must remain the official Spark user discussion platform.
>>
>> Other thing, about new Stack Exchange site I proposed earlier. If a new
>> site is created, there is no problem with guidelines, I think, because
>> Spark community can apply different guidelines for the new site.
>>
>> I think Stack Overflow and the various 

Re: dynamically change receiver for a spark stream

2015-01-21 Thread Gerard Maas
Hi Tamas,

I meant not changing the receivers, but starting/stopping the Streaming
jobs. So you would have a 'small' Streaming job for a subset of streams
that you'd configure->start->stop  on demand.
I haven't tried myself yet, but I think it should also be possible to
create a Streaming Job from the Spark Job Server (
https://github.com/spark-jobserver/spark-jobserver). Then you would have a
REST interface that even gives you the possibility of passing a
configuration.

-kr, Gerard.

On Wed, Jan 21, 2015 at 11:54 AM, Tamas Jambor  wrote:

> we were thinking along the same line, that is to fix the number of streams
> and change the input and output channels dynamically.
>
> But could not make it work (seems that the receiver is not allowing any
> change in the config after it started).
>
> thanks,
>
> On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas 
> wrote:
>
>> One possible workaround could be to orchestrate launch/stopping of
>> Streaming jobs on demand as long as the number of jobs/streams stay
>> within the boundaries of the resources (cores) you've available.
>> e.g. if you're using Mesos, Marathon offers a REST interface to manage
>> job lifecycle. You will still need to solve the dynamic configuration
>> through some alternative channel.
>>
>> On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor 
>> wrote:
>>
>>> thanks for the replies.
>>>
>>> is this something we can get around? Tried to hack into the code without
>>> much success.
>>>
>>> On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I don't think current Spark Streaming support this feature, all the
>>>> DStream lineage is fixed after the context is started.
>>>>
>>>> Also stopping a stream is not supported, instead currently we need to
>>>> stop the whole streaming context to meet what you want.
>>>>
>>>> Thanks
>>>> Saisai
>>>>
>>>> -Original Message-
>>>> From: jamborta [mailto:jambo...@gmail.com]
>>>> Sent: Wednesday, January 21, 2015 3:09 AM
>>>> To: user@spark.apache.org
>>>> Subject: dynamically change receiver for a spark stream
>>>>
>>>> Hi all,
>>>>
>>>> we have been trying to setup a stream using a custom receiver that
>>>> would pick up data from sql databases. we'd like to keep that stream
>>>> context running and dynamically change the streams on demand, adding and
>>>> removing streams based on demand. alternativel, if a stream is fixed, is it
>>>> possible to stop a stream, change to config and start again?
>>>>
>>>> thanks,
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>>>> additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: dynamically change receiver for a spark stream

2015-01-21 Thread Gerard Maas
One possible workaround could be to orchestrate launch/stopping of
Streaming jobs on demand as long as the number of jobs/streams stays within
the boundaries of the resources (cores) you have available.
e.g. if you're using Mesos, Marathon offers a REST interface to manage job
lifecycle. You will still need to solve the dynamic configuration through
some alternative channel.

On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor  wrote:

> thanks for the replies.
>
> is this something we can get around? Tried to hack into the code without
> much success.
>
> On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai 
> wrote:
>
>> Hi,
>>
>> I don't think current Spark Streaming supports this feature; all the
>> DStream lineage is fixed after the context is started.
>>
>> Also, stopping a stream is not supported; instead, currently we need to
>> stop the whole streaming context to achieve what you want.
>>
>> Thanks
>> Saisai
>>
>> -Original Message-
>> From: jamborta [mailto:jambo...@gmail.com]
>> Sent: Wednesday, January 21, 2015 3:09 AM
>> To: user@spark.apache.org
>> Subject: dynamically change receiver for a spark stream
>>
>> Hi all,
>>
>> we have been trying to set up a stream using a custom receiver that would
>> pick up data from SQL databases. we'd like to keep that stream context
>> running and dynamically change the streams on demand, adding and removing
>> streams based on demand. alternatively, if a stream is fixed, is it possible
>> to stop a stream, change the config and start again?
>>
>> thanks,
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2015-01-21 Thread Gerard Maas
Hi Mukesh,

How are you creating your receivers? Could you post the (relevant) code?

-kr, Gerard.

On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha  wrote:

> Hello Guys,
>
> I've re-partitioned my kafkaStream so that it gets evenly distributed
> among the executors and the results are better.
> Still, from the executors page it seems that on only 1 executor all 8 cores
> are getting used and the other executors are using just 1 core.
>
> Is this the correct interpretation based on the below data? If so how can
> we fix this?
>
> [image: Inline image 1]
>
> On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> That is kind of expected due to data locality. Though you should see
>> some tasks running on the executors as the data gets replicated to
>> other nodes and can therefore run tasks based on locality. You have
>> two solutions
>>
>> 1. kafkaStream.repartition() to explicitly repartition the received
>> data across the cluster.
>> 2. Create multiple kafka streams and union them together.
>>
>> See
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
>>
>> On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha 
>> wrote:
>> > Thanks Sandy, It was the issue with the no of cores.
>> >
>> > Another issue I was facing is that tasks are not getting distributed
>> evenly
>> > among all executors and are running on the NODE_LOCAL locality level
>> i.e.
>> > all the tasks are running on the same executor where my
>> kafkareceiver(s) are
>> > running even though other executors are idle.
>> >
>> > I configured spark.locality.wait=50 instead of the default 3000 ms,
>> which
>> > forced the task rebalancing among nodes, let me know if there is a
>> better
>> > way to deal with this.
>> >
>> >
>> > On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha 
>> > wrote:
>> >>
>> >> Makes sense, I've also tried it in standalone mode where all 3 workers
>> &
>> >> driver were running on the same 8 core box and the results were
>> similar.
>> >>
>> >> Anyways I will share the results in YARN mode with 8 core yarn
>> containers.
>> >>
>> >> On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza 
>> >> wrote:
>> >>>
>> >>> When running in standalone mode, each executor will be able to use
>> all 8
>> >>> cores on the box.  When running on YARN, each executor will only have
>> access
>> >>> to 2 cores.  So the comparison doesn't seem fair, no?
>> >>>
>> >>> -Sandy
>> >>>
>> >>> On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha > >
>> >>> wrote:
>> 
>>  Nope, I am setting 5 executors with 2  cores each. Below is the
>> command
>>  that I'm using to submit in YARN mode. This starts up 5 executor
>> nodes and a
>>  driver as per the Spark application master UI.
>> 
>>  spark-submit --master yarn-cluster --num-executors 5 --driver-memory
>>  1024m --executor-memory 1024m --executor-cores 2 --class
>>  com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
>> vm.cloud.com:2181/kafka
>>  spark-yarn avro 1 5000
>> 
>>  On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza <
>> sandy.r...@cloudera.com>
>>  wrote:
>> >
>> > *oops, I mean are you setting --executor-cores to 8
>> >
>> > On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza <
>> sandy.r...@cloudera.com>
>> > wrote:
>> >>
>> >> Are you setting --num-executors to 8?
>> >>
>> >> On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha <
>> me.mukesh@gmail.com>
>> >> wrote:
>> >>>
>> >>> Sorry Sandy, The command is just for reference but I can confirm
>> that
>> >>> there are 4 executors and a driver as shown in the spark UI page.
>> >>>
>> >>> Each of these machines is a 8 core box with ~15G of ram.
>> >>>
>> >>> On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
>> >>>  wrote:
>> 
>>  Hi Mukesh,
>> 
>>  Based on your spark-submit command, it looks like you're only
>>  running with 2 executors on YARN.  Also, how many cores does
>> each machine
>>  have?
>> 
>>  -Sandy
>> 
>>  On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
>>   wrote:
>> >
>> > Hello Experts,
>> > I'm bench-marking Spark on YARN
>> > (https://spark.apache.org/docs/latest/running-on-yarn.html) vs
>> a standalone
>> > spark cluster (
>> https://spark.apache.org/docs/latest/spark-standalone.html).
>> > I have a standalone cluster with 3 executors, and a spark app
>> > running on yarn with 4 executors as shown below.
>> >
>> > The spark job running inside yarn is 10x slower than the one
>> > running on the standalone cluster (even though the YARN one has
>> a larger number of
>> > workers), also in both cases all the executors are in the
>> same datacenter
>> > so there shouldn't be any latency. On YARN each 5sec batch is
>> reading data
>> > from kafka and processing it in 5sec & on the standalo

Re: How to force parallel processing of RDD using multiple thread

2015-01-16 Thread Gerard Maas
Spark will use the number of cores available in the cluster. If your
cluster is 1 node with 4 cores, Spark will execute up to 4 tasks in
parallel.
Setting your number of partitions to 4 will ensure an even load across cores.
Note that this is different from saying "threads": internally, Spark uses
many threads (data block sender/receiver, listeners, notifications,
scheduler, ...).
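
To make this concrete, a small sketch (the input path is just a placeholder)
for checking and adjusting the partitioning of an RDD:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("parallelism-check"))

// Input that lives in a single partition spawns a single task,
// no matter how many cores the cluster offers.
val data = sc.textFile("hdfs:///some/input")   // placeholder path
println(data.partitions.length)                // number of tasks a stage over this RDD will run

// Repartition to (at least) the number of available cores so they all do useful work.
val evenLoad = data.repartition(4)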

-kr, Gerard.

On Fri, Jan 16, 2015 at 3:14 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

> Does parallel processing mean it is executed in multiple workers or
> executed in one worker but multiple threads? For example, if I have only one
> worker but my RDD has 4 partitions, will it be executed in parallel in 4 threads?
>
> The reason I am asking is to decide whether I need to configure Spark
> to have multiple workers. By default, it just starts with one worker.
>
> Regards,
>
> Ningjun Wang
> Consulting Software Engineer
> LexisNexis
> 121 Chanlon Road
> New Providence, NJ 07974-1541
>
>
> -Original Message-
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: Thursday, January 15, 2015 11:04 PM
> To: Wang, Ningjun (LNG-NPV)
> Cc: user@spark.apache.org
> Subject: Re: How to force parallel processing of RDD using multiple thread
>
> Check the number of partitions in your input. It may be much less than the
> available parallelism of your small cluster. For example, input that lives
> in just 1 partition will spawn just 1 task.
>
> Beyond that parallelism just happens. You can see the parallelism of each
> operation in the Spark UI.
>
> On Thu, Jan 15, 2015 at 10:53 PM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
> > Spark Standalone cluster.
> >
> > My program is running very slowly; I suspect it is not doing parallel
> processing of the RDD. How can I force it to run in parallel? Is there any way to
> check whether it is processed in parallel?
> >
> > Regards,
> >
> > Ningjun Wang
> > Consulting Software Engineer
> > LexisNexis
> > 121 Chanlon Road
> > New Providence, NJ 07974-1541
> >
> >
> > -Original Message-
> > From: Sean Owen [mailto:so...@cloudera.com]
> > Sent: Thursday, January 15, 2015 4:29 PM
> > To: Wang, Ningjun (LNG-NPV)
> > Cc: user@spark.apache.org
> > Subject: Re: How to force parallel processing of RDD using multiple
> > thread
> >
> > What is your cluster manager? For example on YARN you would specify
> --executor-cores. Read:
> > http://spark.apache.org/docs/latest/running-on-yarn.html
> >
> > On Thu, Jan 15, 2015 at 8:54 PM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
> >> I have a standalone spark cluster with only one node with 4 CPU cores.
> >> How can I force spark to do parallel processing of my RDD using
> >> multiple threads? For example I can do the following
> >>
> >>
> >>
> >> Spark-submit  --master local[4]
> >>
> >>
> >>
> >> However I really want to use the cluster as follow
> >>
> >>
> >>
> >> Spark-submit  --master spark://10.125.21.15:7070
> >>
> >>
> >>
> >> In that case, how can I make sure the RDD is processed with multiple
> >> threads/cores?
> >>
> >>
> >>
> >> Thanks
> >>
> >> Ningjun
> >>
> >>
>


Re: Join RDDs with DStreams

2015-01-08 Thread Gerard Maas
You are looking for dstream.transform(rdd => rdd.join(otherRdd))

The docs contain an example on how to use transform.

https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams
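
A minimal sketch of the lookup-table case described below (the paths, field
layout, and socket source are placeholders; both sides need to be keyed pair
RDDs for join to apply):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("dstream-join"))
val ssc = new StreamingContext(sc, Seconds(10))

// Static lookup table, read once from HDFS and keyed by id.
val lookup = sc.textFile("hdfs:///lookup/table")
  .map(_.split(","))
  .map(fields => fields(0) -> fields(1))
  .cache()

// Incoming stream, keyed the same way.
val stream = ssc.socketTextStream("localhost", 9999)
  .map(_.split(","))
  .map(fields => fields(0) -> fields(1))

// transform exposes each batch's RDD, where the regular RDD join applies.
val joined = stream.transform(rdd => rdd.join(lookup))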

-kr, Gerard.

On Thu, Jan 8, 2015 at 5:50 PM, Asim Jalis  wrote:

> Is there a way to join non-DStream RDDs with DStream RDDs?
>
> Here is the use case. I have a lookup table stored in HDFS that I want to
> read as an RDD. Then I want to join it with the RDDs that are coming in
> through the DStream. How can I do this?
>
> Thanks.
>
> Asim
>


Re: Registering custom metrics

2015-01-08 Thread Gerard Maas
Very interesting approach. Thanks for sharing it!
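
A minimal sketch of the register-a-custom-Source approach discussed further
down this thread (names are illustrative; it assumes Spark 1.x, where the
Source trait is package-private, so the class has to live under an
org.apache.spark package):

package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry

// Illustrative custom Source: sourceName and metricRegistry are the two
// members the (package-private) Source trait requires.
class CustomStreamingSource extends Source {
  override val sourceName: String = "myApp.streaming"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  // Example metric, updated from application code.
  val processedRecords = metricRegistry.counter(MetricRegistry.name("processedRecords"))
}

// Once the SparkContext is up:
//   val source = new CustomStreamingSource
//   SparkEnv.get.metricsSystem.registerSource(source)
//   source.processedRecords.inc(n)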

On Thu, Jan 8, 2015 at 5:30 PM, Enno Shioji  wrote:

> FYI I found this approach by Ooyala.
>
> /** Instrumentation for Spark based on accumulators.
>   *
>   * Usage:
>   * val instrumentation = new SparkInstrumentation("example.metrics")
>   * val numReqs = sc.accumulator(0L)
>   * instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
>   * instrumentation.register()
>   *
>   * Will create and report the following metrics:
>   * - Gauge with total number of requests (daily)
>   * - Meter with rate of requests
>   *
>   * @param prefix prefix for all metrics that will be reported by this 
> Instrumentation
>   */
>
> https://gist.github.com/ibuenros/9b94736c2bad2f4b8e23
> ᐧ
>
> On Mon, Jan 5, 2015 at 2:56 PM, Enno Shioji  wrote:
>
>> Hi Gerard,
>>
>> Thanks for the answer! I had a good look at it, but I couldn't figure out
>> whether one can use that to emit metrics from your application code.
>>
>> Suppose I wanted to monitor the rate of bytes I produce, like so:
>>
>> stream
>> .map { input =>
>>   val bytes = produce(input)
>>   // metricRegistry.meter("some.metrics").mark(bytes.length)
>>   bytes
>> }
>> .saveAsTextFile("text")
>>
>> Is there a way to achieve this with the MetricSystem?
>>
>>
>> ᐧ
>>
>> On Mon, Jan 5, 2015 at 10:24 AM, Gerard Maas 
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, I managed to create and register custom metrics by creating an
>>> implementation of org.apache.spark.metrics.source.Source and
>>> registering it with the metrics subsystem.
>>> Source is [Spark] private, so you need to create it under an org.apache.spark
>>> package. In my case, I'm dealing with Spark Streaming metrics, and I
>>> created my CustomStreamingSource under org.apache.spark.streaming as I
>>> also needed access to some [Streaming] private components.
>>>
>>> Then, you register your new metric Source on the Spark's metric system,
>>> like so:
>>>
>>> SparkEnv.get.metricsSystem.registerSource(customStreamingSource)
>>>
>>> And it will get reported to the metrics sinks active on your system. By
>>> default, you can access them through the metric endpoint:
>>> http://<host>:<port>/metrics/json
>>>
>>> I hope this helps.
>>>
>>> -kr, Gerard.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Dec 30, 2014 at 3:32 PM, eshioji  wrote:
>>>
>>>> Hi,
>>>>
>>>> Did you find a way to do this / working on this?
>>>> Am trying to find a way to do this as well, but haven't been able to
>>>> find a
>>>> way.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Registering-custom-metrics-tp9030p9968.html
>>>> Sent from the Apache Spark Developers List mailing list archive at
>>>> Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Gerard Maas
AFAIK, there're two levels of parallelism related to the Spark Kafka
consumer:

At JVM level: For each receiver, one can specify the number of threads for
a given topic, provided as a map [topic -> nthreads].  This will
effectively start n JVM threads consuming partitions of that kafka topic.
At Cluster level: One can create several DStreams, and each will have one
receiver and use 1 executor core in Spark; each DStream will have its
receiver configured as described in the previous line.

What you need to ensure is that there's a consumer attached to each
partition of your kafka topic. That is, nthreads * nReceivers =
#kafka_partitions(topic)

e.g:
Given
nPartitions = # of partitions of your topic
nThreads = # of threads per receiver

val kafkaStreams = (1 to nPartitions/nThreads).map { i =>
  KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> nThreads),
    StorageLevel.MEMORY_ONLY_SER)
}

For this to work, you need at least (nPartitions/nThreads +1) cores in your
Spark cluster, although I would recommend having 2-3x
(nPartitions/nThreads).
(and don't forget to union the streams after creation)
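
For reference, with the collection above that union is a one-liner (ssc being
the same StreamingContext the streams were created on):

val unifiedStream = ssc.union(kafkaStreams)
// apply all further transformations and output operations on unifiedStream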

-kr, Gerard.



On Wed, Jan 7, 2015 at 4:43 PM,  wrote:

> - You are launching up to 10 threads/topic per Receiver. Are you sure your
> receivers can support 10 threads each ? (i.e. in the default configuration,
> do they have 10 cores). If they have 2 cores, that would explain why this
> works with 20 partitions or less.
>
> - If you have 90 partitions, why start 10 Streams, each consuming 10
> partitions, and then removing the stream at index 0 ? Why not simply start
> 10 streams with 9 partitions ? Or, more simply,
>
> val kafkaStreams = (1 to numPartitions).map { _ =>
>   KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
>     StorageLevel.MEMORY_ONLY_SER)
> }
>
> - You’re consuming up to 10 local threads *per topic*, on each of your 10
> receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located
> on a single machine. You mentioned having a single Kafka topic with 90
> partitions. Why not have a single-element topicMap ?
>
> —
> FG
>
>
> On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha 
> wrote:
>
>>  I understand that I have to create 10 parallel streams. My code is
>> running fine when the no. of partitions is ~20, but when I increase the no.
>> of partitions I keep running into this issue.
>>
>> Below is my code to create kafka streams, along with the configs used.
>>
>> Map<String, String> kafkaConf = new HashMap<String, String>();
>> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>> kafkaConf.put("group.id", kafkaConsumerGroup);
>> kafkaConf.put("consumer.timeout.ms", "3");
>> kafkaConf.put("auto.offset.reset", "largest");
>> kafkaConf.put("fetch.message.max.bytes", "2000");
>> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>> kafkaConf.put("zookeeper.sync.time.ms", "2000");
>> kafkaConf.put("rebalance.backoff.ms", "1");
>> kafkaConf.put("rebalance.max.retries", "20");
>> String[] topics = kafkaTopicsList;
>> int numStreams = numKafkaThreads; // this is *10*
>> Map<String, Integer> topicMap = new HashMap<>();
>> for (String topic: topics) {
>>   topicMap.put(topic, numStreams);
>> }
>>
>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
>> ArrayList<>(numStreams);
>> for (int i = 0; i < numStreams; i++) {
>>   kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
>> byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
>> topicMap, StorageLevel.MEMORY_ONLY_SER()));
>> }
>> JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
>> kafkaStreams);
>>
>>
>> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas 
>> wrote:
>>
>>> Hi,
>>>
>>> Could you add the code where you create the Kafka consumer?
>>>
>>> -kr, Gerard.
>>>
>>> On Wed, Jan 7, 2015 at 3:43 PM,  wrote:
>>>
>>>> Hi Mukesh,
>>>>
>>>> If my understanding is correct, each Stream only has a single Receiver.
>>>> So, if you have each receiver consuming 9 partitions, you need 10 input
>>>> DStreams to create 10 concurrent receivers:
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>>
>>>> Would you mind sharing a bit more on how you achieve this ?
>>>>

Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Gerard Maas
Hi,

Could you add the code where you create the Kafka consumer?

-kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM,  wrote:

> Hi Mukesh,
>
> If my understanding is correct, each Stream only has a single Receiver.
> So, if you have each receiver consuming 9 partitions, you need 10 input
> DStreams to create 10 concurrent receivers:
>
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>
> Would you mind sharing a bit more on how you achieve this ?
>
> —
> FG
>
>
> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha 
> wrote:
>
>> Hi Guys,
>>
>> I have a kafka topic with 90 partitions and I am running
>> Spark Streaming (1.2.0) to read from kafka via KafkaUtils, creating 10
>> kafka-receivers.
>>
>> My streaming is running fine and there is no delay in processing, just
>> that some partitions' data is never getting picked up. From the kafka
>> console I can see that each receiver is consuming data from 9 partitions
>> but the lag for some offsets keeps on increasing.
>>
>> Below is my kafka-consumers parameters.
>>
>> Have any of you faced this kind of issue? If so, do you have any
>> pointers to fix it?
>>
>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>>  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>  kafkaConf.put("group.id", kafkaConsumerGroup);
>>  kafkaConf.put("consumer.timeout.ms", "3");
>>  kafkaConf.put("auto.offset.reset", "largest");
>>  kafkaConf.put("fetch.message.max.bytes", "2000");
>>  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>  kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>  kafkaConf.put("rebalance.backoff.ms", "1");
>>  kafkaConf.put("rebalance.max.retries", "20");
>>
>> --
>> Thanks & Regards,
>>
>> Mukesh Jha 
>>
>
>

