1

EM算法及简单例子python实现

 2 years ago
source link: https://cfonheart.github.io/2018/09/18/EM%E7%AE%97%E6%B3%95%E5%8F%8A%E7%AE%80%E5%8D%95%E4%BE%8B%E5%AD%90python%E5%AE%9E%E7%8E%B0/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

扔硬币问题em实现(二项分布)

具体的算法介绍不赘述了,看看网上的资料就可以了解大概意思了。

这里主要是两份代码的实现。一份是参考:https://blog.csdn.net/LilyNothing/article/details/64443563

扔硬币问题的极大似然地实现

问题描述:

定义问题: 假设用两种不同的硬币采样做了5次试验,每次试验丢10次,正反面结果表示在observations,
来判断这两个硬币正反面的概率分布(默认是二项分布,所以参数只有一个theta)
#硬币投掷结果
observations = np.array([[1,0,0,0,1,1,0,1,0,1],
[1,1,1,1,0,1,1,1,0,1],
[1,0,1,1,1,1,1,0,1,1],
[1,0,1,0,0,0,1,1,0,0],
[0,1,1,1,0,1,1,1,0,1]])
# reference: https://blog.csdn.net/LilyNothing/article/details/64443563
# 定义问题: 假设用两种不同的硬币采样做了5次试验,每次试验丢10次,正反面结果表示在observations,
# 来判断这两个硬币正反面的概率分布(默认是二项分布,所以参数只有一个theta)
# coding=utf-8
import numpy as np
import scipy
from scipy.stats import binom
def em_single(observations, priors):
EM算法的单次迭代
Arguments
------------
priors:[theta_A,theta_B]
observation:[m X n matrix]
Returns
---------------
new_priors:[new_theta_A,new_theta_B]
:param priors:
:param observations:
:return:
counts = {'A': {'H': 0, 'T': 0}, 'B': {'H': 0, 'T': 0}}
theta_A = priors[0]
theta_B = priors[1]
# E step
for observation in observations:
len_observation = len(observation)
num_heads = observation.sum() # 正面次数
num_tails = len_observation - num_heads # 反面次数
# 扔硬币服从二项分布,二项分布求解概率
contribution_A = binom.pmf(num_heads, len_observation, theta_A)
contribution_B = binom.pmf(num_heads, len_observation, theta_B)
weight_A = contribution_A / (contribution_A+contribution_B)
weight_B = contribution_B / (contribution_A+contribution_B)
# 更新在当前参数下A,B硬币产生的正反面次数
counts['A']['H'] += weight_A * num_heads
counts['A']['T'] += weight_A * num_tails
counts['B']['H'] += weight_B * num_heads
counts['B']['T'] += weight_B * num_tails
# M step
new_theta_A = counts['A']['H'] / (counts['A']['H'] + counts['A']['T'])
new_theta_B = counts['B']['H'] / (counts['B']['H'] + counts['B']['T'])
return [new_theta_A, new_theta_B]
def em(observations,prior,tol = 1e-6,iterations=10000):
:param observations :观测数据
:param prior:模型初值
:param tol:迭代结束阈值
:param iterations:最大迭代次数
:return:局部最优的模型参数
for i in range(iterations):
new_prior = em_single(observations, prior)
if abs(np.array(new_prior)-np.array(prior)).sum() < tol:
return [new_prior, i+1]
prior = new_prior.copy()
return [prior, iterations]
#硬币投掷结果
observations = np.array([[1,0,0,0,1,1,0,1,0,1],
[1,1,1,1,0,1,1,1,0,1],
[1,0,1,1,1,1,1,0,1,1],
[1,0,1,0,0,0,1,1,0,0],
[0,1,1,1,0,1,1,1,0,1]])
# 1. 参数初始化
theta_0 = 0.3
theta_1 = 0.9
print(em(observations, [theta_0, theta_1]))

学生身高采样em算法实现(高斯分布)

问题描述:

从学校中抽取了n个人的身高,在统计的时候不知道哪些身高是女生还是男生的,只知道男生女生各自的身高都服从高斯分布。需要根据采样结果估计男生女生的高斯分布参数

