-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfirst_v2.py
More file actions
133 lines (109 loc) · 3.88 KB
/
Copy pathfirst_v2.py
File metadata and controls
133 lines (109 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import csv
import os
from datetime import datetime
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP, getcontext

from bson.decimal128 import Decimal128
from pymongo import MongoClient
# Setup
# Client for the MongoDB server at localhost:27017; documents are written to
# the 'currency' collection of the 'theratesapi' database.
client = MongoClient('mongodb://localhost:27017/')
db = client['theratesapi']
collection = db['currency']
# Drop the collection up front so every run rebuilds it from the CSV.
collection.drop()
# Decimal arithmetic in this module runs with 28 significant digits.
getcontext().prec = 28
# Quantization target used by to_decimal128: six decimal places.
DECIMAL_PLACES = Decimal('0.000001')
def parse_date(date_str):
    """Normalise a ``YYYY-MM-DD`` date string.

    Args:
        date_str: Raw date text; surrounding whitespace is ignored.

    Returns:
        The date re-formatted as ``YYYY-MM-DD``, or ``None`` when the input
        is not a string or does not match that format.
    """
    if not isinstance(date_str, str):
        # Non-string input (e.g. None) is treated like any other unparseable
        # date rather than raising AttributeError on .strip().
        return None
    try:
        parsed = datetime.strptime(date_str.strip(), '%Y-%m-%d')
    except ValueError:
        return None
    return parsed.strftime('%Y-%m-%d')
def clean_rates(row):
    """Extract the valid currency rates from one CSV row.

    Args:
        row: Mapping of column name to raw cell text. The ``'Date'`` column
            is ignored.

    Returns:
        Dict mapping column name to a ``Decimal`` rate, in row order. Cells
        that are not text, empty, ``'N/A'``, not numeric, non-finite, or not
        strictly positive are left out.
    """
    rates = {}
    for key, value in row.items():
        # Anything that is not text (None, a list of surplus fields, ...)
        # cannot hold a rate.
        if key == 'Date' or not isinstance(value, str):
            continue
        text = value.strip()
        if not text or text == 'N/A':
            continue
        try:
            rate = Decimal(text)
        except InvalidOperation:
            # Cell text is not a number.
            continue
        # is_finite() rejects NaN and Infinity, which would otherwise break
        # the comparison below or any later division/quantization.
        if rate.is_finite() and rate > 0:
            rates[key] = rate
    return rates
def to_decimal128(value):
    """Round ``value`` to ``DECIMAL_PLACES`` and wrap it as a BSON Decimal128.

    The value is first converted through ``str`` into a ``Decimal``, then
    quantized with half-up rounding before being handed to ``Decimal128``.
    """
    as_decimal = Decimal(str(value))
    rounded = as_decimal.quantize(DECIMAL_PLACES, rounding=ROUND_HALF_UP)
    return Decimal128(rounded)
def create_base_rates(date, base_currency, rates):
    """Build the rates document re-based onto ``base_currency``.

    Args:
        date: Date value stored unchanged in the document.
        base_currency: Currency code that becomes the document's base.
        rates: Mapping of currency code to its EUR-based rate.

    Returns:
        ``{'date', 'base', 'rates'}`` where every rate is divided by the base
        currency's own rate and converted with ``to_decimal128``; ``None``
        when ``base_currency`` has no entry in ``rates`` or no converted rate
        would result.
    """
    try:
        divisor = rates[base_currency]
    except KeyError:
        return None

    converted = {}
    # EUR is the implicit base of ``rates``, so its rate against the new
    # base is the reciprocal of the base currency's rate.
    if base_currency != 'EUR':
        converted['EUR'] = to_decimal128(Decimal('1') / divisor)

    # Every other currency is re-based by dividing by the base rate.
    converted.update(
        (code, to_decimal128(value / divisor))
        for code, value in rates.items()
        if code != base_currency
    )

    # Never hand back a document with an empty rates mapping.
    if not converted:
        return None

    return {'date': date, 'base': base_currency, 'rates': converted}
# Main processing: read the historical rates CSV and store, for every dated
# row, one EUR-based document plus one document per other base currency.
# A copy of the CSV in the parent directory takes precedence over the one in
# the current directory.
csv_path = '../eurofxref-hist.csv' if os.path.exists('../eurofxref-hist.csv') else './eurofxref-hist.csv'

try:
    # Create unique index to prevent duplicates
    collection.create_index([("date", 1), ("base", 1)], unique=True)

    # newline='' leaves newline handling to the csv module, as its
    # documentation requires; the explicit encoding keeps the read
    # independent of the platform's default encoding.
    with open(csv_path, 'r', newline='', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        batch = []
        processed = 0

        for row in reader:
            # Parse date; rows without a valid date are skipped
            date = parse_date(row.get('Date', ''))
            if not date:
                continue

            # Clean rates; rows without any usable rate are skipped
            rates = clean_rates(row)
            if not rates:
                continue

            # Create EUR base document
            eur_doc = {
                'date': date,
                'base': 'EUR',
                'rates': {k: to_decimal128(v) for k, v in rates.items()}
            }
            batch.append(eur_doc)

            # Create documents for other base currencies
            for currency in rates:
                doc = create_base_rates(date, currency, rates)
                if doc:
                    batch.append(doc)

            # Insert in batches with duplicate handling. ordered=False lets
            # the server keep inserting after an individual document fails.
            if len(batch) >= 1000:
                try:
                    collection.insert_many(batch, ordered=False)
                    print(f"Inserted batch of {len(batch)} documents")
                except Exception as e:
                    # Best effort: report the failure and carry on.
                    print(f"Batch insert warning (likely duplicates): {e}")
                # The batch is discarded whether or not the insert succeeded.
                batch = []

            processed += 1
            if processed % 50 == 0:
                print(f"Processed {processed} dates...")

        # Insert remaining documents
        if batch:
            try:
                collection.insert_many(batch, ordered=False)
                print(f"Inserted final batch of {len(batch)} documents")
            except Exception as e:
                print(f"Final batch insert warning: {e}")

        print(f"Complete! Processed {processed} dates")
except Exception as e:
    # Top-level boundary: report any failure (missing file, database error).
    print(f"Error: {e}")
finally:
    client.close()