22

如何快速的随机从 map 中返回一个值

 3 years ago
source link: https://studygolang.com/articles/28467
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

从一个切片中随机的获取一个值是非常简单的,可以使用 map.Int(n) ,这样就可以从[0,n)中随机的返回一个值,从而可以从切片中随机的返回一个元素

func randSliceValue(xs []string) string {
	return xs[rand.Intn(len(xs))]
}
复制代码

这个方法是非常好的,因为耗费的时间和内存都是 O(1) 的。但是呢,对于 map 而言,没有简单并且等效的方式来做到随机从 map 中获取一个值。我们获取 map 中的数据有两种方式,取值(e.g. m["foo"])或者 range 。所以,如何根据这两种方式和一个随机的值来在 map 中随机的返回一个 key 呢?(注意,既然可以随机的获取一个 key,那么根据此 key 获得的 value 也相当于是随机获得的了)

一种方式是我们可以把 map 展开,然后复用切片中随机获取一个值的方法

func randMapKey(m map[string]int) string {
	mapKeys = make([]string, 0, len(m)) // pre-allocate exact size
	for key := range m {
		mapKeys = append(mapKeys, key)
	}
	return mapKeys[rand.Intn(len(mapKeys))]
}
复制代码

很容易就可以看出来这个确实可以随机的返回 key,但是这种简单的代价却是以性能为代价:时间和空间复杂度均是 O(n)

一个表现稍微好的一个方式是使用 rangerange 会访问每对 key/value 一次,并且每次使用 range 的顺序并不会一致(译者注:在调用 mapiterinit 会取一个随机值来决定访问的起始位置)。我们可以把随机取得的值作为一个计数器,在每次遍历完一对键值对之后进行减一,最后在值为 0 的时候返回此时遍历的键即可:

func randMapKey(m map[string]int) string {
	r := rand.Intn(len(m))
	for k := range m {
		if r == 0 {
			return k
		}
		r--
	}
	panic("unreachable")
}
复制代码

这个操作的时间复杂度为 O(n) ,空间复杂度为 O(1) ,在大多数情况下都是可以接受的。但是我们会遇到一个新的问题,我们无法完成一个通用的函数来进行随机取值的操作!

第二部分:深入理解

unsafe 可以允许我们转换 go 的类型。也就是说我们可以把一个变量看做类型 X,即使它的类型原来是 Y。另外比较友好的情况是这种操作不仅仅可以用来处理 go 的内置类型(比如说 strings、slices、maps),也可以用来处理用户自定义的类型。需要注意的是这是一种双向的操作,我们不仅仅可以把 []byte 转化成其他类型,也可以把其他类型转化为 []byte 类型。

我们可以使用这种技巧来操作对象底层的内存,就如我在 一些包 中做的一样。但是呢,我并不赞同这样做。今天我们只会使用一个方向。具体点就是我们将把 map 转化为局部复制为 runtime definition .(不要关闭这个页面,后续会用到)。然后我们就可以直接的获取内部的数据,并且有希望实现一个快速的获取 map 中随机一个 key 的方法。

查看 map.go 文件,我们可以看到一种类型为 hiter 的结构体,并且有方法 mapiterinitmapinternext 。这就是当你执行 range 来遍历一个 map 进行的操作。 hiter 是一个迭代器, mapinitinit 用来初始化这个迭代器, mapiternext 用于寻找下个迭代器迭代的位置。现在我有一个计划:

  1. 把 map 转化为 hmap
  2. 使用方法 new(hiter) 来创建一个迭代器,并且使用 mapiterinit 来初始化
  3. 产生一个处于[0, len(m) ]范围内的随机数字 n
  4. 执行 mapiternext 方法 n 次
  5. 在执行到 n 为 0 的位置,返回当前的 key

这种做法的一个优势就是我们不需要理解方法 mapiterinitmapiternext 的细节。我们仅仅在 hashmap.go 中复制相关代码即可。(实际上编译器会警告缺少 runtime 相关的方法,如 atomic.Or8 ,我们把这些删除即可)。之后,我们仅仅需要完成关于转变变量类型和从 interface{} 中变换回来的代码就好了。

// runtime representation of an interface{}
type emptyInterface struct {
	typ unsafe.Pointer
	val unsafe.Pointer
}

