menu 冷 の Codeworld
search self_improvement
目录

Spring5 Webflux 响应式编程

冷环渊
冷环渊 2022年04月03日  ·  阅读 57

Spring5 Webflux

前言

★ 这里是小冷的博客
✓ 优质技术好文见专栏
个人公众号,分享一些技术上的文章,以及遇到的坑
当前系列:Spring5 Webflux 系列
源代码 git 仓库 代码Git 仓库地址

Lambda

这个表达式 其实就是一个新的语法糖,这里Java8主要是对语法做了简化,让我们java的代码更加的简洁

Lambda可以总在哪里呢?

函数式接口

只实现了一个方法的接口,我们就叫函数式接口,这个时候可能会有java的警报

@FunctionalInterface有这个注解,java就会知道哦 你这个是函数式接口,就不会有警报了

简单的Lambda实战

我们就拿多线程中的 Runnable接口来做例子

    @Test
    public void test() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("你好我是传统线程");
            }
        }).start();
        new Thread(() -> {
            System.out.println("你好我是Lambda的第一个线程");
        }).start();
    }

我们可以根据上边语句的变化来看出 语法的简洁

() = 代表的是我们的参数列表,Lambda表达式的参数和我们调用方法参数必须一致
    
-> 尖头标识符 代表我们要使用Lambda
    
{} 方法体,这里是我们使用表达式的具体操作,也可以用方法引用的方式,用其他包装好点类的方法来做处理    

编写一个自己的函数式接口,并且练习

@FunctionalInterface
public interface MyinterFace {
    void method();
}

class test {

    void dosth(MyinterFace myinterFace) {
        System.out.print("Function A ");
        myinterFace.method();
    }

    public static void main(String[] args) {
        //这里我们使用自己的函数式接口 输出语句
        test test = new test();
        test.dosth(() -> {
            System.out.print(" do sth");
        });
    }
}

可以看到我们用自己的函数式接口作为参数 调用函数方法的 dosth,这个时候我们可以用Lambda表达式来实现我们这个接口里的步骤,这里我们以输出 do sth 为操作。

image-20211213185628713

问题处理

这里时候我们有两个方法,一个使用了 myinterfaceA 一个使用率 myinterFace B 这个时候我们 Lambda表达式没办法去识别,需要我们显示的声明用谁的

image-20211213185920018

@FunctionalInterface
interface MyinterFaceA {
    void method();
}

@FunctionalInterface
interface MyinterFaceB {
    void method();
}

class test {

    void dosth(MyinterFaceA myinterFace) {
        System.out.print("Function A ");
        myinterFace.method();
    }

    void dosth(MyinterFaceB myinterFace) {
        System.out.print("Function A ");
        myinterFace.method();
    }

    public static void main(String[] args) {
        //这里我们使用自己的函数式接口 输出语句
        test test = new test();
        test.dosth((MyinterFaceA) () -> {
            System.out.print(" do sth");
        });
    }
}

常用的java函数

提供者接口 : Supplier 没有输入只有输出

消费者接口 : Consumer 没有出只有输入

函数接口 : Function 放入一个对象返回一个新对象

  • UnaryOperator 对于 放入和输出类型一致时候的函数借口
  • BiFunction接口: 输入两个对象,返回一个新对象

Coding

/**
 * @projectName: Webflux_demo
 * @package: Lambda
 * @className: JdkFunctionDmo
 * @author: 冷环渊 doomwatcher
 * @description: TODO
 * @date: 2021/12/13 19:04
 * @version: 1.0
 */
public class JdkFunctionDmo {
    public static void main(String[] args) {
        // Supplier 没有输入 只有输出
        Supplier<String> supplier = () -> "我是一个  Supplier 方法";
        System.out.println(supplier.get());

        //Consummer 只有输入 没有输出
        Consumer<String> con = i -> System.out.println("我是一个 Conusmer Demo" + i);
        con.accept(" hello Consumer i am 冷环渊");

        //function 放入一个对象生成一个新的对象
        Function<Integer, Integer> func = i -> i * i;
        Integer apply = func.apply(9);
        System.out.println("Function demo out:" + apply);

        //对于 放入和输出类型一致的哦我们 Function接口里有一个实现 UnaryOperator
        UnaryOperator<Integer> unaryOperator = i -> i * i - i;
        System.out.println("Function demo out:" + unaryOperator.apply(9));

        //输入两个对象 返回一个新的对象 BiFunction
        BiFunction<Integer, Integer, String> biFunction = (i, e) -> i * e + "元";
        System.out.println("我一共该交给你多少钱:" + biFunction.apply(40, 80));
    }
}


