Add input primitive locking to avoid infiniloops
This commit is contained in:
parent
921b51e2b3
commit
93ded90b97
@ -16,6 +16,10 @@ const getToken = stream(
|
|||||||
);
|
);
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Note: Property and container dependencies are "locked" while the computation function is run. If
|
||||||
|
there are any out-of-band updates to a dependency, the value will be set but will not propagate
|
||||||
|
until after the computation function is complete and the stream's subscribers are notified.
|
||||||
|
|
||||||
### Read
|
### Read
|
||||||
|
|
||||||
```js
|
```js
|
||||||
|
|||||||
@ -202,6 +202,30 @@ describe('A computed', () => {
|
|||||||
expect(checker()).toBe(true);
|
expect(checker()).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('locks subscribers to delay propagating out-of-band changes', () => {
|
||||||
|
let aCount = 0;
|
||||||
|
|
||||||
|
const a = prop(1);
|
||||||
|
a.subscribe(val => {
|
||||||
|
aCount = val;
|
||||||
|
});
|
||||||
|
|
||||||
|
const aDep = computed(
|
||||||
|
() => {
|
||||||
|
expect(aCount).toBe(3);
|
||||||
|
a(2);
|
||||||
|
expect(aCount).toBe(3);
|
||||||
|
},
|
||||||
|
[a]
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(aCount).toBe(0);
|
||||||
|
a(3);
|
||||||
|
expect(aCount).toBe(3);
|
||||||
|
aDep();
|
||||||
|
expect(aCount).toBe(2);
|
||||||
|
});
|
||||||
|
|
||||||
it('calls subscriptions in order', () => {
|
it('calls subscriptions in order', () => {
|
||||||
let order = '';
|
let order = '';
|
||||||
|
|
||||||
|
|||||||
@ -86,6 +86,47 @@ describe('A container', () => {
|
|||||||
expect(checker()).toBe(true);
|
expect(checker()).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('can be locked to delay change propagation', () => {
|
||||||
|
let aVal = 0;
|
||||||
|
const a = container([0], c => c[0]);
|
||||||
|
a.subscribe(val => {
|
||||||
|
aVal = val[0];
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(aVal).toBe(0);
|
||||||
|
a[0] = 1;
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._lock();
|
||||||
|
a[0] = 2;
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a[0] = 3;
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._unlock();
|
||||||
|
expect(aVal).toBe(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('requires the same number of unlocks as locks to resume change propagation', () => {
|
||||||
|
let aVal = 0;
|
||||||
|
const a = container([0], c => c[0]);
|
||||||
|
a.subscribe(val => {
|
||||||
|
aVal = val[0];
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(aVal).toBe(0);
|
||||||
|
a[0] = 1;
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._lock();
|
||||||
|
a._lock();
|
||||||
|
a[0] = 2;
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a[0] = 3;
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._unlock();
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._unlock();
|
||||||
|
expect(aVal).toBe(3);
|
||||||
|
});
|
||||||
|
|
||||||
it('calls subscriptions in order', () => {
|
it('calls subscriptions in order', () => {
|
||||||
let order = '';
|
let order = '';
|
||||||
|
|
||||||
|
|||||||
@ -91,6 +91,47 @@ describe('A property', () => {
|
|||||||
expect(checker()).toBe(true);
|
expect(checker()).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('can be locked to delay change propagation', () => {
|
||||||
|
let aVal = 0;
|
||||||
|
const a = prop(0);
|
||||||
|
a.subscribe(val => {
|
||||||
|
aVal = val;
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(aVal).toBe(0);
|
||||||
|
a(1);
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._lock();
|
||||||
|
a(2);
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a(3);
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._unlock();
|
||||||
|
expect(aVal).toBe(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('requires the same number of unlocks as locks to resume change propagation', () => {
|
||||||
|
let aVal = 0;
|
||||||
|
const a = prop(0);
|
||||||
|
a.subscribe(val => {
|
||||||
|
aVal = val;
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(aVal).toBe(0);
|
||||||
|
a(1);
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._lock();
|
||||||
|
a._lock();
|
||||||
|
a(2);
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a(3);
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._unlock();
|
||||||
|
expect(aVal).toBe(1);
|
||||||
|
a._unlock();
|
||||||
|
expect(aVal).toBe(3);
|
||||||
|
});
|
||||||
|
|
||||||
it('calls subscriptions in order', () => {
|
it('calls subscriptions in order', () => {
|
||||||
let order = '';
|
let order = '';
|
||||||
|
|
||||||
|
|||||||
@ -125,12 +125,34 @@ describe('A stream', () => {
|
|||||||
|
|
||||||
// Set b.dirty flag
|
// Set b.dirty flag
|
||||||
a(2);
|
a(2);
|
||||||
return Promise.all([b(), b()])
|
return Promise.all([b(), b()]).then(([res_1, res_2]) => {
|
||||||
.then(([res_1, res_2]) => {
|
|
||||||
expect(res_1).toEqual(5);
|
expect(res_1).toEqual(5);
|
||||||
expect(res_2).toEqual(5);
|
expect(res_2).toEqual(5);
|
||||||
expect(callCount).toEqual(3);
|
expect(callCount).toEqual(3);
|
||||||
})
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('locks subscribers to delay propagating out-of-band changes', async () => {
|
||||||
|
let aCount = 0;
|
||||||
|
|
||||||
|
const a = prop(1);
|
||||||
|
a.subscribe(val => {
|
||||||
|
aCount = val;
|
||||||
|
});
|
||||||
|
|
||||||
|
const aDep = stream(
|
||||||
|
async () => {
|
||||||
|
expect(aCount).toBe(3);
|
||||||
|
a(2);
|
||||||
|
expect(aCount).toBe(3);
|
||||||
|
},
|
||||||
|
[a]
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(aCount).toBe(0);
|
||||||
|
a(3);
|
||||||
|
expect(aCount).toBe(3);
|
||||||
|
await aDep();
|
||||||
|
expect(aCount).toBe(2);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -5,31 +5,34 @@ export const hashableComputed = hash => (fn, dependencies = []) => {
|
|||||||
let isDirty = true;
|
let isDirty = true;
|
||||||
let val;
|
let val;
|
||||||
let oldId;
|
let oldId;
|
||||||
|
const params = dependencies.map(d => (d._lock ? d._lock : d));
|
||||||
|
const unlockableDeps = dependencies.map(d => d._unlock).filter(id);
|
||||||
|
|
||||||
// Compute new value, call subscribers if changed.
|
// Compute new value, call subscribers if changed.
|
||||||
const accessor = function _computed() {
|
const accessor = function _computed() {
|
||||||
if (isDirty) {
|
if (isDirty) {
|
||||||
const newVal = fn.apply(null, dependencies.map(call));
|
const newVal = fn.apply(null, params.map(call));
|
||||||
isDirty = false;
|
isDirty = false;
|
||||||
const newId = hash(newVal);
|
const newId = hash(newVal);
|
||||||
if (oldId !== newId) {
|
if (oldId !== newId) {
|
||||||
oldId = newId;
|
oldId = newId;
|
||||||
val = newVal;
|
val = newVal;
|
||||||
accessor.fire(val);
|
accessor._fire(val);
|
||||||
}
|
}
|
||||||
|
unlockableDeps.forEach(call);
|
||||||
}
|
}
|
||||||
return val;
|
return val;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Add child nodes to the logic graph (value-based)
|
// Add child nodes to the logic graph (value-based)
|
||||||
accessor.subscribe = registerSubscriptions(subscribers);
|
accessor.subscribe = registerSubscriptions(subscribers);
|
||||||
accessor.fire = registerFire(subscribers);
|
accessor._fire = registerFire(subscribers);
|
||||||
|
|
||||||
// Receive dirty flag from parent logic node (dependency). Pass it down.
|
// Receive dirty flag from parent logic node (dependency). Pass it down.
|
||||||
accessor.setDirty = function setDirty() {
|
accessor._setDirty = function setDirty() {
|
||||||
if (!isDirty) {
|
if (!isDirty) {
|
||||||
isDirty = true;
|
isDirty = true;
|
||||||
subscribers.forEach(s => s.setDirty && s.setDirty());
|
subscribers.forEach(s => s._setDirty && s._setDirty());
|
||||||
}
|
}
|
||||||
return subscribers.length && accessor;
|
return subscribers.length && accessor;
|
||||||
};
|
};
|
||||||
@ -45,7 +48,7 @@ export const hashableComputed = hash => (fn, dependencies = []) => {
|
|||||||
subscribers = [];
|
subscribers = [];
|
||||||
};
|
};
|
||||||
|
|
||||||
const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty));
|
const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor._setDirty));
|
||||||
|
|
||||||
return accessor;
|
return accessor;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -3,20 +3,33 @@ import { registerSubscriptions, registerFire } from './util.js';
|
|||||||
export const hashableContainer = hash => store => {
|
export const hashableContainer = hash => store => {
|
||||||
let subscribers = [];
|
let subscribers = [];
|
||||||
let id = hash && hash(store);
|
let id = hash && hash(store);
|
||||||
|
let lockCount = 0;
|
||||||
|
|
||||||
const containerMethods = {
|
const containerMethods = {
|
||||||
subscribe: registerSubscriptions(subscribers),
|
subscribe: registerSubscriptions(subscribers),
|
||||||
fire: registerFire(subscribers),
|
_fire: registerFire(subscribers),
|
||||||
unsubscribeAll: () => {
|
unsubscribeAll: () => {
|
||||||
subscribers = [];
|
subscribers = [];
|
||||||
|
},
|
||||||
|
_lock: () => {
|
||||||
|
lockCount += 1;
|
||||||
|
return p;
|
||||||
|
},
|
||||||
|
_unlock: () => {
|
||||||
|
if (lockCount && --lockCount === 0) {
|
||||||
|
checkUpdate(store);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
function checkUpdate(target) {
|
function checkUpdate(target) {
|
||||||
|
if (lockCount) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
let newId = hash && hash(target);
|
let newId = hash && hash(target);
|
||||||
if (!hash || id !== newId) {
|
if (!hash || id !== newId) {
|
||||||
id = newId;
|
id = newId;
|
||||||
containerMethods.fire(target);
|
containerMethods._fire(target);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -3,19 +3,31 @@ import { id, registerSubscriptions, registerFire } from './util.js';
|
|||||||
export const hashableProperty = hash => store => {
|
export const hashableProperty = hash => store => {
|
||||||
let subscribers = [];
|
let subscribers = [];
|
||||||
let oldId = hash(store);
|
let oldId = hash(store);
|
||||||
|
let lockCount = 0;
|
||||||
|
|
||||||
const accessor = function _prop(newVal) {
|
const accessor = function _prop(newVal) {
|
||||||
const newId = hash(newVal);
|
const newId = hash(newVal);
|
||||||
if (newVal !== undefined && oldId !== newId) {
|
if (newVal !== undefined && oldId !== newId) {
|
||||||
oldId = newId;
|
|
||||||
store = newVal;
|
store = newVal;
|
||||||
accessor.fire(store);
|
if (!lockCount) {
|
||||||
|
oldId = newId;
|
||||||
|
accessor._fire(store);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return store;
|
return store;
|
||||||
};
|
};
|
||||||
accessor.subscribe = registerSubscriptions(subscribers);
|
accessor.subscribe = registerSubscriptions(subscribers);
|
||||||
accessor.fire = registerFire(subscribers);
|
|
||||||
accessor.unsubscribeAll = () => (subscribers = []);
|
accessor.unsubscribeAll = () => (subscribers = []);
|
||||||
|
accessor._fire = registerFire(subscribers);
|
||||||
|
accessor._lock = () => {
|
||||||
|
lockCount += 1;
|
||||||
|
return accessor();
|
||||||
|
};
|
||||||
|
accessor._unlock = () => {
|
||||||
|
if (lockCount && --lockCount === 0) {
|
||||||
|
accessor(store);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
return accessor;
|
return accessor;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -5,6 +5,8 @@ export const hashableStream = hash => (fn, dependencies = []) => {
|
|||||||
let isDirty = true;
|
let isDirty = true;
|
||||||
let val;
|
let val;
|
||||||
let oldId;
|
let oldId;
|
||||||
|
const params = dependencies.map(d => (d._lock ? d._lock : d));
|
||||||
|
const unlockableDeps = dependencies.map(d => d._unlock).filter(id);
|
||||||
let currentProcess;
|
let currentProcess;
|
||||||
|
|
||||||
// Compute new value, call subscribers if changed.
|
// Compute new value, call subscribers if changed.
|
||||||
@ -13,7 +15,7 @@ export const hashableStream = hash => (fn, dependencies = []) => {
|
|||||||
return Promise.resolve(val);
|
return Promise.resolve(val);
|
||||||
}
|
}
|
||||||
if (!currentProcess) {
|
if (!currentProcess) {
|
||||||
currentProcess = Promise.all(dependencies.map(call))
|
currentProcess = Promise.all(params.map(call))
|
||||||
.then(params => fn.apply(null, params))
|
.then(params => fn.apply(null, params))
|
||||||
.then(res => {
|
.then(res => {
|
||||||
const newId = hash(res);
|
const newId = hash(res);
|
||||||
@ -21,8 +23,9 @@ export const hashableStream = hash => (fn, dependencies = []) => {
|
|||||||
if (oldId !== newId) {
|
if (oldId !== newId) {
|
||||||
oldId = newId;
|
oldId = newId;
|
||||||
val = res;
|
val = res;
|
||||||
accessor.fire(val);
|
accessor._fire(val);
|
||||||
}
|
}
|
||||||
|
unlockableDeps.forEach(call);
|
||||||
return val;
|
return val;
|
||||||
})
|
})
|
||||||
.finally(_ => {
|
.finally(_ => {
|
||||||
@ -35,13 +38,13 @@ export const hashableStream = hash => (fn, dependencies = []) => {
|
|||||||
|
|
||||||
// Add child nodes to the logic graph (value-based)
|
// Add child nodes to the logic graph (value-based)
|
||||||
accessor.subscribe = registerSubscriptions(subscribers);
|
accessor.subscribe = registerSubscriptions(subscribers);
|
||||||
accessor.fire = registerFire(subscribers);
|
accessor._fire = registerFire(subscribers);
|
||||||
|
|
||||||
// Receive dirty flag from parent logic node (dependency). Pass it down.
|
// Receive dirty flag from parent logic node (dependency). Pass it down.
|
||||||
accessor.setDirty = function setDirty() {
|
accessor._setDirty = function setDirty() {
|
||||||
if (!isDirty) {
|
if (!isDirty) {
|
||||||
isDirty = true;
|
isDirty = true;
|
||||||
subscribers.forEach(s => s.setDirty && s.setDirty());
|
subscribers.forEach(s => s._setDirty && s._setDirty());
|
||||||
}
|
}
|
||||||
return subscribers.length && accessor;
|
return subscribers.length && accessor;
|
||||||
};
|
};
|
||||||
@ -57,7 +60,7 @@ export const hashableStream = hash => (fn, dependencies = []) => {
|
|||||||
subscribers = [];
|
subscribers = [];
|
||||||
};
|
};
|
||||||
|
|
||||||
const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty));
|
const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor._setDirty));
|
||||||
|
|
||||||
return accessor;
|
return accessor;
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user