Stepan Dvoiak
06/30/2022, 2:34 PM
When I use context.log from a concurrent.futures.Future, it prevents asset materialisation. I'm trying to wait on every future I have created and, when it's done, yield an AssetMaterialization. But it seems that when I call context.log.info() inside the future's function, there is only 1 AssetMaterialization at the end of execution.
Also, the event stream in tests shows that all the assets were in the stream.
Here is a code example recreating this strange behaviour; it was made by editing the code generated by dagster new-project PROJECT_NAME:
https://github.com/oyvsyo/dagster_logger_example
I'm very interested in why this is happening. Thanks in advance!
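(For reference, the materializations in the event stream can be checked in a test roughly like this; the import path, job name, and op name below are guesses based on the example repo's layout, not its exact code.)

from minimal_example.jobs.say_hello import say_hello_job  # hypothetical import path / job name

result = say_hello_job.execute_in_process()
# ASSET_MATERIALIZATION events recorded for the op in the event stream.
materializations = result.asset_materializations_for_node("hello")  # "hello" is a placeholder op name
print(len(materializations))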
Creating a dedicated Python logger with logging.getLogger('my_logger'), registering it in dagster.yaml as managed_python_loggers, and passing it to the function will solve the asset materialisation issue.
https://docs.dagster.io/concepts/logging/python-logging
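Roughly, that approach looks like this (a minimal sketch, not the exact change from the example repo; the logger name, op name, and asset key are placeholders):

import logging
from concurrent.futures import ThreadPoolExecutor

from dagster import AssetKey, AssetMaterialization, Nothing, Out, Output, op

# Plain Python logger; Dagster captures its records once it is registered
# in dagster.yaml, e.g.:
#   python_logs:
#     managed_python_loggers:
#       - my_logger
my_logger = logging.getLogger("my_logger")


def work(i, logger):
    # Runs inside a worker thread and logs through the plain logger
    # instead of context.log.
    logger.info("working %s", i)
    return i


@op(out=Out(Nothing))
def hello_with_plain_logger(context):
    executor = ThreadPoolExecutor(max_workers=1)
    futures = [executor.submit(work, i, my_logger) for i in range(3)]
    for f in futures:
        yield AssetMaterialization(
            asset_key=AssetKey("asset"),
            metadata={"result": f.result()},
        )
    yield Output(None)  # satisfy the op's Nothing output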
chris
06/30/2022, 7:41 PM
import time
from concurrent.futures import ThreadPoolExecutor

from dagster import op, AssetMaterialization, AssetKey, Nothing, Out, job


def work(i: int, logger):
    time.sleep(3)
    return logger


def work_logger(i: int, logger):
    logger.info(f"working {i}")
    time.sleep(3)
    return logger


def test_op():
    @op(out=Out(Nothing))
    def hello(context):
        """
        An op definition. This example op outputs a single string.

        For more hints about writing Dagster ops, see our documentation overview on Ops:
        https://docs.dagster.io/concepts/ops-jobs-graphs/ops
        """
        futures = []
        executor = ThreadPoolExecutor(1)
        for i in range(3):
            f = executor.submit(work, i, context.log)
            futures.append(f)
        for f in futures:
            context.log.info("waiting future done")
            res = f.result()
            yield AssetMaterialization(
                asset_key=AssetKey("asset"),
                metadata={"logger": str(res)},
            )
            context.log.info(f"future done {res}")

        context.log.warning("Starting jobs with logger in futures")
        futures = []
        for i in range(3):
            f = executor.submit(work_logger, i, context.log)
            futures.append(f)
        for f in futures:
            context.log.info("waiting future done")
            res = f.result()
            yield AssetMaterialization(
                asset_key=AssetKey("asset"),
                metadata={"logger": str(res)},
            )
            context.log.info(f"future done {res}")

        return Nothing

    @job
    def the_job():
        hello()

    result = the_job.execute_in_process()
    assert len(result.asset_materializations_for_node("hello")) == 6
Stepan Dvoiak
07/01/2022, 11:19 AM
dagster job execute -f minimal_example/jobs/say_hello.py
chris
07/01/2022, 9:26 PM
Stepan Dvoiak
07/13/2022, 3:38 PM
managed_python_loggers is not a solution to the problem.
But I found another workaround: use the concurrent.futures.wait function to poll for ready futures and yield assets as their results come in. Here is the commit:
https://github.com/oyvsyo/dagster_logger_example/pull/1/files
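For illustration, a rough sketch of that wait-based pattern (simplified relative to the linked PR; the op name and asset key are placeholders):

from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

from dagster import AssetKey, AssetMaterialization, Nothing, Out, Output, op


def work(i):
    return i


@op(out=Out(Nothing))
def hello_with_wait(context):
    executor = ThreadPoolExecutor(max_workers=1)
    pending = {executor.submit(work, i) for i in range(3)}
    while pending:
        # Block until at least one future finishes, then yield a
        # materialization for each result that is ready.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for f in done:
            yield AssetMaterialization(
                asset_key=AssetKey("asset"),
                metadata={"result": f.result()},
            )
    yield Output(None)  # satisfy the op's Nothing output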
Also, the chronological order of the logger INFO and ASSET_MATERIALIZATION events is sometimes mismatched, but that's OK for me.