numerical-collection-cpp 0.10.0
A collection of algorithms in numerical analysis implemented in C++
Loading...
Searching...
No Matches
producer_consumer_circular_queue.h
Go to the documentation of this file.
1/*
2 * Copyright 2021 MusicScience37 (Kenta Kabashima)
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
20#pragma once
21
22#include <atomic>
23#include <cstddef>
24#include <limits>
25#include <type_traits> // IWYU pragma: keep
26#include <utility>
27
34
35namespace num_collect::util {
36
46template <typename T>
47class alignas(cache_line) producer_consumer_circular_queue {
48public:
55 : begin_(new storage_type[get_buffer_size(size)]),
56 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
57 end_(begin_ + (size + 1)),
58 producer_pos_(begin_),
59 consumer_pos_(begin_) {}
60
65 while (try_ignore()) {
66 }
67 delete[] begin_;
68 }
69
71 const producer_consumer_circular_queue&) = delete;
74 auto operator=(const producer_consumer_circular_queue&) = delete;
75 auto operator=(producer_consumer_circular_queue&&) = delete;
76
84 template <typename... Args>
85 [[nodiscard]] auto try_emplace(Args&&... args) noexcept(
86 std::is_nothrow_constructible_v<T, Args...>) -> bool {
87 storage_type* const pushed_pos =
88 producer_pos_.load(std::memory_order::relaxed);
89 storage_type* const next_producer_pos = increment(pushed_pos);
90 if (next_producer_pos ==
91 consumer_pos_.load(std::memory_order::acquire)) {
92 return false;
93 }
94
95 pushed_pos->emplace(std::forward<Args>(args)...);
96 producer_pos_.store(next_producer_pos, std::memory_order::release);
97 return true;
98 }
99
107 template <typename Output>
108 [[nodiscard]] auto try_pop(Output& output) noexcept(
109 noexcept(output = std::move(std::declval<T&>()))) -> bool {
110 storage_type* const popped_pos =
111 consumer_pos_.load(std::memory_order::relaxed);
112 if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
113 return false;
114 }
115
116 output = std::move(popped_pos->get_ref());
117 popped_pos->reset();
118 consumer_pos_.store(increment(popped_pos), std::memory_order::release);
119 return true;
120 }
121
127 [[nodiscard]] auto try_ignore() noexcept -> bool {
128 storage_type* const popped_pos =
129 consumer_pos_.load(std::memory_order::relaxed);
130 if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
131 return false;
132 }
133
134 popped_pos->reset();
135 consumer_pos_.store(increment(popped_pos), std::memory_order::release);
136 return true;
137 }
138
139private:
142
149 [[nodiscard]] static auto get_buffer_size(index_type val) -> std::size_t {
150 if (val <= 0 || val == std::numeric_limits<index_type>::max()) {
152 invalid_argument, "Invalid queue size {}.", val);
153 }
154 return safe_cast<std::size_t>(val + 1);
155 }
156
163 [[nodiscard]] auto increment(storage_type* ptr) noexcept -> storage_type* {
164 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
165 ++ptr;
166 if (ptr == end_) {
167 ptr = begin_;
168 }
169 return ptr;
170 }
171
174
177
183 std::atomic<storage_type*> producer_pos_;
184
190 std::atomic<storage_type*> consumer_pos_;
191};
192
193} // namespace num_collect::util
Definition of cache_line variable.
Class of exception on invalid arguments.
Definition exception.h:85
Class of storage of objects.
auto get_ref() noexcept -> T &
Get the reference of the value.
void emplace(Args &&... args)
Construct an object.
void reset() noexcept
Destruct the object.
Class of a queue using a circular buffer and thread-safe for a single producer thread and a single consumer thread.
auto try_emplace(Args &&... args) noexcept(std::is_nothrow_constructible_v< T, Args... >) -> bool
Try to push an element constructing in-place.
std::atomic< storage_type * > consumer_pos_
Position of the consumer.
storage_type * end_
Past-the-end pointer of the buffer.
auto try_pop(Output &output) noexcept(noexcept(output=std::move(std::declval< T & >()))) -> bool
Try to pop an element.
static auto get_buffer_size(index_type val) -> std::size_t
Validate a size and get the size of the buffer.
auto increment(storage_type *ptr) noexcept -> storage_type *
Increment a pointer with consideration of the loop of the buffer.
std::atomic< storage_type * > producer_pos_
Position of the producer.
auto try_ignore() noexcept -> bool
Try to pop an element without getting the element.
Definition of exceptions.
Definition of index_type type.
Definition of macros for logging.
#define NUM_COLLECT_LOG_AND_THROW(EXCEPTION_TYPE,...)
Write an error log and throw an exception for an error.
std::ptrdiff_t index_type
Type of indices in this library.
Definition index_type.h:33
Namespace of utilities.
Definition assert.h:30
Definition of object_storage class.
Definition of safe_cast function.