André Augusto
03/09/2022, 6:13 PM
I have an op that accesses the run_config in it (using context.run_config). When I tried to develop tests for this op, I found out that I cannot set the run_config in build_op_context. Is that correct? If so, is the best way to test it to do something like this?
"""file: test_op.py"""
@graph
def new_job_to_test_op(inputs):
op_to_be_tested(inputs)
def test_op():
run_config = ...
testing_op = job_to_test_op.to_job()
testing_op.execute_in_process(run_config=run_config)
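For reference, a fuller self-contained sketch of that pattern, with invented op/graph names and config keys; it assumes the op reads context.run_config, which (per the question) build_op_context does not let you populate:

from dagster import graph, op

@op(config_schema={"limit": int})
def op_to_be_tested(context):
    # read straight from the full run config, as described above
    return context.run_config["ops"]["op_to_be_tested"]["config"]["limit"]

@graph
def graph_to_test():
    op_to_be_tested()

def test_op_via_job():
    run_config = {"ops": {"op_to_be_tested": {"config": {"limit": 10}}}}
    result = graph_to_test.to_job().execute_in_process(run_config=run_config)
    assert result.success
    assert result.output_for_node("op_to_be_tested") == 10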
Leo Kell
03/09/2022, 8:34 PM
Does the ScheduleDefinition solid_selection parameter work for op names? If any ops belong to nested graphs, should we include the graph name in the solid_selection? I'm having trouble getting this to work: whenever I put a list of op names inside solid_selection (with or without the nested graph names), the schedule's launched runs include all of the ops, not just the selected ones.
Anoop Sharma
03/09/2022, 10:30 PM
Ben Gatewood
03/10/2022, 6:02 AM
I'm trying to use the s3_pickle_asset_io_manager. I've configured it as per the doc page and it's popped up in dagit, but when I try to materialise it, it complains about missing config for the s3 io manager (fair enough, I haven't given it any). Where on earth am I supposed to provide that, though?
Ben Gatewood
03/10/2022, 6:02 AM
Kalyan katamreddi
03/10/2022, 8:15 AM
with build_op_context(
    resources={'ge_data_context': ge_data_context},
    resources_config={
        'ge_data_context': {
            'config': {
                'ge_root_dir': file_relative_path(
                    __file__,
                    '../great_expectations'
                )
            }
        }
    }
) as context:
    ge = ge_validation_op_factory(
        name='great_expecations',
        datasource_name='test_data',
        suite_name='target.warning',
    )(context=context)
    res = ge(df)
    if res['success']:
        return df
    return ValueError
Hi all,
I have tried invoking ge_validation_op_factory using build_op_context as a context manager, but I'm facing the error below:
"No value provided for required input 'dataset'"
Please let me know where I can provide the dataset.
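One guess, for illustration only: the generated op appears to declare an input named dataset (per the error), so when invoking it directly you would pass the DataFrame as that argument rather than calling the op's return value again. A minimal sketch, reusing the names from the snippet above:

ge_op = ge_validation_op_factory(
    name='great_expecations',
    datasource_name='test_data',
    suite_name='target.warning',
)

with build_op_context(
    resources={'ge_data_context': ge_data_context},
    resources_config={
        'ge_data_context': {
            'config': {'ge_root_dir': file_relative_path(__file__, '../great_expectations')}
        }
    }
) as context:
    # pass df as the op's "dataset" input when invoking the op directly
    res = ge_op(context, dataset=df)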
beenin
03/10/2022, 9:27 AM
Yoann Couble
03/10/2022, 10:59 AM
Juan
03/10/2022, 3:12 PM
Roel Hogervorst
03/10/2022, 3:22 PM
Previously I could template queries with {{ }} and also see the rendered SQL; is something like that possible with Dagster?
Steven Tran
03/10/2022, 4:37 PM
Travis Cole
03/10/2022, 6:28 PM
@op(
    required_resource_keys={'warehouse', 'snowflake_db'},
    out=Out(
        io_manager_key="snowflake_io_manager",
        metadata={"table": "DAGSTER.CHATS", "partitioned": False},
    ),
)
def load_data_chats(context, incremental=True, table='chats'):
    if incremental:
        max_update_date = str(context.resources.snowflake_db.get_max_updated_at(table=table, schema='DAGSTER'))
        return context.resources.warehouse.incremental_load_from_table(table, max_update_date)
    else:
        return context.resources.warehouse.full_load_from_table(table)
Jay Ragbeer
03/10/2022, 6:54 PM
Is there a way to use if __name__ == "__main__": with a scheduled job?
I think the docs show a way of using that syntax with a job, but only when executing a job outside of a schedule?
Depending on where I put it, I get errors such as "NameError: name 'mp_func' is not defined" or "dagster.core.errors.DagsterInvariantViolationError: home_repository not found at module scope in file C:/Users/Jay/PycharmProjects/dagster_home/home_scheduler.py."
My end goal is to schedule a multithreaded/multiprocessed task (using Dask).
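For illustration, a rough sketch of the layout that usually avoids both of those errors (the op body and cron string are placeholders; mp_func and home_repository are taken from the error messages): the job, schedule, and repository live at module scope, and the __main__ guard is only used when running the file directly.

from dagster import ScheduleDefinition, job, op, repository

@op
def mp_func():
    # placeholder for the real multithreaded/multiprocessed (Dask) work
    ...

@job
def my_dask_job():
    mp_func()

my_schedule = ScheduleDefinition(job=my_dask_job, cron_schedule="0 * * * *")

@repository
def home_repository():
    # must be importable at module scope so the daemon/dagit can load it
    return [my_dask_job, my_schedule]

if __name__ == "__main__":
    # only runs when executing this file directly; the scheduler never enters this branch
    my_dask_job.execute_in_process()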
Solaris Wang
03/10/2022, 8:10 PM
Stephen Bailey
03/10/2022, 8:10 PM
Can someone show me how to use the snowflake_resource and pandas to construct dataframes, or at least why this doesn't work?
import pandas
from dagster import job, op
from dagster_snowflake import snowflake_resource

@op(required_resource_keys={'snowflake'})
def get_df(context):
    conn = context.resources.snowflake.get_connection()
    return pandas.read_sql_query('select 1 as foo', conn)

@op
def print_df(context, data):
    context.log.info(data)

@job(resource_defs={"snowflake": snowflake_resource})
def my_snowflake_job():
    df = get_df()
    print_df(df)
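A hedged guess at the likely culprit: in the versions of dagster-snowflake I've seen, get_connection is a context manager rather than a bare connection object, so the query has to run inside a with block. A sketch of that variant:

@op(required_resource_keys={'snowflake'})
def get_df(context):
    # get_connection() yields the live connection; use it inside the with block
    with context.resources.snowflake.get_connection() as conn:
        return pandas.read_sql_query('select 1 as foo', conn)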
Stephen Bailey
03/10/2022, 8:14 PM
Also, how do you pass configuration to a resource? The snowflake_resource shown here always seems to default to a certain set of environment variables (snowflake_username, snowflake_password, etc.). How do I override those settings?
@job(resource_defs={"snowflake": snowflake_resource})
def my_snowflake_job():
    df = get_df()
    print_df(df)
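One possible way, sketched under the assumption that the resource accepts the usual account/user/password/database/warehouse config fields (the values below are placeholders), is to wrap the resource with .configured() before handing it to the job:

from dagster import job
from dagster_snowflake import snowflake_resource

configured_snowflake = snowflake_resource.configured(
    {
        "account": {"env": "SNOWFLAKE_ACCOUNT"},      # pull from a specific env var
        "user": "my_user",                            # or hard-code per environment
        "password": {"env": "MY_SNOWFLAKE_PASSWORD"},
        "database": "MY_DATABASE",
        "warehouse": "MY_WAREHOUSE",
    }
)

@job(resource_defs={"snowflake": configured_snowflake})
def my_snowflake_job():
    df = get_df()
    print_df(df)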
Stephen Bailey
03/10/2022, 8:32 PM
Does resource_defs overwrite all config variables? If I define default configuration but supply a resource_def, it seems to overwrite my default configs.
For example, the below yields the following in Dagster:
defaults = {
    "ops": {
        "write_df": {
            "config": {
                "table": "my_table",
                "schema": "my_schema",
            }
        }
    },
    "resources": {
        "snowflake": {
            "config": {
                "user": "foo",
                "password": "bar"
            }
        }
    }
}

@job(resource_defs={"snowflake": snowflake_resource}, config=defaults)
def my_snowflake_job():
    df = get_df()
    print_df(df)
    write_df(df)
LaurentS
03/10/2022, 8:34 PM
I've started getting the error below when launching runs. I can docker pull the image with no problem, so is there some extra config I need to pass to dagit or the daemon so they can access the container registry? (I'm saying "I've started..." because it only appeared since I upgraded to 0.14.3 from 0.13.19, but my config was pretty messy, so it might have been hidden behind other problems.)
docker.errors.APIError: 500 Server Error for http+docker://localhost/v1.41/images/create?tag=49a20c1d&fromImage=registry.gitlab.com%2F[myrepo]%2Fdagster_user_code: Internal Server Error ("Head "https://registry.gitlab.com/v2/[myrepo]/dagster_user_code/manifests/49a20c1d": denied: access forbidden")
File "/usr/local/lib/python3.9/site-packages/dagster/core/instance/__init__.py", line 1575, in launch_run
self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
File "/usr/local/lib/python3.9/site-packages/dagster_docker/docker_run_launcher.py", line 149, in launch_run
self._launch_container_with_command(run, docker_image, command)
File "/usr/local/lib/python3.9/site-packages/dagster_docker/docker_run_launcher.py", line 107, in _launch_container_with_command
client.images.pull(docker_image)
File "/usr/local/lib/python3.9/site-packages/docker/models/images.py", line 444, in pull
pull_log = self.client.api.pull(
File "/usr/local/lib/python3.9/site-packages/docker/api/image.py", line 428, in pull
self._raise_for_status(response)
File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 270, in _raise_for_status
raise create_api_error_from_http_exception(e)
File "/usr/local/lib/python3.9/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
raise cls(e, response=response, explanation=explanation)
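In case it helps, a sketch of the kind of dagster.yaml addition that might apply here; I believe the DockerRunLauncher accepts an optional registry block with credentials, but treat the shape and the env var names below as assumptions to verify against your version:

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    registry:
      url: registry.gitlab.com
      username:
        env: GITLAB_DEPLOY_USER
      password:
        env: GITLAB_DEPLOY_TOKEN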
mrdavidlaing
03/11/2022, 11:24 AM
What's the right way to pass a pandas.DataFrame to a graph() when testing?
The "old" execute_solid() works for me:
result = execute_solid(
    merge_fact_entitlements,  # a graph() expecting 2 dataframe inputs
    mode_def=inmemory_mode,
    raise_on_error=False,
    input_values={
        'df_from_booking_orders': df_bo,  # a pandas DataFrame
        'df_from_purchase_orders': df_po,  # a pandas DataFrame
    },
)
But trying to do the same thing using graph.to_job().execute_in_process() fails whilst trying to coerce the DataFrame inputs (see 🧵 for full stacktrace):
result = merge_fact_entitlements.to_job(  # a graph() expecting 2 dataframe inputs
    name="merge_fact_entitlements_job",
    resource_defs={
        'io_manager': mem_io_manager,
        ..snip..
    },
).execute_in_process(
    raise_on_error=False,
    run_config={'inputs': {
        'df_from_booking_orders': df_bo,  # a pandas DataFrame
        'df_from_purchase_orders': df_po,  # a pandas DataFrame
    }}
)
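For illustration, one workaround pattern (sketched with invented loader-op names, and assuming df_bo and df_po are available to the test module): since run_config is parsed as config and can't carry live Python objects like DataFrames, wrap the graph in a job whose source ops return the in-memory DataFrames instead of passing them through run_config.

from dagster import job, mem_io_manager, op

@op
def load_booking_orders():
    return df_bo  # the test DataFrame, e.g. from a pytest fixture

@op
def load_purchase_orders():
    return df_po

@job(resource_defs={"io_manager": mem_io_manager})
def merge_fact_entitlements_test_job():
    # the graph under test composes inside the job like any other node
    merge_fact_entitlements(load_booking_orders(), load_purchase_orders())

result = merge_fact_entitlements_test_job.execute_in_process(raise_on_error=False)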
Jori Geysen
03/11/2022, 12:00 PM
I've used the python_logs option in the dagster.yaml to stream the python logs to dagit and it works great. Really pleased about that. I've mounted the dagster.yaml in my dagit container and, doing so, I'm setting a global logging level for all UCD containers.
However, I'm wondering if it would be possible to set the logging level for each UCD separately? E.g. by mounting a separate .yaml file in each UCD container. Is this at all supported/possible?
Thanks, J.
Anoop Sharma
03/11/2022, 3:14 PM
Charles Lariviere
03/11/2022, 4:03 PM
Is it possible to set an asset's namespace dynamically at execution time? For example, I want to define assets that represent Snowflake tables with the following namespace:
@asset(
    namespace=["snowflake", <database>, <schema>],
)
def users() -> SnowflakeTable:
    ...
In this example, I would like <database> and <schema> to be set dynamically depending on the execution context. For instance, if this is run in dev or prod, the database and schema would be different.
With the current @op framework, I'm achieving that by yielding an AssetMaterialization which dynamically sets the asset key based on the current context; can we do something similar with the new @asset? Or am I thinking about this the wrong way?
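For context, a rough sketch of the @op-based pattern being described (the config fields and the table-building line are placeholders I've made up):

from dagster import AssetMaterialization, Output, op

@op(config_schema={"database": str, "schema": str})
def users(context):
    database = context.op_config["database"]
    schema = context.op_config["schema"]
    table = {"rows": []}  # stand-in for building the real Snowflake table
    # the asset key is assembled at run time, so dev and prod runs report different keys
    yield AssetMaterialization(asset_key=["snowflake", database, schema, "users"])
    yield Output(table)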
Bernardo Cortez
03/11/2022, 4:09 PM
sandy
03/11/2022, 4:25 PM
Evan Arnold
03/11/2022, 9:27 PM
ModuleNotFoundError: No module named 'ipython_genutils'
Evan Arnold
03/11/2022, 9:37 PM
Alexander Butler
03/12/2022, 3:31 AM
Quy
03/12/2022, 12:07 PM
Is there a way to combine config_from_files and partitioned_config? To illustrate:
config=[
    partitioned_config,
    config_from_files(
        [file_relative_path(__file__, os.path.join("..", "..", "run_config", "pipeline.yaml"))]
    )
]
In pipeline.yaml, I want to configure the io_manager resource, e.g. s3_bucket …
Francis
03/12/2022, 2:31 PMlocal_artifact_storage:
module: dagster.core.storage.root
class: LocalArtifactStorage
config:
base_dir: [$DAGSTER_HOME/storage]
Or alternatively, is there a way to store artifacts in cloud storage, like an S3 bucket?
03/12/2022, 4:52 PMop
in order to review them for a failed run, what is the commonly used method? (either through Dagster or outside)
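One option that I believe covers this, sketched with made-up op names: the built-in fs_io_manager persists every op output to disk under a base directory, so outputs of ops that ran before the failure can be inspected afterwards.

from dagster import fs_io_manager, job, op

@op
def make_data():
    return {"rows": [1, 2, 3]}  # stand-in for the real intermediate output

@op
def fail_downstream(data):
    raise Exception("boom")  # simulate a failure; make_data's output is already on disk

@job(resource_defs={"io_manager": fs_io_manager.configured({"base_dir": "/tmp/dagster_outputs"})})
def review_job():
    fail_downstream(make_data())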