Hi everyone! I’m very new to Dagster, but I’m real...
# ask-community
i
Hi everyone! I’m very new to Dagster, but I’m really loving its high-level concepts and fundamental building blocks. I’m also a big proponent of easily testing and validating changes without impacting production, and that’s something Dagster is really getting right.
I’m one of the creators of SQLMesh - the open-source dataops / data transformation platform. We’ve integrated SQLMesh with Airflow and are now looking into integration with Dagster too. The integration itself should be quite similar to how Dagster integrates with dbt. SQLMesh, however, is quite different from dbt due to its stateful nature. Therefore, I have a few questions for which I couldn’t easily find answers in the documentation:
1. Can assets be created dynamically based on some state outside Dagster, or can they only be defined statically as Python code?
2. Is there any kind of persistent state available that can be shared between and accessed by any asset / op / job, like XComs or Variables in Airflow? My current assumption is that I can achieve the desired behavior by creating a custom I/O manager through which the state can be accessed and updated, but I was wondering whether there is anything built-in available. What if my asset / op updates this state as a side effect? Should it be a multi-asset then?
3. Is there any existing plugin management system in place which can be used to extend the Dagster UI / GraphQL?
Apologies if my questions sound ignorant or naive. I’m coming from an Airflow background, so it’s the only point of reference I have. Happy to provide more details on any of my questions. Thanks!
s
Hi Iaroslav - I've looked at SQLMesh a little bit and it looks cool
Can assets be created dynamically based on some state outside Dagster, or can they only be defined statically as Python code?
The set of assets is determined at "definition time", i.e. when Dagster loads your code, which typically happens roughly once each time you deploy your code. One way to get more dynamic behavior is with dynamically partitioned assets: an asset can comprise a set of partitions that changes over time.
Is there any kind of persistent state available that can be shared between and accessed by any asset / op / job, like XComs or Variables in Airflow? My current assumption is that I can achieve the desired behavior by creating a custom I/O manager through which the state can be accessed and updated, but I was wondering whether there is anything built-in available. What if my asset / op updates this state as a side effect? Should it be a multi-asset then?
We don't have a repository for mutable global state. However, asset materialization events can include metadata, which can be accessed elsewhere. If you have more detail on what you're trying to accomplish, I might be able to make a recommendation.
Is there any existing Plugin management system in place which can be used to extend the Dagster UI / GraphQL?
Not currently. cc @dish to correct me if I'm wrong. Curious - how would you be interested in extending it?
d
There is no plugin system for the Dagster UI. I’d also be interested in hearing what you have in mind there. :)
i
Thank you, Sandy!
If you have more detail on what you’re trying to accomplish, I might be able to make a recommendation.
In SQLMesh users define their transformation logic as models (just like dbt). Every time a user creates or changes a model, a new version of this model is created. This means that multiple versions of the same model can coexist within SQLMesh. Thus, SQLMesh needs state to keep all versions of all models, not just the ones that are currently in the user’s repo. Similarly to Dagster, SQLMesh tracks which data intervals were processed by each version of each model. So when the asset is materialized, SQLMesh needs to update its state to record which interval has been processed.
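To make the shape of that state concrete, here is a small standard-library sketch of per-version interval tracking. It is not SQLMesh's actual implementation; the class names, the in-memory store, and the end-exclusive date intervals are all assumptions chosen for illustration.

```python
from dataclasses import dataclass, field
from datetime import date


@dataclass
class ModelVersionState:
    """Hypothetical record for one version of one model."""

    model: str
    version: str
    # Processed intervals as (start, end) date pairs, end exclusive.
    intervals: list[tuple[date, date]] = field(default_factory=list)

    def record(self, start: date, end: date) -> None:
        """Add a processed interval, merging overlapping or adjacent ones."""
        merged: list[tuple[date, date]] = []
        for s, e in sorted(self.intervals + [(start, end)]):
            if merged and s <= merged[-1][1]:
                merged[-1] = (merged[-1][0], max(merged[-1][1], e))
            else:
                merged.append((s, e))
        self.intervals = merged


class StateStore:
    """In-memory stand-in for a persistent state backend."""

    def __init__(self) -> None:
        self._versions: dict[tuple[str, str], ModelVersionState] = {}

    def get(self, model: str, version: str) -> ModelVersionState:
        # Every (model, version) pair keeps its own independent history.
        key = (model, version)
        if key not in self._versions:
            self._versions[key] = ModelVersionState(model, version)
        return self._versions[key]
```

The point of the sketch is that two versions of the same model never share interval history, which is why the state has to outlive whatever versions happen to be in the user's repo.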
Curious - how would you be interested in extending it?
SQLMesh users apply the changes they made using the sqlmesh plan command. The command may launch execution of individual models, and it also updates the metadata about the target environment in its state. On Airflow it also creates custom DAG specs for new model versions, which are picked up by the Airflow scheduler and transformed into actual DAGs.
We used Airflow’s plugin system to extend its REST API with a custom endpoint that allows clients to submit plan applications. The endpoint creates a spec for the plan application DAG, which is stored in the state and then picked up by the Airflow scheduler. This is how we’re able to create DAGs dynamically based on clients’ API calls.
I think the 3rd item is the least impactful one, since we can potentially achieve similar behavior by launching a custom op. We really need the 1st item if we want to control which model versions get transformed into assets, since the model versions that are currently in the user’s code base may not necessarily be the ones that are promoted in production. We also need the 2nd item due to the side effects I described above, though I believe the interval tracking use case may be addressed using a combination of an asset (the model output itself) and an op (which updates the intervals).
Btw, all model versions in SQLMesh are technically separate assets, since each version has its own table associated with it
So I assume I can do something like this:
from dagster import asset

def create_dynamic_asset(...):  # arguments elided
    state = _fetch_state()  # read the external state at definition time

    @asset
    def _dynamic_asset():
        # use state
        ...

    return _dynamic_asset
Then every time the code is evaluated by Dagster, assets will be generated dynamically based on the current state.
j
@dish @rex Any thoughts on setting up a chat with Iaroslav on building out a SQLMesh integration with Dagster? I have a few clients working on migrating their dbt workloads to SQLMesh, and their only pro for Airflow at the moment is its pre-built integration with SQLMesh. They'd rather be in this ecosystem.
s
Then every time the code is evaluated by Dagster, assets will be generated dynamically based on the current state.
Right.
In SQLMesh users define their transformation logic as models (just like dbt). Every time a user creates or changes a model, a new version of this model is created. This means that multiple versions of the same model can coexist within SQLMesh. Thus, SQLMesh needs state to keep all versions of all models, not just the ones that are currently in the user’s repo.
Another way to consider achieving this in Dagster is with "dynamic partitions". There would be a "dynamic partition" for each model version. It might be easiest to find a time for a call to chat about the different options?
i
thanks @Jordan Fox and @sandy! A call would be fantastic 👍 I’ll DM you