#announcements

dwall

02/04/2020, 8:59 PM
I know I've seen the Dagster team use factory functions for this, but is it possible for a solid to just be an abstraction of another solid?

max

02/04/2020, 9:15 PM
composite solids are the way to go i think
they let you remap config, inputs, and outputs arbitrarily
it may be that we want some sugar on the model of functools.wraps for this
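The functools.wraps analogy, sketched in plain Python with no Dagster involved: `functools.wraps` copies the wrapped function's `__name__`, `__doc__` and `__wrapped__` onto the wrapper, so the wrapper presents itself as the original while pinning some of its arguments. `run_command` and `make_preset` are hypothetical stand-ins, not Dagster API; any real "wraps"-style sugar for solids would be a design decision for the Dagster team.

```python
import functools


def run_command(command, app):
    """Build a ranch command line (hypothetical stand-in for a solid body)."""
    return f"ranch run {command} --app {app}"


def make_preset(fn, **fixed):
    """Return a wrapper around fn with some keyword arguments pinned.

    functools.wraps copies fn's metadata (name, docstring, __wrapped__) onto
    the wrapper, so anything that inspects it still sees the original function.
    """

    @functools.wraps(fn)
    def wrapper(**overrides):
        # Caller-supplied values take precedence over the pinned ones.
        return fn(**{**fixed, **overrides})

    return wrapper


ingest = make_preset(run_command, app="etl-ng-mk-sheets-importer")
print(ingest(command="./bin/import"))  # ranch run ./bin/import --app etl-ng-mk-sheets-importer
print(ingest.__name__)  # run_command
```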

dwall

02/04/2020, 9:16 PM
cool. I keep coming back to composite solids but the docs make it seem more like a sub-DAG abstraction than a single solid abstraction
wanna make sure I'm not using it incorrectly

max

02/04/2020, 9:20 PM
the only obstacle to using them everywhere is that display support might be awkward
for like, five or six nested levels in dagit
but that's a reasonable thing to push us to improve

dwall

02/04/2020, 9:44 PM
ah hah - I think this is what I've been looking for:
config_fn (Callable[[ConfigMappingContext, dict], dict]) –

By specifying a config mapping function, you can override the configuration for the child solids contained within this composite solid.

Config mappings require the configuration field to be specified as config, which will be exposed as the configuration field for the composite solid, as well as a configuration mapping function, config_fn, which maps the config provided to the composite solid to the config that will be provided to the child solids.
do you know anywhere I can see that ^ in action
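A plain-Python sketch of the shape a config mapping function produces, assuming (as the quoted docstring says) that it receives a context plus the composite's config dict and returns a dict. Here the returned dict is keyed by child solid name with a "config" block for each child, which matches the pattern used later in this thread. The child name `run_command` and the fields `sheet_id`, `command` and `app` are hypothetical; Dagster itself is not imported.

```python
from typing import Any, Dict


def wrap_config(_context: Any, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Map the composite's config onto config for a child solid named "run_command".

    The result is keyed by child solid name; each entry holds that child's
    "config" block.
    """
    return {
        "run_command": {
            "config": {
                "command": f"import --id {cfg['sheet_id']}",
                "app": "importer",
                # Optional outer field, defaulted here when the caller omits it.
                "output_logging": cfg.get("output_logging", "STREAM"),
            }
        }
    }


child_config = wrap_config(None, {"sheet_id": "abc123"})
print(child_config["run_command"]["config"]["command"])  # import --id abc123
```

The composite exposes only the small outer schema (`sheet_id`, `output_logging`), while the child still receives its full config.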

dwall

02/04/2020, 9:55 PM
yeah - this is money
I think the config mapping has been the missing piece for me
okay I think I'm really close here. Here are my solids:
import os
import signal
from subprocess import PIPE, STDOUT, Popen
from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
from typing import Dict

from dagster import (
    Enum,
    EnumValue,
    Failure,
    Field,
    InputDefinition,
    Nothing,
    OutputDefinition,
    PermissiveDict,
    String,
    composite_solid,
    solid,
)


@solid(
    description="A solid to invoke ranch run.",
    input_defs=[InputDefinition(name="start", dagster_type=Nothing, description="Nothing.")],
    output_defs=[OutputDefinition(name="end", dagster_type=Nothing, description="Nothing.")],
    config={
        "command": Field(
            dagster_type=String, is_optional=False, description="The command to run against the ranch application.",
        ),
        "app": Field(
            dagster_type=String, is_optional=False, description="The ranch application to run the command against.",
        ),
        "env": Field(
            dagster_type=PermissiveDict(),
            default_value=None,
            is_optional=True,
            description="An optional dict of environment variables to pass to the subprocess.",
        ),
        "output_logging": Field(
            dagster_type=Enum(
                name="OutputType",
                enum_values=[
                    EnumValue("STREAM", description="Stream script stdout/stderr."),
                    EnumValue("BUFFER", description="Buffer bash script stdout/stderr, then log upon completion.",),
                    EnumValue("NONE", description="No logging"),
                ],
            ),
            is_optional=True,
            default_value="STREAM",
        ),
    },
    required_resource_keys={"ranch"},
)
def ranch_run(context):

    bash_command = f"ranch run {context.solid_config['command']} --app {context.solid_config['app']}"

    context.log.info("Setting up environment for subprocess...")
    if context.solid_config["env"] is not None:
        env = context.solid_config["env"]
    else:
        env = os.environ.copy()
    env.update({"RANCH_TOKEN": context.resources.ranch.token})

    context.log.info(f"Temporary directory root location: \n {gettempdir()}")

    with TemporaryDirectory(prefix="dagstertmp_") as tmpdir:
        with NamedTemporaryFile(dir=tmpdir, prefix=context.run_id) as f:

            f.write(bytes(bash_command, "utf-8"))
            f.flush()
            script_location = os.path.abspath(f.name)
            context.log.info(f"Temporary script location: {script_location}")

            def pre_exec():
                # Reset signals that Python ignores by default back to SIG_DFL
                # (hasattr skips names this platform lacks), then start a new session
                for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
                    if hasattr(signal, sig):
                        signal.signal(getattr(signal, sig), signal.SIG_DFL)
                os.setsid()

            context.log.info(f"Running command: {bash_command}")

            sub_process = Popen(["bash", f.name], stdout=PIPE, stderr=STDOUT, cwd=tmpdir, env=env, preexec_fn=pre_exec)

            if context.solid_config["output_logging"] == "STREAM":
                # Stream back logs as they are emitted
                for raw_line in iter(sub_process.stdout.readline, b""):
                    context.log.info(raw_line.decode("utf-8").rstrip())
                sub_process.wait()
            else:
                # Drain all output before waiting, so a full pipe cannot block the subprocess
                output, _ = sub_process.communicate()
                if context.solid_config["output_logging"] == "BUFFER":
                    # Collect and buffer all logs, then emit
                    context.log.info(output.decode("utf-8"))
                # "NONE": output is discarded, no logging

            context.log.info(f"Command exited with return code {sub_process.returncode}")

            if sub_process.returncode:
                raise Failure(description=f"Bash command failed: {bash_command}")


def gsheet_wrap_ranch_run_fn(cfg) -> Dict:

    bash_command = (
        "ranch run -- /entrypoint /project/platform/utils/launch.sh"
        + f" ./bin/mk-sheets-importer --id {cfg['gsheet_id']}"
    )

    return {
        "ranch_run": {
            "config": {
                "command": bash_command,
                "app": "etl-ng-mk-sheets-importer",
                "env": cfg["env"],
                "output_logging": cfg["output_logging"],
            }
        }
    }


@composite_solid(
    description="A solid to invoke a ranch process to ingest a gsheet to the data warehouse.",
    input_defs=[InputDefinition(name="start", dagster_type=Nothing, description="Nothing.")],
    output_defs=[OutputDefinition(name="end", dagster_type=Nothing, description="Nothing.")],
    config={
        "gsheet_id": Field(dagster_type=String, is_optional=False, description="The ID of the gsheet to ingest.",),
        "env": Field(
            dagster_type=PermissiveDict(),
            default_value=None,
            is_optional=True,
            description="An optional dict of environment variables to pass to the subprocess.",
        ),
        "output_logging": Field(
            dagster_type=Enum(
                name="OutputType",
                enum_values=[
                    EnumValue("STREAM", description="Stream script stdout/stderr."),
                    EnumValue("BUFFER", description="Buffer bash script stdout/stderr, then log upon completion.",),
                    EnumValue("NONE", description="No logging"),
                ],
            ),
            is_optional=True,
            default_value="STREAM",
        ),
    },
    config_fn=gsheet_wrap_ranch_run_fn,
)
def ranch_ingest_gsheet():
    return ranch_run()
when trying to run a pipeline using the ranch_ingest_gsheet solid, I'm getting the following error:
dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'ranch_ingest_gsheet' decorated function does not have parameter(s) 'start', which are in solid's input_defs. Solid functions should only have keyword arguments that match input names and a first positional parameter named 'context'.
what am I missing?

alex

02/04/2020, 10:21 PM
change def ranch_ingest_gsheet(): to def ranch_ingest_gsheet(start):
and also return ranch_run(start)
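A rough plain-Python approximation of why that fix works (not Dagster's actual implementation): the error message says the decorator compares the names in input_defs against the decorated function's parameters, so a composite that declares a `start` input must accept a `start` parameter and pass it on to the child. `missing_input_params`, `before` and `after` are hypothetical.

```python
import inspect
from typing import Callable, List


def missing_input_params(fn: Callable, input_names: List[str]) -> List[str]:
    """Return declared input names that do not appear as parameters of fn."""
    params = set(inspect.signature(fn).parameters)
    return [name for name in input_names if name not in params]


def before():  # composite body without the declared input
    pass


def after(start):  # composite body that accepts (and would forward) the input
    pass


print(missing_input_params(before, ["start"]))  # ['start']
print(missing_input_params(after, ["start"]))  # []
```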

dwall

02/04/2020, 10:22 PM
made that change and got this:
TypeError: gsheet_wrap_ranch_run_fn() takes 1 positional argument but 2 were given
does the config_fn also need to be provided the solid inputs?

alex

02/04/2020, 10:22 PM
config mapping functions get passed 2 args

dwall

02/04/2020, 10:23 PM
👍 what's the second arg, other than cfg?

alex

02/04/2020, 10:23 PM
it's Callable[[ConfigMappingContext, dict], dict]
so the first arg is this context thing (which is going away)
so just do
def gsheet_wrap_ranch_run_fn(_, cfg) -> Dict:
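A minimal sketch of why the one-argument version fails, assuming the composite calls the mapping function positionally as `config_fn(context, config)`, per the `Callable[[ConfigMappingContext, dict], dict]` signature. `apply_config_mapping` is a hypothetical stand-in for that call site, and `None` stands in for the context object.

```python
from typing import Any, Callable, Dict

ConfigFn = Callable[[Any, Dict[str, Any]], Dict[str, Any]]


def apply_config_mapping(config_fn: ConfigFn, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Invoke a config mapping function with two positional args: (context, config)."""
    context = None  # stand-in for ConfigMappingContext
    return config_fn(context, cfg)


def one_arg(cfg):
    return {"child": {"config": cfg}}


def two_args(_, cfg):  # "_" accepts and ignores the context
    return {"child": {"config": cfg}}


try:
    apply_config_mapping(one_arg, {"x": 1})
except TypeError as exc:
    print(exc)  # the same kind of TypeError as in the traceback above

print(apply_config_mapping(two_args, {"x": 1}))  # {'child': {'config': {'x': 1}}}
```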

dwall

02/04/2020, 10:24 PM
👍

alex

02/04/2020, 10:25 PM
any recommended changes to the error message above about the missing argument start?

dwall

02/04/2020, 10:26 PM
I guess I'm confused about why I need to specify the start argument in the composite solid but not in the solid it wraps (in the solid definition)

alex

02/04/2020, 10:27 PM
well now that is indeed very confusing
composites are doing input / output mapping
you are wiring dependencies through to your inner solids
so even though no real value will show up on these Nothing edges, you need to wire up how the dependencies flow through
which is very confusing and I don't know how to make that any better 🙃
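A toy model of why even a Nothing input has to be threaded through: if the composite is compiled away, an edge that targets the composite's start input has to be redirected to whichever child input it maps to, and without that mapping there is nowhere to send it. This is a plain-Python illustration with hypothetical names (`flatten`, `previous_step`, the dotted child name); it models only the input side and is not how Dagster represents its dependency graph internally.

```python
from typing import Dict, Tuple

# A (solid name, input or output name) pair identifies one end of an edge.
Handle = Tuple[str, str]


def flatten(
    outer_deps: Dict[Handle, Handle],
    composite: str,
    input_mappings: Dict[str, Handle],
) -> Dict[Handle, Handle]:
    """Redirect edges into a composite's inputs to the child inputs they map to.

    outer_deps maps a downstream input handle to the upstream output feeding it.
    input_mappings maps each composite input name to a child input handle.
    """
    flat: Dict[Handle, Handle] = {}
    for (solid, inp), upstream in outer_deps.items():
        if solid == composite:
            # The composite never runs, so the edge must land on a child.
            # A missing mapping raises KeyError: the edge has nowhere to go.
            flat[input_mappings[inp]] = upstream
        else:
            flat[(solid, inp)] = upstream
    return flat


deps = {("ingest_gsheet", "start"): ("previous_step", "end")}
mappings = {"start": ("ingest_gsheet.ranch_run", "start")}
print(flatten(deps, "ingest_gsheet", mappings))
# {('ingest_gsheet.ranch_run', 'start'): ('previous_step', 'end')}
```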

dwall

02/04/2020, 10:29 PM
Lol
okay well I appreciate the explanation regardless
this whole convo has been extremely enlightening

alex

02/04/2020, 10:29 PM
very useful feedback for us so thanks for playing along

dwall

02/04/2020, 10:29 PM
thanks @alex and @max!

max

02/04/2020, 10:46 PM
@dwall please let me know about anything else you run into that isn't documented anywhere and you wish you'd known about

dwall

02/04/2020, 10:47 PM
out of curiosity, what's the thinking behind composite solids not being able to have required resources?
now that I've seen the light wrt composite solids, I'd like to rewrite one of my solids to be a composite solid that just wraps the bash command solid in the dagster-bash library. However, if someone uses this composite solid, I need them to have a specific resource (our homegrown deployment platform, "ranch") configured

alex

02/04/2020, 10:54 PM
the required resources of a composite are the union of all the required resources of its children. The composite doesn't actually execute; it gets compiled away at execution time. The only thing that can use a resource is a solid, so any resource dependencies should be declared on the solids themselves
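The rule described above, as a few lines of plain Python: a composite has no resource keys of its own, only the union of its children's. The `ranch` key comes from this thread; the other child names and keys are hypothetical.

```python
from typing import Dict, Set


def composite_required_resources(children: Dict[str, Set[str]]) -> Set[str]:
    """Union of the resource keys required by each child solid."""
    required: Set[str] = set()
    for keys in children.values():
        required |= keys
    return required


children = {
    "ranch_run": {"ranch"},
    "load_table": {"warehouse"},  # hypothetical second child
    "notify": set(),  # hypothetical child needing no resources
}
print(sorted(composite_required_resources(children)))  # ['ranch', 'warehouse']
```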

dwall

02/04/2020, 10:55 PM
@alex makes sense. So in my case, I don't really have an option other than to just re-write the bash command solid with an added required resource
a

alex

02/04/2020, 10:56 PM
how were you thinking of accessing this resource?
d

dwall

02/04/2020, 10:57 PM
injecting resource credentials into env during bash execution
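A minimal sketch of that injection, assuming the credential is available as a plain string (in the solid posted earlier it comes from `context.resources.ranch.token`): copy the environment, add the token, and hand the dict to the subprocess. `build_env` is hypothetical, and `sys.executable` stands in for `bash` so the example runs anywhere Python does.

```python
import os
import subprocess
import sys
from typing import Dict, Optional


def build_env(token: str, base: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Copy the base environment (default: this process's) and add the credential."""
    env = dict(os.environ if base is None else base)
    env["RANCH_TOKEN"] = token
    return env


env = build_env("secret-token")
# Run a child process that echoes the injected variable back on stdout.
result = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['RANCH_TOKEN'])"],
    env=env,
    stdout=subprocess.PIPE,
    check=True,
)
print(result.stdout.decode().strip())  # secret-token
```

Copying into a new dict means the caller's mapping (for example a configured `env` dict) is never mutated in place.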

alex

02/04/2020, 10:58 PM
when/how would you do that injection?

alex

02/04/2020, 11:00 PM
hmm I don't think you have access to the resources from the config mapping function

dwall

02/04/2020, 11:02 PM
oh okay. so the config mapping function doesn't get passed a context

alex

02/04/2020, 11:03 PM
well - the version you have does

dwall

02/04/2020, 11:03 PM
hah

alex

02/04/2020, 11:03 PM
I'm not sure exactly what's on that context; we've removed it in master since we're restructuring our resource handling

dwall

02/04/2020, 11:04 PM
okay, nw

alex

02/04/2020, 11:33 PM
I think we should do a light refactor of the bash solid to make it easier to reuse stuff cc @nate
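One possible shape for that refactor, sketched under assumptions: pull the subprocess-and-logging logic out of the solid body into a plain function that any solid (or a ranch-specific wrapper) can call with its own argv, env and logger. `execute_command` is hypothetical, not part of dagster-bash. It reads all output before calling `wait()`, since waiting first can deadlock once the pipe buffer fills.

```python
import subprocess
import sys
from typing import Callable, Dict, List, Optional, Tuple


def execute_command(
    argv: List[str],
    log: Callable[[str], None],
    output_logging: str = "STREAM",
    env: Optional[Dict[str, str]] = None,
    cwd: Optional[str] = None,
) -> Tuple[int, str]:
    """Run argv, log combined stdout/stderr per output_logging, return (returncode, output).

    output_logging: "STREAM" logs each line as it arrives, "BUFFER" logs
    everything once at the end, and "NONE" logs nothing.
    """
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, cwd=cwd)
    lines = []
    # Read until EOF before waiting, so a full pipe never blocks the child.
    for raw_line in iter(proc.stdout.readline, b""):
        line = raw_line.decode("utf-8").rstrip()
        lines.append(line)
        if output_logging == "STREAM":
            log(line)
    proc.stdout.close()
    returncode = proc.wait()
    output = "\n".join(lines)
    if output_logging == "BUFFER":
        log(output)
    return returncode, output


logged = []
code, out = execute_command([sys.executable, "-c", "print('a'); print('b')"], logged.append)
print(code, logged)  # 0 ['a', 'b']
```

Returning the output alongside the return code leaves the decision to raise `Failure` (or not) with the calling solid.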