map cache add mutex lock fn

This commit is contained in:
xing 2023-12-26 23:09:30 +08:00
parent c38b62c82a
commit 568ab15a34
4 changed files with 134 additions and 12 deletions

11
cache/cache.go vendored
View File

@ -2,6 +2,7 @@ package cache
import ( import (
"context" "context"
"sync"
"time" "time"
) )
@ -37,3 +38,13 @@ type Refresh[K comparable, V any] interface {
type RefreshVar[T any] interface { type RefreshVar[T any] interface {
Refresh(ctx context.Context, a ...any) Refresh(ctx context.Context, a ...any)
} }
type Lockss[K comparable] interface {
GetLock(ctx context.Context, gMut *sync.Mutex, k ...K) *sync.Mutex
}
type LockFn[K comparable] func(ctx context.Context, gMut *sync.Mutex, k ...K) *sync.Mutex
type LocksNum interface {
SetLockNum(num int)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/fthvgb1/wp-go/helper" "github.com/fthvgb1/wp-go/helper"
str "github.com/fthvgb1/wp-go/helper/strings" str "github.com/fthvgb1/wp-go/helper/strings"
"github.com/fthvgb1/wp-go/safety" "github.com/fthvgb1/wp-go/safety"
"runtime"
"time" "time"
) )
@ -192,7 +193,7 @@ func NewPaginationCache[K comparable, V any](m *cache.MapCache[string, helper.Pa
func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], args ...any) *cache.MapCache[K, V] { func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], args ...any) *cache.MapCache[K, V] {
inc := helper.ParseArgs((*cache.IncreaseUpdate[K, V])(nil), args...) inc := helper.ParseArgs((*cache.IncreaseUpdate[K, V])(nil), args...)
m := cache.NewMapCache[K, V](data, fn, batchFn, inc, args...) m := cache.NewMapCache[K, V](data, fn, batchFn, inc, buildLockFn[K](args...), args...)
FlushPush(m) FlushPush(m)
ClearPush(m) ClearPush(m)
name, f := parseArgs(args...) name, f := parseArgs(args...)
@ -204,6 +205,35 @@ func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapB
} }
return m return m
} }
func buildLockFn[K comparable](args ...any) cache.LockFn[K] {
lockFn := helper.ParseArgs(cache.LockFn[K](nil), args...)
name := helper.ParseArgs("", args...)
num := helper.ParseArgs(runtime.NumCPU(), args...)
loFn := func() int {
return num
}
loFn = helper.ParseArgs(loFn, args...)
if name != "" {
loFn = reload.FnVal(str.Join("cachesLocksNum-", name), num, loFn)
}
if lockFn == nil {
looo := helper.ParseArgs(cache.Lockss[K](nil), args...)
if looo != nil {
lockFn = looo.GetLock
loo, ok := any(looo).(cache.LocksNum)
if ok && loo != nil {
loo.SetLockNum(num)
}
} else {
lo := cache.NewLocks[K](loFn)
lockFn = lo.GetLock
FlushPush(lo)
}
}
return lockFn
}
func NewMemoryMapCache[K comparable, V any](batchFn cache.MapBatchFn[K, V], func NewMemoryMapCache[K comparable, V any](batchFn cache.MapBatchFn[K, V],
fn cache.MapSingleFn[K, V], expireTime time.Duration, args ...any) *cache.MapCache[K, V] { fn cache.MapSingleFn[K, V], expireTime time.Duration, args ...any) *cache.MapCache[K, V] {

75
cache/locks.go vendored Normal file
View File

@ -0,0 +1,75 @@
package cache
import (
"context"
"github.com/fthvgb1/wp-go/safety"
"sync"
"sync/atomic"
)
type Locks[K comparable] struct {
numFn func() int
locks []*sync.Mutex
m *safety.Map[K, *sync.Mutex]
counter *int64
}
func (l *Locks[K]) Flush(_ context.Context) {
l.m.Flush()
atomic.StoreInt64(l.counter, 0)
}
func (l *Locks[K]) SetNumFn(numFn func() int) {
l.numFn = numFn
}
func NewLocks[K comparable](num func() int) *Locks[K] {
var i int64
return &Locks[K]{numFn: num, m: safety.NewMap[K, *sync.Mutex](), counter: &i}
}
func (l *Locks[K]) SetLockNum(num int) {
if num > 0 {
l.locks = make([]*sync.Mutex, num)
for i := 0; i < num; i++ {
l.locks[i] = &sync.Mutex{}
}
}
}
func (l *Locks[K]) GetLock(ctx context.Context, gMut *sync.Mutex, keys ...K) *sync.Mutex {
k := keys[0]
lo, ok := l.m.Load(k)
if ok {
return lo
}
gMut.Lock()
defer gMut.Unlock()
lo, ok = l.m.Load(k)
if ok {
return lo
}
num := l.numFn()
if num <= 0 {
lo = &sync.Mutex{}
l.m.Store(k, lo)
return lo
}
if len(l.locks) == 0 {
l.SetLockNum(num)
}
counter := int(atomic.LoadInt64(l.counter))
if counter > len(l.locks)-1 {
atomic.StoreInt64(l.counter, 0)
counter = 0
}
lo = l.locks[counter]
l.m.Store(k, lo)
atomic.AddInt64(l.counter, 1)
if len(l.locks) < num {
for i := 0; i < num-len(l.locks); i++ {
l.locks = append(l.locks, &sync.Mutex{})
}
}
return lo
}

28
cache/map.go vendored
View File

@ -13,7 +13,8 @@ import (
type MapCache[K comparable, V any] struct { type MapCache[K comparable, V any] struct {
Cache[K, V] Cache[K, V]
mux sync.Mutex mux *sync.Mutex
muFn func(ctx context.Context, gMut *sync.Mutex, k ...K) *sync.Mutex
cacheFunc MapSingleFn[K, V] cacheFunc MapSingleFn[K, V]
batchCacheFn MapBatchFn[K, V] batchCacheFn MapBatchFn[K, V]
getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error)
@ -63,13 +64,14 @@ type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error)
type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error) type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error)
type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error) type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error)
func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V], a ...any) *MapCache[K, V] { func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V], lockFn LockFn[K], a ...any) *MapCache[K, V] {
r := &MapCache[K, V]{ r := &MapCache[K, V]{
Cache: ca, Cache: ca,
mux: sync.Mutex{}, mux: &sync.Mutex{},
cacheFunc: cacheFunc, cacheFunc: cacheFunc,
batchCacheFn: batchCacheFn, batchCacheFn: batchCacheFn,
increaseUpdate: inc, increaseUpdate: inc,
muFn: lockFn,
} }
if cacheFunc == nil && batchCacheFn != nil { if cacheFunc == nil && batchCacheFn != nil {
r.setDefaultCacheFn(batchCacheFn) r.setDefaultCacheFn(batchCacheFn)
@ -218,8 +220,9 @@ func (m *MapCache[K, V]) increaseUpdates(c context.Context, timeout time.Duratio
return data, err return data, err
} }
fn := func() { fn := func() {
m.mux.Lock() l := m.muFn(c, m.mux, key)
defer m.mux.Unlock() l.Lock()
defer l.Unlock()
if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() { if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() {
return return
} }
@ -257,8 +260,9 @@ func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duratio
return m.increaseUpdates(c, timeout, data, key, params...) return m.increaseUpdates(c, timeout, data, key, params...)
} }
call := func() { call := func() {
m.mux.Lock() l := m.muFn(c, m.mux, key)
defer m.mux.Unlock() l.Lock()
defer l.Unlock()
if data, ok = m.Get(c, key); ok { if data, ok = m.Get(c, key); ok {
return return
} }
@ -377,8 +381,9 @@ func (m *MapCache[K, V]) getBatchToMapes(c context.Context, key []K, timeout tim
} }
call := func() { call := func() {
m.mux.Lock() l := m.muFn(c, m.mux, key...)
defer m.mux.Unlock() l.Lock()
defer l.Unlock()
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) { needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
vv, ok := m.Get(c, k) vv, ok := m.Get(c, k)
if ok { if ok {
@ -442,8 +447,9 @@ func (m *MapCache[K, V]) getCacheBatchs(c context.Context, key []K, timeout time
var err error var err error
call := func() { call := func() {
m.mux.Lock() l := m.muFn(c, m.mux, key...)
defer m.mux.Unlock() l.Lock()
defer l.Unlock()
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) { needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
vv, ok := m.Get(c, k) vv, ok := m.Get(c, k)
if ok { if ok {