Page tree

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Markdown
id-

# 4장 리액터 프로젝트 - 리액티브 앱의 기초  
  
## 리액터 프로젝트 버전 1.x  
  
- 장점  
  
  - 당시 이벤트를 빠른 속도로 처리하기에 충분했다.  
  - 스프링 프레임워크와의 완벽한 통합, 네티와의 결합을 통해 비동기 및 논블로킹 메시지 처리를 제공  
  
- 단점  
  - 배압 조절 기능의 부재  
  - 오류처리의 복잡함  
  
## 리액터 프로젝트 버전 2.x  
  
- 장점  
  - 이벤트버스 및 스트림 기능을 별도의 모듈로 추출  
  - 리액티브 스트림 스펙 준수    리액티브 시퀀스를 블로킹 구조로 전환하기
  - 자바 컬렉션 API와 쉽게 통합할 수 있게 됨    
  - RxJava API와 유사해짐  
  - 배압관리, 스레드 처리, 복원력 지연등 기능 추가  
  -   
## 리액터 프로젝트 버전 3.x  
  
- 자바8 기준 적용  
- 스프링 프레임워크 5의 리액티브적인 변형을 담당  
  
    ### 배압  
    - 푸시 전용 : 구독자가 효과적으로 무한한 숫자를 요청하는 경우  
    - 풀 전용 : 구독자가 이전 요소를 받은 후에만 다음 요소를 요청하는 경우  
    - 풀+푸시 : 구독자가 수요를 실시간 제어할 수 있고 게시자가 데이터 소비 속도에 적응할 수 있는 경우  
  
## 리액티브 타입 - Flux와 Mono  
  
- Flux  
  - 0,1 또는 여러 요소를 생성할 수 있는 일반적인 리액티브 스트림을 정의  
  - 무한한 양의 요소를 만들 수 있다  
- Mono  
  - 최대 하나의 요소를 생성할 수 있는 스트림을 정의  
  - CompletableFuture와 의미적으로 동일  
    - Mono가 리액티브 연산자를 더 많이 제공  
  - 작업 성공시 Mono<Void> 유형 반환  
  - 처리 완료시 onComplete(), 실패시 onError()  
- Flux와 Mono는 서로를 쉽게 변환할 수 있다.  
      
## 리액티브 스트림의 기능들  
  
