// Copyright 2019 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package sync import ( "sync/atomic" "unsafe" ) // poolDequeue is a lock-free fixed-size single-producer, // multi-consumer queue. The single producer can both push and pop // from the head, and consumers can pop from the tail. // // It has the added feature that it nils out unused slots to avoid // unnecessary retention of objects. This is important for sync.Pool, // but not typically a property considered in the literature. type poolDequeue struct { // headTail packs together a 32-bit head index and a 32-bit // tail index. Both are indexes into vals modulo len(vals)-1. // // tail = index of oldest data in queue // head = index of next slot to fill // // Slots in the range [tail, head) are owned by consumers. // A consumer continues to own a slot outside this range until // it nils the slot, at which point ownership passes to the // producer. // // The head index is stored in the most-significant bits so // that we can atomically add to it and the overflow is // harmless. headTail atomic.Uint64 // vals is a ring buffer of interface{} values stored in this // dequeue. The size of this must be a power of 2. // // vals[i].typ is nil if the slot is empty and non-nil // otherwise. A slot is still in use until *both* the tail // index has moved beyond it and typ has been set to nil. This // is set to nil atomically by the consumer and read // atomically by the producer. vals []eface } type eface struct { typ, val unsafe.Pointer } const dequeueBits = 32 // dequeueLimit is the maximum size of a poolDequeue. // // This must be at most (1<<dequeueBits)/2 because detecting fullness // depends on wrapping around the ring buffer without wrapping around // the index. We divide by 4 so this fits in an int on 32-bit. const dequeueLimit = (1 << dequeueBits) / 4 // dequeueNil is used in poolDequeue to represent interface{}(nil). // Since we use nil to represent empty slots, we need a sentinel value // to represent nil. type dequeueNil *struct{} func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { const mask = 1<<dequeueBits - 1 head = uint32((ptrs >> dequeueBits) & mask) tail = uint32(ptrs & mask) return } func (d *poolDequeue) pack(head, tail uint32) uint64 { const mask = 1<<dequeueBits - 1 return (uint64(head) << dequeueBits) | uint64(tail&mask) } // pushHead adds val at the head of the queue. It returns false if the // queue is full. It must only be called by a single producer. func (d *poolDequeue) pushHead(val any) bool { ptrs := d.headTail.Load() head, tail := d.unpack(ptrs) if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // Queue is full. return false } slot := &d.vals[head&uint32(len(d.vals)-1)] // Check if the head slot has been released by popTail. typ := atomic.LoadPointer(&slot.typ) if typ != nil { // Another goroutine is still cleaning up the tail, so // the queue is actually still full. return false } // The head slot is free, so we own it. if val == nil { val = dequeueNil(nil) } *(*any)(unsafe.Pointer(slot)) = val // Increment head. This passes ownership of slot to popTail // and acts as a store barrier for writing the slot. d.headTail.Add(1 << dequeueBits) return true } // popHead removes and returns the element at the head of the queue. // It returns false if the queue is empty. It must only be called by a // single producer. func (d *poolDequeue) popHead() (any, bool) { var slot *eface for { ptrs := d.headTail.Load() head, tail := d.unpack(ptrs) if tail == head { // Queue is empty. return nil, false } // Confirm tail and decrement head. We do this before // reading the value to take back ownership of this // slot. head-- ptrs2 := d.pack(head, tail) if d.headTail.CompareAndSwap(ptrs, ptrs2) { // We successfully took back slot. slot = &d.vals[head&uint32(len(d.vals)-1)] break } } val := *(*any)(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // Zero the slot. Unlike popTail, this isn't racing with // pushHead, so we don't need to be careful here. *slot = eface{} return val, true } // popTail removes and returns the element at the tail of the queue. // It returns false if the queue is empty. It may be called by any // number of consumers. func (d *poolDequeue) popTail() (any, bool) { var slot *eface for { ptrs := d.headTail.Load() head, tail := d.unpack(ptrs) if tail == head { // Queue is empty. return nil, false } // Confirm head and tail (for our speculative check // above) and increment tail. If this succeeds, then // we own the slot at tail. ptrs2 := d.pack(head, tail+1) if d.headTail.CompareAndSwap(ptrs, ptrs2) { // Success. slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } // We now own slot. val := *(*any)(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // Tell pushHead that we're done with this slot. Zeroing the // slot is also important so we don't leave behind references // that could keep this object live longer than necessary. // // We write to val first and then publish that we're done with // this slot by atomically writing to typ. slot.val = nil atomic.StorePointer(&slot.typ, nil) // At this point pushHead owns the slot. return val, true } // poolChain is a dynamically-sized version of poolDequeue. // // This is implemented as a doubly-linked list queue of poolDequeues // where each dequeue is double the size of the previous one. Once a // dequeue fills up, this allocates a new one and only ever pushes to // the latest dequeue. Pops happen from the other end of the list and // once a dequeue is exhausted, it gets removed from the list. type poolChain struct { // head is the poolDequeue to push to. This is only accessed // by the producer, so doesn't need to be synchronized. head *poolChainElt // tail is the poolDequeue to popTail from. This is accessed // by consumers, so reads and writes must be atomic. tail atomic.Pointer[poolChainElt] } type poolChainElt struct { poolDequeue // next and prev link to the adjacent poolChainElts in this // poolChain. // // next is written atomically by the producer and read // atomically by the consumer. It only transitions from nil to // non-nil. // // prev is written atomically by the consumer and read // atomically by the producer. It only transitions from // non-nil to nil. next, prev atomic.Pointer[poolChainElt] } func (c *poolChain) pushHead(val any) { d := c.head if d == nil { // Initialize the chain. const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d c.tail.Store(d) } if d.pushHead(val) { return } // The current dequeue is full. Allocate a new one of twice // the size. newSize := len(d.vals) * 2 if newSize >= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit } d2 := &poolChainElt{} d2.prev.Store(d) d2.vals = make([]eface, newSize) c.head = d2 d.next.Store(d2) d2.pushHead(val) } func (c *poolChain) popHead() (any, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. d = d.prev.Load() } return nil, false } func (c *poolChain) popTail() (any, bool) { d := c.tail.Load() if d == nil { return nil, false } for { // It's important that we load the next pointer // *before* popping the tail. In general, d may be // transiently empty, but if next is non-nil before // the pop and the pop fails, then d is permanently // empty, which is the only condition under which it's // safe to drop d from the chain. d2 := d.next.Load() if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { // This is the only dequeue. It's empty right // now, but could be pushed to in the future. return nil, false } // The tail of the chain has been drained, so move on // to the next dequeue. Try to drop it from the chain // so the next pop doesn't have to look at the empty // dequeue again. if c.tail.CompareAndSwap(d, d2) { // We won the race. Clear the prev pointer so // the garbage collector can collect the empty // dequeue and so popHead doesn't back up // further than necessary. d2.prev.Store(nil) } d = d2 } }