Brandon Peebles
03/07/2024, 5:11 AM
Scout
03/07/2024, 5:11 AM
```python
from dagster import ResourceDefinition, job, op

# Define a resource that you want to pass to your ops
class MyResource:
    def __init__(self, resource_value):
        self.resource_value = resource_value

# Define the op factory function
def my_op_factory(string_var, resource_key):
    # Give each generated op a unique name; two different op definitions
    # that share the name "my_op" cannot be used in the same job
    @op(name=f"my_op_{string_var}", required_resource_keys={resource_key})
    def my_op(context):
        # Use the string_var and resource within the op
        context.log.info(f"String variable: {string_var}")
        # context.resources exposes resources as attributes, so look up the dynamic key with getattr
        resource = getattr(context.resources, resource_key)
        context.log.info(f"Resource value: {resource.resource_value}")
        # Your op logic goes here

    return my_op

# Create two different resource instances
resource_1 = ResourceDefinition.hardcoded_resource(MyResource("resource_1_value"), "resource_1")
resource_2 = ResourceDefinition.hardcoded_resource(MyResource("resource_2_value"), "resource_2")

# Instantiate the op with different configurations
my_op_instance_1 = my_op_factory("string_1", "resource_1")
my_op_instance_2 = my_op_factory("string_2", "resource_2")

# Define a job that uses the two op instances with their respective resources
@job(resource_defs={"resource_1": resource_1, "resource_2": resource_2})
def my_job():
    my_op_instance_1()
    my_op_instance_2()

# Now you can execute the job (for example with my_job.execute_in_process()),
# and it will run both ops with their specific configurations and resources
```
In this example, `my_op_factory` is a function that creates a new instance of an op with a given string variable and a resource key. The `required_resource_keys` parameter in the `@op` decorator specifies which resources the op requires. We then create two different resources, `resource_1` and `resource_2`, each with its own value. We instantiate the op twice using `my_op_factory`, passing in different string variables and resource keys for each instance. Finally, we define a job that includes both op instances and specifies the resources to use for each one. When you run `my_job`, Dagster will execute both instances of the op, each with its own string variable and resource.
Brandon Peebles
03/07/2024, 5:14 AM
Scout
03/07/2024, 5:14 AM
```python
from dagster import Config, asset, resource

# Define a resource with a simple configuration schema
@resource(config_schema={"resource_param": str})
def my_resource(context):
    return context.resource_config["resource_param"]

# Define a Pythonic configuration class for your asset
class MyAssetConfig(Config):
    my_param: str
```
Next, we'll create an asset factory function that takes an asset name and a resource key. The per-asset values come from run config through `MyAssetConfig`, so the factory itself only needs the name and the resource key:
```python
# Define the asset factory function
def my_asset_factory(name: str, resource_key: str):
    # A unique name per asset; two assets both called "my_asset" would collide
    @asset(name=name, required_resource_keys={resource_key})
    def my_asset(context, config: MyAssetConfig):
        # Use the config and resource within the asset
        context.log.info(f"Config parameter: {config.my_param}")
        # context.resources exposes resources as attributes, so use getattr for a dynamic key
        context.log.info(f"Resource value: {getattr(context.resources, resource_key)}")
        # Your asset logic goes here

    return my_asset
```
Now, let's instantiate the asset twice. Assets are not called inside a `@job` body the way ops are; instead, `define_asset_job` selects them by key, and a `Definitions` object binds the resource:
```python
from dagster import Definitions, define_asset_job

# Instantiate the asset under two names; each name is also its run config key
my_asset_instance_1 = my_asset_factory("my_asset_instance_1", "my_resource")
my_asset_instance_2 = my_asset_factory("my_asset_instance_2", "my_resource")

# Define a job that selects the two asset instances
my_job = define_asset_job("my_job", selection=["my_asset_instance_1", "my_asset_instance_2"])

defs = Definitions(
    assets=[my_asset_instance_1, my_asset_instance_2],
    jobs=[my_job],
    resources={"my_resource": my_resource},
)
```
Finally, you can execute the job in process with specific run configurations for each asset:
```python
# Resolve the job from the Definitions and execute it with per-asset config
result = defs.get_job_def("my_job").execute_in_process(
    run_config={
        "resources": {
            "my_resource": {"config": {"resource_param": "resource_value"}}
        },
        "ops": {
            "my_asset_instance_1": {"config": {"my_param": "string_1"}},
            "my_asset_instance_2": {"config": {"my_param": "string_2"}},
        },
    },
)
```
In this example, `my_asset_factory` is a function that creates a new asset with a given name and resource key. The `required_resource_keys` parameter in the `@asset` decorator specifies which resources the asset requires, and the `config: MyAssetConfig` parameter declares the config each asset expects. We then create two instances of the asset, each with its own name and therefore its own entry under `"ops"` in the run config.
When you run `my_job` with the provided `run_config`, Dagster will execute both instances of the asset, each with its own configuration and the shared resource.
Brandon Peebles
03/07/2024, 5:39 AM
Scout
03/07/2024, 5:39 AM
```python
from dagster import StringSource, resource

@resource(config_schema={
    "account": StringSource,
    "user": StringSource,
    "password": StringSource,
    "database": StringSource,
    "schema": StringSource,
})
def snowflake_resource(context):
    # Here you would create and return a connection object to Snowflake
    # using the provided configuration.
    pass

# Define resources for each Snowflake database.
# StringSource also accepts {"env": "VAR_NAME"}, which keeps passwords out of code.
snowflake_db1_resource = snowflake_resource.configured({
    "account": "account1",
    "user": "user1",
    "password": "password1",
    "database": "database1",
    "schema": "schema1",
})
snowflake_db2_resource = snowflake_resource.configured({
    "account": "account2",
    "user": "user2",
    "password": "password2",
    "database": "database2",
    "schema": "schema2",
})
```
2. Single Snowflake Resource with Dynamic Configuration: If you frequently switch between accounts and databases or have a large number of them, it might be more efficient to have a single Snowflake resource that you configure dynamically at runtime. This approach is more flexible and allows you to pass the necessary configuration parameters when you launch a run.
```python
from dagster import StringSource, job, op, resource

@resource(config_schema={
    "account": StringSource,
    "user": StringSource,
    "password": StringSource,
    "database": StringSource,
    "schema": StringSource,
})
def snowflake_resource(context):
    # Here you would create and return a connection object to Snowflake
    # using the provided configuration.
    pass

@op(required_resource_keys={"snowflake"})
def my_op(context):
    # Use the Snowflake resource to perform operations
    pass

@job(resource_defs={"snowflake": snowflake_resource})
def my_job():
    my_op()

# Execute the job in process with specific configurations for the Snowflake resource
result = my_job.execute_in_process(
    run_config={
        "resources": {
            "snowflake": {
                "config": {
                    "account": "account1",
                    "user": "user1",
                    "password": "password1",
                    "database": "database1",
                    "schema": "schema1",
                }
            }
        }
    },
)
```
Scout
03/07/2024, 5:40 AM
Brandon Peebles
03/07/2024, 5:43 AM
Scout
03/07/2024, 5:43 AM
`snowflake_op_factory` is a function that creates a new instance of an op configured to connect to a specific Snowflake database. The `snowflake_resource` is defined with a dynamic configuration schema that allows you to specify the connection details at runtime. When you execute `my_job`, you provide a run configuration that includes the connection details for each database. The `snowflake_op_db1` and `snowflake_op_db2` instances are configured to run queries on "database1" and "database2", respectively. This pattern lets you generate multiple op instances with different configurations while sharing the same underlying resource definition, so one op definition can be reused across different environments or databases.