Hey everyone. I have a job which transfers certain...
# ask-community
n
Hey everyone. I have a job that transfers certain files from an FTP server to an S3 bucket, and I have broken it down into 3 software-defined assets. Each asset needs an FTP connection, so I created a configurable resource that is shared by the assets. When I execute the job, the assets are materialised using this resource. However, I observe that the configurable resource is instantiated before each asset materialisation, rather than a single instance being used by all of them. The code snippets are attached below for reference. Can someone please help me make a single instance of the resource be shared by all the assets in the job?
The resource
class FTPConnectionResource(ConfigurableResource):
    ftp_host: str
    ftp_username: str
    ftp_password: str
    _ftp: Any = PrivateAttr()

    def setup_for_execution(self, context) -> None:
        self._ftp = ftplib.FTP(self.ftp_host, self.ftp_username, self.ftp_password, timeout=600)
        self._ftp.af = socket.AF_INET6

    def reconnect(self):
        try:
            self._ftp.voidcmd("NOOP")
        except AttributeError as e:
            get_dagster_logger().error(e)
            self._ftp = ftplib.FTP(self.ftp_host, self.ftp_username, self.ftp_password)
The job
daily_config = RunConfig(
    ops={
        'regions': PeriodConfig(period="Daily"),
        'data_types': PeriodConfig(period="Daily"),
    }
)

transfer_job = define_asset_job(name="transfer_job", config=daily_config)
Assets in brief
@asset
def regions(pool: FTPConnectionPoolResource, config: PeriodConfig) -> dict:
    ...  # body omitted in the original post

@asset
def data_types(regions, pool: FTPConnectionPoolResource, config: PeriodConfig):
    ...  # body omitted in the original post

@asset
def files(data_types, pool: FTPConnectionPoolResource, config: PeriodConfig) -> dict:
    ...  # body omitted in the original post
Definition
defs = Definitions(
    assets=all_assets,
    jobs=[data_transfer_job, transfer_job],
    resources={
        'pool': FTPConnectionPoolResource(
            ftp_host='xxxxxxxxx',
            ftp_username='xxxxxxxxxxx',
            ftp_password='xxxxxxxxx',
        )
    }
)
cc: @Ankit Singhal
c
This is because, by default, we use a multiprocess execution model that spawns a separate process for each step, and Python state can't be shared across process boundaries. That is why the resource is set up again before every asset. If you need to share that state, it might be worth implementing a resource that can check whether it has already been instantiated by another process and skip re-initializing its state. An alternative would be to switch to in-process execution, which only spins up resources once per run.