import { id, registerFire, registerSubscriptions, call } from './util.js';

/**
 * Create a lazily evaluated, cached, promise-returning node of a dependency graph.
 *
 * Calling the returned accessor resolves with the cached value while the node
 * is clean. While the node is dirty it passes every dependency to `call`,
 * waits for all results, applies `fn` to them and caches the outcome.
 * Subscribers are fired only when `hash(result)` differs from the previous hash.
 *
 * @param {Function} fn - Receives the resolved dependency results as positional
 *   arguments; may return a value or a promise.
 * @param {Array} [dependencies=[]] - Parent nodes. Each must expose
 *   `subscribe(listener)`; each is also passed to `call` to obtain its value
 *   (presumably `call` invokes it — confirm in util.js).
 * @param {Function} [hash=id] - Maps a result to a comparable identity used for
 *   change detection (compared with `!==`).
 * @returns {Function} The accessor, extended with `subscribe`, `fire`,
 *   `setDirty`, `detach` and `unsubscribeAll`.
 */
export function stream(fn, dependencies = [], hash = id) {
  // This exact array instance is handed to registerSubscriptions/registerFire
  // below, so it must only ever be mutated in place, never reassigned.
  const subscribers = [];
  let isDirty = true;
  let val;
  let oldId;
  let currentProcess;

  // Compute new value, call subscribers if changed.
  const accessor = function _stream() {
    if (!isDirty) {
      return Promise.resolve(val);
    }

    // Concurrent callers share the single in-flight computation.
    if (!currentProcess) {
      currentProcess = Promise.all(dependencies.map(call))
        .then(params => fn(...params))
        .then(res => {
          const newId = hash(res);
          isDirty = false;
          if (oldId !== newId) {
            oldId = newId;
            val = res;
            accessor.fire(val);
          }
          return val;
        })
        .finally(() => {
          // Runs on rejection as well: the node is marked clean even when the
          // computation failed, so later calls resolve with the previously
          // cached value until setDirty() is called again.
          isDirty = false;
          currentProcess = null;
        });
    }

    return currentProcess;
  };

  // Add child nodes to the logic graph (value-based)
  accessor.subscribe = registerSubscriptions(subscribers);
  accessor.fire = registerFire(subscribers);

  // Receive dirty flag from parent logic node (dependency). Pass it down.
  // Returns the accessor when there is at least one subscriber, otherwise 0.
  accessor.setDirty = function setDirty() {
    if (!isDirty) {
      isDirty = true;
      subscribers.forEach(s => s.setDirty && s.setDirty());
    }
    return subscribers.length && accessor;
  };

  // Remove this node from the logic graph completely
  accessor.detach = () => {
    // Clear in place: rebinding the variable would leave subscribe/fire
    // operating on the old, still populated array.
    subscribers.length = 0;
    // Each entry is whatever the dependency's subscribe() returned
    // (presumably an unsubscribe callback — confirm against util.js).
    dependentSubscriptions.forEach(call);
  };

  // Remove child nodes from the logic graph
  accessor.unsubscribeAll = () => {
    subscribers.length = 0;
  };

  const dependentSubscriptions = dependencies.map(d => d.subscribe(accessor.setDirty));

  return accessor;
}