Rubén Lopez Lozoya
02/24/2021, 8:52 AM
Traceback (most recent call last):
File "/usr/local/bin/dagster", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/site-packages/dagster/cli/__init__.py", line 41, in main
cli(auto_envvar_prefix=ENV_PREFIX) # pylint:disable=E1123
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dagster/cli/api.py", line 86, in execute_run_with_structured_logs_command
DagsterInstance.from_ref(args.instance_ref) if args.instance_ref else DagsterInstance.get()
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/__init__.py", line 311, in from_ref
run_storage=instance_ref.run_storage,
File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 196, in run_storage
return self.run_storage_data.rehydrate()
File "/usr/local/lib/python3.8/site-packages/dagster/serdes/__init__.py", line 342, in rehydrate
module = importlib.import_module(self.module_name)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/lib/python3.8/site-packages/dagster_postgres/__init__.py", line 4, in <module>
from .run_storage import PostgresRunStorage
File "/usr/local/lib/python3.8/site-packages/dagster_postgres/run_storage/__init__.py", line 1, in <module>
from .run_storage import PostgresRunStorage
File "/usr/local/lib/python3.8/site-packages/dagster_postgres/run_storage/run_storage.py", line 3, in <module>
from dagster.core.storage.runs import DaemonHeartbeatsTable, RunStorageSqlMetadata, SqlRunStorage
ImportError: cannot import name 'DaemonHeartbeatsTable' from 'dagster.core.storage.runs' (/usr/local/lib/python3.8/site-packages/dagster/core/storage/runs/__init__.py)
I've followed the deploy docker example, and the only difference is the docker_example pipelines Dockerfile (where I use my own app). All the containers are up and running, but the error arises when a new container is spawned after trying to run a pipeline. Does anybody know why this happens?
Edit: the error log comes from the spawned container (which stops immediately).
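This ImportError usually points at a version mismatch inside the spawned container: DaemonHeartbeatsTable only exists in newer dagster releases, so a dagster_postgres package newer than the dagster core package in the image cannot import it. A minimal sketch of keeping the packages in lockstep in the pipeline image; the base image and version number are illustrative, not taken from the thread:
# Dockerfile sketch: pin dagster and its libraries to one release so the
# container spawned for a run matches the other services in the deployment.
FROM python:3.8-slim
RUN pip install \
    dagster==0.10.4 \
    dagster-postgres==0.10.4 \
    dagster-docker==0.10.4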
Hamza Khurshid Butt
02/24/2021, 10:26 AM
Marco
02/24/2021, 11:42 AM
Melle Minderhoud
02/24/2021, 12:07 PM
Marco
02/24/2021, 1:16 PM
raaid
02/24/2021, 2:10 PM
dhume
02/24/2021, 3:29 PM
Prratek Ramchandani
02/24/2021, 8:24 PM
I'm using dagster_gcp and had a question - I'd like to avoid typing out the entire schema in a YAML file, and it seems my best bet is to use the configured API and write Python to load the schema from a JSON file. Is there any way I can instead specify the path to the file in a YAML file and write a custom type with a corresponding type loader to pass the schema to the solid? My concern then is that the solid config expects a schema of type Array(inner_type=dict), but this would then be some other custom type. Also, are custom types even supported for config, or is that only for inputs?
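A sketch of the configured-API route mentioned above; load_table, its config fields, and the helper are illustrative stand-ins for the dagster_gcp solid, not its actual definition:
import json

from dagster import Array, configured, solid

# Illustrative stand-in for the dagster_gcp solid whose config expects
# the table schema as an array of dicts.
@solid(config_schema={"schema": Array(inner_type=dict), "destination": str})
def load_table(context):
    context.log.info(f"loading {context.solid_config['destination']}")

def load_table_with_schema(json_path, destination):
    # Read the schema in Python at definition time and bake it into the
    # solid's config, so the YAML run config no longer spells it out.
    with open(json_path) as f:
        schema = json.load(f)
    return configured(load_table, name=f"load_{destination}")(
        {"schema": schema, "destination": destination}
    )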
sk4la
02/24/2021, 8:29 PM
I have a question about DynamicOutput assignment, and I find the example in the documentation a bit vague in this regard.
Quick example from the documentation:
@pipeline
def process_directory():
    files_in_directory().map(process_file)
How would one pass the DynamicOutput from files_in_directory as a specific keyword parameter to process_file, as in this:
@pipeline
def process_directory():
    files_in_directory().map(process_file(file_path=mapped_value, other_arg="fixed value"))
Does the mapping rely on the name given to the DynamicOutput? Is something like this even possible at the moment?
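.map accepts any callable, so one way to route the mapped value to a specific keyword parameter is a lambda; a sketch under that assumption (a fixed value like other_arg would normally come from the solid's config rather than the invocation):
@pipeline
def process_directory():
    # Each dynamic output value is handed to the callable, which forwards
    # it to the chosen keyword parameter of process_file. The mapping
    # follows the data dependency, not the DynamicOutput's name.
    files_in_directory().map(lambda file_path: process_file(file_path=file_path))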
user
02/25/2021, 9:44 PM
sandy
02/26/2021, 1:10 AM
Sasha Gorelikov
02/26/2021, 10:39 AM
Sasha Gorelikov
02/26/2021, 10:40 AM
if __name__ == "__main__":
    execute_pipeline(
        pipe_line_process_data_exch,
        mode="some_mode",
        run_config={"loggers": {"my_logger": {}}},
    )
but for a sensor I didn't find any example.
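A sketch of the sensor equivalent, assuming the 0.10-era sensor API: the run_config on the yielded RunRequest plays the same role as the run_config argument to execute_pipeline (the sensor name is illustrative):
from dagster import RunRequest, sensor

@sensor(pipeline_name="pipe_line_process_data_exch", mode="some_mode")
def my_logger_sensor(context):
    yield RunRequest(
        run_key=None,  # None requests a new run on every evaluation
        run_config={"loggers": {"my_logger": {}}},
    )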
gazpot
02/26/2021, 1:19 PM
I'm having trouble with the run_config passing to execute_pipeline. The error I am getting is:
raise DagsterInvalidConfigError(
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline run_pipeline
Error 1: Missing required config entry "solids" at the root.
The problem appears to be that the config being passed to the input_code function resolves as NoneType. I've read elsewhere that malformed returns cause the above error, so it may be that context.solid_config is the cause; however, this is a fairly minimal example, modified from https://docs.dagster.io/tutorial/basics_solids. Any idea what I am doing wrong here?
from dagster import (
    execute_pipeline,
    pipeline,
    solid,
    String,
)

@solid
def market_iterator(context, market_now_is):
    market_mod = market_now_is + '_modified'
    context.log.info(
        f"Processing {market_mod}"
    )
    return market_mod

@solid(config_schema={"market_id": String})
def input_code(context):
    market_now_is = context.solid_config["market_id"]
    return market_now_is

@pipeline
def run_pipeline():
    market_mod = market_iterator(input_code())

config_dict = {
    "solids": {
        "input_code": {
            "config": {"market_id": "DATA"}}
    }
}

def batch_markets():
    execute_pipeline(run_pipeline, run_config=config_dict)

if __name__ == "__main__":
    batch_markets()
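For reference, the snippet runs as posted when batch_markets() is invoked; the exact error above is what Dagster raises when execute_pipeline is called without run_config, because input_code declares a required config_schema. A quick way to check (a guess at the cause, not confirmed in the thread):
# Raises DagsterInvalidConfigError: Missing required config entry "solids".
execute_pipeline(run_pipeline)

# Succeeds: the required solid config is supplied.
execute_pipeline(run_pipeline, run_config=config_dict)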
Rubén Lopez Lozoya
02/26/2021, 2:49 PM
Jai Kumaran
02/26/2021, 4:19 PM
Stefan Reist
02/26/2021, 10:08 PM
gazpot
02/27/2021, 6:33 PM
def get_market_ohlc_factory(name, market):
    @solid(name=name, required_resource_keys={"api"})
    def _get_market_ohlc(context):
        ts_ohlc = context.resources.api.get_ohlc(market)
        return ts_ohlc
    return _get_market_ohlc
This runs as expected:
@solid(required_resource_keys={"api"})
def _get_market_ohlc(context):
    market = 'BTC/USDT'
    ts_ohlc = context.resources.api.get_ohlc(market)
    return ts_ohlc
The purpose of the factory is to pass parameters from a list to create solids. The pipeline func looks like this; get_market_list() returns a Python list from a file.
@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={"postgres": postgres_resource,
                           "api": api_resource,
                           "redis": redis_resource})
    ])
def run_pipeline():
    market_list = get_market_list()
    for market in market_list:
        ts_ohlc = get_market_ohlc_factory(name=f'_get_market_ohlc', market=market)
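Two things stand out in the pipeline body, for what it's worth: the f-string f'_get_market_ohlc' has no placeholder, so every iteration produces a solid with the same name, and the factory's return value is never invoked, so no solid is added to the pipeline graph. A sketch of the usual wiring, reusing the mode_defs from the snippet above:
@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={"postgres": postgres_resource,
                           "api": api_resource,
                           "redis": redis_resource})
    ])
def run_pipeline():
    for i, market in enumerate(get_market_list()):
        # Unique name per solid, and the returned definition is called so
        # it becomes a node in the pipeline.
        get_market_ohlc_factory(name=f"get_market_ohlc_{i}", market=market)()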
Thomas
02/28/2021, 5:52 PM
Steve Pletcher
03/01/2021, 4:01 PM
jonathan
03/01/2021, 5:02 PM
Ohad Basan
03/01/2021, 10:23 PM
I'm trying to use the dagster-k8s/config tag to apply tags to the k8s job that's created by Dagster.
I'm doing it like this:
@solid(
    tags={
        'dagster-k8s/config': {
            'pod_template_spec_metadata': {
                'annotations': {"iam.amazonaws.com/role": "Dist"}
            },
        }
    },
    required_resource_keys={"data_lake", "dataset_name"},
    config_schema=String,
)
The job is created without the above tags.
I downloaded the run's debug log from the Dagster UI and I see that the tags do appear in the debug log, just not on the k8s job. Am I doing something wrong? Thank you!
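One possibility, hedged since the resolution isn't shown here: with the default K8sRunLauncher the whole run executes in a single Kubernetes job, so solid-level dagster-k8s/config tags only take effect under a step-isolating executor such as celery-k8s. Applying the same tag at the pipeline level should reach the run's job:
@pipeline(
    tags={
        "dagster-k8s/config": {
            # Same structure as the solid tag above, now applied to the
            # pod created for the whole run.
            "pod_template_spec_metadata": {
                "annotations": {"iam.amazonaws.com/role": "Dist"}
            },
        }
    }
)
def my_pipeline():
    ...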
NawafSheikh
03/02/2021, 5:49 AM
Simon Späti
03/02/2021, 11:06 AM
In our case the run_key isn't a filename but a list of filenames. Let's say our pipeline will process a list of files rather than file-by-file, so we pass a list along. The sensor will check if there are new files and package them into a list of files of around 500 MB. Now the challenge: a file could be contained in different lists in the case of re-processing. Is there a possible way the sensor could check whether a "file in a list" has been processed? 👉 [more in thread]
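A sketch of one way to keep run_key for deduplicating whole batches while tracking individual files outside Dagster; list_unprocessed_files, the state store it implies, and the process_batch solid are hypothetical:
import hashlib

from dagster import RunRequest, sensor

def make_batches(files, max_bytes=500 * 1024 * 1024):
    # Group (path, size) pairs into batches of roughly max_bytes.
    batch, total = [], 0
    for path, size in files:
        if batch and total + size > max_bytes:
            yield batch
            batch, total = [], 0
        batch.append(path)
        total += size
    if batch:
        yield batch

@sensor(pipeline_name="process_files")
def batch_file_sensor(context):
    # Hypothetical helper consulting an external state store: run_key only
    # dedupes identical batches, so per-file "has this been processed?"
    # bookkeeping has to live outside Dagster.
    files = list_unprocessed_files()
    for batch in make_batches(files):
        run_key = hashlib.sha1("|".join(sorted(batch)).encode()).hexdigest()
        yield RunRequest(
            run_key=run_key,
            run_config={"solids": {"process_batch": {"config": {"files": batch}}}},
        )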
Frank Dekervel
03/02/2021, 9:44 PM
Steve Pletcher
03/02/2021, 9:47 PM
szalai1
03/03/2021, 8:55 AM
xxxx_pipeline cannot be executed with the provided config.
If I copy the schedule config, it runs fine.
Jeff Hulbert
03/03/2021, 9:16 PM
Laura Moraes
03/03/2021, 10:25 PM
Sasha Gorelikov
03/04/2021, 8:32 AM
@solid(output_defs=[OutputDefinition(Dict, 'data', is_required=False)])
def solid_1(context):
    # read data code
    if data:
        return dict()
    else:
        return None  # how do I stop the pipeline here?

@solid(input_defs=[InputDefinition('data', Dict)])
def solid_2(context, data):
    return dict()

Thanks!
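A sketch of the usual pattern for this: yield the optional output only when there is data; when nothing is yielded for an optional output, downstream solids that depend on it are skipped (read_data is a hypothetical stand-in for the "read data code" above):
from dagster import Dict, InputDefinition, Output, OutputDefinition, pipeline, solid

@solid(output_defs=[OutputDefinition(Dict, "data", is_required=False)])
def solid_1(context):
    data = read_data()  # hypothetical: the reading code from the snippet
    if data:
        # Yielding the named Output (rather than returning) is what makes
        # the output conditional; yielding nothing skips solid_2.
        yield Output(data, "data")

@solid(input_defs=[InputDefinition("data", Dict)])
def solid_2(context, data):
    return dict()

@pipeline
def my_pipeline():
    solid_2(solid_1())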