ยท Develop  ยท 17 min read

Reactive Programming ๊ณผ Reactive Stream

Spring Webflux๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด์„œ Reactive Programming๊ณผ Reactive Stream์— ๋Œ€ํ•ด ๊ณต๋ถ€ํ–ˆ๋˜ ๋ถ€๋ถ„์„ ์ •๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

1. ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์ด๋ž€?

๋‹จ์ˆœํžˆ ์ด๋ฆ„๋งŒ ๋“ฃ๊ณ ์„œ๋Š” ์–ด๋–ค ๋œป์ธ์ง€ ์ •ํ™•ํ•˜๊ฒŒ ์™€๋‹ฟ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

์œ„ํ‚ค ํ”ผ๋””์•„์—์„œ๋Š” ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ •์˜ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g., arrays) or dynamic (e.g., event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.

์œ„์˜ ๋‚ด์šฉ์„ ๋ณด์•„, ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์€ ์„ ์–ธ์  ํ”„๋กœ๊ทธ๋ž˜๋ฐ์˜ ํ•œ ์ข…๋ฅ˜์ด๊ณ , ๋ฐ์ดํ„ฐ์˜ ํ๋ฆ„์„ ๋จผ์ € ์ •์˜ํ•˜๊ณ  ๋ฐ์ดํ„ฐ๊ฐ€ ๋ณ€๊ฒฝ๋˜์—ˆ์„ ๋•Œ ์ด์— ๋Œ€ํ•œ ์—ฐ์‚ฐ์ด ์ „ํŒŒ๋˜๋ฉด์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ŠคํŠธ๋ฆผ ํ˜•ํƒœ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์„ ๋งํ•ฉ๋‹ˆ๋‹ค.

์„ ์–ธ์  ํ”„๋กœ๊ทธ๋ž˜๋ฐ(Declarative) ์€ ํ–‰์œ„๋ฅผ ์„ ์–ธ๋งŒ ํ•˜๋Š” ํ”„๋กœ๊ทธ๋ž˜๋ฐ ํ˜•ํƒœ๋ฅผ ๋งํ•ฉ๋‹ˆ๋‹ค.
๋ช…๋ นํ˜• ํ”„๋กœ๊ทธ๋ž˜๋ฐ(Imperative) ์ฒ˜๋Ÿผ ์ผ์ผ์ด ์ฝ”๋“œ๋ฅผ ์ง€์‹œํ•˜๊ธฐ๋ณด๋‹ค๋Š” ์˜๋„๋ฅผ ํ‘œํ˜„ํ•ฉ๋‹ˆ๋‹ค.
๋Œ€ํ‘œ์ ์œผ๋กœ ํ•จ์ˆ˜ํ˜• ํ”„๋กœ๊ทธ๋ž˜๋ฐ์˜ map, reduce ์—ฐ์‚ฐ์ด ์ด์— ํ•ด๋‹นํ•ฉ๋‹ˆ๋‹ค.

2. ๋“ฑ์žฅ ๋ฐฐ๊ฒฝ

์™œ ์จ์•ผ ํ• ๊นŒ์š”? ๊ธฐ์กด์— ์„œ๋ฒ„๋Š” ์š”์ฒญ์„ ์–ด๋–ป๊ฒŒ ์ฒ˜๋ฆฌํ•˜์˜€์„๊นŒ์š”?

์ „ํ†ต์ ์ธ ์•„ํ‚คํ…์ฒ˜์—์„œ๋Š” ๋™๊ธฐ ๋ธ”๋กœํ‚น ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ๊ฐ„๋‹จํ•˜๊ฒŒ ํ•˜๋‚˜์˜ ์š”์ฒญ์— ๋Œ€ํ•ด ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๋ฅผ ํ†ตํ•ด ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ž…๋‹ˆ๋‹ค. ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€์„œ ์ฒ˜๋ฆฌํ•  ๋•Œ๊นŒ์ง€ ํ•ด๋‹น ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋กœํ‚นํ•ฉ๋‹ˆ๋‹ค. ๊ตฌํ˜„๋„ ์‰ฝ๊ณ  ์•ˆ์ •์„ฑ๋„ ์–ด๋А ์ •๋„ ๋ณด์žฅ๋ฉ๋‹ˆ๋‹ค.

ํ•˜์ง€๋งŒ ํ•œ ๋ฒˆ์— ๋งŽ์€ ์š”์ฒญ์ด ๋“ค์–ด์˜ฌ ๊ฒฝ์šฐ, ๊ณ„์‚ฐํ–ˆ๋˜ TPS๋งŒํผ ์ฒ˜๋ฆฌ๋Ÿ‰์ด ๋‚˜์˜ค์ง€ ์•Š๊ณ  ์„œ๋ฒ„๋Š” ์žฅ์• ์— ๋น ์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

3. ๋น„๊ต

- ์ „ํ†ต์ ์ธ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ฐฉ์‹

์ „ํ†ต์ ์ธ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ฐฉ์‹ [1 request == 1 thread]

๊ธฐ์กด์˜ ๋ฐฉ์‹์—์„œ๋Š” ํ•˜๋‚˜์˜ ์š”์ฒญ์— ๋Œ€ํ•ด ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๊ฐ€ ํ• ๋‹น๋˜๋Š” ๋ฐฉ์‹์ž…๋‹ˆ๋‹ค. ์ด๋Ÿฐ ์•„ํ‚คํ…์ฒ˜์—์„œ๋Š” ์Šค๋ ˆ๋“œ๋ฅผ ๋งŒ๋“œ๋Š” ์‹œ๊ฐ„์„ ์ค„์ด๊ธฐ ์œ„ํ•ด ์Šค๋ ˆ๋“œํ’€์„ ๋งŒ๋“ค์–ด ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ์Šค๋ ˆ๋“œํ’€์„ ๋„˜์–ด์„œ๋Š” ์š”์ฒญ์ด ๋“ค์–ด์˜ฌ ๋•Œ๋Š” ์–ด๋–ป๊ฒŒ ๋ ๊นŒ์š”?

์•„๋ž˜๋Š” ์ „ํ†ต์ ์ธ ๋ชจ๋ธ์ธ Tomcat - Spring MVC ํ™˜๊ฒฝ์—์„œ ๋ถ€ํ•˜ ํ…Œ์ŠคํŠธ๋ฅผ ์ฃผ์—ˆ์„ ๋•Œ ์Šค๋ ˆ๋“œ๋“ค์˜ ์ƒํƒœ์ž…๋‹ˆ๋‹ค. ๋นจ๊ฐ„์ƒ‰์€ parked() ์ƒํƒœ๋กœ ์š”์ฒญ์„ ์ฒ˜๋ฆฌ ์ค‘์ด๋ผ ๋‹ค๋ฅธ ์ผ์„ ํ•˜์ง€ ๋ชปํ•˜๋Š” ๋ธ”๋กœํ‚น ์ƒํƒœ์ž…๋‹ˆ๋‹ค.

์•„๋ž˜์ฒ˜๋Ÿผ ๋ชจ๋“  ์Šค๋ ˆ๋“œ๊ฐ€ ์ด๋Ÿฐ ์ƒํƒœ๋ฉด ์–ด๋–ป๊ฒŒ ๋ ๊นŒ์š”? ์ดํ›„์— ๋“ค์–ด์˜ค๋Š” ์š”์ฒญ๋“ค์€ ๋ธ”๋กœํ‚น ์ƒํƒœ์— ์žˆ๋‹ค๊ฐ€ ๊ฒฐ๊ตญ ์ฒ˜๋ฆฌ๋˜์ง€ ๋ชปํ•˜๊ณ  timeout๋˜์–ด ์š”์ฒญ ์ฒ˜๋ฆฌ์— ์‹คํŒจํ•  ๊ฒƒ์ž…๋‹ˆ๋‹ค.

์ฆ‰, ์ฒ˜๋ฆฌํ•  ์Šค๋ ˆ๋“œ๊ฐ€ ๋ถ€์กฑํ•ด์„œ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜์ง€ ๋ชปํ•ฉ๋‹ˆ๋‹ค. CPU์™€ ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ์ถฉ๋ถ„ํ•œ๋ฐ๋„ ๋ง์ด์ฃ .

๊ทธ๋ƒฅ ์Šค๋ ˆ๋“œ ๊ฐœ์ˆ˜๋ฅผ ๋Š˜๋ฆฌ๊ฑฐ๋‚˜, Scale-up, Scale-out ํ•˜๋ฉด ์•ˆ ๋˜๋‚˜์š”?

์Šค๋ ˆ๋“œ๋ฅผ ๋ฌด์ž‘์ • ๋Š˜๋ฆด ๊ฒฝ์šฐ ์ด๋ฒˆ์—๋Š” ์Šค๋ ˆ๋“œ๊ฐ€ ์ถฉ๋ถ„ํ•จ์—๋„ ๋ถˆ๊ตฌํ•˜๊ณ  ๊ณผ๋„ํ•œ ์ปจํ…์ŠคํŠธ ์Šค์œ„์นญ์ด ๋ฐœ์ƒํ•˜์—ฌ CPU์™€ ๋ฉ”๋ชจ๋ฆฌ ๋•Œ๋ฌธ์— ์ฒ˜๋ฆฌ์œจ์ด ๋–จ์–ด์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ปจํ…์ŠคํŠธ ์Šค์œ„์นญ์ด๋ž€ ์Šค๋ ˆ๋“œ์˜ ์ƒํƒœ๋ฅผ ์ €์žฅํ•˜๊ณ  ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๋ฅผ ๊ฐ€์ ธ์™€์„œ ์‹คํ–‰ํ•˜๋Š” ํ–‰์œ„์ž…๋‹ˆ๋‹ค. ํ”„๋กœ์„ธ์Šค์˜ ์ •๋ณด๋Š” PCB ์•ˆ์— ์ €์žฅ๋˜์–ด ์žˆ๊ณ  ์ด PCB ์•ˆ์— ํ˜„์žฌ ์‹คํ–‰ ์ค‘์ธ ์Šค๋ ˆ๋“œ ์ •๋ณด๋“ค์ด ์žˆ์Šต๋‹ˆ๋‹ค. CPU๋Š” ๊ธฐ์กด์— ์ฒ˜๋ฆฌํ•˜๋˜ ์Šค๋ ˆ๋“œ์˜ ์ƒํƒœ๋ฅผ ์ €์žฅํ•˜๊ณ  ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์˜ ์ƒํƒœ๋ฅผ ๋กœ๋“œํ•˜์—ฌ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. ๋‹น์—ฐํžˆ ์ด๋•Œ ์ปดํ“จํŒ… ์—ฐ์‚ฐ์ด ํ•„์š”ํ•˜๋ฉฐ, ์Šค๋ ˆ๋“œ๊ฐ€ ๋งŽ์„์ˆ˜๋ก ์ด๋Ÿฐ ์—ฐ์‚ฐ์€ ๋งŽ์•„์งˆ ์ˆ˜๋ฐ–์— ์—†์Šต๋‹ˆ๋‹ค.

Scale up๊ณผ Scale out์ด ํ•˜๋‚˜์˜ ํ•ด๊ฒฐ์ฑ…์ด ๋  ์ˆ˜ ์žˆ์ง€๋งŒ, ๊ทผ๋ณธ์ ์ธ ๋ฌธ์ œ๊ฐ€ ํ•ด๊ฒฐ๋œ ๊ฒƒ์ด ์•„๋‹ˆ๊ธฐ ๋•Œ๋ฌธ์— ์ธํ”„๋ผ ์ž์›์„ ๋‚ญ๋น„ํ•˜๊ฒŒ ๋  ๊ฒƒ์ž…๋‹ˆ๋‹ค.

- Reactive Stream์˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ฐฉ์‹

Reactive Stream์—์„œ์˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ฐฉ์‹ [many request == 1 thread]

๊ทธ๋ ‡๋‹ค๋ฉด Reactive Programming์—์„œ๋Š” ์–ด๋–จ๊นŒ์š”? ๋‹ค์Œ์€ ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๊ธฐ๋ฐ˜์˜ Netty - Spring Webflux ํ™˜๊ฒฝ์—์„œ ๋ถ€ํ•˜ ํ…Œ์ŠคํŠธ๋ฅผ ํ–ˆ์„ ๋•Œ ์Šค๋ ˆ๋“œ๋“ค์˜ ์ƒํƒœ์ž…๋‹ˆ๋‹ค. ๋ณด์‹œ๋‹ค์‹œํ”ผ ๋งŽ์€ ์Šค๋ ˆ๋“œ๋ฅผ ๋งŒ๋“ค์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

Netty ๊ธฐ๋ฐ˜ ์›นํ”Œ๋Ÿญ์Šค์—์„œ๋Š” CPU ์ฝ”์–ด ร— 2๊ฐœ์˜ ์Šค๋ ˆ๋“œ๋งŒ ์‚ฌ์šฉํ•ด์„œ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. ์•ž๋‹จ์—์„œ ์š”์ฒญ์„ ๋ฐ›๋Š” reactor-http-nio ์Šค๋ ˆ๋“œ๋“ค์€ ์ ์€ ์ˆ˜๋ฅผ ์œ ์ง€ํ•˜๋ฉด์„œ ์—ฐ์‚ฐ์„ ๋’ค์˜ ์ด๋ฒคํŠธ ๋ฃจํ”„์—๊ฒŒ ์œ„์ž„ํ•˜๊ณ  ๊ฒฐ๊ณผ๊ฐ’๋งŒ ๋ฐ›์•„์„œ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค. ๊ฒฐ๊ณผ์ ์œผ๋กœ๋Š” ์ปจํ…์ŠคํŠธ ์Šค์œ„์นญ์„ ์ค„์ผ ์ˆ˜ ์žˆ๊ณ  ๋” ๋งŽ์€ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ž๋ฐ” NIO์˜ ๋…ผ๋ธ”๋กœํ‚น I/O(selector)๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด๋ฅผ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•˜์ง€๋งŒ ๊ธ€์˜ ๋ฒ”์œ„๋ฅผ ๋ฒ—์–ด๋‚˜๊ธฐ ๋•Œ๋ฌธ์— ๋”ฐ๋กœ ์„ค๋ช…ํ•˜์ง€๋Š” ์•Š๊ฒ ์Šต๋‹ˆ๋‹ค. (Linux ์ƒ์—์„œ๋Š” epoll์„ ์ง์ ‘ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.)

4. ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์˜ ๊ธฐ๋ฐ˜ ๊ธฐ์ˆ 

  • Observer Pattern (๊ด€์ฐฐ์ž ํŒจํ„ด): ์ผ๋Œ€๋‹ค์˜ ๊ด€๋ จ์„ฑ์„ ๊ฐ–๋Š” ๊ฐ์ฒด๋“ค ์ค‘ ํ•œ ๊ฐ์ฒด์˜ ์ƒํƒœ๊ฐ€ ๋ณ€ํ•˜๋ฉด ๋‹ค๋ฅธ ๋ชจ๋“  ๊ฐ์ฒด์— ๊ทธ ์‚ฌํ•ญ์„ ์•Œ๋ฆฌ๊ณ  ํ•„์š”ํ•œ ์ˆ˜์ •์ด ์ž๋™์œผ๋กœ ์ด๋ฃจ์–ด์ง€๋„๋ก ํ•˜๋Š” ๋””์ž์ธ ํŒจํ„ด์ž…๋‹ˆ๋‹ค. ConcreteSubject์˜ ์ƒํƒœ์— ๋ณ€๊ฒฝ์ด ์ผ์–ด๋‚ฌ์„ ๋•Œ Subject์˜ notify() ํ•จ์ˆ˜๋ฅผ ํ†ตํ•ด Observer์˜ ์ƒํƒœ๋ฅผ ๋ณ€๊ฒฝํ•ฉ๋‹ˆ๋‹ค. ์˜ต์ €๋ฒ„ ํŒจํ„ด์—์„œ Subject๋Š” Observer์˜ ์ƒํƒœ๋ฅผ ๊ณ ๋ คํ•˜์ง€ ์•Š๊ณ  ์ด๋ฒคํŠธ๋ฅผ ๋ณด๋‚ด๊ธฐ ๋•Œ๋ฌธ์— push ๋ฐฉ์‹์ด๋ผ๊ณ  ํ•ฉ๋‹ˆ๋‹ค.
  • Iterator Pattern (๋ฐ˜๋ณต์ž ํŒจํ„ด): ์ปฌ๋ ‰์…˜ ๊ตฌํ˜„ ๋ฐฉ๋ฒ•์„ ๋…ธ์ถœ์‹œํ‚ค์ง€ ์•Š์œผ๋ฉด์„œ ์ปฌ๋ ‰์…˜ ์•ˆ์— ๋“ค์–ด์žˆ๋Š” ๋ชจ๋“  ์—˜๋ฆฌ๋จผํŠธ์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐฉ์‹์„ ๊ตฌํ˜„ํ•œ ํŒจํ„ด์ž…๋‹ˆ๋‹ค. ์˜ต์ €๋ฒ„ ํŒจํ„ด๊ณผ ๋‹ฌ๋ฆฌ ๋ฐ˜๋ณต์ž ํŒจํ„ด์€ next()๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ๋ฆฌํ„ด๋ฐ›์•„์„œ pull ๋ฐฉ์‹์ด๋ผ๊ณ  ํ•ฉ๋‹ˆ๋‹ค.

5. ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ

Reactive Stream์ด๋ž€ ๋…ผ๋ธ”๋กœํ‚น๊ณผ ๋ฐฑํ”„๋ ˆ์…”๋ฅผ ๊ฐ–์ถ˜ ๋น„๋™๊ธฐ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ํ‘œ์ค€์ž…๋‹ˆ๋‹ค. ๊ณต์‹ ํ™ˆํŽ˜์ด์ง€์—์„œ๋Š” ๋„คํŠธ์›Œํฌ๋ฟ ์•„๋‹ˆ๋ผ JVM๊ณผ JS ๊ฐ™์€ ๋Ÿฐํƒ€์ž„ ํ™˜๊ฒฝ์—์„œ๋„ ๋Œ์•„๊ฐ€๋„๋ก ํ•˜๋Š” ๊ฒƒ์ด ๋ชฉํ‘œ๋ผ๊ณ  ์ ํ˜€ ์žˆ์Šต๋‹ˆ๋‹ค. ๋” ์ž์„ธํ•œ API ๋ช…์„ธ๋Š” ์ด ๊ณณ์—์„œ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Reactive Stream = Observer Pattern + Iterator Pattern

์•„๋ž˜์™€ ๊ฐ™์€ ํŠน์ง• ๋•Œ๋ฌธ์— ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์€ ์˜ต์ €๋ฒ„ ํŒจํ„ด๊ณผ ๋ฐ˜๋ณต์ž ํŒจํ„ด์˜ ๊ฒฐํ•ฉ์ด๋ผ๊ณ  ํ•ฉ๋‹ˆ๋‹ค. ํ•œ๋งˆ๋””๋กœ ํ‘œํ˜„ํ•˜๋ฉด **โ€˜๋‚ด๊ฐ€ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ์–‘๋งŒํผ๋งŒ ๊ฐ€์ ธ์™€์„œ ์ฒ˜๋ฆฌํ•œ๋‹คโ€™**๋ผ๊ณ  ๋ด์•ผ๊ฒ ๋„ค์š”.

๋ฆฌ์—‘ํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ๊ณผ ์˜ต์ €๋ฒ„ ํŒจํ„ด์˜ ์ฐจ์ด์ 

  • [Backpressure] Hybrid Pull / Push

๊ธฐ์กด ์˜ต์ €๋ฒ„ ํŒจํ„ด์—์„œ ๋ฐ์ดํ„ฐ ๋ณ€ํ™”๋Š” Publisher(Subject) ๊ฐ€ Subscriber(Observer) ์—๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ€์–ด ๋„ฃ๋Š” notify ํ˜•ํƒœ์˜€์Šต๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ 1์ดˆ์— 10๊ฐœ๋ฐ–์— ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜์ง€ ๋ชปํ•˜๋Š” Subscriber์—๊ฒŒ Publisher๊ฐ€ ๋” ๋งŽ์€ ์ด๋ฒคํŠธ๋ฅผ ๋ณด๋‚ด๋ฉด ์–ด๋–ป๊ฒŒ ๋ ๊นŒ์š”?

๋‹น์—ฐํžˆ ์š”์ฒญ์€ ์ฒ˜๋ฆฌ๋˜์ง€ ๋ชปํ•  ๊ฒƒ์ด๊ณ , ์ด๋ฅผ ์œ„ํ•ด ๋ฒ„ํผ๋ฅผ ๋‘”๋‹ค๊ณ  ํ•œ๋“ค ์š”์ฒญ์ด ์Œ“์ด๋Š” ๊ฑด ๋˜‘๊ฐ™๊ธฐ ๋•Œ๋ฌธ์— ๋ฌธ์ œ๋Š” ํ•ด๊ฒฐ๋˜์ง€ ์•Š์„ ๊ฒƒ์ž…๋‹ˆ๋‹ค.

Reactive Stream์—์„œ๋Š” ์ด ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด Backpressure๋ฅผ ๋„์ž…ํ•˜์˜€์Šต๋‹ˆ๋‹ค. Backpressure๋ž€ ๋ฐฐ์••์ด๋ผ๋Š” ๋œป์œผ๋กœ ๋ฐฐ๊ด€ ๋“ฑ์—์„œ ๊ณผํˆฌ์ž…๋˜๋Š” ๊ฒƒ์„ ๋ง‰๊ธฐ ์œ„ํ•ด ์—ญ์œผ๋กœ ์••๋ ฅ์„ ์ฃผ์–ด ์••๋ ฅ์„ ์•ฝํ™”์‹œํ‚ค๋Š” ๊ฒƒ์„ ๋งํ•ฉ๋‹ˆ๋‹ค. ์†Œํ”„ํŠธ์›จ์–ด ๋ ˆ๋ฒจ์—์„œ๋„ ๋˜‘๊ฐ™์ด ์ž‘์šฉํ•ฉ๋‹ˆ๋‹ค. ์š”์ฒญ์ด ๊ณผํˆฌ์ž…๋˜์–ด Subscriber๊ฐ€ ์˜ค๋ฒ„ํ”Œ๋กœ์šฐ ๋˜๋Š” ๊ฒƒ์„ ๋ง‰๊ธฐ ์œ„ํ•œ ์šฉ๋„๋กœ Backpressure๊ฐ€ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.

์ด ๋ถ€๋ถ„์€ ๋ง๋กœ๋งŒ ํ•˜๋ฉด ์ดํ•ด๊ฐ€ ์ž˜ ๊ฐ€์ง€ ์•Š์œผ๋‹ˆ ์ฝ”๋“œ ๋ ˆ๋ฒจ์—์„œ ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค. ๋‹ค์Œ์€ Reactive Stream์˜ JVM ๊ตฌํ˜„์ฒด ์ค‘ ํ•˜๋‚˜์ธ Project Reactor์˜ ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค.

import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

public class BackPressureTest {

    @Test
    public void subscribeTest() {
        Flux.range(0, 1000)
                .log()
                .subscribe(new MySubscriber());
    }
}

class MySubscriber implements Subscriber<Object> {

    private Subscription subscription;
    private int requestCnt;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;

        // ์ตœ์ดˆ subscribe์‹œ 10๊ฐœ๋งŒ ์š”์ฒญ
        this.subscription.request(10);
    }

    @Override
    public void onNext(Object object) {

        // ์ดํ›„ ์ „๋ถ€ ๊ฐ€์ ธ์˜ฌ๋•Œ๊นŒ์ง€ 10๊ฐœ์”ฉ ๋ฐ˜๋ณต์š”์ฒญ
        requestCnt++;
        if (requestCnt % 10 == 0) {
            this.subscription.request(10);
        }
    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {
        System.out.println("Subscribe Finished");
    }
}

์‹คํ–‰ ๊ฒฐ๊ณผ

Publisher๊ฐ€ subscribeํ•˜๋ฉด onSubscribe๋ฅผ ํ†ตํ•ด Subscription ๊ฐ์ฒด๋ฅผ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค. ์—ฌ๊ธฐ์„œ ํ•œ ๋ฒˆ์— ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค ๊ฐ€์ ธ์˜ค๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ onNext()๋ฅผ ํ†ตํ•ด Subscription์˜ ๋ฐ์ดํ„ฐ๋ฅผ ํ•„์š”ํ•œ ๋งŒํผ ๋‚˜๋ˆ ์„œ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค. ์™œ Observer + Iterator ํŒจํ„ด์ด๋ผ๋Š”์ง€ ์•Œ ์ˆ˜ ์žˆ๋Š” ๋ถ€๋ถ„์ž…๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด Subscriber๊ฐ€ ์ˆ˜์šฉํ•  ์ˆ˜ ์žˆ๋Š” ๋งŒํผ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๊ฒŒ Backpressure์˜ ๊ธฐ๋ณธ ๊ฐœ๋…์ž…๋‹ˆ๋‹ค. (๋ฌผ๋ก  WebFlux์—์„œ๋Š” ์ž์ฒด์ ์œผ๋กœ Subscriber๋ฅผ ๋งŒ๋“ค์–ด์ค๋‹ˆ๋‹ค.)

๋ฌผ๋ก  Subscriber์˜ ๋ถ€ํ•˜๋ฅผ ์œ ์ง€ํ•  ์ˆ˜ ์—†์„ ๊ฒฝ์šฐ๋ฅผ ๋Œ€๋น„ํ•ด ํ์— ์ž„์‹œ ์ €์žฅํ•˜์—ฌ ๋ฒ„ํผ๋ง์„ ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetSystemProperty;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

public class BackpressureBufferStrategyTest {

    @Test
    @SetSystemProperty(key = "reactor.bufferSize.small", value = "20")
    public void ๊ณ ์ •_๊ธธ์ด_๋ฒ„ํผ() throws InterruptedException {

        Flux<Object> fluxAsyncBackup = Flux.create(emitter -> {

            for (int i = 0; i < 50; i++) {
                emitter.next(i);
            }
            emitter.complete();

        }, FluxSink.OverflowStrategy.BUFFER) // ์˜ค๋ฒ„ํ”Œ๋กœ์šฐ์‹œ ๋ฒ„ํผ๋ง
        .onBackpressureBuffer(20);  // ๋ฒ„ํผ๋ง ์‚ฌ์ด์ฆˆ 20 (๊ณ ์ •๊ธธ์ด)

        fluxAsyncBackup
            .subscribeOn(Schedulers.elastic())
            .publishOn(Schedulers.elastic())
            .subscribe(reqNum -> {
                System.out.printf("%s  | Received = %s\n",
                    Thread.currentThread().getName(), reqNum);

                    // ์š”์ฒญ ์ฒ˜๋ฆฌ๊ฐ€ 1์ดˆ๊ฐ€ ๊ฑธ๋ฆฌ๋Š” ๋А๋ฆฐ Subscriber ์žฌํ˜„
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }, error ->
                    System.out.printf("%s  | Error = %s %s\n" ,
                            Thread.currentThread().getName(), error.getClass().getSimpleName(), error.getMessage())
                );
        Thread.sleep(100000);
    }
}

์œ„์™€ ๊ฐ™์ด ํ•˜๋‚˜์˜ ์š”์ฒญ์ด 1์ดˆ๊ฐ€ ๊ฑธ๋ฆฌ๋Š” ๋А๋ฆฐ Subscriber๊ฐ€ ์กด์žฌํ•  ๊ฒฝ์šฐ, ์š”์ฒญ์— ๋Œ€ํ•ด ๋ฒ„ํผ๋ง(ํ์ž‰)์„ ํ†ตํ•ด ๋‚˜๋จธ์ง€ ์ด๋ฒคํŠธ๋ฅผ ์ €์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ํ•ด๋‹น ์ฝ”๋“œ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ 20 ์‚ฌ์ด์ฆˆ์˜ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜์ง€๋งŒ, ์ถ”๊ฐ€๋กœ 20 ์‚ฌ์ด์ฆˆ ๋ฒ„ํผ์— ์ด๋ฒคํŠธ๋ฅผ ์ €์žฅํ•˜์—ฌ ์ด 40๊ฐœ์˜ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์—ˆ์Šต๋‹ˆ๋‹ค. ๊ณ ์ • ๊ธธ์ด ๋ฒ„ํผ์ด๊ธฐ ๋•Œ๋ฌธ์— ๋ฒ„ํผ ์‚ฌ์ด์ฆˆ๋ฅผ 40๋ณด๋‹ค ๋งŽ์€ (50๊ฐœ) ์š”์ฒญ์ด ๋“ค์–ด์™”๊ธฐ ๋•Œ๋ฌธ์— ์˜ค๋ฒ„ํ”Œ๋กœ ์ต์…‰์…˜์ด ๋ฐœ์ƒํ•˜์˜€์Šต๋‹ˆ๋‹ค.

์ด๋ฅผ ๊ฐ€๋ณ€ ๊ธธ์ด๋กœ ๋ณ€ํ™˜ํ•˜๋ ค๋ฉด .onBackpressureBuffer(20) ๋ถ€๋ถ„์„ ์ง€์›Œ์ฃผ๋ฉด ๋ฉ๋‹ˆ๋‹ค. ๋‹จ, ๊ฐ€๋ณ€ ๊ธธ์ด์˜ ๊ฒฝ์šฐ ๋„ˆ๋ฌด ๋งŽ์€ ๋ฒ„ํผ๋ง๊ณผ ๋ฐฐ์—ด์˜ ๋ณต์ œ ๊ณผ์ •์œผ๋กœ ์ธํ•ด OutOfMemory Exception์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

OverflowStrategy.BUFFER ์ด์™ธ์—๋„ ํ—ˆ์šฉ์น˜๋ฅผ ๋„˜์–ด์„œ๋Š” ์š”์ฒญ์ด ๋“ค์–ด์˜ค๋ฉด ๋ฒ„๋ฆฌ๋Š” DROP, ๊ฐ€์žฅ ์ตœ์‹  ๊ฒƒ๋งŒ ๊ฐ€์ ธ์˜ค๋Š” LATEST ๋“ฑ ์—ฌ๋Ÿฌ ์ „๋žต์ด ์žˆ์Šต๋‹ˆ๋‹ค.

Reactive Stream์˜ ํ๋ฆ„๋„ (marble diagram)

Reactive Stream์€ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ ํ˜•ํƒœ๋กœ ํ‘œํ˜„ํ•ฉ๋‹ˆ๋‹ค.

6. ํ›„๊ธฐ

Spring Webflux๋ฅผ ๋ฐฐ์šฐ๊ธฐ ์œ„ํ•ด ๊ณต๋ถ€ํ–ˆ๋˜ ๊ฐœ๋…์ด์—ˆ์ง€๋งŒ, Reactive Stream ์ž์ฒด๊ฐ€ ์ƒ๊ฐ๋ณด๋‹ค ์•Œ์•„์•ผ ํ•  ๊ฐœ๋…๋“ค์ด ๋งŽ๊ณ  ๋ณต์žกํ•ฉ๋‹ˆ๋‹ค. ์•„์ง ๋ชจ๋ฅด๋Š” ๋ถ€๋ถ„์ด ๋งŽ๊ณ  ๊ณต๋ถ€ํ•ด์•ผ ํ•  ๋ถ€๋ถ„์ด ๋งŽ์€ ๊ฒƒ ๊ฐ™๋„ค์š”. ๋‚˜๋จธ์ง€๋Š” ๋‹ค์Œ์— ์ •๋ฆฌํ•ด์•ผ๊ฒ ์Šต๋‹ˆ๋‹ค.

7. ํ˜ผ๋ž€์Šค๋Ÿฌ์› ๋˜ ๊ฐœ๋…๋“ค

Reactive Programming ๊ณผ Reactive System

๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ๊ณต๋ถ€ํ•˜๋‹ค ๋ณด๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ํ˜ผ์„ ์„ ์œ ๋ฐœํ•˜๋Š” ์šฉ์–ด๋“ค์ด ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค. ์กฐ๊ธˆ๋งŒ ์ฐพ์•„๋ด๋„ Reactive Programming์€ Event-Driven์ธ๋ฐ Reactive System์€ Message-Driven์ด๋ผ๋Š” ๊ธ€์ด ์ˆ˜๋‘๋ฃฉํ•˜๊ฒŒ ๋‚˜์˜ต๋‹ˆ๋‹ค. ์ด๋Š” ํ•˜๋‚˜์˜ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ด€์ , ์ „์ฒด ์•„ํ‚คํ…์ฒ˜์˜ ๊ด€์ ์œผ๋กœ ๊ตฌ๋ถ„ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋ฆฌ์—‘ํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ํ•œ๋‹ค๊ณ  ์‹œ์Šคํ…œ ์•„ํ‚คํ…์ฒ˜๋ฅผ ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ์œผ๋กœ ๋งŒ๋“ค ์ˆ˜๋Š” ์—†์Šต๋‹ˆ๋‹ค. ๋ฆฌ์—‘ํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ์ด ๋˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ๊ฐ ์ปดํฌ๋„ŒํŠธ๋“ค์ด ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค ๋“ฑ์„ ํ†ตํ•ด์„œ ๋ฉ”์‹œ์ง€ ๊ธฐ๋ฐ˜์œผ๋กœ ํ†ต์‹ ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๋˜ํ•œ ์š”์ฒญ์— ๋Œ€ํ•œ ํƒ„๋ ฅ์„ฑ๊ณผ ์œ ์—ฐ์„ฑ์ด ๋ณด์žฅ๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์ž์„ธํ•œ ๋‚ด์šฉ์€ ๊ณต์‹ ๋ฌธ์„œ๋ฅผ ์ฐธ๊ณ ํ•ด์ฃผ์„ธ์š”.

์ฆ‰, ์ •๋ฆฌํ•˜์ž๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด ๋˜๊ฒ ๊ตฐ์š”.

Reactive ProgrammingReactive System
Event-DrivenMessage-Driven
Observer PatternPub - Sub Pattern

์ž˜๋ชป๋œ ๋ถ€๋ถ„์— ๋Œ€ํ•œ ์ง€์ ์€ ๊ฐ์‚ฌํžˆ ๋ฐ›๊ฒ ์Šต๋‹ˆ๋‹ค!

์ถœ์ €

์‹ค์ „! ์Šคํ”„๋ง 5๋ฅผ ํ™œ์šฉํ•œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ projectreactor.io/docs/core/release/reference/

๋ธ”๋กœ๊ทธ๋กœ ๋Œ์•„๊ฐ€๊ธฐ