diff --git a/main.go b/main.go index ae76cc0..b98f637 100644 --- a/main.go +++ b/main.go @@ -6,26 +6,44 @@ import ( "github.com/gorilla/websocket" "log" "net/http" + "regexp" + "strconv" + "strings" "time" ) type fetchData struct { - url string - title string - desc string - date string + Url string `json:"url"` + Title string `json:"title"` + Desc string `json:"desc"` + Date string `json:"date"` } + +type connChan struct { + conn string + msg message +} +type dataChan struct { + conn string + item []fetchData +} + type fetchHandler struct { fetchUrl string hadFetchData []fetchData - cronTime time.Duration - keyword string + cronTime map[string]time.Duration + keyword map[string]string hadFetchedMap map[string]int - reloadCron chan int + reloadCron map[string]chan int isOff chan int - ws *websocket.Conn - rMsgChan chan message - newFetchItem chan []fetchData + rMsgChan chan connChan + newFetchItem chan dataChan + connMap map[string]*websocket.Conn +} + +type setting struct { + Keyword string `json:"keyword"` + TimeStep int `json:"timeStep"` } type message struct { @@ -35,39 +53,53 @@ type message struct { Data interface{} } -func newFetchHandler(fetchUrl, keyword string) *fetchHandler { +func newFetchHandler(fetchUrl string) *fetchHandler { return &fetchHandler{ fetchUrl: fetchUrl, - keyword: keyword, + keyword: make(map[string]string), hadFetchedMap: make(map[string]int), - cronTime: 60 * time.Second, - reloadCron: make(chan int), + cronTime: make(map[string]time.Duration), + reloadCron: make(map[string]chan int), isOff: make(chan int), - rMsgChan: make(chan message, 10), - newFetchItem: make(chan []fetchData, 10), + rMsgChan: make(chan connChan, 10), + newFetchItem: make(chan dataChan, 10), + connMap: make(map[string]*websocket.Conn), } } -func (f *fetchHandler) handle() { - f.parsesDom(f.fetch(f.fetchUrl + f.keyword)) +func (f *fetchHandler) handle(conn string) { + key := "纪检" + if kk, ok := f.keyword[conn]; ok && kk != "" { + key = kk + } + f.parsesDom(f.fetch(f.fetchUrl+key), conn) } func (f *fetchHandler) receiveMsg() { for { r := <-f.rMsgChan - switch r.Action { + switch r.msg.Action { case "search": - f.handle() - case "timeStepSet": - if t, ok := r.Data.(int); ok { - f.reloadCron <- t + if t, ok := r.msg.Data.(*setting); ok { + f.reloadCron[r.conn] <- t.TimeStep + f.keyword[r.conn] = t.Keyword + f.handle(r.conn) } + case "timeStepSet": + if t, ok := r.msg.Data.(int); ok { + f.reloadCron[r.conn] <- t + } } } } func (f *fetchHandler) fetch(url string) *http.Response { + defer func() { + if r := recover(); r != nil { + log.Println(r) + } + }() client := http.Client{ Transport: nil, CheckRedirect: nil, @@ -94,33 +126,55 @@ func (f *fetchHandler) fetch(url string) *http.Response { req.Header.Add("postman-token", "81407fbc-2b96-54a7-0193-f640156714ab") response, err := client.Do(req) + if err != nil { panic(err) } return response } -func (f *fetchHandler) parsesDom(html *http.Response) { +func (f *fetchHandler) parsesDom(html *http.Response, conn string) { + defer func() { + if r := recover(); r != nil { + log.Println(r) + } + }() doc, err := goquery.NewDocumentFromReader(html.Body) if err != nil { panic(err) } + var newFetch []fetchData + ti := time.Now() + compile := regexp.MustCompile(`(\d+)`) + doc.Find("div[class=\"result-op c-container xpath-log new-pmd\"]").Each(func(i int, selection *goquery.Selection) { data := fetchData{} - data.url, _ = selection.Attr("mu") + data.Url, _ = selection.Attr("mu") t := selection.Find(".news-title-font_1xS-F").First() - data.title = t.Text() - data.desc = selection.Find(".c-row .c-color-text").First().Text() - k := data.url + "_" + data.title - var newFetch []fetchData + data.Title = t.Text() + data.Desc = selection.Find(".c-row .c-color-text").First().Text() + data.Date = selection.Find("span[class=\"c-color-gray2 c-font-normal c-gap-right-xsmall\"]").First().Text() + n := compile.FindAllStringSubmatch(data.Date, -1) + nn, _ := strconv.Atoi(n[0][0]) + if strings.Contains(data.Date, "小时") { + data.Date = ti.Add(-time.Duration(nn) * time.Hour).Format("2006-01-02 15:04") + } + if strings.Contains(data.Date, "分钟") { + data.Date = ti.Add(-time.Duration(nn) * time.Minute).Format("2006-01-02 15:04") + } + k := conn + "_" + data.Url + "_" + data.Title if _, ok := f.hadFetchedMap[k]; !ok { f.hadFetchData = append(f.hadFetchData, data) f.hadFetchedMap[k] = 1 newFetch = append(newFetch, data) } - f.newFetchItem <- newFetch }) - + if len(newFetch) > 0 { + f.newFetchItem <- dataChan{ + conn: conn, + item: newFetch, + } + } err = html.Body.Close() if err != nil { panic(err) @@ -130,11 +184,12 @@ func (f *fetchHandler) parsesDom(html *http.Response) { func (f *fetchHandler) sendFetchData() { for { data := <-f.newFetchItem - err := f.ws.WriteJSON(message{ + + err := f.connMap[data.conn].WriteJSON(message{ Status: true, Action: "newData", Message: "", - Data: data, + Data: data.item, }) if err != nil { log.Println(err) @@ -142,34 +197,46 @@ func (f *fetchHandler) sendFetchData() { } } -func (f *fetchHandler) cronFetch() { - t := time.NewTicker(f.cronTime) +func (f *fetchHandler) cronFetch(conn string, c chan int) { + step, ok := f.cronTime[conn] + if !ok { + step = time.Second * 60 + } + t := time.NewTicker(step) + if _, ok := f.cronTime[conn]; !ok { + f.reloadCron[conn] = make(chan int) + } defer t.Stop() for { select { case <-t.C: - f.handle() - case tt := <-f.reloadCron: - f.cronTime = time.Duration(tt) * time.Second - go f.cronFetch() + f.handle(conn) + case tt := <-f.reloadCron[conn]: + f.cronTime[conn] = time.Duration(tt) * time.Second + go f.cronFetch(conn, c) return - case <-f.isOff: + case <-c: + close(c) return } } } func main() { - h := newFetchHandler("https://www.baidu.com/s?rtt=1&bsst=1&cl=2&tn=news&rsv_dl=ns_pc&word=", "纪检") + h := newFetchHandler("https://www.baidu.com/s?rtt=1&bsst=1&cl=2&tn=news&rsv_dl=ns_pc&word=") router := gin.Default() - var upgrader = websocket.Upgrader{} + var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + go h.sendFetchData() + go h.receiveMsg() router.LoadHTMLGlob("templates/*") //router.LoadHTMLFiles("templates/template1.html", "templates/template2.html") router.GET("/index", func(c *gin.Context) { c.HTML(http.StatusOK, "index.gohtml", gin.H{ - "title": "爬虫", - "keyword": h.keyword, - "timeStep": h.cronTime.Seconds(), + "title": "爬虫", }) }) router.GET("ws", func(c *gin.Context) { @@ -181,17 +248,39 @@ func main() { Data: nil, Action: "upgradeWs", }) + log.Println(err) return } - h.ws = conn - msg := message{} - for { - err := conn.ReadJSON(msg) - if err != nil { - return - } - h.rMsgChan <- msg + remote := conn.RemoteAddr().String() + if _, ok := h.connMap[remote]; !ok { + h.connMap[remote] = conn } + cc := make(chan int) + go h.cronFetch(remote, cc) + go func() { + msg := connChan{ + conn: remote, + msg: message{ + Data: &setting{}, + }, + } + for { + err := conn.ReadJSON(&msg.msg) + if err != nil { + if !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + delete(h.connMap, remote) + cc <- 1 + return + } + log.Println(err) + } else { + h.rMsgChan <- msg + } + } + }() }) - router.Run(":8080") + err := router.Run(":8080") + if err != nil { + panic(err) + } }