refine cache

This commit is contained in:
xing 2023-12-10 19:15:49 +08:00
parent 0cdb3ba040
commit d72bed0c8c
6 changed files with 225 additions and 23 deletions

View File

@ -190,7 +190,7 @@ func NewPaginationCache[K comparable, V any](m *cache.MapCache[string, helper.Pa
func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], args ...any) *cache.MapCache[K, V] { func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], args ...any) *cache.MapCache[K, V] {
inc := helper.ParseArgs((*cache.IncreaseUpdate[K, V])(nil), args...) inc := helper.ParseArgs((*cache.IncreaseUpdate[K, V])(nil), args...)
m := cache.NewMapCache[K, V](data, fn, batchFn, inc) m := cache.NewMapCache[K, V](data, fn, batchFn, inc, args...)
FlushPush(m) FlushPush(m)
ClearPush(m) ClearPush(m)
name, f := parseArgs(args...) name, f := parseArgs(args...)
@ -242,7 +242,7 @@ func ClearExpired() {
func NewVarCache[T any](c cache.AnyCache[T], fn func(context.Context, ...any) (T, error), a ...any) *cache.VarCache[T] { func NewVarCache[T any](c cache.AnyCache[T], fn func(context.Context, ...any) (T, error), a ...any) *cache.VarCache[T] {
inc := helper.ParseArgs((*cache.IncreaseUpdateVar[T])(nil), a...) inc := helper.ParseArgs((*cache.IncreaseUpdateVar[T])(nil), a...)
ref := helper.ParseArgs(cache.RefreshVar[T](nil), a...) ref := helper.ParseArgs(cache.RefreshVar[T](nil), a...)
v := cache.NewVarCache(c, fn, inc, ref) v := cache.NewVarCache(c, fn, inc, ref, a...)
FlushPush(v) FlushPush(v)
name, _ := parseArgs(a...) name, _ := parseArgs(a...)
if name != "" { if name != "" {

97
cache/map.go vendored
View File

@ -20,7 +20,35 @@ type MapCache[K comparable, V any] struct {
getCacheBatchToMap func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) getCacheBatchToMap func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error)
increaseUpdate *IncreaseUpdate[K, V] increaseUpdate *IncreaseUpdate[K, V]
refresh Refresh[K, V] refresh Refresh[K, V]
gets func(ctx context.Context, key K) (V, bool)
sets func(ctx context.Context, key K, val V)
getExpireTimes func(ctx context.Context) time.Duration
ttl func(ctx context.Context, key K) time.Duration
flush func(ctx context.Context)
del func(ctx context.Context, key ...K)
clearExpired func(ctx context.Context)
} }
func (m *MapCache[K, V]) Get(ctx context.Context, key K) (V, bool) {
return m.gets(ctx, key)
}
func (m *MapCache[K, V]) Set(ctx context.Context, key K, val V) {
m.sets(ctx, key, val)
}
func (m *MapCache[K, V]) Ttl(ctx context.Context, key K) time.Duration {
return m.ttl(ctx, key)
}
func (m *MapCache[K, V]) GetExpireTime(ctx context.Context) time.Duration {
return m.getExpireTimes(ctx)
}
func (m *MapCache[K, V]) Del(ctx context.Context, key ...K) {
m.del(ctx, key...)
}
func (m *MapCache[K, V]) ClearExpired(ctx context.Context) {
m.clearExpired(ctx)
}
type IncreaseUpdate[K comparable, V any] struct { type IncreaseUpdate[K comparable, V any] struct {
CycleTime func() time.Duration CycleTime func() time.Duration
Fn IncreaseFn[K, V] Fn IncreaseFn[K, V]
@ -35,7 +63,7 @@ type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error)
type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error) type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error)
type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error) type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error)
func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V]) *MapCache[K, V] { func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V], a ...any) *MapCache[K, V] {
r := &MapCache[K, V]{ r := &MapCache[K, V]{
Cache: ca, Cache: ca,
mux: sync.Mutex{}, mux: sync.Mutex{},
@ -60,9 +88,74 @@ func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V
if ok { if ok {
r.refresh = re r.refresh = re
} }
initCache(r, a...)
return r return r
} }
func initCache[K comparable, V any](r *MapCache[K, V], a ...any) {
gets := helper.ParseArgs[func(Cache[K, V], context.Context, K) (V, bool)](nil, a...)
if gets == nil {
r.gets = r.Cache.Get
} else {
r.gets = func(ctx context.Context, key K) (V, bool) {
return gets(r.Cache, ctx, key)
}
}
sets := helper.ParseArgs[func(Cache[K, V], context.Context, K, V)](nil, a...)
if sets == nil {
r.sets = r.Cache.Set
} else {
r.sets = func(ctx context.Context, key K, val V) {
sets(r.Cache, ctx, key, val)
}
}
getExpireTimes := helper.ParseArgs[func(Cache[K, V], context.Context) time.Duration](nil, a...)
if getExpireTimes == nil {
r.getExpireTimes = r.Cache.GetExpireTime
} else {
r.getExpireTimes = func(ctx context.Context) time.Duration {
return getExpireTimes(r.Cache, ctx)
}
}
ttl := helper.ParseArgs[func(Cache[K, V], context.Context, K) time.Duration](nil, a...)
if ttl == nil {
r.ttl = r.Cache.Ttl
} else {
r.ttl = func(ctx context.Context, k K) time.Duration {
return ttl(r.Cache, ctx, k)
}
}
del := helper.ParseArgs[func(Cache[K, V], context.Context, ...K)](nil, a...)
if del == nil {
r.del = r.Cache.Del
} else {
r.del = func(ctx context.Context, key ...K) {
del(r.Cache, ctx, key...)
}
}
flushAndClearExpired := helper.ParseArgs[[]func(Cache[K, V], context.Context)](nil, a...)
if flushAndClearExpired == nil {
r.flush = r.Cache.Flush
r.clearExpired = r.Cache.ClearExpired
} else {
r.flush = func(ctx context.Context) {
flushAndClearExpired[0](r.Cache, ctx)
}
if len(flushAndClearExpired) > 1 {
r.clearExpired = func(ctx context.Context) {
flushAndClearExpired[1](r.Cache, ctx)
}
} else {
r.clearExpired = r.Cache.ClearExpired
}
}
}
func (m *MapCache[K, V]) SetDefaultBatchFunc(fn MapSingleFn[K, V]) { func (m *MapCache[K, V]) SetDefaultBatchFunc(fn MapSingleFn[K, V]) {
m.batchCacheFn = func(ctx context.Context, ids []K, a ...any) (map[K]V, error) { m.batchCacheFn = func(ctx context.Context, ids []K, a ...any) (map[K]V, error) {
var err error var err error
@ -115,7 +208,7 @@ func (m *MapCache[K, V]) setDefaultCacheFn(fn MapBatchFn[K, V]) {
func (m *MapCache[K, V]) Flush(ctx context.Context) { func (m *MapCache[K, V]) Flush(ctx context.Context) {
m.mux.Lock() m.mux.Lock()
defer m.mux.Unlock() defer m.mux.Unlock()
m.Cache.Flush(ctx) m.flush(ctx)
} }
func (m *MapCache[K, V]) increaseUpdates(c context.Context, timeout time.Duration, data V, key K, params ...any) (V, error) { func (m *MapCache[K, V]) increaseUpdates(c context.Context, timeout time.Duration, data V, key K, params ...any) (V, error) {

26
cache/pagination.go vendored
View File

@ -19,6 +19,7 @@ type Pagination[K comparable, V any] struct {
dbFn func(ctx context.Context, k K, page, limit, totalRaw int, a ...any) ([]V, int, error) dbFn func(ctx context.Context, k K, page, limit, totalRaw int, a ...any) ([]V, int, error)
localFn func(ctx context.Context, data []V, k K, page, limit int, a ...any) ([]V, int, error) localFn func(ctx context.Context, data []V, k K, page, limit int, a ...any) ([]V, int, error)
batchFetchNum func() int batchFetchNum func() int
localKeyFn func(K K, a ...any) string
dbKeyFn func(K K, a ...any) string dbKeyFn func(K K, a ...any) string
name string name string
} }
@ -29,7 +30,7 @@ type DbFn[K comparable, V any] func(ctx context.Context, k K, page, limit, total
type LocalFn[K comparable, V any] func(ctx context.Context, data []V, k K, page, limit int, a ...any) ([]V, int, error) type LocalFn[K comparable, V any] func(ctx context.Context, data []V, k K, page, limit int, a ...any) ([]V, int, error)
func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationData[V]], maxNum func() int, dbFn DbFn[K, V], localFn LocalFn[K, V], dbKeyFn func(K, ...any) string, batchFetchNum func() int, name string) *Pagination[K, V] { func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationData[V]], maxNum func() int, dbFn DbFn[K, V], localFn LocalFn[K, V], dbKeyFn, localKeyFn func(K, ...any) string, batchFetchNum func() int, name string) *Pagination[K, V] {
if dbKeyFn == nil { if dbKeyFn == nil {
dbKeyFn = func(k K, a ...any) string { dbKeyFn = func(k K, a ...any) string {
s := str.NewBuilder() s := str.NewBuilder()
@ -40,6 +41,11 @@ func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationDat
return strings.TrimRight(s.String(), "|") return strings.TrimRight(s.String(), "|")
} }
} }
if localKeyFn == nil {
localKeyFn = func(k K, a ...any) string {
return fmt.Sprintf("%v", k)
}
}
return &Pagination[K, V]{ return &Pagination[K, V]{
MapCache: m, MapCache: m,
maxNum: maxNum, maxNum: maxNum,
@ -49,6 +55,7 @@ func NewPagination[K comparable, V any](m *MapCache[string, helper.PaginationDat
batchFetchNum: batchFetchNum, batchFetchNum: batchFetchNum,
name: name, name: name,
dbKeyFn: dbKeyFn, dbKeyFn: dbKeyFn,
localKeyFn: localKeyFn,
} }
} }
@ -68,7 +75,7 @@ func (p *Pagination[K, V]) Pagination(ctx context.Context, timeout time.Duration
} }
func (p *Pagination[K, V]) paginationByLocal(ctx context.Context, timeout time.Duration, k K, page, limit int, a ...any) ([]V, int, error) { func (p *Pagination[K, V]) paginationByLocal(ctx context.Context, timeout time.Duration, k K, page, limit int, a ...any) ([]V, int, error) {
key := fmt.Sprintf("%v", k) key := p.localKeyFn(k)
data, ok := p.Get(ctx, key) data, ok := p.Get(ctx, key)
if ok { if ok {
if p.increaseUpdate != nil && p.refresh != nil { if p.increaseUpdate != nil && p.refresh != nil {
@ -83,6 +90,12 @@ func (p *Pagination[K, V]) paginationByLocal(ctx context.Context, timeout time.D
} }
return p.localFn(ctx, data.Data, k, page, limit, a...) return p.localFn(ctx, data.Data, k, page, limit, a...)
} }
p.mux.Lock()
defer p.mux.Unlock()
data, ok = p.Get(ctx, key)
if ok {
return data.Data, data.TotalRaw, nil
}
batchNum := p.batchFetchNum() batchNum := p.batchFetchNum()
da, totalRaw, err := p.fetchDb(ctx, timeout, k, 1, 0, 0, a...) da, totalRaw, err := p.fetchDb(ctx, timeout, k, 1, 0, 0, a...)
if err != nil { if err != nil {
@ -101,6 +114,9 @@ func (p *Pagination[K, V]) paginationByLocal(ctx context.Context, timeout time.D
} }
da = append(da, daa...) da = append(da, daa...)
} }
data.Data = da
data.TotalRaw = totalRaw
p.Set(ctx, key, data)
} }
return p.localFn(ctx, data.Data, k, page, limit, a...) return p.localFn(ctx, data.Data, k, page, limit, a...)
@ -112,6 +128,12 @@ func (p *Pagination[K, V]) paginationByDB(ctx context.Context, timeout time.Dura
if ok { if ok {
return data.Data, data.TotalRaw, nil return data.Data, data.TotalRaw, nil
} }
p.mux.Lock()
defer p.mux.Unlock()
data, ok = p.Get(ctx, key)
if ok {
return data.Data, data.TotalRaw, nil
}
dat, total, err := p.fetchDb(ctx, timeout, k, page, limit, totalRaw, a...) dat, total, err := p.fetchDb(ctx, timeout, k, page, limit, totalRaw, a...)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err

73
cache/vars.go vendored
View File

@ -14,6 +14,10 @@ type VarCache[T any] struct {
mutex sync.Mutex mutex sync.Mutex
increaseUpdate *IncreaseUpdateVar[T] increaseUpdate *IncreaseUpdateVar[T]
refresh RefreshVar[T] refresh RefreshVar[T]
get func(ctx context.Context) (T, bool)
set func(ctx context.Context, v T)
flush func(ctx context.Context)
getLastSetTime func(ctx context.Context) time.Time
} }
type IncreaseUpdateVar[T any] struct { type IncreaseUpdateVar[T any] struct {
@ -23,6 +27,67 @@ type IncreaseUpdateVar[T any] struct {
type IncreaseVarFn[T any] func(c context.Context, currentData T, t time.Time, a ...any) (data T, save bool, refresh bool, err error) type IncreaseVarFn[T any] func(c context.Context, currentData T, t time.Time, a ...any) (data T, save bool, refresh bool, err error)
func (t *VarCache[T]) Get(ctx context.Context) (T, bool) {
return t.get(ctx)
}
func (t *VarCache[T]) Set(ctx context.Context, v T) {
t.set(ctx, v)
}
func (t *VarCache[T]) Flush(ctx context.Context) {
t.flush(ctx)
}
func (t *VarCache[T]) GetLastSetTime(ctx context.Context) time.Time {
return t.getLastSetTime(ctx)
}
func initVarCache[T any](t *VarCache[T], a ...any) {
gets := helper.ParseArgs[func(AnyCache[T], context.Context) (T, bool)](nil, a...)
if gets == nil {
t.get = t.AnyCache.Get
} else {
t.get = func(ctx context.Context) (T, bool) {
return gets(t.AnyCache, ctx)
}
}
set := helper.ParseArgs[func(AnyCache[T], context.Context, T)](nil, a...)
if set == nil {
t.set = t.AnyCache.Set
} else {
t.set = func(ctx context.Context, v T) {
set(t.AnyCache, ctx, v)
}
}
flush := helper.ParseArgs[func(AnyCache[T], context.Context)](nil, a...)
if flush == nil {
t.flush = t.AnyCache.Flush
} else {
t.flush = func(ctx context.Context) {
flush(t.AnyCache, ctx)
}
}
getLastSetTime := helper.ParseArgs[func(AnyCache[T], context.Context) time.Time](nil, a...)
if getLastSetTime == nil {
t.getLastSetTime = t.AnyCache.GetLastSetTime
} else {
t.getLastSetTime = func(ctx context.Context) time.Time {
return getLastSetTime(t.AnyCache, ctx)
}
}
}
func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error), inc *IncreaseUpdateVar[T], ref RefreshVar[T], a ...any) *VarCache[T] {
r := &VarCache[T]{
AnyCache: cache, setCacheFunc: fn, mutex: sync.Mutex{},
increaseUpdate: inc,
refresh: ref,
}
initVarCache(r, a...)
return r
}
func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, params ...any) (T, error) { func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, params ...any) (T, error) {
data, ok := t.Get(ctx) data, ok := t.Get(ctx)
var err error var err error
@ -125,14 +190,6 @@ func (c *VarMemoryCache[T]) GetLastSetTime(_ context.Context) time.Time {
return c.v.Load().setTime return c.v.Load().setTime
} }
func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error), inc *IncreaseUpdateVar[T], ref RefreshVar[T]) *VarCache[T] {
return &VarCache[T]{
AnyCache: cache, setCacheFunc: fn, mutex: sync.Mutex{},
increaseUpdate: inc,
refresh: ref,
}
}
func (c *VarMemoryCache[T]) Flush(_ context.Context) { func (c *VarMemoryCache[T]) Flush(_ context.Context) {
c.v.Flush() c.v.Flush()
} }

View File

@ -62,11 +62,27 @@ func Delete[T any](a *[]T, index int) {
*a = append(arr[:index], arr[index+1:]...) *a = append(arr[:index], arr[index+1:]...)
} }
func Copy[T any](a []T) []T { func Copy[T any](a []T, l ...int) []T {
dst := make([]T, len(a)) length := len(a)
if len(l) > 0 {
length = l[0]
}
var dst []T
if len(a) < length {
dst = make([]T, len(a), length)
} else {
dst = make([]T, length)
}
copy(dst, a) copy(dst, a)
return dst return dst
} }
func Copies[T any](a ...[]T) []T {
var r []T
for _, ts := range a {
r = append(r, ts...)
}
return r
}
func Unshift[T any](a *[]T, e ...T) { func Unshift[T any](a *[]T, e ...T) {
*a = append(e, *a...) *a = append(e, *a...)

View File

@ -3,8 +3,8 @@ package safety
import ( import (
"fmt" "fmt"
"github.com/fthvgb1/wp-go/helper/number" "github.com/fthvgb1/wp-go/helper/number"
"github.com/fthvgb1/wp-go/taskPools"
"testing" "testing"
"time"
) )
func TestSlice_Append(t *testing.T) { func TestSlice_Append(t *testing.T) {
@ -26,13 +26,27 @@ func TestSlice_Append(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
fn := func() { fn := func() {
tt.r.Append(tt.args.t...) switch number.Rand(1, 3) {
case 1:
f := tt.r.Load()
fmt.Println(f)
case 2:
tt.r.Append(tt.args.t...)
case 3:
/*s := tt.r.Load()
if len(s) < 1 {
break
}
ii, v := slice.Rand(number.Range(0, len(s)))
s[ii] = v*/
}
} }
go fn() p := taskPools.NewPools(20)
go fn() for i := 0; i < 50; i++ {
go fn() p.Execute(fn)
go fn() }
time.Sleep(time.Second) p.Wait()
fmt.Println(tt.r.Load()) fmt.Println(tt.r.Load())
}) })
} }