%PDF- <> %âãÏÓ endobj 2 0 obj <> endobj 3 0 obj <>/ExtGState<>/ProcSet[/PDF/Text/ImageB/ImageC/ImageI] >>/Annots[ 28 0 R 29 0 R] /MediaBox[ 0 0 595.5 842.25] /Contents 4 0 R/Group<>/Tabs/S>> endobj ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµùÕ5sLOšuY>endobj 2 0 obj<>endobj 2 0 obj<>endobj 2 0 obj<>endobj 2 0 obj<> endobj 2 0 obj<>endobj 2 0 obj<>es 3 0 R>> endobj 2 0 obj<> ox[ 0.000000 0.000000 609.600000 935.600000]/Fi endobj 3 0 obj<> endobj 7 1 obj<>/ProcSet[/PDF/Text/ImageB/ImageC/ImageI]>>/Subtype/Form>> stream
open Wonka_types; open Wonka_helpers; type subscribeStateT = { mutable talkback: (. talkbackT) => unit, mutable ended: bool, }; [@genType] type subscribeConsumerT('a) = sourceT('a) => subscriptionT; [@genType] let subscribe = (f: (. 'a) => unit): subscribeConsumerT('a) => curry(source => { let state: subscribeStateT = { talkback: talkbackPlaceholder, ended: false, }; source((. signal) => switch (signal) { | Start(x) => state.talkback = x; x(. Pull); | Push(x) when !state.ended => f(. x); state.talkback(. Pull); | Push(_) => () | End => state.ended = true } ); { unsubscribe: () => if (!state.ended) { state.ended = true; state.talkback(. Close); }, }; }); [@genType] type forEachConsumerT('a) = sourceT('a) => unit; [@genType] let forEach = (f: (. 'a) => unit): forEachConsumerT('a) => curry(source => ignore(subscribe(f, source))); [@genType] let publish = (source: sourceT('a)): subscriptionT => subscribe((. _) => (), source); type toArrayStateT('a) = { values: Rebel.MutableQueue.t('a), mutable talkback: (. talkbackT) => unit, mutable value: option('a), mutable ended: bool, }; [@genType] let toArray = (source: sourceT('a)): array('a) => { let state: toArrayStateT('a) = { values: Rebel.MutableQueue.make(), talkback: talkbackPlaceholder, value: None, ended: false, }; source((. signal) => switch (signal) { | Start(x) => state.talkback = x; x(. Pull); | Push(value) => Rebel.MutableQueue.add(state.values, value); state.talkback(. Pull); | End => state.ended = true } ); if (!state.ended) { state.talkback(. Close); }; Rebel.MutableQueue.toArray(state.values); };