# Filtering Sequences

In addition to transformation, filtering is another important operation when working with sequences. In this lesson, we’re going to review the following operators:

• `filter`
• `filterWhen`
• `distinct`
• `distinctUntilChanged`
• `take`
• `skip`

## filter

If you want to filter the elements of a sequence based on a given condition, the `filter` operator should be your first choice. It works the same way as the `filter` method of the Java Stream API:

``````java
// For Mono
Mono<T> filter(Predicate<? super T> tester)

// For Flux
Flux<T> filter(Predicate<? super T> p)
``````

This operator returns a `Mono` or a `Flux` with the elements that match the given Predicate, a functional interface with the following method:

``````java
@FunctionalInterface
public interface Predicate<T> {
    boolean test(T t);
    // ...
}
``````

Take a look at the marble diagram of this operator for `Flux`:

Here’s an example:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5, 6);

Flux<Integer> filteredFlux =
    integerFlux.filter(i -> i % 2 == 0);

filteredFlux.subscribe(System.out::println);
``````

If you run it, it will print the even values of the original `Flux`:

``````text
2
4
6
``````

The odd values, the values that didn’t match the filter, are discarded.

If we change the predicate so none of the elements match it:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5, 6);

Flux<Integer> filteredFlux =
    integerFlux.filter(i -> i > 100);

filteredFlux.subscribe(System.out::println);
``````

Nothing will be printed.

In this case:

• `filteredFlux` completes without emitting values.
• Its type is still `Flux<Integer>`, not `Flux<Void>` or `Mono<Void>`.
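The same behavior applies to `Mono`. As a small sketch (the class and method names are only illustrative), `blockOptional()` makes it easy to see that a `Mono` whose value fails the predicate completes empty instead of emitting:

```java
import reactor.core.publisher.Mono;

import java.util.Optional;

public class MonoFilterExample {

    // Keeps the value only if it is greater than 100; otherwise the Mono completes empty
    static Optional<Integer> keepIfLarge(int value) {
        return Mono.just(value)
                .filter(i -> i > 100)
                .blockOptional(); // an empty Mono becomes Optional.empty()
    }

    public static void main(String[] args) {
        System.out.println(keepIfLarge(5));   // Optional.empty
        System.out.println(keepIfLarge(500)); // Optional[500]
    }
}
```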

Besides the elements that don’t match the predicate, this operator can also discard elements upon cancellation or when an error occurs.
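To observe which elements `filter` drops, you can register a discard hook with `doOnDiscard`. In this sketch (class and method names are illustrative), the hook is placed after `filter`, because `doOnDiscard` applies to the operators upstream of it:

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class FilterDiscardExample {

    // Collects the elements that filter discards because they fail the predicate
    static List<Integer> discardedByFilter() {
        List<Integer> discarded = new ArrayList<>();
        Flux.just(1, 2, 3, 4, 5, 6)
                .filter(i -> i % 2 == 0)
                // Placed downstream of filter so the hook reaches it
                .doOnDiscard(Integer.class, discarded::add)
                .blockLast(); // subscribe and wait for completion
        return discarded;
    }

    public static void main(String[] args) {
        System.out.println(discardedByFilter()); // [1, 3, 5]
    }
}
```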

## filterWhen

Sometimes, you may want to perform the predicate test asynchronously. For these cases, there is the `filterWhen` operator.

If you think of `filter` as a specialization of the `map` operator (both take a synchronous function and return a `Publisher`), you can think of `filterWhen` as a specialization of `flatMap`.

Here’s the definition of the `filterWhen` operator:

``````java
// For Mono
Mono<T> filterWhen(
    Function<? super T, ? extends Publisher<Boolean>> asyncPredicate
)

// For Flux
Flux<T> filterWhen(
    Function<? super T, ? extends Publisher<Boolean>> asyncPredicate
)
// bufferSize is the maximum expected number of values to hold
// pending a result of their respective asynchronous test,
// rounded to the next power of two.
Flux<T> filterWhen(
    Function<? super T, ? extends Publisher<Boolean>> asyncPredicate,
    int bufferSize
)
``````

As you can see, `filterWhen` filters elements based on a `Publisher<Boolean>` rather than a `Predicate`.

Here’s the example that filters even numbers adapted to `filterWhen`:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5, 6);

integerFlux
    .filterWhen(i -> Mono.just(i % 2 == 0))
    .subscribe(System.out::println);
``````

The result is the same as before:

``````text
2
4
6
``````

The function generates a `Publisher<Boolean>` to test each value asynchronously. The element from the `Flux` or `Mono` is relayed downstream if the first value emitted by its `Publisher<Boolean>` is `true`. If that value is `false`, or the `Publisher` completes empty, the element is dropped.
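In practice, the `Publisher<Boolean>` usually comes from an asynchronous call. The following sketch simulates one with a hypothetical `isActive` method that answers after a short delay; the user names and the in-memory set are made up for the example. Because the delayed test completes on another thread, the sketch collects the results and blocks instead of calling `subscribe`:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Set;

public class FilterWhenExample {

    // Hypothetical data standing in for a remote store of active users
    private static final Set<String> ACTIVE = Set.of("ana", "carlos");

    // Hypothetical asynchronous check, simulated with a 20 ms delay
    static Mono<Boolean> isActive(String user) {
        return Mono.just(ACTIVE.contains(user))
                .delayElement(Duration.ofMillis(20));
    }

    static List<String> activeUsers() {
        return Flux.just("ana", "bob", "carlos", "dana")
                .filterWhen(FilterWhenExample::isActive) // one Publisher<Boolean> per element
                .collectList()
                .block(); // wait for all asynchronous tests to finish
    }

    public static void main(String[] args) {
        System.out.println(activeUsers()); // [ana, carlos]
    }
}
```

The test publishers are generated and subscribed to in sequence, so the elements that pass keep their original order.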

For example, the following code will print nothing:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5, 6);

integerFlux
    .filterWhen(i -> Mono.just(false))
    //.filterWhen(i -> Mono.empty()) // Same as false
    .subscribe(System.out::println);
``````

Notice that the function can also return a `Flux`. However, only the first value of the test publisher is considered: if the function returns a `Flux` with more than one value, the test is cancelled after the first value is received.

Take the following code for example:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5, 6);

integerFlux
    .filterWhen(i -> Flux.just(i < 6, i % 2 == 0))
    .subscribe(System.out::println);
``````

This is the result:

``````text
1
2
3
4
5
``````

As you can see, only the first boolean expression of the `Flux` (`i < 6`) is used to filter the elements of `integerFlux`.

## distinct

As the name implies, this operator ignores duplicates. It only exists for `Flux`:

``````java
Flux<T> distinct()
``````

This operator tracks elements that have appeared before in the sequence to filter out duplicates.

Here’s the marble diagram of this operator:

If you run the following example:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 1, 4, 2);

integerFlux
    .distinct()
    .subscribe(System.out::println);
``````

This is the result:

``````text
1
2
3
4
``````

Since values `1` and `2` are duplicated, they are only included once.

To track elements, this operator adds them to a `HashSet` by default.

There are also other versions of `distinct`.

For example, the following version tracks, for each `Subscriber`, the elements of the `Flux` that have been seen and filters out duplicates, comparing them by a key extracted through the user-provided `Function`:

``````java
<V> Flux<T> distinct(
    Function<? super T, ? extends V> keySelector
)
``````

For each `Subscriber`, this other version tracks the elements of the `Flux` that have been seen and filters out duplicates, comparing them by a key extracted through the user-provided `Function` and by the `add` method of the supplied `Collection` (typically a `Set`):

``````java
<V, C extends Collection<? super V>> Flux<T> distinct(
    Function<? super T, ? extends V> keySelector,
    Supplier<C> distinctCollectionSupplier
)
``````

Finally, the following version filters out duplicates by applying a `BiPredicate` to an arbitrary user-supplied store of type `<C>` and to a key extracted through the user-provided `Function`. The `BiPredicate` should typically add the key to the store for further comparison. This version also takes a cleanup callback that is invoked on the store when the sequence terminates:

``````java
<V, C> Flux<T> distinct(
    Function<? super T, ? extends V> keySelector,
    Supplier<C> distinctStoreSupplier,
    BiPredicate<C, V> distinctPredicate,
    Consumer<C> cleanup
)
``````
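Here’s a sketch of the key-based versions (the data and class name are illustrative). The second method uses the four-argument version with a `HashSet` as the store: since `Set::add` returns `true` only when the key is new, it can serve directly as the `BiPredicate`, and `Set::clear` as the cleanup callback:

```java
import reactor.core.publisher.Flux;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DistinctExample {

    // distinct(keySelector): keeps the first word for each initial letter
    static List<String> firstWordPerLetter() {
        return Flux.just("apple", "avocado", "banana", "blueberry", "cherry")
                .distinct(word -> word.charAt(0))
                .collectList()
                .block();
    }

    // distinct(keySelector, storeSupplier, predicate, cleanup)
    static List<Integer> firstPerRemainder() {
        return Flux.just(1, 2, 3, 4, 5, 6)
                .<Integer, Set<Integer>>distinct(
                        i -> i % 3,   // key: remainder modulo 3
                        HashSet::new, // a new store for each Subscriber
                        Set::add,     // true (emit) only if the key was not seen yet
                        Set::clear)   // cleanup when the sequence terminates
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstWordPerLetter()); // [apple, banana, cherry]
        System.out.println(firstPerRemainder());  // [1, 2, 3]
    }
}
```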

## distinctUntilChanged

This is a variation of the previous operator. Instead of ignoring all duplicates, `distinctUntilChanged` ignores only consecutive repetitions of an element (that is, repetitions that arrive one right after another).

Here’s the definition of this operator:

``````java
Flux<T> distinctUntilChanged()
``````

And its marble diagram:

For example, considering the following `Flux`:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 1, 1, 2, 3, 3, 4, 2);

integerFlux
    .distinctUntilChanged()
    .subscribe(System.out::println);
``````

This will be the result:

``````text
1
2
3
4
2
``````

As you can see, `1` and `3` are repeated three and two times in a row, respectively, but each is printed only once. However, notice that `2` is printed twice because its two occurrences aren’t consecutive.

And just like `distinct`, this operator has versions that let you provide a `Function` to extract the key used to detect repetitions and, optionally, a `BiPredicate` to decide whether two keys are equal:

``````java
<V> Flux<T> distinctUntilChanged(
    Function<? super T, ? extends V> keySelector
)

<V> Flux<T> distinctUntilChanged(
    Function<? super T, ? extends V> keySelector,
    BiPredicate<? super V, ? super V> keyComparator
)
``````
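As a sketch of both versions (the readings, words, and class name are made up), the first method treats consecutive readings that differ by less than 5 as repetitions, since the `keyComparator` returns `true` when two keys should be considered equal. The second keeps one word per run of words sharing an initial letter. The explicit `<Integer>` type witness just makes the key type obvious to the compiler:

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class DistinctUntilChangedExample {

    // keySelector + keyComparator: small consecutive changes count as repetitions
    static List<Integer> significantChanges() {
        return Flux.just(10, 12, 14, 20, 21, 30)
                .<Integer>distinctUntilChanged(
                        reading -> reading,                              // the key is the value itself
                        (last, current) -> Math.abs(last - current) < 5) // true = treat as a repetition
                .collectList()
                .block();
    }

    // keySelector only: compares the initial letters of consecutive words
    static List<String> firstOfEachRun() {
        return Flux.just("ant", "ape", "bee", "bat", "ant")
                .distinctUntilChanged(word -> word.charAt(0))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(significantChanges()); // [10, 20, 30]
        System.out.println(firstOfEachRun());     // [ant, bee, ant]
    }
}
```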

## take

Now it’s the turn of the `take` operator and its variations. In general, these operators let you take elements from a sequence until a certain condition is fulfilled.

For `Mono`, only three versions of `take` make sense and they have to do with time.

The first one gives the `Mono` a chance to resolve within a specified time frame but completes (without error) if it doesn’t:

``````java
Mono<T> take(Duration duration)
``````

The second one does the same, but the time frame is evaluated using the provided `Scheduler`:

``````java
Mono<T> take(Duration duration, Scheduler timer)
``````

And the third one gives the `Mono` a chance to resolve before the given `Publisher` emits a value. After that, the `Mono` completes without error:

``````java
Mono<T> takeUntilOther(Publisher<?> other)
``````

`Flux` has more variations of `take`.

For instance, these versions relay values from the `Flux` until the specified `Duration` elapses (optionally, the time frame is evaluated using the provided `Scheduler`):

``````java
Flux<T> take(Duration timespan)
Flux<T> take(Duration timespan, Scheduler timer)
``````

This version takes only the first `n` values from the `Flux`, if available. If `n` is zero, the source is subscribed to but immediately cancelled, and then the operator completes:

``````java
Flux<T> take(long n)
``````

Another version of this method takes an additional `limitRequest` parameter:

``````java
Flux<T> take(
    long n,
    boolean limitRequest
)
``````

Here are the rules for this parameter:

• If `limitRequest` is `true`, the total amount requested upstream is capped at `n`. In this case, if `n` is zero, the source isn’t even subscribed to, and the operator completes immediately upon subscription.
• If `limitRequest` is `false`, this operator doesn’t propagate the backpressure requested amount. Instead, it makes an unbounded request and cancels once `n` elements have been emitted. In this case, if `n` is zero, the source is subscribed to but immediately cancelled, and then the operator completes.
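One way to see the effect of `limitRequest` is to record the request amounts that reach the source with `doOnRequest`. In this sketch (class and method names are illustrative), `blockLast()` makes an unbounded request downstream, so any capping you observe comes from `take`:

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class TakeLimitRequestExample {

    // Records every request amount that take sends to the source
    static List<Long> upstreamRequests(boolean limitRequest) {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 100)
                .doOnRequest(n -> requests.add(n)) // sits between the source and take
                .take(3, limitRequest)
                .blockLast();                      // requests an unbounded amount from take
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests(true));  // [3]: capped at n
        System.out.println(upstreamRequests(false)); // expected: an unbounded request (Long.MAX_VALUE)
    }
}
```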

The `takeUntilOther` method relays values from the `Flux` until the given `Publisher` emits a value:

``````java
Flux<T> takeUntilOther(
    Publisher<?> other
)
``````

The `takeLast` method emits the last `N` values the `Flux` emitted before its completion:

``````java
Flux<T> takeLast(int n)
``````

The `takeUntil` method relays values until the given `Predicate` matches (this includes the matching data):

``````java
Flux<T> takeUntil(
    Predicate<? super T> predicate
)
``````

And the `takeWhile` method relays values while a `Predicate` returns `true` (checked before each value is delivered and only including the matching data):

``````java
Flux<T> takeWhile(
    Predicate<? super T> continuePredicate
)
``````

Let me show you some examples.

The simplest version is the one that takes the number of elements you want from the `Flux`:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .take(2)
    .subscribe(System.out::println);
``````

Here we are requesting the first two elements from the `Flux`, so the result shouldn’t be a surprise:

``````text
1
2
``````

But we can also specify a time frame to take elements until the time elapses:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .delayElements(Duration.ofMillis(1))
    .take(Duration.ofMillis(10))
    .subscribe(System.out::println);

try {
    // Keep the main thread alive while elements are emitted on another thread
    Thread.sleep(100);
} catch (InterruptedException e) {
    e.printStackTrace();
}
``````

In the above example, the `Flux` publishes each element with a delay of one millisecond (with `delayElements(Duration.ofMillis(1))`), and we take elements for ten milliseconds (with `take(Duration.ofMillis(10))`). Because `delayElements` emits the elements on another thread, `subscribe` returns immediately, so we have to give the program time to execute (remember, we’re working asynchronously). That’s why we make the main thread sleep for a little while.

The result may vary between executions. Sometimes, ten milliseconds will be enough to take all the elements from the `Flux`, while other times, the program will only take two or three elements during that period.
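Instead of sleeping for a guessed amount of time, you can also block until the sequence completes. This sketch (class and method names are illustrative) collects the values of a `Flux.interval` that ticks every 100 milliseconds while `take` keeps a 350-millisecond window open. The exact result depends on timing, although it’s usually `[0, 1, 2]`:

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;

public class TakeDurationExample {

    // Collects the ticks emitted during a 350 ms window
    static List<Long> ticksWithinWindow() {
        return Flux.interval(Duration.ofMillis(100)) // emits 0, 1, 2, ... every 100 ms
                .take(Duration.ofMillis(350))        // completes the Flux when the window closes
                .collectList()
                .block();                            // waits for that completion
    }

    public static void main(String[] args) {
        System.out.println(ticksWithinWindow()); // usually [0, 1, 2]; timing-dependent
    }
}
```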

One thing to take into account is that the time frame starts when we subscribe to the `Publisher`. If you take a look at the marble diagram of this operator:

You’ll see that the time of subscription and the time the operator starts executing are different.

If the duration is zero, the resulting `Flux` completes as soon as the source `Flux` signals its first value, which is not relayed downstream.

Next, here’s an example for `takeUntilOther`:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .delayElements(Duration.ofMillis(10))
    .takeUntilOther(
        Mono.just(10)
            .delayElement(Duration.ofMillis(5))
    )
    .subscribe(System.out::println);

try {
    // Keep the main thread alive while the delayed signals are processed
    Thread.sleep(100);
} catch (InterruptedException e) {
    e.printStackTrace();
}
``````

In this case, nothing is printed. The elements are emitted every ten milliseconds, but the `Mono` passed to `takeUntilOther` emits a value after five milliseconds. At that point, the source `Flux` is cancelled before it has emitted anything, so the resulting `Flux` completes without printing a value.
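For comparison, here’s a sketch where the other `Publisher` fires after some elements have already been relayed. A `Mono.delay` of 350 milliseconds acts as the stop signal for a `Flux.interval` ticking every 100 milliseconds (class and method names are illustrative, and the exact result depends on timing):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

public class TakeUntilOtherExample {

    // Relays ticks until the "other" Mono emits, roughly 350 ms after subscription
    static List<Long> ticksUntilSignal() {
        return Flux.interval(Duration.ofMillis(100))
                .takeUntilOther(Mono.delay(Duration.ofMillis(350)))
                .collectList()
                .block(); // waits until the stop signal completes the Flux
    }

    public static void main(String[] args) {
        System.out.println(ticksUntilSignal()); // usually [0, 1, 2]; timing-dependent
    }
}
```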

Now, the difference between `takeUntil` and `takeWhile` may be confusing.

On the one hand, `takeUntil` propagates elements until the given predicate returns `true`, including the element that matched.

On the other hand, `takeWhile` propagates elements while the given predicate returns `true`.

Consider this example for `takeUntil`:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .takeUntil(i -> i == 3)
    .subscribe(System.out::println);
``````

Here’s the result:

``````text
1
2
3
``````

`1` is not equal to `3` so this value is emitted.

`2` is not equal to `3` so this value is emitted.

`3` is equal to `3`, so this value is emitted and, at this point, the `Flux` is cancelled and no more elements are emitted.

And for the `takeWhile` operator, consider the following example:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .takeWhile(i -> i < 4)
    .subscribe(System.out::println);
``````

Here’s the result:

``````text
1
2
3
``````

`1` is less than `4` so this value is emitted.

`2` is less than `4` so this value is emitted.

`3` is less than `4` so this value is emitted.

`4` is not less than `4`, so this value is not emitted. Only matching values are emitted because the check is performed before emitting the value. At this point, the `Flux` is cancelled and no more elements are emitted.
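To put the three variants side by side, this sketch (class and method names are illustrative) applies `takeUntil`, `takeWhile`, and `takeLast` to the same cold `Flux`. Note how the element that flips the condition is included by `takeUntil` but excluded by `takeWhile`:

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class TakeVariantsExample {

    // A cold Flux: each method below subscribes to it again from the start
    static final Flux<Integer> NUMBERS = Flux.just(1, 2, 3, 4, 5);

    // takeUntil: stops after the first matching element, which is included
    static List<Integer> untilAtLeastThree() {
        return NUMBERS.takeUntil(i -> i >= 3).collectList().block();
    }

    // takeWhile: stops at the first failing element, which is excluded
    static List<Integer> whileBelowThree() {
        return NUMBERS.takeWhile(i -> i < 3).collectList().block();
    }

    // takeLast: waits for completion and emits only the last n elements
    static List<Integer> lastTwo() {
        return NUMBERS.takeLast(2).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(untilAtLeastThree()); // [1, 2, 3]
        System.out.println(whileBelowThree());   // [1, 2]
        System.out.println(lastTwo());           // [4, 5]
    }
}
```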

## skip

You can think of this operator as the opposite of `take`. It allows you to skip elements from a sequence until a certain condition is fulfilled. The skipped elements are discarded, whereas the remaining elements are emitted.

This operator is only available for `Flux` (it wouldn’t make any sense to skip the only element a `Mono` emits). Like `take`, it has versions that take a `Duration`, another `Publisher`, the number of elements to skip, and a `Predicate`.

For example, use this version to skip elements from the `Flux` emitted within the specified initial duration:

``````java
Flux<T> skip(Duration timespan)
``````

Use this version to skip elements from the `Flux` emitted within the specified initial duration, as measured on the provided `Scheduler`:

``````java
Flux<T> skip(Duration timespan, Scheduler timer)
``````

Use this version to skip the specified number of elements from the beginning of the `Flux` and then emit the remaining elements:

``````java
Flux<T> skip(long skipped)
``````

Use `skipUntilOther` to skip values from the `Flux` until a specified `Publisher` emits a value:

``````java
Flux<T> skipUntilOther(Publisher<?> other)
``````

Use `skipLast` to skip a specified number of elements at the end of the `Flux` sequence:

``````java
Flux<T> skipLast(int n)
``````

Use `skipUntil` to skip values from the `Flux` until a `Predicate` returns `true` for the value (the resulting `Flux` will include and emit the matching value):

``````java
Flux<T> skipUntil(Predicate<? super T> untilPredicate)
``````

And use `skipWhile` to skip values from the `Flux` while a `Predicate` returns `true` for the value:

``````java
Flux<T> skipWhile(Predicate<? super T> skipPredicate)
``````

For instance, this is the marble diagram for the version that takes the number of elements to skip:

Let’s review some code examples.

Starting with the version that takes the number of elements to skip:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .skip(2)
    .subscribe(System.out::println);
``````

In this case, the above example will skip the first two elements of the sequence:

``````text
3
4
5
``````
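Combining `skip` with `take` gives a simple way to read one "page" of a sequence. The `page` helper below is hypothetical (it isn’t part of Reactor), and pages are numbered from zero in this sketch:

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class PagingExample {

    // Hypothetical helper: returns one 0-based page of a sequence using skip + take
    static Flux<Integer> page(Flux<Integer> source, int pageNumber, int pageSize) {
        return source
                .skip((long) pageNumber * pageSize) // drop the elements of earlier pages
                .take(pageSize);                    // keep at most pageSize elements
    }

    // Pages of size 3 over the numbers 1 to 10
    static List<Integer> pageOf(int pageNumber) {
        return page(Flux.range(1, 10), pageNumber, 3).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(pageOf(1)); // [4, 5, 6]
        System.out.println(pageOf(3)); // [10] (the last, partial page)
    }
}
```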

For the version that takes a `Duration` object:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .delayElements(Duration.ofMillis(1))
    .skip(Duration.ofMillis(5))
    .subscribe(System.out::println);

try {
    // Keep the main thread alive while elements are emitted on another thread
    Thread.sleep(100);
} catch (InterruptedException e) {
    e.printStackTrace();
}
``````

The results may vary between executions, but the above example skips the elements emitted during the first five milliseconds, so most of the time it prints the last two or three elements:

``````text
3
4
5
``````

Just like in some of the `take` examples, we have to make the program sleep for a little while to give it time to complete its execution.

Now, consider the following example:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .delayElements(Duration.ofMillis(10))
    .skipUntilOther(
        Mono.just(10).delayElement(Duration.ofMillis(5))
    )
    .subscribe(System.out::println);

try {
    // Keep the main thread alive while the delayed signals are processed
    Thread.sleep(100);
} catch (InterruptedException e) {
    e.printStackTrace();
}
``````

With `skipUntilOther`, the above example skips elements until the `Mono` emits, which happens after five milliseconds. However, the elements are emitted every ten milliseconds, so none of them are skipped. This is the result:

``````text
1
2
3
4
5
``````
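For contrast, here’s a sketch where the other `Publisher` fires in the middle of the sequence, so the earlier elements are skipped. Five ticks arrive roughly every 100 milliseconds and a `Mono.delay` of 250 milliseconds opens the gate (class and method names are illustrative, and the exact result depends on timing):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

public class SkipUntilOtherExample {

    // Skips the ticks emitted before the "other" Mono fires (~250 ms)
    static List<Long> ticksAfterSignal() {
        return Flux.interval(Duration.ofMillis(100))
                .take(5)                                            // ticks 0..4 at ~100..500 ms
                .skipUntilOther(Mono.delay(Duration.ofMillis(250)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(ticksAfterSignal()); // usually [2, 3, 4]; timing-dependent
    }
}
```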

Now, the difference between `skipUntil` and `skipWhile` is similar to the difference between `takeUntil` and `takeWhile`.

`skipUntil` skips values until a `Predicate` returns `true`, including and emitting the matching value.

Here’s an example:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .skipUntil(i -> i == 3)
    .subscribe(System.out::println);
``````

This is the result:

``````text
3
4
5
``````

`1` is not equal to `3` so this value is skipped.

`2` is not equal to `3` so this value is skipped.

`3` is equal to `3`, so this and all the subsequent values are emitted.

Then we have `skipWhile`, which skips values while a `Predicate` returns `true`.

Here’s an example:

``````java
Flux<Integer> integerFlux =
    Flux.just(1, 2, 3, 4, 5);

integerFlux
    .skipWhile(i -> i < 4)
    .subscribe(System.out::println);
``````

This is the result:

``````text
4
5
``````

`1` is less than `4` so this value is skipped.

`2` is less than `4` so this value is skipped.

`3` is less than `4` so this value is skipped.

`4` is not less than `4`, so this and all subsequent values are emitted.
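Unlike their `take` counterparts, `skipUntil` and `skipWhile` both emit the element that flips the condition, so a predicate and its negation give the same result. This sketch (class and method names are illustrative) shows that on the same cold `Flux`, along with `skipLast`:

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class SkipVariantsExample {

    // A cold Flux: each method below subscribes to it again from the start
    static final Flux<Integer> NUMBERS = Flux.just(1, 2, 3, 4, 5);

    // skipUntil: skips until the predicate matches; the matching element is emitted
    static List<Integer> untilAtLeastThree() {
        return NUMBERS.skipUntil(i -> i >= 3).collectList().block();
    }

    // skipWhile with the negated predicate: same result as above
    static List<Integer> whileBelowThree() {
        return NUMBERS.skipWhile(i -> i < 3).collectList().block();
    }

    // skipLast: drops the last n elements once the Flux completes
    static List<Integer> allButLastTwo() {
        return NUMBERS.skipLast(2).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(untilAtLeastThree()); // [3, 4, 5]
        System.out.println(whileBelowThree());   // [3, 4, 5]
        System.out.println(allButLastTwo());     // [1, 2, 3]
    }
}
```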