From d49bd2131359b16f8d10a31a5388bb265cf8a4b0 Mon Sep 17 00:00:00 2001 From: Peter Sanchez Date: Thu, 25 Aug 2022 21:18:16 -0600 Subject: [PATCH] Adding GQueue dowork queue wrapper to be able to send tasks to specific queues --- email/email.go | 3 +-- server/queue.go | 14 ++++++++++++++ server/server.go | 24 ++++++++++++++++++++---- 3 files changed, 35 insertions(+), 6 deletions(-) create mode 100644 server/queue.go diff --git a/email/email.go b/email/email.go index c33f3ec..96e929f 100644 --- a/email/email.go +++ b/email/email.go @@ -39,8 +39,7 @@ func (s *ServiceQueue) SendMail(msg *carrier.Message) error { // No work queue set, send immediately return s.service.Send(msg) } - work.Enqueue(s.mailTask(msg)) - return nil + return s.queue.Enqueue(s.mailTask(msg)) } func (s *ServiceQueue) mailTask(msg *carrier.Message) *work.Task { diff --git a/server/queue.go b/server/queue.go new file mode 100644 index 0000000..2cfa86c --- /dev/null +++ b/server/queue.go @@ -0,0 +1,14 @@ +package server + +import work "git.sr.ht/~sircmpwn/dowork" + +// GQueue is a dowork queue wrapper that lets you name it +type GQueue struct { + Queue *work.Queue + Name string +} + +// NewQueue returns a new GQueue instance +func NewQueue(name string) *GQueue { + return &GQueue{work.NewQueue(name), name} +} diff --git a/server/server.go b/server/server.go index 6123ea7..ae54d15 100644 --- a/server/server.go +++ b/server/server.go @@ -59,7 +59,7 @@ type Server struct { e *echo.Echo ai *appInfo csrfSkip []string - queues []*work.Queue + queues []*GQueue } // Context is the context passed to handlers and middlewares @@ -321,15 +321,26 @@ func (s *Server) WithMiddleware(middlewares ...echo.MiddlewareFunc) *Server { } // WithQueues add dowork task queues for this server to manage -func (s *Server) WithQueues(queues ...*work.Queue) *Server { +func (s *Server) WithQueues(queues ...*GQueue) *Server { ctx := context.Background() s.queues = append(s.queues, queues...) for _, queue := range queues { - queue.Start(ctx) + queue.Queue.Start(ctx) } return s } +// QueueTask will add a task to a specific queue. Queue should be identified +// by queue name. +func (s *Server) QueueTask(qname string, task *work.Task) error { + for _, q := range s.queues { + if q.Name == qname { + return q.Queue.Enqueue(task) + } + } + return fmt.Errorf("No queue found with name '%s'", qname) +} + // WithEmail sets the email ServiceQueue instance func (s *Server) WithEmail(svc *email.ServiceQueue) *Server { s.Email = svc @@ -400,7 +411,12 @@ func (s *Server) Shutdown() { cancel() s.e.Logger.Printf("Terminating work queues...\n") - work.Join(s.queues...) + var queues []*work.Queue + for _, q := range s.queues { + queues = append(queues, q.Queue) + } + work.Join(queues...) + s.e.Logger.Printf("Shutdown completed.\n") } -- 2.45.2