From d72bed0c8c4e7206c849e0d36035a2bff3ec083b Mon Sep 17 00:00:00 2001 From: xing Date: Sun, 10 Dec 2023 19:15:49 +0800 Subject: [PATCH] refine cache --- cache/cachemanager/manger.go | 4 +- cache/map.go | 97 +++++++++++++++++++++++++++++++++++- cache/pagination.go | 26 +++++++++- cache/vars.go | 73 ++++++++++++++++++++++++--- helper/slice/slices.go | 20 +++++++- safety/slice_test.go | 28 ++++++++--- 6 files changed, 225 insertions(+), 23 deletions(-) diff --git a/cache/cachemanager/manger.go b/cache/cachemanager/manger.go index 1886481..96e36b5 100644 --- a/cache/cachemanager/manger.go +++ b/cache/cachemanager/manger.go @@ -190,7 +190,7 @@ func NewPaginationCache[K comparable, V any](m *cache.MapCache[string, helper.Pa func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], args ...any) *cache.MapCache[K, V] { inc := helper.ParseArgs((*cache.IncreaseUpdate[K, V])(nil), args...) - m := cache.NewMapCache[K, V](data, fn, batchFn, inc) + m := cache.NewMapCache[K, V](data, fn, batchFn, inc, args...) FlushPush(m) ClearPush(m) name, f := parseArgs(args...) @@ -242,7 +242,7 @@ func ClearExpired() { func NewVarCache[T any](c cache.AnyCache[T], fn func(context.Context, ...any) (T, error), a ...any) *cache.VarCache[T] { inc := helper.ParseArgs((*cache.IncreaseUpdateVar[T])(nil), a...) ref := helper.ParseArgs(cache.RefreshVar[T](nil), a...) - v := cache.NewVarCache(c, fn, inc, ref) + v := cache.NewVarCache(c, fn, inc, ref, a...) FlushPush(v) name, _ := parseArgs(a...) if name != "" { diff --git a/cache/map.go b/cache/map.go index 54f27b6..fe2bde1 100644 --- a/cache/map.go +++ b/cache/map.go @@ -20,7 +20,35 @@ type MapCache[K comparable, V any] struct { getCacheBatchToMap func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) increaseUpdate *IncreaseUpdate[K, V] refresh Refresh[K, V] + gets func(ctx context.Context, key K) (V, bool) + sets func(ctx context.Context, key K, val V) + getExpireTimes func(ctx context.Context) time.Duration + ttl func(ctx context.Context, key K) time.Duration + flush func(ctx context.Context) + del func(ctx context.Context, key ...K) + clearExpired func(ctx context.Context) } + +func (m *MapCache[K, V]) Get(ctx context.Context, key K) (V, bool) { + return m.gets(ctx, key) +} + +func (m *MapCache[K, V]) Set(ctx context.Context, key K, val V) { + m.sets(ctx, key, val) +} +func (m *MapCache[K, V]) Ttl(ctx context.Context, key K) time.Duration { + return m.ttl(ctx, key) +} +func (m *MapCache[K, V]) GetExpireTime(ctx context.Context) time.Duration { + return m.getExpireTimes(ctx) +} +func (m *MapCache[K, V]) Del(ctx context.Context, key ...K) { + m.del(ctx, key...) 
+} +func (m *MapCache[K, V]) ClearExpired(ctx context.Context) { + m.clearExpired(ctx) +} + type IncreaseUpdate[K comparable, V any] struct { CycleTime func() time.Duration Fn IncreaseFn[K, V] @@ -35,7 +63,7 @@ type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error) type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error) type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error) -func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V]) *MapCache[K, V] { +func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V], a ...any) *MapCache[K, V] { r := &MapCache[K, V]{ Cache: ca, mux: sync.Mutex{}, @@ -60,9 +88,74 @@ func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V if ok { r.refresh = re } + initCache(r, a...) return r } +func initCache[K comparable, V any](r *MapCache[K, V], a ...any) { + gets := helper.ParseArgs[func(Cache[K, V], context.Context, K) (V, bool)](nil, a...) + if gets == nil { + r.gets = r.Cache.Get + } else { + r.gets = func(ctx context.Context, key K) (V, bool) { + return gets(r.Cache, ctx, key) + } + } + + sets := helper.ParseArgs[func(Cache[K, V], context.Context, K, V)](nil, a...) + if sets == nil { + r.sets = r.Cache.Set + } else { + r.sets = func(ctx context.Context, key K, val V) { + sets(r.Cache, ctx, key, val) + } + } + + getExpireTimes := helper.ParseArgs[func(Cache[K, V], context.Context) time.Duration](nil, a...) + if getExpireTimes == nil { + r.getExpireTimes = r.Cache.GetExpireTime + } else { + r.getExpireTimes = func(ctx context.Context) time.Duration { + return getExpireTimes(r.Cache, ctx) + } + } + + ttl := helper.ParseArgs[func(Cache[K, V], context.Context, K) time.Duration](nil, a...) + if ttl == nil { + r.ttl = r.Cache.Ttl + } else { + r.ttl = func(ctx context.Context, k K) time.Duration { + return ttl(r.Cache, ctx, k) + } + } + + del := helper.ParseArgs[func(Cache[K, V], context.Context, ...K)](nil, a...) + if del == nil { + r.del = r.Cache.Del + } else { + r.del = func(ctx context.Context, key ...K) { + del(r.Cache, ctx, key...) + } + } + + flushAndClearExpired := helper.ParseArgs[[]func(Cache[K, V], context.Context)](nil, a...) 
+ if flushAndClearExpired == nil { + r.flush = r.Cache.Flush + r.clearExpired = r.Cache.ClearExpired + } else { + r.flush = func(ctx context.Context) { + flushAndClearExpired[0](r.Cache, ctx) + } + if len(flushAndClearExpired) > 1 { + r.clearExpired = func(ctx context.Context) { + flushAndClearExpired[1](r.Cache, ctx) + } + } else { + r.clearExpired = r.Cache.ClearExpired + } + } +} + func (m *MapCache[K, V]) SetDefaultBatchFunc(fn MapSingleFn[K, V]) { m.batchCacheFn = func(ctx context.Context, ids []K, a ...any) (map[K]V, error) { var err error @@ -115,7 +208,7 @@ func (m *MapCache[K, V]) setDefaultCacheFn(fn MapBatchFn[K, V]) { func (m *MapCache[K, V]) Flush(ctx context.Context) { m.mux.Lock() defer m.mux.Unlock() - m.Cache.Flush(ctx) + m.flush(ctx) } func (m *MapCache[K, V]) increaseUpdates(c context.Context, timeout time.Duration, data V, key K, params ...any) (V, error) { diff --git a/cache/pagination.go b/cache/pagination.go index 609a760..c62183f 100644 --- a/cache/pagination.go +++ b/cache/pagination.go @@ -19,6 +19,7 @@ type Pagination[K comparable, V any] struct { dbFn func(ctx context.Context, k K, page, limit, totalRaw int, a ...any) ([]V, int, error) localFn func(ctx context.Context, data []V, k K, page, limit int, a ...any) ([]V, int, error) batchFetchNum func() int + localKeyFn func(K K, a ...any) string dbKeyFn func(K K, a ...any) string name string } @@ -29,7 +30,7 @@ type DbFn[K comparable, V any] func(ctx context.Context, k K, page, limit, total type LocalFn[K comparable, V any] func(ctx context.Context, data []V, k K, page, limit int, a ...any) ([]V, int, error) -func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationData[V]], maxNum func() int, dbFn DbFn[K, V], localFn LocalFn[K, V], dbKeyFn func(K, ...any) string, batchFetchNum func() int, name string) *Pagination[K, V] { +func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationData[V]], maxNum func() int, dbFn DbFn[K, V], localFn LocalFn[K, V], dbKeyFn, localKeyFn func(K, ...any) string, batchFetchNum func() int, name string) *Pagination[K, V] { if dbKeyFn == nil { dbKeyFn = func(k K, a ...any) string { s := str.NewBuilder() @@ -40,6 +41,11 @@ func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationDat return strings.TrimRight(s.String(), "|") } } + if localKeyFn == nil { + localKeyFn = func(k K, a ...any) string { + return fmt.Sprintf("%v", k) + } + } return &Pagination[K, V]{ MapCache: m, maxNum: maxNum, @@ -49,6 +55,7 @@ func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationDat batchFetchNum: batchFetchNum, name: name, dbKeyFn: dbKeyFn, + localKeyFn: localKeyFn, } } @@ -68,7 +75,7 @@ func (p *Pagination[K, V]) Pagination(ctx context.Context, timeout time.Duration } func (p *Pagination[K, V]) paginationByLocal(ctx context.Context, timeout time.Duration, k K, page, limit int, a ...any) ([]V, int, error) { - key := fmt.Sprintf("%v", k) + key := p.localKeyFn(k) data, ok := p.Get(ctx, key) if ok { if p.increaseUpdate != nil && p.refresh != nil { @@ -83,6 +90,12 @@ func (p *Pagination[K, V]) paginationByLocal(ctx context.Context, timeout time.D } return p.localFn(ctx, data.Data, k, page, limit, a...) } + p.mux.Lock() + defer p.mux.Unlock() + data, ok = p.Get(ctx, key) + if ok { + return data.Data, data.TotalRaw, nil + } batchNum := p.batchFetchNum() da, totalRaw, err := p.fetchDb(ctx, timeout, k, 1, 0, 0, a...) 
if err != nil { @@ -101,6 +114,9 @@ func (p *Pagination[K, V]) paginationByLocal(ctx context.Context, timeout time.D } da = append(da, daa...) } + data.Data = da + data.TotalRaw = totalRaw + p.Set(ctx, key, data) } return p.localFn(ctx, data.Data, k, page, limit, a...) @@ -112,6 +128,12 @@ func (p *Pagination[K, V]) paginationByDB(ctx context.Context, timeout time.Dura if ok { return data.Data, data.TotalRaw, nil } + p.mux.Lock() + defer p.mux.Unlock() + data, ok = p.Get(ctx, key) + if ok { + return data.Data, data.TotalRaw, nil + } dat, total, err := p.fetchDb(ctx, timeout, k, page, limit, totalRaw, a...) if err != nil { return nil, 0, err diff --git a/cache/vars.go b/cache/vars.go index 45776ea..3f42b9f 100644 --- a/cache/vars.go +++ b/cache/vars.go @@ -14,6 +14,10 @@ type VarCache[T any] struct { mutex sync.Mutex increaseUpdate *IncreaseUpdateVar[T] refresh RefreshVar[T] + get func(ctx context.Context) (T, bool) + set func(ctx context.Context, v T) + flush func(ctx context.Context) + getLastSetTime func(ctx context.Context) time.Time } type IncreaseUpdateVar[T any] struct { @@ -23,6 +27,67 @@ type IncreaseUpdateVar[T any] struct { type IncreaseVarFn[T any] func(c context.Context, currentData T, t time.Time, a ...any) (data T, save bool, refresh bool, err error) +func (t *VarCache[T]) Get(ctx context.Context) (T, bool) { + return t.get(ctx) +} +func (t *VarCache[T]) Set(ctx context.Context, v T) { + t.set(ctx, v) +} +func (t *VarCache[T]) Flush(ctx context.Context) { + t.flush(ctx) +} +func (t *VarCache[T]) GetLastSetTime(ctx context.Context) time.Time { + return t.getLastSetTime(ctx) +} + +func initVarCache[T any](t *VarCache[T], a ...any) { + gets := helper.ParseArgs[func(AnyCache[T], context.Context) (T, bool)](nil, a...) + if gets == nil { + t.get = t.AnyCache.Get + } else { + t.get = func(ctx context.Context) (T, bool) { + return gets(t.AnyCache, ctx) + } + } + + set := helper.ParseArgs[func(AnyCache[T], context.Context, T)](nil, a...) + if set == nil { + t.set = t.AnyCache.Set + } else { + t.set = func(ctx context.Context, v T) { + set(t.AnyCache, ctx, v) + } + } + + flush := helper.ParseArgs[func(AnyCache[T], context.Context)](nil, a...) + if flush == nil { + t.flush = t.AnyCache.Flush + } else { + t.flush = func(ctx context.Context) { + flush(t.AnyCache, ctx) + } + } + + getLastSetTime := helper.ParseArgs[func(AnyCache[T], context.Context) time.Time](nil, a...) + if getLastSetTime == nil { + t.getLastSetTime = t.AnyCache.GetLastSetTime + } else { + t.getLastSetTime = func(ctx context.Context) time.Time { + return getLastSetTime(t.AnyCache, ctx) + } + } +} + +func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error), inc *IncreaseUpdateVar[T], ref RefreshVar[T], a ...any) *VarCache[T] { + r := &VarCache[T]{ + AnyCache: cache, setCacheFunc: fn, mutex: sync.Mutex{}, + increaseUpdate: inc, + refresh: ref, + } + initVarCache(r, a...) 
+ return r +} + func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, params ...any) (T, error) { data, ok := t.Get(ctx) var err error @@ -125,14 +190,6 @@ func (c *VarMemoryCache[T]) GetLastSetTime(_ context.Context) time.Time { return c.v.Load().setTime } -func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error), inc *IncreaseUpdateVar[T], ref RefreshVar[T]) *VarCache[T] { - return &VarCache[T]{ - AnyCache: cache, setCacheFunc: fn, mutex: sync.Mutex{}, - increaseUpdate: inc, - refresh: ref, - } -} - func (c *VarMemoryCache[T]) Flush(_ context.Context) { c.v.Flush() } diff --git a/helper/slice/slices.go b/helper/slice/slices.go index e6a39d2..1097dfd 100644 --- a/helper/slice/slices.go +++ b/helper/slice/slices.go @@ -62,11 +62,27 @@ func Delete[T any](a *[]T, index int) { *a = append(arr[:index], arr[index+1:]...) } -func Copy[T any](a []T) []T { - dst := make([]T, len(a)) +func Copy[T any](a []T, l ...int) []T { + length := len(a) + if len(l) > 0 { + length = l[0] + } + var dst []T + if len(a) < length { + dst = make([]T, len(a), length) + } else { + dst = make([]T, length) + } copy(dst, a) return dst } +func Copies[T any](a ...[]T) []T { + var r []T + for _, ts := range a { + r = append(r, ts...) + } + return r +} func Unshift[T any](a *[]T, e ...T) { *a = append(e, *a...) diff --git a/safety/slice_test.go b/safety/slice_test.go index 7ac307d..d9b928c 100644 --- a/safety/slice_test.go +++ b/safety/slice_test.go @@ -3,8 +3,8 @@ package safety import ( "fmt" "github.com/fthvgb1/wp-go/helper/number" + "github.com/fthvgb1/wp-go/taskPools" "testing" - "time" ) func TestSlice_Append(t *testing.T) { @@ -26,13 +26,27 @@ func TestSlice_Append(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fn := func() { - tt.r.Append(tt.args.t...) + switch number.Rand(1, 3) { + case 1: + f := tt.r.Load() + fmt.Println(f) + case 2: + tt.r.Append(tt.args.t...) + case 3: + /*s := tt.r.Load() + if len(s) < 1 { + break + } + ii, v := slice.Rand(number.Range(0, len(s))) + s[ii] = v*/ + } + } - go fn() - go fn() - go fn() - go fn() - time.Sleep(time.Second) + p := taskPools.NewPools(20) + for i := 0; i < 50; i++ { + p.Execute(fn) + } + p.Wait() fmt.Println(tt.r.Load()) }) }
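
A minimal usage sketch of the override hooks this patch adds to cache.NewMapCache: any trailing argument whose type matches one of the function signatures parsed in initCache (here func(Cache[K, V], context.Context, K) (V, bool) for Get) replaces the corresponding call into the underlying Cache. The string/int instantiation, the logging wrapper, the nil placeholder for the backing cache, and passing nil for the batch function and *IncreaseUpdate are illustrative assumptions, not taken from this patch.

package main

import (
	"context"
	"fmt"

	"github.com/fthvgb1/wp-go/cache"
)

func main() {
	// Placeholder: stands for any concrete cache.Cache[string, int]
	// implementation; left nil because this sketch only shows construction.
	var data cache.Cache[string, int]

	// Single-key loader used on cache misses.
	single := func(ctx context.Context, k string, _ ...any) (int, error) {
		return len(k), nil
	}

	// Get override with the signature initCache looks for via helper.ParseArgs;
	// it wraps the underlying cache's Get with a log line.
	gets := func(c cache.Cache[string, int], ctx context.Context, k string) (int, bool) {
		v, ok := c.Get(ctx, k)
		fmt.Println("map cache get:", k, ok)
		return v, ok
	}

	// nil batch function and nil *IncreaseUpdate are assumed to be accepted here,
	// mirroring the possibly-nil values cachemanager.NewMapCache passes through.
	m := cache.NewMapCache[string, int](data, single, nil, nil, gets)

	// With a real Cache implementation behind it, m.Get(ctx, key) now runs
	// through the override instead of calling the cache's Get directly.
	_ = m
}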
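
The slice helper changes are easier to see on their own. A small sketch, assuming the helper/slice import path of this module: Copy now accepts an optional length hint, and Copies concatenates several slices into a freshly allocated one.

package main

import (
	"fmt"

	"github.com/fthvgb1/wp-go/helper/slice"
)

func main() {
	a := []int{1, 2, 3}

	// Copy with a hint larger than len(a): the copy keeps the three elements
	// but is allocated with capacity 10, so later appends avoid reallocation.
	b := slice.Copy(a, 10)
	fmt.Println(len(b), cap(b)) // 3 10

	// Copies flattens its argument slices into one new slice.
	c := slice.Copies(a, []int{4, 5})
	fmt.Println(c) // [1 2 3 4 5]
}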