Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 25 additions & 35 deletions core/src/main/java/com/google/adk/agents/BaseAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import com.google.adk.agents.Callbacks.BeforeAgentCallback;
import com.google.adk.events.Event;
import com.google.adk.plugins.Plugin;
import com.google.adk.telemetry.Instrumentation;
import com.google.adk.telemetry.Instrumentation.AgentInvocation;
import com.google.adk.telemetry.Instrumentation.AgentInvocationTransformer;
import com.google.adk.utils.AgentEnums.AgentOrigin;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
Expand Down Expand Up @@ -324,39 +323,30 @@ private Flowable<Event> run(
InvocationContext parentContext,
Function<InvocationContext, Flowable<Event>> runImplementation) {
Context otelContext = Context.current();
return Flowable.using(
() ->
Instrumentation.recordAgentInvocation(
createInvocationContext(parentContext), this, otelContext),
agentInvocation -> {
InvocationContext invocationContext = agentInvocation.getCtx();
Flowable<Event> mainAndAfterEvents =
Flowable.defer(() -> runImplementation.apply(invocationContext))
.concatWith(
Flowable.defer(
() ->
callCallback(
afterCallbacksToFunctions(
invocationContext.pluginManager(), afterAgentCallback),
invocationContext)
.toFlowable()));

return callCallback(
beforeCallbacksToFunctions(
invocationContext.pluginManager(), beforeAgentCallback),
invocationContext)
.flatMapPublisher(
beforeEvent -> {
if (invocationContext.endInvocation()) {
return Flowable.just(beforeEvent);
}
return Flowable.just(beforeEvent).concatWith(mainAndAfterEvents);
})
.switchIfEmpty(mainAndAfterEvents)
.doOnNext(agentInvocation::addEvent)
.doOnError(agentInvocation::setError);
},
AgentInvocation::close);
InvocationContext invocationContext = createInvocationContext(parentContext);
Flowable<Event> mainAndAfterEvents =
Flowable.defer(() -> runImplementation.apply(invocationContext))
.concatWith(
Flowable.defer(
() ->
callCallback(
afterCallbacksToFunctions(
invocationContext.pluginManager(), afterAgentCallback),
invocationContext)
.toFlowable()));

return callCallback(
beforeCallbacksToFunctions(invocationContext.pluginManager(), beforeAgentCallback),
invocationContext)
.flatMapPublisher(
beforeEvent -> {
if (invocationContext.endInvocation()) {
return Flowable.just(beforeEvent);
}
return Flowable.just(beforeEvent).concatWith(mainAndAfterEvents);
})
.switchIfEmpty(mainAndAfterEvents)
.compose(new AgentInvocationTransformer(invocationContext, this, otelContext));
}

