# announcements
o
Hi, I'm looking for a way to parameterise a pipeline so that I can namespace all the outputs of the various jobs in the DAG. For instance, ideally I'd like to:
1. Grab a username from the initiating user, for instance if they are kicking it off in the Dagster UI.
2. Use that to programmatically generate a database name, which would include the username.
3. Set that name somewhere in the pipeline context programmatically.
4. Use that database name in different solids in the pipeline, where one solid might delete/create the database and another solid runs a job loading the database.
Is there any way to achieve this? Apologies if I have missed something obvious in the docs.
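Step 2 above can be sketched in plain Python, independent of Dagster. The function name `make_db_name`, the `pipeline` prefix, and the sanitising rule are all assumptions made for illustration; nothing in the thread prescribes them.

```python
import re


def make_db_name(username: str, prefix: str = "pipeline") -> str:
    """Build a database name namespaced by the initiating user (hypothetical helper)."""
    # Lowercase the username and collapse every run of characters that is not
    # a lowercase letter, digit, or underscore into a single underscore.
    cleaned = re.sub(r"[^a-z0-9_]+", "_", username.strip().lower()).strip("_")
    if not cleaned:
        raise ValueError("username must contain at least one letter or digit")
    return f"{prefix}_{cleaned}"
```

A solid that creates the database could call something like `make_db_name(...)` on the configured username and hand the result to the solids that load it.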
s
Hey Owen, you could create a `resource` to achieve this: https://dagster.readthedocs.io/en/0.7.5/sections/tutorial/resources.html
You can configure the resource to take a `username` as config and, when initializing the resource, create a connection to the db with that username.
Also, if you store the `username` as a property on the resource, you can access it from a solid's context through `context.resources.my_resource_name.username`.
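As a rough illustration of that access pattern, here is a plain-Python sketch of the kind of object a resource might return. `DatabaseHandle` and its `db_name` property are hypothetical, and the `SimpleNamespace` context is only a stand-in for the one Dagster passes to a solid; the actual resource wiring is covered in the linked tutorial and is not shown here.

```python
from types import SimpleNamespace


class DatabaseHandle:
    """Hypothetical object a resource could return; not part of Dagster."""

    def __init__(self, username: str):
        # Store the configured username so solids can read it later
        self.username = username

    @property
    def db_name(self) -> str:
        # Derive the namespaced database name from the stored username
        return f"pipeline_{self.username}"


# Stand-in for the solid context (assumption: the real one is built by
# Dagster from the resource's config when the pipeline runs).
context = SimpleNamespace(
    resources=SimpleNamespace(my_resource_name=DatabaseHandle("owen"))
)


def create_db(context) -> str:
    # Body of a would-be solid: read the shared name off the resource
    handle = context.resources.my_resource_name
    return f"CREATE DATABASE {handle.db_name}"
```

Because every solid reads the name from the same resource, the username only has to be configured in one place.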
o
Brilliant. Thanks for the help. I'll check that out.
s
Alternatively, since you’re actually creating/deleting the database in your solid rather than connecting to an existing database, you could also avoid using a resource and just pass the username as config to each solid that requires it
For example, your solid may look like:
@solid(config={'username': str})
def create_db(context):
   pass
o
In the second option, how would I write the programmatically generated database name into the context?
s
What I would recommend is passing the programmatically generated database name as an `Output` of the solid. Any solids that depend on the generated database name would then take it as an `Input`.
o
I think I understand, yep, sounds like it should work. I will give it a go and see where I get. Thanks.
s
Here’s a fully working example that you can run that demonstrates this:
from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output

def create_new_db():
    return "generated_db_name"

@solid(
    config={'username': str},
    output_defs=[OutputDefinition(str, "db_name")],
)
def create_database(context):
    name = create_new_db()
    yield Output(value=name, output_name="db_name")

@solid(
    input_defs=[InputDefinition("db_name", str)]
)
def do_some_action(context, db_name):
    context.log.info(db_name)

@solid(
    input_defs=[InputDefinition("db_name", str)]
)
def do_some_other_action(context, db_name):
    context.log.info(db_name)


@pipeline
def my_pipeline():
    db_name = create_database()
    do_some_action(db_name)
    do_some_other_action(db_name)


def define_repository():
    return RepositoryDefinition("my_repository", pipeline_defs=[my_pipeline])
Then, when executing in the Dagit UI, you can just specify this config:
solids:
  create_database:
    config:
      username: "owen"
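If the config needs to be generated rather than typed into the UI, the same structure can be built as a Python dict. This is only a sketch: `build_run_config` is a hypothetical helper, and how the dict is handed to Dagster depends on the version in use, so that step is left out.

```python
def build_run_config(username: str) -> dict:
    """Return the dict equivalent of the YAML config above (hypothetical helper)."""
    # Mirrors the nesting solids -> create_database -> config -> username
    return {
        "solids": {
            "create_database": {
                "config": {"username": username},
            }
        }
    }
```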
o
thanks for the example code