Search

비동기: 동시성 프로그래밍2

지연 평가 + Promise - L.map, map, take

이터러블을 다루는 많은 함수들은 map, filter, reduce를 뼈대로 응용해서 함수합성등을 통해 (ex: L.map + takeAll, flatMap...) 응용 함수들을 만들었습니다.
L.map, map, take는 기본적으로 동기적으로 돌아가는 상황에서만 정상적인 동작을 보장했습니다. reduce, pipe등 비동기상황에서도 동작하는 이런 함수들처럼 L.map, map, take함수들도 비동기적 상황에서도 정상동작하도록 코드를 리팩토링 해보겠습니다.

before - Promise를 인자값으로 L.map 사용

go([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)], L.map(a=>a+10), take(2), log );//"[object Promise]10", "[object Promise]10"]
Java
복사
정상적으로 동작하지 않습니다.

process - L.map에서 Promise를 받도록 적용

const go1 = (a, f) => a instanceof Promise ? a.then(f) : f(a); L.map = curry(function* (f, iter) { for (const a of iter) { yield go1(a,f); } }); ... go([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)], L.map(a=>a+10), take(2), log ); //실행결과 //0: Promise {<fulfilled>: 11} //1: Promise {<fulfilled>: 12}
Java
복사
이제 프로미스에 map의 인자값으로 받은 함수는 적용되어 11,12과 되었습니다. 이제 Promise안에 들어있는 값을 꺼내보도록 합니다.

process - take에서 Promise내부의 값 꺼내어 반환

const take = curry((l, iter) => { let res = []; iter = iter[Symbol.iterator](); return function recur() { let cur; while (!(cur = iter.next()).done) { const a = cur.value; if (a instanceof Promise) return a.then(a => (res.push(a), res).length === l ? res : recur()) res.push(a); if (res.length === l) return res; } return res; }(); }); go([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)], L.map(a=>a+10), take(2), log );//[11,12]
Java
복사
기존 take 함수에서 currentValue(cur.value)이 Promise일 경우 내부의 값을 then을 통해 res Array에 넣어준 뒤 재귀적으로 다시 유명함수 recur()를 호출해 문제를 해결하고 있습니다.

Complete - 이제 map, L.map전부 정상 동작

go( [Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)], map(a => Promise.resolve(a+10)), take(2), log )//[11,12] go( [2,3,4], map(a => Promise.resolve(a+10)), take(2), log )//[11,12
Java
복사

Kleisli Composition - L.filter, filter, nop, take

