// Command newsfetch serves an embedded web UI and, over a websocket, pushes
// news items scraped from the configured sources to each connected client.
package main

import (
	"embed"
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/cookiejar"
	"net/url"
	"os/exec"
	"path/filepath"
	"reflect"
	"runtime"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/PuerkitoBio/goquery"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"github/fthvgb1/newsfetch/data"
	"github/fthvgb1/newsfetch/newsource"
)

const (
	// defaultKeyword is searched for when a client has not supplied a keyword.
	defaultKeyword = "纪检"
	// defaultFetchInterval is the polling period used until a client sets one.
	defaultFetchInterval = 60 * time.Second
	// timeLayout is the format of FetchData.CreatedTime values generated here.
	timeLayout = "2006-01-02 15:04:05"
	// maxRedirects is the number of redirects fetch2 follows before giving up.
	maxRedirects = 3
)

// connChan is a message received from the websocket client identified by conn.
type connChan struct {
	conn string
	msg  message
}

// dataChan is a batch of newly fetched items destined for the client conn.
type dataChan struct {
	conn string
	item []data.FetchData
}

// fetchHandler holds the per-connection search state and the channels that
// connect the websocket readers, the fetch loops and the websocket writer.
// Every mapXS field is keyed by the connection's remote address, except
// hadFetchedMap, which is keyed by "<conn>_<url>_<title>".
type fetchHandler struct {
	// hadFetchData is only appended to, and only while hadFetchedMap's lock is
	// held. NOTE(review): nothing in this file reads it and it grows without
	// bound; confirm whether it is still needed.
	hadFetchData  []data.FetchData
	cronTime      mapXS[time.Duration]
	keyword       mapXS[string]
	searchSource  mapXS[[]string]
	hadFetchedMap mapXS[int]
	reloadCron    mapXS[chan int]
	isOff         chan int
	rMsgChan      chan connChan
	newFetchItem  chan dataChan
	connMap       mapXS[*websocket.Conn]
	sourceMap     map[string]newsource.Source
	sourceArr     []string
}

// setting is the payload of a client "search" message.
type setting struct {
	Keyword      string   `json:"keyword"`
	TimeStep     int      `json:"timeStep"` // polling interval in seconds
	SearchSource []string `json:"searchSource"`
}

// message is the JSON envelope exchanged with websocket clients.
type message struct {
	Status  bool
	Action  string
	Message string
	Data    any
}

// dist serves files from an embedded tree rooted at path, routing requests by
// file extension into the js/ and css/ sub-directories.
type dist struct {
	embed.FS
	path string
}

//go:embed dist/*
var st embed.FS

// Open maps name onto the embedded tree: *.js and *.css files are looked up in
// the directory named after their extension, *.map files in js/, and anything
// else directly under r.path.
func (r dist) Open(name string) (fs.File, error) {
	if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) {
		return nil, errors.New("http: invalid character in file path")
	}
	fullName := strings.TrimLeft(name, "/")
	switch ext := strings.TrimPrefix(filepath.Ext(fullName), "."); ext {
	case "js", "css":
		fullName = ext + "/" + fullName
	case "map":
		fullName = "js/" + fullName
	}
	// embed.FS paths always use forward slashes, so do not use filepath.Join.
	return r.FS.Open(r.path + "/" + fullName)
}

// isContain reports whether i occurs in arr.
func isContain[T comparable](i T, arr []T) bool {
	for _, t := range arr {
		if i == t {
			return true
		}
	}
	return false
}

// getMap returns the value stored under key, and whether it was present,
// while holding obj's lock.
func getMap[T mapT](obj *mapXS[T], key string) (T, bool) {
	obj.Lock()
	defer obj.Unlock()
	v, ok := (*obj.mapX)[key]
	return v, ok
}

// setMap stores v under key while holding obj's lock.
func setMap[T mapT](obj *mapXS[T], key string, v T) {
	obj.Lock()
	defer obj.Unlock()
	(*obj.mapX)[key] = v
}

// delMap removes key while holding obj's lock.
func delMap[T mapT](obj *mapXS[T], key string) {
	obj.Lock()
	defer obj.Unlock()
	delete(*obj.mapX, key)
}

