From 02c85f04f6867069e8f9f8296841f4a4a017b39d Mon Sep 17 00:00:00 2001 From: Peter Sanchez Date: Thu, 6 Mar 2025 19:27:03 -0600 Subject: [PATCH] Adding a simple go routine to process BaseURL metadata if they have previously failed. This will allow 1 day between attempts and a maximum of 3 tries before giving up on parsing that particular URL. Changelog-changed: Failed metadata collection on base url's will be attempted a max of 3 times in 1 day intervals. --- api/graph/schema.resolvers.go | 8 +- cmd/links/main.go | 5 + cmd/links/parse.go | 94 +++++++++++++++++++ cmd/migrations.go | 7 ++ core/import.go | 1 - core/processors.go | 5 +- helpers.go | 30 +++++- .../0004_add_parse_fields_baseurls.down.sql | 2 + .../0004_add_parse_fields_baseurls.up.sql | 2 + models/base_url.go | 23 ++++- models/models.go | 20 ++-- models/schema.sql | 2 + 12 files changed, 177 insertions(+), 22 deletions(-) create mode 100644 cmd/links/parse.go create mode 100644 migrations/0004_add_parse_fields_baseurls.down.sql create mode 100644 migrations/0004_add_parse_fields_baseurls.up.sql diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go index 7bdc796..c708a12 100644 --- a/api/graph/schema.resolvers.go +++ b/api/graph/schema.resolvers.go @@ -509,7 +509,7 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput) if input.Override == nil || (*input.Override && input.ParseBaseURL != nil && *input.ParseBaseURL) { if visibility == models.OrgLinkVisibilityPublic { - srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil)) } } @@ -680,7 +680,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi orgLink.BaseURLID = sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)} orgLink.URL = *input.URL if input.Visibility != nil && string(*input.Visibility) == models.OrgLinkVisibilityPublic { - srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil)) } } @@ -706,7 +706,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi if err != nil { return nil, err } - srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil)) } orgLink.Visibility = string(*input.Visibility) } @@ -1022,7 +1022,7 @@ func (r *mutationResolver) AddNote(ctx context.Context, input *model.NoteInput) // We process the based link metadata after saving the current note if OrgLinkNote.BaseURLID.Valid && OrgLinkNote.Visibility == models.OrgLinkVisibilityPublic { - srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil)) } if len(tags) > 0 { diff --git a/cmd/links/main.go b/cmd/links/main.go index 2d70c83..4372a40 100644 --- a/cmd/links/main.go +++ b/cmd/links/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "net/http" "net/url" @@ -381,6 +382,10 @@ func run() error { e.Reverse("ses-feedback:endpoint"), ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go parseBaseURLs(ctx, srv) srv.Run() return nil diff --git a/cmd/links/parse.go b/cmd/links/parse.go new file mode 100644 index 0000000..0cbe8a4 --- /dev/null +++ b/cmd/links/parse.go @@ -0,0 +1,94 @@ +package main + +import ( + "context" + "fmt" + "links/core" + "links/models" + "time" + + sq "github.com/Masterminds/squirrel" + "netlandish.com/x/gobwebs/database" + "netlandish.com/x/gobwebs/server" + "netlandish.com/x/gobwebs/timezone" +) + +func runParse(ctx context.Context, srv *server.Server) error { + sig := make(chan int, 5) + dayAgo := time.Now().AddDate(0, 0, -1).UTC() + opts := &database.FilterOptions{ + Filter: sq.And{ + sq.Eq{"b.public_ready": false}, + sq.Lt{"b.parse_attempts": 3}, + sq.Gt{"b.counter": 0}, // Should only be > 0 if there are public links + sq.Or{ + sq.Eq{"b.last_parse_attempt": nil}, + sq.LtOrEq{"b.last_parse_attempt": dayAgo}, + }, + }, + OrderBy: "id", // oldest first + Limit: 2000, // TODO: Make this random or configurable? + } + + burls, err := models.GetBaseURLs(ctx, opts) + if err != nil { + return err + } + + parseCnt := 0 + for _, burl := range burls { + srv.QueueTask("import", core.ParseBaseURLTask(srv, burl, sig)) + parseCnt++ + if parseCnt == 5 { + loop: + // Loop until at least one finishes or the context is canceled + for { + select { + case <-ctx.Done(): + srv.Logger().Printf("runParse: Context canceled.") + return nil + case <-sig: + parseCnt-- + break loop + } + } + } + } + srv.Logger().Printf("runParse: Finished run") + return nil +} + +// This is a helper function to process base URL's that had errors +// when fetching them. +func parseBaseURLs(ctx context.Context, srv *server.Server) { + nctx, cancel := context.WithCancel(context.Background()) + nctx = server.ServerContext(nctx, srv) + nctx = database.Context(nctx, srv.DB) + nctx = timezone.Context(nctx, "UTC") + defer cancel() + + srv.Logger().Printf("parseBaseURLs: starting parser engine") + err := runParse(nctx, srv) + if err != nil { + srv.Logger().Printf("!! parseBaseURLs (first run): Error %v", err) + return + } + + ticker := time.NewTicker(30 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + cancel() + fmt.Println("parseBaseURLs: Context canceled, shutting down.") + return + case <-ticker.C: + err = runParse(nctx, srv) + if err != nil { + srv.Logger().Printf("!! parseBaseURLs (loop run): Error %v", err) + return + } + } + } +} diff --git a/cmd/migrations.go b/cmd/migrations.go index 4552650..fa2c179 100644 --- a/cmd/migrations.go +++ b/cmd/migrations.go @@ -45,5 +45,12 @@ func GetMigrations() []migrate.Migration { 0, links.MigrateFS, ), + migrate.FSFileMigration( + "0004_add_parse_fields_baseurls", + "migrations/0004_add_parse_fields_baseurls.up.sql", + "migrations/0004_add_parse_fields_baseurls.down.sql", + 0, + links.MigrateFS, + ), } } diff --git a/core/import.go b/core/import.go index 1797d51..759c5b0 100644 --- a/core/import.go +++ b/core/import.go @@ -469,7 +469,6 @@ func ImportFromPinBoard(ctx context.Context, path string, } totalCount += listlen - fmt.Println("Processed", totalCount, "entries...") //time.Sleep(3 * time.Second) // Let the parse url workers catch up } else { break // No more items to process diff --git a/core/processors.go b/core/processors.go index 5720d32..9c38218 100644 --- a/core/processors.go +++ b/core/processors.go @@ -84,7 +84,7 @@ func ImportBookmarksTask(c echo.Context, origin int, path string, } // ParseBaseURLTask task to parse html title tags. -func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task { +func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL, sig chan int) *work.Task { return work.NewTask(func(ctx context.Context) error { ctx = server.ServerContext(ctx, srv) ctx = database.Context(ctx, srv.DB) @@ -105,6 +105,9 @@ func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task { "Failed task ParseBaseURLTask %s after %d attempts: %v", task.Metadata["id"], task.Attempts(), task.Result()) } + if sig != nil { + sig <- 1 + } }) } diff --git a/helpers.go b/helpers.go index a5a8d47..b82b692 100644 --- a/helpers.go +++ b/helpers.go @@ -3,6 +3,7 @@ package links import ( "bytes" "context" + "database/sql" "encoding/json" "encoding/xml" "errors" @@ -402,16 +403,41 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error { return err } + baseURL.ParseAttempts += 1 + baseURL.LastParseAttempt = sql.NullTime{Valid: true, Time: time.Now().UTC()} + userAgent := BuildUserAgent(ctx) req.Header.Set("User-Agent", userAgent) resp, err := client.Do(req) if err != nil { + parseErr := models.BaseURLParseError{ + Timestamp: time.Now().UTC().Unix(), + Attempt: baseURL.ParseAttempts, + ErrorMessage: err.Error(), + } + baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr) + serr := baseURL.Store(ctx) + if serr != nil { + return serr + } return err } defer resp.Body.Close() if resp.StatusCode > 299 { - return fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode) + err = fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode) + parseErr := models.BaseURLParseError{ + Timestamp: time.Now().UTC().Unix(), + Attempt: baseURL.ParseAttempts, + StatusCode: resp.StatusCode, + ErrorMessage: err.Error(), + } + baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr) + serr := baseURL.Store(ctx) + if serr != nil { + return serr + } + return err } hm := extract(resp.Body) @@ -423,7 +449,7 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error { baseURL.Title = baseURL.Title[:150] } baseURL.PublicReady = true - baseURL.Data = models.BaseURLData{Meta: *hm} + baseURL.Data.Meta = *hm err = baseURL.Store(ctx) if err != nil { return err diff --git a/migrations/0004_add_parse_fields_baseurls.down.sql b/migrations/0004_add_parse_fields_baseurls.down.sql new file mode 100644 index 0000000..07fe6cb --- /dev/null +++ b/migrations/0004_add_parse_fields_baseurls.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE base_urls DROP parse_attempts; +ALTER TABLE base_urls DROP last_parse_attempt; diff --git a/migrations/0004_add_parse_fields_baseurls.up.sql b/migrations/0004_add_parse_fields_baseurls.up.sql new file mode 100644 index 0000000..396c803 --- /dev/null +++ b/migrations/0004_add_parse_fields_baseurls.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE base_urls ADD parse_attempts INT DEFAULT 0; +ALTER TABLE base_urls ADD last_parse_attempt TIMESTAMPTZ; diff --git a/models/base_url.go b/models/base_url.go index b8a96ef..1442e05 100644 --- a/models/base_url.go +++ b/models/base_url.go @@ -25,9 +25,18 @@ type HTMLMeta struct { SiteName string `json:"site_name"` } +// BaseURLParseError is an error record for failed parsing of BaseURL instances +type BaseURLParseError struct { + Attempt int `json:"attempt"` + Timestamp int64 `json:"timestamp"` + StatusCode int `json:"status_cude"` + ErrorMessage string `json:"error_msg"` +} + // BaseURLData holds metadata for the BaseURL model type BaseURLData struct { - Meta HTMLMeta `json:"meta"` + Meta HTMLMeta `json:"meta"` + ParseErrors []BaseURLParseError `json:"parse_errors,omitempty"` } // Value ... @@ -55,7 +64,7 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL, q := opts.GetBuilder(nil) rows, err := q. Columns("b.id", "b.url", "b.title", "b.counter", "b.data", "b.public_ready", "b.hash", - "b.created_on", "json_agg(t)::jsonb"). + "b.parse_attempts", "b.last_parse_attempt", "b.created_on", "json_agg(t)::jsonb"). From("base_urls b"). LeftJoin("org_links ol ON ol.base_url_id = b.id"). LeftJoin("tag_links tl ON tl.org_link_id = ol.id"). @@ -77,7 +86,8 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL, var url BaseURL var tags string if err = rows.Scan(&url.ID, &url.URL, &url.Title, &url.Counter, - &url.Data, &url.PublicReady, &url.Hash, &url.CreatedOn, &tags); err != nil { + &url.Data, &url.PublicReady, &url.Hash, &url.ParseAttempts, + &url.LastParseAttempt, &url.CreatedOn, &tags); err != nil { return err } re := regexp.MustCompile(`(,\s)?null,?`) @@ -116,13 +126,14 @@ func (b *BaseURL) Load(ctx context.Context) error { tz := timezone.ForContext(ctx) err := database.WithTx(ctx, database.TxOptionsRO, func(tx *sql.Tx) error { err := sq. - Select("id", "title", "url", "counter", "data", "public_ready", "hash", "created_on"). + Select("id", "title", "url", "counter", "data", "public_ready", "hash", + "parse_attempts", "last_parse_attempt", "created_on"). From("base_urls"). Where("id = ?", b.ID). PlaceholderFormat(sq.Dollar). RunWith(tx). ScanContext(ctx, &b.ID, &b.Title, &b.URL, &b.Counter, &b.Data, - &b.PublicReady, &b.Hash, &b.CreatedOn) + &b.PublicReady, &b.Hash, &b.ParseAttempts, &b.LastParseAttempt, &b.CreatedOn) if err != nil { if err == sql.ErrNoRows { return nil @@ -168,6 +179,8 @@ func (b *BaseURL) Store(ctx context.Context) error { Set("data", b.Data). Set("public_ready", b.PublicReady). Set("hash", b.Hash). + Set("parse_attempts", b.ParseAttempts). + Set("last_parse_attempt", b.LastParseAttempt). Where("id = ?", b.ID). Suffix(`RETURNING (updated_on)`). PlaceholderFormat(sq.Dollar). diff --git a/models/models.go b/models/models.go index 6728306..0a9e61a 100644 --- a/models/models.go +++ b/models/models.go @@ -54,15 +54,17 @@ type Organization struct { // BaseURL ... type BaseURL struct { - ID int `db:"id"` - Title string `db:"title"` - URL string `db:"url"` - Counter int `db:"counter"` - Data BaseURLData `db:"data"` - PublicReady bool `db:"public_ready"` - Hash string `db:"hash" json:"hash"` - CreatedOn time.Time `db:"created_on"` - UpdatedOn time.Time `db:"updated_on"` + ID int `db:"id"` + Title string `db:"title"` + URL string `db:"url"` + Counter int `db:"counter"` + Data BaseURLData `db:"data"` + PublicReady bool `db:"public_ready"` + Hash string `db:"hash" json:"hash"` + ParseAttempts int `db:"parse_attempts"` + LastParseAttempt sql.NullTime `db:"last_parse_attempt"` + CreatedOn time.Time `db:"created_on"` + UpdatedOn time.Time `db:"updated_on"` Tags []Tag `db:"-" json:"tags"` } diff --git a/models/schema.sql b/models/schema.sql index 16028d5..44d226c 100644 --- a/models/schema.sql +++ b/models/schema.sql @@ -151,6 +151,8 @@ CREATE TABLE base_urls ( hash VARCHAR(128) UNIQUE NOT NULL, data JSONB DEFAULT '{}', counter INT DEFAULT 0, + parse_attempts INT DEFAULT 0, + last_parse_attempt TIMESTAMPTZ, created_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP ); -- 2.45.3