David Robinson
12/29/2021, 8:45 PM
Will Gunadi
12/29/2021, 10:04 PM
sourabh upadhye
12/30/2021, 5:20 AM
Thomas Mignon
12/30/2021, 10:47 AM
Rahul Sharma
12/30/2021, 12:45 PM
Ryan Riopelle
12/30/2021, 7:04 PM
Ryan Riopelle
12/30/2021, 7:22 PM
Kyle Downey
12/31/2021, 12:01 AM
@graph
def load_itsa_assets():
    universe = load_token_universe()
    id_map = load_id_map()
    joined_df = pd.concat([id_map.set_index('symbol'), universe.set_index('symbol')], axis=1, join='inner')
    load_classifications(joined_df['itin'].tolist())
For example:
@op(out={'token_universe': Out(io_manager_key='parquet_io_manager', dagster_type=UniverseDataFrame)})
def load_token_universe(context) -> pd.DataFrame:
    universe_cfg = toml.load(UNIVERSE_CFG)
    symbols = []
    for symbol_cfg in universe_cfg['universe']:
        symbols.append({
            'symbol': symbol_cfg['symbol'],
            'cmc_id': symbol_cfg['cmc_id'],
            'effective_date': symbol_cfg['effective_date']
        })
    df = pd.DataFrame(symbols)
    df['effective_date'] = pd.to_datetime(df['effective_date'], utc=True)
    return df
but the return types are not DataFrames: they are instead InvokedSolidOutputHandle, and I am unable to make regular DataFrame method calls on them. What am I missing? How do I get the actual DataFrame out?
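A minimal sketch of the usual fix, not from the thread: inside a @graph body the op invocations only return output handles used to wire the DAG, so any pandas work (the pd.concat and .tolist() above) has to live inside an op of its own. The op name join_universe_with_id_map is illustrative; load_token_universe, load_id_map and load_classifications are assumed to be the ops from the snippet above.
import pandas as pd
from dagster import graph, op

@op
def join_universe_with_id_map(universe, id_map):
    # Real DataFrames are only available here, when the op actually executes.
    joined_df = pd.concat(
        [id_map.set_index('symbol'), universe.set_index('symbol')],
        axis=1, join='inner'
    )
    return joined_df['itin'].tolist()

@graph
def load_itsa_assets():
    # These calls return output handles that only describe dependencies;
    # the concrete values flow between ops at execution time.
    load_classifications(join_universe_with_id_map(load_token_universe(), load_id_map()))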
Kyle Downey
12/31/2021, 12:56 AM
Rahul Sharma
12/31/2021, 10:18 AM
Carter
12/31/2021, 8:12 PM
Thomas Mignon
01/03/2022, 10:05 AM
from dagster import fs_io_manager, job, Any
from dagster_dask import dask_executor
from pathlib import Path
from semaphore_scripts import template_runner
import os
from dagster import Nothing, In, op
@op
def adder_purified(purified_ftp: Any, purified_ftp2: Any, purified_eftp1: Any, purified_eftp2: Any, purified_httpvpublic: Any, purified_httpvpublicnew: Any) -> Any:
    return purified_ftp, purified_ftp2, purified_eftp1, purified_eftp2, purified_httpvpublic, purified_httpvpublicnew
def template_runner_op(
    name="default_name",
    ins=None,
    **kwargs,
):
    """
    Args:
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.
        **kwargs (any): Any additional arguments used to generate the new op.
    Returns:
        function: The new op.
    """
    @op(
        name=name,
        ins=ins or {"start": In(Nothing)},
        config_schema={
            "command": str,
            "command_parameter": str,
            "semaphore_audience_version": str,
            "workspace": str
        },
        required_resource_keys={"io_manager"},
        **kwargs
    )
    def _template_runner_op(context):
        workspace: Path = Path(
            os.path.join(context.op_config["workspace"], f'{context.op_config["command"]}-{os.environ["PBS_JOBID"]}'))
        template_runner.TemplateRunner(
            command=context.op_config["command"],
            version=context.op_config["semaphore_audience_version"],
            command_parameter=context.op_config["command_parameter"],
            workspace=workspace,
            skip_signal=True
        ).run(exit=False)
    return _template_runner_op
purify_ftp = template_runner_op(name="purify_ftp")
purify_ftp2 = template_runner_op(name="purify_ftp2")
purify_httpvpublic = template_runner_op(name="purify_httpvpublic")
purify_httpvpublicnew = template_runner_op(name="purify_httpvpublicnew")
purify_eftp1 = template_runner_op(name="purify_eftp1")
purify_eftp2 = template_runner_op(name="purify_eftp2")
partition = template_runner_op(name="partition")
event_session = template_runner_op(name="event_session")
#Business/Export
business_oceanography = template_runner_op(name="business_oceanography")
export_oceanography = template_runner_op(name="export_oceanography")
business_sextant = template_runner_op(name="business_sextant")
export_sextant = template_runner_op(name="export_sextant")
business_seanoe = template_runner_op(name="business_seanoe")
export_seanoe = template_runner_op(name="export_seanoe")
business_archimer = template_runner_op(name="business_archimer")
export_archimer = template_runner_op(name="export_archimer")
export_smoswind = template_runner_op(name="export_smoswind")
@job(
    name='template_runner_job',
    resource_defs={"io_manager": fs_io_manager},
    executor_def=dask_executor
)
def template_runner_job():
    #event = event_session(partition(start=[purify_ftp(), purify_ftp2(), purify_eftp1(), purify_eftp2(), purify_httpvpublic(), purify_httpvpublicnew()]))
    event = event_session(partition(purify_ftp(purify_ftp2(purify_eftp1(purify_eftp2(purify_httpvpublic(purify_httpvpublicnew())))))))
    export_oceanography(business_oceanography(event))
    export_sextant(business_sextant(event))
    export_seanoe(business_seanoe(event))
    export_archimer(business_archimer(event))
    export_smoswind(event)
If I do this I obtain a graph with the purify ops in series, which is not what I want (I want them in parallel), BUT I have no error when launching my graph.
But if I change the in-series purify ops to parallelised purify ops, from this:
event = event_session(partition(purify_ftp(purify_ftp2(purify_eftp1(purify_eftp2(purify_httpvpublic(purify_httpvpublicnew())))))))
to this:
event = event_session(partition(start=[purify_ftp(), purify_ftp2(), purify_eftp1(), purify_eftp2(), purify_httpvpublic(), purify_httpvpublicnew()]))
my graph looks great (cf. attached picture).
But unfortunately I obtain this error on launch:
dagster.core.errors.DagsterInvariantViolationError: template_runner_job not found at module scope in file /home1/datawork/semexp/workspace/dagit/semaphore-dagster-config/jobs/process_daily.py.
Stack Trace:
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/core/execution/api.py", line 775, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster_dask/executor.py", line 280, in execute
for future, result in iterate_with_context(raise_execution_interrupts, futures):
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/utils/__init__.py", line 383, in iterate_with_context
next_output = next(iterator)
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/distributed/client.py", line 4446, in __next__
return self._get_and_raise()
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/distributed/client.py", line 4437, in _get_and_raise
raise exc.with_traceback(tb)
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster_dask/executor.py", line 134, in query_on_dask_worker
execution_plan = create_execution_plan(
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/core/execution/api.py", line 735, in create_execution_plan
pipeline_def = pipeline.get_definition()
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/core/definitions/reconstructable.py", line 112, in get_definition
defn = self.repository.get_definition().get_pipeline(self.pipeline_name)
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/core/definitions/reconstructable.py", line 48, in get_definition
return repository_def_from_pointer(self.pointer)
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/core/definitions/reconstructable.py", line 560, in repository_def_from_pointer
target = def_from_pointer(pointer)
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/core/definitions/reconstructable.py", line 502, in def_from_pointer
target = pointer.load_target()
File "/home/datawork-semaphore-exp/conda/conda_envs/semaphore-scripts-1.0.11/lib/python3.9/site-packages/dagster/core/code_pointer.py", line 231, in load_target
raise DagsterInvariantViolationError(
So if someone has an idea of what's going on, I'm listening to you guys (and girls) 🙂
PS: I have two other questions:
1: When launching my graph on PBS via the dask executor, one QSUB is opened and every step of the graph happens in that qsub. Is there a possibility to kill the qsub after each step of the graph and then create a new QSUB?
2: Maybe linked to the previous question, but I can't track the progression of the different steps in the dagit interface; even if a step is done under the hood, in the interface it stays as running (cf. screen capture 2).
Andrea Giardini
01/03/2022, 10:33 AM
@graph(
    ins={
        "gcs_list": In(List[str]),
    }
)
def airbus_tristereo_orthorectification_pansharpening(gcs_lists):
    generate_subtasks(gcs_lists).map(orthorectify_pansharpen_tristereo_op)
I would expect dagit to ask me for a parameter gcs_list before starting the job, but that's not the case.
Is it possible to define job/graph-wide inputs and outputs?
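Not an answer from the thread, just a minimal sketch of one workaround, assuming the list can be supplied through op config instead of a graph-level input. The op gcs_list_from_config and the job name are hypothetical; generate_subtasks and orthorectify_pansharpen_tristereo_op are assumed to be the existing ops from the snippet above.
from dagster import Array, job, op

# Hypothetical op that pulls the list from run config rather than a graph input.
# Dagit's launchpad then prompts for it under ops -> gcs_list_from_config -> config.
@op(config_schema={"gcs_list": Array(str)})
def gcs_list_from_config(context):
    return context.op_config["gcs_list"]

@job
def airbus_tristereo_job():
    generate_subtasks(gcs_list_from_config()).map(orthorectify_pansharpen_tristereo_op)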
Igal Dahan
01/03/2022, 3:18 PM
load_from:
  # Each entry here corresponds to a service in the docker-compose file that exposes user code.
  - grpc_server:
      host: single_sample_pipeline
      port: 4000
      location_name: "gcr.io/immunai-registry-hub/panacea-ai/immunai-product-single_sample_pipeline:eldan-onbuild3"
and in the dagit UI, under Workspace, there are no errors and everything loads, but I cannot run jobs.
Igal Dahan
01/03/2022, 3:18 PM
Igal Dahan
01/03/2022, 3:18 PM
dagster api grpc -h 0.0.0.0 -p 4000 -m single_sample_pipeline
Igal Dahan
01/03/2022, 3:19 PM
Igal Dahan
01/03/2022, 4:37 PM
Igal Dahan
01/03/2022, 4:39 PM
Daniel Michaelis
01/03/2022, 4:47 PM
Operation name: PipelineExplorerRootQuery
Message: 'HistoricalPipeline' object has no attribute 'get_external_origin_id'
Path: ["pipelineSnapshotOrError","solidHandles",0,"solid","definition","id"]
Locations: [{"line":261,"column":5}]
Stack Trace:
File ".../lib/python3.8/site-packages/graphql/execution/executor.py", line 452, in resolve_or_error
return executor.execute(resolve_fn, source, info, **args)
File ".../lib/python3.8/site-packages/graphql/execution/executors/sync.py", line 16, in execute
return fn(*args, **kwargs)
File ".../lib/python3.8/site-packages/dagster_graphql/schema/solids.py", line 537, in resolve_id
return f"{self._represented_pipeline.get_external_origin_id()}:{self._solid_def_snap.name}"
It's not bothering me, but I didn't find an existing issue on that topic and thought I'd post it here, in case this issue is unknown.
Igal Dahan
01/03/2022, 4:53 PM
prha
01/03/2022, 5:29 PM
Bernardo Cortez
01/03/2022, 7:05 PM
execute_in_process. The partitioned job was built with:
my_graph = my_job.graph
my_partitioned_job = my_graph.to_job(resource_defs=my_resources, config=my_partitioned_config)
I do know that having a partitioned job built from the graph of a job is unorthodox (I am only doing this because this job is sometimes invoked in both contexts), but it should work, right?
Well, I am stuck at the following error message:
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/definitions/job.py", line 146, in execute_in_process
return core_execute_in_process(
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/execute_in_process.py", line 64, in core_execute_in_process
event_list = list(_execute_run_iterable)
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/api.py", line 861, in iter
yield from self.iterator(
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/api.py", line 774, in pipeline_execution_iterator
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/executor/in_process.py", line 38, in execute
yield from iter(
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/api.py", line 861, in iter
yield from self.iterator(
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 69, in inner_plan_execution_iterator
for step_event in check.generator(_dagster_event_sequence_for_step(step_context)):
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 294, in _dagster_event_sequence_for_step
raise unexpected_exception
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster_databricks/databricks_pyspark_step_launcher.py", line 148, in launch_step
step_run_ref = step_context_to_step_run_ref(
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/execution/plan/external_step.py", line 145, in step_context_to_step_run_ref
return StepRunRef(
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/core/definitions/step_launcher.py", line 38, in new
check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline),
File "/home/cortez/anaconda3/envs/dagster_0_13_12_spark/lib/python3.9/site-packages/dagster/check/__init__.py", line 189, in inst_param
raise _param_type_mismatch_exception(
dagster.check.ParameterCheckError: Param "recon_pipeline" is not a ReconstructablePipeline. Got <dagster.core.definitions.pipeline_base.InMemoryPipeline object at 0x7f09d637af70> which is type <class 'dagster.core.definitions.pipeline_base.InMemoryPipeline'>.
PS: I am using dagster 0.13.12
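Not a confirmed fix, only a sketch of a possible workaround: the databricks_pyspark_step_launcher needs a ReconstructablePipeline so it can re-create the job on the Databricks side, while execute_in_process only builds an in-memory pipeline. One option may be to execute through a reconstructable target instead. The factory make_my_partitioned_job and the run_config_for_partition placeholder are hypothetical; my_job, my_resources and my_partitioned_config are the names from the message above.
from dagster import DagsterInstance, execute_pipeline, reconstructable

# Hypothetical module-scope factory: reconstructable() needs a target it can
# re-import by module and name in the external (Databricks) environment.
def make_my_partitioned_job():
    return my_job.graph.to_job(resource_defs=my_resources, config=my_partitioned_config)

result = execute_pipeline(
    reconstructable(make_my_partitioned_job),
    run_config=run_config_for_partition,  # hypothetical: the run config for the chosen partition
    instance=DagsterInstance.get(),
)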
Alex Service
01/03/2022, 9:58 PM
Path object works pretty well
Jay Ragbeer
01/04/2022, 2:58 AM
Thomas Mignon
01/04/2022, 10:04 AM
ops:
  purify_ftp:
    config:
      command: purify_raw_logs
      semaphore_audience_version: '3.4.9'
      command_parameter: '/home/datawork-semaphore-exp/spool/input/ftp/ftp/2022 ftp -f access.log.20220101.gz'
      workspace: /home1/datawork/semexp/workspace/dagit/running/
  purify_ftp2:
    config:
      command: purify_raw_logs
      semaphore_audience_version: '3.4.9'
      command_parameter: '/home/datawork-semaphore-exp/spool/input/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'
      workspace: /home1/datawork/semexp/workspace/dagit/running/
How can I "normalise" this, by removing hardcoded dates, and have it as its own setting, something like:
day: 01
month: 01
year: 2020
ops:
  purify_ftp:
    config:
      command: purify_raw_logs
      semaphore_audience_version: '3.4.9'
      command_parameter: '/home/datawork-semaphore-exp/spool/input/ftp/ftp/*year* ftp -f access.log.*year**month**day*.gz'
      workspace: /home1/datawork/semexp/workspace/dagit/running/
  purify_ftp2:
    config:
      command: purify_raw_logs
      semaphore_audience_version: '3.4.9'
      command_parameter: '/home/datawork-semaphore-exp/spool/input/ftp/ftp2/*year* ftp2 -f access.log.*year**month**day*.gz'
      workspace: /home1/datawork/semexp/workspace/dagit/running/
I don't know if a syntax of this type exists (edited)
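Not from the thread; a minimal sketch of one way to do this, assuming the date is the only thing that varies and using Dagster's daily_partitioned_config to build the ops config in Python instead of hardcoding dates in YAML. The start date and the purify() helper are illustrative.
from datetime import datetime

from dagster import daily_partitioned_config

@daily_partitioned_config(start_date=datetime(2022, 1, 1))
def template_runner_config(start: datetime, _end: datetime):
    # Derive every date fragment from the partition's start date.
    ymd = start.strftime("%Y%m%d")
    year = start.strftime("%Y")

    def purify(subdir: str, label: str) -> dict:
        # Hypothetical helper that fills in one purify op's config.
        return {
            "config": {
                "command": "purify_raw_logs",
                "semaphore_audience_version": "3.4.9",
                "command_parameter": f"/home/datawork-semaphore-exp/spool/input/ftp/{subdir}/{year} {label} -f access.log.{ymd}.gz",
                "workspace": "/home1/datawork/semexp/workspace/dagit/running/",
            }
        }

    return {
        "ops": {
            "purify_ftp": purify("ftp", "ftp"),
            "purify_ftp2": purify("ftp2", "ftp2"),
        }
    }
The resulting partitioned config could then be attached to the job (e.g. config=template_runner_config in the @job decorator), so each run picks up its date automatically instead of it being hardcoded in the YAML.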
Daniel Michaelis
01/04/2022, 10:52 AM
Roel Hogervorst
01/04/2022, 2:56 PM
Qwame
01/04/2022, 5:34 PM
Will Gunadi
01/04/2022, 7:52 PM