// mapT lists the value types that may be stored in a mapXS.
type mapT interface {
	string | []string | int | time.Duration | *websocket.Conn | chan int
}

// mapX is a string-keyed map of T.
type mapX[T mapT] map[string]T

// mapXS is a mapX paired with the mutex that guards it. Both members are
// pointers, so copies of a mapXS share the same map and the same lock.
type mapXS[T mapT] struct {
	*mapX[T]
	*sync.Mutex
}

// newMapXS returns an empty, ready-to-use mapXS.
func newMapXS[T mapT]() mapXS[T] {
	return mapXS[T]{&mapX[T]{}, &sync.Mutex{}}
}

// newFetchHandler builds a handler indexed over every source returned by
// newsource.GetSource.
func newFetchHandler() *fetchHandler {
	byName := make(map[string]newsource.Source)
	var names []string
	for _, source := range newsource.GetSource() {
		byName[source.Name] = source
		names = append(names, source.Name)
	}
	return &fetchHandler{
		sourceMap:     byName,
		sourceArr:     names,
		keyword:       newMapXS[string](),
		hadFetchedMap: newMapXS[int](),
		cronTime:      newMapXS[time.Duration](),
		searchSource:  newMapXS[[]string](),
		reloadCron:    newMapXS[chan int](),
		isOff:         make(chan int),
		rMsgChan:      make(chan connChan, 10),
		newFetchItem:  make(chan dataChan, 10),
		connMap:       newMapXS[*websocket.Conn](),
	}
}

// markFetched records item as delivered to conn and reports whether it was
// new. The check and the insert happen under one lock so that concurrent fetch
// loops cannot both treat the same item as new.
func (f *fetchHandler) markFetched(conn string, item data.FetchData) bool {
	k := conn + "_" + item.Url + "_" + item.Title
	f.hadFetchedMap.Lock()
	defer f.hadFetchedMap.Unlock()
	if _, seen := (*f.hadFetchedMap.mapX)[k]; seen {
		return false
	}
	(*f.hadFetchedMap.mapX)[k] = 1
	f.hadFetchData = append(f.hadFetchData, item)
	return true
}

// forget drops the state kept for conn. It is best-effort: a fetch that is
// still in flight for conn may re-add a few entries afterwards.
func (f *fetchHandler) forget(conn string) {
	delMap(&f.connMap, conn)
	delMap(&f.keyword, conn)
	delMap(&f.searchSource, conn)
	delMap(&f.cronTime, conn)
	delMap(&f.reloadCron, conn)

	prefix := conn + "_"
	f.hadFetchedMap.Lock()
	defer f.hadFetchedMap.Unlock()
	for k := range *f.hadFetchedMap.mapX {
		if strings.HasPrefix(k, prefix) {
			delete(*f.hadFetchedMap.mapX, k)
		}
	}
}

// requestReload asks conn's cronFetch loop to switch to a polling interval of
// step seconds. It never blocks: the reload channel holds one pending value
// and a newer request replaces an unconsumed older one.
func (f *fetchHandler) requestReload(conn string, step int) {
	ch, ok := getMap(&f.reloadCron, conn)
	if !ok {
		return
	}
	select {
	case ch <- step:
		return
	default:
	}
	// The slot is full: discard the stale value and retry once.
	select {
	case <-ch:
	default:
	}
	select {
	case ch <- step:
	default:
	}
}

// handle runs one search for conn across every source the client selected and
// queues any new items for delivery.
func (f *fetchHandler) handle(conn string) {
	key := defaultKeyword
	if kw, ok := getMap(&f.keyword, conn); ok && kw != "" {
		key = kw
	}
	sources, _ := getMap(&f.searchSource, conn)
	for _, name := range sources {
		source, ok := f.sourceMap[name]
		if !ok {
			// Source names come from the client; ignore ones we do not know.
			log.Printf("unknown search source %q requested by %s", name, conn)
			continue
		}
		resp := f.fetch2(source, key)
		if resp == nil {
			continue
		}
		// Both parsers close resp.Body.
		if strings.EqualFold(source.Type, "HTML") {
			f.parsesDom(resp, conn, source)
		} else {
			f.parseAjax(resp, source, conn)
		}
	}
}

