# ask-community
hi all! I'm trying to create jobs with logic conditional on an environment variable. I'm creating my jobs from a graph (there are multiple jobs with the same logic). Essentially I don't want the last step (dbt_docs_generation_op) to run if I'm running dagster locally. This didn't seem to work - can someone point me in the right direction?
```python
import os

from dagster import graph

# base graph for running dbt transformations.
# this will run transformations, run dbt test, and deploy the dbt documentation
@graph
def run_dbt_transformations():
    env = os.getenv('ENVIRONMENT')
    if env == Env.DEV:
        ...  # op wiring not shown

# dagster jobs for specific model groups. they use the same dagster graph but with different dagster-dbt resources
dbt_run_assist_models = run_dbt_transformations.to_job(resource_defs={"dbt_resource": dbt_resource_assist}, tags=DBT_DOCS_K8S_CONFIG, name="dbt_run_assist_models")
```
```python
DBT_DOCS_K8S_CONFIG = {
    "dagster-k8s/config": {
        "pod_spec_config": {
            "serviceAccountName": "dbt-data-transformation",
            "volumes": [{"name": "config-secrets", "secret": {"secret_name": "github-app-secret"}}],
        },
        "container_config": {
            "resources": {
                "requests": {"cpu": "250m", "memory": "64Mi"},
                "limits": {"cpu": "500m", "memory": "2560Mi"},
            },
            "volume_mounts": [
                {"name": "config-secrets", "mount_path": "/etc/secrets"}
            ],
            "envFrom": [  # THIS IS WHERE ENVIRONMENT IS PASSED IN
                {
                    "configMapRef": {
                        "name": "dbt-custom-run-env"
                    }
                }
            ],
        },
    }
}
```
cc @owen
hi @Salina Wu! When the shape of your graph changes between environments, this can definitely cause some unexpected issues (although I'm not sure exactly what is causing your example to break). A simpler way to model this would be to have dbt_docs_generation_op do nothing when you're in your dev environment, so something like:
```python
from dagster import In, Nothing, op

@op(required_resource_keys={"dbt"}, ins={"start_after": In(Nothing)})
def dbt_docs_generation_op(context):
    if env != Env.DEV:
        ...  # generate and deploy the dbt docs
```
ok, sounds good! thanks owen!
tried this, but it seems like the ops are run in separate subprocesses so they don't have access to the environment variable 😕 we're looking into workarounds but any other suggestions or insight would be very appreciated!
ah I see (that's probably what was going wrong with the graph solution as well). Generally, when you want to change how an op functions in different environments, you would specify that using config on the op, so something like
```python
from dagster import Field, In, Nothing, op

@op(config_schema={"should_run": Field(bool, default_value=True)}, required_resource_keys={"dbt"}, ins={"start_after": In(Nothing)})
def dbt_docs_generation_op(context):
    # op_config is a dict, so index it by key
    if context.op_config["should_run"]:
        ...  # generate and deploy the dbt docs
```
then, when you're creating the dev version of this graph, you can supply a False value for this configuration option to "turn off" this op
ohhhh this is neat! this looks simpler than using sensors. can you explain what you mean by "dev version" of the graph and how to pass a False value to the op config?
ah sorry missed this notification -- when you're calling .to_job() on your graph, you can pass in config, so something like
.to_job(..., config={"ops": {"dbt_docs_generation_op": {"config": {"should_run": False}}}})
so you can create two versions of your graph, one for running in your development environment, and one for running in your production environment
```python
dbt_run_assist_models_dev = run_dbt_transformations.to_job(..., config=<that config>)
dbt_run_assist_models_prod = run_dbt_transformations.to_job(...)
```
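For anyone finding this later, here is a minimal sketch of building that config dict from an environment name, so the dev and prod jobs can share one helper. The helper name `docs_generation_config` and the "dev" / "prod" values are made up for illustration; only the nested `{"ops": ...}` shape comes from the `.to_job(...)` call above. It is plain Python, so it runs without Dagster installed.

```python
from typing import Any, Dict


def docs_generation_config(environment: str) -> Dict[str, Any]:
    """Return a job config dict that turns off docs generation in dev.

    The environment names are assumptions for illustration; the dict
    shape mirrors the config passed to .to_job() earlier in the thread.
    """
    should_run = environment != "dev"
    return {
        "ops": {
            "dbt_docs_generation_op": {"config": {"should_run": should_run}}
        }
    }


dev_config = docs_generation_config("dev")
prod_config = docs_generation_config("prod")

# Intended use (not executed here, since it needs the graph from the thread):
# dbt_run_assist_models_dev = run_dbt_transformations.to_job(..., config=dev_config)
```

Since should_run defaults to True in the op's config schema, the prod job can also skip the config argument entirely, as in the last snippet above.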