@@ 77,21 77,22 @@ type MiddlewareConfig struct {
// Server is the global echo application server
type Server struct {
- Config *config.Config
- DB *sql.DB
- Session *scs.SessionManager
- Email *email.ServiceQueue
- Storage storage.Service
- GQL *GraphQL
- QueueTimeout time.Duration
+ Config *config.Config
+ DB *sql.DB
+ Session *scs.SessionManager
+ Email *email.ServiceQueue
+ Storage storage.Service
+ GQL *GraphQL
e *echo.Echo
ai *appInfo
certman autocert.Manager
autocert bool
csrfSkip []string
+
queues []*work.Queue
queuecan map[string]context.CancelFunc
+ queueto time.Duration
}
// Context is the context passed to handlers and middlewares
@@ 589,8 590,8 @@ func (s *Server) WithQueues(queues ...*work.Queue) *Server {
}
s.queues = append(s.queues, queues...)
for _, queue := range queues {
- ctx, can := context.WithCancel(context.Background())
- s.queuecan[queue.Name()] = can
+ ctx, cancel := context.WithCancel(context.Background())
+ s.queuecan[queue.Name()] = cancel
queue.Start(ctx)
}
return s
@@ 612,6 613,12 @@ func (s *Server) WorkQueues() []*work.Queue {
return s.queues
}
+// SetQueueTimeout will set the timeout duration to use when waiting for
+// queues to shutdown
+func (s *Server) SetQueueTimeout(d time.Duration) {
+ s.queueto = d
+}
+
// WithEmail sets the email ServiceQueue instance
func (s *Server) WithEmail(svc *email.ServiceQueue) *Server {
s.Email = svc
@@ 693,15 700,21 @@ func (s *Server) Shutdown() {
s.e.Shutdown(ctx)
cancel()
- s.e.Logger.Printf("Terminating work queues (timeout in %v)...", s.QueueTimeout)
- go work.Join(s.queues...)
+ s.e.Logger.Printf("Terminating work queues (timeout in %v)...", s.queueto)
+ shutdown := make(chan int)
+ go func() {
+ work.Join(s.queues...)
+ shutdown <- 1
+ }()
select {
- case <-time.After(s.QueueTimeout):
+ case <-time.After(s.queueto):
// Call the context cancel function after QueueTimeout expires
s.e.Logger.Printf("QueueTimeout reached. Terminating queue processing immediately...")
- for _, can := range s.queuecan {
- can()
+ for _, cancel := range s.queuecan {
+ cancel()
}
+ case <-shutdown:
+ break
}
s.e.Logger.Printf("Shutdown completed.\n")
@@ 710,10 723,10 @@ func (s *Server) Shutdown() {
// New creates a new server instance
func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server {
server := &Server{
- Config: c,
- DB: db,
- QueueTimeout: 30 * time.Second,
- e: e,
+ Config: c,
+ DB: db,
+ e: e,
+ queueto: 30 * time.Second,
}
return server
}