"""
This is a straightforward Python wrapper for ssdeep by Jesse Kornblum,
which is a library for computing Context Triggered Piecewise Hashes (CTPH).
"""
import os
import six
from ssdeep.__about__ import (
__author__, __copyright__, __email__, __license__, __summary__, __title__,
__uri__, __version__
)
from ssdeep.binding import Binding
# One shared binding instance, created at import time; the classes and
# functions in this module reach the native library through `binding.lib`.
binding = Binding()
# Shortcut to the binding's FFI handle (used below for `ffi.NULL`,
# `ffi.new("char[]", ...)` result buffers and `ffi.string(...)`).
ffi = binding.ffi
class BaseError(Exception):
    """The base for all other Exceptions raised by this module."""
class InternalError(BaseError):
    """Raised if lib returns internal error"""
class Hash(object):
    """
    Hashlib like object. It is only supported with ssdeep/libfuzzy >= 2.10.

    The instance owns a native state object obtained from ``fuzzy_new`` and
    releases it with ``fuzzy_free`` when it is garbage collected.

    :raises InternalError: If lib returns internal error
    :raises NotImplementedError: Required functions are not available
    """

    def __init__(self):
        # Assign the attribute before anything can raise, so that __del__
        # always finds it even when construction fails part-way.
        self._state = ffi.NULL

        if not hasattr(binding.lib, "fuzzy_new"):
            raise NotImplementedError("Only supported with ssdeep >= 2.10")

        self._state = binding.lib.fuzzy_new()
        if self._state == ffi.NULL:
            raise InternalError("Unable to create state object")

    def update(self, buf, encoding="utf-8"):
        """
        Feed the data contained in the given buffer to the state.

        If the library reports an error the state object is released and the
        instance becomes unusable: further calls raise ``InternalError``.

        :param String|Byte buf: The data to be hashed
        :param String encoding: Encoding is used if buf is String
        :raises InternalError: If lib returns an internal error
        :raises TypeError: If buf is not Bytes, String or Unicode
        """
        if self._state == ffi.NULL:
            raise InternalError("State object is NULL")

        if isinstance(buf, six.text_type):
            buf = buf.encode(encoding)

        if not isinstance(buf, six.binary_type):
            raise TypeError(
                "Argument must be of string, unicode or bytes type not "
                "'%r'" % type(buf)
            )

        if binding.lib.fuzzy_update(self._state, buf, len(buf)) != 0:
            binding.lib.fuzzy_free(self._state)
            # Forget the freed pointer; leaving it in place would make
            # __del__ call fuzzy_free on it a second time (double free).
            self._state = ffi.NULL
            raise InternalError("Invalid state object")

    def digest(self, elimseq=False, notrunc=False):
        """
        Obtain the fuzzy hash.

        This operation does not change the state at all. It reports the hash
        for the concatenation of the data previously fed using update().

        :param Boolean elimseq: Pass ``FUZZY_FLAG_ELIMSEQ`` to
            ``fuzzy_digest`` (see libfuzzy's ``fuzzy.h`` for its effect)
        :param Boolean notrunc: Pass ``FUZZY_FLAG_NOTRUNC`` to
            ``fuzzy_digest`` (see libfuzzy's ``fuzzy.h`` for its effect)
        :return: The fuzzy hash
        :rtype: String
        :raises InternalError: If lib returns an internal error
        """
        if self._state == ffi.NULL:
            raise InternalError("State object is NULL")

        flags = 0
        if elimseq:
            flags |= binding.lib.FUZZY_FLAG_ELIMSEQ
        if notrunc:
            flags |= binding.lib.FUZZY_FLAG_NOTRUNC

        # Output buffer sized by the library's own maximum result length.
        result = ffi.new("char[]", binding.lib.FUZZY_MAX_RESULT)
        if binding.lib.fuzzy_digest(self._state, result, flags) != 0:
            raise InternalError("Function returned an unexpected error code")

        return ffi.string(result).decode("ascii")

    def __del__(self):
        # Release the native state exactly once; it is already NULL when
        # construction failed or update() freed it after an error.
        if self._state != ffi.NULL:
            binding.lib.fuzzy_free(self._state)
            self._state = ffi.NULL
