Implementing C++20 atomic waiting in libstdc++

 1 year ago
source link: https://developers.redhat.com/articles/2022/12/06/implementing-c20-atomic-waiting-libstdc

The C++ standard library gained some new concurrency features with C++20:

  • Wait and notify operations on std::atomic<T>
  • Semaphores
  • Latches
  • Barriers

In this article, I will cover the current implementation approach for atomic wait/notify, as these are the basic operations required to implement the remaining coordination primitives introduced with C++20. Subsequent articles in this series will cover the details of the other types.

Note: The implementation presented here is considered experimental and the details will almost certainly change in a future version of GCC as we commit to an ABI for these features.

Let's start by taking a look at what the C++ standard says about atomic waiting:

1 Atomic waiting operations and atomic notifying operations provide a mechanism to wait for the value of an atomic object to change more efficiently than can be achieved with polling. An atomic waiting operation may block until it is unblocked by an atomic notifying operation, according to each function's effects.

[Note 1 : Programs are not guaranteed to observe transient atomic values, an issue known as the A-B-A problem, resulting in continued blocking if a condition is only temporarily met. — end note]

2 [Note 2 : The following functions are atomic waiting operations:

(2.1) — atomic<T>::wait,

(2.2) — atomic_flag::wait,

(2.3) — atomic_wait and atomic_wait_explicit

(2.4) — atomic_flag_wait and atomic_flag_wait_explicit, and

(2.5) — atomic_ref::wait. — end note]

3 [Note 3 : The following functions are atomic notifying operations:

(3.1) — atomic<T>::notify_one and atomic<T>::notify_all,

(3.2) — atomic_flag::notify_one and atomic_flag::notify_all,

(3.3) — atomic_notify_one and atomic_notify_all,

(3.4) — atomic_flag_notify_one and atomic_flag_notify_all, and

(3.5) — atomic_ref<T>::notify_one and atomic_ref<T>::notify_all. — end note]

4 A call to an atomic waiting operation on an atomic object M is eligible to be unblocked by a call to an atomic notifying operation on M if there exist side effects X and Y on M such that:

(4.1) — the atomic waiting operation has blocked after observing the result of X,

(4.2) — X precedes Y in the modification order of M, and

(4.3) — Y happens before the call to the atomic notifying operation.
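To make paragraph 4 concrete, here is a minimal usage sketch, not taken from the original article. It assumes a C++20 toolchain for wait() and notify_one(); the names publish_and_wait, ready, and payload are invented for illustration. The polling branch is only there so the sketch still builds where the library does not define __cpp_lib_atomic_wait, and it is exactly the kind of loop that atomic waiting is meant to replace.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical helper: a consumer thread waits for `ready` to change from 0,
// then reads `payload`. Returns the value the consumer observed.
inline int publish_and_wait()
{
  std::atomic<int> ready{0};
  int payload = 0;
  int seen = -1;

  std::thread consumer([&] {
#ifdef __cpp_lib_atomic_wait
    ready.wait(0);                 // returns only once ready != 0
#else
    while (ready.load() == 0)      // fallback for older toolchains: plain polling
      std::this_thread::yield();
#endif
    seen = payload;                // safe: ordered after the store to `ready`
  });

  payload = 42;
  ready.store(1);                  // the side effect Y on the atomic object
#ifdef __cpp_lib_atomic_wait
  ready.notify_one();              // Y happens before the notifying operation
#endif
  consumer.join();
  return seen;
}
```

The store comes before the notify on purpose: a waiter is only eligible to be unblocked by a notification that follows a change it has not yet observed.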

How can we implement atomic waiting?

The only universal strategy for implementing atomic waiting is to spin in an atomic load-compare loop. This isn't particularly efficient if the waiter is likely to block for some extended period, but it is advantageous in terms of application responsiveness in many cases to do a bit of spinning before calling into a more expensive operating system level primitive.

libstdc++ implements its spin logic as follows:

  • If supported, this is how we let the CPU or kernel know we are able to yield or relax our spinning:

    inline void
    __thread_yield() noexcept
    {
    #if defined _GLIBCXX_HAS_GTHREADS && defined _GLIBCXX_USE_SCHED_YIELD
      __gthread_yield();
    #endif
    }
    
    inline void
    __thread_relax() noexcept
    {
    #if defined __i386__ || defined __x86_64__
      __builtin_ia32_pause();
    #else
      __thread_yield();
    #endif
    }
    
  • We use these constants in the spin loop:

    • constexpr auto __atomic_spin_count_relax = 12;
    • constexpr auto __atomic_spin_count = 16;
  • We provide for a pluggable policy in the form of a callable that is invoked as the last step in the spin algorithm. This will be used later in the implementation of timed waiting. The default policy is to exit the spin loop.

    
    struct __default_spin_policy
    {
      bool
      operator()() const noexcept
      { return false; }
    };
    
  • The spin loop itself performs the following steps, in order:

    1. Spin for a few iterations evaluating the spin predicate and then performing the relax operation to either issue a pause instruction or yield the thread.
    2. Spin for a few iterations and yield the thread if the spin predicate is not satisfied.
    3. Spin until the spin policy indicates we should stop.

    The spin returns true if the predicate is satisfied.

    template<typename _Pred, typename _Spin = __default_spin_policy>
    bool
    __atomic_spin(_Pred& __pred, _Spin __spin = _Spin{ }) noexcept
    {
      for (auto __i = 0; __i < __atomic_spin_count; ++__i)
        {
          if (__pred())
            return true;

          if (__i < __atomic_spin_count_relax)
            __detail::__thread_relax();
          else
            __detail::__thread_yield();
        }

      while (__spin())
        {
          if (__pred())
            return true;
        }

      return false;
    }
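The same relax-then-yield structure can be tried outside the library. The following is a simplified sketch using only standard facilities plus the x86 pause builtin; bounded_spin, cpu_relax, count_predicate_calls, and the two constants are names and values chosen for this illustration, and the pluggable spin policy is left out.

```cpp
#include <cassert>
#include <thread>

// Illustrative budget; these names and values are assumptions of this sketch.
constexpr int spin_total = 16;   // total bounded iterations
constexpr int spin_relax = 12;   // leading iterations that only relax the CPU

inline void cpu_relax() noexcept
{
#if defined(__i386__) || defined(__x86_64__)
  __builtin_ia32_pause();        // GCC/Clang builtin for the x86 PAUSE instruction
#else
  std::this_thread::yield();
#endif
}

// Returns true as soon as pred() holds, false once the spin budget is spent.
template<typename Pred>
bool bounded_spin(Pred pred)
{
  for (int i = 0; i < spin_total; ++i)
    {
      if (pred())
        return true;
      if (i < spin_relax)
        cpu_relax();
      else
        std::this_thread::yield();
    }
  return false;
}

// Hypothetical helper for experimenting: reports how many times the predicate
// ran when it first succeeds on call number `succeed_on` (0 means never).
inline int count_predicate_calls(int succeed_on)
{
  int calls = 0;
  bounded_spin([&] { return ++calls == succeed_on; });
  return calls;
}
```

A caller that gets false back would then fall through to a blocking wait.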
    

A more efficient wait

On some platforms, the operating system provides an efficient waiting primitive; we should use that facility where available. For Linux, this waiting primitive is futex(2); on Darwin, it is ulock_wait/ulock_wake. If present, these facilities are typically restricted in the type on which you can wait: either a 32-bit int, in the case of a futex, or a 64-bit unsigned int, in the case of ulock_wait. Clearly, std::atomic<T> can (and is required by the standard to) support specialization on more types than int32_t or uint64_t, but we'd certainly like to take advantage of the platform-specific wait if it's possible to do so.

libstdc++ implements the low-level details of the platform-specific waiting strategy conditionally as follows. First, we define the fundamental type we can wait efficiently on:

#ifdef _GLIBCXX_HAVE_LINUX_FUTEX
    using __platform_wait_t = int;
    static constexpr size_t __platform_wait_alignment = 4;
#else
    using __platform_wait_t = uint64_t;
    static constexpr size_t __platform_wait_alignment
      = __alignof__(__platform_wait_t);
#endif

libstdc++ defines its own versions of the constants it uses to invoke the futex syscall rather than relying on the presence of <linux/futex.h>:

#ifdef _GLIBCXX_HAVE_LINUX_FUTEX
#define _GLIBCXX_HAVE_PLATFORM_WAIT 1
    enum class __futex_wait_flags : int
    {
#ifdef _GLIBCXX_HAVE_LINUX_FUTEX_PRIVATE
      __private_flag = 128,
#else
      __private_flag = 0,
#endif
      __wait = 0,
      __wake = 1,
      __wait_bitset = 9,
      __wake_bitset = 10,
      __wait_private = __wait | __private_flag,
      __wake_private = __wake | __private_flag,
      __wait_bitset_private = __wait_bitset | __private_flag,
      __wake_bitset_private = __wake_bitset | __private_flag,
      __bitset_match_any = -1
    };

We invoke the futex syscall to wait as follows:

template<typename _Tp>
  void
  __platform_wait(const _Tp* __addr, __platform_wait_t __val) noexcept
  {
    auto __e = syscall (SYS_futex, static_cast<const void*>(__addr),
                        static_cast<int>(__futex_wait_flags::__wait_private),
                        __val, nullptr);
    if (!__e || errno == EAGAIN)
      return;
    if (errno != EINTR)
      __throw_system_error(errno);
  }

Then we invoke the futex syscall to notify one or more waiting threads as follows:

template<typename _Tp>
  void
  __platform_notify(const _Tp* __addr, bool __all) noexcept
  {
    syscall (SYS_futex, static_cast<const void*>(__addr),
             static_cast<int>(__futex_wait_flags::__wake_private),
             __all ? INT_MAX : 1);
  }
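For readers who want to poke at the syscall directly, here is a Linux-only sketch of the same two calls. Unlike libstdc++, it takes the FUTEX_* constants from <linux/futex.h>; futex_wait_if_equal and futex_wake are hypothetical names for this example. Both functions report results instead of throwing.

```cpp
#include <cassert>
#include <cerrno>
#include <climits>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

// Sleeps only if *addr still equals `expected` when the kernel checks it.
// Returns true if the call was woken, was interrupted, or returned at once
// because the value had already changed (EAGAIN).
inline bool futex_wait_if_equal(const int* addr, int expected) noexcept
{
  long rc = syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, expected, nullptr);
  return rc == 0 || errno == EAGAIN || errno == EINTR;
}

// Wakes one waiter, or all of them; returns the number of threads woken
// (or -1 on error).
inline long futex_wake(const int* addr, bool all) noexcept
{
  return syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, all ? INT_MAX : 1);
}
```

The compare-and-sleep step is atomic inside the kernel, which is what lets a waiter avoid missing a wake-up that arrives between its last load and the moment it blocks.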

The current (as of GCC 12) implementation does not support ulock_wait-based waiting. That (and presumably other similar OS facilities) can be conditionally supported in the future:

// define _GLIBCXX_HAVE_PLATFORM_WAIT and implement __platform_wait()
// and __platform_notify() if there is a more efficient primitive supported
// by the platform (e.g. __ulock_wait()/__ulock_wake()) which is better than
// a mutex/condvar based wait

How to handle those types that do not fit in a __platform_wait_t

To solve this problem, we employ the timeless advice of adding another layer of indirection, of course. Just because we need to atomically modify the contents of an atomic<T>, we aren't necessarily restricted to implementing wait and notify in terms of that type; we can carry a __platform_wait_t that is used only for this purpose.

One approach would be to add a new __platform_wait_t member to atomic<T>. This would be a fairly expensive approach in terms of ballooning the size of atomic<T> and pessimizing every use of it. We are also precluded from taking this approach because of ABI stability.

Instead, we create a side table to hold these proxy __platform_wait_ts and hash into that table by the address of the atomic value.

Making notify cheaper

The syscall() to notify waiting threads is fairly expensive compared to an atomic load. A simple micro-benchmark bears this out:


------------------------------------------------------------------
Benchmark                        Time             CPU   Iterations
------------------------------------------------------------------
BM_empty_notify_checked       3.81 ns         3.81 ns    183296078
BM_empty_notify_syscall       96.9 ns         96.8 ns      7124788

As this is a micro-benchmark, there are a number of caveats to these numbers (and discussion on the subject is a topic for a future article). That said, the cost of blindly making a futex syscall is roughly 25 times that of first checking whether anyone is waiting, which is the optimization outlined here.

Waiters entering the wait syscall are going to pay for an expensive operation anyway, so making them pay a bit extra in terms of an atomic increment is reasonable. Notifiers can then perform an atomic load to determine if the syscall is likely to wake any waiters.

libstdc++'s side table elements could then be defined as follows:

struct __waiter_pool
{
  __platform_wait_t _M_wait = 0; // Count of waiters
  __platform_wait_t _M_ver = 0; // Proxy address to use for a futex
};
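The check-before-syscall idea can be sketched with standard atomics. In this illustration the expensive wake-up is only simulated by a counter; waiter_count_sketch and its members are hypothetical names, not libstdc++ internals.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for one side-table entry.
struct waiter_count_sketch
{
  std::atomic<int> waiters{0};
  int expensive_calls = 0;   // simulated syscalls; not thread-safe, illustration only

  void enter_wait() noexcept { waiters.fetch_add(1, std::memory_order_seq_cst); }
  void leave_wait() noexcept { waiters.fetch_sub(1, std::memory_order_release); }

  // Returns true if the (simulated) expensive wake-up was issued.
  bool notify() noexcept
  {
    if (waiters.load(std::memory_order_seq_cst) == 0)
      return false;          // cheap path: a single atomic load
    ++expensive_calls;       // stands in for the futex wake syscall
    return true;
  }
};
```

The waiter pays one extra atomic increment and decrement around a wait that is already expensive, while a notifier with nobody to wake pays only for the load.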

What about platforms that don't provide some low-level efficient wait?

Wait and notify can be implemented in terms of a mutex and condition variable that the C++ standard library is required to provide, so libstdc++'s side table conditionally carries a mutex and condition_variable for those platforms:

  struct __waiter_pool
  {
#ifdef __cpp_lib_hardware_interference_size
    static constexpr auto _S_align = hardware_destructive_interference_size;
#else
    static constexpr auto _S_align = 64;
#endif

    alignas(_S_align) __platform_wait_t _M_wait = 0;

#ifndef _GLIBCXX_HAVE_PLATFORM_WAIT
    mutex _M_mtx;
#endif

    alignas(_S_align) __platform_wait_t _M_ver = 0;

#ifndef _GLIBCXX_HAVE_PLATFORM_WAIT
    __condvar _M_cv;
#endif

    // Note entry into a wait
    void
    _M_enter_wait() noexcept
    { __atomic_fetch_add(&_M_wait, 1, __ATOMIC_SEQ_CST); }

    // Note exit from a wait
    void
    _M_leave_wait() noexcept
    { __atomic_fetch_sub(&_M_wait, 1, __ATOMIC_RELEASE); }

    // Hello? Is there anybody in there? Just nod if you can hear me.
    bool
    _M_waiting() const noexcept
    {
      __platform_wait_t __res;
      __atomic_load(&_M_wait, &__res, __ATOMIC_SEQ_CST);
      return __res > 0;
    }
  };

This struct fits on two hardware cache lines (assuming 64-byte cache lines) and places the atomic counts on different lines.
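The layout claim can be checked at compile time. The sketch below covers only the case where a platform wait is available, so there is no mutex or condition variable member, and it hard-codes the 64-byte line size assumed above; padded_entry_sketch is a hypothetical type.

```cpp
#include <cassert>
#include <cstddef>

struct padded_entry_sketch
{
  alignas(64) int wait_count = 0;   // first cache line
  alignas(64) int version = 0;      // second cache line
};

static_assert(alignof(padded_entry_sketch) == 64,
              "entry starts on a cache-line boundary");
static_assert(sizeof(padded_entry_sketch) == 128,
              "entry spans exactly two 64-byte lines");
static_assert(offsetof(padded_entry_sketch, version) == 64,
              "the two counters sit on different lines");
```

Keeping the counters apart means a notifier bumping the version does not invalidate the line that other notifiers read to check the waiter count.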

The same consideration regarding pessimizing notifiers when no thread is waiting applies to platforms that fall back to the mutex/condvar implementation strategy. And the side table itself is a static instance, defined as follows:

struct __waiter_pool
{
  // ...

  static __waiter_pool&
  _S_for(const void* __addr) noexcept
  {
    constexpr uintptr_t __ct = 16;
    static __waiter_pool __w[__ct];
    auto __key = (uintptr_t(__addr) >> 2) % __ct;
    return __w[__key];
  }
};
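One consequence of hashing by address into a small table is that unrelated atomics can land in the same entry. The sketch below repeats the index computation from the excerpt; slot_for and share_slot are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr std::uintptr_t table_size = 16;   // same count as in the excerpt above

// Drop the two low address bits, then wrap around the table.
inline std::size_t slot_for(const void* addr) noexcept
{
  return (reinterpret_cast<std::uintptr_t>(addr) >> 2) % table_size;
}

inline bool share_slot(const void* a, const void* b) noexcept
{ return slot_for(a) == slot_for(b); }
```

With 4-byte objects, neighbours get consecutive slots, and two objects exactly 64 bytes apart collide. Collisions are harmless for correctness, but they matter for how notifications through a shared entry have to be delivered.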

Putting together the pieces for a wait primitive

The C++ standard has the following to say about what wait() should do:

void wait(T old, memory_order order = memory_order::seq_cst) const noexcept;

22 Preconditions: order is neither memory_order::release nor memory_order::acq_rel.

23 Effects: Repeatedly performs the following steps, in order:

(23.1) — Evaluates load(order) and compares its value representation for equality against that of old.

(23.2) — If they compare unequal, returns.

(23.3) — Blocks until it is unblocked by an atomic notifying operation or is unblocked spuriously.

For atomic<T>s where T is compatible with (and support is present for) __platform_wait(), we would perform the following steps:

  1. Obtain __waiter_pool entry __w for the address to be waited
  2. Call __w._M_enter_wait() to signal that a waiter is available to be notified
  3. Repeatedly perform the following steps, in order:
    1. Evaluate load(order) and compare its value representation for equality against that of __old
    2. If they compare unequal:
      1. Call __w._M_leave_wait() to signal that the waiter has left the wait
      2. return
    3. If they compare equal, then perform the following steps in order:
      1. Spin for a bit, performing load(order) to see if value representation changes
      2. If it doesn't change, call __platform_wait(__addr), where __addr is the address being waited on
  4. Call __w._M_leave_wait() to signal that the waiter has left the wait

For atomic<T>s where T is not compatible with __platform_wait() but __platform_wait() is available, we would perform the following steps:

  1. Obtain __waiter_pool entry __w for the address to be waited
  2. Call __w._M_enter_wait() to signal that a waiter is available to be notified
  3. Repeatedly perform the following steps, in order:
    1. Evaluate load(order) and compare its value representation for equality against that of __old
    2. If they compare unequal:
      1. Call __w._M_leave_wait() to signal that the waiter has left the wait
      2. return
    3. If they compare equal, then perform the following steps in order:
      1. Spin for a bit, performing load(order) to see if value representation changes
      2. If it doesn't change, call __platform_wait(&__w._M_ver)
  4. Call __w._M_leave_wait() to signal that the waiter has left the wait

When no __platform_wait() is available, we would perform the following steps:

  1. Obtain __waiter_pool entry __w for the address to be waited
  2. Call __w._M_enter_wait() to signal that a waiter is available to be notified
  3. Repeatedly perform the following steps, in order:
    1. Evaluate load(order) and compare its value representation for equality against that of __old
    2. If they compare unequal:
      1. Call __w._M_leave_wait() to signal that the waiter has left the wait
      2. return
    3. If they compare equal, then perform the following steps in order:
      1. Spin for a bit, performing relaxed load of __w._M_ver and checking to see if its value changes; if so, exit the spin indicating success
      2. Otherwise, acquire __w._M_mtx
      3. Atomically load __w._M_ver and compare its result to the value last observed during the spin loop; if they compare equal, enter __w._M_cv.wait(__w._M_mtx)
      4. Release __w._M_mtx
  4. Call __w._M_leave_wait() to signal that the waiter has left the wait

Putting together the pieces for a notify primitive

The C++ standard has the following to say about what notify_one() and notify_all() should do:

void notify_one() const noexcept;

25 Effects: Unblocks the execution of at least one atomic waiting operation on *ptr that is eligible to be unblocked (31.6) by this call, if any such atomic waiting operations exist.

26 Remarks: This function is an atomic notifying operation (31.6) on atomic object *ptr.

void notify_all() const noexcept;

27 Effects: Unblocks the execution of all atomic waiting operations on *ptr that are eligible to be unblocked (31.6) by this call.

For atomic<T>s where T is compatible with (and support is present for) __platform_notify(), we would perform the following steps:

  1. Obtain __waiter_pool entry __w for the address to be notified
  2. Check if there are any __w._M_waiting(), and, if so, call __platform_notify(__addr, __all), where:
    • __all indicates whether a wake one or wake all is to be performed
    • __addr is the address to be notified

For atomic<T>s where T is not compatible with __platform_notify() but __platform_notify() is available, we would perform the following steps:

  1. Obtain __waiter_pool entry __w for the address to be notified
  2. Check if there are any __w._M_waiting, and, if so, perform the following steps:
    1. Perform an atomic increment on __w._M_ver
    2. Call __platform_notify(&__w._M_ver, __all), where __all indicates whether a wake one or wake all is to be performed

When no __platform_notify() is available, we would perform the following steps:

  1. Obtain __waiter_pool entry __w for the address to be notified
  2. Check if there are any __w._M_waiting, and, if so, perform the following steps:
    1. Perform an atomic increment on __w._M_ver
    2. Call __w._M_cv.notify_one/all()
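The fallback wait and notify steps can be sketched with nothing but standard library types. This is a simplified model rather than the libstdc++ code: it omits the spin phase, uses std::condition_variable with a predicate, and fallback_entry_sketch and fallback_demo are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical proxy entry shared by every atomic that hashes to it.
struct fallback_entry_sketch
{
  std::atomic<int> waiters{0};
  std::atomic<unsigned> version{0};
  std::mutex mtx;
  std::condition_variable cv;

  // Blocks until `value` no longer equals `old`.
  void wait(const std::atomic<int>& value, int old)
  {
    waiters.fetch_add(1);
    for (;;)
      {
        const unsigned seen = version.load();   // sample the version first
        if (value.load() != old)                // then check the real value
          break;
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&] { return version.load() != seen; });
      }
    waiters.fetch_sub(1);
  }

  // Returns false when the cheap check found nobody to wake.
  bool notify(bool all)
  {
    if (waiters.load() == 0)
      return false;
    version.fetch_add(1);
    { std::lock_guard<std::mutex> lock(mtx); }  // order against a waiter about to block
    if (all)
      cv.notify_all();
    else
      cv.notify_one();
    return true;
  }
};

// Usage: one thread waits for `value` to leave 0; the other stores, then notifies.
inline int fallback_demo()
{
  fallback_entry_sketch entry;
  std::atomic<int> value{0};
  std::thread waiter([&] { entry.wait(value, 0); });
  value.store(7);
  entry.notify(true);
  waiter.join();   // returns only if the waiter was released
  return value.load();
}
```

Sampling the version before re-checking the value is what closes the window in which a store and notify could slip in between the check and the block.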

Handling the various type and platform choices

The libstdc++ code to determine whether or not a given type T supports __platform_wait() is:

  template<typename _Tp>
    inline constexpr bool __platform_wait_uses_type
#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
      = is_scalar_v<_Tp>
  && ((sizeof(_Tp) == sizeof(__detail::__platform_wait_t))
  && (alignof(_Tp*) >= __platform_wait_alignment));
#else
      = false;
#endif

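If the platform provides an efficient wait primitive, and T is a scalar type with the same size as __platform_wait_t and at least its required alignment, then T uses __platform_wait() directly. In all other cases, it does not, and waits go through the proxy __platform_wait_t in the side table.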
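A standalone version of this kind of trait makes it easy to see which types qualify. The sketch assumes a futex-style 32-bit wait word and, as a choice made for this illustration, checks the alignment of T itself; waits_directly, platform_wait_sketch_t, and wrapped_int are hypothetical names.

```cpp
#include <cassert>
#include <type_traits>

using platform_wait_sketch_t = int;   // assumption: a 32-bit wait word

template<typename T>
inline constexpr bool waits_directly =
    std::is_scalar_v<T>
    && sizeof(T) == sizeof(platform_wait_sketch_t)
    && alignof(T) >= alignof(platform_wait_sketch_t);

struct wrapped_int { int v; };        // same size as int, but not a scalar type

static_assert(waits_directly<int>);
static_assert(waits_directly<unsigned>);
static_assert(!waits_directly<char>);         // too small
static_assert(!waits_directly<long long>);    // too large
static_assert(!waits_directly<wrapped_int>);  // class types use the proxy
```

Anything that fails the test still gets wait and notify; it just blocks on the proxy word instead of on its own storage.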

In all cases, we will end up doing some sort of wait against the value of a __platform_wait_t, as follows:

  struct __waiter_pool
  {
    // ...

    void
    _M_do_wait(const __platform_wait_t* __addr, __platform_wait_t __old) noexcept
    {
#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
      __platform_wait(__addr, __old);
#else
      __platform_wait_t __val;
      __atomic_load(__addr, &__val, __ATOMIC_SEQ_CST);
      if (__val == __old)
        {
          lock_guard<mutex> __l(_M_mtx);
          _M_cv.wait(_M_mtx);
        }
#endif // _GLIBCXX_HAVE_PLATFORM_WAIT
    }
  };

And the corresponding notification is implemented as follows:

  struct __waiter_pool
  {
    // ...

    void
    _M_notify(const __platform_wait_t* __addr, bool __all) noexcept
    {
      if (!_M_waiting())
        return;

#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
      __platform_notify(__addr, __all);
#else
      { lock_guard __l(_M_mtx); }
      if (__all)
        _M_cv.notify_all();
      else
        _M_cv.notify_one();
#endif
    }
  };

Note: the seemingly useless mutex acquisition above is significant, as it ensures that the state of the mutex is synchronized in this thread.

In each wait strategy, there are multiple points where __w._M_leave_wait() needs to be called. An RAII wrapper type can take care of making sure this always happens at scope exit:

struct __waiter
{
  __waiter_pool& _M_w;

  __waiter(__waiter_pool& __w)
    : _M_w(__w)
  { _M_w._M_enter_wait(); }

  ~__waiter()
  { _M_w._M_leave_wait(); }
};

In each wait strategy, there is always a __platform_wait_t* that will be used for determining if a wait has been notified; the particular address is a function of __platform_wait_uses_type<>. The __waiter RAII type can take care of initializing itself with the correct address:

struct __waiter
{
  __waiter_pool& _M_w;
  __platform_wait_t* _M_addr;

  template<typename _Up>
    static __platform_wait_t*
    _S_wait_addr(const _Up* __a, __platform_wait_t* __b)
    {
      if constexpr (__platform_wait_uses_type<_Up>)
        return reinterpret_cast<__platform_wait_t*>(const_cast<_Up*>(__a));
      else
        return __b;
    }

  template<typename _Up>
    explicit __waiter(const _Up* __addr) noexcept
      : _M_w(__waiter_pool::_S_for(__addr))
      , _M_addr(_S_wait_addr(__addr, &_M_w._M_ver))
    { _M_w._M_enter_wait(); }

  ~__waiter()
  { _M_w._M_leave_wait(); }
};

Our wait and notify primitives can now construct a __waiter __w(__addr) for any type and get a handle initialized with the correct address to wait and notify on. However, notify does not need to manipulate the waiter count, so we have two kinds of waiter:

template<typename _EntersWait>
  struct __waiter
  {
    __waiter_pool& _M_w;
    __platform_wait_t* _M_addr;

    template<typename _Up>
      static __platform_wait_t*
      _S_wait_addr(const _Up* __a, __platform_wait_t* __b)
      {
        if constexpr (__platform_wait_uses_type<_Up>)
          return reinterpret_cast<__platform_wait_t*>(const_cast<_Up*>(__a));
        else
          return __b;
      }

    template<typename _Up>
      explicit __waiter(const _Up* __addr) noexcept
        : _M_w(__waiter_pool::_S_for(__addr))
        , _M_addr(_S_wait_addr(__addr, &_M_w._M_ver))
      {
        if constexpr (_EntersWait::value)
          _M_w._M_enter_wait();
      }

    ~__waiter()
    {
      if constexpr (_EntersWait::value)
        _M_w._M_leave_wait();
    }
  };

  using __enters_wait = __waiter<std::true_type>;
  using __bare_wait = __waiter<std::false_type>;

Wait operations use the __enters_wait type alias, and notify operations use the __bare_wait type alias.
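The tag-type technique can be shown in isolation. In this sketch, the counter is a plain std::atomic<int> passed in by the caller; wait_scope_sketch, counting_scope, bare_scope, and count_inside_scope are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <type_traits>

template<typename EntersWait>
class wait_scope_sketch
{
  std::atomic<int>& count_;

public:
  explicit wait_scope_sketch(std::atomic<int>& count) noexcept
    : count_(count)
  {
    if constexpr (EntersWait::value)
      count_.fetch_add(1);
  }

  ~wait_scope_sketch()
  {
    if constexpr (EntersWait::value)
      count_.fetch_sub(1);
  }

  wait_scope_sketch(const wait_scope_sketch&) = delete;
  wait_scope_sketch& operator=(const wait_scope_sketch&) = delete;
};

using counting_scope = wait_scope_sketch<std::true_type>;   // for waiters
using bare_scope = wait_scope_sketch<std::false_type>;      // for notifiers

// Returns the count observed while a scope of the given kind is alive.
template<typename Scope>
int count_inside_scope(std::atomic<int>& count)
{
  Scope scope(count);
  return count.load();   // early return: the destructor still runs afterwards
}
```

Because the branch is resolved with if constexpr, the notifier's variant compiles down to a constructor and destructor that never touch the counter.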

If we are on a platform that does not support __platform_wait, it is necessary to communicate the last value seen during the spin loop to the wait. In libstdc++, this is implemented as follows:

struct __waiter
{
  // ...

template<typename _Up, typename _ValFn,
  typename _Spin = __default_spin_policy>
  static bool
  _S_do_spin_v(__platform_wait_t* __addr,
        const _Up& __old, _ValFn __vfn,
        __platform_wait_t& __val,
        _Spin __spin = _Spin{ })
  {
    auto const __pred = [=]
      { return !__atomic_compare(__old, __vfn()); };

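    if constexpr (__platform_wait_uses_type<_Up>)
      // the waited-on object is itself the wait word, so the value
      // to wait against is the bit pattern of __old
      { __builtin_memcpy(&__val, &__old, sizeof(__val)); }
    else
      // if we implement wait/notify by proxy or by mutex/condvar, we
      // use the current value of _M_w._M_ver, which is stored at
      // the address in __addr
      { __atomic_load(__addr, &__val, __ATOMIC_SEQ_CST); }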
    return __atomic_spin(__pred, __spin);
  }

  template<typename _Up, typename _ValFn,
    typename _Spin = __default_spin_policy>
    bool
    _M_do_spin_v(const _Up& __old, _ValFn __vfn,
          __platform_wait_t& __val,
          _Spin __spin = _Spin{ })
    { return _S_do_spin_v(_M_addr, __old, __vfn, __val, __spin); }
};

_ValFn is a nullary callable that knows how to return the current value for the address being waited on. All functions that receive one have a _v suffix.

The primitive wait implementation is:

struct __waiter
{
  // ...

template<typename _Tp, typename _ValFn>
  void
  _M_do_wait_v(_Tp __old, _ValFn __vfn)
  {
    do
    {
      __platform_wait_t __val;
      if (__base_type::_M_do_spin_v(__old, __vfn, __val))
        return;
      __base_type::_M_w._M_do_wait(__base_type::_M_addr, __val);
    } while (__atomic_compare(__old, __vfn()));
  }
};
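The shape of this loop (spin, block, re-check) can be exercised without any threads by passing in the blocking step as a callable. In the sketch below, block stands in for the platform wait and is allowed to return spuriously; wait_while_equal and its parameters are hypothetical names.

```cpp
#include <cassert>

// `vfn` returns the current value; `block` stands in for the platform wait.
// Returns how many times `block` had to be called before the value changed.
template<typename T, typename ValFn, typename Block>
int wait_while_equal(const T& old, ValFn vfn, Block block)
{
  int blocked = 0;
  do
    {
      for (int i = 0; i < 16; ++i)   // short bounded spin
        if (!(vfn() == old))
          return blocked;
      block();                       // may return without any change
      ++blocked;
    }
  while (vfn() == old);              // spurious wake-up: go around again
  return blocked;
}
```

For example, if the value changes only during the third call to block, the function returns 3: the first two returns are treated as spurious wake-ups and the loop simply waits again.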

The wrapper that atomic<T>::wait calls into is:

template<typename _Tp, typename _ValFn>
  void
  __atomic_wait_address_v(const _Tp* __addr, _Tp __old,
        _ValFn __vfn) noexcept
  {
    __detail::__enters_wait __w(__addr);
    __w._M_do_wait_v(__old, __vfn);
  }

And the corresponding primitive notify function is:

template<typename _EntersWait>
  struct __waiter
  {
    // ...

    void
    _M_notify(bool __all)
    {
      if (_M_addr == &_M_w._M_ver)
        __atomic_fetch_add(_M_addr, 1, __ATOMIC_SEQ_CST);
      _M_w._M_notify(_M_addr, __all);
    }
  };

However, there is a subtle problem with this formulation. If we are proxying notifications for a type not supported by __platform_notify(), or if __platform_notify() is not available, several atomics may share the same proxy, so the call _M_notify(false) could wake a thread waiting on a different atomic and leave the intended waiter blocked. So we add another member function to __waiter, _M_laundered(), to detect this case and wake all waiters instead:

template<typename _EntersWait>
  struct __waiter
  {
    __waiter_pool& _M_w;
    __platform_wait_t* _M_addr;

    // ...

    template<typename _Up>
      explicit __waiter(const _Up* __addr) noexcept
        : _M_w(__waiter_pool::_S_for(__addr))
        , _M_addr(_S_wait_addr(__addr, &_M_w._M_ver))
      { ... }

    bool
    _M_laundered() const noexcept
    { return _M_addr == &_M_w._M_ver; }

    void
    _M_notify(bool __all)
    {
      if (_M_laundered())
        {
          __atomic_fetch_add(_M_addr, 1, __ATOMIC_SEQ_CST);
          __all = true;
        }
      _M_w._M_notify(_M_addr, __all);
    }
  };

And the wrapper that atomic<T>::notify_one() and atomic<T>::notify_all() call into is:

template<typename _Tp>
  void
  __atomic_notify_address(const _Tp* __addr, bool __all) noexcept
  {
    __detail::__bare_wait __w(__addr);
    __w._M_notify(__all);
  }

Next time

C++20 introduces counting_semaphore and binary_semaphore, which support blocking acquire(), non-blocking try_acquire(), and the timed try_acquire_for() and try_acquire_until(). The next article in this series will look at the implementation of counting_semaphore and binary_semaphore and the functionality to implement timed atomic waiting.

