Daniel Suissa
01/04/2022, 8:01 PM
I'm calling pydevd_pycharm.settrace at the top of my @job definition and using in_process_executor, but my job seems to hang when I do that. The debugger does connect, though. Wondering if the Dagster contributors know anything about this, or if anyone else in the community has managed to do this.
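(For reference, a minimal sketch of this kind of setup, with settrace called inside an op body rather than at job-definition time; the host/port are placeholders that must match the PyCharm debug server, so this is an assumption, not Daniel's exact code:)
import pydevd_pycharm
from dagster import in_process_executor, job, op

@op
def debug_me():
    # connect back to the PyCharm debug server; host/port are placeholders
    pydevd_pycharm.settrace(
        "localhost", port=5678, stdoutToServer=True, stderrToServer=True, suspend=False
    )
    return 1

@job(executor_def=in_process_executor)
def my_debug_job():
    debug_me()

if __name__ == "__main__":
    my_debug_job.execute_in_process()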
Ryan Riopelle
01/04/2022, 9:05 PM
Mark Kudryk
01/04/2022, 11:28 PM
Ryan Riopelle
01/05/2022, 12:28 AM
Ryan Riopelle
01/05/2022, 12:28 AM
2022-01-04 23:45:38 - dagster - ERROR - hello_cereal_shield_job - b1817925-525e-4768-ae49-2937cf528ab7 - 610 - RUN_FAILURE - Execution of run for "hello_cereal_shield_job" failed. An exception was thrown during execution.
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Stack Trace:
File "/app/web/piper/hmc/dagster_cli_bin.runfiles/pypi__36__dagster_0_13_4_linux_x86_64/dagster/core/execution/api.py", line 774, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
File "/app/web/piper/hmc/dagster_cli_bin.runfiles/pypi__36__dagster_0_13_4_linux_x86_64/dagster/core/executor/multiprocess.py", line 163, in execute
event_or_none = next(step_iter)
File "/app/web/piper/hmc/dagster_cli_bin.runfiles/pypi__36__dagster_0_13_4_linux_x86_64/dagster/core/executor/multiprocess.py", line 268, in execute_step_out_of_process
for ret in execute_child_process_command(command):
File "/app/web/piper/hmc/dagster_cli_bin.runfiles/pypi__36__dagster_0_13_4_linux_x86_64/dagster/core/executor/child_process_executor.py", line 140, in execute_child_process_command
process.start()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/usr/local/lib/python3.6/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/local/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/local/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/usr/local/lib/python3.6/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/usr/local/lib/python3.6/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
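(For reference, a minimal sketch of the guard the error message is asking for when the multiprocess executor spawns child processes; the import path and the way the run is launched here are assumptions, not the exact setup from the trace:)
from dagster import DagsterInstance, execute_pipeline, reconstructable

from hello_cereal import hello_cereal_shield_job  # hypothetical import path

if __name__ == "__main__":
    # without this guard, the spawned child processes re-import the main
    # module and hit the RuntimeError shown above
    execute_pipeline(
        reconstructable(hello_cereal_shield_job),
        instance=DagsterInstance.get(),
    )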
DK
01/05/2022, 12:52 AM
Nilesh Pandey
01/05/2022, 11:59 AM
geoHeil
01/05/2022, 3:33 PM
Right now I can only get at the asset key via context.solid_def.input_defs[0].asset_key, which seems a bit strange/legacy and, most importantly, depends on the latest materialization of the asset (not considering its history). I would expect something like a = context.assets['my_asset_key'] to get hold of all the metadata for the asset, and possibly then a.location to get the data about the location of the asset. Is this possible already? If not, what would be the best way to get similar functionality working?
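(A context.assets mapping like that doesn't exist as far as I know; a hedged sketch of one way to reach the latest materialization's metadata today, via the instance's event log, with the helper name and asset key purely illustrative:)
from dagster import AssetKey, DagsterEventType, EventRecordsFilter

def latest_materialization(context, key="my_asset_key"):
    # query the event log for the most recent materialization of this asset
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey(key),
        ),
        ascending=False,
        limit=1,
    )
    # the wrapped event log entry carries the metadata entries (e.g. a path)
    # that were recorded when the asset was materialized
    return records[0].event_log_entry if records else None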
Bryan Chavez
01/05/2022, 8:51 PM
I'm getting the error below from a sensor defined with:
@run_status_sensor(
pipeline_run_status=PipelineRunStatus.SUCCESS,
pipeline_selection=pipeline_names,
)
which seems strange since I can see the pipelines loaded in Dagit, and the pipeline for the sensor exists and can run. Any insight on what could be happening?
SensorDaemon - ERROR - Sensor daemon caught an error for sensor job_success_email_sensor : dagster.core.errors.DagsterInvariantViolationError: No jobs, pipelines, graphs, or repositories found in "kh_dagster".
Stack Trace:
File "C:\Users\<redacted>\Envs\<redacted>\lib\site-packages\dagster\grpc\server.py", line 205, in init
self._repository_symbols_and_code_pointers.load()
File "C:\Users\<redacted>\Envs\<redacted>\lib\site-packages\dagster\grpc\server.py", line 90, in load
self._loadable_repository_symbols = load_loadable_repository_symbols(
File "C:\Users\<redacted>\Envs\<redacted>\lib\site-packages\dagster\grpc\server.py", line 108, in load_loadable_repository_symbols
loadable_targets = get_loadable_targets(
File "C:\Users\<redacted>\Envs\<redacted>\lib\site-packages\dagster\grpc\utils.py", line 37, in get_loadable_targets
else loadable_targets_from_python_package(package_name)
File "C:\Users\<redacted>\Envs\<redacted>\lib\site-packages\dagster\core\workspace\autodiscovery.py", line 30, in loadable_targets_from_python_package
return loadable_targets_from_loaded_module(module)
File "C:\Users\<redacted>\Envs\<redacted>\lib\site-packages\dagster\core\workspace\autodiscovery.py", line 78, in loadable_targets_from_loaded_module
raise DagsterInvariantViolationError(
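(The error suggests the module the daemon loads exposes no repository at all; a hedged sketch of the layout it expects, with the names taken from the thread and everything else illustrative:)
from dagster import PipelineRunStatus, repository, run_status_sensor

pipeline_names = ["my_pipeline"]  # illustrative

@run_status_sensor(
    pipeline_run_status=PipelineRunStatus.SUCCESS,
    pipeline_selection=pipeline_names,
)
def job_success_email_sensor(context):
    # context.pipeline_run holds the finished run; send the notification here
    ...

@repository
def kh_dagster():
    # the sensor and the pipelines/jobs it watches both need to be
    # discoverable from this module for the gRPC server to load them
    return [job_success_email_sensor]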
Ryan Riopelle
01/05/2022, 11:00 PM
Prratek Ramchandani
01/05/2022, 11:13 PM
I'm writing a make_job() factory and running into some trouble. I have a number of Dagster jobs that vary only in resource config, so I have a Python function that returns a JobDefinition to create them. Since the graph for each job is identical, I have one graph defined using the @graph decorator that gets called in my make_job() function, but Dagster doesn't like that I now have multiple graphs with the same name.
I figured I'd name the graph using the "configured" API, but it looks like that requires that I also pass in config, which the graph doesn't need. Is there another way to name these graphs?
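(A sketch of how this is often handled: keep a single @graph and give each JobDefinition its own name via to_job(name=...), so the graph itself never needs renaming; the resource and job names below are made up:)
from dagster import graph, op, resource

@resource(config_schema={"conn": str})
def warehouse_resource(init_context):
    return init_context.resource_config["conn"]

@op(required_resource_keys={"warehouse"})
def load(context):
    context.log.info(f"loading via {context.resources.warehouse}")

@graph
def etl():
    load()

def make_job(name, resource_defs, config=None):
    # one shared graph can back many jobs, as long as each job name is unique
    return etl.to_job(name=name, resource_defs=resource_defs, config=config)

dev_job = make_job("etl_dev", {"warehouse": warehouse_resource.configured({"conn": "dev"})})
prod_job = make_job("etl_prod", {"warehouse": warehouse_resource.configured({"conn": "prod"})})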
Bernardo Cortez
01/06/2022, 10:33 AM
Bastiaan
01/06/2022, 11:01 AM
I have an op that streams a large data feed and yields each batch as a dynamic output, but memory keeps growing over the run:
import dagster
import ijson
from urllib.request import urlopen

@dagster.op(
    name='stream_data',
    config_schema={
        'data_feed_url': str,
        'batch_size': dagster.Field(int, default_value=500000),
    },
    out=dagster.DynamicOut(list)
)
def stream_data(context):
    with urlopen(context.op_config['data_feed_url']) as feed:
        objects = ijson.items(feed, 'item')
        # batch_iterator is a local helper that chunks the iterator into lists
        for i, batch in enumerate(batch_iterator(
            objects,
            batch_size=context.op_config['batch_size']
        )):
            yield dagster.DynamicOutput(
                value=batch,
                mapping_key=f'data_{i}'
            )
I checked that running the generator in isolation (as a normal Python function) does free up memory after yielding each batch.
Is there something I can do to make sure each dynamic output is dereferenced after the io_manager has handled it, or is this not possible with Dagster?
Daniel Suissa
01/06/2022, 11:32 AM
Bernardo Cortez
01/06/2022, 12:09 PM
context.resources.(...)
George Pearse
01/06/2022, 2:13 PM
context.ipdb.set_trace()?
Jonathon Mobley
01/06/2022, 2:40 PM
Bernardo Cortez
01/06/2022, 3:59 PM
@asset_sensor
updated?
Alisson Gustavo
01/06/2022, 4:40 PM
Cody Degen
01/06/2022, 10:14 PM
Message: (sqlite3.DatabaseError) database disk image is malformed
[SQL: SELECT event_logs.id, event_logs.event
FROM event_logs
WHERE event_logs.run_id = ? ORDER BY event_logs.id ASC
LIMIT ? OFFSET ?]
[parameters: ('f07ef76b-2e8e-44cb-990d-7b151f52462f', -1, 0)]
(Background on this error at: <http://sqlalche.me/e/14/4xp6>)
Path:
Locations:
Stack Trace:
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/rx/core/observablebase.py", line 67, in set_disposable
subscriber = self._subscribe_core(auto_detach_observer)
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/rx/core/anonymousobservable.py", line 20, in _subscribe_core
return self._subscribe(observer)
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/dagster_graphql/implementation/pipeline_run_storage.py", line 11, in __call__
events = self.instance.logs_after(self.run_id, self.after_cursor)
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 997, in logs_after
return self._event_storage.get_logs_for_run(run_id, cursor=cursor, of_type=of_type)
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 199, in get_logs_for_run
events_by_id = self.get_logs_for_run_by_log_id(run_id, cursor, of_type)
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 166, in get_logs_for_run_by_log_id
results = conn.execute(query).fetchall()
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1262, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 324, in _execute_on_connection
return connection._execute_clauseelement(
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1451, in _execute_clauseelement
ret = self._execute_context(
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1813, in _execute_context
self._handle_dbapi_exception(
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1994, in _handle_dbapi_exception
util.raise_(
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1770, in _execute_context
self.dialect.do_execute(
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
The above exception was the direct cause of the following exception:
Message: database disk image is malformed
Stack Trace:
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1770, in _execute_context
self.dialect.do_execute(
File "/mnt/c/Users/Cody/Documents/SimplyAnalytics/simplyanalytics-data-analysis/venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
Bryan Chavez
01/06/2022, 10:22 PM
Ryan Riopelle
01/07/2022, 12:06 AM
Shreyas Karnik
01/07/2022, 12:08 AM
Right now we have to update the workspace.yaml file and reload dagit. All of our user code deployments run a gRPC server. Is it possible to register a new workspace and reload dagit via the API?
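(Not sure about registering a brand-new location without touching workspace.yaml, but reloading an existing gRPC-backed location through dagit's GraphQL API should be possible with something like this; host/port and the location name are placeholders:)
from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("localhost", port_number=3000)

# asks dagit to re-sync the named repository location from its gRPC server
result = client.reload_repository_location("my_user_code_deployment")
print(result.status)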
Roel Hogervorst
01/07/2022, 8:14 AM
geoHeil
01/07/2022, 11:55 AM
Regarding my question above about context.solid_def.input_defs[0].asset_key: is it possible to somehow connect to a dagit instance running on localhost and then explore the API interactively?
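(dagit serves a GraphiQL playground at /graphql — by default http://localhost:3000/graphql — for poking at the API interactively; from Python, a hedged sketch using the GraphQL client, with the run id a placeholder:)
from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("localhost", port_number=3000)
status = client.get_run_status("11111111-2222-3333-4444-555555555555")  # placeholder run id
print(status)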
Roel Hogervorst
01/07/2022, 12:40 PM
I tried something like this:
production_job = graphname.to_job(
config= {"ops": {"op1":{"inputs":{"arg1":"bla", "ar2":"bla"}},
{"op2":{"inputs":{"arg1: "something"}
})
But that doesn't work. It does work for one op, but otherwise I get unexpected config entry. I tried a list instead of two dictionaries, but that is also not allowed.
What is the best way to pass the arguments to the ops from the job?
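(For comparison, a sketch of the shape that usually works: "ops" maps each op name to its own config dict, rather than nesting a second dict or a list; op and input names are copied from the attempt above, with "ar2" assumed to mean "arg2":)
production_job = graphname.to_job(
    config={
        "ops": {
            "op1": {"inputs": {"arg1": "bla", "arg2": "bla"}},
            "op2": {"inputs": {"arg1": "something"}},
        }
    }
)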
Jessica Franks
01/07/2022, 12:57 PM
Rodrigo Baron
01/07/2022, 1:00 PM
Andy Chen
01/07/2022, 2:26 PM
raul
01/07/2022, 3:27 PM