~netlandish/links

02c85f04f6867069e8f9f8296841f4a4a017b39d — Peter Sanchez 6 days ago fd727f9
Adding a simple goroutine to reprocess BaseURL metadata when previous attempts have failed.

This waits 1 day between attempts and allows a maximum of 3 tries before
giving up on parsing that particular URL.

Changelog-changed: Failed metadata collection on base URLs will be
  attempted a maximum of 3 times, at 1-day intervals.
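
The retry gate described in the changelog reduces to a simple predicate over the two new columns, parse_attempts and last_parse_attempt. A minimal sketch of that policy in Go (eligibleForRetry is a hypothetical helper for illustration only; the commit itself expresses the same filter in SQL, in cmd/links/parse.go below):

package main

import (
	"database/sql"
	"time"
)

// eligibleForRetry mirrors the retry policy this commit encodes in SQL:
// at most 3 attempts, spaced at least one day apart, and only for base
// URLs that have not yet been marked public_ready.
func eligibleForRetry(publicReady bool, attempts int, last sql.NullTime, now time.Time) bool {
	if publicReady || attempts >= 3 {
		return false
	}
	// Never attempted, or the last attempt was a day or more ago.
	return !last.Valid || !last.Time.After(now.AddDate(0, 0, -1))
}
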
M api/graph/schema.resolvers.go => api/graph/schema.resolvers.go +4 -4
@@ 509,7 509,7 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput) 

	if input.Override == nil || (*input.Override && input.ParseBaseURL != nil && *input.ParseBaseURL) {
		if visibility == models.OrgLinkVisibilityPublic {
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
		}
	}



@@ 680,7 680,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
		orgLink.BaseURLID = sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)}
		orgLink.URL = *input.URL
		if input.Visibility != nil && string(*input.Visibility) == models.OrgLinkVisibilityPublic {
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
		}
	}



@@ 706,7 706,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
			if err != nil {
				return nil, err
			}
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
		}
		orgLink.Visibility = string(*input.Visibility)
	}


@@ 1022,7 1022,7 @@ func (r *mutationResolver) AddNote(ctx context.Context, input *model.NoteInput) 

	// We process the base link metadata after saving the current note
	if OrgLinkNote.BaseURLID.Valid && OrgLinkNote.Visibility == models.OrgLinkVisibilityPublic {
-		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
	}

	if len(tags) > 0 {

M cmd/links/main.go => cmd/links/main.go +5 -0
@@ 1,6 1,7 @@
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/url"


@@ 381,6 382,10 @@ func run() error {
		e.Reverse("ses-feedback:endpoint"),
	)

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go parseBaseURLs(ctx, srv)
	srv.Run()

	return nil

A cmd/links/parse.go => cmd/links/parse.go +94 -0
@@ 0,0 1,94 @@
package main

import (
	"context"
	"fmt"
	"links/core"
	"links/models"
	"time"

	sq "github.com/Masterminds/squirrel"
	"netlandish.com/x/gobwebs/database"
	"netlandish.com/x/gobwebs/server"
	"netlandish.com/x/gobwebs/timezone"
)

func runParse(ctx context.Context, srv *server.Server) error {
	sig := make(chan int, 5)
	dayAgo := time.Now().AddDate(0, 0, -1).UTC()
	opts := &database.FilterOptions{
		Filter: sq.And{
			sq.Eq{"b.public_ready": false},
			sq.Lt{"b.parse_attempts": 3},
			sq.Gt{"b.counter": 0}, // Should only be > 0 if there are public links
			sq.Or{
				sq.Eq{"b.last_parse_attempt": nil},
				sq.LtOrEq{"b.last_parse_attempt": dayAgo},
			},
		},
		OrderBy: "id", // oldest first
		Limit:   2000, // TODO: Make this random or configurable?
	}

	burls, err := models.GetBaseURLs(ctx, opts)
	if err != nil {
		return err
	}

	parseCnt := 0
	for _, burl := range burls {
		srv.QueueTask("import", core.ParseBaseURLTask(srv, burl, sig))
		parseCnt++
		if parseCnt == 5 {
		loop:
			// Loop until at least one finishes or the context is canceled
			for {
				select {
				case <-ctx.Done():
					srv.Logger().Printf("runParse: Context canceled.")
					return nil
				case <-sig:
					parseCnt--
					break loop
				}
			}
		}
	}
	srv.Logger().Printf("runParse: Finished run")
	return nil
}

// This is a helper function to process base URLs that had errors
// when fetching them.
func parseBaseURLs(ctx context.Context, srv *server.Server) {
	nctx, cancel := context.WithCancel(context.Background())
	nctx = server.ServerContext(nctx, srv)
	nctx = database.Context(nctx, srv.DB)
	nctx = timezone.Context(nctx, "UTC")
	defer cancel()

	srv.Logger().Printf("parseBaseURLs: starting parser engine")
	err := runParse(nctx, srv)
	if err != nil {
		srv.Logger().Printf("!! parseBaseURLs (first run): Error %v", err)
		return
	}

	ticker := time.NewTicker(30 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			cancel()
			fmt.Println("parseBaseURLs: Context canceled, shutting down.")
			return
		case <-ticker.C:
			err = runParse(nctx, srv)
			if err != nil {
				srv.Logger().Printf("!! parseBaseURLs (loop run): Error %v", err)
				return
			}
		}
	}
}
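
A note on the throttling in runParse above: it keeps at most five ParseBaseURLTask jobs in flight by counting queued tasks and blocking on the sig channel until one reports completion. The same bound is often written with a buffered channel acting as a counting semaphore; a rough, illustrative sketch (not what this commit does, with parse standing in for the queued task):

	sem := make(chan struct{}, 5) // capacity = max in-flight parses
	for _, burl := range burls {
		select {
		case sem <- struct{}{}: // reserve a slot; blocks while 5 are running
		case <-ctx.Done():
			return nil
		}
		go func(b *models.BaseURL) {
			defer func() { <-sem }() // release the slot when finished
			parse(b)                 // stand-in for the queued task
		}(burl)
	}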

M cmd/migrations.go => cmd/migrations.go +7 -0
@@ 45,5 45,12 @@ func GetMigrations() []migrate.Migration {
			0,
			links.MigrateFS,
		),
+		migrate.FSFileMigration(
+			"0004_add_parse_fields_baseurls",
+			"migrations/0004_add_parse_fields_baseurls.up.sql",
+			"migrations/0004_add_parse_fields_baseurls.down.sql",
+			0,
+			links.MigrateFS,
+		),
	}
}

M core/import.go => core/import.go +0 -1
@@ 469,7 469,6 @@ func ImportFromPinBoard(ctx context.Context, path string,
			}

			totalCount += listlen
			fmt.Println("Processed", totalCount, "entries...")
-			//time.Sleep(3 * time.Second) // Let the parse url workers catch up
		} else {
			break // No more items to process

M core/processors.go => core/processors.go +4 -1
@@ 84,7 84,7 @@ func ImportBookmarksTask(c echo.Context, origin int, path string,
}

// ParseBaseURLTask task to parse html title tags.
-func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
+func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL, sig chan int) *work.Task {
	return work.NewTask(func(ctx context.Context) error {
		ctx = server.ServerContext(ctx, srv)
		ctx = database.Context(ctx, srv.DB)


@@ 105,6 105,9 @@ func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
				"Failed task ParseBaseURLTask %s after %d attempts: %v",
				task.Metadata["id"], task.Attempts(), task.Result())
		}
+		if sig != nil {
+			sig <- 1
+		}
	})
}
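
The new third argument is optional: fire-and-forget callers pass nil, as the resolver call sites above now do, while a caller that needs to know when a task has finished can pass a buffered channel and block on it. For example (burl as in runParse):

	sig := make(chan int, 1)
	srv.QueueTask("import", core.ParseBaseURLTask(srv, burl, sig))
	<-sig // receives 1 once the task's completion callback runs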


M helpers.go => helpers.go +28 -2
@@ 3,6 3,7 @@ package links
import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"encoding/xml"
	"errors"


@@ 402,16 403,41 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
		return err
	}

+	baseURL.ParseAttempts += 1
+	baseURL.LastParseAttempt = sql.NullTime{Valid: true, Time: time.Now().UTC()}
+
	userAgent := BuildUserAgent(ctx)
	req.Header.Set("User-Agent", userAgent)
	resp, err := client.Do(req)
	if err != nil {
+		parseErr := models.BaseURLParseError{
+			Timestamp:    time.Now().UTC().Unix(),
+			Attempt:      baseURL.ParseAttempts,
+			ErrorMessage: err.Error(),
+		}
+		baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+		serr := baseURL.Store(ctx)
+		if serr != nil {
+			return serr
+		}
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode > 299 {
-		return fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+		err = fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+		parseErr := models.BaseURLParseError{
+			Timestamp:    time.Now().UTC().Unix(),
+			Attempt:      baseURL.ParseAttempts,
+			StatusCode:   resp.StatusCode,
+			ErrorMessage: err.Error(),
+		}
+		baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+		serr := baseURL.Store(ctx)
+		if serr != nil {
+			return serr
+		}
+		return err
	}

	hm := extract(resp.Body)


@@ 423,7 449,7 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
		baseURL.Title = baseURL.Title[:150]
	}
	baseURL.PublicReady = true
-	baseURL.Data = models.BaseURLData{Meta: *hm}
+	baseURL.Data.Meta = *hm
	err = baseURL.Store(ctx)
	if err != nil {
		return err

A migrations/0004_add_parse_fields_baseurls.down.sql => migrations/0004_add_parse_fields_baseurls.down.sql +2 -0
@@ 0,0 1,2 @@
ALTER TABLE base_urls DROP parse_attempts;
ALTER TABLE base_urls DROP last_parse_attempt;

A migrations/0004_add_parse_fields_baseurls.up.sql => migrations/0004_add_parse_fields_baseurls.up.sql +2 -0
@@ 0,0 1,2 @@
ALTER TABLE base_urls ADD parse_attempts INT DEFAULT 0;
ALTER TABLE base_urls ADD last_parse_attempt TIMESTAMPTZ;

M models/base_url.go => models/base_url.go +18 -5
@@ 25,9 25,18 @@ type HTMLMeta struct {
	SiteName    string `json:"site_name"`
}

+// BaseURLParseError is an error record for failed parsing of BaseURL instances
+type BaseURLParseError struct {
+	Attempt      int    `json:"attempt"`
+	Timestamp    int64  `json:"timestamp"`
+	StatusCode   int    `json:"status_code"`
+	ErrorMessage string `json:"error_msg"`
+}
+
// BaseURLData holds metadata for the BaseURL model
type BaseURLData struct {
-	Meta HTMLMeta `json:"meta"`
+	Meta        HTMLMeta            `json:"meta"`
+	ParseErrors []BaseURLParseError `json:"parse_errors,omitempty"`
}

// Value ...


@@ 55,7 64,7 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
		q := opts.GetBuilder(nil)
		rows, err := q.
			Columns("b.id", "b.url", "b.title", "b.counter", "b.data", "b.public_ready", "b.hash",
				"b.created_on", "json_agg(t)::jsonb").
				"b.parse_attempts", "b.last_parse_attempt", "b.created_on", "json_agg(t)::jsonb").
			From("base_urls b").
			LeftJoin("org_links ol ON ol.base_url_id = b.id").
			LeftJoin("tag_links tl ON tl.org_link_id = ol.id").


@@ 77,7 86,8 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
			var url BaseURL
			var tags string
			if err = rows.Scan(&url.ID, &url.URL, &url.Title, &url.Counter,
-				&url.Data, &url.PublicReady, &url.Hash, &url.CreatedOn, &tags); err != nil {
+				&url.Data, &url.PublicReady, &url.Hash, &url.ParseAttempts,
+				&url.LastParseAttempt, &url.CreatedOn, &tags); err != nil {
				return err
			}
			re := regexp.MustCompile(`(,\s)?null,?`)


@@ 116,13 126,14 @@ func (b *BaseURL) Load(ctx context.Context) error {
	tz := timezone.ForContext(ctx)
	err := database.WithTx(ctx, database.TxOptionsRO, func(tx *sql.Tx) error {
		err := sq.
			Select("id", "title", "url", "counter", "data", "public_ready", "hash", "created_on").
			Select("id", "title", "url", "counter", "data", "public_ready", "hash",
				"parse_attempts", "last_parse_attempt", "created_on").
			From("base_urls").
			Where("id = ?", b.ID).
			PlaceholderFormat(sq.Dollar).
			RunWith(tx).
			ScanContext(ctx, &b.ID, &b.Title, &b.URL, &b.Counter, &b.Data,
-				&b.PublicReady, &b.Hash, &b.CreatedOn)
+				&b.PublicReady, &b.Hash, &b.ParseAttempts, &b.LastParseAttempt, &b.CreatedOn)
		if err != nil {
			if err == sql.ErrNoRows {
				return nil


@@ 168,6 179,8 @@ func (b *BaseURL) Store(ctx context.Context) error {
				Set("data", b.Data).
				Set("public_ready", b.PublicReady).
				Set("hash", b.Hash).
				Set("parse_attempts", b.ParseAttempts).
				Set("last_parse_attempt", b.LastParseAttempt).
				Where("id = ?", b.ID).
				Suffix(`RETURNING (updated_on)`).
				PlaceholderFormat(sq.Dollar).

M models/models.go => models/models.go +11 -9
@@ 54,15 54,17 @@ type Organization struct {

// BaseURL ...
type BaseURL struct {
-	ID          int         `db:"id"`
-	Title       string      `db:"title"`
-	URL         string      `db:"url"`
-	Counter     int         `db:"counter"`
-	Data        BaseURLData `db:"data"`
-	PublicReady bool        `db:"public_ready"`
-	Hash        string      `db:"hash" json:"hash"`
-	CreatedOn   time.Time   `db:"created_on"`
-	UpdatedOn   time.Time   `db:"updated_on"`
+	ID               int          `db:"id"`
+	Title            string       `db:"title"`
+	URL              string       `db:"url"`
+	Counter          int          `db:"counter"`
+	Data             BaseURLData  `db:"data"`
+	PublicReady      bool         `db:"public_ready"`
+	Hash             string       `db:"hash" json:"hash"`
+	ParseAttempts    int          `db:"parse_attempts"`
+	LastParseAttempt sql.NullTime `db:"last_parse_attempt"`
+	CreatedOn        time.Time    `db:"created_on"`
+	UpdatedOn        time.Time    `db:"updated_on"`

	Tags []Tag `db:"-" json:"tags"`
}

M models/schema.sql => models/schema.sql +2 -0
@@ 151,6 151,8 @@ CREATE TABLE base_urls (
  hash VARCHAR(128) UNIQUE NOT NULL,
  data JSONB DEFAULT '{}',
  counter INT DEFAULT 0,
+  parse_attempts INT DEFAULT 0,
+  last_parse_attempt TIMESTAMPTZ,
  created_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
