Consuming OpenCV through Hadoop Storm DRPC Server from .NET

In previous article I gave a basic overview of Hadoop Storm framework and showed how to use it in order to perform some simple operations like word counting and persistence of the information in real time. But Storm service can cover much wider scope of tasks. The scalable and dynamic nature of this product allows to wrap most complicated algorithms and distribute their handling among different machines of the Hadoop cluster. Computer vision is a good candidate for such type of functionality. This term covers a large range of tasks related to the processing of graphical data and performing such operations upon it like objects detection, motion tracking or face recognition. As you can imaging these procedures can be quite expensive and wrapping them within some scalable parallel processing model could significantly increase the end capacity of the potential solutions. You can find a good example of such application called Amazon Recognition in the list of official Amazon services. In this article I want to show you how to build similar products using Hadoop Storm framework and open-source Computer Vision libraries.

Face detection overview

If you do a small dive into the world of computer vision you will probably find out that face processing is one of the most spread tasks solved by this science. People like to see how their smartphones make photos and do some funny things upon the faces located there. But how such applications really works and what is happening behind the scene every time we click the button in our device? Of cause it is impossible to cover all aspects of these complicated operations in single article but lets try at figure out high level stages of this process.

In typical image processing program when we click some Capture button our application sends the signal to the camera to take a new photo. The information is then persisted into the local storage of the device. After that application uploads the photo into memory and begins its scanning. During that stage it tries to localize the pieces which contain the objects of interest  – faces in our case. Internally program usually uses a set of prototype images which contain the example objects. Application’s scanner operates with that collection to handle proper match comparisons. This process is called detection. Once objects have been identified, program usually performs next steps like marking, replacement, filtering or recognition upon them. The overall workflow is sequential and can be divided into a series of tasks which needs to be solved to get the end result:

  1. Capturing data from device input sources
  2. Creation of training sets with different samples of objects of interest which will be used to underline the common features of the generic object
  3. Processing captured data through some framework which will be able to detect  objects of interest relying on the training set of generic objects
  4. Update found matching objects

This process looks quite challenging from the first glance. The good thing is that all these tasks can be accomplished through the usage of different open-source components available for free usage.

Solution design

In my example I want to show you how to process video through computer vision based workflows. We will download a video sample from the internet and will process it through DRPC Server of Hadoop Storm service where we will host the topology. It will parse video into a set of image frames, perform marking face detection with skin recognition upon every frame and return the combined video file created from the set of modified images. This schema represents the workflow of the application:

part8Workflow.png

In order to create this program I will use following open-source components:

  • FFMpeg – complete, cross-platform solution to record, convert and stream audio and video
  • OpenCv – open source computer vision library written natively in C++ which contains different algorithms for working with the graphical data
  • ByteCode JavaCv – bridge solution which wraps OpenCv and ffmpeg products and provides Java-based API for their consumption

Besides on the Hadoop side I will use Storm DRPC Server to host the solution with the logic built with usage of components mentioned above:

  • Storm DRPC – Hadoop service which provides Thrift API for consuming hosted topologies as remote functions

OpenCV detection

Before moving on I would like to give you some more details on how we will perform the actual detection. As you’ve probably noticed OpenCV is the core component of the solution. Indeed this is a powerful library which encapsulates more then 2500 different computer vision algorithms and provides an API for their consumption. For our task we will use its piece related to the detection of objects in the images. We will use Face Detection through Haar Cascades algorithm proposed by Paul Viola and improved by Rainer Lienhart. Originally the problems related with the localization of objects in the images were solved with very expensive algorithms which refereed to the RGB pixels comparisons. Haar Cascades approach introduces a new way of detecting the objects. The idea is that we first create of a set of positive images which contain the correct object of interest and a set of negative images without object of interest. Algorithm then studies theses two collections and identifies a number of common features of the object. For every new image algorithm will create a scanner which will loop through the content and will try to detect these features at different regions of the picture. This video explains the work of algorithm:

As I’ve mentioned native OpenCV framework is implemented in C++ language and it could be quite challenging to adopt this library to the common Hadoop services like Storm. In order to simplify this task we will use the Bytecode JavaCV packages which wrap all required functionality with Java language.

Now once we have some basic understanding of detection technique, we can start implementing the solution. I divided it into three parts – detection implementation, message processing implementation and wrapping workflow into Storm Topology. In order to simplify things I will be embed all the parts within a single Java project.

