Nidhish Sharma
07/12/2023, 11:57 AM
class FTPConnectionResource(ConfigurableResource):
ftp_host: str
ftp_username: str
ftp_password: str
_ftp: Any = PrivateAttr()
def setup_for_execution(self, context) -> None:
    """Open the FTP connection held by this resource.

    Connects and logs in to ``ftp_host`` with the configured username and
    password, using a 600-second socket timeout, and stores the client on
    ``self._ftp``.

    Args:
        context: Value passed in by the caller of this hook; not used here.
            Presumably Dagster's resource initialization context -- confirm
            against the ConfigurableResource documentation.
    """
    self._ftp = ftplib.FTP(self.ftp_host, self.ftp_username, self.ftp_password, timeout=600)
    # Override the address family ftplib recorded at connect time: ftplib
    # issues EPSV/EPRT instead of PASV/PORT whenever ``af`` is not AF_INET.
    # NOTE(review): confirm the server supports the extended commands.
    self._ftp.af = socket.AF_INET6
def reconnect(self):
    """Probe the FTP connection and replace it if it is no longer usable.

    Sends ``NOOP`` over the current connection. If that fails, the error is
    logged, the stale client is closed on a best-effort basis, and a new
    connection is opened with the same settings as ``setup_for_execution``
    (600-second timeout, ``af`` forced to ``AF_INET6``).

    Raises:
        Any ``ftplib`` / socket error raised while opening the replacement
        connection; only the probe failure is handled here.
    """
    try:
        self._ftp.voidcmd("NOOP")
    except (AttributeError, *ftplib.all_errors) as e:
        # AttributeError: the client was already closed (its socket is None)
        # or ``_ftp`` was never set. ftplib.all_errors: the server dropped or
        # timed out the session (FTP error replies, OSError, EOFError).
        get_dagster_logger().error(e)
        stale = getattr(self, "_ftp", None)
        if stale is not None:
            try:
                stale.close()
            except ftplib.all_errors:
                # Best effort only: the connection is being discarded anyway.
                pass
        self._ftp = ftplib.FTP(self.ftp_host, self.ftp_username, self.ftp_password, timeout=600)
        # Keep the replacement consistent with the initial connection, which
        # also overrides the address family after connecting.
        self._ftp.af = socket.AF_INET6
The job
# Run config for the daily transfer: each listed asset gets its own
# PeriodConfig with period="Daily".
daily_config = RunConfig(
    ops={
        asset_name: PeriodConfig(period="Daily")
        for asset_name in ('regions', 'data_types')
    },
)
# Asset job that runs with the daily run config above.
transfer_job = define_asset_job(config=daily_config, name="transfer_job")
Assets in brief
@asset
def regions(pool: FTPConnectionPoolResource, config: PeriodConfig) -> dict:
----------------------------------------------------------------------------
@asset
def data_types(regions, pool: FTPConnectionPoolResource, config: PeriodConfig):
-------------------------------------------------------------------------------
@asset
def files(data_types, pool: FTPConnectionPoolResource, config: PeriodConfig) -> dict:
Definition
# Code-location definitions: registers the assets, both jobs, and the FTP
# resource that the assets request through their ``pool`` parameter.
# NOTE(review): ``all_assets``, ``data_transfer_job`` and
# ``FTPConnectionPoolResource`` are defined outside this snippet -- confirm
# they exist under these names.
defs = Definitions(
    resources={
        # Connection settings are redacted placeholders in this snippet.
        'pool': FTPConnectionPoolResource(
            ftp_host='xxxxxxxxx',
            ftp_username='xxxxxxxxxxx',
            ftp_password='xxxxxxxxx',
        ),
    },
    jobs=[data_transfer_job, transfer_job],
    assets=all_assets,
)
cc: @Ankit Singhal
chris
07/12/2023, 4:26 PM