func mapTypeAndValue(m interface{}) (*maptype, *hmap) {
	ei := (*emptyInterface)(unsafe.Pointer(&m))
	return (*maptype)(ei.typ), (*hmap)(ei.val)
}

func iterKey(it *hiter) interface{} {
	ei := emptyInterface{
		typ: unsafe.Pointer(it.t.key), // it.t is the maptype
		val: it.key,
	}
	return *(*interface{})(unsafe.Pointer(&ei))
}
复制代码

现在,我们终于可以实现一个通用的 randMapKey 方法,并且时间复杂度为 O(n) 、空间复杂度为 O(1)

func randMapKey(m interface{}) interface{} {
	// get map internals
	t, h := mapTypeAndValue(m)
	// initialize iterator
	it := new(hiter)
	mapiterinit(t, h, it)
	// advance iterator a random number of times
	r := rand.Intn(h.count) // h.count == len(m)
	for i := 0; i < r; i++ {
		mapiternext(it)
	}
	// return current iterator key as an interface{}
	return iterKey(it)
}
复制代码

这个是可以运行成功的,但是也伴随着相应的代价。一个原因就是使用了 unsafe 包,正如其名,是不安全的。

第三部分:可以的

实际上我们是可以花费常数时间(虽然这个常数时间可以有点大)复杂度来获取 map 的随机 key 的。但是呢,为了达到常数时间复杂度的地步,我们需要更深的理解 go 中的 map 是如何运作的。

hmap 实际上就是一个 bucket 数组。bucket 的数量是 2 的 h.B 次方。由于随着 map 的增长,bucket 的数量也是变化的,所以 bucket 数组在 hmap 中是用作 unsafe.Pointer 来记载的。每个 bucket 有 8 个 cell (键值对),其中有些 cell 可能没有有效的数据。并且有一个 tophash 数组,用于存储每个空间部分的 key 的 hash 或者此键值对的状态。bucket 的结构体类型可能会让人疑惑,整体如下:

const bucketCnt = 8 // number of cells per bucket

type bmap struct {
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt values.
	// NOTE: packing all the keys together and then all the values together makes the
	// code a bit more complicated than alternating key/value/key/value/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}
复制代码

正如如你看见的,里面并没有键值对的空间。这是因为键值对的类型并不知道,编译器不知道 bmap 的大小。如果 map 的类型是 map[string]int ,那么你可以认为 bmap 会和下方代码一样:

type bmap struct {
	tophash  [bucketCnt]uint8
	keys     [bucketCnt]string
	values   [bucketCnt]int
	overflow *bmap
}
复制代码

要插入一个 key/value ,我们首先需要计算的是 key 的 hash, 其类型是 uintptr 。(每个类型在运行时都有一个对应的 hash 函数)。然后我们需要决定使用哪个 bucket 。由于 bucket 的数量永远都是 2的幂,所以我们可以使用key的经过 hash 函数计算后的二进制数后 h.B 位来决定使用哪个bucket:

// h is an hmap, t is a maptype
hash := t.key.alg.hash(key, uintptr(h.hash0))
bucketIndex := hash & (uintptr(1) << h.B - 1)
复制代码

在选定了 bucket 之后,我们需要找到一个可用的 cell。我们遍历每个 cell,检查 tophash 数组中的每个值。 tophash 数组中存储的是存储于此 bucket 中每个键值对中键的hash运行结果的二进制数的前8位,在 tophash 中元素值为 0 的时候,表示此 cell 是空的,没有存储任何值。当发现空的 cell 的时候,我们可以用此 cell 来存键值对,并且修改 tophash 数组对应位置的值。下面就是简化后的算法:

// calculate tophash
top := uint8(hash >> (unsafe.Sizeof(hash)*8 - 8))

// seek to offset of bucketIndex in h.buckets
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucketIndex*uintptr(t.bucketsize)))

// iterate through the cells of b. If a tophash matches top, it means we've
// already inserted a value with this key, so overwrite it. Otherwise, store
// the key/value in the first empty cell.
for i := 0; i < bucketCnt; i++ {
	if b.tophash[i] == top {
		// overwrite the existing value
		// [ code omitted ]
		return
	} else if b.tophash[i] == 0 {
		// insert the new key/value
		// [ code omitted ]
		b.tophash[i] = top
		h.count++
		return
	}
}
复制代码

