Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/submitqueue/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//extension/counter/mysql",
"//extension/messagequeue",
"//extension/messagequeue/mysql",
"//submitqueue/core/changeset",
"//submitqueue/core/consumer",
"//submitqueue/entity",
"//submitqueue/extension/buildrunner",
Expand Down
17 changes: 10 additions & 7 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
extqueue "github.com/uber/submitqueue/extension/messagequeue"
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
"github.com/uber/submitqueue/submitqueue/core/changeset"
"github.com/uber/submitqueue/submitqueue/core/consumer"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/buildrunner"
Expand Down Expand Up @@ -230,7 +231,7 @@ func run() error {
// back to a baseline profile for queues without an explicit entry. This is
// the single place queue topology is known; the extension packages stay
// queue-agnostic.
queues, err := newQueueRegistry(logger, scope)
queues, err := newQueueRegistry(logger, scope, changeset.New(store.GetRequestStore(), store.GetChangeStore()))
if err != nil {
return fmt.Errorf("failed to build queue registry: %w", err)
}
Expand Down Expand Up @@ -796,7 +797,7 @@ func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) {
// conflict analyzer. Queues without an explicit profile fall back to the
// baseline. This is the one place queue topology lives; extension packages stay
// queue-agnostic.
func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, error) {
func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (queueRegistry, error) {
mc, err := newMergeChecker(logger, scope)
if err != nil {
return queueRegistry{}, fmt.Errorf("failed to create merge checker: %w", err)
Expand Down Expand Up @@ -833,7 +834,8 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
changeProvider: cp,
pusher: psh,
buildRunner: buildfake.New(),
scorer: scorerfake.New(heuristic.New(
scorer: scorerfake.New(resolver, heuristic.New(
resolver,
[]heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.5}},
batchLines, scope.SubScope("scorer.default"),
)),
Expand All @@ -845,7 +847,8 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
// test-queue: bucketed heuristic scorer; conservative (serialized) conflicts
// inherited from the baseline.
testQueue := base
testQueue.scorer = scorerfake.New(heuristic.New(
testQueue.scorer = scorerfake.New(resolver, heuristic.New(
resolver,
[]heuristic.Bucket{
{Min: 0, Max: 1, Score: 0.95},
{Min: 2, Max: 5, Score: 0.80},
Expand All @@ -858,10 +861,10 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
// e2e-test-queue: composite scorer; no conflicts (maximum parallelism).
e2eQueue := base
e2eQueue.analyzer = conflictfake.New(none.New(), nil)
e2eQueue.scorer = scorerfake.New(composite.New(
e2eQueue.scorer = scorerfake.New(resolver, composite.New(
map[string]scorer.Scorer{
"size": heuristic.New([]heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.8}}, batchLines, scope),
"flat": heuristic.New([]heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.6}}, batchLines, scope),
"size": heuristic.New(resolver, []heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.8}}, batchLines, scope),
"flat": heuristic.New(resolver, []heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.6}}, batchLines, scope),
},
composite.Avg, scope.SubScope("scorer.e2e-test-queue"),
))
Expand Down
8 changes: 4 additions & 4 deletions submitqueue/extension/scorer/composite/scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ func New(scorers map[string]scorer.Scorer, reduce ReduceFunc, scope tally.Scope)
}
}

