Max
03/07/2022, 7:14 AM
• DAGSTER_HOME=/opt/dagster/dagster_home. Copy the dagster.yml there (works just fine)
• workdir is /opt/dagster/app. I copy all my files there, including /opt/dagster/app/my_project/repository.py, and the workspace.yaml in the root
• my workspace contains python_file: relative_path: my_project/repository.py. This loads correctly when I run dagit locally, but not in Docker (module my_project was not found)
What's the best practice there to make sure that the import works?
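A minimal sketch of the workspace.yaml being described, assuming the container's workdir really is /opt/dagster/app; the working_directory key is what typically makes the my_project package importable when the file is loaded:

load_from:
  - python_file:
      relative_path: my_project/repository.py
      working_directory: /opt/dagster/app  # assumed container path; lets `import my_project` resolve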
Irven Aelbrecht
03/07/2022, 7:35 AM
Ray? (https://github.com/ray-project/ray) and for my understanding, was there any reason to choose Dask over Ray for multi-processing?
Chen Tsinovoy
03/07/2022, 8:25 AM
from unittest.mock import MagicMock

mocked_http = http_async_downloader()
mocked_http.fetch_all = MagicMock(return_value=["4 /n hello"])

@job(resource_defs={"http": mocked_http})
def mocked_job():
    print_htmls(download_urls(convert_cids_to_urls()))

if __name__ == "__main__":
    # upload_pubchem_3d_conformers_dev_job.execute_in_process()
    mocked_job.execute_in_process(run_config=config_from_files([file_relative_path(__file__, 'config.yaml')]))

but I got the following exception:
dagster.check.CheckError: Value in dictionary mismatches expected type for key http. Expected value of type <class 'dagster.core.definitions.resource_definition.ResourceDefinition'>. Got value <resources.aka_http.HttpAsyncDownloader object at 0x7fd8b05cc390> of type <class 'resources.aka_http.HttpAsyncDownloader'>.
What am I missing?
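A hedged sketch of one way to satisfy that check: wrap the mocked client in a ResourceDefinition rather than passing the plain object (ResourceDefinition.hardcoded_resource is part of the dagster API; the other op names simply mirror the snippet above):

from unittest.mock import MagicMock
from dagster import ResourceDefinition, job

mocked_client = MagicMock()
mocked_client.fetch_all = MagicMock(return_value=["4 /n hello"])

# resource_defs expects ResourceDefinition objects, not plain instances
@job(resource_defs={"http": ResourceDefinition.hardcoded_resource(mocked_client)})
def mocked_job():
    print_htmls(download_urls(convert_cids_to_urls()))  # ops from the snippet above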
Sebastian Napiorkowski
03/07/2022, 1:09 PM
Sathish
03/07/2022, 3:25 PM
George Pearse
03/07/2022, 4:20 PM
Marjori Pomarole
03/07/2022, 11:15 PM
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "core-shared-tools.cktverwlkjvp.us-east-1.rds.amazonaws.com" (10.20.177.69), port 5432 failed: FATAL: password authentication failed for user "dagster"
I know what you are thinking: have you verified that the password is correct? We have tried so many combinations of setting this password: autogenerated, from Vault, plain text in the values.yaml file. With an external Postgres instance, and without. And it always says it is incorrect. I was wondering if anyone has hit a similar issue?
This is the values.yaml file we have now:
Chris Nogradi
03/07/2022, 11:18 PM
Tyler Hillery
03/07/2022, 11:18 PM
dagster.daemon.SchedulerDaemon - WARNING - Schedule stock_market_data_job_schedule was started from a location StockData.py that can no longer be found in the workspace, or has metadata that has changed since the schedule was started. You can turn off this schedule in the Dagit UI from the Status tab.
I confirmed in dagit that the daemon is running, the clock symbol is green next to my job, and I believe that I am using the same workspace.yaml.
Stefan Adelbert
03/08/2022, 2:57 AM
python_logs in dagster.yaml. There is no logger available on SensorEvaluationContext. I thought I could get a logger using dagster.get_dagster_logger and use that in a sensor, but I tried that and it doesn't seem like the log messages are captured anywhere.
My key use case is getting notified if a sensor fails, primarily so that I can be proactively notified if a job that should run (that a sensor would have kicked off) doesn't.
I looked into using a failure_hook, but it doesn't seem to be designed to work on sensors.
Any tips?
https://docs.dagster.io/0.13.19/_apidocs/utilities#dagster.get_dagster_logger
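For reference, a hedged sketch of the python_logs block being referred to in dagster.yaml (key names as documented around 0.13/0.14; whether sensor-side log calls are captured by it is exactly the open question here, and the logger name is a placeholder):

python_logs:
  python_log_level: INFO
  managed_python_loggers:
    - my_package.my_logger   # placeholder logger name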
jasono
03/08/2022, 4:28 AM
Stefan Adelbert
03/08/2022, 4:45 AM
op A produces some data that ops in graph C depend on (data dependency).
op B1, op B2, op B3 all need to complete before graph C should start (order-based dependency). The execution order of op B1, op B2, op B3 is not important.
Can anyone suggest a way to build this graph please?
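A hedged sketch of one way to express this, using Nothing-typed inputs for the order-only dependencies (In and Nothing are part of the dagster API; the op names and bodies are placeholders, and graph C is collapsed to a single entry op for brevity):

from dagster import In, Nothing, job, op

@op
def op_a():
    return "data that graph C needs"

@op
def op_b1() -> Nothing:
    ...

@op
def op_b2() -> Nothing:
    ...

@op
def op_b3() -> Nothing:
    ...

# The data input carries A's output; the Nothing inputs only enforce ordering.
@op(ins={"b1_done": In(Nothing), "b2_done": In(Nothing), "b3_done": In(Nothing)})
def graph_c_entry(data):
    return data

@job
def example_job():
    graph_c_entry(op_a(), b1_done=op_b1(), b2_done=op_b2(), b3_done=op_b3())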
Andrea Giardini
03/08/2022, 9:31 AM
Romain
03/08/2022, 9:54 AM
Kalyan katamreddi
03/08/2022, 12:46 PM
@op
def op_great_expectations(context, df: DataFrame):
    ge = ge_validation_op_factory(
        name='op_great_expecations',
        datasource_name='test_data',
        suite_name='basic.warning',
    )(context=build_solid_context(
        resources={'ge_data_context': ge_data_context},
        config={
            'resources': {
                'ge_data_context': {
                    'resource_config': {'ge_root_dir': file_relative_path(__file__, '../kh_great_expectations')}
                }
            }
        }
    ))
    res = ge(df)
    if res['success']:
        return df
    return ValueError

Hi all,
we are trying to integrate ge_validation_op_factory into our Dagster job. We are facing the below error:
great_expectations.exceptions.exceptions.ConfigNotFoundError: Error: No great_expectations directory was found here!
    - Please check that you are in the correct directory or have specified the correct directory.
    - If you have never run Great Expectations in this project, please run great_expectations init to get started.
We are passing the correct path, but are still facing the error. Please let us know if there are ways to resolve it.
Note: 'df' would be the output from the previous op
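A hedged sketch of how ge_validation_op_factory is more typically wired at the job level rather than inside another op, with ge_root_dir supplied through the ge_data_context resource config. The load_df op, the names, and the exact config keys are assumptions based on the dagster-ge docs, not a verified fix:

import pandas as pd
from dagster import file_relative_path, job, op
from dagster_ge.factory import ge_data_context, ge_validation_op_factory

@op
def load_df() -> pd.DataFrame:
    # placeholder for the upstream op that produces `df`
    return pd.DataFrame({"a": [1, 2, 3]})

validate_df = ge_validation_op_factory(
    name="validate_df",
    datasource_name="test_data",
    suite_name="basic.warning",
)

@job(
    resource_defs={"ge_data_context": ge_data_context},
    config={
        "resources": {
            "ge_data_context": {
                "config": {
                    "ge_root_dir": file_relative_path(__file__, "../kh_great_expectations")
                }
            }
        }
    },
)
def validation_job():
    validate_df(load_df())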
Rubén Lopez Lozoya
03/08/2022, 1:11 PM
Matthias Queitsch
03/08/2022, 1:45 PM
Juan
03/08/2022, 3:08 PM
Gordon F
03/08/2022, 3:25 PM
# orchestration.py
from dagster import job, op
from os import system

@op
def test():
    system("ls")

@job
def run_pipeline():
    test()

if __name__ == "__main__":
    result = run_pipeline.execute_in_process()

# dagster job execute -f orchestration.py

This is my framework for extending it to run every 5 minutes. What am I missing from the script?

# orchestration.py
from dagster import job, op
from os import system

@op
def test():
    system("ls")

@job
def run_pipeline():
    test()

@schedule(
    cron_schedule="*/5 * * * *",
    job=run_pipeline,
    execution_timezone="US/Central",
)
def run_pipeline_schedule():
    pass

if __name__ == "__main__":
    pass

# dagster schedule start
# start dagster-daemon?
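A hedged sketch of the pieces that usually still need to be added to the second snippet: the schedule import, a run config returned from the schedule function, and a repository that dagit and dagster-daemon can load through workspace.yaml. The repository name and the closing comments are assumptions:

# orchestration.py
from os import system
from dagster import job, op, repository, schedule

@op
def test():
    system("ls")

@job
def run_pipeline():
    test()

@schedule(
    cron_schedule="*/5 * * * *",
    job=run_pipeline,
    execution_timezone="US/Central",
)
def run_pipeline_schedule():
    return {}  # run config for each scheduled run

@repository
def orchestration_repo():
    return [run_pipeline, run_pipeline_schedule]

# Assumed follow-up: point workspace.yaml at this file, run `dagit` plus
# `dagster-daemon run`, then turn the schedule on in dagit (or via the
# `dagster schedule start` CLI mentioned above).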
victor
03/08/2022, 3:46 PM
K8sRunLauncher with multiprocessing executor) but then execute some ops in their own pod (i.e., as if we were using the k8s_job_executor). An option to define a different executor for a @graph within a pipeline would be awesome in terms of flexibility. Has this option been considered?
Dylan Hunt
03/08/2022, 4:29 PM
build_reconstructable_pipeline, which takes a module name and a function name as strings to do the dynamic imports later using importlib. It was working fine with subdirectories and modules in Dagster 0.13.0 but is now throwing the below error in 0.14.3. It is pythonic execution.
dagster.core.errors.DagsterImportError: Encountered ImportError: `No module named 'builder'` while importing module builder. If relying on the working directory to resolve modules, please explicitly specify the appropriate path using the `-d` or `--working-directory` for CLI based targets or the `working_directory` configuration option for workspace targets.
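A hedged sketch of the call pattern being described, with the most common workaround when a module stops resolving: making sure the directory containing builder is on sys.path (or is the working directory) in the process that performs the reconstruction. The factory function name is a placeholder:

import sys
from pathlib import Path

from dagster import build_reconstructable_pipeline

# Assumption: `builder` lives next to this file; put its parent directory on
# sys.path so importlib can find it when the pipeline is reconstructed.
sys.path.insert(0, str(Path(__file__).parent))

recon_pipeline = build_reconstructable_pipeline(
    "builder",          # module name, imported later via importlib
    "make_pipeline",    # placeholder: function in `builder` that returns the pipeline
)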
prha
03/08/2022, 5:06 PM
efrat
03/08/2022, 6:44 PM
@graph
def read_db_graph():
    read_db_op()

sqlalchemy_cheese_job = read_db_graph.to_job(
    resource_defs={"db_resource": sqlalchemy_resource},
    config=config_from_files([file_relative_path(__file__, "config_cheesedb.yaml")]))

Then I want to get the data frame:
result_df = sqlalchemy_cheese_job.execute_in_process()
but I got an object: dagster.core.execution.execute_in_process_result.ExecuteInProcessResult
What did I do wrong?
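A hedged sketch of how the dataframe is usually pulled off that result object, assuming the op inside the graph is named read_db_op and returns the dataframe as its default output (output_for_node is a method on ExecuteInProcessResult):

result = sqlalchemy_cheese_job.execute_in_process()
if result.success:
    result_df = result.output_for_node("read_db_op")  # node name assumed from the graph above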
Ben Gatewood
03/08/2022, 11:19 PM
John Cenzano-Fong
03/09/2022, 1:55 AM
Stefan Adelbert
03/09/2022, 6:20 AM
dagster.core.errors.DagsterLaunchFailedError: Error during RPC setup for executing run: ImportError: cannot import name 'calculate_delay' from 'dagster.core.definitions.policy' (/opt/venv/lib/python3.9/site-packages/dagster/core/definitions/policy.py)
  File "/opt/venv/lib/python3.9/site-packages/dagster/core/instance/__init__.py", line 1575, in launch_run
    self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
  File "/opt/venv/lib/python3.9/site-packages/dagster/core/launcher/default_run_launcher.py", line 106, in launch_run
    raise (
Any tips on what I might have done wrong?
Sanat Mouli
03/09/2022, 7:05 AM
@asset(required_resource_keys={'coinmetrics_client'},
       io_manager_key='io_manager', dagster_type=AssetMetrics,
       metadata={'dataset_id': CM_ASSETS_LIST_DSID})
def generate_assets(context, lst):
    # This function takes a key and generates a dataframe which needs to be stored as an asset.
    return asset_metric_df

def generate_subtasks(df: pd.DataFrame):
    for row in df.itertuples():
        yield row[0]

def master_asset(df: pd.DataFrame):
    # For each row in df, generate an asset
    generate_subtasks(df).map(lambda x: generate_assets(x))
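A hedged sketch of the dynamic-output pattern this appears to be reaching for, written with ops, since .map belongs to dynamic op outputs; generating one software-defined asset per dataframe row would need a different mechanism. All names below are placeholders:

import pandas as pd
from dagster import DynamicOut, DynamicOutput, job, op

@op
def load_df() -> pd.DataFrame:
    return pd.DataFrame({"id": ["btc", "eth", "sol"]})

@op(out=DynamicOut())
def generate_subtasks(df: pd.DataFrame):
    # one dynamic output per row; mapping_key must be a simple identifier
    for i, value in enumerate(df["id"]):
        yield DynamicOutput(value, mapping_key=f"row_{i}")

@op
def generate_metrics(key: str) -> pd.DataFrame:
    # placeholder for the per-key dataframe generation
    return pd.DataFrame({"key": [key]})

@job
def master_job():
    generate_subtasks(load_df()).map(generate_metrics)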
Frank Dekervel
03/09/2022, 11:33 AM
Juan
03/09/2022, 2:06 PM
Scott Hood
03/09/2022, 4:23 PM
compute_logs looks like one of the available options for the AzureBlobComputeLogManager. Looking at the required inputs, it looks like it requires a storage account key. Is it possible to hook this up using managed identity / a service principal instead? If not, is compute_logs also something for which we can create our own custom solution?
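For reference, a hedged sketch of the dagster.yaml block in question. The field names are recalled from the dagster-azure docs and should be treated as assumptions; secret_key is the storage account key the question would like to replace with managed identity, and compute_logs accepts any module/class pair, which is where a custom compute log manager would plug in:

compute_logs:
  module: dagster_azure.blob.compute_log_manager
  class: AzureBlobComputeLogManager
  config:
    storage_account: mystorageaccount   # placeholder
    container: compute-logs             # placeholder
    secret_key: <storage-account-key>   # the credential in question
    local_dir: /tmp/compute-logs        # placeholder staging dir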