For those seeing this for the first time, look in the quoted
email history for the email from me showing the outline of RxJava
operators...
On Mon, Aug 17, 2015 at 7:20 AM, cogmission (David Ray)
<[email protected] <mailto:[email protected]>>
wrote:
Omg! I just found this!
On each of the above operator links (seen in red if you're
using Rich Text or HTML email client), there is a link on the
landing page where you can go to their awesome marble
diagrams (which the have for each operator)...
Except, this one is interactive! So you can move around the
marbles and see the effect in their output (see how the
transformation functions change!)
Check it out! http://rxmarbles.com/#zip
Cheers,
David
On Sun, Aug 16, 2015 at 10:04 PM, cogmission (David Ray)
<[email protected]
<mailto:[email protected]>> wrote:
Hi Matt,
I'm only responding in order to avoid what may possibly
be a missed opportunity, and I don't want you or anyone
else who may see the term "Observable" and mistake it for
a simple publish/subscribe pattern that has been seen
before. On the contrary, this library is extremely
powerful, easy on the eyes and with some subtle usage,
can exert extreme power and flexibility in its usage.
For example:
Say you have a service where you receive a message for
the retrieval of a document on the web. In servicing that
request you need to make an asynchronous query to
back-end services, and then say that service needs to
aggregate several formats from a db, and other streaming
sources, so you nest yet another blocking request to
receive those items in a synchronized fashion and maybe
it doesn't end there? Maybe there is an additional stop
along the way where another nested asynchronous call
needs to be made? Now if one of those connections
experiences undue latency or the service is unavailable
for some reason, it is extremely laborious to try and
percolate that exception back up to the original caller.
RxJava handles the bubbling of exceptions from deeply
nested calls back to the original call site.
That's just one example of its use.
Then there are the 100 or so operators that can be
performed on each Observable. Each function returns an
Observable that "emits" one or more items or "streams" of
data. And each Observable can be "operated upon"
mathematically or combined/reduced/transformed etc. And
yes, if your Observables are composed in a "Functional"
manner, meaning they don't mutate or convey state, you
are isolated from a lot of concurrency issues that you
would otherwise have to handle.
I'm not saying that a reckless developer can't still get
themselves in trouble - that is true with any
breakthrough library. I'm just saying this is quite an
exceptional offering in the Java language. Other
languages might have something similar - but this is also
being used in the C# camp too (and Ruby, Python, Scala,
Clojure, C++, Swift, JRuby, Kotlin etc.) - Entitled
ReactiveX as seen here: http://reactivex.io/languages.html
This is also different from the Java 8 Streams offering
because it can do both Pull (like Java 8 Streams) and
Push oriented (its major use case) query patterns. As a
matter of fact, I'm sure the authors of the Java 8
release got a lot of their ideas from this library!
Take a look at some of its operators:
Operators By Category
Creating Observables
Operators that originate new Observables.
* *|Create|*
<http://reactivex.io/documentation/operators/create.html> —
create an Observable from scratch by calling observer
methods programmatically
* *|Defer|*
<http://reactivex.io/documentation/operators/defer.html> —
do not create the Observable until the observer
subscribes, and create a fresh Observable for each
observer
* *|Empty|/|Never|/|Throw|*
<http://reactivex.io/documentation/operators/empty-never-throw.html> —
create Observables that have very precise and limited
behavior
* *|From|*
<http://reactivex.io/documentation/operators/from.html> —
convert some other object or data structure into an
Observable
* *|Interval|*
<http://reactivex.io/documentation/operators/interval.html> —
create an Observable that emits a sequence of
integers spaced by a particular time interval
* *|Just|*
<http://reactivex.io/documentation/operators/just.html> —
convert an object or a set of objects into an
Observable that emits that or those objects
* *|Range|*
<http://reactivex.io/documentation/operators/range.html> —
create an Observable that emits a range of sequential
integers
* *|Repeat|*
<http://reactivex.io/documentation/operators/repeat.html> —
create an Observable that emits a particular item or
sequence of items repeatedly
* *|Start|*
<http://reactivex.io/documentation/operators/start.html> —
create an Observable that emits the return value of a
function
* *|Timer|*
<http://reactivex.io/documentation/operators/timer.html> —
create an Observable that emits a single item after a
given delay
Transforming Observables
Operators that transform items that are emitted by an
Observable.
* *|Buffer|*
<http://reactivex.io/documentation/operators/buffer.html> —
periodically gather items from an Observable into
bundles and emit these bundles rather than emitting
the items one at a time
* *|FlatMap|*
<http://reactivex.io/documentation/operators/flatmap.html> —
transform the items emitted by an Observable into
Observables, then flatten the emissions from those
into a single Observable
* *|GroupBy|*
<http://reactivex.io/documentation/operators/groupby.html> —
divide an Observable into a set of Observables that
each emit a different group of items from the
original Observable, organized by key
* *|Map|*
<http://reactivex.io/documentation/operators/map.html> —
transform the items emitted by an Observable by
applying a function to each item
* *|Scan|*
<http://reactivex.io/documentation/operators/scan.html> —
apply a function to each item emitted by an
Observable, sequentially, and emit each successive value
* *|Window|*
<http://reactivex.io/documentation/operators/window.html> —
periodically subdivide items from an Observable into
Observable windows and emit these windows rather than
emitting the items one at a time
Filtering Observables
Operators that selectively emit items from a source
Observable.
* *|Debounce|*
<http://reactivex.io/documentation/operators/debounce.html> —
only emit an item from an Observable if a particular
timespan has passed without it emitting another item
* *|Distinct|*
<http://reactivex.io/documentation/operators/distinct.html> —
suppress duplicate items emitted by an Observable
* *|ElementAt|*
<http://reactivex.io/documentation/operators/elementat.html> —
emit only item /n/ emitted by an Observable
* *|Filter|*
<http://reactivex.io/documentation/operators/filter.html> —
emit only those items from an Observable that pass a
predicate test
* *|First|*
<http://reactivex.io/documentation/operators/first.html> —
emit only the first item, or the first item that
meets a condition, from an Observable
* *|IgnoreElements|*
<http://reactivex.io/documentation/operators/ignoreelements.html> —
do not emit any items from an Observable but mirror
its termination notification
* *|Last|*
<http://reactivex.io/documentation/operators/last.html> —
emit only the last item emitted by an Observable
* *|Sample|*
<http://reactivex.io/documentation/operators/sample.html> —
emit the most recent item emitted by an Observable
within periodic time intervals
* *|Skip|*
<http://reactivex.io/documentation/operators/skip.html> —
suppress the first /n/ items emitted by an Observable
* *|SkipLast|*
<http://reactivex.io/documentation/operators/skiplast.html> —
suppress the last /n/ items emitted by an Observable
* *|Take|*
<http://reactivex.io/documentation/operators/take.html> —
emit only the first /n/ items emitted by an Observable
* *|TakeLast|*
<http://reactivex.io/documentation/operators/takelast.html> —
emit only the last /n/ items emitted by an Observable
Combining Observables
Operators that work with multiple source Observables to
create a single Observable
* *|And|/|Then|/|When|*
<http://reactivex.io/documentation/operators/and-then-when.html> —
combine sets of items emitted by two or more
Observables by means of |Pattern|and
|Plan| intermediaries
* *|CombineLatest|*
<http://reactivex.io/documentation/operators/combinelatest.html> —
when an item is emitted by either of two Observables,
combine the latest item emitted by each Observable
via a specified function and emit items based on the
results of this function
* *|Join|*
<http://reactivex.io/documentation/operators/join.html> —
combine items emitted by two Observables whenever an
item from one Observable is emitted during a time
window defined according to an item emitted by the
other Observable
* *|Merge|*
<http://reactivex.io/documentation/operators/merge.html> —
combine multiple Observables into one by merging
their emissions
* *|StartWith|*
<http://reactivex.io/documentation/operators/startwith.html> —
emit a specified sequence of items before beginning
to emit the items from the source Observable
* *|Switch|*
<http://reactivex.io/documentation/operators/switch.html> —
convert an Observable that emits Observables into a
single Observable that emits the items emitted by the
most-recently-emitted of those Observables
* *|Zip|*
<http://reactivex.io/documentation/operators/zip.html> —
combine the emissions of multiple Observables
together via a specified function and emit single
items for each combination based on the results of
this function
Error Handling Operators
Operators that help to recover from error notifications
from an Observable
* *|Catch|*
<http://reactivex.io/documentation/operators/catch.html> —
recover from an |onError| notification by continuing
the sequence without error
* *|Retry|*
<http://reactivex.io/documentation/operators/retry.html> —
if a source Observable sends an
|onError| notification, resubscribe to it in the
hopes that it will complete without error
Observable Utility Operators
A toolbox of useful Operators for working with Observables
* *|Delay|*
<http://reactivex.io/documentation/operators/delay.html> —
shift the emissions from an Observable forward in
time by a particular amount
* *|Do|*
<http://reactivex.io/documentation/operators/do.html> —
register an action to take upon a variety of
Observable lifecycle events
* *|Materialize|/|Dematerialize|*
<http://reactivex.io/documentation/operators/materialize-dematerialize.html> —
represent both the items emitted and the
notifications sent as emitted items, or reverse this
process
* *|ObserveOn|*
<http://reactivex.io/documentation/operators/observeon.html> —
specify the scheduler on which an observer will
observe this Observable
* *|Serialize|*
<http://reactivex.io/documentation/operators/serialize.html> —
force an Observable to make serialized calls and to
be well-behaved
* *|Subscribe|*
<http://reactivex.io/documentation/operators/subscribe.html> —
operate upon the emissions and notifications from an
Observable
* *|SubscribeOn|*
<http://reactivex.io/documentation/operators/subscribeon.html> —
specify the scheduler an Observable should use when
it is subscribed to
* *|TimeInterval|*
<http://reactivex.io/documentation/operators/timeinterval.html>
—
convert an Observable that emits items into one that
emits indications of the amount of time elapsed
between those emissions
* *|Timeout|*
<http://reactivex.io/documentation/operators/timeout.html> —
mirror the source Observable, but issue an error
notification if a particular period of time elapses
without any emitted items
* *|Timestamp|*
<http://reactivex.io/documentation/operators/timestamp.html> —
attach a timestamp to each item emitted by an Observable
* *|Using|*
<http://reactivex.io/documentation/operators/using.html> —
create a disposable resource that has the same
lifespan as the Observable
Conditional and Boolean Operators
Operators that evaluate one or more Observables or items
emitted by Observables
* *|All|*
<http://reactivex.io/documentation/operators/all.html> —
determine whether all items emitted by an Observable
meet some criteria
* *|Amb|*
<http://reactivex.io/documentation/operators/amb.html> —
given two or more source Observables, emit all of the
items from only the first of these Observables to
emit an item
* *|Contains|*
<http://reactivex.io/documentation/operators/contains.html> —
determine whether an Observable emits a particular
item or not
* *|DefaultIfEmpty|*
<http://reactivex.io/documentation/operators/defaultifempty.html> —
emit items from the source Observable, or a default
item if the source Observable emits nothing
* *|SequenceEqual|*
<http://reactivex.io/documentation/operators/sequenceequal.html> —
determine whether two Observables emit the same
sequence of items
* *|SkipUntil|*
<http://reactivex.io/documentation/operators/skipuntil.html> —
discard items emitted by an Observable until a second
Observable emits an item
* *|SkipWhile|*
<http://reactivex.io/documentation/operators/skipwhile.html> —
discard items emitted by an Observable until a
specified condition becomes false
* *|TakeUntil|*
<http://reactivex.io/documentation/operators/takeuntil.html> —
discard items emitted by an Observable after a second
Observable emits an item or terminates
* *|TakeWhile|*
<http://reactivex.io/documentation/operators/takewhile.html> —
discard items emitted by an Observable after a
specified condition becomes false
Mathematical and Aggregate Operators
Operators that operate on the entire sequence of items
emitted by an Observable
* *|Average|*
<http://reactivex.io/documentation/operators/average.html> —
calculates the average of numbers emitted by an
Observable and emits this average
* *|Concat|*
<http://reactivex.io/documentation/operators/concat.html> —
emit the emissions from two or more Observables
without interleaving them
* *|Count|*
<http://reactivex.io/documentation/operators/count.html> —
count the number of items emitted by the source
Observable and emit only this value
* *|Max|*
<http://reactivex.io/documentation/operators/max.html> —
determine, and emit, the maximum-valued item emitted
by an Observable
* *|Min|*
<http://reactivex.io/documentation/operators/min.html> —
determine, and emit, the minimum-valued item emitted
by an Observable
* *|Reduce|*
<http://reactivex.io/documentation/operators/reduce.html> —
apply a function to each item emitted by an
Observable, sequentially, and emit the final value
* *|Sum|*
<http://reactivex.io/documentation/operators/sum.html> —
calculate the sum of numbers emitted by an Observable
and emit this sum
Backpressure Operators
* *backpressure operators*
<http://reactivex.io/documentation/operators/backpressure.html>
—
strategies for coping with Observables that produce
items more rapidly than their observers consume them
Connectable Observable Operators
Specialty Observables that have more precisely-controlled
subscription dynamics
* *|Connect|*
<http://reactivex.io/documentation/operators/connect.html> —
instruct a connectable Observable to begin emitting
items to its subscribers
* *|Publish|*
<http://reactivex.io/documentation/operators/publish.html> —
convert an ordinary Observable into a connectable
Observable
* *|RefCount|*
<http://reactivex.io/documentation/operators/refcount.html> —
make a Connectable Observable behave like an ordinary
Observable
* *|Replay|*
<http://reactivex.io/documentation/operators/replay.html> —
ensure that all observers see the same sequence of
emitted items, even if they subscribe after the
Observable has begun emitting items
On Sun, Aug 16, 2015 at 8:29 PM, Matthew Lohbihler
<[email protected]
<mailto:[email protected]>> wrote:
Hi David,
It's nice to see that this pattern has been
formalized, but this isn't really anything terribly
new. I've been using observables to do things like
rollups and other data stream analysis for years now,
just not with this particular package.
Also, note that Futures/Promises don't really compete
with Observables. Futures are good for chaining
arbitrary asynchronous events, Observables for
processing streams of homogenous objects. (But is it
true that the Java Future impl is not very useful. I
had to develop my own for what i thought was a very
straightforward use case.)
Finally, there is nothing inherently thread safe
about Observables (not that anyone said they were...
just saying...). When using the same Observable
instance in a multi-threaded context, you still have
to manage synchonization on your own if the context
doesn't somehow do it for you.
Regards,
m@
On 8/15/2015 11:56 PM, cogmission (David Ray) wrote:
...and Their Use In HTM.java's Network API (NAPI)
Hi Everybody,
I wanted to share a one-page short but very well
explained tutorial on composing asynchronous
workflows using RxJava.
Due to the availability of "Big Data", Multicore
Processors, Cloud Computing and emerging concepts of
Streaming Workflows; tools like "Hadoop", "Spark"
etc. have come to the forefront as a means of
optimizing concurrency and parallel architectures.
Likewise, on a smaller scale, mastering the tools of
Streaming Data utilization is essential in order to
be ready when the future comes visiting upon us.
Enter RxJava...
As you may or may not know, HTM.java's Network API
was constructed using RxJava Observables which are a
way of composing units of work in such a way where
they can be used asynchronously or
non-asynchronously without blocking. This also
allows you to seamlessly "tie in" the NAPI in your
own applications using chains of Observable items.
Unlike Java "Futures" which only return a single
item, RxJava Observables (which also can use
Futures) can return a "streaming" flow of items in a
non-blocking fashion; this allows one to fully
utilize any Threads in use by letting them be
continuously active throughout their life cycle.
Better yet, it is very lightweight has no outside
dependencies and can be used with a multitude of
languages!!
Netflix, uses RxJava and is its original sponsor and
creator. It is a very exciting new concept that is
sweeping the Java developer ranks, and this is one
of the best tutorials on the subject I have found.
http://docs.couchbase.com/developer/java-2.0/observables.html
Enjoy!
David
--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java
<https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io
--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java <https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io <http://cortical.io/>
--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java <https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io
--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java <https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io