Jacob Marcil
03/06/2024, 7:36 PM
Scout
03/06/2024, 7:36 PM
`config_mapping`, I can explain how you would typically approach this based on the concepts of assets, ops, and configuration.
Firstly, `config_mapping` is a feature that allows you to transform the configuration provided to a job or graph into the configuration for the ops within it. This can be useful when you want to provide a simplified configuration interface for your job or graph, or when you want to generate configuration dynamically based on some logic.
Here's a conceptual example based on the documentation's guidance:
1. Define your ops, each with its own configuration schema.
2. Create a graph that includes these ops.
3. Define a `config_mapping` for the graph that takes an external configuration and maps it to the configurations of the individual ops.
4. Use `@graph_asset` to turn the graph into an asset.
Here's a simplified example:
python
from dagster import ConfigMapping, graph, graph_asset, op

# Define your ops with config schemas
@op(config_schema={"param": str})
def op_with_config(context):
    # Access the op's config using context.op_config
    param = context.op_config["param"]
    context.log.info(f"Op config: {param}")
    return param

@op
def process_data(data):
    # Process the data in some way
    return data.upper()

# Define a config mapping function for the graph; the returned dict is keyed
# by the names of the ops inside the graph
def my_graph_config_mapping(config):
    return {"op_with_config": {"config": {"param": config["param"]}}}

# Create a graph that includes these ops and attach the config mapping to it
@graph(
    config=ConfigMapping(
        config_fn=my_graph_config_mapping,
        config_schema={"param": str},
    )
)
def my_graph():
    data = op_with_config()
    processed_data = process_data(data)
    return processed_data

# Use `@graph_asset` to turn the graph into an asset
@graph_asset
def my_graph_asset():
    return my_graph()
In this example, `my_graph_config_mapping` is a function that takes a configuration dictionary and maps it to the configuration expected by `op_with_config`. When you materialize `my_graph_asset`, you would provide a configuration that includes the `param` key at the top level of the graph's config, and `my_graph_config_mapping` would transform it into the correct structure for the op.
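One way to sanity-check a mapping like this before wiring it into Dagster is to call the plain function with a sample dict and inspect the result. The sketch below assumes the mapping is attached to a graph, in which case the returned dict is keyed by the names of the ops inside that graph (a mapping attached to a job returns full run config with an outer "ops" key instead). The `KNOWN_OPS` set and the `check_mapping` helper are hypothetical names used only for this illustration.

```python
KNOWN_OPS = {"op_with_config", "process_data"}  # op names used in the graph


def my_graph_config_mapping(config):
    # Graph-level shape: keyed directly by op name
    return {"op_with_config": {"config": {"param": config["param"]}}}


def check_mapping(mapping_fn, sample):
    """Return the mapped config, failing early on unknown op names."""
    mapped = mapping_fn(sample)
    unknown = set(mapped) - KNOWN_OPS
    if unknown:
        raise ValueError(f"mapping targets unknown ops: {sorted(unknown)}")
    return mapped


print(check_mapping(my_graph_config_mapping, {"param": "hello"}))
```

A check like this catches a misspelled op name in the mapping without launching a run.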
Please note that this example is conceptual and based on the principles described in the documentation. The actual implementation details may vary, and you should refer to the latest Dagster documentation for the most accurate and up-to-date information on defining assets and using `co
Scout
03/06/2024, 7:37 PM
Jacob Marcil
03/06/2024, 7:42 PM
Scout
03/06/2024, 7:42 PM
`@config_mapping` decorator is used to define a config mapping for a graph (or job). It allows you to override the configuration for the child ops contained within a graph. Here's how you can apply `@config_mapping` to the previous example:
python
from dagster import config_mapping, graph, graph_asset, op

# Define your ops with config schemas
@op(config_schema={"param": str})
def op_with_config(context):
    # Access the op's config using context.op_config
    param = context.op_config["param"]
    context.log.info(f"Op config: {param}")
    return param

@op
def process_data(data):
    # Process the data in some way
    return data.upper()

# Define a config mapping for the graph; the returned dict is keyed by the
# names of the ops inside the graph
@config_mapping(config_schema={"param": str})
def my_graph_config_mapping(config):
    return {"op_with_config": {"config": {"param": config["param"]}}}

# Create a graph that includes these ops and uses the config mapping
@graph(config=my_graph_config_mapping)
def my_graph():
    data = op_with_config()
    processed_data = process_data(data)
    return processed_data

# Use `@graph_asset` to turn the graph into an asset
@graph_asset
def my_graph_asset():
    return my_graph()
In this updated example, we use the `@config_mapping` decorator to define `my_graph_config_mapping`. This function takes a simplified configuration schema (in this case, just a single parameter `param`) and maps it to the configuration expected by the `op_with_config` op. The mapping is attached to `my_graph` through the `config` argument of `@graph`. Finally, we define `my_graph_asset` as a graph asset that invokes `my_graph`. When you materialize `my_graph_asset`, you would provide a configuration that includes the `param` key in the graph's config, and `my_graph_config_mapping` would transform it into the correct structure for `op_with_config`. Please ensure that you refer to the latest Dagster documentation for the most accurate syntax and usage of `@config_mapping` and other Dagster features.
Jacob Marcil
03/06/2024, 7:54 PM
Scout
03/06/2024, 7:54 PM
`param` is not explicitly set within the code snippet itself. Instead, the value for `param` is expected to be provided through the configuration when the graph or asset is materialized (i.e., when a run is executed). Here's how the configuration would typically be provided:
yaml
# This is an example of a YAML configuration file that might be used to execute the graph or asset
ops:
  op_with_config:
    config:
      param: "some_value"
Or, if you're using Python to execute the graph or asset, you might provide the configuration like this:
python
# This is an example of how you might provide configuration when launching a run programmatically
run_config = {
    "ops": {
        "op_with_config": {
            "config": {
                "param": "some_value"
            }
        }
    }
}
# Assuming you have a job named my_job that contains op_with_config, you would pass the run_config to the execution function
result = my_job.execute_in_process(run_config=run_config)
In both cases, the value of `param` is set to "some_value", which is a placeholder for whatever actual value you wish to configure for the `op_with_config` op. The `param` value is then accessed within `op_with_config` using `context.op_config["param"]`. Please note that the exact way you provide the configuration may vary depending on how you are executing your graph or asset (e.g., via the Dagster CLI, the Dagit UI, or programmatically). Always refer to the Dagster documentation for the specific details on how to provide configuration for your Dagster deployments.
Jacob Marcil
03/06/2024, 7:55 PM
Scout
03/06/2024, 7:55 PM
`@graph` or `@graph_asset` definition using the `.configured` method on ops. This allows you to create a version of an op with specific configuration values bound to it. However, you cannot directly set configuration values on the `@graph_asset` decorator itself. Here's an example of how you might use the `.configured` method to set a default value for `param` within a graph:
python
from dagster import graph, op

@op(config_schema={"param": str})
def op_with_config(context):
    param = context.op_config["param"]
    context.log.info(f"Op config: {param}")
    return param

configured_op_with_config = op_with_config.configured(
    {"param": "default_value"}, name="configured_op_with_config"
)

@graph
def my_graph():
    data = configured_op_with_config()
    # ... continue with the rest of the graph

# Now when you use my_graph, the op_with_config will have "default_value" as the default for "param"
In this example, `configured_op_with_config` is a version of `op_with_config` with the `param` configuration value set to "default_value". If you want to provide configuration at the graph level, you would typically use a `config_mapping` to map the graph-level configuration to the op-level configuration. However, as of my knowledge cutoff date, the `@graph_asset` decorator does not support a `config` parameter directly. You would need to use `config_mapping` or set defaults on the ops as shown above. For the most up-to-date information and capabilities, please refer to the latest Dagster documentation.
Jacob Marcil
03/06/2024, 9:13 PM
Scout
03/06/2024, 9:13 PM
python
from dagster import job, op, resource

# Define a resource with a config schema
@resource(config_schema={"connection_string": str})
def my_database_resource(context):
    connection_string = context.resource_config["connection_string"]
    # Here you would create a database connection using the connection string
    return connection_string

# Define an op that requires the database resource
@op(required_resource_keys={"database"})
def my_op(context):
    # Access the database resource within the op
    connection_string = context.resources.database
    context.log.info(f"Using database with connection string: {connection_string}")

# Define a job that uses the op and the resource
@job(resource_defs={"database": my_database_resource})
def my_job():
    my_op()

# Now, when you want to execute the job, you can provide the configuration for the resource at runtime
run_config = {
    "resources": {
        "database": {
            "config": {
                "connection_string": "postgresql://user:password@localhost:5432/mydb"
            }
        }
    }
}

# Execute the job with the provided run configuration
result = my_job.execute_in_process(run_config=run_config)
In this example, `my_database_resource` is a resource that requires a `connection_string` configuration. When defining `my_job`, we specify that it uses `my_database_resource` by including it in `resource_defs`. When we execute `my_job`, we provide a `run_config` dictionary that includes the configuration for the `database` resource. The `connection_string` is set to a PostgreSQL connection string in this case, but you would replace this with the actual connection string for your database. This is how you can configure a resource at runtime programmatically. The same configuration can be provided through the Dagit UI or the command line when launching a run, depending on your preferred execution environment. For the most accurate and up-to-date information on configuring resources at runtime, please refer to the latest Dagster documentation.
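If the connection string should not be hard-coded, one option is to assemble `run_config` from the environment just before launching the run. This is a sketch only: the `DATABASE_URL` variable name and the `build_run_config` helper are assumptions made for illustration, not something Dagster defines.

```python
import os


def build_run_config(env=None):
    """Build run config for the `database` resource from environment variables."""
    env = os.environ if env is None else env
    # DATABASE_URL is a hypothetical variable name; the fallback mirrors the
    # placeholder connection string used in the conversation.
    connection_string = env.get(
        "DATABASE_URL", "postgresql://user:password@localhost:5432/mydb"
    )
    return {
        "resources": {
            "database": {"config": {"connection_string": connection_string}}
        }
    }


run_config = build_run_config({"DATABASE_URL": "postgresql://etl@db.internal:5432/warehouse"})
print(run_config["resources"]["database"]["config"]["connection_string"])
```

The resulting dict has the same shape as the hand-written `run_config` and could be passed to `my_job.execute_in_process(run_config=run_config)` in the same way.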