안녕하세요. 볼드나인 백엔드 개발자 박연호 입니다.
이번 글에서는 nodejs환경에서 여러개의 비동기를 좀 더 효율적으로 처리하는 방법으로 @supercharge/promise-pool을 소개하면서, 기존의 여러개의 비동기 작업을 처리하는 방법에 비해 장점, 그리고 실제 코드를 분석하면서 어떤식으로 구현되어 있는지 알아보겠습니다.
여러개의 비동기 작업을 일괄로 처리
nodejs를 활용하여 다양한 비동기 작업을 처리할 수 있습니다.
•
외부 서비스 api를 호출하는 http 요청
•
orm을 사용하여 대용량 조회, 삽입 작업
•
MSA환경에서 이벤트 publish 작업
이외에도 수많은 Promise 작업들이 존재하며 보통 처리량을 높이기 위해 Promise.all, Promise.allSettled를 사용합니다.
await Promise.all(["1초 걸리는 작업", "3초 걸리는 작업", "2초 걸리는 작업"])
await Promise.all(["4초 걸리는 작업", "2초 걸리는 작업", "5초 걸리는 작업"])
JavaScript
복사
위 코드에서 각각의 Promise.all의 실행되는 시간은 Promise.all에서 가장 처리시간이 오래 걸리는 작업이 됩니다. 위의 코드예시에서 첫번째 Promise.all은 3초, 두번째 Promise.all은 5초로 총 8초가 소요됩니다.
총 6개의 Promise를 처리하는데 8초가 소요 되었는데, 사실 첫번째 Promise.all에서 1초 Promise, 2초 Promise는 이미 작업을 끝냈습니다. 하지만 3초 Promise가 작업을 끝내야, 다음 Promise.all이 실행되면서 다른 Promise작업이 실행됩니다.
모든 Promise는 결국 실행되야 하기 때문에, 3초 Promise를 끝날때까지 기다리는 것이 아닌, 1초 Promise 작업이 끝나면 곧바로 5초 Promise작업을 시작하고, 2초 Promise 작업이 끝나면 4초 Promise를 시작하면 Promise.all 실행보다 더 짧은 시간안에 작업을 완료할 것입니다.
5개의 라인이 있는 워터파크 슬라이드를 생각해보면, 5개의 라인에 있는 사람들이 도착지점에 도달하는 시간이 모두 다를 것입니다. 그렇다면 모든 라인에 있는 사람이 도착지점에 도달하기까지 기다리는 것이 아니라, 먼저 도착한 라인을 재빨리 다음 사람을 내려보내는 것이 훨씬 더 많은 사람이 슬라이드를 즐길 수 있을 것입니다.
슬라이드 라인과 마찬가지로 5개 라인(promise pool)을 만들어 첫번째 사람이 라인을 다 탔다면(Promise를 처리했으면) 다른 라인의 도착 여부와 상관없이 첫번째 사람을 슬라이드에 태우는 방법이 @supercharge/promise-pool 의 아이디어 이며 동작하는 방식입니다.
Promise.all vs Promise pool 실행시간 비교
/**
* 1000개의 배열에 500 ~ 2000사이의 값을 삽입
*/
const generateMs = () => {
return Array.from(
{ length: 1000 },
() => Math.floor(Math.random() * (2000 - 500 + 1)) + 500
);
};
const sleep = (ms: number) => {
return new Promise((res, rej) => {
setTimeout(() => {
res(true);
}, ms);
});
};
/**
* 배열을 size만큼 slice
* [1,2,3,4,5,6] -> [[1,2],[3,4],[5,6]]
*/
function chunkArray<T>(array: T[], size: number): T[][] {
const result: T[][] = [];
for (let i = 0; i < array.length; i += size) {
result.push(array.slice(i, i + size));
}
return result;
}
const promiseAll = async () => {
const randomMs = chunkArray(generateMs(), 100);
console.time("promiseAll timer");
for (const msArray of randomMs) {
await Promise.all(msArray.map((v) => sleep(v)));
}
console.timeEnd("promiseAll timer");
};
const promisePool = async () => {
console.time("promisePool timer");
await PromisePool.for(generateMs()).withConcurrency(100).process(sleep);
console.timeEnd("promisePool timer");
};
(async () => {
await promiseAll();
await promisePool();
})();
JavaScript
복사
0.5 ~ 2초 사이의 랜덤한 시간만큼 delay하는 Promise 함수로 성능 비교해보겠습니다.
500 ~ 2000사이의 값으로 1000길이의 배열을 초기화하고, 500 ~ 2000초 사이의 값을 받아서 setTimeout 함수로 대기하는 Promise 함수를 작성합니다.
•
Promise.all은 배열을 100개 단위로 잘라서 총 10번의 Promise.all을 실행
•
promise pool은 동시에 실행되는 Promise를 100개로 제한
결과는 Proise.all은 19s, promise pool은 13초로 promise pool을 사용했을 때 성능이 31% 개선된 것을 알 수 있습니다.
Promise.all은 총 10번을 반복합니다. 각각의 Promise 함수는 2초에 가까운 시간만큼 대기하며 총 10번을 실행하기 때문에 실행시간은 20초에 가깝습니다. 반면에 promise pool 방법은 Promise 100개를 동시에 실행하기 때문에 13초가 걸리게 됩니다.
동시에 Promise를 처리할 수 있는 개수를 증가하면 처리량이 높아지며 200개로 했을 때 7.5s, 300개로 했을 때 5.4s 시간이 소요됩니다.
@supercharge/promise-pool 코드 뜯어보기
코드를 이해하기 전에 선행학습으로 Promise.race 메서드는 가장 빨리 resolve한 Promise값을 반환합니다. 하지만, 중요한 점은 첫번째 이후의 Promise은 첫번째에만 못들었을 뿐이지 결국에는 resolve 된다는 것입니다.
const sleep = (ms: number) => {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(ms);
console.log(ms);
}, ms);
});
};
(async () => {
Promise.race([sleep(1000), sleep(2000), sleep(3000)]).then((v) => {
console.log("결과 : ", v);
});
})();
// 출력
1000
결과 : 1000
2000
3000
JavaScript
복사
위 코드에서 결과값을 보면 2,3초 Promise는 결과값에는 출력되지 않았지만 결국에는 resolve함수가 실행됩니다. 이점을 생각하고 코드를 분석하면 더 이해가 잘됩니다.
promise pool의 시작점은 pool에서 실행할 promise 함수를 process 메서드에 전달하면서 시작합니다.
msArray : 500 ~ 2000사이의 값이 저장되어있는 배열
100 : 동시에 실행한 Promise의 개수(워터파크 슬라이드 라인 수)
sleep : 500 ~ 2000값을 받아 대기하는 Promise를 반환하는 함수
process 메서드에서는 pool에서 사용할 초기값을 설정하고 start 메서드를 호출합니다. callback함수는 위에서 전달받은 sleep함수로, Promise pool에서 실제로 실행될 함수 입니다.
start 함수에서는 process() 메서드를 실행합니다.
this.item()의 반환값은 앞서 초기화한 500~2000사이의 값이 있는 배열입니다. 이 값을 루프 돌면서 어떤 처리를 한 후 마지막에 this.drained()를 호출하면서 종료됩니다. 이게 전부 입니다.
루프안에서 중요한 메서드는 this.startProcessing()인데, 500~2000사이의 값 하나와 index를 전달하고 있습니다.
startProcessing에서는 첫번째로 this.createTaksFor()를 호출하여 Promise를 반환받는데, 이 Promise는 앞서 PromisePool.process(sleep)에서 전달한 sleep 함수의 반환값 입니다. createTaksFor 호출 시 전달한 item(500~2000사이의 값)이 sleep의 함수로 들어가게 됩니다. 결국에 task값은 특정 시간동안 sleep하는 Promise 입니다.
이렇게 생성한 Promise는 this.task()에서 반환한 배열에 넣습니다. 이 배열의 역할을 "현재 실행중인 Promise 작업들의 목록" 입니다.
Promise를 생성했으니 동시에 몇개를 관리할 지 판단하기 위해 배열에 저장해야 합니다.
여기서 가장 중요한 점은, Promise의 then, catch 메서드 내부에서 호출하는 removeActive입니다.
removeActive에서는 "현재 실행중인 Promise 작업들의 목록"에서 현재 Promise를 제거하고 있습니다. 즉 Promise가 완료/오류가 발생했으면 결국엔 마무리 된것이기 때문에 다음 Promise에게 턴을 줘야하기 때문에 제거하고 있습니다.
워터파크에서 n번째 라인을 다 탔으면 자리를 비워주고 다음 사람이 탈 수 있게 턴을 넘겨줘야 합니다.
지금까지 500~2000사이의 값 루프를 돌면서 Promise를 만들고 배열에 넣고 있고, 해당 Promise가 완료되면 배열에 제거해주는 로직까지 알아봤습니다.
지금 까지는 Promise를 만들고 "현재 실행중인 Promise 작업들의 목록에 넣기만 했을 뿐 동시에 실행될 Promise들의 제어, Promise 완료/에러 처리를 하지 않았습니다.
위 코드가 그 역할을 하며, 동시에 처리할 수 있는 Promise를 제어하면서 Promise의 작업을 완료/에러 처리하여 "현재 실행중인 Promise 작업들의 목록"에서 제거해줘야 합니다. 그래야 다른 Promise이 처리되기 때문이죠.
앞서 루프를 돌면서 Promise를 만들고 "현재 실행중인 Promise 작업들의 목록"에 계속 push하는데 계속 넣는것이 아니라 pool size만큼 push합니다. pool size는 앞서 PromisePool을 초기화할 때 withConcurrency로 초기화 했습니다.
pool size가 100개라면 98, 99, 100까지 Promise를 채우고 101이 되는 순간 이제는 비워줘야 합니다.
101이 되는순간 "현재 실행중인 Promise 작업들의 목록"을 완료/에러 처리를 합니다. race 메서드이기 때문에 가장 빨리 마무리되는 작업이 마무리되고 해당 라인을 지나 그 다음 반복문이 실행됩니다. 이후 반복문이 진행되면 앞서 진행한 Promise만들고, "현재 실행중인 Promise 작업 들의 목록"에 넣고 while문에서 대기하고 ...과정을 반복합니다.
여기서 가장 중요한 부분은, race메서드에서 가장 첫번째로 완료한 그리고 그 이후의 Promise 모두 then, catch내부에서 removeActive가 실행됩니다. 앞서 removeActive의 역할은 해당 Promise를 "현재 실행중인 Promise 작업들의 목록"에서 제거하는 것입니다. 이로써 목록에 자리가 생기고, 위의 while문의 조건절에 영향을 주어 "현재 실행중인 Promise 작업들의 목록"에 공석이 생깁니다.
for문을 돌면서 새로 생성한 Promise들은 이 공석에 들어가게 되고, "현재 실행중인 Promise 작업들의 목록"의 길이가 100을 넘어가게 되면 또 다시 race를 하게 됩니다.
이 과정을 반복하면서 동시에 실행할 수 있는 Promise들의 개수가 pool size만큼 팽팽하게 유지되면서 하나의 Promise가 완료되면 다른 Promise가 곧바로 처리되게 됩니다.
for문을 모두 돌았으면, 처리 중인 Promise 작업들이 모두 완료되기를 기다렸다가(await Promise.all()) 결과값을 반환합니다. this.error(), this.results()에서 반환하는 값은 Promise가 각각 then, catch 메서드에서 처리한 값을 반환합니다.
“남아있는 Promise를 모두 처리하고 값을 반환”이라는 의미에서 메서드 명을 drain이라고 사용한것이 직관적이고 재미있습니다.
마무리
코드를 보면서 다른 라이브러리 사용 없이 Promise의 기본 메서드를 사용했다는 점과 Promise.race()를 사용하여 공석이 1개 비었을 때 다음 Promise를 처리한다는 아이디어가 재미있었습니다. 라이브러리를 사용할 때 그냥 사용하는 것보단 내부 구현이 어떻게 되어있는지 확인하고 사용한다면 좀 더 코드의 흐름을 파악할 수 있고, 본인꺼로 만들 수 있다고 생각합니다.
@supercharge/promise-pool 를 사용할 때 바로 사용하는 것보단, 기존 Promise.all로 구현했을 때 성능면에서 이점이 있다고 생각되면 그때 사용하는것이 좋다고 생각합니다.