From 72d6b538f7c22102898082ad56144127d3d27b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Sun, 29 Jan 2023 22:28:08 +0100 Subject: [PATCH] v0.0.71 --- syncext/atomic.go | 75 +++++++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/syncext/atomic.go b/syncext/atomic.go index d1e4ce6..e9228f2 100644 --- a/syncext/atomic.go +++ b/syncext/atomic.go @@ -2,58 +2,50 @@ package syncext import ( "context" - "sync/atomic" + "gogs.mikescher.com/BlackForestBytes/goext/langext" + "sync" "time" ) type AtomicBool struct { - v int32 - waiter chan bool // unbuffered + v bool + listener map[string]chan bool + lock sync.Mutex } func NewAtomicBool(value bool) *AtomicBool { - if value { - return &AtomicBool{v: 1, waiter: make(chan bool)} - } else { - return &AtomicBool{v: 0, waiter: make(chan bool)} + return &AtomicBool{ + v: value, + listener: make(map[string]chan bool), + lock: sync.Mutex{}, } } func (a *AtomicBool) Get() bool { - return atomic.LoadInt32(&a.v) == 1 + a.lock.Lock() + defer a.lock.Unlock() + return a.v } func (a *AtomicBool) Set(value bool) { - if value { - atomic.StoreInt32(&a.v, 1) - } else { - atomic.StoreInt32(&a.v, 0) - } + a.lock.Lock() + defer a.lock.Unlock() - select { - case a.waiter <- value: - // message sent - default: - // no receiver on channel + a.v = value + + for k, v := range a.listener { + select { + case v <- value: + // message sent + default: + // no receiver on channel + delete(a.listener, k) + } } } func (a *AtomicBool) Wait(waitFor bool) { - if a.Get() == waitFor { - return - } - - for { - if v, ok := ReadChannelWithTimeout(a.waiter, 128*time.Millisecond); ok { - if v == waitFor { - return - } - } else { - if a.Get() == waitFor { - return - } - } - } + _ = a.WaitWithContext(context.Background(), waitFor) } func (a *AtomicBool) WaitWithTimeout(timeout time.Duration, waitFor bool) error { @@ -71,12 +63,25 @@ func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error { return nil } + uuid, _ := langext.NewHexUUID() + + waitchan := make(chan bool) + + a.lock.Lock() + a.listener[uuid] = waitchan + a.lock.Unlock() + defer func() { + a.lock.Lock() + delete(a.listener, uuid) + a.lock.Unlock() + }() + for { if err := ctx.Err(); err != nil { return err } - timeOut := 128 * time.Millisecond + timeOut := 1024 * time.Millisecond if dl, ok := ctx.Deadline(); ok { timeOutMax := dl.Sub(time.Now()) @@ -87,7 +92,7 @@ func (a *AtomicBool) WaitWithContext(ctx context.Context, waitFor bool) error { } } - if v, ok := ReadChannelWithTimeout(a.waiter, timeOut); ok { + if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok { if v == waitFor { return nil }