Andras Somi
01/22/2023, 2:19 PM
I'm trying to write an io_manager that saves the output of a partitioned asset and behaves nicely if a non-partitioned downstream asset asks for the output (it should return the latest available partition), but I got a bit lost on what to look for in the InputContext object. I tried to decipher the relevant methods on UPathIOManager but couldn't really grasp what's going on with the partition keys (the comments are confusing to me, and I even sense some typos there, but cannot be sure…). How should I decide whether the downstream asset is partitioned, so load_input should return the specific partition, or non-partitioned, so load_input should return the latest available one?
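A minimal sketch of the branching being asked about, assuming lexically sortable (e.g. date) partition keys and one stored object per partition; the storage helpers _write, _read, _path_for and _list_partition_paths are hypothetical placeholders:

from dagster import IOManager, InputContext, OutputContext, io_manager

class LatestPartitionFallbackIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj):
        # One object per partition when the upstream asset is partitioned.
        key = context.asset_partition_key if context.has_asset_partitions else None
        self._write(self._path_for(context.asset_key, key), obj)

    def load_input(self, context: InputContext):
        if context.has_asset_partitions:
            # The input maps to specific upstream partition(s); if several are
            # requested, this sketch simply loads the newest of them.
            return self._read(self._path_for(context.asset_key, max(context.asset_partition_keys)))
        # No partition information on the input: fall back to the latest
        # partition that has actually been stored.
        return self._read(max(self._list_partition_paths(context.asset_key)))

@io_manager
def latest_partition_fallback_io_manager(_):
    return LatestPartitionFallbackIOManager()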
Jake Kagan
01/22/2023, 7:21 PM
@op(
    required_resource_keys={"rsrc_bigquery_runner"},
    config_schema={
        'qry_str': str,
        'filter_func': Field(Any, default_value=None),  # this function is meant to allow further play on the dataframe
    },
)
def op_bigq_adjust_df_results(context: OpExecutionContext) -> pl.DataFrame:
    qry_str = context.op_config['qry_str']
    filter_func = context.op_config['filter_func']
    df: pl.DataFrame = context.resources.rsrc_bigquery_runner(qry_str).bigquery_to_df()
    if filter_func is None:
        return df
    else:
        filtered_df = filter_func(df)
        get_dagster_logger().info(
            f'ADJUSTED DATAFRAME SHAPE: {filtered_df.shape}, columns: {filtered_df.shape[1]}, rows: {filtered_df.shape[0]}')
        return filtered_df
Then I want to be able to use this op in its configured form either at the top of a job (no dependency) or somewhere else (with a dependency on other ops).
So here would be the initial configuration of the op, which leaves it open for further configuration:
@configured(configurable=op_bigq_adjust_df_results, config_schema={"query": str, "placeholder": str})
def checking_configured(context, SOME_UPSTREAM_OP):  # SOME_UPSTREAM_OP would provide me with a replacement string for dynamic SQL
    replacement = SOME_UPSTREAM_OP
    query = context.op_config['query'].replace(context.op_config['placeholder'], replacement)
    return {"qry_str": query, "qry_type": "DQL"}  # this would then be passed to the above base op (op_bigq_adjust_df_results)
And then I would have a third op where I pass the actual values to the above configured op:
x = configured(configurable=checking_configured, name='x')(
    {
        "query": QRY_BIGQ,
        "placeholder": '$placeholder$',
    }
)
I already have a BigQuery resource, which has all sorts of methods, but I'd like to see stuff on the Dagit chart. And I'd like to turn some configured ops into assets.
THANK YOU!
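For reference, a sketch of the documented function-style @configured mapping, which receives only the outer config (no context and no upstream op outputs) and returns config for the base op; the "replacement" field here is a hypothetical stand-in for the value that would otherwise come from an upstream op:

from dagster import configured

@configured(op_bigq_adjust_df_results, config_schema={"query": str, "placeholder": str, "replacement": str})
def bigq_query_with_replacement(config):
    # Build the final SQL purely from config values; config mapping functions
    # have no access to op inputs.
    query = config["query"].replace(config["placeholder"], config["replacement"])
    return {"qry_str": query}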
Jake Kagan
01/22/2023, 7:23 PM
Dinis Rodrigues
01/22/2023, 9:39 PM
I'm trying to use execute_job to test my jobs. I'm using the Databricks step launcher, and it requires a reconstructable pipeline, so I'm unable to use job.execute_in_process.
From the docs I'm doing:
instance = DagsterInstance.get()
execute_job(reconstructable(my_job), run_config=config, instance=instance)
But I get this error:
dagster._check.CheckError: Failure condition: Unexpected return value from child process <class 'collections.ChildProcessStartEvent'>
Stack Trace:
File "/opt/conda/envs/dagster_env/lib/python3.9/site-packages/dagster/_core/execution/api.py", line 991, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
File "/opt/conda/envs/dagster_env/lib/python3.9/site-packages/dagster/_core/executor/multiprocess.py", line 240, in execute
event_or_none = next(step_iter)
File "/opt/conda/envs/dagster_env/lib/python3.9/site-packages/dagster/_core/executor/multiprocess.py", line 364, in execute_step_out_of_process
check.failed("Unexpected return value from child process {}".format(type(ret)))
File "/opt/conda/envs/dagster_env/lib/python3.9/site-packages/dagster/_check/__init__.py", line 1687, in failed
raise CheckError(f"Failure condition: {desc}")
Am I missing something?
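Not necessarily the cause of the error, but for reference the documented pattern for testing with execute_job wraps it in instance_for_test, which gives out-of-process executors a persistent instance; my_job and config are placeholders for your own job and run config:

from dagster import execute_job, instance_for_test, reconstructable

def test_my_job():
    with instance_for_test() as instance:
        result = execute_job(reconstructable(my_job), run_config=config, instance=instance)
        assert result.success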
Gabe Schine
01/23/2023, 5:23 AM
A question about how output_required=False interacts with a FreshnessPolicy and the asset_reconciliation_sensor: if the asset job is written to only materialize (yield an Output) when some source data on the internet changes, but the FreshnessPolicy is set to "1 hour", what happens when the last materialization was 10 hours ago but the source data hasn't changed yet?
Manuel Kollegger
01/23/2023, 5:50 AM
I'm getting the error "Java gateway process exited before sending its port number".
It works locally, but not on Kubernetes, and I am out of ideas. I wanted to try https://github.com/dagster-io/dagster/issues/2748, but all links are broken.
Mycchaka Kleinbort
01/23/2023, 9:39 AM
Jonathan Wears
01/23/2023, 10:40 AM
Traceback (most recent call last):
File "/usr/local/bin/dagster", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.10/site-packages/dagster/_cli/__init__.py", line 48, in main
cli(auto_envvar_prefix=ENV_PREFIX) # pylint:disable=E1123
File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1055, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.10/site-packages/click/core.py", line 760, in invoke
return __callback(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/dagster/_cli/api.py", line 61, in execute_run_command
DagsterInstance.from_ref(args.instance_ref)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/instance/__init__.py", line 472, in from_ref
run_coordinator=instance_ref.run_coordinator,
File "/usr/local/lib/python3.10/site-packages/dagster/_core/instance/ref.py", line 437, in run_coordinator
return self.run_coordinator_data.rehydrate() if self.run_coordinator_data else None
File "/usr/local/lib/python3.10/site-packages/dagster/_serdes/config_class.py", line 93, in rehydrate
raise DagsterInvalidConfigError(
dagster._core.errors.DagsterInvalidConfigError: Errors whilst loading configuration for {'max_concurrent_runs': Field(<dagster._config.source.IntSourceType object at 0x7effa69b4dc0>, default=@, is_required=False), 'tag_concurrency_limits': Field(<dagster._config.config_type.Noneable object at 0x7effa02f4370>, default=@, is_required=False), 'dequeue_interval_seconds': Field(<dagster._config.source.IntSourceType object at 0x7effa69b4dc0>, default=@, is_required=False)}.
Error 1: Received unexpected config entries "['dequeue_num_workers', 'dequeue_use_threads']" at the root. Expected: "['dequeue_interval_seconds', 'max_concurrent_runs', 'tag_concurrency_limits']."
I am confused by this, as I do not specify the "unexpected" config entries anywhere in my code. So I was wondering if anyone has seen and dealt with a similar error before, and what they did to fix it?
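One plausible culprit (not confirmed from the trace alone): dequeue_num_workers and dequeue_use_threads are QueuedRunCoordinator options that only exist in newer Dagster releases, so an instance config written by a newer version (for example a newer Helm chart) being read by an older dagster library would produce exactly this mismatch. For comparison, a run_coordinator block in dagster.yaml using only the keys the error message expects:

run_coordinator:
  module: dagster.core.run_coordinator   # or as already set in your dagster.yaml / Helm values
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    dequeue_interval_seconds: 30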
Chris Boyles
01/23/2023, 10:57 AM
Gerben van der Huizen
01/23/2023, 12:07 PM
Akshay Verma
01/23/2023, 12:23 PM
Daniel Galea
01/23/2023, 1:31 PM
The image parameter is not optional, so this leads me to believe that the Pod will only have one container based on that image, but then I also see that I can specify pod_spec_config, which tells me that I can define a Pod with multiple containers, like:
apiVersion: batch/v1
kind: Job
metadata:
  name: hello
spec:
  template:
    # This is the pod template
    spec:
      containers:
        - name: hello
          image: busybox:1.28
          command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600']
        - name: world
          .....
      restartPolicy: OnFailure
      # The pod template ends here
but then I'm not sure where the image parameter fits in, since each container has its image specified in the Pod Specification. So which is the case? Are there any examples of this?
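Assuming this is about dagster_k8s.k8s_job_op, a sketch that just puts the two config fields from the question side by side; whether and how extra containers given in pod_spec_config are combined with the container built from image is exactly the open question, so no merge behavior is implied here:

from dagster_k8s import k8s_job_op

hello_op = k8s_job_op.configured(
    {
        "image": "busybox:1.28",  # the container dagster builds for the op
        "command": ["sh", "-c"],
        "args": ['echo "Hello, Kubernetes!" && sleep 3600'],
        "pod_spec_config": {
            # raw PodSpec fields, e.g. an extra sidecar container
            "containers": [{"name": "world", "image": "busybox:1.28"}],
        },
    },
    name="hello_op",
)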
Sean Davis
01/23/2023, 3:33 PM
Dinis Rodrigues
01/23/2023, 3:49 PM
Combiz Khozoie
01/23/2023, 5:25 PM
@op(out={"model": Out(), "metrics": Out()})
def train(dm):
    res = sc.run_training(dm)
    model = res.get_model("logreg")
    metrics = res.get_model_metrics("logreg")
    return (
        Output(model, output_name="model"),
        Output(str(metrics), output_name="metrics"),
    )
Derek Truong
01/23/2023, 5:34 PM
Will Holley
01/23/2023, 8:53 PM
Jordan
01/23/2023, 10:34 PM
MikeVL
01/23/2023, 11:15 PM
Operation name: SingleSensorQuery
Message: Invalid version: '8.0.31-google'
Path: ["sensorOrError","sensorState","runs"]
Locations: [{"line":28,"column":9}
Spencer Nelson
01/24/2023, 1:02 AM
With @resource, I can use @contextmanager and things just work; is that true of @io_manager as well?
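For reference, the context-manager pattern being referenced on the resource side, as shown in the Dagster docs (create_connection is a hypothetical helper); whether the same works under @io_manager is the open question:

from contextlib import contextmanager
from dagster import resource

@resource
@contextmanager
def db_connection(init_context):
    conn = create_connection()  # hypothetical setup
    try:
        yield conn
    finally:
        conn.close()  # teardown runs after the step that used the resource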
Spencer Nelson
01/24/2023, 1:33 AM
How is the @asset decorator's io_manager_key parameter supposed to work? I can't seem to get it to recognize a custom IOManager even with a very minimal case (code in thread).
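A minimal wiring sketch, assuming Definitions-based code: the string passed to io_manager_key has to match a key in the resources dict.

from dagster import Definitions, IOManager, asset, io_manager

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        print(f"storing {context.asset_key}")

    def load_input(self, context):
        return 1

@io_manager
def my_io_manager(_):
    return MyIOManager()

@asset(io_manager_key="my_io_manager")
def my_asset():
    return 1

defs = Definitions(assets=[my_asset], resources={"my_io_manager": my_io_manager})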
CJ
01/24/2023, 3:14 AM
Abhishek Agrawal
01/24/2023, 6:51 AM
Marcel Coetzee
01/24/2023, 9:11 AM
Marcel Coetzee
01/24/2023, 9:11 AM
Klaus Stadler
01/24/2023, 1:36 PM
Jake Kagan
01/24/2023, 2:36 PM
x = configured(reusable_op, name='x')({"whatever_is_configured_in_the_reusable_op": 'some_arg'})
How can I set an op above this one without changing the reusable op?
Jake Kagan
01/24/2023, 2:42 PM
Juan Diego Castrillon
01/24/2023, 3:11 PM
Arnas Ambrasas
01/24/2023, 3:32 PM
...
python_logs:
  python_log_level: INFO
  dagster_handler_config:
    handlers:
      dagster:
        class: logging.StreamHandler
        level: INFO
        stream: ext://sys.stdout
        formatter: myFormatter
    formatters:
      myFormatter:
        format: "%(message)s"
        class: pythonjsonlogger.jsonlogger.JsonFormatter
...
Console logs:
...
{"message": "single_pod_job - bc6aee2e-79bc-4225-9d09-8790a2da5647 - multiply_the_word - Multiplying", "dagster_meta": {...}
2023-01-24 15:22:24 +0000 - dagster - INFO - single_pod_job - bc6aee2e-79bc-4225-9d09-8790a2da5647 - multiply_the_word - Multiplying
...