Basil V
03/25/2020, 6:50 PMprha
03/25/2020, 6:51 PMNothing
type as Nick suggests, or you could pass a newly constructed object with the table name and maybe the load time or something like thatBasil V
03/25/2020, 6:52 PMBasil V
03/25/2020, 6:53 PMdwall
03/25/2020, 6:53 PMdwall
03/25/2020, 6:53 PM@solid(
description="A solid.",
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)]
)
def solid(context):
pass
...
@pipeline
def pipeline():
solid(start_after=[other_solid(), another_solid()])
prha
03/25/2020, 6:53 PMBasil V
03/25/2020, 6:54 PMBasil V
03/25/2020, 6:54 PMBasil V
03/25/2020, 6:55 PMprha
03/25/2020, 6:55 PMBasil V
03/25/2020, 6:55 PMBasil V
03/25/2020, 6:56 PMBasil V
03/25/2020, 6:56 PMprha
03/25/2020, 6:57 PMBasil V
03/25/2020, 6:58 PMprha
03/25/2020, 7:02 PMBasil V
03/25/2020, 7:03 PMprha
03/25/2020, 7:05 PMBasil V
03/25/2020, 7:23 PMBasil V
03/25/2020, 7:24 PMdagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'load' decorated function does not have parameter(s) 'start_after', which are in solid's input_defs. Solid functions should only have keyword arguments that match input names and a first positional parameter named 'context'.
Basil V
03/25/2020, 7:25 PM@composite_solid(
description='The L in ELT',
input_defs=[InputDefinition(name='start_after', dagster_type=Nothing)]
)
def load() -> Nothing:
...
dwall
03/25/2020, 7:31 PMstart_after
as an input to your load()
functionBasil V
03/25/2020, 7:32 PMdwall
03/25/2020, 7:32 PMdef load(start_after: Nothing) -> Nothing:
dwall
03/25/2020, 7:32 PMBasil V
03/25/2020, 7:33 PM@solid(
description='The L in ELT',
input_defs=[InputDefinition(name='start_after', dagster_type=Nothing)]
)
def load(context) -> Nothing:
Basil V
03/25/2020, 7:33 PMBasil V
03/25/2020, 7:33 PMdwall
03/25/2020, 7:34 PM@composite_solid(
name="dbt_rpc_snapshot_freshness_and_wait",
description="Invoke a dbt source snapshot-freshness process and wait for it to finish.",
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing,)],
output_defs=[OutputDefinition(name="result", dagster_type=DbtRpcPollResult)],
config={
"select": Field(
config=Noneable(Array(String)),
default_value=None,
is_required=False,
description="The dbt sources to snapshot freshness for.",
),
"interval": Field(
config=Int,
is_required=False,
default_value=10,
description="The interval (in seconds) at which to poll the dbt rpc process.",
),
"logs": Field(config=Bool, is_required=False, default_value=True),
},
config_fn=lambda cfg: {
"dbt_rpc_snapshot_freshness": {"config": {"select": cfg["select"]}},
"dbt_rpc_poll": {"config": {"logs": cfg["logs"], "interval": cfg["interval"]}},
},
)
def dbt_rpc_snapshot_freshness_and_wait(start_after: Nothing) -> DbtRpcPollResult:
return dbt_rpc_poll(request_token=dbt_rpc_snapshot_freshness(start_after=start_after))
alex
03/25/2020, 7:34 PM@composite_solid
s are input mappings, and while Nothing
has no value so is not useful in the compute body of a @solid
, the @composite
still needs to route where the Nothing
is supposed to go toBasil V
03/25/2020, 7:37 PMBasil V
03/25/2020, 7:38 PMdbt_rpc_snapshot_freshness(start_after=start_after)
dwall
03/25/2020, 7:39 PMdbt_rpc_snapshot_freshness
solid has a Nothing
input, so all the composite solid is doing is mapping the Nothing
it recieved as an input to the Nothing
input of the dbt_rpc_snapshot_freshness
soliddwall
03/25/2020, 7:40 PMNothing
to any child solid that has Nothing
as an inputdwall
03/25/2020, 7:41 PMNothing
inputdwall
03/25/2020, 7:41 PMdbt_rpc_snapshot_freshness
solid def:
@solid(
description="A solid to invoke dbt source snapshot-freshness.",
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)],
output_defs=[
OutputDefinition(
name="request_token", dagster_type=String, description="The request token of the invoked dbt snapshot."
)
],
config={
"select": Field(
config=Noneable(Array(String)),
default_value=None,
is_required=False,
description="The dbt sources to snapshot-freshness for.",
)
},
required_resource_keys={"rpc"},
tags={"kind": "dbt"},
)
def dbt_rpc_snapshot_freshness(context) -> String:
resp = context.resources.rpc.snapshot_freshness(select=context.solid_config["select"])
return resp.json().get("result").get("request_token")
Basil V
03/25/2020, 7:41 PMstart_after
param? I mean there isn't way to simply define the start_after
in the composite and handle it there without having to pass it to any of the solids within?dwall
03/25/2020, 7:42 PMBasil V
03/25/2020, 7:44 PMBasil V
03/25/2020, 7:44 PMalex
03/25/2020, 7:52 PMNothing
is just a hacked up data dependency that can be used to create hacky sequencing dependencies. Its cumbersome nature reflects this.Basil V
03/25/2020, 7:57 PM