Coverage Report

Created: 2024-10-11 06:23

/builds/MusicScience37Projects/numerical-analysis/numerical-collection-cpp/include/num_collect/util/producer_consumer_circular_queue.h
Line
Count
Source
1
/*
2
 * Copyright 2021 MusicScience37 (Kenta Kabashima)
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
/*!
17
 * \file
18
 * \brief Definition of producer_consumer_circular_queue class.
19
 */
20
#pragma once
21
22
#include <atomic>
23
#include <cstddef>
24
#include <limits>
25
#include <type_traits>  // IWYU pragma: keep
26
#include <utility>
27
28
#include "num_collect/base/exception.h"
29
#include "num_collect/base/index_type.h"
30
#include "num_collect/logging/logging_macros.h"
31
#include "num_collect/util/cache_line.h"
32
#include "num_collect/util/object_storage.h"
33
#include "num_collect/util/safe_cast.h"
34
35
namespace num_collect::util {
36
37
/*!
38
 * \brief Class of a queue using a circular buffer and thread-safe for a single
39
 * producer thread and a single consumer thread.
40
 *
41
 * \tparam T Type of elements in the queue.
42
 *
43
 * \thread_safety Operations for the same object is thread safe only if a single
44
 * producer thread and a single consumer thread exists.
45
 */
46
template <typename T>
47
class alignas(cache_line) producer_consumer_circular_queue {
48
public:
49
    /*!
50
     * \brief Constructor.
51
     *
52
     * \param[in] size Size.
53
     */
54
    explicit producer_consumer_circular_queue(index_type size)
55
16
        : begin_(new storage_type[get_buffer_size(size)]),
56
          // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
57
16
          end_(begin_ + (size + 1)),
58
16
          producer_pos_(begin_),
59
16
          consumer_pos_(begin_) {}
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEEC2El
Line
Count
Source
55
12
        : begin_(new storage_type[get_buffer_size(size)]),
56
          // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
57
12
          end_(begin_ + (size + 1)),
58
12
          producer_pos_(begin_),
59
12
          consumer_pos_(begin_) {}
_ZN11num_collect4util32producer_consumer_circular_queueIiEC2El
Line
Count
Source
55
2
        : begin_(new storage_type[get_buffer_size(size)]),
56
          // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
57
2
          end_(begin_ + (size + 1)),
58
2
          producer_pos_(begin_),
59
2
          consumer_pos_(begin_) {}
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__110unique_ptrIiNS2_14default_deleteIiEEEEEC2El
Line
Count
Source
55
2
        : begin_(new storage_type[get_buffer_size(size)]),
56
          // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
57
2
          end_(begin_ + (size + 1)),
58
2
          producer_pos_(begin_),
59
2
          consumer_pos_(begin_) {}
60
61
    /*!
62
     * \brief Destructor.
63
     */
64
13
    ~producer_consumer_circular_queue() noexcept {
65
16
        while (try_ignore()) {
66
3
        }
67
13
        delete[] begin_;
68
13
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEED2Ev
Line
Count
Source
64
9
    ~producer_consumer_circular_queue() noexcept {
65
12
        while (try_ignore()) {
66
3
        }
67
9
        delete[] begin_;
68
9
    }
_ZN11num_collect4util32producer_consumer_circular_queueIiED2Ev
Line
Count
Source
64
2
    ~producer_consumer_circular_queue() noexcept {
65
2
        while (try_ignore()) {
66
0
        }
67
2
        delete[] begin_;
68
2
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__110unique_ptrIiNS2_14default_deleteIiEEEEED2Ev
Line
Count
Source
64
2
    ~producer_consumer_circular_queue() noexcept {
65
2
        while (try_ignore()) {
66
0
        }
67
2
        delete[] begin_;
68
2
    }
69
70
    producer_consumer_circular_queue(
71
        const producer_consumer_circular_queue&) = delete;
72
    producer_consumer_circular_queue(
73
        producer_consumer_circular_queue&&) = delete;
74
    auto operator=(const producer_consumer_circular_queue&) = delete;
75
    auto operator=(producer_consumer_circular_queue&&) = delete;
76
77
    /*!
78
     * \brief Try to push an element constructing in-place.
79
     *
80
     * \tparam Args Types of arguments of the constructor.
81
     * \param[in] args Arguments of the constructor.
82
     * \return Whether an element could be pushed.
83
     */
84
    template <typename... Args>
85
    [[nodiscard]] auto try_emplace(Args&&... args) noexcept(
86
343
        std::is_nothrow_constructible_v<T, Args...>) -> bool {
87
343
        storage_type* const pushed_pos =
88
343
            producer_pos_.load(std::memory_order::relaxed);
89
343
        storage_type* const next_producer_pos = increment(pushed_pos);
90
343
        if (next_producer_pos ==
91
343
            consumer_pos_.load(std::memory_order::acquire)) {
92
29
            return false;
93
29
        }
94
95
314
        pushed_pos->emplace(std::forward<Args>(args)...);
96
314
        producer_pos_.store(next_producer_pos, std::memory_order::release);
97
314
        return true;
98
343
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEE11try_emplaceIJRA4_KcEEEbDpOT_
Line
Count
Source
86
15
        std::is_nothrow_constructible_v<T, Args...>) -> bool {
87
15
        storage_type* const pushed_pos =
88
15
            producer_pos_.load(std::memory_order::relaxed);
89
15
        storage_type* const next_producer_pos = increment(pushed_pos);
90
15
        if (next_producer_pos ==
91
15
            consumer_pos_.load(std::memory_order::acquire)) {
92
1
            return false;
93
1
        }
94
95
14
        pushed_pos->emplace(std::forward<Args>(args)...);
96
14
        producer_pos_.store(next_producer_pos, std::memory_order::release);
97
14
        return true;
98
15
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEE11try_emplaceIJRKS8_EEEbDpOT_
Line
Count
Source
86
110
        std::is_nothrow_constructible_v<T, Args...>) -> bool {
87
110
        storage_type* const pushed_pos =
88
110
            producer_pos_.load(std::memory_order::relaxed);
89
110
        storage_type* const next_producer_pos = increment(pushed_pos);
90
110
        if (next_producer_pos ==
91
110
            consumer_pos_.load(std::memory_order::acquire)) {
92
10
            return false;
93
10
        }
94
95
100
        pushed_pos->emplace(std::forward<Args>(args)...);
96
100
        producer_pos_.store(next_producer_pos, std::memory_order::release);
97
100
        return true;
98
110
    }
_ZN11num_collect4util32producer_consumer_circular_queueIiE11try_emplaceIJRKiEEEbDpOT_
Line
Count
Source
86
109
        std::is_nothrow_constructible_v<T, Args...>) -> bool {
87
109
        storage_type* const pushed_pos =
88
109
            producer_pos_.load(std::memory_order::relaxed);
89
109
        storage_type* const next_producer_pos = increment(pushed_pos);
90
109
        if (next_producer_pos ==
91
109
            consumer_pos_.load(std::memory_order::acquire)) {
92
9
            return false;
93
9
        }
94
95
100
        pushed_pos->emplace(std::forward<Args>(args)...);
96
100
        producer_pos_.store(next_producer_pos, std::memory_order::release);
97
100
        return true;
98
109
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__110unique_ptrIiNS2_14default_deleteIiEEEEE11try_emplaceIJS6_EEEbDpOT_
Line
Count
Source
86
109
        std::is_nothrow_constructible_v<T, Args...>) -> bool {
87
109
        storage_type* const pushed_pos =
88
109
            producer_pos_.load(std::memory_order::relaxed);
89
109
        storage_type* const next_producer_pos = increment(pushed_pos);
90
109
        if (next_producer_pos ==
91
109
            consumer_pos_.load(std::memory_order::acquire)) {
92
9
            return false;
93
9
        }
94
95
100
        pushed_pos->emplace(std::forward<Args>(args)...);
96
100
        producer_pos_.store(next_producer_pos, std::memory_order::release);
97
100
        return true;
98
109
    }
99
100
    /*!
101
     * \brief Try to pop an element.
102
     *
103
     * \tparam Output Type of the variable to pop to.
104
     * \param[in] output Variable to pop to.
105
     * \return Whether an element could be popped.
106
     */
107
    template <typename Output>
108
    [[nodiscard]] auto try_pop(Output& output) noexcept(
109
342
        noexcept(output = std::move(std::declval<T&>()))) -> bool {
110
342
        storage_type* const popped_pos =
111
342
            consumer_pos_.load(std::memory_order::relaxed);
112
342
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
113
31
            return false;
114
31
        }
115
116
311
        output = std::move(popped_pos->get_ref());
117
311
        popped_pos->reset();
118
311
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
119
311
        return true;
120
342
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEE7try_popIS8_EEbRT_
Line
Count
Source
109
124
        noexcept(output = std::move(std::declval<T&>()))) -> bool {
110
124
        storage_type* const popped_pos =
111
124
            consumer_pos_.load(std::memory_order::relaxed);
112
124
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
113
13
            return false;
114
13
        }
115
116
111
        output = std::move(popped_pos->get_ref());
117
111
        popped_pos->reset();
118
111
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
119
111
        return true;
120
124
    }
_ZN11num_collect4util32producer_consumer_circular_queueIiE7try_popIiEEbRT_
Line
Count
Source
109
109
        noexcept(output = std::move(std::declval<T&>()))) -> bool {
110
109
        storage_type* const popped_pos =
111
109
            consumer_pos_.load(std::memory_order::relaxed);
112
109
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
113
9
            return false;
114
9
        }
