Compare commits

...

2 Commits

Author SHA1 Message Date
86d1616732 fix bug 2023-10-29 18:58:06 +08:00
2eb58f732b expend cache map 2023-10-29 18:46:01 +08:00
2 changed files with 109 additions and 3 deletions

6
cache/cache.go vendored
View File

@ -15,3 +15,9 @@ type Cache[K comparable, V any] interface {
Del(ctx context.Context, key ...K) Del(ctx context.Context, key ...K)
ClearExpired(ctx context.Context) ClearExpired(ctx context.Context)
} }
type Expend[K comparable, V any] interface {
Gets(ctx context.Context, k []K) (map[K]V, error)
Vers(ctx context.Context, k []K) map[K]int
Sets(ctx context.Context, m map[K]V)
}

106
cache/map.go vendored
View File

@ -11,9 +11,10 @@ import (
type MapCache[K comparable, V any] struct { type MapCache[K comparable, V any] struct {
Cache[K, V] Cache[K, V]
mux sync.Mutex mux sync.Mutex
cacheFunc MapSingleFn[K, V] cacheFunc MapSingleFn[K, V]
batchCacheFn MapBatchFn[K, V] batchCacheFn MapBatchFn[K, V]
getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error)
} }
type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error) type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error)
@ -31,6 +32,12 @@ func NewMapCache[K comparable, V any](data Cache[K, V], cacheFunc MapSingleFn[K,
} else if batchCacheFn == nil && cacheFunc != nil { } else if batchCacheFn == nil && cacheFunc != nil {
r.SetDefaultBatchFunc(cacheFunc) r.SetDefaultBatchFunc(cacheFunc)
} }
ex, ok := any(data).(Expend[K, V])
if !ok {
r.getCacheBatch = r.getCacheBatchs
} else {
r.getCacheBatch = r.getBatches(ex)
}
return r return r
} }
@ -129,6 +136,10 @@ func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duratio
} }
func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) { func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
return m.getCacheBatch(c, key, timeout, params...)
}
func (m *MapCache[K, V]) getCacheBatchs(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
var res = make([]V, 0, len(key)) var res = make([]V, 0, len(key))
var needIndex = make(map[K]int) var needIndex = make(map[K]int)
var ver = make(map[K]int) var ver = make(map[K]int)
@ -196,3 +207,92 @@ func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.
return res, err return res, err
} }
func (m *MapCache[K, V]) getBatches(e Expend[K, V]) func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
cc := e
return func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
var res = make([]V, 0, len(key))
var needIndex = make(map[K]int)
var err error
mm, err := cc.Gets(ctx, key)
if err != nil {
return nil, err
}
var flushKeys []K
for i, k := range key {
v, ok := mm[k]
if !ok {
flushKeys = append(flushKeys, k)
needIndex[k] = i
var vv V
v = vv
}
res = append(res, v)
}
if len(needIndex) < 1 {
return res, nil
}
vers := cc.Vers(ctx, flushKeys)
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
verss := cc.Vers(ctx, flushKeys)
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
vv, ok := vers[k]
vvv, ook := verss[k]
if !ok || !ook || vv >= vvv {
return k, true
}
return k, false
})
if len(needFlushs) < 1 {
vv, er := cc.Gets(ctx, needFlushs)
if er != nil {
err = er
return
}
for k, i := range needIndex {
v, ok := vv[k]
if ok {
res[i] = v
}
}
return
}
r, er := m.batchCacheFn(ctx, needFlushs, params...)
if err != nil {
err = er
return
}
cc.Sets(ctx, r)
for k, i := range needIndex {
v, ok := r[k]
if ok {
res[i] = v
}
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
call()
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
case <-done:
}
} else {
call()
}
return res, err
}
}