~netlandish/gobwebs

8cc27ccc67539b3374e7c9516efc826db5d86ea6 — Peter Sanchez 2 days ago b82c014 master
Adding ability to configure queue worker pool size
1 files changed, 38 insertions(+), 20 deletions(-)

M server/server.go
M server/server.go => server/server.go +38 -20
@@ 58,6 58,21 @@ type Extension interface {
	Extend(gserver *Server) (*Server, error)
}

// WorkerPoolFunc is a function that takes a dowork *Queue instance and
// returns how many concurrent workers should be used when starting the
// queue.
type WorkerPoolFunc func(queue *work.Queue) int

func defaultWorkerPool(queue *work.Queue) int {
	qSize := int(queue.Size() / 10) // Random, just take 10% of size for worker pool
	if qSize < 1 {
		qSize = 1
	} else if qSize > 5 {
		qSize = 5
	}
	return qSize
}

// MiddlewareConfig is a config struct to set default middlewares
type MiddlewareConfig struct {
	Sessions bool


@@ 91,9 106,10 @@ type Server struct {
	autocert bool
	csrfSkip []string

	queues   []*work.Queue
	queuecan map[string]context.CancelFunc
	queueto  time.Duration
	queues         []*work.Queue
	queuecan       map[string]context.CancelFunc
	queueto        time.Duration
	workerPoolFunc WorkerPoolFunc
}

// Context is the context passed to handlers and middlewares


@@ 187,10 203,9 @@ func (s *Server) HTTPErrorHandler(err error, c echo.Context) {

		// Print Debug info
		if s.Config.Debug {
			msg := fmt.Sprintf(
				"[ERROR] %v\n%s (%s)\n%v\n%s\n", err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack)
			s.e.Logger.Printf(msg)
			fmt.Fprintf(os.Stderr, msg)
			fmtStr := "[ERROR] %v\n%s (%s)\n%v\n%s\n"
			s.e.Logger.Printf(fmtStr, err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack)
			fmt.Fprintf(os.Stderr, fmtStr, err, c.Request().URL.Path, c.Path(), c.QueryParams(), stack)
		}

		if !s.Config.Debug && s.Config.EmailAdminErrors && s.Email != nil {


@@ 469,19 484,14 @@ func (s *Server) WithQueues(queues ...*work.Queue) *Server {
	if s.queuecan == nil {
		s.queuecan = make(map[string]context.CancelFunc)
	}
	if s.workerPoolFunc == nil {
		s.workerPoolFunc = defaultWorkerPool
	}
	s.queues = append(s.queues, queues...)
	for _, queue := range queues {
		ctx, cancel := context.WithCancel(context.Background())
		s.queuecan[queue.Name()] = cancel
		qSize := int(queue.Size() / 10) // Random, just take 10% of size for worker pool
		// For now this is probably fine. We should work in a way for this to be configurable
		// per queue.
		if qSize < 1 {
			qSize = 1
		} else if qSize > 5 {
			qSize = 5
		}
		queue.Start(ctx, qSize)
		queue.Start(ctx, s.workerPoolFunc(queue))
	}
	return s
}


@@ 508,6 518,13 @@ func (s *Server) SetQueueTimeout(d time.Duration) {
	s.queueto = d
}

// SetWorkerPoolFunc will set the function called when determining the size of a
// task queue's concurrent worker pool
func (s *Server) SetWorkerPoolFunc(fn WorkerPoolFunc) *Server {
	s.workerPoolFunc = fn
	return s
}

// WithEmail sets the email ServiceQueue instance
func (s *Server) WithEmail(svc *email.ServiceQueue) *Server {
	s.Email = svc


@@ 612,10 629,11 @@ func (s *Server) Shutdown() {
// New creates a new server instance
func New(e *echo.Echo, db *sql.DB, c *config.Config) *Server {
	server := &Server{
		Config:  c,
		DB:      db,
		e:       e,
		queueto: 30 * time.Second,
		Config:         c,
		DB:             db,
		e:              e,
		queueto:        30 * time.Second,
		workerPoolFunc: defaultWorkerPool,
	}
	return server
}