Rahul Dave
03/06/2023, 6:34 PM
graph.to_job has a resource_defs argument where I can specify io-managers. I would have expected define_asset_job to have a similar resources section, but it does not. Why is this the case? Is it because assets "can live outside of jobs", or is my mental model just wrong? I know I can define resources like so:
defs = Definitions(
    assets=[asset1],
    jobs=[define_asset_job("all_assets")],
    resources={"slack_client": prod_slack_client},
)
(from the API docs). But this makes resource keys global, rather than the per-job scoped resource keys that to_job provides. Is there a way of getting job-scoped resource keys with assets? Or am I just thinking about this in a philosophically wrong way?
Rahul Dave
03/06/2023, 6:43 PM
Rahul Dave
03/06/2023, 8:23 PM
context.log. And there does not seem to be any way to configure source assets...
Rahul Dave
03/06/2023, 8:56 PM
define_asset_job?
Rahul Dave
03/06/2023, 9:57 PM
dagster._core.errors.DagsterInvariantViolationError: Attempting to access log, but it was not provided when constructing the OutputContext
...weird...
Rahul Dave
03/06/2023, 9:58 PM
chris
03/06/2023, 10:31 PM
The Definitions object has a set of software-defined assets provided to it, and a set of resources available to those assets. define_asset_job provides a specification for a set of assets which should be executed together, but the Definitions object is what provides the actual asset definitions and resource definitions to construct what actually runs. We don't let you provide resource definitions to define_asset_job directly because you always need to use a Definitions object to figure out what actual asset definitions are being used, and if resource definitions are specified anywhere other than on the Definitions object, you can run into resource key collisions. Does that make sense?
Rahul Dave
03/06/2023, 11:53 PM
Rahul Dave
03/07/2023, 12:00 AM
chris
03/07/2023, 12:20 AM
SourceAsset the upstream_output context on the InputContext is set?
Rahul Dave
03/07/2023, 12:29 AM
Rahul Dave
03/07/2023, 12:29 AM
Rahul Dave
03/07/2023, 12:30 AM
def load_input(self, context):
    context.log.info(f"CSVxxxxxxxxxxxxx\n {context} AND {context.upstream_output}")
    context.log.info(dir(InitResourceContext))
    # remove because dagstermill processes don't seem to have a context
    # context.log.info(f"{context.metadata}<>{context.name}<>{context.resource_config}")
    if context.has_asset_key or (context.upstream_output is None and 'file_name' in context.resource_config):  # input manager
        context.log.info("xxxxxxxxxxx Input Manager Path")
        path = self._get_path(context)
    else:
        context.log.info("xxxxxxxxxxx Upstream Output Path")
        bla = context.upstream_output
        context.log.info(dir(bla))
        context.log.info(bla.asset_info)
        path = self._get_path(context.upstream_output)
        # path = self._get_path(context)
    with path.open("rb") as file:
        return pd.read_csv(file)
Rahul Dave
03/07/2023, 12:30 AM
if context.has_asset_key or (context.upstream_output is None and 'file_name' in context.resource_config):
Rahul Dave
03/07/2023, 12:32 AM
Rahul Dave
03/07/2023, 12:48 AM
SourceAsset?
chris
03/07/2023, 12:57 AM
input_context.upstream_output is referring to the SourceAsset's asset key. So consider, for example, the following case:
source1 = SourceAsset(AssetKey("source1"))
@asset
def my_asset(source1):
    ...
My IO manager's load_input will have an input context object, and when loading source1, upstream_output.asset_key will be set to source1.
Is the load path completely different depending on whether it is a source asset or a regular asset? If so, I think that you might be better off just using different IO managers. cc @sandy on this, but I don't think there's a super straightforward way to tell if you're loading from source or not, and I think that's intentional, because the asset should exist in the same place regardless of the context in which it's loaded.
What's the loading code look like here? I'm wondering whether what you really want are two separate IO managers, one for the source and one for inter-asset
Rahul Dave
03/07/2023, 1:29 AM
Rahul Dave
03/07/2023, 1:31 AM
SourceAsset were to set a flag in the constructor, that might be the best option? I suppose it can be hacked by having a "source_" prefix in the asset key, which can be parsed for?
Rahul Dave
03/07/2023, 1:41 AM
Is context.has_asset_key attribute good for this?
Rahul Dave
03/07/2023, 2:08 AM
load_input seems to handle being an input_manager (btw, why do SourceAssets use iomanagers rather than inputmanagers? I guess it's the symmetry of using them downstream that we want), being an iomanager, and being used in SourceAssets:
Rahul Dave
03/07/2023, 2:08 AM
def load_input(self, context):
    context.log.info(f"PPxxxxxxxxxxxxx\n {context} AND {context.upstream_output}")
    source_asset = False
    asset_mode = False
    if context.has_asset_key:  # this io manager is being used in an asset op
        asset_mode = True
        context.log.info("asset_mode=True")
        context.log.info(context.upstream_output.asset_key.path[0])
        # source assets are marked by a "source_" prefix in the asset key
        if context.upstream_output.asset_key.path[0].startswith('source_'):
            source_asset = True
    if (source_asset and asset_mode) or (context.upstream_output is None and 'file_name' in context.resource_config):  # input manager
    # if context.upstream_output is None and 'file_name' in context.resource_config:  # input manager
        context.log.info("xxxxxxxxxxx Input Manager Path")
        path = self._get_path(context)
    else:
        context.log.info("xxxxxxxxxxx Upstream Output Path")
        bla = context.upstream_output
        context.log.info(dir(bla))
        context.log.info(bla.asset_info)
        path = self._get_path(context.upstream_output)
    with path.open("rb") as file:
        return pd.read_parquet(file)
sandy
03/07/2023, 4:15 PM
> Is context.has_asset_key attribute good for this?
Yes, exactly
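A minimal sketch of branching on has_asset_key, written so it runs without Dagster installed. The context objects here are SimpleNamespace stand-ins that only mimic the attributes mentioned in this thread (has_asset_key, asset_key.path, resource_config). BASE_DIR, resolve_input_path, and the ".csv" suffix are hypothetical names chosen for illustration, not part of Dagster or of the code posted above.

```python
from pathlib import Path
from types import SimpleNamespace

BASE_DIR = Path("data")  # hypothetical storage root, not from the thread


def resolve_input_path(context) -> Path:
    """Choose the file to load, branching on context.has_asset_key."""
    if context.has_asset_key:
        # Asset case (source asset or regular upstream asset):
        # derive the path from the asset key alone.
        *parents, name = context.asset_key.path
        return BASE_DIR.joinpath(*parents, f"{name}.csv")
    # Non-asset case: fall back to a file name configured on the resource.
    file_name = (context.resource_config or {}).get("file_name")
    if file_name is None:
        raise ValueError("no asset key and no 'file_name' in resource config")
    return BASE_DIR / file_name


# Stand-ins for the two situations discussed in the thread (assumed shapes).
asset_ctx = SimpleNamespace(
    has_asset_key=True,
    asset_key=SimpleNamespace(path=["source_asset"]),
    resource_config={},
)
op_ctx = SimpleNamespace(
    has_asset_key=False,
    asset_key=None,
    resource_config={"file_name": "raw.csv"},
)

asset_path = resolve_input_path(asset_ctx)
op_path = resolve_input_path(op_ctx)
```

With this shape, the "source_" prefix check is not needed: any input that has an asset key resolves through the same rule, and only the plain input-manager case reads file_name from the resource config.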
Rahul Dave
03/07/2023, 4:51 PM
Rahul Dave
03/07/2023, 8:27 PM
context._log.
Rahul Dave
03/07/2023, 8:28 PM
Rahul Dave
03/07/2023, 8:44 PM
sandy
03/07/2023, 8:59 PM
> In my other thread I noticed the weird behavior of dagit for asset jobs: no launchpad until I try one failing run of the job: is it a bug?
Do you mind elaborating on what you mean by this in a little more detail?
sandy
03/07/2023, 8:59 PM
> In this case I would want to use the regular io manager anyway so am not sure how to branch. I need a predicate which says that this is the "root" of the graph...
@chris asked me to jump in and help on this thread, but I think I don't have the full context - would you mind backing up and explaining what you're aiming to accomplish?
Rahul Dave
03/07/2023, 9:07 PM
Rahul Dave
03/07/2023, 10:52 PM
Rahul Dave
03/07/2023, 10:56 PM
Rahul Dave
03/07/2023, 10:57 PM
Rahul Dave
03/07/2023, 11:00 PM
sandy
03/07/2023, 11:19 PM
> If I create a job through define_asset_job, a launchpad tab won't show.
You can shift-click the Materialize button to open the launchpad (this should be less hidden)
Rahul Dave
03/07/2023, 11:21 PM
sandy
03/07/2023, 11:21 PM
Rahul Dave
03/07/2023, 11:21 PM
Rahul Dave
03/07/2023, 11:22 PM
sandy
03/07/2023, 11:22 PM
> Now in my iomanager I branch on the existence of an upstream context to decide if I should get the file from the upstream context.
can you explain this a little more? what do you do if the upstream context isn't available?
Rahul Dave
03/07/2023, 11:24 PM
Rahul Dave
03/07/2023, 11:26 PM
Rahul Dave
03/07/2023, 11:32 PM
sandy
03/07/2023, 11:32 PM
context will always have an asset_key - is the asset_key not enough to determine the path?
sandy
03/07/2023, 11:33 PM
upstream_output will also be there in all asset cases (source asset or upstream non-source asset)
Rahul Dave
03/07/2023, 11:44 PM
Rahul Dave
03/07/2023, 11:45 PM
Rahul Dave
03/07/2023, 11:47 PM
sandy
03/08/2023, 12:06 AM
> This problem must have been solved for load_input elsewhere, I am thinking… I mean, the decision of what to load has to be taken, right?
I'm still not 100% grasping what the problem is. In the asset case, context will always have an asset_key - is the asset_key not enough to determine the path?
and context.has_asset_key allows you to determine whether you're in the asset case
sandy
03/08/2023, 12:08 AM
> And does it make sense that _log is not set in the upstream context?
I think we should set this on the upstream context. I'll make a PR for this.
sandy
03/08/2023, 12:12 AM
Rahul Dave
03/08/2023, 2:28 PM
the asset_key in the iomanager will always point to the upstream asset? Imagine an asset dag: source_asset -> target -> target_downstream. Printing out context.asset_key AND context.upstream_output.asset_key in the input manager being used for target gives both to be source_asset. And printing them out in the io-manager for inputting to target_downstream gives both to be target. This is true whether I selectively materialize target_downstream or run the whole dag.
Rahul Dave
03/08/2023, 2:35 PM
target and target_downstream)
Rahul Dave
03/08/2023, 2:36 PM
sandy
03/08/2023, 4:29 PM
> the asset_key in the iomanager will always point to the upstream asset? Imagine an asset dag: source_asset -> target -> target_downstream. Printing out context.asset_key AND context.upstream_output.asset_key in the input manager being used for target gives both to be source_asset. And printing them out in the io-manager for inputting to target_downstream gives both to be target. This is true whether I selectively materialize target_downstream or run the whole dag.
exactly.
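A minimal sketch of why this behavior lets one path rule serve both writing and reading, again using stand-in objects instead of real Dagster contexts. It assumes, as observed above, that when target_downstream loads its input both context.asset_key and context.upstream_output.asset_key are target, and that the output context used when target is written also carries the key target. BASE_DIR, path_for_key, and the ".parquet" suffix are hypothetical.

```python
from pathlib import Path
from types import SimpleNamespace

BASE_DIR = Path("data")  # hypothetical storage root, not from the thread


def path_for_key(asset_key_path) -> Path:
    """Map an asset key path such as ["target"] to a file under BASE_DIR."""
    *parents, name = asset_key_path
    return BASE_DIR.joinpath(*parents, f"{name}.parquet")


def key(*parts):
    """Build a stand-in for an asset key with a .path list."""
    return SimpleNamespace(path=list(parts))


# Stand-in for the output context when `target` is materialized.
write_ctx = SimpleNamespace(asset_key=key("target"))

# Stand-in for the input context when `target_downstream` loads its input:
# both keys point at the upstream asset, `target`.
read_ctx = SimpleNamespace(
    asset_key=key("target"),
    upstream_output=SimpleNamespace(asset_key=key("target")),
)

written_to = path_for_key(write_ctx.asset_key.path)
read_from = path_for_key(read_ctx.asset_key.path)
read_from_upstream = path_for_key(read_ctx.upstream_output.asset_key.path)
```

Because all three lookups resolve to the same file, the loader does not need to know whether the upstream asset is a source asset or a regular one.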
Rahul Dave
03/08/2023, 10:04 PM
Rahul Dave
03/08/2023, 10:07 PM
Rahul Dave
03/08/2023, 10:11 PM
Rahul Dave
03/08/2023, 10:14 PM
Rahul Dave
03/08/2023, 10:19 PM
sandy
03/08/2023, 10:24 PM
> that is, the case of simple asset materialization outside of the job context: what is the UI for that?
you can shift-click from the asset lineage graph page too (not just the asset job page)
sandy
03/08/2023, 10:24 PM
> So everything works out!
awesome!!
Rahul Dave
03/08/2023, 10:27 PM
> you can shift-click from the asset lineage graph page too (not just the asset job page)
Rahul Dave
03/08/2023, 10:28 PM
sandy
03/08/2023, 10:38 PM
Rahul Dave
03/08/2023, 10:47 PM
sandy
03/08/2023, 10:58 PM