From 4c20e1400ea2508fc4738b91312d03c2bc3fc87a Mon Sep 17 00:00:00 2001 From: Timothy Farrell Date: Thu, 31 May 2018 15:48:18 -0500 Subject: [PATCH] Add stream type to frptools. (prettier got ahold of this commit) --- packages/frptools/README.md | 28 +++++++- packages/frptools/package.json | 2 +- packages/frptools/spec/stream.spec.js | 100 ++++++++++++++++++++++++++ packages/frptools/src/index.js | 1 + packages/frptools/src/stream.js | 51 +++++++++++++ 5 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 packages/frptools/spec/stream.spec.js create mode 100644 packages/frptools/src/stream.js diff --git a/packages/frptools/README.md b/packages/frptools/README.md index c2cbeb9..a5f100e 100644 --- a/packages/frptools/README.md +++ b/packages/frptools/README.md @@ -1,7 +1,7 @@ # FRP tools -Property, Container and Computed value stores designed to work together for storing discrete and -derived state. +Property, Container, Computed and Stream value stores designed to work together for storing discrete +and derived state. # [property](./src/property.js) @@ -153,6 +153,30 @@ const b = prop(new Set([2, 3]), hashSet); const intersection = computed(_intersection, [a, b], hashSet); ``` +# [stream](./src/stream.js) + +`stream` is a computed that works asynchronously. Dependencies can be synchronous or asynchronous +functions but the hash function remains synchronous. + +## Usage + +### Creation + +```js +const getToken = stream( + async tokenUrl => fetch(tokenUrl), // computation function + [getTokenUrl] // array of dependencies, can be a property, computed or stream +); +``` + +### Read + +```js +if (await getToken()) { /* do stuff with the token */ } +``` + +Call it to receive the stored value, recomputing if necessary. + # [container](./src/container.js) `container` is a wrapper around any container type (Object, Set, Map, or Array) while monitoring diff --git a/packages/frptools/package.json b/packages/frptools/package.json index e5911a5..f8d72d9 100644 --- a/packages/frptools/package.json +++ b/packages/frptools/package.json @@ -1,6 +1,6 @@ { "name": "frptools", - "version": "3.1.1", + "version": "3.2.0", "description": "Observable Property and Computed data streams", "main": "src/index.js", "files": ["src"], diff --git a/packages/frptools/spec/stream.spec.js b/packages/frptools/spec/stream.spec.js new file mode 100644 index 0000000..ca97521 --- /dev/null +++ b/packages/frptools/spec/stream.spec.js @@ -0,0 +1,100 @@ +import { prop, computed, stream } from '../src/index.js'; +import { dirtyMock, hashSet } from '../src/testUtil.js'; + +describe('A stream', () => { + const add = (a, b) => a + b; + const square = a => a * a; + + async function delayAdd(a, b) { + return new Promise(resolve => + setTimeout(() => { + resolve(a + b); + }, 30) + ); + } + async function delaySquare(a) { + return new Promise(resolve => + setTimeout(() => { + resolve(a * a); + }, 30) + ); + } + + it('accepts prop, computed and stream dependencies', async done => { + const a = prop(0); + const b = computed(square, [a]); + const c = stream(delaySquare, [a]); + const d = stream(delayAdd, [a, c]); + const e = stream(delaySquare, [b]); + + expect(await c()).toEqual(0); + expect(await d()).toEqual(0); + expect(await e()).toEqual(0); + + a(1); + expect(await c()).toEqual(1); + expect(await d()).toEqual(2); + expect(await e()).toEqual(1); + + a(2); + expect(await c()).toEqual(4); + expect(await d()).toEqual(6); + expect(await e()).toEqual(16); + + a(3); + expect(await c()).toEqual(9); + expect(await d()).toEqual(12); + expect(await e()).toEqual(81); + done(); + }); + + it('computes automatically when subscribed', async done => { + let runCount = 0; + let subRunCount = 0; + let currentValue = 1; + const a = prop(0); + const b = stream( + async val => { + runCount += 1; + expect(val).toEqual(currentValue); + return new Promise(resolve => setTimeout(() => resolve(val * val), 30)); + }, + [a] + ); + + // b does not evaluate + a(1); + expect(runCount).toEqual(0); + // b evaluates + expect(await b()).toEqual(1); + expect(runCount).toEqual(1); + // b does not evaluate + expect(await b()).toEqual(1); + expect(runCount).toEqual(1); + + const cancelSubscription = b.subscribe(val => { + subRunCount += 1; + expect(val).toEqual(currentValue * currentValue); + }); + + currentValue = 3; + // b evaluates + a(3); + + // b is triggered to update but hasn't yet + expect(runCount).toEqual(1); + expect(subRunCount).toEqual(0); + + setTimeout(async () => { + // b should have updated now + expect(runCount).toEqual(2); + expect(subRunCount).toEqual(1); + + // b does not evaluate + expect(await b()).toEqual(9); + expect(runCount).toEqual(2); + expect(subRunCount).toEqual(1); + done(); + }, 40); + }); +}); diff --git a/packages/frptools/src/index.js b/packages/frptools/src/index.js index db0e58b..a412cb1 100644 --- a/packages/frptools/src/index.js +++ b/packages/frptools/src/index.js @@ -1,4 +1,5 @@ export { prop } from './property.js'; export { computed } from './computed.js'; +export { stream } from './stream.js'; export { container } from './container.js'; export { call, id, pick } from './util.js'; diff --git a/packages/frptools/src/stream.js b/packages/frptools/src/stream.js new file mode 100644 index 0000000..a64020f --- /dev/null +++ b/packages/frptools/src/stream.js @@ -0,0 +1,51 @@ +import { id, registerFire, registerSubscriptions, call } from './util.js'; + +export function stream(fn, dependencies = [], hash = id) { + let subscribers = []; + let isDirty = true; + let val; + let oldId; + + // Compute new value, call subscribers if changed. + const accessor = async function _computed() { + if (isDirty) { + const newVal = await fn.apply(null, await Promise.all(dependencies.map(call))); + isDirty = false; + const newId = hash(newVal); + if (oldId !== newId) { + oldId = newId; + val = newVal; + accessor.fire(val); + } + } + return val; + }; + + // Add child nodes to the logic graph (value-based) + accessor.subscribe = registerSubscriptions(subscribers); + accessor.fire = registerFire(subscribers); + + // Receive dirty flag from parent logic node (dependency). Pass it down. + accessor.setDirty = function setDirty() { + if (!isDirty) { + isDirty = true; + subscribers.forEach(s => s.setDirty && s.setDirty()); + } + return subscribers.length && accessor; + }; + + // Remove this node from the logic graph completely + accessor.detach = () => { + subscribers = []; + dependentSubscriptions.forEach(call); + }; + + // Remove child nodes from the logic graph + accessor.unsubscribeAll = () => { + subscribers = []; + }; + + const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty)); + + return accessor; +}