Skip to main content

Activity basics - Rust SDK

Develop a basic Activity

One of the primary things that Workflows do is orchestrate the execution of Activities. An Activity is a normal function or method execution that's intended to execute a single, well-defined action (either short or long-running), such as querying a database, calling a third-party API, or transcoding a media file.

An Activity can interact with the world outside the Temporal Platform or use a Temporal Client to interact with a Temporal Service. For the Workflow to be able to execute the Activity, you need to define the Activity Definition.

The #[activities] macro marks an impl block as containing Activity definitions. Each method decorated with #[activity] becomes an Activity that can be invoked from a Workflow.

Here's an example of an Activity:

use temporalio_sdk::activities::{ActivityContext, ActivityError};
use temporalio_macros::activities;

pub struct GreetingActivities;

#[activities]
impl GreetingActivities {
#[activity]
pub async fn greet(_ctx: ActivityContext, name: String) -> Result<String, ActivityError> {
Ok(format!("Hello, {}!", name))
}

#[activity]
pub async fn send_notification(_ctx: ActivityContext, message: String) -> Result<(), ActivityError> {
println!("Sending notification: {}", message);
Ok(())
}
}

Activity method signature requirements

Each Activity method must:

  • Be async (return a future)
  • Take ActivityContext as the first parameter
  • Return Result<T, ActivityError> where T is the return type
  • Be pub (public)

The ActivityContext parameter provides access to Activity execution information and capabilities like heartbeating. If you don't need it, you can use _ctx as a parameter name.

Define Activity parameters

There is no explicit limit to the total number of parameters that an Activity Definition may support. However, there is a limit to the total size of the data that ends up encoded into a gRPC message Payload.

A single argument is limited to a maximum size of 2 MB. And the total size of a gRPC message, which includes all the arguments, is limited to a maximum of 4 MB.

Also, keep in mind that all Payload data is recorded in the Workflow Execution Event History and large Event Histories can affect Worker performance.

We recommend that you use a single struct as an argument that wraps all the application data passed to Activities. This way you can change what data is passed to the Activity without breaking the function signature.

Activity parameters must be serializable and deserializable using serde. Use #[derive(Serialize, Deserialize)] on your data types:

use serde::{Serialize, Deserialize};
use temporalio_macros::activities;
use temporalio_sdk::activities::{ActivityContext, ActivityError};

#[derive(Serialize, Deserialize)]
pub struct GreetingInput {
pub greeting: String,
pub name: String,
}

pub struct GreetingActivities;

#[activities]
impl GreetingActivities {
#[activity]
pub async fn compose_greeting(
_ctx: ActivityContext,
input: GreetingInput,
) -> Result<String, ActivityError> {
Ok(format!("{} {}!", input.greeting, input.name))
}
}

Define Activity return values

All data returned from an Activity must be serializable.

Activity return values are subject to payload size limits in Temporal. The default payload size limit is 2MB, and there is a hard limit of 4MB for any gRPC message size in the Event History transaction. Keep in mind that all return values are recorded in a Workflow Execution Event History.

The return type of an Activity is Result<T, ActivityError>. The T type must implement Serialize. Use ActivityError::Retryable for errors that should be retried, and ActivityError::NonRetryable for permanent failures:

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProcessedData {
pub processed: String,
}

#[activities]
impl MyActivities {
#[activity]
pub async fn process_data(
_ctx: ActivityContext,
input: String,
) -> Result<ProcessedData, ActivityError> {
// If an error should be retried
if !validate_input(&input) {
return Err(ActivityError::Retryable {
source: "Invalid input format".into(),
explicit_delay: Some(Duration::from_secs(5))
});
}

// If an error should not be retried
if input.len() > 1000000 {
return Err(ActivityError::NonRetryable(
"Input too large".into()
));
}

let result = ProcessedData {
processed: input.to_uppercase(),
};

Ok(result)
}
}

Customize your Activity Type

Activities have a Type that refers to the Activity name. The Activity name is used to identify Activity Types in the Workflow Execution Event History, Visibility Queries, and Metrics.

By default, the Activity name is the method name. You can customize it by providing a name parameter to the #[activity] macro:

#[activities]
impl GreetingActivities {
#[activity(name = "compose_greeting")]
pub async fn greet(_ctx: ActivityContext, name: String) -> Result<String, ActivityError> {
Ok(format!("Hello, {}!", name))
}

#[activity(name = "send_email")]
pub async fn send_notification(_ctx: ActivityContext, message: String) -> Result<(), ActivityError> {
println!("Sending notification: {}", message);
Ok(())
}
}