00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033
00034
00035 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00036 #define TBB_PREVIEW_TUPLE 1
00037 #include "compat/tuple"
00038 #else
00039 #include <tuple>
00040 #endif
00041
00042 #include<list>
00043 #include<queue>
00044
00055 namespace tbb {
00056 namespace flow {
00057
//! Named concurrency levels (e.g. split_node passes `unlimited` as its node concurrency argument).
enum concurrency {
    unlimited = 0,
    serial    = 1
};
00060
00061 namespace interface6 {
00062
//! Common polymorphic base class of the node types declared in this header.
class graph_node {
public:
    //! Virtual, so a node can be destroyed through a graph_node pointer.
    virtual ~graph_node() {
    }
};
00068
//! Empty message type; it carries no data.
class continue_msg {
};
00071
00072 template< typename T > class sender;
00073 template< typename T > class receiver;
00074 class continue_receiver;
00075
00077 template< typename T >
00078 class sender {
00079 public:
00081 typedef T output_type;
00082
00084 typedef receiver<T> successor_type;
00085
00086 virtual ~sender() {}
00087
00089 virtual bool register_successor( successor_type &r ) = 0;
00090
00092 virtual bool remove_successor( successor_type &r ) = 0;
00093
00095 virtual bool try_get( T & ) { return false; }
00096
00098 virtual bool try_reserve( T & ) { return false; }
00099
00101 virtual bool try_release( ) { return false; }
00102
00104 virtual bool try_consume( ) { return false; }
00105
00106 };
00107
00108
00110 template< typename T >
00111 class receiver {
00112 public:
00113
00115 typedef T input_type;
00116
00118 typedef sender<T> predecessor_type;
00119
00121 virtual ~receiver() {}
00122
00124 virtual bool try_put( const T& t ) = 0;
00125
00127 virtual bool register_predecessor( predecessor_type & ) { return false; }
00128
00130 virtual bool remove_predecessor( predecessor_type & ) { return false; }
00131
00132 };
00133
00135
00136 class continue_receiver : public receiver< continue_msg > {
00137 public:
00138
00140 typedef continue_msg input_type;
00141
00143 typedef sender< continue_msg > predecessor_type;
00144
00146 continue_receiver( int number_of_predecessors = 0 ) {
00147 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00148 my_current_count = 0;
00149 }
00150
00152 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00153 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00154 my_current_count = 0;
00155 }
00156
00158 virtual ~continue_receiver() { }
00159
00161 bool register_predecessor( predecessor_type & ) {
00162 spin_mutex::scoped_lock l(my_mutex);
00163 ++my_predecessor_count;
00164 return true;
00165 }
00166
00168
00171 bool remove_predecessor( predecessor_type & ) {
00172 spin_mutex::scoped_lock l(my_mutex);
00173 --my_predecessor_count;
00174 return true;
00175 }
00176
00178
00180 bool try_put( const input_type & ) {
00181 {
00182 spin_mutex::scoped_lock l(my_mutex);
00183 if ( ++my_current_count < my_predecessor_count )
00184 return true;
00185 else
00186 my_current_count = 0;
00187 }
00188 execute();
00189 return true;
00190 }
00191
00192 protected:
00193
00194 spin_mutex my_mutex;
00195 int my_predecessor_count;
00196 int my_current_count;
00197 int my_initial_predecessor_count;
00198
00200
00202 virtual void execute() = 0;
00203
00204 };
00205
00206 #include "internal/_flow_graph_impl.h"
00207 using namespace internal::graph_policy_namespace;
00208
00210
00211 class graph : tbb::internal::no_copy {
00212
00213 template< typename Body >
00214 class run_task : public task {
00215 public:
00216 run_task( Body& body ) : my_body(body) {}
00217 task *execute() {
00218 my_body();
00219 return NULL;
00220 }
00221 private:
00222 Body my_body;
00223 };
00224
00225 template< typename Receiver, typename Body >
00226 class run_and_put_task : public task {
00227 public:
00228 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00229 task *execute() {
00230 my_receiver.try_put( my_body() );
00231 return NULL;
00232 }
00233 private:
00234 Receiver &my_receiver;
00235 Body my_body;
00236 };
00237
00238 public:
00239
00240
00242 graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
00243 my_root_task->set_ref_count(1);
00244 }
00245
00247
00249 ~graph() {
00250 wait_for_all();
00251 my_root_task->set_ref_count(0);
00252 task::destroy( *my_root_task );
00253 }
00254
00255
00257
00259 void increment_wait_count() {
00260 if (my_root_task)
00261 my_root_task->increment_ref_count();
00262 }
00263
00265
00267 void decrement_wait_count() {
00268 if (my_root_task)
00269 my_root_task->decrement_ref_count();
00270 }
00271
00273
00275 template< typename Receiver, typename Body >
00276 void run( Receiver &r, Body body ) {
00277 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00278 run_and_put_task< Receiver, Body >( r, body ) );
00279 }
00280
00282
00284 template< typename Body >
00285 void run( Body body ) {
00286 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00287 run_task< Body >( body ) );
00288 }
00289
00291
00292 void wait_for_all() {
00293 if (my_root_task)
00294 my_root_task->wait_for_all();
00295 my_root_task->set_ref_count(1);
00296 }
00297
00299 task * root_task() {
00300 return my_root_task;
00301 }
00302
00303 private:
00304
00305 task *my_root_task;
00306
00307 };
00308
00309 #include "internal/_flow_graph_node_impl.h"
00310
00312 template < typename Output >
00313 class source_node : public graph_node, public sender< Output > {
00314 public:
00315
00317 typedef Output output_type;
00318
00320 typedef receiver< Output > successor_type;
00321
00323 template< typename Body >
00324 source_node( graph &g, Body body, bool is_active = true )
00325 : my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00326 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00327 my_reserved(false), my_has_cached_item(false)
00328 {
00329 my_successors.set_owner(this);
00330 }
00331
00333 source_node( const source_node& src ) :
00334 graph_node(), sender<Output>(),
00335 my_root_task( src.my_root_task), my_active(src.init_my_active),
00336 init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00337 my_reserved(false), my_has_cached_item(false)
00338 {
00339 my_successors.set_owner(this);
00340 }
00341
00343 ~source_node() { delete my_body; }
00344
00346 bool register_successor( receiver<output_type> &r ) {
00347 spin_mutex::scoped_lock lock(my_mutex);
00348 my_successors.register_successor(r);
00349 if ( my_active )
00350 spawn_put();
00351 return true;
00352 }
00353
00355 bool remove_successor( receiver<output_type> &r ) {
00356 spin_mutex::scoped_lock lock(my_mutex);
00357 my_successors.remove_successor(r);
00358 return true;
00359 }
00360
00362 bool try_get( output_type &v ) {
00363 spin_mutex::scoped_lock lock(my_mutex);
00364 if ( my_reserved )
00365 return false;
00366
00367 if ( my_has_cached_item ) {
00368 v = my_cached_item;
00369 my_has_cached_item = false;
00370 } else if ( (*my_body)(v) == false ) {
00371 return false;
00372 }
00373 return true;
00374 }
00375
00377 bool try_reserve( output_type &v ) {
00378 spin_mutex::scoped_lock lock(my_mutex);
00379 if ( my_reserved ) {
00380 return false;
00381 }
00382
00383 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00384 my_has_cached_item = true;
00385
00386 if ( my_has_cached_item ) {
00387 v = my_cached_item;
00388 my_reserved = true;
00389 return true;
00390 } else {
00391 return false;
00392 }
00393 }
00394
00396
00397 bool try_release( ) {
00398 spin_mutex::scoped_lock lock(my_mutex);
00399 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00400 my_reserved = false;
00401 spawn_put();
00402 return true;
00403 }
00404
00406 bool try_consume( ) {
00407 spin_mutex::scoped_lock lock(my_mutex);
00408 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00409 my_reserved = false;
00410 my_has_cached_item = false;
00411 if ( !my_successors.empty() ) {
00412 spawn_put();
00413 }
00414 return true;
00415 }
00416
00418 void activate() {
00419 spin_mutex::scoped_lock lock(my_mutex);
00420 my_active = true;
00421 if ( !my_successors.empty() )
00422 spawn_put();
00423 }
00424
00425 private:
00426
00427 task *my_root_task;
00428 spin_mutex my_mutex;
00429 bool my_active;
00430 bool init_my_active;
00431 internal::source_body<output_type> *my_body;
00432 internal::broadcast_cache< output_type > my_successors;
00433 bool my_reserved;
00434 bool my_has_cached_item;
00435 output_type my_cached_item;
00436
00437 friend class internal::source_task< source_node< output_type > >;
00438
00440 void apply_body( ) {
00441 output_type v;
00442 if ( try_reserve(v) == false )
00443 return;
00444
00445 if ( my_successors.try_put( v ) )
00446 try_consume();
00447 else
00448 try_release();
00449 }
00450
00452 void spawn_put( ) {
00453 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00454 internal::source_task< source_node< output_type > >( *this ) );
00455 }
00456
00457 };
00458
00460 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00461 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00462 public:
00463
00464 typedef Input input_type;
00465 typedef Output output_type;
00466 typedef sender< input_type > predecessor_type;
00467 typedef receiver< output_type > successor_type;
00468 typedef internal::function_output<output_type> fOutput_type;
00469
00471 template< typename Body >
00472 function_node( graph &g, size_t concurrency, Body body )
00473 : internal::function_input<input_type,output_type,Allocator>( g, concurrency, body ) {
00474 }
00475
00477 function_node( const function_node& src ) :
00478 graph_node(), internal::function_input<input_type,output_type,Allocator>( src ),
00479 fOutput_type() {}
00480
00481 protected:
00482
00483 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00484
00485 };
00486
00488 template < typename Input, typename Output, typename Allocator >
00489 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00490 public:
00491
00492 typedef Input input_type;
00493 typedef Output output_type;
00494 typedef sender< input_type > predecessor_type;
00495 typedef receiver< output_type > successor_type;
00496 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00497 typedef internal::function_input_queue<input_type, Allocator> queue_type;
00498 typedef internal::function_output<output_type> fOutput_type;
00499
00501 template< typename Body >
00502 function_node( graph &g, size_t concurrency, Body body ) : fInput_type( g, concurrency, body, new queue_type() ) {
00503 }
00504
00506 function_node( const function_node& src ) :
00507 graph_node(), fInput_type( src, new queue_type() ) , fOutput_type() { }
00508
00509 protected:
00510
00511 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00512
00513 };
00514
00515 #include "tbb/internal/_flow_graph_types_impl.h"
00516
00517 #if TBB_PREVIEW_GRAPH_NODES
00519 // Output is a tuple of output types.
00520 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00521 class multioutput_function_node :
00522 public graph_node,
00523 public internal::multioutput_function_input
00524 <
00525 Input,
00526 typename internal::wrap_tuple_elements<
00527 std::tuple_size<Output>::value,
00528 internal::function_output,
00529 Output
00530 >::type,
00531 Allocator
00532 > {
00533 private:
00534 static const int N = std::tuple_size<Output>::value;
00535 public:
00536 typedef Input input_type;
00537 typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type ports_type;
00538 private:
00539 typedef typename internal::multioutput_function_input<input_type, ports_type, Allocator> base_type;
00540 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00541 public:
00542 template<typename Body>
00543 multioutput_function_node( graph &g, size_t concurrency, Body body ) : base_type(g,concurrency, body) {}
00544 multioutput_function_node( const multioutput_function_node &other) :
00545 graph_node(), base_type(other) {}
00546
00547
00548 };
00549
00550 template < typename Input, typename Output, typename Allocator >
00551 class multioutput_function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multioutput_function_input<Input,
00552 typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00553 static const int N = std::tuple_size<Output>::value;
00554 public:
00555 typedef Input input_type;
00556 typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type ports_type;
00557 private:
00558 typedef typename internal::multioutput_function_input<input_type, ports_type, Allocator> base_type;
00559 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00560 public:
00561
00562 template<typename Body>
00563 multioutput_function_node( graph &g, size_t concurrency, Body body) : base_type(g,concurrency, body, new queue_type()) {}
00564 multioutput_function_node( const multioutput_function_node &other) :
00565 graph_node(), base_type(other, new queue_type()) {}
00566
00567 };
00568
00570
00571
00572 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00573 class split_node : public multioutput_function_node<TupleType, TupleType, rejecting, Allocator> {
00574 static const int N = std::tuple_size<TupleType>::value;
00575 typedef multioutput_function_node<TupleType,TupleType,rejecting,Allocator> base_type;
00576 public:
00577 typedef typename base_type::ports_type ports_type;
00578 private:
00579
00580 struct splitting_body {
00581 void operator()(const TupleType& t, ports_type &p) {
00582 internal::emit_element<N>::emit_this(t, p);
00583 }
00584 };
00585 public:
00586 typedef TupleType input_type;
00587 typedef Allocator allocator_type;
00588 split_node(graph &g) : base_type(g, unlimited, splitting_body()) { }
00589 split_node( const split_node & other) : base_type(other) { }
00590 };
00591 #endif // TBB_PREVIEW_GRAPH_NODES
00592
00594 template <typename Output>
00595 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00596 public:
00597
00598 typedef continue_msg input_type;
00599 typedef Output output_type;
00600 typedef sender< input_type > predecessor_type;
00601 typedef receiver< output_type > successor_type;
00602 typedef internal::function_output<output_type> fOutput_type;
00603
00605 template <typename Body >
00606 continue_node( graph &g, Body body )
00607 : internal::continue_input<output_type>( g, body ) {
00608 }
00609
00611 template <typename Body >
00612 continue_node( graph &g, int number_of_predecessors, Body body )
00613 : internal::continue_input<output_type>( g, number_of_predecessors, body )
00614 {
00615 }
00616
00618 continue_node( const continue_node& src ) :
00619 graph_node(), internal::continue_input<output_type>(src),
00620 internal::function_output<Output>() { }
00621
00622 protected:
00623
00624 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00625
00626 };
00627
00628 template< typename T >
00629 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00630 public:
00631
00632 typedef T input_type;
00633 typedef T output_type;
00634 typedef sender< input_type > predecessor_type;
00635 typedef receiver< output_type > successor_type;
00636
00637 overwrite_node() : my_buffer_is_valid(false) {
00638 my_successors.set_owner( this );
00639 }
00640
00641
00642 overwrite_node( const overwrite_node& ) :
00643 graph_node(), receiver<T>(), sender<T>(), my_buffer_is_valid(false) {
00644 my_successors.set_owner( this );
00645 }
00646
00647 ~overwrite_node() {}
00648
00649 bool register_successor( successor_type &s ) {
00650 spin_mutex::scoped_lock l( my_mutex );
00651 if ( my_buffer_is_valid ) {
00652
00653 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
00654
00655 my_successors.register_successor( s );
00656 return true;
00657 } else {
00658
00659 return false;
00660 }
00661 } else {
00662
00663 my_successors.register_successor( s );
00664 return true;
00665 }
00666 }
00667
00668 bool remove_successor( successor_type &s ) {
00669 spin_mutex::scoped_lock l( my_mutex );
00670 my_successors.remove_successor(s);
00671 return true;
00672 }
00673
00674 bool try_put( const T &v ) {
00675 spin_mutex::scoped_lock l( my_mutex );
00676 my_buffer = v;
00677 my_buffer_is_valid = true;
00678 my_successors.try_put(v);
00679 return true;
00680 }
00681
00682 bool try_get( T &v ) {
00683 spin_mutex::scoped_lock l( my_mutex );
00684 if ( my_buffer_is_valid ) {
00685 v = my_buffer;
00686 return true;
00687 } else {
00688 return false;
00689 }
00690 }
00691
00692 bool is_valid() {
00693 spin_mutex::scoped_lock l( my_mutex );
00694 return my_buffer_is_valid;
00695 }
00696
00697 void clear() {
00698 spin_mutex::scoped_lock l( my_mutex );
00699 my_buffer_is_valid = false;
00700 }
00701
00702 protected:
00703
00704 spin_mutex my_mutex;
00705 internal::broadcast_cache< T, null_rw_mutex > my_successors;
00706 T my_buffer;
00707 bool my_buffer_is_valid;
00708
00709 };
00710
00711 template< typename T >
00712 class write_once_node : public overwrite_node<T> {
00713 public:
00714
00715 typedef T input_type;
00716 typedef T output_type;
00717 typedef sender< input_type > predecessor_type;
00718 typedef receiver< output_type > successor_type;
00719
00721 write_once_node() : overwrite_node<T>() {}
00722
00724 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00725
00726 bool try_put( const T &v ) {
00727 spin_mutex::scoped_lock l( this->my_mutex );
00728 if ( this->my_buffer_is_valid ) {
00729 return false;
00730 } else {
00731 this->my_buffer = v;
00732 this->my_buffer_is_valid = true;
00733 this->my_successors.try_put(v);
00734 return true;
00735 }
00736 }
00737 };
00738
00740 template <typename T>
00741 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
00742
00743 internal::broadcast_cache<T> my_successors;
00744
00745 public:
00746
00747 typedef T input_type;
00748 typedef T output_type;
00749 typedef sender< input_type > predecessor_type;
00750 typedef receiver< output_type > successor_type;
00751
00752 broadcast_node( ) {
00753 my_successors.set_owner( this );
00754 }
00755
00756
00757 broadcast_node( const broadcast_node& ) : graph_node(), receiver<T>(), sender<T>() {
00758 my_successors.set_owner( this );
00759 }
00760
00762 virtual bool register_successor( receiver<T> &r ) {
00763 my_successors.register_successor( r );
00764 return true;
00765 }
00766
00768 virtual bool remove_successor( receiver<T> &r ) {
00769 my_successors.remove_successor( r );
00770 return true;
00771 }
00772
00773 bool try_put( const T &t ) {
00774 my_successors.try_put(t);
00775 return true;
00776 }
00777
00778 };
00779
00780 #include "internal/_flow_graph_item_buffer_impl.h"
00781
00783 template <typename T, typename A=cache_aligned_allocator<T> >
00784 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
00785 public:
00786 typedef T input_type;
00787 typedef T output_type;
00788 typedef sender< input_type > predecessor_type;
00789 typedef receiver< output_type > successor_type;
00790 typedef buffer_node<T, A> my_class;
00791 protected:
00792 typedef size_t size_type;
00793 internal::round_robin_cache< T, null_rw_mutex > my_successors;
00794
00795 task *my_parent;
00796
00797 friend class internal::forward_task< buffer_node< T, A > >;
00798
00799 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00800 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00801
00802
00803 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00804 public:
00805 char type;
00806 T *elem;
00807 successor_type *r;
00808 buffer_operation(const T& e, op_type t) :
00809 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
00810 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
00811 };
00812
00813 bool forwarder_busy;
00814 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
00815 friend class internal::aggregating_functor<my_class, buffer_operation>;
00816 internal::aggregator< my_handler, buffer_operation> my_aggregator;
00817
00818 virtual void handle_operations(buffer_operation *op_list) {
00819 buffer_operation *tmp;
00820 bool try_forwarding=false;
00821 while (op_list) {
00822 tmp = op_list;
00823 op_list = op_list->next;
00824 switch (tmp->type) {
00825 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
00826 case rem_succ: internal_rem_succ(tmp); break;
00827 case req_item: internal_pop(tmp); break;
00828 case res_item: internal_reserve(tmp); break;
00829 case rel_res: internal_release(tmp); try_forwarding = true; break;
00830 case con_res: internal_consume(tmp); try_forwarding = true; break;
00831 case put_item: internal_push(tmp); try_forwarding = true; break;
00832 case try_fwd: internal_forward(tmp); break;
00833 }
00834 }
00835 if (try_forwarding && !forwarder_busy) {
00836 forwarder_busy = true;
00837 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
00838 }
00839 }
00840
00842 virtual void forward() {
00843 buffer_operation op_data(try_fwd);
00844 do {
00845 op_data.status = WAIT;
00846 my_aggregator.execute(&op_data);
00847 } while (op_data.status == SUCCEEDED);
00848 }
00849
00851 virtual void internal_reg_succ(buffer_operation *op) {
00852 my_successors.register_successor(*(op->r));
00853 __TBB_store_with_release(op->status, SUCCEEDED);
00854 }
00855
00857 virtual void internal_rem_succ(buffer_operation *op) {
00858 my_successors.remove_successor(*(op->r));
00859 __TBB_store_with_release(op->status, SUCCEEDED);
00860 }
00861
00863 virtual void internal_forward(buffer_operation *op) {
00864 T i_copy;
00865 bool success = false;
00866 size_type counter = my_successors.size();
00867
00868 while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
00869 this->fetch_back(i_copy);
00870 if( my_successors.try_put(i_copy) ) {
00871 this->invalidate_back();
00872 --(this->my_tail);
00873 success = true;
00874 }
00875 --counter;
00876 }
00877 if (success && !counter)
00878 __TBB_store_with_release(op->status, SUCCEEDED);
00879 else {
00880 __TBB_store_with_release(op->status, FAILED);
00881 forwarder_busy = false;
00882 }
00883 }
00884
00885 virtual void internal_push(buffer_operation *op) {
00886 this->push_back(*(op->elem));
00887 __TBB_store_with_release(op->status, SUCCEEDED);
00888 }
00889
00890 virtual void internal_pop(buffer_operation *op) {
00891 if(this->pop_back(*(op->elem))) {
00892 __TBB_store_with_release(op->status, SUCCEEDED);
00893 }
00894 else {
00895 __TBB_store_with_release(op->status, FAILED);
00896 }
00897 }
00898
00899 virtual void internal_reserve(buffer_operation *op) {
00900 if(this->reserve_front(*(op->elem))) {
00901 __TBB_store_with_release(op->status, SUCCEEDED);
00902 }
00903 else {
00904 __TBB_store_with_release(op->status, FAILED);
00905 }
00906 }
00907
00908 virtual void internal_consume(buffer_operation *op) {
00909 this->consume_front();
00910 __TBB_store_with_release(op->status, SUCCEEDED);
00911 }
00912
00913 virtual void internal_release(buffer_operation *op) {
00914 this->release_front();
00915 __TBB_store_with_release(op->status, SUCCEEDED);
00916 }
00917
00918 public:
00920 buffer_node( graph &g ) : reservable_item_buffer<T>(),
00921 my_parent( g.root_task() ), forwarder_busy(false) {
00922 my_successors.set_owner(this);
00923 my_aggregator.initialize_handler(my_handler(this));
00924 }
00925
00927 buffer_node( const buffer_node& src ) :
00928 graph_node(), reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
00929 my_parent( src.my_parent ) {
00930 forwarder_busy = false;
00931 my_successors.set_owner(this);
00932 my_aggregator.initialize_handler(my_handler(this));
00933 }
00934
00935 virtual ~buffer_node() {}
00936
00937
00938
00939
00940
00942
00943 bool register_successor( receiver<output_type> &r ) {
00944 buffer_operation op_data(reg_succ);
00945 op_data.r = &r;
00946 my_aggregator.execute(&op_data);
00947 return true;
00948 }
00949
00951
00953 bool remove_successor( receiver<output_type> &r ) {
00954 r.remove_predecessor(*this);
00955 buffer_operation op_data(rem_succ);
00956 op_data.r = &r;
00957 my_aggregator.execute(&op_data);
00958 return true;
00959 }
00960
00962
00964 bool try_get( T &v ) {
00965 buffer_operation op_data(req_item);
00966 op_data.elem = &v;
00967 my_aggregator.execute(&op_data);
00968 return (op_data.status==SUCCEEDED);
00969 }
00970
00972
00974 bool try_reserve( T &v ) {
00975 buffer_operation op_data(res_item);
00976 op_data.elem = &v;
00977 my_aggregator.execute(&op_data);
00978 return (op_data.status==SUCCEEDED);
00979 }
00980
00982
00983 bool try_release() {
00984 buffer_operation op_data(rel_res);
00985 my_aggregator.execute(&op_data);
00986 return true;
00987 }
00988
00990
00991 bool try_consume() {
00992 buffer_operation op_data(con_res);
00993 my_aggregator.execute(&op_data);
00994 return true;
00995 }
00996
00998
00999 bool try_put(const T &t) {
01000 buffer_operation op_data(t, put_item);
01001 my_aggregator.execute(&op_data);
01002 return true;
01003 }
01004 };
01005
01006
01008 template <typename T, typename A=cache_aligned_allocator<T> >
01009 class queue_node : public buffer_node<T, A> {
01010 protected:
01011 typedef typename buffer_node<T, A>::size_type size_type;
01012 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01013
01014 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01015
01017 void internal_forward(queue_operation *op) {
01018 T i_copy;
01019 bool success = false;
01020 size_type counter = this->my_successors.size();
01021 if (this->my_reserved || !this->item_valid(this->my_head)){
01022 __TBB_store_with_release(op->status, FAILED);
01023 this->forwarder_busy = false;
01024 return;
01025 }
01026
01027 while (counter>0 && this->item_valid(this->my_head)) {
01028 this->fetch_front(i_copy);
01029 if(this->my_successors.try_put(i_copy)) {
01030 this->invalidate_front();
01031 ++(this->my_head);
01032 success = true;
01033 }
01034 --counter;
01035 }
01036 if (success && !counter)
01037 __TBB_store_with_release(op->status, SUCCEEDED);
01038 else {
01039 __TBB_store_with_release(op->status, FAILED);
01040 this->forwarder_busy = false;
01041 }
01042 }
01043
01044 void internal_pop(queue_operation *op) {
01045 if ( this->my_reserved || !this->item_valid(this->my_head)){
01046 __TBB_store_with_release(op->status, FAILED);
01047 }
01048 else {
01049 this->pop_front(*(op->elem));
01050 __TBB_store_with_release(op->status, SUCCEEDED);
01051 }
01052 }
01053 void internal_reserve(queue_operation *op) {
01054 if (this->my_reserved || !this->item_valid(this->my_head)) {
01055 __TBB_store_with_release(op->status, FAILED);
01056 }
01057 else {
01058 this->my_reserved = true;
01059 this->fetch_front(*(op->elem));
01060 this->invalidate_front();
01061 __TBB_store_with_release(op->status, SUCCEEDED);
01062 }
01063 }
01064 void internal_consume(queue_operation *op) {
01065 this->consume_front();
01066 __TBB_store_with_release(op->status, SUCCEEDED);
01067 }
01068
01069 public:
01070
01071 typedef T input_type;
01072 typedef T output_type;
01073 typedef sender< input_type > predecessor_type;
01074 typedef receiver< output_type > successor_type;
01075
01077 queue_node( graph &g ) : buffer_node<T, A>(g) {}
01078
01080 queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01081 };
01082
01084 template< typename T, typename A=cache_aligned_allocator<T> >
01085 class sequencer_node : public queue_node<T, A> {
01086 internal::function_body< T, size_t > *my_sequencer;
01087 public:
01088
01089 typedef T input_type;
01090 typedef T output_type;
01091 typedef sender< input_type > predecessor_type;
01092 typedef receiver< output_type > successor_type;
01093
01095 template< typename Sequencer >
01096 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01097 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01098
01100 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01101 my_sequencer( src.my_sequencer->clone() ) {}
01102
01104 ~sequencer_node() { delete my_sequencer; }
01105 protected:
01106 typedef typename buffer_node<T, A>::size_type size_type;
01107 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01108
01109 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01110
01111 private:
01112 void internal_push(sequencer_operation *op) {
01113 size_type tag = (*my_sequencer)(*(op->elem));
01114
01115 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01116
01117 if(this->size() > this->capacity())
01118 this->grow_my_array(this->size());
01119 this->item(tag) = std::make_pair( *(op->elem), true );
01120 __TBB_store_with_release(op->status, SUCCEEDED);
01121 }
01122 };
01123
01125 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01126 class priority_queue_node : public buffer_node<T, A> {
01127 public:
01128 typedef T input_type;
01129 typedef T output_type;
01130 typedef sender< input_type > predecessor_type;
01131 typedef receiver< output_type > successor_type;
01132
01134 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01135
01137 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01138
01139 protected:
01140 typedef typename buffer_node<T, A>::size_type size_type;
01141 typedef typename buffer_node<T, A>::item_type item_type;
01142 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01143
01144 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01145
01146 void handle_operations(prio_operation *op_list) {
01147 prio_operation *tmp ;
01148 bool try_forwarding=false;
01149 while (op_list) {
01150 tmp = op_list;
01151 op_list = op_list->next;
01152 switch (tmp->type) {
01153 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01154 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01155 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01156 case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01157 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01158 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01159 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01160 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01161 }
01162 }
01163
01164 if (mark<this->my_tail) heapify();
01165 if (try_forwarding && !this->forwarder_busy) {
01166 this->forwarder_busy = true;
01167 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01168 }
01169 }
01170
01172 void internal_forward(prio_operation *op) {
01173 T i_copy;
01174 bool success = false;
01175 size_type counter = this->my_successors.size();
01176
01177 if (this->my_reserved || this->my_tail == 0) {
01178 __TBB_store_with_release(op->status, FAILED);
01179 this->forwarder_busy = false;
01180 return;
01181 }
01182
01183 while (counter>0 && this->my_tail > 0) {
01184 i_copy = this->my_array[0].first;
01185 bool msg = this->my_successors.try_put(i_copy);
01186 if ( msg == true ) {
01187 if (mark == this->my_tail) --mark;
01188 --(this->my_tail);
01189 this->my_array[0].first=this->my_array[this->my_tail].first;
01190 if (this->my_tail > 1)
01191 reheap();
01192 success = true;
01193 }
01194 --counter;
01195 }
01196 if (success && !counter)
01197 __TBB_store_with_release(op->status, SUCCEEDED);
01198 else {
01199 __TBB_store_with_release(op->status, FAILED);
01200 this->forwarder_busy = false;
01201 }
01202 }
01203
01204 void internal_push(prio_operation *op) {
01205 if ( this->my_tail >= this->my_array_size )
01206 this->grow_my_array( this->my_tail + 1 );
01207 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01208 ++(this->my_tail);
01209 __TBB_store_with_release(op->status, SUCCEEDED);
01210 }
01211 void internal_pop(prio_operation *op) {
01212 if ( this->my_reserved == true || this->my_tail == 0 ) {
01213 __TBB_store_with_release(op->status, FAILED);
01214 }
01215 else {
01216 if (mark<this->my_tail &&
01217 compare(this->my_array[0].first,
01218 this->my_array[this->my_tail-1].first)) {
01219
01220
01221 *(op->elem) = this->my_array[this->my_tail-1].first;
01222 --(this->my_tail);
01223 __TBB_store_with_release(op->status, SUCCEEDED);
01224 }
01225 else {
01226 *(op->elem) = this->my_array[0].first;
01227 if (mark == this->my_tail) --mark;
01228 --(this->my_tail);
01229 __TBB_store_with_release(op->status, SUCCEEDED);
01230 this->my_array[0].first=this->my_array[this->my_tail].first;
01231 if (this->my_tail > 1)
01232 reheap();
01233 }
01234 }
01235 }
01236 void internal_reserve(prio_operation *op) {
01237 if (this->my_reserved == true || this->my_tail == 0) {
01238 __TBB_store_with_release(op->status, FAILED);
01239 }
01240 else {
01241 this->my_reserved = true;
01242 *(op->elem) = reserved_item = this->my_array[0].first;
01243 if (mark == this->my_tail) --mark;
01244 --(this->my_tail);
01245 __TBB_store_with_release(op->status, SUCCEEDED);
01246 this->my_array[0].first = this->my_array[this->my_tail].first;
01247 if (this->my_tail > 1)
01248 reheap();
01249 }
01250 }
01251 void internal_consume(prio_operation *op) {
01252 this->my_reserved = false;
01253 __TBB_store_with_release(op->status, SUCCEEDED);
01254 }
01255 void internal_release(prio_operation *op) {
01256 if (this->my_tail >= this->my_array_size)
01257 this->grow_my_array( this->my_tail + 1 );
01258 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01259 ++(this->my_tail);
01260 this->my_reserved = false;
01261 __TBB_store_with_release(op->status, SUCCEEDED);
01262 heapify();
01263 }
01264 private:
01265 Compare compare;
01266 size_type mark;
01267 input_type reserved_item;
01268
01269 void heapify() {
01270 if (!mark) mark = 1;
01271 for (; mark<this->my_tail; ++mark) {
01272 size_type cur_pos = mark;
01273 input_type to_place = this->my_array[mark].first;
01274 do {
01275 size_type parent = (cur_pos-1)>>1;
01276 if (!compare(this->my_array[parent].first, to_place))
01277 break;
01278 this->my_array[cur_pos].first = this->my_array[parent].first;
01279 cur_pos = parent;
01280 } while( cur_pos );
01281 this->my_array[cur_pos].first = to_place;
01282 }
01283 }
01284
01285 void reheap() {
01286 size_type cur_pos=0, child=1;
01287 while (child < mark) {
01288 size_type target = child;
01289 if (child+1<mark &&
01290 compare(this->my_array[child].first,
01291 this->my_array[child+1].first))
01292 ++target;
01293
01294 if (compare(this->my_array[target].first,
01295 this->my_array[this->my_tail].first))
01296 break;
01297 this->my_array[cur_pos].first = this->my_array[target].first;
01298 cur_pos = target;
01299 child = (cur_pos<<1)+1;
01300 }
01301 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01302 }
01303 };
01304
01306
01309 template< typename T >
01310 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01311 public:
01312
01313 typedef T input_type;
01314 typedef T output_type;
01315 typedef sender< input_type > predecessor_type;
01316 typedef receiver< output_type > successor_type;
01317
01318 private:
01319
01320 task *my_root_task;
01321 size_t my_threshold;
01322 size_t my_count;
01323 internal::predecessor_cache< T > my_predecessors;
01324 spin_mutex my_mutex;
01325 internal::broadcast_cache< T > my_successors;
01326 int init_decrement_predecessors;
01327
01328 friend class internal::forward_task< limiter_node<T> >;
01329
01330
01331 friend class internal::decrementer< limiter_node<T> >;
01332
01333 void decrement_counter() {
01334 input_type v;
01335
01336
01337 if ( my_predecessors.get_item( v ) == false
01338 || my_successors.try_put(v) == false ) {
01339 spin_mutex::scoped_lock lock(my_mutex);
01340 --my_count;
01341 if ( !my_predecessors.empty() )
01342 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01343 internal::forward_task< limiter_node<T> >( *this ) );
01344 }
01345 }
01346
01347 void forward() {
01348 {
01349 spin_mutex::scoped_lock lock(my_mutex);
01350 if ( my_count < my_threshold )
01351 ++my_count;
01352 else
01353 return;
01354 }
01355 decrement_counter();
01356 }
01357
01358 public:
01359
01361 internal::decrementer< limiter_node<T> > decrement;
01362
01364 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01365 my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01366 init_decrement_predecessors(num_decrement_predecessors),
01367 decrement(num_decrement_predecessors)
01368 {
01369 my_predecessors.set_owner(this);
01370 my_successors.set_owner(this);
01371 decrement.set_owner(this);
01372 }
01373
01375 limiter_node( const limiter_node& src ) :
01376 graph_node(), receiver<T>(), sender<T>(),
01377 my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01378 init_decrement_predecessors(src.init_decrement_predecessors),
01379 decrement(src.init_decrement_predecessors)
01380 {
01381 my_predecessors.set_owner(this);
01382 my_successors.set_owner(this);
01383 decrement.set_owner(this);
01384 }
01385
01387 bool register_successor( receiver<output_type> &r ) {
01388 my_successors.register_successor(r);
01389 return true;
01390 }
01391
01393
01394 bool remove_successor( receiver<output_type> &r ) {
01395 r.remove_predecessor(*this);
01396 my_successors.remove_successor(r);
01397 return true;
01398 }
01399
01401 bool try_put( const T &t ) {
01402 {
01403 spin_mutex::scoped_lock lock(my_mutex);
01404 if ( my_count >= my_threshold )
01405 return false;
01406 else
01407 ++my_count;
01408 }
01409
01410 bool msg = my_successors.try_put(t);
01411
01412 if ( msg != true ) {
01413 spin_mutex::scoped_lock lock(my_mutex);
01414 --my_count;
01415 if ( !my_predecessors.empty() )
01416 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01417 internal::forward_task< limiter_node<T> >( *this ) );
01418 }
01419
01420 return msg;
01421 }
01422
01424 bool register_predecessor( predecessor_type &src ) {
01425 spin_mutex::scoped_lock lock(my_mutex);
01426 my_predecessors.add( src );
01427 if ( my_count < my_threshold && !my_successors.empty() )
01428 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01429 internal::forward_task< limiter_node<T> >( *this ) );
01430 return true;
01431 }
01432
01434 bool remove_predecessor( predecessor_type &src ) {
01435 my_predecessors.remove( src );
01436 return true;
01437 }
01438
01439 };
01440
01441 #include "internal/_flow_graph_join_impl.h"
01442
01443 using internal::reserving_port;
01444 using internal::queueing_port;
01445 using internal::tag_matching_port;
01446 using internal::input_port;
01447 using internal::tag_value;
01448 using internal::NO_TAG;
01449
01450 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01451
01452 template<typename OutputTuple>
01453 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01454 private:
01455 static const int N = std::tuple_size<OutputTuple>::value;
01456 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01457 public:
01458 typedef OutputTuple output_type;
01459 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01460 join_node(graph &g) : unfolded_type(g) { }
01461 join_node(const join_node &other) : unfolded_type(other) {}
01462 };
01463
01464 template<typename OutputTuple>
01465 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01466 private:
01467 static const int N = std::tuple_size<OutputTuple>::value;
01468 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01469 public:
01470 typedef OutputTuple output_type;
01471 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01472 join_node(graph &g) : unfolded_type(g) { }
01473 join_node(const join_node &other) : unfolded_type(other) {}
01474 };
01475
01476
01477 template<typename OutputTuple>
01478 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01479 tag_matching_port, OutputTuple, tag_matching> {
01480 private:
01481 static const int N = std::tuple_size<OutputTuple>::value;
01482 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01483 public:
01484 typedef OutputTuple output_type;
01485 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01486 template<typename B0, typename B1>
01487 join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01488 template<typename B0, typename B1, typename B2>
01489 join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01490 template<typename B0, typename B1, typename B2, typename B3>
01491 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01492 template<typename B0, typename B1, typename B2, typename B3, typename B4>
01493 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01494 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01495 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01496 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01497 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01498 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01499 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01500 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01501 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01502 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01503 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01504 join_node(const join_node &other) : unfolded_type(other) {}
01505 };
01506
#if TBB_PREVIEW_GRAPH_NODES

