1. Reactor 기본 개념
Reactor는 Mono와 Flux라는 두 가지 핵심 타입을 제공합니다. Mono는 0 또는 1개의 요소를, Flux는 0개 이상의 요소를 나타냅니다.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorBasics {
public Mono getSingleValue() {
return Mono.just("Hello, Reactive World!");
}
public Flux getMultipleValues() {
return Flux.just(1, 2, 3, 4, 5);
}
public Flux getInfiniteStream() {
return Flux.range(1, Integer.MAX_VALUE);
}
}
팁: Mono와 Flux는 지연 평가(lazy evaluation)를 사용합니다. 즉, subscribe 메서드를 호출하기 전까지는 실제로 실행되지 않습니다.
2. 연산자 활용
Reactor는 데이터 스트림을 변환, 필터링, 결합하는 다양한 연산자를 제공합니다.
public class ReactorOperators {
public Flux transformStream() {
return Flux.range(1, 10)
.map(i -> i * 2)
.filter(i -> i > 5)
.distinct();
}
public Flux combineStreams() {
Flux flux1 = Flux.just("A", "B", "C");
Flux flux2 = Flux.just("X", "Y", "Z");
return flux1.zipWith(flux2, (f1, f2) -> f1 + f2);
}
public Mono handleErrors() {
return Mono.just("10")
.map(Integer::parseInt)
.onErrorReturn(0);
}
}
팁: 연산자를 조합하여 복잡한 데이터 처리 파이프라인을 구축할 수 있습니다. 하지만 과도한 체이닝은 가독성을 해칠 수 있으므로 적절히 분리하세요.
3. 배압(Backpressure) 처리
배압은 데이터 소비자가 처리할 수 있는 속도보다 빠르게 데이터가 생성될 때 발생합니다. Reactor는 이를 효과적으로 처리할 수 있는 메커니즘을 제공합니다.
public class BackpressureExample {
public Flux handleBackpressure() {
return Flux.range(1, 1000000)
.onBackpressureBuffer(10000) // 버퍼 크기 제한
.onBackpressureDrop(i -> System.out.println("Dropped: " + i))
.limitRate(100); // 요청 개수 제한
}
public void consumeWithBackpressure() {
handleBackpressure()
.subscribe(new BaseSubscriber() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Received: " + value);
request(1);
}
});
}
}
팁: 배압 처리 전략(버퍼링, 드롭, 최신 값 유지 등)을 상황에 맞게 선택하세요. 메모리 사용량과 처리 지연 사이의 균형을 고려해야 합니다.
4. 스케줄러 활용
Reactor의 스케줄러를 사용하면 비동기 실행과 쓰레드 관리를 효과적으로 할 수 있습니다.
import reactor.core.scheduler.Schedulers;
public class SchedulerExample {
public Flux performComputation() {
return Flux.range(1, 10)
.map(i -> heavyComputation(i))
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.single());
}
private String heavyComputation(int i) {
// 시간이 오래 걸리는 계산 시뮬레이션
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result: " + i;
}
}
팁: subscribeOn은 전체 체인의 실행 컨텍스트를 설정하고, publishOn은 이후 연산자들의 실행 컨텍스트를 변경합니다. 적절히 조합하여 성능을 최적화하세요.
5. 반응형 코드 테스팅
반응형 코드의 테스트는 전통적인 동기식 코드와는 다른 접근이 필요합니다.
import reactor.test.StepVerifier;
public class ReactorTest {
@Test
public void testFlux() {
Flux flux = Flux.just(1, 2, 3, 4, 5);
StepVerifier.create(flux)
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
.verify();
}
@Test
public void testMono() {
Mono mono = Mono.just("Hello");
StepVerifier.create(mono)
.expectNext("Hello")
.expectComplete()
.verify();
}
}
팁: StepVerifier를 사용하면 비동기 스트림의 동작을 단계별로 검증할 수 있습니다. 에러 케이스와 타임아웃 상황도 테스트하는 것을 잊지 마세요.
6. 반응형 프로그래밍 모범 사례
- 전체 애플리케이션 스택을 반응형으로 구성하세요. 일부만 반응형으로 만들면 이점이 제한될 수 있습니다.
- 블로킹 작업을 별도의 스케줄러에서 실행하여 메인 이벤트 루프가 차단되지 않도록 하세요.
- 에러 처리를 신중히 설계하세요. 스트림의 어느 지점에서 에러를 처리할지 결정하는 것이 중요합니다.
- Hot Publisher와 Cold Publisher의 차이를 이해하고 적절히 사용하세요.
- 복잡한 비즈니스 로직은 별도의 서비스 레이어로 분리하여 반응형 코드의 가독성을 유지하세요.
결론
Java에서의 반응형 프로그래밍은 높은 처리량과 효율적인 리소스 사용을 가능하게 합니다. Reactor를 활용하면 비동기 데이터 스트림을 우아하게 처리할 수 있으며, 특히 마이크로서비스 아키텍처나 실시간 데이터 처리 시스템에서 큰 이점을 제공합니다. 하지만 학습 곡선이 가파르고 디버깅이 복잡할 수 있으므로, 팀의 역량과 프로젝트의 요구사항을 고려하여 도입을 결정해야 원활하게 개발이 진행될 수 있습니다.