Instructions to use openpecha/aligner with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use openpecha/aligner with Transformers:
# Load model directly
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

tokenizer = AutoTokenizer.from_pretrained("openpecha/aligner")
model = AutoModelForSeq2SeqLM.from_pretrained("openpecha/aligner")
- Notebooks
- Google Colab
- Kaggle
| import subprocess | |
| from typing import Dict, List, Any | |
| import os | |
| import json | |
| import logging | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| import re | |
| import shutil | |
| import stat | |
| import subprocess | |
| import uuid | |
| from contextlib import contextmanager | |
| import requests | |
logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)

# The bertalign sources are fetched at import time because the
# `from bertalign import Bertalign` statement that follows needs them on disk.
git_clone_command = "git clone https://github.com/TenzinGayche/bertalign.git"

if Path("bertalign").is_dir():
    # A previous start-up already cloned the repository; cloning again would
    # only fail with "destination path already exists".
    print("bertalign already cloned; skipping Git clone.")
else:
    try:
        # Pass an argv list instead of a shell string: no shell is needed to
        # run a fixed git command.
        subprocess.run(git_clone_command.split(), check=True)
        print("Git clone successful!")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing `git` executable, which the shell-based
        # invocation used to surface as a non-zero exit status.
        print(f"Error while running Git clone command: {e}")
| from bertalign import Bertalign | |
| import requests | |
# GitHub identity used for commits and API calls.
GITHUB_USERNAME = "pechawa"

# SECURITY: the access token must never be committed to source control.
# It is read from the environment instead (GITHUB_ACCESS_TOKEN, falling back
# to GITHUB_TOKEN). Any token that was previously hard-coded in this file has
# been exposed publicly and must be revoked and rotated.
GITHUB_ACCESS_TOKEN = (
    os.environ.get("GITHUB_ACCESS_TOKEN") or os.environ.get("GITHUB_TOKEN", "")
)
# Alias kept so that existing references to either name keep working.
GITHUB_TOKEN = GITHUB_ACCESS_TOKEN

GITHUB_EMAIL = "openpecha-bot@openpecha.org"
GITHUB_ORG = "MonlamAI"
# Repository that receives one empty marker file per TM awaiting publication.
MAI_TM_PUBLISH_TODO_REPO = "MonlamAI_TMs_Publish_TODO"
# REST endpoint for creating repositories inside the organisation.
GITHUB_API_ENDPOINT = f"https://api.github.com/orgs/{GITHUB_ORG}/repos"

