Go Map深度解析以便实现随机获取Map值(译) 翻译

saltshaker 4月前 548

为了私利和兴趣破解Go Map

原文地址:https://lukechampine.com/hackmap.html

推荐阅读:https://www.cnblogs.com/qcrao-2018/p/10903807.html(有助于更好的理解Golang map)

上周,我不负责任的花了大量工作时间,去深入了解了一下Go map实现的奥秘。我的动机很简单:像选择随机slice值一样简单地选择随机map值为了实现这个目标,我深陷runtime包中, 最终我成功了,并对这个不起眼的map有了新的认识。

这是一篇非常长的文章,详细描述了那段旅程. 假设你对unsafe包有一定的了解,若果没有可以花5分钟读一下godoc page. 假如你不关心整个过程,你可以直接查看成品:randmap - Truly random map access and iteration for Go

1: 问题

正如你所知道的,选择一个随机值从slice中是很容易的,通过 Intn (从 math/rand包中),它返回一个范围为[0,n]的随机整数:
func randSliceValue(xs []string) string {
        return xs[rand.Intn(len(xs))]
}
这很好,因为它在恒定的时间和空间中运行。但是map没有明显的对应关系,这是因为我们有两种方式访问map的数据: 查找(例如 m["foo"] range 所以,只给定这些访问方法和一个随机整数,我们如何选择一个随机Key? (注意,如果我们可以选择一个随机map的key,我们就可以简单地选择一个随机map的value。)
一种方式可以循环map; 把key放到slice中:
func randMapKey(m map[string]int) string {
        mapKeys = make([]string, 0, len(m)) // pre-allocate exact size
        for key := range m {
                mapKeys = append(mapKeys, key)
        }
        return mapKeys[rand.Intn(len(mapKeys))]
}
很容易验证这段代码确实会产生随机key。 虽然简单但是有代价:O(n) 的时间复杂度 和 O(n) 空间复杂度。

稍微好一点的方法依赖于range 不指定顺序的情况下range 将只访问一次 key/value。  知道了这一点,我们可以使用我们的随机整数作为计数器,在每次迭代之后递减:

func randMapKey(m map[string]int) string {
        r := rand.Intn(len(m))
        for k := range m {
                if r == 0 {
                        return k
                }
                r--
        }
        panic("unreachable")
}
这个程序运行的时间复杂度为O(n) 空间复杂度 O(1) ,这是可以接受的。 但是有个新的问题:这个函数不通用! 对于“扁平化”方法,我们可以使用反射使其通用化,感谢 MapKeys 方法:
func randMapKey(m interface{}) interface{} {
        mapKeys := reflect.ValueOf(m).MapKeys()
        return mapKeys[rand.Intn(len(mapKeys))].Interface()
但是反射并没有为我们提供一个通用的 range 因此,我们不能以通用的方式实现另一种“迭代”方法。
除非…
除非我们使用 unsafe

2: 深陷其中

除此之外, unsafe 允许我们跳过 Go的类型系统。也就是说,它允许我们将类型X的值看作实际上是类型Y的值 最美妙的是,这不仅适用于用户定义的类型,也适用于Go的内置类型(如string、slice和map)。 注意,这是双向的:我们可以将[]byte转换成其他类型并对其进行修改,并且我们也可以告诉Go任意对象是[]byte,Go会相信我们的!
我们可以使用这个技巧访问对象的底层内存, 我在我的 few 项目用到了。我们将把一个map(任何类型)转换为其 运行时定义(打开这个链接,它将是一个有用的参考)的本地副本。 然后,我们可以直接访问它的数据,希望能设计出一种选择随机key的有效方法。
查看 hashmap.go 我们看到了一个 hiter 类型还有 mapiterinit 和 mapiternext方法。当你range map的时候会运行这些代码。 hiter 是一个迭代器; mapiterinit 初始化它, mapiternext 寻找下一次迭代位置。所以现在我有了个计划:
  1. 1. 转换map为一个 hmap
  2. 2. 使用 new(hiter) 创建一个迭代器并且使用 mapiterinit 进行初始化
  3. 3. 使用range [0,len(m)] 生成随机数n
  4. 4. 运行 mapiternext n
  5. 5. 返回迭代器当前位置的key
这种方法的好处是,我们不需要了解 mapiterinit 和 mapiternext 的具体细节; 我们可以复制所有相关的代码从 hashmap.go  ( 实际上,编译器会报缺少一些运行时方法,比如 atomic.Or8,所以我们必须把这些东西拿出来,以避免编译器报错) 然后, 我们只是需要更多的代码来转换 interface{} 的值。明确的,我们想通过我们的randMapKey 函数 interface{} 提取map 类型和值, 并且,我们希望将最后的迭代器位置打包到调用者可以使用的 interface{} 中。这些辅助函数非常容易实现,如果你熟悉的 interface internals 的话:
// runtime representation of an interface{}
type emptyInterface struct {
        typ unsafe.Pointer
        val unsafe.Pointer
}
func mapTypeAndValue(m interface{}) (*maptype, *hmap) {
        ei := (*emptyInterface)(unsafe.Pointer(&m))
        return (*maptype)(ei.typ), (*hmap)(ei.val)
}
func iterKey(it *hiter) interface{} {
        ei := emptyInterface{
                typ: unsafe.Pointer(it.t.key), // it.t is the maptype
                val: it.key,
        }
        return *(*interface{})(unsafe.Pointer(&ei))
现在我们终于准备好实现一个通用的 randMapKey (O(n) 时间复杂度 、O(1) 空间复杂度):
func randMapKey(m interface{}) interface{} {
        // get map internals
        t, h := mapTypeAndValue(m)
        // initialize iterator
        it := new(hiter)
        mapiterinit(t, h, it)
        // advance iterator a random number of times
        r := rand.Intn(h.count) // h.count == len(m)
        for i := 0; i < r; i++ {
                mapiternext(it)
        }
        // return current iterator key as an interface{}
        return iterKey(it)
}
它工作了,但要付出了代价。 unsafe被称为不安全是有原因的——实际上有很多原因,但这里的原因是我们假设了Go的map类型的底层结构和操作。如果Go 1.8 向  hmap  、 maptype 或者  hiter 添加一个字段,它可能完全破坏我们的代码。实际上,Go 1.8 添加了一个字段到 hmap 所以我们的代码必须在Go 1.8 发布后立即更新。这对于发布我们的 randMapKey 包有非常大的影响,因为我们只能假设它的一些用户运行在Go 1.7 上,而另一些用户运行在Go 1.8 上。因此,使用 build tags 来确保我们的包不会在用户升级他们的Go版本后不能使用,这一点非常重要。
我们现在可以收工了,但是有件事在困扰着我……我们能做得比O(n)时间复杂度更好吗?

3: 是

事实上,我们可以在常数时间内完成, 就像slice一样(尽管常数要大一些)。但为了达到这个目标,我们需要更好地理解Go maps是如何工作的。
一个 hmap 是一个buckets数组。 buckets的数量总是2的幂次(具体来说,是1 << h.B)。  由于bucket的数量随着map的增长而变化,所以bucket数组表示为 unsafe.Pointer 在 hmap 结构体中。 每个bucket包含8个“cell”(键/值对),其中可能包含有效数据,也可能不包含有效数据,每个cell有一个tophash值,用于标识其内容。bucket的结构定义有点混乱;它是这样的:
const bucketCnt = 8 // number of cells per bucket
type bmap struct {
        tophash [bucketCnt]uint8
        // Followed by bucketCnt keys and then bucketCnt values.
        // NOTE: packing all the keys together and then all the values together makes the
        // code a bit more complicated than alternating key/value/key/value/... but it allows
        // us to eliminate padding which would be needed for, e.g., map[int64]int8.
        // Followed by an overflow pointer.
}
正如你所看到的,实际的key和value数据没有在结构中表示!这是因为key和value的类型是未知的,所以编译器不知道bmap的大小。 假如map是这样的: map[string]int, 你可以猜到 bmap 像这样:
type bmap struct {
        tophash  [bucketCnt]uint8
        keys     [bucketCnt]string
        values   [bucketCnt]int
        overflow *bmap
插入一个 key/value 键值对,我们首先计算key的哈希值,它表示为 uintptr。(运行时中的每种类型都有一个关联的哈希函数。)  然后我们需要决定使用哪个bucket 因为bucket的数量总是2的幂数,所以我们可以使用一个简单的位掩码:
// h is an hmap, t is a maptype
hash := t.key.alg.hash(key, uintptr(h.hash0))
bucketIndex := hash & (uintptr(1) << h.B - 1)
在选择bucket之后,我们需要找到一个打开的cell。我们遍历每个cell,检查 tophash 的值。tophash 包含存储在cell中的key的哈希值的前8位,特殊情况下 tophash == 0 表示一个空cell。找到空cell后,我们将key和value存储在那里,并相应地设置tophash。下面是(大大简化的)算法:
// calculate tophash
top := uint8(hash >> (unsafe.Sizeof(hash)*8 - 8))
// seek to offset of bucketIndex in h.buckets
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucketIndex*uintptr(t.bucketsize)))
// iterate through the cells of b. If a tophash matches top, it means we've
// already inserted a value with this key, so overwrite it. Otherwise, store
// the key/value in the first empty cell.
for i := 0; i < bucketCnt; i++ {
        if b.tophash[i] == top {
                // overwrite the existing value
                // [ code omitted ]
                return
        } else if b.tophash[i] == 0 {
                // insert the new key/value
                // [ code omitted ]
                b.tophash[i] = top
                h.count++
                return
        }
}
正如您可能期望的那样,为了检索map值,我们只是重复这个过程,但是我们返回cell的数据,而不是覆盖/插入它。
好了, 现在我们可以改善我们的 randMapKey 函数了。回想一下,选择一个随机slice元素是很容易的;仅仅需要选择个随机索引。好吧,随便一看, h.buckets 本质上是一个slice; 他是一个连续的key/value cells。最大的区别是有的cell是空的。所以我们想随机选择一个索引,就要避免空cell。
一种解决方案是,如果我们遇到一个空cell,则简单地向前迭代,直到找到一个非cell为止。这就是 mapiterinit 所做的!但这种方法有一个严重的缺陷。 考虑一下一个bucket包含两个有效cell的情况:
[foo] [bar] [---] [---] [---] [---] [---] [---]
如果我们选择一个随机索引[0,8]并跳过空cell,发生什么呢? 假如我们选择 0,我们将得到 foo假如我们选择1,我们将得到bar 但如果我们选择任何其他索引,我们将继续前进,将绕到索引0,并落在在 foo上!因此,即使我们使用随机索引,我们在foo上的可能性也比bar高7倍。这对我们的目的来说绝对是不可接受的。
幸运的是,还有另一种解决方案:我们选择一个随机索引,如果cell是空的,我们只需再次尝试一个新的随机索引。 平均而言,我们将在k/n次尝试中选择出一个有效的cell,其中k是cell的数量,n是非空cell的数量。 当然,这个算法要保证生成一个均匀随机的cell分布,因为每个索引被选中的概率是相等的。
新的 randMapKey 我们使用这个算法。 我们只需要一个辅助函数来从cell中提取key数据。我还将定义一个add函数,使指针在数学方面的操作更具可读性:
func add(p unsafe.Pointer, x uintptr) unsafe.Pointer {
        return unsafe.Pointer(uintptr(p) + x)
}
func cellKey(t *maptype, b *bmap, i int) interface{} {
        // dataOffset is where the cell data begins in a bmap.
        const dataOffset = unsafe.Offsetof(struct {
                tophash [bucketCnt]uint8
                cells   int64
        }{}.cells)
        k := add(unsafe.Pointer(b), dataOffset+uintptr(i)*uintptr(t.keysize))
        if t.indirectkey {
                // if the map's key type is too big, a pointer will be stored in
                // the map instead of the actual data. In that case, we need to
                // dereference the pointer.
                k = *(*unsafe.Pointer)(k)
        }
        ei := emptyInterface{
                typ: unsafe.Pointer(t.key),
                val: k,
        }
        return *(*interface{})(unsafe.Pointer(&ei))
}
func randMapKey(m interface{}) interface{} {
        // get map internals
        t, h := mapTypeAndValue(m)
        numBuckets := 1 << h.B
        // loop until we hit a valid cell
        for {
                // pick random indices
                bucketIndex := rand.Intn(numBuckets)
                cellIndex := rand.Intn(bucketCnt)
                // lookup cell
                b := (*bmap)(add(h.buckets, uintptr(bucketIndex)*uintptr(t.bucketsize)))
                if b.tophash[cellIndex] == 0 {
                        // cell is empty; try again
                        continue
                }
                return cellKey(t, b, cellIndex)
        }
}
太好了,它可以正常工作!我们做完了吗?

4: 不

上面的例子是有问题的, 例子中的map只包含整数[0,8],我知道它们在散列时不会发生碰撞。 那么,如果两个不同的key散列得到同一个值会发生什么? 答:overflow buckets。 如果你重新审视 bmap 的定义, 你会看到它有一个 overflow 指向另一个bmap。  在发生冲突时,将分配一个新bucket并将其链接到旧bucket上。然后,在查找期间,我们首先检查cell中的key是否与我们正在查找的key匹配,如果不匹配,我们将移动到下一个overflow bucket 并再次尝试。
这如何影响我们的 randMapKey 函数?事实上,不是很显著。我们只需要添加一个维度。之前,我们从随机bucket中选择一个随机cell。现在,我们需要从overflow链的随机bucket中选择一个随机cell。
           bucket0   bucket1   bucket2   bucket3
overflow0 [|||||||] [|||||||] [|||||||] [|||||||]
overflow1 [|||||||]           [|||||||]
overflow2                     [|||||||]
之前,我们只从最上面一行中选择4个bucket。现在,我们需要从整个网格中选择12个buckets,即使其中一些bucket不存在。这和之前的空cell是一样的; 如果一个overflow bucket不存在,我们就用一个新的随机数重新开始。
不过,有一个麻烦是,我们不知道预先有多少个overflow bucket hmap 不包含可以乘上的 maxOverflow 整数。相反,我们需要自己计算它,我们可以通过遍历每个bucket并跟踪它们的overflow指针来实现,直到overflow为nil为止。这增加了一些启动成本,但没有办法。代码如下:
func (b *bmap) overflow(t *maptype) *bmap {
        offset := uintptr(t.bucketsize)-unsafe.Sizeof(uintptr(0))
        return *(**bmap)(add(unsafe.Pointer(b), offset))
}
func maxOverflow(t *maptype, h *hmap) int {
        numBuckets := uintptr(1 << h.B)
        max := 0
        for i := uintptr(0); i < numBuckets; i++ {
                over := 0
                b := (*bmap)(add(h.buckets, i*uintptr(t.bucketsize)))
                for b = b.overflow(t); b != nil; over++ {
                        b = b.overflow(t)
                }
                if over > max {
                        max = over
                }
        }
        return max
现在, randMapKey 如下:
func randMapKey(m interface{}) interface{} {
        // get map internals
        t, h := mapTypeAndValue(m)
        numBuckets := 1 << h.B
        numOver := maxOverflow(t, h) + 1 // add 1 to account for "base" bucket
        // loop until we hit a valid cell
loop:
        for {
                // pick random indices
                bucketIndex := rand.Intn(numBuckets)
                overIndex := rand.Intn(numOver)
                cellIndex := rand.Intn(bucketCnt)
                // seek to index in h.buckets
                b := (*bmap)(add(h.buckets, uintptr(bucketIndex)*uintptr(t.bucketsize)))
                // seek to index in overflow chain
                for i := 0; i < overIndex; i++ {
                        b = b.overflow(t)
                        if b == nil {
                                // invalid bucket; try again
                                continue loop
                        }
                }
                // lookup cell
                if b.tophash[cellIndex] == 0 {
                        // cell is empty; try again
                        continue loop
                }
                return cellKey(t, b, cellIndex)
        }
我们可以确认这对于包含overflow bucket的map是有效的,我担心我还有第五章...

5: 当然有第五章

(这是最后一章,我保证)
Go map是高度优化的,其中一个优化称为“增量复制”。基本上来说,当一个map被填满,并且你试图插入一个新元素时,Go runtime将立即分配一个新的bucket数组(是旧数组的两倍大),在那里存储新的key/value。 但它不会复制旧bucket中的cell;相反,每次插入或删除一个元素时,它的bucket(以及任何链式overflow bucket)将首先被复制(“ evacuated”)到新内存中。 一旦所有的bucket被evacuated,h.oldbuckets将为nil。
我相信你已经看到了问题:到目前为止,我们只从 h.buckets 中选择cell。为了覆盖所有可能的值,我们还需要检查 h.oldbuckets 我们需要做三个改变:
  1. 1. 当我们选择一个bucket时,我们需要检查它对应的oldbucket是否已被复制。如果没有,我们需要从oldbucke中进行选择。
  2. 2. 如果我们从一个oldbucket中选择,我们需要检查cell最终将被疏散到哪里。如果它的目的地是我们最初选择的bucket,我们可以返回它。否则,再试一次。(这对于防止过度采样是必要的,因为每个旧oldbucket都会扩展到两个新bucket。)
  3. 3 maxOverflow 需要返回 h.buckets 和 h.oldbuckets 的最大值。
幸运的是,这些并不难实现。首先,我们将修改 maxOverflow
func maxOverflow(t *maptype, h *hmap) int {
        numBuckets := uintptr(1 << h.B)
        max := 0
        for i := uintptr(0); i < numBuckets; i++ {
                over := 0
                b := (*bmap)(add(h.buckets, i*uintptr(t.bucketsize)))
                for b = b.overflow(t); b != nil; over++ {
                        b = b.overflow(t)
                }
                if over > max {
                        max = over
                }
        }
        // check oldbuckets too, if it exists
        if h.oldbuckets != nil {
                for i := uintptr(0); i < numBuckets/2; i++ {
                        var over int
                        b := (*bmap)(add(h.oldbuckets, i*uintptr(t.bucketsize)))
                        if evacuated(b) {
                                // we already counted this bucket in the first loop
                                continue
                        }
                        for b = b.overflow(t); b != nil; over++ {
                                b = b.overflow(t)
                        }
                        if over > max {
                                max = over
                        }
                }
        }
        return max
最后,我们可以写一个最终版本的 randMapKey 当我们检查未疏散的oldbucket时,我们将设置一个标志,告诉我们检查cell的未来目的地:
func randMapKey(m interface{}) interface{} {
        // get map internals
        t, h := mapTypeAndValue(m)
        numBuckets := uintptr(1 << h.B)
        numOver := maxOverflow(t, h) + 1 // add 1 to account for "base" bucket
        // loop until we hit a valid cell
loop:
        for {
                // pick a random index
                bucketIndex := uintptr(rand.Intn(int(numBuckets)))
                overIndex := rand.Intn(numOver)
                cellIndex := rand.Intn(bucketCnt)
                // seek to index in h.buckets
                b := (*bmap)(add(h.buckets, bucketIndex*uintptr(t.bucketsize)))
                // if the oldbucket hasn't been evacuated, then we need to use that
                // pointer instead.
                usingOldBucket := false
                if h.oldbuckets != nil {
                        numOldBuckets := numBuckets / 2
                        oldBucketIndex := bucketIndex & (numOldBuckets - 1)
                        oldB := (*bmap)(add(h.oldbuckets, oldBucketIndex*uintptr(t.bucketsize)))
                        if !evacuated(oldB) {
                                b = oldB
                                usingOldBucket = true
                        }
                }
                // seek to index in overflow chain
                for i := 0; i < overIndex; i++ {
                        b = b.overflow(t)
                        if b == nil {
                                // invalid bucket; try again
                                continue loop
                        }
                }
                // lookup cell
                if b.tophash[cellIndex] == 0 {
                        // cell is empty; try again
                        continue loop
                }
                // grab key and dereference if necessary (same as cellKey)
                k := add(unsafe.Pointer(b), dataOffset+uintptr(cellIndex)*uintptr(t.keysize))
                if t.indirectkey {
                        k = *(*unsafe.Pointer)(k)
                }
                // if this is an old bucket, we need to check whether this key is destined
                // for the new bucket. Otherwise, we will have a 2x bias towards oldbucket
                // values, since two different bucket selections can result in the same
                // oldbucket.
                if usingOldBucket {
                        hash := t.key.alg.hash(k, uintptr(h.hash0))
                        if hash&(numBuckets-1) != bucketIndex {
                                // this key is destined for a different bucket
                                continue loop
                        }
                }
                // pack key into interface{} (same as cellKey)
                ei := emptyInterface{
                        typ: unsafe.Pointer(t.key),
                        val: k,
                }
                return *(*interface{})(unsafe.Pointer(&ei))
        }
考虑到这一点,还不算太糟!如果您错过了顶部的链接,完整的代码可以在这里找到
我希望你喜欢这个map类型的旅程。在以后的文章中,我们将讨论 randmap 的另一个很酷的方面:高效随机迭代。同时,请让我知道这个帖子还有什么需要改进的地方!

原文地址:https://lukechampine.com/hackmap.html


最后于 4月前 被愚人乙编辑 ,原因:
最新回复 (0)
    • 运维开源项目互助社区—致敬开源
      2
        立即登录 立即注册 
返回