Bart
06/28/2023, 9:35 AM
asset for each endpoint, but then I need to copy-paste the same boilerplate code:
import pandas as pd
import requests
from dagster import asset

@asset
def fetch_one():
    return requests.get("http://example.com/one").json()

@asset
def fetch_two():
    return requests.get("http://example.com/two").json()

@asset
def clean_one(fetch_one):
    df = pd.json_normalize(fetch_one)
    return df.dropna()

@asset
def clean_two(fetch_two):
    df = pd.json_normalize(fetch_two)
    return df.dropna()

What would be the recommended way to avoid that?
• Do I work with a config (like https://docs.dagster.io/concepts/configuration/config-schema) where each endpoint is a different RunConfig? That seems a bit heavy.
• Another option is maybe with context and multi-asset (like https://docs.dagster.io/concepts/assets/multi-assets#subsetting-multi-assets)?
• Or do I use an asset-factory pattern? (Any example of this?)
Harry Park
06/28/2023, 11:34 AM
def shell_script_runner(context, script_name):
    execute_shell_command(f"{script_name}")

@op
def script1(context):
    shell_script_runner(context, "hello_world.sh")

@op
def script2(context):
    shell_script_runner(context, "hello_dagster.sh")

I’m thinking you could do something similar for assets.
Bart
06/28/2023, 12:19 PM
Harry Park
06/28/2023, 12:41 PM
@op
def data_reporter(context):
    pass

@op(ins={"start": In(Nothing)})
def data_transform(context):
    pass

@job()
def Sample_etl_job():
    data = data_transform(start=data_reporter())

If you’d like a better answer, I’d suggest posting an example of your boilerplate asset code.
Bart
06/28/2023, 12:49 PM
@asset
def fetch_one():
    return requests.get("http://example.com/one").json()

@asset
def fetch_two():
    return requests.get("http://example.com/two").json()

@asset
def clean_one(fetch_one):
    df = pd.json_normalize(fetch_one)
    return df.dropna()

@asset
def clean_two(fetch_two):
    df = pd.json_normalize(fetch_two)
    return df.dropna()

Now, I have many endpoints (one, two, ...).
Harry Park
06/28/2023, 1:03 PM
Bart
06/28/2023, 1:04 PM
Harry Park
06/28/2023, 1:24 PM
endpoint_partitions_def = StaticPartitionsDefinition(["one", "two", "three"])

@asset(partitions_def=endpoint_partitions_def)
def raw_json_asset():
    …

@asset(partitions_def=endpoint_partitions_def)
def cleaned_asset(raw_json_asset):
    …

Bart
06/28/2023, 1:31 PM
sean
06/28/2023, 1:33 PM
endpoints = ["one", "two"]
assets = []

# A factory function is necessary: it creates a new scope per call,
# so each asset captures its own value of endpoint.
def make_fetch(endpoint):
    @asset(name=f"fetch_{endpoint}")
    def fetch():
        # f-string so that {endpoint} is actually interpolated into the URL
        return requests.get(f"http://example.com/{endpoint}").json()
    return fetch

for endpoint in endpoints:
    assets.append(make_fetch(endpoint))

If you don’t want to manage them as separate assets, then partitions make sense as you and Harry have been discussing.
Harry Park
06/28/2023, 1:37 PM
@asset(partitions_def=endpoint_partitions_def)
def raw_json_asset():
    return requests.get(f"http://example.com/{get_asset_partition_key()}").json()

get_asset_partition_key is a guess at a method name.
Bart
06/28/2023, 1:39 PM
sean
06/28/2023, 1:45 PM
> if you don’t manage them as separate assets, @sean could you show how you would do the body of the raw_json_asset asset? I’m trying to understand how you get the partition_key used.
Use `context.partition_key`:
@asset(partitions_def=endpoint_partitions_def)
def raw_json_asset(context: AssetExecutionContext):
    return requests.get(f"http://example.com/{context.partition_key}").json()

> Ah I see. So with separate assets you mean that they represent different nodes in the graph? With partitions, it will be one node (but parametrized in a way).
Yes.
> So the last option would not allow parallel execution?
The answer here is complicated. It will require multiple runs to materialize the two different partition keys, though those runs can be in parallel. With Option 1 (separate assets), both endpoints can be done within the same run in parallel.
Bart
06/28/2023, 1:47 PM
Harry Park
06/28/2023, 1:51 PM
assets = []
for endpoint in endpoints:
    assets.append(make_fetch(endpoint))

you’d then add those assets in the Definitions block in the __init__.py?
sean
06/28/2023, 1:53 PM
> you’d then add those assets in the Definitions block in the __init__.py?
exactly