Exercises
In these exercises, you’ll practice some of the concepts taught in this module.
First, either create a new Java project, adding the reactor-bom
and reactor-core
dependencies to your build file (Maven or Gradle) or use the stub you can find at: https://github.com/eh3rrera/project-reactor-course/tree/main/06/before/reactor-demo-exercises.
I’ll give you the instructions (and sometimes hints) so you can put all the code together in the main
method of a class and observe the output.
Here you can find the solution for the coding exercises: https://github.com/eh3rrera/project-reactor-course/tree/main/06/after/reactor-demo-exercises.
Exercise 1
In this exercise, you’ll create a (simulated) web scraping application that fetches the content of multiple URLs using more than one thread with a Scheduler.boundedElastic()
scheduler.
- Given the following code:
import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class Exercise01 { public static void main(String[] args) throws InterruptedException { Flux<String> urlFlux = Flux.just("url1", "url2", "url3", "url4"); // TODO: Change the threading context and apply the fetchAndCountWords method Flux<Integer> wordCountFlux = null; // TODO: Subscribe to the Flux and print the emitted elements Thread.sleep(4000); } // Simulates fetching the content of a URL and counting the number of words public static int fetchAndCountWords(String url) { int random = (int)(Math.random() * 500 + 100); System.out.println("Word count for " + url + ": " + random); return random; } }
- Use a
Schedulers.boundedElastic()
to change the threading context and apply thefetchAndCountWords
method. Assign the resultingFlux
towordCountFlux
. - Subscribe to the
Flux
, printing the emitted elements with the messageWord count: {wordCount}
. - Run the
Exercise01
class and analyze the output.
Exercise 2
In the following exercise, you’ll create a Flux
that emits a sequence of integers, simulating a fast publisher. Then, you will change the threading context to use a slow consumer that processes the emitted integers with a delay.
- Given the following code:
import reactor.core.publisher.Flux; public class Exercise02 { public static void main(String[] args) throws InterruptedException { Flux<Integer> fastPublisher = Flux.range(1, 5); // TODO: Change the threading context and apply the slowConsumer method Flux<Integer> processedFlux = null; // TODO: Subscribe to the Flux and print the emitted elements Thread.sleep(15000); } public static Integer slowConsumer(Integer value) { try { // Simulate a slow consumer by adding a 1-second delay Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return value; } }
- Use
Schedulers.single()
to change the threading context and apply theslowConsumer
method. Assign the resultingFlux
toprocessedFlux
. - Subscribe to the
Flux
, printing the emitted elements. - Run the
Exercise02
class and analyze the output.
Exercise 3
In this exercise, you will practice using the subscribeOn
method with a Flux
to handle a slow publisher, fast consumer scenario.
- Given the following code:
import reactor.core.publisher.Flux; public class Exercise03 { public static void main(String[] args) throws InterruptedException { // TODO: Call the method slowPublisher and // TODO: Use subscribeOn with a new bounding elastic scheduler Flux<Integer> processedFlux = null; // TODO: Subscribe to processedFlux passing the fastConsumer method Thread.sleep(11000); } private static Flux<Integer> slowPublisher() { return Flux.create(sink -> { for (int i = 1; i <= 10; i++) { try { // Simulate blocking IO with a 1-second delay Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } sink.next(i); } sink.complete(); }); } public static void fastConsumer(Integer value) { System.out.println("Received: " + value); } }
- Replace the
null
value assigned toprocessedFlux
with a call the methodslowPublisher
. - Next, use
subscribeOn
to change the threading context to a new bounding elastic scheduler with the following parameters:- threadCap:
2
- queuedTaskCap:
2
- name:
bounded-elastic
- ttlSeconds:
30
- daemon:
true
- threadCap:
- Subscribe to
processedFlux
, using thefastConsumer
method to print the emitted value. - Run the
Exercise03
class and analyze the output.
Exercise 4
In this exercise, you’ll create a publisher that emits a sequence of integers, changing the threading context for processing the items, and then changing the threading context for subscribing to the publisher.
- Given the following code:
import reactor.core.publisher.Flux; public class Exercise05 { public static void main(String[] args) throws InterruptedException { Flux<Integer> fluxRange = Flux.range(1, 10); // TODO: Use a parallel scheduler to change the threading context // TODO: Apply the processInteger method // TODO: Change the threading context for subscribing Flux<Integer> processedFlux = null; processedFlux.subscribe(value -> System.out.println("Received: " + value)); Thread.sleep(11000); } public static Integer processingFunction(Integer value) { try { // Simulate a task with a 500ms delay Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return value * 2; } }
- Using
fluxRange
, utilize a parallel scheduler to change the threading context. - Apply the
processInteger
method. - Then, change the threading context for subscribing using
Schedulers.single()
. - Run the
Exercise04
class and analyze the output.
Exercise 5
In this exercise, you’ll create a Flux
that emits a sequence of integers from 1
to 10
, using a ParallelFlux to divide the data into multiple rails and perform the work in parallel.
- Given the following code:
import reactor.core.publisher.Flux; public class Exercise05 { public static void main(String[] args) throws InterruptedException { Flux<Integer> fluxRange = Flux.range(1, 10); // TODO: Create a ParallelFlux with 4 rails // TODO: Use Schedulers.parallel() to run the work in parallel // TODO: Apply the processingFunction method ParallelFlux<Integer> parallelFlux = null; parallelFlux.subscribe(value -> System.out.println("Received: " + value)); Thread.sleep(10000); } public static Integer processingFunction(Integer value) { try { // Simulate a task with a 500ms delay Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return value * 2; } }
- Using
fluxRange
, create aParallelFlux
with4
rails. - Use
Schedulers.parallel()
to run the work in parallel. - Apply the
processingFunction
method. - Run the
Exercise05
class and analyze the output.