Source code for taskflow.utils.redis_utils
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import enum
import redis
from redis import exceptions as redis_exceptions
import six
def _raise_on_closed(meth):
@six.wraps(meth)
def wrapper(self, *args, **kwargs):
if self.closed:
raise redis_exceptions.ConnectionError("Connection has been"
" closed")
return meth(self, *args, **kwargs)
return wrapper
[docs]class RedisClient(redis.StrictRedis):
"""A redis client that can be closed (and raises on-usage after closed).
TODO(harlowja): if https://github.com/andymccurdy/redis-py/issues/613 ever
gets resolved or merged or other then we can likely remove this.
"""
def __init__(self, *args, **kwargs):
super(RedisClient, self).__init__(*args, **kwargs)
self.closed = False
def close(self):
self.closed = True
self.connection_pool.disconnect()
execute_command = _raise_on_closed(redis.StrictRedis.execute_command)
transaction = _raise_on_closed(redis.StrictRedis.transaction)
pubsub = _raise_on_closed(redis.StrictRedis.pubsub)
[docs]class UnknownExpire(enum.IntEnum):
"""Non-expiry (not ttls) results return from :func:`.get_expiry`.
See: http://redis.io/commands/ttl or http://redis.io/commands/pttl
"""
DOES_NOT_EXPIRE = -1
"""
The command returns ``-1`` if the key exists but has no associated expire.
"""
#: The command returns ``-2`` if the key does not exist.
KEY_NOT_FOUND = -2
DOES_NOT_EXPIRE = UnknownExpire.DOES_NOT_EXPIRE
KEY_NOT_FOUND = UnknownExpire.KEY_NOT_FOUND
_UNKNOWN_EXPIRE_MAPPING = dict((e.value, e) for e in list(UnknownExpire))
[docs]def get_expiry(client, key, prior_version=None):
"""Gets an expiry for a key (using **best** determined ttl method)."""
is_new_enough, _prior_version = is_server_new_enough(
client, (2, 6), prior_version=prior_version)
if is_new_enough:
result = client.pttl(key)
try:
return _UNKNOWN_EXPIRE_MAPPING[result]
except KeyError:
return result / 1000.0
else:
result = client.ttl(key)
try:
return _UNKNOWN_EXPIRE_MAPPING[result]
except KeyError:
return float(result)
[docs]def apply_expiry(client, key, expiry, prior_version=None):
"""Applies an expiry to a key (using **best** determined expiry method)."""
is_new_enough, _prior_version = is_server_new_enough(
client, (2, 6), prior_version=prior_version)
if is_new_enough:
# Use milliseconds (as that is what pexpire uses/expects...)
ms_expiry = expiry * 1000.0
ms_expiry = max(0, int(ms_expiry))
result = client.pexpire(key, ms_expiry)
else:
# Only supports seconds (not subseconds...)
sec_expiry = int(expiry)
sec_expiry = max(0, sec_expiry)
result = client.expire(key, sec_expiry)
return bool(result)
[docs]def is_server_new_enough(client, min_version,
default=False, prior_version=None):
"""Checks if a client is attached to a new enough redis server."""
if not prior_version:
try:
server_info = client.info()
except redis_exceptions.ResponseError:
server_info = {}
version_text = server_info.get('redis_version', '')
else:
version_text = prior_version
version_pieces = []
for p in version_text.split("."):
try:
version_pieces.append(int(p))
except ValueError:
break
if not version_pieces:
return (default, version_text)
else:
version_pieces = tuple(version_pieces)
return (version_pieces >= min_version, version_text)