
The py> operator runs a Python script using the python command. This feature is called "Custom Scripts" in Treasure Workflow.

See the Python API documents for details, including variable mappings to keyword arguments.

+step1:
  py>: my_step1_method
+step2:
  py>: tasks.MyWorkflow.step2
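
As a rough illustration of that variable mapping, workflow variables whose names match a method's keyword arguments are passed in automatically, and the digdag module available inside py> tasks can store variables for downstream tasks. The sketch below assumes that behavior; the method body and the stored row_count variable are hypothetical:

```python
# tasks.py  (illustrative sketch)
import digdag


class MyWorkflow(object):
    def step2(self, session_date):
        # 'session_date' is filled in from the workflow variable of the same name
        print(f"processing {session_date}")
        # store a value so that later tasks can reference ${row_count}
        digdag.env.store({"row_count": 42})
```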

Secrets

If you don't know how to set secrets, refer to Managing Workflow Secret.

  • aws.s3.region, aws.region

    An optional explicit AWS Region in which to access S3. Default is us-east-1.

  • aws.s3.access_key_id, aws.access_key_id

    The AWS Access Key ID to use when accessing S3. When using s3_credential_provider: assume_role, this is not required.

  • aws.s3.secret_access_key, aws.secret_access_key

    The AWS Secret Access Key to use when accessing S3. When using s3_credential_provider: assume_role, this is not required.
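
As a rough illustration, these secrets can be registered with the digdag CLI as shown below (a sketch only; the project name is hypothetical and the exact flags may differ from your setup — on Treasure Workflow, follow Managing Workflow Secret instead):

```
digdag secrets --project my_project --set aws.s3.region=us-east-1
digdag secrets --project my_project --set aws.s3.access_key_id=...
digdag secrets --project my_project --set aws.s3.secret_access_key=...
```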

Options

  • py>: [PACKAGE.CLASS.]METHOD

    Name of a method to run.

    Examples:

    # sample.dig
    py>: tasks.MyWorkflow.my_task

  This example assumes the following directory structure:

  .
  ├── sample.dig
  └── tasks
      └── __init__.py


  You can write `__init__.py` like this:

  ```python
  # __init__.py
  class MyWorkflow(object):
      def my_task(self):
          print("awesome execution")
  ```

Or, you can put a Python script named tasks.py in the same directory as the dig file.

  .
  ├── sample.dig
  └── tasks.py

Here is an example of tasks.py:

# tasks.py
class MyWorkflow(object):
    def my_task(self):
        print("awesome execution")

You can also write a plain function without creating a class, as follows:

# simple_sample.dig
py>: simple_tasks.my_func

  .
  ├── simple_sample.dig
  └── simple_tasks.py

# simple_tasks.py
def my_func():
    print("simple execution")

You can pass arguments to the class for initialization by defining them under the py>: operator as follows:

# sample.dig
+some_task:
  py>: tasks.MyWorkflow.my_task
  required1_1: awesome execution
  required1_2: "awesome execution"
  required2: {a: "a"}
  required3: 1
  required4: 1.0
  required5: [a, 1, 1.0, "a"]

You can also do the same thing using _export, as follows:

# sample.dig
+some_task:
  _export:
    required1_1: awesome execution
    required1_2: "awesome execution"
    required2: {a: "a"}
    required3: 1
    required4: 1.0
    required5: [a, 1, 1.0, "a"]
  py>: tasks.MyWorkflow.my_task

This example assumes the following Python script:

# tasks.py
from typing import Union


class MyWorkflow(object):
    def __init__(
      self,
      required1_1: str,
      required1_2: str,
      required2: dict[str, str],
      required3: int,
      required4: float,
      required5: list[Union[str, int, float]]
    ):
        print(f"{required1_1} same as {required1_2}")
        self.arg2 = required2
        print(f"{float(required3)} same as {required4}")
        self.arg5 = required5

    def my_task(self):
        pass

Or, you can pass arguments to the function as follows:

# sample.dig
+some_task:
  py>: simple_tasks.my_func
  required1: simple execution
  required2: {a: "a"}

# simple_sample.dig
+some_task:
  _export:
    required1: simple execution
    required2: {a: "a"}
  py>: simple_tasks.my_func

# simple_tasks.py
def my_func(required1: str, required2: dict[str, str]):
    print(f"{required1}: {required2}")

Finally, you can pass a combination of class and method arguments (they must have different names) to the Python script as follows:

# sample.dig
+some_task:
  py>: tasks.MyWorkflow.my_task
  required_class_arg: awesome execution
  required_method_arg: ["a", "b"]

# sample.dig
+some_task:
  _export:
    required_class_arg: awesome execution
    required_method_arg: ["a", "b"]
  py>: tasks.MyWorkflow.my_task

# tasks.py
class MyWorkflow:
    def __init__(self, required_class_arg: str):
        self.arg = required_class_arg

    def my_task(self, required_method_arg: list[str]):
        print(f"{self.arg}: {required_method_arg}")

  • python: PATH STRING or COMMAND ARGUMENTS LIST

    The Python interpreter defaults to python. If an alternate interpreter or additional options are desired, use the python option.

    Examples:

    python: /opt/conda/bin/python

  ```yaml
  python: ["python", "-v"]
  ```

It is also possible to configure this in the _export section.

Examples:

_export:
  py:
    python: /opt/conda/bin/python

S3 Operations

The py> operator supports S3 file operations (s3_get and s3_put) with both access key and assume role credential providers.

S3 Options

  • s3_get: LIST

    List of S3 objects to download before executing the Python script. Each item should specify from (S3 path) and to (local path).

    The recursive option can be used to download all files in a directory.

    Examples:

    py>: tasks.MyWorkflow.my_task
    s3_get:
      - from: my-bucket/data/input-data.csv
        to: tmp/data/input.csv
      - from: my-bucket/data/config.json
        to: tmp/data/config.json
      - from: my-bucket/scripts/
        to: tmp/scripts/
        recursive: true
  • s3_put: LIST

    List of local files to upload to S3 after executing the Python script. Each item should specify from (local path) and to (S3 path).

    The recursive option can be used to upload all files in a directory.

    Examples:

    py>: tasks.MyWorkflow.my_task
    s3_put:
      - from: tmp/output/result.csv
        to: my-bucket/results/result.csv
      - from: tmp/logs/
        to: my-bucket/logs/
        recursive: true
  • s3_credential_provider: NAME

    The credential provider to use for S3 operations. Supported values are access_key (default) and assume_role.

    Examples:

    py>: tasks.MyWorkflow.my_task
    s3_credential_provider: assume_role
    s3_assume_role_authentication_id: ${auth_id}
    s3_region: us-east-1
    s3_get: ...
    s3_put: ...
  • s3_assume_role_authentication_id: NUMBER

    The authentication ID for assume role when using s3_credential_provider: assume_role. This corresponds to the TD Data Connector configuration.

    How to get the authentication_id is described in Reusing the existing Authentication.

  • s3_region: REGION

    AWS region for S3 operations. Default is us-east-1.
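
Putting the S3 options together, a workflow that downloads an input file, processes it in Python, and uploads the result might look like the following. This is a sketch only; the bucket, paths, and task names are hypothetical:

```yaml
# s3_sample.dig
+process:
  py>: tasks.MyWorkflow.my_task
  s3_credential_provider: access_key
  s3_region: us-east-1
  s3_get:
    - from: my-bucket/data/input.csv
      to: tmp/data/input.csv
  s3_put:
    - from: tmp/output/result.csv
      to: my-bucket/results/result.csv
```

A hypothetical tasks.py then works only with the local paths:

```python
# tasks.py  (illustrative sketch)
import csv
import os


class MyWorkflow(object):
    def my_task(self):
        os.makedirs("tmp/output", exist_ok=True)
        # read the file that s3_get downloaded before this task ran
        with open("tmp/data/input.csv") as f:
            rows = list(csv.reader(f))
        # write the file that s3_put will upload after this task finishes
        with open("tmp/output/result.csv", "w", newline="") as f:
            csv.writer(f).writerows(rows)
```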