// or_node is compiled only when TBB_PREVIEW_GRAPH_NODES is enabled.
#include "internal/_flow_graph_or_impl.h"

//! Preview node built on internal::unfolded_or_node<InputTuple>.
/** output_type is whatever internal::or_output_type<InputTuple> computes. */
template<typename InputTuple>
class or_node : public internal::unfolded_or_node<InputTuple> {
private:
    //! Number of elements in InputTuple.
    static const int N = std::tuple_size<InputTuple>::value;

public:
    typedef typename internal::or_output_type<InputTuple>::type output_type;
    typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;

    //! Default constructor.
    or_node()
        : unfolded_type()
    { }

    //! Copy constructor: the base is default-constructed, nothing is taken from the source.
    or_node( const or_node& )
        : unfolded_type()
    { }
};

#endif // TBB_PREVIEW_GRAPH_NODES
01523
01525 template< typename T >
01526 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01527 p.register_successor( s );
01528 }
01529
01531 template< typename T >
01532 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01533 p.remove_successor( s );
01534 }
01535
//! Returns the result of n.copy_function_object<Body>().
/** Body cannot be deduced from the argument, so callers name it explicitly:
    copy_body<MyBody>(node).
    NOTE(review): presumably this yields a copy of the body object held by the
    node - confirm against the node's copy_function_object(). */
