quickshare/server/libs/httpworker/worker.go
hekk 61a1c93f0f !1 Merge back to master
Merge pull request !1 from dev branch
2018-05-27 21:32:55 +08:00

130 lines
2.4 KiB
Go

package httpworker
import (
"errors"
"net/http"
"runtime/debug"
"time"
)
import (
"quickshare/server/libs/logutil"
)
var (
ErrWorkerNotFound = errors.New("worker not found")
ErrTimeout = errors.New("timeout")
)
type DoFunc func(http.ResponseWriter, *http.Request)
type Task struct {
Ack chan error
Do DoFunc
Res http.ResponseWriter
Req *http.Request
}
type Workers interface {
Put(*Task) bool
IsInTime(ack chan error, msec time.Duration) error
}
type WorkerPool struct {
queue chan *Task
size int
workers []*Worker
log logutil.LogUtil // TODO: should not pass log here
}
func NewWorkerPool(poolSize int, queueSize int, log logutil.LogUtil) Workers {
queue := make(chan *Task, queueSize)
workers := make([]*Worker, 0, poolSize)
for i := 0; i < poolSize; i++ {
worker := &Worker{
Id: uint64(i),
queue: queue,
log: log,
}
go worker.Start()
workers = append(workers, worker)
}
return &WorkerPool{
queue: queue,
size: poolSize,
workers: workers,
log: log,
}
}
func (pool *WorkerPool) Put(task *Task) bool {
if len(pool.queue) >= pool.size {
return false
}
pool.queue <- task
return true
}
func (pool *WorkerPool) IsInTime(ack chan error, msec time.Duration) error {
start := time.Now().UnixNano()
timeout := make(chan error)
go func() {
time.Sleep(msec)
timeout <- ErrTimeout
}()
select {
case err := <-ack:
if err == nil {
pool.log.Printf(
"finish cost: %d usec",
(time.Now().UnixNano()-start)/1000,
)
} else {
pool.log.Printf(
"finish with error cost: %d usec",
(time.Now().UnixNano()-start)/1000,
)
}
return err
case errTimeout := <-timeout:
pool.log.Printf("timeout cost: %d usec", (time.Now().UnixNano()-start)/1000)
return errTimeout
}
}
type Worker struct {
Id uint64
queue chan *Task
log logutil.LogUtil
}
func (worker *Worker) RecoverPanic() {
if r := recover(); r != nil {
worker.log.Printf("Recovered:%v stack: %v", r, debug.Stack())
// restart worker and IsInTime will return timeout error for last task
worker.Start()
}
}
func (worker *Worker) Start() {
defer worker.RecoverPanic()
for {
task := <-worker.queue
if task.Do != nil {
task.Do(task.Res, task.Req)
task.Ack <- nil
} else {
task.Ack <- ErrWorkerNotFound
}
}
}
// ServiceFunc lets you return struct directly
type ServiceFunc func(http.ResponseWriter, *http.Request) interface{}