Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}
В этом случае, поскольку медленный Flux не будет публиковать никаких значений до 100 мс после начала публикации быстрого Flux, вновь созданный Flux будет просто игнорировать медленный Flux и публиковать только значения из быстрого Flux.
10.3.3 Преобразование и фильтрация реактивных потоков
Когда данные проходят через поток, вам, вероятно, потребуется отфильтровать некоторые значения и изменить другие значения. В этом разделе мы рассмотрим операции, которые преобразуют и фильтруют данные, проходящие через реактивный поток.
ФИЛЬТРАЦИЯ ДАННЫХ ИЗ РЕАКТИВНЫХ ТИПОВ
Один из самых основных способов фильтрации данных при их поступлении из Flux - просто игнорировать первые записи. Операция skip(), показанная на рисунке 10.10, делает именно это.
Рисунок 10.10. Операция skip пропускает указанное количество сообщений перед передачей оставшихся сообщений в результирующий Flux.
Учитывая Flux с несколькими записями, операция skip() создаст новый Flux, который пропускает заданное количество элементов, прежде чем отдавать остальные элементы из исходного Flux. Следующий метод тестирования показывает, как использовать skip():
@Test
public void skipAFew() {
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
StepVerifier.create(skipFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
В этом случае у вас есть Flux из пяти String элементов. Вызов метода skip(3) для этого потока создает новый Flux, который пропускает первые три элемента и публикует только последние два элемента.
Но, возможно, вы не хотите пропускать определенное количество элементов, а вместо этого нужно пропустить некоторое количество элементов, определяемое не количеством, а временем. Альтернативная форма операции skip(), показанная на рисунке 10.11, создает Flux, который ожидает, пока не пройдет некоторое заданное время, прежде чем отдавать элементы из исходного Flux.
Рисунок 10.11. Альтернативная форма операции skip ждет, пока не пройдет некоторое время, прежде чем передавать сообщения в результирующий поток.
Следующий метод тестирования использует функцию skip() для создания Flux, который ожидает четыре секунды, прежде чем начинает отдавать какие-либо значения. Поскольку этот Flux был создан из Flux, который имеет односекундную задержку между элементами (используя delayElements()), будут переданы только последние два элемента:
@Test
public void skipAFewSeconds() {
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(skipFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
Вы уже видели пример метода take(), но в познакомившись с методом skip(), take() можно рассматривать как противоположность skip (). В то время как функция skip() пропускает первые несколько элементов, функция take() выдает только первые несколько элементов (как показано на диаграмме marble на рис. 10.12):
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.take(3);
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
Рис. 10.12 операция take передает только первые несколько сообщений из входящего Flux, а затем отменяет подписку.
Как и skip(), take() также имеет альтернативную форму, основанную на длительности, а не на количестве элементов. Он будет принимать и отдавать столько элементов, сколько проходит через исходный поток, пока не пройдет некоторый период времени, после чего поток завершится. Это показано на рисунке 10.13.
Рис. 10.13. Альтернативная форма операции take передает сообщения в результирующий поток до тех пор, пока не пройдет некоторое время.
В следующем методе тестирования используется альтернативная форма take() для отправки максимально возможного количества элементов в первые 3,5 секунды после подписки:
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
Операции skip() и take() можно рассматривать как операции фильтрации, где критерии фильтра основаны на количестве или длительности. Для более общей фильтрации значений Flux вы найдете операцию filter() весьма полезной.
При наличии предиката, который решает, будет ли элемент проходить через поток или нет, операция filter() позволяет выборочно публиковать на основе любых критериев, которые вы хотите. Мраморная диаграмма на рисунке 10.14 показывает, как работает filter().