王昊
05/19/2023, 1:17 PM
```python
import os

from dagster import Definitions, load_assets_from_package_module
from dagster_aws.s3 import S3Resource

# `assets` is my asset package; S3PartitionedParquetIOManager is a custom
# IO manager defined elsewhere in the project.

# S3 resource pointed at a MinIO endpoint, configured from environment variables
s3 = S3Resource.configure_at_launch(
    endpoint_url=os.getenv('MINIO_S3_ENDPOINT_URL'),
    aws_access_key_id=os.getenv('MINIO_S3_ACCESS_KEY'),
    aws_secret_access_key=os.getenv('MINIO_S3_SECRET_KEY'),
)

defs = Definitions(
    assets=load_assets_from_package_module(assets),
    resources={
        's3': s3,
        'parquet_io_manager': S3PartitionedParquetIOManager(
            s3=s3,
            s3_bucket='dagster-data',
            s3_prefix='',
        ),
    },
    jobs=[mongo_organization_materialize_job],
)
```
And my job code is as follows:
```python
from dagster import define_asset_job
from dagster_dask import dask_executor

# Dask executor backed by a local cluster: two workers, one thread each
executor = dask_executor.configured({
    "cluster": {
        "local": {
            "timeout": 5,
            "n_workers": 2,
            "threads_per_worker": 1,
        }
    }
})

mongo_organization_materialize_job = define_asset_job(
    'materialize_mongo_organization_job',
    selection=[mongo_organization],
    partitions_def=mongo_organization_partitions_def,
    executor_def=executor,
)
```
I found the following description in the documentation:
For distributing task execution on a Dask cluster, you must provide a config block that includes the address/port of the Dask scheduler:
```yaml
resources:
  io_manager:
    config:
      s3_bucket: your_bucket_name
execution:
  config:
    cluster:
      existing:
        address: "dask_scheduler.dns_name:8787"
```
I want to know whether these configurations live in YAML files, where those files need to be placed, and how to load them in Dagster.
Himanshu Kumar
06/21/2023, 7:49 AM