Sean Han
12/13/2022, 5:49 PM
$ cat .env
# .env
SNOWFLAKE_USERNAME=user@example.com
$ dagit
2022-12-13 09:48:21 -0800 - dagit - INFO - Serving dagit on http://127.0.0.1:3000 in process 58295
Is there any indication that dagit has recognized the file and ingested it correctly? The error I see is:
dagster._core.errors.DagsterInvalidConfigError: Error in config for resource snowflake
Error 1: Post processing at path root:config:user of original value {'env': 'SNOWFLAKE_USERNAME'} failed:
dagster._config.errors.PostProcessingError: You have attempted to fetch the environment variable "SNOWFLAKE_USERNAME" which is not set. In order for this execution to succeed it must be set in this environment.
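One way to narrow this down (a sketch, not from the thread): verify the variable is actually visible to Python in the same shell that launches dagit.

import os

# None here means the variable is not set in this process's environment,
# which matches the PostProcessingError above; a value means dagit launched
# from this same shell should see it too.
print(os.getenv("SNOWFLAKE_USERNAME"))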
Mark Fickett
12/13/2022, 6:28 PM
I have an @op yielding DynamicOutputs that's taking 30m to yield 5600 outputs, taking about 0.5s between each "Yielded output" log message both when it started and when it was finishing. Is this expected? Running on k8s. I wouldn't mind, except the outputs don't seem to start running until the op finishes. The op is pretty simple:
from dataclasses import dataclass
from typing import List

from dagster import DynamicOut, DynamicOutput, op

@dataclass
class Test:
    test_id: str
    # etc

@op(out=DynamicOut(Test))
def yield_each_test(test_list: List[Test]):
    for test in test_list:
        yield DynamicOutput(value=test, mapping_key=test.test_id)
Is there a way to structure the @op more efficiently?
Sven Lito
12/13/2022, 6:28 PM
dagster._check.CheckError: Failure condition: Couldn't import module dagster_cloud.instance when attempting to load the class dagster_cloud.instance.DagsterCloudAgentInstance
Todd
12/13/2022, 6:46 PM
Rafael Gomes
12/13/2022, 8:37 PM
Gatsby Lee
12/13/2022, 9:18 PM
Is there a way to handle multiple run statuses with run_status_sensor? Or do multiple functions have to be defined, one for each DagsterRunStatus?
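For reference, a minimal sketch of the one-function-per-status pattern; the sensor names here are illustrative:

from dagster import DagsterRunStatus, run_status_sensor

# run_status_sensor takes a single run_status, so monitoring several
# statuses means one decorated function per status.
@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def on_success(context):
    ...  # react to context.dagster_run here

@run_status_sensor(run_status=DagsterRunStatus.FAILURE)
def on_failure(context):
    ...  # react to context.dagster_run here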
Hebo Yang
12/13/2022, 11:42 PM
until nslookup metrics-repo; do echo waiting for user service; sleep 2; done
Does it mean that if one user repo is problematic, dagit won't boot up?
Edmund Tian
12/14/2022, 4:51 AM
Jakub Zgrzebnicki
12/14/2022, 6:34 AM
@sensor(job=asset_job, default_status=SOME_ENV_CONSTANCE)
def my_sensor():
    ...
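If the intent is to drive default_status from an environment variable, one possible sketch (the ENABLE_SENSORS variable name is hypothetical):

import os

from dagster import DefaultSensorStatus

# Hypothetical: resolve the constant from the environment at import time.
SOME_ENV_CONSTANCE = (
    DefaultSensorStatus.RUNNING
    if os.getenv("ENABLE_SENSORS") == "true"
    else DefaultSensorStatus.STOPPED
)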
Casper Weiss Bang
12/14/2022, 8:12 AM
Timo Klockow
12/14/2022, 8:30 AM
Is there a way to freeze the create_timestamp / update_timestamp when creating a run for testing purposes? I want to have it fixed, but it's always using the current timestamp:
with instance_for_test() as instance:
    for run in MOCK_RECORDS:
        cast(DagsterInstance, instance).add_run(
            PipelineRun(
                pipeline_name=run.pipeline_run.pipeline_name,
                run_id=run.pipeline_run.run_id,
                status=run.pipeline_run.status,
            )
        )
[RunRecord(storage_id=3, pipeline_run=DagsterRun(pipeline_name='job_dependency', run_id='coo', run_config={}, mode=None, asset_selection=None, solid_selection=None, solids_to_execute=None, step_keys_to_execute=None, status=<DagsterRunStatus.SUCCESS: 'SUCCESS'>, tags={}, root_run_id=None, parent_run_id=None, pipeline_snapshot_id=None, execution_plan_snapshot_id=None, external_pipeline_origin=None, pipeline_code_origin=None), create_timestamp=datetime.datetime(2022, 12, 14, 9, 14, 20), update_timestamp=datetime.datetime(2022, 12, 14, 9, 14, 20), start_time=None, end_time=None), ... ]
I want to filter by said dates, or is there another way?
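One possible approach (a sketch, assuming the timestamps are stamped from the Python process clock rather than by a database-side default, which is not confirmed here): freeze time with the third-party freezegun library.

from freezegun import freeze_time

# Anything reading "now" inside this block sees the fixed instant.
with freeze_time("2022-12-14 09:14:20"):
    instance.add_run(...)  # timestamps taken from the clock would be frozen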
Vinnie
12/14/2022, 9:15 AM
Is there a way to specify a typing.Union return type from an @op that doesn't cause Dagster to raise a DagsterInvalidDefinitionError? I have an op that should return one of two custom types depending on some of the config passed, but specifying it in the return type signature or Out(Union[type1, type2]) raises this error.
The workaround I found was to define another output type (below), but it feels unnecessary for this type of use case.
from dagster import DagsterType

type1_or_type2 = DagsterType(
    name="type1_or_type2",
    type_check_fn=lambda _, x: isinstance(x, type1) or isinstance(x, type2),
)
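A hypothetical usage of that workaround on an op (the op name is illustrative):

from dagster import Out, op

@op(out=Out(type1_or_type2))
def produce_one_of_two():
    ...  # return an instance of type1 or type2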
scott simpson
12/14/2022, 9:19 AM
DeprecationWarning: Non-built-in PartitionMappings, such as MarketToDatePartitionMap are deprecated and will not work with asset reconciliation. The built-in...
Am I still going to be able to have custom PartitionMappings in the future? Or is this feature just going away for asset reconciliation sensors?
Daniel Galea
12/14/2022, 9:27 AM
Apoorv Yadav
12/14/2022, 10:23 AM
Daniel Galea
12/14/2022, 10:36 AM
I can use @op(ins={"start": In(Nothing)}), but start is not a key that is available in the ins of an asset. I have a pipeline that is like:
ingest_dataset1 -> ingest_dataset2 -> start_processing_data
in the form of:
asset -> asset -> op
The assets do not depend on each other; I just want one to run after the other. I could also use:
op -> op -> op
but it is my understanding that an op should be used when performing some type of transformation or logic, whereas an asset signifies something which is stored to persistent storage.
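For ordering assets that share no data, one sketch that may fit, using non_argument_deps with the asset names from the question:

from dagster import asset

@asset
def ingest_dataset1():
    ...

# Runs after ingest_dataset1 even though no data is passed between them.
@asset(non_argument_deps={"ingest_dataset1"})
def ingest_dataset2():
    ...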
Peter Davidson
12/14/2022, 12:12 PM
@op
def generate_sample1() -> pd.DataFrame:
    # context.log.info("config_param: " + context.op_config["config_param"])
    return sample_data()
@graph
def graph_multi_sample():
    n_samples = 5
    samples = []
    for i in range(n_samples):
        samples.append(generate_sample1.alias(f"sample_{i}")())
    return concat_samples(samples)
job_from_graph = graph_multi_sample.to_job(resource_defs=resource_defs, config=ops_output_config)
Where ops_output_config is a config mapping that generates file paths for each op:
@config_mapping(config_schema={"param_id": int})
def ops_output_config(val):
    conf = load_conf_from_csv(val["param_id"])
    workspace_root = os.path.join(conf.get('run_type'), conf.get('rep_date'), conf.get('nickname'))
    ops_output_config_schema = {}
    for output in ['concat_samples', 'generate_sample1', 'generate_sample2']:
        output_path = os.path.join(workspace_root, 'result', f"{output}.pkl")
        ops_output_config_schema[output] = {'outputs': {'result': {'output_path': output_path}}}
    return {"ops": ops_output_config_schema}
Now the config mapping is not very complex. Is there a way to pass the required ops to the config mapping, so it knows to create records for all of the sample aliases?
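One sketch (an assumption, not a confirmed approach): since the aliases follow a known pattern, the mapping could derive the op names itself instead of hard-coding them:

# Hypothetical: keep the alias count in one place, derive the op names,
# and iterate over op_names inside ops_output_config.
N_SAMPLES = 5
op_names = ["concat_samples", *(f"sample_{i}" for i in range(N_SAMPLES))]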
Daniel Gafni
12/14/2022, 12:27 PM
source_assets_with_resources = with_resources(
    source_assets,
    resource_defs=resource_defs,
    resource_config_by_key=resource_config_by_key,
)
assets_with_resources = with_resources(
    assets,
    resource_defs=resource_defs,
    resource_config_by_key=resource_config_by_key,
)
This code fails:
Error loading repository location repo.py:dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'parquet_io_manager' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
I'm clearly providing the same resources to different assets (the source_assets and assets lists do not intersect).
This is the code inside Dagster where it fails:
resource_defs_from_assets = {}
all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets]
for asset in all_assets:
    for resource_key, resource_def in asset.resource_defs.items():
        if resource_key not in resource_defs_from_assets:
            resource_defs_from_assets[resource_key] = resource_def
        if resource_defs_from_assets[resource_key] != resource_def:
            raise DagsterInvalidDefinitionError(
                f"Conflicting versions of resource with key '{resource_key}' "
                "were provided to different assets. When constructing a "
                "job, all resource definitions provided to assets must "
                "match by reference equality for a given key."
            )
So for some reason the check resource_defs_from_assets[resource_key] != resource_def evaluates to True. How is this possible if I'm only defining each resource (io_manager) once? I went in with a debugger, and apparently these objects are indeed different.
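One possible workaround (a sketch, not a confirmed fix): call with_resources once over the combined list, so every asset receives the exact same resource-definition objects by reference:

# Binding resources in a single call keeps reference equality across assets.
all_assets_with_resources = with_resources(
    [*assets, *source_assets],
    resource_defs=resource_defs,
    resource_config_by_key=resource_config_by_key,
)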
Thomas
12/14/2022, 1:30 PM
Todd
12/14/2022, 2:12 PM
Todd
12/14/2022, 2:12 PM
Alexis Manuel
12/14/2022, 2:35 PM
query test(
  $repositorySelector: RepositorySelector!,
  $partitionSetName: String!
) {
  partitionSetOrError(
    repositorySelector: $repositorySelector
    partitionSetName: $partitionSetName
  ) {
    ... on PartitionSet {
      id
      name
      pipelineName
      partitionsOrError {
        ... on Partitions {
          results {
            name
          }
        }
      }
      partitionStatusesOrError {
        __typename
        ... on PartitionStatuses {
          results {
            id
            partitionName
            runStatus
            runDuration
          }
        }
      }
    }
  }
}
and I get the following response: {"error": "Unexpected token '<', \"\n<html><hea\"... is not valid JSON"}
Am I missing something, or is it a known problem? I am using Dagster & Dagit version 1.0.17, deployed on K8S.
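An HTML body in the response usually means the request hit a non-GraphQL route. One sanity check (a sketch; the host, repository names, and variable values are placeholders) is to POST the query as JSON to Dagit's /graphql endpoint:

import requests

resp = requests.post(
    "http://localhost:3000/graphql",
    json={
        "query": QUERY,  # the query string above
        "variables": {
            "repositorySelector": {
                "repositoryName": "my_repo",
                "repositoryLocationName": "my_location",
            },
            "partitionSetName": "my_partition_set",
        },
    },
)
print(resp.json())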
Sheya Bernstein
12/14/2022, 2:45 PM
Koby Kilimnik
12/14/2022, 3:34 PM
python_logs:
  managed_python_loggers:
    - root
    - __main__
Thomas
12/14/2022, 4:01 PM
Traceback (most recent call last):
  File "/Users/thomas/miniforge3/envs/datagenerator/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-22-26c8dc208c5a>", line 10, in <module>
    ins={"num": In(EvenDagsterType)},
TypeError: 'list' object is not callable
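The TypeError suggests the name In (or EvenDagsterType) was rebound to a list earlier in the notebook session; a sketch of that failure mode (hypothetical, not from the thread):

from dagster import In

In = [1, 2, 3]   # an accidental reassignment in an earlier cell shadows the import
In("anything")   # TypeError: 'list' object is not callable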
Caio Tavares
12/14/2022, 4:29 PM
Sven Lito
12/14/2022, 5:17 PM
Dean Morin
12/14/2022, 5:26 PM
When I run dagit, I get ModuleNotFoundError: No module named 'dagster_dbt'
$ pip freeze | grep dagster-dbt
dagster-dbt==0.17.6
Any idea what's going on here?
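One sanity check (a sketch): confirm the interpreter that runs dagit can actually import the package, since an environment mismatch between pip and dagit is a common cause:

import sys

print(sys.executable)  # is this the environment where dagster-dbt was installed?
import dagster_dbt     # raises ModuleNotFoundError if this environment lacks it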
Tony
12/14/2022, 5:54 PM
Guru Prasath
12/14/2022, 7:51 PM