Remco Loof
07/26/2022, 2:23 PM
Xavier BALESI
07/26/2022, 2:46 PM
Remco Loof
07/27/2022, 6:53 AM
Xavier BALESI
07/27/2022, 12:25 PM
import csv
import io
from csv import DictReader
from typing import List
import requests
from dagster import SolidExecutionContext, io_manager
from dagster.core.definitions.asset_in import AssetIn
from dagster.core.definitions.decorators.asset_decorator import asset
from dagster.core.definitions.decorators.repository_decorator import repository
from dagster.core.definitions.resource_definition import resource
from dagster.core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster.core.execution.context.input import InputContext
from dagster.core.execution.context.output import OutputContext
from dagster.core.execution.with_resources import with_resources
from dagster.core.storage.fs_io_manager import fs_io_manager
from dagster.core.storage.io_manager import IOManager
@resource()
def titanic_source(context):
    """Provide the URL of the public Titanic passenger CSV.

    ``context`` (the resource init context) is not used.

    Returns:
        The raw GitHub URL of ``titanic.csv``.
    """
    # The pasted URL was wrapped in Slack's ``<...>`` autolink markup. The
    # angle brackets are not part of the URL: requests rejects "<https://..."
    # because no connection adapter matches that prefix.
    return (
        "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
    )
class CsvIOManager(IOManager):
    """IO manager that stores a list of row dicts in a single CSV file.

    Reading and writing both use the ``unix`` CSV dialect, so a file written by
    ``handle_output`` is parsed back the same way by ``load_input``. Values
    come back as ``str`` regardless of what type was written.

    Args:
        file_path: Path of the CSV file used for every output written and
            every input loaded through this manager.
    """

    def __init__(self, file_path: str) -> None:
        self._file_path = file_path
        super().__init__()

    def load_input(self, _: InputContext) -> List[dict]:
        """Read the CSV file and return its rows as dicts keyed by header."""
        # newline="" lets the csv module handle line endings itself (required
        # by the csv docs so quoted fields containing newlines parse correctly).
        with open(self._file_path, newline="", encoding="utf-8") as f:
            return list(DictReader(f, dialect="unix"))

    def handle_output(self, context: OutputContext, obj: List[dict]) -> None:
        """Write ``obj`` to the CSV file, replacing any previous contents.

        The header comes from the keys of the first row. An empty list
        produces an empty file, which ``load_input`` reads back as ``[]``.
        """
        context.log.info(f"obj = {obj}")
        with open(self._file_path, "w", newline="", encoding="utf-8") as f:
            if not obj:
                # Nothing to write; leave the (now truncated) file empty
                # instead of failing on obj[0].
                return
            # Same dialect as load_input so the round trip is symmetric.
            writer = csv.DictWriter(
                f, fieldnames=list(obj[0].keys()), dialect="unix"
            )
            writer.writeheader()
            writer.writerows(obj)
@io_manager
def csv_io_manager():
    """Build a ``CsvIOManager`` that reads and writes ``data.csv``.

    The path is relative, so it resolves against the process's current
    working directory.
    """
    target_path = "data.csv"
    manager = CsvIOManager(target_path)
    return manager
@asset(
    name="extract_asset",
    io_manager_key="extract_io_manager",
    required_resource_keys={"source_url"},
)
def extract_asset(context: SolidExecutionContext):
    """Download the raw CSV text from the URL given by the ``source_url`` resource.

    The URL is recorded as output metadata.

    Returns:
        The response body as text.

    Raises:
        requests.RequestException: if the response status is not 200 (the
            response is attached as ``.response``). Also raised, as a
            subclass, when the request fails or times out.
    """
    # Without a timeout, requests.get can block indefinitely on a stalled server.
    request_timeout_seconds = 30
    url = context.resources.source_url
    response = requests.get(url, timeout=request_timeout_seconds)
    if response.status_code != 200:
        raise requests.RequestException(
            f"cannot get url {url}", response=response
        )
    context.add_output_metadata({"url": url})
    return response.text
@asset(
    name="transform_asset",
    ins={"extract": AssetIn("extract_asset", input_manager_key="extract_io_manager")},
    io_manager_key="transform_io_manager",
)
def titanic_survivors(context: SolidExecutionContext, extract) -> List[dict]:
    """Keep only the passenger rows whose ``Survived`` column is ``"1"``.

    Args:
        context: Dagster execution context, used for logging.
        extract: CSV text loaded from ``extract_asset``.

    Returns:
        The matching rows as dicts of column name to string value.
    """
    reader = DictReader(io.StringIO(extract), dialect="unix")
    survivors = [row for row in reader if row["Survived"] == "1"]
    # Log through Dagster's logger rather than print() so the message is
    # attached to the run's structured logs.
    context.log.info(f"survivor = {survivors}")
    return survivors
# Bind concrete resources to the keys the assets above declare
# (io_manager_key, input_manager_key and required_resource_keys).
assets_with_resources = with_resources(
    [extract_asset, titanic_survivors],
    resource_defs={
        # Filesystem IO manager rooted at the current working directory;
        # stores extract_asset's output and loads it for transform_asset.
        "extract_io_manager": fs_io_manager.configured({"base_dir": "."}),
        # Writes transform_asset's rows to data.csv.
        "transform_io_manager": csv_io_manager,
        # Supplies the URL that extract_asset downloads.
        "source_url": titanic_source,
    },
)
# Job over the repository's assets. No `selection` is passed, so this
# presumably targets every asset (matching its description). Confirm against
# the define_asset_job docs for the Dagster version in use.
all_assets_job = define_asset_job(
    name="all_assets_job",
    description="run all assets",
)
@repository
def repo():
    """Expose the resource-bound assets together with the job that runs them."""
    # Spread the list returned by with_resources so the repository sees
    # a flat list of definitions.
    definitions = [*assets_with_resources, all_assets_job]
    return definitions
Remco Loof
07/27/2022, 12:58 PM
Xavier BALESI
07/27/2022, 1:00 PM
Remco Loof
07/27/2022, 1:07 PM
sandy
07/27/2022, 3:53 PM
> 1. Can an asset consist of ops? E.g. one for each step in the ETL pipeline.

Am I understanding correctly that you want to be able to use multiple ops to create a single asset? Graph-backed assets let you accomplish this: https://docs.dagster.io/concepts/assets/software-defined-assets#graph-backed-assets. @Remco Loof - do you remember which docs pages you used to learn what you've learned so far? I'm asking because I'd like to edit them to clear up this question.
Remco Loof
07/28/2022, 7:23 AM