@@ 58,6 58,21 @@ type Extension interface {
Extend(gserver *Server) (*Server, error)
}
+// WorkerPoolFunc is a function that takes a dowork *Queue instance and
+// returns how many concurrent workers should be used when starting the
+// queue.
+type WorkerPoolFunc func(queue *work.Queue) int
+
+func defaultWorkerPool(queue *work.Queue) int {
+ qSize := int(queue.Size() / 10) // Random, just take 10% of size for worker pool
+ if qSize < 1 {
+ qSize = 1
+ } else if qSize > 5 {
+ qSize = 5
+ }
+ return qSize
+}
+
// MiddlewareConfig is a config struct to set default middlewares
type MiddlewareConfig struct {
Sessions bool
@@ 91,9 106,10 @@ type Server struct {
autocert bool
csrfSkip []string
- queues []*work.Queue
- queuecan map[string]context.CancelFunc
- queueto time.Duration
+ queues []*work.Queue
+ queuecan map[string]context.CancelFunc
+ queueto time.Duration
+ workerPoolFunc WorkerPoolFunc
}
// Context is the context passed to handlers and middlewares
@@ 187,10 203,9 @@ func (s *Server) HTTPErrorHandler(err error, c echo.Context) {
// Print Debug info
if s.Config.Debug {
- msg := fmt.Sprintf(
- "[ERROR] %v\n%s (%s)\n%v\n%s\n", err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack)
- s.e.Logger.Printf(msg)
- fmt.Fprintf(os.Stderr, msg)
+ fmtStr := "[ERROR] %v\n%s (%s)\n%v\n%s\n"
+ s.e.Logger.Printf(fmtStr, err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack)
+ fmt.Fprintf(os.Stderr, fmtStr, err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack)
}
if !s.Config.Debug && s.Config.EmailAdminErrors && s.Email != nil {
@@ 469,19 484,14 @@ func (s *Server) WithQueues(queues ...*work.Queue) *Server {
if s.queuecan == nil {
s.queuecan = make(map[string]context.CancelFunc)
}
+ if s.workerPoolFunc == nil {
+ s.workerPoolFunc = defaultWorkerPool
+ }
s.queues = append(s.queues, queues...)
for _, queue := range queues {
ctx, cancel := context.WithCancel(context.Background())
s.queuecan[queue.Name()] = cancel
- qSize := int(queue.Size() / 10) // Random, just take 10% of size for worker pool
- // For now this is probably fine. We should work in a way for this to be configurable
- // per queue.
- if qSize < 1 {
- qSize = 1
- } else if qSize > 5 {
- qSize = 5
- }
- queue.Start(ctx, qSize)
+ queue.Start(ctx, s.workerPoolFunc(queue))
}
return s
}
@@ 508,6 518,13 @@ func (s *Server) SetQueueTimeout(d time.Duration) {
s.queueto = d
}
+// SetWorkerPoolFunc will set the function called when determining the size of a
+// task queue's concurrent worker pool
+func (s *Server) SetWorkerPoolFunc(fn WorkerPoolFunc) *Server {
+ s.workerPoolFunc = fn
+ return s
+}
+
// WithEmail sets the email ServiceQueue instance
func (s *Server) WithEmail(svc *email.ServiceQueue) *Server {
s.Email = svc
@@ 612,10 629,11 @@ func (s *Server) Shutdown() {
// New creates a new server instance
func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server {
server := &Server{
- Config: c,
- DB: db,
- e: e,
- queueto: 30 * time.Second,
+ Config: c,
+ DB: db,
+ e: e,
+ queueto: 30 * time.Second,
+ workerPoolFunc: defaultWorkerPool,
}
return server
}