geoHeil

12/22/2021, 9:45 PM
How can I get the path of an asset? I.e. like in a Hive metastore, which knows where a table is stored. https://docs.dagster.io/concepts/assets/asset-materializations#asset-lineage
`return users_df` directly outputs the dataframe, so it is clear to me how the data is shared from op to op. But https://docs.dagster.io/tutorial/advanced-tutorial/materializations uses `yield Output(None)`. Assuming I have created an asset with key `asset_foo_1` in the directory `output/foo`, how can I (1) link a second job using only the asset (key) as an input (i.e. like `select * from table`, where the metastore knows the path details) and (2) have the 2nd op/job re-trigger in case the asset was updated?

owen

12/22/2021, 11:16 PM
So dagster knows how to handle (and can be told how to handle) serializing and deserializing in-memory Python objects using IOManagers. By default, this happens using the local file system, but this protocol can be completely customized if you write an IOManager that does something else. So for example an IOManager could be written to store an output pandas DataFrame to Hive and then load it as an input by running a `select *`-type query. An example of that sort of setup (that's more complicated than what you would need because it handles multiple output data types): https://github.com/dagster-io/dagster/blob/master/examples/hacker_news/hacker_news/resources/snowflake_io_manager.py
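A very stripped-down sketch of that idea, assuming pandas DataFrames and a local `warehouse` directory (the class and resource names here are just placeholders; swap the read/write calls for Hive, Snowflake, etc.):
import os
import pandas as pd
from dagster import IOManager, io_manager

class LocalParquetIOManager(IOManager):
    def _path(self, name: str) -> str:
        # the storage location is purely a function of the output's name here;
        # in a real setup this could be an asset key, table name, etc.
        return os.path.join("warehouse", f"{name}.parquet")

    def handle_output(self, context, obj: pd.DataFrame):
        # serialize the op's output to the computed path
        os.makedirs("warehouse", exist_ok=True)
        obj.to_parquet(self._path(context.name))

    def load_input(self, context) -> pd.DataFrame:
        # reuse the same path logic to load the upstream op's output
        return pd.read_parquet(self._path(context.upstream_output.name))

@io_manager
def local_parquet_io_manager(_):
    return LocalParquetIOManager()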
this only answers half of your question though, because you want to work with asset keys, not just normal inputs / outputs. There are asset sensors to detect when an asset has been materialized: https://docs.dagster.io/_apidocs/schedules-sensors#dagster.asset_sensor, which sounds like what you want. On the whole, though, if you're interested in having a more asset-focused experience where you define inputs and outputs purely in terms of the assets they produce, then you might want to check out Software-Defined Assets: https://docs.dagster.io/guides/dagster/software-defined-assets

geoHeil

12/23/2021, 6:06 AM
So for the 2nd part - I think the software-defined assets look great and I will try these. However, as far as I know these do not (yet) support partitioned assets. Regarding the first part: I think custom IO managers are not really the answer. I want to use assets to combine separate jobs (from potentially different teams) where one job provides input for another one. Adding in the sensors sounds quite cumbersome - I was hoping for a simpler solution. Perhaps (2) will handle both?
I.e. I am talking about this case here: `yield Output(None)`, where the df is not moved in any way (via some IO manager) to a directly downstream task

owen

12/23/2021, 2:21 PM
You're right that software defined assets don't yet support partitioned assets (but this is in the works 🙂). Sensors are the abstraction Dagster has to trigger a job's execution based on some event, so to get the behavior of "if asset A is updated, run job X to update asset B", that's the best option in most scenarios where the assets are split into separate jobs. If you're ok with them being in the same job (maybe just in separate graphs), then you can avoid that friction, but I get that that's not always possible.
For your example, would `asset_foo_1` always be stored in the same place (`output/foo`), or would it potentially change based on things that happen at runtime?
In my mind, the IOManager approach is useful because it defines both sides of the serde protocol in a single place. In essence, the `yield Output(None)` example is still defining a serde, but in a less flexible way where the serialization logic is inside the body of the op (and the deserialization logic will need to be duplicated in any op that wants to consume that asset). But whatever logic was used to determine where to store the asset can often be reused to determine where to load it (the IOManager is just a way of bundling it together). If the dependencies need to be across different jobs, a RootInputManager might be possible to use, but this would require that the path is dependent solely on things that are known about the input (so the name of the input or the asset key attached to the input, or arbitrary metadata you specify on the input definition).
For the more dynamic case, where the path can really be anything and there's not a ton of guarantees, using an asset_sensor has an additional benefit, which is that it provides the actual AssetMaterialization event when you're launching a run. So assuming the AssetMaterialization has metadata on it specifying the exact path it was stored to, this can be read and used as configuration to your downstream job.
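A sketch of that pattern (all the names here — `downstream_job`, the `read_asset` op, the `asset_foo_1` key, and the text metadata entry labelled "path" — are assumptions about how the upstream op records things):
from dagster import AssetKey, RunRequest, asset_sensor, job, op

# hypothetical downstream op/job driven by a "path" config value
@op(config_schema={"path": str})
def read_asset(context):
    context.log.info(f"reading asset from {context.op_config['path']}")

@job
def downstream_job():
    read_asset()

@asset_sensor(asset_key=AssetKey("asset_foo_1"), job=downstream_job)
def asset_foo_1_sensor(context, asset_event):
    materialization = asset_event.dagster_event.event_specific_data.materialization
    # assumes the upstream op attached the storage path as text metadata labelled "path"
    path_entry = next(e for e in materialization.metadata_entries if e.label == "path")
    yield RunRequest(
        run_key=context.cursor,
        run_config={"ops": {"read_asset": {"config": {"path": path_entry.entry_data.text}}}},
    )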

geoHeil

12/23/2021, 2:39 PM
For simplicity, let's assume the path is constant.
"the asset key attached to the input" sounds interesting. How would one go about this?

owen

12/23/2021, 2:45 PM
https://docs.dagster.io/_apidocs/ops#dagster.In has an asset_key argument, although it's not actually used for much at the moment. However, assuming the upstream asset was stored using a path that was computed from that asset key, an InputDefinition is a useful place to stash that same AssetKey for running that same computation on the load side

geoHeil

12/23/2021, 2:56 PM
How could I retrieve the path from the asset key? The docs only state:
> Used for tracking lineage information through Dagster.

owen

12/23/2021, 2:59 PM
there's no built-in way, but on the writing side the functionality to produce the path is often a function of the asset key it's writing (so it would just be a matter of using the same function on the writing and reading side)
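For example (just a hypothetical helper, not a Dagster API):
import os
from dagster import AssetKey

# keep the key -> path mapping in exactly one place and call it from both the
# writing side (op body / IOManager) and the reading side
def path_for_asset_key(asset_key: AssetKey) -> str:
    # AssetKey(["output", "foo"]).path == ["output", "foo"]
    return os.path.join(*asset_key.path)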

geoHeil

12/23/2021, 3:00 PM
Agreed, a function (like an IO manager). But even the Snowflake IO manager you showed above needs to know the DB and table. And I would love the asset to provide (db, table, or in my case path) for a given asset key.

owen

12/23/2021, 3:06 PM
ah yeah fair enough -- depending on your naming scheme for your asset keys, that information might already be there (for example, I could name my assets `snowflake.schema_name.table_name` and parse the key to figure out where to read it from). Absent that, you can add arbitrary metadata to an InputDefinition using the metadata argument (which is just a dictionary). So you could for example add metadata to your input that says that it lives in table X in schema Y in database Z. This is basically just hardcoding, but the information does need to exist somewhere.
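Something like this sketch (the input name, asset key, and metadata values are all made up, and the input would still need to be wired up via an upstream op or a root input manager):
from dagster import AssetKey, In, op

@op(
    ins={
        "users": In(
            asset_key=AssetKey("asset_foo_1"),
            metadata={"database": "analytics", "schema": "public", "table": "users"},
        )
    }
)
def consume_users(context, users):
    # the hardcoded location is available again via the op's input definitions
    location = context.solid_def.input_defs[0].metadata
    context.log.info(f"users lives at {location}")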

geoHeil

12/23/2021, 3:09 PM
hm. So given my asset key of ``asset_foo_1``, and assuming metadata with name `path` and value `/path/to/asset` is present: how can I access the metadata for a given asset key? I.e., naively formulated, look up the table's location parameter in the metastore?
I.e. I would not want to add more arbitrary metadata to the input definition - but rather only the asset key (and then perform a lookup for the desired metadata values)

owen

12/23/2021, 3:19 PM
I guess the issue is that metadata is not specific to a particular asset key but rather to a particular asset materialization. So you would need to query the Dagster event log to get the most recent materialization for your asset key. This is totally doable (it's what the asset sensor is doing under the hood). Would look something like:
from dagster import DagsterEventType, EventRecordsFilter

# inside an op or sensor body, `context.instance` gives access to the event log
event_records = context.instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=asset_key,
    ),
    ascending=False,
    limit=1,
)
metadata = event_records[0].event_log_entry.materialization.metadata_entries
# ... pick out path from metadata

geoHeil

12/23/2021, 3:20 PM
Interesting, thanks. Regarding partitioned software-defined assets: is there an issue somewhere that I can follow, or when would you expect this feature to come out?

owen

12/23/2021, 3:34 PM
No trackable issue at the moment unfortunately, but it is one of our higher-priority projects right now. If I had to guess, I'd say it'd be on the order of a couple of months (sooner for something usable but unfinished)
:next-level-daggy: 1

geoHeil

12/23/2021, 4:09 PM
Here https://www.youtube.com/watch?v=4ZYxVmxXzjw you demo a right-click to launch assets. When using the code from https://docs.dagster.io/guides/dagster/software-defined-assets I cannot right-click in 0.13.12, and I also do not get the yellow marker when the first upstream asset (`daily_temperature_highs`) is updated (to remind me that the downstream one needs to be updated as well). Where would I get this functionality? Why is it not present here? (confused)
It looks like this/parts of it are only available in the asset view and not in the launchpad/overview tab for the job. In the video though it looked like it would be available there as well. Is this a misunderstanding?
A second thing which is still unclear to me: even in the case of software-defined assets, with `assets=[daily_temperature_highs, hottest_dates]` all items seem to live in the same job (graph). But if a 2nd team delivered some of the assets, would I still manually need to add a sensor to trigger? And where would I get the opportunity to add additional metadata (similar to AssetMaterialization) in the case of software-defined assets?
When still experimenting with the manual (non-SDA) asset approach:
@op(input_defs=In(asset_key=AssetKey("foo_my_asset_key")))
def get_asset(context):
    get_dagster_logger().info(f"asset path: {context.asset_key.path}")
fails with:
dagster.check.ParameterCheckError: Param "input_defs" is not one of ['frozenlist', 'list']. Got In(dagster_type=<class 'dagster.core.definitions.utils.NoValueSentinel'>, description=None, default_value=<class 'dagster.core.definitions.utils.NoValueSentinel'>, root_manager_key=None, metadata=None, asset_key=AssetKey(['foo_my_asset_key']), asset_partitions=None) which is type <class 'dagster.core.definitions.input.In'>.
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/server.py", line 205, in __init__
    self._repository_symbols_and_code_pointers.load()
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/server.py", line 90, in load
    self._loadable_repository_symbols = load_loadable_repository_symbols(
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/server.py", line 108, in load_loadable_repository_symbols
    loadable_targets = get_loadable_targets(
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/grpc/utils.py", line 37, in get_loadable_targets
    else loadable_targets_from_python_package(package_name)
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 29, in loadable_targets_from_python_package
    module = load_python_module(package_name, remove_from_path_fn=remove_from_path_fn)
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 182, in load_python_module
    module = importlib.import_module(module_name)
  File "/path/to/conda_env/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/path/to/dagster_project/__init__.py", line 1, in <module>
    from .repository import INTEGRATED_PIPELINE
  File "/path/to/dagster_project/repository.py", line 7, in <module>
    from INTEGRATED_PIPELINE.graphs.my_jobthing import my_jobthing_job
  File "/path/to/dagster_project/graphs/my_jobthing.py", line 66, in <module>
    @op(input_defs=In(asset_key=AssetKey("foo_my_asset_key")))
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/definitions/decorators/op.py", line 268, in op
    return _Op(
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/core/definitions/decorators/op.py", line 41, in __init__
    self.input_defs = check.opt_nullable_list_param(
  File "/path/to/conda_env/lib/python3.9/site-packages/dagster/check/__init__.py", line 532, in opt_nullable_list_param
    raise _param_type_mismatch_exception(obj_list, (frozenlist, list), param_name)
whereas in the software defined asset of https://docs.dagster.io/guides/dagster/software-defined-assets a similar approach seems to work fine.

owen

12/23/2021, 4:57 PM
For the UI stuff you need to enable the SDA APIs in the settings page (gear icon -> Experimental Asset APIs). For jobs that are composed out of only assets, you'll see those new UI elements.
👍 1
For adding additional metadata to asset materializations, you can add metadata to the Output by doing something like `yield Output(my_value, metadata=...)` (instead of just `return my_value`). The metadata on the Output will be attached to the AssetMaterialization that Dagster automatically creates for @assets
:partydagster: 1
If you include all the assets in the same job, there's no need for an asset sensor (you'd only need that if you had two separate asset jobs)
and for the error you're seeing, it should be `@op(ins={"input_name": In(...)})`. The legacy syntax would be `@op(input_defs=[InputDefinition("input_name", ...)])`, which still works but is a bit more verbose
and then to access the asset_key from the context you'd have to do something like `context.solid_def.input_defs[0].asset_key` (a little verbose / uses legacy terms to do it from inside an op context, an oversight I'm just realizing now)
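Putting that together, an (untested) sketch of the fixed op — the input name `upstream` is just an example:
from dagster import AssetKey, In, get_dagster_logger, op

@op(ins={"upstream": In(asset_key=AssetKey("foo_my_asset_key"))})
def get_asset(context, upstream):
    # the asset key attached to the input definition, as described above
    asset_key = context.solid_def.input_defs[0].asset_key
    get_dagster_logger().info(f"asset path: {asset_key.path}")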

geoHeil

12/23/2021, 9:11 PM
https://dagster.slack.com/archives/C01U954MEER/p1640278939073600?thread_ts=1640209536.005500&cid=C01U954MEER indeed. But I am explicitly asking how to get this to work when I do not want to construct a single gigantic global graph. So do I understand correctly that, also for the software-defined assets, the sensor is necessary if I want to access stuff from another job?

owen

12/23/2021, 9:14 PM
the cross job software defined asset semantics haven't been fully ironed out at the moment. right now asset sensors are the way to do it, but a cleaner execution interface on top of these types of disjoint jobs is something we're working towards (you're right that it's a bit cumbersome at the moment)

geoHeil

12/23/2021, 9:19 PM
Is there some (organizational) limit you would recommend for the max size of a graph? I.e. would it be considered best practice to have a single gigantic job graph? As far as I know a single repository wants to use the same version of Python - but perhaps one could manually activate some conda venv to use different packages/Python versions.
https://dagster.slack.com/archives/C01U954MEER/p1640279033073800?thread_ts=1640209536.005500&cid=C01U954MEER
DagsterInvalidDefinitionError: Invalid type: dagster_type must be an instance of DagsterType or a Python type: got AssetKey(['my_asset_key']).
But how can I feed the asset key as an input (in the case of the manual op-based approach)?