feat: first release

2025-05-24 22:46:35 +07:00
commit 9c6d9c491f
8 changed files with 1378 additions and 0 deletions

184
.gitignore vendored Normal file

@@ -0,0 +1,184 @@
zen_sync_config.json
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

21
LICENSE Normal file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Yuzu
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

81
README.md Normal file

@@ -0,0 +1,81 @@
# 🧘‍♀️ Zen-Sync

A Windows-only command-line tool for syncing [Zen Browser](https://zen-browser.app/) data with S3-compatible storage services.

## 🤔 What it does

Since Zen Browser doesn't have proper profile sync yet, this is my quick solution, built in a few hours, to keep my stuff in sync across multiple machines.

It backs up all the important stuff to any S3-compatible cloud storage so you can restore or "sync" your profile anywhere. No more manually dragging around profile folders every time you edit a setting 🥹🥹😭. I'm so done with that.

The default (customizable) settings skip session cookies, temporary storage, and other data, because sites I visit can detect copied sessions through fingerprinting and will invalidate them.

## ✨ Features

- 🔄 **Bidirectional sync** between local and S3 storage
- 🔍 **Filtering** - only syncs important files, excludes cache and temporary data
- **"Incremental" sync** - only uploads/downloads changed files (see the note below)
- 🔗 **Custom S3 endpoints** - works with any S3-compatible service
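
Change detection works by storing an MD5 hash of each file in the S3 object's metadata and comparing it against the local copy on the next run; when the storage provider doesn't accept custom metadata (or metadata is disabled in the config), it falls back to comparing file sizes. See `_files_are_different` in `sync.py`.
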
## 📋 What gets synced by default
**Included:**
- 📁 Profile configuration (`profiles.ini`, `installs.ini`, `compatibility.ini`)
- 🗃️ Profile Groups databases (`Profile Groups/*.sqlite`)
- 📚 Bookmarks (`places.sqlite`, `bookmarks.html`)
- 🔒 Saved passwords and certificates (`key4.db`, `cert9.db`, `logins.json`)
- 🧩 Extensions and their settings (`extensions.json`, `extension-*.json`)
- 🎨 Custom themes and CSS (`zen-*.json`, `zen-*.css`, `userChrome.css`, `userContent.css`)
- ⚙️ Browser preferences (`prefs.js`, `user.js`)
- 🔍 Search engine settings (`search.json.mozlz4`)
- 🖼️ Favicons (`favicons.sqlite`)
- 📂 Chrome folder customizations (`chrome/**/*`)
- 📔 and other files from the customizable ruleset

**Excluded:**
- 🗑️ Cache files (`cache2/*`, `thumbnails/*`, `shader-cache/*`)
- 📜 Logs and crash reports (`logs/*`, `crashes/*`, `minidumps/*`)
- 🔒 Lock files (`*.lock`, `*.lck`, `parent.lock`)
- 💾 Temporary storage (`storage/temporary/*`, `storage/*/ls/*`)
- 📋 Session data (`sessionstore.jsonlz4`, `sessionCheckpoints.json`)
- 🍪 Session cookies (`cookies.sqlite*`)
- 🛡️ Temporary browsing data (`webappsstore.sqlite*`, `safebrowsing/*`)

Use `--help` with any command for detailed options.
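
Both lists are plain glob-style patterns (matched with `fnmatch`) stored in the `sync` section of the config file, so you can adjust them without touching the code. A trimmed excerpt of the defaults from `config.py`:

```json
{
  "sync": {
    "exclude_patterns": ["*.lock", "cookies.sqlite*", "cache2/*", "storage/temporary/*"],
    "include_important": ["prefs.js", "places.sqlite", "key4.db", "zen-*.css", "chrome/**/*"]
  }
}
```
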
## 🚀 Quick Start
1. ⚙️ **Configure your S3 settings:**
```bash
python zensync.py configure --bucket your-bucket-name --endpoint-url https://your-s3-endpoint.com
```

or just run `python zensync.py configure` and then edit the configuration JSON manually.

2. ⬆️ **Upload your profiles:**
```bash
python zensync.py upload
```
3. ⬇️ **Download profiles on another machine:**
```bash
python zensync.py download
```
4. 🔄 **Two-way sync:**
```bash
python zensync.py sync
```
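
The `upload`, `download`, and `sync` commands accept a few extra flags (run them with `--help` for the full list); for example:

```bash
# Preview only - nothing is actually uploaded or deleted
python zensync.py upload --dry-run

# Skip the incremental comparison and re-upload everything
python zensync.py upload --force-full

# Also remove S3 objects that no longer exist locally
python zensync.py upload --cleanup
```
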
## 🎮 Main Commands
- ⚙️ `configure` - Set up S3 credentials and paths
- ⬆️ `upload` - Backup profiles to S3
- ⬇️ `download` - Restore profiles from S3
- 🔄 `sync` - Bidirectional synchronization
- 📋 `list-profiles` - Show available local profiles
- `profile-info` - Display profile system information
## 📝 Configuration
Settings are stored in `zen_sync_config.json`.
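
The `aws` section holds the connection details; a minimal example (the values below are placeholders - leave the key fields empty to use an AWS profile or your default credentials instead):

```json
{
  "aws": {
    "region": "us-east-1",
    "bucket": "your-bucket-name",
    "prefix": "zen-profiles/",
    "endpoint_url": "https://your-s3-endpoint.com",
    "access_key_id": "",
    "secret_access_key": "",
    "profile": ""
  }
}
```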

259
cli.py Normal file

@@ -0,0 +1,259 @@
import argparse
import sys
import json
import logging

from config import ZenSyncConfig
from sync import ZenS3Sync

logger = logging.getLogger(__name__)


def create_parser():
    parser = argparse.ArgumentParser(
        description="Zen Browser Profile S3 Sync Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  zensync upload --bucket my-backup-bucket
  zensync download --bucket my-backup-bucket
  zensync sync --bucket my-backup-bucket
  zensync configure --bucket my-bucket --endpoint-url http://localhost:9000
  zensync list-profiles
"""
    )
    parser.add_argument('--config', default='zen_sync_config.json', help='Configuration file path')
    parser.add_argument('--roaming-path', help='Override Zen roaming data path')
    parser.add_argument('--local-path', help='Override Zen local data path')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')

    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # Upload command
    upload_parser = subparsers.add_parser('upload', help='Upload profiles to S3')
    upload_parser.add_argument('--bucket', help='S3 bucket name')
    upload_parser.add_argument('--prefix', default='zen-profiles/', help='S3 key prefix')
    upload_parser.add_argument('--dry-run', action='store_true', help='Show what would be uploaded')
    upload_parser.add_argument('--no-cache', action='store_true', help='Disable cache data upload')
    upload_parser.add_argument('--force-full', action='store_true', help='Force full upload')
    upload_parser.add_argument('--cleanup', action='store_true', help='Remove S3 files that no longer exist locally')

    # Download command
    download_parser = subparsers.add_parser('download', help='Download profiles from S3')
    download_parser.add_argument('--bucket', help='S3 bucket name')
    download_parser.add_argument('--prefix', default='zen-profiles/', help='S3 key prefix')
    download_parser.add_argument('--dry-run', action='store_true', help='Show what would be downloaded')
    download_parser.add_argument('--no-cache', action='store_true', help='Disable cache data download')
    download_parser.add_argument('--force-full', action='store_true', help='Force full download')
    download_parser.add_argument('--cleanup', action='store_true', help='Remove local files that no longer exist in S3')

    # Sync command
    sync_parser = subparsers.add_parser('sync', help='Bidirectional sync between local and S3')
    sync_parser.add_argument('--bucket', help='S3 bucket name')
    sync_parser.add_argument('--prefix', default='zen-profiles/', help='S3 key prefix')
    sync_parser.add_argument('--dry-run', action='store_true', help='Show what would be synced')
    sync_parser.add_argument('--no-cache', action='store_true', help='Disable cache data sync')
    sync_parser.add_argument('--cleanup', action='store_true', help='Remove orphaned files')

    # List profiles command
    subparsers.add_parser('list-profiles', help='List available local profiles')

    # Profile info command
    subparsers.add_parser('profile-info', help='Show profile system information')

    # Configure command
    config_parser = subparsers.add_parser('configure', help='Configure sync settings')
    config_parser.add_argument('--bucket', help='Set S3 bucket name')
    config_parser.add_argument('--region', help='Set AWS region')
    config_parser.add_argument('--endpoint-url', help='Set S3-compatible service endpoint')
    config_parser.add_argument('--access-key', help='Set AWS access key ID')
    config_parser.add_argument('--secret-key', help='Set AWS secret access key')
    config_parser.add_argument('--profile', help='Set AWS profile name')
    config_parser.add_argument('--roaming-path', help='Set Zen roaming data path')
    config_parser.add_argument('--local-path', help='Set Zen local data path')
    config_parser.add_argument('--auto-detect', action='store_true', help='Auto-detect Zen browser paths')
    config_parser.add_argument('--enable-cache-sync', action='store_true', help='Enable cache data sync')
    config_parser.add_argument('--disable-cache-sync', action='store_true', help='Disable cache data sync')
    config_parser.add_argument('--disable-metadata', action='store_true', help='Disable S3 metadata')
    config_parser.add_argument('--enable-metadata', action='store_true', help='Enable S3 metadata')
    config_parser.add_argument('--signature-version', choices=['s3', 's3v4'], help='Set AWS signature version')

    return parser


def handle_configure(args, config):
    """Handle configure command"""
    if args.bucket:
        config.config['aws']['bucket'] = args.bucket
    if args.region:
        config.config['aws']['region'] = args.region
    if getattr(args, 'endpoint_url', None):
        config.config['aws']['endpoint_url'] = args.endpoint_url
        logger.info(f"Using custom S3 endpoint: {args.endpoint_url}")
    if args.access_key:
        config.config['aws']['access_key_id'] = args.access_key
        logger.warning("Storing AWS access key in config file")
    if args.secret_key:
        config.config['aws']['secret_access_key'] = args.secret_key
        logger.warning("Storing AWS secret key in config file")
    if args.profile:
        config.config['aws']['profile'] = args.profile
        config.config['aws']['access_key_id'] = ""
        config.config['aws']['secret_access_key'] = ""
        logger.info(f"Configured to use AWS profile: {args.profile}")
    if args.roaming_path:
        config.config['sync']['zen_roaming_path'] = args.roaming_path
    if args.local_path:
        config.config['sync']['zen_local_path'] = args.local_path
    if args.auto_detect:
        auto_paths = config.auto_detect_zen_paths()
        if auto_paths['roaming']:
            config.config['sync']['zen_roaming_path'] = auto_paths['roaming']
            print(f"Auto-detected roaming path: {auto_paths['roaming']}")
        if auto_paths['local']:
            config.config['sync']['zen_local_path'] = auto_paths['local']
            print(f"Auto-detected local path: {auto_paths['local']}")
    if args.enable_cache_sync:
        config.config['sync']['sync_cache_data'] = True
    if args.disable_cache_sync:
        config.config['sync']['sync_cache_data'] = False
    if getattr(args, 'disable_metadata', False):
        config.config['aws']['disable_metadata'] = True
        logger.info("S3 metadata disabled")
    if getattr(args, 'enable_metadata', False):
        config.config['aws']['disable_metadata'] = False
        logger.info("S3 metadata enabled")
    if getattr(args, 'signature_version', None):
        config.config['aws']['signature_version'] = args.signature_version
        logger.info(f"AWS signature version set to: {args.signature_version}")

    config.save_config()

    display_config = json.loads(json.dumps(config.config))
    if display_config['aws'].get('secret_access_key'):
        display_config['aws']['secret_access_key'] = "***HIDDEN***"
    print("\nConfiguration updated:")
    print(json.dumps(display_config, indent=2))


def handle_list_profiles(sync):
    """Handle list-profiles command"""
    profiles = sync.list_profiles()
    if profiles:
        print(f"\nAvailable Zen Browser Profiles:")
        print("=" * 70)
        for profile_id, info in profiles.items():
            status = " (Default)" if info['is_default'] else ""
            print(f"{info['name']}{status}")
            print(f" Profile ID: {profile_id}")
            print(f" Path: {info['path']}")
            print(f" Store ID: {info.get('store_id', 'N/A')}")
            print(f" Full Path: {info['full_path']}")
            print()
    else:
        print("No profiles found")


def handle_profile_info(sync):
    """Handle profile-info command"""
    info = sync.get_profile_info()
    print(f"\nZen Browser Profile System Information:")
    print("=" * 70)
    print(f"System Type: {info['system_type']}")
    print("\nPaths:")
    for path_name, path_value in info['paths'].items():
        print(f" {path_name}: {path_value}")
    print(f"\nProfiles Found: {len(info['profiles'])}")
    if info['profiles']:
        for profile_id, profile_info in info['profiles'].items():
            status = " (Default)" if profile_info['is_default'] else ""
            print(f"{profile_info['name']}{status}")
    if 'profile_groups' in info:
        print(f"\nProfile Groups:")
        if info['profile_groups'].get('exists'):
            print(f" Path: {info['profile_groups']['path']}")
            print(f" Databases: {', '.join(info['profile_groups'].get('databases', []))}")
        else:
            print(" Not found")


def run_cli():
    """Main CLI entry point"""
    parser = create_parser()
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    config = ZenSyncConfig(args.config)
    if args.roaming_path:
        config.config['sync']['zen_roaming_path'] = args.roaming_path
    if args.local_path:
        config.config['sync']['zen_local_path'] = args.local_path

    if args.command == 'configure':
        handle_configure(args, config)
        return

    if args.command in ['upload', 'download', 'sync']:
        if args.bucket:
            config.config['aws']['bucket'] = args.bucket
        if args.prefix:
            config.config['aws']['prefix'] = args.prefix
        if hasattr(args, 'no_cache') and args.no_cache:
            config.config['sync']['sync_cache_data'] = False
            logger.info("Cache sync disabled for this operation")

    if not args.command:
        parser.print_help()
        return

    try:
        require_s3 = args.command not in ['list-profiles', 'profile-info']
        if args.command in ['upload', 'download', 'sync'] and hasattr(args, 'dry_run') and args.dry_run:
            require_s3 = True
            logger.info("Dry run mode: Will analyze existing S3 objects")

        sync = ZenS3Sync(config, require_s3=require_s3)

        if args.command == 'upload':
            incremental = not getattr(args, 'force_full', False)
            cleanup = getattr(args, 'cleanup', False)
            success = sync.upload_to_s3(
                dry_run=args.dry_run,
                incremental=incremental,
                cleanup=cleanup
            )
            sys.exit(0 if success else 1)
        elif args.command == 'download':
            incremental = not getattr(args, 'force_full', False)
            cleanup = getattr(args, 'cleanup', False)
            success = sync.download_from_s3(
                dry_run=args.dry_run,
                incremental=incremental,
                cleanup=cleanup
            )
            sys.exit(0 if success else 1)
        elif args.command == 'sync':
            cleanup = getattr(args, 'cleanup', False)
            success = sync.sync_bidirectional(
                dry_run=args.dry_run,
                cleanup=cleanup
            )
            sys.exit(0 if success else 1)
        elif args.command == 'list-profiles':
            handle_list_profiles(sync)
        elif args.command == 'profile-info':
            handle_profile_info(sync)
    except Exception as e:
        logger.error(f"Error: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)

108
config.py Normal file

@@ -0,0 +1,108 @@
import os
import json
import logging
import platform
from pathlib import Path
from typing import Dict

logger = logging.getLogger(__name__)


class ZenSyncConfig:
    """Configuration management for Zen sync operations"""

    def __init__(self, config_file: str = "zen_sync_config.json"):
        self.config_file = config_file
        self.config = self.load_config()

    def load_config(self) -> Dict:
        """Load configuration from file or create default"""
        default_config = {
            "aws": {
                "region": "us-east-1",
                "bucket": "",
                "prefix": "zen-profiles/",
                "endpoint_url": "",
                "disable_metadata": False,
                "signature_version": "s3v4",
                "access_key_id": "",
                "secret_access_key": "",
                "profile": ""
            },
            "sync": {
                "zen_roaming_path": "",
                "zen_local_path": "",
                "sync_cache_data": False,
                "exclude_patterns": [
                    "*.lock", "*.lck", "*-wal", "*-shm", "*-journal",
                    "parent.lock", "cookies.sqlite*", "webappsstore.sqlite*",
                    "storage/temporary/*", "storage/default/*/ls/*", "storage/permanent/*/ls/*",
                    "cache2/*", "jumpListCache/*", "offlineCache/*", "thumbnails/*",
                    "crashes/*", "minidumps/*", "shader-cache/*", "startupCache/*",
                    "safebrowsing/*", "logs/*", "sessionstore-backups/previous.jsonlz4",
                    "sessionstore-backups/upgrade.jsonlz4-*",
                    "Profile Groups/*.sqlite-shm", "Profile Groups/*.sqlite-wal"
                ],
                "include_important": [
                    "*.ini", "prefs.js", "user.js", "userChrome.css", "userContent.css",
                    "bookmarks.html", "places.sqlite", "favicons.sqlite", "key4.db",
                    "cert9.db", "extensions.json", "extension-settings.json",
                    "extension-preferences.json", "search.json.mozlz4", "handlers.json",
                    "containers.json", "zen-*.json", "zen-*.css", "chrome/**/*",
                    "profiles.ini", "installs.ini", "Profile Groups/*.sqlite",
                    "zen-keyboard-shortcuts.json", "zen-themes.json", "sessionstore.jsonlz4",
                    "sessionCheckpoints.json", "logins.json", "compatibility.ini"
                ]
            }
        }
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r') as f:
                    config = json.load(f)
                # Merge with defaults for missing keys
                for key in default_config:
                    if key not in config:
                        config[key] = default_config[key]
                    elif isinstance(default_config[key], dict):
                        for subkey in default_config[key]:
                            if subkey not in config[key]:
                                config[key][subkey] = default_config[key][subkey]
                return config
            except Exception as e:
                logger.warning(f"Error loading config file: {e}. Using defaults.")
        return default_config

    def auto_detect_zen_paths(self) -> Dict[str, str]:
        """Auto-detect Zen browser installation paths"""
        system = platform.system()
        paths = {"roaming": "", "local": ""}
        if system == "Windows":
            roaming = os.path.expandvars(r"%APPDATA%\zen")
            local = os.path.expandvars(r"%LOCALAPPDATA%\zen")
        elif system == "Darwin":
            home = os.path.expanduser("~")
            roaming = os.path.join(home, "Library", "Application Support", "zen")
            local = os.path.join(home, "Library", "Caches", "zen")
        else:
            home = os.path.expanduser("~")
            roaming = os.path.join(home, ".zen")
            local = os.path.join(home, ".cache", "zen")
        if os.path.exists(roaming):
            paths["roaming"] = roaming
        if os.path.exists(local):
            paths["local"] = local
        return paths

    def save_config(self):
        """Save current configuration to file"""
        try:
            with open(self.config_file, 'w') as f:
                json.dump(self.config, f, indent=2)
            logger.info(f"Configuration saved to {self.config_file}")
        except Exception as e:
            logger.error(f"Error saving config: {e}")

666
sync.py Normal file

@@ -0,0 +1,666 @@
import os
import sys
import logging
import configparser
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Set, Tuple, Optional

import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from tqdm import tqdm
import fnmatch
from boto3.session import Config

from config import ZenSyncConfig
from utils import calculate_file_hash, format_size

logger = logging.getLogger(__name__)


class ZenS3Sync:
    """Main sync class for Zen Browser profiles"""

    def __init__(self, config: ZenSyncConfig, require_s3: bool = True):
        self.config = config
        self.s3_client = None
        self.bucket = config.config['aws']['bucket']
        self.prefix = config.config['aws']['prefix']
        self._initialize_paths()
        self.exclude_patterns = config.config['sync']['exclude_patterns']
        self.include_patterns = config.config['sync']['include_important']
        if require_s3:
            if not self.bucket:
                raise ValueError("S3 bucket name must be configured")
            self._init_s3_client()

    def _initialize_paths(self):
        """Initialize Zen browser paths"""
        sync_config = self.config.config['sync']
        auto_paths = self.config.auto_detect_zen_paths()
        self.zen_roaming_path = Path(sync_config['zen_roaming_path'] or auto_paths['roaming'] or '')
        self.zen_local_path = Path(sync_config['zen_local_path'] or auto_paths['local'] or '')
        logger.info(f"Zen Browser paths:")
        logger.info(f" Roaming: {self.zen_roaming_path}")
        logger.info(f" Local: {self.zen_local_path}")
        if not self.zen_roaming_path.exists():
            logger.warning(f"Roaming path does not exist: {self.zen_roaming_path}")
        if not self.zen_local_path.exists():
            logger.warning(f"Local path does not exist: {self.zen_local_path}")

    def _init_s3_client(self):
        """Initialize S3 client"""
        try:
            aws_config = self.config.config['aws']
            session_kwargs = {}
            client_kwargs = {'region_name': aws_config['region']}
            config_settings = {}
            if aws_config.get('signature_version'):
                config_settings['signature_version'] = aws_config['signature_version']
            if aws_config.get('endpoint_url'):
                client_kwargs['endpoint_url'] = aws_config['endpoint_url']
                config_settings['s3'] = {'addressing_style': 'path'}
                logger.info(f"Using S3 endpoint: {aws_config['endpoint_url']}")
            if config_settings:
                client_kwargs['config'] = Config(**config_settings)
            if aws_config.get('profile'):
                session_kwargs['profile_name'] = aws_config['profile']
                logger.info(f"Using AWS profile: {aws_config['profile']}")
            elif aws_config.get('access_key_id') and aws_config.get('secret_access_key'):
                client_kwargs.update({
                    'aws_access_key_id': aws_config['access_key_id'],
                    'aws_secret_access_key': aws_config['secret_access_key']
                })
                logger.warning("Using credentials from config file")
            if session_kwargs:
                session = boto3.Session(**session_kwargs)
                self.s3_client = session.client('s3', **client_kwargs)
            else:
                self.s3_client = boto3.client('s3', **client_kwargs)
            self.s3_client.head_bucket(Bucket=self.bucket)
            logger.info(f"Connected to S3, bucket: {self.bucket}")
        except NoCredentialsError:
            logger.error("AWS credentials not found")
            sys.exit(1)
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                logger.error(f"S3 bucket '{self.bucket}' not found")
            else:
                logger.error(f"Error connecting to S3: {e}")
            sys.exit(1)

    def _get_s3_key(self, file_path: Path, base_path: Path, path_type: str) -> str:
        relative_path = file_path.relative_to(base_path)
        if path_type in ['roaming', 'local']:
            return f"{self.prefix}{path_type}/{relative_path}".replace('\\', '/')
        return f"{self.prefix}{relative_path}".replace('\\', '/')

    def _get_relative_s3_key(self, file_path: Path, base_path: Path, path_type: str) -> str:
        relative_path = file_path.relative_to(base_path)
        if path_type in ['roaming', 'local']:
            return f"{path_type}/{relative_path}".replace('\\', '/')
        return str(relative_path).replace('\\', '/')

    def _get_download_path(self, relative_path: str) -> Optional[Path]:
        if relative_path.startswith('roaming/'):
            return self.zen_roaming_path / relative_path[8:] if self.zen_roaming_path else None
        elif relative_path.startswith('local/'):
            if self.zen_local_path and self.config.config['sync']['sync_cache_data']:
                return self.zen_local_path / relative_path[6:]
            return None
        return self.zen_roaming_path / relative_path if self.zen_roaming_path else None

    def _get_file_info(self, file_path: Path) -> Dict:
        """Get file information for comparison"""
        try:
            stat = file_path.stat()
            return {
                'size': stat.st_size,
                'mtime': int(stat.st_mtime),
                'hash': calculate_file_hash(file_path),
                'exists': True
            }
        except (OSError, FileNotFoundError):
            return {'exists': False}

    def _files_are_different(self, local_info: Dict, s3_info: Dict) -> bool:
        """Compare local file with S3 object"""
        if not local_info['exists'] or not s3_info['exists']:
            return True
        # Use hash comparison if available (apparently some s3 don't support putting custom metadata)
        if (local_info.get('hash') and s3_info.get('hash') and
                local_info['hash'] and s3_info['hash']):
            are_different = local_info['hash'] != s3_info['hash']
            if are_different:
                logger.debug(f"Hash comparison: files different")
            else:
                logger.debug(f"Hash comparison: files identical")
            return are_different
        # Fallback to size comparison
        if local_info['size'] != s3_info['size']:
            logger.debug(f"Size comparison: files different")
            return True
        logger.debug(f"Size comparison: files identical")
        return False

    def _list_s3_objects(self) -> Dict[str, Dict]:
        """List all S3 objects with metadata"""
        objects = {}
        try:
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(Bucket=self.bucket, Prefix=self.prefix)
            for page in pages:
                if 'Contents' in page:
                    for obj in page['Contents']:
                        relative_key = obj['Key'][len(self.prefix):]
                        obj_info = {
                            'size': obj['Size'],
                            'mtime': int(obj['LastModified'].timestamp()),
                            'etag': obj['ETag'].strip('"'),
                            'exists': True,
                            's3_key': obj['Key'],
                            'hash': None
                        }
                        # Try to get hash from metadata
                        try:
                            head_response = self.s3_client.head_object(Bucket=self.bucket, Key=obj['Key'])
                            if 'Metadata' in head_response and not self.config.config['aws'].get('disable_metadata', False):
                                metadata = head_response['Metadata']
                                if 'file-hash' in metadata:
                                    obj_info['hash'] = metadata['file-hash']
                                elif 'file_hash' in metadata:
                                    obj_info['hash'] = metadata['file_hash']
                        except Exception:
                            pass
                        objects[relative_key] = obj_info
        except Exception as e:
            logger.error(f"Error listing S3 objects: {e}")
        return objects

    def _log_sync_analysis(self, upload_files: List, download_files: List, skip_files: List, delete_files: List = None):
        total_upload_size = sum(item[2] for item in upload_files)
        total_download_size = sum(item[2] for item in download_files)
        total_skip_size = sum(item[2] for item in skip_files)
        logger.info(f"Sync analysis:")
        logger.info(f" Upload: {len(upload_files)} files ({format_size(total_upload_size)})")
        logger.info(f" Download: {len(download_files)} files ({format_size(total_download_size)})")
        logger.info(f" Skip: {len(skip_files)} files ({format_size(total_skip_size)})")
        if delete_files:
            total_delete_size = sum(item[2] for item in delete_files)
            logger.info(f" Delete: {len(delete_files)} files ({format_size(total_delete_size)})")

    def _process_files(self, files: List, action: str, dry_run: bool, processor_func) -> bool:
        if not files:
            return True
        logger.info(f"{'[DRY RUN] ' if dry_run else ''}{action.capitalize()} {len(files)} files...")
        success_count = 0
        error_count = 0
        with tqdm(total=len(files), desc=action.capitalize(), unit="file") as pbar:
            for file_args in files:
                try:
                    if not dry_run:
                        processor_func(*file_args)
                    success_count += 1
                except Exception as e:
                    logger.error(f"Error {action} {file_args[0]}: {e}")
                    error_count += 1
                pbar.update(1)
        return error_count == 0

    def should_include_file(self, file_path: Path, base_path: Path) -> bool:
        """Check if file should be included in sync"""
        relative_path = file_path.relative_to(base_path)
        str_path = str(relative_path).replace('\\', '/')
        for pattern in self.exclude_patterns:
            if fnmatch.fnmatch(str_path, pattern) or fnmatch.fnmatch(file_path.name, pattern):
                return False
        for pattern in self.include_patterns:
            if fnmatch.fnmatch(str_path, pattern) or fnmatch.fnmatch(file_path.name, pattern):
                return True
        return True

    def get_local_files(self) -> List[tuple]:
        """Get list of local files to sync"""
        files = []
        if self.zen_roaming_path and self.zen_roaming_path.exists():
            roaming_files = self._scan_directory(self.zen_roaming_path, 'roaming')
            files.extend(roaming_files)
            logger.info(f"Found {len(roaming_files)} files in roaming directory")
        else:
            logger.error("Roaming directory not found")
            return []
        if (self.zen_local_path and self.zen_local_path.exists() and
                self.config.config['sync']['sync_cache_data']):
            local_files = self._scan_directory(self.zen_local_path, 'local')
            files.extend(local_files)
            logger.info(f"Found {len(local_files)} files in local directory")
        logger.info(f"Total files to sync: {len(files)}")
        return files

    def _scan_directory(self, base_path: Path, path_type: str) -> List[tuple]:
        """Scan directory for files to sync"""
        files = []
        for root, dirs, filenames in os.walk(base_path):
            root_path = Path(root)
            dirs_to_skip = []
            for d in dirs:
                should_skip = False
                has_important_files = False
                for pattern in self.exclude_patterns:
                    if '/' in pattern:
                        dir_pattern = pattern.split('/')[0]
                        if fnmatch.fnmatch(d, dir_pattern):
                            should_skip = True
                            break
                if should_skip:
                    for pattern in self.include_patterns:
                        if '/' in pattern:
                            dir_pattern = pattern.split('/')[0]
                            if fnmatch.fnmatch(d, dir_pattern):
                                has_important_files = True
                                break
                if should_skip and not has_important_files:
                    dirs_to_skip.append(d)
            for d in dirs_to_skip:
                dirs.remove(d)
            for filename in filenames:
                file_path = root_path / filename
                if self.should_include_file(file_path, base_path):
                    files.append((file_path, base_path, path_type))
        return files

    def upload_to_s3(self, dry_run: bool = False, incremental: bool = True, cleanup: bool = False) -> bool:
        """Upload local Zen data to S3"""
        files = self.get_local_files()
        if not files:
            logger.warning("No files found to upload")
            return False
        s3_objects = {}
        if incremental or cleanup:
            logger.info("Analyzing existing S3 objects...")
            s3_objects = self._list_s3_objects()
        files_to_upload, files_to_skip, files_to_delete = self._analyze_upload_files(files, s3_objects, incremental, cleanup)
        self._log_sync_analysis(files_to_upload, [], files_to_skip, files_to_delete if cleanup else None)
        if not files_to_upload and not files_to_delete:
            logger.info("Everything is up to date!")
            return True
        upload_success = self._process_files(files_to_upload, "uploading", dry_run, self._upload_file_wrapper)
        delete_success = True
        if cleanup and files_to_delete:
            delete_success = self._process_files(files_to_delete, "deleting", dry_run, self._delete_s3_file)
        logger.info(f"Upload completed")
        return upload_success and delete_success

    def _analyze_upload_files(self, files: List, s3_objects: Dict, incremental: bool, cleanup: bool) -> Tuple[List, List, List]:
        files_to_upload = []
        files_to_skip = []
        files_to_delete = []
        logger.info(f"Analyzing {len(files)} local files...")
        for file_path, base_path, path_type in files:
            s3_key = self._get_s3_key(file_path, base_path, path_type)
            relative_s3_key = self._get_relative_s3_key(file_path, base_path, path_type)
            local_info = self._get_file_info(file_path)
            if incremental and relative_s3_key in s3_objects:
                s3_info = s3_objects[relative_s3_key]
                if not self._files_are_different(local_info, s3_info):
                    files_to_skip.append((file_path, s3_key, local_info['size']))
                    continue
            files_to_upload.append((file_path, s3_key, local_info['size'], path_type))
        if cleanup:
            local_s3_keys = {self._get_relative_s3_key(fp, bp, pt) for fp, bp, pt in files}
            for s3_key in s3_objects:
                if s3_key not in local_s3_keys:
                    s3_info = s3_objects[s3_key]
                    files_to_delete.append((s3_key, s3_info['s3_key'], s3_info['size']))
        return files_to_upload, files_to_skip, files_to_delete

    def download_from_s3(self, dry_run: bool = False, incremental: bool = True, cleanup: bool = False) -> bool:
        """Download Zen data from S3"""
        try:
            logger.info("Analyzing S3 objects...")
            s3_objects = self._list_s3_objects()
            if not s3_objects:
                logger.warning(f"No objects found in S3 with prefix: {self.prefix}")
                return False
            files_to_download, files_to_skip, files_to_delete = self._analyze_download_files(s3_objects, incremental, cleanup)
            self._log_sync_analysis([], files_to_download, files_to_skip, files_to_delete if cleanup else None)
            if not files_to_download and not files_to_delete:
                logger.info("Everything is up to date!")
                return True
            download_success = self._process_files(files_to_download, "downloading", dry_run, self._download_file_wrapper)
            delete_success = True
            if cleanup and files_to_delete:
                delete_success = self._process_files(files_to_delete, "deleting local", dry_run, self._delete_local_file)
            logger.info(f"Download completed")
            return download_success and delete_success
        except Exception as e:
            logger.error(f"Error during download: {e}")
            return False

    def _analyze_download_files(self, s3_objects: Dict, incremental: bool, cleanup: bool) -> Tuple[List, List, List]:
        files_to_download = []
        files_to_skip = []
        files_to_delete = []
        logger.info(f"Analyzing {len(s3_objects)} S3 objects...")
        for relative_s3_key, s3_info in s3_objects.items():
            local_path = self._get_download_path(relative_s3_key)
            if not local_path:
                continue
            local_info = self._get_file_info(local_path)
            if incremental and local_info['exists']:
                if not self._files_are_different(local_info, s3_info):
                    files_to_skip.append((local_path, s3_info['s3_key'], s3_info['size']))
                    continue
            files_to_download.append((local_path, s3_info['s3_key'], s3_info['size'], relative_s3_key))
        if cleanup:
            local_files = self.get_local_files()
            s3_relative_keys = set(s3_objects.keys())
            for file_path, base_path, path_type in local_files:
                relative_s3_key = self._get_relative_s3_key(file_path, base_path, path_type)
                if relative_s3_key not in s3_relative_keys:
                    file_info = self._get_file_info(file_path)
                    if file_info['exists']:
                        files_to_delete.append((file_path, relative_s3_key, file_info['size']))
        return files_to_download, files_to_skip, files_to_delete

    def sync_bidirectional(self, dry_run: bool = False, cleanup: bool = False) -> bool:
        """Perform bidirectional sync between local and S3"""
        logger.info("Starting bidirectional sync...")
        local_files = self.get_local_files()
        s3_objects = self._list_s3_objects()
        local_lookup = {}
        for file_path, base_path, path_type in local_files:
            relative_s3_key = self._get_relative_s3_key(file_path, base_path, path_type)
            local_lookup[relative_s3_key] = {
                'path': file_path,
                'info': self._get_file_info(file_path),
                'path_type': path_type
            }
        upload_files, download_files, skip_files = self._analyze_bidirectional_sync(local_lookup, s3_objects)
        self._log_sync_analysis(upload_files, download_files, skip_files)
        if not upload_files and not download_files:
            logger.info("Everything is in sync!")
            return True
        upload_success = self._process_files(upload_files, "uploading", dry_run, self._upload_file_wrapper)
        download_success = self._process_files(download_files, "downloading", dry_run, self._download_file_wrapper)
        logger.info("Bidirectional sync completed!")
        return upload_success and download_success

    def _analyze_bidirectional_sync(self, local_lookup: Dict, s3_objects: Dict) -> Tuple[List, List, List]:
        upload_files = []
        download_files = []
        skip_files = []
        for relative_key in set(local_lookup.keys()) & set(s3_objects.keys()):
            local_info = local_lookup[relative_key]['info']
            s3_info = s3_objects[relative_key]
            if not self._files_are_different(local_info, s3_info):
                skip_files.append((relative_key, None, local_info['size']))
                continue
            if local_info['mtime'] > s3_info['mtime']:
                file_path = local_lookup[relative_key]['path']
                path_type = local_lookup[relative_key]['path_type']
                s3_key = s3_objects[relative_key]['s3_key']
                upload_files.append((file_path, s3_key, local_info['size'], path_type))
            else:
                local_path = local_lookup[relative_key]['path']
                s3_key = s3_objects[relative_key]['s3_key']
                download_files.append((local_path, s3_key, s3_info['size'], relative_key))
        for relative_key in set(local_lookup.keys()) - set(s3_objects.keys()):
            local_data = local_lookup[relative_key]
            file_path = local_data['path']
            path_type = local_data['path_type']
            base_path = self.zen_roaming_path if path_type == 'roaming' else self.zen_local_path
            s3_key = self._get_s3_key(file_path, base_path, path_type)
            upload_files.append((file_path, s3_key, local_data['info']['size'], path_type))
        for relative_key in set(s3_objects.keys()) - set(local_lookup.keys()):
            s3_info = s3_objects[relative_key]
            local_path = self._get_download_path(relative_key)
            if local_path:
                download_files.append((local_path, s3_info['s3_key'], s3_info['size'], relative_key))
        return upload_files, download_files, skip_files

    def _upload_file_wrapper(self, file_path: Path, s3_key: str, size: int, path_type: str):
        self._upload_file(file_path, s3_key, path_type)

    def _download_file_wrapper(self, local_path: Path, s3_key: str, size: int, relative_key: str):
        self._download_file(s3_key, local_path)

    def _delete_s3_file(self, relative_key: str, s3_key: str, size: int):
        self.s3_client.delete_object(Bucket=self.bucket, Key=s3_key)

    def _delete_local_file(self, file_path: Path, relative_key: str, size: int):
        file_path.unlink()
        try:
            file_path.parent.rmdir()
        except OSError:
            pass

    def _upload_file(self, file_path: Path, s3_key: str, path_type: str):
        """Upload a single file to S3"""
        if not self.config.config['aws'].get('disable_metadata', False):
            file_hash = calculate_file_hash(file_path)
            metadata = {
                'path-type': path_type,
                'original-mtime': str(int(file_path.stat().st_mtime)),
                'file-hash': file_hash
            }
            try:
                with open(file_path, 'rb') as file_data:
                    self.s3_client.put_object(
                        Bucket=self.bucket,
                        Key=s3_key,
                        Body=file_data,
                        Metadata=metadata
                    )
            except ClientError as e:
                error_msg = str(e)
                if ('AccessDenied' in error_msg or 'headers' in error_msg.lower() or
                        'not signed' in error_msg or 'signature' in error_msg.lower()):
                    logger.warning(f"Metadata error, retrying without metadata for {file_path.name}")
                    with open(file_path, 'rb') as file_data:
                        self.s3_client.put_object(
                            Bucket=self.bucket,
                            Key=s3_key,
                            Body=file_data
                        )
                    if not self.config.config['aws'].get('disable_metadata', False):
                        self.config.config['aws']['disable_metadata'] = True
                        self.config.save_config()
                        logger.info("Auto-disabled metadata for compatibility")
                else:
                    raise
        else:
            with open(file_path, 'rb') as file_data:
                self.s3_client.put_object(
                    Bucket=self.bucket,
                    Key=s3_key,
                    Body=file_data
                )

    def _download_file(self, s3_key: str, local_path: Path):
        """Download a single file from S3"""
        local_path.parent.mkdir(parents=True, exist_ok=True)
        self.s3_client.download_file(
            self.bucket,
            s3_key,
            str(local_path)
        )
        # Try to restore modification time
        try:
            obj_metadata = self.s3_client.head_object(Bucket=self.bucket, Key=s3_key)
            if ('Metadata' in obj_metadata and
                    not self.config.config['aws'].get('disable_metadata', False)):
                metadata = obj_metadata['Metadata']
                original_mtime = None
                if 'original-mtime' in metadata:
                    original_mtime = int(metadata['original-mtime'])
                elif 'original_mtime' in metadata:
                    original_mtime = int(metadata['original_mtime'])
                if original_mtime:
                    os.utime(local_path, (original_mtime, original_mtime))
        except Exception:
            pass

    def list_profiles(self) -> Dict:
        """List available Zen browser profiles"""
        profiles = {}
        if self.zen_roaming_path:
            profiles.update(self._list_profiles_from_path(self.zen_roaming_path, "roaming"))
        else:
            logger.error("Roaming path not configured")
        return profiles

    def _list_profiles_from_path(self, zen_path: Path, path_type: str) -> Dict:
        """List profiles from a specific path"""
        profiles = {}
        profiles_ini = zen_path / "profiles.ini"
        if not profiles_ini.exists():
            logger.warning(f"profiles.ini not found in {zen_path}")
            return profiles
        try:
            config_parser = configparser.ConfigParser()
            config_parser.read(profiles_ini)
            for section in config_parser.sections():
                if section.startswith('Profile'):
                    name = config_parser.get(section, 'Name', fallback='Unknown')
                    path = config_parser.get(section, 'Path', fallback='')
                    is_default = config_parser.getboolean(section, 'Default', fallback=False)
                    store_id = config_parser.get(section, 'StoreID', fallback='')
                    profile_path = zen_path / 'Profiles' / path if path else None
                    profiles[section] = {
                        'name': name,
                        'path': path,
                        'is_default': is_default,
                        'store_id': store_id,
                        'full_path': profile_path,
                        'path_type': path_type,
                        'base_path': zen_path
                    }
        except Exception as e:
            logger.error(f"Error reading profiles.ini from {zen_path}: {e}")
        return profiles

    def get_profile_info(self) -> Dict:
        """Get comprehensive profile information"""
        info = {
            'system_type': 'dual-path',
            'paths': {},
            'profiles': {},
            'profile_groups': {}
        }
        info['paths'] = {
            'roaming': str(self.zen_roaming_path) if self.zen_roaming_path else 'Not configured',
            'local': str(self.zen_local_path) if self.zen_local_path else 'Not configured',
            'roaming_exists': self.zen_roaming_path.exists() if self.zen_roaming_path else False,
            'local_exists': self.zen_local_path.exists() if self.zen_local_path else False
        }
        info['profiles'] = self.list_profiles()
        if self.zen_roaming_path:
            profile_groups_dir = self.zen_roaming_path / "Profile Groups"
            if profile_groups_dir.exists():
                info['profile_groups']['exists'] = True
                info['profile_groups']['path'] = str(profile_groups_dir)
                db_files = list(profile_groups_dir.glob("*.sqlite"))
                info['profile_groups']['databases'] = [f.name for f in db_files]
            else:
                info['profile_groups']['exists'] = False
        return info

43
utils.py Normal file

@@ -0,0 +1,43 @@
import hashlib
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


def calculate_file_hash(file_path: Path, algorithm: str = 'md5') -> str:
    """Calculate hash of a file"""
    if algorithm == 'md5':
        hash_obj = hashlib.md5()
    elif algorithm == 'sha256':
        hash_obj = hashlib.sha256()
    else:
        raise ValueError(f"Unsupported hash algorithm: {algorithm}")
    try:
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b""):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()
    except (OSError, IOError) as e:
        logger.error(f"Error calculating hash for {file_path}: {e}")
        return ""


def calculate_data_hash(data: bytes, algorithm: str = 'md5') -> str:
    """Calculate hash of data bytes"""
    if algorithm == 'md5':
        hash_obj = hashlib.md5()
    elif algorithm == 'sha256':
        hash_obj = hashlib.sha256()
    else:
        raise ValueError(f"Unsupported hash algorithm: {algorithm}")
    hash_obj.update(data)
    return hash_obj.hexdigest()


def format_size(size_bytes: int) -> str:
    """Format file size in human readable format"""
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.1f}{unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.1f}TB"

16
zensync.py Normal file

@@ -0,0 +1,16 @@
#!/usr/bin/env python3
import logging

from cli import run_cli

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)


def main():
    """Main entry point"""
    run_cli()


if __name__ == "__main__":
    main()