diff --git a/docs/stream.md b/docs/stream.md index eff6d4a..abc6eab 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -16,6 +16,10 @@ const getToken = stream( ); ``` +Note: Property and container dependencies are "locked" while the computation function is run. If +there are any out-of-band updates to a dependency, the value will be set but will not propagate +until after the computation function is complete and the stream's subscribers are notified. + ### Read ```js diff --git a/spec/computed.spec.js b/spec/computed.spec.js index 264eacf..f7c1724 100644 --- a/spec/computed.spec.js +++ b/spec/computed.spec.js @@ -202,6 +202,30 @@ describe('A computed', () => { expect(checker()).toBe(true); }); + it('locks subscribers to delay propagating out-of-band changes', () => { + let aCount = 0; + + const a = prop(1); + a.subscribe(val => { + aCount = val; + }); + + const aDep = computed( + () => { + expect(aCount).toBe(3); + a(2); + expect(aCount).toBe(3); + }, + [a] + ); + + expect(aCount).toBe(0); + a(3); + expect(aCount).toBe(3); + aDep(); + expect(aCount).toBe(2); + }); + it('calls subscriptions in order', () => { let order = ''; diff --git a/spec/container.spec.js b/spec/container.spec.js index 9ce055a..89160a5 100644 --- a/spec/container.spec.js +++ b/spec/container.spec.js @@ -86,6 +86,47 @@ describe('A container', () => { expect(checker()).toBe(true); }); + it('can be locked to delay change propagation', () => { + let aVal = 0; + const a = container([0], c => c[0]); + a.subscribe(val => { + aVal = val[0]; + }); + + expect(aVal).toBe(0); + a[0] = 1; + expect(aVal).toBe(1); + a._lock(); + a[0] = 2; + expect(aVal).toBe(1); + a[0] = 3; + expect(aVal).toBe(1); + a._unlock(); + expect(aVal).toBe(3); + }); + + it('requires the same number of unlocks as locks to resume change propagation', () => { + let aVal = 0; + const a = container([0], c => c[0]); + a.subscribe(val => { + aVal = val[0]; + }); + + expect(aVal).toBe(0); + a[0] = 1; + expect(aVal).toBe(1); + a._lock(); + a._lock(); + a[0] = 2; + expect(aVal).toBe(1); + a[0] = 3; + expect(aVal).toBe(1); + a._unlock(); + expect(aVal).toBe(1); + a._unlock(); + expect(aVal).toBe(3); + }); + it('calls subscriptions in order', () => { let order = ''; diff --git a/spec/property.spec.js b/spec/property.spec.js index 0f2cd4f..33aa844 100644 --- a/spec/property.spec.js +++ b/spec/property.spec.js @@ -91,6 +91,47 @@ describe('A property', () => { expect(checker()).toBe(true); }); + it('can be locked to delay change propagation', () => { + let aVal = 0; + const a = prop(0); + a.subscribe(val => { + aVal = val; + }); + + expect(aVal).toBe(0); + a(1); + expect(aVal).toBe(1); + a._lock(); + a(2); + expect(aVal).toBe(1); + a(3); + expect(aVal).toBe(1); + a._unlock(); + expect(aVal).toBe(3); + }); + + it('requires the same number of unlocks as locks to resume change propagation', () => { + let aVal = 0; + const a = prop(0); + a.subscribe(val => { + aVal = val; + }); + + expect(aVal).toBe(0); + a(1); + expect(aVal).toBe(1); + a._lock(); + a._lock(); + a(2); + expect(aVal).toBe(1); + a(3); + expect(aVal).toBe(1); + a._unlock(); + expect(aVal).toBe(1); + a._unlock(); + expect(aVal).toBe(3); + }); + it('calls subscriptions in order', () => { let order = ''; diff --git a/spec/stream.spec.js b/spec/stream.spec.js index ab9d845..bfcbc02 100644 --- a/spec/stream.spec.js +++ b/spec/stream.spec.js @@ -125,12 +125,34 @@ describe('A stream', () => { // Set b.dirty flag a(2); - return Promise.all([b(), b()]) - .then(([res_1, res_2]) => { - expect(res_1).toEqual(5); - expect(res_2).toEqual(5); - expect(callCount).toEqual(3); - }) + return Promise.all([b(), b()]).then(([res_1, res_2]) => { + expect(res_1).toEqual(5); + expect(res_2).toEqual(5); + expect(callCount).toEqual(3); + }); + }); + + it('locks subscribers to delay propagating out-of-band changes', async () => { + let aCount = 0; + + const a = prop(1); + a.subscribe(val => { + aCount = val; + }); + + const aDep = stream( + async () => { + expect(aCount).toBe(3); + a(2); + expect(aCount).toBe(3); + }, + [a] + ); + + expect(aCount).toBe(0); + a(3); + expect(aCount).toBe(3); + await aDep(); + expect(aCount).toBe(2); }); }); - diff --git a/src/computed.js b/src/computed.js index 8bd6965..0e51bce 100644 --- a/src/computed.js +++ b/src/computed.js @@ -5,31 +5,34 @@ export const hashableComputed = hash => (fn, dependencies = []) => { let isDirty = true; let val; let oldId; + const params = dependencies.map(d => (d._lock ? d._lock : d)); + const unlockableDeps = dependencies.map(d => d._unlock).filter(id); // Compute new value, call subscribers if changed. const accessor = function _computed() { if (isDirty) { - const newVal = fn.apply(null, dependencies.map(call)); + const newVal = fn.apply(null, params.map(call)); isDirty = false; const newId = hash(newVal); if (oldId !== newId) { oldId = newId; val = newVal; - accessor.fire(val); + accessor._fire(val); } + unlockableDeps.forEach(call); } return val; }; // Add child nodes to the logic graph (value-based) accessor.subscribe = registerSubscriptions(subscribers); - accessor.fire = registerFire(subscribers); + accessor._fire = registerFire(subscribers); // Receive dirty flag from parent logic node (dependency). Pass it down. - accessor.setDirty = function setDirty() { + accessor._setDirty = function setDirty() { if (!isDirty) { isDirty = true; - subscribers.forEach(s => s.setDirty && s.setDirty()); + subscribers.forEach(s => s._setDirty && s._setDirty()); } return subscribers.length && accessor; }; @@ -45,7 +48,7 @@ export const hashableComputed = hash => (fn, dependencies = []) => { subscribers = []; }; - const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty)); + const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor._setDirty)); return accessor; }; diff --git a/src/container.js b/src/container.js index af348dd..402ca94 100644 --- a/src/container.js +++ b/src/container.js @@ -3,20 +3,33 @@ import { registerSubscriptions, registerFire } from './util.js'; export const hashableContainer = hash => store => { let subscribers = []; let id = hash && hash(store); + let lockCount = 0; const containerMethods = { subscribe: registerSubscriptions(subscribers), - fire: registerFire(subscribers), + _fire: registerFire(subscribers), unsubscribeAll: () => { subscribers = []; + }, + _lock: () => { + lockCount += 1; + return p; + }, + _unlock: () => { + if (lockCount && --lockCount === 0) { + checkUpdate(store); + } } }; function checkUpdate(target) { + if (lockCount) { + return; + } let newId = hash && hash(target); if (!hash || id !== newId) { id = newId; - containerMethods.fire(target); + containerMethods._fire(target); } } diff --git a/src/property.js b/src/property.js index d9fd808..71416e0 100644 --- a/src/property.js +++ b/src/property.js @@ -3,19 +3,31 @@ import { id, registerSubscriptions, registerFire } from './util.js'; export const hashableProperty = hash => store => { let subscribers = []; let oldId = hash(store); + let lockCount = 0; const accessor = function _prop(newVal) { const newId = hash(newVal); if (newVal !== undefined && oldId !== newId) { - oldId = newId; store = newVal; - accessor.fire(store); + if (!lockCount) { + oldId = newId; + accessor._fire(store); + } } return store; }; accessor.subscribe = registerSubscriptions(subscribers); - accessor.fire = registerFire(subscribers); accessor.unsubscribeAll = () => (subscribers = []); + accessor._fire = registerFire(subscribers); + accessor._lock = () => { + lockCount += 1; + return accessor(); + }; + accessor._unlock = () => { + if (lockCount && --lockCount === 0) { + accessor(store); + } + }; return accessor; }; diff --git a/src/stream.js b/src/stream.js index 65b61f6..2e071cc 100644 --- a/src/stream.js +++ b/src/stream.js @@ -5,6 +5,8 @@ export const hashableStream = hash => (fn, dependencies = []) => { let isDirty = true; let val; let oldId; + const params = dependencies.map(d => (d._lock ? d._lock : d)); + const unlockableDeps = dependencies.map(d => d._unlock).filter(id); let currentProcess; // Compute new value, call subscribers if changed. @@ -13,7 +15,7 @@ export const hashableStream = hash => (fn, dependencies = []) => { return Promise.resolve(val); } if (!currentProcess) { - currentProcess = Promise.all(dependencies.map(call)) + currentProcess = Promise.all(params.map(call)) .then(params => fn.apply(null, params)) .then(res => { const newId = hash(res); @@ -21,8 +23,9 @@ export const hashableStream = hash => (fn, dependencies = []) => { if (oldId !== newId) { oldId = newId; val = res; - accessor.fire(val); + accessor._fire(val); } + unlockableDeps.forEach(call); return val; }) .finally(_ => { @@ -35,13 +38,13 @@ export const hashableStream = hash => (fn, dependencies = []) => { // Add child nodes to the logic graph (value-based) accessor.subscribe = registerSubscriptions(subscribers); - accessor.fire = registerFire(subscribers); + accessor._fire = registerFire(subscribers); // Receive dirty flag from parent logic node (dependency). Pass it down. - accessor.setDirty = function setDirty() { + accessor._setDirty = function setDirty() { if (!isDirty) { isDirty = true; - subscribers.forEach(s => s.setDirty && s.setDirty()); + subscribers.forEach(s => s._setDirty && s._setDirty()); } return subscribers.length && accessor; }; @@ -57,7 +60,7 @@ export const hashableStream = hash => (fn, dependencies = []) => { subscribers = []; }; - const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty)); + const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor._setDirty)); return accessor; };