正如你期待的那样,为了获得 map 中的值,我们仅仅是重复了此过程。但是这个过程是返回 cell 中的数据而不是往 cell 中插入或者覆盖。

好了,现在我们可以改进我们的 randMapKey 函数了。回顾从切片中随机返回一个元素的方法,仅仅获得一个随机的索引即可。如果你仔细瞅瞅,你就会发现 h.buckets 就是一个切片。是一个连续的 key/value 数组。最大的区别是一些 cell 中是空的。所以我们需要获得一个随机的索引,并且要避免这些空的 cell。

一个简单的方法就是当遍历到了空的 cell 的时候略过去就好了,直到找到一个非空的 cell。这个就是 mapinterinit 做的。但是这种方法有一个严重的缺陷,可以想想如下这种状况,一个 bucket 只有两个有效的 cell :

[foo] [bar] [---] [---] [---] [---] [---] [---]
复制代码

如果我们随机的选择 [0, 8) 中的一个索引,会发生什么?如果选择索引 0, 那么我们会获得 foo ,如果选择索引1, 我们会获得 bar 。但是如果我们选择其他索引,我们会接下来的往下寻找,直到折回到索引 0, 然后获取到元素 foo 。也就是说即使我们使用一个随机的索引,取得元素 foo 的概率也会是获得元素 bar 的 7 倍。这个对于随机的获取一个元素的目的肯定是不可行的。

幸运的是,我们有一个替代的方案:随机的获取一个索引,如果那个 cell 是空的,我们再次随机的获取一个索引即可。平均下来,我们没获取到一个非空的 cell 需要的次数为 k/n ,其中 k 是 cell 的个数,n 是非空的 cell 的个数。当然,算法必须保证随机值是均匀分布的,这样每个索引的取值都是等可能的。

让我们使用这种算法来实现 randMapKey 函数。我们需要一个函数,可以从 cell 中获取到 key 的值。我也会创建一个 add 函数,是的算法更为可读:

func add(p unsafe.Pointer, x uintptr) unsafe.Pointer {
	return unsafe.Pointer(uintptr(p) + x)
}

func cellKey(t *maptype, b *bmap, i int) interface{} {
	// dataOffset is where the cell data begins in a bmap.
	const dataOffset = unsafe.Offsetof(struct {
		tophash [bucketCnt]uint8
		cells   int64
	}{}.cells)

	k := add(unsafe.Pointer(b), dataOffset+uintptr(i)*uintptr(t.keysize))
	if t.indirectkey {
		// if the map's key type is too big, a pointer will be stored in
		// the map instead of the actual data. In that case, we need to
		// dereference the pointer.
		k = *(*unsafe.Pointer)(k)
	}

	ei := emptyInterface{
		typ: unsafe.Pointer(t.key),
		val: k,
	}
	return *(*interface{})(unsafe.Pointer(&ei))
}

func randMapKey(m interface{}) interface{} {
	// get map internals
	t, h := mapTypeAndValue(m)
	numBuckets := 1 << h.B

	// loop until we hit a valid cell
	for {
		// pick random indices
		bucketIndex := rand.Intn(numBuckets)
		cellIndex := rand.Intn(bucketCnt)

		// lookup cell
		b := (*bmap)(add(h.buckets, uintptr(bucketIndex)*uintptr(t.bucketsize)))
		if b.tophash[cellIndex] == 0 {
			// cell is empty; try again
			continue
		}
		return cellKey(t, b, cellIndex)
	}
}
复制代码

这个是可以正确的运作的 我们就该到此结束吗?

第四部分:不

其实上述的代码是有问题的。例子的代码中只有整数 [0, 8),这个是没有冲突的。如果在 map 中不同的 key 的 hash 值是相同的呢?答案:使用额外的 bucket。如果你再看看 bmap 的定义你会发现,内部含有一个 overflow 的指针类型来指向另一个 bmap 。在发生冲突的时候,会分配一个新的 bucket,挂在之前的 bmap 的后面。在查询的过程中,我们首先在 bucket 中的每个 cell 找到一个匹配的,如果没有匹配的,就会通过 overflow 指针,在下一个 bucket 中查找,如此反复循环。

