AndroidPub

The (retired) Pub(lication) for Android & Tech, focused on Development

How to wrap your imperative brain around functional reactive programming in RxJava

Disclaimer: RxJava is a beast. A beautiful beast, but a beast nonetheless. There is a lot to it and I can’t cover it all in this post. I tried to use the Pareto Principle to cover the 20% that will give you 80% of what you really need to know. With that being said, let’s jump in!

RxJava can be complicated to wrap your head around at first (at least it was for me!). The main reason is that most Java programs are written imperatively, and Java is usually taught that way too. As I’m sure you are aware, there are many programming paradigms, and two of the most popular are imperative programming and functional programming. To use RxJava effectively, you first have to understand the difference between these two paradigms.

Functional vs. Imperative: Quick Overview

The functional programming paradigm was explicitly created to support a pure functional approach to problem solving. Functional programming is a form of declarative programming. In contrast, most mainstream languages, including object-oriented programming (OOP) languages such as C#, C++, and Java, were designed to primarily support imperative (procedural) programming.

With an imperative approach, a developer writes code that describes in exacting detail the steps that the computer must take to accomplish the goal. This is sometimes referred to as algorithmic programming. In contrast, a functional approach involves composing the problem as a set of functions to be executed. You define carefully the input to each function, and what each function returns. The following table describes some of the general differences between these two approaches.

source: MSDN
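
To make the contrast concrete, here is a small sketch in plain Java (no RxJava yet) that solves the same toy problem both ways. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ImperativeVsFunctional {

    // Imperative: spell out every step and mutate a result list as we go.
    static List<String> shoutLongWordsImperative(List<String> words) {
        List<String> result = new ArrayList<>();
        for (String word : words) {
            if (word.length() > 3) {
                result.add(word.toUpperCase());
            }
        }
        return result;
    }

    // Functional: describe the transformation as a chain of functions.
    static List<String> shoutLongWordsFunctional(List<String> words) {
        return words.stream()
                .filter(word -> word.length() > 3)
                .map(word -> word.toUpperCase())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("rx", "java", "is", "reactive");
        System.out.println(shoutLongWordsImperative(words)); // [JAVA, REACTIVE]
        System.out.println(shoutLongWordsFunctional(words)); // [JAVA, REACTIVE]
    }
}
```

Both methods return the same list. The difference is that the first one tells the computer how to build it, while the second one only says what the result should contain.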

Although most languages were designed to support a specific programming paradigm, many general-purpose languages are flexible enough to support multiple paradigms. For example, most languages that contain function pointers can be used to credibly support functional programming. Java is one of them, and RxJava is a library built explicitly to support a declarative, functional style in Java. Some even describe it as Functional Reactive Programming (Oh great, another paradigm!). Well, don’t worry, it’s very similar to functional programming, but it also adds in “reactive” behavior. See the image below:

Functional Reactive Programming Basics

Fundamentally, Functional Reactive Programming (FRP) is the Observer Pattern (this is the “reactive” part), with support for manipulating and transforming the stream of data our Observables emit and doing this without side effects (this is the functional part). Observables are a pipeline (or think stream) our data will flow through.

Side Note: A side effect refers simply to the modification of some kind of state — for instance:

  • Changing the value of a variable;
  • Writing some data to disk;
  • Enabling or disabling a button in the User Interface.
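
As a rough illustration of the first bullet, here is a tiny sketch that puts a pure function next to one with a side effect. The class, field, and method names are hypothetical.

```java
class SideEffects {
    // Hypothetical shared state, used only to illustrate a side effect.
    static int total = 0;

    // Pure: the result depends only on the arguments, and nothing else changes.
    static int add(int a, int b) {
        return a + b;
    }

    // Impure: modifies state that lives outside the function (a side effect).
    static int addToTotal(int amount) {
        total += amount;
        return total;
    }

    public static void main(String[] args) {
        System.out.println(add(2, 3));     // 5
        System.out.println(add(2, 3));     // 5 again, no matter how often we call it
        System.out.println(addToTotal(5)); // 5
        System.out.println(addToTotal(5)); // 10, the answer depends on earlier calls
    }
}
```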

If you aren’t familiar with the Observer Pattern and don’t feel like reading the wiki on it, there are two main ideas that you need to understand. The Observer Pattern involves two roles: a source Observable, and one or many Observers that are interested in the events or objects that the source Observable will be emitting. The Observable emits objects, while the Observer subscribes and receives them.
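
If it helps, the pattern itself fits in a few lines of plain Java. This is only a hand-rolled sketch of the two roles (the Newsletter class is invented for this post and is not how RxJava implements Observable):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// The source: keeps track of its observers and pushes each new item to all of them.
class Newsletter {
    private final List<Consumer<String>> observers = new ArrayList<>();

    // An observer registers its interest by handing over a callback.
    void subscribe(Consumer<String> observer) {
        observers.add(observer);
    }

    // The source decides when something is emitted; observers just react.
    void publish(String issue) {
        for (Consumer<String> observer : observers) {
            observer.accept(issue);
        }
    }

    public static void main(String[] args) {
        Newsletter newsletter = new Newsletter();
        newsletter.subscribe(issue -> System.out.println("Alice got: " + issue));
        newsletter.subscribe(issue -> System.out.println("Bob got: " + issue));
        newsletter.publish("Issue 1");
        // Alice got: Issue 1
        // Bob got: Issue 1
    }
}
```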

Okay, now that you are an expert on FRP, I can start talking a little bit about RxJava. The simplest way to think about RxJava is that it is a library that makes it easier to develop functional reactive applications (great for micro-service architecture!) in the Java language. So why do we need that? Because a non-blocking architecture lets an application keep doing useful work while it waits on slow calls, which is one of the most effective ways to build scalable applications.

FRP allows us to develop applications in this way. Going back to FRP, you will note that a functional reactive system has two main components to work with. These components are (thankfully) named the same as the roles in the Observer Pattern that they implement: Observable and Observer. You will also see people (and the docs) talking about a Subscriber. Don’t let this confuse you. A Subscriber is just an implementation of Observer with additional semantics around subscription (mostly un-subscription). The two terms are typically interchangeable: in both cases we are talking about the objects interested in receiving items from the source Observable they are subscribed to. From here on out I will just refer to them as Subscribers (the slightly enhanced Observers that you will typically be working with anyway), which also keeps them easier to tell apart from Observables (Observable and Observer are too close to the same word! It hurts my mind).

TL;DR: The Observable emits items, and the Subscriber consumes those items as they are emitted to it. So instead of having a collection and pulling items out, we have a publisher (the Observable) pushing items to us.

So how does an Observable emit (send) items to its Subscribers? There is a method to this madness. An Observable may emit any number of items (including zero items or an infinite number of items), and then it terminates, either by completing successfully or because of an error. For each Subscriber it has, an Observable calls Subscriber.onNext(T item) any number of times, passing the Subscriber each item it is interested in, followed by either Subscriber.onCompleted() or Subscriber.onError(Throwable e). These two calls are the only ways to know that an Observable is “finished”. Note that Subscriber.onCompleted() will only be called if the Observable is not an infinite stream. For an example of an infinite stream, think of an Observable that wraps a timer and emits the time every “x” milliseconds to a log.
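
Here is a rough sketch of that contract in plain Java, before we touch the real library. The Sink interface and the emit() helper are invented for illustration and only mimic the shape of the three callbacks; they are not RxJava’s code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class EmissionContract {

    // Hypothetical stand-in for the three Subscriber callbacks.
    interface Sink<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    // Pushes each converted input to the sink, then signals exactly one terminal event.
    static <A, B> void emit(List<A> inputs, Function<A, B> work, Sink<B> sink) {
        try {
            for (A input : inputs) {
                sink.onNext(work.apply(input));
            }
        } catch (RuntimeException e) {
            sink.onError(e);  // terminal: nothing is emitted after an error
            return;
        }
        sink.onCompleted();   // terminal: every item was delivered
    }

    // Runs emit() over some strings and records each callback as a line of text.
    static List<String> record(List<String> inputs) {
        List<String> log = new ArrayList<>();
        Function<String, Integer> parse = Integer::parseInt;
        emit(inputs, parse, new Sink<Integer>() {
            @Override public void onNext(Integer item) { log.add("onNext(" + item + ")"); }
            @Override public void onCompleted() { log.add("onCompleted()"); }
            @Override public void onError(Throwable e) { log.add("onError(" + e.getClass().getSimpleName() + ")"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("1", "2", "3")));
        // [onNext(1), onNext(2), onNext(3), onCompleted()]
        System.out.println(record(Arrays.asList("1", "oops", "3")));
        // [onNext(1), onError(NumberFormatException)]
    }
}
```

Notice that in the second run "3" is never delivered: once a terminal event has been sent, the stream is over.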

I know what you are thinking…WE NEED EXAMPLES! Okay, okay. Well, let’s start out with the basics of just creating an Observable. There are a few ways to create an Observable, depending on the source that you are converting to an Observable stream. However, all of them rely on static factory methods from the Observable class. So, say I have three strings and I want to convert them to an Observable that can be subscribed to later on. All I need to do is the following:

Observable<String> interestingStrings = Observable.just("I am interesting", "Subscribers will love me", "I hope Subscribers like me");

Wow, how easy was that?!

Okay, now we have an Observable that will emit 3 strings to any Subscriber that is interested. Nothing will actually happen or get emitted until a Subscriber subscribes. That makes sense if you think about it. If I had a magazine but no subscribers, I wouldn’t be doing any work to send my magazines anywhere. There would be no one interested!
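
To see that “no subscriber, no work” idea in isolation, here is a toy sketch in plain Java. LazyMagazine, its printRuns counter, and monthly() are all made up for this post; the sketch only imitates the behavior and says nothing about how RxJava does it internally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class LazyMagazine {
    // Counts how many times the (pretend) printing press has actually run.
    static int printRuns = 0;

    // The work is stored as a recipe, not executed up front.
    private final Supplier<List<String>> press;

    LazyMagazine(Supplier<List<String>> press) {
        this.press = press;
    }

    // Only when someone subscribes does the press run and deliver issues.
    void subscribe(Consumer<String> reader) {
        for (String issue : press.get()) {
            reader.accept(issue);
        }
    }

    static LazyMagazine monthly() {
        return new LazyMagazine(() -> {
            printRuns++;
            return Arrays.asList("January", "February");
        });
    }

    public static void main(String[] args) {
        LazyMagazine magazine = monthly();
        System.out.println(printRuns);          // 0, nothing printed yet
        magazine.subscribe(System.out::println); // January, then February
        System.out.println(printRuns);          // 1
    }
}
```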

So let’s create a subscriber and subscribe to our new shiny Observable (interestingStrings).

// RxJava 1.x imports (Subscriber and onCompleted() belong to the 1.x API)
import rx.Observable;
import rx.Subscriber;

// created Observable
Observable<String> interestingStrings = Observable.just("I am interesting", "Subscribers will love me", "I hope Subscribers like me");

// create awesome Subscriber
Subscriber<String> iPrintLines = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }
};

interestingStrings.subscribe(iPrintLines); // Yay, someone subscribed!
// this will print out:
//
// I am interesting
// Subscribers will love me
// I hope Subscribers like me
//
// (not printed) onCompleted() will then be called, but since we don't have any behavior for this function, nothing will be shown.
// onError() was not called because none of our items threw an exception. But if they did, the exception would have been passed to
// the onError() handler in the subscriber (it is in charge of deciding what to do with this) and then the Observable would stop emitting any items after this.

An important thing to remember is that as soon as you have a subscriber, the Observable will begin emitting its source objects (if it has any yet) to the subscriber by calling subscriber.onNext(String s), in our case passing us those 3 strings. The subscriber will then operate on each emitted item in sequence and perform the work that you have declared in your onNext function. So in our case, we take each string and print it to the console.

That was a bit verbose because I wanted you to get a good idea of how the Observable and Subscriber are actually constructed and what is actually going on. A simpler and more common approach to working with the items you are interested in might look like this. In this example, the subscriber is implicitly created and you are just working with the onNext, onError, and onCompleted callbacks.

// creation
Observable<String> interestingStrings = Observable.just("I am interesting", "Subscribers will love me", "I hope Subscribers like me");
// inline subscriber that only implements the onNext() handler
interestingStrings.subscribe(s -> System.out.println(s));
// inline subscriber that implements the onNext() and onError() handlers
interestingStrings.subscribe(
        s -> System.out.println(s),
        e -> System.out.println(e.getMessage()));
// inline subscriber that implements all handlers; onCompleted() takes no arguments
interestingStrings.subscribe(
        s -> System.out.println(s),
        e -> System.out.println(e.getMessage()),
        () -> System.out.println("I'm Done!"));

Okay, so we’ve worked a little bit with creating an Observable and creating some Subscribers to do something with our emitted items. However, the Observable.just() creation factory method is just one way to create an Observable from one or many items. Another factory method that you will most likely want to familiarize yourself with is the Observable.from() method. The method takes a list of items and converts them from List to Observable. Awesome!

List<String> myStringList = Arrays.asList("Hello", "What's up", "Nothing Much");
Observable<String> stringsFromList = Observable.from(myStringList);
// from here you can work with the Observable the same way as the above examples

We’ve now seen a couple of ways to create and subscribe to Observables. Next, I want to get into what makes RxJava really awesome, and that is its composable, functional API, which exposes a lot of great higher-order functions to filter, reduce, map, aggregate, etc., that you can apply to your Observable stream. If we were just interested in reactive programming, then we could achieve something pretty close to what we have done so far just by using callbacks or futures.

Always keep in mind what it is that we are trying to achieve and how RxJava makes this easier. We are trying to build systems now that rely on asynchronous micro-services.

Our client code will be calling one or many micro-services asynchronously to compose data for use (remembering that this means we may not know when and which services will respond first). So how can we build a system this way? Well, up until now in Java, we’ve only had a few options. We could use callbacks (a piece of executable code that is passed as an argument to other code, which is expected to call back (execute) the argument at some time in the future), or we could use Java Futures. Callbacks (if you’ve ever worked with them, you know) can quickly become unmanageable once a few callbacks themselves have callbacks. You get into what is known as “Callback Hell”, which makes your code unreadable, hard to debug, and even harder to compose effectively. The image below shows what callback hell might look like…
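
In code, a mild case of it might look something like the sketch below. The three “services” are hypothetical and answer immediately so the example stays runnable; picture each of them as a slow remote call, and picture every level also needing its own error handling.

```java
import java.util.function.Consumer;

class CallbackHell {
    // Hypothetical services: each one "responds" by invoking the callback it was given.
    static void fetchUser(int id, Consumer<String> callback) {
        callback.accept("user" + id);
    }

    static void fetchOrderCount(String user, Consumer<Integer> callback) {
        callback.accept(user.length());
    }

    static void fetchTotal(int orderCount, Consumer<Integer> callback) {
        callback.accept(orderCount * 10);
    }

    // Each step needs the previous result, so every callback nests inside the last one.
    static void describe(int id, Consumer<String> done) {
        fetchUser(id, user -> {
            fetchOrderCount(user, orders -> {
                fetchTotal(orders, total -> {
                    done.accept(user + ": " + orders + " orders, total " + total);
                });
            });
        });
    }

    public static void main(String[] args) {
        describe(7, System.out::println); // user7: 5 orders, total 50
    }
}
```

Three levels are still readable. Add error handling, retries, and a fourth or fifth call, and the pyramid gets out of hand quickly.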

So how about our other option? Java Futures? Let’s compare them to RxJava and see how they are different.

Futures

Futures were introduced in Java 5. They are objects that promise to hold the result of something that (maybe) hasn’t occurred yet. They are created, for example, when a task (i.e., a Runnable or Callable) is submitted to an executor. The caller can use the future to check whether the task is finished using isDone(), or wait for it to finish using get(). Example:

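import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// (both members below live inside an enclosing class)

/**
 * A task that sleeps for a second, then returns 1
 **/
public static class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());
    System.out.println(f.isDone()); // false
    System.out.println(f.get()); // Waits until the task is done (blocking!), then prints 1
    exec.shutdown(); // stop the worker thread so the JVM can exit
}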

CompletableFutures

CompletableFutures were introduced in Java 8, and are in fact an evolution of regular Futures inspired by Google’s Listenable Futures. They are Futures that also allow you to string tasks together in a chain. You can use them to tell some worker thread to “go do some task X, and when you’re done, go do this other thing using the result of X”. Here’s a simple example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

// (all members below live inside an enclosing class)

/**
 * A supplier that sleeps for a second, and then returns one
 **/
public static class MySupplier implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return 1;
    }
}

/**
 * A function that adds one to a given Integer
 **/
public static class PlusOne implements Function<Integer, Integer> {
    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // false
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
    exec.shutdown(); // stop the worker thread so the JVM can exit
}
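
CompletableFutures can also combine the results of two independent tasks, which is closer to the “call several services and compose the data” scenario from earlier. Here is a minimal sketch using lambdas; fetchTitle() and fetchViews() are hypothetical services that I made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class CombineFutures {
    // Hypothetical services, each producing its answer on another thread.
    static CompletableFuture<String> fetchTitle() {
        return CompletableFuture.supplyAsync(() -> "RxJava");
    }

    static CompletableFuture<Integer> fetchViews() {
        return CompletableFuture.supplyAsync(() -> 42);
    }

    // Transform one result, then merge it with the other once both are ready.
    static CompletableFuture<String> describe() {
        return fetchTitle()
                .thenApply(title -> title.toUpperCase())
                .thenCombine(fetchViews(), (title, views) -> title + ": " + views + " views");
    }

    public static void main(String[] args) {
        // join() blocks until the combined result is available.
        System.out.println(describe().join()); // RXJAVA: 42 views
    }
}
```

This composes nicely, but note that each future still carries exactly one value.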

RxJava

RxJava, as we’ve seen above, is a whole library for reactive programming. Its main advantage over futures is that RxJava works on streams of zero or more items including never-ending streams with an infinite number of items. It can do so asynchronously and with little or no thread blocking. Futures, on the other hand, are single use. Each holds a single “future” result and that’s it. RxJava has a very rich collection of operators and is extremely flexible.

The Observable and Subscriber are independent of the transformational steps in between them.

I can stick as many map() calls as I want in between the original source Observable and its ultimate Subscriber. The system is highly composable: it is easy to manipulate the data. It’s all about the operators in RxJava that give you so much power.
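
To get a feel for why operators compose so easily, here is a toy version of the idea in plain Java. MiniObservable is something I made up for this post, it is nowhere near RxJava’s real implementation (no error handling, no unsubscription, no threading), but it shows how each operator simply wraps the source before it and hands back a new source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// A toy stand-in for Observable; NOT RxJava's implementation.
class MiniObservable<T> {
    // The work to run per subscriber: push items into the consumer it is given.
    private final Consumer<Consumer<T>> source;

    private MiniObservable(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<T>(downstream -> {
            for (T item : items) {
                downstream.accept(item);
            }
        });
    }

    // Each operator returns a new MiniObservable that wraps the previous one.
    <R> MiniObservable<R> map(Function<T, R> fn) {
        return new MiniObservable<R>(downstream ->
                source.accept(item -> downstream.accept(fn.apply(item))));
    }

    MiniObservable<T> filter(Predicate<T> keep) {
        return new MiniObservable<T>(downstream ->
                source.accept(item -> {
                    if (keep.test(item)) {
                        downstream.accept(item);
                    }
                }));
    }

    // Nothing runs until this is called; then items flow through every operator.
    void subscribe(Consumer<T> subscriber) {
        source.accept(subscriber);
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        MiniObservable.just("a", "bb", "ccc", "dddd")
                .map(String::length)      // 1, 2, 3, 4
                .filter(n -> n % 2 == 0)  // 2, 4
                .map(n -> n * 10)         // 20, 40
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [20, 40]
    }
}
```

The source and the final subscriber never need to know how many steps sit between them, which is exactly the independence described above.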

So here is a code example that does a ton of work in such a concise, simple, and readable way. Try to achieve this using Futures or callbacks!

// This is a more advanced query that will query the DB, get us back
// URLs (endpoints to other async services), get the title from them,
// filter them where the title isn't null, take the first 5 we get and
// perform a save to another endpoint, all in one composed stream.
// Pretty amazing. And this is still just scratching the surface of
// what you can do once you learn all of the operators.

query("SELECT url FROM EnpointsTable")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.filter(title -> title != null)
.take(5)
.doOnNext(title -> saveTitle(title))
.subscribe(title -> System.out.println(title));

// Remember that all of this is deferred until there is a subscriber

So the next thing you are going to want to do in your “Learning RxJava Journey” is to start looking at all the operators that you have access to. I won’t lie. Some of them can be complicated to understand at first and that is why the great people behind Reactive Programming have come up with a great visual aid to help you understand what each operator is capable of doing. These visual guides are known as Marble Diagrams and they go something like this:

To give you an idea of some of the most popular operators you will be using all the time, look at the marble diagrams for them below:

There is so much to learn in RxJava but I hope this quick introduction gets you excited about this technology and makes your life easier once you begin using it! Always keep in mind what RxJava is used for and what it is not. At the heart of it all, RxJava is a library for composing asynchronous and event-based programs by using observable sequences. That’s all I have for you today.

I hope you enjoyed this post, and if you did, please share it! That is the biggest compliment I can get as a blogger. Also, if you want to be notified about any future posts of mine, please subscribe to my blog at jasonroell.com. Have a great day!

Sources:

youtube.com/watch?v=Dk8cR1Kxj0Y, youtube.com/watch?v=QOR69q1e63Y, blog.danlew.net/2014/09/15/grokking-rxjava-part-1/, msdn.microsoft.com/en-us/library/mt693186.aspx

Written by Jason Roell

AI Practitioner/Software Engineer (jasonroell.com) looking for the hardest problems to solve. (https://www.linkedin.com/in/jason-roell-47830817/)