115
116
100
        output = std::move(popped_pos->get_ref());
117
100
        popped_pos->reset();
118
100
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
119
100
        return true;
120
109
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__110unique_ptrIiNS2_14default_deleteIiEEEEE7try_popIS6_EEbRT_
Line
Count
Source
109
109
        noexcept(output = std::move(std::declval<T&>()))) -> bool {
110
109
        storage_type* const popped_pos =
111
109
            consumer_pos_.load(std::memory_order::relaxed);
112
109
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
113
9
            return false;
114
9
        }
115
116
100
        output = std::move(popped_pos->get_ref());
117
100
        popped_pos->reset();
118
100
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
119
100
        return true;
120
109
    }
121
122
    /*!
123
     * \brief Try to pop an element without getting the element.
124
     *
125
     * \return Whether an element could be popped.
126
     */
127
16
    [[nodiscard]] auto try_ignore() noexcept -> bool {
128
16
        storage_type* const popped_pos =
129
16
            consumer_pos_.load(std::memory_order::relaxed);
130
16
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
131
13
            return false;
132
13
        }
133
134
3
        popped_pos->reset();
135
3
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
136
3
        return true;
137
16
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEE10try_ignoreEv
Line
Count
Source
127
12
    [[nodiscard]] auto try_ignore() noexcept -> bool {
128
12
        storage_type* const popped_pos =
129
12
            consumer_pos_.load(std::memory_order::relaxed);
130
12
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
131
9
            return false;
132
9
        }
