Source code for datalad_next.utils

"""Assorted utility functions"""

import logging
import os
from typing import (
    Any,
    Dict,
)

from datalad.utils import (
    Path,
    check_symlink_capability,
    chpwd,
    ensure_bool,
    ensure_list,
    get_dataset_root,
    getargspec,
    get_wrapped_class,
    knows_annex,
    on_linux,
    on_windows,
    rmtemp,
    rmtree,
    swallow_outputs,
)
from datalad.distribution.utils import _yield_ds_w_matching_siblings
from datalad.support.external_versions import external_versions

from datalad_next.credman import CredentialManager
from .log import log_progress


lgr = logging.getLogger('datalad.utils')


def get_specialremote_param_dict(params):
    """Convert a list of 'param=value' strings into a dictionary

    Each item is split at its first '=' only, so values may themselves
    contain '=' characters. If a parameter name occurs more than once,
    the last occurrence wins.

    Parameters
    ----------
    params : list
      Strings of the format 'param=value'.

    Returns
    -------
    dict
      Mapping of parameter names to their (string) values.

    Raises
    ------
    ValueError
      If an item does not contain a '='.
    """
    mapping = {}
    for item in params:
        # limit to a single split: everything after the first '=' is value
        name, value = item.split('=', maxsplit=1)
        mapping[name] = value
    return mapping
def get_specialremote_credential_properties(params):
    """Infer credential properties from a special remote configuration

    The input is a parameterization as it would be given to
    `git annex initremote|enableremote <name> ...`, or as stored in
    `remote.log`. The parameters are inspected and a dictionary of
    credential properties, suitable for `CredentialManager.query()`,
    is returned.

    This inspection may involve network activity, e.g. HTTP requests.

    Parameters
    ----------
    params : list or dict
      Either a list of strings of the format 'param=value', or a dictionary
      with parameter names as keys.

    Returns
    -------
    dict or None
      Credential property name-value mapping. This mapping can be passed to
      `CredentialManager.query()`. `None` is returned whenever no credential
      properties could be inferred, for example because the special remote
      type is not recognized, no URL is configured, or no authentication
      realm could be determined.
    """
    if isinstance(params, (list, tuple)):
        params = get_specialremote_param_dict(params)

    # there is no generic approach, every supported remote type needs
    # its own handling -- presently this is 'webdav' only
    if params.get('type') != 'webdav':
        return None

    from .http_helpers import get_auth_realm
    from datalad_next.url_operations.http import HttpUrlOperations

    configured_url = params.get('url')
    if not configured_url:
        return None

    # probing reports the URL to use for the realm lookup, plus
    # properties that may include authentication information
    probed_url, url_props = HttpUrlOperations().probe_url(configured_url)
    realm = get_auth_realm(probed_url, url_props.get('auth'))
    return {'realm': realm} if realm else None
def update_specialremote_credential(
        srtype, credman, credname, credprops, credtype_hint=None,
        duplicate_hint=None):
    """Store a used credential, generating a default name if none is given

    If `credname` is empty, a default name is composed from `srtype` and,
    where present, the 'user' and 'realm' entries of `credprops`, joined
    by '-'. If `credman.get()` reports an existing credential under that
    default name, a warning is logged and nothing is stored.

    Otherwise the credential is stored via `credman.set()` with
    `_lastused=True`. Any exception raised while storing is logged as a
    warning and not propagated.

    Parameters
    ----------
    srtype: str
      Special remote type label, used as the prefix of a default name.
    credman: CredentialManager
      Used to look up and store the credential.
    credname: str or None
      Name to store the credential under. If empty, a default name is
      generated.
    credprops: dict
      Credential properties, passed on to `credman.set()`.
    credtype_hint: optional
      Passed as `_type_hint` to `credman.get()` when checking whether the
      default name is already taken.
    duplicate_hint: str, optional
      Additional text appended to the warning that is issued when the
      default name is already taken.
    """
    if not credname:
        # name could still be None, if this was entered
        # create a default name, and check if it has not been used
        credname = '{type}{udelim}{user}{delim}{realm}'.format(
            type=srtype,
            udelim='-' if 'user' in credprops else '',
            user=credprops.get('user', ''),
            delim='-' if 'realm' in credprops else '',
            realm=credprops.get('realm', ''),
        )
        if credman.get(
                name=credname,
                # give to make legacy credentials accessible
                _type_hint=credtype_hint):
            # this is already in use, do not override
            lgr.warning(
                'The entered credential will not be stored, '
                'a credential with the default name %r already exists.%s',
                credname, f' {duplicate_hint}' if duplicate_hint else '')
            return
    # we have used a credential, store it with updated usage info
    try:
        credman.set(credname, _lastused=True, **credprops)
    except Exception as e:
        from datalad_next.exceptions import CapturedException
        # we do not want to crash for any failure to store a
        # credential
        # NOTE(review): `credprops` is logged verbatim -- if it can carry
        # a 'secret' entry, that value ends up in the log; confirm
        # `Logger.warn` is a deprecated alias, use `warning` like above
        lgr.warning(
            'Exception raised when storing credential %r %r: %s',
            credname, credprops, CapturedException(e),
        )
