00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_pipeline_H
00022 #define __TBB_pipeline_H
00023
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include "tbb_allocator.h"
00027 #include <cstddef>
00028
00029 namespace tbb {
00030
00031 class pipeline;
00032 class filter;
00033
00035 namespace internal {
00036
00037
00038 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
00039
00040 typedef unsigned long Token;
00041 typedef long tokendiff_t;
00042 class stage_task;
00043 class input_buffer;
00044 class pipeline_root_task;
00045 class pipeline_cleaner;
00046
00047 }
00048
00049 namespace interface6 {
00050 template<typename T, typename U> class filter_t;
00051
00052 namespace internal {
00053 class pipeline_proxy;
00054 }
00055 }
00056
00058
00060
00061 class filter: internal::no_copy {
00062 private:
00064 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00065 protected:
00067 static const unsigned char filter_is_serial = 0x1;
00068
00070
00072 static const unsigned char filter_is_out_of_order = 0x1<<4;
00073
00075 static const unsigned char filter_is_bound = 0x1<<5;
00076
00078 static const unsigned char filter_may_emit_null = 0x1<<6;
00079
00081 static const unsigned char exact_exception_propagation =
00082 #if TBB_USE_CAPTURED_EXCEPTION
00083 0x0;
00084 #else
00085 0x1<<7;
00086 #endif
00087
00088 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00089 static const unsigned char version_mask = 0x7<<1;
00090 public:
00091 enum mode {
00093 parallel = current_version | filter_is_out_of_order,
00095 serial_in_order = current_version | filter_is_serial,
00097 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00099 serial = serial_in_order
00100 };
00101 protected:
00102 filter( bool is_serial_ ) :
00103 next_filter_in_pipeline(not_in_pipeline()),
00104 my_input_buffer(NULL),
00105 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00106 prev_filter_in_pipeline(not_in_pipeline()),
00107 my_pipeline(NULL),
00108 next_segment(NULL)
00109 {}
00110
00111 filter( mode filter_mode ) :
00112 next_filter_in_pipeline(not_in_pipeline()),
00113 my_input_buffer(NULL),
00114 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00115 prev_filter_in_pipeline(not_in_pipeline()),
00116 my_pipeline(NULL),
00117 next_segment(NULL)
00118 {}
00119
00120
00121 void __TBB_EXPORTED_METHOD set_end_of_input();
00122
00123 public:
00125 bool is_serial() const {
00126 return bool( my_filter_mode & filter_is_serial );
00127 }
00128
00130 bool is_ordered() const {
00131 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00132 }
00133
00135 bool is_bound() const {
00136 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00137 }
00138
00140 bool object_may_be_null() {
00141 return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
00142 }
00143
00145
00146 virtual void* operator()( void* item ) = 0;
00147
00149
00150 virtual __TBB_EXPORTED_METHOD ~filter();
00151
00152 #if __TBB_TASK_GROUP_CONTEXT
00154
00156 virtual void finalize( void* ) {};
00157 #endif
00158
00159 private:
00161 filter* next_filter_in_pipeline;
00162
00164
00165
00166 bool has_more_work();
00167
00169
00170 internal::input_buffer* my_input_buffer;
00171
00172 friend class internal::stage_task;
00173 friend class internal::pipeline_root_task;
00174 friend class pipeline;
00175 friend class thread_bound_filter;
00176
00178 const unsigned char my_filter_mode;
00179
00181 filter* prev_filter_in_pipeline;
00182
00184 pipeline* my_pipeline;
00185
00187
00188 filter* next_segment;
00189 };
00190
00192
00193 class thread_bound_filter: public filter {
00194 public:
00195 enum result_type {
00196
00197 success,
00198
00199 item_not_available,
00200
00201 end_of_stream
00202 };
00203 protected:
00204 thread_bound_filter(mode filter_mode):
00205 filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
00206 {}
00207 public:
00209
00214 result_type __TBB_EXPORTED_METHOD try_process_item();
00215
00217
00221 result_type __TBB_EXPORTED_METHOD process_item();
00222
00223 private:
00225 result_type internal_process_item(bool is_blocking);
00226 };
00227
00229
00230 class pipeline {
00231 public:
00233 __TBB_EXPORTED_METHOD pipeline();
00234
00237 virtual __TBB_EXPORTED_METHOD ~pipeline();
00238
00240 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00241
00243 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00244
00245 #if __TBB_TASK_GROUP_CONTEXT
00247 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00248 #endif
00249
00251 void __TBB_EXPORTED_METHOD clear();
00252
00253 private:
00254 friend class internal::stage_task;
00255 friend class internal::pipeline_root_task;
00256 friend class filter;
00257 friend class thread_bound_filter;
00258 friend class internal::pipeline_cleaner;
00259 friend class tbb::interface6::internal::pipeline_proxy;
00260
00262 filter* filter_list;
00263
00265 filter* filter_end;
00266
00268 task* end_counter;
00269
00271 atomic<internal::Token> input_tokens;
00272
00274 atomic<internal::Token> token_counter;
00275
00277 bool end_of_input;
00278
00280 bool has_thread_bound_filters;
00281
00283 void remove_filter( filter& filter_ );
00284
00286 void __TBB_EXPORTED_METHOD inject_token( task& self );
00287
00288 #if __TBB_TASK_GROUP_CONTEXT
00290 void clear_filters();
00291 #endif
00292 };
00293
00294
00295
00296
00297
00298 namespace interface6 {
00299
00300 namespace internal {
00301 template<typename T, typename U, typename Body> class concrete_filter;
00302 }
00303
00305 class flow_control {
00306 bool is_pipeline_stopped;
00307 flow_control() { is_pipeline_stopped = false; }
00308 template<typename T, typename U, typename Body> friend class internal::concrete_filter;
00309 public:
00310 void stop() { is_pipeline_stopped = true; }
00311 };
00312
00314 namespace internal {
00315
00316 template<typename T> struct is_large_object { enum { r = sizeof(T) > sizeof(void *) }; };
00317
00318 template<typename T, bool> class token_helper;
00319
00320
00321 template<typename T>
00322 class token_helper<T, true> {
00323 public:
00324 typedef typename tbb::tbb_allocator<T> allocator;
00325 typedef T* pointer;
00326 typedef T value_type;
00327 static pointer create_token(const value_type & source) {
00328 pointer output_t = allocator().allocate(1);
00329 return new (output_t) T(source);
00330 }
00331 static value_type & token(pointer & t) { return *t;}
00332 static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
00333 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00334 static void destroy_token(pointer token) {
00335 allocator().destroy(token);
00336 allocator().deallocate(token,1);
00337 }
00338 };
00339
00340
00341 template<typename T>
00342 class token_helper<T*, false > {
00343 public:
00344 typedef T* pointer;
00345 typedef T* value_type;
00346 static pointer create_token(const value_type & source) { return source; }
00347 static value_type & token(pointer & t) { return t;}
00348 static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
00349 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
00350 static void destroy_token( pointer ) {}
00351 };
00352
00353
00354 template<typename T>
00355 class token_helper<T, false> {
00356 typedef union {
00357 T actual_value;
00358 void * void_overlay;
00359 } type_to_void_ptr_map;
00360 public:
00361 typedef T pointer;
00362 typedef T value_type;
00363 static pointer create_token(const value_type & source) {
00364 return source; }
00365 static value_type & token(pointer & t) { return t;}
00366 static void * cast_to_void_ptr(pointer ref) {
00367 type_to_void_ptr_map mymap;
00368 mymap.void_overlay = NULL;
00369 mymap.actual_value = ref;
00370 return mymap.void_overlay;
00371 }
00372 static pointer cast_from_void_ptr(void * ref) {
00373 type_to_void_ptr_map mymap;
00374 mymap.void_overlay = ref;
00375 return mymap.actual_value;
00376 }
00377 static void destroy_token( pointer ) {}
00378 };
00379
00380 template<typename T, typename U, typename Body>
00381 class concrete_filter: public tbb::filter {
00382 const Body& my_body;
00383 typedef token_helper<T,is_large_object<T>::r > t_helper;
00384 typedef typename t_helper::pointer t_pointer;
00385 typedef token_helper<U,is_large_object<U>::r > u_helper;
00386 typedef typename u_helper::pointer u_pointer;
00387
00388 void* operator()(void* input) {
00389 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00390 u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
00391 t_helper::destroy_token(temp_input);
00392 return u_helper::cast_to_void_ptr(output_u);
00393 }
00394
00395 public:
00396 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00397 };
00398
00399
00400 template<typename U, typename Body>
00401 class concrete_filter<void,U,Body>: public filter {
00402 const Body& my_body;
00403 typedef token_helper<U, is_large_object<U>::r > u_helper;
00404 typedef typename u_helper::pointer u_pointer;
00405
00406 void* operator()(void*) {
00407 flow_control control;
00408 u_pointer output_u = u_helper::create_token(my_body(control));
00409 if(control.is_pipeline_stopped) {
00410 u_helper::destroy_token(output_u);
00411 set_end_of_input();
00412 return NULL;
00413 }
00414 return u_helper::cast_to_void_ptr(output_u);
00415 }
00416
00417 public:
00418 concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
00419 filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
00420 my_body(body)
00421 {}
00422 };
00423
00424 template<typename T, typename Body>
00425 class concrete_filter<T,void,Body>: public filter {
00426 const Body& my_body;
00427 typedef token_helper<T, is_large_object<T>::r > t_helper;
00428 typedef typename t_helper::pointer t_pointer;
00429
00430 void* operator()(void* input) {
00431 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
00432 my_body(t_helper::token(temp_input));
00433 t_helper::destroy_token(temp_input);
00434 return NULL;
00435 }
00436 public:
00437 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00438 };
00439
00440 template<typename Body>
00441 class concrete_filter<void,void,Body>: public filter {
00442 const Body& my_body;
00443
00445 void* operator()(void*) {
00446 flow_control control;
00447 my_body(control);
00448 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
00449 return output;
00450 }
00451 public:
00452 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00453 };
00454
00456
00457 class pipeline_proxy {
00458 tbb::pipeline my_pipe;
00459 public:
00460 pipeline_proxy( const filter_t<void,void>& filter_chain );
00461 ~pipeline_proxy() {
00462 while( filter* f = my_pipe.filter_list )
00463 delete f;
00464 }
00465 tbb::pipeline* operator->() { return &my_pipe; }
00466 };
00467
00469
00470 class filter_node: tbb::internal::no_copy {
00472 tbb::atomic<intptr_t> ref_count;
00473 protected:
00474 filter_node() {
00475 ref_count = 0;
00476 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00477 ++(__TBB_TEST_FILTER_NODE_COUNT);
00478 #endif
00479 }
00480 public:
00482 virtual void add_to( pipeline& ) = 0;
00484 void add_ref() {++ref_count;}
00486 void remove_ref() {
00487 __TBB_ASSERT(ref_count>0,"ref_count underflow");
00488 if( --ref_count==0 )
00489 delete this;
00490 }
00491 virtual ~filter_node() {
00492 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00493 --(__TBB_TEST_FILTER_NODE_COUNT);
00494 #endif
00495 }
00496 };
00497
00499 template<typename T, typename U, typename Body>
00500 class filter_node_leaf: public filter_node {
00501 const tbb::filter::mode mode;
00502 const Body body;
00503 void add_to( pipeline& p ) {
00504 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00505 p.add_filter( *f );
00506 }
00507 public:
00508 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00509 };
00510
00512 class filter_node_join: public filter_node {
00513 friend class filter_node;
00514 filter_node& left;
00515 filter_node& right;
00516 ~filter_node_join() {
00517 left.remove_ref();
00518 right.remove_ref();
00519 }
00520 void add_to( pipeline& p ) {
00521 left.add_to(p);
00522 right.add_to(p);
00523 }
00524 public:
00525 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00526 left.add_ref();
00527 right.add_ref();
00528 }
00529 };
00530
00531 }
00533
00535 template<typename T, typename U, typename Body>
00536 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00537 return new internal::filter_node_leaf<T,U,Body>(mode, body);
00538 }
00539
00540 template<typename T, typename V, typename U>
00541 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00542 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00543 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00544 return new internal::filter_node_join(*left.root,*right.root);
00545 }
00546
00548 template<typename T, typename U>
00549 class filter_t {
00550 typedef internal::filter_node filter_node;
00551 filter_node* root;
00552 filter_t( filter_node* root_ ) : root(root_) {
00553 root->add_ref();
00554 }
00555 friend class internal::pipeline_proxy;
00556 template<typename T_, typename U_, typename Body>
00557 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00558 template<typename T_, typename V_, typename U_>
00559 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00560 public:
00561 filter_t() : root(NULL) {}
00562 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00563 if( root ) root->add_ref();
00564 }
00565 template<typename Body>
00566 filter_t( tbb::filter::mode mode, const Body& body ) :
00567 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00568 root->add_ref();
00569 }
00570
00571 void operator=( const filter_t<T,U>& rhs ) {
00572
00573
00574 filter_node* old = root;
00575 root = rhs.root;
00576 if( root ) root->add_ref();
00577 if( old ) old->remove_ref();
00578 }
00579 ~filter_t() {
00580 if( root ) root->remove_ref();
00581 }
00582 void clear() {
00583
00584 if( root ) {
00585 filter_node* old = root;
00586 root = NULL;
00587 old->remove_ref();
00588 }
00589 }
00590 };
00591
00592 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00593 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
00594 filter_chain.root->add_to(my_pipe);
00595 }
00596
00597 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00598 #if __TBB_TASK_GROUP_CONTEXT
00599 , tbb::task_group_context& context
00600 #endif
00601 ) {
00602 internal::pipeline_proxy pipe(filter_chain);
00603
00604 pipe->run(max_number_of_live_tokens
00605 #if __TBB_TASK_GROUP_CONTEXT
00606 , context
00607 #endif
00608 );
00609 }
00610
00611 #if __TBB_TASK_GROUP_CONTEXT
00612 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
00613 tbb::task_group_context context;
00614 parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
00615 }
00616 #endif // __TBB_TASK_GROUP_CONTEXT
00617
00618 }
00619
00620 using interface6::flow_control;
00621 using interface6::filter_t;
00622 using interface6::make_filter;
00623 using interface6::parallel_pipeline;
00624
00625 }
00626
00627 #endif