diff --git a/purescript/switcheroo/.gitignore b/purescript/switcheroo/.gitignore index 30efe19..1081dbc 100644 --- a/purescript/switcheroo/.gitignore +++ b/purescript/switcheroo/.gitignore @@ -8,3 +8,4 @@ /.purs* /.psa* /.spago +main.js diff --git a/purescript/switcheroo/flake.lock b/purescript/switcheroo/flake.lock new file mode 100644 index 0000000..e3ffc2e --- /dev/null +++ b/purescript/switcheroo/flake.lock @@ -0,0 +1,640 @@ +{ + "nodes": { + "deadnix": { + "inputs": { + "fenix": "fenix", + "naersk": "naersk", + "nixpkgs": "nixpkgs_4", + "utils": "utils" + }, + "locked": { + "lastModified": 1655647809, + "narHash": "sha256-npyHYIJW7HyGIFpCZZK+t5JM/v2LsyFhAGJxX1DXO7E=", + "owner": "astro", + "repo": "deadnix", + "rev": "83c42cc64d190ecb72f5929eab0f64fe88e25dc4", + "type": "github" + }, + "original": { + "owner": "astro", + "repo": "deadnix", + "type": "github" + } + }, + "deadnix_2": { + "inputs": { + "fenix": "fenix_2", + "naersk": "naersk_2", + "nixpkgs": "nixpkgs_8", + "utils": "utils_2" + }, + "locked": { + "lastModified": 1656370114, + "narHash": "sha256-XBbSWeBuF6Ck0jc634yAp2hjPXWM2JyRDPCdK0dh3w4=", + "owner": "astro", + "repo": "deadnix", + "rev": "9f450f7250ad7680cb5f12ce5985cc18496c2d5f", + "type": "github" + }, + "original": { + "owner": "astro", + "repo": "deadnix", + "type": "github" + } + }, + "docs-search": { + "flake": false, + "locked": { + "lastModified": 1661787609, + "narHash": "sha256-jgOl8PKisRmcaHOya3HzArI3eKjVErx+XIBGminh9Zk=", + "owner": "purs-nix", + "repo": "purescript-docs-search", + "rev": "4620575e21886fcbf516d0b43910ba4ead2a60d0", + "type": "github" + }, + "original": { + "owner": "purs-nix", + "repo": "purescript-docs-search", + "type": "github" + } + }, + "fenix": { + "inputs": { + "nixpkgs": "nixpkgs_2", + "rust-analyzer-src": "rust-analyzer-src" + }, + "locked": { + "lastModified": 1655533500, + "narHash": "sha256-qJJmLVoMYfDLywI9MNL7sb0W/GsKQF9HDatdHm1tSl0=", + "owner": "nix-community", + "repo": "fenix", + "rev": "b6630603af13df17d0dd4df8629e9a24e6ba0fbd", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "fenix", + "type": "github" + } + }, + "fenix_2": { + "inputs": { + "nixpkgs": "nixpkgs_6", + "rust-analyzer-src": "rust-analyzer-src_2" + }, + "locked": { + "lastModified": 1655533500, + "narHash": "sha256-qJJmLVoMYfDLywI9MNL7sb0W/GsKQF9HDatdHm1tSl0=", + "owner": "nix-community", + "repo": "fenix", + "rev": "b6630603af13df17d0dd4df8629e9a24e6ba0fbd", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "fenix", + "type": "github" + } + }, + "fenix_3": { + "inputs": { + "nixpkgs": [ + "purs-nix", + "statix", + "nixpkgs" + ], + "rust-analyzer-src": "rust-analyzer-src_3" + }, + "locked": { + "lastModified": 1645251813, + "narHash": "sha256-cQ66tGjnZclBCS3nD26mZ5fUH+3/HnysGffBiWXUSHk=", + "owner": "nix-community", + "repo": "fenix", + "rev": "9892337b588c38ec59466a1c89befce464aae7f8", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "fenix", + "type": "github" + } + }, + "flake-utils": { + "locked": { + "lastModified": 1618217525, + "narHash": "sha256-WGrhVczjXTiswQaoxQ+0PTfbLNeOQM6M36zvLn78AYg=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "c6169a2772643c4a93a0b5ac1c61e296cba68544", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "flake-utils_2": { + "locked": { + "lastModified": 1618217525, + "narHash": "sha256-WGrhVczjXTiswQaoxQ+0PTfbLNeOQM6M36zvLn78AYg=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "c6169a2772643c4a93a0b5ac1c61e296cba68544", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "get-flake": { + "locked": { + "lastModified": 1644686428, + "narHash": "sha256-zkhYsURWFrvEZLkIoBeqFBzSu+cA2u5mo6M8vq9LN7M=", + "owner": "ursi", + "repo": "get-flake", + "rev": "703f15558daa56dfae19d1858bb3046afe68831a", + "type": "github" + }, + "original": { + "owner": "ursi", + "repo": "get-flake", + "type": "github" + } + }, + "gitignore": { + "inputs": { + "nixpkgs": [ + "purs-nix", + "statix", + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1635165013, + "narHash": "sha256-o/BdVjNwcB6jOmzZjOH703BesSkkS5O7ej3xhyO8hAY=", + "owner": "hercules-ci", + "repo": "gitignore.nix", + "rev": "5b9e0ff9d3b551234b4f3eb3983744fa354b17f1", + "type": "github" + }, + "original": { + "owner": "hercules-ci", + "repo": "gitignore.nix", + "type": "github" + } + }, + "make-shell": { + "locked": { + "lastModified": 1634940815, + "narHash": "sha256-P69OmveboXzS+es1vQGS4bt+ckwbeIExqxfGLjGuJqA=", + "owner": "ursi", + "repo": "nix-make-shell", + "rev": "8add91681170924e4d0591b22f294aee3f5516f9", + "type": "github" + }, + "original": { + "owner": "ursi", + "ref": "1", + "repo": "nix-make-shell", + "type": "github" + } + }, + "make-shell_2": { + "locked": { + "lastModified": 1634940815, + "narHash": "sha256-P69OmveboXzS+es1vQGS4bt+ckwbeIExqxfGLjGuJqA=", + "owner": "ursi", + "repo": "nix-make-shell", + "rev": "8add91681170924e4d0591b22f294aee3f5516f9", + "type": "github" + }, + "original": { + "owner": "ursi", + "ref": "1", + "repo": "nix-make-shell", + "type": "github" + } + }, + "naersk": { + "inputs": { + "nixpkgs": "nixpkgs_3" + }, + "locked": { + "lastModified": 1655042882, + "narHash": "sha256-9BX8Fuez5YJlN7cdPO63InoyBy7dm3VlJkkmTt6fS1A=", + "owner": "nix-community", + "repo": "naersk", + "rev": "cddffb5aa211f50c4b8750adbec0bbbdfb26bb9f", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "naersk", + "type": "github" + } + }, + "naersk_2": { + "inputs": { + "nixpkgs": "nixpkgs_7" + }, + "locked": { + "lastModified": 1655042882, + "narHash": "sha256-9BX8Fuez5YJlN7cdPO63InoyBy7dm3VlJkkmTt6fS1A=", + "owner": "nix-community", + "repo": "naersk", + "rev": "cddffb5aa211f50c4b8750adbec0bbbdfb26bb9f", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "naersk", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1675698036, + "narHash": "sha256-BgsQkQewdlQi8gapJN4phpxkI/FCE/2sORBaFcYbp/A=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "1046c7b92e908a1202c0f1ba3fc21d19e1cf1b62", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_10": { + "locked": { + "lastModified": 1645013224, + "narHash": "sha256-b7OEC8vwzJv3rsz9pwnTX2LQDkeOWz2DbKypkVvNHXc=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "b66b39216b1fef2d8c33cc7a5c72d8da80b79970", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_2": { + "locked": { + "lastModified": 1655400192, + "narHash": "sha256-49OBVVRgb9H/PSmNT9W61+NRdDbuSJVuDDflwXlaUKU=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "3d7435c638baffaa826b85459df0fff47f12317d", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_3": { + "locked": { + "lastModified": 1655481042, + "narHash": "sha256-XHbcywq2vIQ5CeH1OK3TN793jkiNAAZsSctS1PFgseo=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "103a4c0ae46afa9cf008c30744175315ca38e9f9", + "type": "github" + }, + "original": { + "id": "nixpkgs", + "type": "indirect" + } + }, + "nixpkgs_4": { + "locked": { + "lastModified": 1655481042, + "narHash": "sha256-XHbcywq2vIQ5CeH1OK3TN793jkiNAAZsSctS1PFgseo=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "103a4c0ae46afa9cf008c30744175315ca38e9f9", + "type": "github" + }, + "original": { + "id": "nixpkgs", + "type": "indirect" + } + }, + "nixpkgs_5": { + "locked": { + "lastModified": 1646506091, + "narHash": "sha256-sWNAJE2m+HOh1jtXlHcnhxsj6/sXrHgbqVNcVRlveK4=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "3e644bd62489b516292c816f70bf0052c693b3c7", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_6": { + "locked": { + "lastModified": 1655400192, + "narHash": "sha256-49OBVVRgb9H/PSmNT9W61+NRdDbuSJVuDDflwXlaUKU=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "3d7435c638baffaa826b85459df0fff47f12317d", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_7": { + "locked": { + "lastModified": 1655481042, + "narHash": "sha256-XHbcywq2vIQ5CeH1OK3TN793jkiNAAZsSctS1PFgseo=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "103a4c0ae46afa9cf008c30744175315ca38e9f9", + "type": "github" + }, + "original": { + "id": "nixpkgs", + "type": "indirect" + } + }, + "nixpkgs_8": { + "locked": { + "lastModified": 1655481042, + "narHash": "sha256-XHbcywq2vIQ5CeH1OK3TN793jkiNAAZsSctS1PFgseo=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "103a4c0ae46afa9cf008c30744175315ca38e9f9", + "type": "github" + }, + "original": { + "id": "nixpkgs", + "type": "indirect" + } + }, + "nixpkgs_9": { + "locked": { + "lastModified": 1656549732, + "narHash": "sha256-eILutFZGjfk2bEzfim8S/qyYc//0S1KsCeO+OWbtoR0=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "d3248619647234b5dc74a6921bcdf6dd8323eb22", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "parsec": { + "locked": { + "lastModified": 1635533376, + "narHash": "sha256-/HrG0UPGnI5VdkhrNrpDiM2+nhdL6lD/bqyGtYv0QDE=", + "owner": "nprindle", + "repo": "nix-parsec", + "rev": "1bf25dd9c5de1257a1c67de3c81c96d05e8beb5e", + "type": "github" + }, + "original": { + "owner": "nprindle", + "repo": "nix-parsec", + "type": "github" + } + }, + "ps-tools": { + "inputs": { + "deadnix": "deadnix_2", + "make-shell": "make-shell_2", + "nixpkgs": "nixpkgs_9", + "utils": "utils_3" + }, + "locked": { + "lastModified": 1658374818, + "narHash": "sha256-WxbQ/BR4Ep8tBbaOikXechspyZlvwfL5XNmRNEnaOFo=", + "owner": "purs-nix", + "repo": "purescript-tools", + "rev": "c0f887f60ea2331dfdc5b0e8be2e732976887345", + "type": "github" + }, + "original": { + "owner": "purs-nix", + "repo": "purescript-tools", + "type": "github" + } + }, + "purs-nix": { + "inputs": { + "deadnix": "deadnix", + "docs-search": "docs-search", + "get-flake": "get-flake", + "make-shell": "make-shell", + "nixpkgs": "nixpkgs_5", + "parsec": "parsec", + "ps-tools": "ps-tools", + "statix": "statix", + "utils": "utils_4" + }, + "locked": { + "lastModified": 1674243319, + "narHash": "sha256-o39rBVSNqchahHrMYNixdlasDro8omlf/n7yQZsdNI8=", + "owner": "purs-nix", + "repo": "purs-nix", + "rev": "2b7761ffaded363d0d00afe320350cc5c9ee9012", + "type": "github" + }, + "original": { + "owner": "purs-nix", + "ref": "ps-0.15", + "repo": "purs-nix", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs", + "ps-tools": [ + "purs-nix", + "ps-tools" + ], + "purs-nix": "purs-nix", + "utils": "utils_5" + } + }, + "rust-analyzer-src": { + "flake": false, + "locked": { + "lastModified": 1655507737, + "narHash": "sha256-o+AqNsjL6o2RHh4InZHQVpkmqg570YFJL4Db8mKq+fs=", + "owner": "rust-lang", + "repo": "rust-analyzer", + "rev": "12dd81092e37df28b7a3591cae9675e668927198", + "type": "github" + }, + "original": { + "owner": "rust-lang", + "ref": "nightly", + "repo": "rust-analyzer", + "type": "github" + } + }, + "rust-analyzer-src_2": { + "flake": false, + "locked": { + "lastModified": 1655507737, + "narHash": "sha256-o+AqNsjL6o2RHh4InZHQVpkmqg570YFJL4Db8mKq+fs=", + "owner": "rust-lang", + "repo": "rust-analyzer", + "rev": "12dd81092e37df28b7a3591cae9675e668927198", + "type": "github" + }, + "original": { + "owner": "rust-lang", + "ref": "nightly", + "repo": "rust-analyzer", + "type": "github" + } + }, + "rust-analyzer-src_3": { + "flake": false, + "locked": { + "lastModified": 1645205556, + "narHash": "sha256-e4lZW3qRyOEJ+vLKFQP7m2Dxh5P44NrnekZYLxlucww=", + "owner": "rust-analyzer", + "repo": "rust-analyzer", + "rev": "acf5874b39f3dc5262317a6074d9fc7285081161", + "type": "github" + }, + "original": { + "owner": "rust-analyzer", + "ref": "nightly", + "repo": "rust-analyzer", + "type": "github" + } + }, + "statix": { + "inputs": { + "fenix": "fenix_3", + "gitignore": "gitignore", + "nixpkgs": "nixpkgs_10" + }, + "locked": { + "lastModified": 1657460333, + "narHash": "sha256-5o6zMBASEsGKtjKDb3SizJnN9A7qpOcbzWBXsacfMyc=", + "owner": "nerdypepper", + "repo": "statix", + "rev": "6422c959d365dee2fda5eda8858fefad31f17b25", + "type": "github" + }, + "original": { + "owner": "nerdypepper", + "repo": "statix", + "type": "github" + } + }, + "utils": { + "locked": { + "lastModified": 1653893745, + "narHash": "sha256-0jntwV3Z8//YwuOjzhV2sgJJPt+HY6KhU7VZUL0fKZQ=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "1ed9fb1935d260de5fe1c2f7ee0ebaae17ed2fa1", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "utils_2": { + "locked": { + "lastModified": 1653893745, + "narHash": "sha256-0jntwV3Z8//YwuOjzhV2sgJJPt+HY6KhU7VZUL0fKZQ=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "1ed9fb1935d260de5fe1c2f7ee0ebaae17ed2fa1", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "utils_3": { + "inputs": { + "flake-utils": "flake-utils" + }, + "locked": { + "lastModified": 1656044990, + "narHash": "sha256-f01BB7CaOyntOab9XnpH9HD63rGcnu2iyL4M2ubs5F8=", + "owner": "ursi", + "repo": "flake-utils", + "rev": "f53b674a2c90f6202a2f4cd491aba121775490b5", + "type": "github" + }, + "original": { + "owner": "ursi", + "ref": "8", + "repo": "flake-utils", + "type": "github" + } + }, + "utils_4": { + "inputs": { + "flake-utils": "flake-utils_2" + }, + "locked": { + "lastModified": 1656044990, + "narHash": "sha256-f01BB7CaOyntOab9XnpH9HD63rGcnu2iyL4M2ubs5F8=", + "owner": "ursi", + "repo": "flake-utils", + "rev": "f53b674a2c90f6202a2f4cd491aba121775490b5", + "type": "github" + }, + "original": { + "owner": "ursi", + "ref": "8", + "repo": "flake-utils", + "type": "github" + } + }, + "utils_5": { + "locked": { + "lastModified": 1667395993, + "narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/purescript/switcheroo/flake.nix b/purescript/switcheroo/flake.nix index a9ec6f4..40298d6 100644 --- a/purescript/switcheroo/flake.nix +++ b/purescript/switcheroo/flake.nix @@ -20,6 +20,10 @@ [ console effect prelude + exists + tuples + indexed-monad + aff ]; dir = ./.; diff --git a/purescript/switcheroo/src/Main.purs b/purescript/switcheroo/src/Main.purs index 12e5609..fabbb83 100644 --- a/purescript/switcheroo/src/Main.purs +++ b/purescript/switcheroo/src/Main.purs @@ -2,8 +2,119 @@ module Main where import Prelude +import Control.Monad.Indexed.Qualified as Ix +import Data.Array as Array +import Data.Identity (Identity(..)) import Effect (Effect) import Effect.Console (log) +import Safe.Coerce (coerce) +import Swictheroo.Stream (Finished, Producer, constantProducer, runConsumeM_) +import Swictheroo.Stream as Stream + +type Producers m = + { download :: Producer m Int + , reportDownload :: Producer m String + , ping :: Producer m Boolean + } + +program :: forall m. Monad m => Producers m -> Finished m String Unit +program producers = Ix.do + Stream.replace producers.download + a <- Stream.pull + + Stream.replace producers.reportDownload + b <- Stream.pull + + Stream.replace producers.ping + c <- Stream.pull + + Stream.terminate $ Array.fold + [ "Download: " + , show a + , ", Report: " + , show b + , ", Ping: " + , show c + ] + +{- +Pseudo-code for testing: + +data SyntheticEvent = Download Int | Report String | Ping Int + +producers = + { download: case _ of + Download s -> Just s + _ -> Nothing + , report: ... + , ping: ... + } + +story = + [ Emit (Download 7) + , Expect Cancel + , Emit (Report 7) + , Expect Cancel + , Emit (Ping 7) + , Expect Cancel + ] +-} + +{- How do we cancel our program? + +The program would require an extra producer called "cancel". +Every time we call "Stream.replace", we would merge "cancel" with +the respective new stream. This way, when we call "Stream.pull" we can +either receive a "Left" (which means we must cancel) or a "Right" +(which means we have gotten our value and can keep going). +-} + +{- All async effects should be avoided outside the internals of our monad! + +Eg: do + a <- lift $ Aff.wait ... + +This is *bad*, because it means we cannot receive a cancellation event. + +A better idea would be: +do + Stream.mapSource (\old -> merge old (fromAff (Aff.wait ...))) + + result <- Stream.pull + + case result of + -- Our effect finished running! + Left effectResult -> do + -- Return to old producer + Stream.mapSource Either.hush + -- Do stuff with the result of the effect + ... + + -- Try again later! + Right otherResult -> do + -- Handle the other result + ... + +This does not block the other events at any point! +Although this might look complicated, it's possible to create helpers for +common patterns like this one! + +Note that in an actual production codebase we would receive + (fromAff (Aff.wait ...)) from the exterior in order to + allow mocking and to not tie ourselves to a specific monad. +-} + +{- One could thing about this approach is +that simple state can be kept without the need for StateT! +(because we can simply pass around values) +-} main :: Effect Unit -main = log "❄" +main = log (coerce result) + where + result :: Identity _ + result = runConsumeM_ $ program + { download: constantProducer 3 + , reportDownload: constantProducer "foo" + , ping: constantProducer true + } diff --git a/purescript/switcheroo/src/Stream.purs b/purescript/switcheroo/src/Stream.purs new file mode 100644 index 0000000..708161a --- /dev/null +++ b/purescript/switcheroo/src/Stream.purs @@ -0,0 +1,192 @@ +module Swictheroo.Stream where + +import Prelude + +import Control.Alt (class Alt) +import Control.Alternative (class Plus, (<|>)) +import Control.Applicative.Indexed (class IxApplicative, class IxApply, class IxFunctor) +import Control.Bind.Indexed (class IxBind) +import Control.Monad.Indexed (class IxMonad, iap) +import Control.Parallel (parOneOf, parSequence_) +import Data.Either (Either(..)) +import Data.Maybe (Maybe(..)) +import Data.Tuple.Nested (type (/\), (/\)) +import Effect.Aff (Aff, never) + +-- | A producer can: +-- | - Produce values +-- | - Destroy itself +-- | +-- | A producer does this by reacting to input events +-- | in order to satisfy continuations. +newtype Producer m a = + Producer (forall c. Monad m => ProducerEvent m c a -> m c) + +data ProducerEvent m c a + -- | Producers advance to a new version once they produce a value. + -- | This allows pure producers to exist. For example, a pure producer + -- | could hold an array of events, return the first one, and then create + -- | a new producer from the tail of the array + = Produce (a -> Producer m a -> c) + | Destroy c + +produce :: forall m i. Monad m => Producer m i -> m (i /\ Producer m i) +produce (Producer producer) = producer (Produce (/\)) + +destroyProducer :: forall m i. Monad m => Producer m i -> m Unit +destroyProducer (Producer producer) = producer (Destroy unit) + +constantProducer :: forall m a. Monad m => a -> Producer m a +constantProducer value = Producer case _ of + Destroy c -> pure c + Produce next -> pure (next value (constantProducer value)) + +unitProducer :: forall m. Monad m => Producer m Unit +unitProducer = constantProducer unit + +filterMapProducer :: forall a b m. Monad m => (a -> Maybe b) -> Producer m a -> Producer m b +filterMapProducer f producer = Producer case _ of + Destroy c -> destroyProducer producer $> c + Produce next -> loop producer + where + -- Keeps producing values until one matches the given predicate! + loop producer = do + value /\ producer' <- produce producer + case f value of + Nothing -> loop producer' + Just updated -> pure (next updated (filterMapProducer f producer')) + + +-- | Type parameter explanation: +-- | - m = underlying monad +-- | - t = type we want to end on +-- | - i = what the producer present when +-- | the computation *starts* needs to produce +-- | - o = what the producer present when +-- | the computation *ends* needs to produce +-- | - a = result of the computation +-- | +-- | This monad encapsulates the followin 3 operations: +-- | - Basic continuations (we have a type we want to terminate on) +-- | - Consuming values from a producer +-- | - Cancelling + replacing the current producer with a different one. +newtype ConsumeM m t i o a = ConsumeM + ( forall c + . { producer :: Producer m i + , continue :: Producer m o -> a -> m c + , terminate :: t -> m c + } + -> m c + ) + +-- | A ConsumeM computation that has finshed running +type Finished m t i = ConsumeM m t i Void Void + +pull :: forall m t i. Monad m => ConsumeM m t i i i +pull = ConsumeM \inputs -> do + i /\ producer' <- produce inputs.producer + inputs.continue producer' i + +replace :: forall m t i o. Monad m => Producer m o -> ConsumeM m t i o Unit +replace producer' = ConsumeM \inputs -> do + destroyProducer inputs.producer + inputs.continue producer' unit + +terminate :: forall m t i. Monad m => t -> Finished m t i +terminate result = ConsumeM \inputs -> do + destroyProducer inputs.producer + inputs.terminate result + +lift :: forall m t i a. Monad m => m a -> ConsumeM m t i i a +lift computation = ConsumeM \inputs -> do + result <- computation + inputs.continue inputs.producer result + +producer :: forall m t i. Monad m => ConsumeM m t i i (Producer m i) +producer = ConsumeM \inputs -> inputs.continue inputs.producer inputs.producer + +mapSource :: forall m t i o. Monad m => (Producer m i -> Producer m o) -> ConsumeM m t i o Unit +mapSource f = ConsumeM \inputs -> do + let new = f inputs.producer + inputs.continue new unit + +runConsumeM :: forall m i t. Monad m => Producer m i -> Finished m t i -> m t +runConsumeM producer (ConsumeM run) = run + { producer + , terminate: pure + , continue: \producer value -> absurd value -- not eta-reduced for clarity + } + +runConsumeM_ :: forall m t. Monad m => Finished m t Unit -> m t +runConsumeM_ = runConsumeM unitProducer + +---------- Typeclass instances +instance Monad m => Functor (Producer m) where + map f (Producer producer) = Producer case _ of + Destroy c -> producer (Destroy c) + Produce next -> producer $ Produce \a p -> next (f a) (map f p) + +instance Monad m => IxFunctor (ConsumeM m t) where + imap f (ConsumeM consumer) = ConsumeM + \inputs -> consumer + { producer: inputs.producer + , terminate: inputs.terminate + , continue: \producer a -> inputs.continue producer (f a) + } + +instance Monad m => IxApply (ConsumeM m t) where + iapply = iap + +instance Monad m => IxApplicative (ConsumeM m t) where + ipure a = ConsumeM \inputs -> inputs.continue inputs.producer a + +instance Monad m => IxBind (ConsumeM m t) where + ibind (ConsumeM consumer) f = ConsumeM + \inputs -> do + consumerResult <- consumer + { producer: inputs.producer + , terminate: \result -> pure (Left result) + , continue: \a b -> pure (Right (a /\ b)) + } + + case consumerResult of + -- Terminated + Left final -> inputs.terminate final + + -- Kept going + Right (producer' /\ result) -> do + let (ConsumeM consumer') = f result + + consumer' + { producer: producer' + , continue: inputs.continue + , terminate: inputs.terminate + } + +instance Monad m => IxMonad (ConsumeM m t) + +---------- Merge producers +type AffProducer = Producer Aff + +instance Alt AffProducer where + alt first second = Producer case _ of + Produce continue -> ado + result <- parOneOf + [ produce first <#> Left + , produce second <#> Right + ] + in + case result of + Left (result /\ first') -> continue result (first' <|> second) + Right (result /\ second') -> continue result (first <|> second') + + Destroy continue -> ado + parSequence_ + [ destroyProducer first + , destroyProducer second + ] + in continue + +instance Plus AffProducer where + empty = Producer \_ -> never +