Eugenio Contreras
11/24/2022, 12:02 PM
op? like logging.info instead of context.log.info?
Morris Clay
11/24/2022, 1:33 PM
Exception: Timed out after waiting 315s for server examplelocation-prod-7ae6cb.serverless-agents-namespace-3:4000. Most recent connection error: dagster._core.errors.DagsterUserCodeUnreachableError: Could not reach user code server
Stack Trace:
File "/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py", line 1412, in _wait_for_server_process
client.ping("")
File "/dagster/dagster/_grpc/client.py", line 170, in ping
res = self._query("Ping", api_pb2.PingRequest, echo=echo)
File "/dagster/dagster/_grpc/client.py", line 141, in _query
raise DagsterUserCodeUnreachableError("Could not reach user code server") from e
The above exception was caused by the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception calling application: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1669296569.032962470","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1669296569.032961531","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>"
debug_error_string = "{"created":"@1669296569.033740107","description":"Error received from peer ipv4:10.0.29.197:4000","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Exception calling application: <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = "failed to connect to all addresses"\n\tdebug_error_string = "{"created":"@1669296569.032962470","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1669296569.032961531","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"\n>","grpc_status":2}"
>
Stack Trace:
File "/dagster/dagster/_grpc/client.py", line 139, in _query
return self._get_response(method, request=request_type(**kwargs), timeout=timeout)
File "/dagster/dagster/_grpc/client.py", line 129, in _get_response
return getattr(stub, method)(request, metadata=self._metadata, timeout=timeout)
File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
File "/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py", line 1047, in _reconcile
self._wait_for_new_server_ready(
File "/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py", line 289, in _wait_for_new_server_ready
self._wait_for_dagster_server_process(
File "/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py", line 1396, in _wait_for_dagster_server_process
self._wait_for_server_process(client, timeout, additional_check)
File "/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py", line 1418, in _wait_for_server_process
raise Exception(
Mycchaka Kleinbort
11/24/2022, 1:43 PM
from dagster._core import Project
project = Project(...)
df_users = project.load_asset('df_users', materialize=False)
Mycchaka Kleinbort
11/24/2022, 3:45 PM
.../my-dagster-project/tmpav872908/storage/{assetkey}
Dane Linssen
11/24/2022, 4:29 PM
VDBT and VDBT_STG) into our dagster repo (v 1.0.16), however we’re getting the following error:
UserWarning: Error loading repository location dbt:dagster._core.errors.DagsterInvalidDefinitionError: Conflicting versions of resource with key 'dbt' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
The error is pretty clear, though we haven’t been able to figure out how to solve it. We’ve tried with build_resources(..) as resources: but have not been successful with that yet either, and aren’t sure if that’s actually the way to go. Does anyone here know the answer?
Minimal non-working example:
from dagster import repository, with_resources
from dagster_dbt import load_assets_from_dbt_project, dbt_cli_resource
import os

project_folder = os.path.join('python', 'dbt')
VDBT_DIR = os.path.join(project_folder, 'dagster', 'dbt', 'vdbt')
VDBT_PROFILES_DIR = os.path.join(VDBT_DIR, '.profiles')
VDBT_STG_DIR = os.path.join(project_folder, 'dagster', 'dbt', 'vdbt_stg')
VDBT_STG_PROFILES_DIR = os.path.join(VDBT_STG_DIR, '.profiles')

@repository
def dbt():
    return [
        *with_resources(
            load_assets_from_dbt_project(
                project_dir=VDBT_DIR,
                profiles_dir=VDBT_PROFILES_DIR,
                key_prefix='vdbt',
            ),
            resource_defs={
                "dbt": dbt_cli_resource.configured(
                    {"project_dir": VDBT_DIR, 'profiles_dir': VDBT_PROFILES_DIR},
                )
            },
        ),
        *with_resources(
            load_assets_from_dbt_project(
                project_dir=VDBT_STG_DIR,
                profiles_dir=VDBT_STG_PROFILES_DIR,
                key_prefix='vdbt_stg',
            ),
            resource_defs={
                "dbt": dbt_cli_resource.configured(
                    {"project_dir": VDBT_STG_DIR, 'profiles_dir': VDBT_STG_PROFILES_DIR},
                )
            },
        ),
    ]
David Fernández Calle
11/24/2022, 4:46 PM
Slackbot
11/24/2022, 5:05 PM
Dagster Jarred
11/24/2022, 9:23 PM
Stefan Adelbert
11/25/2022, 4:04 AM
context.instance.get_runs() and then runs some checks on those jobs, so that should cover the last point above.
But it's not clear from the [documentation](https://docs.dagster.io/_apidocs/internals#instance) how I could programmatically get information about workspaces, jobs and schedules from the dagster instance.
I can get this information from the GraphQL API (https://docs.dagster.io/concepts/dagit/graphql#get-a-list-of-repositories), but I'd rather have a job collect this info and then "call home". Any advice on this would be very useful.
Peter Davidson
11/26/2022, 8:03 AM
run_config = {
    'local_artifact_storage': {
        'config': {
            'base_dir': r'C:\Users\peter\dagster'
        }
    }
}
How can I add this to an @repository?
@repository
def my_repo():
    return [test_asset1, test_asset2]
Andrea Giardini
11/27/2022, 4:52 PM
Visualize the GitHub stars, there is an indentation error in the code. The last three lines should be indented. Also, on the line where we define the markdown variable, for some reason there are three spaces instead of four
Oliver
11/28/2022, 10:46 AM
Simo Tumelius
11/28/2022, 11:50 AM
dagster._core.errors.DagsterInvalidDefinitionError: @graph 'graph_two' has unmapped input '_in'. Remove it or pass it to the appropriate op/graph invocation.
Any suggestions? 🙂
@graph
def graph_one():
    pass

@graph(ins={"_in": In(Nothing)})
def graph_two(_in):
    _in

@graph
def graph_three():
    graph_two(_in=graph_one())
Lech
11/28/2022, 1:24 PM
partitions_def = StaticPartitionsDefinition(key_list)
partitioned scheduler:
def get_schedule_def(partitions_def, cron_schedule, job, execution_timezone):
    @schedule(
        name=f'{job.name}_scheduler',
        cron_schedule=cron_schedule,
        job=job,
        default_status=DefaultScheduleStatus.STOPPED,
        execution_timezone=execution_timezone,
    )
    def schedule_def():
        partition_keys = partitions_def.get_partition_keys()
        if len(partition_keys) == 0:
            # This function is a generator, so the SkipReason must be yielded;
            # a returned value would never reach the caller.
            yield SkipReason("The job's PartitionsDefinition has no partitions")
            return
        for key in partition_keys:
            yield job.run_request_for_partition(partition_key=key, run_key=key)

    return schedule_def
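A note on the schedule function above: because its body contains `yield`, Python treats it as a generator, and a value passed to `return` inside a generator is not delivered to whoever iterates over it. The plain-Python sketch below shows the difference; the strings are stand-ins for `SkipReason` and `RunRequest`, and both function names are hypothetical.

```python
from typing import Iterator, List


def returns_reason(keys: List[str]) -> Iterator[str]:
    if len(keys) == 0:
        # In a generator, the returned value rides on StopIteration and is
        # not part of the iterated results.
        return "skip: no partitions"
    for key in keys:
        yield f"run:{key}"


def yields_reason(keys: List[str]) -> Iterator[str]:
    if len(keys) == 0:
        # Yielding makes the value visible to the consumer; the bare return
        # then ends the generator.
        yield "skip: no partitions"
        return
    for key in keys:
        yield f"run:{key}"


print(list(returns_reason([])))  # []
print(list(yields_reason([])))   # ['skip: no partitions']
```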
partitioned job creation:
job = asset.to_job(
    name=f"Job_{job_name}",
    resource_defs=resource_defs,
    config=config,
    partitions_def=partitions_def,
    executor_def=in_process_executor,
)
and the job variable + get_schedule_def go into the repository function
Josh Taylor
11/28/2022, 1:39 PM
Mark Fickett
11/28/2022, 2:42 PM
ENVIRONMENT env var, so for now I just have a switch based on that when I'm building the schedules, so when the code's loaded by the agent for ENVIRONMENT=production we'll get one schedule definition, and another for ENVIRONMENT=staging. Is there a better way?
Mycchaka Kleinbort
11/28/2022, 3:58 PM
@asset
def model(X_train: pd.DataFrame, y_train: pd.Series) -> LogisticRegression:
    ...
    return my_model

@asset
def confusion_matrix() -> plt.Figure:
    ...
    return fig
Jordan
11/28/2022, 5:07 PM
Selene Hines
11/28/2022, 5:22 PM
Matt Millican
11/28/2022, 10:41 PM
dagster api grpc in a single ECS task we call user_code. We’re using the EcsRunLauncher to launch jobs, so when a run is triggered through Dagit, a sensor, etc., we provision a new ECS task pointing to an ECR image holding our own code repository in order to do the work.
This scales quite well in terms of throughput and resource usage—runs don’t eat each other’s resources since we spin up as many ECS tasks as we like! But it takes about 1-2 minutes for ECS to provision and launch the new task for a run, which is slower than we’d want.
On the opposite extreme, I could imagine simply using the DefaultRunLauncher in the gRPC task so that runs could start near-instantaneously after being triggered. But a single task, even a large one, probably couldn’t handle the volume of runs we’re working with.
Has anyone worked with any sort of in-between? I’m imagining something like an autoscaling ECS service with a bunch of user-code tasks that launch jobs instantly on demand, but how to put this together isn’t obvious to me. A custom run launcher? Or maybe add more user_code gRPC tasks and somehow load-balance traffic to those tasks from dagit / dagster-daemon?
Any thoughts welcome 😃
Oliver
11/28/2022, 11:17 PM
Dusty Shapiro
11/29/2022, 1:01 AM
from .assets import lead_assets, but in my user deployment I have to use the full path (i.e. from foo.sales.assets import lead_assets)
This is the error I get
ImportError: attempted relative import with no known parent package
I see most examples using the dot notation, but I’m struggling to make this work while deployed (and I’d like to keep local vs deployed consistent)
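This ImportError generally means the file was loaded as a standalone script rather than as a module inside its package. If that is what happens in the deployment (for example, code located by file path instead of by module or package name, which the Dagster CLI exposes as `-f` versus `-m` / `--package-name`), the standard-library sketch below reproduces both outcomes. The `foo/sales` layout mirrors the message; the file name `repo.py` and its contents are hypothetical.

```python
import importlib
import runpy
import sys
import tempfile
from pathlib import Path

# Build a throwaway package: foo/sales/assets.py and foo/sales/repo.py.
root = Path(tempfile.mkdtemp())
pkg = root / "foo" / "sales"
pkg.mkdir(parents=True)
(root / "foo" / "__init__.py").write_text("")
(pkg / "__init__.py").write_text("")
(pkg / "assets.py").write_text("lead_assets = ['leads']\n")
(pkg / "repo.py").write_text("from .assets import lead_assets\n")

# Loaded as a module inside its package, the relative import resolves.
sys.path.insert(0, str(root))
repo = importlib.import_module("foo.sales.repo")
print(repo.lead_assets)

# Loaded as a bare file, there is no parent package, so the import fails.
try:
    runpy.run_path(str(pkg / "repo.py"))
    file_load_error = None
except ImportError as exc:
    file_load_error = str(exc)
print(file_load_error)
```

Loading by module name keeps the dot notation working in both environments, provided the package root is importable where the code server runs.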
Dagster is deployed to K8s via Helm Chart. Much thanks!
Roei Jacobovich
11/29/2022, 8:17 AM
Failed status and return DagsterUserCodeUnreachableError as expected due to timeout. A manual reload will fix the issue. We also increased the timeout itself.
Is there a way to auto-reload the failed repositories?
Thanks!
Karn Saxena
11/29/2022, 10:40 AM
@op(out=DynamicOut())
def build_ops(reports):
    for report in reports:
        yield DynamicOutput(report_op_factory(report), report[TABLE_KEY])
----
def report_op_factory(report):
    @op(
        name=report[TABLE_KEY],
        required_resource_keys=DYNAMIC_OP_REQUIRED_RESOURCES,
        out=Out(io_manager_key=BQ_IO_MANAGER_KEY),
    )
    def inner_op(context: OpExecutionContext):
        """Op to download a report from smartly given a URL"""
        smartly: SmartlyReports = context.resources.smartly_reports
        return smartly.download_report(report[URL_KEY])

    return inner_op
Then in order to run all of them at the same time
run(ops.collect())
----
@op(required_resource_keys=DYNAMIC_OP_REQUIRED_RESOURCES)
def run(context, ops):
    for o in ops:
        o()
The issue I have is that the op call o() requires context but I'm not sure how to pass it.
I've tried using build_op_context
o(
    build_op_context(
        {
            BQ_IO_MANAGER_KEY: getattr(context.resources, BQ_IO_MANAGER_KEY),
            SMARTLY_REPORTS_RESOURCE: getattr(context.resources, SMARTLY_REPORTS_RESOURCE),
        }
    )
)
But it does not actually attach the IO manager I asked for, even though I specified it in the factory.
Has anybody faced the same issue?
Olivier Doisneau
11/29/2022, 1:42 PM
Manish Khatri
11/29/2022, 4:36 PM
Jeff
11/29/2022, 5:43 PM
IOManager or something) to determine which columns to partition the persisted delta table based on the Dagster partition. The problem I’m having is that I can’t find a way to encode the column name into the Dagster partition definition without creating my own implementation; I believe partition definitions (except for multi-dimension partitions) are anonymous, so for example I would like to have a partition with {date:2022-11-29} instead of just 2022-11-29.
Rafael Gomes
11/29/2022, 5:47 PM
MultiPartitionsDefinition? It is flagged as "Experimental"
Chris Roth
11/29/2022, 5:57 PM
map method with more than one dynamic output? That is, I'd like to map the summation of the squares and cubes below.
@op(out=DynamicOut(int))
def one_through_five():
    for number in [1, 2, 3, 4, 5]:
        yield DynamicOutput(number, mapping_key=f"number_{number}")

@op()
def square_the_number(number: int):
    return number * number

@op()
def cube_the_number(number: int):
    return number * number * number

@op()
def sum_squares_and_cubes(context, square: int, cube: int):
    context.log.info(f"square: {square}, cube: {cube}")
    return square + cube

@graph()
def process_sum_squares_and_cubes():
    numbers = one_through_five()
    squares = numbers.map(square_the_number)
    cubes = numbers.map(cube_the_number)
    # I want to map this
    total = sum_squares_and_cubes(squares, cubes)
Zach P
11/29/2022, 7:33 PM