em算法流程

  1. 初始化男生的高斯分布参数(loc1, scale1), 女生的高斯分布参数(loc2, scale2) 【loc: 均值, scale: 标准差】
  2. 根据当前的高斯分布参数来求解这些身高属于各自分布的概率序列 menpro_list , womenpro_list,保证任意 i , mepro_list[i]+womenpro_list[i] = 1
  3. 根据这些身高所属分布的概率通过极大似然求解更优的高斯分布参数
  4. 如果新的参数与上一步的参数相差在tol的可忍受范围内,就确定这是最优解,否则重复2,3步,至多重复比如1000次。

该算法的实现过程中,数据是我手动高斯采样的结果,我以男生N~(1.70, 0.05^2) , 女生 N~(1.60, 0.1^2)各自采样了500人。

得到的身高分布如图所示

最后的运行结果基本和抽样采取的参数一致:

([[1.7028865487747595, 0.04686573049257553], [1.5892580182749219, 0.09871931155001457]], 40)
# coding=utf-8
from scipy.stats import norm
import numpy as np
import matplotlib.pyplot as plt
# section 1 生成男生和女生的数据各500人
men = np.random.normal(1.70, 0.05, 500)
women = np.random.normal(1.60, 0.1, 500)
plt.figure(21)
ax1 = plt.subplot(211)
ax2 = plt.subplot(212)
ax1.set_title('distribution of men')
ax1.hist(men, bins=60, histtype="stepfilled",normed=True,alpha=0.6, color='r')
ax2.set_title('distribution of women')
ax2.hist(women, bins=60, histtype="stepfilled",normed=True,alpha=0.6)
plt.savefig('distribution.png')
plt.show()
# 通过em算法来计算男女生的高斯分布参数
def em_single(observations, param_a, param_b):
:param observations: 抽样得到的身高序列
:param param_a: 男生分布的参数(loc, scale)
:param param_b: 女生分布的参数(loc, scale)
:return:
menpro_list = [] # E 步估计应该作为该位置男生的身高的概率
womenpro_list = [] # E 步估计应该作为该位置女生的身高的概率
for i, v in enumerate(observations):
p1 = norm.pdf(v, loc=param_a[0], scale=param_a[1])
p2 = norm.pdf(v, loc=param_b[0], scale=param_b[1])
# print(p1, p2)
menpro_list.append(p1/(p1+p2))
womenpro_list.append(p2/(p1+p2))
sum1 = 0 ; sum2 = 0
for i, v in enumerate(observations):
sum1 += menpro_list[i]*v
sum2 += womenpro_list[i]*v
loc1 = sum1 / sum(menpro_list)
loc2 = sum2 / sum(womenpro_list)
scale1 = 0. ; scale2 = 0.
for i, v in enumerate(observations):
scale1 += menpro_list[i]*(v-loc1)*(v-loc1)
scale2 += womenpro_list[i]*(v-loc2)*(v-loc2)
# print(sum(menpro_list),sum(womenpro_list))
scale1 = np.sqrt(scale1/sum(menpro_list)) # 标准差
scale2 = np.sqrt(scale2/sum(womenpro_list)) # 标准差
print([[loc1, scale1], [loc2, scale2]])
return [[loc1, scale1], [loc2, scale2]]
def em(observations, param_a, param_b, tol = 1e-6, iterations=1000):
for iter in range(iterations):
[param_a_new, param_b_new] = em_single(observations, param_a, param_b)
if abs(param_a_new[0]-param_a[0])+abs(param_a_new[1]-param_a[1]) < tol \
and abs(param_b_new[0]-param_b[0])+abs(param_b_new[1]-param_b[1]) < tol:
return ([param_a_new, param_b_new], iter+1)
param_a = param_a_new.copy()
param_b = param_b_new.copy()
return ([param_a, param_b], iterations)
observations = []
for v in men:
observations.append(v)
for v in women:
observations.append(v)
param_a = [1.7, 1]
param_b = [1.4, 1]
print(em(observations, param_a, param_b))

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK