Archie Kennedy
10/19/2022, 6:50 AM
Andrew Ryan
10/19/2022, 10:25 AM
"""
Example using partitions to run a workflow daily
dagit -f partitions_example.py
"""
from dagster import (
    asset,
    AssetIn,
    build_schedule_from_partitioned_job,
    define_asset_job,
    HourlyPartitionsDefinition,
    DailyPartitionsDefinition,
    repository,
)

hourly = HourlyPartitionsDefinition(start_date="2022-10-01-00:00")
daily = DailyPartitionsDefinition(start_date="2022-10-01")


@asset(partitions_def=hourly)
def model_hourly(context) -> list[int]:
    """Example partitioned assets for model"""
    context.log.info(f"asset partition: {context.asset_partition_key_for_output()}")
    return [1, 2, 3]


@asset(partitions_def=daily)
def model_daily(context, model_hourly):
    """Example partitioned assets for model"""
    context.log.info(f"asset partition: {context.asset_partition_key_for_output()}")


assets_schedule_hourly = build_schedule_from_partitioned_job(
    define_asset_job(
        "job_hourly",
        selection=["model_hourly"],
        partitions_def=hourly,
    )
)

assets_schedule_daily = build_schedule_from_partitioned_job(
    define_asset_job(
        "job_daily",
        selection=["model_daily"],
        partitions_def=daily,
    )
)


@repository
def repo():
    return [
        model_daily,
        model_hourly,
        assets_schedule_daily,
        assets_schedule_hourly,
    ]
Attempting to materialize the model_daily asset (with the required model_hourly assets already materialized manually) generates the following exception. I've posted the full traceback in case there are any subtleties I overlooked in my earlier debugging.
Thanks in advance for any help you may be able to offer.
dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "model_hourly" of step "model_daily":
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 225, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 322, in core_dagster_event_sequence_for_step
for event_or_input_value in ensure_gen(
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 201, in load_input_object
yield from _load_input_with_input_manager(loader, load_input_context)
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 857, in _load_input_with_input_manager
with solid_execution_error_boundary(
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 77, in solid_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
dagster._check.CheckError: Failure condition: Tried to access partition key for input 'model_hourly' of step 'model_daily', but the step input has a partition range: '2022-10-17-00:00' to '2022-10-17-23:00'.
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 867, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/storage/fs_io_manager.py", line 205, in load_input
filepath = self._get_path(context)
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/storage/fs_io_manager.py", line 140, in _get_path
path = context.get_asset_identifier()
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 397, in get_asset_identifier
return self.asset_key.path + [self.asset_partition_key]
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 312, in asset_partition_key
return self.step_context.asset_partition_key_for_input(self.name)
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_core/execution/context/system.py", line 787, in asset_partition_key_for_input
check.failed(
File "/Users/andrewryan/opt/miniconda3/envs/dagster/lib/python3.10/site-packages/dagster/_check/__init__.py", line 1485, in failed
raise CheckError(f"Failure condition: {desc}")
Mykola Palamarchuk
10/19/2022, 10:47 AM
Nicolas Parot Alvarez
10/19/2022, 11:10 AM
my_job = define_asset_job(
    "my_job",
    selection=AssetSelection.assets(source_files),
    partitions_def=DAILY_PARTITION,
)
I also want to use a sensor that determines which files should be integrated. The sensor computes the difference between an integration table listing the already-integrated files and the files found in the source directory.
Because I still want to monitor which days were integrated, I want to keep going through the partitioned job and assets above, so from the missing files I compute the dates that need runs and request runs for those dates.
This also works well.
@sensor(job=my_job)
def my_partitioned_sensor():
    computed_partition_date = "2022-10-18"
    return my_job.run_request_for_partition(partition_key=computed_partition_date)
Where things get complicated: because my machine is slow and my files are big, I want to avoid reintegrating already-integrated files while still using the partition feature for monitoring in the Partitions tab. So I want my sensor to trigger runs for partial partitions.
I define a partial partition by adding an optional config to my asset: a requested list of files to integrate for the partition computed by the sensor. So now I have to provide the requested_file_paths config through a run_config to the run request, and that's where I fail to find a solution:
1. my_job.run_request_for_partition() doesn't accept a run_config.
2. I'm failing to create a partitioned run using the lower-level RunRequest(). The job triggered by my sensor keeps failing with: dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run. The core of the issue seems to be building the correct config so Dagster understands it's a partitioned job.
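(A rough sketch of the kind of RunRequest being attempted here, not a confirmed fix: "dagster/partition" is the tag Dagster attaches to partitioned runs, but whether setting it by hand satisfies the invariant above is something to verify. The op name "source_files" and the requested_file_paths config layout are assumptions based on the description above.)
from dagster import RunRequest, sensor

@sensor(job=my_job)
def my_partial_partition_sensor():
    computed_partition_date = "2022-10-18"
    # hypothetical result of diffing the integration table against the source directory
    missing_files = ["source_dir/2022-10-18/file_2.csv"]
    yield RunRequest(
        run_key=f"{computed_partition_date}-partial",
        run_config={
            "ops": {"source_files": {"config": {"requested_file_paths": missing_files}}}
        },
        # tag Dagster uses to associate a run with a partition
        tags={"dagster/partition": computed_partition_date},
    )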
I'm attaching a working example below (except for the partial sensor).
Thanks!
Olivier Dupuis
10/19/2022, 2:44 PM
Martin O'Leary
10/19/2022, 4:23 PM
@dataclass
class Foo:
    """some description"""

    first: dict[str, int]
    second: list[float]
    third: SomeEnum
to an op's config schema like:
dagster.Permissive(
    fields={
        "first": Field(dagster.Permissive()),
        "second": Field(dagster.Array(float)),
        "third": Field(dagster.Enum.from_python_enum(SomeEnum)),
    }
)
Anyone ? 😂
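(A minimal sketch of one way to generate that schema from the dataclass automatically; this is not a Dagster-provided helper, and the type mapping below is an assumption that only covers the three field kinds in the example above.)
import dataclasses
import enum
import typing

import dagster
from dagster import Field

def schema_from_dataclass(cls):
    # build a Permissive schema whose fields mirror the dataclass fields
    fields = {}
    for f in dataclasses.fields(cls):
        origin = typing.get_origin(f.type)
        if origin is dict:
            fields[f.name] = Field(dagster.Permissive())
        elif origin is list:
            (inner,) = typing.get_args(f.type)
            fields[f.name] = Field(dagster.Array(inner))
        elif isinstance(f.type, type) and issubclass(f.type, enum.Enum):
            fields[f.name] = Field(dagster.Enum.from_python_enum(f.type))
        else:
            fields[f.name] = Field(f.type)
    return dagster.Permissive(fields=fields)

# usage: config_schema=schema_from_dataclass(Foo)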
Caio Tavares
10/19/2022, 6:08 PM
Adam Bloom
10/19/2022, 6:33 PM
Mike Atlas
10/19/2022, 6:55 PM
ephemeralStorage? I'm not having any luck using ecs/task_overrides tag to set that value
Shondace
10/19/2022, 9:22 PM
Adam Bloom
10/19/2022, 10:08 PM
STOPPED. Oddity #1: it started as enabled the first time, which is likely related to oddity #2: I've just enabled the sensor, but it isn't ticking. dagster-daemon, however, is trying to tick the old schedule. I'm getting no logs from SensorDaemon with this sensor name, but I am getting the following: [dagster-daemon-559fb67f7f-4rhv8 dagster] 2022-10-19 22:04:52 +0000 - dagster.daemon.SchedulerDaemon - WARNING - Could not find schedule <schedule/sensor name> in repository <repo>. If this schedule no longer exists, you can turn it off in the Dagit UI from the Status tab.
Dagit shows the sensor as enabled, and the Status tab (that log message should probably say "Overview" now) doesn't show a schedule that no longer exists (I disabled them all earlier).
Alexander Whillas
10/20/2022, 2:28 AM
get_dagster_logger to get a logger in the sensor, but the logs are not showing up in the cloudwatch daemon process?
Jean Gonzalez
10/20/2022, 2:54 AM
context.dagster_run.run_config. I see the type is Mapping[str, object], but I keep getting errors when trying to access one of the values.
Mikhail Knyazev
10/20/2022, 10:30 AM
dict[str, dict[frozenset[tuple[float, float]], str]] in inputs and outputs? I'm getting TypeError: isinstance() argument 2 cannot be a parameterized generic error while doing so. Full code in the thread:
Daniel Gafni
10/20/2022, 12:11 PM
path) without actually loading the asset?
For example, I need to run a third-party utility (like inserting data stored in object storage into a database). I need the upstream asset path but I don't want to load the asset itself into memory.
I understand I can make a custom IO manager that would only return this metadata, but maybe there is a better way?
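(A minimal sketch of the custom-IO-manager idea mentioned above: load_input hands the downstream op a path instead of the loaded object. The base directory and path layout here are assumptions, not the layout used by Dagster's built-in fs_io_manager.)
import os

from dagster import IOManager, io_manager

class PathOnlyIOManager(IOManager):
    def __init__(self, base_dir):
        self._base_dir = base_dir

    def _path_for(self, context):
        # assumed layout: <base_dir>/<asset key path parts>
        return os.path.join(self._base_dir, *context.asset_key.path)

    def handle_output(self, context, obj):
        # the data itself is written by the external process; nothing to persist here
        context.log.info(f"output stored externally at {self._path_for(context)}")

    def load_input(self, context):
        # return the upstream asset's path rather than loading it into memory
        return self._path_for(context)

@io_manager(config_schema={"base_dir": str})
def path_only_io_manager(init_context):
    return PathOnlyIOManager(init_context.resource_config["base_dir"])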
Brandon Butler
10/20/2022, 3:02 PM
Steven Tran
10/20/2022, 5:49 PM
Steven Tran
10/20/2022, 5:52 PM
Zach P
10/20/2022, 8:05 PM
Félix Tremblay
10/20/2022, 8:15 PM
Airton Neto
10/20/2022, 9:20 PM
def retrieve_dataset(date, fct_step, lats...):
, which isn't Dagster-decorated, inside an asset.
This function works fine when I'm using built-in types; for example, variables: list[str] is one of these args.
The current definition for the asset is:
@asset(
    description="GFS dataset fetched for NOAA Forecast data from NOAA NOMADS server.",
    config_schema=dict(
        date=Field(
            str, default_value=pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%d")
        ),
        lats=Field(list, default_value=[-4.0, -2.75]),
        lons=Field(list, default_value=[-41.5, -40.25]),
        runtime=Field(str, default_value="00"),
        variables=Field(list, default_value=noaa_gfs_features),
    ),
    output_required=True,
)
def noaa_dataset(context):
    date = context.op_config["date"]
    fct_step = None
    lats = context.op_config["lats"]
    lons = context.op_config["lons"]
    runtime = context.op_config["runtime"]
    variables = context.op_config["variables"]
    source = "model-gfs-003"
    dataset = retrieve_dataset(
        date, fct_step, lats, lons, runtime, source, variables
    )
    return Output(dataset)
When I do the following to test the asset (which calls this function inside it):
test_variables = noaa_gfs_features[0:2]
with build_op_context(
    config={
        "variables": test_variables,
    }
) as context:
    dataset = noaa_dataset(context)
I get an error KeyError: ['dpt2m', 'pres80m'], and I think it occurs because variables, when declared as
variables = context.op_config['variables']
is typed 'dagster._utils.frozenlist'.
Note: applying variables = list(variables) resolves the error, but I don't think that's the right way to solve it.
Stefan Adelbert
10/21/2022, 2:27 AM
dagit
Is there a way to force a sensor tick in dagit (or otherwise)? I've just deployed a new sensor with minimum_interval_seconds=10*60, which is reasonable in general, but sometimes I'd like to be able to force a tick rather than waiting for the next tick.
Kyle Gobel
10/21/2022, 2:57 AM
Maksym Domariev
10/21/2022, 5:13 AM
```
load_from:
  - python_package:
      package_name: common
  - python_package:
      package_name: keyword_extraction
  - python_package:
      package_name: nlp_v3
```
How can I hint GRPC to check this file? I didn't find that in the options.
Mykola Palamarchuk
10/21/2022, 1:02 PM
mem_io_manager).
Anonymus ly
10/21/2022, 1:44 PM
job_x = graph_x.to_job(config={"ops": {"op_x": {"config": {"config_param": {"env": "CONFIG_PARAM_ENV"}}}}})
I have doubts about how to supply the env value, more specifically where to specify the env variable's value. The value should not be baked into the code.
What's working:
I specify the env var and its value (the value is fetched by Docker from a .env file that is in .gitignore):
1. in the user_code container's service in docker-compose.yml,
2. in the dagster_daemon container's service in docker-compose.yml,
3. and I specify the env var's name (not its value) in dagster.yaml under DockerRunLauncher's config->env_vars.
Not providing any one of the above results in errors saying (one way or another) that the env var can't be found.
As noted above, these are two different compose files (and essentially two different repos/projects).
I understand that DockerRunLauncher needs to know which env var to pass to the new Docker container that will be spun up for the run, hence point 3. The run launcher also needs to know the env var's value, so it searches its local env (which I think is the daemon, since the daemon starts the containers), so point 2 is required.
I don't understand why point 1 is required. Is it because I'm launching runs from dagit or something?
Ideally I want the env vars to be available by only changing user_code related stuff, not the "core" stuff, because user_code will change more frequently since it has the pipeline logic, whereas the "core" stuff should not change often.
Is there a way to achieve this?
Jason
10/21/2022, 1:57 PM
DagsterInstance.get().get_latest_materialization_event(asset_key=AssetKey("<my asset name>"))
but this always returns None...
Could you help me with finding a way to get info about the last materialization of an asset?
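(A minimal sketch of an alternative way to query the last materialization via the event log, assuming get_event_records and EventRecordsFilter are available in your Dagster version; the asset name placeholder is kept from the question above. An empty result can also mean the script isn't pointed at the same DAGSTER_HOME/instance that Dagit uses.)
from dagster import AssetKey, DagsterEventType, DagsterInstance, EventRecordsFilter

instance = DagsterInstance.get()
records = instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey("<my asset name>"),
    ),
    limit=1,
    ascending=False,
)
if records:
    # most recent materialization event for the asset
    print(records[0].event_log_entry.timestamp)
else:
    print("no materializations found on this instance")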
10/21/2022, 3:29 PMmy_airbyte_resource = airbyte_resource.configured(
{
"host": "localhost",
"port": "8000",
}
)
sync_foobar = airbyte_sync_op.configured({"connection_id": "8bb7ae88-350d-40f4-947c-dfbe682311af"}, name="sync_foobar")
Zach
10/21/2022, 4:48 PM
step_context.run_id and step_context.pipeline_run.run_id?
Zachary Bluhm
10/21/2022, 5:58 PM
_get_dbt_op function.
However, when running the associated job, it seems like there can be random cases where an asset fails to materialize because the op outputs the wrong table. Example:
dagster._core.errors.DagsterInvariantViolationError: Core compute for op "stg_guild_joined" returned an output "stg_accepted_instant_invite" that does not exist. The available outputs are ['stg_guild_joined']