concurrent_hash_map.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_hash_map_H
00022 #define __TBB_concurrent_hash_map_H
00023 
00024 #include <stdexcept>
00025 #include <iterator>
00026 #include <utility>      // Need std::pair
00027 #include <cstring>      // Need std::memset
00028 #include <string>
00029 #include "tbb_stddef.h"
00030 #include "cache_aligned_allocator.h"
00031 #include "tbb_allocator.h"
00032 #include "spin_rw_mutex.h"
00033 #include "atomic.h"
00034 #include "aligned_space.h"
00035 #if TBB_USE_PERFORMANCE_WARNINGS
00036 #include <typeinfo>
00037 #endif
00038 
00039 namespace tbb {
00040 
00041 template<typename T> struct tbb_hash_compare;
00042 template<typename Key, typename T, typename HashCompare = tbb_hash_compare<Key>, typename A = tbb_allocator<std::pair<Key, T> > >
00043 class concurrent_hash_map;
00044 
00046 namespace internal {
00048     void* __TBB_EXPORTED_FUNC itt_load_pointer_with_acquire_v3( const void* src );
00050     void __TBB_EXPORTED_FUNC itt_store_pointer_with_release_v3( void* dst, void* src );
00052     void* __TBB_EXPORTED_FUNC itt_load_pointer_v3( const void* src );
00053 
00055     typedef size_t hashcode_t;
00057     struct hash_map_node_base : no_copy {
00059         typedef spin_rw_mutex mutex_t;
00061         typedef mutex_t::scoped_lock scoped_t;
00063         hash_map_node_base *next;
00064         mutex_t mutex;
00065     };
00067     static hash_map_node_base *const rehash_req = reinterpret_cast<hash_map_node_base*>(size_t(3));
00069     static hash_map_node_base *const empty_rehashed = reinterpret_cast<hash_map_node_base*>(size_t(0));
00071     class hash_map_base {
00072     public:
00074         typedef size_t size_type;
00076         typedef size_t hashcode_t;
00078         typedef size_t segment_index_t;
00080         typedef hash_map_node_base node_base;
00082         struct bucket : no_copy {
00084             typedef spin_rw_mutex mutex_t;
00086             typedef mutex_t::scoped_lock scoped_t;
00087             mutex_t mutex;
00088             node_base *node_list;
00089         };
00091         static size_type const embedded_block = 1;
00093         static size_type const embedded_buckets = 1<<embedded_block;
00095         static size_type const first_block = 8; //including embedded_block. With a 16-byte bucket this makes allocation sizes powers of two times 4096 bytes
00097         static size_type const pointers_per_table = sizeof(segment_index_t) * 8; // one segment per bit
00099         typedef bucket *segment_ptr_t;
00101         typedef segment_ptr_t segments_table_t[pointers_per_table];
00103         atomic<hashcode_t> my_mask;
00105         segments_table_t my_table;
00107         atomic<size_type> my_size; // It must be in separate cache line from my_mask due to performance effects
00109         bucket my_embedded_segment[embedded_buckets];
00110 
00112         hash_map_base() {
00113             std::memset( this, 0, pointers_per_table*sizeof(segment_ptr_t) // 32*4=128   or 64*8=512
00114                 + sizeof(my_size) + sizeof(my_mask)  // 4+4 or 8+8
00115                 + embedded_buckets*sizeof(bucket) ); // n*8 or n*16
00116             for( size_type i = 0; i < embedded_block; i++ ) // fill the table
00117                 my_table[i] = my_embedded_segment + segment_base(i);
00118             my_mask = embedded_buckets - 1;
00119             __TBB_ASSERT( embedded_block <= first_block, "The first block number must include embedded blocks");
00120         }
00121 
00123         static segment_index_t segment_index_of( size_type index ) {
00124             return segment_index_t( __TBB_Log2( index|1 ) );
00125         }
00126 
00128         static segment_index_t segment_base( segment_index_t k ) {
00129             return (segment_index_t(1)<<k & ~segment_index_t(1));
00130         }
00131 
00133         static size_type segment_size( segment_index_t k ) {
00134             return size_type(1)<<k; // fake value for k==0
00135         }
00136         
00138         static bool is_valid( void *ptr ) {
00139             return reinterpret_cast<size_t>(ptr) > size_t(63);
00140         }
00141 
00143         static void init_buckets( segment_ptr_t ptr, size_type sz, bool is_initial ) {
00144             if( is_initial ) std::memset(ptr, 0, sz*sizeof(bucket) );
00145             else for(size_type i = 0; i < sz; i++, ptr++) {
00146                     *reinterpret_cast<intptr_t*>(&ptr->mutex) = 0;
00147                     ptr->node_list = rehash_req;
00148                 }
00149         }
00150         
00152         static void add_to_bucket( bucket *b, node_base *n ) {
00153             __TBB_ASSERT(b->node_list != rehash_req, NULL);
00154             n->next = b->node_list;
00155             b->node_list = n; // it's under lock and the flag is set
00156         }
00157 
00159         struct enable_segment_failsafe {
00160             segment_ptr_t *my_segment_ptr;
00161             enable_segment_failsafe(segments_table_t &table, segment_index_t k) : my_segment_ptr(&table[k]) {}
00162             ~enable_segment_failsafe() {
00163                 if( my_segment_ptr ) *my_segment_ptr = 0; // indicate no allocation in progress
00164             }
00165         };
00166 
00168         void enable_segment( segment_index_t k, bool is_initial = false ) {
00169             __TBB_ASSERT( k, "Zero segment must be embedded" );
00170             enable_segment_failsafe watchdog( my_table, k );
00171             cache_aligned_allocator<bucket> alloc;
00172             size_type sz;
00173             __TBB_ASSERT( !is_valid(my_table[k]), "Wrong concurrent assignment");
00174             if( k >= first_block ) {
00175                 sz = segment_size( k );
00176                 segment_ptr_t ptr = alloc.allocate( sz );
00177                 init_buckets( ptr, sz, is_initial );
00178 #if TBB_USE_THREADING_TOOLS
00179                 // TODO: actually, fence and notification are unnecessary here and below
00180                 itt_store_pointer_with_release_v3( my_table + k, ptr );
00181 #else
00182                 my_table[k] = ptr;// my_mask has release fence
00183 #endif
00184                 sz <<= 1;// double it to get entire capacity of the container
00185             } else { // the first block
00186                 __TBB_ASSERT( k == embedded_block, "Wrong segment index" );
00187                 sz = segment_size( first_block );
00188                 segment_ptr_t ptr = alloc.allocate( sz - embedded_buckets );
00189                 init_buckets( ptr, sz - embedded_buckets, is_initial );
00190                 ptr -= segment_base(embedded_block);
00191                 for(segment_index_t i = embedded_block; i < first_block; i++) // calc the offsets
00192 #if TBB_USE_THREADING_TOOLS
00193                     itt_store_pointer_with_release_v3( my_table + i, ptr + segment_base(i) );
00194 #else
00195                     my_table[i] = ptr + segment_base(i);
00196 #endif
00197             }
00198 #if TBB_USE_THREADING_TOOLS
00199             itt_store_pointer_with_release_v3( &my_mask, (void*)(sz-1) );
00200 #else
00201             my_mask = sz - 1;
00202 #endif
00203             watchdog.my_segment_ptr = 0;
00204         }
00205 
00207         bucket *get_bucket( hashcode_t h ) const throw() { // TODO: add throw() everywhere?
00208             segment_index_t s = segment_index_of( h );
00209             h -= segment_base(s);
00210             segment_ptr_t seg = my_table[s];
00211             __TBB_ASSERT( is_valid(seg), "hashcode must be masked by a valid mask for allocated segments" );
00212             return &seg[h];
00213         }
00214 
00216         // Splitting into two functions should help inlining
00217         inline bool check_mask_race( const hashcode_t h, hashcode_t &m ) const {
00218             hashcode_t m_now, m_old = m;
00219 #if TBB_USE_THREADING_TOOLS
00220             m_now = (hashcode_t) itt_load_pointer_with_acquire_v3( &my_mask );
00221 #else
00222             m_now = my_mask;
00223 #endif
00224             if( m_old != m_now )
00225                 return check_rehashing_collision( h, m_old, m = m_now );
00226             return false;
00227         }
00228 
00230         bool check_rehashing_collision( const hashcode_t h, hashcode_t m_old, hashcode_t m ) const {
00231             __TBB_ASSERT(m_old != m, NULL); // TODO?: m arg could be optimized out by passing h = h&m
00232             if( (h & m_old) != (h & m) ) { // mask changed for this hashcode, rare event
00233                 // the condition above proves that 'h' has bits set beyond 'm_old'
00234                 // find next applicable mask after m_old    //TODO: look at bsl instruction
00235                 for( ++m_old; !(h & m_old); m_old <<= 1 ); // at maximum few rounds depending on the first block size
00236                 m_old = (m_old<<1) - 1; // get full mask from a bit
00237                 __TBB_ASSERT((m_old&(m_old+1))==0 && m_old <= m, NULL);
00238                 // check whether it is rehashing/ed
00239 #if TBB_USE_THREADING_TOOLS
00240                 if( itt_load_pointer_with_acquire_v3(&( get_bucket(h & m_old)->node_list )) != rehash_req )
00241 #else
00242                 if( __TBB_load_with_acquire(get_bucket( h & m_old )->node_list) != rehash_req )
00243 #endif
00244                     return true;
00245             }
00246             return false;
00247         }
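        // Illustrative note (not part of the original source): buckets of a newly
        // enabled segment are marked with rehash_req and filled lazily; on first
        // access, rehash_bucket() in concurrent_hash_map below moves the matching
        // nodes out of the "parent" bucket at index (h & parent_mask), which is why
        // a mask change observed here may require re-checking that parent bucket.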
00248 
00250         segment_index_t insert_new_node( bucket *b, node_base *n, hashcode_t mask ) {
00251             size_type sz = ++my_size; // prefix form so that growth is triggered right after the first item is inserted
00252             add_to_bucket( b, n );
00253             // check load factor
00254             if( sz >= mask ) { // TODO: add custom load_factor 
00255                 segment_index_t new_seg = segment_index_of( mask+1 );
00256                 __TBB_ASSERT( is_valid(my_table[new_seg-1]), "new allocations must not publish the new mask until the segment is allocated");
00257 #if TBB_USE_THREADING_TOOLS
00258                 if( !itt_load_pointer_v3(my_table+new_seg)
00259 #else
00260                 if( !my_table[new_seg]
00261 #endif
00262                   && __TBB_CompareAndSwapW(&my_table[new_seg], 2, 0) == 0 )
00263                     return new_seg; // The value must be processed
00264             }
00265             return 0;
00266         }
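        // Illustrative note (not part of the original source): because growth is
        // triggered once my_size reaches the current mask (i.e. roughly the number
        // of buckets), the effective load factor is about one item per bucket before
        // each doubling; the TODO above refers to making this threshold configurable.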
00267 
00269         void reserve(size_type buckets) {
00270             if( !buckets-- ) return;
00271             bool is_initial = !my_size;
00272             for( size_type m = my_mask; buckets > m; m = my_mask )
00273                 enable_segment( segment_index_of( m+1 ), is_initial );
00274         }
00276         void internal_swap(hash_map_base &table) {
00277             std::swap(this->my_mask, table.my_mask);
00278             std::swap(this->my_size, table.my_size);
00279             for(size_type i = 0; i < embedded_buckets; i++)
00280                 std::swap(this->my_embedded_segment[i].node_list, table.my_embedded_segment[i].node_list);
00281             for(size_type i = embedded_block; i < pointers_per_table; i++)
00282                 std::swap(this->my_table[i], table.my_table[i]);
00283         }
00284     };
00285 
00286     template<typename Iterator>
00287     class hash_map_range;
00288 
00290 
00292     template<typename Container, typename Value>
00293     class hash_map_iterator
00294         : public std::iterator<std::forward_iterator_tag,Value>
00295     {
00296         typedef Container map_type;
00297         typedef typename Container::node node;
00298         typedef hash_map_base::node_base node_base;
00299         typedef hash_map_base::bucket bucket;
00300 
00301         template<typename C, typename T, typename U>
00302         friend bool operator==( const hash_map_iterator<C,T>& i, const hash_map_iterator<C,U>& j );
00303 
00304         template<typename C, typename T, typename U>
00305         friend bool operator!=( const hash_map_iterator<C,T>& i, const hash_map_iterator<C,U>& j );
00306 
00307         template<typename C, typename T, typename U>
00308         friend ptrdiff_t operator-( const hash_map_iterator<C,T>& i, const hash_map_iterator<C,U>& j );
00309     
00310         template<typename C, typename U>
00311         friend class internal::hash_map_iterator;
00312 
00313         template<typename I>
00314         friend class internal::hash_map_range;
00315 
00316         void advance_to_next_bucket() { // TODO?: refactor to iterator_base class
00317             size_t k = my_index+1;
00318             while( my_bucket && k <= my_map->my_mask ) {
00319                 // Following test uses 2's-complement wizardry
00320                 if( k & (k-2) ) // not the beginning of a segment
00321                     ++my_bucket;
00322                 else my_bucket = my_map->get_bucket( k );
00323                 my_node = static_cast<node*>( my_bucket->node_list );
00324                 if( hash_map_base::is_valid(my_node) ) {
00325                     my_index = k; return;
00326                 }
00327                 ++k;
00328             }
00329             my_bucket = 0; my_node = 0; my_index = k; // the end
00330         }
00331 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00332         template<typename Key, typename T, typename HashCompare, typename A>
00333         friend class tbb::concurrent_hash_map;
00334 #else
00335     public: // workaround
00336 #endif
00338         const Container *my_map;
00339 
00341         size_t my_index;
00342 
00344         const bucket *my_bucket;
00345 
00347         node *my_node;
00348 
00349         hash_map_iterator( const Container &map, size_t index, const bucket *b, node_base *n );
00350 
00351     public:
00353         hash_map_iterator() {}
00354         hash_map_iterator( const hash_map_iterator<Container,typename Container::value_type> &other ) :
00355             my_map(other.my_map),
00356             my_index(other.my_index),
00357             my_bucket(other.my_bucket),
00358             my_node(other.my_node)
00359         {}
00360         Value& operator*() const {
00361             __TBB_ASSERT( hash_map_base::is_valid(my_node), "iterator uninitialized or at end of container?" );
00362             return my_node->item;
00363         }
00364         Value* operator->() const {return &operator*();}
00365         hash_map_iterator& operator++();
00366         
00368         Value* operator++(int) {
00369             Value* result = &operator*();
00370             operator++();
00371             return result;
00372         }
00373     };
00374 
00375     template<typename Container, typename Value>
00376     hash_map_iterator<Container,Value>::hash_map_iterator( const Container &map, size_t index, const bucket *b, node_base *n ) :
00377         my_map(&map),
00378         my_index(index),
00379         my_bucket(b),
00380         my_node( static_cast<node*>(n) )
00381     {
00382         if( b && !hash_map_base::is_valid(n) )
00383             advance_to_next_bucket();
00384     }
00385 
00386     template<typename Container, typename Value>
00387     hash_map_iterator<Container,Value>& hash_map_iterator<Container,Value>::operator++() {
00388         my_node = static_cast<node*>( my_node->next );
00389         if( !my_node ) advance_to_next_bucket();
00390         return *this;
00391     }
00392 
00393     template<typename Container, typename T, typename U>
00394     bool operator==( const hash_map_iterator<Container,T>& i, const hash_map_iterator<Container,U>& j ) {
00395         return i.my_node == j.my_node && i.my_map == j.my_map;
00396     }
00397 
00398     template<typename Container, typename T, typename U>
00399     bool operator!=( const hash_map_iterator<Container,T>& i, const hash_map_iterator<Container,U>& j ) {
00400         return i.my_node != j.my_node || i.my_map != j.my_map;
00401     }
00402 
00404 
00405     template<typename Iterator>
00406     class hash_map_range {
00407         typedef typename Iterator::map_type map_type;
00408         Iterator my_begin;
00409         Iterator my_end;
00410         mutable Iterator my_midpoint;
00411         size_t my_grainsize;
00413         void set_midpoint() const;
00414         template<typename U> friend class hash_map_range;
00415     public:
00417         typedef std::size_t size_type;
00418         typedef typename Iterator::value_type value_type;
00419         typedef typename Iterator::reference reference;
00420         typedef typename Iterator::difference_type difference_type;
00421         typedef Iterator iterator;
00422 
00424         bool empty() const {return my_begin==my_end;}
00425 
00427         bool is_divisible() const {
00428             return my_midpoint!=my_end;
00429         }
00431         hash_map_range( hash_map_range& r, split ) : 
00432             my_end(r.my_end),
00433             my_grainsize(r.my_grainsize)
00434         {
00435             r.my_end = my_begin = r.my_midpoint;
00436             __TBB_ASSERT( !empty(), "Splitting despite the range not being divisible" );
00437             __TBB_ASSERT( !r.empty(), "Splitting despite the range not being divisible" );
00438             set_midpoint();
00439             r.set_midpoint();
00440         }
00442         template<typename U>
00443         hash_map_range( hash_map_range<U>& r) : 
00444             my_begin(r.my_begin),
00445             my_end(r.my_end),
00446             my_midpoint(r.my_midpoint),
00447             my_grainsize(r.my_grainsize)
00448         {}
00449 #if TBB_DEPRECATED
00451         hash_map_range( const Iterator& begin_, const Iterator& end_, size_type grainsize = 1 ) : 
00452             my_begin(begin_), 
00453             my_end(end_),
00454             my_grainsize(grainsize)
00455         {
00456             if(!my_end.my_index && !my_end.my_bucket) // end
00457                 my_end.my_index = my_end.my_map->my_mask + 1;
00458             set_midpoint();
00459             __TBB_ASSERT( grainsize>0, "grainsize must be positive" );
00460         }
00461 #endif
00463         hash_map_range( const map_type &map, size_type grainsize = 1 ) : 
00464             my_begin( Iterator( map, 0, map.my_embedded_segment, map.my_embedded_segment->node_list ) ),
00465             my_end( Iterator( map, map.my_mask + 1, 0, 0 ) ),
00466             my_grainsize( grainsize )
00467         {
00468             __TBB_ASSERT( grainsize>0, "grainsize must be positive" );
00469             set_midpoint();
00470         }
00471         const Iterator& begin() const {return my_begin;}
00472         const Iterator& end() const {return my_end;}
00474         size_type grainsize() const {return my_grainsize;}
00475     };
00476 
00477     template<typename Iterator>
00478     void hash_map_range<Iterator>::set_midpoint() const {
00479         // Split by groups of nodes
00480         size_t m = my_end.my_index-my_begin.my_index;
00481         if( m > my_grainsize ) {
00482             m = my_begin.my_index + m/2u;
00483             hash_map_base::bucket *b = my_begin.my_map->get_bucket(m);
00484             my_midpoint = Iterator(*my_begin.my_map,m,b,b->node_list);
00485         } else {
00486             my_midpoint = my_end;
00487         }
00488         __TBB_ASSERT( my_begin.my_index <= my_midpoint.my_index,
00489             "my_begin is after my_midpoint" );
00490         __TBB_ASSERT( my_midpoint.my_index <= my_end.my_index,
00491             "my_midpoint is after my_end" );
00492         __TBB_ASSERT( my_begin != my_midpoint || my_begin == my_end,
00493             "[my_begin, my_midpoint) range should not be empty" );
00494     }
00495 } // namespace internal
00497 
00499 static const size_t hash_multiplier = sizeof(size_t)==4? 2654435769U : 11400714819323198485ULL;
00501 template<typename T>
00502 inline static size_t tbb_hasher( const T& t ) {
00503     return static_cast<size_t>( t ) * hash_multiplier;
00504 }
00505 template<typename P>
00506 inline static size_t tbb_hasher( P* ptr ) {
00507     size_t const h = reinterpret_cast<size_t>( ptr );
00508     return (h >> 3) ^ h;
00509 }
00510 template<typename E, typename S, typename A>
00511 inline static size_t tbb_hasher( const std::basic_string<E,S,A>& s ) {
00512     size_t h = 0;
00513     for( const E* c = s.c_str(); *c; c++ )
00514         h = static_cast<size_t>(*c) ^ (h * hash_multiplier);
00515     return h;
00516 }
00517 template<typename F, typename S>
00518 inline static size_t tbb_hasher( const std::pair<F,S>& p ) {
00519     return tbb_hasher(p.first) ^ tbb_hasher(p.second);
00520 }
00521 
00523 template<typename T>
00524 struct tbb_hash_compare {
00525     static size_t hash( const T& t ) { return tbb_hasher(t); }
00526     static bool equal( const T& a, const T& b ) { return a == b; }
00527 };
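// Illustrative sketch (not part of the original header): a user-supplied
// HashCompare needs only the same two members as tbb_hash_compare above.
// MyKey and its 'id' field are hypothetical:
//
//     struct MyKeyHashCompare {
//         static size_t hash( const MyKey& k ) { return tbb_hasher( k.id ); }
//         static bool equal( const MyKey& a, const MyKey& b ) { return a.id == b.id; }
//     };
//
// It is then passed as the third template argument of concurrent_hash_map.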
00528 
00530 
00559 template<typename Key, typename T, typename HashCompare, typename Allocator>
00560 class concurrent_hash_map : protected internal::hash_map_base {
00561     template<typename Container, typename Value>
00562     friend class internal::hash_map_iterator;
00563 
00564     template<typename I>
00565     friend class internal::hash_map_range;
00566 
00567 public:
00568     typedef Key key_type;
00569     typedef T mapped_type;
00570     typedef std::pair<const Key,T> value_type;
00571     typedef internal::hash_map_base::size_type size_type;
00572     typedef ptrdiff_t difference_type;
00573     typedef value_type *pointer;
00574     typedef const value_type *const_pointer;
00575     typedef value_type &reference;
00576     typedef const value_type &const_reference;
00577     typedef internal::hash_map_iterator<concurrent_hash_map,value_type> iterator;
00578     typedef internal::hash_map_iterator<concurrent_hash_map,const value_type> const_iterator;
00579     typedef internal::hash_map_range<iterator> range_type;
00580     typedef internal::hash_map_range<const_iterator> const_range_type;
00581     typedef Allocator allocator_type;
00582 
00583 protected:
00584     friend class const_accessor;
00585     struct node;
00586     typedef typename Allocator::template rebind<node>::other node_allocator_type;
00587     node_allocator_type my_allocator;
00588     HashCompare my_hash_compare;
00589 
00590     struct node : public node_base {
00591         value_type item;
00592         node( const Key &key ) : item(key, T()) {}
00593         node( const Key &key, const T &t ) : item(key, t) {}
00594         // exception-safe allocation, see C++ Standard 2003, clause 5.3.4p17
00595         void *operator new( size_t /*size*/, node_allocator_type &a ) {
00596             void *ptr = a.allocate(1);
00597             if(!ptr) throw std::bad_alloc();
00598             return ptr;
00599         }
00600         // matching operator delete for the placement form above; called if the constructor throws
00601         void operator delete( void *ptr, node_allocator_type &a ) {return a.deallocate(static_cast<node*>(ptr),1); }
00602     };
00603 
00604     void delete_node( node_base *n ) {
00605         my_allocator.destroy( static_cast<node*>(n) );
00606         my_allocator.deallocate( static_cast<node*>(n), 1);
00607     }
00608 
00609     node *search_bucket( const key_type &key, bucket *b ) const {
00610         node *n = static_cast<node*>( b->node_list );
00611         while( is_valid(n) && !my_hash_compare.equal(key, n->item.first) )
00612             n = static_cast<node*>( n->next );
00613         __TBB_ASSERT(n != internal::rehash_req, "Search can be executed only for rehashed bucket");
00614         return n;
00615     }
00616 
00618     class bucket_accessor : public bucket::scoped_t {
00619         bool my_is_writer; // TODO: use it from base type
00620         bucket *my_b;
00621     public:
00622         bucket_accessor( concurrent_hash_map *base, const hashcode_t h, bool writer = false ) { acquire( base, h, writer ); }
00624         inline void acquire( concurrent_hash_map *base, const hashcode_t h, bool writer = false ) {
00625             my_b = base->get_bucket( h );
00626 #if TBB_USE_THREADING_TOOLS
00627             // TODO: actually, notification is unnecessary here, just hiding double-check
00628             if( itt_load_pointer_with_acquire_v3(&my_b->node_list) == internal::rehash_req
00629 #else
00630             if( __TBB_load_with_acquire(my_b->node_list) == internal::rehash_req
00631 #endif
00632                 && try_acquire( my_b->mutex, /*write=*/true ) )
00633             {
00634                 if( my_b->node_list == internal::rehash_req ) base->rehash_bucket( my_b, h ); //recursive rehashing
00635                 my_is_writer = true;
00636             }
00637             else bucket::scoped_t::acquire( my_b->mutex, /*write=*/my_is_writer = writer );
00638             __TBB_ASSERT( my_b->node_list != internal::rehash_req, NULL);
00639         }
00641         bool is_writer() { return my_is_writer; }
00643         bucket *operator() () { return my_b; }
00644         // TODO: optimize out
00645         bool upgrade_to_writer() { my_is_writer = true; return bucket::scoped_t::upgrade_to_writer(); }
00646     };
00647 
00648     // TODO refactor to hash_base
00649     void rehash_bucket( bucket *b_new, const hashcode_t h ) {
00650         __TBB_ASSERT( *(intptr_t*)(&b_new->mutex), "b_new must be locked (for write)");
00651         __TBB_ASSERT( h > 1, "The lowermost buckets can't be rehashed" );
00652         __TBB_store_with_release(b_new->node_list, internal::empty_rehashed); // mark rehashed
00653         hashcode_t mask = ( 1u<<__TBB_Log2( h ) ) - 1; // get parent mask from the topmost bit
00654 
00655         bucket_accessor b_old( this, h & mask );
00656 
00657         mask = (mask<<1) | 1; // get full mask for new bucket
00658         __TBB_ASSERT( (mask&(mask+1))==0 && (h & mask) == h, NULL );
00659     restart:
00660         for( node_base **p = &b_old()->node_list, *n = __TBB_load_with_acquire(*p); is_valid(n); n = *p ) {
00661             hashcode_t c = my_hash_compare.hash( static_cast<node*>(n)->item.first );
00662             if( (c & mask) == h ) {
00663                 if( !b_old.is_writer() )
00664                     if( !b_old.upgrade_to_writer() ) {
00665                         goto restart; // node ptr can be invalid due to concurrent erase
00666                     }
00667                 *p = n->next; // exclude from b_old
00668                 add_to_bucket( b_new, n );
00669             } else p = &n->next; // iterate to next item
00670         }
00671     }
00672 
00673 public:
00674     
00675     class accessor;
00677     class const_accessor {
00678         friend class concurrent_hash_map<Key,T,HashCompare,Allocator>;
00679         friend class accessor;
00680         void operator=( const accessor & ) const; // Deny access
00681         const_accessor( const accessor & );       // Deny access
00682     public:
00684         typedef const typename concurrent_hash_map::value_type value_type;
00685 
00687         bool empty() const {return !my_node;}
00688 
00690         void release() {
00691             if( my_node ) {
00692                 my_lock.release();
00693                 my_node = 0;
00694             }
00695         }
00696 
00698         const_reference operator*() const {
00699             __TBB_ASSERT( my_node, "attempt to dereference empty accessor" );
00700             return my_node->item;
00701         }
00702 
00704         const_pointer operator->() const {
00705             return &operator*();
00706         }
00707 
00709         const_accessor() : my_node(NULL) {}
00710 
00712         ~const_accessor() {
00713             my_node = NULL; // my_lock.release() is called in scoped_lock destructor
00714         }
00715     private:
00716         node *my_node;
00717         typename node::scoped_t my_lock;
00718         hashcode_t my_hash;
00719     };
00720 
00722     class accessor: public const_accessor {
00723     public:
00725         typedef typename concurrent_hash_map::value_type value_type;
00726 
00728         reference operator*() const {
00729             __TBB_ASSERT( this->my_node, "attempt to dereference empty accessor" );
00730             return this->my_node->item;
00731         }
00732 
00734         pointer operator->() const {
00735             return &operator*();
00736         }
00737     };
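    // Illustrative note (not part of the original source): a const_accessor holds a
    // reader lock on the element's mutex for as long as it points to an item, and an
    // accessor holds a writer lock; the lock is dropped by release() or by the
    // accessor's destructor, so keep accessor lifetimes short to avoid blocking
    // other threads on the same element.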
00738 
00740     concurrent_hash_map(const allocator_type &a = allocator_type())
00741         : my_allocator(a)
00742     {}
00743 
00745     concurrent_hash_map(size_type n, const allocator_type &a = allocator_type())
00746         : my_allocator(a)
00747     {
00748         reserve( n );
00749     }
00750 
00752     concurrent_hash_map( const concurrent_hash_map& table, const allocator_type &a = allocator_type())
00753         : my_allocator(a)
00754     {
00755         internal_copy(table);
00756     }
00757 
00759     template<typename I>
00760     concurrent_hash_map(I first, I last, const allocator_type &a = allocator_type())
00761         : my_allocator(a)
00762     {
00763         reserve( std::distance(first, last) ); // TODO: load_factor?
00764         internal_copy(first, last);
00765     }
00766 
00768     concurrent_hash_map& operator=( const concurrent_hash_map& table ) {
00769         if( this!=&table ) {
00770             clear();
00771             internal_copy(table);
00772         } 
00773         return *this;
00774     }
00775 
00776 
00778     void clear();
00779 
00781     ~concurrent_hash_map() { clear(); }
00782 
00783     //------------------------------------------------------------------------
00784     // Parallel algorithm support
00785     //------------------------------------------------------------------------
00786     range_type range( size_type grainsize=1 ) {
00787         return range_type( *this, grainsize );
00788     }
00789     const_range_type range( size_type grainsize=1 ) const {
00790         return const_range_type( *this, grainsize );
00791     }
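    // Illustrative note (not part of the original header): these ranges are intended
    // for the TBB parallel algorithms, e.g. tbb::parallel_for( map.range(), body )
    // with parallel_for declared in "tbb/parallel_for.h" and 'body' a user functor
    // providing void operator()( const range_type& ) const. Note that iteration
    // itself takes no per-bucket locks (see hash_map_iterator above).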
00792 
00793     //------------------------------------------------------------------------
00794     // STL support - not thread-safe methods
00795     //------------------------------------------------------------------------
00796     iterator begin() {return iterator(*this,0,my_embedded_segment,my_embedded_segment->node_list);}
00797     iterator end() {return iterator(*this,0,0,0);}
00798     const_iterator begin() const {return const_iterator(*this,0,my_embedded_segment,my_embedded_segment->node_list);}
00799     const_iterator end() const {return const_iterator(*this,0,0,0);}
00800     std::pair<iterator, iterator> equal_range( const Key& key ) { return internal_equal_range(key, end()); }
00801     std::pair<const_iterator, const_iterator> equal_range( const Key& key ) const { return internal_equal_range(key, end()); }
00802     
00804     size_type size() const { return my_size; }
00805 
00807     bool empty() const { return my_size == 0; }
00808 
00810     size_type max_size() const {return (~size_type(0))/sizeof(node);}
00811 
00813     allocator_type get_allocator() const { return this->my_allocator; }
00814 
00816     void swap(concurrent_hash_map &table);
00817 
00818     //------------------------------------------------------------------------
00819     // concurrent map operations
00820     //------------------------------------------------------------------------
00821 
00823     size_type count( const Key &key ) const {
00824         return const_cast<concurrent_hash_map*>(this)->lookup(/*insert*/false, key, NULL, NULL, /*write=*/false );
00825     }
00826 
00828 
00829     bool find( const_accessor &result, const Key &key ) const {
00830         result.release();
00831         return const_cast<concurrent_hash_map*>(this)->lookup(/*insert*/false, key, NULL, &result, /*write=*/false );
00832     }
00833 
00835 
00836     bool find( accessor &result, const Key &key ) {
00837         result.release();
00838         return lookup(/*insert*/false, key, NULL, &result, /*write=*/true );
00839     }
00840         
00842 
00843     bool insert( const_accessor &result, const Key &key ) {
00844         result.release();
00845         return lookup(/*insert*/true, key, NULL, &result, /*write=*/false );
00846     }
00847 
00849 
00850     bool insert( accessor &result, const Key &key ) {
00851         result.release();
00852         return lookup(/*insert*/true, key, NULL, &result, /*write=*/true );
00853     }
00854 
00856 
00857     bool insert( const_accessor &result, const value_type &value ) {
00858         result.release();
00859         return lookup(/*insert*/true, value.first, &value.second, &result, /*write=*/false );
00860     }
00861 
00863 
00864     bool insert( accessor &result, const value_type &value ) {
00865         result.release();
00866         return lookup(/*insert*/true, value.first, &value.second, &result, /*write=*/true );
00867     }
00868 
00870 
00871     bool insert( const value_type &value ) {
00872         return lookup(/*insert*/true, value.first, &value.second, NULL, /*write=*/false );
00873     }
00874 
00876     template<typename I>
00877     void insert(I first, I last) {
00878         for(; first != last; ++first)
00879             insert( *first );
00880     }
00881 
00883 
00884     bool erase( const Key& key );
00885 
00887 
00888     bool erase( const_accessor& item_accessor ) {
00889         return exclude( item_accessor, /*readonly=*/ true );
00890     }
00891 
00893 
00894     bool erase( accessor& item_accessor ) {
00895         return exclude( item_accessor, /*readonly=*/ false );
00896     }
00897 
00898 protected:
00900     bool lookup( bool op_insert, const Key &key, const T *t, const_accessor *result, bool write );
00901 
00903     bool exclude( const_accessor &item_accessor, bool readonly );
00904 
00906     template<typename I>
00907     std::pair<I, I> internal_equal_range( const Key& key, I end ) const;
00908 
00910     void internal_copy( const concurrent_hash_map& source );
00911 
00912     template<typename I>
00913     void internal_copy(I first, I last);
00914 
00916 
00918     const_pointer internal_fast_find( const Key& key ) const {
00919         hashcode_t h = my_hash_compare.hash( key );
00920 #if TBB_USE_THREADING_TOOLS
00921         hashcode_t m = (hashcode_t) itt_load_pointer_with_acquire_v3( &my_mask );
00922 #else
00923         hashcode_t m = my_mask;
00924 #endif
00925         node *n;
00926     restart:
00927         __TBB_ASSERT((m&(m+1))==0, NULL);
00928         bucket *b = get_bucket( h & m );
00929 #if TBB_USE_THREADING_TOOLS
00930         // TODO: actually, notification is unnecessary here, just hiding double-check
00931         if( itt_load_pointer_with_acquire_v3(&b->node_list) == internal::rehash_req )
00932 #else
00933         if( __TBB_load_with_acquire(b->node_list) == internal::rehash_req )
00934 #endif
00935         {
00936             bucket::scoped_t lock;
00937             if( lock.try_acquire( b->mutex, /*write=*/true ) ) {
00938                 if( b->node_list == internal::rehash_req)
00939                     const_cast<concurrent_hash_map*>(this)->rehash_bucket( b, h & m ); //recursive rehashing
00940             }
00941             else lock.acquire( b->mutex, /*write=*/false );
00942             __TBB_ASSERT(b->node_list!=internal::rehash_req,NULL);
00943         }
00944         n = search_bucket( key, b );
00945         if( n )
00946             return &n->item;
00947         else if( check_mask_race( h, m ) )
00948             goto restart;
00949         return 0;
00950     }
00951 };
00952 
00953 #if _MSC_VER && !defined(__INTEL_COMPILER)
00954     // Suppress "conditional expression is constant" warning.
00955     #pragma warning( push )
00956     #pragma warning( disable: 4127 )
00957 #endif
00958 
00959 template<typename Key, typename T, typename HashCompare, typename A>
00960 bool concurrent_hash_map<Key,T,HashCompare,A>::lookup( bool op_insert, const Key &key, const T *t, const_accessor *result, bool write ) {
00961     __TBB_ASSERT( !result || !result->my_node, NULL );
00962     segment_index_t grow_segment;
00963     bool return_value;
00964     node *n, *tmp_n = 0;
00965     hashcode_t const h = my_hash_compare.hash( key );
00966 #if TBB_USE_THREADING_TOOLS
00967     hashcode_t m = (hashcode_t) itt_load_pointer_with_acquire_v3( &my_mask );
00968 #else
00969     hashcode_t m = my_mask;
00970 #endif
00971     restart:
00972     {//lock scope
00973         __TBB_ASSERT((m&(m+1))==0, NULL);
00974         return_value = false;
00975         // get bucket
00976         bucket_accessor b( this, h & m );
00977 
00978         // find a node
00979         n = search_bucket( key, b() );
00980         if( op_insert ) {
00981             // [opt] insert a key
00982             if( !n ) {
00983                 if( !tmp_n ) {
00984                     if(t) tmp_n = new( my_allocator ) node(key, *t);
00985                     else  tmp_n = new( my_allocator ) node(key);
00986                 }
00987                 if( !b.is_writer() && !b.upgrade_to_writer() ) { // TODO: improved insertion
00988                     // Rerun search_bucket, in case another thread inserted the item during the upgrade.
00989                     n = search_bucket( key, b() );
00990                     if( is_valid(n) ) { // unfortunately, it did
00991                         b.downgrade_to_reader();
00992                         goto exists;
00993                     }
00994                 }
00995                 if( check_mask_race(h, m) )
00996                     goto restart; // b.release() is done in ~b().
00997                 // insert and set flag to grow the container
00998                 grow_segment = insert_new_node( b(), n = tmp_n, m );
00999                 tmp_n = 0;
01000                 return_value = true;
01001             } else {
01002     exists:     grow_segment = 0;
01003             }
01004         } else { // find or count
01005             if( !n ) {
01006                 if( check_mask_race( h, m ) )
01007                     goto restart; // b.release() is done in ~b(). TODO: replace by continue
01008                 return false;
01009             }
01010             return_value = true;
01011             grow_segment = 0;
01012         }
01013         if( !result ) goto check_growth;
01014         // TODO: the following seems as generic/regular operation
01015         // acquire the item
01016         if( !result->my_lock.try_acquire( n->mutex, write ) ) {
01017             // we are unlucky, prepare for longer wait
01018             internal::atomic_backoff trials;
01019             do {
01020                 if( !trials.bounded_pause() ) {
01021                     // the wait takes really long, restart the operation
01022                     b.release();
01023                     __TBB_ASSERT( !op_insert || !return_value, "Can't acquire new item in locked bucket?" );
01024                     __TBB_Yield();
01025                     m = my_mask;
01026                     goto restart;
01027                 }
01028             } while( !result->my_lock.try_acquire( n->mutex, write ) );
01029         }
01030     }//lock scope
01031     result->my_node = n;
01032     result->my_hash = h;
01033 check_growth:
01034     // [opt] grow the container
01035     if( grow_segment )
01036         enable_segment( grow_segment );
01037     if( tmp_n ) // if op_insert only
01038         delete_node( tmp_n );
01039     return return_value;
01040 }
01041 
01042 template<typename Key, typename T, typename HashCompare, typename A>
01043 template<typename I>
01044 std::pair<I, I> concurrent_hash_map<Key,T,HashCompare,A>::internal_equal_range( const Key& key, I end ) const {
01045     hashcode_t h = my_hash_compare.hash( key );
01046     hashcode_t m = my_mask;
01047     __TBB_ASSERT((m&(m+1))==0, NULL);
01048     h &= m;
01049     bucket *b = get_bucket( h );
01050     while( b->node_list == internal::rehash_req ) {
01051         m = ( 1u<<__TBB_Log2( h ) ) - 1; // get parent mask from the topmost bit
01052         b = get_bucket( h &= m );
01053     }
01054     node *n = search_bucket( key, b );
01055     if( !n )
01056         return std::make_pair(end, end);
01057     iterator lower(*this, h, b, n), upper(lower);
01058     return std::make_pair(lower, ++upper);
01059 }
01060 
01061 template<typename Key, typename T, typename HashCompare, typename A>
01062 bool concurrent_hash_map<Key,T,HashCompare,A>::exclude( const_accessor &item_accessor, bool readonly ) {
01063     __TBB_ASSERT( item_accessor.my_node, NULL );
01064     node_base *const n = item_accessor.my_node;
01065     item_accessor.my_node = NULL; // we ought to release the accessor anyway
01066     hashcode_t const h = item_accessor.my_hash;
01067     hashcode_t m = my_mask;
01068     do {
01069         // get bucket
01070         bucket_accessor b( this, h & m, /*writer=*/true );
01071         node_base **p = &b()->node_list;
01072         while( *p && *p != n )
01073             p = &(*p)->next;
01074         if( !*p ) { // someone else was the first
01075             if( check_mask_race( h, m ) )
01076                 continue;
01077             item_accessor.my_lock.release();
01078             return false;
01079         }
01080         __TBB_ASSERT( *p == n, NULL );
01081         *p = n->next; // remove from container
01082         my_size--;
01083         break;
01084     } while(true);
01085     if( readonly ) // need to get exclusive lock
01086         item_accessor.my_lock.upgrade_to_writer(); // return value means nothing here
01087     item_accessor.my_lock.release();
01088     delete_node( n ); // Only one thread can delete it due to the write lock on the bucket
01089     return true;
01090 }
01091 
01092 template<typename Key, typename T, typename HashCompare, typename A>
01093 bool concurrent_hash_map<Key,T,HashCompare,A>::erase( const Key &key ) {
01094     node_base *n;
01095     hashcode_t const h = my_hash_compare.hash( key );
01096     hashcode_t m = my_mask;
01097 restart:
01098     {//lock scope
01099         // get bucket
01100         bucket_accessor b( this, h & m );
01101     search:
01102         node_base **p = &b()->node_list;
01103         n = *p;
01104         while( is_valid(n) && !my_hash_compare.equal(key, static_cast<node*>(n)->item.first ) ) {
01105             p = &n->next;
01106             n = *p;
01107         }
01108         if( !n ) { // not found, but mask could be changed
01109             if( check_mask_race( h, m ) )
01110                 goto restart;
01111             return false;
01112         }
01113         else if( !b.is_writer() && !b.upgrade_to_writer() ) {
01114             if( check_mask_race( h, m ) ) // contended upgrade, check mask
01115                 goto restart;
01116             goto search;
01117         }
01118         *p = n->next;
01119         my_size--;
01120     }
01121     {
01122         typename node::scoped_t item_locker( n->mutex, /*write=*/true );
01123     }
01124     // note: no other thread should be attempting to acquire this mutex again, so do not try to upgrade the const_accessor!
01125     delete_node( n ); // Only one thread can delete it due to write lock on the bucket
01126     return true;
01127 }
01128 
01129 template<typename Key, typename T, typename HashCompare, typename A>
01130 void concurrent_hash_map<Key,T,HashCompare,A>::swap(concurrent_hash_map<Key,T,HashCompare,A> &table) {
01131     std::swap(this->my_allocator, table.my_allocator);
01132     std::swap(this->my_hash_compare, table.my_hash_compare);
01133     internal_swap(table);
01134 }
01135 
01136 template<typename Key, typename T, typename HashCompare, typename A>
01137 void concurrent_hash_map<Key,T,HashCompare,A>::clear() {
01138     hashcode_t m = my_mask;
01139     __TBB_ASSERT((m&(m+1))==0, NULL);
01140 #if TBB_USE_DEBUG || TBB_USE_PERFORMANCE_WARNINGS
01141 #if TBB_USE_PERFORMANCE_WARNINGS
01142     int size = int(my_size), buckets = int(m)+1, empty_buckets = 0, overpopulated_buckets = 0; // usage statistics
01143     static bool reported = false;
01144 #endif
01145     // check consistency
01146     for( segment_index_t b = 0; b <= m; b++ ) {
01147         node_base *n = get_bucket(b)->node_list;
01148 #if TBB_USE_PERFORMANCE_WARNINGS
01149         if( n == internal::empty_rehashed ) empty_buckets++;
01150         else if( n == internal::rehash_req ) buckets--;
01151         else if( n->next ) overpopulated_buckets++;
01152 #endif
01153         for(; is_valid(n); n = n->next ) {
01154             hashcode_t h = my_hash_compare.hash( static_cast<node*>(n)->item.first );
01155             h &= m;
01156             __TBB_ASSERT( h == b || get_bucket(h)->node_list == internal::rehash_req, "Rehashing was not finished before this serial stage due to a concurrent or unexpectedly terminated operation" );
01157         }
01158     }
01159 #if TBB_USE_PERFORMANCE_WARNINGS
01160     if( buckets > size) empty_buckets -= buckets - size;
01161     else overpopulated_buckets -= size - buckets; // TODO: load_factor?
01162     if( !reported && buckets >= 512 && ( 2*empty_buckets >= size || 2*overpopulated_buckets > size ) ) {
01163         internal::runtime_warning(
01164             "Performance is not optimal because the hash function produces bad randomness in lower bits in %s.\nSize: %d  Empties: %d  Overlaps: %d",
01165             typeid(*this).name(), size, empty_buckets, overpopulated_buckets );
01166         reported = true;
01167     }
01168 #endif
01169 #endif//TBB_USE_DEBUG || TBB_USE_PERFORMANCE_WARNINGS
01170     my_size = 0;
01171     segment_index_t s = segment_index_of( m );
01172     __TBB_ASSERT( s+1 == pointers_per_table || !my_table[s+1], "wrong mask or concurrent grow" );
01173     cache_aligned_allocator<bucket> alloc;
01174     do {
01175         __TBB_ASSERT( is_valid( my_table[s] ), "wrong mask or concurrent grow" );
01176         segment_ptr_t buckets = my_table[s];
01177         size_type sz = segment_size( s ? s : 1 );
01178         for( segment_index_t i = 0; i < sz; i++ )
01179             for( node_base *n = buckets[i].node_list; is_valid(n); n = buckets[i].node_list ) {
01180                 buckets[i].node_list = n->next;
01181                 delete_node( n );
01182             }
01183         if( s >= first_block) // individually allocated segment
01184             alloc.deallocate( buckets, sz );
01185         else if( s == embedded_block && embedded_block != first_block )
01186             alloc.deallocate( buckets, segment_size(first_block)-embedded_buckets );
01187         if( s >= embedded_block ) my_table[s] = 0;
01188     } while(s-- > 0);
01189     my_mask = embedded_buckets - 1;
01190 }
01191 
01192 template<typename Key, typename T, typename HashCompare, typename A>
01193 void concurrent_hash_map<Key,T,HashCompare,A>::internal_copy( const concurrent_hash_map& source ) {
01194     reserve( source.my_size ); // TODO: load_factor?
01195     hashcode_t mask = source.my_mask;
01196     if( my_mask == mask ) { // optimized version
01197         bucket *dst = 0, *src = 0;
01198         for( hashcode_t k = 0; k <= mask; k++ ) {
01199             if( k & (k-2) ) ++dst, ++src; // not the beginning of a segment
01200             else { dst = get_bucket( k ); src = source.get_bucket( k ); }
01201             __TBB_ASSERT( dst->node_list != internal::rehash_req, "Invalid bucket in destination table");
01202             node *n = static_cast<node*>( src->node_list );
01203             if( n == internal::rehash_req ) { // source is not rehashed, items are in previous buckets
01204                 bucket_accessor b( this, k );
01205                 rehash_bucket( b(), k ); // TODO: use without synchronization
01206             } else for(; n; n = static_cast<node*>( n->next ) ) {
01207                 add_to_bucket( dst, new( my_allocator ) node(n->item.first, n->item.second) );
01208                 ++my_size; // TODO: replace by non-atomic op
01209             }
01210         }
01211     } else internal_copy( source.begin(), source.end() );
01212 }
01213 
01214 template<typename Key, typename T, typename HashCompare, typename A>
01215 template<typename I>
01216 void concurrent_hash_map<Key,T,HashCompare,A>::internal_copy(I first, I last) {
01217     hashcode_t m = my_mask;
01218     for(; first != last; ++first) {
01219         hashcode_t h = my_hash_compare.hash( first->first );
01220         bucket *b = get_bucket( h & m );
01221         __TBB_ASSERT( b->node_list != internal::rehash_req, "Invalid bucket in destination table");
01222         node *n = new( my_allocator ) node(first->first, first->second);
01223         add_to_bucket( b, n );
01224         ++my_size; // TODO: replace by non-atomic op
01225     }
01226 }
01227 
01228 template<typename Key, typename T, typename HashCompare, typename A1, typename A2>
01229 inline bool operator==(const concurrent_hash_map<Key, T, HashCompare, A1> &a, const concurrent_hash_map<Key, T, HashCompare, A2> &b) {
01230     if(a.size() != b.size()) return false;
01231     typename concurrent_hash_map<Key, T, HashCompare, A1>::const_iterator i(a.begin()), i_end(a.end());
01232     typename concurrent_hash_map<Key, T, HashCompare, A2>::const_iterator j, j_end(b.end());
01233     for(; i != i_end; ++i) {
01234         j = b.equal_range(i->first).first;
01235         if( j == j_end || !(i->second == j->second) ) return false;
01236     }
01237     return true;
01238 }
01239 
01240 template<typename Key, typename T, typename HashCompare, typename A1, typename A2>
01241 inline bool operator!=(const concurrent_hash_map<Key, T, HashCompare, A1> &a, const concurrent_hash_map<Key, T, HashCompare, A2> &b)
01242 {    return !(a == b); }
01243 
01244 template<typename Key, typename T, typename HashCompare, typename A>
01245 inline void swap(concurrent_hash_map<Key, T, HashCompare, A> &a, concurrent_hash_map<Key, T, HashCompare, A> &b)
01246 {    a.swap( b ); }
01247 
01248 #if _MSC_VER && !defined(__INTEL_COMPILER)
01249     #pragma warning( pop )
01250 #endif // warning 4127 is back
01251 
01252 } // namespace tbb
01253 
01254 #endif /* __TBB_concurrent_hash_map_H */
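A minimal usage sketch follows (not part of the header above; the word_table typedef, the function names, and the counted string are illustrative only). It shows the accessor-based insert/find protocol and erase by key, relying only on the public interface declared in this file and on the default tbb_hash_compare<std::string>.

#include <iostream>
#include <string>
#include "tbb/concurrent_hash_map.h"

typedef tbb::concurrent_hash_map<std::string,int> word_table;   // hypothetical table type

void count_word( word_table &table, const std::string &w ) {
    word_table::accessor a;            // holds a writer lock on the element while alive
    table.insert( a, w );              // inserts <w,0> if absent, otherwise finds it
    a->second += 1;                    // safe update under the accessor
}                                      // lock released when 'a' is destroyed

int lookup_word( const word_table &table, const std::string &w ) {
    word_table::const_accessor ca;     // reader lock only
    return table.find( ca, w ) ? ca->second : 0;
}

int main() {
    word_table table;
    count_word( table, "tbb" );
    count_word( table, "tbb" );
    std::cout << lookup_word( table, "tbb" ) << std::endl;   // prints 2
    table.erase( "tbb" );              // erase by key; returns true if the item was removed
    return 0;
}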