DEBUG = True
quiet = "-q" if DEBUG else ""
| import subprocess | |
| import logging | |
def create_github_repo(repo_path: Path, repo_name: str, version: str, realign: bool):
    """Turn ``repo_path`` into a git repository and push it to GitHub.

    Args:
        repo_path: Local directory holding the files to publish.
        repo_name: Name of the repository inside ``GITHUB_ORG``.
        version: Branch name used when ``realign`` is true.
        realign: When true, the content is pushed to a new branch named
            ``version``; otherwise it is pushed to ``main``.

    Returns:
        For a fresh repository, the ``html_url`` reported by the GitHub API.
        For a realignment, a short status message naming the branch.
        ``None`` when ``realign`` is true but ``version`` is empty.

    Raises:
        requests.HTTPError: If GitHub rejects the repository creation.
        subprocess.CalledProcessError: If the final ``git push`` fails.
    """
    logging.info("[INFO] Creating GitHub repo...")
    cwd = str(repo_path)

    # Configure git users
    subprocess.run(["git", "config", "--global", "user.name", GITHUB_USERNAME])
    subprocess.run(["git", "config", "--global", "user.email", GITHUB_EMAIL])

    # Initialize a Git repository and stage everything
    subprocess.run(["git", "init"], cwd=cwd)
    subprocess.run(["git", "add", "."], cwd=cwd)

    # Only commit when the working tree actually has staged changes
    status_result = subprocess.run(
        ["git", "status", "--porcelain"], cwd=cwd, capture_output=True
    )
    if status_result.stdout:
        commit_message = (
            "Initial commit" if not realign else f"Commit for version {version}"
        )
        # Argv list keeps a commit message containing spaces as one argument.
        subprocess.run(["git", "commit", "-m", commit_message], cwd=cwd)
    else:
        logging.info("No changes to commit.")

    if realign:
        if not version:
            logging.error("Version not specified for realignment.")
            return None
        # Checkout the new branch
        subprocess.run(["git", "checkout", "-b", version], cwd=cwd)
    else:
        # Rename the default branch to main and make sure it is checked out
        subprocess.run(["git", "branch", "-m", "main"], cwd=cwd)
        subprocess.run(["git", "checkout", "main"], cwd=cwd)

    # Create a new repository on GitHub
    response = requests.post(
        GITHUB_API_ENDPOINT,
        json={
            "name": repo_name,
            "private": True,
        },
        auth=(GITHUB_USERNAME, GITHUB_ACCESS_TOKEN),
        timeout=30,  # never hang the endpoint on an unresponsive API
    )
    # A realignment targets a repository that normally exists already, in
    # which case GitHub answers 422; that is not an error for this flow.
    already_exists = realign and response.status_code == 422
    if not already_exists:
        response.raise_for_status()
        # Short pause before pushing to the repository that was just created.
        time.sleep(3)

    # Add the GitHub remote to the local Git repository.
    # NOTE: the URL embeds the access token, so it is passed as a single argv
    # element (no shell) and this call is deliberately left unchecked so the
    # token can never end up in an exception message.
    remote_url = f"https://{GITHUB_USERNAME}:{GITHUB_ACCESS_TOKEN}@github.com/{GITHUB_ORG}/{repo_name}.git"
    subprocess.run(["git", "remote", "add", "origin", remote_url], cwd=cwd)

    # Push the changes to the appropriate branch; a failed push must not be
    # reported as a successfully created repository.
    branch_name = version if realign else "main"
    subprocess.run(["git", "push", "-u", "origin", branch_name], cwd=cwd, check=True)

    if realign:
        return f"Branch '{branch_name}' updated in {repo_name}"
    return response.json()["html_url"]
def convert_raw_align_to_tm(align_fn: Path, tm_path: Path):
    """Split a tab-separated alignment file into parallel BO and EN text files.

    Each non-empty line of ``align_fn`` is split on its first tab into a
    Tibetan and an English segment. The segments are written, one per line,
    to ``<tm_path>/<tm_path.name>-bo.txt`` and ``<tm_path>/<tm_path.name>-en.txt``
    so that line *i* of one file corresponds to line *i* of the other.

    Args:
        align_fn: Path of the raw alignment file (UTF-8, ``bo<TAB>en`` lines).
        tm_path: Existing directory that receives the two output files.

    Returns:
        ``tm_path`` unchanged.

    Raises:
        OSError, UnicodeError: If the alignment file exists but cannot be read.
    """
    # logging.debug is already filtered by the configured log level, so no
    # additional flag is needed to gate it.
    logging.debug("[INFO] Conerting raw alignment to TM repo...")

    def load_alignment(fn: Path):
        """Yield ``(bo_segment, en_segment)`` pairs from the alignment file."""
        fn = Path(fn)
        if not fn.exists():
            # Missing input yields no pairs; both output files end up empty.
            logging.error(f"File does not exist: {fn}")
            return
        try:
            # The aligner writes its result as UTF-8, so read it the same way
            # instead of relying on the platform default encoding.
            content = fn.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            logging.error(f"Error while reading file {fn}: {e}")
            raise
        for seg_pair in content.splitlines():
            if not seg_pair:
                continue
            # A line without a tab has no English side: emit an empty segment
            # so both output files keep the same number of lines.
            bo_seg, _, en_seg = seg_pair.partition("\t")
            yield bo_seg, en_seg

    text_bo_fn = tm_path / f"{tm_path.name}-bo.txt"
    text_en_fn = tm_path / f"{tm_path.name}-en.txt"

    with open(text_bo_fn, "w", encoding="utf-8") as bo_file, open(
        text_en_fn, "w", encoding="utf-8"
    ) as en_file:
        for bo_seg, en_seg in load_alignment(align_fn):
            bo_file.write(bo_seg + "\n")
            en_file.write(en_seg + "\n")

    return tm_path
