Oliver
08/15/2022, 12:17 PMPartitionMapping
(marked experimental). but it's not clear to me where that gets used and wether the io manager needs to handle it?sandy
08/15/2022, 6:12 PMasset_partitions_key_range
and asset_partitions_time_window
methods on InputContext
will return the correct values.
we don't yet offer the ability to have a single run / job that materializes assets with different partitions definitionsOliver
08/16/2022, 8:02 AMValueError: Tried to get asset partitions for an input that correponds to a partitioned asset that is not partitioned with a TimeWindowPartitionsDefinition
for the weekly partitioned asset in load_input
and working around that by using the range I later get dagster._core.errors.DagsterInvariantViolationError: Attempting to access asset_key, but it was not provided when constructing the OutputContext
I am defining the asset from a graph if that makes a differencesandy
08/16/2022, 3:39 PMOliver
08/17/2022, 6:24 AMλ reddit_scrape dagster --version
dagster, version 1.0.2
import os
import dagster
from dagster import AssetKey, AssetsDefinition, GraphIn, GraphOut, IOManager, MetadataValue, WeeklyPartitionsDefinition, asset, graph, io_manager, op, repository, with_resources, fs_io_manager, DailyPartitionsDefinition
from psaw import PushshiftAPI
import pandas as pd
import datetime as dt
START_DATE = dt.datetime(2022, 6, 23)
from pathlib import Path
class LocalFileSystemIOManager(IOManager):
"""Translates between Pandas DataFrames and CSVs on the local filesystem.
from -
<https://docs.dagster.io/guides/dagster/software-defined-assets#connecting-assets-to-external-services>
"""
def _get_fs_path(self, asset_key: AssetKey, partition_key: str) -> str:
rpath = os.path.join('data', partition_key, *asset_key.path) + ".csv"
rpath = Path(rpath).absolute()
rpath.parent.mkdir(parents=True, exist_ok=True)
return os.path.abspath(rpath)
def handle_output(self, context, obj: pd.DataFrame):
"""This saves the dataframe as a CSV."""
# context.add_output_metadata({
# 'asset_partition_key_range': MetadataValue.text(str(context.asset_partition_key_range)),
# 'asset_partitions_time_window': MetadataValue.text(str(context.asset_partitions_time_window)),
# })
fpath = self._get_fs_path(context.asset_key, context.partition_key)
obj.to_csv(fpath)
context.add_output_metadata({
"path": MetadataValue.path(os.path.abspath(fpath)),
"rows": <http://MetadataValue.int|MetadataValue.int>(len(obj))
})
def load_input(self, context):
"""This reads a dataframe from a CSV."""
<http://context.log.info|context.log.info>({
'asset_partition_key_range': str([key for key in context.asset_partition_key_range]),
'asset_partitions_time_window': str(context.asset_partitions_time_window),
})
# y m d
start_date, end_date = map(lambda x: dt.datetime(*map(int, x.split('-'))), context.asset_partition_key_range)
results = []
delta = dt.timedelta(days=1)
while start_date <= end_date:
fpath = self._get_fs_path(context.asset_key, start_date.strftime("%Y-%m-%d"))
start_date += delta
results.append(pd.read_csv(fpath))
return pd.concat(results, ignore_index=True)
@io_manager
def csv_io():
return LocalFileSystemIOManager()
def subreddit_scraper_factory(subreddit, start_date=None):
if start_date is None:
start_date= START_DATE
# start_date= dt.datetime(2005, 6, 23)
partition = DailyPartitionsDefinition(
start_date,
minute_offset=0,
timezone=None,
fmt=None,
end_offset=0,
# tags_for_partition_fn=None
)
@asset(
name=f'subreddit_{subreddit}',
io_manager_key='pandas_to_table',
partitions_def=partition
)
def subreddit_asset(context):
# partition ~ `2022-08-10-00:00`
partition = context.asset_partition_key_for_output()
<http://context.log.info|context.log.info>(
f"Processing asset partition '{partition}'"
)
y, m, d = partition.split('-')
y, m, d = map(int, (y, m, d))
# h, min = map(int, t.split(':'))
start_time = dt.datetime(y, m, d)
end_time = start_time + dt.timedelta(days=1)
api = PushshiftAPI()
posts_generator = api.search_submissions(
after=int(start_time.timestamp()),
before=int(end_time.timestamp()),
subreddit=subreddit,
filter=['id', 'url','author', 'title', 'subreddit'],
# limit=
)
posts_dicts = list(map(lambda x: x.d_, posts_generator))
posts = pd.DataFrame(posts_dicts)
posts = posts.set_index('id')
<http://context.log.info|context.log.info>(api.metadata_)
return posts
return subreddit_asset
def dataset_factory(subreddits, start_date=None):
if start_date is None:
start_date = START_DATE
@op
def subgraph(df):
return df
@op
def collect(dfs):
return dfs
partition = WeeklyPartitionsDefinition(
start_date,
minute_offset=0,
timezone=None,
# fmt="%d-%m-%Y",
end_offset=0,
# tags_for_partition_fn=None
)
@graph(
name='dataset',
ins={
**{f'subreddit_{subreddit}':GraphIn(f'subredit_{subreddit}') for subreddit in subreddits}
},
out={"dataset": GraphOut(),}
)
def outer(**kwargs):
# keys, values = kwargs.items()
return collect(list(map(subgraph, kwargs.values())))
outer = AssetsDefinition.from_graph(outer,
keys_by_input_name= {
**{f'subreddit_{subreddit}':AssetKey(f'subreddit_{subreddit}') for subreddit in subreddits},
},
partitions_def=partition,
# io_manager_key='pandas_to_table', ## How to set io_manager_key for asset from graph?
resource_defs={
"io_manager": csv_io,
},
keys_by_output_name={
'dataset': AssetKey('dataset')
}
# partition_mappings=
)
return outer
@repository
def repository():
return with_resources(
[
subreddit_scraper_factory('ProgrammerHumor'),
subreddit_scraper_factory('science'),
subreddit_scraper_factory('brisbane'),
subreddit_scraper_factory('machinelearning'),
dataset_factory((
'ProgrammerHumor',
'science',
'brisbane',
'machinelearning',
))
],
resource_defs={
"pandas_to_table": csv_io,
},
)
requirements.txt
psaw==0.1.0
dagster==1.0.2
dagit==1.0.2
pandas==1.4.3
AssetsDefinition.from_graph
applies to internal steps and the io manager key needs to be set on the graph out which doesn't accept it and my io manager doesnt work for internal steps.
I monkey patched this in
class GraphOut(NamedTuple("_GraphOut", [("description", PublicAttr[Optional[str]]), ("io_manager_key", PublicAttr[Optional[str]])])):
"""
Represents information about the outputs that a graph maps.
Args:
description (Optional[str]): Human-readable description of the output.
"""
def __new__(cls, description: Optional[str] = None, io_manager_key: Optional[str] = None):
return super(GraphOut, cls).__new__(cls, description=description, io_manager_key=io_manager_key)
def to_definition(self, name: Optional[str]) -> "OutputDefinition":
return OutputDefinition(name=name, description=self.description, io_manager_key=self.io_manager_key)
sandy
08/19/2022, 4:29 AMOliver
08/19/2022, 5:22 AM