Messages - Python SDK feature guide
This page shows how to do the following:
See Workflow message-passing for an introduction to using messages with Temporal Workflows and guidance on safe usage of async Signal and Update handlers.
Writing Query, Signal, and Update handlers
Here is an example Workflow Definition featuring Query, Signal, and Update handlers, and an Update validator. You can view this as a runnable sample at hello/hello_message_passing.py.
import asyncio
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
from temporalio import workflow
class Language(IntEnum):
Chinese = 1
English = 2
French = 3
Spanish = 4
Portuguese = 5
@dataclass
class GetLanguagesInput:
supported_only: bool
@dataclass
class ApproveInput:
name: str
@workflow.defn
class GreetingWorkflow:
def __init__(self) -> None:
self.approved_for_release = False
self.approver_name: Optional[str] = None
self.language = Language.English
self.greetings = {
Language.English: "Hello, world",
Language.Chinese: "你好,世界",
}
@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self.approved_for_release)
return self.greetings[self.language]
@workflow.query
def get_languages(self, input: GetLanguagesInput) -> list[Language]:
# A Query handler returns a value but must not mutate the Workflow state.
if input.supported_only:
return [lang for lang in Language if lang in self.greetings]
else:
return list(Language)
@workflow.signal
def approve(self, input: ApproveInput) -> None:
# A Signal handler mutates the Workflow state but cannot return a value.
self.approved_for_release = True
self.approver_name = input.name
@workflow.update
def set_language(self, language: Language) -> Language:
# An Update handler can mutate the Workflow state and return a value.
previous_language, self.language = self.language, language
return previous_language
@set_language.validator
def validate_language(self, language: Language) -> None:
if language not in self.greetings:
# In an Update validator you raise any exception to reject the Update.
raise ValueError(f"{language.name} is not supported")
@workflow.query
def get_language(self) -> Language:
return self.language
-
Arguments and return values of handlers (and the main Workflow function) must be serializable: a dataclass will often be the right choice. While multiple arguments are supported, it's recommended to use a single dataclass argument to which fields can be added as needed.
-
The argument and return types of the handler methods define the argument and return types that client code will use when sending the message (but Signals have no return type).
-
It's possible to write handler methods that take multiple arguments, but this is not recommended: instead use a single dataclass argument to which fields can be added/removed as needed.
-
The decorators can take arguments, for example to set the name to something other than the method name. See the API reference docs:
@workflow.query
,@workflow.signal
,workflow.update
.
Query handlers
-
A Query is a synchronous operation that's used to get the state of a Workflow Execution.
-
A Query handler is a plain
def
, not anasync def
: you can't do async operations such as executing an Activity in a Query handler.
Signal handlers
-
A Signal is a message sent asynchronously to a running Workflow Execution which can be used to change the state and control the flow of a Workflow Execution.
-
The handler should not return a value: it will be ignored (the sender of a signal gets a response immediately from the server, without waiting for the workflow to receive the signal).
-
Signal handlers can be
async def
and may use Activities, Child Workflows, durableasyncio.sleep(...)
timers, andworkflow.wait_condition(...)
conditions. See Async handlers below.
Update handlers and validators
-
An Update is a trackable request sent synchronously to a running Workflow Execution that can change the state and control the flow of a Workflow Execution, and return a result.
-
The sender of the request must wait until the Update is at least accepted or rejected by a Worker, and will often opt to wait further to receive the value returned by the Update handler, or an exception indicating what went wrong.
-
Validators are optional; if you don't want to be able to reject updates then you don't need a validator.
-
The name of the decorator you use to define the validator is based on the name that you give to the handler. The validator must take the same argument type as the handler, but always returns
None
. -
To reject an Update, you raise an exception (of any type) in the validator.
-
Update handlers can be
async def
and may use Activities, Child Workflows, durableasyncio.sleep(...)
timers, andworkflow.wait_condition(...)
conditions. See Async handlers below.
Sending messages
To send Queries, Signals, or Updates you call methods on a WorkflowHandle object.
To obtain the Workflow handle, you can:
- Use the start_workflow() to start a Workflow and return its handle.
- Use the get_workflow_handle_for() method to get a type-safe Workflow handle.
- Use the get_workflow_handle() method to get a non-type-safe Workflow handle.
For example:
client = await Client.connect("localhost:7233")
workflow_handle = await client.start_workflow(
GreetingWorkflow.run, id="greeting-workflow-1234", task_queue="my-task-queue"
)
Sending a Query
supported_languages = await workflow_handle.query(
GreetingWorkflow.get_languages, GetLanguagesInput(supported_only=True)
)
- Sending a Query does not add any events to the Workflow's Event History.
- You can send a Query to a Workflow Execution that has closed.
Sending a Signal
- A Signal can only be sent to a Workflow Execution that has not already closed.
- A Signal can be sent to a Workflow Execution from a Temporal Client or from another Workflow Execution.
Sending a Signal from a Client
await workflow_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
- The call will return as soon as the server accepts the Signal: it does not wait for the Signal to be delivered to the Workflow Execution.
- The WorkflowExecutionSignaled Event appears in the Workflow's Event History if the Server accepts the Signal.
Sending a Signal from a Workflow
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
In this case you need to obtain a Workflow handle for the external Workflow:
- Use
get_external_workflow_handle_for
to get a typed Workflow handle to an existing Workflow. - Use
get_external_workflow_handle
when you don't know the type of the other Workflow.
# ...
@workflow.defn
class WorkflowB:
@workflow.run
async def run(self) -> None:
handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
await handle.signal(WorkflowA.your_signal, "signal argument")
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Signal-With-Start
Signal-With-Start is sent by a Client.
To send a Signal-With-Start, use the start_workflow()
method and pass the start_signal
argument with the name of your Signal.
If there's a Workflow running with the given Workflow Id, the Signal will be sent to it. If there isn't, a new Workflow will be started and the Signal will be sent immediately on start.
from temporalio.client import Client
# ...
# ...
async def main():
client = await Client.connect("localhost:7233")
await client.start_workflow(
GreetingWorkflow.run,
id="your-signal-with-start-workflow",
task_queue="signal-tq",
start_signal="submit_greeting",
start_signal_args=["User Signal with Start"],
)
Sending an Update
- A client sending an Update must be prepared to wait until the Server has delivered the Update to the Worker: if the Worker is offline or too slow, the call will time out. If you want the server to send a response as soon as it receives your request, then you must use a Signal instead.
WorkflowExecutionUpdateAccepted
will be added to Event History when the Worker has responded to the Server that the Update passed validation.WorkflowExecutionUpdateCompleted
will be added to Event History when the Worker has responded to the Server that the Update has completed.
It is not yet possible to send an Update to another Workflow, or to do the Update equivalent of Signal-With-Start.
To send an Update to a Workflow Execution, you have two choices:
1. Use execute_update
to wait for the update to complete
execute_update
sends an Update and waits until it has been completed by a Worker. It returns the Update result:
previous_language = await workflow_handle.execute_update(
GreetingWorkflow.set_language, Language.Chinese
)
2. Use start_update
to receive a handle as soon as the update is accepted or rejected
start_update
sends an Update and waits until the Update has been accepted or rejected by a Worker. It returns an UpdateHandle
:
You would use this when the Update handler is an async def
and does some long-running async activity (i.e. it uses await
to wait for Activities, Child Workflows, timers, or wait_condition
), but you don't want to wait for all this activity to be finished; instead you want to only until the Worker has accepted or rejected the Update.
See the "Async handlers" section.
# Wait until the update is accepted
update_handle = await workflow_handle.start_update(
HelloWorldWorkflow.set_greeting,
HelloWorldInput("World"),
)
# Wait until the update is completed
update_result = await update_handle.result()
Exceptions
The following exceptions might be raised by execute_update
, or when calling update_handle.result()
on a handle obtained from start_update
:
-
temporalio.client.WorkflowUpdateFailedError
The Update was either rejected by the validator, or the Workflow author deliberately failed the Update by raising
ApplicationError
in the handler. -
temporalio.service.RPCError
"workflow execution already completed"`This will happen if the Workflow finished while the Update handler execution was in progress, for example because
- The Workflow was canceled or was deliberately failed (the Workflow author raised
ApplicationError
outside an Update handler) - The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
- The Workflow was canceled or was deliberately failed (the Workflow author raised
If the Workflow handle references a Workflow that doesn't exist then execute_update
and start_update
will both raise temporalio.service.RPCError "sql: no rows in result set"
.
Message handler patterns
Inject work into the main Workflow
TODO (delicate subject; probably make this non-blocking Public Preview )
Ensuring your messages are processed exactly once
TODO
Async handlers
In the example at the top of the page, all Signal and Update handlers were plain def
methods.
But handlers can also be async def
.
This allows them to use await
to wait for Activities, Child Workflows, asyncio.sleep(...)
timers, or workflow.wait_condition(...)
conditions, thus opening up many powerful possibilities.
However, this means that handler executions, and your main Workflow method, are all running concurrently, with switching occurring between them at await
calls.
(I.e. they interleave, but there is no parallelism.)
It's essential to understand the things that could go wrong in order to use async def
handlers safely.
See Controlling handler concurrency and Waiting for message handlers to finish below.
See also Workflow message-passing for guidance on safe usage of async Signal and Update handlers, and the Safe message handlers sample.
As an example of an async handler, in the following code sample the Update handler from above has been changed to be an async def
so that it can execute an Activity to make a network call to a remote service:
@activity.defn
async def call_greeting_service(to_language: Language) -> Optional[str]:
await asyncio.sleep(0.2) # Pretend that we are calling a remote service.
greetings = {
Language.Arabic: "مرحبا بالعالم",
Language.Chinese: "你好,世界",
Language.English: "Hello, world",
Language.French: "Bonjour, monde",
Language.Hindi: "नमस्ते दुनिया",
Language.Spanish: "Hola mundo",
}
return greetings.get(to_language)
@workflow.defn
class GreetingWorkflow:
...
@workflow.update
async def set_language(self, language: Language) -> Language:
if language not in self.greetings:
greeting = await workflow.execute_activity(
call_greeting_service,
language,
start_to_close_timeout=timedelta(seconds=10),
)
if greeting is None:
# An update validator cannot be async, so cannot be used to check that the remote
# call_greeting_service supports the requested language. Raising ApplicationError
# will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
# added to history.
raise ApplicationError(
f"Greeting service does not support {language.name}"
)
self.greetings[language] = greeting
previous_language, self.language = self.language, language
return previous_language
Waiting
await workflow.wait_condition(...)
is often useful in Temporal Workflow Definitions.
You can use it in a handler as long as it's an async def
:
@workflow.update
async def my_update(self, update_input: UpdateInput) -> str:
await workflow.wait_condition(
lambda: self.ready_for_update_to_execute(update_input)
)
Use asyncio.Lock
to prevent concurrent handler execution
Sometimes you may write code that is incorrect if multiple instances of a handler are in progress concurrently. Here's an example:
@workflow.defn
class MyWorkflow:
@workflow.signal
async def bad_async_handler(self):
data = await workflow.execute_activity(
fetch_data, start_to_close_timeout=timedelta(seconds=10)
)
self.x = data.x
# 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
# there may be times when the Workflow has self.x from one Activity execution and self.y from another.
await asyncio.sleep(1) # or await anything else
self.y = data.y
To make this safe, you can use asyncio.Lock
:
@workflow.defn
class MyWorkflow:
def __init__(self) -> None:
...
self.lock = asyncio.Lock()
...
@workflow.signal
async def safe_async_handler(self):
async with self.lock:
data = await workflow.execute_activity(
fetch_data, start_to_close_timeout=timedelta(seconds=10)
)
self.x = data.x
# OK: the scheduler may switch now to a different handler execution, or to the main workflow
# method, but no other execution of this handler can run until this execution finishes.
await asyncio.sleep(1) # or await anything else
self.y = data.y
Finishing handlers before the Workflow completes
If your Workflow has async def
Signal or Update handlers, then there is nothing to stop you allowing your main Workflow method to return or continue-as-new while a handler execution is waiting on an async task such as an Activity result.
However, this means that the handler may have been interrupted before it finished important work.
And if it's an Update handler, then the client will get an error when they try to retrieve their Update result.
Is this really what you want?
Or would it be more correct for your Workflow to wait for in-progress handlers to finish before allowing your main Workflow method to return?
You can do this using all_handlers_finished()
:
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> str:
...
await workflow.wait_condition(workflow.all_handlers_finished)
return "workflow-result"
By default, your Worker will log a warning if you allow your Workflow Execution to finish with unfinished handler executions.
You can silence these warnings on a per-handler basis by passing the unfinished_policy
argument to the @workflow.signal
/ workflow.update
decorator:
@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
async def my_update(self) -> None:
...
Dynamic Handlers
Temporal supports Dynamic Workflows, Activities, Signals, Updates, and Queries. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.
Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known at run time.
Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.
Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.
Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.
Set a Dynamic Signal, Query, or Update handler
A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same input is registered.
A Signal can be made dynamic by adding dynamic=True
to the @signal.defn
decorator.
The handler must accept self
, a string input, and a Sequence[temporalio.common.RawValue]
.
The payload_converter() function is used to convert a RawValue
object to the desired type.
For example:
from typing import Sequence
from temporalio.common import RawValue
...
@workflow.signal(dynamic=True)
async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
...
Set a Dynamic Workflow
How to set a Dynamic Workflow
A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered.
A Workflow can be made dynamic by adding dynamic=True
to the @workflow.defn
decorator.
You must register the Workflow with the Worker before it can be invoked.
The Workflow Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]
.
The payload_converter() function is used to convert a RawValue
object to the desired type.
# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
@workflow.run
async def run(self, args: Sequence[RawValue]) -> str:
name = workflow.payload_converter().from_payload(args[0].payload, str)
return await workflow.execute_activity(
default_greeting,
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
Set a Dynamic Activity
How to set a Dynamic Activity
A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered.
An Activity can be made dynamic by adding dynamic=True
to the @activity.defn
decorator.
You must register the Activity with the Worker before it can be invoked.
The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]
.
The payload_converter() function is used to convert a RawValue
object to the desired type.
# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
return (
f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
)
# ...
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
"unregistered_activity",
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)