geoHeil
12/22/2021, 9:45 PM
`return users_df` directly outputs the dataframe, so it is clear to me how the data is shared from op to op. But https://docs.dagster.io/tutorial/advanced-tutorial/materializations uses `yield Output(None)`. Assuming I have created an asset with key `asset_foo_1` in the directory output/foo, how can I (1) link a second job using only the asset (key) as an input (i.e. like `select * from table`, where the metastore knows the path details) and (2) have the second op/job re-trigger in case the asset was updated?
owen
12/22/2021, 11:16 PM
geoHeil
12/23/2021, 6:06 AM
owen
12/23/2021, 2:21 PM
Will asset_foo_1 always be stored in the same place (output/foo), or would it potentially change based on things that happen at runtime?
The yield Output(None) example is still defining a serde, but in a less flexible way: the serialization logic lives inside the body of the op, and the deserialization logic has to be duplicated in any op that wants to consume that asset. But whatever logic was used to determine where to store the asset can often be reused to determine where to load it (the IOManager is just a way of bundling the two together). If the dependencies need to span different jobs, a RootInputManager might work, but this requires that the path depend solely on things that are known about the input: the name of the input, the asset key attached to the input, or arbitrary metadata you specify on the input definition.
geoHeil
12/23/2021, 2:39 PM
the asset key attached to the input,
owen
12/23/2021, 2:45 PM
geoHeil
12/23/2021, 2:56 PM
owen
12/23/2021, 2:59 PM
geoHeil
12/23/2021, 3:00 PM
owen
12/23/2021, 3:06 PM
snowflake.schema_name.table_name and parse the key to figure out where to read it from). Absent that, you can add arbitrary metadata to an InputDefinition using the metadata argument (which is just a dictionary). So you could, for example, add metadata to your input saying that it lives in table X in schema Y in database Z. This is basically just hardcoding, but the information does need to exist somewhere.
geoHeil
12/23/2021, 3:09 PM
path and value /path/to/asset is present. How can I access the metadata for a given asset key? I.e., naively formulated: look up the table's location parameter in the metastore?
owen
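12/23/2021, 3:19 PM
from dagster import DagsterEventType, EventRecordsFilter

# Fetch only the most recent materialization event for this asset key.
event_records = context.instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=asset_key,
    ),
    ascending=False,
    limit=1,
)
# Assumes at least one materialization exists; event_records is empty otherwise.
metadata = event_records[0].event_log_entry.materialization.metadata_entries
# ... pick out path from metadata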
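A minimal sketch of that last "pick out path from metadata" step, written so it runs without Dagster installed. The classes `FakeEntryData` and `FakeMetadataEntry` and the helper `find_path` are hypothetical stand-ins, not Dagster API. The sketch assumes each metadata entry exposes a `label` and an `entry_data` object carrying a `path` attribute, which may not match your Dagster version, so check the attribute names before relying on it.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


# Hypothetical stand-ins for Dagster's metadata entry objects.
# ASSUMPTION: real entries expose `label` and `entry_data`, and path entries
# keep the location in `entry_data.path`. Verify against your Dagster version.
@dataclass
class FakeEntryData:
    path: str


@dataclass
class FakeMetadataEntry:
    label: str
    entry_data: object


def find_path(metadata_entries: Iterable, label: str = "path") -> Optional[str]:
    """Return the path stored under `label`, or None if no usable entry exists."""
    for entry in metadata_entries:
        if entry.label == label:
            # getattr tolerates entries whose data has no `path` attribute.
            return getattr(entry.entry_data, "path", None)
    return None


# Example with the label and value mentioned in the question above.
entries = [
    FakeMetadataEntry("row_count", object()),
    FakeMetadataEntry("path", FakeEntryData("/path/to/asset")),
]
print(find_path(entries))
```

In the snippet above, `metadata` would take the place of `entries`. Returning None instead of raising leaves it to the caller to decide what to do when the asset was materialized without a path entry.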
geoHeil
12/23/2021, 3:20 PM
owen
12/23/2021, 3:34 PM
geoHeil
12/23/2021, 4:09 PM
daily_temperature_highs) is updated (to remind me that the downstream one needs to be updated as well). Where would I get this function? Why is it not present here? (confused)
assets=[daily_temperature_highs, hottest_dates] all items seem to live in the same job (graph). But if a second team delivered some of the assets, would I still need to add a sensor manually to trigger it? And where would I get the opportunity to add additional metadata in the case of software-defined assets, similar to AssetMaterialization?
@op(input_defs=In(asset_key=AssetKey("foo_my_asset_key")))
def get_asset(context):
    get_dagster_logger().info(f"asset path: {context.asset_key.path}")
fails with:
dagster.check.ParameterCheckError: Param "input_defs" is not one of ['frozenlist', 'list']. Got In(dagster_type=<class 'dagster.core.definitions.utils.NoValueSentinel'>, description=None, default_value=<class 'dagster.core.definitions.utils.NoValueSentinel'>, root_manager_key=None, metadata=None, asset_key=AssetKey(['foo_my_asset_key']), asset_partitions=None) which is type <class 'dagster.core.definitions.input.In'>.
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/server.py", line 205, in __init__
self._repository_symbols_and_code_pointers.load()
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/server.py", line 90, in load
self._loadable_repository_symbols = load_loadable_repository_symbols(
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/server.py", line 108, in load_loadable_repository_symbols
loadable_targets = get_loadable_targets(
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/utils.py", line 37, in get_loadable_targets
else loadable_targets_from_python_package(package_name)
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 29, in loadable_targets_from_python_package
module = load_python_module(package_name, remove_from_path_fn=remove_from_path_fn)
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 182, in load_python_module
module = importlib.import_module(module_name)
File "/path/to/conda_env/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/path/to/dagster_project/__init__.py", line 1, in <module>
from .repository import INTEGRATED_PIPELINE
File "/path/to/dagster_project/repository.py", line 7, in <module>
from INTEGRATED_PIPELINE.graphs.my_jobthing import my_jobthing_job
File "/path/to/dagster_project/graphs/my_jobthing.py", line 66, in <module>
@op(input_defs=In(asset_key=AssetKey("foo_my_asset_key")))
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/definitions/decorators/op.py", line 268, in op
return _Op(
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/definitions/decorators/op.py", line 41, in __init__
self.input_defs = check.opt_nullable_list_param(
File "/path/to/conda_env/lib/python3.9/site-packages/dagster/check/__init__.py", line 532, in opt_nullable_list_param
raise _param_type_mismatch_exception(obj_list, (frozenlist, list), param_name)
whereas in the software-defined assets guide at https://docs.dagster.io/guides/dagster/software-defined-assets a similar approach seems to work fine.
owen
12/23/2021, 4:57 PM
yield Output(my_value, metadata=...) (instead of just return my_value). The metadata on the Output will be attached to the AssetMaterialization that Dagster automatically creates for @assets.
@op(ins={"input_name": In(...)}). The legacy syntax would be @op(input_defs=[InputDefinition("input_name", ...)]), which still works but is a bit more verbose.
context.solid_def.input_defs[0].asset_key (a little verbose, and it uses legacy terms to do this from inside an op context; an oversight I'm just realizing now)
geoHeil
12/23/2021, 9:11 PM
owen
12/23/2021, 9:14 PM
geoHeil
12/23/2021, 9:19 PM
DagsterInvalidDefinitionError: Invalid type: dagster_type must be an instance of DagsterType or a Python type: got AssetKey(['my_asset_key']).
But how can I feed the asset key as an input (in the case of the manual, op-based approach)?