Makas Tzavellas
12/20/2022, 9:43 AM

Andras Somi
12/20/2022, 10:26 AM
I think I can use observable_source_asset for this, but I need access to a resource (a DB connection) within the observe_fn to calculate a LogicalVersion value. For a normal asset I have required_resource_keys and the context object, but how do I do this in an observable source asset? Does this even make sense?
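A minimal sketch of what this could look like, assuming (unverified) that observable_source_asset accepts required_resource_keys the way @asset does, with db as a hypothetical resource key:

from dagster import LogicalVersion, observable_source_asset

# Sketch only: assumes required_resource_keys is supported here and that
# context.resources is available inside the observe fn, as with regular assets.
@observable_source_asset(required_resource_keys={"db"})
def source_table(context):
    # Derive a version marker from the database, e.g. a max updated-at value.
    latest = context.resources.db.fetch_latest_timestamp()  # hypothetical helper
    return LogicalVersion(str(latest))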
Adam Ward
12/20/2022, 1:34 PM

Chris Histe
12/20/2022, 3:32 PM
Is there a way to set a retry_policy on a @graph and define_asset_job? As a workaround we can set the retry policy on the assets and ops, but since this property exists for @job it would be nice to have it.
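For reference, the per-op/per-asset workaround mentioned above looks roughly like this; note that RetryPolicy also controls delay and backoff:

from dagster import Backoff, RetryPolicy, asset, op

# Retry policy attached directly to an op (and likewise on @asset),
# since @graph and define_asset_job don't accept retry_policy.
@op(retry_policy=RetryPolicy(max_retries=3, delay=10, backoff=Backoff.EXPONENTIAL))
def flaky_op():
    ...

@asset(retry_policy=RetryPolicy(max_retries=3))
def flaky_asset():
    ...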
Daniel Daum
12/20/2022, 4:52 PM
from dagster import Definitions, asset, define_asset_job

@asset
def asset1():
    return [1, 2, 3]

@asset
def asset2(asset1):
    return asset1 + [4]

all_assets_job = define_asset_job(name="all_assets_job")
asset1_job = define_asset_job(name="asset1_job", selection="asset1")

defs = Definitions(
    assets=[asset1, asset2],
    jobs=[all_assets_job, asset1_job],
)
I stole the code snippet from the "how to create jobs from software-defined assets" section of the docs:
https://docs.dagster.io/concepts/ops-jobs-graphs/jobs#from-software-defined-assets
Am I doing something incorrect with the new API?

Daniel Daum
12/20/2022, 4:54 PM

Archie Kennedy
12/20/2022, 5:29 PM
My workspace.yaml points at the virtualenv's Python:
load_from:
  - python_package:
      package_name: pipeline
      executable_path: .venv/bin/python
Everything works great on Linux, but Windows uses a different path to the Python binary:
load_from:
  - python_package:
      package_name: pipeline
      executable_path: .venv/Scripts/python
Maybe this is more of a Poetry question…
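One possible workaround, sketched as an assumption rather than a known fix: keep a second workspace file for Windows and point dagit at it with -w:

# workspace_windows.yaml (hypothetical second file for Windows machines)
load_from:
  - python_package:
      package_name: pipeline
      executable_path: .venv/Scripts/python

# launch with: dagit -w workspace_windows.yaml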
Airton Neto
12/20/2022, 5:43 PM
AssetIn(key=, partition_key=MultiPartitionKey({..}))  # pseudo-code
Mario Carrillo
12/20/2022, 8:32 PM
Details: the job is running on a local machine, dagit==1.0.17, dagster==1.0.17
Steven Litvack-Winkler
12/20/2022, 9:21 PM

Lindsay S
12/20/2022, 10:41 PM

Pablo Beltran
12/20/2022, 10:50 PM

Darshan Bhagat
12/21/2022, 3:27 AM

Sven Lito
12/21/2022, 4:06 AM

Emilja Dankevičiūtė
12/21/2022, 7:31 AM
Is there a way to set op_retry_policy on define_asset_job? tags={"dagster/max_retries": 2} would probably work, but I'm also interested in controlling the delay between retries…
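A sketch of the asset-level workaround, which does allow controlling delay and backoff, alongside the run-level tag:

from dagster import Backoff, RetryPolicy, asset, define_asset_job

# delay/backoff are controllable per asset via RetryPolicy;
# the dagster/max_retries tag only retries the run as a whole.
@asset(retry_policy=RetryPolicy(max_retries=2, delay=30, backoff=Backoff.EXPONENTIAL))
def my_asset():
    ...

retrying_job = define_asset_job(
    name="retrying_job",
    selection="my_asset",
    tags={"dagster/max_retries": 2},
)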
Daniel Galea
12/21/2022, 9:00 AM
I am creating Ops dynamically with GraphDefinition rather than using the @graph decorator on a function and specifying the Ops inside that function. I am getting the following error due to the Op which has a hook. The error: AttributeError: 'PendingNodeInvocation' object has no attribute '__name__'. This happens because GraphDefinition expects node_defs to be a list of NodeDefinition and not PendingNodeInvocation. I tried to separate the hooks from the nodes and pass them to the repository, but a repository does not accept the HookDefinition class.
I want to apply certain hooks to specific Ops, so applying a hook to the entire job doesn't fit my use case. Is there a way I can work around this, or a different way I can create Ops dynamically in the same way I want to? Below is an example of how I create my code:
from dagster import (
    DependencyDefinition,
    GraphDefinition,
    OpExecutionContext,
    op,
    repository,
)

# my_hook_definition is a HookDefinition defined elsewhere

nodes = list()

def get_template_node(x: int):
    @op(name=f"node{x}")
    def template_node(context: OpExecutionContext, some_value) -> str:
        return f"This is node {x}"
    return template_node

def get_downstream_node(x: int):
    @op(name=f"downstream_of_node{x}")
    def downstream_node(context: OpExecutionContext, some_value) -> str:
        return f"DOWNSTREAM GOT VALUE {some_value}"
    return downstream_node

for x in range(0, 5):
    nodes.append(get_template_node(x=x))
    nodes.append(get_downstream_node(x=x).with_hooks({my_hook_definition}))

dependencies = dict()
for x in range(len(nodes) - 1, -1, -1):
    if x != 0:
        dependencies[nodes[x].name] = {
            "some_value": DependencyDefinition(nodes[x - 1].name)
        }

my_graph = GraphDefinition(
    name="dynamic_graph",
    description="dynamic graph desc",
    node_defs=nodes,
    dependencies=dependencies,
)

my_job = my_graph.to_job(name="dynamic_job")

@repository
def repo():
    return [my_job]
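A possible workaround, sketched under the assumption that the graph can be built by composition instead of GraphDefinition: inside an @graph body a PendingNodeInvocation returned by with_hooks is callable, so hooks can be applied per op:

from dagster import graph

# Sketch: build the same chain by invocation rather than node_defs/dependencies.
# The first node's some_value input still has to be satisfied, here as a graph input.
@graph
def dynamic_graph(some_value):
    for x in range(0, 5):
        some_value = get_template_node(x=x)(some_value)
        some_value = get_downstream_node(x=x).with_hooks({my_hook_definition})(some_value)
    return some_value

dynamic_job = dynamic_graph.to_job(name="dynamic_job")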
Dane Linssen
12/21/2022, 10:20 AM
Is it possible to define a job that selects assets from other repositories? Something like:
my_workflow_job = define_asset_job(
    "my_workflow_job",
    selection=AssetSelection.keys(asset_from_upstream_repo_A, asset_from_upstream_repo_B, asset_from_this_repo),
)
If this is not possible, what’s the best way to create such workflows?

SUBHADEEP ROY
12/21/2022, 2:06 PM

Daniel Mosesson
12/21/2022, 2:32 PM
Some of my jobs fail with dagster._core.errors.DagsterLaunchFailedError: Error during RPC setup for executing run: dagster_postgres.utils.DagsterPostgresException: too many retries for DB connection, but other jobs work just fine. Any ideas what could be happening?

Daniel Gafni
12/21/2022, 2:35 PM
Is the Launchpad config supposed to override a resource's .configured config?
My resource:
@resource(
    config_schema={
        "host": Field(StringSource, default_value="localhost"),
        "db": int,
        "password": Field(StringSource, is_required=False),
    }
)
def redis_client(init_context: InitResourceContext) -> Redis:
    ...
The Launchpad config:
resources:
  redis_client:
    config:
      db: 1
I pass a "partially" configured (I realize it's actually fully configured) resource to my `Definitions`:
"redis_client": redis_client.configured({"host": {"env": "REDIS_HOST"}, "db": 0}),
After this the db value is always 0, while I would expect the Launchpad config to override the value provided from the code.

Rodrigo Parra
12/21/2022, 2:47 PM
In my IO manager, context.upstream_output.has_partition_key is False. Why could this be happening? Both assets are annotated with @asset(partitions_def=DailyPartitionsDefinition(start_date="2015-02-10")) and their job definitions include partitions_def=DailyPartitionsDefinition(start_date="2015-02-10") as a parameter. I need the IO manager to pick up only the partition from A that corresponds to the same date as the partition of B that I’m trying to materialize.
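For reference, a minimal sketch of the pattern being described, with read_partition standing in as a hypothetical helper:

from dagster import IOManager, io_manager

class PartitionedIOManager(IOManager):
    def handle_output(self, context, obj):
        ...  # write obj under context.partition_key

    def load_input(self, context):
        # For a partitioned upstream asset in a partitioned run,
        # this is expected to be True and to match the run's date.
        if context.upstream_output.has_partition_key:
            return read_partition(context.upstream_output.partition_key)  # hypothetical
        raise ValueError("upstream output has no partition key")

@io_manager
def partitioned_io_manager():
    return PartitionedIOManager()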
Mark Fickett
12/21/2022, 3:08 PM
When I start dagit I get cannot import name 'introspection_query' from 'graphql'. Anyone else seeing this? I tried reinstalling my requirements in a fresh venv, and I think my only graphql requirements are coming from Dagster. I can call execute_job or run via Dagster Cloud just fine.

Gabe Schine
12/21/2022, 3:14 PM
I have a job that calls an asset that itself has two upstream assets. The job has a config= set for one of the ops (via config={"ops": {"op1": ...}}). When I attempt to load the repository, I get an error like so:
Missing required config entries "asset1" at the root. Sample config for missing entries: {'asset1': {'inputs': {'upstream_asset1': '...', 'upstream_asset2': '...'}}}
If I add the above scaffold to my `job`'s config= stanza, I get another error:
Error 1: Must specify a field at path root:asset1:inputs:upstream_asset1 if more than one field is defined. Defined fields: ['json', 'pickle', 'value']
I'm not sure why this configuration is needed (I did not define any config_schema for any of the assets), and I also wouldn't know what the correct values are to place in these config sections. I can't find anything in the documentation (or by reading the dagster source code, even) that helps me figure it out.
Thanks for any help/pointers you can offer!
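One guess at what the error implies, offered as an assumption rather than a confirmed diagnosis: if upstream_asset1/upstream_asset2 are not part of the job, asset1's inputs are unresolved and Dagster asks for them as run config. Selecting the upstream assets as well would avoid that:

from dagster import AssetSelection, define_asset_job

# Hypothetical sketch: select asset1 together with everything upstream of it
# so its inputs come from those assets rather than from config.
my_job = define_asset_job(
    "my_job",
    selection=AssetSelection.keys("asset1").upstream(),
)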
Peter Davidson
12/21/2022, 5:42 PM
Is there a way to define a job that targets only one partition of a static-partitioned asset? Something like:
@asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"]))
def partition_samplestep(partition_sample) -> pd.DataFrame:
    ...
    return partition_sample

part_job = define_asset_job(name='partition_job', selection=partition_samplestep, partitions_def=<some syntax to select only partition c>)
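A sketch of one way to get a single-partition run, assuming it's acceptable to pass the partition key at launch time rather than baking it into define_asset_job:

from dagster import materialize

# In a script or test: run only partition "c".
part_job.execute_in_process(partition_key="c")
# or, with the asset directly:
materialize([partition_samplestep], partition_key="c")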
Gabe Schine
12/21/2022, 6:28 PM
Does calling an @asset function from a @job cause the asset to be re-materialized, or does it use whatever was the last materialization value?

Farhan Baig
12/21/2022, 7:02 PM

Peter Davidson
12/21/2022, 7:16 PM

Bennett Norman
12/21/2022, 8:28 PM

Terry Lines
12/21/2022, 11:59 PM

Gatsby Lee
12/22/2022, 12:33 AM