Add stream type to frptools.
(prettier got ahold of this commit)
This commit is contained in:
parent
b2de97f2b3
commit
5851d9c4fe
28
README.md
28
README.md
@ -1,7 +1,7 @@
|
||||
# FRP tools
|
||||
|
||||
Property, Container and Computed value stores designed to work together for storing discrete and
|
||||
derived state.
|
||||
Property, Container, Computed and Stream value stores designed to work together for storing discrete
|
||||
and derived state.
|
||||
|
||||
# [property](./src/property.js)
|
||||
|
||||
@ -153,6 +153,30 @@ const b = prop(new Set([2, 3]), hashSet);
|
||||
const intersection = computed(_intersection, [a, b], hashSet);
|
||||
```
|
||||
|
||||
# [stream](./src/stream.js)
|
||||
|
||||
`stream` is a computed that works asynchronously. Dependencies can be synchronous or asynchronous
|
||||
functions but the hash function remains synchronous.
|
||||
|
||||
## Usage
|
||||
|
||||
### Creation
|
||||
|
||||
```js
|
||||
const getToken = stream(
|
||||
async tokenUrl => fetch(tokenUrl), // computation function
|
||||
[getTokenUrl] // array of dependencies, can be a property, computed or stream
|
||||
);
|
||||
```
|
||||
|
||||
### Read
|
||||
|
||||
```js
|
||||
if (await getToken()) { /* do stuff with the token */ }
|
||||
```
|
||||
|
||||
Call it to receive the stored value, recomputing if necessary.
|
||||
|
||||
# [container](./src/container.js)
|
||||
|
||||
`container` is a wrapper around any container type (Object, Set, Map, or Array) while monitoring
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "frptools",
|
||||
"version": "3.1.1",
|
||||
"version": "3.2.0",
|
||||
"description": "Observable Property and Computed data streams",
|
||||
"main": "src/index.js",
|
||||
"files": ["src"],
|
||||
|
||||
100
spec/stream.spec.js
Normal file
100
spec/stream.spec.js
Normal file
@ -0,0 +1,100 @@
|
||||
import { prop, computed, stream } from '../src/index.js';
|
||||
import { dirtyMock, hashSet } from '../src/testUtil.js';
|
||||
|
||||
describe('A stream', () => {
|
||||
const add = (a, b) => a + b;
|
||||
const square = a => a * a;
|
||||
|
||||
async function delayAdd(a, b) {
|
||||
return new Promise(resolve =>
|
||||
setTimeout(() => {
|
||||
resolve(a + b);
|
||||
}, 30)
|
||||
);
|
||||
}
|
||||
async function delaySquare(a) {
|
||||
return new Promise(resolve =>
|
||||
setTimeout(() => {
|
||||
resolve(a * a);
|
||||
}, 30)
|
||||
);
|
||||
}
|
||||
|
||||
it('accepts prop, computed and stream dependencies', async done => {
|
||||
const a = prop(0);
|
||||
const b = computed(square, [a]);
|
||||
const c = stream(delaySquare, [a]);
|
||||
const d = stream(delayAdd, [a, c]);
|
||||
const e = stream(delaySquare, [b]);
|
||||
|
||||
expect(await c()).toEqual(0);
|
||||
expect(await d()).toEqual(0);
|
||||
expect(await e()).toEqual(0);
|
||||
|
||||
a(1);
|
||||
expect(await c()).toEqual(1);
|
||||
expect(await d()).toEqual(2);
|
||||
expect(await e()).toEqual(1);
|
||||
|
||||
a(2);
|
||||
expect(await c()).toEqual(4);
|
||||
expect(await d()).toEqual(6);
|
||||
expect(await e()).toEqual(16);
|
||||
|
||||
a(3);
|
||||
expect(await c()).toEqual(9);
|
||||
expect(await d()).toEqual(12);
|
||||
expect(await e()).toEqual(81);
|
||||
done();
|
||||
});
|
||||
|
||||
it('computes automatically when subscribed', async done => {
|
||||
let runCount = 0;
|
||||
let subRunCount = 0;
|
||||
let currentValue = 1;
|
||||
const a = prop(0);
|
||||
const b = stream(
|
||||
async val => {
|
||||
runCount += 1;
|
||||
expect(val).toEqual(currentValue);
|
||||
return new Promise(resolve => setTimeout(() => resolve(val * val), 30));
|
||||
},
|
||||
[a]
|
||||
);
|
||||
|
||||
// b does not evaluate
|
||||
a(1);
|
||||
expect(runCount).toEqual(0);
|
||||
// b evaluates
|
||||
expect(await b()).toEqual(1);
|
||||
expect(runCount).toEqual(1);
|
||||
// b does not evaluate
|
||||
expect(await b()).toEqual(1);
|
||||
expect(runCount).toEqual(1);
|
||||
|
||||
const cancelSubscription = b.subscribe(val => {
|
||||
subRunCount += 1;
|
||||
expect(val).toEqual(currentValue * currentValue);
|
||||
});
|
||||
|
||||
currentValue = 3;
|
||||
// b evaluates
|
||||
a(3);
|
||||
|
||||
// b is triggered to update but hasn't yet
|
||||
expect(runCount).toEqual(1);
|
||||
expect(subRunCount).toEqual(0);
|
||||
|
||||
setTimeout(async () => {
|
||||
// b should have updated now
|
||||
expect(runCount).toEqual(2);
|
||||
expect(subRunCount).toEqual(1);
|
||||
|
||||
// b does not evaluate
|
||||
expect(await b()).toEqual(9);
|
||||
expect(runCount).toEqual(2);
|
||||
expect(subRunCount).toEqual(1);
|
||||
done();
|
||||
}, 40);
|
||||
});
|
||||
});
|
||||
@ -1,4 +1,5 @@
|
||||
export { prop } from './property.js';
|
||||
export { computed } from './computed.js';
|
||||
export { stream } from './stream.js';
|
||||
export { container } from './container.js';
|
||||
export { call, id, pick } from './util.js';
|
||||
|
||||
51
src/stream.js
Normal file
51
src/stream.js
Normal file
@ -0,0 +1,51 @@
|
||||
import { id, registerFire, registerSubscriptions, call } from './util.js';
|
||||
|
||||
export function stream(fn, dependencies = [], hash = id) {
|
||||
let subscribers = [];
|
||||
let isDirty = true;
|
||||
let val;
|
||||
let oldId;
|
||||
|
||||
// Compute new value, call subscribers if changed.
|
||||
const accessor = async function _computed() {
|
||||
if (isDirty) {
|
||||
const newVal = await fn.apply(null, await Promise.all(dependencies.map(call)));
|
||||
isDirty = false;
|
||||
const newId = hash(newVal);
|
||||
if (oldId !== newId) {
|
||||
oldId = newId;
|
||||
val = newVal;
|
||||
accessor.fire(val);
|
||||
}
|
||||
}
|
||||
return val;
|
||||
};
|
||||
|
||||
// Add child nodes to the logic graph (value-based)
|
||||
accessor.subscribe = registerSubscriptions(subscribers);
|
||||
accessor.fire = registerFire(subscribers);
|
||||
|
||||
// Receive dirty flag from parent logic node (dependency). Pass it down.
|
||||
accessor.setDirty = function setDirty() {
|
||||
if (!isDirty) {
|
||||
isDirty = true;
|
||||
subscribers.forEach(s => s.setDirty && s.setDirty());
|
||||
}
|
||||
return subscribers.length && accessor;
|
||||
};
|
||||
|
||||
// Remove this node from the logic graph completely
|
||||
accessor.detach = () => {
|
||||
subscribers = [];
|
||||
dependentSubscriptions.forEach(call);
|
||||
};
|
||||
|
||||
// Remove child nodes from the logic graph
|
||||
accessor.unsubscribeAll = () => {
|
||||
subscribers = [];
|
||||
};
|
||||
|
||||
const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty));
|
||||
|
||||
return accessor;
|
||||
}
|
||||
Reference in New Issue
Block a user