concurrent_priority_queue.h

00001 /*
00002     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_priority_queue_H
00022 #define __TBB_concurrent_priority_queue_H
00023 
00024 #include "atomic.h"
00025 #include "cache_aligned_allocator.h"
00026 #include "tbb_exception.h"
00027 #include "tbb_stddef.h"
00028 #include "tbb_profiling.h"
00029 #include "internal/_aggregator_impl.h"
00030 #include <vector>
00031 #include <iterator>
00032 #include <functional>
00033 
00034 namespace tbb {
00035 namespace interface5 {
00036 
00037 using namespace tbb::internal;
00038 
00040 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
00041 class concurrent_priority_queue {
00042  public:
00044     typedef T value_type;
00045 
00047     typedef T& reference;
00048 
00050     typedef const T& const_reference;
00051 
00053     typedef size_t size_type;
00054 
00056     typedef ptrdiff_t difference_type;
00057 
00059     typedef A allocator_type;
00060 
00062     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
00063     {
00064         my_aggregator.initialize_handler(my_functor_t(this));
00065     }
00066 
00068     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
00069         mark(0), my_size(0), data(a)
00070     {
00071         data.reserve(init_capacity);
00072         my_aggregator.initialize_handler(my_functor_t(this));
00073     }
00074 
00076     template<typename InputIterator>
00077     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00078         data(begin, end, a)
00079     {
00080         mark = 0;
00081         my_aggregator.initialize_handler(my_functor_t(this));
00082         heapify();
00083         my_size = data.size();
00084     }
00085 
00087 
00088     explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
00089         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
00090     {
00091         my_aggregator.initialize_handler(my_functor_t(this));
00092         heapify();
00093     }
00094 
00096 
00097     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
00098         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
00099     {
00100         my_aggregator.initialize_handler(my_functor_t(this));
00101         heapify();
00102     }
00103 
00105 
00106     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
00107         if (this != &src) {
00108             std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
00109             mark = src.mark;
00110             my_size = src.my_size;
00111         }
00112         return *this;
00113     }
00114 
00116 
00118     bool empty() const { return size()==0; }
00119 
00121 
00123     size_type size() const { return __TBB_load_with_acquire(my_size); }
00124 
00126 
00127     void push(const_reference elem) {
00128         cpq_operation op_data(elem, PUSH_OP);
00129         my_aggregator.execute(&op_data);
00130         if (op_data.status == FAILED) // exception thrown
00131             throw_exception(eid_bad_alloc);
00132     }
00133 
00135 
00138     bool try_pop(reference elem) {
00139         cpq_operation op_data(POP_OP);
00140         op_data.elem = &elem;
00141         my_aggregator.execute(&op_data);
00142         return op_data.status==SUCCEEDED;
00143     }
00144 
00146 
00149     void clear() {
00150         data.clear();
00151         mark = 0;
00152         my_size = 0;
00153     }
00154 
00156 
00157     void swap(concurrent_priority_queue& q) {
00158         data.swap(q.data);
00159         std::swap(mark, q.mark);
00160         std::swap(my_size, q.my_size);
00161     }
00162 
00164     allocator_type get_allocator() const { return data.get_allocator(); }
00165 
00166  private:
00167     enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
00168     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
00169 
00170     class cpq_operation : public aggregated_operation<cpq_operation> {
00171      public:
00172         operation_type type;
00173         union {
00174             value_type *elem;
00175             size_type sz;
00176         };
00177         cpq_operation(const_reference e, operation_type t) :
00178             type(t), elem(const_cast<value_type*>(&e)) {}
00179         cpq_operation(operation_type t) : type(t) {}
00180     };
00181 
00182     class my_functor_t {
00183         concurrent_priority_queue<T, Compare, A> *cpq;
00184      public:
00185         my_functor_t() {}
00186         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
00187         void operator()(cpq_operation* op_list) {
00188             cpq->handle_operations(op_list);
00189         }
00190     };
00191 
00192     aggregator< my_functor_t, cpq_operation> my_aggregator;
00194     char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
00196     size_type mark;
00197     __TBB_atomic size_type my_size;
00198     Compare compare;
00200     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
00202 
00219     std::vector<value_type, allocator_type> data;
00220 
00221     void handle_operations(cpq_operation *op_list) {
00222         cpq_operation *tmp, *pop_list=NULL;
00223 
00224         __TBB_ASSERT(mark == data.size(), NULL);
00225 
00226         // first pass processes all constant time operations: pushes,
00227         // tops, some pops. Also reserve.
00228         while (op_list) {
00229             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
00230             // node. This thread is going to handle the operation, and so will acquire it
00231             // and perform the associated operation w/o triggering a race condition; the
00232             // thread that created the operation is waiting on the status field, so when
00233             // this thread is done with the operation, it will perform a
00234             // store_with_release to give control back to the waiting thread in
00235             // aggregator::insert_operation.
00236             call_itt_notify(acquired, &(op_list->status));
00237             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
00238             tmp = op_list;
00239             op_list = itt_hide_load_word(op_list->next);
00240             if (tmp->type == PUSH_OP) {
00241                 __TBB_TRY {
00242                     data.push_back(*(tmp->elem));
00243                     __TBB_store_with_release(my_size, my_size+1);
00244                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00245                 } __TBB_CATCH(...) {
00246                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00247                 }
00248             }
00249             else { // tmp->type == POP_OP
00250                 __TBB_ASSERT(tmp->type == POP_OP, NULL);
00251                 if (mark < data.size() &&
00252                     compare(data[0], data[data.size()-1])) {
00253                     // there are newly pushed elems and the last one
00254                     // is higher than top
00255                     *(tmp->elem) = data[data.size()-1]; // copy the data
00256                     __TBB_store_with_release(my_size, my_size-1);
00257                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00258                     data.pop_back();
00259                     __TBB_ASSERT(mark<=data.size(), NULL);
00260                 }
00261                 else { // no convenient item to pop; postpone
00262                     itt_hide_store_word(tmp->next, pop_list);
00263                     pop_list = tmp;
00264                 }
00265             }
00266         }
00267 
00268         // second pass processes pop operations
00269         while (pop_list) {
00270             tmp = pop_list;
00271             pop_list = itt_hide_load_word(pop_list->next);
00272             __TBB_ASSERT(tmp->type == POP_OP, NULL);
00273             if (data.empty()) {
00274                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00275             }
00276             else {
00277                 __TBB_ASSERT(mark<=data.size(), NULL);
00278                 if (mark < data.size() &&
00279                     compare(data[0], data[data.size()-1])) {
00280                     // there are newly pushed elems and the last one is
00281                     // higher than top
00282                     *(tmp->elem) = data[data.size()-1]; // copy the data
00283                     __TBB_store_with_release(my_size, my_size-1);
00284                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00285                     data.pop_back();
00286                 }
00287                 else { // extract top and push last element down heap
00288                     *(tmp->elem) = data[0]; // copy the data
00289                     __TBB_store_with_release(my_size, my_size-1);
00290                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00291                     reheap();
00292                 }
00293             }
00294         }
00295 
00296         // heapify any leftover pushed elements before doing the next
00297         // batch of operations
00298         if (mark<data.size()) heapify();
00299         __TBB_ASSERT(mark == data.size(), NULL);
00300     }
00301 
00303     void heapify() {
00304         if (!mark && data.size()>0) mark = 1;
00305         for (; mark<data.size(); ++mark) {
00306             // for each unheapified element under size
00307             size_type cur_pos = mark;
00308             value_type to_place = data[mark];
00309             do { // push to_place up the heap
00310                 size_type parent = (cur_pos-1)>>1;
00311                 if (!compare(data[parent], to_place)) break;
00312                 data[cur_pos] = data[parent];
00313                 cur_pos = parent;
00314             } while( cur_pos );
00315             data[cur_pos] = to_place;
00316         }
00317     }
00318 
00320 
00321     void reheap() {
00322         size_type cur_pos=0, child=1;
00323 
00324         while (child < mark) {
00325             size_type target = child;
00326             if (child+1 < mark && compare(data[child], data[child+1]))
00327                 ++target;
00328             // target now has the higher priority child
00329             if (compare(data[target], data[data.size()-1])) break;
00330             data[cur_pos] = data[target];
00331             cur_pos = target;
00332             child = (cur_pos<<1)+1;
00333         }
00334         data[cur_pos] = data[data.size()-1];
00335         data.pop_back();
00336         if (mark > data.size()) mark = data.size();
00337     }
00338 };
00339 
00340 } // namespace interface5
00341 
00342 using interface5::concurrent_priority_queue;
00343 
00344 } // namespace tbb
00345 
00346 #endif /* __TBB_concurrent_priority_queue_H */

Copyright © 2005-2011 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.