~netlandish/gobwebs

d49bd2131359b16f8d10a31a5388bb265cf8a4b0 — Peter Sanchez 2 years ago cbf23a5
Adding GQueue dowork queue wrapper to be able to send tasks to specific queues
3 files changed, 35 insertions(+), 6 deletions(-)

M email/email.go
A server/queue.go
M server/server.go
M email/email.go => email/email.go +1 -2
@@ 39,8 39,7 @@ func (s *ServiceQueue) SendMail(msg *carrier.Message) error {
		// No work queue set, send immediately
		return s.service.Send(msg)
	}
	work.Enqueue(s.mailTask(msg))
	return nil
	return s.queue.Enqueue(s.mailTask(msg))
}

func (s *ServiceQueue) mailTask(msg *carrier.Message) *work.Task {

A server/queue.go => server/queue.go +14 -0
@@ 0,0 1,14 @@
package server

import work "git.sr.ht/~sircmpwn/dowork"

// GQueue is a dowork queue wrapper that lets you name it
type GQueue struct {
	Queue *work.Queue
	Name  string
}

// NewQueue returns a new GQueue instance
func NewQueue(name string) *GQueue {
	return &GQueue{work.NewQueue(name), name}
}

M server/server.go => server/server.go +20 -4
@@ 59,7 59,7 @@ type Server struct {
	e        *echo.Echo
	ai       *appInfo
	csrfSkip []string
	queues   []*work.Queue
	queues   []*GQueue
}

// Context is the context passed to handlers and middlewares


@@ 321,15 321,26 @@ func (s *Server) WithMiddleware(middlewares ...echo.MiddlewareFunc) *Server {
}

// WithQueues add dowork task queues for this server to manage
func (s *Server) WithQueues(queues ...*work.Queue) *Server {
func (s *Server) WithQueues(queues ...*GQueue) *Server {
	ctx := context.Background()
	s.queues = append(s.queues, queues...)
	for _, queue := range queues {
		queue.Start(ctx)
		queue.Queue.Start(ctx)
	}
	return s
}

// QueueTask will add a task to a specific queue. Queue should be identified
// by queue name.
func (s *Server) QueueTask(qname string, task *work.Task) error {
	for _, q := range s.queues {
		if q.Name == qname {
			return q.Queue.Enqueue(task)
		}
	}
	return fmt.Errorf("No queue found with name '%s'", qname)
}

// WithEmail sets the email ServiceQueue instance
func (s *Server) WithEmail(svc *email.ServiceQueue) *Server {
	s.Email = svc


@@ 400,7 411,12 @@ func (s *Server) Shutdown() {
	cancel()

	s.e.Logger.Printf("Terminating work queues...\n")
	work.Join(s.queues...)
	var queues []*work.Queue
	for _, q := range s.queues {
		queues = append(queues, q.Queue)
	}
	work.Join(queues...)

	s.e.Logger.Printf("Shutdown completed.\n")
}