// wp-go/cache/map.go

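// Package cache provides a small generic in-memory map cache with optional
// expiration. Values can be loaded lazily through a single-key fetch function
// or a batch fetch function supplied by the caller.
//
// A minimal usage sketch (fetchPost is a hypothetical loader supplied by the
// caller, not part of this package):
//
//	c := NewMapCacheByFn[uint64, string](fetchPost, time.Minute)
//	v, err := c.GetCache(ctx, 1, time.Second, uint64(1))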
package cache

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
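
// MapCache is a generic in-memory cache. Entries are kept in a map stored in
// an atomic.Value; writes are serialized by the mutex, and entries older than
// expireTime are refreshed on access or removed by ClearExpired.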
type MapCache[K comparable, V any] struct {
	data         atomic.Value
	mutex        *sync.Mutex
	cacheFunc    func(...any) (V, error)
	batchCacheFn func(...any) (map[K]V, error)
	expireTime   time.Duration
}
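
// NewMapCache returns an empty MapCache with the given expiration; cache
// functions can be attached later via SetCacheFunc or SetCacheBatchFunc.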
func NewMapCache[K comparable, V any](expireTime time.Duration) *MapCache[K, V] {
	var v atomic.Value
	v.Store(make(map[K]mapCacheStruct[V]))
	return &MapCache[K, V]{mutex: &sync.Mutex{}, expireTime: expireTime, data: v}
}
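
// mapCacheStruct is a single cache entry: the value, the time it was stored,
// and an update counter (incr) used to detect concurrent refreshes.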
type mapCacheStruct[T any] struct {
	setTime time.Time
	incr    int
	data    T
}
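
// SetCacheFunc sets the function used to load a single value on cache miss or
// expiry.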
func (m *MapCache[K, V]) SetCacheFunc(fn func(...any) (V, error)) {
	m.cacheFunc = fn
}
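
// GetSetTime returns the time at which key k was last set; the zero time is
// returned if k is not cached.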
func (m *MapCache[K, V]) GetSetTime(k K) (t time.Time) {
	r, ok := m.data.Load().(map[K]mapCacheStruct[V])[k]
	if ok {
		t = r.setTime
	}
	return
}
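
// SetCacheBatchFunc sets the batch loading function and, if no single-value
// function is set yet, derives one from it.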
func (m *MapCache[K, V]) SetCacheBatchFunc(fn func(...any) (map[K]V, error)) {
	m.batchCacheFn = fn
	if m.cacheFunc == nil {
		m.setCacheFn(fn)
	}
}
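
// setCacheFn wraps a batch loading function as a single-value cacheFunc that
// fetches just the requested key.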
func (m *MapCache[K, V]) setCacheFn(fn func(...any) (map[K]V, error)) {
	m.cacheFunc = func(a ...any) (V, error) {
		id := a[0].(K)
		r, err := fn([]K{id})
		if err != nil {
			var rr V
			return rr, err
		}
		return r[id], err
	}
}
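
// NewMapCacheByFn returns a MapCache that loads single values through fn.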
func NewMapCacheByFn[K comparable, V any](fn func(...any) (V, error), expireTime time.Duration) *MapCache[K, V] {
	var d atomic.Value
	d.Store(make(map[K]mapCacheStruct[V]))
	return &MapCache[K, V]{
		mutex:      &sync.Mutex{},
		cacheFunc:  fn,
		expireTime: expireTime,
		data:       d,
	}
}
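
// NewMapCacheByBatchFn returns a MapCache that loads values in batches through
// fn; a single-value loader is derived from fn as well.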
func NewMapCacheByBatchFn[K comparable, V any](fn func(...any) (map[K]V, error), expireTime time.Duration) *MapCache[K, V] {
	var d atomic.Value
	d.Store(make(map[K]mapCacheStruct[V]))
	r := &MapCache[K, V]{
		mutex:        &sync.Mutex{},
		batchCacheFn: fn,
		expireTime:   expireTime,
		data:         d,
	}
	r.setCacheFn(fn)
	return r
}
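
// Flush discards all cached entries.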
func (m *MapCache[K, V]) Flush() {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.data.Store(make(map[K]mapCacheStruct[V]))
}
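
// Get returns the cached value for k (the zero value if k is not cached),
// without checking expiration.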
func (m *MapCache[K, V]) Get(k K) V {
	return m.data.Load().(map[K]mapCacheStruct[V])[k].data
}
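
// Set stores v under k.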
func (m *MapCache[K, V]) Set(k K, v V) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.set(k, v)
}
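
// SetByBatchFn loads values via the batch function and stores every returned
// entry.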
func (m *MapCache[K, V]) SetByBatchFn(params ...any) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	r, err := m.batchCacheFn(params...)
	if err != nil {
		return err
	}
	for k, v := range r {
		m.set(k, v)
	}
	return nil
}
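
// set stores v under k without locking; callers must hold the mutex.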
func (m *MapCache[K, V]) set(k K, v V) {
	d := m.data.Load().(map[K]mapCacheStruct[V])
	t := time.Now()
	data, ok := d[k]
	if ok {
		data.data = v
		data.setTime = t
		data.incr++
	} else {
		data = mapCacheStruct[V]{
			data:    v,
			setTime: t,
		}
	}
	d[k] = data
	m.data.Store(d)
}
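
// GetCache returns the value for key, refreshing it via cacheFunc when the key
// is missing or expired. params are passed through to cacheFunc, and a
// positive timeout bounds how long the refresh may take.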
func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duration, params ...any) (V, error) {
	d := m.data.Load().(map[K]mapCacheStruct[V])
	data, ok := d[key]
	if !ok {
		data = mapCacheStruct[V]{}
	}
	now := time.Duration(time.Now().UnixNano())
	var err error
	expired := time.Duration(data.setTime.UnixNano())+m.expireTime < now
	// TODO: should also check whether the retrieved value is the zero value, but how?
	if !ok || (m.expireTime >= 0 && expired) {
		t := data.incr
		call := func() {
			tmp, o := m.data.Load().(map[K]mapCacheStruct[V])[key]
			if o && tmp.incr > t {
				return
			}
			m.mutex.Lock()
			defer m.mutex.Unlock()
			r, er := m.cacheFunc(params...)
			if er != nil {
				err = er
				return
			}
			data.setTime = time.Now()
			data.data = r
			data.incr++
			d[key] = data
			m.data.Store(d)
		}
		if timeout > 0 {
			ctx, cancel := context.WithTimeout(c, timeout)
			defer cancel()
			done := make(chan struct{})
			go func() {
				call()
				done <- struct{}{}
			}()
			select {
			case <-ctx.Done():
				err = fmt.Errorf("get cache %v %s", key, ctx.Err().Error())
			case <-done:
			}
		} else {
			call()
		}
	}
	return data.data, err
}
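
// GetCacheBatch returns the values for the given keys, refreshing them via
// batchCacheFn when any key is missing or expired. params are passed through
// to batchCacheFn, and a positive timeout bounds the refresh.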
func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time.Duration, params ...any) ([]V, error) {
	var needFlush []K
	var res []V
	t := 0
	now := time.Duration(time.Now().UnixNano())
	data := m.data.Load().(map[K]mapCacheStruct[V])
	for _, k := range key {
		d, ok := data[k]
		if !ok {
			needFlush = append(needFlush, k)
			continue
		}
		expired := time.Duration(d.setTime.UnixNano())+m.expireTime < now
		if expired {
			needFlush = append(needFlush, k)
		}
		t = t + d.incr
	}
	var err error
	// TODO: should also check whether the retrieved values are zero values, but how?
	if len(needFlush) > 0 {
		call := func() {
			m.mutex.Lock()
			defer m.mutex.Unlock()
			tt := 0
			for _, dd := range needFlush {
				if ddd, ok := data[dd]; ok {
					tt = tt + ddd.incr
				}
			}
			if tt > t {
				return
			}
			r, er := m.batchCacheFn(params...)
			if er != nil {
				err = er
				return
			}
			for k, v := range r {
				m.set(k, v)
			}
		}
		if timeout > 0 {
			ctx, cancel := context.WithTimeout(c, timeout)
			defer cancel()
			done := make(chan struct{})
			go func() {
				call()
				done <- struct{}{}
			}()
			select {
			case <-ctx.Done():
				err = fmt.Errorf("get cache %v %s", key, ctx.Err().Error())
			case <-done:
			}
		} else {
			call()
		}
	}
	for _, k := range key {
		d := data[k]
		res = append(res, d.data)
	}
	return res, err
}
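
// ClearExpired removes all entries whose age exceeds expireTime.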
func (m *MapCache[K, V]) ClearExpired() {
	now := time.Duration(time.Now().UnixNano())
	m.mutex.Lock()
	defer m.mutex.Unlock()
	data := m.data.Load().(map[K]mapCacheStruct[V])
	for k, v := range data {
		if now > time.Duration(v.setTime.UnixNano())+m.expireTime {
			delete(data, k)
		}
	}
	m.data.Store(data)
}