Cloud coverage prediction is critical in weather forecasting and a variety of applications such as solar energy generation, aviation, and climate monitoring. Accurate forecasts help decision-makers and sectors plan for and adapt to changing weather conditions. The advancement of artificial intelligence and computer vision techniques in recent years has created new opportunities for enhancing cloud coverage forecasts.
One promising approach is the use of SkyCam images.
- In the face of rapidly changing global climate patterns, there is an urgent need for innovative tools and technologies to better understand and predict weather-related phenomena.
- One crucial aspect of climate analysis is the assessment of cloud coverage, which plays a pivotal role in influencing weather conditions and climate trends.
- Experts may not always be available to monitor climatic shifts. Therefore, developing an automated weather monitoring system is crucial for various applications, including agriculture and disaster management.
The purpose of this research is to estimate the opaque Cloud Coverage from a Skycam Image using AI/ML methodologies.
Table of Contents
- Cloud Coverage Prediction using SkyCam Images
- Implementation of Cloud Coverage Prediction using SkyCam Images
- Cloud Coverage Prediction Models:
- Part I. Model Building & Training Pipeline
- A. CLIP Model Finetuning
- B. CatBoost Regressor Model Building
- Part II. UI Inference Codes for Deployed Model
- Results:
Cloud Coverage Prediction using SkyCam Images
The integration of Computer Vision and Machine Learning for regression and classification use cases has been one of the major trending research areas. The purpose of this research is to estimate cloud coverage from SkyCam images using Computer Vision techniques. By training predictive algorithms, we aim to develop a system that can deliver real-time or short-term forecasts of cloud cover percentages. This predictive capability has the potential to improve existing weather forecasting models and decision-making in industries such as energy production and transportation.
- To address this imperative, a novel approach leveraging Computer Vision and Machine Learning techniques is used to develop cloud coverage calculators from skycam images.
- Cloud coverage is a key parameter in weather forecasting, and automating its assessment can significantly improve weather predictions.
- Furthermore, it can benefit industries reliant on weather conditions, such as renewable energy generation and transportation.
- In the energy domain in particular, a robust cloud coverage calculator can accurately estimate cloud coverage from skycam images. The model analyzes the cloud formations in the provided images and returns a percentage indicating the extent of cloud coverage.
Moreover, integrating this cloud coverage model with skycam can serve as an early warning system for impending storms, heavy rains, and climatic shifts, helping to take preventive measures and ensure public safety.
Before delving into the core model development, it’s essential to acquire the domain knowledge required to build this project!
Domain Knowledge
- Skycam Images: SkyCam, short for Sky Camera, is a specialized camera system often installed in various locations, including airports, research stations, and meteorological facilities. These cameras continuously capture images of the sky, providing valuable data for weather monitoring.
Implementation of Cloud Coverage Prediction using SkyCam Images
System Architecture for the project:
- There are two pipelines, as shown below: one for training both models (CLIP & CatBoost) and one for inference.
- A detailed explanation of the system architecture is provided in the implementation sections below.
Prerequisites:
- Programming Language: Python
- AI/ML Platform for Model Training: Jupyter Notebook
- Web App: Gradio
- Libraries/Requirements: OpenCV, timm, PyTorch, Transformers, CLIP, CatBoost
Dataset:
- The data contains 133,653 skycam images along with their opaque cloud coverage in percentage.
- While scraping this data, OCR was used to extract the cloud coverage percentage.
- Dataset Link: Skycam Images
Cloud Coverage Prediction Models:
1. CLIP Model & its working:
- There are various methods to extract features from images, but here we have utilized the Contrastive Language-Image Pretraining (CLIP) model.
- CLIP is typically used for image-text similarity problems.
- The CLIP model comprises three main components: Image Encoder, Text Encoder, and Projection Head.
- Image Encoder: Utilizes ResNet50 to extract features from images in vector format.
- Text Encoder: Utilizes DistilBERT to extract features from text in vector format.
- Projection Head: Transforms both image and text vectors into the same size/shape.
- A dot product of the projected image and text vectors produces a similarity matrix, which acts as the knowledge base, i.e., the image-text mapping knowledge.
- During inference, when an image is passed, its embedding is compared against the candidate text embeddings, and the resulting similarity scores determine the output, as sketched below.
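To make the dot-product scoring concrete, here is a minimal, hypothetical sketch of how projected embeddings are compared; the random tensors below are stand-ins for the encoder outputs, not the actual model.
Python3
import torch
import torch.nn.functional as F

# Stand-ins for projected embeddings: 4 images and 4 candidate captions, each of dimension 256
image_embeddings = F.normalize(torch.randn(4, 256), dim=-1)
text_embeddings = F.normalize(torch.randn(4, 256), dim=-1)

# Dot product gives an image-text similarity matrix; row i scores image i against every caption
similarity = image_embeddings @ text_embeddings.T

# For each image, the best-matching caption is the argmax of its row
best_caption_idx = similarity.argmax(dim=-1)
print(similarity.shape, best_caption_idx)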
2. CatBoost Model:
- We use the CatBoost Regressor to calculate cloud coverage.
- CatBoost is chosen for its robustness to outliers, high efficiency, strong out-of-the-box performance, and faster training compared to other bagging and generic models, especially when working with image feature vectors.
Technical Workflow Steps:
- Initially, we have data: images mapped with their cloud coverage values.
- Generate text from the cloud coverage value, e.g., if cloud coverage = 97, then Text = “There is High Cloud Coverage. Opaque Cloud Coverage is 97%.” (a small sketch of this mapping follows the list).
- Fine-tune the Contrastive Language Image Pretrained (CLIP) model on the skycam images + corresponding text.
- Extraction of features from skycam images is done using the Fine-tuned CLIP model’s Image Encoder.
- Extracted features are treated as ‘x’ (independent variables with a size of 2048) and cloud coverage values from Step 1 are treated as ‘y’ (dependent features) for the Catboost model.
- Training, validation & testing records: 70,168 | 30,072 | 33,414.
- The best hyperparameters for the Catboost model are determined, and a Regressor is implemented to predict cloud coverage in percentages.
- Finally, a Gradio app is developed and deployed on Hugging Face Spaces, where users input a skycam image and receive the opaque cloud coverage percentage as output.
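The exact caption template used to build the dataset is not shown in this article, so the helper below is a hypothetical sketch of Step 2 (mapping a numeric coverage value to a caption); adjust the wording and bands to match your own CSV.
Python3
def coverage_to_caption(cloud_cover: int) -> str:
    # Hypothetical banding: Low (0-33), Moderate (33-66), High (66-100)
    if cloud_cover <= 33:
        level = "Low"
    elif cloud_cover <= 66:
        level = "Moderate"
    else:
        level = "High"
    return f"There is {level} Cloud Coverage. Opaque Cloud Coverage is {cloud_cover}%."

print(coverage_to_caption(97))  # There is High Cloud Coverage. Opaque Cloud Coverage is 97%.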
Python Libraries
Let's install the following dependencies for the project:
- Keep all the below dependencies in your requirements.txt.
- Run command: pip install -r requirements.txt in your terminal.
- Once dependencies are installed, we are set to go!
numpy
pandas
matplotlib
albumentations
opencv-python
torch
timm
tqdm
scikit-learn
catboost
transformers
gradio
git+https://github.com/openai/CLIP.git
Part I. Model Building & Training Pipeline
Step 1: Load the Necessary Libraries
Let’s first import all the necessary libraries required for both the CLIP and CatBoost models.
Python3
# Importing Libraries
import os, cv2, gc, itertools, pickle
from PIL import Image
import numpy as np
import pandas as pd
import albumentations as A
import matplotlib.pyplot as plt
import torch, timm, clip
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm.autonotebook import tqdm
from transformers import DistilBertModel, DistilBertConfig, DistilBertTokenizer
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from catboost import CatBoostRegressor
Step 2: Load the dataset
Python3
# Importing skycam .csv data: the label i.e. text feature is generated from the numeric cloud coverage
folder = 'SkyCam'
df = pd.read_csv(folder + "/cloud_data_cleaned1.csv")
df = df[['image_name', 'label', 'opaque_clouds']]
df.columns = ['image', 'caption', 'cloudcover']
print(df.head())
Output:
image caption \
0 20160101075000.raw.jpg Image has No Cloud Coverage. Image has 7% of o...
1 20160101080000.raw.jpg Image has No Cloud Coverage. Image has 7% of o...
2 20160101081000.raw.jpg Image has No Cloud Coverage. Image has 7% of o...
3 20160101082000.raw.jpg Image has No Cloud Coverage. Image has 7% of o...
4 20160101083000.raw.jpg Image has No Cloud Coverage. Image has 7% of o...
cloudcover
0 7
1 7
2 7
3 7
4 7
- The data consists of the skycam image name and a caption that was generated while building this CSV.
- The caption is generated from the third feature shown above, i.e., cloudcover, which is a percentage.
- Our aim is to calculate cloud cover given a skycam image; a quick look at the target distribution (sketched after this list) is a useful sanity check.
- This is a classical Machine Learning regression problem with an integration of Computer Vision.
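As a quick, optional check (not part of the original walkthrough), we can inspect the range and spread of the cloudcover target before modelling:
Python3
# Summary statistics of the opaque cloud coverage target (0-100%)
print(df['cloudcover'].describe())

# Distribution of cloud coverage values across the dataset
df['cloudcover'].plot(kind='hist', bins=20, title='Opaque Cloud Coverage (%)')
plt.xlabel('Cloud coverage (%)')
plt.show()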
Let’s check an image
Python3
img_folder = os.path.join(folder, 'Extracted Images', 'Extracted Images')
# img_filename = os.listdir(img_folder)[0]
img_path = os.path.join(img_folder, df['image'].iloc[777])
img = Image.open(img_path)
plt.imshow(img)
plt.show()
Output: A sample skycam image from the dataset is displayed.
A. CLIP Model Finetuning
Step 3: Setting up Configurations:
The below code sets up essential hyperparameters and configurations for the CLIP model. It includes settings for image and text data processing, batch size, learning rates, and training epochs. It also specifies the use of a GPU if available and the choice of model architectures for image and text encoding. Additionally, it defines parameters for the projection head used by both the image and text encoders, including the projection dimensionality and dropout rate. These configurations are crucial for successful training and execution of the model.
Python3
# ----- Setting up Hyper Parameters in Configurations ----- #
class CFG:
    debug = False
    image_path = img_folder  # Specify your image directory path
    captions_path = "."
    batch_size = 128
    num_workers = 4
    head_lr = 1e-3
    image_encoder_lr = 1e-4
    text_encoder_lr = 1e-5
    weight_decay = 1e-3
    patience = 3
    factor = 0.8
    epochs = 15
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_name = 'resnet50'
    image_embedding = 2048
    text_encoder_model = "distilbert-base-uncased"
    text_embedding = 768
    text_tokenizer = "distilbert-base-uncased"
    max_length = 200
    pretrained = True   # for both image encoder and text encoder
    trainable = True    # for both image encoder and text encoder
    temperature = 1.0
    size = 224
    # For projection head: used for both image and text encoders
    num_projection_layers = 1
    projection_dim = 256
    dropout = 0.1
Step 4: Setting up Utils:
The below code defines utility functions for monitoring and managing metrics during training. It includes an AvgMeter class to calculate averages and a function get_lr to extract the learning rate from an optimizer.
Python3
# ----- Setting up Utils ----- #
class AvgMeter:
    def __init__(self, name="Metric"):
        self.name = name
        self.reset()

    def reset(self):
        self.avg, self.sum, self.count = [0] * 3

    def update(self, val, count=1):
        self.count += count
        self.sum += val * count
        self.avg = self.sum / self.count

    def __repr__(self):
        text = f"{self.name}: {self.avg:.4f}"
        return text


def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group["lr"]
Step 5: Building Custom Torch Dataset:
The below code defines a custom dataset class that transforms the input images and text into the specific format the CLIP model expects. It takes image filenames, captions, a tokenizer, and transforms as inputs, allowing for efficient data loading and processing. Additionally, it provides image transformation functions based on the specified mode (train or not) through get_transforms.
Python3
# ----- Building Custom Dataset ----- #
class CLIPDataset(torch.utils.data.Dataset):
    def __init__(self, image_filenames, captions, tokenizer, transforms):
        """
        image_filenames and captions must have the same length; so, if there are
        multiple captions for each image, the image_filenames must have repetitive file names.
        """
        self.image_filenames = image_filenames
        self.captions = list(captions)
        self.encoded_captions = tokenizer(
            list(captions), padding=True, truncation=True, max_length=CFG.max_length)
        self.transforms = transforms

    def __getitem__(self, idx):
        item = {
            key: torch.tensor(values[idx])
            for key, values in self.encoded_captions.items()
        }
        image = cv2.imread(f"{CFG.image_path}/{self.image_filenames[idx]}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transforms(image=image)['image']
        item['image'] = torch.tensor(image).permute(2, 0, 1).float()
        item['caption'] = self.captions[idx]
        return item

    def __len__(self):
        return len(self.captions)


def get_transforms(mode="train"):
    if mode == "train":
        return A.Compose(
            [
                A.Resize(CFG.size, CFG.size, always_apply=True),
                A.Normalize(max_pixel_value=255.0, always_apply=True),
            ]
        )
    else:
        return A.Compose(
            [
                A.Resize(CFG.size, CFG.size, always_apply=True),
                A.Normalize(max_pixel_value=255.0, always_apply=True),
            ]
        )
Step 6: Image Encoder Class:
The CLIP model uses the Image Encoder class below to pass images through ResNet50, its image encoder, and extract image features. The class encodes images into fixed-size vectors using a pre-trained backbone; the model's architecture, pre-training status, and trainability are configurable.
Python3
# ----- Image Encoder ----- #
class ImageEncoder(nn.Module):
    # Encode images to a fixed size vector
    def __init__(self, model_name=CFG.model_name,
                 pretrained=CFG.pretrained, trainable=CFG.trainable):
        super().__init__()
        self.model = timm.create_model(
            model_name, pretrained, num_classes=0, global_pool="avg")
        for p in self.model.parameters():
            p.requires_grad = trainable

    def forward(self, x):
        return self.model(x)
Step 7: Text Encoder Class:
The CLIP model has a text encoder, DistilBERT, which extracts sentence embeddings from the text input.
Python3
# ----- Text Encoder ----- #
class TextEncoder(nn.Module):
    def __init__(self, model_name=CFG.text_encoder_model,
                 pretrained=CFG.pretrained, trainable=CFG.trainable):
        super().__init__()
        if pretrained:
            self.model = DistilBertModel.from_pretrained(model_name)
        else:
            self.model = DistilBertModel(config=DistilBertConfig())

        for p in self.model.parameters():
            p.requires_grad = trainable

        # We are using the CLS token hidden representation as the sentence's embedding
        self.target_token_idx = 0

    def forward(self, input_ids, attention_mask):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = output.last_hidden_state
        return last_hidden_state[:, self.target_token_idx, :]
Step 8: Projection Head Class:
The code below defines a projection head module for dimensionality reduction of the input image and text embeddings. It includes linear projections, a GELU activation, dropout, and layer normalization. The module transforms embeddings into a lower-dimensional space while preserving important features, which improves training efficiency and reduces training time.
Python3
# ----- Projection Head ----- #
class ProjectionHead(nn.Module):
    def __init__(self, embedding_dim,
                 projection_dim=CFG.projection_dim, dropout=CFG.dropout):
        super().__init__()
        self.projection = nn.Linear(embedding_dim, projection_dim)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(projection_dim, projection_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(projection_dim)

    def forward(self, x):
        projected = self.projection(x)
        x = self.gelu(projected)
        x = self.fc(x)
        x = self.dropout(x)
        x = x + projected
        x = self.layer_norm(x)
        return x
Step 9: Defining Clip Model:
Now we define our custom CLIP model class, whose constructor initializes the image encoder, text encoder, and projection heads. The model computes embeddings for images and texts and calculates a loss that encourages matching images and texts to have high similarity scores. Cross-entropy loss is used for training, and the model aims to align image and text embeddings in a joint embedding space for applications like image-text retrieval and understanding.
Python3
# ----- CLIP Model Define ----- #
class CLIPModel(nn.Module):
    def __init__(self, temperature=CFG.temperature,
                 image_embedding=CFG.image_embedding,
                 text_embedding=CFG.text_embedding):
        super().__init__()
        self.image_encoder = ImageEncoder()
        self.text_encoder = TextEncoder()
        self.image_projection = ProjectionHead(embedding_dim=image_embedding)
        self.text_projection = ProjectionHead(embedding_dim=text_embedding)
        self.temperature = temperature

    def forward(self, batch):
        # Getting Image and Text Features
        image_features = self.image_encoder(batch["image"])
        text_features = self.text_encoder(
            input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])

        # Getting Image and Text Embeddings (with same dimension)
        image_embeddings = self.image_projection(image_features)
        text_embeddings = self.text_projection(text_features)

        # Calculating the Loss
        logits = (text_embeddings @ image_embeddings.T) / self.temperature
        images_similarity = image_embeddings @ image_embeddings.T
        texts_similarity = text_embeddings @ text_embeddings.T
        targets = F.softmax(
            (images_similarity + texts_similarity) / 2 * self.temperature, dim=-1)
        texts_loss = cross_entropy(logits, targets, reduction='none')
        images_loss = cross_entropy(logits.T, targets.T, reduction='none')
        loss = (images_loss + texts_loss) / 2.0  # shape: (batch_size)
        return loss.mean()


def cross_entropy(preds, targets, reduction='none'):
    log_softmax = nn.LogSoftmax(dim=-1)
    loss = (-targets * log_softmax(preds)).sum(1)
    if reduction == "none":
        return loss
    elif reduction == "mean":
        return loss.mean()
Step 10: Defining Training Functions for Clip Model:
The below code contains the essential methods to train the CLIP model. It includes functions for splitting the dataset into training and validation sets, building data loaders with transformations, and running training and validation epochs with the configured batch size and other hyperparameters. These methods are crucial for training and evaluating the CLIP model effectively.
Python3
# ----- Training Methods ----- #
def make_train_valid_dfs(df):
    # First 130,000 records for training, the remaining records for validation
    train_dataframe = df.iloc[:130000, :]
    valid_dataframe = df.iloc[130000:, :]
    return train_dataframe.reset_index(drop=True), valid_dataframe.reset_index(drop=True)


def build_loaders(dataframe, tokenizer, mode):
    transforms = get_transforms(mode=mode)
    dataset = CLIPDataset(
        dataframe["image"].values,
        dataframe["caption"].values,
        tokenizer=tokenizer,
        transforms=transforms,
    )
    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=CFG.batch_size,
        num_workers=CFG.num_workers,
        shuffle=True if mode == "train" else False,
    )
    return dataloader


def train_epoch(model, train_loader, optimizer, lr_scheduler, step):
    loss_meter = AvgMeter()
    tqdm_object = tqdm(train_loader, total=len(train_loader))
    for batch in tqdm_object:
        batch = {k: v.to(CFG.device) for k, v in batch.items() if k != "caption"}
        loss = model(batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if step == "batch":
            lr_scheduler.step()
        count = batch["image"].size(0)
        loss_meter.update(loss.item(), count)
        tqdm_object.set_postfix(train_loss=loss_meter.avg, lr=get_lr(optimizer))
    return loss_meter


def valid_epoch(model, valid_loader):
    loss_meter = AvgMeter()
    tqdm_object = tqdm(valid_loader, total=len(valid_loader))
    for batch in tqdm_object:
        batch = {k: v.to(CFG.device) for k, v in batch.items() if k != "caption"}
        loss = model(batch)
        count = batch["image"].size(0)
        loss_meter.update(loss.item(), count)
        tqdm_object.set_postfix(valid_loss=loss_meter.avg)
    return loss_meter
Step 11: Train Validation Split:
We split the input data into training and validation sets: 130,000 records for training and 3,654 for validation. We do not use a test set here because the CLIP model is only used to extract feature embeddings from the skycam images.
Python3
# ----- Train-Valid Split ----- #
train_df, valid_df = make_train_valid_dfs(df)
print(len(train_df), len(valid_df))

tokenizer = DistilBertTokenizer.from_pretrained(CFG.text_tokenizer)
train_loader = build_loaders(train_df, tokenizer, mode="train")
valid_loader = build_loaders(valid_df, tokenizer, mode="valid")
Output:
130000 3654
Step 12: CLIP Model Finetuning:
Now, we finetune the CLIP model on our custom data. The code below loads a pre-trained CLIP model and sets up the training process. It defines the model's parameter groups and optimizer, with separate learning rates for different components, and a scheduler to adjust the learning rate. The training loop then runs for the specified number of epochs, saving the best-performing model based on validation loss.
Python3
# ----- Loading Pretrained Model ----- #
model = CLIPModel().to(CFG.device)
params = [
    {"params": model.image_encoder.parameters(), "lr": CFG.image_encoder_lr},
    {"params": model.text_encoder.parameters(), "lr": CFG.text_encoder_lr},
    {"params": itertools.chain(
        model.image_projection.parameters(), model.text_projection.parameters()),
     "lr": CFG.head_lr, "weight_decay": CFG.weight_decay}
]
optimizer = torch.optim.AdamW(params, weight_decay=0.)
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", patience=CFG.patience, factor=CFG.factor
)
Model Training
Python3
# ----- Model Training ----- #
step = "epoch"
best_loss = float('inf')
for epoch in range(CFG.epochs):
    print(f"Epoch: {epoch + 1}")
    model.train()
    train_loss = train_epoch(model, train_loader, optimizer, lr_scheduler, step)
    model.eval()
    with torch.no_grad():
        valid_loss = valid_epoch(model, valid_loader)

    if valid_loss.avg < best_loss:
        best_loss = valid_loss.avg
        torch.save(model.state_dict(), "CLIP_model.pt")
        print("Saved Best Model!")

    lr_scheduler.step(valid_loss.avg)
Step 13: Save the CLIP Model & its Configurations
Now, we save the CLIP model and its configuration to pickle files. A .pt checkpoint was already saved in Step 12, but as an extra safeguard we also save the model as a .pkl file (a small reload sketch follows the code below).
Python3
with open('clip_mdl.pkl', 'wb') as f:
    pickle.dump(model, f)
with open('clip_cfg.pkl', 'wb') as f:
    pickle.dump(CFG, f)
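If you need to restore the fine-tuned model and configuration in a later session, here is a minimal sketch, assuming clip_mdl.pkl and clip_cfg.pkl are in the working directory and the CLIPModel class definition is importable (pickled objects only store references to their classes).
Python3
import pickle

# Reload the pickled model and configuration; the CLIPModel class must be defined/imported first
with open('clip_mdl.pkl', 'rb') as f:
    reloaded_model = pickle.load(f)
with open('clip_cfg.pkl', 'rb') as f:
    reloaded_cfg = pickle.load(f)

reloaded_model.eval()  # switch to inference mode before extracting features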
B. CatBoost Regressor Model Building
Step 14: Train-Test-Valid Split for Regression:
In the below code we perform a train-validation-test split: 70,168 records for training, 30,072 for validation, and 33,414 for testing. A random state is set to ensure reproducibility.
Python3
x = df['image']
y = df['cloudcover']
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.25, random_state=48)
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train, test_size=0.30, random_state=48)
print((x_train.shape, x_val.shape, x_test.shape))
Output:
((70168,), (30072,), (33414,))
Step 15: Loading the Finetuned CLIP Model:
In the below code we load the fine-tuned CLIP model from the .pt checkpoint. Since the fine-tuned model is ready for image feature extraction, we extract feature embeddings for all the skycam images with its image encoder; these embeddings serve as the input features for our CatBoost model.
Python3
model = CLIPModel().to(CFG.device)
model.load_state_dict(torch.load("CLIP_model.pt", map_location=CFG.device))
model.eval()
Output:
CLIPModel(
(image_encoder): ImageEncoder(
(model): ResNet(
(conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
(layer1): Sequential(
(0): Bottleneck(
(conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
)
(layer2): Sequential(
(0): Bottleneck(
(conv1): Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(256, 512, kernel_size=(1, 1), stride=(2, 2), bias=False)
(1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(3): Bottleneck(
(conv1): Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
)
(layer3): Sequential(
(0): Bottleneck(
(conv1): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(512, 1024, kernel_size=(1, 1), stride=(2, 2), bias=False)
(1): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(3): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(4): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(5): Bottleneck(
(conv1): Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
)
(layer4): Sequential(
(0): Bottleneck(
(conv1): Conv2d(1024, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
(downsample): Sequential(
(0): Conv2d(1024, 2048, kernel_size=(1, 1), stride=(2, 2), bias=False)
(1): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
)
(1): Bottleneck(
(conv1): Conv2d(2048, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
(2): Bottleneck(
(conv1): Conv2d(2048, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act1): ReLU(inplace=True)
(conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
(bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(drop_block): Identity()
(act2): ReLU(inplace=True)
(aa): Identity()
(conv3): Conv2d(512, 2048, kernel_size=(1, 1), stride=(1, 1), bias=False)
(bn3): BatchNorm2d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(act3): ReLU(inplace=True)
)
)
(global_pool): SelectAdaptivePool2d (pool_type=avg, flatten=Flatten(start_dim=1, end_dim=-1))
(fc): Identity()
)
)
(text_encoder): TextEncoder(
(model): DistilBertModel(
(embeddings): Embeddings(
(word_embeddings): Embedding(30522, 768, padding_idx=0)
(position_embeddings): Embedding(512, 768)
(LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
(dropout): Dropout(p=0.1, inplace=False)
)
(transformer): Transformer(
(layer): ModuleList(
(0-5): 6 x TransformerBlock(
(attention): MultiHeadSelfAttention(
(dropout): Dropout(p=0.1, inplace=False)
(q_lin): Linear(in_features=768, out_features=768, bias=True)
(k_lin): Linear(in_features=768, out_features=768, bias=True)
(v_lin): Linear(in_features=768, out_features=768, bias=True)
(out_lin): Linear(in_features=768, out_features=768, bias=True)
)
(sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
(ffn): FFN(
(dropout): Dropout(p=0.1, inplace=False)
(lin1): Linear(in_features=768, out_features=3072, bias=True)
(lin2): Linear(in_features=3072, out_features=768, bias=True)
(activation): GELUActivation()
)
(output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
)
)
)
)
)
(image_projection): ProjectionHead(
(projection): Linear(in_features=2048, out_features=256, bias=True)
(gelu): GELU(approximate='none')
(fc): Linear(in_features=256, out_features=256, bias=True)
(dropout): Dropout(p=0.1, inplace=False)
(layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
)
(text_projection): ProjectionHead(
(projection): Linear(in_features=768, out_features=256, bias=True)
(gelu): GELU(approximate='none')
(fc): Linear(in_features=256, out_features=256, bias=True)
(dropout): Dropout(p=0.1, inplace=False)
(layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
)
)
Step 16: Dataset Preparation for Regression Model:
In the below code, we create a custom Torch dataset for our images and labels. All images and labels are loaded through this dataset, which is then passed to the fine-tuned model for feature extraction.
Python3
# ----- Custom Dataset Loader ----- #
class SkyImage(Dataset):
    def __init__(self, img_dir, labels):
        self.img_dir = img_dir
        self.img_labels = labels

    def __len__(self):
        return len(self.img_dir)

    def __getitem__(self, idx):
        img_path = os.path.join(img_folder, self.img_dir[idx])
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (244, 244))
        image = np.moveaxis(image, -1, 0)
        label = self.img_labels[idx]
        return image, label


# ----- Dataset ----- #
train_images = SkyImage(x_train.to_list(), y_train.to_list())
valid_images = SkyImage(x_val.to_list(), y_val.to_list())
test_images = SkyImage(x_test.to_list(), y_test.to_list())
Step 17: Extracting image features from CLIP Model:
Check if CUDA (GPU) is available and set the device accordingly
Python3
# Check if CUDA (GPU) is available and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device
Output:
device(type='cpu')
Building Features
The get_features method below takes the training, validation, and test datasets, extracts image feature vectors from the skycam images using the fine-tuned image encoder, and returns the extracted feature vectors (a quick shape check follows the code below).
Python3
# ----- Building Features ----- #
def get_features(dataset):
    all_features, all_labels, all_embeddings = [], [], []
    with torch.no_grad():
        for images, labels in tqdm(DataLoader(dataset, batch_size=64)):
            image_input = torch.tensor(np.stack(images)).to(device).float()
            image_features = model.image_encoder(image_input)
            image_embeddings = model.image_projection(image_features)
            all_features.append(image_features)
            all_labels.append(labels)
            all_embeddings.append(image_embeddings)
    return torch.cat(all_features), torch.cat(all_labels).to(device), torch.cat(all_embeddings).to(device)


# ----- Get Features ----- #
train_features, train_labels, train_embeddings = get_features(train_images)
valid_features, valid_labels, valid_embeddings = get_features(valid_images)
test_features, test_labels, test_embeddings = get_features(test_images)
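As an optional sanity check (not in the original walkthrough), the extracted feature matrices should have 2048 columns, matching the ResNet50 embedding size that CatBoost consumes as input.
Python3
# Each row is one skycam image; each feature vector has 2048 dimensions
print(train_features.shape, valid_features.shape, test_features.shape)
# Expected: torch.Size([70168, 2048]) torch.Size([30072, 2048]) torch.Size([33414, 2048])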
Step 18: Evaluation Metrics Method:
We build our custom metrics method to evaluate our catboost model. We will be using Mean Absolute Error (MAE), Root Mean Square Error (RMSE) & R-Squared (R2) as our model evaluators.
Python3
def evaluate(name, x, y, n, p):
    # x: actual values, y: predicted values, n: no. of observations, p: no. of features
    print("---------------------------------------------------")
    print("{} MAE: {}".format(name, mean_absolute_error(x, y)))
    print("{} RMSE: {}".format(name, mean_squared_error(x, y, squared=False)))
    print("{} MSE: {}".format(name, mean_squared_error(x, y)))
    r2 = r2_score(x, y)
    print("{} R2: {}".format(name, r2))
    print("---------------------------------------------------")
Step 19: CatBoost Model Training:
We train the CatBoost model with the chosen hyperparameters, using RMSE as the evaluation metric on the validation data. Both the training and validation sets are passed to the fit method; since the model is evaluated on validation data that is not present in the training set, we can guard against overfitting. The hyperparameters below worked best for this scenario; to find your own, you can use trial and error or cross-validation (a compact search sketch follows the training output below).
Python3
# ----- Model Training ----- #
CB_model = CatBoostRegressor(iterations=700, learning_rate=0.1, max_depth=8,
                             eval_metric='RMSE', random_seed=48)
CB_model.fit(train_features.cpu().numpy(), train_labels.cpu().numpy(),
             eval_set=(valid_features.cpu().numpy(), valid_labels.cpu().numpy()),
             use_best_model=True, plot=True, verbose=50)
Output:
0: learn: 28.1361841 test: 28.2423136 best: 28.2423136 (0) total: 2.13s remaining: 24m 49s
50: learn: 11.5614561 test: 11.9335237 best: 11.9335237 (50) total: 1m 3s remaining: 13m 21s
100: learn: 10.7263689 test: 11.4059249 best: 11.4059249 (100) total: 2m 1s remaining: 12m 1s
150: learn: 10.0566562 test: 11.0617557 best: 11.0617557 (150) total: 3m remaining: 10m 55s
200: learn: 9.5172739 test: 10.8473396 best: 10.8473396 (200) total: 3m 58s remaining: 9m 51s
250: learn: 9.0923719 test: 10.6886373 best: 10.6886373 (250) total: 4m 55s remaining: 8m 47s
300: learn: 8.7042622 test: 10.5734544 best: 10.5734544 (300) total: 5m 51s remaining: 7m 45s
350: learn: 8.3755575 test: 10.4773273 best: 10.4773273 (350) total: 6m 47s remaining: 6m 45s
400: learn: 8.0759744 test: 10.3938604 best: 10.3938604 (400) total: 7m 44s remaining: 5m 46s
450: learn: 7.7814581 test: 10.3233375 best: 10.3233375 (450) total: 8m 42s remaining: 4m 48s
500: learn: 7.5160766 test: 10.2628795 best: 10.2628795 (500) total: 9m 39s remaining: 3m 50s
550: learn: 7.2897423 test: 10.2027638 best: 10.2027638 (550) total: 10m 35s remaining: 2m 51s
600: learn: 7.0611325 test: 10.1574324 best: 10.1574324 (600) total: 11m 33s remaining: 1m 54s
650: learn: 6.8320990 test: 10.1136860 best: 10.1136860 (650) total: 12m 30s remaining: 56.5s
699: learn: 6.6529638 test: 10.0780409 best: 10.0780409 (699) total: 13m 25s remaining: 0us
bestTest = 10.07804086
bestIteration = 699
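For reference, here is a minimal, hypothetical grid search over a few candidate settings, scored by validation RMSE; the parameter values are illustrative and not the grid used for this article.
Python3
from itertools import product
from sklearn.metrics import mean_squared_error
from catboost import CatBoostRegressor

best_rmse, best_params = float('inf'), None
# Illustrative candidate values only
for depth, lr in product([6, 8, 10], [0.05, 0.1]):
    candidate = CatBoostRegressor(iterations=300, learning_rate=lr, max_depth=depth,
                                  eval_metric='RMSE', random_seed=48, verbose=0)
    candidate.fit(train_features.cpu().numpy(), train_labels.cpu().numpy(),
                  eval_set=(valid_features.cpu().numpy(), valid_labels.cpu().numpy()),
                  use_best_model=True)
    preds = candidate.predict(valid_features.cpu().numpy())
    rmse = mean_squared_error(valid_labels.cpu().numpy(), preds, squared=False)
    if rmse < best_rmse:
        best_rmse, best_params = rmse, {'max_depth': depth, 'learning_rate': lr}

print(best_params, best_rmse)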
Step 20: Model Prediction
Let's make predictions on the train, validation, and test sets.
Python3
# ----- Model Prediction ----- #
cbt_train_pred = CB_model.predict(train_features.cpu().numpy())
cbt_valid_pred = CB_model.predict(valid_features.cpu().numpy())
cbt_test_pred = CB_model.predict(test_features.cpu().numpy())
Step 21: Model Evaluation
Now that we have both the actual and predicted cloud cover values, we can evaluate our CatBoost model.
Python3
# ----- Model Evaluation ----- #
evaluate("Train", train_labels.cpu(), cbt_train_pred, len(cbt_train_pred), 1)
evaluate("Valid", valid_labels.cpu(), cbt_valid_pred, len(cbt_valid_pred), 1)
evaluate("Test", test_labels.cpu(), cbt_test_pred, len(cbt_test_pred), 1)
Output:
---------------------------------------------------
Train MAE: 4.437975369402876
Train RMSE: 6.652963762088708
Train MSE: 44.26192681966554
Train R2: 0.9523583786704957
---------------------------------------------------
---------------------------------------------------
Valid MAE: 6.304070193782646
Valid RMSE: 10.078040861839906
Valid MSE: 101.56690761291485
Valid R2: 0.8914442298156392
---------------------------------------------------
---------------------------------------------------
Test MAE: 6.364711156454016
Test RMSE: 10.198410458657648
Test MSE: 104.0075758832577
Test R2: 0.889060898998321
---------------------------------------------------
Insights:
- Our model fits the data well and can be deployed behind the UI.
- The test and validation metrics are strong.
- There is no significant overfitting, as the train metrics are close to the validation and test metrics.
Step 22: Save the CatBoost Model
We save the CatBoost model for deployment.
Python3
pickle.dump(CB_model, open('catboost_model.sav', 'wb'))
Part II. UI Inference Codes for Deployed Model
A. cloud_coverage_pipeline.py code:
- The provided below code comprises several essential components for cloud coverage prediction.
- It begins by importing necessary libraries, including popular deep learning frameworks like PyTorch and Hugging Face Transformers.
- The configuration settings (CFG) are specified for hyperparameters, dataset, and model configurations.
- The CLIP Model class is defined to encapsulate the Cloud Coverage Prediction model, incorporating the Image Encoder and optional Text Encoder and Projection Head modules.
- The Image Encoder class employs a pre-trained ResNet model from the ‘timm’ library to extract image features.
- The Sky Image class defines a custom dataset loader for image data, allowing for data transformation and preprocessing. It takes images as input.
- Additional utility functions are included for initializing models (Catboost and CLIP), extracting features from the CLIP model, and predicting cloud coverage based on the extracted features.
- Overall, the code sets up the foundation for cloud coverage prediction, including data loading, model initialization, and feature extraction, making it ready for cloud coverage assessment using the CLIP and Catboost models.
- This file returns predicted cloud coverage in percentage to app.py which returns the cloud coverage to the User on User Interface.
Python3
# Importing Libraries
import os
import numpy as np
import cv2
import torch
from torch import nn
import timm
import pickle
from transformers import DistilBertModel, DistilBertConfig
from torch.utils.data import Dataset, DataLoader
from tqdm.autonotebook import tqdm

os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

# Trained Model Configurations
CFG = {
    "debug": False,
    "captions_path": ".",
    "batch_size": 64,
    "num_workers": 4,
    "head_lr": 1e-3,
    "image_encoder_lr": 1e-4,
    "text_encoder_lr": 1e-5,
    "weight_decay": 1e-3,
    "patience": 1,
    "factor": 0.8,
    "epochs": 12,
    "device": "cpu",
    "model_name": 'resnet50',
    "image_embedding": 2048,
    "text_encoder_model": "distilbert-base-uncased",
    "text_embedding": 768,
    "text_tokenizer": "distilbert-base-uncased",
    "max_length": 200,
    "pretrained": True,
    "trainable": True,
    "temperature": 1.0,
    "size": 224,
    "num_projection_layers": 1,
    "projection_dim": 256,
    "dropout": 0.1
}


# Loading Finetuned CLIP Model to the below class format
class CLIPModel(nn.Module):
    def __init__(self, temperature=CFG["temperature"],
                 image_embedding=CFG["image_embedding"],
                 text_embedding=CFG["text_embedding"]):
        super().__init__()
        self.image_encoder = ImageEncoder()
        self.text_encoder = TextEncoder()
        self.image_projection = ProjectionHead(embedding_dim=image_embedding)
        self.text_projection = ProjectionHead(embedding_dim=text_embedding)
        self.temperature = temperature


# Image Encoder class to extract features using finetuned CLIP's ResNet image encoder
class ImageEncoder(nn.Module):
    def __init__(self, model_name=CFG["model_name"],
                 pretrained=CFG["pretrained"], trainable=CFG["trainable"]):
        super().__init__()
        self.model = timm.create_model(
            model_name, pretrained, num_classes=0, global_pool="avg")
        for p in self.model.parameters():
            p.requires_grad = trainable

    def forward(self, x):
        return self.model(x)


# Text Encoder - optional in inference
class TextEncoder(nn.Module):
    def __init__(self, model_name=CFG["text_encoder_model"],
                 pretrained=CFG["pretrained"], trainable=CFG["trainable"]):
        super().__init__()
        if pretrained:
            self.model = DistilBertModel.from_pretrained(model_name)
        else:
            self.model = DistilBertModel(config=DistilBertConfig())
        for p in self.model.parameters():
            p.requires_grad = trainable
        self.target_token_idx = 0

    def forward(self, input_ids, attention_mask):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = output.last_hidden_state
        return last_hidden_state[:, self.target_token_idx, :]


# Projection class - optional in inference
class ProjectionHead(nn.Module):
    def __init__(self, embedding_dim,
                 projection_dim=CFG["projection_dim"], dropout=CFG["dropout"]):
        super().__init__()
        self.projection = nn.Linear(embedding_dim, projection_dim)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(projection_dim, projection_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(projection_dim)

    def forward(self, x):
        projected = self.projection(x)
        x = self.gelu(projected)
        x = self.fc(x)
        x = self.dropout(x)
        x = x + projected
        x = self.layer_norm(x)
        return x


# Class to transform an image to the custom data format
class SkyImage(Dataset):
    def __init__(self, img, label):
        self.img = img
        self.img_label = label

    def __len__(self):
        return len(self.img)

    def __getitem__(self, idx):
        image = cv2.resize(self.img[idx], (244, 244))
        image = np.moveaxis(image, -1, 0)
        label = self.img_label[idx]
        return image, label


# Method to initialize the CatBoost and finetuned CLIP models
def initialize_models():
    cbt_model = pickle.load(open(
        "/home/gfg19509@gfg.geeksforgeeks.org/PawanKrGunjan/Computer Vision/Skycam/catboost_model.sav", 'rb'))
    clip_model = CLIPModel().to(CFG["device"])
    clip_model.load_state_dict(torch.load(
        "/home/gfg19509@gfg.geeksforgeeks.org/PawanKrGunjan/Computer Vision/Skycam/clip_model.pt",
        map_location=CFG["device"]))
    clip_model.eval()
    return cbt_model, clip_model


# Method to extract features from the finetuned CLIP model
def get_features(clip_model, dataset):
    features, label, embeddings = [], [], []
    with torch.no_grad():
        for images, labels in tqdm(DataLoader(dataset, batch_size=64)):
            image_input = torch.tensor(np.stack(images)).cpu().float()
            image_features = clip_model.image_encoder(image_input)
            features.append(image_features)
            label.append(labels)
    return torch.cat(features), torch.cat(label).cpu()


# Method to calculate cloud coverage
def predict_cloud_coverage(image, clip_model, CTBR_model):
    img, lbl = [image], [0]
    # Transforming data into custom format
    test_image = SkyImage(img, lbl)
    # Extracting features from the finetuned CLIP model
    features, label = get_features(clip_model, test_image)
    # Predicting cloud coverage based on extracted features
    pred_cloud_coverage = CTBR_model.predict(features.cpu().numpy())
    return round(max(0.0, min(100.0, pred_cloud_coverage[0])), 1)
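To test the pipeline outside Gradio, here is a minimal usage sketch; the image path is a hypothetical placeholder, and the model paths are the ones hard-coded in initialize_models above.
Python3
import cv2
from cloud_coverage_pipeline import initialize_models, predict_cloud_coverage

cbt_model, clip_model = initialize_models()

# Read a skycam image as an RGB array, mimicking what Gradio passes to the app
image = cv2.cvtColor(cv2.imread("sample_skycam.jpg"), cv2.COLOR_BGR2RGB)
print(predict_cloud_coverage(image, clip_model, cbt_model))  # e.g. 42.7 (% opaque cloud coverage)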
B. app.py code:
The below code sets up a Gradio web interface for a cloud coverage prediction model.
- It imports necessary libraries, initializes the CLIP and Catboost models, and defines a predict function to calculate cloud coverage based on an uploaded sky image.
- The Gradio app takes an image as input, processes it, and provides a textual prediction output, categorizing cloud coverage as low, moderate, or high.
- The interface allows users to upload sky images and receive cloud coverage predictions interactively.
- The app launches and runs for real-time predictions via a web interface.
- There is a separate cloud_coverage_pipeline.py file which contains the pipeline code; keep it in the same folder as app.py.
Python3
# Importing Libraries
import gradio as gr
from gradio.components import Image, Textbox
from cloud_coverage_pipeline import predict_cloud_coverage, initialize_models

# Initialize the CLIP model and CatBoost model only once
cbt_model, clip_model = initialize_models()


# Method to call cloud_coverage_pipeline.py to calculate cloud coverage
def predict(image):
    if image is None:
        return "Please Upload a valid sky image!"
    pred_cloud_coverage = predict_cloud_coverage(image, clip_model, cbt_model)
    if pred_cloud_coverage <= 33.0:
        s = "There is Low Cloud Coverage! Predicted Opaque Cloud Coverage: {}%".format(
            pred_cloud_coverage)
    elif pred_cloud_coverage > 33.0 and pred_cloud_coverage <= 66.0:
        s = "There is Moderate Cloud Coverage! Predicted Opaque Cloud Coverage: {}%".format(
            pred_cloud_coverage)
    else:
        s = "There is High Cloud Coverage! Predicted Opaque Cloud Coverage: {}%".format(
            pred_cloud_coverage)
    return s


# Create the Gradio app
iface = gr.Interface(
    fn=predict,
    inputs=[Image(label="Upload a Sky Cam image")],
    outputs=[Textbox(label="Prediction")],
    title="GFG EcoTech Hackathon: Cloud Coverage Calculator From a Sky Cam Image",
    description='Upload only a skycam image and get the opaque cloud coverage in % | (Low: 0-33 | Moderate: 33-66 | High: 66-100) | <a href="https://drive.google.com/drive/folders/1r8mTWEG4XEBZDg0TNyXTYkGzZVixXvcj?usp=drive_link">Find Sample Testing Images Here!</a>',
)

# Run the Gradio app
iface.launch(debug=True)
Output:
Running on local URL: http://127.0.0.1:7860
To create a public link, set `share=True` in `launch()`.
The output will look like the screenshot below. We can upload images by clicking “Click to Upload”.
Predictions
Results:
| – | Train Data | Valid Data | Test Data |
| --- | --- | --- | --- |
| No. of Records | 70,168 | 30,072 | 33,414 |
| MAE | 4.43 | 6.3 | 6.36 |
| RMSE | 6.65 | 10.07 | 10.19 |
| R2 | 0.95 | 0.89 | 0.88 |
- The above metrics indicate that the CatBoost model fits the data well, with a test RMSE of 10.19 and a test R2 of 0.88.
- The system successfully predicts cloud coverage (ranging from 0% to 100%) from skycam images, providing valuable weather information.
- Future opportunities include integrating the model with Skycamera and creating early alerting systems for climatic shifts.