# announcements
d
I know I've seen the Dagster team use factory functions for this, but is it possible just for a solid to be an abstraction of another solid?
m
composite solids are the way to go i think
they let you remap config, inputs, and outputs arbitrarily
it may be that we want some sugar on the model of functools.wraps for this
👍 1
d
cool. I keep coming back to composite solids but the docs make it seem more like a sub-DAG abstraction than a single solid abstraction
wanna make sure I'm not using it incorrectly
m
the only obstacle to using them everywhere is that display support might be awkward
for like, five or six nested levels in dagit
but that's a reasonable thing to push us to improve
d
ah hah - I think this is what I've been looking for:
config_fn (Callable[[ConfigMappingContext, dict], dict]) –

By specifying a config mapping function, you can override the configuration for the child solids contained within this composite solid.

Config mappings require the configuration field to be specified as config, which will be exposed as the configuration field for the composite solid, as well as a configuration mapping function, config_fn, which maps the config provided to the composite solid to the config that will be provided to the child solids.
do you know anywhere I can see that ^ in action
d
yeah - this is money
🤑 2
I think the config mapping has been the missing piece for me
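# Imports assumed from the names used below (legacy Dagster solid API).
import os
import signal
from subprocess import PIPE, STDOUT, Popen
from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
from typing import Dict

from dagster import (
    Enum,
    EnumValue,
    Failure,
    Field,
    InputDefinition,
    Nothing,
    OutputDefinition,
    PermissiveDict,
    String,
    composite_solid,
    solid,
)


@solid(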
    description="A solid to invoke ranch run.",
    input_defs=[InputDefinition(name="start", dagster_type=Nothing, description="Nothing.")],
    output_defs=[OutputDefinition(name="end", dagster_type=Nothing, description="Nothing.")],
    config={
        "command": Field(
            dagster_type=String, is_optional=False, description="The command to run against the ranch application.",
        ),
        "app": Field(
            dagster_type=String, is_optional=False, description="The ranch application to run the command against.",
        ),
        "env": Field(
            dagster_type=PermissiveDict(),
            default_value=None,
            is_optional=True,
            description="An optional dict of environment variables to pass to the subprocess.",
        ),
        "output_logging": Field(
            dagster_type=Enum(
                name="OutputType",
                enum_values=[
                    EnumValue("STREAM", description="Stream script stdout/stderr."),
                    EnumValue("BUFFER", description="Buffer bash script stdout/stderr, then log upon completion.",),
                    EnumValue("NONE", description="No logging"),
                ],
            ),
            is_optional=True,
            default_value="STREAM",
        ),
    },
    required_resource_keys={"ranch"},
)
def ranch_run(context):

    bash_command = f"ranch run {context.solid_config['command']}" + f" --app {context.solid_config['app']}"

    context.log.info("Setting up environment for subprocess..")
    if context.solid_config["env"] is not None:
        env = context.solid_config["env"]
    else:
        env = os.environ.copy()
    env.update({"RANCH_TOKEN": context.resources.ranch.token})

    context.log.info(f"Temporary directory root location: \n {gettempdir()}")

    with TemporaryDirectory(prefix="dagstertmp_") as tmpdir:
        with NamedTemporaryFile(dir=tmpdir, prefix=context.run_id) as f:

            f.write(bytes(bash_command, "utf-8"))
            f.flush()
            fname = f.name
            script_location = os.path.abspath(fname)
            context.log.info(f"Temporary script location: {script_location}")

            def pre_exec():
                for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
                    if hasattr(signal, sig):
                        signal.signal(getattr(signal, sig), signal.SIG_DFL)
                os.setsid()

            context.log.info(f"Running command: {bash_command}")

            sub_process = Popen(["bash", f.name], stdout=PIPE, stderr=STDOUT, cwd=tmpdir, env=env, preexec_fn=pre_exec,)

            # Stream back logs as they are emitted
            if context.solid_config["output_logging"] == "STREAM":
                line = ""
                for raw_line in iter(sub_process.stdout.readline, b""):
                    line = raw_line.decode("utf-8").rstrip()
                    context.log.info(line)

            sub_process.wait()

            # Collect and buffer all logs, then emit
            if context.solid_config["output_logging"] == "BUFFER":
                line = ""
                for raw_line in iter(sub_process.stdout.readline, b""):
                    line += raw_line.decode("utf-8")
                context.log.info(line)

            # no logging in this case
            elif context.solid_config["output_logging"] == "NONE":
                pass

            context.log.info(f"Command exited with return code {sub_process.returncode}")

            if sub_process.returncode:
                raise Failure(description=f"Bash command failed: {bash_command}")


def gsheet_wrap_ranch_run_fn(cfg) -> Dict:

    bash_command = (
        "ranch run -- /entrypoint /project/platform/utils/launch.sh"
        + f" ./bin/mk-sheets-importer --id {cfg['gsheet_id']}"
    )

    return {
        "ranch_run": {
            "config": {
                "command": bash_command,
                "app": "etl-ng-mk-sheets-importer",
                "env": cfg["env"],
                "output_logging": cfg["output_logging"],
            }
        }
    }


@composite_solid(
    description="A solid to invoke a ranch process to ingest a gsheet to the data warehouse.",
    input_defs=[InputDefinition(name="start", dagster_type=Nothing, description="Nothing.")],
    output_defs=[OutputDefinition(name="end", dagster_type=Nothing, description="Nothing.")],
    config={
        "gsheet_id": Field(dagster_type=String, is_optional=False, description="The ID of the gsheet to ingest.",),
        "env": Field(
            dagster_type=PermissiveDict(),
            default_value=None,
            is_optional=True,
            description="An optional dict of environment variables to pass to the subprocess.",
        ),
        "output_logging": Field(
            dagster_type=Enum(
                name="OutputType",
                enum_values=[
                    EnumValue("STREAM", description="Stream script stdout/stderr."),
                    EnumValue("BUFFER", description="Buffer bash script stdout/stderr, then log upon completion.",),
                    EnumValue("NONE", description="No logging"),
                ],
            ),
            is_optional=True,
            default_value="STREAM",
        ),
    },
    config_fn=gsheet_wrap_ranch_run_fn,
)
def ranch_ingest_gsheet():
    return ranch_run()
okay I think I'm really close here. Here are my solids:
when trying to run a pipeline using the ranch_ingest_gsheet solid, I'm getting the following error:
dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'ranch_ingest_gsheet' decorated function does not have parameter(s) 'start', which are in solid's input_defs. Solid functions should only have keyword arguments that match input names and a first positional parameter named 'context'.
what am I missing?
a
def ranch_ingest_gsheet():
->
def ranch_ingest_gsheet(start):
+ also
return ranch_run(start)
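For anyone hitting the same error: the message amounts to a comparison between the names in input_defs and the parameters of the decorated function. Below is a rough plain-Python sketch of that kind of check. It is not Dagster's actual implementation, and missing_input_params and the two stand-in functions are hypothetical names used only for illustration.

```python
import inspect
from typing import Callable, List


def missing_input_params(fn: Callable, input_names: List[str]) -> List[str]:
    """Return the input names that fn does not accept as parameters."""
    params = set(inspect.signature(fn).parameters)
    return [name for name in input_names if name not in params]


# Stand-in for the original composite body: no parameters at all.
def ranch_ingest_gsheet_before():
    return None


# Stand-in for the fixed composite body: accepts the "start" input.
def ranch_ingest_gsheet_after(start):
    return start


missing_before = missing_input_params(ranch_ingest_gsheet_before, ["start"])
missing_after = missing_input_params(ranch_ingest_gsheet_after, ["start"])
```

With the `start` parameter added, the list of missing names is empty, which is the condition the error message is asking for.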
d
made that change and got this:
TypeError: gsheet_wrap_ranch_run_fn() takes 1 positional argument but 2 were given
does the config_fn also need to be provided the solid inputs?
a
config mapping functions get passed 2 args
d
👍 what's the second arg, other than cfg?
a
Callable[[ConfigMappingContext, dict], dict]
so the first arg is this context thing (which is going away)
so just do
def gsheet_wrap_ranch_run_fn(_, cfg) -> Dict:
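A minimal sketch of the two-argument form, written as plain Python so it can be called directly without Dagster installed. The first argument stands in for the ConfigMappingContext and is ignored. The sample gsheet_id value "abc123" is made up, and the command string is copied from the function posted earlier in the thread.

```python
from typing import Any, Dict


def gsheet_wrap_ranch_run_fn(_context: Any, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Map the composite's config onto the config of the inner ranch_run solid."""
    bash_command = (
        "ranch run -- /entrypoint /project/platform/utils/launch.sh"
        + f" ./bin/mk-sheets-importer --id {cfg['gsheet_id']}"
    )
    return {
        "ranch_run": {
            "config": {
                "command": bash_command,
                "app": "etl-ng-mk-sheets-importer",
                "env": cfg["env"],
                "output_logging": cfg["output_logging"],
            }
        }
    }


# Hypothetical composite config; the context argument is unused, so None is fine here.
composite_config = {"gsheet_id": "abc123", "env": None, "output_logging": "STREAM"}
child_config = gsheet_wrap_ranch_run_fn(None, composite_config)
```

The returned dict is keyed by the child solid's name, so each child inside the composite receives its own config section.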
d
👍
a
any recommended changes to the error message above about missing the argument start?
d
I guess I'm confused why I need to specify the start argument in the composite solid but not in the solid that it is wrapping (in the solid definition)
a
well now that is indeed very confusing
composites are doing input / output mapping
you are wiring dependencies through to your inner solids
so even though no real value will show on these Nothing edges - you need to wire up how the dependencies flow through
which is very confusing and i don't know how to make that any better 🙃
d
Lol
okay well I appreciate the explanation regardless
this whole convo has been extremely enlightening
a
very useful feedback for us so thanks for playing along
d
thanks @alex and @max!
m
@dwall please let me know about anything else you run into that isn't documented anywhere and you wish you'd known about
d
out of curiosity, what's the thinking behind composite solids not being able to have required resources?
now that I've seen the light wrt composite solids, I'd like to rewrite one of my solids to be a composite solid that just wraps the bash command solid in the dagster-bash library. However, if someone uses this composite solid I need them to have a specific resource (our homegrown deployment platform, "ranch") configured
a
the required resources of a composite are the union of all the required resources of its children - the composite doesn't actually execute, it gets compiled away at execution time. The only thing that can use a resource is a solid - so any dependencies should be on the solids themselves
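To make the "union of the children" idea concrete, here is a toy model in plain Python. It is only an illustration of the rule described above, not how Dagster represents solids internally. The Solid and Composite classes, required_resources, and the "notify"/"slack" names are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Set, Union


@dataclass
class Solid:
    name: str
    required_resource_keys: Set[str] = field(default_factory=set)


@dataclass
class Composite:
    name: str
    children: List[Union["Solid", "Composite"]] = field(default_factory=list)


def required_resources(node: Union[Solid, Composite]) -> Set[str]:
    """Collect resource keys: a solid declares its own, a composite only aggregates."""
    if isinstance(node, Solid):
        return set(node.required_resource_keys)
    keys: Set[str] = set()
    for child in node.children:
        keys |= required_resources(child)
    return keys


ranch_run = Solid("ranch_run", {"ranch"})
ranch_ingest_gsheet = Composite("ranch_ingest_gsheet", [ranch_run])
outer = Composite("outer", [ranch_ingest_gsheet, Solid("notify", {"slack"})])

inner_keys = required_resources(ranch_ingest_gsheet)
outer_keys = required_resources(outer)
```

In this model a composite has nowhere to declare a resource of its own, which matches the point that the requirement has to live on a solid that actually executes.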
d
@alex makes sense. So in my case, I don't really have an option other than to just re-write the bash command solid with an added required resource
a
how were you thinking of accessing this resource?
d
injecting resource credentials into env during bash execution
a
when/how would you do that injection?
a
hmm I don't think you have access to the resources from the config mapping function
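Since resources are only reachable inside a solid body, the injection would have to happen there, the way ranch_run above updates env before calling Popen. A self-contained sketch of that step, assuming the token is already in hand: run_with_token is a hypothetical helper, "example-token" is a placeholder, and the child process is just a Python one-liner so the sketch runs anywhere.

```python
import os
import subprocess
import sys
from typing import Dict, List, Optional


def run_with_token(
    command: List[str], token: str, base_env: Optional[Dict[str, str]] = None
) -> subprocess.CompletedProcess:
    """Run command with RANCH_TOKEN added to a copy of the environment."""
    # Copy so the parent process environment is never mutated.
    env = dict(os.environ if base_env is None else base_env)
    env["RANCH_TOKEN"] = token
    return subprocess.run(
        command, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=False
    )


# The child process echoes the injected variable back on stdout.
result = run_with_token(
    [sys.executable, "-c", "import os; print(os.environ['RANCH_TOKEN'])"],
    token="example-token",
)
output = result.stdout.decode("utf-8").strip()
```

In a real solid the token would come from context.resources rather than a literal, which is why the solid itself needs to list the resource in required_resource_keys.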
d
oh okay. so the config mapping function doesn't get passed context
a
well - the version you have does
d
hah
a
im not sure what exactly is on that context, we’ve removed it in master since we are restructuring our resource handling
d
okay, nw
a
I think we should do a light refactor of the bash solid to make it easier to reuse stuff cc @nate
✅ 1