newfetch/main.go

520 lines
12 KiB
Go
Raw Normal View History

2022-07-01 05:45:49 +00:00
package main
import (
2022-07-07 08:52:18 +00:00
"embed"
2022-07-25 14:38:30 +00:00
"encoding/json"
2022-07-07 08:52:18 +00:00
"errors"
2022-07-23 18:01:07 +00:00
"fmt"
2022-07-01 05:45:49 +00:00
"github.com/PuerkitoBio/goquery"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
2022-07-23 18:01:07 +00:00
"github/fthvgb1/newsfetch/data"
"github/fthvgb1/newsfetch/newsource"
2022-07-26 09:21:54 +00:00
"github/fthvgb1/newsfetch/tools"
2022-07-07 08:52:18 +00:00
"io/fs"
2022-07-06 15:07:24 +00:00
"io/ioutil"
2022-07-01 05:45:49 +00:00
"log"
"net/http"
2022-07-23 18:01:07 +00:00
"net/http/cookiejar"
"net/url"
2022-07-06 15:07:24 +00:00
"os/exec"
2022-07-07 08:52:18 +00:00
"path/filepath"
2022-07-25 14:38:30 +00:00
"reflect"
2022-07-06 15:07:24 +00:00
"runtime"
2022-07-25 14:38:30 +00:00
"runtime/debug"
2022-07-05 03:54:09 +00:00
"strings"
2022-07-06 02:55:28 +00:00
"sync"
2022-07-01 05:45:49 +00:00
"time"
)
2022-07-05 03:54:09 +00:00
// connChan pairs a message read from a websocket client with the key of the
// connection it arrived on, so it can be queued on fetchHandler.rMsgChan.
type connChan struct {
// conn is the connection key (the client's remote address string).
conn string
// msg is the decoded client message.
msg message
}
type dataChan struct {
conn string
2022-07-23 18:01:07 +00:00
item []data.FetchData
2022-07-05 03:54:09 +00:00
}
2022-07-01 05:45:49 +00:00
type fetchHandler struct {
2022-07-23 18:01:07 +00:00
hadFetchData []data.FetchData
2022-07-06 09:03:42 +00:00
cronTime mapXS[time.Duration]
keyword mapXS[string]
2022-07-26 04:31:47 +00:00
searchSource mapXS[[]string]
2022-07-06 09:03:42 +00:00
hadFetchedMap mapXS[int]
reloadCron mapXS[chan int]
2022-07-01 05:45:49 +00:00
isOff chan int
2022-07-05 03:54:09 +00:00
rMsgChan chan connChan
newFetchItem chan dataChan
2022-07-06 09:03:42 +00:00
connMap mapXS[*websocket.Conn]
2022-07-26 04:31:47 +00:00
sourceMap map[string]newsource.Source
sourceArr []string
2022-07-05 03:54:09 +00:00
}
// setting is the payload of a client "search" message.
type setting struct {
	// Keyword is the search term.
	Keyword string `json:"keyword"`
	// TimeStep is the polling interval in seconds.
	TimeStep int `json:"timeStep"`
	// SearchSource lists the names of the sources to query.
	SearchSource []string `json:"searchSource"`
}
// message is the JSON envelope exchanged with websocket clients in both
// directions.
type message struct {
	// Status reports whether the operation named by Action succeeded.
	Status bool
	// Action names the kind of message, e.g. "search" or "newData".
	Action string
	// Message is a human-readable note, typically an error text.
	Message string
	// Data is the action-specific payload.
	Data any
}
2022-07-07 08:52:18 +00:00
// dist wraps an embedded file system so that it can be served as static
// assets; its Open method maps request names onto the embedded layout.
type dist struct {
embed.FS
// path is the directory inside the embedded tree that is prepended to every
// name that gets opened.
path string
}
// st holds the files matched by dist/*, embedded into the binary at build time.
//
//go:embed dist/*
var st embed.FS
// Open implements fs.FS on top of the embedded tree.
//
// Leading slashes are stripped from name. The text after the last dot (the
// whole name when it contains no dot) selects a sub-directory: "js" and "css"
// files live in a directory of the same name and "map" files live in "js".
// The result is resolved below r.path. On platforms whose path separator is
// not '/', names containing that separator are rejected.
func (r dist) Open(name string) (fs.File, error) {
	if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) {
		return nil, errors.New("http: invalid character in file path")
	}
	rel := strings.TrimLeft(name, "/")
	// LastIndex returns -1 when there is no dot, which makes the suffix the
	// whole name.
	suffix := rel[strings.LastIndex(rel, ".")+1:]
	switch suffix {
	case "js", "css":
		rel = suffix + "/" + rel
	case "map":
		rel = "js/" + rel
	}
	return r.FS.Open(r.path + "/" + rel)
}
2022-07-26 04:31:47 +00:00
// isContain reports whether i is equal to at least one element of arr.
// It returns false for a nil or empty slice.
func isContain[T comparable](i T, arr []T) bool {
	for idx := 0; idx < len(arr); idx++ {
		if arr[idx] == i {
			return true
		}
	}
	return false
}
2022-07-28 08:26:04 +00:00
func (r *mapXS[T]) set(k string, v T) {
r.Lock()
(*r.mapX)[k] = v
r.Unlock()
2022-07-06 02:55:28 +00:00
}
2022-07-28 08:26:04 +00:00
func (r *mapXS[T]) del(k string) {
r.Lock()
delete(*r.mapX, k)
r.Unlock()
2022-07-06 09:03:42 +00:00
}
type mapT interface {
2022-07-26 04:31:47 +00:00
string | []string | int | time.Duration | *websocket.Conn | chan int
2022-07-06 09:03:42 +00:00
}
// mapX is a string-keyed map whose value type is restricted by mapT.
type mapX[T mapT] map[string]T
2022-07-06 02:55:28 +00:00
2022-07-06 09:03:42 +00:00
type mapXS[T mapT] struct {
2022-07-06 02:55:28 +00:00
*mapX[T]
*sync.Mutex
}
2022-07-26 04:31:47 +00:00
func newFetchHandler() *fetchHandler {
var arr = make(map[string]newsource.Source)
var x []string
for _, source := range newsource.GetSource() {
arr[source.Name] = source
x = append(x, source.Name)
}
2022-07-01 05:45:49 +00:00
return &fetchHandler{
2022-07-26 04:31:47 +00:00
sourceMap: arr,
sourceArr: x,
2022-07-06 09:03:42 +00:00
keyword: mapXS[string]{
&mapX[string]{},
&sync.Mutex{},
},
hadFetchedMap: mapXS[int]{
&mapX[int]{},
&sync.Mutex{},
},
cronTime: mapXS[time.Duration]{
&mapX[time.Duration]{},
&sync.Mutex{},
},
2022-07-26 04:31:47 +00:00
searchSource: mapXS[[]string]{
&mapX[[]string]{},
&sync.Mutex{},
},
2022-07-06 09:03:42 +00:00
reloadCron: mapXS[chan int]{
&mapX[chan int]{},
&sync.Mutex{},
},
isOff: make(chan int),
rMsgChan: make(chan connChan, 10),
newFetchItem: make(chan dataChan, 10),
connMap: mapXS[*websocket.Conn]{
&mapX[*websocket.Conn]{},
&sync.Mutex{},
},
2022-07-01 05:45:49 +00:00
}
}
2022-07-05 03:54:09 +00:00
func (f *fetchHandler) handle(conn string) {
key := "纪检"
2022-07-06 09:03:42 +00:00
if kk, ok := (*f.keyword.mapX)[conn]; ok && kk != "" {
2022-07-05 03:54:09 +00:00
key = kk
}
2022-07-26 04:31:47 +00:00
for _, sourceName := range (*f.searchSource.mapX)[conn] {
source := f.sourceMap[sourceName]
2022-08-27 13:30:14 +00:00
go func() {
r := f.fetch2(source, key)
if r != nil && r.StatusCode == 200 {
if strings.ToUpper(source.Type) == "HTML" {
f.parsesDom(r, conn, source)
} else {
f.parseAjax(r, source, conn)
}
2022-07-25 14:38:30 +00:00
}
2022-08-27 13:30:14 +00:00
}()
2022-07-23 18:01:07 +00:00
}
2022-07-01 05:45:49 +00:00
}
func (f *fetchHandler) receiveMsg() {
for {
r := <-f.rMsgChan
2022-07-05 03:54:09 +00:00
switch r.msg.Action {
2022-07-01 05:45:49 +00:00
case "search":
2022-07-05 03:54:09 +00:00
if t, ok := r.msg.Data.(*setting); ok {
2022-07-28 08:26:04 +00:00
f.keyword.set(r.conn, t.Keyword)
f.searchSource.set(r.conn, t.SearchSource)
2022-07-05 03:54:09 +00:00
f.handle(r.conn)
2022-07-26 04:31:47 +00:00
(*f.reloadCron.mapX)[r.conn] <- t.TimeStep
2022-07-01 05:45:49 +00:00
}
}
}
}
2022-07-23 18:01:07 +00:00
func (f *fetchHandler) fetch2(source newsource.Source, key string) *http.Response {
2022-07-25 14:38:30 +00:00
defer func() {
if r := recover(); r != nil {
log.Printf("err:%s. stack:%s", r, debug.Stack())
}
}()
2022-07-23 18:01:07 +00:00
jar, _ := cookiejar.New(nil)
client := http.Client{
Transport: nil,
CheckRedirect: nil,
Jar: jar,
Timeout: 10 * time.Second,
}
searchUrl := source.SearchUrl
source.Method = strings.ToUpper(source.Method)
2022-07-25 14:38:30 +00:00
if source.Method == "GET" && source.KeywordField != "" {
2022-07-23 18:01:07 +00:00
if !strings.Contains(searchUrl, "?") {
searchUrl += "?" + source.KeywordField + "=" + url.QueryEscape(key)
} else {
searchUrl += "&" + source.KeywordField + "=" + url.QueryEscape(key)
}
2022-07-25 14:38:30 +00:00
} else if source.Method == "GET" && source.KeywordField == "" {
if strings.Contains(searchUrl, "${keyword}") {
searchUrl = strings.Replace(searchUrl, "${keyword}", url.QueryEscape(key), -1)
} else {
searchUrl += url.QueryEscape(key)
}
2022-07-23 18:01:07 +00:00
}
var req *http.Request
if source.Method == "POST" {
2022-07-25 14:38:30 +00:00
body := ""
2022-07-23 18:01:07 +00:00
if nil != source.ExternParam {
2022-07-25 14:38:30 +00:00
if source.IsJson {
t := source.ExternParam
t[source.KeywordField] = key
bytes, err := json.Marshal(t)
if err != nil {
log.Printf("build post json param err:[%s]", err)
return nil
}
body = string(bytes)
} else {
body = source.KeywordField + "=" + key
body += "&"
for s, s2 := range source.ExternParam {
body += s + "=" + s2 + "&"
}
body = strings.TrimRight(body, "&")
2022-07-23 18:01:07 +00:00
}
}
req, _ = http.NewRequest(source.Method, searchUrl, strings.NewReader(body))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
} else {
req, _ = http.NewRequest(source.Method, searchUrl, nil)
}
if source.Header != nil {
for s, s2 := range source.Header {
req.Header.Set(s, s2)
}
}
if source.HeaderFun != nil {
source.HeaderFun(req)
}
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
if len(via) > 0 && via[0].URL.Scheme == "https" && req.URL.Scheme != "https" {
lastHop := via[len(via)-1].URL
return fmt.Errorf("redirected from secure URL %s to insecure URL %s", lastHop, req.URL)
}
// Go's http.DefaultClient allows 10 redirects before returning an error.
// The securityPreservingHTTPClient also uses this default policy to avoid
// Go command hangs.
if len(via) >= 3 {
return errors.New("stopped after 3 redirects")
}
return nil
}
response, err := client.Do(req)
if err != nil {
2022-07-25 14:38:30 +00:00
log.Printf("request %s err: %s", req.URL, err)
return nil
2022-07-23 18:01:07 +00:00
}
return response
}
2022-07-25 14:38:30 +00:00
func (f *fetchHandler) parseAjax(response *http.Response, source newsource.Source, conn string) {
2022-07-05 03:54:09 +00:00
defer func() {
if r := recover(); r != nil {
2022-07-25 14:38:30 +00:00
log.Printf("parse ajax response err[%s]. stack:[%s]", r, debug.Stack())
2022-07-05 03:54:09 +00:00
}
}()
2022-07-25 14:38:30 +00:00
nowDate := time.Now().Format("2006-01-02 15:04:06")
var newFetch []data.FetchData
2022-07-05 03:54:09 +00:00
2022-07-25 14:38:30 +00:00
if source.AjaxSimpleDeal != nil && source.AjaxDealFun == nil {
bytes, err := ioutil.ReadAll(response.Body)
if err != nil {
log.Printf("read response body err:[%s]", err)
return
}
if source.AjaxSimpleDeal != nil && source.Target != nil {
dst := reflect.New(source.Target).Elem()
err = json.Unmarshal(bytes, dst.Addr().Interface())
if err != nil {
log.Printf("jsondecode err:[%s]", err)
return
}
source.AjaxSimpleDeal(dst.Interface(), &newFetch)
}
} else if source.AjaxDealFun != nil && source.AjaxSimpleDeal == nil {
source.AjaxDealFun(&newFetch, response)
2022-07-01 05:45:49 +00:00
}
2022-07-23 18:01:07 +00:00
if len(newFetch) > 0 {
2022-07-26 04:31:47 +00:00
var newF []data.FetchData
2022-07-25 14:38:30 +00:00
for i := 0; i < len(newFetch); i++ {
fetchData := newFetch[i]
2022-07-23 18:01:07 +00:00
k := conn + "_" + fetchData.Url + "_" + fetchData.Title
2022-07-25 14:38:30 +00:00
if newFetch[i].CreatedTime == "" {
newFetch[i].CreatedTime = nowDate
}
if newFetch[i].Source == "" {
newFetch[i].Source = source.Name
}
2022-07-26 09:21:54 +00:00
if !tools.IsInToday(newFetch[i].Date) {
continue
}
2022-07-23 18:01:07 +00:00
if _, ok := (*f.hadFetchedMap.mapX)[k]; !ok {
f.hadFetchData = append(f.hadFetchData, fetchData)
2022-07-28 08:26:04 +00:00
f.hadFetchedMap.set(k, 1)
2022-07-26 04:31:47 +00:00
newF = append(newF, newFetch[i])
2022-07-23 18:01:07 +00:00
}
}
f.newFetchItem <- dataChan{
conn: conn,
2022-07-26 04:31:47 +00:00
item: newF,
2022-07-23 18:01:07 +00:00
}
}
err := response.Body.Close()
2022-07-01 05:45:49 +00:00
if err != nil {
panic(err)
}
2022-07-23 18:01:07 +00:00
}
2022-07-05 03:54:09 +00:00
2022-07-23 18:01:07 +00:00
func (f *fetchHandler) parsesDom(html *http.Response, conn string, source newsource.Source) {
defer func() {
if r := recover(); r != nil {
2022-07-25 14:38:30 +00:00
log.Printf("parse html err:[%s]. stack:[%s]", r, debug.Stack())
2022-07-05 03:54:09 +00:00
}
2022-07-23 18:01:07 +00:00
}()
doc, err := goquery.NewDocumentFromReader(html.Body)
if err != nil {
panic(err)
}
var newFetch []data.FetchData
nowDate := time.Now().Format("2006-01-02 15:04:05")
doc.Find(source.ListQuery).Each(func(i int, selection *goquery.Selection) {
fetchData := data.FetchData{
CreatedTime: nowDate,
2022-07-25 14:38:30 +00:00
Source: source.Name,
2022-07-23 18:01:07 +00:00
}
source.QueryHandler(i, selection, &fetchData)
2022-07-26 09:21:54 +00:00
if !tools.IsInToday(fetchData.Date) {
return
}
2022-07-23 18:01:07 +00:00
k := conn + "_" + fetchData.Url + "_" + fetchData.Title
2022-07-06 09:03:42 +00:00
if _, ok := (*f.hadFetchedMap.mapX)[k]; !ok {
2022-07-23 18:01:07 +00:00
f.hadFetchData = append(f.hadFetchData, fetchData)
2022-07-28 08:26:04 +00:00
f.hadFetchedMap.set(k, 1)
2022-07-23 18:01:07 +00:00
newFetch = append(newFetch, fetchData)
2022-07-01 05:45:49 +00:00
}
})
2022-07-05 03:54:09 +00:00
if len(newFetch) > 0 {
f.newFetchItem <- dataChan{
conn: conn,
item: newFetch,
}
}
2022-07-01 05:45:49 +00:00
err = html.Body.Close()
if err != nil {
panic(err)
}
}
func (f *fetchHandler) sendFetchData() {
for {
2022-07-23 18:01:07 +00:00
dataFetch := <-f.newFetchItem
2022-07-05 03:54:09 +00:00
2022-07-23 18:01:07 +00:00
err := (*f.connMap.mapX)[dataFetch.conn].WriteJSON(message{
2022-07-01 05:45:49 +00:00
Status: true,
Action: "newData",
Message: "",
2022-07-23 18:01:07 +00:00
Data: dataFetch.item,
2022-07-01 05:45:49 +00:00
})
if err != nil {
2022-07-25 14:38:30 +00:00
log.Printf("send new fetch data err:[%s]", err)
2022-07-01 05:45:49 +00:00
}
}
}
2022-07-05 03:54:09 +00:00
func (f *fetchHandler) cronFetch(conn string, c chan int) {
2022-07-06 09:03:42 +00:00
step, ok := (*f.cronTime.mapX)[conn]
2022-07-05 03:54:09 +00:00
if !ok {
step = time.Second * 60
}
t := time.NewTicker(step)
2022-07-06 09:03:42 +00:00
if _, ok := (*f.cronTime.mapX)[conn]; !ok {
2022-07-28 08:26:04 +00:00
f.reloadCron.set(conn, make(chan int))
2022-07-05 03:54:09 +00:00
}
2022-07-01 05:45:49 +00:00
defer t.Stop()
for {
select {
case <-t.C:
2022-07-05 03:54:09 +00:00
f.handle(conn)
2022-07-06 09:03:42 +00:00
case tt := <-(*f.reloadCron.mapX)[conn]:
2022-07-28 08:26:04 +00:00
f.cronTime.set(conn, time.Duration(tt)*time.Second)
2022-07-05 03:54:09 +00:00
go f.cronFetch(conn, c)
2022-07-01 05:45:49 +00:00
return
2022-07-05 03:54:09 +00:00
case <-c:
close(c)
2022-07-01 05:45:49 +00:00
return
}
}
}
func main() {
2022-07-26 04:31:47 +00:00
h := newFetchHandler()
2022-07-01 05:45:49 +00:00
router := gin.Default()
2022-07-07 08:52:18 +00:00
static := dist{
FS: st,
path: "dist",
2022-07-06 15:07:24 +00:00
}
router.StaticFS("/js", http.FS(static))
router.StaticFS("/css", http.FS(static))
2022-07-05 03:54:09 +00:00
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
go h.sendFetchData()
go h.receiveMsg()
2022-07-06 15:07:24 +00:00
router.GET("/", func(c *gin.Context) {
file, err := static.Open("index.html")
if err != nil {
c.String(404, "%s", err)
return
}
bytes, err := ioutil.ReadAll(file)
if err != nil {
c.String(404, "%s", err)
return
}
c.Data(200, "text/html", bytes)
2022-07-01 05:45:49 +00:00
})
2022-07-06 15:07:24 +00:00
2022-07-01 05:45:49 +00:00
router.GET("ws", func(c *gin.Context) {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(201, message{
Status: false,
Message: err.Error(),
Data: nil,
Action: "upgradeWs",
})
2022-07-05 03:54:09 +00:00
log.Println(err)
2022-07-01 05:45:49 +00:00
return
}
2022-07-26 04:31:47 +00:00
_ = conn.WriteJSON(message{
Status: true,
Action: "sourceList",
Message: "",
Data: h.sourceArr,
})
2022-07-05 03:54:09 +00:00
remote := conn.RemoteAddr().String()
2022-07-06 09:03:42 +00:00
if _, ok := (*h.connMap.mapX)[remote]; !ok {
2022-07-28 08:26:04 +00:00
h.connMap.set(remote, conn)
2022-07-01 05:45:49 +00:00
}
2022-07-05 03:54:09 +00:00
cc := make(chan int)
go h.cronFetch(remote, cc)
go func() {
msg := connChan{
conn: remote,
msg: message{
Data: &setting{},
},
}
for {
err := conn.ReadJSON(&msg.msg)
if err != nil {
2022-07-06 09:03:42 +00:00
if _, ok := (*h.connMap.mapX)[remote]; ok && !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
2022-07-28 08:26:04 +00:00
h.connMap.del(remote)
2022-07-05 03:54:09 +00:00
cc <- 1
return
}
2022-07-25 14:38:30 +00:00
log.Printf("websocket read client msg err:[%s]", err)
2022-07-05 03:54:09 +00:00
} else {
h.rMsgChan <- msg
}
}
}()
2022-07-01 05:45:49 +00:00
})
2022-07-06 15:07:24 +00:00
go func() {
time.Sleep(2 * time.Second)
2022-07-26 04:31:47 +00:00
u := "http://127.0.0.1:8080"
2022-07-06 15:07:24 +00:00
switch runtime.GOOS {
case "linux":
2022-07-26 04:31:47 +00:00
exec.Command(`xdg-open`, u).Start()
2022-07-06 15:07:24 +00:00
case "windows":
2022-07-26 04:31:47 +00:00
exec.Command(`cmd`, `/c`, `start`, u).Start()
2022-07-06 15:07:24 +00:00
}
}()
2022-07-05 03:54:09 +00:00
err := router.Run(":8080")
if err != nil {
panic(err)
}
2022-07-01 05:45:49 +00:00
}