I have this mutable array in the ST monad, and I have this loop:
runST $ do
  myarray <- newMArray (Sz 10) 0
  loopM_ 0 (<10) (+ 1) (\j ->
    loopM_ 0 (<10) (+ 1) (\i ->
      when (mytruthcheck j i)
        (modifyM_ myarray (pure . (+ 1)) ((funcofji j i) :: Int)
    )))
I want to use forkST_ to run the outer loop in parallel, like this:
runST $ do
  myarray <- newMArray (Sz 10) 0
  loopM_ 0 (<10) (+ 1) (\j ->
    void (forkST_ (loopM_ 0 (<10) (+ 1) (\i ->
      when (mytruthcheck j i)
        (Data.Massiv.Array.Mutable.modifyM_
          myarray (pure . (+ 1)) ((funcofji j i) :: Int)
    )))))
But I'm guessing this will cause thread collisions, though I don't really know. I do know that funcofji can output the same value for different values of j, so the loop can modify the same index of myarray for different j values. Is there a way to ensure this is done atomically, or is that already the case?
By the way, here's the loopM_ function:
loopM_ :: Monad m => Int -> (Int -> Bool) -> (Int -> Int) -> (Int -> m a) -> m ()
loopM_ !init' condition increment f = go init'
  where
    go !step
      | condition step = f step >> go (increment step)
      | otherwise = pure ()
Answer:
As was mentioned in the comments, atomic modification is only useful for concurrency, which doesn't look like what is needed here. What you need is parallelism; for the shared updates, massiv provides an atomic operation on Int arrays: atomicAddIntArray.
There is also a built-in way to do parallelism very efficiently in massiv, so there is definitely no need to reinvent the wheel:
createArray_ Par (Sz 10) $ \scheduler myarray ->
  loopM_ 0 (<10) (+ 1) $ \j ->
    loopM_ 0 (<10) (+ 1) $ \i ->
      when (mytruthcheck j i) $
        scheduleWork_ scheduler $
          void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1
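massiv's atomicAddIntArray increments one element of a primitive Int array atomically. For comparison, here is a minimal sketch of the same idea using only base (this is not massiv code): several threads bump a shared set of ten counters with atomicModifyIORef', so two threads hitting the same slot cannot lose an update. mytruthcheck and funcofji are hypothetical stand-ins, and actual parallel execution assumes compiling with -threaded and running with +RTS -N.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forM_, replicateM, when)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- Hypothetical stand-ins for the question's predicate and index function.
mytruthcheck :: Int -> Int -> Bool
mytruthcheck j i = even (i + j)

funcofji :: Int -> Int -> Int
funcofji j i = (j * i) `mod` 10

-- One thread per outer index j; every hit increments a shared counter
-- atomically, so concurrent updates to the same slot are never lost.
countAtomically :: Int -> Int -> IO [Int]
countAtomically outerN innerN = do
  slots <- replicateM 10 (newIORef (0 :: Int))
  dones <- forM [0 .. outerN - 1] $ \j -> do
    done <- newEmptyMVar
    _ <- forkIO $ do
      forM_ [0 .. innerN - 1] $ \i ->
        when (mytruthcheck j i) $
          atomicModifyIORef' (slots !! funcofji j i) (\n -> (n + 1, ()))
      putMVar done ()
    pure done
  mapM_ takeMVar dones -- block until every worker has finished
  mapM readIORef slots

-- Sequential reference used to check the concurrent version.
countSequential :: Int -> Int -> [Int]
countSequential outerN innerN =
  [ length [ () | j <- [0 .. outerN - 1], i <- [0 .. innerN - 1]
                , mytruthcheck j i, funcofji j i == k ]
  | k <- [0 .. 9] ]

main :: IO ()
main = do
  counts <- countAtomically 10 10
  print counts
  print (counts == countSequential 10 10)
```

This forks one thread per j, which is fine for a 10 × 10 demo but not for 100,000 outer iterations; at that size you would split the outer range into a few chunks instead.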
Also, don't lie to yourself: ST (state thread) was not built for multi-threading, so use IO instead. However, if you can guarantee that the outcome is still deterministic in the end despite the multi-threaded setup, then it is OK to use unsafePerformIO.
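To illustrate the last point, here is a minimal base-only sketch of hiding multi-threaded IO behind a pure interface with unsafePerformIO. parallelSum is a hypothetical helper (not part of massiv), and the approach is sound here only because the result, an Int sum, is the same no matter how the threads are scheduled.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)
import System.IO.Unsafe (unsafePerformIO)

-- Sum of f over [0, n), split into `jobs` contiguous chunks that are
-- evaluated on separate threads. Assumes jobs > 0.
-- unsafePerformIO is acceptable here only because Int addition is
-- associative and commutative, so thread scheduling cannot change
-- the answer.
parallelSum :: Int -> (Int -> Int) -> Int -> Int
parallelSum jobs f n = unsafePerformIO $ do
  vars <- forM [0 .. jobs - 1] $ \k -> do
    var <- newEmptyMVar
    let lo = k * n `div` jobs
        hi = (k + 1) * n `div` jobs
    -- $! makes the worker thread, not the caller, do the summing
    _ <- forkIO $ putMVar var $! sum (map f [lo .. hi - 1])
    pure var
  sum <$> mapM takeMVar vars

main :: IO ()
main = print (parallelSum 4 (\x -> x * x) 1000) -- 332833500
```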
Edit
I just noticed this comment:
The loop size of j is close to 100,000 and the loop size of i is close to 1 Billion.
This makes me believe that it would be better to parallelize it this way instead:
createArray_ Par (Sz 10) $ \scheduler myarray ->
  iforSchedulerM_ scheduler (0 ..: (100000 :. 1000000000)) $ \_ (j :. i) ->
    when (mytruthcheck j i) $
      void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1
This ensures that you schedule only a few jobs instead of billions. Check out the implementation of iforSchedulerM_ if you want to customize the parallelization based on more insight into your per-i and per-j workload.
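The gain from scheduling per chunk rather than per pair is easy to quantify. The sketch below uses a hypothetical chunkRanges helper in plain Haskell (it is not how iforSchedulerM_ splits work internally, which may differ) to divide the outer range into a fixed number of contiguous jobs: 8 jobs cover 100,000 values of j, whereas scheduling every (j, i) pair separately would mean 10^5 × 10^9 = 10^14 jobs.

```haskell
-- Split the half-open range [0, n) into at most `jobs` contiguous,
-- non-empty half-open ranges whose sizes differ by at most one.
-- Assumes jobs > 0 and n >= 0.
chunkRanges :: Int -> Int -> [(Int, Int)]
chunkRanges jobs n =
  [ (lo, hi)
  | k <- [0 .. jobs - 1]
  , let lo = k * n `div` jobs
        hi = (k + 1) * n `div` jobs
  , lo < hi
  ]

main :: IO ()
main = do
  print (chunkRanges 4 10)              -- [(0,2),(2,5),(5,7),(7,10)]
  print (length (chunkRanges 8 100000)) -- 8
```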
Answer:
Following the discussion in the comments, I wrote a prototype of how this could look; maybe it will be useful. (I didn't try to compile it, so there may be some type or syntax errors.)
runST $ do
  let arrsz = 10 :: Int             -- depends on codomain of funcofji
  let ncaps = 8 :: Int64            -- see also getNumCapabilities
  let outerLoopSize = 10^5 :: Int64
  let innerLoopSize = 10^9 :: Int64
  -- round up so that no trailing j values are dropped when ncaps
  -- does not divide outerLoopSize
  let chunksz = (outerLoopSize + ncaps - 1) `div` ncaps
  sync <- newEmptyMVar
  -- each worker counts into its own local array, so no atomics are needed
  forM_ [0 .. ncaps - 1] $ \k -> forkST_ $ do
    localArr <- newMArray (Sz arrsz) 0
    forM_ [k * chunksz .. min outerLoopSize ((k + 1) * chunksz) - 1] $ \j -> do
      forM_ [0 .. innerLoopSize - 1] $ \i -> do
        when (mytruthcheck j i) $
          modifyM_ localArr (pure . (+ 1)) $ funcofji j i
    putMVar sync localArr
  -- merge: take one local array as the result and add the others into it
  resultArr <- takeMVar sync
  replicateM_ (fromIntegral ncaps - 1) $ do
    localArr <- takeMVar sync
    forM_ [0 .. arrsz - 1] $ \ix -> do
      elm <- readM localArr ix
      modifyM_ resultArr (pure . (+ elm)) ix
  ...
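The same per-thread-accumulator idea can be sketched in plain GHC Haskell without massiv or forkST_: each worker builds its own histogram for a chunk of the outer loop with accumArray, and the main thread adds the partial results once every worker has reported through an MVar. mytruthcheck, funcofji and the sizes are hypothetical stand-ins, and the array package (shipped with GHC) is assumed to be available.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)
import Data.Array.Unboxed (UArray, accumArray, elems)

-- Hypothetical stand-ins; funcofji must map into [0, arrsz).
mytruthcheck :: Int -> Int -> Bool
mytruthcheck j i = (i * j) `mod` 3 == 0

funcofji :: Int -> Int -> Int
funcofji j i = (i + j) `mod` arrsz

arrsz :: Int
arrsz = 10

-- Histogram of funcofji over outer indices [lo, hi). Each worker owns
-- its own array, so no locks or atomics are needed while counting.
localHistogram :: Int -> Int -> Int -> UArray Int Int
localHistogram innerN lo hi =
  accumArray (+) 0 (0, arrsz - 1)
    [ (funcofji j i, 1) | j <- [lo .. hi - 1], i <- [0 .. innerN - 1], mytruthcheck j i ]

-- One worker per chunk of the outer loop; partial histograms are
-- summed element-wise after all workers have finished.
parallelHistogram :: Int -> Int -> Int -> IO [Int]
parallelHistogram ncaps outerN innerN = do
  vars <- forM [0 .. ncaps - 1] $ \k -> do
    var <- newEmptyMVar
    let lo = k * outerN `div` ncaps
        hi = (k + 1) * outerN `div` ncaps
    -- $! forces the unboxed array, so the counting happens on the worker
    _ <- forkIO $ putMVar var $! localHistogram innerN lo hi
    pure var
  partials <- mapM takeMVar vars
  pure (foldr (zipWith (+) . elems) (replicate arrsz 0) partials)

main :: IO ()
main = do
  counts <- parallelHistogram 4 40 50
  print counts
  print (counts == elems (localHistogram 50 0 40))
```

Because each thread only writes to memory it owns, the result is deterministic, which is what would make wrapping it in unsafePerformIO acceptable.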
