517 lines
12 KiB
Go
517 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"embed"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/PuerkitoBio/goquery"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gorilla/websocket"
|
|
"github/fthvgb1/newsfetch/data"
|
|
"github/fthvgb1/newsfetch/newsource"
|
|
"github/fthvgb1/newsfetch/tools"
|
|
"io/fs"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"net/http/cookiejar"
|
|
"net/url"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"reflect"
|
|
"runtime"
|
|
"runtime/debug"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type connChan struct {
|
|
conn string
|
|
msg message
|
|
}
|
|
type dataChan struct {
|
|
conn string
|
|
item []data.FetchData
|
|
}
|
|
|
|
type fetchHandler struct {
|
|
hadFetchData []data.FetchData
|
|
cronTime mapXS[time.Duration]
|
|
keyword mapXS[string]
|
|
searchSource mapXS[[]string]
|
|
hadFetchedMap mapXS[int]
|
|
reloadCron mapXS[chan int]
|
|
isOff chan int
|
|
rMsgChan chan connChan
|
|
newFetchItem chan dataChan
|
|
connMap mapXS[*websocket.Conn]
|
|
sourceMap map[string]newsource.Source
|
|
sourceArr []string
|
|
}
|
|
|
|
// setting is the payload of a client "search" message.
type setting struct {
	// Keyword is the term to search for.
	Keyword string `json:"keyword"`
	// TimeStep is the polling interval in seconds.
	TimeStep int `json:"timeStep"`
	// SearchSource lists the names of the sources to query.
	SearchSource []string `json:"searchSource"`
}
|
|
|
|
// message is the JSON envelope exchanged with websocket clients in both
// directions.
type message struct {
	// Status reports whether the operation named by Action succeeded.
	Status bool
	// Action names the kind of message, e.g. "search", "newData",
	// "sourceList".
	Action string
	// Message is optional human-readable text such as an error description.
	Message string
	// Data is the action-specific payload.
	Data any
}
|
|
|
|
// dist exposes the embedded front-end build as a file system whose lookups
// are resolved relative to path inside the embedded tree.
type dist struct {
	embed.FS        // embedded files
	path     string // directory inside FS that acts as the root
}
|
|
|
|
//go:embed dist/*
|
|
var st embed.FS
|
|
|
|
func (r dist) Open(name string) (fs.File, error) {
|
|
if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) {
|
|
return nil, errors.New("http: invalid character in file path")
|
|
}
|
|
fullName := strings.TrimLeft(name, "/")
|
|
prifix := strings.Split(fullName, ".")
|
|
l := len(prifix)
|
|
p := prifix[l-1]
|
|
if p == "js" || p == "css" {
|
|
fullName = p + "/" + fullName
|
|
} else if p == "map" {
|
|
fullName = "js/" + fullName
|
|
}
|
|
fullName = r.path + "/" + fullName
|
|
file, err := r.FS.Open(fullName)
|
|
return file, err
|
|
}
|
|
|
|
// isContain reports whether i is equal to at least one element of arr.
// It returns false for a nil or empty slice.
func isContain[T comparable](i T, arr []T) bool {
	for idx := 0; idx < len(arr); idx++ {
		if arr[idx] == i {
			return true
		}
	}
	return false
}
|
|
|
|
func (r *mapXS[T]) set(k string, v T) {
|
|
r.Lock()
|
|
(*r.mapX)[k] = v
|
|
r.Unlock()
|
|
}
|
|
|
|
func (r *mapXS[T]) del(k string) {
|
|
r.Lock()
|
|
delete(*r.mapX, k)
|
|
r.Unlock()
|
|
}
|
|
|
|
type mapT interface {
|
|
string | []string | int | time.Duration | *websocket.Conn | chan int
|
|
}
|
|
// mapX is a string-keyed map whose value type is restricted to mapT.
type mapX[T mapT] map[string]T
|
|
|
|
type mapXS[T mapT] struct {
|
|
*mapX[T]
|
|
*sync.Mutex
|
|
}
|
|
|
|
func newFetchHandler() *fetchHandler {
|
|
var arr = make(map[string]newsource.Source)
|
|
var x []string
|
|
for _, source := range newsource.GetSource() {
|
|
arr[source.Name] = source
|
|
x = append(x, source.Name)
|
|
}
|
|
return &fetchHandler{
|
|
sourceMap: arr,
|
|
sourceArr: x,
|
|
keyword: mapXS[string]{
|
|
&mapX[string]{},
|
|
&sync.Mutex{},
|
|
},
|
|
hadFetchedMap: mapXS[int]{
|
|
&mapX[int]{},
|
|
&sync.Mutex{},
|
|
},
|
|
cronTime: mapXS[time.Duration]{
|
|
&mapX[time.Duration]{},
|
|
&sync.Mutex{},
|
|
},
|
|
searchSource: mapXS[[]string]{
|
|
&mapX[[]string]{},
|
|
&sync.Mutex{},
|
|
},
|
|
reloadCron: mapXS[chan int]{
|
|
&mapX[chan int]{},
|
|
&sync.Mutex{},
|
|
},
|
|
isOff: make(chan int),
|
|
rMsgChan: make(chan connChan, 10),
|
|
newFetchItem: make(chan dataChan, 10),
|
|
connMap: mapXS[*websocket.Conn]{
|
|
&mapX[*websocket.Conn]{},
|
|
&sync.Mutex{},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (f *fetchHandler) handle(conn string) {
|
|
key := "纪检"
|
|
if kk, ok := (*f.keyword.mapX)[conn]; ok && kk != "" {
|
|
key = kk
|
|
}
|
|
for _, sourceName := range (*f.searchSource.mapX)[conn] {
|
|
source := f.sourceMap[sourceName]
|
|
r := f.fetch2(source, key)
|
|
if r != nil && r.StatusCode == 200 {
|
|
if strings.ToUpper(source.Type) == "HTML" {
|
|
f.parsesDom(r, conn, source)
|
|
} else {
|
|
f.parseAjax(r, source, conn)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *fetchHandler) receiveMsg() {
|
|
for {
|
|
r := <-f.rMsgChan
|
|
switch r.msg.Action {
|
|
case "search":
|
|
if t, ok := r.msg.Data.(*setting); ok {
|
|
f.keyword.set(r.conn, t.Keyword)
|
|
f.searchSource.set(r.conn, t.SearchSource)
|
|
f.handle(r.conn)
|
|
(*f.reloadCron.mapX)[r.conn] <- t.TimeStep
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *fetchHandler) fetch2(source newsource.Source, key string) *http.Response {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("err:%s. stack:%s", r, debug.Stack())
|
|
}
|
|
}()
|
|
jar, _ := cookiejar.New(nil)
|
|
client := http.Client{
|
|
Transport: nil,
|
|
CheckRedirect: nil,
|
|
Jar: jar,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
searchUrl := source.SearchUrl
|
|
source.Method = strings.ToUpper(source.Method)
|
|
if source.Method == "GET" && source.KeywordField != "" {
|
|
if !strings.Contains(searchUrl, "?") {
|
|
searchUrl += "?" + source.KeywordField + "=" + url.QueryEscape(key)
|
|
} else {
|
|
searchUrl += "&" + source.KeywordField + "=" + url.QueryEscape(key)
|
|
}
|
|
} else if source.Method == "GET" && source.KeywordField == "" {
|
|
if strings.Contains(searchUrl, "${keyword}") {
|
|
searchUrl = strings.Replace(searchUrl, "${keyword}", url.QueryEscape(key), -1)
|
|
} else {
|
|
searchUrl += url.QueryEscape(key)
|
|
}
|
|
}
|
|
var req *http.Request
|
|
if source.Method == "POST" {
|
|
body := ""
|
|
if nil != source.ExternParam {
|
|
if source.IsJson {
|
|
t := source.ExternParam
|
|
t[source.KeywordField] = key
|
|
bytes, err := json.Marshal(t)
|
|
if err != nil {
|
|
log.Printf("build post json param err:[%s]", err)
|
|
return nil
|
|
}
|
|
body = string(bytes)
|
|
} else {
|
|
body = source.KeywordField + "=" + key
|
|
body += "&"
|
|
for s, s2 := range source.ExternParam {
|
|
body += s + "=" + s2 + "&"
|
|
}
|
|
body = strings.TrimRight(body, "&")
|
|
}
|
|
}
|
|
req, _ = http.NewRequest(source.Method, searchUrl, strings.NewReader(body))
|
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
|
} else {
|
|
req, _ = http.NewRequest(source.Method, searchUrl, nil)
|
|
}
|
|
if source.Header != nil {
|
|
for s, s2 := range source.Header {
|
|
req.Header.Set(s, s2)
|
|
}
|
|
}
|
|
if source.HeaderFun != nil {
|
|
source.HeaderFun(req)
|
|
}
|
|
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
|
|
if len(via) > 0 && via[0].URL.Scheme == "https" && req.URL.Scheme != "https" {
|
|
lastHop := via[len(via)-1].URL
|
|
return fmt.Errorf("redirected from secure URL %s to insecure URL %s", lastHop, req.URL)
|
|
}
|
|
|
|
// Go's http.DefaultClient allows 10 redirects before returning an error.
|
|
// The securityPreservingHTTPClient also uses this default policy to avoid
|
|
// Go command hangs.
|
|
if len(via) >= 3 {
|
|
return errors.New("stopped after 3 redirects")
|
|
}
|
|
return nil
|
|
}
|
|
response, err := client.Do(req)
|
|
|
|
if err != nil {
|
|
log.Printf("request %s err: %s", req.URL, err)
|
|
return nil
|
|
}
|
|
return response
|
|
}
|
|
|
|
func (f *fetchHandler) parseAjax(response *http.Response, source newsource.Source, conn string) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("parse ajax response err[%s]. stack:[%s]", r, debug.Stack())
|
|
}
|
|
}()
|
|
nowDate := time.Now().Format("2006-01-02 15:04:06")
|
|
var newFetch []data.FetchData
|
|
|
|
if source.AjaxSimpleDeal != nil && source.AjaxDealFun == nil {
|
|
bytes, err := ioutil.ReadAll(response.Body)
|
|
if err != nil {
|
|
log.Printf("read response body err:[%s]", err)
|
|
return
|
|
}
|
|
if source.AjaxSimpleDeal != nil && source.Target != nil {
|
|
dst := reflect.New(source.Target).Elem()
|
|
err = json.Unmarshal(bytes, dst.Addr().Interface())
|
|
if err != nil {
|
|
log.Printf("jsondecode err:[%s]", err)
|
|
return
|
|
}
|
|
source.AjaxSimpleDeal(dst.Interface(), &newFetch)
|
|
}
|
|
} else if source.AjaxDealFun != nil && source.AjaxSimpleDeal == nil {
|
|
source.AjaxDealFun(&newFetch, response)
|
|
}
|
|
|
|
if len(newFetch) > 0 {
|
|
var newF []data.FetchData
|
|
for i := 0; i < len(newFetch); i++ {
|
|
fetchData := newFetch[i]
|
|
k := conn + "_" + fetchData.Url + "_" + fetchData.Title
|
|
if newFetch[i].CreatedTime == "" {
|
|
newFetch[i].CreatedTime = nowDate
|
|
}
|
|
if newFetch[i].Source == "" {
|
|
newFetch[i].Source = source.Name
|
|
}
|
|
if !tools.IsInToday(newFetch[i].Date) {
|
|
continue
|
|
}
|
|
if _, ok := (*f.hadFetchedMap.mapX)[k]; !ok {
|
|
f.hadFetchData = append(f.hadFetchData, fetchData)
|
|
f.hadFetchedMap.set(k, 1)
|
|
newF = append(newF, newFetch[i])
|
|
}
|
|
}
|
|
f.newFetchItem <- dataChan{
|
|
conn: conn,
|
|
item: newF,
|
|
}
|
|
}
|
|
err := response.Body.Close()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (f *fetchHandler) parsesDom(html *http.Response, conn string, source newsource.Source) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("parse html err:[%s]. stack:[%s]", r, debug.Stack())
|
|
}
|
|
}()
|
|
doc, err := goquery.NewDocumentFromReader(html.Body)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
var newFetch []data.FetchData
|
|
nowDate := time.Now().Format("2006-01-02 15:04:05")
|
|
doc.Find(source.ListQuery).Each(func(i int, selection *goquery.Selection) {
|
|
fetchData := data.FetchData{
|
|
CreatedTime: nowDate,
|
|
Source: source.Name,
|
|
}
|
|
source.QueryHandler(i, selection, &fetchData)
|
|
if !tools.IsInToday(fetchData.Date) {
|
|
return
|
|
}
|
|
|
|
k := conn + "_" + fetchData.Url + "_" + fetchData.Title
|
|
if _, ok := (*f.hadFetchedMap.mapX)[k]; !ok {
|
|
f.hadFetchData = append(f.hadFetchData, fetchData)
|
|
f.hadFetchedMap.set(k, 1)
|
|
newFetch = append(newFetch, fetchData)
|
|
}
|
|
})
|
|
if len(newFetch) > 0 {
|
|
f.newFetchItem <- dataChan{
|
|
conn: conn,
|
|
item: newFetch,
|
|
}
|
|
}
|
|
err = html.Body.Close()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (f *fetchHandler) sendFetchData() {
|
|
for {
|
|
dataFetch := <-f.newFetchItem
|
|
|
|
err := (*f.connMap.mapX)[dataFetch.conn].WriteJSON(message{
|
|
Status: true,
|
|
Action: "newData",
|
|
Message: "",
|
|
Data: dataFetch.item,
|
|
})
|
|
if err != nil {
|
|
log.Printf("send new fetch data err:[%s]", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *fetchHandler) cronFetch(conn string, c chan int) {
|
|
step, ok := (*f.cronTime.mapX)[conn]
|
|
if !ok {
|
|
step = time.Second * 60
|
|
}
|
|
t := time.NewTicker(step)
|
|
if _, ok := (*f.cronTime.mapX)[conn]; !ok {
|
|
f.reloadCron.set(conn, make(chan int))
|
|
}
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
f.handle(conn)
|
|
case tt := <-(*f.reloadCron.mapX)[conn]:
|
|
f.cronTime.set(conn, time.Duration(tt)*time.Second)
|
|
go f.cronFetch(conn, c)
|
|
return
|
|
case <-c:
|
|
close(c)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
h := newFetchHandler()
|
|
router := gin.Default()
|
|
static := dist{
|
|
FS: st,
|
|
path: "dist",
|
|
}
|
|
router.StaticFS("/js", http.FS(static))
|
|
router.StaticFS("/css", http.FS(static))
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
}
|
|
go h.sendFetchData()
|
|
go h.receiveMsg()
|
|
router.GET("/", func(c *gin.Context) {
|
|
file, err := static.Open("index.html")
|
|
if err != nil {
|
|
c.String(404, "%s", err)
|
|
return
|
|
}
|
|
bytes, err := ioutil.ReadAll(file)
|
|
if err != nil {
|
|
c.String(404, "%s", err)
|
|
return
|
|
}
|
|
c.Data(200, "text/html", bytes)
|
|
})
|
|
|
|
router.GET("ws", func(c *gin.Context) {
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
c.JSON(201, message{
|
|
Status: false,
|
|
Message: err.Error(),
|
|
Data: nil,
|
|
Action: "upgradeWs",
|
|
})
|
|
log.Println(err)
|
|
return
|
|
}
|
|
_ = conn.WriteJSON(message{
|
|
Status: true,
|
|
Action: "sourceList",
|
|
Message: "",
|
|
Data: h.sourceArr,
|
|
})
|
|
remote := conn.RemoteAddr().String()
|
|
if _, ok := (*h.connMap.mapX)[remote]; !ok {
|
|
h.connMap.set(remote, conn)
|
|
}
|
|
cc := make(chan int)
|
|
go h.cronFetch(remote, cc)
|
|
go func() {
|
|
msg := connChan{
|
|
conn: remote,
|
|
msg: message{
|
|
Data: &setting{},
|
|
},
|
|
}
|
|
for {
|
|
err := conn.ReadJSON(&msg.msg)
|
|
if err != nil {
|
|
if _, ok := (*h.connMap.mapX)[remote]; ok && !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
|
|
h.connMap.del(remote)
|
|
cc <- 1
|
|
return
|
|
}
|
|
log.Printf("websocket read client msg err:[%s]", err)
|
|
} else {
|
|
h.rMsgChan <- msg
|
|
}
|
|
}
|
|
}()
|
|
})
|
|
go func() {
|
|
time.Sleep(2 * time.Second)
|
|
u := "http://127.0.0.1:8080"
|
|
switch runtime.GOOS {
|
|
case "linux":
|
|
exec.Command(`xdg-open`, u).Start()
|
|
case "windows":
|
|
exec.Command(`cmd`, `/c`, `start`, u).Start()
|
|
|
|
}
|
|
|
|
}()
|
|
err := router.Run(":8080")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|