From 8cc27ccc67539b3374e7c9516efc826db5d86ea6 Mon Sep 17 00:00:00 2001 From: Peter Sanchez Date: Fri, 20 Dec 2024 14:43:03 -0600 Subject: [PATCH] Adding ability to configure queue worker pool size --- server/server.go | 58 +++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/server/server.go b/server/server.go index 12d7838..b48642f 100644 --- a/server/server.go +++ b/server/server.go @@ -58,6 +58,21 @@ type Extension interface { Extend(gserver *Server) (*Server, error) } +// WorkerPoolFunc is a function that takes a dowork *Queue instance and +// returns how many concurrent workers should be used when starting the +// queue. +type WorkerPoolFunc func(queue *work.Queue) int + +func defaultWorkerPool(queue *work.Queue) int { + qSize := int(queue.Size() / 10) // Random, just take 10% of size for worker pool + if qSize < 1 { + qSize = 1 + } else if qSize > 5 { + qSize = 5 + } + return qSize +} + // MiddlewareConfig is a config struct to set default middlewares type MiddlewareConfig struct { Sessions bool @@ -91,9 +106,10 @@ type Server struct { autocert bool csrfSkip []string - queues []*work.Queue - queuecan map[string]context.CancelFunc - queueto time.Duration + queues []*work.Queue + queuecan map[string]context.CancelFunc + queueto time.Duration + workerPoolFunc WorkerPoolFunc } // Context is the context passed to handlers and middlewares @@ -187,10 +203,9 @@ func (s *Server) HTTPErrorHandler(err error, c echo.Context) { // Print Debug info if s.Config.Debug { - msg := fmt.Sprintf( - "[ERROR] %v\n%s (%s)\n%v\n%s\n", err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack) - s.e.Logger.Printf(msg) - fmt.Fprintf(os.Stderr, msg) + fmtStr := "[ERROR] %v\n%s (%s)\n%v\n%s\n" + s.e.Logger.Printf(fmtStr, err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack) + fmt.Fprintf(os.Stderr, fmtStr, err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack) } if !s.Config.Debug && s.Config.EmailAdminErrors && s.Email != nil { @@ -469,19 +484,14 @@ func (s *Server) WithQueues(queues ...*work.Queue) *Server { if s.queuecan == nil { s.queuecan = make(map[string]context.CancelFunc) } + if s.workerPoolFunc == nil { + s.workerPoolFunc = defaultWorkerPool + } s.queues = append(s.queues, queues...) for _, queue := range queues { ctx, cancel := context.WithCancel(context.Background()) s.queuecan[queue.Name()] = cancel - qSize := int(queue.Size() / 10) // Random, just take 10% of size for worker pool - // For now this is probably fine. We should work in a way for this to be configurable - // per queue. - if qSize < 1 { - qSize = 1 - } else if qSize > 5 { - qSize = 5 - } - queue.Start(ctx, qSize) + queue.Start(ctx, s.workerPoolFunc(queue)) } return s } @@ -508,6 +518,13 @@ func (s *Server) SetQueueTimeout(d time.Duration) { s.queueto = d } +// SetWorkerPoolFunc will set the function called when determining the size of a +// task queue's concurrent worker pool +func (s *Server) SetWorkerPoolFunc(fn WorkerPoolFunc) *Server { + s.workerPoolFunc = fn + return s +} + // WithEmail sets the email ServiceQueue instance func (s *Server) WithEmail(svc *email.ServiceQueue) *Server { s.Email = svc @@ -612,10 +629,11 @@ func (s *Server) Shutdown() { // New creates a new server instance func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server { server := &Server{ - Config: c, - DB: db, - e: e, - queueto: 30 * time.Second, + Config: c, + DB: db, + e: e, + queueto: 30 * time.Second, + workerPoolFunc: defaultWorkerPool, } return server } -- 2.45.2