Выбрать главу

Рисунок 10.14. Входящий Flux может быть отфильтрован так, что полученный Flux получает только сообщения, которые соответствуют заданному предикату.

Чтобы увидеть filter() в действии, рассмотрите следующий метод тестирования:

@Test

public void filter() {

   Flux<String> nationalParkFlux = Flux.just(

         "Yellowstone", "Yosemite", "Grand Canyon",

         "Zion", "Grand Teton")

      .filter(np -> !np.contains(" "));

   StepVerifier.create(nationalParkFlux)

      .expectNext("Yellowstone", "Yosemite", "Zion")

      .verifyComplete();

}

Здесь filter() задается предикатом в виде лямбды, которая принимает только String значения без пробелов. Следовательно, «Grand Canyon» и «Grand Teton» отфильтровываются из итогового Flux.

Возможно, вам нужна фильтрация для всех предметов, которые вы уже получили. Операция Different(), как показано на рисунке 10.15, приводит к тому, что Flux публикует только элементы из исходного потока, которые еще не были опубликованы.

Рисунок 10.15. Операция distinct отфильтровывает любые повторяющиеся сообщения.

В следующем тесте только уникальные String значения будут излучаться из Flux:

@Test

public void distinct() {

   Flux<String> animalFlux = Flux.just(

         "dog", "cat", "bird", "dog", "bird", "anteater")

      .distinct();

   StepVerifier.create(animalFlux)

      .expectNext("dog", "cat", "bird", "anteater")

      .verifyComplete();

}

Хотя «dog» и «bird» публикуются дважды из исходного потока, отдельный поток публикует их только один раз.

МАППИНГ РЕАКТИВНЫХ ДАННЫХ

Одной из наиболее распространенных операций, которые вы будете использовать в Flux или Моно, является преобразование опубликованных элементов в какую-либо другую форму или тип. Типы Reactor предлагают операции map() и flatMap() для этой цели.

Операция map() создает Flux, который просто выполняет преобразование, как предписано данной функцией, для каждого объекта, который он получает до его повторной публикации. На рисунке 10.16 показано, как работает операция map().

Рисунок 10.16. Операция map выполняет преобразование входящих сообщений в новые сообщения в результирующем потоке.

В следующем методе тестирования Flux String значения, представляющие баскетболистов, сопоставляются с новы Flux объектами Player:

@Test

public void map() {

   Flux<Player> playerFlux = Flux

      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")

      .map(n -> {

         String[] split = n.split("\\s");

         return new Player(split[0], split[1]);

      });

   StepVerifier.create(playerFlux)

      .expectNext(new Player("Michael", "Jordan"))

      .expectNext(new Player("Scottie", "Pippen"))

      .expectNext(new Player("Steve", "Kerr"))

      .verifyComplete();

}

Функция, заданная для map() (как лямбда), разбивает входящую String по пробелу и использует полученный массив String-ов для создания объекта Player. Хотя поток, созданный с помощью just(), переносил объекты String, Flux, полученный из map(), переносит объекты Player.

Что важно понимать в map(), так это то, что сопоставление выполняется синхронно, так как каждый элемент публикуется исходным Flux. Если вы хотите выполнить сопоставление асинхронно, вы должны рассмотреть операцию flatMap().

Операция flatMap() требует некоторой мысли и практики, чтобы овладеть всеми навыками. Как показано на рисунке 10.17, вместо простого сопоставления одного объекта другому, как в случае map(), flatMap() сопоставляет каждый объект новому Mono или Flux. Результаты Mono или Flux сведены в новый результирующий Flux. Когда используется вместе с subscribeOn(), flatMap() может раскрыть асинхронную мощь типов Reactor.

Рис. 10.17. Операция плоской карты (flat map) использует промежуточный Flux для выполнения преобразования, следовательно, допускает асинхронные преобразования.

Следующий метод тестирования демонстрирует использование flatMap() и subscribeOn():

@Test

public void flatMap() {

   Flux<Player> playerFlux = Flux

      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")

      .flatMap(n -> Mono.just(n)

      .map(p -> {

         String[] split = p.split("\\s");

         return new Player(split[0], split[1]);

      })

      .subscribeOn(Schedulers.parallel())

   );

   List<Player> playerList = Arrays.asList(

      new Player("Michael", "Jordan"),

      new Player("Scottie", "Pippen"),

      new Player("Steve", "Kerr"));

   StepVerifier.create(playerFlux)

      .expectNextMatches(p -> playerList.contains(p))

      .expectNextMatches(p -> playerList.contains(p))

      .expectNextMatches(p -> playerList.contains(p))

      .verifyComplete();

}

Обратите внимание, что flatMap() получает лямбда-функцию, которая преобразует входящую String в Mono типа String. Затем к Mono применяется операция map() для преобразования String в Player.

Если вы остановитесь прямо здесь, результирующий поток будет передавать объекты Player, созданные синхронно в том же порядке, что и в примере с map(). Но операции с Mono завершаются вызовом subscribeOn(), чтобы указать, что каждая подписка должна проходить в параллельном потоке. Следовательно, операции сопоставления для нескольких входящих объектов String могут выполняться асинхронно и параллельно.