00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_priority_queue_H
00022 #define __TBB_concurrent_priority_queue_H
00023
00024 #include "atomic.h"
00025 #include "cache_aligned_allocator.h"
00026 #include "tbb_exception.h"
00027 #include "tbb_stddef.h"
00028 #include "tbb_profiling.h"
00029 #include "internal/_aggregator_impl.h"
00030 #include <vector>
00031 #include <iterator>
00032 #include <functional>
00033
00034 namespace tbb {
00035 namespace interface5 {
00036
00037 using namespace tbb::internal;
00038
00040 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
00041 class concurrent_priority_queue {
00042 public:
00044 typedef T value_type;
00045
00047 typedef T& reference;
00048
00050 typedef const T& const_reference;
00051
00053 typedef size_t size_type;
00054
00056 typedef ptrdiff_t difference_type;
00057
00059 typedef A allocator_type;
00060
00062 explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
00063 {
00064 my_aggregator.initialize_handler(my_functor_t(this));
00065 }
00066
00068 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
00069 mark(0), my_size(0), data(a)
00070 {
00071 data.reserve(init_capacity);
00072 my_aggregator.initialize_handler(my_functor_t(this));
00073 }
00074
00076 template<typename InputIterator>
00077 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00078 data(begin, end, a)
00079 {
00080 mark = 0;
00081 my_aggregator.initialize_handler(my_functor_t(this));
00082 heapify();
00083 my_size = data.size();
00084 }
00085
00087
00088 explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
00089 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
00090 {
00091 my_aggregator.initialize_handler(my_functor_t(this));
00092 heapify();
00093 }
00094
00096
00097 concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
00098 my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
00099 {
00100 my_aggregator.initialize_handler(my_functor_t(this));
00101 heapify();
00102 }
00103
00105
00106 concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
00107 if (this != &src) {
00108 std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
00109 mark = src.mark;
00110 my_size = src.my_size;
00111 }
00112 return *this;
00113 }
00114
00116
00118 bool empty() const { return size()==0; }
00119
00121
00123 size_type size() const { return __TBB_load_with_acquire(my_size); }
00124
00126
00127 void push(const_reference elem) {
00128 cpq_operation op_data(elem, PUSH_OP);
00129 my_aggregator.execute(&op_data);
00130 if (op_data.status == FAILED)
00131 throw_exception(eid_bad_alloc);
00132 }
00133
00135
00138 bool try_pop(reference elem) {
00139 cpq_operation op_data(POP_OP);
00140 op_data.elem = &elem;
00141 my_aggregator.execute(&op_data);
00142 return op_data.status==SUCCEEDED;
00143 }
00144
00146
00149 void clear() {
00150 data.clear();
00151 mark = 0;
00152 my_size = 0;
00153 }
00154
00156
00157 void swap(concurrent_priority_queue& q) {
00158 data.swap(q.data);
00159 std::swap(mark, q.mark);
00160 std::swap(my_size, q.my_size);
00161 }
00162
00164 allocator_type get_allocator() const { return data.get_allocator(); }
00165
00166 private:
00167 enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
00168 enum operation_status { WAIT=0, SUCCEEDED, FAILED };
00169
00170 class cpq_operation : public aggregated_operation<cpq_operation> {
00171 public:
00172 operation_type type;
00173 union {
00174 value_type *elem;
00175 size_type sz;
00176 };
00177 cpq_operation(const_reference e, operation_type t) :
00178 type(t), elem(const_cast<value_type*>(&e)) {}
00179 cpq_operation(operation_type t) : type(t) {}
00180 };
00181
00182 class my_functor_t {
00183 concurrent_priority_queue<T, Compare, A> *cpq;
00184 public:
00185 my_functor_t() {}
00186 my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
00187 void operator()(cpq_operation* op_list) {
00188 cpq->handle_operations(op_list);
00189 }
00190 };
00191
00192 aggregator< my_functor_t, cpq_operation> my_aggregator;
00194 char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
00196 size_type mark;
00197 __TBB_atomic size_type my_size;
00198 Compare compare;
00200 char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
00202
00219 std::vector<value_type, allocator_type> data;
00220
00221 void handle_operations(cpq_operation *op_list) {
00222 cpq_operation *tmp, *pop_list=NULL;
00223
00224 __TBB_ASSERT(mark == data.size(), NULL);
00225
00226
00227
00228 while (op_list) {
00229
00230
00231
00232
00233
00234
00235
00236 call_itt_notify(acquired, &(op_list->status));
00237 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
00238 tmp = op_list;
00239 op_list = itt_hide_load_word(op_list->next);
00240 if (tmp->type == PUSH_OP) {
00241 __TBB_TRY {
00242 data.push_back(*(tmp->elem));
00243 __TBB_store_with_release(my_size, my_size+1);
00244 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00245 } __TBB_CATCH(...) {
00246 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00247 }
00248 }
00249 else {
00250 __TBB_ASSERT(tmp->type == POP_OP, NULL);
00251 if (mark < data.size() &&
00252 compare(data[0], data[data.size()-1])) {
00253
00254
00255 *(tmp->elem) = data[data.size()-1];
00256 __TBB_store_with_release(my_size, my_size-1);
00257 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00258 data.pop_back();
00259 __TBB_ASSERT(mark<=data.size(), NULL);
00260 }
00261 else {
00262 itt_hide_store_word(tmp->next, pop_list);
00263 pop_list = tmp;
00264 }
00265 }
00266 }
00267
00268
00269 while (pop_list) {
00270 tmp = pop_list;
00271 pop_list = itt_hide_load_word(pop_list->next);
00272 __TBB_ASSERT(tmp->type == POP_OP, NULL);
00273 if (data.empty()) {
00274 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00275 }
00276 else {
00277 __TBB_ASSERT(mark<=data.size(), NULL);
00278 if (mark < data.size() &&
00279 compare(data[0], data[data.size()-1])) {
00280
00281
00282 *(tmp->elem) = data[data.size()-1];
00283 __TBB_store_with_release(my_size, my_size-1);
00284 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00285 data.pop_back();
00286 }
00287 else {
00288 *(tmp->elem) = data[0];
00289 __TBB_store_with_release(my_size, my_size-1);
00290 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00291 reheap();
00292 }
00293 }
00294 }
00295
00296
00297
00298 if (mark<data.size()) heapify();
00299 __TBB_ASSERT(mark == data.size(), NULL);
00300 }
00301
00303 void heapify() {
00304 if (!mark && data.size()>0) mark = 1;
00305 for (; mark<data.size(); ++mark) {
00306
00307 size_type cur_pos = mark;
00308 value_type to_place = data[mark];
00309 do {
00310 size_type parent = (cur_pos-1)>>1;
00311 if (!compare(data[parent], to_place)) break;
00312 data[cur_pos] = data[parent];
00313 cur_pos = parent;
00314 } while( cur_pos );
00315 data[cur_pos] = to_place;
00316 }
00317 }
00318
00320
00321 void reheap() {
00322 size_type cur_pos=0, child=1;
00323
00324 while (child < mark) {
00325 size_type target = child;
00326 if (child+1 < mark && compare(data[child], data[child+1]))
00327 ++target;
00328
00329 if (compare(data[target], data[data.size()-1])) break;
00330 data[cur_pos] = data[target];
00331 cur_pos = target;
00332 child = (cur_pos<<1)+1;
00333 }
00334 data[cur_pos] = data[data.size()-1];
00335 data.pop_back();
00336 if (mark > data.size()) mark = data.size();
00337 }
00338 };
00339
00340 }
00341
00342 using interface5::concurrent_priority_queue;
00343
00344 }
00345
00346 #endif