Write your own stream processing library Part1
Tweetpipes and conduit are two competing libraries for handling stream data processing in Haskell. Though both libraries provide excellent tutorials on the usage of the libraries, the implementation details are impenetrable to most Haskell programmers.
The best way to understand how these streaming libraries work is to write a minimalistic version by ourselves. In this post, I will show you how we can write a small streaming data library with coroutine. Our implementation is based on Mario Blazevic’s excellent article Coroutine Pipelines.
Generator
Generator
is a monad transformer which allows the base monad to pause its computation and yield a value. This corresponds to Producer
of pipes or Source
of conduit.
{-# LANGUAGE LambdaCase #-}
import Control.Monad
import Control.Monad.Trans.Class
newtype Generator a m x =
Generator { bounceGen :: m (Either (a, Generator a m x) x) }
Generator a m x
represents a computation which yields values of type a
on top of the base monad m
and returns a value of type x
.
Either
indicates that Generator
has two cases:
(a, Generator a m x)
: A pair of a yielded value and a suspension to be resumed.x
: A return valuex
.
The enclosing m
allows us to perform monadic actions while running the generator.
The definition of Monad
instance for Generator
is as follows:
instance Monad m => Monad (Generator a m) where
return = Generator . return . Right
t >>= f = Generator $ bounceGen t
>>= \case Left (a, cont) -> return $ Left (a, cont >>= f)
Right x -> bounceGen (f x)
instance MonadTrans (Generator a) where
lift = Generator . liftM Right
yield :: Monad m => a -> Generator a m ()
yield a = Generator (return $ Left (a, return ()))
>>=
operator has two cases to consider. If t
is a suspension (Left
case), it yields a
and combines the remaining computation cont
with f
. If t
is a value x
(Right
case), it continues the computation by passing the value to f
. Once we define >>=
this way, the definition of yield
is straightforward. It yields a value and does nothing more.
To run a Generator
, we need runGenerator
function which collects the yielded values while executing the generator. run'
uses a difference list to collect yielded values and converts it to the normal list by applying []
at the end.
runGenerator :: Monad m => Generator a m x -> m ([a], x)
runGenerator = run' id where
run' f g = bounceGen g
>>= \case Left (a, cont) -> run' (f.(a:)) cont
Right x -> return (f [], x)
Now we are ready to create generators. triple
is a generator which yields the given value three times.
triple :: Monad m => a -> Generator a m ()
triple x = do
yield x
yield x
yield x
Running triple 3
returns ([3, 3, 3], ())
as expected.
λ> runGenerator $ triple 3
([3,3,3],())
When the base monad is IO
, we can interleave IO actions. For example, loop
yields the line input from the stdin until an empty string is read.
loop :: Generator String IO ()
loop = do
str <- lift getLine
when (str /= "") $ do
yield str
loop
λ> runGenerator loop
Hello
world!
(["Hello","world!"],())
It is even possible to mix two generators by alternating each generator.
alternate :: Monad m => Generator a m () -> Generator a m () -> Generator a m ()
alternate g1 g2 = Generator $ liftM2 go (bounceGen g1) (bounceGen g2)
where
go (Left (a, cont)) (Left (b, cont')) = Left (a, Generator $ return $ Left (b, alternate cont cont'))
go (Left (a, cont)) (Right _) = Left (a, cont)
go (Right _) (Left (b, cont)) = Left (b, cont)
go (Right _) (Right _) = Right ()
We can see that the outputs of triple 1
and triple 2
are intermingled.
λ> runGenerator $ alternate (triple 1) (triple 2)
([1,2,1,2,1,2],())
Part 2 of this post will continue the discussion with Iteratees.