// wp-go/cache/map.go
package cache
import (
"context"
"errors"
"fmt"
// 2023-11-28 14:46:22 +00:00
"github.com/fthvgb1/wp-go/cache/reload"
"github.com/fthvgb1/wp-go/helper"
// 2023-10-28 07:19:39 +00:00
"github.com/fthvgb1/wp-go/helper/maps"
// 2022-09-20 08:11:20 +00:00
"sync"
"time"
)
type MapCache[K comparable, V any] struct {
Cache[K, V]
2023-12-26 15:09:30 +00:00
mux *sync.Mutex
muFn func(ctx context.Context, gMut *sync.Mutex, k ...K) *sync.Mutex
2023-11-07 07:18:34 +00:00
cacheFunc MapSingleFn[K, V]
batchCacheFn MapBatchFn[K, V]
getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error)
getCacheBatchToMap func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error)
2023-12-03 14:42:44 +00:00
increaseUpdate *IncreaseUpdate[K, V]
2023-11-28 14:46:22 +00:00
refresh Refresh[K, V]
2023-12-10 11:15:49 +00:00
gets func(ctx context.Context, key K) (V, bool)
sets func(ctx context.Context, key K, val V)
getExpireTimes func(ctx context.Context) time.Duration
ttl func(ctx context.Context, key K) time.Duration
flush func(ctx context.Context)
del func(ctx context.Context, key ...K)
clearExpired func(ctx context.Context)
2023-11-28 14:46:22 +00:00
}
2023-12-10 11:15:49 +00:00
// Get reads key through the gets hook configured by initCache (the embedded
// Cache's Get unless overridden via NewMapCache's extra arguments).
func (m *MapCache[K, V]) Get(ctx context.Context, key K) (V, bool) {
	return m.gets(ctx, key)
}

// Set stores val for key through the configured sets hook.
func (m *MapCache[K, V]) Set(ctx context.Context, key K, val V) {
	m.sets(ctx, key, val)
}

// Ttl reports the remaining time-to-live of key through the configured hook.
func (m *MapCache[K, V]) Ttl(ctx context.Context, key K) time.Duration {
	return m.ttl(ctx, key)
}

// GetExpireTime reports the cache-wide expiry duration through the
// configured hook.
func (m *MapCache[K, V]) GetExpireTime(ctx context.Context) time.Duration {
	return m.getExpireTimes(ctx)
}

// Del removes the given keys through the configured hook.
func (m *MapCache[K, V]) Del(ctx context.Context, key ...K) {
	m.del(ctx, key...)
}

// ClearExpired evicts expired entries through the configured hook.
func (m *MapCache[K, V]) ClearExpired(ctx context.Context) {
	m.clearExpired(ctx)
}
// 2023-11-28 14:46:22 +00:00
// IncreaseUpdate configures incremental refreshing of cached values:
// Fn computes the updated value and CycleTime bounds how often it may run.
type IncreaseUpdate[K comparable, V any] struct {
	CycleTime func() time.Duration
	Fn IncreaseFn[K, V]
}
2023-12-03 14:42:44 +00:00
func NewIncreaseUpdate[K comparable, V any](name string, fn IncreaseFn[K, V], cycleTime time.Duration, tFn func() time.Duration) *IncreaseUpdate[K, V] {
tFn = reload.BuildFnVal(name, cycleTime, tFn)
2023-12-03 14:42:44 +00:00
return &IncreaseUpdate[K, V]{CycleTime: tFn, Fn: fn}
}
// MapSingleFn loads the value for a single key from the backing source.
type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error)