133
134
3
        popped_pos->reset();
135
3
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
136
3
        return true;
137
12
    }
_ZN11num_collect4util32producer_consumer_circular_queueIiE10try_ignoreEv
Line
Count
Source
127
2
    [[nodiscard]] auto try_ignore() noexcept -> bool {
128
2
        storage_type* const popped_pos =
129
2
            consumer_pos_.load(std::memory_order::relaxed);
130
2
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
131
2
            return false;
132
2
        }
133
134
0
        popped_pos->reset();
135
0
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
136
0
        return true;
137
2
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__110unique_ptrIiNS2_14default_deleteIiEEEEE10try_ignoreEv
Line
Count
Source
127
2
    [[nodiscard]] auto try_ignore() noexcept -> bool {
128
2
        storage_type* const popped_pos =
129
2
            consumer_pos_.load(std::memory_order::relaxed);
130
2
        if (popped_pos == producer_pos_.load(std::memory_order::acquire)) {
131
2
            return false;
132
2
        }
133
134
0
        popped_pos->reset();
135
0
        consumer_pos_.store(increment(popped_pos), std::memory_order::release);
136
0
        return true;
137
2
    }
138
139
private:
140
    //! Type of storage.
141
    using storage_type = object_storage<T, cache_line>;
142
143
    /*!
144
     * \brief Validate a size and get the size of the buffer.
145
     *
146
     * \param[in] val Size of the queue.
147
     * \return Size of the buffer.
148
     */
