Reactive响应式(反应式)编程是一种新的编程风格,其特点是异步或并发、事件驱动、推送PUSH机制以及观察者模式的衍生。reactive应用(响应式应用)允许开发人员构建事件驱动(event-driven),可扩展性,弹性的反应系统∶提供高度敏感的实时的用户体验感觉,可伸缩性和弹性的应用程序栈的支持,随时可以部署在多核和云计算架构。
模拟一个例子
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo {
public static void main(String[] args) throws InterruptedException {
//1.定义发布者,数据类型是Integer
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//2.定义消费者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
//订阅关系管理
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer integer) {
//获取到数据后,开始处理
System.out.println("我接收到的数据是:" + integer);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("数据处理完毕");
}
};
// 发布者和订阅者建立联系
publisher.subscribe(subscriber);
//创建数据
//TODO -- 数据库,redis,缓存 省略掉数据获取的步骤
int data = 110;
publisher.submit(data);
publisher.close();
Thread.currentThread().join(1000);
}
}
执行结果
我接收到的数据是:110
数据处理完毕
package java.util.concurrent;
public final class Flow {
static final int DEFAULT_BUFFER_SIZE = 256;
private Flow() {
}
public static int defaultBufferSize() {
return 256;
}
public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}
public interface Subscription {
void request(long var1);
void cancel();
}
public interface Subscriber<T> {
void onSubscribe(Flow.Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Flow.Subscriber<? super T> var1);
}
}
可以看到上边也定义了一个Professor接口,所以按理来说我们是可以使用一个类来实现这个接口,然后处理相关的逻辑的,以下就用这种方式来实现类似上边的一个例子。
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("打印当前的数据:" + integer);
this.submit(String.valueOf(integer + 100));
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("调用完成");
}
}
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class MyFlowDemo {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
MyProcessor myProcessor = new MyProcessor();
publisher.subscribe(myProcessor);
//定义消费者 --
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String s) {
System.out.println("接收到数据:" + s);
throw new RuntimeException();
}
@Override
public void onError(Throwable throwable) {
System.out.println("人为制造异常,执行了onerror!!!");
throwable.printStackTrace();
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("发送方执行完毕");
}
};
myProcessor.subscribe(subscriber);
publisher.submit(111);
publisher.close();
Thread.currentThread().join(1000);
}
}
如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~