From 9c60d10568d0d8e92501c574275b0b8d0d3169ba Mon Sep 17 00:00:00 2001 From: xing Date: Wed, 19 Jun 2024 16:10:42 +0800 Subject: [PATCH] optimize sign mechanism --- app/cmd/main.go | 55 +-------------- app/ossigns/sings.go | 69 ++++++++++++++++++ helper/httptool/http.go | 67 ++++++++++-------- helper/slice/mockmap/map.go | 3 + helper/slice/slice.go | 9 +++ safety/vars.go | 8 ++- signs/signs.go | 135 ++++++++++++++++++++++++++++++++++++ 7 files changed, 263 insertions(+), 83 deletions(-) create mode 100644 app/ossigns/sings.go create mode 100644 signs/signs.go diff --git a/app/cmd/main.go b/app/cmd/main.go index 4655d24..f8fb5d3 100644 --- a/app/cmd/main.go +++ b/app/cmd/main.go @@ -3,7 +3,7 @@ package main import ( "flag" "fmt" - "github.com/fthvgb1/wp-go/app/mail" + "github.com/fthvgb1/wp-go/app/ossigns" "github.com/fthvgb1/wp-go/app/pkg/cache" "github.com/fthvgb1/wp-go/app/pkg/config" "github.com/fthvgb1/wp-go/app/pkg/db" @@ -14,14 +14,10 @@ import ( "github.com/fthvgb1/wp-go/app/theme" "github.com/fthvgb1/wp-go/app/wpconfig" "github.com/fthvgb1/wp-go/cache/cachemanager" - "github.com/fthvgb1/wp-go/cache/reload" "github.com/fthvgb1/wp-go/model" - "log" "os" - "os/signal" "regexp" "strings" - "syscall" "time" ) @@ -89,52 +85,6 @@ func cronClearCache() { } } -func flushCache() { - defer func() { - if r := recover(); r != nil { - err := mail.SendMail([]string{config.GetConfig().Mail.User}, "清空缓存失败", fmt.Sprintf("err:[%s]", r)) - logs.IfError(err, "发邮件失败") - } - }() - cachemanager.Flush() - log.Println("all cache flushed") -} - -func reloads() { - defer func() { - if r := recover(); r != nil { - log.Println(r) - } - }() - err := config.InitConfig(confPath) - logs.IfError(err, "获取配置文件失败", confPath) - err = logs.InitLogger() - logs.IfError(err, "日志配置错误") - _, err = db.InitDb() - logs.IfError(err, "重新读取db失败", config.GetConfig().Mysql) - err = wpconfig.InitOptions() - logs.IfError(err, "获取网站设置WpOption失败") - err = wpconfig.InitTerms() - logs.IfError(err, "获取WpTerms表失败") - wphandle.LoadPlugins() - reload.Reloads("themeArgAndConfig") - flushCache() - log.Println("reload complete") -} - -func signalNotify() { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2) - for { - switch <-c { - case syscall.SIGUSR1: - go reloads() - case syscall.SIGUSR2: - go flushCache() - } - } -} - func main() { defer func() { if r := recover(); r != nil { @@ -143,7 +93,8 @@ func main() { } }() inits() - go signalNotify() + ossigns.SetConfPath(confPath) + go ossigns.SignalNotify() Gin := route.SetupRouter() c := config.GetConfig() if c.Ssl.Key != "" && c.Ssl.Cert != "" { diff --git a/app/ossigns/sings.go b/app/ossigns/sings.go new file mode 100644 index 0000000..bf6f0a3 --- /dev/null +++ b/app/ossigns/sings.go @@ -0,0 +1,69 @@ +package ossigns + +import ( + "fmt" + "github.com/fthvgb1/wp-go/app/mail" + "github.com/fthvgb1/wp-go/app/pkg/config" + "github.com/fthvgb1/wp-go/app/pkg/db" + "github.com/fthvgb1/wp-go/app/pkg/logs" + "github.com/fthvgb1/wp-go/app/plugins/wphandle" + "github.com/fthvgb1/wp-go/app/wpconfig" + "github.com/fthvgb1/wp-go/cache/cachemanager" + "github.com/fthvgb1/wp-go/cache/reload" + "github.com/fthvgb1/wp-go/signs" + "log" + "syscall" +) + +var confPath string + +func SetConfPath(path string) { + confPath = path +} + +func FlushCache() { + defer func() { + if r := recover(); r != nil { + err := mail.SendMail([]string{config.GetConfig().Mail.User}, "清空缓存失败", fmt.Sprintf("err:[%s]", r)) + logs.IfError(err, "发邮件失败") + } + }() + cachemanager.Flush() + log.Println("all cache flushed") +} + +func Reloads() { + defer func() { + if r := recover(); r != nil { + log.Println(r) + } + }() + err := config.InitConfig(confPath) + logs.IfError(err, "获取配置文件失败", confPath) + err = logs.InitLogger() + logs.IfError(err, "日志配置错误") + _, err = db.InitDb() + logs.IfError(err, "重新读取db失败", config.GetConfig().Mysql) + err = wpconfig.InitOptions() + logs.IfError(err, "获取网站设置WpOption失败") + err = wpconfig.InitTerms() + logs.IfError(err, "获取WpTerms表失败") + wphandle.LoadPlugins() + reload.Reloads("themeArgAndConfig") + FlushCache() + log.Println("reload complete") +} + +func SignalNotify() { + rel := func() bool { + go Reloads() + return true + } + flu := func() bool { + go FlushCache() + return true + } + signs.Install(syscall.SIGUSR1, rel, "reload") + signs.Install(syscall.SIGUSR2, flu, "flush") + signs.Wait() +} diff --git a/helper/httptool/http.go b/helper/httptool/http.go index 5ac13b6..51d4cb1 100644 --- a/helper/httptool/http.go +++ b/helper/httptool/http.go @@ -153,46 +153,45 @@ func SetBody(req *http.Request, types int, form map[string]any) (err error) { return } bb, ok := maps.GetStrAnyVal[*[]byte](form, "binary") - if !ok { - return errors.New("no binary value") + if ok { + req.Body = io.NopCloser(NewBodyBuffer(bb)) + req.Header.Add("Content-Type", "application/octet-stream") + req.ContentLength = int64(len(*bb)) + return } - req.Body = io.NopCloser(&BodyBuffer{0, bb}) - req.Header.Add("Content-Type", "application/octet-stream") - req.ContentLength = int64(len(*bb)) + bf, ok := maps.GetStrAnyVal[*BodyBuffer](form, "binary") + if ok { + req.Body = bf + req.Header.Add("Content-Type", "application/octet-stream") + req.ContentLength = int64(len(*bf.Data)) + return + } + return errors.New("no binary value") } return } type BodyBuffer struct { - Offset int - Data *[]byte + Offset int + Data *[]byte + ReadFn func([]byte) (int, error) + CloseFn func() error } -func (b *BodyBuffer) Write(p []byte) (int, error) { - if b.Offset == 0 { - copy(*b.Data, p) - if len(p) <= len(*b.Data) { - b.Offset += len(p) - return len(p), nil - } - b.Offset += len(*b.Data) - return len(*b.Data), nil +func (b *BodyBuffer) Close() error { + if b.CloseFn != nil { + return b.CloseFn() } - if len(p)+b.Offset <= len(*b.Data) { - copy((*b.Data)[b.Offset:b.Offset+len(p)], p) - b.Offset += len(p) - return len(p), nil - } - l := len(*b.Data) - b.Offset - if l <= 0 { - return 0, nil - } - copy((*b.Data)[b.Offset:], p[:l]) - return l, nil + b.Offset = 0 + return nil } func (b *BodyBuffer) Read(p []byte) (int, error) { + return b.ReadFn(p) +} + +func (b *BodyBuffer) Reads(p []byte) (int, error) { data := (*b.Data)[b.Offset:] if len(p) <= len(data) { copy(p, data[0:len(p)]) @@ -208,8 +207,18 @@ func (b *BodyBuffer) Read(p []byte) (int, error) { return len(data), io.EOF } -func NewBodyBuffer(bytes *[]byte) BodyBuffer { - return BodyBuffer{0, bytes} +func NewBodyBuffer(byt *[]byte, readFn ...func(*BodyBuffer, []byte) (int, error)) *BodyBuffer { + var fn func([]byte) (int, error) + b := &BodyBuffer{Data: byt} + if len(readFn) > 1 { + fn = func(p []byte) (int, error) { + return readFn[0](b, p) + } + } else { + fn = b.Reads + } + b.ReadFn = fn + return b } func PostClient(u string, types int, form map[string]any, a ...any) (cli *http.Client, req *http.Request, err error) { diff --git a/helper/slice/mockmap/map.go b/helper/slice/mockmap/map.go index d6fde61..23c0539 100644 --- a/helper/slice/mockmap/map.go +++ b/helper/slice/mockmap/map.go @@ -42,3 +42,6 @@ func (q *Map[K, T]) Del(name K) { slice.Delete((*[]Item[K, T])(q), i) } } +func (q *Map[K, T]) DelByIndex(i int) { + slice.Delete((*[]Item[K, T])(q), i) +} diff --git a/helper/slice/slice.go b/helper/slice/slice.go index 52db2ca..63ab629 100644 --- a/helper/slice/slice.go +++ b/helper/slice/slice.go @@ -22,6 +22,15 @@ func FilterAndMap[N any, T any](arr []T, fn func(T) (N, bool)) (r []N) { } return } +func FilterAndMaps[N any, T any](arr []T, fn func(int, T) (N, bool)) (r []N) { + for i, t := range arr { + x, ok := fn(i, t) + if ok { + r = append(r, x) + } + } + return +} func Walk[T any](arr []T, fn func(*T)) { for i := 0; i < len(arr); i++ { diff --git a/safety/vars.go b/safety/vars.go index b0a5fb8..d3c34c1 100644 --- a/safety/vars.go +++ b/safety/vars.go @@ -10,8 +10,12 @@ type Var[T any] struct { p unsafe.Pointer } -func NewVar[T any](val T) *Var[T] { - return &Var[T]{val: val, p: unsafe.Pointer(&val)} +func NewVar[T any](vals ...T) *Var[T] { + var v T + if len(vals) > 0 { + v = vals[0] + } + return &Var[T]{val: v, p: unsafe.Pointer(&v)} } func (r *Var[T]) Load() T { diff --git a/signs/signs.go b/signs/signs.go new file mode 100644 index 0000000..9a6adb1 --- /dev/null +++ b/signs/signs.go @@ -0,0 +1,135 @@ +package signs + +import ( + "github.com/fthvgb1/wp-go/helper" + "github.com/fthvgb1/wp-go/helper/slice" + "github.com/fthvgb1/wp-go/helper/slice/mockmap" + "os" + "os/signal" + "sync" +) + +type Call func() bool + +type HookFn func(mockmap.Item[string, Call]) (os.Signal, mockmap.Item[string, Call], bool) + +var queues = map[os.Signal]mockmap.Map[string, Call]{} + +var ch = make(chan os.Signal, 1) + +var stopCh = make(chan struct{}, 1) + +var hooks = map[os.Signal][]HookFn{} + +var mux = sync.Mutex{} + +func GetChannel() chan os.Signal { + return ch +} + +func Cancel(sings ...os.Signal) { + if len(sings) < 1 { + return + } + mux.Lock() + defer mux.Unlock() + for _, sing := range sings { + delete(queues, sing) + } + signal.Reset(sings...) +} + +func Hook(sign os.Signal, fn HookFn) { + mux.Lock() + defer mux.Unlock() + hooks[sign] = append(hooks[sign], fn) +} + +func hook(item []mockmap.Item[string, Call], sign os.Signal) []mockmap.Item[string, Call] { + mux.Lock() + defer mux.Unlock() + if len(item) < 1 { + delete(hooks, sign) + return item + } + hooksFn, ok := hooks[sign] + if !ok { + return item + } + for _, fn := range hooksFn { + item = slice.FilterAndMap(item, func(t mockmap.Item[string, Call]) (mockmap.Item[string, Call], bool) { + s, c, ok := fn(t) + if sign != s { + install(s, t.Value, t.Name, t.Order) + return t, false + } + return c, ok + }) + + } + delete(hooks, sign) + slice.SimpleSort(item, slice.DESC, func(t mockmap.Item[string, Call]) float64 { + return t.Order + }) + return item +} + +func Install(sign os.Signal, fn Call, a ...any) { + mux.Lock() + defer mux.Unlock() + arr := helper.ParseArgs([]os.Signal{}, a...) + if len(arr) > 0 { + for _, o := range arr { + install(o, fn, a...) + } + } + install(sign, fn, a...) +} +func install(sign os.Signal, fn Call, a ...any) { + m, ok := queues[sign] + if !ok { + queues[sign] = make(mockmap.Map[string, Call], 0) + signal.Notify(ch, sign) + } + m.Set(helper.ParseArgs("", a...), fn, helper.ParseArgs[float64](0, a...)) + queues[sign] = m +} + +func del(queue mockmap.Map[string, Call], sign os.Signal, i int) { + mux.Lock() + defer mux.Unlock() + queue.DelByIndex(i) + queues[sign] = queue +} + +func Stop() { + stopCh <- struct{}{} +} + +func Wait() { + for { + select { + case <-stopCh: + break + case sign := <-ch: + queue, ok := queues[sign] + if !ok { + break + } + queue = hook(queue, sign) + queues[sign] = queue + if len(queue) < 1 { + signal.Reset(sign) + continue + } + for i, item := range queue { + if !item.Value() { + del(queue, sign, i) + } + } + if len(queues[sign]) < 1 { + signal.Reset(sign) + } + } + } +}