Phil Whitby
11/22/2022, 5:44 PM
freshness_policies_by_key={
    asset_key: FreshnessPolicy(maximum_lag_minutes=45)
    for asset_key, (input_name, _) in (asset_ins | asset_outs).items()
}
Casper Weiss Bang
11/22/2022, 8:03 PM
jordan
11/22/2022, 8:04 PM
Chris Evans
11/22/2022, 8:07 PM
dagster-k8s/config tag to its underlying op. It seems like tags are only defined on the job level and not necessarily on the asset level?
Qwame
11/22/2022, 8:25 PM
context.add_input_metadata, how do I access the metadata in another run? I currently use this to get metadata that was added using context.add_output_metadata
latest_materialization_event = self.init_context.instance.get_latest_materialization_events(
    [asset_key]
).get(asset_key)
context.log.info(latest_materialization_event)
if latest_materialization_event:
    materialization = (
        latest_materialization_event.dagster_event.event_specific_data.materialization
    )
    metadata = {entry.label: entry.entry_data for entry in materialization.metadata_entries}
Wondering if there is a way to get context.add_input_metadata metadata?
Ashkan Al e Ali
11/22/2022, 10:13 PM
Ben Gatewood
11/22/2022, 11:53 PM
to_job(). The graph has one op in it which is async and must be awaited, so the graph function must be async as well. However, this arrangement throws
dagster._core.errors.DagsterInvalidDefinitionError: @graph 'nba_trends_graph' returned problematic value of type <class 'coroutine'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids
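The error is consistent with ordinary Python behavior: calling an `async def` function without awaiting it returns a coroutine object rather than a value, so anything that calls the graph function synchronously would receive a coroutine. A minimal stdlib-only sketch of that behavior follows; the function names are hypothetical and no Dagster API is used.

```python
import asyncio
import inspect


async def fetch_trends() -> str:
    # Hypothetical stand-in for the awaited work inside the async op.
    await asyncio.sleep(0)
    return "trends"


async def async_composition():
    # Because this is declared async, a plain call does not run its body.
    return await fetch_trends()


def sync_composition():
    # A synchronous function returns its value directly when called.
    return "trends"


pending = async_composition()  # nothing has executed yet
is_coroutine = inspect.iscoroutine(pending)
pending.close()  # avoid a "coroutine was never awaited" warning

# Only an event loop (or an await) actually produces the value.
result = asyncio.run(async_composition())
```

If that is what happens here, the graph function would need to be a plain `def` that only wires ops together, with the awaiting kept inside the op.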
nickvazz
11/23/2022, 12:43 AM
dagster job launch -j some_job --config-json {...}
Do I need to pass the gRPC host/port? Where do I find those? Are they shown somewhere in the Dagit UI?
Oliver
11/23/2022, 1:23 AM
from dagster import MultiPartitionsDefinition, StaticPartitionsDefinition, asset
partition_def = StaticPartitionsDefinition(['0','1','2'])
@asset(
    partitions_def=partition_def
)
def a(): return 'a'
@asset(
    partitions_def=partition_def
)
def b(): return 'b'
now I want to introduce a third asset c that produces a partition for the cross product of the other two. so
cross_part = MultiPartitionsDefinition({
    'a': partition_def,
    'b': partition_def
})
@asset(
    partitions_def=cross_part
)
def c(a, b): return a+b
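To make the intended key relationship concrete, here is a plain-Python sketch with no Dagster API: each partition of c is one pair from the cross product described above, and each pair projects back to exactly one partition of a and one of b. The `|` delimiter and both helper names are assumptions made for illustration only.

```python
from itertools import product

# Same static keys as in the example above.
keys = ["0", "1", "2"]

# Hypothetical delimiter for joining the two dimensions into one key string.
DELIMITER = "|"


def cross_product_keys(a_keys, b_keys):
    """Return every (a, b) combination encoded as a single 'a|b' string."""
    return [f"{a}{DELIMITER}{b}" for a, b in product(a_keys, b_keys)]


def upstream_key_for(multi_key, dimension):
    """Project a combined key back onto one upstream asset's partition key."""
    a_key, b_key = multi_key.split(DELIMITER)
    return {"a": a_key, "b": b_key}[dimension]


all_keys = cross_product_keys(keys, keys)
```

For instance, `upstream_key_for("1|2", "a")` gives `"1"` and `upstream_key_for("1|2", "b")` gives `"2"`, which is the projection each per-input mapping would have to perform.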
I guess I need to implement two `PartitionMapping`s: one that takes the first part of the multikey and another that takes the second part, and then supply something like partition_mapping={'a': APartitionMapping(), 'b': BPartitionMapping()}. Is that the correct approach?
Mahesh Chandran
11/23/2022, 3:15 AM
Moses Milazzo
11/23/2022, 7:00 AM
while (True) loop listening to this subscription and sending the data provided by the event to a particular function. The plan is that part of that function will write a file to disk, which will then trigger a Dagster sensor.
I would prefer to be able to have Dagster listen to this subscription and fire an op whenever an event happens on that subscription. Is there something like an IOManager example that could be helpful for developing something like this?
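The file-based handoff described in the plan can be sketched with the standard library alone. This is only an illustration under stated assumptions: a `queue.Queue` stands in for the real subscription, the `event_<id>.json` naming is hypothetical, and `new_files` mimics what a polling sensor with a cursor might do rather than using any Dagster API.

```python
import json
import queue
import tempfile
from pathlib import Path


def drain_events(events, out_dir):
    """Write each queued event to its own JSON file and return the paths."""
    written = []
    while True:
        try:
            # A real listener would block here; the sketch stops when empty.
            event = events.get_nowait()
        except queue.Empty:
            break
        path = out_dir / f"event_{event['id']}.json"
        path.write_text(json.dumps(event))
        written.append(path)
    return written


def new_files(out_dir, seen):
    """Return file names not seen on earlier polls and remember them."""
    current = sorted(p.name for p in out_dir.glob("event_*.json"))
    fresh = [name for name in current if name not in seen]
    seen.update(fresh)
    return fresh


with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp)
    events = queue.Queue()
    events.put({"id": 1, "payload": "first"})
    events.put({"id": 2, "payload": "second"})
    drain_events(events, out_dir)

    seen = set()
    first_poll = new_files(out_dir, seen)   # both files are new
    second_poll = new_files(out_dir, seen)  # nothing new since the last poll
```

Tracking already-seen names is what keeps each event from being handled more than once across polls.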
Thanks in advance.
Frédéric Kaczynski
11/23/2022, 9:01 AM
step_isolated_job mode of execution in Kubernetes, and I'm running into an issue:
When a job is launched, the run will create a pod for each op (or asset). This pod will run the command dagster api execute_run. My issue is that I use poetry, so the command to run dagster is poetry run dagster .... My pod thus crashes and the job stops. For the user code deployment pod, it's easy because I can specify my own command to run the Dagster gRPC API, but since the run's pods are launched and managed by Dagster, I have no control over them.
Is there a way to specify some kind of prefix so that Dagster launches the correct command inside the pod?
Thanks in advance! 🙂
Gustavo Carvalho
11/23/2022, 11:57 AM
in_process_executor, are the outputs/inputs serialized/deserialized between steps? Or, since they are in the same process, are they passed from one op to the next in memory?
Jonathan Lamiel
11/23/2022, 12:15 PM
Dagster daemon which can’t (per the docs) handle replicas for the moment. As it’s running sensors, schedules, etc., I was wondering:
1. Are there benchmarks of the workload it can handle in parallel?
2. Is there any plan to make it scalable? (I didn’t find any issues mentioning it.)
Bethany
11/23/2022, 3:41 PM
run_monitoring without having to wait for the situation where a run worker crashes to arise on its own?
Rama
11/23/2022, 3:42 PM
Tom Reilly
11/23/2022, 3:46 PM
@asset(group_name=ASSET_GROUP)
def file_uuid() -> str:
    file_uuid = utils.file.create_file_uuid()
    return file_uuid
From Dagit I can materialize this on its own when clicking the "Materialize" button. If I shift+click the "Materialize" button, the launchpad opens but errors for missing config are thrown. If I then click "Scaffold Missing Config", config for unrelated ops/assets is populated in the launchpad. Why am I forced to scaffold config for assets/ops that are not part of the lineage of the asset I want to materialize? I also noticed that in the bottom right of the launchpad, under the header "Assets (Ops)", I see all my repo's assets instead of just the one asset I selected when attempting to materialize. Thoughts?
Wonjae Lee
11/23/2022, 5:31 PM
Lucas Gabriel
11/23/2022, 5:36 PM
Phil Whitby
11/23/2022, 5:36 PM
ScheduleDefinition(cron_schedule="* * * * *",
    job=define_asset_job("update_sources", selection=AssetSelection.all()))
but I can't seem to find an equivalent for observing sources on a schedule?
Gayathiri
11/23/2022, 8:02 PM
Clovis Warlop
11/23/2022, 9:35 PM
Victor Shen
11/24/2022, 3:02 AM
materialize all button always shows the launchpad, rather than the UI for launching backfills 🧵
Oliver
11/24/2022, 4:07 AM
Operation name: AssetGraphLiveQuery
Message: maximum recursion depth exceeded while calling a Python object
Path: ["assetNodes",26,"projectedLogicalVersion"]
Locations: [{"line":44,"column":3}]
Clovis Warlop
11/24/2022, 4:53 AM
Chris Comeau
11/24/2022, 5:51 AM
Koby Kilimnik
11/24/2022, 6:45 AM
Даниил Конев
11/24/2022, 7:33 AM
Casper Weiss Bang
11/24/2022, 10:36 AM
Casper Weiss Bang
11/24/2022, 11:27 AM