Félix Tremblay
11/14/2022, 1:53 PM
Félix Tremblay
11/14/2022, 1:55 PM
Zachary Bluhm
11/14/2022, 4:14 PM
Is there a way to set the cursor of a RunStatusSensor? I couldn't find a way looking at the source or docs, but I do see there's a "cursor" field in the UI.
Slaven Tepsic
11/14/2022, 4:30 PM
Amit Arie
11/14/2022, 4:47 PM
Is it possible to define a graph with a default input value, e.g.
@graph
def calculate_distance(metric: Metrics = Metrics.kilometer):
    pass
where Metrics is an enum? I'm getting:
input metric must get a value from the inputs section of its configuration.
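One possible alternative, sketched under the assumption that the enum can move into op config rather than being a graph input (the op body and enum values below are illustrative only):
import enum
from dagster import Enum, job, op

class Metrics(enum.Enum):
    kilometer = "kilometer"
    mile = "mile"

# Wrap the Python enum as a Dagster config enum.
DagsterMetrics = Enum.from_python_enum(Metrics)

@op(config_schema={"metric": DagsterMetrics})
def calculate_distance(context):
    # The resolved config value should be the corresponding Metrics member.
    context.log.info(f"metric: {context.op_config['metric']}")

@job
def distance_job():
    calculate_distance()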
Brian Pohl
11/14/2022, 7:25 PM
Is there a way to set a retry policy on a k8s_job_op? I declare my op like this:
def function_to_create_k8s_op( <a lot of variables> ):
    op = k8s_job_op.configured(
        config_or_config_fn={
            'image': f"{image_url}:{tag}",
            'service_account_name': env['SERVICE_ACCOUNT'],
            'command': ['java'],
            'args': args,
            'resources': {
                'requests': {
                    'cpu': cpu,
                    'memory': memory,
                },
            },
            'image_pull_policy': 'Always',
            'env_vars': all_env_vars,
        },
        name=name,
    )
    return op
I tried op.retry_policy = RetryPolicy(...), but that attribute is read-only. My understanding is that k8s_job_op.configured(config_schema=...) should only be used to set the config schema of the op, which is already defined here. I have also tried setting other attributes by passing them into job_spec_config, but I found that those were being ignored, and the only way to get attributes set was to pass them into config_or_config_fn (which seems like a bug, but because I'm using an older version - 1.0.2 - I'm not focusing on that).
So is there any opening to insert a retry policy? Or would the best option be to upgrade to the newest version and then use job_spec_config? If so, I could also use a pointer to the correct K8s spec argument for retries, because in my last run through the docs I didn't see any.
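One possible route, sketched under the assumption that your Dagster version supports attaching a policy at invocation time via with_retry_policy (the config values below are placeholders):
from dagster import RetryPolicy, job
from dagster_k8s import k8s_job_op

my_k8s_op = k8s_job_op.configured(
    {"image": "my-image:latest", "command": ["java"], "args": []},  # placeholder config
    name="my_k8s_op",
)

@job
def my_k8s_job():
    # Attach the retry policy where the op is invoked, rather than on the definition.
    my_k8s_op.with_retry_policy(RetryPolicy(max_retries=3, delay=30))()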
Mark
11/14/2022, 8:39 PM
Nicolas May
11/15/2022, 12:20 AM
John Sears
11/15/2022, 5:30 AM
Does snowflake_io_manager support non-date partitions?
shailesh
11/15/2022, 7:56 AM
"run_id": "f72306b3-xxx-xxxxx", "state_id": "el-data", "stdio": "stderr", "cmd_type": "extractor", "name": "eldata", "event": "pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')", "level": "info",
Sometimes, after a pod restart, I see the error below:
started a new run worker while the run was already in state DagsterRunStatus.STARTED. This most frequently happens when the run worker unexpectedly stops and is restarted by the cluster. Marking the run as failed.
Please let me know if anyone has faced these issues and how they were resolved. 🙏
Megan Beckett
11/15/2022, 8:44 AM
Andras Somi
11/15/2022, 12:38 PM
Is there a way to select all assets whose keys start with foo/bar (but possibly contain more levels, e.g. foo/bar/baz is matched as well as foo/bar and foo/bar/baz/*qux*)? I'm trying to avoid explicitly listing all the assets in a define_asset_job().
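If your Dagster version includes it, AssetSelection.key_prefixes may cover this; a minimal sketch (the job name and prefixes are illustrative):
from dagster import AssetSelection, define_asset_job

# Select every asset whose key starts with the prefix ["foo", "bar"],
# e.g. foo/bar/baz and foo/bar/baz/qux, without listing them explicitly.
foo_bar_job = define_asset_job(
    name="foo_bar_job",
    selection=AssetSelection.key_prefixes(["foo", "bar"]),
)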
Gintvilė Bergerytė
11/15/2022, 1:17 PM
Is it possible to use k8s_job_op without using the configured method? I would like to pass different configs for two k8s_job_ops in the job config at runtime. It seems that I need to specify names. Or is there another way to pass config to different k8s_job_ops at runtime?
i.e. something like this:
@job
def my_job():
    k8s_job_op(name="add")
    k8s_job_op(name="substract")
my_job.execute_in_process(run_config={"ops": {"k8s_job_op": {"config": {"args": ["the", "args"]}}}})
The problem is that I need several k8s_job_ops; it works with one. :sadpanda:
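A possible workaround, sketched with placeholder config: .alias() renames an op per invocation, and each alias then gets its own section in the run config:
from dagster import job
from dagster_k8s import k8s_job_op

@job
def my_job():
    k8s_job_op.alias("add")()
    k8s_job_op.alias("substract")()

my_job.execute_in_process(
    run_config={
        "ops": {
            # Placeholder configs -- one section per alias.
            "add": {"config": {"image": "my-image:latest", "args": ["the", "args"]}},
            "substract": {"config": {"image": "my-image:latest", "args": ["other", "args"]}},
        }
    }
)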
Roei Jacobovich
11/15/2022, 3:20 PM
I'd like to call DagsterGraphQLClient.reload_repository_location, but I have to somehow get Dagit's IP and all the repository locations. Is there a way to extract these from the sensor context (maybe from the DagsterInstance)?
If there's another way (not using GraphQL and Dagit) that would be great too!
Thanks!
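For the GraphQL route, a minimal sketch, assuming the Dagit host and the location name are known ahead of time (both values below are placeholders):
from dagster_graphql import DagsterGraphQLClient

# Hostname/port where Dagit serves GraphQL -- placeholder values.
client = DagsterGraphQLClient("dagit.example.internal", port_number=3000)

# Ask Dagit to reload the given repository location.
client.reload_repository_location("my_repository_location")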
Gayathiri
11/15/2022, 4:34 PM
Skipped the execution of hook "slack_message_on_failure". It did not meet its triggering condition during the execution of "xxx".
Simon
11/15/2022, 6:00 PM
Pablo Beltran
11/15/2022, 7:31 PM
Joel Olazagasti
11/15/2022, 7:33 PM
Has anyone seen a
UserWarning: Error loading repository location $dbt_project:KeyError: 'model.$subfolder.$model_name'
error when trying to load in a dbt project? I've followed the basic folder structure from the GitHub dbt/Dagster example, with both my dbt project and Dagster project at the root of the PWD, with the same profile/project import structure as in the example.
James Hale
11/15/2022, 10:24 PM
Gustavo Carvalho
11/15/2022, 11:12 PM
Is there a way to use AssetMaterialization to maintain asset materialization consistency in Dagster when a materialization of multiple partitions of a partitioned asset is required? I know it is possible to just launch a job to do exactly that, but IMHO it defeats the great usage flow that comes from exploring the asset list and re-materializing a couple of partitions.
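For reference, a hedged sketch of emitting materialization events for several partitions from a single op (the asset key and partition keys below are placeholders):
from dagster import AssetMaterialization, job, op

@op
def rematerialize_partitions(context):
    for partition in ["2022-11-01", "2022-11-02"]:  # placeholder partition keys
        # ... recompute the partition's data here ...
        context.log_event(
            AssetMaterialization(asset_key="my_partitioned_asset", partition=partition)
        )

@job
def backfill_job():
    rematerialize_partitions()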
Bennett Norman
11/16/2022, 4:37 AM
Stefan Adelbert
11/16/2022, 6:40 AM
I'm downloading files using selenium (and headless firefox + geckodriver), then reading those files into pandas DataFrames and loading the data into a database (or whatever). I have written some polling functionality (using polling2) which helps me work out when a file has fully downloaded, but I'm not convinced that it is always accurate.
Sometimes the files are largish (~200MB). There is a significant lag between the downloaded file initially being created in the filesystem and all the data being written into that file. I first poll for the existence of the file and then I poll for that file having non-zero size.
Is there a better way (on Linux) to confirm that a file is no longer being written to and is therefore ready to be consumed?
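Not Dagster-specific, but one common heuristic is to poll until the size has stopped changing for a quiet period; a rough sketch below. Firefox also writes to a companion .part file while a download is in flight, so waiting for that file to disappear is another signal, and inotify's CLOSE_WRITE event is the most direct answer on Linux.
import os
import time

def wait_until_stable(path: str, quiet_seconds: float = 5.0, timeout: float = 600.0) -> bool:
    """Return True once `path` exists and its size has been unchanged for `quiet_seconds`."""
    deadline = time.monotonic() + timeout
    last_size = -1
    last_change = time.monotonic()
    while time.monotonic() < deadline:
        size = os.path.getsize(path) if os.path.exists(path) else -1
        if size != last_size:
            # Size changed (or file appeared): reset the quiet-period clock.
            last_size = size
            last_change = time.monotonic()
        elif size >= 0 and time.monotonic() - last_change >= quiet_seconds:
            return True
        time.sleep(0.5)
    return False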
Dominik Liebler
11/16/2022, 8:42 AM
Dagster doesn't set .spec.ttlSecondsAfterFinished in the Job resources it creates, resulting in a lot of accumulated Jobs never being deleted. I tried to set the dagster-k8s/config tag explicitly as described in the documentation, but to no avail. I also tried updating to 1.0.17, but that didn't help either. Is there something else I need to consider here?
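For comparison, the tag shape the documentation describes is roughly the following (snake_case keys that dagster-k8s translates to the Kubernetes Job spec); the TTL value and the op are just examples:
from dagster import job, op

@op
def noop():
    pass

@job(
    tags={
        "dagster-k8s/config": {
            "job_spec_config": {
                "ttl_seconds_after_finished": 3600,  # example: clean up after one hour
            },
        },
    },
)
def my_job():
    noop()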
Mark
11/16/2022, 8:47 AM
Zsuzsanna Orban-Nagy
11/16/2022, 11:59 AM
from typing import List
from dagster import graph, op

@op
def return_one() -> int:
    return 1

@op
def sum_fan_in(nums: List[int]) -> int:
    return sum(nums)

@graph
def fan_in():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias(f'return_one_{i}')())
    sum_fan_in(fan_outs)
Binoy Shah
11/16/2022, 1:32 PM
Marc Keeling
11/16/2022, 3:12 PM
Is it possible to create a @multi_asset that also has a downstream dependency asset?
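For what it's worth, a minimal sketch of one shape this can take, with illustrative asset names: a downstream @asset can depend on a single named output of the @multi_asset:
from dagster import AssetOut, asset, multi_asset

@multi_asset(outs={"first_asset": AssetOut(), "second_asset": AssetOut()})
def my_multi_asset():
    return 1, 2

@asset
def downstream_asset(first_asset):
    # Depends only on the "first_asset" output of the multi-asset above.
    return first_asset + 1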
Chris Histe
11/16/2022, 4:09 PM
I'm having trouble with the gcs_pickle_io_manager IO manager. Here's a close version of my code:
import os
from dagster import op, graph, AssetsDefinition, ResourceDefinition
from dagster_gcp import bigquery_resource, gcs_resource
from dagster_gcp.gcs.io_manager import gcs_pickle_io_manager

@op
def upstream_op():
    return 1

@op
def downstream_op(upstream_op):
    return upstream_op + 1

@graph
def my_graph():
    return downstream_op(upstream_op())

asset_definition = AssetsDefinition.from_graph(
    my_graph,
    group_name="my_group",
    resource_defs={
        "gcs_bucket": ResourceDefinition.hardcoded_resource(os.getenv("BUCKET", "bucket")),
        "gcs": gcs_resource,
        "bigquery": bigquery_resource,
        "io_manager": gcs_pickle_io_manager,
    },
)
The error I'm getting:
Conflicting versions of resource with key 'io_manager' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
I'm not sure I understand what this error means. I tried a couple of different things, but I was not able to fix it 😞
Any help is greatly appreciated.
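A hedged reading of the error: two assets ended up with two distinct io_manager definition objects, and Dagster checks resource definitions by reference equality when building the job. One sketch of a fix, assuming all assets can share a single module-level dict (the example asset is a placeholder):
import os
from dagster import ResourceDefinition, asset, with_resources
from dagster_gcp import bigquery_resource, gcs_resource
from dagster_gcp.gcs.io_manager import gcs_pickle_io_manager

# Build the resource definitions exactly once, so every asset receives
# the same objects by reference.
RESOURCES = {
    "gcs_bucket": ResourceDefinition.hardcoded_resource(os.getenv("BUCKET", "bucket")),
    "gcs": gcs_resource,
    "bigquery": bigquery_resource,
    "io_manager": gcs_pickle_io_manager,
}

@asset(required_resource_keys={"gcs_bucket"})
def example_asset(context):  # placeholder asset
    return context.resources.gcs_bucket

# Bind the shared dict to every asset in one place.
assets_with_resources = with_resources([example_asset], resource_defs=RESOURCES)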
11/16/2022, 4:14 PMJames Brady
11/16/2022, 4:20 PM