template< typename Body, typename Node >
Body copy_body( Node &n )
{
    // `template` is required: copy_function_object is a member template of a dependent type.
    return n.template copy_function_object<Body>();
}
01541
01542
01543 }
01544
01545 using interface6::graph;
01546 using interface6::graph_node;
01547 using interface6::continue_msg;
01548 using interface6::sender;
01549 using interface6::receiver;
01550 using interface6::continue_receiver;
01551
01552 using interface6::source_node;
01553 using interface6::function_node;
01554 #if TBB_PREVIEW_GRAPH_NODES
01555 using interface6::multioutput_function_node;
01556 using interface6::split_node;
01557 using interface6::internal::output_port;
01558 using interface6::or_node;
01559 #endif
01560 using interface6::continue_node;
01561 using interface6::overwrite_node;
01562 using interface6::write_once_node;
01563 using interface6::broadcast_node;
01564 using interface6::buffer_node;
01565 using interface6::queue_node;
01566 using interface6::sequencer_node;
01567 using interface6::priority_queue_node;
01568 using interface6::limiter_node;
01569 using namespace interface6::internal::graph_policy_namespace;
01570 using interface6::join_node;
01571 using interface6::input_port;
01572 using interface6::copy_body;
01573 using interface6::make_edge;
01574 using interface6::remove_edge;
01575 using interface6::internal::NO_TAG;
01576 using interface6::internal::tag_value;
01577
01578 }
01579 }
01580
01581 #endif // __TBB_flow_graph_H