Sean Pool
03/13/2024, 9:59 PM
Scout
03/13/2024, 9:59 PM
```python
from dagster import asset


@asset
def query1():
    # Your query logic here
    query1_result = ...  # placeholder for the real query result
    return query1_result


# Dagster resolves asset dependencies by parameter name, so each
# parameter is named after the upstream asset it consumes.
@asset
def generate1(query1):
    # Your generate logic here, using query1
    generate1_result = ...  # placeholder for the real generated value
    return generate1_result


@asset
def write1(generate1):
    # Your write logic here, using generate1
    pass  # Assuming this step doesn't produce an output


# Repeat the pattern for query2/generate2/write2, etc.


@asset
def upload_to_s3(write1, write2, write3, write4):
    # Your upload logic here, which runs after all write steps
    pass
```
In this example, each `generate` asset depends on the result of the corresponding `query` asset, and each `write` asset depends on the result of the corresponding `generate` asset. Finally, the `upload_to_s3` asset depends on the results of all `write` assets. Because these dependencies are declared, Dagster executes the assets in the correct order, with each step waiting for its inputs to be available.
This is a simplified example; in a real-world scenario, you would include the actual logic for querying, generating, writing, and uploading within the functions. Additionally, you might need to pass configuration or resources to your assets, which you can do using the `@asset` decorator's parameters.
Remember that the actual implementation details will depend on your specific use case, including how you handle inputs, outputs, and any external systems you interact with. The documentation provides more detailed guidance on defining assets and dependencies, which you can refer to for more in-depth examples and best practices.
Sean Pool
03/13/2024, 10:05 PM
Scout
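03/13/2024, 10:05 PM
```python
from dagster import In, Nothing, job, op


@op
def step1():
    # Your logic for step1
    pass


# A Nothing input is declared in `ins`, not as a function parameter.
@op(ins={"start": In(Nothing)})
def step2():
    # Your logic for step2
    pass


@op(ins={"start": In(Nothing)})
def step3():
    # Your logic for step3
    pass


@op(ins={"start": In(Nothing)})
def step4():
    # Your logic for step4
    pass


@job
def sequential_job():
    # Invoke each op exactly once and chain the outputs to enforce order.
    step1_done = step1()
    step2_done = step2(start=step1_done)
    step3_done = step3(start=step2_done)
    step4(start=step3_done)
```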
In this example, `step2`, `step3`, and `step4` each declare an input named `start` of type `Nothing`. This input is used only to establish a dependency on the previous step. When defining the job, you pass the output of each step (even though it's not used) to the next step to enforce the execution order. The `Nothing` type tells Dagster that an op should wait for the specified dependency to complete before it begins execution, even though it does not consume any actual output from that dependency. This way, you can ensure that your steps are executed in the desired sequence without needing to pass data between them.