William
12/16/2022, 2:17 AM
asset
but it's strange that schedules do not natively support assets, so we have to convert them to jobs. Sensors recently added support for assets; shall we do the same for schedules?
Stephen Bailey
12/16/2022, 2:44 AM
from dagster import define_asset_job, ScheduleDefinition, AssetSelection

schedule = ScheduleDefinition(
    name="run_asset_group",
    # define_asset_job takes the job name as its first argument; "my_group_job" is a placeholder
    job=define_asset_job("my_group_job", selection=AssetSelection.groups("my_group")),
    cron_schedule="...",
)
I don't love the asset selection thing, but it does make it pretty lightweight to spin up a custom asset schedule
sandy
12/16/2022, 4:20 PM
sandy
12/16/2022, 4:21 PM
Stephen Bailey
12/16/2022, 4:22 PM
Stephen Bailey
12/16/2022, 4:23 PM
Stephen Bailey
12/16/2022, 4:24 PM
sandy
12/16/2022, 4:28 PM
AssetSelection.assets(*my_assets_list)
Stephen Bailey
12/16/2022, 7:33 PM
asset_a_key = AssetKey(["snowflake", "db", "schema", "table_a"])
asset_b_key = AssetKey(["snowflake", "db", "schema", "table_b"])
How do I select those two assets?
Sean Lopp
12/16/2022, 9:21 PM
AssetSelection.keys(AssetKey(["schema", "dbt_model"]), AssetKey(["schema", "dbt_model2"])) | AssetSelection.assets(python_asset_object)
And I never get it right the first, second, or third time. And I never remember whether keys or assets takes the string or the object, or whether the arguments are a list or a list of lists. The IDE tries to help, but the type hint is basically "thing coercible to keys", and IDK what "thing" or "coerce" necessarily means in this context.
The upstream, downstream stuff is cool.
Sean Lopp
12/16/2022, 9:21 PM
sandy
12/16/2022, 9:40 PM
Sean Lopp
12/16/2022, 9:46 PM
asset_a_key = AssetKey(["snowflake", "db", "schema", "table_a"])
asset_b_key = AssetKey(["snowflake", "db", "schema", "table_b"])
AssetSelection.assets(asset_a_key, asset_b_key)
# OR
AssetSelection.assets(asset_a_key) | AssetSelection.assets(asset_b_key)
Note that in this context & is restrictive and | (or) is additive.
You could also probably do some invocation of AssetSelection.keys, though I'd have to play around to see if I could figure out whether keys is a list of AssetKeys or a list of lists or something else.
Stephen Bailey
01/03/2023, 3:40 PM
sandy
01/03/2023, 9:49 PM
AssetSelection.keys(*asset_key_list) to work
Is it possible that one of your assets used to be partitioned but is no longer partitioned, or vice versa?
Stephen Bailey
01/04/2023, 2:18 AM
sandy
01/04/2023, 5:22 AM
Stephen Bailey
01/04/2023, 1:57 PM
def test_selection():
    selection = AssetSelection.groups("my_group_with_four_assets")
    # asset_count is a hypothetical attribute, sketching the desired check
    assert selection.asset_count == 4
owen
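01/04/2023, 9:36 PM
from dagster import AssetSelection
from my_project import my_repo

def test_selection():
    selection = AssetSelection.groups("my_group")
    # resolve the selection against the repository's asset graph to get the selected keys
    selected_keys = selection.resolve(my_repo.asset_graph)
    assert len(selected_keys) == 4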
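[Editor's sketch] A dependency-free toy version of the test above, which also illustrates the | and & behaviour mentioned earlier in the thread. It models selections as plain Python sets of asset keys, so it only mirrors the idea and does not call Dagster. ASSET_GROUPS and select_groups are hypothetical names made up for this illustration.

```python
# Toy model only: plain Python sets standing in for Dagster asset selections.
# ASSET_GROUPS and select_groups are hypothetical names, not Dagster APIs.
ASSET_GROUPS = {
    ("snowflake", "db", "schema", "table_a"): "my_group",
    ("snowflake", "db", "schema", "table_b"): "my_group",
    ("schema", "dbt_model"): "dbt",
    ("schema", "dbt_model2"): "dbt",
}


def select_groups(*groups):
    """Return the set of asset keys whose group is one of `groups`."""
    return {key for key, group in ASSET_GROUPS.items() if group in groups}


my_group = select_groups("my_group")
dbt = select_groups("dbt")

union = my_group | dbt         # additive: keys that are in either selection
intersection = my_group & dbt  # restrictive: keys that are in both selections


def test_selection():
    # Counting the resolved keys is the same kind of check as in the test above.
    assert len(my_group) == 2
    assert len(union) == 4
    assert intersection == set()
```

With real Dagster objects, the counting step would be done on the result of resolve, as in the test above.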
Nicolas Parot Alvarez
01/05/2023, 5:04 PM
AssetSelection.groups("my_group")
because strings are hard to refactor automatically, and if I make a typo, my IDE cannot detect at coding time that the object doesn't actually exist.
So instead, I try to use things like:
asset_keys=[asset.key for asset in my_assets]
Similarly, I try to avoid hard-coded op/asset names in my job config, so it doesn't break if I decide to change my op/asset names.
So I try to use things like:
my_op.name: { "config": {...}}
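[Editor's sketch] A small illustration of the point above: deriving the config key from the object instead of typing the name as a string, so a rename propagates on its own. The Op class here is a hypothetical stand-in rather than Dagster's op, and the "limit" setting is invented for the example; only the {"ops": {name: {"config": ...}}} nesting follows Dagster's run config layout.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    """Hypothetical stand-in for an op object that exposes its name."""
    name: str


def build_run_config(op: Op) -> dict:
    # The key comes from op.name, so no op name is hard-coded in the config.
    return {"ops": {op.name: {"config": {"limit": 10}}}}


my_op = Op(name="load_table")
run_config = build_run_config(my_op)

# Renaming the op changes the config key without touching build_run_config.
renamed_config = build_run_config(Op(name="load_table_v2"))
```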
sandy
01/05/2023, 6:06 PM
Instead of AssetSelection.keys(*[asset.key for asset in my_assets]), you can do AssetSelection.assets(*my_assets).
Nicolas Parot Alvarez
01/06/2023, 4:05 PM
Maybe we could have only one parameter asset_selection instead of both asset_selection and asset_keys, where we can pass anything that clearly points to assets: AssetSelection, a list of Assets, a list of AssetKeys, or just strings, and then Dagster handles the interpretation of it.
sandy
01/06/2023, 4:36 PM
"Maybe we could have only one parameter asset_selection instead of both asset_selection and asset_keys, where we can pass anything that clearly points to assets: AssetSelection, a list of Assets, a list of AssetKeys, or just strings, and then Dagster handles the interpretation of it"
Are there particular functions / classes that you're talking about for this? define_asset_job?
Nicolas Parot Alvarez
01/06/2023, 6:08 PM
@multi_asset_sensor(), but my point is general for all signatures where assets need to be selected.
sandy
01/06/2023, 6:31 PM
@multi_asset_sensor params especially are a bit of a mess. I filed an issue for addressing this: https://github.com/dagster-io/dagster/issues/11558
sandy
01/06/2023, 6:32 PM
Nicolas Parot Alvarez
01/06/2023, 7:16 PM
AssetSelection
like you told me:
@multi_asset_sensor(
    asset_selection=AssetSelection.assets(*my_asset_sequence),
    job=run_assets,
)
I would expect to be able to just pass my sequence of assets:
@multi_asset_sensor(
    asset_selection=my_asset_sequence,
    job=run_assets,
)
Why does my sequence of assets need to go through another abstraction for a sensor to understand it?
If the sensor needs a specific attribute of the asset, it's its job to look for it.
Stephen Bailey
01/06/2023, 7:57 PM
define_asset_job), and it does allow mixing and matching selection via groups and assets, which could be really useful for more complex cases. (It could also be useful, for example, to have tag matching in a future version.)
Nicolas Parot Alvarez
01/06/2023, 8:07 PM
define_asset_job could also allow directly using a sequence of assets.
I'm OK with having an additional abstraction if one requires additional complexity like mixing groups and keys in the selection.
But I think the syntax for the default usage of providing a sequence of assets could be easier.
sandy
01/06/2023, 10:50 PM
sandy
01/07/2023, 1:46 AM
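
[Editor's sketch] One way to picture the "single parameter that accepts anything pointing to assets" idea discussed in this thread. This is plain Python with hypothetical stand-in classes (Key, Asset) and a hypothetical helper (coerce_to_keys); it is not how Dagster implements selection and is only meant to show the kind of normalization a caller would no longer have to do by hand.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Sequence, Tuple, Union


# Hypothetical stand-ins; none of these are Dagster classes.
@dataclass(frozen=True)
class Key:
    path: Tuple[str, ...]


@dataclass(frozen=True)
class Asset:
    key: Key


Selectable = Union[Asset, Key, str, Sequence[str]]


def coerce_to_keys(items: Iterable[Selectable]) -> FrozenSet[Key]:
    """Normalize assets, keys, strings, or path sequences into a set of Keys."""
    keys = set()
    for item in items:
        if isinstance(item, Asset):
            keys.add(item.key)          # asset object: use its key
        elif isinstance(item, Key):
            keys.add(item)              # already a key
        elif isinstance(item, str):
            keys.add(Key((item,)))      # bare string: single-component path
        else:
            keys.add(Key(tuple(item)))  # list/tuple of strings: multi-part path
    return frozenset(keys)


table_a = Asset(Key(("snowflake", "db", "schema", "table_a")))
selected = coerce_to_keys(
    [table_a, Key(("schema", "dbt_model")), "raw_events", ["schema", "dbt_model2"]]
)
```

The string check comes before the generic sequence branch because a str is itself a sequence of characters and would otherwise be split into one path component per character.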