- 시퀀스 생성  
  
    ``````java  
    Flux<String >stream1 = Flux.just("Hello", "world");  
    Flux<Interger> stream2 = FLux.fromArray(new Integer[] {1, 2, 3});  
    FLux<Integer> stream3 = Flux.fromIterable(Arrays.asList(9, 8, 7));  
    ```  
  
- 리액티브 스트림 구독  
  - 프로듀서가 제한된 수요를 처리하는 데 적합하다면 구독 객체로 수요를 제어하거나 요청 제한 연산자를 적용하는 것이 좋다.   
    ``````java  
    Flux.range(1, 100)  
        .subscribe(  
            data -> log.info("onNext: {}", data),  
            err -> { /* ignore */ },  
            () -> log.info("onComplete"),  
            subscription -> {  
                subscription.request(4);  
                subscription.cancel();  
            }  
        );  
    ```  
      
    ``````java  
    onNext :1  
    onNext :2  
    onNext :3  
    onNext :4  
    ```  
  
- 연산자를 이용한 리액티브 시퀀스 변환  
  - 리액티브 시퀀스의 원소 매핑  
    ``````java  
    Flux.range(2018, 5)  
        .timestamp()  
        .index() //열거형으로 전환  
        .subscribe(e -> log.info("index: {}, ts: {}. value: {}",  
            e.getT1(),  
            Instant.ofEpochMilli(e.getT2().getT1()),  
            e.getT2().getT2()));  
    ```  
      
    ``````java  
    index: 0, ts: 2021-09-11T01:43:14.561Z. value: 2018  
    index: 1, ts: 2021-09-11T01:43:14.581Z. value: 2019  
    index: 2, ts: 2021-09-11T01:43:14.581Z. value: 2020  
    index: 3, ts: 2021-09-11T01:43:14.581Z. value: 2021  
    index: 4, ts: 2021-09-11T01:43:14.582Z. value: 2022  
    ```  
  - 리액티브 시퀀스 필터링  
    ``````java  
    Mono<?> startCommand = ...  
    Mono<?> stopCommand = ...  
    Flux<UserEvent> streamOfData = ...  
    
     streamOfData  
        .skipUntilOther(startCommand)  
        .takeUntilOther(stopCommand)  
        .subscribe(System.out::println);  
    ```  
  
  - 리액티브 시퀀스 수집하기  
    ``````java  
    Flux.just(1, 6, 2, 8, 3, 1, 5, 1)  
        .collectSortedList(Comparator.reverseOrder())  
        .subscribe(System.out::println);  
    ```  
    ``````java  
    [8, 6, 5, 3, 2, 1, 1, 1]  
    ```  
      
  - 스트림의 원소 줄이기  
    ``````java  
    Flux.just(3, 5, 7, 9, 11 ,15, 16, 17)  
        .any(e -> e % 2 == 0)  
        .subscribe(hasEvens -> log.info("Has evens: {}", hasEvens);  
    ```  
    ``````java  
    true  
    ```  
    ``````java  
    Flux.just(1, 2, 3)  
        .thenMany(FLux.just(4, 5))  
        .subscribe(e -> log.info("onNext: {}", e);  
    ```  
    ``````java  
    onNext: 4  
    onNext: 5  
    ```  
  
  - 리액티브 스트림 조합하기  
    ``````java  
    Flux.concat(  
        Flux.range(1, 3),  
        Flux.range(4, 3),  
        Flux.range(6, 5)  
    ).subscribe(e -> log.info("onNext: {}", e));  
    ```  
    ``````java  
    [1,2,3] + [4, 5] + [6,7,8,9,10]  
    onNext: 1  
    onNext: 2  
    onNext: 3  
    onNext: 4  
    onNext: 5  
    onNext: 6  
    onNext: 6  
    onNext: 7  
    onNext: 8  
    onNext: 9  
    onNext: 10  
    ```  
  
  - 스트림 내의 원소 일괄 처리하기  
    ``````java  
    Flux.range(1, 13)  
        .buffer(4)  
        .subscribe(e -> log.info("onNext: {}", e);  
    ```  
    ``````java  
    onNext: [1, 2, 3, 4]  
    onNext: [5, 6, 7, 8]  
    onNext: [9, 10, 11, 12]  
    onNext: [13]  
    ```  
  
  - flatMap  
    ``````java  
    public Flux<String> requestBooks(String user) {  
		return Flux.range(1, random.nextInt(3) + 1)  
			.map(i -> "book-" + i)  
			.delayElements((Duration.ofMillis(3)));  
	}  
    ```  
    ``````java  
	Flux.just("user-1", "user-2", "user-3")  
		.flatMap(u -> requestBooks(u)  
			.map(b -> u + "/" + b))  
		.subscribe(r -> log.info("onNext: {}", r));  
    ```  
    ``````java  
    onNext: user-3/book-1  
    onNext: user-1/book-1  
    onNext: user-1/book-2  
    ```
  
    - 리액티브 시퀀스를 블로킹 구조로 전환하기  
    - toIterable : Reactive Flux -> Blocking Iterable  
    - toStream : Reactive Flux -> Blocking Stream API  
      - reactor 3.2 : toIterable을 사용  
    - blockFirst : upstream이 첫번쨰 값을 보내거나 완료될 때까지 현재 스레드를 차단  
    - blockLast : upstream이 마지막 값을 보내거나 완료될 떄까지 현재 스레드를 차단  
      - onError는 차단된 스레드에 예외가 발생  
        
  - 시퀀스를 처리하는 동안 처리 내역 살펴보기  
    - doOnNext(Consumer <T>) 는 Flux나 Mono의 각 원소에 대해 어떤 액션을 수행할 수 있게 해준다.  
    - doOnComplete(), doOnError(Throwable)는 대응 이벤트 발생시에 호출  
    - doOnSubscribe(Consumer<Subscription>), doOnRequest(LongConsumer), doOnCancel(Runnable)을 사용하면 구독 라이플 사이클 이벤트에 대응할 수 있다.  
    - doOnTerminate(Runnable)는 스트림 종료 시에 종료의 원인과 관계없이 기동  
  
  - 모든 신호를 처리하는 doOnEach(Consumer<Signal>)  
    ``````java  
    Flux.just(1, 2, 3)  
	  	    .concatWith(Flux.error(new RuntimeException("Conn error")))  
		    .doOnEach(s -> log.info("signal: {}", s))  
		    .subscribe();  
    ```  
    ``````java  
    signal: doOnEach_onNext(1)  
    signal: doOnEach_onNext(2)  
    signal: doOnEach_onNext(3)  
    signal: onError(java.lang.RuntimeException: Conn error)  
    ```  
      
  - 데이터를 시그널로 변환하기  
    ``````java  
    Flux.range(1, 3)  
		    .doOnNext(e -> log.info("data : {}", e))  
		    .materialize()  
		    .doOnNext(e -> log.info("signal: {}", e))  
		    .dematerialize()  
		    .collectList()  
		    .subscribe(r -> log.info("result: {}", r));  
    ```  
    ``````java  
    data : 1  
    signal: onNext(1)  
    data : 2  
    signal: onNext(2)  
    data : 3  
    signal: onNext(3)  
    signal: onComplete()  
    result: [1, 2, 3]  
    ```  
  
## 리액터를 이용해 스트림 만들기  
  
- 팩토리 메서드 - push / create  
  - push : 단일 스레드 생성자를 적용해 Flux 인스턴스를 생성  
    - 배압과 cancel에 대한 걱정없이 비동기, 단일 스레드, 다중 값을 가지는 API를 적용하는데 유용  
    - 구독자가 부하를 처리할 수 없는 경우 배압과 취소는 모두 큐를 이용해 처리됨  
    ``````java  
    Flux.push(emitter -> IntStream  
			      .range(2000, 3000)  
			      .forEach(emitter::next))  
			      .delayElements(Duration.ofMillis(1))  
			      .subscribe(e -> log.info("onNext: {}", e));  
    ```  
    ``````java  
    onNext: 2000  
    onNext: 2001  
    onNext: 2002  
    onNext: 2003  
    onNext: 2004  
    onNext: 2005  
    onNext: 2006  
    onNext: 2007  
    onNext: 2008  
    ```  
  - create : FluxSink 인스턴스를 추가로 직렬화하므로 다른 스레드에서 이벤트를 보낼 수 있게 된다.  
    ``````java  
    Flux.create(emitter -> {  
				        emitter.onDispose(() -> log.info("Disposed"));  
				        // push events to emitter  
			      })  
			      .subscribe(e -> log.info("onNext: {}", e));  
    ```  
  
- 팩토리 메서드 - generate  
  - generate : 메서드를 호출하는 오브젝트의 내부 전달 상태를 기반으로 복잡한 시퀀스를 만들 수 있도록 설계됨  
    ``````java  
    Flux.generate(  
				        () -> Tuples.of(0L, 1L),  
				        (state, sink) -> {  
					          log.info("generated value: {}", state.getT2());  
					          sink.next(state.getT2());  
					          long newValue = state.getT1() + state.getT2();  
					          return Tuples.of(state.getT2(), newValue);  
				        })  
			      .delayElements(Duration.ofMillis(1))  
			      .take(7)  
			      .subscribe(e -> log.info("onNext: {}", e));  
    ```  
    ``````java  
    generated value: 1  
    generated value: 1  
    generated value: 2  
    generated value: 3  
    generated value: 5  
    generated value: 8  
    generated value: 13  
    generated value: 21  
    generated value: 34  
    generated value: 55  
    generated value: 89  
    ...  
    ```  
- 일회성 리소스에 의존하는 스트림 생성 - using  
  - using : 리액티브 프로그래밍에서 사용하는 try-with-resources 방식의 접근법  
    ``````java  
    Flux<String> ioRequestResults = Flux.using(  
        Connection::newConnection,  
        connection -> Flux.fromIterable(connection.getData()),  
        Connection::close  
    );  
      
    ioRequestResults.subscribe(  
        data -> log.info("Received data: {}", data),  
        e -> log.info("Error: {}", e.getMessage()),  
        () -> log.info("Stream finished"));  
    )  
    ```  
  
- usingWhen 팩토리를 사용한 리액티브 트랜잭션 래핑  
  - Publisher의 인스턴스에 가입해 관리되는 리소스를 리액티브 타입으로 검색  
  - 메인 스트림의 성공 및 실패에 대해 각각 다른 핸들러를 사용할 수 있음  
    - usingWhen 연산자만으로 완전한 논블로킹 리액티브 트랜잭션을 구현할 수 있게 됨  
  
    ``````java  
    Flux.usingWhen(  
    		    Transaction.beginTransaction(),  
    		    transaction -> transaction.insertRows(Flux.just("A", "B", "C")),  
    		    Transaction::commit,  
    		    Transaction::rollback  
    	  ).subscribe(  
    		    d -> log.info("onNext: {}", d),  
    		    e -> log.info("onError: {}", e.getMessage()),  
    		    () -> log.info("onComplete")  
    	  );  
  ```  ```
    
- 에러 처리하기
    - 최종 구독자가 onError 시그널에 대한 핸들러를 정의하지 않으면 UnsupportedOperationException을 발생시킨다.  
  - 리액티브 스트림은 onError가 스트림이 종료됐다고 정의하기 있기 때문에 시그널을 받으면 시퀀스가 실행을 중지한다.

    ```java ``` 
     //신뢰도가 낮은 문제있는 추천서비스  
	public Flux<String> recommendedBooks(String userId) {  
		return Flux.defer(() -> {  
			if (random.nextInt(10) < 7) {  
				return Flux.<String>error(new RuntimeException("Err"))  
					.delaySequence(Duration.ofMillis(100));  
			} else {  
				return Flux.just("Blue Mars", "The Expanse")  
					.delayElements(Duration.ofMillis(50));  
			}  
		}).doOnSubscribe(s -> log.info("Request for {}", userId));  
	}  
    ```
    ```java  ```
      Flux.just("user-1")  
			.flatMap(user ->  
				recommendedBooks(user)  
					.retryWhen(Retry.backoff(5, Duration.ofMillis(100)))  
					.timeout(Duration.ofSeconds(3))  
					.onErrorResume(e -> Flux.just("The Martian"))  
			).doOnSubscribe(  
				b -> log.info("onNext: {}", b),  
				e -> log.warn("onError: {}", e.getMessage()),  
				() -> log.info("onComplete")  
			);  
    ```  
  
## 배압 다루기  
  - onBackPressureBuffer : 제한되지 않은 요구를 요청하고 결과를 다운스트림으로 푸시  
  - onBackPressureDrop : 제한되지 않은 요구를 요청하고 데이터를 하위로 푸시  
  - onBackPressureLast : onBackPressureDrop과 유사. 가장 최근에 수신된 원소를 기억하고, 요청이 발생하면 다운스트림으로 푸시  
  - onBackPressureError : 데이터를 다운스트림으로 푸시하는 동안 크기를 제한하지 않고 요청  
    
## Hot 스트림 / Cold 스트림  
  - Cold 퍼블리셔는 구독자가 나타날 때마다 해당 구독자에 대해 모든 시퀀스 데이터가 생성되는 방식으로 동작한다.  
  - 구독자 없이는 데이터가 생성되지 않는다.  
    ``````java  
	Flux<String> coldPublisher = Flux.defer(() -> {  
		log.info("Generating new items");  
		return Flux.just(UUID.randomUUID().toString());  
	});  
  
	log.info("No Data was generated so far");  
	coldPublisher.subscribe(e -> log.info("onNext: {}", e));  
	coldPublisher.subscribe(e -> log.info("onNext: {}", e));  
	log.info("Data was generated twice for two subscribers");  
    ```  
    ``````java  
    No Data was generated so far  
    Generating new items  
    onNext: 017d3204-67e1-41cb-87f9-359a70e425c6  
    Generating new items  
    onNext: 29681cff-6c83-487c-aced-855dfb4d7ed4  
    Data was generated twice for two subscribers  
    ```  
      
  - Hot 퍼블리셔의 데이터 생성은 구독자의 존재 여부에 의존하지 않는다. 따라서 핫 퍼블리셔는 첫번째 구독자가 구독을 시작하기 전에 원소를 만들어내기 시작할 수 있다.  
      
  - 스트림 원소를 여러 곳으로 보내기  
    - 콜드 퍼블리셔를 리액티브변환을 통해 핫 퍼블리셔로 전환할 수 있다.  
    - ConnectableFlux를 이용하면 가장 수요가 많은 데이터를 생성하고, 다른 모든 가입자가 데이터를 처리할 수 있도록 캐싱된다.  
    ``````java  
    Flux<Integer> source = Flux.range(0, 3)  
		.doOnSubscribe(s -> log.info("new subscription for the cold publisher"));  
  
	ConnectableFlux<Integer> conn = source.publish();  
  
	conn.subscribe(e -> log.info("subscriber1 on next: {}", e));  
	conn.subscribe(e -> log.info("subscriber2 on next: {}", e));  
  
	log.info("all subscribers are ready, connection");  
	conn.connect();  
    ```  
    ``````java  
    all subscribers are ready, connection  
    new subscription for the cold publisher  
    subscriber1 on next: 0  
    subscriber2 on next: 0  
    subscriber1 on next: 1  
    subscriber2 on next: 1  
    subscriber1 on next: 2  
    subscriber2 on next: 2  
    ``` 
  
   - 스트림 내용 캐싱하기  
    - ConnectableFlux를 이용하면 다양한 캐싱 전략을 쉽게 구성할 수 있다.  
    - ConnectableFlux를 이용하면 가장 수요가 많은 데이터를 생성하고, 다른 모든 가입자가 데이터를 처리할 수 있도록 캐싱된다.  
    ``````java  
    Flux<Integer> source = Flux.range(0, 3)  
			.doOnSubscribe(s -> log.info("new subscription for the cold publisher"));  
  
	Flux<Integer> cachedSource = source.cache(Duration.ofSeconds(1));  
  
	cachedSource.subscribe(e -> log.info("subscriber1 on next: {}", e));  
	cachedSource.subscribe(e -> log.info("subscriber2 on next: {}", e));  
  
	Thread.sleep(1200);  
  
	cachedSource.subscribe(e -> log.info("subscriber3 on next: {}", e));  
    ```  
    ``````java  
    new subscription for the cold publisher  
    subscriber1 on next: 0  
    subscriber1 on next: 1  
    subscriber1 on next: 2  
    subscriber2 on next: 0  
    subscriber2 on next: 1  
    subscriber2 on next: 2  
    new subscription for the cold publisher  
    subscriber3 on next: 0  
    subscriber3 on next: 1  
    subscriber3 on next: 2  
    ```  
      
  - 스트림 내용 공유  
    - ConnectableFlux를 사용해 여러개의 구독자에 대한 이벤트를 멀티캐스트 할 수 있다.  
    - share연산자로 콜드 퍼블리셔를 핫 퍼블리셔로 변환할 수 있으며, 구독자가 각 신규 구독자에게 이벤트를 전파하는 방식으로 작동된다.  
    ``````java  
    Flux<Integer> source = Flux.range(0, 5)  
		.delayElements(Duration.ofMillis(100))  
		.doOnSubscribe(s -> log.info("new subscription for the cold publisher"));  
  
	Flux<Integer> cachedSource = source.share();  
  
	cachedSource.subscribe(e-> log.info("subscription 1 on next: {}", e));  
	Thread.sleep(400);  
	cachedSource.subscribe(e-> log.info("subscription 2 on next: {}", e));  
    ```  
    ``````java  
    new subscription for the cold publisher  
    subscription 1 on next: 0  
    subscription 1 on next: 1  
    subscription 1 on next: 2  
    subscription 1 on next: 3  
    subscription 2 on next: 3  
    ```    
  
## 시간 다루기  
  - elapsed로 이전 이벤트와의 시간 간격을 측정  
    ``````java  
    Flux.range(0, 5)  
  	    .delayElements(Duration.ofMillis(100))  
	    .elapsed()  
	    .subscribe(e -> log.info("elapsed {} ms : {}", e.getT1(), e.getT2()));  
    ```  
      
## 리액티브 스트림을 조합하고 변환하기  
  - transform : 스트림 구조 자체를 변경할 수 있다.  
    ``````java  
    Function<Flux<String>, Flux<String>> logUserInfo = stream -> stream.index()  
		    .doOnNext(tp -> log.info("[{}] User: {}", tp.getT1(), tp.getT2()))  
		    .map(Tuple2::toString);  
  
    Flux.range(1000, 3)  
		    .map(i -> "user-" + i)  
		    .transform(logUserInfo)  
		    .subscribe(e -> log.info("onNext: {}", e));  
    ```  
    ``````java  
    [0] User: user-1000  
    onNext: [0,user-1000]  
    [1] User: user-1001  
    onNext: [1,user-1001]  
    [2] User: user-1002  
    onNext: [2,user-1002]  
    ```  
  - transformDeferred: transform과 유사. 구독자가 도착할 때마다 동일한 스트림 변환 작업을 수행한다.(composer deprecated)  
    ``````java  
    Function<Flux<String>, Flux<String>> logUserInfo = (stream) -> {  
  		    if (random.nextBoolean()) {  
  			      return stream.doOnNext(e -> log.info("[path A] User: {}", e));  
  		    } else {  
  			      return stream.doOnNext(e -> log.info("[path B] User: {}", e));  
  		    }  
  	  };  
    
    Flux<String> publisher = Flux.just("1", "2")  
  	      .transformDeferred(logUserInfo);  
    
    publisher.subscribe();  
    publisher.subscribe();  
    ```  
    ``````java  
    [path A] User: 1  
    [path A] User: 2  
    [path B] User: 1  
    [path B] User: 2  
    ```  
      
## Processor  
  - Publisher이면서 동시에 Subscriber  
  - 구독이 가능하며, 시그널(onNext, onError, onComplete)를 수동으로 보낼 수 있다.  
  - 하지만 Processor 사용을 권장하지 않으며, 패곹리 메서드(push, create, generate) 사용을 권장  
    - direct : Process 입력부를 직접 구현해 데이터를 푸시  
    - Synchronous  
    - Asynchronous  
  
## 리액터 프로젝트 테스트 및 디버깅  
  - 심화학습장에서 다룸  
  ``````java  
  // stacktrace 수집  
  Hooks.onOperatorDebug();  
  ```

...