nickvazz
12/14/2022, 7:56 PM
Jeff
12/14/2022, 9:49 PM
@asset. Details in 🧵
Shondace
12/14/2022, 10:36 PM
Qwame
12/14/2022, 10:40 PM
pytest. I have
from project import my_repository

def test_repos():
    my_repository.load_all_definitions()
However, when I run this, I get the error
AttributeError: 'PendingRepositoryDefinition' object has no attribute 'load_all_definitions'
Zach
12/14/2022, 11:42 PM
DagsterImportError: Encountered ImportError: `cannot import name 'introspection_query' from 'graphql' (/databricks/python/lib/python3.8/site-packages/graphql/__init__.py)` while importing module repo. Local modules were resolved using the working directory `/Users/zachary.romer/Documents/empirico/etxlib`. If another working directory should be used, please explicitly specify the appropriate path using the `-d` or `--working-directory` for CLI based targets or the `working_directory` configuration option for workspace targets.
It looks like the gql library that's referenced in dagster-graphql isn't pinned, so it didn't get updated when I updated my environment. Does anyone know what minimum version of gql is required by dagster-graphql? It looks like a breaking change in that library is causing issues for me, but I'm having some dependency resolution issues with other requirements when I try pinning it to 3.0.0+.
Shondace
12/15/2022, 12:16 AM
2022-12-15 00:14:26 +0000 - dagster.daemon.QueuedRunCoordinatorDaemon - ERROR - Another QUEUED_RUN_COORDINATOR daemon is still sending heartbeats. You likely have multiple daemon processes running at once, which is not supported. Last heartbeat daemon id: 37ed50b2-39e7-437b-8823-bc4e36e6ea11, Current daemon_id: 5bf7e12e-1371-47c8-94ad-33b285b4d0a3
2022-12-15 00:14:26 +0000 - dagster.daemon.SchedulerDaemon - ERROR - Another SCHEDULER daemon is still sending heartbeats. You likely have multiple daemon processes running at once, which is not supported. Last heartbeat daemon id: 37ed50b2-39e7-437b-8823-bc4e36e6ea11, Current daemon_id: 5bf7e12e-1371-47c8-94ad-33b285b4d0a3
2022-12-15 00:14:26 +0000 - dagster.daemon.BackfillDaemon - ERROR - Another BACKFILL daemon is still sending heartbeats. You likely have multiple daemon processes running at once, which is not supported. Last heartbeat daemon id: 37ed50b2-39e7-437b-8823-bc4e36e6ea11, Current daemon_id: 5bf7e12e-1371-47c8-94ad-33b285b4d0a3
2022-12-15 00:14:31 +0000 - dagster.daemon.SensorDaemon - ERROR - Another SENSOR daemon is still sending heartbeats. You likely have multiple daemon processes running at once, which is not supported. Last heartbeat daemon id: 37ed50b2-39e7-437b-8823-bc4e36e6ea11, Current daemon_id: 5bf7e12e-1371-47c8-94ad-33b285b4d0a3
Indra
12/15/2022, 5:23 AM
Praveen Kumar Jha
12/15/2022, 5:53 AM
Đinh Đức Dương
12/15/2022, 7:44 AM
Thierry Hue
12/15/2022, 8:57 AM
@run_status_sensor(run_status=DagsterRunStatus.CANCELED)
def job_canceled_sensor(context: RunStatusSensorContext):
    prometheus = PrometheusClient()
    status = 'canceled'
    logger = get_dagster_logger(name='JOB_STATUS_SENSOR')
    logger.info(f"job_canceled_sensor - {context.dagster_run.job_name} - {status}")
    prometheus.change_job_state(context.dagster_run.job_name, status)
    prometheus.pushadd_to_gateway(job_name=context.dagster_run.job_name)
But then when I terminate the job via the UI, the job state is CANCELED but the sensor is not triggered.
Am I doing something wrong?
Hemant Kumar
12/15/2022, 9:15 AM
Alexis Manuel
12/15/2022, 9:57 AM
Materialize all / Materialize selected button. Details in 🧵
Kasper Ramström
12/15/2022, 11:04 AM
def run_node(node):
    if node.has_child_nodes():
        for child_node in node.child_nodes():
            yield run_node(child_node)
    yield {"res": my_func(node)}

def my_job():
    graph = load_graph()
    for node in graph:
        run_node(node)
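A note on the plain-Python semantics of the snippet above, independent of Dagster: `yield run_node(child_node)` yields the child generator object itself rather than the child's results, and calling `run_node(node)` without iterating it executes nothing. Below is a minimal sketch of the same traversal using `yield from`. The `Node` class, `my_func` body, and `run_all` helper are hypothetical stand-ins for whatever `load_graph()` and the real job provide, and the sketch does not attempt to show how the recursion would map onto Dagster ops.

```python
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class Node:
    """Hypothetical stand-in for a node returned by load_graph()."""
    name: str
    children: List["Node"] = field(default_factory=list)

    def has_child_nodes(self) -> bool:
        return bool(self.children)

    def child_nodes(self) -> List["Node"]:
        return self.children


def my_func(node: Node) -> str:
    # Placeholder for the real per-node work.
    return f"processed {node.name}"


def run_node(node: Node) -> Iterator[dict]:
    # Children are visited first, then the node itself.
    if node.has_child_nodes():
        for child_node in node.child_nodes():
            # `yield from` re-yields every result of the recursive call;
            # a bare `yield` would hand back the generator object instead.
            yield from run_node(child_node)
    yield {"res": my_func(node)}


def run_all(graph: List[Node]) -> List[dict]:
    results = []
    for node in graph:
        # The generator has to be iterated, otherwise its body never runs.
        results.extend(run_node(node))
    return results


graph = [Node("root", [Node("a", [Node("a1")]), Node("b")])]
results = run_all(graph)
```

With this shape, every node's result comes out as a flat sequence of dicts, children before parents.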
Emilja Dankevičiūtė
12/15/2022, 11:27 AM
assets. Said function is called once in a repository along with dbt and airbyte build functions. All looks ok in Dagit (upstreams resolved, etc). However, when I click materialize, before the op that builds said asset runs, the parent function is called several times again. Am I doing something wrong? Or is this expected behavior?
I've looked at airbyte assets and I can see a CacheableAssetsDefinition instead of AssetsDefinition used there. Should I just re-implement this for Hightouch? I couldn't find an example that would show how to load assets from an external source, only how to explicitly define them in Dagster. But for us at the moment, this would cause a lot of boilerplate, so we're testing other approaches.
Martin Picard
12/15/2022, 12:05 PM
Martin Picard
12/15/2022, 12:39 PM
Daniel Kilcoyne
12/15/2022, 2:58 PM
Geoffrey Greenleaf
12/15/2022, 3:24 PM
def create_bronze_assets(tenant: str):
    @asset(...)
    def some_asset():
        ...
    return [some_asset]

# in __init__.py that has the new defs = Definitions line
tenant_one = create_accounting_system_assets('tenant_one')
tenant_two = create_accounting_system_assets('tenant_two')
tenant_three = create_other_accounting_system_assets('tenant_three')
defs = Definitions(...)
Depending on the tenant, we would materialize each set of assets once a day. I'm looking at making the assets partitioned by day for that. Then I started looking into multi-dimensional partitions and am possibly thinking about using the tenant name as the first key of the partition and the date as the second. Any thoughts on this idea or better ways to handle this multi-tenancy?
Davi
12/15/2022, 3:47 PM
jsonCredentialsEnvvar for gcsComputeLogManager? I got the credentials JSON, converted it to a string using json.dumps() in Python, and passed this string in the Helm chart. However, it returns: ValueError: ('Could not deserialize key data. The data may be in an incorrect format, it may be encrypted with an unsupported algorithm, or it may be an unsupported key type).
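One thing that may be worth checking here, offered as an assumption rather than a diagnosis: if the credentials were already a JSON string when passed to `json.dumps()`, the result is double-encoded, and decoding it once gives back a string rather than a dict. A minimal stdlib sketch with a made-up credentials payload (the field values are placeholders, not a real key):

```python
import json

# Made-up service-account-like payload; the key material is a placeholder.
creds = {
    "type": "service_account",
    "private_key": "-----BEGIN PRIVATE KEY-----\nabc\n-----END PRIVATE KEY-----\n",
}

# Encoding the dict once gives a single-line JSON document.
encoded_once = json.dumps(creds)
decoded_once = json.loads(encoded_once)

# Encoding the already-encoded string again wraps it in quotes and escapes it.
encoded_twice = json.dumps(encoded_once)
decoded_twice = json.loads(encoded_twice)

print(type(decoded_once).__name__)   # dict
print(type(decoded_twice).__name__)  # str
print("\n" in decoded_once["private_key"])  # True: real newlines restored
```

If the single-encoded string round-trips to a dict locally, the remaining question is how the Helm chart expects to receive it. The field name suggests it takes the name of an environment variable that holds the JSON rather than the JSON itself, but that should be confirmed against the chart's values documentation.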
How should I pass this JSON?
Vinnie
12/15/2022, 3:55 PM
asset_reconciliation_sensor supposed to work with cross-repository asset dependencies? I get a KeyError from the sensor when it detects a change in an upstream asset in a different repository, see stacktrace:
KeyError: AssetKey(['my', 'cool', 'asset'])
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/errors.py", line 199, in user_code_error_boundary
    yield
  File "/app/venv/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 324, in get_external_sensor_execution
    return sensor_def.evaluate_tick(sensor_context)
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 425, in evaluate_tick
    result = list(self._evaluation_fn(context))
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 589, in _wrapped_fn
    result = fn(context)
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 858, in sensor_fn
    run_requests, updated_cursor = reconcile(
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 694, in reconcile
    ) = determine_asset_partitions_to_reconcile(
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 362, in determine_asset_partitions_to_reconcile
    all(
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/asset_reconciliation_sensor.py", line 374, in <genexpr>
    instance_queryer.is_reconciled(
  File "/app/venv/lib/python3.9/site-packages/dagster/_utils/cached_method.py", line 59, in helper
    result = method(self, *args, **kwargs)
  File "/app/venv/lib/python3.9/site-packages/dagster/_utils/caching_instance_queryer.py", line 205, in is_reconciled
    for parent in asset_graph.get_parents_partitions(
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/asset_graph.py", line 257, in get_parents_partitions
    for parent_asset_key in self.get_parents(asset_key):
  File "/app/venv/lib/python3.9/site-packages/dagster/_core/definitions/asset_graph.py", line 172, in get_parents
    return self.asset_dep_graph["upstream"][asset_key]
And the `RepositoryDefinition`; keeping in mind that all assets fetched from dbt depend on upstream `SourceAsset`s.
@repository
def my_repo():
    project = "my_project"
    return [
        slack_on_run_failure,
        *with_resources(**generate_dbt_repo(project, DEPLOYMENT_NAME)),
        build_asset_reconciliation_sensor(
            AssetSelection.all(),
            name="reconciliation_sensor",
        )
    ]
David C
12/15/2022, 4:45 PM
@repository
def foo_jobs():
    all_jobs = []
    for customer_id in rpc_client.get_all_customers():
        for job_name in rpc_client.get_all_foo_jobs(customer_id):
            all_jobs.append(
                dagster.JobDefinition( ??? )
            )
    return all_jobs
Or do I need to write some kind of generator to write out the Python code with the annotations?
I am aware of this "assets" concept as well, but I don't think that will work because these jobs have different schedules, and I need direct control of the graph for each job (different FooJobs could have different graphs based on information known only at runtime).
Caio Tavares
12/15/2022, 6:19 PM
dagster-step jobs? Is there a way to configure it in the Helm chart, or should it be passed in the k8s_executor?
geoHeil
12/15/2022, 8:18 PM
Sean Han
12/16/2022, 1:20 AM
William
12/16/2022, 2:06 AM
D depends on C, C on A + B
When I want to convert the asset to a job in order to define a schedule for it, should I put all of A+B+C+D into define_asset_job?
Davi
12/16/2022, 8:58 AM
jsonCredentialsEnvvar for gcsComputeLogManager be? Should it be the path to the JSON file? Should it be the JSON file dumped as a string? If it is the path to the file, should it be set inside each different workspace (inside each different Docker image containing my repos)? Could I store my credential.json in a GCP bucket and use its address as jsonCredentialsEnvvar?
Thanks!
won
12/16/2022, 9:40 AM
John Lee
12/16/2022, 3:26 PM
workspace:
  enabled: true
  servers:
    - host: "k8s-my-pipeline"
      port: 3030
      name: "my-pipeline"
When I update the image tag in the values.yaml of the subchart and redeploy it, the code server image updates correctly but the jobs launched from dagit still use the original image.
deployments:
  - name: "k8s-my-pipeline"
    image:
      repository: "my_pipeline_image"
      tag: changed_tag
      pullPolicy: Always
    dagsterApiGrpcArgs:
      - "--python-file"
      - "/opt/my_pipeline/repository.py"
    port: 3030
    includeConfigInLaunchedRuns:
      enabled: True
Martin Picard
12/16/2022, 3:32 PM
Peter Davidson
12/16/2022, 3:50 PM