Roel Hogervorst
11/16/2022, 4:42 PM

James Brady
11/16/2022, 5:23 PM

Chris Nogradi
11/16/2022, 7:21 PM

Zachary Bluhm
11/16/2022, 8:35 PM

Jose Uribe
11/16/2022, 8:37 PM
Has the DAGSTER_CURRENT_IMAGE env var been defined in the gRPC server's container?

David Reuss
11/16/2022, 9:04 PM

Andrew Martin
11/16/2022, 10:57 PM
The sce_nol.caiso_discharging_analysis_helper_graph.filter_down_flowgate_analysis_by_number_of_harmers op is failing: it's looking for a dagster file that should be written by a prior op in the job. What's weird is that this op fails early on when running the job, before the run has even gotten to that op. E.g. the job is running the ops out of order (see the screenshot, where the failed op's preceding ops haven't run yet).
One thing to note: we're using dynamic outs in the job.
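For context, the dynamic-out pattern being described fans out roughly like this minimal sketch (op names are hypothetical); each mapped step is only supposed to start after the op that produced its DynamicOutput has completed:

from dagster import DynamicOut, DynamicOutput, job, op

# Hypothetical fan-out/fan-in job illustrating dynamic outputs.
@op(out=DynamicOut())
def fan_out():
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))

@op
def double(x: int) -> int:
    return x * 2

@op
def total(values) -> int:
    return sum(values)

@job
def dynamic_job():
    # .map runs double once per dynamic output; .collect fans back in.
    total(fan_out().map(double).collect())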
Reid Beels
11/16/2022, 11:08 PM
I'm trying to use the GCSComputeLogManager on a k8s deployment.
I have it configured like this in the helm values file:
computeLogManager:
  type: GCSComputeLogManager
  config:
    gcsComputeLogManager:
      bucket: reidab-dagster
      prefix: log/compute/
      localDir: /tmp/k8s-compute-logs/
Authentication is configured for GCS via the dagster k8s service account, and I believe that's working, because the log/compute directory was created within my previously-empty GCS bucket. I'm just not seeing any logs get put into the bucket or made available in dagit.
(I'm also not sure what the localDir setting is, and I've tried a few values there. Is that local to the pod being run, or to the runner job, or the user code deployment, or the daemon…?)
Any ideas?

Nolan Nichols
11/17/2022, 12:06 AM
I'm using the k8s_job_executor and specifying tolerations / nodeSelector tags so that pods will run on a specific node group.
This works as expected for the run pod, where I can see that my tags were properly applied in the pod's yaml and the pod runs on a node in the correct node group; however, the step pods do not execute on the correct node group, and when I examine a step pod's yaml the tolerations / nodeSelector tags are not present.
Is this the expected behavior? Is there another way to add tags to individual step pods?
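One possible approach (a sketch, not verified against this deployment): per-op dagster-k8s/config tags, which the k8s_job_executor can apply to the step pod it launches for that op. The selector and toleration values below are placeholders:

from dagster import op

# Hypothetical op carrying per-step k8s config via tags; the executor reads
# "dagster-k8s/config" and merges pod_spec_config into the step pod spec.
@op(
    tags={
        "dagster-k8s/config": {
            "pod_spec_config": {
                "node_selector": {"node-group": "dagster-steps"},
                "tolerations": [
                    {
                        "key": "dedicated",
                        "operator": "Equal",
                        "value": "dagster",
                        "effect": "NoSchedule",
                    }
                ],
            }
        }
    }
)
def pinned_step():
    ...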
Casper Weiss Bang
11/17/2022, 4:46 AM
I think there's an error with TableRecord in the docs:
https://docs.dagster.io/_apidocs/ops#dagster.MetadataValue.table
It says the signature of a TableRecord is as follows:
TableRecord(code="invalid-data-type", row=2, col="name"}]
Which obviously isn't correct python syntax. It ought to be:
TableRecord(code="invalid-data-type", row=2, col="name")
Upon looking at the actual source code, however, it still doesn't seem to work. Not sure what the exact bug is, but I get a:
TypeError: TableRecord.__new__() takes 1 positional argument but 2 were given
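For what it's worth, the TableRecord constructor has changed across Dagster versions, so this is a sketch rather than a confirmed fix. The keyword form matches the corrected docs snippet above; on versions where the constructor expects a single mapping, the equivalent spelling is TableRecord(dict(...)):

from dagster import MetadataValue, TableColumn, TableRecord, TableSchema

# Keyword form from the corrected docs snippet; on some versions the
# constructor instead takes one mapping: TableRecord(dict(code=..., ...)).
records = [TableRecord(code="invalid-data-type", row=2, col="name")]

metadata = MetadataValue.table(
    records=records,
    schema=TableSchema(
        columns=[
            TableColumn("code", "string"),
            TableColumn("row", "int"),
            TableColumn("col", "string"),
        ]
    ),
)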
Jori Geysen
11/17/2022, 10:23 AM
I have a custom io-manager, get_in_memory_or_s3_io_manager, that returns either mem_io_manager or s3_pickle_io_manager depending on its config. This custom io-manager worked fine in dagster 0.14.12.
@io_manager(
    config_schema=Field(
        Selector(
            {
                "s3": {
                    "s3_bucket": Field(StringSource),
                    "s3_prefix": Field(
                        StringSource, is_required=False, default_value="dagster"
                    ),
                },
                "in_memory": Field(bool, is_required=False, default_value=True),
            },
        ),
        default_value={"in_memory": True},
    ),
)
def get_in_memory_or_s3_io_manager(init_context: InitResourceContext) -> IOManager:
    resource_config = init_context.resource_config
    if "s3" in resource_config:
        return s3_pickle_io_manager(init_context.replace_config(resource_config["s3"]))
    elif resource_config["in_memory"]:
        return mem_io_manager(init_context)
    else:
        raise DagsterInvalidConfigError("Error in config for the io manager", [], None)
However, I've just updated my dagster version to 1.0.13 and am now seeing the following error:
dagster._check.ParameterCheckError: Param "_" is not a UnboundInitResourceContext. Got <dagster._core.execution.context.init.InitResourceContext object at 0x7f05c06d6970> which is type <class 'dagster._core.execution.context.init.InitResourceContext'>.
Stack Trace:
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/_core/errors.py", line 184, in user_code_error_boundary
yield
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/_core/execution/resources_init.py", line 325, in single_resource_event_generator
resource_def.resource_fn(context)
File "/dagflow/dagflow/default_job_configs/custom_resources.py", line 87, in get_in_memory_or_s3_io_manager
return mem_io_manager(init_context)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/_core/definitions/resource_definition.py", line 224, in __call__
check.opt_inst_param(args[0], context_param_name, UnboundInitResourceContext)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/_check/__init__.py", line 682, in opt_inst_param
raise _param_type_mismatch_exception(obj, ttype, param_name, additional_message)
My questions:
• What's wrong with my custom io-manager that this error gets thrown in dagster 1.0.13?
• Are there better patterns to write an io-manager which switches between memory and s3 based on resource config?
Thanks!
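On the error: in 1.x, calling a resource definition like mem_io_manager(...) directly goes through ResourceDefinition.__call__, which only accepts an UnboundInitResourceContext (meant for invocation outside of execution), hence the ParameterCheckError. On the second question, one pattern that avoids nesting resource-definition calls is a single IOManager class that dispatches internally; a minimal sketch, assuming pickled objects in S3 and a plain dict in memory (which, like mem_io_manager, only works within a single process):

import pickle

import boto3
from dagster import Field, IOManager, Selector, StringSource, io_manager

class MemOrS3IOManager(IOManager):
    """Stores outputs in a dict, or as pickled objects in S3 when configured."""

    def __init__(self, s3_bucket=None, s3_prefix="dagster"):
        self._bucket = s3_bucket
        self._prefix = s3_prefix
        self._values = {}  # in-memory fallback, single-process only
        self._s3 = boto3.client("s3") if s3_bucket else None

    def _key(self, context):
        return "/".join([self._prefix, *context.get_identifier()])

    def handle_output(self, context, obj):
        if self._s3:
            self._s3.put_object(
                Bucket=self._bucket, Key=self._key(context), Body=pickle.dumps(obj)
            )
        else:
            self._values[self._key(context)] = obj

    def load_input(self, context):
        key = self._key(context.upstream_output)
        if self._s3:
            body = self._s3.get_object(Bucket=self._bucket, Key=key)["Body"].read()
            return pickle.loads(body)
        return self._values[key]

@io_manager(
    config_schema=Field(
        Selector(
            {
                "s3": {
                    "s3_bucket": Field(StringSource),
                    "s3_prefix": Field(StringSource, is_required=False, default_value="dagster"),
                },
                "in_memory": Field(bool, is_required=False, default_value=True),
            }
        ),
        default_value={"in_memory": True},
    )
)
def in_memory_or_s3_io_manager(init_context):
    cfg = init_context.resource_config
    if "s3" in cfg:
        return MemOrS3IOManager(cfg["s3"]["s3_bucket"], cfg["s3"]["s3_prefix"])
    return MemOrS3IOManager()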
Michael Cowling
11/17/2022, 11:39 AM

Daniel Galea
11/17/2022, 11:49 AM
These are my ops:
@op(ins={"start": In(Nothing)})
def launch_emr_cluster() -> Tuple[EmrJobRunner, str]:
    emr_config = get_emr_config(
        release_label="emr-6.8.0",
        cluster_name="Dagster Cluster",
        master_node_instance_type="m5.xlarge",
        worker_node_instance_type="m5.xlarge",
        worker_node_instance_count=2,
        ec2_subnet_id="",
        bid_price="0.039",
    )
    emr = EmrJobRunner(
        region="eu-central-1", aws_access_key_id="", aws_secret_access_key=""
    )
    cluster_id = emr.run_job_flow(log=log, cluster_config=emr_config)
    log.info(cluster_id)
    return (emr, cluster_id)
@op
def add_job_step(emr: EmrJobRunner, cluster_id: str) -> Tuple[EmrJobRunner, str, str]:
    s3_spark_code_path = "s3://PATH/test-file.py"
    step_defs = get_step_config(s3_spark_code_path=s3_spark_code_path)
    step_ids: list = emr.add_job_flow_steps(log=log, cluster_id=cluster_id, step_defs=step_defs)
    log.info(step_ids)
    return (emr, cluster_id, step_ids)

@op
def is_step_complete(emr: EmrJobRunner, cluster_id: str, step_id: str) -> None:
    while not emr.is_step_complete(log=log, cluster_id=cluster_id, emr_step_id=step_id):
        time.sleep(10)
and this is how I build my graph:
@graph
def emr_pipeline():
    is_step_complete(add_job_step(launch_emr_cluster(ingest())))
This is how I build my job:
def emr_job():
    job = emr_pipeline.to_job(
        name="daily_etl",
        description="Daily ETL job",
        partitions_def=DailyPartitionsDefinition(
            start_date="2022-11-09", timezone="Europe/Amsterdam"
        ),
        tags={
            "dagster-k8s/config": {
                "job_spec_config": {"ttl_seconds_after_finished": 300}
            }
        },
    )
    job_schedule = build_schedule_from_partitioned_job(
        job, default_status=DefaultScheduleStatus.RUNNING
    )
    return job, job_schedule
However, when I run Dagster I am getting the following error in the UI:
Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'add_job_step': {'inputs': {'cluster_id': '...'}}, 'is_step_complete': {'inputs': {'cluster_id': '...', 'step_id': '...'}}}}
I don't understand why Dagster is requesting a runtime configuration when I am passing objects from one op to the next. Can someone help with this? Even pointing to the right documentation would be enough, as I've been reading the Ops documentation but haven't been able to resolve it. Thanks in advance!
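A sketch of one likely explanation and fix, assuming the intent is to pass emr and cluster_id separately: launch_emr_cluster returns a single tuple-typed output, while add_job_step declares two inputs, so Dagster can't wire cluster_id from the upstream op and asks for it in run config instead. Declaring named Outs and unpacking them in the graph wires every input (stub bodies below, not the real EMR calls):

from dagster import In, Nothing, Out, graph, op

@op
def ingest():
    pass

@op(ins={"start": In(Nothing)}, out={"emr": Out(), "cluster_id": Out()})
def launch_emr_cluster():
    emr = object()  # stand-in for EmrJobRunner(...)
    return emr, "j-XXXXXXXX"  # one value per named Out, in order

@op(out={"emr": Out(), "cluster_id": Out(), "step_id": Out()})
def add_job_step(emr, cluster_id):
    return emr, cluster_id, "s-XXXXXXXX"

@op
def is_step_complete(emr, cluster_id, step_id):
    pass

@graph
def emr_pipeline():
    # Unpacking the named outputs gives every downstream input a source.
    emr, cluster_id = launch_emr_cluster(ingest())
    is_step_complete(*add_job_step(emr, cluster_id))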
James Brady
11/17/2022, 12:21 PM

Andreas Fred Ojala
11/17/2022, 12:39 PM

Sam Van Den Berghe
11/17/2022, 1:19 PM
I'm getting a RuntimeError: Java gateway process exited before sending its port number. Is this a limitation of dagster-cloud, or do I need to set up some other things to make it work?

Josh Taylor
11/17/2022, 1:39 PM
@asset_sensor(
asset_key=AssetKey("my_awesome_job"),
I'm using Asset Materializations, and would like to listen for anything on foobar_.
edit: seems I've hit https://github.com/dagster-io/dagster/issues/4582
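Until that issue is addressed, one possible workaround is a plain @sensor that scans the event log for materializations and filters asset keys by prefix. A sketch; my_downstream_job is a hypothetical job to kick off:

from dagster import DagsterEventType, EventRecordsFilter, RunRequest, sensor

@sensor(job=my_downstream_job)  # my_downstream_job is a placeholder
def foobar_prefix_sensor(context):
    cursor = int(context.cursor) if context.cursor else 0
    records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            after_cursor=cursor,
        ),
        limit=50,
    )
    for record in records:
        asset_key = record.event_log_entry.dagster_event.asset_key
        if asset_key and asset_key.to_user_string().startswith("foobar_"):
            yield RunRequest(run_key=str(record.storage_id))
        cursor = max(cursor, record.storage_id)
    context.update_cursor(str(cursor))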
Kartik Muruganantham
11/17/2022, 2:08 PM

Riccardo Tesselli
11/17/2022, 2:48 PM
I have the following in my workspace.yaml:
load_from:
  - python_file:
      relative_path: my/path/subpackage1/repository.py
  - python_file:
      relative_path: my/path/subpackage2/repository.py
and I get the following error:
dagster._check.CheckError: Invariant failed. Description: Cannot have multiple locations with the same name, got multiple "repository.py"
Then, if I rename one of the two repository.py files to another name, the error disappears but dagit looks stuck (the server is not spawning, and if I try to abort with Ctrl-C it doesn't stop).
I'm using dagit 1.0.17.
Any suggestions? Thanks

Frank Dekervel
11/17/2022, 3:26 PM

Harry James
11/17/2022, 4:01 PM
from dagster import StaticPartitionsDefinition, materialize_to_memory, asset

@asset
def independent_asset():
    return 1 + 2

@asset(partitions_def=StaticPartitionsDefinition(["1", "2"]))
def partitioned(context):
    return context.partition_key

@asset
def top_level_asset(partitioned, independent_asset):
    return partitioned, independent_asset

materialize_to_memory([top_level_asset, partitioned, independent_asset], partition_key=["1", "2"])
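One thing worth noting about the last line: partition_key takes a single key string, not a list, so this needs one run per partition. A sketch, assuming the definitions above and that the unpartitioned assets are allowed in the partitioned run (otherwise they would have to be materialized separately):

# partition_key must be a single key, so materialize once per partition.
for key in ["1", "2"]:
    result = materialize_to_memory(
        [top_level_asset, partitioned, independent_asset],
        partition_key=key,
    )
    assert result.success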
Simon
11/17/2022, 5:02 PM

Will Ho
11/17/2022, 6:24 PM
I noticed the parent-run-id field in the op context object. However, that field is set for both automatic run retries and user-initiated retries in the Dagster UI. Is there a programmatic way to differentiate between automatic retries and user retries?

Chris Histe
11/17/2022, 8:02 PM

André Augusto
11/17/2022, 8:30 PM
Is there a way to add metadata to Outputs and DynamicOutputs in an IOManager at runtime? Or can I only use the metadata provided by the Out class at the op definition?
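A sketch of one way to do this, via OutputContext.add_output_metadata inside the IO manager (assuming it behaves the same for dynamic outputs, which I haven't verified):

from dagster import IOManager, io_manager

class MetadataLoggingIOManager(IOManager):
    def handle_output(self, context, obj):
        # Runtime metadata attached here shows up on the handled-output /
        # materialization events, alongside any static Out metadata.
        size = len(obj) if hasattr(obj, "__len__") else 0
        context.add_output_metadata({"size": size})
        # ... actually persist obj somewhere ...

    def load_input(self, context):
        # ... retrieve and return the stored object ...
        pass

@io_manager
def metadata_logging_io_manager(_):
    return MetadataLoggingIOManager()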
Zachary Bluhm
11/17/2022, 8:57 PM

Bennett Norman
11/17/2022, 10:11 PM

Akira Renbokoji
11/18/2022, 12:21 AM
I'm getting this error:
botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: launch type should be one of [EC2,FARGATE,EXTERNAL]
Here's my dagster.yaml:
run_launcher:
  module: "dagster_aws.ecs"
  class: "EcsRunLauncher"
  config:
    container_name: "my-dagster"
    run_task_kwargs:
      launchType: "Fargate"
I'm thinking that I didn't configure my Dagster to use Fargate correctly.

Mohammad Nazeeruddin
11/18/2022, 4:45 AM

Peter Rietzler
11/18/2022, 5:09 AM