Re: Pyarrow Plasma client.release() fault

2018-07-20 Thread Corey Nolet
Robert,

Yes, I am using a separate Plasma client in each thread. I also verified
that I am not using up all the file descriptors or reaching the overcommit
limit.

I do see that the Plasma server is evicting objects every so often. I'm
assuming this eviction may be going on in the background? Is it possible
that the locking up may be the result of a massive eviction? I am
allocating over 8TB for the Plasma server.

Wes,

Best practices would be great. I did find that the @ray.remote scheduler
from the Ray project has drastically simplified my code.

I also attempted using single-node PySpark, but the type conversion I need
for going from CSV to DataFrames was orders of magnitude slower than doing
it with Pandas in plain Python.
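
For context, a minimal sketch of the @ray.remote pattern I mean (the file
paths and the "timestamp" column are placeholders, not my actual code); Ray
starts its own Plasma store under the hood, so the ObjectID bookkeeping goes
away:

import ray
import pandas as pd

ray.init()  # launches a local Plasma object store behind the scenes

@ray.remote
def load_and_partition(path):
    # Read one CSV and split it into one DataFrame per day.
    df = pd.read_csv(path, parse_dates=["timestamp"])
    return {day: chunk
            for day, chunk in df.groupby(df["timestamp"].dt.floor("D"))}

futures = [load_and_partition.remote(p) for p in ["part-0.csv", "part-1.csv"]]
partitions = ray.get(futures)  # results come back out of shared memory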



On Mon, Jul 16, 2018 at 8:17 PM Wes McKinney  wrote:

> Seems like we might want to write down some best practices for this
> level of large scale usage, essentially a supercomputer-like rig. I
> wouldn't even know where to come by a machine with > 2TB memory for
> scalability / concurrency load testing
>
> On Mon, Jul 16, 2018 at 2:59 PM, Robert Nishihara
>  wrote:
> > Are you using the same plasma client from all of the different threads?
> If
> > so, that could cause race conditions as the client is not thread safe.
> >
> > Alternatively, if you have a separate plasma client for each thread, then
> > you may be running out of file descriptors somewhere (either the client
> > process or the store).
> >
> > Can you check if the object store is evicting objects (it prints something
> to
> > stdout/stderr when this happens)? Could you be running out of memory but
> > failing to release the objects?
> >
> > On Tue, Jul 10, 2018 at 9:48 AM Corey Nolet  wrote:
> >
> >> Update:
> >>
> >> I'm investigating the possibility that I've reached the overcommit
> limit in
> >> the kernel as a result of all the parallel processes.
> >>
> >> This still doesn't fix the client.release() problem but it might explain
> >> why the processing appears to halt, after some time, until I restart the
> >> Jupyter kernel.
> >>
> >> On Tue, Jul 10, 2018 at 12:27 PM Corey Nolet  wrote:
> >>
> >> > Wes,
> >> >
> >> > Unfortunately, my code is on a separate network. I'll try to explain
> what
> >> > I'm doing and if you need further detail, I can certainly pseudocode
> >> > specifics.
> >> >
> >> > I am using multiprocessing.Pool() to fire up a bunch of threads for
> >> > different filenames. In each thread, I'm performing a pd.read_csv(),
> >> > sorting by the timestamp field (rounded to the day) and chunking the
> >> > Dataframe into separate Dataframes. I create a new Plasma ObjectID for
> >> each
> >> > of the chunked Dataframes, convert them to RecordBuffer objects,
> stream
> >> the
> >> > bytes to Plasma and seal the objects. Only the objectIDs are returned
> to
> >> > the orchestration thread.
> >> >
> >> > In follow-on processing, I'm combining the ObjectIDs for each of the
> >> > unique day timestamps into lists and I'm passing those into a
> function in
> >> > parallel using multiprocessing.Pool(). In this function, I'm iterating
> >> > through the lists of objectIds, loading them back into Dataframes,
> >> > appending them together until their size
> >> > is > some predefined threshold, and performing a df.to_parquet().
> >> >
> >> > The steps in the 2 paragraphs above are performed in a loop,
> batching up
> >> > 500-1k files at a time for each iteration.
> >> >
> >> > When I run this iteration a few times, it eventually locks up the
> Plasma
> >> > client. With regards to the release() fault, it doesn't seem to matter
> >> when
> >> > or where I run it (in the orchestration thread or in other threads),
> it
> >> > always seems to crash the Jupyter kernel. I'm thinking I might be
> using
> >> it
> >> > wrong; I'm just trying to figure out where and what I'm doing.
> >> >
> >> > Thanks again!
> >> >
> >> > On Tue, Jul 10, 2018 at 12:05 PM Wes McKinney 
> >> wrote:
> >> >
> >> >> hi Corey,
> >> >>
> >> >> Can you provide the code (or a simplified version thereof) that shows
> >> >> how you're using Plasma?
> >> >>
> >> >> - Wes
> >> >>
> >> >> On Tue, Jul 10, 2018 at 11:45 AM, Corey Nolet 
> >&

Re: Pyarrow Plasma client.release() fault

2018-07-10 Thread Corey Nolet
Update:

I'm investigating the possibility that I've reached the overcommit limit in
the kernel as a result of all the parallel processes.
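
(In case it helps anyone else, the check itself is just reading the standard
Linux procfs entries; nothing Arrow-specific here:)

def read(path):
    with open(path) as f:
        return f.read().strip()

print("vm.overcommit_memory =", read("/proc/sys/vm/overcommit_memory"))
print("vm.overcommit_ratio  =", read("/proc/sys/vm/overcommit_ratio"))

# Comparing CommitLimit with Committed_AS shows how close the system is.
with open("/proc/meminfo") as f:
    for line in f:
        if line.startswith(("CommitLimit", "Committed_AS")):
            print(line.strip())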

This still doesn't fix the client.release() problem but it might explain
why the processing appears to halt, after some time, until I restart the
Jupyter kernel.

On Tue, Jul 10, 2018 at 12:27 PM Corey Nolet  wrote:

> Wes,
>
> Unfortunately, my code is on a separate network. I'll try to explain what
> I'm doing and if you need further detail, I can certainly pseudocode
> specifics.
>
> I am using multiprocessing.Pool() to fire up a bunch of threads for
> different filenames. In each thread, I'm performing a pd.read_csv(),
> sorting by the timestamp field (rounded to the day) and chunking the
> Dataframe into separate Dataframes. I create a new Plasma ObjectID for each
> of the chunked Dataframes, convert them to RecordBuffer objects, stream the
> bytes to Plasma and seal the objects. Only the objectIDs are returned to
> the orchestration thread.
>
> In follow-on processing, I'm combining the ObjectIDs for each of the
> unique day timestamps into lists and I'm passing those into a function in
> parallel using multiprocessing.Pool(). In this function, I'm iterating
> through the lists of objectIds, loading them back into Dataframes,
> appending them together until their size
> is > some predefined threshold, and performing a df.to_parquet().
>
> The steps in the 2 paragraphs above are performed in a loop, batching up
> 500-1k files at a time for each iteration.
>
> When I run this iteration a few times, it eventually locks up the Plasma
> client. With regards to the release() fault, it doesn't seem to matter when
> or where I run it (in the orchestration thread or in other threads), it
> always seems to crash the Jupyter kernel. I'm thinking I might be using it
> wrong; I'm just trying to figure out where and what I'm doing.
>
> Thanks again!
>
> On Tue, Jul 10, 2018 at 12:05 PM Wes McKinney  wrote:
>
>> hi Corey,
>>
>> Can you provide the code (or a simplified version thereof) that shows
>> how you're using Plasma?
>>
>> - Wes
>>
>> On Tue, Jul 10, 2018 at 11:45 AM, Corey Nolet  wrote:
>> > I'm on a system with 12TB of memory and attempting to use Pyarrow's
>> Plasma
>> > client to convert a series of CSV files (via Pandas) into a Parquet
>> store.
>> >
>> > I've got a little over 20k CSV files to process which are about 1-2gb
>> each.
>> > I'm loading 500 to 1000 files at a time.
>> >
>> > In each iteration, I'm loading a series of files, partitioning them by a
>> > time field into separate dataframes, then writing parquet files in
>> > directories for each day.
>> >
>> > The problem I'm having is that the Plasma client & server appear to
>> lock up
>> > after about 2-3 iterations. It locks up to the point where I can't even
>> > CTRL+C the server. I am able to stop the notebook and re-trying the code
>> > just continues to lock up when interacting with Jupyter. There are no
>> > errors in my logs to tell me something's wrong.
>> >
>> > Just to make sure I'm not just being impatient and possibly need to wait
>> > for some background services to finish, I allowed the code to run
>> overnight
>> > and it was still in the same state when I came in to work this morning.
>> I'm
>> > running the Plasma server with 4TB max.
>> >
>> > In an attempt to pro-actively free up some of the object ids that I no
>> > longer need, I also attempted to use the client.release() function but I
>> > cannot seem to figure out how to make this work properly. It crashes my
>> > Jupyter kernel each time I try.
>> >
>> > I'm using Pyarrow 0.9.0
>> >
>> > Thanks in advance.
>>
>


Re: Pyarrow Plasma client.release() fault

2018-07-10 Thread Corey Nolet
Wes,

Unfortunately, my code is on a separate network. I'll try to explain what
I'm doing and if you need further detail, I can certainly pseudocode
specifics.

I am using multiprocessing.Pool() to fire up a bunch of threads for
different filenames. In each thread, I'm performing a pd.read_csv(),
sorting by the timestamp field (rounded to the day) and chunking the
Dataframe into separate Dataframes. I create a new Plasma ObjectID for each
of the chunked Dataframes, convert them to RecordBuffer objects, stream the
bytes to Plasma and seal the objects. Only the objectIDs are returned to
the orchestration thread.
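
A rough sketch of one of those mapper tasks is below; the column name,
socket path, and sizing approach follow the pattern in the Plasma docs
rather than my exact code:

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.plasma as plasma

def process_file(path, store_socket="/tmp/plasma"):
    client = plasma.connect(store_socket, "", 0)
    df = pd.read_csv(path, parse_dates=["timestamp"]).sort_values("timestamp")

    ids_by_day = {}
    for day, chunk in df.groupby(df["timestamp"].dt.floor("D")):
        batch = pa.RecordBatch.from_pandas(chunk)

        # Size the stream first, then write it straight into a Plasma buffer.
        mock = pa.MockOutputStream()
        writer = pa.RecordBatchStreamWriter(mock, batch.schema)
        writer.write_batch(batch)
        writer.close()

        object_id = plasma.ObjectID(np.random.bytes(20))
        buf = client.create(object_id, mock.size())
        writer = pa.RecordBatchStreamWriter(pa.FixedSizeBufferWriter(buf),
                                            batch.schema)
        writer.write_batch(batch)
        writer.close()
        client.seal(object_id)
        ids_by_day[day] = object_id

    # Only the ObjectIDs travel back to the orchestration process.
    return ids_by_day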

In follow-on processing, I'm combining the ObjectIDs for each of the unique
day timestamps into lists and I'm passing those into a function in parallel
using multiprocessing.Pool(). In this function, I'm iterating through the
lists of objectIds, loading them back into Dataframes, appending them
together until their size
is > some predefined threshold, and performing a df.to_parquet().
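
The reducer side looks roughly like this; the row threshold, output paths,
and the get()/release() calls are how I understand the API from the docs,
which may differ slightly between pyarrow releases:

import pandas as pd
import pyarrow as pa
import pyarrow.plasma as plasma

def write_day(object_ids, out_dir, threshold_rows=1000000):
    client = plasma.connect("/tmp/plasma", "", 0)
    pending, part = [], 0
    for oid in object_ids:
        [buf] = client.get([oid])  # zero-copy view of the sealed object
        table = pa.RecordBatchStreamReader(pa.BufferReader(buf)).read_all()
        pending.append(table.to_pandas())
        client.release(oid)  # this client is done with the object

        if sum(len(df) for df in pending) >= threshold_rows:
            pd.concat(pending).to_parquet("%s/part-%05d.parquet" % (out_dir, part))
            pending, part = [], part + 1

    if pending:
        pd.concat(pending).to_parquet("%s/part-%05d.parquet" % (out_dir, part))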

The steps in the 2 paragraphs above are performed in a loop, batching up
500-1k files at a time for each iteration.

When I run this iteration a few times, it eventually locks up the Plasma
client. With regards to the release() fault, it doesn't seem to matter when
or where I run it (in the orchestration thread or in other threads), it
always seems to crash the Jupyter kernel. I'm thinking I might be using it
wrong; I'm just trying to figure out where and what I'm doing.

Thanks again!

On Tue, Jul 10, 2018 at 12:05 PM Wes McKinney  wrote:

> hi Corey,
>
> Can you provide the code (or a simplified version thereof) that shows
> how you're using Plasma?
>
> - Wes
>
> On Tue, Jul 10, 2018 at 11:45 AM, Corey Nolet  wrote:
> > I'm on a system with 12TB of memory and attempting to use Pyarrow's
> Plasma
> > client to convert a series of CSV files (via Pandas) into a Parquet
> store.
> >
> > I've got a little over 20k CSV files to process which are about 1-2gb
> each.
> > I'm loading 500 to 1000 files at a time.
> >
> > In each iteration, I'm loading a series of files, partitioning them by a
> > time field into separate dataframes, then writing parquet files in
> > directories for each day.
> >
> > The problem I'm having is that the Plasma client & server appear to lock
> up
> > after about 2-3 iterations. It locks up to the point where I can't even
> > CTRL+C the server. I am able to stop the notebook and re-trying the code
> > just continues to lock up when interacting with Jupyter. There are no
> > errors in my logs to tell me something's wrong.
> >
> > Just to make sure I'm not just being impatient and possibly need to wait
> > for some background services to finish, I allowed the code to run
> overnight
> > and it was still in the same state when I came in to work this morning.
> I'm
> > running the Plasma server with 4TB max.
> >
> > In an attempt to pro-actively free up some of the object ids that I no
> > longer need, I also attempted to use the client.release() function but I
> > cannot seem to figure out how to make this work properly. It crashes my
> > Jupyter kernel each time I try.
> >
> > I'm using Pyarrow 0.9.0
> >
> > Thanks in advance.
>


Pyarrow Plasma client.release() fault

2018-07-10 Thread Corey Nolet
I'm on a system with 12TB of memory and attempting to use Pyarrow's Plasma
client to convert a series of CSV files (via Pandas) into a Parquet store.

I've got a little over 20k CSV files to process which are about 1-2gb each.
I'm loading 500 to 1000 files at a time.

In each iteration, I'm loading a series of files, partitioning them by a
time field into separate dataframes, then writing parquet files in
directories for each day.

The problem I'm having is that the Plasma client & server appear to lock up
after about 2-3 iterations. It locks up to the point where I can't even
CTRL+C the server. I am able to stop the notebook and re-trying the code
just continues to lock up when interacting with Jupyter. There are no
errors in my logs to tell me something's wrong.

Just to make sure I'm not just being impatient and possibly need to wait
for some background services to finish, I allowed the code to run overnight
and it was still in the same state when I came in to work this morning. I'm
running the Plasma server with 4TB max.

In an attempt to pro-actively free up some of the object ids that I no
longer need, I also attempted to use the client.release() function but I
cannot seem to figure out how to make this work properly. It crashes my
Jupyter kernel each time I try.
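
For reference, the lifecycle I believe I'm supposed to be following, boiled
down to a toy example (API as I read the 0.9-era Plasma docs; the socket
path and sizes are made up):

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma

client = plasma.connect("/tmp/plasma", "", 0)

object_id = plasma.ObjectID(np.random.bytes(20))
buf = client.create(object_id, 1000)          # reserve 1000 bytes in the store
pa.FixedSizeBufferWriter(buf).write(b"x" * 1000)
client.seal(object_id)                        # object becomes visible/immutable

[view] = client.get([object_id])              # zero-copy view into shared memory
# release() just drops this client's reference to the object; eviction of
# sealed, unreferenced objects is up to the store itself.
client.release(object_id)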

I'm using Pyarrow 0.9.0

Thanks in advance.


Re: PySpark API on top of Apache Arrow

2018-05-26 Thread Corey Nolet
Gourav & Nicholas,

Thank you! It does look like the PySpark Pandas UDF is exactly what I want;
the article I read didn't mention that it uses Arrow underneath. It looks
like Wes McKinney was also a key part of building the Pandas UDF.
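
For anyone else who lands on this thread, the Spark 2.3-style usage looks
roughly like the toy example below; the Series arrives in the UDF via Arrow
record batches rather than pickled rows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = (SparkSession.builder
         .config("spark.sql.execution.arrow.enabled", "true")  # Arrow transfer
         .getOrCreate())

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ["id", "v"])

@pandas_udf("double", PandasUDFType.SCALAR)
def plus_one(v):
    # v is a pandas.Series; the function runs vectorized, not row-by-row
    return v + 1

df.withColumn("v2", plus_one(df["v"])).show()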

Gourav,

I totally apologize for my long and drawn-out response to you. I initially
misunderstood your response. I also need to take the time to dive into the
PySpark source code; I was assuming that it was just firing up JVMs under
the hood.

Thanks again! I'll report back with findings.

On Sat, May 26, 2018 at 2:51 PM, Nicolas Paris <nipari...@gmail.com> wrote:

> hi corey
>
> not familiar with arrow, plasma. However recently read an article about
> spark on
> a standalone machine (your case). Sounds like you could take benefit of
> pyspark
> "as-is"
>
> https://databricks.com/blog/2018/05/03/benchmarking-
> apache-spark-on-a-single-node-machine.html
>
> regards,
>
> 2018-05-23 22:30 GMT+02:00 Corey Nolet <cjno...@gmail.com>:
>
>> Please forgive me if this question has been asked already.
>>
>> I'm working in Python with Arrow+Plasma+Pandas Dataframes. I'm curious if
>> anyone knows of any efforts to implement the PySpark API on top of Apache
>> Arrow directly. In my case, I'm doing data science on a machine with 288
>> cores and 1TB of ram.
>>
>> It would make life much easier if I was able to use the flexibility of
>> the PySpark API (rather than having to be tied to the operations in
>> Pandas). It seems like an implementation would be fairly straightforward
>> using the Plasma server and object_ids.
>>
>> If you have not heard of an effort underway to accomplish this, any
>> reasons why it would be a bad idea?
>>
>>
>> Thanks!
>>
>
>


PySpark API on top of Apache Arrow

2018-05-23 Thread Corey Nolet
Please forgive me if this question has been asked already.

I'm working in Python with Arrow+Plasma+Pandas Dataframes. I'm curious if
anyone knows of any efforts to implement the PySpark API on top of Apache
Arrow directly. In my case, I'm doing data science on a machine with 288
cores and 1TB of ram.

It would make life much easier if I was able to use the flexibility of the
PySpark API (rather than having to be tied to the operations in Pandas). It
seems like an implementation would be fairly straightforward using the
Plasma server and object_ids.

If you have not heard of an effort underway to accomplish this, any reasons
why it would be a bad idea?


Thanks!


Re: PyArrow & Python Multiprocessing

2018-05-16 Thread Corey Nolet
I must say, I’m super excited about using Arrow and Plasma.

The code you just posted worked for me at home and I’m sure I’ll figure out 
what I was doing wrong tomorrow at work. 

Anyways, thanks so much for your help and fast replies! 

Sent from my iPhone

> On May 16, 2018, at 7:42 PM, Robert Nishihara <robertnishih...@gmail.com> 
> wrote:
> 
> You should be able to do something like the following.
> 
> # Start the store.
> plasma_store -s /tmp/store -m 10
> 
> Then in Python, do the following:
> 
> import pandas as pd
> import pyarrow.plasma as plasma
> import numpy as np
> 
> client = plasma.connect('/tmp/store', '', 0)
> series = pd.Series(np.zeros(100))
> object_id = client.put(series)
> 
> And yes, I would create a separate Plasma client for each process. I don't
> think you'll be able to pickle a Plasma client object successfully (it has
> a socket connection to the store).
> 
> On Wed, May 16, 2018 at 3:43 PM Corey Nolet <cjno...@gmail.com> wrote:
> 
>> Robert,
>> 
>> Thank you for the quick response. I've been playing around for a few hours
>> to get a feel for how this works.
>> 
>> If I understand correctly, it's better to have the Plasma client objects
>> instantiated within each separate process? Weird things seemed to happen
>> when I attempted to share a single one. I was assuming that the pickle
>> serialization by python multiprocessing would have been serializing the
>> connection info and re-instantiating on the other side but that didn't seem
>> to be the case.
>> 
>> I managed to load up a gigantic set of CSV files into Dataframes. Now I'm
>> attempting to read the chunks, perform a groupby-aggregate, and write the
>> results back to the Plasma store. Unless I'm mistaken, there doesn't seem
>> to be a very direct way of accomplishing this. When I tried converting the
>> Series object into a Plasma Array and just doing a client.put(array) I get
>> a pickling error. Unless maybe I'm misunderstanding the architecture here,
>> I believe that error would have been referring to attempts to serialize the
>> object into a file? I would hope that the data isn't all being sent to the
>> single Plasma server (or sent over sockets for that matter).
>> 
>> What would be the recommended strategy for serializing Pandas Series
>> objects? I really like the StreamWriter concept here but there does not
>> seem to be a direct way (or documentation) to accomplish this.
>> 
>> Thanks again.
>> 
>> On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <
>> robertnishih...@gmail.com
>>> wrote:
>> 
>>> Take a look at the Plasma object store
>>> https://arrow.apache.org/docs/python/plasma.html.
>>> 
>>> Here's an example using it (along with multiprocessing to sort a pandas
>>> dataframe)
>>> https://github.com/apache/arrow/blob/master/python/
>>> examples/plasma/sorting/sort_df.py.
>>> It's possible the example is a bit out of date.
>>> 
>>> You may be interested in taking a look at Ray
>>> https://github.com/ray-project/ray. We use Plasma/Arrow under the hood
>> to
>>> do all of these things but hide a lot of the bookkeeping (like object ID
>>> generation). For your setting, you can think of it as a replacement for
>>> Python multiprocessing that automatically uses shared memory and Arrow
>> for
>>> serialization.
>>> 
>>>> On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cjno...@gmail.com> wrote:
>>>> 
>>>> I've been reading through the PyArrow documentation and trying to
>>>> understand how to use the tool effectively for IPC (using zero-copy).
>>>> 
>>>> I'm on a system with 586 cores & 1TB of ram. I'm using Pandas
>> Dataframes
>>>> to process several 10's of gigs of data in memory and the pickling that
>>> is
>>>> done by Python's multiprocessing API is very wasteful.
>>>> 
>>>> I'm running a little hand-built map-reduce where I chunk the dataframe
>>> into
>>>> N_mappers number of chunks, run some processing on them, then run some
>>>> number N_reducers to finalize the operation. What I'd like to be able
>> to
>>> do
>>>> is chunk up the dataframe into Arrow Buffer objects and just have each
>>>> mapped task read their respective Buffer object with the guarantee of
>>>> zero-copy.
>>>> 
>>>> I see there's a couple Filesystem abstractions for doing memory-mapped
>>>> files. Durability isn't something I need and I'm willing to forego the
>>>> expense of putting the files on disk.
>>>> 
>>>> Is it possible to write the data directly to memory and pass just the
>>>> reference around to the different processes? What's the recommended way
>>> to
>>>> accomplish my goal here?
>>>> 
>>>> 
>>>> Thanks in advance!
>>>> 
>>> 
>> 


Re: PyArrow & Python Multiprocessing

2018-05-16 Thread Corey Nolet
Robert,

Thank you for the quick response. I've been playing around for a few hours
to get a feel for how this works.

If I understand correctly, it's better to have the Plasma client objects
instantiated within each separate process? Weird things seemed to happen
when I attempted to share a single one. I was assuming that the pickle
serialization by python multiprocessing would have been serializing the
connection info and re-instantiating on the other side but that didn't seem
to be the case.
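
For what it's worth, the pattern that ended up working for me is sketched
below (socket path and worker body are made up): open the client in a Pool
initializer so each worker process gets its own connection instead of trying
to pickle one.

import multiprocessing as mp
import pyarrow.plasma as plasma

_client = None  # one client per worker process

def _init_worker(store_socket):
    global _client
    _client = plasma.connect(store_socket, "", 0)

def fetch_size(object_id_bytes):
    # ObjectIDs travel between processes as their raw 20-byte form.
    [buf] = _client.get([plasma.ObjectID(object_id_bytes)])
    return buf.size

if __name__ == "__main__":
    pool = mp.Pool(processes=4, initializer=_init_worker,
                   initargs=("/tmp/plasma",))
    # sizes = pool.map(fetch_size, list_of_20_byte_ids)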

I managed to load up a gigantic set of CSV files into Dataframes. Now I'm
attempting to read the chunks, perform a groupby-aggregate, and write the
results back to the Plasma store. Unless I'm mistaken, there doesn't seem
to be a very direct way of accomplishing this. When I tried converting the
Series object into a Plasma Array and just doing a client.put(array) I get
a pickling error. Unless maybe I'm misunderstanding the architecture here,
I believe that error would have been referring to attempts to serialize the
object into a file? I would hope that the data isn't all being sent to the
single Plasma server (or sent over sockets for that matter).

What would be the recommended strategy for serializing Pandas Series
objects? I really like the StreamWriter concept here but there does not
seem to be a direct way (or documentation) to accomplish this.

Thanks again.

On Wed, May 16, 2018 at 1:28 PM, Robert Nishihara <robertnishih...@gmail.com
> wrote:

> Take a look at the Plasma object store
> https://arrow.apache.org/docs/python/plasma.html.
>
> Here's an example using it (along with multiprocessing to sort a pandas
> dataframe)
> https://github.com/apache/arrow/blob/master/python/
> examples/plasma/sorting/sort_df.py.
> It's possible the example is a bit out of date.
>
> You may be interested in taking a look at Ray
> https://github.com/ray-project/ray. We use Plasma/Arrow under the hood to
> do all of these things but hide a lot of the bookkeeping (like object ID
> generation). For your setting, you can think of it as a replacement for
> Python multiprocessing that automatically uses shared memory and Arrow for
> serialization.
>
> On Wed, May 16, 2018 at 10:02 AM Corey Nolet <cjno...@gmail.com> wrote:
>
> > I've been reading through the PyArrow documentation and trying to
> > understand how to use the tool effectively for IPC (using zero-copy).
> >
> > I'm on a system with 586 cores & 1TB of ram. I'm using Pandas DataFrames
> > to process several 10's of gigs of data in memory and the pickling that
> is
> > done by Python's multiprocessing API is very wasteful.
> >
> > I'm running a little hand-built map-reduce where I chunk the dataframe
> into
> > N_mappers number of chunks, run some processing on them, then run some
> > number N_reducers to finalize the operation. What I'd like to be able to
> do
> > is chunk up the dataframe into Arrow Buffer objects and just have each
> > mapped task read their respective Buffer object with the guarantee of
> > zero-copy.
> >
> > I see there's a couple Filesystem abstractions for doing memory-mapped
> > files. Durability isn't something I need and I'm willing to forego the
> > expense of putting the files on disk.
> >
> > Is it possible to write the data directly to memory and pass just the
> > reference around to the different processes? What's the recommended way
> to
> > accomplish my goal here?
> >
> >
> > Thanks in advance!
> >
>


PyArrow & Python Multiprocessing

2018-05-16 Thread Corey Nolet
I've been reading through the PyArrow documentation and trying to
understand how to use the tool effectively for IPC (using zero-copy).

I'm on a system with 586 cores & 1TB of RAM. I'm using Pandas DataFrames
to process several tens of gigabytes of data in memory, and the pickling that is
done by Python's multiprocessing API is very wasteful.

I'm running a little hand-built map-reduce where I chunk the dataframe into
N_mappers chunks, run some processing on them, and then run N_reducers
reducer tasks to finalize the operation. What I'd like to be able to do
is chunk up the dataframe into Arrow Buffer objects and just have each
mapped task read their respective Buffer object with the guarantee of
zero-copy.

I see there's a couple Filesystem abstractions for doing memory-mapped
files. Durability isn't something I need and I'm willing to forego the
expense of putting the files on disk.

Is it possible to write the data directly to memory and pass just the
reference around to the different processes? What's the recommended way to
accomplish my goal here?
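
To make the question concrete, the kind of flow I'm imagining is sketched
below (paths and the toy DataFrame are placeholders; I'm not sure this is
the intended usage):

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"x": range(10)})
batch = pa.RecordBatch.from_pandas(df)

# Producer: write the record batch stream to a file under /dev/shm.
with pa.OSFile("/dev/shm/chunk-0.arrow", "wb") as sink:
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

# Consumer (e.g. a multiprocessing worker): memory-map the file and read it
# back without copying the buffers onto the Python heap.
source = pa.memory_map("/dev/shm/chunk-0.arrow", "r")
table = pa.RecordBatchStreamReader(source).read_all()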


Thanks in advance!


Re: Using MatrixFactorizationModel as a feature extractor

2017-11-27 Thread Corey Nolet
I know that the algorithm itself is not able to extract features for a user
that it was not trained on; however, I'm trying to find a way to compare
users for similarity so that when I find a user that's really similar to
another user, I can just use the similar user's recommendations until the
other user gets worked into the model.
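
Roughly, what I have in mind is something like the sketch below (toy ratings
data; comparing users by cosine similarity over their commonly rated items
is my own idea, not something from the MLlib docs):

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS

sc = SparkContext()
ratings = sc.parallelize([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (2, 0, 5.0)])
model = ALS.train(ratings, rank=10, iterations=5)

# Ratings for a user the model has never seen, keyed by product id.
new_user = {0: 4.5, 1: 1.0}

def similarity(new_ratings, other_ratings):
    # Cosine similarity over the products both users have rated.
    common = set(new_ratings) & set(other_ratings)
    if not common:
        return 0.0
    a = np.array([new_ratings[p] for p in common])
    b = np.array([other_ratings[p] for p in common])
    return float(a.dot(b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Group the training ratings by user so the new user can be compared to them.
by_user = (ratings.map(lambda r: (r[0], (r[1], r[2])))
                  .groupByKey()
                  .mapValues(dict)
                  .collectAsMap())

nearest = max(by_user, key=lambda u: similarity(new_user, by_user[u]))
recs = model.recommendProducts(nearest, 2)  # stand-in recommendations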

On Mon, Nov 27, 2017 at 3:08 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I'm trying to use the MatrixFactorizationModel to, for instance, determine
> the latent factors of a user or item that were not used in the training
> data of the model. I'm not as concerned about the rating as I am with the
> latent factors for the user/item.
>
> Thanks!
>


Using MatrixFactorizationModel as a feature extractor

2017-11-27 Thread Corey Nolet
I'm trying to use the MatrixFactorizationModel to, for instance, determine
the latent factors of a user or item that were not used in the training
data of the model. I'm not as concerned about the rating as I am with the
latent factors for the user/item.

Thanks!


[theano-users] Re: Apply generic element wise function against tensor

2017-01-17 Thread Corey Nolet
Here's what my code in numpy looks like:

import numpy as np
from theano import shared

tensor = shared(np.random.randn(7, 16, 16))

e_tensor = tensor.eval()

# Binarize the first feature map: values >= 1 become 1, everything else 0.
tensor2 = e_tensor[0, :, :]
tensor2[tensor2 < 1] = 0.0
tensor2[tensor2 > 0] = 1.0

# Multiply each remaining feature map by the binary mask.
new_tensor = [tensor2]
for i in range(1, e_tensor.shape[0]):
    new_tensor.append(np.multiply(tensor2, e_tensor[i, :, :]))

output = np.array(new_tensor).reshape(7, 16, 16)
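
And here is my attempt at expressing the same thing with Theano ops instead
of looping in Python (a sketch only; it assumes the goal is to keep the
binarized first map and zero the other maps wherever that mask is 0):

import numpy as np
import theano
import theano.tensor as T

tensor = theano.shared(np.random.randn(7, 16, 16))

# Binarized first feature map, matching the numpy version above.
mask = T.cast(T.ge(tensor[0], 1), 'float64')

# Broadcast the (16, 16) mask across the remaining 6 maps and stack the
# mask itself back on as the first map.
masked_rest = tensor[1:] * mask.dimshuffle('x', 0, 1)
output = T.concatenate([mask.dimshuffle('x', 0, 1), masked_rest], axis=0)

f = theano.function([], output)
result = f()  # shape (7, 16, 16)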




On Tuesday, January 17, 2017 at 10:47:24 AM UTC-5, Corey Nolet wrote:
>
> I have a tensor which is (7,16,16), let's denote as Tijk. I need to make 
> the following function:
>
> for j in range(0, 16):
>     for k in range(0, 16):
>         if T0jk == 0:
>             for i in range(1, 7):
>                 Tijk = 0
>
>
> Any ideas on how this can be done with Theano's tensor API?
>
>
> Thanks! :
> 
>

-- 

--- 
You received this message because you are subscribed to the Google Groups 
"theano-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to theano-users+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


[theano-users] Apply generic element wise function against tensor

2017-01-17 Thread Corey Nolet
I have a tensor whose shape is (7,16,16); let's denote it as Tijk. I need to
implement the following function:

for j in range(0, 16):
    for k in range(0, 16):
        if T0jk == 0:
            for i in range(1, 7):
                Tijk = 0


Any ideas on how this can be done with Theano's tensor API?


Thanks! :


-- 

--- 
You received this message because you are subscribed to the Google Groups 
"theano-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to theano-users+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


[theano-users] Complex if/else operations in a custom loss function on 3-dimensional tensors

2017-01-13 Thread Corey Nolet
I am currently implementing the fully convolutional regression network 
which is outlined in detail in the paper "Synthetic Data for Text 
Localisation in Natural Images" by Ankush Gupta et al. 

I've got the network model compiled using the Keras API and I'm trying to 
implement the custom loss layer. This network outputs a tensor which is 
7x16x16 and each of the 7 output values for the 16x16 output feature maps 
represents a different regressed value that ultimately helps to predict the 
possibility, size, and location of text in each convolutional cell. The 
part that's confusing me is implementing the special operations that need 
to be performed in the loss function. One of the 7 regressed output values, 
let's call it "c", represents the probability of a cell to contain text. 
This is used to help determine whether or not the loss of the other 6 
output values even get back propagated. The rules in the custom loss 
function are as follows:

1. If the value 'c' is 0 (meaning there is no text in the current cell), 
the loss of the other 6 values is ignored completely (I'm assuming this 
means it's set to 0 and nothing is actually back-propagated, right?) and 
the MSE of the 'c' value is back-propagated.
2. If the value 'c' is 0, the MSE loss of the 'c' value is discounted. At 
the beginning of training, only 1% of the loss is back-propagated and this 
value is steadily increased throughout training until 100% of the loss is 
back propagated.

So, since my output tensor is 7x16x16, I believe this means I have 7 
different 16x16 matrices (feature maps) by which to specify the loss. I 
have an easier time thinking about a tensor that's 16x16x7 (which means 
each cell in the 16x16 matrix has 7 values) and I'm not exactly sure how to 
approach coding up the 2 rules that I specified because it looks like I 
need to specify them as lazily evaluated theano tensor functions instead of 
being able to loop through the tensors myself to apply the if/then/else 
logic. 

It would be helpful to be pointed in the right direction. I've looked at 
the documentation for the theano tensor functions already and I don't think 
I'm understanding exactly what a sequence of those functions may look like 
to help solve my problem. I can see there being an IfElse function in there 
but I'm not exactly sure how I would do the logic for the two steps I 
outlined above using this method. 
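
To make the question more concrete, this is the rough shape of what I think
the loss needs to be (a sketch only; it assumes the output is shaped
(batch, 7, 16, 16) with channel 0 holding 'c', and that the discount is a
shared variable scheduled from 0.01 up to 1.0 elsewhere):

import theano.tensor as T

def masked_loss(y_true, y_pred, discount):
    c_true = y_true[:, 0, :, :]
    c_pred = y_pred[:, 0, :, :]

    # MSE on the text-probability channel, down-weighted where no text is
    # present; the discount is ramped up over the course of training.
    c_err = T.sqr(c_true - c_pred)
    c_loss = T.switch(T.eq(c_true, 0), discount * c_err, c_err)

    # MSE on the other 6 channels, zeroed wherever c_true == 0 so nothing
    # back-propagates from cells that contain no text.
    reg_err = T.sqr(y_true[:, 1:, :, :] - y_pred[:, 1:, :, :])
    reg_loss = reg_err * T.neq(c_true, 0).dimshuffle(0, 'x', 1, 2)

    return T.mean(c_loss) + T.mean(reg_loss)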

Thank you in advance for any examples, suggestions, or insight you are able 
to provide. 

-- 

--- 
You received this message because you are subscribed to the Google Groups 
"theano-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to theano-users+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


Re: New Committers/PMC members!

2016-09-01 Thread Corey Nolet
Welcome, guys!

On Thu, Sep 1, 2016 at 9:53 AM, Billie Rinaldi 
wrote:

> Welcome, Mike and Marc!
>
> On Wed, Aug 31, 2016 at 7:58 AM, Josh Elser  wrote:
>
> > Hiya folks,
> >
> > I wanted to take a moment to publicly announce some recent additions to
> > the Apache Accumulo family (committers and PMC).
> >
> > We had Mike Wall join the ranks back in April (sorry for the delayed
> > announcement!) and Marc Parisi has just joined us this week.
> >
> > Thank you both for your continued contributions to the project and we all
> > look forward to working with you more!
> >
> > - Josh (on behalf of Apache Accumulo)
> >
>


Re: Hadoop

2016-06-02 Thread Corey Nolet
This may not be directly related, but I've noticed Hadoop packages have not
been uninstalling/updating well for the past year or so. The last couple of
times I've run fedup, I've had to go back in manually and remove/update a
bunch of the Hadoop packages, like Zookeeper and Parquet.

On Thu, Jun 2, 2016 at 4:59 PM, Christopher 
wrote:

> That first post was intended for the Fedora developer list. Apologies for
> sending to the wrong list.
>
> If anybody is curious, it seems the Fedora community support around Hadoop
> and Big Data is really dying... the packager for Flume and HTrace has
> abandoned their efforts to package for Fedora, and now it looks like the
> Hadoop package maintainer abandoned Hadoop, leaving Accumulo with
> unsatisfied dependencies. This is actually kind of a sad state of affairs,
> because better packaging downstream could really help users, and expose
> more ways to improve the upstream products.
>
> As it stands, I think there is a disconnect between the upstream
> communities and the downstream packagers in the Big Data space which
> includes Accumulo. I would love to see more interest in better packaging
> for downstream users through these existing downstream packager communities
> (Homebrew, Fedora, Debian, EPEL, Ubuntu, etc.), and I would love to see
> more volunteers come from these downstream communities to make improvements
> upstream.
>
> As an upstream community, I believe the responsibility is for us to reach
> down first, rather than wait for them to come to us. I've tried to do that
> within Fedora, with the hope that others would follow for the downstream
> communities they care about. Unfortunately, things haven't turned out how
> I'd have preferred, but I'm still hopeful. If there is anybody interested
> in downstream community packaging, let me know if I can help you get
> started.
>
> On Thu, Jun 2, 2016 at 4:28 PM Christopher 
> wrote:
>
> > Sorry, wrong list.
> >
> > On Thu, Jun 2, 2016 at 4:20 PM Christopher 
> > wrote:
> >
> >> So, it would seem at some point, without me noticing (certainly my
> fault,
> >> for not paying attention enough), the Hadoop packages got orphaned
> and/or
> >> retired? in Fedora.
> >>
> >> This is a big problem for me, because the main package I work on is
> >> dependent upon Hadoop.
> >>
> >> What's the state of Hadoop in Fedora these days? Are there packaging
> >> problems? Not enough support from upstream Apache community? Missing
> >> dependencies in Fedora? Not enough time to work on it? No interest from
> >> users?
> >>
> >> Whatever the issue is... I'd like to help wherever I can... I'd like to
> >> keep this stuff going.
> >>
> >
>


Re: adding ACL enforcement based on ACLProvider, for consistency

2016-04-22 Thread Corey Nolet
It appears some projects are still being hit as of 11:45 AM.

On Fri, Apr 22, 2016 at 11:53 AM, Jordan Zimmerman <
jor...@jordanzimmerman.com> wrote:

> I just saw this on the Apache Jira:
>
> "Jira is in Temporary Lockdown mode as a spam countermeasure. Only
> logged-in users with active roles (committer, contributor, PMC, etc.) will
> be able to create issues or comments during this time. Lockdown period from
> 22 April  UTC to estimated 22 April 2000 UTC.”
>
> So, wait until the lockdown is over.
>
> > On Apr 22, 2016, at 9:48 AM, Jordan Zimmerman <
> jor...@jordanzimmerman.com> wrote:
> >
> > I suggest you contact Apache about the issue. You’re the first who’s
> mentioned any problems. For this type of feature request, you’ll need to
> write the change and send a Pull Request. A Jira issue is vital. Please
> keep trying and engage Apache if you continue to have trouble.
> >
> > -Jordan
> >
> >> On Apr 21, 2016, at 7:40 PM, Sergey Shelukhin 
> wrote:
> >>
> >> Hi.
> >> For whatever strange reason I cannot choose Apache Curator as the
> project
> >> when creating a JIRA (I can create in other projects).
> >> So, I figure I'd post it here.
> >>
> >> Presently, one can supply an ACLProvider when creating CuratorFramework.
> >> That is used to set ACLs when creating directories.
> >> For consistency, it would be nice to have an option to enforce the same
> >> ACLs for the case when the directory already exists.
> >>
> >> In a secure scenario, one could assume that the ACLs on his secure nodes
> >> would be set to whatever ACLProvider dictates; however, if someone has
> >> created the node before with a different ACL (say, giving themselves
> >> access), this assumption would be broken. As of now, the ACLs need to be
> >> verified or enforced manually.
> >>
> >>
> >
>
>


Re: adding ACL enforcement based on ACLProvider, for consistency

2016-04-22 Thread Corey Nolet
Not sure if this is related but a bunch of projects in the Apache JIRA got
hit with a strange series of Spam messages in newly created JIRA tickets
yesterday. I know Infra adjusted some of the permissions for users as a
result.

On Fri, Apr 22, 2016 at 10:48 AM, Jordan Zimmerman <
jor...@jordanzimmerman.com> wrote:

> I suggest you contact Apache about the issue. You’re the first who’s
> mentioned any problems. For this type of feature request, you’ll need to
> write the change and send a Pull Request. A Jira issue is vital. Please
> keep trying and engage Apache if you continue to have trouble.
>
> -Jordan
>
> > On Apr 21, 2016, at 7:40 PM, Sergey Shelukhin 
> wrote:
> >
> > Hi.
> > For whatever strange reason I cannot choose Apache Curator as the project
> > when creating a JIRA (I can create in other projects).
> > So, I figure I'd post it here.
> >
> > Presently, one can supply an ACLProvider when creating CuratorFramework.
> > That is used to set ACLs when creating directories.
> > For consistency, it would be nice to have an option to enforce the same
> > ACLs for the case when the directory already exists.
> >
> > In a secure scenario, one could assume that the ACLs on his secure nodes
> > would be set to whatever ACLProvider dictates; however, if someone has
> > created the node before with a different ACL (say, giving themselves
> > access), this assumption would be broken. As of now, the ACLs need to be
> > verified or enforced manually.
> >
> >
>
>


Re: [jira] [Deleted] (ARROW-117) AVG SUPPORT 18004923958 avg Antivirus tech support number avg Antivirus Help Desk Number customer care

2016-04-21 Thread Corey Nolet
Nevermind, I just noticed the message from infrastructure. Looks like it
affected a bunch of projects.

On Thu, Apr 21, 2016 at 10:07 PM, Corey Nolet <cjno...@gmail.com> wrote:

> Should someone request that this account be suspended until further
> notice? It looks like this person may have been compromised.
>
> On Thu, Apr 21, 2016 at 7:08 PM, Daniel Takamori (JIRA) <j...@apache.org>
> wrote:
>
>>
>>  [
>> https://issues.apache.org/jira/browse/ARROW-117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
>> ]
>>
>> Daniel Takamori deleted ARROW-117:
>> --
>>
>>
>> > AVG SUPPORT   18004923958 avg Antivirus tech support number avg
>> Antivirus Help Desk Number customer care
>> >
>> 
>> >
>> > Key: ARROW-117
>> > URL: https://issues.apache.org/jira/browse/ARROW-117
>> > Project: Apache Arrow
>> >  Issue Type: Test
>> > Environment: -avg phone number===avg Antivirus tech support
>> PHONE NUMBER Helpline tollfree
>> >Reporter: poonamsingh
>> >  Labels: features, github-import, patch, test
>> >   Original Estimate: 12h
>> >  Remaining Estimate: 12h
>> >

Re: Apache Flink

2016-04-17 Thread Corey Nolet
Peyman,

I'm sorry, I missed the comment that microbatching was a waste of time. Did
someone mention this? I know this thread got pretty long so I may have
missed it somewhere above.

My comment about Spark's microbatching being a downside is strictly in
reference to CEP. Complex CEP flows are reactive, and the batched streaming
technique that Spark's architecture uses does not lend itself well to
programming real-time reactive designs. The thing is, many good streaming
engines start with just that: the streaming engine. They start at the core
with an architecture that promotes tuple-at-a-time processing. Whatever they
build on top of that is strictly to make other use cases easier to
implement; hence the main difference between Flink and Spark.

Storm, Esper and InfoSphere Streams are three examples of this that come to
mind very quickly. All three of them are powerful tuple-at-a-time stream
processing engines under the hood, and all three also have abstractions
built on top of that core that make it easier to implement more specialized
and more batch-oriented processing paradigms. Flink is similar to this.

I hope you didn't take as an attack my comment that Spark's microbatching
does not follow, at its core, the traditional design that most well-accepted
stream processing frameworks have had in the past. I am not implying that
microbatching is not useful in some use cases. What I am implying is that
it does not make real-time reactive environments very easy to implement.



On Sun, Apr 17, 2016 at 8:49 PM, Peyman Mohajerian <mohaj...@gmail.com>
wrote:

> Microbatching is certainly not a waste of time; you are making way too
> strong a statement. In fact, in certain cases one tuple at a time
> makes no sense; it all depends on the use case. In fact, if you understand
> the history of the project Storm you would know that microbatching was
> added later in Storm, Trident, and it is specifically for
> microbatching/windowing.
> In certain cases you are doing aggregation/windowing and throughput is the
> dominant design consideration, and you don't care what each individual
> event/tuple does; e.g. if you push different event types to separate Kafka
> topics and all you care about is a count, what is the need for single-event
> processing?
>
> On Sun, Apr 17, 2016 at 12:43 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> i have not been intrigued at all by the microbatching concept in Spark. I
>> am used to CEP in real streams processing environments like Infosphere
>> Streams & Storm where the granularity of processing is at the level of each
>> individual tuple and processing units (workers) can react immediately to
>> events being received and processed. The closest Spark streaming comes to
>> this concept is the notion of "state" that that can be updated via the
>> "updateStateBykey()" functions which are only able to be run in a
>> microbatch. Looking at the expected design changes to Spark Streaming in
>> Spark 2.0.0, it also does not look like tuple-at-a-time processing is on
>> the radar for Spark, though I have seen articles stating that more effort
>> is going to go into the Spark SQL layer in Spark streaming which may make
>> it more reminiscent of Esper.
>>
>> For these reasons, I have not even tried to implement CEP in Spark. I
>> feel it's a waste of time without immediate tuple-at-a-time processing.
>> Without this, they avoid the whole problem of "back pressure" (though keep
>> in mind, it is still very possible to overload the Spark streaming layer
>> with stages that will continue to pile up and never get worked off) but
>> they lose the granular control that you get in CEP environments by allowing
>> the rules & processors to react with the receipt of each tuple, right away.
>>
>> Awhile back, I did attempt to implement an InfoSphere Streams-like API
>> [1] on top of Apache Storm as an example of what such a design may look
>> like. It looks like Storm is going to be replaced in the not so distant
>> future by Twitter's new design called Heron. IIRC, Heron does not have an
>> open source implementation as of yet.
>>
>> [1] https://github.com/calrissian/flowmix
>>
>> On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Corey,
>>>
>>> Can you please point me to docs on using Spark for CEP? Do we have a set
>>> of CEP libraries somewhere. I am keen on getting hold of adaptor libraries
>>> for Spark something like below
>>>
>>>
>>>
>>> ​
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>&g

Re: Apache Flink

2016-04-17 Thread Corey Nolet
i have not been intrigued at all by the microbatching concept in Spark. I
am used to CEP in real streams processing environments like Infosphere
Streams & Storm where the granularity of processing is at the level of each
individual tuple and processing units (workers) can react immediately to
events being received and processed. The closest Spark streaming comes to
this concept is the notion of "state" that that can be updated via the
"updateStateBykey()" functions which are only able to be run in a
microbatch. Looking at the expected design changes to Spark Streaming in
Spark 2.0.0, it also does not look like tuple-at-a-time processing is on
the radar for Spark, though I have seen articles stating that more effort
is going to go into the Spark SQL layer in Spark streaming which may make
it more reminiscent of Esper.

For these reasons, I have not even tried to implement CEP in Spark. I feel
it's a waste of time without immediate tuple-at-a-time processing. Without
this, they avoid the whole problem of "back pressure" (though keep in mind,
it is still very possible to overload the Spark streaming layer with stages
that will continue to pile up and never get worked off), but they lose the
granular control that you get in CEP environments, where the rules &
processors can react upon receipt of each tuple, right away.

A while back, I did attempt to implement an InfoSphere Streams-like API [1]
on top of Apache Storm as an example of what such a design may look like.
It looks like Storm is going to be replaced in the not so distant future by
Twitter's new design called Heron. IIRC, Heron does not have an open source
implementation as of yet.

[1] https://github.com/calrissian/flowmix

On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Corey,
>
> Can you please point me to docs on using Spark for CEP? Do we have a set
> of CEP libraries somewhere. I am keen on getting hold of adaptor libraries
> for Spark something like below
>
>
>
> ​
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 April 2016 at 16:07, Corey Nolet <cjno...@gmail.com> wrote:
>
>> One thing I've noticed about Flink in my following of the project has
>> been that it has established, in a few cases, some novel ideas and
>> improvements over Spark. The problem with it, however, is that both the
>> development team and the community around it are very small and many of
>> those novel improvements have been rolled directly into Spark in subsequent
>> versions. I was considering changing over my architecture to Flink at one
>> point to get better, more real-time CEP streaming support, but in the end I
>> decided to stick with Spark and just watch Flink continue to pressure it
>> into improvement.
>>
>> On Sun, Apr 17, 2016 at 11:03 AM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> i never found much info that flink was actually designed to be fault
>>> tolerant. if fault tolerance is more bolt-on/add-on/afterthought then that
>>> doesn't bode well for large scale data processing. spark was designed with
>>> fault tolerance in mind from the beginning.
>>>
>>> On Sun, Apr 17, 2016 at 9:52 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I read the benchmark published by Yahoo. Obviously they already use
>>>> Storm and inevitably very familiar with that tool. To start with although
>>>> these benchmarks were somehow interesting IMO, it lend itself to an
>>>> assurance that the tool chosen for their platform is still the best choice.
>>>> So inevitably the benchmarks and the tests were done to support
>>>> primary their approach.
>>>>
>>>> In general anything which is not done through TCP Council or similar
>>>> body is questionable..
>>>> Their argument is that because Spark handles data streaming in micro
>>>> batches then inevitably it introduces this in-built latency as per design.
>>>> In contrast, both Storm and Flink do not (at the face value) have this
>>>> issue.
>>>>
>>>> In addition as we already know Spark has far more capabilities compared
>>>> to Flink (know nothing about Storm). So really it boils down to the
>>>> business SLA to choose which tool one wants to deploy for your use case.
>>>> IMO Sp

Re: Apache Flink

2016-04-17 Thread Corey Nolet
One thing I've noticed about Flink in my following of the project has been
that it has established, in a few cases, some novel ideas and improvements
over Spark. The problem with it, however, is that both the development team
and the community around it are very small and many of those novel
improvements have been rolled directly into Spark in subsequent versions. I
was considering changing over my architecture to Flink at one point to get
better, more real-time CEP streaming support, but in the end I decided to
stick with Spark and just watch Flink continue to pressure it into
improvement.

On Sun, Apr 17, 2016 at 11:03 AM, Koert Kuipers  wrote:

> i never found much info that flink was actually designed to be fault
> tolerant. if fault tolerance is more bolt-on/add-on/afterthought then that
> doesn't bode well for large scale data processing. spark was designed with
> fault tolerance in mind from the beginning.
>
> On Sun, Apr 17, 2016 at 9:52 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I read the benchmark published by Yahoo. Obviously they already use Storm
>> and inevitably very familiar with that tool. To start with although these
>> benchmarks were somehow interesting IMO, it lend itself to an assurance
>> that the tool chosen for their platform is still the best choice. So
>> inevitably the benchmarks and the tests were done to support primary their
>> approach.
>>
>> In general anything which is not done through TCP Council or similar body
>> is questionable..
>> Their argument is that because Spark handles data streaming in micro
>> batches then inevitably it introduces this in-built latency as per design.
>> In contrast, both Storm and Flink do not (at the face value) have this
>> issue.
>>
>> In addition as we already know Spark has far more capabilities compared
>> to Flink (know nothing about Storm). So really it boils down to the
>> business SLA to choose which tool one wants to deploy for your use case.
>> IMO Spark micro batching approach is probably OK for 99% of use cases. If
>> we had in built libraries for CEP for Spark (I am searching for it), I
>> would not bother with Flink.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 April 2016 at 12:47, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.ma...@inria.fr> wrote:
>>
>>> You probably read this benchmark at Yahoo, any comments from Spark?
>>>
>>> https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>>>
>>>
>>> On 17 Apr 2016, at 12:41, andy petrella  wrote:
>>>
>>> Just adding one thing to the mix: `that the latency for streaming data
>>> is eliminated` is insane :-D
>>>
>>> On Sun, Apr 17, 2016 at 12:19 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
  It seems that Flink argues that the latency for streaming data is
 eliminated whereas with Spark RDD there is this latency.

 I noticed that Flink does not support interactive shell much like Spark
 shell where you can add jars to it to do kafka testing. The advice was to
 add the streaming Kafka jar file to CLASSPATH but that does not work.

 Most Flink documentation also rather sparce with the usual example of
 word count which is not exactly what you want.

 Anyway I will have a look at it further. I have a Spark Scala streaming
 Kafka program that works fine in Spark and I want to recode it using Scala
 for Flink with Kafka but have difficulty importing and testing libraries.

 Cheers

 Dr Mich Talebzadeh


 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *


 http://talebzadehmich.wordpress.com



 On 17 April 2016 at 02:41, Ascot Moss  wrote:

> I compared both last month, seems to me that Flink's MLLib is not yet
> ready.
>
> On Sun, Apr 17, 2016 at 12:23 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks Ted. I was wondering if someone is using both :)
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 16 April 2016 at 17:08, Ted Yu  wrote:
>>
>>> Looks like this question is more relevant on flink mailing list :-)
>>>
>>> On Sat, Apr 16, 2016 at 8:52 AM, Mich Talebzadeh <
>>> 

Re: Understanding "shared" memory implications

2016-03-19 Thread Corey Nolet
 we start with trusting so we don't get too wrapped up
> > in all the extra complexities of security. My experience with these
> things
> > is that a lot of users will frequently pick performance or footprint over
> > security for quite some time. For example, if I recall correctly, on the
> > shared file descriptor model that was initially implemented in the HDFS
> > client, that people used short-circuit reads for years before security
> was
> > correctly implemented. (Am I remembering this right?)
> >
> > Lastly, as I mentioned above, I don't think there should be any
> requirement
> > that Arrow communication be limited to only 'IPC'. As Todd points out, in
> > many cases unix domain sockets will be just fine.
> >
> > We need to implement both models because we all know that locality will
> > never be guaranteed. The IPC design/implementation needs to be good for
> > anything to make into arrow.
> >
> > thanks
> > Jacques
> >
> >
> >
> > On Wed, Mar 16, 2016 at 8:54 AM, Zhe Zhang <z...@apache.org> wrote:
> >
> >> I have similar concerns as Todd stated below. With an mmap-based
> approach,
> >> we are treating shared memory objects like files. This brings in all
> >> filesystem related considerations like ACL and lifecycle mgmt.
> >>
> >> Stepping back a little, the shared-memory work isn't really specific to
> >> Arrow. A few questions related to this:
> >> 1) Has the topic been discussed in the context of protobuf (or other IPC
> >> protocols) before? Seems Cap'n Proto (https://capnproto.org/) has
> >> zero-copy
> >> shared memory. I haven't read implementation detail though.
> >> 2) If the shared-memory work benefits a wide range of protocols, should
> it
> >> be a generalized and standalone library?
> >>
> >> Thanks,
> >> Zhe
> >>
> >> On Tue, Mar 15, 2016 at 8:30 PM Todd Lipcon <t...@cloudera.com> wrote:
> >>
> >>> Having thought about this quite a bit in the past, I think the
> mechanics
> >> of
> >>> how to share memory are by far the easiest part. The much harder part
> is
> >>> the resource management and ownership. Questions like:
> >>>
> >>> - if you are using an mmapped file in /dev/shm/, how do you make sure
> it
> >>> gets cleaned up if the process crashes?
> >>> - how do you allocate memory to it? there's nothing ensuring that
> >> /dev/shm
> >>> doesn't swap out if you try to put too much in there, and then your
> >>> in-memory super-fast access will basically collapse under swap
> thrashing
> >>> - how do you do lifecycle management across the two processes? If, say,
> >>> Kudu wants to pass a block of data to some Python program, how does it
> >> know
> >>> when the Python program is done reading it and it should be deleted?
> What
> >>> if the python program crashed in the middle - when can Kudu release it?
> >>> - how do you do security? If both sides of the connection don't trust
> >> each
> >>> other, and use length prefixes and offsets, you have to be constantly
> >>> validating and re-validating everything you read.
> >>>
> >>> Another big factor is that shared memory is not, in my experience,
> >>> immediately faster than just copying data over a unix domain socket. In
> >>> particular, the first time you read an mmapped file, you'll end up
> paying
> >>> minor page fault overhead on every page. This can be improved with
> >>> HugePages, but huge page mmaps are not supported yet in current Linux
> >> (work
> >>> going on currently to address this). So you're left with hugetlbfs,
> which
> >>> involves static allocations and much more pain.
> >>>
> >>> All the above is a long way to say: let's make sure we do the right
> >>> prototyping and up-front design before jumping into code.
> >>>
> >>> -Todd
> >>>
> >>>
> >>>
> >>> On Tue, Mar 15, 2016 at 5:54 PM, Jacques Nadeau <jacq...@apache.org>
> >>> wrote:
> >>>
> >>>> @Corey
> >>>> The POC Steven and Wes are working on is based on MappedBuffer but I'm
> >>>> looking at using netty's fork of tcnative to use shared memory
> >> directly.
> >>>>
> >>>> @Yiannis
> >>>> We need to have both RPC and a shared memory mechanisms (what I'm
> >>

Re: unsubscribe

2016-03-19 Thread Corey Nolet
Gerald,

In order to unsubscribe from this list, you need to send an email to
user-unsubscr...@hadoop.apache.org.

On Wed, Mar 16, 2016 at 4:39 AM, Gerald-G  wrote:

>
>


Re: Understanding "shared" memory implications

2016-03-15 Thread Corey Nolet
I was seeing Netty's unsafe classes being used here, not MappedByteBuffer.
I'm not sure if that statement is completely correct, but I'll have to
dig through the code again to figure that out.

The more I look at Unsafe, the more it makes sense why that would be
used. Apparently it's also supposed to be included in Java 9 as a
first-class API.
On Mar 15, 2016 7:03 PM, "Wes McKinney"  wrote:

> My understanding is that you can use java.nio.MappedByteBuffer to work
> with memory-mapped files as one way to share memory pages between Java
> (and non-Java) processes without copying.
>
> I am hoping that we can reach a POC of zero-copy Arrow memory sharing
> Java-to-Java and Java-to-C++ in the near future. Indeed this will have
> huge implications once we get it working end to end (for example,
> receiving memory from a Java process in Python without a heavy ser-de
> step -- it's what we've always dreamed of) and with the metadata and
> shared memory control flow standardized.
>
> - Wes
>
> On Wed, Mar 9, 2016 at 9:25 PM, Corey J Nolet  wrote:
> > If I understand correctly, Arrow is using Netty underneath which is
> using Sun's Unsafe API in order to allocate direct byte buffers off heap.
> It is using Netty to communicate between "client" and "server", information
> about memory addresses for data that is being requested.
> >
> > I've never attempted to use the Unsafe API to access off heap memory
> that has been allocated in one JVM from another JVM but I'm assuming this
> must be the case in order to claim that the memory is being accessed
> directly without being copied, correct?
> >
> > The implication here is huge. If the memory is being directly shared
> across processes by them being allowed to directly reach into the direct
> byte buffers, that's true shared memory. Otherwise, if there's copies going
> on, it's less appealing.
> >
> >
> > Thanks.
> >
> > Sent from my iPad
>
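
A minimal sketch (mine, not from the thread, in Scala) of the java.nio.MappedByteBuffer
approach described above: two processes that map the same file see the same memory pages
without any copying or ser-de step. The path and region size are placeholder assumptions.

    import java.io.RandomAccessFile
    import java.nio.channels.FileChannel

    object MmapSketch {
      def main(args: Array[String]): Unit = {
        // Map a 1 MB region of a file; a second process mapping the same file
        // reads the same pages directly, with no serialization or copy.
        val file = new RandomAccessFile("/dev/shm/arrow-poc-buffer", "rw")
        val buffer = file.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024)

        buffer.putInt(0, 42)      // the "writer" side
        println(buffer.getInt(0)) // a "reader" mapping the same file sees the same value
        file.close()
      }
    }

The hard parts Todd lists (cleanup, lifecycle, security) are exactly what this sketch
does not address; it only shows the mechanics of sharing the pages.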


Re: Welcome to our new Pig PMC member Xuefu Zhang

2016-03-04 Thread Corey Nolet
Congrats!

On Thu, Mar 3, 2016 at 4:40 AM, Lorand Bendig  wrote:

> Congratulations!
> --Lorand
> On Feb 24, 2016 22:30, "Rohini Palaniswamy" 
> wrote:
>
> > It is my pleasure to announce that Xuefu Zhang is our newest addition to
> > the Pig PMC. Xuefu is a long time committer of Pig and has been actively
> > involved in driving the Pig on Spark effort for the past year.
> >
> > Please join me in congratulating Xuefu !!!
> >
> > Regards,
> > Rohini
> >
>


Re: Shuffle guarantees

2016-03-01 Thread Corey Nolet
Never mind: a look at the ExternalSorter class tells me that the iterator for
each key, which is only partially ordered, ends up being merge-sorted and
compared for equality after the fact. I wanted to post my finding here for
others who may have the same question.


On Tue, Mar 1, 2016 at 3:05 PM, Corey Nolet <cjno...@gmail.com> wrote:

> The reason I'm asking, I see a comment in the ExternalSorter class that
> says this:
>
> "If we need to aggregate by key, we either use a total ordering from the
> ordering parameter, or read the keys with the same hash code and compare
> them with each other for equality to merge values".
>
> How can this be assumed if the object used for the key, for instance, in
> the case where a HashPartitioner is used, cannot assume ordering and
> therefore cannot assume a comparator can be used?
>
> On Tue, Mar 1, 2016 at 2:56 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> So if I'm using reduceByKey() with a HashPartitioner, I understand that
>> the hashCode() of my key is used to create the underlying shuffle files.
>>
>> Is anything other than hashCode() used in the shuffle files when the data
>> is pulled into the reducers and run through the reduce function? The reason
>> I'm asking is because there's a possibility of hashCode() colliding in two
>> different objects which end up hashing to the same number, right?
>>
>>
>>
>
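
A small standalone sketch (not from the thread) of the conclusion above: two distinct
keys that share a hash code are routed to the same shuffle partition, but the equality
check during the merge keeps their aggregated values separate.

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object HashCollisionCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("hash-collision-check").setMaster("local[2]"))

        // "Aa" and "BB" are distinct Strings with the same hashCode (2112),
        // so the HashPartitioner sends them to the same partition.
        assert("Aa".hashCode == "BB".hashCode)

        val counts = sc.parallelize(Seq(("Aa", 1), ("Aa", 1), ("BB", 1)))
          .reduceByKey(new HashPartitioner(4), _ + _)
          .collectAsMap()

        // The merge compares keys with equals(), not just hashCode(),
        // so the collision does not mix the two counts together.
        assert(counts("Aa") == 2 && counts("BB") == 1)
        sc.stop()
      }
    }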


Re: Shuffle guarantees

2016-03-01 Thread Corey Nolet
The reason I'm asking, I see a comment in the ExternalSorter class that
says this:

"If we need to aggregate by key, we either use a total ordering from the
ordering parameter, or read the keys with the same hash code and compare
them with each other for equality to merge values".

How can this be assumed if the object used for the key, for instance, in
the case where a HashPartitioner is used, cannot assume ordering and
therefore cannot assume a comparator can be used?

On Tue, Mar 1, 2016 at 2:56 PM, Corey Nolet <cjno...@gmail.com> wrote:

> So if I'm using reduceByKey() with a HashPartitioner, I understand that
> the hashCode() of my key is used to create the underlying shuffle files.
>
> Is anything other than hashCode() used in the shuffle files when the data
> is pulled into the reducers and run through the reduce function? The reason
> I'm asking is because there's a possibility of hashCode() colliding in two
> different objects which end up hashing to the same number, right?
>
>
>


Shuffle guarantees

2016-03-01 Thread Corey Nolet
So if I'm using reduceByKey() with a HashPartitioner, I understand that the
hashCode() of my key is used to create the underlying shuffle files.

Is anything other than hashCode() used in the shuffle files when the data
is pulled into the reducers and run through the reduce function? The reason
I'm asking is because there's a possibility of hashCode() colliding in two
different objects which end up hashing to the same number, right?


Re: Shuffle memory woes

2016-02-08 Thread Corey Nolet
I sure do! [1] And yes- I'm really hoping they will chime in, otherwise I
may dig a little deeper myself and start posting some jira tickets.

[1] http://www.slideshare.net/cjnolet

On Mon, Feb 8, 2016 at 3:02 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> It's interesting to see what spark dev people will say.
> Corey do you have presentation available online?
>
> On 8 February 2016 at 05:16, Corey Nolet <cjno...@gmail.com> wrote:
>
>> Charles,
>>
>> Thank you for chiming in and I'm glad someone else is experiencing this
>> too and not just me. I know very well how the Spark shuffles work and I've
>> done deep dive presentations @ Spark meetups in the past. This problem is
>> something that goes beyond that and, I believe, it exposes a fundamental
>> paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
>> it can be fixed.
>>
>> Also- in regards to how much data actually gets shuffled- believe it or
>> not this problem can take a 30-40 minute job and make it run for 4 or more
>> hours. If I let the job run for 4+ hours, the amount of data being shuffled
>> for this particular dataset will be 100 or more TB. Usually, however, I end
>> up killing the job long before that point because I realize it should not
>> be taking this long. The particular dataset we're doing is not for
>> real-time exploration. These are very large joins we're doing for jobs that
>> we run a few times a day.
>>
>> On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xpnc54byp...@gmail.com>
>> wrote:
>>
>>>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>>>
>>> -- I have had the same experiences, although not to this extreme (the
>>> spills were < 10T while the input was ~ 100s gb) and haven't found any
>>> solution yet. I don't believe this is related to input data format. in my
>>> case, I got my input data by loading from Hive tables.
>>>
>>> On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:
>>>
>>>> Hi,Corey:
>>>>    "The dataset is 100gb at most, the spills can up to 10T-100T", Are
>>>> your input files lzo format, and you use sc.text() ? If memory is not
>>>> enough, spark will spill 3-4x of input data to disk.
>>>>
>>>>
>>>> -- 原始邮件 --
>>>> *发件人:* "Corey Nolet";<cjno...@gmail.com>;
>>>> *发送时间:* 2016年2月7日(星期天) 晚上8:56
>>>> *收件人:* "Igor Berman"<igor.ber...@gmail.com>;
>>>> *抄送:* "user"<user@spark.apache.org>;
>>>> *主题:* Re: Shuffle memory woes
>>>>
>>>> As for the second part of your questions- we have a fairly complex join
>>>> process which requires a ton of stage orchestration from our driver. I've
>>>> written some code to be able to walk down our DAG tree and execute siblings
>>>> in the tree concurrently where possible (forcing cache to disk on children
>>>> that have multiple children themselves so that they can be run
>>>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>>>> keeping tasks as busy as possible processing concurrent stages. Funny
>>>> enough though, the stage that is causing problems with shuffling for us has
>>>> a lot of children and doesn't even run concurrently with any other stages
>>>> so I ruled out the concurrency of the stages as a culprit for the
>>>> shuffling problem we're seeing.
>>>>
>>>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> Igor,
>>>>>
>>>>> I don't think the question is "why can't it fit stuff in memory". I
>>>>> know why it can't fit stuff in memory- because it's a large dataset that
>>>>> needs to have a reduceByKey() run on it. My understanding is that when it
>>>>> doesn't fit into memory it needs to spill in order to consolidate
>>>>> intermediary files into a single file. The more data you need to run
>>>>> through this, the more it will need to spill. My finding is that once it
>>>>> gets stuck in this spill chain with our dataset it's all over @ that point
>>>>> because it will spill and spill and spill and spill and spill. If I give
>>>>> the shuffle enough memory it won't- irrespective of the number of
>>>>> partitions we have (i've done everything from repartition(500) to
>>>>> repartition(250

Re: Welcoming two new committers

2016-02-08 Thread Corey Nolet
Congrats guys!

On Mon, Feb 8, 2016 at 12:23 PM, Ted Yu  wrote:

> Congratulations, Herman and Wenchen.
>
> On Mon, Feb 8, 2016 at 9:15 AM, Matei Zaharia 
> wrote:
>
>> Hi all,
>>
>> The PMC has recently added two new Spark committers -- Herman van Hovell
>> and Wenchen Fan. Both have been heavily involved in Spark SQL and Tungsten,
>> adding new features, optimizations and APIs. Please join me in welcoming
>> Herman and Wenchen.
>>
>> Matei
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>
>


Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
As for the second part of your questions- we have a fairly complex join
process which requires a ton of stage orchestration from our driver. I've
written some code to be able to walk down our DAG tree and execute siblings
in the tree concurrently where possible (forcing cache to disk on children
that have multiple children themselves so that they can be run
concurrently). Ultimately, we have seen significant speedup in our jobs by
keeping tasks as busy as possible processing concurrent stages. Funny
enough though, the stage that is causing problems with shuffling for us has
a lot of children and doesn't even run concurrently with any other stages
so I ruled out the concurrency of the stages as a culprit for the
shuffling problem we're seeing.

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:

> Igor,
>
> I don't think the question is "why can't it fit stuff in memory". I know
> why it can't fit stuff in memory- because it's a large dataset that needs
> to have a reduceByKey() run on it. My understanding is that when it doesn't
> fit into memory it needs to spill in order to consolidate intermediary
> files into a single file. The more data you need to run through this, the
> more it will need to spill. My finding is that once it gets stuck in this
> spill chain with our dataset it's all over @ that point because it will
> spill and spill and spill and spill and spill. If I give the shuffle enough
> memory it won't- irrespective of the number of partitions we have (i've
> done everything from repartition(500) to repartition(2500)). It's not a
> matter of running out of memory on a single node because the data is
> skewed. It's more a matter of the shuffle buffer filling up and needing to
> spill. I think what may be happening is that it gets to a point where it's
> spending more time reading/writing from disk while doing the spills than it
> is actually processing any data. I can tell this because I can see that the
> spills sometimes get up into the 10's to 100's of TB where the input data
> was maybe 100gb at most. Unfortunately my code is on a private internal
> network and I'm not able to share it.
>
> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> so can you provide code snippets: especially it's interesting to see what
>> are your transformation chain, how many partitions are there on each side
>> of shuffle operation
>>
>> the question is why it can't fit stuff in memory when you are shuffling -
>> maybe your partitioner on "reduce" side is not configured properly? I mean
>> if map side is ok, and you just reducing by key or something it should be
>> ok, so some detail is missing...skewed data? aggregate by key?
>>
>> On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Igor,
>>>
>>> Thank you for the response but unfortunately, the problem I'm referring
>>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>>> map side but didn't do much for the spilling when there was no longer any
>>> memory left for the shuffle. Also the new auto-memory management doesn't
>>> seem like it'll have too much of an effect after i've already given most
>>> the memory i've allocated to the shuffle. The problem I'm having is most
>>> specifically related to the shuffle performance declining by several orders
>>> of magnitude when it needs to spill multiple times (it ends up spilling
>>> several hundred for me when it can't fit stuff into memory).
>>>
>>>
>>>
>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> usually you can solve this by 2 steps
>>>> make rdd to have more partitions
>>>> play with shuffle memory fraction
>>>>
>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>> automatically
>>>>
>>>> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> I just recently had a discovery that my jobs were taking several hours
>>>>> to complete because of excess shuffle spills. What I found was that when
>>>>> I hit the high point where I didn't have enough memory for the shuffles to
>>>>> store all of their file consolidations at once, it could spill so many
>>>>> times that it causes my job's runtime to increase by orders of magnitude
>>>>> (and sometimes fail altogether).
>>>>>
>>
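
The orchestration code described above is on a private network, so the following is only
a rough sketch of the general idea (all names are illustrative): materialize sibling
branches of the DAG from separate futures so Spark's scheduler can interleave their
stages and keep executors busy.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Fire one action per sibling branch; the concurrent jobs let independent
    // stages run at the same time instead of strictly one after another.
    def runSiblingsConcurrently(siblings: Seq[RDD[_]]): Unit = {
      val jobs = siblings.map { rdd =>
        Future {
          rdd.persist(StorageLevel.DISK_ONLY) // force shared children to disk, as described
          rdd.count()                         // any action that materializes the branch
        }
      }
      Await.result(Future.sequence(jobs), Duration.Inf)
    }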

Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
Igor,

I don't think the question is "why can't it fit stuff in memory". I know
why it can't fit stuff in memory- because it's a large dataset that needs
to have a reduceByKey() run on it. My understanding is that when it doesn't
fit into memory it needs to spill in order to consolidate intermediary
files into a single file. The more data you need to run through this, the
more it will need to spill. My finding is that once it gets stuck in this
spill chain with our dataset it's all over @ that point because it will
spill and spill and spill and spill and spill. If I give the shuffle enough
memory it won't- irrespective of the number of partitions we have (i've
done everything from repartition(500) to repartition(2500)). It's not a
matter of running out of memory on a single node because the data is
skewed. It's more a matter of the shuffle buffer filling up and needing to
spill. I think what may be happening is that it gets to a point where it's
spending more time reading/writing from disk while doing the spills than it
is actually processing any data. I can tell this because I can see that the
spills sometimes get up into the 10's to 100's of TB where the input data
was maybe 100gb at most. Unfortunately my code is on a private internal
network and I'm not able to share it.

On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> so can you provide code snippets: especially it's interesting to see what
> are your transformation chain, how many partitions are there on each side
> of shuffle operation
>
> the question is why it can't fit stuff in memory when you are shuffling -
> maybe your partitioner on "reduce" side is not configured properly? I mean
> if map side is ok, and you just reducing by key or something it should be
> ok, so some detail is missing...skewed data? aggregate by key?
>
> On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:
>
>> Igor,
>>
>> Thank you for the response but unfortunately, the problem I'm referring
>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>> map side but didn't do much for the spilling when there was no longer any
>> memory left for the shuffle. Also the new auto-memory management doesn't
>> seem like it'll have too much of an effect after i've already given most
>> the memory i've allocated to the shuffle. The problem I'm having is most
>> specifically related to the shuffle performance declining by several orders
>> of magnitude when it needs to spill multiple times (it ends up spilling
>> several hundred for me when it can't fit stuff into memory).
>>
>>
>>
>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> usually you can solve this by 2 steps
>>> make rdd to have more partitions
>>> play with shuffle memory fraction
>>>
>>> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>>>
>>> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>>>
>>>> I just recently had a discovery that my jobs were taking several hours
>>>> to complete because of excess shuffle spills. What I found was that when
>>>> I hit the high point where I didn't have enough memory for the shuffles to
>>>> store all of their file consolidations at once, it could spill so many
>>>> times that it causes my job's runtime to increase by orders of magnitude
>>>> (and sometimes fail altogether).
>>>>
>>>> I've played with all the tuning parameters I can find. To speed the
>>>> shuffles up, I tuned the akka threads to different values. I also tuned the
>>>> shuffle buffering a tad (both up and down).
>>>>
>>>> I feel like I see a weak point here. The mappers are sharing memory
>>>> space with reducers and the shuffles need enough memory to consolidate and
>>>> pull otherwise they will need to spill and spill and spill. What i've
>>>> noticed about my jobs is that this is a difference between them taking 30
>>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>>
>>>> I've found that, as a result of the spilling, I'm better off not
>>>> caching any data in memory and lowering my storage fraction to 0 and still
>>>> hoping I was able to give my shuffles enough memory that my data doesn't
>>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>>> because it seems like it forces the memory limits on my job- otherwise it
>>>> could take orders of magnitude longer to execute.
>>>>
>>>>
>>>
>>
>


Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
Charles,

Thank you for chiming in and I'm glad someone else is experiencing this too
and not just me. I know very well how the Spark shuffles work and I've done
deep dive presentations @ Spark meetups in the past. This problem is
something that goes beyond that and, I believe, it exposes a fundamental
paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
it can be fixed.

Also- in regards to how much data actually gets shuffled- believe it or not
this problem can take a 30-40 minute job and make it run for 4 or more
hours. If I let the job run for 4+ hours, the amount of data being shuffled
for this particular dataset will be 100 or more TB. Usually, however, I end
up killing the job long before that point because I realize it should not
be taking this long. The particular dataset we're doing is not for
real-time exploration. These are very large joins we're doing for jobs that
we run a few times a day.

On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xpnc54byp...@gmail.com> wrote:

>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>
> -- I have had the same experiences, although not to this extreme (the
> spills were < 10T while the input was ~ 100s gb) and haven't found any
> solution yet. I don't believe this is related to input data format. in my
> case, I got my input data by loading from Hive tables.
>
> On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:
>
>> Hi,Corey:
>>"The dataset is 100gb at most, the spills can up to 10T-100T", Are
>> your input files lzo format, and you use sc.text() ? If memory is not
>> enough, spark will spill 3-4x of input data to disk.
>>
>>
>> -- 原始邮件 --
>> *发件人:* "Corey Nolet";<cjno...@gmail.com>;
>> *发送时间:* 2016年2月7日(星期天) 晚上8:56
>> *收件人:* "Igor Berman"<igor.ber...@gmail.com>;
>> *抄送:* "user"<user@spark.apache.org>;
>> *主题:* Re: Shuffle memory woes
>>
>> As for the second part of your questions- we have a fairly complex join
>> process which requires a ton of stage orchestration from our driver. I've
>> written some code to be able to walk down our DAG tree and execute siblings
>> in the tree concurrently where possible (forcing cache to disk on children
>> that have multiple children themselves so that they can be run
>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>> keeping tasks as busy as possible processing concurrent stages. Funny
>> enough though, the stage that is causing problems with shuffling for us has
>> a lot of children and doesn't even run concurrently with any other stages
>> so I ruled out the concurrency of the stages as a culprit for the
>> shuffling problem we're seeing.
>>
>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Igor,
>>>
>>> I don't think the question is "why can't it fit stuff in memory". I know
>>> why it can't fit stuff in memory- because it's a large dataset that needs
>>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>>> fit into memory it needs to spill in order to consolidate intermediary
>>> files into a single file. The more data you need to run through this, the
>>> more it will need to spill. My finding is that once it gets stuck in this
>>> spill chain with our dataset it's all over @ that point because it will
>>> spill and spill and spill and spill and spill. If I give the shuffle enough
>>> memory it won't- irrespective of the number of partitions we have (i've
>>> done everything from repartition(500) to repartition(2500)). It's not a
>>> matter of running out of memory on a single node because the data is
>>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>>> spill. I think what may be happening is that it gets to a point where it's
>>> spending more time reading/writing from disk while doing the spills than it
>>> is actually processing any data. I can tell this because I can see that the
>>> spills sometimes get up into the 10's to 100's of TB where the input data
>>> was maybe 100gb at most. Unfortunately my code is on a
>>> private internal network and I'm not able to share it.
>>>
>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> so can you provide code snippets: especially it's interesting to see
>>>> what are your transformation chain, how many partitions are there on each
>>>> side of shuffle operation
>>&

Re: Help needed in deleting a message posted in Spark User List

2016-02-06 Thread Corey Nolet
The whole purpose of Apache mailing lists is that the messages get indexed
all over the web so that discussions and questions/solutions can be
searched easily by google and other engines.

For this reason, and because the messages are also sent out via email as Steve
pointed out, it's just not possible to retract the messages.

On Sat, Feb 6, 2016 at 10:21 AM, Steve Loughran 
wrote:

>
> > On 5 Feb 2016, at 17:35, Marcelo Vanzin  wrote:
> >
> > You don't... just send a new one.
> >
> > On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
> >  wrote:
> >> Hi,
> >>
> >> I want to edit/delete a message posted in Spark User List. How do I do
> that?
> >>
> >> Thanks!
> >
> >
> >
>
> it isn't technically possible
>
> http://apache.org/foundation/public-archives.html
>
> People do occasionally ask on the infrastructure mailing list to do
> this, but they aren't in a position to do anything about the copies that
> end up in the mailboxes of every subscriber.
>
> Don't worry about it; we've all done things like post internal stack
> traces, accidentally mail the wrong list, etc, etc.
>
> Now, accidentally breaking the nightly build of everything, that's
> somewhat embarrassing —but you haven't done that and it's been ~4 months
> since I've done that myself.
>
>
> -Steve


Re: Shuffle memory woes

2016-02-06 Thread Corey Nolet
Igor,

Thank you for the response but unfortunately, the problem I'm referring to
goes beyond this. I have set the shuffle memory fraction to be 90% and set
the cache memory to be 0. Repartitioning the RDD helped a tad on the map
side but didn't do much for the spilling when there was no longer any
memory left for the shuffle. Also the new auto-memory management doesn't
seem like it'll have too much of an effect after i've already given most
the memory i've allocated to the shuffle. The problem I'm having is most
specifically related to the shuffle performance declining by several orders
of magnitude when it needs to spill multiple times (it ends up spilling
several hundred for me when it can't fit stuff into memory).



On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> Hi,
> usually you can solve this by 2 steps
> make rdd to have more partitions
> play with shuffle memory fraction
>
> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>
> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I just recently had a discovery that my jobs were taking several hours to
>> complete because of excess shuffle spills. What I found was that when I
>> hit the high point where I didn't have enough memory for the shuffles to
>> store all of their file consolidations at once, it could spill so many
>> times that it causes my job's runtime to increase by orders of magnitude
>> (and sometimes fail altogether).
>>
>> I've played with all the tuning parameters I can find. To speed the
>> shuffles up, I tuned the akka threads to different values. I also tuned the
>> shuffle buffering a tad (both up and down).
>>
>> I feel like I see a weak point here. The mappers are sharing memory space
>> with reducers and the shuffles need enough memory to consolidate and pull
>> otherwise they will need to spill and spill and spill. What i've noticed
>> about my jobs is that this is a difference between them taking 30 minutes
>> and 4 hours or more. Same job- just different memory tuning.
>>
>> I've found that, as a result of the spilling, I'm better off not caching
>> any data in memory and lowering my storage fraction to 0 and still hoping I
>> was able to give my shuffles enough memory that my data doesn't
>> continuously spill. Is this the way it's supposed to be? It makes it hard
>> because it seems like it forces the memory limits on my job- otherwise it
>> could take orders of magnitude longer to execute.
>>
>>
>


Shuffle memory woes

2016-02-05 Thread Corey Nolet
I just recently had a discovery that my jobs were taking several hours to
complete because of excess shuffle spills. What I found was that when I
hit the high point where I didn't have enough memory for the shuffles to
store all of their file consolidations at once, it could spill so many
times that it causes my job's runtime to increase by orders of magnitude
(and sometimes fail altogether).

I've played with all the tuning parameters I can find. To speed the
shuffles up, I tuned the akka threads to different values. I also tuned the
shuffle buffering a tad (both up and down).

I feel like I see a weak point here. The mappers are sharing memory space
with reducers and the shuffles need enough memory to consolidate and pull
otherwise they will need to spill and spill and spill. What i've noticed
about my jobs is that this is a difference between them taking 30 minutes
and 4 hours or more. Same job- just different memory tuning.

I've found that, as a result of the spilling, I'm better off not caching
any data in memory and lowering my storage fraction to 0 and still hoping I
was able to give my shuffles enough memory that my data doesn't
continuously spill. Is this the way it's supposed to be? It makes it hard
because it seems like it forces the memory limits on my job- otherwise it
could take orders of magnitude longer to execute.
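
For reference, a sketch of the legacy (pre-1.6) memory knobs this thread is about. The
values mirror what is described above (most memory to the shuffle, storage fraction at 0)
and are not a general recommendation; in Spark 1.6+ the unified memory manager
(spark.memory.fraction) rebalances the shuffle/storage split automatically.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")
      // Legacy memory model (Spark <= 1.5): shuffle and storage are separate pools.
      .set("spark.shuffle.memoryFraction", "0.9")  // give most of the heap to the shuffle
      .set("spark.storage.memoryFraction", "0.0")  // no RDD caching, as described above
      .set("spark.shuffle.spill.compress", "true") // compress spill files to cut disk I/O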


Re: ROSE: Spark + R on the JVM.

2016-01-12 Thread Corey Nolet
David,

Thank you very much for announcing this! It looks like it could be very
useful. Would you mind providing a link to the github?

On Tue, Jan 12, 2016 at 10:03 AM, David 
wrote:

> Hi all,
>
> I'd like to share news of the recent release of a new Spark package, ROSE.
>
>
> ROSE is a Scala library offering access to the full scientific computing
> power of the R programming language to Apache Spark batch and streaming
> applications on the JVM. Where Apache SparkR lets data scientists use Spark
> from R, ROSE is designed to let Scala and Java developers use R from Spark.
>
> The project is available and documented on GitHub and I would encourage
> you to take a look. Any feedback, questions etc very welcome.
>
> David
>
> "All that is gold does not glitter, Not all those who wander are lost."
>



Re: Hetergeneous Hadoop Cluster

2015-09-25 Thread Corey Nolet
I'm basically referring to federating multiple namenodes (connecting two
different hdfs instances under a single namespace so data can be
distributed across them). Here's the documentation for Hadoop 2.6.0 [1]

[1]
https://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/Federation.html


On Fri, Sep 25, 2015 at 12:42 AM, Ashish Kumar9 <ashis...@in.ibm.com> wrote:

> This is interesting. Can you share any blog/document that talks about
> multi-volume HDFS instances?
>
> Thanks and Regards,
> Ashish Kumar
>
>
> From:Corey Nolet <cjno...@gmail.com>
> To:user@hadoop.apache.org
> Date:09/24/2015 10:40 PM
> Subject:Re: Hetergeneous Hadoop Cluster
> --
>
>
>
> If the hardware is drastically different, I would think a multi-volume
> HDFS instance would be a good idea (put like-hardware in the same volumes).
>
> On Mon, Sep 21, 2015 at 3:29 PM, Tushar Kapila <*tgkp...@gmail.com*
> <tgkp...@gmail.com>> wrote:
> Would only matter if OS specific communication was being used between
> nodes. I assume they do not do that.
> If that is true -> It would depend on the network and each nodes config
> for the work it is doing. Cluster performance would not suffer just because
> it is heterogeneous.
>
> On Mon, Sep 21, 2015 at 8:09 PM, Ashish Kumar9 <*ashis...@in.ibm.com*
> <ashis...@in.ibm.com>> wrote:
> Hi :
>
> Has anyone tried a heterogeneous hadoop cluster with management nodes and
> data nodes running on multiple linux distros. and on multiple h/w
> architecture .
>
> If so , what is the performance of such cluster .
>
> Thanks
> Ashish
>
>
>
> --
> Regards
> Tushar Kapila
>
>


Re: Hetergeneous Hadoop Cluster

2015-09-24 Thread Corey Nolet
If the hardware is drastically different, I would think a multi-volume HDFS
instance would be a good idea (put like-hardware in the same volumes).

On Mon, Sep 21, 2015 at 3:29 PM, Tushar Kapila  wrote:

> Would only matter if OS specific communication was being used between
> nodes. I assume they do not do that.
> If that is true -> It would depend on the network and each nodes config
> for the work it is doing. Cluster performance would not suffer just because
> it is heterogeneous.
>
> On Mon, Sep 21, 2015 at 8:09 PM, Ashish Kumar9 
> wrote:
>
>> Hi :
>>
>> Has anyone tried a heterogeneous hadoop cluster with management nodes and
>> data nodes running on multiple linux distros. and on multiple h/w
>> architecture .
>>
>> If so , what is the performance of such cluster .
>>
>> Thanks
>> Ashish
>
>
>
>
> --
> Regards
> Tushar Kapila
>


Re: Forecasting Library For Apache Spark

2015-09-21 Thread Corey Nolet
Mohamed,

Have you checked out the Spark Timeseries [1] project? Non-seasonal ARIMA
was added to this recently and seasonal ARIMA should be following shortly.

[1] https://github.com/cloudera/spark-timeseries

On Mon, Sep 21, 2015 at 7:47 AM, Mohamed Baddar 
wrote:

> Hello everybody, this is my first mail on the list, and I would like to
> introduce myself first :)
> My name is Mohamed Baddar. I work as a Big Data and Analytics Software
> Engineer at BADRIT (http://badrit.com/), a software startup with a focus on
> Big Data. I have also been working for 6+ years at IBM R&D Egypt, in the
> HPC, Big Data and Analytics areas.
>
> I just have a question: I can't find a supported Apache Spark library for
> forecasting using ARIMA, ETS, Bayesian models or any other method. Are there
> any plans for such a development? I can't find any issue talking about it.
> Is anyone interested in having/developing a related module? I find it a
> critical feature to be added to Spark.
>
> Thanks
>


Re: Mini Accumulo Cluster reusing the directory

2015-09-16 Thread Corey Nolet
Sven,

What version of Accumulo are you running? We have a ticket for this [1]
which has had a lot of discussion on it. Christopher Tubbs mentioned that
he had gotten this to work.

[1] https://issues.apache.org/jira/browse/ACCUMULO-1378

On Wed, Sep 16, 2015 at 9:20 AM, Sven Hodapp  wrote:

> Hi there,
>
> is it possible for MiniAccumuloCluster to reuse a given directory?
> Sadly, I haven't found anything in the docs?
>
> I’ll fire up my instance like this:
>
>val dict = new File("/tmp/accumulo-mini-cluster")
>val accumulo = new MiniAccumuloCluster(dict, "test")
>
> If I restart my JVM, it will raise an error like this:
>
>Exception in thread "main" java.lang.IllegalArgumentException:
> Directory /tmp/accumulo-mini-cluster is not empty
>
> It would be nice if the data could survive a JVM restart so that the folder
> structure does not have to be reconstructed every time.
>
> Thanks a lot!
>
> Regards,
> Sven
>
> --
> Sven Hodapp M.Sc.,
> Fraunhofer Institute for Algorithms and Scientific Computing SCAI,
> Department of Bioinformatics
> Schloss Birlinghoven, 53754 Sankt Augustin, Germany
> sven.hod...@scai.fraunhofer.de
> www.scai.fraunhofer.de
>


Re: Mini Accumulo Cluster reusing the directory

2015-09-16 Thread Corey Nolet
Christopher, my reply to Sven was just going off of your last comment on
the ticket. Perhaps I misread the comment.

On Wed, Sep 16, 2015 at 10:50 AM, Christopher <ctubb...@apache.org> wrote:

> Hi Sven, I think Corey was mistaken. I don't recall working on that,
> exactly.
>
> On Wed, Sep 16, 2015, 10:40 Sven Hodapp <sven.hod...@scai.fraunhofer.de>
> wrote:
>
>> Hi Corey,
>>
>> thanks for your reply and the link. Sounds good, if that will be
>> available in the future!
>> Is the code from Christopher somewhere deployed?
>>
>> Currently I'm using version 1.7
>>
>> Regards,
>> Sven
>>
>> - Ursprüngliche Mail -
>> > Von: "Corey Nolet" <cjno...@gmail.com>
>> > An: "user" <user@accumulo.apache.org>
>> > Gesendet: Mittwoch, 16. September 2015 16:31:02
>> > Betreff: Re: Mini Accumulo Cluster reusing the directory
>>
>> > Sven,
>> >
>> > What version of Accumulo are you running? We have a ticket for this [1]
>> > which has had a lot of discussion on it. Christopher Tubbs mentioned
>> that
>> > he had gotten this to work.
>> >
>> > [1] https://issues.apache.org/jira/browse/ACCUMULO-1378
>> >
>> > On Wed, Sep 16, 2015 at 9:20 AM, Sven Hodapp <
>> sven.hod...@scai.fraunhofer.de
>> >> wrote:
>> >
>> >> Hi there,
>> >>
>> >> is it possible for MiniAccumuloCluster to reuse a given directory?
>> >> Sadly, I haven't found anything in the docs?
>> >>
>> >> I’ll fire up my instance like this:
>> >>
>> >>val dict = new File("/tmp/accumulo-mini-cluster")
>> >>val accumulo = new MiniAccumuloCluster(dict, "test")
>> >>
>> >> If I’ll restart my JVM it will raise a error like this:
>> >>
>> >>Exception in thread "main" java.lang.IllegalArgumentException:
>> >> Directory /tmp/accumulo-mini-cluster is not empty
>> >>
>> >> It would be nice if the data can survive a JVM restart and the folder
>> >> structure must not be constructed every time.
>> >>
>> >> Thanks a lot!
>> >>
>> >> Regards,
>> >> Sven
>> >>
>> >> --
>> >> Sven Hodapp M.Sc.,
>> >> Fraunhofer Institute for Algorithms and Scientific Computing SCAI,
>> >> Department of Bioinformatics
>> >> Schloss Birlinghoven, 53754 Sankt Augustin, Germany
>> >> sven.hod...@scai.fraunhofer.de
>> >> www.scai.fraunhofer.de
>>
>


Re: MongoDB and Spark

2015-09-11 Thread Corey Nolet
Unfortunately, MongoDB does not directly expose its locality via its client
API so the problem with trying to schedule Spark tasks against it is that
the tasks themselves cannot be scheduled locally on nodes containing query
results, which means you have to assume most results will be sent over the
network to the task that needs to process them. This is bad. The other reason
(which is also related to the issue of locality) is that I'm not sure if
there's an easy way to spread the results of a query over multiple
different clients- thus you'd probably have to start your Spark RDD with a
single partition and then repartition. What you've done at that point is
you've taken data from multiple mongodb nodes and you've collected them on
a single node just to re-partition them, again across the network, onto
multiple nodes. This is also bad.

I think this is the reason it was recommended to use MongoDB's mapreduce
because they can use their locality information internally. I had this same
issue w/ Couchbase a couple years back- it's unfortunate but it's the
reality.




On Fri, Sep 11, 2015 at 9:34 AM, Sandeep Giri 
wrote:

> I think it should be possible by loading collections as RDD and then doing
> a union on them.
>
> Regards,
> Sandeep Giri,
> +1 347 781 4573 (US)
> +91-953-899-8962 (IN)
>
> www.KnowBigData.com. 
> Phone: +1-253-397-1945 (Office)
>
> [image: linkedin icon]  [image:
> other site icon]   [image: facebook icon]
>  [image: twitter icon]
>  
>
>
> On Fri, Sep 11, 2015 at 3:40 PM, Mishra, Abhishek <
> abhishek.mis...@xerox.com> wrote:
>
>> Anything using Spark RDD’s ???
>>
>>
>>
>> Abhishek
>>
>>
>>
>> *From:* Sandeep Giri [mailto:sand...@knowbigdata.com]
>> *Sent:* Friday, September 11, 2015 3:19 PM
>> *To:* Mishra, Abhishek; u...@spark.apache.org; dev@spark.apache.org
>> *Subject:* Re: MongoDB and Spark
>>
>>
>>
>> use map-reduce.
>>
>>
>>
>> On Fri, Sep 11, 2015, 14:32 Mishra, Abhishek 
>> wrote:
>>
>> Hello ,
>>
>>
>>
>> Is there any way to query multiple collections from mongodb using spark
>> and java.  And i want to create only one Configuration Object. Please help
>> if anyone has something regarding this.
>>
>>
>>
>>
>>
>> Thank You
>>
>> Abhishek
>>
>>
>
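
A sketch of the "load each collection as an RDD and union them" suggestion, assuming the
mongo-hadoop connector (com.mongodb.hadoop.MongoInputFormat) is on the classpath; the URIs
are placeholders. The locality caveat above still applies: results are pulled over the
network either way.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.{SparkConf, SparkContext}
    import com.mongodb.hadoop.MongoInputFormat
    import org.bson.BSONObject

    val sc = new SparkContext(new SparkConf().setAppName("mongo-union").setMaster("local[2]"))

    // One RDD per collection, built through the Hadoop InputFormat bridge.
    def collectionRdd(uri: String) = {
      val hadoopConf = new Configuration()
      hadoopConf.set("mongo.input.uri", uri)
      sc.newAPIHadoopRDD(hadoopConf, classOf[MongoInputFormat],
        classOf[Object], classOf[BSONObject])
    }

    val combined = collectionRdd("mongodb://host:27017/db.collectionA")
      .union(collectionRdd("mongodb://host:27017/db.collectionB"))
    println(combined.count())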



Re: What is the reason for ExecutorLostFailure?

2015-08-18 Thread Corey Nolet
Usually more information as to the cause of this will be found down in your
logs. I generally see this happen when an out of memory exception has
occurred for one reason or another on an executor. It's possible your
memory settings are too small per executor or the concurrent number of
tasks you are running is too large for some of the executors. Other times,
RDD functions like groupBy(), which collect an unbounded number of items
into memory, could be causing it.

Either way, the logs for the executors should be able to give you some
insight, have you looked at those yet?

On Tue, Aug 18, 2015 at 6:26 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io
 wrote:

 Hi All

 Why am I getting ExecutorLostFailure and executors are completely lost
 for the rest of the processing? Eventually it makes the job fail. One thing
 for sure is that a lot of shuffling happens across executors in my program.

 Is there a way to understand and debug ExecutorLostFailure? Any pointers
 regarding “ExecutorLostFailure” would help me a lot.

 Thanks
 Vijay
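
To illustrate the groupBy() point above (the names are made up, not from the original
job): grouping materializes every value for a key in one executor's memory, while a
combining aggregation keeps the per-key state small.

    import org.apache.spark.rdd.RDD

    def totalsPerKey(events: RDD[(String, Long)]): RDD[(String, Long)] = {
      // Risky: all values for a key are buffered in memory before the sum,
      // which is a common way to lose an executor on skewed keys.
      // events.groupByKey().mapValues(_.sum)

      // Safer: values are combined map-side and merged during the shuffle.
      events.reduceByKey(_ + _)
    }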



Re: Newbie question: what makes Spark run faster than MapReduce

2015-08-07 Thread Corey Nolet
1) Spark only needs to shuffle when data needs to be partitioned around the
workers in an all-to-all fashion.
2) Multi-stage jobs that would normally require several MapReduce jobs, with
data dumped to disk between the jobs, can instead keep their intermediate
results cached in memory (see the sketch below).
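
A small sketch of point 2 (the path and field positions are made up): the result of the
first stage stays cached in memory and is reused by two downstream jobs, where chained
MapReduce jobs would have written it to HDFS and read it back.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("multi-stage").setMaster("local[2]"))

    val parsed = sc.textFile("/data/events.csv") // placeholder input
      .map(_.split(","))
      .filter(_.length > 2)
      .cache()                                   // kept in memory, not dumped back to disk

    // Both jobs reuse the cached stage; only the reduceByKey steps shuffle.
    val byUser = parsed.map(fields => (fields(0), 1L)).reduceByKey(_ + _)
    val byType = parsed.map(fields => (fields(1), 1L)).reduceByKey(_ + _)
    println(byUser.count() + " users, " + byType.count() + " event types")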


SparkConf ignoring keys

2015-08-05 Thread Corey Nolet
I've been using SparkConf on my project for quite some time now to store
configuration information for its various components. This has worked very
well thus far in situations where I have control over the creation of the
SparkContext and the SparkConf.

I have run into a bit of a problem trying to integrate this same approach
to the use of the shell, however. I have a bunch of properties in a
properties file that are shared across several different types of
applications (web containers, etc...) but the SparkConf ignores these
properties because they aren't prefixed with spark.*

Is this really necessary? It doesn't really stop people from adding their
own properties, and it limits the power of being able to utilize one central
configuration object.
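
A workaround sketch (not from the original post; the path and prefix are assumptions):
re-prefix the shared application properties before handing them to SparkConf, since only
spark.* keys survive the default loading path used by the shell.

    import java.io.FileInputStream
    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.spark.SparkConf

    val props = new Properties()
    props.load(new FileInputStream("/etc/myapp/app.properties")) // shared, non-Spark config

    val conf = new SparkConf()
    props.asScala.foreach { case (key, value) =>
      // Keys without the spark. prefix get dropped along the way, so tuck them
      // under a namespace that the framework will carry along.
      val sparkKey = if (key.startsWith("spark.")) key else "spark.myapp." + key
      conf.set(sparkKey, value)
    }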


Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode

2015-07-28 Thread Corey Nolet
On Tue, Jul 28, 2015 at 2:17 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 Thanks Corey for your answer,

 Do you mean that final status : SUCCEEDED in terminal logs means that
 YARN RM could clean the resources after the application has finished
 (application finishing does not necessarily mean succeeded or failed) ?

 Correct.


 With that logic it totally makes sense.

 Basically the YARN logs does not say anything about the Spark job itself.
 It just says that Spark job resources have been cleaned up after the job
 completed and returned back to Yarn.


If you have log aggregation enabled on your cluster, the yarn logs command
should give you any exceptions that were thrown in the driver / executors
when you are running in yarn cluster mode. If you were running in
yarn-client mode, you'd see the errors that caused a job to fail in your
local log (errors that would cause a job to fail will be caught by the
SparkContext on the driver) because the driver is running locally instead
of being deployed in a yarn container. Also, using the Spark HistoryServer
will give you a more visual insight into the exact problems (like which
partitions failed, which executors died trying to process them, etc...)



 It would be great if Yarn logs could also say about the consequence of the
 job, because the user is interested in more about the job final status.


This is just an artifact of running with yarn-cluster mode. It's still easy
enough to run the yarn logs command to see all the logs (you can grep for
the node designated as the application master to find any exceptions in
your driver that may show you why your application failed).  The
HistoryServer would still give you enough information after the fact to see
the failures.

Generally, I submit my jobs in yarn-client mode while I'm testing so that I
can spot errors right away. I generally only use yarn-cluster mode for jobs
that are deployed onto operational hardware- that way if a job does fail, I
can still use yarn logs to find out why, but I don't need a local process
running on the machine that submitted the job taking up resources (see the
spark.yarn.submit.waitAppCompletion property introduced in Spark 1.4).

I'll also caveat my response and say that I have not used Spark's Python
API so I can only give you a general overview of how the Yarn integration
works from the Scala point of view.


Hope this helps.


 Yarn related logs can be found in RM ,NM, DN, NN log files in detail.

 Thanks again.

 On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet cjno...@gmail.com wrote:

 Elkhan,

 What does the ResourceManager say about the final status of the job?
 Spark jobs that run as Yarn applications can fail but still successfully
 clean up their resources and give them back to the Yarn cluster. Because of
 this, there's a difference between your code throwing an exception in an
 executor/driver and the Yarn application failing. Generally you'll see a
 yarn application fail when there's a memory problem (too much memory being
 allocated or not enough causing executors to fail multiple times not
 allowing your job to finish).

 What I'm seeing from your post is that you had an exception in your
 application which was caught by the Spark framework which then proceeded to
 clean up the job and shut itself down- which it did successfully. When you
 aren't running in the Yarn modes, you aren't seeing any Yarn status that's
 telling you the Yarn application was successfully shut down, you are just
 seeing the failure(s) from your drivers/executors.



 On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Any updates on this bug ?

 Why do the Spark log results & job final status not match? (one saying
 that the job has failed, another stating that the job has succeeded)

 Thanks.


 On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi all,

 While running Spark Word count python example with intentional mistake
 in *Yarn cluster mode*, Spark terminal states final status as
 SUCCEEDED, but log files state correct results indicating that the job
 failed.

 Why do the terminal log output & application log output contradict each other?

 If i run same job on *local mode* then terminal logs and application
 logs match, where both state that job has failed to expected error in
 python script.

 More details: Scenario

 While running Spark Word count python example on *Yarn cluster mode*,
 if I make intentional error in wordcount.py by changing this line (I'm
 using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0
 versions - which i tested):

 lines = sc.textFile(sys.argv[1], 1)

 into this line:

 lines = sc.textFile(*nonExistentVariable*,1)

 where nonExistentVariable variable was never created and initialized.

 then i run that example with this command (I put README.md into HDFS
 before running this command):

 *./bin/spark-submit --master yarn-cluster wordcount.py /README.md*

 The job runs and finishes successfully according the log

Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode

2015-07-27 Thread Corey Nolet
Elkhan,

What does the ResourceManager say about the final status of the job?  Spark
jobs that run as Yarn applications can fail but still successfully clean up
their resources and give them back to the Yarn cluster. Because of this,
there's a difference between your code throwing an exception in an
executor/driver and the Yarn application failing. Generally you'll see a
yarn application fail when there's a memory problem (too much memory being
allocated or not enough causing executors to fail multiple times not
allowing your job to finish).

What I'm seeing from your post is that you had an exception in your
application which was caught by the Spark framework which then proceeded to
clean up the job and shut itself down- which it did successfully. When you
aren't running in the Yarn modes, you aren't seeing any Yarn status that's
telling you the Yarn application was successfully shut down, you are just
seeing the failure(s) from your drivers/executors.



On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 Any updates on this bug ?

 Why do the Spark log results & job final status not match? (one saying that
 the job has failed, another stating that the job has succeeded)

 Thanks.


 On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi all,

 While running Spark Word count python example with intentional mistake in 
 *Yarn
 cluster mode*, Spark terminal states final status as SUCCEEDED, but log
 files state correct results indicating that the job failed.

 Why do the terminal log output & application log output contradict each other?

 If i run same job on *local mode* then terminal logs and application
 logs match, where both state that job has failed to expected error in
 python script.

 More details: Scenario

 While running Spark Word count python example on *Yarn cluster mode*, if
 I make intentional error in wordcount.py by changing this line (I'm using
 Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions -
 which i tested):

 lines = sc.textFile(sys.argv[1], 1)

 into this line:

 lines = sc.textFile(*nonExistentVariable*,1)

 where nonExistentVariable variable was never created and initialized.

 then i run that example with this command (I put README.md into HDFS
 before running this command):

 *./bin/spark-submit --master yarn-cluster wordcount.py /README.md*

 The job runs and finishes successfully according the log printed in the
 terminal :
 *Terminal logs*:
 ...
 15/07/23 16:19:17 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:18 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:19 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:20 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:21 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: FINISHED)
 15/07/23 16:19:21 INFO yarn.Client:
  client token: N/A
  diagnostics: Shutdown hook called before final status was reported.
  ApplicationMaster host: 10.0.53.59
  ApplicationMaster RPC port: 0
  queue: default
  start time: 1437693551439
  final status: *SUCCEEDED*
  tracking URL:
 http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1
  user: edadashov
 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called
 15/07/23 16:19:21 INFO util.Utils: Deleting directory
 /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444

 But if look at log files generated for this application in HDFS - it
 indicates failure of the job with correct reason:
 *Application log files*:
 ...
 \00 stdout\00 179Traceback (most recent call last):
   File wordcount.py, line 32, in module
 lines = sc.textFile(nonExistentVariable,1)
 *NameError: name 'nonExistentVariable' is not defined*


 Why does the terminal output - final status: *SUCCEEDED* - not match the
 application log result - failure of the job (NameError: name
 'nonExistentVariable' is not defined)?

 Is this a bug? Is there a JIRA ticket related to this issue? (Is someone
 assigned to it?)

 If I run this wordcount.py example (with the mistaken line) in local mode,
 then the terminal log also states that the job has failed.

 *./bin/spark-submit wordcount.py /README.md*

 *Terminal logs*:

 ...
 15/07/23 16:31:55 INFO scheduler.EventLoggingListener: Logging events to
 hdfs:///app-logs/local-1437694314943
 Traceback (most recent call last):
   File "/home/edadashov/tools/myspark/spark/wordcount.py", line 32, in
 <module>
 lines = sc.textFile(nonExistentVariable,1)
 NameError: name 'nonExistentVariable' is not defined
 15/07/23 16:31:55 INFO spark.SparkContext: Invoking stop() from shutdown
 hook


 Thanks.




 --

 Best regards,
 Elkhan Dadashov



MapType vs StructType

2015-07-17 Thread Corey Nolet
I notice JSON objects are all parsed as Map[String,Any] in Jackson, but for
some reason the inferSchema tools in Spark SQL extract the schema of
nested JSON objects as StructTypes.

This makes it really confusing when trying to rectify the object hierarchy
when I have maps because the Catalyst conversion layer underneath is
expecting a Row or Product and not a Map.

Why wasn't MapType used here? Is there any significant difference between
the two of these types that would cause me not to use a MapType when I'm
constructing my own schema representing a set of nested Map[String,_]'s?


Re: MapType vs StructType

2015-07-17 Thread Corey Nolet
This helps immensely. Thanks Michael!
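
For anyone else hitting this, a minimal sketch of the override Michael
describes below (assuming Spark 1.4+, an existing sqlContext, and a made-up
people.json whose "attributes" field is a nested object):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("attributes", MapType(StringType, StringType))  // nested object as a map
))

val df = sqlContext.read.schema(schema).json("people.json")
df.printSchema()  // attributes comes back as map<string,string> instead of an inferred struct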

On Fri, Jul 17, 2015 at 4:33 PM, Michael Armbrust mich...@databricks.com
wrote:

 I'll add there is a JIRA to override the default past some threshold of #
 of unique keys: https://issues.apache.org/jira/browse/SPARK-4476

 On Fri, Jul 17, 2015 at 1:32 PM, Michael Armbrust mich...@databricks.com
 wrote:

 The difference between a map and a struct here is that in a struct all
 possible keys are defined as part of the schema and each can have a
 different type (and we don't support union types).  JSON doesn't have
 differentiated data structures so we go with the one that gives you more
 information when doing inference by default.  If you pass in a schema to
 JSON however, you can override this and have a JSON object parsed as a map.

 On Fri, Jul 17, 2015 at 11:02 AM, Corey Nolet cjno...@gmail.com wrote:

 I notice JSON objects are all parsed as Map[String,Any] in Jackson but
 for some reason, the inferSchema tools in Spark SQL extracts the schema
 of nested JSON objects as StructTypes.

 This makes it really confusing when trying to rectify the object
 hierarchy when I have maps because the Catalyst conversion layer underneath
 is expecting a Row or Product and not a Map.

 Why wasn't MapType used here? Is there any significant difference
 between the two of these types that would cause me not to use a MapType
 when I'm constructing my own schema representing a set of nested
 Map[String,_]'s?








Re: Post 1.5.3 and 1.6.3

2015-07-06 Thread Corey Nolet
+1 on the happy hour!

On Mon, Jul 6, 2015 at 5:58 PM, Eric Newton eric.new...@gmail.com wrote:

 More importantly, when are we going to have a happy hour to celebrate?

 -Eric


 On Mon, Jul 6, 2015 at 4:04 PM, Josh Elser josh.el...@gmail.com wrote:

  Thanks to the efforts spearheaded by Christopher and verified by everyone
  else, we now have 1.5.3 and 1.6.3 releases!
 
  To keep the ball rolling, what's next? High level questions that come to
  mind...
 
  * When do we do 1.7.1 and/or 1.8.0?
  * What bug-fixes do we have outstanding for 1.7.1?
  * What other minor improvements do people want for 1.8.0?
  * Where does 2.0.0 stand? Should we make a bigger effort to getting the
  new client API stuff Christopher had started into Apache?
 
  Feel free to brainstorm here and/or on JIRA (tagging relevant issues to
  the desired fixVersion)
 
  - Josh
 



Re: map vs mapPartitions

2015-06-25 Thread Corey Nolet
Also,

I've noticed that .map() actually creates a MapPartitionsRDD under the
hood. So I think the real difference is just in the API that's being
exposed. You can do a map() and not have to think about the partitions at
all, or you can do a .mapPartitions() and be able to do things like chunking
the data in the partition (fetching more than 1 record @ a time).
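
A minimal sketch of that difference, assuming rdd: RDD[String]:

val lengths = rdd.map(_.length)                        // one record at a time

val batchedSums = rdd.mapPartitions { it =>
  // the whole partition's iterator is available, so records can be chunked
  it.grouped(100).map(batch => batch.map(_.length).sum)
}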

On Thu, Jun 25, 2015 at 12:19 PM, Corey Nolet cjno...@gmail.com wrote:

 I don't know exactly what's going on under the hood but I would not assume
 that just because a whole partition is not being pulled into memory @ one
 time that that means each record is being pulled at 1 time. That's the
 beauty of exposing Iterators & Iterables in an API rather than collections -
 there's a bunch of buffering that can be hidden from the user to make the
 iterations as efficient as they can be.

 On Thu, Jun 25, 2015 at 11:36 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Yes, 1 partition per core, and mapPartitions applies the function on each
 partition.

 The question is: does the complete partition load into memory so that the
 function can be applied to it, or is it an iterator where iterator.next()
 loads the next record? And if so, how is it more efficient than map, which
 also works on 1 record at a time?


 Is the only difference that the while loop below runs per record, as in
 map, while the code above it runs once per partition?


 public Iterable<Integer> call(Iterator<String> input)
 throws Exception {
 List<Integer> output = new ArrayList<Integer>();
 while (input.hasNext()) {
 output.add(input.next().length());
  }


 So if I don't have any heavy code above the while loop, performance will be
 the same as with the map function.



 On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren inv...@gmail.com wrote:

 It's not the number of executors that matters, but the # of the CPU
 cores of your cluster.

 Each partition will be loaded on a core for computing.

 e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD in 24
 partitions (24 tasks for narrow dependency).
 Then all the 24 partitions will be loaded to your cluster in parallel,
 one on each core.
 You may notice that some tasks will finish more quickly than others. So
 divide the RDD into (2~3) x (# of cores) for better pipeline performance.
 Say we have 72 partitions in your RDD, then initially 24 tasks run on 24
 cores, then first done first served until all 72 tasks are processed.

 Back to your original question, map and mapPartitions are both
 transformation, but on different granularity.
 map = apply the function on each record in each partition.
 mapPartitions = apply the function on each partition.
 But the rule is the same, one partition per core.

 Hope it helps.
 Hao




 On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Say the source is HDFS and the file is divided into 10 partitions. So what
 will the input contain?

 public Iterable<Integer> call(Iterator<String> input)

 Say I have 10 executors in the job, each having a single partition.

 Will it have some part of the partition or the complete partition? And if
 only part, when I call input.next() will it fetch the rest, or how is it
 handled?





 On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote:

 No, or at least, it depends on how the source of the partitions was
 implemented.

 On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
 shushantaror...@gmail.com wrote:
  Does mapPartitions keep the complete partition in the executor's memory
  as an iterable?
 
  JavaRDD<String> rdd = jsc.textFile(path);
  JavaRDD<Integer> output = rdd.mapPartitions(new
  FlatMapFunction<Iterator<String>, Integer>() {
 
  public Iterable<Integer> call(Iterator<String> input)
  throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while (input.hasNext()) {
  output.add(input.next().length());
  }
  return output;
  }
 
  });
 
 
  Here, is input present in memory, and can it contain a complete
  partition of GBs?
  Will this function call(Iterator<String> input) be called only
  number-of-partitions times (say 10 in this example), and not
  number-of-lines times (say 1000)?
 
 
  And what's the use of mapPartitionsWithIndex?
 
  Thanks
 





 --
 Hao Ren

 Data Engineer @ leboncoin

 Paris, France






Re: map vs mapPartitions

2015-06-25 Thread Corey Nolet
I don't know exactly what's going on under the hood but I would not assume
that just because a whole partition is not being pulled into memory @ one
time that that means each record is being pulled at 1 time. That's the
beauty of exposing Iterators & Iterables in an API rather than collections -
there's a bunch of buffering that can be hidden from the user to make the
iterations as efficient as they can be.

On Thu, Jun 25, 2015 at 11:36 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Yes, 1 partition per core, and mapPartitions applies the function on each
 partition.

 The question is: does the complete partition load into memory so that the
 function can be applied to it, or is it an iterator where iterator.next()
 loads the next record? And if so, how is it more efficient than map, which
 also works on 1 record at a time?


 Is the only difference that the while loop below runs per record, as in
 map, while the code above it runs once per partition?


 public Iterable<Integer> call(Iterator<String> input)
 throws Exception {
 List<Integer> output = new ArrayList<Integer>();
 while (input.hasNext()) {
 output.add(input.next().length());
  }


 So if I don't have any heavy code above the while loop, performance will be
 the same as with the map function.



 On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren inv...@gmail.com wrote:

 It's not the number of executors that matters, but the # of the CPU cores
 of your cluster.

 Each partition will be loaded on a core for computing.

 e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD in 24
 partitions (24 tasks for narrow dependency).
 Then all the 24 partitions will be loaded to your cluster in parallel,
 one on each core.
 You may notice that some tasks will finish more quickly than others. So
 divide the RDD into (2~3) x (# of cores) for better pipeline performance.
 Say we have 72 partitions in your RDD, then initially 24 tasks run on 24
 cores, then first done first served until all 72 tasks are processed.

 Back to your original question, map and mapPartitions are both
 transformation, but on different granularity.
 map = apply the function on each record in each partition.
 mapPartitions = apply the function on each partition.
 But the rule is the same, one partition per core.

 Hope it helps.
 Hao




 On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Say the source is HDFS and the file is divided into 10 partitions. So what
 will the input contain?

 public Iterable<Integer> call(Iterator<String> input)

 Say I have 10 executors in the job, each having a single partition.

 Will it have some part of the partition or the complete partition? And if
 only part, when I call input.next() will it fetch the rest, or how is it
 handled?





 On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote:

 No, or at least, it depends on how the source of the partitions was
 implemented.

 On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
 shushantaror...@gmail.com wrote:
  Does mapPartitions keep the complete partition in the executor's memory
  as an iterable?
 
  JavaRDD<String> rdd = jsc.textFile(path);
  JavaRDD<Integer> output = rdd.mapPartitions(new
  FlatMapFunction<Iterator<String>, Integer>() {
 
  public Iterable<Integer> call(Iterator<String> input)
  throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while (input.hasNext()) {
  output.add(input.next().length());
  }
  return output;
  }
 
  });
 
 
  Here, is input present in memory, and can it contain a complete
  partition of GBs?
  Will this function call(Iterator<String> input) be called only
  number-of-partitions times (say 10 in this example), and not
  number-of-lines times (say 1000)?
 
 
  And what's the use of mapPartitionsWithIndex?
 
  Thanks
 





 --
 Hao Ren

 Data Engineer @ leboncoin

 Paris, France





Reducer memory usage

2015-06-21 Thread Corey Nolet
I've seen a few places where it's been mentioned that after a shuffle each
reducer needs to pull its partition into memory in its entirety. Is this
true? I'd assume the merge sort that needs to be done (in the cases where
sortByKey() is not used) wouldn't need to pull all of the data into memory
at once... is it the sort for the sortByKey() that requires this to be done?


Re: Grouping elements in a RDD

2015-06-20 Thread Corey Nolet
If you use rdd.mapPartitions(), you'll be able to get a hold of the
iterator for each partition. Then you should be able to do
iterator.grouped(size) on each of the partitions. It may mean the last
group at the end of each partition has fewer than size elements. If
that's okay for you then that should work.
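
A minimal sketch of that, assuming rdd: RDD[Int]:

val size = 10
// Each partition's iterator is chunked into Seqs of at most `size` elements;
// the final chunk in a partition may be smaller.
val grouped = rdd.mapPartitions(_.grouped(size))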

On Sat, Jun 20, 2015 at 7:48 PM, Brandon White bwwintheho...@gmail.com
wrote:

 How would you do a .grouped(10) on a RDD, is it possible? Here is an
 example for a Scala list

 scala List(1,2,3,4).grouped(2).toList
 res1: List[List[Int]] = List(List(1, 2), List(3, 4))

 Would like to group n elements.



Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Corey Nolet
Sorry Du,

Repartition means coalesce(shuffle = true) as per [1]. They are the same
operation. Coalescing with shuffle = false means you are specifying the
maximum number of partitions after the coalesce (if there are fewer
partitions to begin with, you will end up with that lesser amount).


[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L341
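
In code form (a sketch, assuming rdd currently has more than 100 partitions):

val a = rdd.repartition(100)       // identical to rdd.coalesce(100, shuffle = true)
val b = rdd.coalesce(100)          // shuffle = false: parent partitions are merged locally,
                                   // giving at most 100 partitions and no shuffle traffic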


On Thu, Jun 18, 2015 at 7:55 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 repartition() means coalesce(shuffle=false)



   On Thursday, June 18, 2015 4:07 PM, Corey Nolet cjno...@gmail.com
 wrote:


 Doesn't repartition call coalesce(shuffle=true)?
 On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 I got the same problem with rdd.repartition() in my streaming app, which
 generated a few huge partitions and many tiny partitions. The resulting
 high data skew makes the processing time of a batch unpredictable and often
 exceeding the batch interval. I eventually solved the problem by using
 rdd.coalesce() instead, which however is expensive as it yields a lot of
 shuffle traffic and also takes a long time.

 Du



   On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com
 wrote:


 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
 still puts everything into the same partition.

 We did manage to improve the situation by making a new partitioner that
 extends HashPartitioner.  It treats certain exception keys differently.
 These keys that are known to appear very often are assigned random
 partitions instead of using the existing partitioning mechanism.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Coalescing with shuffle = false in imbalanced cluster

2015-06-18 Thread Corey Nolet
I'm confused about this. The comment on the function seems to indicate
that there is absolutely no shuffle or network IO, but it also states that
it assigns an even number of parent partitions to each final partition
group. I'm having trouble seeing how this can be guaranteed without some
data passing around nodes.

For instance, let's say I have 5 machines and 10 partitions, but the way
the partitions are laid out is that machines 1, 2, and 3 each have 3
partitions while machine 4 has only 1 partition and machine 5 has none. Am
I to assume that coalesce(4, false) will collapse the 3 partitions on nodes
1, 2, and 3 each down to 1 partition while node 4 just keeps its 1
partition?

Thanks.


Re: Is there programmatic way running Spark job on Yarn cluster without using spark-submit script ?

2015-06-18 Thread Corey Nolet
 This is not an independent, programmatic way of running a Spark job on a
Yarn cluster.

The example I created simply demonstrates how to wire up the classpath so
that spark-submit can be called programmatically. For my use case, I wanted
to hold open a connection so I could send tasks to the executors on demand.
If you were to submit this via yarn-cluster mode, it would only require that
any extra files be placed on the executors, if needed.

On Wed, Jun 17, 2015 at 9:01 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 This is not an independent, programmatic way of running a Spark job on a
 Yarn cluster.

 That example demonstrates running on *Yarn-client* mode, also will be
 dependent of Jetty. Users writing Spark programs do not want to depend on
 that.

 I found this SparkLauncher class introduced in Spark 1.4 version (
 https://github.com/apache/spark/tree/master/launcher) which allows
 running Spark jobs in programmatic way.

 SparkLauncher exists in Java and Scala APIs, but I could not find in
 Python API.

 Did not try it yet, but seems promising.

 Example:

 import org.apache.spark.launcher.SparkLauncher;

 public class MyLauncher {
   public static void main(String[] args) throws Exception {
     Process spark = new SparkLauncher()
       .setAppResource("/my/app.jar")
       .setMainClass("my.spark.app.Main")
       .setMaster("local")
       .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
       .launch();
     spark.waitFor();
   }
 }



 On Wed, Jun 17, 2015 at 5:51 PM, Corey Nolet cjno...@gmail.com wrote:

 An example of being able to do this is provided in the Spark Jetty Server
 project [1]

 [1] https://github.com/calrissian/spark-jetty-server

 On Wed, Jun 17, 2015 at 8:29 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi all,

 Is there any way running Spark job in programmatic way on Yarn cluster
 without using spark-submit script ?

 I cannot include Spark jars in my Java application (due to dependency
 conflict and other reasons), so I'll be shipping Spark assembly uber jar
 (spark-assembly-1.3.1-hadoop2.3.0.jar) to Yarn cluster, and then execute
 job (Python or Java) on Yarn-cluster.

 So is there any way running Spark job implemented in python file/Java
 class without calling it through spark-submit script ?

 Thanks.






 --

 Best regards,
 Elkhan Dadashov



Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Corey Nolet
Doesn't repartition call coalesce(shuffle=true)?
On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 I got the same problem with rdd.repartition() in my streaming app, which
 generated a few huge partitions and many tiny partitions. The resulting
 high data skew makes the processing time of a batch unpredictable and often
 exceeding the batch interval. I eventually solved the problem by using
 rdd.coalesce() instead, which however is expensive as it yields a lot of
 shuffle traffic and also takes a long time.

 Du



   On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com
 wrote:


 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
 still puts everything into the same partition.

 We did manage to improve the situation by making a new partitioner that
 extends HashPartitioner.  It treats certain exception keys differently.
 These keys that are known to appear very often are assigned random
 partitions instead of using the existing partitioning mechanism.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Executor memory allocations

2015-06-17 Thread Corey Nolet
So I've seen in the documentation that (after the overhead memory is
subtracted), the memory allocations of each executor are as follows (assume
default settings):

60% for cache
40% for tasks to process data


Reading about how Spark implements shuffling, I've also seen it say 20% of
executor memory is utilized for shuffles. Does this 20% cut into the 40%
for tasks to process data or the 60% for the data cache?
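
For reference, these are the knobs I believe those percentages map to in the
legacy Spark 1.x memory model (defaults shown; a sketch, not a recommendation):

val conf = new org.apache.spark.SparkConf()
  .set("spark.storage.memoryFraction", "0.6")   // the "60% for cache"
  .set("spark.shuffle.memoryFraction", "0.2")   // shuffle aggregation buffers (the "20%")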


Re: Is there programmatic way running Spark job on Yarn cluster without using spark-submit script ?

2015-06-17 Thread Corey Nolet
An example of being able to do this is provided in the Spark Jetty Server
project [1]

[1] https://github.com/calrissian/spark-jetty-server

On Wed, Jun 17, 2015 at 8:29 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 Hi all,

 Is there any way running Spark job in programmatic way on Yarn cluster
 without using spark-submit script ?

 I cannot include Spark jars in my Java application (due to dependency
 conflict and other reasons), so I'll be shipping Spark assembly uber jar
 (spark-assembly-1.3.1-hadoop2.3.0.jar) to Yarn cluster, and then execute
 job (Python or Java) on Yarn-cluster.

 So is there any way running Spark job implemented in python file/Java
 class without calling it through spark-submit script ?

 Thanks.





Using spark.hadoop.* to set Hadoop properties

2015-06-17 Thread Corey Nolet
I've become accustomed to being able to use system properties to override
properties in the Hadoop Configuration objects. I just recently noticed
that when Spark creates the Hadoop Configuration in the SparkContext, it
cycles through any properties prefixed with spark.hadoop. and adds those
properties to the Hadoop Configuration (minus the spark.hadoop. prefix). I don't
see this advertised anywhere in the documentation. Is this a method that is
supposed to be public to users? If so, should we add that to the
documentation?
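
A quick sketch of the behavior I'm describing (the property value and app
name here are just for illustration):

val conf = new org.apache.spark.SparkConf()
  .setAppName("hadoop-props-example")
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")   // prefixed property

val sc = new org.apache.spark.SparkContext(conf)
// The property shows up in the Hadoop Configuration with the prefix stripped:
println(sc.hadoopConfiguration.get("fs.defaultFS"))           // hdfs://namenode:8020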


Fully in-memory shuffles

2015-06-10 Thread Corey Nolet
Is it possible to configure Spark to do all of its shuffling FULLY in
memory (given that I have enough memory to store all the data)?


Re: Fully in-memory shuffles

2015-06-10 Thread Corey Nolet
So with this... to help my understanding of Spark under the hood -

Is this statement correct: "When data needs to pass between multiple JVMs, a
shuffle will *always* hit disk"?

On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen rosenvi...@gmail.com wrote:

 There's a discussion of this at https://github.com/apache/spark/pull/5403



 On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote:

 Is it possible to configure Spark to do all of its shuffling FULLY in
 memory (given that I have enough memory to store all the data)?







Re: Fully in-memory shuffles

2015-06-10 Thread Corey Nolet
Ok so it is the case that small shuffles can be done without hitting any
disk. Is this the same case for the aux shuffle service in yarn? Can that
be done without hitting disk?

On Wed, Jun 10, 2015 at 9:17 PM, Patrick Wendell pwend...@gmail.com wrote:

 In many cases the shuffle will actually hit the OS buffer cache and
 not ever touch spinning disk if it is a size that is less than memory
 on the machine.

 - Patrick

 On Wed, Jun 10, 2015 at 5:06 PM, Corey Nolet cjno...@gmail.com wrote:
  So with this... to help my understanding of Spark under the hood-
 
  Is this statement correct When data needs to pass between multiple
 JVMs, a
  shuffle will always hit disk?
 
  On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen rosenvi...@gmail.com
 wrote:
 
  There's a discussion of this at
 https://github.com/apache/spark/pull/5403
 
 
 
  On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote:
 
  Is it possible to configure Spark to do all of its shuffling FULLY in
  memory (given that I have enough memory to store all the data)?
 
 
 
 
 



Re: yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
Thanks Sandy- I was digging through the code in the deploy.yarn.Client and
literally found that property right before I saw your reply. I'm on 1.2.x
right now which doesn't have the property. I guess I need to update sooner
rather than later.
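
For reference, once on a version that has it (1.4+ per the PR below), the
client JVM can be told to exit right after submission; the class and jar
names here are made up:

./bin/spark-submit --master yarn-cluster --conf spark.yarn.submit.waitAppCompletion=false --class my.Main my-app.jar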

On Thu, May 28, 2015 at 3:56 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Corey,

 As of this PR https://github.com/apache/spark/pull/5297/files, this can
 be controlled with spark.yarn.submit.waitAppCompletion.

 -Sandy

 On Thu, May 28, 2015 at 11:48 AM, Corey Nolet cjno...@gmail.com wrote:

 I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
 noticing the jvm that fires up to allocate the resources, etc... is not
 going away after the application master and executors have been allocated.
 Instead, it just sits there printing 1 second status updates to the
 console. If I kill it, my job still runs (as expected).

 Is there an intended way to stop this from happening and just have the
 local JVM die when it's done allocating the resources and deploying the
 application master?





yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
noticing the jvm that fires up to allocate the resources, etc... is not
going away after the application master and executors have been allocated.
Instead, it just sits there printing 1 second status updates to the
console. If I kill it, my job still runs (as expected).

Is there an intended way to stop this from happening and just have the
local JVM die when it's done allocating the resources and deploying the
application master?


[jira] [Commented] (ACCUMULO-1444) Single Node Accumulo to start the tracer

2015-05-26 Thread Corey Nolet (JIRA)

[ 
https://issues.apache.org/jira/browse/ACCUMULO-1444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14560353#comment-14560353
 ] 

Corey Nolet commented on ACCUMULO-1444:
---

My apologies for the late reply. Christopher is correct.

On Tue, May 26, 2015 at 5:48 PM, Christopher Tubbs (JIRA) j...@apache.org



 Single Node Accumulo to start the tracer
 

 Key: ACCUMULO-1444
 URL: https://issues.apache.org/jira/browse/ACCUMULO-1444
 Project: Accumulo
  Issue Type: Sub-task
  Components: mini
Reporter: Corey J. Nolet
 Fix For: 1.8.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Jr. to Mid Level Big Data jobs in Bay Area

2015-05-17 Thread Corey Nolet
Agreed.

Apache user lists archive questions and answers specifically for the
purpose of helping the larger community navigate the projects. They are not
a place for classifieds or employment information.

On Sun, May 17, 2015 at 9:24 PM, Billy Watson williamrwat...@gmail.com
wrote:

 Uh, it's not about being tolerant. It's the wrong forum for it. There's
 enough chatter on here and there are 50 job boards all over the
 internetwebs. Please use the proper forums.

 William Watson
 Software Engineer
 (904) 705-7056 PCS

 On Sun, May 17, 2015 at 9:14 PM, Juan Suero juan.su...@gmail.com wrote:

 Hes a human asking for human advice.. its ok methinks.
 we should live in a more tolerant world.
 Thanks.

 On Sun, May 17, 2015 at 8:10 PM, Stephen Boesch java...@gmail.com
 wrote:

 Hi,  This is not a job board. Thanks.

 2015-05-17 16:00 GMT-07:00 Adam Pritchard apritchard...@gmail.com:

 Hi everyone,

 I was wondering if any of you know any openings looking to hire a big
 data dev in the Palo Alto area.

 Main thing I am looking for is to be on a team that will embrace having
 a Jr to Mid level big data developer, where I can grow my skill set and
 contribute.


 My skills are:

 3 years Java
 1.5 years Hadoop
 1.5 years Hbase
 1 year map reduce
 1 year Apache Storm
 1 year Apache Spark (did a Spark Streaming project in Scala)

 5 years PHP
 3 years iOS development
 4 years Amazon ec2 experience


 Currently I am working in San Francisco as a big data developer, but
 the team I'm on is content leaving me work that I already knew how to do
 when I came to the team (web services) and I want to work with big data
 technologies at least 70% of the time.


 I am not a senior big data dev, but I am motivated to be and am just
 looking for an opportunity where I can work all day or most of the day with
 big data technologies, and contribute and learn from the project at hand.


 Thanks if anyone can share any information,


 Adam








KafkaServer in integration test not properly assigning leaders to partitions

2015-05-14 Thread Corey Nolet
I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
found on Github) so that I can run an end-to-end test ingesting data
through a kafka topic with consumers in Spark Streaming pushing to
Accumulo.

Thus far, my code is doing this:

1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
from the mini Accumulo cluster)
2) Creating a topic using AdminUtil
3) Starting up a Spark streaming context using a Kafka stream that puts all
data into Accumulo
4) Creating a producer and sending a message to the Kafka topic.


Looking @ the topic metadata in zookeeper after the topic is created, let's
say testtopic, I never see the metadata for a leader in
/brokers/topics/testtopic/partitions show up. If I understand correctly,
creating a topic does this:

1) Adds a persistent node into Zookeeper with some json data to denote the
topic's name as well as the partitions and the list of each broker id for
each partition.
2) I am still in the process of digging into this part but I think the
first item in the list of replicas for each partition is used to define the
initial leader and the leader is notified via a watcher and told to create
an ephemeral node so that it can know when that node goes down to assign
another.

If I'm correct about #2, it seems like that watcher is never being invoked.
Any attempt to produce to the topic just returns an error back to the
producer that says there was no leader selected.

Any advice would be much appreciated. I really would like to get our
stack tested fully through automated testing, and Kafka is the last piece we
need to assemble.
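
For context, a minimal sketch of the topic-creation and producer steps above
(Kafka 0.8.2-era APIs; the ZooKeeper/broker addresses and topic name are made up):

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Note: the ZkClient handed to AdminUtils should use ZKStringSerializer;
// ZkClient's default Java serializer writes extra bytes around the JSON payload.
val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
AdminUtils.createTopic(zkClient, "testtopic", 1, 1, new Properties())

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("testtopic", "key", "value"))
producer.close()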


Re: KafkaServer in integration test not properly assigning leaders to partitions

2015-05-14 Thread Corey Nolet
I raised the log levels to try to figure out what happens. I see log
statements on the broker stating:

New topic creation callback for 
New partition creation callback for 
Invoking state change to NewPartition for partitions 
Invoking state change to OnlinePartitions for partitions 
Error while fetching metadata for partition [testtopic, 0]
kafka.common.LeaderNotAvailableException: No leader exists for partition
0...

I'm not sure what's happening between the time I create my topic and the
time the broker sees that it needs to add the partition assignment to
zookeeper with itself as the leader but it's strange that the log messages
above seem like they are missing the data. New topic creation callback for
 seems like it should be listing a topic and not blank.

Any ideas?

On Thu, May 14, 2015 at 1:00 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
 found on Github) so that I can run an end-to-end test ingesting data
 through a kafka topic with consumers in Spark Streaming pushing to
 Accumulo.

 Thus far, my code is doing this:

 1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
 from the mini Accumulo cluster)
 2) Creating a topic using AdminUtil
 3) Starting up a Spark streaming context using a Kafka stream that puts
 all data into Accumulo
 4) Creating a producer and sending a message to the Kafka topic.


 Looking @ the topic metadata in zookeeper after the topic is created,
 let's say testtopic, I never see the metadata for a leader in
 /brokers/topics/testtopic/partitions show up. If I understand correctly,
 creating a topic does this:

 1) Adds a persistent node into Zookeeper with some json data to denote the
 topic's name as well as the partitions and the list of each broker id for
 each partition.
 2) I am still in the process of digging into this part but I think the
 first item in the list of replicas for each partition is used to define the
 initial leader and the leader is notified via a watcher and told to create
 an ephemeral node so that it can know when that node goes down to assign
 another.

 If I'm correct about #2, it seems like that watcher is never being
 invoked. Any attempt to produce to the topic just returns an error back to
 the producer that says there was no leader selected.

 Anything advice would be much appreciated. I really would like to get our
 stack tested fully through automated testing and Kafka is the last piece we
 need to assemble.





Re: KafkaServer in integration test not properly assigning leaders to partitions

2015-05-14 Thread Corey Nolet
I think I figured out what the problem is, though I'm not sure how to fix
it.


I've managed to debug through the embedded broker's callback for the
TopicChangeListener#handleChildChange() in the PartitionStateMachine class.

The following line from that function is the one that's failing:

val addedPartitionReplicaAssignment =
ZKUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

Inside the getReplicaAssignmentForTopics() it is pulling back a json blob
from the /brokers/topics/testtopic znode's data and it appears the json
blob has some extra bytes @ the beginning of it that are making it
unparseable once pulled from zookeeper.

Any ideas to what this could be? I'm using 0.8.2.0- this is really what's
holding me back right now from getting my tests functional.


On Thu, May 14, 2015 at 4:29 PM, Corey Nolet cjno...@gmail.com wrote:

 I raised the log levels to try to figure out what happens. I see log
 statements on the broker stating:

 New topic creation callback for 
 New partition creation callback for 
 Invoking state change to NewPartition for partitions 
 Invoking state change to OnlinePartitions for partitions 
 Error while fetching metadata for partition [testtopic, 0]
 kafka.common.LeaderNotAvailableException: No leader exists for partition
 0...

 I'm not sure what's happening between the time I create my topic and the
 time the broker sees that it needs to add the partition assignment to
 zookeeper with itself as the leader but it's strange that the log messages
 above seem like they are missing the data. New topic creation callback for
  seems like it should be listing a topic and not blank.

 Any ideas?

 On Thu, May 14, 2015 at 1:00 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
 found on Github) so that I can run an end-to-end test ingesting data
 through a kafka topic with consumers in Spark Streaming pushing to
 Accumulo.

 Thus far, my code is doing this:

 1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
 from the mini Accumulo cluster)
 2) Creating a topic using AdminUtil
 3) Starting up a Spark streaming context using a Kafka stream that puts
 all data into Accumulo
 4) Creating a producer and sending a message to the Kafka topic.


 Looking @ the topic metadata in zookeeper after the topic is created,
 let's say testtopic, I never see the metadata for a leader in
 /brokers/topics/testtopic/partitions show up. If I understand correctly,
 creating a topic does this:

 1) Adds a persistent node into Zookeeper with some json data to denote
 the topic's name as well as the partitions and the list of each broker id
 for each partition.
 2) I am still in the process of digging into this part but I think the
 first item in the list of replicas for each partition is used to define the
 initial leader and the leader is notified via a watcher and told to create
 an ephemeral node so that it can know when that node goes down to assign
 another.

 If I'm correct about #2, it seems like that watcher is never being
 invoked. Any attempt to produce to the topic just returns an error back to
 the producer that says there was no leader selected.

 Anything advice would be much appreciated. I really would like to get our
 stack tested fully through automated testing and Kafka is the last piece we
 need to assemble.






Re: KafkaServer in integration test not properly assigning leaders to partitions

2015-05-14 Thread Corey Nolet
The JSON-encoded blob definitely appears to be going in as a JSON string. The
partition assignment json seems to be the only thing that is being prefixed
by these bytes. Any ideas?

On Thu, May 14, 2015 at 5:17 PM, Corey Nolet cjno...@gmail.com wrote:

 I think I figured out what the problem is, though I'm not sure how to fix
 it.


 I've managed to debug through the embedded broker's callback for the
 TopicChangeListener#handleChildChange() in the PartitionStateMachine class.

 The following line from that function is the one that's failing:

 val addedPartitionReplicaAssignment =
 ZKUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

 Inside the getReplicaAssignmentForTopics() it is pulling back a json blob
 from the /brokers/topics/testtopic znode's data and it appears the json
 blob has some extra bytes @ the beginning of it that are making it
 unparseable once pulled from zookeeper.

 Any ideas to what this could be? I'm using 0.8.2.0- this is really what's
 holding me back right now from getting my tests functional.


 On Thu, May 14, 2015 at 4:29 PM, Corey Nolet cjno...@gmail.com wrote:

 I raised the log levels to try to figure out what happens. I see log
 statements on the broker stating:

 New topic creation callback for 
 New partition creation callback for 
 Invoking state change to NewPartition for partitions 
 Invoking state change to OnlinePartitions for partitions 
 Error while fetching metadata for partition [testtopic, 0]
  kafka.common.LeaderNotAvailableException: No leader exists for partition
 0...

 I'm not sure what's happening between the time I create my topic and the
 time the broker sees that it needs to add the partition assignment to
 zookeeper with itself as the leader but it's strange that the log messages
 above seem like they are missing the data. New topic creation callback for
  seems like it should be listing a topic and not blank.

 Any ideas?

 On Thu, May 14, 2015 at 1:00 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm firing up a KafkaServer (using some EmbeddedKafkaBroker code that I
 found on Github) so that I can run an end-to-end test ingesting data
 through a kafka topic with consumers in Spark Streaming pushing to
 Accumulo.

 Thus far, my code is doing this:

 1) Creating a MiniAccumuloCluster and KafkaServer (using the zookeepers
 from the mini Accumulo cluster)
 2) Creating a topic using AdminUtil
 3) Starting up a Spark streaming context using a Kafka stream that puts
 all data into Accumulo
 4) Creating a producer and sending a message to the Kafka topic.


 Looking @ the topic metadata in zookeeper after the topic is created,
 let's say testtopic, I never see the metadata for a leader in
 /brokers/topics/testtopic/partitions show up. If I understand correctly,
 creating a topic does this:

 1) Adds a persistent node into Zookeeper with some json data to denote
 the topic's name as well as the partitions and the list of each broker id
 for each partition.
 2) I am still in the process of digging into this part but I think the
 first item in the list of replicas for each partition is used to define the
 initial leader and the leader is notified via a watcher and told to create
 an ephemeral node so that it can know when that node goes down to assign
 another.

 If I'm correct about #2, it seems like that watcher is never being
 invoked. Any attempt to produce to the topic just returns an error back to
 the producer that says there was no leader selected.

 Anything advice would be much appreciated. I really would like to get
 our stack tested fully through automated testing and Kafka is the last
 piece we need to assemble.







Re: 1.5.3 and 1.6.3

2015-05-12 Thread Corey Nolet
I can get a 1.6.3 together.

On Tue, May 12, 2015 at 2:04 PM, Christopher ctubb...@apache.org wrote:

 Sure, we can discuss that separately. I'll start a new thread.

 --
 Christopher L Tubbs II
 http://gravatar.com/ctubbsii


 On Tue, May 12, 2015 at 1:58 PM, Sean Busbey bus...@cloudera.com wrote:
  let's please have a labeled [DISCUSS] thread on when and how to EOL 1.5.
 
  On Tue, May 12, 2015 at 12:55 PM, Christopher ctubb...@apache.org
 wrote:
 
  And, whether or not we release 1.5.3, I do think we should consider
  closing out development on that branch after 1.7.0 is released.
  Anybody have any thoughts on that?
 
  --
  Christopher L Tubbs II
  http://gravatar.com/ctubbsii
 
 
  On Tue, May 12, 2015 at 1:48 PM, Christopher ctubb...@apache.org
 wrote:
   I'd like to think about releasing 1.5.3 and 1.6.3, since there are 75
   and 82 commits in those branches, presumably fixing a lot of bugs.
  
   Is anybody willing to act as release manager for either of these and
   prepare the RCs? Perhaps somebody who hasn't already done some
   releases who wants to try?
  
   --
   Christopher L Tubbs II
   http://gravatar.com/ctubbsii
 
 
 
 
  --
  Sean



Re: 1.5.3 and 1.6.3

2015-05-12 Thread Corey Nolet
That is, unless any of the new committers would like to take it on- in that
case, I can help ;-)

On Tue, May 12, 2015 at 3:41 PM, Corey Nolet cjno...@gmail.com wrote:

 I can get a 1.6.3 together.


 On Tue, May 12, 2015 at 2:04 PM, Christopher ctubb...@apache.org wrote:

 Sure, we can discuss that separately. I'll start a new thread.

 --
 Christopher L Tubbs II
 http://gravatar.com/ctubbsii


 On Tue, May 12, 2015 at 1:58 PM, Sean Busbey bus...@cloudera.com wrote:
  let's please have a labeled [DISCUSS] thread on when and how to EOL 1.5.
 
  On Tue, May 12, 2015 at 12:55 PM, Christopher ctubb...@apache.org
 wrote:
 
  And, whether or not we release 1.5.3, I do think we should consider
  closing out development on that branch after 1.7.0 is released.
  Anybody have any thoughts on that?
 
  --
  Christopher L Tubbs II
  http://gravatar.com/ctubbsii
 
 
  On Tue, May 12, 2015 at 1:48 PM, Christopher ctubb...@apache.org
 wrote:
   I'd like to think about releasing 1.5.3 and 1.6.3, since there are 75
   and 82 commits in those branches, presumably fixing a lot of bugs.
  
   Is anybody willing to act as release manager for either of these and
   prepare the RCs? Perhaps somebody who hasn't already done some
   releases who wants to try?
  
   --
   Christopher L Tubbs II
   http://gravatar.com/ctubbsii
 
 
 
 
  --
  Sean





Blocking DStream.forEachRDD()

2015-05-07 Thread Corey Nolet
Is this something I can do? I am using a FileOutputFormat inside of the
foreachRDD call. After the input format runs, I want to do some directory
cleanup and I want to block while I'm doing that. Is that something I can
do inside of this function? If not, where would I accomplish this on every
micro-batched interval?


Re: Blocking DStream.forEachRDD()

2015-05-07 Thread Corey Nolet
It does look like the function that's executed runs in the driver, so doing an
Await.result() on a thread AFTER I've executed an action should work. Just
updating this here in case anyone has this question in the future.
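
A minimal sketch of what I ended up with, assuming an existing DStream[String]
named dstream; cleanupOutputDir() is a hypothetical stand-in for the directory
cleanup, and saveAsTextFile stands in for the custom FileOutputFormat:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def cleanupOutputDir(): Unit = ()   // stub for illustration

dstream.foreachRDD { rdd =>
  rdd.saveAsTextFile("/output/" + System.currentTimeMillis)   // the action completes here
  // foreachRDD's body runs on the driver, so blocking here holds up the next batch
  Await.result(Future { cleanupOutputDir() }, 60.seconds)
}
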
Is this something I can do? I am using a FileOutputFormat inside of the
foreachRDD call. After the input format runs, I want to do some directory
cleanup and I want to block while I'm doing that. Is that something I can
do inside of this function? If not, where would I accomplish this on every
micro-batched interval?


Re: Running boolean or queries on accumulo

2015-04-30 Thread Corey Nolet
Vaibhav,

The difference in an OR iterator is that you will want it to return a
single key for all of the given OR terms so that the iterator in the stack
above it sees it as a single hit. It's essentially a merge at the
key level to stop duplicate results from being returned (thus appearing as
duplicate documents matching the criteria). A high-level description of
the intersecting iterator is that it uses a collection of internal
iterators to seek through partitions, finding qualifiers (doc ids) where the
families (terms) all match the terms in the intersection. If, at any
point, all internal iterators are able to return top keys that have the
same qualifiers, then the intersection was successful and the event with
that id can be returned.

There used to be a project on github called Accumulo Wikisearch which
established a boolean logic iterator which would construct a tree out of
intersecting iterators and OR iterators. To my knowledge, the wikisearch
iterators were removed from github as a sister repository because they
weren't being actively maintained. The logic behind the iterators could get
quite complex as well but, as far as I'm concerned, they can perform some
magic in the realm of scalable document query.

We took in the wikisearch iterators in the Accumulo Recipes project [1] and
attempted to refactor them into something that can be a little easier to
follow and augment. We've done quite a bit of this but there's still a lot
more to do.  We've built a planning/optimization layer as well. Perhaps
they could serve as an example for you as you build your own query layer.
Of course you're also welcome to jump in and help out on the Accumulo
Recipes as well.


[1]
https://github.com/calrissian/accumulo-recipes/tree/master/store/event-store
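
For a concrete starting point, a minimal sketch of wiring up the stock
IntersectingIterator for an AND query over a sharded index table (the
connector, auths, and table name are assumed to already exist):

import org.apache.accumulo.core.client.IteratorSetting
import org.apache.accumulo.core.data.Range
import org.apache.accumulo.core.iterators.user.IntersectingIterator
import org.apache.hadoop.io.Text

val terms = Array(new Text("term1"), new Text("term2"))
val setting = new IteratorSetting(20, "andQuery", classOf[IntersectingIterator])
IntersectingIterator.setColumnFamilies(setting, terms)

val scanner = connector.createBatchScanner("shardIndex", auths, 4)
scanner.setRanges(java.util.Collections.singleton(new Range()))
scanner.addScanIterator(setting)
// Each returned key's column qualifier is a doc id that matched *all* terms;
// an OR variant would instead merge the per-term keys into a single stream.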




On Thu, Apr 30, 2015 at 2:40 PM, Eric Newton eric.new...@gmail.com wrote:

 You can transform or queries into separate queries and run them in
 parallel.

 Looking for A & (B|C) is the same as looking for (A&B) | (A&C). Just run
 two different queries and merge the results.

 Of course it can get a lot more complicated... you can spend the rest of
 your life on query optimization.

 -Eric


 On Thu, Apr 30, 2015 at 1:49 PM, vaibhav thapliyal 
 vaibhav.thapliyal...@gmail.com wrote:

 Hi

 I was trying to run boolean AND queries and successfully did so using the
 intersecting iterator. Can I tweak this iterator to successfully run
 boolean OR queries, or should I consider making an iterator from scratch for
 this purpose? Could anyone give me a brief overview of the logic
 inside the intersecting iterator so that the modification part becomes
 easier? I have a document-partitioned index table as described in the
 documentation.

 Thanks
 Vaibhav





Re: real time Query engine Spark-SQL on Hbase

2015-04-30 Thread Corey Nolet
A tad off topic, but could still be relevant.

Accumulo's design is a tad different in the realm of being able to shard
and perform set intersections/unions server-side (through seeks). I've got
an adapter for Spark SQL on top of a document store implementation in
Accumulo that accepts the push-down predicates and actually performs the
query on the tablet servers. This strategy may be useful to you [1].

[1]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/test/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreFilteredTest.scala

On Thu, Apr 30, 2015 at 10:54 AM, Ted Yu yuzhih...@gmail.com wrote:

 bq. a single query on one filter criteria

 Can you tell us more about your filter ? How selective is it ?

 Which hbase release are you using ?

 Cheers

 On Thu, Apr 30, 2015 at 7:23 AM, Siddharth Ubale 
 siddharth.ub...@syncoms.com wrote:

  Hi,



 I want to use Spark as Query engine on HBase with sub second latency.



 I am  using Spark 1.3  version. And followed the steps below on Hbase
 table with around 3.5 lac rows :



 1. Mapped the Dataframe to the Hbase table. RDDCustomers maps to the
 hbase table which is used to create the Dataframe:

 DataFrame schemaCustomers = sqlInstance
 .createDataFrame(SparkContextImpl.getRddCustomers(),
 Customers.class);

 2. Used registerTempTable, i.e.
 schemaCustomers.registerTempTable("customers");

 3. Running the query on the Dataframe using the SQLContext instance.



 What I am observing is that for a single query on one filter criteria the
 query is taking 7-8 seconds? And the time increases as I am increasing the
 number of rows in Hbase table. Also, there was one time when I was getting
 query response under 1-2 seconds. Seems like strange behavior.

 Is this expected behavior from Spark or am I missing something here?

 Can somebody help me understand this scenario . Please assist.



 Thanks,

 Siddharth Ubale,







Re: Q4A Project

2015-04-27 Thread Corey Nolet
Andrew,

Have you considered leveraging existing SQL query layers like Hive or
Spark's SQL/DataFrames API? There are some pretty massive optimizations
involved in that API making the push-down predicates / selections pretty
easy to adapt for Accumulo.
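
A minimal sketch of the hook I'm referring to (the Spark 1.3+ data sources
API; AccumuloRelation and the scan translation are hypothetical):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

class AccumuloRelation(override val sqlContext: SQLContext,
                       override val schema: StructType)
    extends BaseRelation with PrunedFilteredScan {

  // Spark SQL pushes the selected columns and filters down to here, where they
  // could be translated into Accumulo ranges / iterator settings (not shown).
  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = ???
}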

On Mon, Apr 27, 2015 at 8:37 PM, Andrew Wells awe...@clearedgeit.com
wrote:

 I have been working on a project, tentatively called Q4A (Query for
 Accumulo). Another possible name is ASQ (Accumulo Streaming Query) [discus].

 This is a streaming query as the query is completed via a stream, should
 never group data in memory. To batch, intermediate results would be written
 back to Accumulo temporarily.


 The *primary goal* is to have a complete SQL implementation native to
 Accumulo.

 *Why do this?*
 I am getting tired of writing bad java code to query a database. I would
 rather write bad SQL code. Also, people should be able to get queries out
 faster and it shouldn't take a developer.


 *Native To Accumulo*:

- There should be no special format to read a database created by Q4A
- There should be no special format for Q4A to query a table
- All tables are tables available to Q4A
- Any special tables, are stored away from the users databases
(indexes, column definitions, etc)

 *Other Goals*:

- Implement the entire SQL definition (currently all of SQLite)
- Create JDBC Driver/Server
- Push down Expressions to the Tablet Servers
- Install-less queries, use Q4A jar directly against any Accumulo
Cluster ( less push-down expressions)
- documentation :o
- testing ;)

 *Does it work?*
 Not yet, the project is still a work in progress. and I will be working on
 it at the Accumulo Summit this year. Progress is slow as I am getting
 married in about a month and some change.

 *Questions:*
 If you have questions about Q4A, ask here; I will also be at the Accumulo Summit
 @ ClearEdgeIT Table and Hackathon.

 *WHERE IS TEH LINK?!1!*
 Oh here: https://github.com/agwells0714/q4a

 --
 *Andrew George Wells*
 *Software Engineer*
 *awe...@clearedgeit.com awe...@clearedgeit.com*




Re: Q4A Project

2015-04-27 Thread Corey Nolet
I'm always looking for places to help out and integrate/share designs &
ideas. I look forward to chatting with you about Q4A at the hackathon
tomorrow!

Have you, by chance, seen the Spark SQL adapter for the Accumulo Recipes
Event & Entity Stores [1]? At the very least, it's a good example of using
Spark's SQL abstraction over Accumulo. As Mike Drob pointed out, Spark SQL
has a pretty robust query planning / optimization layer. The Event/Entity
stores in Accumulo Recipes also have a pluggable query
planning/optimization layer.


[1]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/test/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalystTest.scala

On Mon, Apr 27, 2015 at 9:38 PM, Mike Drob mad...@cloudera.com wrote:

 Andrew,

 This is a cool thing to work on, I hope you have great success!

 A couple of questions about the motivations behind this, if you don't mind
 -
 - There are several SQL implementations already in the Hadoop ecosystem.
 In what ways do you expect this to improve upon
 Hive/Impala/Phoenix/Presto/Spark SQL? I haven't looked at the code, so it
 is quite possible you're already using one of those technologies.
 - In a conversation with some HP engineers earlier this year, they
 mentioned that building a SQL-92 layer is the easy part, and that a mature
 optimization engine is the really hard part. This is where Oracle may still
 be leaps and bounds ahead of its nearest competitors. Do you have plans for
 a query planner? If not, you might be back to writing MapReduce jobs sooner
 than you think.

 Look forward to seeing more!

 Mike

 On Mon, Apr 27, 2015 at 7:37 PM, Andrew Wells awe...@clearedgeit.com
 wrote:

 I have been working on a project, tentatively called Q4A (Query for
 Accumulo). Another possible name is ASQ (Accumulo Streaming Query) [discus].

 This is a streaming query as the query is completed via a stream, should
 never group data in memory. To batch, intermediate results would be written
 back to Accumulo temporarily.


 The *primary goal* is to have a complete SQL implementation native to
 Accumulo.

 *Why do this?*
 I am getting tired of writing bad java code to query a database. I would
 rather write bad SQL code. Also, people should be able to get queries out
 faster and it shouldn't take a developer.


 *Native To Accumulo*:

- There should be no special format to read a database created by Q4A
- There should be no special format for Q4A to query a table
- All tables are tables available to Q4A
- Any special tables, are stored away from the users databases
(indexes, column definitions, etc)

 *Other Goals*:

- Implement the entire SQL definition (currently all of SQLite)
- Create JDBC Driver/Server
- Push down Expressions to the Tablet Servers
- Install-less queries, use Q4A jar directly against any Accumulo
Cluster ( less push-down expressions)
- documentation :o
- testing ;)

 *Does it work?*
 Not yet, the project is still a work in progress. and I will be working
 on it at the Accumulo Summit this year. Progress is slow as I am getting
 married in about a month and some change.

 *Questions:*
 If you have questions about Q4A, ask here; I will also be at the Accumulo Summit
 @ ClearEdgeIT Table and Hackathon.

 *WHERE IS TEH LINK?!1!*
 Oh here: https://github.com/agwells0714/q4a

 --
 *Andrew George Wells*
 *Software Engineer*
 *awe...@clearedgeit.com awe...@clearedgeit.com*





Re: DAG

2015-04-25 Thread Corey Nolet
Giovanni,

The DAG can be walked by calling the dependencies() function on any RDD.
It returns a Seq of dependencies wrapping the parent RDDs. If you start at the leaves
and walk through the parents until dependencies() returns an empty Seq, you
ultimately have your DAG.
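
A minimal sketch of that walk:

import org.apache.spark.rdd.RDD

// Recursively print an RDD and its parents by following dependencies().
def walkDag(rdd: RDD[_], depth: Int = 0): Unit = {
  println(("  " * depth) + rdd)
  rdd.dependencies.foreach(dep => walkDag(dep.rdd, depth + 1))
}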

On Sat, Apr 25, 2015 at 1:28 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 May be this will give you a good start
 https://github.com/apache/spark/pull/2077

 Thanks
 Best Regards

 On Sat, Apr 25, 2015 at 1:29 AM, Giovanni Paolo Gibilisco 
 gibb...@gmail.com wrote:

 Hi,
 I would like to know if it is possible to build the DAG before actually
 executing the application. My guess is that in the scheduler the DAG is
 built dynamically at runtime since it might depend on the data, but I was
 wondering if there is a way (and maybe a tool already) to analyze the code
 and build the DAG.

 Thank you!





Horizontal scaling a topic

2015-04-23 Thread Corey Nolet
I have a cluster of 3 nodes and I've created a topic with some number of
partitions and some number of replicas, let's say 10 and 2, respectively.
Later, after I've got my 3 nodes fairly consumed with data in the 10
partitions, I want to add 2 more nodes to the mix to help balance out the
partitions/replicas of my topic across 5 physical nodes instead of just 3.

I was assuming Kafka would just notice the new node and auto-replicate
partitions to it but research is telling me that this probably isn't the
case. Let's say I want no data loss and I want Kafka to spread my 10
partitions across all 5 nodes. How would I do this currently?


Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Corey Nolet
If you return an Iterable, you are not tying the API to a CompactBuffer.
Someday, the data could be fetched lazily and the API would not have to
change.
On Apr 23, 2015 6:59 PM, Dean Wampler deanwamp...@gmail.com wrote:

 I wasn't involved in this decision (I just make the fries), but
 CompactBuffer is designed for relatively small data sets that at least fit
 in memory. It's more or less an Array. In principle, returning an iterator
 could hide the actual data structure that might be needed to hold a much
 bigger data set, if necessary.

 HOWEVER, it actually returns a CompactBuffer.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to dev list ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Streaming anomaly detection using ARIMA

2015-04-10 Thread Corey Nolet
Sean,

I do agree about the inside-out parallelization, but my curiosity is
mostly about what kind of performance I can expect by piping out to R.
I'm playing with Twitter's new Anomaly Detection library, btw; this could be
a solution if I can get the calls to R to stand up to the massive dataset
that I have.

I'll report back my findings.

On Thu, Apr 2, 2015 at 3:46 AM, Sean Owen so...@cloudera.com wrote:

 This inside out parallelization has been a way people have used R
 with MapReduce for a long time. Run N copies of an R script on the
 cluster, on different subsets of the data, babysat by Mappers. You
 just need R installed on the cluster. Hadoop Streaming makes this easy
 and things like RDD.pipe in Spark make it easier.

 So it may be just that simple and so there's not much to say about it.
 I haven't tried this with Spark Streaming but imagine it would also
 work. Have you tried this?

 Within a window you would probably take the first x% as training and
 the rest as test. I don't think there's a question of looking across
 windows.

 On Thu, Apr 2, 2015 at 12:31 AM, Corey Nolet cjno...@gmail.com wrote:
  Surprised I haven't gotten any responses about this. Has anyone tried
 using
  rJava or FastR w/ Spark? I've seen the SparkR project but thta goes the
  other way- what I'd like to do is use R for model calculation and Spark
 to
  distribute the load across the cluster.
 
  Also, has anyone used Scalation for ARIMA models?
 
  On Mon, Mar 30, 2015 at 9:30 AM, Corey Nolet cjno...@gmail.com wrote:
 
  Taking out the complexity of the ARIMA models to simplify things- I
 can't
  seem to find a good way to represent even standard moving averages in
 spark
  streaming. Perhaps it's my ignorance with the micro-batched style of the
  DStreams API.
 
  On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote:
 
  I want to use ARIMA for a predictive model so that I can take time
 series
  data (metrics) and perform a light anomaly detection. The time series
 data
  is going to be bucketed to different time units (several minutes within
  several hours, several hours within several days, several days within
  several years.
 
  I want to do the algorithm in Spark Streaming. I'm used to tuple at a
  time streaming and I'm having a tad bit of trouble gaining insight
 into how
  exactly the windows are managed inside of DStreams.
 
  Let's say I have a simple dataset that is marked by a key/value tuple
  where the key is the name of the component who's metrics I want to run
 the
  algorithm against and the value is a metric (a value representing a
 sum for
  the time bucket. I want to create histograms of the time series data
 for
  each key in the windows in which they reside so I can use that
 histogram
  vector to generate my ARIMA prediction (actually, it seems like this
 doesn't
  just apply to ARIMA but could apply to any sliding average).
 
  I *think* my prediction code may look something like this:
 
  val predictionAverages = dstream
.groupByKeyAndWindow(60*60*24, 60*60*24)
.mapValues(applyARIMAFunction)
 
  That is, keep 24 hours worth of metrics in each window and use that for
  the ARIMA prediction. The part I'm struggling with is how to join
 together
  the actual values so that i can do my comparison against the prediction
  model.
 
  Let's say dstream contains the actual values. For any time  window, I
  should be able to take a previous set of windows and use model to
 compare
  against the current values.
 
 
 
 



SparkR newHadoopAPIRDD

2015-04-01 Thread Corey Nolet
How hard would it be to expose this in some way? I ask because the current
textFile and objectFile functions are obviously at some point calling out
to a FileInputFormat and configuring it.

Could we get a way to configure any arbitrary InputFormat / OutputFormat?


Re: Streaming anomaly detection using ARIMA

2015-04-01 Thread Corey Nolet
Surprised I haven't gotten any responses about this. Has anyone tried using
rJava or FastR w/ Spark? I've seen the SparkR project, but that goes the
other way: what I'd like to do is use R for model calculation and Spark to
distribute the load across the cluster.

Also, has anyone used Scalation for ARIMA models?

On Mon, Mar 30, 2015 at 9:30 AM, Corey Nolet cjno...@gmail.com wrote:

 Taking out the complexity of the ARIMA models to simplify things- I can't
 seem to find a good way to represent even standard moving averages in spark
 streaming. Perhaps it's my ignorance with the micro-batched style of the
 DStreams API.

 On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote:

 I want to use ARIMA for a predictive model so that I can take time series
 data (metrics) and perform a light anomaly detection. The time series data
 is going to be bucketed to different time units (several minutes within
 several hours, several hours within several days, several days within
 several years.

 I want to do the algorithm in Spark Streaming. I'm used to tuple at a
 time streaming and I'm having a tad bit of trouble gaining insight into
 how exactly the windows are managed inside of DStreams.

 Let's say I have a simple dataset that is marked by a key/value tuple
 where the key is the name of the component who's metrics I want to run the
 algorithm against and the value is a metric (a value representing a sum for
 the time bucket. I want to create histograms of the time series data for
 each key in the windows in which they reside so I can use that histogram
 vector to generate my ARIMA prediction (actually, it seems like this
 doesn't just apply to ARIMA but could apply to any sliding average).

 I *think* my prediction code may look something like this:

 val predictionAverages = dstream
   .groupByKeyAndWindow(60*60*24, 60*60*24)
   .mapValues(applyARIMAFunction)

 That is, keep 24 hours worth of metrics in each window and use that for
 the ARIMA prediction. The part I'm struggling with is how to join together
 the actual values so that i can do my comparison against the prediction
 model.

 Let's say dstream contains the actual values. For any time  window, I
 should be able to take a previous set of windows and use model to compare
 against the current values.






Re: Streaming anomaly detection using ARIMA

2015-03-30 Thread Corey Nolet
Taking out the complexity of the ARIMA models to simplify things: I can't
seem to find a good way to represent even standard moving averages in Spark
Streaming. Perhaps it's my ignorance with the micro-batched style of the
DStreams API.

On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote:

 I want to use ARIMA for a predictive model so that I can take time series
 data (metrics) and perform a light anomaly detection. The time series data
 is going to be bucketed to different time units (several minutes within
 several hours, several hours within several days, several days within
 several years.

 I want to do the algorithm in Spark Streaming. I'm used to tuple at a
 time streaming and I'm having a tad bit of trouble gaining insight into
 how exactly the windows are managed inside of DStreams.

 Let's say I have a simple dataset that is marked by a key/value tuple
 where the key is the name of the component who's metrics I want to run the
 algorithm against and the value is a metric (a value representing a sum for
 the time bucket. I want to create histograms of the time series data for
 each key in the windows in which they reside so I can use that histogram
 vector to generate my ARIMA prediction (actually, it seems like this
 doesn't just apply to ARIMA but could apply to any sliding average).

 I *think* my prediction code may look something like this:

 val predictionAverages = dstream
   .groupByKeyAndWindow(60*60*24, 60*60*24)
   .mapValues(applyARIMAFunction)

 That is, keep 24 hours worth of metrics in each window and use that for
 the ARIMA prediction. The part I'm struggling with is how to join together
 the actual values so that i can do my comparison against the prediction
 model.

 Let's say dstream contains the actual values. For any time  window, I
 should be able to take a previous set of windows and use model to compare
 against the current values.





Streaming anomaly detection using ARIMA

2015-03-27 Thread Corey Nolet
I want to use ARIMA for a predictive model so that I can take time series
data (metrics) and perform a light anomaly detection. The time series data
is going to be bucketed into different time units (several minutes within
several hours, several hours within several days, several days within
several years).

I want to do the algorithm in Spark Streaming. I'm used to tuple-at-a-time
streaming and I'm having a tad bit of trouble gaining insight into how
exactly the windows are managed inside of DStreams.

Let's say I have a simple dataset that is marked by a key/value tuple where
the key is the name of the component whose metrics I want to run the
algorithm against and the value is a metric (a value representing a sum for
the time bucket). I want to create histograms of the time series data for
each key in the windows in which they reside so I can use that histogram
vector to generate my ARIMA prediction (actually, it seems like this
doesn't just apply to ARIMA but could apply to any sliding average).

I *think* my prediction code may look something like this:

val predictionAverages = dstream
  .groupByKeyAndWindow(Seconds(60 * 60 * 24), Seconds(60 * 60 * 24))
  .mapValues(applyARIMAFunction)

That is, keep 24 hours' worth of metrics in each window and use that for the
ARIMA prediction. The part I'm struggling with is how to join together the
actual values so that I can do my comparison against the prediction model.

Let's say dstream contains the actual values. For any time window, I
should be able to take a previous set of windows and use the model to compare
against the current values.
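
Roughly, the shape I'm imagining is something like the sketch below, where
the socket source, the 60-second batch interval, and the plain moving
average standing in for applyARIMAFunction are all just placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("metric-windows"), Seconds(60))

// placeholder source: lines of "componentName,value"
val dstream = ssc.socketTextStream("localhost", 9999).map { line =>
  val Array(k, v) = line.split(",", 2)
  (k, v.toDouble)
}

// 24 hours of history per key vs. the most recent hour, both sliding every batch
val training = dstream.groupByKeyAndWindow(Seconds(60 * 60 * 24), Seconds(60))
val current  = dstream.groupByKeyAndWindow(Seconds(60 * 60), Seconds(60))

// a plain moving average standing in for applyARIMAFunction
val predicted = training.mapValues(vs => vs.sum / vs.size)
val actual    = current.mapValues(vs => vs.sum / vs.size)

// both streams slide at the batch interval, so they can be joined per key
val deviation = predicted.join(actual).mapValues { case (p, a) => math.abs(a - p) }
deviation.print()

ssc.start()
ssc.awaitTermination()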


Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread Corey Nolet
Spark uses a SerializableWritable [1] to Java-serialize Writable objects.
I've noticed (at least in Spark 1.2.1) that it breaks down with some
objects when Kryo is used instead of regular Java serialization. Though it
is wrapping the actual AccumuloInputFormat (another example of something
you may want to do in the future), we have Accumulo working to load data
from a table into Spark SQL [2]. The way Spark uses the InputFormat is very
straightforward.

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
[2]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76
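
For illustration, a minimal Scala sketch of that pattern, converting the
Accumulo Key/Value pairs into plain, serializer-friendly types right after
the newAPIHadoopRDD call (hadoopConf is assumed to be a Hadoop Configuration
already populated via AccumuloInputFormat's static setters, and sc an
existing SparkContext):

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}

val raw = sc.newAPIHadoopRDD(hadoopConf, classOf[AccumuloInputFormat],
  classOf[Key], classOf[Value])

// extract simple Scala/Java types before shuffling or caching, so neither
// Java nor Kryo serialization ever has to touch Key/Value themselves
val rows = raw.map { case (k, v) => (k.getRow.toString, v.get()) }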

On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 I'm guessing the Accumulo Key and Value classes are not serializable, so
 you would need to do something like

 val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) =>
   (extractScalaType(key), extractScalaType(value)) }

 Where 'extractScalaType' converts the Key or Value to a standard Scala type
 or case class or whatever - basically extracts the data from the Key or
 Value in a form usable in Scala

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com
 wrote:

 Hi, David,

 This is the code that I use to create a JavaPairRDD from an Accumulo
 table:

 JavaSparkContext sc = new JavaSparkContext(conf);
 Job hadoopJob = Job.getInstance(conf, "TestSparkJob");
 hadoopJob.setInputFormatClass(AccumuloInputFormat.class);
 AccumuloInputFormat.setZooKeeperInstance(hadoopJob,
   conf.get("ZOOKEEPER_INSTANCE_NAME"),
   conf.get("ZOOKEEPER_HOSTS")
 );
 AccumuloInputFormat.setConnectorInfo(hadoopJob,
   conf.get("ACCUMULO_AGILE_USERNAME"),
   new PasswordToken(conf.get("ACCUMULO_AGILE_PASSWORD"))
 );
 AccumuloInputFormat.setInputTableName(hadoopJob, conf.get("ACCUMULO_TABLE_NAME"));
 AccumuloInputFormat.setScanAuthorizations(hadoopJob, auths);
 JavaPairRDD<Key, Value> values =
   sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
     Key.class, Value.class);

 Key.class and Value.class are from org.apache.accumulo.core.data. I use a
 WholeRowIterator so that the Value is actually an encoded representation of
 an entire logical row; it's a useful convenience if you can be sure that
 your rows always fit in memory.

 I haven't tested it since Spark 1.0.1 but I doubt anything important has
 changed.

 Regards,
 -Russ


 On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com
 wrote:

 *progress!*

 I was able to figure out why the 'input INFO not set' error was
 occurring. The eagle-eyed among you will no doubt see the following code is
 missing a closing ')'

 AbstractInputFormat.setConnectorInfo(jobConf, root, new 
 PasswordToken(password)

 as I'm doing this in spark-notebook, I'd been clicking the execute
 button and moving on because I wasn't seeing an error. what I forgot was
 that notebook is going to do what spark-shell will do when you leave off a
 closing ')' -- *it will wait forever for you to add it*. so the error
 was the result of the 'setConnectorInfo' method never getting executed.

 unfortunately, I'm still unable to shove the accumulo table data into an
 RDD that's useable to me. when I execute

 rddX.count

 I get back

 res15: Long = 10000

 which is the correct response - there are 10,000 rows of data in the
 table I pointed to. however, when I try to grab the first element of data
 thusly:

 rddX.first

 I get the following error:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0 in stage 0.0 (TID 0) had a not serializable result:
 org.apache.accumulo.core.data.Key

 any thoughts on where to go from here?
   DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com broo...@annaisystems.com



 www.AnnaiSystems.com

  On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com
 wrote:

  hi Nick

 Unfortunately the Accumulo docs are woefully inadequate, and in some
 places, flat wrong. I'm not sure if this is a case where the docs are 'flat
 wrong', or if there's some wrinkle with spark-notebook in the mix that's
 messing everything up. I've been working with some people on stack overflow
 on this same issue (including one of the people from the spark-notebook
 team):


 http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530

 if you click the link you can see the entire thread of code, responses
 from notebook, etc. I'm going to try invoking the same techniques both from
 within a stand-alone scala problem and from the shell itself to see if I
 can get some traction. I'll report back when I have more data.

 cheers (and thx!)



 DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  

Re: [SparkSQL] How to calculate stddev on a DataFrame?

2015-03-25 Thread Corey Nolet
I would keep a running sum of squares (along with the count and sum). This
would allow you to keep an ongoing value as an associative operation (in an
aggregator) and then calculate the variance and standard deviation after the
fact.
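
A rough sketch of that approach, assuming a DataFrame df with a numeric
column "x" (the column name and the Moments helper are illustrative):

import org.apache.spark.rdd.RDD

case class Moments(n: Long, sum: Double, sumSq: Double) {
  def add(x: Double)       = Moments(n + 1, sum + x, sumSq + x * x)
  def merge(that: Moments) = Moments(n + that.n, sum + that.sum, sumSq + that.sumSq)
  def mean                 = sum / n
  def variance             = sumSq / n - mean * mean   // population variance
  def stddev               = math.sqrt(variance)
}

// pull the column out as an RDD[Double] and aggregate it in a single pass
val values: RDD[Double] = df.select("x").rdd.map(_.getDouble(0))
val m = values.aggregate(Moments(0L, 0.0, 0.0))(_ add _, _ merge _)
println(s"mean=${m.mean} variance=${m.variance} stddev=${m.stddev}")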

On Wed, Mar 25, 2015 at 10:28 PM, Haopu Wang hw...@qilinsoft.com wrote:

  Hi,



 I have a DataFrame object and I want to do types of aggregations like
 count, sum, variance, stddev, etc.



 DataFrame has DSL to do simple aggregations like count and sum.



 How about variance and stddev?



 Thank you for any suggestions!





StreamingListener

2015-03-11 Thread Corey Nolet
Given the following scenario:

dstream.map(...).filter(...).window(...).foreachRDD(...)

When would the onBatchCompleted fire?
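
For reference, a minimal sketch of how I'm wiring up the listener (ssc is an
assumed StreamingContext):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // report when each batch finishes and how long its jobs took
    val info = batchCompleted.batchInfo
    println(s"batch ${info.batchTime} done, processing delay ${info.processingDelay.getOrElse(-1L)} ms")
  }
})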


Re: [VOTE] Establishing a contrib repo for upgrade testing

2015-03-10 Thread Corey Nolet
+1

On Tue, Mar 10, 2015 at 10:57 AM, David Medinets david.medin...@gmail.com
wrote:

 +1

 On Tue, Mar 10, 2015 at 10:56 AM, Adam Fuchs afu...@apache.org wrote:
  +1
 
  Adam
  On Mar 10, 2015 2:48 AM, Sean Busbey bus...@cloudera.com wrote:
 
  Hi Accumulo!
 
  This is the VOTE thread following our DISCUSS thread on establishing a
 new
  contrib for upgrade testing. For more details, please see the prior
 DISCUSS
  thread on this topic[1].
 
  Cloudera has recently made public some code used for doing correctness
  testing for Accumulo installations across upgrades[2]. The project
 contains
  simple data load and verification tools as well as a rudimentary upgrade
  test automation script. Cloudera would like to donate this code to the
 ASF
  and use it as a starting place for a contrib repository focused on
 testing
  Accumulo across versions generally.
 
  Upon passage of this vote, the Accumulo PMC will adopt this repo as a
 code
  base for the new project contrib accumulo-upgrade-tests subject to the
  ASF IP clearance process[3].
 
  Either as a part of the IP clearance process or immediately thereafter
 the
  repo's docs, artifacts, and packages will be updated to make use of ASF
  releases ad naming conventions rather than vendor specific materials.
 
  I (Sean Busbey) have volunteered to shepherd the paperwork in the IP
  clearance process, handle the updates to ASF releases, and serve as
  component lead for a new Jira component to cover the contrib.
 
  Note that as a contrib repository, the artifacts from this repo will be
  versioned independently from the primary Accumulo codebase. While this
 repo
  seeks to be useful for testing across Accumulo releases, this proposal
 does
  not establish any requirement for its use on release candidates of the
  primary codebase.
 
  Per our bylaws, this vote will require consensus approval (at least 3
  binding +1 votes and no binding vetoes). Though only PMC votes are
 binding,
  all community members are encouraged to vote.
 
  The vote will remain open until 0700 GMT Tuesday March 17 2015 (0300
 EDT).
 
  Please vote one of:
 
  [ ] +1: Establish the 'accumulo-upgrade-tests' contrib by adopting the
  codebase as described
  [ ] -1: Do not adopt the codebase because ...
 
 
  [1]: http://s.apache.org/MsR
  [2]: https://github.com/cloudera/accumulo-upgrade-test/
  [3]: http://incubator.apache.org/ip-clearance/
 
  --
  Sean
 



Re: Batching at the socket layer

2015-03-10 Thread Corey Nolet
Thanks Jiangjie! So what version is considered the new API? Is that the
Java API in version 0.8.2?
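
For reference, a minimal sketch of the per-partition batching knobs in the
new (org.apache.kafka.clients) producer -- the broker addresses, topic, and
values here are made up:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092,broker2:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("batch.size", "16384")   // max bytes buffered per topic-partition batch
props.put("linger.ms", "5")        // wait up to 5 ms to let a batch fill before sending

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("metrics", "someKey", "someValue"))
producer.close()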

On Mon, Mar 9, 2015 at 2:29 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 The stickiness of partition only applies to old producer. In new producer
 we have the round robin for each message. The batching in new producer is
 per topic partition, the batch size it is controlled by both max batch
 size and linger time config.

 Jiangjie (Becket) Qin

 On 3/9/15, 10:10 AM, Corey Nolet cjno...@gmail.com wrote:

 I'm curious what type of batching Kafka producers do at the socket layer.
 For instance, if I have a partitioner that round robin's n messages to a
 different partition, am I guaranteed to get n different messages sent over
 the socket or is there some micro-batching going on underneath?
 
 I am trying to understand the semantics of the default partitioner and why
 it sticks to partitions for 10 minutes. If I were to lower that interval
 to
 1sec, would I acheive better batching that I would if I was to completely
 round-robin each message to a different partition?




Fwd: Versioning

2015-03-09 Thread Corey Nolet
I'm new to Kafka and I'm trying to understand the version semantics. We
want to use Kafka w/ Spark, but our version of Spark is tied to 0.8.0. We
were wondering what guarantees are made about backwards compatibility across
0.8.x.x. At first glance, given the 3 digits used for versions, I figured
0.8.x would be a bugfix line and fully version-compatible, but I'm noticing
newer versions released w/ 4 digits, which leads me to believe there are
fewer guarantees between 0.8.0, 0.8.1.x and 0.8.2.x.


Fwd: Batching at the socket layer

2015-03-09 Thread Corey Nolet
I'm curious what type of batching Kafka producers do at the socket layer.
For instance, if I have a partitioner that round-robins n messages to
different partitions, am I guaranteed to get n different messages sent over
the socket, or is there some micro-batching going on underneath?

I am trying to understand the semantics of the default partitioner and why
it sticks to partitions for 10 minutes. If I were to lower that interval to
1 sec, would I achieve better batching than I would if I were to completely
round-robin each message to a different partition?


Re: [VOTE] Release Apache Spark 1.3.0 (RC3)

2015-03-09 Thread Corey Nolet
+1 (non-binding)

- Verified signatures
- Built on Mac OS X and Fedora 21.

On Mon, Mar 9, 2015 at 11:01 PM, Krishna Sankar ksanka...@gmail.com wrote:

 Excellent, Thanks Xiangrui. The mystery is solved.
 Cheers
 k/


 On Mon, Mar 9, 2015 at 3:30 PM, Xiangrui Meng men...@gmail.com wrote:

  Krishna, I tested your linear regression example. For linear
  regression, we changed its objective function from 1/n * \|A x -
  b\|_2^2 to 1/(2n) * \|Ax - b\|_2^2 to be consistent with common least
  squares formulations. It means you could re-produce the same result by
  multiplying the step size by 2. This is not a problem if both run
  until convergence (if not blow up). However, in your example, a very
  small step size is chosen and it didn't converge in 100 iterations. In
  this case, the step size matters. I will put a note in the migration
  guide. Thanks! -Xiangrui
 
  On Mon, Mar 9, 2015 at 1:38 PM, Sean Owen so...@cloudera.com wrote:
   I'm +1 as I have not heard of any one else seeing the Hive test
   failure, which is likely a test issue rather than code issue anyway,
   and not a blocker.
  
   On Fri, Mar 6, 2015 at 9:36 PM, Sean Owen so...@cloudera.com wrote:
   Although the problem is small, especially if indeed the essential docs
   changes are following just a couple days behind the final release, I
   mean, why the rush if they're essential? wait a couple days, finish
   them, make the release.
  
   Answer is, I think these changes aren't actually essential given the
   comment from tdas, so: just mark these Critical? (although ... they do
   say they're changes for the 1.3 release, so kind of funny to get to
   them for 1.3.x or 1.4, but that's not important now.)
  
   I thought that Blocker really meant Blocker in this project, as I've
   been encouraged to use it to mean don't release without this. I
   think we should use it that way. Just thinking of it as extra
   Critical doesn't add anything. I don't think Documentation should be
   special-cased as less important, and I don't think there's confusion
   if Blocker means what it says, so I'd 'fix' that way.
  
   If nobody sees the Hive failure I observed, and if we can just zap
   those Blockers one way or the other, +1
  
  
   On Fri, Mar 6, 2015 at 9:17 PM, Patrick Wendell pwend...@gmail.com
  wrote:
   Sean,
  
   The docs are distributed and consumed in a fundamentally different
 way
   than Spark code itself. So we've always considered the deadline for
   doc changes to be when the release is finally posted.
  
   If there are small inconsistencies with the docs present in the
 source
   code for that release tag, IMO that doesn't matter much since we
 don't
   even distribute the docs with Spark's binary releases and virtually
 no
   one builds and hosts the docs on their own (that I am aware of, at
   least). Perhaps we can recommend if people want to build the doc
   sources that they should always grab the head of the most recent
   release branch, to set expectations accordingly.
  
   In the past we haven't considered it worth holding up the release
   process for the purpose of the docs. It just doesn't make sense since
   they are consumed as a service. If we decide to change this
   convention, it would mean shipping our releases later, since we
   could't pipeline the doc finalization with voting.
  
   - Patrick
  
   On Fri, Mar 6, 2015 at 11:02 AM, Sean Owen so...@cloudera.com
 wrote:
   Given the title and tagging, it sounds like there could be some
   must-have doc changes to go with what is being released as 1.3. It
 can
   be finished later, and published later, but then the docs source
   shipped with the release doesn't match the site, and until then, 1.3
   is released without some must-have docs for 1.3 on the site.
  
   The real question to me is: are there any further, absolutely
   essential doc changes that need to accompany 1.3 or not?
  
   If not, just resolve these. If there are, then it seems like the
   release has to block on them. If there are some docs that should
 have
   gone in for 1.3, but didn't, but aren't essential, well I suppose it
   bears thinking about how to not slip as much work, but it doesn't
   block.
  
   I think Documentation issues certainly can be a blocker and
 shouldn't
   be specially ignored.
  
  
   BTW the UISeleniumSuite issue is a real failure, but I do not think
 it
   is serious: http://issues.apache.org/jira/browse/SPARK-6205  It
 isn't
   a regression from 1.2.x, but only affects tests, and only affects a
   subset of build profiles.
  
  
  
  
   On Fri, Mar 6, 2015 at 6:43 PM, Patrick Wendell pwend...@gmail.com
 
  wrote:
   Hey Sean,
  
   SPARK-5310 Update SQL programming guide for 1.3
   SPARK-5183 Document data source API
   SPARK-6128 Update Spark Streaming Guide for Spark 1.3
  
   For these, the issue is that they are documentation JIRA's, which
   don't need to be timed exactly with the release vote, since we can
   update the documentation on the 