def get_github_dev_url(raw_github_url: str) -> str:
    """Convert a raw GitHub file URL on ``main`` into a ``github.dev`` URL.

    Everything after the first ``.com`` is treated as the file path, and its
    ``/main/`` branch segment is rewritten to ``/blob/main/``.

    Args:
        raw_github_url: URL whose host ends in ``.com`` and whose path
            contains a ``/main/`` segment.

    Returns:
        The ``https://github.dev`` URL for the same file.

    Raises:
        ValueError: If the URL contains no ``.com``.
    """
    base_url = "https://github.dev"
    # partition splits on the first ".com" only, so a later ".com" inside the
    # file path no longer breaks the conversion.
    _, separator, file_path = raw_github_url.partition(".com")
    if not separator:
        raise ValueError(f"URL does not contain '.com': {raw_github_url}")
    # Rewrite only the branch path segment, not every "main" substring
    # (a repository or file name may contain "main" as well).
    blob_file_path = file_path.replace("/main/", "/blob/main/", 1)
    return base_url + blob_file_path
def add_input_in_readme(input_dict: Dict[str, str], path: Path) -> Path:
    """Write a ``README.md`` into ``path`` that links to the BO and EN inputs.

    The links point at ``https://github.com/MonlamAI/<id>``, where ``<id>`` is
    the text id with ``TM`` replaced by ``BO`` or ``EN`` respectively.

    Args:
        input_dict: Request payload; only its ``"text_id"`` entry is used.
        path: Existing directory that receives the ``README.md``.

    Returns:
        ``path`` unchanged.
    """
    input_readme_fn = path / "README.md"
    text_id = input_dict["text_id"]

    # The link targets are derived from the text id alone. Earlier conversions
    # of the "bo_file_url"/"en_file_url" entries were discarded before use and
    # could only fail on unexpected URLs, so they are no longer performed.
    bo_file_url = os.path.join(
        "https://github.com/MonlamAI/", str(text_id).replace("TM", "BO")
    )
    en_file_url = os.path.join(
        "https://github.com/MonlamAI/", str(text_id).replace("TM", "EN")
    )
    print(bo_file_url, en_file_url)

    input_string = "## Input\n- [BO{}]({})\n- [EN{}]({})".format(
        text_id, bo_file_url, text_id, en_file_url
    )
    # Explicit encoding so a non-ASCII text id does not depend on the locale.
    input_readme_fn.write_text(input_string, encoding="utf-8")
    return path
def add_to_publish_todo_repo(org, repo_name, file_path, access_token):
    """Register ``file_path`` as an empty file in the publish-todo repository.

    The GitHub contents API is queried first; if the file is already there
    nothing is written. Otherwise an empty file is created with a commit
    message of ``Add <file_path>``. The outcome is reported on stdout.

    Args:
        org: GitHub organisation that owns the repository.
        repo_name: Name of the publish-todo repository.
        file_path: Path of the marker file inside the repository.
        access_token: Token sent as a Bearer credential.
    """
    url = f"https://api.github.com/repos/{org}/{repo_name}/contents/" + file_path
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/vnd.github.v3+json",
    }

    # A 200 on the lookup means the marker file is already present.
    lookup = requests.get(url, headers=headers)
    if lookup.status_code == 200:
        print(f"[INFO] '{file_path}' already added.")
        return

    # Create the marker as an empty file.
    created = requests.put(
        url,
        headers=headers,
        json={"message": f"Add {file_path}", "content": ""},
    )
    if created.status_code != 201:
        print(f"[ERROR] Failed to add '{file_path}'.")
        print(f"[ERROR] Response: {created.text}")
        return
    print(f"[INFO] '{file_path}' added to publish todo")
