[Core] Fix TASK_PROFILE_EVENT aggregation for multiple phases #61559
Anarion-zuo wants to merge 2 commits into ray-project:master
Signed-off-by: aaronzuo <anarionzuo@outlook.com>
Code Review
This pull request correctly fixes an issue where only a single profile event was being exported per task attempt, instead of aggregating all execution phases. The modification in TaskProfileEvent::ToRpcRayEvents to append new profile events is sound, and the new test TestTaskProfileEventToRpcRayEventsMultipleEvents provides good coverage for this fix. I have one suggestion to simplify the implementation in task_event_buffer.cc for improved readability and maintainability.
rpc::events::RayEvent *ray_event_ptr;
if (!ray_events_tuple.task_profile_event) {
  auto &ray_event = ray_events_tuple.task_profile_event.emplace();
  PopulateRpcRayEventBaseFields(ray_event, timestamp);
  ray_event_ptr = &ray_event;

  // Populate the task profile event base fields
  auto *task_profile_events = ray_event_ptr->mutable_task_profile_events();
  task_profile_events->set_task_id(task_id_.Binary());
  task_profile_events->set_job_id(job_id_.Binary());
  task_profile_events->set_attempt_number(attempt_number_);
} else {
  ray_event_ptr = &ray_events_tuple.task_profile_event.value();
}

// Add this profile event to the events list
auto *task_profile_events = ray_event_ptr->mutable_task_profile_events();
The logic to get or create the RayEvent and then retrieve the TaskProfileEvents can be simplified. Using an intermediate raw pointer ray_event_ptr makes the code a bit indirect. You can work directly with a pointer to TaskProfileEvents to make the logic more straightforward and reduce redundancy.
rpc::events::TaskProfileEvents *task_profile_events;
if (!ray_events_tuple.task_profile_event) {
  auto &ray_event = ray_events_tuple.task_profile_event.emplace();
  PopulateRpcRayEventBaseFields(ray_event, timestamp);
  task_profile_events = ray_event.mutable_task_profile_events();
  task_profile_events->set_task_id(task_id_.Binary());
  task_profile_events->set_job_id(job_id_.Binary());
  task_profile_events->set_attempt_number(attempt_number_);
} else {
  task_profile_events = ray_events_tuple.task_profile_event->mutable_task_profile_events();
}
Signed-off-by: aaronzuo <anarionzuo@outlook.com>
911464b to db4bf3f
Fix TASK_PROFILE_EVENT aggregation to preserve all phases
Description
Fixes an issue where the Ray Event Export API exported only a single profile event per task attempt instead of all the execution phases (such as task:deserialize_arguments, task:execute, and task:store_outputs). The problem was in TaskProfileEvent::ToRpcRayEvents, which unconditionally overwrote the task_profile_event in the RayEventsTuple instead of reusing the existing one and appending new events.
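To illustrate the difference between overwriting and appending, here is a minimal, self-contained sketch. It does not use Ray's real types: ProfilePhase, TaskProfileEvents, EventsTuple, AddPhaseOverwriting, and AddPhaseAppending are hypothetical stand-ins for the protobuf messages and for the logic in TaskProfileEvent::ToRpcRayEvents, and the start/end fields are assumed for illustration only.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins for the protobuf messages; these are NOT Ray's real types.
struct ProfilePhase {
  std::string name;  // e.g. "task:execute"
  int64_t start_ns;  // assumed field, for illustration
  int64_t end_ns;    // assumed field, for illustration
};

struct TaskProfileEvents {
  std::string task_id;
  int32_t attempt_number = 0;
  std::vector<ProfilePhase> events;
};

struct EventsTuple {
  std::optional<TaskProfileEvents> task_profile_event;
};

// Pattern described as the bug: emplace() always constructs a fresh value,
// destroying whatever was stored before, so earlier phases are lost.
void AddPhaseOverwriting(EventsTuple &tuple,
                         const std::string &task_id,
                         int32_t attempt,
                         const ProfilePhase &phase) {
  auto &msg = tuple.task_profile_event.emplace();
  msg.task_id = task_id;
  msg.attempt_number = attempt;
  msg.events.push_back(phase);
}

// Pattern described as the fix: create the message (and set its base fields)
// only on first use, then append every phase to the same message.
void AddPhaseAppending(EventsTuple &tuple,
                       const std::string &task_id,
                       int32_t attempt,
                       const ProfilePhase &phase) {
  if (!tuple.task_profile_event) {
    auto &msg = tuple.task_profile_event.emplace();
    msg.task_id = task_id;
    msg.attempt_number = attempt;
  }
  tuple.task_profile_event->events.push_back(phase);
}
```

Calling AddPhaseOverwriting three times on the same tuple leaves only the last phase, while AddPhaseAppending leaves all three in insertion order, which is the behavior this PR aims for.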
To test this fix:
Related issues
#61520
Additional information
Now the Ray Event Export API will export the full set of task profile phases (just like the Dashboard timeline), so you can reconstruct the complete task execution timeline from the exported events!
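As a rough sketch of what reconstructing the timeline could look like on the consumer side, the snippet below orders a set of exported phases by start time and computes each phase's offset and duration. ExportedPhase, TimelineEntry, and BuildTimeline are hypothetical names, and the nanosecond start/end fields are an assumption about the exported data, not the actual export schema.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical shape of one exported profile phase (assumed fields).
struct ExportedPhase {
  std::string name;
  int64_t start_ns;
  int64_t end_ns;
};

// One row of the reconstructed timeline, relative to the earliest phase.
struct TimelineEntry {
  std::string name;
  int64_t offset_ns;    // start time relative to the first phase
  int64_t duration_ns;  // end - start
};

// Sorts phases by start time and converts them to relative offsets.
std::vector<TimelineEntry> BuildTimeline(std::vector<ExportedPhase> phases) {
  std::stable_sort(phases.begin(), phases.end(),
                   [](const ExportedPhase &a, const ExportedPhase &b) {
                     return a.start_ns < b.start_ns;
                   });
  std::vector<TimelineEntry> timeline;
  if (phases.empty()) {
    return timeline;
  }
  const int64_t origin = phases.front().start_ns;
  for (const auto &phase : phases) {
    timeline.push_back(
        {phase.name, phase.start_ns - origin, phase.end_ns - phase.start_ns});
  }
  return timeline;
}
```

With only one phase exported per attempt, as before this fix, such a timeline would contain a single row; with all phases present it covers the whole attempt.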