Page tree
Skip to end of metadata
Go to start of metadata

4장 리액터 프로젝트 - 리액티브 앱의 기초

리액터 프로젝트 버전 1.x

  • 장점

  • 당시 이벤트를 빠른 속도로 처리하기에 충분했다.

  • 스프링 프레임워크와의 완벽한 통합, 네티와의 결합을 통해 비동기 및 논블로킹 메시지 처리를 제공
  • 단점

  • 배압 조절 기능의 부재
  • 오류처리의 복잡함

리액터 프로젝트 버전 2.x

  • 장점
  • 이벤트버스 및 스트림 기능을 별도의 모듈로 추출
  • 리액티브 스트림 스펙 준수 리액티브 시퀀스를 블로킹 구조로 전환하기
  • 자바 컬렉션 API와 쉽게 통합할 수 있게 됨
  • RxJava API와 유사해짐
  • 배압관리, 스레드 처리, 복원력 지연등 기능 추가

리액터 프로젝트 버전 3.x

  • 자바8 기준 적용
  • 스프링 프레임워크 5의 리액티브적인 변형을 담당

    배압

    • 푸시 전용 : 구독자가 효과적으로 무한한 숫자를 요청하는 경우
    • 풀 전용 : 구독자가 이전 요소를 받은 후에만 다음 요소를 요청하는 경우
    • 풀+푸시 : 구독자가 수요를 실시간 제어할 수 있고 게시자가 데이터 소비 속도에 적응할 수 있는 경우

리액티브 타입 - Flux와 Mono

  • Flux
  • 0,1 또는 여러 요소를 생성할 수 있는 일반적인 리액티브 스트림을 정의
  • 무한한 양의 요소를 만들 수 있다
  • Mono
  • 최대 하나의 요소를 생성할 수 있는 스트림을 정의
  • CompletableFuture와 의미적으로 동일
    • Mono가 리액티브 연산자를 더 많이 제공
  • 작업 성공시 Mono 유형 반환
  • 처리 완료시 onComplete(), 실패시 onError()
  • Flux와 Mono는 서로를 쉽게 변환할 수 있다.

