Mose
05/21/2020, 10:26 PM
user
05/22/2020, 12:59 AM
cat
05/22/2020, 1:14 AM
Fran Sanchez
05/22/2020, 11:03 AM
Does dagster-graphql run a pipeline directly, or does it only communicate that to a dagit instance? Also, is it possible to run dagster instead of dagit to keep a server running, or is it only dagit that can do that? I find the documentation great, but the architecture isn't completely clear to me from reading it.
Keval
05/24/2020, 7:19 PM
DAGSTER
I have just installed it and tried to execute the demo code. What I am trying to achieve is as follows; please excuse me if I am asking a silly question, as I am still trying to understand it:
1. Download a zip file from a web URL
2. Unzip it
3. Read CSVs from the unzipped files
4. Validate the CSVs (checking data types, value ranges, etc.) and apply some transformations: join, merge
5. Save the CSV back to a specified path
Is it possible to implement this workflow in Dagster, and is there a similar example anyone can point me to?
Thanks
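A minimal sketch of how a workflow like this could look as a Dagster pipeline (the solid names, URL, column name, and file paths below are made up for illustration, not taken from the thread):
import io
import os
import zipfile

import pandas as pd
import requests
from dagster import pipeline, solid


@solid
def download_and_unzip(context):
    # Hypothetical source URL and extraction directory
    resp = requests.get("https://example.com/data.zip")
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        zf.extractall("unzipped")
    return "unzipped/data.csv"


@solid
def validate_and_transform(context, csv_path):
    df = pd.read_csv(csv_path)
    # Example validation on a hypothetical column: type / value-range check
    if not df["amount"].between(0, 1_000_000).all():
        raise ValueError("amount out of expected range")
    return df


@solid
def save_csv(context, df):
    os.makedirs("output", exist_ok=True)
    df.to_csv("output/data.csv", index=False)


@pipeline
def csv_etl_pipeline():
    save_csv(validate_and_transform(download_and_unzip()))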
Keval
05/25/2020, 8:30 PM
For a read_csv solid, do I have to copy and paste it into every project where I want to use it, or is there a way to import solids and use them?
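One common approach (a sketch of ordinary Python packaging, not a Dagster-specific feature from the thread): keep shared solids in a regular Python package and import them into each project's pipelines, for example:
# shared_solids/csv_solids.py (hypothetical shared package, installed with pip)
import pandas as pd
from dagster import solid


@solid
def read_csv(context, path: str):
    return pd.read_csv(path)


# project_a/pipelines.py
from dagster import pipeline
from shared_solids.csv_solids import read_csv


@pipeline
def my_pipeline():
    read_csv()  # `path` can be supplied via run config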
Ben Sully
05/26/2020, 2:08 PM
alir
05/26/2020, 7:19 PM
Couldn't import module_name exception.
Keval
05/26/2020, 8:34 PM
anderson
05/26/2020, 10:50 PM
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/url.py", line 71, in __init__
self.port = int(port)
ValueError: invalid literal for int() with base 10: ''
Seems like the port is empty, but this is my config:
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username:
        env: POSTGRES_USER
      password:
        env: POSTGRES_PASS
      hostname: "<http://dagster-database>"
      db_name:
        env: POSTGRES_DBNAME
      port: 5432
Using a local Postgres deployment with a service, and the k8s launcher.
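For what it's worth, that ValueError is what SQLAlchemy raises when the port reaches it as an empty string (for instance, if one of the configured values resolves to ''). A minimal reproduction, not taken from the thread:
from sqlalchemy.engine.url import URL

# SQLAlchemy 1.3-style constructor; an empty-string port hits int("") and
# raises ValueError: invalid literal for int() with base 10: ''
url = URL(
    drivername="postgresql",
    username="user",
    password="secret",
    host="dagster-database",
    port="",
    database="db",
)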
John Mav
05/27/2020, 6:07 PM
import pandas as pd
import requests
from dagster import Output, pipeline, solid

@solid
def get_urls(_):
    url_list = pd.read_csv('urls_to_update.csv')
    # This can be a list of size 0-100
    for url in url_list:
        yield Output(url, "url")

@solid
def parse_data(_, url):
    res = requests.get(url).json()
    structured_data = dict(
        url=url,
        col_a=res["col_a"],
        col_b=res["col_b"]
    )
    yield Output(structured_data, 'result')

@solid
def upsert_data(_, data):
    # Some logic to load the data into the warehouse
    pass

@pipeline
def my_dynamic_pipeline():
    upsert_data(
        data=parse_data(
            url=get_urls()
        )
    )
Is this just not feasible? Not a huge issue if not; I can work around it.
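One possible workaround (a sketch, assuming the per-URL fan-out does not need to happen as separate solid executions): emit the whole URL list as a single output and loop inside the downstream solid:
import pandas as pd
import requests
from dagster import pipeline, solid


@solid
def get_url_list(_):
    # Single output: the full list of URLs (0-100 entries)
    return pd.read_csv("urls_to_update.csv")["url"].tolist()


@solid
def parse_and_upsert(_, urls):
    for url in urls:
        res = requests.get(url).json()
        row = {"url": url, "col_a": res["col_a"], "col_b": res["col_b"]}
        # ... load `row` into the warehouse here


@pipeline
def my_batched_pipeline():
    parse_and_upsert(get_url_list())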
s.zepelin
05/27/2020, 6:46 PM
s.zepelin
05/27/2020, 6:48 PM
max
05/27/2020, 9:48 PM
Flavien
05/28/2020, 1:04 PM
The dagster_spark package doesn't seem to have configuration options to support Hive (spark.sql.warehouse.dir and .enableHiveSupport). Will it be possible to do so in an upcoming release? If not, are you open to a PR?
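For reference, this is what those two options look like in plain PySpark (illustrative Spark code, not dagster_spark configuration; the warehouse path is an assumption):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hive-enabled-session")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)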
Sam Rausser
05/28/2020, 5:44 PM
dhume
05/28/2020, 7:14 PM
Travis Cline
05/28/2020, 8:21 PM
max
05/28/2020, 9:30 PM
user
05/29/2020, 2:36 AM
nate
05/29/2020, 2:38 AM
Fran Sanchez
05/29/2020, 3:12 PM
Damian
05/29/2020, 3:59 PM
user
05/29/2020, 6:08 PM
Joseph Sayad
05/29/2020, 6:13 PM
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 907.54 MB -- Worker memory limit: 1.07 GB
As a fix, I have tried to increase the memory limit for my workers to around 3 GB for a memory-expensive solid:
@dg.solid(
    ...
    tags={'dagster-dask/resource_requirements': {"MEMORY": 3e9}},
)
def parse_dimensions(context, ...):
But the solid doesn't seem to be executed at all when I run the pipeline. Any insight or general knowledge regarding this type of problem would be very much appreciated.
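For context, the limit in that warning is Dask's per-worker memory limit; a minimal, Dagster-independent sketch of raising it on a local cluster (the worker count and the 3 GB value are arbitrary):
from dask.distributed import Client, LocalCluster

# Give each worker a 3 GB memory limit instead of the default.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="3GB")
client = Client(cluster)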
SamSinayoko
05/29/2020, 6:14 PM
Sam Rausser
05/30/2020, 12:55 AM
SamSinayoko
05/30/2020, 6:36 AM
We can register these partitions by decorating a function that returns a list of partition set definitions with @repository_partitions
Sam Rausser
05/30/2020, 7:05 PM
Can I call execute_pipeline in another thread or process and loop? Like, say, a Kafka consumer spilling to disk after N bytes and then running execute_pipeline on a pipeline that finds the file and processes it, etc.
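A bare-bones sketch of that pattern, with poll_for_new_file and the pipeline import as placeholders (not real APIs): an outer loop that calls execute_pipeline whenever new data lands on disk:
import time

from dagster import execute_pipeline
from my_project.pipelines import my_pipeline  # hypothetical import


def poll_for_new_file():
    # Placeholder for e.g. a Kafka consumer that spills to disk after N bytes
    # and returns the path of a finished file, or None if nothing is ready.
    return None


def run_forever():
    while True:
        if poll_for_new_file():
            # Synchronous, in-process run; could also be dispatched to a
            # worker thread or process.
            execute_pipeline(my_pipeline)
        time.sleep(5)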
SamSinayoko
06/01/2020, 7:16 AM
(dagster) $ DAGSTER_HOME=$PWD dagster pipeline backfill -y repository.yaml
Select a pipeline to backfill: compute_total_stock_volume: compute_total_stock_volume
Pipeline: compute_total_stock_volume
Partition set: stock_data_partitions_set
Partitions:
2018-01-01 2018-02-01 2018-03-01 2018-04-01 2018-05-01 2018-06-01 2018-07-01 2018-08-01
2018-09-01 2018-10-01 2018-11-01 2018-12-01
Do you want to proceed with the backfill (12 partitions)? [y/N]: y
Launching runs...
Traceback (most recent call last):
File "/Users/sinayoks/apps/miniconda3/envs/dagster/bin/dagster", line 8, in <module>
sys.exit(main())
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/dagster/cli/__init__.py", line 38, in main
cli(obj={}) # pylint:disable=E1123
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/dagster/cli/pipeline.py", line 599, in pipeline_backfill_command
execute_backfill_command(kwargs, click.echo)
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/dagster/cli/pipeline.py", line 693, in execute_backfill_command
instance.launch_run(run.run_id)
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/dagster/core/instance/__init__.py", line 801, in launch_run
return self._run_launcher.launch_run(self, run)
File "/Users/sinayoks/apps/miniconda3/envs/dagster/lib/python3.6/site-packages/dagster_graphql/launcher/__init__.py", line 97, in launch_run
cls=self.__class__.__name__, address=self._address, result=result
dagster.core.errors.DagsterLaunchFailedError: Failed to launch run with RemoteDagitRunLauncher targeting <http://127.0.0.1:3000>:
{'__typename': 'PipelineRunNotFoundError'}
So it looks like the job is creating a run but Dagit isn't able to find it. Any idea what I'm doing wrong here?