pipeline.h

00001 /*
00002     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_pipeline_H 
00022 #define __TBB_pipeline_H 
00023 
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include "tbb_allocator.h"
00027 #include <cstddef>
00028 
00029 namespace tbb {
00030 
00031 class pipeline;
00032 class filter;
00033 
00035 namespace internal {
00036 
00037 // The argument for PIPELINE_VERSION should be an integer between 2 and 9
00038 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
00039 
00040 typedef unsigned long Token;
00041 typedef long tokendiff_t;
00042 class stage_task;
00043 class input_buffer;
00044 class pipeline_root_task;
00045 class pipeline_cleaner;
00046 
00047 } // namespace internal
00048 
00049 namespace interface6 {
00050     template<typename T, typename U> class filter_t;
00051 
00052     namespace internal {
00053         class pipeline_proxy;
00054     }
00055 }
00056 
00058 
00060 
00061 class filter: internal::no_copy {
00062 private:
00064     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00065 protected:    
00067     static const unsigned char filter_is_serial = 0x1; 
00068 
00070 
00072     static const unsigned char filter_is_out_of_order = 0x1<<4;  
00073 
00075     static const unsigned char filter_is_bound = 0x1<<5;  
00076 
00078     static const unsigned char filter_may_emit_null = 0x1<<6;
00079 
00081     static const unsigned char exact_exception_propagation =
00082 #if TBB_USE_CAPTURED_EXCEPTION
00083             0x0;
00084 #else
00085             0x1<<7;
00086 #endif /* TBB_USE_CAPTURED_EXCEPTION */
00087 
00088     static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00089     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
00090 public:
00091     enum mode {
00093         parallel = current_version | filter_is_out_of_order, 
00095         serial_in_order = current_version | filter_is_serial,
00097         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00099         serial = serial_in_order
00100     };
00101 protected:
00102     filter( bool is_serial_ ) : 
00103         next_filter_in_pipeline(not_in_pipeline()),
00104         my_input_buffer(NULL),
00105         my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00106         prev_filter_in_pipeline(not_in_pipeline()),
00107         my_pipeline(NULL),
00108         next_segment(NULL)
00109     {}
00110     
00111     filter( mode filter_mode ) :
00112         next_filter_in_pipeline(not_in_pipeline()),
00113         my_input_buffer(NULL),
00114         my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00115         prev_filter_in_pipeline(not_in_pipeline()),
00116         my_pipeline(NULL),
00117         next_segment(NULL)
00118     {}
00119 
00120     // signal end-of-input for concrete_filters
00121     void __TBB_EXPORTED_METHOD set_end_of_input();
00122 
00123 public:
00125     bool is_serial() const {
00126         return bool( my_filter_mode & filter_is_serial );
00127     }  
00128     
00130     bool is_ordered() const {
00131         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00132     }
00133 
00135     bool is_bound() const {
00136         return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00137     }
00138 
00140     bool object_may_be_null() { 
00141         return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
00142     }
00143 
00145 
00146     virtual void* operator()( void* item ) = 0;
00147 
00149 
00150     virtual __TBB_EXPORTED_METHOD ~filter();
00151 
00152 #if __TBB_TASK_GROUP_CONTEXT
00154 
00156     virtual void finalize( void* /*item*/ ) {};
00157 #endif
00158 
00159 private:
00161     filter* next_filter_in_pipeline;
00162 
00164     //  (pipeline has not yet reached end_of_input or this filter has not yet
00165     //  seen the last token produced by input_filter)
00166     bool has_more_work();
00167 
00169 
00170     internal::input_buffer* my_input_buffer;
00171 
00172     friend class internal::stage_task;
00173     friend class internal::pipeline_root_task;
00174     friend class pipeline;
00175     friend class thread_bound_filter;
00176 
00178     const unsigned char my_filter_mode;
00179 
00181     filter* prev_filter_in_pipeline;
00182 
00184     pipeline* my_pipeline;
00185 
00187 
00188     filter* next_segment;
00189 };
00190 
00192 
00193 class thread_bound_filter: public filter {
00194 public:
00195     enum result_type {
00196         // item was processed
00197         success,
00198         // item is currently not available
00199         item_not_available,
00200         // there are no more items to process
00201         end_of_stream
00202     };
00203 protected:
00204     thread_bound_filter(mode filter_mode): 
00205          filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
00206     {}
00207 public:
00209 
00214     result_type __TBB_EXPORTED_METHOD try_process_item(); 
00215 
00217 
00221     result_type __TBB_EXPORTED_METHOD process_item();
00222 
00223 private:
00225     result_type internal_process_item(bool is_blocking);
00226 };
00227 
00229 
00230 class pipeline {
00231 public:
00233     __TBB_EXPORTED_METHOD pipeline();
00234 
00237     virtual __TBB_EXPORTED_METHOD ~pipeline();
00238 
00240     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00241 
00243     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00244 
00245 #if __TBB_TASK_GROUP_CONTEXT
00247     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00248 #endif
00249 
00251     void __TBB_EXPORTED_METHOD clear();
00252 
00253 private:
00254     friend class internal::stage_task;
00255     friend class internal::pipeline_root_task;
00256     friend class filter;
00257     friend class thread_bound_filter;
00258     friend class internal::pipeline_cleaner;
00259     friend class tbb::interface6::internal::pipeline_proxy;
00260 
00262     filter* filter_list;
00263 
00265     filter* filter_end;
00266 
00268     task* end_counter;
00269 
00271     atomic<internal::Token> input_tokens;
00272 
00274     atomic<internal::Token> token_counter;
00275 
00277     bool end_of_input;
00278 
00280     bool has_thread_bound_filters;
00281 
00283     void remove_filter( filter& filter_ );
00284 
00286     void __TBB_EXPORTED_METHOD inject_token( task& self );
00287 
00288 #if __TBB_TASK_GROUP_CONTEXT
00290     void clear_filters();
00291 #endif
00292 };
00293 
00294 //------------------------------------------------------------------------
00295 // Support for lambda-friendly parallel_pipeline interface
00296 //------------------------------------------------------------------------
00297 
00298 namespace interface6 {
00299 
00300 namespace internal {
00301     template<typename T, typename U, typename Body> class concrete_filter;
00302 }
00303 
00305 class flow_control {
00306     bool is_pipeline_stopped;
00307     flow_control() { is_pipeline_stopped = false; }
00308     template<typename T, typename U, typename Body> friend class internal::concrete_filter;
00309 public:
00310     void stop() { is_pipeline_stopped = true; }
00311 };
00312 
00314 namespace internal {
00315 
00316 template<typename T> struct is_large_object { enum { r = sizeof(T) > sizeof(void *) }; };
00317 
00318 template<typename T, bool> class token_helper;
00319 
00320 // large object helper (uses tbb_allocator)
00321 template<typename T>
00322 class token_helper<T, true> {
00323     public:
00324     typedef typename tbb::tbb_allocator<T> allocator;
00325     typedef T* pointer;
00326     typedef T value_type;
00327     static pointer create_token(const value_type & source) {
00328         pointer output_t = allocator().allocate(1);
00329         return new (output_t) T(source);
00330     }
00331     static value_type & token(pointer & t) { return *t;}
00332     static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
00333     static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00334     static void destroy_token(pointer token) {
00335         allocator().destroy(token);
00336         allocator().deallocate(token,1);
00337     }
00338 };
00339 
00340 // pointer specialization
00341 template<typename T>
00342 class token_helper<T*, false > {
00343     public:
00344     typedef T* pointer;
00345     typedef T* value_type;
00346     static pointer create_token(const value_type & source) { return source; }
00347     static value_type & token(pointer & t) { return t;}
00348     static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
00349     static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00350     static void destroy_token( pointer /*token*/) {}
00351 };
00352 
00353 // small object specialization (converts void* to the correct type, passes objects directly.)
00354 template<typename T>
00355 class token_helper<T, false> {
00356     typedef union {
00357         T actual_value;
00358         void * void_overlay;
00359     } type_to_void_ptr_map;
00360     public:
00361     typedef T pointer;  // not really a pointer in this case.
00362     typedef T value_type;
00363     static pointer create_token(const value_type & source) {
00364         return source; }
00365     static value_type & token(pointer & t) { return t;}
00366     static void * cast_to_void_ptr(pointer ref) { 
00367         type_to_void_ptr_map mymap; 
00368         mymap.void_overlay = NULL;
00369         mymap.actual_value = ref; 
00370         return mymap.void_overlay; 
00371     }
00372     static pointer cast_from_void_ptr(void * ref) { 
00373         type_to_void_ptr_map mymap;
00374         mymap.void_overlay = ref;
00375         return mymap.actual_value;
00376     }
00377     static void destroy_token( pointer /*token*/) {}
00378 };
00379 
00380 template<typename T, typename U, typename Body>
00381 class concrete_filter: public tbb::filter {
00382     const Body& my_body;
00383     typedef token_helper<T,is_large_object<T>::r > t_helper;
00384     typedef typename t_helper::pointer t_pointer;
00385     typedef token_helper<U,is_large_object<U>::r > u_helper;
00386     typedef typename u_helper::pointer u_pointer;
00387 
00388     /*override*/ void* operator()(void* input) {
00389         t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00390         u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
00391         t_helper::destroy_token(temp_input);
00392         return u_helper::cast_to_void_ptr(output_u);
00393     }
00394 
00395 public:
00396     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00397 };
00398 
00399 // input 
00400 template<typename U, typename Body>
00401 class concrete_filter<void,U,Body>: public filter {
00402     const Body& my_body;
00403     typedef token_helper<U, is_large_object<U>::r > u_helper;
00404     typedef typename u_helper::pointer u_pointer;
00405 
00406     /*override*/void* operator()(void*) {
00407         flow_control control;
00408         u_pointer output_u = u_helper::create_token(my_body(control));
00409         if(control.is_pipeline_stopped) {
00410             u_helper::destroy_token(output_u);
00411             set_end_of_input();
00412             return NULL;
00413         }
00414         return u_helper::cast_to_void_ptr(output_u);
00415     }
00416 
00417 public:
00418     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : 
00419         filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
00420         my_body(body)
00421     {}
00422 };
00423 
00424 template<typename T, typename Body>
00425 class concrete_filter<T,void,Body>: public filter {
00426     const Body& my_body;
00427     typedef token_helper<T, is_large_object<T>::r > t_helper;
00428     typedef typename t_helper::pointer t_pointer;
00429    
00430     /*override*/ void* operator()(void* input) {
00431         t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00432         my_body(t_helper::token(temp_input));
00433         t_helper::destroy_token(temp_input);
00434         return NULL;
00435     }
00436 public:
00437     concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00438 };
00439 
00440 template<typename Body>
00441 class concrete_filter<void,void,Body>: public filter {
00442     const Body& my_body;
00443     
00445     /*override*/ void* operator()(void*) {
00446         flow_control control;
00447         my_body(control);
00448         void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 
00449         return output;
00450     }
00451 public:
00452     concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00453 };
00454 
00456 
00457 class pipeline_proxy {
00458     tbb::pipeline my_pipe;
00459 public:
00460     pipeline_proxy( const filter_t<void,void>& filter_chain );
00461     ~pipeline_proxy() {
00462         while( filter* f = my_pipe.filter_list ) 
00463             delete f; // filter destructor removes it from the pipeline
00464     }
00465     tbb::pipeline* operator->() { return &my_pipe; }
00466 };
00467 
00469 
00470 class filter_node: tbb::internal::no_copy {
00472     tbb::atomic<intptr_t> ref_count;
00473 protected:
00474     filter_node() {
00475         ref_count = 0;
00476 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00477         ++(__TBB_TEST_FILTER_NODE_COUNT);
00478 #endif
00479     }
00480 public:
00482     virtual void add_to( pipeline& ) = 0;
00484     void add_ref() {++ref_count;}
00486     void remove_ref() {
00487         __TBB_ASSERT(ref_count>0,"ref_count underflow");
00488         if( --ref_count==0 ) 
00489             delete this;
00490     }
00491     virtual ~filter_node() {
00492 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00493         --(__TBB_TEST_FILTER_NODE_COUNT);
00494 #endif
00495     }
00496 };
00497 
00499 template<typename T, typename U, typename Body>
00500 class filter_node_leaf: public filter_node  {
00501     const tbb::filter::mode mode;
00502     const Body body;
00503     /*override*/void add_to( pipeline& p ) {
00504         concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00505         p.add_filter( *f );
00506     }
00507 public:
00508     filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00509 };
00510 
00512 class filter_node_join: public filter_node {
00513     friend class filter_node; // to suppress GCC 3.2 warnings
00514     filter_node& left;
00515     filter_node& right;
00516     /*override*/~filter_node_join() {
00517        left.remove_ref();
00518        right.remove_ref();
00519     }
00520     /*override*/void add_to( pipeline& p ) {
00521         left.add_to(p);
00522         right.add_to(p);
00523     }
00524 public:
00525     filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00526        left.add_ref();
00527        right.add_ref();
00528     }
00529 };
00530 
00531 } // namespace internal
00533 
00535 template<typename T, typename U, typename Body>
00536 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00537     return new internal::filter_node_leaf<T,U,Body>(mode, body);
00538 }
00539 
00540 template<typename T, typename V, typename U>
00541 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00542     __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00543     __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00544     return new internal::filter_node_join(*left.root,*right.root);
00545 }
00546 
00548 template<typename T, typename U>
00549 class filter_t {
00550     typedef internal::filter_node filter_node;
00551     filter_node* root;
00552     filter_t( filter_node* root_ ) : root(root_) {
00553         root->add_ref();
00554     }
00555     friend class internal::pipeline_proxy;
00556     template<typename T_, typename U_, typename Body>
00557     friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00558     template<typename T_, typename V_, typename U_>
00559     friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00560 public:
00561     filter_t() : root(NULL) {}
00562     filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00563         if( root ) root->add_ref();
00564     }
00565     template<typename Body>
00566     filter_t( tbb::filter::mode mode, const Body& body ) :
00567         root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00568         root->add_ref();
00569     }
00570 
00571     void operator=( const filter_t<T,U>& rhs ) {
00572         // Order of operations below carefully chosen so that reference counts remain correct
00573         // in unlikely event that remove_ref throws exception.
00574         filter_node* old = root;
00575         root = rhs.root; 
00576         if( root ) root->add_ref();
00577         if( old ) old->remove_ref();
00578     }
00579     ~filter_t() {
00580         if( root ) root->remove_ref();
00581     }
00582     void clear() {
00583         // Like operator= with filter_t() on right side.
00584         if( root ) {
00585             filter_node* old = root;
00586             root = NULL;
00587             old->remove_ref();
00588         }
00589     }
00590 };
00591 
00592 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00593     __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
00594     filter_chain.root->add_to(my_pipe);
00595 }
00596 
00597 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00598 #if __TBB_TASK_GROUP_CONTEXT
00599     , tbb::task_group_context& context
00600 #endif
00601     ) {
00602     internal::pipeline_proxy pipe(filter_chain);
00603     // tbb::pipeline::run() is called via the proxy
00604     pipe->run(max_number_of_live_tokens
00605 #if __TBB_TASK_GROUP_CONTEXT
00606               , context
00607 #endif
00608     );
00609 }
00610 
00611 #if __TBB_TASK_GROUP_CONTEXT
00612 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
00613     tbb::task_group_context context;
00614     parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
00615 }
00616 #endif // __TBB_TASK_GROUP_CONTEXT
00617 
00618 } // interface6
00619 
00620 using interface6::flow_control;
00621 using interface6::filter_t;
00622 using interface6::make_filter;
00623 using interface6::parallel_pipeline;
00624 
00625 } // tbb
00626 
00627 #endif /* __TBB_pipeline_H */

Copyright © 2005-2011 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.