class PseudoHash(object):
    """
    Hashlib like object. Use this class only if Hash() isn't supported by your
    ssdeep/libfuzzy library. This class stores the provided data in memory, so
    be careful when hashing large files.
    """

    def __init__(self):
        # Everything passed to update() is accumulated here as bytes.
        self._data = b""

    def update(self, buf, encoding="utf-8"):
        """
        Feed the data contained in the given buffer to the state.

        :param String|Byte buf: The data to be hashed
        :param String encoding: Encoding is used if buf is String
        :raises TypeError: If buf is not Bytes, String or Unicode
        """
        if isinstance(buf, six.text_type):
            buf = buf.encode(encoding)

        if not isinstance(buf, six.binary_type):
            raise TypeError(
                "Argument must be of string, unicode or bytes type not "
                "'%r'" % type(buf)
            )

        self._data = self._data + buf

    def digest(self, elimseq=False, notrunc=False):
        """
        Obtain the fuzzy hash.

        This operation does not change the state at all. It reports the hash
        for the concatenation of the data previously fed using update().

        ``elimseq`` and ``notrunc`` are accepted so the signature matches
        ``Hash.digest()``, but this implementation ignores them: the buffered
        data is simply handed to this module's ``hash()`` function.

        :param Boolean elimseq: Ignored
        :param Boolean notrunc: Ignored
        :return: The fuzzy hash
        :rtype: String
        """
        # `hash` is this module's fuzzy hash function, not the builtin.
        return hash(self._data)
def compare(sig1, sig2):
    """
    Computes the match score between two fuzzy hash signatures.

    Returns a value from zero to 100 indicating the match score of the
    two signatures. A match score of zero indicates the signatures
    did not match.

    Text signatures are encoded as ASCII before being handed to the library,
    so text containing non-ASCII characters fails in ``encode`` before any
    comparison takes place.

    :param Bytes|String sig1: First fuzzy hash signature
    :param Bytes|String sig2: Second fuzzy hash signature
    :return: Match score (0-100)
    :rtype: Integer
    :raises InternalError: If lib returns an internal error
    :raises TypeError: If sig is not String, Unicode or Bytes
    """
    if isinstance(sig1, six.text_type):
        sig1 = sig1.encode("ascii")
    if isinstance(sig2, six.text_type):
        sig2 = sig2.encode("ascii")

    if not isinstance(sig1, six.binary_type):
        raise TypeError(
            "First argument must be of string, unicode or bytes type not "
            "'%r'" % type(sig1)
        )

    if not isinstance(sig2, six.binary_type):
        raise TypeError(
            "Second argument must be of string, unicode or bytes type not "
            "'%r'" % type(sig2)
        )

    res = binding.lib.fuzzy_compare(sig1, sig2)
    # A negative return value is the library's error indication.
    if res < 0:
        raise InternalError("Function returned an unexpected error code")

    return res
def hash(buf, encoding="utf-8"):
    """
    Compute the fuzzy hash of a buffer

    :param String|Bytes buf: The data to be fuzzy hashed
    :param String encoding: Encoding is used if buf is String
    :return: The fuzzy hash
    :rtype: String
    :raises InternalError: If lib returns an internal error
    :raises TypeError: If buf is not String or Bytes
    """
    if isinstance(buf, six.text_type):
        buf = buf.encode(encoding)

    if not isinstance(buf, six.binary_type):
        raise TypeError(
            "Argument must be of string, unicode or bytes type not "
            "'%r'" % type(buf)
        )

    # allocate memory for result
    result = ffi.new("char[]", binding.lib.FUZZY_MAX_RESULT)
    if binding.lib.fuzzy_hash_buf(buf, len(buf), result) != 0:
        raise InternalError("Function returned an unexpected error code")

    return ffi.string(result).decode("ascii")
def hash_from_file(filename):
    """
    Compute the fuzzy hash of a file.

    Opens, reads, and hashes the contents of the file 'filename'

    :param String|Bytes filename: The name of the file to be hashed. Text
        names are encoded as UTF-8 for the library; bytes are passed as-is.
    :return: The fuzzy hash of the file
    :rtype: String
    :raises IOError: If Python is unable to read the file
    :raises InternalError: If lib returns an internal error
    """
    # Validate up front so callers get a descriptive IOError instead of the
    # library's generic error code.
    if not os.path.exists(filename):
        raise IOError("Path not found")
    if not os.path.isfile(filename):
        raise IOError("File not found")
    if not os.access(filename, os.R_OK):
        raise IOError("File is not readable")

    # Only text needs encoding; calling .encode() on a bytes filename raises
    # AttributeError on Python 3 although bytes are a documented input type.
    if isinstance(filename, six.text_type):
        filename = filename.encode("utf-8")

    result = ffi.new("char[]", binding.lib.FUZZY_MAX_RESULT)
    if binding.lib.fuzzy_hash_filename(filename, result) != 0:
        raise InternalError("Function returned an unexpected error code")

    return ffi.string(result).decode("ascii")