def create_tm(align_fn: Path, text_pair):
    """Build a TM repository from an alignment file and publish it on GitHub.

    The alignment is converted into parallel text files inside a temporary
    directory, a README is added, the directory is pushed to GitHub, and the
    repository name is registered in the publish-todo repository.

    Args:
        align_fn: Path of the raw alignment file.
        text_pair: Mapping with a ``"text_id"`` entry and, optionally, both
            ``"version"`` and ``"realign"``. If either optional entry is
            missing, the run is treated as a fresh (non-realign) creation.

    Returns:
        Whatever ``create_github_repo`` returns for the pushed repository.
    """
    align_fn = Path(align_fn)
    text_id = text_pair["text_id"]
    try:
        version = text_pair["version"]
        realign = text_pair["realign"]
    except KeyError:
        # Only a missing key selects the defaults; any other error is a real
        # problem and must not be hidden.
        version = ""
        realign = False

    with tempfile.TemporaryDirectory() as tmp_dir:
        output_dir = Path(tmp_dir)
        repo_name = f"TM{text_id}"
        tm_path = output_dir / repo_name
        tm_path.mkdir(exist_ok=True, parents=True)

        repo_path = convert_raw_align_to_tm(align_fn, tm_path)
        repo_path = add_input_in_readme(text_pair, tm_path)
        repo_url = create_github_repo(repo_path, repo_name, version, realign)
        logging.info(f"TM repo created: {repo_url}")
        add_to_publish_todo_repo(
            GITHUB_ORG, MAI_TM_PUBLISH_TODO_REPO, repo_name, GITHUB_ACCESS_TOKEN
        )
    return repo_url
| ##--------------------------------------------- MAIN -----------------------## | |
@contextmanager
def TemporaryDirectory():
    """Context manager yielding a fresh scratch directory under ``./output``.

    The directory gets a random eight-character name and is removed, together
    with its contents, when the ``with`` block exits (also on error).

    Yields:
        Path: Absolute path of the newly created directory.
    """
    # The @contextmanager decorator is required: a bare generator function
    # cannot be used in a `with` statement.
    tmpdir = Path("./output").resolve() / uuid.uuid4().hex[:8]
    tmpdir.mkdir(exist_ok=True, parents=True)
    try:
        yield tmpdir
    finally:
        shutil.rmtree(str(tmpdir))
