The `py>` operator runs a Python script using the `python` command. This feature is called "Custom Scripts" in Treasure Workflow.
See the Python API documents for details, including how workflow variables are mapped to keyword arguments.
```yaml
+step1:
  py>: my_step1_method
+step2:
  py>: tasks.MyWorkflow.step2
```
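For orientation, `tasks.MyWorkflow.step2` refers to a method defined on a class inside a `tasks` package, along the lines of the sketch below (the method body is illustrative; the exact file layout conventions are covered in the examples further down this page).

```python
# tasks/__init__.py (sketch)
class MyWorkflow(object):
    def step2(self):
        # The body is up to you; printed output appears in the task log
        print("running step2")
```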
If you do not know how to set secrets, refer to Managing Workflow Secret.
aws.s3.region, aws.region
An optional explicit AWS Region in which to access S3. Default is us-east-1.
aws.s3.access_key_id, aws.access_key_id
The AWS Access Key ID to use when accessing S3. When using `s3_credential_provider: assume_role`, this is not required.
aws.s3.secret_access_key, aws.secret_access_key
The AWS Secret Access Key to use when accessing S3. When using `s3_credential_provider: assume_role`, this is not required.
py>: [PACKAGE.CLASS.]METHOD
Name of a method to run.
Examples:
```yaml
# sample.dig
py>: tasks.MyWorkflow.my_task
```
This example assumes the following directory structure:
```
.
├── sample.dig
└── tasks
    └── __init__.py
```
You can write `__init__.py` like:
```python
# __init__.py
class MyWorkflow(object):
    def my_task(self):
        print("awesome execution")
```
Or, you can put a Python script named `tasks.py` in the same directory as the dig file:
```
.
├── sample.dig
└── tasks.py
```
Here is an example of `tasks.py`:
```python
# tasks.py
class MyWorkflow(object):
    def my_task(self):
        print("awesome execution")
```
You can also write a plain function without creating a class, as follows:
```yaml
# simple_sample.dig
py>: simple_tasks.my_func
```
```
.
├── simple_sample.dig
└── simple_tasks.py
```
```python
# simple_tasks.py
def my_func():
    print("simple execution")
```
You can pass arguments to the class for initialization by defining them under the `py>` operator, as follows:
```yaml
# sample.dig
+some_task:
  py>: tasks.MyWorkflow.my_task
  required1_1: awesome execution
  required1_2: "awesome execution"
  required2: {a: "a"}
  required3: 1
  required4: 1.0
  required5: [a, 1, 1.0, "a"]
```
Also, you can do the same thing using `_export`, as follows:
```yaml
# sample.dig
+some_task:
  _export:
    required1_1: awesome execution
    required1_2: "awesome execution"
    required2: {a: "a"}
    required3: 1
    required4: 1.0
    required5: [a, 1, 1.0, "a"]
  py>: tasks.MyWorkflow.my_task
```
This example assumes the following Python script:
```python
# tasks.py
from typing import Union


class MyWorkflow(object):
    def __init__(
        self,
        required1_1: str,
        required1_2: str,
        required2: dict[str, str],
        required3: int,
        required4: float,
        required5: list[Union[str, int, float]]
    ):
        print(f"{required1_1} same as {required1_2}")
        self.arg2 = required2
        print(f"{float(required3)} same as {required4}")
        self.arg5 = required5

    def my_task(self):
        pass
```
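For reference, the parameter mapping above is roughly equivalent to constructing the class yourself with keyword arguments; the operator performs the instantiation and the method call for you, so the snippet below is illustrative only.

```python
# Roughly what the py> operator does with the parameters above (illustrative only)
workflow = MyWorkflow(
    required1_1="awesome execution",
    required1_2="awesome execution",
    required2={"a": "a"},
    required3=1,
    required4=1.0,
    required5=["a", 1, 1.0, "a"],
)
workflow.my_task()
```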
Or, you can pass arguments to a function, as follows:
```yaml
# sample.dig
+some_task:
  py>: simple_tasks.my_func
  required1: simple execution
  required2: {a: "a"}
```
```yaml
# simple_sample.dig
+some_task:
  _export:
    required1: simple execution
    required2: {a: "a"}
  py>: simple_tasks.my_func
```
```python
# simple_tasks.py
def my_func(required1: str, required2: dict[str, str]):
    print(f"{required1}: {required2}")
```
Finally, you can pass a combination of class and method arguments (they must have different names) to the Python script, as follows:
```yaml
# sample.dig
+some_task:
  py>: tasks.MyWorkflow.my_task
  required_class_arg: awesome execution
  required_method_arg: ["a", "b"]
```
```yaml
# sample.dig
+some_task:
  _export:
    required_class_arg: awesome execution
    required_method_arg: ["a", "b"]
  py>: tasks.MyWorkflow.my_task
```
```python
# tasks.py
class MyWorkflow:
    def __init__(self, required_class_arg: str):
        self.arg = required_class_arg

    def my_task(self, required_method_arg: list[str]):
        print(f"{self.arg}: {required_method_arg}")
```
python: PATH STRING or COMMAND ARGUMENTS LIST
The Python interpreter defaults to `python`. If an alternate interpreter and options are desired, use the `python` option.
Examples:
```yaml
python: /opt/conda/bin/python
```
```yaml
python: ["python", "-v"]
```
It is also possible to configure this in the `_export` section.
Examples:
```yaml
_export:
  py:
    python: /opt/conda/bin/python
```
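If you want to confirm which interpreter the operator actually launched, printing `sys.executable` from a task is a simple check. The sketch below assumes a hypothetical `check_python` method; it is not part of the operator itself.

```python
# tasks.py (sketch): report the interpreter running this task
import sys


class MyWorkflow(object):
    def check_python(self):
        # With python: /opt/conda/bin/python configured, this should print that path
        print(sys.executable)
```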
The `py>` operator supports S3 file operations (`s3_get` and `s3_put`) with both access key and assume role credential providers.
s3_get: LIST
List of S3 objects to download before executing the Python script. Each item should specify `from` (S3 path) and `to` (local path). The `recursive` option can be used to download all files in a directory.
Examples:
```yaml
py>: tasks.MyWorkflow.my_task
s3_get:
  - from: my-bucket/data/input-data.csv
    to: tmp/data/input.csv
  - from: my-bucket/data/config.json
    to: tmp/data/config.json
  - from: my-bucket/scripts/
    to: tmp/scripts/
    recursive: true
```
s3_put: LIST
List of local files to upload to S3 after executing the Python script. Each item should specify `from` (local path) and `to` (S3 path). The `recursive` option can be used to upload all files in a directory.
Examples:
```yaml
py>: tasks.MyWorkflow.my_task
s3_put:
  - from: tmp/output/result.csv
    to: my-bucket/results/result.csv
  - from: tmp/logs/
    to: my-bucket/logs/
    recursive: true
```
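Files listed under `s3_get` are downloaded before your method runs, and files listed under `s3_put` are uploaded after it finishes, so the script itself only deals with local paths. The sketch below ties together the paths used in the examples above; the method body is an assumption, not part of the operator.

```python
# tasks.py (sketch): consume the file fetched by s3_get and produce a file for s3_put
import csv
import os


class MyWorkflow(object):
    def my_task(self):
        # s3_get downloaded this file before the script started
        with open("tmp/data/input.csv", newline="") as f:
            rows = list(csv.reader(f))

        # Write a result that s3_put will upload after the script finishes
        os.makedirs("tmp/output", exist_ok=True)
        with open("tmp/output/result.csv", "w", newline="") as f:
            csv.writer(f).writerows(rows)
```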
s3_credential_provider: NAME
The credential provider to use for S3 operations. Supported values are `access_key` (default) and `assume_role`.
Examples:
```yaml
py>: tasks.MyWorkflow.my_task
s3_credential_provider: assume_role
s3_assume_role_authentication_id: ${auth_id}
s3_region: us-east-1
s3_get: ...
s3_put: ...
```
s3_assume_role_authentication_id: NUMBER
The authentication ID for assume role when using `s3_credential_provider: assume_role`. This corresponds to the TD Data Connector configuration.
How to get the `authentication_id` is described in Reusing the existing Authentication.
s3_region: REGION
AWS region for S3 operations. Default is us-east-1.