#!/usr/bin/python3
import re
import subprocess
import os
import threading
import sys
from PyQt5.QtCore import QObject
from PyQt5.QtCore import QThread
from PyQt5.QtCore import QTimer
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtCore import pyqtSlot
[docs]class Pygphoto(QObject):
"""Allows simple operations on a USB connected camera by interfacing
the gphoto2 command line tool.
This class allows to interact with a USB camera. List the names of
the photos present in the camera, watch for new files, and
eventually download the photos individually. Needs a QApplication
to be instantiated in order to watch properly for events.
Args:
watch_camera=False (bool): Should the daemon thread watch for camera connection.
watch_files=False (bool): Should the daemon thread watch for files creation/deletion.
"""
# Constants
# Command lines string value
_GPHOTO = "gphoto2"
# Camera events check period in milliseconds
_EVENTS_PERIOD = 500
# Signals
onCameraConnection = pyqtSignal(bool)
"""``pyqtSignal(bool)`` This signal indicates if a camera is connected."""
onContentChanged = pyqtSignal(list, list)
"""``pyqtSignal(list, list)`` When watching the camera for new files,
emit this signal when there has been some changes in the camera
filesystem. Arguments are the lists of new files and deleted files
"""
_onWatchCamera = pyqtSignal(bool)
"""This signal indicates if the component should start or stop
watching for connection events with a camera."""
_onWatchFile = pyqtSignal(bool)
"""This signal indicates if the component should start or stop
watching for new files on the camera."""
def __init__(self, watch_camera=False, watch_files=False):
# Super constructor
QObject.__init__(self)
# _files_index is an internal dictionnary that associate all the
# files present on the camera, along with their gphoto index
self._files_index = dict()
# Create mutex for the camera
self._camera_lock = threading.Lock()
# Create an internal CameraWatcher
self._camera_watcher = Pygphoto._CameraWatcher(self, watch_camera, watch_files)
# Connect watch file and watch camera signals
self._onWatchFile.connect(self._camera_watcher.set_watching_files)
self._onWatchCamera.connect(self._camera_watcher.set_watching_camera)
# Forward signals from camera_watcher
self._camera_watcher.onCameraConnection.connect(self.__forward_onCameraConnection)
self._camera_watcher.onContentChanged.connect(self.__forward_onContentChanged)
# Create a thread to execute the CameraWatcher in
self._watcher_thread = QThread()
self._camera_watcher.moveToThread(self._watcher_thread)
# Start the watching
self._watcher_thread.started.connect(self._camera_watcher._watch_events)
self._watcher_thread.start()
[docs] def check_camera_connected(self):
"""
Returns:
A boolean indicating the presence of a connected camera.
Raises:
CalledProcessError: when gphoto2 raised an error.
"""
# Try an auto-detect and see if there are results
command = [Pygphoto._GPHOTO, "--auto-detect"]
with self._camera_lock:
output = subprocess.check_output(command).decode("utf-8")
lines = output.splitlines()
# The first two lines are
# Model Port
# ----------------------------------------------------------
# Then comes the list of connected camera (that can be empty)
return (len(lines) > 2)
[docs] def query_camera_name(self):
"""
Returns:
The camera model name, or None.
"""
result = None
model_string = "Model: "
command = [Pygphoto._GPHOTO, "--summary"]
try:
with self._camera_lock:
output = subprocess.check_output(command).decode("utf-8")
lines = output.splitlines()
for line in lines:
if line.startswith(model_string):
# The name follows the model_string
result = line[len(model_string):]
except subprocess.CalledProcessError:
pass
return result
[docs] def query_storage_info(self):
"""
Returns:
A dictionnary of values concerning memory usage {free, occupied,
total} containing values in KB.
Raises:
CalledProcessError: when gphoto2 raised an error.
"""
def first_int(string):
# Take the first result of all the digit-only substrings
return int(re.findall(r"\d+", string)[0])
result = dict()
command = [Pygphoto._GPHOTO, "--storage-info"]
with self._camera_lock:
output = subprocess.check_output(command).decode("utf_8")
output = output.splitlines()
# Scan output for relevant information
for line in output:
if line.startswith("totalcapacity="):
result["total"] = first_int(line)
elif line.startswith("free="):
result["free"] = first_int(line)
# Compute occupied value
result["occupied"] = result["total"] - result["free"]
return result
def _filelist_to_dict(filenames_list):
"""Convert a filename list to a dictionnary associating filenames with
their index.
Args:
filenames_list (list): The list of filenames.
Returns:
A dictionnary associating each filename with their original list index.
"""
# Number each files, starting with 1
return dict(zip(filenames_list, range(1, len(filenames_list) + 1)))
def _query_filename(self, index):
"""Return the filename of the file indexed "index" when listing all
the files present on the camera
Raises a CalledProcessError when gphoto2 raised an error.
"""
# Show info on the file indexed "index"
command = [Pygphoto._GPHOTO, "--show-info", str(index)]
with self._camera_lock:
output = subprocess.check_output(command).decode("utf-8")
# The filename is the fourth word
filename = output.split()[3]
# We remove the trailing simple quotes ("'")
return filename.strip("'")
[docs] def query_file_list(self):
"""Generate the list of filenames for all the files present on the
first camera found by requesting directly the camera.
Returns:
The list of all filenames present on the camera.
Raises:
CalledProcessError: when gphoto2 raised an error.
"""
retval = [] # Result list of filenames
# Grab the output of the list file command
command = [Pygphoto._GPHOTO,"--list-files"]
with self._camera_lock:
output = subprocess.check_output(command).decode("utf-8")
# Parse the output for "#" lines
for line in iter(output.splitlines()):
if line[0] == "#":
# Split every one or more whitespaces
words = line.split()
filename = words[1]
retval.append(filename)
return retval
[docs] def download_file(self, filename, output_dir, overwrite=True, thumbnail=False):
"""Download the file name "filename" and copy it to the given path.
Args:
filename (str): The name of the file to download.
output_dir (str): The directory where the files should be downloaded to.
overwrite=True (bool): If any existing file with the same name should be overwritten.
thumbnail=False (bool): Download the thumbnail instead of the file.
Returns:
0 if succeeded. Else returns the error code returned by
gphoto.
"""
# Check that the output dir is a valid directory
assert(os.path.isdir(output_dir))
# Get the gphoto index of the file and check it is up to date
if(not filename in self._files_index
or not self._query_filename(self._files_index[filename]) == filename):
# Update the files dictionnary
self._files_index = Pygphoto._filelist_to_dict(self.query_file_list())
index = self._files_index[filename]
# The destination is "output_dir/filename
destination_path = os.path.normpath(os.path.join(output_dir, filename))
# Check that the file does not already exist
if(os.path.exists(destination_path)):
if(overwrite):
# First remove the file
os.remove(destination_path)
else:
# Do nothing
return 0
# Determine the command name
if(thumbnail):
command = "--get-thumbnail"
else:
command = "--get-file"
command_line = [Pygphoto._GPHOTO,
command, str(index),
"--filename", destination_path]
with self._camera_lock:
return_code = subprocess.call(command_line)
return return_code
[docs] def download_files(self, filename_list, output_dir, overwrite=True, thumbnail=False):
"""Download the whole list of files to the ouput directory
This is equivalent to calling download_file on every file in
the "filename_list", but should be faster for a large number
of files.
Args:
filename_list (list): The name list of files to download.
output_dir (str): The directory where the files should be downloaded to.
overwrite=True (bool): If any existing file with the same name should be overwritten.
thumbnail=False (bool): Download the thumbnail instead of the file.
Returns:
files_list (list): The paths list of all downloaded files.
"""
# Check that the output dir is a valid directory
assert(os.path.isdir(output_dir))
# Init the result list
result = []
# Determine the command name
if(thumbnail):
command = "--get-thumbnail"
else:
command = "--get-file"
# Update the files dictionnary
self._files_index = Pygphoto._filelist_to_dict(self.query_file_list())
# Download each file
for filename in filename_list:
index = self._files_index[filename]
# The destination is "output_dir/filename
destination_path = os.path.normpath(os.path.join(output_dir, filename))
command_line = [Pygphoto._GPHOTO,
command, str(index),
"--filename", destination_path]
# Check that the file does not already exist
if(os.path.exists(destination_path)):
if(overwrite):
# First remove the file
os.remove(destination_path)
else:
# Do nothing
continue
with self._camera_lock:
return_code = subprocess.call(command_line)
if(return_code == 0):
result.append(destination_path)
return result
[docs] def download_all(self, output_dir, overwrite=True, thumbnail=False):
"""Download all the files present on the camera.
Overwrites preexisting files. Faster than 'download_files()'.
Args:
output_dir (str): The directory where the files should be downloaded to.
overwrite=True (bool): If any existing file with the same name should be overwritten.
thumbnail=False (bool): Download the thumbnail instead of the file.
Returns:
The return code returned by the gphoto2
"""
# Check that the output dir is a valid directory
assert(os.path.isdir(output_dir))
# The destination is "output_dir/filename.suffix
destination_path = os.path.normpath(os.path.join(output_dir, "%f.%C"))
if(thumbnail):
command = "--get-all-thumbnails"
else:
command = "--get-all-files"
command_line = [Pygphoto._GPHOTO,
command,
"--filename",
destination_path]
if(overwrite):
command_line.append("--force-overwrite")
with self._camera_lock:
return_code = subprocess.call(command_line)
return return_code
###############################
# Watching functionality #
###############################
# If we want to forward signals
@pyqtSlot(bool)
def __forward_onCameraConnection(self, boolean):
self.onCameraConnection.emit(boolean)
@pyqtSlot(list, list)
def __forward_onContentChanged(self, newfiles, delfiles):
self.onContentChanged.emit(newfiles, delfiles)
@pyqtSlot(bool)
[docs] def set_watching_files(self, value):
"""Set whether the component should watch for changes in the camera
filesystem.
Args:
value (bool): True for watching.
"""
self._onWatchFile.emit(value)
@pyqtSlot(bool)
[docs] def set_watching_camera(self, value):
"""Set whether the component should watch for presence of a connected
camera.
Args:
value (bool): True for watching.
"""
self._onWatchCamera.emit(value)
class _CameraWatcher(QObject):
"""Internal class responsible for active watching. The threading is
made by moving the instance of this class to a QThread. Using
QThread allows for easy asynchronous communication through the
use of Qt‘s signals and slots.
"""
# Signals
onCameraConnection = pyqtSignal(bool)
"""This signal indicates if a camera is connected"""
onContentChanged = pyqtSignal(list, list)
"""When watching the camera for new files, emit this signal when there
has been some changes in the camera filesystem. Arguments are
the lists of new files and deleted files
"""
def __init__(self, pygphoto, watch_camera, watch_files):
# Super constructor
QObject.__init__(self)
# The reference to the gphoto instance to use
self.__pygph = pygphoto
# _files_index is an internal dictionnary that associate
# all the files present on the camera, along with their
# gphoto index
self._files_index = dict()
# Internal lock for accessing some of the attributes
self.__lock__ = threading.RLock()
# _watching_files indicates if this component is watching for new files
self._watching_files = watch_files
# _watching_camera indicates if this component is watching for camera connections
self._watching_camera = watch_camera
# The memorized state of camera connection
self._camera_connection = False
# The memorized occupied space on camera
self._camera_occupied_space = 0
@pyqtSlot(bool)
def set_watching_files(self, value):
"""Set whether the component should watch for changes in the camera
filesystem.
"""
print("Set watching files: " + str(value))
with self.__lock__:
self._watching_files = value
@pyqtSlot(bool)
def set_watching_camera(self, value):
"""Set whether the component should watch for presence of a connected
camera.
"""
with self.__lock__:
self._watching_camera = value
def is_watching_file(self):
"""Indicates whether the component is watching for changes in the camera
filesystem.
"""
with self.__lock__:
return self._watching_files
def is_watching_camera(self):
"""Indicates whether the component is watching for presence of a connected
camera.
"""
with self.__lock__:
return self._watching_camera
@pyqtSlot()
def _watch_events(self):
"""Executed by the watching thread : checks for every events.
"""
# Check for the different events
with self.__lock__:
if self._watching_camera:
self._watch_camera()
if (self._watching_files
and self._camera_connection):
self._watch_files()
# Non blocking 'recursive' call
QTimer.singleShot(Pygphoto._EVENTS_PERIOD, self._watch_events)
def _watch_camera(self):
"""Check for a change in camera connection, possibly raising a
onCameraConnection signal.
"""
new_camera_connection = self.__pygph.check_camera_connected()
if(new_camera_connection != self._camera_connection):
# Raise a signal
self.onCameraConnection.emit(new_camera_connection)
# Set new state
self._camera_connection = new_camera_connection
def _watch_files(self):
"""Check for changes in the camera filesystem, possibly raising a
onContentChanged signal.
"""
try:
new_occupied_space = self.__pygph.query_storage_info()
if(new_occupied_space != self._camera_occupied_space):
# Search for new and deleted files
diff = self._diff_files()
# Check there are actually some changes
if(diff[0] or diff[1]):
# Raise a signal
self.onContentChanged.emit(diff[0], diff[1])
# Set new state
self._camera_occupied_space = new_occupied_space
except subprocess.CalledProcessError:
pass
def _diff_files(self):
"""Query the camera and return the couple of lists (new_files,
deleted_files) relatively to the last check.
"""
new_files = []
deleted_files = []
# Copy last index
last_files_index = self._files_index.copy()
# Update index
self._files_index = Pygphoto._filelist_to_dict(self.__pygph.query_file_list())
# Check for new files
for recent_file in self._files_index:
# Check recent_files was already present
if not(recent_file in last_files_index):
new_files.append(recent_file)
# Check for deleted files
for last_file in last_files_index:
# Check last_files is still present
if not(last_file in self._files_index):
deleted_files.append(last_file)
return (new_files, deleted_files)
if __name__ == "__main__":
# TESTINGS
from PyQt5.QtWidgets import QApplication
class TestPygphoto(QObject):
def __init__(self):
QObject.__init__(self)
@pyqtSlot(bool)
def connectCamera(self, boolean):
print("Camera connected : " + str(boolean))
@pyqtSlot(list, list)
def newFiles(self, new, deleted):
print("New files : " + str(new))
print("Deleted files : " + str(deleted))
app = QApplication(sys.argv)
testpygph = TestPygphoto()
pygph = Pygphoto(watch_camera=True, watch_files=True)
pygph.onCameraConnection.connect(testpygph.connectCamera)
pygph.onContentChanged.connect(testpygph.newFiles)
print("\n~~~~~~~~ query_camera_name")
print(pygph.query_camera_name())
print("\n~~~~~~~~ _query_filename")
filename = pygph._query_filename(1)
print(filename)
print("\n~~~~~~~~ _query_file_list")
filelist = pygph.query_file_list()
print("\n~~~~~~~~ download_file False")
print(pygph.download_file(filename, "test", overwrite=False))
print("\n~~~~~~~~ download_file False")
print(pygph.download_file(filename, "test", overwrite=False))
print("\n~~~~~~~~ download_file True")
print(pygph.download_file(filename, "test", overwrite=True))
print("\n~~~~~~~~ download_all_thumbnails")
print(pygph.download_all("thumbnails", thumbnail=True))
print("\n~~~~~~~~ download_files False")
print(pygph.download_files(filelist, "test", overwrite=False))
print("\n~~~~~~~~ download_all")
print(pygph.download_all("test"))
input("Waiting for events now. Press Return to disable watching.\n\n")
pygph.set_watching_camera(False)
pygph.set_watching_files(False)
input("Watching disabled. Press Return to end.\n\n")