Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import akka.javasdk.agent.task.BacklogEntity;
import akka.javasdk.agent.task.BacklogEvent;
import akka.javasdk.agent.task.BacklogState;
import akka.javasdk.agent.task.TaskKey;
import akka.javasdk.testkit.EventSourcedResult;
import akka.javasdk.testkit.EventSourcedTestKit;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -39,7 +40,8 @@ public void shouldCreateIdempotently() {
@Test
public void shouldAddTask() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
EventSourcedResult<Done> result = testKit.method(BacklogEntity::addTask).invoke("task-1");
EventSourcedResult<Done> result =
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
assertThat(result.getReply()).isEqualTo(done());
result.getNextEventOfType(BacklogEvent.TaskAdded.class);
assertThat(testKit.getState().containsTask("task-1")).isTrue();
Expand All @@ -49,21 +51,23 @@ public void shouldAddTask() {
@Test
public void shouldAddTaskIdempotently() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
EventSourcedResult<Done> result = testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
EventSourcedResult<Done> result =
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
assertThat(result.getReply()).isEqualTo(done());
assertThat(result.getAllEvents()).isEmpty();
}

@Test
public void shouldClaimTask() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));

EventSourcedResult<Done> result =
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-1", "agent-1"));

assertThat(result.getReply()).isEqualTo(done());
result.getNextEventOfType(BacklogEvent.TaskClaimed.class);
assertThat(testKit.getState().isClaimed("task-1")).isTrue();
Expand All @@ -83,7 +87,7 @@ public void shouldRejectClaimForNonExistentTask() {
@Test
public void shouldRejectClaimForAlreadyClaimedTask() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-1", "agent-1"));
Expand All @@ -98,7 +102,7 @@ public void shouldRejectClaimForAlreadyClaimedTask() {
@Test
public void shouldReleaseClaimedTask() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-1", "agent-1"));
Expand All @@ -112,7 +116,7 @@ public void shouldReleaseClaimedTask() {
@Test
public void shouldReleaseUnclaimedTaskIdempotently() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));

EventSourcedResult<Done> result = testKit.method(BacklogEntity::release).invoke("task-1");
assertThat(result.getReply()).isEqualTo(done());
Expand All @@ -122,7 +126,7 @@ public void shouldReleaseUnclaimedTaskIdempotently() {
@Test
public void shouldTransferTask() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-1", "agent-1"));
Expand All @@ -140,9 +144,9 @@ public void shouldTransferTask() {
@Test
public void shouldCancelUnclaimed() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke("task-2");
testKit.method(BacklogEntity::addTask).invoke("task-3");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-2", "Task Two"));
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-3", "Task Three"));
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-2", "agent-1"));
Expand Down Expand Up @@ -199,14 +203,15 @@ public void shouldRejectAddTaskWhenClosed() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::close).invoke();

EventSourcedResult<Done> result = testKit.method(BacklogEntity::addTask).invoke("task-1");
EventSourcedResult<Done> result =
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
assertThat(result.getError()).isEqualTo("Backlog is closed");
}

@Test
public void shouldRejectClaimWhenClosed() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit.method(BacklogEntity::close).invoke();

EventSourcedResult<Done> result =
Expand All @@ -219,7 +224,7 @@ public void shouldRejectClaimWhenClosed() {
@Test
public void shouldRejectReleaseWhenClosed() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-1", "agent-1"));
Expand All @@ -232,7 +237,7 @@ public void shouldRejectReleaseWhenClosed() {
@Test
public void shouldRejectTransferWhenClosed() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-1", "agent-1"));
Expand All @@ -248,7 +253,7 @@ public void shouldRejectTransferWhenClosed() {
@Test
public void shouldRejectCancelUnclaimedWhenClosed() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit.method(BacklogEntity::close).invoke();

EventSourcedResult<Done> result = testKit.method(BacklogEntity::cancelUnclaimed).invoke();
Expand All @@ -258,7 +263,7 @@ public void shouldRejectCancelUnclaimedWhenClosed() {
@Test
public void shouldAllowGetStateWhenClosed() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit.method(BacklogEntity::close).invoke();

EventSourcedResult<BacklogState> result = testKit.method(BacklogEntity::getState).invoke();
Expand All @@ -269,8 +274,8 @@ public void shouldAllowGetStateWhenClosed() {
@Test
public void shouldGetState() {
var testKit = EventSourcedTestKit.of(BacklogEntity::new);
testKit.method(BacklogEntity::addTask).invoke("task-1");
testKit.method(BacklogEntity::addTask).invoke("task-2");
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-1", "Task One"));
testKit.method(BacklogEntity::addTask).invoke(new TaskKey("task-2", "Task Two"));
testKit
.method(BacklogEntity::claim)
.invoke(new BacklogEntity.ClaimRequest("task-1", "agent-1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ private String newTaskId() {
public void shouldCreateTask() {
var taskId = newTaskId();
var created = componentClient.forTask(taskId).create(TEST_TASK.instructions("do something"));
assertThat(created).isEqualTo(taskId);
assertThat(created.id()).isEqualTo(taskId);
assertThat(created.name()).isEqualTo(TEST_TASK.name());
}

// --- get ---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public void shouldPauseAndResumeAgent() {
var state = agentClient.getState();
assertThat(state.paused()).isTrue();

var taskId =
var taskKey =
componentClient
.forTask(UUID.randomUUID().toString())
.create(TestTasks.TEST_TASK.instructions("Do something."));
agentClient.assignTasks(taskId);
agentClient.assignTasks(taskKey);

agentClient.resume();

Expand All @@ -92,7 +92,7 @@ public void shouldPauseAndResumeAgent() {
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(
() -> {
var snapshot = componentClient.forTask(taskId).get(TestTasks.TEST_TASK);
var snapshot = componentClient.forTask(taskKey.id()).get(TestTasks.TEST_TASK);
assertThat(snapshot.result()).isNotNull();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ public Effect<Done> create(String name) {
return effects().persist(new BacklogEvent.BacklogCreated(name)).thenReply(__ -> done());
}

/** Add a task ID reference to this backlog. The task must already exist in TaskEntity. */
public Effect<Done> addTask(String taskId) {
/** Add a task reference to this backlog. The task must already exist in TaskEntity. */
public Effect<Done> addTask(TaskKey taskKey) {
if (currentState().closed()) {
return closedError();
}
if (currentState().containsTask(taskId)) {
if (currentState().containsTask(taskKey.id())) {
return effects().reply(done()); // idempotent
}
return effects().persist(new BacklogEvent.TaskAdded(taskId)).thenReply(__ -> done());
return effects()
.persist(new BacklogEvent.TaskAdded(taskKey.id(), taskKey.name()))
.thenReply(__ -> done());
}

/** Atomic first-come-first-served claim. */
Expand Down Expand Up @@ -128,7 +130,7 @@ private Effect<Done> closedError() {
public BacklogState applyEvent(BacklogEvent event) {
return switch (event) {
case BacklogEvent.BacklogCreated e -> currentState().withName(e.name());
case BacklogEvent.TaskAdded e -> currentState().withTaskAdded(e.taskId());
case BacklogEvent.TaskAdded e -> currentState().withTaskAdded(e.taskId(), e.taskName());
case BacklogEvent.TaskClaimed e -> currentState().withTaskClaimed(e.taskId(), e.claimedBy());
case BacklogEvent.TaskReleased e -> currentState().withTaskReleased(e.taskId());
case BacklogEvent.TaskTransferred e ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public sealed interface BacklogEvent {
record BacklogCreated(String name) implements BacklogEvent {}

@TypeName("akka-backlog-task-added")
record TaskAdded(String taskId) implements BacklogEvent {}
record TaskAdded(String taskId, String taskName) implements BacklogEvent {}

@TypeName("akka-backlog-task-claimed")
record TaskClaimed(String taskId, String claimedBy) implements BacklogEvent {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import java.util.Optional;

/** State of a backlog entity — tracks task references and their claim status. */
public record BacklogState(String name, Map<String, Optional<String>> tasks, boolean closed) {
public record BacklogState(String name, Map<String, Entry> tasks, boolean closed) {

/** An entry in the backlog: task ID and who has claimed it (if anyone). */
public record Entry(String taskId, Optional<String> claimedBy) {}
/** An entry in the backlog: task ID, task name, and who has claimed it (if anyone). */
public record Entry(String taskId, String taskName, Optional<String> claimedBy) {}

public static BacklogState empty() {
return new BacklogState("", Map.of(), false);
Expand All @@ -31,27 +31,33 @@ public boolean containsTask(String taskId) {
}

public boolean isClaimed(String taskId) {
return tasks.containsKey(taskId) && tasks.get(taskId).isPresent();
return tasks.containsKey(taskId) && tasks.get(taskId).claimedBy().isPresent();
}

public String taskName(String taskId) {
var entry = tasks.get(taskId);
return entry != null ? entry.taskName() : "";
}

public Optional<String> claimedBy(String taskId) {
return tasks.getOrDefault(taskId, Optional.empty());
var entry = tasks.get(taskId);
return entry != null ? entry.claimedBy() : Optional.empty();
}

public List<Entry> entries() {
return tasks.entrySet().stream().map(e -> new Entry(e.getKey(), e.getValue())).toList();
return List.copyOf(tasks.values());
}

public List<String> unclaimedTaskIds() {
return tasks.entrySet().stream()
.filter(e -> e.getValue().isEmpty())
.filter(e -> e.getValue().claimedBy().isEmpty())
.map(Map.Entry::getKey)
.toList();
}

public List<String> claimedTaskIds() {
return tasks.entrySet().stream()
.filter(e -> e.getValue().isPresent())
.filter(e -> e.getValue().claimedBy().isPresent())
.map(Map.Entry::getKey)
.toList();
}
Expand All @@ -60,27 +66,31 @@ public BacklogState withName(String name) {
return new BacklogState(name, tasks, closed);
}

public BacklogState withTaskAdded(String taskId) {
public BacklogState withTaskAdded(String taskId, String taskName) {
var updated = new java.util.HashMap<>(tasks);
updated.put(taskId, Optional.empty());
updated.put(taskId, new Entry(taskId, taskName, Optional.empty()));
return new BacklogState(name, Map.copyOf(updated), closed);
}

public BacklogState withTaskClaimed(String taskId, String claimedBy) {
var updated = new java.util.HashMap<>(tasks);
updated.put(taskId, Optional.of(claimedBy));
var existing = updated.get(taskId);
var taskName = existing != null ? existing.taskName() : "";
updated.put(taskId, new Entry(taskId, taskName, Optional.of(claimedBy)));
return new BacklogState(name, Map.copyOf(updated), closed);
}

public BacklogState withTaskReleased(String taskId) {
var updated = new java.util.HashMap<>(tasks);
updated.put(taskId, Optional.empty());
var existing = updated.get(taskId);
var taskName = existing != null ? existing.taskName() : "";
updated.put(taskId, new Entry(taskId, taskName, Optional.empty()));
return new BacklogState(name, Map.copyOf(updated), closed);
}

public BacklogState withUnclaimedRemoved() {
var updated = new java.util.HashMap<>(tasks);
updated.entrySet().removeIf(e -> e.getValue().isEmpty());
updated.entrySet().removeIf(e -> e.getValue().claimedBy().isEmpty());
return new BacklogState(name, Map.copyOf(updated), closed);
}

Expand Down
13 changes: 13 additions & 0 deletions akka-javasdk/src/main/java/akka/javasdk/agent/task/TaskKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (C) 2021-2026 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.agent.task;

/** A reference to a task, holding its ID and name. */
public record TaskKey(String id, String name) {
@Override
public String toString() {
return name + "|" + id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import akka.javasdk.agent.autonomous.AutonomousAgent;
import akka.javasdk.agent.autonomous.Notification;
import akka.javasdk.agent.task.Task;
import akka.javasdk.agent.task.TaskKey;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

Expand Down Expand Up @@ -47,18 +48,18 @@ default String runSingleTask(Task<?> task) {
* Assign one or more previously created tasks to this agent. Tasks are queued if the agent is
* busy.
*
* @param taskIds IDs of previously created tasks
* @param taskKeys references to previously created tasks
*/
default void assignTasks(String... taskIds) {
assignTasksAsync(taskIds).toCompletableFuture().join();
default void assignTasks(TaskKey... taskKeys) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requiring the key in client api? Is this going in the right direction?
I didn't notice when reviewing the runtime PR.

assignTasksAsync(taskKeys).toCompletableFuture().join();
}

/**
* Async variant of {@link #assignTasks}.
*
* @param taskIds IDs of previously created tasks
* @param taskKeys references to previously created tasks
*/
CompletionStage<Done> assignTasksAsync(String... taskIds);
CompletionStage<Done> assignTasksAsync(TaskKey... taskKeys);

/**
* Apply per-instance configuration to this agent. The goal overrides the static goal from the
Expand Down
Loading
Loading