Larry Wall, the creator of Perl, once said there are three virtues of a great developer: laziness, impatience, and hubris. Laziness makes us write programs to help with tedious, labor-intensive tasks. Impatience makes us write programs that are responsive and anticipate users’ needs. Hubris provides us with unwarranted confidence to take on a Herculean task and just enough pride to write (and maintain) code others won’t laugh at.
Given my particular personality quirks, impatience and laziness come easily to me. I want responsive applications with as little heavy lifting as possible. Throughout the 1990s and early 2000s, this wasn’t too difficult. Software developers could rely on “lazy” optimization: if your application was too slow, you just waited for newer, faster hardware. From the early 1970s on, chip performance roughly doubled every 18 months. Today, although computer performance continues to increase, the gains come mainly from adding more cores per chip rather than from making a single core faster.
To take advantage of today’s multi-core architectures, we must write software differently. Our applications should be able to manage multiple concurrent tasks and divide large tasks into smaller segments that can run in parallel. The good news is that Java has built-in language elements to help us write multi-threaded applications. However, despite the claims of numerous books and websites, writing multi-threaded code that works as expected and actually improves performance is no simple task.
SO WHAT’S THE PROBLEM?
When tasked with writing a concurrent application in Java, we usually begin with Java’s “low-level” primitives: threads, locks, and synchronized, shared memory. Java’s threads and locks provide us with a very powerful concurrency model that can be applied to a wide range of problems. However, it’s also very easy to write code with hidden bugs.
In fact, the real problem with this style of concurrency is not that the code is hard to write; it’s that the code is really hard to test. Consider the solution to the Dining Philosophers problem below (this example is based on code from the excellent book Seven Concurrency Models in Seven Weeks by Paul Butcher). Five philosophers sit around a table with one chopstick between each pair, and each philosopher needs both neighboring chopsticks to eat. This program will eventually deadlock, but it may run for days or even weeks before the deadlock appears. How do you write a test for a bug that only occurs once every few weeks, or only on certain hardware running a particular JVM?
package examples;
import java.util.Random;
class Philosopher extends Thread {
    private Chopstick left, right;
    private Random random;
    private int thinkCount;
    public Philosopher(Chopstick left, Chopstick right) {
        this.left = left;
        this.right = right;
        random = new Random();
    }
    public void run() {
        try {
            while (true) {
                ++thinkCount;
                if (thinkCount % 10 == 0)
                    System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
                Thread.sleep(random.nextInt(1000)); // think
                // Every philosopher locks its left chopstick first, then its right.
                // If all five lock their left chopstick at the same moment, each one
                // waits forever for the chopstick its neighbor holds: deadlock.
                synchronized (left) {
                    synchronized (right) {
                        Thread.sleep(random.nextInt(1000)); // eat
                    }
                }
            }
        } catch (InterruptedException e) {
            // interrupted: stop this philosopher
        }
    }
    public static void main(String[] args) {
        Philosopher[] philosophers = new Philosopher[5];
        Chopstick[] chopsticks = new Chopstick[5];
        for (int i = 0; i < 5; ++i)
            chopsticks[i] = new Chopstick(i);
        for (int i = 0; i < 5; ++i) {
            philosophers[i] = new Philosopher(chopsticks[i], chopsticks[(i + 1) % 5]);
            philosophers[i].start();
        }
        try {
            for (int i = 0; i < 5; ++i) {
                philosophers[i].join();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Chopstick {
    private int id;
    public Chopstick(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
}
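The deadlock happens because every philosopher picks up its left chopstick first. That allows a cycle in which each one holds one chopstick and waits for its neighbor's. A well-known pattern that breaks the cycle is a global lock order: always acquire the lower-numbered chopstick first, which is what Chopstick.getId() makes possible. The code below is a self-contained sketch of that idea, not the book's code. The OrderedPhilosopher class and the bounded meals count are hypothetical choices for illustration; the bound exists only so the program terminates.

```java
import java.util.Random;

// Same shape as the Chopstick class in the example above.
class Chopstick {
    private final int id;

    Chopstick(int id) {
        this.id = id;
    }

    int getId() {
        return id;
    }
}

// Hypothetical deadlock-free variant: locks are always taken in ascending id order.
class OrderedPhilosopher extends Thread {
    private final Chopstick first;   // lower-numbered chopstick
    private final Chopstick second;  // higher-numbered chopstick
    private final Random random = new Random();
    private final int meals;         // hypothetical bound so the demo terminates
    private int mealsEaten;

    OrderedPhilosopher(Chopstick left, Chopstick right, int meals) {
        // Impose a global order: lower id first, regardless of left/right position.
        boolean leftFirst = left.getId() < right.getId();
        this.first = leftFirst ? left : right;
        this.second = leftFirst ? right : left;
        this.meals = meals;
    }

    int getMealsEaten() {
        return mealsEaten;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < meals; i++) {
                Thread.sleep(random.nextInt(10));         // think
                synchronized (first) {
                    synchronized (second) {
                        Thread.sleep(random.nextInt(10)); // eat
                        mealsEaten++;
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();           // preserve interrupt status
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Chopstick[] chopsticks = new Chopstick[5];
        for (int i = 0; i < 5; ++i) {
            chopsticks[i] = new Chopstick(i);
        }
        OrderedPhilosopher[] philosophers = new OrderedPhilosopher[5];
        for (int i = 0; i < 5; ++i) {
            philosophers[i] = new OrderedPhilosopher(chopsticks[i], chopsticks[(i + 1) % 5], 20);
            philosophers[i].start();
        }
        for (OrderedPhilosopher p : philosophers) {
            p.join(); // every join returns: no circular wait can form
        }
        System.out.println("All philosophers finished eating");
    }
}
```

The philosopher seated between chopsticks 4 and 0 now reaches for chopstick 0 first, so at least one philosopher always competes for the same first lock as a neighbor instead of closing the cycle. Lock ordering fixes this one program, but it has to be applied consistently by hand everywhere, and that is the kind of fragility the rest of this post argues against.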
There are patterns that, if carefully followed, will help you avoid the most common threading pitfalls. However, the best way to avoid threading and locking problems is to avoid shared mutable state, because almost all of the problems we encounter using Java’s threading primitives stem from it. Several alternative concurrency models are designed specifically to avoid these problems. For the rest of this post, I’ll examine a style of concurrency that combines patterns commonly used in event-driven applications with constructs from functional programming that eliminate shared mutable state. This model has recently been called “Reactive Programming” (see the Reactive Manifesto). I’ll begin by looking at the common traits of the reactive model, and then I’ll move to a particular implementation called RxJava.
THE REACTIVE SHIFT
Compared to the traditional approach of multiple threads that communicate through shared, synchronized state, a reactive application is composed of loosely coupled event handlers. The reactive approach emphasizes systems that are scalable, resilient, fault tolerant, and quick to react to stimuli. The name “reactive” comes from the four common traits of the reactive model:
1. React to events: Events, represented as messages, are sent and received by asynchronous, non-blocking components. These non-blocking components are used in every layer of the application, from the UI request and the web layer, to the middle-tier service components, down to the database.
2. React to changing load: Components should be loosely coupled, location independent, and should communicate via message-passing structures. This makes it much easier to scale up (adding more instances of a component on a multi-core node) and scale out (adding more nodes to a cluster).
3. React to failures: Resiliency must be built into the design from the beginning. Component failure is expected and managed through component isolation. Loosely coupled components, with strongly encapsulated state, are managed by observers/supervisors which prevent cascading failures.
4. React to users: Responsiveness is largely a result of getting the three other traits correct.
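Trait 2 says components should communicate by passing messages instead of sharing state. Here is a minimal sketch of that idea using only the JDK. It is not RxJava and not a design prescribed by the Reactive Manifesto. A producer sends immutable messages through a java.util.concurrent.BlockingQueue to a consumer whose running total is confined to its own thread, so no locks on shared data are needed. The OrderPlaced message type, the InventoryComponent name and the poison-pill shutdown are all hypothetical. BlockingQueue.take() blocks the consumer thread, so this shows message passing only, not non-blocking I/O.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MessagePassingSketch {

    // Hypothetical immutable message: once created, no thread can change it.
    static final class OrderPlaced {
        final String item;
        final int quantity;

        OrderPlaced(String item, int quantity) {
            this.item = item;
            this.quantity = quantity;
        }
    }

    // Sentinel message telling the consumer to stop (a "poison pill").
    static final OrderPlaced STOP = new OrderPlaced("", 0);

    // Consumer component: its state (the total) is confined to its own thread.
    static final class InventoryComponent implements Runnable {
        private final BlockingQueue<OrderPlaced> inbox;
        private int totalQuantity; // only ever touched by the consumer thread

        InventoryComponent(BlockingQueue<OrderPlaced> inbox) {
            this.inbox = inbox;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    OrderPlaced msg = inbox.take(); // waits for the next message
                    if (msg == STOP) {
                        break;
                    }
                    totalQuantity += msg.quantity;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        int getTotalQuantity() {
            return totalQuantity;
        }
    }

    // Sends the given quantities as messages and returns the consumer's total.
    static int runDemo(int... quantities) throws InterruptedException {
        BlockingQueue<OrderPlaced> inbox = new ArrayBlockingQueue<>(16);
        InventoryComponent inventory = new InventoryComponent(inbox);
        Thread consumer = new Thread(inventory);
        consumer.start();
        for (int q : quantities) {
            inbox.put(new OrderPlaced("widget", q)); // send a message, share nothing
        }
        inbox.put(STOP);
        consumer.join(); // join makes the consumer's final total visible here
        return inventory.getTotalQuantity();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(3, 4, 5)); // prints 12
    }
}
```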
To achieve reactive nirvana, RxJava is built around an “observable” model. This simple, powerful abstraction makes it easy to build non-blocking, loosely coupled, resilient components.
OBSERVABLE VS. ITERABLE
If you’re a Java developer, you’ve probably written code similar to this:
List<Movie> movies = service.getMovies(customerId); // block and wait
List<MovieInfo> bigMovies = new ArrayList<MovieInfo>();
for (Movie movie : movies) {
    if (movie.getBoxOfficeTotal() > 20000000.00) { // filter
        MovieInfo movieInfo = new MovieInfo(movie.getTitle());
        movieInfo.setDirector(service.getDirector(movie.getTitle())); // block and wait
        bigMovies.add(movieInfo);
    }
}
You call a service. Wait. Receive a collection from the service. Iterate over the collection. Filter the results. Call another service for each filtered item. Wait. Put the data you need into a new item. Add the new item to a new collection to return later.
I’ve written code like this many times, and it’s serviceable, but what happens if the collection is very large or the number of concurrent users increases dramatically? If (or when) this happens, the services become slow, and the whole system could grind to a halt. To avoid this, you could try to break the problem into asynchronous tasks that can run concurrently. Each service would then return a Future rather than a realized collection. This approach may work, but it requires changes to the client code to handle the asynchronous Future. These client changes are a textbook example of “leaky” encapsulation. This model also adds significant complexity: callbacks must be registered, service timeouts handled, and exception handling as a whole becomes more complicated. RxJava’s Observable type provides a solution to these issues. An Observable is similar to an Iterable, except that it uses a producer/consumer model in which the producer:
Pushes data to the consumer when it’s ready
Signals the consumer when it has reached the end of the data stream or when an exception has occurred
Keeps its implementation details encapsulated (i.e., the producer can change from synchronous to asynchronous or even lazy evaluation without impacting the subscriber)
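Here is a minimal plain-Java sketch of the push model that does not depend on RxJava. The Observer interface and PushSource class are hypothetical stand-ins modeled on the three producer duties listed above; they are not RxJava's API. With an Iterable, the consumer pulls each item. Here the producer decides when to call onNext, and it finishes with one onCompleted or onError signal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PushVsPull {

    // Hypothetical consumer contract, modeled on the producer duties above.
    interface Observer<T> {
        void onNext(T item);       // a value is ready
        void onError(Throwable e); // the stream failed; no more signals follow
        void onCompleted();        // end of stream; no more signals follow
    }

    // Minimal push-based producer: it decides when to call the observer.
    static final class PushSource<T> {
        private final Iterable<T> data;

        PushSource(Iterable<T> data) {
            this.data = data;
        }

        void subscribe(Observer<? super T> observer) {
            try {
                for (T item : data) {
                    observer.onNext(item); // push each item as soon as it is available
                }
            } catch (RuntimeException e) {
                observer.onError(e);       // report failure as a signal, not a thrown exception
                return;
            }
            observer.onCompleted();
        }
    }

    // Records every signal the source pushes, so the push order can be inspected.
    static List<String> collectSignals(Iterable<String> data) {
        List<String> received = new ArrayList<>();
        new PushSource<>(data).subscribe(new Observer<String>() {
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable e) { received.add("error: " + e.getMessage()); }
            @Override public void onCompleted() { received.add("done"); }
        });
        return received;
    }

    public static void main(String[] args) {
        List<String> stooges = Arrays.asList("Larry", "Moe", "Curly");
        // Pull: the consumer asks the Iterable for each next item.
        for (String s : stooges) {
            System.out.println("Pulled " + s);
        }
        // Push: the producer hands each item to the consumer, then signals completion.
        System.out.println(collectSignals(stooges)); // [Larry, Moe, Curly, done]
    }
}
```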
Here’s a simple example of an Observable created from a list. The consumer subscribes to the Observable and processes each item in the list with the Action1.call() method.
import java.util.Arrays;
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
List<String> stoogeList = Arrays.asList("Larry", "Moe", "Curly");
Observable<String> stooges = Observable.from(stoogeList);
stooges.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Hello " + s + "!");
    }
});
…
Hello Larry!
Hello Moe!
Hello Curly!
As you can see from the results, the Observable streams each item in the list to the subscriber one at a time. Because the subscriber never needs the whole collection up front, the same model works for bounded lists as well as for unbounded and infinite streams of data.
ASYNCHRONICITY (OR NOT)
One of the most interesting aspects of the Observable is how it simplifies the interaction between the producer and subscriber, even abstracting away the details of concurrency. Consider the service method below. On each subscription, it first checks its cache. If the value is cached, it emits the value immediately on the caller’s thread. Otherwise, it executes a remote service call on a thread from an executor pool to retrieve the value asynchronously. The concurrency implementation details remain encapsulated in the producer without leaking into the subscriber code. In other words, the subscriber and producer remain loosely coupled regardless of the implementation.
// Producer
ExecutorService executor = Executors.newFixedThreadPool(10);
public Observable<String> getMovieDirector(Movie movie) {
    Observable.OnSubscribe<String> subscription = observer -> {
        if (movieCache.contains(movie.getName())) {
            // Cache hit: emit synchronously on the subscribing thread
            observer.onNext(movieCache.get(movie.getName()));
            observer.onCompleted();
        } else {
            // Cache miss: fetch the value on a pool thread
            executor.execute(() -> {
                try {
                    String director = (String) getMovieInfo(movie.getName()).get("Director");
                    observer.onNext(director);
                    observer.onCompleted();
                } catch (Exception e) {
                    observer.onError(e);
                }
            });
        }
    };
    return Observable.create(subscription);
}
// Subscriber
public void getDirector(Movie movie) {
    Observable<String> directors = getMovieDirector(movie);
    directors.subscribe(new Subscriber<String>() {
        @Override
        public void onError(Throwable throwable) {
            System.out.println("got an error");
        }
        @Override
        public void onNext(String directorInfo) {
            System.out.println(directorInfo);
        }
        @Override
        public void onCompleted() {}
    });
}
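The same encapsulation can be shown without RxJava. In the hypothetical DirectorLookup class below, a cache hit delivers the value synchronously on the caller's thread, and a cache miss delivers it later from a pool thread. The callback code is identical in both cases. The Callback interface mirrors onNext/onError/onCompleted but is not RxJava's Subscriber. The remoteLookup function is an assumed stand-in for the remote service call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class DirectorLookup {

    // Hypothetical callback contract, mirroring onNext/onError/onCompleted.
    interface Callback<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final Function<String, String> remoteLookup; // stand-in for the remote service

    DirectorLookup(Function<String, String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    void put(String movieName, String director) {
        cache.put(movieName, director);
    }

    // The caller cannot tell whether the answer arrives synchronously or asynchronously.
    void getMovieDirector(String movieName, Callback<String> callback) {
        String cached = cache.get(movieName);
        if (cached != null) {
            callback.onNext(cached);   // cache hit: emit on the caller's thread
            callback.onCompleted();
            return;
        }
        executor.execute(() -> {       // cache miss: emit from a pool thread
            try {
                String director = remoteLookup.apply(movieName);
                cache.put(movieName, director);
                callback.onNext(director);
                callback.onCompleted();
            } catch (RuntimeException e) {
                callback.onError(e);
            }
        });
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        DirectorLookup lookup = new DirectorLookup(name -> "Director of " + name); // fake remote call
        lookup.put("Jaws", "Steven Spielberg");
        CountDownLatch done = new CountDownLatch(2);
        Callback<String> printer = new Callback<String>() {
            @Override public void onNext(String value) {
                System.out.println(value + " on " + Thread.currentThread().getName());
            }
            @Override public void onError(Throwable error) {
                System.out.println("got an error");
                done.countDown();
            }
            @Override public void onCompleted() {
                done.countDown();
            }
        };
        lookup.getMovieDirector("Jaws", printer);  // answered from the cache
        lookup.getMovieDirector("Alien", printer); // answered from a pool thread
        done.await();
        lookup.shutdown();
    }
}
```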
EASILY RESILIENT
The subscriber in the code above differs from the previous example: it adds onError and onCompleted handlers. By default, when an Observable encounters an error, it invokes its subscriber’s onError method and then stops emitting items. However, this default behavior is easy to change. In the code below, if getData() fails, onErrorReturn emits a default value in place of the error and then completes normally, so the subscriber receives “data unavailable” through onNext instead of an onError call.
Observable<String> b = getData().onErrorReturn(new Func1<Throwable, String>() {
    @Override
    public String call(Throwable throwable) {
        return "data unavailable";
    }
});
You can even recover by switching to an alternative Observable mid-stream: onErrorResumeNext unsubscribes from the failed Observable and subscribes to the one you supply.
Observable<String> b = getData().onErrorResumeNext(getDataAlternative());
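For comparison, the JDK's CompletableFuture offers similar recovery for a single asynchronous value, not a stream. exceptionally replaces a failure with a default, much like onErrorReturn. handle followed by thenCompose can switch to an alternative source, much like onErrorResumeNext. This is a sketch with hypothetical getData() and getDataAlternative() methods that borrow the names above. It is not how RxJava implements these operators.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class FallbackSketch {

    // Hypothetical primary source that always fails, standing in for getData().
    static CompletableFuture<String> getData() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("service down"));
        return f;
    }

    // Hypothetical alternative source, standing in for getDataAlternative().
    static CompletableFuture<String> getDataAlternative() {
        return CompletableFuture.completedFuture("data from backup");
    }

    // Similar in spirit to onErrorReturn: replace a failure with a default value.
    static CompletableFuture<String> withDefault() {
        return getData().exceptionally(throwable -> "data unavailable");
    }

    // Similar in spirit to onErrorResumeNext: on failure, switch to another async source.
    static CompletableFuture<String> withAlternative() {
        return getData()
                .handle((value, throwable) -> throwable == null
                        ? CompletableFuture.completedFuture(value)
                        : getDataAlternative())
                .thenCompose(Function.identity()); // flatten the nested future
    }

    public static void main(String[] args) {
        System.out.println(withDefault().join());     // data unavailable
        System.out.println(withAlternative().join()); // data from backup
    }
}
```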
Powerful resilience with so little effort; it’s laziness at its best.
COMPOSING AN OBSERVABLE STREAM
We’ve seen how the Observable:
Streams values to the subscriber
Abstracts away the details of concurrency for the subscriber
Simplifies error handling
The real beauty of RxJava and the Observable model can be seen in the composition of monadic structures. In functional programming, a monad is, according to Wikipedia, “a structure that represents computations defined as sequences of steps.” Essentially, monads allow us to construct pipelines to process streams of data. We’ve already seen an example in the onErrorResumeNext and onErrorReturn functions. To further explore this powerful concept, I’ll use a more complete example.
In the code below, the subscriber constructs a pipeline to process a stream of movies. When the consumer subscribes to the Observable of movies, the producer begins to stream movies through the pipeline. For each movie, the pipeline filters out movies that earned $25 million or less, calls a service to look up the director, and then emits a new Map that contains only the movie name and the director name. Finally, toList gathers these Maps into a single list. The subscriber does not block, and all concurrency implementation details are encapsulated in the producer/services. This example uses Java 8 lambdas because they make the code easier to read, but they’re not necessary.
public DeferredResult<String> getMovies() {
    DeferredResult<String> deferredResult = new DeferredResult<>();
    Observable<List<Map<String, String>>> movies = movieService.getMovies()
        .filter(movie -> movie.getBoxOffice() > 25000000)
        .flatMap(movie -> {
            Observable<Map<String, String>> m = Observable.just(movie.getName())
                .map(movieName -> getMap("movieName", movieName)); // get movie name
            Observable<Map<String, String>> d = movieService.getMovieDirector(movie)
                .map(director -> getMap("director", director)); // get director
            return Observable.zip(m, d, (movieInfo, directorInfo) -> { // return movie:director Map
                movieInfo.putAll(directorInfo);
                return movieInfo;
            });
        })
        .toList(); // combine movie/director Maps into one list
    movies.subscribe(movieInfoList -> {
            System.out.println(movieInfoList);
            deferredResult.setResult(gson.toJson(movieInfoList));
        },
        throwable -> {
            throwable.printStackTrace();
            deferredResult.setErrorResult(throwable); // complete the request on failure too
        });
    return deferredResult;
}
private Map<String, String> getMap(String key, String value) {
    Map<String, String> m = new HashMap<String, String>();
    m.put(key, value);
    return m;
}
Here’s the definition (from the RxJava documentation) of each of the higher-order functions used in the code example:
– Filter: Filters items emitted by an Observable.
– FlatMap: Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable and then merging those resulting Observables and emitting the results of this merger.
– Map: Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.
– Zip: Returns an Observable that emits the results of a function of your choosing applied to combinations of two items emitted, in sequence, by two other Observables.
– ToList: Returns an Observable that emits a single item, a list composed of all the items emitted by the source Observable.
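To show what these operators buy us, here is a rough JDK-only analog of the same pipeline. It uses java.util.stream and CompletableFuture instead of RxJava. The Movie class, the lookupDirector method and its fake director names are hypothetical. filter drops the small movies. Each remaining movie starts one asynchronous lookup (the flatMap role), thenCombine pairs the name with the director (the zip role), and allOf gathers every result into one list (the toList role). Unlike RxJava's flatMap, which can emit results in completion order, this sketch keeps the input order.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class MoviePipelineSketch {

    // Hypothetical movie type with only the fields the pipeline needs.
    static final class Movie {
        final String name;
        final double boxOffice;

        Movie(String name, double boxOffice) {
            this.name = name;
            this.boxOffice = boxOffice;
        }
    }

    // Hypothetical async director lookup, standing in for movieService.getMovieDirector.
    static CompletableFuture<String> lookupDirector(Movie movie, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> "Director of " + movie.name, pool);
    }

    static CompletableFuture<List<Map<String, String>>> bigMovies(List<Movie> movies, ExecutorService pool) {
        List<CompletableFuture<Map<String, String>>> perMovie = movies.stream()
                .filter(movie -> movie.boxOffice > 25_000_000)            // like filter
                .map(movie -> {
                    CompletableFuture<String> name = CompletableFuture.completedFuture(movie.name);
                    CompletableFuture<String> director = lookupDirector(movie, pool); // like flatMap
                    return name.thenCombine(director, (n, d) -> {         // like zip
                        Map<String, String> info = new HashMap<>();
                        info.put("movieName", n);
                        info.put("director", d);
                        return info;
                    });
                })
                .collect(Collectors.toList());
        // Like toList: a single result once every per-movie future has completed.
        return CompletableFuture.allOf(perMovie.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> perMovie.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Movie> movies = Arrays.asList(
                new Movie("Blockbuster", 90_000_000),
                new Movie("Indie", 1_000_000),
                new Movie("Sequel", 30_000_000));
        try {
            System.out.println(bigMovies(movies, pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```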
READY TO GO REACTIVE
RxJava’s Observable provides a very powerful model for creating reactive programs that can be used for many different use cases, from web services to real-time data stream aggregators. It simplifies publisher/subscriber interactions, makes it easy to write loosely coupled, resilient components, and provides us with powerful functional constructs for composing process pipelines.