From 2eb58f732ba981c05d5a496b58295be3cfe78674 Mon Sep 17 00:00:00 2001 From: xing Date: Sun, 29 Oct 2023 18:46:01 +0800 Subject: [PATCH] expend cache map --- cache/cache.go | 6 +++ cache/map.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 3 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 87b5571..2e12264 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -15,3 +15,9 @@ type Cache[K comparable, V any] interface { Del(ctx context.Context, key ...K) ClearExpired(ctx context.Context) } + +type Expend[K comparable, V any] interface { + Gets(ctx context.Context, k []K) (map[K]V, error) + Vers(ctx context.Context, k []K) map[K]int + Sets(ctx context.Context, m map[K]V) +} diff --git a/cache/map.go b/cache/map.go index dca74af..2b88f19 100644 --- a/cache/map.go +++ b/cache/map.go @@ -11,9 +11,10 @@ import ( type MapCache[K comparable, V any] struct { Cache[K, V] - mux sync.Mutex - cacheFunc MapSingleFn[K, V] - batchCacheFn MapBatchFn[K, V] + mux sync.Mutex + cacheFunc MapSingleFn[K, V] + batchCacheFn MapBatchFn[K, V] + getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) } type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error) @@ -31,6 +32,12 @@ func NewMapCache[K comparable, V any](data Cache[K, V], cacheFunc MapSingleFn[K, } else if batchCacheFn == nil && cacheFunc != nil { r.SetDefaultBatchFunc(cacheFunc) } + ex, ok := any(data).(Expend[K, V]) + if !ok { + r.getCacheBatch = r.getCacheBatchs + } else { + r.getCacheBatch = r.getBatches(ex) + } return r } @@ -129,6 +136,10 @@ func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duratio } func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) { + return m.getCacheBatch(c, key, timeout, params...) +} + +func (m *MapCache[K, V]) getCacheBatchs(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) { var res = make([]V, 0, len(key)) var needIndex = make(map[K]int) var ver = make(map[K]int) @@ -196,3 +207,93 @@ func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time. return res, err } + +func (m *MapCache[K, V]) getBatches(e Expend[K, V]) func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) { + cc := e + return func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) { + var res = make([]V, 0, len(key)) + var needIndex = make(map[K]int) + var err error + mm, err := cc.Gets(ctx, key) + if err != nil { + return nil, err + } + var flushKeys []K + for i, k := range key { + v, ok := mm[k] + if !ok { + flushKeys = append(flushKeys, k) + needIndex[k] = i + } else { + var vv V + v = vv + } + res = append(res, v) + } + if len(needIndex) < 1 { + return res, nil + } + vers := cc.Vers(ctx, flushKeys) + + call := func() { + m.mux.Lock() + defer m.mux.Unlock() + verss := cc.Vers(ctx, flushKeys) + needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) { + vv, ok := vers[k] + vvv, ook := verss[k] + if !ok || !ook || vv >= vvv { + return k, true + } + return k, false + }) + + if len(needFlushs) < 1 { + vv, er := cc.Gets(ctx, needFlushs) + if er != nil { + err = er + return + } + for k, i := range needIndex { + v, ok := vv[k] + if ok { + res[i] = v + } + } + return + } + + r, er := m.batchCacheFn(ctx, needFlushs, params...) + if err != nil { + err = er + return + } + cc.Sets(ctx, r) + + for k, i := range needIndex { + v, ok := r[k] + if ok { + res[i] = v + } + } + } + if timeout > 0 { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + done := make(chan struct{}, 1) + go func() { + call() + done <- struct{}{} + }() + select { + case <-ctx.Done(): + err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error())) + case <-done: + } + } else { + call() + } + + return res, err + } +}