API Reference

AtomicChannels.AtomicCellType
mutable struct AtomicCell{T}
    state::Threads.Atomic{UInt8}
    value::Base.RefValue{Union{T, Nothing}}
end

A cell in the AtomicChannel. The state can be empty (0), filled (1), or busy (2; being written/cleared). The value field holds the item when the cell is filled or nothing when it is empty.

source
AtomicChannels.AtomicChannelType
AtomicChannel{T<:Any}(capacity::Int)
AtomicChannel(capacity::Int) -> AtomicChannel{Any}

A thread-safe, lock-free and task-free channel/queue implementation for type T.

It supports concurrent producers and consumers with blocking and non-blocking APIs. The channel is implemented as a ring buffer with atomic operations to ensure thread safety.

  • capacity specifies the maximum number of items the channel can hold. It must be greater than 0.

Examples

julia> chnl = AtomicChannel{Int}(2)
AtomicChannel{Int64} with 0/2 items
  head = 0, tail = 0, free = 2
  slots = [..]

julia> put!(chnl, 1)
AtomicChannel{Int64} with 1/2 items
  head = 0, tail = 1, free = 1
  slots = [X.]

julia> put!(chnl, 2)
AtomicChannel{Int64} with 2/2 items
  head = 0, tail = 2, free = 0
  slots = [XX]

julia> tryput!(chnl, 3)  # returns false since the chnl is full
false

julia> take!(chnl)  # returns 1
1

julia> take!(chnl)  # returns 2
2

julia> trytake!(chnl)  # returns nothing since the chnl is empty
source
AtomicChannels.ReusePoolType
ReusePool(create::Function, size::Int = 1, reset::Function = identity)
ReusePool{T}(create::Function, size::Int = 1, reset::Function = identity)

A thread-safe and reusable pool of objects of type T. The pool allows for efficient reuse of objects, reducing the overhead of creating and destroying objects frequently. The pool is thread-safe and can be used in concurrent environments.

Once a ReusePool is created, it comes with one instance of T created by the create function. The pool can hold up to size instances of T. When an item is taken from the pool, it is removed from the pool until it is put back.

Fields

  • create::Function: A function that creates a new instance of T. This function must be thread-safe and should return a new instance of T each time it is called.
  • reset::Function: A function that resets an instance of T to a clean state. Defaults to identity. This function must edit the item in place and should not return a new instance.
  • chnl::AtomicChannel{T}: An atomic lock-free channel that holds the reusable objects.

API

  • Blocking: take!(pool), put!(pool, item)
  • Non-blocking: acquire!(pool), release!(pool, item), fill!(pool)
source
AtomicChannels.acquire!Method
acquire!(reuse_pool::ReusePool{T}) where T<:Any

Gets an item from the pool without blocking. If the pool is empty, a new item is created using the create function.

See also: release! to release an item back to the pool without blocking.

See also: take! and put! for blocking versions.

source
AtomicChannels.release!Method
release!(reuse_pool::ReusePool{T}, item::T) where T<:Any

Releases an item back to the pool without blocking. If the pool is full, the item is discarded. Return true if the item was successfully released back to the pool, or false if the item was discarded because the pool is full.

See also: acquire! to get an item from the pool without blocking.

See also: take! and put! for blocking versions.

source
AtomicChannels.tryput!Method
tryput!(chnl::AtomicChannel{T}, item::T) where T<:Any
tryput!(reset_func::Base.Callable, chnl::AtomicChannel{T}, item::T) where T<:Any

Non-blocking variant of put!.

The reset_func should in-place edit item. It is applied to the item only when the item can be inserted into the chnl.

Returns true if item was inserted, or false immediately when the chnl is full.

source
AtomicChannels.trytake!Method
trytake!(chnl::AtomicChannel{T}) where T<:Any

Non-blocking variant of take!.

Returns an item from the chnl when available, or nothing immediately when the chnl is empty.

Edge case:

  • If you put a nothing into the chnl, it will be treated as a valid item and returned.
source
Base.fetchMethod
fetch(chnl::AtomicChannel{T}) where T<:Any

Waits for and returns (without removing) the first available item from the AtomicChannel.

source
Base.fill!Method
fill!(reuse_pool::ReusePool{T}) where T<:Any

Fills the pool to its maximum size by creating new items using the create function.

source
Base.put!Method
put!(chnl::AtomicChannel{T}, item::T) where T<:Any

Puts an item into the chnl. If the chnl is full, the function is blocked until space is available.

See also: take! to take an item from the chnl (blocked when empty).

See also: tryput! and trytake! for non-blocking versions of put and take.

source
Base.put!Method
put!(reuse_pool::ReusePool{T}, item::T) where T<:Any

Puts an item back into the pool after resetting it. If the pool is full, the function is blocked until space is available.

See also: take! to take an item from the pool (blocked when empty).

See also: acquire! and release! for non-blocking versions of take and put.

source
Base.take!Method
take!(reuse_pool::ReusePool{T}) where T<:Any

Takes an item from the pool. If the pool is empty, the function is blocked until an item is available.

See also: put! to reset and put an item back to the pool (blocked when full).

See also: acquire! and release! for non-blocking versions of take and put.

source
Base.take!Method
take!(chnl::AtomicChannel{T}) where T<:Any

Takes an item from the chnl. If the chnl is empty, the function is blocked until an item is available.

Edge case:

  • If you put a nothing into the chnl, it will be treated as a valid item and returned.

See also: put! to put an item into the chnl (blocked when full).

See also: tryput! and trytake! for non-blocking versions of put and take.

source