Categories
程式開發

錯過了初戀,別錯過WebFlux


1、概要

Spring 在5.0版本引入WebFlux,增加響應式框架的支持,響應式編程是一種範式,它促進了數據處理的異步、非阻塞、事件驅動方法。那麼到底什麼是Reactive呢?它就像是初戀,讓人想要知道關於她的一切。

reactive是指圍繞對變化作出反映的編程模型,網絡組建對I/O事件作出反映,UI控制器對鼠標事件作出反映,等等。從這個意義上說非阻塞(non-blocking)就是reactive。還有另外一個重要的機制,Spring將reacive和非阻塞背壓(non-blocking back pressure)聯繫在一起。在同步的代碼中,阻塞調用就是一種天然的背壓的一種形式,它迫使調用者必須等待。在非阻塞的代碼中,控制事件的速率就變的很重要,這樣快速的事件生產者就不會壓垮消費者。

2、Spring WebFlux Framework

Spring WebFlux是同Spring MVC同級別且完簡單全支持響應流的web框架。 “她”框架支持兩種編程模式

Annotated Controllers基於註解的響應式組件Functional Endpoints基於lambada輕量級函數式編程模型

下面我們就聚焦在第一種,基於註解響應式組件,如果你熟悉Spring MVC編程風格,那麼你也能輕鬆使用WebFlux,下面我們溫柔的扒一扒WebFlux

3、Reactive REST 應用

我們現在使用Spring WebFlux 創建一個簡單的Reactive REST **FuckingGreatWebfluxApplication**應用:

3.1 項目結構

.
├── pom.xml
└── src
├── main
│   ├── java
│   │   └── com
│   │   └── example
│   │   └── webflux
│   │   ├── controller
│   │   │   └── UserController.java
│   │   ├── domain
│   │   │   └── User.java
│   │   ├── repository
│   │   │   └── UserRepository.java
│   │   └── service
│   │   └── UserService.java
│   │   ├── FuckingGreatWebfluxApplication.java
│   └── resources
│   └── application.properties
└── test
└── java
└── com
└── example
└── webflux
├── controller
│   └── UserControllerTest.java
└── FuckingGreatWebfluxApplicationTests.java

3.2、Maven依賴


org.springframework.boot
spring-boot-starter-webflux

io.projectreactor
reactor-test
test

通過Spring Initaializr新建工程的時候 選擇如下:

錯過了初戀,別錯過WebFlux 1

3.3、Controller

controller層面提供了三個方法。 getUser方法返回Mono,findUser返回的是Flux。 Mono和Flux的區別在於,Mono返回的是0或者1個元素,Flux返回的是0或者多個元素。換句話說,如果想要從Spring MVC Web遷移到Reactive Web的話,凡是返回對象的則調整為Mono,凡是返回集合時則可以調整Flux即可(依實際情況)。

另外特別說明的是我在getUser和*findUser*方法裡面都使用了.log方法,她會打印出日誌供我們觀察。

@RestController
@RequestMapping("/v1")
public class UserController {

@Resource
UserService userService;

@GetMapping(path = "/users/{userId}", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseStatus(code = HttpStatus.OK)
public Mono getUser(@PathVariable("userId") Long userId) {
return userService.find(userId).log();
}

@PostMapping(path = "/users", consumes = MediaType.APPLICATION_JSON_VALUE)
@ResponseStatus(code = HttpStatus.CREATED)
public Mono createUser(@RequestBody User user) {
return userService.create(user);
}

@GetMapping(path = "/users", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux findUser() {
return userService.findAll().delayElements(Duration.ofSeconds(1L)).log();
}

3.4、 Service

服務層代碼,我在find方法裡面特意延遲了5​​秒鐘,我們後續觀察一下日誌運行情況

@Service
public class UserService {

@Resource
UserRepository userRepository;

public Mono find(Long id) {
try {
Thread.sleep(5 * 1000L);
} catch (InterruptedException e) {

}
return userRepository.find(id);
}

public Mono create(User user) {
return userRepository.create(user);
}

public Flux findAll() {
return userRepository.findAll();
}

其他的代碼這裡不羅列了,詳細看源碼部分吧!

4、單元測試

4.1 、getUser測試

如下測試代碼主要是獲取ID為1的user,並且斷言該user的firstname為Cattle

@Test
void getUser() {
webTestClient.get().uri("/v1/users/1").exchange().expectStatus().isOk().expectBody(User.class).value(
(Consumer) user -> Assertions.assertEquals(user.getFirstName(), "Cattle"));

}

輸出:

2020-05-11 00:07:24.849 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2020-05-11 00:07:24.855 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | request(unbounded)
2020-05-11 00:07:24.856 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | onNext(User(id=1, firstName=Cattle, lastName=Ma))
2020-05-11 00:07:24.868 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | onComplete()

從日誌打印可以看出,雖然我在獲取的時候延遲了5秒啊,但是request和onComplete的時間完全一樣,大家想一想,說明了什麼呢?

4.2、findUser測試

測試代碼如下

@Test
void findUser() {
webTestClient.get().uri("/v1/users")
.accept(MediaType.APPLICATION_STREAM_JSON)
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class).consumeWith(u -> u.getResponseBody().forEach(System.out::println));
}

輸出:

2020-05-11 00:15:44.597 INFO 20503 --- [ parallel-1] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2020-05-11 00:15:44.603 INFO 20503 --- [ parallel-1] reactor.Flux.ConcatMap.1 : request(1)
2020-05-11 00:15:45.611 INFO 20503 --- [ parallel-2] reactor.Flux.ConcatMap.1 : onNext(User(id=1, firstName=Cattle, lastName=Ma))
2020-05-11 00:15:45.722 INFO 20503 --- [ main] reactor.Flux.ConcatMap.1 : request(31)
2020-05-11 00:15:46.637 INFO 20503 --- [ parallel-3] reactor.Flux.ConcatMap.1 : onNext(User(id=2, firstName=Tony, lastName=Ma))
2020-05-11 00:15:47.639 INFO 20503 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onNext(User(id=3, firstName=Jack, lastName=Ma))
2020-05-11 00:15:47.641 INFO 20503 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onComplete()
User(id=1, firstName=Cattle, lastName=Ma)
User(id=2, firstName=Tony, lastName=Ma)
User(id=3, firstName=Jack, lastName=Ma)

從日誌可以分析出,雖然我們隻請求了一次,但是它內部實際上是request多次才獲取到全部的結果,然後關閉了應用。思考一下,我們能不能做到無線推流(Server Send Event),而不會有onComplete出現呢?將不斷產生的數據全部流到客戶端。

5、源碼

https://github.com/cattles/fucking-great-webflux.git

6、總結

前面說了那麼多,而且還留下了一些疑問,那麼你是否有繼續探索的慾望了呢?好好交往吧,如同初戀一樣!相信未來我們都會越來越多的見到異步非阻塞響應式編程曼妙的身影。本次介紹的內容比較少,主要還是在應用層面,那麼數據庫層是否也需要異步支持呢?不錯,必須的。後面我會另外在介紹事件驅動的文章中提到支持的數據庫Reactive Relational Database,請大家持續關注哦!

7、附錄

[^Functional Endpoints]: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-fn

[^Annotated Controllers]: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-controller