Johannes Müller
08/29/2023, 1:36 PMfrom unittest import mock
from unittest.mock import patch
from dagster import (
op,
job,
)
from dagster_slack import slack_resource
from dagster._core.execution.plan.compute_generator import invoke_compute_fn
@op(required_resource_keys={"slack"})
def my_op(context):
print(f'Inside the real operation: {context.op.name}')
context.resources.slack.chat_postMessage(channel="#test42", text="test")
@op(required_resource_keys={"slack"})
def my_second_op(context):
print(f'Inside the real second operation: {context.op.name}')
@job(resource_defs={"slack": slack_resource.configured({"token": "xoxb-123456"})})
def my_job():
my_op()
my_second_op()
def test_my_job():
"""Mock the context of the operation my_op within a job and completely mock the operation my_second_op."""
mocked_context = mock.MagicMock()
my_second_op_mock = mock.MagicMock()
def invoke_op_with_mocked_context(fn, context, kwargs, context_arg_provided, config_arg_cls, resource_args=None):
# Mock specific operations (or mock all of them by replacing fn).
if context.op.name == 'my_second_op':
fn = my_second_op_mock
# Use this to mock the context for all operations
context = mocked_context
return invoke_compute_fn(fn, context, kwargs, context_arg_provided, config_arg_cls, resource_args)
with patch('dagster._core.execution.plan.compute_generator.invoke_compute_fn') as mocked_invoke_compute_fn:
mocked_invoke_compute_fn.side_effect = invoke_op_with_mocked_context
my_job.execute_in_process()
assert mocked_context.resources.slack.chat_postMessage.called
assert my_second_op_mock.called
Johannes Müller
08/29/2023, 1:40 PMmy_job.execute_in_process(resources={'slack': mock.MagicMock()})
Johannes Müller
08/29/2023, 1:42 PM