diff --git a/app/pkg/cache/cache.go b/app/pkg/cache/cache.go index 53a174d..1813010 100644 --- a/app/pkg/cache/cache.go +++ b/app/pkg/cache/cache.go @@ -6,6 +6,7 @@ import ( "github.com/fthvgb1/wp-go/app/pkg/dao" "github.com/fthvgb1/wp-go/app/pkg/logs" "github.com/fthvgb1/wp-go/app/pkg/models" + "github.com/fthvgb1/wp-go/cache" "github.com/fthvgb1/wp-go/cache/cachemanager" "github.com/fthvgb1/wp-go/helper/slice" "github.com/fthvgb1/wp-go/safety" @@ -51,9 +52,15 @@ func InitActionsCommonCache() { return config.GetConfig().CacheTime.RecentCommentsCacheTime }) - cachemanager.NewMemoryMapCache(nil, dao.PostComments, c.CacheTime.PostCommentsCacheTime, "postCommentIds", func() time.Duration { - return config.GetConfig().CacheTime.PostCommentsCacheTime - }) + cachemanager.NewMemoryMapCache(nil, dao.PostComments, c.CacheTime.PostCommentsCacheTime, + "postCommentIds", + func() time.Duration { + return config.GetConfig().CacheTime.PostCommentsCacheTime + }, + cache.NewIncreaseUpdate("comment-increaseUpdate", dao.GetIncreaseComment, 30*time.Second, + func() time.Duration { + return config.GetConfig().CacheTime.CommentsIncreaseUpdateTime + })) cachemanager.NewVarMemoryCache(dao.GetMaxPostId, c.CacheTime.MaxPostIdCacheTime, "maxPostId", func() time.Duration { return config.GetConfig().CacheTime.MaxPostIdCacheTime diff --git a/app/pkg/config/config.go b/app/pkg/config/config.go index 69a4e6f..60fcf8d 100644 --- a/app/pkg/config/config.go +++ b/app/pkg/config/config.go @@ -46,23 +46,24 @@ type Config struct { } type CacheTime struct { - CacheControl time.Duration `yaml:"cacheControl" json:"cacheControl,omitempty"` - RecentPostCacheTime time.Duration `yaml:"recentPostCacheTime" json:"recentPostCacheTime,omitempty"` - CategoryCacheTime time.Duration `yaml:"categoryCacheTime" json:"categoryCacheTime,omitempty"` - ArchiveCacheTime time.Duration `yaml:"archiveCacheTime" json:"archiveCacheTime,omitempty"` - ContextPostCacheTime time.Duration `yaml:"contextPostCacheTime" json:"contextPostCacheTime,omitempty"` - RecentCommentsCacheTime time.Duration `yaml:"recentCommentsCacheTime" json:"recentCommentsCacheTime,omitempty"` - DigestCacheTime time.Duration `yaml:"digestCacheTime" json:"digestCacheTime,omitempty"` - PostListCacheTime time.Duration `yaml:"postListCacheTime" json:"postListCacheTime,omitempty"` - SearchPostCacheTime time.Duration `yaml:"searchPostCacheTime" json:"searchPostCacheTime,omitempty"` - MonthPostCacheTime time.Duration `yaml:"monthPostCacheTime" json:"monthPostCacheTime,omitempty"` - PostDataCacheTime time.Duration `yaml:"postDataCacheTime" json:"postDataCacheTime,omitempty"` - PostCommentsCacheTime time.Duration `yaml:"postCommentsCacheTime" json:"postCommentsCacheTime,omitempty"` - CrontabClearCacheTime time.Duration `yaml:"crontabClearCacheTime" json:"crontabClearCacheTime,omitempty"` - MaxPostIdCacheTime time.Duration `yaml:"maxPostIdCacheTime" json:"maxPostIdCacheTime,omitempty"` - UserInfoCacheTime time.Duration `yaml:"userInfoCacheTime" json:"userInfoCacheTime,omitempty"` - CommentsCacheTime time.Duration `yaml:"commentsCacheTime" json:"commentsCacheTime,omitempty"` - SleepTime []time.Duration `yaml:"sleepTime" json:"sleepTime,omitempty"` + CacheControl time.Duration `yaml:"cacheControl" json:"cacheControl,omitempty"` + RecentPostCacheTime time.Duration `yaml:"recentPostCacheTime" json:"recentPostCacheTime,omitempty"` + CategoryCacheTime time.Duration `yaml:"categoryCacheTime" json:"categoryCacheTime,omitempty"` + ArchiveCacheTime time.Duration `yaml:"archiveCacheTime" json:"archiveCacheTime,omitempty"` + ContextPostCacheTime time.Duration `yaml:"contextPostCacheTime" json:"contextPostCacheTime,omitempty"` + RecentCommentsCacheTime time.Duration `yaml:"recentCommentsCacheTime" json:"recentCommentsCacheTime,omitempty"` + DigestCacheTime time.Duration `yaml:"digestCacheTime" json:"digestCacheTime,omitempty"` + PostListCacheTime time.Duration `yaml:"postListCacheTime" json:"postListCacheTime,omitempty"` + SearchPostCacheTime time.Duration `yaml:"searchPostCacheTime" json:"searchPostCacheTime,omitempty"` + MonthPostCacheTime time.Duration `yaml:"monthPostCacheTime" json:"monthPostCacheTime,omitempty"` + PostDataCacheTime time.Duration `yaml:"postDataCacheTime" json:"postDataCacheTime,omitempty"` + PostCommentsCacheTime time.Duration `yaml:"postCommentsCacheTime" json:"postCommentsCacheTime,omitempty"` + CrontabClearCacheTime time.Duration `yaml:"crontabClearCacheTime" json:"crontabClearCacheTime,omitempty"` + MaxPostIdCacheTime time.Duration `yaml:"maxPostIdCacheTime" json:"maxPostIdCacheTime,omitempty"` + UserInfoCacheTime time.Duration `yaml:"userInfoCacheTime" json:"userInfoCacheTime,omitempty"` + CommentsCacheTime time.Duration `yaml:"commentsCacheTime" json:"commentsCacheTime,omitempty"` + SleepTime []time.Duration `yaml:"sleepTime" json:"sleepTime,omitempty"` + CommentsIncreaseUpdateTime time.Duration `yaml:"commentsIncreaseUpdateTime" json:"commentsIncreaseUpdateTime"` } type Ssl struct { diff --git a/app/pkg/dao/comments.go b/app/pkg/dao/comments.go index a061a1a..0cad097 100644 --- a/app/pkg/dao/comments.go +++ b/app/pkg/dao/comments.go @@ -2,11 +2,14 @@ package dao import ( "context" + "database/sql" + "errors" "github.com/fthvgb1/wp-go/app/pkg/models" "github.com/fthvgb1/wp-go/helper" "github.com/fthvgb1/wp-go/helper/number" "github.com/fthvgb1/wp-go/helper/slice" "github.com/fthvgb1/wp-go/model" + "time" ) // RecentComments @@ -50,9 +53,13 @@ func PostComments(ctx context.Context, postId uint64, _ ...any) ([]uint64, error func GetCommentByIds(ctx context.Context, ids []uint64, _ ...any) (map[uint64]models.Comments, error) { m := make(map[uint64]models.Comments) - r, err := model.SimpleFind[models.Comments](ctx, model.SqlBuilder{ - {"comment_ID", "in", ""}, {"comment_approved", "1"}, - }, "*", slice.ToAnySlice(ids)) + r, err := model.ChunkFind[models.Comments](ctx, 500, model.Conditions( + model.Where(model.SqlBuilder{ + {"comment_ID", "in", ""}, {"comment_approved", "1"}, + }), + model.Fields("*"), + model.In(slice.ToAnySlice(ids)), + )) if err != nil { return m, err } @@ -60,3 +67,35 @@ func GetCommentByIds(ctx context.Context, ids []uint64, _ ...any) (map[uint64]mo return t.CommentId }), err } + +func GetIncreaseComment(ctx context.Context, currentData []uint64, k uint64, t time.Time, _ ...any) (data []uint64, save bool, refresh bool, err error) { + r, err := model.ChunkFind[models.Comments](ctx, 300, model.Conditions( + model.Where(model.SqlBuilder{ + {"comment_approved", "1"}, + {"comment_post_ID", "=", number.IntToString(k), "int"}, + {"comment_date", ">=", t.Format(time.DateTime)}, + }), + model.Fields("comment_ID"), + model.Order(model.SqlBuilder{ + {"comment_date_gmt", "asc"}, + {"comment_ID", "asc"}, + })), + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + err = nil + refresh = true + } + return + } + if len(r) < 1 { + refresh = true + return + } + rr := slice.Map(r, func(t models.Comments) uint64 { + return t.CommentId + }) + data = append(currentData, rr...) + save = true + return +} diff --git a/cache/cache.go b/cache/cache.go index 8bd4d43..21bf44f 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -30,3 +30,7 @@ type AnyCache[T any] interface { Flush(ctx context.Context) GetLastSetTime(ctx context.Context) time.Time } + +type Refresh[K comparable, V any] interface { + Refresh(ctx context.Context, k K, a ...any) +} diff --git a/cache/cachemanager/manger.go b/cache/cachemanager/manger.go index 4d64ee2..3f7653d 100644 --- a/cache/cachemanager/manger.go +++ b/cache/cachemanager/manger.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/fthvgb1/wp-go/cache" "github.com/fthvgb1/wp-go/cache/reload" + "github.com/fthvgb1/wp-go/helper" str "github.com/fthvgb1/wp-go/helper/strings" "github.com/fthvgb1/wp-go/safety" "time" @@ -15,8 +16,6 @@ var ctx = context.Background() var mapFlush = safety.NewMap[string, func(any)]() var anyFlush = safety.NewMap[string, func()]() -var expiredTime = safety.NewMap[string, expire]() - var varCache = safety.NewMap[string, any]() var mapCache = safety.NewMap[string, any]() @@ -176,7 +175,8 @@ func parseArgs(args ...any) (string, func() time.Duration) { } func NewMapCache[K comparable, V any](data cache.Cache[K, V], batchFn cache.MapBatchFn[K, V], fn cache.MapSingleFn[K, V], args ...any) *cache.MapCache[K, V] { - m := cache.NewMapCache[K, V](data, fn, batchFn) + inc := helper.ParseArgs(cache.IncreaseUpdate[K, V]{}, args...) + m := cache.NewMapCache[K, V](data, fn, batchFn, inc) FlushPush(m) ClearPush(m) name, f := parseArgs(args...) @@ -201,43 +201,13 @@ func SetExpireTime(c cache.SetTime, name string, expireTime time.Duration, expir if name == "" { return } - var t, tt func() time.Duration - t = expireTimeFn - if t == nil { - t = func() time.Duration { - return expireTime - } - } - tt = t - expireTime = t() - p := safety.NewVar(expireTime) - e := expire{ - fn: t, - p: p, - isUseManger: safety.NewVar(false), - } - expiredTime.Store(name, e) - reload.Push(func() { - if !e.isUseManger.Load() { - e.p.Store(tt()) - } - }, str.Join("cacheManger-", name, "-expiredTime")) - t = func() time.Duration { - return e.p.Load() - } - c.SetExpiredTime(t) + fn := reload.FnVal(str.Join("cacheManger-", name, "-expiredTime"), expireTime, expireTimeFn) + c.SetExpiredTime(fn) } func ChangeExpireTime(t time.Duration, name ...string) { for _, s := range name { - v, ok := expiredTime.Load(s) - if !ok { - continue - } - v.p.Store(t) - if !v.isUseManger.Load() { - v.isUseManger.Store(true) - } + reload.ChangeFnVal(s, t) } } diff --git a/cache/cachemanager/manger_test.go b/cache/cachemanager/manger_test.go index beabc8f..72032a6 100644 --- a/cache/cachemanager/manger_test.go +++ b/cache/cachemanager/manger_test.go @@ -94,18 +94,18 @@ func TestSetExpireTime(t *testing.T) { func TestSetMapCache(t *testing.T) { t.Run("t1", func(t *testing.T) { - NewMemoryMapCache(nil, func(ctx2 context.Context, k string, a ...any) (string, error) { + x := NewMemoryMapCache(nil, func(ctx2 context.Context, k string, a ...any) (string, error) { fmt.Println("memory cache") return strings.Repeat(k, 2), nil }, time.Hour, "test") fmt.Println(Get[string]("test", ctx, "test", time.Second)) - cc := NewMapCache[string, string](xx[string, string]{m: map[string]string{}}, nil, func(ctx2 context.Context, k string, a ...any) (string, error) { + NewMapCache[string, string](xx[string, string]{m: map[string]string{}}, nil, func(ctx2 context.Context, k string, a ...any) (string, error) { fmt.Println("other cache drives. eg: redis,file.....") return strings.Repeat(k, 2), nil - }, "kkk", time.Hour) + }, "test", time.Hour) - if err := SetMapCache("test", cc); err != nil { + if err := SetMapCache("kkk", x); err != nil { t.Errorf("SetMapCache() error = %v, wantErr %v", err, nil) } fmt.Println(Get[string]("test", ctx, "test", time.Second)) @@ -152,16 +152,16 @@ func (x xx[K, V]) ClearExpired(ctx context.Context) { func TestSetVarCache(t *testing.T) { t.Run("t1", func(t *testing.T) { - NewVarMemoryCache(func(ctx2 context.Context, a ...any) (string, error) { + bak := NewVarMemoryCache(func(ctx2 context.Context, a ...any) (string, error) { fmt.Println("memory cache") return "xxx", nil }, time.Hour, "test") fmt.Println(GetVarVal[string]("test", ctx, time.Second)) - o := NewVarCache[string](oo[string]{}, func(ctx2 context.Context, a ...any) (string, error) { + NewVarCache[string](oo[string]{}, func(ctx2 context.Context, a ...any) (string, error) { fmt.Println("other cache drives. eg: redis,file.....") return "ooo", nil - }) - if err := SetVarCache("test", o); err != nil { + }, "test") + if err := SetVarCache("xx", bak); err != nil { t.Errorf("SetVarCache() error = %v, wantErr %v", err, nil) } fmt.Println(GetVarVal[string]("test", ctx, time.Second)) diff --git a/cache/map.go b/cache/map.go index 71b7e73..6078ef3 100644 --- a/cache/map.go +++ b/cache/map.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "github.com/fthvgb1/wp-go/cache/reload" + "github.com/fthvgb1/wp-go/helper" "github.com/fthvgb1/wp-go/helper/maps" "sync" "time" @@ -16,24 +18,37 @@ type MapCache[K comparable, V any] struct { batchCacheFn MapBatchFn[K, V] getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) getCacheBatchToMap func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) + increaseUpdate IncreaseUpdate[K, V] + refresh Refresh[K, V] +} +type IncreaseUpdate[K comparable, V any] struct { + CycleTime func() time.Duration + Fn IncreaseFn[K, V] +} + +func NewIncreaseUpdate[K comparable, V any](name string, fn IncreaseFn[K, V], cycleTime time.Duration, tFn func() time.Duration) IncreaseUpdate[K, V] { + tFn = reload.FnVal(name, cycleTime, tFn) + return IncreaseUpdate[K, V]{CycleTime: tFn, Fn: fn} } type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error) type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error) +type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error) -func NewMapCache[K comparable, V any](data Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V]) *MapCache[K, V] { +func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc IncreaseUpdate[K, V]) *MapCache[K, V] { r := &MapCache[K, V]{ - Cache: data, - mux: sync.Mutex{}, - cacheFunc: cacheFunc, - batchCacheFn: batchCacheFn, + Cache: ca, + mux: sync.Mutex{}, + cacheFunc: cacheFunc, + batchCacheFn: batchCacheFn, + increaseUpdate: inc, } if cacheFunc == nil && batchCacheFn != nil { r.setDefaultCacheFn(batchCacheFn) } else if batchCacheFn == nil && cacheFunc != nil { r.SetDefaultBatchFunc(cacheFunc) } - ex, ok := any(data).(Expend[K, V]) + ex, ok := any(ca).(Expend[K, V]) if !ok { r.getCacheBatch = r.getCacheBatchs r.getCacheBatchToMap = r.getBatchToMapes @@ -41,6 +56,10 @@ func NewMapCache[K comparable, V any](data Cache[K, V], cacheFunc MapSingleFn[K, r.getCacheBatch = r.getBatches(ex) r.getCacheBatchToMap = r.getBatchToMap(ex) } + re, ok := any(ca).(Refresh[K, V]) + if ok { + r.refresh = re + } return r } @@ -101,10 +120,44 @@ func (m *MapCache[K, V]) Flush(ctx context.Context) { func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duration, params ...any) (V, error) { data, ok := m.Get(c, key) - if ok { - return data, nil - } var err error + if ok { + if m.increaseUpdate.Fn == nil || m.refresh == nil { + return data, err + } + nowTime := time.Now() + if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() { + return data, err + } + fn := func() { + m.mux.Lock() + defer m.mux.Unlock() + if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() { + return + } + dat, save, refresh, er := m.increaseUpdate.Fn(c, data, key, m.GetLastSetTime(c, key), params...) + if er != nil { + err = er + return + } + if refresh { + m.refresh.Refresh(c, key, params...) + } + if save { + m.Set(c, key, dat) + data = dat + } + } + if timeout > 0 { + er := helper.RunFnWithTimeout(c, timeout, fn, fmt.Sprintf("increateUpdate cache %v err", key)) + if err == nil && er != nil { + return data, er + } + } else { + fn() + } + return data, err + } call := func() { m.mux.Lock() defer m.mux.Unlock() @@ -118,19 +171,9 @@ func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duratio m.Set(c, key, data) } if timeout > 0 { - ctx, cancel := context.WithTimeout(c, timeout) - defer cancel() - done := make(chan struct{}, 1) - go func() { - call() - done <- struct{}{} - }() - select { - case <-ctx.Done(): - err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error())) - var vv V - return vv, err - case <-done: + er := helper.RunFnWithTimeout(c, timeout, call, fmt.Sprintf("get cache %v ", key)) + if err == nil && er != nil { + err = er } } else { call() diff --git a/cache/memorymapcache.go b/cache/memorymapcache.go index 808296f..01a31f7 100644 --- a/cache/memorymapcache.go +++ b/cache/memorymapcache.go @@ -2,6 +2,7 @@ package cache import ( "context" + "github.com/fthvgb1/wp-go/helper" "github.com/fthvgb1/wp-go/safety" "time" ) @@ -97,3 +98,13 @@ func (m *MemoryMapCache[K, V]) ClearExpired(_ context.Context) { return true }) } + +func (m *MemoryMapCache[K, V]) Refresh(_ context.Context, k K, a ...any) { + v, ok := m.Load(k) + if !ok { + return + } + t := helper.ParseArgs(time.Now(), a...) + v.setTime = t + m.Store(k, v) +} diff --git a/cache/reload/reload.go b/cache/reload/reload.go index 3f99242..182c9c6 100644 --- a/cache/reload/reload.go +++ b/cache/reload/reload.go @@ -261,7 +261,6 @@ func Push(fn func(), a ...any) { } func Reload() { - anyMap.Flush() callsM.Flush() flushMapFn.Flush() callll := calls.Load() @@ -273,3 +272,49 @@ func Reload() { } return } + +type Any[T any] struct { + fn func() T + v *safety.Var[T] + isUseManger *safety.Var[bool] +} + +func FnVal[T any](name string, t T, fn func() T) func() T { + if fn == nil { + fn = func() T { + return t + } + } else { + t = fn() + } + p := safety.NewVar(t) + e := Any[T]{ + fn: fn, + v: p, + isUseManger: safety.NewVar(false), + } + Push(func() { + if !e.isUseManger.Load() { + e.v.Store(fn()) + } + }) + anyMap.Store(name, e) + return func() T { + return e.v.Load() + } +} + +func ChangeFnVal[T any](name string, val T) { + v, ok := anyMap.Load(name) + if !ok { + return + } + vv, ok := v.(Any[T]) + if !ok { + return + } + if !vv.isUseManger.Load() { + vv.isUseManger.Store(true) + } + vv.v.Store(val) +} diff --git a/config.example.yaml b/config.example.yaml index 07fda2e..e5000a9 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -58,6 +58,8 @@ cacheTime: userInfoCacheTime: 24h # 单独评论缓存时间 commentsCacheTime: 24h + # 评论增量更新时间 + commentsIncreaseUpdateTime: 30s # 随机sleep时间 sleepTime: [ 1s,3s ] # 摘要字数 >0截取指定字数 =0输出出空字符 <0为不截取,原样输出 diff --git a/helper/func.go b/helper/func.go index ca6c73c..bb91e2b 100644 --- a/helper/func.go +++ b/helper/func.go @@ -2,6 +2,7 @@ package helper import ( "context" + "errors" "fmt" str "github.com/fthvgb1/wp-go/helper/strings" "net/url" @@ -9,6 +10,7 @@ import ( "reflect" "strconv" "strings" + "time" ) func ToAny[T any](v T) any { @@ -144,3 +146,45 @@ func ParseArgs[T any](defaults T, a ...any) T { } return defaults } + +func RunFnWithTimeout(ctx context.Context, t time.Duration, call func(), a ...any) (err error) { + ctx, cancel := context.WithTimeout(ctx, t) + defer cancel() + done := make(chan struct{}, 1) + go func() { + call() + done <- struct{}{} + }() + select { + case <-ctx.Done(): + msg := ParseArgs("", a...) + if msg != "" { + return errors.New(str.Join(msg, ":", ctx.Err().Error())) + } + return ctx.Err() + case <-done: + close(done) + } + return nil +} + +func RunFnWithTimeouts[A, V any](ctx context.Context, t time.Duration, ar A, call func(A) (V, error), a ...any) (v V, err error) { + ctx, cancel := context.WithTimeout(ctx, t) + defer cancel() + done := make(chan struct{}, 1) + go func() { + v, err = call(ar) + done <- struct{}{} + }() + select { + case <-ctx.Done(): + msg := ParseArgs("", a...) + if msg != "" { + return v, errors.New(str.Join(msg, ":", ctx.Err().Error())) + } + return v, ctx.Err() + case <-done: + close(done) + } + return v, err +} diff --git a/helper/slice/slice.go b/helper/slice/slice.go index 7ecd8e3..46488fc 100644 --- a/helper/slice/slice.go +++ b/helper/slice/slice.go @@ -58,6 +58,15 @@ func Filter[T any](arr []T, fn func(T, int) bool) []T { } return r } +func Filters[T any](arr []T, fn func(T) bool) []T { + var r []T + for _, t := range arr { + if fn(t) { + r = append(r, t) + } + } + return r +} func Reduce[R, T any](arr []T, fn func(T, R) R, r R) R { for _, t := range arr {