Jim Abraham
09/23/2022, 1:23 PM
peay
09/23/2022, 3:11 PM
owen
09/23/2022, 4:37 PM
F(endpoint, schema, some config) -> List[AssetsDefinition]
which you can apply multiple times with different parameters. I think this pattern would end up pretty clean, but happy to talk through details if there are sticking points.
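A minimal sketch of the factory shape owen describes, for concreteness (the function name, parameters, and asset body are illustrative, not actual code from the thread):

from typing import Any, List, Mapping

from dagster import AssetsDefinition, asset

def build_endpoint_assets(
    endpoint: str, schema: str, config: Mapping[str, Any]
) -> List[AssetsDefinition]:
    # Each call returns asset definitions parameterized by the arguments,
    # so the factory can be applied repeatedly with different endpoints.
    @asset(name=f"{schema}_{endpoint}", key_prefix=[schema])
    def _endpoint_asset():
        ...  # fetch from `endpoint` and materialize into `schema`

    return [_endpoint_asset]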
Martin O'Leary
09/23/2022, 5:38 PM
Is there a way to use the MetadataValue class to emit compiled SQL and have it print nicely in the asset catalog?
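One way to do this, as a sketch (assuming the compiled SQL is available as a string): attach it as markdown metadata, which the asset catalog renders as formatted markdown.

from dagster import MetadataValue, asset

@asset
def my_model(context):
    compiled_sql = "select * from my_table"  # stand-in for the real compiled SQL
    # MetadataValue.md renders as markdown in the asset catalog.
    context.add_output_metadata(
        {"compiled_sql": MetadataValue.md(f"```sql\n{compiled_sql}\n```")}
    )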
Dusty Shapiro
09/23/2022, 6:56 PM
Is a DagsterUserCodeUnreachableError indicative of a communication error between the user code and the application, or could it be a syntax error?
09/23/2022, 8:08 PM
import pandas as pd

from dagster import job, op

# `db_resource` is the database resource definition, defined elsewhere.

@op(
description="Get data shard1",
required_resource_keys={"db_resource1"}
)
def get_data_shard1(context):
query = "select * from table"
db = context.resources.db_resource1
df = db.fetch_data_by_query(query)
return df
@op(
description="Get data shard2",
required_resource_keys={"db_resource2"}
)
def get_data_shard2(context):
query = "select * from table"
db = context.resources.db_resource2
df = db.fetch_data_by_query(query)
return df
@op(description="Merge cross shard data")
def merge_data(context, df1, df2):
merged = pd.concat([df1, df2])
return merged
@job(
resource_defs={
"db_resource1": db_resource.configured({"database": "shard1"}),
"db_resource2": db_resource.configured({"database": "shard2"}),
}
)
def multi_shard_data_job():
df1 = get_data_shard1()
df2 = get_data_shard2()
merged = merge_data(df1, df2)
Thanks for reading!
Yang
09/23/2022, 10:51 PM
@op
def op_many_inputs(connected1: str, unconnected2: int, connected3: int):
    ...

@job
def my_job():
    # input1/input2 stand for outputs of upstream ops
    op_many_inputs(connected1=input1, connected3=input2)
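If the question is how unconnected2 receives a value: inputs that are not wired up inside the job can be supplied through run config, since int has a built-in loader. A sketch (the value 42 is illustrative):

run_config = {
    "ops": {
        "op_many_inputs": {
            "inputs": {"unconnected2": 42}
        }
    }
}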
Nadav Ben-Haim
09/25/2022, 6:46 AM
Eugenio Contreras
09/25/2022, 10:17 AM
I'm using @slack_on_failure on one of my jobs, and I want to customize the message. Is there a way to obtain the full stack trace from a HookContext?
If I try `context.op_exception`, it just prints the error but not the full stack trace that I can see in the UI. For example:
'bool' object is not subscriptable
But I want something like this:
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "exception_op":
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_plan.py", line 222, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 343, in core_dagster_event_sequence_for_step
_step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 69, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 169, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 146, in _yield_compute_results
user_event_generator,
File "/usr/local/lib/python3.7/site-packages/dagster/utils/__init__.py", line 400, in iterate_with_context
return
File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 78, in solid_execution_error_boundary
) from e
The above exception was caused by the following exception:
TypeError: 'bool' object is not subscriptable
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/usr/local/lib/python3.7/site-packages/dagster/utils/__init__.py", line 398, in iterate_with_context
next_output = next(iterator)
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 65, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "/opt/dagster/app/batch/dagster_error_test.py", line 25, in exception_op
test = context.op_config["exception"]
Thanks!
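One possible approach, sketched with the standard library (only context.op_exception comes from Dagster here): format the chained exception with the traceback module, which should reproduce the frames shown in the UI.

import traceback

def failure_message(context):
    exc = context.op_exception
    if exc is None:
        return "Op failed, but no exception was captured."
    # format_exception follows __cause__/__context__, so the
    # "caused by" section of the trace is included too.
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))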
Geoffrey Greenleaf
09/25/2022, 10:28 PM
Alessandro Cantarelli
09/26/2022, 10:12 AM
Jesper Bagge
09/26/2022, 10:59 AM@asset(metadata={'database':'db','table':'tab'})
And then fetching it through context.metadata['database']
be a good idea?Igor
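For what it's worth, definition metadata like this is also readable inside an IO manager, where it often belongs if it describes where the asset is stored. A sketch (the class name and storage logic are illustrative):

from dagster import IOManager, io_manager

class DbTableIOManager(IOManager):
    def handle_output(self, context, obj):
        # Metadata from the @asset decorator is available on the output context.
        database = context.metadata["database"]
        table = context.metadata["table"]
        ...  # write `obj` to database.table

    def load_input(self, context):
        meta = context.upstream_output.metadata
        ...  # read using meta["database"] and meta["table"]

@io_manager
def db_table_io_manager(_):
    return DbTableIOManager()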
Igor
09/26/2022, 2:53 PM
Emily
09/26/2022, 3:15 PM
Issac Loo
09/26/2022, 3:24 PM
Matt O
09/26/2022, 3:57 PM
Konrad Schlatte
09/26/2022, 5:17 PM
@op(required_resource_keys={"dbt"})
def dbt_deps_op(context):
    context.resources.dbt.cli("deps")
I am using the dbt RPC server, and I'm getting this error from it:
"API Exception: {'type': 'InternalException', 'args': ('No matching handler found for rpc method deps (which=deps)',)}"
This is my resource:
test_dbt_rpc_sync_resource = dbt_rpc_sync_resource.configured({
    "host": {"env": "DBT_HOST"},
    "port": {"env": "DBT_PORT"},
})
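A hedged observation: the error says the RPC server has no matching handler for deps, so it may simply not support that command. If the dbt project is accessible from the op's environment, one workaround is to run deps through the CLI resource instead (paths are illustrative):

from dagster_dbt import dbt_cli_resource

dbt_cli = dbt_cli_resource.configured({
    "project_dir": "/opt/dbt/project",    # hypothetical path
    "profiles_dir": "/opt/dbt/profiles",  # hypothetical path
})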
Nathan Saccon
09/26/2022, 7:09 PM
Is there a recommended way to clean up the dagster_home/storage folder?
Ideally I would like to remove all data associated with runs >= 10 days old.
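One possible approach, as a sketch: delete old run records through the instance API. Note this removes run history; files written under dagster_home/storage by IO managers may need separate cleanup, and the filter field names here are assumptions worth verifying against your Dagster version.

import datetime

from dagster import DagsterInstance, RunsFilter

instance = DagsterInstance.get()
cutoff = datetime.datetime.now() - datetime.timedelta(days=10)
# Delete run records created more than 10 days ago.
for run in instance.get_runs(filters=RunsFilter(created_before=cutoff)):
    instance.delete_run(run.run_id)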
Yuya Nakada
09/27/2022, 1:41 AM
Katrin Grunert
09/27/2022, 9:23 AM
I don't think I can use a RunStatusSensor, as I seem to be able to inspect only one job's state at a time, but I need to monitor multiple jobs' runtimes.
The issue I run into with the code example below is that I am not able to retrieve the run_stats for a job. No error is logged, and no code seems to be executed after the attempt to retrieve run_stats via the DagsterInstance found in the op context.
What could the issue be here? BTW, I am running dagster version 1.0.7.
import time
from typing import Iterable

from dagster import DagsterInstance, DagsterRun, DagsterRunStatus, job, op

@op()
def op_notify_long_runtime_jobs(context):
    instance: DagsterInstance = context.instance
    runs: Iterable[DagsterRun] = filter(
        lambda run: run.status == DagsterRunStatus.STARTED
        and run.job_name != "notify_long_runtime_jobs",
        instance.get_runs(),
    )

    def evaluate_run_time(r):
        print(f"{r.job_name} | {r.run_id} | {r.status} | {type(r)}")  # anything below is not being printed
        print(f"{instance.get_run_stats(r.run_id)}")
        run_stats = instance.get_run_stats(r.run_id)
        elapsed_time = time.time() - run_stats.start_time
        if elapsed_time > 20:  # TODO: configurable limit in seconds
            print(f"Long running job {r.run_id}! Running for {elapsed_time} seconds")  # TODO: run Slack notification

    [evaluate_run_time(r) for r in runs]

@job()
def notify_long_runtime_jobs():
    op_notify_long_runtime_jobs()
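Two debugging tweaks worth trying, sketched against the helper above (a hypothesis, not a diagnosis): log through context.log so output reliably reaches the event log, and wrap get_run_stats so any exception surfaces instead of disappearing. Also note that run_stats.start_time can be None for runs that have not actually started, which would make the subtraction raise.

    def evaluate_run_time(r):
        context.log.info(f"{r.job_name} | {r.run_id} | {r.status}")
        try:
            run_stats = instance.get_run_stats(r.run_id)
        except Exception as e:
            # Surface the real error in the event log instead of losing it.
            context.log.error(f"get_run_stats failed for run {r.run_id}: {e}")
            return
        elapsed_time = time.time() - run_stats.start_time
        if elapsed_time > 20:
            context.log.warning(f"Long running job {r.run_id}! Running for {elapsed_time} seconds")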
peay
09/27/2022, 9:33 AM
For a repository repoA at repository location location, I'd like to prepend location, repoA to the asset key prefix list. I use different repository locations for different branches of my code (e.g., one repository location for the user code deployment for develop, one repository location for the user code deployment for feature-1234, etc.), and it'd be great to allow these to coexist easily.
I've seen utilities like prefix_assets, but I am wondering whether there's a way to do this automatically for all repositories, taking the location into account as well.
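A sketch of one way to approximate this per deployment today (the DAGSTER_LOCATION_NAME environment variable and the assets module are assumptions; load_assets_from_modules accepts a key_prefix):

import os

from dagster import load_assets_from_modules, repository

from my_project import assets  # hypothetical module holding the asset definitions

# Each user code deployment would set this to its location name, e.g. "develop".
location = os.environ["DAGSTER_LOCATION_NAME"]

@repository
def repoA():
    return load_assets_from_modules([assets], key_prefix=[location, "repoA"])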
Joseph McCartin
09/27/2022, 10:24 AM
Pavel Schudel
09/27/2022, 11:03 AM
Megan Beckett
09/27/2022, 12:17 PM
I'm trying to supply run_config to a schedule function. My setup looks like this:
pipelines
├── data_pipeline
│   ├── __init__.py
│   ├── repo.py
│   ├── schedules.py
│   └── config
│       └── run_config.yaml
The following schedule is defined in the schedules.py file:
@schedule(job=update_db_metadata_job, execution_timezone='Africa/Johannesburg', cron_schedule="08 14 27 * *")
def metadata_schedule_dev():
    return RunRequest(
        run_key=None,
        run_config=config_from_files(
            [file_relative_path(__file__, "/config/run_config.yaml")]
        ),
    )
However, when the schedule executes, I get the following error about not being able to produce a result:
dagster._core.errors.DagsterInvariantViolationError: File or glob pattern "/config/run_config.yaml" for "config_files" produced no results.
File "/usr/local/lib/python3.8/site-packages/dagster/_grpc/impl.py", line 267, in get_external_schedule_execution
return schedule_def.evaluate_tick(schedule_context)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/schedule_definition.py", line 500, in evaluate_tick
result = list(ensure_gen(execution_fn(context)))
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/decorators/schedule_decorator.py", line 139, in _wrapped_fn
result = fn() # type: ignore
File "/dagster/dhis2_pipeline/schedules.py", line 32, in metadata_schedule_dev
run_config=config_from_files(
File "/usr/local/lib/python3.8/site-packages/dagster/_core/definitions/utils.py", line 153, in config_from_files
raise DagsterInvariantViolationError(
I have tried several variations of supplying the config via a file but can't get it to work. A similar method used to work when I supplied the config to the job with config_from_files and file_relative_path as above, but I want to move the config to be defined in the schedule instead.
Any suggestions on how to do this?
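A likely fix, as a sketch (assuming run_config.yaml sits next to schedules.py under config/): drop the leading slash. file_relative_path joins its argument onto the module's directory, but an absolute path like "/config/run_config.yaml" overrides the join, which is why the glob produces no results.

from dagster import RunRequest, config_from_files, file_relative_path, schedule

@schedule(job=update_db_metadata_job, execution_timezone="Africa/Johannesburg", cron_schedule="08 14 27 * *")
def metadata_schedule_dev():
    return RunRequest(
        run_key=None,
        run_config=config_from_files(
            # Relative path, resolved against this file's directory.
            [file_relative_path(__file__, "config/run_config.yaml")]
        ),
    )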
Alessandro Cantarelli
09/27/2022, 2:53 PM
Guru Prasath
09/27/2022, 3:29 PM
Grigoriy Sterin
09/27/2022, 3:57 PM
Is it possible to pass an input of type Dict at the job level?
I wonder if I'm doing something wrong, since when I'm trying to run this job:
from typing import Any, Dict

from dagster import job, op
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

@op
def test_op(context, parameters_dict: Dict[str, Any]):
    context.log.info(parameters_dict)

@job(resource_defs={
    "io_manager": s3_pickle_io_manager,
    "s3": s3_resource,
})
def test_job(parameters_dict: Dict[str, Any]):
    test_op(parameters_dict)
With the following parameters:
inputs:
  parameters_dict:
    param_1: value1
    param_2: 2
resources:
  io_manager:
    config:
      s3_bucket: my-bucket
I'm getting the following error:
dagster.core.errors.DagsterTypeLoadingError: Error occurred while loading input "parameters_dict" of step "test_op":
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 316, in core_dagster_event_sequence_for_step
step_input.source.load_input_object(step_context, input_def)
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/execution/plan/inputs.py", line 627, in load_input_object
return dagster_type.loader.construct_from_config_value(step_context, config_data)
File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/errors.py", line 191, in user_code_error_boundary
raise error_cls(
The above exception was caused by the following exception:
AttributeError: 'str' object has no attribute 'items'
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/errors.py", line 184, in user_code_error_boundary
yield
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/execution/plan/inputs.py", line 627, in load_input_object
return dagster_type.loader.construct_from_config_value(step_context, config_data)
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/types/python_dict.py", line 38, in construct_from_config_value
runtime_value[key] = self._value_dagster_type.loader.construct_from_config_value(
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/types/config_schema.py", line 156, in construct_from_config_value
return self._func(context, config_value)
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/types/builtin_config_schemas.py", line 52, in _any_input_schema
return load_type_input_schema_dict(config_value)
File "/home/gsterin/.local/lib/python3.8/site-packages/dagster/core/types/builtin_config_schemas.py", line 35, in load_type_input_schema_dict
file_type, file_options = list(value.items())[0]
And with scalar inputs, the same code works just fine.
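A possible workaround, as a sketch: pass the dict through op config rather than as an input, which sidesteps the Dict input loader failing in the trace above (Permissive accepts arbitrary keys). The run config would then go under ops: test_op_with_config: config: instead of inputs:.

from dagster import Field, Permissive, job, op

@op(config_schema=Field(Permissive()))
def test_op_with_config(context):
    parameters_dict = context.op_config
    context.log.info(str(parameters_dict))

@job
def test_job_with_config():
    test_op_with_config()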
Geoffrey Greenleaf
09/27/2022, 4:21 PM
FileNotFoundError: [Errno 2] No such file or directory: '/home/leaf/code/origo-analytics/astra/tmpa7a0jq5q/history/runs/dc112b44-3e6a-4c89-a114-657d28380d17.db-wal'
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-DF50hgcZ-py3.8/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_plan.py", line 224, in dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-DF50hgcZ-py3.8/lib/python3.8/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 193, in launch_step
self._upload_artifacts(log, step_run_ref, run_id, step_key)
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-DF50hgcZ-py3.8/lib/python3.8/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 398, in _upload_artifacts
build_pyspark_zip(zip_local_path, self.local_dagster_job_package_path)
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-DF50hgcZ-py3.8/lib/python3.8/site-packages/dagster_pyspark/utils.py", line 21, in build_pyspark_zip
zf.write(abs_fname, os.path.relpath(os.path.join(root, fname), path))
File "/usr/local/lib/python3.8/zipfile.py", line 1741, in write
zinfo = ZipInfo.from_file(filename, arcname,
File "/usr/local/lib/python3.8/zipfile.py", line 523, in from_file
st = os.stat(filename)
Tom Reilly
09/27/2022, 8:31 PM
Is there a recommended value for the QueuedRunCoordinator's max_concurrent_runs? We are using the EcsRunLauncher backed by a Fargate cluster, so our compute is flexible.
Vimal Selvadurai Christopher
09/27/2022, 9:06 PM