From 137b2a9738c481a14a9e0d6b9f71c977c18ff306 Mon Sep 17 00:00:00 2001 From: xing Date: Wed, 29 Nov 2023 18:24:41 +0800 Subject: [PATCH] var cache increaseUpdate --- app/pkg/dao/comments.go | 2 +- cache/cache.go | 3 ++ cache/cachemanager/manger.go | 10 ++---- cache/vars.go | 67 ++++++++++++++++++++++++++---------- 4 files changed, 55 insertions(+), 27 deletions(-) diff --git a/app/pkg/dao/comments.go b/app/pkg/dao/comments.go index ec71918..448d2e0 100644 --- a/app/pkg/dao/comments.go +++ b/app/pkg/dao/comments.go @@ -82,7 +82,7 @@ func GetCommentByIds(ctx context.Context, ids []uint64, _ ...any) (map[uint64]mo } func GetIncreaseComment(ctx context.Context, currentData []uint64, k uint64, t time.Time, _ ...any) (data []uint64, save bool, refresh bool, err error) { - r, err := model.ChunkFind[models.Comments](ctx, 300, model.Conditions( + r, err := model.ChunkFind[models.Comments](ctx, 1000, model.Conditions( model.Where(model.SqlBuilder{ {"comment_approved", "1"}, {"comment_post_ID", "=", number.IntToString(k), "int"}, diff --git a/cache/cache.go b/cache/cache.go index 21bf44f..c49e98c 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -34,3 +34,6 @@ type AnyCache[T any] interface { type Refresh[K comparable, V any] interface { Refresh(ctx context.Context, k K, a ...any) } +type RefreshVar[T any] interface { + Refresh(ctx context.Context, a ...any) +} diff --git a/cache/cachemanager/manger.go b/cache/cachemanager/manger.go index 3f7653d..a83cb0e 100644 --- a/cache/cachemanager/manger.go +++ b/cache/cachemanager/manger.go @@ -47,12 +47,6 @@ func SetMapCache[K comparable, V any](name string, ca *cache.MapCache[K, V]) err return nil } -type expire struct { - fn func() time.Duration - p *safety.Var[time.Duration] - isUseManger *safety.Var[bool] -} - type flush interface { Flush(ctx context.Context) } @@ -225,7 +219,9 @@ func ClearExpired() { } func NewVarCache[T any](c cache.AnyCache[T], fn func(context.Context, ...any) (T, error), a ...any) *cache.VarCache[T] { - v := cache.NewVarCache(c, fn) + inc := helper.ParseArgs(cache.IncreaseUpdateVar[T]{}, a...) + ref := helper.ParseArgs(cache.RefreshVar[T](nil), a...) + v := cache.NewVarCache(c, fn, inc, ref) FlushPush(v) name, _ := parseArgs(a...) if name != "" { diff --git a/cache/vars.go b/cache/vars.go index bfde6b2..27e4490 100644 --- a/cache/vars.go +++ b/cache/vars.go @@ -2,8 +2,7 @@ package cache import ( "context" - "errors" - "fmt" + "github.com/fthvgb1/wp-go/helper" "github.com/fthvgb1/wp-go/safety" "sync" "time" @@ -11,16 +10,54 @@ import ( type VarCache[T any] struct { AnyCache[T] - setCacheFunc func(context.Context, ...any) (T, error) - mutex sync.Mutex + setCacheFunc func(context.Context, ...any) (T, error) + mutex sync.Mutex + increaseUpdate IncreaseUpdateVar[T] + refresh RefreshVar[T] } +type IncreaseUpdateVar[T any] struct { + CycleTime func() time.Duration + Fn IncreaseVarFn[T] +} + +type IncreaseVarFn[T any] func(c context.Context, currentData T, t time.Time, a ...any) (data T, save bool, refresh bool, err error) + func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, params ...any) (T, error) { data, ok := t.Get(ctx) + var err error if ok { + if t.increaseUpdate.Fn != nil && t.refresh != nil { + nowTime := time.Now() + if t.increaseUpdate.CycleTime() > nowTime.Sub(t.GetLastSetTime(ctx)) { + return data, nil + } + fn := func() { + t.mutex.Lock() + defer t.mutex.Unlock() + da, save, refresh, er := t.increaseUpdate.Fn(ctx, data, t.GetLastSetTime(ctx), params...) + if er != nil { + err = er + return + } + if save { + t.Set(ctx, da) + } + if refresh { + t.refresh.Refresh(ctx, params...) + } + } + if timeout > 0 { + er := helper.RunFnWithTimeout(ctx, timeout, fn, "increaseUpdate cache fail") + if err == nil && er != nil { + err = er + } + } else { + fn() + } + } return data, nil } - var err error call := func() { t.mutex.Lock() defer t.mutex.Unlock() @@ -38,19 +75,9 @@ func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, param data = r } if timeout > 0 { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - done := make(chan struct{}, 1) - go func() { - call() - done <- struct{}{} - close(done) - }() - select { - case <-ctx.Done(): - err = errors.New(fmt.Sprintf("get cache %s", ctx.Err().Error())) - case <-done: - + er := helper.RunFnWithTimeout(ctx, timeout, call, "get cache fail") + if err == nil && er != nil { + err = er } } else { call() @@ -98,9 +125,11 @@ func (c *VarMemoryCache[T]) GetLastSetTime(_ context.Context) time.Time { return c.v.Load().setTime } -func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error)) *VarCache[T] { +func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error), inc IncreaseUpdateVar[T], ref RefreshVar[T]) *VarCache[T] { return &VarCache[T]{ AnyCache: cache, setCacheFunc: fn, mutex: sync.Mutex{}, + increaseUpdate: inc, + refresh: ref, } }