Mat Brady
10/05/2022, 1:57 PM
Balázs Dukai
10/05/2022, 2:19 PM
repository.py.
import os

from dagster import repository, with_resources

resource_defs_by_deployment_name = {
    "prod": RESOURCES_PROD,
    "local": RESOURCES_LOCAL,
}


@repository
def repo():
    deployment_name = os.environ.get("DAGSTER_DEPLOYMENT", "local")
    resource_defs = resource_defs_by_deployment_name[deployment_name]
    definitions = [
        with_resources(all_assets, resource_defs),
        *all_jobs,
    ]
    return definitions
Then the RESOURCES_LOCAL contains the resource definitions for the local environment, and it is defined together with all the resources in resources/__init__.py (this is wrong!)
from resources.files import file_store
from resources.database import container

file_store_temp = file_store.configured({})

RESOURCES_LOCAL = {
    "file_store": file_store_temp,
    "container": container,
}
Where file_store and container are resource definitions, while file_store_temp is a configured resource.
Now, here is where I stumbled (for many...many...hours).
I have a job, job_sample_data_image_test, which is run by a run_status_sensor. The important bit is the run_config that goes into the job_sample_data_image_test that is executed by the RunRequest.
from datetime import datetime

from dagster import DagsterRunStatus, RunRequest, SkipReason, run_status_sensor


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    name="asset_testing_success",
    monitored_jobs=[job_test],
    request_job=job_sample_data_image_test,
)
def sensor_asset_testing_success(context):
    # The container and the temp dir in the monitored job were created with the
    # Run Id of the monitored job. In order to find the container and the dir path,
    # we need the Run Id of the monitored job.
    run_id = bcore.get_run_id(context, short=True)
    container_id = make_container_id(run_id)
    temp_path = make_temp_path(run_id)
    image_tag = f"{datetime.today().date().isoformat()}-{run_id}"
    if context.dagster_run.job_name != job_sample_data_image_test.name:
        run_config = {
            "ops": {
                "sample_data_image_test": {
                    "config": {
                        "image_repository": "test/sample-data",
                        "image_tag": image_tag,
                        "image_data_dir": "/tmp",
                    }
                }
            },
            "resources": {
                "file_store": {"config": {"data_dir": temp_path}},
                "container": {"config": {"id": container_id}},
            },
        }
        return RunRequest(run_key=None, run_config=run_config)
    else:
        return SkipReason("Don't report status of status_reporting_job")
Notice that in the run_config I explicitly configure the two resources.
When the job_sample_data_image_test is run, guess which resource configuration values it is going to take?
My guess was that it would take whatever I pass in the run_config, thus "file_store": {"config": {"data_dir": temp_path}} and "container": {"config": {"id": container_id}}.
Well, it turns out that this is just half-true.
The job will get "file_store": {"config": {}}, because that is what is configured for file_store_temp, which is passed in RESOURCES_LOCAL, and which apparently overwrites the run_config for the job.
The container, on the other hand, will get {"id": container_id} from the run_config, because RESOURCES_LOCAL only has the resource definition, not the configured resource.
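In other words (a minimal sketch, reusing the file_store and container definitions from above): if the resource configuration should come from the RunRequest's run_config, pass the unconfigured resource definition; a pre-configured resource keeps whatever was baked in with .configured().

# Sketch only: resources whose config should come from run_config
RESOURCES_LOCAL = {
    "file_store": file_store,  # unconfigured, so run_config["resources"]["file_store"] applies
    "container": container,    # unconfigured, so run_config["resources"]["container"] applies
}

# versus a pre-configured resource, whose config is fixed up front:
# "file_store": file_store.configured({}),  # the run_config override is not applied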
That's it, I hope it can save some time for others.
Zachary Bluhm
10/05/2022, 3:04 PM
Félix Tremblay
10/05/2022, 3:10 PM
Alexander Voishchev
10/05/2022, 3:12 PM
import pandas as pd
from dagster import asset


@asset
def asset1():
    df = get_external_data()
    return df


@asset
def asset2(asset1):
    # Load the current data of asset2 (state from previous materialization)
    df = get_asset2_data()
    # Append the data from asset1
    df = pd.concat([df, asset1])
    # Return the combined result
    return df
Is there any elegant way to do that without custom IOManager?
P.S. I found an example with an asset which returns its own data, as far as I understand:
examples/docs_snippets/docs_snippets/concepts/assets/asset_w_context.py
@asset(required_resource_keys={"api"})
def my_asset(context):
# fetches contents of an asset
return context.resources.api.fetch_table("my_asset")
But it is abstract code, as I wrote above.
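(A hedged sketch of that same pattern applied to the append use case above; the warehouse resource and its has_table/fetch_table helpers are hypothetical stand-ins, not a real Dagster API.)

import pandas as pd
from dagster import asset


@asset(required_resource_keys={"warehouse"})  # "warehouse" is a hypothetical resource
def asset2(context, asset1):
    # Read the previously materialized state of asset2 through the resource,
    # mirroring the docs snippet above, then append the new rows from asset1.
    if context.resources.warehouse.has_table("asset2"):  # hypothetical helper
        previous = context.resources.warehouse.fetch_table("asset2")  # hypothetical helper
        return pd.concat([previous, asset1])
    return asset1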
Sean Lopp
10/05/2022, 4:33 PM
load_asset_value functionality in a notebook as an aid for local development. I am able to import my repository successfully, and I confirm in the notebook that DAGSTER_HOME is set, but when I attempt to load the asset I get a file not found error because it is attempting to read from /tmp. Perhaps I am not understanding how assets are loaded outside of a running dagster context, but is there a way to specify the storage path that was used for materializations?
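(A sketch of one thing to check, with the caveat that the exact signature may differ by Dagster version: pass a persistent DagsterInstance explicitly, so the default filesystem IO manager resolves its storage under DAGSTER_HOME instead of an ephemeral temp directory. "my_asset" is a placeholder key, and the instance argument is an assumption to verify against your version of load_asset_value.)

from dagster import AssetKey, DagsterInstance

instance = DagsterInstance.get()  # reads DAGSTER_HOME
value = repo.load_asset_value(AssetKey("my_asset"), instance=instance)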
daniel
10/05/2022, 5:30 PM
James Hale
10/05/2022, 6:27 PM
snowflake_config = {...}
# ...
resource_defs = {
    "snowflake_io_manager": snowflake_io_manager.configured(snowflake_config),
    "snowflake_resource": snowflake_resource.configured(snowflake_config),
}
David Jayatillake
10/05/2022, 9:50 PM
Saad Anwar
10/05/2022, 10:30 PM
Ismael Rodrigues
10/06/2022, 3:20 AM
teodorpaius
10/06/2022, 8:56 AM
Davi
10/06/2022, 9:22 AM
George Fourmouzis
10/06/2022, 9:48 AM
Timo
10/06/2022, 10:45 AM
geoHeil
10/06/2022, 1:29 PM
Huy Dao
10/06/2022, 1:56 PM
Huy Dao
10/06/2022, 1:57 PM
Josh Clark
10/06/2022, 2:00 PM
@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"), io_manager_key="fs_io_manager")
def my_daily_partitioned_asset(context):
    context.log.info(
        f"Processing asset partition '{context.asset_partition_key_for_output()}'"
    )
    return [1, 2, 3, 4, 5]


assets_with_io = with_resources(
    [my_daily_partitioned_asset],
    resource_defs={
        "fs_io_manager": fs_io_manager.configured(
            {"base_dir": "/Users/josh/Documents/axion/axion-pipeline/assets"}
        )
    },
)


@repository
def repo():
    return [assets_with_io]
But when I try to load (after materialising the asset) with
asset1_value = repo.load_asset_value(AssetKey(("my_daily_partitioned_asset","2022-10-05")))
or
asset1_value = repo.load_asset_value(AssetKey("my_daily_partitioned_asset/2022-10-05"))
I get KeyError: AssetKey
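(A hedged guess at the cause: the partition is not part of the AssetKey itself, so the key is just the asset name. Whether load_asset_value can target a single partition via a partition_key argument depends on the Dagster version and is an assumption to verify.)

asset1_value = repo.load_asset_value(
    AssetKey("my_daily_partitioned_asset"),
    partition_key="2022-10-05",  # assumption: only if this argument exists in your version
)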
CJ
10/06/2022, 2:01 PM
Vinnie
10/06/2022, 2:06 PM
AssetSelection that matches all assets that include a specific key without passing the fully qualified name? e.g. I have assets foo / bar and foo / baz, I’d like to pass AssetSelection.some_method("foo") and have it select both assets. I tried passing it to the AssetSelection.keys() and it didn’t work. Looking at the source code, there doesn’t seem to be a more permissive method.
On a sidenote, I don’t think the AssetSelection shows up in the docs website, I just found it by chance somewhere.
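(A sketch of a manual workaround, assuming the asset definitions are already gathered in a list named all_assets, which is hypothetical here: filter the keys by their first path component and hand them to AssetSelection.keys.)

from dagster import AssetSelection

keys_under_foo = [
    key
    for asset_def in all_assets  # hypothetical list of asset definitions
    for key in asset_def.keys
    if key.path[0] == "foo"
]
selection = AssetSelection.keys(*keys_under_foo)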
geoHeil
10/06/2022, 2:22 PM
Saul Burgos
10/06/2022, 3:09 PM
class LazyRepositories:
    def get_all_jobs(self) -> List[JobDefinition]:
        # response = requests.get('https://api/jobs')
        # create dynamic jobs
        # return jobs
        ...

    def get_all_pipelines(self) -> List[PipelineDefinition]:
        response = requests.get('https://api/pipelines')
        # create dynamic pipelines
        # return pipelines

    def get_all_sensors(self) -> List[SensorDefinition]:
        response = requests.get('https://api/sensors')

    def get_all_schedules(self) -> List[ScheduleDefinition]:
        response = requests.get('https://api/schedules')
        # create dynamic schedules
        # return schedules


@repository
def core_generic():
    return LazyRepositories()
If my API fails for some reason, my questions are:
- Should I add logic in "my dagster repository" to retry?
- Does dagster have an endpoint to reset the whole process?
- Should I manage the retry process outside of the dagster repository? I mean, check if the repository was loaded, and if not, wait and reload the repository?
What I am trying to ask is: what would be the best approach if my API fails, inside dagster or not?
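(One option, sketched without any Dagster-specific machinery: retry the API calls inside the repository module itself, so a transient failure is less likely to leave the code location unloadable; the attempt count and delay are arbitrary. If it still fails, reloading the repository location from Dagit once the API is back would pick the definitions up again.)

import time

import requests


def get_with_retry(url, attempts=3, delay=5):
    # Retry a flaky API call a few times before giving up, so the repository
    # definition has a better chance of loading on the first try.
    last_error = None
    for _ in range(attempts):
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            last_error = exc
            time.sleep(delay)
    raise last_error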
Carter
10/06/2022, 3:42 PM
Steven Tran
10/06/2022, 4:06 PM
Madeline Wu
10/06/2022, 6:43 PM
RunRequest, which I want to run with a dagster-k8s/config tag. I’m running into a CheckError because the values in the tags are expected to be a str but the dagster-k8s config is a dict. The relevant source code seems to be here. Is the str typing intended for the RunRequest, or can we loosen the typing requirement?
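(A possible workaround, sketched and not verified: json-encode the k8s config so the tag value is a string, which is what the RunRequest check currently expects; the config values below are just placeholders.)

import json

from dagster import RunRequest

k8s_config = {"container_config": {"resources": {"limits": {"memory": "2Gi"}}}}  # placeholder values

run_request = RunRequest(
    run_key=None,
    run_config={},
    tags={"dagster-k8s/config": json.dumps(k8s_config)},
)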
James Hale
10/06/2022, 6:44 PM
Pablo Beltran
10/07/2022, 1:08 AM
peay
10/07/2022, 10:26 AM
context.op_config is None when I try and materialize an asset without providing any configuration override. However, context.op_config is correctly set if I do override at least one field from the materialize launchpad. Is this a known bug?
Running Dagster 1.0.6
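(A sketch of a guard in the meantime, assuming a simple config schema; the limit field is just an example: give the schema a default and treat a missing op_config as an empty dict.)

from dagster import Field, asset


@asset(config_schema={"limit": Field(int, default_value=10)})
def my_asset(context):
    # Guard against op_config being None when no override is supplied.
    limit = (context.op_config or {}).get("limit", 10)
    return list(range(limit))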
Archie Kennedy
10/07/2022, 10:30 AM
Permissive dict with 1 required field (op config_schema).
Example:
@op(
    config_schema={
        "payload": Field(
            Permissive(
                {
                    "URI": Field(str, is_required=True),
                    "Webhook": Field(str, is_required=False),
                },
            )
        )
    }
)
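(For reference, a sketch of run_config that this schema would accept; my_op is a hypothetical op name: URI is required, Webhook is optional, and Permissive lets extra keys through unvalidated.)

run_config = {
    "ops": {
        "my_op": {  # hypothetical op name
            "config": {
                "payload": {
                    "URI": "https://example.com/resource",
                    "extra_key": "anything else is allowed by Permissive",
                }
            }
        }
    }
}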