Java 响应式编程是一种基于异步数据流处理的编程范式,它强调数据流的声明式构建和传播变化的自动响应。Java 9 引入的Flow API
为响应式编程提供了标准接口,而 Reactor 和 RxJava 等第三方库则提供了更丰富的操作符和工具。
核心概念
- Publisher(发布者):产生数据流的源头。
- Subscriber(订阅者):消费数据流的接收者。
- Subscription(订阅):连接发布者和订阅者的桥梁,管理背压(Backpressure)。
- Processor(处理者):兼具发布者和订阅者的功能,用于转换数据流。
简单示例:使用 Java Flow API
下面是一个使用 Java 标准库Flow API
的简单响应式编程示例:
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;public class ReactiveExample {public static void main(String[] args) throws InterruptedException {// 创建发布者try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {// 创建订阅者Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅成功");this.subscription = subscription;subscription.request(1); // 请求1个数据}@Overridepublic void onNext(String item) {System.out.println("接收到数据: " + item);subscription.request(1); // 处理完后再请求1个}@Overridepublic void onError(Throwable throwable) {System.out.println("发生错误: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("数据流处理完成");}};// 订阅publisher.subscribe(subscriber);// 发布数据publisher.submit("Hello");publisher.submit("Reactive");publisher.submit("World");// 等待所有数据处理完成Thread.sleep(1000);}}
}
常用操作符(以 Reactor 库为例)
Reactor 是 Spring 生态中推荐的响应式编程库,提供了Mono
(0-1 个元素)和Flux
(0-N 个元素)两种核心类型:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorExample {public static void main(String[] args) {// 创建FluxFlux<String> flux = Flux.just("A", "B", "C").map(String::toLowerCase) // 转换操作.filter(s -> s.startsWith("a")); // 过滤操作// 创建MonoMono<String> mono = Mono.just("Hello").flatMap(s -> Mono.just(s + " World")); // 异步转换// 订阅并消费flux.subscribe(System.out::println, // 正常数据处理Throwable::printStackTrace, // 错误处理() -> System.out.println("Flux完成") // 完成回调);mono.subscribe(System.out::println);}
}
背压(Backpressure)处理
响应式编程的重要特性是支持背压,即消费者可以控制生产者发送数据的速率:
Flux.range(1, 1000) // 生成1到1000的整数.onBackpressureBuffer(100) // 缓冲100个元素.subscribe(num -> {// 模拟慢速处理try { Thread.sleep(100); } catch (InterruptedException e) {}System.out.println(num);},Throwable::printStackTrace,() -> System.out.println("处理完成"));
响应式 Web 示例(Spring WebFlux)
Spring WebFlux 是基于 Reactor 的响应式 Web 框架:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@SpringBootApplication
public class WebFluxExample {public static void main(String[] args) {SpringApplication.run(WebFluxExample.class, args);}
}@RestController
class HelloController {@GetMapping("/hello")public Mono<String> hello() {return Mono.just("Hello, Reactive Web!");}
}
总结
Java 响应式编程通过异步数据流提供了高效处理大量并发请求的能力,适合构建非阻塞、低延迟的应用程序。主要应用场景包括微服务、实时数据处理和高并发系统。