使用Spring Cloud Stream

Spring Cloud Stream使用Spring Cloud Function提供的功能,分别将SupplierFunctionConsumer作为stream的发布者、处理器和消费者的处理函数。

发布数据

在Spring Cloud Stream中,发布数据的方式主要有两种,一种是通过Supplier自动触发,一种是通过StreamBridge通过外部数据源触发。

通过Supplier<T>自动触发

使用Supplier<T>作为stream的发布者,需要一种触发机制,来发起发布操作。Spring Cloud Stream提供了如下几种自动触发机制。

命令式(Imperative)编程模式的触发

如果使用的是命令式编程模式(Imperative),则直接在Supplier<T>上标注@Bean将之注册到Spring容器即可。默认Spring Framework会每隔1秒触发一次。


java

复制代码

   @Bean    Supplier<String> stringSupplier() {        return () -> {            String value = "Hello World!";            System.out.println("sent: " + value);            return value;       };   }

我们可以通过两种方式来调整触发频率:

  • 全局属性:spring.integration.poller.xxx

    • 设置所有函数的拉取频率
  • 每个绑定特定的属性:spring.cloud.stream.bindings.<binding-name>.producer.poller.xxx

    • 为某个特定的绑定设置拉取频率
反应式(Reactive)编程模式的触发

使用反应式编程模式时,Supplier<T>默认只会触发一次,这对于无限流是合适的。对于有限流,如果想多次触发,可以借助于@PollableBean注解。

  • 无限流

    
    

    java

    复制代码

       @Bean    Supplier<Flux<String>> stringSupplier() {        return () -> Flux.fromStream(                        Stream.generate(() -> {                            try {                                TimeUnit.SECONDS.sleep(1);                                String value = "Hello World!!";                                log.info(">>> {}", value);                                return value;                           } catch (InterruptedException e) {                                throw new RuntimeException(e);                           }                       }))               .subscribeOn(Schedulers.boundedElastic())               .share();   }

  • 有限流,使用PollableBean注解

    
    

    java

    复制代码

       @PollableBean    Supplier<Flux<String>> stringSupplier() {        return () -> {            System.out.println("sending...");            return Flux.just("Hello", "World", "!");       };   }

通过StreamBridge触发外部源

如果要发布的数据来自于REST请求,或者其他的外部源系统,则可以使用StreamBridge这个bean来将外部源的数据发布到Spring Cloud Stream中。

基本使用

比如,下面是通过GET请求发布数据:


java

复制代码

   @GetMapping("/send")    public String send() {        if (streamBridge.send("my-binding", "Hello World!")) {            return "sent";       } else {            return "NOT sent";       }   }

上述send方法的第一个参数,就是bindingName,这里是my-binding,可以是提前通过属性指定的,如果没有指定,那么会自动创建一个binding。


yaml

复制代码

spring: cloud:   stream:     output-bindings: my-binding

建议提前配置好bindingName,否则可能会造成内存溢出。为了防止内存溢出,可以通过以下属性来限制动态创建binding的数量。

spring.cloud.stream.dynamic-destination-cache-size=5

使用Interceptor

因为StreamBridge使用MessageChannel来建立发布的binding,所以可以利用ChannelInterceptor来拦截发布过程。比如:


java

复制代码

@Component @GlobalChannelInterceptor(patterns = "*") public class MyInterceptor implements ChannelInterceptor { ?    @Override    public Message<?> preSend(Message<?> message, MessageChannel channel) {        System.out.println(message.getHeaders());        Object payload = message.getPayload();        if (payload instanceof byte[] data) {            System.out.println(new String(data));       } else {            System.out.println(message.getPayload());       }        return message;   } }

注:

  • ChannleInterceptor的实现类需要注册到Spring容器中

  • 使用GlobalChannelIntercptor来标注这个拦截器

    • 参数patterns的值可以控制该拦截器的硬性范围

      • *表示拦截所有binding
      • foo-*表示只拦截那些以foo-开头的binding

消费数据

可以使用多种方式来消费stream中的数据,最常见的方式是通过Consumer<T>函数来消费数据。除此之外,也可以通过所谓的“拉取式消费者”来消费数据。

反应式的Consumer

对于命令式编程模式,使用Consumer<T>很直观,不赘述。

对于反应式的Consumer,由于它没有返回值,导致Spring Framework无法自动subscribe它,所以需要一些特殊处理。有两种方式:

使用Function<Flux<T>,Mono<Void>

建议的方式,是不用Consumer<Flux<T>>,而是使用Function<Flux<T>,Mono<Void>>来保证有一个返回值,让Spring Framework可以订阅。


java

复制代码

   @Bean    Function<Flux<String>, Mono<Void>> printText() {        return flux -> flux.map(value -> {            log.info("received: " + value);            return value;       }).then();   }

具体的操作,可以放在map等等的算子中。

使用Consumer<Flux<T>>,主动订阅

如果必须使用Consumer<Flux<T>>,则需要自己编写订阅代码,如下:


java

复制代码

   @Bean    Consumer<Flux<String>> printText() {        return flux -> {            flux.subscribe(System.out::println);       };   }

函数组合(Function Composition)

利用Spring Cloud Function的函数组合功能,可以将多个Function<T,R>或者Consumer<T>组合在一起(当然Consumer<T>只能在最后),形成一个运行时的整体函数。比如:


yaml

复制代码

spring: cloud:   function:     definition: uppercase|quote|printText   stream:     function:       bindings:         uppercase|quote|printText-in-0: convert     bindings:       convert:         destination: my-binding

上述配置,表示:

  • 应用中有两个Function(uppercase和quote)和一个Consumer(printText),他们共同组成一个运行时函数;
  • 为了方便引用,这个运行时函数的绑定器被命名为convert,即:uppercase|quote|printText-in-0的别名
  • 将这个绑定器绑定到叫做my-binding的目标上

使用函数组合,可以很方便地执行切面操作,比如上述案例中的quote其实可以看作一个切面,它对输入做了quote的增强。

多输入/输出参数

在需要合并/分流stream的场景下,会涉及到多个输入/输出参数。可以利用Project Reactor提供的Tuple来处理。


java

复制代码

@Bean public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() { return tuple -> { Flux<String> stringStream = tuple.getT1(); Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i)); return Flux.merge(stringStream, intStream); }; }

对应的绑定器命名如下:

  • gather-in-0:第一个输入流的绑定器
  • gather-in-1:第二个输入流的绑定器
  • gather-out-0:第一个输出流的绑定器

使用PollableMessageSource

如果不想使用函数式编程,那么Spring Framework提供了PollableMessageSource来“拉取”stream中的数据。


java

复制代码

   @Bean    ApplicationRunner applicationRunner(PollableMessageSource source) {        return args -> {            while (true) {                if (!source.poll(m -> {                    if (m.getPayload() instanceof byte[] data) {                        System.out.println(new String(data));                   } else {                        System.out.println(m.getPayload());                   }               })) {                    TimeUnit.SECONDS.sleep(1);               }           }       };   }

结论

使用Spring Cloud Stream的函数式编程模式,可以使用很少的代码实现数据流的发布、流转和消费。另外,Spring Cloud Stream默认支持Kafka和RabbitMQ,内置了它们的binder。如果需要支持其他的消息软件,则可以根据规范自己开发。