Andres Crucetta
04/12/2023, 3:12 PM
import pytest
from dagster import EnvVar, build_op_context
from orchestrator import DatabaseResource
from orchestrator.ops import load_sftp_to_snowflake, DataTransferConfig
@pytest.fixture
def data_config():
    """Provide the op configuration consumed by the transfer test.

    Returns:
        A DataTransferConfig populated with fixed ``raw_table``,
        ``azure_stage`` and ``file_format`` values.
    """
    transfer_settings = DataTransferConfig(
        raw_table="RAW_CERPASS_MAC",
        azure_stage="STAGE_CERPASS_MAC",
        file_format="PLAIN_TEXT",
    )
    return transfer_settings
@pytest.fixture
def db_resource():
    """Provide the Snowflake database resource used by the transfer test.

    Each connection field is given as an ``EnvVar`` reference to a
    ``SNOWFLAKE_*`` environment variable rather than as a literal value.

    Returns:
        A DatabaseResource built from those ``EnvVar`` references.
    """
    connection_fields = {
        "user": EnvVar("SNOWFLAKE_USER"),
        "account": EnvVar("SNOWFLAKE_ACCOUNT"),
        "password": EnvVar("SNOWFLAKE_PASSWORD"),
        "warehouse": EnvVar("SNOWFLAKE_WAREHOUSE_NAME"),
    }
    return DatabaseResource(**connection_fields)
def test_load_sftp_to_snowflake(
    data_config: DataTransferConfig,
    db_resource: DatabaseResource,
):
    """Invoke the ``load_sftp_to_snowflake`` op directly and check its result.

    Args:
        data_config: Op configuration supplied by the ``data_config`` fixture.
        db_resource: Database resource supplied by the ``db_resource`` fixture.
    """
    # The op is called like a plain function with a freshly built op context.
    op_context = build_op_context()
    outcome = load_sftp_to_snowflake(
        op_context,
        config=data_config,
        db_conn=db_resource,
    )
    assert outcome is True
# Code-location definitions: jobs, schedules and sensors, plus the "db_conn"
# resource whose connection fields are EnvVar references.
defs = Definitions(
    jobs=BindResourcesToJobs([upload_data]),
    schedules=[upload_data_cerpass_mac],
    sensors=all_sensors,
    resources={
        "db_conn": DatabaseResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            warehouse=EnvVar("SNOWFLAKE_WAREHOUSE_NAME"),
        ),
    },
)
Charlie Bini
04/12/2023, 8:10 PM
Frederik Löw
04/25/2023, 1:02 PM