From 309c66018704bd978b021b84fd56519b9defc27d Mon Sep 17 00:00:00 2001 From: xing Date: Wed, 7 Dec 2022 13:26:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 8 ++--- cache/map.go | 4 +-- cache/slice.go | 2 +- helper/func.go | 1 - taskPools/pools.go | 76 +++++++++++++++++++++++++++++++++++++++++ taskPools/pools_test.go | 43 +++++++++++++++++++++++ 6 files changed, 126 insertions(+), 8 deletions(-) create mode 100644 taskPools/pools.go create mode 100644 taskPools/pools_test.go diff --git a/Dockerfile b/Dockerfile index 5de12cf..9e5aea7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,10 +1,10 @@ -FROM golang:latest +FROM golang:latest as gobulidIso COPY ./ /go/src/wp-go WORKDIR /go/src/wp-go -#ENV GOPROXY="https://goproxy.cn" -RUN go build -tags netgo +ENV GOPROXY="https://goproxy.cn" +RUN go build -ldflags "-w" -tags netgo FROM alpine:latest WORKDIR /opt/wp-go -COPY --from=0 /go/src/wp-go/wp-go ./ +COPY --from=gobulidIso /go/src/wp-go/wp-go ./ ENTRYPOINT ["./wp-go"] \ No newline at end of file diff --git a/cache/map.go b/cache/map.go index 213beba..af39271 100644 --- a/cache/map.go +++ b/cache/map.go @@ -168,7 +168,7 @@ func (m *MapCache[K, V]) GetCache(c context.Context, key K, timeout time.Duratio if timeout > 0 { ctx, cancel := context.WithTimeout(c, timeout) defer cancel() - done := make(chan struct{}) + done := make(chan struct{}, 1) go func() { call() done <- struct{}{} @@ -230,7 +230,7 @@ func (m *MapCache[K, V]) GetCacheBatch(c context.Context, key []K, timeout time. if timeout > 0 { ctx, cancel := context.WithTimeout(c, timeout) defer cancel() - done := make(chan struct{}) + done := make(chan struct{}, 1) go func() { call() done <- struct{}{} diff --git a/cache/slice.go b/cache/slice.go index e4602d1..f43bede 100644 --- a/cache/slice.go +++ b/cache/slice.go @@ -72,7 +72,7 @@ func (c *SliceCache[T]) GetCache(ctx context.Context, timeout time.Duration, par if timeout > 0 { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - done := make(chan struct{}) + done := make(chan struct{}, 1) go func() { call() done <- struct{}{} diff --git a/helper/func.go b/helper/func.go index e392354..d45a9a7 100644 --- a/helper/func.go +++ b/helper/func.go @@ -353,7 +353,6 @@ func Slice[T any](arr []T, offset, length int) (r []T) { } func Comb[T any](arr []T, m int) (r [][]T) { - r = make([][]T, 0) if m == 1 { for _, t := range arr { r = append(r, []T{t}) diff --git a/taskPools/pools.go b/taskPools/pools.go new file mode 100644 index 0000000..480ec15 --- /dev/null +++ b/taskPools/pools.go @@ -0,0 +1,76 @@ +package taskPools + +import ( + "context" + "log" + "sync" + "time" +) + +type Pools struct { + ch chan struct{} + wg *sync.WaitGroup +} + +func NewPools(n int) *Pools { + if n <= 0 { + panic("n must >= 1") + } + c := make(chan struct{}, n) + for i := 0; i < n; i++ { + c <- struct{}{} + } + return &Pools{ + ch: c, + wg: &sync.WaitGroup{}, + } +} + +func (p *Pools) ExecuteWithTimeOut(timeout time.Duration, fn func(), args ...string) { + if timeout <= 0 { + p.Execute(fn) + return + } + p.wg.Add(1) + q := <-p.ch + go func() { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + done := make(chan struct{}, 1) + defer func() { + cancel() + p.wg.Done() + p.ch <- q + }() + go func() { + fn() + done <- struct{}{} + }() + select { + case <-ctx.Done(): + if len(args) > 0 && args[0] != "" { + log.Printf("执行%s超时", args[0]) + } + case <-done: + } + }() +} + +func (p *Pools) Execute(fn func()) { + if cap(p.ch) == 1 { + fn() + return + } + p.wg.Add(1) + q := <-p.ch + go func() { + defer func() { + p.wg.Done() + p.ch <- q + }() + fn() + }() +} + +func (p *Pools) Wait() { + p.wg.Wait() +} diff --git a/taskPools/pools_test.go b/taskPools/pools_test.go new file mode 100644 index 0000000..4a2796c --- /dev/null +++ b/taskPools/pools_test.go @@ -0,0 +1,43 @@ +package taskPools + +import ( + "fmt" + "log" + "testing" + "time" +) + +func TestPools_Execute(t *testing.T) { + + t.Run("pools", func(t *testing.T) { + p := NewPools(3) + for i := 0; i < 10; i++ { + i := i + p.Execute(func() { + time.Sleep(time.Second) + log.Printf("task[%d] done", i) + }) + } + p.Wait() + }) +} + +func TestPools_ExecuteWithTimeOut(t *testing.T) { + + t.Run("timeout test", func(t *testing.T) { + p := NewPools(3) + for i := 0; i < 10; i++ { + i := i + p.ExecuteWithTimeOut(2*time.Second, func() { + log.Printf("start task[%d]", i) + tt := time.Second + if i == 0 { + tt = 7 * time.Second + } + time.Sleep(tt) + log.Printf("task[%d] done", i) + }, fmt.Sprintf("task [%d]", i)) + } + p.Wait() + }) +}