var cache increaseUpdate

This commit is contained in:
xing 2023-11-29 18:24:41 +08:00
parent 9f49a274cd
commit 137b2a9738
4 changed files with 55 additions and 27 deletions

View File

@ -82,7 +82,7 @@ func GetCommentByIds(ctx context.Context, ids []uint64, _ ...any) (map[uint64]mo
} }
func GetIncreaseComment(ctx context.Context, currentData []uint64, k uint64, t time.Time, _ ...any) (data []uint64, save bool, refresh bool, err error) { func GetIncreaseComment(ctx context.Context, currentData []uint64, k uint64, t time.Time, _ ...any) (data []uint64, save bool, refresh bool, err error) {
r, err := model.ChunkFind[models.Comments](ctx, 300, model.Conditions( r, err := model.ChunkFind[models.Comments](ctx, 1000, model.Conditions(
model.Where(model.SqlBuilder{ model.Where(model.SqlBuilder{
{"comment_approved", "1"}, {"comment_approved", "1"},
{"comment_post_ID", "=", number.IntToString(k), "int"}, {"comment_post_ID", "=", number.IntToString(k), "int"},

3
cache/cache.go vendored
View File

@ -34,3 +34,6 @@ type AnyCache[T any] interface {
type Refresh[K comparable, V any] interface { type Refresh[K comparable, V any] interface {
Refresh(ctx context.Context, k K, a ...any) Refresh(ctx context.Context, k K, a ...any)
} }
type RefreshVar[T any] interface {
Refresh(ctx context.Context, a ...any)
}

View File

@ -47,12 +47,6 @@ func SetMapCache[K comparable, V any](name string, ca *cache.MapCache[K, V]) err
return nil return nil
} }
type expire struct {
fn func() time.Duration
p *safety.Var[time.Duration]
isUseManger *safety.Var[bool]
}
type flush interface { type flush interface {
Flush(ctx context.Context) Flush(ctx context.Context)
} }
@ -225,7 +219,9 @@ func ClearExpired() {
} }
func NewVarCache[T any](c cache.AnyCache[T], fn func(context.Context, ...any) (T, error), a ...any) *cache.VarCache[T] { func NewVarCache[T any](c cache.AnyCache[T], fn func(context.Context, ...any) (T, error), a ...any) *cache.VarCache[T] {
v := cache.NewVarCache(c, fn) inc := helper.ParseArgs(cache.IncreaseUpdateVar[T]{}, a...)
ref := helper.ParseArgs(cache.RefreshVar[T](nil), a...)
v := cache.NewVarCache(c, fn, inc, ref)
FlushPush(v) FlushPush(v)
name, _ := parseArgs(a...) name, _ := parseArgs(a...)
if name != "" { if name != "" {

67
cache/vars.go vendored
View File

@ -2,8 +2,7 @@ package cache
import ( import (
"context" "context"
"errors" "github.com/fthvgb1/wp-go/helper"
"fmt"
"github.com/fthvgb1/wp-go/safety" "github.com/fthvgb1/wp-go/safety"
"sync" "sync"
"time" "time"
@ -11,16 +10,54 @@ import (
type VarCache[T any] struct { type VarCache[T any] struct {
AnyCache[T] AnyCache[T]
setCacheFunc func(context.Context, ...any) (T, error) setCacheFunc func(context.Context, ...any) (T, error)
mutex sync.Mutex mutex sync.Mutex
increaseUpdate IncreaseUpdateVar[T]
refresh RefreshVar[T]
} }
type IncreaseUpdateVar[T any] struct {
CycleTime func() time.Duration
Fn IncreaseVarFn[T]
}
type IncreaseVarFn[T any] func(c context.Context, currentData T, t time.Time, a ...any) (data T, save bool, refresh bool, err error)
func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, params ...any) (T, error) { func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, params ...any) (T, error) {
data, ok := t.Get(ctx) data, ok := t.Get(ctx)
var err error
if ok { if ok {
if t.increaseUpdate.Fn != nil && t.refresh != nil {
nowTime := time.Now()
if t.increaseUpdate.CycleTime() > nowTime.Sub(t.GetLastSetTime(ctx)) {
return data, nil
}
fn := func() {
t.mutex.Lock()
defer t.mutex.Unlock()
da, save, refresh, er := t.increaseUpdate.Fn(ctx, data, t.GetLastSetTime(ctx), params...)
if er != nil {
err = er
return
}
if save {
t.Set(ctx, da)
}
if refresh {
t.refresh.Refresh(ctx, params...)
}
}
if timeout > 0 {
er := helper.RunFnWithTimeout(ctx, timeout, fn, "increaseUpdate cache fail")
if err == nil && er != nil {
err = er
}
} else {
fn()
}
}
return data, nil return data, nil
} }
var err error
call := func() { call := func() {
t.mutex.Lock() t.mutex.Lock()
defer t.mutex.Unlock() defer t.mutex.Unlock()
@ -38,19 +75,9 @@ func (t *VarCache[T]) GetCache(ctx context.Context, timeout time.Duration, param
data = r data = r
} }
if timeout > 0 { if timeout > 0 {
ctx, cancel := context.WithTimeout(ctx, timeout) er := helper.RunFnWithTimeout(ctx, timeout, call, "get cache fail")
defer cancel() if err == nil && er != nil {
done := make(chan struct{}, 1) err = er
go func() {
call()
done <- struct{}{}
close(done)
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %s", ctx.Err().Error()))
case <-done:
} }
} else { } else {
call() call()
@ -98,9 +125,11 @@ func (c *VarMemoryCache[T]) GetLastSetTime(_ context.Context) time.Time {
return c.v.Load().setTime return c.v.Load().setTime
} }
func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error)) *VarCache[T] { func NewVarCache[T any](cache AnyCache[T], fn func(context.Context, ...any) (T, error), inc IncreaseUpdateVar[T], ref RefreshVar[T]) *VarCache[T] {
return &VarCache[T]{ return &VarCache[T]{
AnyCache: cache, setCacheFunc: fn, mutex: sync.Mutex{}, AnyCache: cache, setCacheFunc: fn, mutex: sync.Mutex{},
increaseUpdate: inc,
refresh: ref,
} }
} }