到这里我们 Lambda表达式的快速认识就结束了,接下来是Java8的另一个特性,流式编程

Stream

我们通过演示的代码来带入 Stream api 的变成 以及我们做一个小练习

coding

/**
 * @projectName: Webflux_demo
 * @package: Stream
 * @className: StreamAPITest
 * @author: 冷环渊 doomwatcher
 * @description: TODO
 * @date: 2021/12/13 20:09
 * @version: 1.0
 */
public class StreamAPITest {
    public static void main(String[] args) {
        String[] strarr = {"bo_le", "", "webfulx", "redis", "spring", "mirc_Sercice"};
        // 数组 arr 创建 Stream
        //Arrays.stream(strarr).forEach(System.out::println);

        //2.list
        //Arrays.asList(strarr).stream().forEach(System.out::println);

        //3.stream.of()
        //Stream.of(strarr).forEach(System.out::println);

        //    4.迭代器 打印 1-10 元素
        //Stream.iterate(1, i -> i + 1).limit(10).forEach(System.out::println);

        //    5. generate 打印随机数 10以内的随机数
        //Stream.generate(() -> new Random().nextInt(10)).limit(10).forEach(System.out::println);

        /*
         *  现实中的流 变成 完整案例
         *  元素的中间操作,元素的终止操作
         *  结果依次 输出 abceo
         *
         * 结果 一次输出 belo
         * bo_le --> bole ->字符转换成一个新的流(b o l e)-> sorted->(belo);
         *
         *  PS: 注意事项 在流编程中 终止操作只能有一个,中间操作可以有 0-n个
         * */
        String[] arr = {"react", "", "spring", "bo_le", "bo_le"};
        Stream.of(arr)
                .filter(i -> !i.isEmpty())
                .distinct()
                .sorted()
                .limit(1)
                .map(i -> i.replace("_", ""))
                .flatMap(i -> Stream.of(i.split("")))
                .sorted()
                .forEach(System.out::println);

    }
}

Reactor Project

官网地址 : https://spring.io/reactive

简介

​ Reacive 异步非阻塞响应式框架 特点: 低延迟,高吞吐,以下简介均来自spring官方文档。

​ 反应式系统具有一些特性,使其成为低延迟、高吞吐量工作负载的理想选择。Project Reactor 和 Spring 产品组合协同工作,使开发人员能够构建具有响应性、弹性、弹性和消息驱动的企业级反应式系统。

响应式系统和传统的同步阻塞调用模型

  • 传统的模型 ,client 不管有多少信息都会一次性发给server,这个时候如果Server性能够,可以能会造成大量的客户端请求无法响应,之后就会拒绝请求和请求失败
  • 而响应式的模型有一个东西叫做 背压,需要数据,可以通过背压去控制数量,这样就不会让大量的数据冲垮我们的服务器

image-20211213215257820

什么是响应式?

响应式处理是一种范例,它使开发人员能够构建可以处理背压(流控制)的非阻塞、异步应用程序。

为什么需要响应式

反应式系统更好地利用现代处理器。此外,在反应式编程中包含背压可确保解耦组件之间具有更好的弹性。

有关响应式系统特质的论文

论文地址:https://www.reactivemanifesto.org/zh-CN

Reactor 核心库

Project Reactor 是一个完全无阻塞的基础,包括背压支持。它是 Spring 生态系统中响应式堆栈的基础,并在 Spring WebFlux、Spring Data 和 Spring Cloud Gateway 等项目中具有特色。

与springBoot整合

Spring 产品组合提供了两个并行堆栈。一种是基于带有 Spring MVC 和 Spring Data 构造的 Servlet API。另一个是利用 Spring WebFlux 和 Spring Data 的反应式存储库的完全反应式堆栈。在这两种情况下,Spring Security 都为您提供了对这两个堆栈的本机支持。

image-20211213233054209

可以看到,响应式的技术栈,和我们熟悉的MVC那一套不一样,这里我们的技术基本是换了一套,还没有很好的第三方框架的兼容性

响应式技术组建的关系

我们之后的demo Coding也会跟着从里到外的API 来学习

  1. ReativeStream

image-20211213234122714

我们来看一下,响应式的流程

image-20211213235759201

订阅者来决定可以接受多少数据,生产者根据背压的规则来传递,这样就不会出现像传统架构一样的问题

下图:就是我们的响应流的运行模型

image-20211214001652032

ReactiveStream(JDK9)编程

coding

