Martin Laurent
02/08/2024, 3:19 PM
Scout
02/08/2024, 3:19 PM
[...] `define_asset_job` and using `op_tags` to dictate the behavior of the hook. Here's an example provided in the GitHub issue:

```python
from dagster import HookContext, asset, success_hook


@success_hook
def slack_message_on_success(context: HookContext):
    # Only act on ops that opted in through their tags
    if context.op.tags.get("success_message"):
        message = (
            f"{context.op.name} with tags {context.op.tags} in job {context.job_name} succeeded."
        )
        context.log.info(message)


@asset(
    op_tags={"success_message": True},
)
def my_asset(): ...
```
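The tag check in the hook above can also be read in isolation. The following is a minimal sketch in plain Python, with no Dagster imports, of the same opt-in logic. The function name `build_success_message` and its parameters are hypothetical and are not part of Dagster or of the GitHub issue.

```python
from typing import Any, Mapping, Optional


def build_success_message(
    op_name: str, tags: Mapping[str, Any], job_name: str
) -> Optional[str]:
    """Return the text to send, or None when the op has not opted in via its tags."""
    # Same gate as the hook: only ops carrying a truthy "success_message" tag qualify.
    if not tags.get("success_message"):
        return None
    return f"{op_name} with tags {dict(tags)} in job {job_name} succeeded."


# An untagged op yields no message; a tagged one does.
print(build_success_message("my_asset", {}, "my_asset_job"))
print(build_success_message("my_asset", {"success_message": True}, "my_asset_job"))
```

Inside the hook, this would presumably be called with the attributes the hook already reads: `context.op.name`, `context.op.tags`, and `context.job_name`. Keeping the check in a plain function makes it possible to unit test the gating without running a job.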
The custom hook `slack_message_on_success` runs when an op succeeds and acts only if the op carries the `success_message` tag. Despite its name, the hook as written only logs the message with `context.log.info`; an actual Slack call would go in its place. It is used with a software-defined asset by setting `op_tags` on the asset. This is a workaround, not the direct way to set hooks that you would use with ops in jobs, and it might not be the recommended approach for software-defined assets. The Dagster team might introduce more direct support for this functionality in the future, so it's a good idea to check the latest documentation or GitHub discussions for updates on this topic.
Martin Laurent
02/08/2024, 3:23 PM
Martin Laurent
02/08/2024, 3:30 PM
[...] `define_asset_job` [...]
Scout
02/08/2024, 3:30 PM
[...] `define_asset_job`, the resources are supplied to the assets, not the job itself. Each asset can specify its required resources using the `required_resource_keys` parameter in the `@asset` decorator. Here's an example of how you might define a resource and make it available to an asset that is included in a job defined by `define_asset_job`:

```python
from dagster import Definitions, asset, define_asset_job, resource


# Define a resource
@resource
def my_resource(context):
    return "my_resource_value"


# Define an asset that requires the 'my_resource' resource
@asset(required_resource_keys={"my_resource"})
def my_asset(context):
    # Access the resource via context.resources
    resource_value = context.resources.my_resource
    # ... use the resource_value ...


# Define a job that includes the 'my_asset' asset
my_asset_job = define_asset_job(name="my_asset_job", selection=[my_asset])

# define_asset_job takes no resource_defs argument; bind the resource in Definitions
defs = Definitions(
    assets=[my_asset],
    jobs=[my_asset_job],
    resources={"my_resource": my_resource},
)
```

In this example, `my_resource` is defined as a resource, and `my_asset` declares that it needs it via the `required_resource_keys` parameter. `define_asset_job` only selects which assets to materialize and does not accept a `resource_defs` argument, so the resource is bound through `Definitions(resources=...)`, which makes it available to the assets in the job. Please note that the example above is based on the typical usage pattern in Dagster as of the knowledge cutoff date. Always refer to the latest Dagster documentation for the most up-to-date practices and API usage.
Martin Laurent
02/08/2024, 4:24 PM
Scout
02/08/2024, 4:24 PM
[...] `retry_policy` argument within the `@asset` decorator. This allows you to specify how the asset should be retried if its computation fails. You can define the number of retries, the delay between retries, and other parameters such as backoff and jitter.
Here's an example of how to set a retry policy on an asset:
```python
from dagster import asset, RetryPolicy, Backoff, Jitter


@asset(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.2,  # 200ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    )
)
def my_asset():
    # Asset computation logic here
    ...
```
In this example, the `RetryPolicy` is configured to retry the asset computation up to three times, with a base delay of 200 milliseconds between retries. The delay grows exponentially because of the `Backoff.EXPONENTIAL` setting, and the `Jitter.PLUS_MINUS` setting adds a random amount of jitter to each delay to avoid thundering-herd problems.
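To make the effect of backoff and jitter concrete, here is a minimal sketch in plain Python of one common way to compute such a schedule: double the base delay on each attempt, then shift it by a small random amount in either direction. This is an illustration only. The function `retry_delays` and its `jitter_fraction` parameter are hypothetical, and Dagster's internal formula and jitter range may differ.

```python
import random
from typing import List, Optional


def retry_delays(
    max_retries: int,
    base_delay: float,
    jitter_fraction: float = 0.1,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Illustrative schedule: base_delay * 2**attempt, shifted by +/- jitter_fraction."""
    if rng is None:
        rng = random.Random()
    delays = []
    for attempt in range(max_retries):
        nominal = base_delay * (2 ** attempt)
        # Symmetric jitter: move the nominal delay by up to +/- jitter_fraction of itself,
        # so that many failing runs do not all retry at the same instant.
        offset = nominal * jitter_fraction * rng.uniform(-1.0, 1.0)
        delays.append(max(0.0, nominal + offset))
    return delays


# With jitter disabled, the schedule is the plain doubling sequence.
print(retry_delays(max_retries=3, base_delay=0.2, jitter_fraction=0.0))
# With jitter enabled, each delay lands near its nominal value.
print(retry_delays(max_retries=3, base_delay=0.2))
```

Passing a seeded `random.Random` as `rng` makes the jittered schedule reproducible, which is convenient when testing code that depends on it.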
Please ensure that you refer to the latest Dagster documentation to confirm the current API and best practices for setting retry policies on assets, as the platform is actively developed and features may evolve.