Timo Klockow
12/27/2022, 10:50 AM
daniel
12/27/2022, 1:02 PM
daniel
12/27/2022, 3:28 PM
Alec Koumjian
12/27/2022, 4:07 PM
Bartosz Kopytek
12/27/2022, 4:44 PM
Sean Han
12/27/2022, 6:55 PM
Yassine AIT BAHA
12/27/2022, 8:57 PM
Terry Lines
12/28/2022, 1:12 AM
from dagster import AssetKey, AssetsDefinition, In, Nothing, OpExecutionContext, Out, SourceAsset, graph, op

data1 = SourceAsset(key='data1')
data2 = SourceAsset(key='data2')

@op(out=Out(io_manager_key='database_io_manager', asset_key=AssetKey('my_table_name')))
def infutor_address_tci(tci: str):
    return tci
@op(
    ins={
        'start': In(Nothing),
        'tch': In()
    },
    out=Out(
        io_manager_key='kinetica_io_manager',
        asset_key=AssetKey(['kinetica', 'address', 'infutor_address']),
        metadata={
            'load_options': {
                'batch_size': '50000',
                # tch only has a subset of data, hence the links to out of bounds (901....)
                'columns_to_load': '29, 8, 10..20,24,901..902,32,904',
                'file_type': 'csv',
                'text_delimiter': '\t',
                'text_has_header': 'false',
                'text_quote_character': '~',
                'error_handling': 'permissive',
            }
        }
    )
)
def infutor_address_tch(tch: str):
    return tch
@op(required_resource_keys={'db'})
def update_infutor_address(context: OpExecutionContext, table):
    # remove lat/lon not at geocode rooftop, line interpolated, or parcel_centroid level.
    context.resources.db.execute_sql(f"""
        UPDATE {table.name}
        SET
            latitude = NULL,
            longitude = NULL
        WHERE
            (latitude IS NOT NULL OR longitude IS NOT NULL)
            AND NVL(geocode NOT IN ('01', '02'), TRUE)
    """)
    return None
@graph()
def infutor_address(tci: str, tch: str):
    table = infutor_address_tch(tch, start=infutor_address_tci(tci))
    return update_infutor_address(table)

infutor_address = AssetsDefinition.from_graph(
    infutor_address,
    keys_by_input_name={
        'tci': AssetKey(['s3', 'infutor', 'tci']),
        'tch': AssetKey(['s3', 'infutor', 'tch'])
    },
    key_prefix=['kinetica', 'address'],
)
Rafael Figueiredo
12/28/2022, 1:51 AM
Nicholas Buck
12/28/2022, 5:13 AM
Pawan Kumar Meena
12/28/2022, 5:16 AM
Timo Klockow
12/28/2022, 1:14 PM
Definitions object? Like with using the @repository decorator?
In the source code I can see it gets a default value of name=SINGLETON_REPOSITORY_NAME.
Can we override it somehow, or is this only possible using the decorator?
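A minimal sketch of the decorator route, assuming the standard @repository API (names are illustrative; name defaults to the function name when omitted):

from dagster import asset, repository

@asset
def my_asset():
    return 1

# the decorator lets you override the repository name explicitly;
# a Definitions object instead registers under SINGLETON_REPOSITORY_NAME.
@repository(name="my_custom_repo")
def my_repo():
    return [my_asset]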
Kulanjith Deelaka
12/28/2022, 3:25 PM
asset_key parameter of AssetMaterialization and AssetObservation, is it?
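For reference, a minimal sketch of setting asset_key when logging these events, assuming the documented AssetMaterialization/AssetObservation constructors (the op and key names are made up):

from dagster import AssetKey, AssetMaterialization, AssetObservation, op

@op
def record_events(context):
    # asset_key takes an AssetKey (a plain string or list of path parts also works)
    context.log_event(AssetMaterialization(asset_key=AssetKey(["warehouse", "my_table"])))
    context.log_event(AssetObservation(asset_key=AssetKey(["warehouse", "my_table"]), metadata={"rows": 100}))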
Timo Klockow
12/28/2022, 5:12 PM
dagster._check.CheckError: Invariant failed. Description: Assets defined for node 'DailyJob' have a partitions_def of Daily, starting 2022-10-20 UTC., but job 'HourlyJob' has non-matching partitions_def of Hourly, starting 2022-10-20-00:00 UTC..
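That error says the job's partitions_def must match the partitions_def of the assets it targets. A minimal sketch of an aligned setup, assuming define_asset_job (the asset and job names are illustrative):

from dagster import DailyPartitionsDefinition, asset, define_asset_job

daily = DailyPartitionsDefinition(start_date="2022-10-20")

@asset(partitions_def=daily)
def daily_asset():
    ...

# the job targeting the asset must share the same partitions_def
daily_job = define_asset_job("DailyJob", selection="daily_asset", partitions_def=daily)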
Jose Estudillo
12/28/2022, 5:37 PM
hooks or run_failure_sensor like there are for jobs/ops. What is the best approach for this?
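One possible approach, sketched on the assumption that the assets run inside jobs, so a deployment-wide run_failure_sensor still fires for them (alert_channel is a hypothetical stand-in):

from dagster import RunFailureSensorContext, run_failure_sensor

def alert_channel(message: str) -> None:
    # hypothetical: send to Slack, email, PagerDuty, etc.
    print(message)

@run_failure_sensor
def notify_on_run_failure(context: RunFailureSensorContext):
    # fires for any failed run, including runs that materialize assets
    alert_channel(f"Run {context.dagster_run.run_id} failed: {context.failure_event.message}")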
Mitchell Hynes
12/28/2022, 5:51 PM
daniel
12/28/2022, 6:16 PM
Oleksandr (Alex) Dimov
12/28/2022, 7:26 PM
Daniel Mosesson
12/28/2022, 8:26 PM
• dagit -w workspace.yaml works
• dagit -w ../other/folder/workspace.yaml works
but `dagit -w workspace.yaml -w ../other/folder/workspace.yaml` hangs after starting the code server for the first package.
It doesn't matter which order I put them in; both workspaces have only one Python package specified (with attendant executable paths). Same issue if I try putting the workspaces together. No additional logs show up when I add --log-level trace.
Any idea what could be going on?
Ismael Rodrigues
12/28/2022, 9:16 PM
Rafael Figueiredo
12/28/2022, 10:45 PM
import pandas
from dagster import Output, asset, job, op

@asset
def my_dataframe() -> Output[pandas.DataFrame]:
    df = pandas.read_csv('some_path.csv')
    return Output(df)

@op
def persist_df(my_dataframe):
    for index, row in my_dataframe.iterrows():
        # request here
        pass

@job
def schedulable_job():
    persist_df()
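One way to make this schedulable without passing an asset into an op, sketched with define_asset_job (persisted_rows is a made-up name; this assumes the default IO manager hands the dataframe to the downstream asset):

import pandas
from dagster import asset, define_asset_job

@asset
def my_dataframe() -> pandas.DataFrame:
    return pandas.read_csv('some_path.csv')

# a downstream asset receives the dataframe via the IO manager
@asset
def persisted_rows(my_dataframe: pandas.DataFrame) -> None:
    for _, row in my_dataframe.iterrows():
        pass  # request here

# an asset job over the selection can then be attached to a schedule
schedulable_job = define_asset_job("schedulable_job", selection="persisted_rows")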
Philip Orlando
12/29/2022, 1:19 AM
Yeshwanth LN
12/29/2022, 5:23 AM
Kulanjith Deelaka
12/29/2022, 10:27 AM
With UPathIOManager, the method context.add_output_metadata() does not work; rather, we have to implement UPathIOManager.get_metadata(). It would be helpful if the docs explicitly stated that calling context.add_output_metadata() from UPathIOManager.dump_to_path() does not work.
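Based on that report, a minimal sketch of attaching metadata by overriding get_metadata instead (pickle stands in for real serialization; class and key names are illustrative):

import pickle
from typing import Any, Dict
from dagster import InputContext, MetadataValue, OutputContext, UPathIOManager
from upath import UPath

class MyUPathIOManager(UPathIOManager):
    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        # per the report above, context.add_output_metadata() here has no effect
        path.write_bytes(pickle.dumps(obj))

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        return pickle.loads(path.read_bytes())

    def get_metadata(self, context: OutputContext, obj: Any) -> Dict[str, MetadataValue]:
        # metadata returned here is attached to the output instead
        return {"size_bytes": MetadataValue.int(len(pickle.dumps(obj)))}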
Joris Ganne
12/29/2022, 10:28 AM
Daniel Galea
12/29/2022, 1:12 PM
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not translate host name "$HOST_NAME" to address: Temporary failure in name resolution
This started happening all of a sudden, and my DB is running, so that shouldn't be the issue. I am running Dagster version 1.1.3 and Helm chart 1.1.3. Is this a known bug in that version that updating would fix, or is it something else?
Bartek Marczak
12/29/2022, 1:28 PM
@asset(partitions_def=HourlyPartitionsDefinition(start_date="2022-01-01-00:00"))
def blog_posts(context) -> List[Dict]:
    partition_datetime_str = context.asset_partition_key_for_output()
    hour = datetime.fromisoformat(partition_datetime_str)
    posts = fetch_blog_posts_from_external_api(hour_when_posted=hour)
    return posts
I can use the above asset as input to other assets - that’s all nice and fine.
But what I would also like to be able to do is to run a job each time a new partition of blog_posts
has been materialized in order to publish the data to yet another external API. I assumed I would be able to do this fairly easily using an asset sensor, like so:
@asset_sensor(asset_key=AssetKey("blog_posts"), job=my_job)
def new_materialized_blog_posts_sensor(context, asset_event):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "my_op": {
                    "config": {
                        "asset_key_path": asset_event.dagster_event.asset_key.path,
                        "partition_key": asset_event.dagster_event.partition,
                    }
                }
            }
        },
    )
This triggers my_job
and that job executes my_op
which receives the asset_key and partition. So it seems like I’m just a step away. But the problem I have is: how do I access the underlying data? Do I need to work directly with the IOManager
class to load the input? If so, how do I initialise the correct IO manager based on what has been configured for the asset storage? Once I have the IO manager, I should be able to access the data using the approach for testing:
context = build_input_context(asset_key=asset_key, partition_key=partition_key)
blog_posts = manager.load_input(context)
But this seems a bit hacky (the recommendation is to use this for tests only, I think).
So what is the best way I can access the materialized asset partition in my other job?
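One less hacky possibility, sketched here: make the publishing step its own hourly-partitioned asset, so the configured IO manager loads the matching blog_posts partition, and have the sensor request that partition. publish_to_external_api is a hypothetical helper, and this assumes a Dagster version where RunRequest accepts partition_key:

from typing import Dict, List
from dagster import AssetKey, HourlyPartitionsDefinition, RunRequest, asset, asset_sensor, define_asset_job

hourly = HourlyPartitionsDefinition(start_date="2022-01-01-00:00")

def publish_to_external_api(posts: List[Dict]) -> None:
    ...  # hypothetical: POST to the downstream service

@asset(partitions_def=hourly)
def published_blog_posts(blog_posts: List[Dict]) -> None:
    publish_to_external_api(blog_posts)

publish_job = define_asset_job("publish_job", selection="published_blog_posts", partitions_def=hourly)

@asset_sensor(asset_key=AssetKey("blog_posts"), job=publish_job)
def blog_posts_publish_sensor(context, asset_event):
    yield RunRequest(
        run_key=context.cursor,
        partition_key=asset_event.dagster_event.partition,
    )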
Roman Maliushkin
12/29/2022, 2:03 PM
AssetMaterialization event?
For instance, I have `AssetMaterialization`:
AssetMaterialization(
    asset_key=AssetKey("my_key"),
    description="Update asset",
    metadata={
        "date": "2022-12-01"
    },
)
Then I would like to get these values in the asset_sensor method:
@asset_sensor(asset_key=AssetKey("my_key"), job=dbx_to_druid_general_job)
def internal_bi_bid_funnel_etl_job_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    ...
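For what it's worth, a sketch of reading those values off the event, assuming the event-log layout where the materialization hangs off dagster_event.event_specific_data (attribute names vary a bit across Dagster versions, so treat this as approximate):

@asset_sensor(asset_key=AssetKey("my_key"), job=dbx_to_druid_general_job)
def internal_bi_bid_funnel_etl_job_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    materialization = asset_event.dagster_event.event_specific_data.materialization
    entries = {entry.label: entry.entry_data for entry in materialization.metadata_entries}
    date = entries["date"]  # the value logged in the AssetMaterialization above
    ...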
owen
12/29/2022, 3:10 PM
@op(ins={"cc_id": In(), "tbl": In(), "start_after": In(Nothing)})
def cf_run_event_processing(context, cc_id, tbl):
then your job would have
meltano_done = meltano_run_op(...)()
cc_id, cache_key = cf_run_event_processing(..., start_after=meltano_done)
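Filled out into a runnable sketch (the upstream ops are stand-ins; meltano_run mimics the Nothing output that meltano_run_op would provide):

from dagster import In, Nothing, Out, job, op

@op(out=Out(Nothing))
def meltano_run():
    ...  # stand-in for meltano_run_op(...) from dagster-meltano

@op
def get_cc_id() -> str:
    return "cc-123"

@op
def get_tbl() -> str:
    return "events"

@op(ins={"cc_id": In(), "tbl": In(), "start_after": In(Nothing)})
def cf_run_event_processing(context, cc_id, tbl):
    # start_after only orders execution; it never appears as a function parameter
    context.log.info(f"processing {tbl} for {cc_id}")

@job
def event_processing():
    cf_run_event_processing(get_cc_id(), get_tbl(), start_after=meltano_run())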
Jose Estudillo
12/29/2022, 6:28 PM