This commit is contained in:
Mike Schwörer 2023-01-29 22:28:08 +01:00
parent 48dd30fb94
commit 72d6b538f7
Signed by: Mikescher
GPG Key ID: D3C7172E0A70F8CF

View File

@ -2,58 +2,50 @@ package syncext
import ( import (
"context" "context"
"sync/atomic" "gogs.mikescher.com/BlackForestBytes/goext/langext"
"sync"
"time" "time"
) )
type AtomicBool struct { type AtomicBool struct {
v int32 v bool
waiter chan bool // unbuffered listener map[string]chan bool
lock sync.Mutex
} }
func NewAtomicBool(value bool) *AtomicBool { func NewAtomicBool(value bool) *AtomicBool {
if value { return &AtomicBool{
return &AtomicBool{v: 1, waiter: make(chan bool)} v: value,
} else { listener: make(map[string]chan bool),
return &AtomicBool{v: 0, waiter: make(chan bool)} lock: sync.Mutex{},
} }
} }
func (a *AtomicBool) Get() bool { func (a *AtomicBool) Get() bool {
return atomic.LoadInt32(&a.v) == 1 a.lock.Lock()
defer a.lock.Unlock()
return a.v
} }
func (a *AtomicBool) Set(value bool) { func (a *AtomicBool) Set(value bool) {
if value { a.lock.Lock()
atomic.StoreInt32(&a.v, 1) defer a.lock.Unlock()
} else {
atomic.StoreInt32(&a.v, 0)
}
a.v = value
for k, v := range a.listener {
select { select {
case a.waiter <- value: case v <- value:
// message sent // message sent
default: default:
// no receiver on channel // no receiver on channel
delete(a.listener, k)
}
} }
} }
func (a *AtomicBool) Wait(waitFor bool) { func (a *AtomicBool) Wait(waitFor bool) {
if a.Get() == waitFor { _ = a.WaitWithContext(context.Background(), waitFor)
return
}
for {
if v, ok := ReadChannelWithTimeout(a.waiter, 128*time.Millisecond); ok {
if v == waitFor {
return
}
} else {
if a.Get() == waitFor {
return
}
}
}
} }
func (a *AtomicBool) WaitWithTimeout(timeout time.Duration, waitFor bool) error { func (a *AtomicBool) WaitWithTimeout(timeout time.Duration, waitFor bool) error {
@ -71,12 +63,25 @@ func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error {
return nil return nil
} }
uuid, _ := langext.NewHexUUID()
waitchan := make(chan bool)
a.lock.Lock()
a.listener[uuid] = waitchan
a.lock.Unlock()
defer func() {
a.lock.Lock()
delete(a.listener, uuid)
a.lock.Unlock()
}()
for { for {
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
return err return err
} }
timeOut := 128 * time.Millisecond timeOut := 1024 * time.Millisecond
if dl, ok := ctx.Deadline(); ok { if dl, ok := ctx.Deadline(); ok {
timeOutMax := dl.Sub(time.Now()) timeOutMax := dl.Sub(time.Now())
@ -87,7 +92,7 @@ func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error {
} }
} }
if v, ok := ReadChannelWithTimeout(a.waiter, timeOut); ok { if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok {
if v == waitFor { if v == waitFor {
return nil return nil
} }