Hello! Having unpredictable behaviour of dagster `...
# ask-community
s
Hello! I'm seeing unpredictable behaviour from Dagster's `context.log` when it is used inside a `concurrent.futures.Future`: it prevents asset materialisation. I wait on every future I have created and, when each one is done, yield an `AssetMaterialization`. But when the function running in the future calls `context.log.info()`, it seems only one `AssetMaterialization` shows up at the end of execution. Tests on the event stream, however, show that all of the assets were in the stream. Here is a code example that reproduces this strange behaviour, made by editing the code generated by `dagster new-project PROJECT_NAME`: https://github.com/oyvsyo/dagster_logger_example I'm very interested in why this is happening. Thanks in advance!
I just found a workaround: create a Python logger with `logging.getLogger('my_logger')`, register it in `dagster.yaml` under `managed_python_loggers`, and pass it to the function. That solves the asset materialisation issue. https://docs.dagster.io/concepts/logging/python-logging
However, it is an interesting case and worth investigating: assets are silently ignored while the tests still pass, which can create hidden bugs that are hard to debug.
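A minimal sketch of the logger setup described above, using only the standard library. The `dagster.yaml` entry shown in the comment is an assumption based on the linked python-logging docs, and `run_tasks` is a hypothetical helper, not code from the example repository. Whether this actually avoids the materialisation problem is revisited later in the thread.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

# Assumed dagster.yaml entry that makes Dagster capture this logger:
#   python_logs:
#     managed_python_loggers:
#       - my_logger
logger = logging.getLogger("my_logger")


def work_logger(i: int, log: logging.Logger) -> int:
    # Log through the plain Python logger instead of context.log.
    log.info("working %d", i)
    return i


def run_tasks(n_tasks: int = 3) -> list:
    # Hypothetical helper: submit the tasks, then collect results in order.
    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(work_logger, i, logger) for i in range(n_tasks)]
        return [f.result() for f in futures]
```

Inside an op, the same `logger` object would be passed to `executor.submit` in place of `context.log`.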
c
hey Stepan. Thanks for the detailed writeup here. It's going to take some time to go through this, but for now I'm going to create an issue regarding logging and futures.
Hey Stepan, unfortunately I wasn't able to reproduce the issue you describe. I wrote up a simple test based around your example, feel free to run it. From what I can see, 6 asset materialization events are logged by the op, which is what one would expect. Where exactly are you seeing the behavioral inconsistency? Something that could be throwing things off is that this example always writes to the same asset key, so the materializations might not be picked up as separate in dagit. If you switched asset keys for every asset materialization, or used partitions along with the asset materializations, perhaps that might clear up some of the confusion? Example test below:
import time
from concurrent.futures import ThreadPoolExecutor
from dagster import op, AssetMaterialization, AssetKey, Nothing, Out, job


def work(i: int, logger):
    time.sleep(3)
    return logger


def work_logger(i: int, logger):
    logger.info(f'working {i}')
    time.sleep(3)
    return logger


def test_op():
    @op(out=Out(Nothing))
    def hello(context):
        """
        An op definition. This example op outputs a single string.
        For more hints about writing Dagster ops, see our documentation overview on Ops:
        https://docs.dagster.io/concepts/ops-jobs-graphs/ops
        """

        futures = []
        executor = ThreadPoolExecutor(1)
        for i in range(3):
            f = executor.submit(work, i, context.log)
            futures.append(f)

        for f in futures:
            context.log.info("waiting future done")
            res = f.result()
            yield AssetMaterialization(
                asset_key=AssetKey('asset'),
                metadata={"logger": str(res)},
            )
            context.log.info(f"future done {res}")

        context.log.warning("Starting jobs with logger in futures")

        futures = []
        for i in range(3):
            f = executor.submit(work_logger, i, context.log)
            futures.append(f)

        for f in futures:
            context.log.info("waiting future done")
            res = f.result()
            yield AssetMaterialization(
                asset_key=AssetKey('asset'),
                metadata={"logger": str(res)},
            )
            context.log.info(f"future done {res}")

        return Nothing

    @job
    def the_job():
        hello()

    result = the_job.execute_in_process()
    assert len(result.asset_materializations_for_node("hello")) == 6
s
Hi Chris! Thank you for the help. As I wrote on GitHub, the example tests run okay. Unfortunately, we can't change the asset key because of an asset sensor that connects this job with another one. The behavioural inconsistency can be seen in the Dagit UI or in the logs produced by running `dagster job execute -f minimal_example/jobs/say_hello.py`. I have recently recreated the asset_sensor behaviour. It yields a SkipReason for every asset and runs nothing, but that lets us track the sensor cursor. (commit) After the pipeline run, the sensor cursor is 4 and not 6 as expected.
c
okay gotcha - let me try and reproduce that
okay, was able to reproduce via dagit. Definitely a weird scenario that only seems to be brought about when logging inside the thread.
s
I found that I was wrong and `managed_python_loggers` is not a solution to the problem. But I found another workaround: use the `concurrent.futures.wait` function to poll for completed futures and yield assets as the results arrive. Here is the commit: https://github.com/oyvsyo/dagster_logger_example/pull/1/files There is also some mismatch in the chronological order of the logger `INFO` and `ASSET_MATERIALIZATION` events, but for me that's OK.
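The polling pattern described above could look roughly like the sketch below. It is not the code from the linked commit: it uses only the standard library, a plain `logging` logger stands in for `context.log`, and `results_as_completed` is a hypothetical name. In a Dagster op, the `yield` would emit an `AssetMaterialization` built from the result instead of the raw value.

```python
import logging
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

logger = logging.getLogger("my_logger")  # stand-in for context.log (assumption)


def work_logger(i: int, log: logging.Logger) -> int:
    log.info("working %d", i)
    time.sleep(0.1)
    return i


def results_as_completed(n_tasks: int = 3):
    """Yield each task's result as soon as its future has finished."""
    with ThreadPoolExecutor(max_workers=2) as executor:
        pending = {executor.submit(work_logger, i, logger) for i in range(n_tasks)}
        while pending:
            # Block until at least one future is done, then drain the done set.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                # In an op: yield AssetMaterialization(...) using future.result().
                yield future.result()
```

Results arrive in completion order rather than submission order, which may be related to the event ordering mismatch mentioned above.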