wp-go/cache/map.go

483 lines
11 KiB
Go
Raw Normal View History

2022-09-20 08:11:20 +00:00
package cache
import (
"context"
"errors"
"fmt"
2023-11-28 14:46:22 +00:00
"github.com/fthvgb1/wp-go/cache/reload"
"github.com/fthvgb1/wp-go/helper"
2023-10-28 07:19:39 +00:00
"github.com/fthvgb1/wp-go/helper/maps"
2022-09-20 08:11:20 +00:00
"sync"
"time"
)
type MapCache[K comparable, V any] struct {
Cache[K, V]
2023-11-07 07:18:34 +00:00
mux sync.Mutex
cacheFunc MapSingleFn[K, V]
batchCacheFn MapBatchFn[K, V]
getCacheBatch func(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error)
getCacheBatchToMap func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error)
2023-12-03 14:42:44 +00:00
increaseUpdate *IncreaseUpdate[K, V]
2023-11-28 14:46:22 +00:00
refresh Refresh[K, V]
}
type IncreaseUpdate[K comparable, V any] struct {
CycleTime func() time.Duration
Fn IncreaseFn[K, V]
}
2023-12-03 14:42:44 +00:00
func NewIncreaseUpdate[K comparable, V any](name string, fn IncreaseFn[K, V], cycleTime time.Duration, tFn func() time.Duration) *IncreaseUpdate[K, V] {
2023-11-28 14:46:22 +00:00
tFn = reload.FnVal(name, cycleTime, tFn)
2023-12-03 14:42:44 +00:00
return &IncreaseUpdate[K, V]{CycleTime: tFn, Fn: fn}
}
type MapSingleFn[K, V any] func(context.Context, K, ...any) (V, error)
type MapBatchFn[K comparable, V any] func(context.Context, []K, ...any) (map[K]V, error)
2023-11-28 14:46:22 +00:00
type IncreaseFn[K comparable, V any] func(c context.Context, currentData V, k K, t time.Time, a ...any) (data V, save bool, refresh bool, err error)
2023-12-03 14:42:44 +00:00
func NewMapCache[K comparable, V any](ca Cache[K, V], cacheFunc MapSingleFn[K, V], batchCacheFn MapBatchFn[K, V], inc *IncreaseUpdate[K, V]) *MapCache[K, V] {
r := &MapCache[K, V]{
2023-11-28 14:46:22 +00:00
Cache: ca,
mux: sync.Mutex{},
cacheFunc: cacheFunc,
batchCacheFn: batchCacheFn,
increaseUpdate: inc,
}
if cacheFunc == nil && batchCacheFn != nil {
r.setDefaultCacheFn(batchCacheFn)
} else if batchCacheFn == nil && cacheFunc != nil {
r.SetDefaultBatchFunc(cacheFunc)
}
2023-11-28 14:46:22 +00:00
ex, ok := any(ca).(Expend[K, V])
2023-10-29 10:46:01 +00:00
if !ok {
r.getCacheBatch = r.getCacheBatchs
2023-11-07 07:18:34 +00:00
r.getCacheBatchToMap = r.getBatchToMapes
2023-10-29 10:46:01 +00:00
} else {
r.getCacheBatch = r.getBatches(ex)
2023-11-07 07:18:34 +00:00
r.getCacheBatchToMap = r.getBatchToMap(ex)
2023-10-29 10:46:01 +00:00
}
2023-11-28 14:46:22 +00:00
re, ok := any(ca).(Refresh[K, V])
if ok {
r.refresh = re
}
return r
}
func (m *MapCache[K, V]) SetDefaultBatchFunc(fn MapSingleFn[K, V]) {
m.batchCacheFn = func(ctx context.Context, ids []K, a ...any) (map[K]V, error) {
var err error
rr := make(map[K]V)
for _, id := range ids {
v, er := fn(ctx, id, a...)
if er != nil {
err = errors.Join(er)
continue
}
rr[id] = v
}
return rr, err
}
}
func (m *MapCache[K, V]) SetCacheFunc(fn MapSingleFn[K, V]) {
2022-10-08 06:01:05 +00:00
m.cacheFunc = fn
2022-09-27 13:52:15 +00:00
}
2023-02-02 14:56:09 +00:00
2023-02-02 11:16:18 +00:00
func (m *MapCache[K, V]) GetLastSetTime(ctx context.Context, k K) (t time.Time) {
tt := m.Ttl(ctx, k)
2023-02-02 11:16:18 +00:00
if tt <= 0 {
return
2022-10-07 14:27:34 +00:00
}
return time.Now().Add(m.Ttl(ctx, k)).Add(-m.GetExpireTime(ctx))
2022-10-07 14:27:34 +00:00
}
func (m *MapCache[K, V]) SetCacheBatchFn(fn MapBatchFn[K, V]) {
2022-10-08 06:01:05 +00:00
m.batchCacheFn = fn
if m.cacheFunc == nil {
m.setDefaultCacheFn(fn)
2022-10-08 06:01:05 +00:00
}
}
func (m *MapCache[K, V]) setDefaultCacheFn(fn MapBatchFn[K, V]) {
m.cacheFunc = func(ctx context.Context, k K, a ...any) (V, error) {
var err error
var r map[K]V
r, err = fn(ctx, []K{k}, a...)
2022-10-08 06:01:05 +00:00
if err != nil {
var rr V
return rr, err
}
2023-02-28 07:17:16 +00:00
return r[k], err
2022-10-08 06:01:05 +00:00
}
2022-09-27 13:52:15 +00:00
}
2023-02-02 11:16:18 +00:00
func (m *MapCache[K, V]) Flush(ctx context.Context) {
m.mux.Lock()
defer m.mux.Unlock()
m.Cache.Flush(ctx)
}
2023-12-03 14:42:44 +00:00
func (m *MapCache[K, V]) increaseUpdates(c context.Context, timeout time.Duration, data V, key K, params ...any) (V, error) {
var err error
nowTime := time.Now()
if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() {
return data, err
}
fn := func() {
m.mux.Lock()
defer m.mux.Unlock()
if nowTime.Sub(m.GetLastSetTime(c, key)) < m.increaseUpdate.CycleTime() {
return
}
dat, save, refresh, er := m.increaseUpdate.Fn(c, data, key, m.GetLastSetTime(c, key), params...)
if er != nil {
err = er
return
}
if refresh {
m.refresh.Refresh(c, key, params...)
}
if save {
m.Set(c, key, dat)
data = dat
}
}
if timeout > 0 {
er := helper.RunFnWithTimeout(c, timeout, fn)
if err == nil && er != nil {
return data, fmt.Errorf("increateUpdate cache %v err:[%s]", key, er)
}
} else {
fn()
}
return data, err
}
func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duration, params ...any) (V, error) {
data, ok := m.Get(c, key)
2023-11-28 14:46:22 +00:00
var err error
if ok {
2023-12-03 14:42:44 +00:00
if m.increaseUpdate == nil || m.refresh == nil {
2023-11-28 14:46:22 +00:00
return data, err
}
2023-12-03 14:42:44 +00:00
return m.increaseUpdates(c, timeout, data, key, params...)
}
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
if data, ok = m.Get(c, key); ok {
return
}
data, err = m.cacheFunc(c, key, params...)
if err != nil {
return
}
m.Set(c, key, data)
}
if timeout > 0 {
2023-11-28 14:46:22 +00:00
er := helper.RunFnWithTimeout(c, timeout, call, fmt.Sprintf("get cache %v ", key))
if err == nil && er != nil {
err = er
}
} else {
call()
}
2023-02-02 11:16:18 +00:00
return data, err
}
2022-09-27 13:52:15 +00:00
func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
2023-10-29 10:46:01 +00:00
return m.getCacheBatch(c, key, timeout, params...)
}
2023-11-07 07:18:34 +00:00
func (m *MapCache[K, V]) GetBatchToMap(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) {
return m.getCacheBatchToMap(c, key, timeout, params...)
}
func (m *MapCache[K, V]) getBatchToMap(e Expend[K, V]) func(c context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) {
return func(ctx context.Context, key []K, timeout time.Duration, params ...any) (map[K]V, error) {
2023-11-07 07:39:38 +00:00
var res map[K]V
2023-11-07 07:18:34 +00:00
var err error
mm, err := e.Gets(ctx, key)
2023-11-07 07:36:00 +00:00
if err != nil || len(key) == len(mm) {
return mm, err
2023-11-07 07:18:34 +00:00
}
2023-11-07 07:39:38 +00:00
var needIndex = make(map[K]int)
2023-11-07 07:36:00 +00:00
res = mm
2023-11-07 07:18:34 +00:00
var flushKeys []K
for i, k := range key {
2023-11-07 07:36:00 +00:00
_, ok := mm[k]
2023-11-07 07:18:34 +00:00
if !ok {
flushKeys = append(flushKeys, k)
needIndex[k] = i
}
}
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
mmm, er := e.Gets(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}))
if er != nil {
err = er
return
}
for k, v := range mmm {
res[k] = v
delete(needIndex, k)
}
if len(needIndex) < 1 {
return
}
r, er := m.batchCacheFn(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}), params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-11-07 07:18:34 +00:00
err = er
return
}
e.Sets(ctx, r)
for k := range needIndex {
v, ok := r[k]
if ok {
res[k] = v
}
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
call()
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
case <-done:
}
} else {
call()
}
return res, err
}
}
func (m *MapCache[K, V]) getBatchToMapes(c context.Context, key []K, timeout time.Duration, params ...any) (r map[K]V, err error) {
r = make(map[K]V)
var needIndex = make(map[K]int)
for i, k := range key {
v, ok := m.Get(c, k)
if !ok {
needIndex[k] = i
} else {
r[k] = v
}
}
if len(needIndex) < 1 {
return
}
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
vv, ok := m.Get(c, k)
if ok {
r[k] = vv
delete(needIndex, k)
return k, false
}
return k, true
})
if len(needFlushs) < 1 {
return
}
rr, er := m.batchCacheFn(c, needFlushs, params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-11-07 07:18:34 +00:00
err = er
return
}
for k := range needIndex {
v, ok := rr[k]
if ok {
r[k] = v
}
m.Set(c, k, v)
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(c, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
call()
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
case <-done:
}
} else {
call()
}
return
}
2023-10-29 10:46:01 +00:00
func (m *MapCache[K, V]) getCacheBatchs(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
2023-10-28 06:50:06 +00:00
var res = make([]V, 0, len(key))
var needIndex = make(map[K]int)
for i, k := range key {
2023-10-28 05:30:32 +00:00
v, ok := m.Get(c, k)
2023-10-28 06:50:06 +00:00
if !ok {
needIndex[k] = i
2022-09-27 13:52:15 +00:00
}
2023-10-28 06:50:06 +00:00
res = append(res, v)
}
2023-10-28 07:19:39 +00:00
if len(needIndex) < 1 {
2023-10-28 05:30:32 +00:00
return res, nil
}
2023-10-28 06:50:06 +00:00
var err error
2023-10-28 05:30:32 +00:00
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
2023-10-28 07:19:39 +00:00
needFlushs := maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
vv, ok := m.Get(c, k)
if ok {
res[needIndex[k]] = vv
delete(needIndex, k)
return k, false
}
return k, true
2023-10-28 07:03:14 +00:00
})
2023-02-02 11:16:18 +00:00
2023-10-28 07:03:14 +00:00
if len(needFlushs) < 1 {
2023-10-28 05:30:32 +00:00
return
}
2023-02-02 11:16:18 +00:00
2023-10-28 07:03:14 +00:00
r, er := m.batchCacheFn(c, needFlushs, params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-10-28 05:30:32 +00:00
err = er
return
2022-09-20 08:11:20 +00:00
}
2023-10-28 07:19:39 +00:00
for k, i := range needIndex {
2023-10-28 10:37:00 +00:00
v, ok := r[k]
2023-10-28 07:19:39 +00:00
if ok {
res[i] = v
2023-10-28 10:40:21 +00:00
m.Set(c, k, v)
2023-10-28 06:50:06 +00:00
}
2023-10-28 05:30:32 +00:00
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(c, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
2022-09-20 08:11:20 +00:00
call()
2023-10-28 05:30:32 +00:00
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
2023-10-28 05:30:32 +00:00
case <-done:
}
} else {
call()
}
2023-10-28 06:50:06 +00:00
2022-09-27 13:52:15 +00:00
return res, err
2022-09-20 08:11:20 +00:00
}
2023-10-29 10:46:01 +00:00
func (m *MapCache[K, V]) getBatches(e Expend[K, V]) func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
cc := e
return func(ctx context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
var res = make([]V, 0, len(key))
var needIndex = make(map[K]int)
var err error
mm, err := cc.Gets(ctx, key)
if err != nil {
return nil, err
}
var flushKeys []K
for i, k := range key {
v, ok := mm[k]
if !ok {
flushKeys = append(flushKeys, k)
needIndex[k] = i
var vv V
v = vv
}
res = append(res, v)
}
if len(needIndex) < 1 {
return res, nil
}
call := func() {
m.mux.Lock()
defer m.mux.Unlock()
mmm, er := cc.Gets(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}))
if er != nil {
err = er
return
}
for k, v := range mmm {
res[needIndex[k]] = v
delete(needIndex, k)
}
2023-10-29 10:46:01 +00:00
if len(needIndex) < 1 {
2023-10-29 10:46:01 +00:00
return
}
r, er := m.batchCacheFn(ctx, maps.FilterToSlice(needIndex, func(k K, v int) (K, bool) {
return k, true
}), params...)
2023-11-28 16:00:25 +00:00
if er != nil {
2023-10-29 10:46:01 +00:00
err = er
return
}
cc.Sets(ctx, r)
for k, i := range needIndex {
v, ok := r[k]
if ok {
res[i] = v
}
}
}
if timeout > 0 {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
done := make(chan struct{}, 1)
go func() {
call()
done <- struct{}{}
}()
select {
case <-ctx.Done():
err = errors.New(fmt.Sprintf("get cache %v %s", key, ctx.Err().Error()))
return nil, err
2023-10-29 10:46:01 +00:00
case <-done:
}
} else {
call()
}
return res, err
}
}