Parallelism on divide & conquer algorithm

I'm having trouble making my code run in parallel. It is a 3D Delaunay generator that uses a divide & conquer algorithm named DeWall.

The main function is:

deWall::[SimplexPointer] -> SetSimplexFace -> Box -> StateT DeWallSets IO ([Simplex], [Edge])
deWall p afl box = do
   ...
   ...
   get >>= recursion box1 box2 p1 p2 sigma edges
   ...
   ...

It calls the "recursion" function, which may call deWall back. This is where the parallelization opportunity appears. The following code shows the sequential solution.

recursion::Box -> Box -> [SimplexPointer] -> [SimplexPointer] -> [Simplex] -> [Edge] -> DeWallSets -> StateT DeWallSets IO ([Simplex], [Edge])    
recursion box1 box2 p1 p2 sigma edges deWallSet
        | null afl1 && null afl2 = return (sigma, edges)
        | (null) afl1 = do
            (s, e) <- deWall p2 afl2 box2
            return (s ++ sigma, e ++ edges)
        | (null) afl2 = do
            (s,e) <- deWall p1 afl1 box1
            return (s ++ sigma, e ++ edges)
        | otherwise   = do
            x <- get
            liftIO $ do
                (s1, e1) <- evalStateT (deWall p1 afl1 box1) x
                (s2, e2) <- evalStateT (deWall p2 afl2 box2) x
                return (s1 ++ s2 ++ sigma, e1 ++ e2 ++ edges)

        where   afl1 = aflBox1 deWallSet
                afl2 = aflBox2 deWallSet

The State and IO monads are used to thread the state and to generate a UID for each tetrahedron found, using MVars. My first attempt was to add forkIO, but it doesn't work. It gives wrong output because the merge step doesn't wait for both threads to finish. I don't know how to make it wait for them.

            liftIO $ do
                let 
                    s1 = evalStateT (deWall p1 afl1 box1) x
                    s2 = evalStateT (deWall p2 afl2 box2) x
                    concatThread var (a1, b1) = takeMVar var >>= \(a2, b2) -> putMVar var (a1 ++ a2, b1 ++ b2)
                mv <- newMVar ([],[])
                forkIO (s1 >>= concatThread mv)
                forkIO (s2 >>= concatThread mv)
                takeMVar mv >>= \(s, e) -> return (s ++ sigma, e ++ edges)

So my next attempt was to use a better parallel strategy, "par" and "pseq", which gives the right result but no parallel execution according to ThreadScope.

        liftIO $ do
            let
                s1 = evalStateT (deWall p1 afl1 box1) x
                s2 = evalStateT (deWall p2 afl2 box2) x
                conc = liftM2 (\(a1, b1) (a2, b2) -> (a1 ++ a2, b1 ++ b2))
            (stotal, etotal) <- s1 `par` (s2 `pseq` (s1 `conc` s2))
            return (stotal ++ sigma, etotal ++ edges)

What am I doing wrong?

UPDATE: Somehow this problem seems to be related to the presence of the IO monad. In another (older) version with only the State monad and no IO, parallel execution works with 'par' and 'pseq'. GHC's -sstderr reports SPARKS: 1160 (69 converted, 1069 pruned).

recursion::Box -> Box -> [SimplexPointer] -> [SimplexPointer] -> [Simplex] -> [Edge] -> DeWallSets -> State DeWallSets ([Simplex], [Edge])  
recursion p1 p2 sigma deWallSet
     | null afl1 && null afl2 = return sigma
     | (null) afl1 = do
         s <- deWall p2 afl2 box2
         return (s ++ sigma)
     | (null) afl2 = do
         s <- deWall p1 afl1 box1
         return (s ++ sigma)
     | otherwise   = do
                     x <- get
                     let s1 = evalState (deWall p1 afl1 box1) x
                     let s2 = evalState (deWall p2 afl2 box2) x
                     return $ s1 `par` (s2 `pseq` (s1 ++ s2 ++ sigma))
     where   afl1 = aflBox1 deWallSet
             afl2 = aflBox2 deWallSet

Could someone explain that?


The easiest way to make this work would be to use something like:

liftIO $ do
            let 
                s1 = evalStateT (deWall p1 afl1 box1) x
                s2 = evalStateT (deWall p2 afl2 box2) x
            mv1 <- newEmptyMVar
            mv2 <- newEmptyMVar
            forkIO (s1 >>= putMVar mv1)
            forkIO (s2 >>= putMVar mv2)
            (a1,b1) <- takeMVar mv1
            (a2,b2) <- takeMVar mv2
            return (a1++a2++sigma, b1++b2++edges)

This works, but there's some unnecessary overhead. A better solution is:

liftIO $ do
            let 
                s1 = evalStateT (deWall p1 afl1 box1) x
                s2 = evalStateT (deWall p2 afl2 box2) x
            mv2 <- newEmptyMVar
            forkIO (s2 >>= putMVar mv2)
            (a1,b1) <- s1
            (a2,b2) <- takeMVar mv2
            return (a1++a2++sigma, b1++b2++edges)

Or possibly this, if the results aren't being evaluated where you'd like them to be:

liftIO $ do
        let 
            s1 = evalStateT (deWall p1 afl1 box1) x
            s2 = evalStateT (deWall p2 afl2 box2) x
        mv2 <- newEmptyMVar
        forkIO (s2 >>= evaluate >>= putMVar mv2)
        (a1,b1) <- s1
        (a2,b2) <- takeMVar mv2
        return (a1++a2++sigma, b1++b2++edges)

(these are answers that I gave to the poster in #haskell that I thought would be useful here as well)

Edit: removed unnecessary evaluate.
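
For readers who want to try the fork-and-wait pattern in isolation, here is a minimal sketch. The names forkJoin and sumHalves are hypothetical and not part of the poster's code, and summing list halves merely stands in for the two recursive deWall calls. The MVar is created empty so that takeMVar blocks until the worker has stored its result.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (evaluate)

-- | Run two IO actions concurrently and return both results.
-- 'forkJoin' is a hypothetical helper, not part of the original code.
forkJoin :: IO a -> IO b -> IO (a, b)
forkJoin left right = do
  box <- newEmptyMVar                  -- starts empty, so takeMVar blocks
  _ <- forkIO (right >>= putMVar box)  -- worker thread fills the box
  a <- left                            -- current thread runs the other half
  b <- takeMVar box                    -- wait for the worker to finish
  return (a, b)

-- | Placeholder for the two recursive calls: sum each half of a list.
-- 'evaluate' makes each thread do its own summing instead of returning a thunk.
sumHalves :: [Int] -> IO Int
sumHalves xs = do
  let (l, r) = splitAt (length xs `div` 2) xs
  (a, b) <- forkJoin (evaluate (sum l)) (evaluate (sum r))
  return (a + b)
```

To see the two halves run on separate cores, the program has to be compiled with -threaded and run with +RTS -N. Note that this sketch does no exception handling: if the forked action throws, the MVar is never filled and the waiting thread stays blocked.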


Use of par and pseq should occur on the "execution path", i.e., not inside a local let. Try this (modifying your last snippet):

let s1 = ...
    s2 = ...
    conc = ...
case s1 `par` (s2 `pseq` (s1 `conc` s2)) of
  (stotal, etotal) ->
     return (stotal ++ sigma, etotal ++ edges)

A case forces evaluation of its argument to weak head normal form (WHNF) before continuing in one of its branches. WHNF means that the argument is evaluated until the outermost constructor is visible. Fields may still be unevaluated.
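
As an illustration of par and pseq on pure values, here is a small divide-and-conquer sketch. parSum is a made-up example, not the poster's deWall. It imports par and pseq from GHC.Conc in base; Control.Parallel from the parallel package exports the same combinators. The result type is Int, so evaluating a spark to WHNF computes the whole sub-result. With a list result, only the outermost constructor would be evaluated by the spark.

```haskell
import GHC.Conc (par, pseq)

-- | Hypothetical divide-and-conquer sum; illustrative only.
-- Lists no longer than the threshold are summed sequentially.
parSum :: Int -> [Int] -> Int
parSum threshold xs
  | n <= max 1 threshold = sum xs
  | otherwise            = left `par` (right `pseq` (left + right))
  where
    n        = length xs
    (ls, rs) = splitAt (n `div` 2) xs
    left     = parSum threshold ls   -- sparked, may run on another core
    right    = parSum threshold rs   -- evaluated by the current thread first
```

The threshold keeps the number of sparks down; creating a spark per element would cost more than the additions themselves.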

To force full evaluation of an argument, use deepseq. Be careful with that, though, because deepseq can sometimes make things slower by doing too much work.

A more lightweight approach to adding strictness is to make fields strict:

data Foo = Foo !Int String

Now, whenever a value of type Foo is evaluated to WHNF, so is its first argument (but not the second one).
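
The difference can be observed with a small experiment. The sketch below is only an illustration: the types Lazy and Strict and the helper whnfOk are hypothetical names. It puts an error call in a field and checks whether evaluating the value to WHNF triggers it.

```haskell
import Control.Exception (ErrorCall, catch, evaluate)

data Lazy   = Lazy Int String     -- both fields lazy
data Strict = Strict !Int String  -- first field strict, like Foo above

-- | True if evaluating the value to WHNF does not raise an ErrorCall.
whnfOk :: a -> IO Bool
whnfOk x = (evaluate x >> return True) `catch` handler
  where
    handler :: ErrorCall -> IO Bool
    handler _ = return False

-- | The three cases of interest, in order:
-- lazy field holding an error, strict field holding an error,
-- lazy second field of Strict holding an error.
experiment :: IO [Bool]
experiment = sequence
  [ whnfOk (Lazy (error "boom") "x")
  , whnfOk (Strict (error "boom") "x")
  , whnfOk (Strict 1 (error "boom"))
  ]
```

Only the second case fails, because the strict field is forced as soon as the constructor is evaluated, while lazy fields stay untouched.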


If you want to stick with explicit threads, rather than pseq, as you noted, you need some way to wait for the worker threads to end. That's a great use-case for a quantity semaphore. After you divide up the work to be done, have each worker thread, on termination, signal the semaphore with how much work it has done.

Then wait for all the units of work to be completed.

http://www.haskell.org/ghc/docs/6.8.3/html/libraries/base/Control-Concurrent-QSemN.html

Edit: some pseudocode to help explain the notion

do
  let workchunks :: [(WorkChunk, Size)]
      workchunks = dividework work

  let totalsize = sum $ map snd workchunks

  sem <- newQSemN 0

  -- each worker signals the semaphore with the amount of work it finished
  let forkWorkThread (workchunk, size) = forkIO $ do
        executeWorkChunk workchunk
        signalQSemN sem size

  mapM_ forkWorkThread workchunks
  waitQSemN sem totalsize

  -- now all your work is done.
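
A runnable version of the same idea might look like the sketch below. It makes several assumptions: the chunks are plain lists of Int, the "work" is summing them, the size of a chunk is its length, and sumChunks is a hypothetical name. Results are accumulated in an IORef purely for the demonstration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.QSemN (newQSemN, signalQSemN, waitQSemN)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- | Fork one worker per chunk and block until all of them have signalled.
-- Chunks and the summing "work" are placeholders for real work units.
sumChunks :: [[Int]] -> IO Int
sumChunks chunks = do
  total <- newIORef 0
  sem   <- newQSemN 0                      -- no units available at the start
  let worker chunk = forkIO $ do
        let s = sum chunk
        atomicModifyIORef' total (\t -> (t + s, ()))
        signalQSemN sem (length chunk)     -- report how much work was done
  mapM_ worker chunks
  waitQSemN sem (sum (map length chunks))  -- wait for every unit of work
  readIORef total
```

If a worker can throw, the signal is never sent and the wait never returns, so real code would probably wrap the worker body with finally or a similar guard.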