Sanidhya Singh
05/31/2022, 5:53 AM
Abednego Santoso
05/31/2022, 9:43 AM
jitter
and backoff
from with_retry_policy
function. I have already removed these but the gap still appears. Thank you!
Thierry Hue
05/31/2022, 9:47 AM
values.yaml
the following way, all is good
dagster:
  dagster-user-deployments:
    enabled: true
    deployments:
      - name: "my-data-pipelines"
        image:
          repository: "myrepo/mycontainer"
          tag: local
The image is good:
image: "myrepo:local"
But when I try to use my version instead of local like 20220531.1:
dagster:
  dagster-user-deployments:
    enabled: true
    deployments:
      - name: "my-data-pipelines"
        image:
          repository: "myrepo/mycontainer"
          tag: "20220531.1"
The image is wrong:
image: "myrepo/mycontainer:\"20220531.1\""
and the image cannot be found in the repo because the image should be image: "myrepo/mycontainer:20220531.1"
This was working as expected in version 0.14.3
What should I do instead?
Vlad Efanov
05/31/2022, 10:23 AM
Daniel Mosesson
05/31/2022, 10:46 AM
context
and can do the logging, etc., and the utility functions that don't take a context and therefore can't do logging the same way (I suppose I could pass the context object around as well, but that is also not ideal).
1. Is this a problem that writing an IO manager would help with? Most of what all of these functions are doing is helping store or retrieve data.
a. Most cases would be get/update a table, but there are some cases where I need to execute a custom query. Would that be possible?
2. How difficult is it to create an IO manager that does this? I looked at the code for fs_io_manager
(https://github.com/dagster-io/dagster/blob/8e8ee8537146aad35d3dd75e181f998fd989325f/python_modules/dagster/dagster/core/storage/fs_io_manager.py) and it looks straightforward, but is there something that makes my use case different?
Mykola Palamarchuk
05/31/2022, 11:30 AM
Kobroli
05/31/2022, 11:52 AM
dagster.yaml
(such that run concurrencies can be defined, but no storages are persisted to disk)?
Thanks a lot in advance!
Kayvan Shah
05/31/2022, 1:09 PM
from dagster import repository
from demo.jobs.say_hello import say_hello_job
from demo.schedules.my_hourly_schedule import my_hourly_schedule, my_schedule
from demo.sensors.my_sensor import my_sensor, mysensor
from demo.jobs.cereal_diamond import diamond

@repository
def demo():
    """
    The repository definition for this demo Dagster repository.
    For hints on building your Dagster repository, see our documentation overview on Repositories:
    <https://docs.dagster.io/overview/repositories-workspaces/repositories>
    """
    jobs = [say_hello_job]
    schedules = [my_hourly_schedule]
    sensors = [my_sensor]
    return jobs + schedules + sensors

@repository
def cereals():
    jobs = [diamond]
    schedules = [my_schedule]
    sensors = [mysensor]
    return jobs + schedules + sensors
Martin Remy
05/31/2022, 1:37 PM
op
on our EMR cluster.
I use the emr_pyspark_step_launcher
demonstrated in your documentation here. I strictly copied this code and simply replaced values to match my current EMR cluster.
Then I created a test case using Pytest to try and launch it.
My current version is:
import pytest
from dagster import execute_pipeline, reconstructable
from pipeline.data.jobs.pyspark_dagster import make_and_filter_data_emr

def test_emr():
    result = execute_pipeline(reconstructable(make_and_filter_data_emr))
    assert result.success
I've tried both with execute_pipeline
and execute_in_process
, but I always get different errors raised by Dagster.
Is that documentation up to date? Is it supposed to work out of the gate? Do you have other documentation I could look into, or a solution?
Samuel Stütz
05/31/2022, 1:40 PM
Field(
    Noneable(list),
    default_value=["GIT_HASH", "DAGSTER_VERSION"],
    is_required=False,
    description="Li.....",
),
I tried many different versions … all say invalid configuration
Liezl Puzon
05/31/2022, 2:37 PM
field \"jobName\": Unknown field.
in graphql?
Liezl Puzon
05/31/2022, 2:53 PM
Jordan
05/31/2022, 3:36 PM
asset_event
I'm using an asset_sensor
for this, but unfortunately only the last op that finishes makes the request. I have the impression that the context is overwritten each time an op executes. Is it possible to have several AssetMaterialization
in the same context to make sure that all the requests are executed and not just one?
@op
def my_op_0(context):
    for date in ["2022-05-29", "2022-05-30", "2022-05-31"]:
        yield DynamicOutput(value=date, mapping_key=date.replace("-", ""))

@op
def my_op_1(context, date: str):
    context.log_event(
        AssetMaterialization(asset_key="my_asset_0", partition=date)
    )

@job()
def my_job():
    my_op_0().map(my_op_1)

my_asset_job_0 = build_assets_job("my_asset_job_0", assets=[my_asset_0])
my_asset_job_1 = build_assets_job("my_asset_job_1", assets=[my_asset_1])

@asset_sensor(asset_key=AssetKey("my_asset_0"), job=my_asset_job_1)
def my_asset_sensor(context, asset_event):
    partition = asset_event.dagster_event.event_specific_data.materialization.partition
    request = my_asset_job_1.run_request_for_partition(partition_key=partition, run_key=context.cursor)
    yield request
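One possible reading of the behaviour described above is that each sensor evaluation only handles the most recent materialization, and that every request shares the same run key, so earlier partitions never get a run. The sketch below is plain Python rather than Dagster API: `Event`, `requests_since`, and the integer cursor are hypothetical stand-ins that only illustrate walking every event newer than a stored cursor and building one request per partition with its own run key.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for one stored materialization record."""
    storage_id: int
    partition: str


def requests_since(events: List[Event], cursor: int) -> Tuple[List[dict], int]:
    """Build one request per event newer than `cursor` and return the new cursor."""
    new_events = sorted(
        (e for e in events if e.storage_id > cursor),
        key=lambda e: e.storage_id,
    )
    requests = [
        # A distinct run key per event, so no two requests look identical.
        {"partition_key": e.partition, "run_key": f"my_asset_0:{e.storage_id}"}
        for e in new_events
    ]
    # Advance the cursor only as far as the newest event actually handled.
    new_cursor = new_events[-1].storage_id if new_events else cursor
    return requests, new_cursor


events = [
    Event(1, "2022-05-29"),
    Event(2, "2022-05-30"),
    Event(3, "2022-05-31"),
]
requests, cursor = requests_since(events, cursor=0)
```

In a real sensor the events would come from the instance's event log and the cursor would be saved on the sensor context; those parts are left out here because they depend on the Dagster version in use.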
Hebo Yang
05/31/2022, 4:28 PM
Shriram Holla
05/31/2022, 5:19 PM
os.system
within an op instead of a shell_op
? My use case didn’t really fit well with shell_op
.
Shriram Holla
05/31/2022, 6:55 PM
Aaron Bailey
05/31/2022, 7:35 PM
ibrahem
05/31/2022, 7:40 PM
Gowtham Manne
05/31/2022, 8:11 PM
Daniel Mosesson
05/31/2022, 9:13 PM
CustomType
via config. I tried annotating CustomType
with @usable_as_dagster_type
and get the same error message:
Input <foo> in op 'myOp' is not connected to the output of a previous node and can not be loaded from configuration, making it impossible to execute. Possible solutions are:
• add a dagster_type_loader for type <CustomType>
• connect <foo> to the output of another node
I am trying to set this from a sensor, if that makes sense.
Zach
05/31/2022, 9:14 PM
dagster_docker_dagit | Traceback (most recent call last):
dagster_docker_dagit | File "/usr/local/bin/dagit", line 5, in <module>
dagster_docker_dagit | from dagit.cli import main
dagster_docker_dagit | File "/usr/local/lib/python3.9/site-packages/dagit/cli.py", line 21, in <module>
dagster_docker_dagit | from .app import create_app_from_workspace_process_context
dagster_docker_dagit | File "/usr/local/lib/python3.9/site-packages/dagit/app.py", line 11, in <module>
dagster_docker_dagit | from .webserver import DagitWebserver
dagster_docker_dagit | File "/usr/local/lib/python3.9/site-packages/dagit/webserver.py", line 7, in <module>
dagster_docker_dagit | import nbformat
dagster_docker_dagit | File "/usr/local/lib/python3.9/site-packages/nbformat/__init__.py", line 9, in <module>
dagster_docker_dagit | from traitlets.log import get_logger
dagster_docker_dagit | File "/usr/local/lib/python3.9/site-packages/traitlets/__init__.py", line 5, in <module>
dagster_docker_dagit | from ._version import __version__, version_info
dagster_docker_dagit | File "/usr/local/lib/python3.9/site-packages/traitlets/_version.py", line 7, in <module>
dagster_docker_dagit | assert __version__ == (
the docker compose logs seem to cut off there
Eric Larson
05/31/2022, 9:38 PM
boto3 key not found
It looks like, when assets are stored in S3, each run creates its own subdirectory and stores them there, but when a new run kicks off it doesn’t look in the directory of the original run that produced the upstream asset.
Spencer Guy
05/31/2022, 10:31 PM
Son Giang
06/01/2022, 4:20 AM
Caleb Fornari
06/01/2022, 7:37 AM
Kayvan Shah
06/01/2022, 9:18 AM
Kayvan Shah
06/01/2022, 9:58 AM
/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py:93: UserWarning: While in @job context 'ticker_single_pipeline', received an uninvoked op 'get_ticker_data'.
warnings.warn(warning_message.strip())
/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/context.py:554: UserWarning: Error loading repository location demo:dagster.core.errors.DagsterInvalidDefinitionError: In @job ticker_single_pipeline, received invalid type <class 'str'> for input "start_date" (passed by keyword) in op invocation "get_ticker_data". Must pass the output from previous node invocations or inputs to the composition function as inputs when invoking nodes during composition.
Stack Trace:
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 224, in __init__
self._loaded_repositories = LoadedRepositories(
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/server.py", line 98, in __init__
loadable_targets = get_loadable_targets(
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/grpc/utils.py", line 53, in get_loadable_targets
else loadable_targets_from_python_package(package_name, working_directory)
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/workspace/autodiscovery.py", line 48, in loadable_targets_from_python_package
module = load_python_module(
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 136, in load_python_module
return importlib.import_module(module_name)
File "/usr/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/home/kayvan/projects/dagster-demo/demo/demo/__init__.py", line 1, in <module>
from .repository import demo, yahoo_finance
File "/home/kayvan/projects/dagster-demo/demo/demo/repository.py", line 4, in <module>
from demo.jobs.yahoofin.multi_ticker import ticker_mutli_pipeline
File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/multi_ticker.py", line 2, in <module>
from demo.jobs.yahoofin.single_ticker import ticker_single_pipeline
File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 11, in <module>
def ticker_single_pipeline(ticker_dict: dict):
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 204, in job
return _Job()(name)
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/decorators/job_decorator.py", line 75, in __call__
) = do_composition(
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 1012, in do_composition
output = fn(**kwargs)
File "/home/kayvan/projects/dagster-demo/demo/demo/jobs/yahoofin/single_ticker.py", line 12, in ticker_single_pipeline
data = get_ticker_data(ticker_dict, start_date="2022-05-01")
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/solid_definition.py", line 146, in __call__
return super(SolidDefinition, self).__call__(*args, **kwargs)
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/node_definition.py", line 162, in __call__
return PendingNodeInvocation(
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 387, in __call__
self._process_argument_node(
File "/home/kayvan/projects/dagster/lib/python3.9/site-packages/dagster/core/definitions/composition.py", line 510, in _process_argument_node
raise DagsterInvalidDefinitionError(
warnings.warn(
LP
06/01/2022, 10:02 AM
Dagster Solid
and it fails with DagsterExecutionInterruptedError
at time.sleep(5). If the API response time is fast it works properly, but when the API response time is slow it gives me the error below. Can anyone suggest what I am doing wrong? Deployed on k8s and using 0.11.16.
dagster.core.errors.DagsterExecutionInterruptedError
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 193, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 308, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 63, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 138, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/usr/local/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 111, in _yield_compute_results
for event in iterate_with_context(
File "/usr/local/lib/python3.8/site-packages/dagster/utils/__init__.py", line 382, in iterate_with_context
next_output = next(iterator)
File "/dp/dp/pipeline/steps/ips/save_to.py", line 67, in save_to
ingestion_data(ingest_data, file)
File "/dp/dp/core/utils/task_ingestion.py", line 58, in ingestion_data
task_status = tm.post_task(spec)
File "/usr/local/lib/python3.8/site-packages/mypackage/data_access/task_manager.py", line 103, in post_task
task_status = self.check_for_status(res['task'])
File "/usr/local/lib/python3.8/site-packages/mypackage/data_access/task_manager.py", line 126, in check_for_status
time.sleep(5)
File "/usr/local/lib/python3.8/site-packages/dagster/utils/interrupts.py", line 78, in _new_signal_handler
raise error_cls()
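The traceback above ends inside a `time.sleep(5)` polling loop. The sketch below does not explain why the run was interrupted; it only shows one way to bound such a loop so that a slow API ends in an explicit timeout instead of an open-ended wait. `wait_for_status`, the status strings, and the default timeout are all hypothetical and not taken from `task_manager.py`.

```python
import time
from typing import Callable


def wait_for_status(
    check: Callable[[], str],
    timeout_s: float = 300.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll `check` until it reports a terminal status or the deadline passes."""
    deadline = clock() + timeout_s
    while True:
        status = check()
        # "SUCCESS" and "FAILED" are assumed terminal states for this sketch.
        if status in ("SUCCESS", "FAILED"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"task still {status!r} after {timeout_s} seconds")
        sleep(interval_s)


# Usage with canned responses instead of a real API call.
_responses = iter(["RUNNING", "RUNNING", "SUCCESS"])
final = wait_for_status(lambda: next(_responses), interval_s=0.0)
```

Passing `sleep` and `clock` as parameters is only there to make the loop easy to exercise without waiting in real time.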
Eric Larson
06/01/2022, 2:32 PM
Charlie Bini
06/01/2022, 4:03 PM
context.log
access to a resource?
@resource
decorator
owen
06/01/2022, 4:13 PM
@resource(...)
def my_resource(context):
    return MyClass(..., logger=context.log)
should work!
from dagster import get_dagster_logger
, and inside the body of the resource call get_dagster_logger().info("foo")
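For the first suggestion, here is a rough sketch of what a class that accepts a logger could look like. The constructor arguments and the `run` method of `MyClass` are made up for illustration, and a standard `logging.Logger` stands in for `context.log` (assumed here to offer the same `info` method) so the snippet runs without Dagster.

```python
import logging


class MyClass:
    """Hypothetical client that logs through whatever logger it is handed."""

    def __init__(self, name: str, logger: logging.Logger):
        self.name = name
        self.logger = logger

    def run(self) -> str:
        # Inside a resource this call would go through the injected logger.
        self.logger.info("running %s", self.name)
        return self.name


# Stand-in for context.log; in a resource you would pass context.log instead.
logger = logging.getLogger("my_resource")
client = MyClass("demo", logger=logger)
result = client.run()
```

Keeping the logger as a constructor argument means the same class can be used outside Dagster, for example in unit tests, with any standard logger.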
Charlie Bini
06/01/2022, 4:14 PM
context
param on the resource def is the same one that contains config_schema
right?
owen
06/01/2022, 4:17 PM
resource_config
property (this is the resolved config value)
config_schema
is the schema for that config blob, and will be inside the argument to the @resource
decorator
Charlie Bini
06/01/2022, 4:18 PM