-
Notifications
You must be signed in to change notification settings - Fork 273
Expand file tree
/
Copy pathcrypto.py
More file actions
378 lines (289 loc) · 10.2 KB
/
crypto.py
File metadata and controls
378 lines (289 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
"""
STEGOSAURUS WRECKS - Cryptography Module
AES encryption for steganographic payloads
"""
import os
import hashlib
import secrets
from typing import Tuple, Optional
from dataclasses import dataclass
# Try to import cryptography library, fall back to basic XOR if not available
HAS_CRYPTO = False
try:
    # Pre-check: verify cryptography's native bindings work.
    # Some systems have a broken cryptography install where the Rust
    # bindings crash with a pyo3 panic that Python can't catch, so the
    # import is first attempted in a throwaway child process.
    import subprocess as _sp
    import sys as _sys

    # Probe with the interpreter that is running this module. A bare
    # 'python3' may be missing from PATH (e.g. on Windows) or may resolve to
    # a different environment than the one the imports below come from.
    # 'python3' is kept only as a fallback when sys.executable is empty.
    _probe = _sp.run(
        [_sys.executable or 'python3', '-c',
         'from cryptography.exceptions import InvalidSignature'],
        capture_output=True, timeout=5
    )
    if _probe.returncode == 0:
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
        from cryptography.hazmat.primitives import padding
        from cryptography.hazmat.backends import default_backend
        HAS_CRYPTO = True
except Exception:
    # Best effort: any failure (probe timeout, interpreter not found, import
    # error) just disables the AES code paths.
    HAS_CRYPTO = False
@dataclass
class EncryptedPayload:
    """
    Output of one encryption call, bundled with the values needed to decrypt it.

    Field order matters: instances may be built positionally.
    """

    # Encrypted bytes (the AES-GCM routines append the auth tag to this field)
    ciphertext: bytes
    # Initialisation vector; the XOR routines store b'' here
    iv: bytes
    # Salt that was passed to the password-based key derivation
    salt: bytes
    # Scheme label: 'aes-256-cbc', 'aes-256-gcm', 'xor'
    method: str
def derive_key(password: str, salt: bytes, key_length: int = 32) -> bytes:
    """
    Stretch a password into key material with PBKDF2-HMAC-SHA256
    Args:
        password: Password text (UTF-8 encoded before hashing)
        salt: Salt bytes mixed into the derivation; keep it with the ciphertext
        key_length: Number of key bytes to produce (default 32, i.e. AES-256)
    Returns:
        key_length bytes of derived key material
    """
    # Fixed iteration count: changing it would change every derived key.
    rounds = 600000
    secret = password.encode('utf-8')
    return hashlib.pbkdf2_hmac('sha256', secret, salt, rounds, dklen=key_length)
def generate_salt(length: int = 16) -> bytes:
    """Return `length` random bytes from the `secrets` module for use as a salt"""
    salt = secrets.token_bytes(length)
    return salt
def generate_iv(length: int = 16) -> bytes:
    """Return `length` random bytes from the `secrets` module for use as an IV"""
    iv = secrets.token_bytes(length)
    return iv
# ============== AES Encryption (requires cryptography library) ==============
def encrypt_aes_cbc(data: bytes, password: str) -> EncryptedPayload:
    """
    AES-256-CBC encryption with a password-derived key

    A fresh salt and IV are generated on every call, and the plaintext is
    PKCS7-padded to the 128-bit block size before it is encrypted.
    Args:
        data: Plaintext bytes
        password: Encryption password
    Returns:
        EncryptedPayload holding the ciphertext, IV and salt
    Raises:
        RuntimeError: if HAS_CRYPTO is false
    """
    if not HAS_CRYPTO:
        raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")

    salt = generate_salt()
    iv = generate_iv()
    aes = algorithms.AES(derive_key(password, salt))

    # CBC only accepts whole blocks, so pad first
    pkcs7 = padding.PKCS7(128).padder()
    block_aligned = pkcs7.update(data) + pkcs7.finalize()

    enc = Cipher(aes, modes.CBC(iv), backend=default_backend()).encryptor()
    return EncryptedPayload(
        ciphertext=enc.update(block_aligned) + enc.finalize(),
        iv=iv,
        salt=salt,
        method='aes-256-cbc',
    )
def decrypt_aes_cbc(payload: EncryptedPayload, password: str) -> bytes:
    """
    Reverse encrypt_aes_cbc: decrypt, then strip the PKCS7 padding
    Args:
        payload: EncryptedPayload produced by encrypt_aes_cbc
        password: Decryption password
    Returns:
        The recovered plaintext bytes
    Raises:
        RuntimeError: if HAS_CRYPTO is false
    """
    if not HAS_CRYPTO:
        raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")

    # Re-derive the key from the salt stored alongside the ciphertext
    aes = algorithms.AES(derive_key(password, payload.salt))
    dec = Cipher(aes, modes.CBC(payload.iv), backend=default_backend()).decryptor()
    padded = dec.update(payload.ciphertext) + dec.finalize()

    strip = padding.PKCS7(128).unpadder()
    return strip.update(padded) + strip.finalize()
def encrypt_aes_gcm(data: bytes, password: str) -> EncryptedPayload:
    """
    AES-256-GCM (authenticated) encryption with a password-derived key
    Args:
        data: Plaintext bytes
        password: Encryption password
    Returns:
        EncryptedPayload whose ciphertext field is the encrypted data followed
        by the authentication tag, plus the IV and salt
    Raises:
        RuntimeError: if HAS_CRYPTO is false
    """
    if not HAS_CRYPTO:
        raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")

    salt = generate_salt()
    nonce = generate_iv(12)  # GCM uses 12-byte IV
    aes = algorithms.AES(derive_key(password, salt))

    enc = Cipher(aes, modes.GCM(nonce), backend=default_backend()).encryptor()
    body = enc.update(data) + enc.finalize()

    # The tag is read after finalize() and stored right behind the ciphertext
    return EncryptedPayload(
        ciphertext=body + enc.tag,
        iv=nonce,
        salt=salt,
        method='aes-256-gcm',
    )
def decrypt_aes_gcm(payload: EncryptedPayload, password: str) -> bytes:
    """
    Reverse encrypt_aes_gcm
    Args:
        payload: EncryptedPayload produced by encrypt_aes_gcm (ciphertext
            field ends with the 16-byte auth tag)
        password: Decryption password
    Returns:
        The recovered plaintext bytes
    Raises:
        RuntimeError: if HAS_CRYPTO is false
    """
    if not HAS_CRYPTO:
        raise RuntimeError("cryptography library not installed. Install with: pip install cryptography")

    # Split the stored blob back into ciphertext and trailing 16-byte tag
    sealed = payload.ciphertext
    body, tag = sealed[:-16], sealed[-16:]

    aes = algorithms.AES(derive_key(password, payload.salt))
    dec = Cipher(aes, modes.GCM(payload.iv, tag), backend=default_backend()).decryptor()
    return dec.update(body) + dec.finalize()
# ============== XOR Encryption (fallback, no dependencies) ==============
def encrypt_xor(data: bytes, password: str) -> EncryptedPayload:
    """
    Simple XOR encryption (fallback when cryptography not available)
    NOT CRYPTOGRAPHICALLY SECURE - use only as fallback

    The keystream is SHA-256(key + 4-byte big-endian counter) for counter
    0, 1, 2, ..., where key is derive_key(password, salt, len(data)).
    Args:
        data: Plaintext bytes (may be empty)
        password: Encryption password
    Returns:
        EncryptedPayload with XOR'd ciphertext, an empty IV and the salt
    """
    salt = generate_salt()
    size = len(data)

    # PBKDF2 rejects a zero-length key request, so empty input is handled
    # here instead of raising ValueError from derive_key.
    if size == 0:
        return EncryptedPayload(ciphertext=b'', iv=b'', salt=salt, method='xor')

    # The derived key length stays tied to len(data): it feeds the SHA-256
    # keystream below, so changing it would make existing payloads
    # undecryptable.
    # NOTE(review): PBKDF2 cost presumably grows with the requested key
    # length, which would make large payloads slow - confirm before relying
    # on this path for big inputs.
    key = derive_key(password, salt, key_length=size)

    # One SHA-256 digest (32 bytes) per counter value, enough to cover data
    block_count = -(-size // 32)
    keystream = b''.join(
        hashlib.sha256(key + counter.to_bytes(4, 'big')).digest()
        for counter in range(block_count)
    )

    # zip() stops at len(data), so surplus keystream bytes are ignored
    ciphertext = bytes(a ^ b for a, b in zip(data, keystream))
    return EncryptedPayload(
        ciphertext=ciphertext,
        iv=b'',  # XOR doesn't use IV
        salt=salt,
        method='xor'
    )
def decrypt_xor(payload: EncryptedPayload, password: str) -> bytes:
    """
    Decrypt XOR encrypted data

    Rebuilds the same keystream as encrypt_xor (SHA-256 of the derived key
    plus a 4-byte big-endian counter) and XORs it with the ciphertext.
    Args:
        payload: EncryptedPayload from encrypt_xor
        password: Decryption password
    Returns:
        Decrypted plaintext bytes (b'' for an empty ciphertext)
    """
    sealed = payload.ciphertext
    size = len(sealed)

    # PBKDF2 rejects a zero-length key request, so an empty ciphertext is
    # answered directly instead of raising ValueError from derive_key.
    if size == 0:
        return b''

    # Key length must match what encrypt_xor used (the data length),
    # otherwise the keystream differs.
    key = derive_key(password, payload.salt, key_length=size)

    # One SHA-256 digest (32 bytes) per counter value, enough to cover data
    block_count = -(-size // 32)
    keystream = b''.join(
        hashlib.sha256(key + counter.to_bytes(4, 'big')).digest()
        for counter in range(block_count)
    )

    # zip() stops at len(ciphertext), so surplus keystream bytes are ignored
    return bytes(a ^ b for a, b in zip(sealed, keystream))
# ============== Unified Interface ==============
def encrypt(data: bytes, password: str, method: str = 'auto') -> bytes:
    """
    Encrypt with the chosen scheme and return the packed result
    Args:
        data: Plaintext bytes
        password: Encryption password
        method: 'aes-cbc', 'aes-gcm', 'xor', or 'auto' ('auto' picks
            'aes-gcm' when HAS_CRYPTO is true, otherwise 'xor')
    Returns:
        Output of pack_payload() for the encrypted data
    Raises:
        ValueError: if method is not one of the names above
    """
    selected = method
    if selected == 'auto':
        selected = 'aes-gcm' if HAS_CRYPTO else 'xor'

    # Pick the scheme-specific routine first, then run it once
    if selected == 'aes-cbc':
        scheme = encrypt_aes_cbc
    elif selected == 'aes-gcm':
        scheme = encrypt_aes_gcm
    elif selected == 'xor':
        scheme = encrypt_xor
    else:
        raise ValueError(f"Unknown encryption method: {selected}")

    return pack_payload(scheme(data, password))
def decrypt(packed_data: bytes, password: str) -> bytes:
    """
    Unpack a payload made by encrypt() and decrypt it with the matching scheme
    Args:
        packed_data: Packed payload from encrypt()
        password: Decryption password
    Returns:
        Decrypted plaintext bytes
    Raises:
        ValueError: if the unpacked method name is not recognised
    """
    payload = unpack_payload(packed_data)
    scheme = payload.method

    # Guard clauses: one early return per supported scheme
    if scheme == 'aes-256-cbc':
        return decrypt_aes_cbc(payload, password)
    if scheme == 'aes-256-gcm':
        return decrypt_aes_gcm(payload, password)
    if scheme == 'xor':
        return decrypt_xor(payload, password)
    raise ValueError(f"Unknown encryption method: {payload.method}")
def pack_payload(payload: EncryptedPayload) -> bytes:
    """
    Serialise an EncryptedPayload into one bytes object for embedding
    Format:
        [1 byte: method ID][1 byte: salt len][salt][1 byte: iv len][iv][ciphertext]
    Method IDs are 1 = 'aes-256-cbc', 2 = 'aes-256-gcm', 3 = 'xor'; any other
    method name is written as ID 0.
    """
    method_ids = {'aes-256-cbc': 1, 'aes-256-gcm': 2, 'xor': 3}
    header = bytes([method_ids.get(payload.method, 0)])

    # Salt and IV each carry a one-byte length prefix; the ciphertext runs
    # to the end of the buffer and needs none.
    pieces = (
        header,
        bytes([len(payload.salt)]), payload.salt,
        bytes([len(payload.iv)]), payload.iv,
        payload.ciphertext,
    )
    return b''.join(pieces)
def unpack_payload(data: bytes) -> EncryptedPayload:
    """
    Parse bytes laid out as
    [method ID][salt len][salt][iv len][iv][ciphertext] into an EncryptedPayload

    Method IDs 1, 2, 3 map to 'aes-256-cbc', 'aes-256-gcm', 'xor'; any other
    ID yields the method name 'unknown'.
    NOTE(review): input too short to contain the method ID or a length byte
    raises IndexError from the indexing below; an IV shorter than its declared
    length is returned as-is.
    Args:
        data: Packed payload bytes
    Returns:
        EncryptedPayload object
    """
    method_names = {1: 'aes-256-cbc', 2: 'aes-256-gcm', 3: 'xor'}
    method = method_names.get(data[0], 'unknown')

    # Byte 1 is the salt length; the salt itself starts at offset 2
    salt_start = 2
    salt_end = salt_start + data[1]

    # The IV length byte sits right after the salt
    iv_start = salt_end + 1
    iv_end = iv_start + data[salt_end]

    return EncryptedPayload(
        ciphertext=data[iv_end:],
        iv=data[iv_start:iv_end],
        salt=data[salt_start:salt_end],
        method=method,
    )
def get_available_methods() -> list:
    """List usable encryption method names; AES methods come first when present"""
    # 'xor' has no dependencies, so it is always the last entry
    aes_methods = ['aes-gcm', 'aes-cbc'] if HAS_CRYPTO else []
    return aes_methods + ['xor']
def crypto_status() -> dict:
    """Summarise whether the cryptography library is usable and what it enables"""
    status = {"cryptography_available": HAS_CRYPTO}
    status["available_methods"] = get_available_methods()
    # Same choice that encrypt() makes for method='auto'
    status["recommended"] = 'aes-gcm' if HAS_CRYPTO else 'xor'
    return status