// Score evaluates all child scorers and combines their results using the reduce function.
// If any child scorer returns an error, that error is returned immediately.
func (c *compositeScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) {
// Score evaluates all child scorers on the batch and combines their results using the
// reduce function. If any child scorer returns an error, that error is returned immediately.
func (c *compositeScorer) Score(ctx context.Context, batch entity.Batch) (ret float64, retErr error) {
op := metrics.Begin(c.scope, "score")
defer func() { op.Complete(retErr) }()

scores := make(map[string]float64, len(c.scorers))
for name, s := range c.scorers {
score, err := s.Score(ctx, changes)
score, err := s.Score(ctx, batch)
if err != nil {
return 0, err
}
Expand Down
10 changes: 5 additions & 5 deletions submitqueue/extension/scorer/composite/scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ type fixedScorer struct {
score float64
}

func (f *fixedScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) {
func (f *fixedScorer) Score(_ context.Context, _ entity.Batch) (float64, error) {
return f.score, nil
}

// errorScorer always returns an error.
type errorScorer struct{}

func (e *errorScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) {
func (e *errorScorer) Score(_ context.Context, _ entity.Batch) (float64, error) {
return 0, fmt.Errorf("scorer failed")
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestScorer_Score(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := New(tt.scorers, tt.reduce, tally.NoopScope)
got, err := s.Score(context.Background(), entity.BatchChanges{})
got, err := s.Score(context.Background(), entity.Batch{})
require.NoError(t, err)
assert.InDelta(t, tt.want, got, 1e-9)
})
Expand All @@ -111,7 +111,7 @@ func TestScorer_Score_ChildError(t *testing.T) {
"error": &errorScorer{},
"files": &fixedScorer{0.9},
}, Min, tally.NoopScope)
_, err := s.Score(context.Background(), entity.BatchChanges{})
_, err := s.Score(context.Background(), entity.Batch{})
require.Error(t, err)
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestReduceFunc_ReceivesNames(t *testing.T) {
"files": &fixedScorer{0.9},
"deps": &fixedScorer{0.95},
}, custom, tally.NoopScope)
got, err := s.Score(context.Background(), entity.BatchChanges{})
got, err := s.Score(context.Background(), entity.Batch{})
require.NoError(t, err)
assert.Equal(t, 0.9, got)
assert.ElementsMatch(t, []string{"files", "deps"}, receivedNames)
Expand Down
3 changes: 3 additions & 0 deletions submitqueue/extension/scorer/fake/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/uber/submitqueue/submitqueue/extension/scorer/fake",
visibility = ["//visibility:public"],
deps = [
"//submitqueue/core/changeset",
"//submitqueue/core/fakemarker",
"//submitqueue/entity",
"//submitqueue/extension/scorer",
Expand All @@ -17,6 +18,8 @@ go_test(
srcs = ["fake_test.go"],
embed = [":fake"],
deps = [
"//submitqueue/core/changeset",
"//submitqueue/core/changeset/fake",
"//submitqueue/entity",
"//submitqueue/extension/scorer",
"//submitqueue/extension/scorer/heuristic",
Expand Down
21 changes: 14 additions & 7 deletions submitqueue/extension/scorer/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"fmt"

"github.com/uber/submitqueue/submitqueue/core/changeset"
"github.com/uber/submitqueue/submitqueue/core/fakemarker"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/scorer"
Expand All @@ -36,25 +37,31 @@ import (
const tokenError = "score-error"

// scorerFake decorates a delegate Scorer, injecting an error when a change URI
// carries the failure marker.
// carries the failure marker. It resolves the batch itself to inspect URIs.
type scorerFake struct {
resolver changeset.Resolver
delegate scorer.Scorer
}

// New returns a scorer.Scorer that delegates to the given scorer but returns an
// error when a change URI carries the "sq-fake=score-error" marker. The delegate
// is the existing scorer implementation to wrap (e.g. heuristic or composite).
func New(delegate scorer.Scorer) scorer.Scorer {
return scorerFake{delegate: delegate}
// error when a change URI carries the "sq-fake=score-error" marker. The resolver
// resolves the batch's changes so the marker can be inspected; the delegate is the
// existing scorer implementation to wrap (e.g. heuristic or composite).
func New(resolver changeset.Resolver, delegate scorer.Scorer) scorer.Scorer {
return scorerFake{resolver: resolver, delegate: delegate}
}

// Score returns an error when a change URI carries the failure marker; otherwise
// it delegates to the wrapped scorer.
func (s scorerFake) Score(ctx context.Context, changes entity.BatchChanges) (float64, error) {
func (s scorerFake) Score(ctx context.Context, batch entity.Batch) (float64, error) {
changes, err := s.resolver.DetailedForBatch(ctx, batch)
if err != nil {
return 0, err
}
if markerToken(changes) == tokenError {
return 0, fmt.Errorf("fake: marked score error")
}
return s.delegate.Score(ctx, changes)
return s.delegate.Score(ctx, batch)
}

// markerToken returns the marker token embedded in the first change URI that
Expand Down
40 changes: 24 additions & 16 deletions submitqueue/extension/scorer/fake/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,50 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/submitqueue/core/changeset"
changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/scorer"
"github.com/uber/submitqueue/submitqueue/extension/scorer/heuristic"
)

const batchID = "q/batch/1"

func TestNew_ImplementsInterface(t *testing.T) {
var _ scorer.Scorer = New(nil)
var _ scorer.Scorer = New(nil, nil)
}

// resolverFor returns a changeset resolver seeded so that batchID's detailed
// changes carry the given URIs.
func resolverFor(uris ...string) changeset.Resolver {
changes := make([]entity.ChangeInfo, 0, len(uris))
for _, u := range uris {
changes = append(changes, entity.ChangeInfo{URI: u})
}
return changesetfake.New().SetDetailed(batchID, entity.BatchChanges{BatchID: batchID, Queue: "q", Changes: changes})
}

// delegate returns a heuristic scorer that scores every batch at want.
func delegate(t *testing.T, want float64) scorer.Scorer {
t.Helper()
// delegate returns a heuristic scorer (backed by resolver) that scores every batch at want.
func delegate(resolver changeset.Resolver, want float64) scorer.Scorer {
return heuristic.New(
resolver,
[]heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: want}},
func(_ context.Context, c entity.BatchChanges) (int, error) { return len(c.Changes), nil },
tally.NoopScope,
)
}

func batch(uris ...string) entity.BatchChanges {
changes := make([]entity.ChangeInfo, 0, len(uris))
for _, u := range uris {
changes = append(changes, entity.ChangeInfo{URI: u})
}
return entity.BatchChanges{BatchID: "q/batch/1", Queue: "q", Changes: changes}
}

func TestScore_DelegatesWhenUnmarked(t *testing.T) {
s := New(delegate(t, 0.7))
got, err := s.Score(context.Background(), batch("github://o/r/pull/1/a"))
r := resolverFor("github://o/r/pull/1/a")
s := New(r, delegate(r, 0.7))
got, err := s.Score(context.Background(), entity.Batch{ID: batchID})
require.NoError(t, err)
assert.Equal(t, 0.7, got)
}

func TestScore_ErrorMarker(t *testing.T) {
s := New(delegate(t, 0.7))
_, err := s.Score(context.Background(), batch("github://o/r/pull/1/a?sq-fake=score-error"))
r := resolverFor("github://o/r/pull/1/a?sq-fake=score-error")
s := New(r, delegate(r, 0.7))
_, err := s.Score(context.Background(), entity.Batch{ID: batchID})
require.Error(t, err)
}
2 changes: 2 additions & 0 deletions submitqueue/extension/scorer/heuristic/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//submitqueue/core/changeset",
"//submitqueue/entity",
"//submitqueue/extension/scorer",
"@com_github_uber_go_tally_v4//:tally",
Expand All @@ -18,6 +19,7 @@ go_test(
srcs = ["scorer_test.go"],
embed = [":heuristic"],
deps = [
"//submitqueue/core/changeset/fake",
"//submitqueue/entity",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
19 changes: 14 additions & 5 deletions submitqueue/extension/scorer/heuristic/scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/submitqueue/core/changeset"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/scorer"
)
Expand All @@ -40,6 +41,8 @@ type Bucket struct {
// heuristicScorer computes a success probability by bucketing a metric extracted from a batch of changes.
// It follows the Java HeuristicsBasedSuccessPredictor pattern.
type heuristicScorer struct {
// resolver resolves the batch identity into its detailed changes.
resolver changeset.Resolver
// buckets is the list of ranges to match against.
buckets []Bucket
// valueFunc extracts the numeric value from a batch of changes.
Expand All @@ -48,24 +51,30 @@ type heuristicScorer struct {
scope tally.Scope
}

// New creates a new heuristic Scorer with the given buckets and value function.
// New creates a new heuristic Scorer with the given resolver, buckets and value function.
// Panics if valueFunc is nil.
func New(buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer {
func New(resolver changeset.Resolver, buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer {
if valueFunc == nil {
panic("heuristic.New: valueFunc must not be nil")
}
return &heuristicScorer{
resolver: resolver,
buckets: buckets,
valueFunc: valueFunc,
scope: scope,
}
}

// Score extracts the value from the batch of changes, then returns the probability score for the
// first bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches.
func (s *heuristicScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) {
// Score resolves the batch's changes, extracts the metric, then returns the probability
// score for the first bucket whose range [Min, Max] contains the value. Returns an error
// if no bucket matches.
func (s *heuristicScorer) Score(ctx context.Context, batch entity.Batch) (ret float64, retErr error) {
op := metrics.Begin(s.scope, "score")
defer func() { op.Complete(retErr) }()
changes, err := s.resolver.DetailedForBatch(ctx, batch)
if err != nil {
return 0, err
}
value, err := s.valueFunc(ctx, changes)
if err != nil {
return 0, err
Expand Down
11 changes: 6 additions & 5 deletions submitqueue/extension/scorer/heuristic/scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
changesetfake "github.com/uber/submitqueue/submitqueue/core/changeset/fake"
"github.com/uber/submitqueue/submitqueue/entity"
)

Expand Down Expand Up @@ -106,8 +107,8 @@ func TestScorer_Score(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := New(tt.buckets, tt.valueFunc, tally.NoopScope)
got, err := s.Score(context.Background(), entity.BatchChanges{})
s := New(changesetfake.New(), tt.buckets, tt.valueFunc, tally.NoopScope)
got, err := s.Score(context.Background(), entity.Batch{})
if tt.wantErr {
require.Error(t, err)
return
Expand All @@ -122,13 +123,13 @@ func TestScorer_Score_ValueFuncError(t *testing.T) {
failing := func(_ context.Context, _ entity.BatchChanges) (int, error) {
return 0, assert.AnError
}
s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope)
_, err := s.Score(context.Background(), entity.BatchChanges{})
s := New(changesetfake.New(), []Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope)
_, err := s.Score(context.Background(), entity.Batch{})
require.Error(t, err)
}

func TestNew_NilValueFunc(t *testing.T) {
assert.Panics(t, func() {
New([]Bucket{{Min: 0, Max: 10, Score: 0.85}}, nil, tally.NoopScope)
New(changesetfake.New(), []Bucket{{Min: 0, Max: 10, Score: 0.85}}, nil, tally.NoopScope)
})
}
8 changes: 4 additions & 4 deletions submitqueue/extension/scorer/mock/scorer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading