hello :wave: I have a group of sql server resource...
# ask-community
d
hello 👋 I have a group of sql server resources that I would like to distribute across a fanned out op. I was thinking the best way to do this would be to create some sort of compute_server_round_robin resource that would pick a compute server resource using the index of the fanned out op modulo the total number of compute servers. Is there a way to choose which resource to use based on a value passed into a resource? Or do I need to rethink my strategy?
s
Resources are instantiated at the beginning of a run. So if you had a resource representing a SQL server which actually selected the "right" or "next" SQL Server to use on instantiation, then every time you run a job the SQL Server resource would select a SQL Server and make that available to the ops in that run.
z
Resources aren't mapping-key / fan-out index aware. Instead, when generating your mapping_key for your DynamicOutputs in your fan-out you could use an integer index, then have your sql server resource be a resource factory that uses the mapping key to determine which sql server to use. something like
import pyodbc
from dagster import OpExecutionContext, op, resource

@resource
def sql_server_round_robin(context):
    def factory(index):
        # custom logic for generating server name from index
        server = get_server_using_index(index)
        cnxn = pyodbc.connect(
            "Driver={SQL Server Native Client 11.0};"
            f"Server={server};"
            "Database=db_name;"
            "Trusted_Connection=yes;"
        )
        return cnxn

    return factory

# assumes the resource above is registered under the key "sql_server_selector"
@op(required_resource_keys={"sql_server_selector"})
def fanned_out_op(context: OpExecutionContext):
    # mapping keys are strings, so convert back to an integer index
    connection = context.resources.sql_server_selector(int(context.get_mapping_key()))
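A minimal sketch of the index-to-server step, assuming the fan-out emits integer-like mapping keys ("0", "1", ...). The SERVERS list and this body for get_server_using_index are hypothetical stand-ins, not part of the snippet above. The mapping key arrives as a string, so it is converted before the modulus is taken.

```python
# Hypothetical server names, for illustration only.
SERVERS = ["sql-01", "sql-02", "sql-03"]


def get_server_using_index(mapping_key, servers=SERVERS):
    """Pick a server round-robin style from a fan-out mapping key."""
    # Mapping keys are strings, so convert before doing arithmetic.
    index = int(mapping_key)
    # Wrap around so any index lands on a valid server.
    return servers[index % len(servers)]


if __name__ == "__main__":
    for key in ["0", "1", "2", "3", "4"]:
        print(key, "->", get_server_using_index(key))
```

Keeping the selection logic in a plain function like this makes it easy to check without starting a run.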
d
Thanks @Zach that's exactly what I was thinking of, but didn't know if resources took parameters. Appreciate it!
I'm using that code you mentioned above, but with a twist. I have required_resource_keys defined in the resource for previously created connections that we create dynamically. However, on code load I'm getting an error that those resources don't exist. What am I missing here?
Here's the code where we generate the resources dynamically for a bunch of sql servers:
for index, row in databases.iterrows():
    server = str(row["ServerName"])

    print(f"{str(row['DatabaseLabel']).replace('-','_')}_resource")
    RESOURCES[
        f"{str(row['DatabaseLabel']).replace('-','_')}_resource"
    ] = sr.sql_server_resource.configured(
        {"server": server, "database": row["DatabaseName"]}
    )
And here's the code below that where I'm trying to create my round robin resource:
@resource(
    required_resource_keys={
        "sql_01_resource,sql_02_resource,sql_03_resource,sql_04_resource,sql_05_resource,sql_06_resource,sql_07_resource,sql_08_resource"
    }
)
def sql_server_round_robin(context):
    def factory(index):
        # custom logic for generating server name from index
        cnxn = context.resources.sql_01_resource
        match (index % 2):
            case 1:
                cnxn = context.resources.sql_02_resource
            case 2:
                cnxn = context.resources.sql_03_resource
            case 3:
                cnxn = context.resources.sql_04_resource
            case 4:
                cnxn = context.resources.sql_05_resource
        return cnxn

    return factory
error:
Resource with key 'sql_01_resource,sql_02_resource,sql_03_resource,sql_04_resource,sql_05_resource,sql_06_resource,sql_07_resource,sql_08_resource' required by resource with key 'sql_server_selector', but not provided.
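The key quoted in the error is the whole comma-separated string, which suggests the set literal holds one string rather than eight separate keys. A small plain-Python check (no Dagster needed; the list of keys is shortened to three for illustration):

```python
# One string that happens to contain commas: a set with a single element.
joined = {"sql_01_resource,sql_02_resource,sql_03_resource"}

# Three separate strings: a set with three elements.
separate = {"sql_01_resource", "sql_02_resource", "sql_03_resource"}

print(len(joined))    # 1
print(len(separate))  # 3

# Splitting the joined string recovers the individual keys.
recovered = set(next(iter(joined)).split(","))
print(recovered == separate)  # True
```

Writing each key as its own quoted string in required_resource_keys would likely change which keys are looked up, though each of them still has to be provided to the job.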
z
hmm I'm not sure it's necessary to formally generate the resources dynamically outside of the factory function. I would just embed that logic within the factory function, and then ops that depend on the resource just depend on sql_server_round_robin and dynamically generate their server using that as a factory. this would require directly instantiating the sql server object within sql_server_round_robin with the server / database, instead of using the .configured() api on another resource. more like
@resource()
def sql_server_round_robin(context):
    RESOURCES = []
    for index, row in databases.iterrows():
        server = str(row["ServerName"])

        print(f"{str(row['DatabaseLabel']).replace('-','_')}_resource")
        RESOURCES.append(pyodbc.connect("Driver={SQL Server Native Client 11.0};"
                              f"Server={server};"
                              f"Database={row['DatabaseName']};"
                              "Trusted_Connection=yes;"))

    def factory(index):
        # round-robin: wrap the index around the number of connections
        # (int() because mapping keys arrive as strings)
        return RESOURCES[int(index) % len(RESOURCES)]

    return factory
but maybe I'm missing something. also, isn't index % 2 only ever going to match 0 or 1?
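To see why, here is a quick plain-Python check of which values each modulus can produce. The count of five servers is an assumption read off the match cases in the earlier snippet (a default plus cases 1 to 4).

```python
# Residues produced by a range of fan-out indices under each modulus.
indices = range(20)

with_two = sorted({i % 2 for i in indices})
with_five = sorted({i % 5 for i in indices})

print(with_two)   # [0, 1]
print(with_five)  # [0, 1, 2, 3, 4]
```

With a modulus of 2, cases 2 through 4 can never match, so only two servers would ever receive work.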
d
yeah I'm just beginning a benchmarking test to see if we can do this with a subset of our data. we have all our database information in a centralized database, so I'm reading that in and generating the sql server resources from that. If it's not possible to use those dynamically generated resources, I'll have to look at doing it the way you mentioned above (generate the connections inside the round robin resource), but it feels like duplicating code
Ah sorry didn't address the other part of your comment. We might have a use-case in the future to connect to a single one of the compute sql servers, and we'd want those resources registered with dagster as standalone connections. But I can do both I think, just need to restructure our resources
z
One problem I didn't initially point out is that I don't think you can use the .configured() API within a resource definition on one of its dependent resources. .configured() is only available on ResourceDefinitions. when you access context.resources.resource_name you're accessing an object that has been instantiated as a result of resolving a ResourceDefinition. in other words you're accessing whatever object you're returning from the resource definition mapped to context.resources.resource_name, not the ResourceDefinition itself. you can use .configured() on resources at a global level or within @graph / @job definitions because the definition objects have not