def download_file(s3_public_url: str, output_fn) -> Path:
    """Download file from a public S3 bucket URL.

    Args:
        s3_public_url: URL to fetch.
        output_fn: Destination file path (``str`` or ``Path``).

    Returns:
        The destination as a ``Path``.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status.
    """
    output_fn = Path(output_fn)
    # The timeout bounds the connect phase and each wait for data, not the
    # total transfer, so large files still download while a dead server can
    # no longer block the caller forever.
    with requests.get(s3_public_url, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(output_fn, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return output_fn
| # def download_file(github_file_url: str, output_fn) -> Path: | |
| # """Download file from github""" | |
| # headers = { | |
| # "Authorization": f"token {GITHUB_TOKEN}", | |
| # "Accept": "application/vnd.github+json", | |
| # } | |
| # authenticated_file_url = f"{github_file_url}?token={GITHUB_TOKEN}" | |
| # with requests.get(authenticated_file_url, headers=headers, stream=True) as r: | |
| # r.raise_for_status() | |
| # with open(output_fn, "wb") as f: | |
| # for chunk in r.iter_content(chunk_size=8192): | |
| # f.write(chunk) | |
| # return output_fn | |
def _run_align_script(bo_fn, en_fn, output_dir):
    """Align a Tibetan and an English text file with Bertalign.

    Both files are read as UTF-8, aligned, and the aligner's TSV output is
    written to ``result.txt`` inside ``output_dir``. The elapsed time is
    logged in minutes.

    Args:
        bo_fn: Path of the Tibetan text file.
        en_fn: Path of the English text file.
        output_dir: Directory that receives ``result.txt``.

    Returns:
        Path of the written ``result.txt``.
    """
    started_at = time.time()
    logging.basicConfig(level=logging.INFO)

    # Load both sides of the text pair
    tibetan = Path(bo_fn).read_text(encoding='utf-8')
    english = Path(en_fn).read_text(encoding='utf-8')

    # Run the sentence alignment and collect it as TSV
    bertalign = Bertalign(tibetan, english, "bo", "en")
    bertalign.align_sents()
    tsv = bertalign.return_tsv()

    # Persist the alignment next to the downloaded inputs
    output_fn = Path(output_dir) / 'result.txt'
    with open(output_fn, "w", encoding='utf-8') as out:
        out.write(tsv)

    minutes = round((time.time() - started_at) / 60, 2)
    logging.info(f"Total time taken for Aligning: {minutes} mins")
    return output_fn
def align(text_pair):
    """Download a text pair, align it, and publish the result as a TM repo.

    Args:
        text_pair: Mapping with ``"text_id"``, ``"bo_file_url"`` and
            ``"en_file_url"`` entries; it is also handed to ``create_tm``.

    Returns:
        ``{"tm_repo_url": ...}`` on success, otherwise ``{"error": ...}``
        describing the step that failed.
    """
    logging.info(f"Running aligner for TM{text_pair['text_id']}...")
    with TemporaryDirectory() as tmpdir:
        output_dir = Path(tmpdir)

        # Download both files. A failed request is reported through the same
        # error dictionary as every other failure instead of escaping as an
        # exception.
        fetched = {}
        for lang, label in (("bo", "Tibetan"), ("en", "English")):
            target = output_dir / f"{lang}.txt"
            try:
                download_file(text_pair[f"{lang}_file_url"], output_fn=target)
            except (requests.RequestException, OSError) as e:
                logging.error(f"Failed to download {label} file: {e}")
                return {"error": f"Failed to download {label} file or file is empty"}
            fetched[lang] = target

        bo_fn = fetched["bo"]
        en_fn = fetched["en"]

        # Check that both downloads produced a non-empty file
        if not bo_fn.exists() or bo_fn.stat().st_size == 0:
            logging.error(f"Failed to download or empty file: {bo_fn}")
            return {"error": "Failed to download Tibetan file or file is empty"}
        if not en_fn.exists() or en_fn.stat().st_size == 0:
            logging.error(f"Failed to download or empty file: {en_fn}")
            return {"error": "Failed to download English file or file is empty"}

        # Run alignment script
        try:
            aligned_fn = _run_align_script(bo_fn, en_fn, output_dir)
        except Exception as e:
            logging.error(f"Alignment script error: {e}")
            return {"error": f"Alignment script failed: {e}"}

        # Create TM repository
        try:
            repo_url = create_tm(aligned_fn, text_pair=text_pair)
            return {"tm_repo_url": repo_url}
        except Exception as e:
            logging.error(f"Error in creating TM repository: {e}")
            return {"error": f"Error in repository creation: {e}"}
class EndpointHandler():
    """Inference-endpoint entry point that runs the alignment pipeline."""

    def __init__(self, path=""):
        # Path handed over when the handler is instantiated; only stored here.
        self.path = path

    def __call__(self, data: Any) -> Dict[str, Any]:
        """Run the aligner for one request.

        Args:
            data (:obj:`dict`):
                The request payload. If it has an ``"inputs"`` entry, that
                entry is used as the text pair; otherwise the payload itself
                is used.
        Return:
            A :obj:`dict` as produced by ``align``: ``{"tm_repo_url": ...}``
            on success or ``{"error": ...}`` on failure.
        """
        # Read without popping so the caller's payload is left untouched.
        text_pair = data.get("inputs", data)
        return align(text_pair)