Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/webapp/app/presenters/v3/QueueListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export class QueueListPresenter extends BasePresenter {
concurrencyLimitBase: true,
concurrencyLimitOverriddenAt: true,
concurrencyLimitOverriddenBy: true,
rateLimit: true,
type: true,
paused: true,
},
Expand Down Expand Up @@ -163,6 +164,7 @@ export class QueueListPresenter extends BasePresenter {
concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy
? overriddenByMap.get(queue.concurrencyLimitOverriddenBy) ?? null
: null,
rateLimit: queue.rateLimit,
paused: queue.paused,
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export class QueueRetrievePresenter extends BasePresenter {
concurrencyLimitBase: queue.concurrencyLimitBase ?? null,
concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt ?? null,
concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy ?? null,
rateLimit: queue.rateLimit,
paused: queue.paused,
}),
};
Expand Down Expand Up @@ -144,6 +145,7 @@ export function toQueueItem(data: {
concurrencyLimitBase: number | null;
concurrencyLimitOverriddenAt: Date | null;
concurrencyLimitOverriddenBy: User | null;
rateLimit: any;
paused: boolean;
}): QueueItem & { releaseConcurrencyOnWaitpoint: boolean } {
return {
Expand All @@ -162,6 +164,7 @@ export function toQueueItem(data: {
overriddenBy: toQueueConcurrencyOverriddenBy(data.concurrencyLimitOverriddenBy),
overriddenAt: data.concurrencyLimitOverriddenAt,
},
rateLimit: data.rateLimit as any,
// TODO: This needs to be removed but keeping this here for now to avoid breaking existing clients
releaseConcurrencyOnWaitpoint: true,
};
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/presenters/v3/TaskPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class TaskPresenter {
},
},
},
queueConfig: true,
},
where: {
friendlyId: taskFriendlyId,
Expand Down

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,20 @@ export class MarQS {
return this.redis.del(this.keys.queueConcurrencyLimitKey(env, queue));
}

public async updateQueueRateLimits(
env: AuthenticatedEnvironment,
queue: string,
rateLimits: Array<{ limit: number; window: number }>
) {
// For now, we just store it in redis as JSON. The engine will need to read it.
// We need a key for rate limits. Let's assume `queueRateLimitKey` exists or we create it.
return this.redis.set(this.keys.queueRateLimitKey(env, queue), JSON.stringify(rateLimits));
}

public async removeQueueRateLimits(env: AuthenticatedEnvironment, queue: string) {
return this.redis.del(this.keys.queueRateLimitKey(env, queue));
}

public async updateEnvConcurrencyLimits(env: AuthenticatedEnvironment) {
const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKey(env);

Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/marqs/marqsKeyProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer {
return [this.queueKey(env, queue), constants.CONCURRENCY_LIMIT_PART].join(":");
}

queueRateLimitKey(env: MarQSKeyProducerEnv, queue: string) {
return [this.queueKey(env, queue), "rateLimit"].join(":");
}

envConcurrencyLimitKey(envId: string): string;
envConcurrencyLimitKey(env: MarQSKeyProducerEnv): string;
envConcurrencyLimitKey(envOrId: MarQSKeyProducerEnv | string): string {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/marqs/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type MarQSKeyProducerEnv = {

export interface MarQSKeyProducer {
queueConcurrencyLimitKey(env: MarQSKeyProducerEnv, queue: string): string;
queueRateLimitKey(env: MarQSKeyProducerEnv, queue: string): string;

envConcurrencyLimitKey(envId: string): string;
envConcurrencyLimitKey(env: MarQSKeyProducerEnv): string;
Expand Down
23 changes: 23 additions & 0 deletions apps/webapp/app/v3/runQueue.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,26 @@ export async function removeQueueConcurrencyLimits(
engine.runQueue.removeQueueConcurrencyLimits(environment, queueName),
]);
}

/** Updates MARQS and the RunQueue rate limits for a queue */
export async function updateQueueRateLimits(
environment: AuthenticatedEnvironment,
queueName: string,
rateLimits: Array<{ limit: number; window: number }>
) {
await Promise.allSettled([
marqs?.updateQueueRateLimits?.(environment, queueName, rateLimits),
engine.runQueue.updateQueueRateLimits?.(environment, queueName, rateLimits),
]);
}

/** Removes MARQS and the RunQueue rate limits for a queue */
export async function removeQueueRateLimits(
environment: AuthenticatedEnvironment,
queueName: string
) {
await Promise.allSettled([
marqs?.removeQueueRateLimits?.(environment, queueName),
engine.runQueue.removeQueueRateLimits?.(environment, queueName),
]);
}
41 changes: 41 additions & 0 deletions apps/webapp/app/v3/services/rateLimitSystem.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { PrismaClient, Prisma } from "@trigger.dev/database";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { removeQueueRateLimits, updateQueueRateLimits } from "../runQueue.server";

export class RateLimitSystem {
constructor(
private prisma: PrismaClient
) {}

async overrideQueueRateLimit(
environment: AuthenticatedEnvironment,
queueName: string,
rateLimits: Array<{ limit: number; window: number }>
) {
const queue = await this.prisma.taskQueue.updateMany({
where: {
runtimeEnvironmentId: environment.id,
name: queueName,
},
data: {
rateLimit: rateLimits,
},
});

await updateQueueRateLimits(environment, queueName, rateLimits);
}

async resetQueueRateLimit(environment: AuthenticatedEnvironment, queueName: string) {
await this.prisma.taskQueue.updateMany({
where: {
runtimeEnvironmentId: environment.id,
name: queueName,
},
data: {
rateLimit: Prisma.DbNull,
},
});

await removeQueueRateLimits(environment, queueName);
}
}
8 changes: 8 additions & 0 deletions apps/webapp/app/v3/services/rateLimitSystemInstance.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { prisma } from "~/db.server";
import { RateLimitSystem } from "./rateLimitSystem.server";
import { singleton } from "~/utils/singleton";

export const rateLimitSystem = singleton(
"rateLimitSystem",
() => new RateLimitSystem(prisma)
);
82 changes: 82 additions & 0 deletions apps/webapp/test/rateLimitSystem.server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { RateLimitSystem } from "../app/v3/services/rateLimitSystem.server";
import { PrismaClient, Prisma } from "@trigger.dev/database";
import { Redis } from "ioredis";
import { AuthenticatedEnvironment } from "../app/services/apiAuth.server";
import * as runQueueServer from "../app/v3/runQueue.server";

vi.mock("../app/v3/runQueue.server", () => ({
updateQueueRateLimits: vi.fn(),
removeQueueRateLimits: vi.fn(),
}));

describe("RateLimitSystem", () => {
let prismaMock: any;
let redisMock: any;
let rateLimitSystem: RateLimitSystem;
let mockEnvironment: AuthenticatedEnvironment;

beforeEach(() => {
prismaMock = {
taskQueue: {
updateMany: vi.fn().mockResolvedValue({ count: 1 }),
},
};

rateLimitSystem = new RateLimitSystem(prismaMock as unknown as PrismaClient);

mockEnvironment = {
id: "env-123",
} as AuthenticatedEnvironment;

vi.clearAllMocks();
});

describe("overrideQueueRateLimit", () => {
it("should update the rateLimit field in the database and call the Redis sync method", async () => {
const queueName = "test-queue";
const rateLimits = [{ limit: 10, window: 60 }];

await rateLimitSystem.overrideQueueRateLimit(mockEnvironment, queueName, rateLimits);

expect(prismaMock.taskQueue.updateMany).toHaveBeenCalledWith({
where: {
runtimeEnvironmentId: mockEnvironment.id,
name: queueName,
},
data: {
rateLimit: rateLimits,
},
});

expect(runQueueServer.updateQueueRateLimits).toHaveBeenCalledWith(
mockEnvironment,
queueName,
rateLimits
);
});
});

describe("resetQueueRateLimit", () => {
it("should clear the rateLimit field in the database and call the Redis sync method", async () => {
const queueName = "test-queue";

await rateLimitSystem.resetQueueRateLimit(mockEnvironment, queueName);

expect(prismaMock.taskQueue.updateMany).toHaveBeenCalledWith({
where: {
runtimeEnvironmentId: mockEnvironment.id,
name: queueName,
},
data: {
rateLimit: Prisma.DbNull,
},
});

expect(runQueueServer.removeQueueRateLimits).toHaveBeenCalledWith(
mockEnvironment,
queueName
);
});
});
});
101 changes: 101 additions & 0 deletions apps/webapp/test/rateLimitUI.e2e.full.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { describe, expect, it } from "vitest";
import { getTestServer } from "./helpers/sharedTestServer";
import { seedTestSession } from "./helpers/seedTestSession";
import { seedTestUserProject } from "./helpers/seedTestUserProject";

describe("Rate Limiting UI", () => {
it("should override and remove queue limits via the UI action", async () => {
const server = getTestServer();
const { user, organization, project, environment } = await seedTestUserProject(server.prisma);
await server.prisma.user.update({
where: { id: user.id },
data: { confirmedBasicDetails: true },
});
const cookie = await seedTestSession({ userId: user.id });

// Get the org member
const orgMember = await server.prisma.orgMember.findFirst({
where: { userId: user.id, organizationId: organization.id },
});

// Update environment to have a high maximumConcurrencyLimit and link to orgMember
await server.prisma.runtimeEnvironment.update({
where: { id: environment.id },
data: {
maximumConcurrencyLimit: 100,
orgMemberId: orgMember?.id,
},
});

// Create a queue
const queue = await server.prisma.taskQueue.create({
data: {
name: "test-queue",
friendlyId: "queue_12345",
type: "NAMED",
runtimeEnvironmentId: environment.id,
projectId: project.id,
concurrencyLimit: 5,
},
});

const path = `/orgs/${organization.slug}/projects/${project.slug}/env/${environment.slug}/queues`;

// 1. Override limits
const overrideFormData = new URLSearchParams();
overrideFormData.append("action", "queue-override");
overrideFormData.append("friendlyId", queue.friendlyId);
overrideFormData.append("concurrencyLimit", "5");
overrideFormData.append("rateLimits", JSON.stringify([{ limit: 10, window: 60 }]));

const overrideRes = await server.webapp.fetch(path, {
method: "POST",
body: overrideFormData.toString(),
headers: {
"Content-Type": "application/x-www-form-urlencoded",
Cookie: cookie,
},
redirect: "manual",
});

expect(overrideRes.status).toBe(302);
const location = overrideRes.headers.get("location");
if (location?.includes("error")) {
throw new Error(`Redirected with error: ${location}`);
}

// Verify database
const updatedQueue = await server.prisma.taskQueue.findUnique({
where: { id: queue.id },
});

expect(updatedQueue?.concurrencyLimit).toBe(5);
expect(updatedQueue?.rateLimit).toEqual([{ limit: 10, window: 60 }]);

// 2. Remove override
const removeFormData = new URLSearchParams();
removeFormData.append("action", "queue-remove-override");
removeFormData.append("friendlyId", queue.friendlyId);

const removeRes = await server.webapp.fetch(path, {
method: "POST",
body: removeFormData.toString(),
headers: {
"Content-Type": "application/x-www-form-urlencoded",
Cookie: cookie,
},
redirect: "manual",
});

expect(removeRes.status).toBe(302);

// Verify database
const resetQueue = await server.prisma.taskQueue.findUnique({
where: { id: queue.id },
});

// Concurrency limit is reset to base (which was 5)
expect(resetQueue?.concurrencyLimit).toBe(5);
expect(resetQueue?.rateLimit).toBe(null);
});
});
1 change: 1 addition & 0 deletions docs/docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
]
},
"queue-concurrency",
"rate-limiting",
"versioning",
"machines",
"idempotency",
Expand Down
3 changes: 3 additions & 0 deletions docs/introduction.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ We provide everything you need to build and manage background tasks: a CLI and S
<Card title="Concurrency & Queues" icon="line-height" href="/queue-concurrency" color="#D946EF">
Configure what you want to happen when there is more than one run at a time.
</Card>
<Card title="Rate Limiting" icon="gauge-high" href="/rate-limiting" color="#F59E0B">
Control how many runs can execute within a specific time window.
</Card>
<Card
title="Wait for token (human-in-the-loop)"
icon="hand"
Expand Down
Loading