Thursday, July 30, 2015

Parallel/Pipelined Conduit

Summary: I wrote a Conduit combinator which makes the upstream and downstream run in parallel. It makes Hoogle database generation faster.

The Hoogle database generation parses lines one-by-one using haskell-src-exts, and then encodes each line and writes it to a file. Using Conduit, that ends up being roughly:

parse =$= write

Conduit ensures that parsing and writing are interleaved, so each line is parsed and written before the next is parsed - ensuring minimal space usage. Recently the FP Complete guys profiled Hoogle database generation and found each of these pieces takes roughly the same amount of time, and together are the bottleneck. Therefore, it seems likely that if we could parse the next line while writing the previous line we should be able to speed up database generation. I think of this as analogous to CPU pipelining, where the next instruction is decoded while the current one is executed.

I came up with the combinator:

 pipelineC :: Int -> Consumer o IO r -> Consumer o IO r

Allowing us to write:

 parse =$= pipelineC 10 write

Given a buffer size of 10 (the maximum number of elements in memory simultaneously), and a Consumer (write), pipelineC produces a new Consumer which is roughly the same but runs in parallel to its upstream (parse).

The Result

When using 2 threads the Hoogle 5 database creation drops from 45s to 30s. The CPU usage during the pipelined stage hovers between 180% and 200%, suggesting the stages are quite well balanced (as the profile suggested). The parsing stage is currently a little slower than the writing, so a buffer of 10 is plenty - increasing the buffer makes no meaningful difference. The total time only drops by 33% because the non-pipelined steps (parsing Cabal files, writing summary information) take about 12s.
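As a rough sanity check on those numbers, the arithmetic can be written out. This is only a back-of-the-envelope sketch: it assumes the non-pipelined work costs the same 12s before and after, and all the names are made up for illustration.

```haskell
-- Back-of-the-envelope check of the timings quoted above.
-- Assumption: the non-pipelined steps cost 12s in both runs.
totalBefore, totalAfter, serial :: Double
totalBefore = 45
totalAfter  = 30
serial      = 12

-- Time spent in the parse/write stage before and after pipelining.
stageBefore, stageAfter :: Double
stageBefore = totalBefore - serial   -- 33s
stageAfter  = totalAfter - serial    -- 18s

-- Speedup of the pipelined stage on its own.
stageSpeedup :: Double
stageSpeedup = stageBefore / stageAfter

-- Best case for two perfectly balanced stages: the stage time halves.
idealTotal :: Double
idealTotal = serial + stageBefore / 2

main :: IO ()
main = do
    print stageSpeedup
    print idealTotal
```

The pipelined stage comes out roughly 1.83 times faster, which lines up with the 180% to 200% CPU usage, and the ideal total of 28.5s is close to the measured 30s.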

Note that Hoogle 5 remains unreleased, but can be tested from the git repo and will hopefully be ready soon.

The Code

The idea is to run the Consumer on a separate thread, and on the main thread keep pulling elements (using await) and pass them to the other thread, without blocking the upstream yield. The only tricky bit is what to do with exceptions. If the consumer thread throws an exception we have to get that back to the main thread so it can be dealt with normally. Fortunately async exceptions fit the bill perfectly. The full code is:

pipelineC :: Int -> Consumer o IO r -> Consumer o IO r
pipelineC buffer sink = do
    sem <- liftIO $ newQSem buffer  -- how many are in flow, to avoid excess memory
    chan <- liftIO newChan          -- the items in flow (type o)
    bar <- liftIO newBarrier        -- the result type (type r)
    me <- liftIO myThreadId
    liftIO $ flip forkFinally (either (throwTo me) (signalBarrier bar)) $ do
        runConduit $
            (whileM $ do
                x <- liftIO $ readChan chan
                liftIO $ signalQSem sem
                whenJust x yield
                return $ isJust x) =$=
            sink
    awaitForever $ \x -> liftIO $ do
        waitQSem sem
        writeChan chan $ Just x
    liftIO $ writeChan chan Nothing
    liftIO $ waitBarrier bar

We are using a channel chan to move elements from producer to consumer, a quantity semaphore sem to limit the number of items in the channel, and a barrier bar to store the return result (a barrier is a variable that is written once, and blocks readers until then). On the consumer thread we read from the channel and yield to the consumer. On the main thread we awaitForever and write to the channel, finishing with Nothing to mark the end of the input. At the end we move the result back from the consumer thread to the main thread. The full implementation is in the repo.
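The channel/semaphore/result handshake described above can also be sketched without Conduit, using only base. This is a simplified illustration rather than the Hoogle code: pipelinedSum is a hypothetical name, an MVar stands in for the barrier, and the exception forwarding is left out entirely.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)

-- | Sum a list on a worker thread while the calling thread feeds it,
--   with at most 'buffer' items in flight (buffer must be at least 1).
--   Hypothetical helper; exceptions on the worker are NOT handled here.
pipelinedSum :: Int -> [Int] -> IO Int
pipelinedSum buffer xs = do
    sem  <- newQSem buffer    -- free slots, to bound memory use
    chan <- newChan           -- items in flight; Nothing marks the end
    done <- newEmptyMVar      -- the worker's final result

    -- Worker: read an item, free its slot, then fold it in.
    let worker acc = do
            x <- readChan chan
            signalQSem sem
            case x of
                Just v  -> worker $! acc + v
                Nothing -> putMVar done acc
    _ <- forkIO (worker 0)

    -- Feeder: take a slot before handing over each item.
    mapM_ (\x -> waitQSem sem >> writeChan chan (Just x)) xs
    writeChan chan Nothing
    takeMVar done

main :: IO ()
main = pipelinedSum 10 [1 .. 1000] >>= print
```

Taking a slot before every write is what stops a fast feeder from queueing the whole input in the channel.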

Enhancements

I have specialised pipelineC for Consumers that run in the IO monad. Since the Consumer can do IO, and the order of that IO has changed, it isn't exactly equivalent - but relying on such IO timing seems to break the spirit of Conduit anyway. I suspect pipelineC is applicable in some other monads, but am not sure which (ReaderT and ResourceT seem plausible, StateT seems less likely).

Acknowledgements: Thanks to Tom Ellis for helping figure out what type pipelineC should have.

Sunday, July 19, 2015

Thoughts on Conduits

Summary: I'm growing increasingly fond of the Conduit library. Here I give my intuitions and some hints I'd have found useful.

Recently I've been working on converting the Hoogle database generation to use the Conduit abstraction, in an effort to reduce the memory and improve the speed. It worked - database generation has gone from 2Gb of RAM to 320Mb, and time has dropped from several minutes (or > 15 mins on memory constrained machines) to 45s. These changes are all in the context of Hoogle 5, which should hopefully be out in a month or so.

The bit that I've converted to Conduit is something that takes in a tarball containing one text file per Hackage package, namely the Haddock output with one definition per line (this 22Mb file). It processes each definition, saves it to a single binary file (with compression and some processing), and returns some compact information about the definition for later processing. I don't expect the process to run in constant space as it is accumulating some return information, but it is important that most of the memory used by one definition is released before the next definition. I originally tried lazy IO, and while it somewhat worked, it was hard to abstract properly and very prone to space leaks. Converting to Conduit was relatively easy and is simpler and more robust.

The Conduit model

My mental model for a conduit Conduit a m b is roughly a function [a] -> m [b] - a values go in and b values come out (but interleaved with the monadic actions). More concretely you ask for an a with await and give back a b with yield, doing stuff in the middle in the m Monad.

A piece of conduit is always either running (doing its actual work), waiting after a yield for the downstream person to ask for more results (with await), or waiting after an await for the upstream person to give the value (with yield). You can think of a conduit as making explicit the demand-order inherent in lazy evaluation.
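One way to make those states concrete is a toy model with one constructor per waiting state. This is a sketch for intuition only: it is not how the conduit library represents things, it leaves out the monad m entirely, and Pipe, runPipe and doubler are made-up names.

```haskell
-- A toy model of the states described above (NOT the library's real type).
data Pipe i o r
    = Done r                         -- finished, with a result
    | Yield o (Pipe i o r)           -- waiting for downstream to ask for more
    | Await (Maybe i -> Pipe i o r)  -- waiting for upstream to supply a value

-- Drive a pipe from a list of inputs, collecting its outputs.
-- Upstream running dry is signalled with Nothing, as with await.
runPipe :: Pipe i o r -> [i] -> ([o], r)
runPipe (Done r)    _        = ([], r)
runPipe (Yield o k) xs       = let (os, r) = runPipe k xs in (o : os, r)
runPipe (Await f)   []       = runPipe (f Nothing) []
runPipe (Await f)   (x : xs) = runPipe (f (Just x)) xs

-- Double every input until upstream runs dry.
doubler :: Pipe Int Int ()
doubler = Await step
  where
    step Nothing  = Done ()
    step (Just x) = Yield (2 * x) doubler

main :: IO ()
main = print (fst (runPipe doubler [1, 2, 3]))  -- prints [2,4,6]
```

Seen this way, runPipe is the "[a] -> [b]" view of the pipe, and the real library adds monadic actions between the steps.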

Things to know about Conduit

I think it's fair to say Conduit shows its history - this is good for people who have been using it for a while (your code doesn't keep breaking), but bad for people learning it (there are lots of things you don't care about). Here are some notes I made:

  • The Data.Conduit module in the conduit library is not the right module to use - it seems generally accepted to use the Conduit module from the conduit-combinators package. However, I decided to build my own conduit-combinators style replacement in the Hoogle tree, see General.Conduit - the standard Conduit module has a lot of dependencies, and a lot of generalisations.
  • Don't use Source or Sink - use Producer and Consumer - the former are just a convenient way to get confusing error messages.
  • Don't use =$ or $=, always use =$= between conduits. The =$= operator is essentially flip (.).
  • Given a Conduit you can run it with runConduit. Alternatively, given a =$= b =$= c you can replace any of the =$= with $$ to run the Conduit as well. I find that a bit ugly, and have stuck to runConduit.
  • Conduit and ConduitM have their type arguments in different orders, which is just confusing. However, generally I use either Conduit (a connector) or Producer (something that outputs values). You rarely need something with both an output and a return value.
  • You call await to see if a value is available to process. The most common bug I've had with conduits is forgetting to make the function processing items recursive - usually you want awaitForever, not just await.
  • The ByteString lines Conduit function was accidentally O(n^2) - I spotted and fixed that. Using difference lists does not necessarily make your code O(n)!
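The await bug from the list above is easy to reproduce. The sketch below assumes the conduit package, and uses sourceList and consume from Data.Conduit.List, which are not mentioned in the post. doubleOnce and doubleAll are made-up names. Newer conduit releases deprecate =$= and the Conduit synonym, so expect warnings there.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Buggy: handles a single item and then finishes, dropping the rest.
doubleOnce :: Monad m => Conduit Int m Int
doubleOnce = do
    mx <- await
    case mx of
        Nothing -> return ()
        Just x  -> yield (x * 2)

-- Fixed: awaitForever repeats the body until upstream runs dry.
doubleAll :: Monad m => Conduit Int m Int
doubleAll = awaitForever $ \x -> yield (x * 2)

main :: IO ()
main = do
    xs <- runConduit $ CL.sourceList [1, 2, 3] =$= doubleOnce =$= CL.consume
    print xs  -- prints [2]
    ys <- runConduit $ CL.sourceList [1, 2, 3] =$= doubleAll =$= CL.consume
    print ys  -- prints [2,4,6]
```

Both versions type check, which is why the mistake is easy to miss.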

Useful functions

When using Conduit I found a number of functions seemingly missing, so defined them myself.

First up is countC which counts the number of items that are consumed. Just a simple definition on top of sumC.

countC :: (Monad m, Num c) => Consumer a m c
countC = mapC (const 1) =$= sumC  -- turn every item into a 1, then sum them

While I recommend awaitForever in preference to await, it's occasionally useful to have awaitJust as the single-step awaitForever, if you are doing your own recursion.

awaitJust :: Monad m => (i -> Conduit i m o) -> Conduit i m o
awaitJust act = do
    x <- await
    whenJust x act

I regularly find zipFrom i = zip [i..] very useful in strict languages, and since Conduit can be viewed as a strict version of lazy lists (through very foggy glasses) it's no surprise a Conduit version is also useful.

zipFromC :: (Monad m, Enum c) => c -> Conduit a m (c, a)
zipFromC !i = awaitJust $ \a -> do  -- the bang pattern needs the BangPatterns extension
    yield (i,a)
    zipFromC (succ i)
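For comparison, the list function that zipFromC mirrors can be tried on its own. It needs nothing beyond the Prelude, and the type signature is added here for illustration.

```haskell
-- Pair each element with a counter starting at the given value.
zipFrom :: Enum c => c -> [a] -> [(c, a)]
zipFrom i = zip [i ..]

main :: IO ()
main = print (zipFrom (1 :: Int) "abc")  -- prints [(1,'a'),(2,'b'),(3,'c')]
```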

Finally, it's useful to zip two conduits. I was surprised how fiddly that was with the standard operators (you have to use newtype wrappers and an Applicative instance), but a simple |$| definition hides that complexity away.

(|$|) :: Monad m => ConduitM i o m r1 -> ConduitM i o m r2 -> ConduitM i o m (r1,r2)
(|$|) a b = getZipConduit $ (,) <$> ZipConduit a <*> ZipConduit b
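A small usage sketch, assuming the conduit package: both consumers see every input, and their results come back as a pair. The fold and consume functions come from Data.Conduit.List and are not part of the post. The operator definition is repeated so the example stands alone.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Same definition as above, repeated so this example is self-contained.
(|$|) :: Monad m => ConduitM i o m r1 -> ConduitM i o m r2 -> ConduitM i o m (r1, r2)
(|$|) a b = getZipConduit $ (,) <$> ZipConduit a <*> ZipConduit b

main :: IO ()
main = do
    -- One pass over the input feeds both the sum and the list.
    (total, items) <- runConduit $
        CL.sourceList [1 .. 5 :: Int] =$= (CL.fold (+) 0 |$| CL.consume)
    print (total, items)  -- prints (15,[1,2,3,4,5])
```

The parentheses around the zipped pair matter because |$| is declared without a fixity, so it does not bind the way =$= does.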

Why not Pipes?

I am curious if all Pipes users get asked "Why not use Conduit?", or if this FAQ is asymmetrical?

I realise pipes are billed as the more "principled" choice for this type of programming, but I've yet to see anywhere Conduit seems fundamentally unprincipled. I use WAI/Warp and http-conduit, so learning Conduit gives me some help there.