@@ 77,12 77,13 @@ type MiddlewareConfig struct {
// Server is the global echo application server
type Server struct {
- Config *config.Config
- DB *sql.DB
- Session *scs.SessionManager
- Email *email.ServiceQueue
- Storage storage.Service
- GQL *GraphQL
+ Config *config.Config
+ DB *sql.DB
+ Session *scs.SessionManager
+ Email *email.ServiceQueue
+ Storage storage.Service
+ GQL *GraphQL
+ QueueTimeout time.Duration
e *echo.Echo
ai *appInfo
@@ 90,6 91,7 @@ type Server struct {
autocert bool
csrfSkip []string
queues []*work.Queue
+ queuecan map[string]context.CancelFunc
}
// Context is the context passed to handlers and middlewares
@@ 582,9 584,13 @@ func (s *Server) WithMiddleware(middlewares ...echo.MiddlewareFunc) *Server {
// WithQueues add dowork task queues for this server to manage
func (s *Server) WithQueues(queues ...*work.Queue) *Server {
+ if s.queuecan == nil {
+ s.queuecan = make(map[string]context.CancelFunc)
+ }
s.queues = append(s.queues, queues...)
for _, queue := range queues {
- ctx := context.Background()
+ ctx, can := context.WithCancel(context.Background())
+ s.queuecan[queue.Name()] = can
queue.Start(ctx)
}
return s
@@ 687,14 693,27 @@ func (s *Server) Shutdown() {
s.e.Shutdown(ctx)
cancel()
- s.e.Logger.Printf("Terminating work queues...\n")
- work.Join(s.queues...)
+ s.e.Logger.Printf("Terminating work queues (timeout in %v)...", s.QueueTimeout)
+ go work.Join(s.queues...)
+ select {
+ case <-time.After(s.QueueTimeout):
+ // Call the context cancel function after QueueTimeout expires
+ s.e.Logger.Printf("QueueTimeout reached. Terminating queue processing immediately...")
+ for _, can := range s.queuecan {
+ can()
+ }
+ }
s.e.Logger.Printf("Shutdown completed.\n")
}
// New creates a new server instance
func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server {
- server := &Server{Config: c, DB: db, e: e}
+ server := &Server{
+ Config: c,
+ DB: db,
+ QueueTimeout: 30 * time.Second,
+ e: e,
+ }
return server
}