WebFlux hands-on materials
- Materials from the WebFlux hands-on course that ran from 2018 to 2021
- github
Prerequisites
- Required: JDK 17
- Required: Gradle 7.2+
- Required: IDEA
- Optional: docker
- Optional: docker-compose (verify that it is installed)
Running
git clone https://github.com/chk386/webflux2021
cd webflux2021
gradle clean assemble
gradle bootRun -Dspring.profiles.active=cloud
Running locally with Kafka, MySQL, MongoDB, and Redis
- /docker/docker-compose up
- MongoDB: create the database and user
docker exec -it mongo /bin/sh
mongo -u nhn -p nhn
use webflux
db.createUser({user: "webflux", pwd: "webflux", roles: ["readWrite"]})
- MySQL: create the schema
- Connect to r2dbc:mysql://localhost:3306/webflux (webflux/webflux)
- Run schema-mysql.sql
- gradle clean build
- gradle bootRun
Running from the image (if you would rather skip the setup)
docker run -e spring.profiles.active=cloud -p 8080:8080 -m=1G chk386/webflux
Dockerizing (for reference)
gradle bootBuildImage --imageName=chk386/webflux:latest # build the container image
# upload the image to Docker Hub
docker login # requires a Docker Hub account
docker tag chk386/webflux:latest chk386/webflux:latest # change the target to your own Docker Hub repository
docker push chk386/webflux:latest
docker run -e spring.profiles.active=local -p 8080:8080 -m=2G chk386/webflux # run
Spring Webflux?
- Spring 5.x+
- fully non-blocking, back pressure
- Reactor Netty, undertow, Servlet 3.1+ Containers (tomcat 9.x)
Why was Spring WebFlux created?
- Part of the answer is the need for a non-blocking web stack to handle concurrency with a small number of threads and scale with fewer hardware resources.
- The other part of the answer is functional programming
Programming models
// Annotated Controllers
@PostMapping("/owners/{ownerId}/pets/{petId}/edit")
public Mono<String> processSubmit(@Valid @ModelAttribute("pet") Mono<Pet> petMono) {
return petMono
.flatMap(pet -> {
// ...
})
.onErrorResume(ex -> {
// ...
});
}
multi-thread vs event loop
multi-thread
Thread Pool
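- When blocking calls run long, the pool runs out of threads even though CPU, memory, and other hardware resources are still available
- Increasing the thread pool size increases the context-switching cost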
event loop
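- An event loop runs on a small number of threads.
- The event loop processes events from the event queue sequentially; it registers a callback with the platform and returns immediately.
- The event loop triggers the callback once the work has completed.
- The main thread runs as one of a small number of threads: it sends an event to internal or external I/O (file I/O, DB calls, HTTP communication, etc.) and passes a callback (a function, not a result); when the work finishes on a separate thread, the callback runs on the main thread.
- Until the callback arrives, the main thread does not wait and runs other work instead.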
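The event-loop bullets above can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not how Reactor Netty is implemented: `EventLoopSketch`, `submitIo`, and the two-thread worker pool are hypothetical names and choices made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch: one loop thread drains an event queue, worker threads do the slow I/O.
final class EventLoopSketch {
    private final BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();
    private final ExecutorService ioWorkers = Executors.newFixedThreadPool(2);

    // Runs slowWork on a worker thread, then queues the callback back onto the loop thread.
    <T> void submitIo(Supplier<T> slowWork, Consumer<T> callback) {
        ioWorkers.submit(() -> {
            T result = slowWork.get();
            events.add(() -> callback.accept(result));
        });
    }

    // Processes exactly `expected` events, one at a time, on the calling thread.
    void runLoop(int expected) {
        try {
            for (int i = 0; i < expected; i++) {
                events.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ioWorkers.shutdown();
        }
    }

    private static String sleepThenReturn(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    static List<String> demo() {
        EventLoopSketch loop = new EventLoopSketch();
        List<String> log = new ArrayList<>(); // only touched by the loop thread
        Thread loopThread = Thread.currentThread();

        loop.events.add(() -> {
            loop.submitIo(() -> sleepThenReturn(300, "slow"),
                    r -> log.add(r + ":" + (Thread.currentThread() == loopThread)));
            loop.submitIo(() -> sleepThenReturn(50, "fast"),
                    r -> log.add(r + ":" + (Thread.currentThread() == loopThread)));
            log.add("registered"); // the loop did not wait for either call
        });
        loop.runLoop(3); // the first event plus the two callbacks
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The log starts with `registered` because the loop returns as soon as both callbacks are handed over; the `true` suffix on the later entries records that each callback ran on the loop thread, not on a worker.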
sync - async, blocking - nonblocking
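- blocking: the caller hands over control when it makes the call
- nonblocking: the caller keeps control and only kicks off the execution
- sync: sequential execution; no other work is done until the task finishes
- async: parallel execution; other work proceeds without waiting for the task to finish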
Sync-Blocking
Example: servlet stack
Sync-NonBlocking
// jdk 1.5
Future<Integer> future = new SquareCalculator().calculate(10);
while(!future.isDone()) {
System.out.println("Calculating...");
Thread.sleep(300);
}
Integer result = future.get();
Async-Blocking
- Using JDBC from Spring WebFlux
- node.js + mysql
Async-NonBlocking
// jdk 1.8
CompletableFuture
.supplyAsync(() -> "Hello, World")
.exceptionally(Throwable::getMessage)
.thenApply(s -> s + "!!!")
.handle((s, t) -> s != null ? s : "Hello, Stranger!!!");
Async keywords by JDK and Spring version
jdk 1.5
Future FutureTask Callable
jdk 8
CompletableFuture -> Mono.fromFuture(CompletableFuture::new)
jdk9 - Flow API
Publisher Subscriber Subscription Processor
spring4
@Async ListenableFuture AsyncRestTemplate DeferredResult WebAsyncTask CompletionStage ResponseBodyEmitter
spring5 - hands-on
Mono : https://github.com/chk386/webflux2021/blob/main/src/test/java/com/nhn/webflux2021/MonoTest.java
Flux : https://github.com/chk386/webflux2021/blob/main/src/test/java/com/nhn/webflux2021/FluxTest.java
Reactive
- reactive: reacting to change; reactive programming
- network components reacting to I/O events
- UI controllers reacting to mouse
- non-blocking with backpressure
Reactive Programming
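A challenge at the infrastructure level -> a shift to the async-nonblocking programming model (changing the visible code to solve an invisible resource problem) -> functional programming
Reactive programming is nonblocking, event-driven programming that uses backpressure and a small number of threads so that the publisher does not overwhelm the subscriber.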
Reactive Streams
Reactive Streams is a standard for asynchronous data processing in a streaming fashion with non-blocking back pressure.
- Started in late 2013 by engineers from Netflix, Pivotal, and Lightbend
- An initiative to provide a standard for asynchronous stream processing
- contributors : Lightbend(play, akka team), Netflix, Pivotal, Red Hat, Oracle, Twitter, spray.io
- Implementations
- ReactiveX(cross-platform, Microsoft, Netflix)
- ProjectReactor(Spring, Pivotal)
Interface
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
BackPressure
Backpressure lets the subscriber control the flow of data or events that the publisher pushes to it.
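A minimal sketch of the request(n) contract described above, written against the JDK `java.util.concurrent.Flow` interfaces so that it runs without extra dependencies. `BackpressureSketch` and `rangePublisher` are hypothetical names; the publisher is synchronous and skips most rules of the Reactive Streams specification (thread safety, for example), so treat it as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    // Hypothetical publisher: emits start..start+count-1, but never more than the subscriber requested.
    static Flow.Publisher<Integer> rangePublisher(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final int end = start + count;
            private int next = start;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return; // a full implementation would signal onError for n <= 0
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant request from onNext: the running loop picks it up
                }
                emitting = true;
                while (!done && demand > 0 && next < end) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next == end) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];

        rangePublisher(1, 5).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                held[0] = s;
                s.request(2); // the subscriber can only handle two items for now
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });

        log.add("paused");  // the publisher stopped after two of its five items
        held[0].request(3); // the subscriber signals that it is ready for three more
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The publisher holds items 3 to 5 back until the second `request(3)`, which is the point of backpressure: the subscriber, not the publisher, sets the pace.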
ProjectReactor
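- A JVM implementation of the Reactive Streams interfaces
- Maintained as open source by Pivotal
- Implements Publisher and provides a large number of operators
- Lets you choose the thread pools on which the subscriber and the publisher run, which makes asynchronous non-blocking programming easy to implement
- A micro reactive toolkit for everything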
- spring boot and webflux, reactive client(redis, mongo, kafka, RSocket, R2DBC, Netty)
Mono, Flux
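- Flux: emits 0..N items; an onNext event fires each time an item is delivered, an onComplete event fires once all items have been pushed, and an onError event fires if an error occurs while delivering data
- Mono: means 0..1 items
The publisher starts emitting events (data) at the moment Mono.subscribe() or Flux.subscribe() is called.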
Iterable vs Observable vs Reactive Streams
Iterable | Observable | Reactive Streams |
---|---|---|
it.next() | notifyObservers(i) | s.onNext(i) |
E next(); | void notifyObservers(Object arg) | void onNext(T t) |
pull | push | push |
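Compared with Observable, the traditional approach to asynchronous programming, Reactive Streams has the following advantages.
- Error propagation is easy to implement.
- The completion point is known.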
- backpressure
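The first two advantages can be illustrated with the JDK `java.util.concurrent.Flow` interfaces: unlike an Observer callback, the subscriber receives explicit onError and onComplete signals. In this minimal sketch `SignalsSketch`, `parsing`, and `collect` are hypothetical names, and demand handling is reduced to a single unbounded request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SignalsSketch {

    // Hypothetical publisher: parses each string and pushes the number; a bad input ends the stream with onError.
    static Flow.Publisher<Integer> parsing(List<String> raw) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (started || cancelled) {
                    return; // simplification: everything is emitted on the first request
                }
                started = true;
                for (String s : raw) {
                    if (cancelled) {
                        return;
                    }
                    int value;
                    try {
                        value = Integer.parseInt(s);
                    } catch (NumberFormatException e) {
                        subscriber.onError(e); // terminal signal: nothing is emitted after it
                        return;
                    }
                    subscriber.onNext(value);
                }
                if (!cancelled) {
                    subscriber.onComplete(); // terminal signal: the subscriber knows the stream ended
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // Subscribes with an unbounded request and records every signal it receives.
    static List<String> collect(List<String> raw) {
        List<String> log = new ArrayList<>();
        parsing(raw).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError(" + t.getClass().getSimpleName() + ")");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("1", "2")));
        System.out.println(collect(List.of("1", "2", "x", "4")));
    }
}
```

With `java.util.Observable`, the observer would only ever see `update(...)` calls and would have to invent its own convention for failure and end-of-data.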
Spring 5 WebFlux hands-on
@ExtendWith(OutputCaptureExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class ReactiveStreamTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final List<Integer> integers = List.of(1, 2, 3, 4, 5);
@Test
@Order(1)
@DisplayName("Iterable test")
void iterableTest(CapturedOutput output) {
for (Iterator<Integer> it = integers.iterator(); it.hasNext(); ) {
Integer integer = it.next();
logger.info("Iterable Pattern : {}", integer);
}
assertThat("1,2,3,4,5 must be printed.", captureOutput(output), everyItem(is(in(integers))));
}
@Test
@Order(2)
@DisplayName("Observable test")
void observableTest(CapturedOutput output) {
ExamObservable observable = new ExamObservable();
observable.addObserver((o, arg) -> logger.info("Observable Pattern : {}", arg));
observable.push(integers);
assertThat("1,2,3,4,5 must be printed.", captureOutput(output), everyItem(is(in(integers))));
}
@SuppressWarnings("deprecation")
static class ExamObservable extends Observable {
void push(List<Integer> integers) {
integers.forEach(i -> {
this.setChanged();
this.notifyObservers(i);
});
}
}
@Test
@Order(3)
@DisplayName("Reactive Streams test")
@SuppressWarnings("all")
void reactiveStreamsTest(CapturedOutput output) {
Publisher<Integer> publisher = s -> integers.forEach(s::onNext);
publisher.subscribe(new Subscriber<>() {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
logger.info("onSubscribe");
this.subscription = s;
}
@Override
public void onNext(Integer integer) {
// runs when the publisher pushes data
logger.info("Reactive Streams : {}", integer);
// when the system load average exceeds 90
// if (ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class)
// .getSystemLoadAverage() > 90) {
// this.subscription.request(3);
// } else {
// this.subscription.request(1);
// }
}
@Override
public void onError(Throwable t) {
logger.error(t.getMessage());
}
@Override
public void onComplete() {
logger.debug("onComplete");
}
});
assertThat("1,2,3,4,5 must be printed.", captureOutput(output), everyItem(is(in(integers))));
}
@Test
@Order(4)
@DisplayName("Reactor test")
void reactorTest(CapturedOutput output) {
Flux.fromIterable(integers)
.subscribe(v -> logger.info("Reactor : {}", v));
assertThat("1,2,3,4,5 must be printed.", captureOutput(output), everyItem(is(in(integers))));
}
private List<Integer> captureOutput(CapturedOutput output) {
return Arrays.stream(output.getOut()
.split("\n"))
.filter(line -> line.contains("DualityTest"))
.map(line -> Integer.parseInt(line.substring(line.length() - 1)))
.collect(toList());
}
}
- Iterable Pattern
- Observer Pattern
- Reactive Stream + BackPressure
- Reactor
- MonoTest.java
public class MonoTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
@DisplayName("Create a first Mono and use map to convert a string to a number.")
void monoTest() {
final String ONE = "1";
Mono.just(ONE)
.log()
.subscribeOn(Schedulers.newSingle("mono"))
.doOnSubscribe(s -> logger.info("doOnSubscribe"))
.doOnNext(v -> logger.info("data type is {}", v.getClass()))
.map(Integer::parseInt)
.doOnNext(v -> logger.info("data type is {}", v.getClass()))
.subscribe(v -> logger.info("number conversion complete : {}", v));
// what about error and complete?
}
@Test
@DisplayName("Test for an error raised while processing data")
void monoError() {
var notNum = "A";
Mono.just(notNum)
.log()
.map(Integer::parseInt)
.doOnError(e -> logger.error(e.toString()))
.subscribe(v -> {
}, e -> assertThat("A is not a number.", notNum, not(instanceOf(Integer.class))));
}
@Test
@DisplayName("Test for an error raised while processing data")
void monoError2() {
Mono.error(NumberFormatException::new)
.log()
.doOnError(e -> assertThat("A is not a number.", e, Matchers.instanceOf(NumberFormatException.class)))
.subscribe();
}
@Test
@DisplayName("Pushes a multi-line string to the subscriber one line at a time, each after a 1-second delay.")
void monoDelay() {
var flux =
Mono.just("hello\nwebflux")
.flatMapMany(s -> Flux.fromArray(s.split("\n"))
.delayElements(Duration.ofSeconds(1))
).log();
StepVerifier.create(flux)
.expectNext("hello")
.expectNext("webflux")
.verifyComplete();
}
@Test
@DisplayName("mono first, zip, zipWith test")
void monoFirst() {
var mono1 = Mono.just("1")
.delayElement(Duration.ofSeconds(3));
var mono2 = Mono.just("2")
.delayElement(Duration.ofSeconds(1));
var mono3 = Mono.just("3")
.delayElement(Duration.ofSeconds(2));
var first = Mono.firstWithSignal(mono1, mono2, mono3)
.log();
StepVerifier.create(first)
.expectNext("2")
.verifyComplete();
final long start = System.currentTimeMillis();
StepVerifier.create(Mono.zip(mono1, mono2, mono3).log())
.consumeNextWith(tuple3 -> {
assertEquals("1", tuple3.getT1());
assertEquals("2", tuple3.getT2());
assertEquals("3", tuple3.getT3());
long time = System.currentTimeMillis() - start;
MatcherAssert.assertThat("Mono.zip of the three monos takes about 3000 ms.",
time,
lessThan(4000L));
})
.verifyComplete();
var zipWith = Mono.just("A")
.zipWith(Mono.just(1), (s, num) -> s + num).log();
StepVerifier.create(zipWith)
.expectNext("A1")
.verifyComplete();
}
}
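- Creating and testing a Mono
- map: synchronous, transforms a value; flatMap: asynchronous, transforms into a new publisher
- Thread isolation
- Raising errors
- Converting an emitted Mono into a Flux, delay
- StepVerifier for testing a Publisher
- Mono.first (firstWithSignal), Mono.zip, Mono.zipWith
- In the asynchronous world, the longest-running task determines the total execution time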
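The last point above can be checked without Reactor. The following is a rough JDK-only analogue of the Mono.zip test, using CompletableFuture in place of Mono; `ZipTimingSketch`, `delayed`, and the chosen delays are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ZipTimingSketch {

    // Completes with `value` after roughly `millis` milliseconds, without blocking the caller.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(
                () -> value,
                CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS));
    }

    // Starts three delayed tasks at once and combines their results in declaration order.
    static String zip() {
        CompletableFuture<String> f1 = delayed("1", 600);
        CompletableFuture<String> f2 = delayed("2", 200);
        CompletableFuture<String> f3 = delayed("3", 400);
        return f1.thenCombine(f2, String::concat)
                 .thenCombine(f3, String::concat)
                 .join(); // blocking here is only for the demo
    }

    // Measures how long the given task takes, in milliseconds.
    static long elapsedMillis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        long elapsed = elapsedMillis(ZipTimingSketch::zip);
        // Expect a value near the longest delay (600 ms), not the sum of all delays (1200 ms).
        System.out.println("elapsed ms: " + elapsed);
    }
}
```

Because the three tasks wait concurrently, the combined result is ready shortly after the slowest one finishes, which matches the `lessThan(4000L)` assertion in the Mono.zip test.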
- FluxTest.java
public class FluxTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
@DisplayName("Creating a flux: just")
void createFlux1() {
var flux1 = Flux.just("1", "2", "3")
.log();
StepVerifier.create(flux1)
.expectNext("1")
.expectNextCount(2)
.verifyComplete();
}
@Test
@DisplayName("Creating a flux: range")
void createFlux2() {
var monoFromFlux2 = Flux.range(0, 50)
.delayElements(Duration.ofMillis(100))
.publishOn(Schedulers.newSingle("single"))
.map(String::valueOf)
.doOnNext(logger::info)
.doOnComplete(() -> logger.info("Data processing is complete."))
.collectList();
StepVerifier.create(monoFromFlux2)
.expectNextCount(1)
.verifyComplete();
}
@Test
@DisplayName("Creating a flux: interval")
void createFlux3() {
var fluxBuffferList = Flux.interval(Duration.ofMillis(100))
.log()
.buffer(10);
StepVerifier.create(fluxBuffferList.log()
.take(3))
.recordWith(ArrayList::new)
.expectNextCount(1)
.expectNextCount(1)
.expectNextCount(1)
.consumeRecordedWith(v -> {
assertEquals(3, v.size());
assertEquals((int) v.stream()
.mapToLong(Collection::size)
.sum(),
30);
})
.verifyComplete();
}
@Test
@DisplayName("Thread isolation")
void createFlux4() {
var flux = Flux.just("A", "B", "C")
.log()
.publishOn(Schedulers.newSingle("SUBSCRIBER"))
.log()
.subscribeOn(Schedulers.newSingle("PUBLISHER"));
StepVerifier.create(flux)
.expectNext("A")
.expectNext("B")
.expectNext("C")
.verifyComplete();
}
@Test
@DisplayName("Useful flux utilities test")
void createFlux5() {
Flux<String> flux1 = Flux.just(1, 2, 3, 4, 4, 4, 5, 6, 7, 8, 9, 9, 10)
.delayElements(Duration.ofMillis(200))
.publishOn(Schedulers.newSingle("AAAA"))
.groupBy(v -> v)
.log()
.map(v -> "flux1 [data:" + v.key() + ", count:" + v.count() + "]");
Flux<String> flux2 = Flux.range(100, 10)
.delayElements(Duration.ofMillis(500))
.publishOn(Schedulers.newSingle("BBBB"))
.log()
.map(v -> "flux2 : " + v);
// merge
Flux<String> merged = Flux.merge(flux1, flux2);
// concatenate
Flux<String> concat = Flux.concat(flux1, flux2);
StepVerifier.create(concat)
.recordWith(ArrayList::new)
.expectNextCount(20)
.consumeRecordedWith(v -> {
for (String s : v) {
logger.info("[record] {}", s);
}
})
.verifyComplete();
}
}
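- Creating and testing a Flux
- Flux.range, Flux.interval
- Flux.buffer: collects up to n emitted items and returns them as a list
- StepVerifier.recordWith
- Why does cancel appear in the log? Because of take
- publishOn vs subscribeOn
- Flux.groupBy, MonoCount
- Flux.merge (asynchronous: subscribes to all sources eagerly and interleaves their items) vs Flux.concat (sequential: subscribes to the next source only after the previous one completes)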
- BackPressureTest.java
public class BackPressureTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
void backpressureTest() {
final var freeMemory = new CountDownLatch(100);
BoardRepository boardRepository = new BoardRepository();
Flux<Long> boards = boardRepository.findAll();
boards.publishOn(Schedulers.newSingle("SUBSCRIBER"))
.log()
.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnNext(Long value) {
freeMemory.countDown();
var freeMem = freeMemory.getCount();
if (freeMem == 1) {
logger.warn("Out of memory -> cancel the publisher");
cancel();
}
if (freeMem <= 30 && freeMem > 1) {
logger.info("Free memory : {}%, please send one item at a time", freeMem);
request(1);
} else if (freeMem % 10 == 0) {
request(10);
}
}
@Override
protected void hookOnCancel() {
logger.warn("publisher -> subscriber.onCanceled");
}
});
StepVerifier.create(boards)
.expectNextCount(99)
.thenCancel()
.verify();
}
static class BoardRepository {
Flux<Long> findAll() {
return Flux.interval(Duration.ofMillis(100));
}
}
}
- BaseSubscriber.hookOnNext
- HotPublisherTest.java
@ExtendWith(OutputCaptureExtension.class)
public class HotPublisherTest {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
@DisplayName("flux: cold test")
void coldTest(CapturedOutput output) throws InterruptedException {
final var source = Flux.fromIterable(List.of("blue", "green", "orange", "purple"))
.map(String::toUpperCase);
source.subscribe(color1 -> logger.info("subscriber1 --- {}", color1));
Thread.sleep(3000);
source.subscribe(color2 -> logger.info("subscriber2 --- {}", color2));
assertThat("Runs twice.",
extractColorsFromConsole(output),
contains("BLUE", "GREEN", "ORANGE", "PURPLE", "BLUE", "GREEN", "ORANGE", "PURPLE"));
}
@Test
@DisplayName("flux: hot test")
void hotTest(CapturedOutput output) {
Sinks.Many<Object> hot = Sinks.many()
.multicast()
.directBestEffort();
var hotFlux = hot.asFlux();
hot.tryEmitNext("BLACK");
hot.tryEmitNext("RED");
hotFlux.subscribe(d -> logger.info("Subscriber 1 --- {}", d));
hot.tryEmitNext("BLUE");
hot.tryEmitNext("GREEN");
hotFlux.subscribe(d -> logger.info("Subscriber 2 --- {}", d));
hot.tryEmitNext("ORANGE");
hot.tryEmitNext("PURPLE");
hot.tryEmitComplete();
assertThat("hot publisher test",
extractColorsFromConsole(output),
contains("BLUE", "GREEN", "ORANGE", "ORANGE", "PURPLE", "PURPLE"));
}
private List<String> extractColorsFromConsole(CapturedOutput output) {
return Arrays.stream(output.getOut()
.split("\n"))
.filter(v -> v.contains("--- "))
.map(v -> v.substring(v.lastIndexOf("--- ") + 4))
.collect(toList());
}
}
- cold vs hot
- Sinks.many().multicast().directBestEffort() vs onBackpressureBuffer() : reference
- webflux2021 spring diagram
- WebfluxConfig
- @EnableWebFlux
- WebFluxConfigurer
- AbstractErrorWebExceptionHandler: global exception handler; do not use @ExceptionHandler(Exception.class)
- MemberRouter
@Bean
public RouterFunction<ServerResponse> memberRoute() {
return route().before(this::before)
.path("/members",
b -> b.GET("/{id}", memberHandler::getMember)
.POST("", memberHandler::createMember)
)
.POST("/upload", memberHandler::upload)
.GET("/addresses", memberHandler::getAddresses)
.GET("/blocking", memberHandler::blocking)
.after(this::after)
.build();
}
RouterFunction
- route(), before, path, nest
- before, after
@Component
public class MemberHandler {
final Logger log = LoggerFactory.getLogger(this.getClass());
public Mono<ServerResponse> getMember(ServerRequest request) {
request.headers()
.asHttpHeaders()
.toSingleValueMap()
.forEach((key, value) -> log.info("{} : {}", key, value));
log.info(request.queryParam("reactive")
.orElse("no queryParam"));
return ServerResponse.ok()
.bodyValue(request.pathVariable("id"));
}
public Mono<ServerResponse> createMember(ServerRequest request) {
var memberMono = request.bodyToMono(Member.class)
.map(member -> {
member.setPhone("031-0101-0101");
return member;
});
return ServerResponse.ok()
.body(memberMono, Member.class);
}
public Mono<ServerResponse> upload(ServerRequest request) {
Mono<MultiValueMap<String, Part>> body = request.body(BodyExtractors.toMultipartData());
return body.flatMap(parts -> {
Part file = parts.getFirst("upload.log");
log.info("file name : {}", Objects.requireNonNull(file).name());
var flux = file.content()
.flatMap(buf -> {
String received = buf.toString(Charset.defaultCharset());
return Flux.fromStream(Arrays.stream(received.split("\n")));
})
.buffer(100)
.delayElements(Duration.ofMillis(500))
.log()
.flatMapSequential(lists -> Mono.just(lists.stream()
.map(Integer::parseInt)
.reduce(Integer::sum)
.orElse(0))
);
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(flux, Integer.class);
});
}
public Mono<ServerResponse> getAddresses(ServerRequest request) {
String keyword = request.queryParam("keyword")
.orElseThrow();
String pageNumber = request.queryParam("pageNumber")
.orElseThrow();
String pageSize = request.queryParam("pageSize")
.orElseThrow();
String clientId = request.headers()
.firstHeader("clientId");
String platform = request.headers()
.firstHeader("platform");
return WebClient.create("https://alpha-shop-api.e-ncp.com/")
.get()
.uri("/addresses/search?pageNumber={pageNumber}&pageSize={pageSize}&keyword={keyword}", pageNumber, pageSize, keyword)
.header("clientId", clientId)
.header("platform", platform)
.accept(MediaType.APPLICATION_JSON)
.retrieve() // vs exchange
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new ServerWebInputException("input error")))
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServerWebInputException("input error")))
.bodyToMono(String.class)
.flatMap(body -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(body));
}
/**
* @see <a href="https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking">wrap-blocking</a>
*/
public Mono<ServerResponse> blocking(ServerRequest request) {
return Mono.fromCallable(this::findOne)
.subscribeOn(Schedulers.boundedElastic())
.as(body -> ServerResponse.ok()
.body(body, String.class));
}
}
HandlerFunction
- MemberHandler : ServerRequest, ServerResponse, request.bodyToMono, BodyExtractors(multipart), WebClient(no AsyncRestTemplate)
- Run the requests in /http/webflux.http in order
- blocking call
- MemberRouterTest
- @WebFluxTest
- WebTestClient (no MockMvc)
- MemberHistoryReactiveRepositoryTest
- @EnableReactiveMongoRepositories
- @DataMongoTest
- MemberReactiveRepositoryTest
- @EnableR2dbcRepositories
- @DataR2dbcTest
- ReactiveCrudRepository
- R2dbcEntityTemplate
- TransactionalOperator (programmatic, not declarative, transactions): commit, rollback
- MemberCacheTest
- ReactiveRedisTemplate
- No @RedisHash, no RedisRepository
- WebSocketConfig
@Configuration
public class WebSocketConfig {
Logger log = LoggerFactory.getLogger(this.getClass());
final ReactiveMongoTemplate reactiveMongoTemplate;
final Sinks.Many<String> multicast;
public WebSocketConfig(ReactiveMongoTemplate reactiveMongoTemplate, Sinks.Many<String> multicast) {
this.reactiveMongoTemplate = reactiveMongoTemplate;
this.multicast = multicast;
}
@Bean
public HandlerMapping handlerMapping() {
var urlMap = Map.of("/chat", chatHandler()
, "/other", chatHandler());
var order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(urlMap, order);
}
WebSocketHandler chatHandler() {
return session -> {
final var ip = Objects.requireNonNull(session.getHandshakeInfo()
.getRemoteAddress())
.getAddress()
.getHostAddress();
var input = session.receive()
.map(message -> new Chat(ip, message.getPayloadAsText()))
.doOnNext(chat -> log.debug("message : {}, IP : {}", chat.message, chat.ip))
.doOnNext(chat -> multicast.tryEmitNext(chat.toString()))
.bufferTimeout(10, Duration.ofSeconds(5))
.doOnNext(chats -> reactiveMongoTemplate.insert(chats, "chats")
.subscribe())
.then();
var output = session.send(multicast.asFlux()
.map(session::textMessage));
return Mono.zip(input, output)
.then();
};
}
@Document
static record Chat(String ip, String message) {
}
}
- Open /websocket.html
- Register multicast() as a @Bean (hot publisher)
- Implement both input and output
- bufferTimeout: collects items until n have been emitted or the timeout elapses, then processes them as a batch
- ReactiveMongoTemplate, @Document
- reference
- What if there are N servers? Redis pub/sub
- KafkaConfig
- ReactiveKafkaProducerTemplate
- key in -> map -> kafka produce -> @Bean multicast()
event(data) stream
Conclusion
- Is WebFlux fast? No!
- The slower the external I/O responses and the more calls made concurrently, the greater the benefit of non-blocking.
- functional programming: JDK 8 lambdas, functional interfaces, higher-order functions, non-iterable, method chaining, declarative vs imperative
- (Personal opinion) Go with Kotlin... look through the code (coRouter, coroutine, transaction)
- When using JDBC-based access such as JPA (especially for insert, update, delete), always check which thread the call runs on.
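One way to check which thread a blocking call runs on, sketched with JDK classes only. It mirrors the idea behind the Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()) handler shown earlier, but `BlockingOffloadSketch`, `blockingFindOne`, and the `blocking-jdbc` thread name are hypothetical stand-ins, not code from the repository.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingOffloadSketch {

    // Stand-in for a blocking JDBC/JPA call; it reports the thread it ran on.
    static String blockingFindOne() {
        try {
            Thread.sleep(100); // simulated blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row loaded on " + Thread.currentThread().getName();
    }

    static String demo() {
        // Dedicated pool for blocking work, kept separate from the event-loop threads.
        ExecutorService blockingPool = Executors.newFixedThreadPool(4, runnable -> {
            Thread t = new Thread(runnable, "blocking-jdbc");
            t.setDaemon(true);
            return t;
        });
        try {
            // join() is used only so the demo can return the value; a non-blocking
            // caller would return the CompletableFuture instead of waiting on it.
            return CompletableFuture
                    .supplyAsync(BlockingOffloadSketch::blockingFindOne, blockingPool)
                    .join();
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Logging the thread name inside the blocking method, as done here, is a cheap way to confirm that the call was moved off the caller's thread.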
Topics worth studying further
- Documenting WebFlux functional endpoints
- Reactive Kafka consumer (reference)
- Implementing Redis pub/sub (reference)
- Ways to collect a flux and process it without duplicates: Flux.bufferTimeout(), Flux.groupBy(), Flux.create(sink)