Выбрать главу

   onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)

[main] INFO reactor.Flux.SubscribeOn.1 - request(32)

[main] INFO reactor.Flux.SubscribeOn.2 -

   onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)

[main] INFO reactor.Flux.SubscribeOn.2 - request(32)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)

[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)

[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)

[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()

[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()

Как ясно видно из записей журнала, фрукты в первом буфере (apple, orange и banana) обрабатываются в потоке parallel-1. Между тем, фрукты во втором буфере (kiwi и strawberry) обрабатываются в parallel-2. Как видно из того факта, что записи журнала из каждого буфера сплетены вместе, два буфера обрабатываются параллельно.

Если по какой-то причине вам нужно собрать все, что Flux генерирует в List, вы можете вызвать buffer() без аргументов:

Flux<List<String>> bufferedFlux = fruitFlux.buffer();

В результате создается новый Flux, который генерирует List, содержащий все элементы, опубликованные исходным Flux. Вы можете добиться того же самого с помощью операции collectList(), показанной на диаграмме на рисунке 10.19.

Рисунок 10.19 В результате операции сбора списка получается Mono, содержащий список всех сообщений, отправляемых входящим Flux.

Вместо того чтобы создавать Flux, который публикует List, collectList() создает Mono, который публикует List. Следующий метод тестирования показывает, как это можно использовать:

@Test

public void collectList() {

   Flux<String> fruitFlux = Flux.just(

      "apple", "orange", "banana", "kiwi", "strawberry");

   Mono<List<String>> fruitListMono = fruitFlux.collectList();

   StepVerifier

      .create(fruitListMono)

      .expectNext(Arrays.asList(

      "apple", "orange", "banana", "kiwi", "strawberry"))

      .verifyComplete();

}

Еще более интересный способ сбора элементов, возвращаемых Flux, - это собирать их в Map. Как показано на рисунке 10.20, операция collectMap() приводит к Mono, который публикует Map, заполненную записями, ключ которых рассчитывается данной функцией.

Рис. 10.20. Операция collectMap приводит к получению Mono, содержащему Map сообщений, передаваемым входящим Flux, где ключ выводится из некоторой характеристики входящих сообщений.

Чтобы увидеть collectMap() в действии, взгляните на следующий метод тестирования:

@Test

public void collectMap() {

   Flux<String> animalFlux = Flux.just(

      "aardvark", "elephant", "koala", "eagle", "kangaroo");

   Mono<Map<Character, String>> animalMapMono =

      animalFlux.collectMap(a -> a.charAt(0));

       StepVerifier

        .create(animalMapMono)

        .expectNextMatches(map -> {

         return

            map.size() == 3 &&

            map.get('a').equals("aardvark") &&

            map.get('e').equals("eagle") &&

            map.get('k').equals("kangaroo");

      })

   .verifyComplete();

}

Источник Flux испускает имена нескольких животных. К этому Flux вы применяете collectMap() для создания нового Mono, который создает Map, где значение ключа определяется первой буквой имени животного, а значение-само имя животного. В случае, если два названия животных начинаются с одной и той же буквы (например, elephant и eagle или koala и kangaroo), последняя запись, проходящая через поток, переопределяет все предыдущие записи.

10.3.4 Выполнение логических операций над реактивными типами

Иногда вам просто нужно знать, соответствуют ли записи, опубликованные Mono или Flux, некоторым критериям. Операции all() и any() выполняют такую логику. Рисунки 10.21 и 10.22 иллюстрируют, как работают all() и any().

Рисунок 10.21 Поток может быть проверен, чтобы убедиться, что все сообщения удовлетворяют некоторому условию в операции all.

Рисунок 10.22 поток может быть проверен, что по крайней мере одно сообщение удовлетворяет некоторому условию any операции.

Предположим, вы хотите знать, что каждая строка, публикуемая Flux, содержит букву a или букву k. Следующий тест показывает, как использовать all() для проверки этого условия:

@Test

public void all() {

   Flux<String> animalFlux = Flux.just(

      "aardvark", "elephant", "koala", "eagle", "kangaroo");

   Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));

   StepVerifier.create(hasAMono)

      .expectNext(true)

      .verifyComplete();

   Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));

   StepVerifier.create(hasKMono)

      .expectNext(false)

      .verifyComplete();

}

В первом StepVerifier, проверяется наличие буквы a. Операция all применяется к исходному Flux, в результате чего получается Mono типа Boolean. В этом случае все названия животных содержат букву а, поэтому Mono будет содержать true. Но на втором этапе проверки результирующий Mono будет выдавать false, потому что не все имена животных содержат k.

Вместо того, чтобы выполнять проверку "все или ничего", Возможно, будет достаточно, если хотя бы одна запись соответствует условиям. В этом случае операция any() - это то, что вы хотите. Этот новый тестовый случай использует any() для проверки букв t и z: