Sanidhya Singh05/31/2022, 5:53 AM
Abednego Santoso05/31/2022, 9:43 AM
function. I have already removed these but the gap still appears. Thank you!
Thierry Hue05/31/2022, 9:47 AM
the following way, all is good
The image is good:
dagster:
  dagster-user-deployments:
    enabled: true
    deployments:
      - name: "my-data-pipelines"
        image:
          repository: "myrepo/mycontainer"
          tag: local
But when I try to use my version instead of local like 20220531.1:
The image is wrong
dagster:
  dagster-user-deployments:
    enabled: true
    deployments:
      - name: "my-data-pipelines"
        image:
          repository: "myrepo/mycontainer"
          tag: "20220531.1"
and the image cannot be found in the repo because the image should be
This was working as expected in version 0.14.3. What should I do instead?
Vlad Efanov05/31/2022, 10:23 AM
Daniel Mosesson05/31/2022, 10:46 AM
and can do the logging, etc, and the utility functions that don't take a context that can't do logging the same way (I suppose I could pass the context object around as well, but that is also not ideal.)
1. Is this a problem that writing an IO manager would help with? Most of what all of these functions are doing is helping store or retrieve data.
   a. Most cases would be get/update a table, but there are some cases where I need to execute a custom query. Would that be possible?
2. How difficult is it to create an IO manager that does this? I looked at the code for
(https://github.com/dagster-io/dagster/blob/8e8ee8537146aad35d3dd75e181f998fd989325f/python_modules/dagster/dagster/core/storage/fs_io_manager.py) and it looks straightforward, but is there something that makes my use case different?
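For the question above, here is a minimal sketch of the two-method shape an IO manager takes (`handle_output` to store, `load_input` to retrieve), backed by an in-memory SQLite table. It is left as a plain class so it runs without Dagster installed; in a Dagster project it would presumably subclass `dagster.IOManager` and be returned from an `@io_manager` function. The class name, the table-per-output convention, the `run_query` helper, and the `SimpleNamespace` stand-ins for the context objects are all assumptions made for illustration.

```python
import sqlite3
from types import SimpleNamespace


class SqliteTableIOManager:
    """Plain-Python sketch mirroring the handle_output / load_input interface.

    Assumption: in Dagster this would subclass dagster.IOManager.
    """

    def __init__(self, conn):
        self._conn = conn

    def handle_output(self, context, rows):
        # Hypothetical convention: one table per output, named after the output.
        table = context.name
        self._conn.execute(f'DROP TABLE IF EXISTS "{table}"')
        self._conn.execute(f'CREATE TABLE "{table}" (key TEXT, value REAL)')
        self._conn.executemany(f'INSERT INTO "{table}" VALUES (?, ?)', rows)
        self._conn.commit()

    def load_input(self, context):
        # Read back the table written by the upstream output.
        table = context.upstream_output.name
        cursor = self._conn.execute(f'SELECT key, value FROM "{table}" ORDER BY key')
        return cursor.fetchall()

    def run_query(self, sql, params=()):
        # Hypothetical escape hatch for the "custom query" case.
        return self._conn.execute(sql, params).fetchall()


conn = sqlite3.connect(":memory:")
manager = SqliteTableIOManager(conn)

# SimpleNamespace objects stand in for Dagster's output and input contexts.
out_ctx = SimpleNamespace(name="prices")
manager.handle_output(out_ctx, [("a", 1.0), ("b", 2.5)])

in_ctx = SimpleNamespace(upstream_output=out_ctx)
loaded = manager.load_input(in_ctx)
total = manager.run_query('SELECT SUM(value) FROM "prices"')[0][0]
```

The get/update-a-table case fits the two interface methods directly. A custom query does not map onto either of them, so in this sketch it is a separate method on the same object, which may be a sign that such calls belong in a resource instead.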
Mykola Palamarchuk05/31/2022, 11:30 AM
Kobroli05/31/2022, 11:52 AM
(such that run concurrencies can be defined, but no storages are persisted to disk)? Thanks a lot in advance!
Kayvan Shah05/31/2022, 1:09 PM
from dagster import repository

from demo.jobs.say_hello import say_hello_job
from demo.schedules.my_hourly_schedule import my_hourly_schedule, my_schedule
from demo.sensors.my_sensor import my_sensor, mysensor
from demo.jobs.cereal_diamond import diamond


@repository
def demo():
    """
    The repository definition for this demo Dagster repository.

    For hints on building your Dagster repository, see our documentation overview on Repositories:
    <https://docs.dagster.io/overview/repositories-workspaces/repositories>
    """
    jobs = [say_hello_job]
    schedules = [my_hourly_schedule]
    sensors = [my_sensor]
    return jobs + schedules + sensors


@repository
def cereals():
    jobs = [diamond]
    schedules = [my_schedule]
    sensors = [mysensor]
    return jobs + schedules + sensors
Martin Remy05/31/2022, 1:37 PM
on our EMR cluster. I use the
demonstrated in your documentation here. I copied this code exactly and only replaced values to match my current EMR cluster. Then I created a test case using pytest to try to launch it. My current version is:
I've tried both with
import pytest
from dagster import execute_pipeline, reconstructable

from pipeline.data.jobs.pyspark_dagster import make_and_filter_data_emr


def test_emr():
    result = execute_pipeline(reconstructable(make_and_filter_data_emr))
    assert result.success

, I always get different errors raised by Dagster. Is that documentation up to date? Is it supposed to work out of the box? Do you have other documentation I could look into, or a solution?
Samuel Stütz05/31/2022, 1:40 PM
I tried many different versions … all say invalid configuration
Field(
    Noneable(list),
    default_value=["GIT_HASH", "DAGSTER_VERSION"],
    is_required=False,
    description="Li.....",
),
Liezl Puzon05/31/2022, 2:37 PM
field \"jobName\": Unknown field.
Liezl Puzon05/31/2022, 2:53 PM
Jordan05/31/2022, 3:36 PM
I'm using an
for this but unfortunately only the last op that finishes makes the request. I have the impression that the context is overwritten each time an op executes. Is it possible to have several
in the same context to make sure that all the requests are executed and not just one?
@op
def my_op_0(context):
    for date in ["2022-05-29", "2022-05-30", "2022-05-31"]:
        yield DynamicOutput(value=date, mapping_key=date.replace('-', ''))


@op
def my_op_1(context, date: str):
    context.log_event(
        AssetMaterialization(asset_key="my_asset_0", partition=date)
    )


@job()
def my_job():
    my_op_0().map(my_op_1)


my_asset_job_0 = build_assets_job("my_asset_job_0", assets=[my_asset_0])
my_asset_job_1 = build_assets_job("my_asset_job_1", assets=[my_asset_1])


@asset_sensor(asset_key=AssetKey("my_asset_0"), job=my_asset_job_1)
def my_asset_sensor(context, asset_event):
    partition = asset_event.dagster_event.event_specific_data.materialization.partition
    request = my_asset_job_1.run_request_for_partition(partition_key=partition, run_key=context.cursor)
    yield request
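The question above asks whether every materialization can produce its own request. Outside Dagster's API, the general idea can be sketched as: walk all events newer than a stored cursor, emit one request per event with a distinct run key, then advance the cursor. `MaterializationEvent`, `Request`, and `requests_since` are hypothetical stand-ins, not Dagster types, and whether the built-in asset sensor can be made to behave this way is not established here.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass(frozen=True)
class MaterializationEvent:
    # Hypothetical stand-in for a stored materialization record.
    storage_id: int
    partition: str


@dataclass(frozen=True)
class Request:
    # Hypothetical stand-in for a run request.
    run_key: str
    partition: str


def requests_since(
    events: Iterable[MaterializationEvent], cursor: int
) -> Tuple[List[Request], int]:
    """Return one request per event newer than `cursor`, plus the advanced cursor."""
    new_events = sorted(
        (e for e in events if e.storage_id > cursor),
        key=lambda e: e.storage_id,
    )
    # The run key includes partition and event id, so no two requests share one.
    requests = [
        Request(run_key=f"my_asset_0:{e.partition}:{e.storage_id}", partition=e.partition)
        for e in new_events
    ]
    new_cursor = new_events[-1].storage_id if new_events else cursor
    return requests, new_cursor


events = [
    MaterializationEvent(1, "2022-05-29"),
    MaterializationEvent(2, "2022-05-30"),
    MaterializationEvent(3, "2022-05-31"),
]
requests, cursor = requests_since(events, cursor=0)
```

Because each run key is unique per event, requests are not collapsed into one when several materializations arrive between two evaluations, and a second call with the advanced cursor returns nothing new.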
Hebo Yang05/31/2022, 4:28 PM
Shriram Holla05/31/2022, 5:19 PM
within an op instead of a
? My use case didn’t really fit well with
Shriram Holla05/31/2022, 6:55 PM
Aaron Bailey05/31/2022, 7:35 PM
ibrahem05/31/2022, 7:40 PM
Gowtham Manne05/31/2022, 8:11 PM
Daniel Mosesson05/31/2022, 9:13 PM
via config. I tried annotating
and get the same error message:
Input <foo> in op 'myOp' is not connected to the output of a previous node and can not be loaded from configuration, making it impossible to execute. Possible solutions are:
• add a dagster_type_loader for type <CustomType>
• connect <foo> to the output of another node
I am trying to set this from a sensor if that makes sense
Zach05/31/2022, 9:14 PM
the docker compose logs seem to cut off there
dagster_docker_dagit | Traceback (most recent call last):
dagster_docker_dagit |   File "/usr/local/bin/dagit", line 5, in <module>
dagster_docker_dagit |     from dagit.cli import main
dagster_docker_dagit |   File "/usr/local/lib/python3.9/site-packages/dagit/cli.py", line 21, in <module>
dagster_docker_dagit |     from .app import create_app_from_workspace_process_context
dagster_docker_dagit |   File "/usr/local/lib/python3.9/site-packages/dagit/app.py", line 11, in <module>
dagster_docker_dagit |     from .webserver import DagitWebserver
dagster_docker_dagit |   File "/usr/local/lib/python3.9/site-packages/dagit/webserver.py", line 7, in <module>
dagster_docker_dagit |     import nbformat
dagster_docker_dagit |   File "/usr/local/lib/python3.9/site-packages/nbformat/__init__.py", line 9, in <module>
dagster_docker_dagit |     from traitlets.log import get_logger
dagster_docker_dagit |   File "/usr/local/lib/python3.9/site-packages/traitlets/__init__.py", line 5, in <module>
dagster_docker_dagit |     from ._version import __version__, version_info
dagster_docker_dagit |   File "/usr/local/lib/python3.9/site-packages/traitlets/_version.py", line 7, in <module>
dagster_docker_dagit |     assert __version__ == (
Eric Larson05/31/2022, 9:38 PM
It looks like, when assets are stored in S3, each run creates its own subdirectory and stores them there, but when a new run kicks off it doesn’t look in the directory of the original run that produced the upstream asset.
boto3 key not found
Spencer Guy05/31/2022, 10:31 PM
Son Giang06/01/2022, 4:20 AM
Caleb Fornari06/01/2022, 7:37 AM
Kayvan Shah06/01/2022, 9:18 AM
Kayvan Shah06/01/2022, 9:58 AM
/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py:93: UserWarning: While in @job context 'ticker_single_pipeline', received an uninvoked op 'get_ticker_data'.
  warnings.warn(warning_message.strip())
/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/context.py:554: UserWarning: Error loading repository location demo:dagster.core.errors.DagsterInvalidDefinitionError: In @job ticker_single_pipeline, received invalid type <class 'str'> for input "start_date" (passed by keyword) in op invocation "get_ticker_data". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.

Stack Trace:
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 224, in __init__
    self._loaded_repositories = LoadedRepositories(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 98, in __init__
    loadable_targets = get_loadable_targets(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/utils.py", line 53, in get_loadable_targets
    else loadable_targets_from_python_package(package_name, working_directory)
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 48, in loadable_targets_from_python_package
    module = load_python_module(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 136, in load_python_module
    return importlib.import_module(module_name)
  File "/usr/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/kayvan/projects/dagster-demo/demo/demo/__init__.py", line 1, in <module>
    from .repository import demo, yahoo_finance
  File "/home/kayvan/projects/dagster-demo/demo/demo/repository.py", line 4, in <module>
    from demo.jobs.yahoofin.multi_ticker import ticker_mutli_pipeline
  File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/multi_ticker.py", line 2, in <module>
    from demo.jobs.yahoofin.single_ticker import ticker_single_pipeline
  File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 11, in <module>
    def ticker_single_pipeline(ticker_dict: dict):
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 204, in job
    return _Job()(name)
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 75, in __call__
    ) = do_composition(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 1012, in do_composition
    output = fn(**kwargs)
  File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 12, in ticker_single_pipeline
    data = get_ticker_data(ticker_dict, start_date="2022-05-01")
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/solid_definition.py", line 146, in __call__
    return super(SolidDefinition, self).__call__(*args, **kwargs)
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/node_definition.py", line 162, in __call__
    return PendingNodeInvocation(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 387, in __call__
    self._process_argument_node(
  File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 510, in _process_argument_node
    raise DagsterInvalidDefinitionError(
  warnings.warn(
LP06/01/2022, 10:02 AM
and it fails with
at time.sleep(5). If the API response time is fast it works properly, but when the API response time is slow it gives me the error below. Can anyone suggest what I am doing wrong? Deployed on k8s and using 0.11.16.
dagster.core.errors.DagsterExecutionInterruptedError
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 193, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 308, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 63, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 138, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn):
  File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 111, in _yield_compute_results
    for event in iterate_with_context(
  File "/usr/local/lib/python3.8/site-packages/dagster/utils/__init__.py", line 382, in iterate_with_context
    next_output = next(iterator)
  File "/dp/dp/pipeline/steps/ips/save_to.py", line 67, in save_to
    ingestion_data(ingest_data, file)
  File "/dp/dp/core/utils/task_ingestion.py", line 58, in ingestion_data
    task_status = tm.post_task(spec)
  File "/usr/local/lib/python3.8/site-packages/mypackage/data_access/task_manager.py", line 103, in post_task
    task_status = self.check_for_status(res['task'])
  File "/usr/local/lib/python3.8/site-packages/mypackage/data_access/task_manager.py", line 126, in check_for_status
    time.sleep(5)
  File "/usr/local/lib/python3.8/site-packages/dagster/utils/interrupts.py", line 78, in _new_signal_handler
    raise error_cls()
Eric Larson06/01/2022, 2:32 PM
Charlie Bini06/01/2022, 4:03 PM
access to a resource?
owen06/01/2022, 4:13 PM
@resource(...)
def my_resource(context):
    return MyClass(..., logger=context.log)
, and inside the body of the resource call
from dagster import get_dagster_logger
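The first suggestion above passes a logger into the wrapped class when the resource is constructed. A minimal sketch of the class side of that pattern, using only the standard library so it runs without Dagster: `MyClass` comes from the snippet above, while its `base_url` argument, its `fetch` method, and the `ListHandler` helper are hypothetical. In a resource function the `logger` argument would be `context.log`; here a plain `logging.Logger` stands in for it.

```python
import logging


class MyClass:
    def __init__(self, base_url, logger=None):
        self.base_url = base_url
        # Use the injected logger if given; otherwise fall back to a module logger.
        self.log = logger or logging.getLogger(__name__)

    def fetch(self, path):
        # Hypothetical method: builds a URL and logs through the injected logger.
        url = f"{self.base_url}/{path}"
        self.log.info("fetching %s", url)
        return url


class ListHandler(logging.Handler):
    """Collects formatted messages so the example can inspect what was logged."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
injected = logging.getLogger("resource_sketch")  # stands in for context.log
injected.setLevel(logging.INFO)
injected.addHandler(handler)
injected.propagate = False

client = MyClass("https://example.invalid", logger=injected)
result = client.fetch("status")
```

Keeping the logger as a constructor argument means the class has no Dagster import of its own, so it can be exercised in unit tests with any standard logger.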
Charlie Bini06/01/2022, 4:14 PM
param on the resource def is the same one that contains
owen06/01/2022, 4:17 PM
property (this is the resolved config value)
is the schema for that config blob, and will be inside the argument to the
Charlie Bini06/01/2022, 4:18 PM