这个是如何影响 randMapKey 函数的呢?实际上影响不大。我们仅需增加一个维度即可。在之前,我们是从一个随机的 bucket 中随机选择一个 cell。现在,我们需要在一个 bucket 的链表中随机选择一个 bucket,然后在 bucket 中随机选择一个 cell。map 的可视化图像如下:

bucket0   bucket1   bucket2   bucket3
overflow0 [|||||||] [|||||||] [|||||||] [|||||||]
overflow1 [|||||||]           [|||||||]
overflow2                     [|||||||]
复制代码

之前,我们仅仅从第一行的四个 bucket 中进行选择。现在我们需要在 12 个 bucket 中进行选择,即使一些 bucket 并不存在。操作和之前的遇到空的 cell 相同,如果一个 bucket 并不存在,我们从新选择一个随机值再来一次就好了。

有一点比较烦人,就是事先我们并不知道链接起来的 bucket 有多少个。 hmap 没有一个参数表示链接起来的 bucket 最多有多少个可以用于我们相乘。我们需要自己计算这个值,而这个方法是遍历每个 bucket 然后顺着 overflow 指针不断搜索直到 overflow 指针是一个 nil 。这个比较耗费时间,但是我们没有其他方法了,代码如下:

func (b *bmap) overflow(t *maptype) *bmap {
	offset := uintptr(t.bucketsize)-unsafe.Sizeof(uintptr(0))
	return *(**bmap)(add(unsafe.Pointer(b), offset))
}

func maxOverflow(t *maptype, h *hmap) int {
	numBuckets := uintptr(1 << h.B)
	max := 0
	for i := uintptr(0); i < numBuckets; i++ {
		over := 0
		b := (*bmap)(add(h.buckets, i*uintptr(t.bucketsize)))
		for b = b.overflow(t); b != nil; over++ {
			b = b.overflow(t)
		}
		if over > max {
			max = over
		}
	}
	return max
}
复制代码

现在 randMapKey 函数如下:

func randMapKey(m interface{}) interface{} {
	// get map internals
	t, h := mapTypeAndValue(m)
	numBuckets := 1 << h.B
	numOver := maxOverflow(t, h) + 1 // add 1 to account for "base" bucket

	// loop until we hit a valid cell
loop:
	for {
		// pick random indices
		bucketIndex := rand.Intn(numBuckets)
		overIndex := rand.Intn(numOver)
		cellIndex := rand.Intn(bucketCnt)

		// seek to index in h.buckets
		b := (*bmap)(add(h.buckets, uintptr(bucketIndex)*uintptr(t.bucketsize)))

		// seek to index in overflow chain
		for i := 0; i < overIndex; i++ {
			b = b.overflow(t)
			if b == nil {
				// invalid bucket; try again
				continue loop
			}
		}

		// lookup cell
		if b.tophash[cellIndex] == 0 {
			// cell is empty; try again
			continue loop
		}
		return cellKey(t, b, cellIndex)
	}
}
复制代码

我们可以确定 对于含有 overflow 的 bucket 是可以运行的 . 我在担心是否还有第五部分的内容

第五部分:当然还有第五部分

(这是最后一部分,我保证)

go 中的 map 是优化的非常好的,其中有一个优化是"incremental copying"。简单来讲就是在一个 map 满了,当你尝试插入一个新的元素的时候,go 会立刻分配一个新的 bucket 数组(数组长度是之前的两倍) 来存储新的 key/value。但是呢,他不会把旧的 bucket 复制到新的 bucket;而是每当你插入或者删除元素的时候,此 bucket(以及后续的通过 overflow 连接的 bucket) 会被复制( evacuated )到新的数组中去。在所有的 bucket 都移动完毕之后, h.oldbuckets 会被置为 nil

