-
Notifications
You must be signed in to change notification settings - Fork 246
Expand file tree
/
Copy pathclient.py
More file actions
385 lines (329 loc) · 14.7 KB
/
client.py
File metadata and controls
385 lines (329 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
#
# Copyright (c) 2018-2026, NVIDIA CORPORATION. All rights reserved.
#
from typing import List, Optional, Tuple, Union
import os
import warnings
from aistore.sdk.enums import Colocation
from aistore.sdk.bucket import Bucket
from aistore.sdk.provider import Provider
from aistore.sdk.const import (
AIS_AUTHN_TOKEN,
AIS_READ_TIMEOUT,
AIS_CONNECT_TIMEOUT,
AIS_MAX_CONN_POOL,
EXT_TAR,
)
from aistore.sdk.cluster import Cluster
from aistore.sdk.dsort import Dsort
from aistore.sdk.request_client import RequestClient
from aistore.sdk.session_manager import SessionManager
from aistore.sdk.types import Namespace
from aistore.sdk.job import Job
from aistore.sdk.etl.etl import Etl
from aistore.sdk.utils import parse_url
from aistore.sdk.obj.object import Object
from aistore.sdk.errors import InvalidURLException
from aistore.sdk.retry_config import RetryConfig
from aistore.sdk.batch.batch import Batch
class Client:
    """
    AIStore client for managing buckets, objects, and ETL jobs.

    Args:
        endpoint (str): AIStore endpoint.
        skip_verify (bool, optional): If True, skip SSL certificate verification. If False (default),
            the 'AIS_SKIP_VERIFY' environment variable is also checked.
        ca_cert (str, optional): Path to a CA certificate file for SSL verification. If not provided,
            the 'AIS_CLIENT_CA' environment variable will be used. Defaults to None.
        client_cert (Union[str, Tuple[str, str], None], optional): Path to a client certificate PEM file
            or a tuple (cert, key) for mTLS. If not provided, 'AIS_CRT' and 'AIS_CRT_KEY' environment
            variables will be used. Defaults to None.
        timeout (Union[float, Tuple[float, float], None], optional): Timeout for HTTP requests.
            - Single float (e.g., `5.0`): Applies to both connection and read timeouts.
            - Tuple (e.g., `(3.0, 20.0)`): First value is the connection timeout, second is the read timeout.
            - Tuple with 0 (e.g., `(0, 20.0)` or `(3.0, 0)`): Use `0` to disable specific timeout.
            - `0` or `0.0` or `(0, 0)`: Disables all timeouts.
            - `None` (default): Check environment variables 'AIS_CONNECT_TIMEOUT' and 'AIS_READ_TIMEOUT'.
              If env var is set to `0`, that specific timeout is disabled. Defaults to `(3, 20)` if not set.
        retry_config (RetryConfig, optional): Defines retry behavior for HTTP and network failures.
            If not provided, the default retry configuration (`RetryConfig.default()`) is used.
        token (str, optional): Authorization token. If not provided, the 'AIS_AUTHN_TOKEN' environment variable
            will be used. Defaults to None.
        max_pool_size (int, optional): Maximum number of connections per host in the connection pool.
            If not provided, the 'AIS_MAX_CONN_POOL' environment variable will be used, or defaults to 10.
    """

    # Default timeout values in seconds (connect, read) and default pool size
    _DEFAULT_CONNECT_TIMEOUT = 3
    _DEFAULT_READ_TIMEOUT = 20
    _DEFAULT_MAX_POOL_SIZE = 10

    @staticmethod
    def _parse_timeout_from_env(
        env_var_name: str, default_value: float
    ) -> Optional[float]:
        """
        Parse a timeout value from an environment variable.

        Args:
            env_var_name: Name of the environment variable to read
            default_value: Default value to use if not set or parsing fails

        Returns:
            Parsed timeout value (None if 0, float otherwise), or default if invalid
        """
        env_value = os.environ.get(env_var_name)
        if not env_value:
            return default_value
        try:
            parsed = float(env_value)
            # If env var is 0, return None (no timeout limit)
            if parsed == 0:
                return None
            return parsed
        except (ValueError, TypeError):
            warnings.warn(
                f"Invalid value for {env_var_name}='{env_value}'. "
                f"Using default timeout of {default_value} seconds."
            )
            return default_value

    @staticmethod
    def _resolve_timeout(
        timeout: Optional[Union[float, Tuple[float, float]]],
    ) -> Optional[Union[float, Tuple[float, float]]]:
        """
        Resolve timeout value from parameter or environment variables.

        Priority: explicit parameter > environment variables > defaults

        Special handling:
            - timeout=0 or timeout=(0, 0) -> None (disable all timeouts)
            - timeout=(0, 20) or timeout=(3, 0) -> convert 0 to None for that specific timeout
            - timeout=None (default) -> check env vars, fallback to (3, 20)
            - timeout=<value> -> use as-is (with 0 converted to None)
            - AIS_CONNECT_TIMEOUT=0 -> None for connect timeout (no limit)
            - AIS_READ_TIMEOUT=0 -> None for read timeout (no limit)
            - Both env vars=0 -> None (disable all timeouts)

        Args:
            timeout: Timeout parameter passed to __init__

        Returns:
            Resolved timeout value, tuple (connect, read), or None if disabled
        """
        # Convert 0 to None (disable timeout)
        if timeout in (0, 0.0, (0, 0)):
            return None

        # If timeout was explicitly provided (not None), use it
        if timeout is not None:
            # Handle tuple: convert 0 to None for individual timeouts
            if isinstance(timeout, tuple):
                connect, read = timeout
                connect = None if connect == 0 else connect
                read = None if read == 0 else read
                # If both are None, return None completely
                if connect is None and read is None:
                    return None
                return (connect, read)
            # Single float value - use for both
            return timeout

        # timeout is None - check environment variables or use defaults
        connect_timeout = Client._parse_timeout_from_env(
            AIS_CONNECT_TIMEOUT, Client._DEFAULT_CONNECT_TIMEOUT
        )
        read_timeout = Client._parse_timeout_from_env(
            AIS_READ_TIMEOUT, Client._DEFAULT_READ_TIMEOUT
        )
        # If both timeouts are None, return None (disable timeout completely)
        if connect_timeout is None and read_timeout is None:
            return None
        return (connect_timeout, read_timeout)

    @staticmethod
    def _resolve_max_pool_size(max_pool_size: Optional[int]) -> int:
        """
        Resolve max_pool_size value from parameter or environment variable.

        Priority: explicit parameter > environment variable > default

        Args:
            max_pool_size: max_pool_size parameter passed to __init__

        Returns:
            Resolved max_pool_size value
        """
        # If max_pool_size was explicitly provided, use it
        if max_pool_size is not None:
            return max_pool_size
        # Try to get value from environment variable
        env_max_pool = os.environ.get(AIS_MAX_CONN_POOL)
        if env_max_pool:
            try:
                return int(env_max_pool)
            except (ValueError, TypeError):
                warnings.warn(
                    f"Invalid value for {AIS_MAX_CONN_POOL}='{env_max_pool}'. "
                    f"Using default max_pool_size of {Client._DEFAULT_MAX_POOL_SIZE}."
                )
        return Client._DEFAULT_MAX_POOL_SIZE

    # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-locals
    def __init__(
        self,
        endpoint: str,
        skip_verify: bool = False,
        ca_cert: Optional[str] = None,
        client_cert: Optional[Union[str, Tuple[str, str]]] = None,
        timeout: Optional[Union[float, Tuple[float, float]]] = None,
        retry_config: Optional[RetryConfig] = None,
        token: Optional[str] = None,
        max_pool_size: Optional[int] = None,
    ):
        self.retry_config = (
            retry_config if retry_config is not None else RetryConfig.default()
        )

        # Resolve configuration values: params > env_vars > defaults
        timeout = self._resolve_timeout(timeout)
        max_pool_size = self._resolve_max_pool_size(max_pool_size)

        session_manager = SessionManager(
            retry=self.retry_config.http_retry,
            ca_cert=ca_cert,
            client_cert=client_cert,
            skip_verify=skip_verify,
            max_pool_size=max_pool_size,
        )

        # Check for token from arguments or environment variable
        if not token:
            token = os.environ.get(AIS_AUTHN_TOKEN)

        self._request_client = RequestClient(
            endpoint=endpoint,
            session_manager=session_manager,
            timeout=timeout,
            token=token,
            retry_config=self.retry_config,
        )

    def bucket(
        self,
        bck_name: str,
        provider: Union[Provider, str] = Provider.AIS,
        namespace: Optional[Namespace] = None,
    ):
        """
        Factory constructor for bucket object.
        Does not make any HTTP request, only instantiates a bucket object.

        Args:
            bck_name (str): Name of bucket
            provider (str or Provider): Provider of bucket, one of "ais", "aws", "gcp", ...
                (optional, defaults to ais)
            namespace (Namespace): Namespace of bucket (optional, defaults to None)

        Returns:
            The bucket object created.
        """
        return Bucket(
            client=self._request_client,
            name=bck_name,
            provider=provider,
            namespace=namespace,
        )

    def cluster(self):
        """
        Factory constructor for cluster object.
        Does not make any HTTP request, only instantiates a cluster object.

        Returns:
            The cluster object created.
        """
        return Cluster(client=self._request_client)

    def job(self, job_id: str = "", job_kind: str = ""):
        """
        Factory constructor for job object, which contains job-related functions.
        Does not make any HTTP request, only instantiates a job object.

        Args:
            job_id (str, optional): Optional ID for interacting with a specific job
            job_kind (str, optional): Optional specific type of job empty for all kinds

        Returns:
            The job object created.
        """
        return Job(client=self._request_client, job_id=job_id, job_kind=job_kind)

    def etl(self, etl_name: str):
        """
        Factory constructor for ETL object.
        Contains APIs related to AIStore ETL operations.
        Does not make any HTTP request, only instantiates an ETL object.

        Args:
            etl_name (str): Name of the ETL

        Returns:
            The ETL object created.
        """
        return Etl(client=self._request_client, name=etl_name)

    def dsort(self, dsort_id: str = ""):
        """
        Factory constructor for dSort object.
        Contains APIs related to AIStore dSort operations.
        Does not make any HTTP request, only instantiates a dSort object.

        Args:
            dsort_id: ID of the dSort job

        Returns:
            dSort object created
        """
        return Dsort(client=self._request_client, dsort_id=dsort_id)

    def get_object_from_url(self, url: str) -> Object:
        """
        Creates an Object instance from a URL.
        This method does not make any HTTP requests.

        Args:
            url (str): Full URL of the object (e.g., "ais://bucket1/file.txt")

        Returns:
            Object: The object constructed from the specified URL

        Raises:
            InvalidURLException: If the URL is invalid.
        """
        # NOTE: the previous version wrapped this in a try/except that only
        # re-raised InvalidURLException unchanged; the exception propagates
        # identically without the wrapper.
        provider, bck_name, obj_name = parse_url(url)
        if not provider or not bck_name or not obj_name:
            raise InvalidURLException(url)
        return self.bucket(bck_name, provider=provider).object(obj_name)

    def batch(
        self,
        objects: Optional[Union[List[Object], Object, str, List[str]]] = None,
        bucket: Optional[Bucket] = None,
        output_format: str = EXT_TAR,
        cont_on_err: bool = True,
        only_obj_name: bool = False,
        streaming_get: bool = True,
        colocation: Colocation = Colocation.NONE,
    ):
        """
        Factory constructor for Get-Batch API (MOSS - Multi-Object Streaming Service).

        Efficiently retrieve multiple objects, archive files, or byte ranges in a single request,
        reducing network overhead and improving throughput for ML training workloads.

        Args:
            objects (Optional[Union[List[Object], Object, str, List[str]]]): Objects to retrieve. Can be:
                - Single object name: "file.txt"
                - List of names: ["file1.txt", "file2.txt"]
                - Single Object instance
                - List of Object instances
                - None (add objects later via batch.add())
                Note: if objects are specified as raw names (str or list of str), bucket must be provided
            bucket (Optional[Bucket]): Default bucket for all objects
            output_format (str): Archive format (tar, tgz, zip). Defaults to ".tar"
            cont_on_err (bool): Continue on errors (missing files under __404__/). Defaults to True
            only_obj_name (bool): Use only obj name in archive path. Defaults to False
            streaming_get (bool): Stream resulting archive prior to finalizing it in memory. Defaults to True
            colocation (Colocation): Colocation hint for optimization. Defaults to Colocation.NONE.
                - Colocation.NONE: no optimization - suitable for uniformly distributed data
                - Colocation.TARGET_AWARE: target-aware - objects are collocated on few targets
                - Colocation.TARGET_AND_SHARD_AWARE: target and shard-aware - enables archive handle reuse

        Returns:
            Batch: Batch object for building and executing Get-Batch requests

        Example:
            # Quick batch with string names
            batch = client.batch(["file1.txt", "file2.txt"], bucket=bucket)
            for obj_info, data in batch.get():
                print(f"Object: {obj_info.obj_name}, Size: {len(data)}")

            # Build batch incrementally with advanced options
            batch = client.batch(bucket=bucket)
            batch.add("simple.txt")
            batch.add("archive.tar", archpath="images/photo.jpg")  # extract from archive
            batch.add("tracked.txt", opaque=b"user-id-123")  # with tracking data
            for obj_info, data in batch.get():
                print(f"Object: {obj_info.obj_name}")
        """
        return Batch(
            self._request_client,
            objects,
            bucket,
            output_format,
            cont_on_err,
            only_obj_name,
            streaming_get,
            colocation,
        )