Arun Kumar
05/13/2021, 11:44 PMPeter B
05/14/2021, 3:07 AMassaf
05/14/2021, 3:47 PMdb.t3.micro, which is grossly underpowered. I recently scaled it up to db.m6g.large, restarted the Dagit deployment, yet the issue persists.
I'd like some clues on how exactly this issue appears, and what I may be able to do about it.Cameron Gallivan
05/14/2021, 10:14 PMExpected type '_SpecialForm[bool]', got 'bool' instead
It mostly just happens when constructing an OutputDefinition, but I've seen it trigger in the @resource decorator as well. The only real problem it causes is that it makes it more tedious to spot the inspections that actually matter.
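(A possible workaround, assuming the warning comes from PyCharm's type-checker inspection tripping over Dagster's annotations rather than from a real type error: suppress the inspection per statement. The OutputDefinition arguments below are only illustrative.)

from dagster import OutputDefinition

# noinspection PyTypeChecker
passed_output = OutputDefinition(dagster_type=bool, name="passed", is_required=False)

Benj Smith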
05/16/2021, 2:21 AMdagster api grpc --python-file pipelines/my_repo.py --attribute my_repository --host 127.0.0.1 --port 8889
But every pipeline fails with:
AttributeError: module 'grpc' has no attribute 'insecure_channel'
Seen this before?
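(One possible diagnostic, assuming, and this is only an assumption, that a local module or directory named grpc is shadowing the real grpcio package in the environment that runs the gRPC server:)

import grpc

print(grpc.__file__)                      # should point into site-packages, not your project
print(hasattr(grpc, "insecure_channel"))  # True for the real grpcio package

john eipe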
05/16/2021, 8:00 AMeg:
task-group  seq
A           1
B           1
C           2

task  seq  task-group
a1    1    A
a2    1    A
b1    1    B
b2    2    B
c3    1    C
task-group is only a logical wrapper that helps group and control the execution flow; maybe composite solids are a fit here.
Tasks are essentially solids, say:
@solid
def a1():
    pass
Execution flow:
Task groups A and B start in parallel (seq=1), and tasks a1 and a2 run in parallel (seq=1), but b2 runs only after b1 is complete. c3 starts after all tasks in task groups A and B are complete.
Now, based on this metadata that resides in a file or DB, is it possible to create a dynamic pipeline?
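(A rough sketch of one way this might work, assuming the metadata has already been read from the file or DB into a plain dict; TASKS, make_task_solid, and the pipeline name are illustrative, not an established Dagster recipe.)

from dagster import InputDefinition, Nothing, pipeline, solid

# task -> upstream tasks, derived from the seq / task-group metadata above
TASKS = {
    "a1": [],
    "a2": [],
    "b1": [],
    "b2": ["b1"],
    "c3": ["a1", "a2", "b1", "b2"],
}

def make_task_solid(name, upstreams):
    # Nothing-typed inputs express ordering without passing any data
    input_defs = [InputDefinition(up, Nothing) for up in upstreams]

    @solid(name=name, input_defs=input_defs)
    def _task(context):
        context.log.info(f"running task {name}")

    return _task

solid_defs = {name: make_task_solid(name, ups) for name, ups in TASKS.items()}

@pipeline
def metadata_driven_pipeline():
    results = {}
    for name, ups in TASKS.items():
        results[name] = solid_defs[name](**{up: results[up] for up in ups})

Drew Sonne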
05/17/2021, 12:29 PMdagster_pandas. I haven't done any development on dagster before, but I created a PR. I'm not sure how to go about this from here on out: https://github.com/dagster-io/dagster/pull/4179Drew Sonne
05/17/2021, 12:37 PMAn unexpected exception was thrown. Please file an issue.
TypeError: Object of type Timestamp is not JSON serializable
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 189, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 311, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 362, in _type_check_and_store_output
for output_event in _type_check_output(step_context, step_output_handle, output, version):
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 229, in _type_check_output
yield DagsterEvent.step_output_event(
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/events/__init__.py", line 580, in step_output_event
return DagsterEvent.from_step(
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/events/__init__.py", line 291, in from_step
log_step_event(step_context, event)
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/events/__init__.py", line 197, in log_step_event
log_fn(
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/log_manager.py", line 234, in debug
return self._log(logging.DEBUG, msg, kwargs)
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/log_manager.py", line 204, in _log
logger_.log(level, message, extra=extra)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1512, in log
self._log(level, msg, args, **kwargs)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1589, in _log
self.handle(record)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1599, in handle
self.callHandlers(record)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 1661, in callHandlers
hdlr.handle(record)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/logging/__init__.py", line 954, in handle
self.emit(record)
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 110, in emit
self._instance.handle_new_event(event)
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 1065, in handle_new_event
self._event_storage.store_event(event)
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/storage/event_log/sqlite/sqlite_event_log.py", line 217, in store_event
insert_event_statement = self.prepare_insert_event(event)
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 81, in prepare_insert_event
event=serialize_dagster_namedtuple(event),
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/serdes/serdes.py", line 174, in serialize_dagster_namedtuple
return _serialize_dagster_namedtuple(nt, whitelist_map=_WHITELIST_MAP, **json_kwargs)
File "/Users/drew/.pyenv/versions/3.8.9/envs/presentient-dags/lib/python3.8/site-packages/dagster/serdes/serdes.py", line 180, in _serialize_dagster_namedtuple
return seven.json.dumps(_pack_value(nt, whitelist_map), **json_kwargs)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/__init__.py", line 234, in dumps
return cls(
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/Users/drew/.pyenv/versions/3.8.9/lib/python3.8/json/encoder.py", line 179, in default raise TypeError(f'Object of type {o.__class__.__name__} '
I have just started going down the path of a custom materializer, but then thought I should ask if there's any guidance on how to handle this?
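(A small, hedged sketch in case it helps: if the Timestamp is reaching the event log through output metadata, which is an assumption, the traceback alone doesn't show where it originates, converting it to a string before yielding keeps the event JSON-serializable.)

import pandas as pd
from dagster import EventMetadataEntry, Output, solid

@solid
def emit_with_timestamp_metadata(context):
    ts = pd.Timestamp.now()
    yield Output(
        value=1,
        metadata_entries=[EventMetadataEntry.text(ts.isoformat(), "processed_at")],
    )

Drew Sonne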
05/17/2021, 12:38 PMszalai1
05/17/2021, 3:31 PMHow do intermediate_storage_defs relate to a pipeline-level io_manager?
Our current mode_defs have intermediate storage, and in some places we use an io_manager in solids. If I set resources.io_manager, can/should I delete intermediate_storage_defs from the mode definitions?
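(A minimal sketch of what the io_manager-only mode might look like, assuming the goal is to drop intermediate_storage_defs once every output is handled by an IO manager; fs_io_manager is just a stand-in for whatever manager you actually use.)

from dagster import ModeDefinition, fs_io_manager

mode_defs = [
    ModeDefinition(
        name="default",
        # the "io_manager" resource key covers all solid outputs unless a
        # specific output sets its own io_manager_key
        resource_defs={"io_manager": fs_io_manager},
    )
]

jeremy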
05/17/2021, 8:23 PMCharles Lariviere
05/18/2021, 5:06 PMStatus: Terminated and Reason: Completed.
An exception was thrown during execution that is likely a framework error, rather than an error in user code.
Jamie
05/18/2021, 6:26 PMMakoto
05/18/2021, 6:39 PMstate_modified_config_in_test = {"project-dir": PROJECT_DIR, "models": ["state:modified"], "target": "test"}
run_only_changed_models_in_test = dbt_cli_run.configured(
    state_modified_config_in_test, name="run_only_changed_models_in_test"
)
run_all_config_in_test = {"project-dir": PROJECT_DIR, "target": "test"}
run_all_models_in_test = dbt_cli_run.configured(run_all_config_in_test, name="run_all_models_in_test")
I want to be able to conditionally run one of them depending on whether the run directory exists under the target directory. I read up on conditional branching, but what I'm not sure how to do is conditionally invoke one of the dbt solids. I tried using a composite solid to pass in the yielded output, but since the dbt solid does not take any input arguments, I get the unused-input error for the composite solid. Is there a way to work around this, or a better way to achieve it?
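(A hedged workaround sketch, not necessarily the recommended pattern: instead of branching between two no-input dbt solids, do the check inside a single solid and shell out to dbt directly. PROJECT_DIR and the flags mirror the configs above; the state:modified selector may additionally need --state or DBT_ARTIFACT_STATE_PATH depending on the dbt setup.)

import os
import subprocess

from dagster import solid

PROJECT_DIR = "/path/to/dbt/project"  # stand-in for the PROJECT_DIR constant used above

@solid
def run_dbt_conditionally(context):
    args = ["dbt", "run", "--project-dir", PROJECT_DIR, "--target", "test"]
    if os.path.isdir(os.path.join(PROJECT_DIR, "target", "run")):
        args += ["--models", "state:modified"]
    context.log.info("Running: " + " ".join(args))
    subprocess.run(args, check=True)

Kirk Stennett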
05/18/2021, 7:17 PMJenny Webster
05/19/2021, 3:58 PMBillie Thompson
05/19/2021, 4:35 PMChristian Lam
05/19/2021, 5:18 PMThomas
05/19/2021, 8:12 PMKirk Stennett
05/19/2021, 8:49 PMKirk Stennett
05/20/2021, 12:20 AMAn exception was thrown during execution that is likely a framework error, rather than an error in user code.
dagster.core.errors.DagsterInvariantViolationError: Could not find pipeline 'redshift_checks'. Found ['y', 'z']
This is happening on a pipeline whose first solid yields a DynamicOutput, which I think is somewhat related to this. For whatever reason it runs fine when I execute it locally with a preset and mode not listed here, but when I run it in k8s with this config it returns the above error. It loads fine in dagit & the playground, but not when execution is launched. I'm using version 0.11.3.
Pipeline def:
@pipeline(
    mode_defs=[
        ModeDefinition(
            name="production",
            resource_defs={
                "redshift": redshift_resource,
                "slack": slack_resource,
                "s3": s3_resource,
                "io_manager": s3_pickle_io_manager,
            },
            executor_defs=default_executors + [celery_k8s_job_executor],
        ),
    ],
    preset_defs=[
        PresetDefinition.from_files(
            name="celery_k8s",
            mode="production",
            config_files=[
                'celery_k8s.yaml',
                'redshift_config.yaml',   # contains the env info for the redshift resource
                'redshift_queries.yaml',  # contains the list of dicts to run queries
            ],
        ),
    ],
)
def redshift_checks():
    redshift_yaml_map().map(redshift_data_check)
could this be related to the dynamic output with k8s processing / is this something that updating would solve?Rubén Lopez Lozoya
05/20/2021, 7:25 AMdagster.core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "solid_publish_contract_info_response":
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 190, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 317, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 369, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 484, in _store_output
handle_output_res = output_manager.handle_output(output_context, output.value)
File "/usr/local/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/errors.py", line 197, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
dagster_postgres.utils.DagsterPostgresException: too many retries for DB connection
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/errors.py", line 187, in user_code_error_boundary
yield
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 484, in _store_output
handle_output_res = output_manager.handle_output(output_context, output.value)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/storage/fs_io_manager.py", line 97, in handle_output
context.log.debug(f"Writing file at: {filepath}")
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/log_manager.py", line 234, in debug
return self._log(logging.DEBUG, msg, kwargs)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/log_manager.py", line 204, in _log
logger_.log(level, message, extra=extra)
File "/usr/local/lib/python3.8/logging/__init__.py", line 1512, in log
self._log(level, msg, args, **kwargs)
File "/usr/local/lib/python3.8/logging/__init__.py", line 1589, in _log
self.handle(record)
File "/usr/local/lib/python3.8/logging/__init__.py", line 1599, in handle
self.callHandlers(record)
File "/usr/local/lib/python3.8/logging/__init__.py", line 1661, in callHandlers
hdlr.handle(record)
File "/usr/local/lib/python3.8/logging/__init__.py", line 954, in handle
self.emit(record)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 152, in emit
self._instance.handle_new_event(event)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 1087, in handle_new_event
self._event_storage.store_event(event)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster_postgres/event_log/event_log.py", line 139, in store_event
with self._connect() as conn:
File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 157, in create_pg_connection
conn = retry_pg_connection_fn(engine.connect)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster_postgres/utils.py", line 124, in retry_pg_connection_fn
raise DagsterPostgresException("too many retries for DB connection") from exc
Liam Coatman
05/20/2021, 10:50 AMJean-Pierre M
05/20/2021, 1:32 PMMuthu
05/20/2021, 7:16 PMscheduler
?szalai1
05/21/2021, 12:35 PMconfig_schema (arbitrary types).
As a workaround we can use inputs and configure them in the run_config, but I would like to do it in code.
Can I codify solid static inputs (not in run_config)?
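(One possible way to codify a static input, sketched under the assumption that a hard-coded default is acceptable: InputDefinition accepts a default_value, which can be an arbitrary Python object and needs no config schema. All names below are illustrative.)

from dagster import InputDefinition, pipeline, solid

DEFAULT_SETTINGS = {"threshold": 0.5, "labels": ["a", "b"]}  # arbitrary Python object

@solid(input_defs=[InputDefinition("settings", default_value=DEFAULT_SETTINGS)])
def process(context, settings):
    context.log.info(str(settings))

@pipeline
def static_input_pipeline():
    process()

jeremy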
05/21/2021, 7:15 PMPiotr Dworzyński
05/22/2021, 6:01 PMPiotr Dworzyński
05/23/2021, 4:19 PMMemoizableIOManager.has_output. For example, the below pipeline (self-contained) runs without issues when the MEMOIZED_RUN_TAG is not applied, but fails when it is. This is due to MyIOManager's has_output having its context.resources set to None despite it being dependent on the dataset resource. However, in the MyIOManager.load_input and MyIOManager.handle_output functions, context.resources.dataset is set (as you can see when running the pipeline without the MEMOIZED_RUN_TAG). Is there any way around this issue?
import dagstermill as dm
from dagster import ModeDefinition, pipeline, OutputDefinition, solid, Any
from dagster.utils import script_relative_path
from dagster import IOManager, io_manager, resource, make_values_resource
from dagster.core.storage.memoizable_io_manager import MemoizableIOManager
from dagster.core.storage.tags import MEMOIZED_RUN_TAG
import pandas as pd
import pickle
import os
output_versions = {
    "df": "1"
}

class MyIOManager(MemoizableIOManager):
    def _get_obj_dir_path(self, context):
        client_id = context.resources.dataset["client_id"]
        dataset_id = context.resources.dataset["dataset_id"]
        #
        #invoice_line_df_version = context.resources.dataset["invoice_line_df_version"]
        obj_dir_path = f"/tmp/{client_id}/{dataset_id}/"
        return obj_dir_path

    def _get_solid_version(self, context, obj_name):
        # This is used as dagstermill doesn't allow (yet?) for versioning of solids
        if context.version is not None:
            solid_version = context.version
        else:
            if obj_name in output_versions:
                solid_version = output_versions[obj_name]
            else:
                solid_version = "0"
        return solid_version

    def handle_output(self, context, obj):
        # name is the name given to the OutputDefinition that we're storing for
        obj_name = context.name
        obj_dir_path = self._get_obj_dir_path(context)
        solid_version = self._get_solid_version(context, obj_name)
        if not os.path.exists(obj_dir_path):
            os.makedirs(obj_dir_path)
        with open(obj_dir_path + obj_name + f"_v{solid_version}" + ".pickle", "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        # upstream_output.name is the name given to the OutputDefinition that we're loading for
        obj_name = context.upstream_output.name
        solid_version = self._get_solid_version(context, obj_name)
        file_path = self._get_obj_dir_path(context) + obj_name + f"_v{solid_version}.pickle"
        with open(file_path, "rb") as f:
            return pickle.load(f)

    def has_output(self, context):
        print(context)
        obj_name = context.name
        obj_dir_path = self._get_obj_dir_path(context)
        solid_version = self._get_solid_version(context, obj_name)
        return os.path.exists(obj_dir_path + obj_name + f"_v{solid_version}" + ".pickle")

@io_manager(required_resource_keys={"dataset"})
def my_io_manager(_):
    return MyIOManager()

@solid
def test_solid():
    return pd.DataFrame.from_records([{"a": 1, "b": 2}, {"a": 2, "b": 3}])

@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={"io_manager": my_io_manager, "dataset": make_values_resource()}
        )
    ],
    #tags={MEMOIZED_RUN_TAG: "true"}
)
def ingestion_pipeline():
    test_solid()
Example config:
resources:
  dataset:
    config:
      client_id: test-client
      dataset_id: test-dataset
Daniel Kim
05/24/2021, 2:08 AMbase_dir in a dagstermill example when producing local artifacts on a Windows 10 machine, I found out that if I specify a run_coordinator in my dagster.yaml file, then the base_dir seems to default to where I launch the dagster-daemon run command. If I don't have run_coordinator specified in the dagster.yaml file, then the base_dir defaults to where the dagit command was executed from. Is this expected? I am on version 0.11.10.
I know I can set base_dir in the local_artifact_storage specification in the dagster.yaml file to override the default location, but per my GH issue (https://github.com/dagster-io/dagster/issues/3898) it is not working for me, presumably due to the Windows path.
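(For reference, a minimal dagster.yaml sketch of that override; the path is illustrative, and since the GH issue suggests the Windows path is the sticking point, a forward-slash path may be worth trying.)

local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir: "C:/dagster_home/storage"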