// receiveMsg dispatches client messages from rMsgChan. A "search" message
// stores the client's settings, runs a search immediately and then
// reschedules the client's periodic fetch.
func (f *fetchHandler) receiveMsg() {
	for r := range f.rMsgChan {
		if r.msg.Action != "search" {
			continue
		}
		s, ok := r.msg.Data.(*setting)
		if !ok || s == nil {
			continue
		}
		if _, live := getMap(&f.connMap, r.conn); !live {
			// The client disconnected while its message was queued.
			continue
		}
		setMap(&f.keyword, r.conn, s.Keyword)
		setMap(&f.searchSource, r.conn, s.SearchSource)
		f.handle(r.conn)
		f.requestReload(r.conn, s.TimeStep)
	}
}

// fetch2 performs the search request described by source for key. It returns
// the response, whose Body the caller must close, or nil when the request
// could not be built or sent (the reason is logged).
func (f *fetchHandler) fetch2(source newsource.Source, key string) *http.Response {
	defer func() {
		// source.HeaderFun is external code; keep a panic in it from taking
		// the whole server down.
		if r := recover(); r != nil {
			log.Printf("err:%v. stack:%s", r, debug.Stack())
		}
	}()

	jar, err := cookiejar.New(nil)
	if err != nil {
		log.Printf("create cookie jar err:[%s]", err)
		return nil
	}
	client := http.Client{
		Jar:     jar,
		Timeout: 10 * time.Second,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			// Refuse to be downgraded from HTTPS to plain HTTP.
			if len(via) > 0 && via[0].URL.Scheme == "https" && req.URL.Scheme != "https" {
				lastHop := via[len(via)-1].URL
				return fmt.Errorf("redirected from secure URL %s to insecure URL %s", lastHop, req.URL)
			}
			// Stricter than the net/http default of 10 redirects.
			if len(via) >= maxRedirects {
				return errors.New("stopped after 3 redirects")
			}
			return nil
		},
	}

	method := strings.ToUpper(source.Method)
	searchURL := source.SearchUrl
	if method == "GET" {
		escaped := url.QueryEscape(key)
		switch {
		case source.KeywordField != "":
			sep := "?"
			if strings.Contains(searchURL, "?") {
				sep = "&"
			}
			searchURL += sep + source.KeywordField + "=" + escaped
		case strings.Contains(searchURL, "${keyword}"):
			searchURL = strings.ReplaceAll(searchURL, "${keyword}", escaped)
		default:
			searchURL += escaped
		}
	}

	var req *http.Request
	if method == "POST" {
		body := ""
		contentType := "application/x-www-form-urlencoded"
		if source.IsJson {
			contentType = "application/json"
		}
		// NOTE(review): as before, the keyword is only sent when ExternParam
		// is non-nil; confirm that sources without extra parameters really
		// want an empty body.
		if source.ExternParam != nil {
			if source.IsJson {
				// Copy the parameters: the map is shared by every connection
				// and must not be written to concurrently.
				params := make(map[string]string, len(source.ExternParam)+1)
				for k, v := range source.ExternParam {
					params[k] = v
				}
				params[source.KeywordField] = key
				encoded, err := json.Marshal(params)
				if err != nil {
					log.Printf("build post json param err:[%s]", err)
					return nil
				}
				body = string(encoded)
			} else {
				var b strings.Builder
				b.WriteString(source.KeywordField + "=" + url.QueryEscape(key))
				// ExternParam entries are sent verbatim, as before; they are
				// presumably already encoded by the source definition.
				for k, v := range source.ExternParam {
					b.WriteString("&" + k + "=" + v)
				}
				body = b.String()
			}
		}
		req, err = http.NewRequest(method, searchURL, strings.NewReader(body))
		if err != nil {
			log.Printf("build request %s err: %s", searchURL, err)
			return nil
		}
		req.Header.Set("Content-Type", contentType)
	} else {
		req, err = http.NewRequest(method, searchURL, nil)
		if err != nil {
			log.Printf("build request %s err: %s", searchURL, err)
			return nil
		}
	}

	// Source-specific headers override the defaults set above.
	for name, value := range source.Header {
		req.Header.Set(name, value)
	}
	if source.HeaderFun != nil {
		source.HeaderFun(req)
	}

	response, err := client.Do(req)
	if err != nil {
		log.Printf("request %s err: %s", req.URL, err)
		return nil
	}
	return response
}

