Vlad Efanov
02/13/2022, 6:17 AM

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 3
    tag_concurrency_limits:
      - key: "exe_file"
        limit: 2
      - key: "python_script"
        limit: 2
I want to be able to change the limit for the "exe_file"/"python_script" tags, or max_concurrent_runs, while Dagster is running. Is it possible?
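For reference, a minimal sketch of how those limit keys line up with run tags (the job, op, and tag value below are made up):

from dagster import job, op


@op
def convert_video():
    ...


# Runs of this job carry the "exe_file" tag, so the QueuedRunCoordinator
# counts them against the "exe_file" concurrency limit configured above.
@job(tags={"exe_file": "mp_exe"})
def process_exe_file():
    convert_video()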
Roei Jacobovich
02/13/2022, 9:51 AM

Sundara Moorthy
02/14/2022, 8:31 AM

Frank Lu
02/14/2022, 9:29 AM

We have two packages, basic_ml and taco, in a repository. Is there a way to specify both packages in the `dagsterApiGrpcArgs`?
dagster-user-deployments:
  enabled: true
  deployments:
    - name: "fow-dagster-ds"
      image:
        repository: "artifactory.flexport.io/flexport-docker-sandbox-local/fow/dagster/ds"
        tag: "0.1.0"
        pullPolicy: Always
      dagsterApiGrpcArgs:
        - "--package-name"
        - "basic_ml"
      port: 3030
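One possible direction, as a rough sketch (untested, and the module paths and job names are assumptions): point a single --package-name at a small wrapper package whose repository imports definitions from both packages.

# combined_repo/__init__.py -- hypothetical wrapper package for one --package-name entry
from dagster import repository

from basic_ml.jobs import basic_ml_job  # assumed import path
from taco.jobs import taco_job          # assumed import path


@repository
def combined_repository():
    # A single repository serving definitions from both packages on one gRPC server.
    return [basic_ml_job, taco_job]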
Mark
02/14/2022, 10:34 AM

job A in run_status_sensor, how can I do that? Thanks!

Alex Molotsky
02/14/2022, 3:50 PM

Silvio Domingos
02/14/2022, 5:22 PM

daniel blinick
02/14/2022, 6:42 PM

We have a run_status_sensor. Our sensor is triggered when job A completes, and its purpose is to kick off job B. To achieve that, we yield a RunRequest object. However, it seems - from looking at the logs and the Dagster code - that what is actually being yielded, somehow, is a PipelineRunReaction. I'm confused as to what this is, as I can't find much documentation about it in the code. It seems as though part of the code (e.g. the wrap_sensor_evaluation function in sensor.py) validates that the only values being returned are either RunRequest or SkipReason, but from other parts of the code it seems that PipelineRunReaction is valid (e.g. the evaluate_tick function in sensor.py).
Can you shed some light on what might be happening here?
Thanks a lot!
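For reference, the pattern being described looks roughly like this against a recent Dagster API (parameters such as monitored_jobs/request_job vary by version, and job_a/job_b are placeholders):

from dagster import DagsterRunStatus, RunRequest, run_status_sensor

from my_repo.jobs import job_a, job_b  # placeholder import


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    monitored_jobs=[job_a],
    request_job=job_b,
)
def kick_off_job_b(context):
    # Returning a RunRequest asks the daemon to launch job_b.
    return RunRequest(run_key=None, run_config={})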
Vlad Efanov
02/14/2022, 9:52 PM

[CONSTANTS]
script_path = ..\..\scripts\mp.py
exe_file_path = ..\..\scripts\mp.exe
new_videos_directory_path = ..\..\video_files\new
in_process_videos_directory_path = ..\..\video_files\in_process
finished_videos_directory_path = ..\..\video_files\finished
Config class:

import os
from configparser import ConfigParser


class Singleton(type):
    # Metaclass that returns the same instance for every instantiation.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class MsaConfig(metaclass=Singleton):
    config_object = ConfigParser()
    default_group_name = "CONSTANTS"
    default_configuration_file_path = f'{os.path.dirname(__file__)}/MSA_orchestrator_config.ini'
    print(f'Constructor config_object is: {config_object}')

    def configuration_extractor(self, group_name=default_group_name,
                                configuration_file_path=default_configuration_file_path):
        # Load the INI file and return the requested section.
        self.config_object.read(configuration_file_path)
        if group_name not in self.config_object:
            # Raising a string is invalid in Python 3; raise an exception instead.
            raise ValueError("Default group name is not provided")
        return self.config_object[group_name]
Code in the sensor that implements the class:
msa_config_object = MsaConfig()
constants = msa_config_object.configuration_extractor()
Vlad Efanov
02/15/2022, 12:26 AM

python_logs:
  dagster_handler_config:
    handlers:
      myFileHandler:
        class: logging.FileHandler
        level: INFO
        filename: 'logs.txt'
        mode: 'a'
        formatter: myFormatter
      myConsoleHandler:
        class: logging.StreamHandler
        level: INFO
I have 2 questions:
1. How can I add a timestamp to the filename when creating a log file? I want this format: logs_yyyy-mm-ddThh_mm_ss.SSS
2. I want to set a maximum log file size. For example, when the log file reaches 30MB, the logger should start writing logs to a new log file. How should I do it?
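One possible direction for both questions, as a sketch only: a custom handler class that dagster_handler_config could reference instead of logging.FileHandler. The module/class names are made up, and whether your Dagster version passes these constructor kwargs through is an assumption.

# timestamped_handler.py (hypothetical module)
import datetime
from logging.handlers import RotatingFileHandler


class TimestampedRotatingFileHandler(RotatingFileHandler):
    # Writes to logs_YYYY-MM-DDTHH_MM_SS.mmm.txt and rolls over once max_bytes is reached.
    def __init__(self, directory=".", max_bytes=30 * 1024 * 1024, backup_count=5):
        stamp = datetime.datetime.now().strftime("%Y-%m-%dT%H_%M_%S.%f")[:-3]
        super().__init__(
            f"{directory}/logs_{stamp}.txt",
            maxBytes=max_bytes,
            backupCount=backup_count,
        )

It would then be referenced as class: timestamped_handler.TimestampedRotatingFileHandler in the handler config above.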
Bryan Chavez
02/15/2022, 2:46 AM

Dylan Hunt
02/15/2022, 5:18 AM

LP
02/15/2022, 5:56 AM

run has too many event log entries.

Isy
02/15/2022, 10:24 AM

job_name.execute_in_process(run_config=run_config)
Sundara Moorthy
02/15/2022, 12:48 PM

emr_step_def = self._get_emr_step_def(run_id, step_key, step_context.solid.name)

It is submitting the Spark jobs for each solid. How is it segregating the solid code from the whole file to run as a separate job? For example:
from dagster import graph, op
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


@op(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@op(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


@graph
def make_and_filter_data():
    filter_over_50(make_people())
In this code, assuming we are using the EMR step launcher, how are filter_over_50 and make_people segregated and executed as separate jobs?

Jonas De Beukelaer
02/15/2022, 2:30 PM

Can I have more than one dagster-user-deployment for a single dagster deployment? Basically I would have two different teams with their own user deployments, and I'm wondering if I can have them manage their own code separately. I just tried it, but it seems the second user deployment wasn't being picked up.

Ginger Balmat
02/15/2022, 3:08 PM

I have a question about AssetMaterialization events. I have an op which yields this event:
yield AssetMaterialization(
    asset_key="something_important",
    metadata={
        "path": EventMetadata.text("path/to/something/important")
    }
)
I have a job in a completely different repository listening for this event, which relies on that value for path in the metadata. I've found that I can access the metadata in my asset_sensor function using this:
asset_event.dagster_event.event_specific_data.materialization.metadata_entries[0].entry_data.text
This seems a bit convoluted though. Is there a better way to access AssetMaterialization metadata so that it can be passed on to future jobs?
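A slightly less index-dependent variant of the same lookup, as a sketch using the event structure already shown above:

# inside the asset_sensor, keyed by metadata entry label instead of list position
materialization = asset_event.dagster_event.event_specific_data.materialization
entries = {entry.label: entry.entry_data for entry in materialization.metadata_entries}
path = entries["path"].text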
Albert Hsiung
02/15/2022, 3:13 PM

Auster Cid
02/15/2022, 4:13 PM

We're upgrading from 0.12.15 to 0.13.18 and we're running into some problems. The scheduler daemon fails to connect to the grpc server, and opening the schedules page results in the following graphql error:

dixel
02/15/2022, 4:36 PM

It seems I can't use download.execute_in_process(resources={"api": ResourceDefinition.mock_resource()}), as the execute_in_process method doesn't have a resources or resource_defs parameter (https://docs.dagster.io/_apidocs/jobs#dagster.JobDefinition.execute_in_process). I found some workarounds (converting all the jobs to graphs and passing resource_defs in there), but I'm wondering if resources is still expected to be part of execute_in_process.
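For reference, a sketch of the graph-based workaround mentioned above (the graph, op, and resource names are illustrative); GraphDefinition.execute_in_process does accept a resources argument in recent versions:

from dagster import ResourceDefinition, graph, op


@op(required_resource_keys={"api"})
def download_op(context):
    return context.resources.api.fetch()  # hypothetical resource method


@graph
def download():
    download_op()


# Resources can be supplied directly when executing the graph in process.
result = download.execute_in_process(
    resources={"api": ResourceDefinition.mock_resource()}
)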
Zach
02/15/2022, 6:57 PM

Kevin Haynes
02/15/2022, 11:01 PM

I'm using slack_on_failure and slack_on_success, which I thought would tell me when the job succeeds or fails, but it fires the Slack message when the individual ops succeed or fail. Is there a way to configure it so the hook is "at the job level"?
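For the failure side, one run-level alternative to an op-level hook is a sketch like this, assuming dagster_slack's make_slack_on_run_failure_sensor is available in your version (channel and env var names are made up):

import os

from dagster_slack import make_slack_on_run_failure_sensor

# Fires once per failed run instead of once per failed op.
slack_on_run_failure = make_slack_on_run_failure_sensor(
    channel="#dagster-alerts",                  # illustrative channel
    slack_token=os.environ["SLACK_BOT_TOKEN"],  # illustrative env var name
)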
Charlie Bini
02/15/2022, 11:36 PM

Saurav Mittal
02/16/2022, 2:16 AM

I'm using the execute_pipeline() API to trigger a pipeline. I want to have sensors in my pipeline as well, but the params of build_reconstructable_pipeline() do not take a SensorDefinition as input anywhere; the *args and **kwargs which are passed are used for the PipelineDefinition.
I was not sure why the module, function, and working directory name are being taken as input for build_reconstructable_pipeline(), so I inspected the source code of the function and found the lines below, which show that an ephemeral repository is being created (side question 1: what is an ephemeral instance or ephemeral repo?). It looks like they are being used to create a ReconstructableRepository.
def build_reconstructable_target(
    reconstructor_module_name,
    reconstructor_function_name,
    reconstructable_args=None,
    reconstructable_kwargs=None,
    reconstructor_working_directory=None,
)
How do I add a SensorDefinition into pipeline execution? (Side question 2: What is the difference between a job, a graph, and a pipeline?)
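For context, sensors are normally attached to a repository that the sensor daemon loads, rather than being passed into execute_pipeline(); a minimal sketch with made-up names, shown with the newer job/op APIs:

from dagster import RunRequest, job, op, repository, sensor


@op
def do_work():
    ...


@job
def my_job():
    do_work()


@sensor(job=my_job)
def my_sensor(context):
    yield RunRequest(run_key=None, run_config={})


@repository
def my_repository():
    # The daemon evaluates sensors from repositories it loads,
    # independently of any execute_pipeline() call.
    return [my_job, my_sensor]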
Bennett Norman
02/16/2022, 4:20 AM

Nitin Madhavan
02/16/2022, 5:37 AM

Raphaël Riel
02/16/2022, 3:00 PM

load_env call before Dagster boots.

Dominick Giordano
02/16/2022, 3:28 PM

I have several @graph definitions with multiple @op for each.
I would like to be able to run some of these entire graphs at a certain time (once a day / once a week) and some of them based on an outlying trigger (file upload etc.).
If anyone with experience configuring a schedule could help, that would be greatly appreciated, thanks!
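A minimal sketch of that split, with made-up graph names, cron expression, and watch directory: each graph becomes a job, one attached to a ScheduleDefinition and one to a sensor that yields RunRequests.

import os

from dagster import RunRequest, ScheduleDefinition, graph, op, repository, sensor


@op
def do_step():
    ...


@graph
def daily_graph():
    do_step()


@graph
def upload_graph():
    do_step()


daily_job = daily_graph.to_job()
upload_job = upload_graph.to_job()

# Run the first graph every day at 06:00.
daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule="0 6 * * *")


# Launch the second graph whenever a new file appears in a watched directory.
@sensor(job=upload_job)
def upload_sensor(context):
    for filename in os.listdir("/watched/uploads"):  # illustrative path
        yield RunRequest(run_key=filename, run_config={})


@repository
def scheduled_and_triggered():
    return [daily_job, upload_job, daily_schedule, upload_sensor]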
David Serédi
02/16/2022, 3:51 PM

Dylan Hunt
02/16/2022, 4:11 PM

Is there a way to run non-dependent jobs in parallel with execute_pipeline? I tried reconstructable with multiprocessing already, but it spawns a process for each step/job instead of only for the non-dependent jobs. And since it uses the filesystem IO manager to read/write step outputs, it also adds latency and ends up 10x slower than sequential execution.