diff --git a/cache/cache.go b/cache/cache.go index c49e98c..7d57d86 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -2,6 +2,7 @@ package cache import ( "context" + "sync" "time" ) @@ -37,3 +38,13 @@ type Refresh[K comparable, V any] interface { type RefreshVar[T any] interface { Refresh(ctx context.Context, a ...any) } + +type Lockss[K comparable] interface { + GetLock(ctx context.Context, gMut *sync.Mutex, k ...K) *sync.Mutex +} + +type LockFn[K comparable] func(ctx context.Context, gMut *sync.Mutex, k ...K) *sync.Mutex + +type LocksNum interface { + SetLockNum(num int) +} diff --git a/cache/cachemanager/manger.go b/cache/cachemanager/manger.go index 6910978..173c7e3 100644 --- a/cache/cachemanager/manger.go +++ b/cache/cachemanager/manger.go @@ -8,6 +8,7 @@ import ( "github.com/fthvgb1/wp-go/helper" str "github.com/fthvgb1/wp-go/helper/strings" "github.com/fthvgb1/wp-go/safety" + "runtime" "time" ) @@ -192,7 +193,7 @@ func NewPaginationCache[K comparable, V any](m *cache.MapCache[string, helper.Pa func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], args ...any) *cache.MapCache[K, V] { inc := helper.ParseArgs((*cache.IncreaseUpdate[K, V])(nil), args...) - m := cache.NewMapCache[K, V](data, fn, batchFn, inc, args...) + m := cache.NewMapCache[K, V](data, fn, batchFn, inc, buildLockFn[K](args...), args...) FlushPush(m) ClearPush(m) name, f := parseArgs(args...) @@ -204,6 +205,35 @@ func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapB } return m } + +func buildLockFn[K comparable](args ...any) cache.LockFn[K] { + lockFn := helper.ParseArgs(cache.LockFn[K](nil), args...) + name := helper.ParseArgs("", args...) + num := helper.ParseArgs(runtime.NumCPU(), args...) + loFn := func() int { + return num + } + loFn = helper.ParseArgs(loFn, args...) + if name != "" { + loFn = reload.FnVal(str.Join("cachesLocksNum-", name), num, loFn) + } + if lockFn == nil { + looo := helper.ParseArgs(cache.Lockss[K](nil), args...) + if looo != nil { + lockFn = looo.GetLock + loo, ok := any(looo).(cache.LocksNum) + if ok && loo != nil { + loo.SetLockNum(num) + } + } else { + lo := cache.NewLocks[K](loFn) + lockFn = lo.GetLock + FlushPush(lo) + } + + } + return lockFn +} func NewMemoryMapCache[K comparable, V any](batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], expireTime time.Duration, args ...any) *cache.MapCache[K, V] { diff --git a/cache/locks.go b/cache/locks.go new file mode 100644 index 0000000..d15c842 --- /dev/null +++ b/cache/locks.go @@ -0,0 +1,75 @@ +package cache + +import ( + "context" + "github.com/fthvgb1/wp-go/safety" + "sync" + "sync/atomic" +) + +type Locks[K comparable] struct { + numFn func() int + locks []*sync.Mutex + m *safety.Map[K, *sync.Mutex] + counter *int64 +} + +func (l *Locks[K]) Flush(_ context.Context) { + l.m.Flush() + atomic.StoreInt64(l.counter, 0) +} + +func (l *Locks[K]) SetNumFn(numFn func() int) { + l.numFn = numFn +} + +func NewLocks[K comparable](num func() int) *Locks[K] { + var i int64 + return &Locks[K]{numFn: num, m: safety.NewMap[K, *sync.Mutex](), counter: &i} +} + +func (l *Locks[K]) SetLockNum(num int) { + if num > 0 { + l.locks = make([]*sync.Mutex, num) + for i := 0; i < num; i++ { + l.locks[i] = &sync.Mutex{} + } + } +} + +func (l *Locks[K]) GetLock(ctx context.Context, gMut *sync.Mutex, keys ...K) *sync.Mutex { + k := keys[0] + lo, ok := l.m.Load(k) + if ok { + return lo + } + gMut.Lock() + defer gMut.Unlock() + lo, ok = l.m.Load(k) + if ok { + return lo + } + num := l.numFn() + if num <= 0 { + lo = &sync.Mutex{} + l.m.Store(k, lo) + return lo + } + if len(l.locks) == 0 { + l.SetLockNum(num) + } + counter := int(atomic.LoadInt64(l.counter)) + if counter > len(l.locks)-1 { + atomic.StoreInt64(l.counter, 0) + counter = 0 + } + lo = l.locks[counter] + l.m.Store(k, lo) + atomic.AddInt64(l.counter, 1) + if len(l.locks) < num { + for i := 0; i < num-len(l.locks); i++ { + l.locks = append(l.locks, &sync.Mutex{}) + } + } + return lo +} diff --git a/cache/map.go b/cache/map.go index fe2bde1..d50feb6 100644 --- a/cache/map.go +++ b/cache/map.go @@ -13,7 +13,8 @@ import ( type MapCache[K comparable, V any] struct { Cache[K, V] - mux sync.Mutex + mux *sync.Mutex + muFn func(ctx context.Context, gMut *sync.Mutex, k ...K) *sync.Mutex cacheFunc MapSingleFn[K, V] batchCacheFn MapBatchFn[K, V] getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) @@ -63,13 +64,14 @@ type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error) type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error) type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error) -func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V], a ...any) *MapCache[K, V] { +func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V], lockFn LockFn[K], a ...any) *MapCache[K, V] { r := &MapCache[K, V]{ Cache: ca, - mux: sync.Mutex{}, + mux: &sync.Mutex{}, cacheFunc: cacheFunc, batchCacheFn: batchCacheFn, increaseUpdate: inc, + muFn: lockFn, } if cacheFunc == nil && batchCacheFn != nil { r.setDefaultCacheFn(batchCacheFn) @@ -218,8 +220,9 @@ func (m *MapCache[K, V]) increaseUpdates(c context.Context, timeout time.Duratio return data, err } fn := func() { - m.mux.Lock() - defer m.mux.Unlock() + l := m.muFn(c, m.mux, key) + l.Lock() + defer l.Unlock() if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() { return } @@ -257,8 +260,9 @@ func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duratio return m.increaseUpdates(c, timeout, data, key, params...) } call := func() { - m.mux.Lock() - defer m.mux.Unlock() + l := m.muFn(c, m.mux, key) + l.Lock() + defer l.Unlock() if data, ok = m.Get(c, key); ok { return } @@ -377,8 +381,9 @@ func (m *MapCache[K, V]) getBatchToMapes(c context.Context, key []K, timeout tim } call := func() { - m.mux.Lock() - defer m.mux.Unlock() + l := m.muFn(c, m.mux, key...) + l.Lock() + defer l.Unlock() needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) { vv, ok := m.Get(c, k) if ok { @@ -442,8 +447,9 @@ func (m *MapCache[K, V]) getCacheBatchs(c context.Context, key []K, timeout time var err error call := func() { - m.mux.Lock() - defer m.mux.Unlock() + l := m.muFn(c, m.mux, key...) + l.Lock() + defer l.Unlock() needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) { vv, ok := m.Get(c, k) if ok {