dwall
02/04/2020, 8:59 PM
max
02/04/2020, 9:15 PM
dwall
02/04/2020, 9:16 PM
max
02/04/2020, 9:20 PM
dwall
02/04/2020, 9:44 PM
config_fn (Callable[[ConfigMappingContext, dict], dict]) –
By specifying a config mapping function, you can override the configuration for the child solids contained within this composite solid.
Config mappings require the configuration field to be specified as config, which will be exposed as the configuration field for the composite solid, as well as a configuration mapping function, config_fn, which maps the config provided to the composite solid to the config that will be provided to the child solids.
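A minimal, framework-free sketch of what such a config_fn returns. It assumes the ranch_run solid shared later in this thread, which requires command and app, prepends "ranch run" itself, and gives env and output_logging defaults. It also assumes Dagster fills in those defaults when the mapping omits them. The function name map_gsheet_config and the sample ID "abc123" are made up for illustration. The context argument stands in for ConfigMappingContext and is unused.

```python
from typing import Any, Dict


def map_gsheet_config(context: Any, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical config mapping: composite-level config in, child config out.

    `context` is a placeholder for Dagster's ConfigMappingContext; it is unused.
    """
    return {
        # Top-level keys name the child solids; each child gets its own "config".
        "ranch_run": {
            "config": {
                # Only the arguments after "ranch run", which the child adds itself.
                "command": "-- /entrypoint /project/platform/utils/launch.sh"
                + f" ./bin/mk-sheets-importer --id {cfg['gsheet_id']}",
                "app": "etl-ng-mk-sheets-importer",
                # env and output_logging are omitted (assumed: child defaults apply).
            }
        }
    }


child_config = map_gsheet_config(None, {"gsheet_id": "abc123"})
print(child_config["ranch_run"]["config"]["app"])  # etl-ng-mk-sheets-importer
```

Because the mapping returns a dict keyed by child solid name, the composite can expose a single gsheet_id field while the child keeps its fuller schema.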
alex
02/04/2020, 9:54 PM
dwall
02/04/2020, 9:55 PM
import os
import signal
from subprocess import PIPE, STDOUT, Popen
from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
from typing import Dict

from dagster import (
    Enum,
    EnumValue,
    Failure,
    Field,
    InputDefinition,
    Nothing,
    OutputDefinition,
    PermissiveDict,
    String,
    composite_solid,
    solid,
)


@solid(
    description="A solid to invoke ranch run.",
    input_defs=[InputDefinition(name="start", dagster_type=Nothing, description="Nothing.")],
    output_defs=[OutputDefinition(name="end", dagster_type=Nothing, description="Nothing.")],
    config={
        "command": Field(
            dagster_type=String, is_optional=False, description="The command to run against the ranch application.",
        ),
        "app": Field(
            dagster_type=String, is_optional=False, description="The ranch application to run the command against.",
        ),
        "env": Field(
            dagster_type=PermissiveDict(),
            default_value=None,
            is_optional=True,
            description="An optional dict of environment variables to pass to the subprocess.",
        ),
        "output_logging": Field(
            dagster_type=Enum(
                name="OutputType",
                enum_values=[
                    EnumValue("STREAM", description="Stream script stdout/stderr."),
                    EnumValue("BUFFER", description="Buffer bash script stdout/stderr, then log upon completion.",),
                    EnumValue("NONE", description="No logging"),
                ],
            ),
            is_optional=True,
            default_value="STREAM",
        ),
    },
    required_resource_keys={"ranch"},
)
def ranch_run(context):
    bash_command = f"ranch run {context.solid_config['command']}" + f" --app {context.solid_config['app']}"
    context.log.info("Setting up environment for subprocess...")
    if context.solid_config["env"] is not None:
        env = context.solid_config["env"]
    else:
        env = os.environ.copy()
    env.update({"RANCH_TOKEN": context.resources.ranch.token})
    context.log.info(f"Temporary directory root location: \n {gettempdir()}")
    with TemporaryDirectory(prefix="dagstertmp_") as tmpdir:
        with NamedTemporaryFile(dir=tmpdir, prefix=context.run_id) as f:
            # Write the command to a script file that bash will execute
            f.write(bytes(bash_command, "utf-8"))
            f.flush()
            fname = f.name
            script_location = os.path.abspath(fname)
            context.log.info(f"Temporary script location: {script_location}")

            def pre_exec():
                # Restore default handling of signals Python ignores, then start a new session
                for sig in ("SIGPIPE", "SIGXFSZ"):
                    if hasattr(signal, sig):
                        signal.signal(getattr(signal, sig), signal.SIG_DFL)
                os.setsid()

            context.log.info(f"Running command: {bash_command}")
            sub_process = Popen(["bash", fname], stdout=PIPE, stderr=STDOUT, cwd=tmpdir, env=env, preexec_fn=pre_exec)
            # Stream back logs as they are emitted
            if context.solid_config["output_logging"] == "STREAM":
                for raw_line in iter(sub_process.stdout.readline, b""):
                    line = raw_line.decode("utf-8").rstrip()
                    context.log.info(line)
            # Collect and buffer all logs, then emit
            elif context.solid_config["output_logging"] == "BUFFER":
                line = ""
                for raw_line in iter(sub_process.stdout.readline, b""):
                    line += raw_line.decode("utf-8")
                context.log.info(line)
            # No logging, but drain the pipe so the script cannot block on a full buffer
            elif context.solid_config["output_logging"] == "NONE":
                sub_process.communicate()
            # Wait in every mode so returncode is set before it is checked
            sub_process.wait()
            context.log.info(f"Command exited with return code {sub_process.returncode}")
            if sub_process.returncode:
                raise Failure(description=f"Bash command failed: {bash_command}")


def gsheet_wrap_ranch_run_fn(cfg) -> Dict:
    # ranch_run already prepends "ranch run", so pass only the arguments that follow it
    bash_command = (
        "-- /entrypoint /project/platform/utils/launch.sh"
        + f" ./bin/mk-sheets-importer --id {cfg['gsheet_id']}"
    )
    return {
        "ranch_run": {
            "config": {
                "command": bash_command,
                "app": "etl-ng-mk-sheets-importer",
                "env": cfg["env"],
                "output_logging": cfg["output_logging"],
            }
        }
    }


@composite_solid(
    description="A solid to invoke a ranch process to ingest a gsheet to the data warehouse.",
    input_defs=[InputDefinition(name="start", dagster_type=Nothing, description="Nothing.")],
    output_defs=[OutputDefinition(name="end", dagster_type=Nothing, description="Nothing.")],
    config={
        "gsheet_id": Field(dagster_type=String, is_optional=False, description="The ID of the gsheet to ingest.",),
        "env": Field(
            dagster_type=PermissiveDict(),
            default_value=None,
            is_optional=True,
            description="An optional dict of environment variables to pass to the subprocess.",
        ),
        "output_logging": Field(
            dagster_type=Enum(
                name="OutputType",
                enum_values=[
                    EnumValue("STREAM", description="Stream script stdout/stderr."),
                    EnumValue("BUFFER", description="Buffer bash script stdout/stderr, then log upon completion.",),
                    EnumValue("NONE", description="No logging"),
                ],
            ),
            is_optional=True,
            default_value="STREAM",
        ),
    },
    config_fn=gsheet_wrap_ranch_run_fn,
)
def ranch_ingest_gsheet():
    return ranch_run()
ranch_ingest_gsheet solid, I'm getting the following error:
dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'ranch_ingest_gsheet' decorated function does not have parameter(s) 'start', which are in solid's input_defs. Solid functions should only have keyword arguments that match input names and a first positional parameter named 'context'.
what am I missing?
alex
02/04/2020, 10:21 PM
def ranch_ingest_gsheet():
-> def ranch_ingest_gsheet(start):
       return ranch_run(start)
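Why the error fires: the decorator compares the decorated function's parameter names with the names declared in input_defs. The sketch below models that check with the standard library's inspect.signature. missing_input_params and the two stub functions are hypothetical. This is a simplified illustration, not Dagster's actual validation code.

```python
import inspect
from typing import Callable, Iterable, List


def missing_input_params(fn: Callable, input_names: Iterable[str]) -> List[str]:
    """Hypothetical helper: return declared input names that fn does not accept."""
    params = set(inspect.signature(fn).parameters)
    return [name for name in input_names if name not in params]


def composite_without_start():  # like the original: no parameters
    pass


def composite_with_start(start):  # like the fix: accepts the "start" input
    pass


print(missing_input_params(composite_without_start, ["start"]))  # ['start']
print(missing_input_params(composite_with_start, ["start"]))  # []
```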
dwall
02/04/2020, 10:22 PM
TypeError: gsheet_wrap_ranch_run_fn() takes 1 positional argument but 2 were given
config_fn also need to be provided the solid inputs?
alex
02/04/2020, 10:22 PM
dwall
02/04/2020, 10:23 PM
cfg
alex
02/04/2020, 10:23 PM
Callable[[ConfigMappingContext, dict]
def gsheet_wrap_ranch_run_fn(_, cfg) -> Dict:
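The TypeError comes from the calling convention. Per the docs quoted earlier, config_fn is invoked with two positional arguments: a ConfigMappingContext and the config dict. The plain-Python sketch below reproduces that. invoke_config_fn is a hypothetical stand-in for the framework's call, and object() stands in for the context.

```python
from typing import Any, Callable, Dict


def invoke_config_fn(config_fn: Callable[..., Dict], cfg: Dict[str, Any]) -> Dict:
    """Hypothetical stand-in for the framework: calls config_fn(context, cfg)."""
    context = object()  # placeholder for ConfigMappingContext
    return config_fn(context, cfg)


def one_arg_fn(cfg):  # original signature: only the config dict
    return cfg


def two_arg_fn(_, cfg):
    # Fixed signature: accept (and ignore) the context. A real mapping would
    # build the child config here; this stub just echoes its input.
    return cfg


try:
    invoke_config_fn(one_arg_fn, {"gsheet_id": "abc123"})
except TypeError as err:
    print(err)  # one_arg_fn() takes 1 positional argument but 2 were given

print(invoke_config_fn(two_arg_fn, {"gsheet_id": "abc123"}))
```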
dwall
02/04/2020, 10:24 PM
alex
02/04/2020, 10:25 PM
start?
dwall
02/04/2020, 10:26 PM
start argument in the composite solid but not the solid that it is wrapping
alex
02/04/2020, 10:27 PM
Nothing edges - you need to wire up how the dependencies flow through
dwall
02/04/2020, 10:29 PM
alex
02/04/2020, 10:29 PM
dwall
02/04/2020, 10:29 PM
max
02/04/2020, 10:46 PM
dwall
02/04/2020, 10:47 PM
dagster-bash library. However, if someone uses this composite solid I need them to have a specific resource (our homegrown deployment platform, "ranch") configured.
alex
02/04/2020, 10:54 PM
dwall
02/04/2020, 10:55 PM
alex
02/04/2020, 10:56 PM
dwall
02/04/2020, 10:57 PM
alex
02/04/2020, 10:58 PM
dwall
02/04/2020, 10:59 PM
alex
02/04/2020, 11:00 PM
dwall
02/04/2020, 11:02 PM
alex
02/04/2020, 11:03 PM
dwall
02/04/2020, 11:03 PM
alex
02/04/2020, 11:03 PM
dwall
02/04/2020, 11:04 PM
alex
02/04/2020, 11:33 PM