리액티브 스트림의 기능들

  • 시퀀스 생성

     Flux<String >stream1 = Flux.just("Hello", "world");  
    Flux<Interger> stream2 = FLux.fromArray(new Integer[] {1, 2, 3});  
    FLux<Integer> stream3 = Flux.fromIterable(Arrays.asList(9, 8, 7));  
    
     
  • 리액티브 스트림 구독

  • 프로듀서가 제한된 수요를 처리하는 데 적합하다면 구독 객체로 수요를 제어하거나 요청 제한 연산자를 적용하는 것이 좋다.
     Flux.range(1, 100)  
        .subscribe(  
            data -> log.info("onNext: {}", data),  
            err -> { /* ignore */ },  
            () -> log.info("onComplete"),  
            subscription -> {  
                subscription.request(4);  
                subscription.cancel();  
            }  
        );  
    
     
     onNext :1  
    onNext :2  
    onNext :3  
    onNext :4  
    
     
  • 연산자를 이용한 리액티브 시퀀스 변환

  • 리액티브 시퀀스의 원소 매핑
     Flux.range(2018, 5)  
        .timestamp()  
        .index() //열거형으로 전환  
        .subscribe(e -> log.info("index: {}, ts: {}. value: {}",  
            e.getT1(),  
            Instant.ofEpochMilli(e.getT2().getT1()),  
            e.getT2().getT2()));  
    
     
     index: 0, ts: 2021-09-11T01:43:14.561Z. value: 2018  
    index: 1, ts: 2021-09-11T01:43:14.581Z. value: 2019  
    index: 2, ts: 2021-09-11T01:43:14.581Z. value: 2020  
    index: 3, ts: 2021-09-11T01:43:14.581Z. value: 2021  
    index: 4, ts: 2021-09-11T01:43:14.582Z. value: 2022  
    
     
  • 리액티브 시퀀스 필터링
     Mono<?> startCommand = ...  
    Mono<?> stopCommand = ...  
    Flux<UserEvent> streamOfData = ...  
    
     streamOfData  
        .skipUntilOther(startCommand)  
        .takeUntilOther(stopCommand)  
        .subscribe(System.out::println);  
    
     
  • 리액티브 시퀀스 수집하기

     Flux.just(1, 6, 2, 8, 3, 1, 5, 1)  
        .collectSortedList(Comparator.reverseOrder())  
        .subscribe(System.out::println);  
    
     
     [8, 6, 5, 3, 2, 1, 1, 1]  
    
     
  • 스트림의 원소 줄이기

     Flux.just(3, 5, 7, 9, 11 ,15, 16, 17)  
        .any(e -> e % 2 == 0)  
        .subscribe(hasEvens -> log.info("Has evens: {}", hasEvens);  
    
     
     true  
    
     
     Flux.just(1, 2, 3)  
        .thenMany(FLux.just(4, 5))  
        .subscribe(e -> log.info("onNext: {}", e);  
    
     
     onNext: 4  
    onNext: 5  
    
     
  • 리액티브 스트림 조합하기

     Flux.concat(  
        Flux.range(1, 3),  
        Flux.range(4, 3),  
        Flux.range(6, 5)  
    ).subscribe(e -> log.info("onNext: {}", e));  
    
     
     [1,2,3] + [4, 5] + [6,7,8,9,10]  
    onNext: 1  
    onNext: 2  
    onNext: 3  
    onNext: 4  
    onNext: 5  
    onNext: 6  
    onNext: 6  
    onNext: 7  
    onNext: 8  
    onNext: 9  
    onNext: 10  
    
     
  • 스트림 내의 원소 일괄 처리하기

     Flux.range(1, 13)  
        .buffer(4)  
        .subscribe(e -> log.info("onNext: {}", e);  
    
     
     onNext: [1, 2, 3, 4]  
    onNext: [5, 6, 7, 8]  
    onNext: [9, 10, 11, 12]  
    onNext: [13]  
    
     
  • flatMap

     public Flux<String> requestBooks(String user) {  
    	return Flux.range(1, random.nextInt(3) + 1)  
    		.map(i -> "book-" + i)  
    		.delayElements((Duration.ofMillis(3)));  
    }  
    
     
     Flux.just("user-1", "user-2", "user-3")  
    	.flatMap(u -> requestBooks(u)  
    		.map(b -> u + "/" + b))  
    	.subscribe(r -> log.info("onNext: {}", r));  
    
     
     onNext: user-3/book-1  
    onNext: user-1/book-1  
    onNext: user-1/book-2  
    
     
  • 리액티브 시퀀스를 블로킹 구조로 전환하기

    • toIterable : Reactive Flux -> Blocking Iterable
    • toStream : Reactive Flux -> Blocking Stream API
    • reactor 3.2 : toIterable을 사용
    • blockFirst : upstream이 첫번쨰 값을 보내거나 완료될 때까지 현재 스레드를 차단
    • blockLast : upstream이 마지막 값을 보내거나 완료될 떄까지 현재 스레드를 차단
    • onError는 차단된 스레드에 예외가 발생
  • 시퀀스를 처리하는 동안 처리 내역 살펴보기

    • doOnNext(Consumer ) 는 Flux나 Mono의 각 원소에 대해 어떤 액션을 수행할 수 있게 해준다.
    • doOnComplete(), doOnError(Throwable)는 대응 이벤트 발생시에 호출
    • doOnSubscribe(Consumer), doOnRequest(LongConsumer), doOnCancel(Runnable)을 사용하면 구독 라이플 사이클 이벤트에 대응할 수 있다.
    • doOnTerminate(Runnable)는 스트림 종료 시에 종료의 원인과 관계없이 기동
  • 모든 신호를 처리하는 doOnEach(Consumer)

     Flux.just(1, 2, 3)  
      .concatWith(Flux.error(new RuntimeException("Conn error")))  
    .doOnEach(s -> log.info("signal: {}", s))  
    .subscribe();  
    
     
     signal: doOnEach_onNext(1)  
    signal: doOnEach_onNext(2)  
    signal: doOnEach_onNext(3)  
    signal: onError(java.lang.RuntimeException: Conn error)  
    
     
  • 데이터를 시그널로 변환하기

     Flux.range(1, 3)  
    .doOnNext(e -> log.info("data : {}", e))  
    .materialize()  
    .doOnNext(e -> log.info("signal: {}", e))  
    .dematerialize()  
    .collectList()  
    .subscribe(r -> log.info("result: {}", r));  
    
     
     data : 1  
    signal: onNext(1)  
    data : 2  
    signal: onNext(2)  
    data : 3  
    signal: onNext(3)  
    signal: onComplete()  
    result: [1, 2, 3]  
    
     

리액터를 이용해 스트림 만들기

  • 팩토리 메서드 - push / create
  • push : 단일 스레드 생성자를 적용해 Flux 인스턴스를 생성
    • 배압과 cancel에 대한 걱정없이 비동기, 단일 스레드, 다중 값을 가지는 API를 적용하는데 유용
    • 구독자가 부하를 처리할 수 없는 경우 배압과 취소는 모두 큐를 이용해 처리됨
     Flux.push(emitter -> IntStream  
      .range(2000, 3000)  
      .forEach(emitter::next))  
      .delayElements(Duration.ofMillis(1))  
      .subscribe(e -> log.info("onNext: {}", e));  
    
     
     onNext: 2000  
    onNext: 2001  
    onNext: 2002  
    onNext: 2003  
    onNext: 2004  
    onNext: 2005  
    onNext: 2006  
    onNext: 2007  
    onNext: 2008  
    
     
  • create : FluxSink 인스턴스를 추가로 직렬화하므로 다른 스레드에서 이벤트를 보낼 수 있게 된다.
     Flux.create(emitter -> {  
        emitter.onDispose(() -> log.info("Disposed"));  
        // push events to emitter  
      })  
      .subscribe(e -> log.info("onNext: {}", e));  
    
     
  • 팩토리 메서드 - generate

  • generate : 메서드를 호출하는 오브젝트의 내부 전달 상태를 기반으로 복잡한 시퀀스를 만들 수 있도록 설계됨
     Flux.generate(  
        () -> Tuples.of(0L, 1L),  
        (state, sink) -> {  
          log.info("generated value: {}", state.getT2());  
          sink.next(state.getT2());  
          long newValue = state.getT1() + state.getT2();  
          return Tuples.of(state.getT2(), newValue);  
        })  
      .delayElements(Duration.ofMillis(1))  
      .take(7)  
      .subscribe(e -> log.info("onNext: {}", e));  
    
     
     generated value: 1  
    generated value: 1  
    generated value: 2  
    generated value: 3  
    generated value: 5  
    generated value: 8  
    generated value: 13  
    generated value: 21  
    generated value: 34  
    generated value: 55  
    generated value: 89  
    ...  
    
     
  • 일회성 리소스에 의존하는 스트림 생성 - using
  • using : 리액티브 프로그래밍에서 사용하는 try-with-resources 방식의 접근법
     Flux<String> ioRequestResults = Flux.using(  
        Connection::newConnection,  
        connection -> Flux.fromIterable(connection.getData()),  
        Connection::close  
    );  
    
    ioRequestResults.subscribe(  
        data -> log.info("Received data: {}", data),  
        e -> log.info("Error: {}", e.getMessage()),  
        () -> log.info("Stream finished"));  
    )  
    
     
  • usingWhen 팩토리를 사용한 리액티브 트랜잭션 래핑

  • Publisher의 인스턴스에 가입해 관리되는 리소스를 리액티브 타입으로 검색
  • 메인 스트림의 성공 및 실패에 대해 각각 다른 핸들러를 사용할 수 있음
    • usingWhen 연산자만으로 완전한 논블로킹 리액티브 트랜잭션을 구현할 수 있게 됨
     Flux.usingWhen(  
        Transaction.beginTransaction(),  
        transaction -> transaction.insertRows(Flux.just("A", "B", "C")),  
        Transaction::commit,  
        Transaction::rollback  
      ).subscribe(  
        d -> log.info("onNext: {}", d),  
        e -> log.info("onError: {}", e.getMessage()),  
        () -> log.info("onComplete")  
      );  
    
     
  • 에러 처리하기

  • 최종 구독자가 onError 시그널에 대한 핸들러를 정의하지 않으면 UnsupportedOperationException을 발생시킨다.
  • 리액티브 스트림은 onError가 스트림이 종료됐다고 정의하기 있기 때문에 시그널을 받으면 시퀀스가 실행을 중지한다.
     //신뢰도가 낮은 문제있는 추천서비스  
    public Flux<String> recommendedBooks(String userId) {  
    	return Flux.defer(() -> {  
    		if (random.nextInt(10) < 7) {  
    			return Flux.<String>error(new RuntimeException("Err"))  
    				.delaySequence(Duration.ofMillis(100));  
    		} else {  
    			return Flux.just("Blue Mars", "The Expanse")  
    				.delayElements(Duration.ofMillis(50));  
    		}  
    	}).doOnSubscribe(s -> log.info("Request for {}", userId));  
    }  
    
     
     Flux.just("user-1")  
    		.flatMap(user ->  
    			recommendedBooks(user)  
    				.retryWhen(Retry.backoff(5, Duration.ofMillis(100)))  
    				.timeout(Duration.ofSeconds(3))  
    				.onErrorResume(e -> Flux.just("The Martian"))  
    		).doOnSubscribe(  
    			b -> log.info("onNext: {}", b),  
    			e -> log.warn("onError: {}", e.getMessage()),  
    			() -> log.info("onComplete")  
    		);  
    
     

배압 다루기

  • onBackPressureBuffer : 제한되지 않은 요구를 요청하고 결과를 다운스트림으로 푸시
  • onBackPressureDrop : 제한되지 않은 요구를 요청하고 데이터를 하위로 푸시
  • onBackPressureLast : onBackPressureDrop과 유사. 가장 최근에 수신된 원소를 기억하고, 요청이 발생하면 다운스트림으로 푸시
  • onBackPressureError : 데이터를 다운스트림으로 푸시하는 동안 크기를 제한하지 않고 요청

Hot 스트림 / Cold 스트림

  • Cold 퍼블리셔는 구독자가 나타날 때마다 해당 구독자에 대해 모든 시퀀스 데이터가 생성되는 방식으로 동작한다.
  • 구독자 없이는 데이터가 생성되지 않는다.
     Flux<String> coldPublisher = Flux.defer(() -> {  
    	log.info("Generating new items");  
    	return Flux.just(UUID.randomUUID().toString());  
    });  
    
    log.info("No Data was generated so far");  
    coldPublisher.subscribe(e -> log.info("onNext: {}", e));  
    coldPublisher.subscribe(e -> log.info("onNext: {}", e));  
    log.info("Data was generated twice for two subscribers");  
    
     
     No Data was generated so far  
    Generating new items  
    onNext: 017d3204-67e1-41cb-87f9-359a70e425c6  
    Generating new items  
    onNext: 29681cff-6c83-487c-aced-855dfb4d7ed4  
    Data was generated twice for two subscribers  
    
     
  • Hot 퍼블리셔의 데이터 생성은 구독자의 존재 여부에 의존하지 않는다. 따라서 핫 퍼블리셔는 첫번째 구독자가 구독을 시작하기 전에 원소를 만들어내기 시작할 수 있다.

  • 스트림 원소를 여러 곳으로 보내기

    • 콜드 퍼블리셔를 리액티브변환을 통해 핫 퍼블리셔로 전환할 수 있다.
    • ConnectableFlux를 이용하면 가장 수요가 많은 데이터를 생성하고, 다른 모든 가입자가 데이터를 처리할 수 있도록 캐싱된다.
     Flux<Integer> source = Flux.range(0, 3)  
    	.doOnSubscribe(s -> log.info("new subscription for the cold publisher"));  
    
    ConnectableFlux<Integer> conn = source.publish();  
    
    conn.subscribe(e -> log.info("subscriber1 on next: {}", e));  
    conn.subscribe(e -> log.info("subscriber2 on next: {}", e));  
    
    log.info("all subscribers are ready, connection");  
    conn.connect();  
    
     
     all subscribers are ready, connection  
    new subscription for the cold publisher  
    subscriber1 on next: 0  
    subscriber2 on next: 0  
    subscriber1 on next: 1  
    subscriber2 on next: 1  
    subscriber1 on next: 2  
    subscriber2 on next: 2  
    
     
  • 스트림 내용 캐싱하기

    • ConnectableFlux를 이용하면 다양한 캐싱 전략을 쉽게 구성할 수 있다.
    • ConnectableFlux를 이용하면 가장 수요가 많은 데이터를 생성하고, 다른 모든 가입자가 데이터를 처리할 수 있도록 캐싱된다.
     Flux<Integer> source = Flux.range(0, 3)  
    		.doOnSubscribe(s -> log.info("new subscription for the cold publisher"));  
    
    Flux<Integer> cachedSource = source.cache(Duration.ofSeconds(1));  
    
    cachedSource.subscribe(e -> log.info("subscriber1 on next: {}", e));  
    cachedSource.subscribe(e -> log.info("subscriber2 on next: {}", e));  
    
    Thread.sleep(1200);  
    
    cachedSource.subscribe(e -> log.info("subscriber3 on next: {}", e));  
    
     
     new subscription for the cold publisher  
    subscriber1 on next: 0  
    subscriber1 on next: 1  
    subscriber1 on next: 2  
    subscriber2 on next: 0  
    subscriber2 on next: 1  
    subscriber2 on next: 2  
    new subscription for the cold publisher  
    subscriber3 on next: 0  
    subscriber3 on next: 1  
    subscriber3 on next: 2  
    
     
  • 스트림 내용 공유

    • ConnectableFlux를 사용해 여러개의 구독자에 대한 이벤트를 멀티캐스트 할 수 있다.
    • share연산자로 콜드 퍼블리셔를 핫 퍼블리셔로 변환할 수 있으며, 구독자가 각 신규 구독자에게 이벤트를 전파하는 방식으로 작동된다.
     Flux<Integer> source = Flux.range(0, 5)  
    	.delayElements(Duration.ofMillis(100))  
    	.doOnSubscribe(s -> log.info("new subscription for the cold publisher"));  
    
    Flux<Integer> cachedSource = source.share();  
    
    cachedSource.subscribe(e-> log.info("subscription 1 on next: {}", e));  
    Thread.sleep(400);  
    cachedSource.subscribe(e-> log.info("subscription 2 on next: {}", e));  
    
     
     new subscription for the cold publisher  
    subscription 1 on next: 0  
    subscription 1 on next: 1  
    subscription 1 on next: 2  
    subscription 1 on next: 3  
    subscription 2 on next: 3  
    
     

시간 다루기

  • elapsed로 이전 이벤트와의 시간 간격을 측정
     Flux.range(0, 5)  
      .delayElements(Duration.ofMillis(100))  
    .elapsed()  
    .subscribe(e -> log.info("elapsed {} ms : {}", e.getT1(), e.getT2()));  
    
     

리액티브 스트림을 조합하고 변환하기

  • transform : 스트림 구조 자체를 변경할 수 있다.
     Function<Flux<String>, Flux<String>> logUserInfo = stream -> stream.index()  
    .doOnNext(tp -> log.info("[{}] User: {}", tp.getT1(), tp.getT2()))  
    .map(Tuple2::toString);  
    
    Flux.range(1000, 3)  
    .map(i -> "user-" + i)  
    .transform(logUserInfo)  
    .subscribe(e -> log.info("onNext: {}", e));  
    
     
     [0] User: user-1000  
    onNext: [0,user-1000]  
    [1] User: user-1001  
    onNext: [1,user-1001]  
    [2] User: user-1002  
    onNext: [2,user-1002]  
    
     
  • transformDeferred: transform과 유사. 구독자가 도착할 때마다 동일한 스트림 변환 작업을 수행한다.(composer deprecated)
     Function<Flux<String>, Flux<String>> logUserInfo = (stream) -> {  
      if (random.nextBoolean()) {  
        return stream.doOnNext(e -> log.info("[path A] User: {}", e));  
      } else {  
        return stream.doOnNext(e -> log.info("[path B] User: {}", e));  
      }  
    };  
    
    Flux<String> publisher = Flux.just("1", "2")  
        .transformDeferred(logUserInfo);  
    
    publisher.subscribe();  
    publisher.subscribe();  
    
     
     [path A] User: 1  
    [path A] User: 2  
    [path B] User: 1  
    [path B] User: 2  
    
     

Processor

  • Publisher이면서 동시에 Subscriber
  • 구독이 가능하며, 시그널(onNext, onError, onComplete)를 수동으로 보낼 수 있다.
  • 하지만 Processor 사용을 권장하지 않으며, 패곹리 메서드(push, create, generate) 사용을 권장
    • direct : Process 입력부를 직접 구현해 데이터를 푸시
    • Synchronous
    • Asynchronous

리액터 프로젝트 테스트 및 디버깅

  • 심화학습장에서 다룸
 // stacktrace 수집  
Hooks.onOperatorDebug();  

 


  • No labels