# ask-community
r
Folks, a somewhat fundamental confusion: I have a successful ops/graphs/jobs project and I am now trying to use assets. Now
graph.to_job
has a
resource_defs
argument where I can specify io-managers. I would have expected
define_asset_job
to have a similar resources section, but it does not. Why is this the case? Is it because assets "can live outside of jobs", or is my mental model just wrong? I know I can define resources like so:
Copy code
defs = Definitions(
    assets=[asset1],
    jobs=[define_asset_job("all_assets")],
    resources={"slack_client": prod_slack_client},
)
(from the API docs). But this makes resource keys global, rather than the per-job SCOPED resource keys that
to_job
provides. Is there a way of getting job-scoped resource keys with assets? Or am I thinking of this plainly philosophically wrong! 🙂
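For concreteness, here is roughly the job-scoped pattern I mean, sketched with the built-in fs_io_manager standing in for my CSV io-manager (names and paths are made up):
Copy code
from dagster import graph, op, fs_io_manager

@op
def my_op():
    return 1

@graph
def my_graph():
    my_op()

# same graph, two jobs: to_job scopes the "io_manager" key to each job,
# so each job can carry its own config
job_a = my_graph.to_job(
    name="job_a",
    resource_defs={"io_manager": fs_io_manager.configured({"base_dir": "/tmp/a"})},
)
job_b = my_graph.to_job(
    name="job_b",
    resource_defs={"io_manager": fs_io_manager.configured({"base_dir": "/tmp/b"})},
)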
My notion is that some of the burden of ops goes away, as 2 separate assets automatically get their own job and own config, whereas in a job/op scenario, if I have 2 resources of the same type using the same io-manager, I need separate io-manager keys to supply separate config (filenames, for example)
Hmm, materialization outside the context of a job seems to lose the
context.log
. And there does not seem to be any way to configure source assets...
But there does not seem to be asset-specific resource config. So it makes sense to use a different instance of the same resource manager per asset (by specifying a different resource key name per asset, so that this can be disambiguated in the resources section of config...and this enables per-job config as well). So then, why not provide a resource def on the
define_asset_job
?
The logging error is
dagster._core.errors.DagsterInvariantViolationError: Attempting to access log, but it was not provided when constructing the OutputContext
...weird...
comes from an io-manager in which I am using logging. Op-based jobs get logging by default, but asset jobs seem to need more...
c
So with op-based jobs, the ops are scoped to the job, so you can provide resources to the job directly / specify it as a fully executable unit. With assets, things work a little differently. So the mental model is that each
Definitions
object has a set of software-defined assets provided to it, and a set of resources available to those assets.
define_assets_job
provides a specification for a set of assets which should be executed together, but the
Definitions
object is what provides the actual asset definitions and resource definitions to construct what actually runs. We don't let you provide resource definitions to
define_assets_job
directly because you always need to use a
Definitions
object to figure out what actual asset definitions are being used, and if resource definitions are specified anywhere other than on the
Definitions
object, you can run into resource key collisions. Does that make sense?
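Rough sketch of that shape (the asset/resource names here are hypothetical):
Copy code
from dagster import Definitions, asset, define_asset_job, resource

@resource
def warehouse_resource(_):
    return "warehouse-client"  # stand-in for a real client

@asset(required_resource_keys={"warehouse"})
def asset1(context):
    ...

@asset(required_resource_keys={"warehouse"})
def asset2(context):
    ...

# jobs are just selections over the asset graph...
job_a = define_asset_job("job_a", selection=["asset1"])
job_b = define_asset_job("job_b", selection=["asset2"])

# ...and the Definitions object is the single place where asset
# definitions and resource definitions get bound together
defs = Definitions(
    assets=[asset1, asset2],
    jobs=[job_a, job_b],
    resources={"warehouse": warehouse_resource},
)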
r
yes it does! So to paraphrase, the fact that assets may be materialized outside of job scopes means that we MUST pass in definitions to make sure there are no resource-key conflicts
I figured out the io-manager thing, I think: I wanted to set up an io-manager for a SourceAsset, and it seems that with assets, even source assets, the io-manager gets an upstream context. I branch in my io-manager on the existence of an upstream context (so that I can use my io-manager as an input manager as well). So what I need to do is figure out if I am in a SourceAsset from my io-manager...any tips?
c
Are you saying that even in
SourceAsset
the
upstream_output
context on the
InputContext
is set?
r
yes it is!
and does not have a _log on it, which causes my error
I put this code in to fix it:
Copy code
def load_input(self, context):
    # log which contexts we got, to see which branch we take
    context.log.info(f"CSV load_input\n {context} AND {context.upstream_output}")
    # removed because dagstermill processes don't seem to have a context
    # context.log.info(f"{context.metadata}<>{context.name}<>{context.resource_config}")
    if context.has_asset_key or (context.upstream_output is None and 'file_name' in context.resource_config):
        # input-manager path: read the configured file directly
        context.log.info("Input Manager Path")
        path = self._get_path(context)
    else:
        # upstream-output path: read whatever the upstream wrote
        context.log.info("Upstream Output Path")
        path = self._get_path(context.upstream_output)
    with path.open("rb") as file:
        return pd.read_csv(file)
if context.has_asset_key or (context.upstream_output is None and 'file_name' in context.resource_config):
But I am pretty sure I am going to create input/output confusion when I have one asset feeding into another: with this I am currently able to get the SourceAsset to take the input path rather than the output path (by which I mean read from upstream's input)
The problem here is that I send every io-manager down the read-input-fresh path, rather than reading the output of the previous io-manager and asset. Any thoughts on how to detect a
SourceAsset
?
c
Okay, so I did some digging and here's the deal: • When loading an input from a SourceAsset, the
input_context.upstream_output
is referring to the SourceAsset's asset key. So consider for example the following case:
Copy code
source1 = SourceAsset(AssetKey("source1"))

@asset
def my_asset(source1):
    ...
My io manager's load_input will have an input context object, and when loading source1, upstream_output.asset_key will be set to
source1
. Is the load path completely different depending on whether it's a source asset or a regular asset? If so, I think that you might be better off just using different IO managers. cc @sandy on this, but I don't think there's a super straightforward way to tell if you're loading from source or not, and I think that's intentional, because the asset should exist in the same place regardless of the context in which it's loaded. What's the loading code look like here? I'm wondering whether what you really want are two separate IO managers, one for the source and one for inter-asset
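To sketch the two-manager split I mean (names, paths, and the CSV logic are all made up, just guessing at your setup):
Copy code
import pandas as pd
from dagster import AssetKey, IOManager, SourceAsset, io_manager

class SourceCSVIOManager(IOManager):
    """Loads raw source files only; never used between assets."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def load_input(self, context):
        # derive the file from the source asset's key
        return pd.read_csv(f"{self.base_dir}/{context.asset_key.path[-1]}.csv")

    def handle_output(self, context, obj):
        raise NotImplementedError("source-only IO manager; nothing is ever written")

@io_manager(config_schema={"base_dir": str})
def source_csv_io_manager(init_context):
    return SourceCSVIOManager(init_context.resource_config["base_dir"])

# only the SourceAsset points at it; regular assets keep your
# ordinary inter-asset io_manager
source1 = SourceAsset(AssetKey("source1"), io_manager_key="source_csv_io_manager")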
r
I think this sort of depends on what's being loaded. In my simple example, all sources are CSV, so I can get away by simply making a new CSV "input" io-manager. Which is what I did to get it to work. But in arbitrary machine learning pipelines, I'd be loading from parquet, which tends to be highly used as an intermediate format as well. Now multiply by source "areas", such as local/s3/elsewhere, and we are supporting too many combinations
So if
SourceAsset
were to set a flag in the constructor, that might be the best option? I suppose it can be hacked by having a "source_" prefix in the asset key which can be parsed for?
I suppose then the question here is: how do I reliably test if I am in an asset or in a non-asset op when I drop down to my io-manager. Is the
context.has_asset_key
attribute good for this?
Code which looks like this for
load_input
seems to handle being an
input_manager
(btw, why do SourceAssets use io-managers rather than input managers: I guess it's the symmetry of using them downstream that we want), being an
iomanager
, and being used in
SourceAssets:
Copy code
def load_input(self, context):
    context.log.info(f"PP load_input\n {context} AND {context.upstream_output}")
    source_asset = False
    asset_mode = False
    if context.has_asset_key:  # this io-manager is being used in an asset op
        asset_mode = True
        context.log.info("asset_mode=True")
        context.log.info(context.upstream_output.asset_key.path[0])
        if context.upstream_output.asset_key.path[0].startswith('source_'):
            source_asset = True
    if (source_asset and asset_mode) or (context.upstream_output is None and 'file_name' in context.resource_config):
        # input-manager path: read the configured file directly
        context.log.info("Input Manager Path")
        path = self._get_path(context)
    else:
        # upstream-output path: read whatever the upstream wrote
        context.log.info("Upstream Output Path")
        path = self._get_path(context.upstream_output)
    with path.open("rb") as file:
        return pd.read_parquet(file)
s
Is
context.has_asset_key
attribute good for this?
Yes, exactly
r
Thanks @sandy! In my other thread I noticed the weird behavior of Dagit for asset jobs: no launchpad until I try one failing run of the job: is it a bug?
I found a new problem with this approach: if I have an already materialized upstream asset, and use "materialize selected" in Dagster to only materialize the downstream asset, then I don't have an asset key starting with 'source_' upstream, so I end up going down the upstream-asset path, and it once again does not set a
context._log.
In this case I would want to use the regular io-manager anyway, so I am not sure how to branch. I need a predicate which says that this is the "root" of the graph...
How do I access the graph structure?
s
In my other thread I noticed the weird behavior of Dagit for asset jobs: no launchpad until I try one failing run of the job: is it a bug?
Do you mind elaborating on what you mean by this in a little more detail?
In this case I would want to use the regular io-manager anyway, so I am not sure how to branch. I need a predicate which says that this is the "root" of the graph...
@chris asked me to jump in and help on this thread, but I think I don't have the full context - would you mind backing up and explaining what you're aiming to accomplish?
r
Weirdly though, I am thinking a selection materialization should be OK in using the upstream context to get the upstream asset. But I don't think I know how to write the io-manager to properly do this. Or to deal with the similar SourceAsset issue…
@sandy, lemme get you full details in a bit. The precis is that both SourceAssets and downstream selectively materialized assets (where the latter depends on an already materialized upstream asset) become the "first" asset. But both (reasonably, I think) have an upstream context set
Now in my io-manager I branch on the existence of an upstream context to decide if I should get the file from the upstream context. This works fine for ops. But for assets I don't know what to do, and this branching gets me in trouble. In the SourceAsset case at least I should be using the input context. The question is how I should branch safely.
And in both these cases, upstream_context._log is not set, defeating my use of context.log.info to figure things out 😐🄲
On the launchpad not showing, see the image. If I create a job through define_asset_job, a launchpad tab won't show. The moment I create a run (which will fail), a launchpad tab shows. Then I can provide config and the job will run successfully!
s
If I create a job through define_asset_job, a launchpad tab won't show.
You can shift-click the Materialize button to open the launchpad (this should be less hidden)
r
Ooh, I didn't know that! Why hide it in the first place? Jobs need config, right?
s
r
But this is great! I can config away!
Thanks!!
s
Now in my iomanager I branch on the existence of an upstream context to decide if I should get the file from the upstream context.
can you explain this a little more? what do you do if the upstream context isn't available?
r
The example here shows what I do in load_input
The bottom line is that if there is no upstream context I want to assume I am in an input, but if there is one I should grab the upstream context's file
Now in a SourceAsset the upstream context is set, so my idea won't work. So if I am in the asset situation, I figured I would demand my source asset keys be like "source_bla" so I can match on that. OK, so source assets work. Then I materialized an upstream asset. Next time around I only materialize downstream. Now I again end up taking the upstream condition, because upstream_context exists. I don't know if this is correct, or if I need to find another condition to get an input context instead. I was hoping to debug, but it turns out that in both the SourceAsset and materialize-downstream-asset cases, upstream_context._log is not set, preventing me from logging…
s
in the asset case,
context
will always have an
asset_key
- is the
asset_key
not enough to determine the path?
however, I believe that
upstream_output
will also be there in all asset cases (source asset or upstream non-source asset)
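i.e. something shaped like this (a sketch: the path helper and naming scheme are mine, not something Dagster prescribes):
Copy code
import pandas as pd
from dagster import IOManager

class AssetAwareParquetIOManager(IOManager):
    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path_for_key(self, asset_key):
        # one file per asset, named after the last key component
        return f"{self.base_dir}/{asset_key.path[-1]}.parquet"

    def load_input(self, context):
        if context.has_asset_key:
            # asset case: context.asset_key names the upstream asset,
            # source asset or not, so the path falls straight out of it
            return pd.read_parquet(self._path_for_key(context.asset_key))
        # op case: fall back to the file name in resource config
        return pd.read_parquet(f"{self.base_dir}/{context.resource_config['file_name']}")

    def handle_output(self, context, obj):
        # asset case only; an op-job output would need the same
        # resource_config fallback as load_input
        obj.to_parquet(self._path_for_key(context.asset_key))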
r
This problem must have been solved for load_input elsewhere, I am thinking… I mean, the decision of what to load has to be taken, right?
In my case I am IO-managing to specifically named files, as they can be inputs to other jobs and mean something in the context of the machine learning process
The original implementation worked nicely for ops but breaks down for assets. Do asset keys get internally represented in certain ways? Is there a way I can detect that in both my cases I am at the root of the graph? And does it make sense that _log is not set in the upstream context?
s
This problem must have been solved for load_input elsewhere, I am thinking… I mean, the decision of what to load has to be taken, right?
I'm still not 100% grasping what the problem is. In the asset case,
context
will always have an
asset_key
- is the
asset_key
not enough to determine the path? and
context.has_asset_key
allows you to determine whether you're in the asset case
And does it make sense that _log is not set in the upstream context?
I think we should set this on the upstream context. I'll make a PR for this.
r
So if I understand you correctly @sandy, the asset_key in the io-manager will always point to the upstream asset? Imagine an asset dag:
source_asset -> target -> target_downstream
. Printing out context.asset_key AND context.upstream_output.asset_key in the input manager being used for
target
gives both to be
source_asset
. And printing them out in the io-manager for inputting to
target_downstream
gives both to be
target
. This is true whether I selectively materialize
target_downstream
or run the whole dag.
(this is of course in input io-managers for
target
and
target_downstream
)
The output io-managers point to the current asset.
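For reference, the dag I am describing, as code (a sketch):
Copy code
from dagster import AssetKey, SourceAsset, asset

source_asset = SourceAsset(AssetKey("source_asset"))

@asset
def target(source_asset):
    # loading source_asset goes through the io-manager's load_input
    ...

@asset
def target_downstream(target):
    ...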
s
, the asset_key in the io-manager will always point to the upstream asset? Imagine an asset dag:
source_asset -> target -> target_downstream
. Printing out context.asset_key AND context.upstream_output.asset_key in the input manager being used for
target
gives both to be
source_asset
. And printing them out in the io-manager for inputting to
target_downstream
gives both to be
target
. This is true whether I selectively materialize
target_downstream
or run the whole dag.
exactly
r
Aha @sandy, so the whole time, for assets, I did not need to use the input branch in the io-manager. Using the upstream branch would have worked just fine! I think I might have got thrown off by the inability to log, which your PR fixes! (I think I was being stupid there; the source asset is an upstream source, and thus we do need to pick up stuff from its context!) In this sense the asset mechanism is simpler than the ops mechanism, for in the ops mechanism my io-manager DOES need to decide which branch to take. For assets, for input, there just IS an upstream. So I don't need any "source_" prefix on my asset keys for SourceAssets at all. And when I ask it to selectively materialize, it turns the upstream into a source asset as well, so everything works out!
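So my load_input collapses to something like this (a sketch; _get_path is the same helper as in my code above):
Copy code
def load_input(self, context):
    if context.upstream_output is not None:
        # assets ALWAYS land here: SourceAssets and selectively
        # materialized upstreams both come in with an upstream_output
        path = self._get_path(context.upstream_output)
    else:
        # pure input-manager use in an op job: path comes from config
        path = self._get_path(context)
    with path.open("rb") as file:
        return pd.read_parquet(file)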
Creating a Launchpad tab for asset jobs would let us use the muscle memory trained on regular jobs to set the config. Thanks for creating the issue! I suspect the subtlety there is providing the ability to give config outside of the context of an asset job, that is, the case of simple asset materialization outside of the job context: what is the UI for that?
One thing that specifically happens on the shift-click to materialize, when I want to do a selective materialization, is that the config from previous runs is lost in the modal (config from previous runs is not lost in the context of asset-job running, as there is a tab for every run in that launchpad). I end up copy-pasting it from a file where I have put the configs
Thanks so much for your help @sandy! I now have sensors set up to retrain when new training data with labels comes in, and sensors set up to do inference when new unlabelled data comes in. It's sweet! Next thing is to figure out how to use artefact-management APIs such as mlflow/wandb with Dagster's API to kick off new inference runs...so that in addition to sensors that help do inference on new data, I have sensors that do inference when a new best-fit model comes in!
Also looking forward to https://github.com/dagster-io/dagster/issues/10557 being handled: then I can have notebooks output regular data assets. BTW, I think that dagstermill has some rough edges around the process-boundary problem: basically, the dagstermill setup does IO twice...once in the calling process and once in the notebook process, so as to transport the data out. For large data this could get expensive. Was wondering if this was a good use case for a shared-memory io-manager...
s
that is, the case of simple asset materialization outside of the job context: what is the UI for that?
you can shift-click from the asset lineage graph page too (not just the asset job page)
So everything works out!
awesome!!
r
you can shift-click from the asset lineage graph page too (not just the asset job page)
Yes, did notice! My 2c: an explicit launchpad page which can access previous history, rather than a modal dialog, would be more consistent in terms of UX behavior, with the additional feature that previous configs can be accessed!
s
mind adding your thoughts to the issue? https://github.com/dagster-io/dagster/issues/12786 then when our UI folks take a look at it, they'll have that suggestion in mind
r
Will do!
s
here's an issue to track the Dagstermill double-write problem you mentioned: https://github.com/dagster-io/dagster/issues/12830