Theory
Lesson 14/15

Project: Sentiment Analysis Pipeline

Build an end-to-end sentiment analysis pipeline with real-world data


1. Project Overview

Project Goal

Build a complete sentiment analysis pipeline from data collection → preprocessing → training → deployment, using the techniques covered in this course.

1.1 Business Context

Scenario: You are a Data Engineer at an e-commerce company. The team needs a system that automatically analyzes the sentiment of customer reviews in order to:

  • Monitor product quality
  • Identify issues quickly
  • Improve customer experience
  • Generate insights for the business team

1.2 Project Requirements

Component   | Description
Data Source | E-commerce product reviews
Processing  | Batch + Real-time capability
Model       | Multi-class sentiment (Positive/Neutral/Negative)
Output      | Dashboard-ready insights
Scale       | Handle 100K+ reviews

1.3 Architecture

Text
┌───────────────────────────────────────────────────────────┐
│              Sentiment Analysis Architecture              │
├───────────────────────────────────────────────────────────┤
│                                                           │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐  │
│  │  Raw Data   │────▶│  ETL Layer  │────▶│   Feature   │  │
│  │  (Reviews)  │     │   (Spark)   │     │    Store    │  │
│  └─────────────┘     └─────────────┘     └──────┬──────┘  │
│                                                 │         │
│  ┌─────────────┐     ┌─────────────┐     ┌──────▼──────┐  │
│  │    Model    │◀────│  Training   │◀────│  Processed  │  │
│  │  Registry   │     │  Pipeline   │     │    Data     │  │
│  └──────┬──────┘     └─────────────┘     └─────────────┘  │
│         │                                                 │
│  ┌──────▼──────┐     ┌─────────────┐     ┌─────────────┐  │
│  │   Serving   │────▶│ Predictions │────▶│  Dashboard  │  │
│  │    Layer    │     │  API/Batch  │     │  Analytics  │  │
│  └─────────────┘     └─────────────┘     └─────────────┘  │
│                                                           │
└───────────────────────────────────────────────────────────┘

2. Data Collection & Exploration

2.1 Load Sample Data

Python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns

# Simulate e-commerce review data
np.random.seed(42)

# Generate sample data (in practice, load from database/files)
n_samples = 10000

reviews_data = {
    'review_id': range(1, n_samples + 1),
    'product_id': np.random.randint(1, 500, n_samples),
    'user_id': np.random.randint(1, 2000, n_samples),
    'rating': np.random.choice([1, 2, 3, 4, 5], n_samples, p=[0.05, 0.1, 0.15, 0.35, 0.35]),
    'review_text': None,  # Will generate below
    'review_date': pd.date_range('2023-01-01', periods=n_samples, freq='30min')
}

# Sample review templates
positive_reviews = [
    "Excellent product! Exceeded my expectations. Fast shipping too.",
    "Love it! Great quality and perfect fit. Will buy again.",
    "Amazing value for money. Highly recommend to everyone.",
    "Best purchase I've made this year. Works perfectly.",
    "Super happy with this product. Customer service was great too."
]

neutral_reviews = [
    "Product is okay, nothing special but works as described.",
    "Average quality. Does the job but could be better.",
    "It's fine for the price. Not amazing, not terrible.",
    "Decent product. Shipping took a while but it arrived.",
    "Good enough for basic use. Won't wow you but works."
]

negative_reviews = [
    "Terrible quality. Broke after one week. Don't buy!",
    "Very disappointed. Product looks nothing like the pictures.",
    "Waste of money. Customer service is horrible.",
    "Poor quality and late delivery. Want my money back.",
    "Awful experience. Product defective and no refund given."
]

# Generate reviews based on rating
def generate_review(rating):
    if rating >= 4:
        return np.random.choice(positive_reviews) + " " + f"Rating: {rating}/5"
    elif rating == 3:
        return np.random.choice(neutral_reviews) + " " + f"Rating: {rating}/5"
    else:
        return np.random.choice(negative_reviews) + " " + f"Rating: {rating}/5"

reviews_data['review_text'] = [generate_review(r) for r in reviews_data['rating']]

# Create DataFrame
df = pd.DataFrame(reviews_data)

# Add sentiment labels
def get_sentiment(rating):
    if rating >= 4:
        return 'positive'
    elif rating == 3:
        return 'neutral'
    else:
        return 'negative'

df['sentiment'] = df['rating'].apply(get_sentiment)

print(df.head())
print(f"\nDataset shape: {df.shape}")
print(f"\nSentiment distribution:\n{df['sentiment'].value_counts()}")

2.2 Exploratory Data Analysis

Python
import matplotlib.pyplot as plt
import seaborn as sns

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Rating distribution
axes[0, 0].bar(df['rating'].value_counts().index, df['rating'].value_counts().values)
axes[0, 0].set_title('Rating Distribution')
axes[0, 0].set_xlabel('Rating')
axes[0, 0].set_ylabel('Count')

# Sentiment distribution
sentiment_counts = df['sentiment'].value_counts()
axes[0, 1].pie(sentiment_counts.values, labels=sentiment_counts.index, autopct='%1.1f%%')
axes[0, 1].set_title('Sentiment Distribution')

# Review length distribution
df['review_length'] = df['review_text'].str.len()
axes[1, 0].hist(df['review_length'], bins=30, edgecolor='black')
axes[1, 0].set_title('Review Length Distribution')
axes[1, 0].set_xlabel('Character Count')

# Reviews over time
daily_reviews = df.groupby(df['review_date'].dt.date).size()
axes[1, 1].plot(daily_reviews.index, daily_reviews.values)
axes[1, 1].set_title('Reviews Over Time')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# Word frequency analysis
from collections import Counter
import re

def get_words(text):
    return re.findall(r'\b[a-zA-Z]+\b', text.lower())

all_words = []
for text in df['review_text']:
    all_words.extend(get_words(text))

word_freq = Counter(all_words)
print("\nTop 20 Words:")
print(word_freq.most_common(20))
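
Beyond overall frequencies, comparing the top words per sentiment class hints at which terms a model is likely to rely on. A minimal sketch, reusing the get_words helper and Counter from the block above:

Python
from collections import Counter

# Most frequent words within each sentiment class
for sentiment in ['positive', 'neutral', 'negative']:
    class_words = []
    for text in df.loc[df['sentiment'] == sentiment, 'review_text']:
        class_words.extend(get_words(text))
    print(f"\nTop 10 words ({sentiment}):")
    print(Counter(class_words).most_common(10))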

3. Data Preprocessing Module

3.1 Text Preprocessor Class

Python
import re
import string
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag

nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)

class ReviewPreprocessor:
    """
    Complete text preprocessing pipeline for sentiment analysis.
    """

    def __init__(self,
                 remove_stopwords=True,
                 lemmatize=True,
                 min_word_length=2):
        self.remove_stopwords = remove_stopwords
        self.lemmatize = lemmatize
        self.min_word_length = min_word_length
        self.stop_words = set(stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()

        # Add custom stop words
        self.custom_stops = {'rating', 'product', 'would', 'also', 'get', 'got'}
        self.stop_words.update(self.custom_stops)

    def clean_text(self, text):
        """Basic text cleaning"""
        if not isinstance(text, str):
            return ""

        # Lowercase
        text = text.lower()

        # Remove URLs
        text = re.sub(r'http\S+|www\.\S+', '', text)

        # Remove HTML tags
        text = re.sub(r'<.*?>', '', text)

        # Remove ratings like "Rating: 5/5"
        text = re.sub(r'rating:\s*\d+/\d+', '', text)

        # Remove numbers
        text = re.sub(r'\d+', '', text)

        # Remove punctuation
        text = text.translate(str.maketrans('', '', string.punctuation))

        # Remove extra whitespace
        text = ' '.join(text.split())

        return text

    def get_wordnet_pos(self, tag):
        """Map POS tag to WordNet POS"""
        if tag.startswith('J'):
            return 'a'
        elif tag.startswith('V'):
            return 'v'
        elif tag.startswith('R'):
            return 'r'
        return 'n'

    def process_text(self, text):
        """Full preprocessing pipeline"""
        # Clean
        text = self.clean_text(text)

        if not text:
            return ""

        # Tokenize
        tokens = word_tokenize(text)

        # Filter by length
        tokens = [t for t in tokens if len(t) >= self.min_word_length]

        # Remove stop words
        if self.remove_stopwords:
            tokens = [t for t in tokens if t not in self.stop_words]

        # Lemmatize with POS
        if self.lemmatize and tokens:
            pos_tags = pos_tag(tokens)
            tokens = [
                self.lemmatizer.lemmatize(word, self.get_wordnet_pos(tag))
                for word, tag in pos_tags
            ]

        return ' '.join(tokens)

    def process_batch(self, texts):
        """Process multiple texts"""
        return [self.process_text(text) for text in texts]


# Initialize and process
preprocessor = ReviewPreprocessor()

# Process all reviews
df['processed_text'] = preprocessor.process_batch(df['review_text'])

print("Sample original vs processed:")
for i in range(3):
    print(f"\nOriginal: {df['review_text'].iloc[i]}")
    print(f"Processed: {df['processed_text'].iloc[i]}")

4. Feature Engineering

4.1 TF-IDF Features

Python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder

class FeatureEngineer:
    """Feature engineering for sentiment analysis"""

    def __init__(self, max_features=5000, ngram_range=(1, 2)):
        self.tfidf = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            min_df=5,
            max_df=0.95
        )
        self.label_encoder = LabelEncoder()
        self.fitted = False

    def fit(self, texts, labels):
        """Fit vectorizer and encoder"""
        self.tfidf.fit(texts)
        self.label_encoder.fit(labels)
        self.fitted = True
        return self

    def transform_text(self, texts):
        """Transform texts to TF-IDF features"""
        if not self.fitted:
            raise ValueError("FeatureEngineer not fitted. Call fit() first.")
        return self.tfidf.transform(texts)

    def transform_labels(self, labels):
        """Transform labels to numeric"""
        return self.label_encoder.transform(labels)

    def inverse_transform_labels(self, numeric_labels):
        """Convert numeric labels back to text"""
        return self.label_encoder.inverse_transform(numeric_labels)

    def get_feature_names(self):
        """Get feature names"""
        return self.tfidf.get_feature_names_out()


# Prepare data
X = df['processed_text']
y = df['sentiment']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Initialize and fit feature engineer
feature_eng = FeatureEngineer(max_features=5000, ngram_range=(1, 2))
feature_eng.fit(X_train, y_train)

# Transform
X_train_tfidf = feature_eng.transform_text(X_train)
X_test_tfidf = feature_eng.transform_text(X_test)
y_train_enc = feature_eng.transform_labels(y_train)
y_test_enc = feature_eng.transform_labels(y_test)

print(f"Training features shape: {X_train_tfidf.shape}")
print(f"Test features shape: {X_test_tfidf.shape}")
print(f"Classes: {feature_eng.label_encoder.classes_}")

4.2 Additional Features

Python
import numpy as np

def extract_meta_features(df):
    """Extract additional features from reviews"""
    features = pd.DataFrame()

    # Text length features
    features['char_count'] = df['review_text'].str.len()
    features['word_count'] = df['review_text'].str.split().str.len()
    features['avg_word_length'] = features['char_count'] / (features['word_count'] + 1)

    # Punctuation features
    features['exclamation_count'] = df['review_text'].str.count('!')
    features['question_count'] = df['review_text'].str.count(r'\?')
    features['uppercase_ratio'] = df['review_text'].apply(
        lambda x: sum(1 for c in x if c.isupper()) / (len(x) + 1)
    )

    # Sentiment lexicon features (simplified)
    positive_words = {'excellent', 'amazing', 'love', 'great', 'best', 'perfect', 'happy'}
    negative_words = {'terrible', 'awful', 'worst', 'hate', 'disappointed', 'waste', 'poor'}

    features['positive_word_count'] = df['review_text'].str.lower().apply(
        lambda x: sum(1 for word in x.split() if word in positive_words)
    )
    features['negative_word_count'] = df['review_text'].str.lower().apply(
        lambda x: sum(1 for word in x.split() if word in negative_words)
    )
    features['sentiment_score'] = features['positive_word_count'] - features['negative_word_count']

    return features

# Extract meta features
meta_features = extract_meta_features(df)
print(meta_features.head())
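
The meta features are not combined with the TF-IDF matrix anywhere in this lesson. If you want to feed both into a single model, a minimal sketch using scipy.sparse.hstack, assuming the X_train/X_test split from section 4.1 (whose indices align with meta_features):

Python
from scipy.sparse import hstack, csr_matrix
from sklearn.preprocessing import StandardScaler

# Align meta features with the existing train/test split
meta_train = meta_features.loc[X_train.index]
meta_test = meta_features.loc[X_test.index]

# Scale the dense meta features, then stack them next to the sparse TF-IDF matrix
scaler = StandardScaler()
meta_train_scaled = scaler.fit_transform(meta_train)
meta_test_scaled = scaler.transform(meta_test)

X_train_combined = hstack([X_train_tfidf, csr_matrix(meta_train_scaled)])
X_test_combined = hstack([X_test_tfidf, csr_matrix(meta_test_scaled)])

print(f"Combined feature shape: {X_train_combined.shape}")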

5. Model Training

5.1 Train Multiple Models

Python
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import LinearSVC
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import time

class SentimentModelTrainer:
    """Train and evaluate multiple sentiment models"""

    def __init__(self):
        self.models = {
            'Logistic Regression': LogisticRegression(max_iter=1000, class_weight='balanced'),
            'Naive Bayes': MultinomialNB(alpha=0.1),
            'Linear SVM': LinearSVC(class_weight='balanced', max_iter=2000),
            'Random Forest': RandomForestClassifier(n_estimators=100, class_weight='balanced', n_jobs=-1)
        }
        self.results = {}
        self.best_model = None
        self.best_score = 0

    def train_all(self, X_train, y_train, X_test, y_test):
        """Train and evaluate all models"""
        for name, model in self.models.items():
            print(f"\nTraining {name}...")
            start_time = time.time()

            # Train
            model.fit(X_train, y_train)
            train_time = time.time() - start_time

            # Predict
            y_pred = model.predict(X_test)

            # Evaluate
            accuracy = accuracy_score(y_test, y_pred)

            self.results[name] = {
                'model': model,
                'accuracy': accuracy,
                'train_time': train_time,
                'predictions': y_pred,
                'report': classification_report(y_test, y_pred, output_dict=True)
            }

            print(f"  Accuracy: {accuracy:.4f}")
            print(f"  Training time: {train_time:.2f}s")

            # Track best model
            if accuracy > self.best_score:
                self.best_score = accuracy
                self.best_model = name

        print(f"\n🏆 Best Model: {self.best_model} (Accuracy: {self.best_score:.4f})")
        return self

    def get_comparison_df(self):
        """Get comparison DataFrame"""
        comparison = []
        for name, result in self.results.items():
            comparison.append({
                'Model': name,
                'Accuracy': result['accuracy'],
                'Train Time (s)': result['train_time'],
                'Precision (macro)': result['report']['macro avg']['precision'],
                'Recall (macro)': result['report']['macro avg']['recall'],
                'F1 (macro)': result['report']['macro avg']['f1-score']
            })
        return pd.DataFrame(comparison).sort_values('Accuracy', ascending=False)

    def get_best_model(self):
        """Return best model"""
        return self.results[self.best_model]['model']


# Train models
trainer = SentimentModelTrainer()
trainer.train_all(X_train_tfidf, y_train_enc, X_test_tfidf, y_test_enc)

# Get comparison
comparison_df = trainer.get_comparison_df()
print("\n📊 Model Comparison:")
print(comparison_df.to_string(index=False))

5.2 Detailed Evaluation

Python
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_model_evaluation(trainer, y_test, class_names):
    """Visualize model evaluation"""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    # 1. Accuracy comparison
    models = list(trainer.results.keys())
    accuracies = [trainer.results[m]['accuracy'] for m in models]
    axes[0, 0].barh(models, accuracies, color='steelblue')
    axes[0, 0].set_xlabel('Accuracy')
    axes[0, 0].set_title('Model Accuracy Comparison')
    axes[0, 0].set_xlim([0, 1])
    for i, v in enumerate(accuracies):
        axes[0, 0].text(v + 0.01, i, f'{v:.3f}', va='center')

    # 2. Training time comparison
    train_times = [trainer.results[m]['train_time'] for m in models]
    axes[0, 1].barh(models, train_times, color='coral')
    axes[0, 1].set_xlabel('Time (seconds)')
    axes[0, 1].set_title('Training Time Comparison')

    # 3. Confusion matrix for best model
    best_predictions = trainer.results[trainer.best_model]['predictions']
    cm = confusion_matrix(y_test, best_predictions)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[1, 0],
                xticklabels=class_names, yticklabels=class_names)
    axes[1, 0].set_xlabel('Predicted')
    axes[1, 0].set_ylabel('Actual')
    axes[1, 0].set_title(f'Confusion Matrix ({trainer.best_model})')

    # 4. F1 scores by class
    f1_scores = {m: [] for m in models}
    for name in models:
        report = trainer.results[name]['report']
        for cls in ['0', '1', '2']:
            if cls in report:
                f1_scores[name].append(report[cls]['f1-score'])

    x = np.arange(len(class_names))
    width = 0.2
    for i, (name, scores) in enumerate(f1_scores.items()):
        if len(scores) == len(class_names):
            axes[1, 1].bar(x + i*width, scores, width, label=name)

    axes[1, 1].set_xlabel('Class')
    axes[1, 1].set_ylabel('F1 Score')
    axes[1, 1].set_title('F1 Score by Class')
    axes[1, 1].set_xticks(x + width * 1.5)
    axes[1, 1].set_xticklabels(class_names)
    axes[1, 1].legend()

    plt.tight_layout()
    plt.show()

# Plot evaluation
class_names = list(feature_eng.label_encoder.classes_)
plot_model_evaluation(trainer, y_test_enc, class_names)

6. Production Pipeline

6.1 Complete Pipeline Class

Python
import pickle
import json
from datetime import datetime

class SentimentPipeline:
    """
    Production-ready sentiment analysis pipeline.
    """

    def __init__(self):
        self.preprocessor = None
        self.feature_engineer = None
        self.model = None
        self.metadata = {}

    def train(self, df, text_col='review_text', label_col='sentiment'):
        """Train complete pipeline"""
        print("🚀 Training Sentiment Pipeline...")

        # 1. Initialize preprocessor
        print("  1. Initializing preprocessor...")
        self.preprocessor = ReviewPreprocessor()

        # 2. Preprocess texts
        print("  2. Preprocessing texts...")
        processed_texts = self.preprocessor.process_batch(df[text_col])

        # 3. Split data
        X_train, X_test, y_train, y_test = train_test_split(
            processed_texts, df[label_col],
            test_size=0.2, random_state=42, stratify=df[label_col]
        )

        # 4. Feature engineering
        print("  3. Engineering features...")
        self.feature_engineer = FeatureEngineer(max_features=5000)
        self.feature_engineer.fit(X_train, y_train)

        X_train_feat = self.feature_engineer.transform_text(X_train)
        X_test_feat = self.feature_engineer.transform_text(X_test)
        y_train_enc = self.feature_engineer.transform_labels(y_train)
        y_test_enc = self.feature_engineer.transform_labels(y_test)

        # 5. Train model
        print("  4. Training model...")
        self.model = LogisticRegression(max_iter=1000, class_weight='balanced')
        self.model.fit(X_train_feat, y_train_enc)

        # 6. Evaluate
        y_pred = self.model.predict(X_test_feat)
        accuracy = accuracy_score(y_test_enc, y_pred)

        # 7. Save metadata
        self.metadata = {
            'trained_at': datetime.now().isoformat(),
            'train_samples': len(X_train),
            'test_samples': len(X_test),
            'accuracy': float(accuracy),
            'classes': list(self.feature_engineer.label_encoder.classes_),
            'feature_count': X_train_feat.shape[1]
        }

        print(f"\n✅ Pipeline trained successfully!")
        print(f"  Accuracy: {accuracy:.4f}")
        print(f"  Features: {X_train_feat.shape[1]}")

        return self

    def predict(self, texts):
        """Predict sentiment for new texts"""
        if not isinstance(texts, list):
            texts = [texts]

        # Preprocess
        processed = self.preprocessor.process_batch(texts)

        # Extract features
        features = self.feature_engineer.transform_text(processed)

        # Predict
        predictions = self.model.predict(features)
        probabilities = self.model.predict_proba(features)

        # Convert to labels
        labels = self.feature_engineer.inverse_transform_labels(predictions)

        # Format results
        results = []
        for i, text in enumerate(texts):
            results.append({
                'text': text[:100] + '...' if len(text) > 100 else text,
                'sentiment': labels[i],
                'confidence': float(max(probabilities[i])),
                'probabilities': {
                    cls: float(prob)
                    for cls, prob in zip(self.feature_engineer.label_encoder.classes_, probabilities[i])
                }
            })

        return results

    def predict_batch(self, df, text_col='review_text'):
        """Batch prediction for DataFrame"""
        results = self.predict(df[text_col].tolist())

        df_results = df.copy()
        df_results['predicted_sentiment'] = [r['sentiment'] for r in results]
        df_results['confidence'] = [r['confidence'] for r in results]

        return df_results

    def save(self, path='sentiment_pipeline.pkl'):
        """Save pipeline to file"""
        with open(path, 'wb') as f:
            pickle.dump({
                'preprocessor': self.preprocessor,
                'feature_engineer': self.feature_engineer,
                'model': self.model,
                'metadata': self.metadata
            }, f)
        print(f"✅ Pipeline saved to {path}")

    @classmethod
    def load(cls, path='sentiment_pipeline.pkl'):
        """Load pipeline from file"""
        with open(path, 'rb') as f:
            data = pickle.load(f)

        pipeline = cls()
        pipeline.preprocessor = data['preprocessor']
        pipeline.feature_engineer = data['feature_engineer']
        pipeline.model = data['model']
        pipeline.metadata = data['metadata']

        print(f"✅ Pipeline loaded from {path}")
        print(f"  Trained at: {pipeline.metadata['trained_at']}")
        print(f"  Accuracy: {pipeline.metadata['accuracy']:.4f}")

        return pipeline


# Train and use pipeline
pipeline = SentimentPipeline()
pipeline.train(df)

# Test predictions
test_reviews = [
    "This is absolutely amazing! Best product ever!",
    "Terrible quality, waste of money. Never buying again.",
    "It's okay, nothing special but does the job."
]

results = pipeline.predict(test_reviews)
for r in results:
    print(f"\n📝 {r['text']}")
    print(f"  Sentiment: {r['sentiment']} (confidence: {r['confidence']:.2f})")
    print(f"  Probabilities: {r['probabilities']}")
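
The save() and load() methods are defined above but not exercised; a short usage sketch using the default file name from save():

Python
# Persist the trained pipeline, then reload it as a serving process would
pipeline.save('sentiment_pipeline.pkl')
serving_pipeline = SentimentPipeline.load('sentiment_pipeline.pkl')

# Batch-score part of the original DataFrame with the reloaded pipeline
scored = serving_pipeline.predict_batch(df.head(100))
print(scored[['review_text', 'predicted_sentiment', 'confidence']].head())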

6.2 Spark Integration (Large Scale)

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, FloatType, StructType, StructField

# Initialize Spark
spark = SparkSession.builder \
    .appName("SentimentAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Broadcast the pipeline for distributed prediction
# (In production, use proper model serving)
pipeline_broadcast = spark.sparkContext.broadcast(pipeline)

# Define UDF for sentiment prediction
@udf(returnType=StructType([
    StructField("sentiment", StringType(), True),
    StructField("confidence", FloatType(), True)
]))
def predict_sentiment_udf(text):
    if not text:
        return ("unknown", 0.0)

    p = pipeline_broadcast.value
    result = p.predict([text])[0]
    return (result['sentiment'], result['confidence'])

# Load data to Spark DataFrame
spark_df = spark.createDataFrame(df[['review_id', 'review_text', 'sentiment']])

# Apply predictions
predictions_df = spark_df.withColumn(
    "prediction",
    predict_sentiment_udf(col("review_text"))
).select(
    "review_id",
    "review_text",
    "sentiment",
    col("prediction.sentiment").alias("predicted"),
    col("prediction.confidence").alias("confidence")
)

# Show results
predictions_df.show(5, truncate=50)

# Calculate accuracy
accuracy = predictions_df.filter(
    col("sentiment") == col("predicted")
).count() / predictions_df.count()

print(f"\nSpark Prediction Accuracy: {accuracy:.4f}")

# Save results
predictions_df.write.mode("overwrite").parquet("sentiment_predictions.parquet")
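
The row-at-a-time UDF above calls pipeline.predict() once per review, which gets slow at the 100K+ scale in the requirements. A vectorized pandas UDF processes whole batches of rows at once; a minimal sketch (PySpark 3.x type-hint style, returning only the label) assuming the same pipeline_broadcast:

Python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

@pandas_udf(StringType())
def predict_label_udf(texts: pd.Series) -> pd.Series:
    # Runs once per batch of rows instead of once per row
    p = pipeline_broadcast.value
    results = p.predict(texts.fillna("").tolist())
    return pd.Series([r['sentiment'] for r in results])

spark_df.withColumn("predicted", predict_label_udf(col("review_text"))).show(5, truncate=50)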

7. Monitoring & Analytics

7.1 Create Dashboard Metrics

Python
def generate_analytics_report(df_predictions):
    """Generate analytics report from predictions"""

    report = {}

    # Overall metrics
    report['total_reviews'] = len(df_predictions)
    report['sentiment_distribution'] = df_predictions['predicted_sentiment'].value_counts().to_dict()
    report['avg_confidence'] = df_predictions['confidence'].mean()

    # Time-based analysis
    df_predictions['date'] = pd.to_datetime(df_predictions['review_date']).dt.date

    daily_sentiment = df_predictions.groupby(['date', 'predicted_sentiment']).size().unstack(fill_value=0)
    report['daily_sentiment'] = daily_sentiment.to_dict()

    # Product analysis
    product_sentiment = df_predictions.groupby(['product_id', 'predicted_sentiment']).size().unstack(fill_value=0)
    product_sentiment['total'] = product_sentiment.sum(axis=1)
    product_sentiment['positive_rate'] = product_sentiment.get('positive', 0) / product_sentiment['total']

    # Top products with most negative reviews
    report['products_with_issues'] = product_sentiment.nlargest(10, 'negative')[['negative', 'total', 'positive_rate']].to_dict()

    return report

# Generate report (simulated predictions)
df_with_predictions = df.copy()
df_with_predictions['predicted_sentiment'] = df['sentiment']  # Use actual labels for demo
df_with_predictions['confidence'] = np.random.uniform(0.7, 0.99, len(df))

report = generate_analytics_report(df_with_predictions)

print("📊 Analytics Report")
print("=" * 50)
print(f"Total Reviews: {report['total_reviews']:,}")
print(f"Average Confidence: {report['avg_confidence']:.2%}")
print(f"\nSentiment Distribution:")
for sentiment, count in report['sentiment_distribution'].items():
    print(f"  {sentiment}: {count:,} ({count/report['total_reviews']:.1%})")
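
The daily counts computed in the report translate directly into a dashboard chart; a minimal sketch on the same df_with_predictions:

Python
# Stacked area chart of daily review volume by predicted sentiment
daily = (
    df_with_predictions
    .groupby([pd.to_datetime(df_with_predictions['review_date']).dt.date, 'predicted_sentiment'])
    .size()
    .unstack(fill_value=0)
)

daily.plot.area(figsize=(12, 5), alpha=0.7)
plt.title('Daily Sentiment Volume')
plt.xlabel('Date')
plt.ylabel('Review Count')
plt.tight_layout()
plt.show()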

8. Comprehensive Exercises

Project Challenge

Complete the following tasks to end up with a complete sentiment analysis system:

Task 1: Improve Model

Python
# TODO: Experiment with:
# 1. Different feature extraction (word embeddings, BERT embeddings)
# 2. Hyperparameter tuning with GridSearchCV
# 3. Ensemble methods
# 4. Handling imbalanced classes better

# YOUR CODE HERE

Task 2: Add Real-time Capability

Python
# TODO: Implement real-time prediction with:
# 1. A REST API endpoint (Flask/FastAPI)
# 2. WebSocket for streaming predictions
# 3. Caching for frequent queries

# YOUR CODE HERE

Task 3: Vietnamese Support

Python
# TODO: Extend the pipeline for Vietnamese:
# 1. Use underthesea or pyvi for tokenization
# 2. Create a Vietnamese stopwords list
# 3. Find/train Vietnamese word embeddings

# YOUR CODE HERE

💡 Hints

Task 1: Model Improvement

Python
from sklearn.model_selection import GridSearchCV

# Grid search for Logistic Regression
param_grid = {
    'C': [0.1, 1, 10],
    'penalty': ['l1', 'l2'],
    'solver': ['liblinear', 'saga']
}

grid_search = GridSearchCV(
    LogisticRegression(max_iter=1000, class_weight='balanced'),
    param_grid,
    cv=5,
    scoring='f1_macro'
)
grid_search.fit(X_train_tfidf, y_train_enc)
print(f"Best params: {grid_search.best_params_}")
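
For the ensemble idea, one option is soft voting over the probabilistic models already used above (LinearSVC is left out because it has no predict_proba); a minimal sketch:

Python
from sklearn.ensemble import VotingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score

# Soft voting averages the predicted class probabilities of each estimator
voting = VotingClassifier(
    estimators=[
        ('lr', LogisticRegression(max_iter=1000, class_weight='balanced')),
        ('nb', MultinomialNB(alpha=0.1)),
        ('rf', RandomForestClassifier(n_estimators=100, class_weight='balanced', n_jobs=-1)),
    ],
    voting='soft'
)
voting.fit(X_train_tfidf, y_train_enc)
print(f"Voting ensemble accuracy: {accuracy_score(y_test_enc, voting.predict(X_test_tfidf)):.4f}")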

Task 2: FastAPI Endpoint

Python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ReviewInput(BaseModel):
    text: str

@app.post("/predict")
def predict(review: ReviewInput):
    result = pipeline.predict([review.text])[0]
    return result
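
For the caching item in Task 2, a minimal in-process sketch with functools.lru_cache (a shared cache such as Redis would be the more production-appropriate choice); the /predict_cached route and cached_predict helper are hypothetical names for illustration:

Python
from functools import lru_cache

@lru_cache(maxsize=10_000)
def cached_predict(text: str):
    # Repeated identical texts hit the cache instead of the model
    result = pipeline.predict([text])[0]
    return result['sentiment'], result['confidence']

@app.post("/predict_cached")
def predict_cached(review: ReviewInput):
    sentiment, confidence = cached_predict(review.text)
    return {"sentiment": sentiment, "confidence": confidence}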

Task 3: Vietnamese Processing

Python
from underthesea import word_tokenize

def tokenize_vietnamese(text):
    return word_tokenize(text, format="text")
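
A slightly fuller sketch for the stopword step, with a placeholder Vietnamese stopword set (illustration only; replace it with a curated list):

Python
from underthesea import word_tokenize

# Placeholder stopword set for illustration; not a real curated list
VIETNAMESE_STOPWORDS = {'là', 'và', 'của', 'có', 'cho', 'rất', 'này'}

def preprocess_vietnamese(text):
    tokens = word_tokenize(text.lower())  # returns a list of (possibly multi-word) tokens
    return ' '.join(t for t in tokens if t not in VIETNAMESE_STOPWORDS)

print(preprocess_vietnamese("Sản phẩm này rất tốt, giao hàng nhanh!"))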

9. Project Summary

Key Learnings

Concept             | Implementation
Text Preprocessing  | Custom ReviewPreprocessor class
Feature Engineering | TF-IDF + meta features
Model Training      | Multiple models comparison
Production Pipeline | SentimentPipeline class
Scalability         | Spark integration
Monitoring          | Analytics dashboard

Best Practices Applied

  1. ✅ Modular code design
  2. ✅ Consistent preprocessing
  3. ✅ Multiple model evaluation
  4. ✅ Proper train/test split
  5. ✅ Model serialization
  6. ✅ Batch processing support
  7. ✅ Performance monitoring

Next Steps

  • Deploy API endpoint
  • Set up CI/CD pipeline
  • Add A/B testing
  • Implement feedback loop
  • Scale with Kubernetes

🎉 Congratulations!

You have completed the Data Wrangling Course!

What you have learned:

  • Advanced Pandas & Data Quality
  • ETL Pipeline Design
  • Apache Spark & PySpark
  • Real-time Streaming with Kafka
  • NLP & Text Processing
  • Production ML Pipelines

Next course: Advanced Data Analysis