expend cache map
This commit is contained in:
parent
936d033547
commit
2eb58f732b
6
cache/cache.go
vendored
6
cache/cache.go
vendored
|
@ -15,3 +15,9 @@ type Cache[K comparable, V any] interface {
|
||||||
Del(ctx context.Context, key ...K)
|
Del(ctx context.Context, key ...K)
|
||||||
ClearExpired(ctx context.Context)
|
ClearExpired(ctx context.Context)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Expend[K comparable, V any] interface {
|
||||||
|
Gets(ctx context.Context, k []K) (map[K]V, error)
|
||||||
|
Vers(ctx context.Context, k []K) map[K]int
|
||||||
|
Sets(ctx context.Context, m map[K]V)
|
||||||
|
}
|
||||||
|
|
107
cache/map.go
vendored
107
cache/map.go
vendored
|
@ -11,9 +11,10 @@ import (
|
||||||
|
|
||||||
type MapCache[K comparable, V any] struct {
|
type MapCache[K comparable, V any] struct {
|
||||||
Cache[K, V]
|
Cache[K, V]
|
||||||
mux sync.Mutex
|
mux sync.Mutex
|
||||||
cacheFunc MapSingleFn[K, V]
|
cacheFunc MapSingleFn[K, V]
|
||||||
batchCacheFn MapBatchFn[K, V]
|
batchCacheFn MapBatchFn[K, V]
|
||||||
|
getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error)
|
type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error)
|
||||||
|
@ -31,6 +32,12 @@ func NewMapCache[K comparable, V any](data Cache[K, V], cacheFunc MapSingleFn[K,
|
||||||
} else if batchCacheFn == nil && cacheFunc != nil {
|
} else if batchCacheFn == nil && cacheFunc != nil {
|
||||||
r.SetDefaultBatchFunc(cacheFunc)
|
r.SetDefaultBatchFunc(cacheFunc)
|
||||||
}
|
}
|
||||||
|
ex, ok := any(data).(Expend[K, V])
|
||||||
|
if !ok {
|
||||||
|
r.getCacheBatch = r.getCacheBatchs
|
||||||
|
} else {
|
||||||
|
r.getCacheBatch = r.getBatches(ex)
|
||||||
|
}
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,6 +136,10 @@ func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duratio
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
|
func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
|
||||||
|
return m.getCacheBatch(c, key, timeout, params...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MapCache[K, V]) getCacheBatchs(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
|
||||||
var res = make([]V, 0, len(key))
|
var res = make([]V, 0, len(key))
|
||||||
var needIndex = make(map[K]int)
|
var needIndex = make(map[K]int)
|
||||||
var ver = make(map[K]int)
|
var ver = make(map[K]int)
|
||||||
|
@ -196,3 +207,93 @@ func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MapCache[K, V]) getBatches(e Expend[K, V]) func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
|
||||||
|
cc := e
|
||||||
|
return func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
|
||||||
|
var res = make([]V, 0, len(key))
|
||||||
|
var needIndex = make(map[K]int)
|
||||||
|
var err error
|
||||||
|
mm, err := cc.Gets(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var flushKeys []K
|
||||||
|
for i, k := range key {
|
||||||
|
v, ok := mm[k]
|
||||||
|
if !ok {
|
||||||
|
flushKeys = append(flushKeys, k)
|
||||||
|
needIndex[k] = i
|
||||||
|
} else {
|
||||||
|
var vv V
|
||||||
|
v = vv
|
||||||
|
}
|
||||||
|
res = append(res, v)
|
||||||
|
}
|
||||||
|
if len(needIndex) < 1 {
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
vers := cc.Vers(ctx, flushKeys)
|
||||||
|
|
||||||
|
call := func() {
|
||||||
|
m.mux.Lock()
|
||||||
|
defer m.mux.Unlock()
|
||||||
|
verss := cc.Vers(ctx, flushKeys)
|
||||||
|
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
|
||||||
|
vv, ok := vers[k]
|
||||||
|
vvv, ook := verss[k]
|
||||||
|
if !ok || !ook || vv >= vvv {
|
||||||
|
return k, true
|
||||||
|
}
|
||||||
|
return k, false
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(needFlushs) < 1 {
|
||||||
|
vv, er := cc.Gets(ctx, needFlushs)
|
||||||
|
if er != nil {
|
||||||
|
err = er
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for k, i := range needIndex {
|
||||||
|
v, ok := vv[k]
|
||||||
|
if ok {
|
||||||
|
res[i] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r, er := m.batchCacheFn(ctx, needFlushs, params...)
|
||||||
|
if err != nil {
|
||||||
|
err = er
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cc.Sets(ctx, r)
|
||||||
|
|
||||||
|
for k, i := range needIndex {
|
||||||
|
v, ok := r[k]
|
||||||
|
if ok {
|
||||||
|
res[i] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if timeout > 0 {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
|
defer cancel()
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
call()
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
call()
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user