// parseAjax extracts items from a non-HTML response using the callbacks of
// source, fills in missing CreatedTime/Source fields, drops items already
// delivered to conn and queues the rest. It always closes response.Body.
func (f *fetchHandler) parseAjax(response *http.Response, source newsource.Source, conn string) {
	defer func() {
		// The source callbacks are external code; contain their panics.
		if r := recover(); r != nil {
			log.Printf("parse ajax response err[%v]. stack:[%s]", r, debug.Stack())
		}
	}()
	defer func() {
		if err := response.Body.Close(); err != nil {
			log.Printf("close response body err:[%s]", err)
		}
	}()

	nowDate := time.Now().Format(timeLayout)
	var newFetch []data.FetchData
	switch {
	case source.AjaxSimpleDeal != nil && source.AjaxDealFun == nil:
		raw, err := ioutil.ReadAll(response.Body)
		if err != nil {
			log.Printf("read response body err:[%s]", err)
			return
		}
		if source.Target != nil {
			// Decode into a fresh value of the source's declared type and
			// hand that value (not the pointer) to the callback.
			target := reflect.New(source.Target)
			if err := json.Unmarshal(raw, target.Interface()); err != nil {
				log.Printf("jsondecode err:[%s]", err)
				return
			}
			source.AjaxSimpleDeal(target.Elem().Interface(), &newFetch)
		}
	case source.AjaxDealFun != nil && source.AjaxSimpleDeal == nil:
		source.AjaxDealFun(&newFetch, response)
	}

	fresh := make([]data.FetchData, 0, len(newFetch))
	for i := range newFetch {
		item := &newFetch[i]
		if item.CreatedTime == "" {
			item.CreatedTime = nowDate
		}
		if item.Source == "" {
			item.Source = source.Name
		}
		if f.markFetched(conn, *item) {
			fresh = append(fresh, *item)
		}
	}
	if len(fresh) > 0 {
		f.newFetchItem <- dataChan{conn: conn, item: fresh}
	}
}

// parsesDom extracts items from an HTML response by running
// source.QueryHandler over every node matched by source.ListQuery, drops items
// already delivered to conn and queues the rest. It always closes html.Body.
func (f *fetchHandler) parsesDom(html *http.Response, conn string, source newsource.Source) {
	defer func() {
		// source.QueryHandler is external code; contain its panics.
		if r := recover(); r != nil {
			log.Printf("parse html err:[%v]. stack:[%s]", r, debug.Stack())
		}
	}()
	defer func() {
		if err := html.Body.Close(); err != nil {
			log.Printf("close response body err:[%s]", err)
		}
	}()

	doc, err := goquery.NewDocumentFromReader(html.Body)
	if err != nil {
		log.Printf("parse html err:[%s]", err)
		return
	}

	var newFetch []data.FetchData
	nowDate := time.Now().Format(timeLayout)
	doc.Find(source.ListQuery).Each(func(i int, selection *goquery.Selection) {
		fetchData := data.FetchData{
			CreatedTime: nowDate,
			Source:      source.Name,
		}
		source.QueryHandler(i, selection, &fetchData)
		if f.markFetched(conn, fetchData) {
			newFetch = append(newFetch, fetchData)
		}
	})
	if len(newFetch) > 0 {
		f.newFetchItem <- dataChan{conn: conn, item: newFetch}
	}
}

// sendFetchData is the single writer that pushes queued batches to their
// websocket clients. Batches for clients that have disconnected are dropped.
func (f *fetchHandler) sendFetchData() {
	for batch := range f.newFetchItem {
		conn, ok := getMap(&f.connMap, batch.conn)
		if !ok || conn == nil {
			continue
		}
		err := conn.WriteJSON(message{
			Status:  true,
			Action:  "newData",
			Message: "",
			Data:    batch.item,
		})
		if err != nil {
			log.Printf("send new fetch data err:[%s]", err)
		}
	}
}