ReactiveStream helloworld

  • 我们需要 发布者,订阅者,两者绑定,发送消息,关闭流
/**
 * @projectName: Webflux_demo
 * @package: reactiveStream
 * @className: ReactiveStreamDemo
 * @author: 冷环渊 doomwatcher
 * @description: TODO
 * @date: 2021/12/14 0:24
 * @version: 1.0
 */
public class ReactiveStreamDemo {
    public static void main(String[] args) {
        // 1.创建一个 发布者
        SubmissionPublisher publisher = new SubmissionPublisher();
        // 2.创建一个订阅者
        Flow.Subscriber subscriber = new Flow.Subscriber() {
            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("创建订阅关系 ");
                subscription.request(1); //第一次需要发送一个 之后的都不需要了
            }

            @Override
            public void onNext(Object item) {
                System.out.println("接收数据:" + item);
                //接收数据 业务处理
                subscription.request(10);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("发生错误了");
            }

            @Override
            public void onComplete() {
                System.out.println("数据发送完成了");
            }
        };
        // 3 建立订阅者
        publisher.subscribe(subscriber);
        for (int i = 0; i < 100; i++) {
            // 4 发送数据
            publisher.submit("第" + i + "条hello reactive stream");
        }
        publisher.close();
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

这里我们需要编写一个 Processor 来当做中间处理数据的

我们的发布者先发给Processor之后由Processor发给订阅者,

/**
 * @projectName: Webflux_demo
 * @package: reactiveStream
 * @className: ReactiveProcessor
 * @author: 冷环渊 doomwatcher
 * @description: TODO
 * @date: 2021/12/14 0:25
 * @version: 1.0
 */
public class ReactiveProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Processor建立订阅关系");
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("Processor接收数据:" + item);
        //中间处理
        //数据发给最终订阅者
        this.submit(item.toUpperCase());
        //背压的实现核心
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("出现错误了");
    }

    @Override
    public void onComplete() {
        System.out.println("数据传输成功");
    }
}

编写有中间处理器 Processor的demo

/**
 * @projectName: Webflux_demo
 * @package: reactiveStream
 * @className: ReactiveStreamDemo2
 * @author: 冷环渊 doomwatcher
 * @description: TODO
 * @date: 2021/12/14 0:25
 * @version: 1.0
 */
