Vinnie
11/30/2022, 8:43 AM
Are there any plans to expose dagster._check as an "official" top-level module? After I delved into the source code a couple months ago, I found myself importing it into some of my projects, but it still leaves a sour taste in my mouth to import an API that's not made public.
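(For context, a minimal sketch of the kind of usage being described, importing the private module directly; the surrounding function is made up, so treat this as illustrative rather than a supported API.)

import dagster._check as check

def load_table(name, limit=None):
    # Validate arguments the same way Dagster's internals do.
    name = check.str_param(name, "name")         # raises if name is not a str
    limit = check.opt_int_param(limit, "limit")  # None is allowed, otherwise must be an int
    check.invariant(limit is None or limit > 0, "limit must be positive")
    ...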
josh
11/30/2022, 4:42 PM

Chris Evans
12/01/2022, 7:19 PM
Without software-defined assets, you're often forced to:
• Split your pipeline into smaller jobs, which allows for easy maintenance but makes dependency tracking difficult
Now in the SDA paradigm I have much-needed observability/dependency tracking across repositories for my deployment. Thank you to the whole Dagster team and community for continuing to build amazing products 🎉!

Mark Fickett
12/02/2022, 5:54 PM

Matt Clarke
12/05/2022, 11:43 AM

Robert Lawson
12/05/2022, 4:02 PM
non_argument_deps={"something_that_doesn't_exist"}?
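(For reference, a minimal sketch of how non_argument_deps is declared on an asset, which seems to be the situation the question describes; the asset name is made up.)

from dagster import asset

@asset(non_argument_deps={"something_that_doesn't_exist"})
def downstream_asset():
    # The dependency above is declared purely by asset key (no function argument),
    # which is what makes it possible to reference a key that no asset defines.
    return 1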
Daniel Mosesson
12/05/2022, 6:57 PM
• How do I pass information from the op to the io_manager? What if it needs to be dynamic, e.g. if the op knows what the name of the output should be (setting metadata on the Output does not seem to work)
• What is the best way to have an io_manager that only really works for assets? (thanks to @owen for the idea of the custom decorator)
• What are all of the possible sources of the "name" of the thing:
  ◦ context.asset_key?
  ◦ context.step_key?
  ◦ various names?
• Detecting the partition type (currently I use isinstance(context.asset_partitions_def, ...) a bunch; maybe there is a better way)
• Better understanding how context.asset_partitions_time_window works
• How does all of this work in the context of:
  ◦ Backfills
  ◦ @multi_asset
  ◦ SourceAsset
  ◦ DynamicOutput
• Other things to be aware of that may contain sharp edges for those who are writing io_managers
Currently what I do is look at the dagster test suite, which is pretty awesome, but it doesn't always give the comprehensive overview I'm looking for. Does something like this already exist, or could it?
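(Not an answer to the guide question, but a rough sketch of the name-resolution pattern several of these bullets circle around. It assumes the context attributes mentioned above (has_asset_key, asset_key, step_key, name, partition_key); the storage helpers write_blob/read_blob are made up.)

from dagster import IOManager, io_manager

class NameResolvingIOManager(IOManager):
    # Illustrative only: resolve a storage "name" that works for both assets and plain ops.
    def _storage_name(self, context):
        if context.has_asset_key:
            name = "/".join(context.asset_key.path)      # asset-backed outputs
        else:
            name = f"{context.step_key}/{context.name}"  # plain op outputs
        if context.has_partition_key:
            name = f"{name}/{context.partition_key}"     # partitioned materializations
        return name

    def handle_output(self, context, obj):
        write_blob(self._storage_name(context), obj)     # write_blob is a made-up helper

    def load_input(self, context):
        # The upstream OutputContext carries the same naming information on the load side.
        return read_blob(self._storage_name(context.upstream_output))

@io_manager
def name_resolving_io_manager(_init_context):
    return NameResolvingIOManager()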
Alex Kan
12/05/2022, 10:05 PM

Nicolas Parot Alvarez
12/06/2022, 10:30 AM
... at the end here: https://docs.dagster.io/_apidocs/cli

Binoy Shah
12/06/2022, 7:44 PM

William
12/07/2022, 7:56 AM
I have a multi_asset and then I add one more asset to it. Therefore there's 2/3 materialized and 1/3 missing. I cannot materialize the missing one due to "partition is partial".
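(A minimal sketch of the shape being described, with made-up asset names and an assumed daily partitioning; the third output is the newly added one.)

from dagster import AssetOut, DailyPartitionsDefinition, multi_asset

@multi_asset(
    outs={
        "asset_a": AssetOut(),  # already materialized
        "asset_b": AssetOut(),  # already materialized
        "asset_c": AssetOut(),  # added later, still shows as missing
    },
    partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
)
def my_assets():
    # All three outputs come from the same function, so by default they can only
    # be materialized together rather than one at a time.
    return 1, 2, 3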
Vinnie
12/07/2022, 9:01 AM
The new asset_reconciliation_sensor with SLAs is great, but this makes the lack of configuration options in assets and multi-assets even more relevant than it was before. While we can set the configs on the job level, it's impossible to configure the assets themselves. Maybe I'm still stuck in dagster anti-patterns, but I'd like to keep my code as static as possible and configure assets with YAML files.
Vinnie
12/08/2022, 11:46 AM
It seems AssetsDefinitions built with from_graph don't (can't?) contain metadata. Inspecting the Python object does properly return metadata under the property _metadata_by_key, but it doesn't show up in dagit and can't be accessed through my_asset.metadata (or rather, the property doesn't exist).
AssetsDefinition.from_graph(
    my_graph,
    keys_by_output_name={"result": AssetKey("my_cool_key")},
    metadata_by_output_name={"result": {"column_schema": TableSchema.from_name_type_dict({"some_col": "str"})}},
)
Michael Giansiracusa (Mike Gold)
12/08/2022, 7:07 PM
Operation name: AssetGraphLiveQuery
Message: maximum recursion depth exceeded while calling a Python object
Path: ["assetNodes",2,"projectedLogicalVersion"]
Locations: [{"line":44,"column":3}]
Alex Prykhodko
12/08/2022, 11:30 PM

Stephen Bailey
12/10/2022, 2:11 AM
It would be nice if JobSelector threw a warning or something if a job didn't exist. We had an instance today where we refactored a repo, and didn't find out for several days that this sensor wasn't being tripped!
(Although maybe freshness policies will fix this 🙂 )
from dagster import DagsterRunStatus, JobSelector, RunRequest, RunStatusSensorContext, run_status_sensor

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=my_job,
    monitored_jobs=[
        JobSelector(
            location_name="my_location",
            repository_name="my_repo",
            job_name="my_job",
        )
    ],
)
def my_run_sensor(context: RunStatusSensorContext):
    yield RunRequest(run_key=str(context.dagster_run.run_id))
Daniel Gafni
12/11/2022, 10:19 AM
We often want to test pipeline changes on a feature branch before merging them to master.
To solve this, Dagster Cloud's Branch Deployments can be used, or one can set up a custom CI/CD job manually. In both cases, the job spins up a new temporary Dagster deployment; I will refer to this deployment as the Feature Stage (FS).
The Problem
Usually we don't want to have write access to Production from the FS as it would be unsafe. Thus, Dagster jobs must write data into a separate FS environment, like a temporary S3 bucket.
However, this means that we also can't read the production assets. One can of course write a custom IOManager that would somehow pick the read and write environment based on the asset metadata, but it would be extremely complicated to support it. In fact, what I'm trying to do here is to generalize this logic and let Dagster run it instead of the custom user code.
So when testing an asset, the user has to materialize its upstream dependencies in the FS environment first. This becomes a huge pain when working with heavy assets like trained machine learning models or a lot of data preprocessing for ML.
It may take hours to materialize the upstream assets before the developer can finally test the code they were working on. This has an enormous impact on development speed and productivity.
Now imagine adding backfills…
Solving this problem would bring a lot of value to developers being affected by it.
Proposed Solution
1. Introduce an env tag for assets and op outputs. Internally it can be just a special metadata value like __environment__. The default would be __environment__: default. This tag can be displayed in Dagit. It has to be saved in Dagster's database when the asset is materialized (included in the materialization event).
2. Allow providing a dictionary of resource configs instead of a single config for every resource. Maybe also a dictionary of resource definitions instead of a single one. If the user hasn't provided a dictionary, wrap the single config into {"default": config}.
3. When initializing a resource for the asset, pick the resource config (and possibly the resource definition) using the asset's env tag. If loading an asset, use the tag recorded in Dagster's database. If writing an asset, use the tag provided by the Dagster deployment (a rough sketch of this rule is below).
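(A rough pseudocode sketch of that selection rule; none of these names exist in Dagster today, they only illustrate the proposed behavior.)

# Pseudocode for the proposed behavior, not an existing Dagster API.
def pick_resource_config(resource_config_by_key, resource_key, stored_asset_metadata, is_loading, deployment_env):
    configs = resource_config_by_key[resource_key]  # e.g. {"prod": {...}, "feature_stage": {...}}
    if is_loading:
        # Reading an existing asset: use the env tag recorded at materialization time.
        env = stored_asset_metadata.get("__environment__", "default")
    else:
        # Writing a new materialization: use the env tag of the current deployment.
        env = deployment_env
    return configs.get(env, configs.get("default"))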
Let's see some examples:
1. "Hey let's not overwrite prod files in CI"
with_resources(
    assets,
    resource_defs={"io_manager": my_io_manager},
    resource_config_by_key={
        "io_manager": {"prod": {"base_dir": "prod"}, "feature_stage": {"base_dir": f"fs-{FS_ID}"}}
    },
)
2. "Hey let's not write into prod from CI"
with_resources(
    assets,
    resource_defs={"io_manager": my_io_manager, "aws_credentials": aws_credentials},
    resource_config_by_key={
        "io_manager": {"prod": {"bucket": "prod"}, "feature_stage": {"bucket": "stage", "base_dir": f"fs-{FS_ID}"}},
        "aws_credentials": {
            "prod": {"AWS_ACCESS_KEY_ID": READONLY_PROD_AWS_ACCESS_KEY_ID},
            "feature_stage": {"AWS_ACCESS_KEY_ID": FS_AWS_ACCESS_KEY_ID},
        },
    },
)
What happens inside the feature stage (FS) Dagster deployment:
1. The production Dagster database is cloned. The FS Dagster deployment thus has access to all the assets materialized in production, as well as run history, asset metadata, etc.
2. When materializing a new asset, Dagster will load upstream assets from the production environment. The env=prod tag will tell Dagster to use the IOManager and resources that have read access to production.
3. When writing the asset, the env=feature_stage tag will tell Dagster to use the FS IOManager and resources, thus materializing the asset in the FS environment.
As a result, all the assets produced before the FS deployment are going to be loaded from Production. All the assets produced after the deployment will be loaded from and written to the FS environment.
The proposed changes are:
• non-breaking
• very general, users can do a lot of stuff with custom environments
• would work with partitions
• very small codebase edits, we just have to add a few dictionaries here and there
• can be immediately used by Branch Deployments in Dagster Cloud
Would love to hear what everybody thinks! It's still not very clear to me where the env tag has to be defined - perhaps in the repository decorator?
Tagging @sandy and @schrockn for future discussions since I've mentioned The Problem to you guys previously.

Stephen Bailey
12/12/2022, 11:48 AM
I really like the new Definitions way of defining the code location. What's more, it paves the way for enabling Dagster to promote a default (optional) project structure, if sensors and schedules were able to be loaded from package directories in the same way that assets are. Here's what I'd love to see.
1. a new function called build_default_definitions:
from dagster import Definitions, load_assets_from_package_name, load_sensors_from_package_name, load_schedules_from_package_name, load_jobs_from_package_name

def build_default_definitions(
    asset_package_name=".assets",
    sensor_package_name=".sensors",
    schedules_package_name=".schedules",
    jobs_package_name=".jobs",
):
    return Definitions(
        assets=load_assets_from_package_name(asset_package_name),
        sensors=load_sensors_from_package_name(sensor_package_name),
        jobs=load_jobs_from_package_name(jobs_package_name),
        schedules=load_schedules_from_package_name(schedules_package_name),
    )
then, in a user's auto-generated project __init__.py:
from dagster import build_default_definitions

defs = build_default_definitions()
This would make alternative project structures opt-in: users would never have to worry about whether a sensor, asset or job they added is getting sucked into the code location; if they put it in the right folder, it just happens.

Binoy Shah
12/13/2022, 6:22 PM

Casper Weiss Bang
12/14/2022, 9:34 AM

Mitchell Hynes
12/15/2022, 4:16 PM
I'd love a more dynamic way to manage workspace.yml, so I could have code locations spin up and automatically announce themselves to the instance, without having to rsync the file from the server, edit it, and rsync it back.

Mark Fickett
12/15/2022, 9:39 PM

William
12/16/2022, 2:17 AM
I want to schedule an asset, but it's strange that schedules do not support assets natively and we have to convert them to jobs. Sensors recently added support for assets, shall we do the same for schedules?
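(For context, a sketch of the job-conversion workaround being referred to; the asset selection, job name, and cron string are made up.)

from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# Wrap the assets in a job first, because ScheduleDefinition targets jobs.
daily_assets_job = define_asset_job(
    name="daily_assets_job",
    selection=AssetSelection.groups("daily"),
)

daily_schedule = ScheduleDefinition(
    job=daily_assets_job,
    cron_schedule="0 6 * * *",
)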
Bz Mn
12/16/2022, 8:00 AM

Vinnie
12/16/2022, 8:29 AM
The AssetMaterialization events were emitted a few milliseconds from each other, but in the correct order.
It's also showing differently in the job view in dagit vs. the asset group view. See both screenshots for comparison. The latter is especially surprising as the source asset was materialized 2 minutes before the first asset in the graph. Inspecting each asset individually, all of them show as stale in their asset pages.
No FreshnessPolicies are configured.

Stephen Bailey
12/16/2022, 3:14 PM

Mark Fickett
12/16/2022, 5:53 PM

Zach
12/16/2022, 10:57 PM

Alex Prykhodko
12/18/2022, 4:49 AM
Is there a way to access the metadata logged in handle_output() in the load_input() method? In my attempts, using add_output_metadata() does work as expected – the event data in the asset catalog displays the specified keys – but the keys are not available when reading the upstream_output property of the context in load_input().
Modified example from the doc:
from dagster import IOManager

# write_dataframe_to_table / read_dataframe_from_table are the helpers from the docs example.
class DataframeTableIOManagerWithMetadata(IOManager):
    def handle_output(self, context, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)
        context.add_output_metadata({"num_rows": len(obj), "table_name": table_name})

    def load_input(self, context):
        table_name = context.upstream_output.name
        num_rows = context.upstream_output.metadata["num_rows"]  # <-- That'd be great
        return read_dataframe_from_table(name=table_name)
If not using the approach suggested above, how else can I read the metadata in the load_input() method?
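(Not an official API, just one generic workaround sketch that stays within the same IOManager shape: persist the metadata next to the table in handle_output() and read it back in load_input(); write_metadata_row/read_metadata_row are made-up helpers.)

class DataframeTableIOManagerWithSidecarMetadata(IOManager):
    def handle_output(self, context, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)
        metadata = {"num_rows": len(obj), "table_name": table_name}
        context.add_output_metadata(metadata)     # still shows up in the asset catalog
        write_metadata_row(table_name, metadata)  # made-up helper: persist the metadata alongside the table

    def load_input(self, context):
        table_name = context.upstream_output.name
        metadata = read_metadata_row(table_name)  # made-up helper: read the persisted metadata back
        context.log.debug(f"num_rows written upstream: {metadata['num_rows']}")
        return read_dataframe_from_table(name=table_name)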
Greg Whittier
12/20/2022, 2:58 PM