diff --git a/packages/status-script/exe/Main.hs b/packages/status-script/exe/Main.hs index c519446f..297c7283 100644 --- a/packages/status-script/exe/Main.hs +++ b/packages/status-script/exe/Main.hs @@ -308,3 +308,62 @@ processNotifications = notificationBlockList = ["Automatic suspend", "Auto suspend"] diffIsSmall = \pathA pathB -> (== "[]") <$> (nix_diff "--json" [pathA, pathB] |> jq ".inputsDiff.inputDerivationDiffs" |> captureTrim) + +{- +import Data.IntMap.Strict qualified as IntMap +import Effectful (Eff, IOE, (:>)) +import Effectful qualified as Eff +import Relude +import Streamly.Data.Fold qualified as Fold +import Streamly.Data.Stream.Prelude (Stream, MonadAsync) +import Streamly.Data.Stream.Prelude qualified as Stream +import Streamly.Internal.Data.Stream.Time qualified as Time +import Prelude () +import Control.Monad.Catch (MonadCatch) + +type SEff es a = Stream (Eff es) a + +sourceEvent :: IOE :> es => SEff es Int +sourceEvent = + Time.ticks 1 + & Stream.indexed + & fmap fst + +-- | Drain first stream exactly once, second stream as often as you want! +mirrorStream :: (MonadAsync m, MonadCatch m) => Stream m a -> m (Stream m a, Stream m a) +mirrorStream stream = do + emiters <- newIORef [] + pure + ( stream + & Stream.finally + ( do + emiters' <- readIORef emiters + forM_ emiters' \emiter -> emiter Nothing + ) + & Stream.parMapM + id + ( \x -> do + emiters' <- readIORef emiters + forM_ emiters' \emiter -> emiter (Just x) + pure x + ) + , Stream.fromCallback (\emit -> modifyIORef' emiters (emit :)) & Stream.takeWhile isJust & Stream.catMaybes + ) + +mkModules :: IOE :> es => [SEff es (Maybe a)] -> SEff es (IntMap a) +mkModules modules = + Stream.parList id (zipWith (\i m -> (\new_value -> IntMap.alter (const new_value) i) <$> m) [0 ..] modules) + & Stream.scan (Fold.foldl' (&) IntMap.empty) + & Stream.sampleIntervalEnd 0.05 + +main :: IO () +main = + Eff.runEff $ Eff.withUnliftStrategy (Eff.ConcUnlift Eff.Persistent Eff.Unlimited) do + (origStream, mirroredStream) <- mirrorStream sourceEvent + let eventA = origStream <&> \x -> Just ("A: " <> show x) + eventB = mirroredStream <&> \x -> Just ("B: " <> show x) + eventC = mirroredStream <&> \x -> Just ("C: " <> show x) + mkModules [eventA, eventB, eventC] + & Stream.mapM (putTextLn . show) + & Stream.fold Fold.drain + -}