我确定你看到了问题所在:直到现在,我们取的 cell 的值都是来自于 h.buckets 。为了覆盖所有的可能,我们当然也需要检查 h.oldbuckets 。有三点需要我们做出改变:

  1. 当选择一个了一个 bucket 的时候,我们检查下对应的 oldbucket 是否复制过了。如果没有,我们从 oldbucket 中选择值。
  2. 当我们在 oldbucket 中选择了一个值的时候,我们需要确定此值最终需要分配到哪个 bucket 中去。如果正好是之前选择的 bucket 的时候我们返回此值即可。如果不是那么就需要从头再来了(这个为了避免重合,因为 oldbucket 迁移到会到两个可能的位置)
  3. maxOverflow 需要返回 h.bucketsh.oldbuckets 中最长的 overflow

幸运的是,这个并不难实现,首先,修改 maxOverflow 函数:

func maxOverflow(t *maptype, h *hmap) int {
	numBuckets := uintptr(1 << h.B)
	max := 0
	for i := uintptr(0); i < numBuckets; i++ {
		over := 0
		b := (*bmap)(add(h.buckets, i*uintptr(t.bucketsize)))
		for b = b.overflow(t); b != nil; over++ {
			b = b.overflow(t)
		}
		if over > max {
			max = over
		}
	}

	// check oldbuckets too, if it exists
	if h.oldbuckets != nil {
		for i := uintptr(0); i < numBuckets/2; i++ {
			var over int
			b := (*bmap)(add(h.oldbuckets, i*uintptr(t.bucketsize)))
			if evacuated(b) {
				// we already counted this bucket in the first loop
				continue
			}
			for b = b.overflow(t); b != nil; over++ {
				b = b.overflow(t)
			}
			if over > max {
				max = over
			}
		}
	}
	return max
}
复制代码

然后呢,创建最新版本的 randMapKey 函数。当我们检查未迁移的 oldbucket 的时候,设置一个表示告诉我们检测 cell 的迁移位置:

func randMapKey(m interface{}) interface{} {
	// get map internals
	t, h := mapTypeAndValue(m)
	numBuckets := uintptr(1 << h.B)
	numOver := maxOverflow(t, h) + 1 // add 1 to account for "base" bucket

	// loop until we hit a valid cell
loop:
	for {
		// pick a random index
		bucketIndex := uintptr(rand.Intn(int(numBuckets)))
		overIndex := rand.Intn(numOver)
		cellIndex := rand.Intn(bucketCnt)

		// seek to index in h.buckets
		b := (*bmap)(add(h.buckets, bucketIndex*uintptr(t.bucketsize)))

		// if the oldbucket hasn't been evacuated, then we need to use that
		// pointer instead.
		usingOldBucket := false
		if h.oldbuckets != nil {
			numOldBuckets := numBuckets / 2
			oldBucketIndex := bucketIndex & (numOldBuckets - 1)
			oldB := (*bmap)(add(h.oldbuckets, oldBucketIndex*uintptr(t.bucketsize)))
			if !evacuated(oldB) {
				b = oldB
				usingOldBucket = true
			}
		}

		// seek to index in overflow chain
		for i := 0; i < overIndex; i++ {
			b = b.overflow(t)
			if b == nil {
				// invalid bucket; try again
				continue loop
			}
		}

		// lookup cell
		if b.tophash[cellIndex] == 0 {
			// cell is empty; try again
			continue loop
		}

		// grab key and dereference if necessary (same as cellKey)
		k := add(unsafe.Pointer(b), dataOffset+uintptr(cellIndex)*uintptr(t.keysize))
		if t.indirectkey {
			k = *(*unsafe.Pointer)(k)
		}

		// if this is an old bucket, we need to check whether this key is destined
		// for the new bucket. Otherwise, we will have a 2x bias towards oldbucket
		// values, since two different bucket selections can result in the same
		// oldbucket.
		if usingOldBucket {
			hash := t.key.alg.hash(k, uintptr(h.hash0))
			if hash&(numBuckets-1) != bucketIndex {
				// this key is destined for a different bucket
				continue loop
			}
		}

		// pack key into interface{} (same as cellKey)
		ei := emptyInterface{
			typ: unsafe.Pointer(t.key),
			val: k,
		}
		return *(*interface{})(unsafe.Pointer(&ei))
	}
}
复制代码

感觉并不坏,全部情况都考虑到了!如果你没看到最初的代码地址, 那么看可以看这个完整的代码

欢迎关注我们的微信公众号,每天学习Go知识

FveQFjN.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK