Binoy Shah
09/27/2022, 9:37 PM
ExperimentalWarning: "asset_key" is an experimental argument to function "OutputDefinition.__init__". It may break in future versions, even between dot releases. To mute warnings for experimental functionality, invoke warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) or use one of the other methods described at https://docs.python.org/3/library/warnings.html#describing-warning-filters.
return klass(
What does it mean, and is it likely to break my system in the future?
fahad
09/27/2022, 10:19 PM
dagster_type_loader? I have one defined using the decorator:
@dagster_type_loader(...config_schema, resources...)
def remote_path_loader(context, config) -> Path:
    ...pull a file and create a Path...
which is tested as such:
def test_remote_path_loader():
    remote_path_loader(???, {"config_1": "value_1"})
But this just results in TypeError: 'DagsterTypeLoaderFromDecorator' object is not callable. I'm also not sure how to build a StepExecutionContext for this.
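One hedged workaround sketch (not necessarily the intended API; the config key and return value are placeholders from the snippet above): keep the undecorated loading function around, unit test it directly with build_op_context, and apply the dagster_type_loader decorator separately, since the decorated DagsterTypeLoaderFromDecorator object is not callable.
from pathlib import Path

from dagster import build_op_context, dagster_type_loader


def _remote_path_loader(context, config) -> Path:
    # placeholder body: pull a file and create a Path
    return Path(config["config_1"])


# what Dagster uses; it is a DagsterTypeLoader, not a plain function
remote_path_loader = dagster_type_loader(config_schema={"config_1": str})(_remote_path_loader)


def test_remote_path_loader():
    # build_op_context stands in for the step context in a unit test
    context = build_op_context()
    assert _remote_path_loader(context, {"config_1": "value_1"}) == Path("value_1")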
09/27/2022, 11:17 PM{
ops: {
load_config_from_yaml: {
inputs: {
score_config_filepath: esg/ybscore/jobs/idealratings_esg_score_config.yaml
}
},
pillar_scores: {
inputs: {
pillar_name: governance
}
}
},
resources:{
esg_outputter: {
config: {
new_score_versions: {
metric: '11',
trbc_industrygroup: {
score: '11.0',
pillar: '11.0.0'
}
},
scoring_context: trbc_industrygroup
}
}
},
I'm getting an error: Missing , or : between flow map items. It seems like it doesn't recognize resources as a keyword? But I think it should, and I've input resource configs for other jobs before. Any pointers? Thanks!
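One hedged guess at the parser error: the pasted config mixes flow style (the outer { ... } braces) with block-style newlines, and resources:{ has no space after the colon, either of which can trip a YAML parser. The same run config written in plain block style (keys copied from the snippet above) would be:
ops:
  load_config_from_yaml:
    inputs:
      score_config_filepath: esg/ybscore/jobs/idealratings_esg_score_config.yaml
  pillar_scores:
    inputs:
      pillar_name: governance
resources:
  esg_outputter:
    config:
      new_score_versions:
        metric: '11'
        trbc_industrygroup:
          score: '11.0'
          pillar: '11.0.0'
      scoring_context: trbc_industrygroup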
Charles Zhan
09/27/2022, 11:28 PM
Vivek Ayer
09/28/2022, 12:41 AM
dev and branch deployment, how would I add another ECS agent to handle prod? Thanks!
Ravishankar S R
09/28/2022, 6:36 AM
Valentin Vareškić
09/28/2022, 7:01 AM
Alessandro Cantarelli
09/28/2022, 9:14 AM
@asset(
    partitions_def=daily,
    io_manager_key="parquet_io_manager",
    ins={'test': AssetIn(metadata={'allow_missing_partitions': True})}
)
def load_data(test: pandas.DataFrame) -> pandas.DataFrame:
    return test
@asset(
    partitions_def=daily,
    io_manager_key="parquet_io_manager",
    ins={"load_data": AssetIn(metadata={'allow_missing_partitions': True}, partition_mapping=NDaysPartitionMapping(days=1, offset=0))},
    # days - determines the number of extra days of data loaded (i.e. days=1 for partition 2018-01-03 will return data from 2018-01-03 and 2018-01-02)
    # offset - offsets the data retrieved by date (i.e. offset=1 for partition 2018-01-03 will return data from 2018-01-02)
    config_schema={'cellsSeries': int, 'cellsParallel': int}
)
def trailing_window(context, load_data: pandas.DataFrame) -> pandas.DataFrame:
    df = load_data
    context.log.info(f'Days of data loaded: {df.date.unique()}')
    tidied_raw_data = two_day_data_trial.tidy_raw_data(context, database=df)
    soc_ocv_lut = wr.s3.read_parquet("s3://base-infra-bostonmodeldevelopment-lq00f0lfzytq/dagster/single_event_data/ocv_soc_lut.parquet")
    return tidied_raw_data
test_invoked = with_resources(
    [test, load_data, trailing_window],
    RESOURCES_PROD,
)

@repository
def source_repo():
    return [test_invoked, schedule]
The first asset just loads data from an S3 bucket and returns a DataFrame. This DataFrame is then passed into the second asset, which partitions it and does some operations. The second asset requires two config values ('cellsSeries': int, 'cellsParallel': int). How do I pass these?
This is the source asset I use:
test = SourceAsset(
    key=AssetKey("test"),
    description="Example partitioned data",
    io_manager_key="parquet_io_manager",
    partitions_def=daily,
)
This is the job scheduler I use:
schedule = build_schedule_from_partitioned_job(
    define_asset_job("main", partitions_def=daily)
)
These are the resources I am using:
RESOURCES_PROD = {
    "s3_bucket": ResourceDefinition.hardcoded_resource(
        "some-url"
    ),
    "s3": s3_resource,
    "s3_prefix": ResourceDefinition.hardcoded_resource("dagster"),
    "parquet_io_manager": s3_partitioned_parquet_io_manager,
}
How can I pass a config file into this setup so that ops called inside the "trailing_window" asset can access values using for example context.op_config['cellsParallel']?
Thanks!
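A hedged sketch of one way to do this (assuming the daily partitions and the asset names from the snippets above; the numeric values are placeholders): define_asset_job accepts a config argument, so the values can be baked into the job definition and then read inside the asset as context.op_config.
from dagster import build_schedule_from_partitioned_job, define_asset_job

main_job = define_asset_job(
    "main",
    partitions_def=daily,
    config={
        "ops": {
            # keyed by the asset/op name; surfaces as context.op_config in trailing_window
            "trailing_window": {"config": {"cellsSeries": 4, "cellsParallel": 2}}
        }
    },
)

schedule = build_schedule_from_partitioned_job(main_job)
The same "ops" block can also be supplied as run config in the Launchpad at launch time instead of being hard-coded on the job.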
Thomas
09/28/2022, 10:04 AM
Vinnie
09/28/2022, 10:09 AM
target isn't accepted by load_assets_from_dbt_project? I'm currently facing an issue with that since I need to extract some information from the manifest that changes depending on the target (different dbs for dev and prod, need them to be part of the asset keys). Happy to open a PR if there's no reason it's not currently supported.
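A hedged workaround sketch in the meantime: compile the project per target outside Dagster (e.g. dbt compile --target prod) and feed the resulting manifest to load_assets_from_dbt_manifest; the path below is a placeholder for wherever your target directory lands.
import json

from dagster_dbt import load_assets_from_dbt_manifest

# manifest produced by `dbt compile --target prod` (placeholder path)
with open("target/manifest.json") as f:
    manifest = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(manifest)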
Navneet Sajwan
09/28/2022, 10:47 AM
Kirk Stennett
09/28/2022, 12:25 PM
@failure_hook. I had an instance where a job was invoked but resource creation failed and an error was thrown. I'd expect the failure_alert to be triggered there but nothing happened. Is that the expected functionality? Or is there a way to have that be under the umbrella of the job's failure hook?
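A hedged note: op-level failure hooks run around op execution, so a failure during resource initialization may never reach them; a run-level failure sensor is one way to catch those cases. A minimal sketch (the alerting call is a placeholder):
from dagster import RunFailureSensorContext, run_failure_sensor

@run_failure_sensor
def failure_alert_sensor(context: RunFailureSensorContext):
    # fires for any run failure event, including runs that fail before ops execute
    message = f"Run {context.dagster_run.run_id} failed: {context.failure_event.message}"
    notify_slack(message)  # placeholder for whatever alerting you use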
Chris Histe
09/28/2022, 2:01 PM
Context ]
Hello there,
is there an equivalent of build_init_resource_context for StepExecutionContext? I'm unit testing a custom IO Manager and getting an error when accessing context.has_partition_key (InputContext). It seems you can pass a StepExecutionContext to build_input_context but I'm not sure how to instantiate that class.
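A hedged sketch of such a unit test, assuming the installed Dagster version exposes a partition_key argument on build_input_context / build_output_context (MyIOManager and some_dataframe are placeholders, not names from the thread):
from dagster import build_input_context, build_output_context

def test_my_io_manager_partitioned():
    manager = MyIOManager()  # placeholder for the custom IO manager

    # passing partition_key makes context.has_partition_key True inside the manager
    output_context = build_output_context(step_key="my_step", name="result", partition_key="2022-09-28")
    manager.handle_output(output_context, some_dataframe)

    input_context = build_input_context(upstream_output=output_context, partition_key="2022-09-28")
    loaded = manager.load_input(input_context)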
Sean Lopp
09/28/2022, 2:53 PM
@asset
def a():
    same_logic("a")

@asset
def b():
    same_logic("b")

@asset
def c():
    same_logic("c")

@asset(
    ins={"a": AssetIn(), "b": AssetIn(), "c": AssetIn()},
)
def all(a, b, c):
    process(a, b, c)
I'm wondering if there is a more DRY approach to generating the assets? This doesn't seem to be exactly the use case for multi-assets, since each of a, b, c is generated by an independent call. I've considered programmatically yielding assets instead of using the decorator, but I wanted to check if I was missing anything obvious before heading down that path. I'm also not sure of the best way to update all when new ins are added. Thanks for the advice!
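One hedged sketch of a factory-style approach (reusing the names from the snippet above, and assuming your Dagster version accepts **kwargs for asset inputs; if not, list the parameters explicitly): a shared list of names drives both the generated upstream assets and the ins of the downstream asset.
from dagster import AssetIn, asset

NAMES = ["a", "b", "c"]

def build_logic_asset(name: str):
    # each generated asset runs the shared logic for its name
    @asset(name=name)
    def _logic_asset():
        return same_logic(name)
    return _logic_asset

logic_assets = [build_logic_asset(n) for n in NAMES]

@asset(ins={n: AssetIn() for n in NAMES})
def all_names(**upstream):
    # upstream maps each name to its materialized value
    return process(*(upstream[n] for n in NAMES))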
Ruoyu Qian
09/28/2022, 3:50 PM
define_asset_job() where I can not only materialize dbt models but also test them?
So currently I have a sensor that gets triggered when a dbt model is materialized successfully by Dagster. I want the sensor to be triggered not by dbt run but by dbt test.
Jan
09/28/2022, 4:51 PM
Martino Ongaro
09/28/2022, 5:13 PM
Jeremy
09/28/2022, 5:30 PM
gateway_pings_group_prod = AssetGroup(
    [gateway_pings, get_unique_ip_addresses],
    resource_defs=RESOURCES_PRODUCTION
)
gateway_pings_prod = gateway_pings_group_prod.build_job(
    name="splunk_gateway_pings",
    tags={"notify": "slack"}
)
gateway_pings_schedule = build_schedule_from_partitioned_job(
    gateway_pings_prod
)
How do I do this now? I see that AssetGroup is deprecated and I should use load_assets_from_package_module, but I don't see how to set resource_defs, and how should I rewrite the build_job part?
Also, I don't have my assets split into modules. Do I need to do that? [gateway_pings, get_unique_ip_addresses] are just 2 assets in a file with several others.
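A hedged sketch of one possible rewrite, keeping the names above (with_resources plus define_asset_job roughly stands in for AssetGroup.build_job, and the assets can stay in the same file; loading from a package module is optional):
from dagster import build_schedule_from_partitioned_job, define_asset_job, repository, with_resources

# attach resources directly to the listed assets instead of via AssetGroup
assets_with_resources = with_resources(
    [gateway_pings, get_unique_ip_addresses],
    RESOURCES_PRODUCTION,
)

gateway_pings_prod = define_asset_job(
    name="splunk_gateway_pings",
    selection=["gateway_pings", "get_unique_ip_addresses"],
    tags={"notify": "slack"},
)

gateway_pings_schedule = build_schedule_from_partitioned_job(gateway_pings_prod)

@repository
def my_repo():
    return [*assets_with_resources, gateway_pings_prod, gateway_pings_schedule]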
claire
09/28/2022, 6:26 PM
Fraser Marlow
09/28/2022, 7:05 PM
Apoorv
09/28/2022, 9:00 PM
Jeremy
09/28/2022, 11:47 PM
Timo
09/29/2022, 7:31 AM
Averell
09/29/2022, 12:07 PM
Ryan Navaroli
09/29/2022, 12:23 PM
The docs say: "User code deployments can be updated independently from other Dagster components, including Dagit. As a result, updates to repositories can occur without causing downtime to any other repository or to Dagit. After updating, if there is an error with any repository, an error is surfaced for that repository within Dagit; all other repositories and Dagit will still operate normally."
According to this thread https://dagster.slack.com/archives/C01U954MEER/p1650362406805329 it basically seems that this is not true? Is there some other meaning behind the statement? Best I can ascertain is that it alludes to the fact that the docker image behind a user code repository could be changed; however, the daemon/dagit doesn't pick up changes in that case, and that goes against the recommendation in the docs to not use a fixed tag ("we recommend using a unique tag (ie not "latest")"). Ultimately, I would like to do what the docs seem to state. Ideally, I'd like to separate my core dagster deployment (dagit, daemon) from each individual user code repository. To note, I am attempting to use the dagster and dagster-user-deployment helm charts.
Ashley Dsouza
09/29/2022, 1:07 PM
Kirk Stennett
09/29/2022, 1:22 PM
Brennon Lee
09/29/2022, 2:13 PM
dagster_cloud.yaml file (or a way to just configure multiple files)? We have different container registries as well as different secret tags we'd like to pass in on a per-environment basis.
Ex. Our production dagster_cloud.yaml would look like:
locations:
  - location_name: workflows
    code_source:
      package_name: workflows
    build:
      directory: ./
      registry: 123456789.dkr.ecr.us-east-1.amazonaws.com/production-deployments
    container_context:
      ecs:
        secrets_tags:
          - my_secrets_with_production_tag
and our dev dagster_cloud.yaml would look like:
locations:
  - location_name: workflows
    code_source:
      package_name: workflows
    build:
      directory: ./
      registry: 123456789.dkr.ecr.us-east-1.amazonaws.com/dev-deployments
    container_context:
      ecs:
        secrets_tags:
          - my_secrets_with_dev_tag
And so on for additional environments we have. Does anyone have recommendations for handling this? I may be off base and there's probably a better approach to this.
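One hedged workaround sketch (a CI-side approach, not an official Dagster Cloud feature): keep a single dagster_cloud.yaml template and render the environment-specific values in the deploy pipeline before the Dagster Cloud CLI runs. The file names and environment variables below are placeholders; only the Python standard library is used.
import os
from string import Template

# dagster_cloud.yaml.tmpl contains e.g. "registry: $REGISTRY" and "- $SECRETS_TAG"
with open("dagster_cloud.yaml.tmpl") as f:
    template = Template(f.read())

rendered = template.substitute(
    REGISTRY=os.environ["DEPLOY_REGISTRY"],        # e.g. the production or dev ECR repo
    SECRETS_TAG=os.environ["DEPLOY_SECRETS_TAG"],  # e.g. my_secrets_with_production_tag
)

with open("dagster_cloud.yaml", "w") as f:
    f.write(rendered)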
Balázs Dukai
09/29/2022, 3:21 PM
ConfigMapping.resolve_from_validated_config(config) to pre-configure my jobs. While it "works", as in, the configuration with "geofilter": "bla bla bla" shows up in the Launchpad in dagit for job_main_nl_test, the pre-configuration does not show for job_main_nl_sample, although the two jobs are defined in the same way and both definitions are in the same module. The difference between them is the assets they contain.
I refreshed and restarted dagit several times.
The job job_main_nl_sample does not get its configuration pre-filled with "geofilter": "bla bla bla" in the dagit Launchpad. See the screenshot of the Launchpad.
@config_mapping(config_schema={
    "geofilter": str
})
def config_main_pipeline(val):
    return {
        "ops": {
            'extract_bgt': {
                'config': {'download_dir': '...', 'geofilter': val['geofilter']}},
            'extract_top10nl': {
                'config': {'download_dir': '...', }}},
    }

assets_source_and_input = AssetSelection.groups("source") | AssetSelection.groups("input")

job_main_nl_sample = define_asset_job(
    name="job_main_nl_sample",
    selection=assets_source_and_input,
    config=config_main_pipeline.resolve_from_validated_config({"geofilter": "bla bla bla"})
)
The job job_main_nl_test does get its configuration pre-filled with "geofilter": "bla bla bla" in the dagit Launchpad. See the screenshot of the Launchpad.
@asset(
    config_schema={
        "featuretypes": Field(
            list,
            default_value=["gebouw", ],
            description="The feature types to download.",
            is_required=False
        ),
        "geofilter": Field(
            str,
            description="WKT of the polygonal extent",
            is_required=False
        ),
        "download_dir": str
    },
    group_name="group_bla"
)
def bla(context):
    context.log.info(context.op_config)

@config_mapping(config_schema={
    "geofilter": str
})
def config_test(val):
    return {
        "ops": {'bla': {'config': {'download_dir': '...', 'geofilter': val['geofilter']}}, },
    }

job_main_nl_test = define_asset_job(
    name="job_main_nl_test",
    selection=AssetSelection.groups("group_bla"),
    config=config_test.resolve_from_validated_config({"geofilter": "bla bla bla"})
)
Konrad Schlatte
09/29/2022, 3:27 PM
dagster._core.definitions.events.Failure: Request aa18527d-91c9-4b5d-8cd1-a1b6aa9d1949 finished with state 'failed' in 82.054154 seconds
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster/_core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster/_core/execution/plan/execute_step.py", line 358, in core_dagster_event_sequence_for_step
_step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster/_core/execution/plan/execute_step.py", line 69, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster/_core/execution/plan/compute.py", line 174, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster/_core/execution/plan/compute.py", line 151, in _yield_compute_results
user_event_generator,
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster/_utils/__init__.py", line 430, in iterate_with_context
next_output = next(iterator)
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster/_core/execution/plan/compute_generator.py", line 73, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster_dbt-1.0.5-py3.7.egg/dagster_dbt/ops.py", line 116, in dbt_test_op
return context.resources.dbt.test()
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster_dbt-1.0.5-py3.7.egg/dagster_dbt/rpc/resources.py", line 340, in test
return self._get_result(data=json.dumps(data))
File "/home/ec2-user/dagster/.venv/lib64/python3.7/site-packages/dagster_dbt-1.0.5-py3.7.egg/dagster_dbt/rpc/resources.py", line 561, in _get_result
f"Request {request_token} finished with state '{current_state}' in "