Melle Minderhoud
07/06/2021, 9:24 AM["dagster","api","execute_run","{\"__class__\": \"ExecuteRunArgs\", \"instance_ref\": {\"__class__\": \"InstanceRef\", \"compute_logs_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"LocalComputeLogManager\", \"config_yaml\": \"base_dir: /opt/dagster/dagster_home/storage\\n\", \"module_name\": \"dagster.core.storage.local_compute_log_manager\"}, \"custom_instance_class_data\": null, \"event_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"PostgresEventLogStorage\", \"config_yaml\": \"postgres_db:\\n db_name:\\n env: DAGSTER_POSTGRES_DB\\n hostname:\\n env: DAGSTER_POSTGRES_HOSTNAME\\n password:\\n env: DAGSTER_POSTGRES_PASSWORD\\n port: 5432\\n username:\\n env: DAGSTER_POSTGRES_USER\\n\", \"module_name\": \"dagster_postgres.event_log\"}, \"local_artifact_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"LocalArtifactStorage\", \"config_yaml\": \"base_dir: /opt/dagster/dagster_home/\\n\", \"module_name\": \"dagster.core.storage.root\"}, \"run_coordinator_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"QueuedRunCoordinator\", \"config_yaml\": \"{}\\n\", \"module_name\": \"dagster.core.run_coordinator\"}, \"run_launcher_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"EcsRunLauncher\", \"config_yaml\": \"{}\\n\", \"module_name\": \"dagster_aws.ecs\"}, \"run_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"PostgresRunStorage\", \"config_yaml\": \"postgres_db:\\n db_name:\\n env: DAGSTER_POSTGRES_DB\\n hostname:\\n env: DAGSTER_POSTGRES_HOSTNAME\\n password:\\n env: DAGSTER_POSTGRES_PASSWORD\\n port: 5432\\n username:\\n env: DAGSTER_POSTGRES_USER\\n\", \"module_name\": \"dagster_postgres.run_storage\"}, \"schedule_storage_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"PostgresScheduleStorage\", \"config_yaml\": \"postgres_db:\\n db_name:\\n env: DAGSTER_POSTGRES_DB\\n hostname:\\n env: DAGSTER_POSTGRES_HOSTNAME\\n password:\\n env: DAGSTER_POSTGRES_PASSWORD\\n port: 5432\\n username:\\n env: DAGSTER_POSTGRES_USER\\n\", \"module_name\": \"dagster_postgres.schedule_storage\"}, \"scheduler_data\": {\"__class__\": \"ConfigurableClassData\", \"class_name\": \"DagsterDaemonScheduler\", \"config_yaml\": \"{}\\n\", \"module_name\": \"dagster.core.scheduler\"}, \"settings\": {\"telemetry\": null}}, \"pipeline_origin\": {\"__class__\": \"PipelinePythonOrigin\", \"pipeline_name\": \"[deleted]\", \"repository_origin\": {\"__class__\": \"RepositoryPythonOrigin\", \"code_pointer\": {\"__class__\": \"FileCodePointer\", \"fn_name\": \"my_repository\", \"python_file\": \"repo.py\", \"working_directory\": \"[deleted]\"}, \"container_image\": \[deleted]\", \"executable_path\": \"/opt/pysetup/.venv/bin/python\"}}, \"pipeline_run_id\": \"018515a6-02f2-4441-a23e-25cedb785ee0\"}"]
I deployed dagit using the deploy_ecs example. I can execute the example pipeline successfully, but not my own pipeline. I am using version 0.11.16.
Does anybody has an idea what it wrong and how I could solve this issue?Melle Minderhoud
07/06/2021, 9:24 AMdagster-run
task definition containing a container run
, but it is killed due to the next error:
Error: Got unexpected extra arguments ("ExecuteRunArgs", "instance_ref": {"__class__": "InstanceRef", "compute_logs_data": {"__class__": "ConfigurableClassData", "class_name": "LocalComputeLogManager", "config_yaml": "base_dir: /opt/dagster/dagster_home/storage\n", "module_name": "dagster.core.storage.local_compute_log_manager"}, "custom_instance_class_data": null, "event_storage_data": {"__class__": "ConfigurableClassData", "class_name": "PostgresEventLogStorage", "config_yaml": "postgres_db:\n db_name:\n env: DAGSTER_POSTGRES_DB\n hostname:\n env: DAGSTER_POSTGRES_HOSTNAME\n password:\n env: DAGSTER_POSTGRES_PASSWORD\n port: 5432\n username:\n env: DAGSTER_POSTGRES_USER\n", "module_name": "dagster_postgres.event_log"}, "local_artifact_storage_data": {"__class__": "ConfigurableClassData", "class_name": "LocalArtifactStorage", "config_yaml": "base_dir: /opt/dagster/dagster_home/\n", "module_name": "dagster.core.storage.root"}, "run_coordinator_data": {"__class__": "ConfigurableClassData", "class_name": "QueuedRunCoordinator", "config_yaml": "{}\n", "module_name": "dagster.core.run_coordinator"}, "run_launcher_data": {"__class__": "ConfigurableClassData", "class_name": "EcsRunLauncher", "config_yaml": "{}\n", "module_name": "dagster_aws.ecs"}, "run_storage_data": {"__class__": "ConfigurableClassData", "class_name": "PostgresRunStorage", "config_yaml": "postgres_db:\n db_name:\n env: DAGSTER_POSTGRES_DB\n hostname:\n env: DAGSTER_POSTGRES_HOSTNAME\n password:\n env: DAGSTER_POSTGRES_PASSWORD\n port: 5432\n username:\n env: DAGSTER_POSTGRES_USER\n", "module_name": "dagster_postgres.run_storage"}, "schedule_storage_data": {"__class__": "ConfigurableClassData", "class_name": "PostgresScheduleStorage", "config_yaml": "postgres_db:\n db_name:\n env: DAGSTER_POSTGRES_DB\n hostname:\n env: DAGSTER_POSTGRES_HOSTNAME\n password:\n env: DAGSTER_POSTGRES_PASSWORD\n port: 5432\n username:\n env: DAGSTER_POSTGRES_USER\n", "module_name": "dagster_postgres.schedule_storage"}, "scheduler_data": {"__class__": "ConfigurableClassData", "class_name": "DagsterDaemonScheduler", "config_yaml": "{}\n", "module_name": "dagster.core.scheduler"}, "settings": {"telemetry": null}}, "pipeline_origin": {"__class__": "PipelinePythonOrigin", "pipeline_name": "[removed]", "repository_origin": {"__class__": "RepositoryPythonOrigin", "code_pointer": {"__class__": "FileCodePointer", "fn_name": "my_repository", "python_file": "repo.py", "working_directory": "[removed]"}, "container_image": "[removed]", "executable_path": "/opt/pysetup/.venv/bin/python"}}, "pipeline_run_id": "018515a6-02f2-4441-a23e-25cedb785ee0"})
ECS injects the next command in this run
container:Alejandro DE LA CRUZ LOPEZ
07/06/2021, 3:16 PMSarah Gruskin
07/06/2021, 3:29 PMImportError: cannot import name 'EventRecord' from 'dagster.core.events.log' (/root/.local/lib/python3.9/site-packages/dagster/core/events/log.py)
We are installing the following dagster files from the Docker file:
RUN pip install --user \
dagster==${DAGSTER_VERSION} \
dagster-graphql==${DAGSTER_VERSION} \
dagster-postgres==${DAGSTER_VERSION} \
dagster-cron==${DAGSTER_VERSION} \
dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \
dagster-aws==${DAGSTER_VERSION} \
dagster-k8s==${DAGSTER_VERSION} \
dagster-celery-k8s==${DAGSTER_VERSION} \
dagit==${DAGSTER_VERSION}
With ARG DAGSTER_VERSION=0.11.3
Is there a package I'm missing?Akshay Verma
07/06/2021, 5:02 PMSarah Gruskin
07/06/2021, 6:38 PM@daily_schedule
decorator. I'm unsure where to place the configs for slack for the token and channel information.
Currently my pipeline is being called by
@daily_schedule(
pipeline_name="my_pipeline",
start_date=datetime(2021, 4, 1),
execution_time=time(hour=8, minute=15),
execution_timezone="UTC",
mode="prod"
)
def my_schedule(date):
return {
"solids": {
"fetch_data": {
"config": {
"date": date.strftime("%Y-%m-%d")
}
},
"delete_daily_data": {
"config": {
"date": date.strftime("%Y-%m-%d")
}
},
"validate_data": {
"config": {
"date": date.strftime("%Y-%m-%d")
}
}
}
}
would I add another section within the return for resources?madhurt
07/06/2021, 6:39 PMvalues.yaml
file that this stuff is handled by computeLogManager
. Should I switch from LocalComputeLogManager
to another one? perhaps S3ComputeLogManager
? Or can I do something to make it work with ``LocalComputeLogManager`` itself?pdpark
07/06/2021, 9:18 PMmapping_keys
in the context of downstream solids? (RE: https://dagster.slack.com/archives/C01U954MEER/p1620400766465200?thread_ts=1620397879.463400&cid=C01U954MEER)Peter B
07/07/2021, 5:33 AMArun Kumar
07/07/2021, 10:19 AMgroup by
based on entity ids. Each pipeline also yields an asset for the respective aggregate. We also have downstream pipelines which depend on these aggregates and gets triggered based on the aggregates assets.
Currently I am generating the asset keys by concatenating different strings like event name, aggregate name, entity ids, etc., For example: n_customers_support_account_troubleshooting_tasks_SUM_user_id
. I feel that it can become so hard to manage them with such keys as the number of assets grow larger. Just wondering if the team has any thoughts or best practices towards asset management. Any plans to support tags for assets (similar to pipelines)?paul.q
07/08/2021, 6:13 AMOutputDefinition
to provide hints to the custom IOManager we use. Not sure if this is the right channel, but I'd like to upvote it in the hope it stays.
Thanks
PaulGeorge Pearse
07/08/2021, 9:11 AMEric Cheminot
07/08/2021, 9:40 AMStepan Dvoiak
07/08/2021, 10:04 AMdyn_outputs = produce_dyn_outputs()
dyn_outputs.map(compose_plus_two_mul_ten).collect()
is this equal to?
dyn_outputs = produce_dyn_outputs()
dyn_outputs.map(plus_two).map(mul_ten).collect()
where compose_plus_two_mul_ten
is composite_solid consist of plus_two
and mul_ten
solids
The question here is:
How i can control the order of execution of that 2 solids? Why plus_two
executes first on all outputs and only than execution of mul_ten
started
Use case: first solid will train model and second commit it to general dvc repoJeff Tavernier
07/08/2021, 10:26 AMDaniel Kim
07/08/2021, 6:15 PMAbednego Santoso
07/09/2021, 7:05 AMmake_email_on_pipeline_failure_sensor
https://dagster.slack.com/archives/CG4QMQW0H/p1625783653190300Alejandro DE LA CRUZ LOPEZ
07/09/2021, 9:09 AME0709 11:02:52.896413 378418 portforward.go:372] error copying from remote stream to local connection: readfrom tcp6 [::1]:8080->[::1]:35554: write tcp6 [::1]:8080->[::1]:35554: write: broken pipe
I attach an image of the forever "starting" and the quota of the namespace (in case it's useful). Do you have any idea where the issue could be?Vivek
07/09/2021, 1:21 PMEric Cheminot
07/09/2021, 1:58 PMAlex
07/09/2021, 2:29 PMAlex
07/09/2021, 2:31 PMChris Le Sueur
07/09/2021, 4:56 PMload_yaml_from_path
) but it doesn't seem to be parsed in the same way as a regular preset; the env: ...
keys I use are rejected as invalid scalars. Any suggestions?Noah Sanor
07/09/2021, 5:40 PMdagster.yaml
file to configure these settings instead? Would it be built into the user code deployment image? (sorry if this is documented, I'm having trouble finding it).David Baur
07/09/2021, 9:18 PMDdOS
07/10/2021, 1:54 AMKateryna Chenina
07/10/2021, 10:48 AMdagster.core.errors.DagsterInvalidDefinitionError: @composite_solid clean returned problematic value of type <class 'list'>. Expected return value from invoked solid or dict mapping output name to return values from invoked solids
Thanks for any advice!Xu Zhang
07/11/2021, 3:12 AMdagster.grpc
threads running?Xu Zhang
07/11/2021, 4:43 PMRubén Lopez Lozoya
07/12/2021, 10:36 AMTypeError: Argument 'obj' has incorrect type (expected list, got frozenlist)
This is what my config looks like:
solid_uw_get_target_metrics_columns = configured(
solid_get_target_df_columns, name="solid_uw_get_target_metrics_columns"
)({"columns": ["field", "source", "value", "reliability"]})
And this is what the solid looks like:
@solid(
config_schema={
"columns": Field([str], is_required=True),
}
)
def solid_get_target_df_columns(context, dfs: Dict[str, DagsterDataFrame]):
"""Solid that returns a Dictionary where every DataFrame has the columns specified from the original DataFrame
Parameters:
---------
dfs : Dict[str, DagsterDataFrame]
Dictionary containing DagsterDataFrames to retrieve columns from
columns : List[str]
List of target columns
Returns:
-------
dfs: Dict[str, DagsterDataFrame]
Dictionary containing DagsterDataFrames with target columns
"""
processed_dfs = {}
for key, df in dfs.items():
processed_dfs[key] = df[context.solid_config["columns"]]
return processed_dfs
If I cast to list(context.solid_config["columns"])
it works, otherwise the error will keep appearing. Any ideas?