%PDF- <> %âãÏÓ endobj 2 0 obj <> endobj 3 0 obj <>/ExtGState<>/ProcSet[/PDF/Text/ImageB/ImageC/ImageI] >>/Annots[ 28 0 R 29 0 R] /MediaBox[ 0 0 595.5 842.25] /Contents 4 0 R/Group<>/Tabs/S>> endobj ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµùÕ5sLOšuY>endobj 2 0 obj<>endobj 2 0 obj<>endobj 2 0 obj<>endobj 2 0 obj<> endobj 2 0 obj<>endobj 2 0 obj<>es 3 0 R>> endobj 2 0 obj<> ox[ 0.000000 0.000000 609.600000 935.600000]/Fi endobj 3 0 obj<> endobj 7 1 obj<>/ProcSet[/PDF/Text/ImageB/ImageC/ImageI]>>/Subtype/Form>> stream
open Wonka_types; open Wonka_helpers; type bufferStateT('a) = { mutable buffer: Rebel.MutableQueue.t('a), mutable sourceTalkback: (. talkbackT) => unit, mutable notifierTalkback: (. talkbackT) => unit, mutable pulled: bool, mutable ended: bool, }; [@genType] let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) => curry(source => curry(sink => { let state = { buffer: Rebel.MutableQueue.make(), sourceTalkback: talkbackPlaceholder, notifierTalkback: talkbackPlaceholder, pulled: false, ended: false, }; source((. signal) => { switch (signal) { | Start(tb) => state.sourceTalkback = tb; notifier((. signal) => { switch (signal) { | Start(tb) => state.notifierTalkback = tb | Push(_) when !state.ended => if (Rebel.MutableQueue.size(state.buffer) > 0) { let buffer = state.buffer; state.buffer = Rebel.MutableQueue.make(); sink(. Push(Rebel.MutableQueue.toArray(buffer))); } | Push(_) => () | End when !state.ended => state.ended = true; state.sourceTalkback(. Close); if (Rebel.MutableQueue.size(state.buffer) > 0) { sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); }; sink(. End); | End => () }; (); }); | Push(value) when !state.ended => Rebel.MutableQueue.add(state.buffer, value); if (!state.pulled) { state.pulled = true; state.sourceTalkback(. Pull); state.notifierTalkback(. Pull); } else { state.pulled = false; }; | Push(_) => () | End when !state.ended => state.ended = true; state.notifierTalkback(. Close); if (Rebel.MutableQueue.size(state.buffer) > 0) { sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); }; sink(. End); | End => () }; (); }); sink(. Start( (. signal) => if (!state.ended) { switch (signal) { | Close => state.ended = true; state.sourceTalkback(. Close); state.notifierTalkback(. Close); | Pull when !state.pulled => state.pulled = true; state.sourceTalkback(. Pull); state.notifierTalkback(. Pull); | Pull => () }; }, ), ); }) ); type combineStateT('a, 'b) = { mutable talkbackA: (. talkbackT) => unit, mutable talkbackB: (. talkbackT) => unit, mutable lastValA: option('a), mutable lastValB: option('b), mutable gotSignal: bool, mutable endCounter: int, mutable ended: bool, }; [@genType] let combine = (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) => curry(sink => { let state = { talkbackA: talkbackPlaceholder, talkbackB: talkbackPlaceholder, lastValA: None, lastValB: None, gotSignal: false, endCounter: 0, ended: false, }; sourceA((. signal) => switch (signal, state.lastValB) { | (Start(tb), _) => state.talkbackA = tb | (Push(a), None) => state.lastValA = Some(a); if (!state.gotSignal) { state.talkbackB(. Pull); } else { state.gotSignal = false; }; | (Push(a), Some(b)) when !state.ended => state.lastValA = Some(a); state.gotSignal = false; sink(. Push((a, b))); | (End, _) when state.endCounter < 1 => state.endCounter = state.endCounter + 1 | (End, _) when !state.ended => state.ended = true; sink(. End); | _ => () } ); sourceB((. signal) => switch (signal, state.lastValA) { | (Start(tb), _) => state.talkbackB = tb | (Push(b), None) => state.lastValB = Some(b); if (!state.gotSignal) { state.talkbackA(. Pull); } else { state.gotSignal = false; }; | (Push(b), Some(a)) when !state.ended => state.lastValB = Some(b); state.gotSignal = false; sink(. Push((a, b))); | (End, _) when state.endCounter < 1 => state.endCounter = state.endCounter + 1 | (End, _) when !state.ended => state.ended = true; sink(. End); | _ => () } ); sink(. Start( (. signal) => if (!state.ended) { switch (signal) { | Close => state.ended = true; state.talkbackA(. Close); state.talkbackB(. Close); | Pull when !state.gotSignal => state.gotSignal = true; state.talkbackA(. signal); state.talkbackB(. signal); | Pull => () }; }, ), ); }); type concatMapStateT('a) = { inputQueue: Rebel.MutableQueue.t('a), mutable outerTalkback: (. talkbackT) => unit, mutable outerPulled: bool, mutable innerTalkback: (. talkbackT) => unit, mutable innerActive: bool, mutable innerPulled: bool, mutable ended: bool, }; [@genType] let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => curry(source => curry(sink => { let state: concatMapStateT('a) = { inputQueue: Rebel.MutableQueue.make(), outerTalkback: talkbackPlaceholder, outerPulled: false, innerTalkback: talkbackPlaceholder, innerActive: false, innerPulled: false, ended: false, }; let rec applyInnerSource = innerSource => { state.innerActive = true; innerSource((. signal) => { switch (signal) { | Start(tb) => state.innerTalkback = tb; state.innerPulled = false; tb(. Pull); | Push(_) when state.innerActive => sink(. signal); if (!state.innerPulled) { state.innerTalkback(. Pull); } else { state.innerPulled = false; }; | Push(_) => () | End when state.innerActive => state.innerActive = false; switch (Rebel.MutableQueue.pop(state.inputQueue)) { | Some(input) => applyInnerSource(f(. input)) | None when state.ended => sink(. End) | None when !state.outerPulled => state.outerPulled = true; state.outerTalkback(. Pull); | None => () }; | End => () }; (); }); (); }; source((. signal) => { switch (signal) { | Start(tb) => state.outerTalkback = tb | Push(x) when !state.ended => state.outerPulled = false; if (state.innerActive) { Rebel.MutableQueue.add(state.inputQueue, x); } else { applyInnerSource(f(. x)); }; | Push(_) => () | End when !state.ended => state.ended = true; if (!state.innerActive && Rebel.MutableQueue.isEmpty(state.inputQueue)) { sink(. End); }; | End => () }; (); }); sink(. Start( (. signal) => switch (signal) { | Pull => if (!state.ended && !state.outerPulled) { state.outerPulled = true; state.outerTalkback(. Pull); }; if (state.innerActive && !state.innerPulled) { state.innerPulled = true; state.innerTalkback(. Pull); }; | Close => if (!state.ended) { state.ended = true; state.outerTalkback(. Close); }; if (state.innerActive) { state.innerActive = false; state.innerTalkback(. Close); }; }, ), ); }) ); [@genType] let concatAll = (source: sourceT(sourceT('a))): sourceT('a) => concatMap((. x) => x, source); [@genType] let concat = (sources: array(sourceT('a))): sourceT('a) => concatMap((. x) => x, Wonka_sources.fromArray(sources)); [@genType] let filter = (f: (. 'a) => bool): operatorT('a, 'a) => curry(source => curry(sink => { let talkback = ref(talkbackPlaceholder); source((. signal) => { switch (signal) { | Start(tb) => talkback := tb; sink(. signal); | Push(x) when !f(. x) => talkback^(. Pull) | _ => sink(. signal) }; (); }); }) ); [@genType] let map = (f: (. 'a) => 'b): operatorT('a, 'b) => curry(source => curry(sink => source((. signal) => { sink(. /* The signal needs to be recreated for genType to generate the correct generics during codegen */ switch (signal) { | Start(x) => Start(x) | Push(x) => Push(f(. x)) | End => End }, ) }) ) ); type mergeMapStateT = { mutable outerTalkback: (. talkbackT) => unit, mutable outerPulled: bool, mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit), mutable ended: bool, }; [@genType] let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => curry(source => curry(sink => { let state: mergeMapStateT = { outerTalkback: talkbackPlaceholder, outerPulled: false, innerTalkbacks: Rebel.Array.makeEmpty(), ended: false, }; let applyInnerSource = innerSource => { let talkback = ref(talkbackPlaceholder); innerSource((. signal) => switch (signal) { | Start(tb) => talkback := tb; state.innerTalkbacks = Rebel.Array.append(state.innerTalkbacks, tb); tb(. Pull); | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => sink(. Push(x)); talkback^(. Pull); | Push(_) => () | End when Rebel.Array.size(state.innerTalkbacks) !== 0 => state.innerTalkbacks = Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^); let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0; if (state.ended && exhausted) { sink(. End); } else if (!state.outerPulled && exhausted) { state.outerPulled = true; state.outerTalkback(. Pull); }; | End => () } ); }; source((. signal) => switch (signal) { | Start(tb) => state.outerTalkback = tb | Push(x) when !state.ended => state.outerPulled = false; applyInnerSource(f(. x)); if (!state.outerPulled) { state.outerPulled = true; state.outerTalkback(. Pull); }; | Push(_) => () | End when !state.ended => state.ended = true; if (Rebel.Array.size(state.innerTalkbacks) === 0) { sink(. End); }; | End => () } ); sink(. Start( (. signal) => switch (signal) { | Close => if (!state.ended) { state.ended = true; state.outerTalkback(. signal); }; Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal)); state.innerTalkbacks = Rebel.Array.makeEmpty(); | Pull => if (!state.outerPulled && !state.ended) { state.outerPulled = true; state.outerTalkback(. Pull); } else { state.outerPulled = false; }; Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull)); }, ), ); }) ); [@genType] let merge = (sources: array(sourceT('a))): sourceT('a) => mergeMap((. x) => x, Wonka_sources.fromArray(sources)); [@genType] let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) => mergeMap((. x) => x, source); [@genType] let flatten = mergeAll; [@genType] let onEnd = (f: (. unit) => unit): operatorT('a, 'a) => curry(source => curry(sink => { let ended = ref(false); source((. signal) => switch (signal) { | Start(talkback) => sink(. Start( (. signal) => if (! ended^) { switch (signal) { | Pull => talkback(. signal) | Close => ended := true; talkback(. signal); f(.); }; }, ), ) | Push(_) when ! ended^ => sink(. signal) | Push(_) => () | End when ! ended^ => ended := true; sink(. signal); f(.); | End => () } ); }) ); [@genType] let onPush = (f: (. 'a) => unit): operatorT('a, 'a) => curry(source => curry(sink => { let ended = ref(false); source((. signal) => { switch (signal) { | Start(talkback) => sink(. Start( (. signal) => if (! ended^) { switch (signal) { | Pull => talkback(. signal) | Close => ended := true; talkback(. signal); }; }, ), ) | Push(x) when ! ended^ => f(. x); sink(. signal); | Push(_) => () | End when ! ended^ => ended := true; sink(. signal); | End => () }; (); }); }) ); [@genType] let tap = onPush; [@genType] let onStart = (f: (. unit) => unit): operatorT('a, 'a) => curry(source => curry(sink => source((. signal) => switch (signal) { | Start(_) => sink(. signal); f(.); | _ => sink(. signal) } ) ) ); type sampleStateT('a) = { mutable sourceTalkback: (. talkbackT) => unit, mutable notifierTalkback: (. talkbackT) => unit, mutable value: option('a), mutable pulled: bool, mutable ended: bool, }; [@genType] let sample = (notifier: sourceT('a)): operatorT('b, 'b) => curry(source => curry(sink => { let state = { sourceTalkback: talkbackPlaceholder, notifierTalkback: talkbackPlaceholder, value: None, pulled: false, ended: false, }; source((. signal) => switch (signal) { | Start(tb) => state.sourceTalkback = tb | Push(x) => state.value = Some(x); if (!state.pulled) { state.pulled = true; state.notifierTalkback(. Pull); state.sourceTalkback(. Pull); } else { state.pulled = false; }; | End when !state.ended => state.ended = true; state.notifierTalkback(. Close); sink(. End); | End => () } ); notifier((. signal) => switch (signal, state.value) { | (Start(tb), _) => state.notifierTalkback = tb | (End, _) when !state.ended => state.ended = true; state.sourceTalkback(. Close); sink(. End); | (End, _) => () | (Push(_), Some(x)) when !state.ended => state.value = None; sink(. Push(x)); | (Push(_), _) => () } ); sink(. Start( (. signal) => if (!state.ended) { switch (signal) { | Pull when !state.pulled => state.pulled = true; state.sourceTalkback(. Pull); state.notifierTalkback(. Pull); | Pull => () | Close => state.ended = true; state.sourceTalkback(. Close); state.notifierTalkback(. Close); }; }, ), ); }) ); [@genType] let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) => curry(source => curry(sink => { let acc = ref(seed); source((. signal) => sink(. switch (signal) { | Push(x) => acc := f(. acc^, x); Push(acc^); | Start(x) => Start(x) | End => End }, ) ); }) ); type shareStateT('a) = { mutable sinks: Rebel.Array.t(sinkT('a)), mutable talkback: (. talkbackT) => unit, mutable gotSignal: bool, }; [@genType] let share = (source: sourceT('a)): sourceT('a) => { let state = { sinks: Rebel.Array.makeEmpty(), talkback: talkbackPlaceholder, gotSignal: false, }; sink => { state.sinks = Rebel.Array.append(state.sinks, sink); if (Rebel.Array.size(state.sinks) === 1) { source((. signal) => switch (signal) { | Push(_) => state.gotSignal = false; Rebel.Array.forEach(state.sinks, sink => sink(. signal)); | Start(x) => state.talkback = x | End => Rebel.Array.forEach(state.sinks, sink => sink(. End)); state.sinks = Rebel.Array.makeEmpty(); } ); }; sink(. Start( (. signal) => switch (signal) { | Close => state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink); if (Rebel.Array.size(state.sinks) === 0) { state.talkback(. Close); }; | Pull when !state.gotSignal => state.gotSignal = true; state.talkback(. signal); | Pull => () }, ), ); }; }; type skipStateT = { mutable talkback: (. talkbackT) => unit, mutable rest: int, }; [@genType] let skip = (wait: int): operatorT('a, 'a) => curry(source => curry(sink => { let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait}; source((. signal) => switch (signal) { | Start(tb) => state.talkback = tb; sink(. signal); | Push(_) when state.rest > 0 => state.rest = state.rest - 1; state.talkback(. Pull); | _ => sink(. signal) } ); }) ); type skipUntilStateT = { mutable sourceTalkback: (. talkbackT) => unit, mutable notifierTalkback: (. talkbackT) => unit, mutable skip: bool, mutable pulled: bool, mutable ended: bool, }; [@genType] let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) => curry(source => curry(sink => { let state: skipUntilStateT = { sourceTalkback: talkbackPlaceholder, notifierTalkback: talkbackPlaceholder, skip: true, pulled: false, ended: false, }; source((. signal) => { switch (signal) { | Start(tb) => state.sourceTalkback = tb; notifier((. signal) => { switch (signal) { | Start(innerTb) => state.notifierTalkback = innerTb; innerTb(. Pull); | Push(_) => state.skip = false; state.notifierTalkback(. Close); | End when state.skip => state.ended = true; state.sourceTalkback(. Close); | End => () }; (); }); | Push(_) when !state.skip && !state.ended => state.pulled = false; sink(. signal); | Push(_) when !state.pulled => state.pulled = true; state.sourceTalkback(. Pull); state.notifierTalkback(. Pull); | Push(_) => state.pulled = false | End => if (state.skip) { state.notifierTalkback(. Close); }; state.ended = true; sink(. End); }; (); }); sink(. Start( (. signal) => if (!state.ended) { switch (signal) { | Close => state.ended = true; state.sourceTalkback(. Close); if (state.skip) { state.notifierTalkback(. Close); }; | Pull when !state.pulled => state.pulled = true; if (state.skip) { state.notifierTalkback(. Pull); }; state.sourceTalkback(. Pull); | Pull => () }; }, ), ); }) ); type skipWhileStateT = { mutable talkback: (. talkbackT) => unit, mutable skip: bool, }; [@genType] let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) => curry(source => curry(sink => { let state: skipWhileStateT = { talkback: talkbackPlaceholder, skip: true, }; source((. signal) => switch (signal) { | Start(tb) => state.talkback = tb; sink(. signal); | Push(x) when state.skip => if (f(. x)) { state.talkback(. Pull); } else { state.skip = false; sink(. signal); } | _ => sink(. signal) } ); }) ); type switchMapStateT('a) = { mutable outerTalkback: (. talkbackT) => unit, mutable outerPulled: bool, mutable innerTalkback: (. talkbackT) => unit, mutable innerActive: bool, mutable innerPulled: bool, mutable ended: bool, }; [@genType] let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => curry(source => curry(sink => { let state: switchMapStateT('a) = { outerTalkback: talkbackPlaceholder, outerPulled: false, innerTalkback: talkbackPlaceholder, innerActive: false, innerPulled: false, ended: false, }; let applyInnerSource = innerSource => { state.innerActive = true; innerSource((. signal) => if (state.innerActive) { switch (signal) { | Start(tb) => state.innerTalkback = tb; state.innerPulled = false; tb(. Pull); | Push(_) => sink(. signal); if (!state.innerPulled) { state.innerTalkback(. Pull); } else { state.innerPulled = false; }; | End => state.innerActive = false; if (state.ended) { sink(. signal); } else if (!state.outerPulled) { state.outerPulled = true; state.outerTalkback(. Pull); }; }; } ); (); }; source((. signal) => { switch (signal) { | Start(tb) => state.outerTalkback = tb | Push(x) when !state.ended => if (state.innerActive) { state.innerTalkback(. Close); state.innerTalkback = talkbackPlaceholder; }; if (!state.outerPulled) { state.outerPulled = true; state.outerTalkback(. Pull); } else { state.outerPulled = false; }; applyInnerSource(f(. x)); | Push(_) => () | End when !state.ended => state.ended = true; if (!state.innerActive) { sink(. End); }; | End => () }; (); }); sink(. Start( (. signal) => switch (signal) { | Pull => if (!state.ended && !state.outerPulled) { state.outerPulled = true; state.outerTalkback(. Pull); }; if (state.innerActive && !state.innerPulled) { state.innerPulled = true; state.innerTalkback(. Pull); }; | Close => if (!state.ended) { state.ended = true; state.outerTalkback(. Close); }; if (state.innerActive) { state.innerActive = false; state.innerTalkback(. Close); }; }, ), ); }) ); [@genType] let switchAll = (source: sourceT(sourceT('a))): sourceT('a) => switchMap((. x) => x, source); type takeStateT = { mutable ended: bool, mutable taken: int, mutable talkback: (. talkbackT) => unit, }; [@genType] let take = (max: int): operatorT('a, 'a) => curry(source => curry(sink => { let state: takeStateT = { ended: false, taken: 0, talkback: talkbackPlaceholder, }; source((. signal) => switch (signal) { | Start(tb) when max <= 0 => state.ended = true; sink(. End); tb(. Close); | Start(tb) => state.talkback = tb | Push(_) when state.taken < max && !state.ended => state.taken = state.taken + 1; sink(. signal); if (!state.ended && state.taken >= max) { state.ended = true; sink(. End); state.talkback(. Close); }; | Push(_) => () | End when !state.ended => state.ended = true; sink(. End); | End => () } ); sink(. Start( (. signal) => if (!state.ended) { switch (signal) { | Pull when state.taken < max => state.talkback(. Pull) | Pull => () | Close => state.ended = true; state.talkback(. Close); }; }, ), ); }) ); type takeLastStateT('a) = { mutable queue: Rebel.MutableQueue.t('a), mutable talkback: (. talkbackT) => unit, }; [@genType] let takeLast = (max: int): operatorT('a, 'a) => curry(source => curry(sink => { let state: takeLastStateT('a) = { queue: Rebel.MutableQueue.make(), talkback: talkbackPlaceholder, }; source((. signal) => { switch (signal) { | Start(talkback) when max <= 0 => talkback(. Close); Wonka_sources.empty(sink); | Start(talkback) => state.talkback = talkback; talkback(. Pull); | Push(x) => let size = Rebel.MutableQueue.size(state.queue); if (size >= max && max > 0) { ignore(Rebel.MutableQueue.pop(state.queue)); }; Rebel.MutableQueue.add(state.queue, x); state.talkback(. Pull); | End => Wonka_sources.fromArray( Rebel.MutableQueue.toArray(state.queue), sink, ) }; (); }); }) ); type takeUntilStateT = { mutable ended: bool, mutable sourceTalkback: (. talkbackT) => unit, mutable notifierTalkback: (. talkbackT) => unit, }; [@genType] let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) => curry(source => curry(sink => { let state: takeUntilStateT = { ended: false, sourceTalkback: talkbackPlaceholder, notifierTalkback: talkbackPlaceholder, }; source((. signal) => { switch (signal) { | Start(tb) => state.sourceTalkback = tb; notifier((. signal) => { switch (signal) { | Start(innerTb) => state.notifierTalkback = innerTb; innerTb(. Pull); | Push(_) => state.ended = true; state.sourceTalkback(. Close); sink(. End); | End => () }; (); }); | End when !state.ended => state.ended = true; state.notifierTalkback(. Close); sink(. End); | End => () | Push(_) when !state.ended => sink(. signal) | Push(_) => () }; (); }); sink(. Start( (. signal) => if (!state.ended) { switch (signal) { | Close => state.ended = true; state.sourceTalkback(. Close); state.notifierTalkback(. Close); | Pull => state.sourceTalkback(. Pull) }; }, ), ); }) ); type takeWhileStateT = { mutable talkback: (. talkbackT) => unit, mutable ended: bool, }; [@genType] let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) => curry(source => curry(sink => { let state: takeWhileStateT = { talkback: talkbackPlaceholder, ended: false, }; source((. signal) => switch (signal) { | Start(tb) => state.talkback = tb; sink(. signal); | End when !state.ended => state.ended = true; sink(. End); | End => () | Push(x) when !state.ended => if (!f(. x)) { state.ended = true; sink(. End); state.talkback(. Close); } else { sink(. signal); } | Push(_) => () } ); }) );