Using OpenSearch match or more_like_this to find duplicate news in JSON, tagged with "Header" and "Body"

Hi.

I am running OpenSearch together with a Python script. The script is supposed to find related content in JSON news messages and score each pair as a percentage probability of being a duplicate (100% if two messages cover exactly the same topic and content, down to 0% if they are completely different).

I first used difflib's SequenceMatcher, which gave good results but performed very poorly. I then moved to OpenSearch to find the duplicate candidates, but I am unable to find any duplicates, even when the headline and body are exactly the same.
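
For context, this is essentially how I was scoring a pair of messages with SequenceMatcher before switching (a minimal sketch with placeholder text; on the real data it is far too slow because every pair of records has to be compared):

from difflib import SequenceMatcher

# Placeholder bodies standing in for the "body" field of two news items.
body_a = "Severe weather disrupted holiday travel on Saturday across the U.S."
body_b = "Severe weather disrupted holiday travel across the U.S. on Saturday."

# ratio() returns 0.0-1.0; scaled to the percentage I want to report.
score = SequenceMatcher(None, body_a, body_b).ratio() * 100
print(f"Duplicate probability: {score:.1f}%")  # identical texts would give 100.0%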

Any help would be appreciated. I added some test records and my current script below.
(Sorry, I am not a developer, the script was generated by ChatGPT)

====================
TestData

{“RIC”:“MRN_STORY”, “Start”:“2024-12-28T00:00:00.000Z”, “End”:“2024-12-28T23:59:59.999Z”, “Created”:“2024-12-29T00:25:31.403Z”, “MajorVersion”: “2”, “MinorVersion”:“14”, “Items”:[
{“guid”: “TOPEQE____1710306efqwsORmdEsmyqUeDc3VEP3TAE1koCfI0JoeS”, “timestamps”: [{“name”: “recorded”, “source”: “AMER”, “timestamp”: “2024-12-28T00:31:18.414Z”}, {“name”: “recorded”, “source”: “APAC”, “timestamp”: “2024-12-28T00:31:18.473Z”}, {“name”: “recorded”, “source”: “EMEA”, “timestamp”: “2024-12-28T00:31:18.350Z”}], “data”: {“altId”: “nTOPEQE”, “audiences”: [“NP:C”, “NP:GRO”, “NP:MTL”, “NP:SOF”, “NP:E”, “NP:O”, “NP:PRL”, “NP:U”, “NP:NAW”, “NP:UKI”, “NP:EMK”, “NP:OIL”, “NP:PSC”], “body”: “> YEARENDER-Big Oil backtracks on renewables push as climate agenda falters [nL8N3NB12J]\n> BREAKINGVIEWS-Buyout barons will find ways to douse fire sale [nL5N3MX1YU]\n> Telecom Italia seeks Vivendi’s views on simpler structure, sources say [nL8N3NG0CZ]\n> BioNTech enters settlement with US agency, UPenn over COVID vaccine royalties [nL4N3NS0L5]\n> Taiwan blocks Uber’s $950 mln Foodpanda deal, citing anti-competition concerns [nL1N3NQ02L]\n> BP says Whiting, Indiana refinery operations normal following leak [nL1N3NS0KY]\n> Court orders recall of Signify lighting products over patents, Seoul Semiconductor[nL1N3NR063]\n> Novartis must face claims it paid kickbacks to promote MS drug, US appeals court [nL1N3NS0I5]\n> Russia’s VTB buys nationalised agriculture firm [nL1N3NR05F]\n> Stellantis unit to pay $4.2 million to resolve California emissions probe [nL1N3NP02F]\n> Turkish drone maker Baykar buys Italy’s Piaggio Aerospace [nL8N3NS08Y]\n> First LNG cargo from Venture Global’s Plaquemines plant heads to Germany [nL1N3NR08G]\n> Chile files environmental charges against Anglo American copper mine [nL1N3NO0P3]\n \n…\nFor a richer, multimedia version of Top News visit:\n * Workspace/Eikon: cpurl:##apps.cp./apps/topnews#/tn/SP_PAGE_003\n * Thomson ONE: visit topnews.thomsonone-com/topnews \n…\nFor the latest top breaking news across all markets, click: [NEWS1]\n…\n \nTOP NEWS SUMMARIES ON OTHER SUBJECTS\n Front Page [TOP/NEWS] Central Banks & Global Economy [TOP/CEN]\n World News [TOP/G] Global Markets [TOP/GLOMKT]\n Foreign Exchange [TOP/FRX] Fixed Income [TOP/DBT] \n Emerging Markets [TOP/EMRG] Financial Services [TOP/FIN] \n Investment Banking [TOP/DEALS] Sustainable Finance [TOP/SUSFIN]\n IFR Markets [TOP/NOW2] U.S. Companies [TOP/EQU]\n European Companies [TOP/EQE] Asian Companies [TOP/EQA]\n Commodities [TOP/CE] Energy [TOP/O] \n Metals [TOP/MTL] Agricultural Commodities [TOP/GRO] \n Global Gas Power & Gas [TOP/ELE] Digital Finance & Crypto [TOP/DIGFIN] \n Healthcare & Pharma [TOP/HEALTH] Heavy Industry & Transport [TOP/INDTR]\n Consumer & Retail [TOP/RETAIL] Technology, Media & Telecoms [TOP/TMT] \n Sport [TOP/SPO] Lifestyle & Entertainment [TOP/LIFE] \n Regulation & Compliance [TOP/REG] Reuters BREAKINGVIEWS [BRV]\n Top News Directory [TOP/] \n \nREGIONAL TOP NEWS PAGES\n United States [TOP/US] Australia & New Zealand [TOP/ANZ]\n Greater China [TOP/CHINA] India & South Asia [TOP/INDIA]\n Europe & Russia [TOP/EURRU] Southeast Asia [TOP/SEASIA]\n Canada [TOP/CAN] United Kingdom [TOP/UK]\n Latin America [TOP/LATAM] Deutschland (in German) [TOP/DE]\n Middle East & Africa [TOP/MEAF] France (in French) [TOP/FRA]\n Japan & the Koreas [TOP/JPKOR] Brazil (in Portuguese) [TOP/BR]\n \n…\n NOTE: Access to some items may depend on subscription level\n…\n UP-TO-THE-MINUTE HEADLINES \n Economic news [M] Economic indicators [M-ECI]\n G7 live indicators Central bank news [M-CEN]\n U.S. 
company news [U] European companies [E-EUROPE]\n Asian companies [E-ASIA] Mergers & acquisitions [E-MRG]\n \n LIVE PRICES & DATA\n World Stocks <0#.INDEX> S&P 500 <.SPX>\n Dow Jones industrials <.DJI> Nasdaq <.IXIC>\n FTSEurofirst 300 <.FTEU3> Nikkei <.N225>\n FTSE 100 <.FTSE> Currency rates <EFX=> <NFX=>\n Debt <0#USBMK=> Gold <XAU=>\n Brent crude U.S. crude \n \n GUIDES ON HOW TO FIND THE DATA & NEWS INFORMATION YOU NEED:\n | | | <PHONE/HELP> | <USER/HELP> | <GB/EQUITY>|\n | | | | | |\n | | | |\n…\nPage Editor: Bangalore-pages@thomsonreuters-com\n…\n”, “firstCreated”: “2017-10-30T00:53:26.000Z”, “headline”: "TOP NEWS European Companies ", “id”: “TOPEQE____1710306efqwsORmdEsmyqUeDc3VEP3TAE1koCfI0JoeS”, “instancesOf”: [“RR:1304”, “NI:TOP/EQE”], “language”: “en”, “messageType”: 6, “mimeType”: “text/x-plain-fixed”, “provider”: “NS:RTRS”, “pubStatus”: “stat:usable”, “subjects”: [“A:1”, “G:3”, “G:A”, “G:D3”, “M:1QD”, “M:2CM”, “M:2CN”, “M:2CP”, “M:2CQ”, “M:2CR”, “M:2CS”, “M:2CT”, “M:2CU”, “M:2CV”, “M:2CW”, “M:2CX”, “M:2CY”, “M:2CZ”, “M:2D0”, “M:2D1”, “M:3Z”, “M:MR”, “M:NP”, “M:Z”, “MCC:OEC”, “MCCL:OEC”, “N2:CMPNY”, “N2:EMEA”, “N2:EUROP”, “N2:LEN”, “N2:REP”, “N2:STX”, “N2:TOPCMB”, “N2:TOPNP”, “N2:WEU”, “N2:XREF”], “takeSequence”: 105, “urgency”: 3, “versionCreated”: “2024-12-28T00:31:18.000Z”}},
{“guid”: “TEST1_____1710306efqwsORmdEsmyqUeDc3VEP3TAE1koCfI0JoeS”, “timestamps”: [{“name”: “recorded”, “source”: “AMER”, “timestamp”: “2024-12-28T00:31:18.414Z”}, {“name”: “recorded”, “source”: “APAC”, “timestamp”: “2024-12-28T00:31:18.473Z”}, {“name”: “recorded”, “source”: “EMEA”, “timestamp”: “2024-12-28T00:31:18.350Z”}], “data”: {“altId”: “nTOPEQE”, “audiences”: [“NP:C”, “NP:GRO”, “NP:MTL”, “NP:SOF”, “NP:E”, “NP:O”, “NP:PRL”, “NP:U”, “NP:NAW”, “NP:UKI”, “NP:EMK”, “NP:OIL”, “NP:PSC”], “body”: “> YEARENDER-Big Oil backtracks on renewables push as climate agenda falters [nL8N3NB12J]\n> BREAKINGVIEWS-Buyout barons will find ways to douse fire sale [nL5N3MX1YU]\n> Telecom Italia seeks Vivendi’s views on simpler structure, sources say [nL8N3NG0CZ]\n> BioNTech enters settlement with US agency, UPenn over COVID vaccine royalties [nL4N3NS0L5]\n> Taiwan blocks Uber’s $950 mln Foodpanda deal, citing anti-competition concerns [nL1N3NQ02L]\n> BP says Whiting, Indiana refinery operations normal following leak [nL1N3NS0KY]\n> Court orders recall of Signify lighting products over patents, Seoul Semiconductor[nL1N3NR063]\n> Novartis must face claims it paid kickbacks to promote MS drug, US appeals court [nL1N3NS0I5]\n> Russia’s VTB buys nationalised agriculture firm [nL1N3NR05F]\n> Stellantis unit to pay $4.2 million to resolve California emissions probe [nL1N3NP02F]\n> Turkish drone maker Baykar buys Italy’s Piaggio Aerospace [nL8N3NS08Y]\n> First LNG cargo from Venture Global’s Plaquemines plant heads to Germany [nL1N3NR08G]\n> Chile files environmental charges against Anglo American copper mine [nL1N3NO0P3]\n \n…\nFor a richer, multimedia version of Top News visit:\n * Workspace/Eikon: cpurl:##apps.cp./apps/topnews#/tn/SP_PAGE_003\n * Thomson ONE: visit topnews.thomsonone-com/topnews \n…\nFor the latest top breaking news across all markets, click: [NEWS1]\n…\n \nTOP NEWS SUMMARIES ON OTHER SUBJECTS\n Front Page [TOP/NEWS] Central Banks & Global Economy [TOP/CEN]\n World News [TOP/G] Global Markets [TOP/GLOMKT]\n Foreign Exchange [TOP/FRX] Fixed Income [TOP/DBT] \n Emerging Markets [TOP/EMRG] Financial Services [TOP/FIN] \n Investment Banking [TOP/DEALS] Sustainable Finance [TOP/SUSFIN]\n IFR Markets [TOP/NOW2] U.S. Companies [TOP/EQU]\n European Companies [TOP/EQE] Asian Companies [TOP/EQA]\n Commodities [TOP/CE] Energy [TOP/O] \n Metals [TOP/MTL] Agricultural Commodities [TOP/GRO] \n Global Gas Power & Gas [TOP/ELE] Digital Finance & Crypto [TOP/DIGFIN] \n Healthcare & Pharma [TOP/HEALTH] Heavy Industry & Transport [TOP/INDTR]\n Consumer & Retail [TOP/RETAIL] Technology, Media & Telecoms [TOP/TMT] \n Sport [TOP/SPO] Lifestyle & Entertainment [TOP/LIFE] \n Regulation & Compliance [TOP/REG] Reuters BREAKINGVIEWS [BRV]\n Top News Directory [TOP/] \n \nREGIONAL TOP NEWS PAGES\n United States [TOP/US] Australia & New Zealand [TOP/ANZ]\n Greater China [TOP/CHINA] India & South Asia [TOP/INDIA]\n Europe & Russia [TOP/EURRU] Southeast Asia [TOP/SEASIA]\n Canada [TOP/CAN] United Kingdom [TOP/UK]\n Latin America [TOP/LATAM] Deutschland (in German) [TOP/DE]\n Middle East & Africa [TOP/MEAF] France (in French) [TOP/FRA]\n Japan & the Koreas [TOP/JPKOR] Brazil (in Portuguese) [TOP/BR]\n \n…\n NOTE: Access to some items may depend on subscription level\n…\n UP-TO-THE-MINUTE HEADLINES \n Economic news [M] Economic indicators [M-ECI]\n G7 live indicators Central bank news [M-CEN]\n U.S. 
company news [U] European companies [E-EUROPE]\n Asian companies [E-ASIA] Mergers & acquisitions [E-MRG]\n \n LIVE PRICES & DATA\n World Stocks <0#.INDEX> S&P 500 <.SPX>\n Dow Jones industrials <.DJI> Nasdaq <.IXIC>\n FTSEurofirst 300 <.FTEU3> Nikkei <.N225>\n FTSE 100 <.FTSE> Currency rates <EFX=> <NFX=>\n Debt <0#USBMK=> Gold <XAU=>\n Brent crude U.S. crude \n \n GUIDES ON HOW TO FIND THE DATA & NEWS INFORMATION YOU NEED:\n | | | <PHONE/HELP> | <USER/HELP> | <GB/EQUITY>|\n | | | | | |\n | | | |\n…\nPage Editor: Bangalore-pages@thomsonreuters-com\n…\n”, “firstCreated”: “2017-10-30T00:53:26.000Z”, “headline”: "TOP NEWS European Companies “, “id”: “TOPEQE____1710306efqwsORmdEsmyqUeDc3VEP3TAE1koCfI0JoeS”, “instancesOf”: [“RR:1304”, “NI:TOP/EQE”], “language”: “en”, “messageType”: 6, “mimeType”: “text/x-plain-fixed”, “provider”: “NS:RTRS”, “pubStatus”: “stat:usable”, “subjects”: [“A:1”, “G:3”, “G:A”, “G:D3”, “M:1QD”, “M:2CM”, “M:2CN”, “M:2CP”, “M:2CQ”, “M:2CR”, “M:2CS”, “M:2CT”, “M:2CU”, “M:2CV”, “M:2CW”, “M:2CX”, “M:2CY”, “M:2CZ”, “M:2D0”, “M:2D1”, “M:3Z”, “M:MR”, “M:NP”, “M:Z”, “MCC:OEC”, “MCCL:OEC”, “N2:CMPNY”, “N2:EMEA”, “N2:EUROP”, “N2:LEN”, “N2:REP”, “N2:STX”, “N2:TOPCMB”, “N2:TOPNP”, “N2:WEU”, “N2:XREF”], “takeSequence”: 105, “urgency”: 3, “versionCreated”: “2024-12-28T00:31:18.000Z”}},
{“guid”: “L1N3NT03S_2412282a7rLq37pgG4FfbUXuoH8lKicVOtCAXQ4z9+Jm”, “timestamps”: [{“name”: “recorded”, “source”: “AMER”, “timestamp”: “2024-12-28T23:59:56.415Z”}, {“name”: “recorded”, “source”: “APAC”, “timestamp”: “2024-12-28T23:59:56.483Z”}, {“name”: “recorded”, “source”: “EMEA”, “timestamp”: “2024-12-28T23:59:56.436Z”}], “data”: {“altId”: “nL1N3NT03S”, “audiences”: [“NP:G”, “NP:USDN”, “NP:PSC”, “NP:RNP”, “NP:PGE”, “NP:RAST”, “NP:YDB”], “body”: " By Rich McKay\n Dec 28 (Reuters) - Severe weather disrupted holiday\ntravel on Saturday across the U.S. with deadly tornadoes in the\nsoutheast and heavy snow and wind on the west coast, delaying or\ncanceling thousands of flights across the country.\n More than 7,000 flights in the U.S. were delayed on\nSaturday, according to the tracking site FlightAware, and more\nthan 200 were canceled.\n About a third of the flights were delayed at\nHartsfield-Jackson International Airport in Atlanta, according\nto FlightAware, and nearly half of the flights originating from\nDallas/Fort Worth International Airport and George Bush\nIntercontinental Airport in Houston were delayed.\n At least 10 tornadoes touched down in the southeastern U.S.\nstates of Texas, Louisiana and Mississippi on Saturday, leaving\none person dead near Houston, the National Weather Service and\nlocal law enforcement said.\n "Those numbers will probably go up," said forecaster Aaron\nGleason, with the National Weather Service’s Storm Prediction\nCenter.\n The one death and four injuries were reported Saturday in\nBrazoria County, Texas, about 45 miles south of Houston,\naccording to the Brazoria County Sheriff’s Office.\n Officials say that many homes and schools were severely\ndamaged or destroyed. Images on social media show scattered\nruins of homes and snapped trees and utility poles strewn across\nstreets and lawns.\n Out west, high winds, with gusts up to 150 mph in the high\nelevations of the Tahoe Basin in California and 50 mph at lower\nelevations hit the area this weekend and with heavy rainfall\nexpected from San Francisco to Portland, Oregon, forecasters\nsaid.\n Between four to six inches of rain are expected to fall\nbefore New Year’s Eve and up to 3 feet of snow in Lake Tahoe,\nforecasters said.\n “Damaging winds could blow down trees and power lines,” the\nWeather Service warned. “Widespread power outages are possible.”\n \n\n (Reporting by Rich McKay in Atlanta;editing by Diane Craft)\n ((rich.mckay@thomsonreuters-com;))”, “firstCreated”: “2024-12-28T23:59:56.000Z”, “headline”: “Thousands of flights delayed across the US, one dead in deadly tornado as storms hit holiday travelers”, “id”: “L1N3NT03S_2412282a7rLq37pgG4FfbUXuoH8lKicVOtCAXQ4z9+Jm”, “instancesOf”: , “language”: “en”, “messageType”: 2, “mimeType”: “text/plain”, “provider”: “NS:RTRS”, “pubStatus”: “stat:usable”, “subjects”: [“A:4”, “G:20”, “G:4”, “G:4Z”, “G:50”, “G:6J”, “G:9”, “M:1LY”, “M:1QD”, “M:2CM”, “M:2CN”, “M:2CP”, “M:2CQ”, “M:2CR”, “M:2CS”, “M:2CT”, “M:2CU”, “M:2CV”, “M:2CW”, “M:2CX”, “M:2CY”, “M:2CZ”, “M:2D0”, “M:2D1”, “M:E7”, “M:H”, “N2:AMERS”, “N2:COM”, “N2:DIS”, “N2:ENV”, “N2:GEN”, “N2:LEN”, “N2:NAMER”, “N2:PRCP”, “N2:US”, “N2:USALA”, “N2:USAMS”, “N2:USATX”, “N2:WEA”, “U:CX”, “U:I”], “takeSequence”: 1, “urgency”: 3, “versionCreated”: “2024-12-28T23:59:56.000Z”}}]}

==========================
Script

"""
deduplicate_news_v5.8.py

This script processes JSON news records from .gz files, filters out translated messages
("N2:TRN" in "subjects"), deduplicates the remaining items, and writes outputs to categorized subfolders.
"""

import os
import gzip
import json
import argparse
import multiprocessing
import uuid
from datetime import datetime
from pathlib import Path
from itertools import combinations
from difflib import SequenceMatcher
from tqdm import tqdm
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from opensearchpy import OpenSearch

def log(msg):
    print(f"[{datetime.now().isoformat()}] {msg}")

def get_opensearch_client():
    return OpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        http_compress=True,
        use_ssl=False,  # security is disabled in opensearch.yml
        verify_certs=False
    )

def index_records(client, index_name, records):
    if client.indices.exists(index=index_name):
        print(f"[INFO] Index '{index_name}' already exists. Deleting it.")
        client.indices.delete(index=index_name)

    print(f"[INFO] Creating index '{index_name}'")

    client.indices.create(
        index=index_name,
        body={
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            },
            "mappings": {
                "properties": {
                    "guid": {"type": "keyword"},
                    "headline": {"type": "text"},
                    "body": {"type": "text"},
                    "firstCreated": {"type": "date"}
                }
            }
        }
    )

    for record in records:
        flat_doc = {
            "guid": record.get("guid"),
            "body": record.get("data", {}).get("body", ""),
            "headline": record.get("data", {}).get("headline", ""),
            "firstCreated": record.get("data", {}).get("firstCreated", "")
        }
        client.index(index=index_name, body=flat_doc)

def read_gz_json_file(filepath):
    with gzip.open(filepath, 'rt', encoding='utf-8') as f:
        full_json = f.read()
    try:
        data = json.loads(full_json)
        header = {k: v for k, v in data.items() if k != "Items"}
        items = data.get("Items", [])
        return header, items
    except json.JSONDecodeError as e:
        print(f"Error reading JSON in file {filepath}: {e}")
        return {}, []

def write_gz_json(filepath, data, wrap_in_items=False):
    with gzip.open(filepath, 'wt', encoding='utf-8') as f:
        if wrap_in_items:
            json.dump({"Items": data}, f, ensure_ascii=False)
        else:
            for item in data:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")

def ensure_dir(path):
    os.makedirs(path, exist_ok=True)

def process_file(filepath, exclude_translations):
    log(f"Processing file: {os.path.basename(filepath)}")
    header, items = read_gz_json_file(filepath)  # header metadata and the list of records
    excluded = []
    kept = []

    for record in items:
        subjects = record.get("data", {}).get("subjects", [])
        if exclude_translations and "N2:TRN" in subjects:
            excluded.append(record)
        else:
            kept.append(record)

    return {
        "filename": os.path.basename(filepath),
        "header": header,
        "total_records": len(items),
        "excluded_translations": len(excluded),
        "processed_records": len(kept),
        "kept": kept,
        "excluded": excluded
    }

def similarity(a, b):
    return SequenceMatcher(None, a, b).ratio()

def find_duplicates(records, threshold=0.1, max_workers=None):
    if max_workers is None:
        max_workers = multiprocessing.cpu_count()
    potential_duplicates = []
    total_pairs = (len(records) * (len(records) - 1)) // 2
    for rec1, rec2 in tqdm(combinations(records, 2), total=total_pairs, desc="Finding duplicates"):
        text1 = rec1["data"].get("body", "")
        text2 = rec2["data"].get("body", "")
        score = similarity(text1, text2)
        if score >= threshold:
            potential_duplicates.append({
                "guid1": rec1["guid"],
                "guid2": rec2["guid"],
                "score": score
            })
    return potential_duplicates

def chunked_combinations(records, chunk_size=1000):
    all_combos = list(combinations(records, 2))
    for i in range(0, len(all_combos), chunk_size):
        yield all_combos[i:i + chunk_size]

def process_chunk(chunk):
    results = []
    for rec1, rec2 in chunk:
        text1 = rec1["data"].get("body", "")
        text2 = rec2["data"].get("body", "")
        score = similarity(text1, text2)
        if score >= 0.1:  # filter early here
            results.append({
                "guid1": rec1["guid"],
                "guid2": rec2["guid"],
                "score": score
            })
    return results

def find_duplicates_opensearch(client, records, threshold=0.7, index_name="news-records"):
    duplicates = []
    max_score_seen = 0.0
    scored_matches = []

    for rec in tqdm(records, desc="Finding duplicates (OpenSearch)"):
        rec_body = rec["data"].get("body", "")
        rec_headline = rec["data"].get("headline", "")
        rec_created = rec["data"].get("firstCreated", "")

        query = {
            "query": {
                "bool": {
                    "should": [
                        {"match": {"body": rec_body}},
                        {"match": {"headline": rec_headline}}
                    ],
                    "minimum_should_match": 1
                }
            },
            "size": 10
        }

        res = client.search(index=index_name, body=query)

        hits = res.get("hits", {}).get("hits", [])
        log(f"Checking GUID: {rec['guid']} -> {len(hits)} hits")

        for hit in hits:
            match_guid = hit["_source"].get("guid")
            if not match_guid or match_guid == rec["guid"]:
                continue

            text1 = rec_body
            text2 = hit["_source"].get("body", "")
            score = SequenceMatcher(None, text1, text2).ratio()

            if score > max_score_seen:
                max_score_seen = score

            scored_matches.append({
                "guid1": match_guid,
                "guid2": rec["guid"],
                "score": score
            })

    if max_score_seen == 0:
        return []

    # Apply the threshold to the SequenceMatcher ratios (already scaled 0-1)
    for match in scored_matches:
        if match["score"] >= threshold:
            duplicates.append({
                "guid1": match["guid1"],
                "guid2": match["guid2"],
                "score": match["score"]
            })

    log(f"[INFO] Max similarity ratio seen: {max_score_seen:.2f}")
    return duplicates
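
# --- Sketch only, not called anywhere in this script: an alternative candidate lookup
# --- using more_like_this instead of the two match clauses above. The function name is
# --- made up; field names assume the same "headline"/"body" mapping created in
# --- index_records(). min_term_freq/min_doc_freq are lowered because the defaults
# --- (2 and 5) can suppress matches when the index only holds a handful of documents.
def find_candidates_mlt(client, rec, index_name, size=10):
    query = {
        "query": {
            "more_like_this": {
                "fields": ["headline", "body"],
                "like": rec["data"].get("body", ""),
                "min_term_freq": 1,
                "min_doc_freq": 1
            }
        },
        "size": size
    }
    res = client.search(index=index_name, body=query)
    return res.get("hits", {}).get("hits", [])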

def compute_probability_stats(potential_duplicates, dedup_threshold, best_matches):
    probability_ranges = list(range(0, 100, 10))  # 0%, 10%, ..., 90%
    if dedup_threshold * 100 > 90:
        probability_ranges.append(int(dedup_threshold * 100))

    stats = []

    # Build reverse mapping from guid2 to its best match score
    guid_scores = {guid: round(info['score'] * 100, 2) for guid, info in best_matches.items()}

    # Track the highest score for each duplicate record (guid2), across all found duplicates
    max_score_per_guid = {}
    for match in potential_duplicates:
        guid = match['guid2']
        score = match['score'] * 100
        if guid not in max_score_per_guid or score > max_score_per_guid[guid]:
            max_score_per_guid[guid] = score

    for i, threshold in enumerate(probability_ranges):
        next_threshold = probability_ranges[i + 1] if i + 1 < len(probability_ranges) else 101

        # Count all matches in this range
        matched = [
            guid for guid, score in max_score_per_guid.items()
            if threshold <= score < next_threshold
        ]

        # Count how many of those were actually removed (based on threshold match score)
        removed = [
            guid for guid in matched
            if best_matches.get(guid, {}).get("score", 0) * 100 >= dedup_threshold * 100
        ]

        stats.append((threshold, len(matched), len(removed)))

    return stats

def write_summary_stats(stats_list, output_path, dup_summary=None, threshold=None, start_time=None, end_time=None, processing_duration=None):
    output_lines = [
        "Processing Performance Statistics",
        f"  Processing Started : {start_time.strftime('%Y-%m-%d %H:%M:%S')}",
        f"  Processing Ended   : {end_time.strftime('%Y-%m-%d %H:%M:%S')}",
        f"  Total Time Taken   : {processing_duration}",
        "",
        f"Statistics generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        ""
    ]
    for stat in stats_list:
        output_lines.append(f"File: {Path(stat['filename']).stem}")
        output_lines.append(f"  Total Records        : {stat['total_records']}")
        output_lines.append(f"  Excluded Translations: {stat['excluded_translations']}")
        output_lines.append("")

    if dup_summary:
        output_lines.append(f"Total number of records for processing\t\t{dup_summary.get('total_records', 0)}")
        output_lines.append("Probability\tProbable Duplicates\tRemoved")
        for p, dups, rem in dup_summary.get("probabilities", []):
            output_lines.append(f"> {p}%\t\t{dups}\t\t\t{rem}")
        output_lines.append("")
        output_lines.append(f"Total number of removed Records\t\t{dup_summary.get('removed_total', 0)}")
        output_lines.append(f"Duplicate Removal Threshold: > {int(threshold * 100)}%")
        output_lines.append("")
        output_lines.append("Total number of records kept for processing")
        for fname, count in dup_summary.get("kept_per_file", {}).items():
            output_lines.append(f"File: {fname}\t\t{count}")

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))

def main():
    start_time = datetime.now()
    parser = argparse.ArgumentParser()
    parser.add_argument("input_dir", help="Directory containing gzipped JSON news files")
    parser.add_argument("--threshold", type=float, default=0.95, help="Duplicate similarity threshold (0-1)")
    parser.add_argument("--exclude-translations", action="store_true", help='Exclude translated messages (with "N2:TRN" in subjects)')
    args = parser.parse_args()

    input_dir = args.input_dir
    files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith(".gz")]
    stats = []
    all_records = []

    dedup_root = str(Path(input_dir).resolve().parent)
    dedup_path = os.path.join(dedup_root, "deduplication")
    excluded_path = os.path.join(dedup_path, "excluded")
    stats_path = os.path.join(dedup_path, "stats")
    deduped_path = os.path.join(dedup_path, "deduplicated")
    ensure_dir(dedup_path)
    ensure_dir(excluded_path)
    ensure_dir(stats_path)
    ensure_dir(deduped_path)

    for fpath in files:
        result = process_file(fpath, args.exclude_translations)
        stats.append(result)

        kept_records = result["kept"]

        base = Path(fpath).stem
        #write_gz_json(os.path.join(deduped_path, base + ".txt.gz"), result["kept"])
        write_gz_json(os.path.join(excluded_path, base.replace(".txt", "") + ".excluded.txt.gz"), result["excluded"])

        # Attach filename for stats tracking
        for rec in result["kept"]:
            rec["filename"] = result["filename"]
        all_records.extend(result["kept"])

    client = get_opensearch_client()

    index_name = f"news-records-{uuid.uuid4().hex[:6]}"
    index_records(client, index_name, all_records)

    potential_duplicates = find_duplicates_opensearch(client, all_records, threshold=args.threshold, index_name=index_name)

    # Step: identify records to remove
    # Determine the best match for each duplicate candidate
    best_matches = {}
    kept_guids = set()

    # Sort by highest score so that stronger matches are evaluated first
    sorted_matches = sorted(
        [m for m in potential_duplicates if m["score"] >= args.threshold],
        key=lambda x: -x["score"]
    )

    already_mapped = set()

    for match in sorted_matches:
        g1 = match["guid1"]
        g2 = match["guid2"]

        # Skip if either record has already been mapped (to prevent mutual removal)
        if g1 in already_mapped or g2 in already_mapped:
            continue

        best_matches[g2] = {"guid": g1, "score": match["score"]}
        already_mapped.add(g2)
        already_mapped.add(g1)  # Prevent g1 from being removed later

    removed_records = []
    kept_records = []

    for rec in all_records:
        guid = rec["guid"]
        if guid in best_matches:
            rec["matched_guid"] = best_matches[guid]["guid"]
            rec["match_score"] = round(best_matches[guid]["score"] * 100, 2)
            removed_records.append(rec)
        else:
            kept_records.append(rec)

    # Re-group cleaned records AFTER deduplication filtering
    kept_by_file = defaultdict(list)
    for rec in kept_records:
        fname = Path(rec["filename"]).stem
        kept_by_file[fname].append(rec)

    #write_gz_json(os.path.join(deduped_path, "deduplicated_records.json.gz"), removed_records)
    # Group removed records by input file
    removed_by_file = defaultdict(list)
    for rec in removed_records:
        removed_by_file[Path(rec["filename"]).stem].append(rec)

    for rec in removed_records:
        guid = rec["guid"]
        match = best_matches.get(guid, {})
        rec["match_info"] = {
            "filename": rec.get("filename"),  # Use existing filename from the record
            "matched_guid": match.get("guid"),
            "match_score": round(match.get("score", 0), 2)  # Already scaled 0-1
        }

        # Now remove top-level duplicated keys
        rec.pop("matched_guid", None)
        rec.pop("match_score", None)
        rec.pop("filename", None)

    for fname, records in removed_by_file.items():
        out_path = os.path.join(deduped_path, fname.replace(".txt", "") + ".deduplicated.txt.gz")
        write_gz_json(out_path, records)

    # Write deduplicated files back to original filenames
    for stat in stats:
        fname = Path(stat["filename"]).stem
        header = stat.get("header")
        records = kept_by_file.get(fname, [])
        output_base = fname.replace(".txt", "") + ".Cleaned.txt.gz"
        output_file = os.path.join(input_dir, output_base)

        with gzip.open(output_file, 'wt', encoding='utf-8') as f:
            # Write the header separately
            f.write(json.dumps({**header, "Items": []}, ensure_ascii=False) + "\n")
            # Write each record on its own line
            for item in records:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")

    log(f"Total records removed due to deduplication: {len(removed_records)}")

    probability_stats = compute_probability_stats(potential_duplicates, dedup_threshold=args.threshold, best_matches=best_matches)

    dedup_summary = {
        "total_records": sum(s["processed_records"] for s in stats),
        "probabilities": probability_stats,
        "removed_total": len(removed_records),
        "kept_per_file": {
            fname: len(records) for fname, records in kept_by_file.items()
        }
    }

    timestamp = datetime.now().strftime("%Y%m%d%H%M")
    summary_filename = f"summary_stats_{timestamp}.txt"
    end_time = datetime.now()
    processing_duration = end_time - start_time

    write_summary_stats(
        stats,
        os.path.join(stats_path, summary_filename),
        dedup_summary,
        threshold=args.threshold,
        start_time=start_time,
        end_time=end_time,
        processing_duration=processing_duration
    )


input_archive_path = os.path.join(dedup_path, "input")
ensure_dir(input_archive_path)

client.indices.delete(index="news-records", ignore=[400, 404])

for fpath in files:
    os.rename(fpath, os.path.join(input_archive_path, os.path.basename(fpath)))
    

log("Processing completed.")

if name == “main”:
main()
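
==========================
Sanity check

For reference, a minimal check of the match query on its own (a sketch only, using the same localhost settings as the script; the index name is made up and the texts are taken from the sample records above). It indexes two identical documents, forces a refresh so they are searchable, and then runs the same kind of match query the script sends:

from opensearchpy import OpenSearch

# Same connection settings as in get_opensearch_client() above.
client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False, verify_certs=False)

index = "dedup-sanity-check"  # placeholder index name for this test
if client.indices.exists(index=index):
    client.indices.delete(index=index)
client.indices.create(index=index, body={
    "mappings": {"properties": {"guid": {"type": "keyword"},
                                "headline": {"type": "text"},
                                "body": {"type": "text"}}}
})

doc = {"guid": "A", "headline": "TOP NEWS European Companies",
       "body": "YEARENDER-Big Oil backtracks on renewables push as climate agenda falters"}
client.index(index=index, body=doc)
client.index(index=index, body={**doc, "guid": "B"})

# Without a refresh, freshly indexed documents may not be searchable yet, which
# could make every search return zero hits when testing on a small data set.
client.indices.refresh(index=index)

res = client.search(index=index, body={"query": {"match": {"body": doc["body"]}}, "size": 10})
print(len(res["hits"]["hits"]), "hits")  # I would expect 2 hits here if matching works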