# Per special remote type: which environment variable takes which
# credential property. This is unpleasantly non-generic, but only a small
# subset of git-annex special remotes require credentials to be given
# via ENV vars, and all of rclone's handle it internally.
#
# It makes no sense to pull a short-lived access token from a credential
# store; it can be given via AWS_SESSION_TOKEN in any case.
specialremote_credential_envmap = {
    'glacier': {
        'user': 'AWS_ACCESS_KEY_ID',  # nosec
        'secret': 'AWS_SECRET_ACCESS_KEY',  # nosec
    },
    's3': {
        'user': 'AWS_ACCESS_KEY_ID',  # nosec
        'secret': 'AWS_SECRET_ACCESS_KEY',  # nosec
    },
    'webdav': {
        'user': 'WEBDAV_USERNAME',  # nosec
        'secret': 'WEBDAV_PASSWORD',  # nosec
    },
}
def needs_specialremote_credential_envpatch(remote_type):
    """Report whether credentials must be placed into the environment

    Parameters
    ----------
    remote_type : str
      Special remote type, looked up in `specialremote_credential_envmap`.

    Returns
    -------
    bool
      False, if the special remote type is not recognized as one needing
      credentials, or if all environment variables for its credentials
      are already defined. True, otherwise.
    """
    try:
        # names of all variables this remote type takes its credential from
        env_vars = specialremote_credential_envmap[remote_type].values()
    except KeyError:
        lgr.debug('Special remote type %r not supported for credential setup',
                  remote_type)
        return False

    if any(var not in os.environ for var in env_vars):
        # at least one variable is undefined, no counterevidence
        return True

    # the ENV is fully set up
    # let's prefer the environment to behave like git-annex
    lgr.debug(
        'Not deploying credentials for special remote type %r, '
        'already present in environment',
        remote_type)
    return False
def get_specialremote_credential_envpatch(remote_type, cred):
    """Create an environment patch for a particular remote type and credential

    Only variables that are not yet defined in the environment are
    included, hence any partial setup already present is left untouched.

    Parameters
    ----------
    remote_type : str
      Special remote type, looked up in `specialremote_credential_envmap`.
    cred : dict
      Credential properties, keyed by the property names used in
      `specialremote_credential_envmap` (e.g. 'user', 'secret').

    Returns
    -------
    dict or None
      A dict with all required items to patch the environment, or None
      if not enough information is available (unknown remote type, or the
      credential lacks a property that would need to be deployed), or
      nothing needs to be patched.
    """
    env_map = specialremote_credential_envmap.get(remote_type, {})
    try:
        patch = {
            # take whatever partial setup the ENV has already
            var: cred[prop]
            for prop, var in env_map.items()
            if var not in os.environ
        }
    except KeyError:
        # a property that needs deploying is missing from the credential:
        # report "not enough information", as documented, instead of
        # crashing
        return None
    return patch or None
class _MissingParamError(AttributeError, KeyError):
    """Raised by `ParamDictator` on access to an unknown parameter

    It is an `AttributeError`, as required of `__getattr__`, so that
    `hasattr()` and `getattr()` with a default work. It is also a
    `KeyError`, so that code catching the exception of the underlying
    dict lookup keeps working.
    """


class ParamDictator:
    """Parameter dict access helper

    This class can be used to wrap a dict containing function parameter
    name-value mapping, and get/set values by parameter name attribute
    rather than via the ``__getitem__`` dict API.

    The wrapped dict is not copied: attribute assignment modifies the
    dict that was passed to the constructor.

    Access to a name that is not a key of the wrapped dict raises an
    exception that is both an ``AttributeError`` and a ``KeyError``.
    """
    def __init__(self, params: Dict):
        # name-mangled to `_ParamDictator__params`; `__setattr__` relies
        # on that prefix to tell internal state from parameters
        self.__params = params

    def __getattr__(self, attr: str):
        # only called when regular attribute lookup has failed
        if attr.startswith(('__', '_ParamDictator__')):
            # dunder names are never parameters. Private names end up here
            # only when the instance was not initialized (e.g. created via
            # `__new__`); failing right away avoids unbounded recursion
            # through the `self.__params` access below
            raise AttributeError(
                f'{type(self).__name__!r} object has no attribute {attr!r}')
        try:
            return self.__params[attr]
        except KeyError as e:
            raise _MissingParamError(attr) from e

    def __setattr__(self, attr: str, value: Any):
        if attr.startswith('_ParamDictator__'):
            # internal state goes to the instance itself
            super().__setattr__(attr, value)
        else:
            # anything else is a parameter and goes to the wrapped dict
            self.__params[attr] = value