# Robin Lindner
# 07/31/2023, 1:10 PM
from datetime import datetime
import os
from stat import S_ISDIR, S_ISREG
import zipfile
from paramiko import SFTPClient, SSHClient
import pandas as pd
import requests
from dagster import FilesystemIOManager, FreshnessPolicy, Out, op, asset, graph, with_resources, OpExecutionContext
from dagster_ssh import SSHResource, ssh_resource
def list_files(sftp_client: SFTPClient, path: str = '.') -> list[tuple[str, datetime]]:
    """Recursively list every regular file below ``path`` on the SFTP server.

    Args:
        sftp_client: SFTP session to query; only ``listdir_attr`` is called.
        path: Remote directory to start from.

    Returns:
        One ``(remote_path, modified_at)`` tuple per regular file found.
        ``modified_at`` is the entry's ``st_mtime`` passed through
        ``datetime.fromtimestamp``. Entries that are neither directories nor
        regular files are omitted.
    """
    # Remote SFTP paths are always "/"-separated, so they are joined with
    # posixpath instead of os.path, which follows the local platform.
    import posixpath

    file_list = []
    for entry in sftp_client.listdir_attr(path):
        mode = entry.st_mode
        full_path = posixpath.join(path, entry.filename)
        if S_ISDIR(mode):
            # Descend into sub-directories and flatten their results.
            file_list.extend(list_files(sftp_client, full_path))
        elif S_ISREG(mode):
            # The timestamp is the modification time (st_mtime).
            modified_at = datetime.fromtimestamp(entry.st_mtime)
            file_list.append((full_path, modified_at))
    return file_list
@asset(required_resource_keys={'cj_ssh'}, freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60))
def file_paths_from_sftp(context: OpExecutionContext) -> list[str]:
    """Asset: remote paths of every file on the SFTP server.

    Connects through the ``cj_ssh`` resource, walks the server with
    ``list_files`` from its default start directory, and sorts the result
    ascending by the timestamp ``list_files`` reports for each file.

    Args:
        context: Dagster execution context providing ``resources.cj_ssh``
            and the logger.

    Returns:
        The remote file paths in sorted order.
    """
    cj_ssh: SSHResource = context.resources.cj_ssh
    ssh_client: SSHClient = cj_ssh.get_connection()
    # Both paramiko clients are context managers; leaving the block closes
    # the SFTP channel and the SSH connection even if listing fails.
    with ssh_client, ssh_client.open_sftp() as sftp_client:
        files = list_files(sftp_client)
    # Sort by the timestamp in the second tuple slot.
    files.sort(key=lambda item: item[1])
    context.log.info(f"Found {len(files)} files on SFTP server")
    return [remote_path for remote_path, _ in files]
@asset(required_resource_keys={'cj_ssh'})
def files_from_sftp(context: OpExecutionContext, file_paths_from_sftp: list[str]) -> list[tuple[str, str]]:
    """Asset: download every listed remote file into the local ``zips`` directory.

    Args:
        context: Dagster execution context providing ``resources.cj_ssh``
            and the logger.
        file_paths_from_sftp: Remote paths to download.

    Returns:
        One ``(local_path, remote_path)`` tuple per downloaded file, in input
        order.
    """
    cj_ssh: SSHResource = context.resources.cj_ssh
    ssh_client: SSHClient = cj_ssh.get_connection()
    os.makedirs("zips", exist_ok=True)

    # Collect results and return them: a generator compute function may only
    # yield Dagster events, so yielding raw tuples fails at run time and also
    # contradicts the declared list return type.
    downloaded = []
    # The context managers close the SFTP channel and SSH connection even if
    # a download raises.
    with ssh_client, ssh_client.open_sftp() as sftp_client:
        for file_path in file_paths_from_sftp:
            # Only the basename is kept, so two remote files with the same
            # name in different directories map to the same local path.
            local_file = "zips/" + os.path.basename(file_path)
            context.log.info(f"Downloading file from {file_path} to {local_file}")
            sftp_client.get(file_path, local_file)
            context.log.info(f"Downloaded file from {file_path} to {local_file}")
            downloaded.append((local_file, file_path))
    return downloaded
@asset()
def csvs_from_sftp(context: OpExecutionContext, files_from_sftp: list[tuple[str, str]]) -> list[str]:
    """Asset: extract the ``.txt``/``.csv`` members of each downloaded zip.

    Args:
        context: Dagster execution context; only its logger is used.
        files_from_sftp: ``(local_path, remote_path)`` tuples; only the local
            path is read here.

    Returns:
        The paths returned by ``ZipFile.extract`` for every extracted member,
        all placed under the local ``csvs`` directory.
    """
    os.makedirs("csvs", exist_ok=True)

    # Return a list instead of yielding: a generator compute function may
    # only yield Dagster events, and the signature declares a list.
    csv_paths = []
    for local_file, _remote_file in files_from_sftp:
        with zipfile.ZipFile(local_file, "r") as zip_ref:
            for member in zip_ref.filelist:
                # The extension check is case-insensitive.
                if member.filename.lower().endswith((".txt", ".csv")):
                    csv_paths.append(zip_ref.extract(member, "csvs"))
                else:
                    context.log.info(f"Skipping file {member.filename} in zip {local_file}")
    return csv_paths
@asset()
def csv_data_from_csvs(csvs_from_sftp: list[str]) -> list[pd.DataFrame]:
    """Asset: load every extracted CSV file into a pandas DataFrame.

    Args:
        csvs_from_sftp: Local paths of the files to read.

    Returns:
        One DataFrame per input path, in input order, each read with
        ``pd.read_csv`` defaults.
    """
    # Return a list instead of yielding: a generator compute function may
    # only yield Dagster events, and the signature declares a list.
    return [pd.read_csv(csv_path) for csv_path in csvs_from_sftp]