base-4.6.0.1: Basic libraries

Portabilitynon-portable (concurrency)
Stabilityexperimental
Maintainerlibraries@haskell.org
Safe HaskellTrustworthy

Control.Concurrent

Contents

Description

A common interface to a collection of useful concurrency abstractions.

Synopsis

Concurrent Haskell

The concurrency extension for Haskell is described in the paper Concurrent Haskell http://www.haskell.org/ghc/docs/papers/concurrent-haskell.ps.gz.

Concurrency is "lightweight", which means that both thread creation and context switching overheads are extremely low. Scheduling of Haskell threads is done internally in the Haskell runtime system, and doesn't make use of any operating system-supplied thread packages.

However, if you want to interact with a foreign library that expects your program to use the operating system-supplied thread package, you can do so by using forkOS instead of forkIO.

Haskell threads can communicate via MVars, a kind of synchronised mutable variable (see Control.Concurrent.MVar). Several common concurrency abstractions can be built from MVars, and these are provided by the Control.Concurrent library. In GHC, threads may also communicate via exceptions.

Basic concurrency operations

data ThreadId Source

A ThreadId is an abstract type representing a handle to a thread. ThreadId is an instance of Eq, Ord and Show, where the Ord instance implements an arbitrary total ordering over ThreadIds. The Show instance lets you convert an arbitrary-valued ThreadId to string form; showing a ThreadId value is occasionally useful when debugging or diagnosing the behaviour of a concurrent program.

Note: in GHC, if you have a ThreadId, you essentially have a pointer to the thread itself. This means the thread itself can't be garbage collected until you drop the ThreadId. This misfeature will hopefully be corrected at a later date.

Note: Hugs does not provide any operations on other threads; it defines ThreadId as a synonym for ().

myThreadId :: IO ThreadIdSource

Returns the ThreadId of the calling thread (GHC only).

forkIO :: IO () -> IO ThreadIdSource

Sparks off a new thread to run the IO computation passed as the first argument, and returns the ThreadId of the newly created thread.

The new thread will be a lightweight thread; if you want to use a foreign library that uses thread-local storage, use forkOS instead.

GHC note: the new thread inherits the masked state of the parent (see mask).

The newly created thread has an exception handler that discards the exceptions BlockedIndefinitelyOnMVar, BlockedIndefinitelyOnSTM, and ThreadKilled, and passes all other exceptions to the uncaught exception handler.

forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadIdSource

fork a thread and call the supplied function when the thread is about to terminate, with an exception or a returned value. The function is called with asynchronous exceptions masked.

 forkFinally action and_then =
   mask $ \restore ->
     forkIO $ try (restore action) >>= and_then

This function is useful for informing the parent when a child terminates, for example.

forkIOWithUnmask :: ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadIdSource

Like forkIO, but the child thread is passed a function that can be used to unmask asynchronous exceptions. This function is typically used in the following way

  ... mask_ $ forkIOWithUnmask $ \unmask ->
                 catch (unmask ...) handler

so that the exception handler in the child thread is established with asynchronous exceptions masked, meanwhile the main body of the child thread is executed in the unmasked state.

Note that the unmask function passed to the child thread should only be used in that thread; the behaviour is undefined if it is invoked in a different thread.

killThread :: ThreadId -> IO ()Source

killThread raises the ThreadKilled exception in the given thread (GHC only).

 killThread tid = throwTo tid ThreadKilled

throwTo :: Exception e => ThreadId -> e -> IO ()Source

throwTo raises an arbitrary exception in the target thread (GHC only).

throwTo does not return until the exception has been raised in the target thread. The calling thread can thus be certain that the target thread has received the exception. This is a useful property to know when dealing with race conditions: eg. if there are two threads that can kill each other, it is guaranteed that only one of the threads will get to kill the other.

Whatever work the target thread was doing when the exception was raised is not lost: the computation is suspended until required by another thread.

If the target thread is currently making a foreign call, then the exception will not be raised (and hence throwTo will not return) until the call has completed. This is the case regardless of whether the call is inside a mask or not. However, in GHC a foreign call can be annotated as interruptible, in which case a throwTo will cause the RTS to attempt to cause the call to return; see the GHC documentation for more details.

