자료2014. 1. 2. 14:34

프로그래밍을 처음 배울 때부터 사용했던 C의 fread, C++의 ifstream, Java의 InputStream 등은 모두 IO 작업이 완료될 때까지 스레드가 멈추는 Blocking API이다. 디스크 IO, 네트워크 IO 등을 기다리느라 수 밀리세컨드 동안 아무 것도 하지 않는 것은 귀중한 CPU 자원의 낭비이며, 프로그램의 전체적인 성능 하락을 불러오게 된다.

이를 해결하기 위해 스레드가 멈추지 않는 방식으로 프로그램을 작성하는 것을 비동기 프로그래밍이라고 부른다. 일반적으로 비동기 프로그래밍에선 특정 동작이 완료되었을 때에 호출되는 콜백을 등록하는데, 이를 남용하면 프로그램의 복잡도가 크게 상승하여 콜백 지옥이라는 늪에 빠지게 된다.

Microsoft에서 개발한 Reactive Extensions(RX)와 Netflix에서 이를 JVM 기반으로 포팅한 RxJava 는 이러한 비동기 프로그래밍을 간결하게 할 수 있도록 도와주는 라이브러리다. 이 라이브러리를 사용하면 모든 비동기 동작은 Future<T>와 Iterable<T>의 특징을 모두 갖고 있는 Observable<T> 타입을 바탕으로 이루어진다. 

ZooKeeper와 같이 원격으로 관리되는 트리의 예를 들어보자. 동기 API 는 아래와 같은 형태일 것이다.


List<String> getChildren(String path); // 주어진 경로의 하위 노드들을 구함
void delete(String path);              // 자식 노드가 없는 노드를 삭제
void deleteAll(String path);           // 주어진 노드와 모든 자식 노드를 삭제


RxJava를 이용한 비동기 API는 여기서 리턴타입을 Observable로 감싼 형태가 된다.


Observable<List<String>> getChildren(String path); // 주어진 경로의 하위 노드들을 구함
Observable<Void> delete(String path);              // 자식 노드가 없는 노드를 삭제
Observable<Void> deleteAll(String path);           // 주어진 노드와 모든 자식 노드를 삭제


Observable클래스에는 Observable 을 처리하기 위한 다양한 메소드들이 포함되어 있으며, 이들을 이용하여 Observable 형태로 가져온 리턴값을  값을 변형하거나, 여러 Observable과 결합하거나, 필터링하여 새로운 Observable을 만드는 비동기 동작에 활용할 수 있고, 필요한 경우엔 BlockingObservable로 변환하여 원하는 결과가 주어질 때까지 기다릴 수 있다. 

예시로 든 deleteAll 함수는 아래와 같이 getChildren과 delete를 이용하여 만들 수 있다. getChildren 함수를 이용해 현재 노드의 모든 자식 노드를 가져와서 재귀적으로 삭제한 뒤 delete 함수로 해당 노드를 삭제하는 방식이다.


void deleteAll(String path) {
    List<String> children = getChildren(path);
    for (String child : children) {
        deleteAll(path + "/" + child);
    }
    delete(path);
}


getChildren과 delete의 비동기 버전, 즉 Observable 을 반환하는 버전을 이미 만들었다고 하면 이를 이용해서 어떻게 deleteAll의 비동기 버전을 만들 수 있을까? 약간의 고민 뒤에 내가 당도한 해결책은 다음과 같다.


Observable<Void> deleteAll(String path) {
    return getChildren(path).flatMap(children -> {
        List<Observable<Void>> requests = transform(children, child -> 
            deleteAll(path + "/" + child)
        );
        return merge(from(requests)).lastOrDefault(null);
    }).flatMap(done -> delete(path));
}

가독성을 위해 Observable.merge, Observable.from과 Guava의 Lists.transform 함수를 static import 했다. 

1행은 앞서 말한 것과 같이 void 였던 리턴 타입을 Observable<Void>로 바꾼 모습이다. 이 Observable에 subscribe하게 되면 onNext를 통해 값을 전달하지는 않고, 동작이 정상적으로 끝났을 때 onComplete를, 에러가 발생했을 때 onError가 호출된다.

2행에서는 getChildren 메소드를 호출하여 돌려받은 Observable<List<String>> 타입의 리턴값에 flatMap 메소드를 이어서 호출한다. 이렇게 해서 비동기적으로 자식 노드들의 리스트를 구해오고, 구해온 다음에 수행할 동작을 표현할 수 있다. 여기서 flatMap 메소드에 넘기는 인자는 List<String>을 받아서 Observable<Void>를 반환하는 함수이며, 역시 Java 8 lambda로 작성이 가능하다.

3행~5행에서는 children 각각에 대해 deleteAll 함수를 부른다. 자식 노드 각각에 대해 Observable을 부르기 때문에 여러 개의 Observable<Void>를 얻게 되고, Guava의 Lists.transform 을 이용해 List로 만들었다.

6행의 merge(from(request))는 List<Observable<Void>> 타입의 requests에 포함된 모든 Observable을 결합한 하나의 Observable을 만든다. 이 Observable<Void>는 모든 자식노드에 대한 처리가 성공한 경우에 onComplete로 끝나며, 하나라도 실패한 경우 onError를 발생시킨다. lastOrDefault 는 이 Observable이 끝날 때 null을 전달하도록 하기 위해 추가했으며, 아래쪽의 subscribe 부분을 간단히 작성할 수 있도록 하는 트릭이다.

7행에선 3행부터 시작한 flatMap 호출이 끝난다. 이 Observable이 완료된 시점에서는 하위 노드들이 모두 지워졌다고 가정할 수 있으므로, 한번 더 flatMap 메소드를 호출하여 path 경로에 있는 노드를 지우도록 한다.


세 번의 lambda를 사용했음에도 다섯 줄로 끝나는 동기 버전보다 현저히 가독성이 떨어지는 것이 사실이다. 자바 8에서조차 C#이나 Scala에 존재하는 async/await을 사용할 수 없고 7행에서처럼 비동기 연산의 결과를 다양한 방식으로 조작하는 것은 async/await이 있더라도 한계가 있는 부분이라, 비동기 어플리케이션을 구현하기 위한 아직은 어쩔 수 없는 trade-off라고 봐야 할 것 같다.

그래도 안도할 것은 Java 8 의 lambda 덕분에 가독성이 몇 배는 좋아졌다는 점이다. 위 구현에서 사용한 lambda 3개를 모두   anonymous class로 써야 했다면 누구라도 일찌감치 포기하고 말았을 것이다. Scala에서만 가능하다고 생각했던 Functional Reactive Programming 이 Java 8 에서야 비로소 현실적인 대안으로 자리잡은 듯하다. 과도한 패턴으로 악명이 높은 구시대적 자바의 관습이 Java 8을 계기로 보다 미래지향적으로 전환되길 하는 바람을 가져 본다.



Posted by jongwook