flow_graph.h

Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023 
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033 
00034 // use the VC10 or gcc version of tuple if it is available.
00035 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00036 #define TBB_PREVIEW_TUPLE 1
00037 #include "compat/tuple"
00038 #else
00039 #include <tuple>
00040 #endif
00041 
00042 #include<list>
00043 #include<queue>
00044 
00055 namespace tbb {
00056 namespace flow {
00057 
//! Named concurrency limits that can be passed where a node takes a concurrency argument.
enum concurrency {
    unlimited = 0,  //!< numeric value 0
    serial = 1      //!< numeric value 1
};
00060 
00061 namespace interface6 {
00062 
//! Common base class of the node types in this header.
/** Declares nothing but a virtual destructor, so a node can be destroyed
    through a graph_node pointer. */
class graph_node {
public:
    virtual ~graph_node() {}
};
00068 
//! An empty message type; it carries no data.
class continue_msg {
};
00071         
00072 template< typename T > class sender;
00073 template< typename T > class receiver;
00074 class continue_receiver;
00075         
00077 template< typename T >
00078 class sender {
00079 public:
00081     typedef T output_type;
00082         
00084     typedef receiver<T> successor_type;
00085         
00086     virtual ~sender() {}
00087         
00089     virtual bool register_successor( successor_type &r ) = 0;
00090         
00092     virtual bool remove_successor( successor_type &r ) = 0;
00093         
00095     virtual bool try_get( T & ) { return false; }
00096         
00098     virtual bool try_reserve( T & ) { return false; }
00099         
00101     virtual bool try_release( ) { return false; }
00102         
00104     virtual bool try_consume( ) { return false; }
00105         
00106 };
00107         
00108         
00110 template< typename T >
00111 class receiver {
00112 public:
00113         
00115     typedef T input_type;
00116         
00118     typedef sender<T> predecessor_type;
00119         
00121     virtual ~receiver() {}
00122         
00124     virtual bool try_put( const T& t ) = 0;
00125         
00127     virtual bool register_predecessor( predecessor_type & ) { return false; }
00128         
00130     virtual bool remove_predecessor( predecessor_type & ) { return false; }
00131         
00132 };
00133         
00135 
00136 class continue_receiver : public receiver< continue_msg > {
00137 public:
00138         
00140     typedef continue_msg input_type;
00141         
00143     typedef sender< continue_msg > predecessor_type;
00144         
00146     continue_receiver( int number_of_predecessors = 0 ) { 
00147         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00148         my_current_count = 0;
00149     }
00150         
00152     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() { 
00153         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00154         my_current_count = 0;
00155     }
00156         
00158     virtual ~continue_receiver() { }
00159         
00161     /* override */ bool register_predecessor( predecessor_type & ) {
00162         spin_mutex::scoped_lock l(my_mutex);
00163         ++my_predecessor_count;
00164         return true;
00165     }
00166         
00168 
00171     /* override */ bool remove_predecessor( predecessor_type & ) {
00172         spin_mutex::scoped_lock l(my_mutex);
00173         --my_predecessor_count;
00174         return true;
00175     }
00176         
00178 
00180     /* override */ bool try_put( const input_type & ) {
00181         {
00182             spin_mutex::scoped_lock l(my_mutex);
00183             if ( ++my_current_count < my_predecessor_count ) 
00184                 return true;
00185             else
00186                 my_current_count = 0;
00187         }
00188         execute();
00189         return true;
00190     }
00191         
00192 protected:
00193         
00194     spin_mutex my_mutex;
00195     int my_predecessor_count;
00196     int my_current_count;
00197     int my_initial_predecessor_count;
00198         
00200 
00202     virtual void execute() = 0;
00203         
00204 };
00205 
00206 #include "internal/_flow_graph_impl.h"
00207 using namespace internal::graph_policy_namespace;
00208 
00210 
00211 class graph : tbb::internal::no_copy {
00212         
00213     template< typename Body >
00214     class run_task : public task {
00215     public: 
00216         run_task( Body& body ) : my_body(body) {}
00217         task *execute() {
00218             my_body();
00219             return NULL;
00220         }
00221     private:
00222         Body my_body;
00223     };
00224         
00225     template< typename Receiver, typename Body >
00226     class run_and_put_task : public task {
00227     public: 
00228         run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00229         task *execute() {
00230             my_receiver.try_put( my_body() );
00231             return NULL;
00232         }
00233     private:
00234         Receiver &my_receiver;
00235         Body my_body;
00236     };
00237         
00238 public:
00239         
00240         
00242     graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
00243         my_root_task->set_ref_count(1);
00244     }
00245         
00247 
00249     ~graph() {
00250         wait_for_all();
00251         my_root_task->set_ref_count(0);
00252         task::destroy( *my_root_task );
00253     }
00254         
00255         
00257 
00259     void increment_wait_count() { 
00260         if (my_root_task)
00261             my_root_task->increment_ref_count();
00262     }
00263         
00265 
00267     void decrement_wait_count() { 
00268         if (my_root_task)
00269             my_root_task->decrement_ref_count(); 
00270     }
00271         
00273 
00275     template< typename Receiver, typename Body >
00276         void run( Receiver &r, Body body ) {
00277        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00278            run_and_put_task< Receiver, Body >( r, body ) );
00279     }
00280         
00282 
00284     template< typename Body >
00285     void run( Body body ) {
00286        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00287            run_task< Body >( body ) );
00288     }
00289         
00291 
00292     void wait_for_all() {
00293         if (my_root_task)
00294             my_root_task->wait_for_all();
00295         my_root_task->set_ref_count(1);
00296     }
00297         
00299     task * root_task() {
00300         return my_root_task;
00301     }
00302         
00303 private:
00304         
00305     task *my_root_task;
00306         
00307 };
00308 
00309 #include "internal/_flow_graph_node_impl.h"
00310 
00312 template < typename Output >
00313 class source_node : public graph_node, public sender< Output > {
00314 public:
00315         
00317     typedef Output output_type;           
00318         
00320     typedef receiver< Output > successor_type;
00321         
00323     template< typename Body >
00324     source_node( graph &g, Body body, bool is_active = true )
00325         : my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00326         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00327         my_reserved(false), my_has_cached_item(false) 
00328     { 
00329         my_successors.set_owner(this);
00330     }
00331         
00333     source_node( const source_node& src ) :
00334         graph_node(), sender<Output>(),
00335         my_root_task( src.my_root_task), my_active(src.init_my_active),
00336         init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00337         my_reserved(false), my_has_cached_item(false)
00338     {
00339         my_successors.set_owner(this);
00340     }
00341 
00343     ~source_node() { delete my_body; }
00344         
00346     /* override */ bool register_successor( receiver<output_type> &r ) {
00347         spin_mutex::scoped_lock lock(my_mutex);
00348         my_successors.register_successor(r);
00349         if ( my_active )
00350             spawn_put();
00351         return true;
00352     }
00353         
00355     /* override */ bool remove_successor( receiver<output_type> &r ) {
00356         spin_mutex::scoped_lock lock(my_mutex);
00357         my_successors.remove_successor(r);
00358         return true;
00359     }
00360         
00362     /*override */ bool try_get( output_type &v ) {
00363         spin_mutex::scoped_lock lock(my_mutex);
00364         if ( my_reserved )  
00365             return false;
00366         
00367         if ( my_has_cached_item ) {
00368             v = my_cached_item;
00369             my_has_cached_item = false;
00370         } else if ( (*my_body)(v) == false ) {
00371             return false;
00372         }
00373         return true;
00374     }
00375         
00377     /* override */ bool try_reserve( output_type &v ) {
00378         spin_mutex::scoped_lock lock(my_mutex);
00379         if ( my_reserved ) {
00380             return false;
00381         }
00382         
00383         if ( !my_has_cached_item && (*my_body)(my_cached_item) )  
00384             my_has_cached_item = true;
00385         
00386         if ( my_has_cached_item ) {
00387             v = my_cached_item;
00388             my_reserved = true;
00389             return true;
00390         } else {
00391             return false;
00392         }
00393     }
00394         
00396 
00397     /* override */ bool try_release( ) {
00398         spin_mutex::scoped_lock lock(my_mutex);
00399         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00400         my_reserved = false;
00401         spawn_put();
00402         return true;
00403     }
00404         
00406     /* override */ bool try_consume( ) {
00407         spin_mutex::scoped_lock lock(my_mutex);
00408         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00409         my_reserved = false;
00410         my_has_cached_item = false;
00411         if ( !my_successors.empty() ) {
00412             spawn_put();
00413         }
00414         return true;
00415     }
00416         
00418     void activate() {
00419         spin_mutex::scoped_lock lock(my_mutex);
00420         my_active = true;
00421         if ( !my_successors.empty() )
00422             spawn_put();
00423     }
00424         
00425 private:
00426         
00427     task *my_root_task;
00428     spin_mutex my_mutex;
00429     bool my_active;
00430     bool init_my_active;
00431     internal::source_body<output_type> *my_body;
00432     internal::broadcast_cache< output_type > my_successors;
00433     bool my_reserved;
00434     bool my_has_cached_item;
00435     output_type my_cached_item;
00436         
00437     friend class internal::source_task< source_node< output_type > >;
00438         
00440     /* override */ void apply_body( ) {
00441         output_type v;
00442         if ( try_reserve(v) == false )
00443             return;
00444         
00445         if ( my_successors.try_put( v ) ) 
00446             try_consume();
00447         else
00448             try_release();
00449     }
00450         
00452     /* override */ void spawn_put( ) {
00453         task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00454            internal::source_task< source_node< output_type > >( *this ) ); 
00455     }
00456         
00457 };
00458         
00460 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00461 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00462 public:
00463         
00464     typedef Input input_type;
00465     typedef Output output_type;
00466     typedef sender< input_type > predecessor_type;
00467     typedef receiver< output_type > successor_type;
00468     typedef internal::function_output<output_type> fOutput_type;
00469         
00471     template< typename Body >
00472     function_node( graph &g, size_t concurrency, Body body )
00473     : internal::function_input<input_type,output_type,Allocator>( g, concurrency, body ) {
00474     }
00475 
00477     function_node( const function_node& src ) : 
00478         graph_node(), internal::function_input<input_type,output_type,Allocator>( src ),
00479         fOutput_type() {}
00480         
00481 protected:
00482 
00483     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00484         
00485 };
00486 
00488 template < typename Input, typename Output, typename Allocator >
00489 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00490 public:
00491         
00492     typedef Input input_type;
00493     typedef Output output_type;
00494     typedef sender< input_type > predecessor_type;
00495     typedef receiver< output_type > successor_type;
00496     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00497     typedef internal::function_input_queue<input_type, Allocator> queue_type;
00498     typedef internal::function_output<output_type> fOutput_type;
00499         
00501     template< typename Body >
00502     function_node( graph &g, size_t concurrency, Body body ) : fInput_type( g, concurrency, body, new queue_type() ) {
00503     }
00504 
00506     function_node( const function_node& src ) : 
00507         graph_node(), fInput_type( src, new queue_type() ) , fOutput_type() { }
00508 
00509 protected:
00510 
00511     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00512         
00513 };
00514 
00515 #include "tbb/internal/_flow_graph_types_impl.h"
00516 
00517 #if TBB_PREVIEW_GRAPH_NODES
00519 // Output is a tuple of output types.
00520 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00521 class multioutput_function_node : 
00522     public graph_node, 
00523     public internal::multioutput_function_input
00524     <  
00525         Input, 
00526         typename internal::wrap_tuple_elements<
00527             std::tuple_size<Output>::value,  // #elements in tuple
00528             internal::function_output,  // wrap this around each element
00529             Output // the tuple providing the types
00530         >::type,
00531         Allocator
00532     > {
00533 private:
00534     static const int N = std::tuple_size<Output>::value;
00535 public:
00536     typedef Input input_type;
00537     typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type ports_type;
00538 private:
00539     typedef typename internal::multioutput_function_input<input_type, ports_type, Allocator> base_type;
00540     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00541 public:
00542     template<typename Body>
00543     multioutput_function_node( graph &g, size_t concurrency, Body body ) : base_type(g,concurrency, body) {}
00544     multioutput_function_node( const multioutput_function_node &other) :
00545         graph_node(), base_type(other) {}
00546     // all the guts are in multioutput_function_input...
00547 
00548 };  // multioutput_function_node
00549         
00550 template < typename Input, typename Output, typename Allocator >
00551 class multioutput_function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multioutput_function_input<Input, 
00552     typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00553     static const int N = std::tuple_size<Output>::value;
00554 public:
00555     typedef Input input_type;
00556     typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type ports_type;
00557 private:
00558     typedef typename internal::multioutput_function_input<input_type, ports_type, Allocator> base_type;
00559     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00560 public:
00561 
00562     template<typename Body>
00563     multioutput_function_node( graph &g, size_t concurrency, Body body) : base_type(g,concurrency, body, new queue_type()) {}
00564     multioutput_function_node( const multioutput_function_node &other) :
00565         graph_node(), base_type(other, new queue_type()) {}
00566 
00567 };  // multioutput_function_node
00568 
00570 //  successors.  The node has unlimited concurrency, so though it is marked as
00571 //  "rejecting" it does not reject inputs.
00572 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00573 class split_node : public multioutput_function_node<TupleType, TupleType, rejecting, Allocator> {
00574     static const int N = std::tuple_size<TupleType>::value;
00575     typedef multioutput_function_node<TupleType,TupleType,rejecting,Allocator> base_type;
00576 public:
00577     typedef typename base_type::ports_type ports_type;
00578 private:
00579 
00580     struct splitting_body {
00581         void operator()(const TupleType& t, ports_type &p) {
00582             internal::emit_element<N>::emit_this(t, p);
00583         }
00584     };
00585 public:
00586     typedef TupleType input_type;
00587     typedef Allocator allocator_type;
00588     split_node(graph &g) : base_type(g, unlimited, splitting_body()) { }
00589     split_node( const split_node & other) : base_type(other) { }
00590 };
00591 #endif  // TBB_PREVIEW_GRAPH_NODES
00592 
00594 template <typename Output>
00595 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00596 public:
00597         
00598     typedef continue_msg input_type;
00599     typedef Output output_type;
00600     typedef sender< input_type > predecessor_type;
00601     typedef receiver< output_type > successor_type;
00602     typedef internal::function_output<output_type> fOutput_type;
00603         
00605      template <typename Body >
00606      continue_node( graph &g, Body body )
00607              : internal::continue_input<output_type>( g, body ) {
00608      }
00609         
00611     template <typename Body >
00612     continue_node( graph &g, int number_of_predecessors, Body body )
00613         : internal::continue_input<output_type>( g, number_of_predecessors, body )
00614     {
00615     }
00616  
00618     continue_node( const continue_node& src ) :
00619         graph_node(), internal::continue_input<output_type>(src),
00620         internal::function_output<Output>() { }
00621 
00622 protected:
00623         
00624     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00625         
00626 };
00627         
00628 template< typename T >
00629 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00630 public:
00631         
00632     typedef T input_type;
00633     typedef T output_type;
00634     typedef sender< input_type > predecessor_type;
00635     typedef receiver< output_type > successor_type;
00636         
00637     overwrite_node() : my_buffer_is_valid(false) {
00638         my_successors.set_owner( this );
00639     }
00640 
00641     // Copy constructor; doesn't take anything from src; default won't work
00642     overwrite_node( const overwrite_node& ) : 
00643         graph_node(), receiver<T>(), sender<T>(), my_buffer_is_valid(false) {
00644         my_successors.set_owner( this );
00645     }
00646         
00647     ~overwrite_node() {}
00648         
00649     /* override */ bool register_successor( successor_type &s ) {
00650         spin_mutex::scoped_lock l( my_mutex );
00651         if ( my_buffer_is_valid ) {
00652             // We have a valid value that must be forwarded immediately.
00653             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
00654                 // We add the successor: it accepted our put or it rejected it but won't let use become a predecessor
00655                 my_successors.register_successor( s );
00656                 return true;
00657             } else {
00658                 // We don't add the successor: it rejected our put and we became its predecessor instead
00659                 return false;
00660             }
00661         } else {
00662             // No valid value yet, just add as successor
00663             my_successors.register_successor( s );
00664             return true;
00665         }
00666     }
00667         
00668     /* override */ bool remove_successor( successor_type &s ) {
00669         spin_mutex::scoped_lock l( my_mutex );
00670         my_successors.remove_successor(s);
00671         return true;
00672     }
00673         
00674     /* override */ bool try_put( const T &v ) {
00675         spin_mutex::scoped_lock l( my_mutex );
00676         my_buffer = v;
00677         my_buffer_is_valid = true;
00678         my_successors.try_put(v);
00679         return true;
00680     }
00681         
00682     /* override */ bool try_get( T &v ) {
00683         spin_mutex::scoped_lock l( my_mutex );
00684         if ( my_buffer_is_valid ) {
00685             v = my_buffer;
00686             return true;
00687         } else {
00688             return false;
00689         }
00690     }
00691         
00692     bool is_valid() {
00693        spin_mutex::scoped_lock l( my_mutex );
00694        return my_buffer_is_valid;
00695     }
00696         
00697     void clear() {
00698        spin_mutex::scoped_lock l( my_mutex );
00699        my_buffer_is_valid = false;
00700     }
00701         
00702 protected:
00703         
00704     spin_mutex my_mutex;
00705     internal::broadcast_cache< T, null_rw_mutex > my_successors;
00706     T my_buffer;
00707     bool my_buffer_is_valid;
00708         
00709 };
00710         
00711 template< typename T >
00712 class write_once_node : public overwrite_node<T> {
00713 public:
00714         
00715     typedef T input_type;
00716     typedef T output_type;
00717     typedef sender< input_type > predecessor_type;
00718     typedef receiver< output_type > successor_type;
00719         
00721     write_once_node() : overwrite_node<T>() {}
00722 
00724     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00725 
00726     /* override */ bool try_put( const T &v ) {
00727         spin_mutex::scoped_lock l( this->my_mutex );
00728         if ( this->my_buffer_is_valid ) {
00729             return false;
00730         } else {
00731             this->my_buffer = v;
00732             this->my_buffer_is_valid = true;
00733             this->my_successors.try_put(v);
00734             return true;
00735         }
00736     }
00737 };
00738         
00740 template <typename T>
00741 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
00742         
00743     internal::broadcast_cache<T> my_successors;
00744         
00745 public:
00746         
00747     typedef T input_type;
00748     typedef T output_type;
00749     typedef sender< input_type > predecessor_type;
00750     typedef receiver< output_type > successor_type;
00751         
00752     broadcast_node( ) {
00753         my_successors.set_owner( this );
00754     }
00755         
00756     // Copy constructor
00757     broadcast_node( const broadcast_node& ) : graph_node(), receiver<T>(), sender<T>() {
00758         my_successors.set_owner( this );
00759     }
00760         
00762     virtual bool register_successor( receiver<T> &r ) {
00763         my_successors.register_successor( r );
00764         return true;
00765     }
00766         
00768     virtual bool remove_successor( receiver<T> &r ) {
00769         my_successors.remove_successor( r );
00770         return true;
00771     }
00772         
00773     /* override */ bool try_put( const T &t ) {
00774         my_successors.try_put(t);
00775         return true;
00776     }
00777         
00778 };
00779 
00780 #include "internal/_flow_graph_item_buffer_impl.h"
00781 
00783 template <typename T, typename A=cache_aligned_allocator<T> >
00784 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
00785 public:
00786     typedef T input_type;
00787     typedef T output_type;
00788     typedef sender< input_type > predecessor_type;
00789     typedef receiver< output_type > successor_type;
00790     typedef buffer_node<T, A> my_class;
00791 protected:
00792     typedef size_t size_type;
00793     internal::round_robin_cache< T, null_rw_mutex > my_successors;
00794         
00795     task *my_parent;
00796         
00797     friend class internal::forward_task< buffer_node< T, A > >;
00798         
00799     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00800     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00801         
00802     // implements the aggregator_operation concept
00803     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00804     public:
00805         char type;
00806         T *elem;
00807         successor_type *r;
00808         buffer_operation(const T& e, op_type t) :
00809             type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
00810         buffer_operation(op_type t) : type(char(t)), r(NULL) {}
00811     };
00812         
00813     bool forwarder_busy;
00814     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
00815     friend class internal::aggregating_functor<my_class, buffer_operation>;
00816     internal::aggregator< my_handler, buffer_operation> my_aggregator;
00817         
00818     virtual void handle_operations(buffer_operation *op_list) {
00819         buffer_operation *tmp;
00820         bool try_forwarding=false;
00821         while (op_list) {
00822             tmp = op_list;
00823             op_list = op_list->next;
00824             switch (tmp->type) {
00825             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
00826             case rem_succ: internal_rem_succ(tmp); break;
00827             case req_item: internal_pop(tmp); break;
00828             case res_item: internal_reserve(tmp); break;
00829             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
00830             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
00831             case put_item: internal_push(tmp);  try_forwarding = true; break;
00832             case try_fwd:  internal_forward(tmp); break;
00833             }
00834         }
00835         if (try_forwarding && !forwarder_busy) {
00836             forwarder_busy = true;
00837             task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
00838         }
00839     }
00840         
00842     virtual void forward() {
00843         buffer_operation op_data(try_fwd);
00844         do {
00845             op_data.status = WAIT;
00846             my_aggregator.execute(&op_data);
00847         } while (op_data.status == SUCCEEDED);
00848     }
00849         
00851     virtual void internal_reg_succ(buffer_operation *op) {
00852         my_successors.register_successor(*(op->r));
00853         __TBB_store_with_release(op->status, SUCCEEDED);
00854     }
00855         
00857     virtual void internal_rem_succ(buffer_operation *op) {
00858         my_successors.remove_successor(*(op->r));
00859         __TBB_store_with_release(op->status, SUCCEEDED);
00860     }
00861         
00863     virtual void internal_forward(buffer_operation *op) {
00864         T i_copy;
00865         bool success = false; // flagged when a successor accepts
00866         size_type counter = my_successors.size();
00867         // Try forwarding, giving each successor a chance
00868         while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
00869             this->fetch_back(i_copy);
00870             if( my_successors.try_put(i_copy) ) {
00871                 this->invalidate_back();
00872                 --(this->my_tail);
00873                 success = true; // found an accepting successor
00874             }
00875             --counter;
00876         }
00877         if (success && !counter)
00878             __TBB_store_with_release(op->status, SUCCEEDED);
00879         else {
00880             __TBB_store_with_release(op->status, FAILED);
00881             forwarder_busy = false;
00882         }
00883     }
00884         
00885     virtual void internal_push(buffer_operation *op) {
00886         this->push_back(*(op->elem));
00887         __TBB_store_with_release(op->status, SUCCEEDED);
00888     }
00889         
00890     virtual void internal_pop(buffer_operation *op) {
00891         if(this->pop_back(*(op->elem))) {
00892             __TBB_store_with_release(op->status, SUCCEEDED);
00893         }
00894         else {
00895             __TBB_store_with_release(op->status, FAILED);
00896         }
00897     }
00898         
00899     virtual void internal_reserve(buffer_operation *op) {
00900         if(this->reserve_front(*(op->elem))) {
00901             __TBB_store_with_release(op->status, SUCCEEDED);
00902         }
00903         else {
00904             __TBB_store_with_release(op->status, FAILED);
00905         }
00906     }
00907         
00908     virtual void internal_consume(buffer_operation *op) {
00909         this->consume_front();
00910         __TBB_store_with_release(op->status, SUCCEEDED);
00911     }
00912         
00913     virtual void internal_release(buffer_operation *op) {
00914         this->release_front();
00915         __TBB_store_with_release(op->status, SUCCEEDED);
00916     }
00917         
00918 public:
00920     buffer_node( graph &g ) : reservable_item_buffer<T>(),
00921         my_parent( g.root_task() ), forwarder_busy(false) {
00922         my_successors.set_owner(this);
00923         my_aggregator.initialize_handler(my_handler(this));
00924     }
00925 
00927     buffer_node( const buffer_node& src ) :
00928         graph_node(), reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
00929         my_parent( src.my_parent ) {
00930         forwarder_busy = false;
00931         my_successors.set_owner(this);
00932         my_aggregator.initialize_handler(my_handler(this));
00933     }
00934 
00935     virtual ~buffer_node() {}
00936         
00937     //
00938     // message sender implementation
00939     //
00940         
00942 
00943     /* override */ bool register_successor( receiver<output_type> &r ) {
00944         buffer_operation op_data(reg_succ);
00945         op_data.r = &r;
00946         my_aggregator.execute(&op_data);
00947         return true;
00948     }
00949         
00951 
00953     /* override */ bool remove_successor( receiver<output_type> &r ) {
00954         r.remove_predecessor(*this);
00955         buffer_operation op_data(rem_succ);
00956         op_data.r = &r;
00957         my_aggregator.execute(&op_data);
00958         return true;
00959     }
00960         
00962 
00964     /* override */ bool try_get( T &v ) {
00965         buffer_operation op_data(req_item);
00966         op_data.elem = &v;
00967         my_aggregator.execute(&op_data);
00968         return (op_data.status==SUCCEEDED);
00969     }
00970         
00972 
00974     /* override */ bool try_reserve( T &v ) {
00975         buffer_operation op_data(res_item);
00976         op_data.elem = &v;
00977         my_aggregator.execute(&op_data);
00978         return (op_data.status==SUCCEEDED);
00979     }
00980         
00982 
00983     /* override */ bool try_release() {
00984         buffer_operation op_data(rel_res);
00985         my_aggregator.execute(&op_data);
00986         return true;
00987     }
00988         
00990 
00991     /* override */ bool try_consume() {
00992         buffer_operation op_data(con_res);
00993         my_aggregator.execute(&op_data);
00994         return true;
00995     }
00996         
00998 
00999     /* override */ bool try_put(const T &t) {
01000         buffer_operation op_data(t, put_item);
01001         my_aggregator.execute(&op_data);
01002         return true;
01003     }
01004 };
01005         
01006         
01008 template <typename T, typename A=cache_aligned_allocator<T> >
01009 class queue_node : public buffer_node<T, A> {
01010 protected:
01011 typedef typename buffer_node<T, A>::size_type size_type;
01012 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01013         
01014     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01015         
01017     /* override */ void internal_forward(queue_operation *op) {
01018         T i_copy;
01019         bool success = false; // flagged when a successor accepts
01020         size_type counter = this->my_successors.size();
01021         if (this->my_reserved || !this->item_valid(this->my_head)){
01022             __TBB_store_with_release(op->status, FAILED);
01023             this->forwarder_busy = false;
01024             return;
01025         }
01026         // Keep trying to send items while there is at least one accepting successor
01027         while (counter>0 && this->item_valid(this->my_head)) {
01028             this->fetch_front(i_copy);
01029             if(this->my_successors.try_put(i_copy)) {
01030                  this->invalidate_front();
01031                  ++(this->my_head);
01032                 success = true; // found an accepting successor
01033             }
01034             --counter;
01035         }
01036         if (success && !counter)
01037             __TBB_store_with_release(op->status, SUCCEEDED);
01038         else {
01039             __TBB_store_with_release(op->status, FAILED);
01040             this->forwarder_busy = false;
01041         }
01042     }
01043         
01044     /* override */ void internal_pop(queue_operation *op) {
01045         if ( this->my_reserved || !this->item_valid(this->my_head)){
01046             __TBB_store_with_release(op->status, FAILED);
01047         }
01048         else {
01049             this->pop_front(*(op->elem));
01050             __TBB_store_with_release(op->status, SUCCEEDED);
01051         }
01052     }
01053     /* override */ void internal_reserve(queue_operation *op) {
01054         if (this->my_reserved || !this->item_valid(this->my_head)) {
01055             __TBB_store_with_release(op->status, FAILED);
01056         }
01057         else {
01058             this->my_reserved = true;
01059             this->fetch_front(*(op->elem));
01060             this->invalidate_front();
01061             __TBB_store_with_release(op->status, SUCCEEDED);
01062         }
01063     }
01064     /* override */ void internal_consume(queue_operation *op) {
01065         this->consume_front();
01066         __TBB_store_with_release(op->status, SUCCEEDED);
01067     }
01068         
01069 public:
01070         
01071     typedef T input_type;
01072     typedef T output_type;
01073     typedef sender< input_type > predecessor_type;
01074     typedef receiver< output_type > successor_type;
01075         
01077     queue_node( graph &g ) : buffer_node<T, A>(g) {}
01078 
01080     queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01081 };
01082         
01084 template< typename T, typename A=cache_aligned_allocator<T> >
01085 class sequencer_node : public queue_node<T, A> {
01086     internal::function_body< T, size_t > *my_sequencer;
01087 public:
01088         
01089     typedef T input_type;
01090     typedef T output_type;
01091     typedef sender< input_type > predecessor_type;
01092     typedef receiver< output_type > successor_type;
01093         
01095     template< typename Sequencer >
01096     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01097         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01098 
01100     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01101         my_sequencer( src.my_sequencer->clone() ) {}
01102         
01104     ~sequencer_node() { delete my_sequencer; }
01105 protected:
01106     typedef typename buffer_node<T, A>::size_type size_type;
01107     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01108         
01109     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01110         
01111 private:
01112     /* override */ void internal_push(sequencer_operation *op) {
01113         size_type tag = (*my_sequencer)(*(op->elem));
01114         
01115         this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01116         
01117         if(this->size() > this->capacity())
01118             this->grow_my_array(this->size());  // tail already has 1 added to it
01119         this->item(tag) = std::make_pair( *(op->elem), true );
01120         __TBB_store_with_release(op->status, SUCCEEDED);
01121     }
01122 };
01123         
01125 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01126 class priority_queue_node : public buffer_node<T, A> {
01127 public:
01128     typedef T input_type;
01129     typedef T output_type;
01130     typedef sender< input_type > predecessor_type;
01131     typedef receiver< output_type > successor_type;
01132         
01134     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01135 
01137     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01138         
01139 protected:
01140     typedef typename buffer_node<T, A>::size_type size_type;
01141     typedef typename buffer_node<T, A>::item_type item_type;
01142     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01143         
01144     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01145         
01146     /* override */ void handle_operations(prio_operation *op_list) {
01147         prio_operation *tmp /*, *pop_list*/ ;
01148         bool try_forwarding=false;
01149         while (op_list) {
01150             tmp = op_list;
01151             op_list = op_list->next;
01152             switch (tmp->type) {
01153             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01154             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01155             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01156             case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01157             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01158             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01159             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01160             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01161             }
01162         }
01163         // process pops!  for now, no special pop processing
01164         if (mark<this->my_tail) heapify();
01165         if (try_forwarding && !this->forwarder_busy) {
01166             this->forwarder_busy = true;
01167             task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01168         }
01169     }
01170         
01172     /* override */ void internal_forward(prio_operation *op) {
01173         T i_copy;
01174         bool success = false; // flagged when a successor accepts
01175         size_type counter = this->my_successors.size();
01176         
01177         if (this->my_reserved || this->my_tail == 0) {
01178             __TBB_store_with_release(op->status, FAILED);
01179             this->forwarder_busy = false;
01180             return;
01181         }
01182         // Keep trying to send while there exists an accepting successor
01183         while (counter>0 && this->my_tail > 0) {
01184             i_copy = this->my_array[0].first;
01185             bool msg = this->my_successors.try_put(i_copy);
01186             if ( msg == true ) {
01187                  if (mark == this->my_tail) --mark;
01188                 --(this->my_tail);
01189                 this->my_array[0].first=this->my_array[this->my_tail].first;
01190                 if (this->my_tail > 1) // don't reheap for heap of size 1
01191                     reheap();
01192                 success = true; // found an accepting successor
01193             }
01194             --counter;
01195         }
01196         if (success && !counter)
01197             __TBB_store_with_release(op->status, SUCCEEDED);
01198         else {
01199             __TBB_store_with_release(op->status, FAILED);
01200             this->forwarder_busy = false;
01201         }
01202     }
01203         
01204     /* override */ void internal_push(prio_operation *op) {
01205         if ( this->my_tail >= this->my_array_size )
01206             this->grow_my_array( this->my_tail + 1 );
01207         this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01208         ++(this->my_tail);
01209         __TBB_store_with_release(op->status, SUCCEEDED);
01210     }
01211     /* override */ void internal_pop(prio_operation *op) {
01212         if ( this->my_reserved == true || this->my_tail == 0 ) {
01213             __TBB_store_with_release(op->status, FAILED);
01214         }
01215         else {
01216             if (mark<this->my_tail &&
01217                 compare(this->my_array[0].first,
01218                         this->my_array[this->my_tail-1].first)) {
01219                 // there are newly pushed elems; last one higher than top
01220                 // copy the data
01221                 *(op->elem) = this->my_array[this->my_tail-1].first;
01222                 --(this->my_tail);
01223                 __TBB_store_with_release(op->status, SUCCEEDED);
01224             }
01225             else { // extract and push the last element down heap
01226                 *(op->elem) = this->my_array[0].first; // copy the data
01227                 if (mark == this->my_tail) --mark;
01228                 --(this->my_tail);
01229                 __TBB_store_with_release(op->status, SUCCEEDED);
01230                 this->my_array[0].first=this->my_array[this->my_tail].first;
01231                 if (this->my_tail > 1) // don't reheap for heap of size 1
01232                     reheap();
01233             }
01234         }
01235     }
01236     /* override */ void internal_reserve(prio_operation *op) {
01237         if (this->my_reserved == true || this->my_tail == 0) {
01238             __TBB_store_with_release(op->status, FAILED);
01239         }
01240         else {
01241             this->my_reserved = true;
01242             *(op->elem) = reserved_item = this->my_array[0].first;
01243             if (mark == this->my_tail) --mark;
01244             --(this->my_tail);
01245             __TBB_store_with_release(op->status, SUCCEEDED);
01246             this->my_array[0].first = this->my_array[this->my_tail].first;
01247             if (this->my_tail > 1) // don't reheap for heap of size 1
01248                 reheap();
01249         }
01250     }
01251     /* override */ void internal_consume(prio_operation *op) {
01252         this->my_reserved = false;
01253         __TBB_store_with_release(op->status, SUCCEEDED);
01254     }
01255     /* override */ void internal_release(prio_operation *op) {
01256         if (this->my_tail >= this->my_array_size)
01257             this->grow_my_array( this->my_tail + 1 );
01258         this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01259         ++(this->my_tail);
01260         this->my_reserved = false;
01261         __TBB_store_with_release(op->status, SUCCEEDED);
01262         heapify();
01263     }
01264 private:
01265     Compare compare;
01266     size_type mark;
01267     input_type reserved_item;
01268         
01269     void heapify() {
01270         if (!mark) mark = 1;
01271         for (; mark<this->my_tail; ++mark) { // for each unheaped element
01272             size_type cur_pos = mark;
01273             input_type to_place = this->my_array[mark].first;
01274             do { // push to_place up the heap
01275                 size_type parent = (cur_pos-1)>>1;
01276                 if (!compare(this->my_array[parent].first, to_place))
01277                     break;
01278                 this->my_array[cur_pos].first = this->my_array[parent].first;
01279                 cur_pos = parent;
01280             } while( cur_pos );
01281             this->my_array[cur_pos].first = to_place;
01282         }
01283     }
01284         
01285     void reheap() {
01286         size_type cur_pos=0, child=1;
01287         while (child < mark) {
01288             size_type target = child;
01289             if (child+1<mark &&
01290                 compare(this->my_array[child].first,
01291                         this->my_array[child+1].first))
01292                 ++target;
01293             // target now has the higher priority child
01294             if (compare(this->my_array[target].first,
01295                         this->my_array[this->my_tail].first))
01296                 break;
01297             this->my_array[cur_pos].first = this->my_array[target].first;
01298             cur_pos = target;
01299             child = (cur_pos<<1)+1;
01300         }
01301         this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01302     }
01303 };
01304         
01306 
01309 template< typename T >
01310 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01311 public:
01312         
01313     typedef T input_type;
01314     typedef T output_type;
01315     typedef sender< input_type > predecessor_type;
01316     typedef receiver< output_type > successor_type;
01317         
01318 private:
01319         
01320     task *my_root_task;
01321     size_t my_threshold;
01322     size_t my_count;
01323     internal::predecessor_cache< T > my_predecessors;
01324     spin_mutex my_mutex;
01325     internal::broadcast_cache< T > my_successors;
01326     int init_decrement_predecessors;
01327 
01328     friend class internal::forward_task< limiter_node<T> >;
01329         
01330     // Let decrementer call decrement_counter()
01331     friend class internal::decrementer< limiter_node<T> >;
01332         
01333     void decrement_counter() {
01334         input_type v;
01335         
01336         // If we can't get / put an item immediately then drop the count
01337         if ( my_predecessors.get_item( v ) == false 
01338              || my_successors.try_put(v) == false ) {
01339             spin_mutex::scoped_lock lock(my_mutex);
01340             --my_count;
01341             if ( !my_predecessors.empty() ) 
01342                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01343                             internal::forward_task< limiter_node<T> >( *this ) );
01344         }
01345     }
01346         
01347     void forward() {
01348         {
01349             spin_mutex::scoped_lock lock(my_mutex);
01350             if ( my_count < my_threshold ) 
01351                 ++my_count;
01352             else
01353                 return;
01354         }
01355         decrement_counter();
01356     }
01357         
01358 public:
01359         
01361     internal::decrementer< limiter_node<T> > decrement;
01362         
01364     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) : 
01365         my_root_task(g.root_task()), my_threshold(threshold), my_count(0), 
01366         init_decrement_predecessors(num_decrement_predecessors), 
01367         decrement(num_decrement_predecessors) 
01368     {
01369         my_predecessors.set_owner(this);
01370         my_successors.set_owner(this);
01371         decrement.set_owner(this);
01372     }
01373         
01375     limiter_node( const limiter_node& src ) : 
01376         graph_node(), receiver<T>(), sender<T>(),
01377         my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0), 
01378         init_decrement_predecessors(src.init_decrement_predecessors), 
01379         decrement(src.init_decrement_predecessors) 
01380     {
01381         my_predecessors.set_owner(this);
01382         my_successors.set_owner(this);
01383         decrement.set_owner(this);
01384     }
01385 
01387     /* override */ bool register_successor( receiver<output_type> &r ) {
01388         my_successors.register_successor(r);
01389         return true;
01390     }
01391         
01393 
01394     /* override */ bool remove_successor( receiver<output_type> &r ) {
01395         r.remove_predecessor(*this);
01396         my_successors.remove_successor(r);
01397         return true;
01398     }
01399         
01401     /* override */ bool try_put( const T &t ) {
01402         {
01403             spin_mutex::scoped_lock lock(my_mutex);
01404             if ( my_count >= my_threshold ) 
01405                 return false;
01406             else
01407                 ++my_count; 
01408         }
01409         
01410         bool msg = my_successors.try_put(t);
01411         
01412         if ( msg != true ) {
01413             spin_mutex::scoped_lock lock(my_mutex);
01414             --my_count;
01415             if ( !my_predecessors.empty() ) 
01416                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01417                             internal::forward_task< limiter_node<T> >( *this ) );
01418         }
01419         
01420         return msg;
01421     }
01422         
01424     /* override */ bool register_predecessor( predecessor_type &src ) {
01425         spin_mutex::scoped_lock lock(my_mutex);
01426         my_predecessors.add( src );
01427         if ( my_count < my_threshold && !my_successors.empty() ) 
01428             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01429                            internal::forward_task< limiter_node<T> >( *this ) );
01430         return true;
01431     }
01432         
01434     /* override */ bool remove_predecessor( predecessor_type &src ) {
01435         my_predecessors.remove( src );
01436         return true;
01437     }
01438         
01439 };
01440 
01441 #include "internal/_flow_graph_join_impl.h"
01442 
01443 using internal::reserving_port;
01444 using internal::queueing_port;
01445 using internal::tag_matching_port;
01446 using internal::input_port;
01447 using internal::tag_value;
01448 using internal::NO_TAG;
01449 
01450 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01451 
01452 template<typename OutputTuple>
01453 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01454 private:
01455     static const int N = std::tuple_size<OutputTuple>::value;
01456     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01457 public:
01458     typedef OutputTuple output_type;
01459     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01460     join_node(graph &g) : unfolded_type(g) { }
01461     join_node(const join_node &other) : unfolded_type(other) {}
01462 };
01463 
01464 template<typename OutputTuple>
01465 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01466 private:
01467     static const int N = std::tuple_size<OutputTuple>::value;
01468     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01469 public:
01470     typedef OutputTuple output_type;
01471     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01472     join_node(graph &g) : unfolded_type(g) { }
01473     join_node(const join_node &other) : unfolded_type(other) {}
01474 };
01475 
01476 // template for tag_matching join_node
01477 template<typename OutputTuple>
01478 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01479       tag_matching_port, OutputTuple, tag_matching> {
01480 private:
01481     static const int N = std::tuple_size<OutputTuple>::value;
01482     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01483 public:
01484     typedef OutputTuple output_type;
01485     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01486     template<typename B0, typename B1>
01487     join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01488     template<typename B0, typename B1, typename B2>
01489     join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01490     template<typename B0, typename B1, typename B2, typename B3>
01491     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01492     template<typename B0, typename B1, typename B2, typename B3, typename B4>
01493     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01494     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01495     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01496     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01497     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01498     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01499     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01500     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01501     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01502     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01503     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01504     join_node(const join_node &other) : unfolded_type(other) {}
01505 };
01506 
01507 #if TBB_PREVIEW_GRAPH_NODES
01508 // or node
01509 #include "internal/_flow_graph_or_impl.h"
01510 
01511 template<typename InputTuple>
01512 class or_node : public internal::unfolded_or_node<InputTuple> {
01513 private:
01514     static const int N = std::tuple_size<InputTuple>::value;
01515 public:
01516     typedef typename internal::or_output_type<InputTuple>::type output_type;
01517     typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
01518     or_node() : unfolded_type() { }
01519     // Copy constructor
01520     or_node( const or_node& /*other*/ ) : unfolded_type() { }
01521 };
01522 #endif  // TBB_PREVIEW_GRAPH_NODES
01523 
01525 template< typename T >
01526 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01527     p.register_successor( s );
01528 }
01529         
01531 template< typename T >
01532 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01533     p.remove_successor( s );
01534 }
01535 
//! Returns the value produced by n.copy_function_object<Body>().
/** Body must be given explicitly by the caller; Node is deduced from n. */
template< typename Body, typename Node >
Body copy_body( Node &n )
{
    return n.template copy_function_object<Body>();
}
01541         
01542         
01543 } // interface6
01544 
01545     using interface6::graph;
01546     using interface6::graph_node;
01547     using interface6::continue_msg;
01548     using interface6::sender;
01549     using interface6::receiver;
01550     using interface6::continue_receiver;
01551 
01552     using interface6::source_node;
01553     using interface6::function_node;
01554 #if TBB_PREVIEW_GRAPH_NODES
01555     using interface6::multioutput_function_node;
01556     using interface6::split_node;
01557     using interface6::internal::output_port;
01558     using interface6::or_node;
01559 #endif
01560     using interface6::continue_node;
01561     using interface6::overwrite_node;
01562     using interface6::write_once_node;
01563     using interface6::broadcast_node;
01564     using interface6::buffer_node;
01565     using interface6::queue_node;
01566     using interface6::sequencer_node;
01567     using interface6::priority_queue_node;
01568     using interface6::limiter_node;
01569     using namespace interface6::internal::graph_policy_namespace;
01570     using interface6::join_node;
01571     using interface6::input_port;
01572     using interface6::copy_body; 
01573     using interface6::make_edge; 
01574     using interface6::remove_edge; 
01575     using interface6::internal::NO_TAG;
01576     using interface6::internal::tag_value;
01577 
01578 } // flow
01579 } // tbb
01580 
01581 #endif // __TBB_flow_graph_H

Copyright © 2005-2011 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.