Aaron Smith
10/15/2021, 5:29 PM
@solid()
def fan_in(context, results_dict):
    ...

@pipeline()
def ppln():
    results = {solid.name: solid() for solid in solids_list}
    final = fan_in(results)
Is this kind of thing possible? Or is this an antipattern? I get an error indicating that Dagster can't parse this as a DAG:
dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline process_tables, received invalid type <class 'dict'> for input "table_dict" (at position 0) in solid invocation "write_all_tables". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
Aaron Smith
10/15/2021, 5:58 PM
So it works for List[solid_output_type], but not, for example, Dict[str, solid_output_type].
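For reference, a minimal sketch of the list-based fan-in that composition does accept (the solids here are illustrative):

from typing import List
from dagster import pipeline, solid

@solid
def emit_one() -> int:
    return 1

@solid
def emit_two() -> int:
    return 2

@solid
def sum_fan_in(context, results: List[int]) -> int:
    # A plain list of upstream outputs is valid fan-in input during
    # composition; a dict comprehension over solids is not.
    return sum(results)

@pipeline
def fan_in_pipeline():
    sum_fan_in([emit_one(), emit_two()])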
Loc Nguyen
10/16/2021, 7:09 PM
marcos
10/17/2021, 7:06 PM
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Received message larger than max (10985123 vs. 10485760)"
debug_error_string = "{"created":"@1634491696.811821461","description":"Received message larger than max (10985123 vs. 10485760)","file":"src/core/ext/filters/message_size/message_size_filter.cc","file_line":206,"grpc_status":8}"
Aleksandr Filippov
10/18/2021, 3:19 AM
from dagster import solid, pipeline, InputDefinition, Nothing

@solid
def solid1():
    pass

@solid
def solid2():
    pass

# Nothing dependencies are declared on the decorator, not as default arguments.
@solid(input_defs=[InputDefinition("s1", Nothing), InputDefinition("s2", Nothing)])
def solid3():
    return 1

@pipeline
def pl():
    solid3(solid1(), solid2())
Jonathan PHOON
10/18/2021, 5:59 AM
Running dagit fails with:
Traceback (most recent call last):
  File "/Users/jon/python_env/dagster/bin/dagit", line 5, in <module>
    from dagit.cli import main
  File "/Users/jon/python_env/dagster/lib/python3.8/site-packages/dagit/cli.py", line 22, in <module>
    from .app import create_app_from_workspace_process_context
  File "/Users/jon/python_env/dagster/lib/python3.8/site-packages/dagit/app.py", line 23, in <module>
    from .subscription_server import DagsterSubscriptionServer
  File "/Users/jon/python_env/dagster/lib/python3.8/site-packages/dagit/subscription_server.py", line 4, in <module>
    from graphql_ws.gevent import GeventSubscriptionServer, SubscriptionObserver
ImportError: cannot import name 'SubscriptionObserver' from 'graphql_ws.gevent' (/Users/jon/python_env/dagster/lib/python3.8/site-packages/graphql_ws/gevent.py)
Does anyone know why?
Martin Carlsson
10/18/2021, 11:37 AM
sourabh upadhye
10/18/2021, 12:26 PM
Martin Carlsson
10/18/2021, 3:20 PM
... from parameters between each run.
Are there any Dagster features that can keep this information, or is there a best practice? Or does anyone have advice? :)
Kirk Stennett
10/18/2021, 4:27 PM
Dmitry Mogilevsky
10/18/2021, 6:54 PM
Peter Bayerle
10/18/2021, 7:33 PM
When I run the dagit command, I'm met with an Error loading repository location repo_name: dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 60 seconds. Does anyone have experience with this error? What could be the issue?
Grigoriy Sterin
10/18/2021, 7:33 PM
I'm getting botocore.exceptions.NoCredentialsError: Unable to locate credentials even though I have a proper IAM task role assigned to the ECS task. I also tried providing AWS credentials via environment variables, but had the same result. FWIW, besides the problem with sensors, the daemon works fine and I'm able to call AWS services from inside the solids without any problems.
Michael Russell
10/18/2021, 9:44 PM
Rubén Lopez Lozoya
10/18/2021, 10:32 PM
Levan
10/19/2021, 10:19 AM
What should workspace.yaml look like for the deploy_docker example?
When I try adding:
load_from:
  # - grpc_server:
  #     host: docker_example_pipelines
  #     port: 4000
  #     location_name: "example_pipelines"
  - python_file:
      relative_path: repo.py
      executable_path: /opt/conda/envs/test/bin/python
the run queue gives the error: Caught an error for run c59de7e6-0b8e-48e4-a45a-86e33ccc0881 while removing it from the queue. Marking the run as failed and dropping it from the queue: Exception: No docker image specified by the instance config or repository
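For reference, a python_file location doesn't carry a container image with it, so the DockerRunLauncher has to get one from the instance config; a rough sketch of the relevant dagster.yaml section, with an illustrative image tag (field names assume the dagster_docker launcher schema):

# dagster.yaml (instance config); the image name is illustrative
run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    image: "docker_example_pipelines:latest"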
esztermarton
10/19/2021, 10:39 AM
Mark Fickett
10/19/2021, 1:44 PM
Sara
10/19/2021, 1:49 PM
daniel blinick
10/19/2021, 1:59 PM
Gabe Calvo
10/19/2021, 2:01 PM
Andy H
10/19/2021, 3:38 PM
Mohit.ASingh
10/19/2021, 6:30 PM
from dagster import ModeDefinition, pipeline
from dagster_databricks import databricks_client, create_databricks_job_op

sparkpi = create_databricks_job_op().configured(
    {
        "job": {
            "name": "Python job",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
            "notebook_task": {"notebook_path": "/Users/mohit.asingh@gmail.com/E1"},
        }
    },
    name="sparkpi",
)

@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "databricks_client": databricks_client.configured(
                    {
                        "host": "***<some_value>***.azuredatabricks.net",
                        "token": "***<token>***",
                    }
                )
            }
        )
    ]
)
def my_pipeline():
    sparkpi()
While running I am getting the error below. Can anyone tell me if I missed any config?
2021-10-19 23:46:17 - dagster - DEBUG - my_pipeline - 18c90b7b-7d41-4d43-868d-bc7ea3f07915 - 3788 - PIPELINE_START - Started execution of pipeline "my_pipeline".
2021-10-19 23:46:17 - dagster - DEBUG - my_pipeline - 18c90b7b-7d41-4d43-868d-bc7ea3f07915 - 3788 - ENGINE_EVENT - Executing steps in process (pid: 3788)
2021-10-19 23:46:17 - dagster - DEBUG - my_pipeline - 18c90b7b-7d41-4d43-868d-bc7ea3f07915 - 3788 - sparkpi - ENGINE_EVENT - Starting initialization of resources [databricks_client, io_manager].
2021-10-19 23:46:17 - dagster - DEBUG - my_pipeline - 18c90b7b-7d41-4d43-868d-bc7ea3f07915 - 3788 - sparkpi - ENGINE_EVENT - Finished initialization of resources [databricks_client, io_manager].
2021-10-19 23:46:17 - dagster - DEBUG - my_pipeline - 18c90b7b-7d41-4d43-868d-bc7ea3f07915 - 3788 - sparkpi - LOGS_CAPTURED - Started capturing logs for solid: sparkpi.
2021-10-19 23:46:17 - dagster - DEBUG - my_pipeline - 18c90b7b-7d41-4d43-868d-bc7ea3f07915 - 3788 - sparkpi - STEP_START - Started execution of step "sparkpi".
2021-10-19 23:46:17 - dagster - ERROR - my_pipeline - 18c90b7b-7d41-4d43-868d-bc7ea3f07915 - 3788 - sparkpi - STEP_FAILURE - Execution of step "sparkpi" failed.
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing solid "sparkpi"::
TypeError: submit_run() got an unexpected keyword argument 'name'
Stack Trace:
  File "/Users/Mohit.Asingh/PycharmProjects/Dagster/venv/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 44, in solid_execution_error_boundary
    yield
  File "/Users/Mohit.Asingh/PycharmProjects/Dagster/venv/lib/python3.7/site-packages/dagster/utils/__init__.py", line 383, in iterate_with_context
    next_output = next(iterator)
  File "/Users/Mohit.Asingh/PycharmProjects/Dagster/venv/lib/python3.7/site-packages/dagster/core/execution/plan/compute_generator.py", line 65, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "/Users/Mohit.Asingh/PycharmProjects/Dagster/venv/lib/python3.7/site-packages/dagster_databricks/solids.py", line 173, in databricks_fn
    run_id = databricks_client.submit_run(**job_config)
  File "/Users/Mohit.Asingh/PycharmProjects/Dagster/venv/lib/python3.7/site-packages/dagster_databricks/databricks.py", line 33, in submit_run
    return self.client.jobs.submit_run(*args, **kwargs)["run_id"]  # pylint: disable=no-member
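Judging from the traceback, the "job" config is forwarded straight to the Databricks runs/submit endpoint, which accepts run_name rather than name (name belongs to the jobs/create endpoint). A sketch of the same op configured that way, assuming that API shape:

from dagster_databricks import create_databricks_job_op

# "name" -> "run_name": runs/submit has no "name" field, which is what
# submit_run() rejects as an unexpected keyword argument.
sparkpi = create_databricks_job_op().configured(
    {
        "job": {
            "run_name": "Python job",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
            "notebook_task": {"notebook_path": "/Users/mohit.asingh@gmail.com/E1"},
        }
    },
    name="sparkpi",
)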
Andy H
10/19/2021, 10:20 PM
Aleksandr Filippov
10/20/2021, 3:43 AM
from dagster import solid, pipeline, InputDefinition, Nothing, execute_pipeline, ModeDefinition, fs_io_manager, reconstructable, DagsterInstance

@solid
def solid1():
    pass

@solid
def solid2():
    pass

# The Nothing input is declared on the decorator, not as a default argument.
@solid(input_defs=[InputDefinition("start", Nothing)])
def solid3():
    pass

@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def test_pipeline():
    solid3([solid1(), solid2()])

if __name__ == "__main__":
    result = execute_pipeline(
        reconstructable(test_pipeline),
        run_config={"execution": {"multiprocess": {}}},
        instance=DagsterInstance.local_temp(),
    )
The problem is that the pipeline is executed with the system interpreter, ignoring the virtual environment from which the script is run. How can I run this pipeline using the virtual environment?
Francis Niu
10/20/2021, 9:35 AM
... dbt_cli_resource so that they can be referenced in dbt profiles? Like this: https://github.com/dagster-io/dagster/blob/master/examples/dbt_example/dbt_example_project/profiles.yml
Nilesh Pandey
10/20/2021, 12:01 PM
userDeployments:
  enabled: true
  deployments:
    - name: "my-user-code-1"
      image:
        repository: "<------>"
        tag: latest
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - "-f"
        - "/my_repo/repo.py"
      port: 3030
Sara
10/20/2021, 12:31 PM
from dagster import solid, pipeline, resource, ModeDefinition

class Velocity:
    def car_speed(self):
        speed = 200
        return speed

@resource
def select_speed(init_context):
    return Velocity()

high_speed_mode = ModeDefinition(
    "speed_mode",
    resource_defs={
        "speed": select_speed  <-------------------
    },
)

@solid(required_resource_keys={"speed_mode"})
def print_velocity(context):
    speed = ???????????  <---------------------------------
    context.log.info(speed)

@pipeline(mode_defs=[high_speed_mode])
def my_pipeline():
    print_velocity()
Greetings!
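A minimal sketch of one way to wire this up: the key in required_resource_keys has to match the key in resource_defs, and the resource instance is then available on context.resources (names follow the snippet above):

from dagster import ModeDefinition, pipeline, resource, solid

class Velocity:
    def car_speed(self):
        return 200

@resource
def select_speed(init_context):
    return Velocity()

@solid(required_resource_keys={"speed"})
def print_velocity(context):
    # "speed" matches the key under resource_defs below.
    speed = context.resources.speed.car_speed()
    context.log.info(str(speed))

@pipeline(
    mode_defs=[
        ModeDefinition("speed_mode", resource_defs={"speed": select_speed})
    ]
)
def my_pipeline():
    print_velocity()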
Jay Sharma
10/20/2021, 2:46 PM
@schedule(cron_schedule="0 * * * *", pipeline_name="my_pipeline", execution_timezone="US/Eastern")
def my_schedule():
    return {}
Not sure what else I have to do here. Do I need to pass this schedule to my repository for it to be defined?
Thanks a lot!
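For reference, schedules (like pipelines) need to be returned by a repository to be picked up; a minimal sketch assuming the schedule above:

from dagster import pipeline, repository, schedule, solid

@solid
def do_nothing(context):
    pass

@pipeline
def my_pipeline():
    do_nothing()

@schedule(cron_schedule="0 * * * *", pipeline_name="my_pipeline", execution_timezone="US/Eastern")
def my_schedule():
    return {}

@repository
def my_repository():
    # Dagit and the daemon only see the schedule once the repository returns it.
    return [my_pipeline, my_schedule]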
Martim Passos
10/20/2021, 3:09 PM
Is there a way to add DynamicOutput-specific metadata to an OutputContext? Say I want to yield 3 or 4 JSON objects that should be stored on S3 by an IOManager. How can I tell it which key to use, mime type to set, ACL, etc.? I can return a dict containing this stuff as the DynamicOutput value and parse it inside the IO manager; just wondering if there's a cleaner way to do it.
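For what it's worth, the mapping key of each DynamicOutput is available on the OutputContext, so an IO manager can derive per-chunk S3 keys without packing them into the value; a rough sketch assuming a bucket/prefix naming scheme (ACL and content type would still have to come from config or the value):

import json
import boto3
from dagster import IOManager, io_manager

class S3JsonIOManager(IOManager):
    def __init__(self, bucket):
        self.bucket = bucket
        self.s3 = boto3.client("s3")

    def _key(self, step_key, mapping_key):
        # Illustrative naming scheme built from the dynamic mapping key.
        return f"outputs/{step_key}/{mapping_key}.json"

    def handle_output(self, context, obj):
        # context.mapping_key is set for values yielded as DynamicOutput.
        self.s3.put_object(
            Bucket=self.bucket,
            Key=self._key(context.step_key, context.mapping_key),
            Body=json.dumps(obj),
            ContentType="application/json",
        )

    def load_input(self, context):
        upstream = context.upstream_output
        body = self.s3.get_object(
            Bucket=self.bucket,
            Key=self._key(upstream.step_key, upstream.mapping_key),
        )["Body"].read()
        return json.loads(body)

@io_manager(config_schema={"bucket": str})
def s3_json_io_manager(init_context):
    return S3JsonIOManager(init_context.resource_config["bucket"])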