4

Python:使用threading模块实现多线程编程

 2 years ago
source link: https://www.hi-roy.com/posts/python%E4%BD%BF%E7%94%A8threading%E6%A8%A1%E5%9D%97%E5%AE%9E%E7%8E%B0%E5%A4%9A%E7%BA%BF%E7%A8%8B%E7%BC%96%E7%A8%8B/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Python:使用threading模块实现多线程编程

2013-11-13

原文链接找不到了,故整理格式后记录于此。

    '''''  
    Created on 2012-9-7  
    @author:  walfred
    @module: thread.ThreadTest3  
    @description:
    '''    
    import threading  
    class MyThread(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
        def run(self):  
            print "I am %s" % (self.name)  
    if __name__ == "__main__":  
        for i in range(0, 5):  
            my_thread = MyThread()  
            my_thread.start()

name相关

你可以为每一个thread指定name,默认的是Thread-No形式的,如上述实例代码打印出的一样:

    I am Thread-1
    I am Thread-2
    I am Thread-3
    I am Thread-4
    I am Thread-5

当然你可以指定每一个thread的name,这个通过setName方法,代码:

def __init__(self):  
    threading.Thread.__init__(self)  
    self.setName("new" + self.name)

join方法

join方法原型如下,这个方法是用来阻塞当前上下文,直至该线程运行结束。

setDaemon方法

当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,当主线程完成想退出时,会检验子线 程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退 出,这时就可以用setDaemon方法,并设置其参数为True。

使用Lock互斥锁

假设各个线程需要访问同一公共资源,我们的代码该怎么写?

    '''''
    Created on 2012-9-8
    @author: walfred
    @module: thread.ThreadTest3
    '''  
    import threading  
    import time  
    counter = 0  
    class MyThread(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
        def run(self):  
            global counter  
            time.sleep(1);  
            counter += 1  
            print "I am %s, set counter:%s" % (self.name, counter)  
    if __name__ == "__main__":  
        for i in range(0, 200):  
            my_thread = MyThread()  
            my_thread.start()

解决上面的问题,我们兴许会写出这样的代码,我们假设跑200个线程,但是这200个线程都会去访问counter这个公共资源,并对该资源进行处理(counter += 1),代码看起来就是这个样了,但是我们看下运行结果:

I am Thread-69, set counter:64I am Thread-73, set counter:66I am Thread-74, set counter:67I am Thread-75, set counter:68
I am Thread-76, set counter:69I am Thread-78, set counter:70I am Thread-77, set counter:71I am Thread-58, set counter:72
I am Thread-60, set counter:73I am Thread-62, set counter:74I am Thread-66, set counter:75I am Thread-70, set counter:76
I am Thread-72, set counter:77I am Thread-79, set counter:78I am Thread-71, set counter:78

打印结果我只贴了一部分,从中我们已经看出了这个全局资源(counter)被抢占的情况,问题产生的原因就是没有控制多个线程对同一资源的访问, 对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。在开发过程中我们必须要避免这种情况,那怎么避免?这就用到了我们在综述中提 到的互斥锁了。

互斥锁概念

Python编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。在Python中我们使用threading模块提供的Lock类。 我们对上面的程序进行整改,为此我们需要添加一个互斥锁变量mutex = threading.Lock(),然后在争夺资源的时候之前我们会先抢占这把锁mutex.acquire(),对资源使用完成之后我们在释放这把锁 mutex.release()。代码如下:

    '''
    Created on 2012-9-8
    @author: walfred
    @module: thread.ThreadTest4
    '''  
    import threading  
    import time  
    counter = 0  
    mutex = threading.Lock()  
    class MyThread(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
        def run(self):  
            global counter, mutex  
            time.sleep(1);  
            if mutex.acquire():  
                counter += 1  
                print "I am %s, set counter:%s" % (self.name, counter)  
                mutex.release()  
    if __name__ == "__main__":  
        for i in range(0, 100):  
            my_thread = MyThread()  
            my_thread.start()

当一个线程调用Lock对象的acquire()方法获得锁时,这把锁就进入“locked”状态。因为每次只有一个线程1可以获 得锁,所以如果此时另一个线程2试图获得这个锁,该线程2就会变为“blo同步阻塞状态。直到拥有锁的线程1调用锁的release()方法释放锁之后, 该锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。 进一步考虑 通过对公共资源使用互斥锁,这样就简单的到达了我们的目的,但是如果我们又遇到下面的情况:

  1. 遇到锁嵌套的情况该怎么办,这个嵌套是指当我一个线程在获取临界资源时,又需要再次获取;
  2. 如果有多个公共资源,在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源;

上述这两种情况会直接造成程序挂起,即死锁,下面我们会谈死锁及可重入锁RLock。

死锁的形成

如果有多个公共资源,在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,这会引起什么问题?

所谓死锁: 是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生 了死锁,这些永远在互相等待的进程称为死锁进程。 由于资源占用是互斥的,当某个进程提出申请资源后,使得有关进程在无外力协助下,永远分配不到必需的资源而无法继续运行,这就产生了一种特殊现象死锁。

    '''''
    Created on 2012-9-8
    @author: walfred
    @module: thread.TreadTest5
    '''   
    import threading  
    counterA = 0  
    counterB = 0  
    mutexA = threading.Lock()  
    mutexB = threading.Lock()  
    class MyThread(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
        def run(self):  
            self.fun1()  
            self.fun2()  
        def fun1(self):  
            global mutexA, mutexB  
            if mutexA.acquire():  
                print "I am %s , get res: %s" %(self.name, "ResA")  
                if mutexB.acquire():  
                    print "I am %s , get res: %s" %(self.name, "ResB")  
                    mutexB.release()  
            mutexA.release()   
        def fun2(self):  
            global mutexA, mutexB  
            if mutexB.acquire():  
                print "I am %s , get res: %s" %(self.name, "ResB")  
                if mutexA.acquire():  
                    print "I am %s , get res: %s" %(self.name, "ResA")  
                    mutexA.release()  
            mutexB.release()   
    if __name__ == "__main__":  
        for i in range(0, 100):  
            my_thread = MyThread()  
            my_thread.start()

代码中展示了一个线程的两个功能函数分别在获取了一个竞争资源之后再次获取另外的竞争资源,我们看运行结果:

I am Thread-1 , get res: ResA
I am Thread-1 , get res: ResB
I am Thread-2 , get res: ResAI am Thread-1 , get res: ResB

可以看到,程序已经挂起在那儿了,这种现象我们就称之为”死锁“。

避免死锁主要方法就是:正确有序的分配资源,避免死锁算法中最有代表性的算法是Dijkstra E.W 于1968年提出的银行家算法

 可重入锁RLock

考虑这种情况:如果一个线程遇到锁嵌套的情况该怎么办,这个嵌套是指当我一个线程在获取临界资源时,又需要再次获取。

根据这种情况,代码如下:

    '''''
    Created on 2012-9-8
    @author: walfred
    @module: thread.ThreadTest6
    '''  
    import threading  
    import time  
    counter = 0  
    mutex = threading.Lock()  
    class MyThread(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
        def run(self):  
            global counter, mutex  
            time.sleep(1);  
            if mutex.acquire():  
                counter += 1  
                print "I am %s, set counter:%s" % (self.name, counter)  
                if mutex.acquire():  
                    counter += 1  
                    print "I am %s, set counter:%s" % (self.name, counter)  
                    mutex.release()  
                mutex.release()  
    if __name__ == "__main__":  
        for i in range(0, 200):  
            my_thread = MyThread()  
            my_thread.start()

这种情况的代码运行情况如下:

I am Thread-1, set counter:1

之后就直接挂起了,这种情况形成了最简单的死锁。

那有没有一种情况可以在某一个线程使用互斥锁访问某一个竞争资源时,可以再次获取呢?在Python中为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。这个RLock内部维护着一个Lock和一个counter变量,counter 记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上 面的例子如果使用RLock代替Lock,则不会发生死锁:mutex = threading.RLock()

 使用Condition实现复杂同步

目前我们已经会使用Lock去对公共资源进行互斥访问了,也探讨了同一线程可以使用RLock去重入锁,但是尽管如此我们只不过才处理了一些程序中简单的同步现象,我们甚至还不能很合理的去解决使用Lock锁带来的死锁问题。所以我们得学会使用更深层的解决同步问题。

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。

使用Condition的主要方式为:线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改 变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

下面我们通过很著名的“生产者-消费者”模型来来演示下,在Python中使用Condition实现复杂同步。

    '''''
    Created on 2012-9-8
    @author: walfred
    @module: thread.TreadTest7
    '''  
    import threading  
    import time  
    condition = threading.Condition()  
    products = 0  
    class Producer(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
        def run(self):  
            global condition, products  
            while True:  
                if condition.acquire():  
                    if products < 10:  
                        products += 1;  
                        print "Producer(%s):deliver one, now products:%s" %(self.name, products)  
                        condition.notify()  
                    else:  
                        print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)  
                        condition.wait();  
                    condition.release()  
                    time.sleep(2)  
    class Consumer(threading.Thread):  
        def __init__(self):  
            threading.Thread.__init__(self)  
        def run(self):  
            global condition, products  
            while True:  
                if condition.acquire():  
                    if products > 1:  
                        products -= 1  
                        print "Consumer(%s):consume one, now products:%s" %(self.name, products)  
                        condition.notify()  
                    else:  
                        print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)  
                        condition.wait();  
                    condition.release()  
                    time.sleep(2)  
    if __name__ == "__main__":  
        for p in range(0, 2):  
            p = Producer()  
            p.start()  
        for c in range(0, 10):  
            c = Consumer()  
            c.start()

代码中主要实现了生产者和消费者线程,双方将会围绕products来产生同步问题,首先是2个生成者生产products ,而接下来的10个消费者将会消耗products,代码运行如下:

Producer(Thread-1):deliver one, now products:1
Producer(Thread-2):deliver one, now products:2
Consumer(Thread-3):consume one, now products:1
Consumer(Thread-4):only 1, stop consume, products:1
Consumer(Thread-5):only 1, stop consume, products:1
Consumer(Thread-6):only 1, stop consume, products:1
Consumer(Thread-7):only 1, stop consume, products:1
Consumer(Thread-8):only 1, stop consume, products:1
Consumer(Thread-10):only 1, stop consume, products:1
Consumer(Thread-9):only 1, stop consume, products:1
Consumer(Thread-12):only 1, stop consume, products:1
Consumer(Thread-11):only 1, stop consume, products:1

另外:Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个 RLock;除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire 内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。

使用Event实现线程间通信

使用threading.Event可以实现线程间相互通信,之前我们已经初步实现了线程间通信的基本功能,但是更为通用的一种做法是使用threading.Event对象。

使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为 False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会 通知所有等待状态的线程恢复运行。

    '''''
    Created on 2012-9-9
    @author: walfred
    @module: thread.TreadTest8
    '''  
    import threading  
    import time  
    class MyThread(threading.Thread):  
        def __init__(self, signal):  
            threading.Thread.__init__(self)  
            self.singal = signal  
        def run(self):  
            print "I am %s,I will sleep ..."%self.name  
            self.singal.wait()  
            print "I am %s, I awake..." %self.name  
    if __name__ == "__main__":  
        singal = threading.Event()  
        for t in range(0, 3):  
            thread = MyThread(singal)  
            thread.start()  
        print "main thread sleep 3 seconds... "  
        time.sleep(3)  
        singal.set()

运行效果如下:

I am Thread-1,I will sleep ...
I am Thread-2,I will sleep ...
I am Thread-3,I will sleep ...
main thread sleep 3 seconds...
I am Thread-1, I awake...I am Thread-2, I awake...
I am Thread-3, I awake...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK