You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
82 lines
1.5 KiB
82 lines
1.5 KiB
package workpool |
|
|
|
import ( |
|
"errors" |
|
"runtime" |
|
"sync/atomic" |
|
) |
|
|
|
// ringBuffer . |
|
type ringBuffer struct { |
|
capacity uint64 |
|
mask uint64 |
|
padding1 [7]uint64 |
|
lastCommintIdx uint64 |
|
padding2 [7]uint64 |
|
nextFreeIdx uint64 |
|
padding3 [7]uint64 |
|
readerIdx uint64 |
|
padding4 [7]uint64 |
|
slots []*worker |
|
} |
|
|
|
// newRingBuffer . |
|
func newRingBuffer(c uint64) (*ringBuffer, error) { |
|
if c == 0 || c&3 != 0 { |
|
return nil, errors.New("capacity must be N power of 2") |
|
} |
|
return &ringBuffer{ |
|
lastCommintIdx: 0, |
|
nextFreeIdx: 1, |
|
readerIdx: 0, |
|
capacity: c, |
|
mask: c - 1, |
|
slots: make([]*worker, c), |
|
}, nil |
|
} |
|
|
|
// push . |
|
func (r *ringBuffer) push(w *worker) error { |
|
var head, tail, next uint64 |
|
for { |
|
head = r.nextFreeIdx |
|
tail = r.readerIdx |
|
if (head > tail+r.capacity-2) || (head < tail-1) { |
|
return errors.New("buffer is full") |
|
} |
|
|
|
next = (head + 1) & r.mask |
|
if atomic.CompareAndSwapUint64(&r.nextFreeIdx, head, next) { |
|
break |
|
} |
|
runtime.Gosched() |
|
} |
|
r.slots[head] = w |
|
|
|
for !atomic.CompareAndSwapUint64(&r.lastCommintIdx, head-1, head) { |
|
runtime.Gosched() |
|
} |
|
return nil |
|
} |
|
|
|
// pop . |
|
func (r *ringBuffer) pop() *worker { |
|
var head, next uint64 |
|
for { |
|
head = r.readerIdx |
|
if head == r.lastCommintIdx { |
|
return r.slots[head] |
|
} |
|
next = (head + 1) & r.mask |
|
if atomic.CompareAndSwapUint64(&r.readerIdx, head, next) { |
|
break |
|
} |
|
runtime.Gosched() |
|
} |
|
return r.slots[head] |
|
} |
|
|
|
// size . |
|
func (r *ringBuffer) size() uint64 { |
|
return r.lastCommintIdx - r.readerIdx |
|
}
|
|
|