Computational Tools Overview
Our computational toolkit consists of seven independent programs designed to support both dry lab analysis and wet lab experimentation. Programs 1-5 focus on dry lab computational analysis of hepatocellular carcinoma (HCC) RNA-seq data and HBV genome conservation, while Programs 6-7 support wet lab plasmid design and experimental data analysis.
Program 1: Gene Expression Analysis - Standalone Python script that filters and prioritizes HCC-associated genes using TPM normalization
Program 2: RNA-seq Alignment - SLURM batch script for HPC clusters that maps HCC RNA reads to the HBV genome
Program 3: Alignment Evaluation - Google Colab notebook for TPM calculation and position-wise conservation analysis
Program 4: Conservation Profiling - Independent Python program that analyzes thousands of HBV genomes for conserved regions
Program 5: Conservation Visualization - Google Colab notebook to visualize the positions of conserved 140 base pair sequences across the most common HBV genotypes
Program 6: Plasmid Design Tool - Google Colab notebook for modifying NCBI sequences to meet RADAR sensor design criteria
Program 7: Flow Cytometry Analysis - Google Colab notebook for statistical analysis and visualization of experimental flow cytometry data
Program 1: Gene Expression Analysis
Independent Python script for TPM-based gene filtering and differential expression analysis
This standalone Python program processes transcriptomic data from hepatocellular carcinoma samples to identify genes with elevated expression levels. It can be run independently on any system with Python and pandas installed.
- Standalone execution - no dependencies on other programs
- Command-line interface with user-defined TPM cutoffs
- Automated gene symbol mapping and normalization
- Generates three output files ranked by different metrics
- Works with standard CSV and TSV input formats
Run with: python gene_expression_pipeline.py
Requires: ACH-000625_expression.csv, Gene_symbol_mapping__txt_-__csv_.csv, table_degenes.txt
Outputs: tpm_sorted.csv, log+tpm_sorted.csv, average_sorted.csv
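As inferred from the parsing code below, the loader expects column headers of the form "SYMBOL (ENTREZ_ID)" with TPM values in the first data row; a minimal, made-up example of that layout and how it reduces to a ranked TPM series:

import io
import pandas as pd

# Hypothetical three-gene input in the layout load_ach_expression() expects:
# a header row of "SYMBOL (ENTREZ_ID)" columns with TPM values in the first data row.
sample_csv = io.StringIO(
    "cell_line,TP53 (7157),GPC3 (2719),AKR1B10 (57016)\n"
    "ACH-000625,12.4,85.1,40.3\n"
)
df = pd.read_csv(sample_csv)
tpm_row = df.drop(columns=[df.columns[0]]).iloc[0]
print(tpm_row.sort_values(ascending=False))  # genes ranked by TPM, as in tpm_sorted.csv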
Complete Program Code
#!/usr/bin/env python3
"""
Gene Expression Analysis Pipeline
Filters genes by TPM cutoff, merges with differential expression data,
and ranks candidates by multiple metrics for therapeutic targeting.
"""
import re
import sys
import pandas as pd
from typing import Tuple
# -------- File Paths --------
ACH_PATH = "ACH-000625_expression.csv"
MAP_PATH = "Gene_symbol_mapping__txt_-__csv_.csv"
DE_PATH = "table_degenes.txt"
TPM_OUT_PATH = "tpm_sorted.csv"
LOG_OUT_PATH = "log+tpm_sorted.csv"
AVG_OUT_PATH = "average_sorted.csv"
# -------- Helper Functions --------
def load_ach_expression(path_csv: str) -> pd.DataFrame:
"""Load ACH expression data with TPM values in first row"""
df = pd.read_csv(path_csv)
first_col = df.columns[0]
df2 = df.drop(columns=[first_col])
tpm_series = df2.iloc[0]
tidy = (
tpm_series.reset_index()
.rename(columns={'index': 'csv_col_name', 0: 'tpm'})
.assign(tpm=lambda x: pd.to_numeric(x['tpm'], errors='coerce'))
.dropna(subset=['tpm'])
)
return tidy
def parse_csv_col(col: str) -> Tuple[str, str]:
"""Extract gene symbol and Entrez ID from column names"""
m = re.match(r"^([^ ]+)\s*\((\d+)\)$", str(col))
if m:
return m.group(1), m.group(2)
return str(col), None
def normalize_symbol(s: str) -> str:
"""Normalize gene symbols for matching"""
return str(s).strip().upper()
def main():
# Step 1: Get TPM cutoff from user
try:
user_in = input("Enter TPM cutoff (float): ").strip()
TPM_CUTOFF = float(user_in) if user_in else 1.0
except Exception:
TPM_CUTOFF = 1.0
print(f"[INFO] Using TPM cutoff: {TPM_CUTOFF}")
# Step 2: Load and filter expression data
ach_tidy = load_ach_expression(ACH_PATH)
ach_tidy_sorted = ach_tidy.sort_values('tpm', ascending=False)
ach_tpm_filtered = ach_tidy_sorted[
ach_tidy_sorted['tpm'] >= TPM_CUTOFF
].reset_index(drop=True)
print(f"[INFO] Genes with TPM ≥ {TPM_CUTOFF}: {len(ach_tpm_filtered)}")
ach_tpm_filtered.to_csv(TPM_OUT_PATH, index=False)
print(f"[SAVED] {TPM_OUT_PATH}")
# Step 3: Load mapping and differential expression data
mapping_df = pd.read_csv(MAP_PATH)
de_df = pd.read_csv(DE_PATH, sep="\t")
# Step 4: Merge datasets by normalized gene symbols
ach_with_parsed = ach_tpm_filtered.copy()
ach_with_parsed[['csv_gene_symbol', 'entrez_id']] = \
ach_with_parsed['csv_col_name'].apply(lambda c: pd.Series(parse_csv_col(c)))
ach_with_parsed['gene_symbol_norm'] = \
ach_with_parsed['csv_gene_symbol'].apply(normalize_symbol)
# Normalize mapping symbols
cols_lower = {c.lower(): c for c in mapping_df.columns}
gene_symbol_col = cols_lower.get('gene symbol', cols_lower.get('gene_symbol'))
if gene_symbol_col:
mapping_df['gene_symbol_norm'] = \
mapping_df[gene_symbol_col].apply(normalize_symbol)
# Normalize DE symbols
de_cols_lower = {c.lower(): c for c in de_df.columns}
de_gene_symbol_col = de_cols_lower.get('gene symbol')
de_logfc_col = de_cols_lower.get('log2(fold change)')
if de_gene_symbol_col:
de_df['gene_symbol_norm'] = \
de_df[de_gene_symbol_col].apply(normalize_symbol)
# Merge operations
merged = ach_with_parsed.merge(
mapping_df, on='gene_symbol_norm', how='left'
)
merged2 = merged.merge(
de_df[['gene_symbol_norm', de_logfc_col]],
on='gene_symbol_norm',
how='left'
)
# Step 5: Filter positive log2FC and sort
pos_log = merged2[merged2[de_logfc_col] > 0].copy()
pos_log_sorted = pos_log.sort_values(
by=de_logfc_col, ascending=False
).reset_index(drop=True)
pos_log_sorted.to_csv(LOG_OUT_PATH, index=False)
print(f"[SAVED] {LOG_OUT_PATH}")
# Step 6: Calculate average metric and sort
pos_log_sorted['avg_tpm_log2fc'] = \
(pos_log_sorted['tpm'] + pos_log_sorted[de_logfc_col]) / 2.0
avg_sorted = pos_log_sorted.sort_values(
'avg_tpm_log2fc', ascending=False
).reset_index(drop=True)
avg_sorted.to_csv(AVG_OUT_PATH, index=False)
print(f"[SAVED] {AVG_OUT_PATH}")
# Display top candidates
print("\nTop 10 by Log2FC:")
print(pos_log_sorted[['csv_col_name', 'tpm', de_logfc_col]].head(10))
print("\nTop 10 by combined average:")
print(avg_sorted[['csv_col_name', 'tpm', de_logfc_col, 'avg_tpm_log2fc']].head(10))
if __name__ == "__main__":
main()
Program 2: HCC RNA-seq Alignment
SLURM batch script for high-performance computing clusters
This is a standalone SLURM job script designed to run on HPC infrastructure such as Stanford's Sherlock cluster. It performs the complete RNA-seq alignment workflow independently and requires no connection to the other programs.
- Self-contained HPC job submission script
- Automated STAR index generation and alignment
- Built-in quality control and statistics generation
- Configurable resource allocation (CPUs, memory, time)
- Results packaged as downloadable archive
Submit with: sbatch hcc_rnaseq_pipeline.sh
Requires: HPC cluster with SLURM, conda environment with STAR and samtools
Outputs: Aligned BAM files, gene counts, coverage statistics, archived results
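Once the archive is downloaded, coverage_depth.txt can be sanity-checked locally; samtools depth writes three tab-separated columns (reference name, 1-based position, read depth) with no header, so a short pandas summary is enough:

import pandas as pd

# coverage_depth.txt follows the default samtools depth format:
# reference name, 1-based position, read depth (tab-separated, no header).
depth = pd.read_csv("coverage_depth.txt", sep="\t", names=["chrom", "pos", "depth"])
print(f"Positions with coverage: {len(depth)}")
print(f"Mean depth: {depth['depth'].mean():.1f}x   Max depth: {depth['depth'].max()}x")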
Complete Program Code
#!/bin/bash
#SBATCH --job-name=hcc_rnaseq
#SBATCH --time=07:00:00
#SBATCH --cpus-per-task=8
#SBATCH --mem=138G
#SBATCH --output=hcc_rnaseq_%j.out
#SBATCH --error=hcc_rnaseq_%j.err
# ====================================================================
# USER CONFIGURATION
# ====================================================================
SCRATCH_BASE=/scratch/groups/...
INPUT_DIR=$SCRATCH_BASE
OUTPUT_DIR=$SCRATCH_BASE/align_outputs
HCC_RNA_READS="$INPUT_DIR/SRR953772.fastq"
HEPB_GENOME="$INPUT_DIR/sequence.fasta"
HEPB_GTF="$INPUT_DIR/U95551.1.gtf"
THREADS=8
MEMORY_GB=138
# ====================================================================
# ACTIVATE CONDA ENVIRONMENT
# ====================================================================
echo "Activating Conda environment 'rnaseq_env'..."
source $HOME/miniconda3/bin/activate rnaseq_env
# ====================================================================
# CREATE OUTPUT DIRECTORIES
# ====================================================================
HEPB_INDEX_DIR="${OUTPUT_DIR}/hepb_star_index"
ALIGNMENTS_DIR="${OUTPUT_DIR}/alignments"
COUNTS_DIR="${OUTPUT_DIR}/counts"
mkdir -p $HEPB_INDEX_DIR $ALIGNMENTS_DIR $COUNTS_DIR
# ====================================================================
# STEP 1: BUILD STAR INDEX
# ====================================================================
echo "Building STAR index for HBV genome..."
if [ ! -f "${HEPB_INDEX_DIR}/genomeParameters.txt" ]; then
STAR --runMode genomeGenerate \
--genomeDir $HEPB_INDEX_DIR \
--genomeFastaFiles $HEPB_GENOME \
--sjdbGTFfile $HEPB_GTF \
--sjdbOverhang 49 \
--runThreadN $THREADS \
--limitGenomeGenerateRAM $((MEMORY_GB * 1000000000)) \
--genomeSAindexNbases 4
echo "STAR index built successfully"
else
echo "STAR index already exists, skipping..."
fi
# ====================================================================
# STEP 2: ALIGN RNA READS TO HBV GENOME
# ====================================================================
echo "Running STAR alignment..."
STAR --genomeDir $HEPB_INDEX_DIR \
--readFilesIn $HCC_RNA_READS \
--outFileNamePrefix ${ALIGNMENTS_DIR}/hepb_alignment_ \
--outSAMtype BAM SortedByCoordinate \
--runThreadN $THREADS \
--limitBAMsortRAM $((MEMORY_GB * 1000000000)) \
--outFilterMultimapNmax 100 \
--outFilterMismatchNmax 3 \
--outSAMattributes All \
--outSAMprimaryFlag OneBestScore \
--outSAMunmapped Within
echo "Indexing BAM file..."
samtools index ${ALIGNMENTS_DIR}/hepb_alignment_Aligned.sortedByCoord.out.bam
# ====================================================================
# STEP 3: COUNT READS PER GENE
# ====================================================================
echo "Counting reads with featureCounts..."
featureCounts -a $HEPB_GTF \
-o ${COUNTS_DIR}/hepb_gene_counts.txt \
-T $THREADS \
-g locus_tag \
-t CDS \
-M \
--fraction \
${ALIGNMENTS_DIR}/hepb_alignment_Aligned.sortedByCoord.out.bam
# ====================================================================
# STEP 4: GENERATE ALIGNMENT STATISTICS
# ====================================================================
echo "Generating alignment statistics..."
samtools flagstat ${ALIGNMENTS_DIR}/hepb_alignment_Aligned.sortedByCoord.out.bam \
> ${COUNTS_DIR}/alignment_stats.txt
echo "Calculating coverage depth..."
samtools depth ${ALIGNMENTS_DIR}/hepb_alignment_Aligned.sortedByCoord.out.bam \
> ${COUNTS_DIR}/coverage_depth.txt
# ====================================================================
# STEP 5: ARCHIVE RESULTS
# ====================================================================
echo "Archiving results..."
cd $OUTPUT_DIR
tar -czf align_outputs.tar.gz alignments counts
echo "====================================================================="
echo "HCC RNA-seq pipeline completed successfully!"
echo "Outputs are in: $OUTPUT_DIR"
echo "Download align_outputs.tar.gz for local analysis"
echo "====================================================================="
Program 3: Alignment Evaluation & Conservation Analysis
Google Colab notebook for interactive post-alignment analysis
This is a complete Jupyter notebook that runs entirely in Google Colab. It can analyze alignment results from any source, not just from Program 2. The notebook operates independently and includes all necessary functions for TPM normalization, conservation analysis, and visualization.
This standalone notebook is organized into cells covering the complete analysis workflow: environment setup, a file upload interface, data loading, TPM calculation, position-wise conservation analysis, consensus sequence generation, antisense RNA design, interactive visualizations, and automated report generation.
- Runs completely in Google Colab - no local installation needed
- Interactive file upload and processing
- Independent analysis tool - can work with any BAM files
- Built-in visualization engine with matplotlib and plotly
- Generates downloadable reports and results packages
Upload: BAM files from Program 2 or any STAR alignment results
Outputs: TPM matrices, conservation plots, quality reports
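For reference, the TPM normalization in CELL 4 follows the standard reads-per-kilobase-then-rescale formula. A minimal sketch with made-up read counts (CDS lengths taken from the notebook's lookup table):

import pandas as pd

# Hypothetical read counts for two HBV ORFs; lengths are the CDS lengths used in CELL 4.
counts  = pd.Series({"HBVORF03": 120, "HBVORF13": 900})   # made-up counts
lengths = pd.Series({"HBVORF03": 465, "HBVORF13": 2499})  # X protein, Polymerase (bp)

rpk = counts / (lengths / 1000)   # reads per kilobase of CDS
tpm = rpk / (rpk.sum() / 1e6)     # rescale so TPM values sum to one million
print(tpm.round(1))               # e.g. HBVORF03 ≈ 417k, HBVORF13 ≈ 583k TPM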
Complete Notebook Code
# HCC RNA-seq Conservation Analysis - Google Colab Notebook
# Part 2: Local Analysis of Supercomputer Alignment Results
# ====================================================================
# CELL 1: SETUP AND INSTALLATIONS
# ====================================================================
# Install required packages
!pip install pysam pandas biopython matplotlib seaborn numpy scipy plotly
# Import libraries
import pysam
import pandas as pd
from collections import defaultdict, Counter
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from Bio import SeqIO
import json
import plotly.graph_objects as go
import plotly.subplots as sp
from plotly.offline import plot
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
# Set up plotting
plt.style.use('default')
sns.set_palette("husl")
print("Setup completed! Ready for analysis.")
# ====================================================================
# CELL 2: FILE UPLOAD AND EXTRACTION
# ====================================================================
from google.colab import files
import tarfile
import os
print("Please upload your alignment results archive (hcc_alignment_results.tar.gz)")
print("Also upload sequence.fasta and HepBannotations.gtf if not included in archive")
# Upload files
uploaded = files.upload()
# Extract archive if uploaded
for filename in uploaded.keys():
if filename.endswith('.tar.gz'):
print(f"Extracting {filename}...")
with tarfile.open(filename, 'r:gz') as tar:
tar.extractall()
print("Extraction completed!")
elif filename == 'sequence.fasta':
print("HBV genome reference uploaded")
elif filename == 'HepBannotations.gtf':
print("HBV annotations uploaded")
# List uploaded files
print("\nFiles available for analysis:")
for root, dirs, filenames in os.walk('.'):   # renamed to avoid shadowing google.colab's `files` module
    for file in filenames:
        if not file.startswith('.'):
            print(os.path.join(root, file))
# ====================================================================
# CELL 3: LOAD AND PREPARE DATA
# ====================================================================
def load_reference_genome(fasta_file="sequence.fasta"):
"""Load reference genome sequences"""
ref_genome = {}
for record in SeqIO.parse(fasta_file, "fasta"):
ref_genome[record.id] = str(record.seq).upper()
return ref_genome
def load_gene_coordinates(bed_file="counts/hepb_genes.bed"):
"""Load gene coordinates from bed file"""
genes = {}
with open(bed_file, "r") as f:
for line in f:
parts = line.strip().split("\t")
if len(parts) >= 5:
chrom = parts[0]
start = int(parts[1]) - 1 # Convert to 0-based
end = int(parts[2])
gene_id = parts[3]
gene_name = parts[4]
genes[gene_id] = {
'chrom': chrom,
'start': start,
'end': end,
'name': gene_name,
'length': end - start
}
return genes
# Load data
print("Loading reference genome...")
ref_genome = load_reference_genome()
print("Loading gene coordinates...")
genes = load_gene_coordinates()
print(f"Loaded {len(ref_genome)} chromosome(s) and {len(genes)} genes")
print("Genes found:", list(genes.keys()))
# ====================================================================
# CELL 4: TPM NORMALIZATION
# ====================================================================
def calculate_tpm_normalization(counts_file="counts/hepb_gene_counts.txt"):
"""Calculate TPM normalized expression values"""
# Read count data
    counts = pd.read_csv(counts_file, sep='\t', comment='#')  # header is the first non-comment line
# Extract count columns (usually the last column)
count_col = counts.columns[-1]
gene_counts = counts.set_index('Geneid')[count_col]
# Remove zero counts
gene_counts = gene_counts[gene_counts > 0]
# HBV gene lengths (actual CDS lengths)
hbv_gene_lengths = {
"HBVORF01": 1170, # PreS1/LHBs
"HBVORF02": 639, # PreC/HBeAg
"HBVORF03": 465, # X protein
"HBVORF04": 336, # SP (non-functional)
"HBVORF11": 844, # PreS2/MHBs
"HBVORF12": 552, # Core/HBcAg
"HBVORF13": 2499, # Polymerase
"HBVORF21": 681 # Surface/HBsAg
}
# Get gene lengths for our data
valid_genes = [gene for gene in gene_counts.index if gene in hbv_gene_lengths]
gene_lengths = pd.Series([hbv_gene_lengths[gene] for gene in valid_genes], index=valid_genes)
# Filter counts to valid genes
gene_counts = gene_counts[valid_genes]
# TPM calculation
# Step 1: RPK (Reads Per Kilobase)
rpk = gene_counts / (gene_lengths / 1000)
# Step 2: TPM (Transcripts Per Million)
tpm = rpk / (rpk.sum() / 1e6)
print("TPM Normalization Results:")
print(f"Genes processed: {len(tpm)}")
print(f"Total TPM: {tpm.sum():.1f}")
for gene, tpm_val in tpm.items():
print(f"{gene}: {tpm_val:.2f} TPM")
return tpm, gene_counts
# Calculate TPM values
tpm_values, raw_counts = calculate_tpm_normalization()
# ====================================================================
# CELL 5: POSITION-BY-POSITION CONSERVATION ANALYSIS
# ====================================================================
def extract_nucleotides_at_positions(bam_file, genes, ref_genome, min_read_support=5):
"""Extract nucleotides at each position for each gene"""
bam = pysam.AlignmentFile(bam_file, "rb")
gene_position_nucleotides = {}
for gene_id, gene_info in genes.items():
print(f"Processing gene {gene_id} ({gene_info['name']})...")
chrom = gene_info['chrom']
start = gene_info['start']
end = gene_info['end']
# Initialize position-wise nucleotide storage
position_nucleotides = defaultdict(list)
# Get all reads mapping to this gene
try:
for read in bam.fetch(chrom, start, end):
if read.is_unmapped or read.is_secondary or read.is_supplementary:
continue
# Get aligned pairs (query_pos, ref_pos)
aligned_pairs = read.get_aligned_pairs(matches_only=True, with_seq=False)
for query_pos, ref_pos in aligned_pairs:
if query_pos is not None and ref_pos is not None:
# Check if position is within gene boundaries
if start <= ref_pos < end:
gene_relative_pos = ref_pos - start
nucleotide = read.query_sequence[query_pos].upper()
position_nucleotides[gene_relative_pos].append(nucleotide)
except Exception as e:
print(f"Warning: Could not process {gene_id}: {e}")
continue
# Filter positions with sufficient read support
filtered_positions = {}
for pos, nucleotides in position_nucleotides.items():
if len(nucleotides) >= min_read_support:
filtered_positions[pos] = nucleotides
gene_position_nucleotides[gene_id] = {
'positions': filtered_positions,
'gene_info': gene_info
}
print(f" Found {len(filtered_positions)} positions with ≥{min_read_support} reads")
bam.close()
return gene_position_nucleotides
def calculate_nucleotide_frequencies(gene_position_nucleotides):
"""Calculate nucleotide frequencies at each position"""
gene_frequencies = {}
for gene_id, data in gene_position_nucleotides.items():
positions = data['positions']
gene_info = data['gene_info']
position_frequencies = {}
for pos, nucleotides in positions.items():
# Count nucleotides
nucleotide_counts = Counter(nucleotides)
total_reads = len(nucleotides)
# Calculate frequencies
frequencies = {}
for nucleotide in ['A', 'T', 'G', 'C']:
count = nucleotide_counts.get(nucleotide, 0)
frequencies[nucleotide] = {
'count': count,
'frequency': count / total_reads * 100
}
# Find most common nucleotide
most_common = max(frequencies.items(), key=lambda x: x[1]['count'])
# Calculate entropy
entropy = 0
for count in nucleotide_counts.values():
if count > 0:
p = count / total_reads
entropy -= p * np.log2(p)
position_frequencies[pos] = {
'nucleotides': frequencies,
'most_common': most_common[0],
'conservation': most_common[1]['frequency'],
'total_reads': total_reads,
'entropy': entropy
}
gene_frequencies[gene_id] = {
'positions': position_frequencies,
'gene_info': gene_info
}
return gene_frequencies
def find_conserved_regions(gene_frequencies, min_length=3, min_conservation=70):
"""Find continuously conserved regions"""
gene_conserved_regions = {}
for gene_id, data in gene_frequencies.items():
positions = data['positions']
gene_info = data['gene_info']
# Sort positions
sorted_positions = sorted(positions.keys())
conserved_regions = []
current_region = []
for pos in sorted_positions:
conservation = positions[pos]['conservation']
if conservation >= min_conservation:
# Check if this position is continuous with current region
if not current_region or pos == current_region[-1]['position'] + 1:
current_region.append({
'position': pos,
'nucleotide': positions[pos]['most_common'],
'conservation': conservation,
'total_reads': positions[pos]['total_reads']
})
else:
# Save current region if it meets length requirement
if len(current_region) >= min_length:
conserved_regions.append(current_region)
# Start new region
current_region = [{
'position': pos,
'nucleotide': positions[pos]['most_common'],
'conservation': conservation,
'total_reads': positions[pos]['total_reads']
}]
else:
# End current region
if len(current_region) >= min_length:
conserved_regions.append(current_region)
current_region = []
# Don't forget the last region
if len(current_region) >= min_length:
conserved_regions.append(current_region)
gene_conserved_regions[gene_id] = {
'regions': conserved_regions,
'gene_info': gene_info
}
print(f"Gene {gene_id}: Found {len(conserved_regions)} conserved regions")
return gene_conserved_regions
# Run conservation analysis
print("Extracting nucleotides at positions...")
bam_file = "alignments/hepb_alignment_Aligned.sortedByCoord.out.bam"
gene_position_nucleotides = extract_nucleotides_at_positions(bam_file, genes, ref_genome, min_read_support=5)
print("Calculating nucleotide frequencies...")
gene_frequencies = calculate_nucleotide_frequencies(gene_position_nucleotides)
print("Finding conserved regions...")
gene_conserved_regions = find_conserved_regions(gene_frequencies, min_length=3, min_conservation=70)
# ====================================================================
# CELL 6: GENERATE CONSENSUS SEQUENCES
# ====================================================================
def generate_consensus_sequences(gene_frequencies, ref_genome):
"""Generate consensus sequences for each gene"""
gene_consensus = {}
for gene_id, freq_data in gene_frequencies.items():
positions = freq_data['positions']
gene_info = freq_data['gene_info']
# Get reference sequence for this gene
chrom = gene_info['chrom']
start = gene_info['start']
end = gene_info['end']
ref_sequence = ref_genome[chrom][start:end]
# Create consensus sequence
consensus_sequence = list(ref_sequence) # Start with reference
# Replace with most common nucleotides where we have data
for pos, pos_data in positions.items():
if pos < len(consensus_sequence):
consensus_sequence[pos] = pos_data['most_common']
consensus_sequence = ''.join(consensus_sequence)
# Generate complementary sequence
complement_map = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G', 'N': 'N'}
complementary_sequence = ''.join(complement_map.get(base, 'N') for base in consensus_sequence)
gene_consensus[gene_id] = {
'gene_info': gene_info,
'reference_sequence': ref_sequence,
'consensus_sequence': consensus_sequence,
'complementary_sequence': complementary_sequence,
'coverage_positions': len(positions),
'total_positions': len(ref_sequence)
}
return gene_consensus
# Generate consensus sequences
print("Generating consensus sequences...")
gene_consensus = generate_consensus_sequences(gene_frequencies, ref_genome)
# ====================================================================
# CELL 7: SAVE ANALYSIS RESULTS
# ====================================================================
def save_results(gene_frequencies, gene_conserved_regions, gene_consensus):
"""Save all results to files"""
# Create results directory
os.makedirs('analysis_results', exist_ok=True)
# Save detailed position-by-position frequencies
with open("analysis_results/position_nucleotide_frequencies.txt", "w") as f:
f.write("Gene\tPosition\tA_Count\tA_Freq\tT_Count\tT_Freq\tG_Count\tG_Freq\tC_Count\tC_Freq\t"
"Most_Common\tConservation\tTotal_Reads\tEntropy\n")
for gene_id, data in gene_frequencies.items():
for pos, pos_data in data['positions'].items():
nucleotides = pos_data['nucleotides']
f.write(f"{gene_id}\t{pos}\t"
f"{nucleotides['A']['count']}\t{nucleotides['A']['frequency']:.1f}\t"
f"{nucleotides['T']['count']}\t{nucleotides['T']['frequency']:.1f}\t"
f"{nucleotides['G']['count']}\t{nucleotides['G']['frequency']:.1f}\t"
f"{nucleotides['C']['count']}\t{nucleotides['C']['frequency']:.1f}\t"
f"{pos_data['most_common']}\t{pos_data['conservation']:.1f}\t"
f"{pos_data['total_reads']}\t{pos_data['entropy']:.3f}\n")
# Save conserved regions
with open("analysis_results/conserved_regions_detailed.txt", "w") as f:
f.write("Gene\tRegion_ID\tStart_Pos\tEnd_Pos\tLength\tSequence\tAvg_Conservation\tMin_Reads\n")
for gene_id, data in gene_conserved_regions.items():
for i, region in enumerate(data['regions']):
start_pos = region[0]['position']
end_pos = region[-1]['position']
length = len(region)
sequence = ''.join([pos['nucleotide'] for pos in region])
avg_conservation = np.mean([pos['conservation'] for pos in region])
min_reads = min([pos['total_reads'] for pos in region])
f.write(f"{gene_id}\t{i+1}\t{start_pos}\t{end_pos}\t{length}\t"
f"{sequence}\t{avg_conservation:.1f}\t{min_reads}\n")
# Save consensus sequences
with open("analysis_results/consensus_sequences.txt", "w") as f:
f.write("Gene\tGene_Name\tLength\tCoverage_Positions\tCoverage_Percent\t"
"Reference_Sequence\tConsensus_Sequence\tComplementary_Sequence\n")
for gene_id, data in gene_consensus.items():
gene_info = data['gene_info']
coverage_percent = data['coverage_positions'] / data['total_positions'] * 100
f.write(f"{gene_id}\t{gene_info['name']}\t{data['total_positions']}\t"
f"{data['coverage_positions']}\t{coverage_percent:.1f}\t"
f"{data['reference_sequence']}\t{data['consensus_sequence']}\t"
f"{data['complementary_sequence']}\n")
print("Results saved to analysis_results/ directory")
# Save results
save_results(gene_frequencies, gene_conserved_regions, gene_consensus)
# ====================================================================
# CELL 8: ANTISENSE RNA DESIGN
# ====================================================================
def design_antisense_sequences(gene_frequencies, min_conservation=85, max_length=50):
"""Design optimal antisense RNA sequences"""
def calculate_binding_energy(sequence1, sequence2):
"""Estimate binding energy (simplified)"""
base_pairs = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'}
if len(sequence1) != len(sequence2):
return float('inf')
energy = 0
for i in range(len(sequence1)):
base1, base2 = sequence1[i], sequence2[i]
if base_pairs.get(base1) == base2:
if (base1, base2) in [('A', 'T'), ('T', 'A')]:
energy -= 2.0 # kcal/mol
elif (base1, base2) in [('G', 'C'), ('C', 'G')]:
energy -= 3.0 # kcal/mol
else:
energy += 1.0 # Penalty for mismatch
return energy
antisense_designs = {}
for gene_id, data in gene_frequencies.items():
positions = data['positions']
gene_info = data['gene_info']
print(f"Designing antisense for {gene_id}...")
        # Scan positions in genomic order so the continuity check below is meaningful;
        # positions below the conservation threshold are skipped inside the loop
        sorted_positions = sorted(positions.items())
# Find optimal target regions
optimal_regions = []
current_region = []
for pos, pos_data in sorted_positions:
if pos_data['conservation'] >= min_conservation:
# Check if continuous
if not current_region or pos == current_region[-1]['position'] + 1:
current_region.append({
'position': pos,
'nucleotide': pos_data['most_common'],
'conservation': pos_data['conservation'],
'total_reads': pos_data['total_reads']
})
else:
# Save current region
if len(current_region) >= 15: # Minimum length
optimal_regions.append(current_region)
# Start new region
current_region = [{
'position': pos,
'nucleotide': pos_data['most_common'],
'conservation': pos_data['conservation'],
'total_reads': pos_data['total_reads']
}]
else:
# End current region
if len(current_region) >= 15:
optimal_regions.append(current_region)
current_region = []
# Don't forget the last region
if len(current_region) >= 15:
optimal_regions.append(current_region)
# Design antisense sequences
region_designs = []
for i, region in enumerate(optimal_regions):
target_sequence = ''.join([pos['nucleotide'] for pos in region])
# Generate complementary sequence
complement_map = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G'}
antisense_sequence = ''.join([complement_map.get(base, 'N') for base in target_sequence])
# Calculate properties
avg_conservation = np.mean([pos['conservation'] for pos in region])
min_reads = min([pos['total_reads'] for pos in region])
gc_content = (antisense_sequence.count('G') + antisense_sequence.count('C')) / len(antisense_sequence) * 100
binding_energy = calculate_binding_energy(target_sequence, antisense_sequence)
region_designs.append({
'region_id': i + 1,
'start_pos': region[0]['position'],
'end_pos': region[-1]['position'],
'length': len(region),
'target_sequence': target_sequence,
'antisense_sequence': antisense_sequence,
'avg_conservation': avg_conservation,
'min_reads': min_reads,
'gc_content': gc_content,
'estimated_binding_energy': binding_energy
})
# Select best regions within length constraint
regions_sorted = sorted(region_designs,
key=lambda x: (x['avg_conservation'], -x['estimated_binding_energy']),
reverse=True)
selected_regions = []
total_length = 0
for region in regions_sorted:
if total_length + region['length'] <= max_length:
selected_regions.append(region)
total_length += region['length']
# If no regions fit, take the best single region
if not selected_regions and regions_sorted:
best_region = regions_sorted[0]
if best_region['length'] <= max_length:
selected_regions = [best_region]
# Generate final antisense sequence with linkers
linker_sequence = "AAAAAA"
antisense_parts = [region['antisense_sequence'] for region in selected_regions]
combined_antisense = linker_sequence.join(antisense_parts)
# Calculate final properties
gc_content = (combined_antisense.count('G') + combined_antisense.count('C')) / len(combined_antisense) * 100 if combined_antisense else 0
total_conservation = np.mean([region['avg_conservation'] for region in selected_regions]) if selected_regions else 0
coverage = (total_length / gene_info['length']) * 100 if gene_info['length'] > 0 else 0
antisense_designs[gene_id] = {
'gene_info': gene_info,
'antisense_sequence': combined_antisense,
'target_regions': selected_regions,
'properties': {
'total_length': len(combined_antisense),
'gc_content': gc_content,
'num_target_regions': len(selected_regions),
'avg_conservation': total_conservation,
'gene_coverage': coverage
}
}
return antisense_designs
def save_antisense_designs(antisense_designs):
"""Save antisense design results"""
# Save final antisense sequences
with open("analysis_results/final_antisense_sequences.txt", "w") as f:
f.write("Gene\tGene_Name\tAntisense_Sequence\tLength\tGC_Content\tNum_Targets\t"
"Avg_Conservation\tGene_Coverage\n")
for gene_id, data in antisense_designs.items():
gene_name = data['gene_info']['name']
props = data['properties']
f.write(f"{gene_id}\t{gene_name}\t{data['antisense_sequence']}\t"
f"{props['total_length']}\t{props['gc_content']:.1f}\t{props['num_target_regions']}\t"
f"{props['avg_conservation']:.1f}\t{props['gene_coverage']:.1f}\n")
# Save FASTA format
with open("analysis_results/antisense_sequences.fasta", "w") as f:
for gene_id, data in antisense_designs.items():
if data['antisense_sequence']:
gene_name = data['gene_info']['name']
props = data['properties']
f.write(f">{gene_id}_{gene_name}_antisense |length={props['total_length']} "
f"|GC={props['gc_content']:.1f}% |targets={props['num_target_regions']} "
f"|conservation={props['avg_conservation']:.1f}%\n")
f.write(f"{data['antisense_sequence']}\n")
print("Antisense designs saved to analysis_results/")
# Design antisense sequences
print("Designing antisense RNA sequences...")
antisense_designs = design_antisense_sequences(gene_frequencies, min_conservation=85, max_length=50)
# Save antisense designs
save_antisense_designs(antisense_designs)
Program 4: HBV Genome Conservation Analysis
Standalone Python program for large-scale genome comparison
This is an independent Python application that downloads and analyzes thousands of HBV genome sequences from public databases. It operates completely separately from the other programs and can be used to validate any potential therapeutic targets against global HBV sequence diversity.
- Standalone execution - downloads data automatically from public databases
- Self-contained parallel processing engine
- No dependencies on other programs in this toolkit
- Configurable conservation thresholds and window sizes
- Generates comprehensive text reports
Run with: python hbv_conservation_analysis.py
Requires: Python with requests, biopython, and numpy (parallelism uses the standard-library multiprocessing module)
Outputs: conservation_report.txt with detailed analysis of conserved regions
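The core operation is a mismatch-tolerant comparison of fixed-length genome windows. A minimal sketch of the numpy-based Hamming distance used below, with toy 12 bp windows standing in for the real 140 bp windows (which allow up to 7 mismatches):

import numpy as np

def count_mismatches(seq1: str, seq2: str) -> int:
    """Vectorized Hamming distance between two equal-length windows."""
    a = np.frombuffer(seq1.encode("ascii"), dtype=np.uint8)
    b = np.frombuffer(seq2.encode("ascii"), dtype=np.uint8)
    return int(np.sum(a != b))

# Toy example: two 12 bp windows differing by a single substitution.
ref, other = "ATGCATGCATGC", "ATGCTTGCATGC"
print(count_mismatches(ref, other))   # 1; the full analysis tolerates up to 7 per 140 bp window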
Complete Program Code
# Optimized version with major performance improvements
import requests
from collections import defaultdict, Counter
from Bio import SeqIO
import io
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
def download_hbv_genomes(url, output_file="hbv_genomes.fas"):
"""Download the HBV genomes file from the given URL."""
print("Downloading HBV genomes file...")
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, stream=True, headers=headers, timeout=30)
response.raise_for_status()
with open(output_file, 'wb') as f:
total_size = 0
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
total_size += len(chunk)
print(f"Downloaded successfully: {output_file} ({total_size} bytes)")
return output_file
except requests.exceptions.RequestException as e:
print(f"Error downloading file: {e}")
return None
def load_genomes(fasta_file):
"""Load genome sequences with better error handling."""
genomes = []
genome_ids = []
print("Loading genome sequences...")
try:
encodings = ['utf-8', 'latin-1', 'ascii']
file_content = None
for encoding in encodings:
try:
with open(fasta_file, 'r', encoding=encoding) as f:
file_content = f.read()
print(f"Successfully read file with {encoding} encoding")
break
except UnicodeDecodeError:
continue
if file_content is None or not file_content.strip().startswith('>'):
print("Could not read file or not FASTA format")
return [], []
sequences_found = 0
for record in SeqIO.parse(io.StringIO(file_content), "fasta"):
sequence = str(record.seq).upper()
if len(sequence) > 2500: # Filter short sequences
genomes.append(sequence)
genome_id = record.id.split('|')[2] if '|' in record.id else record.id
genome_ids.append(genome_id)
sequences_found += 1
if sequences_found % 100 == 0 and sequences_found > 0:
print(f"Loaded {sequences_found} sequences...")
print(f"Loaded {len(genomes)} HBV genome sequences")
if genomes:
lengths = [len(genome) for genome in genomes]
print(f"Length stats: min={min(lengths)}, max={max(lengths)}, avg={sum(lengths)/len(lengths):.0f}")
return genomes, genome_ids
except Exception as e:
print(f"Error loading genomes: {e}")
return [], []
def count_mismatches_fast(seq1, seq2):
"""Fast mismatch counting using numpy if sequences are same length."""
if len(seq1) != len(seq2):
return float('inf')
# Convert to numpy arrays for faster comparison
arr1 = np.frombuffer(seq1.encode('ascii'), dtype=np.uint8)
arr2 = np.frombuffer(seq2.encode('ascii'), dtype=np.uint8)
return np.sum(arr1 != arr2)
def analyze_position_batch(args):
"""Process a batch of positions - for multiprocessing."""
genomes, start_pos, end_pos, window_size, max_mismatches = args
position_data = {}
for pos in range(start_pos, min(end_pos, len(genomes[0]) - window_size + 1)):
# Get all sequences at this position
sequences_at_pos = {}
for genome_idx, genome in enumerate(genomes):
if pos + window_size <= len(genome):
window = genome[pos:pos + window_size]
if len(window) == window_size and all(nt in 'ATCG' for nt in window):
if window not in sequences_at_pos:
sequences_at_pos[window] = []
sequences_at_pos[window].append(genome_idx)
# Find conserved sequences at this position
for ref_seq, ref_genomes in sequences_at_pos.items():
if len(ref_genomes) == 1: # Only check if not already highly represented
continue
conserved_genomes = set(ref_genomes)
# Check other sequences for similarity
for other_seq, other_genomes in sequences_at_pos.items():
if other_seq != ref_seq:
mismatches = count_mismatches_fast(ref_seq, other_seq)
if mismatches <= max_mismatches:
conserved_genomes.update(other_genomes)
if len(conserved_genomes) >= len(genomes) * 0.8: # 80% threshold
conservation_pct = (len(conserved_genomes) / len(genomes)) * 100
position_data[pos] = {
'sequence': ref_seq,
'genomes': conserved_genomes,
'count': len(conserved_genomes),
'percentage': conservation_pct
}
return position_data
def find_conserved_regions_optimized(genomes, window_size=92, top_n=50, min_conservation_percent=80, max_mismatch_percent=5):
"""Optimized version using multiprocessing and smarter algorithms."""
print(f"Analyzing conserved regions (OPTIMIZED VERSION)...")
print(f"Using {multiprocessing.cpu_count()} CPU cores")
if not genomes:
return []
min_length = min(len(genome) for genome in genomes)
max_positions = min_length - window_size + 1
max_mismatches = int(window_size * max_mismatch_percent / 100)
print(f"Analyzing {max_positions} positions with up to {max_mismatches} mismatches allowed")
# Split work into batches for multiprocessing
batch_size = max(1, max_positions // (multiprocessing.cpu_count() * 4))
batches = []
for start in range(0, max_positions, batch_size):
end = min(start + batch_size, max_positions)
batches.append((genomes, start, end, window_size, max_mismatches))
print(f"Processing {len(batches)} batches...")
# Process batches in parallel
all_conserved = []
with multiprocessing.Pool() as pool:
results = pool.map(analyze_position_batch, batches)
# Combine results
for batch_result in results:
for pos, data in batch_result.items():
all_conserved.append((
data['sequence'],
data['count'],
data['percentage'],
data['genomes'],
pos,
list(data['genomes'])[0] # Reference genome
))
# Sort by conservation percentage
all_conserved.sort(key=lambda x: (x[2], x[1]), reverse=True)
print(f"Found {len(all_conserved)} conserved regions")
if all_conserved:
print(f"Top conservation: {all_conserved[0][2]:.2f}%")
return all_conserved[:top_n]
def find_conserved_regions_simple(genomes, genome_ids, window_size=140, top_n=10, sample_positions=1000):
"""Even simpler version - just sample positions instead of checking all."""
print(f"Using SIMPLE SAMPLING approach - checking {sample_positions} positions...")
if not genomes:
return []
min_length = min(len(genome) for genome in genomes)
max_positions = min_length - window_size + 1
# Sample positions instead of checking all
import random
positions_to_check = random.sample(range(max_positions), min(sample_positions, max_positions))
conserved_regions = []
total_genomes = len(genomes) # Get total genomes once
# Set max mismatches to exactly 7
max_mismatches = 7
for i, pos in enumerate(positions_to_check):
if (i + 1) % 100 == 0:
print(f"Processed {i + 1}/{len(positions_to_check)} positions...")
# Get sequence from first genome as reference
ref_seq = genomes[0][pos:pos + window_size]
if len(ref_seq) != window_size or not all(nt in 'ATCG' for nt in ref_seq):
continue
# Count how many genomes have exactly this sequence and similar matches (<= max_mismatches)
exact_matches = 0
similar_matches = 0
exact_match_genome_ids = [] # Store IDs of genomes with exact matches
for genome_idx, genome in enumerate(genomes):
if pos + window_size <= len(genome):
genome_seq = genome[pos:pos + window_size]
if genome_seq == ref_seq:
exact_matches += 1
similar_matches += 1
exact_match_genome_ids.append(genome_ids[genome_idx]) # Store the ID
else:
# Count mismatches
mismatches = sum(1 for j in range(window_size) if ref_seq[j] != genome_seq[j])
if mismatches <= max_mismatches: # Allow up to max_mismatches (7)
similar_matches += 1
conservation_pct = (similar_matches / total_genomes) * 100 # Use total_genomes
exact_match_pct = (exact_matches / total_genomes) * 100 # Calculate exact match percentage
        if conservation_pct >= 80:  # 80% threshold, counting matches with <= 7 mismatches
conserved_regions.append((
ref_seq,
similar_matches,
conservation_pct,
exact_matches, # Add exact match count
exact_match_pct, # Add exact match percentage
exact_match_genome_ids, # Add list of exact match genome IDs
set(range(similar_matches)), # Simplified genome set for similar matches
pos,
0 # Reference genome index
))
    # Sort by conservation (matches with <= max_mismatches mismatches)
conserved_regions.sort(key=lambda x: x[2], reverse=True)
print(f"Found {len(conserved_regions)} conserved regions from sampling (based on <= {max_mismatches} mismatches)")
return conserved_regions[:top_n]
def main():
"""Main function with option to use different algorithms."""
url = "https://hbvdb.lyon.inserm.fr/data/nucleic/fasta/all_Genomes.fas"
# Configuration
window_size = 140
top_n = 50
min_conservation_percent = 80 # Only consider sequences conserved in at least 80% of genomes
sample_positions = 500 # Number of positions to sample in simple mode
# Download and load genomes
fasta_file = download_hbv_genomes(url)
if not fasta_file:
print("Failed to download file")
return
genomes, genome_ids = load_genomes(fasta_file)
if not genomes:
print("No genomes loaded")
return
print(f"Loaded {len(genomes)} genomes")
# Choose algorithm based on dataset size
if len(genomes) > 100:
print("Large dataset detected - using sampling approach")
# Pass genome_ids and updated window_size to simple function
conserved_windows = find_conserved_regions_simple(genomes, genome_ids, window_size=window_size, sample_positions=sample_positions)
else:
print("Using optimized multiprocessing approach")
# Pass updated window_size to optimized function
conserved_windows = find_conserved_regions_optimized(genomes, window_size=window_size) # Optimized version doesn't currently calculate exact matches
if not conserved_windows:
print("No conserved regions found")
return
# Print results
print(f"\nTOP {len(conserved_windows)} MOST CONSERVED REGIONS:")
print("=" * 60)
# Modified printing loop to include exact match information and genome IDs
for i, (seq, similar_count, similar_pct, exact_count, exact_pct, exact_ids, genome_set, pos, ref_idx) in enumerate(conserved_windows):
print(f"\nRank {i+1}:")
print(f"Position: {pos}-{pos + window_size - 1}") # Adjust position range printout
print(f"Conservation (<= 7 Mismatches): {similar_count}/{len(genomes)} genomes ({similar_pct:.2f}%)") # Updated printout
print(f"Exact Match Conservation: {exact_count}/{len(genomes)} genomes ({exact_pct:.2f}%)")
# Print exact match genome IDs, limiting the number for readability
print(f"Exact Match Genomes: [{', '.join(exact_ids[:5])}{', ...' if len(exact_ids) > 5 else ''}]")
print(f"Sequence: {seq}")
gc_count = seq.count('G') + seq.count('C')
gc_pct = (gc_count / len(seq)) * 100
print(f"GC content: {gc_pct:.1f}%")
print("-" * 40)
if __name__ == "__main__":
try:
import numpy as np
main()
except ImportError:
print("Installing numpy...")
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "numpy"])
import numpy as np
main()
Program 5: Conservation Visualization Dashboard
Google Colab notebook for interactive visualization of conserved HBV sequences
This standalone visualization tool creates an interactive dashboard to display the genomic positions of conserved 140 base pair sequences across the most common HBV genotypes. It operates independently from other programs and can visualize any set of conserved sequences against HBV reference genomes, helping identify optimal target regions for therapeutic design.
- Interactive visualization - generates dynamic plots with plotly
- Multi-genotype comparison - displays sequences across HBV genotypes A-H
- Position mapping - shows exact genomic coordinates of conserved regions
- Gene annotation overlay - displays HBV gene boundaries and features
- Conservation heatmaps - visualizes sequence conservation patterns
- Export capabilities - saves publication-ready figures
- Custom sequence input - allows user-defined sequences for analysis
Input: Conserved sequences from Program 4 or custom sequences
Output: Interactive visualizations of sequence positions across HBV genotypes
Requirements: Google Colab environment with plotly, pandas, biopython
Visualization Components
- Genomic Position Plot - Shows where conserved sequences map across different genotypes
- Gene Annotation Track - Displays HBV genes (PreC, C, X, PreS1, PreS2, S, P, SP)
- Conservation Heatmap - Visualizes degree of conservation at each position
- Interactive Dashboard - Allows zooming and exploration of specific regions
Applications in Our Project
- Target validation - Visualized conserved regions identified in Program 4
- Genotype comparison - Confirmed sequence conservation across HBV variants
- Gene region analysis - Identified which genes contain highest conservation
- Therapeutic design - Selected optimal target positions for RADAR sensors
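If the CSV upload route in CELL 3 is used, the notebook only looks for a column named 'sequence'. A minimal sketch of preparing such a file (the sequence shown is a placeholder, not a real conserved HBV region):

import pandas as pd

# Hypothetical input file for CELL 3; the notebook reads the 'sequence' column only.
pd.DataFrame({
    "sequence": ["ATGC" * 35]   # placeholder 140 bp string, not a real conserved HBV sequence
}).to_csv("conserved_sequences.csv", index=False)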
Complete Embedded Code
# HBV Conservation Visualization Dashboard
# Interactive visualization of conserved sequences across HBV genotypes
# ====================================================================
# CELL 1: SETUP AND INSTALLATIONS
# ====================================================================
!pip install plotly biopython pandas numpy matplotlib seaborn
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from Bio import SeqIO
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict
print("Setup completed! Ready for visualization.")
# ====================================================================
# CELL 2: LOAD HBV REFERENCE GENOMES
# ====================================================================
from google.colab import files
print("Upload HBV reference genomes (FASTA format)")
print("Or use default genotypes A-H")
# Option to upload custom genomes
uploaded = files.upload()
# HBV gene coordinates (standard X02763 reference)
HBV_GENES = {
'PreC': (1814, 2458),
'C': (1901, 2458),
'X': (1374, 1838),
'PreS1': [(2854, 3221), (1, 835)],
'PreS2': [(3211, 3221), (1, 835)],
'S': (155, 835),
'P': [(2307, 3221), (1, 1623)],
'SP': [(2307, 2453), (489, 683)]
}
def load_reference_genomes(fasta_file):
"""Load HBV reference genomes"""
genomes = {}
for record in SeqIO.parse(fasta_file, "fasta"):
genotype = record.id.split('_')[0]
genomes[genotype] = str(record.seq).upper()
return genomes
print("Reference genomes loaded successfully")
# ====================================================================
# CELL 3: INPUT CONSERVED SEQUENCES
# ====================================================================
# Input conserved sequences from Program 4 or custom sequences
conserved_sequences = []
print("Enter conserved sequences to visualize:")
print("Format: One sequence per line, or upload CSV file")
print()
# Option 1: Manual input
manual_input = input("Enter sequences manually? (y/n): ").lower()
if manual_input == 'y':
print("Enter sequences (one per line, empty line to finish):")
while True:
seq = input().strip().upper()
if not seq:
break
if all(nt in 'ATCG' for nt in seq):
conserved_sequences.append(seq)
else:
print("Invalid sequence - only ATCG allowed")
# Option 2: Upload CSV
else:
print("Upload CSV file with 'sequence' column:")
uploaded = files.upload()
for filename in uploaded.keys():
df = pd.read_csv(filename)
if 'sequence' in df.columns:
conserved_sequences = df['sequence'].tolist()
print(f"Loaded {len(conserved_sequences)} conserved sequences")
# ====================================================================
# CELL 4: MAP SEQUENCES TO GENOMES
# ====================================================================
def find_sequence_positions(sequence, genome, max_mismatches=7):
"""Find all positions where sequence occurs in genome"""
positions = []
seq_len = len(sequence)
for i in range(len(genome) - seq_len + 1):
genome_segment = genome[i:i + seq_len]
# Count mismatches
mismatches = sum(1 for j in range(seq_len)
if sequence[j] != genome_segment[j])
if mismatches <= max_mismatches:
positions.append({
'position': i,
'mismatches': mismatches,
'match_type': 'exact' if mismatches == 0 else 'similar'
})
return positions
def map_all_sequences(conserved_sequences, genomes):
"""Map all conserved sequences to all genomes"""
mapping_data = []
for seq_idx, sequence in enumerate(conserved_sequences):
for genotype, genome in genomes.items():
positions = find_sequence_positions(sequence, genome)
for pos_data in positions:
mapping_data.append({
'sequence_id': f"Seq_{seq_idx + 1}",
'sequence': sequence,
'genotype': genotype,
'position': pos_data['position'],
'position_end': pos_data['position'] + len(sequence),
'mismatches': pos_data['mismatches'],
'match_type': pos_data['match_type']
})
return pd.DataFrame(mapping_data)
# Perform mapping
print("Mapping sequences to genomes...")
mapping_df = map_all_sequences(conserved_sequences, reference_genomes)
print(f"Found {len(mapping_df)} total matches across genomes")
# ====================================================================
# CELL 5: CREATE INTERACTIVE POSITION PLOT
# ====================================================================
def create_position_plot(mapping_df, genes):
"""Create interactive plot showing sequence positions"""
fig = make_subplots(
rows=2, cols=1,
row_heights=[0.8, 0.2],
subplot_titles=("Conserved Sequence Positions", "HBV Gene Map"),
vertical_spacing=0.1
)
# Plot 1: Sequence positions
for genotype in mapping_df['genotype'].unique():
genotype_data = mapping_df[mapping_df['genotype'] == genotype]
# Exact matches
exact = genotype_data[genotype_data['match_type'] == 'exact']
if not exact.empty:
fig.add_trace(
go.Scatter(
x=exact['position'],
y=[genotype] * len(exact),
mode='markers',
name=f'{genotype} (exact)',
marker=dict(size=10, symbol='circle'),
                    hovertemplate='%{y}<br>Position: %{x}<br>Exact match'
),
row=1, col=1
)
# Similar matches
similar = genotype_data[genotype_data['match_type'] == 'similar']
if not similar.empty:
fig.add_trace(
go.Scatter(
x=similar['position'],
y=[genotype] * len(similar),
mode='markers',
name=f'{genotype} (≤7 diff)',
marker=dict(size=8, symbol='triangle-up'),
                    hovertemplate='%{y}<br>Position: %{x}<br>Mismatches: %{customdata}',
customdata=similar['mismatches']
),
row=1, col=1
)
# Plot 2: Gene annotations
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4',
'#FFEAA7', '#DDA0DD', '#98D8C8', '#F7DC6F']
for idx, (gene, regions) in enumerate(genes.items()):
color = colors[idx % len(colors)]
if isinstance(regions, tuple):
# Single region
start, end = regions
fig.add_trace(
go.Scatter(
x=[start, end],
y=[0, 0],
mode='lines',
name=gene,
line=dict(color=color, width=10),
                    hovertemplate=f'{gene}<br>Position: {start}-{end}'
),
row=2, col=1
)
else:
# Split regions
for start, end in regions:
fig.add_trace(
go.Scatter(
x=[start, end],
y=[0, 0],
mode='lines',
name=gene,
line=dict(color=color, width=10),
showlegend=False,
                        hovertemplate=f'{gene}<br>Position: {start}-{end}'
),
row=2, col=1
)
# Update layout
fig.update_xaxes(title_text="Genome Position (bp)", row=2, col=1)
fig.update_yaxes(title_text="Genotype", row=1, col=1)
fig.update_yaxes(showticklabels=False, row=2, col=1)
fig.update_layout(
height=800,
title_text="HBV Conserved Sequence Visualization Dashboard",
hovermode='closest'
)
return fig
# Create and display plot
print("Creating interactive visualization...")
fig = create_position_plot(mapping_df, HBV_GENES)
fig.show()
# ====================================================================
# CELL 6: CONSERVATION HEATMAP
# ====================================================================
def create_conservation_heatmap(mapping_df):
"""Create heatmap showing conservation across genotypes"""
# Create pivot table
heatmap_data = mapping_df.pivot_table(
index='sequence_id',
columns='genotype',
values='position',
aggfunc='count',
fill_value=0
)
# Create heatmap
fig = go.Figure(data=go.Heatmap(
z=heatmap_data.values,
x=heatmap_data.columns,
y=heatmap_data.index,
colorscale='Viridis',
        hovertemplate='Sequence: %{y}<br>Genotype: %{x}<br>Matches: %{z}'
))
fig.update_layout(
title='Conservation Heatmap Across HBV Genotypes',
xaxis_title='Genotype',
yaxis_title='Sequence ID',
height=600
)
return fig
# Create and display heatmap
print("Creating conservation heatmap...")
heatmap_fig = create_conservation_heatmap(mapping_df)
heatmap_fig.show()
# ====================================================================
# CELL 7: SUMMARY STATISTICS
# ====================================================================
def generate_summary_stats(mapping_df):
"""Generate summary statistics"""
print("="*60)
print("CONSERVATION SUMMARY STATISTICS")
print("="*60)
# Overall statistics
total_sequences = mapping_df['sequence_id'].nunique()
total_genotypes = mapping_df['genotype'].nunique()
total_matches = len(mapping_df)
print(f"\nTotal sequences analyzed: {total_sequences}")
print(f"Total genotypes: {total_genotypes}")
print(f"Total matches found: {total_matches}")
# Per-sequence statistics
print("\nPer-sequence conservation:")
for seq_id in mapping_df['sequence_id'].unique():
seq_data = mapping_df[mapping_df['sequence_id'] == seq_id]
genotypes_found = seq_data['genotype'].nunique()
exact_matches = len(seq_data[seq_data['match_type'] == 'exact'])
print(f"\n{seq_id}:")
print(f" Found in {genotypes_found}/{total_genotypes} genotypes")
print(f" Exact matches: {exact_matches}")
print(f" Similar matches: {len(seq_data) - exact_matches}")
# Per-genotype statistics
print("\n" + "="*60)
print("Per-genotype statistics:")
for genotype in mapping_df['genotype'].unique():
gen_data = mapping_df[mapping_df['genotype'] == genotype]
sequences_found = gen_data['sequence_id'].nunique()
print(f"\n{genotype}:")
print(f" Sequences found: {sequences_found}/{total_sequences}")
print(f" Total matches: {len(gen_data)}")
print(f" Average mismatches: {gen_data['mismatches'].mean():.2f}")
# Generate statistics
generate_summary_stats(mapping_df)
# ====================================================================
# CELL 8: EXPORT RESULTS
# ====================================================================
def export_results(mapping_df, figures):
"""Export results and figures"""
# Save mapping data
mapping_df.to_csv('conservation_mapping.csv', index=False)
print("Saved: conservation_mapping.csv")
# Save figures
for idx, fig in enumerate(figures):
fig.write_html(f'conservation_plot_{idx + 1}.html')
print(f"Saved: conservation_plot_{idx + 1}.html")
# Save summary report
with open('conservation_summary.txt', 'w') as f:
f.write("HBV CONSERVATION ANALYSIS SUMMARY\n")
f.write("="*60 + "\n\n")
# Write statistics
f.write(f"Total sequences: {mapping_df['sequence_id'].nunique()}\n")
f.write(f"Total genotypes: {mapping_df['genotype'].nunique()}\n")
f.write(f"Total matches: {len(mapping_df)}\n\n")
# Write per-sequence data
for seq_id in mapping_df['sequence_id'].unique():
seq_data = mapping_df[mapping_df['sequence_id'] == seq_id]
f.write(f"\n{seq_id}:\n")
f.write(f" Sequence: {seq_data.iloc[0]['sequence']}\n")
f.write(f" Found in {seq_data['genotype'].nunique()} genotypes\n")
f.write(f" Positions: {', '.join(map(str, seq_data['position'].unique()))}\n")
print("Saved: conservation_summary.txt")
# Download files
files.download('conservation_mapping.csv')
files.download('conservation_summary.txt')
# Export all results
print("Exporting results...")
export_results(mapping_df, [fig, heatmap_fig])
print("Export completed!")
Program 6: Plasmid Design & Sequence Modification Tool
Google Colab notebook for automated plasmid design and sensor sequence modification
This tool was specifically developed for our RADAR sensor design process. It processes raw DNA sequences from NCBI GenBank (e.g., the GPC3 transcript NM_004484.3) and automatically modifies them to meet sensor design criteria. The tool performs codon-level analysis, sequence manipulation, and automated editing to generate functional sensor constructs suitable for our synthetic biology applications.
- NCBI sequence processing - automatically handles gene bank sequences
- RADAR sensor optimization - modifies sequences to meet sensor design criteria
- Codon-level analysis - converts DNA to codon triplets for precise editing
- Sequence manipulation - reverse and complement operations for sensor design
- Smart codon editing - replaces stop codons and start codons with functional alternatives
- TGG targeting - identifies and modifies tryptophan codons for sensor optimization
- Plasmid-ready output - generates sequences suitable for synthetic biology applications
Input: NCBI GenBank sequences (e.g., GPC3 from NM_004484.3)
Output: RADAR sensor-optimized sequences ready for plasmid design
Requirements: Google Colab environment with Python
Key Functions
- clean_sequence() - Removes non-alphabetic characters from input DNA
- to_codons() - Converts DNA sequence to codon triplets, handling remainder bases
- make_sequence() - Creates spaced sequence format for readability
- reverse_sequence() - Reverses the DNA sequence
- reverse_complement() - Generates reverse complement (A↔T, C↔G)
- sequence_edit() - Optimizes sequence by replacing stop codons and start codons
Applications in Our Project
- RADAR sensor design - Modified GPC3 sequences from NCBI for our sensor constructs
- Plasmid construction - Generated sequences ready for synthetic biology applications
- HBx sensor optimization - Applied same methodology to Hepatitis B virus sequences
- Wet lab integration - Prepared sequences for experimental validation in our lab work
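For quick reference, the codon substitutions applied by sequence_edit() in the embedded code below can be summarized as a small lookup table; this is a restatement of the rules already implemented there, not a separate method.

# Substitutions applied to the sensor (reverse-complement) strand by sequence_edit():
CODON_EDITS = {
    "taa": "gaa",  # ochre stop -> GAA (Glu)
    "tag": "gag",  # amber stop -> GAG (Glu)
    "tga": "gga",  # opal stop -> GGA (Gly)
    "atg": "agg",  # internal start codon -> AGG (Arg)
}
# In addition, the middle TGG codon of the strand is rewritten as "uag",
# which provides the in-frame UAG stop that the sensor design relies on.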
Complete Embedded Code
def main():
    # Input DNA sequence below
    dna = ""  # the sequence you want to be made into a sensor seq
    dna = clean_sequence(dna)
    codons = to_codons(dna)
    sequence = make_sequence(codons)
    print("Spaced sequence:", sequence)
    reverse = reverse_sequence(sequence)
    print("Reversed:", reverse)
    complement = reverse_complement(reverse)
    print("Reverse complement:", complement)
    edited_sequence = sequence_edit(complement)
    print("Edited sequence:", edited_sequence)
def clean_sequence(seq):
    """Remove non-alphabetic characters from DNA sequence"""
    cleaned = ""
    for char in seq:
        if char.isalpha():
            cleaned += char
    return cleaned
def to_codons(seq):
    """Convert DNA sequence to codon triplets"""
    codons = []
    remainder = len(seq) % 3
    print(f"{remainder} letters were excluded from your sequence.")
    length = len(seq) - remainder
    i = 0
    while i + 2 < length:
        codon = seq[i] + seq[i+1] + seq[i+2]
        codons.append(codon)
        i = i + 3
    return codons
def make_sequence(codons):
    """Create spaced sequence from codon list"""
    if not codons:  # guard against an empty codon list (e.g. no input sequence was pasted)
        return ""
    spaced_sequence = codons[0]
    for i in range(1, len(codons)):
        spaced_sequence = spaced_sequence + " " + codons[i]
    return spaced_sequence
def reverse_sequence(sequence):
    """Reverse the DNA sequence"""
    reverse = ""
    for i in range(len(sequence) - 1, -1, -1):  # start at last index, go to 0
        reverse += sequence[i]
    return reverse
def reverse_complement(reverse):
    """Generate reverse complement of DNA sequence"""
    complement = ""
    for char in reverse:
        if char.lower() == "a":
            complement += "t"
        elif char.lower() == "t":
            complement += "a"
        elif char.lower() == "c":
            complement += "g"
        elif char.lower() == "g":
            complement += "c"
        else:
            complement += char  # keep spaces and any ambiguous characters as-is
    return complement
def sequence_edit(complement):
    """Edit sequence to optimize for sensor applications"""
    current_sequence = complement.split(" ")
    new_sequence = current_sequence[:]
    # Find all TGG codons; the middle one becomes the UAG edit site
    tgg_indices = [i for i, codon in enumerate(current_sequence) if codon.lower() == "tgg"]
    if tgg_indices:
        middle_index = tgg_indices[len(tgg_indices) // 2]
        new_sequence[middle_index] = "uag"
    for i in range(len(new_sequence)):
        codon = new_sequence[i].lower()
        if not (tgg_indices and i == middle_index):
            if codon == "taa":
                new_sequence[i] = "gaa"
            elif codon == "tag":
                new_sequence[i] = "gag"
            elif codon == "tga":
                new_sequence[i] = "gga"
            elif codon == "atg":
                new_sequence[i] = "agg"
            else:
                new_sequence[i] = codon
    edited_sequence = make_sequence(new_sequence)
    return edited_sequence
main()
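As a quick sanity check, the helper functions above can be exercised on a short, made-up 9-base sequence (not a real gene); the values in the comments simply trace what each call returns for this toy input.

# Toy walk-through of the pipeline above (hypothetical input, not a real gene):
toy = clean_sequence("ATG CCA TAA")                       # -> "ATGCCATAA"
spaced = make_sequence(to_codons(toy))                    # -> "ATG CCA TAA"
antisense = reverse_complement(reverse_sequence(spaced))  # -> "tta tgg cat"
print(sequence_edit(antisense))                           # -> "tta uag cat" (the TGG becomes the UAG edit site)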
Program 7: Flow Cytometry Data Analysis
Google Colab notebook for statistical analysis and visualization of flow cytometry data
This tool was specifically developed for analyzing our RADAR sensor flow cytometry experiments. It processes flow cytometry data from HBx, AKR1B10, GPC3, and AND-gated experiments to perform statistical analysis and generate annotated plots. The program uses Python-based packages to assess statistical significance and visualize fold changes across experimental replicates.
- Statistical analysis - performs t-tests on replicate datasets
- Annotated plotting - generates publication-ready visualizations
- Fold change calculation - quantifies experimental results
- Replicate handling - processes multiple experimental conditions
- Python-based analysis - uses statannotations, seaborn, matplotlib, pandas, and SciPy
- Standardized gating - maintains consistency across samples
- Background correction - accounts for unstained controls and bead samples
Input: Flow cytometry data from RADAR sensor experiments
Output: Statistical analysis and annotated plots
Requirements: Google Colab environment with Python packages
Analysis Workflow
Our flow cytometry analysis follows a standardized sequential gating strategy (a minimal code sketch of this gating logic follows the list below):
- Debris removal - Gate on FSC-A vs SSC-A to exclude debris
- Doublet removal - Gate on FSC-A vs FSC-H to retain only singlets
- Fluorescence gating - Distinguish positive from negative events
- Laser-specific analysis - mCherry (G610-A) and GFP (B525-A) detection
- Population identification - Define gates for high B525-A fluorescence
- Statistical analysis - Calculate fold changes and significance
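The gating itself is performed upstream in flow cytometry software; the sketch below is only an illustration, assuming event-level data exported into a pandas DataFrame, with hypothetical column names and placeholder thresholds, of how the sequential gates above could be expressed in code.

import pandas as pd

def apply_gates(events: pd.DataFrame) -> pd.DataFrame:
    """Illustrative sequential gating; thresholds and column names are placeholders."""
    # 1. Debris removal: drop events with very low forward/side scatter
    gated = events[(events["FSC-A"] > 50_000) & (events["SSC-A"] > 10_000)]
    # 2. Doublet removal: singlets have FSC-H roughly proportional to FSC-A
    gated = gated[(gated["FSC-H"] / gated["FSC-A"]).between(0.8, 1.2)]
    # 3. Fluorescence gating: keep mCherry-positive (G610-A) transfected cells
    gated = gated[gated["G610-A"] > 1_000]
    # 4. Population identification: flag the GFP-high (B525-A) population
    gated = gated.assign(gfp_high=gated["B525-A"] > 5_000)
    print(f"GFP-high fraction among mCherry+ singlets: {gated['gfp_high'].mean():.3f}")
    return gated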
Applications in Our Project
- RADAR sensor validation - Analyzed HBx, AKR1B10, and GPC3 sensor experiments
- AND-gate characterization - Evaluated double-positive cell populations
- Statistical validation - Performed t-tests on replicate datasets
- Publication preparation - Generated annotated plots for results presentation
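The embedded notebook below starts from replicate columns that are already normalized to background; a minimal sketch of one way such a normalization step might look, using made-up values and hypothetical column names with an unstained control as the background reference, is shown here.

import pandas as pd

# Hypothetical background normalization (illustrative values only):
raw = pd.DataFrame({
    "sample": ["unstained", "complete match -trigger", "complete match +trigger"],
    "median_GFP": [120.0, 480.0, 5400.0],  # made-up median intensities
})
background = raw.loc[raw["sample"] == "unstained", "median_GFP"].iloc[0]
raw["normalized GFP output"] = raw["median_GFP"] / background  # fold change over background
print(raw)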
Complete Embedded Code
# -*- coding: utf-8 -*-
"""RADARoutput_plotting_template.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1lti9eKx7lz_DEt0c3L70LSz1aiHnVvHw
"""
#### IMPORT NECESSARY PYTHON PACKAGES ###
!pip install statannotations
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from IPython.display import display
from statannotations.Annotator import Annotator
"""**Now we need to upload the data file into the colab. Run the code below and then upload the csv file with your data**"""
from google.colab import files
# Upload a file from local computer
uploaded = files.upload()
data = pd.read_csv('sample raw data for plotting - Sheet1.csv')
# print out data to see what we are working with
data.head()
# Calculate the average of the three 'normalized GFP output' columns and create a new column 'Average GFP Output'
data['Average GFP Output'] = data[['normalized GFP output', 'normalized GFP output.1', 'normalized GFP output.2']].mean(axis=1)
# Reshape the data for plotting
data_melted = data.melt(id_vars=['Unnamed: 0'], value_vars=['normalized GFP output', 'normalized GFP output.1', 'normalized GFP output.2'], var_name='Replicate', value_name='GFP Output')
# Create the figure and axes
fig, ax = plt.subplots(figsize=(5, 5))
# Create the swarm plot
sns.swarmplot(x='Unnamed: 0', y='GFP Output', data=data_melted, ax=ax, color='gray')
# Add markers for the average GFP output
sns.pointplot(x='Unnamed: 0', y='Average GFP Output', data=data, ax=ax, color='black', linestyles='', marker='_', markersize=20, markeredgewidth=3)
# Set labels and title
ax.set_xlabel('Sample')
ax.set_ylabel('GFP Output (normalized to background)')
# Rotate x-axis labels for better readability
plt.xticks(rotation=90, ha='right')
# Display the plot
plt.tight_layout()
plt.show()
"""# Task
Create a new column in the dataframe `data` that is the average of columns 2, 3, and 4. Generate a swarm plot using columns 2, 3, and 4, with each replicate plotted across the row. Add a bold dash marker representing the average of the three replicates for each row. Create a second plot using the same data, perform independent t-tests comparing the first and second samples, and the third and fourth samples. Display the statistical test results as stars on the second plot and print the p-values for these comparisons.
"""
# Reshape the data for plotting
data_melted = data.melt(id_vars=['Unnamed: 0'], value_vars=['normalized GFP output', 'normalized GFP output.1', 'normalized GFP output.2'], var_name='Replicate', value_name='GFP Output')
# Confirm the structure of the melted DataFrame by displaying its head
display(data_melted.head())
# Create the figure and axes
fig, ax = plt.subplots(figsize=(5, 5))
# Create the swarm plot
sns.swarmplot(x='Unnamed: 0', y='GFP Output', data=data_melted, ax=ax, color='gray')
# Add markers for the average GFP output
sns.pointplot(x='Unnamed: 0', y='Average GFP Output', data=data, ax=ax, color='black', linestyles='', marker='_', markersize=20, markeredgewidth=3)
# Set labels and title
ax.set_xlabel('Sample')
ax.set_ylabel('GFP Output (normalized to background)')
# Rotate x-axis labels for better readability
plt.xticks(rotation=90, ha='right')
# Adjust layout
plt.tight_layout()
# Display the plot
plt.show()
"""## Define pairs for statistical testing
### Subtask:
Specify the pairs of samples to be compared using independent t-tests.
**Reasoning**:
Create a list of tuples specifying the pairs of samples for independent t-tests based on the 'Unnamed: 0' column in the `data` DataFrame.
"""
# Get the sample names from the 'Unnamed: 0' column
sample_names = data['Unnamed: 0'].tolist()
# Specify the pairs for comparison
pairs = [(sample_names[0], sample_names[1]), (sample_names[2], sample_names[3])]
# Print the pairs to verify
print(pairs)
"""## Add statistical annotations to the plot
### Subtask:
Use `statannotations` to add significance stars to the specified pairs on the plot.
**Reasoning**:
Create an Annotator object, set the statistical test, and apply the annotations to the plot.
"""
# Create an Annotator object
annotator = Annotator(ax, pairs, data=data_melted, x='Unnamed: 0', y='GFP Output')
# Configure the statistical test
annotator.configure(test='t-test_ind', text_format='star', loc='outside')
# Run the test and add annotations
annotator.apply_and_annotate()
# Display the plot with annotations
fig
"""## Perform statistical tests and print p-values
### Subtask:
Manually perform independent t-tests for the specified pairs and print the resulting p-values.
**Reasoning**:
Perform independent t-tests for the specified pairs and print the resulting p-values.
"""
from scipy.stats import ttest_ind
# Filter data for the first pair (complete match -trigger vs complete match +trigger)
complete_match_minus_trigger = data_melted[data_melted['Unnamed: 0'] == 'complete match -trigger']['GFP Output']
complete_match_plus_trigger = data_melted[data_melted['Unnamed: 0'] == 'complete match +trigger']['GFP Output']
# Perform independent t-test for the first pair
ttest_complete = ttest_ind(complete_match_minus_trigger, complete_match_plus_trigger)
# Print the p-value for the first pair
print(f"P-value for 'complete match -trigger' vs 'complete match +trigger': {ttest_complete.pvalue}")
# Filter data for the second pair (partial match -trigger vs partial match +trigger)
partial_match_minus_trigger = data_melted[data_melted['Unnamed: 0'] == 'partial match -trigger']['GFP Output']
partial_match_plus_trigger = data_melted[data_melted['Unnamed: 0'] == 'partial match +trigger']['GFP Output']
# Perform independent t-test for the second pair
ttest_partial = ttest_ind(partial_match_minus_trigger, partial_match_plus_trigger)
# Print the p-value for the second pair
print(f"P-value for 'partial match -trigger' vs 'partial match +trigger': {ttest_partial.pvalue}")
"""## Summary:
### Data Analysis Key Findings
* The average GFP output for each sample was calculated and added as a new column (`Average GFP Output`) to the `data` DataFrame.
* A swarm plot was successfully generated displaying individual data points and the average GFP output for each sample, with replicate data plotted across rows.
* Independent t-tests were performed comparing 'complete match -trigger' with 'complete match +trigger' and 'partial match -trigger' with 'partial match +trigger'.
* The p-value for the comparison between 'complete match -trigger' and 'complete match +trigger' was approximately 0.0174.
* The p-value for the comparison between 'partial match -trigger' and 'partial match +trigger' was approximately 0.0185.
* Significance stars based on the t-test results were successfully added to the plot.
### Insights or Next Steps
* Both comparisons showed statistically significant differences in GFP output (p < 0.05), suggesting that the "+trigger" condition has a significant impact on GFP expression for both complete and partial matches.
* Further investigation into the magnitude of the GFP output change and potential biological implications of the observed differences between complete and partial matches under the "+trigger" condition could be valuable.
"""
Using the Programs
Each program is designed to run independently with its own inputs and outputs
While these seven programs can work together as part of an integrated workflow, each is a standalone tool that can be used independently for specific analysis tasks. You can use any single program without running the others.
- Program 1: Use this to analyze any gene expression dataset with TPM values, regardless of whether you plan to do alignment analysis
- Program 2: Submit this as a standalone HPC job to align any RNA-seq reads to the HBV genome, independent of expression analysis
- Program 3: Upload any BAM alignment files to this Colab notebook for conservation analysis, whether they came from Program 2 or elsewhere
- Program 4: Run this anytime to check conservation of candidate sequences against global HBV diversity, no other programs required
- Program 5: Use this to visually compare where candidate conserved sequences fall across the most common HBV genomes, independent of the other tools
- Program 6: Use this to modify NCBI sequences for RADAR sensor design, independent of other analysis tools
- Program 7: Analyze flow cytometry data from any RADAR sensor experiments, no other programs required