Highway

Multi-threaded data access pipeline for tensor data.


Install

Run python setup.py install to install the package.

API

Highway is built around a sequential API that constructs a pipeline which moves data through a chain of processing stages.

Simple example:

from highway.engine import Pipeline
from highway.modules import Augmentations, ImageFileReader
from highway.transforms import FlipX

data_dir = "../some-dir"

# Create an image reader that loads images from a directory containing sub-directories for each label
img_reader = ImageFileReader(data_dir, 16, (240, 320))

# Build the pipeline. Randomly flip images along the x axis with a probability p=0.5
p = Pipeline([img_reader, Augmentations([FlipX()])])

# Pop a batch to feed into NNs
images, labels = p.dequeue()
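
A typical way to consume the pipeline is to pull batches in a loop and feed them to your model. The sketch below continues the example above; it assumes that dequeue() returns a pair of numpy arrays and uses a hypothetical train_step function as a stand-in for the actual training code.

def train_step(images, labels):
    # Stand-in for the real forward/backward pass of your network
    print("batch:", images.shape, labels.shape)

num_batches = 100  # number of batches to pull for this run
for _ in range(num_batches):
    images, labels = p.dequeue()  # pops the next augmented batch from the pipeline
    train_step(images, labels)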

If you are running heavy data augmentations, you can distribute the processing across several machines and scale it with each machine's CPU capacity using the ZMQ transport layer. Note: in the example below, bind is set to False for the sink on the worker machines and to True for the source on the training machine. This way the training machine binds a single port and all concurrent worker machines connect to it, which keeps port usage to a minimum.

from highway.engine import Pipeline
from highway.modules import Augmentations, ImageFileReader, ZMQSink, ZMQSource
from highway.transforms import FlipX

data_dir = "../some-dir"

if is_data_source_machine:
  # Create an image reader that loads images from a directory containing sub-directories for each label
  img_reader = ImageFileReader(data_dir, 16, (240, 320))
  p = Pipeline([img_reader, ZMQSink("tcp://some-ip:some-port")])

elif is_worker_machine:
  # In case we're on a worker machine, pull some remote batches, augment them and push them into the training machine
  p = Pipeline([ZMQSource("tcp://some-ip:some-port"), Augmentations([FlipX(), ...]), ZMQSink("tcp://some-other-ip:some-port", bind=False)])

else:
  # Training machine
  p = Pipeline([ZMQSource("tcp://some-other-ip:some-port", bind=True)])
  images, labels = p.dequeue()
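
The bind/connect split above follows standard ZMQ semantics. The following plain pyzmq sketch (not Highway's actual implementation; the address, port, and PUSH/PULL pattern are assumptions for illustration) shows why the training machine only ever needs to open a single port, no matter how many workers feed it.

import zmq

context = zmq.Context()

# Training machine: bind a single PULL socket once.
collector = context.socket(zmq.PULL)
collector.bind("tcp://*:5555")

# Each worker machine would connect a PUSH socket to that same address,
# so only one port has to be reachable regardless of the number of workers:
# pusher = context.socket(zmq.PUSH)
# pusher.connect("tcp://training-machine-ip:5555")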

Testing

To run the test suite, install the development dependencies with pip install -e ".[test]", then run tox from the project root.

Note: On macOS, levelDB might fail to compile. Run tox as follows so the levelDB binaries can be compiled, replacing the levelDB version number with the one installed on your system: LIBRARY_PATH=/usr/local/Cellar/leveldb/1.18/lib CPATH=/usr/local/Cellar/leveldb/1.18/include tox

Benchmarking and performance

Benchmark scripts are located in the benchmarks folder. So far we have measured inter-process communication to max out at roughly 500 MB/s and TCP communication at roughly 120 MB/s; both figures depend on the machine, but they give an idea of the current ceiling. We are not planning to implement custom queues or message services ourselves, so performance may be inherently limited by the tools that are available. Right now we use ZMQ for messaging and msgpack for serialization, which offer good performance, but we have yet to validate whether that will be enough in the future.
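
As a rough sanity check of the serialization side, a throughput estimate can be obtained with a few lines of Python. This is just a sketch and not one of the scripts in benchmarks; the batch shape mirrors the earlier example, and the number it prints is machine-dependent.

import time
import numpy as np
import msgpack

# A batch roughly matching the example above: 16 images of 240x320x3 uint8
batch = np.random.randint(0, 255, size=(16, 240, 320, 3), dtype=np.uint8)
payload = batch.tobytes()

iterations = 100
start = time.time()
for _ in range(iterations):
    packed = msgpack.packb(payload)   # serialize the raw batch bytes
    msgpack.unpackb(packed)           # and deserialize them again
elapsed = time.time() - start

total_mb = iterations * len(payload) / 1e6
print("round-trip throughput: %.1f MB/s" % (total_mb / elapsed))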

What we're planning to add

Some features are not there yet but may come in the near future. This includes, but is not limited to:

  • More augmentations and potentially a cleaner programming API for them
  • More examples
  • Support for object detection in PASCAL VOC format.
  • Performance tweaks