// MapBatchFn loads the values for a batch of keys from the backing source.
type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error)
// 2023-11-28 14:46:22 +00:00
// IncreaseFn incrementally updates currentData for key k; t is the time the
// value was last stored. It reports whether the result should be saved back
// and whether the underlying cache entry should be refreshed.
type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error)
// 2023-12-26 15:09:30 +00:00
func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V], lockFn LockFn[K], a ...any) *MapCache[K, V] {
r := &MapCache[K, V]{
2023-11-28 14:46:22 +00:00
Cache: ca,
2023-12-26 15:09:30 +00:00
mux: &sync.Mutex{},
2023-11-28 14:46:22 +00:00
cacheFunc: cacheFunc,
batchCacheFn: batchCacheFn,
increaseUpdate: inc,
2023-12-26 15:09:30 +00:00
muFn: lockFn,
}
if cacheFunc == nil && batchCacheFn != nil {
r.setDefaultCacheFn(batchCacheFn)
} else if batchCacheFn == nil && cacheFunc != nil {
r.SetDefaultBatchFunc(cacheFunc)
}
2023-11-28 14:46:22 +00:00
ex, ok := any(ca).(Expend[K, V])
2023-10-29 10:46:01 +00:00
if !ok {
r.getCacheBatch = r.getCacheBatchs
2023-11-07 07:18:34 +00:00
r.getCacheBatchToMap = r.getBatchToMapes
2023-10-29 10:46:01 +00:00
} else {
r.getCacheBatch = r.getBatches(ex)
2023-11-07 07:18:34 +00:00
r.getCacheBatchToMap = r.getBatchToMap(ex)
2023-10-29 10:46:01 +00:00
}
2023-11-28 14:46:22 +00:00
re, ok := any(ca).(Refresh[K, V])
if ok {
r.refresh = re
}
2023-12-10 11:15:49 +00:00
initCache(r, a...)
return r
}
2023-12-10 11:15:49 +00:00
// initCache wires up the per-operation hooks of r. Each hook defaults to the
// embedded Cache's own method and can be overridden by passing a function of
// the matching type in a (extracted with helper.ParseArgs):
//
//	func(Cache[K, V], context.Context, K) (V, bool)     -> Get
//	func(Cache[K, V], context.Context, K, V)            -> Set
//	func(Cache[K, V], context.Context) time.Duration    -> GetExpireTime
//	func(Cache[K, V], context.Context, K) time.Duration -> Ttl
//	func(Cache[K, V], context.Context, ...K)            -> Del
//	[]func(Cache[K, V], context.Context)                -> [0]=Flush, [1]=ClearExpired
func initCache[K comparable, V any](r *MapCache[K, V], a ...any) {
	gets := helper.ParseArgs[func(Cache[K, V], context.Context, K) (V, bool)](nil, a...)
	if gets == nil {
		r.gets = r.Cache.Get
	} else {
		r.gets = func(ctx context.Context, key K) (V, bool) {
			return gets(r.Cache, ctx, key)
		}
	}
	sets := helper.ParseArgs[func(Cache[K, V], context.Context, K, V)](nil, a...)
	if sets == nil {
		r.sets = r.Cache.Set
	} else {
		r.sets = func(ctx context.Context, key K, val V) {
			sets(r.Cache, ctx, key, val)
		}
	}
	getExpireTimes := helper.ParseArgs[func(Cache[K, V], context.Context) time.Duration](nil, a...)
	if getExpireTimes == nil {
		r.getExpireTimes = r.Cache.GetExpireTime
	} else {
		r.getExpireTimes = func(ctx context.Context) time.Duration {
			return getExpireTimes(r.Cache, ctx)
		}
	}
	ttl := helper.ParseArgs[func(Cache[K, V], context.Context, K) time.Duration](nil, a...)
	if ttl == nil {
		r.ttl = r.Cache.Ttl
	} else {
		r.ttl = func(ctx context.Context, k K) time.Duration {
			return ttl(r.Cache, ctx, k)
		}
	}
	del := helper.ParseArgs[func(Cache[K, V], context.Context, ...K)](nil, a...)
	if del == nil {
		r.del = r.Cache.Del
	} else {
		r.del = func(ctx context.Context, key ...K) {
			del(r.Cache, ctx, key...)
		}
	}
	flushAndClearExpired := helper.ParseArgs[[]func(Cache[K, V], context.Context)](nil, a...)
	// Robustness fix: check the length, not just nil — a caller supplying an
	// explicitly empty slice would previously panic on the [0] index below.
	if len(flushAndClearExpired) == 0 {
		r.flush = r.Cache.Flush
		r.clearExpired = r.Cache.ClearExpired
	} else {
		r.flush = func(ctx context.Context) {
			flushAndClearExpired[0](r.Cache, ctx)
		}
		if len(flushAndClearExpired) > 1 {
			r.clearExpired = func(ctx context.Context) {
				flushAndClearExpired[1](r.Cache, ctx)
			}
		} else {
			r.clearExpired = r.Cache.ClearExpired
		}
	}
}
// SetDefaultBatchFunc derives batchCacheFn from the single-key loader fn by
// fetching each requested key individually. Per-key failures are accumulated
// and returned together; successfully loaded keys are still included.
func (m *MapCache[K, V]) SetDefaultBatchFunc(fn MapSingleFn[K, V]) {
	m.batchCacheFn = func(ctx context.Context, ids []K, a ...any) (map[K]V, error) {
		var err error
		rr := make(map[K]V, len(ids))
		for _, id := range ids {
			v, er := fn(ctx, id, a...)
			if er != nil {
				// Bug fix: join the new error with the ones already
				// collected. errors.Join(er) alone discarded every
				// previous key's error, keeping only the last failure.
				err = errors.Join(err, er)
				continue
			}
			rr[id] = v
		}
		return rr, err
	}
}
func (m *MapCache[K, V]) SetCacheFunc(fn MapSingleFn[K, V]) {
2022-10-08 06:01:05 +00:00
m.cacheFunc = fn
2022-09-27 13:52:15 +00:00
}
2023-02-02 14:56:09 +00:00
2023-02-02 11:16:18 +00:00
func (m *MapCache[K, V]) GetLastSetTime(ctx context.Context, k K) (t time.Time) {
tt := m.Ttl(ctx, k)
2023-02-02 11:16:18 +00:00
if tt <= 0 {
return
2022-10-07 14:27:34 +00:00
}
return time.Now().Add(m.Ttl(ctx, k)).Add(-m.GetExpireTime(ctx))
2022-10-07 14:27:34 +00:00
}
func (m *MapCache[K, V]) SetCacheBatchFn(fn MapBatchFn[K, V]) {
2022-10-08 06:01:05 +00:00
m.batchCacheFn = fn
if m.cacheFunc == nil {
m.setDefaultCacheFn(fn)
2022-10-08 06:01:05 +00:00
}
}
func (m *MapCache[K, V]) setDefaultCacheFn(fn MapBatchFn[K, V]) {
m.cacheFunc = func(ctx context.Context, k K, a ...any) (V, error) {
var err error
var r map[K]V
r, err = fn(ctx, []K{k}, a...)
2022-10-08 06:01:05 +00:00
if err != nil {
var rr V
return rr, err
}
2023-02-28 07:17:16 +00:00
return r[k], err
2022-10-08 06:01:05 +00:00
}
2022-09-27 13:52:15 +00:00
}
2023-02-02 11:16:18 +00:00
func (m *MapCache[K, V]) Flush(ctx context.Context) {
m.mux.Lock()
defer m.mux.Unlock()
2023-12-10 11:15:49 +00:00
m.flush(ctx)
}
2023-12-03 14:42:44 +00:00
func (m *MapCache[K, V]) increaseUpdates(c context.Context, timeout time.Duration, data V, key K, params ...any) (V, error) {
var err error
nowTime := time.Now()
if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() {
return data, err
}
fn := func() {
2023-12-26 15:09:30 +00:00
l := m.muFn(c, m.mux, key)
l.Lock()
defer l.Unlock()
2023-12-03 14:42:44 +00:00
if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() {
return
}
dat, save, refresh, er := m.increaseUpdate.Fn(c, data, key, m.GetLastSetTime(c, key), params...)
if er != nil {
err = er
return
}
if refresh {
m.refresh.Refresh(c, key, params...)
}
if save {
m.Set(c, key, dat)
data = dat
}
}
if timeout > 0 {
er := helper.RunFnWithTimeout(c, timeout, fn)
if err == nil && er != nil {
return data, fmt.Errorf("increateUpdate cache %v err:[%s]", key, er)
}
} else {
fn()
}
return data, err
}
func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duration, params ...any) (V, error) {
data, ok := m.Get(c, key)
2023-11-28 14:46:22 +00:00
var err error
if ok {
2023-12-03 14:42:44 +00:00
if m.increaseUpdate == nil || m.refresh == nil {
2023-11-28 14:46:22 +00:00
return data, err
}
2023-12-03 14:42:44 +00:00
return m.increaseUpdates(c, timeout, data, key, params...)
}
call := func() {
2023-12-26 15:09:30 +00:00
l := m.muFn(c, m.mux, key)
l.Lock()
defer l.Unlock()
if data, ok = m.Get(c, key); ok {
return
}
data, err = m.cacheFunc(c, key, params...)
if err != nil {
return
}
m.Set(c, key, data)
}
if timeout > 0 {
2023-11-28 14:46:22 +00:00
er := helper.RunFnWithTimeout(c, timeout, call, fmt.Sprintf("get cache %v ", key))
if err == nil && er != nil {
err = er
}
} else {
call()
}
2023-02-02 11:16:18 +00:00
return data, err
}
2022-09-27 13:52:15 +00:00
func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
2023-10-29 10:46:01 +00:00
return m.getCacheBatch(c, key, timeout, params...)
}
2023-11-07 07:18:34 +00:00
// GetBatchToMap returns the values for key as a map, using the batch
// strategy selected at construction time.
func (m *MapCache[K, V]) GetBatchToMap(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) {
	return m.getCacheBatchToMap(c, key, timeout, params...)
}
func (m *MapCache[K, V]) getBatchToMap(e Expend[K, V]) func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) {
return func(ctx context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) {
2023-11-07 07:39:38 +00:00
var res map[K]V
2023-11-07 07:18:34 +00:00
var err error
mm, err := e.Gets(ctx, key)
2023-11-07 07:36:00 +00:00
if err != nil || len(key) == len(mm) {
return mm, err
2023-11-07 07:18:34 +00:00
}
2023-11-07 07:39:38 +00:00
var needIndex = make(map[K]int)
2023-11-07 07:36:00 +00:00
res = mm
2023-11-07 07:18:34 +00:00
var flushKeys []K
for i, k := range key {
2023-11-07 07:36:00 +00:00
_, ok := mm[k]
2023-11-07 07:18:34 +00:00
if !ok {
flushKeys = append(flushKeys, k)
needIndex[k] = i
}
}
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
mmm, er := e.Gets(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}))
if er != nil {
err = er
return
}
for k, v := range mmm {
res[k] = v
delete(needIndex, k)
}
if len(needIndex) < 1 {
return
}
r, er := m.batchCacheFn(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}), params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-11-07 07:18:34 +00:00
err = er
return
}
e.Sets(ctx, r)
for k := range needIndex {
v, ok := r[k]
if ok {
res[k] = v
}
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
call()
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
case <-done:
}
} else {
call()
}
return res, err
}
}
func (m *MapCache[K, V]) getBatchToMapes(c context.Context, key []K, timeout time.Duration, params ...any) (r map[K]V, err error) {
r = make(map[K]V)
var needIndex = make(map[K]int)
for i, k := range key {
v, ok := m.Get(c, k)
if !ok {
needIndex[k] = i
} else {
r[k] = v
}
}
if len(needIndex) < 1 {
return
}
call := func() {
2023-12-26 15:09:30 +00:00
l := m.muFn(c, m.mux, key...)
l.Lock()
defer l.Unlock()
2023-11-07 07:18:34 +00:00
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
vv, ok := m.Get(c, k)
if ok {
r[k] = vv
delete(needIndex, k)
return k, false
}
return k, true
})
if len(needFlushs) < 1 {
return
}
rr, er := m.batchCacheFn(c, needFlushs, params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-11-07 07:18:34 +00:00
err = er
return
}
for k := range needIndex {
v, ok := rr[k]
if ok {
r[k] = v
}
m.Set(c, k, v)
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(c, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
call()
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
case <-done:
}
} else {
call()
}
return
}
2023-10-29 10:46:01 +00:00
func (m *MapCache[K, V]) getCacheBatchs(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
2023-10-28 06:50:06 +00:00
var res = make([]V, 0, len(key))
var needIndex = make(map[K]int)
for i, k := range key {
2023-10-28 05:30:32 +00:00
v, ok := m.Get(c, k)
2023-10-28 06:50:06 +00:00
if !ok {
needIndex[k] = i
2022-09-27 13:52:15 +00:00
}
2023-10-28 06:50:06 +00:00
res = append(res, v)
}
2023-10-28 07:19:39 +00:00
if len(needIndex) < 1 {
2023-10-28 05:30:32 +00:00
return res, nil
}
2023-10-28 06:50:06 +00:00
var err error
2023-10-28 05:30:32 +00:00
call := func() {
2023-12-26 15:09:30 +00:00
l := m.muFn(c, m.mux, key...)
l.Lock()
defer l.Unlock()
2023-10-28 07:19:39 +00:00
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
vv, ok := m.Get(c, k)
if ok {
res[needIndex[k]] = vv
delete(needIndex, k)
return k, false
}
return k, true
2023-10-28 07:03:14 +00:00
})
2023-02-02 11:16:18 +00:00
2023-10-28 07:03:14 +00:00
if len(needFlushs) < 1 {
2023-10-28 05:30:32 +00:00
return
}
2023-02-02 11:16:18 +00:00
2023-10-28 07:03:14 +00:00
r, er := m.batchCacheFn(c, needFlushs, params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-10-28 05:30:32 +00:00
err = er
return
2022-09-20 08:11:20 +00:00
}
2023-10-28 07:19:39 +00:00
for k, i := range needIndex {
2023-10-28 10:37:00 +00:00
v, ok := r[k]
2023-10-28 07:19:39 +00:00
if ok {
res[i] = v
2023-10-28 10:40:21 +00:00
m.Set(c, k, v)
2023-10-28 06:50:06 +00:00
}
2023-10-28 05:30:32 +00:00
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(c, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
2022-09-20 08:11:20 +00:00
call()
2023-10-28 05:30:32 +00:00
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
2023-10-28 05:30:32 +00:00
case <-done:
}
} else {
call()
}
2023-10-28 06:50:06 +00:00
2022-09-27 13:52:15 +00:00
return res, err
2022-09-20 08:11:20 +00:00
}
2023-10-29 10:46:01 +00:00
func (m *MapCache[K, V]) getBatches(e Expend[K, V]) func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
cc := e
return func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
var res = make([]V, 0, len(key))
var needIndex = make(map[K]int)
var err error
mm, err := cc.Gets(ctx, key)
if err != nil {
return nil, err
}
var flushKeys []K
for i, k := range key {
v, ok := mm[k]
if !ok {
flushKeys = append(flushKeys, k)
needIndex[k] = i
var vv V
v = vv
}
res = append(res, v)
}
if len(needIndex) < 1 {
return res, nil
}
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
mmm, er := cc.Gets(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}))
if er != nil {
err = er
return
}
for k, v := range mmm {
res[needIndex[k]] = v
delete(needIndex, k)
}
2023-10-29 10:46:01 +00:00
if len(needIndex) < 1 {
2023-10-29 10:46:01 +00:00
return
}
r, er := m.batchCacheFn(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}), params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-10-29 10:46:01 +00:00
err = er
return
}
cc.Sets(ctx, r)
for k, i := range needIndex {
v, ok := r[k]
if ok {
res[i] = v
}
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
call()
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
2023-10-29 10:46:01 +00:00
case <-done:
}
} else {
call()
}
return res, err
}
}