diff --git a/README.md b/README.md index a5f100e..97b0e41 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,9 @@ const intersection = computed(_intersection, [a, b], hashSet); # [stream](./src/stream.js) `stream` is a computed that works asynchronously. Dependencies can be synchronous or asynchronous -functions but the hash function remains synchronous. +functions but the hash function remains synchronous. Also calling a stream multiple overlapping +times will return the same promise and result (so long as dependencies do not change in the time +gap). ## Usage diff --git a/package.json b/package.json index f8d72d9..189d5a1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "frptools", - "version": "3.2.0", + "version": "3.2.1", "description": "Observable Property and Computed data streams", "main": "src/index.js", "files": ["src"], diff --git a/spec/stream.spec.js b/spec/stream.spec.js index ca97521..33b44bf 100644 --- a/spec/stream.spec.js +++ b/spec/stream.spec.js @@ -97,4 +97,39 @@ describe('A stream', () => { done(); }, 40); }); + + it('only computes once for overlapping calls', async done => { + let callCount = 0; + async function delayRun(a) { + return new Promise(resolve => + setTimeout(() => { + callCount += 1; + resolve(callCount + a); + }, 10) + ); + } + const a = prop(0); + const b = stream(delayRun, [a]); + + expect(await b()).toEqual(1); + expect(callCount).toEqual(1); + + a(1); + expect(await b()).toEqual(3); + expect(callCount).toEqual(2); + + // Just calling sequentially should not re-evaluate + expect(await b()).toEqual(3); + expect(callCount).toEqual(2); + + // Set b.dirty flag + a(2); + Promise.all([b(), b()]) + .then(([res_1, res_2]) => { + expect(res_1).toEqual(5); + expect(res_2).toEqual(5); + expect(callCount).toEqual(3); + }) + .finally(done); + }); }); diff --git a/src/stream.js b/src/stream.js index a64020f..d39f72d 100644 --- a/src/stream.js +++ b/src/stream.js @@ -5,20 +5,32 @@ export function stream(fn, dependencies = [], hash = id) { let isDirty = true; let val; let oldId; + let currentProcess; // Compute new value, call subscribers if changed. - const accessor = async function _computed() { - if (isDirty) { - const newVal = await fn.apply(null, await Promise.all(dependencies.map(call))); - isDirty = false; - const newId = hash(newVal); - if (oldId !== newId) { - oldId = newId; - val = newVal; - accessor.fire(val); - } + const accessor = function _stream() { + if (!isDirty) { + return Promise.resolve(val); } - return val; + if (!currentProcess) { + currentProcess = Promise.all(dependencies.map(call)) + .then(params => fn.apply(null, params)) + .then(res => { + const newId = hash(res); + isDirty = false; + if (oldId !== newId) { + oldId = newId; + val = res; + accessor.fire(val); + } + return val; + }) + .finally(_ => { + isDirty = false; + currentProcess = null; + }); + } + return currentProcess; }; // Add child nodes to the logic graph (value-based)