Выбрать главу

Однако Java Streams обычно являются синхронными и работают с конечным набором данных. По сути, они являются средством перебора коллекции с помощью функций.

Reactive Streams поддерживают асинхронную обработку наборов данных любого размера, включая бесконечные наборы данных. Они обрабатывают данные в режиме реального времени, когда они становятся доступными, с противодавлением (backpressure), чтобы не перегружать своих потребителей.

Спецификация реактивных потоков может быть описана четырьмя определениями интерфейсами: Publisher, Subscriber, Subscription, и Processor. Publisher создает данные, которые он отправляет Subscriber на Subscription. Интерфейс Publisher объявляет единый метод subscribe() с помощью которого Subscriber может подписаться на Publisher:

public interface Publisher<T> {

    void subscribe(Subscriber<? super T> subscriber);

}

После того как Subscriber подписался, он может получать события от Publisher. Эти события отправляются через методы интерфейса Subscriber:

public interface Subscriber<T> {

    void onSubscribe(Subscription sub);

    void onNext(T item);

    void onError(Throwable ex);

    void onComplete();

}

Первое событие, которое получит Subscriber, - это вызов функции onSubscribe(). Когда Publisher вызывает функцию onSubscribe(), он передает Subscription объект Subscriber. Именно через Subscription Subscriber может управлять своей подпиской:

public interface Subscription {

    void request(long n);

    void cancel();

}

Subscriber может вызвать функцию request(), чтобы запросить отправку данных, или функцию cancel(), чтобы указать, что он больше не заинтересован в получении данных и отменяет подписку. При вызове функции request() Subscriber передает long значение, чтобы указать, сколько элементов данных он готов принять. Именно здесь возникает обратное давление (backpressure), препятствующее Publisher отправлять больше данных, чем может обработать Subscriber. После того, как Publisher отправил столько элементов, сколько было запрошено, Subscriber может снова вызвать функцию request(), чтобы запросить больше.

После того, как Subscriber запросил данные, данные начинают поступать через поток. Для каждого элемента, опубликованного Publisher, будет вызван метод onNext() для доставки данных Subscriber. Если есть какие-либо ошибки, вызывается onError(). Если у Publisher нет больше данных для отправки и он не будет генерировать больше данных, он вызовет onComplete(), чтобы сообщить подписчику, что он завершил процесс.

Что касается интерфейса Processor, это комбинация Subscriber и Publisher, как показано здесь:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

Как Subscriber, Processor будет получать данные и обрабатывать их каким-либо образом. Затем он будет “переоденется” и выступит в качестве Publisher, чтобы публиковать результаты для своих Subscribers.

Как вы можете видеть, спецификация Reactive Streams довольно проста. Довольно легко понять, как можно построить конвейер обработки данных, который начинается с Publisher, прогоняет данные через ноль или более Processors, а затем передает конечные результаты в Subscriber.

Однако интерфейсы Reactive Streams не могут использоваться для создания такого потока функциональным способом. Project Reactor - это реализация спецификации Reactive Streams, которая предоставляет функциональный API для создания Reactive Streams. Как вы увидите в следующих главах, Reactor является основой модели реактивного программирования в Spring 5. В оставшейся части этой главы мы собираемся исследовать (и, осмелюсь сказать, очень весело провести время) Project Reactor.

10.2 Начало работы с Reactor

Реактивное программирование требует от нас думать совсем иначе, чем императивное программирование. Вместо того, чтобы описывать набор шагов, которые необходимо предпринять, реактивное программирование означает построение конвейера, по которому будут проходить данные. Когда данные проходят через конвейер, они могут быть изменены или использованы каким-либо образом.

Например, предположим, что вы хотите взять имя человека, изменить все его буквы на заглавные, использовать его для создания приветственного сообщения, а затем, наконец, напечатать его. В модели императивного программирования код будет выглядеть примерно так:

String name = "Craig";

String capitalName = name.toUpperCase();

String greeting = "Hello, " + capitalName + "!";

System.out.println(greeting);

В императивной модели каждая строка кода выполняет шаг, один за другим, и определенно в одном и том же потоке. Каждый шаг блокирует выполнение потока до следующего шага до его завершения.

Функциональный, реактивный код может достичь того же этого же:

Mono.just("Craig")

   .map(n -> n.toUpperCase())

   .map(cn -> "Hello, " + cn + "!")

   .subscribe(System.out::println);

Не волнуйтесь про, возможное, недопонимание этого примера; мы скоро поговорим об операциях just(), map() и subscribe(). На данный момент важно понимать, что хотя реактивный пример все еще следует пошаговой модели, на самом деле это конвейер, через который проходят данные. На каждом этапе конвейера данные каким-то образом изменяются, но нельзя точно сказать, какой поток выполняет какие операции. Все операции могут выполняться в одном потоке ... или не в одном.