/**
Expand Down
86 changes: 32 additions & 54 deletions core/src/main/java/com/google/adk/flows/llmflows/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
import com.google.adk.events.Event;
import com.google.adk.events.EventActions;
import com.google.adk.events.ToolConfirmation;
import com.google.adk.telemetry.Instrumentation;
import com.google.adk.telemetry.Instrumentation.ToolExecution;
import com.google.adk.telemetry.Instrumentation.ToolExecutionTransformer;
import com.google.adk.telemetry.Tracing;
import com.google.adk.tools.BaseTool;
import com.google.adk.tools.FunctionTool;
Expand Down Expand Up @@ -290,43 +289,42 @@ private static Function<FunctionCall, Maybe<Event>> getFunctionCallMapper(
Context parentContext) {
return functionCall ->
Maybe.defer(
() -> {
BaseTool tool = tools.get(functionCall.name().get());
ToolContext toolContext =
ToolContext.builder(invocationContext)
.functionCallId(functionCall.id().orElse(""))
.toolConfirmation(
functionCall.id().map(toolConfirmations::get).orElse(null))
.build();

Map<String, Object> functionArgs =
functionCall.args().map(HashMap::new).orElse(new HashMap<>());

Maybe<Map<String, Object>> maybeFunctionResult =
maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext)
.switchIfEmpty(
Maybe.defer(
() ->
isLive
? processFunctionLive(
invocationContext,
tool,
toolContext,
functionCall,
functionArgs)
: callTool(tool, functionArgs, toolContext))
.compose(Tracing.withContext(parentContext)));

return postProcessFunctionResult(
() -> {
BaseTool tool = tools.get(functionCall.name().get());
ToolContext toolContext =
ToolContext.builder(invocationContext)
.functionCallId(functionCall.id().orElse(""))
.toolConfirmation(functionCall.id().map(toolConfirmations::get).orElse(null))
.build();

Map<String, Object> functionArgs =
functionCall.args().map(HashMap::new).orElse(new HashMap<>());

Maybe<Map<String, Object>> maybeFunctionResult =
maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext)
.switchIfEmpty(
Maybe.defer(
() ->
isLive
? processFunctionLive(
invocationContext,
tool,
toolContext,
functionCall,
functionArgs)
: callTool(tool, functionArgs, toolContext)));

return processFunctionResult(
maybeFunctionResult,
invocationContext,
tool,
functionArgs,
toolContext,
isLive,
parentContext);
})
.compose(Tracing.withContext(parentContext));
isLive)
.compose(
new ToolExecutionTransformer(
tool, invocationContext.agent(), functionArgs, parentContext));
});
}

/**
Expand Down Expand Up @@ -424,26 +422,6 @@ public static Set<String> getLongRunningFunctionCalls(
return longRunningFunctionCalls;
}

private static Maybe<Event> postProcessFunctionResult(
Maybe<Map<String, Object>> maybeFunctionResult,
InvocationContext invocationContext,
BaseTool tool,
Map<String, Object> functionArgs,
ToolContext toolContext,
boolean isLive,
Context parentContext) {
return Maybe.using(
() ->
Instrumentation.recordToolExecution(
tool, invocationContext.agent(), functionArgs, parentContext),
toolExecution ->
processFunctionResult(
maybeFunctionResult, invocationContext, tool, functionArgs, toolContext, isLive)
.doOnSuccess(event -> toolExecution.context().setFunctionResponseEvent(event))
.doOnError(toolExecution::setError),
ToolExecution::close);
}

private static Maybe<Event> processFunctionResult(
Maybe<Map<String, Object>> maybeFunctionResult,
InvocationContext invocationContext,
Expand Down
76 changes: 56 additions & 20 deletions core/src/main/java/com/google/adk/telemetry/Instrumentation.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeSource;
import io.reactivex.rxjava3.core.MaybeTransformer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,7 +73,7 @@ public void setFunctionResponseEvent(@Nullable Event functionResponseEvent) {
public abstract static class ClosableTelemetryScope implements AutoCloseable {
protected final long startTimeNanos;
protected final Span span;
protected final Scope scope;
protected Scope scope;
protected final TelemetryContext telemetryContext;
protected @Nullable Throwable caughtError;
protected final AtomicBoolean closed = new AtomicBoolean(false);
Expand All @@ -80,6 +86,11 @@ public abstract static class ClosableTelemetryScope implements AutoCloseable {
this.telemetryContext = new TelemetryContext(Context.current());
}

@SuppressWarnings("MustBeClosedChecker")
public void makeCurrent() {
this.scope = span.makeCurrent();
}

public TelemetryContext context() {
return telemetryContext;
}
Expand Down Expand Up @@ -136,10 +147,6 @@ public AgentInvocation(InvocationContext ctx, BaseAgent agent, Context parentCon
Tracing.traceAgentInvocation(span, agent.name(), agent.description(), ctx);
}

public InvocationContext getCtx() {
return ctx;
}

public void addEvent(Event event) {
events.add(event);
}
Expand Down Expand Up @@ -203,24 +210,53 @@ protected void handleMetricsError(RuntimeException e) {
}
}

/** Creates an AgentInvocation context to record agent invocation telemetry. */
public static AgentInvocation recordAgentInvocation(InvocationContext ctx, BaseAgent agent) {
return recordAgentInvocation(ctx, agent, Context.current());
}
/** A transformer that manages an AgentInvocation telemetry scope for RxJava streams. */
public static final class AgentInvocationTransformer
implements FlowableTransformer<Event, Event> {
private final AgentInvocation agentInvocation;

public static AgentInvocation recordAgentInvocation(
InvocationContext ctx, BaseAgent agent, Context parentContext) {
return new AgentInvocation(ctx, agent, parentContext);
}
public AgentInvocationTransformer(
InvocationContext ctx, BaseAgent agent, Context parentContext) {
this.agentInvocation = new AgentInvocation(ctx, agent, parentContext);
}

/** Creates a ToolExecution context to record tool execution telemetry. */
public static ToolExecution recordToolExecution(
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs) {
return recordToolExecution(tool, agent, functionArgs, Context.current());
@Override
public Publisher<Event> apply(Flowable<Event> upstream) {
return Flowable.using(
() -> {
agentInvocation.makeCurrent();
return agentInvocation;
},
agentInvocation ->
upstream.doOnNext(agentInvocation::addEvent).doOnError(agentInvocation::setError),
AgentInvocation::close);
}
}

public static ToolExecution recordToolExecution(
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
return new ToolExecution(tool, agent, functionArgs, parentContext);
/** A transformer that manages a ToolExecution telemetry scope for RxJava Maybe streams. */
public static final class ToolExecutionTransformer implements MaybeTransformer<Event, Event> {
private final BaseTool tool;
private final BaseAgent agent;
private final Map<String, Object> functionArgs;
private final Context parentContext;

public ToolExecutionTransformer(
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
this.tool = tool;
this.agent = agent;
this.functionArgs = functionArgs;
this.parentContext = parentContext;
}

@Override
public MaybeSource<Event> apply(Maybe<Event> upstream) {
return Maybe.using(
() -> new ToolExecution(tool, agent, functionArgs, parentContext),
toolExecution ->
upstream
.doOnSuccess(event -> toolExecution.context().setFunctionResponseEvent(event))
.doOnError(toolExecution::setError),
ToolExecution::close);
}
}
}
Loading