This feature is not yet available in workflow-py. See our Roadmap for feature parity plans and Changelog for updates.

The Workflow client lets you interact with your workflow runs. It currently provides the basic functionality described in the sections below.

We are planning to add more functionality in the future. See the roadmap for more details.

Trigger Workflow

Using the trigger method, you can start a workflow run and get its run id.

import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" })
const { workflowRunId } = await client.trigger({
  url: "https://<YOUR_WORKFLOW_ENDPOINT>/<YOUR-WORKFLOW-ROUTE>",
  body: "hello there!",         // optional body
  headers: { ... },             // optional headers
  workflowRunId: "my-workflow", // optional workflow run id
  retries: 3,                   // optional retries in the initial request
  flowControl: {                // optional flow control
    key: "USER_GIVEN_KEY",
    ratePerSecond: 10,
    parallelism: 5
  }
})

console.log(workflowRunId)
// prints wfr_my-workflow

If the workflowRunId parameter isn't passed, a run id is generated randomly. If workflowRunId is passed, the wfr_ prefix is added to it.
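Because the prefix rule is fixed, you can compute the final run id before triggering, for example to store it next to your own records. The following is a minimal sketch: expectedRunId is a hypothetical helper that is not part of @upstash/workflow, and it assumes the prefix is always prepended exactly as described above.

```typescript
// Hypothetical helper, not part of the SDK.
// Assumes the client always prepends "wfr_" to a user-supplied id.
const RUN_ID_PREFIX = "wfr_";

function expectedRunId(customId: string): string {
  return `${RUN_ID_PREFIX}${customId}`;
}
```

With the trigger call above, expectedRunId("my-workflow") would match the printed workflowRunId.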

For other alternatives of starting a workflow, see the documentation on starting a workflow run.

Get Workflow Logs

Using the logs method, you can query the List Workflow Runs API:

import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" })
const { runs, cursor } = await client.logs({
  // Id of the workflow run to get
  workflowRunId,
  // Number of workflows to get
  count,
  // Workflow state to filter for.
  // One of "RUN_STARTED", "RUN_SUCCESS", "RUN_FAILED", "RUN_CANCELED"
  state,
  // Workflow url to search for. should be an exact match
  workflowUrl,       
  // Unix timestamp when the run was created
  workflowCreatedAt, 
  // Cursor from a previous request to continue the search
  cursor             
})

All the parameters above are optional.

The response includes cursor and runs. Using the cursor, you can continue the search later.
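To read every matching run, you can keep passing the returned cursor back until none is returned. The sketch below is one way to do that. LogsSource and collectAllRuns are hypothetical names introduced for illustration, the types are deliberately narrower than the SDK's own, and the loop assumes that a missing or empty cursor means there are no more pages.

```typescript
// Minimal structural types for this sketch; the real SDK types are richer.
interface LogsPage<Run> {
  runs: Run[];
  cursor?: string;
}

interface LogsSource<Run> {
  logs(params: { cursor?: string; count?: number }): Promise<LogsPage<Run>>;
}

// Follows the cursor until the API stops returning one.
// Assumption: a missing or empty cursor means there are no more pages.
async function collectAllRuns<Run>(
  source: LogsSource<Run>,
  pageSize = 100,
  maxPages = 50, // safety cap so an unexpected cursor cannot loop forever
): Promise<Run[]> {
  const all: Run[] = [];
  let cursor: string | undefined = undefined;

  for (let page = 0; page < maxPages; page++) {
    const result: LogsPage<Run> = await source.logs({ cursor, count: pageSize });
    all.push(...result.runs);
    if (!result.cursor) break; // no further pages
    cursor = result.cursor;
  }
  return all;
}
```

A client created as in the example above should be usable as the source argument, provided its logs method accepts these parameters. The maxPages cap is a design choice of this sketch, not a limit imposed by the API.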

runs will be a list of runs. Each run has these fields:

  • workflowRunId: ID of the workflow run
  • workflowUrl: URL of the workflow
  • workflowState: State of the workflow run
  • workflowRunCreatedAt: Time when the workflow run started (as a Unix timestamp)
  • workflowRunCompletedAt: If the run has completed, the time when it completed (as a Unix timestamp)
  • failureFunction: Information on the message published when the workflow failed and failureUrl or failureFunction was called
  • steps: List of steps showing the progress of the workflow
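As an illustration of working with these fields, the two timestamps can be combined to get how long a run took. This is a sketch under assumptions: RunTimes and runDuration are hypothetical names, and the code assumes workflowRunCompletedAt is simply absent while a run is still in progress. The result is in whatever unit the timestamps use.

```typescript
// Only the two timestamp fields from the list above are needed here.
interface RunTimes {
  workflowRunCreatedAt: number;
  workflowRunCompletedAt?: number;
}

// Returns the elapsed time in the same unit as the timestamps,
// or undefined if the run has not completed yet.
function runDuration(run: RunTimes): number | undefined {
  if (run.workflowRunCompletedAt === undefined) return undefined;
  return run.workflowRunCompletedAt - run.workflowRunCreatedAt;
}
```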

The steps field contains a list of steps. Each entry is in one of three formats; the single format looks like this:

{
  type: "single";
  // a single step in an array
  steps: [
    {
      // step info...
    }
  ]
}
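Given that shape, the individual steps of all single entries can be pulled out with a small filter. The sketch below covers only the single format shown above; StepGroup and singleSteps are hypothetical names, and the code assumes that every entry carries a type field, whichever format it uses.

```typescript
// Covers only the "single" format shown above.
// Assumption: every entry has a `type` field; other formats are ignored.
interface StepGroup<Step> {
  type: string;
  steps?: Step[];
}

// Returns the steps of every "single" entry, preserving order.
function singleSteps<Step>(groups: StepGroup<Step>[]): Step[] {
  const result: Step[] = [];
  for (const group of groups) {
    if (group.type === "single" && group.steps) {
      result.push(...group.steps);
    }
  }
  return result;
}
```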

Cancel Workflow Runs

There are multiple ways you can cancel workflow runs:

  • pass one or more workflow run ids to cancel them
  • pass a workflow url to cancel all runs starting with this url
  • cancel all pending or active workflow runs

Cancel a set of workflow runs

// cancel a single workflow
await client.cancel({ ids: "<WORKFLOW_RUN_ID>" });

// cancel a set of workflow runs
await client.cancel({ ids: ["<WORKFLOW_RUN_ID_1>", "<WORKFLOW_RUN_ID_2>"] });

Cancel workflows starting with a url

If you have an endpoint called https://your-endpoint.com and you want to cancel all workflow runs on it, you can use urlStartingWith.

Note that this will cancel workflows in all endpoints under https://your-endpoint.com.

await client.cancel({ urlStartingWith: "https://your-endpoint.com" });

Cancel all workflows

To cancel all pending and currently running workflow runs, pass all: true:

await client.cancel({ all: true });

Notify Waiting Workflow

To notify a workflow waiting for an event, you can use the notify method:

import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });
await client.notify({
  eventId: "my-event-id",
  eventData: "my-data", // data passed to the workflow run
});

The data passed in eventData will be available to the workflow run in the eventData field returned by the context.waitForEvent method.

Get Waiters of Event

To get the list of waiters for some event id, you can use the getWaiters method:

import { Client } from "@upstash/workflow";

const client = new Client({ token: "<QSTASH_TOKEN>" });
const result = await client.getWaiters({
  eventId: "my-event-id",
});

The result will be a list of Waiter objects.
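The getWaiters and notify methods can be combined, for example to skip a notification when nothing is waiting for the event. The following is a minimal sketch: EventClient and notifyIfWaiting are hypothetical names, and the interface lists only the two calls used above with simplified parameter and return types.

```typescript
// Structural type with just the two methods this sketch needs.
// Parameter and return types are simplified assumptions.
interface EventClient {
  getWaiters(params: { eventId: string }): Promise<unknown[]>;
  notify(params: { eventId: string; eventData?: unknown }): Promise<unknown>;
}

// Sends the notification only if at least one workflow run is waiting.
// Returns true when notify was called.
async function notifyIfWaiting(
  client: EventClient,
  eventId: string,
  eventData?: unknown,
): Promise<boolean> {
  const waiters = await client.getWaiters({ eventId });
  if (waiters.length === 0) return false;
  await client.notify({ eventId, eventData });
  return true;
}
```

The check and the notification are two separate requests, so a run that starts waiting between them is missed. Treat this as a convenience, not a guarantee.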