Implementing Detection:

As I’ve mentioned before in order to perform detection we need to have a training set of object of interest. The good thing is that instead of dealing with its creation we can download ready for use instance from official OpenCV GitHub repository. It is an ordinary XML file with formatted frontal face training history inside. We will point this file for our detection logic later. This part of program will be implemented inside following classes:

  • Face Scanner – primary container for the detectors which will scan through the input images and will mark the faces. This class plays the role of interface for the consumers over OpenCV methods. To make it more generic class will operate with the collection of detectors. Each detector will be able to localize particular object in the upcoming image. Currently we will have a single frontal face detector but in the same fashion you can extend with eyes or mouth detectors:
public class FaceScanner {

  List<FaceDetector> detectors = new ArrayList<FaceDetector>();

  public List<IplImage> Detect(IplImage image) throws Exception {
    if (detectors.size() == 0) {
      throw new Exception("No detectors found to perform recognition");
    }

    List<CvRect> rects = new ArrayList<CvRect>();
    List<IplImage> faces = new ArrayList<IplImage>();

    for (FaceDetector detector : detectors) {
      CvSeq catches = detector.detect(image);
      for (int i = 0; i < catches.total(); i++) {
        CvRect rectNext = new CvRect(cvGetSeqElem(catches, i));

        IplImage ROIFrame = image.clone();
        cvSetImageROI(ROIFrame, rectNext);
        faces.add(ROIFrame);
      }
      detector.mark(CvScalar.RED);
    }

    return faces;
  }

  public void AddDetector(FaceDetector detector) {
    detectors.add(detector);
  }
}
  • Face Detector – will perform the frontal face detection upon images basing on the training set contained in the frontal face classifier downloaded from GitHub repository. Our detector will support the extension with filters which will improve the detection quality with some extra logic:
public class FaceDetector implements FeatureDetector {

    CvHaarClassifierCascade cascadeInstance;

    FrameFilter<IplImage>filter;

    CvSeq detectedSeq;

    IplImage image;

    public FaceDetector(String historyLocation) {
      if (cascadeInstance == null) {
        cascadeInstance = new CvHaarClassifierCascade(cvLoad(historyLocation));
      }
    }

    @Override
    public CvSeq detect(IplImage src) {
      image = src;
      CvMemStorage storage = CvMemStorage.create();
      CvSeq signs = cvHaarDetectObjects(src, cascadeInstance, storage, 2, 1, 1);
      if (signs.total() &gt; 0) {
        cvClearMemStorage(storage);
        if (filter != null) {
          signs = filter.execute(src, signs);
        }
      }

      detectedSeq = signs;
      return signs;
    }

    @Override
    public void mark(CvScalar color) {
      if (detectedSeq != null && image != null) {
        for (int i = 0; i < detectedSeq.total(); i++) {
          CvRect rect = new CvRect(cvGetSeqElem(detectedSeq, i));
          cvRectangle(image, cvPoint(rect.x(), rect.y()),
          cvPoint(rect.width() + rect.x(), rect.height() + rect.y()), color, 2, CV_AA, 0);
        }
      }
    }

    public void WithSkinDetection(boolean markSkin) {
      filter = new SkinFilter(filter, markSkin);
    }
}
  • Skin Filter – will improve the quality of detector by applying the skin verification logic. The filter will analyze the regions located by detectors and will check if the range of colors inside them will be relevant to the human skin color range:
public class SkinFilter extends SequenceFrameFilterBase<IplImage>{

    public static int SkinThreshhold = 35;

    protected boolean markArea = false;

    public SkinFilter(FrameFilter<IplImage> frameFilter, boolean markArea) {
      super(frameFilter);
      this.markArea = markArea;
    }

    @Override
    public CvSeq execute(IplImage image, CvSeq catches) {
      if (frameFilter!=null) catches = frameFilter.execute(image, catches);
      locateSkin(image, catches);
      return catches;
    }

    private void locateSkin(IplImage image, CvSeq catches) {
      for (int i = 0; i < catches.total(); i++) {
        CvRect rect = new CvRect(cvGetSeqElem(catches, i));
        IplImage ROIFrame = image;
        cvSetImageROI(ROIFrame, rect);
        int persentage = getSkinPercentageInFrame(new Mat(ROIFrame));
        if (persentage < SkinThreshhold)
        {
          cvSeqRemove(catches, i);
        }
        cvSetImageROI(image, new CvRect(0, 0, image.width(), image.height()));
      }
    }