Important note: the behaviour of throwTo differs from that described in the paper "Asynchronous exceptions in Haskell" (http://research.microsoft.com/~simonpj/Papers/asynch-exns.htm). In the paper, throwTo is non-blocking; but the library implementation adopts a more synchronous design in which throwTo does not return until the exception is received by the target thread. The trade-off is discussed in Section 9 of the paper. Like any blocking operation, throwTo is therefore interruptible (see Section 5.3 of the paper). Unlike other interruptible operations, however, throwTo is always interruptible, even if it does not actually block.

There is no guarantee that the exception will be delivered promptly, although the runtime will endeavour to ensure that arbitrary delays don't occur. In GHC, an exception can only be raised when a thread reaches a safe point, where a safe point is where memory allocation occurs. Some loops do not perform any memory allocation inside the loop and therefore cannot be interrupted by a throwTo.

If the target of throwTo is the calling thread, then the behaviour is the same as throwIO, except that the exception is thrown as an asynchronous exception. This means that if there is an enclosing pure computation, which would be the case if the current IO operation is inside unsafePerformIO or unsafeInterleaveIO, that computation is not permanently replaced by the exception, but is suspended as if it had received an asynchronous exception.

Note that if throwTo is called with the current thread as the target, the exception will be thrown even if the thread is currently inside mask or uninterruptibleMask.

Threads with affinity

forkOn :: Int -> IO () -> IO ThreadIdSource

Like forkIO, but lets you specify on which processor the thread should run. Unlike a forkIO thread, a thread created by forkOn will stay on the same processor for its entire lifetime (forkIO threads can migrate between processors according to the scheduling policy). forkOn is useful for overriding the scheduling policy when you know in advance how best to distribute the threads.

The Int argument specifies a capability number (see getNumCapabilities). Typically capabilities correspond to physical processors, but the exact behaviour is implementation-dependent. The value passed to forkOn is interpreted modulo the total number of capabilities as returned by getNumCapabilities.

GHC note: the number of capabilities is specified by the +RTS -N option when the program is started. Capabilities can be fixed to actual processor cores with +RTS -qa if the underlying operating system supports that, although in practice this is usually unnecessary (and may actually degrade perforamnce in some cases - experimentation is recommended).

forkOnWithUnmask :: Int -> ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadIdSource

Like forkIOWithUnmask, but the child thread is pinned to the given CPU, as with forkOn.

getNumCapabilities :: IO IntSource

Returns the number of Haskell threads that can run truly simultaneously (on separate physical processors) at any given time. To change this value, use setNumCapabilities.

setNumCapabilities :: Int -> IO ()Source

Set the number of Haskell threads that can run truly simultaneously (on separate physical processors) at any given time. The number passed to forkOn is interpreted modulo this value. The initial value is given by the +RTS -N runtime flag.

This is also the number of threads that will participate in parallel garbage collection. It is strongly recommended that the number of capabilities is not set larger than the number of physical processor cores, and it may often be beneficial to leave one or more cores free to avoid contention with other processes in the machine.

threadCapability :: ThreadId -> IO (Int, Bool)Source

returns the number of the capability on which the thread is currently running, and a boolean indicating whether the thread is locked to that capability or not. A thread is locked to a capability if it was created with forkOn.

Scheduling

Scheduling may be either pre-emptive or co-operative, depending on the implementation of Concurrent Haskell (see below for information related to specific compilers). In a co-operative system, context switches only occur when you use one of the primitives defined in this module. This means that programs such as:

   main = forkIO (write 'a') >> write 'b'
     where write c = putChar c >> write c

will print either aaaaaaaaaaaaaa... or bbbbbbbbbbbb..., instead of some random interleaving of as and bs. In practice, cooperative multitasking is sufficient for writing simple graphical user interfaces.

yield :: IO ()Source

The yield action allows (forces, in a co-operative multitasking implementation) a context-switch to any other currently runnable threads (if any), and is occasionally useful when implementing concurrency abstractions.

Blocking

Different Haskell implementations have different characteristics with regard to which operations block all threads.

Using GHC without the -threaded option, all foreign calls will block all other Haskell threads in the system, although I/O operations will not. With the -threaded option, only foreign calls with the unsafe attribute will block all other threads.

Using Hugs, all I/O operations and foreign calls will block all other Haskell threads.

Waiting

threadDelay :: Int -> IO ()Source

Suspends the current thread for a given number of microseconds (GHC only).

There is no guarantee that the thread will be rescheduled promptly when the delay has expired, but the thread will never continue to run earlier than specified.

threadWaitRead :: Fd -> IO ()Source

Block the current thread until data is available to read on the given file descriptor (GHC only).

This will throw an IOError if the file descriptor was closed while this thread was blocked. To safely close a file descriptor that has been used with threadWaitRead, use closeFdWith.

threadWaitWrite :: Fd -> IO ()Source

Block the current thread until data can be written to the given file descriptor (GHC only).

This will throw an IOError if the file descriptor was closed while this thread was blocked. To safely close a file descriptor that has been used with threadWaitWrite, use closeFdWith.

Communication abstractions

Merging of streams

mergeIO :: [a] -> [a] -> IO [a]Source

Deprecated: Control.Concurrent.mergeIO will be removed in GHC 7.8. Please use an alternative, e.g. the SafeSemaphore package, instead.

nmergeIO :: [[a]] -> IO [a]Source

Deprecated: Control.Concurrent.nmergeIO will be removed in GHC 7.8. Please use an alternative, e.g. the SafeSemaphore package, instead.

The mergeIO and nmergeIO functions fork one thread for each input list that concurrently evaluates that list; the results are merged into a single output list.

Note: Hugs does not provide these functions, since they require preemptive multitasking.

Bound Threads

Support for multiple operating system threads and bound threads as described below is currently only available in the GHC runtime system if you use the -threaded option when linking.

Other Haskell systems do not currently support multiple operating system threads.

A bound thread is a haskell thread that is bound to an operating system thread. While the bound thread is still scheduled by the Haskell run-time system, the operating system thread takes care of all the foreign calls made by the bound thread.

To a foreign library, the bound thread will look exactly like an ordinary operating system thread created using OS functions like pthread_create or CreateThread.

Bound threads can be created using the forkOS function below. All foreign exported functions are run in a bound thread (bound to the OS thread that called the function). Also, the main action of every Haskell program is run in a bound thread.

Why do we need this? Because if a foreign library is called from a thread created using forkIO, it won't have access to any thread-local state - state variables that have specific values for each OS thread (see POSIX's pthread_key_create or Win32's TlsAlloc). Therefore, some libraries (OpenGL, for example) will not work from a thread created using forkIO. They work fine in threads created using forkOS or when called from main or from a foreign export.

In terms of performance, forkOS (aka bound) threads are much more expensive than forkIO (aka unbound) threads, because a forkOS thread is tied to a particular OS thread, whereas a forkIO thread can be run by any OS thread. Context-switching between a forkOS thread and a forkIO thread is many times more expensive than between two forkIO threads.

Note in particular that the main program thread (the thread running Main.main) is always a bound thread, so for good concurrency performance you should ensure that the main thread is not doing repeated communication with other threads in the system. Typically this means forking subthreads to do the work using forkIO, and waiting for the results in the main thread.

rtsSupportsBoundThreads :: BoolSource

True if bound threads are supported. If rtsSupportsBoundThreads is False, isCurrentThreadBound will always return False and both forkOS and runInBoundThread will fail.

forkOS :: IO () -> IO ThreadIdSource

Like forkIO, this sparks off a new thread to run the IO computation passed as the first argument, and returns the ThreadId of the newly created thread.

However, forkOS creates a bound thread, which is necessary if you need to call foreign (non-Haskell) libraries that make use of thread-local state, such as OpenGL (see Control.Concurrent).

Using forkOS instead of forkIO makes no difference at all to the scheduling behaviour of the Haskell runtime system. It is a common misconception that you need to use forkOS instead of forkIO to avoid blocking all the Haskell threads when making a foreign call; this isn't the case. To allow foreign calls to be made without blocking all the Haskell threads (with GHC), it is only necessary to use the -threaded option when linking your program, and to make sure the foreign import is not marked unsafe.

isCurrentThreadBound :: IO BoolSource

Returns True if the calling thread is bound, that is, if it is safe to use foreign libraries that rely on thread-local state from the calling thread.

runInBoundThread :: IO a -> IO aSource

Run the IO computation passed as the first argument. If the calling thread is not bound, a bound thread is created temporarily. runInBoundThread doesn't finish until the IO computation finishes.

You can wrap a series of foreign function calls that rely on thread-local state with runInBoundThread so that you can use them without knowing whether the current thread is bound.

runInUnboundThread :: IO a -> IO aSource

Run the IO computation passed as the first argument. If the calling thread is bound, an unbound thread is created temporarily using forkIO. runInBoundThread doesn't finish until the IO computation finishes.

Use this function only in the rare case that you have actually observed a performance loss due to the use of bound threads. A program that doesn't need it's main thread to be bound and makes heavy use of concurrency (e.g. a web server), might want to wrap it's main action in runInUnboundThread.

Note that exceptions which are thrown to the current thread are thrown in turn to the thread that is executing the given computation. This ensures there's always a way of killing the forked thread.

Weak references to ThreadIds

mkWeakThreadId :: ThreadId -> IO (Weak ThreadId)Source

make a weak pointer to a ThreadId. It can be important to do this if you want to hold a reference to a ThreadId while still allowing the thread to receive the BlockedIndefinitely family of exceptions (e.g. BlockedIndefinitelyOnMVar). Holding a normal ThreadId reference will prevent the delivery of BlockedIndefinitely exceptions because the reference could be used as the target of throwTo at any time, which would unblock the thread.

Holding a Weak ThreadId, on the other hand, will not prevent the thread from receiving BlockedIndefinitely exceptions. It is still possible to throw an exception to a Weak ThreadId, but the caller must use deRefWeak first to determine whether the thread still exists.

GHC's implementation of concurrency

This section describes features specific to GHC's implementation of Concurrent Haskell.

Haskell threads and Operating System threads

In GHC, threads created by forkIO are lightweight threads, and are managed entirely by the GHC runtime. Typically Haskell threads are an order of magnitude or two more efficient (in terms of both time and space) than operating system threads.

The downside of having lightweight threads is that only one can run at a time, so if one thread blocks in a foreign call, for example, the other threads cannot continue. The GHC runtime works around this by making use of full OS threads where necessary. When the program is built with the -threaded option (to link against the multithreaded version of the runtime), a thread making a safe foreign call will not block the other threads in the system; another OS thread will take over running Haskell threads until the original call returns. The runtime maintains a pool of these worker threads so that multiple Haskell threads can be involved in external calls simultaneously.

The System.IO library manages multiplexing in its own way. On Windows systems it uses safe foreign calls to ensure that threads doing I/O operations don't block the whole runtime, whereas on Unix systems all the currently blocked I/O requests are managed by a single thread (the IO manager thread) using a mechanism such as epoll or kqueue, depending on what is provided by the host operating system.

The runtime will run a Haskell thread using any of the available worker OS threads. If you need control over which particular OS thread is used to run a given Haskell thread, perhaps because you need to call a foreign library that uses OS-thread-local state, then you need bound threads (see Control.Concurrent).

If you don't use the -threaded option, then the runtime does not make use of multiple OS threads. Foreign calls will block all other running Haskell threads until the call returns. The System.IO library still does multiplexing, so there can be multiple threads doing I/O, and this is handled internally by the runtime using select.

Terminating the program

In a standalone GHC program, only the main thread is required to terminate in order for the process to terminate. Thus all other forked threads will simply terminate at the same time as the main thread (the terminology for this kind of behaviour is "daemonic threads").

If you want the program to wait for child threads to finish before exiting, you need to program this yourself. A simple mechanism is to have each child thread write to an MVar when it completes, and have the main thread wait on all the MVars before exiting:

   myForkIO :: IO () -> IO (MVar ())
   myForkIO io = do
     mvar <- newEmptyMVar
     forkFinally io (\_ -> putMVar mvar ())
     return mvar

Note that we use forkFinally to make sure that the MVar is written to even if the thread dies or is killed for some reason.

A better method is to keep a global list of all child threads which we should wait for at the end of the program:

    children :: MVar [MVar ()]
    children = unsafePerformIO (newMVar [])
    
    waitForChildren :: IO ()
    waitForChildren = do
      cs <- takeMVar children
      case cs of
        []   -> return ()
        m:ms -> do
           putMVar children ms
           takeMVar m
           waitForChildren

    forkChild :: IO () -> IO ThreadId
    forkChild io = do
        mvar <- newEmptyMVar
        childs <- takeMVar children
        putMVar children (mvar:childs)
        forkFinally io (\_ -> putMVar mvar ())

     main =
       later waitForChildren $
       ...

The main thread principle also applies to calls to Haskell from outside, using foreign export. When the foreign exported function is invoked, it starts a new main thread, and it returns when this main thread terminates. If the call causes new threads to be forked, they may remain in the system after the foreign exported function has returned.

Pre-emption

GHC implements pre-emptive multitasking: the execution of threads are interleaved in a random fashion. More specifically, a thread may be pre-empted whenever it allocates some memory, which unfortunately means that tight loops which do no allocation tend to lock out other threads (this only seems to happen with pathological benchmark-style code, however).

The rescheduling timer runs on a 20ms granularity by default, but this may be altered using the -i<n> RTS option. After a rescheduling "tick" the running thread is pre-empted as soon as possible.

One final note: the aaaa bbbb example may not work too well on GHC (see Scheduling, above), due to the locking on a Handle. Only one thread may hold the lock on a Handle at any one time, so if a reschedule happens while a thread is holding the lock, the other thread won't be able to run. The upshot is that the switch from aaaa to bbbbb happens infrequently. It can be improved by lowering the reschedule tick period. We also have a patch that causes a reschedule whenever a thread waiting on a lock is woken up, but haven't found it to be useful for anything other than this example :-)

Deprecated functions

forkIOUnmasked :: IO () -> IO ThreadIdSource

Deprecated: use forkIOWithUnmask instead

This function is deprecated; use forkIOWithUnmask instead