From 32886cd5f99e48076eef28786354d8f8eff0c0bf Mon Sep 17 00:00:00 2001 From: Peter Sanchez Date: Thu, 27 Jul 2023 06:44:18 -0600 Subject: [PATCH] Let's not hold up shutdown for the queue timeout if it's not necessary --- server/server.go | 49 ++++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/server/server.go b/server/server.go index c00ffc5..a8bac4a 100644 --- a/server/server.go +++ b/server/server.go @@ -77,21 +77,22 @@ type MiddlewareConfig struct { // Server is the global echo application server type Server struct { - Config *config.Config - DB *sql.DB - Session *scs.SessionManager - Email *email.ServiceQueue - Storage storage.Service - GQL *GraphQL - QueueTimeout time.Duration + Config *config.Config + DB *sql.DB + Session *scs.SessionManager + Email *email.ServiceQueue + Storage storage.Service + GQL *GraphQL e *echo.Echo ai *appInfo certman autocert.Manager autocert bool csrfSkip []string + queues []*work.Queue queuecan map[string]context.CancelFunc + queueto time.Duration } // Context is the context passed to handlers and middlewares @@ -589,8 +590,8 @@ func (s *Server) WithQueues(queues ...*work.Queue) *Server { } s.queues = append(s.queues, queues...) for _, queue := range queues { - ctx, can := context.WithCancel(context.Background()) - s.queuecan[queue.Name()] = can + ctx, cancel := context.WithCancel(context.Background()) + s.queuecan[queue.Name()] = cancel queue.Start(ctx) } return s @@ -612,6 +613,12 @@ func (s *Server) WorkQueues() []*work.Queue { return s.queues } +// SetQueueTimeout will set the timeout duration to use when waiting for +// queues to shutdown +func (s *Server) SetQueueTimeout(d time.Duration) { + s.queueto = d +} + // WithEmail sets the email ServiceQueue instance func (s *Server) WithEmail(svc *email.ServiceQueue) *Server { s.Email = svc @@ -693,15 +700,21 @@ func (s *Server) Shutdown() { s.e.Shutdown(ctx) cancel() - s.e.Logger.Printf("Terminating work queues (timeout in %v)...", s.QueueTimeout) - go work.Join(s.queues...) + s.e.Logger.Printf("Terminating work queues (timeout in %v)...", s.queueto) + shutdown := make(chan int) + go func() { + work.Join(s.queues...) + shutdown <- 1 + }() select { - case <-time.After(s.QueueTimeout): + case <-time.After(s.queueto): // Call the context cancel function after QueueTimeout expires s.e.Logger.Printf("QueueTimeout reached. Terminating queue processing immediately...") - for _, can := range s.queuecan { - can() + for _, cancel := range s.queuecan { + cancel() } + case <-shutdown: + break } s.e.Logger.Printf("Shutdown completed.\n") @@ -710,10 +723,10 @@ func (s *Server) Shutdown() { // New creates a new server instance func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server { server := &Server{ - Config: c, - DB: db, - QueueTimeout: 30 * time.Second, - e: e, + Config: c, + DB: db, + e: e, + queueto: 30 * time.Second, } return server } -- 2.45.2