    private int getSkinPercentageInFrame(Mat original) {
      IplImage imageWithPhotoFilter = new IplImage(original.clone());
      cvtColor(original, new Mat(imageWithPhotoFilter), COLOR_BGR2YCrCb);

      CvScalar min = cvScalar(0, 133, 77, 0);
      CvScalar max = cvScalar(255, 173, 127, 0);

      IplImage imgSkin = cvCreateImage(cvGetSize(imageWithPhotoFilter), 8, 1);
      cvInRangeS(imageWithPhotoFilter, min, max, imgSkin);

      final MatVector skinContours = new MatVector();
      findContours(new Mat(imgSkin), skinContours, RETR_EXTERNAL, CHAIN_APPROX_SIMPLE);

      if (markArea) drawContours(original, skinContours, -1, AbstractScalar.GREEN);
      double totalSize = 0;
      for (int i = 0; i < skinContours.size(); i++) {
        totalSize = totalSize + contourArea(skinContours.get(i));
      }

      int persentage = (int) ((totalSize / (original.size().width() * original.size().height())) * 100);

      return persentage;
   }
}

Message processing implementation:

Our detector module is ready to process images. But it can accept them in special OpenCV IplImage format only. We need to convert our video files into a set of suitable image objects. For this purpose we will use ffmpeg frame grabber. This class allows us to split the incoming video file into a set of frames and convert them into the IplImage objects suitable for OpenvCV processing. On the other end we will use ffmpeg frame recorder to rejoin the images produced by our detector into a single video file:

  • Grab:
private static List<IplImage> Grab(byte[] data) throws IOException {

  List<IplImage> images = new ArrayList<IplImage>();
  InputStream bis = new ByteArrayInputStream(data);

  FileOutputStream fos = new FileOutputStream("/tmp/tmp.avi");
  fos.write(data);
  fos.close();

  logger.info("File saved to temp dir. Trying to read frames");

  FFmpegFrameGrabber grabber = new FFmpegFrameGrabber("/tmp/tmp.avi");

  try {
    grabber.start();

    int frame_count = grabber.getLengthInFrames();

    logger.info("Found " + frame_count + " frames");

    for (int i = 0; i < frame_count; i++) {
      grabber.setFrameNumber(i);
      Frame frame = grabber.grabFrame();

      if (frame == null)
        break;
      if (frame.image == null)
        continue;

      OpenCVFrameConverter.ToIplImage converter = new OpenCVFrameConverter.ToIplImage();
      IplImage grabbedImage = converter.convert(frame);
      IplImage grayImage = grabbedImage.clone();

      images.add(grayImage);
    }
    grabber.stop();

    logger.info("Grabbing finished. Processes images - " + images.size());

  } catch (java.lang.Exception e) {
      e.printStackTrace();
  }
  return images;
}
  • SaveToVideo:
private static byte[] SaveToVideo(List<IplImage> images)
throws org.bytedeco.javacv.FrameRecorder.Exception, IOException {
  File newFile = new File("/tmp/sample.mp4");

  FFmpegFrameRecorder fr = new FFmpegFrameRecorder(newFile, 640, 480);
  fr.setVideoCodec(avcodec.AV_CODEC_ID_H264);
  fr.setFormat("mp4");

  fr.start();

  OpenCVFrameConverter.ToIplImage converter = new OpenCVFrameConverter.ToIplImage();

  for (int i = 0; i < images.size(); i++) {
    IplImage img = images.get(i);
    Frame frame = converter.convert(img);
    fr.record(frame);
  }

  fr.stop();
  byte[] data = Files.readAllBytes(Paths.get("/tmp/sample.mp4"));
  return data;
}

On the top level we will create a method for initialization of Face Scanner instance and for maintenance of the process of detection of faces in the images produced by the grabbers:

  • DetectFaces:
private void DetectFaces(List<IplImage> images) {
  int size = images.size();

  FaceScanner searcher = new FaceScanner();
  FaceDetector frontalDetector = new FaceDetector(XML_FILE_FACE);
  frontalDetector.WithSkinDetection(false);
  searcher.AddDetector(frontalDetector);

  logger.info("Face detector created");

  for (int i = 0; i < size; i++) {

    IplImage img = images.get(i);

    try {
      List<IplImage> faces = searcher.Detect(img);
    } catch (Exception e) {
      e.printStackTrace();
    }

    IplImage imageWithPhotoFilter = new IplImage(img.clone());
    Mat imgmt = new Mat(img);

    images.set(i, imageWithPhotoFilter);
  }
}