149
16
    [[nodiscard]] static auto get_buffer_size(index_type val) -> std::size_t {
150
16
        if (val <= 0 || val == std::numeric_limits<index_type>::max()) {
151
3
            NUM_COLLECT_LOG_AND_THROW(
152
3
                invalid_argument, "Invalid queue size {}.", val);
153
3
        }
154
16
        return safe_cast<std::size_t>(val + 1);
155
16
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEE15get_buffer_sizeEl
Line
Count
Source
149
12
    [[nodiscard]] static auto get_buffer_size(index_type val) -> std::size_t {
150
12
        if (val <= 0 || val == std::numeric_limits<index_type>::max()) {
151
3
            NUM_COLLECT_LOG_AND_THROW(
152
3
                invalid_argument, "Invalid queue size {}.", val);
153
3
        }
154
12
        return safe_cast<std::size_t>(val + 1);
155
12
    }
_ZN11num_collect4util32producer_consumer_circular_queueIiE15get_buffer_sizeEl
Line
Count
Source
149
2
    [[nodiscard]] static auto get_buffer_size(index_type val) -> std::size_t {
150
2
        if (val <= 0 || val == std::numeric_limits<index_type>::max()) {
151
0
            NUM_COLLECT_LOG_AND_THROW(
152
0
                invalid_argument, "Invalid queue size {}.", val);
153
0
        }
154
2
        return safe_cast<std::size_t>(val + 1);
155
2
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__110unique_ptrIiNS2_14default_deleteIiEEEEE15get_buffer_sizeEl
Line
Count
Source
149
2
    [[nodiscard]] static auto get_buffer_size(index_type val) -> std::size_t {
150
2
        if (val <= 0 || val == std::numeric_limits<index_type>::max()) {
151
0
            NUM_COLLECT_LOG_AND_THROW(
152
0
                invalid_argument, "Invalid queue size {}.", val);
153
0
        }
154
2
        return safe_cast<std::size_t>(val + 1);
155
2
    }
156
157
    /*!
158
     * \brief Increment a pointer with consideration of the loop of the buffer.
159
     *
160
     * \param[in] ptr Pointer.
161
     * \return Incremented pointer.
162
     */
163
657
    [[nodiscard]] auto increment(storage_type* ptr) noexcept -> storage_type* {
164
        // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
165
657
        ++ptr;
166
657
        if (ptr == end_) {
167
62
            ptr = begin_;
168
62
        }
169
657
        return ptr;
170
657
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__112basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEE9incrementEPNS0_14object_storageIS8_Lm64EEE
Line
Count
Source
163
239
    [[nodiscard]] auto increment(storage_type* ptr) noexcept -> storage_type* {
164
        // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
165
239
        ++ptr;
166
239
        if (ptr == end_) {
167
24
            ptr = begin_;
168
24
        }
169
239
        return ptr;
170
239
    }
_ZN11num_collect4util32producer_consumer_circular_queueIiE9incrementEPNS0_14object_storageIiLm64EEE
Line
Count
Source
163
209
    [[nodiscard]] auto increment(storage_type* ptr) noexcept -> storage_type* {
164
        // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
165
209
        ++ptr;
166
209
        if (ptr == end_) {
167
19
            ptr = begin_;
168
19
        }
169
209
        return ptr;
170
209
    }
_ZN11num_collect4util32producer_consumer_circular_queueINSt3__110unique_ptrIiNS2_14default_deleteIiEEEEE9incrementEPNS0_14object_storageIS6_Lm64EEE
Line
Count
Source
163
209
    [[nodiscard]] auto increment(storage_type* ptr) noexcept -> storage_type* {
164
        // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
165
209
        ++ptr;
166
209
        if (ptr == end_) {
167
19
            ptr = begin_;
168
19
        }
169
209
        return ptr;
170
209
    }
171
172
    //! Beginning of the buffer.
173
    storage_type* begin_;
174
175
    //! Past-the-end pointer of the buffer.
176
    storage_type* end_;
177
178
    /*!
179
     * \brief Position of the producer.
180
     *
181
     * Producer will write the next object here.
182
     */
183
    std::atomic<storage_type*> producer_pos_;
184
185
    /*!
186
     * \brief Position of the consumer.
187
     *
188
     * Consumer will read the next object here.
189
     */
190
    std::atomic<storage_type*> consumer_pos_;
191
};
192
193
}  // namespace num_collect::util