optimize sign mechanism

This commit is contained in:
xing 2024-06-19 16:10:42 +08:00
parent dcbe760f09
commit 9c60d10568
7 changed files with 263 additions and 83 deletions

View File

@ -3,7 +3,7 @@ package main
import (
"flag"
"fmt"
"github.com/fthvgb1/wp-go/app/mail"
"github.com/fthvgb1/wp-go/app/ossigns"
"github.com/fthvgb1/wp-go/app/pkg/cache"
"github.com/fthvgb1/wp-go/app/pkg/config"
"github.com/fthvgb1/wp-go/app/pkg/db"
@ -14,14 +14,10 @@ import (
"github.com/fthvgb1/wp-go/app/theme"
"github.com/fthvgb1/wp-go/app/wpconfig"
"github.com/fthvgb1/wp-go/cache/cachemanager"
"github.com/fthvgb1/wp-go/cache/reload"
"github.com/fthvgb1/wp-go/model"
"log"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"
)
@ -89,52 +85,6 @@ func cronClearCache() {
}
}
func flushCache() {
defer func() {
if r := recover(); r != nil {
err := mail.SendMail([]string{config.GetConfig().Mail.User}, "清空缓存失败", fmt.Sprintf("err:[%s]", r))
logs.IfError(err, "发邮件失败")
}
}()
cachemanager.Flush()
log.Println("all cache flushed")
}
func reloads() {
defer func() {
if r := recover(); r != nil {
log.Println(r)
}
}()
err := config.InitConfig(confPath)
logs.IfError(err, "获取配置文件失败", confPath)
err = logs.InitLogger()
logs.IfError(err, "日志配置错误")
_, err = db.InitDb()
logs.IfError(err, "重新读取db失败", config.GetConfig().Mysql)
err = wpconfig.InitOptions()
logs.IfError(err, "获取网站设置WpOption失败")
err = wpconfig.InitTerms()
logs.IfError(err, "获取WpTerms表失败")
wphandle.LoadPlugins()
reload.Reloads("themeArgAndConfig")
flushCache()
log.Println("reload complete")
}
func signalNotify() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2)
for {
switch <-c {
case syscall.SIGUSR1:
go reloads()
case syscall.SIGUSR2:
go flushCache()
}
}
}
func main() {
defer func() {
if r := recover(); r != nil {
@ -143,7 +93,8 @@ func main() {
}
}()
inits()
go signalNotify()
ossigns.SetConfPath(confPath)
go ossigns.SignalNotify()
Gin := route.SetupRouter()
c := config.GetConfig()
if c.Ssl.Key != "" && c.Ssl.Cert != "" {

69
app/ossigns/sings.go Normal file
View File

@ -0,0 +1,69 @@
package ossigns
import (
"fmt"
"github.com/fthvgb1/wp-go/app/mail"
"github.com/fthvgb1/wp-go/app/pkg/config"
"github.com/fthvgb1/wp-go/app/pkg/db"
"github.com/fthvgb1/wp-go/app/pkg/logs"
"github.com/fthvgb1/wp-go/app/plugins/wphandle"
"github.com/fthvgb1/wp-go/app/wpconfig"
"github.com/fthvgb1/wp-go/cache/cachemanager"
"github.com/fthvgb1/wp-go/cache/reload"
"github.com/fthvgb1/wp-go/signs"
"log"
"syscall"
)
var confPath string
func SetConfPath(path string) {
confPath = path
}
func FlushCache() {
defer func() {
if r := recover(); r != nil {
err := mail.SendMail([]string{config.GetConfig().Mail.User}, "清空缓存失败", fmt.Sprintf("err:[%s]", r))
logs.IfError(err, "发邮件失败")
}
}()
cachemanager.Flush()
log.Println("all cache flushed")
}
func Reloads() {
defer func() {
if r := recover(); r != nil {
log.Println(r)
}
}()
err := config.InitConfig(confPath)
logs.IfError(err, "获取配置文件失败", confPath)
err = logs.InitLogger()
logs.IfError(err, "日志配置错误")
_, err = db.InitDb()
logs.IfError(err, "重新读取db失败", config.GetConfig().Mysql)
err = wpconfig.InitOptions()
logs.IfError(err, "获取网站设置WpOption失败")
err = wpconfig.InitTerms()
logs.IfError(err, "获取WpTerms表失败")
wphandle.LoadPlugins()
reload.Reloads("themeArgAndConfig")
FlushCache()
log.Println("reload complete")
}
func SignalNotify() {
rel := func() bool {
go Reloads()
return true
}
flu := func() bool {
go FlushCache()
return true
}
signs.Install(syscall.SIGUSR1, rel, "reload")
signs.Install(syscall.SIGUSR2, flu, "flush")
signs.Wait()
}

View File

@ -153,12 +153,20 @@ func SetBody(req *http.Request, types int, form map[string]any) (err error) {
return
}
bb, ok := maps.GetStrAnyVal[*[]byte](form, "binary")
if !ok {
return errors.New("no binary value")
}
req.Body = io.NopCloser(&BodyBuffer{0, bb})
if ok {
req.Body = io.NopCloser(NewBodyBuffer(bb))
req.Header.Add("Content-Type", "application/octet-stream")
req.ContentLength = int64(len(*bb))
return
}
bf, ok := maps.GetStrAnyVal[*BodyBuffer](form, "binary")
if ok {
req.Body = bf
req.Header.Add("Content-Type", "application/octet-stream")
req.ContentLength = int64(len(*bf.Data))
return
}
return errors.New("no binary value")
}
return
@ -167,32 +175,23 @@ func SetBody(req *http.Request, types int, form map[string]any) (err error) {
type BodyBuffer struct {
Offset int
Data *[]byte
ReadFn func([]byte) (int, error)
CloseFn func() error
}
func (b *BodyBuffer) Write(p []byte) (int, error) {
if b.Offset == 0 {
copy(*b.Data, p)
if len(p) <= len(*b.Data) {
b.Offset += len(p)
return len(p), nil
func (b *BodyBuffer) Close() error {
if b.CloseFn != nil {
return b.CloseFn()
}
b.Offset += len(*b.Data)
return len(*b.Data), nil
}
if len(p)+b.Offset <= len(*b.Data) {
copy((*b.Data)[b.Offset:b.Offset+len(p)], p)
b.Offset += len(p)
return len(p), nil
}
l := len(*b.Data) - b.Offset
if l <= 0 {
return 0, nil
}
copy((*b.Data)[b.Offset:], p[:l])
return l, nil
b.Offset = 0
return nil
}
func (b *BodyBuffer) Read(p []byte) (int, error) {
return b.ReadFn(p)
}
func (b *BodyBuffer) Reads(p []byte) (int, error) {
data := (*b.Data)[b.Offset:]
if len(p) <= len(data) {
copy(p, data[0:len(p)])
@ -208,8 +207,18 @@ func (b *BodyBuffer) Read(p []byte) (int, error) {
return len(data), io.EOF
}
func NewBodyBuffer(bytes *[]byte) BodyBuffer {
return BodyBuffer{0, bytes}
func NewBodyBuffer(byt *[]byte, readFn ...func(*BodyBuffer, []byte) (int, error)) *BodyBuffer {
var fn func([]byte) (int, error)
b := &BodyBuffer{Data: byt}
if len(readFn) > 1 {
fn = func(p []byte) (int, error) {
return readFn[0](b, p)
}
} else {
fn = b.Reads
}
b.ReadFn = fn
return b
}
func PostClient(u string, types int, form map[string]any, a ...any) (cli *http.Client, req *http.Request, err error) {

View File

@ -42,3 +42,6 @@ func (q *Map[K, T]) Del(name K) {
slice.Delete((*[]Item[K, T])(q), i)
}
}
func (q *Map[K, T]) DelByIndex(i int) {
slice.Delete((*[]Item[K, T])(q), i)
}

View File

@ -22,6 +22,15 @@ func FilterAndMap[N any, T any](arr []T, fn func(T) (N, bool)) (r []N) {
}
return
}
func FilterAndMaps[N any, T any](arr []T, fn func(int, T) (N, bool)) (r []N) {
for i, t := range arr {
x, ok := fn(i, t)
if ok {
r = append(r, x)
}
}
return
}
func Walk[T any](arr []T, fn func(*T)) {
for i := 0; i < len(arr); i++ {

View File

@ -10,8 +10,12 @@ type Var[T any] struct {
p unsafe.Pointer
}
func NewVar[T any](val T) *Var[T] {
return &Var[T]{val: val, p: unsafe.Pointer(&val)}
func NewVar[T any](vals ...T) *Var[T] {
var v T
if len(vals) > 0 {
v = vals[0]
}
return &Var[T]{val: v, p: unsafe.Pointer(&v)}
}
func (r *Var[T]) Load() T {

135
signs/signs.go Normal file
View File

@ -0,0 +1,135 @@
package signs
import (
"github.com/fthvgb1/wp-go/helper"
"github.com/fthvgb1/wp-go/helper/slice"
"github.com/fthvgb1/wp-go/helper/slice/mockmap"
"os"
"os/signal"
"sync"
)
type Call func() bool
type HookFn func(mockmap.Item[string, Call]) (os.Signal, mockmap.Item[string, Call], bool)
var queues = map[os.Signal]mockmap.Map[string, Call]{}
var ch = make(chan os.Signal, 1)
var stopCh = make(chan struct{}, 1)
var hooks = map[os.Signal][]HookFn{}
var mux = sync.Mutex{}
func GetChannel() chan os.Signal {
return ch
}
func Cancel(sings ...os.Signal) {
if len(sings) < 1 {
return
}
mux.Lock()
defer mux.Unlock()
for _, sing := range sings {
delete(queues, sing)
}
signal.Reset(sings...)
}
func Hook(sign os.Signal, fn HookFn) {
mux.Lock()
defer mux.Unlock()
hooks[sign] = append(hooks[sign], fn)
}
func hook(item []mockmap.Item[string, Call], sign os.Signal) []mockmap.Item[string, Call] {
mux.Lock()
defer mux.Unlock()
if len(item) < 1 {
delete(hooks, sign)
return item
}
hooksFn, ok := hooks[sign]
if !ok {
return item
}
for _, fn := range hooksFn {
item = slice.FilterAndMap(item, func(t mockmap.Item[string, Call]) (mockmap.Item[string, Call], bool) {
s, c, ok := fn(t)
if sign != s {
install(s, t.Value, t.Name, t.Order)
return t, false
}
return c, ok
})
}
delete(hooks, sign)
slice.SimpleSort(item, slice.DESC, func(t mockmap.Item[string, Call]) float64 {
return t.Order
})
return item
}
func Install(sign os.Signal, fn Call, a ...any) {
mux.Lock()
defer mux.Unlock()
arr := helper.ParseArgs([]os.Signal{}, a...)
if len(arr) > 0 {
for _, o := range arr {
install(o, fn, a...)
}
}
install(sign, fn, a...)
}
func install(sign os.Signal, fn Call, a ...any) {
m, ok := queues[sign]
if !ok {
queues[sign] = make(mockmap.Map[string, Call], 0)
signal.Notify(ch, sign)
}
m.Set(helper.ParseArgs("", a...), fn, helper.ParseArgs[float64](0, a...))
queues[sign] = m
}
func del(queue mockmap.Map[string, Call], sign os.Signal, i int) {
mux.Lock()
defer mux.Unlock()
queue.DelByIndex(i)
queues[sign] = queue
}
func Stop() {
stopCh <- struct{}{}
}
func Wait() {
for {
select {
case <-stopCh:
break
case sign := <-ch:
queue, ok := queues[sign]
if !ok {
break
}
queue = hook(queue, sign)
queues[sign] = queue
if len(queue) < 1 {
signal.Reset(sign)
continue
}
for i, item := range queue {
if !item.Value() {
del(queue, sign, i)
}
}
if len(queues[sign]) < 1 {
signal.Reset(sign)
}
}
}
}