Add stream type to frptools.

(prettier got ahold of this commit)
This commit is contained in:
Timothy Farrell 2018-05-31 15:48:18 -05:00
parent cac220a127
commit 4c20e1400e
5 changed files with 179 additions and 3 deletions

View File

@ -1,7 +1,7 @@
# FRP tools
Property, Container and Computed value stores designed to work together for storing discrete and
derived state.
Property, Container, Computed and Stream value stores designed to work together for storing discrete
and derived state.
# [property](./src/property.js)
@ -153,6 +153,30 @@ const b = prop(new Set([2, 3]), hashSet);
const intersection = computed(_intersection, [a, b], hashSet);
```
# [stream](./src/stream.js)
`stream` is a computed that works asynchronously. Dependencies can be synchronous or asynchronous
functions but the hash function remains synchronous.
## Usage
### Creation
```js
const getToken = stream(
async tokenUrl => fetch(tokenUrl), // computation function
[getTokenUrl] // array of dependencies, can be a property, computed or stream
);
```
### Read
```js
if (await getToken()) { /* do stuff with the token */ }
```
Call it to receive the stored value, recomputing if necessary.
# [container](./src/container.js)
`container` is a wrapper around any container type (Object, Set, Map, or Array) while monitoring

View File

@ -1,6 +1,6 @@
{
"name": "frptools",
"version": "3.1.1",
"version": "3.2.0",
"description": "Observable Property and Computed data streams",
"main": "src/index.js",
"files": ["src"],

View File

@ -0,0 +1,100 @@
import { prop, computed, stream } from '../src/index.js';
import { dirtyMock, hashSet } from '../src/testUtil.js';
describe('A stream', () => {
const add = (a, b) => a + b;
const square = a => a * a;
async function delayAdd(a, b) {
return new Promise(resolve =>
setTimeout(() => {
resolve(a + b);
}, 30)
);
}
async function delaySquare(a) {
return new Promise(resolve =>
setTimeout(() => {
resolve(a * a);
}, 30)
);
}
it('accepts prop, computed and stream dependencies', async done => {
const a = prop(0);
const b = computed(square, [a]);
const c = stream(delaySquare, [a]);
const d = stream(delayAdd, [a, c]);
const e = stream(delaySquare, [b]);
expect(await c()).toEqual(0);
expect(await d()).toEqual(0);
expect(await e()).toEqual(0);
a(1);
expect(await c()).toEqual(1);
expect(await d()).toEqual(2);
expect(await e()).toEqual(1);
a(2);
expect(await c()).toEqual(4);
expect(await d()).toEqual(6);
expect(await e()).toEqual(16);
a(3);
expect(await c()).toEqual(9);
expect(await d()).toEqual(12);
expect(await e()).toEqual(81);
done();
});
it('computes automatically when subscribed', async done => {
let runCount = 0;
let subRunCount = 0;
let currentValue = 1;
const a = prop(0);
const b = stream(
async val => {
runCount += 1;
expect(val).toEqual(currentValue);
return new Promise(resolve => setTimeout(() => resolve(val * val), 30));
},
[a]
);
// b does not evaluate
a(1);
expect(runCount).toEqual(0);
// b evaluates
expect(await b()).toEqual(1);
expect(runCount).toEqual(1);
// b does not evaluate
expect(await b()).toEqual(1);
expect(runCount).toEqual(1);
const cancelSubscription = b.subscribe(val => {
subRunCount += 1;
expect(val).toEqual(currentValue * currentValue);
});
currentValue = 3;
// b evaluates
a(3);
// b is triggered to update but hasn't yet
expect(runCount).toEqual(1);
expect(subRunCount).toEqual(0);
setTimeout(async () => {
// b should have updated now
expect(runCount).toEqual(2);
expect(subRunCount).toEqual(1);
// b does not evaluate
expect(await b()).toEqual(9);
expect(runCount).toEqual(2);
expect(subRunCount).toEqual(1);
done();
}, 40);
});
});

View File

@ -1,4 +1,5 @@
export { prop } from './property.js';
export { computed } from './computed.js';
export { stream } from './stream.js';
export { container } from './container.js';
export { call, id, pick } from './util.js';

View File

@ -0,0 +1,51 @@
import { id, registerFire, registerSubscriptions, call } from './util.js';
export function stream(fn, dependencies = [], hash = id) {
let subscribers = [];
let isDirty = true;
let val;
let oldId;
// Compute new value, call subscribers if changed.
const accessor = async function _computed() {
if (isDirty) {
const newVal = await fn.apply(null, await Promise.all(dependencies.map(call)));
isDirty = false;
const newId = hash(newVal);
if (oldId !== newId) {
oldId = newId;
val = newVal;
accessor.fire(val);
}
}
return val;
};
// Add child nodes to the logic graph (value-based)
accessor.subscribe = registerSubscriptions(subscribers);
accessor.fire = registerFire(subscribers);
// Receive dirty flag from parent logic node (dependency). Pass it down.
accessor.setDirty = function setDirty() {
if (!isDirty) {
isDirty = true;
subscribers.forEach(s => s.setDirty && s.setDirty());
}
return subscribers.length && accessor;
};
// Remove this node from the logic graph completely
accessor.detach = () => {
subscribers = [];
dependentSubscriptions.forEach(call);
};
// Remove child nodes from the logic graph
accessor.unsubscribeAll = () => {
subscribers = [];
};
const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty));
return accessor;
}