@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
// ... проверить шаги
}
Или, если у вас есть Java Stream, который вы хотели бы использовать в качестве источника для Flux, fromStream() - это метод, который вы будете использовать:
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
// ... проверить шаги
}
Опять же, тот же StepVerifier, что и раньше, можно использовать для проверки данных, опубликованных Flux.
ГЕНЕРИРОВАНИЕ FLUX ДАННЫХ
Иногда у вас нет данных для работы, и вам просто нужно, чтобы Flux действовал как счетчик, отдавая число, которое увеличивается с каждым новым значением. Для создания счетчика Flux вы можете использовать статический метод range(). Диаграмма на рисунке 10.4 иллюстрирует, как работает range().
Рисунок 10.4 Создание Flux из диапазона приводит к публикации сообщений в противоположном стиле.
Следующий метод тестирования демонстрирует, как создать Flux диапазон:
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux =
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}
В этом примере диапазон Flux создается с начальным значением 1 и конечным значением 5. StepVerifier доказывает, что он опубликует пять элементов, которые являются целыми числами от 1 до 5.
Еще один метод создания Flux, похожий на range(), представляет собой interval(). Как и метод range(), interval() создает поток, который возвращает увеличивающееся значение. Но то, что делает interval() особенным, заключается в том, что вместо того, чтобы указывать начальное и конечное значение, вы указываете длительность или как часто значение должно возвращаться. На рисунке 10.5 показана мраморная диаграмма для метода создания interval().
Рисунок 10.5 Flux, созданный из интервала, имеет периодическую запись, опубликованную в нем. (A Flux created from an interval has a periodic entry published to it.)
Например, чтобы создать поток интервалов, который выдает значение каждую секунду, можно использовать статический метод interval() следующим образом:
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}
Обратите внимание,что значение, возвращаемое потоком интервалов, начинается с 0 и увеличивается на каждом последующем элементе. Кроме того, поскольку interval() не имеет максимального значения, он потенциально будет работать вечно. Поэтому вы также используете операцию take(), чтобы ограничить результаты первыми пятью записями. Подробнее об операции take() мы поговорим в следующем разделе.
10.3.2 Комбинирование реактивных типов
Однажды может возникнуть задача когда Вам придется работать с двумя реактивными типами, которые вам нужно как-то объединить. Или, в других случаях, вам может потребоваться разделить Flux на несколько реактивных типов. В этом разделе мы рассмотрим операции, которые объединяют и разделяют Flux и Mono в Reactor.
СЛИЯНИЕ РЕАКТИВНЫХ ТИПОВ
Предположим, у вас есть два потока потока и нужно создать один результирующий поток, который будет производить данные, как только он станет доступен из любого из вышерасположенных Flux streams. Чтобы объединить один поток с другим, можно использовать операцию mergeWith(), как показано на marble диаграмме на рис.10.6.
Рисунок 10.6 слияние двух Flux потоков чередя их сообщения в новом Flux.
Например, предположим, что у вас есть Flux, значения которого являются именами телевизионных и киношных персонажей, и у вас есть второй Flux, значениями которого являются названия продуктов, которые эти персонажи любят есть. Следующий тестовый метод показывает, как можно объединить два объекта Flux с помощью метода mergeWith():
@Test
public void mergeFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();