Workflow message passing - Rust SDK
A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values.
Temporal Clients use messages to read Workflow state and control its execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Rust SDK.
Write message handlers
Follow these guidelines when writing your message handlers:
- Message handlers are defined as methods on your Workflow struct and registered with the Workflow runtime.
- The parameters and return values of handlers and the main Workflow function must be serializable.
- Prefer structs to multiple input parameters to allow for forward-compatible changes.
Query handlers
A Query is a synchronous operation that retrieves state from a Workflow Execution:
// workflows.rs
use std::collections::HashMap;
use temporalio_macros::{workflow, workflow_methods};
use temporalio_sdk::{WorkflowContext, WorkflowContextView, WorkflowResult};
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Language {
Chinese,
English,
French,
}
#[derive(serde::Serialize, serde::Deserialize)]
pub struct GetLanguagesInput {
pub include_unsupported: bool,
}
#[workflow(name = "greetings-workflow-10")]
pub struct GreetingsWorkflow {
pub greetings: HashMap<Language, String>,
pub input: GetLanguagesInput,
}
#[workflow_methods]
impl GreetingsWorkflow {
#[init]
fn new(_ctx: &WorkflowContextView) -> Self {
let mut greetings = HashMap::new();
greetings.insert(Language::Chinese, "你好,世界".to_string());
greetings.insert(Language::English, "Hello, world".to_string());
Self { greetings, input: GetLanguagesInput { include_unsupported: false } }
}
#[run]
pub async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
let name = ctx.state(|s| s.greetings.clone());
Ok(format!("Hola: {:?}", name))
}
#[query]
pub fn get_languages(&self, _ctx: &WorkflowContextView) -> Vec<Language> {
if self.input.include_unsupported {
vec![Language::Chinese, Language::English, Language::French]
} else {
self.greetings.keys().copied().collect()
}
}
}
// main.rs
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to local Temporal server
...
// Client setup
let connection = Connection::connect(connection_options).await?;
let client = Client::new(connection, ClientOptions::new("default").build())?;
let wf_handle = client.start_workflow(GreetingsWorkflow::run, (), WorkflowStartOptions::new("my-task-queue", "greetings-workflow-10").build()).await?;
// Set up Worker
...
let supported_languages = wf_handle.query(GreetingsWorkflow::get_languages, GetLanguagesInput { include_unsupported: true }, WorkflowQueryOptions::default()).await?;
// other Workflow stuff
...
Ok(())
}
- Query handlers can't mutate Workflow state.
- Query handlers can't perform async operations, like executing Activities.
Signal handlers
A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:
#[derive(serde::Serialize, serde::Deserialize)]
pub struct ApproveInput {
pub name: String,
}
// Other structs
...
#[workflow_methods]
impl GreetingsWorkflow {
#[init]
fn new(_ctx: &WorkflowContextView) -> Self {
let mut greetings = HashMap::new();
greetings.insert(Language::Chinese, "你好,世界".to_string());
greetings.insert(Language::English, "Hello, world".to_string());
Self {greetings,input:GetLanguagesInput{include_unsupported:false}, approved_for_release: false, approver_name: None }
}
// Other Workflow logic
...
#[signal]
pub fn approve(&mut self, _ctx: &mut SyncWorkflowContext<Self>, input: ApproveInput) {
self.approved_for_release = true;
self.approver_name = Some(input.name);
}
}
// main.rs
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to local Temporal server
...
// Client setup
let connection = Connection::connect(connection_options).await?;
let client = Client::new(connection, ClientOptions::new("default").build())?;
let wf_handle = client.start_workflow(GreetingsWorkflow::run, (), WorkflowStartOptions::new("my-task-queue", "greetings-workflow-10").build()).await?;
// Set up Worker
...
let supported_languages = wf_handle.query(GreetingsWorkflow::get_languages, GetLanguagesInput { include_unsupported: true }, WorkflowQueryOptions::default()).await?;
wf_handle.signal(GreetingsWorkflow::approve, ApproveInput { name: "Ziggy".to_string() }, WorkflowSignalOptions::default()).await?;
// other Workflow stuff
...
Ok(())
}
- Signal handlers do not return values.
- They can trigger async work (Activities, timers) depending on SDK capabilities.
Update handlers and validators
An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:
// workflows.rs
...
#[derive(serde::Serialize, serde::Deserialize)]
pub struct SetLanguageInput {
pub language: Language,
}
...
#[workflow_methods]
impl GreetingsWorkflow {
// Other Workflow logic
...
#[update]
pub fn set_language(
&mut self,
_ctx: &mut SyncWorkflowContext<Self>,
input: SetLanguageInput,
) -> Language {
let previous_language = self.language;
self.language = input.language;
previous_language
}
#[update_validator(set_language)]
fn validate_set_language(
&self,
_ctx: &WorkflowContextView,
input: &SetLanguageInput,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if !self.greetings.contains_key(&input.language) {
Err("Not a valid language".into())
} else {
Ok(())
}
}
}
- About validators:
- Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you can skip them.
- If you choose to create validators for your Updates, they will reject Updates before they're applied.
- Accepting and rejecting Updates with validators:
- To reject an Update, raise an exception of any type in the validator.
- Without a validator, Updates are always accepted.
- Validators and Event History:
- The
WorkflowExecutionUpdateAcceptedevent is written into the History whether the acceptance was automatic or programmatic. - When a Validator raises an error, the Update is rejected and
WorkflowExecutionUpdateAcceptedwon't be added to the Event History. The caller receives an "Update failed" error.
- The
- Update and Signal handlers can be async, letting you use Activities, Child Workflows, and more. See Async handlers for safe usage guidelines.
Send messages
To send Queries, Signals, or Updates, use a Workflow handle from the client:
let client = Client::new(connection, ClientOptions::new("default").build())?;
let wf_handle = client.start_workflow(
GreetingsWorkflow::run,
(),
WorkflowStartOptions::new("my-task-queue", "greetings-workflow-10").build()
).await?;
Send a Query
let supported_languages = wf_handle.query(
GreetingsWorkflow::get_languages,
GetLanguagesInput { include_unsupported: true },
WorkflowQueryOptions::default()
).await?;
- Sending a Query doesn’t add events to a Workflow's Event History.
- You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.
- A Worker must be online and polling the Task Queue to process a Query.
Send a Signal
You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.
From a Client
wf_handle.signal(
GreetingsWorkflow::approve,
ApproveInput { name: "Ziggy".to_string() },
WorkflowSignalOptions::default()
).await?;
- The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.
From a Workflow
let signal_res = ctx.signal_workflow(SignalWorkflowOptions::new(
"workflow-id-1",
"run-id-1",
"approve",
serde_json::to_vec(&ApproveInput {
name: "Ziggy".to_string(),
}),
)).await;
Signal-With-Start
let signal_input = Payloads {
payloads: vec![Payload::from("Ziggy".to_string())],
};
let wf_handle = client.start_workflow(
GreetingsWorkflow::run,
(),
WorkflowStartOptions::new("my-task-queue", "greetings-workflow-10")
.start_signal(
WorkflowStartSignal::new("approve")
.input(signal_input).build(),
).build(),
).await?;
Send an Update
An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.
A client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions.
WorkflowExecutionUpdateAcceptedis added to the Event History when the Worker confirms that the Update passed validation.WorkflowExecutionUpdateCompletedis added to the Event History when the Worker confirms that the Update has finished.
To send an Update to a Workflow Execution, you can call execute_update and wait for the Update to complete.
This code fetches an Update result:
let previous_language = wf_handle.execute_update(
GreetingsWorkflow::set_language,
SetLanguageInput { language: Language::French },
WorkflowExecuteUpdateOptions::default()
).await?;
Update-With-Start
You can also send start_update to receive an UpdateHandle as soon as the Update is accepted.
- Use this
UpdateHandlelater to fetch your results. - Async Update handlers normally perform long-running asynchronous operations, such as executing an Activity.
start_updateonly waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.
For example:
let update_handle = main_wf_handle.start_update(
GreetingsWorkflow::set_language,
SetLanguageInput { language: Language::French },
WorkflowStartUpdateOptions::default()
).await?;
- Updates are synchronous and return results.
- Worker must accept the Update before it proceeds.
Message handler patterns
Async handlers
Signal and Update handlers can be async fn as well as fn. Using async fn allows you to use await with Activities, Child Workflows, Timers, etc. This expands the possibilities for what can be done by a handler, but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls.
It's essential to understand the things that could go wrong in order to use async fn handlers safely. See Workflow message passing for guidance on safe usage of async Signal and Update handlers, the Safe message handlers sample and the sections below.
The following code executes an Activity that makes a network call to a remote service:
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ActivityLanguage {
Arabic,
Chinese,
English,
French,
Hindi,
Spanish,
}
pub struct MyActivities;
#[activities]
impl MyActivities {
#[activity]
pub async fn call_greeting_service(_ctx: ActivityContext, to_language: Language) -> Result<String, ActivityError> {
// Pretend that we are calling a remote service.
sleep(Duration::from_millis(200)).await;
let mut greetings = HashMap::new();
greetings.insert(Language::Arabic, "مرحبا بالعالم".to_string());
greetings.insert(Language::Chinese, "你好,世界".to_string());
greetings.insert(Language::English, "Hello, world".to_string());
greetings.insert(Language::French, "Bonjour, monde".to_string());
greetings.insert(Language::Hindi, "नमस्ते दुनिया".to_string());
greetings.insert(Language::Spanish, "Hola mundo".to_string());
let result = greetings.get(&to_language).cloned();
Ok(format!("Hello, {:?}!", result))
}
}
After updating the code to use an async fn, your Update handler can schedule an Activity and await the result. Although an async fn Signal handler can also execute an Activity, using an Update handler allows the client to receive a result or error once the Activity completes.
This lets your client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc. Here's how you could start that Activity from within your Workflow:
#[update]
async fn set_language_activity(
ctx: &mut WorkflowContext<Self>,
language: Language,
) -> Language {
let needs_greeting = ctx.state(|s| !s.greetings.contains_key(&language));
if needs_greeting {
ctx.wait_condition(|s| !s.approved_for_release).await;
ctx.state_mut(|s| {
s.approved_for_release = true;
});
let result = async {
let greeting = ctx.start_activity(
MyActivities::call_greeting_service,
Language::French,
ActivityOptions::default()
).await;
ctx.state_mut(|s| {
s.greetings.insert(language, greeting.unwrap());
});
}.await;
ctx.state_mut(|s| {
s.approved_for_release = false;
});
result;
}
let previous_language = ctx.state(|s| s.language);
ctx.state_mut(|s| {
s.language = language;
});
previous_language
}
Add wait conditions to block
Sometimes, async Signal or Update handlers need to meet certain conditions before they should continue. You can use ctx.wait_condition to prevent the code from proceeding until a condition is true. You specify the condition by passing a function that returns a boolean and you can optionally set a timeout. This is an important feature that helps you control your handler logic.
Here are three important use cases for ctx.wait_condition:
- Wait for a Signal or Update to arrive.
- Wait in a handler until it's appropriate to continue.
- Wait in the main Workflow until all active handlers have finished.
It's common to use ctx.condition to wait for a particular Signal or Update to be sent by a client. In your Workflow, you will have something like:
#[run]
pub async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
let name = ctx.state(|s| s.greetings.clone());
ctx.wait_condition(|s| !s.approved_for_release).await;
// Other Workflow logic
...
}
Ensure handlers finish before completion
Workflow wait conditions can ensure your handler completes before a Workflow finishes. When your Workflow uses async Signal or Update handlers, your main Workflow method can return or continue-as-new while a handler is still waiting on an async task, such as an Activity result.
The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying retrieve Update results. Use ctx.wait_condition to address this problem and allow your Workflow to end smoothly.