// cronFetch re-runs conn's search on a ticker until c is closed or receives a
// value. The interval starts at the stored cronTime (or defaultFetchInterval)
// and is changed by values, in seconds, arriving on conn's reload channel;
// non-positive values are ignored.
func (f *fetchHandler) cronFetch(conn string, c chan int) {
	step, ok := getMap(&f.cronTime, conn)
	if !ok || step <= 0 {
		step = defaultFetchInterval
	}
	reload, ok := getMap(&f.reloadCron, conn)
	if !ok {
		reload = make(chan int, 1)
		setMap(&f.reloadCron, conn, reload)
	}

	t := time.NewTicker(step)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			f.handle(conn)
		case secs := <-reload:
			if secs <= 0 {
				// time.NewTicker and Ticker.Reset panic on a non-positive
				// duration; keep the current interval instead.
				log.Printf("ignore invalid fetch interval %d from %s", secs, conn)
				continue
			}
			step = time.Duration(secs) * time.Second
			setMap(&f.cronTime, conn, step)
			t.Reset(step)
		case <-c:
			return
		}
	}
}

// openBrowser opens u in the desktop's default browser on the platforms we
// know how to drive; elsewhere it does nothing.
func openBrowser(u string) {
	var cmd *exec.Cmd
	switch runtime.GOOS {
	case "linux":
		cmd = exec.Command(`xdg-open`, u)
	case "windows":
		cmd = exec.Command(`cmd`, `/c`, `start`, u)
	case "darwin":
		cmd = exec.Command(`open`, u)
	default:
		return
	}
	if err := cmd.Start(); err != nil {
		log.Printf("open browser err:[%s]", err)
	}
}

func main() {
	h := newFetchHandler()
	router := gin.Default()
	static := dist{
		FS:   st,
		path: "dist",
	}
	router.StaticFS("/js", http.FS(static))
	router.StaticFS("/css", http.FS(static))

	// NOTE(review): every origin is accepted, so any web page can open a
	// socket to this server. Acceptable for a local tool; tighten before
	// exposing it on a network.
	upgrader := websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}

	go h.sendFetchData()
	go h.receiveMsg()

	router.GET("/", func(c *gin.Context) {
		file, err := static.Open("index.html")
		if err != nil {
			c.String(http.StatusNotFound, "%s", err)
			return
		}
		defer file.Close()
		page, err := ioutil.ReadAll(file)
		if err != nil {
			c.String(http.StatusInternalServerError, "%s", err)
			return
		}
		c.Data(http.StatusOK, "text/html", page)
	})

	router.GET("ws", func(c *gin.Context) {
		conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
		if err != nil {
			// Upgrade has already written an HTTP error response.
			log.Println(err)
			return
		}
		// Send the source list before the connection is registered, so this
		// write cannot overlap with sendFetchData.
		err = conn.WriteJSON(message{
			Status:  true,
			Action:  "sourceList",
			Message: "",
			Data:    h.sourceArr,
		})
		if err != nil {
			log.Printf("send source list err:[%s]", err)
			_ = conn.Close() // nothing more to do with a broken connection
			return
		}

		remote := conn.RemoteAddr().String()
		setMap(&h.connMap, remote, conn)
		// Create the reload channel up front so a "search" message can never
		// arrive before it exists.
		setMap(&h.reloadCron, remote, make(chan int, 1))
		stop := make(chan int)
		go h.cronFetch(remote, stop)

		go func() {
			defer func() {
				close(stop) // stops cronFetch
				h.forget(remote)
				_ = conn.Close() // the peer is gone; a close error is not actionable
			}()
			for {
				// A fresh setting per message: the previous one may still be
				// in use by receiveMsg, and stale fields must not leak into
				// the next message.
				in := connChan{
					conn: remote,
					msg:  message{Data: &setting{}},
				}
				if err := conn.ReadJSON(&in.msg); err != nil {
					// Any read error ends the session; reading again from a
					// failed connection only repeats the error.
					if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
						log.Printf("websocket read client msg err:[%s]", err)
					}
					return
				}
				h.rMsgChan <- in
			}
		}()
	})

	go func() {
		// Give the server a moment to start listening first.
		time.Sleep(2 * time.Second)
		openBrowser("http://127.0.0.1:8080")
	}()

	if err := router.Run(":8080"); err != nil {
		log.Fatal(err)
	}
}