Project: Sentiment Analysis Pipeline
1. Project Overview
Project Goal
Build a complete sentiment analysis pipeline, from data collection → preprocessing → training → deployment, using the techniques learned in this course.
1.1 Business Context
Scenario: You are a Data Engineer at an e-commerce company. The team needs a system that automatically analyzes the sentiment of customer reviews in order to:
- Monitor product quality
- Identify issues quickly
- Improve customer experience
- Generate insights for the business team
1.2 Project Requirements
| Component | Description |
|---|---|
| Data Source | E-commerce product reviews |
| Processing | Batch + Real-time capability |
| Model | Multi-class sentiment (Positive/Neutral/Negative) |
| Output | Dashboard-ready insights |
| Scale | Handle 100K+ reviews |
1.3 Architecture
```text
┌───────────────────────────────────────────────────────────┐
│              Sentiment Analysis Architecture              │
├───────────────────────────────────────────────────────────┤
│                                                           │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐  │
│  │  Raw Data   │────▶│  ETL Layer  │────▶│   Feature   │  │
│  │  (Reviews)  │     │   (Spark)   │     │    Store    │  │
│  └─────────────┘     └─────────────┘     └──────┬──────┘  │
│                                                 │         │
│  ┌─────────────┐     ┌─────────────┐     ┌──────▼──────┐  │
│  │    Model    │◀────│  Training   │◀────│  Processed  │  │
│  │  Registry   │     │  Pipeline   │     │    Data     │  │
│  └──────┬──────┘     └─────────────┘     └─────────────┘  │
│         │                                                 │
│  ┌──────▼──────┐     ┌─────────────┐     ┌─────────────┐  │
│  │   Serving   │────▶│ Predictions │────▶│  Dashboard  │  │
│  │    Layer    │     │  API/Batch  │     │  Analytics  │  │
│  └─────────────┘     └─────────────┘     └─────────────┘  │
│                                                           │
└───────────────────────────────────────────────────────────┘
```
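To make the diagram concrete, here is a minimal skeleton of how the boxes map onto code. The function names are illustrative placeholders only; the sections below implement each stage in full.

```python
# Illustrative skeleton only: each function stands in for one box in the
# diagram above. The real implementations follow in Sections 2-7.

def run_etl(raw_reviews):
    """ETL Layer: clean and normalize raw review records (Section 3)."""
    ...

def build_features(clean_texts, labels):
    """Feature Store: TF-IDF and meta features (Section 4)."""
    ...

def train_model(train_features, train_labels):
    """Training Pipeline feeding the Model Registry (Section 5)."""
    ...

def serve(model, new_reviews):
    """Serving Layer: batch or API predictions for the dashboard (Sections 6-7)."""
    ...
```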
2. Data Collection & Exploration
2.1 Load Sample Data
```python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns

# Simulate e-commerce review data
np.random.seed(42)

# Generate sample data (in practice, load from database/files)
n_samples = 10000

reviews_data = {
    'review_id': range(1, n_samples + 1),
    'product_id': np.random.randint(1, 500, n_samples),
    'user_id': np.random.randint(1, 2000, n_samples),
    'rating': np.random.choice([1, 2, 3, 4, 5], n_samples, p=[0.05, 0.1, 0.15, 0.35, 0.35]),
    'review_text': None,  # Will generate
    'review_date': pd.date_range('2023-01-01', periods=n_samples, freq='30min')
}

# Sample review templates
positive_reviews = [
    "Excellent product! Exceeded my expectations. Fast shipping too.",
    "Love it! Great quality and perfect fit. Will buy again.",
    "Amazing value for money. Highly recommend to everyone.",
    "Best purchase I've made this year. Works perfectly.",
    "Super happy with this product. Customer service was great too."
]

neutral_reviews = [
    "Product is okay, nothing special but works as described.",
    "Average quality. Does the job but could be better.",
    "It's fine for the price. Not amazing, not terrible.",
    "Decent product. Shipping took a while but it arrived.",
    "Good enough for basic use. Won't wow you but works."
]

negative_reviews = [
    "Terrible quality. Broke after one week. Don't buy!",
    "Very disappointed. Product looks nothing like the pictures.",
    "Waste of money. Customer service is horrible.",
    "Poor quality and late delivery. Want my money back.",
    "Awful experience. Product defective and no refund given."
]

# Generate reviews based on rating
def generate_review(rating):
    if rating >= 4:
        return np.random.choice(positive_reviews) + " " + f"Rating: {rating}/5"
    elif rating == 3:
        return np.random.choice(neutral_reviews) + " " + f"Rating: {rating}/5"
    else:
        return np.random.choice(negative_reviews) + " " + f"Rating: {rating}/5"

reviews_data['review_text'] = [generate_review(r) for r in reviews_data['rating']]

# Create DataFrame
df = pd.DataFrame(reviews_data)

# Add sentiment labels
def get_sentiment(rating):
    if rating >= 4:
        return 'positive'
    elif rating == 3:
        return 'neutral'
    else:
        return 'negative'

df['sentiment'] = df['rating'].apply(get_sentiment)

print(df.head())
print(f"\nDataset shape: {df.shape}")
print(f"\nSentiment distribution:\n{df['sentiment'].value_counts()}")
```
2.2 Exploratory Data Analysis
```python
import matplotlib.pyplot as plt
import seaborn as sns

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Rating distribution
axes[0, 0].bar(df['rating'].value_counts().index, df['rating'].value_counts().values)
axes[0, 0].set_title('Rating Distribution')
axes[0, 0].set_xlabel('Rating')
axes[0, 0].set_ylabel('Count')

# Sentiment distribution
sentiment_counts = df['sentiment'].value_counts()
axes[0, 1].pie(sentiment_counts.values, labels=sentiment_counts.index, autopct='%1.1f%%')
axes[0, 1].set_title('Sentiment Distribution')

# Review length distribution
df['review_length'] = df['review_text'].str.len()
axes[1, 0].hist(df['review_length'], bins=30, edgecolor='black')
axes[1, 0].set_title('Review Length Distribution')
axes[1, 0].set_xlabel('Character Count')

# Reviews over time
daily_reviews = df.groupby(df['review_date'].dt.date).size()
axes[1, 1].plot(daily_reviews.index, daily_reviews.values)
axes[1, 1].set_title('Reviews Over Time')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# Word frequency analysis
from collections import Counter
import re

def get_words(text):
    return re.findall(r'\b[a-zA-Z]+\b', text.lower())

all_words = []
for text in df['review_text']:
    all_words.extend(get_words(text))

word_freq = Counter(all_words)
print("\nTop 20 Words:")
print(word_freq.most_common(20))
```
3. Data Preprocessing Module
3.1 Text Preprocessor Class
```python
import re
import string
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag

nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)

class ReviewPreprocessor:
    """
    Complete text preprocessing pipeline for sentiment analysis.
    """

    def __init__(self,
                 remove_stopwords=True,
                 lemmatize=True,
                 min_word_length=2):
        self.remove_stopwords = remove_stopwords
        self.lemmatize = lemmatize
        self.min_word_length = min_word_length
        self.stop_words = set(stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()

        # Add custom stop words
        self.custom_stops = {'rating', 'product', 'would', 'also', 'get', 'got'}
        self.stop_words.update(self.custom_stops)

    def clean_text(self, text):
        """Basic text cleaning"""
        if not isinstance(text, str):
            return ""

        # Lowercase
        text = text.lower()

        # Remove URLs
        text = re.sub(r'http\S+|www\.\S+', '', text)

        # Remove HTML tags
        text = re.sub(r'<.*?>', '', text)

        # Remove ratings like "Rating: 5/5"
        text = re.sub(r'rating:\s*\d+/\d+', '', text)

        # Remove numbers
        text = re.sub(r'\d+', '', text)

        # Remove punctuation
        text = text.translate(str.maketrans('', '', string.punctuation))

        # Remove extra whitespace
        text = ' '.join(text.split())

        return text

    def get_wordnet_pos(self, tag):
        """Map POS tag to WordNet POS"""
        if tag.startswith('J'):
            return 'a'
        elif tag.startswith('V'):
            return 'v'
        elif tag.startswith('R'):
            return 'r'
        return 'n'

    def process_text(self, text):
        """Full preprocessing pipeline"""
        # Clean
        text = self.clean_text(text)

        if not text:
            return ""

        # Tokenize
        tokens = word_tokenize(text)

        # Filter by length
        tokens = [t for t in tokens if len(t) >= self.min_word_length]

        # Remove stop words
        if self.remove_stopwords:
            tokens = [t for t in tokens if t not in self.stop_words]

        # Lemmatize with POS
        if self.lemmatize and tokens:
            pos_tags = pos_tag(tokens)
            tokens = [
                self.lemmatizer.lemmatize(word, self.get_wordnet_pos(tag))
                for word, tag in pos_tags
            ]

        return ' '.join(tokens)

    def process_batch(self, texts):
        """Process multiple texts"""
        return [self.process_text(text) for text in texts]


# Initialize and process
preprocessor = ReviewPreprocessor()

# Process all reviews
df['processed_text'] = preprocessor.process_batch(df['review_text'])

print("Sample original vs processed:")
for i in range(3):
    print(f"\nOriginal: {df['review_text'].iloc[i]}")
    print(f"Processed: {df['processed_text'].iloc[i]}")
```
4. Feature Engineering
4.1 TF-IDF Features
```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder

class FeatureEngineer:
    """Feature engineering for sentiment analysis"""

    def __init__(self, max_features=5000, ngram_range=(1, 2)):
        self.tfidf = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            min_df=5,
            max_df=0.95
        )
        self.label_encoder = LabelEncoder()
        self.fitted = False

    def fit(self, texts, labels):
        """Fit vectorizer and encoder"""
        self.tfidf.fit(texts)
        self.label_encoder.fit(labels)
        self.fitted = True
        return self

    def transform_text(self, texts):
        """Transform texts to TF-IDF features"""
        if not self.fitted:
            raise ValueError("FeatureEngineer not fitted. Call fit() first.")
        return self.tfidf.transform(texts)

    def transform_labels(self, labels):
        """Transform labels to numeric"""
        return self.label_encoder.transform(labels)

    def inverse_transform_labels(self, numeric_labels):
        """Convert numeric labels back to text"""
        return self.label_encoder.inverse_transform(numeric_labels)

    def get_feature_names(self):
        """Get feature names"""
        return self.tfidf.get_feature_names_out()


# Prepare data
X = df['processed_text']
y = df['sentiment']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Initialize and fit feature engineer
feature_eng = FeatureEngineer(max_features=5000, ngram_range=(1, 2))
feature_eng.fit(X_train, y_train)

# Transform
X_train_tfidf = feature_eng.transform_text(X_train)
X_test_tfidf = feature_eng.transform_text(X_test)
y_train_enc = feature_eng.transform_labels(y_train)
y_test_enc = feature_eng.transform_labels(y_test)

print(f"Training features shape: {X_train_tfidf.shape}")
print(f"Test features shape: {X_test_tfidf.shape}")
print(f"Classes: {feature_eng.label_encoder.classes_}")
```
4.2 Additional Features
```python
import numpy as np

def extract_meta_features(df):
    """Extract additional features from reviews"""
    features = pd.DataFrame()

    # Text length features
    features['char_count'] = df['review_text'].str.len()
    features['word_count'] = df['review_text'].str.split().str.len()
    features['avg_word_length'] = features['char_count'] / (features['word_count'] + 1)

    # Punctuation features
    features['exclamation_count'] = df['review_text'].str.count('!')
    features['question_count'] = df['review_text'].str.count(r'\?')
    features['uppercase_ratio'] = df['review_text'].apply(
        lambda x: sum(1 for c in x if c.isupper()) / (len(x) + 1)
    )

    # Sentiment lexicon features (simplified)
    positive_words = {'excellent', 'amazing', 'love', 'great', 'best', 'perfect', 'happy'}
    negative_words = {'terrible', 'awful', 'worst', 'hate', 'disappointed', 'waste', 'poor'}

    features['positive_word_count'] = df['review_text'].str.lower().apply(
        lambda x: sum(1 for word in x.split() if word in positive_words)
    )
    features['negative_word_count'] = df['review_text'].str.lower().apply(
        lambda x: sum(1 for word in x.split() if word in negative_words)
    )
    features['sentiment_score'] = features['positive_word_count'] - features['negative_word_count']

    return features

# Extract meta features
meta_features = extract_meta_features(df)
print(meta_features.head())
```
5. Model Training
5.1 Train Multiple Models
```python
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import LinearSVC
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import time

class SentimentModelTrainer:
    """Train and evaluate multiple sentiment models"""

    def __init__(self):
        self.models = {
            'Logistic Regression': LogisticRegression(max_iter=1000, class_weight='balanced'),
            'Naive Bayes': MultinomialNB(alpha=0.1),
            'Linear SVM': LinearSVC(class_weight='balanced', max_iter=2000),
            'Random Forest': RandomForestClassifier(n_estimators=100, class_weight='balanced', n_jobs=-1)
        }
        self.results = {}
        self.best_model = None
        self.best_score = 0

    def train_all(self, X_train, y_train, X_test, y_test):
        """Train and evaluate all models"""
        for name, model in self.models.items():
            print(f"\nTraining {name}...")
            start_time = time.time()

            # Train
            model.fit(X_train, y_train)
            train_time = time.time() - start_time

            # Predict
            y_pred = model.predict(X_test)

            # Evaluate
            accuracy = accuracy_score(y_test, y_pred)

            self.results[name] = {
                'model': model,
                'accuracy': accuracy,
                'train_time': train_time,
                'predictions': y_pred,
                'report': classification_report(y_test, y_pred, output_dict=True)
            }

            print(f"  Accuracy: {accuracy:.4f}")
            print(f"  Training time: {train_time:.2f}s")

            # Track best model
            if accuracy > self.best_score:
                self.best_score = accuracy
                self.best_model = name

        print(f"\n🏆 Best Model: {self.best_model} (Accuracy: {self.best_score:.4f})")
        return self

    def get_comparison_df(self):
        """Get comparison DataFrame"""
        comparison = []
        for name, result in self.results.items():
            comparison.append({
                'Model': name,
                'Accuracy': result['accuracy'],
                'Train Time (s)': result['train_time'],
                'Precision (macro)': result['report']['macro avg']['precision'],
                'Recall (macro)': result['report']['macro avg']['recall'],
                'F1 (macro)': result['report']['macro avg']['f1-score']
            })
        return pd.DataFrame(comparison).sort_values('Accuracy', ascending=False)

    def get_best_model(self):
        """Return best model"""
        return self.results[self.best_model]['model']


# Train models
trainer = SentimentModelTrainer()
trainer.train_all(X_train_tfidf, y_train_enc, X_test_tfidf, y_test_enc)

# Get comparison
comparison_df = trainer.get_comparison_df()
print("\n📊 Model Comparison:")
print(comparison_df.to_string(index=False))
```
5.2 Detailed Evaluation
```python
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

def plot_model_evaluation(trainer, y_test, class_names):
    """Visualize model evaluation"""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    # 1. Accuracy comparison
    models = list(trainer.results.keys())
    accuracies = [trainer.results[m]['accuracy'] for m in models]
    axes[0, 0].barh(models, accuracies, color='steelblue')
    axes[0, 0].set_xlabel('Accuracy')
    axes[0, 0].set_title('Model Accuracy Comparison')
    axes[0, 0].set_xlim([0, 1])
    for i, v in enumerate(accuracies):
        axes[0, 0].text(v + 0.01, i, f'{v:.3f}', va='center')

    # 2. Training time comparison
    train_times = [trainer.results[m]['train_time'] for m in models]
    axes[0, 1].barh(models, train_times, color='coral')
    axes[0, 1].set_xlabel('Time (seconds)')
    axes[0, 1].set_title('Training Time Comparison')

    # 3. Confusion matrix for best model
    best_predictions = trainer.results[trainer.best_model]['predictions']
    cm = confusion_matrix(y_test, best_predictions)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[1, 0],
                xticklabels=class_names, yticklabels=class_names)
    axes[1, 0].set_xlabel('Predicted')
    axes[1, 0].set_ylabel('Actual')
    axes[1, 0].set_title(f'Confusion Matrix ({trainer.best_model})')

    # 4. F1 scores by class
    f1_scores = {m: [] for m in models}
    for name in models:
        report = trainer.results[name]['report']
        for cls in ['0', '1', '2']:
            if cls in report:
                f1_scores[name].append(report[cls]['f1-score'])

    x = np.arange(len(class_names))
    width = 0.2
    for i, (name, scores) in enumerate(f1_scores.items()):
        if len(scores) == len(class_names):
            axes[1, 1].bar(x + i*width, scores, width, label=name)

    axes[1, 1].set_xlabel('Class')
    axes[1, 1].set_ylabel('F1 Score')
    axes[1, 1].set_title('F1 Score by Class')
    axes[1, 1].set_xticks(x + width * 1.5)
    axes[1, 1].set_xticklabels(class_names)
    axes[1, 1].legend()

    plt.tight_layout()
    plt.show()

# Plot evaluation
class_names = list(feature_eng.label_encoder.classes_)
plot_model_evaluation(trainer, y_test_enc, class_names)
```
6. Production Pipeline
6.1 Complete Pipeline Class
```python
import pickle
import json
from datetime import datetime

class SentimentPipeline:
    """
    Production-ready sentiment analysis pipeline.
    """

    def __init__(self):
        self.preprocessor = None
        self.feature_engineer = None
        self.model = None
        self.metadata = {}

    def train(self, df, text_col='review_text', label_col='sentiment'):
        """Train complete pipeline"""
        print("🚀 Training Sentiment Pipeline...")

        # 1. Initialize preprocessor
        print("  1. Initializing preprocessor...")
        self.preprocessor = ReviewPreprocessor()

        # 2. Preprocess texts
        print("  2. Preprocessing texts...")
        processed_texts = self.preprocessor.process_batch(df[text_col])

        # 3. Split data
        X_train, X_test, y_train, y_test = train_test_split(
            processed_texts, df[label_col],
            test_size=0.2, random_state=42, stratify=df[label_col]
        )

        # 4. Feature engineering
        print("  3. Engineering features...")
        self.feature_engineer = FeatureEngineer(max_features=5000)
        self.feature_engineer.fit(X_train, y_train)

        X_train_feat = self.feature_engineer.transform_text(X_train)
        X_test_feat = self.feature_engineer.transform_text(X_test)
        y_train_enc = self.feature_engineer.transform_labels(y_train)
        y_test_enc = self.feature_engineer.transform_labels(y_test)

        # 5. Train model
        print("  4. Training model...")
        self.model = LogisticRegression(max_iter=1000, class_weight='balanced')
        self.model.fit(X_train_feat, y_train_enc)

        # 6. Evaluate
        y_pred = self.model.predict(X_test_feat)
        accuracy = accuracy_score(y_test_enc, y_pred)

        # 7. Save metadata
        self.metadata = {
            'trained_at': datetime.now().isoformat(),
            'train_samples': len(X_train),
            'test_samples': len(X_test),
            'accuracy': float(accuracy),
            'classes': list(self.feature_engineer.label_encoder.classes_),
            'feature_count': X_train_feat.shape[1]
        }

        print(f"\n✅ Pipeline trained successfully!")
        print(f"   Accuracy: {accuracy:.4f}")
        print(f"   Features: {X_train_feat.shape[1]}")

        return self

    def predict(self, texts):
        """Predict sentiment for new texts"""
        if not isinstance(texts, list):
            texts = [texts]

        # Preprocess
        processed = self.preprocessor.process_batch(texts)

        # Extract features
        features = self.feature_engineer.transform_text(processed)

        # Predict
        predictions = self.model.predict(features)
        probabilities = self.model.predict_proba(features)

        # Convert to labels
        labels = self.feature_engineer.inverse_transform_labels(predictions)

        # Format results
        results = []
        for i, text in enumerate(texts):
            results.append({
                'text': text[:100] + '...' if len(text) > 100 else text,
                'sentiment': labels[i],
                'confidence': float(max(probabilities[i])),
                'probabilities': {
                    cls: float(prob)
                    for cls, prob in zip(self.feature_engineer.label_encoder.classes_, probabilities[i])
                }
            })

        return results

    def predict_batch(self, df, text_col='review_text'):
        """Batch prediction for DataFrame"""
        results = self.predict(df[text_col].tolist())

        df_results = df.copy()
        df_results['predicted_sentiment'] = [r['sentiment'] for r in results]
        df_results['confidence'] = [r['confidence'] for r in results]

        return df_results

    def save(self, path='sentiment_pipeline.pkl'):
        """Save pipeline to file"""
        with open(path, 'wb') as f:
            pickle.dump({
                'preprocessor': self.preprocessor,
                'feature_engineer': self.feature_engineer,
                'model': self.model,
                'metadata': self.metadata
            }, f)
        print(f"✅ Pipeline saved to {path}")

    @classmethod
    def load(cls, path='sentiment_pipeline.pkl'):
        """Load pipeline from file"""
        with open(path, 'rb') as f:
            data = pickle.load(f)

        pipeline = cls()
        pipeline.preprocessor = data['preprocessor']
        pipeline.feature_engineer = data['feature_engineer']
        pipeline.model = data['model']
        pipeline.metadata = data['metadata']

        print(f"✅ Pipeline loaded from {path}")
        print(f"   Trained at: {pipeline.metadata['trained_at']}")
        print(f"   Accuracy: {pipeline.metadata['accuracy']:.4f}")

        return pipeline


# Train and use pipeline
pipeline = SentimentPipeline()
pipeline.train(df)

# Test predictions
test_reviews = [
    "This is absolutely amazing! Best product ever!",
    "Terrible quality, waste of money. Never buying again.",
    "It's okay, nothing special but does the job."
]

results = pipeline.predict(test_reviews)
for r in results:
    print(f"\n📝 {r['text']}")
    print(f"   Sentiment: {r['sentiment']} (confidence: {r['confidence']:.2f})")
    print(f"   Probabilities: {r['probabilities']}")
```
6.2 Spark Integration (Large Scale)
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, FloatType, StructType, StructField

# Initialize Spark
spark = SparkSession.builder \
    .appName("SentimentAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Broadcast the pipeline for distributed prediction
# (In production, use proper model serving)
pipeline_broadcast = spark.sparkContext.broadcast(pipeline)

# Define UDF for sentiment prediction
@udf(returnType=StructType([
    StructField("sentiment", StringType(), True),
    StructField("confidence", FloatType(), True)
]))
def predict_sentiment_udf(text):
    if not text:
        return ("unknown", 0.0)

    p = pipeline_broadcast.value
    result = p.predict([text])[0]
    return (result['sentiment'], result['confidence'])

# Load data to Spark DataFrame
spark_df = spark.createDataFrame(df[['review_id', 'review_text', 'sentiment']])

# Apply predictions
predictions_df = spark_df.withColumn(
    "prediction",
    predict_sentiment_udf(col("review_text"))
).select(
    "review_id",
    "review_text",
    "sentiment",
    col("prediction.sentiment").alias("predicted"),
    col("prediction.confidence").alias("confidence")
)

# Show results
predictions_df.show(5, truncate=50)

# Calculate accuracy
accuracy = predictions_df.filter(
    col("sentiment") == col("predicted")
).count() / predictions_df.count()

print(f"\nSpark Prediction Accuracy: {accuracy:.4f}")

# Save results
predictions_df.write.mode("overwrite").parquet("sentiment_predictions.parquet")
```
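The row-at-a-time UDF above pays Python overhead on every record. At the 100K+ scale from the requirements, a vectorized pandas UDF that handles one Arrow batch per call is usually much faster. A sketch under the same broadcast setup; the function name `predict_sentiment_batched` is a placeholder, not part of the course code:

```python
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType
import pandas as pd

# Hypothetical vectorized variant: one model call per Arrow batch, not per row.
# Requires pyarrow on the cluster.
@pandas_udf(StringType())
def predict_sentiment_batched(texts: pd.Series) -> pd.Series:
    p = pipeline_broadcast.value
    results = p.predict(texts.fillna("").tolist())
    return pd.Series([r['sentiment'] for r in results])

batched_df = spark_df.withColumn("predicted", predict_sentiment_batched(col("review_text")))
batched_df.show(5, truncate=50)
```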
7. Monitoring & Analytics
7.1 Create Dashboard Metrics
```python
def generate_analytics_report(df_predictions):
    """Generate analytics report from predictions"""

    report = {}

    # Overall metrics
    report['total_reviews'] = len(df_predictions)
    report['sentiment_distribution'] = df_predictions['predicted_sentiment'].value_counts().to_dict()
    report['avg_confidence'] = df_predictions['confidence'].mean()

    # Time-based analysis
    df_predictions['date'] = pd.to_datetime(df_predictions['review_date']).dt.date

    daily_sentiment = df_predictions.groupby(['date', 'predicted_sentiment']).size().unstack(fill_value=0)
    report['daily_sentiment'] = daily_sentiment.to_dict()

    # Product analysis
    product_sentiment = df_predictions.groupby(['product_id', 'predicted_sentiment']).size().unstack(fill_value=0)
    product_sentiment['total'] = product_sentiment.sum(axis=1)
    product_sentiment['positive_rate'] = product_sentiment.get('positive', 0) / product_sentiment['total']

    # Top products with most negative reviews
    report['products_with_issues'] = product_sentiment.nlargest(10, 'negative')[['negative', 'total', 'positive_rate']].to_dict()

    return report

# Generate report (simulated predictions)
df_with_predictions = df.copy()
df_with_predictions['predicted_sentiment'] = df['sentiment']  # Use actual for demo
df_with_predictions['confidence'] = np.random.uniform(0.7, 0.99, len(df))

report = generate_analytics_report(df_with_predictions)

print("📊 Analytics Report")
print("=" * 50)
print(f"Total Reviews: {report['total_reviews']:,}")
print(f"Average Confidence: {report['avg_confidence']:.2%}")
print(f"\nSentiment Distribution:")
for sentiment, count in report['sentiment_distribution'].items():
    print(f"  {sentiment}: {count:,} ({count/report['total_reviews']:.1%})")
```
8. Comprehensive Exercises
Project Challenge
Complete the following tasks to build out the full sentiment analysis system:
Task 1: Improve Model
```python
# TODO: Experiment with:
# 1. Different feature extraction (word embeddings, BERT embeddings)
# 2. Hyperparameter tuning with GridSearchCV
# 3. Ensemble methods
# 4. Handle imbalanced classes better

# YOUR CODE HERE
```
Task 2: Add Real-time Capability
```python
# TODO: Implement real-time prediction with:
# 1. REST API endpoint (Flask/FastAPI)
# 2. WebSocket for streaming predictions
# 3. Caching for frequent queries

# YOUR CODE HERE
```
Task 3: Vietnamese Support
```python
# TODO: Extend pipeline for Vietnamese:
# 1. Use underthesea or pyvi for tokenization
# 2. Create Vietnamese stopwords list
# 3. Find/train Vietnamese word embeddings

# YOUR CODE HERE
```
💡 Hints
Task 1: Model Improvement
```python
from sklearn.model_selection import GridSearchCV

# Grid search for Logistic Regression
param_grid = {
    'C': [0.1, 1, 10],
    'penalty': ['l1', 'l2'],
    'solver': ['liblinear', 'saga']
}

grid_search = GridSearchCV(
    LogisticRegression(max_iter=1000, class_weight='balanced'),
    param_grid,
    cv=5,
    scoring='f1_macro'
)
grid_search.fit(X_train_tfidf, y_train_enc)
print(f"Best params: {grid_search.best_params_}")
```
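For the ensemble-methods item in Task 1, a minimal sketch using soft voting over the two probabilistic models from Section 5 (LinearSVC is left out because soft voting requires `predict_proba`):

```python
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB

# Soft voting averages class probabilities across estimators,
# so every estimator must implement predict_proba.
ensemble = VotingClassifier(
    estimators=[
        ('lr', LogisticRegression(max_iter=1000, class_weight='balanced')),
        ('nb', MultinomialNB(alpha=0.1)),
    ],
    voting='soft'
)
ensemble.fit(X_train_tfidf, y_train_enc)
print(f"Ensemble accuracy: {ensemble.score(X_test_tfidf, y_test_enc):.4f}")
```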
Task 2: FastAPI Endpoint
```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ReviewInput(BaseModel):
    text: str

@app.post("/predict")
def predict(review: ReviewInput):
    result = pipeline.predict([review.text])[0]
    return result
```
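For the caching item in Task 2, a minimal in-process sketch using `functools.lru_cache`; it assumes the `pipeline` and `app` objects defined above, and in production a shared cache such as Redis would replace it:

```python
from functools import lru_cache

@lru_cache(maxsize=10_000)
def cached_predict(text: str):
    # Identical texts hit the cache instead of re-running the model.
    return pipeline.predict([text])[0]

@app.post("/predict_cached")
def predict_cached(review: ReviewInput):
    return cached_predict(review.text)
```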
Task 3: Vietnamese Processing
```python
from underthesea import word_tokenize

def tokenize_vietnamese(text):
    return word_tokenize(text, format="text")
```
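One way to wire this into the existing pipeline is to subclass `ReviewPreprocessor` and skip the English-specific steps. A sketch, where `vi_stopwords` is a tiny illustrative starter set, not a real Vietnamese stopword list:

```python
from underthesea import word_tokenize

# Hypothetical starter set; build a proper Vietnamese stopword list for real use.
vi_stopwords = {'và', 'là', 'của', 'có', 'cho', 'này', 'rất', 'thì'}

class VietnameseReviewPreprocessor(ReviewPreprocessor):
    def process_text(self, text):
        text = self.clean_text(text)
        if not text:
            return ""
        # underthesea joins multi-syllable words with underscores in "text" format
        tokens = word_tokenize(text, format="text").split()
        # English lemmatization does not apply to Vietnamese, so stop here.
        return ' '.join(
            t for t in tokens
            if len(t) >= self.min_word_length and t not in vi_stopwords
        )
```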
9. Project Summary
Key Learnings
| Concept | Implementation |
|---|---|
| Text Preprocessing | Custom ReviewPreprocessor class |
| Feature Engineering | TF-IDF + meta features |
| Model Training | Multiple models comparison |
| Production Pipeline | SentimentPipeline class |
| Scalability | Spark integration |
| Monitoring | Analytics dashboard |
Best Practices Applied
- ✅ Modular code design
- ✅ Consistent preprocessing
- ✅ Multiple model evaluation
- ✅ Proper train/test split
- ✅ Model serialization
- ✅ Batch processing support
- ✅ Performance monitoring
Next Steps
- Deploy API endpoint
- Set up CI/CD pipeline
- Add A/B testing
- Implement feedback loop (see the sketch after this list)
- Scale with Kubernetes
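As a starting point for the feedback loop, a minimal sketch that logs each prediction plus any human correction so the rows can seed the next retraining run; the file name and schema are illustrative only:

```python
import csv
from datetime import datetime

def log_feedback(text, predicted, corrected=None, path='feedback_log.csv'):
    # One row per prediction; rows with a correction become fresh labels.
    with open(path, 'a', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(
            [datetime.now().isoformat(), text, predicted, corrected or '']
        )

log_feedback("Great product!", "positive")              # model prediction only
log_feedback("Meh.", "positive", corrected="neutral")   # reviewer override
```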
🎉 Congratulations!
You have completed the Data Wrangling Course!
What you have learned:
- Advanced Pandas & Data Quality
- ETL Pipeline Design
- Apache Spark & PySpark
- Real-time Streaming with Kafka
- NLP & Text Processing
- Production ML Pipelines
Next course: Advanced Data Analysis
