This commit is contained in:
xing 2022-07-06 17:03:42 +08:00
parent 28f4c8f799
commit f6394cf1b3

83
main.go
View File

@ -18,6 +18,7 @@ type fetchData struct {
Title string `json:"title"` Title string `json:"title"`
Desc string `json:"desc"` Desc string `json:"desc"`
Date string `json:"date"` Date string `json:"date"`
CreatedTime string `json:"created_time"`
} }
type connChan struct { type connChan struct {
@ -32,14 +33,14 @@ type dataChan struct {
type fetchHandler struct { type fetchHandler struct {
fetchUrl string fetchUrl string
hadFetchData []fetchData hadFetchData []fetchData
cronTime map[string]time.Duration cronTime mapXS[time.Duration]
keyword map[string]string keyword mapXS[string]
hadFetchedMap map[string]int hadFetchedMap mapXS[int]
reloadCron map[string]chan int reloadCron mapXS[chan int]
isOff chan int isOff chan int
rMsgChan chan connChan rMsgChan chan connChan
newFetchItem chan dataChan newFetchItem chan dataChan
connMap map[string]*websocket.Conn connMap mapXS[*websocket.Conn]
} }
type setting struct { type setting struct {
@ -54,15 +55,24 @@ type message struct {
Data interface{} Data interface{}
} }
func setMap[T string | time.Duration | int](obj *mapXS[T], key string, v T) { func setMap[T mapT](obj *mapXS[T], key string, v T) {
obj.Lock() obj.Lock()
(*obj.mapX)[key] = v (*obj.mapX)[key] = v
obj.Unlock() obj.Unlock()
} }
type mapX[T string | int | time.Duration] map[string]T func delMap[T mapT](obj *mapXS[T], key string) {
obj.Lock()
delete(*obj.mapX, key)
obj.Unlock()
}
type mapXS[T string | int | time.Duration] struct { type mapT interface {
string | int | time.Duration | *websocket.Conn | chan int
}
type mapX[T mapT] map[string]T
type mapXS[T mapT] struct {
*mapX[T] *mapX[T]
*sync.Mutex *sync.Mutex
} }
@ -70,20 +80,35 @@ type mapXS[T string | int | time.Duration] struct {
func newFetchHandler(fetchUrl string) *fetchHandler { func newFetchHandler(fetchUrl string) *fetchHandler {
return &fetchHandler{ return &fetchHandler{
fetchUrl: fetchUrl, fetchUrl: fetchUrl,
keyword: make(map[string]string), keyword: mapXS[string]{
hadFetchedMap: make(map[string]int), &mapX[string]{},
cronTime: make(map[string]time.Duration), &sync.Mutex{},
reloadCron: make(map[string]chan int), },
hadFetchedMap: mapXS[int]{
&mapX[int]{},
&sync.Mutex{},
},
cronTime: mapXS[time.Duration]{
&mapX[time.Duration]{},
&sync.Mutex{},
},
reloadCron: mapXS[chan int]{
&mapX[chan int]{},
&sync.Mutex{},
},
isOff: make(chan int), isOff: make(chan int),
rMsgChan: make(chan connChan, 10), rMsgChan: make(chan connChan, 10),
newFetchItem: make(chan dataChan, 10), newFetchItem: make(chan dataChan, 10),
connMap: make(map[string]*websocket.Conn), connMap: mapXS[*websocket.Conn]{
&mapX[*websocket.Conn]{},
&sync.Mutex{},
},
} }
} }
func (f *fetchHandler) handle(conn string) { func (f *fetchHandler) handle(conn string) {
key := "纪检" key := "纪检"
if kk, ok := f.keyword[conn]; ok && kk != "" { if kk, ok := (*f.keyword.mapX)[conn]; ok && kk != "" {
key = kk key = kk
} }
f.parsesDom(f.fetch(f.fetchUrl+key), conn) f.parsesDom(f.fetch(f.fetchUrl+key), conn)
@ -95,8 +120,8 @@ func (f *fetchHandler) receiveMsg() {
switch r.msg.Action { switch r.msg.Action {
case "search": case "search":
if t, ok := r.msg.Data.(*setting); ok { if t, ok := r.msg.Data.(*setting); ok {
f.reloadCron[r.conn] <- t.TimeStep (*f.reloadCron.mapX)[r.conn] <- t.TimeStep
f.keyword[r.conn] = t.Keyword setMap[string](&f.keyword, r.conn, t.Keyword)
f.handle(r.conn) f.handle(r.conn)
} }
} }
@ -154,7 +179,6 @@ func (f *fetchHandler) parsesDom(html *http.Response, conn string) {
} }
var newFetch []fetchData var newFetch []fetchData
ti := time.Now() ti := time.Now()
log.Println(ti.Format("2006-01-02 15:04:05"))
compile := regexp.MustCompile(`(\d+)`) compile := regexp.MustCompile(`(\d+)`)
doc.Find("div[class=\"result-op c-container xpath-log new-pmd\"]").Each(func(i int, selection *goquery.Selection) { doc.Find("div[class=\"result-op c-container xpath-log new-pmd\"]").Each(func(i int, selection *goquery.Selection) {
@ -162,6 +186,7 @@ func (f *fetchHandler) parsesDom(html *http.Response, conn string) {
data.Url, _ = selection.Attr("mu") data.Url, _ = selection.Attr("mu")
t := selection.Find(".news-title-font_1xS-F").First() t := selection.Find(".news-title-font_1xS-F").First()
data.Title = t.Text() data.Title = t.Text()
data.CreatedTime = ti.Format("2006-01-02 15:04:05")
data.Desc = selection.Find(".c-row .c-color-text").First().Text() data.Desc = selection.Find(".c-row .c-color-text").First().Text()
data.Date = selection.Find("span[class=\"c-color-gray2 c-font-normal c-gap-right-xsmall\"]").First().Text() data.Date = selection.Find("span[class=\"c-color-gray2 c-font-normal c-gap-right-xsmall\"]").First().Text()
n := compile.FindAllStringSubmatch(data.Date, -1) n := compile.FindAllStringSubmatch(data.Date, -1)
@ -176,9 +201,9 @@ func (f *fetchHandler) parsesDom(html *http.Response, conn string) {
} }
k := conn + "_" + data.Url + "_" + data.Title k := conn + "_" + data.Url + "_" + data.Title
if _, ok := f.hadFetchedMap[k]; !ok { if _, ok := (*f.hadFetchedMap.mapX)[k]; !ok {
f.hadFetchData = append(f.hadFetchData, data) f.hadFetchData = append(f.hadFetchData, data)
f.hadFetchedMap[k] = 1 setMap(&f.hadFetchedMap, k, 1)
newFetch = append(newFetch, data) newFetch = append(newFetch, data)
} }
}) })
@ -198,7 +223,7 @@ func (f *fetchHandler) sendFetchData() {
for { for {
data := <-f.newFetchItem data := <-f.newFetchItem
err := f.connMap[data.conn].WriteJSON(message{ err := (*f.connMap.mapX)[data.conn].WriteJSON(message{
Status: true, Status: true,
Action: "newData", Action: "newData",
Message: "", Message: "",
@ -211,21 +236,21 @@ func (f *fetchHandler) sendFetchData() {
} }
func (f *fetchHandler) cronFetch(conn string, c chan int) { func (f *fetchHandler) cronFetch(conn string, c chan int) {
step, ok := f.cronTime[conn] step, ok := (*f.cronTime.mapX)[conn]
if !ok { if !ok {
step = time.Second * 60 step = time.Second * 60
} }
t := time.NewTicker(step) t := time.NewTicker(step)
if _, ok := f.cronTime[conn]; !ok { if _, ok := (*f.cronTime.mapX)[conn]; !ok {
f.reloadCron[conn] = make(chan int) setMap(&f.reloadCron, conn, make(chan int))
} }
defer t.Stop() defer t.Stop()
for { for {
select { select {
case <-t.C: case <-t.C:
f.handle(conn) f.handle(conn)
case tt := <-f.reloadCron[conn]: case tt := <-(*f.reloadCron.mapX)[conn]:
f.cronTime[conn] = time.Duration(tt) * time.Second setMap(&f.cronTime, conn, time.Duration(tt)*time.Second)
go f.cronFetch(conn, c) go f.cronFetch(conn, c)
return return
case <-c: case <-c:
@ -265,8 +290,8 @@ func main() {
return return
} }
remote := conn.RemoteAddr().String() remote := conn.RemoteAddr().String()
if _, ok := h.connMap[remote]; !ok { if _, ok := (*h.connMap.mapX)[remote]; !ok {
h.connMap[remote] = conn setMap(&h.connMap, remote, conn)
} }
cc := make(chan int) cc := make(chan int)
go h.cronFetch(remote, cc) go h.cronFetch(remote, cc)
@ -280,8 +305,8 @@ func main() {
for { for {
err := conn.ReadJSON(&msg.msg) err := conn.ReadJSON(&msg.msg)
if err != nil { if err != nil {
if _, ok := h.connMap[remote]; ok && !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { if _, ok := (*h.connMap.mapX)[remote]; ok && !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
delete(h.connMap, remote) delMap(&h.connMap, remote)
cc <- 1 cc <- 1
return return
} }