5

[ Perl ] 多线程并发编程 - YEUNGCHIE

 3 years ago
source link: https://www.cnblogs.com/yeungchie/p/16147011.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

[ Perl ] 多线程并发编程

https://www.cnblogs.com/yeungchie/

记录一些常用的 模块 / 方法 。

使用模块 threads

use 5.010;
use threads;

sub func {
  my $id = shift;
  sleep 1;
  print "This is thread - $id\n";
}
  • new
sub start {
  my $id = shift;
  my $t = new threads \&func, $id;
  return $t;
}
  • create
sub start {
  my $id = shift;
  my $t = new threads \&func, $id;
  return $t;
}
  • async
sub start {
  my $id = shift;
  my $t = async { &func( $id ) };
  return $t;
}
  • 阻塞 join
&start( 'join' )->join;
say 'Done';

This is thread - join
Done
# 父线程被子线程阻塞,成功收尸。

  • 非阻塞 detach
&start( 'detach' )->detach;
say 'Done';

Done
# 由于非阻塞,父线程已经退出,子线程变成孤儿线程,无法收尸。

使用模块 threads::shared

use threads::shared;

标记共享变量

有几种不同的写法

  • 依次标记 :shared
my $scalar :shared;
my @array  :shared;
my %hash   :shared;
  • 批量标记 :shared
my ( $scalar, @array, %hash ) :shared;
  • 用函数标记 share()
my ( $scalar, @array, %hash );
share $scalar;
share @array;
share %hash;

克隆 shared_clone

向共享的变量中加入新的元素时,需要注意的地方。

my @newArray = qw( YEUNG CHIE 1 2 3 );
my $clone = shared_clone [@newArray];
push @array, $clone;
$hash{ keyName } = $clone;

lock

多个线程同时编辑一个共享变量时,需要注意的地方。

经典的取钱问题:
1 - 输出额度 $amount = 500
2 - func() 函数模拟取钱,每次取 300
3 - 当 $amount < 300 时,则无法取钱

  • 没加锁的情况
my $amount :shared = 500;

sub func {
    unless ( $amount < 300 ) {
        sleep 1;  # 睡眠一秒模拟延迟
        $amount -= 300;
    }
}

# 这里两个线程模拟,两次取钱同时进行
my $t1 = new threads \&func;
my $t2 = new threads \&func;
$t1->join;
$t2->join;

say $amount;

-100
# 结果被取了两次,剩余额度为 -100

  • 加了锁的情况

调整一下子函数 func(), 加个锁。

...
sub func {
    lock $amount;
    unless ( $amount < 300 ) {
        sleep 1;
        $amount -= 300;
    }
}
...

200
# 结果正确

使用模块 Thread::Queue

use Thread::Queue;
my $queue = new Thread::Queue;

入队 enqueue

my $var = 'YEUNG';
$queue->enqueue( $var );
$queue->enqueue( qw( CHIE 1 2 3 ) );

出队 dequeue

  • 默认出队一个项目
say $queue->dequeue;

YEUNG

  • 指定多个项目出队
say for $queue->dequeue( 3 );

CHIE
1
2

非阻塞出队 dequeue_nb

  • 如果是阻塞出队
my $queue = new Thread::Queue qw( YEUNG CHIE );
say while $_ = $queue->dequeue;

YEUNG
CHIE
# 程序会卡在这里,等待队列中新的项目加入

  • 使用非阻塞出队
my $queue = new Thread::Queue qw( YEUNG CHIE );
say while $_ = $queue->dequeue_nb;

YEUNG
CHIE

剩余 pending

pending 方法可以返回未出队的项目数量。

my $queue = new Thread::Queue qw( YEUNG CHIE );
say $queue->dequeue;
say $queue->pending;
say $queue->dequeue;
say $queue->pending;

YEUNG
1
CHIE
0

查看 peek

只是看看但是不出队。

my $queue = new Thread::Queue qw( YEUNG CHIE );
say $queue->peek;
say $queue->pending;
say $queue->peek( 2 );
say $queue->pending;

YEUNG
2
CHIE
2

入队结束 end

除了上面用 dequeue_nb 非阻塞出队,之外还可以用 end 方法来

my $queue = new Thread::Queue qw( YEUNG CHIE );
$queue->end;
say while $_ = $queue->dequeue;

# 这样虽然没有用 dequeue_nb 方法,程序也不会卡住了。

不过这个方法需要模块版本 >= 3.01,一般系统自带 Perl 是不支持的,但是我们也可以自己来实现这个效果:

  • 共享一个全局变量标记入队结束。

    my $endFlag :shared;
    
  • 生产者线程

    当入队结束时,$endFlag 赋值为真。

    $endFlag = 1;
    
  • 消费者线程

    循环操作非阻塞出队。

    while ( 1 ) {
        my $item = $queue->dequeue_nb;
        if ( defined $item ) {
            say $item;
        }
        else {
            # 当出队失败且入队结束时,退出循环
            last if $endFlag;
        }
    }
    

线程信号量

使用模块 Thread::Semaphore

use Thread::Semaphore;

使用模块 Thread::Pool

use Thread::Pool;

参考资料/拓展


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK