Auster Cid
02/14/2020, 11:07 PM
Alexander Verbitsky
02/17/2020, 1:12 PM
system_storage, is it a correct assumption?
Alexander Verbitsky
02/17/2020, 4:14 PM
Auster Cid
02/17/2020, 11:18 PM
multiprocess_executor or if I'm better off using something like aiohttp.
Eric
02/18/2020, 9:22 PM
Eric
02/18/2020, 11:47 PM
dwall
02/18/2020, 11:54 PM
dwall
02/19/2020, 8:25 PM
S3FileHandle different than S3Coordinate?
Eric
02/19/2020, 10:01 PM
We obviously don't want to have to write a separate solid for each permutation of these parameters that we use in our pipelines – especially because, in more realistic cases like configuring a Spark job or even parametrizing the read_csv function from a popular package like Pandas, we might have dozens or hundreds of parameters like these.
But hoisting all of these parameters into the signature of the solid function as inputs isn't the right answer either:
...
The solution is to define a config schema for our solid:
import csv

from dagster import Bool, Field, Int, String, solid


@solid(
    config={
        'delimiter': Field(
            String,
            default_value=',',
            is_required=False,
            description=('A one-character string used to separate fields.'),
        ),
        'doublequote': Field(
            Bool,
            default_value=False,
            is_required=False,
            description=(
                'Controls how instances of quotechar appearing inside a field '
                'should themselves be quoted. When True, the character is '
                'doubled. When False, the escapechar is used as a prefix to '
                'the quotechar.'
            ),
        ),
        'escapechar': Field(
            String,
            default_value='\\',
            is_required=False,
            description=(
                'On reading, the escapechar removes any special meaning from '
                'the following character.'
            ),
        ),
        'quotechar': Field(
            String,
            default_value='"',
            is_required=False,
            description=(
                'A one-character string used to quote fields containing '
                'special characters, such as the delimiter or quotechar, '
                'or which contain new-line characters.'
            ),
        ),
        'quoting': Field(
            Int,
            default_value=csv.QUOTE_MINIMAL,
            is_required=False,
            description=(
                'Controls when quotes should be generated by the writer and '
                'recognised by the reader. It can take on any of the '
                'csv.QUOTE_* constants'
            ),
        ),
        'skipinitialspace': Field(
            Bool,
            default_value=False,
            is_required=False,
            description=(
                'When True, whitespace immediately following the delimiter '
                'is ignored. The default is False.'
            ),
        ),
        'strict': Field(
            Bool,
            default_value=False,
            is_required=False,
            description=('When True, raise exception on bad CSV input.'),
        ),
    }
)
def read_csv(context, csv_path: str):
    with open(csv_path, 'r') as fd:
        lines = [
            row
            for row in csv.DictReader(
                fd,
                delimiter=context.solid_config['delimiter'],
                doublequote=context.solid_config['doublequote'],
                escapechar=context.solid_config['escapechar'],
                quotechar=context.solid_config['quotechar'],
                quoting=context.solid_config['quoting'],
                skipinitialspace=context.solid_config['skipinitialspace'],
                strict=context.solid_config['strict'],
            )
        ]
    context.log.info('Read {n_lines} lines'.format(n_lines=len(lines)))
    return lines
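For reference, here is a minimal stdlib-only sketch of what the solid body does with its config, with a plain dict standing in for context.solid_config. The names CSV_DEFAULTS and read_csv_with_config are hypothetical and not part of Dagster. It only illustrates that the config keys map one-to-one onto csv.DictReader keyword arguments, so they can be unpacked rather than listed by hand.

```python
import csv
import io

# Hypothetical defaults mirroring the config schema quoted above.
CSV_DEFAULTS = {
    "delimiter": ",",
    "doublequote": False,
    "escapechar": "\\",
    "quotechar": '"',
    "quoting": csv.QUOTE_MINIMAL,
    "skipinitialspace": False,
    "strict": False,
}


def read_csv_with_config(fd, solid_config=None):
    """Read rows from an open file, overlaying solid_config on the defaults."""
    overrides = solid_config or {}
    unknown = set(overrides) - set(CSV_DEFAULTS)
    if unknown:
        raise KeyError("Unknown config keys: {}".format(sorted(unknown)))
    options = dict(CSV_DEFAULTS, **overrides)
    # Every key is a csv formatting parameter, so it can be passed straight through.
    return [dict(row) for row in csv.DictReader(fd, **options)]


# Usage: override only the delimiter, keep every other default.
rows = read_csv_with_config(
    io.StringIO("name;calories\nAll-Bran;70\n"), {"delimiter": ";"}
)
```

Unpacking the options keeps the function body the same size no matter how many parameters the schema grows to.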
The read_csv solid example seems to work great, but what about parameters that accept multiple types? For example, the index_col keyword argument of pandas read_csv: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
index_col: int, str, sequence of int / str, or False, default None
Is using the Any type really appropriate for these cases?
dwall
02/19/2020, 11:56 PM
achintamiri
02/20/2020, 1:24 PM
dwall
02/20/2020, 9:05 PM
s3_resource in the dagster_aws library. I get a PutObject Access Denied error when trying to use context.resources.s3.upload_fileobj(fp, bucket, key) but when I swap it out with:
s3 = boto3.resource('s3').meta.client
s3.upload_fileobj(fp, bucket, key)
everything works fine
user
02/20/2020, 11:27 PM
max
02/20/2020, 11:27 PM
Thein
02/21/2020, 6:08 PM
dwall
02/21/2020, 6:39 PM
schrockn
02/21/2020, 8:01 PM
Auster Cid
02/22/2020, 4:28 PM
botocore.exceptions.ClientError: An error occurred (InvalidAMIID.NotFound) when calling the RunInstances operation: The image id '[ami-08fd8ae3806f09a08]' does not exist
Edit: hmm, seems like it works if I use region us-west-1 instead of sa-east-1
Lyle
02/24/2020, 2:36 AM
dagster-aws? You could prompt via click with something like:
1) 0.0.0.0/0
2) whatever IP is detected via requests.get("https://wikipedia.com").headers.get('X-Client-IP')
3) enter IP(s)
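A rough sketch of that three-way prompt, using only the standard library so it runs anywhere. The function name choose_allowed_cidrs and its prompt parameter are hypothetical and not part of dagster-aws. The prompt argument defaults to input, and click.prompt could be passed in its place. The detected IP is taken as an argument rather than fetched, so no network call is assumed.

```python
import ipaddress


def choose_allowed_cidrs(detected_ip, prompt=input):
    """Ask which source addresses may connect; return a list of CIDR strings."""
    print("1) 0.0.0.0/0")
    print("2) {} (detected)".format(detected_ip))
    print("3) enter IP(s)")
    choice = prompt("Allow inbound traffic from [1/2/3]: ").strip()
    if choice == "1":
        raw = ["0.0.0.0/0"]
    elif choice == "2":
        raw = [detected_ip]
    elif choice == "3":
        raw = prompt("Comma-separated IPs or CIDRs: ").split(",")
    else:
        raise ValueError("Unknown choice: {!r}".format(choice))
    # ip_network validates each entry and turns a bare IPv4 address into a /32.
    return [str(ipaddress.ip_network(item.strip(), strict=False)) for item in raw]


# Usage with scripted answers instead of a live terminal.
answers = iter(["3", "10.0.0.5, 192.168.1.0/24"])
cidrs = choose_allowed_cidrs("203.0.113.7", prompt=lambda _text: next(answers))
```

Validating with ipaddress means a typo raises ValueError before anything is sent to AWS.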
I don’t mind throwing up a PR, but wanted to make sure there was interest. I get that there’s a decent disclaimer in the docs, but people forget this stuff is up, among all sorts of other human things.
achintamiri
02/24/2020, 11:16 AM
kevin
02/24/2020, 4:16 PM
Auster Cid
02/24/2020, 10:43 PM
dwall
02/25/2020, 3:07 AM
slack_resource within solids, but there are instances where I'd prefer to have a blanket notification sent regardless of which solid and/or pipeline failed
Thein
02/25/2020, 3:54 AM
dagster.core.errors.DagsterInvariantViolationError: Compute function for solid when I tried pd.read_sql with chunksize. It works fine without chunksize.
@solid
def read_db(_context, query, engine):
    df = pd.read_sql(query, con=engine.connect(), chunksize=500000)
    return df


@composite_solid
def savedb():
    connect_db = connectdb.alias("connect_db")
    read_sql_file = read_sqlfile.alias("read_sql_file")
    # context.log.info("test done")
    data = read_db(read_sql_file(), connect_db())
    return data
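A possible explanation, not confirmed anywhere in this thread: with chunksize set, pd.read_sql returns an iterator of DataFrames rather than a single DataFrame, so read_db hands back a lazy object instead of a value. The stdlib-only sketch below mimics this with sqlite3. Here read_in_chunks is a hypothetical stand-in for the chunked reader, and this version of read_db consumes every chunk before returning. With pandas, the analogous step would presumably be pd.concat(list(chunks)).

```python
import sqlite3


def read_in_chunks(conn, query, chunksize):
    """Generator yielding lists of rows, at most chunksize rows at a time."""
    cursor = conn.execute(query)
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            break
        yield rows


def read_db(conn, query, chunksize=2):
    """Consume every chunk and return one concrete list, not a generator."""
    all_rows = []
    for chunk in read_in_chunks(conn, query, chunksize):
        all_rows.extend(chunk)
    return all_rows


# Usage against an in-memory database with five rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(5)])
result = read_db(conn, "SELECT x FROM t ORDER BY x")
```

The trade-off is memory: collecting all chunks gives up the main benefit of chunked reading, so this only suits results that fit in RAM.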
dwall
02/25/2020, 4:38 PM
0.7.2?
schrockn
02/25/2020, 4:40 PM
vu
02/25/2020, 5:32 PM
ramshackle-jamathon
02/25/2020, 5:47 PM
start execution option from being available in the dagit UI?
Eric
02/25/2020, 6:42 PM
solid_from_func decorator in this pull request https://github.com/dagster-io/dagster/pull/2049 and couple it with automatic signature generation, in hopes of using third-party library functions out of the box.
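The solid_from_func code itself is not shown in this thread, so the following is only a Dagster-independent sketch of the signature-generation half. It uses inspect.signature to turn a function's parameters into a schema-like dict. The names config_schema_from_signature and load_table are hypothetical, and the dict layout is an assumption rather than Dagster's config format.

```python
import inspect


def config_schema_from_signature(func):
    """Map each named parameter of func to its annotation, default and requiredness."""
    schema = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue  # *args / **kwargs carry no fixed schema
        has_default = param.default is not inspect.Parameter.empty
        has_type = param.annotation is not inspect.Parameter.empty
        schema[name] = {
            "type": param.annotation if has_type else None,
            "default": param.default if has_default else None,
            "is_required": not has_default,
        }
    return schema


# Hypothetical stand-in for a third-party library function.
def load_table(path: str, delimiter: str = ",", header: bool = True, **kwargs):
    return (path, delimiter, header)


schema = config_schema_from_signature(load_table)
```

One caveat: inspect.signature raises ValueError for some functions implemented in C, so a library with compiled entry points may still need hand-written signatures.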
Our dev team here is very small, and having to duplicate the signatures of all the functions from all the libraries we plan to use has become a serious blocker.
Auster Cid
02/25/2020, 8:51 PM