module GHC.Event.Thread
( getSystemEventManager
, getSystemTimerManager
, ensureIOManagerIsRunning
, ioManagerCapabilitiesChanged
, threadWaitRead
, threadWaitWrite
, threadWaitReadSTM
, threadWaitWriteSTM
, closeFdWith
, threadDelay
, registerDelay
, blockedOnBadFD
) where
import Control.Exception (finally, SomeException, toException)
import Data.Foldable (forM_, mapM_, sequence_)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Tuple (snd)
import Foreign.C.Error (eBADF, errnoToIOError)
import Foreign.C.Types (CInt(..), CUInt(..))
import Foreign.Ptr (Ptr)
import GHC.Base
import GHC.List (zipWith, zipWith3)
import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
labelThread, modifyMVar_, withMVar, newTVar, sharedCAF,
getNumCapabilities, threadCapability, myThreadId, forkOn,
threadStatus, writeTVar, newTVarIO, readTVar, retry,throwSTM,STM)
import GHC.IO (mask_, onException)
import GHC.IO.Exception (ioError)
import GHC.IOArray (IOArray, newIOArray, readIOArray, writeIOArray,
boundsIOArray)
import GHC.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import GHC.Event.Control (controlWriteFd)
import GHC.Event.Internal (eventIs, evtClose)
import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
new, registerFd, unregisterFd_)
import qualified GHC.Event.Manager as M
import qualified GHC.Event.TimerManager as TM
import GHC.Num ((), (+))
import GHC.Real (fromIntegral)
import GHC.Show (showSignedInt)
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.Types (Fd)
threadDelay :: Int -> IO ()
threadDelay usecs = mask_ $ do
mgr <- getSystemTimerManager
m <- newEmptyMVar
reg <- TM.registerTimeout mgr usecs (putMVar m ())
takeMVar m `onException` TM.unregisterTimeout mgr reg
registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs = do
t <- atomically $ newTVar False
mgr <- getSystemTimerManager
_ <- TM.registerTimeout mgr usecs . atomically $ writeTVar t True
return t
threadWaitRead :: Fd -> IO ()
threadWaitRead = threadWait evtRead
threadWaitWrite :: Fd -> IO ()
threadWaitWrite = threadWait evtWrite
closeFdWith :: (Fd -> IO ())
-> Fd
-> IO ()
closeFdWith close fd = do
eventManagerArray <- readIORef eventManager
let (low, high) = boundsIOArray eventManagerArray
mgrs <- flip mapM [low..high] $ \i -> do
Just (_,!mgr) <- readIOArray eventManagerArray i
return mgr
mask_ $ do
tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
where
finish mgr table cbApp = putMVar (M.callbackTableVar mgr fd) table >> cbApp
zipWithM f xs ys = sequence (zipWith f xs ys)
threadWait :: Event -> Fd -> IO ()
threadWait evt fd = mask_ $ do
m <- newEmptyMVar
mgr <- getSystemEventManager_
reg <- registerFd mgr (\_ e -> putMVar m e) fd evt M.OneShot
evt' <- takeMVar m `onException` unregisterFd_ mgr reg
if evt' `eventIs` evtClose
then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing
else return ()
blockedOnBadFD :: SomeException
blockedOnBadFD = toException $ errnoToIOError "awaitEvent" eBADF Nothing Nothing
threadWaitSTM :: Event -> Fd -> IO (STM (), IO ())
threadWaitSTM evt fd = mask_ $ do
m <- newTVarIO Nothing
mgr <- getSystemEventManager_
reg <- registerFd mgr (\_ e -> atomically (writeTVar m (Just e))) fd evt M.OneShot
let waitAction =
do mevt <- readTVar m
case mevt of
Nothing -> retry
Just evt' ->
if evt' `eventIs` evtClose
then throwSTM $ errnoToIOError "threadWaitSTM" eBADF Nothing Nothing
else return ()
return (waitAction, unregisterFd_ mgr reg >> return ())
threadWaitReadSTM :: Fd -> IO (STM (), IO ())
threadWaitReadSTM = threadWaitSTM evtRead
threadWaitWriteSTM :: Fd -> IO (STM (), IO ())
threadWaitWriteSTM = threadWaitSTM evtWrite
getSystemEventManager :: IO (Maybe EventManager)
getSystemEventManager = do
t <- myThreadId
(cap, _) <- threadCapability t
eventManagerArray <- readIORef eventManager
mmgr <- readIOArray eventManagerArray cap
return $ fmap snd mmgr
getSystemEventManager_ :: IO EventManager
getSystemEventManager_ = do
Just mgr <- getSystemEventManager
return mgr
foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)
eventManager :: IORef (IOArray Int (Maybe (ThreadId, EventManager)))
eventManager = unsafePerformIO $ do
numCaps <- getNumCapabilities
eventManagerArray <- newIOArray (0, numCaps 1) Nothing
em <- newIORef eventManagerArray
sharedCAF em getOrSetSystemEventThreadEventManagerStore
numEnabledEventManagers :: IORef Int
numEnabledEventManagers = unsafePerformIO $ do
newIORef 0
foreign import ccall unsafe "getOrSetSystemEventThreadIOManagerThreadStore"
getOrSetSystemEventThreadIOManagerThreadStore :: Ptr a -> IO (Ptr a)
ioManagerLock :: MVar ()
ioManagerLock = unsafePerformIO $ do
m <- newMVar ()
sharedCAF m getOrSetSystemEventThreadIOManagerThreadStore
getSystemTimerManager :: IO TM.TimerManager
getSystemTimerManager = do
Just mgr <- readIORef timerManager
return mgr
foreign import ccall unsafe "getOrSetSystemTimerThreadEventManagerStore"
getOrSetSystemTimerThreadEventManagerStore :: Ptr a -> IO (Ptr a)
timerManager :: IORef (Maybe TM.TimerManager)
timerManager = unsafePerformIO $ do
em <- newIORef Nothing
sharedCAF em getOrSetSystemTimerThreadEventManagerStore
foreign import ccall unsafe "getOrSetSystemTimerThreadIOManagerThreadStore"
getOrSetSystemTimerThreadIOManagerThreadStore :: Ptr a -> IO (Ptr a)
timerManagerThreadVar :: MVar (Maybe ThreadId)
timerManagerThreadVar = unsafePerformIO $ do
m <- newMVar Nothing
sharedCAF m getOrSetSystemTimerThreadIOManagerThreadStore
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning
| not threaded = return ()
| otherwise = do
startIOManagerThreads
startTimerManagerThread
startIOManagerThreads :: IO ()
startIOManagerThreads =
withMVar ioManagerLock $ \_ -> do
eventManagerArray <- readIORef eventManager
let (_, high) = boundsIOArray eventManagerArray
mapM_ (startIOManagerThread eventManagerArray) [0..high]
writeIORef numEnabledEventManagers (high+1)
show_int :: Int -> String
show_int i = showSignedInt 0 i ""
restartPollLoop :: EventManager -> Int -> IO ThreadId
restartPollLoop mgr i = do
M.release mgr
!t <- forkOn i $ loop mgr
labelThread t ("IOManager on cap " ++ show_int i)
return t
startIOManagerThread :: IOArray Int (Maybe (ThreadId, EventManager))
-> Int
-> IO ()
startIOManagerThread eventManagerArray i = do
let create = do
!mgr <- new
!t <- forkOn i $ do
c_setIOManagerControlFd
(fromIntegral i)
(fromIntegral $ controlWriteFd $ M.emControl mgr)
loop mgr
labelThread t ("IOManager on cap " ++ show_int i)
writeIOArray eventManagerArray i (Just (t,mgr))
old <- readIOArray eventManagerArray i
case old of
Nothing -> create
Just (t,em) -> do
s <- threadStatus t
case s of
ThreadFinished -> create
ThreadDied -> do
c_setIOManagerControlFd (fromIntegral i) (1)
M.cleanup em
create
_other -> return ()
startTimerManagerThread :: IO ()
startTimerManagerThread = modifyMVar_ timerManagerThreadVar $ \old -> do
let create = do
!mgr <- TM.new
c_setTimerManagerControlFd
(fromIntegral $ controlWriteFd $ TM.emControl mgr)
writeIORef timerManager $ Just mgr
!t <- forkIO $ TM.loop mgr
labelThread t "TimerManager"
return $ Just t
case old of
Nothing -> create
st@(Just t) -> do
s <- threadStatus t
case s of
ThreadFinished -> create
ThreadDied -> do
mem <- readIORef timerManager
_ <- case mem of
Nothing -> return ()
Just em -> do c_setTimerManagerControlFd (1)
TM.cleanup em
create
_other -> return st
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
ioManagerCapabilitiesChanged :: IO ()
ioManagerCapabilitiesChanged = do
withMVar ioManagerLock $ \_ -> do
new_n_caps <- getNumCapabilities
numEnabled <- readIORef numEnabledEventManagers
writeIORef numEnabledEventManagers new_n_caps
eventManagerArray <- readIORef eventManager
let (_, high) = boundsIOArray eventManagerArray
let old_n_caps = high + 1
if new_n_caps > old_n_caps
then do new_eventManagerArray <- newIOArray (0, new_n_caps 1) Nothing
forM_ [0..high] $ \i -> do
Just (tid,mgr) <- readIOArray eventManagerArray i
if i < numEnabled
then writeIOArray new_eventManagerArray i (Just (tid,mgr))
else do tid' <- restartPollLoop mgr i
writeIOArray new_eventManagerArray i (Just (tid',mgr))
forM_ [old_n_caps..new_n_caps1] $
startIOManagerThread new_eventManagerArray
writeIORef eventManager new_eventManagerArray
else when (new_n_caps > numEnabled) $
forM_ [numEnabled..new_n_caps1] $ \i -> do
Just (_,mgr) <- readIOArray eventManagerArray i
tid <- restartPollLoop mgr i
writeIOArray eventManagerArray i (Just (tid,mgr))
foreign import ccall unsafe "setIOManagerControlFd"
c_setIOManagerControlFd :: CUInt -> CInt -> IO ()
foreign import ccall unsafe "setTimerManagerControlFd"
c_setTimerManagerControlFd :: CInt -> IO ()