Devaraj Nadiger
06/16/2021, 6:20 AM

Peter B
06/16/2021, 7:26 AM

Dmitry Mogilevsky
06/16/2021, 9:25 AM

Alejandro DE LA CRUZ LOPEZ
06/16/2021, 12:04 PM

KS
06/16/2021, 12:46 PM
…API and create pipeline on the spot.

Daniel Kim
06/16/2021, 2:37 PM

Scott Peters
06/16/2021, 3:18 PM
…dagster would need to have dagster installed. For instance, I have a bunch of classes written to access rclone endpoints as part of a larger docker app. I assume, to write pipelines for that framework, I would have to have all of dagster included in the list of dependencies when building that product?

Scott Peters
06/16/2021, 3:19 PM
…dagster install is, and this seems obvious, but I assume this would bloat any tooling we are writing. It also makes that code directly dependent on dagster.

Michel Rouly
06/16/2021, 7:03 PM
I have a sensor defined with @sensor, and I have the same function (copy/paste) passed into an explicit SensorDefinition call as the evaluation_fn.
But the decorated function is working, whereas the SensorDefinition instance I'm constructing is not doing anything. Specifically, it sets up and runs as a sensor, but it's not invoking the evaluation_fn as far as I can tell.
Any idea what I need to do before calling SensorDefinition if I'm not able to use @sensor? Would be super helpful 🙂
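For reference, a minimal sketch of the two constructions being compared, assuming a trivial evaluation function; the pipeline name "my_pipeline" is illustrative:

from dagster import RunRequest, SensorDefinition, sensor

def my_evaluation_fn(context):
    # The evaluation function receives the sensor context and yields
    # RunRequests (or SkipReasons) on each tick.
    yield RunRequest(run_key=None, run_config={})

# Decorated form (reported as working):
@sensor(pipeline_name="my_pipeline")
def my_sensor(context):
    yield RunRequest(run_key=None, run_config={})

# Explicit form (reported as not invoking evaluation_fn), which should be equivalent:
my_sensor_def = SensorDefinition(
    name="my_sensor_def",
    pipeline_name="my_pipeline",
    evaluation_fn=my_evaluation_fn,
)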
Scott Peters
06/16/2021, 9:55 PM
…str, but it seems that just passing a dict to a pipeline is way out of line with the core logic of dagster's resources ... it seems there has to be a shorter path than something like this?:
@solid(
    config_schema={'endpoint': str, 'collection': str, 'asset_type': str, 'asset_name': str},
    required_resource_keys={'endpoint', 'collection', 'asset_type', 'asset_name'}
)
def get_path(context) -> str:
    endpoint = context.resources.endpoint
    collection = context.resources.collection
    asset_type = context.resources.asset_type
    asset_name = context.resources.asset_name
    path = f'{endpoint}/{collection}/{asset_type}/{asset_name}'
    return path

@resource(config_schema=str)
def endpoint(init_context):
    return init_context.resource_config

@resource(config_schema=str)
def collection(init_context):
    return init_context.resource_config

@resource(config_schema=str)
def asset_type(init_context):
    return init_context.resource_config

@resource(config_schema=str)
def asset_name(init_context):
    return init_context.resource_config

@pipeline(
    mode_defs=[ModeDefinition(
        resource_defs={
            'endpoint': endpoint,
            'collection': collection,
            'asset_type': asset_type,
            'asset_name': asset_name,
        }
    )]
)
def create():
    path = get_path()
    ....
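One shorter path, sketched under the assumption that these four values only need to vary per run rather than per deployment: read them from solid config instead of defining one resource per string. The field names are carried over from the snippet above.

from dagster import pipeline, solid

@solid(config_schema={'endpoint': str, 'collection': str, 'asset_type': str, 'asset_name': str})
def get_path(context) -> str:
    # All four values arrive together via run_config under this solid's config.
    cfg = context.solid_config
    return f"{cfg['endpoint']}/{cfg['collection']}/{cfg['asset_type']}/{cfg['asset_name']}"

@pipeline
def create():
    get_path()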
Scott Peters
06/16/2021, 10:20 PM
…dagster will be running as a background service, where end users are simply passing arguments to some kind of cli. Are there methods or practices within dagster that lower the front-end complexity, such that we don't have to map every argument so explicitly? This is especially necessary for our tooling, since the methods will always require user input for every run and will not likely be tied to any scheduler.
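One way to lower that front-end complexity, sketched with illustrative names (the module path and the config-based get_path variant sketched above are assumptions): a thin CLI wrapper that builds the nested run_config once, so end users only ever see flat flags.

import argparse
from dagster import execute_pipeline
from repo import create  # hypothetical module exposing the pipeline

def main():
    parser = argparse.ArgumentParser()
    for name in ("endpoint", "collection", "asset_type", "asset_name"):
        parser.add_argument(f"--{name}", required=True)
    args = parser.parse_args()

    # The flat CLI namespace is mapped onto dagster's nested config schema
    # here, in exactly one place.
    run_config = {"solids": {"get_path": {"config": vars(args)}}}
    execute_pipeline(create, run_config=run_config)

if __name__ == "__main__":
    main()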
Gabriel Milan
06/16/2021, 10:59 PM
Caught an error for run 49895d5a-ad57-494a-a310-ce6115cdc193 while removing it from the queue. Marking the run as failed and dropping it from the queue: grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "{"created":"@1623854325.910854270","description":"Error received from peer ipv4:172.19.0.2:4000","file":"src/core/lib/surface/call.cc","file_line":1066,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
Stack Trace:
  File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 154, in run_iteration
    self._dequeue_run(instance, run, workspace)
  File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 206, in _dequeue_run
    location = workspace.get_location(repository_location_origin)
  File "/usr/local/lib/python3.8/site-packages/dagster/cli/workspace/dynamic_workspace.py", line 36, in get_location
    location = existing_location if existing_location else origin.create_location()
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/origin.py", line 251, in create_location
    return GrpcServerRepositoryLocation(self)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/repository_location.py", line 478, in __init__
    list_repositories_response = sync_list_repositories_grpc(self.client)
  File "/usr/local/lib/python3.8/site-packages/dagster/api/list_repositories.py", line 13, in sync_list_repositories_grpc
    api_client.list_repositories(), (ListRepositoriesResponse, SerializableErrorInfo)
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/client.py", line 143, in list_repositories
    res = self._query("ListRepositories", api_pb2.ListRepositoriesRequest)
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/client.py", line 89, in _query
    response = getattr(stub, method)(request_type(**kwargs), timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
If you need further information about our setup, I'll be more than happy to share it. Thanks in advance!

David
06/17/2021, 1:14 AM
When my solid calls context.log.info("...") the message shows up in the pipeline run log in Dagit. However, the solid is also making calls into legacy python code that uses the standard logging module, and those logs don't show up. Is there a way to make those logs visible in Dagit (without having to modify the legacy code to use context.log)?
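One workaround that may fit here, sketched under the assumption that the legacy code logs through a known logger name ("legacy_package" and legacy_entry_point are hypothetical stand-ins): temporarily attach a handler that forwards stdlib records to context.log.

import logging
from dagster import solid

class DagsterLogHandler(logging.Handler):
    """Forwards stdlib logging records to a Dagster log manager."""

    def __init__(self, dagster_log):
        super().__init__()
        self._log = dagster_log

    def emit(self, record):
        # Re-emit the formatted record so it shows up in the Dagit run log.
        self._log.info(self.format(record))

def legacy_entry_point():
    # Stand-in for the legacy code, which logs via the stdlib.
    logging.getLogger("legacy_package").info("hello from legacy code")

@solid
def call_legacy_code(context):
    logger = logging.getLogger("legacy_package")  # hypothetical legacy logger name
    handler = DagsterLogHandler(context.log)
    logger.addHandler(handler)
    try:
        legacy_entry_point()
    finally:
        logger.removeHandler(handler)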
Arun Kumar
06/17/2021, 2:59 AM
…print statements within the sensors, but I am not able to find them in the dagster-daemon logs. Is there any other place that I need to look to find the stdout logs from sensors?

Peter B
06/17/2021, 5:51 AM

Adrian Rumpold
06/17/2021, 10:38 AM

Marco
06/17/2021, 11:48 AM
[DagsterApiServer] Pipeline execution process for xxx unexpectedly exited with exit code -11
Célio de Assis Picanço Filho
06/17/2021, 1:27 PM

Christian Lam
06/17/2021, 6:34 PM
0.11.11.

Noah Sanor
06/17/2021, 7:10 PM
…hardcoded_resource, which looks to be exactly what I want, but from the docs it looks like it is meant to be used for mocks in testing. Is there a recommended approach for this?
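For context, a minimal sketch of the API in question: ResourceDefinition.hardcoded_resource wraps an already-constructed value as a resource with no config. The client object and key names here are illustrative.

from dagster import ModeDefinition, ResourceDefinition, pipeline, solid

prebuilt_client = {"base_url": "https://example.com"}  # hypothetical pre-built object

@solid(required_resource_keys={"client"})
def use_client(context):
    return context.resources.client["base_url"]

@pipeline(mode_defs=[ModeDefinition(
    resource_defs={"client": ResourceDefinition.hardcoded_resource(prebuilt_client)}
)])
def client_pipeline():
    use_client()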
Oliver
06/18/2021, 5:34 AM

Peter B
06/18/2021, 8:54 AM

Ben Torvaney
06/18/2021, 1:18 PM
…ModuleNotFoundError: No module named 'dagster.core.definitions.event_metadata' on today's releases - is anyone else having the same issue?

Martim Passos
06/18/2021, 4:22 PM
…run_config and Dagit's playground, or is using Presets the way to go?
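For reference, a minimal PresetDefinition sketch; the solid, preset name, and config values are illustrative assumptions. A preset bakes a named run_config into the pipeline, so it shows up as a selectable option in Dagit's playground.

from dagster import PresetDefinition, pipeline, solid

@solid(config_schema={"path": str})
def load(context):
    return context.solid_config["path"]

@pipeline(
    preset_defs=[
        PresetDefinition(
            name="local",
            run_config={"solids": {"load": {"config": {"path": "/tmp/data"}}}},
        )
    ]
)
def my_pipeline():
    load()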
Ruth Ikegah
06/19/2021, 8:35 AM

Scott Peters
06/20/2021, 11:04 PM
…docker_deploy example from the dagster repo, and was wondering about how one might send commands to a containerized application running on dagster?
As in, I will have users that want to run a pipeline from their local terminal with some arguments: {'my_arg': 5} to, say, a service called my_service, which is containerized and available to dagit ... are there any docs on how to forward those instructions?
The assumption here would be that they are not connected to / don't have access to the server-side dagit, i.e. some basic cli or other client for input.
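One possibility, assuming the Dagit GraphQL endpoint is reachable from the user's machine and a recent dagster-graphql is installed client-side (the host, port, and pipeline/solid names below are illustrative): submit runs remotely with DagsterGraphQLClient rather than exposing dagit itself.

from dagster_graphql import DagsterGraphQLClient

# Connect to the Dagit host from the user's terminal.
client = DagsterGraphQLClient("dagit.example.com", port_number=3000)

run_id = client.submit_pipeline_execution(
    pipeline_name="my_service_pipeline",  # hypothetical pipeline served by my_service
    run_config={"solids": {"my_solid": {"config": {"my_arg": 5}}}},
    mode="default",
)
print(f"Submitted run {run_id}")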
Scott Peters
06/21/2021, 1:58 AM
…docker_deploy example from the latest release, but even though the docker-compose up command seems to launch everything, my new pipeline doesn't seem to show up in the available repos:

Scott Peters
06/21/2021, 2:00 AM

Scott Peters
06/21/2021, 2:05 AM
…repo.py seem to be fine, as dagster doesn't complain about them when running docker-compose -f docker-compose.yml up, but I must be missing something about how the mapping works between the workspace.yaml and where dagster finds the docker images / repos?
Currently, the repo.py for beekeeper_pipelines is copied over to /opt/dagster/app in its container, and the workdir is set appropriately.
Also, I'm assuming that adding new containers to the running dagit / dagster daemon is as simple as starting the new_service container with the proper args and joining the proper docker network (without having to add the new service to this example compose file, rebuild, and up)? That is, something like the following (see the workspace.yaml sketch after this list):
- keep running instance of dagster/dagit
- pull new_repo container
- start new_repo container on the dagster docker network
- add new_repo container to workspace.yaml
- refresh repos within dagit
??
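On the mapping question above: in the docker_deploy example, workspace.yaml does not point at docker images at all; it points at the gRPC server each repo container exposes, so a new container only needs to be reachable on the shared docker network at the host/port listed there. A hedged sketch, where the service name, port, and location_name are assumptions that must match the container's entrypoint:

load_from:
  - grpc_server:
      host: beekeeper_pipelines  # docker-compose service name on the shared network
      port: 4000
      location_name: "beekeeper_pipelines"

Whether a running dagit picks up a workspace.yaml change via the "Reload workspace" action or needs a restart may depend on the version, so that part is left open here.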
Somasundaram Sekar
06/21/2021, 8:46 AM