public class ReactiveStreamDemo2 {
    public static void main(String[] args) {
        // 1.创建一个 发布者
        SubmissionPublisher publisher = new SubmissionPublisher();
        // 2.创建一个 Processor
        ReactiveProcessor processor = new ReactiveProcessor();
        // 3 发布者将消息给processor来做处理之后转发到最终订阅者
        publisher.subscribe(processor);
        // 4.创建一个最终订阅者
        Flow.Subscriber subscriber = new Flow.Subscriber() {
            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("创建订阅关系 ");
                subscription.request(1); //第一次需要发送一个 之后的都不需要了
            }

            @Override
            public void onNext(Object item) {
                System.out.println("接收数据:" + item);
                //接收数据 业务处理
                subscription.request(10);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("发生错误了");
            }

            @Override
            public void onComplete() {
                System.out.println("数据发送完成了");
            }
        };
        processor.subscribe(subscriber);
        for (int i = 0; i < 100; i++) {
            System.out.println("发布数据" + i);
            // 4 发送数据
            publisher.submit("第" + i + "条hello reactive stream");
        }
        publisher.close();
        try {
            Thread.currentThread().join(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

到这里我们基于ReactiveStream的小练习demo就到这里了

Reactor Project(spring)

Flux And Mono 他们都是 Publisher

Flux 0-N 项的异步序列 代表0-多个

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QcWlgvn2-1648923751095)(https://gitee.com/cold-abyss_admin/my-image-host/raw/master/%20img%20/flux.svg)]

AFlux<T>是一个标准Publisher<T>,表示 0 到 N 个发出的项目的异步序列,可选地由完成信号或错误终止。如无流规范,这三种类型的信号转换为呼叫到下游用户的onNextonCompleteonError方法。

具有这种大范围的可能信号,Flux是通用的反应型。请注意,所有事件,即使是终止事件,都是可选的:没有onNext事件但 onComplete事件代表一个空的有限序列,但是删除onComplete并且您有一个无限的空序列(不是特别有用,除了围绕取消的测试)。同样,无限序列也不一定是空的。

Mono, 异步 0-1 结果 要么有一个 要么没有

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h5VSKfJy-1648923751096)(https://gitee.com/cold-abyss_admin/my-image-host/raw/master/%20img%20/mono.svg)]

AMono<T>是一种特殊的Publisher<T>,它通过onNext信号最多发出一个项目, 然后以一个onComplete信号(成功Mono,有或没有值)终止,或者只发出一个onError信号(失败Mono)。

可以使用 aMono来表示只有完成概念的无值异步进程(类似于 a Runnable)一个空的 Mono<Void>.

Reactor Coding

Coding之前 我们先把Reactor 需要的Mavern依赖 导入到maven 环境里

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.4.6</version>
        </dependency>

Mono

    /**
     * @author 冷环渊 Doomwatcher
     * @context: 这里是 Mono 创建 0-1个元素序列的测试方法
     * @date: 2021/12/14 15:01
     * @param
     * @return: void
     */
    @Test
    public void MonoTset() {
        // 1. Mono 的创建方式
        /*
         *创建 空的 Mono 对象 输出 “”
         *     public final Disposable subscribe(Consumer<? super T> consumer) {
         *   Objects.requireNonNull(consumer, "consumer");
         *  return this.subscribe(consumer, (Consumer)null, (Runnable)null);
         * }
         * 从源码看出 我们的 subsrcibe参数是 Consumer,也就是说只进 不出
         * */
        Mono.empty().subscribe(System.out::println);
        /*
         *创建一个 Mono 输出内容就是我们just()参数的内容
         * public static <T> Mono<T> just(T data) {
         *     return onAssembly(new MonoJust(data));
         * }
         * */
        Mono.just("我的今天就结束 webflux 的学习了 hello Mono").subscribe(System.out::println);
    }

Flux

   /**
     * @author 冷环渊 Doomwatcher
     * @context: 这里是 flux 创建多个 0-n个元素序列 测试方法
     * @date: 2021/12/14 15:01
     * @param
     * @return: void
     */
    @Test
    public void FluxTset() {
        // 创建一个Flux 
        Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
        System.out.println();
        //创建多个 集合的形式
        Flux.fromIterable(Arrays.asList("a1", "b1", "c1", "d1")).subscribe(System.out::print);
        System.out.println();
        //创建多个 数组的形式
        Flux.fromArray(new String[]{"a1", "b1", "c1", "d1", "e1"}).subscribe(System.out::print);
        System.out.println();
        //基于流创建
        Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).subscribe(System.out::print);
        //通过饭未创建
        System.out.println();
        Flux.range(1, 100).subscribe(System.out::println);

        /*
         * 小案例
         * Flux.generate这里我们以两个参数为例子
         * 2的乘法口诀
         * 2*0 = 0
         * 2*1=1
         * 2*2 = 4
         * */
        Flux.generate(() -> 0, (i, sink) -> {
            sink.next("2*" + i + "=" + 2 * i);
            if (i == 9) {
                sink.complete();
            }
            return i + 1;
        }).subscribe(System.out::println);

    }

响应式编程需求实战

需求 我们这个需求的案例

给一定随机英文字符串,要求以26个字母的顺序输出排列

  • ​ 不能用循循环
  • 不要以暴力的方式

解题思路

image-20211214163738445

这里我们写了两种 解题目的方法,一个是基于StreamAPI 一个是基于ReactorAPI

  • 思路是这个样子的,创建出一个去掉空格获得的字符数组,之后去重排序即可
    /**
     * @author 冷环渊 Doomwatcher
     * @context: 响应式变成小练习
     * 给一定随机英文字符串,要求以26个字母的顺序输出排列
     * 小冷没看视频 用Stream流api 编写的
     * @date: 2021/12/14 16:44
     * @param
     * @return: void
     */
    @Test
    public void StreamDemoTest() {
        String[] arr = new String[]{"hello", "guys", "i", "prizev", "abc"};
        List<String> list = Arrays.asList(arr);
        list.stream()
                .filter(i -> !i.isEmpty())
                .flatMap(i -> Stream.of(i.split("")))
                .distinct()
                .sorted()
                .forEach(System.out::print);
    }

    /**
     * @author 冷环渊 Doomwatcher
     * @context: 这个是根据视频 用 reactor flux api 编写的
     * @date: 2021/12/14 16:54
     * @param
     * @return: void
     */
    @Test
    public void VedioReactorTest() {
        String str = "hello guys i am bole welcome to normal school jdk quick fox prizev ";
        Flux.fromArray(str.split(" "))
                .flatMap(i -> Flux.fromArray(i.split("")))
                .distinct()
                .sort()
                .subscribe(System.out::print);
    }

WebFlux 响应式框架

Spring WebFlux

Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。响应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版本中添加的。它是完全非阻塞的,支持 Reactive Streams背压,并在 Netty、Undertow 和 Servlet 3.1+ 容器等服务器上运行。

这两个 Web 框架都反映了它们的源模块(spring-webmvcspring-webflux)的名称,并在 Spring 框架中并排共存。每个模块都是可选的。应用程序可以使用一个或另一个模块,或者在某些情况下,两者都使用——例如,带有响应式WebClient.

为什么我们需要Webflux

1.我们需要少量的线程来支持更多的处理。Servlet 3.1 确实为非阻塞 I/O 提供了 API。然而,使用它会远离 Servlet API 的其余部分,其中契约是同步 ( Filter, Servlet) 或阻塞 ( getParameter, getPart)。这就是将新的通用 API 用作任何非阻塞运行时的基础的动机。这很重要,因为服务器(例如 Netty)在异步、非阻塞空间中建立良好。

2 是函数式编程。就像 Java 5 中添加注释创造了机会(例如带注释的 REST 控制器或单元测试)一样,Java 8 中添加的 lambda 表达式为 Java 中的函数式 API 创造了机会。这对于允许异步逻辑的声明式组合的非阻塞应用程序和延续式 API(由CompletableFutureReactiveX推广)是一个福音。在编程模型级别,Java 8 使 Spring WebFlux 能够提供功能性 Web 端点以及带注释的控制器。

Spring MVC和spring webflux 的技术场景使用图

image-20211214183523382

Webflux的核心库就是我们的 Reactor API 与MVC区别所在

  • 接收但是 Publisher 返回的是 Mono/Flux
  • 同时支持注解和函数式编程两种模式

spring-web模块包含以下对反应式 Web 应用程序的基础支持:

  • 对于服务器请求处理,有两个级别的支持。
    • HttpHandler:HTTP 请求处理的基本契约,具有非阻塞 I/O 和反应流背压,以及用于 Reactor Netty、Undertow、Tomcat、Jetty 和任何 Servlet 3.1+ 容器的适配器。
    • WebHandlerAPI:用于请求处理的稍高级别的通用 Web API,在其之上构建了具体的编程模型,例如带注释的控制器和功能端点。
  • 对于客户端,有一个基本ClientHttpConnector合同来执行带有非阻塞 I/O 和响应式流背压的 HTTP 请求,以及用于Reactor Netty、响应式 Jetty HttpClientApache HttpComponents 的适配器 。应用程序中使用的更高级别的WebClient建立在这个基本契约之上。
  • 对于客户端和服务器,用于 HTTP 请求和响应内容的序列化和反序列化的编解码器

理论就到这里,我们来上手实操吧!

WebFlux Coding

编写controller 注解 hello world

/**
 * @projectName: webflux
 * @package: com.hyc.webflux.Controller
 * @className: ReactorController
 * @author: 冷环渊 doomwatcher
 * @description: TODO
 * @date: 2021/12/14 19:27
 * @version: 1.0
 */
@RestController
@RequestMapping("/annotated")
public class ReactorController {
    @GetMapping("/greeting")
    public Mono<String> greeting() {
        return Mono.just(" hello webflux by annotated");
    }
}
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.2)

2021-12-14 19:35:53.017  INFO 15172 --- [           main] com.hyc.webflux.WebfluxApplication       : Starting WebfluxApplication using Java 11.0.2 on DESKTOP-OG41IMR with PID 15172 (D:\JavaEngineer\Spirng5Webflux\webflux\target\classes started by doomwstcher in D:\JavaEngineer\Spirng5Webflux\webflux)
2021-12-14 19:35:53.022  INFO 15172 --- [           main] com.hyc.webflux.WebfluxApplication       : No active profile set, falling back to default profiles: default
2021-12-14 19:35:54.094  INFO 15172 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2021-12-14 19:35:54.104  INFO 15172 --- [           main] com.hyc.webflux.WebfluxApplication       : Started WebfluxApplication in 1.501 seconds (JVM running for 2.712)

这里我们查看

image-20211214194409189

这就是我们注解版本的helloworld

函数式 hello world

    //函数式
@Bean
    public RouterFunction<ServerResponse> routers() {
        return RouterFunctions.route().GET("/func/greeting", serverRequest -> ok().bodyValue("hello  webflux by function")).build();
    }

image-20211214195211608

结语

这篇文章主要是帮助 想要了解 spring 最新技术特性的小伙伴进行一个 简单的入门,

想要了解更多,可以通过文档,视频等继续深入学习,工程师的路上 学无止境

分类:
标签: Webflux