From 760e6e4e2a8592289a6cc576a5ba95bcd3b23aca Mon Sep 17 00:00:00 2001 From: Peter Sanchez Date: Wed, 26 Jul 2023 17:31:59 -0600 Subject: [PATCH] Adding timeout option for queue termination so we don't end up waiting forever. --- server/server.go | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/server/server.go b/server/server.go index 39a8cae..c00ffc5 100644 --- a/server/server.go +++ b/server/server.go @@ -77,12 +77,13 @@ type MiddlewareConfig struct { // Server is the global echo application server type Server struct { - Config *config.Config - DB *sql.DB - Session *scs.SessionManager - Email *email.ServiceQueue - Storage storage.Service - GQL *GraphQL + Config *config.Config + DB *sql.DB + Session *scs.SessionManager + Email *email.ServiceQueue + Storage storage.Service + GQL *GraphQL + QueueTimeout time.Duration e *echo.Echo ai *appInfo @@ -90,6 +91,7 @@ type Server struct { autocert bool csrfSkip []string queues []*work.Queue + queuecan map[string]context.CancelFunc } // Context is the context passed to handlers and middlewares @@ -582,9 +584,13 @@ func (s *Server) WithMiddleware(middlewares ...echo.MiddlewareFunc) *Server { // WithQueues add dowork task queues for this server to manage func (s *Server) WithQueues(queues ...*work.Queue) *Server { + if s.queuecan == nil { + s.queuecan = make(map[string]context.CancelFunc) + } s.queues = append(s.queues, queues...) for _, queue := range queues { - ctx := context.Background() + ctx, can := context.WithCancel(context.Background()) + s.queuecan[queue.Name()] = can queue.Start(ctx) } return s @@ -687,14 +693,27 @@ func (s *Server) Shutdown() { s.e.Shutdown(ctx) cancel() - s.e.Logger.Printf("Terminating work queues...\n") - work.Join(s.queues...) + s.e.Logger.Printf("Terminating work queues (timeout in %v)...", s.QueueTimeout) + go work.Join(s.queues...) + select { + case <-time.After(s.QueueTimeout): + // Call the context cancel function after QueueTimeout expires + s.e.Logger.Printf("QueueTimeout reached. Terminating queue processing immediately...") + for _, can := range s.queuecan { + can() + } + } s.e.Logger.Printf("Shutdown completed.\n") } // New creates a new server instance func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server { - server := &Server{Config: c, DB: db, e: e} + server := &Server{ + Config: c, + DB: db, + QueueTimeout: 30 * time.Second, + e: e, + } return server } -- 2.45.2