Skip to content

std.concurrent

The std.concurrent modules carry the concurrency primitives that landed across the 0.3.x releases: atomics and sleep_ms in 0.3.0, channels in 0.3.1, mutexes and condition variables in 0.3.2, and the thread pool with the non-blocking and timed channel operations in 0.3.3. There are five modules, each imported separately:

@import std.concurrent.atomic
@import std.concurrent.thread
@import std.concurrent.channel
@import std.concurrent.sync
@import std.concurrent.pool

The thread primitives themselves are not here. spawn, join, and submit are always-available builtins, gated behind no paradigm and needing no import, like alloc and the error machinery. They are documented in the concurrency reference and listed with the other builtins. This page covers what the library adds on top.

A shared shape runs through these modules: each handle (AtomicInt, Channel<T>, Mutex, Condvar) is a small struct over a runtime shim, one word in practice, that copies freely (including into a spawned lambda’s captures) and every copy names the same underlying object. Handles are deliberately exempt from the single-owner rule because they are not managed pointers: they exist to be shared. Each also has an explicit _free function (or, for the pool, a shutdown), and using a handle after freeing it is on the raw layer’s honor system.

For the ordering guarantees these primitives provide and the data-race stance behind them, see the memory model in the concurrency reference. For a task-oriented walkthrough, see the concurrency guide.

Added in 0.3.0. A sequentially consistent int64 atomic over a heap word: the sanctioned way to share a mutable counter between threads without a mutex.

struct AtomicInt
func atomic_new(v: int64) -> AtomicInt
func atomic_load(a: AtomicInt) -> int64
func atomic_store(a: AtomicInt, v: int64) -> void
func atomic_add(a: AtomicInt, d: int64) -> int64
func atomic_cas(a: AtomicInt, expect: int64, desired: int64) -> bool
func atomic_free(a: AtomicInt) -> void
  • atomic_new(v) allocates the word and initializes it to v.
  • atomic_load(a) and atomic_store(a, v) read and write the word.
  • atomic_add(a, d) adds d and returns the new value.
  • atomic_cas(a, expect, desired) compares and swaps, returning true when the swap happened.
  • atomic_free(a) releases the word.

All operations are sequentially consistent, so they order the accesses they mediate, the property that makes the total below exact where a plain shared int64 would race:

counter.dusk
@paradigm procedural
@import std.concurrent.atomic
func main() -> int32 {
c := atomic_new(0)
t1, e1 := spawn(lambda () -> void {
mut i: int64 = 0
while i < 1000 {
atomic_add(c, 1)
i = i + 1
}
})
e1.ignore()
t2, e2 := spawn(lambda () -> void {
mut i: int64 = 0
while i < 1000 {
atomic_add(c, 1)
i = i + 1
}
})
e2.ignore()
j1 := join(t1)
j1.ignore()
j2 := join(t2)
j2.ignore()
println(atomic_load(c))
atomic_free(c)
return 0
}

The AtomicInt handle is a struct holding a raw pointer, so it crosses a spawn capture freely; both threads above increment the same heap word.

Added in 0.3.0. spawn and join need no import; this module carries what rides beside them. Today that is one function:

func sleep_ms(ms: int64) -> void

sleep_ms(ms) blocks the calling thread for at least ms milliseconds.

Added in 0.3.1, with the non-blocking and timed operations added in 0.3.3. A channel is a bounded, thread-safe queue for handing values between threads, an ordinary generic struct over runtime shims, not a compiler type.

struct Channel<T>
func chan_new<T>(cap: int64) -> Channel<T>
func chan_send<T>(c: Channel<T>, x: T) -> error
func chan_recv<T>(c: Channel<T>) -> (T, error)
func chan_try_send<T>(c: Channel<T>, x: T) -> error
func chan_try_recv<T>(c: Channel<T>) -> (T, error)
func chan_recv_timeout<T>(c: Channel<T>, ms: int64) -> (T, error)
func chan_close<T>(c: Channel<T>) -> void
func chan_free<T>(c: Channel<T>) -> void

chan_new(cap) builds a channel holding at most cap elements. The element type is pinned by the binding annotation, the same sizing rule alloc uses, so a bare jobs := chan_new(8) is a compile error:

jobs: Channel<int64> = chan_new(8)

A capacity below one or exhausted memory is fatal rather than an error: the allocator’s contract.

