Alex Rudolph
03/03/2022, 6:12 PM
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '6829c683-9321-4b55-9592-c94794c50c81', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'd62db2cc-fd32-4618-af78-d12b3e721ec9', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'f91e6c2b-ed56-4c59-98f2-420aed375018', 'Date': 'Thu, 03 Mar 2022 17:51:34 GMT', 'Content-Length': '282'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dagster-job-365e6e829dfcac3a5f67eb4970876776-2\" not found","reason":"NotFound","details":{"name":"dagster-job-365e6e829dfcac3a5f67eb4970876776-2","group":"batch","kind":"jobs"},"code":404}
  File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/api.py", line 775, in pipeline_execution_iterator
    for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
  File "/usr/local/lib/python3.7/site-packages/dagster/core/executor/step_delegating/step_delegating_executor.py", line 205, in execute
    plan_context, [step], active_execution
  File "/usr/local/lib/python3.7/site-packages/dagster_k8s/executor.py", line 217, in check_step_health
    job = self._batch_api.read_namespaced_job(namespace=self._job_namespace, name=job_name)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 1257, in read_namespaced_job
    return self.read_namespaced_job_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/batch_v1_api.py", line 1366, in read_namespaced_job_with_http_info
    collection_formats=collection_formats)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
    headers=headers)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 243, in GET
    query_params=query_params)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)
Jay Sharma
03/03/2022, 9:22 PM
Will Gunadi
03/03/2022, 9:35 PM
Stefan Adelbert
03/03/2022, 9:55 PM
dagster inside a docker container as part of my development workflow. I'd like to be able to modify job code and then run that job code from dagit without having to rebuild the docker image and restart the repository container.
My idea is to bind mount the job code into the container, so I can edit the code without needing to rebuild the container. I already do this for running unit tests continuously using pytest-watch.
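A minimal sketch of one workaround for the bind-mount setup, assuming that restarting the gRPC server process whenever a .py file changes is acceptable. This is not a Dagster feature, and Dagit may still need the code location reloaded after each restart. It polls file modification times using only the standard library, so it does not rely on filesystem events being delivered through the bind mount. The names snapshot and watch_and_restart are hypothetical.

```python
import subprocess
import time
from pathlib import Path


def snapshot(root):
    """Map every .py file under root to its last-modified time."""
    return {str(path): path.stat().st_mtime for path in Path(root).rglob("*.py")}


def watch_and_restart(cmd, root, interval=1.0):
    """Run cmd and restart it whenever a .py file under root is added, removed, or modified."""
    proc = subprocess.Popen(cmd)
    last = snapshot(root)
    try:
        while True:
            time.sleep(interval)
            current = snapshot(root)
            if current != last:
                last = current
                # Stop the old server before starting a new one on the same port.
                proc.terminate()
                proc.wait()
                proc = subprocess.Popen(cmd)
    finally:
        # Make sure the child does not outlive the watcher (e.g. on Ctrl+C).
        proc.terminate()
        proc.wait()
```

As a usage example, the container command could call watch_and_restart(["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "--package-name", "<package>"], root) with root set to the bind-mount target; the host, port and mount path here are assumptions.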
Is there a way to cause dagster api grpc --package-name <package> to reload <package> on the fly?
Erik
03/04/2022, 12:28 AM
Chris Nogradi
03/04/2022, 12:48 AM
Mark Fickett
03/04/2022, 1:38 AM
branch_1 but then assign branch_2, branch_1 = branching_op() in my @job, then I get backwards execution.
# https://docs.dagster.io/concepts/ops-jobs-graphs/jobs-graphs#conditional-branching
import random

from dagster import Out, Output, job, op


@op(out={"branch_1": Out(is_required=False), "branch_2": Out(is_required=False)})
def branching_op():
    num = 0
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@op
def branch_1_op(_input):
    pass


@op
def branch_2_op(_input):
    pass


@job
def branching():
    branch_2, branch_1 = branching_op()  # Does this always return branch_1, branch_2? Why?
    branch_1_op(branch_1)
    branch_2_op(branch_2)
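The swap is consistent with plain Python tuple unpacking. If, as assumed here, the invocation returns a namedtuple whose fields follow the order of the out dict, then branch_2, branch_1 = branching_op() binds by position, not by name. The stand-in below uses only collections.namedtuple to show the effect; BranchingOutputs and the string values are hypothetical placeholders for the real output handles.

```python
from collections import namedtuple

# Hypothetical stand-in for the object a two-output op invocation is assumed to
# return: a namedtuple whose fields follow the order of the op's `out` dict.
BranchingOutputs = namedtuple("BranchingOutputs", ["branch_1", "branch_2"])

outputs = BranchingOutputs(branch_1="branch_1 handle", branch_2="branch_2 handle")

# Unpacking is positional: the names on the left are only local variable names,
# so listing them in the opposite order swaps which handle each one holds.
branch_2, branch_1 = outputs

# Attribute access goes by field name, so the order of the dict no longer matters.
by_name_1 = outputs.branch_1
by_name_2 = outputs.branch_2
```

Under the same assumption, writing outs = branching_op() in the job and then branch_1_op(outs.branch_1) and branch_2_op(outs.branch_2) would avoid depending on the order at all.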
Mark Fickett
03/04/2022, 3:22 AM
Ben Gatewood
03/04/2022, 3:49 AM
Operation name: JobMetadataQuery
Message: 'Select' object has no attribute 'filter'
Path: ["pipelineRunsOrError","results",0,"assets"]
Locations: [{"line":73,"column":3}]
Ben Gatewood
03/04/2022, 3:49 AM
Ben Gatewood
03/04/2022, 3:50 AM
Robert Schmidtke
03/04/2022, 12:36 PM
chrispc
03/04/2022, 1:54 PM
File "/usr/local/lib/python3.9/site-packages/dagster_docker/docker_run_launcher.py", line 83, in _get_docker_image
    raise Exception("No docker image specified by the instance config or repository")
My docker-compose:
version: "3.7"
services:
  dagster_pipelines:
    build:
      context: .
      dockerfile: ./Dockerfile_pipelines
    container_name: dagster_pipelines
    image: dagster_pipeline_image
    restart: always
    ports:
      - "4000:4000"
    env_file:
      - .env
    volumes:
      - ./-----.json:/var/-----.json
    networks:
      - dagster_network
  dagster_dagit:
    build:
      context: .
      dockerfile: ./Dockerfile_dagster
    entrypoint:
      - dagit
      - -h
      - "0.0.0.0"
      - -p
      - "3000"
      - -w
      - workspace.yaml
    container_name: dagster_dagit
    image: dagster_app
    expose:
      - "3000"
    ports:
      - "3000:3000"
    env_file:
      - .env
    volumes: # Make docker client accessible so we can terminate containers from dagit
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dagster_network
    depends_on:
      - dagster_pipelines
  dagster_daemon:
    entrypoint:
      - dagster-daemon
      - run
    container_name: dagster_daemon
    image: dagster_app
    restart: on-failure
    env_file:
      - .env
    volumes: # Make docker client accessible so we can launch containers using host docker
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dagster_network
    depends_on:
      - dagster_pipelines
networks:
  dagster_network:
    driver: bridge
    name: dagster_network
Dockerfile_pipelines:
FROM python:3.9.10-slim
RUN pip install -U pip setuptools
RUN pip install --no-cache-dir \
    pandas==1.3.3 \
    dagster==0.14.2 \
    dagster-graphql==0.14.2 \
    dagit==0.14.2 \
    dagster-postgres==0.14.2 \
    dagster-docker==0.14.2 \
    dagster-gcp==0.14.2 \
    gcsfs==2022.2.0 \
    dateparser==1.0.0 \
    db-dtypes \
    great-expectations==0.14.4 \
    python-decouple==3.4 \
    avro
WORKDIR /var
COPY -------.json ./
ENV GOOGLE_APPLICATION_CREDENTIALS=/var/-----.json
WORKDIR /opt/dagster/app/
# Add repository code
RUN mkdir CAPS
RUN mkdir great_expectations_root
RUN mkdir -p local_artifact_storage/storage/
COPY CAPS/ CAPS/
COPY great_expectations_root/ great_expectations_root/
COPY config/local_docker/dagster.yaml config/local_docker/workspace.yaml setup.py ./
ENV DAGSTER_HOME=/opt/dagster/app/
RUN pip install -e .
# Run dagster gRPC server on port 4000
EXPOSE 4000
# Using CMD rather than ENTRYPOINT allows the command to be overridden in
# run launchers or executors to run other commands using this image
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "CAPS/caps.py"]
Dockerfile_dagster:
FROM python:3.9.10-slim
ENV DAGSTER_HOME=/opt/dagster/dagster_home/
ENV DAGSTER_PG_USERNAME=-----
ENV DAGSTER_PG_PASSWORD=-----
ENV DAGSTER_PG_HOST=----
ENV DAGSTER_PG_DB=---
RUN mkdir -p $DAGSTER_HOME
RUN pip install -U pip setuptools
RUN pip install --no-cache-dir \
    dagster==0.14.2 \
    dagster-graphql==0.14.2 \
    dagit==0.14.2 \
    dagster-postgres==0.14.2 \
    dagster-docker==0.14.2 \
    dagster-gcp==0.14.2
# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there
COPY config/local_docker/dagster.yaml config/local_docker/workspace.yaml $DAGSTER_HOME
WORKDIR $DAGSTER_HOME
EXPOSE 3000
ENTRYPOINT ["dagit", "-h", "0.0.0.0", "-p", "3000"]
workspace.yaml
load_from:
  - grpc_server:
      host: dagster_pipelines
      port: 4000
      location_name: "dagster_pipelines"
dagster.yaml
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_PG_USERNAME
      - DAGSTER_PG_PASSWORD
      - DAGSTER_PG_HOST
      - DAGSTER_PG_DB
    network: dagster_network
    container_kwargs:
      auto_remove: true
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432
Any idea what is happening? Thank you in advance.
Mark Fickett
03/04/2022, 4:18 PM
.collect() calls in an order dependency start argument? Like this:
from dagster import In, Nothing, graph, op

@op(
    ins={"start": In(Nothing)}
)
def wait_for_many():
    pass

@graph
def mostly_parallel():
    parallel_done = []
    parallel_done.append(has_dynamic_out.map(do_one_thing))
    parallel_done.append(has_dynamic_out.map(do_another_thing))
    parallel_done.append(has_dynamic_out.map(do_different_thing))
    wait_for_many(start=parallel_done)
When I try to do it as written above, I get: DagsterInvalidDefinitionError: In @graph mostly_parallel, received a list containing an invalid type at index 0 for input "start" (passed by keyword) in op invocation wait_for_many. Lists can only contain the output from previous solid invocations or input mappings, received <class 'list'>
(actual op names changed to match my minimal example).
Alex Service
03/04/2022, 6:06 PM
storage_id, is it guaranteed to be monotonically increasing? Sequential?
Danny Jackowitz
03/04/2022, 8:44 PM
update_cursor() and yielded run requests? Is there anything we need to watch out for that could result in missed runs while using cursors (e.g. the cursor is updated but the run never actually happens)? Or is it guaranteed that the cursor update will never be recorded without the run request also?
Alexander Verbitsky
03/04/2022, 8:44 PM
Started status, but at the same time all tasks have already finished?
Will Gunadi
03/05/2022, 12:05 AM
Anoop Sharma
03/05/2022, 9:42 AM
sashank
03/05/2022, 6:11 PM
@graph
def my_graph():
    dynamic_op().map(op_with_input)
> AttributeError: 'InvokedSolidOutputHandle' object has no attribute 'map'
@graph
def my_graph():
    dynamic_op.map(op_with_input)
> AttributeError: 'OpDefinition' object has no attribute 'map'
Tyler Hillery
03/05/2022, 11:56 PM
This process will periodically check for any running schedules and launch their associated runs. If you leave this process running, it will launch a new run for your schedule each day at the expected time.
jasono
03/06/2022, 7:36 AM
dagster-ge seems to support the old version of Great Expectations (config version 2). Any idea when it will be updated to v3?
efrat
03/06/2022, 8:20 AM
geoHeil
03/06/2022, 4:23 PM
jasono
03/06/2022, 4:38 PM
invalid argument: dagster_home\\logs\\event.log' error when running dagit -f hello_cereal.py. I don’t get any error if it’s run by python hello_cereal.py. Can someone please take a look at the error message below?
If I try to read the event.log file manually, I get an unknown file system error.
Stefan Adelbert
03/06/2022, 10:38 PM
@op(ins={"start": In(Nothing), "data": In(Any)})
def C(context, start, data):
    # do stuff
    pass
I get the following error:
dagster.core.errors.DagsterInvalidDefinitionError: @op 'C' decorated function has parameter 'start' that is one of the input_defs of type 'Nothing' which should not be included since no data will be passed for it.
Any tips?
Stefan Adelbert
03/07/2022, 5:25 AM
@sensor doesn't support providing resources.
Any tips on reusing resources with sensors?
Max
03/07/2022, 7:14 AM
• DAGSTER_HOME=/opt/dagster/dagster_home. Copy the dagster.yml there (works just fine)
• workdir is /opt/dagster/app. I copy all my files there, including /opt/dagster/app/my_project/repository.py and the workspace.yaml in the root
• my workspace contains python_file: relative_path: my_project/repository.py. This loads correctly when I run dagit locally but not in docker (module my_project was not found)
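"module my_project was not found" is what Python reports when the directory containing my_project (here /opt/dagster/app) is not on sys.path of the process doing the import. Which directories end up on sys.path inside the container depends on where dagit is started and on the workspace settings (the python_file entry also accepts a working_directory key, which is worth checking), so treat the exact cause as an assumption to verify. The standard-library sketch below reproduces the effect with a throwaway my_project package in a temporary directory; can_import and demo are hypothetical helpers.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def can_import(module_name, extra_path=None):
    """Try importing module_name, optionally with extra_path prepended to sys.path."""
    if extra_path is not None:
        sys.path.insert(0, str(extra_path))
    try:
        importlib.invalidate_caches()  # pick up files created after startup
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False
    finally:
        if extra_path is not None:
            sys.path.remove(str(extra_path))
        # Forget the package so the next check starts from a clean slate.
        top_level = module_name.split(".")[0]
        for name in list(sys.modules):
            if name == top_level or name.startswith(top_level + "."):
                del sys.modules[name]


def demo():
    """Create <tmp>/my_project/repository.py and import it without and with <tmp> on sys.path."""
    with tempfile.TemporaryDirectory() as app_dir:
        package = Path(app_dir) / "my_project"
        package.mkdir()
        (package / "__init__.py").write_text("")
        (package / "repository.py").write_text("VALUE = 1\n")
        without_parent = can_import("my_project.repository")
        with_parent = can_import("my_project.repository", app_dir)
        return without_parent, with_parent
```

demo() returns (False, True) as long as no other my_project is importable. In the container, the equivalent fix is getting /opt/dagster/app onto sys.path, for instance by starting dagit from that directory or installing the project as a package.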
What's the best practice there to make sure that the import works?
Irven Aelbrecht
03/07/2022, 7:35 AM
Ray? (https://github.com/ray-project/ray)
And, for my understanding, was there any reason to choose Dask over Ray for multi-processing?
Chen Tsinovoy
03/07/2022, 8:25 AM
from unittest.mock import MagicMock

from dagster import config_from_files, file_relative_path, job

mocked_http = http_async_downloader()
mocked_http.fetch_all = MagicMock(return_value=["4 /n hello"])


@job(resource_defs={"http": mocked_http})
def mocked_job():
    print_htmls(download_urls(convert_cids_to_urls()))


if __name__ == "__main__":
    # upload_pubchem_3d_conformers_dev_job.execute_in_process()
    mocked_job.execute_in_process(run_config=config_from_files([file_relative_path(__file__, 'config.yaml')]))
but I got the following exception:
dagster.check.CheckError: Value in dictionary mismatches expected type for key http. Expected value of type <class 'dagster.core.definitions.resource_definition.ResourceDefinition'>. Got value <resources.aka_http.HttpAsyncDownloader object at 0x7fd8b05cc390> of type <class 'resources.aka_http.HttpAsyncDownloader'>.
What am I missing?