除了thread,boost
::thread另一个重要组成部分是mutex,以及工作在mutex上的boost
::mutex
::scoped_lock、condition和barrier,这些都是为实现线程同步提供的。
mutex
boost提供的mutex有6种:
boost
::mutex
boost
::try_mutex
boost
::timed_mutex
boost
::recursive_mutex
boost
::recursive_try_mutex
boost
::recursive_timed_mutex
下面仅对boost
::mutex进行分析。
mutex类是一个CriticalSection(临界区)封装类,它在构造函数中新建一个临界区并InitializeCriticalSection,然后用一个成员变量
void
* m_mutex
;来保存该临界区结构。
除此之外,mutex还提供了do_lock、do_unlock等方法,这些方法分别调用EnterCriticalSection、LeaveCriticalSection来修改成员变量m_mutex(CRITICAL_SECTION结构指针)的状态,但这些方法都是private的,以防止我们直接对mutex进行锁操作,所有的锁操作都必须通过mutex的友元类detail
::thread
::lock_ops
<mutex
>来完成,比较有意思的是,lock_ops的所有方法:lock、unlock、trylock等都是static的,如lock_ops
<Mutex
>::lock的实现:
template
<typename Mutex
>class lock_ops
: private noncopyable
{
...public
: static void lock
(Mutex
& m
)
{ m
.do_lock
();
}
...
}boost
::thread的设计者为什么会这么设计呢?我想大概是:
1、boost
::thread的设计者不希望被我们直接操作mutex,改变其状态,所以mutex的所有方法都是private的(除了构造函数,析构函数)。
2、虽然我们可以通过lock_ops来修改mutex的状态,如:
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/detail/lock.hpp>
int main
()
{ boost
::mutex mt
; //mt.do_lock(); // Error! Can not access private member!
boost
::detail
::thread
::lock_ops
<boost
::mutex
>::lock
(mt
); return 0
;
}但是,这是不推荐的,因为mutex、scoped_lock、condition、barrier是一套完整的类系,它们是相互协同工作的,像上面这么操作没有办法与后面的几个类协同工作。
scoped_lock
上面说过,不应该直接用lock_ops来操作mutex对象,那么,应该用什么呢?答案就是scoped_lock。与存在多种mutex一样,存在多种与mutex对应的scoped_lock:
scoped_lock
scoped_try_lock
scoped_timed_lock
这里我们只讨论scoped_lock。
scoped_lock是定义在namespace boost
::detail
::thread下的,为了方便我们使用(也为了方便设计者),mutex使用了下面的typedef:
typedef detail
::thread
::scoped_lock
<mutex
> scoped_lock
;这样我们就可以通过:
boost
::mutex
::scoped_lock
来使用scoped_lock类模板了。
由于scoped_lock的作用仅在于对mutex加锁
/解锁(即使mutex EnterCriticalSection
/LeaveCriticalSection),因此,它的接口也很简单,除了构造函数外,仅有lock
/unlock
/locked(判断是否已加锁),及类型转换操作符void
*,一般我们不需要显式调用这些方法,因为scoped_lock的构造函数是这样定义的:
explicit scoped_lock
(Mutex
& mx
, bool initially_locked
=true)
: m_mutex
(mx
), m_locked
(false)
{ if
(initially_locked
) lock
();
}注:m_mutex是一个mutex的引用。
因此,当我们不指定initially_locked参数构造一个scoped_lock对象时,scoped_lock会自动对所绑定的mutex加锁,而析构函数会检查是否加锁,若已加锁,则解锁;当然,有些情况下,我们可能不需要构造时自动加锁,这样就需要自己调用lock方法。后面的condition、barrier也会调用scoped_lock的lock、unlock方法来实现部分方法。
正因为scoped_lock具有可在构造时加锁,析构时解锁的特性,我们经常会使用局部变量来实现对mutex的独占访问。如thread部分独占访问cout的例子:
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>
boost
::mutex io_mutex
;void count
() // worker function
{ for
(int i
= 0
; i
< 10
; ++i
)
{ boost
::mutex
::scoped_lock lock
(io_mutex
); std
::cout
<< i
<< std
::endl
;
}
}int main
(int argc
, char
* argv
[])
{ boost
::thread thrd1
(&count
); boost
::thread thrd2
(&count
); thrd1
.join
(); thrd2
.join
(); return 0
;
}在每次输出信息时,为了防止整个输出过程被其它线程打乱,通过对io_mutex加锁(进入临界区),从而保证了输出的正确性。
在使用scoped_lock时,我们有时候需要使用全局锁(定义一个全局mutex,当需要独占访问全局资源时,以该全局mutex为参数构造一个scoped_lock对象即可。全局mutex可以是全局变量,也可以是类的静态方法等),有时候则需要使用对象锁(将mutex定义成类的成员变量),应该根据需要进行合理选择。
Java的synchronized可用于对方法加锁,对代码段加锁,对对象加锁,对类加锁(仍然是对象级的),这几种加锁方式都可以通过上面讲的对象锁来模拟;相反,在Java中实现全局锁好像有点麻烦,必须将请求封装到类中,以转换成上面的四种synchronized形式之一。
condition
condition的接口如下:
class condition
: private boost
::noncopyable // Exposition only
{public
: // construct/copy/destruct
condition
();
~condition
(); // notification
void notify_one
(); void notify_all
(); // waiting
template
<typename ScopedLock
> void wait
(ScopedLock
&); template
<typename ScopedLock
, typename Pred
> void wait
(ScopedLock
&, Pred
); template
<typename ScopedLock
> bool timed_wait
(ScopedLock
&, const boost
::xtime
&); template
<typename ScopedLock
, typename Pred
> bool timed_wait
(ScopedLock
&, Pred
);
};其中wait用于等待某个condition的发生,而timed_wait则提供具有超时的wait功能,notify_one用于唤醒一个等待该condition发生的线程,notify_all则用于唤醒所有等待该condition发生的线程。
由于condition的语义相对较为复杂,它的实现也是整个boost
::thread库中最复杂的(对Windows版本而言,对支持pthread的版本而言,由于pthread已经提供了pthread_cond_t,使得condition实现起来也十分简单),下面对wait和notify_one进行简要分析。
condition内部包含了一个condition_impl对象,由该对象执行来处理实际的wait、notify_one
...等操作。
下面先对condition_impl进行简要分析。
condition_impl在其构造函数中会创建两个Semaphore(信号量):m_gate、m_queue,及一个Mutex(互斥体,跟boost
::mutex类似,但boost
::mutex是基于CriticalSection
<临界区
>的):m_mutex,其中:
m_queue
相当于当前所有等待线程的等待队列,构造函数中调用CreateSemaphore来创建Semaphore时,lMaximumCount参数被指定为
(std
::numeric_limits
<long
>::max
)(),即便如此,condition的实现者为了防止出现大量等待线程的情况(以至于超过了long的最大值),在线程因执行condition
::wait进入等待状态时会先:
WaitForSingleObject
(reinterpret_cast
<HANDLE
>(m_queue
), INFINITE
);以等待被唤醒,但很难想象什么样的应用需要处理这么多线程。
m_mutex
用于内部同步的控制。
但对于m_gate我很奇怪,我仔细研究了一下condition_imp的实现,还是不明白作者引入m_gate这个变量的用意何在,既然已经有了用于同步控制的m_mutex,再引入一个m_gate实在让我有点不解。
以下是condition
::wait调用的do_wait方法简化后的代码:
template
<typename M
>void do_wait
(M
& mutex
)
{ m_impl
.enter_wait
(); lock_ops
::unlock
(mutex
, state
); //对传入的scoped_lock对象解锁,以便别的线程可以对其进行加锁,并执行某些处理,否则,本线程等待的condition永远不会发生(因为没有线程可以获得访问资源的权利以使condition发生)
m_impl
.do_wait
(); //执行等待操作,等待其它线程执行notify_one或notify_all操作以获得
lock_ops
::lock
(mutex
, state
); //重新对scoped_lock对象加锁,获得独占访问资源的权利
}condition
::timed_wait的实现方法与此类似,而notify_one、notify_all仅将调用请求转发给m_impl,就不多讲了。
虽然condition的内部实现比较复杂,但使用起来还是比较方便的。下面是一个使用condition的多Producer
-多Consumer同步的例子(这是本人为即将推出的“大卫的Design Patterns学习笔记”编写的Mediator模式的示例):
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/xtime.hpp>
#include <iostream>
#include <time.h> // for time()
#include <Windows.h> // for Sleep, change it for other platform, we can use
// boost::thread::sleep, but it's too inconvenient.
typedef boost
::mutex
::scoped_lock scoped_lock
;boost
::mutex io_mutex
;class Product
{ int num
;public
: Product
(int num
) : num
(num
) {} friend std
::ostream
& operator
<< (std
::ostream
& os
, Product
& product
)
{ return os
<< product
.num
;
}
};class Mediator
{private
: boost
::condition cond
; boost
::mutex mutex
; Product
** pSlot
; // product buffer/slot
unsigned int slotCount
, // buffer size
productCount
; // current product count
bool stopFlag
; // should all thread stop or not
public
: Mediator
(const int slotCount
) : slotCount
(slotCount
), stopFlag
(false), productCount
(0
)
{ pSlot
= new Product
*[slotCount
];
} virtual
~Mediator
()
{ for
(int i
= 0
; i
< static_cast
<int
>(productCount
); i
++)
{ delete pSlot
[i
];
} delete
[] pSlot
;
} bool Stop
() const
{ return stopFlag
; } void Stop
(bool
) { stopFlag
= true; } void NotifyAll
() // notify all blocked thread to exit
{ cond
.notify_all
();
} bool Put
( Product
* pProduct
)
{ scoped_lock lock
(mutex
); if
(productCount
== slotCount
)
{
{ scoped_lock lock
(io_mutex
); std
::cout
<< "Buffer is full. Waiting..."
<< std
::endl
;
} while
(!stopFlag
&& (productCount
== slotCount
)) cond
.wait
(lock
);
} if
(stopFlag
) // it may be notified by main thread to quit.
return
false; pSlot
[ productCount
++ ] = pProduct
; cond
.notify_one
(); // this call may cause *pProduct to be changed if it wakes up a consumer
return
true;
} bool Get
(Product
** ppProduct
)
{ scoped_lock lock
(mutex
); if
(productCount
== 0
)
{
{ scoped_lock lock
(io_mutex
); std
::cout
<< "Buffer is empty. Waiting..."
<< std
::endl
;
} while
(!stopFlag
&& (productCount
== 0
)) cond
.wait
(lock
);
} if
(stopFlag
) // it may be notified by main thread to quit.
{
*ppProduct
= NULL
; return
false;
}
*ppProduct
= pSlot
[--productCount
]; cond
.notify_one
(); return
true;
}
};class Producer
{private
: Mediator
* pMediator
; static unsigned int num
; unsigned int id
; // Producer id
public
: Producer
(Mediator
* pMediator
) : pMediator
(pMediator
) { id
= num
++; } void operator
() ()
{ Product
* pProduct
; srand
( (unsigned
)time
( NULL
) + id
); // each thread need to srand differently
while
(!pMediator
->Stop
())
{ pProduct
= new Product
( rand
() % 100
); // must print product info before call Put, as Put may wake up a consumer
// and cause *pProuct to be changed
{ scoped_lock lock
(io_mutex
); std
::cout
<< "Producer["
<< id
<< "] produces Product["
<< *pProduct
<< "]"
<< std
::endl
;
} if
(!pMediator
->Put
(pProduct
)) // this function only fails when it is notified by main thread to exit
delete pProduct
; Sleep
(100
);
}
}
};unsigned int Producer
::num
= 1
;class Consumer
{private
: Mediator
* pMediator
; static unsigned int num
; unsigned int id
; // Consumer id
public
: Consumer
(Mediator
* pMediator
) : pMediator
(pMediator
) { id
= num
++; } void operator
() ()
{ Product
* pProduct
= NULL
; while
(!pMediator
->Stop
())
{ if
(pMediator
->Get
(&pProduct
))
{ scoped_lock lock
(io_mutex
); std
::cout
<< "Consumer["
<< id
<< "] is consuming Product["
<< *pProduct
<< "]"
<< std
::endl
; delete pProduct
;
} Sleep
(100
);
}
}
};unsigned int Consumer
::num
= 1
;int main
()
{ Mediator mediator
(2
); // we have only 2 slot to put products
// we have 2 producers
Producer producer1
(&mediator
); boost
::thread thrd1
(producer1
); Producer producer2
(&mediator
); boost
::thread thrd2
(producer2
); // and we have 3 consumers
Consumer consumer1
(&mediator
); boost
::thread thrd3
(consumer1
); Consumer consumer2
(&mediator
); boost
::thread thrd4
(consumer2
); Consumer consumer3
(&mediator
); boost
::thread thrd5
(consumer3
); // wait 1 second
Sleep
(1000
); // and then try to stop all threads
mediator
.Stop
(true); mediator
.NotifyAll
(); // wait for all threads to exit
thrd1
.join
(); thrd2
.join
(); thrd3
.join
(); thrd4
.join
(); thrd5
.join
(); return 0
;
}barrier
barrier类的接口定义如下:
class barrier
: private boost
::noncopyable // Exposition only
{public
: // construct/copy/destruct
barrier
(size_t n
);
~barrier
(); // waiting
bool wait
();
};barrier类为我们提供了这样一种控制线程同步的机制:
前n
- 1次调用wait函数将被阻塞,直到第n次调用wait函数,而此后第n
+ 1次到第2n
- 1次调用wait也会被阻塞,直到第2n次调用,依次类推。
barrier
::wait的实现十分简单:
barrier
::barrier
(unsigned int count
)
: m_threshold
(count
), m_count
(count
), m_generation
(0
)
{ if
(count
== 0
) throw std
::invalid_argument
("count cannot be zero."
);
}bool barrier
::wait
()
{ boost
::mutex
::scoped_lock lock
(m_mutex
); // m_mutex is the base of barrier and is initilized by it's default constructor.
unsigned int gen
= m_generation
; // m_generation will be 0 for call 1~n-1, and 1 for n~2n - 1, and so on...
if
(--m_count
== 0
)
{ m_generation
++; // cause m_generation to be changed in call n/2n/...
m_count
= m_threshold
; // reset count
m_cond
.notify_all
(); // wake up all thread waiting here
return
true;
} while
(gen
== m_generation
) // if m_generation is not changed, lock current thread.
m_cond
.wait
(lock
); return
false;
}因此,说白了也不过是mutex的一个简单应用。
以下是一个使用barrier的例子:
#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
int i
= 0
;boost
::barrier barr
(3
); // call barr.wait 3 * n times will release all threads in waiting
void thread
()
{
++i
; barr
.wait
();
}int main
()
{ boost
::thread thrd1
(&thread
); boost
::thread thrd2
(&thread
); boost
::thread thrd3
(&thread
); thrd1
.join
(); thrd2
.join
(); thrd3
.join
(); return 0
;
}如果去掉其中thrd3相关的代码,将使得线程1、2一直处于wait状态,进而使得主线程无法退出。
xtime
xtime是boost
::thread中用来表示时间的一个辅助类,它是一个仅包含两个成员变量的结构体:
struct xtime
{//...
xtime_sec_t sec
; xtime_nsec_t nsec
;
};condition
::timed_wait、thread
::sleep等涉及超时的函数需要用到xtime。
需要注意的是,xtime表示的不是一个时间间隔,而是一个时间点,因此使用起来很不方便。为了方便使用xtime,boost提供了一些辅助的xtime操作函数,如xtime_get、xtime_cmp等。
以下是一个使用xtime来执行sleep的例子(跟简单的一句Sleep比起来,实在是太复杂了),其中用到了xtime初始化函数xtime_get:
#include <boost/thread/thread.hpp>
#include <boost/thread/xtime.hpp>
#include <iostream>
int main
()
{ boost
::xtime xt
; boost
::xtime_get
(&xt
, boost
::TIME_UTC
); // initialize xt with current time
xt
.sec
+= 1
; // change xt to next second
boost
::thread
::sleep
(xt
); // do sleep
std
::cout
<< "1 second sleep over."
<< std
::endl
; return 0
;
}
本文地址:http://com.8s8s.com/it/it21795.htm