A channel element must be safe to carry to another thread, the same rule spawn captures follow: an element type containing a slice, a closure, or an interface value, wherever it sits, including buried in a struct or enum field, is a compile error at the instantiation. Send heap-owned data instead.

chan_send(c, x) copies the value in and blocks while the channel is full. Its error exists when the channel is closed, whether it was closed before the call or while the sender waited; the message is "send on a closed channel".

chan_recv(c) copies the oldest value out and blocks while the channel is empty. Its error exists only once the channel is closed and drained (message "receive on a closed, drained channel"), so a loop breaking on e.exists() consumes everything senders managed to deliver. The value beside that error is the zero pattern for T and means nothing; when T is a managed pointer that zero is null, and dereferencing it faults by name.

The consumer’s idiom, receive until the error exists:

drain.dusk
@paradigm procedural
@import std.concurrent.channel
func main() -> int32 {
c: Channel<int64> = chan_new(8)
t, e := spawn(lambda () -> void {
mut i: int64 = 1
while i <= 3 {
se := chan_send(c, i)
se.ignore()
i = i + 1
}
chan_close(c)
})
if e.exists() {
printerr(e)
return 1
}
mut going := true
while going {
v, re := chan_recv(c)
if re.exists() {
going = false
} else {
println(v)
}
}
je := join(t)
je.ignore()
chan_free(c)
return 0
}

Added in 0.3.3, three operations refuse instead of parking:

  • chan_try_send(c, x) sends only if the channel has room right now. Its error reads "channel is full" or the closed message, so the two refusals are distinguishable.
  • chan_try_recv(c) receives only if a value is ready right now. Its error reads "channel is empty" or the closed-and-drained message.
  • chan_recv_timeout(c, ms) parks at most ms milliseconds against a monotonic clock, so a wall-clock step cannot stretch or shrink the wait. Its error reads "receive timed out" or the closed-and-drained message. A timeout of zero polls once and gives blocked senders no chance to arrive.

The value beside any of these errors is the zero pattern for T, the drained receive’s contract.

chan_close(c) is idempotent, wakes every blocked sender and receiver, and discards nothing already buffered. Values sent before the close stay receivable after it.

Ownership crosses a thread boundary by moving a managed pointer through a channel: chan_send(c, move(p)) kills the sender’s name at compile time, and the receiver’s q, e := chan_recv(c) binds a fresh owner. Sending without move leaves both sides sharing the record with no order between them. Two documented leak paths follow from the ring holding raw bytes: a moved send refused by a closed channel loses its record, and managed pointers still buffered when chan_free runs are dropped as bytes. Neither is corruption, and neither happens in the sanctioned protocol where senders finish before the close.

chan_free(c) releases the channel. Freeing while a thread is blocked inside a send or receive is fatal with a named message. The sanctioned shutdown order is: close the channel, join every thread that touches it, then free it. Using a channel after chan_free is undefined, since the one-word handle carries no generation.

Added in 0.3.2. A mutex and a condition variable over pthread: the sanctioned way to mutate shared memory between threads. The blessed shape for shared mutable state is a *raw buffer guarded by one Mutex: lock, touch the buffer, unlock.

struct Mutex
struct Condvar
func mutex_new() -> Mutex
func lock(m: Mutex) -> void
func unlock(m: Mutex) -> void
func mutex_free(m: Mutex) -> void
func cond_new() -> Condvar
func cond_wait(cv: Condvar, m: Mutex) -> void
func cond_signal(cv: Condvar) -> void
func cond_broadcast(cv: Condvar) -> void
func cond_free(cv: Condvar) -> void

lock(m) acquires the mutex, blocking until it is free. unlock(m) releases it. An unlock happens before the lock that next acquires the same mutex, which is the ordering that makes the guarded memory safe to touch. Inside a function body the idiom is lock(m) followed by defer unlock(m), so every return path releases.

The mutex is the error-checking kind, so the classic pthread misuses (undefined in the default flavor) fault by name instead of hanging or corrupting:

  • relocking a mutex the thread already holds is fatal,
  • unlocking a mutex the thread does not hold is fatal,
  • freeing a held mutex is fatal (a trylock probe catches it),
  • operating on a mutex already freed faults as an invalid mutex rather than blaming a holder that does not exist.
