Caleb Schoepp
02/16/2021, 10:41 PM
import dagster
import datetime


def run_config_for_date_partition(partition):
    date = partition.value
    return {"solids": {"query_telemetry_events": {"config": {"date": date}}}}


test_sessions_partition = dagster.PartitionSetDefinition(
    name="test_sessions_partition",
    pipeline_name="test_sessions",
    partition_fn=dagster.utils.partitions.date_partition_range(
        start=datetime.datetime(2021, 2, 16),
        delta_range="hours",
        inclusive=True,
        fmt="%Y-%m-%d-%H",
    ),
    run_config_fn_for_partition=run_config_for_date_partition,
)
When I try to run a backfill from the Dagit UI (v0.9.21), I get the following error:
2021-02-16T22:25:40.046372Z [error ] Exception calling application: Object of type Pendulum is not JSON serializable [grpc._server]
Traceback (most recent call last):
  File "/Users/caleb/Library/Caches/pypoetry/virtualenvs/panopticon-M-mcCUTC-py3.8/lib/python3.8/site-packages/grpc/_server.py", line 435, in _call_behavior
    response_or_iterator = behavior(argument, context)
  File "/Users/caleb/Library/Caches/pypoetry/virtualenvs/panopticon-M-mcCUTC-py3.8/lib/python3.8/site-packages/dagster/grpc/server.py", line 385, in ExternalPartitionSetExecutionParams
    serialized_external_partition_set_execution_param_data_or_external_partition_execution_error=serialize_dagster_namedtuple(
  File "/Users/caleb/Library/Caches/pypoetry/virtualenvs/panopticon-M-mcCUTC-py3.8/lib/python3.8/site-packages/dagster/serdes/__init__.py", line 227, in serialize_dagster_namedtuple
    return _serialize_dagster_namedtuple(
  File "/Users/caleb/Library/Caches/pypoetry/virtualenvs/panopticon-M-mcCUTC-py3.8/lib/python3.8/site-packages/dagster/serdes/__init__.py", line 213, in _serialize_dagster_namedtuple
    return seven.json.dumps(_pack_value(nt, whitelist_map), **json_kwargs)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Pendulum is not JSON serializable
Not sure how to go about fixing this because I could only get partitions working by using date_partition_range, but it seems to be returning the wrong data type. Any help is greatly appreciated!
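
A likely culprit, as a hedged read of the traceback: date_partition_range yields Partition objects whose value is a Pendulum datetime, and returning that object directly in the run config is what fails to serialize. A minimal workaround sketch, assuming partition.value is a datetime-like object:

# Sketch: format the partition's Pendulum datetime into a plain string so the
# run config contains only JSON-serializable values (format matches fmt above).
def run_config_for_date_partition(partition):
    date = partition.value.strftime("%Y-%m-%d-%H")
    return {"solids": {"query_telemetry_events": {"config": {"date": date}}}}
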
Basil V
02/16/2021, 11:47 PM
I'm using the dagster-aws library to read/write files to S3. It works locally; however, when running within Docker, it can't find my credentials. The specific error is botocore.exceptions.NoCredentialsError: Unable to locate credentials, and it seems to be coming from dagster_aws/s3/compute_log_manager.py when boto3 tries to instantiate the client.
I volume-mounted my ~/.aws directory (containing creds/config) into the Docker containers at /root using docker-compose, but Dagster/boto3 still isn't able to find the credentials. Does anyone know how to resolve this, or where the credentials should be stored so boto3 can find them? (Note: for the actual deployment we plan to use IAM, but for testing Docker locally I need to figure out how to pass the creds.) Thanks again!
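
Two hedged things to check: boto3 running as root in the container reads /root/.aws/credentials, so the mount target should likely be /root/.aws rather than /root; alternatively, it will pick up the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables. A quick probe to run inside the container to see what boto3 actually resolves:

# Minimal check: boto3 searches env vars first, then ~/.aws/credentials.
# Session.get_credentials() returns None when nothing is found.
import boto3

creds = boto3.Session().get_credentials()
print("found credentials" if creds else "no credentials found")
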
pdpark
02/17/2021, 7:09 PM

Peter B
02/17/2021, 11:30 PM

szalai1
02/18/2021, 2:58 PM
get_all_pipelines() on my repo and iterating over the registered pipelines and modifying the tags

user
02/18/2021, 10:15 PM

daniel
02/18/2021, 10:25 PM

Nick
02/19/2021, 12:14 PM

Tobias Ersted
02/19/2021, 1:15 PM

Marco
02/19/2021, 4:26 PM

Paul Wyatt
02/19/2021, 6:22 PM
/opt/dagster/dagster_home/storage/ on our box. Is there a recommended fix?

Paul Wyatt
02/19/2021, 6:22 PM

Klaus Stadler
02/19/2021, 9:05 PM
dagster.core.errors.DagsterUserCodeProcessError: dagster.core.errors.DagsterInvariantViolationError: Encountered error attempting to parse yaml. Loading YAMLs from package resources [('airline_demo.environments', 'local_base.yaml'), ('airline_demo.environments', 'local_fast_ingest.yaml')] on preset "local_fast".
What do I need to do to get it running?

Jai Kumaran
02/21/2021, 5:26 PM
SensorDefinition?

Jai Kumaran
02/21/2021, 5:29 PM

Jai Kumaran
02/22/2021, 12:08 PM
dagster-daemon errors out:
2021-02-22 17:35:18 - SensorDaemon - INFO - Checking for new runs for the following sensors: some_sensor
2021-02-22 17:35:20 - SensorDaemon - ERROR - Sensor failed for some_sensor : dagster.core.errors.DagsterUserCodeProcessError: FileNotFoundError: [Errno 2] No such file or directory: 'sensor_repo.py'
Stack Trace:
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 356, in ListRepositories
    self._repository_symbols_and_code_pointers.loadable_repository_symbols,
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 107, in loadable_repository_symbols
    self.load()
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 97, in load
    self._loadable_repository_symbols = load_loadable_repository_symbols(
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 121, in load_loadable_repository_symbols
    loadable_targets = get_loadable_targets(
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/utils.py", line 25, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "/usr/local/lib/python3.8/site-packages/dagster/cli/workspace/autodiscovery.py", line 11, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/code_pointer.py", line 87, in load_python_file
    os.stat(python_file)

Stack Trace:
  File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/sensor.py", line 130, in execute_sensor_iteration
    with origin.create_handle() as repo_location_handle:
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/origin.py", line 163, in create_handle
    return ManagedGrpcPythonEnvRepositoryLocationHandle(self)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 223, in __init__
    list_repositories_response = sync_list_repositories_grpc(self.client)
  File "/usr/local/lib/python3.8/site-packages/dagster/api/list_repositories.py", line 16, in sync_list_repositories_grpc
    raise DagsterUserCodeProcessError(
The sensor repo is:
from dagster import RunRequest, pipeline, repository, sensor, solid


@solid(config_schema={"filename": str})
def process_file(context):
    filename = context.solid_config["filename"]
    context.log.info(filename)


@pipeline
def log_file_pipeline():
    process_file()


@sensor(pipeline_name="log_file_pipeline")
def some_sensor():
    yield RunRequest(
        run_config={"solids": {"process_file": {"config": {"filename": "hi"}}}},
    )


@repository
def define_sensor_repository():
    repoDict = {}
    repoDict["sensors"] = {"some_sensor": some_sensor}
    repoDict["pipelines"] = {"log_file_pipeline": log_file_pipeline}
    return repoDict

Ben Torvaney
02/22/2021, 12:15 PM
-c flag (i.e. with a yaml file).
Is this expected? I'm struggling to see the point of the presets if I need to provide yaml as well, which suggests to me that I am failing to understand something.
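
For what it's worth, a preset can carry its run config inside the pipeline definition itself, so selecting it should not require a separate yaml file. A minimal sketch with illustrative names, assuming the ~0.10-era APIs:

# Hedged sketch: a preset bundled with the pipeline and selected by name,
# with no -c yaml file involved. Pipeline/solid names are made up.
from dagster import PresetDefinition, execute_pipeline, pipeline, solid


@solid
def say_hello(context):
    context.log.info("hello")


@pipeline(preset_defs=[PresetDefinition("local", run_config={})])
def my_pipeline():
    say_hello()


if __name__ == "__main__":
    execute_pipeline(my_pipeline, preset="local")
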
Jason
02/22/2021, 2:32 PM
A question about Assets and asset keys in the UI. I only recently realized that I could organize asset keys in a folder structure (by providing a list), which means I have a few asset keys that are not organized the way I'd like.
Outside of manually going into the db and deleting stuff, is there any way I can (visually) clean up/hide the asset keys I don't want? And if not, does anyone have a targeted SQL to delete keys?
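
One hedged possibility before reaching for raw SQL: some Dagster versions expose an asset wipe on the instance, which removes a key from the asset index. Whether it exists in your installed version is an assumption to verify; the key below is made up.

# Hypothetical sketch: wipe an unwanted asset key via the instance API
# (availability of wipe_assets depends on the Dagster version installed).
from dagster import AssetKey, DagsterInstance

instance = DagsterInstance.get()
instance.wipe_assets([AssetKey(["path", "to", "unwanted_key"])])
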
daniel
02/22/2021, 3:34 PM

Basil V
02/22/2021, 8:28 PM
dagster.check.CheckError: node not present in dictionary {'status': 'success', 'timing': [{'name': 'compile', 'started_at': '2021-02-22T20:19:51.445649Z', 'completed_at': '2021-02-22T20:19:51.606730Z'}, {'name': 'execute', 'started_at': '2021-02-22T20:19:51.675891Z', 'completed_at': '2021-02-22T20:19:55.108534Z'}], 'thread_id': 'Thread-4', 'execution_time': 4.140827655792236, 'message': 'SUCCESS 1', 'adapter_response': {'code': 'SUCCESS', 'rows_affected': 1}, 'unique_id': 'model.<MY PROFILE>.<TABLE NAME>'}
File "/usr/local/lib/python3.7/site-packages/dagster/core/errors.py", line 182, in user_code_error_boundary
yield
File "/usr/local/lib/python3.7/site-packages/dagster/utils/__init__.py", line 355, in iterate_with_context
next_output = next(iterator)
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 104, in execute_core_compute
for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 70, in _yield_compute_results
for event in user_event_sequence:
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/decorators/solid.py", line 230, in compute
yield from result
File "/usr/local/lib/python3.7/site-packages/dagster_dbt/cli/solids.py", line 195, in dbt_cli_run
cli_output = DbtCliOutput.from_dict(cli_output_dict)
File "/usr/local/lib/python3.7/site-packages/dagster_dbt/cli/types.py", line 83, in from_dict
result=DbtResult.from_dict(d),
File "/usr/local/lib/python3.7/site-packages/dagster_dbt/types.py", line 169, in from_dict
results = [NodeResult.from_dict(d) for d in check.is_list(d["results"], of_type=Dict)]
File "/usr/local/lib/python3.7/site-packages/dagster_dbt/types.py", line 169, in <listcomp>
results = [NodeResult.from_dict(d) for d in check.is_list(d["results"], of_type=Dict)]
File "/usr/local/lib/python3.7/site-packages/dagster_dbt/types.py", line 102, in from_dict
node = check.dict_elem(d, "node")
File "/usr/local/lib/python3.7/site-packages/dagster/check/__init__.py", line 784, in dict_elem
CheckError("{key} not present in dictionary {ddict}".format(key=key, ddict=ddict))
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
Thanks so much for any insights!Cameron Gallivan
02/22/2021, 8:40 PM
docker-compose.yml, but the new containers that it spins up only use the Dockerfile_pipelines instructions, and putting a MOUNT command in there won't actually attach it.

Matej Války
02/23/2021, 12:50 AM

Ben Torvaney
02/23/2021, 11:29 AM
resources.py, solids.py, and so on. Has anyone tried any different structures, perhaps splitting things up by domain or data source instead? Are there any reasons why the former approach would be considered best?
Thank you in advance 🙂

Nick
02/23/2021, 11:39 AM
exception: stopping dagster-daemon process since the following threads are no longer sending heartbeats: ['SCHEDULER']
When observing the terminal, it looks like each schedule takes 3 seconds to assess, and it seems to drop out around the same point. I have also noticed that in Dagit itself, the Daemon Status page shows the attached picture (basically "no recent heartbeat detected").
Is there a limit to the number of schedules I can have running? Can I extend the timeout somehow? Or maybe split the schedule into 2 so it copes better?

Marco
02/23/2021, 12:36 PM

Hamza Khurshid Butt
02/23/2021, 1:17 PM
My daily scheduler seems to be skipping my pipelines; basically it says:
No new runs for good_morning_schedule
I am using the exact code from the quick-start tutorial on the dagster website (https://docs.dagster.io/tutorial/advanced_scheduling), and when I schedule it to run at a specific time, it just skips my pipeline from execution.

Hamza Khurshid Butt
02/23/2021, 1:17 PM
import csv
from datetime import datetime, time

from dagster import daily_schedule, pipeline, repository, solid
from dagster.utils import file_relative_path


@solid
def hello_cereal(context, date):
    dataset_path = file_relative_path(__file__, "cereal.csv")
    context.log.info(dataset_path)
    with open(dataset_path, "r") as fd:
        cereals = [row for row in csv.DictReader(fd)]
    context.log.info(
        "Today is {date}. Found {n_cereals} cereals".format(
            date=date, n_cereals=len(cereals)
        )
    )


@daily_schedule(
    pipeline_name="hello_cereal_pipeline",
    start_date=datetime(2021, 2, 24),
    execution_time=time(6, 45),
)
def good_morning_schedule(date):
    return {
        "solids": {
            "hello_cereal": {
                "inputs": {"date": {"value": date.strftime("%Y-%m-%d")}}
            }
        }
    }


@pipeline
def hello_cereal_pipeline():
    hello_cereal()


@repository
def hello_cereal_repository():
    return [hello_cereal_pipeline, good_morning_schedule]
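
A hedged observation on the skips: @daily_schedule only produces ticks for days after start_date, and each tick receives the previous day's date, so a start_date of today or later can leave the first ticks with nothing to run. A sketch with the start date moved safely into the past (the date is illustrative):

# Hypothetical tweak: move start_date back so the schedule has days to cover.
from datetime import datetime, time

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="hello_cereal_pipeline",
    start_date=datetime(2021, 2, 1),  # in the past, so ticks have days to cover
    execution_time=time(6, 45),
)
def good_morning_schedule(date):
    return {
        "solids": {
            "hello_cereal": {
                "inputs": {"date": {"value": date.strftime("%Y-%m-%d")}}
            }
        }
    }
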
jonathan
02/23/2021, 5:15 PM

bklau-zap
02/23/2021, 8:59 PM

Marco
02/23/2021, 9:21 PM