# ask-community
b
Is there a way to have "dynamic inputs"? Specifically, I am trying to create an asset that reads in all my Airbyte syncs and unions the data together
Maybe it is "fan-in"? But not sure how that works from the docs
s
Hi Brent, “Fan-in” (in the way it’s referred to in the Dagster docs) isn’t supported for assets. I can think of two approaches that might work for you:
• When constructing your `Definitions`, you could dynamically compute the keys of the upstream assets and place them in the `deps` arg of the “union asset” (see the sketch below).
• You could represent the upstreams as a single partitioned asset and have an `AllPartitionsMapping` form a many-to-one relationship between the upstream partitioned asset and the union asset.
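A minimal sketch of the first approach, assuming the upstream Airbyte asset keys can be computed or listed up front (the key names below are hypothetical):

```python
from dagster import AssetKey, Definitions, asset

# Hypothetical upstream Airbyte asset keys, computed before Definitions is built.
airbyte_keys = [AssetKey(["airbyte", name]) for name in ["orders", "customers", "payments"]]

@asset(deps=airbyte_keys)
def union_asset():
    # Read each upstream table and union them; the actual read logic depends
    # on your warehouse / IO manager setup.
    ...

# In practice the Airbyte asset definitions would also be included here.
defs = Definitions(assets=[union_asset])
```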
b
Currently, I am using the `load_assets_from_airbyte_instance` function. I should be able to pull all the asset keys from there, right? And if so, how do I add those as `deps` to the union? Still a bit of a Dagster n00b
> <https://docs.dagster.io/_apidocs/assets>
> deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, AssetKey, str]]]) – The assets that are upstream dependencies, but do not correspond to a parameter of the decorated function.
Can you do a bit of translation on that for me?
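In plain terms (a hedged illustration; the asset names below are made up): an entry in `deps` declares an upstream asset without passing its value into the function body, whereas a regular function parameter both declares the dependency and loads the upstream value.

```python
from dagster import AssetKey, asset

@asset
def upstream_table():
    return [1, 2, 3]

# `deps`: ordering-only dependency -- no value is passed into the function body.
@asset(deps=[AssetKey("some_airbyte_table")])
def ordering_only_asset():
    ...

# Parameter-based dependency: the upstream value is loaded and passed in.
@asset
def doubled_table(upstream_table):
    return [x * 2 for x in upstream_table]
```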
s
Hmm, I’m not sure off the top of my head how to do this with the return value of `load_assets_from_airbyte_instance`, but I’ve put in a request for help.
b
But can you explain `deps` vs `ins` a bit more for me as well?
o
Hi @Brent Shulman -- for the specific `load_assets_from_airbyte_instance` function, this sort of workflow isn't really possible, as these Airbyte assets are loaded differently from traditional assets (essentially, the definitions are loaded lazily, to avoid every subprocess needing to hit the Airbyte instance in order to proceed, and so they aren't available at the time you're defining other assets). One alternative would be to use `load_assets_from_airbyte_project` (https://docs.dagster.io/integrations/airbyte#loading-airbyte-asset-definitions-from-yaml-config), but if you're stuck with `from_instance`, I think the main option would unfortunately be to hardcode these dependencies.
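A minimal sketch of the `load_assets_from_airbyte_project` alternative, assuming a dagster-airbyte version that exposes it and a local Airbyte (octavia) project checked out at a hypothetical path:

```python
from dagster_airbyte import load_assets_from_airbyte_project

# Load asset definitions from a local Airbyte project directory (YAML config)
# instead of querying a live instance; the path below is hypothetical.
airbyte_assets = load_assets_from_airbyte_project(
    project_dir="path/to/airbyte_project",
)
```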
a
@Brent Shulman @owen I have a similar problem. We now manage Airbyte with Terraform, so we use the `load_assets_from_airbyte_instance` function to generate SDAs in Dagster. However, we then want a downstream job to run for each SDA independently. The initial idea was that, if we had a list of asset keys from `load_assets_from_airbyte_instance`, we could write an asset constructor and iterate over the keys to define the upstream dependencies. I'm assuming this isn't possible? The only other thing I can think of (although I'm still a newbie with Dagster) is to hit the Airbyte API independently, get all connection and stream name data, then reconstruct each AssetKey based on how we define it in the `connection_to_asset_key_fn` argument to `load_assets_from_airbyte_instance`, and then do the above.
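A hedged sketch of the key-convention piece: if `connection_to_asset_key_fn` encodes a simple convention, the same convention can be reproduced when reconstructing keys from the Airbyte API. The host/port values and key scheme below are assumptions, and the exact callback signature may vary by dagster-airbyte version:

```python
from dagster import AssetKey
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

# Hypothetical Airbyte deployment details.
airbyte_instance = AirbyteResource(host="localhost", port="8000")

# Hypothetical key function: prefix every stream with "airbyte" so the same
# convention can be reproduced when rebuilding keys from API responses.
def connection_to_asset_key(connection_metadata, table_name):
    return AssetKey(["airbyte", table_name])

airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_to_asset_key_fn=connection_to_asset_key,
)
```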
o
Hi! That "independently hit the API" solution would mostly work, although it does mean that every single step launched in that code location will independently need to hit the Airbyte API (which may cause perf issues). I'm not familiar with the Airbyte Terraform integration, but is it possible that you'd be able to parse the Terraform config file to determine which asset keys you'd expect to exist?
a
Ok, thanks. In Airbyte, if you refresh a source schema (in the UI), it will pick up new namespaces and stream names. So this isn't configured within Terraform, i.e. we don't need to hard-code schemas or table names for a database source. Provided that `load_assets_from_airbyte_instance` runs periodically (on code location reload), the dagster-airbyte integration will pick up the new tables and create SDAs. This is what we want, as it removes the need to hard-code source schemas/table names. However, it also means we do not know which asset keys to expect at the Terraform level, hence why I think we need the API calls. Of course, I might be getting this wrong? Ultimately, we just want to create an asset downstream of each asset created by the Airbyte integration (one-to-one: Airbyte SDA to our downstream SDA). Are there any other approaches that come to mind? Or is this the way to go given the requirements in the meantime? Thanks
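A minimal sketch of the one-to-one downstream pattern, assuming the list of Airbyte asset keys has been reconstructed elsewhere (e.g. via the Airbyte API) using the same key convention; all names here are hypothetical:

```python
from dagster import AssetKey, AssetsDefinition, asset

def build_downstream_asset(upstream_key: AssetKey) -> AssetsDefinition:
    # One downstream asset per Airbyte-produced asset key, named after the upstream table.
    @asset(name=f"transformed_{upstream_key.path[-1]}", deps=[upstream_key])
    def _downstream():
        ...
    return _downstream

# Assumed to be reconstructed from the Airbyte API with the same convention
# used in connection_to_asset_key_fn.
airbyte_asset_keys = [AssetKey(["airbyte", name]) for name in ["orders", "customers"]]
downstream_assets = [build_downstream_asset(key) for key in airbyte_asset_keys]
```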
o
So code location reloading is not the only time `load_assets_from_airbyte_instance` executes -- it runs any time any of your code is executed, e.g. when a computation is launched in a subprocess. The integration works by making a single API call when the code location is reloaded and then forwarding the results of that call to the subprocesses when they launch, resulting in a single API call per code location reload. If you were to make your own API call outside of the integration, you'd get a call for each subprocess launch, which generally isn't recommended (but won't immediately break anything). Another option to avoid the large number of API calls would be to bake that API call into your CI/CD process (e.g. when building your Docker image, run a script that makes the API call and serializes it in a way that Dagster understands).