# Pypeline

Pypeline is a simple yet powerful Python library for creating concurrent data pipelines.
- Pypeline was designed to solve simple *medium data* tasks that require concurrency and parallelism, but where using frameworks like Spark or Dask feels exaggerated or unnatural.
- Pypeline exposes an easy-to-use, familiar, functional API.
- Pypeline enables you to build pipelines using Processes, Threads, and asyncio.Tasks via the exact same API.
- Pypeline gives you control over the memory and CPU resources used at each stage of your pipeline.
## Installation

Install Pypeline using pip:

```bash
pip install pypeln
```
## Basic Usage

With Pypeline you can easily create multi-stage data pipelines using 3 types of workers:
### Processes

You can create a pipeline based on multiprocessing.Process workers by using the `pr` module:
```python
from pypeln import pr
import time
from random import random

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = pr.map(slow_add1, data, workers = 3, maxsize = 4)
stage = pr.filter(slow_gt3, stage, workers = 2)

data = list(stage)  # e.g. [5, 6, 9, 4, 8, 10, 7]
```
At each stage you can specify the number of `workers`. The `maxsize` parameter limits the maximum number of elements that the stage can hold simultaneously.
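In practice `maxsize` gives you backpressure. A minimal sketch (`slow_square` and the generator are made up for illustration) of processing a large lazy input with bounded memory:

```python
from pypeln import pr
import time

def slow_square(x):
    time.sleep(0.01)  # <= some slow computation
    return x * x

def numbers():
    # a lazy generator: elements are produced only when requested
    for i in range(1000000):
        yield i

# maxsize = 4 means the stage holds at most 4 elements at a time,
# so the large input is never loaded into memory all at once
stage = pr.map(slow_square, numbers(), workers = 4, maxsize = 4)

for result in stage:  # stages are iterable; results stream as they finish
    pass
```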
### Threads

You can create a pipeline based on threading.Thread workers by using the `th` module:
```python
from pypeln import th
import time
from random import random

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = th.map(slow_add1, data, workers = 3, maxsize = 4)
stage = th.filter(slow_gt3, stage, workers = 2)

data = list(stage)  # e.g. [5, 6, 9, 4, 8, 10, 7]
```
Here we have the exact same situation as in the previous case, except that the workers are Threads.
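Because `pr` and `th` expose the same API, switching worker types is effectively a one-line change. A minimal sketch (the module-selection pattern is our own convention, not part of the library):

```python
from pypeln import pr, th
import time
from random import random

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

# pick the worker type once; every call below stays identical
module = th  # swap in `pr` to switch from Threads to Processes

stage = module.map(slow_add1, range(10), workers = 3, maxsize = 4)
data = list(stage)
```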
### Tasks

You can create a pipeline based on asyncio.Task workers by using the `io` module:
```python
from pypeln import io
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random())  # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = io.map(slow_add1, data, workers = 3, maxsize = 4)
stage = io.filter(slow_gt3, stage, workers = 2)

data = list(stage)  # e.g. [5, 6, 9, 4, 8, 10, 7]
```
This is conceptually similar to the previous cases, except that everything runs in a single thread and Task workers are created dynamically.
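Since Task workers are cheap compared to Processes or Threads, you can afford many more of them, which suits IO-bound workloads. A minimal sketch (`fake_request` and the worker count are illustrative):

```python
from pypeln import io
import asyncio
from random import random

async def fake_request(x):
    await asyncio.sleep(random())  # stand-in for e.g. a network call
    return x * 2

# Task workers are created dynamically and are lightweight, so a much
# higher worker count is practical than with Processes or Threads
stage = io.map(fake_request, range(100), workers = 50)
data = list(stage)
```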
For more information see the Pypeline Guide.
## Pipe Operator

In the spirit of being a true pipeline library, Pypeline also lets you create your pipelines using the pipe (`|`) operator:
```python
data = (
    range(10)
    | pr.map(slow_add1, workers = 3, maxsize = 4)
    | pr.filter(slow_gt3, workers = 2)
    | list
)
```
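Note that in this form `pr.map` and `pr.filter` are called without a data argument and return partial stages. Assuming that behavior, a small sketch rearranging the example above so the fragments are named and composed explicitly:

```python
from pypeln import pr
import time
from random import random

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

# name the partial stages, then wire them together with `|`
add_one = pr.map(slow_add1, workers = 3, maxsize = 4)
keep_gt3 = pr.filter(slow_gt3, workers = 2)

data = list(range(10) | add_one | keep_gt3)
```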
## Benchmarks

## Resources

## Related Stuff
- mpipe
- Process Pools
- Making 100 million requests with Python aiohttp
- Python multiprocessing Queue memory management
## Contributors