Filter에서 지연평과와 비동기성을 함께 지원하려면 Kleisli Composition을 적용해야 합니다.
go([1, 2, 3, 4, 5, 6], L.map(a=>Promise.resolve(a*a)), L.filter(a => a % 2), take(2), log );//[]
JavaScript
복사
현재 위 코드를 실행하면 정상적으로 동작을 하지 않습니다. 그 이유는 filter로 넘어오는 값이 Promise이기 때문입니다. 그럼 a⇒a%2는 결국 Promise % 2 가 되는 것이기에 정상적으로 동작할 수 없죠. 이 부분을 해결하기 위해서는 L.filter를 살펴보고 수정해야 할 필요가 있습니다.
기존 L.filter
L.filter = curry(function* (f, iter) { for (const a of iter) { if(f(a)) yield a; } });
JavaScript
복사
위 기존 L.filter에 log를 찍어보면 a는 Promise인 것을 확인할 수 있습니다. 그렇기 때문에 우선 a의 Promise내부에서 값을 꺼낼 필요가 있는데, 이전에 만든 go1()을 사용합니다.
L.filter = curry(function* (f, iter) { for (const a of iter) { const b = go1(a, f); if (b) yield a; } }); go([1, 2, 3, 4, 5, 6], L.filter(a => a % 2), take(2), log );//[1, 3] go([1, 2, 3, 4, 5, 6], L.map(a => Promise.resolve(a * a)), L.filter(a => a % 2), take(2), log );//[1, 4]
JavaScript
복사
첫 번째 go 로직은 정상적으로 동작을 하는 것을 확인할 수 있습니다. 하지만, 두 번째 비동기적인 go는 정상적이지 못한 값이 나오는것을 확인할 수 있습니다. 그 이유는 L.filter부분에서 const b = go1(a, f);에서 b를 출력해보면 Promise인 것을 확인할 수 있고, if문에서 Promise인 b는 true로 판단되어 정상적으로 동작하지 않는 것입니다.
그렇기에 L.filter에서 동기적인 상황에서만 정상동작을 하는 것입니다. 그럼 이제 Promise인 경우에도 정상동작하도록 코드를 추가합니다.
const nop = Symbol('nop'); L.filter = curry(function* (f, iter) { for (const a of iter) { const b = go1(a, f); if (b instanceof Promise) yield b.then(b => b ? a : Promise.reject(nop)); else if (b) yield a; } });
JavaScript
복사
b가 Promise 인지 검사를 한 뒤 promise이라면 then을 통해 b내부의 값을 풀어주는데 b가 true이면 a를 resolve 해주는데 a가 Promise이라 할지라도 then내부에서 다른곳으로 전달될때는 풀어져서 전달되기 때문에 괜찮습니다.
그리고 b가 false일 경우에는 아무 행동도 하지않도록 해야하는데, yield 를 통해 generated되서 전달되기 때문에 다음 함수의 인자값으로 들어가지 않도록 해야합니다.
그렇게 하기 위해서 Kleisli Composittion을 활용합니다. 위의 코드에서 b의 값이 없거나 false인 경우 reject를 해주는데, 그냥 reject를 하면 아무것도 안하길 바라는 reject인지 에러발생 reject인지 알 수 없기 때문에 플래그가 될 수 있는 값을 넣어줘야 하는데, nop이라는 구분자를 만들어서 사용하도록 합니다.
참고: nop 구분자를 Symbol로 선언한 이유는 Symbol값도 객체의 프로퍼티 값으로 사용할 수 있는 Symbol값은 유일한 값이므로 Symbol 값을 키로 갖는 프로퍼티는 다른 어떠한 프로퍼티와도 충돌하지 않기 때문에 약속된 구분자로 사용할 수 있어 사용합니다.
추가적으로 take function부분에서도 reject에 대한 처리를 해줄 필요가 있습니다.
const take = curry((l, iter) => { let res = []; iter = iter[Symbol.iterator](); return function recur() { let cur; while (!(cur = iter.next()).done) { const a = cur.value; if (a instanceof Promise) return a.then(a => (res.push(a), res).length === l ? res : recur()) .catch(e=> e === nop ? recur() : Promise.reject(e)); //reject로 nop이 올 경우 다음 코드를 평가한다. res.push(a); if (res.length === l) return res; } return res; }(); });
JavaScript
복사
이처럼 take에서 reject를 통해 catch가 잡혔을 때 해당 paramater(e)가 nop일 경우 무시하고 다음 함수를 평가하도록 합니다.

reduce 에서 nop 지원

take 가 아닌 reduce에서도 nop를 지원하도록 해서 지연성과 Promise를 지원하도록 만들어봅시다.
지연성+Promise를 사용해서 reduce를 호출하면 지금은 에러가 납니다.
go([1, 2, 3, 4, 5], L.map(a => Promise.resolve(a * a)), L.filter(a => Promise.resolve(a % 2)), reduce(add), log );//1[object Promise][object Promise][object Promise] Uncaught (in promise) Symbol(nop)
JavaScript
복사
기존 reduce function
const reduce = curry((f, acc, iter) => { if (!iter) { iter = acc[Symbol.iterator](); acc = iter.next().value; } else { iter = iter[Symbol.iterator](); } return go1(acc, function recur(acc) { let cur while (!(cur = iter.next()).done) { const a = cur.value; acc = f(acc, a); if (acc instanceof Promise) return acc.then(recur); } return acc; }); });
JavaScript
복사
이제 reduce내부의 while내부에서 nop을 캐치하는 부분과 a를 풀어주는 부분을 만들 필요가 있습니다. 저 두 부분만 해결하면 되기에 모듈로 만들어서 호출하도록 합니다.
const reduceF = (acc, a, f) => a instanceof Promise ? //a 가 Promise인지 평가 a.then(a=> f(acc,a), e=>e===nop?acc:Promise.reject(e)): //Promise라면 then을 통해 f(acc,a)를 수행, f(acc,a); const reduce = curry((f, acc, iter) => { if (!iter) { iter = acc[Symbol.iterator](); acc = iter.next().value; } else { iter = iter[Symbol.iterator](); } while (!(cur = iter.next()).done) { acc = reduceF(acc, cur.value, f); if (acc instanceof Promise) return acc.then(recur); } return acc; }); });
JavaScript
복사
reduceF라는 함수를 만들어 a가 promise일 경우 then을 통해 a값을 꺼내어 f(acc,a)를 수행하고 reject(nop)인 경우 acc를 그대로 반환합니다.
promise가 아닌 경우에는 그대로의 f(acc,a)를 수행해 반환합니다.
위 함수중 reduce에서 iter가 없을경우 acc에서 iterator를 꺼내 iter에 넣어주고 첫 번째값을 다시 acc에 넣어주는데 이 역시 모듈화가 가능합니다.
const head = iter => go1(take(1, iter), ([h])=>h); const reduceF = (acc, a, f) => a instanceof Promise ? a.then(a=> f(acc,a), e=>e===nop?acc:Promise.reject(e)): f(acc,a); const reduce = curry((f, acc, iter) => { if (!iter) return reduce(f, head(iter = acc[Symbol.iterator]()), iter); iter = iter[Symbol.iterator](); return go1(acc, function recur(acc) { let cur while (!(cur = iter.next()).done) { acc = reduceF(acc, cur.value, f); if (acc instanceof Promise) return acc.then(recur); } return acc; }); });
Java
복사
head라는 함수를 만들어 인자값으로 받은 iter에서 첫번째 인자값을 take로 가져옵니다. 그 뒤 take는 배열값을 반환하기에 구조분해로 내부값을 꺼내 반환합니다.
head라는 함수로 reduce 함수에서 iter가 없는 경우 재귀적으로 인자를 만들어 다시 호출해 정상동작하게 합니다.

지연 평가 + Promise의 효율성

지연평가 및 즉시평가에서 동기&비동기 모든상황에 맞춰서 map, filter, reduce가 동작하도록 만들어봤습니다.
Promise와 같은 비동기적이기에 비용의 소모가 큰 작업이 로직안에 들어가게 되면, 전체적인 성능부분에서 많은 딜레이 생길 수 있습니다.
비동기상황이 함수대기열에 등록되있는 경우 - 즉시평가
go([1, 2, 3, 4, 5, 6, 7, 8], map(a => { log(a); return new Promise(resolve => setTimeout(() => resolve(a * a), 1000)) }), filter(a => { log(a); return new Promise(resolve => setTimeout(() => resolve(a % 2), 1000)) }), take(2),// 2개를 가져오던 4개를가져오던 시가는 동일하다. log )
JavaScript
복사
비동기상황을 즉시평가로 가져오는 경우, take로 2개를 가져오던 5개를가져오던 혹은 takeAll로 전부 가져오던 소요시간은 같습니다.
비동기상황이 함수대기열에 등록되어있는 경우 - 지연평가
go([1, 2, 3, 4, 5, 6, 7, 8], L.map(a => { log(a); return new Promise(resolve => setTimeout(() => resolve(a * a), 1000)) }), L.filter(a => { log(a); return new Promise(resolve => setTimeout(() => resolve(a % 2), 1000)) }), take(2), // reduce(add), log )
JavaScript
복사
위와같이 지연평가를 하는 L.map, L.filter를 사용하게된다면 필요한 값을 다 구했다면, 그 다음 내용들은 비용소모가 큰 map이나 filter에 들어가 수행되지도 않기때문에 즉시평가와 비교해 성능상 이점을 얻을 수 있습니다.

지연된 함수열을 병렬적으로 평가하기 - C.reduce, C.take[1]

자바스크립트가 수행되는 환경은 브라우저나 Nodejs 가 대표적인데 보통 비동기 IO로 동작합니다. 비동기 IO는 일반적으로 싱글스레드를 기반으로 해서 IO를 동기적으로 처리하기보단 비동기적으로 처리를 해서 하나의 쓰레드에서도 CPU를 점유하는 것들을 효율적으로 IO작업을 하는 최신 트렌드 중 하나입니다. 자바스크립트가 이렇게 싱글스레드로 돌아가기 때문에 병렬 프로그래밍을 할 일이 없다고 생각하지만, 자바스크립트에서 어떤 로직을 제어하는것을 싱글스레드로 비동기적으로 제어하는 것일 뿐 병렬적인 프로그래밍은 충분이 사용할 수 있습니다.
(ex postgreSQL에 query를 날리는 것을 병렬적으로 처리해 동시에 보내 결과를 받아오는 것)
이처럼 Nodejs가 실제로 로직을 직접 수행하는 것이아닌 네트워크나 기타 IO로 작업을 보내놓고 대기하는 시점을 다루는 것을 Nodejs가 하는 것이기 때문에, 특정 처리에 대한 요청들을 동시에 보내서 하나의 로직으로 귀결시키는 로직은 개발자가 자바스크립트에서도 할 수 있고 잘 다뤄야 할 필요도 있습니다.
이와같은 동시성을 처리하는 병렬프로그래밍에 대해 알아봅니다.
예제
const delay500 = a => new Promise(resolve => { console.log('hi'); console.log(''); setTimeout(() => resolve(a), 500) }); console.time("Immediately time:"); go([1, 2, 3, 4, 5], L.map(a => delay500(a * a)), L.filter(a => a % 2), reduce(add), log, ()=>console.timeEnd("Immediately time:"); );//35 Immediately time:: 5019.67724609375ms
JavaScript
복사
L.map에서는 500ms정도의 시간이 걸리는 로직이 수행되고 있습니다.
map이나 filter가 즉시평가라면 평가의 방향은 가로입니다. map에서 [1,2,3,4,5]배열의 전부 평가되어 다음으로 넘어가고 filter에서도 해당 배열들을 전부 필터링한 뒤 reduce에서 add를 통해 합쳐집니다. 그렇기에 즉시평가로 돌아간다면 병렬 프로그래밍을 적용하기 적절치 않습니다. 하지만, 위와같이 지연평가를 사용하게된다면 평가순서는 가로에서 세로로 변경됩니다. 처음 1이 map에서 평가된 후 filter에서 조건검사 후 reduce에서 축약됩니다. 그 다음 배열의 값 '2'가 map, filter를 거치게 되죠. 이처럼 순서가 가로에서 세로로 바뀌면 각각의 값은 독립적으로 수행이 되게되고 이는 병렬 프로그래밍을 사용할 수 있다는 의미가 됩니다.
위 예제에서 reduce는 하나씩 값을 기다려서 값을 더해주고 있습니다. 그렇다면 값들을 전부 보내서 reduce를 수행한다면 어떨까요? 부하는 좀 생길 수 있지만, 더 빠르게 결과를 만들 수 있을 것입니다. 위와같이 병렬 프로그래밍이 적용 가능한 상황에서는 reduce를 그에 맞게 만들어봅니다.
let C = {}; C.reduce = curry((f, acc, iter) => iter ? reduce(f, acc, [...iter]):reduce(f, [...acc])); const delay500 = a => new Promise(resolve => { // console.log('hi'); // console.log(''); setTimeout(() => resolve(a), 1000) }); console.time("Conquer time:"); go([1, 2, 3, 4, 5], L.map(a => delay500(a * a)), L.filter(a => a % 2), C.reduce(add), log, ()=>console.timeEnd("Conquer time:"); );//3535 Conquer time:: 1006.485107421875ms
JavaScript
복사
일반적인 직렬 로직 수행보다 4초가량 빨라진 것을 확인할 수 있습니다.

지연된 함수열을 병렬적으로 평가하기 - C.reduce, C.take[2]

즉시 병렬적으로 처리하기 - C.map, C.filter

즉시, 지연, Promise, 병렬적 조합하기

코드 간단히 정리

Node.js 에서 SQL 병렬 평가로 얻은 효율