Wrapping workflow into Storm Topology:

At this stage most part of our detection logic is ready and we can start wrapping it into a single Storm Topology workflow. In previous article I showed how to use Kafka message broker for interactions with the service from side of external applications. But in this article I want to follow another approach called Distributed Remote Procedure Call. This technique allows to consume topologies like usual remote procedures through a special cross platform Thrift API. Such step will make our solution universal for most part of exiting platforms. But in order to create such application our Topology should follow two requirements:

  • Bolts within topology should re-emit tuples with starting with id as first field
  • We should use Liner DRPC Topology made with a LinearDRPCTopologyBuilder class

You may have noticed that LinearDRPCTopology is considered to be deprecated and some recent articles advice to work with Trident Topologies instead. But I think that in our case we would introduce some piece of unnecessary overhead for our task and it would be easier to follow the original approach. Now we will create our VideoDetectionBolt and will override its base methods following first requirement:

public class VideoDetectionBolt extends BaseBasicBolt {

  private static Logger logger = Logger.getLogger(VideoDetectionBolt.class.getName());

  public static final String XML_FILE_FACE = "/tmp/haarcascade_frontalface_alt.xml";

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
      byte[] data = Base64.getDecoder().decode(tuple.getString(1));
      logger.info("Got new message " + data.length);
      List<IplImage> images = Grab(data);
      logger.info("Located  " + data.length + " images");
      DetectFaces(images);
      byte[] updatedVideo = SaveToVideo(images);
      String dataStr = Base64.getEncoder().encodeToString(updatedVideo);
      logger.info("Video processed. Emitting back to sender - " + dataStr.length());
      collector.emit(new Values(tuple.getValue(0), dataStr));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id", "video"));
  }

  //Detection methods
  //........
}

Don’t forget to put the training set into some location accessible by the worker:

wget -P https://github.com/opencv/opencv/blob/master/data/haarcascades/haarcascade_frontalface_alt.xml

At the end we need to summarize everything using LinearDRPCTopologyBuilder and submit our workflow using StormSubmitter class.

public class VideoProcessingTopology {

    public static void main(String[] args) throws Exception {
      Config cfg = new Config();
      cfg.put("drpc.servers", Arrays.asList(new String[]{"localhost"}));

      LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("video");
      builder.addBolt(new VideoDetectionBolt(), 1);

      cfg.setNumWorkers(1);
      StormSubmitter.submitTopology(args[0], cfg, builder.createRemoteTopology());
   }
}

Now lets submit the topology and verify that it is up and running:

storm jar target/RPVideoProcessingTopology-0.0.1-SNAPSHOT.jar demo.topologies.VideoProcessingTopology

storm list

Topology_name Status Num_tasks Num_workers Uptime_secs
——————————————————————-
VideoTopology ACTIVE 7 1 100083

Consuming DRPC Topology from .NET

I’ve mentioned earlier that DRPC server provides Thrift API. It means that we can build custom client applications using original contracts of the service on any language which supports this protocol. In our program we will use Storm.Net.Adapter NuGet package which already contains Thrift client proxi classes which we can be used to send messages to Storm DRPC server. The code part here is very simple:

class Program
{
  static void Main(string[] args)
  {
    var bytes = File.ReadAllBytes(@"c:\out.mp4");
    string base64Str = Convert.ToBase64String(bytes);

    DRPCClient client = new DRPCClient("localhost", 3772);
    string result = client.execute("video", base64Str);

    File.WriteAllBytes("tmp.avi", Convert.FromBase64String(result));
  }
}

The solution is ready and now it is time to test it end to end. I will use a mp4 sample downloaded for this link.

After processing the data through the topology I obtained the following output video file.

As you can see the original video was updated with the marking of the faces detected in every frame of the file. With certain percentage of invalid catches in general our program managed to perform the localization of face objects very well. Now we can start scaling it with any extra pieces of logic like replacements or recognition. Also we can try improve the quality of algorithm with extra features like eyes and mouth detection. Besides we can play with the paralleling parts of the workflow through different Storm workers and executors to increase the speed of our algorithms.

The sources are available in GitHub.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s