guarded.dusk
@paradigm procedural
@import std.concurrent.sync
func main() -> int32 {
m := mutex_new()
buf: *raw int64 = alloc_bytes(8)
buf[0] = 0
t1, e1 := spawn(lambda () -> void {
mut i: int64 = 0
while i < 1000 {
lock(m)
buf[0] = buf[0] + 1
unlock(m)
i = i + 1
}
})
e1.ignore()
t2, e2 := spawn(lambda () -> void {
mut i: int64 = 0
while i < 1000 {
lock(m)
buf[0] = buf[0] + 1
unlock(m)
i = i + 1
}
})
e2.ignore()
j1 := join(t1)
j1.ignore()
j2 := join(t2)
j2.ignore()
println(buf[0])
mutex_free(m)
free(buf)
return 0
}

cond_wait(cv, m) releases the mutex, sleeps until cond_signal(cv) wakes one waiter or cond_broadcast(cv) wakes all, and reacquires the mutex before returning. The caller must hold the mutex, and every concurrent wait on one condition variable must name the same mutex. Wakeups can be spurious, so a wait always sits in a loop that rechecks its predicate under the lock:

lock(m)
while buf[5] == 0 {
cond_wait(notempty, m)
}
// consume under the lock, then
unlock(m)

cond_free(cv) releases the condition variable. Freeing one a thread still waits on is fatal by name. A waiter count in the runtime makes this deterministic instead of the silent forever-hang the bare pthread destroy gives. A condition variable wait has no timeout, so a predicate nothing ever makes true is a deadlock; the wait that can time out is chan_recv_timeout.

Added in 0.3.3. The pool is a process singleton of OS threads running fire-and-forget tasks over an unbounded FIFO queue: the substrate the 0.4.x async line schedules onto. This module carries the lifecycle; task submission is the submit builtin, which needs no import.

func pool_start(workers: int64) -> error
func pool_shutdown() -> void
func ncpu() -> int64
  • pool_start(workers) starts the singleton with a fixed worker count. Its error exists when the count is below one, the pool is already running, it was already shut down, or the operating system refuses a worker thread. A refused start leaves the pool pristine, so a later attempt can succeed. But a successful start is the only one the process gets, and after a shutdown the pool stays down.
  • pool_shutdown() stops new submissions, runs everything already queued to completion, and joins the workers. It is idempotent and a no-op before the pool starts. When callers race into it, the loser waits for the winner, so every caller returns holding the drain guarantee. A pool task calling pool_shutdown itself is fatal by name.
  • ncpu() returns the number of processors online, at least one (the natural worker count).

submit(f: () -> void) -> error shares spawn’s whole argument rule: one lambda literal of type () -> void, captures copied to a private heap block, the same slice, closure, and interface capture ban, and a captured managed pointer borrowed rather than owned. It returns only an error, because the pool owns the task and results flow back through a channel. A submission never blocks the submitter, whatever the queue holds; the error exists only when the pool is not running, and on that path the task body never runs and leaks nothing.

Submission order is queue order, but tasks run on many workers at once, so completion order is not promised. Queuing a task happens before its body runs, and everything a body did is visible to whoever receives its completion through a channel. Shut the pool down before main returns, for the same reason threads are joined.

The basic shape is capture in, channel out:

poolfold.dusk
@paradigm procedural
@import std.concurrent.channel
@import std.concurrent.pool
func main() -> int32 {
pe := pool_start(ncpu())
if pe.exists() {
printerr(pe)
return 1
}
results: Channel<int64> = chan_new(16)
mut i: int64 = 1
while i <= 10 {
n := i
se := submit(lambda () -> void {
we := chan_send(results, n * n)
we.ignore()
})
se.ignore()
i = i + 1
}
mut sum: int64 = 0
mut got: int64 = 0
while got < 10 {
v, re := chan_recv(results)
re.ignore()
sum = sum + v
got = got + 1
}
println(sum)
pool_shutdown()
chan_free(results)
return 0
}

dusk does not detect data races; the sanctioned paths provide the ordering they name:

  • capture at spawn copies values into the thread’s private environment,
  • the sequentially consistent atomics in std.concurrent.atomic order the accesses they mediate,
  • a chan_recv happens after the chan_send that delivered the value,
  • an unlock happens before the next lock of the same mutex,
  • queuing a pool task happens before its body runs,
  • join orders everything the thread did before everything the joiner does after.

Everything else built by hand out of *raw T buffers is on the honor system across threads unless a mutex guards every touch. The full statement, including how the generational heap’s checks degrade under a true race, is in the concurrency reference.