Anaqi Afendi
10/28/2021, 3:41 PM
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing solid "dbt_job":
...
The above exception was caused by the following exception:
KeyError: ('55fafb57-29ab-4d0e-a7b2-c6a30bcdf1db', 'dbt_job', 'dbt_job_status')
File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagster\core\execution\plan\utils.py", line 44, in solid_execution_error_boundary
yield
File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagster\utils\__init__.py", line 383, in iterate_with_context
next_output = next(iterator)
File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagstermill\solids.py", line 268, in _t_fn
build_input_context(upstream_output=output_context)
File "C:\Users\aafendi\Anaconda3\envs\dagster-environment\lib\site-packages\dagster\core\storage\mem_io_manager.py", line 14, in load_input
return self.values[keys]
Slackbot
10/28/2021, 5:32 PM

Anaqi Afendi
10/28/2021, 9:44 PM

Louis Auneau
10/29/2021, 8:14 AM
get_model(name: str) twice to then do more complex stuff. In our original project we defined the names of our models as constants and then called the functions using those constants: get_model(MY_MODEL_NAME). However, when I transform my function into an op/solid, it doesn't work since my constants are not another op/solid's output. What's the best practice to handle such cases? Should I create one op/solid per constant that is just a function returning the value?
Thank you in advance and have an excellent day!
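(Illustration only — a rough sketch of one possible approach, passing the constant through op config instead of as an upstream output; all names below are made up, not from this thread:)
from dagster import graph, op

MY_MODEL_NAME = "my_model"  # hypothetical constant

@op(config_schema={"model_name": str})
def get_model(context):
    # read the "constant" from op config instead of from another op's output
    name = context.op_config["model_name"]
    context.log.info(f"loading model {name}")
    return name  # placeholder for the real model object

@graph
def use_model():
    get_model()

job = use_model.to_job(
    config={"ops": {"get_model": {"config": {"model_name": MY_MODEL_NAME}}}}
)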
Jori Geysen
10/29/2021, 12:46 PM
A question about my dagster.yaml file:
1. When I want to save all logs to a file:
python_logs:
  managed_python_loggers:
    - my_logger
  dagster_handler_config:
    handlers:
      console:
        class: logging.FileHandler
        level: DEBUG
        stream: "logfile.log"
This works great: both the logs from my_logger and the dagster logs from the user code are saved to logfile.log, sitting in the gRPC container.
2. When I want to view the logs in the terminal by running docker logs on the gRPC container (without saving to a file first):
python_logs:
  managed_python_loggers:
    - my_logger
  dagster_handler_config:
    handlers:
      console:
        class: logging.StreamHandler
        level: DEBUG
        stream: ext://sys.stdout
This does not work, however. When running docker logs for the gRPC container, only the dagster logs from the user code show up. The logs emitted by my_logger do, however, show up in the dagit UI.
=> where are the 'standard' python logs directed to when the StreamHandler is used in combination with ext://sys.stdout?
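(Illustration only — a rough plain-Python equivalent of the two handler configs above, assuming dagster_handler_config follows the stdlib logging dictConfig schema; note that in that schema logging.FileHandler takes a filename argument:)
import logging
import logging.config

logging.config.dictConfig({
    "version": 1,
    "handlers": {
        "file": {"class": "logging.FileHandler", "level": "DEBUG", "filename": "logfile.log"},
        "console": {"class": "logging.StreamHandler", "level": "DEBUG", "stream": "ext://sys.stdout"},
    },
    "root": {"level": "DEBUG", "handlers": ["file", "console"]},
})
# a logger configured this way would emit to both the file and stdout
logging.getLogger("my_logger").debug("hello from my_logger")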
Thanks a lot for your help in advance, have a nice day!
Andy H
10/29/2021, 3:47 PM
0.13.2 and changing to ops from solids. My run config appears to be correct, any ideas why I'm getting this exception?
Sandeep Devarapalli
10/29/2021, 4:46 PM

Alexander Butler
10/29/2021, 9:19 PM

Marcel M
10/31/2021, 7:52 PM
@op(out={'spec': DynamicOut(Tuple[str, str])})
def gen_query_specs():
    queries = [
        ('activiteit', 'SELECT * FROM activiteit_data'),
        ('afsluitreden', 'SELECT * FROM afsluitreden_data'),
        ('aanleiding', 'SELECT * FROM aanleiding_data'),
    ]
    for q in queries:
        yield DynamicOutput(
            output_name='spec',
            value=q,
            mapping_key=q[0],
        )
@op(required_resource_keys={"openac"})
def ingest_query(context, query: str):
    context.log.info(f"Running query on OpenAC: {query}")
    df = context.resources.openac.query(query)
    return df
@op(required_resource_keys={"rel_wh"})
def save_to_wh(context, df: pd.DataFrame, df_name: str):
    df = context.resources.rel_wh.save_df(df_name, df)
    context.log.info(f"Saved '{df_name}' to RelDB")
@graph
def ingest(spec: Tuple[str, str]):
    # context.log.info(f"Ingesting {spec[0]}")  # context not available?
    query = spec[1]  # <== error here: IndexError: tuple index out of range
    df = ingest_query(query)
    df_name = spec[0]
    save_to_wh(df, df_name)
    # context.log.info(f"Done ingesting {spec[0]}")
@graph
def ingest_openac():
    specs = gen_query_specs()
    specs.map(ingest)
job = ingest_openac.to_job(
    resource_defs={
        "openac": openac_service,
        "rel_wh": reldb_warehouse_service,
    },
    config=config_from_files(["resources.yaml"]),
)
if __name__ == "__main__":
    result = ingest_openac.execute_in_process()
Is this the wrong way to approach this problem?
What is the best way to handle this kind of repetition?
How does one map more than one op to a dynamic mapping?
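(Illustration only — a rough sketch of one possible workaround: since the graph body can't index into spec at definition time, unpack the tuple inside a small op with two outputs. Op bodies below are stubs, not the real ingest/save logic, and this assumes mapping a graph over a DynamicOut works as in the snippet above:)
from typing import Tuple
from dagster import DynamicOut, DynamicOutput, Out, Output, graph, op

@op(out=DynamicOut())
def gen_query_specs():
    for name, query in [('activiteit', 'SELECT * FROM activiteit_data')]:
        yield DynamicOutput((name, query), mapping_key=name)

@op(out={'name': Out(str), 'query': Out(str)})
def split_spec(spec: Tuple[str, str]):
    # indexing happens at run time inside an op, not in the graph body
    yield Output(spec[0], 'name')
    yield Output(spec[1], 'query')

@op
def ingest_query(context, query: str):
    context.log.info(f"Running query: {query}")
    return query  # stand-in for the real DataFrame

@op
def save_to_wh(context, df, name: str):
    context.log.info(f"Saving '{name}'")

@graph
def ingest(spec):
    name, query = split_spec(spec)
    save_to_wh(ingest_query(query), name)

@graph
def ingest_openac_alt():
    gen_query_specs().map(ingest)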
Francis
11/01/2021, 1:57 AM

Kateryna Chenina
11/01/2021, 11:59 AM
dagster.core.executor.child_process_executor.ChildProcessCrashException
File "c:\users\kateryna\documents\pythonscripts\comp_rate_predictor\venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 163, in execute
event_or_none = next(step_iter)
File "c:\users\kateryna\documents\pythonscripts\comp_rate_predictor\venv\lib\site-packages\dagster\core\executor\multiprocess.py", line 268, in execute_step_out_of_process
for ret in execute_child_process_command(command):
File "c:\users\kateryna\documents\pythonscripts\comp_rate_predictor\venv\lib\site-packages\dagster\core\executor\child_process_executor.py", line 158, in execute_child_process_command
raise ChildProcessCrashException(exit_code=process.exitcode)
Multiprocess executor: child process for step DS2_1_UCP.get_process_ucp_date_of_inquiry unexpectedly exited with code 3221225477

Jeremy Fisher
11/01/2021, 9:36 PM

Matthew Smicker
11/01/2021, 11:54 PM
TypeError: cannot pickle '_thread.lock' object
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 327, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 381, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 491, in _store_output
handle_output_res = output_manager.handle_output(output_context, output.value)
File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/fs_io_manager.py", line 122, in handle_output
pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
Any ideas?

Arturs Stramkals
11/01/2021, 11:57 PM
create_engine output that now fails to pickle. I think I know how I can move this around, however.
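(Illustration only — a rough sketch of one common way to keep a SQLAlchemy engine out of op outputs, which the default fs_io_manager would try to pickle: provide it as a resource instead. The resource/op names and connection string below are made up:)
from dagster import job, op, resource
from sqlalchemy import create_engine, text

@resource
def db_engine(_init_context):
    # hypothetical connection string
    return create_engine("postgresql://user:pass@localhost/mydb")

@op(required_resource_keys={"engine"})
def run_query(context):
    # the engine is reached through resources, so it never passes through an IO manager
    with context.resources.engine.connect() as conn:
        return [row[0] for row in conn.execute(text("SELECT 1"))]

@job(resource_defs={"engine": db_engine})
def query_job():
    run_query()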
Max Wong
11/02/2021, 7:24 AM
search redirect bug, I think
steps:
1. press / to search for a pipeline name
2. it redirects me to http://localhost:3000/workspace/crm@repos.py/jobs/crm --> produces an "Unknown repository" error
but the actual path is http://localhost:3000/workspace/baania_repository@repos.py/jobs/crm
this is my workspace config:

Francis
11/02/2021, 11:39 AM
Execution of pipeline [x] failed. An exception was thrown during execution.
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO event_logs (run_id, event, dagster_event_type, timestamp, step_key, asset_key, partition) VALUES (?, ?, ?, ?, ?, ?, ?)]
Context:
• We're running multiprocessing on multiple concurrent pipelines
• This looks like it happens almost at random

Daniel Salama
11/02/2021, 12:04 PM
yield RunRequest(
    run_key="somename-{}".format(time_stamps),
    run_config={"solids": {"somename_step1": {"config": {"filename": "file"}}}},
)
Can I get the run id that was created during the run? I want to print logs to a third-party platform with the run id during the run.
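(Illustration only — a small sketch of reading the run id from the op context at execution time; shipping it to the external platform is left as a stub:)
from dagster import op

@op
def report_run_id(context):
    context.log.info(f"current run id: {context.run_id}")
    # forward context.run_id to the third-party logging platform here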
Nick Dellosa
11/02/2021, 12:38 PM
pipelineRun configuration? It doesn't look like there's an option for it in the helm chart: https://github.com/dagster-io/dagster/blob/master/helm/dagster/values.yaml#L298

Nathan Saccon
11/02/2021, 7:09 PM
@op
def recreate_error() -> tuple[str, list[str]]:
    return ("str1", ["str2", "str3", "str4"])
I get this error:
dagster.core.errors.DagsterTypeCheckDidNotPass: Type check failed for step output "result" - expected type "Tuple[String,[String],String,String,String]".
Is this the correct behavior here? This seems incorrect.
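(Illustration only — an untested sketch of an alternative annotation one might try, spelling the type with typing generics and an explicit Out; whether this sidesteps the inference issue is an assumption, not something confirmed here:)
from typing import List, Tuple
from dagster import Out, op

@op(out=Out(Tuple[str, List[str]]))
def recreate_error_alt() -> Tuple[str, List[str]]:
    return ("str1", ["str2", "str3", "str4"])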
Koby Kilimnik
11/03/2021, 2:28 PM

Koby Kilimnik
11/03/2021, 2:28 PM

Koby Kilimnik
11/03/2021, 2:28 PM

Nicolas Gaillard
11/03/2021, 4:03 PM
@repository
in order to redeploy it)? I thought the "reload" button was for that, but it seems not.

Kirk Stennett
11/03/2021, 4:30 PM

Will Gunadi
11/03/2021, 5:28 PM

Chris Chan
11/03/2021, 7:30 PM
dagster-daemon uses? Our deployment had allocated 1.5 GB and it's constantly OOMing (which isn't a problem since all the schedules and sensors run fine), but I'm worried that memory usage scales with the number of schedules and sensors.

Makoto
11/03/2021, 8:46 PM
dagster_pandas, and trying to figure out a good way to show which column is in violation of the constraints you specified. Is there an example somewhere? Just showing that the DataType check failed is a bit hard to debug.

Andy Chen
11/03/2021, 9:16 PM
@op
def sort_branches(context):
    context.log.info("hello " + os.getcwd())
    branches_df = pd.read_csv("pipeline/pjm/raw/branches.csv")
    return branches_df.sort_values(by=['branch_id'])
The path changes depending on whether it's run from the dagster CLI or in dagit.
Dagit launch execution logs: "hello /Users/andychen/dev/nira-data-pipeline/pipeline"
dagster job execute -f pipeline/pipeline/pjm/graphs/pjm_graph.py logs: "sort_branches - hello /Users/andychen/dev/nira-data-pipeline"
This is messing stuff up because I need to read CSVs from a particular file path in my project directory, but depending on how I run it, it can't find the file.
workspace.yaml:
# Workspaces specify where to find user code in order to load it for Dagit and the Dagster CLI.
# See our documentation overview on Workspaces: https://docs.dagster.io/overview/repositories-workspaces/workspaces
load_from:
- python_package: pipeline
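(Illustration only — a rough sketch of one way to make the CSV path independent of the working directory by resolving it relative to the module file; the directory layout below is assumed from the message, not verified:)
from pathlib import Path
import pandas as pd
from dagster import op

# assumed layout: this module sits next to a raw/ folder containing branches.csv
DATA_DIR = Path(__file__).resolve().parent / "raw"

@op
def sort_branches(context):
    branches_df = pd.read_csv(DATA_DIR / "branches.csv")
    return branches_df.sort_values(by=["branch_id"])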
Arturs Stramkals
11/03/2021, 9:33 PM
def foo_op(context, bar).

Xiaohui Liu
11/04/2021, 1:43 AM